summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--base/object_watcher.cc7
-rw-r--r--base/object_watcher.h4
-rw-r--r--base/object_watcher_unittest.cc3
-rw-r--r--chrome/common/ipc_channel_proxy.cc24
-rw-r--r--chrome/common/ipc_channel_proxy.h24
-rw-r--r--chrome/common/ipc_sync_channel.cc406
-rw-r--r--chrome/common/ipc_sync_channel.h93
-rw-r--r--chrome/common/ipc_sync_channel_unittest.cc477
-rw-r--r--chrome/common/ipc_sync_message.h5
9 files changed, 668 insertions, 375 deletions
diff --git a/base/object_watcher.cc b/base/object_watcher.cc
index 4e6e57a..cad281f 100644
--- a/base/object_watcher.cc
+++ b/base/object_watcher.cc
@@ -107,6 +107,13 @@ bool ObjectWatcher::StopWatching() {
return true;
}
+HANDLE ObjectWatcher::GetWatchedObject() {
+ if (!watch_)
+ return NULL;
+
+ return watch_->object;
+}
+
// static
void CALLBACK ObjectWatcher::DoneWaiting(void* param, BOOLEAN timed_out) {
DCHECK(!timed_out);
diff --git a/base/object_watcher.h b/base/object_watcher.h
index 3a0b9a9..4660ce7 100644
--- a/base/object_watcher.h
+++ b/base/object_watcher.h
@@ -68,6 +68,10 @@ class ObjectWatcher : public MessageLoop::DestructionObserver {
//
bool StopWatching();
+ // Returns the handle of the object being watched, or NULL if the object
+ // watcher is stopped.
+ HANDLE GetWatchedObject();
+
private:
// Called on a background thread when done waiting.
static void CALLBACK DoneWaiting(void* param, BOOLEAN timed_out);
diff --git a/base/object_watcher_unittest.cc b/base/object_watcher_unittest.cc
index 3d2b068..06e7284 100644
--- a/base/object_watcher_unittest.cc
+++ b/base/object_watcher_unittest.cc
@@ -34,6 +34,7 @@ void RunTest_BasicSignal(MessageLoop::Type message_loop_type) {
MessageLoop message_loop(message_loop_type);
base::ObjectWatcher watcher;
+ EXPECT_EQ(NULL, watcher.GetWatchedObject());
// A manual-reset event that is not yet signaled.
HANDLE event = CreateEvent(NULL, TRUE, FALSE, NULL);
@@ -41,11 +42,13 @@ void RunTest_BasicSignal(MessageLoop::Type message_loop_type) {
QuitDelegate delegate;
bool ok = watcher.StartWatching(event, &delegate);
EXPECT_TRUE(ok);
+ EXPECT_EQ(event, watcher.GetWatchedObject());
SetEvent(event);
MessageLoop::current()->Run();
+ EXPECT_EQ(NULL, watcher.GetWatchedObject());
CloseHandle(event);
}
diff --git a/chrome/common/ipc_channel_proxy.cc b/chrome/common/ipc_channel_proxy.cc
index baac273..23d8ea9 100644
--- a/chrome/common/ipc_channel_proxy.cc
+++ b/chrome/common/ipc_channel_proxy.cc
@@ -52,14 +52,16 @@ bool ChannelProxy::Context::TryFilters(const Message& message) {
// Called on the IPC::Channel thread
void ChannelProxy::Context::OnMessageReceived(const Message& message) {
// First give a chance to the filters to process this message.
- if (TryFilters(message))
- return;
+ if (!TryFilters(message))
+ OnMessageReceivedNoFilter(message);
+}
+// Called on the IPC::Channel thread
+void ChannelProxy::Context::OnMessageReceivedNoFilter(const Message& message) {
// NOTE: This code relies on the listener's message loop not going away while
// this thread is active. That should be a reasonable assumption, but it
// feels risky. We may want to invent some more indirect way of referring to
// a MessageLoop if this becomes a problem.
-
listener_message_loop_->PostTask(FROM_HERE, NewRunnableMethod(
this, &Context::OnDispatchMessage, message));
}
@@ -82,7 +84,7 @@ void ChannelProxy::Context::OnChannelError() {
}
// Called on the IPC::Channel thread
-void ChannelProxy::Context::OnOpenChannel() {
+void ChannelProxy::Context::OnChannelOpened() {
DCHECK(channel_ != NULL);
// Assume a reference to ourselves on behalf of this thread. This reference
@@ -99,7 +101,7 @@ void ChannelProxy::Context::OnOpenChannel() {
}
// Called on the IPC::Channel thread
-void ChannelProxy::Context::OnCloseChannel() {
+void ChannelProxy::Context::OnChannelClosed() {
// It's okay for IPC::ChannelProxy::Close to be called more than once, which
// would result in this branch being taken.
if (!channel_)
@@ -220,22 +222,18 @@ void ChannelProxy::Init(const std::wstring& channel_id, Channel::Mode mode,
// complete initialization on the background thread
context_->ipc_message_loop()->PostTask(FROM_HERE, NewRunnableMethod(
- context_.get(), &Context::OnOpenChannel));
+ context_.get(), &Context::OnChannelOpened));
}
void ChannelProxy::Close() {
// Clear the backpointer to the listener so that any pending calls to
// Context::OnDispatchMessage or OnDispatchError will be ignored. It is
// possible that the channel could be closed while it is receiving messages!
- context_->clear();
+ context_->Clear();
- if (MessageLoop::current() == context_->ipc_message_loop()) {
- // We're being destructed on the IPC thread, so no need to use the message
- // loop as it might go away.
- context_->OnCloseChannel();
- } else {
+ if (context_->ipc_message_loop()) {
context_->ipc_message_loop()->PostTask(FROM_HERE, NewRunnableMethod(
- context_.get(), &Context::OnCloseChannel));
+ context_.get(), &Context::OnChannelClosed));
}
}
diff --git a/chrome/common/ipc_channel_proxy.h b/chrome/common/ipc_channel_proxy.h
index 8a722d9..59c8e47 100644
--- a/chrome/common/ipc_channel_proxy.h
+++ b/chrome/common/ipc_channel_proxy.h
@@ -112,8 +112,6 @@ class ChannelProxy : public Message::Sender {
void RemoveFilter(MessageFilter* filter);
protected:
- Channel::Listener* listener() const { return context_->listener(); }
-
class Context;
// A subclass uses this constructor if it needs to add more information
// to the internal state. If create_pipe_now is true, the pipe is created
@@ -129,6 +127,7 @@ class ChannelProxy : public Message::Sender {
Context(Channel::Listener* listener, MessageFilter* filter,
MessageLoop* ipc_thread);
virtual ~Context() { }
+ MessageLoop* ipc_message_loop() const { return ipc_message_loop_; }
protected:
// IPC::Channel::Listener methods:
@@ -136,6 +135,9 @@ class ChannelProxy : public Message::Sender {
virtual void OnChannelConnected(int32 peer_pid);
virtual void OnChannelError();
+ // Like OnMessageReceived but doesn't try the filters.
+ void OnMessageReceivedNoFilter(const Message& message);
+
Channel::Listener* listener() const { return listener_; }
const std::wstring& channel_id() const { return channel_id_; }
@@ -143,14 +145,21 @@ class ChannelProxy : public Message::Sender {
// Returns true if the message was processed, false otherwise.
bool TryFilters(const Message& message);
+ // Like Open and Close, but called on the IPC thread.
+ virtual void OnChannelOpened();
+ virtual void OnChannelClosed();
+
+ // Called on the consumers thread when the ChannelProxy is closed. At that
+ // point the consumer is telling us that they don't want to receive any
+ // more messages, so we honor that wish by forgetting them!
+ virtual void Clear() { listener_ = NULL; }
+
private:
friend class ChannelProxy;
// Create the Channel
void CreateChannel(const std::wstring& id, const Channel::Mode& mode);
// Methods called via InvokeLater:
- void OnOpenChannel();
- void OnCloseChannel();
void OnSendMessage(Message* message_ptr);
void OnAddFilter(MessageFilter* filter);
void OnRemoveFilter(MessageFilter* filter);
@@ -158,13 +167,6 @@ class ChannelProxy : public Message::Sender {
void OnDispatchConnected(int32 peer_pid);
void OnDispatchError();
- MessageLoop* ipc_message_loop() const { return ipc_message_loop_; }
-
- // Called on the consumers thread when the ChannelProxy is closed. At that
- // point the consumer is telling us that they don't want to receive any
- // more messages, so we honor that wish by forgetting them!
- void clear() { listener_ = NULL; }
-
MessageLoop* listener_message_loop_;
Channel::Listener* listener_;
diff --git a/chrome/common/ipc_sync_channel.cc b/chrome/common/ipc_sync_channel.cc
index fec8965..01ac78e 100644
--- a/chrome/common/ipc_sync_channel.cc
+++ b/chrome/common/ipc_sync_channel.cc
@@ -37,17 +37,21 @@ class SyncChannel::ReceivedSyncMsgQueue;
class SyncChannel::ReceivedSyncMsgQueue :
public base::RefCountedThreadSafe<ReceivedSyncMsgQueue> {
public:
- ReceivedSyncMsgQueue() :
- blocking_event_(CreateEvent(NULL, FALSE, FALSE, NULL)),
- task_pending_(false),
- listener_message_loop_(MessageLoop::current()) {
+ // Returns the ReceivedSyncMsgQueue instance for this thread, creating one
+ // if necessary. Call RemoveListener on the same thread when done.
+ static ReceivedSyncMsgQueue* AddListener() {
+ // We want one ReceivedSyncMsgQueue per listener thread (i.e. since multiple
+ // SyncChannel objects can block the same thread).
+ ReceivedSyncMsgQueue* rv = lazy_tls_ptr_.Pointer()->Get();
+ if (!rv) {
+ rv = new ReceivedSyncMsgQueue();
+ ReceivedSyncMsgQueue::lazy_tls_ptr_.Pointer()->Set(rv);
+ }
+ rv->listener_count_++;
+ return rv;
}
~ReceivedSyncMsgQueue() {
- DCHECK(lazy_tls_ptr_.Pointer()->Get());
- DCHECK(MessageLoop::current() == listener_message_loop_);
- CloseHandle(blocking_event_);
- lazy_tls_ptr_.Pointer()->Set(NULL);
}
// Called on IPC thread when a synchronous message or reply arrives.
@@ -66,7 +70,7 @@ class SyncChannel::ReceivedSyncMsgQueue :
channel_id));
}
- SetEvent(blocking_event_);
+ SetEvent(dispatch_event_);
if (!was_task_pending) {
listener_message_loop_->PostTask(FROM_HERE, NewRunnableMethod(
this, &ReceivedSyncMsgQueue::DispatchMessagesTask));
@@ -105,7 +109,7 @@ class SyncChannel::ReceivedSyncMsgQueue :
}
#ifdef IPC_MESSAGE_LOG_ENABLED
- IPC::Logging* logger = IPC::Logging::current();
+ Logging* logger = Logging::current();
if (logger->Enabled())
logger->OnPreDispatchMessage(*message);
#endif
@@ -123,7 +127,7 @@ class SyncChannel::ReceivedSyncMsgQueue :
}
// Called on the IPC thread when the current sync Send() call is unblocked.
- void OnUnblock() {
+ void DidUnblock() {
if (!received_replies_.empty()) {
MessageLoop::current()->PostTask(FROM_HERE, NewRunnableMethod(
this, &ReceivedSyncMsgQueue::DispatchReplies));
@@ -149,9 +153,14 @@ class SyncChannel::ReceivedSyncMsgQueue :
message_queue_.push(temp_queue.front());
temp_queue.pop();
}
+
+ if (--listener_count_ == 0) {
+ DCHECK(lazy_tls_ptr_.Pointer()->Get());
+ lazy_tls_ptr_.Pointer()->Set(NULL);
+ }
}
- HANDLE blocking_event() { return blocking_event_; }
+ HANDLE dispatch_event() { return dispatch_event_; }
MessageLoop* listener_message_loop() { return listener_message_loop_; }
// Holds a pointer to the per-thread ReceivedSyncMsgQueue object.
@@ -159,12 +168,19 @@ class SyncChannel::ReceivedSyncMsgQueue :
lazy_tls_ptr_;
private:
+ ReceivedSyncMsgQueue() :
+ dispatch_event_(CreateEvent(NULL, TRUE, FALSE, NULL)),
+ task_pending_(false),
+ listener_message_loop_(MessageLoop::current()),
+ listener_count_(0) {
+ }
+
// Called on the ipc thread to check if we can unblock any current Send()
// calls based on a queued reply.
void DispatchReplies() {
for (size_t i = 0; i < received_replies_.size(); ++i) {
Message* message = received_replies_[i].message;
- if (received_replies_[i].context->UnblockListener(message)) {
+ if (received_replies_[i].context->TryToUnblockListener(message)) {
delete message;
received_replies_.erase(received_replies_.begin() + i);
return;
@@ -172,13 +188,6 @@ class SyncChannel::ReceivedSyncMsgQueue :
}
}
- // Set when we got a synchronous message that we must respond to as the
- // sender needs its reply before it can reply to our original synchronous
- // message.
- HANDLE blocking_event_;
-
- MessageLoop* listener_message_loop_;
-
// Holds information about a queued synchronous message.
struct ReceivedMessage {
ReceivedMessage(Message* m, Channel::Listener* l, const std::wstring& i)
@@ -190,8 +199,6 @@ class SyncChannel::ReceivedSyncMsgQueue :
typedef std::queue<ReceivedMessage> SyncMessageQueue;
SyncMessageQueue message_queue_;
- Lock message_lock_;
- bool task_pending_;
// Holds information about a queued reply message.
struct Reply {
@@ -204,6 +211,15 @@ class SyncChannel::ReceivedSyncMsgQueue :
};
std::vector<Reply> received_replies_;
+
+ // Set when we got a synchronous message that we must respond to as the
+ // sender needs its reply before it can reply to our original synchronous
+ // message.
+ ScopedHandle dispatch_event_;
+ MessageLoop* listener_message_loop_;
+ Lock message_lock_;
+ bool task_pending_;
+ int listener_count_;
};
base::LazyInstance<base::ThreadLocalPointer<SyncChannel::ReceivedSyncMsgQueue> >
@@ -212,120 +228,89 @@ base::LazyInstance<base::ThreadLocalPointer<SyncChannel::ReceivedSyncMsgQueue> >
SyncChannel::SyncContext::SyncContext(
Channel::Listener* listener,
MessageFilter* filter,
- MessageLoop* ipc_thread)
+ MessageLoop* ipc_thread,
+ HANDLE shutdown_event)
: ChannelProxy::Context(listener, filter, ipc_thread),
- channel_closed_(false),
- reply_deserialize_result_(false) {
- // We want one ReceivedSyncMsgQueue per listener thread (i.e. since multiple
- // SyncChannel objects that can block the same thread).
- received_sync_msgs_ = ReceivedSyncMsgQueue::lazy_tls_ptr_.Pointer()->Get();
-
- if (!received_sync_msgs_) {
- // Stash a pointer to the listener thread's ReceivedSyncMsgQueue, as we
- // need to be able to access it in the IPC thread.
- received_sync_msgs_ = new ReceivedSyncMsgQueue();
- ReceivedSyncMsgQueue::lazy_tls_ptr_.Pointer()->Set(received_sync_msgs_);
- }
-
- // Addref manually so that we can ensure destruction on the listener thread
- // (so that the TLS object is NULLd).
- received_sync_msgs_->AddRef();
+ shutdown_event_(shutdown_event),
+ received_sync_msgs_(ReceivedSyncMsgQueue::AddListener()){
}
SyncChannel::SyncContext::~SyncContext() {
while (!deserializers_.empty())
- PopDeserializer(true);
-
- received_sync_msgs_->listener_message_loop()->ReleaseSoon(
- FROM_HERE, received_sync_msgs_);
+ Pop();
}
// Adds information about an outgoing sync message to the context so that
// we know how to deserialize the reply. Returns a handle that's set when
// the reply has arrived.
-HANDLE SyncChannel::SyncContext::Push(IPC::SyncMessage* sync_msg) {
- PendingSyncMsg pending(IPC::SyncMessage::GetMessageId(*sync_msg),
+void SyncChannel::SyncContext::Push(SyncMessage* sync_msg) {
+ PendingSyncMsg pending(SyncMessage::GetMessageId(*sync_msg),
sync_msg->GetReplyDeserializer(),
CreateEvent(NULL, FALSE, FALSE, NULL));
AutoLock auto_lock(deserializers_lock_);
- deserializers_.push(pending);
+ deserializers_.push_back(pending);
+}
+
+bool SyncChannel::SyncContext::Pop() {
+ AutoLock auto_lock(deserializers_lock_);
+ PendingSyncMsg msg = deserializers_.back();
+ delete msg.deserializer;
+ CloseHandle(msg.done_event);
+ deserializers_.pop_back();
+ return msg.send_result;
+}
- return pending.reply_event;
+HANDLE SyncChannel::SyncContext::GetSendDoneEvent() {
+ AutoLock auto_lock(deserializers_lock_);
+ return deserializers_.back().done_event;
}
-HANDLE SyncChannel::SyncContext::blocking_event() {
- return received_sync_msgs_->blocking_event();
+HANDLE SyncChannel::SyncContext::GetDispatchEvent() {
+ return received_sync_msgs_->dispatch_event();
}
void SyncChannel::SyncContext::DispatchMessages() {
received_sync_msgs_->DispatchMessages();
}
-void SyncChannel::SyncContext::RemoveListener(Channel::Listener* listener) {
- received_sync_msgs_->RemoveListener(listener);
-}
-
-bool SyncChannel::SyncContext::UnblockListener(const Message* msg) {
- bool rv = false;
- HANDLE reply_event = NULL;
+bool SyncChannel::SyncContext::TryToUnblockListener(const Message* msg) {
{
- if (channel_closed_) {
- // The channel is closed, or we couldn't connect, so cancel all Send()
- // calls.
- reply_deserialize_result_ = false;
- {
- AutoLock auto_lock(deserializers_lock_);
- if (!deserializers_.empty())
- reply_event = deserializers_.top().reply_event;
- }
+ AutoLock auto_lock(deserializers_lock_);
+ if (deserializers_.empty() ||
+ !SyncMessage::IsMessageReplyTo(*msg, deserializers_.back().id)) {
+ return false;
+ }
- if (reply_event)
- PopDeserializer(false);
- } else {
- {
- AutoLock auto_lock(deserializers_lock_);
- if (deserializers_.empty())
- return false;
-
- if (!IPC::SyncMessage::IsMessageReplyTo(*msg, deserializers_.top().id))
- return false;
-
- rv = true;
- if (msg->is_reply_error()) {
- reply_deserialize_result_ = false;
- } else {
- reply_deserialize_result_ = deserializers_.top().deserializer->
- SerializeOutputParameters(*msg);
- }
-
- // Can't CloseHandle the event just yet, since doing so might cause the
- // Wait call above to never return.
- reply_event = deserializers_.top().reply_event;
- }
- PopDeserializer(false);
+ if (!msg->is_reply_error()) {
+ deserializers_.back().send_result = deserializers_.back().deserializer->
+ SerializeOutputParameters(*msg);
}
+ SetEvent(deserializers_.back().done_event);
}
- if (reply_event)
- SetEvent(reply_event);
-
// We got a reply to a synchronous Send() call that's blocking the listener
// thread. However, further down the call stack there could be another
// blocking Send() call, whose reply we received after we made this last
// Send() call. So check if we have any queued replies available that
// can now unblock the listener thread.
- received_sync_msgs_->OnUnblock();
+ received_sync_msgs_->DidUnblock();
+
+ return true;
+}
+
+void SyncChannel::SyncContext::Clear() {
+ CancelPendingSends();
+ received_sync_msgs_->RemoveListener(listener());
- return rv;
+ Context::Clear();
}
-// Called on the IPC thread.
void SyncChannel::SyncContext::OnMessageReceived(const Message& msg) {
// Give the filters a chance at processing this message.
if (TryFilters(msg))
return;
- if (UnblockListener(&msg))
+ if (TryToUnblockListener(&msg))
return;
if (msg.should_unblock()) {
@@ -338,149 +323,158 @@ void SyncChannel::SyncContext::OnMessageReceived(const Message& msg) {
return;
}
- return Context::OnMessageReceived(msg);
+ return Context::OnMessageReceivedNoFilter(msg);
}
-// Called on the IPC thread.
void SyncChannel::SyncContext::OnChannelError() {
- channel_closed_ = true;
- UnblockListener(NULL);
-
+ CancelPendingSends();
Context::OnChannelError();
}
-void SyncChannel::SyncContext::PopDeserializer(bool close_reply_event) {
- PendingSyncMsg msg = deserializers_.top();
- delete msg.deserializer;
- if (close_reply_event)
- CloseHandle(msg.reply_event);
- deserializers_.pop();
+void SyncChannel::SyncContext::OnChannelOpened() {
+ shutdown_watcher_.StartWatching(shutdown_event_, this);
+ Context::OnChannelOpened();
}
-SyncChannel::SyncChannel(const std::wstring& channel_id, Channel::Mode mode,
- Channel::Listener* listener, MessageFilter* filter,
- MessageLoop* ipc_message_loop,
- bool create_pipe_now, HANDLE shutdown_event)
- : ChannelProxy(channel_id, mode, ipc_message_loop,
- new SyncContext(listener, filter, ipc_message_loop),
- create_pipe_now),
- shutdown_event_(shutdown_event),
+void SyncChannel::SyncContext::OnChannelClosed() {
+ shutdown_watcher_.StopWatching();
+ Context::OnChannelClosed();
+}
+
+void SyncChannel::SyncContext::OnSendTimeout(int message_id) {
+ AutoLock auto_lock(deserializers_lock_);
+ PendingSyncMessageQueue::iterator iter;
+ for (iter = deserializers_.begin(); iter != deserializers_.end(); iter++) {
+ if ((*iter).id == message_id) {
+ SetEvent((*iter).done_event);
+ break;
+ }
+ }
+}
+
+void SyncChannel::SyncContext::CancelPendingSends() {
+ AutoLock auto_lock(deserializers_lock_);
+ PendingSyncMessageQueue::iterator iter;
+ for (iter = deserializers_.begin(); iter != deserializers_.end(); iter++)
+ SetEvent((*iter).done_event);
+}
+
+void SyncChannel::SyncContext::OnObjectSignaled(HANDLE object) {
+ DCHECK(object == shutdown_event_);
+ // Process shut down before we can get a reply to a synchronous message.
+ // Cancel pending Send calls, which will end up setting the send done event.
+ CancelPendingSends();
+}
+
+
+SyncChannel::SyncChannel(
+ const std::wstring& channel_id, Channel::Mode mode,
+ Channel::Listener* listener, MessageFilter* filter,
+ MessageLoop* ipc_message_loop, bool create_pipe_now, HANDLE shutdown_event)
+ : ChannelProxy(
+ channel_id, mode, ipc_message_loop,
+ new SyncContext(listener, filter, ipc_message_loop, shutdown_event),
+ create_pipe_now),
sync_messages_with_no_timeout_allowed_(true) {
- DCHECK(shutdown_event_ != NULL);
+ // Ideally we only want to watch this object when running a nested message
+ // loop. However, we don't know when it exits if there's another nested
+ // message loop running under it or not, so we wouldn't know whether to
+ // stop or keep watching. So we always watch it, and create the event as
+ // manual reset since the object watcher might otherwise reset the event
+ // when we're doing a WaitForMultipleObjects.
+ dispatch_watcher_.StartWatching(sync_context()->GetDispatchEvent(), this);
}
SyncChannel::~SyncChannel() {
- // The listener ensures that its lifetime is greater than SyncChannel. But
- // after SyncChannel is destructed there's no guarantee that the listener is
- // still around, so we wouldn't want ReceivedSyncMsgQueue to call the
- // listener.
- sync_context()->RemoveListener(listener());
}
-bool SyncChannel::Send(IPC::Message* message) {
+bool SyncChannel::Send(Message* message) {
return SendWithTimeout(message, INFINITE);
}
-bool SyncChannel::SendWithTimeout(IPC::Message* message, int timeout_ms) {
- bool message_is_sync = message->is_sync();
- HANDLE pump_messages_event = NULL;
+bool SyncChannel::SendWithTimeout(Message* message, int timeout_ms) {
+ if (!message->is_sync()) {
+ ChannelProxy::Send(message);
+ return true;
+ }
- HANDLE reply_event = NULL;
- if (message_is_sync) {
- DCHECK(sync_messages_with_no_timeout_allowed_ || timeout_ms != INFINITE);
- IPC::SyncMessage* sync_msg = static_cast<IPC::SyncMessage*>(message);
- reply_event = sync_context()->Push(sync_msg);
- pump_messages_event = sync_msg->pump_messages_event();
+ // *this* might get deleted in WaitForReply.
+ scoped_refptr<SyncContext> context(sync_context());
+ if (WaitForSingleObject(context->shutdown_event(), 0) == WAIT_OBJECT_0) {
+ delete message;
+ return false;
}
- // Send the message using the ChannelProxy
+ DCHECK(sync_messages_with_no_timeout_allowed_ || timeout_ms != INFINITE);
+ SyncMessage* sync_msg = static_cast<SyncMessage*>(message);
+ context->Push(sync_msg);
+ int message_id = SyncMessage::GetMessageId(*sync_msg);
+ HANDLE pump_messages_event = sync_msg->pump_messages_event();
+
ChannelProxy::Send(message);
- if (!message_is_sync)
- return true;
- do {
- // Wait for reply, or for any other incoming synchronous message.
- DCHECK(reply_event != NULL);
- HANDLE objects[] = { shutdown_event_,
- reply_event,
- sync_context()->blocking_event(),
- pump_messages_event};
-
- DWORD result;
- TimeTicks before = TimeTicks::Now();
- if (pump_messages_event == NULL) {
- // No need to pump messages since we didn't get an event to check.
- result = WaitForMultipleObjects(3, objects, FALSE, timeout_ms);
- } else {
- // If the event is set, then we pump messages. Otherwise we also wait on
- // it so that if it gets set we start pumping messages.
- if (WaitForSingleObject(pump_messages_event, 0) == WAIT_OBJECT_0) {
- // Before calling MsgWaitForMultipleObjects() we check that our events
- // are not signaled. The windows message queue might always have events
- // starving the checking of our events otherwise.
- result = WaitForMultipleObjects(3, objects, FALSE, 0);
- if (result == WAIT_TIMEOUT) {
- result = MsgWaitForMultipleObjects(3, objects, FALSE, timeout_ms,
- QS_ALLINPUT);
- }
- } else {
- result = WaitForMultipleObjects(4, objects, FALSE, timeout_ms);
- }
- }
+ if (timeout_ms != INFINITE) {
+ // We use the sync message id so that when a message times out, we don't
+ // confuse it with another send that is either above/below this Send in
+ // the call stack.
+ context->ipc_message_loop()->PostDelayedTask(FROM_HERE,
+ NewRunnableMethod(context.get(),
+ &SyncContext::OnSendTimeout, message_id), timeout_ms);
+ }
- if (result == WAIT_OBJECT_0 || result == WAIT_TIMEOUT) {
- // Process shut down before we can get a reply to a synchronous message,
- // or timed-out. Unblock the thread.
- sync_context()->PopDeserializer(true);
- return false;
- }
+ // Wait for reply, or for any other incoming synchronous messages.
+ WaitForReply(pump_messages_event);
- if (result == WAIT_OBJECT_0 + 1) {
- // We got the reply to our synchronous message.
- CloseHandle(reply_event);
- return sync_context()->reply_deserialize_result();
- }
+ return context->Pop();
+}
- if (result == WAIT_OBJECT_0 + 2) {
+void SyncChannel::WaitForReply(HANDLE pump_messages_event) {
+ while (true) {
+ HANDLE objects[] = { sync_context()->GetDispatchEvent(),
+ sync_context()->GetSendDoneEvent(),
+ pump_messages_event };
+ uint32 count = pump_messages_event ? 3: 2;
+ DWORD result = WaitForMultipleObjects(count, objects, FALSE, INFINITE);
+ if (result == WAIT_OBJECT_0) {
// We're waiting for a reply, but we received a blocking synchronous
// call. We must process it or otherwise a deadlock might occur.
+ ResetEvent(sync_context()->GetDispatchEvent());
sync_context()->DispatchMessages();
- } else if (result == WAIT_OBJECT_0 + 3) {
- // Run a nested messsage loop to pump all the thread's messages. We
- // shutdown the nested loop when there are no more messages.
- pump_messages_events_.push(pump_messages_event);
- bool old_state = MessageLoop::current()->NestableTasksAllowed();
- MessageLoop::current()->SetNestableTasksAllowed(true);
- // Process a message, but come right back out of the MessageLoop (don't
- // loop, sleep, or wait for a kMsgQuit).
- MessageLoop::current()->RunAllPending();
- MessageLoop::current()->SetNestableTasksAllowed(old_state);
- pump_messages_events_.pop();
- } else {
- DCHECK(result == WAIT_OBJECT_0 + 4);
- // We were doing a WaitForMultipleObjects, but now the pump messages
- // event is set, so the next time we loop we'll use
- // MsgWaitForMultipleObjects instead.
+ continue;
}
- if (timeout_ms != INFINITE) {
- TimeDelta time_delta = TimeTicks::Now() - before;
- timeout_ms -= static_cast<int>(time_delta.InMilliseconds());
- if (timeout_ms <= 0) {
- // We timed-out while processing messages.
- sync_context()->PopDeserializer(true);
- return false;
- }
- }
+ if (result == WAIT_OBJECT_0 + 2)
+ WaitForReplyWithNestedMessageLoop(); // Start a nested message loop.
- // Continue looping until we either get the reply to our synchronous message
- // or we time-out.
- } while (true);
+ break;
+ }
}
-bool SyncChannel::UnblockListener(Message* message) {
- return sync_context()->UnblockListener(message);
+void SyncChannel::WaitForReplyWithNestedMessageLoop() {
+ HANDLE old_done_event = send_done_watcher_.GetWatchedObject();
+ send_done_watcher_.StopWatching();
+ send_done_watcher_.StartWatching(sync_context()->GetSendDoneEvent(), this);
+ bool old_state = MessageLoop::current()->NestableTasksAllowed();
+ MessageLoop::current()->SetNestableTasksAllowed(true);
+ MessageLoop::current()->Run();
+ MessageLoop::current()->SetNestableTasksAllowed(old_state);
+ if (old_done_event)
+ send_done_watcher_.StartWatching(old_done_event, this);
}
-} // namespace IPC
+void SyncChannel::OnObjectSignaled(HANDLE object) {
+ HANDLE dispatch_event = sync_context()->GetDispatchEvent();
+ if (object == dispatch_event) {
+ // The call to DispatchMessages might delete this object, so reregister
+ // the object watcher first.
+ ResetEvent(dispatch_event);
+ dispatch_watcher_.StartWatching(dispatch_event, this);
+ sync_context()->DispatchMessages();
+ } else {
+ // We got the reply, timed out or the process shutdown.
+ DCHECK(object == sync_context()->GetSendDoneEvent());
+ MessageLoop::current()->Quit();
+ }
+}
+} // namespace IPC
diff --git a/chrome/common/ipc_sync_channel.h b/chrome/common/ipc_sync_channel.h
index 54c9fe9..6c03745 100644
--- a/chrome/common/ipc_sync_channel.h
+++ b/chrome/common/ipc_sync_channel.h
@@ -7,11 +7,12 @@
#include <windows.h>
#include <string>
-#include <stack>
-#include <queue>
+#include <deque>
#include "base/basictypes.h"
#include "base/lock.h"
+#include "base/object_watcher.h"
#include "base/ref_counted.h"
+#include "base/scoped_handle.h"
#include "chrome/common/ipc_channel_proxy.h"
namespace IPC {
@@ -24,17 +25,17 @@ class SyncMessage;
// is more than this object. If the message loop goes away while this object
// is running and it's used to send a message, then it will use the invalid
// message loop pointer to proxy it to the ipc thread.
-class SyncChannel : public ChannelProxy {
+class SyncChannel : public ChannelProxy,
+ public base::ObjectWatcher::Delegate {
public:
SyncChannel(const std::wstring& channel_id, Channel::Mode mode,
Channel::Listener* listener, MessageFilter* filter,
MessageLoop* ipc_message_loop, bool create_pipe_now,
- HANDLE shutdown_handle);
+ HANDLE shutdown_event);
~SyncChannel();
virtual bool Send(Message* message);
virtual bool SendWithTimeout(Message* message, int timeout_ms);
- bool UnblockListener(Message* message);
// Whether we allow sending messages with no time-out.
void set_sync_messages_with_no_timeout_allowed(bool value) {
@@ -48,74 +49,104 @@ class SyncChannel : public ChannelProxy {
// SyncContext holds the per object data for SyncChannel, so that SyncChannel
// can be deleted while it's being used in a different thread. See
// ChannelProxy::Context for more information.
- class SyncContext : public Context {
+ class SyncContext : public Context,
+ public base::ObjectWatcher::Delegate {
public:
SyncContext(Channel::Listener* listener,
MessageFilter* filter,
- MessageLoop* ipc_thread);
+ MessageLoop* ipc_thread,
+ HANDLE shutdown_event);
~SyncContext();
// Adds information about an outgoing sync message to the context so that
- // we know how to deserialize the reply. Returns a handle that's set when
- // the reply has arrived.
- HANDLE Push(IPC::SyncMessage* sync_msg);
+ // we know how to deserialize the reply.
+ void Push(IPC::SyncMessage* sync_msg);
- // Returns true if the reply message was deserialized without any errors,
- // or false otherwise.
- bool reply_deserialize_result() { return reply_deserialize_result_; }
+ // Cleanly remove the top deserializer (and throw it away). Returns the
+ // result of the Send call for that message.
+ bool Pop();
+
+ // Returns an event that's set when the send is complete, timed out or the
+ // process shut down.
+ HANDLE GetSendDoneEvent();
// Returns an event that's set when an incoming message that's not the reply
// needs to get dispatched (by calling SyncContext::DispatchMessages).
- HANDLE blocking_event();
+ HANDLE GetDispatchEvent();
void DispatchMessages();
- void RemoveListener(Channel::Listener* listener);
// Checks if the given message is blocking the listener thread because of a
// synchronous send. If it is, the thread is unblocked and true is returned.
// Otherwise the function returns false.
- bool UnblockListener(const Message* msg);
+ bool TryToUnblockListener(const Message* msg);
+
+ // Called on the IPC thread when a sync send that runs a nested message loop
+ // times out.
+ void OnSendTimeout(int message_id);
- // Cleanly remove the top deserializer (and throw it away).
- void PopDeserializer(bool close_reply_event);
+ HANDLE shutdown_event() { return shutdown_event_; }
private:
- void OnMessageReceived(const Message& msg);
- void OnChannelError();
+ // IPC::ChannelProxy methods that we override.
+
+ // Called on the listener thread.
+ virtual void Clear();
+
+ // Called on the IPC thread.
+ virtual void OnMessageReceived(const Message& msg);
+ virtual void OnChannelError();
+ virtual void OnChannelOpened();
+ virtual void OnChannelClosed();
+
+ // Cancels all pending Send calls.
+ void CancelPendingSends();
+
+ // ObjectWatcher::Delegate implementation.
+ virtual void OnObjectSignaled(HANDLE object);
// When sending a synchronous message, this structure contains an object that
// knows how to deserialize the response.
struct PendingSyncMsg {
PendingSyncMsg(int id, IPC::MessageReplyDeserializer* d, HANDLE e) :
- id(id), deserializer(d), reply_event(e) { }
+ id(id), deserializer(d), done_event(e), send_result(false) { }
int id;
IPC::MessageReplyDeserializer* deserializer;
- HANDLE reply_event;
+ HANDLE done_event;
+ bool send_result;
};
- typedef std::stack<PendingSyncMsg> PendingSyncMessageQueue;
+ typedef std::deque<PendingSyncMsg> PendingSyncMessageQueue;
PendingSyncMessageQueue deserializers_;
Lock deserializers_lock_;
- // This can't be a scoped_refptr because it needs to be released on the
- // listener thread.
- ReceivedSyncMsgQueue* received_sync_msgs_;
+ scoped_refptr<ReceivedSyncMsgQueue> received_sync_msgs_;
- bool channel_closed_;
- bool reply_deserialize_result_;
+ HANDLE shutdown_event_;
+ base::ObjectWatcher shutdown_watcher_;
};
private:
+ // ObjectWatcher::Delegate implementation.
+ virtual void OnObjectSignaled(HANDLE object);
+
SyncContext* sync_context() { return reinterpret_cast<SyncContext*>(context()); }
- // Copy of shutdown event that we get in constructor.
- HANDLE shutdown_event_;
+ // Both these functions wait for a reply, timeout or process shutdown. The
+ // latter one also runs a nested message loop in the meantime.
+ void WaitForReply(HANDLE pump_messages_event);
- std::stack<HANDLE> pump_messages_events_;
+ // Runs a nested message loop until a reply arrives, times out, or the process
+ // shuts down.
+ void WaitForReplyWithNestedMessageLoop();
bool sync_messages_with_no_timeout_allowed_;
+ // Used to signal events between the IPC and listener threads.
+ base::ObjectWatcher send_done_watcher_;
+ base::ObjectWatcher dispatch_watcher_;
+
DISALLOW_EVIL_CONSTRUCTORS(SyncChannel);
};
diff --git a/chrome/common/ipc_sync_channel_unittest.cc b/chrome/common/ipc_sync_channel_unittest.cc
index 11ba159..decd6b6 100644
--- a/chrome/common/ipc_sync_channel_unittest.cc
+++ b/chrome/common/ipc_sync_channel_unittest.cc
@@ -83,17 +83,14 @@ class Worker : public Channel::Listener, public Message::Sender {
// The IPC thread needs to outlive SyncChannel, so force the correct order of
// destruction.
virtual ~Worker() {
- CloseChannel();
- // We must stop the threads and release the channel here. The IPC thread
- // must die before the listener thread, otherwise if its in the process of
- // sending a message, it will get an error, it will use channel_, which
- // references listener_. There are many ways of crashing, depending on
- // timing.
- // This is a race condition so you may not see it all the time even if you
- // reverse the Stop() calls. You may see this bug with AppVerifier only.
+ Event listener_done, ipc_done;
+ ListenerThread()->message_loop()->PostTask(FROM_HERE, NewRunnableMethod(
+ this, &Worker::OnListenerThreadShutdown, listener_done.handle(),
+ ipc_done.handle()));
+ HANDLE handles[] = { listener_done.handle(), ipc_done.handle() };
+ WaitForMultipleObjects(2, handles, TRUE, INFINITE);
ipc_thread_.Stop();
listener_thread_.Stop();
- channel_.reset();
}
void AddRef() { }
void Release() { }
@@ -102,12 +99,13 @@ class Worker : public Channel::Listener, public Message::Sender {
return channel_->SendWithTimeout(msg, timeout_ms);
}
void WaitForChannelCreation() { channel_created_.Wait(); }
- void CloseChannel() { channel_.reset(); }
+ void CloseChannel() {
+ DCHECK(MessageLoop::current() == ListenerThread()->message_loop());
+ channel_->Close();
+ }
void Start() {
StartThread(&listener_thread_);
- base::Thread* thread =
- overrided_thread_ ? overrided_thread_ : &listener_thread_;
- thread->message_loop()->PostTask(FROM_HERE, NewRunnableMethod(
+ ListenerThread()->message_loop()->PostTask(FROM_HERE, NewRunnableMethod(
this, &Worker::OnStart));
}
void OverrideThread(base::Thread* overrided_thread) {
@@ -124,7 +122,6 @@ class Worker : public Channel::Listener, public Message::Sender {
// Functions for dervied classes to implement if they wish.
virtual void Run() { }
- virtual void OnDouble(int in, int* out) { NOTREACHED(); }
virtual void OnAnswer(int* answer) { NOTREACHED(); }
virtual void OnAnswerDelay(Message* reply_msg) {
// The message handler map below can only take one entry for
@@ -137,8 +134,18 @@ class Worker : public Channel::Listener, public Message::Sender {
SyncChannelTestMsg_AnswerToLife::WriteReplyParams(reply_msg, answer);
Send(reply_msg);
}
+ virtual void OnDouble(int in, int* out) { NOTREACHED(); }
+ virtual void OnDoubleDelay(int in, Message* reply_msg) {
+ int result;
+ OnDouble(in, &result);
+ SyncChannelTestMsg_Double::WriteReplyParams(reply_msg, result);
+ Send(reply_msg);
+ }
private:
+ base::Thread* ListenerThread() {
+ return overrided_thread_ ? overrided_thread_ : &listener_thread_;
+ }
// Called on the listener thread to create the sync channel.
void OnStart() {
// Link ipc_thread_, listener_thread_ and channel_ altogether.
@@ -150,9 +157,22 @@ class Worker : public Channel::Listener, public Message::Sender {
Run();
}
+ void OnListenerThreadShutdown(HANDLE listener_event, HANDLE ipc_event) {
+ // SyncChannel needs to be destructed on the thread that it was created on.
+ channel_.reset();
+ SetEvent(listener_event);
+
+ ipc_thread_.message_loop()->PostTask(FROM_HERE, NewRunnableMethod(
+ this, &Worker::OnIPCThreadShutdown, ipc_event));
+ }
+
+ void OnIPCThreadShutdown(HANDLE ipc_event) {
+ SetEvent(ipc_event);
+ }
+
void OnMessageReceived(const Message& message) {
IPC_BEGIN_MESSAGE_MAP(Worker, message)
- IPC_MESSAGE_HANDLER(SyncChannelTestMsg_Double, OnDouble)
+ IPC_MESSAGE_HANDLER_DELAY_REPLY(SyncChannelTestMsg_Double, OnDoubleDelay)
IPC_MESSAGE_HANDLER_DELAY_REPLY(SyncChannelTestMsg_AnswerToLife,
OnAnswerDelay)
IPC_END_MESSAGE_MAP()
@@ -211,21 +231,27 @@ void RunTest(std::vector<Worker*> workers) {
} // namespace
-
//-----------------------------------------------------------------------------
namespace {
class SimpleServer : public Worker {
public:
- SimpleServer() : Worker(Channel::MODE_SERVER, "simpler_server") { }
+ SimpleServer(bool pump_during_send)
+ : Worker(Channel::MODE_SERVER, "simpler_server"),
+ pump_during_send_(pump_during_send) { }
void Run() {
int answer = 0;
- bool result = Send(new SyncChannelTestMsg_AnswerToLife(&answer));
+ IPC::SyncMessage* msg = new SyncChannelTestMsg_AnswerToLife(&answer);
+ if (pump_during_send_)
+ msg->EnableMessagePumping();
+ bool result = Send(msg);
DCHECK(result);
DCHECK(answer == 42);
Done();
}
+
+ bool pump_during_send_;
};
class SimpleClient : public Worker {
@@ -238,16 +264,20 @@ class SimpleClient : public Worker {
}
};
-} // namespace
-
-// Tests basic synchronous call
-TEST_F(IPCSyncChannelTest, Simple) {
+void Simple(bool pump_during_send) {
std::vector<Worker*> workers;
- workers.push_back(new SimpleServer());
+ workers.push_back(new SimpleServer(pump_during_send));
workers.push_back(new SimpleClient());
RunTest(workers);
}
+} // namespace
+
+// Tests basic synchronous call
+TEST_F(IPCSyncChannelTest, Simple) {
+ Simple(false);
+ Simple(true);
+}
//-----------------------------------------------------------------------------
@@ -264,16 +294,20 @@ class DelayClient : public Worker {
}
};
-} // namespace
-
-// Tests that asynchronous replies work
-TEST_F(IPCSyncChannelTest, DelayReply) {
+void DelayReply(bool pump_during_send) {
std::vector<Worker*> workers;
- workers.push_back(new SimpleServer());
+ workers.push_back(new SimpleServer(pump_during_send));
workers.push_back(new DelayClient());
RunTest(workers);
}
+} // namespace
+
+// Tests that asynchronous replies work
+TEST_F(IPCSyncChannelTest, DelayReply) {
+ DelayReply(false);
+ DelayReply(true);
+}
//-----------------------------------------------------------------------------
@@ -281,22 +315,30 @@ namespace {
class NoHangServer : public Worker {
public:
- explicit NoHangServer(Event* got_first_reply)
- : Worker(Channel::MODE_SERVER, "no_hang_server"),
- got_first_reply_(got_first_reply) { }
+ explicit NoHangServer(Event* got_first_reply, bool pump_during_send)
+ : Worker(Channel::MODE_SERVER, "no_hang_server"),
+ got_first_reply_(got_first_reply),
+ pump_during_send_(pump_during_send) { }
void Run() {
int answer = 0;
- bool result = Send(new SyncChannelTestMsg_AnswerToLife(&answer));
+ IPC::SyncMessage* msg = new SyncChannelTestMsg_AnswerToLife(&answer);
+ if (pump_during_send_)
+ msg->EnableMessagePumping();
+ bool result = Send(msg);
DCHECK(result);
DCHECK(answer == 42);
got_first_reply_->Set();
- result = Send(new SyncChannelTestMsg_AnswerToLife(&answer));
+ msg = new SyncChannelTestMsg_AnswerToLife(&answer);
+ if (pump_during_send_)
+ msg->EnableMessagePumping();
+ result = Send(msg);
DCHECK(!result);
Done();
}
Event* got_first_reply_;
+ bool pump_during_send_;
};
class NoHangClient : public Worker {
@@ -318,29 +360,37 @@ class NoHangClient : public Worker {
Event* got_first_reply_;
};
-} // namespace
-
-// Tests that caller doesn't hang if receiver dies
-TEST_F(IPCSyncChannelTest, NoHang) {
+void NoHang(bool pump_during_send) {
Event got_first_reply;
-
std::vector<Worker*> workers;
- workers.push_back(new NoHangServer(&got_first_reply));
+ workers.push_back(new NoHangServer(&got_first_reply, pump_during_send));
workers.push_back(new NoHangClient(&got_first_reply));
RunTest(workers);
}
+} // namespace
+
+// Tests that caller doesn't hang if receiver dies
+TEST_F(IPCSyncChannelTest, NoHang) {
+ NoHang(false);
+ NoHang(true);
+}
//-----------------------------------------------------------------------------
namespace {
-class RecursiveServer : public Worker {
+class UnblockServer : public Worker {
public:
- RecursiveServer() : Worker(Channel::MODE_SERVER, "recursive_server") { }
+ UnblockServer(bool pump_during_send)
+ : Worker(Channel::MODE_SERVER, "unblock_server"),
+ pump_during_send_(pump_during_send) { }
void Run() {
int answer = 0;
- bool result = Send(new SyncChannelTestMsg_AnswerToLife(&answer));
+ IPC::SyncMessage* msg = new SyncChannelTestMsg_AnswerToLife(&answer);
+ if (pump_during_send_)
+ msg->EnableMessagePumping();
+ bool result = Send(msg);
DCHECK(result);
DCHECK(answer == 42);
Done();
@@ -349,29 +399,160 @@ class RecursiveServer : public Worker {
void OnDouble(int in, int* out) {
*out = in * 2;
}
+
+ bool pump_during_send_;
};
-class RecursiveClient : public Worker {
+class UnblockClient : public Worker {
public:
- RecursiveClient() : Worker(Channel::MODE_CLIENT, "recursive_client") { }
+ UnblockClient(bool pump_during_send)
+ : Worker(Channel::MODE_CLIENT, "unblock_client"),
+ pump_during_send_(pump_during_send) { }
void OnAnswer(int* answer) {
- BOOL result = Send(new SyncChannelTestMsg_Double(21, answer));
+ IPC::SyncMessage* msg = new SyncChannelTestMsg_Double(21, answer);
+ if (pump_during_send_)
+ msg->EnableMessagePumping();
+ BOOL result = Send(msg);
DCHECK(result);
Done();
}
+
+ bool pump_during_send_;
};
+void Unblock(bool server_pump, bool client_pump) {
+ std::vector<Worker*> workers;
+ workers.push_back(new UnblockServer(server_pump));
+ workers.push_back(new UnblockClient(client_pump));
+ RunTest(workers);
+}
+
} // namespace
// Tests that the caller unblocks to answer a sync message from the receiver.
+TEST_F(IPCSyncChannelTest, Unblock) {
+ Unblock(false, false);
+ Unblock(false, true);
+ Unblock(true, false);
+ Unblock(true, true);
+}
+
+//-----------------------------------------------------------------------------
+
+namespace {
+
+class RecursiveServer : public Worker {
+ public:
+ explicit RecursiveServer(
+ bool expected_send_result, bool pump_first, bool pump_second)
+ : Worker(Channel::MODE_SERVER, "recursive_server"),
+ expected_send_result_(expected_send_result),
+ pump_first_(pump_first), pump_second_(pump_second) { }
+ void Run() {
+ int answer;
+ IPC::SyncMessage* msg = new SyncChannelTestMsg_Double(21, &answer);
+ if (pump_first_)
+ msg->EnableMessagePumping();
+ bool result = Send(msg);
+ DCHECK(result == expected_send_result_);
+ Done();
+ }
+
+ void OnDouble(int in, int* out) {
+ int answer;
+ IPC::SyncMessage* msg = new SyncChannelTestMsg_AnswerToLife(&answer);
+ if (pump_second_)
+ msg->EnableMessagePumping();
+ bool result = Send(msg);
+ DCHECK(result == expected_send_result_);
+ }
+
+ bool expected_send_result_, pump_first_, pump_second_;
+};
+
+class RecursiveClient : public Worker {
+ public:
+ explicit RecursiveClient(bool pump_during_send, bool close_channel)
+ : Worker(Channel::MODE_CLIENT, "recursive_client"),
+ pump_during_send_(pump_during_send), close_channel_(close_channel) { }
+
+ void OnDoubleDelay(int in, Message* reply_msg) {
+ int answer = 0;
+ IPC::SyncMessage* msg = new SyncChannelTestMsg_Double(5, &answer);
+ if (pump_during_send_)
+ msg->EnableMessagePumping();
+ bool result = Send(msg);
+ DCHECK(result != close_channel_);
+ if (!close_channel_) {
+ SyncChannelTestMsg_Double::WriteReplyParams(reply_msg, in * 2);
+ Send(reply_msg);
+ }
+ Done();
+ }
+
+ void OnAnswerDelay(Message* reply_msg) {
+ if (close_channel_) {
+ CloseChannel();
+ } else {
+ SyncChannelTestMsg_AnswerToLife::WriteReplyParams(reply_msg, 42);
+ Send(reply_msg);
+ }
+ }
+
+ bool pump_during_send_, close_channel_;
+};
+
+void Recursive(
+ bool server_pump_first, bool server_pump_second, bool client_pump) {
+ std::vector<Worker*> workers;
+ workers.push_back(
+ new RecursiveServer(true, server_pump_first, server_pump_second));
+ workers.push_back(new RecursiveClient(client_pump, false));
+ RunTest(workers);
+}
+
+} // namespace
+
+// Tests a server calling Send while another Send is pending.
TEST_F(IPCSyncChannelTest, Recursive) {
+ Recursive(false, false, false);
+ Recursive(false, false, true);
+ Recursive(false, true, false);
+ Recursive(false, true, true);
+ Recursive(true, false, false);
+ Recursive(true, false, true);
+ Recursive(true, true, false);
+ Recursive(true, true, true);
+}
+
+//-----------------------------------------------------------------------------
+
+namespace {
+
+void RecursiveNoHang(
+ bool server_pump_first, bool server_pump_second, bool client_pump) {
std::vector<Worker*> workers;
- workers.push_back(new RecursiveServer());
- workers.push_back(new RecursiveClient());
+ workers.push_back(
+ new RecursiveServer(false, server_pump_first, server_pump_second));
+ workers.push_back(new RecursiveClient(client_pump, true));
RunTest(workers);
}
+} // namespace
+
+// Tests that if a caller makes a sync call during an existing sync call and
+// the receiver dies, neither of the Send() calls hang.
+TEST_F(IPCSyncChannelTest, RecursiveNoHang) {
+ RecursiveNoHang(false, false, false);
+ RecursiveNoHang(false, false, true);
+ RecursiveNoHang(false, true, false);
+ RecursiveNoHang(false, true, true);
+ RecursiveNoHang(true, false, false);
+ RecursiveNoHang(true, false, true);
+ RecursiveNoHang(true, true, false);
+ RecursiveNoHang(true, true, true);
+}
//-----------------------------------------------------------------------------
@@ -379,14 +560,22 @@ namespace {
class MultipleServer1 : public Worker {
public:
- MultipleServer1() : Worker(L"test_channel1", Channel::MODE_SERVER) { }
+ MultipleServer1(bool pump_during_send)
+ : Worker(L"test_channel1", Channel::MODE_SERVER),
+ pump_during_send_(pump_during_send) { }
+
void Run() {
int answer = 0;
- bool result = Send(new SyncChannelTestMsg_Double(5, &answer));
+ IPC::SyncMessage* msg = new SyncChannelTestMsg_Double(5, &answer);
+ if (pump_during_send_)
+ msg->EnableMessagePumping();
+ bool result = Send(msg);
DCHECK(result);
DCHECK(answer == 10);
Done();
}
+
+ bool pump_during_send_;
};
class MultipleClient1 : public Worker {
@@ -419,15 +608,21 @@ class MultipleServer2 : public Worker {
class MultipleClient2 : public Worker {
public:
- MultipleClient2(Event* client1_msg_received, Event* client1_can_reply) :
- Worker(L"test_channel2", Channel::MODE_CLIENT),
+ MultipleClient2(
+ Event* client1_msg_received, Event* client1_can_reply,
+ bool pump_during_send)
+ : Worker(L"test_channel2", Channel::MODE_CLIENT),
client1_msg_received_(client1_msg_received),
- client1_can_reply_(client1_can_reply) { }
+ client1_can_reply_(client1_can_reply),
+ pump_during_send_(pump_during_send) { }
void Run() {
int answer = 0;
client1_msg_received_->Wait();
- bool result = Send(new SyncChannelTestMsg_AnswerToLife(&answer));
+ IPC::SyncMessage* msg = new SyncChannelTestMsg_AnswerToLife(&answer);
+ if (pump_during_send_)
+ msg->EnableMessagePumping();
+ bool result = Send(msg);
DCHECK(result);
DCHECK(answer == 42);
client1_can_reply_->Set();
@@ -436,13 +631,10 @@ class MultipleClient2 : public Worker {
private:
Event *client1_msg_received_, *client1_can_reply_;
+ bool pump_during_send_;
};
-} // namespace
-
-// Tests that multiple SyncObjects on the same listener thread can unblock each
-// other.
-TEST_F(IPCSyncChannelTest, Multiple) {
+void Multiple(bool server_pump, bool client_pump) {
std::vector<Worker*> workers;
// A shared worker thread so that server1 and server2 run on one thread.
@@ -461,10 +653,10 @@ TEST_F(IPCSyncChannelTest, Multiple) {
workers.push_back(worker);
worker = new MultipleClient2(
- &client1_msg_received, &client1_can_reply);
+ &client1_msg_received, &client1_can_reply, client_pump);
workers.push_back(worker);
- worker = new MultipleServer1();
+ worker = new MultipleServer1(server_pump);
worker->OverrideThread(&worker_thread);
workers.push_back(worker);
@@ -475,6 +667,16 @@ TEST_F(IPCSyncChannelTest, Multiple) {
RunTest(workers);
}
+} // namespace
+
+// Tests that multiple SyncObjects on the same listener thread can unblock each
+// other.
+TEST_F(IPCSyncChannelTest, Multiple) {
+ Multiple(false, false);
+ Multiple(false, true);
+ Multiple(true, false);
+ Multiple(true, true);
+}
//-----------------------------------------------------------------------------
@@ -482,14 +684,21 @@ namespace {
class QueuedReplyServer1 : public Worker {
public:
- QueuedReplyServer1() : Worker(L"test_channel1", Channel::MODE_SERVER) { }
+ QueuedReplyServer1(bool pump_during_send)
+ : Worker(L"test_channel1", Channel::MODE_SERVER),
+ pump_during_send_(pump_during_send) { }
void Run() {
int answer = 0;
- bool result = Send(new SyncChannelTestMsg_Double(5, &answer));
+ IPC::SyncMessage* msg = new SyncChannelTestMsg_Double(5, &answer);
+ if (pump_during_send_)
+ msg->EnableMessagePumping();
+ bool result = Send(msg);
DCHECK(result);
DCHECK(answer == 10);
Done();
}
+
+ bool pump_during_send_;
};
class QueuedReplyClient1 : public Worker {
@@ -531,30 +740,29 @@ class QueuedReplyServer2 : public Worker {
class QueuedReplyClient2 : public Worker {
public:
- explicit QueuedReplyClient2(Event* client1_msg_received) :
- Worker(L"test_channel2", Channel::MODE_CLIENT),
- client1_msg_received_(client1_msg_received) { }
+ explicit QueuedReplyClient2(
+ Event* client1_msg_received, bool pump_during_send)
+ : Worker(L"test_channel2", Channel::MODE_CLIENT),
+ client1_msg_received_(client1_msg_received),
+ pump_during_send_(pump_during_send){ }
void Run() {
int answer = 0;
client1_msg_received_->Wait();
- bool result = Send(new SyncChannelTestMsg_AnswerToLife(&answer));
+ IPC::SyncMessage* msg = new SyncChannelTestMsg_AnswerToLife(&answer);
+ if (pump_during_send_)
+ msg->EnableMessagePumping();
+ bool result = Send(msg);
DCHECK(result);
DCHECK(answer == 42);
Done();
}
- private:
Event *client1_msg_received_;
+ bool pump_during_send_;
};
-} // namespace
-
-// While a blocking send is in progress, the listener thread might answer other
-// synchronous messages. This tests that if during the response to another
-// message the reply to the original messages comes, it is queued up correctly
-// and the original Send is unblocked later.
-TEST_F(IPCSyncChannelTest, QueuedReply) {
+void QueuedReply(bool server_pump, bool client_pump) {
std::vector<Worker*> workers;
// A shared worker thread so that server1 and server2 run on one thread.
@@ -569,10 +777,10 @@ TEST_F(IPCSyncChannelTest, QueuedReply) {
worker->OverrideThread(&worker_thread);
workers.push_back(worker);
- worker = new QueuedReplyClient2(&client1_msg_received);
+ worker = new QueuedReplyClient2(&client1_msg_received, client_pump);
workers.push_back(worker);
- worker = new QueuedReplyServer1();
+ worker = new QueuedReplyServer1(server_pump);
worker->OverrideThread(&worker_thread);
workers.push_back(worker);
@@ -583,6 +791,18 @@ TEST_F(IPCSyncChannelTest, QueuedReply) {
RunTest(workers);
}
+} // namespace
+
+// While a blocking send is in progress, the listener thread might answer other
+// synchronous messages. This tests that if during the response to another
+// message the reply to the original messages comes, it is queued up correctly
+// and the original Send is unblocked later.
+TEST_F(IPCSyncChannelTest, QueuedReply) {
+ QueuedReply(false, false);
+ QueuedReply(false, true);
+ QueuedReply(true, false);
+ QueuedReply(true, true);
+}
//-----------------------------------------------------------------------------
@@ -590,14 +810,18 @@ namespace {
class BadServer : public Worker {
public:
- BadServer() : Worker(Channel::MODE_SERVER, "simpler_server") { }
+ BadServer(bool pump_during_send)
+ : Worker(Channel::MODE_SERVER, "simpler_server"),
+ pump_during_send_(pump_during_send) { }
void Run() {
int answer = 0;
- Message* msg = new SyncMessage(MSG_ROUTING_CONTROL,
- SyncChannelTestMsg_Double::ID,
- Message::PRIORITY_NORMAL,
- NULL);
+ IPC::SyncMessage* msg = new SyncMessage(
+ MSG_ROUTING_CONTROL, SyncChannelTestMsg_Double::ID,
+ Message::PRIORITY_NORMAL, NULL);
+ if (pump_during_send_)
+ msg->EnableMessagePumping();
+
// Temporarily set the minimum logging very high so that the assertion
// in ipc_message_utils doesn't fire.
int log_level = logging::GetMinLogLevel();
@@ -613,27 +837,33 @@ class BadServer : public Worker {
Done();
}
-};
-} // namespace
+ bool pump_during_send_;
+};
-// Tests that if a message is not serialized correctly, the Send() will fail.
-TEST_F(IPCSyncChannelTest, BadMessage) {
+void BadMessage(bool pump_during_send) {
std::vector<Worker*> workers;
- workers.push_back(new BadServer());
+ workers.push_back(new BadServer(pump_during_send));
workers.push_back(new SimpleClient());
RunTest(workers);
}
+} // namespace
+
+// Tests that if a message is not serialized correctly, the Send() will fail.
+TEST_F(IPCSyncChannelTest, BadMessage) {
+ BadMessage(false);
+ BadMessage(true);
+}
//-----------------------------------------------------------------------------
namespace {
-class ChattyRecursiveClient : public Worker {
+class ChattyClient : public Worker {
public:
- ChattyRecursiveClient() :
- Worker(Channel::MODE_CLIENT, "chatty_recursive_client") { }
+ ChattyClient() :
+ Worker(Channel::MODE_CLIENT, "chatty_client") { }
void OnAnswer(int* answer) {
// The PostMessage limit is 10k. Send 20% more than that.
@@ -649,19 +879,23 @@ class ChattyRecursiveClient : public Worker {
}
};
+void ChattyServer(bool pump_during_send) {
+ std::vector<Worker*> workers;
+ workers.push_back(new UnblockServer(pump_during_send));
+ workers.push_back(new ChattyClient());
+ RunTest(workers);
+}
+
} // namespace
// Tests http://b/issue?id=1093251 - that sending lots of sync messages while
// the receiver is waiting for a sync reply does not overflow the PostMessage
// queue.
TEST_F(IPCSyncChannelTest, ChattyServer) {
- std::vector<Worker*> workers;
- workers.push_back(new RecursiveServer());
- workers.push_back(new ChattyRecursiveClient());
- RunTest(workers);
+ ChattyServer(false);
+ ChattyServer(true);
}
-
//------------------------------------------------------------------------------
namespace {
@@ -669,19 +903,22 @@ namespace {
class TimeoutServer : public Worker {
public:
TimeoutServer(int timeout_ms,
- std::vector<bool> timeout_seq)
+ std::vector<bool> timeout_seq,
+ bool pump_during_send)
: Worker(Channel::MODE_SERVER, "timeout_server"),
timeout_ms_(timeout_ms),
- timeout_seq_(timeout_seq) {
+ timeout_seq_(timeout_seq),
+ pump_during_send_(pump_during_send) {
}
void Run() {
for (std::vector<bool>::const_iterator iter = timeout_seq_.begin();
iter != timeout_seq_.end(); ++iter) {
int answer = 0;
- bool result =
- SendWithTimeout(new SyncChannelTestMsg_AnswerToLife(&answer),
- timeout_ms_);
+ IPC::SyncMessage* msg = new SyncChannelTestMsg_AnswerToLife(&answer);
+ if (pump_during_send_)
+ msg->EnableMessagePumping();
+ bool result = SendWithTimeout(msg, timeout_ms_);
if (*iter) {
// Time-out expected.
DCHECK(!result);
@@ -697,6 +934,7 @@ class TimeoutServer : public Worker {
private:
int timeout_ms_;
std::vector<bool> timeout_seq_;
+ bool pump_during_send_;
};
class UnresponsiveClient : public Worker {
@@ -725,35 +963,29 @@ class UnresponsiveClient : public Worker {
std::vector<bool> timeout_seq_;
};
-} // namespace
-
-// Tests that SendWithTimeout does not time-out if the response comes back fast
-// enough.
-TEST_F(IPCSyncChannelTest, SendWithTimeoutOK) {
+void SendWithTimeoutOK(bool pump_during_send) {
std::vector<Worker*> workers;
std::vector<bool> timeout_seq;
timeout_seq.push_back(false);
timeout_seq.push_back(false);
timeout_seq.push_back(false);
- workers.push_back(new TimeoutServer(5000, timeout_seq));
+ workers.push_back(new TimeoutServer(5000, timeout_seq, pump_during_send));
workers.push_back(new SimpleClient());
RunTest(workers);
}
-// Tests that SendWithTimeout does time-out.
-TEST_F(IPCSyncChannelTest, SendWithTimeoutTimeout) {
+void SendWithTimeoutTimeout(bool pump_during_send) {
std::vector<Worker*> workers;
std::vector<bool> timeout_seq;
timeout_seq.push_back(true);
timeout_seq.push_back(false);
timeout_seq.push_back(false);
- workers.push_back(new TimeoutServer(100, timeout_seq));
+ workers.push_back(new TimeoutServer(100, timeout_seq, pump_during_send));
workers.push_back(new UnresponsiveClient(timeout_seq));
RunTest(workers);
}
-// Sends some message that time-out and some that succeed.
-TEST_F(IPCSyncChannelTest, SendWithTimeoutMixedOKAndTimeout) {
+void SendWithTimeoutMixedOKAndTimeout(bool pump_during_send) {
std::vector<Worker*> workers;
std::vector<bool> timeout_seq;
timeout_seq.push_back(true);
@@ -761,7 +993,28 @@ TEST_F(IPCSyncChannelTest, SendWithTimeoutMixedOKAndTimeout) {
timeout_seq.push_back(false);
timeout_seq.push_back(true);
timeout_seq.push_back(false);
- workers.push_back(new TimeoutServer(100, timeout_seq));
+ workers.push_back(new TimeoutServer(100, timeout_seq, pump_during_send));
workers.push_back(new UnresponsiveClient(timeout_seq));
RunTest(workers);
-} \ No newline at end of file
+}
+
+} // namespace
+
+// Tests that SendWithTimeout does not time-out if the response comes back fast
+// enough.
+TEST_F(IPCSyncChannelTest, SendWithTimeoutOK) {
+ SendWithTimeoutOK(false);
+ SendWithTimeoutOK(true);
+}
+
+// Tests that SendWithTimeout does time-out.
+TEST_F(IPCSyncChannelTest, SendWithTimeoutTimeout) {
+ SendWithTimeoutTimeout(false);
+ SendWithTimeoutTimeout(true);
+}
+
+// Sends some message that time-out and some that succeed.
+TEST_F(IPCSyncChannelTest, SendWithTimeoutMixedOKAndTimeout) {
+ SendWithTimeoutMixedOKAndTimeout(false);
+ SendWithTimeoutMixedOKAndTimeout(true);
+}
diff --git a/chrome/common/ipc_sync_message.h b/chrome/common/ipc_sync_message.h
index d098a99..86cb62f 100644
--- a/chrome/common/ipc_sync_message.h
+++ b/chrome/common/ipc_sync_message.h
@@ -27,8 +27,9 @@ class SyncMessage : public Message {
// If this message can cause the receiver to block while waiting for user
// input (i.e. by calling MessageBox), then the caller needs to pump window
// messages and dispatch asynchronous messages while waiting for the reply.
- // If this handle is passed in, then window messages will be pumped while
- // it's set. The handle must be valid until after the Send call returns.
+ // If this handle is passed in, then window messages will start being pumped
+ // when it's set. Note that this behavior will continue even if the event is
+ // later reset. The handle must be valid until after the Send call returns.
void set_pump_messages_event(HANDLE event) {
pump_messages_event_ = event;
if (event) {