summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorjam@chromium.org <jam@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98>2008-10-24 19:21:13 +0000
committerjam@chromium.org <jam@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98>2008-10-24 19:21:13 +0000
commit3cdb7af81de506be730544edcfe4a5547f0fdaea (patch)
tree223c1e000b0eccf637565345904fb4055e41c5ea
parent8947da6c7cd943bcb4be80a258e263a84aa15006 (diff)
downloadchromium_src-3cdb7af81de506be730544edcfe4a5547f0fdaea.zip
chromium_src-3cdb7af81de506be730544edcfe4a5547f0fdaea.tar.gz
chromium_src-3cdb7af81de506be730544edcfe4a5547f0fdaea.tar.bz2
Make IPC::SyncChannel not duplicate the underlying MessageLoop implementation by pumping messages on its own. This fixes the problem of windowless plugins not painting on right click, and generally makes this class almost ported, other than using a generic version of events/locks.Through this change I've also cleaned up the class and hopefully made it more understandable.
Review URL: http://codereview.chromium.org/8001 git-svn-id: svn://svn.chromium.org/chrome/trunk/src@3934 0039d316-1c4b-4281-b951-d872f2087c98
-rw-r--r--base/object_watcher.cc7
-rw-r--r--base/object_watcher.h4
-rw-r--r--base/object_watcher_unittest.cc3
-rw-r--r--chrome/common/ipc_channel_proxy.cc24
-rw-r--r--chrome/common/ipc_channel_proxy.h24
-rw-r--r--chrome/common/ipc_sync_channel.cc406
-rw-r--r--chrome/common/ipc_sync_channel.h93
-rw-r--r--chrome/common/ipc_sync_channel_unittest.cc477
-rw-r--r--chrome/common/ipc_sync_message.h5
9 files changed, 668 insertions, 375 deletions
diff --git a/base/object_watcher.cc b/base/object_watcher.cc
index 4e6e57a..cad281f 100644
--- a/base/object_watcher.cc
+++ b/base/object_watcher.cc
@@ -107,6 +107,13 @@ bool ObjectWatcher::StopWatching() {
return true;
}
+HANDLE ObjectWatcher::GetWatchedObject() {
+ if (!watch_)
+ return NULL;
+
+ return watch_->object;
+}
+
// static
void CALLBACK ObjectWatcher::DoneWaiting(void* param, BOOLEAN timed_out) {
DCHECK(!timed_out);
diff --git a/base/object_watcher.h b/base/object_watcher.h
index 3a0b9a9..4660ce7 100644
--- a/base/object_watcher.h
+++ b/base/object_watcher.h
@@ -68,6 +68,10 @@ class ObjectWatcher : public MessageLoop::DestructionObserver {
//
bool StopWatching();
+ // Returns the handle of the object being watched, or NULL if the object
+ // watcher is stopped.
+ HANDLE GetWatchedObject();
+
private:
// Called on a background thread when done waiting.
static void CALLBACK DoneWaiting(void* param, BOOLEAN timed_out);
diff --git a/base/object_watcher_unittest.cc b/base/object_watcher_unittest.cc
index 3d2b068..06e7284 100644
--- a/base/object_watcher_unittest.cc
+++ b/base/object_watcher_unittest.cc
@@ -34,6 +34,7 @@ void RunTest_BasicSignal(MessageLoop::Type message_loop_type) {
MessageLoop message_loop(message_loop_type);
base::ObjectWatcher watcher;
+ EXPECT_EQ(NULL, watcher.GetWatchedObject());
// A manual-reset event that is not yet signaled.
HANDLE event = CreateEvent(NULL, TRUE, FALSE, NULL);
@@ -41,11 +42,13 @@ void RunTest_BasicSignal(MessageLoop::Type message_loop_type) {
QuitDelegate delegate;
bool ok = watcher.StartWatching(event, &delegate);
EXPECT_TRUE(ok);
+ EXPECT_EQ(event, watcher.GetWatchedObject());
SetEvent(event);
MessageLoop::current()->Run();
+ EXPECT_EQ(NULL, watcher.GetWatchedObject());
CloseHandle(event);
}
diff --git a/chrome/common/ipc_channel_proxy.cc b/chrome/common/ipc_channel_proxy.cc
index baac273..23d8ea9 100644
--- a/chrome/common/ipc_channel_proxy.cc
+++ b/chrome/common/ipc_channel_proxy.cc
@@ -52,14 +52,16 @@ bool ChannelProxy::Context::TryFilters(const Message& message) {
// Called on the IPC::Channel thread
void ChannelProxy::Context::OnMessageReceived(const Message& message) {
// First give a chance to the filters to process this message.
- if (TryFilters(message))
- return;
+ if (!TryFilters(message))
+ OnMessageReceivedNoFilter(message);
+}
+// Called on the IPC::Channel thread
+void ChannelProxy::Context::OnMessageReceivedNoFilter(const Message& message) {
// NOTE: This code relies on the listener's message loop not going away while
// this thread is active. That should be a reasonable assumption, but it
// feels risky. We may want to invent some more indirect way of referring to
// a MessageLoop if this becomes a problem.
-
listener_message_loop_->PostTask(FROM_HERE, NewRunnableMethod(
this, &Context::OnDispatchMessage, message));
}
@@ -82,7 +84,7 @@ void ChannelProxy::Context::OnChannelError() {
}
// Called on the IPC::Channel thread
-void ChannelProxy::Context::OnOpenChannel() {
+void ChannelProxy::Context::OnChannelOpened() {
DCHECK(channel_ != NULL);
// Assume a reference to ourselves on behalf of this thread. This reference
@@ -99,7 +101,7 @@ void ChannelProxy::Context::OnOpenChannel() {
}
// Called on the IPC::Channel thread
-void ChannelProxy::Context::OnCloseChannel() {
+void ChannelProxy::Context::OnChannelClosed() {
// It's okay for IPC::ChannelProxy::Close to be called more than once, which
// would result in this branch being taken.
if (!channel_)
@@ -220,22 +222,18 @@ void ChannelProxy::Init(const std::wstring& channel_id, Channel::Mode mode,
// complete initialization on the background thread
context_->ipc_message_loop()->PostTask(FROM_HERE, NewRunnableMethod(
- context_.get(), &Context::OnOpenChannel));
+ context_.get(), &Context::OnChannelOpened));
}
void ChannelProxy::Close() {
// Clear the backpointer to the listener so that any pending calls to
// Context::OnDispatchMessage or OnDispatchError will be ignored. It is
// possible that the channel could be closed while it is receiving messages!
- context_->clear();
+ context_->Clear();
- if (MessageLoop::current() == context_->ipc_message_loop()) {
- // We're being destructed on the IPC thread, so no need to use the message
- // loop as it might go away.
- context_->OnCloseChannel();
- } else {
+ if (context_->ipc_message_loop()) {
context_->ipc_message_loop()->PostTask(FROM_HERE, NewRunnableMethod(
- context_.get(), &Context::OnCloseChannel));
+ context_.get(), &Context::OnChannelClosed));
}
}
diff --git a/chrome/common/ipc_channel_proxy.h b/chrome/common/ipc_channel_proxy.h
index 8a722d9..59c8e47 100644
--- a/chrome/common/ipc_channel_proxy.h
+++ b/chrome/common/ipc_channel_proxy.h
@@ -112,8 +112,6 @@ class ChannelProxy : public Message::Sender {
void RemoveFilter(MessageFilter* filter);
protected:
- Channel::Listener* listener() const { return context_->listener(); }
-
class Context;
// A subclass uses this constructor if it needs to add more information
// to the internal state. If create_pipe_now is true, the pipe is created
@@ -129,6 +127,7 @@ class ChannelProxy : public Message::Sender {
Context(Channel::Listener* listener, MessageFilter* filter,
MessageLoop* ipc_thread);
virtual ~Context() { }
+ MessageLoop* ipc_message_loop() const { return ipc_message_loop_; }
protected:
// IPC::Channel::Listener methods:
@@ -136,6 +135,9 @@ class ChannelProxy : public Message::Sender {
virtual void OnChannelConnected(int32 peer_pid);
virtual void OnChannelError();
+ // Like OnMessageReceived but doesn't try the filters.
+ void OnMessageReceivedNoFilter(const Message& message);
+
Channel::Listener* listener() const { return listener_; }
const std::wstring& channel_id() const { return channel_id_; }
@@ -143,14 +145,21 @@ class ChannelProxy : public Message::Sender {
// Returns true if the message was processed, false otherwise.
bool TryFilters(const Message& message);
+ // Like Open and Close, but called on the IPC thread.
+ virtual void OnChannelOpened();
+ virtual void OnChannelClosed();
+
+ // Called on the consumers thread when the ChannelProxy is closed. At that
+ // point the consumer is telling us that they don't want to receive any
+ // more messages, so we honor that wish by forgetting them!
+ virtual void Clear() { listener_ = NULL; }
+
private:
friend class ChannelProxy;
// Create the Channel
void CreateChannel(const std::wstring& id, const Channel::Mode& mode);
// Methods called via InvokeLater:
- void OnOpenChannel();
- void OnCloseChannel();
void OnSendMessage(Message* message_ptr);
void OnAddFilter(MessageFilter* filter);
void OnRemoveFilter(MessageFilter* filter);
@@ -158,13 +167,6 @@ class ChannelProxy : public Message::Sender {
void OnDispatchConnected(int32 peer_pid);
void OnDispatchError();
- MessageLoop* ipc_message_loop() const { return ipc_message_loop_; }
-
- // Called on the consumers thread when the ChannelProxy is closed. At that
- // point the consumer is telling us that they don't want to receive any
- // more messages, so we honor that wish by forgetting them!
- void clear() { listener_ = NULL; }
-
MessageLoop* listener_message_loop_;
Channel::Listener* listener_;
diff --git a/chrome/common/ipc_sync_channel.cc b/chrome/common/ipc_sync_channel.cc
index fec8965..01ac78e 100644
--- a/chrome/common/ipc_sync_channel.cc
+++ b/chrome/common/ipc_sync_channel.cc
@@ -37,17 +37,21 @@ class SyncChannel::ReceivedSyncMsgQueue;
class SyncChannel::ReceivedSyncMsgQueue :
public base::RefCountedThreadSafe<ReceivedSyncMsgQueue> {
public:
- ReceivedSyncMsgQueue() :
- blocking_event_(CreateEvent(NULL, FALSE, FALSE, NULL)),
- task_pending_(false),
- listener_message_loop_(MessageLoop::current()) {
+ // Returns the ReceivedSyncMsgQueue instance for this thread, creating one
+ // if necessary. Call RemoveListener on the same thread when done.
+ static ReceivedSyncMsgQueue* AddListener() {
+ // We want one ReceivedSyncMsgQueue per listener thread (i.e. since multiple
+ // SyncChannel objects can block the same thread).
+ ReceivedSyncMsgQueue* rv = lazy_tls_ptr_.Pointer()->Get();
+ if (!rv) {
+ rv = new ReceivedSyncMsgQueue();
+ ReceivedSyncMsgQueue::lazy_tls_ptr_.Pointer()->Set(rv);
+ }
+ rv->listener_count_++;
+ return rv;
}
~ReceivedSyncMsgQueue() {
- DCHECK(lazy_tls_ptr_.Pointer()->Get());
- DCHECK(MessageLoop::current() == listener_message_loop_);
- CloseHandle(blocking_event_);
- lazy_tls_ptr_.Pointer()->Set(NULL);
}
// Called on IPC thread when a synchronous message or reply arrives.
@@ -66,7 +70,7 @@ class SyncChannel::ReceivedSyncMsgQueue :
channel_id));
}
- SetEvent(blocking_event_);
+ SetEvent(dispatch_event_);
if (!was_task_pending) {
listener_message_loop_->PostTask(FROM_HERE, NewRunnableMethod(
this, &ReceivedSyncMsgQueue::DispatchMessagesTask));
@@ -105,7 +109,7 @@ class SyncChannel::ReceivedSyncMsgQueue :
}
#ifdef IPC_MESSAGE_LOG_ENABLED
- IPC::Logging* logger = IPC::Logging::current();
+ Logging* logger = Logging::current();
if (logger->Enabled())
logger->OnPreDispatchMessage(*message);
#endif
@@ -123,7 +127,7 @@ class SyncChannel::ReceivedSyncMsgQueue :
}
// Called on the IPC thread when the current sync Send() call is unblocked.
- void OnUnblock() {
+ void DidUnblock() {
if (!received_replies_.empty()) {
MessageLoop::current()->PostTask(FROM_HERE, NewRunnableMethod(
this, &ReceivedSyncMsgQueue::DispatchReplies));
@@ -149,9 +153,14 @@ class SyncChannel::ReceivedSyncMsgQueue :
message_queue_.push(temp_queue.front());
temp_queue.pop();
}
+
+ if (--listener_count_ == 0) {
+ DCHECK(lazy_tls_ptr_.Pointer()->Get());
+ lazy_tls_ptr_.Pointer()->Set(NULL);
+ }
}
- HANDLE blocking_event() { return blocking_event_; }
+ HANDLE dispatch_event() { return dispatch_event_; }
MessageLoop* listener_message_loop() { return listener_message_loop_; }
// Holds a pointer to the per-thread ReceivedSyncMsgQueue object.
@@ -159,12 +168,19 @@ class SyncChannel::ReceivedSyncMsgQueue :
lazy_tls_ptr_;
private:
+ ReceivedSyncMsgQueue() :
+ dispatch_event_(CreateEvent(NULL, TRUE, FALSE, NULL)),
+ task_pending_(false),
+ listener_message_loop_(MessageLoop::current()),
+ listener_count_(0) {
+ }
+
// Called on the ipc thread to check if we can unblock any current Send()
// calls based on a queued reply.
void DispatchReplies() {
for (size_t i = 0; i < received_replies_.size(); ++i) {
Message* message = received_replies_[i].message;
- if (received_replies_[i].context->UnblockListener(message)) {
+ if (received_replies_[i].context->TryToUnblockListener(message)) {
delete message;
received_replies_.erase(received_replies_.begin() + i);
return;
@@ -172,13 +188,6 @@ class SyncChannel::ReceivedSyncMsgQueue :
}
}
- // Set when we got a synchronous message that we must respond to as the
- // sender needs its reply before it can reply to our original synchronous
- // message.
- HANDLE blocking_event_;
-
- MessageLoop* listener_message_loop_;
-
// Holds information about a queued synchronous message.
struct ReceivedMessage {
ReceivedMessage(Message* m, Channel::Listener* l, const std::wstring& i)
@@ -190,8 +199,6 @@ class SyncChannel::ReceivedSyncMsgQueue :
typedef std::queue<ReceivedMessage> SyncMessageQueue;
SyncMessageQueue message_queue_;
- Lock message_lock_;
- bool task_pending_;
// Holds information about a queued reply message.
struct Reply {
@@ -204,6 +211,15 @@ class SyncChannel::ReceivedSyncMsgQueue :
};
std::vector<Reply> received_replies_;
+
+ // Set when we got a synchronous message that we must respond to as the
+ // sender needs its reply before it can reply to our original synchronous
+ // message.
+ ScopedHandle dispatch_event_;
+ MessageLoop* listener_message_loop_;
+ Lock message_lock_;
+ bool task_pending_;
+ int listener_count_;
};
base::LazyInstance<base::ThreadLocalPointer<SyncChannel::ReceivedSyncMsgQueue> >
@@ -212,120 +228,89 @@ base::LazyInstance<base::ThreadLocalPointer<SyncChannel::ReceivedSyncMsgQueue> >
SyncChannel::SyncContext::SyncContext(
Channel::Listener* listener,
MessageFilter* filter,
- MessageLoop* ipc_thread)
+ MessageLoop* ipc_thread,
+ HANDLE shutdown_event)
: ChannelProxy::Context(listener, filter, ipc_thread),
- channel_closed_(false),
- reply_deserialize_result_(false) {
- // We want one ReceivedSyncMsgQueue per listener thread (i.e. since multiple
- // SyncChannel objects that can block the same thread).
- received_sync_msgs_ = ReceivedSyncMsgQueue::lazy_tls_ptr_.Pointer()->Get();
-
- if (!received_sync_msgs_) {
- // Stash a pointer to the listener thread's ReceivedSyncMsgQueue, as we
- // need to be able to access it in the IPC thread.
- received_sync_msgs_ = new ReceivedSyncMsgQueue();
- ReceivedSyncMsgQueue::lazy_tls_ptr_.Pointer()->Set(received_sync_msgs_);
- }
-
- // Addref manually so that we can ensure destruction on the listener thread
- // (so that the TLS object is NULLd).
- received_sync_msgs_->AddRef();
+ shutdown_event_(shutdown_event),
+ received_sync_msgs_(ReceivedSyncMsgQueue::AddListener()){
}
SyncChannel::SyncContext::~SyncContext() {
while (!deserializers_.empty())
- PopDeserializer(true);
-
- received_sync_msgs_->listener_message_loop()->ReleaseSoon(
- FROM_HERE, received_sync_msgs_);
+ Pop();
}
// Adds information about an outgoing sync message to the context so that
// we know how to deserialize the reply. Returns a handle that's set when
// the reply has arrived.
-HANDLE SyncChannel::SyncContext::Push(IPC::SyncMessage* sync_msg) {
- PendingSyncMsg pending(IPC::SyncMessage::GetMessageId(*sync_msg),
+void SyncChannel::SyncContext::Push(SyncMessage* sync_msg) {
+ PendingSyncMsg pending(SyncMessage::GetMessageId(*sync_msg),
sync_msg->GetReplyDeserializer(),
CreateEvent(NULL, FALSE, FALSE, NULL));
AutoLock auto_lock(deserializers_lock_);
- deserializers_.push(pending);
+ deserializers_.push_back(pending);
+}
+
+bool SyncChannel::SyncContext::Pop() {
+ AutoLock auto_lock(deserializers_lock_);
+ PendingSyncMsg msg = deserializers_.back();
+ delete msg.deserializer;
+ CloseHandle(msg.done_event);
+ deserializers_.pop_back();
+ return msg.send_result;
+}
- return pending.reply_event;
+HANDLE SyncChannel::SyncContext::GetSendDoneEvent() {
+ AutoLock auto_lock(deserializers_lock_);
+ return deserializers_.back().done_event;
}
-HANDLE SyncChannel::SyncContext::blocking_event() {
- return received_sync_msgs_->blocking_event();
+HANDLE SyncChannel::SyncContext::GetDispatchEvent() {
+ return received_sync_msgs_->dispatch_event();
}
void SyncChannel::SyncContext::DispatchMessages() {
received_sync_msgs_->DispatchMessages();
}
-void SyncChannel::SyncContext::RemoveListener(Channel::Listener* listener) {
- received_sync_msgs_->RemoveListener(listener);
-}
-
-bool SyncChannel::SyncContext::UnblockListener(const Message* msg) {
- bool rv = false;
- HANDLE reply_event = NULL;
+bool SyncChannel::SyncContext::TryToUnblockListener(const Message* msg) {
{
- if (channel_closed_) {
- // The channel is closed, or we couldn't connect, so cancel all Send()
- // calls.
- reply_deserialize_result_ = false;
- {
- AutoLock auto_lock(deserializers_lock_);
- if (!deserializers_.empty())
- reply_event = deserializers_.top().reply_event;
- }
+ AutoLock auto_lock(deserializers_lock_);
+ if (deserializers_.empty() ||
+ !SyncMessage::IsMessageReplyTo(*msg, deserializers_.back().id)) {
+ return false;
+ }
- if (reply_event)
- PopDeserializer(false);
- } else {
- {
- AutoLock auto_lock(deserializers_lock_);
- if (deserializers_.empty())
- return false;
-
- if (!IPC::SyncMessage::IsMessageReplyTo(*msg, deserializers_.top().id))
- return false;
-
- rv = true;
- if (msg->is_reply_error()) {
- reply_deserialize_result_ = false;
- } else {
- reply_deserialize_result_ = deserializers_.top().deserializer->
- SerializeOutputParameters(*msg);
- }
-
- // Can't CloseHandle the event just yet, since doing so might cause the
- // Wait call above to never return.
- reply_event = deserializers_.top().reply_event;
- }
- PopDeserializer(false);
+ if (!msg->is_reply_error()) {
+ deserializers_.back().send_result = deserializers_.back().deserializer->
+ SerializeOutputParameters(*msg);
}
+ SetEvent(deserializers_.back().done_event);
}
- if (reply_event)
- SetEvent(reply_event);
-
// We got a reply to a synchronous Send() call that's blocking the listener
// thread. However, further down the call stack there could be another
// blocking Send() call, whose reply we received after we made this last
// Send() call. So check if we have any queued replies available that
// can now unblock the listener thread.
- received_sync_msgs_->OnUnblock();
+ received_sync_msgs_->DidUnblock();
+
+ return true;
+}
+
+void SyncChannel::SyncContext::Clear() {
+ CancelPendingSends();
+ received_sync_msgs_->RemoveListener(listener());
- return rv;
+ Context::Clear();
}
-// Called on the IPC thread.
void SyncChannel::SyncContext::OnMessageReceived(const Message& msg) {
// Give the filters a chance at processing this message.
if (TryFilters(msg))
return;
- if (UnblockListener(&msg))
+ if (TryToUnblockListener(&msg))
return;
if (msg.should_unblock()) {
@@ -338,149 +323,158 @@ void SyncChannel::SyncContext::OnMessageReceived(const Message& msg) {
return;
}
- return Context::OnMessageReceived(msg);
+ return Context::OnMessageReceivedNoFilter(msg);
}
-// Called on the IPC thread.
void SyncChannel::SyncContext::OnChannelError() {
- channel_closed_ = true;
- UnblockListener(NULL);
-
+ CancelPendingSends();
Context::OnChannelError();
}
-void SyncChannel::SyncContext::PopDeserializer(bool close_reply_event) {
- PendingSyncMsg msg = deserializers_.top();
- delete msg.deserializer;
- if (close_reply_event)
- CloseHandle(msg.reply_event);
- deserializers_.pop();
+void SyncChannel::SyncContext::OnChannelOpened() {
+ shutdown_watcher_.StartWatching(shutdown_event_, this);
+ Context::OnChannelOpened();
}
-SyncChannel::SyncChannel(const std::wstring& channel_id, Channel::Mode mode,
- Channel::Listener* listener, MessageFilter* filter,
- MessageLoop* ipc_message_loop,
- bool create_pipe_now, HANDLE shutdown_event)
- : ChannelProxy(channel_id, mode, ipc_message_loop,
- new SyncContext(listener, filter, ipc_message_loop),
- create_pipe_now),
- shutdown_event_(shutdown_event),
+void SyncChannel::SyncContext::OnChannelClosed() {
+ shutdown_watcher_.StopWatching();
+ Context::OnChannelClosed();
+}
+
+void SyncChannel::SyncContext::OnSendTimeout(int message_id) {
+ AutoLock auto_lock(deserializers_lock_);
+ PendingSyncMessageQueue::iterator iter;
+ for (iter = deserializers_.begin(); iter != deserializers_.end(); iter++) {
+ if ((*iter).id == message_id) {
+ SetEvent((*iter).done_event);
+ break;
+ }
+ }
+}
+
+void SyncChannel::SyncContext::CancelPendingSends() {
+ AutoLock auto_lock(deserializers_lock_);
+ PendingSyncMessageQueue::iterator iter;
+ for (iter = deserializers_.begin(); iter != deserializers_.end(); iter++)
+ SetEvent((*iter).done_event);
+}
+
+void SyncChannel::SyncContext::OnObjectSignaled(HANDLE object) {
+ DCHECK(object == shutdown_event_);
+ // Process shut down before we can get a reply to a synchronous message.
+ // Cancel pending Send calls, which will end up setting the send done event.
+ CancelPendingSends();
+}
+
+
+SyncChannel::SyncChannel(
+ const std::wstring& channel_id, Channel::Mode mode,
+ Channel::Listener* listener, MessageFilter* filter,
+ MessageLoop* ipc_message_loop, bool create_pipe_now, HANDLE shutdown_event)
+ : ChannelProxy(
+ channel_id, mode, ipc_message_loop,
+ new SyncContext(listener, filter, ipc_message_loop, shutdown_event),
+ create_pipe_now),
sync_messages_with_no_timeout_allowed_(true) {
- DCHECK(shutdown_event_ != NULL);
+ // Ideally we only want to watch this object when running a nested message
+ // loop. However, we don't know when it exits if there's another nested
+ // message loop running under it or not, so we wouldn't know whether to
+ // stop or keep watching. So we always watch it, and create the event as
+ // manual reset since the object watcher might otherwise reset the event
+ // when we're doing a WaitForMultipleObjects.
+ dispatch_watcher_.StartWatching(sync_context()->GetDispatchEvent(), this);
}
SyncChannel::~SyncChannel() {
- // The listener ensures that its lifetime is greater than SyncChannel. But
- // after SyncChannel is destructed there's no guarantee that the listener is
- // still around, so we wouldn't want ReceivedSyncMsgQueue to call the
- // listener.
- sync_context()->RemoveListener(listener());
}
-bool SyncChannel::Send(IPC::Message* message) {
+bool SyncChannel::Send(Message* message) {
return SendWithTimeout(message, INFINITE);
}
-bool SyncChannel::SendWithTimeout(IPC::Message* message, int timeout_ms) {
- bool message_is_sync = message->is_sync();
- HANDLE pump_messages_event = NULL;
+bool SyncChannel::SendWithTimeout(Message* message, int timeout_ms) {
+ if (!message->is_sync()) {
+ ChannelProxy::Send(message);
+ return true;
+ }
- HANDLE reply_event = NULL;
- if (message_is_sync) {
- DCHECK(sync_messages_with_no_timeout_allowed_ || timeout_ms != INFINITE);
- IPC::SyncMessage* sync_msg = static_cast<IPC::SyncMessage*>(message);
- reply_event = sync_context()->Push(sync_msg);
- pump_messages_event = sync_msg->pump_messages_event();
+ // *this* might get deleted in WaitForReply.
+ scoped_refptr<SyncContext> context(sync_context());
+ if (WaitForSingleObject(context->shutdown_event(), 0) == WAIT_OBJECT_0) {
+ delete message;
+ return false;
}
- // Send the message using the ChannelProxy
+ DCHECK(sync_messages_with_no_timeout_allowed_ || timeout_ms != INFINITE);
+ SyncMessage* sync_msg = static_cast<SyncMessage*>(message);
+ context->Push(sync_msg);
+ int message_id = SyncMessage::GetMessageId(*sync_msg);
+ HANDLE pump_messages_event = sync_msg->pump_messages_event();
+
ChannelProxy::Send(message);
- if (!message_is_sync)
- return true;
- do {
- // Wait for reply, or for any other incoming synchronous message.
- DCHECK(reply_event != NULL);
- HANDLE objects[] = { shutdown_event_,
- reply_event,
- sync_context()->blocking_event(),
- pump_messages_event};
-
- DWORD result;
- TimeTicks before = TimeTicks::Now();
- if (pump_messages_event == NULL) {
- // No need to pump messages since we didn't get an event to check.
- result = WaitForMultipleObjects(3, objects, FALSE, timeout_ms);
- } else {
- // If the event is set, then we pump messages. Otherwise we also wait on
- // it so that if it gets set we start pumping messages.
- if (WaitForSingleObject(pump_messages_event, 0) == WAIT_OBJECT_0) {
- // Before calling MsgWaitForMultipleObjects() we check that our events
- // are not signaled. The windows message queue might always have events
- // starving the checking of our events otherwise.
- result = WaitForMultipleObjects(3, objects, FALSE, 0);
- if (result == WAIT_TIMEOUT) {
- result = MsgWaitForMultipleObjects(3, objects, FALSE, timeout_ms,
- QS_ALLINPUT);
- }
- } else {
- result = WaitForMultipleObjects(4, objects, FALSE, timeout_ms);
- }
- }
+ if (timeout_ms != INFINITE) {
+ // We use the sync message id so that when a message times out, we don't
+ // confuse it with another send that is either above/below this Send in
+ // the call stack.
+ context->ipc_message_loop()->PostDelayedTask(FROM_HERE,
+ NewRunnableMethod(context.get(),
+ &SyncContext::OnSendTimeout, message_id), timeout_ms);
+ }
- if (result == WAIT_OBJECT_0 || result == WAIT_TIMEOUT) {
- // Process shut down before we can get a reply to a synchronous message,
- // or timed-out. Unblock the thread.
- sync_context()->PopDeserializer(true);
- return false;
- }
+ // Wait for reply, or for any other incoming synchronous messages.
+ WaitForReply(pump_messages_event);
- if (result == WAIT_OBJECT_0 + 1) {
- // We got the reply to our synchronous message.
- CloseHandle(reply_event);
- return sync_context()->reply_deserialize_result();
- }
+ return context->Pop();
+}
- if (result == WAIT_OBJECT_0 + 2) {
+void SyncChannel::WaitForReply(HANDLE pump_messages_event) {
+ while (true) {
+ HANDLE objects[] = { sync_context()->GetDispatchEvent(),
+ sync_context()->GetSendDoneEvent(),
+ pump_messages_event };
+ uint32 count = pump_messages_event ? 3: 2;
+ DWORD result = WaitForMultipleObjects(count, objects, FALSE, INFINITE);
+ if (result == WAIT_OBJECT_0) {
// We're waiting for a reply, but we received a blocking synchronous
// call. We must process it or otherwise a deadlock might occur.
+ ResetEvent(sync_context()->GetDispatchEvent());
sync_context()->DispatchMessages();
- } else if (result == WAIT_OBJECT_0 + 3) {
- // Run a nested messsage loop to pump all the thread's messages. We
- // shutdown the nested loop when there are no more messages.
- pump_messages_events_.push(pump_messages_event);
- bool old_state = MessageLoop::current()->NestableTasksAllowed();
- MessageLoop::current()->SetNestableTasksAllowed(true);
- // Process a message, but come right back out of the MessageLoop (don't
- // loop, sleep, or wait for a kMsgQuit).
- MessageLoop::current()->RunAllPending();
- MessageLoop::current()->SetNestableTasksAllowed(old_state);
- pump_messages_events_.pop();
- } else {
- DCHECK(result == WAIT_OBJECT_0 + 4);
- // We were doing a WaitForMultipleObjects, but now the pump messages
- // event is set, so the next time we loop we'll use
- // MsgWaitForMultipleObjects instead.
+ continue;
}
- if (timeout_ms != INFINITE) {
- TimeDelta time_delta = TimeTicks::Now() - before;
- timeout_ms -= static_cast<int>(time_delta.InMilliseconds());
- if (timeout_ms <= 0) {
- // We timed-out while processing messages.
- sync_context()->PopDeserializer(true);
- return false;
- }
- }
+ if (result == WAIT_OBJECT_0 + 2)
+ WaitForReplyWithNestedMessageLoop(); // Start a nested message loop.
- // Continue looping until we either get the reply to our synchronous message
- // or we time-out.
- } while (true);
+ break;
+ }
}
-bool SyncChannel::UnblockListener(Message* message) {
- return sync_context()->UnblockListener(message);
+void SyncChannel::WaitForReplyWithNestedMessageLoop() {
+ HANDLE old_done_event = send_done_watcher_.GetWatchedObject();
+ send_done_watcher_.StopWatching();
+ send_done_watcher_.StartWatching(sync_context()->GetSendDoneEvent(), this);
+ bool old_state = MessageLoop::current()->NestableTasksAllowed();
+ MessageLoop::current()->SetNestableTasksAllowed(true);
+ MessageLoop::current()->Run();
+ MessageLoop::current()->SetNestableTasksAllowed(old_state);
+ if (old_done_event)
+ send_done_watcher_.StartWatching(old_done_event, this);
}
-} // namespace IPC
+void SyncChannel::OnObjectSignaled(HANDLE object) {
+ HANDLE dispatch_event = sync_context()->GetDispatchEvent();
+ if (object == dispatch_event) {
+ // The call to DispatchMessages might delete this object, so reregister
+ // the object watcher first.
+ ResetEvent(dispatch_event);
+ dispatch_watcher_.StartWatching(dispatch_event, this);
+ sync_context()->DispatchMessages();
+ } else {
+ // We got the reply, timed out or the process shutdown.
+ DCHECK(object == sync_context()->GetSendDoneEvent());
+ MessageLoop::current()->Quit();
+ }
+}
+} // namespace IPC
diff --git a/chrome/common/ipc_sync_channel.h b/chrome/common/ipc_sync_channel.h
index 54c9fe9..6c03745 100644
--- a/chrome/common/ipc_sync_channel.h
+++ b/chrome/common/ipc_sync_channel.h
@@ -7,11 +7,12 @@
#include <windows.h>
#include <string>
-#include <stack>
-#include <queue>
+#include <deque>
#include "base/basictypes.h"
#include "base/lock.h"
+#include "base/object_watcher.h"
#include "base/ref_counted.h"
+#include "base/scoped_handle.h"
#include "chrome/common/ipc_channel_proxy.h"
namespace IPC {
@@ -24,17 +25,17 @@ class SyncMessage;
// is more than this object. If the message loop goes away while this object
// is running and it's used to send a message, then it will use the invalid
// message loop pointer to proxy it to the ipc thread.
-class SyncChannel : public ChannelProxy {
+class SyncChannel : public ChannelProxy,
+ public base::ObjectWatcher::Delegate {
public:
SyncChannel(const std::wstring& channel_id, Channel::Mode mode,
Channel::Listener* listener, MessageFilter* filter,
MessageLoop* ipc_message_loop, bool create_pipe_now,
- HANDLE shutdown_handle);
+ HANDLE shutdown_event);
~SyncChannel();
virtual bool Send(Message* message);
virtual bool SendWithTimeout(Message* message, int timeout_ms);
- bool UnblockListener(Message* message);
// Whether we allow sending messages with no time-out.
void set_sync_messages_with_no_timeout_allowed(bool value) {
@@ -48,74 +49,104 @@ class SyncChannel : public ChannelProxy {
// SyncContext holds the per object data for SyncChannel, so that SyncChannel
// can be deleted while it's being used in a different thread. See
// ChannelProxy::Context for more information.
- class SyncContext : public Context {
+ class SyncContext : public Context,
+ public base::ObjectWatcher::Delegate {
public:
SyncContext(Channel::Listener* listener,
MessageFilter* filter,
- MessageLoop* ipc_thread);
+ MessageLoop* ipc_thread,
+ HANDLE shutdown_event);
~SyncContext();
// Adds information about an outgoing sync message to the context so that
- // we know how to deserialize the reply. Returns a handle that's set when
- // the reply has arrived.
- HANDLE Push(IPC::SyncMessage* sync_msg);
+ // we know how to deserialize the reply.
+ void Push(IPC::SyncMessage* sync_msg);
- // Returns true if the reply message was deserialized without any errors,
- // or false otherwise.
- bool reply_deserialize_result() { return reply_deserialize_result_; }
+ // Cleanly remove the top deserializer (and throw it away). Returns the
+ // result of the Send call for that message.
+ bool Pop();
+
+ // Returns an event that's set when the send is complete, timed out or the
+ // process shut down.
+ HANDLE GetSendDoneEvent();
// Returns an event that's set when an incoming message that's not the reply
// needs to get dispatched (by calling SyncContext::DispatchMessages).
- HANDLE blocking_event();
+ HANDLE GetDispatchEvent();
void DispatchMessages();
- void RemoveListener(Channel::Listener* listener);
// Checks if the given message is blocking the listener thread because of a
// synchronous send. If it is, the thread is unblocked and true is returned.
// Otherwise the function returns false.
- bool UnblockListener(const Message* msg);
+ bool TryToUnblockListener(const Message* msg);
+
+ // Called on the IPC thread when a sync send that runs a nested message loop
+ // times out.
+ void OnSendTimeout(int message_id);
- // Cleanly remove the top deserializer (and throw it away).
- void PopDeserializer(bool close_reply_event);
+ HANDLE shutdown_event() { return shutdown_event_; }
private:
- void OnMessageReceived(const Message& msg);
- void OnChannelError();
+ // IPC::ChannelProxy methods that we override.
+
+ // Called on the listener thread.
+ virtual void Clear();
+
+ // Called on the IPC thread.
+ virtual void OnMessageReceived(const Message& msg);
+ virtual void OnChannelError();
+ virtual void OnChannelOpened();
+ virtual void OnChannelClosed();
+
+ // Cancels all pending Send calls.
+ void CancelPendingSends();
+
+ // ObjectWatcher::Delegate implementation.
+ virtual void OnObjectSignaled(HANDLE object);
// When sending a synchronous message, this structure contains an object that
// knows how to deserialize the response.
struct PendingSyncMsg {
PendingSyncMsg(int id, IPC::MessageReplyDeserializer* d, HANDLE e) :
- id(id), deserializer(d), reply_event(e) { }
+ id(id), deserializer(d), done_event(e), send_result(false) { }
int id;
IPC::MessageReplyDeserializer* deserializer;
- HANDLE reply_event;
+ HANDLE done_event;
+ bool send_result;
};
- typedef std::stack<PendingSyncMsg> PendingSyncMessageQueue;
+ typedef std::deque<PendingSyncMsg> PendingSyncMessageQueue;
PendingSyncMessageQueue deserializers_;
Lock deserializers_lock_;
- // This can't be a scoped_refptr because it needs to be released on the
- // listener thread.
- ReceivedSyncMsgQueue* received_sync_msgs_;
+ scoped_refptr<ReceivedSyncMsgQueue> received_sync_msgs_;
- bool channel_closed_;
- bool reply_deserialize_result_;
+ HANDLE shutdown_event_;
+ base::ObjectWatcher shutdown_watcher_;
};
private:
+ // ObjectWatcher::Delegate implementation.
+ virtual void OnObjectSignaled(HANDLE object);
+
SyncContext* sync_context() { return reinterpret_cast<SyncContext*>(context()); }
- // Copy of shutdown event that we get in constructor.
- HANDLE shutdown_event_;
+ // Both these functions wait for a reply, timeout or process shutdown. The
+ // latter one also runs a nested message loop in the meantime.
+ void WaitForReply(HANDLE pump_messages_event);
- std::stack<HANDLE> pump_messages_events_;
+ // Runs a nested message loop until a reply arrives, times out, or the process
+ // shuts down.
+ void WaitForReplyWithNestedMessageLoop();
bool sync_messages_with_no_timeout_allowed_;
+ // Used to signal events between the IPC and listener threads.
+ base::ObjectWatcher send_done_watcher_;
+ base::ObjectWatcher dispatch_watcher_;
+
DISALLOW_EVIL_CONSTRUCTORS(SyncChannel);
};
diff --git a/chrome/common/ipc_sync_channel_unittest.cc b/chrome/common/ipc_sync_channel_unittest.cc
index 11ba159..decd6b6 100644
--- a/chrome/common/ipc_sync_channel_unittest.cc
+++ b/chrome/common/ipc_sync_channel_unittest.cc
@@ -83,17 +83,14 @@ class Worker : public Channel::Listener, public Message::Sender {
// The IPC thread needs to outlive SyncChannel, so force the correct order of
// destruction.
virtual ~Worker() {
- CloseChannel();
- // We must stop the threads and release the channel here. The IPC thread
- // must die before the listener thread, otherwise if its in the process of
- // sending a message, it will get an error, it will use channel_, which
- // references listener_. There are many ways of crashing, depending on
- // timing.
- // This is a race condition so you may not see it all the time even if you
- // reverse the Stop() calls. You may see this bug with AppVerifier only.
+ Event listener_done, ipc_done;
+ ListenerThread()->message_loop()->PostTask(FROM_HERE, NewRunnableMethod(
+ this, &Worker::OnListenerThreadShutdown, listener_done.handle(),
+ ipc_done.handle()));
+ HANDLE handles[] = { listener_done.handle(), ipc_done.handle() };
+ WaitForMultipleObjects(2, handles, TRUE, INFINITE);
ipc_thread_.Stop();
listener_thread_.Stop();
- channel_.reset();
}
void AddRef() { }
void Release() { }
@@ -102,12 +99,13 @@ class Worker : public Channel::Listener, public Message::Sender {
return channel_->SendWithTimeout(msg, timeout_ms);
}
void WaitForChannelCreation() { channel_created_.Wait(); }
- void CloseChannel() { channel_.reset(); }
+ void CloseChannel() {
+ DCHECK(MessageLoop::current() == ListenerThread()->message_loop());
+ channel_->Close();
+ }
void Start() {
StartThread(&listener_thread_);
- base::Thread* thread =
- overrided_thread_ ? overrided_thread_ : &listener_thread_;
- thread->message_loop()->PostTask(FROM_HERE, NewRunnableMethod(
+ ListenerThread()->message_loop()->PostTask(FROM_HERE, NewRunnableMethod(
this, &Worker::OnStart));
}
void OverrideThread(base::Thread* overrided_thread) {
@@ -124,7 +122,6 @@ class Worker : public Channel::Listener, public Message::Sender {
// Functions for dervied classes to implement if they wish.
virtual void Run() { }
- virtual void OnDouble(int in, int* out) { NOTREACHED(); }
virtual void OnAnswer(int* answer) { NOTREACHED(); }
virtual void OnAnswerDelay(Message* reply_msg) {
// The message handler map below can only take one entry for
@@ -137,8 +134,18 @@ class Worker : public Channel::Listener, public Message::Sender {
SyncChannelTestMsg_AnswerToLife::WriteReplyParams(reply_msg, answer);
Send(reply_msg);
}
+ virtual void OnDouble(int in, int* out) { NOTREACHED(); }
+ virtual void OnDoubleDelay(int in, Message* reply_msg) {
+ int result;
+ OnDouble(in, &result);
+ SyncChannelTestMsg_Double::WriteReplyParams(reply_msg, result);
+ Send(reply_msg);
+ }
private:
+ base::Thread* ListenerThread() {
+ return overrided_thread_ ? overrided_thread_ : &listener_thread_;
+ }
// Called on the listener thread to create the sync channel.
void OnStart() {
// Link ipc_thread_, listener_thread_ and channel_ altogether.
@@ -150,9 +157,22 @@ class Worker : public Channel::Listener, public Message::Sender {
Run();
}
+ void OnListenerThreadShutdown(HANDLE listener_event, HANDLE ipc_event) {
+ // SyncChannel needs to be destructed on the thread that it was created on.
+ channel_.reset();
+ SetEvent(listener_event);
+
+ ipc_thread_.message_loop()->PostTask(FROM_HERE, NewRunnableMethod(
+ this, &Worker::OnIPCThreadShutdown, ipc_event));
+ }
+
+ void OnIPCThreadShutdown(HANDLE ipc_event) {
+ SetEvent(ipc_event);
+ }
+
void OnMessageReceived(const Message& message) {
IPC_BEGIN_MESSAGE_MAP(Worker, message)
- IPC_MESSAGE_HANDLER(SyncChannelTestMsg_Double, OnDouble)
+ IPC_MESSAGE_HANDLER_DELAY_REPLY(SyncChannelTestMsg_Double, OnDoubleDelay)
IPC_MESSAGE_HANDLER_DELAY_REPLY(SyncChannelTestMsg_AnswerToLife,
OnAnswerDelay)
IPC_END_MESSAGE_MAP()
@@ -211,21 +231,27 @@ void RunTest(std::vector<Worker*> workers) {
} // namespace
-
//-----------------------------------------------------------------------------
namespace {
class SimpleServer : public Worker {
public:
- SimpleServer() : Worker(Channel::MODE_SERVER, "simpler_server") { }
+ SimpleServer(bool pump_during_send)
+ : Worker(Channel::MODE_SERVER, "simpler_server"),
+ pump_during_send_(pump_during_send) { }
void Run() {
int answer = 0;
- bool result = Send(new SyncChannelTestMsg_AnswerToLife(&answer));
+ IPC::SyncMessage* msg = new SyncChannelTestMsg_AnswerToLife(&answer);
+ if (pump_during_send_)
+ msg->EnableMessagePumping();
+ bool result = Send(msg);
DCHECK(result);
DCHECK(answer == 42);
Done();
}
+
+ bool pump_during_send_;
};
class SimpleClient : public Worker {
@@ -238,16 +264,20 @@ class SimpleClient : public Worker {
}
};
-} // namespace
-
-// Tests basic synchronous call
-TEST_F(IPCSyncChannelTest, Simple) {
+void Simple(bool pump_during_send) {
std::vector<Worker*> workers;
- workers.push_back(new SimpleServer());
+ workers.push_back(new SimpleServer(pump_during_send));
workers.push_back(new SimpleClient());
RunTest(workers);
}
+} // namespace
+
+// Tests basic synchronous call
+TEST_F(IPCSyncChannelTest, Simple) {
+ Simple(false);
+ Simple(true);
+}
//-----------------------------------------------------------------------------
@@ -264,16 +294,20 @@ class DelayClient : public Worker {
}
};
-} // namespace
-
-// Tests that asynchronous replies work
-TEST_F(IPCSyncChannelTest, DelayReply) {
+void DelayReply(bool pump_during_send) {
std::vector<Worker*> workers;
- workers.push_back(new SimpleServer());
+ workers.push_back(new SimpleServer(pump_during_send));
workers.push_back(new DelayClient());
RunTest(workers);
}
+} // namespace
+
+// Tests that asynchronous replies work
+TEST_F(IPCSyncChannelTest, DelayReply) {
+ DelayReply(false);
+ DelayReply(true);
+}
//-----------------------------------------------------------------------------
@@ -281,22 +315,30 @@ namespace {
class NoHangServer : public Worker {
public:
- explicit NoHangServer(Event* got_first_reply)
- : Worker(Channel::MODE_SERVER, "no_hang_server"),
- got_first_reply_(got_first_reply) { }
+ explicit NoHangServer(Event* got_first_reply, bool pump_during_send)
+ : Worker(Channel::MODE_SERVER, "no_hang_server"),
+ got_first_reply_(got_first_reply),
+ pump_during_send_(pump_during_send) { }
void Run() {
int answer = 0;
- bool result = Send(new SyncChannelTestMsg_AnswerToLife(&answer));
+ IPC::SyncMessage* msg = new SyncChannelTestMsg_AnswerToLife(&answer);
+ if (pump_during_send_)
+ msg->EnableMessagePumping();
+ bool result = Send(msg);
DCHECK(result);
DCHECK(answer == 42);
got_first_reply_->Set();
- result = Send(new SyncChannelTestMsg_AnswerToLife(&answer));
+ msg = new SyncChannelTestMsg_AnswerToLife(&answer);
+ if (pump_during_send_)
+ msg->EnableMessagePumping();
+ result = Send(msg);
DCHECK(!result);
Done();
}
Event* got_first_reply_;
+ bool pump_during_send_;
};
class NoHangClient : public Worker {
@@ -318,29 +360,37 @@ class NoHangClient : public Worker {
Event* got_first_reply_;
};
-} // namespace
-
-// Tests that caller doesn't hang if receiver dies
-TEST_F(IPCSyncChannelTest, NoHang) {
+void NoHang(bool pump_during_send) {
Event got_first_reply;
-
std::vector<Worker*> workers;
- workers.push_back(new NoHangServer(&got_first_reply));
+ workers.push_back(new NoHangServer(&got_first_reply, pump_during_send));
workers.push_back(new NoHangClient(&got_first_reply));
RunTest(workers);
}
+} // namespace
+
+// Tests that caller doesn't hang if receiver dies
+TEST_F(IPCSyncChannelTest, NoHang) {
+ NoHang(false);
+ NoHang(true);
+}
//-----------------------------------------------------------------------------
namespace {
-class RecursiveServer : public Worker {
+class UnblockServer : public Worker {
public:
- RecursiveServer() : Worker(Channel::MODE_SERVER, "recursive_server") { }
+ UnblockServer(bool pump_during_send)
+ : Worker(Channel::MODE_SERVER, "unblock_server"),
+ pump_during_send_(pump_during_send) { }
void Run() {
int answer = 0;
- bool result = Send(new SyncChannelTestMsg_AnswerToLife(&answer));
+ IPC::SyncMessage* msg = new SyncChannelTestMsg_AnswerToLife(&answer);
+ if (pump_during_send_)
+ msg->EnableMessagePumping();
+ bool result = Send(msg);
DCHECK(result);
DCHECK(answer == 42);
Done();
@@ -349,29 +399,160 @@ class RecursiveServer : public Worker {
void OnDouble(int in, int* out) {
*out = in * 2;
}
+
+ bool pump_during_send_;
};
-class RecursiveClient : public Worker {
+class UnblockClient : public Worker {
public:
- RecursiveClient() : Worker(Channel::MODE_CLIENT, "recursive_client") { }
+ UnblockClient(bool pump_during_send)
+ : Worker(Channel::MODE_CLIENT, "unblock_client"),
+ pump_during_send_(pump_during_send) { }
void OnAnswer(int* answer) {
- BOOL result = Send(new SyncChannelTestMsg_Double(21, answer));
+ IPC::SyncMessage* msg = new SyncChannelTestMsg_Double(21, answer);
+ if (pump_during_send_)
+ msg->EnableMessagePumping();
+ BOOL result = Send(msg);
DCHECK(result);
Done();
}
+
+ bool pump_during_send_;
};
+void Unblock(bool server_pump, bool client_pump) {
+ std::vector<Worker*> workers;
+ workers.push_back(new UnblockServer(server_pump));
+ workers.push_back(new UnblockClient(client_pump));
+ RunTest(workers);
+}
+
} // namespace
// Tests that the caller unblocks to answer a sync message from the receiver.
+TEST_F(IPCSyncChannelTest, Unblock) {
+ Unblock(false, false);
+ Unblock(false, true);
+ Unblock(true, false);
+ Unblock(true, true);
+}
+
+//-----------------------------------------------------------------------------
+
+namespace {
+
+class RecursiveServer : public Worker {
+ public:
+ explicit RecursiveServer(
+ bool expected_send_result, bool pump_first, bool pump_second)
+ : Worker(Channel::MODE_SERVER, "recursive_server"),
+ expected_send_result_(expected_send_result),
+ pump_first_(pump_first), pump_second_(pump_second) { }
+ void Run() {
+ int answer;
+ IPC::SyncMessage* msg = new SyncChannelTestMsg_Double(21, &answer);
+ if (pump_first_)
+ msg->EnableMessagePumping();
+ bool result = Send(msg);
+ DCHECK(result == expected_send_result_);
+ Done();
+ }
+
+ void OnDouble(int in, int* out) {
+ int answer;
+ IPC::SyncMessage* msg = new SyncChannelTestMsg_AnswerToLife(&answer);
+ if (pump_second_)
+ msg->EnableMessagePumping();
+ bool result = Send(msg);
+ DCHECK(result == expected_send_result_);
+ }
+
+ bool expected_send_result_, pump_first_, pump_second_;
+};
+
+class RecursiveClient : public Worker {
+ public:
+ explicit RecursiveClient(bool pump_during_send, bool close_channel)
+ : Worker(Channel::MODE_CLIENT, "recursive_client"),
+ pump_during_send_(pump_during_send), close_channel_(close_channel) { }
+
+ void OnDoubleDelay(int in, Message* reply_msg) {
+ int answer = 0;
+ IPC::SyncMessage* msg = new SyncChannelTestMsg_Double(5, &answer);
+ if (pump_during_send_)
+ msg->EnableMessagePumping();
+ bool result = Send(msg);
+ DCHECK(result != close_channel_);
+ if (!close_channel_) {
+ SyncChannelTestMsg_Double::WriteReplyParams(reply_msg, in * 2);
+ Send(reply_msg);
+ }
+ Done();
+ }
+
+ void OnAnswerDelay(Message* reply_msg) {
+ if (close_channel_) {
+ CloseChannel();
+ } else {
+ SyncChannelTestMsg_AnswerToLife::WriteReplyParams(reply_msg, 42);
+ Send(reply_msg);
+ }
+ }
+
+ bool pump_during_send_, close_channel_;
+};
+
+void Recursive(
+ bool server_pump_first, bool server_pump_second, bool client_pump) {
+ std::vector<Worker*> workers;
+ workers.push_back(
+ new RecursiveServer(true, server_pump_first, server_pump_second));
+ workers.push_back(new RecursiveClient(client_pump, false));
+ RunTest(workers);
+}
+
+} // namespace
+
+// Tests a server calling Send while another Send is pending.
TEST_F(IPCSyncChannelTest, Recursive) {
+ Recursive(false, false, false);
+ Recursive(false, false, true);
+ Recursive(false, true, false);
+ Recursive(false, true, true);
+ Recursive(true, false, false);
+ Recursive(true, false, true);
+ Recursive(true, true, false);
+ Recursive(true, true, true);
+}
+
+//-----------------------------------------------------------------------------
+
+namespace {
+
+void RecursiveNoHang(
+ bool server_pump_first, bool server_pump_second, bool client_pump) {
std::vector<Worker*> workers;
- workers.push_back(new RecursiveServer());
- workers.push_back(new RecursiveClient());
+ workers.push_back(
+ new RecursiveServer(false, server_pump_first, server_pump_second));
+ workers.push_back(new RecursiveClient(client_pump, true));
RunTest(workers);
}
+} // namespace
+
+// Tests that if a caller makes a sync call during an existing sync call and
+// the receiver dies, neither of the Send() calls hang.
+TEST_F(IPCSyncChannelTest, RecursiveNoHang) {
+ RecursiveNoHang(false, false, false);
+ RecursiveNoHang(false, false, true);
+ RecursiveNoHang(false, true, false);
+ RecursiveNoHang(false, true, true);
+ RecursiveNoHang(true, false, false);
+ RecursiveNoHang(true, false, true);
+ RecursiveNoHang(true, true, false);
+ RecursiveNoHang(true, true, true);
+}
//-----------------------------------------------------------------------------
@@ -379,14 +560,22 @@ namespace {
class MultipleServer1 : public Worker {
public:
- MultipleServer1() : Worker(L"test_channel1", Channel::MODE_SERVER) { }
+ MultipleServer1(bool pump_during_send)
+ : Worker(L"test_channel1", Channel::MODE_SERVER),
+ pump_during_send_(pump_during_send) { }
+
void Run() {
int answer = 0;
- bool result = Send(new SyncChannelTestMsg_Double(5, &answer));
+ IPC::SyncMessage* msg = new SyncChannelTestMsg_Double(5, &answer);
+ if (pump_during_send_)
+ msg->EnableMessagePumping();
+ bool result = Send(msg);
DCHECK(result);
DCHECK(answer == 10);
Done();
}
+
+ bool pump_during_send_;
};
class MultipleClient1 : public Worker {
@@ -419,15 +608,21 @@ class MultipleServer2 : public Worker {
class MultipleClient2 : public Worker {
public:
- MultipleClient2(Event* client1_msg_received, Event* client1_can_reply) :
- Worker(L"test_channel2", Channel::MODE_CLIENT),
+ MultipleClient2(
+ Event* client1_msg_received, Event* client1_can_reply,
+ bool pump_during_send)
+ : Worker(L"test_channel2", Channel::MODE_CLIENT),
client1_msg_received_(client1_msg_received),
- client1_can_reply_(client1_can_reply) { }
+ client1_can_reply_(client1_can_reply),
+ pump_during_send_(pump_during_send) { }
void Run() {
int answer = 0;
client1_msg_received_->Wait();
- bool result = Send(new SyncChannelTestMsg_AnswerToLife(&answer));
+ IPC::SyncMessage* msg = new SyncChannelTestMsg_AnswerToLife(&answer);
+ if (pump_during_send_)
+ msg->EnableMessagePumping();
+ bool result = Send(msg);
DCHECK(result);
DCHECK(answer == 42);
client1_can_reply_->Set();
@@ -436,13 +631,10 @@ class MultipleClient2 : public Worker {
private:
Event *client1_msg_received_, *client1_can_reply_;
+ bool pump_during_send_;
};
-} // namespace
-
-// Tests that multiple SyncObjects on the same listener thread can unblock each
-// other.
-TEST_F(IPCSyncChannelTest, Multiple) {
+void Multiple(bool server_pump, bool client_pump) {
std::vector<Worker*> workers;
// A shared worker thread so that server1 and server2 run on one thread.
@@ -461,10 +653,10 @@ TEST_F(IPCSyncChannelTest, Multiple) {
workers.push_back(worker);
worker = new MultipleClient2(
- &client1_msg_received, &client1_can_reply);
+ &client1_msg_received, &client1_can_reply, client_pump);
workers.push_back(worker);
- worker = new MultipleServer1();
+ worker = new MultipleServer1(server_pump);
worker->OverrideThread(&worker_thread);
workers.push_back(worker);
@@ -475,6 +667,16 @@ TEST_F(IPCSyncChannelTest, Multiple) {
RunTest(workers);
}
+} // namespace
+
+// Tests that multiple SyncObjects on the same listener thread can unblock each
+// other.
+TEST_F(IPCSyncChannelTest, Multiple) {
+ Multiple(false, false);
+ Multiple(false, true);
+ Multiple(true, false);
+ Multiple(true, true);
+}
//-----------------------------------------------------------------------------
@@ -482,14 +684,21 @@ namespace {
class QueuedReplyServer1 : public Worker {
public:
- QueuedReplyServer1() : Worker(L"test_channel1", Channel::MODE_SERVER) { }
+ QueuedReplyServer1(bool pump_during_send)
+ : Worker(L"test_channel1", Channel::MODE_SERVER),
+ pump_during_send_(pump_during_send) { }
void Run() {
int answer = 0;
- bool result = Send(new SyncChannelTestMsg_Double(5, &answer));
+ IPC::SyncMessage* msg = new SyncChannelTestMsg_Double(5, &answer);
+ if (pump_during_send_)
+ msg->EnableMessagePumping();
+ bool result = Send(msg);
DCHECK(result);
DCHECK(answer == 10);
Done();
}
+
+ bool pump_during_send_;
};
class QueuedReplyClient1 : public Worker {
@@ -531,30 +740,29 @@ class QueuedReplyServer2 : public Worker {
class QueuedReplyClient2 : public Worker {
public:
- explicit QueuedReplyClient2(Event* client1_msg_received) :
- Worker(L"test_channel2", Channel::MODE_CLIENT),
- client1_msg_received_(client1_msg_received) { }
+ explicit QueuedReplyClient2(
+ Event* client1_msg_received, bool pump_during_send)
+ : Worker(L"test_channel2", Channel::MODE_CLIENT),
+ client1_msg_received_(client1_msg_received),
+ pump_during_send_(pump_during_send){ }
void Run() {
int answer = 0;
client1_msg_received_->Wait();
- bool result = Send(new SyncChannelTestMsg_AnswerToLife(&answer));
+ IPC::SyncMessage* msg = new SyncChannelTestMsg_AnswerToLife(&answer);
+ if (pump_during_send_)
+ msg->EnableMessagePumping();
+ bool result = Send(msg);
DCHECK(result);
DCHECK(answer == 42);
Done();
}
- private:
Event *client1_msg_received_;
+ bool pump_during_send_;
};
-} // namespace
-
-// While a blocking send is in progress, the listener thread might answer other
-// synchronous messages. This tests that if during the response to another
-// message the reply to the original messages comes, it is queued up correctly
-// and the original Send is unblocked later.
-TEST_F(IPCSyncChannelTest, QueuedReply) {
+void QueuedReply(bool server_pump, bool client_pump) {
std::vector<Worker*> workers;
// A shared worker thread so that server1 and server2 run on one thread.
@@ -569,10 +777,10 @@ TEST_F(IPCSyncChannelTest, QueuedReply) {
worker->OverrideThread(&worker_thread);
workers.push_back(worker);
- worker = new QueuedReplyClient2(&client1_msg_received);
+ worker = new QueuedReplyClient2(&client1_msg_received, client_pump);
workers.push_back(worker);
- worker = new QueuedReplyServer1();
+ worker = new QueuedReplyServer1(server_pump);
worker->OverrideThread(&worker_thread);
workers.push_back(worker);
@@ -583,6 +791,18 @@ TEST_F(IPCSyncChannelTest, QueuedReply) {
RunTest(workers);
}
+} // namespace
+
+// While a blocking send is in progress, the listener thread might answer other
+// synchronous messages. This tests that if during the response to another
+// message the reply to the original messages comes, it is queued up correctly
+// and the original Send is unblocked later.
+TEST_F(IPCSyncChannelTest, QueuedReply) {
+ QueuedReply(false, false);
+ QueuedReply(false, true);
+ QueuedReply(true, false);
+ QueuedReply(true, true);
+}
//-----------------------------------------------------------------------------
@@ -590,14 +810,18 @@ namespace {
class BadServer : public Worker {
public:
- BadServer() : Worker(Channel::MODE_SERVER, "simpler_server") { }
+ BadServer(bool pump_during_send)
+ : Worker(Channel::MODE_SERVER, "simpler_server"),
+ pump_during_send_(pump_during_send) { }
void Run() {
int answer = 0;
- Message* msg = new SyncMessage(MSG_ROUTING_CONTROL,
- SyncChannelTestMsg_Double::ID,
- Message::PRIORITY_NORMAL,
- NULL);
+ IPC::SyncMessage* msg = new SyncMessage(
+ MSG_ROUTING_CONTROL, SyncChannelTestMsg_Double::ID,
+ Message::PRIORITY_NORMAL, NULL);
+ if (pump_during_send_)
+ msg->EnableMessagePumping();
+
// Temporarily set the minimum logging very high so that the assertion
// in ipc_message_utils doesn't fire.
int log_level = logging::GetMinLogLevel();
@@ -613,27 +837,33 @@ class BadServer : public Worker {
Done();
}
-};
-} // namespace
+ bool pump_during_send_;
+};
-// Tests that if a message is not serialized correctly, the Send() will fail.
-TEST_F(IPCSyncChannelTest, BadMessage) {
+void BadMessage(bool pump_during_send) {
std::vector<Worker*> workers;
- workers.push_back(new BadServer());
+ workers.push_back(new BadServer(pump_during_send));
workers.push_back(new SimpleClient());
RunTest(workers);
}
+} // namespace
+
+// Tests that if a message is not serialized correctly, the Send() will fail.
+TEST_F(IPCSyncChannelTest, BadMessage) {
+ BadMessage(false);
+ BadMessage(true);
+}
//-----------------------------------------------------------------------------
namespace {
-class ChattyRecursiveClient : public Worker {
+class ChattyClient : public Worker {
public:
- ChattyRecursiveClient() :
- Worker(Channel::MODE_CLIENT, "chatty_recursive_client") { }
+ ChattyClient() :
+ Worker(Channel::MODE_CLIENT, "chatty_client") { }
void OnAnswer(int* answer) {
// The PostMessage limit is 10k. Send 20% more than that.
@@ -649,19 +879,23 @@ class ChattyRecursiveClient : public Worker {
}
};
+void ChattyServer(bool pump_during_send) {
+ std::vector<Worker*> workers;
+ workers.push_back(new UnblockServer(pump_during_send));
+ workers.push_back(new ChattyClient());
+ RunTest(workers);
+}
+
} // namespace
// Tests http://b/issue?id=1093251 - that sending lots of sync messages while
// the receiver is waiting for a sync reply does not overflow the PostMessage
// queue.
TEST_F(IPCSyncChannelTest, ChattyServer) {
- std::vector<Worker*> workers;
- workers.push_back(new RecursiveServer());
- workers.push_back(new ChattyRecursiveClient());
- RunTest(workers);
+ ChattyServer(false);
+ ChattyServer(true);
}
-
//------------------------------------------------------------------------------
namespace {
@@ -669,19 +903,22 @@ namespace {
class TimeoutServer : public Worker {
public:
TimeoutServer(int timeout_ms,
- std::vector<bool> timeout_seq)
+ std::vector<bool> timeout_seq,
+ bool pump_during_send)
: Worker(Channel::MODE_SERVER, "timeout_server"),
timeout_ms_(timeout_ms),
- timeout_seq_(timeout_seq) {
+ timeout_seq_(timeout_seq),
+ pump_during_send_(pump_during_send) {
}
void Run() {
for (std::vector<bool>::const_iterator iter = timeout_seq_.begin();
iter != timeout_seq_.end(); ++iter) {
int answer = 0;
- bool result =
- SendWithTimeout(new SyncChannelTestMsg_AnswerToLife(&answer),
- timeout_ms_);
+ IPC::SyncMessage* msg = new SyncChannelTestMsg_AnswerToLife(&answer);
+ if (pump_during_send_)
+ msg->EnableMessagePumping();
+ bool result = SendWithTimeout(msg, timeout_ms_);
if (*iter) {
// Time-out expected.
DCHECK(!result);
@@ -697,6 +934,7 @@ class TimeoutServer : public Worker {
private:
int timeout_ms_;
std::vector<bool> timeout_seq_;
+ bool pump_during_send_;
};
class UnresponsiveClient : public Worker {
@@ -725,35 +963,29 @@ class UnresponsiveClient : public Worker {
std::vector<bool> timeout_seq_;
};
-} // namespace
-
-// Tests that SendWithTimeout does not time-out if the response comes back fast
-// enough.
-TEST_F(IPCSyncChannelTest, SendWithTimeoutOK) {
+void SendWithTimeoutOK(bool pump_during_send) {
std::vector<Worker*> workers;
std::vector<bool> timeout_seq;
timeout_seq.push_back(false);
timeout_seq.push_back(false);
timeout_seq.push_back(false);
- workers.push_back(new TimeoutServer(5000, timeout_seq));
+ workers.push_back(new TimeoutServer(5000, timeout_seq, pump_during_send));
workers.push_back(new SimpleClient());
RunTest(workers);
}
-// Tests that SendWithTimeout does time-out.
-TEST_F(IPCSyncChannelTest, SendWithTimeoutTimeout) {
+void SendWithTimeoutTimeout(bool pump_during_send) {
std::vector<Worker*> workers;
std::vector<bool> timeout_seq;
timeout_seq.push_back(true);
timeout_seq.push_back(false);
timeout_seq.push_back(false);
- workers.push_back(new TimeoutServer(100, timeout_seq));
+ workers.push_back(new TimeoutServer(100, timeout_seq, pump_during_send));
workers.push_back(new UnresponsiveClient(timeout_seq));
RunTest(workers);
}
-// Sends some message that time-out and some that succeed.
-TEST_F(IPCSyncChannelTest, SendWithTimeoutMixedOKAndTimeout) {
+void SendWithTimeoutMixedOKAndTimeout(bool pump_during_send) {
std::vector<Worker*> workers;
std::vector<bool> timeout_seq;
timeout_seq.push_back(true);
@@ -761,7 +993,28 @@ TEST_F(IPCSyncChannelTest, SendWithTimeoutMixedOKAndTimeout) {
timeout_seq.push_back(false);
timeout_seq.push_back(true);
timeout_seq.push_back(false);
- workers.push_back(new TimeoutServer(100, timeout_seq));
+ workers.push_back(new TimeoutServer(100, timeout_seq, pump_during_send));
workers.push_back(new UnresponsiveClient(timeout_seq));
RunTest(workers);
-} \ No newline at end of file
+}
+
+} // namespace
+
+// Tests that SendWithTimeout does not time-out if the response comes back fast
+// enough.
+TEST_F(IPCSyncChannelTest, SendWithTimeoutOK) {
+ SendWithTimeoutOK(false);
+ SendWithTimeoutOK(true);
+}
+
+// Tests that SendWithTimeout does time-out.
+TEST_F(IPCSyncChannelTest, SendWithTimeoutTimeout) {
+ SendWithTimeoutTimeout(false);
+ SendWithTimeoutTimeout(true);
+}
+
+// Sends some message that time-out and some that succeed.
+TEST_F(IPCSyncChannelTest, SendWithTimeoutMixedOKAndTimeout) {
+ SendWithTimeoutMixedOKAndTimeout(false);
+ SendWithTimeoutMixedOKAndTimeout(true);
+}
diff --git a/chrome/common/ipc_sync_message.h b/chrome/common/ipc_sync_message.h
index d098a99..86cb62f 100644
--- a/chrome/common/ipc_sync_message.h
+++ b/chrome/common/ipc_sync_message.h
@@ -27,8 +27,9 @@ class SyncMessage : public Message {
// If this message can cause the receiver to block while waiting for user
// input (i.e. by calling MessageBox), then the caller needs to pump window
// messages and dispatch asynchronous messages while waiting for the reply.
- // If this handle is passed in, then window messages will be pumped while
- // it's set. The handle must be valid until after the Send call returns.
+ // If this handle is passed in, then window messages will start being pumped
+ // when it's set. Note that this behavior will continue even if the event is
+ // later reset. The handle must be valid until after the Send call returns.
void set_pump_messages_event(HANDLE event) {
pump_messages_event_ = event;
if (event) {