diff options
author | jhorwich@chromium.org <jhorwich@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98> | 2012-01-11 22:39:54 +0000 |
---|---|---|
committer | jhorwich@chromium.org <jhorwich@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98> | 2012-01-11 22:39:54 +0000 |
commit | 522cc10d714c4a2e519176a8f24d2549820f63dc (patch) | |
tree | 1860802c1776ec2669811866f9be16f0cfd1275e /ipc | |
parent | 76b2482e4bb8586807d92025b942817030b3690d (diff) | |
download | chromium_src-522cc10d714c4a2e519176a8f24d2549820f63dc.zip chromium_src-522cc10d714c4a2e519176a8f24d2549820f63dc.tar.gz chromium_src-522cc10d714c4a2e519176a8f24d2549820f63dc.tar.bz2 |
Reimplement ReceivedSyncMsgQueue::DispatchMessages
Implementation of IPC::SyncChannel::ReceivedSyncMsgQueue::DispatchMessages that
does not hold any messages in a local stack-frame's delayed_queue, which was
causing me to see an inbound sync message from a plugin not dispatched while
the renderer was waiting for replies from the plugin. This was causing the
plugin and renderer to deadlock waiting for each other.
BUG=108491
TEST=Run Pepperized O3D and observe for tab hangs
TEST=Run ipc_tests unittests
Review URL: http://codereview.chromium.org/9022038
git-svn-id: svn://svn.chromium.org/chrome/trunk/src@117309 0039d316-1c4b-4281-b951-d872f2087c98
Diffstat (limited to 'ipc')
-rw-r--r-- | ipc/ipc_sync_channel.cc | 46 | ||||
-rw-r--r-- | ipc/ipc_sync_channel_unittest.cc | 260 |
2 files changed, 288 insertions, 18 deletions
diff --git a/ipc/ipc_sync_channel.cc b/ipc/ipc_sync_channel.cc index f3c5384..74d9744 100644 --- a/ipc/ipc_sync_channel.cc +++ b/ipc/ipc_sync_channel.cc @@ -1,4 +1,4 @@ -// Copyright (c) 2011 The Chromium Authors. All rights reserved. +// Copyright (c) 2012 The Chromium Authors. All rights reserved. // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. @@ -64,6 +64,7 @@ class SyncChannel::ReceivedSyncMsgQueue : // We set the event in case the listener thread is blocked (or is about // to). In case it's not, the PostTask dispatches the messages. message_queue_.push_back(QueuedMessage(new Message(msg), context)); + message_queue_version_++; } dispatch_event_.Signal(); @@ -89,27 +90,35 @@ class SyncChannel::ReceivedSyncMsgQueue : } void DispatchMessages(SyncContext* dispatching_context) { - SyncMessageQueue delayed_queue; + bool first_time = true; + uint32 expected_version = 0; + SyncMessageQueue::iterator it; while (true) { - Message* message; + Message* message = NULL; scoped_refptr<SyncChannel::SyncContext> context; { base::AutoLock auto_lock(message_lock_); - if (message_queue_.empty()) { - message_queue_ = delayed_queue; - break; + if (first_time || message_queue_version_ != expected_version) { + it = message_queue_.begin(); + first_time = false; + } + for (; it != message_queue_.end(); it++) { + if (!it->context->restrict_dispatch() || + it->context == dispatching_context) { + message = it->message; + context = it->context; + it = message_queue_.erase(it); + message_queue_version_++; + expected_version = message_queue_version_; + break; + } } - - message = message_queue_.front().message; - context = message_queue_.front().context; - message_queue_.pop_front(); - } - if (context->restrict_dispatch() && context != dispatching_context) { - delayed_queue.push_back(QueuedMessage(message, context)); - } else { - context->OnDispatchMessage(*message); - delete message; } + + if (message == NULL) + break; + context->OnDispatchMessage(*message); + delete message; } } @@ -122,6 +131,7 @@ class SyncChannel::ReceivedSyncMsgQueue : if (iter->context == context) { delete iter->message; iter = message_queue_.erase(iter); + message_queue_version_++; } else { iter++; } @@ -169,6 +179,7 @@ class SyncChannel::ReceivedSyncMsgQueue : // See the comment in SyncChannel::SyncChannel for why this event is created // as manual reset. ReceivedSyncMsgQueue() : + message_queue_version_(0), dispatch_event_(true, false), listener_message_loop_(base::MessageLoopProxy::current()), task_pending_(false), @@ -185,8 +196,9 @@ class SyncChannel::ReceivedSyncMsgQueue : scoped_refptr<SyncChannel::SyncContext> context; }; - typedef std::deque<QueuedMessage> SyncMessageQueue; + typedef std::list<QueuedMessage> SyncMessageQueue; SyncMessageQueue message_queue_; + uint32 message_queue_version_; // Used to signal DispatchMessages to rescan std::vector<QueuedMessage> received_replies_; diff --git a/ipc/ipc_sync_channel_unittest.cc b/ipc/ipc_sync_channel_unittest.cc index e7903f4..f396843 100644 --- a/ipc/ipc_sync_channel_unittest.cc +++ b/ipc/ipc_sync_channel_unittest.cc @@ -1,4 +1,4 @@ -// Copyright (c) 2011 The Chromium Authors. All rights reserved. +// Copyright (c) 2012 The Chromium Authors. All rights reserved. // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. // @@ -1334,4 +1334,262 @@ TEST_F(IPCSyncChannelTest, RestrictedDispatch) { EXPECT_EQ(3, success); } +//----------------------------------------------------------------------------- + +// This test case inspired by crbug.com/108491 +// We create two servers that use the same ListenerThread but have +// SetRestrictDispatchToSameChannel set to true. +// We create clients, then use some specific WaitableEvent wait/signalling to +// ensure that messages get dispatched in a way that causes a deadlock due to +// a nested dispatch and an eligible message in a higher-level dispatch's +// delayed_queue. Specifically, we start with client1 about so send an +// unblocking message to server1, while the shared listener thread for the +// servers server1 and server2 is about to send a non-unblocking message to +// client1. At the same time, client2 will be about to send an unblocking +// message to server2. Server1 will handle the client1->server1 message by +// telling server2 to send a non-unblocking message to client2. +// What should happen is that the send to server2 should find the pending, +// same-context client2->server2 message to dispatch, causing client2 to +// unblock then handle the server2->client2 message, so that the shared +// servers' listener thread can then respond to the client1->server1 message. +// Then client1 can handle the non-unblocking server1->client1 message. +// The old code would end up in a state where the server2->client2 message is +// sent, but the client2->server2 message (which is eligible for dispatch, and +// which is what client2 is waiting for) is stashed in a local delayed_queue +// that has server1's channel context, causing a deadlock. +// WaitableEvents in the events array are used to: +// event 0: indicate to client1 that server listener is in OnDoServerTask +// event 1: indicate to client1 that client2 listener is in OnDoClient2Task +// event 2: indicate to server1 that client2 listener is in OnDoClient2Task +// event 3: indicate to client2 that server listener is in OnDoServerTask + +namespace { + +class RestrictedDispatchDeadlockServer : public Worker { + public: + RestrictedDispatchDeadlockServer(int server_num, + WaitableEvent* server_ready_event, + WaitableEvent** events, + RestrictedDispatchDeadlockServer* peer) + : Worker(server_num == 1 ? "channel1" : "channel2", Channel::MODE_SERVER), + server_num_(server_num), + server_ready_event_(server_ready_event), + events_(events), + peer_(peer), + client_kicked_(false) { } + + void OnDoServerTask() { + events_[3]->Signal(); + events_[2]->Wait(); + events_[0]->Signal(); + SendMessageToClient(); + } + + void Run() { + channel()->SetRestrictDispatchToSameChannel(true); + server_ready_event_->Signal(); + } + + base::Thread* ListenerThread() { return Worker::ListenerThread(); } + + private: + bool OnMessageReceived(const Message& message) { + IPC_BEGIN_MESSAGE_MAP(RestrictedDispatchDeadlockServer, message) + IPC_MESSAGE_HANDLER(SyncChannelTestMsg_NoArgs, OnNoArgs) + IPC_MESSAGE_HANDLER(SyncChannelTestMsg_Done, Done) + IPC_END_MESSAGE_MAP() + return true; + } + + void OnNoArgs() { + if (server_num_ == 1) { + DCHECK(peer_ != NULL); + peer_->SendMessageToClient(); + } + } + + void SendMessageToClient() { + Message* msg = new SyncChannelTestMsg_NoArgs; + msg->set_unblock(false); + DCHECK(!msg->should_unblock()); + Send(msg); + } + + int server_num_; + WaitableEvent* server_ready_event_; + WaitableEvent** events_; + RestrictedDispatchDeadlockServer* peer_; + bool client_kicked_; +}; + +class RestrictedDispatchDeadlockClient2 : public Worker { + public: + RestrictedDispatchDeadlockClient2(RestrictedDispatchDeadlockServer* server, + WaitableEvent* server_ready_event, + WaitableEvent** events) + : Worker("channel2", Channel::MODE_CLIENT), + server_(server), + server_ready_event_(server_ready_event), + events_(events), + received_msg_(false), + received_noarg_reply_(false), + done_issued_(false) {} + + void Run() { + server_ready_event_->Wait(); + } + + void OnDoClient2Task() { + events_[3]->Wait(); + events_[1]->Signal(); + events_[2]->Signal(); + DCHECK(received_msg_ == false); + + Message* message = new SyncChannelTestMsg_NoArgs; + message->set_unblock(true); + Send(message); + received_noarg_reply_ = true; + } + + base::Thread* ListenerThread() { return Worker::ListenerThread(); } + private: + bool OnMessageReceived(const Message& message) { + IPC_BEGIN_MESSAGE_MAP(RestrictedDispatchDeadlockClient2, message) + IPC_MESSAGE_HANDLER(SyncChannelTestMsg_NoArgs, OnNoArgs) + IPC_END_MESSAGE_MAP() + return true; + } + + void OnNoArgs() { + received_msg_ = true; + PossiblyDone(); + } + + void PossiblyDone() { + if (received_noarg_reply_ && received_msg_) { + DCHECK(done_issued_ == false); + done_issued_ = true; + Send(new SyncChannelTestMsg_Done); + Done(); + } + } + + RestrictedDispatchDeadlockServer* server_; + WaitableEvent* server_ready_event_; + WaitableEvent** events_; + bool received_msg_; + bool received_noarg_reply_; + bool done_issued_; +}; + +class RestrictedDispatchDeadlockClient1 : public Worker { + public: + RestrictedDispatchDeadlockClient1(RestrictedDispatchDeadlockServer* server, + RestrictedDispatchDeadlockClient2* peer, + WaitableEvent* server_ready_event, + WaitableEvent** events) + : Worker("channel1", Channel::MODE_CLIENT), + server_(server), + peer_(peer), + server_ready_event_(server_ready_event), + events_(events), + received_msg_(false), + received_noarg_reply_(false), + done_issued_(false) {} + + void Run() { + server_ready_event_->Wait(); + server_->ListenerThread()->message_loop()->PostTask( + FROM_HERE, + base::Bind(&RestrictedDispatchDeadlockServer::OnDoServerTask, server_)); + peer_->ListenerThread()->message_loop()->PostTask( + FROM_HERE, + base::Bind(&RestrictedDispatchDeadlockClient2::OnDoClient2Task, peer_)); + events_[0]->Wait(); + events_[1]->Wait(); + DCHECK(received_msg_ == false); + + Message* message = new SyncChannelTestMsg_NoArgs; + message->set_unblock(true); + Send(message); + received_noarg_reply_ = true; + PossiblyDone(); + } + + base::Thread* ListenerThread() { return Worker::ListenerThread(); } + private: + bool OnMessageReceived(const Message& message) { + IPC_BEGIN_MESSAGE_MAP(RestrictedDispatchDeadlockClient1, message) + IPC_MESSAGE_HANDLER(SyncChannelTestMsg_NoArgs, OnNoArgs) + IPC_END_MESSAGE_MAP() + return true; + } + + void OnNoArgs() { + received_msg_ = true; + PossiblyDone(); + } + + void PossiblyDone() { + if (received_noarg_reply_ && received_msg_) { + DCHECK(done_issued_ == false); + done_issued_ = true; + Send(new SyncChannelTestMsg_Done); + Done(); + } + } + + RestrictedDispatchDeadlockServer* server_; + RestrictedDispatchDeadlockClient2* peer_; + WaitableEvent* server_ready_event_; + WaitableEvent** events_; + bool received_msg_; + bool received_noarg_reply_; + bool done_issued_; +}; + +} // namespace + +TEST_F(IPCSyncChannelTest, RestrictedDispatchDeadlock) { + std::vector<Worker*> workers; + + // A shared worker thread so that server1 and server2 run on one thread. + base::Thread worker_thread("RestrictedDispatchDeadlock"); + ASSERT_TRUE(worker_thread.Start()); + + WaitableEvent server1_ready(false, false); + WaitableEvent server2_ready(false, false); + + WaitableEvent event0(false, false); + WaitableEvent event1(false, false); + WaitableEvent event2(false, false); + WaitableEvent event3(false, false); + WaitableEvent* events[4] = {&event0, &event1, &event2, &event3}; + + RestrictedDispatchDeadlockServer* server1; + RestrictedDispatchDeadlockServer* server2; + RestrictedDispatchDeadlockClient1* client1; + RestrictedDispatchDeadlockClient2* client2; + + server2 = new RestrictedDispatchDeadlockServer(2, &server2_ready, events, + NULL); + server2->OverrideThread(&worker_thread); + workers.push_back(server2); + + client2 = new RestrictedDispatchDeadlockClient2(server2, &server2_ready, + events); + workers.push_back(client2); + + server1 = new RestrictedDispatchDeadlockServer(1, &server1_ready, events, + server2); + server1->OverrideThread(&worker_thread); + workers.push_back(server1); + + client1 = new RestrictedDispatchDeadlockClient1(server1, client2, + &server1_ready, events); + workers.push_back(client1); + + RunTest(workers); +} + } // namespace IPC |