diff options
author | jabdelmalek@google.com <jabdelmalek@google.com@0039d316-1c4b-4281-b951-d872f2087c98> | 2010-04-06 20:33:36 +0000 |
---|---|---|
committer | jabdelmalek@google.com <jabdelmalek@google.com@0039d316-1c4b-4281-b951-d872f2087c98> | 2010-04-06 20:33:36 +0000 |
commit | 1e9499c21c23b52391d952572bd9059df532efcb (patch) | |
tree | bd96f3ed1bd4226b062085316346c77e6bb999ab /ipc | |
parent | ae2e0f96d38961c160bf54962a55eb1f44a0f943 (diff) | |
download | chromium_src-1e9499c21c23b52391d952572bd9059df532efcb.zip chromium_src-1e9499c21c23b52391d952572bd9059df532efcb.tar.gz chromium_src-1e9499c21c23b52391d952572bd9059df532efcb.tar.bz2 |
Allow synchronous messages to be sent from threads other than the main thread. This simplifies code that needs to do this (i.e. webkit db and file threads).
BUG=23423
Review URL: http://codereview.chromium.org/1601005
git-svn-id: svn://svn.chromium.org/chrome/trunk/src@43752 0039d316-1c4b-4281-b951-d872f2087c98
Diffstat (limited to 'ipc')
-rw-r--r-- | ipc/ipc.gypi | 2 | ||||
-rw-r--r-- | ipc/ipc_sync_channel.h | 21 | ||||
-rw-r--r-- | ipc/ipc_sync_channel_unittest.cc | 65 | ||||
-rw-r--r-- | ipc/ipc_sync_message.cc | 10 | ||||
-rw-r--r-- | ipc/ipc_sync_message.h | 17 | ||||
-rw-r--r-- | ipc/ipc_sync_message_filter.cc | 126 | ||||
-rw-r--r-- | ipc/ipc_sync_message_filter.h | 79 |
7 files changed, 295 insertions, 25 deletions
diff --git a/ipc/ipc.gypi b/ipc/ipc.gypi index fb14f1f..52b27bf 100644 --- a/ipc/ipc.gypi +++ b/ipc/ipc.gypi @@ -36,6 +36,8 @@ 'ipc_sync_channel.h', 'ipc_sync_message.cc', 'ipc_sync_message.h', + 'ipc_sync_message_filter.cc', + 'ipc_sync_message_filter.h', ], 'include_dirs': [ '..', diff --git a/ipc/ipc_sync_channel.h b/ipc/ipc_sync_channel.h index cb88f47..e66dd92 100644 --- a/ipc/ipc_sync_channel.h +++ b/ipc/ipc_sync_channel.h @@ -13,6 +13,7 @@ #include "base/ref_counted.h" #include "base/waitable_event_watcher.h" #include "ipc/ipc_channel_proxy.h" +#include "ipc/ipc_sync_message.h" namespace base { class WaitableEvent; @@ -23,8 +24,8 @@ namespace IPC { class SyncMessage; class MessageReplyDeserializer; -// This is similar to IPC::ChannelProxy, with the added feature of supporting -// sending synchronous messages. +// This is similar to ChannelProxy, with the added feature of supporting sending +// synchronous messages. // Note that care must be taken that the lifetime of the ipc_thread argument // is more than this object. If the message loop goes away while this object // is running and it's used to send a message, then it will use the invalid @@ -63,7 +64,7 @@ class SyncChannel : public ChannelProxy, // Adds information about an outgoing sync message to the context so that // we know how to deserialize the reply. - void Push(IPC::SyncMessage* sync_msg); + void Push(SyncMessage* sync_msg); // Cleanly remove the top deserializer (and throw it away). Returns the // result of the Send call for that message. @@ -96,7 +97,7 @@ class SyncChannel : public ChannelProxy, private: ~SyncContext(); - // IPC::ChannelProxy methods that we override. + // ChannelProxy methods that we override. // Called on the listener thread. virtual void Clear(); @@ -113,18 +114,6 @@ class SyncChannel : public ChannelProxy, // WaitableEventWatcher::Delegate implementation. virtual void OnWaitableEventSignaled(base::WaitableEvent* arg); - // When sending a synchronous message, this structure contains an object - // that knows how to deserialize the response. - struct PendingSyncMsg { - PendingSyncMsg(int id, IPC::MessageReplyDeserializer* d, - base::WaitableEvent* e) : - id(id), deserializer(d), done_event(e), send_result(false) { } - int id; - IPC::MessageReplyDeserializer* deserializer; - base::WaitableEvent* done_event; - bool send_result; - }; - typedef std::deque<PendingSyncMsg> PendingSyncMessageQueue; PendingSyncMessageQueue deserializers_; Lock deserializers_lock_; diff --git a/ipc/ipc_sync_channel_unittest.cc b/ipc/ipc_sync_channel_unittest.cc index c9a5c88..6033c21 100644 --- a/ipc/ipc_sync_channel_unittest.cc +++ b/ipc/ipc_sync_channel_unittest.cc @@ -18,6 +18,7 @@ #include "base/waitable_event.h" #include "ipc/ipc_message.h" #include "ipc/ipc_sync_channel.h" +#include "ipc/ipc_sync_message_filter.h" #include "testing/gtest/include/gtest/gtest.h" @@ -120,12 +121,14 @@ class Worker : public Channel::Listener, public Message::Sender { } Channel::Mode mode() { return mode_; } WaitableEvent* done_event() { return done_.get(); } + WaitableEvent* shutdown_event() { return &shutdown_event_; } void ResetChannel() { channel_.reset(); } - - protected: // Derived classes need to call this when they've completed their part of // the test. void Done() { done_->Signal(); } + + protected: + IPC::SyncChannel* channel() { return channel_.get(); } // Functions for dervied classes to implement if they wish. virtual void Run() { } virtual void OnAnswer(int* answer) { NOTREACHED(); } @@ -1054,3 +1057,61 @@ TEST_F(IPCSyncChannelTest, DoneEventRace) { workers.push_back(new SimpleClient()); RunTest(workers); } + +//----------------------------------------------------------------------------- + +namespace { + +class TestSyncMessageFilter : public IPC::SyncMessageFilter { + public: + TestSyncMessageFilter(base::WaitableEvent* shutdown_event, Worker* worker) + : SyncMessageFilter(shutdown_event), + worker_(worker), + thread_("helper_thread") { + base::Thread::Options options; + options.message_loop_type = MessageLoop::TYPE_DEFAULT; + thread_.StartWithOptions(options); + } + + virtual void OnFilterAdded(Channel* channel) { + SyncMessageFilter::OnFilterAdded(channel); + thread_.message_loop()->PostTask(FROM_HERE, NewRunnableMethod( + this, &TestSyncMessageFilter::SendMessageOnHelperThread)); + } + + void SendMessageOnHelperThread() { + int answer = 0; + bool result = Send(new SyncChannelTestMsg_AnswerToLife(&answer)); + DCHECK(result); + DCHECK_EQ(answer, 42); + + worker_->Done(); + } + + Worker* worker_; + base::Thread thread_; +}; + +class SyncMessageFilterServer : public Worker { + public: + SyncMessageFilterServer() + : Worker(Channel::MODE_SERVER, "sync_message_filter_server") { + filter_ = new TestSyncMessageFilter(shutdown_event(), this); + } + + void Run() { + channel()->AddFilter(filter_.get()); + } + + scoped_refptr<TestSyncMessageFilter> filter_; +}; + +} // namespace + +// Tests basic synchronous call +TEST_F(IPCSyncChannelTest, SyncMessageFilter) { + std::vector<Worker*> workers; + workers.push_back(new SyncMessageFilterServer()); + workers.push_back(new SimpleClient()); + RunTest(workers); +} diff --git a/ipc/ipc_sync_message.cc b/ipc/ipc_sync_message.cc index 23f3d16..8ae65fa 100644 --- a/ipc/ipc_sync_message.cc +++ b/ipc/ipc_sync_message.cc @@ -1,4 +1,4 @@ -// Copyright (c) 2006-2008 The Chromium Authors. All rights reserved. +// Copyright (c) 2010 The Chromium Authors. All rights reserved. // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. @@ -9,16 +9,18 @@ #endif #include <stack> +#include "base/atomic_sequence_num.h" #include "base/logging.h" #include "base/waitable_event.h" #include "ipc/ipc_sync_message.h" namespace IPC { -uint32 SyncMessage::next_id_ = 0; #define kSyncMessageHeaderSize 4 -base::WaitableEvent* dummy_event = new base::WaitableEvent(true, true); +static base::AtomicSequenceNumber g_next_id(base::LINKER_INITIALIZED); + +static base::WaitableEvent* dummy_event = new base::WaitableEvent(true, true); SyncMessage::SyncMessage( int32 routing_id, @@ -34,7 +36,7 @@ SyncMessage::SyncMessage( // Add synchronous message data before the message payload. SyncHeader header; - header.message_id = ++next_id_; + header.message_id = g_next_id.GetNext(); WriteSyncHeader(this, header); } diff --git a/ipc/ipc_sync_message.h b/ipc/ipc_sync_message.h index 48e9f99..ea6387a4 100644 --- a/ipc/ipc_sync_message.h +++ b/ipc/ipc_sync_message.h @@ -1,4 +1,4 @@ -// Copyright (c) 2009 The Chromium Authors. All rights reserved. +// Copyright (c) 2010 The Chromium Authors. All rights reserved. // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. @@ -77,8 +77,6 @@ class SyncMessage : public Message { MessageReplyDeserializer* deserializer_; base::WaitableEvent* pump_messages_event_; - - static uint32 next_id_; // for generation of unique ids }; // Used to deserialize parameters from a reply to a synchronous message @@ -92,6 +90,19 @@ class MessageReplyDeserializer { virtual bool SerializeOutputParameters(const Message& msg, void* iter) = 0; }; +// When sending a synchronous message, this structure contains an object +// that knows how to deserialize the response. +struct PendingSyncMsg { + PendingSyncMsg(int id, + MessageReplyDeserializer* d, + base::WaitableEvent* e) + : id(id), deserializer(d), done_event(e), send_result(false) { } + int id; + MessageReplyDeserializer* deserializer; + base::WaitableEvent* done_event; + bool send_result; +}; + } // namespace IPC #endif // IPC_IPC_SYNC_MESSAGE_H_ diff --git a/ipc/ipc_sync_message_filter.cc b/ipc/ipc_sync_message_filter.cc new file mode 100644 index 0000000..db3f86e --- /dev/null +++ b/ipc/ipc_sync_message_filter.cc @@ -0,0 +1,126 @@ +// Copyright (c) 2010 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "ipc/ipc_sync_message_filter.h" + +#include "base/logging.h" +#include "base/message_loop.h" +#include "ipc/ipc_sync_message.h" + +namespace IPC { + +SyncMessageFilter::SyncMessageFilter(base::WaitableEvent* shutdown_event) + : channel_(NULL), + listener_loop_(MessageLoop::current()), + io_loop_(NULL), + shutdown_event_(shutdown_event) { +} + +SyncMessageFilter::~SyncMessageFilter() { +} + +bool SyncMessageFilter::Send(Message* message) { + { + AutoLock auto_lock(lock_); + if (!io_loop_) { + delete message; + return false; + } + } + + if (!message->is_sync()) { + io_loop_->PostTask( + FROM_HERE, + NewRunnableMethod(this, &SyncMessageFilter::SendOnIOThread, message)); + return true; + } + + base::WaitableEvent done_event(true, false); + PendingSyncMsg pending_message( + SyncMessage::GetMessageId(*message), + reinterpret_cast<SyncMessage*>(message)->GetReplyDeserializer(), + &done_event); + + { + AutoLock auto_lock(lock_); + // Can't use this class on the main thread or else it can lead to deadlocks. + // Also by definition, can't use this on IO thread since we're blocking it. + DCHECK(MessageLoop::current() != listener_loop_); + DCHECK(MessageLoop::current() != io_loop_); + pending_sync_messages_[MessageLoop::current()] = &pending_message; + } + + io_loop_->PostTask( + FROM_HERE, + NewRunnableMethod(this, &SyncMessageFilter::SendOnIOThread, message)); + + base::WaitableEvent* events[2] = { shutdown_event_, &done_event }; + base::WaitableEvent::WaitMany(events, 2); + + { + AutoLock auto_lock(lock_); + delete pending_message.deserializer; + pending_sync_messages_.erase(MessageLoop::current()); + } + + return pending_message.send_result; +} + +void SyncMessageFilter::SendOnIOThread(Message* message) { + if (channel_) { + channel_->Send(message); + return; + } + + if (message->is_sync()) { + // We don't know which thread sent it, but it doesn't matter, just signal + // them all. + SignalAllEvents(); + } + + delete message; +} + +void SyncMessageFilter::SignalAllEvents() { + AutoLock auto_lock(lock_); + for (PendingSyncMessages::iterator iter = pending_sync_messages_.begin(); + iter != pending_sync_messages_.end(); ++iter) { + iter->second->done_event->Signal(); + } +} + +void SyncMessageFilter::OnFilterAdded(Channel* channel) { + channel_ = channel; + AutoLock auto_lock(lock_); + io_loop_ = MessageLoop::current(); +} + +void SyncMessageFilter::OnChannelError() { + channel_ = NULL; + SignalAllEvents(); +} + +void SyncMessageFilter::OnChannelClosing() { + channel_ = NULL; + SignalAllEvents(); +} + +bool SyncMessageFilter::OnMessageReceived(const Message& message) { + AutoLock auto_lock(lock_); + for (PendingSyncMessages::iterator iter = pending_sync_messages_.begin(); + iter != pending_sync_messages_.end(); ++iter) { + if (SyncMessage::IsMessageReplyTo(message, iter->second->id)) { + if (!message.is_reply_error()) { + iter->second->send_result = + iter->second->deserializer->SerializeOutputParameters(message); + } + iter->second->done_event->Signal(); + return true; + } + } + + return false; +} + +} // namespace IPC diff --git a/ipc/ipc_sync_message_filter.h b/ipc/ipc_sync_message_filter.h new file mode 100644 index 0000000..71d24c1 --- /dev/null +++ b/ipc/ipc_sync_message_filter.h @@ -0,0 +1,79 @@ +// Copyright (c) 2010 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#ifndef IPC_IPC_SYNC_MESSAGE_FILTER_H_ +#define IPC_IPC_SYNC_MESSAGE_FILTER_H_ + +#include "base/basictypes.h" +#include "base/hash_tables.h" +#include "base/lock.h" +#include "base/ref_counted.h" +#include "base/waitable_event.h" +#include "ipc/ipc_channel_proxy.h" +#include "ipc/ipc_sync_message.h" + +class MessageLoop; + +#if defined(COMPILER_GCC) +// Allows us to use MessageLoop in a hash_map with gcc (MSVC is okay without +// specifying this). +namespace __gnu_cxx { +template<> +struct hash<MessageLoop*> { + size_t operator()(MessageLoop* message_loop) const { + return reinterpret_cast<size_t>(message_loop); + } +}; +} +#endif + +namespace IPC { + +class MessageReplyDeserializer; + +// This MessageFilter allows sending synchronous IPC messages from a thread +// other than the listener thread associated with the SyncChannel. It does not +// support fancy features that SyncChannel does, such as handling recursion or +// receiving messages while waiting for a response. Note that this object can +// be used to send simultaneous synchronous messages from different threads. +class SyncMessageFilter : public ChannelProxy::MessageFilter, + public Message::Sender { + public: + explicit SyncMessageFilter(base::WaitableEvent* shutdown_event); + virtual ~SyncMessageFilter(); + + // Message::Sender implementation. + virtual bool Send(Message* message); + + // ChannelProxy::MessageFilter implementation. + virtual void OnFilterAdded(Channel* channel); + virtual void OnChannelError(); + virtual void OnChannelClosing(); + virtual bool OnMessageReceived(const Message& message); + + private: + void SendOnIOThread(Message* message); + // Signal all the pending sends as done, used in an error condition. + void SignalAllEvents(); + + // The channel to which this filter was added. + Channel* channel_; + + MessageLoop* listener_loop_; // The process's main thread. + MessageLoop* io_loop_; // The message loop where the Channel lives. + + typedef base::hash_map<MessageLoop*, PendingSyncMsg*> PendingSyncMessages; + PendingSyncMessages pending_sync_messages_; + + // Locks data members above. + Lock lock_; + + base::WaitableEvent* shutdown_event_; + + DISALLOW_COPY_AND_ASSIGN(SyncMessageFilter); +}; + +} // namespace IPC + +#endif // IPC_IPC_SYNC_MESSAGE_FILTER_H_ |