summaryrefslogtreecommitdiffstats
path: root/chrome/common
diff options
context:
space:
mode:
authorjcampan@google.com <jcampan@google.com@0039d316-1c4b-4281-b951-d872f2087c98>2008-08-12 01:25:41 +0000
committerjcampan@google.com <jcampan@google.com@0039d316-1c4b-4281-b951-d872f2087c98>2008-08-12 01:25:41 +0000
commitd65cab7ac310ea12c5d946ff40242a243ce911da (patch)
treefe3eaa4a4e6f14a7416c4b4da351e18cd34d34b2 /chrome/common
parent1a48f315b0ca5c26c4446070edfb5842ed06c8c7 (diff)
downloadchromium_src-d65cab7ac310ea12c5d946ff40242a243ce911da.zip
chromium_src-d65cab7ac310ea12c5d946ff40242a243ce911da.tar.gz
chromium_src-d65cab7ac310ea12c5d946ff40242a243ce911da.tar.bz2
Enabling sync_channel in the browser to allow accessibility code making blocking calls. This replaces my previous CL that was somehow duplicating some of these functionalities.
BUG=None TEST=Run the unit tests. git-svn-id: svn://svn.chromium.org/chrome/trunk/src@691 0039d316-1c4b-4281-b951-d872f2087c98
Diffstat (limited to 'chrome/common')
-rw-r--r--chrome/common/ipc_channel_proxy.cc14
-rw-r--r--chrome/common/ipc_channel_proxy.h5
-rw-r--r--chrome/common/ipc_sync_channel.cc79
-rw-r--r--chrome/common/ipc_sync_channel.h24
-rw-r--r--chrome/common/ipc_sync_channel_unittest.cc105
-rw-r--r--chrome/common/ipc_sync_message.cc7
-rw-r--r--chrome/common/ipc_sync_message.h4
7 files changed, 207 insertions, 31 deletions
diff --git a/chrome/common/ipc_channel_proxy.cc b/chrome/common/ipc_channel_proxy.cc
index 9d6b473..2cddc8e 100644
--- a/chrome/common/ipc_channel_proxy.cc
+++ b/chrome/common/ipc_channel_proxy.cc
@@ -55,8 +55,7 @@ void ChannelProxy::Context::CreateChannel(const std::wstring& id,
channel_ = new Channel(id, mode, this);
}
-// Called on the IPC::Channel thread
-void ChannelProxy::Context::OnMessageReceived(const Message& message) {
+bool ChannelProxy::Context::TryFilters(const Message& message) {
#ifdef IPC_MESSAGE_LOG_ENABLED
Logging* logger = Logging::current();
if (logger->Enabled())
@@ -69,9 +68,17 @@ void ChannelProxy::Context::OnMessageReceived(const Message& message) {
if (logger->Enabled())
logger->OnPostDispatchMessage(message, channel_id_);
#endif
- return;
+ return true;
}
}
+ return false;
+}
+
+// Called on the IPC::Channel thread
+void ChannelProxy::Context::OnMessageReceived(const Message& message) {
+ // First give a chance to the filters to process this message.
+ if (TryFilters(message))
+ return;
// NOTE: This code relies on the listener's message loop not going away while
// this thread is active. That should be a reasonable assumption, but it
@@ -217,7 +224,6 @@ ChannelProxy::ChannelProxy(const std::wstring& channel_id, Channel::Mode mode,
}
ChannelProxy::ChannelProxy(const std::wstring& channel_id, Channel::Mode mode,
- Channel::Listener* listener, MessageFilter* filter,
MessageLoop* ipc_thread, Context* context,
bool create_pipe_now)
: context_(context) {
diff --git a/chrome/common/ipc_channel_proxy.h b/chrome/common/ipc_channel_proxy.h
index ca8abb4..82b3d88 100644
--- a/chrome/common/ipc_channel_proxy.h
+++ b/chrome/common/ipc_channel_proxy.h
@@ -149,7 +149,6 @@ class ChannelProxy : public Message::Sender {
// to the internal state. If create_pipe_now is true, the pipe is created
// immediately. Otherwise it's created on the IO thread.
ChannelProxy(const std::wstring& channel_id, Channel::Mode mode,
- Channel::Listener* listener, MessageFilter* filter,
MessageLoop* ipc_thread_loop, Context* context,
bool create_pipe_now);
@@ -170,6 +169,10 @@ class ChannelProxy : public Message::Sender {
Channel::Listener* listener() const { return listener_; }
const std::wstring& channel_id() const { return channel_id_; }
+ // Gives the filters a chance at processing |message|.
+ // Returns true if the message was processed, false otherwise.
+ bool TryFilters(const Message& message);
+
private:
friend class ChannelProxy;
// Create the Channel
diff --git a/chrome/common/ipc_sync_channel.cc b/chrome/common/ipc_sync_channel.cc
index 897e7bf..e01759a 100644
--- a/chrome/common/ipc_sync_channel.cc
+++ b/chrome/common/ipc_sync_channel.cc
@@ -70,7 +70,6 @@ class SyncChannel::ReceivedSyncMsgQueue :
~ReceivedSyncMsgQueue() {
CloseHandle(blocking_event_);
- ThreadLocalStorage::Set(g_tls_index, NULL);
}
// Called on IPC thread when a synchronous message or reply arrives.
@@ -252,13 +251,20 @@ SyncChannel::SyncContext::SyncContext(
// Stash a pointer to the listener thread's ReceivedSyncMsgQueue, as we
// need to be able to access it in the IPC thread.
received_sync_msgs_ = new ReceivedSyncMsgQueue();
+ // TODO(jcampan): http:///b/1319842 we are adding an extra-ref to keep this
+ // object around for the duration of the process. We used to remove it from
+ // the TLS when it was destroyed, but that was causing problems as we could
+ // be destroyed in a different thread then the thread we had been created
+ // on. This needs to be revisited at some point.
+ received_sync_msgs_->AddRef();
+
ThreadLocalStorage::Set(g_tls_index, received_sync_msgs_.get());
}
}
SyncChannel::SyncContext::~SyncContext() {
while (!deserializers_.empty())
- PopDeserializer();
+ PopDeserializer(true);
}
// Adds information about an outgoing sync message to the context so that
@@ -297,7 +303,7 @@ bool SyncChannel::SyncContext::UnblockListener(const Message* msg) {
reply_deserialize_result_ = false;
if (!deserializers_.empty()) {
reply_event = deserializers_.top().reply_event;
- PopDeserializer();
+ PopDeserializer(false);
}
} else {
if (deserializers_.empty())
@@ -317,7 +323,7 @@ bool SyncChannel::SyncContext::UnblockListener(const Message* msg) {
// Can't CloseHandle the event just yet, since doing so might cause the
// Wait call above to never return.
reply_event = deserializers_.top().reply_event;
- PopDeserializer();
+ PopDeserializer(false);
}
}
@@ -336,6 +342,10 @@ bool SyncChannel::SyncContext::UnblockListener(const Message* msg) {
// Called on the IPC thread.
void SyncChannel::SyncContext::OnMessageReceived(const Message& msg) {
+ // Give the filters a chance at processing this message.
+ if (TryFilters(msg))
+ return;
+
if (UnblockListener(&msg))
return;
@@ -360,19 +370,23 @@ void SyncChannel::SyncContext::OnChannelError() {
Context::OnChannelError();
}
-void SyncChannel::SyncContext::PopDeserializer() {
- delete deserializers_.top().deserializer;
+void SyncChannel::SyncContext::PopDeserializer(bool close_reply_event) {
+ PendingSyncMsg msg = deserializers_.top();
+ delete msg.deserializer;
+ if (close_reply_event)
+ CloseHandle(msg.reply_event);
deserializers_.pop();
}
SyncChannel::SyncChannel(const std::wstring& channel_id, Channel::Mode mode,
- Channel::Listener* listener,
+ Channel::Listener* listener, MessageFilter* filter,
MessageLoop* ipc_message_loop,
- bool create_pipe_now)
- : ChannelProxy(channel_id, mode, listener, NULL, ipc_message_loop,
- new SyncContext(listener, NULL, ipc_message_loop),
+ bool create_pipe_now, HANDLE shutdown_event)
+ : ChannelProxy(channel_id, mode, ipc_message_loop,
+ new SyncContext(listener, filter, ipc_message_loop),
create_pipe_now),
- shutdown_event_(ChildProcess::GetShutDownEvent()) {
+ shutdown_event_(shutdown_event),
+ sync_messages_with_no_timeout_allowed_(true) {
DCHECK(shutdown_event_ != NULL);
}
@@ -385,11 +399,16 @@ SyncChannel::~SyncChannel() {
}
bool SyncChannel::Send(IPC::Message* message) {
+ return SendWithTimeout(message, INFINITE);
+}
+
+bool SyncChannel::SendWithTimeout(IPC::Message* message, int timeout_ms) {
bool message_is_sync = message->is_sync();
HANDLE pump_messages_event = NULL;
HANDLE reply_event = NULL;
if (message_is_sync) {
+ DCHECK(sync_messages_with_no_timeout_allowed_ || timeout_ms != INFINITE);
IPC::SyncMessage* sync_msg = static_cast<IPC::SyncMessage*>(message);
reply_event = sync_context()->Push(sync_msg);
pump_messages_event = sync_msg->pump_messages_event();
@@ -409,24 +428,32 @@ bool SyncChannel::Send(IPC::Message* message) {
pump_messages_event};
DWORD result;
+ TimeTicks before = TimeTicks::Now();
if (pump_messages_event == NULL) {
// No need to pump messages since we didn't get an event to check.
- result = WaitForMultipleObjects(3, objects, FALSE, INFINITE);
+ result = WaitForMultipleObjects(3, objects, FALSE, timeout_ms);
} else {
// If the event is set, then we pump messages. Otherwise we also wait on
// it so that if it gets set we start pumping messages.
if (WaitForSingleObject(pump_messages_event, 0) == WAIT_OBJECT_0) {
- result = MsgWaitForMultipleObjects(3, objects, FALSE, INFINITE,
- QS_ALLINPUT);
+ // Before calling MsgWaitForMultipleObjects() we check that our events
+ // are not signaled. The windows message queue might always have events
+ // starving the checking of our events otherwise.
+ result = WaitForMultipleObjects(3, objects, FALSE, 0);
+ if (result == WAIT_TIMEOUT) {
+ result = MsgWaitForMultipleObjects(3, objects, FALSE, timeout_ms,
+ QS_ALLINPUT);
+ }
} else {
- result = WaitForMultipleObjects(4, objects, FALSE, INFINITE);
+ result = WaitForMultipleObjects(4, objects, FALSE, timeout_ms);
}
}
- if (result == WAIT_OBJECT_0) {
- // Process shut down before we can get a reply to a synchronous message.
- // Unblock the thread.
- // Leak reply_event. Since we're shutting down, it's not a big deal.
+ if (result == WAIT_OBJECT_0 || result == WAIT_TIMEOUT) {
+ // Process shut down before we can get a reply to a synchronous message,
+ // or timed-out. Unblock the thread.
+ AutoLock auto_lock(*(sync_context()->deserializers_lock()));
+ sync_context()->PopDeserializer(true);
return false;
}
@@ -458,7 +485,19 @@ bool SyncChannel::Send(IPC::Message* message) {
// MsgWaitForMultipleObjects instead.
}
- // Continue looping until we get the reply to our synchronous message.
+ if (timeout_ms != INFINITE) {
+ TimeDelta time_delta = TimeTicks::Now() - before;
+ timeout_ms -= static_cast<int>(time_delta.InMilliseconds());
+ if (timeout_ms <= 0) {
+ // We timed-out while processing messages.
+ AutoLock auto_lock(*(sync_context()->deserializers_lock()));
+ sync_context()->PopDeserializer(true);
+ return false;
+ }
+ }
+
+ // Continue looping until we either get the reply to our synchronous message
+ // or we time-out.
} while (true);
}
diff --git a/chrome/common/ipc_sync_channel.h b/chrome/common/ipc_sync_channel.h
index af6e153..bd473d4 100644
--- a/chrome/common/ipc_sync_channel.h
+++ b/chrome/common/ipc_sync_channel.h
@@ -52,13 +52,20 @@ class SyncMessage;
class SyncChannel : public ChannelProxy {
public:
SyncChannel(const std::wstring& channel_id, Channel::Mode mode,
- Channel::Listener* listener, MessageLoop* ipc_message_loop,
- bool create_pipe_now);
+ Channel::Listener* listener, MessageFilter* filter,
+ MessageLoop* ipc_message_loop, bool create_pipe_now,
+ HANDLE shutdown_handle);
~SyncChannel();
virtual bool Send(Message* message);
+ virtual bool SendWithTimeout(Message* message, int timeout_ms);
bool UnblockListener(Message* message);
+ // Whether we allow sending messages with no time-out.
+ void set_sync_messages_with_no_timeout_allowed(bool value) {
+ sync_messages_with_no_timeout_allowed_ = value;
+ }
+
protected:
class ReceivedSyncMsgQueue;
friend class ReceivedSyncMsgQueue;
@@ -95,6 +102,14 @@ class SyncChannel : public ChannelProxy {
// Otherwise the function returns false.
bool UnblockListener(const Message* msg);
+
+ // Cleanly remove the top deserializer (and throw it away).
+ // You need to acquire the deserializers_lock before calling this.
+ void PopDeserializer(bool close_reply_event);
+
+ // Returns the lock that should be acquired before calling PopDeserializer.
+ Lock* deserializers_lock() { return &deserializers_lock_; }
+
private:
void OnMessageReceived(const Message& msg);
void OnChannelError();
@@ -109,9 +124,6 @@ class SyncChannel : public ChannelProxy {
HANDLE reply_event;
};
- // Cleanly remove the top deserializer (and throw it away).
- void PopDeserializer();
-
typedef std::stack<PendingSyncMsg> PendingSyncMessageQueue;
PendingSyncMessageQueue deserializers_;
Lock deserializers_lock_;
@@ -130,6 +142,8 @@ class SyncChannel : public ChannelProxy {
std::stack<HANDLE> pump_messages_events_;
+ bool sync_messages_with_no_timeout_allowed_;
+
DISALLOW_EVIL_CONSTRUCTORS(SyncChannel);
};
diff --git a/chrome/common/ipc_sync_channel_unittest.cc b/chrome/common/ipc_sync_channel_unittest.cc
index 74f046a..78d825c 100644
--- a/chrome/common/ipc_sync_channel_unittest.cc
+++ b/chrome/common/ipc_sync_channel_unittest.cc
@@ -116,6 +116,9 @@ class Worker : public Channel::Listener, public Message::Sender {
void AddRef() { }
void Release() { }
bool Send(Message* msg) { return channel_->Send(msg); }
+ bool SendWithTimeout(Message* msg, int timeout_ms) {
+ return channel_->SendWithTimeout(msg, timeout_ms);
+ }
void WaitForChannelCreation() { channel_created_.Wait(); }
void CloseChannel() { channel_.reset(); }
void Start() {
@@ -158,7 +161,8 @@ class Worker : public Channel::Listener, public Message::Sender {
ipc_thread_.Start();
// Link ipc_thread_, listener_thread_ and channel_ altogether.
channel_.reset(new SyncChannel(
- channel_name_, mode_, this, ipc_thread_.message_loop(), true));
+ channel_name_, mode_, this, NULL, ipc_thread_.message_loop(), true,
+ TestProcess::GetShutDownEvent()));
channel_created_.Set();
Run();
}
@@ -623,3 +627,102 @@ TEST(IPCSyncChannelTest, ChattyServer) {
workers.push_back(new ChattyRecursiveClient());
RunTest(workers);
}
+
+
+//------------------------------------------------------------------------------
+class TimeoutServer : public Worker {
+ public:
+ TimeoutServer(int timeout_ms,
+ std::vector<bool> timeout_seq)
+ : Worker(Channel::MODE_SERVER, "timeout_server"),
+ timeout_ms_(timeout_ms),
+ timeout_seq_(timeout_seq) {
+ }
+
+ void Run() {
+ for (std::vector<bool>::const_iterator iter = timeout_seq_.begin();
+ iter != timeout_seq_.end(); ++iter) {
+ int answer = 0;
+ bool result =
+ SendWithTimeout(new SyncChannelTestMsg_AnswerToLife(&answer),
+ timeout_ms_);
+ if (*iter) {
+ // Time-out expected.
+ DCHECK(!result);
+ DCHECK(answer == 0);
+ } else {
+ DCHECK(result);
+ DCHECK(answer == 42);
+ }
+ }
+ Done();
+ }
+
+ private:
+ int timeout_ms_;
+ std::vector<bool> timeout_seq_;
+};
+
+class UnresponsiveClient : public Worker {
+ public:
+ UnresponsiveClient(std::vector<bool> timeout_seq)
+ : Worker(Channel::MODE_CLIENT, "unresponsive_client"),
+ timeout_seq_(timeout_seq) {
+ }
+
+ void OnAnswerDelay(Message* reply_msg) {
+ DCHECK(!timeout_seq_.empty());
+ if (!timeout_seq_[0]) {
+ SyncChannelTestMsg_AnswerToLife::WriteReplyParams(reply_msg, 42);
+ Send(reply_msg);
+ } else {
+ // Don't reply.
+ }
+ timeout_seq_.erase(timeout_seq_.begin());
+ if (timeout_seq_.empty())
+ Done();
+ }
+
+ private:
+ // Whether we should time-out or respond to the various messages we receive.
+ std::vector<bool> timeout_seq_;
+};
+
+// Tests that SendWithTimeout does not time-out if the response comes back fast
+// enough.
+TEST(IPCSyncChannelTest, SendWithTimeoutOK) {
+ std::vector<Worker*> workers;
+ std::vector<bool> timeout_seq;
+ timeout_seq.push_back(false);
+ timeout_seq.push_back(false);
+ timeout_seq.push_back(false);
+ workers.push_back(new TimeoutServer(5000, timeout_seq));
+ workers.push_back(new SimpleClient());
+ RunTest(workers);
+}
+
+// Tests that SendWithTimeout does time-out.
+TEST(IPCSyncChannelTest, SendWithTimeoutTimeout) {
+ std::vector<Worker*> workers;
+ std::vector<bool> timeout_seq;
+ timeout_seq.push_back(true);
+ timeout_seq.push_back(false);
+ timeout_seq.push_back(false);
+ workers.push_back(new TimeoutServer(100, timeout_seq));
+ workers.push_back(new UnresponsiveClient(timeout_seq));
+ RunTest(workers);
+}
+
+// Sends some message that time-out and some that succeed.
+TEST(IPCSyncChannelTest, SendWithTimeoutMixedOKAndTimeout) {
+ std::vector<Worker*> workers;
+ std::vector<bool> timeout_seq;
+ timeout_seq.push_back(true);
+ timeout_seq.push_back(false);
+ timeout_seq.push_back(false);
+ timeout_seq.push_back(true);
+ timeout_seq.push_back(false);
+ workers.push_back(new TimeoutServer(100, timeout_seq));
+ workers.push_back(new UnresponsiveClient(timeout_seq));
+ RunTest(workers);
+}
diff --git a/chrome/common/ipc_sync_message.cc b/chrome/common/ipc_sync_message.cc
index 43d6d5e..8dc75c4 100644
--- a/chrome/common/ipc_sync_message.cc
+++ b/chrome/common/ipc_sync_message.cc
@@ -38,6 +38,8 @@ namespace IPC {
uint32 SyncMessage::next_id_ = 0;
#define kSyncMessageHeaderSize 4
+// A dummy handle used by EnableMessagePumping.
+HANDLE dummy_event = ::CreateEvent(NULL, TRUE, TRUE, NULL);
SyncMessage::SyncMessage(
int32 routing_id,
@@ -63,6 +65,11 @@ MessageReplyDeserializer* SyncMessage::GetReplyDeserializer() {
return rv;
}
+void SyncMessage::EnableMessagePumping() {
+ DCHECK(!pump_messages_event_);
+ set_pump_messages_event(dummy_event);
+}
+
bool SyncMessage::IsMessageReplyTo(const Message& msg, int request_id) {
if (!msg.is_reply())
return false;
diff --git a/chrome/common/ipc_sync_message.h b/chrome/common/ipc_sync_message.h
index bcf47cbe..8ab6cae 100644
--- a/chrome/common/ipc_sync_message.h
+++ b/chrome/common/ipc_sync_message.h
@@ -63,6 +63,10 @@ class SyncMessage : public Message {
}
}
+ // Call this if you always want to pump messages. You can call this method
+ // or set_pump_messages_event but not both.
+ void EnableMessagePumping();
+
HANDLE pump_messages_event() const { return pump_messages_event_; }
// Returns true if the message is a reply to the given request id.