summaryrefslogtreecommitdiffstats
path: root/ipc
diff options
context:
space:
mode:
authorjhorwich@chromium.org <jhorwich@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98>2012-01-11 22:39:54 +0000
committerjhorwich@chromium.org <jhorwich@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98>2012-01-11 22:39:54 +0000
commit522cc10d714c4a2e519176a8f24d2549820f63dc (patch)
tree1860802c1776ec2669811866f9be16f0cfd1275e /ipc
parent76b2482e4bb8586807d92025b942817030b3690d (diff)
downloadchromium_src-522cc10d714c4a2e519176a8f24d2549820f63dc.zip
chromium_src-522cc10d714c4a2e519176a8f24d2549820f63dc.tar.gz
chromium_src-522cc10d714c4a2e519176a8f24d2549820f63dc.tar.bz2
Reimplement ReceivedSyncMsgQueue::DispatchMessages
Implementation of IPC::SyncChannel::ReceivedSyncMsgQueue::DispatchMessages that does not hold any messages in a local stack-frame's delayed_queue, which was causing me to see an inbound sync message from a plugin not dispatched while the renderer was waiting for replies from the plugin. This was causing the plugin and renderer to deadlock waiting for each other. BUG=108491 TEST=Run Pepperized O3D and observe for tab hangs TEST=Run ipc_tests unittests Review URL: http://codereview.chromium.org/9022038 git-svn-id: svn://svn.chromium.org/chrome/trunk/src@117309 0039d316-1c4b-4281-b951-d872f2087c98
Diffstat (limited to 'ipc')
-rw-r--r--ipc/ipc_sync_channel.cc46
-rw-r--r--ipc/ipc_sync_channel_unittest.cc260
2 files changed, 288 insertions, 18 deletions
diff --git a/ipc/ipc_sync_channel.cc b/ipc/ipc_sync_channel.cc
index f3c5384..74d9744 100644
--- a/ipc/ipc_sync_channel.cc
+++ b/ipc/ipc_sync_channel.cc
@@ -1,4 +1,4 @@
-// Copyright (c) 2011 The Chromium Authors. All rights reserved.
+// Copyright (c) 2012 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
@@ -64,6 +64,7 @@ class SyncChannel::ReceivedSyncMsgQueue :
// We set the event in case the listener thread is blocked (or is about
// to). In case it's not, the PostTask dispatches the messages.
message_queue_.push_back(QueuedMessage(new Message(msg), context));
+ message_queue_version_++;
}
dispatch_event_.Signal();
@@ -89,27 +90,35 @@ class SyncChannel::ReceivedSyncMsgQueue :
}
void DispatchMessages(SyncContext* dispatching_context) {
- SyncMessageQueue delayed_queue;
+ bool first_time = true;
+ uint32 expected_version = 0;
+ SyncMessageQueue::iterator it;
while (true) {
- Message* message;
+ Message* message = NULL;
scoped_refptr<SyncChannel::SyncContext> context;
{
base::AutoLock auto_lock(message_lock_);
- if (message_queue_.empty()) {
- message_queue_ = delayed_queue;
- break;
+ if (first_time || message_queue_version_ != expected_version) {
+ it = message_queue_.begin();
+ first_time = false;
+ }
+ for (; it != message_queue_.end(); it++) {
+ if (!it->context->restrict_dispatch() ||
+ it->context == dispatching_context) {
+ message = it->message;
+ context = it->context;
+ it = message_queue_.erase(it);
+ message_queue_version_++;
+ expected_version = message_queue_version_;
+ break;
+ }
}
-
- message = message_queue_.front().message;
- context = message_queue_.front().context;
- message_queue_.pop_front();
- }
- if (context->restrict_dispatch() && context != dispatching_context) {
- delayed_queue.push_back(QueuedMessage(message, context));
- } else {
- context->OnDispatchMessage(*message);
- delete message;
}
+
+ if (message == NULL)
+ break;
+ context->OnDispatchMessage(*message);
+ delete message;
}
}
@@ -122,6 +131,7 @@ class SyncChannel::ReceivedSyncMsgQueue :
if (iter->context == context) {
delete iter->message;
iter = message_queue_.erase(iter);
+ message_queue_version_++;
} else {
iter++;
}
@@ -169,6 +179,7 @@ class SyncChannel::ReceivedSyncMsgQueue :
// See the comment in SyncChannel::SyncChannel for why this event is created
// as manual reset.
ReceivedSyncMsgQueue() :
+ message_queue_version_(0),
dispatch_event_(true, false),
listener_message_loop_(base::MessageLoopProxy::current()),
task_pending_(false),
@@ -185,8 +196,9 @@ class SyncChannel::ReceivedSyncMsgQueue :
scoped_refptr<SyncChannel::SyncContext> context;
};
- typedef std::deque<QueuedMessage> SyncMessageQueue;
+ typedef std::list<QueuedMessage> SyncMessageQueue;
SyncMessageQueue message_queue_;
+ uint32 message_queue_version_; // Used to signal DispatchMessages to rescan
std::vector<QueuedMessage> received_replies_;
diff --git a/ipc/ipc_sync_channel_unittest.cc b/ipc/ipc_sync_channel_unittest.cc
index e7903f4..f396843 100644
--- a/ipc/ipc_sync_channel_unittest.cc
+++ b/ipc/ipc_sync_channel_unittest.cc
@@ -1,4 +1,4 @@
-// Copyright (c) 2011 The Chromium Authors. All rights reserved.
+// Copyright (c) 2012 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
//
@@ -1334,4 +1334,262 @@ TEST_F(IPCSyncChannelTest, RestrictedDispatch) {
EXPECT_EQ(3, success);
}
+//-----------------------------------------------------------------------------
+
+// This test case inspired by crbug.com/108491
+// We create two servers that use the same ListenerThread but have
+// SetRestrictDispatchToSameChannel set to true.
+// We create clients, then use some specific WaitableEvent wait/signalling to
+// ensure that messages get dispatched in a way that causes a deadlock due to
+// a nested dispatch and an eligible message in a higher-level dispatch's
+// delayed_queue. Specifically, we start with client1 about so send an
+// unblocking message to server1, while the shared listener thread for the
+// servers server1 and server2 is about to send a non-unblocking message to
+// client1. At the same time, client2 will be about to send an unblocking
+// message to server2. Server1 will handle the client1->server1 message by
+// telling server2 to send a non-unblocking message to client2.
+// What should happen is that the send to server2 should find the pending,
+// same-context client2->server2 message to dispatch, causing client2 to
+// unblock then handle the server2->client2 message, so that the shared
+// servers' listener thread can then respond to the client1->server1 message.
+// Then client1 can handle the non-unblocking server1->client1 message.
+// The old code would end up in a state where the server2->client2 message is
+// sent, but the client2->server2 message (which is eligible for dispatch, and
+// which is what client2 is waiting for) is stashed in a local delayed_queue
+// that has server1's channel context, causing a deadlock.
+// WaitableEvents in the events array are used to:
+// event 0: indicate to client1 that server listener is in OnDoServerTask
+// event 1: indicate to client1 that client2 listener is in OnDoClient2Task
+// event 2: indicate to server1 that client2 listener is in OnDoClient2Task
+// event 3: indicate to client2 that server listener is in OnDoServerTask
+
+namespace {
+
+class RestrictedDispatchDeadlockServer : public Worker {
+ public:
+ RestrictedDispatchDeadlockServer(int server_num,
+ WaitableEvent* server_ready_event,
+ WaitableEvent** events,
+ RestrictedDispatchDeadlockServer* peer)
+ : Worker(server_num == 1 ? "channel1" : "channel2", Channel::MODE_SERVER),
+ server_num_(server_num),
+ server_ready_event_(server_ready_event),
+ events_(events),
+ peer_(peer),
+ client_kicked_(false) { }
+
+ void OnDoServerTask() {
+ events_[3]->Signal();
+ events_[2]->Wait();
+ events_[0]->Signal();
+ SendMessageToClient();
+ }
+
+ void Run() {
+ channel()->SetRestrictDispatchToSameChannel(true);
+ server_ready_event_->Signal();
+ }
+
+ base::Thread* ListenerThread() { return Worker::ListenerThread(); }
+
+ private:
+ bool OnMessageReceived(const Message& message) {
+ IPC_BEGIN_MESSAGE_MAP(RestrictedDispatchDeadlockServer, message)
+ IPC_MESSAGE_HANDLER(SyncChannelTestMsg_NoArgs, OnNoArgs)
+ IPC_MESSAGE_HANDLER(SyncChannelTestMsg_Done, Done)
+ IPC_END_MESSAGE_MAP()
+ return true;
+ }
+
+ void OnNoArgs() {
+ if (server_num_ == 1) {
+ DCHECK(peer_ != NULL);
+ peer_->SendMessageToClient();
+ }
+ }
+
+ void SendMessageToClient() {
+ Message* msg = new SyncChannelTestMsg_NoArgs;
+ msg->set_unblock(false);
+ DCHECK(!msg->should_unblock());
+ Send(msg);
+ }
+
+ int server_num_;
+ WaitableEvent* server_ready_event_;
+ WaitableEvent** events_;
+ RestrictedDispatchDeadlockServer* peer_;
+ bool client_kicked_;
+};
+
+class RestrictedDispatchDeadlockClient2 : public Worker {
+ public:
+ RestrictedDispatchDeadlockClient2(RestrictedDispatchDeadlockServer* server,
+ WaitableEvent* server_ready_event,
+ WaitableEvent** events)
+ : Worker("channel2", Channel::MODE_CLIENT),
+ server_(server),
+ server_ready_event_(server_ready_event),
+ events_(events),
+ received_msg_(false),
+ received_noarg_reply_(false),
+ done_issued_(false) {}
+
+ void Run() {
+ server_ready_event_->Wait();
+ }
+
+ void OnDoClient2Task() {
+ events_[3]->Wait();
+ events_[1]->Signal();
+ events_[2]->Signal();
+ DCHECK(received_msg_ == false);
+
+ Message* message = new SyncChannelTestMsg_NoArgs;
+ message->set_unblock(true);
+ Send(message);
+ received_noarg_reply_ = true;
+ }
+
+ base::Thread* ListenerThread() { return Worker::ListenerThread(); }
+ private:
+ bool OnMessageReceived(const Message& message) {
+ IPC_BEGIN_MESSAGE_MAP(RestrictedDispatchDeadlockClient2, message)
+ IPC_MESSAGE_HANDLER(SyncChannelTestMsg_NoArgs, OnNoArgs)
+ IPC_END_MESSAGE_MAP()
+ return true;
+ }
+
+ void OnNoArgs() {
+ received_msg_ = true;
+ PossiblyDone();
+ }
+
+ void PossiblyDone() {
+ if (received_noarg_reply_ && received_msg_) {
+ DCHECK(done_issued_ == false);
+ done_issued_ = true;
+ Send(new SyncChannelTestMsg_Done);
+ Done();
+ }
+ }
+
+ RestrictedDispatchDeadlockServer* server_;
+ WaitableEvent* server_ready_event_;
+ WaitableEvent** events_;
+ bool received_msg_;
+ bool received_noarg_reply_;
+ bool done_issued_;
+};
+
+class RestrictedDispatchDeadlockClient1 : public Worker {
+ public:
+ RestrictedDispatchDeadlockClient1(RestrictedDispatchDeadlockServer* server,
+ RestrictedDispatchDeadlockClient2* peer,
+ WaitableEvent* server_ready_event,
+ WaitableEvent** events)
+ : Worker("channel1", Channel::MODE_CLIENT),
+ server_(server),
+ peer_(peer),
+ server_ready_event_(server_ready_event),
+ events_(events),
+ received_msg_(false),
+ received_noarg_reply_(false),
+ done_issued_(false) {}
+
+ void Run() {
+ server_ready_event_->Wait();
+ server_->ListenerThread()->message_loop()->PostTask(
+ FROM_HERE,
+ base::Bind(&RestrictedDispatchDeadlockServer::OnDoServerTask, server_));
+ peer_->ListenerThread()->message_loop()->PostTask(
+ FROM_HERE,
+ base::Bind(&RestrictedDispatchDeadlockClient2::OnDoClient2Task, peer_));
+ events_[0]->Wait();
+ events_[1]->Wait();
+ DCHECK(received_msg_ == false);
+
+ Message* message = new SyncChannelTestMsg_NoArgs;
+ message->set_unblock(true);
+ Send(message);
+ received_noarg_reply_ = true;
+ PossiblyDone();
+ }
+
+ base::Thread* ListenerThread() { return Worker::ListenerThread(); }
+ private:
+ bool OnMessageReceived(const Message& message) {
+ IPC_BEGIN_MESSAGE_MAP(RestrictedDispatchDeadlockClient1, message)
+ IPC_MESSAGE_HANDLER(SyncChannelTestMsg_NoArgs, OnNoArgs)
+ IPC_END_MESSAGE_MAP()
+ return true;
+ }
+
+ void OnNoArgs() {
+ received_msg_ = true;
+ PossiblyDone();
+ }
+
+ void PossiblyDone() {
+ if (received_noarg_reply_ && received_msg_) {
+ DCHECK(done_issued_ == false);
+ done_issued_ = true;
+ Send(new SyncChannelTestMsg_Done);
+ Done();
+ }
+ }
+
+ RestrictedDispatchDeadlockServer* server_;
+ RestrictedDispatchDeadlockClient2* peer_;
+ WaitableEvent* server_ready_event_;
+ WaitableEvent** events_;
+ bool received_msg_;
+ bool received_noarg_reply_;
+ bool done_issued_;
+};
+
+} // namespace
+
+TEST_F(IPCSyncChannelTest, RestrictedDispatchDeadlock) {
+ std::vector<Worker*> workers;
+
+ // A shared worker thread so that server1 and server2 run on one thread.
+ base::Thread worker_thread("RestrictedDispatchDeadlock");
+ ASSERT_TRUE(worker_thread.Start());
+
+ WaitableEvent server1_ready(false, false);
+ WaitableEvent server2_ready(false, false);
+
+ WaitableEvent event0(false, false);
+ WaitableEvent event1(false, false);
+ WaitableEvent event2(false, false);
+ WaitableEvent event3(false, false);
+ WaitableEvent* events[4] = {&event0, &event1, &event2, &event3};
+
+ RestrictedDispatchDeadlockServer* server1;
+ RestrictedDispatchDeadlockServer* server2;
+ RestrictedDispatchDeadlockClient1* client1;
+ RestrictedDispatchDeadlockClient2* client2;
+
+ server2 = new RestrictedDispatchDeadlockServer(2, &server2_ready, events,
+ NULL);
+ server2->OverrideThread(&worker_thread);
+ workers.push_back(server2);
+
+ client2 = new RestrictedDispatchDeadlockClient2(server2, &server2_ready,
+ events);
+ workers.push_back(client2);
+
+ server1 = new RestrictedDispatchDeadlockServer(1, &server1_ready, events,
+ server2);
+ server1->OverrideThread(&worker_thread);
+ workers.push_back(server1);
+
+ client1 = new RestrictedDispatchDeadlockClient1(server1, client2,
+ &server1_ready, events);
+ workers.push_back(client1);
+
+ RunTest(workers);
+}
+
} // namespace IPC