diff options
author | ajwong@chromium.org <ajwong@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98> | 2011-08-18 23:13:01 +0000 |
---|---|---|
committer | ajwong@chromium.org <ajwong@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98> | 2011-08-18 23:13:01 +0000 |
commit | c31af70db21e370eee35a677f6fcc62a2963784a (patch) | |
tree | b3f1908471efc10ac35d61cbffa2c7d160d6247f | |
parent | d55c2380d728c2cc29c1a84970bb1d6f51c397d7 (diff) | |
download | chromium_src-c31af70db21e370eee35a677f6fcc62a2963784a.zip chromium_src-c31af70db21e370eee35a677f6fcc62a2963784a.tar.gz chromium_src-c31af70db21e370eee35a677f6fcc62a2963784a.tar.bz2 |
Implementation of PostTaskAndReply() in MessageLoopProxy and BrowserThread.
This ensures that the request/reply closures are always deleted on the origin
thread, or leaked if the task cannot be completed (due to message loop
shutdown).
BUG=86301
TEST=new unittests
Review URL: http://codereview.chromium.org/7210053
git-svn-id: svn://svn.chromium.org/chrome/trunk/src@97387 0039d316-1c4b-4281-b951-d872f2087c98
-rw-r--r-- | base/base.gyp | 1 | ||||
-rw-r--r-- | base/message_loop.h | 2 | ||||
-rw-r--r-- | base/message_loop_proxy.cc | 77 | ||||
-rw-r--r-- | base/message_loop_proxy.h | 45 | ||||
-rw-r--r-- | base/message_loop_proxy_unittest.cc | 263 | ||||
-rw-r--r-- | base/task.h | 11 | ||||
-rw-r--r-- | base/threading/thread_collision_warner.h | 2 | ||||
-rw-r--r-- | content/browser/browser_thread.cc | 12 | ||||
-rw-r--r-- | content/browser/browser_thread.h | 6 | ||||
-rw-r--r-- | content/browser/browser_thread_unittest.cc | 18 | ||||
-rw-r--r-- | tools/valgrind/memcheck/suppressions.txt | 7 |
11 files changed, 440 insertions, 4 deletions
diff --git a/base/base.gyp b/base/base.gyp index 8e28506..40e1ac2 100644 --- a/base/base.gyp +++ b/base/base.gyp @@ -151,6 +151,7 @@ 'memory/singleton_unittest.cc', 'memory/weak_ptr_unittest.cc', 'message_loop_proxy_impl_unittest.cc', + 'message_loop_proxy_unittest.cc', 'message_loop_unittest.cc', 'message_pump_glib_unittest.cc', 'message_pump_libevent_unittest.cc', diff --git a/base/message_loop.h b/base/message_loop.h index 976c5ce..601120f 100644 --- a/base/message_loop.h +++ b/base/message_loop.h @@ -272,7 +272,7 @@ class BASE_EXPORT MessageLoop : public base::MessagePump::Delegate { } const std::string& thread_name() const { return thread_name_; } - // Gets the message loop proxy associated with this message loop proxy + // Gets the message loop proxy associated with this message loop. scoped_refptr<base::MessageLoopProxy> message_loop_proxy() { return message_loop_proxy_.get(); } diff --git a/base/message_loop_proxy.cc b/base/message_loop_proxy.cc index a38db39..7433e25 100644 --- a/base/message_loop_proxy.cc +++ b/base/message_loop_proxy.cc @@ -1,17 +1,92 @@ -// Copyright (c) 2010 The Chromium Authors. All rights reserved. +// Copyright (c) 2011 The Chromium Authors. All rights reserved. // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. #include "base/message_loop_proxy.h" +#include "base/bind.h" + namespace base { +namespace { + +// This relay class remembers the MessageLoop that it was created on, and +// ensures that both the |task| and |reply| Closures are deleted on this same +// thread. Also, |task| is guaranteed to be deleted before |reply| is run or +// deleted. +// +// If this is not possible because the originating MessageLoop is no longer +// available, the the |task| and |reply| Closures are leaked. Leaking is +// considered preferable to having a thread-safetey violations caused by +// invoking the Closure destructor on the wrong thread. +class PostTaskAndReplyRelay { + public: + PostTaskAndReplyRelay(const tracked_objects::Location& from_here, + const Closure& task, const Closure& reply) + : from_here_(from_here), + origin_loop_(MessageLoopProxy::current()) { + task_ = task; + reply_ = reply; + } + + ~PostTaskAndReplyRelay() { + DCHECK(origin_loop_->BelongsToCurrentThread()); + task_.Reset(); + reply_.Reset(); + } + + void Run() { + task_.Run(); + origin_loop_->PostTask( + from_here_, + Bind(&PostTaskAndReplyRelay::RunReplyAndSelfDestruct, + base::Unretained(this))); + } + + private: + void RunReplyAndSelfDestruct() { + DCHECK(origin_loop_->BelongsToCurrentThread()); + + // Force |task_| to be released before |reply_| is to ensure that no one + // accidentally depends on |task_| keeping one of its arguments alive while + // |reply_| is executing. + task_.Reset(); + + reply_.Run(); + + // Cue mission impossible theme. + delete this; + } + + tracked_objects::Location from_here_; + scoped_refptr<MessageLoopProxy> origin_loop_; + Closure reply_; + Closure task_; +}; + +} // namespace + MessageLoopProxy::MessageLoopProxy() { } MessageLoopProxy::~MessageLoopProxy() { } +bool MessageLoopProxy::PostTaskAndReply( + const tracked_objects::Location& from_here, + const Closure& task, + const Closure& reply) { + PostTaskAndReplyRelay* relay = + new PostTaskAndReplyRelay(from_here, task, reply); + if (!PostTask(from_here, Bind(&PostTaskAndReplyRelay::Run, + Unretained(relay)))) { + delete relay; + return false; + } + + return true; +} + void MessageLoopProxy::OnDestruct() const { delete this; } diff --git a/base/message_loop_proxy.h b/base/message_loop_proxy.h index 9a856bf..decaa38 100644 --- a/base/message_loop_proxy.h +++ b/base/message_loop_proxy.h @@ -70,6 +70,51 @@ class BASE_EXPORT MessageLoopProxy // this proxy represents. virtual bool BelongsToCurrentThread() = 0; + // Executes |task| on the given MessageLoopProxy. On completion, |reply| + // is passed back to the MessageLoopProxy for the thread that called + // PostTaskAndReply(). Both |task| and |reply| are guaranteed to be deleted + // on the thread from which PostTaskAndReply() is invoked. This allows + // objects that must be deleted on the originating thread to be bound into the + // |task| and |reply| Closures. In particular, it can be useful to use + // WeakPtr<> in the |reply| Closure so that the reply operation can be + // canceled. See the following pseudo-code: + // + // class DataBuffer : public RefCountedThreadSafe<DataBuffer> { + // public: + // // Called to add data into a buffer. + // void AddData(void* buf, size_t length); + // ... + // }; + // + // + // class DataLoader : public SupportsWeakPtr<ReadToBuffer> { + // public: + // void GetData() { + // scoped_refptr<DataBuffer> buffer = new DataBuffer(); + // target_thread_.message_loop_proxy()->PostTaskAndReply( + // FROM_HERE, + // base::Bind(&DataBuffer::AddData, buffer), + // base::Bind(&DataLoader::OnDataReceived, AsWeakPtr(), buffer)); + // } + // + // private: + // void OnDataReceived(scoped_refptr<DataBuffer> buffer) { + // // Do something with buffer. + // } + // }; + // + // + // Things to notice: + // * Results of |task| are shared with |reply| by binding a shared argument + // (a DataBuffer instance). + // * The DataLoader object has no special thread safety. + // * The DataLoader object can be deleted while |task| is still running, + // and the reply will cancel itself safely because it is bound to a + // WeakPtr<>. + bool PostTaskAndReply(const tracked_objects::Location& from_here, + const Closure& task, + const Closure& reply); + template <class T> bool DeleteSoon(const tracked_objects::Location& from_here, T* object) { diff --git a/base/message_loop_proxy_unittest.cc b/base/message_loop_proxy_unittest.cc new file mode 100644 index 0000000..aeb2f0c --- /dev/null +++ b/base/message_loop_proxy_unittest.cc @@ -0,0 +1,263 @@ +// Copyright (c) 2011 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "base/message_loop_proxy.h" + +#include "base/atomic_sequence_num.h" +#include "base/bind.h" +#include "base/memory/ref_counted.h" +#include "base/memory/scoped_ptr.h" +#include "base/message_loop.h" +#include "base/synchronization/waitable_event.h" +#include "base/threading/thread.h" +#include "testing/gtest/include/gtest/gtest.h" + +namespace base { + +namespace { + +class MessageLoopProxyTest : public testing::Test { + public: + MessageLoopProxyTest() + : current_loop_(new MessageLoop()), + task_thread_("task_thread"), + thread_sync_(true, false) { + } + + void DeleteCurrentMessageLoop() { + current_loop_.reset(); + } + + protected: + virtual void SetUp() { + // Use SetUp() instead of the constructor to avoid posting a task to a + // partialy constructed object. + task_thread_.Start(); + + // Allow us to pause the |task_thread_|'s MessageLoop. + task_thread_.message_loop()->PostTask( + FROM_HERE, + Bind(&MessageLoopProxyTest::BlockTaskThreadHelper, Unretained(this))); + } + + virtual void TearDown() { + // Make sure the |task_thread_| is not blocked, and stop the thread + // fully before destuction because its tasks may still depend on the + // |thread_sync_| event. + thread_sync_.Signal(); + task_thread_.Stop(); + DeleteCurrentMessageLoop(); + } + + // Make LoopRecorder threadsafe so that there is defined behavior even if a + // threading mistake sneaks into the PostTaskAndReplyRelay implementation. + class LoopRecorder : public RefCountedThreadSafe<LoopRecorder> { + public: + LoopRecorder(MessageLoop** run_on, MessageLoop** deleted_on, + int* destruct_order) + : run_on_(run_on), + deleted_on_(deleted_on), + destruct_order_(destruct_order) { + } + + void RecordRun() { + *run_on_ = MessageLoop::current(); + } + + private: + friend class RefCountedThreadSafe<LoopRecorder>; + ~LoopRecorder() { + *deleted_on_ = MessageLoop::current(); + *destruct_order_ = g_order.GetNext(); + } + + MessageLoop** run_on_; + MessageLoop** deleted_on_; + int* destruct_order_; + }; + + static void RecordLoop(scoped_refptr<LoopRecorder> recorder) { + recorder->RecordRun(); + } + + static void RecordLoopAndQuit(scoped_refptr<LoopRecorder> recorder) { + recorder->RecordRun(); + MessageLoop::current()->Quit(); + } + + void UnblockTaskThread() { + thread_sync_.Signal(); + } + + void BlockTaskThreadHelper() { + thread_sync_.Wait(); + } + + static AtomicSequenceNumber g_order; + + scoped_ptr<MessageLoop> current_loop_; + Thread task_thread_; + + private: + base::WaitableEvent thread_sync_; +}; + +AtomicSequenceNumber MessageLoopProxyTest::g_order(LINKER_INITIALIZED); + +TEST_F(MessageLoopProxyTest, PostTaskAndReply_Basic) { + MessageLoop* task_run_on = NULL; + MessageLoop* task_deleted_on = NULL; + int task_delete_order = -1; + MessageLoop* reply_run_on = NULL; + MessageLoop* reply_deleted_on = NULL; + int reply_delete_order = -1; + + scoped_refptr<LoopRecorder> task_recoder = + new LoopRecorder(&task_run_on, &task_deleted_on, &task_delete_order); + scoped_refptr<LoopRecorder> reply_recoder = + new LoopRecorder(&reply_run_on, &reply_deleted_on, &reply_delete_order); + + ASSERT_TRUE(task_thread_.message_loop_proxy()->PostTaskAndReply( + FROM_HERE, + Bind(&RecordLoop, task_recoder), + Bind(&RecordLoopAndQuit, reply_recoder))); + + // Die if base::Bind doesn't retain a reference to the recorders. + task_recoder = NULL; + reply_recoder = NULL; + ASSERT_FALSE(task_deleted_on); + ASSERT_FALSE(reply_deleted_on); + + UnblockTaskThread(); + current_loop_->Run(); + + EXPECT_EQ(task_thread_.message_loop(), task_run_on); + EXPECT_EQ(current_loop_.get(), task_deleted_on); + EXPECT_EQ(current_loop_.get(), reply_run_on); + EXPECT_EQ(current_loop_.get(), reply_deleted_on); + EXPECT_LT(task_delete_order, reply_delete_order); +} + +TEST_F(MessageLoopProxyTest, PostTaskAndReplyOnDeletedThreadDoesNotLeak) { + MessageLoop* task_run_on = NULL; + MessageLoop* task_deleted_on = NULL; + int task_delete_order = -1; + MessageLoop* reply_run_on = NULL; + MessageLoop* reply_deleted_on = NULL; + int reply_delete_order = -1; + + scoped_refptr<LoopRecorder> task_recoder = + new LoopRecorder(&task_run_on, &task_deleted_on, &task_delete_order); + scoped_refptr<LoopRecorder> reply_recoder = + new LoopRecorder(&reply_run_on, &reply_deleted_on, &reply_delete_order); + + // Grab a MessageLoopProxy to a dead MessageLoop. + scoped_refptr<MessageLoopProxy> task_loop_proxy = + task_thread_.message_loop_proxy(); + UnblockTaskThread(); + task_thread_.Stop(); + + ASSERT_FALSE(task_loop_proxy->PostTaskAndReply( + FROM_HERE, + Bind(&RecordLoop, task_recoder), + Bind(&RecordLoopAndQuit, reply_recoder))); + + // The relay should have properly deleted its resources leaving us as the only + // reference. + EXPECT_EQ(task_delete_order, reply_delete_order); + ASSERT_TRUE(task_recoder->HasOneRef()); + ASSERT_TRUE(reply_recoder->HasOneRef()); + + // Nothing should have run though. + EXPECT_FALSE(task_run_on); + EXPECT_FALSE(reply_run_on); +} + +TEST_F(MessageLoopProxyTest, PostTaskAndReply_SameLoop) { + MessageLoop* task_run_on = NULL; + MessageLoop* task_deleted_on = NULL; + int task_delete_order = -1; + MessageLoop* reply_run_on = NULL; + MessageLoop* reply_deleted_on = NULL; + int reply_delete_order = -1; + + scoped_refptr<LoopRecorder> task_recoder = + new LoopRecorder(&task_run_on, &task_deleted_on, &task_delete_order); + scoped_refptr<LoopRecorder> reply_recoder = + new LoopRecorder(&reply_run_on, &reply_deleted_on, &reply_delete_order); + + // Enqueue the relay. + ASSERT_TRUE(current_loop_->message_loop_proxy()->PostTaskAndReply( + FROM_HERE, + Bind(&RecordLoop, task_recoder), + Bind(&RecordLoopAndQuit, reply_recoder))); + + // Die if base::Bind doesn't retain a reference to the recorders. + task_recoder = NULL; + reply_recoder = NULL; + ASSERT_FALSE(task_deleted_on); + ASSERT_FALSE(reply_deleted_on); + + current_loop_->Run(); + + EXPECT_EQ(current_loop_.get(), task_run_on); + EXPECT_EQ(current_loop_.get(), task_deleted_on); + EXPECT_EQ(current_loop_.get(), reply_run_on); + EXPECT_EQ(current_loop_.get(), reply_deleted_on); + EXPECT_LT(task_delete_order, reply_delete_order); +} + +TEST_F(MessageLoopProxyTest, PostTaskAndReply_DeadReplyLoopDoesNotDelete) { + MessageLoop* task_run_on = NULL; + MessageLoop* task_deleted_on = NULL; + int task_delete_order = -1; + MessageLoop* reply_run_on = NULL; + MessageLoop* reply_deleted_on = NULL; + int reply_delete_order = -1; + + scoped_refptr<LoopRecorder> task_recoder = + new LoopRecorder(&task_run_on, &task_deleted_on, &task_delete_order); + scoped_refptr<LoopRecorder> reply_recoder = + new LoopRecorder(&reply_run_on, &reply_deleted_on, &reply_delete_order); + + // Enqueue the relay. + task_thread_.message_loop_proxy()->PostTaskAndReply( + FROM_HERE, + Bind(&RecordLoop, task_recoder), + Bind(&RecordLoopAndQuit, reply_recoder)); + + // Die if base::Bind doesn't retain a reference to the recorders. + task_recoder = NULL; + reply_recoder = NULL; + ASSERT_FALSE(task_deleted_on); + ASSERT_FALSE(reply_deleted_on); + + UnblockTaskThread(); + + // Mercilessly whack the current loop before |reply| gets to run. + current_loop_.reset(); + + // This should ensure the relay has been run. We need to record the + // MessageLoop pointer before stopping the thread because Thread::Stop() will + // NULL out its own pointer. + MessageLoop* task_loop = task_thread_.message_loop(); + task_thread_.Stop(); + + EXPECT_EQ(task_loop, task_run_on); + ASSERT_FALSE(task_deleted_on); + EXPECT_FALSE(reply_run_on); + ASSERT_FALSE(reply_deleted_on); + EXPECT_EQ(task_delete_order, reply_delete_order); + + // The PostTaskAndReplyRelay is leaked here. Even if we had a reference to + // it, we cannot just delete it because PostTaskAndReplyRelay's destructor + // checks that MessageLoop::current() is the the same as when the + // PostTaskAndReplyRelay object was constructed. However, this loop must have + // aleady been deleted in order to perform this test. See + // http://crbug.com/86301. +} + +} // namespace + +} // namespace base diff --git a/base/task.h b/base/task.h index 460c8c2..b95d748 100644 --- a/base/task.h +++ b/base/task.h @@ -217,6 +217,17 @@ class ReleaseTask : public CancelableTask { const T* obj_; }; +// Equivalents for use by base::Bind(). +template<typename T> +void DeletePointer(T* obj) { + delete obj; +} + +template<typename T> +void ReleasePointer(T* obj) { + obj->Release(); +} + // RunnableMethodTraits -------------------------------------------------------- // // This traits-class is used by RunnableMethod to manage the lifetime of the diff --git a/base/threading/thread_collision_warner.h b/base/threading/thread_collision_warner.h index 4460602..24f0920 100644 --- a/base/threading/thread_collision_warner.h +++ b/base/threading/thread_collision_warner.h @@ -79,7 +79,7 @@ // // // Example: Class that has to be contructed/destroyed on same thread, it has -// a "shareable" method (with external syncronization) and a not +// a "shareable" method (with external synchronization) and a not // shareable method (even with external synchronization). // // In this case 3 Critical sections have to be defined diff --git a/content/browser/browser_thread.cc b/content/browser/browser_thread.cc index b4d95ce..c3b4005 100644 --- a/content/browser/browser_thread.cc +++ b/content/browser/browser_thread.cc @@ -4,6 +4,7 @@ #include "content/browser/browser_thread.h" +#include "base/bind.h" #include "base/message_loop.h" #include "base/message_loop_proxy.h" #include "base/threading/thread_restrictions.h" @@ -221,6 +222,17 @@ bool BrowserThread::PostNonNestableDelayedTask( } // static +bool BrowserThread::PostTaskAndReply( + ID identifier, + const tracked_objects::Location& from_here, + const base::Closure& task, + const base::Closure& reply) { + return GetMessageLoopProxyForThread(identifier)->PostTaskAndReply(from_here, + task, + reply); +} + +// static bool BrowserThread::GetCurrentThreadIdentifier(ID* identifier) { // We shouldn't use MessageLoop::current() since it uses LazyInstance which // may be deleted by ~AtExitManager when a WorkerPool thread calls this diff --git a/content/browser/browser_thread.h b/content/browser/browser_thread.h index 112ce3f..3c47fc0 100644 --- a/content/browser/browser_thread.h +++ b/content/browser/browser_thread.h @@ -128,6 +128,12 @@ class BrowserThread : public base::Thread { Task* task, int64 delay_ms); + static bool PostTaskAndReply( + ID identifier, + const tracked_objects::Location& from_here, + const base::Closure& task, + const base::Closure& reply); + template <class T> static bool DeleteSoon(ID identifier, const tracked_objects::Location& from_here, diff --git a/content/browser/browser_thread_unittest.cc b/content/browser/browser_thread_unittest.cc index 0a3ff17..f5f6e47 100644 --- a/content/browser/browser_thread_unittest.cc +++ b/content/browser/browser_thread_unittest.cc @@ -2,6 +2,7 @@ // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. +#include "base/bind.h" #include "base/memory/scoped_ptr.h" #include "base/message_loop.h" #include "base/message_loop_proxy.h" @@ -34,6 +35,9 @@ class BrowserThreadTest : public testing::Test { message_loop->PostTask(FROM_HERE, new MessageLoop::QuitTask); } + static void DoNothing() { + } + class DummyTask : public Task { public: explicit DummyTask(bool* deleted) : deleted_(deleted) { } @@ -130,6 +134,19 @@ TEST_F(BrowserThreadTest, ReleaseViaMessageLoopProxy) { MessageLoop::current()->Run(); } +TEST_F(BrowserThreadTest, PostTaskAndReply) { + // Most of the heavy testing for PostTaskAndReply() is done inside the + // MessageLoopProxy test. This just makes sure we get piped through at all. + ASSERT_TRUE(BrowserThread::PostTaskAndReply( + BrowserThread::FILE, + FROM_HERE, + base::Bind(&BrowserThreadTest::DoNothing), + base::Bind(&MessageLoop::Quit, + base::Unretained(MessageLoop::current()->current())))); + MessageLoop::current()->Run(); +} + + TEST_F(BrowserThreadTest, TaskToNonExistentThreadIsDeletedViaMessageLoopProxy) { bool deleted = false; scoped_refptr<base::MessageLoopProxy> message_loop_proxy = @@ -163,4 +180,3 @@ TEST_F(BrowserThreadTest, PostTaskViaMessageLoopProxyAfterThreadIsDeleted) { EXPECT_FALSE(ret); EXPECT_TRUE(deleted); } - diff --git a/tools/valgrind/memcheck/suppressions.txt b/tools/valgrind/memcheck/suppressions.txt index 502f842..f148883 100644 --- a/tools/valgrind/memcheck/suppressions.txt +++ b/tools/valgrind/memcheck/suppressions.txt @@ -745,6 +745,13 @@ fun:_ZN4base40ToolsSanityTest_AccessesToNewMemory_Test8TestBodyEv } { + bug_86301 This test explicitly verifies PostTaskAndReply leaks the task if the originating MessageLoop has been deleted. + Memcheck:Leak + fun:_Znw* + fun:_ZN4base16MessageLoopProxy16PostTaskAndReplyERKN15tracked_objects8LocationERKNS_8CallbackIFvvEEES9_ + fun:_ZN4base12_GLOBAL__N_169MessageLoopProxyTest_PostTaskAndReply_DeadReplyLoopDoesNotDelete_Test8TestBodyEv +} +{ logging::InitLogging never frees filename. It would be hard to free properly. Memcheck:Leak ... |