diff options
Diffstat (limited to 'base')
-rw-r--r-- | base/atomic_ref_count.h | 10 | ||||
-rw-r--r-- | base/base_lib.scons | 4 | ||||
-rw-r--r-- | base/base_unittests.scons | 1 | ||||
-rw-r--r-- | base/build/base.vcproj | 8 | ||||
-rw-r--r-- | base/waitable_event.h | 87 | ||||
-rw-r--r-- | base/waitable_event_generic.cc | 71 | ||||
-rw-r--r-- | base/waitable_event_posix.cc | 392 | ||||
-rw-r--r-- | base/waitable_event_unittest.cc | 55 | ||||
-rw-r--r-- | base/waitable_event_watcher.h | 148 | ||||
-rw-r--r-- | base/waitable_event_watcher_posix.cc | 253 | ||||
-rw-r--r-- | base/waitable_event_watcher_unittest.cc | 136 | ||||
-rw-r--r-- | base/waitable_event_watcher_win.cc | 60 | ||||
-rw-r--r-- | base/waitable_event_win.cc | 40 |
13 files changed, 1173 insertions, 92 deletions
diff --git a/base/atomic_ref_count.h b/base/atomic_ref_count.h index af83572..66fe07b 100644 --- a/base/atomic_ref_count.h +++ b/base/atomic_ref_count.h @@ -13,12 +13,12 @@ namespace base { -typedef base::subtle::Atomic32 AtomicRefCount; +typedef subtle::Atomic32 AtomicRefCount; // Increment a reference count by "increment", which must exceed 0. inline void AtomicRefCountIncN(volatile AtomicRefCount *ptr, AtomicRefCount increment) { - base::subtle::NoBarrier_AtomicIncrement(ptr, increment); + subtle::NoBarrier_AtomicIncrement(ptr, increment); } // Decrement a reference count by "decrement", which must exceed 0, @@ -27,7 +27,7 @@ inline void AtomicRefCountIncN(volatile AtomicRefCount *ptr, // became zero will be visible to a thread that has just made the count zero. inline bool AtomicRefCountDecN(volatile AtomicRefCount *ptr, AtomicRefCount decrement) { - return base::subtle::Barrier_AtomicIncrement(ptr, -decrement) != 0; + return subtle::Barrier_AtomicIncrement(ptr, -decrement) != 0; } // Increment a reference count by 1. @@ -49,14 +49,14 @@ inline bool AtomicRefCountDec(volatile AtomicRefCount *ptr) { // needed for the owning thread to act on the object, knowing that it has // exclusive access to the object. inline bool AtomicRefCountIsOne(volatile AtomicRefCount *ptr) { - return base::subtle::Acquire_Load(ptr) == 1; + return subtle::Acquire_Load(ptr) == 1; } // Return whether the reference count is zero. With conventional object // referencing counting, the object will be destroyed, so the reference count // should never be zero. Hence this is generally used for a debug check. inline bool AtomicRefCountIsZero(volatile AtomicRefCount *ptr) { - return base::subtle::Acquire_Load(ptr) == 0; + return subtle::Acquire_Load(ptr) == 0; } } // namespace base diff --git a/base/base_lib.scons b/base/base_lib.scons index ee52dbf..573eea3 100644 --- a/base/base_lib.scons +++ b/base/base_lib.scons @@ -320,6 +320,7 @@ if not env.Bit('windows'): 'third_party/purify/pure_api.c', 'time_win.cc', 'waitable_event_win.cc', + 'waitable_event_watcher_win.cc', 'win_util.cc', 'wmi_util.cc', 'worker_pool.cc', @@ -343,7 +344,8 @@ if env.Bit('posix'): 'thread_local_storage_posix.cc', 'thread_local_posix.cc', 'time_posix.cc', - 'waitable_event_generic.cc', + 'waitable_event_posix.cc', + 'waitable_event_watcher_posix.cc', ]) if env.Bit('mac'): diff --git a/base/base_unittests.scons b/base/base_unittests.scons index 69172e9..a7ea02f 100644 --- a/base/base_unittests.scons +++ b/base/base_unittests.scons @@ -112,6 +112,7 @@ input_files = ChromeFileList([ 'tuple_unittest.cc', 'values_unittest.cc', 'waitable_event_unittest.cc', + 'waitable_event_watcher_unittest.cc', 'watchdog_unittest.cc', 'win_util_unittest.cc', 'wmi_util_unittest.cc', diff --git a/base/build/base.vcproj b/base/build/base.vcproj index c5de938..2bfc20a 100644 --- a/base/build/base.vcproj +++ b/base/build/base.vcproj @@ -954,6 +954,14 @@ > </File> <File + RelativePath="..\waitable_event_watcher.h" + > + </File> + <File + RelativePath="..\waitable_event_watcher_win.cc" + > + </File> + <File RelativePath="..\waitable_event_win.cc" > </File> diff --git a/base/waitable_event.h b/base/waitable_event.h index b723653..84feedc 100644 --- a/base/waitable_event.h +++ b/base/waitable_event.h @@ -8,18 +8,27 @@ #include "base/basictypes.h" #if defined(OS_WIN) -typedef void* HANDLE; -#else +#include <windows.h> +#endif + +#if defined(OS_POSIX) +#include <list> +#include <utility> #include "base/condition_variable.h" #include "base/lock.h" +#include "base/ref_counted.h" #endif +#include "base/message_loop.h" + namespace base { class TimeDelta; // A WaitableEvent can be a useful thread synchronization tool when you want to -// allow one thread to wait for another thread to finish some work. +// allow one thread to wait for another thread to finish some work. For +// non-Windows systems, this can only be used from within a single address +// space. // // Use a WaitableEvent when you would otherwise use a Lock+ConditionVariable to // protect a simple boolean value. However, if you find yourself using a @@ -31,7 +40,6 @@ class TimeDelta; // by a Windows event object. This is intentional. If you are writing Windows // specific code and you need other features of a Windows event, then you might // be better off just using an Windows event directly. -// class WaitableEvent { public: // If manual_reset is true, then to set the event state to non-signaled, a @@ -40,6 +48,13 @@ class WaitableEvent { // waiting thread has been released. WaitableEvent(bool manual_reset, bool initially_signaled); +#if defined(OS_WIN) + // Create a WaitableEvent from an Event HANDLE which has already been + // created. This objects takes ownership of the HANDLE and will close it when + // deleted. + explicit WaitableEvent(HANDLE event_handle); +#endif + // WARNING: Destroying a WaitableEvent while threads are waiting on it is not // supported. Doing so will cause crashes or other instability. ~WaitableEvent(); @@ -64,14 +79,70 @@ class WaitableEvent { // does not necessarily mean that max_time was exceeded. bool TimedWait(const TimeDelta& max_time); +#if defined(OS_WIN) + HANDLE handle() const { return handle_; } +#endif + + // Wait, synchronously, on multiple events. + // waitables: an array of WaitableEvent pointers + // count: the number of elements in @waitables + // + // returns: the index of a WaitableEvent which has been signaled. + static size_t WaitMany(WaitableEvent** waitables, size_t count); + + // For asynchronous waiting, see WaitableEventWatcher + + // This is a private helper class. It's here because it's used by friends of + // this class (such as WaitableEventWatcher) to be able to enqueue elements + // of the wait-list + class Waiter { + public: + // Signal the waiter to wake up. + // + // Consider the case of a Waiter which is in multiple WaitableEvent's + // wait-lists. Each WaitableEvent is automatic-reset and two of them are + // signaled at the same time. Now, each will wake only the first waiter in + // the wake-list before resetting. However, if those two waiters happen to + // be the same object (as can happen if another thread didn't have a chance + // to dequeue the waiter from the other wait-list in time), two auto-resets + // will have happened, but only one waiter has been signaled! + // + // Because of this, a Waiter may "reject" a wake by returning false. In + // this case, the auto-reset WaitableEvent shouldn't act as if anything has + // been notified. + virtual bool Fire(WaitableEvent* signaling_event) = 0; + + // Waiters may implement this in order to provide an extra condition for + // two Waiters to be considered equal. In WaitableEvent::Dequeue, if the + // pointers match then this function is called as a final check. See the + // comments in ~Handle for why. + virtual bool Compare(void* tag) = 0; + }; + private: + friend class WaitableEventWatcher; + #if defined(OS_WIN) - HANDLE event_; + HANDLE handle_; #else - Lock lock_; // Needs to be listed first so it will be constructed first. - ConditionVariable cvar_; + bool SignalAll(); + bool SignalOne(); + void Enqueue(Waiter* waiter); + bool Dequeue(Waiter* waiter, void* tag); + + // When dealing with arrays of WaitableEvent*, we want to sort by the address + // of the WaitableEvent in order to have a globally consistent locking order. + // In that case we keep them, in sorted order, in an array of pairs where the + // second element is the index of the WaitableEvent in the original, + // unsorted, array. + typedef std::pair<WaitableEvent*, size_t> WaiterAndIndex; + static unsigned EnqueueMany(WaiterAndIndex* waitables, + size_t count, Waiter* waiter); + + Lock lock_; bool signaled_; - bool manual_reset_; + const bool manual_reset_; + std::list<Waiter*> waiters_; #endif DISALLOW_COPY_AND_ASSIGN(WaitableEvent); diff --git a/base/waitable_event_generic.cc b/base/waitable_event_generic.cc deleted file mode 100644 index 61eeeb8..0000000 --- a/base/waitable_event_generic.cc +++ /dev/null @@ -1,71 +0,0 @@ -// Copyright (c) 2006-2008 The Chromium Authors. All rights reserved. -// Use of this source code is governed by a BSD-style license that can be -// found in the LICENSE file. - -#include "base/waitable_event.h" - -namespace base { - -WaitableEvent::WaitableEvent(bool manual_reset, bool signaled) - : lock_(), - cvar_(&lock_), - signaled_(signaled), - manual_reset_(manual_reset) { -} - -WaitableEvent::~WaitableEvent() { - // Members are destroyed in the reverse of their initialization order, so we - // should not have to worry about lock_ being destroyed before cvar_. -} - -void WaitableEvent::Reset() { - AutoLock locked(lock_); - signaled_ = false; -} - -void WaitableEvent::Signal() { - AutoLock locked(lock_); - if (!signaled_) { - signaled_ = true; - if (manual_reset_) { - cvar_.Broadcast(); - } else { - cvar_.Signal(); - } - } -} - -bool WaitableEvent::IsSignaled() { - return TimedWait(TimeDelta::FromMilliseconds(0)); -} - -bool WaitableEvent::Wait() { - AutoLock locked(lock_); - while (!signaled_) - cvar_.Wait(); - if (!manual_reset_) - signaled_ = false; - return true; -} - -bool WaitableEvent::TimedWait(const TimeDelta& max_time) { - AutoLock locked(lock_); - // In case of spurious wake-ups, we need to adjust the amount of time that we - // spend sleeping. - TimeDelta total_time; - for (;;) { - TimeTicks start = TimeTicks::Now(); - cvar_.TimedWait(max_time - total_time); - if (signaled_) - break; - total_time += TimeTicks::Now() - start; - if (total_time >= max_time) - break; - } - bool result = signaled_; - if (!manual_reset_) - signaled_ = false; - return result; -} - -} // namespace base diff --git a/base/waitable_event_posix.cc b/base/waitable_event_posix.cc new file mode 100644 index 0000000..be5af6e --- /dev/null +++ b/base/waitable_event_posix.cc @@ -0,0 +1,392 @@ +// Copyright (c) 2006-2008 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "base/waitable_event.h" + +#include "base/condition_variable.h" +#include "base/lock.h" +#include "base/message_loop.h" + +// ----------------------------------------------------------------------------- +// A WaitableEvent on POSIX is implemented as a wait-list. Currently we don't +// support cross-process events (where one process can signal an event which +// others are waiting on). Because of this, we can avoid having one thread per +// listener in several cases. +// +// The WaitableEvent maintains a list of waiters, protected by a lock. Each +// waiter is either an async wait, in which case we have a Task and the +// MessageLoop to run it on, or a blocking wait, in which case we have the +// condition variable to signal. +// +// Waiting involves grabbing the lock and adding oneself to the wait list. Async +// waits can be canceled, which means grabbing the lock and removing oneself +// from the list. +// +// Waiting on multiple events is handled by adding a single, synchronous wait to +// the wait-list of many events. An event passes a pointer to itself when +// firing a waiter and so we can store that pointer to find out which event +// triggered. +// ----------------------------------------------------------------------------- + +namespace base { + +// ----------------------------------------------------------------------------- +// This is just an abstract base class for waking the two types of waiters +// ----------------------------------------------------------------------------- +WaitableEvent::WaitableEvent(bool manual_reset, bool initially_signaled) + : signaled_(false), + manual_reset_(manual_reset) { + DCHECK(!initially_signaled) << "Not implemented"; +} + +WaitableEvent::~WaitableEvent() { + DCHECK(waiters_.empty()) << "Deleting WaitableEvent with listeners!"; +} + +void WaitableEvent::Reset() { + AutoLock locked(lock_); + signaled_ = false; +} + +void WaitableEvent::Signal() { + AutoLock locked(lock_); + + if (signaled_) + return; + + if (manual_reset_) { + SignalAll(); + signaled_ = true; + } else { + // In the case of auto reset, if no waiters were woken, we remain + // signaled. + if (!SignalOne()) + signaled_ = true; + } +} + +bool WaitableEvent::IsSignaled() { + AutoLock locked(lock_); + + const bool result = signaled_; + if (result && !manual_reset_) + signaled_ = false; + return result; +} + +// ----------------------------------------------------------------------------- +// Synchronous waits + +// ----------------------------------------------------------------------------- +// This is an synchronous waiter. The thread is waiting on the given condition +// variable and the fired flag in this object. +// ----------------------------------------------------------------------------- +class SyncWaiter : public WaitableEvent::Waiter { + public: + SyncWaiter(ConditionVariable* cv, Lock* lock) + : fired_(false), + cv_(cv), + lock_(lock), + signaling_event_(NULL) { } + + bool Fire(WaitableEvent *signaling_event) { + lock_->Acquire(); + const bool previous_value = fired_; + fired_ = true; + if (!previous_value) + signaling_event_ = signaling_event; + lock_->Release(); + + if (previous_value) + return false; + + cv_->Broadcast(); + + // SyncWaiters are stack allocated on the stack of the blocking thread. + return true; + } + + WaitableEvent* signaled_event() const { + return signaling_event_; + } + + // --------------------------------------------------------------------------- + // These waiters are always stack allocated and don't delete themselves. Thus + // there's no problem and the ABA tag is the same as the object pointer. + // --------------------------------------------------------------------------- + bool Compare(void* tag) { + return this == tag; + } + + // --------------------------------------------------------------------------- + // Called with lock held. + // --------------------------------------------------------------------------- + bool fired() const { + return fired_; + } + + // --------------------------------------------------------------------------- + // During a TimedWait, we need a way to make sure that an auto-reset + // WaitableEvent doesn't think that this event has been signaled between + // unlocking it and removing it from the wait-list. Called with lock held. + // --------------------------------------------------------------------------- + void Disable() { + fired_ = true; + } + + private: + bool fired_; + ConditionVariable *const cv_; + Lock *const lock_; + WaitableEvent* signaling_event_; // The WaitableEvent which woke us +}; + +bool WaitableEvent::TimedWait(const TimeDelta& max_time) { + const Time end_time(Time::Now() + max_time); + + lock_.Acquire(); + if (signaled_) { + if (!manual_reset_) { + // In this case we were signaled when we had no waiters. Now that + // someone has waited upon us, we can automatically reset. + signaled_ = false; + } + + lock_.Release(); + return true; + } + + Lock lock; + lock.Acquire(); + ConditionVariable cv(&lock); + SyncWaiter sw(&cv, &lock); + + Enqueue(&sw); + lock_.Release(); + // We are violating locking order here by holding the SyncWaiter lock but not + // the WaitableEvent lock. However, this is safe because we don't lock @lock_ + // again before unlocking it. + + for (;;) { + if (sw.fired()) { + lock.Release(); + return true; + } + + if (max_time.ToInternalValue() < 0) { + cv.Wait(); + } else { + const Time current_time(Time::Now()); + if (current_time >= end_time) { + // We can't acquire @lock_ before releasing @lock (because of locking + // order), however, inbetween the two a signal could be fired and @sw + // would accept it, however we will still return false, so the signal + // would be lost on an auto-reset WaitableEvent. Thus we call Disable + // which makes sw::Fire return false. + sw.Disable(); + lock.Release(); + + lock_.Acquire(); + Dequeue(&sw, &sw); + lock_.Release(); + return false; + } + const TimeDelta max_wait(end_time - current_time); + + cv.TimedWait(max_wait); + } + } +} + +bool WaitableEvent::Wait() { + return TimedWait(TimeDelta::FromSeconds(-1)); +} + +// ----------------------------------------------------------------------------- + + +// ----------------------------------------------------------------------------- +// Synchronous waiting on multiple objects. + +static bool // StrictWeakOrdering +cmp_fst_addr(const std::pair<WaitableEvent*, unsigned> &a, + const std::pair<WaitableEvent*, unsigned> &b) { + return a.first < b.first; +} + +// static +size_t WaitableEvent::WaitMany(WaitableEvent** raw_waitables, + size_t count) { + DCHECK(count) << "Cannot wait on no events"; + + // We need to acquire the locks in a globally consistent order. Thus we sort + // the array of waitables by address. We actually sort a pairs so that we can + // map back to the original index values later. + std::vector<std::pair<WaitableEvent*, size_t> > waitables; + waitables.reserve(count); + for (size_t i = 0; i < count; ++i) + waitables.push_back(std::make_pair(raw_waitables[i], i)); + + DCHECK_EQ(count, waitables.size()); + + sort(waitables.begin(), waitables.end(), cmp_fst_addr); + + // The set of waitables must be distinct. Since we have just sorted by + // address, we can check this cheaply by comparing pairs of consecutive + // elements. + for (size_t i = 0; i < waitables.size() - 1; ++i) { + DCHECK(waitables[i].first != waitables[i+1].first); + } + + Lock lock; + ConditionVariable cv(&lock); + SyncWaiter sw(&cv, &lock); + + const size_t r = EnqueueMany(&waitables[0], count, &sw); + if (r) { + // One of the events is already signaled. The SyncWaiter has not been + // enqueued anywhere. EnqueueMany returns the count of remaining waitables + // when the signaled one was seen, so the index of the signaled event is + // @count - @r. + return waitables[count - r].second; + } + + // At this point, we hold the locks on all the WaitableEvents and we have + // enqueued our waiter in them all. + lock.Acquire(); + // Release the WaitableEvent locks in the reverse order + for (size_t i = 0; i < count; ++i) { + waitables[count - (1 + i)].first->lock_.Release(); + } + + for (;;) { + if (sw.fired()) + break; + + cv.Wait(); + } + lock.Release(); + + // The address of the WaitableEvent which fired is stored in the SyncWaiter. + WaitableEvent *const signaled_event = sw.signaled_event(); + // This will store the index of the raw_waitables which fired. + size_t signaled_index; + + // Take the locks of each WaitableEvent in turn (except the signaled one) and + // remove our SyncWaiter from the wait-list + for (size_t i = 0; i < count; ++i) { + if (raw_waitables[i] != signaled_event) { + raw_waitables[i]->lock_.Acquire(); + // There's no possible ABA issue with the address of the SyncWaiter here + // because it lives on the stack. Thus the tag value is just the pointer + // value again. + raw_waitables[i]->Dequeue(&sw, &sw); + raw_waitables[i]->lock_.Release(); + } else { + signaled_index = i; + } + } + + return signaled_index; +} + +// ----------------------------------------------------------------------------- +// If return value == 0: +// The locks of the WaitableEvents have been taken in order and the Waiter has +// been enqueued in the wait-list of each. None of the WaitableEvents are +// currently signaled +// else: +// None of the WaitableEvent locks are held. The Waiter has not been enqueued +// in any of them and the return value is the index of the first WaitableEvent +// which was signaled, from the end of the array. +// ----------------------------------------------------------------------------- +// static +unsigned WaitableEvent::EnqueueMany + (std::pair<WaitableEvent*, unsigned>* waitables, + unsigned count, Waiter* waiter) { + if (!count) + return 0; + + waitables[0].first->lock_.Acquire(); + if (waitables[0].first->signaled_) { + if (!waitables[0].first->manual_reset_) + waitables[0].first->signaled_ = false; + waitables[0].first->lock_.Release(); + return count; + } + + const unsigned r = EnqueueMany(waitables + 1, count - 1, waiter); + if (r) { + waitables[0].first->lock_.Release(); + } else { + waitables[0].first->Enqueue(waiter); + } + + return r; +} + +// ----------------------------------------------------------------------------- + + +// ----------------------------------------------------------------------------- +// Private functions... + +// ----------------------------------------------------------------------------- +// Wake all waiting waiters. Called with lock held. +// ----------------------------------------------------------------------------- +bool WaitableEvent::SignalAll() { + bool signaled_at_least_one = false; + + for (std::list<Waiter*>::iterator + i = waiters_.begin(); i != waiters_.end(); ++i) { + if ((*i)->Fire(this)) + signaled_at_least_one = true; + } + + waiters_.clear(); + return signaled_at_least_one; +} + +// --------------------------------------------------------------------------- +// Try to wake a single waiter. Return true if one was woken. Called with lock +// held. +// --------------------------------------------------------------------------- +bool WaitableEvent::SignalOne() { + for (;;) { + if (waiters_.empty()) + return false; + + const bool r = (*waiters_.begin())->Fire(this); + waiters_.pop_front(); + if (r) + return true; + } +} + +// ----------------------------------------------------------------------------- +// Add a waiter to the list of those waiting. Called with lock held. +// ----------------------------------------------------------------------------- +void WaitableEvent::Enqueue(Waiter* waiter) { + waiters_.push_back(waiter); +} + +// ----------------------------------------------------------------------------- +// Remove a waiter from the list of those waiting. Return true if the waiter was +// actually removed. Called with lock held. +// ----------------------------------------------------------------------------- +bool WaitableEvent::Dequeue(Waiter* waiter, void* tag) { + for (std::list<Waiter*>::iterator + i = waiters_.begin(); i != waiters_.end(); ++i) { + if (*i == waiter && (*i)->Compare(tag)) { + waiters_.erase(i); + return true; + } + } + + return false; +} + +// ----------------------------------------------------------------------------- + +} // namespace base diff --git a/base/waitable_event_unittest.cc b/base/waitable_event_unittest.cc index e7bab49..b2590a8 100644 --- a/base/waitable_event_unittest.cc +++ b/base/waitable_event_unittest.cc @@ -4,6 +4,7 @@ #include "base/time.h" #include "base/waitable_event.h" +#include "base/platform_thread.h" #include "testing/gtest/include/gtest/gtest.h" using base::TimeDelta; @@ -52,3 +53,57 @@ TEST(WaitableEventTest, AutoBasics) { EXPECT_TRUE(event.TimedWait(TimeDelta::FromMilliseconds(10))); } +TEST(WaitableEventTest, WaitManyShortcut) { + WaitableEvent* ev[5]; + for (unsigned i = 0; i < 5; ++i) + ev[i] = new WaitableEvent(false, false); + + ev[3]->Signal(); + EXPECT_EQ(WaitableEvent::WaitMany(ev, 5), 3u); + + ev[3]->Signal(); + EXPECT_EQ(WaitableEvent::WaitMany(ev, 5), 3u); + + ev[4]->Signal(); + EXPECT_EQ(WaitableEvent::WaitMany(ev, 5), 4u); + + ev[0]->Signal(); + EXPECT_EQ(WaitableEvent::WaitMany(ev, 5), 0u); + + for (unsigned i = 0; i < 5; ++i) + delete ev[i]; +} + +class WaitableEventSignaler : public PlatformThread::Delegate { + public: + WaitableEventSignaler(double seconds, WaitableEvent* ev) + : seconds_(seconds), + ev_(ev) { + } + + void ThreadMain() { + PlatformThread::Sleep(static_cast<int>(seconds_ * 1000)); + ev_->Signal(); + } + + private: + const double seconds_; + WaitableEvent *const ev_; +}; + +TEST(WaitableEventTest, WaitMany) { + WaitableEvent* ev[5]; + for (unsigned i = 0; i < 5; ++i) + ev[i] = new WaitableEvent(false, false); + + WaitableEventSignaler signaler(0.1, ev[2]); + PlatformThreadHandle thread; + PlatformThread::Create(0, &signaler, &thread); + + EXPECT_EQ(WaitableEvent::WaitMany(ev, 5), 2u); + + PlatformThread::Join(thread); + + for (unsigned i = 0; i < 5; ++i) + delete ev[i]; +} diff --git a/base/waitable_event_watcher.h b/base/waitable_event_watcher.h new file mode 100644 index 0000000..3e17a10 --- /dev/null +++ b/base/waitable_event_watcher.h @@ -0,0 +1,148 @@ +// Copyright (c) 2006-2008 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#ifndef BASE_WAITABLE_EVENT_WATCHER_H_ +#define BASE_WAITABLE_EVENT_WATCHER_H_ + +#include "build/build_config.h" + +#if defined(OS_WIN) +#include "base/object_watcher.h" +#else +#include "base/message_loop.h" +#endif + +namespace base { + +class WaitableEvent; +class Flag; +class AsyncWaiter; +class AsyncCallbackTask; + +// ----------------------------------------------------------------------------- +// This class provides a way to wait on a WaitableEvent asynchronously. +// +// Each instance of this object can be waiting on a single WaitableEvent. When +// the waitable event is signaled, a callback is made in the thread of a given +// MessageLoop. This callback can be deleted by deleting the waiter. +// +// Typical usage: +// +// class MyClass : public base::WaitableEventWatcher::Delegate { +// public: +// void DoStuffWhenSignaled(WaitableEvent *waitable_event) { +// watcher_.StartWatching(waitable_event, this); +// } +// virtual void OnWaitableEventSignaled(WaitableEvent* waitable_event) { +// // OK, time to do stuff! +// } +// private: +// base::WaitableEventWatcher watcher_; +// }; +// +// In the above example, MyClass wants to "do stuff" when waitable_event +// becomes signaled. WaitableEventWatcher makes this task easy. When MyClass +// goes out of scope, the watcher_ will be destroyed, and there is no need to +// worry about OnWaitableEventSignaled being called on a deleted MyClass +// pointer. +// +// BEWARE: With automatically reset WaitableEvents, a signal may be lost if it +// occurs just before a WaitableEventWatcher is deleted. There is currently no +// safe way to stop watching an automatic reset WaitableEvent without possibly +// missing a signal. +// ----------------------------------------------------------------------------- + +class WaitableEventWatcher +#if defined(OS_POSIX) + : public MessageLoop::DestructionObserver +#endif +{ + public: + + WaitableEventWatcher(); + ~WaitableEventWatcher(); + + class Delegate { + public: + virtual ~Delegate() { } + + // ------------------------------------------------------------------------- + // This is called on the MessageLoop thread when WaitableEvent has been + // signaled. + // + // Note: the event may not be signaled by the time that this function is + // called. This indicates only that it has been signaled at some point in + // the past. + // ------------------------------------------------------------------------- + virtual void OnWaitableEventSignaled(WaitableEvent* waitable_event) = 0; + }; + + // --------------------------------------------------------------------------- + // When @event is signaled, the given delegate is called on the thread of the + // current message loop when StartWatching is called. The delegate is not + // deleted. + // --------------------------------------------------------------------------- + bool StartWatching(WaitableEvent* event, Delegate* delegate); + + // --------------------------------------------------------------------------- + // Cancel the current watch. Must be called from the same thread which + // started the watch. + // + // Does nothing if no event is being watched, nor if the watch has completed. + // The delegate will *not* be called for the current watch after this + // function returns. Since the delegate runs on the same thread as this + // function, it cannot be called during this function either. + // --------------------------------------------------------------------------- + void StopWatching(); + + // --------------------------------------------------------------------------- + // Return the currently watched event, or NULL if no object is currently being + // watched. + // --------------------------------------------------------------------------- + WaitableEvent* GetWatchedEvent(); + + private: + WaitableEvent* event_; + +#if defined(OS_WIN) + // --------------------------------------------------------------------------- + // The helper class exists because, if WaitableEventWatcher were to inherit + // from ObjectWatcher::Delegate, then it couldn't also have an inner class + // called Delegate (at least on Windows). Thus this object exists to proxy + // the callback function + // --------------------------------------------------------------------------- + class ObjectWatcherHelper : public ObjectWatcher::Delegate { + public: + ObjectWatcherHelper(WaitableEventWatcher* watcher); + + // ------------------------------------------------------------------------- + // Implementation of ObjectWatcher::Delegate + // ------------------------------------------------------------------------- + void OnObjectSignaled(HANDLE h); + + private: + WaitableEventWatcher *const watcher_; + }; + + void OnObjectSignaled(); + + Delegate* delegate_; + ObjectWatcherHelper helper_; + ObjectWatcher watcher_; +#else + // --------------------------------------------------------------------------- + // Implementation of MessageLoop::DestructionObserver + // --------------------------------------------------------------------------- + void WillDestroyCurrentMessageLoop(); + + MessageLoop* message_loop_; + scoped_refptr<Flag> cancel_flag_; + AsyncWaiter* waiter_; + AsyncCallbackTask* callback_task_; +#endif +}; + +} // namespace base + +#endif // BASE_WAITABLE_EVENT_WATCHER_H_ diff --git a/base/waitable_event_watcher_posix.cc b/base/waitable_event_watcher_posix.cc new file mode 100644 index 0000000..e4e1a7f --- /dev/null +++ b/base/waitable_event_watcher_posix.cc @@ -0,0 +1,253 @@ +// Copyright (c) 2006-2008 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "base/waitable_event_watcher.h" + +#include "base/condition_variable.h" +#include "base/lock.h" +#include "base/message_loop.h" +#include "base/waitable_event.h" + +namespace base { + +// ----------------------------------------------------------------------------- +// WaitableEventWatcher (async waits). +// +// The basic design is that we add an AsyncWaiter to the wait-list of the event. +// That AsyncWaiter has a pointer to MessageLoop, and a Task to be posted to it. +// The MessageLoop ends up running the task, which calls the delegate. +// +// Since the wait can be canceled, we have a thread-safe Flag object which is +// set when the wait has been canceled. At each stage in the above, we check the +// flag before going onto the next stage. Since the wait may only be canceled in +// the MessageLoop which runs the Task, we are assured that the delegate cannot +// be called after canceling... + +// ----------------------------------------------------------------------------- +// A thread-safe, reference-counted, write-once flag. +// ----------------------------------------------------------------------------- +class Flag : public RefCountedThreadSafe<Flag> { + public: + Flag() { flag_ = false; } + + void Set() { + AutoLock locked(lock_); + flag_ = true; + } + + bool value() const { + AutoLock locked(lock_); + return flag_; + } + + private: + mutable Lock lock_; + bool flag_; +}; + +// ----------------------------------------------------------------------------- +// This is an asynchronous waiter which posts a task to a MessageLoop when +// fired. An AsyncWaiter may only be in a single wait-list. +// ----------------------------------------------------------------------------- +class AsyncWaiter : public WaitableEvent::Waiter { + public: + AsyncWaiter(MessageLoop* message_loop, Task* task, Flag* flag) + : message_loop_(message_loop), + cb_task_(task), + flag_(flag) { } + + bool Fire(WaitableEvent* event) { + if (flag_->value()) { + // If the callback has been canceled, we don't enqueue the task, we just + // delete it instead. + delete cb_task_; + } else { + message_loop_->PostTask(FROM_HERE, cb_task_); + } + + // We are removed from the wait-list by the WaitableEvent itself. It only + // remains to delete ourselves. + delete this; + + // We can always return true because an AsyncWaiter is never in two + // different wait-lists at the same time. + return true; + } + + // See StopWatching for discussion + bool Compare(void* tag) { + return tag == flag_.get(); + } + + MessageLoop *const message_loop_; + Task *const cb_task_; + scoped_refptr<Flag> flag_; +}; + +// ----------------------------------------------------------------------------- +// For async waits we need to make a callback in a MessageLoop thread. We do +// this by posting this task, which calls the delegate and keeps track of when +// the event is canceled. +// ----------------------------------------------------------------------------- +class AsyncCallbackTask : public Task { + public: + AsyncCallbackTask(Flag* flag, WaitableEventWatcher::Delegate* delegate, + WaitableEvent* event) + : flag_(flag), + delegate_(delegate), + event_(event) { + } + + void Run() { + // Runs in MessageLoop thread. + if (!flag_->value()) + delegate_->OnWaitableEventSignaled(event_); + + // This is to let the WaitableEventWatcher know that the event has occured + // because it needs to be able to return NULL from GetWatchedEvent + flag_->Set(); + + // We are deleted by the MessageLoop + } + + private: + scoped_refptr<Flag> flag_; + WaitableEventWatcher::Delegate *const delegate_; + WaitableEvent *const event_; +}; + +WaitableEventWatcher::WaitableEventWatcher() + : event_(NULL), + message_loop_(NULL), + cancel_flag_(NULL), + callback_task_(NULL) { +} + +WaitableEventWatcher::~WaitableEventWatcher() { + StopWatching(); +} + +// ----------------------------------------------------------------------------- +// The Handle is how the user cancels a wait. After deleting the Handle we +// insure that the delegate cannot be called. +// ----------------------------------------------------------------------------- +bool WaitableEventWatcher::StartWatching + (WaitableEvent* event, WaitableEventWatcher::Delegate* delegate) { + MessageLoop *const current_ml = MessageLoop::current(); + DCHECK(current_ml) << "Cannot create WaitableEventWatcher without a " + "current MessageLoop"; + + DCHECK(!cancel_flag_.get()) << "StartWatching called while still watching"; + + cancel_flag_ = new Flag; + callback_task_ = new AsyncCallbackTask(cancel_flag_, delegate, event); + + AutoLock locked(event->lock_); + + if (event->signaled_) { + if (!event->manual_reset_) + event->signaled_ = false; + + // No hairpinning - we can't call the delegate directly here. We have to + // enqueue a task on the MessageLoop as normal. + current_ml->PostTask(FROM_HERE, callback_task_); + return true; + } + + message_loop_ = current_ml; + current_ml->AddDestructionObserver(this); + + event_ = event; + waiter_ = new AsyncWaiter(current_ml, callback_task_, cancel_flag_); + event->Enqueue(waiter_); + + return true; +} + +void WaitableEventWatcher::StopWatching() { + if (message_loop_) { + message_loop_->RemoveDestructionObserver(this); + message_loop_ = NULL; + } + + if (!cancel_flag_.get()) // if not currently watching... + return; + + if (!event_) { + // We have no WaitableEvent. This means that we never enqueued a Waiter on + // an event because the event was already signaled when StartWatching was + // called. + // + // In this case, a task was enqueued on the MessageLoop and will run. + // We set the flag in case the task hasn't yet run. The flag will stop the + // delegate getting called. If the task has run then we have the last + // reference to the flag and it will be deleted immedately after. + cancel_flag_->Set(); + cancel_flag_ = NULL; + return; + } + + AutoLock locked(event_->lock_); + // We have a lock on the WaitableEvent. No one else can signal the event while + // we have it. + + // We have a possible ABA issue here. If Dequeue was to compare only the + // pointer values then it's possible that the AsyncWaiter could have been + // fired, freed and the memory reused for a different Waiter which was + // enqueued in the same wait-list. We would think that that waiter was our + // AsyncWaiter and remove it. + // + // To stop this, Dequeue also takes a tag argument which is passed to the + // virtual Compare function before the two are considered a match. So we need + // a tag which is good for the lifetime of this handle: the Flag. Since we + // have a reference to the Flag, its memory cannot be reused while this object + // still exists. So if we find a waiter with the correct pointer value, and + // which shares a Flag pointer, we have a real match. + if (event_->Dequeue(waiter_, cancel_flag_.get())) { + // Case 2: the waiter hasn't been signaled yet; it was still on the wait + // list. We've removed it, thus we can delete it and the task (which cannot + // have been enqueued with the MessageLoop because the waiter was never + // signaled) + delete waiter_; + delete callback_task_; + cancel_flag_ = NULL; + return; + } + + // Case 3: the waiter isn't on the wait-list, thus it was signaled. It may + // not have run yet, so we set the flag to tell it not to bother enqueuing the + // task on the MessageLoop, but to delete it instead. The Waiter deletes + // itself once run. + cancel_flag_->Set(); + cancel_flag_ = NULL; + + // If the waiter has already run then the task has been enqueued. If the Task + // hasn't yet run, the flag will stop the delegate from getting called. (This + // is thread safe because one may only delete a Handle from the MessageLoop + // thread.) + // + // If the delegate has already been called then we have nothing to do. The + // task has been deleted by the MessageLoop. +} + +WaitableEvent* WaitableEventWatcher::GetWatchedEvent() { + if (!cancel_flag_.get()) + return NULL; + + if (cancel_flag_->value()) + return NULL; + + return event_; +} + +// ----------------------------------------------------------------------------- +// This is called when the MessageLoop which the callback will be run it is +// deleted. We need to cancel the callback as if we had been deleted, but we +// will still be deleted at some point in the future. +// ----------------------------------------------------------------------------- +void WaitableEventWatcher::WillDestroyCurrentMessageLoop() { + StopWatching(); +} + +} // namespace base diff --git a/base/waitable_event_watcher_unittest.cc b/base/waitable_event_watcher_unittest.cc new file mode 100644 index 0000000..c50807f --- /dev/null +++ b/base/waitable_event_watcher_unittest.cc @@ -0,0 +1,136 @@ +// Copyright (c) 2006-2008 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "base/message_loop.h" +#include "base/platform_thread.h" +#include "base/waitable_event.h" +#include "base/waitable_event_watcher.h" +#include "testing/gtest/include/gtest/gtest.h" + +using base::WaitableEvent; +using base::WaitableEventWatcher; + +namespace { + +class QuitDelegate : public WaitableEventWatcher::Delegate { + public: + virtual void OnWaitableEventSignaled(WaitableEvent* event) { + MessageLoop::current()->Quit(); + } +}; + +class DecrementCountDelegate : public WaitableEventWatcher::Delegate { + public: + DecrementCountDelegate(int* counter) : counter_(counter) { + } + virtual void OnWaitableEventSignaled(WaitableEvent* object) { + --(*counter_); + } + private: + int* counter_; +}; + +} // namespace + +void RunTest_BasicSignal(MessageLoop::Type message_loop_type) { + MessageLoop message_loop(message_loop_type); + + // A manual-reset event that is not yet signaled. + WaitableEvent event(true, false); + + WaitableEventWatcher watcher; + EXPECT_EQ(NULL, watcher.GetWatchedObject()); + + QuitDelegate delegate; + watcher.StartWatching(&event, &delegate); + EXPECT_EQ(&event, watcher.GetWatchedObject()); + + event.Signal(); + + MessageLoop::current()->Run(); + + EXPECT_EQ(NULL, watcher.GetWatchedObject()); +} + +void RunTest_BasicCancel(MessageLoop::Type message_loop_type) { + MessageLoop message_loop(message_loop_type); + + // A manual-reset event that is not yet signaled. + WaitableEvent event(true, false); + + WaitableEventWatcher watcher; + + QuitDelegate delegate; + watcher.StartWatching(&event, &delegate); + + watcher.StopWatching(); +} + +void RunTest_CancelAfterSet(MessageLoop::Type message_loop_type) { + MessageLoop message_loop(message_loop_type); + + // A manual-reset event that is not yet signaled. + WaitableEvent event(true, false); + + WaitableEventWatcher watcher; + + int counter = 1; + DecrementCountDelegate delegate(&counter); + + watcher.StartWatching(&event, &delegate); + + event.Signal(); + + // Let the background thread do its business + PlatformThread::Sleep(30); + + watcher.StopWatching(); + + MessageLoop::current()->RunAllPending(); + + // Our delegate should not have fired. + EXPECT_EQ(1, counter); +} + +void RunTest_OutlivesMessageLoop(MessageLoop::Type message_loop_type) { + // Simulate a MessageLoop that dies before an WaitableEventWatcher. This + // ordinarily doesn't happen when people use the Thread class, but it can + // happen when people use the Singleton pattern or atexit. + WaitableEvent event(true, false); + { + WaitableEventWatcher watcher; + { + MessageLoop message_loop(message_loop_type); + + QuitDelegate delegate; + watcher.StartWatching(&event, &delegate); + } + } +} + +//----------------------------------------------------------------------------- + +TEST(ObjectWatcherTest, BasicSignal) { + RunTest_BasicSignal(MessageLoop::TYPE_DEFAULT); + RunTest_BasicSignal(MessageLoop::TYPE_IO); + RunTest_BasicSignal(MessageLoop::TYPE_UI); +} + +TEST(ObjectWatcherTest, BasicCancel) { + RunTest_BasicCancel(MessageLoop::TYPE_DEFAULT); + RunTest_BasicCancel(MessageLoop::TYPE_IO); + RunTest_BasicCancel(MessageLoop::TYPE_UI); +} + +TEST(ObjectWatcherTest, CancelAfterSet) { + RunTest_CancelAfterSet(MessageLoop::TYPE_DEFAULT); + RunTest_CancelAfterSet(MessageLoop::TYPE_IO); + RunTest_CancelAfterSet(MessageLoop::TYPE_UI); +} + +TEST(ObjectWatcherTest, OutlivesMessageLoop) { + RunTest_OutlivesMessageLoop(MessageLoop::TYPE_DEFAULT); + RunTest_OutlivesMessageLoop(MessageLoop::TYPE_IO); + RunTest_OutlivesMessageLoop(MessageLoop::TYPE_UI); +} diff --git a/base/waitable_event_watcher_win.cc b/base/waitable_event_watcher_win.cc new file mode 100644 index 0000000..7619aa4 --- /dev/null +++ b/base/waitable_event_watcher_win.cc @@ -0,0 +1,60 @@ +// Copyright (c) 2006-2008 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "base/waitable_event_watcher.h" + +#include "base/compiler_specific.h" +#include "base/object_watcher.h" +#include "base/waitable_event.h" + +namespace base { + +WaitableEventWatcher::ObjectWatcherHelper::ObjectWatcherHelper( + WaitableEventWatcher* watcher) + : watcher_(watcher) { +}; + +void WaitableEventWatcher::ObjectWatcherHelper::OnObjectSignaled(HANDLE h) { + watcher_->OnObjectSignaled(); +} + + +WaitableEventWatcher::WaitableEventWatcher() + : event_(NULL), + ALLOW_THIS_IN_INITIALIZER_LIST(helper_(this)), + delegate_(NULL) { +} + +WaitableEventWatcher::~WaitableEventWatcher() { +} + +bool WaitableEventWatcher::StartWatching(WaitableEvent* event, + Delegate* delegate) { + delegate_ = delegate; + event_ = event; + + return watcher_.StartWatching(event->handle(), &helper_); +} + +void WaitableEventWatcher::StopWatching() { + delegate_ = NULL; + event_ = NULL; + watcher_.StopWatching(); +} + +WaitableEvent* WaitableEventWatcher::GetWatchedEvent() { + return event_; +} + +void WaitableEventWatcher::OnObjectSignaled() { + WaitableEvent* event = event_; + Delegate* delegate = delegate_; + event_ = NULL; + delegate_ = NULL; + DCHECK(event); + + delegate->OnWaitableEventSignaled(event); +} + +} // namespace base diff --git a/base/waitable_event_win.cc b/base/waitable_event_win.cc index 257f145..001a5df 100644 --- a/base/waitable_event_win.cc +++ b/base/waitable_event_win.cc @@ -13,22 +13,27 @@ namespace base { WaitableEvent::WaitableEvent(bool manual_reset, bool signaled) - : event_(CreateEvent(NULL, manual_reset, signaled, NULL)) { + : handle_(CreateEvent(NULL, manual_reset, signaled, NULL)) { // We're probably going to crash anyways if this is ever NULL, so we might as // well make our stack reports more informative by crashing here. - CHECK(event_); + CHECK(handle_); +} + +WaitableEvent::WaitableEvent(HANDLE handle) + : handle_(handle) { + CHECK(handle) << "Tried to create WaitableEvent from NULL handle"; } WaitableEvent::~WaitableEvent() { - CloseHandle(event_); + CloseHandle(handle_); } void WaitableEvent::Reset() { - ResetEvent(event_); + ResetEvent(handle_); } void WaitableEvent::Signal() { - SetEvent(event_); + SetEvent(handle_); } bool WaitableEvent::IsSignaled() { @@ -36,7 +41,7 @@ bool WaitableEvent::IsSignaled() { } bool WaitableEvent::Wait() { - DWORD result = WaitForSingleObject(event_, INFINITE); + DWORD result = WaitForSingleObject(handle_, INFINITE); // It is most unexpected that this should ever fail. Help consumers learn // about it if it should ever fail. DCHECK(result == WAIT_OBJECT_0) << "WaitForSingleObject failed"; @@ -49,7 +54,7 @@ bool WaitableEvent::TimedWait(const TimeDelta& max_time) { // is in milliseconds. If there are 5.5ms left, should the delay be 5 or 6? // It should be 6 to avoid returning too early. double timeout = ceil(max_time.InMillisecondsF()); - DWORD result = WaitForSingleObject(event_, static_cast<DWORD>(timeout)); + DWORD result = WaitForSingleObject(handle_, static_cast<DWORD>(timeout)); switch (result) { case WAIT_OBJECT_0: return true; @@ -62,4 +67,25 @@ bool WaitableEvent::TimedWait(const TimeDelta& max_time) { return false; } +// static +size_t WaitableEvent::WaitMany(WaitableEvent** events, size_t count) { + HANDLE handles[MAXIMUM_WAIT_OBJECTS]; + CHECK(count <= MAXIMUM_WAIT_OBJECTS) + << "Can only wait on " << MAXIMUM_WAIT_OBJECTS << " with WaitMany"; + + for (size_t i = 0; i < count; ++i) + handles[i] = events[i]->handle(); + + DWORD result = + WaitForMultipleObjects(count, handles, + FALSE, // don't wait for all the objects + INFINITE); // no timeout + if (result < WAIT_OBJECT_0 || result >= WAIT_OBJECT_0 + count) { + NOTREACHED() << "WaitForMultipleObjects failed: " << GetLastError(); + return 0; + } + + return result - WAIT_OBJECT_0; +} + } // namespace base |