diff options
-rw-r--r-- | base/object_watcher.cc | 7 | ||||
-rw-r--r-- | base/object_watcher.h | 4 | ||||
-rw-r--r-- | base/object_watcher_unittest.cc | 3 | ||||
-rw-r--r-- | chrome/common/ipc_channel_proxy.cc | 24 | ||||
-rw-r--r-- | chrome/common/ipc_channel_proxy.h | 24 | ||||
-rw-r--r-- | chrome/common/ipc_sync_channel.cc | 406 | ||||
-rw-r--r-- | chrome/common/ipc_sync_channel.h | 93 | ||||
-rw-r--r-- | chrome/common/ipc_sync_channel_unittest.cc | 477 | ||||
-rw-r--r-- | chrome/common/ipc_sync_message.h | 5 |
9 files changed, 668 insertions, 375 deletions
diff --git a/base/object_watcher.cc b/base/object_watcher.cc index 4e6e57a..cad281f 100644 --- a/base/object_watcher.cc +++ b/base/object_watcher.cc @@ -107,6 +107,13 @@ bool ObjectWatcher::StopWatching() { return true; } +HANDLE ObjectWatcher::GetWatchedObject() { + if (!watch_) + return NULL; + + return watch_->object; +} + // static void CALLBACK ObjectWatcher::DoneWaiting(void* param, BOOLEAN timed_out) { DCHECK(!timed_out); diff --git a/base/object_watcher.h b/base/object_watcher.h index 3a0b9a9..4660ce7 100644 --- a/base/object_watcher.h +++ b/base/object_watcher.h @@ -68,6 +68,10 @@ class ObjectWatcher : public MessageLoop::DestructionObserver { // bool StopWatching(); + // Returns the handle of the object being watched, or NULL if the object + // watcher is stopped. + HANDLE GetWatchedObject(); + private: // Called on a background thread when done waiting. static void CALLBACK DoneWaiting(void* param, BOOLEAN timed_out); diff --git a/base/object_watcher_unittest.cc b/base/object_watcher_unittest.cc index 3d2b068..06e7284 100644 --- a/base/object_watcher_unittest.cc +++ b/base/object_watcher_unittest.cc @@ -34,6 +34,7 @@ void RunTest_BasicSignal(MessageLoop::Type message_loop_type) { MessageLoop message_loop(message_loop_type); base::ObjectWatcher watcher; + EXPECT_EQ(NULL, watcher.GetWatchedObject()); // A manual-reset event that is not yet signaled. HANDLE event = CreateEvent(NULL, TRUE, FALSE, NULL); @@ -41,11 +42,13 @@ void RunTest_BasicSignal(MessageLoop::Type message_loop_type) { QuitDelegate delegate; bool ok = watcher.StartWatching(event, &delegate); EXPECT_TRUE(ok); + EXPECT_EQ(event, watcher.GetWatchedObject()); SetEvent(event); MessageLoop::current()->Run(); + EXPECT_EQ(NULL, watcher.GetWatchedObject()); CloseHandle(event); } diff --git a/chrome/common/ipc_channel_proxy.cc b/chrome/common/ipc_channel_proxy.cc index baac273..23d8ea9 100644 --- a/chrome/common/ipc_channel_proxy.cc +++ b/chrome/common/ipc_channel_proxy.cc @@ -52,14 +52,16 @@ bool ChannelProxy::Context::TryFilters(const Message& message) { // Called on the IPC::Channel thread void ChannelProxy::Context::OnMessageReceived(const Message& message) { // First give a chance to the filters to process this message. - if (TryFilters(message)) - return; + if (!TryFilters(message)) + OnMessageReceivedNoFilter(message); +} +// Called on the IPC::Channel thread +void ChannelProxy::Context::OnMessageReceivedNoFilter(const Message& message) { // NOTE: This code relies on the listener's message loop not going away while // this thread is active. That should be a reasonable assumption, but it // feels risky. We may want to invent some more indirect way of referring to // a MessageLoop if this becomes a problem. - listener_message_loop_->PostTask(FROM_HERE, NewRunnableMethod( this, &Context::OnDispatchMessage, message)); } @@ -82,7 +84,7 @@ void ChannelProxy::Context::OnChannelError() { } // Called on the IPC::Channel thread -void ChannelProxy::Context::OnOpenChannel() { +void ChannelProxy::Context::OnChannelOpened() { DCHECK(channel_ != NULL); // Assume a reference to ourselves on behalf of this thread. This reference @@ -99,7 +101,7 @@ void ChannelProxy::Context::OnOpenChannel() { } // Called on the IPC::Channel thread -void ChannelProxy::Context::OnCloseChannel() { +void ChannelProxy::Context::OnChannelClosed() { // It's okay for IPC::ChannelProxy::Close to be called more than once, which // would result in this branch being taken. if (!channel_) @@ -220,22 +222,18 @@ void ChannelProxy::Init(const std::wstring& channel_id, Channel::Mode mode, // complete initialization on the background thread context_->ipc_message_loop()->PostTask(FROM_HERE, NewRunnableMethod( - context_.get(), &Context::OnOpenChannel)); + context_.get(), &Context::OnChannelOpened)); } void ChannelProxy::Close() { // Clear the backpointer to the listener so that any pending calls to // Context::OnDispatchMessage or OnDispatchError will be ignored. It is // possible that the channel could be closed while it is receiving messages! - context_->clear(); + context_->Clear(); - if (MessageLoop::current() == context_->ipc_message_loop()) { - // We're being destructed on the IPC thread, so no need to use the message - // loop as it might go away. - context_->OnCloseChannel(); - } else { + if (context_->ipc_message_loop()) { context_->ipc_message_loop()->PostTask(FROM_HERE, NewRunnableMethod( - context_.get(), &Context::OnCloseChannel)); + context_.get(), &Context::OnChannelClosed)); } } diff --git a/chrome/common/ipc_channel_proxy.h b/chrome/common/ipc_channel_proxy.h index 8a722d9..59c8e47 100644 --- a/chrome/common/ipc_channel_proxy.h +++ b/chrome/common/ipc_channel_proxy.h @@ -112,8 +112,6 @@ class ChannelProxy : public Message::Sender { void RemoveFilter(MessageFilter* filter); protected: - Channel::Listener* listener() const { return context_->listener(); } - class Context; // A subclass uses this constructor if it needs to add more information // to the internal state. If create_pipe_now is true, the pipe is created @@ -129,6 +127,7 @@ class ChannelProxy : public Message::Sender { Context(Channel::Listener* listener, MessageFilter* filter, MessageLoop* ipc_thread); virtual ~Context() { } + MessageLoop* ipc_message_loop() const { return ipc_message_loop_; } protected: // IPC::Channel::Listener methods: @@ -136,6 +135,9 @@ class ChannelProxy : public Message::Sender { virtual void OnChannelConnected(int32 peer_pid); virtual void OnChannelError(); + // Like OnMessageReceived but doesn't try the filters. + void OnMessageReceivedNoFilter(const Message& message); + Channel::Listener* listener() const { return listener_; } const std::wstring& channel_id() const { return channel_id_; } @@ -143,14 +145,21 @@ class ChannelProxy : public Message::Sender { // Returns true if the message was processed, false otherwise. bool TryFilters(const Message& message); + // Like Open and Close, but called on the IPC thread. + virtual void OnChannelOpened(); + virtual void OnChannelClosed(); + + // Called on the consumers thread when the ChannelProxy is closed. At that + // point the consumer is telling us that they don't want to receive any + // more messages, so we honor that wish by forgetting them! + virtual void Clear() { listener_ = NULL; } + private: friend class ChannelProxy; // Create the Channel void CreateChannel(const std::wstring& id, const Channel::Mode& mode); // Methods called via InvokeLater: - void OnOpenChannel(); - void OnCloseChannel(); void OnSendMessage(Message* message_ptr); void OnAddFilter(MessageFilter* filter); void OnRemoveFilter(MessageFilter* filter); @@ -158,13 +167,6 @@ class ChannelProxy : public Message::Sender { void OnDispatchConnected(int32 peer_pid); void OnDispatchError(); - MessageLoop* ipc_message_loop() const { return ipc_message_loop_; } - - // Called on the consumers thread when the ChannelProxy is closed. At that - // point the consumer is telling us that they don't want to receive any - // more messages, so we honor that wish by forgetting them! - void clear() { listener_ = NULL; } - MessageLoop* listener_message_loop_; Channel::Listener* listener_; diff --git a/chrome/common/ipc_sync_channel.cc b/chrome/common/ipc_sync_channel.cc index fec8965..01ac78e 100644 --- a/chrome/common/ipc_sync_channel.cc +++ b/chrome/common/ipc_sync_channel.cc @@ -37,17 +37,21 @@ class SyncChannel::ReceivedSyncMsgQueue; class SyncChannel::ReceivedSyncMsgQueue : public base::RefCountedThreadSafe<ReceivedSyncMsgQueue> { public: - ReceivedSyncMsgQueue() : - blocking_event_(CreateEvent(NULL, FALSE, FALSE, NULL)), - task_pending_(false), - listener_message_loop_(MessageLoop::current()) { + // Returns the ReceivedSyncMsgQueue instance for this thread, creating one + // if necessary. Call RemoveListener on the same thread when done. + static ReceivedSyncMsgQueue* AddListener() { + // We want one ReceivedSyncMsgQueue per listener thread (i.e. since multiple + // SyncChannel objects can block the same thread). + ReceivedSyncMsgQueue* rv = lazy_tls_ptr_.Pointer()->Get(); + if (!rv) { + rv = new ReceivedSyncMsgQueue(); + ReceivedSyncMsgQueue::lazy_tls_ptr_.Pointer()->Set(rv); + } + rv->listener_count_++; + return rv; } ~ReceivedSyncMsgQueue() { - DCHECK(lazy_tls_ptr_.Pointer()->Get()); - DCHECK(MessageLoop::current() == listener_message_loop_); - CloseHandle(blocking_event_); - lazy_tls_ptr_.Pointer()->Set(NULL); } // Called on IPC thread when a synchronous message or reply arrives. @@ -66,7 +70,7 @@ class SyncChannel::ReceivedSyncMsgQueue : channel_id)); } - SetEvent(blocking_event_); + SetEvent(dispatch_event_); if (!was_task_pending) { listener_message_loop_->PostTask(FROM_HERE, NewRunnableMethod( this, &ReceivedSyncMsgQueue::DispatchMessagesTask)); @@ -105,7 +109,7 @@ class SyncChannel::ReceivedSyncMsgQueue : } #ifdef IPC_MESSAGE_LOG_ENABLED - IPC::Logging* logger = IPC::Logging::current(); + Logging* logger = Logging::current(); if (logger->Enabled()) logger->OnPreDispatchMessage(*message); #endif @@ -123,7 +127,7 @@ class SyncChannel::ReceivedSyncMsgQueue : } // Called on the IPC thread when the current sync Send() call is unblocked. - void OnUnblock() { + void DidUnblock() { if (!received_replies_.empty()) { MessageLoop::current()->PostTask(FROM_HERE, NewRunnableMethod( this, &ReceivedSyncMsgQueue::DispatchReplies)); @@ -149,9 +153,14 @@ class SyncChannel::ReceivedSyncMsgQueue : message_queue_.push(temp_queue.front()); temp_queue.pop(); } + + if (--listener_count_ == 0) { + DCHECK(lazy_tls_ptr_.Pointer()->Get()); + lazy_tls_ptr_.Pointer()->Set(NULL); + } } - HANDLE blocking_event() { return blocking_event_; } + HANDLE dispatch_event() { return dispatch_event_; } MessageLoop* listener_message_loop() { return listener_message_loop_; } // Holds a pointer to the per-thread ReceivedSyncMsgQueue object. @@ -159,12 +168,19 @@ class SyncChannel::ReceivedSyncMsgQueue : lazy_tls_ptr_; private: + ReceivedSyncMsgQueue() : + dispatch_event_(CreateEvent(NULL, TRUE, FALSE, NULL)), + task_pending_(false), + listener_message_loop_(MessageLoop::current()), + listener_count_(0) { + } + // Called on the ipc thread to check if we can unblock any current Send() // calls based on a queued reply. void DispatchReplies() { for (size_t i = 0; i < received_replies_.size(); ++i) { Message* message = received_replies_[i].message; - if (received_replies_[i].context->UnblockListener(message)) { + if (received_replies_[i].context->TryToUnblockListener(message)) { delete message; received_replies_.erase(received_replies_.begin() + i); return; @@ -172,13 +188,6 @@ class SyncChannel::ReceivedSyncMsgQueue : } } - // Set when we got a synchronous message that we must respond to as the - // sender needs its reply before it can reply to our original synchronous - // message. - HANDLE blocking_event_; - - MessageLoop* listener_message_loop_; - // Holds information about a queued synchronous message. struct ReceivedMessage { ReceivedMessage(Message* m, Channel::Listener* l, const std::wstring& i) @@ -190,8 +199,6 @@ class SyncChannel::ReceivedSyncMsgQueue : typedef std::queue<ReceivedMessage> SyncMessageQueue; SyncMessageQueue message_queue_; - Lock message_lock_; - bool task_pending_; // Holds information about a queued reply message. struct Reply { @@ -204,6 +211,15 @@ class SyncChannel::ReceivedSyncMsgQueue : }; std::vector<Reply> received_replies_; + + // Set when we got a synchronous message that we must respond to as the + // sender needs its reply before it can reply to our original synchronous + // message. + ScopedHandle dispatch_event_; + MessageLoop* listener_message_loop_; + Lock message_lock_; + bool task_pending_; + int listener_count_; }; base::LazyInstance<base::ThreadLocalPointer<SyncChannel::ReceivedSyncMsgQueue> > @@ -212,120 +228,89 @@ base::LazyInstance<base::ThreadLocalPointer<SyncChannel::ReceivedSyncMsgQueue> > SyncChannel::SyncContext::SyncContext( Channel::Listener* listener, MessageFilter* filter, - MessageLoop* ipc_thread) + MessageLoop* ipc_thread, + HANDLE shutdown_event) : ChannelProxy::Context(listener, filter, ipc_thread), - channel_closed_(false), - reply_deserialize_result_(false) { - // We want one ReceivedSyncMsgQueue per listener thread (i.e. since multiple - // SyncChannel objects that can block the same thread). - received_sync_msgs_ = ReceivedSyncMsgQueue::lazy_tls_ptr_.Pointer()->Get(); - - if (!received_sync_msgs_) { - // Stash a pointer to the listener thread's ReceivedSyncMsgQueue, as we - // need to be able to access it in the IPC thread. - received_sync_msgs_ = new ReceivedSyncMsgQueue(); - ReceivedSyncMsgQueue::lazy_tls_ptr_.Pointer()->Set(received_sync_msgs_); - } - - // Addref manually so that we can ensure destruction on the listener thread - // (so that the TLS object is NULLd). - received_sync_msgs_->AddRef(); + shutdown_event_(shutdown_event), + received_sync_msgs_(ReceivedSyncMsgQueue::AddListener()){ } SyncChannel::SyncContext::~SyncContext() { while (!deserializers_.empty()) - PopDeserializer(true); - - received_sync_msgs_->listener_message_loop()->ReleaseSoon( - FROM_HERE, received_sync_msgs_); + Pop(); } // Adds information about an outgoing sync message to the context so that // we know how to deserialize the reply. Returns a handle that's set when // the reply has arrived. -HANDLE SyncChannel::SyncContext::Push(IPC::SyncMessage* sync_msg) { - PendingSyncMsg pending(IPC::SyncMessage::GetMessageId(*sync_msg), +void SyncChannel::SyncContext::Push(SyncMessage* sync_msg) { + PendingSyncMsg pending(SyncMessage::GetMessageId(*sync_msg), sync_msg->GetReplyDeserializer(), CreateEvent(NULL, FALSE, FALSE, NULL)); AutoLock auto_lock(deserializers_lock_); - deserializers_.push(pending); + deserializers_.push_back(pending); +} + +bool SyncChannel::SyncContext::Pop() { + AutoLock auto_lock(deserializers_lock_); + PendingSyncMsg msg = deserializers_.back(); + delete msg.deserializer; + CloseHandle(msg.done_event); + deserializers_.pop_back(); + return msg.send_result; +} - return pending.reply_event; +HANDLE SyncChannel::SyncContext::GetSendDoneEvent() { + AutoLock auto_lock(deserializers_lock_); + return deserializers_.back().done_event; } -HANDLE SyncChannel::SyncContext::blocking_event() { - return received_sync_msgs_->blocking_event(); +HANDLE SyncChannel::SyncContext::GetDispatchEvent() { + return received_sync_msgs_->dispatch_event(); } void SyncChannel::SyncContext::DispatchMessages() { received_sync_msgs_->DispatchMessages(); } -void SyncChannel::SyncContext::RemoveListener(Channel::Listener* listener) { - received_sync_msgs_->RemoveListener(listener); -} - -bool SyncChannel::SyncContext::UnblockListener(const Message* msg) { - bool rv = false; - HANDLE reply_event = NULL; +bool SyncChannel::SyncContext::TryToUnblockListener(const Message* msg) { { - if (channel_closed_) { - // The channel is closed, or we couldn't connect, so cancel all Send() - // calls. - reply_deserialize_result_ = false; - { - AutoLock auto_lock(deserializers_lock_); - if (!deserializers_.empty()) - reply_event = deserializers_.top().reply_event; - } + AutoLock auto_lock(deserializers_lock_); + if (deserializers_.empty() || + !SyncMessage::IsMessageReplyTo(*msg, deserializers_.back().id)) { + return false; + } - if (reply_event) - PopDeserializer(false); - } else { - { - AutoLock auto_lock(deserializers_lock_); - if (deserializers_.empty()) - return false; - - if (!IPC::SyncMessage::IsMessageReplyTo(*msg, deserializers_.top().id)) - return false; - - rv = true; - if (msg->is_reply_error()) { - reply_deserialize_result_ = false; - } else { - reply_deserialize_result_ = deserializers_.top().deserializer-> - SerializeOutputParameters(*msg); - } - - // Can't CloseHandle the event just yet, since doing so might cause the - // Wait call above to never return. - reply_event = deserializers_.top().reply_event; - } - PopDeserializer(false); + if (!msg->is_reply_error()) { + deserializers_.back().send_result = deserializers_.back().deserializer-> + SerializeOutputParameters(*msg); } + SetEvent(deserializers_.back().done_event); } - if (reply_event) - SetEvent(reply_event); - // We got a reply to a synchronous Send() call that's blocking the listener // thread. However, further down the call stack there could be another // blocking Send() call, whose reply we received after we made this last // Send() call. So check if we have any queued replies available that // can now unblock the listener thread. - received_sync_msgs_->OnUnblock(); + received_sync_msgs_->DidUnblock(); + + return true; +} + +void SyncChannel::SyncContext::Clear() { + CancelPendingSends(); + received_sync_msgs_->RemoveListener(listener()); - return rv; + Context::Clear(); } -// Called on the IPC thread. void SyncChannel::SyncContext::OnMessageReceived(const Message& msg) { // Give the filters a chance at processing this message. if (TryFilters(msg)) return; - if (UnblockListener(&msg)) + if (TryToUnblockListener(&msg)) return; if (msg.should_unblock()) { @@ -338,149 +323,158 @@ void SyncChannel::SyncContext::OnMessageReceived(const Message& msg) { return; } - return Context::OnMessageReceived(msg); + return Context::OnMessageReceivedNoFilter(msg); } -// Called on the IPC thread. void SyncChannel::SyncContext::OnChannelError() { - channel_closed_ = true; - UnblockListener(NULL); - + CancelPendingSends(); Context::OnChannelError(); } -void SyncChannel::SyncContext::PopDeserializer(bool close_reply_event) { - PendingSyncMsg msg = deserializers_.top(); - delete msg.deserializer; - if (close_reply_event) - CloseHandle(msg.reply_event); - deserializers_.pop(); +void SyncChannel::SyncContext::OnChannelOpened() { + shutdown_watcher_.StartWatching(shutdown_event_, this); + Context::OnChannelOpened(); } -SyncChannel::SyncChannel(const std::wstring& channel_id, Channel::Mode mode, - Channel::Listener* listener, MessageFilter* filter, - MessageLoop* ipc_message_loop, - bool create_pipe_now, HANDLE shutdown_event) - : ChannelProxy(channel_id, mode, ipc_message_loop, - new SyncContext(listener, filter, ipc_message_loop), - create_pipe_now), - shutdown_event_(shutdown_event), +void SyncChannel::SyncContext::OnChannelClosed() { + shutdown_watcher_.StopWatching(); + Context::OnChannelClosed(); +} + +void SyncChannel::SyncContext::OnSendTimeout(int message_id) { + AutoLock auto_lock(deserializers_lock_); + PendingSyncMessageQueue::iterator iter; + for (iter = deserializers_.begin(); iter != deserializers_.end(); iter++) { + if ((*iter).id == message_id) { + SetEvent((*iter).done_event); + break; + } + } +} + +void SyncChannel::SyncContext::CancelPendingSends() { + AutoLock auto_lock(deserializers_lock_); + PendingSyncMessageQueue::iterator iter; + for (iter = deserializers_.begin(); iter != deserializers_.end(); iter++) + SetEvent((*iter).done_event); +} + +void SyncChannel::SyncContext::OnObjectSignaled(HANDLE object) { + DCHECK(object == shutdown_event_); + // Process shut down before we can get a reply to a synchronous message. + // Cancel pending Send calls, which will end up setting the send done event. + CancelPendingSends(); +} + + +SyncChannel::SyncChannel( + const std::wstring& channel_id, Channel::Mode mode, + Channel::Listener* listener, MessageFilter* filter, + MessageLoop* ipc_message_loop, bool create_pipe_now, HANDLE shutdown_event) + : ChannelProxy( + channel_id, mode, ipc_message_loop, + new SyncContext(listener, filter, ipc_message_loop, shutdown_event), + create_pipe_now), sync_messages_with_no_timeout_allowed_(true) { - DCHECK(shutdown_event_ != NULL); + // Ideally we only want to watch this object when running a nested message + // loop. However, we don't know when it exits if there's another nested + // message loop running under it or not, so we wouldn't know whether to + // stop or keep watching. So we always watch it, and create the event as + // manual reset since the object watcher might otherwise reset the event + // when we're doing a WaitForMultipleObjects. + dispatch_watcher_.StartWatching(sync_context()->GetDispatchEvent(), this); } SyncChannel::~SyncChannel() { - // The listener ensures that its lifetime is greater than SyncChannel. But - // after SyncChannel is destructed there's no guarantee that the listener is - // still around, so we wouldn't want ReceivedSyncMsgQueue to call the - // listener. - sync_context()->RemoveListener(listener()); } -bool SyncChannel::Send(IPC::Message* message) { +bool SyncChannel::Send(Message* message) { return SendWithTimeout(message, INFINITE); } -bool SyncChannel::SendWithTimeout(IPC::Message* message, int timeout_ms) { - bool message_is_sync = message->is_sync(); - HANDLE pump_messages_event = NULL; +bool SyncChannel::SendWithTimeout(Message* message, int timeout_ms) { + if (!message->is_sync()) { + ChannelProxy::Send(message); + return true; + } - HANDLE reply_event = NULL; - if (message_is_sync) { - DCHECK(sync_messages_with_no_timeout_allowed_ || timeout_ms != INFINITE); - IPC::SyncMessage* sync_msg = static_cast<IPC::SyncMessage*>(message); - reply_event = sync_context()->Push(sync_msg); - pump_messages_event = sync_msg->pump_messages_event(); + // *this* might get deleted in WaitForReply. + scoped_refptr<SyncContext> context(sync_context()); + if (WaitForSingleObject(context->shutdown_event(), 0) == WAIT_OBJECT_0) { + delete message; + return false; } - // Send the message using the ChannelProxy + DCHECK(sync_messages_with_no_timeout_allowed_ || timeout_ms != INFINITE); + SyncMessage* sync_msg = static_cast<SyncMessage*>(message); + context->Push(sync_msg); + int message_id = SyncMessage::GetMessageId(*sync_msg); + HANDLE pump_messages_event = sync_msg->pump_messages_event(); + ChannelProxy::Send(message); - if (!message_is_sync) - return true; - do { - // Wait for reply, or for any other incoming synchronous message. - DCHECK(reply_event != NULL); - HANDLE objects[] = { shutdown_event_, - reply_event, - sync_context()->blocking_event(), - pump_messages_event}; - - DWORD result; - TimeTicks before = TimeTicks::Now(); - if (pump_messages_event == NULL) { - // No need to pump messages since we didn't get an event to check. - result = WaitForMultipleObjects(3, objects, FALSE, timeout_ms); - } else { - // If the event is set, then we pump messages. Otherwise we also wait on - // it so that if it gets set we start pumping messages. - if (WaitForSingleObject(pump_messages_event, 0) == WAIT_OBJECT_0) { - // Before calling MsgWaitForMultipleObjects() we check that our events - // are not signaled. The windows message queue might always have events - // starving the checking of our events otherwise. - result = WaitForMultipleObjects(3, objects, FALSE, 0); - if (result == WAIT_TIMEOUT) { - result = MsgWaitForMultipleObjects(3, objects, FALSE, timeout_ms, - QS_ALLINPUT); - } - } else { - result = WaitForMultipleObjects(4, objects, FALSE, timeout_ms); - } - } + if (timeout_ms != INFINITE) { + // We use the sync message id so that when a message times out, we don't + // confuse it with another send that is either above/below this Send in + // the call stack. + context->ipc_message_loop()->PostDelayedTask(FROM_HERE, + NewRunnableMethod(context.get(), + &SyncContext::OnSendTimeout, message_id), timeout_ms); + } - if (result == WAIT_OBJECT_0 || result == WAIT_TIMEOUT) { - // Process shut down before we can get a reply to a synchronous message, - // or timed-out. Unblock the thread. - sync_context()->PopDeserializer(true); - return false; - } + // Wait for reply, or for any other incoming synchronous messages. + WaitForReply(pump_messages_event); - if (result == WAIT_OBJECT_0 + 1) { - // We got the reply to our synchronous message. - CloseHandle(reply_event); - return sync_context()->reply_deserialize_result(); - } + return context->Pop(); +} - if (result == WAIT_OBJECT_0 + 2) { +void SyncChannel::WaitForReply(HANDLE pump_messages_event) { + while (true) { + HANDLE objects[] = { sync_context()->GetDispatchEvent(), + sync_context()->GetSendDoneEvent(), + pump_messages_event }; + uint32 count = pump_messages_event ? 3: 2; + DWORD result = WaitForMultipleObjects(count, objects, FALSE, INFINITE); + if (result == WAIT_OBJECT_0) { // We're waiting for a reply, but we received a blocking synchronous // call. We must process it or otherwise a deadlock might occur. + ResetEvent(sync_context()->GetDispatchEvent()); sync_context()->DispatchMessages(); - } else if (result == WAIT_OBJECT_0 + 3) { - // Run a nested messsage loop to pump all the thread's messages. We - // shutdown the nested loop when there are no more messages. - pump_messages_events_.push(pump_messages_event); - bool old_state = MessageLoop::current()->NestableTasksAllowed(); - MessageLoop::current()->SetNestableTasksAllowed(true); - // Process a message, but come right back out of the MessageLoop (don't - // loop, sleep, or wait for a kMsgQuit). - MessageLoop::current()->RunAllPending(); - MessageLoop::current()->SetNestableTasksAllowed(old_state); - pump_messages_events_.pop(); - } else { - DCHECK(result == WAIT_OBJECT_0 + 4); - // We were doing a WaitForMultipleObjects, but now the pump messages - // event is set, so the next time we loop we'll use - // MsgWaitForMultipleObjects instead. + continue; } - if (timeout_ms != INFINITE) { - TimeDelta time_delta = TimeTicks::Now() - before; - timeout_ms -= static_cast<int>(time_delta.InMilliseconds()); - if (timeout_ms <= 0) { - // We timed-out while processing messages. - sync_context()->PopDeserializer(true); - return false; - } - } + if (result == WAIT_OBJECT_0 + 2) + WaitForReplyWithNestedMessageLoop(); // Start a nested message loop. - // Continue looping until we either get the reply to our synchronous message - // or we time-out. - } while (true); + break; + } } -bool SyncChannel::UnblockListener(Message* message) { - return sync_context()->UnblockListener(message); +void SyncChannel::WaitForReplyWithNestedMessageLoop() { + HANDLE old_done_event = send_done_watcher_.GetWatchedObject(); + send_done_watcher_.StopWatching(); + send_done_watcher_.StartWatching(sync_context()->GetSendDoneEvent(), this); + bool old_state = MessageLoop::current()->NestableTasksAllowed(); + MessageLoop::current()->SetNestableTasksAllowed(true); + MessageLoop::current()->Run(); + MessageLoop::current()->SetNestableTasksAllowed(old_state); + if (old_done_event) + send_done_watcher_.StartWatching(old_done_event, this); } -} // namespace IPC +void SyncChannel::OnObjectSignaled(HANDLE object) { + HANDLE dispatch_event = sync_context()->GetDispatchEvent(); + if (object == dispatch_event) { + // The call to DispatchMessages might delete this object, so reregister + // the object watcher first. + ResetEvent(dispatch_event); + dispatch_watcher_.StartWatching(dispatch_event, this); + sync_context()->DispatchMessages(); + } else { + // We got the reply, timed out or the process shutdown. + DCHECK(object == sync_context()->GetSendDoneEvent()); + MessageLoop::current()->Quit(); + } +} +} // namespace IPC diff --git a/chrome/common/ipc_sync_channel.h b/chrome/common/ipc_sync_channel.h index 54c9fe9..6c03745 100644 --- a/chrome/common/ipc_sync_channel.h +++ b/chrome/common/ipc_sync_channel.h @@ -7,11 +7,12 @@ #include <windows.h> #include <string> -#include <stack> -#include <queue> +#include <deque> #include "base/basictypes.h" #include "base/lock.h" +#include "base/object_watcher.h" #include "base/ref_counted.h" +#include "base/scoped_handle.h" #include "chrome/common/ipc_channel_proxy.h" namespace IPC { @@ -24,17 +25,17 @@ class SyncMessage; // is more than this object. If the message loop goes away while this object // is running and it's used to send a message, then it will use the invalid // message loop pointer to proxy it to the ipc thread. -class SyncChannel : public ChannelProxy { +class SyncChannel : public ChannelProxy, + public base::ObjectWatcher::Delegate { public: SyncChannel(const std::wstring& channel_id, Channel::Mode mode, Channel::Listener* listener, MessageFilter* filter, MessageLoop* ipc_message_loop, bool create_pipe_now, - HANDLE shutdown_handle); + HANDLE shutdown_event); ~SyncChannel(); virtual bool Send(Message* message); virtual bool SendWithTimeout(Message* message, int timeout_ms); - bool UnblockListener(Message* message); // Whether we allow sending messages with no time-out. void set_sync_messages_with_no_timeout_allowed(bool value) { @@ -48,74 +49,104 @@ class SyncChannel : public ChannelProxy { // SyncContext holds the per object data for SyncChannel, so that SyncChannel // can be deleted while it's being used in a different thread. See // ChannelProxy::Context for more information. - class SyncContext : public Context { + class SyncContext : public Context, + public base::ObjectWatcher::Delegate { public: SyncContext(Channel::Listener* listener, MessageFilter* filter, - MessageLoop* ipc_thread); + MessageLoop* ipc_thread, + HANDLE shutdown_event); ~SyncContext(); // Adds information about an outgoing sync message to the context so that - // we know how to deserialize the reply. Returns a handle that's set when - // the reply has arrived. - HANDLE Push(IPC::SyncMessage* sync_msg); + // we know how to deserialize the reply. + void Push(IPC::SyncMessage* sync_msg); - // Returns true if the reply message was deserialized without any errors, - // or false otherwise. - bool reply_deserialize_result() { return reply_deserialize_result_; } + // Cleanly remove the top deserializer (and throw it away). Returns the + // result of the Send call for that message. + bool Pop(); + + // Returns an event that's set when the send is complete, timed out or the + // process shut down. + HANDLE GetSendDoneEvent(); // Returns an event that's set when an incoming message that's not the reply // needs to get dispatched (by calling SyncContext::DispatchMessages). - HANDLE blocking_event(); + HANDLE GetDispatchEvent(); void DispatchMessages(); - void RemoveListener(Channel::Listener* listener); // Checks if the given message is blocking the listener thread because of a // synchronous send. If it is, the thread is unblocked and true is returned. // Otherwise the function returns false. - bool UnblockListener(const Message* msg); + bool TryToUnblockListener(const Message* msg); + + // Called on the IPC thread when a sync send that runs a nested message loop + // times out. + void OnSendTimeout(int message_id); - // Cleanly remove the top deserializer (and throw it away). - void PopDeserializer(bool close_reply_event); + HANDLE shutdown_event() { return shutdown_event_; } private: - void OnMessageReceived(const Message& msg); - void OnChannelError(); + // IPC::ChannelProxy methods that we override. + + // Called on the listener thread. + virtual void Clear(); + + // Called on the IPC thread. + virtual void OnMessageReceived(const Message& msg); + virtual void OnChannelError(); + virtual void OnChannelOpened(); + virtual void OnChannelClosed(); + + // Cancels all pending Send calls. + void CancelPendingSends(); + + // ObjectWatcher::Delegate implementation. + virtual void OnObjectSignaled(HANDLE object); // When sending a synchronous message, this structure contains an object that // knows how to deserialize the response. struct PendingSyncMsg { PendingSyncMsg(int id, IPC::MessageReplyDeserializer* d, HANDLE e) : - id(id), deserializer(d), reply_event(e) { } + id(id), deserializer(d), done_event(e), send_result(false) { } int id; IPC::MessageReplyDeserializer* deserializer; - HANDLE reply_event; + HANDLE done_event; + bool send_result; }; - typedef std::stack<PendingSyncMsg> PendingSyncMessageQueue; + typedef std::deque<PendingSyncMsg> PendingSyncMessageQueue; PendingSyncMessageQueue deserializers_; Lock deserializers_lock_; - // This can't be a scoped_refptr because it needs to be released on the - // listener thread. - ReceivedSyncMsgQueue* received_sync_msgs_; + scoped_refptr<ReceivedSyncMsgQueue> received_sync_msgs_; - bool channel_closed_; - bool reply_deserialize_result_; + HANDLE shutdown_event_; + base::ObjectWatcher shutdown_watcher_; }; private: + // ObjectWatcher::Delegate implementation. + virtual void OnObjectSignaled(HANDLE object); + SyncContext* sync_context() { return reinterpret_cast<SyncContext*>(context()); } - // Copy of shutdown event that we get in constructor. - HANDLE shutdown_event_; + // Both these functions wait for a reply, timeout or process shutdown. The + // latter one also runs a nested message loop in the meantime. + void WaitForReply(HANDLE pump_messages_event); - std::stack<HANDLE> pump_messages_events_; + // Runs a nested message loop until a reply arrives, times out, or the process + // shuts down. + void WaitForReplyWithNestedMessageLoop(); bool sync_messages_with_no_timeout_allowed_; + // Used to signal events between the IPC and listener threads. + base::ObjectWatcher send_done_watcher_; + base::ObjectWatcher dispatch_watcher_; + DISALLOW_EVIL_CONSTRUCTORS(SyncChannel); }; diff --git a/chrome/common/ipc_sync_channel_unittest.cc b/chrome/common/ipc_sync_channel_unittest.cc index 11ba159..decd6b6 100644 --- a/chrome/common/ipc_sync_channel_unittest.cc +++ b/chrome/common/ipc_sync_channel_unittest.cc @@ -83,17 +83,14 @@ class Worker : public Channel::Listener, public Message::Sender { // The IPC thread needs to outlive SyncChannel, so force the correct order of // destruction. virtual ~Worker() { - CloseChannel(); - // We must stop the threads and release the channel here. The IPC thread - // must die before the listener thread, otherwise if its in the process of - // sending a message, it will get an error, it will use channel_, which - // references listener_. There are many ways of crashing, depending on - // timing. - // This is a race condition so you may not see it all the time even if you - // reverse the Stop() calls. You may see this bug with AppVerifier only. + Event listener_done, ipc_done; + ListenerThread()->message_loop()->PostTask(FROM_HERE, NewRunnableMethod( + this, &Worker::OnListenerThreadShutdown, listener_done.handle(), + ipc_done.handle())); + HANDLE handles[] = { listener_done.handle(), ipc_done.handle() }; + WaitForMultipleObjects(2, handles, TRUE, INFINITE); ipc_thread_.Stop(); listener_thread_.Stop(); - channel_.reset(); } void AddRef() { } void Release() { } @@ -102,12 +99,13 @@ class Worker : public Channel::Listener, public Message::Sender { return channel_->SendWithTimeout(msg, timeout_ms); } void WaitForChannelCreation() { channel_created_.Wait(); } - void CloseChannel() { channel_.reset(); } + void CloseChannel() { + DCHECK(MessageLoop::current() == ListenerThread()->message_loop()); + channel_->Close(); + } void Start() { StartThread(&listener_thread_); - base::Thread* thread = - overrided_thread_ ? overrided_thread_ : &listener_thread_; - thread->message_loop()->PostTask(FROM_HERE, NewRunnableMethod( + ListenerThread()->message_loop()->PostTask(FROM_HERE, NewRunnableMethod( this, &Worker::OnStart)); } void OverrideThread(base::Thread* overrided_thread) { @@ -124,7 +122,6 @@ class Worker : public Channel::Listener, public Message::Sender { // Functions for dervied classes to implement if they wish. virtual void Run() { } - virtual void OnDouble(int in, int* out) { NOTREACHED(); } virtual void OnAnswer(int* answer) { NOTREACHED(); } virtual void OnAnswerDelay(Message* reply_msg) { // The message handler map below can only take one entry for @@ -137,8 +134,18 @@ class Worker : public Channel::Listener, public Message::Sender { SyncChannelTestMsg_AnswerToLife::WriteReplyParams(reply_msg, answer); Send(reply_msg); } + virtual void OnDouble(int in, int* out) { NOTREACHED(); } + virtual void OnDoubleDelay(int in, Message* reply_msg) { + int result; + OnDouble(in, &result); + SyncChannelTestMsg_Double::WriteReplyParams(reply_msg, result); + Send(reply_msg); + } private: + base::Thread* ListenerThread() { + return overrided_thread_ ? overrided_thread_ : &listener_thread_; + } // Called on the listener thread to create the sync channel. void OnStart() { // Link ipc_thread_, listener_thread_ and channel_ altogether. @@ -150,9 +157,22 @@ class Worker : public Channel::Listener, public Message::Sender { Run(); } + void OnListenerThreadShutdown(HANDLE listener_event, HANDLE ipc_event) { + // SyncChannel needs to be destructed on the thread that it was created on. + channel_.reset(); + SetEvent(listener_event); + + ipc_thread_.message_loop()->PostTask(FROM_HERE, NewRunnableMethod( + this, &Worker::OnIPCThreadShutdown, ipc_event)); + } + + void OnIPCThreadShutdown(HANDLE ipc_event) { + SetEvent(ipc_event); + } + void OnMessageReceived(const Message& message) { IPC_BEGIN_MESSAGE_MAP(Worker, message) - IPC_MESSAGE_HANDLER(SyncChannelTestMsg_Double, OnDouble) + IPC_MESSAGE_HANDLER_DELAY_REPLY(SyncChannelTestMsg_Double, OnDoubleDelay) IPC_MESSAGE_HANDLER_DELAY_REPLY(SyncChannelTestMsg_AnswerToLife, OnAnswerDelay) IPC_END_MESSAGE_MAP() @@ -211,21 +231,27 @@ void RunTest(std::vector<Worker*> workers) { } // namespace - //----------------------------------------------------------------------------- namespace { class SimpleServer : public Worker { public: - SimpleServer() : Worker(Channel::MODE_SERVER, "simpler_server") { } + SimpleServer(bool pump_during_send) + : Worker(Channel::MODE_SERVER, "simpler_server"), + pump_during_send_(pump_during_send) { } void Run() { int answer = 0; - bool result = Send(new SyncChannelTestMsg_AnswerToLife(&answer)); + IPC::SyncMessage* msg = new SyncChannelTestMsg_AnswerToLife(&answer); + if (pump_during_send_) + msg->EnableMessagePumping(); + bool result = Send(msg); DCHECK(result); DCHECK(answer == 42); Done(); } + + bool pump_during_send_; }; class SimpleClient : public Worker { @@ -238,16 +264,20 @@ class SimpleClient : public Worker { } }; -} // namespace - -// Tests basic synchronous call -TEST_F(IPCSyncChannelTest, Simple) { +void Simple(bool pump_during_send) { std::vector<Worker*> workers; - workers.push_back(new SimpleServer()); + workers.push_back(new SimpleServer(pump_during_send)); workers.push_back(new SimpleClient()); RunTest(workers); } +} // namespace + +// Tests basic synchronous call +TEST_F(IPCSyncChannelTest, Simple) { + Simple(false); + Simple(true); +} //----------------------------------------------------------------------------- @@ -264,16 +294,20 @@ class DelayClient : public Worker { } }; -} // namespace - -// Tests that asynchronous replies work -TEST_F(IPCSyncChannelTest, DelayReply) { +void DelayReply(bool pump_during_send) { std::vector<Worker*> workers; - workers.push_back(new SimpleServer()); + workers.push_back(new SimpleServer(pump_during_send)); workers.push_back(new DelayClient()); RunTest(workers); } +} // namespace + +// Tests that asynchronous replies work +TEST_F(IPCSyncChannelTest, DelayReply) { + DelayReply(false); + DelayReply(true); +} //----------------------------------------------------------------------------- @@ -281,22 +315,30 @@ namespace { class NoHangServer : public Worker { public: - explicit NoHangServer(Event* got_first_reply) - : Worker(Channel::MODE_SERVER, "no_hang_server"), - got_first_reply_(got_first_reply) { } + explicit NoHangServer(Event* got_first_reply, bool pump_during_send) + : Worker(Channel::MODE_SERVER, "no_hang_server"), + got_first_reply_(got_first_reply), + pump_during_send_(pump_during_send) { } void Run() { int answer = 0; - bool result = Send(new SyncChannelTestMsg_AnswerToLife(&answer)); + IPC::SyncMessage* msg = new SyncChannelTestMsg_AnswerToLife(&answer); + if (pump_during_send_) + msg->EnableMessagePumping(); + bool result = Send(msg); DCHECK(result); DCHECK(answer == 42); got_first_reply_->Set(); - result = Send(new SyncChannelTestMsg_AnswerToLife(&answer)); + msg = new SyncChannelTestMsg_AnswerToLife(&answer); + if (pump_during_send_) + msg->EnableMessagePumping(); + result = Send(msg); DCHECK(!result); Done(); } Event* got_first_reply_; + bool pump_during_send_; }; class NoHangClient : public Worker { @@ -318,29 +360,37 @@ class NoHangClient : public Worker { Event* got_first_reply_; }; -} // namespace - -// Tests that caller doesn't hang if receiver dies -TEST_F(IPCSyncChannelTest, NoHang) { +void NoHang(bool pump_during_send) { Event got_first_reply; - std::vector<Worker*> workers; - workers.push_back(new NoHangServer(&got_first_reply)); + workers.push_back(new NoHangServer(&got_first_reply, pump_during_send)); workers.push_back(new NoHangClient(&got_first_reply)); RunTest(workers); } +} // namespace + +// Tests that caller doesn't hang if receiver dies +TEST_F(IPCSyncChannelTest, NoHang) { + NoHang(false); + NoHang(true); +} //----------------------------------------------------------------------------- namespace { -class RecursiveServer : public Worker { +class UnblockServer : public Worker { public: - RecursiveServer() : Worker(Channel::MODE_SERVER, "recursive_server") { } + UnblockServer(bool pump_during_send) + : Worker(Channel::MODE_SERVER, "unblock_server"), + pump_during_send_(pump_during_send) { } void Run() { int answer = 0; - bool result = Send(new SyncChannelTestMsg_AnswerToLife(&answer)); + IPC::SyncMessage* msg = new SyncChannelTestMsg_AnswerToLife(&answer); + if (pump_during_send_) + msg->EnableMessagePumping(); + bool result = Send(msg); DCHECK(result); DCHECK(answer == 42); Done(); @@ -349,29 +399,160 @@ class RecursiveServer : public Worker { void OnDouble(int in, int* out) { *out = in * 2; } + + bool pump_during_send_; }; -class RecursiveClient : public Worker { +class UnblockClient : public Worker { public: - RecursiveClient() : Worker(Channel::MODE_CLIENT, "recursive_client") { } + UnblockClient(bool pump_during_send) + : Worker(Channel::MODE_CLIENT, "unblock_client"), + pump_during_send_(pump_during_send) { } void OnAnswer(int* answer) { - BOOL result = Send(new SyncChannelTestMsg_Double(21, answer)); + IPC::SyncMessage* msg = new SyncChannelTestMsg_Double(21, answer); + if (pump_during_send_) + msg->EnableMessagePumping(); + BOOL result = Send(msg); DCHECK(result); Done(); } + + bool pump_during_send_; }; +void Unblock(bool server_pump, bool client_pump) { + std::vector<Worker*> workers; + workers.push_back(new UnblockServer(server_pump)); + workers.push_back(new UnblockClient(client_pump)); + RunTest(workers); +} + } // namespace // Tests that the caller unblocks to answer a sync message from the receiver. +TEST_F(IPCSyncChannelTest, Unblock) { + Unblock(false, false); + Unblock(false, true); + Unblock(true, false); + Unblock(true, true); +} + +//----------------------------------------------------------------------------- + +namespace { + +class RecursiveServer : public Worker { + public: + explicit RecursiveServer( + bool expected_send_result, bool pump_first, bool pump_second) + : Worker(Channel::MODE_SERVER, "recursive_server"), + expected_send_result_(expected_send_result), + pump_first_(pump_first), pump_second_(pump_second) { } + void Run() { + int answer; + IPC::SyncMessage* msg = new SyncChannelTestMsg_Double(21, &answer); + if (pump_first_) + msg->EnableMessagePumping(); + bool result = Send(msg); + DCHECK(result == expected_send_result_); + Done(); + } + + void OnDouble(int in, int* out) { + int answer; + IPC::SyncMessage* msg = new SyncChannelTestMsg_AnswerToLife(&answer); + if (pump_second_) + msg->EnableMessagePumping(); + bool result = Send(msg); + DCHECK(result == expected_send_result_); + } + + bool expected_send_result_, pump_first_, pump_second_; +}; + +class RecursiveClient : public Worker { + public: + explicit RecursiveClient(bool pump_during_send, bool close_channel) + : Worker(Channel::MODE_CLIENT, "recursive_client"), + pump_during_send_(pump_during_send), close_channel_(close_channel) { } + + void OnDoubleDelay(int in, Message* reply_msg) { + int answer = 0; + IPC::SyncMessage* msg = new SyncChannelTestMsg_Double(5, &answer); + if (pump_during_send_) + msg->EnableMessagePumping(); + bool result = Send(msg); + DCHECK(result != close_channel_); + if (!close_channel_) { + SyncChannelTestMsg_Double::WriteReplyParams(reply_msg, in * 2); + Send(reply_msg); + } + Done(); + } + + void OnAnswerDelay(Message* reply_msg) { + if (close_channel_) { + CloseChannel(); + } else { + SyncChannelTestMsg_AnswerToLife::WriteReplyParams(reply_msg, 42); + Send(reply_msg); + } + } + + bool pump_during_send_, close_channel_; +}; + +void Recursive( + bool server_pump_first, bool server_pump_second, bool client_pump) { + std::vector<Worker*> workers; + workers.push_back( + new RecursiveServer(true, server_pump_first, server_pump_second)); + workers.push_back(new RecursiveClient(client_pump, false)); + RunTest(workers); +} + +} // namespace + +// Tests a server calling Send while another Send is pending. TEST_F(IPCSyncChannelTest, Recursive) { + Recursive(false, false, false); + Recursive(false, false, true); + Recursive(false, true, false); + Recursive(false, true, true); + Recursive(true, false, false); + Recursive(true, false, true); + Recursive(true, true, false); + Recursive(true, true, true); +} + +//----------------------------------------------------------------------------- + +namespace { + +void RecursiveNoHang( + bool server_pump_first, bool server_pump_second, bool client_pump) { std::vector<Worker*> workers; - workers.push_back(new RecursiveServer()); - workers.push_back(new RecursiveClient()); + workers.push_back( + new RecursiveServer(false, server_pump_first, server_pump_second)); + workers.push_back(new RecursiveClient(client_pump, true)); RunTest(workers); } +} // namespace + +// Tests that if a caller makes a sync call during an existing sync call and +// the receiver dies, neither of the Send() calls hang. +TEST_F(IPCSyncChannelTest, RecursiveNoHang) { + RecursiveNoHang(false, false, false); + RecursiveNoHang(false, false, true); + RecursiveNoHang(false, true, false); + RecursiveNoHang(false, true, true); + RecursiveNoHang(true, false, false); + RecursiveNoHang(true, false, true); + RecursiveNoHang(true, true, false); + RecursiveNoHang(true, true, true); +} //----------------------------------------------------------------------------- @@ -379,14 +560,22 @@ namespace { class MultipleServer1 : public Worker { public: - MultipleServer1() : Worker(L"test_channel1", Channel::MODE_SERVER) { } + MultipleServer1(bool pump_during_send) + : Worker(L"test_channel1", Channel::MODE_SERVER), + pump_during_send_(pump_during_send) { } + void Run() { int answer = 0; - bool result = Send(new SyncChannelTestMsg_Double(5, &answer)); + IPC::SyncMessage* msg = new SyncChannelTestMsg_Double(5, &answer); + if (pump_during_send_) + msg->EnableMessagePumping(); + bool result = Send(msg); DCHECK(result); DCHECK(answer == 10); Done(); } + + bool pump_during_send_; }; class MultipleClient1 : public Worker { @@ -419,15 +608,21 @@ class MultipleServer2 : public Worker { class MultipleClient2 : public Worker { public: - MultipleClient2(Event* client1_msg_received, Event* client1_can_reply) : - Worker(L"test_channel2", Channel::MODE_CLIENT), + MultipleClient2( + Event* client1_msg_received, Event* client1_can_reply, + bool pump_during_send) + : Worker(L"test_channel2", Channel::MODE_CLIENT), client1_msg_received_(client1_msg_received), - client1_can_reply_(client1_can_reply) { } + client1_can_reply_(client1_can_reply), + pump_during_send_(pump_during_send) { } void Run() { int answer = 0; client1_msg_received_->Wait(); - bool result = Send(new SyncChannelTestMsg_AnswerToLife(&answer)); + IPC::SyncMessage* msg = new SyncChannelTestMsg_AnswerToLife(&answer); + if (pump_during_send_) + msg->EnableMessagePumping(); + bool result = Send(msg); DCHECK(result); DCHECK(answer == 42); client1_can_reply_->Set(); @@ -436,13 +631,10 @@ class MultipleClient2 : public Worker { private: Event *client1_msg_received_, *client1_can_reply_; + bool pump_during_send_; }; -} // namespace - -// Tests that multiple SyncObjects on the same listener thread can unblock each -// other. -TEST_F(IPCSyncChannelTest, Multiple) { +void Multiple(bool server_pump, bool client_pump) { std::vector<Worker*> workers; // A shared worker thread so that server1 and server2 run on one thread. @@ -461,10 +653,10 @@ TEST_F(IPCSyncChannelTest, Multiple) { workers.push_back(worker); worker = new MultipleClient2( - &client1_msg_received, &client1_can_reply); + &client1_msg_received, &client1_can_reply, client_pump); workers.push_back(worker); - worker = new MultipleServer1(); + worker = new MultipleServer1(server_pump); worker->OverrideThread(&worker_thread); workers.push_back(worker); @@ -475,6 +667,16 @@ TEST_F(IPCSyncChannelTest, Multiple) { RunTest(workers); } +} // namespace + +// Tests that multiple SyncObjects on the same listener thread can unblock each +// other. +TEST_F(IPCSyncChannelTest, Multiple) { + Multiple(false, false); + Multiple(false, true); + Multiple(true, false); + Multiple(true, true); +} //----------------------------------------------------------------------------- @@ -482,14 +684,21 @@ namespace { class QueuedReplyServer1 : public Worker { public: - QueuedReplyServer1() : Worker(L"test_channel1", Channel::MODE_SERVER) { } + QueuedReplyServer1(bool pump_during_send) + : Worker(L"test_channel1", Channel::MODE_SERVER), + pump_during_send_(pump_during_send) { } void Run() { int answer = 0; - bool result = Send(new SyncChannelTestMsg_Double(5, &answer)); + IPC::SyncMessage* msg = new SyncChannelTestMsg_Double(5, &answer); + if (pump_during_send_) + msg->EnableMessagePumping(); + bool result = Send(msg); DCHECK(result); DCHECK(answer == 10); Done(); } + + bool pump_during_send_; }; class QueuedReplyClient1 : public Worker { @@ -531,30 +740,29 @@ class QueuedReplyServer2 : public Worker { class QueuedReplyClient2 : public Worker { public: - explicit QueuedReplyClient2(Event* client1_msg_received) : - Worker(L"test_channel2", Channel::MODE_CLIENT), - client1_msg_received_(client1_msg_received) { } + explicit QueuedReplyClient2( + Event* client1_msg_received, bool pump_during_send) + : Worker(L"test_channel2", Channel::MODE_CLIENT), + client1_msg_received_(client1_msg_received), + pump_during_send_(pump_during_send){ } void Run() { int answer = 0; client1_msg_received_->Wait(); - bool result = Send(new SyncChannelTestMsg_AnswerToLife(&answer)); + IPC::SyncMessage* msg = new SyncChannelTestMsg_AnswerToLife(&answer); + if (pump_during_send_) + msg->EnableMessagePumping(); + bool result = Send(msg); DCHECK(result); DCHECK(answer == 42); Done(); } - private: Event *client1_msg_received_; + bool pump_during_send_; }; -} // namespace - -// While a blocking send is in progress, the listener thread might answer other -// synchronous messages. This tests that if during the response to another -// message the reply to the original messages comes, it is queued up correctly -// and the original Send is unblocked later. -TEST_F(IPCSyncChannelTest, QueuedReply) { +void QueuedReply(bool server_pump, bool client_pump) { std::vector<Worker*> workers; // A shared worker thread so that server1 and server2 run on one thread. @@ -569,10 +777,10 @@ TEST_F(IPCSyncChannelTest, QueuedReply) { worker->OverrideThread(&worker_thread); workers.push_back(worker); - worker = new QueuedReplyClient2(&client1_msg_received); + worker = new QueuedReplyClient2(&client1_msg_received, client_pump); workers.push_back(worker); - worker = new QueuedReplyServer1(); + worker = new QueuedReplyServer1(server_pump); worker->OverrideThread(&worker_thread); workers.push_back(worker); @@ -583,6 +791,18 @@ TEST_F(IPCSyncChannelTest, QueuedReply) { RunTest(workers); } +} // namespace + +// While a blocking send is in progress, the listener thread might answer other +// synchronous messages. This tests that if during the response to another +// message the reply to the original messages comes, it is queued up correctly +// and the original Send is unblocked later. +TEST_F(IPCSyncChannelTest, QueuedReply) { + QueuedReply(false, false); + QueuedReply(false, true); + QueuedReply(true, false); + QueuedReply(true, true); +} //----------------------------------------------------------------------------- @@ -590,14 +810,18 @@ namespace { class BadServer : public Worker { public: - BadServer() : Worker(Channel::MODE_SERVER, "simpler_server") { } + BadServer(bool pump_during_send) + : Worker(Channel::MODE_SERVER, "simpler_server"), + pump_during_send_(pump_during_send) { } void Run() { int answer = 0; - Message* msg = new SyncMessage(MSG_ROUTING_CONTROL, - SyncChannelTestMsg_Double::ID, - Message::PRIORITY_NORMAL, - NULL); + IPC::SyncMessage* msg = new SyncMessage( + MSG_ROUTING_CONTROL, SyncChannelTestMsg_Double::ID, + Message::PRIORITY_NORMAL, NULL); + if (pump_during_send_) + msg->EnableMessagePumping(); + // Temporarily set the minimum logging very high so that the assertion // in ipc_message_utils doesn't fire. int log_level = logging::GetMinLogLevel(); @@ -613,27 +837,33 @@ class BadServer : public Worker { Done(); } -}; -} // namespace + bool pump_during_send_; +}; -// Tests that if a message is not serialized correctly, the Send() will fail. -TEST_F(IPCSyncChannelTest, BadMessage) { +void BadMessage(bool pump_during_send) { std::vector<Worker*> workers; - workers.push_back(new BadServer()); + workers.push_back(new BadServer(pump_during_send)); workers.push_back(new SimpleClient()); RunTest(workers); } +} // namespace + +// Tests that if a message is not serialized correctly, the Send() will fail. +TEST_F(IPCSyncChannelTest, BadMessage) { + BadMessage(false); + BadMessage(true); +} //----------------------------------------------------------------------------- namespace { -class ChattyRecursiveClient : public Worker { +class ChattyClient : public Worker { public: - ChattyRecursiveClient() : - Worker(Channel::MODE_CLIENT, "chatty_recursive_client") { } + ChattyClient() : + Worker(Channel::MODE_CLIENT, "chatty_client") { } void OnAnswer(int* answer) { // The PostMessage limit is 10k. Send 20% more than that. @@ -649,19 +879,23 @@ class ChattyRecursiveClient : public Worker { } }; +void ChattyServer(bool pump_during_send) { + std::vector<Worker*> workers; + workers.push_back(new UnblockServer(pump_during_send)); + workers.push_back(new ChattyClient()); + RunTest(workers); +} + } // namespace // Tests http://b/issue?id=1093251 - that sending lots of sync messages while // the receiver is waiting for a sync reply does not overflow the PostMessage // queue. TEST_F(IPCSyncChannelTest, ChattyServer) { - std::vector<Worker*> workers; - workers.push_back(new RecursiveServer()); - workers.push_back(new ChattyRecursiveClient()); - RunTest(workers); + ChattyServer(false); + ChattyServer(true); } - //------------------------------------------------------------------------------ namespace { @@ -669,19 +903,22 @@ namespace { class TimeoutServer : public Worker { public: TimeoutServer(int timeout_ms, - std::vector<bool> timeout_seq) + std::vector<bool> timeout_seq, + bool pump_during_send) : Worker(Channel::MODE_SERVER, "timeout_server"), timeout_ms_(timeout_ms), - timeout_seq_(timeout_seq) { + timeout_seq_(timeout_seq), + pump_during_send_(pump_during_send) { } void Run() { for (std::vector<bool>::const_iterator iter = timeout_seq_.begin(); iter != timeout_seq_.end(); ++iter) { int answer = 0; - bool result = - SendWithTimeout(new SyncChannelTestMsg_AnswerToLife(&answer), - timeout_ms_); + IPC::SyncMessage* msg = new SyncChannelTestMsg_AnswerToLife(&answer); + if (pump_during_send_) + msg->EnableMessagePumping(); + bool result = SendWithTimeout(msg, timeout_ms_); if (*iter) { // Time-out expected. DCHECK(!result); @@ -697,6 +934,7 @@ class TimeoutServer : public Worker { private: int timeout_ms_; std::vector<bool> timeout_seq_; + bool pump_during_send_; }; class UnresponsiveClient : public Worker { @@ -725,35 +963,29 @@ class UnresponsiveClient : public Worker { std::vector<bool> timeout_seq_; }; -} // namespace - -// Tests that SendWithTimeout does not time-out if the response comes back fast -// enough. -TEST_F(IPCSyncChannelTest, SendWithTimeoutOK) { +void SendWithTimeoutOK(bool pump_during_send) { std::vector<Worker*> workers; std::vector<bool> timeout_seq; timeout_seq.push_back(false); timeout_seq.push_back(false); timeout_seq.push_back(false); - workers.push_back(new TimeoutServer(5000, timeout_seq)); + workers.push_back(new TimeoutServer(5000, timeout_seq, pump_during_send)); workers.push_back(new SimpleClient()); RunTest(workers); } -// Tests that SendWithTimeout does time-out. -TEST_F(IPCSyncChannelTest, SendWithTimeoutTimeout) { +void SendWithTimeoutTimeout(bool pump_during_send) { std::vector<Worker*> workers; std::vector<bool> timeout_seq; timeout_seq.push_back(true); timeout_seq.push_back(false); timeout_seq.push_back(false); - workers.push_back(new TimeoutServer(100, timeout_seq)); + workers.push_back(new TimeoutServer(100, timeout_seq, pump_during_send)); workers.push_back(new UnresponsiveClient(timeout_seq)); RunTest(workers); } -// Sends some message that time-out and some that succeed. -TEST_F(IPCSyncChannelTest, SendWithTimeoutMixedOKAndTimeout) { +void SendWithTimeoutMixedOKAndTimeout(bool pump_during_send) { std::vector<Worker*> workers; std::vector<bool> timeout_seq; timeout_seq.push_back(true); @@ -761,7 +993,28 @@ TEST_F(IPCSyncChannelTest, SendWithTimeoutMixedOKAndTimeout) { timeout_seq.push_back(false); timeout_seq.push_back(true); timeout_seq.push_back(false); - workers.push_back(new TimeoutServer(100, timeout_seq)); + workers.push_back(new TimeoutServer(100, timeout_seq, pump_during_send)); workers.push_back(new UnresponsiveClient(timeout_seq)); RunTest(workers); -}
\ No newline at end of file +} + +} // namespace + +// Tests that SendWithTimeout does not time-out if the response comes back fast +// enough. +TEST_F(IPCSyncChannelTest, SendWithTimeoutOK) { + SendWithTimeoutOK(false); + SendWithTimeoutOK(true); +} + +// Tests that SendWithTimeout does time-out. +TEST_F(IPCSyncChannelTest, SendWithTimeoutTimeout) { + SendWithTimeoutTimeout(false); + SendWithTimeoutTimeout(true); +} + +// Sends some message that time-out and some that succeed. +TEST_F(IPCSyncChannelTest, SendWithTimeoutMixedOKAndTimeout) { + SendWithTimeoutMixedOKAndTimeout(false); + SendWithTimeoutMixedOKAndTimeout(true); +} diff --git a/chrome/common/ipc_sync_message.h b/chrome/common/ipc_sync_message.h index d098a99..86cb62f 100644 --- a/chrome/common/ipc_sync_message.h +++ b/chrome/common/ipc_sync_message.h @@ -27,8 +27,9 @@ class SyncMessage : public Message { // If this message can cause the receiver to block while waiting for user // input (i.e. by calling MessageBox), then the caller needs to pump window // messages and dispatch asynchronous messages while waiting for the reply. - // If this handle is passed in, then window messages will be pumped while - // it's set. The handle must be valid until after the Send call returns. + // If this handle is passed in, then window messages will start being pumped + // when it's set. Note that this behavior will continue even if the event is + // later reset. The handle must be valid until after the Send call returns. void set_pump_messages_event(HANDLE event) { pump_messages_event_ = event; if (event) { |