From ce283b206e338af34a49fa353e39802dc2cfc73d Mon Sep 17 00:00:00 2001 From: yzshen Date: Wed, 23 Mar 2016 14:56:57 -0700 Subject: Mojo C++ bindings: some MultiplexRouter optimization. This CL: - reduces post tasks; - reduces map lookups; - removes unnecessary lock and state check when sending messages. BUG= Review URL: https://codereview.chromium.org/1831513002 Cr-Commit-Position: refs/heads/master@{#382933} --- mojo/public/cpp/bindings/lib/multiplex_router.cc | 121 ++++++++++++++--------- mojo/public/cpp/bindings/lib/multiplex_router.h | 14 ++- 2 files changed, 86 insertions(+), 49 deletions(-) diff --git a/mojo/public/cpp/bindings/lib/multiplex_router.cc b/mojo/public/cpp/bindings/lib/multiplex_router.cc index ca22e86..b9e086c 100644 --- a/mojo/public/cpp/bindings/lib/multiplex_router.cc +++ b/mojo/public/cpp/bindings/lib/multiplex_router.cc @@ -11,6 +11,7 @@ #include "base/bind.h" #include "base/macros.h" #include "base/message_loop/message_loop.h" +#include "base/single_thread_task_runner.h" #include "base/stl_util.h" #include "mojo/public/cpp/bindings/associated_group.h" #include "mojo/public/cpp/bindings/lib/interface_endpoint_client.h" @@ -48,8 +49,8 @@ class MultiplexRouter::InterfaceEndpoint peer_closed_ = true; } - const scoped_refptr task_runner() const { - return task_runner_; + base::SingleThreadTaskRunner* task_runner() const { + return task_runner_.get(); } void set_task_runner( scoped_refptr task_runner) { @@ -128,6 +129,7 @@ MultiplexRouter::MultiplexRouter(bool set_interface_id_namesapce_bit, control_message_handler_(this), control_message_proxy_(&connector_), next_interface_id_value_(1), + posted_to_process_tasks_(false), testing_mode_(false) { connector_.set_incoming_receiver(&header_validator_); connector_.set_connection_error_handler( @@ -180,17 +182,16 @@ ScopedInterfaceEndpointHandle MultiplexRouter::CreateLocalEndpointHandle( return ScopedInterfaceEndpointHandle(); base::AutoLock locker(lock_); - if (ContainsKey(endpoints_, id)) { + bool inserted = false; + InterfaceEndpoint* endpoint = FindOrInsertEndpoint(id, &inserted); + if (inserted) { + if (encountered_error_) + UpdateEndpointStateMayRemove(endpoint, PEER_ENDPOINT_CLOSED); + } else { // If the endpoint already exist, it is because we have received a // notification that the peer endpoint has closed. - InterfaceEndpoint* endpoint = endpoints_[id].get(); CHECK(!endpoint->closed()); CHECK(endpoint->peer_closed()); - } else { - InterfaceEndpoint* endpoint = new InterfaceEndpoint(this, id); - endpoints_[id] = endpoint; - if (encountered_error_) - UpdateEndpointStateMayRemove(endpoint, PEER_ENDPOINT_CLOSED); } return ScopedInterfaceEndpointHandle(id, true, this); } @@ -266,17 +267,7 @@ void MultiplexRouter::DetachEndpointClient( bool MultiplexRouter::SendMessage(const ScopedInterfaceEndpointHandle& handle, Message* message) { - const InterfaceId id = handle.id(); - - base::AutoLock locker(lock_); - if (!ContainsKey(endpoints_, id)) - return false; - - InterfaceEndpoint* endpoint = endpoints_[id].get(); - if (endpoint->peer_closed()) - return false; - - message->set_interface_id(id); + message->set_interface_id(handle.id()); return connector_.Accept(message); } @@ -325,8 +316,19 @@ bool MultiplexRouter::Accept(Message* message) { scoped_refptr protector(this); base::AutoLock locker(lock_); - tasks_.push_back(Task::CreateIncomingMessageTask(message)); - ProcessTasks(false); + + bool processed = tasks_.empty() && ProcessIncomingMessage(message, false); + + if (!processed) { + // Either the task queue is not empty or we cannot process the message + // directly. In both cases, there is no need to call ProcessTasks(). + tasks_.push_back(Task::CreateIncomingMessageTask(message)); + } else if (!tasks_.empty()) { + // Processing the message may result in new tasks (for error notification) + // being added to the queue. In this case, we have to attempt to process the + // tasks. + ProcessTasks(false); + } // Always return true. If we see errors during message processing, we will // explicitly call Connector::RaiseError() to disconnect the message pipe. @@ -339,10 +341,7 @@ bool MultiplexRouter::OnPeerAssociatedEndpointClosed(InterfaceId id) { if (IsMasterInterfaceId(id)) return false; - if (!ContainsKey(endpoints_, id)) - endpoints_[id] = new InterfaceEndpoint(this, id); - - InterfaceEndpoint* endpoint = endpoints_[id].get(); + InterfaceEndpoint* endpoint = FindOrInsertEndpoint(id, nullptr); DCHECK(!endpoint->peer_closed()); if (endpoint->client()) @@ -360,10 +359,7 @@ bool MultiplexRouter::OnAssociatedEndpointClosedBeforeSent(InterfaceId id) { if (IsMasterInterfaceId(id)) return false; - if (!ContainsKey(endpoints_, id)) - endpoints_[id] = new InterfaceEndpoint(this, id); - - InterfaceEndpoint* endpoint = endpoints_[id].get(); + InterfaceEndpoint* endpoint = FindOrInsertEndpoint(id, nullptr); DCHECK(!endpoint->closed()); UpdateEndpointStateMayRemove(endpoint, ENDPOINT_CLOSED); @@ -398,13 +394,17 @@ void MultiplexRouter::OnPipeConnectionError() { void MultiplexRouter::ProcessTasks(bool force_async) { lock_.AssertAcquired(); + if (posted_to_process_tasks_) + return; + while (!tasks_.empty()) { scoped_ptr task(std::move(tasks_.front())); tasks_.pop_front(); - bool processed = task->IsNotifyErrorTask() - ? ProcessNotifyErrorTask(task.get(), force_async) - : ProcessIncomingMessageTask(task.get(), force_async); + bool processed = + task->IsNotifyErrorTask() + ? ProcessNotifyErrorTask(task.get(), force_async) + : ProcessIncomingMessage(task->message.get(), force_async); if (!processed) { tasks_.push_front(std::move(task)); @@ -420,8 +420,7 @@ bool MultiplexRouter::ProcessNotifyErrorTask(Task* task, bool force_async) { return true; if (!endpoint->task_runner()->BelongsToCurrentThread() || force_async) { - endpoint->task_runner()->PostTask( - FROM_HERE, base::Bind(&MultiplexRouter::LockAndCallProcessTasks, this)); + MaybePostToProcessTasks(endpoint->task_runner()); return false; } @@ -438,10 +437,9 @@ bool MultiplexRouter::ProcessNotifyErrorTask(Task* task, bool force_async) { return true; } -bool MultiplexRouter::ProcessIncomingMessageTask(Task* task, bool force_async) { +bool MultiplexRouter::ProcessIncomingMessage(Message* message, + bool force_async) { lock_.AssertAcquired(); - Message* message = task->message.get(); - if (PipeControlMessageHandler::IsPipeControlMessage(message)) { if (!control_message_handler_.Accept(message)) RaiseErrorInNonTestingMode(); @@ -451,7 +449,9 @@ bool MultiplexRouter::ProcessIncomingMessageTask(Task* task, bool force_async) { InterfaceId id = message->interface_id(); DCHECK(IsValidInterfaceId(id)); - if (!ContainsKey(endpoints_, id)) { + bool inserted = false; + InterfaceEndpoint* endpoint = FindOrInsertEndpoint(id, &inserted); + if (inserted) { DCHECK(!IsMasterInterfaceId(id)); // Currently, it is legitimate to receive messages for an endpoint @@ -459,15 +459,12 @@ bool MultiplexRouter::ProcessIncomingMessageTask(Task* task, bool force_async) { // a message that is discarded. Once we add support to specify all // enclosing endpoints in message header, we should be able to remove // this. - InterfaceEndpoint* endpoint = new InterfaceEndpoint(this, id); - endpoints_[id] = endpoint; UpdateEndpointStateMayRemove(endpoint, ENDPOINT_CLOSED); control_message_proxy_.NotifyPeerEndpointClosed(id); return true; } - InterfaceEndpoint* endpoint = endpoints_[id].get(); if (endpoint->closed()) return true; @@ -478,13 +475,11 @@ bool MultiplexRouter::ProcessIncomingMessageTask(Task* task, bool force_async) { } if (!endpoint->task_runner()->BelongsToCurrentThread() || force_async) { - endpoint->task_runner()->PostTask( - FROM_HERE, base::Bind(&MultiplexRouter::LockAndCallProcessTasks, this)); + MaybePostToProcessTasks(endpoint->task_runner()); return false; } InterfaceEndpointClient* client = endpoint->client(); - scoped_ptr owned_message = std::move(task->message); bool result = false; { // We must unlock before calling into |client| because it may call this @@ -494,7 +489,7 @@ bool MultiplexRouter::ProcessIncomingMessageTask(Task* task, bool force_async) { // It is safe to call into |client| without the lock. Because |client| is // always accessed on the same thread, including DetachEndpointClient(). base::AutoUnlock unlocker(lock_); - result = client->HandleIncomingMessage(owned_message.get()); + result = client->HandleIncomingMessage(message); } if (!result) RaiseErrorInNonTestingMode(); @@ -502,10 +497,22 @@ bool MultiplexRouter::ProcessIncomingMessageTask(Task* task, bool force_async) { return true; } +void MultiplexRouter::MaybePostToProcessTasks( + base::SingleThreadTaskRunner* task_runner) { + lock_.AssertAcquired(); + if (posted_to_process_tasks_) + return; + + posted_to_process_tasks_ = true; + task_runner->PostTask( + FROM_HERE, base::Bind(&MultiplexRouter::LockAndCallProcessTasks, this)); +} + void MultiplexRouter::LockAndCallProcessTasks() { // There is no need to hold a ref to this class in this case because this is // always called using base::Bind(), which holds a ref. base::AutoLock locker(lock_); + posted_to_process_tasks_ = false; ProcessTasks(false); } @@ -530,5 +537,27 @@ void MultiplexRouter::RaiseErrorInNonTestingMode() { RaiseError(); } +MultiplexRouter::InterfaceEndpoint* MultiplexRouter::FindOrInsertEndpoint( + InterfaceId id, + bool* inserted) { + lock_.AssertAcquired(); + // Either |inserted| is nullptr or it points to a boolean initialized as + // false. + DCHECK(!inserted || !*inserted); + + auto iter = endpoints_.find(id); + InterfaceEndpoint* endpoint; + if (iter == endpoints_.end()) { + endpoint = new InterfaceEndpoint(this, id); + endpoints_[id] = endpoint; + if (inserted) + *inserted = true; + } else { + endpoint = iter->second.get(); + } + + return endpoint; +} + } // namespace internal } // namespace mojo diff --git a/mojo/public/cpp/bindings/lib/multiplex_router.h b/mojo/public/cpp/bindings/lib/multiplex_router.h index f66d202..a8bf9b5 100644 --- a/mojo/public/cpp/bindings/lib/multiplex_router.h +++ b/mojo/public/cpp/bindings/lib/multiplex_router.h @@ -27,6 +27,10 @@ #include "mojo/public/cpp/bindings/lib/pipe_control_message_proxy.h" #include "mojo/public/cpp/bindings/lib/scoped_interface_endpoint_handle.h" +namespace base { +class SingleThreadTaskRunner; +} + namespace mojo { class AssociatedGroup; @@ -172,11 +176,11 @@ class MultiplexRouter // on to a ref outside of |lock_| before calling this method. void ProcessTasks(bool force_async); - // Returns true to indicate that |task| has been processed. Otherwise the task - // will be added back to the front of the queue. + // Returns true to indicate that |task|/|message| has been processed. bool ProcessNotifyErrorTask(Task* task, bool force_async); - bool ProcessIncomingMessageTask(Task* task, bool force_async); + bool ProcessIncomingMessage(Message* message, bool force_async); + void MaybePostToProcessTasks(base::SingleThreadTaskRunner* task_runner); void LockAndCallProcessTasks(); // Updates the state of |endpoint|. If both the endpoint and its peer have @@ -188,6 +192,8 @@ class MultiplexRouter void RaiseErrorInNonTestingMode(); + InterfaceEndpoint* FindOrInsertEndpoint(InterfaceId id, bool* inserted); + // Whether to set the namespace bit when generating interface IDs. Please see // comments of kInterfaceIdNamespaceMask. const bool set_interface_id_namespace_bit_; @@ -208,6 +214,8 @@ class MultiplexRouter std::deque> tasks_; + bool posted_to_process_tasks_; + bool testing_mode_; DISALLOW_COPY_AND_ASSIGN(MultiplexRouter); -- cgit v1.1