diff options
author | mbelshe@google.com <mbelshe@google.com@0039d316-1c4b-4281-b951-d872f2087c98> | 2008-10-22 23:09:21 +0000 |
---|---|---|
committer | mbelshe@google.com <mbelshe@google.com@0039d316-1c4b-4281-b951-d872f2087c98> | 2008-10-22 23:09:21 +0000 |
commit | 503631cfa06aaae686f2e9a6f55ebc1859e8dab3 (patch) | |
tree | 8bc3a1d62749c65585db050b0926add085485436 /base | |
parent | 4d40fd494fc3142ded2eddc2a1becaa7a1584d06 (diff) | |
download | chromium_src-503631cfa06aaae686f2e9a6f55ebc1859e8dab3.zip chromium_src-503631cfa06aaae686f2e9a6f55ebc1859e8dab3.tar.gz chromium_src-503631cfa06aaae686f2e9a6f55ebc1859e8dab3.tar.bz2 |
Create a thread-safe observer list. Will be used
by SystemMonitor.
Right now the class requires that Observers be RefCounted<>. This is because we invoke tasks via NewRunnableMethod for them. However, because we manually track lifecycle via AddObserver/RemoveObserver, we could override the RunnableMethodTraits to not require RefCounted<>. This would have the advantage that callers do not need to make all Observers be RefCounted, but makes it more critical that observers not forget to call RemoveObserver().
Review URL: http://codereview.chromium.org/7353
git-svn-id: svn://svn.chromium.org/chrome/trunk/src@3787 0039d316-1c4b-4281-b951-d872f2087c98
Diffstat (limited to 'base')
-rw-r--r-- | base/build/base.vcproj | 4 | ||||
-rw-r--r-- | base/build/base_unittests.vcproj | 4 | ||||
-rw-r--r-- | base/observer_list_threadsafe.h | 190 | ||||
-rw-r--r-- | base/observer_list_unittest.cc | 207 | ||||
-rw-r--r-- | base/task.h | 14 |
5 files changed, 413 insertions, 6 deletions
diff --git a/base/build/base.vcproj b/base/build/base.vcproj index 0350c33..a1731da 100644 --- a/base/build/base.vcproj +++ b/base/build/base.vcproj @@ -502,6 +502,10 @@ > </File> <File + RelativePath="..\observer_list_threadsafe.h" + > + </File> + <File RelativePath="..\path_service.cc" > </File> diff --git a/base/build/base_unittests.vcproj b/base/build/base_unittests.vcproj index ea8cce5..94dd0b5 100644 --- a/base/build/base_unittests.vcproj +++ b/base/build/base_unittests.vcproj @@ -244,6 +244,10 @@ > </File> <File + RelativePath="..\observer_list_unittest.cc" + > + </File> + <File RelativePath="..\path_service_unittest.cc" > </File> diff --git a/base/observer_list_threadsafe.h b/base/observer_list_threadsafe.h new file mode 100644 index 0000000..fcfa0ed --- /dev/null +++ b/base/observer_list_threadsafe.h @@ -0,0 +1,190 @@ +// Copyright (c) 2006-2008 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#ifndef BASE_OBSERVER_LIST_THREADSAFE_H_ +#define BASE_OBSERVER_LIST_THREADSAFE_H_ + +#include <vector> +#include <algorithm> + +#include "base/basictypes.h" +#include "base/logging.h" +#include "base/message_loop.h" +#include "base/observer_list.h" +#include "base/ref_counted.h" +#include "base/task.h" + +/////////////////////////////////////////////////////////////////////////////// +// +// OVERVIEW: +// +// A thread-safe container for a list of observers. +// This is similar to the observer_list (see observer_list.h), but it +// is more robust for multi-threaded situations. +// +// The following use cases are supported: +// * Observers can register for notifications from any thread. +// Callbacks to the observer will occur on the same thread where +// the observer initially called AddObserver() from. +// * Any thread may trigger a notification via NOTIFY_OBSERVERS. +// * Observers can remove themselves from the observer list inside +// of a callback. +// * If one thread is notifying observers concurrently with an observer +// removing itself from the observer list, the notifications will +// be silently dropped. +// +// The drawback of the threadsafe observer list is that notifications +// are not as real-time as the non-threadsafe version of this class. +// Notifications will always be done via PostTask() to another thread, +// whereas with the non-thread-safe observer_list, notifications happen +// synchronously and immediately. +// +// IMPLEMENTATION NOTES +// The ObserverListThreadSafe maintains an ObserverList for each thread +// which uses the ThreadSafeObserver. When Notifying the observers, +// we simply call PostTask to each registered thread, and then each thread +// will notify its regular ObserverList. +// +/////////////////////////////////////////////////////////////////////////////// +template <class ObserverType> +class ObserverListThreadSafe : + public base::RefCountedThreadSafe<ObserverListThreadSafe<ObserverType> > { + public: + ObserverListThreadSafe() {} + + ~ObserverListThreadSafe() { + typename ObserversListMap::const_iterator it; + for (it = observer_lists_.begin(); it != observer_lists_.end(); ++it) + delete (*it).second; + observer_lists_.clear(); + } + + // Add an observer to the list. + void AddObserver(ObserverType* obs) { + ObserverList<ObserverType>* list = NULL; + MessageLoop* loop = MessageLoop::current(); + { + AutoLock lock(list_lock_); + if (observer_lists_.find(loop) == observer_lists_.end()) + observer_lists_[loop] = new ObserverList<ObserverType>(); + list = observer_lists_[loop]; + } + list->AddObserver(obs); + } + + // Remove an observer from the list. + // If there are pending notifications in-transit to the observer, they will + // be aborted. + // RemoveObserver MUST be called from the same thread which called + // AddObserver. + void RemoveObserver(ObserverType* obs) { + ObserverList<ObserverType>* list = NULL; + MessageLoop* loop = MessageLoop::current(); + { + AutoLock lock(list_lock_); + DCHECK(observer_lists_.find(loop) != observer_lists_.end()) << + "RemoveObserver called on for unknown thread"; + list = observer_lists_[loop]; + + // If we're about to remove the last observer from the list, + // then we can remove this observer_list entirely. + if (list->size() == 1) + observer_lists_.erase(loop); + } + list->RemoveObserver(obs); + + // If RemoveObserver is called from a notification, the size will be + // nonzero. Instead of deleting here, the NotifyWrapper will delete + // when it finishes iterating. + if (list->size() == 0) + delete list; + } + + // Notify methods. + // Make a thread-safe callback to each Observer in the list. + // Note, these calls are effectively asynchronous. You cannot assume + // that at the completion of the Notify call that all Observers have + // been Notified. The notification may still be pending delivery. + template <class Method> + void Notify(Method m) { + UnboundMethod<ObserverType, Method, Tuple0> method(m, MakeTuple()); + Notify<Method, Tuple0>(method); + } + + template <class Method, class A> + void Notify(Method m, const A &a) { + UnboundMethod<ObserverType, Method, Tuple1<A> > method(m, MakeTuple(a)); + Notify<Method, Tuple1<A> >(method); + } + + // TODO(mbelshe): Add more wrappers for Notify() with more arguments. + + private: + template <class Method, class Params> + void Notify(const UnboundMethod<ObserverType, Method, Params>& method) { + AutoLock lock(list_lock_); + typename ObserversListMap::iterator it; + for (it = observer_lists_.begin(); it != observer_lists_.end(); ++it) { + MessageLoop* loop = (*it).first; + ObserverList<ObserverType>* list = (*it).second; + loop->PostTask(FROM_HERE, + NewRunnableMethod(this, + &ObserverListThreadSafe<ObserverType>:: + template NotifyWrapper<Method, Params>, list, method)); + } + } + + // Wrapper which is called to fire the notifications for each thread's + // ObserverList. This function MUST be called on the thread which owns + // the unsafe ObserverList. + template <class Method, class Params> + void NotifyWrapper(ObserverList<ObserverType>* list, + const UnboundMethod<ObserverType, Method, Params>& method) { + + // Check that this list still needs notifications. + { + AutoLock lock(list_lock_); + typename ObserversListMap::iterator it = + observer_lists_.find(MessageLoop::current()); + + // The ObserverList could have been removed already. In fact, it could + // have been removed and then re-added! If the master list's loop + // does not match this one, then we do not need to finish this + // notification. + if (it == observer_lists_.end() || it->second != list) + return; + } + + { + typename ObserverList<ObserverType>::Iterator it(*list); + ObserverType* obs; + while ((obs = it.GetNext()) != NULL) + method.Run(obs); + } + + // If there are no more observers on the list, we can now delete it. + if (list->size() == 0) { +#ifndef NDEBUG + { + AutoLock lock(list_lock_); + // Verify this list is no longer registered. + typename ObserversListMap::iterator it = + observer_lists_.find(MessageLoop::current()); + DCHECK(it == observer_lists_.end() || it->second != list); + } +#endif + delete list; + } + } + + typedef std::map<MessageLoop*, ObserverList<ObserverType>*> ObserversListMap; + + // These are marked mutable to facilitate having NotifyAll be const. + Lock list_lock_; // Protects the observer_lists_. + ObserversListMap observer_lists_; + + DISALLOW_EVIL_CONSTRUCTORS(ObserverListThreadSafe); +}; + +#endif // BASE_OBSERVER_LIST_THREADSAFE_H_ diff --git a/base/observer_list_unittest.cc b/base/observer_list_unittest.cc index 1fea315..529e8a9 100644 --- a/base/observer_list_unittest.cc +++ b/base/observer_list_unittest.cc @@ -2,7 +2,11 @@ // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. +#include "base/message_loop.h" #include "base/observer_list.h" +#include "base/observer_list_threadsafe.h" +#include "base/platform_thread.h" +#include "base/ref_counted.h" #include "testing/gtest/include/gtest/gtest.h" namespace { @@ -18,7 +22,7 @@ class Foo { class Adder : public Foo { public: - Adder(int scaler) : total(0), scaler_(scaler) {} + explicit Adder(int scaler) : total(0), scaler_(scaler) {} virtual void Observe(int x) { total += x * scaler_; } @@ -30,23 +34,126 @@ class Adder : public Foo { class Disrupter : public Foo { public: - Disrupter(ObserverList<Foo>& list, Foo* doomed) : list_(list), doomed_(doomed) { - } + Disrupter(ObserverList<Foo>* list, Foo* doomed) + : list_(list), doomed_(doomed) { } virtual ~Disrupter() { } virtual void Observe(int x) { - list_.RemoveObserver(doomed_); + list_->RemoveObserver(doomed_); } private: - ObserverList<Foo>& list_; + ObserverList<Foo>* list_; Foo* doomed_; }; +class ThreadSafeDisrupter : public Foo { + public: + ThreadSafeDisrupter(ObserverListThreadSafe<Foo>* list, Foo* doomed) + : list_(list), doomed_(doomed) { } + virtual ~ThreadSafeDisrupter() { } + virtual void Observe(int x) { + list_->RemoveObserver(doomed_); + } + private: + ObserverListThreadSafe<Foo>* list_; + Foo* doomed_; +}; + +class ObserverListThreadSafeTest : public testing::Test { +}; + +static const int kThreadRunTime = 10000; // ms to run the multi-threaded test. + +// A thread for use in the ThreadSafeObserver test +// which will add and remove itself from the notification +// list repeatedly. +class AddRemoveThread : public PlatformThread::Delegate, + public Foo { + public: + AddRemoveThread(ObserverListThreadSafe<Foo>* list, bool notify) + : list_(list), + in_list_(false), + start_(Time::Now()), + count_observes_(0), + count_addtask_(0), + do_notifies_(notify) { + factory_ = new ScopedRunnableMethodFactory<AddRemoveThread>(this); + } + + ~AddRemoveThread() { + delete factory_; + } + + void ThreadMain() { + loop_ = new MessageLoop(); // Fire up a message loop. + loop_->PostTask(FROM_HERE, + factory_->NewRunnableMethod(&AddRemoveThread::AddTask)); + loop_->Run(); + //LOG(ERROR) << "Loop 0x" << std::hex << loop_ << " done. " << count_observes_ << ", " << count_addtask_; + delete loop_; + loop_ = reinterpret_cast<MessageLoop*>(0xdeadbeef); + } + + // This task just keeps posting to itself in an attempt + // to race with the notifier. + void AddTask() { + count_addtask_++; + + if ((Time::Now() - start_).InMilliseconds() > kThreadRunTime) { + LOG(INFO) << "DONE!"; + return; + } + + if (!in_list_) { + list_->AddObserver(this); + in_list_ = true; + } + + if (do_notifies_) { + list_->Notify(&Foo::Observe, 10); + } + + loop_->PostDelayedTask(FROM_HERE, + factory_->NewRunnableMethod(&AddRemoveThread::AddTask), 0); + } + + void Quit() { + loop_->PostTask(FROM_HERE, new MessageLoop::QuitTask()); + } + + virtual void Observe(int x) { + count_observes_++; + + // If we're getting called after we removed ourselves from + // the list, that is very bad! + DCHECK(in_list_); + + // This callback should fire on the appropriate thread + EXPECT_EQ(loop_, MessageLoop::current()); + + list_->RemoveObserver(this); + in_list_ = false; + } + + private: + ObserverListThreadSafe<Foo>* list_; + MessageLoop* loop_; + bool in_list_; // Are we currently registered for notifications. + // in_list_ is only used on |this| thread. + Time start_; // The time we started the test. + + int count_observes_; // Number of times we observed. + int count_addtask_; // Number of times thread AddTask was called + bool do_notifies_; // Whether these threads should do notifications. + + ScopedRunnableMethodFactory<AddRemoveThread>* factory_; +}; + } // namespace TEST(ObserverListTest, BasicTest) { ObserverList<Foo> observer_list; Adder a(1), b(-1), c(1), d(-1); - Disrupter evil(observer_list, &c); + Disrupter evil(&observer_list, &c); observer_list.AddObserver(&a); observer_list.AddObserver(&b); @@ -65,3 +172,91 @@ TEST(ObserverListTest, BasicTest) { EXPECT_EQ(d.total, -10); } +TEST(ObserverListThreadSafeTest, BasicTest) { + MessageLoop loop; + + scoped_refptr<ObserverListThreadSafe<Foo> > observer_list( + new ObserverListThreadSafe<Foo>); + Adder a(1); + Adder b(-1); + Adder c(1); + Adder d(-1); + ThreadSafeDisrupter evil(observer_list.get(), &c); + + observer_list->AddObserver(&a); + observer_list->AddObserver(&b); + + observer_list->Notify(&Foo::Observe, 10); + loop.RunAllPending(); + + observer_list->AddObserver(&evil); + observer_list->AddObserver(&c); + observer_list->AddObserver(&d); + + observer_list->Notify(&Foo::Observe, 10); + loop.RunAllPending(); + + EXPECT_EQ(a.total, 20); + EXPECT_EQ(b.total, -20); + EXPECT_EQ(c.total, 0); + EXPECT_EQ(d.total, -10); +} + + +// A test driver for a multi-threaded notification loop. Runs a number +// of observer threads, each of which constantly adds/removes itself +// from the observer list. Optionally, if cross_thread_notifies is set +// to true, the observer threads will also trigger notifications to +// all observers. +static void ThreadSafeObserverHarness(int num_threads, + bool cross_thread_notifies) { + MessageLoop loop; + + const int kMaxThreads = 15; + num_threads = num_threads > kMaxThreads ? kMaxThreads : num_threads; + + scoped_refptr<ObserverListThreadSafe<Foo> > observer_list( + new ObserverListThreadSafe<Foo>); + Adder a(1); + Adder b(-1); + Adder c(1); + Adder d(-1); + + observer_list->AddObserver(&a); + observer_list->AddObserver(&b); + + AddRemoveThread* threaded_observer[kMaxThreads]; + PlatformThreadHandle threads[kMaxThreads]; + for (int index = 0; index < num_threads; index++) { + threaded_observer[index] = new AddRemoveThread(observer_list.get(), false); + EXPECT_TRUE(PlatformThread::Create(0, + threaded_observer[index], &threads[index])); + } + + Time start = Time::Now(); + while (true) { + if ((Time::Now() - start).InMilliseconds() > kThreadRunTime) + break; + + observer_list->Notify(&Foo::Observe, 10); + + loop.RunAllPending(); + } + + for (int index = 0; index < num_threads; index++) { + threaded_observer[index]->Quit(); + PlatformThread::Join(threads[index]); + } +} + +TEST(ObserverListThreadSafeTest, CrossThreadObserver) { + // Use 7 observer threads. Notifications only come from + // the main thread. + ThreadSafeObserverHarness(7, false); +} + +TEST(ObserverListThreadSafeTest, CrossThreadNotifications) { + // Use 3 observer threads. Notifications will fire from + // the main thread and all 3 observer threads. + ThreadSafeObserverHarness(3, true); +} diff --git a/base/task.h b/base/task.h index 8db4560..9d4e6e2 100644 --- a/base/task.h +++ b/base/task.h @@ -625,5 +625,19 @@ typename Callback5<Arg1, Arg2, Arg3, Arg4, Arg5>::Type* NewCallback( Tuple5<Arg1, Arg2, Arg3, Arg4, Arg5> >(object, method); } +// An UnboundMethod is a wrapper for a method where the actual object is +// provided at Run dispatch time. +template <class T, class Method, class Params> +class UnboundMethod { + public: + UnboundMethod(Method m, Params p) : m_(m), p_(p) {} + void Run(T* obj) const { + DispatchToMethod(obj, m_, p_); + } + private: + Method m_; + Params p_; +}; + #endif // BASE_TASK_H__ |