summaryrefslogtreecommitdiffstats
path: root/ipc
diff options
context:
space:
mode:
authorjabdelmalek@google.com <jabdelmalek@google.com@0039d316-1c4b-4281-b951-d872f2087c98>2010-04-06 20:33:36 +0000
committerjabdelmalek@google.com <jabdelmalek@google.com@0039d316-1c4b-4281-b951-d872f2087c98>2010-04-06 20:33:36 +0000
commit1e9499c21c23b52391d952572bd9059df532efcb (patch)
treebd96f3ed1bd4226b062085316346c77e6bb999ab /ipc
parentae2e0f96d38961c160bf54962a55eb1f44a0f943 (diff)
downloadchromium_src-1e9499c21c23b52391d952572bd9059df532efcb.zip
chromium_src-1e9499c21c23b52391d952572bd9059df532efcb.tar.gz
chromium_src-1e9499c21c23b52391d952572bd9059df532efcb.tar.bz2
Allow synchronous messages to be sent from threads other than the main thread. This simplifies code that needs to do this (i.e. webkit db and file threads).
BUG=23423 Review URL: http://codereview.chromium.org/1601005 git-svn-id: svn://svn.chromium.org/chrome/trunk/src@43752 0039d316-1c4b-4281-b951-d872f2087c98
Diffstat (limited to 'ipc')
-rw-r--r--ipc/ipc.gypi2
-rw-r--r--ipc/ipc_sync_channel.h21
-rw-r--r--ipc/ipc_sync_channel_unittest.cc65
-rw-r--r--ipc/ipc_sync_message.cc10
-rw-r--r--ipc/ipc_sync_message.h17
-rw-r--r--ipc/ipc_sync_message_filter.cc126
-rw-r--r--ipc/ipc_sync_message_filter.h79
7 files changed, 295 insertions, 25 deletions
diff --git a/ipc/ipc.gypi b/ipc/ipc.gypi
index fb14f1f..52b27bf 100644
--- a/ipc/ipc.gypi
+++ b/ipc/ipc.gypi
@@ -36,6 +36,8 @@
'ipc_sync_channel.h',
'ipc_sync_message.cc',
'ipc_sync_message.h',
+ 'ipc_sync_message_filter.cc',
+ 'ipc_sync_message_filter.h',
],
'include_dirs': [
'..',
diff --git a/ipc/ipc_sync_channel.h b/ipc/ipc_sync_channel.h
index cb88f47..e66dd92 100644
--- a/ipc/ipc_sync_channel.h
+++ b/ipc/ipc_sync_channel.h
@@ -13,6 +13,7 @@
#include "base/ref_counted.h"
#include "base/waitable_event_watcher.h"
#include "ipc/ipc_channel_proxy.h"
+#include "ipc/ipc_sync_message.h"
namespace base {
class WaitableEvent;
@@ -23,8 +24,8 @@ namespace IPC {
class SyncMessage;
class MessageReplyDeserializer;
-// This is similar to IPC::ChannelProxy, with the added feature of supporting
-// sending synchronous messages.
+// This is similar to ChannelProxy, with the added feature of supporting sending
+// synchronous messages.
// Note that care must be taken that the lifetime of the ipc_thread argument
// is more than this object. If the message loop goes away while this object
// is running and it's used to send a message, then it will use the invalid
@@ -63,7 +64,7 @@ class SyncChannel : public ChannelProxy,
// Adds information about an outgoing sync message to the context so that
// we know how to deserialize the reply.
- void Push(IPC::SyncMessage* sync_msg);
+ void Push(SyncMessage* sync_msg);
// Cleanly remove the top deserializer (and throw it away). Returns the
// result of the Send call for that message.
@@ -96,7 +97,7 @@ class SyncChannel : public ChannelProxy,
private:
~SyncContext();
- // IPC::ChannelProxy methods that we override.
+ // ChannelProxy methods that we override.
// Called on the listener thread.
virtual void Clear();
@@ -113,18 +114,6 @@ class SyncChannel : public ChannelProxy,
// WaitableEventWatcher::Delegate implementation.
virtual void OnWaitableEventSignaled(base::WaitableEvent* arg);
- // When sending a synchronous message, this structure contains an object
- // that knows how to deserialize the response.
- struct PendingSyncMsg {
- PendingSyncMsg(int id, IPC::MessageReplyDeserializer* d,
- base::WaitableEvent* e) :
- id(id), deserializer(d), done_event(e), send_result(false) { }
- int id;
- IPC::MessageReplyDeserializer* deserializer;
- base::WaitableEvent* done_event;
- bool send_result;
- };
-
typedef std::deque<PendingSyncMsg> PendingSyncMessageQueue;
PendingSyncMessageQueue deserializers_;
Lock deserializers_lock_;
diff --git a/ipc/ipc_sync_channel_unittest.cc b/ipc/ipc_sync_channel_unittest.cc
index c9a5c88..6033c21 100644
--- a/ipc/ipc_sync_channel_unittest.cc
+++ b/ipc/ipc_sync_channel_unittest.cc
@@ -18,6 +18,7 @@
#include "base/waitable_event.h"
#include "ipc/ipc_message.h"
#include "ipc/ipc_sync_channel.h"
+#include "ipc/ipc_sync_message_filter.h"
#include "testing/gtest/include/gtest/gtest.h"
@@ -120,12 +121,14 @@ class Worker : public Channel::Listener, public Message::Sender {
}
Channel::Mode mode() { return mode_; }
WaitableEvent* done_event() { return done_.get(); }
+ WaitableEvent* shutdown_event() { return &shutdown_event_; }
void ResetChannel() { channel_.reset(); }
-
- protected:
// Derived classes need to call this when they've completed their part of
// the test.
void Done() { done_->Signal(); }
+
+ protected:
+ IPC::SyncChannel* channel() { return channel_.get(); }
// Functions for dervied classes to implement if they wish.
virtual void Run() { }
virtual void OnAnswer(int* answer) { NOTREACHED(); }
@@ -1054,3 +1057,61 @@ TEST_F(IPCSyncChannelTest, DoneEventRace) {
workers.push_back(new SimpleClient());
RunTest(workers);
}
+
+//-----------------------------------------------------------------------------
+
+namespace {
+
+class TestSyncMessageFilter : public IPC::SyncMessageFilter {
+ public:
+ TestSyncMessageFilter(base::WaitableEvent* shutdown_event, Worker* worker)
+ : SyncMessageFilter(shutdown_event),
+ worker_(worker),
+ thread_("helper_thread") {
+ base::Thread::Options options;
+ options.message_loop_type = MessageLoop::TYPE_DEFAULT;
+ thread_.StartWithOptions(options);
+ }
+
+ virtual void OnFilterAdded(Channel* channel) {
+ SyncMessageFilter::OnFilterAdded(channel);
+ thread_.message_loop()->PostTask(FROM_HERE, NewRunnableMethod(
+ this, &TestSyncMessageFilter::SendMessageOnHelperThread));
+ }
+
+ void SendMessageOnHelperThread() {
+ int answer = 0;
+ bool result = Send(new SyncChannelTestMsg_AnswerToLife(&answer));
+ DCHECK(result);
+ DCHECK_EQ(answer, 42);
+
+ worker_->Done();
+ }
+
+ Worker* worker_;
+ base::Thread thread_;
+};
+
+class SyncMessageFilterServer : public Worker {
+ public:
+ SyncMessageFilterServer()
+ : Worker(Channel::MODE_SERVER, "sync_message_filter_server") {
+ filter_ = new TestSyncMessageFilter(shutdown_event(), this);
+ }
+
+ void Run() {
+ channel()->AddFilter(filter_.get());
+ }
+
+ scoped_refptr<TestSyncMessageFilter> filter_;
+};
+
+} // namespace
+
+// Tests basic synchronous call
+TEST_F(IPCSyncChannelTest, SyncMessageFilter) {
+ std::vector<Worker*> workers;
+ workers.push_back(new SyncMessageFilterServer());
+ workers.push_back(new SimpleClient());
+ RunTest(workers);
+}
diff --git a/ipc/ipc_sync_message.cc b/ipc/ipc_sync_message.cc
index 23f3d16..8ae65fa 100644
--- a/ipc/ipc_sync_message.cc
+++ b/ipc/ipc_sync_message.cc
@@ -1,4 +1,4 @@
-// Copyright (c) 2006-2008 The Chromium Authors. All rights reserved.
+// Copyright (c) 2010 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
@@ -9,16 +9,18 @@
#endif
#include <stack>
+#include "base/atomic_sequence_num.h"
#include "base/logging.h"
#include "base/waitable_event.h"
#include "ipc/ipc_sync_message.h"
namespace IPC {
-uint32 SyncMessage::next_id_ = 0;
#define kSyncMessageHeaderSize 4
-base::WaitableEvent* dummy_event = new base::WaitableEvent(true, true);
+static base::AtomicSequenceNumber g_next_id(base::LINKER_INITIALIZED);
+
+static base::WaitableEvent* dummy_event = new base::WaitableEvent(true, true);
SyncMessage::SyncMessage(
int32 routing_id,
@@ -34,7 +36,7 @@ SyncMessage::SyncMessage(
// Add synchronous message data before the message payload.
SyncHeader header;
- header.message_id = ++next_id_;
+ header.message_id = g_next_id.GetNext();
WriteSyncHeader(this, header);
}
diff --git a/ipc/ipc_sync_message.h b/ipc/ipc_sync_message.h
index 48e9f99..ea6387a4 100644
--- a/ipc/ipc_sync_message.h
+++ b/ipc/ipc_sync_message.h
@@ -1,4 +1,4 @@
-// Copyright (c) 2009 The Chromium Authors. All rights reserved.
+// Copyright (c) 2010 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
@@ -77,8 +77,6 @@ class SyncMessage : public Message {
MessageReplyDeserializer* deserializer_;
base::WaitableEvent* pump_messages_event_;
-
- static uint32 next_id_; // for generation of unique ids
};
// Used to deserialize parameters from a reply to a synchronous message
@@ -92,6 +90,19 @@ class MessageReplyDeserializer {
virtual bool SerializeOutputParameters(const Message& msg, void* iter) = 0;
};
+// When sending a synchronous message, this structure contains an object
+// that knows how to deserialize the response.
+struct PendingSyncMsg {
+ PendingSyncMsg(int id,
+ MessageReplyDeserializer* d,
+ base::WaitableEvent* e)
+ : id(id), deserializer(d), done_event(e), send_result(false) { }
+ int id;
+ MessageReplyDeserializer* deserializer;
+ base::WaitableEvent* done_event;
+ bool send_result;
+};
+
} // namespace IPC
#endif // IPC_IPC_SYNC_MESSAGE_H_
diff --git a/ipc/ipc_sync_message_filter.cc b/ipc/ipc_sync_message_filter.cc
new file mode 100644
index 0000000..db3f86e
--- /dev/null
+++ b/ipc/ipc_sync_message_filter.cc
@@ -0,0 +1,126 @@
+// Copyright (c) 2010 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "ipc/ipc_sync_message_filter.h"
+
+#include "base/logging.h"
+#include "base/message_loop.h"
+#include "ipc/ipc_sync_message.h"
+
+namespace IPC {
+
+SyncMessageFilter::SyncMessageFilter(base::WaitableEvent* shutdown_event)
+ : channel_(NULL),
+ listener_loop_(MessageLoop::current()),
+ io_loop_(NULL),
+ shutdown_event_(shutdown_event) {
+}
+
+SyncMessageFilter::~SyncMessageFilter() {
+}
+
+bool SyncMessageFilter::Send(Message* message) {
+ {
+ AutoLock auto_lock(lock_);
+ if (!io_loop_) {
+ delete message;
+ return false;
+ }
+ }
+
+ if (!message->is_sync()) {
+ io_loop_->PostTask(
+ FROM_HERE,
+ NewRunnableMethod(this, &SyncMessageFilter::SendOnIOThread, message));
+ return true;
+ }
+
+ base::WaitableEvent done_event(true, false);
+ PendingSyncMsg pending_message(
+ SyncMessage::GetMessageId(*message),
+ reinterpret_cast<SyncMessage*>(message)->GetReplyDeserializer(),
+ &done_event);
+
+ {
+ AutoLock auto_lock(lock_);
+ // Can't use this class on the main thread or else it can lead to deadlocks.
+ // Also by definition, can't use this on IO thread since we're blocking it.
+ DCHECK(MessageLoop::current() != listener_loop_);
+ DCHECK(MessageLoop::current() != io_loop_);
+ pending_sync_messages_[MessageLoop::current()] = &pending_message;
+ }
+
+ io_loop_->PostTask(
+ FROM_HERE,
+ NewRunnableMethod(this, &SyncMessageFilter::SendOnIOThread, message));
+
+ base::WaitableEvent* events[2] = { shutdown_event_, &done_event };
+ base::WaitableEvent::WaitMany(events, 2);
+
+ {
+ AutoLock auto_lock(lock_);
+ delete pending_message.deserializer;
+ pending_sync_messages_.erase(MessageLoop::current());
+ }
+
+ return pending_message.send_result;
+}
+
+void SyncMessageFilter::SendOnIOThread(Message* message) {
+ if (channel_) {
+ channel_->Send(message);
+ return;
+ }
+
+ if (message->is_sync()) {
+ // We don't know which thread sent it, but it doesn't matter, just signal
+ // them all.
+ SignalAllEvents();
+ }
+
+ delete message;
+}
+
+void SyncMessageFilter::SignalAllEvents() {
+ AutoLock auto_lock(lock_);
+ for (PendingSyncMessages::iterator iter = pending_sync_messages_.begin();
+ iter != pending_sync_messages_.end(); ++iter) {
+ iter->second->done_event->Signal();
+ }
+}
+
+void SyncMessageFilter::OnFilterAdded(Channel* channel) {
+ channel_ = channel;
+ AutoLock auto_lock(lock_);
+ io_loop_ = MessageLoop::current();
+}
+
+void SyncMessageFilter::OnChannelError() {
+ channel_ = NULL;
+ SignalAllEvents();
+}
+
+void SyncMessageFilter::OnChannelClosing() {
+ channel_ = NULL;
+ SignalAllEvents();
+}
+
+bool SyncMessageFilter::OnMessageReceived(const Message& message) {
+ AutoLock auto_lock(lock_);
+ for (PendingSyncMessages::iterator iter = pending_sync_messages_.begin();
+ iter != pending_sync_messages_.end(); ++iter) {
+ if (SyncMessage::IsMessageReplyTo(message, iter->second->id)) {
+ if (!message.is_reply_error()) {
+ iter->second->send_result =
+ iter->second->deserializer->SerializeOutputParameters(message);
+ }
+ iter->second->done_event->Signal();
+ return true;
+ }
+ }
+
+ return false;
+}
+
+} // namespace IPC
diff --git a/ipc/ipc_sync_message_filter.h b/ipc/ipc_sync_message_filter.h
new file mode 100644
index 0000000..71d24c1
--- /dev/null
+++ b/ipc/ipc_sync_message_filter.h
@@ -0,0 +1,79 @@
+// Copyright (c) 2010 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef IPC_IPC_SYNC_MESSAGE_FILTER_H_
+#define IPC_IPC_SYNC_MESSAGE_FILTER_H_
+
+#include "base/basictypes.h"
+#include "base/hash_tables.h"
+#include "base/lock.h"
+#include "base/ref_counted.h"
+#include "base/waitable_event.h"
+#include "ipc/ipc_channel_proxy.h"
+#include "ipc/ipc_sync_message.h"
+
+class MessageLoop;
+
+#if defined(COMPILER_GCC)
+// Allows us to use MessageLoop in a hash_map with gcc (MSVC is okay without
+// specifying this).
+namespace __gnu_cxx {
+template<>
+struct hash<MessageLoop*> {
+ size_t operator()(MessageLoop* message_loop) const {
+ return reinterpret_cast<size_t>(message_loop);
+ }
+};
+}
+#endif
+
+namespace IPC {
+
+class MessageReplyDeserializer;
+
+// This MessageFilter allows sending synchronous IPC messages from a thread
+// other than the listener thread associated with the SyncChannel. It does not
+// support fancy features that SyncChannel does, such as handling recursion or
+// receiving messages while waiting for a response. Note that this object can
+// be used to send simultaneous synchronous messages from different threads.
+class SyncMessageFilter : public ChannelProxy::MessageFilter,
+ public Message::Sender {
+ public:
+ explicit SyncMessageFilter(base::WaitableEvent* shutdown_event);
+ virtual ~SyncMessageFilter();
+
+ // Message::Sender implementation.
+ virtual bool Send(Message* message);
+
+ // ChannelProxy::MessageFilter implementation.
+ virtual void OnFilterAdded(Channel* channel);
+ virtual void OnChannelError();
+ virtual void OnChannelClosing();
+ virtual bool OnMessageReceived(const Message& message);
+
+ private:
+ void SendOnIOThread(Message* message);
+ // Signal all the pending sends as done, used in an error condition.
+ void SignalAllEvents();
+
+ // The channel to which this filter was added.
+ Channel* channel_;
+
+ MessageLoop* listener_loop_; // The process's main thread.
+ MessageLoop* io_loop_; // The message loop where the Channel lives.
+
+ typedef base::hash_map<MessageLoop*, PendingSyncMsg*> PendingSyncMessages;
+ PendingSyncMessages pending_sync_messages_;
+
+ // Locks data members above.
+ Lock lock_;
+
+ base::WaitableEvent* shutdown_event_;
+
+ DISALLOW_COPY_AND_ASSIGN(SyncMessageFilter);
+};
+
+} // namespace IPC
+
+#endif // IPC_IPC_SYNC_MESSAGE_FILTER_H_