diff options
author | jam@chromium.org <jam@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98> | 2010-12-02 19:16:07 +0000 |
---|---|---|
committer | jam@chromium.org <jam@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98> | 2010-12-02 19:16:07 +0000 |
commit | 4b580bf3020b1e0eaf5b7efad50896b4c62474c5 (patch) | |
tree | 94f893bf3422dccda5e4bc05e4f8b8cca0d62638 /ipc | |
parent | e669998f4edaa156aafbe2fb93b3e3dae1ebd06c (diff) | |
download | chromium_src-4b580bf3020b1e0eaf5b7efad50896b4c62474c5.zip chromium_src-4b580bf3020b1e0eaf5b7efad50896b4c62474c5.tar.gz chromium_src-4b580bf3020b1e0eaf5b7efad50896b4c62474c5.tar.bz2 |
Add a base class for objects that want to filter messages on the IO thread. I'll switch the filters to it in future separate changes.
I've also taken out the special case for an initial filter from the IPC classes. The reason it existed was that there was a race condition of some messages not being filtered if a filter is added after construction but before launching the peer process. Taking it out allows us to add more than one filter and makes things a little cleaner.
Review URL: http://codereview.chromium.org/5513001
git-svn-id: svn://svn.chromium.org/chrome/trunk/src@68043 0039d316-1c4b-4281-b951-d872f2087c98
Diffstat (limited to 'ipc')
-rw-r--r-- | ipc/ipc_channel_proxy.cc | 61 | ||||
-rw-r--r-- | ipc/ipc_channel_proxy.h | 32 | ||||
-rw-r--r-- | ipc/ipc_sync_channel.cc | 13 | ||||
-rw-r--r-- | ipc/ipc_sync_channel.h | 9 | ||||
-rw-r--r-- | ipc/ipc_sync_channel_unittest.cc | 2 | ||||
-rw-r--r-- | ipc/ipc_tests.cc | 2 |
6 files changed, 79 insertions, 40 deletions
diff --git a/ipc/ipc_channel_proxy.cc b/ipc/ipc_channel_proxy.cc index f05ae485..8450c28 100644 --- a/ipc/ipc_channel_proxy.cc +++ b/ipc/ipc_channel_proxy.cc @@ -61,7 +61,6 @@ void ChannelProxy::MessageFilter::OnDestruct() const { //------------------------------------------------------------------------------ ChannelProxy::Context::Context(Channel::Listener* listener, - MessageFilter* filter, MessageLoop* ipc_message_loop) : listener_message_loop_(MessageLoop::current()), listener_(listener), @@ -69,8 +68,6 @@ ChannelProxy::Context::Context(Channel::Listener* listener, channel_(NULL), peer_pid_(0), channel_connected_called_(false) { - if (filter) - filters_.push_back(make_scoped_refptr(filter)); } void ChannelProxy::Context::CreateChannel(const std::string& id, @@ -118,6 +115,12 @@ void ChannelProxy::Context::OnMessageReceivedNoFilter(const Message& message) { // Called on the IPC::Channel thread void ChannelProxy::Context::OnChannelConnected(int32 peer_pid) { + // Add any pending filters. This avoids a race condition where someone + // creates a ChannelProxy, calls AddFilter, and then right after starts the + // peer process. The IO thread could receive a message before the task to add + // the filter is run on the IO thread. + OnAddFilter(); + peer_pid_ = peer_pid; for (size_t i = 0; i < filters_.size(); ++i) filters_[i]->OnChannelConnected(peer_pid); @@ -189,13 +192,24 @@ void ChannelProxy::Context::OnSendMessage(Message* message) { } // Called on the IPC::Channel thread -void ChannelProxy::Context::OnAddFilter(MessageFilter* filter) { - filters_.push_back(make_scoped_refptr(filter)); +void ChannelProxy::Context::OnAddFilter() { + std::vector<scoped_refptr<MessageFilter> > filters; + { + AutoLock auto_lock(pending_filters_lock_); + filters.swap(pending_filters_); + } + + for (size_t i = 0; i < filters.size(); ++i) { + filters_.push_back(filters[i]); - // If the channel has already been created, then we need to send this message - // so that the filter gets access to the Channel. - if (channel_) - filter->OnFilterAdded(channel_); + // If the channel has already been created, then we need to send this + // message so that the filter gets access to the Channel. + if (channel_) + filters[i]->OnFilterAdded(channel_); + // Ditto for the peer process id. + if (peer_pid_) + filters[i]->OnChannelConnected(peer_pid_); + } } // Called on the IPC::Channel thread @@ -212,6 +226,15 @@ void ChannelProxy::Context::OnRemoveFilter(MessageFilter* filter) { } // Called on the listener's thread +void ChannelProxy::Context::AddFilter(MessageFilter* filter) { + AutoLock auto_lock(pending_filters_lock_); + pending_filters_.push_back(make_scoped_refptr(filter)); + ipc_message_loop_->PostTask( + FROM_HERE, + NewRunnableMethod(this, &Context::OnAddFilter)); +} + +// Called on the listener's thread void ChannelProxy::Context::OnDispatchMessage(const Message& message) { if (!listener_) return; @@ -255,15 +278,18 @@ void ChannelProxy::Context::OnDispatchError() { //----------------------------------------------------------------------------- -ChannelProxy::ChannelProxy(const std::string& channel_id, Channel::Mode mode, - Channel::Listener* listener, MessageFilter* filter, +ChannelProxy::ChannelProxy(const std::string& channel_id, + Channel::Mode mode, + Channel::Listener* listener, MessageLoop* ipc_thread) - : context_(new Context(listener, filter, ipc_thread)) { + : context_(new Context(listener, ipc_thread)) { Init(channel_id, mode, ipc_thread, true); } -ChannelProxy::ChannelProxy(const std::string& channel_id, Channel::Mode mode, - MessageLoop* ipc_thread, Context* context, +ChannelProxy::ChannelProxy(const std::string& channel_id, + Channel::Mode mode, + MessageLoop* ipc_thread, + Context* context, bool create_pipe_now) : context_(context) { Init(channel_id, mode, ipc_thread, create_pipe_now); @@ -314,12 +340,7 @@ bool ChannelProxy::Send(Message* message) { } void ChannelProxy::AddFilter(MessageFilter* filter) { - context_->ipc_message_loop()->PostTask( - FROM_HERE, - NewRunnableMethod( - context_.get(), - &Context::OnAddFilter, - make_scoped_refptr(filter))); + context_->AddFilter(filter); } void ChannelProxy::RemoveFilter(MessageFilter* filter) { diff --git a/ipc/ipc_channel_proxy.h b/ipc/ipc_channel_proxy.h index 53a39b4..a85841d 100644 --- a/ipc/ipc_channel_proxy.h +++ b/ipc/ipc_channel_proxy.h @@ -8,6 +8,7 @@ #include <vector> +#include "base/lock.h" #include "base/ref_counted.h" #include "ipc/ipc_channel.h" @@ -104,8 +105,9 @@ class ChannelProxy : public Message::Sender { // on the background thread. Any message not handled by the filter will be // dispatched to the listener. The given message loop indicates where the // IPC::Channel should be created. - ChannelProxy(const std::string& channel_id, Channel::Mode mode, - Channel::Listener* listener, MessageFilter* filter, + ChannelProxy(const std::string& channel_id, + Channel::Mode mode, + Channel::Listener* listener, MessageLoop* ipc_thread_loop); virtual ~ChannelProxy(); @@ -129,6 +131,10 @@ class ChannelProxy : public Message::Sender { // Ordinarily, messages sent to the ChannelProxy are routed to the matching // listener on the worker thread. This API allows code to intercept messages // before they are sent to the worker thread. + // If you call this before the target process is launched, then you're + // guaranteed to not miss any messages. But if you call this anytime after, + // then some messages might be missed since the filter is added internally on + // the IO thread. void AddFilter(MessageFilter* filter); void RemoveFilter(MessageFilter* filter); @@ -147,16 +153,17 @@ class ChannelProxy : public Message::Sender { // A subclass uses this constructor if it needs to add more information // to the internal state. If create_pipe_now is true, the pipe is created // immediately. Otherwise it's created on the IO thread. - ChannelProxy(const std::string& channel_id, Channel::Mode mode, - MessageLoop* ipc_thread_loop, Context* context, + ChannelProxy(const std::string& channel_id, + Channel::Mode mode, + MessageLoop* ipc_thread_loop, + Context* context, bool create_pipe_now); // Used internally to hold state that is referenced on the IPC thread. class Context : public base::RefCountedThreadSafe<Context>, public Channel::Listener { public: - Context(Channel::Listener* listener, MessageFilter* filter, - MessageLoop* ipc_thread); + Context(Channel::Listener* listener, MessageLoop* ipc_thread); void ClearIPCMessageLoop() { ipc_message_loop_ = NULL; } MessageLoop* ipc_message_loop() const { return ipc_message_loop_; } const std::string& channel_id() const { return channel_id_; } @@ -196,10 +203,13 @@ class ChannelProxy : public Message::Sender { // Create the Channel void CreateChannel(const std::string& id, const Channel::Mode& mode); - // Methods called via InvokeLater: + // Methods called on the IO thread. void OnSendMessage(Message* message_ptr); - void OnAddFilter(MessageFilter* filter); + void OnAddFilter(); void OnRemoveFilter(MessageFilter* filter); + + // Methods called on the listener thread. + void AddFilter(MessageFilter* filter); void OnDispatchConnected(); void OnDispatchError(); @@ -213,6 +223,12 @@ class ChannelProxy : public Message::Sender { std::string channel_id_; int peer_pid_; bool channel_connected_called_; + + // Holds filters between the AddFilter call on the listerner thread and the + // IPC thread when they're added to filters_. + std::vector<scoped_refptr<MessageFilter> > pending_filters_; + // Lock for pending_filters_. + Lock pending_filters_lock_; }; Context* context() { return context_; } diff --git a/ipc/ipc_sync_channel.cc b/ipc/ipc_sync_channel.cc index 8598c9c..e77846c 100644 --- a/ipc/ipc_sync_channel.cc +++ b/ipc/ipc_sync_channel.cc @@ -200,10 +200,9 @@ base::LazyInstance<base::ThreadLocalPointer<SyncChannel::ReceivedSyncMsgQueue> > SyncChannel::SyncContext::SyncContext( Channel::Listener* listener, - MessageFilter* filter, MessageLoop* ipc_thread, WaitableEvent* shutdown_event) - : ChannelProxy::Context(listener, filter, ipc_thread), + : ChannelProxy::Context(listener, ipc_thread), received_sync_msgs_(ReceivedSyncMsgQueue::AddContext()), shutdown_event_(shutdown_event) { } @@ -356,13 +355,15 @@ void SyncChannel::SyncContext::OnWaitableEventSignaled(WaitableEvent* event) { SyncChannel::SyncChannel( - const std::string& channel_id, Channel::Mode mode, - Channel::Listener* listener, MessageFilter* filter, - MessageLoop* ipc_message_loop, bool create_pipe_now, + const std::string& channel_id, + Channel::Mode mode, + Channel::Listener* listener, + MessageLoop* ipc_message_loop, + bool create_pipe_now, WaitableEvent* shutdown_event) : ChannelProxy( channel_id, mode, ipc_message_loop, - new SyncContext(listener, filter, ipc_message_loop, shutdown_event), + new SyncContext(listener, ipc_message_loop, shutdown_event), create_pipe_now), sync_messages_with_no_timeout_allowed_(true) { // Ideally we only want to watch this object when running a nested message diff --git a/ipc/ipc_sync_channel.h b/ipc/ipc_sync_channel.h index 713b868..3435042 100644 --- a/ipc/ipc_sync_channel.h +++ b/ipc/ipc_sync_channel.h @@ -34,9 +34,11 @@ class MessageReplyDeserializer; class SyncChannel : public ChannelProxy, public base::WaitableEventWatcher::Delegate { public: - SyncChannel(const std::string& channel_id, Channel::Mode mode, - Channel::Listener* listener, MessageFilter* filter, - MessageLoop* ipc_message_loop, bool create_pipe_now, + SyncChannel(const std::string& channel_id, + Channel::Mode mode, + Channel::Listener* listener, + MessageLoop* ipc_message_loop, + bool create_pipe_now, base::WaitableEvent* shutdown_event); virtual ~SyncChannel(); @@ -59,7 +61,6 @@ class SyncChannel : public ChannelProxy, public base::WaitableEventWatcher::Delegate { public: SyncContext(Channel::Listener* listener, - MessageFilter* filter, MessageLoop* ipc_thread, base::WaitableEvent* shutdown_event); diff --git a/ipc/ipc_sync_channel_unittest.cc b/ipc/ipc_sync_channel_unittest.cc index 15d5a38..bf21917 100644 --- a/ipc/ipc_sync_channel_unittest.cc +++ b/ipc/ipc_sync_channel_unittest.cc @@ -170,7 +170,7 @@ class Worker : public Channel::Listener, public Message::Sender { // Link ipc_thread_, listener_thread_ and channel_ altogether. StartThread(&ipc_thread_, MessageLoop::TYPE_IO); channel_.reset(new SyncChannel( - channel_name_, mode_, this, NULL, ipc_thread_.message_loop(), true, + channel_name_, mode_, this, ipc_thread_.message_loop(), true, &shutdown_event_)); channel_created_->Signal(); Run(); diff --git a/ipc/ipc_tests.cc b/ipc/ipc_tests.cc index aa30182..aee01f5 100644 --- a/ipc/ipc_tests.cc +++ b/ipc/ipc_tests.cc @@ -250,7 +250,7 @@ TEST_F(IPCChannelTest, ChannelProxyTest) { { // setup IPC channel proxy IPC::ChannelProxy chan(kTestClientChannel, IPC::Channel::MODE_SERVER, - &channel_listener, NULL, thread.message_loop()); + &channel_listener, thread.message_loop()); channel_listener.Init(&chan); |