1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
|
// Copyright (c) 2011 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "base/synchronization/waitable_event_watcher.h"
#include "base/message_loop.h"
#include "base/synchronization/lock.h"
#include "base/synchronization/waitable_event.h"
namespace base {
// -----------------------------------------------------------------------------
// WaitableEventWatcher (async waits).
//
// The basic design is that we add an AsyncWaiter to the wait-list of the event.
// That AsyncWaiter has a pointer to MessageLoop, and a Task to be posted to it.
// The MessageLoop ends up running the task, which calls the delegate.
//
// Since the wait can be canceled, we have a thread-safe Flag object which is
// set when the wait has been canceled. At each stage in the above, we check the
// flag before going onto the next stage. Since the wait may only be canceled in
// the MessageLoop which runs the Task, we are assured that the delegate cannot
// be called after canceling...
// -----------------------------------------------------------------------------
// A thread-safe, reference-counted, write-once flag.
// -----------------------------------------------------------------------------
class Flag : public RefCountedThreadSafe<Flag> {
public:
Flag() { flag_ = false; }
void Set() {
AutoLock locked(lock_);
flag_ = true;
}
bool value() const {
AutoLock locked(lock_);
return flag_;
}
private:
mutable Lock lock_;
bool flag_;
};
// -----------------------------------------------------------------------------
// This is an asynchronous waiter which posts a task to a MessageLoop when
// fired. An AsyncWaiter may only be in a single wait-list.
// -----------------------------------------------------------------------------
class AsyncWaiter : public WaitableEvent::Waiter {
public:
AsyncWaiter(MessageLoop* message_loop, Task* task, Flag* flag)
: message_loop_(message_loop),
cb_task_(task),
flag_(flag) { }
bool Fire(WaitableEvent* event) {
if (flag_->value()) {
// If the callback has been canceled, we don't enqueue the task, we just
// delete it instead.
delete cb_task_;
} else {
message_loop_->PostTask(FROM_HERE, cb_task_);
}
// We are removed from the wait-list by the WaitableEvent itself. It only
// remains to delete ourselves.
delete this;
// We can always return true because an AsyncWaiter is never in two
// different wait-lists at the same time.
return true;
}
// See StopWatching for discussion
bool Compare(void* tag) {
return tag == flag_.get();
}
private:
MessageLoop *const message_loop_;
Task *const cb_task_;
scoped_refptr<Flag> flag_;
};
// -----------------------------------------------------------------------------
// For async waits we need to make a callback in a MessageLoop thread. We do
// this by posting this task, which calls the delegate and keeps track of when
// the event is canceled.
// -----------------------------------------------------------------------------
class AsyncCallbackTask : public Task {
public:
AsyncCallbackTask(Flag* flag, WaitableEventWatcher::Delegate* delegate,
WaitableEvent* event)
: flag_(flag),
delegate_(delegate),
event_(event) {
}
void Run() {
// Runs in MessageLoop thread.
if (!flag_->value()) {
// This is to let the WaitableEventWatcher know that the event has occured
// because it needs to be able to return NULL from GetWatchedObject
flag_->Set();
delegate_->OnWaitableEventSignaled(event_);
}
// We are deleted by the MessageLoop
}
private:
scoped_refptr<Flag> flag_;
WaitableEventWatcher::Delegate *const delegate_;
WaitableEvent *const event_;
};
WaitableEventWatcher::WaitableEventWatcher()
: message_loop_(NULL),
cancel_flag_(NULL),
waiter_(NULL),
callback_task_(NULL),
event_(NULL),
delegate_(NULL) {
}
WaitableEventWatcher::~WaitableEventWatcher() {
StopWatching();
}
// -----------------------------------------------------------------------------
// The Handle is how the user cancels a wait. After deleting the Handle we
// insure that the delegate cannot be called.
// -----------------------------------------------------------------------------
bool WaitableEventWatcher::StartWatching
(WaitableEvent* event, WaitableEventWatcher::Delegate* delegate) {
MessageLoop *const current_ml = MessageLoop::current();
DCHECK(current_ml) << "Cannot create WaitableEventWatcher without a "
"current MessageLoop";
// A user may call StartWatching from within the callback function. In this
// case, we won't know that we have finished watching, expect that the Flag
// will have been set in AsyncCallbackTask::Run()
if (cancel_flag_.get() && cancel_flag_->value()) {
if (message_loop_) {
message_loop_->RemoveDestructionObserver(this);
message_loop_ = NULL;
}
cancel_flag_ = NULL;
}
DCHECK(!cancel_flag_.get()) << "StartWatching called while still watching";
cancel_flag_ = new Flag;
callback_task_ = new AsyncCallbackTask(cancel_flag_, delegate, event);
WaitableEvent::WaitableEventKernel* kernel = event->kernel_.get();
AutoLock locked(kernel->lock_);
delegate_ = delegate;
event_ = event;
if (kernel->signaled_) {
if (!kernel->manual_reset_)
kernel->signaled_ = false;
// No hairpinning - we can't call the delegate directly here. We have to
// enqueue a task on the MessageLoop as normal.
current_ml->PostTask(FROM_HERE, callback_task_);
return true;
}
message_loop_ = current_ml;
current_ml->AddDestructionObserver(this);
kernel_ = kernel;
waiter_ = new AsyncWaiter(current_ml, callback_task_, cancel_flag_);
event->Enqueue(waiter_);
return true;
}
void WaitableEventWatcher::StopWatching() {
delegate_ = NULL;
if (message_loop_) {
message_loop_->RemoveDestructionObserver(this);
message_loop_ = NULL;
}
if (!cancel_flag_.get()) // if not currently watching...
return;
if (cancel_flag_->value()) {
// In this case, the event has fired, but we haven't figured that out yet.
// The WaitableEvent may have been deleted too.
cancel_flag_ = NULL;
return;
}
if (!kernel_.get()) {
// We have no kernel. This means that we never enqueued a Waiter on an
// event because the event was already signaled when StartWatching was
// called.
//
// In this case, a task was enqueued on the MessageLoop and will run.
// We set the flag in case the task hasn't yet run. The flag will stop the
// delegate getting called. If the task has run then we have the last
// reference to the flag and it will be deleted immedately after.
cancel_flag_->Set();
cancel_flag_ = NULL;
return;
}
AutoLock locked(kernel_->lock_);
// We have a lock on the kernel. No one else can signal the event while we
// have it.
// We have a possible ABA issue here. If Dequeue was to compare only the
// pointer values then it's possible that the AsyncWaiter could have been
// fired, freed and the memory reused for a different Waiter which was
// enqueued in the same wait-list. We would think that that waiter was our
// AsyncWaiter and remove it.
//
// To stop this, Dequeue also takes a tag argument which is passed to the
// virtual Compare function before the two are considered a match. So we need
// a tag which is good for the lifetime of this handle: the Flag. Since we
// have a reference to the Flag, its memory cannot be reused while this object
// still exists. So if we find a waiter with the correct pointer value, and
// which shares a Flag pointer, we have a real match.
if (kernel_->Dequeue(waiter_, cancel_flag_.get())) {
// Case 2: the waiter hasn't been signaled yet; it was still on the wait
// list. We've removed it, thus we can delete it and the task (which cannot
// have been enqueued with the MessageLoop because the waiter was never
// signaled)
delete waiter_;
delete callback_task_;
cancel_flag_ = NULL;
return;
}
// Case 3: the waiter isn't on the wait-list, thus it was signaled. It may
// not have run yet, so we set the flag to tell it not to bother enqueuing the
// task on the MessageLoop, but to delete it instead. The Waiter deletes
// itself once run.
cancel_flag_->Set();
cancel_flag_ = NULL;
// If the waiter has already run then the task has been enqueued. If the Task
// hasn't yet run, the flag will stop the delegate from getting called. (This
// is thread safe because one may only delete a Handle from the MessageLoop
// thread.)
//
// If the delegate has already been called then we have nothing to do. The
// task has been deleted by the MessageLoop.
}
WaitableEvent* WaitableEventWatcher::GetWatchedEvent() {
if (!cancel_flag_.get())
return NULL;
if (cancel_flag_->value())
return NULL;
return event_;
}
// -----------------------------------------------------------------------------
// This is called when the MessageLoop which the callback will be run it is
// deleted. We need to cancel the callback as if we had been deleted, but we
// will still be deleted at some point in the future.
// -----------------------------------------------------------------------------
void WaitableEventWatcher::WillDestroyCurrentMessageLoop() {
StopWatching();
}
} // namespace base
|