summaryrefslogtreecommitdiffstats
path: root/base
diff options
context:
space:
mode:
authormbelshe@google.com <mbelshe@google.com@0039d316-1c4b-4281-b951-d872f2087c98>2008-10-22 23:09:21 +0000
committermbelshe@google.com <mbelshe@google.com@0039d316-1c4b-4281-b951-d872f2087c98>2008-10-22 23:09:21 +0000
commit503631cfa06aaae686f2e9a6f55ebc1859e8dab3 (patch)
tree8bc3a1d62749c65585db050b0926add085485436 /base
parent4d40fd494fc3142ded2eddc2a1becaa7a1584d06 (diff)
downloadchromium_src-503631cfa06aaae686f2e9a6f55ebc1859e8dab3.zip
chromium_src-503631cfa06aaae686f2e9a6f55ebc1859e8dab3.tar.gz
chromium_src-503631cfa06aaae686f2e9a6f55ebc1859e8dab3.tar.bz2
Create a thread-safe observer list. Will be used
by SystemMonitor. Right now the class requires that Observers be RefCounted<>. This is because we invoke tasks via NewRunnableMethod for them. However, because we manually track lifecycle via AddObserver/RemoveObserver, we could override the RunnableMethodTraits to not require RefCounted<>. This would have the advantage that callers do not need to make all Observers be RefCounted, but makes it more critical that observers not forget to call RemoveObserver(). Review URL: http://codereview.chromium.org/7353 git-svn-id: svn://svn.chromium.org/chrome/trunk/src@3787 0039d316-1c4b-4281-b951-d872f2087c98
Diffstat (limited to 'base')
-rw-r--r--base/build/base.vcproj4
-rw-r--r--base/build/base_unittests.vcproj4
-rw-r--r--base/observer_list_threadsafe.h190
-rw-r--r--base/observer_list_unittest.cc207
-rw-r--r--base/task.h14
5 files changed, 413 insertions, 6 deletions
diff --git a/base/build/base.vcproj b/base/build/base.vcproj
index 0350c33..a1731da 100644
--- a/base/build/base.vcproj
+++ b/base/build/base.vcproj
@@ -502,6 +502,10 @@
>
</File>
<File
+ RelativePath="..\observer_list_threadsafe.h"
+ >
+ </File>
+ <File
RelativePath="..\path_service.cc"
>
</File>
diff --git a/base/build/base_unittests.vcproj b/base/build/base_unittests.vcproj
index ea8cce5..94dd0b5 100644
--- a/base/build/base_unittests.vcproj
+++ b/base/build/base_unittests.vcproj
@@ -244,6 +244,10 @@
>
</File>
<File
+ RelativePath="..\observer_list_unittest.cc"
+ >
+ </File>
+ <File
RelativePath="..\path_service_unittest.cc"
>
</File>
diff --git a/base/observer_list_threadsafe.h b/base/observer_list_threadsafe.h
new file mode 100644
index 0000000..fcfa0ed
--- /dev/null
+++ b/base/observer_list_threadsafe.h
@@ -0,0 +1,190 @@
+// Copyright (c) 2006-2008 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef BASE_OBSERVER_LIST_THREADSAFE_H_
+#define BASE_OBSERVER_LIST_THREADSAFE_H_
+
+#include <vector>
+#include <algorithm>
+
+#include "base/basictypes.h"
+#include "base/logging.h"
+#include "base/message_loop.h"
+#include "base/observer_list.h"
+#include "base/ref_counted.h"
+#include "base/task.h"
+
+///////////////////////////////////////////////////////////////////////////////
+//
+// OVERVIEW:
+//
+// A thread-safe container for a list of observers.
+// This is similar to the observer_list (see observer_list.h), but it
+// is more robust for multi-threaded situations.
+//
+// The following use cases are supported:
+// * Observers can register for notifications from any thread.
+// Callbacks to the observer will occur on the same thread where
+// the observer initially called AddObserver() from.
+// * Any thread may trigger a notification via NOTIFY_OBSERVERS.
+// * Observers can remove themselves from the observer list inside
+// of a callback.
+// * If one thread is notifying observers concurrently with an observer
+// removing itself from the observer list, the notifications will
+// be silently dropped.
+//
+// The drawback of the threadsafe observer list is that notifications
+// are not as real-time as the non-threadsafe version of this class.
+// Notifications will always be done via PostTask() to another thread,
+// whereas with the non-thread-safe observer_list, notifications happen
+// synchronously and immediately.
+//
+// IMPLEMENTATION NOTES
+// The ObserverListThreadSafe maintains an ObserverList for each thread
+// which uses the ThreadSafeObserver. When Notifying the observers,
+// we simply call PostTask to each registered thread, and then each thread
+// will notify its regular ObserverList.
+//
+///////////////////////////////////////////////////////////////////////////////
+template <class ObserverType>
+class ObserverListThreadSafe :
+ public base::RefCountedThreadSafe<ObserverListThreadSafe<ObserverType> > {
+ public:
+ ObserverListThreadSafe() {}
+
+ ~ObserverListThreadSafe() {
+ typename ObserversListMap::const_iterator it;
+ for (it = observer_lists_.begin(); it != observer_lists_.end(); ++it)
+ delete (*it).second;
+ observer_lists_.clear();
+ }
+
+ // Add an observer to the list.
+ void AddObserver(ObserverType* obs) {
+ ObserverList<ObserverType>* list = NULL;
+ MessageLoop* loop = MessageLoop::current();
+ {
+ AutoLock lock(list_lock_);
+ if (observer_lists_.find(loop) == observer_lists_.end())
+ observer_lists_[loop] = new ObserverList<ObserverType>();
+ list = observer_lists_[loop];
+ }
+ list->AddObserver(obs);
+ }
+
+ // Remove an observer from the list.
+ // If there are pending notifications in-transit to the observer, they will
+ // be aborted.
+ // RemoveObserver MUST be called from the same thread which called
+ // AddObserver.
+ void RemoveObserver(ObserverType* obs) {
+ ObserverList<ObserverType>* list = NULL;
+ MessageLoop* loop = MessageLoop::current();
+ {
+ AutoLock lock(list_lock_);
+ DCHECK(observer_lists_.find(loop) != observer_lists_.end()) <<
+ "RemoveObserver called on for unknown thread";
+ list = observer_lists_[loop];
+
+ // If we're about to remove the last observer from the list,
+ // then we can remove this observer_list entirely.
+ if (list->size() == 1)
+ observer_lists_.erase(loop);
+ }
+ list->RemoveObserver(obs);
+
+ // If RemoveObserver is called from a notification, the size will be
+ // nonzero. Instead of deleting here, the NotifyWrapper will delete
+ // when it finishes iterating.
+ if (list->size() == 0)
+ delete list;
+ }
+
+ // Notify methods.
+ // Make a thread-safe callback to each Observer in the list.
+ // Note, these calls are effectively asynchronous. You cannot assume
+ // that at the completion of the Notify call that all Observers have
+ // been Notified. The notification may still be pending delivery.
+ template <class Method>
+ void Notify(Method m) {
+ UnboundMethod<ObserverType, Method, Tuple0> method(m, MakeTuple());
+ Notify<Method, Tuple0>(method);
+ }
+
+ template <class Method, class A>
+ void Notify(Method m, const A &a) {
+ UnboundMethod<ObserverType, Method, Tuple1<A> > method(m, MakeTuple(a));
+ Notify<Method, Tuple1<A> >(method);
+ }
+
+ // TODO(mbelshe): Add more wrappers for Notify() with more arguments.
+
+ private:
+ template <class Method, class Params>
+ void Notify(const UnboundMethod<ObserverType, Method, Params>& method) {
+ AutoLock lock(list_lock_);
+ typename ObserversListMap::iterator it;
+ for (it = observer_lists_.begin(); it != observer_lists_.end(); ++it) {
+ MessageLoop* loop = (*it).first;
+ ObserverList<ObserverType>* list = (*it).second;
+ loop->PostTask(FROM_HERE,
+ NewRunnableMethod(this,
+ &ObserverListThreadSafe<ObserverType>::
+ template NotifyWrapper<Method, Params>, list, method));
+ }
+ }
+
+ // Wrapper which is called to fire the notifications for each thread's
+ // ObserverList. This function MUST be called on the thread which owns
+ // the unsafe ObserverList.
+ template <class Method, class Params>
+ void NotifyWrapper(ObserverList<ObserverType>* list,
+ const UnboundMethod<ObserverType, Method, Params>& method) {
+
+ // Check that this list still needs notifications.
+ {
+ AutoLock lock(list_lock_);
+ typename ObserversListMap::iterator it =
+ observer_lists_.find(MessageLoop::current());
+
+ // The ObserverList could have been removed already. In fact, it could
+ // have been removed and then re-added! If the master list's loop
+ // does not match this one, then we do not need to finish this
+ // notification.
+ if (it == observer_lists_.end() || it->second != list)
+ return;
+ }
+
+ {
+ typename ObserverList<ObserverType>::Iterator it(*list);
+ ObserverType* obs;
+ while ((obs = it.GetNext()) != NULL)
+ method.Run(obs);
+ }
+
+ // If there are no more observers on the list, we can now delete it.
+ if (list->size() == 0) {
+#ifndef NDEBUG
+ {
+ AutoLock lock(list_lock_);
+ // Verify this list is no longer registered.
+ typename ObserversListMap::iterator it =
+ observer_lists_.find(MessageLoop::current());
+ DCHECK(it == observer_lists_.end() || it->second != list);
+ }
+#endif
+ delete list;
+ }
+ }
+
+ typedef std::map<MessageLoop*, ObserverList<ObserverType>*> ObserversListMap;
+
+ // These are marked mutable to facilitate having NotifyAll be const.
+ Lock list_lock_; // Protects the observer_lists_.
+ ObserversListMap observer_lists_;
+
+ DISALLOW_EVIL_CONSTRUCTORS(ObserverListThreadSafe);
+};
+
+#endif // BASE_OBSERVER_LIST_THREADSAFE_H_
diff --git a/base/observer_list_unittest.cc b/base/observer_list_unittest.cc
index 1fea315..529e8a9 100644
--- a/base/observer_list_unittest.cc
+++ b/base/observer_list_unittest.cc
@@ -2,7 +2,11 @@
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
+#include "base/message_loop.h"
#include "base/observer_list.h"
+#include "base/observer_list_threadsafe.h"
+#include "base/platform_thread.h"
+#include "base/ref_counted.h"
#include "testing/gtest/include/gtest/gtest.h"
namespace {
@@ -18,7 +22,7 @@ class Foo {
class Adder : public Foo {
public:
- Adder(int scaler) : total(0), scaler_(scaler) {}
+ explicit Adder(int scaler) : total(0), scaler_(scaler) {}
virtual void Observe(int x) {
total += x * scaler_;
}
@@ -30,23 +34,126 @@ class Adder : public Foo {
class Disrupter : public Foo {
public:
- Disrupter(ObserverList<Foo>& list, Foo* doomed) : list_(list), doomed_(doomed) {
- }
+ Disrupter(ObserverList<Foo>* list, Foo* doomed)
+ : list_(list), doomed_(doomed) { }
virtual ~Disrupter() { }
virtual void Observe(int x) {
- list_.RemoveObserver(doomed_);
+ list_->RemoveObserver(doomed_);
}
private:
- ObserverList<Foo>& list_;
+ ObserverList<Foo>* list_;
Foo* doomed_;
};
+class ThreadSafeDisrupter : public Foo {
+ public:
+ ThreadSafeDisrupter(ObserverListThreadSafe<Foo>* list, Foo* doomed)
+ : list_(list), doomed_(doomed) { }
+ virtual ~ThreadSafeDisrupter() { }
+ virtual void Observe(int x) {
+ list_->RemoveObserver(doomed_);
+ }
+ private:
+ ObserverListThreadSafe<Foo>* list_;
+ Foo* doomed_;
+};
+
+class ObserverListThreadSafeTest : public testing::Test {
+};
+
+static const int kThreadRunTime = 10000; // ms to run the multi-threaded test.
+
+// A thread for use in the ThreadSafeObserver test
+// which will add and remove itself from the notification
+// list repeatedly.
+class AddRemoveThread : public PlatformThread::Delegate,
+ public Foo {
+ public:
+ AddRemoveThread(ObserverListThreadSafe<Foo>* list, bool notify)
+ : list_(list),
+ in_list_(false),
+ start_(Time::Now()),
+ count_observes_(0),
+ count_addtask_(0),
+ do_notifies_(notify) {
+ factory_ = new ScopedRunnableMethodFactory<AddRemoveThread>(this);
+ }
+
+ ~AddRemoveThread() {
+ delete factory_;
+ }
+
+ void ThreadMain() {
+ loop_ = new MessageLoop(); // Fire up a message loop.
+ loop_->PostTask(FROM_HERE,
+ factory_->NewRunnableMethod(&AddRemoveThread::AddTask));
+ loop_->Run();
+ //LOG(ERROR) << "Loop 0x" << std::hex << loop_ << " done. " << count_observes_ << ", " << count_addtask_;
+ delete loop_;
+ loop_ = reinterpret_cast<MessageLoop*>(0xdeadbeef);
+ }
+
+ // This task just keeps posting to itself in an attempt
+ // to race with the notifier.
+ void AddTask() {
+ count_addtask_++;
+
+ if ((Time::Now() - start_).InMilliseconds() > kThreadRunTime) {
+ LOG(INFO) << "DONE!";
+ return;
+ }
+
+ if (!in_list_) {
+ list_->AddObserver(this);
+ in_list_ = true;
+ }
+
+ if (do_notifies_) {
+ list_->Notify(&Foo::Observe, 10);
+ }
+
+ loop_->PostDelayedTask(FROM_HERE,
+ factory_->NewRunnableMethod(&AddRemoveThread::AddTask), 0);
+ }
+
+ void Quit() {
+ loop_->PostTask(FROM_HERE, new MessageLoop::QuitTask());
+ }
+
+ virtual void Observe(int x) {
+ count_observes_++;
+
+ // If we're getting called after we removed ourselves from
+ // the list, that is very bad!
+ DCHECK(in_list_);
+
+ // This callback should fire on the appropriate thread
+ EXPECT_EQ(loop_, MessageLoop::current());
+
+ list_->RemoveObserver(this);
+ in_list_ = false;
+ }
+
+ private:
+ ObserverListThreadSafe<Foo>* list_;
+ MessageLoop* loop_;
+ bool in_list_; // Are we currently registered for notifications.
+ // in_list_ is only used on |this| thread.
+ Time start_; // The time we started the test.
+
+ int count_observes_; // Number of times we observed.
+ int count_addtask_; // Number of times thread AddTask was called
+ bool do_notifies_; // Whether these threads should do notifications.
+
+ ScopedRunnableMethodFactory<AddRemoveThread>* factory_;
+};
+
} // namespace
TEST(ObserverListTest, BasicTest) {
ObserverList<Foo> observer_list;
Adder a(1), b(-1), c(1), d(-1);
- Disrupter evil(observer_list, &c);
+ Disrupter evil(&observer_list, &c);
observer_list.AddObserver(&a);
observer_list.AddObserver(&b);
@@ -65,3 +172,91 @@ TEST(ObserverListTest, BasicTest) {
EXPECT_EQ(d.total, -10);
}
+TEST(ObserverListThreadSafeTest, BasicTest) {
+ MessageLoop loop;
+
+ scoped_refptr<ObserverListThreadSafe<Foo> > observer_list(
+ new ObserverListThreadSafe<Foo>);
+ Adder a(1);
+ Adder b(-1);
+ Adder c(1);
+ Adder d(-1);
+ ThreadSafeDisrupter evil(observer_list.get(), &c);
+
+ observer_list->AddObserver(&a);
+ observer_list->AddObserver(&b);
+
+ observer_list->Notify(&Foo::Observe, 10);
+ loop.RunAllPending();
+
+ observer_list->AddObserver(&evil);
+ observer_list->AddObserver(&c);
+ observer_list->AddObserver(&d);
+
+ observer_list->Notify(&Foo::Observe, 10);
+ loop.RunAllPending();
+
+ EXPECT_EQ(a.total, 20);
+ EXPECT_EQ(b.total, -20);
+ EXPECT_EQ(c.total, 0);
+ EXPECT_EQ(d.total, -10);
+}
+
+
+// A test driver for a multi-threaded notification loop. Runs a number
+// of observer threads, each of which constantly adds/removes itself
+// from the observer list. Optionally, if cross_thread_notifies is set
+// to true, the observer threads will also trigger notifications to
+// all observers.
+static void ThreadSafeObserverHarness(int num_threads,
+ bool cross_thread_notifies) {
+ MessageLoop loop;
+
+ const int kMaxThreads = 15;
+ num_threads = num_threads > kMaxThreads ? kMaxThreads : num_threads;
+
+ scoped_refptr<ObserverListThreadSafe<Foo> > observer_list(
+ new ObserverListThreadSafe<Foo>);
+ Adder a(1);
+ Adder b(-1);
+ Adder c(1);
+ Adder d(-1);
+
+ observer_list->AddObserver(&a);
+ observer_list->AddObserver(&b);
+
+ AddRemoveThread* threaded_observer[kMaxThreads];
+ PlatformThreadHandle threads[kMaxThreads];
+ for (int index = 0; index < num_threads; index++) {
+ threaded_observer[index] = new AddRemoveThread(observer_list.get(), false);
+ EXPECT_TRUE(PlatformThread::Create(0,
+ threaded_observer[index], &threads[index]));
+ }
+
+ Time start = Time::Now();
+ while (true) {
+ if ((Time::Now() - start).InMilliseconds() > kThreadRunTime)
+ break;
+
+ observer_list->Notify(&Foo::Observe, 10);
+
+ loop.RunAllPending();
+ }
+
+ for (int index = 0; index < num_threads; index++) {
+ threaded_observer[index]->Quit();
+ PlatformThread::Join(threads[index]);
+ }
+}
+
+TEST(ObserverListThreadSafeTest, CrossThreadObserver) {
+ // Use 7 observer threads. Notifications only come from
+ // the main thread.
+ ThreadSafeObserverHarness(7, false);
+}
+
+TEST(ObserverListThreadSafeTest, CrossThreadNotifications) {
+ // Use 3 observer threads. Notifications will fire from
+ // the main thread and all 3 observer threads.
+ ThreadSafeObserverHarness(3, true);
+}
diff --git a/base/task.h b/base/task.h
index 8db4560..9d4e6e2 100644
--- a/base/task.h
+++ b/base/task.h
@@ -625,5 +625,19 @@ typename Callback5<Arg1, Arg2, Arg3, Arg4, Arg5>::Type* NewCallback(
Tuple5<Arg1, Arg2, Arg3, Arg4, Arg5> >(object, method);
}
+// An UnboundMethod is a wrapper for a method where the actual object is
+// provided at Run dispatch time.
+template <class T, class Method, class Params>
+class UnboundMethod {
+ public:
+ UnboundMethod(Method m, Params p) : m_(m), p_(p) {}
+ void Run(T* obj) const {
+ DispatchToMethod(obj, m_, p_);
+ }
+ private:
+ Method m_;
+ Params p_;
+};
+
#endif // BASE_TASK_H__