diff options
author | jcampan@google.com <jcampan@google.com@0039d316-1c4b-4281-b951-d872f2087c98> | 2008-08-12 01:25:41 +0000 |
---|---|---|
committer | jcampan@google.com <jcampan@google.com@0039d316-1c4b-4281-b951-d872f2087c98> | 2008-08-12 01:25:41 +0000 |
commit | d65cab7ac310ea12c5d946ff40242a243ce911da (patch) | |
tree | fe3eaa4a4e6f14a7416c4b4da351e18cd34d34b2 /chrome/common | |
parent | 1a48f315b0ca5c26c4446070edfb5842ed06c8c7 (diff) | |
download | chromium_src-d65cab7ac310ea12c5d946ff40242a243ce911da.zip chromium_src-d65cab7ac310ea12c5d946ff40242a243ce911da.tar.gz chromium_src-d65cab7ac310ea12c5d946ff40242a243ce911da.tar.bz2 |
Enabling sync_channel in the browser to allow accessibility code making blocking calls. This replaces my previous CL that was somehow duplicating some of these functionalities.
BUG=None
TEST=Run the unit tests.
git-svn-id: svn://svn.chromium.org/chrome/trunk/src@691 0039d316-1c4b-4281-b951-d872f2087c98
Diffstat (limited to 'chrome/common')
-rw-r--r-- | chrome/common/ipc_channel_proxy.cc | 14 | ||||
-rw-r--r-- | chrome/common/ipc_channel_proxy.h | 5 | ||||
-rw-r--r-- | chrome/common/ipc_sync_channel.cc | 79 | ||||
-rw-r--r-- | chrome/common/ipc_sync_channel.h | 24 | ||||
-rw-r--r-- | chrome/common/ipc_sync_channel_unittest.cc | 105 | ||||
-rw-r--r-- | chrome/common/ipc_sync_message.cc | 7 | ||||
-rw-r--r-- | chrome/common/ipc_sync_message.h | 4 |
7 files changed, 207 insertions, 31 deletions
diff --git a/chrome/common/ipc_channel_proxy.cc b/chrome/common/ipc_channel_proxy.cc index 9d6b473..2cddc8e 100644 --- a/chrome/common/ipc_channel_proxy.cc +++ b/chrome/common/ipc_channel_proxy.cc @@ -55,8 +55,7 @@ void ChannelProxy::Context::CreateChannel(const std::wstring& id, channel_ = new Channel(id, mode, this); } -// Called on the IPC::Channel thread -void ChannelProxy::Context::OnMessageReceived(const Message& message) { +bool ChannelProxy::Context::TryFilters(const Message& message) { #ifdef IPC_MESSAGE_LOG_ENABLED Logging* logger = Logging::current(); if (logger->Enabled()) @@ -69,9 +68,17 @@ void ChannelProxy::Context::OnMessageReceived(const Message& message) { if (logger->Enabled()) logger->OnPostDispatchMessage(message, channel_id_); #endif - return; + return true; } } + return false; +} + +// Called on the IPC::Channel thread +void ChannelProxy::Context::OnMessageReceived(const Message& message) { + // First give a chance to the filters to process this message. + if (TryFilters(message)) + return; // NOTE: This code relies on the listener's message loop not going away while // this thread is active. That should be a reasonable assumption, but it @@ -217,7 +224,6 @@ ChannelProxy::ChannelProxy(const std::wstring& channel_id, Channel::Mode mode, } ChannelProxy::ChannelProxy(const std::wstring& channel_id, Channel::Mode mode, - Channel::Listener* listener, MessageFilter* filter, MessageLoop* ipc_thread, Context* context, bool create_pipe_now) : context_(context) { diff --git a/chrome/common/ipc_channel_proxy.h b/chrome/common/ipc_channel_proxy.h index ca8abb4..82b3d88 100644 --- a/chrome/common/ipc_channel_proxy.h +++ b/chrome/common/ipc_channel_proxy.h @@ -149,7 +149,6 @@ class ChannelProxy : public Message::Sender { // to the internal state. If create_pipe_now is true, the pipe is created // immediately. Otherwise it's created on the IO thread. ChannelProxy(const std::wstring& channel_id, Channel::Mode mode, - Channel::Listener* listener, MessageFilter* filter, MessageLoop* ipc_thread_loop, Context* context, bool create_pipe_now); @@ -170,6 +169,10 @@ class ChannelProxy : public Message::Sender { Channel::Listener* listener() const { return listener_; } const std::wstring& channel_id() const { return channel_id_; } + // Gives the filters a chance at processing |message|. + // Returns true if the message was processed, false otherwise. + bool TryFilters(const Message& message); + private: friend class ChannelProxy; // Create the Channel diff --git a/chrome/common/ipc_sync_channel.cc b/chrome/common/ipc_sync_channel.cc index 897e7bf..e01759a 100644 --- a/chrome/common/ipc_sync_channel.cc +++ b/chrome/common/ipc_sync_channel.cc @@ -70,7 +70,6 @@ class SyncChannel::ReceivedSyncMsgQueue : ~ReceivedSyncMsgQueue() { CloseHandle(blocking_event_); - ThreadLocalStorage::Set(g_tls_index, NULL); } // Called on IPC thread when a synchronous message or reply arrives. @@ -252,13 +251,20 @@ SyncChannel::SyncContext::SyncContext( // Stash a pointer to the listener thread's ReceivedSyncMsgQueue, as we // need to be able to access it in the IPC thread. received_sync_msgs_ = new ReceivedSyncMsgQueue(); + // TODO(jcampan): http:///b/1319842 we are adding an extra-ref to keep this + // object around for the duration of the process. We used to remove it from + // the TLS when it was destroyed, but that was causing problems as we could + // be destroyed in a different thread then the thread we had been created + // on. This needs to be revisited at some point. + received_sync_msgs_->AddRef(); + ThreadLocalStorage::Set(g_tls_index, received_sync_msgs_.get()); } } SyncChannel::SyncContext::~SyncContext() { while (!deserializers_.empty()) - PopDeserializer(); + PopDeserializer(true); } // Adds information about an outgoing sync message to the context so that @@ -297,7 +303,7 @@ bool SyncChannel::SyncContext::UnblockListener(const Message* msg) { reply_deserialize_result_ = false; if (!deserializers_.empty()) { reply_event = deserializers_.top().reply_event; - PopDeserializer(); + PopDeserializer(false); } } else { if (deserializers_.empty()) @@ -317,7 +323,7 @@ bool SyncChannel::SyncContext::UnblockListener(const Message* msg) { // Can't CloseHandle the event just yet, since doing so might cause the // Wait call above to never return. reply_event = deserializers_.top().reply_event; - PopDeserializer(); + PopDeserializer(false); } } @@ -336,6 +342,10 @@ bool SyncChannel::SyncContext::UnblockListener(const Message* msg) { // Called on the IPC thread. void SyncChannel::SyncContext::OnMessageReceived(const Message& msg) { + // Give the filters a chance at processing this message. + if (TryFilters(msg)) + return; + if (UnblockListener(&msg)) return; @@ -360,19 +370,23 @@ void SyncChannel::SyncContext::OnChannelError() { Context::OnChannelError(); } -void SyncChannel::SyncContext::PopDeserializer() { - delete deserializers_.top().deserializer; +void SyncChannel::SyncContext::PopDeserializer(bool close_reply_event) { + PendingSyncMsg msg = deserializers_.top(); + delete msg.deserializer; + if (close_reply_event) + CloseHandle(msg.reply_event); deserializers_.pop(); } SyncChannel::SyncChannel(const std::wstring& channel_id, Channel::Mode mode, - Channel::Listener* listener, + Channel::Listener* listener, MessageFilter* filter, MessageLoop* ipc_message_loop, - bool create_pipe_now) - : ChannelProxy(channel_id, mode, listener, NULL, ipc_message_loop, - new SyncContext(listener, NULL, ipc_message_loop), + bool create_pipe_now, HANDLE shutdown_event) + : ChannelProxy(channel_id, mode, ipc_message_loop, + new SyncContext(listener, filter, ipc_message_loop), create_pipe_now), - shutdown_event_(ChildProcess::GetShutDownEvent()) { + shutdown_event_(shutdown_event), + sync_messages_with_no_timeout_allowed_(true) { DCHECK(shutdown_event_ != NULL); } @@ -385,11 +399,16 @@ SyncChannel::~SyncChannel() { } bool SyncChannel::Send(IPC::Message* message) { + return SendWithTimeout(message, INFINITE); +} + +bool SyncChannel::SendWithTimeout(IPC::Message* message, int timeout_ms) { bool message_is_sync = message->is_sync(); HANDLE pump_messages_event = NULL; HANDLE reply_event = NULL; if (message_is_sync) { + DCHECK(sync_messages_with_no_timeout_allowed_ || timeout_ms != INFINITE); IPC::SyncMessage* sync_msg = static_cast<IPC::SyncMessage*>(message); reply_event = sync_context()->Push(sync_msg); pump_messages_event = sync_msg->pump_messages_event(); @@ -409,24 +428,32 @@ bool SyncChannel::Send(IPC::Message* message) { pump_messages_event}; DWORD result; + TimeTicks before = TimeTicks::Now(); if (pump_messages_event == NULL) { // No need to pump messages since we didn't get an event to check. - result = WaitForMultipleObjects(3, objects, FALSE, INFINITE); + result = WaitForMultipleObjects(3, objects, FALSE, timeout_ms); } else { // If the event is set, then we pump messages. Otherwise we also wait on // it so that if it gets set we start pumping messages. if (WaitForSingleObject(pump_messages_event, 0) == WAIT_OBJECT_0) { - result = MsgWaitForMultipleObjects(3, objects, FALSE, INFINITE, - QS_ALLINPUT); + // Before calling MsgWaitForMultipleObjects() we check that our events + // are not signaled. The windows message queue might always have events + // starving the checking of our events otherwise. + result = WaitForMultipleObjects(3, objects, FALSE, 0); + if (result == WAIT_TIMEOUT) { + result = MsgWaitForMultipleObjects(3, objects, FALSE, timeout_ms, + QS_ALLINPUT); + } } else { - result = WaitForMultipleObjects(4, objects, FALSE, INFINITE); + result = WaitForMultipleObjects(4, objects, FALSE, timeout_ms); } } - if (result == WAIT_OBJECT_0) { - // Process shut down before we can get a reply to a synchronous message. - // Unblock the thread. - // Leak reply_event. Since we're shutting down, it's not a big deal. + if (result == WAIT_OBJECT_0 || result == WAIT_TIMEOUT) { + // Process shut down before we can get a reply to a synchronous message, + // or timed-out. Unblock the thread. + AutoLock auto_lock(*(sync_context()->deserializers_lock())); + sync_context()->PopDeserializer(true); return false; } @@ -458,7 +485,19 @@ bool SyncChannel::Send(IPC::Message* message) { // MsgWaitForMultipleObjects instead. } - // Continue looping until we get the reply to our synchronous message. + if (timeout_ms != INFINITE) { + TimeDelta time_delta = TimeTicks::Now() - before; + timeout_ms -= static_cast<int>(time_delta.InMilliseconds()); + if (timeout_ms <= 0) { + // We timed-out while processing messages. + AutoLock auto_lock(*(sync_context()->deserializers_lock())); + sync_context()->PopDeserializer(true); + return false; + } + } + + // Continue looping until we either get the reply to our synchronous message + // or we time-out. } while (true); } diff --git a/chrome/common/ipc_sync_channel.h b/chrome/common/ipc_sync_channel.h index af6e153..bd473d4 100644 --- a/chrome/common/ipc_sync_channel.h +++ b/chrome/common/ipc_sync_channel.h @@ -52,13 +52,20 @@ class SyncMessage; class SyncChannel : public ChannelProxy { public: SyncChannel(const std::wstring& channel_id, Channel::Mode mode, - Channel::Listener* listener, MessageLoop* ipc_message_loop, - bool create_pipe_now); + Channel::Listener* listener, MessageFilter* filter, + MessageLoop* ipc_message_loop, bool create_pipe_now, + HANDLE shutdown_handle); ~SyncChannel(); virtual bool Send(Message* message); + virtual bool SendWithTimeout(Message* message, int timeout_ms); bool UnblockListener(Message* message); + // Whether we allow sending messages with no time-out. + void set_sync_messages_with_no_timeout_allowed(bool value) { + sync_messages_with_no_timeout_allowed_ = value; + } + protected: class ReceivedSyncMsgQueue; friend class ReceivedSyncMsgQueue; @@ -95,6 +102,14 @@ class SyncChannel : public ChannelProxy { // Otherwise the function returns false. bool UnblockListener(const Message* msg); + + // Cleanly remove the top deserializer (and throw it away). + // You need to acquire the deserializers_lock before calling this. + void PopDeserializer(bool close_reply_event); + + // Returns the lock that should be acquired before calling PopDeserializer. + Lock* deserializers_lock() { return &deserializers_lock_; } + private: void OnMessageReceived(const Message& msg); void OnChannelError(); @@ -109,9 +124,6 @@ class SyncChannel : public ChannelProxy { HANDLE reply_event; }; - // Cleanly remove the top deserializer (and throw it away). - void PopDeserializer(); - typedef std::stack<PendingSyncMsg> PendingSyncMessageQueue; PendingSyncMessageQueue deserializers_; Lock deserializers_lock_; @@ -130,6 +142,8 @@ class SyncChannel : public ChannelProxy { std::stack<HANDLE> pump_messages_events_; + bool sync_messages_with_no_timeout_allowed_; + DISALLOW_EVIL_CONSTRUCTORS(SyncChannel); }; diff --git a/chrome/common/ipc_sync_channel_unittest.cc b/chrome/common/ipc_sync_channel_unittest.cc index 74f046a..78d825c 100644 --- a/chrome/common/ipc_sync_channel_unittest.cc +++ b/chrome/common/ipc_sync_channel_unittest.cc @@ -116,6 +116,9 @@ class Worker : public Channel::Listener, public Message::Sender { void AddRef() { } void Release() { } bool Send(Message* msg) { return channel_->Send(msg); } + bool SendWithTimeout(Message* msg, int timeout_ms) { + return channel_->SendWithTimeout(msg, timeout_ms); + } void WaitForChannelCreation() { channel_created_.Wait(); } void CloseChannel() { channel_.reset(); } void Start() { @@ -158,7 +161,8 @@ class Worker : public Channel::Listener, public Message::Sender { ipc_thread_.Start(); // Link ipc_thread_, listener_thread_ and channel_ altogether. channel_.reset(new SyncChannel( - channel_name_, mode_, this, ipc_thread_.message_loop(), true)); + channel_name_, mode_, this, NULL, ipc_thread_.message_loop(), true, + TestProcess::GetShutDownEvent())); channel_created_.Set(); Run(); } @@ -623,3 +627,102 @@ TEST(IPCSyncChannelTest, ChattyServer) { workers.push_back(new ChattyRecursiveClient()); RunTest(workers); } + + +//------------------------------------------------------------------------------ +class TimeoutServer : public Worker { + public: + TimeoutServer(int timeout_ms, + std::vector<bool> timeout_seq) + : Worker(Channel::MODE_SERVER, "timeout_server"), + timeout_ms_(timeout_ms), + timeout_seq_(timeout_seq) { + } + + void Run() { + for (std::vector<bool>::const_iterator iter = timeout_seq_.begin(); + iter != timeout_seq_.end(); ++iter) { + int answer = 0; + bool result = + SendWithTimeout(new SyncChannelTestMsg_AnswerToLife(&answer), + timeout_ms_); + if (*iter) { + // Time-out expected. + DCHECK(!result); + DCHECK(answer == 0); + } else { + DCHECK(result); + DCHECK(answer == 42); + } + } + Done(); + } + + private: + int timeout_ms_; + std::vector<bool> timeout_seq_; +}; + +class UnresponsiveClient : public Worker { + public: + UnresponsiveClient(std::vector<bool> timeout_seq) + : Worker(Channel::MODE_CLIENT, "unresponsive_client"), + timeout_seq_(timeout_seq) { + } + + void OnAnswerDelay(Message* reply_msg) { + DCHECK(!timeout_seq_.empty()); + if (!timeout_seq_[0]) { + SyncChannelTestMsg_AnswerToLife::WriteReplyParams(reply_msg, 42); + Send(reply_msg); + } else { + // Don't reply. + } + timeout_seq_.erase(timeout_seq_.begin()); + if (timeout_seq_.empty()) + Done(); + } + + private: + // Whether we should time-out or respond to the various messages we receive. + std::vector<bool> timeout_seq_; +}; + +// Tests that SendWithTimeout does not time-out if the response comes back fast +// enough. +TEST(IPCSyncChannelTest, SendWithTimeoutOK) { + std::vector<Worker*> workers; + std::vector<bool> timeout_seq; + timeout_seq.push_back(false); + timeout_seq.push_back(false); + timeout_seq.push_back(false); + workers.push_back(new TimeoutServer(5000, timeout_seq)); + workers.push_back(new SimpleClient()); + RunTest(workers); +} + +// Tests that SendWithTimeout does time-out. +TEST(IPCSyncChannelTest, SendWithTimeoutTimeout) { + std::vector<Worker*> workers; + std::vector<bool> timeout_seq; + timeout_seq.push_back(true); + timeout_seq.push_back(false); + timeout_seq.push_back(false); + workers.push_back(new TimeoutServer(100, timeout_seq)); + workers.push_back(new UnresponsiveClient(timeout_seq)); + RunTest(workers); +} + +// Sends some message that time-out and some that succeed. +TEST(IPCSyncChannelTest, SendWithTimeoutMixedOKAndTimeout) { + std::vector<Worker*> workers; + std::vector<bool> timeout_seq; + timeout_seq.push_back(true); + timeout_seq.push_back(false); + timeout_seq.push_back(false); + timeout_seq.push_back(true); + timeout_seq.push_back(false); + workers.push_back(new TimeoutServer(100, timeout_seq)); + workers.push_back(new UnresponsiveClient(timeout_seq)); + RunTest(workers); +} diff --git a/chrome/common/ipc_sync_message.cc b/chrome/common/ipc_sync_message.cc index 43d6d5e..8dc75c4 100644 --- a/chrome/common/ipc_sync_message.cc +++ b/chrome/common/ipc_sync_message.cc @@ -38,6 +38,8 @@ namespace IPC { uint32 SyncMessage::next_id_ = 0; #define kSyncMessageHeaderSize 4 +// A dummy handle used by EnableMessagePumping. +HANDLE dummy_event = ::CreateEvent(NULL, TRUE, TRUE, NULL); SyncMessage::SyncMessage( int32 routing_id, @@ -63,6 +65,11 @@ MessageReplyDeserializer* SyncMessage::GetReplyDeserializer() { return rv; } +void SyncMessage::EnableMessagePumping() { + DCHECK(!pump_messages_event_); + set_pump_messages_event(dummy_event); +} + bool SyncMessage::IsMessageReplyTo(const Message& msg, int request_id) { if (!msg.is_reply()) return false; diff --git a/chrome/common/ipc_sync_message.h b/chrome/common/ipc_sync_message.h index bcf47cbe..8ab6cae 100644 --- a/chrome/common/ipc_sync_message.h +++ b/chrome/common/ipc_sync_message.h @@ -63,6 +63,10 @@ class SyncMessage : public Message { } } + // Call this if you always want to pump messages. You can call this method + // or set_pump_messages_event but not both. + void EnableMessagePumping(); + HANDLE pump_messages_event() const { return pump_messages_event_; } // Returns true if the message is a reply to the given request id. |