summaryrefslogtreecommitdiffstats
path: root/mojo
diff options
context:
space:
mode:
authoryzshen <yzshen@chromium.org>2016-03-23 14:56:57 -0700
committerCommit bot <commit-bot@chromium.org>2016-03-23 21:58:07 +0000
commitce283b206e338af34a49fa353e39802dc2cfc73d (patch)
tree65a9a7525e175ba7430cedbf792a0d724aa14376 /mojo
parentb84d29c79dfa3bfe1c3489d2c9785dbc813058b4 (diff)
downloadchromium_src-ce283b206e338af34a49fa353e39802dc2cfc73d.zip
chromium_src-ce283b206e338af34a49fa353e39802dc2cfc73d.tar.gz
chromium_src-ce283b206e338af34a49fa353e39802dc2cfc73d.tar.bz2
Mojo C++ bindings: some MultiplexRouter optimization.
This CL: - reduces post tasks; - reduces map lookups; - removes unnecessary lock and state check when sending messages. BUG= Review URL: https://codereview.chromium.org/1831513002 Cr-Commit-Position: refs/heads/master@{#382933}
Diffstat (limited to 'mojo')
-rw-r--r--mojo/public/cpp/bindings/lib/multiplex_router.cc121
-rw-r--r--mojo/public/cpp/bindings/lib/multiplex_router.h14
2 files changed, 86 insertions, 49 deletions
diff --git a/mojo/public/cpp/bindings/lib/multiplex_router.cc b/mojo/public/cpp/bindings/lib/multiplex_router.cc
index ca22e86..b9e086c 100644
--- a/mojo/public/cpp/bindings/lib/multiplex_router.cc
+++ b/mojo/public/cpp/bindings/lib/multiplex_router.cc
@@ -11,6 +11,7 @@
#include "base/bind.h"
#include "base/macros.h"
#include "base/message_loop/message_loop.h"
+#include "base/single_thread_task_runner.h"
#include "base/stl_util.h"
#include "mojo/public/cpp/bindings/associated_group.h"
#include "mojo/public/cpp/bindings/lib/interface_endpoint_client.h"
@@ -48,8 +49,8 @@ class MultiplexRouter::InterfaceEndpoint
peer_closed_ = true;
}
- const scoped_refptr<base::SingleThreadTaskRunner> task_runner() const {
- return task_runner_;
+ base::SingleThreadTaskRunner* task_runner() const {
+ return task_runner_.get();
}
void set_task_runner(
scoped_refptr<base::SingleThreadTaskRunner> task_runner) {
@@ -128,6 +129,7 @@ MultiplexRouter::MultiplexRouter(bool set_interface_id_namesapce_bit,
control_message_handler_(this),
control_message_proxy_(&connector_),
next_interface_id_value_(1),
+ posted_to_process_tasks_(false),
testing_mode_(false) {
connector_.set_incoming_receiver(&header_validator_);
connector_.set_connection_error_handler(
@@ -180,17 +182,16 @@ ScopedInterfaceEndpointHandle MultiplexRouter::CreateLocalEndpointHandle(
return ScopedInterfaceEndpointHandle();
base::AutoLock locker(lock_);
- if (ContainsKey(endpoints_, id)) {
+ bool inserted = false;
+ InterfaceEndpoint* endpoint = FindOrInsertEndpoint(id, &inserted);
+ if (inserted) {
+ if (encountered_error_)
+ UpdateEndpointStateMayRemove(endpoint, PEER_ENDPOINT_CLOSED);
+ } else {
// If the endpoint already exist, it is because we have received a
// notification that the peer endpoint has closed.
- InterfaceEndpoint* endpoint = endpoints_[id].get();
CHECK(!endpoint->closed());
CHECK(endpoint->peer_closed());
- } else {
- InterfaceEndpoint* endpoint = new InterfaceEndpoint(this, id);
- endpoints_[id] = endpoint;
- if (encountered_error_)
- UpdateEndpointStateMayRemove(endpoint, PEER_ENDPOINT_CLOSED);
}
return ScopedInterfaceEndpointHandle(id, true, this);
}
@@ -266,17 +267,7 @@ void MultiplexRouter::DetachEndpointClient(
bool MultiplexRouter::SendMessage(const ScopedInterfaceEndpointHandle& handle,
Message* message) {
- const InterfaceId id = handle.id();
-
- base::AutoLock locker(lock_);
- if (!ContainsKey(endpoints_, id))
- return false;
-
- InterfaceEndpoint* endpoint = endpoints_[id].get();
- if (endpoint->peer_closed())
- return false;
-
- message->set_interface_id(id);
+ message->set_interface_id(handle.id());
return connector_.Accept(message);
}
@@ -325,8 +316,19 @@ bool MultiplexRouter::Accept(Message* message) {
scoped_refptr<MultiplexRouter> protector(this);
base::AutoLock locker(lock_);
- tasks_.push_back(Task::CreateIncomingMessageTask(message));
- ProcessTasks(false);
+
+ bool processed = tasks_.empty() && ProcessIncomingMessage(message, false);
+
+ if (!processed) {
+ // Either the task queue is not empty or we cannot process the message
+ // directly. In both cases, there is no need to call ProcessTasks().
+ tasks_.push_back(Task::CreateIncomingMessageTask(message));
+ } else if (!tasks_.empty()) {
+ // Processing the message may result in new tasks (for error notification)
+ // being added to the queue. In this case, we have to attempt to process the
+ // tasks.
+ ProcessTasks(false);
+ }
// Always return true. If we see errors during message processing, we will
// explicitly call Connector::RaiseError() to disconnect the message pipe.
@@ -339,10 +341,7 @@ bool MultiplexRouter::OnPeerAssociatedEndpointClosed(InterfaceId id) {
if (IsMasterInterfaceId(id))
return false;
- if (!ContainsKey(endpoints_, id))
- endpoints_[id] = new InterfaceEndpoint(this, id);
-
- InterfaceEndpoint* endpoint = endpoints_[id].get();
+ InterfaceEndpoint* endpoint = FindOrInsertEndpoint(id, nullptr);
DCHECK(!endpoint->peer_closed());
if (endpoint->client())
@@ -360,10 +359,7 @@ bool MultiplexRouter::OnAssociatedEndpointClosedBeforeSent(InterfaceId id) {
if (IsMasterInterfaceId(id))
return false;
- if (!ContainsKey(endpoints_, id))
- endpoints_[id] = new InterfaceEndpoint(this, id);
-
- InterfaceEndpoint* endpoint = endpoints_[id].get();
+ InterfaceEndpoint* endpoint = FindOrInsertEndpoint(id, nullptr);
DCHECK(!endpoint->closed());
UpdateEndpointStateMayRemove(endpoint, ENDPOINT_CLOSED);
@@ -398,13 +394,17 @@ void MultiplexRouter::OnPipeConnectionError() {
void MultiplexRouter::ProcessTasks(bool force_async) {
lock_.AssertAcquired();
+ if (posted_to_process_tasks_)
+ return;
+
while (!tasks_.empty()) {
scoped_ptr<Task> task(std::move(tasks_.front()));
tasks_.pop_front();
- bool processed = task->IsNotifyErrorTask()
- ? ProcessNotifyErrorTask(task.get(), force_async)
- : ProcessIncomingMessageTask(task.get(), force_async);
+ bool processed =
+ task->IsNotifyErrorTask()
+ ? ProcessNotifyErrorTask(task.get(), force_async)
+ : ProcessIncomingMessage(task->message.get(), force_async);
if (!processed) {
tasks_.push_front(std::move(task));
@@ -420,8 +420,7 @@ bool MultiplexRouter::ProcessNotifyErrorTask(Task* task, bool force_async) {
return true;
if (!endpoint->task_runner()->BelongsToCurrentThread() || force_async) {
- endpoint->task_runner()->PostTask(
- FROM_HERE, base::Bind(&MultiplexRouter::LockAndCallProcessTasks, this));
+ MaybePostToProcessTasks(endpoint->task_runner());
return false;
}
@@ -438,10 +437,9 @@ bool MultiplexRouter::ProcessNotifyErrorTask(Task* task, bool force_async) {
return true;
}
-bool MultiplexRouter::ProcessIncomingMessageTask(Task* task, bool force_async) {
+bool MultiplexRouter::ProcessIncomingMessage(Message* message,
+ bool force_async) {
lock_.AssertAcquired();
- Message* message = task->message.get();
-
if (PipeControlMessageHandler::IsPipeControlMessage(message)) {
if (!control_message_handler_.Accept(message))
RaiseErrorInNonTestingMode();
@@ -451,7 +449,9 @@ bool MultiplexRouter::ProcessIncomingMessageTask(Task* task, bool force_async) {
InterfaceId id = message->interface_id();
DCHECK(IsValidInterfaceId(id));
- if (!ContainsKey(endpoints_, id)) {
+ bool inserted = false;
+ InterfaceEndpoint* endpoint = FindOrInsertEndpoint(id, &inserted);
+ if (inserted) {
DCHECK(!IsMasterInterfaceId(id));
// Currently, it is legitimate to receive messages for an endpoint
@@ -459,15 +459,12 @@ bool MultiplexRouter::ProcessIncomingMessageTask(Task* task, bool force_async) {
// a message that is discarded. Once we add support to specify all
// enclosing endpoints in message header, we should be able to remove
// this.
- InterfaceEndpoint* endpoint = new InterfaceEndpoint(this, id);
- endpoints_[id] = endpoint;
UpdateEndpointStateMayRemove(endpoint, ENDPOINT_CLOSED);
control_message_proxy_.NotifyPeerEndpointClosed(id);
return true;
}
- InterfaceEndpoint* endpoint = endpoints_[id].get();
if (endpoint->closed())
return true;
@@ -478,13 +475,11 @@ bool MultiplexRouter::ProcessIncomingMessageTask(Task* task, bool force_async) {
}
if (!endpoint->task_runner()->BelongsToCurrentThread() || force_async) {
- endpoint->task_runner()->PostTask(
- FROM_HERE, base::Bind(&MultiplexRouter::LockAndCallProcessTasks, this));
+ MaybePostToProcessTasks(endpoint->task_runner());
return false;
}
InterfaceEndpointClient* client = endpoint->client();
- scoped_ptr<Message> owned_message = std::move(task->message);
bool result = false;
{
// We must unlock before calling into |client| because it may call this
@@ -494,7 +489,7 @@ bool MultiplexRouter::ProcessIncomingMessageTask(Task* task, bool force_async) {
// It is safe to call into |client| without the lock. Because |client| is
// always accessed on the same thread, including DetachEndpointClient().
base::AutoUnlock unlocker(lock_);
- result = client->HandleIncomingMessage(owned_message.get());
+ result = client->HandleIncomingMessage(message);
}
if (!result)
RaiseErrorInNonTestingMode();
@@ -502,10 +497,22 @@ bool MultiplexRouter::ProcessIncomingMessageTask(Task* task, bool force_async) {
return true;
}
+void MultiplexRouter::MaybePostToProcessTasks(
+ base::SingleThreadTaskRunner* task_runner) {
+ lock_.AssertAcquired();
+ if (posted_to_process_tasks_)
+ return;
+
+ posted_to_process_tasks_ = true;
+ task_runner->PostTask(
+ FROM_HERE, base::Bind(&MultiplexRouter::LockAndCallProcessTasks, this));
+}
+
void MultiplexRouter::LockAndCallProcessTasks() {
// There is no need to hold a ref to this class in this case because this is
// always called using base::Bind(), which holds a ref.
base::AutoLock locker(lock_);
+ posted_to_process_tasks_ = false;
ProcessTasks(false);
}
@@ -530,5 +537,27 @@ void MultiplexRouter::RaiseErrorInNonTestingMode() {
RaiseError();
}
+MultiplexRouter::InterfaceEndpoint* MultiplexRouter::FindOrInsertEndpoint(
+ InterfaceId id,
+ bool* inserted) {
+ lock_.AssertAcquired();
+ // Either |inserted| is nullptr or it points to a boolean initialized as
+ // false.
+ DCHECK(!inserted || !*inserted);
+
+ auto iter = endpoints_.find(id);
+ InterfaceEndpoint* endpoint;
+ if (iter == endpoints_.end()) {
+ endpoint = new InterfaceEndpoint(this, id);
+ endpoints_[id] = endpoint;
+ if (inserted)
+ *inserted = true;
+ } else {
+ endpoint = iter->second.get();
+ }
+
+ return endpoint;
+}
+
} // namespace internal
} // namespace mojo
diff --git a/mojo/public/cpp/bindings/lib/multiplex_router.h b/mojo/public/cpp/bindings/lib/multiplex_router.h
index f66d202..a8bf9b5 100644
--- a/mojo/public/cpp/bindings/lib/multiplex_router.h
+++ b/mojo/public/cpp/bindings/lib/multiplex_router.h
@@ -27,6 +27,10 @@
#include "mojo/public/cpp/bindings/lib/pipe_control_message_proxy.h"
#include "mojo/public/cpp/bindings/lib/scoped_interface_endpoint_handle.h"
+namespace base {
+class SingleThreadTaskRunner;
+}
+
namespace mojo {
class AssociatedGroup;
@@ -172,11 +176,11 @@ class MultiplexRouter
// on to a ref outside of |lock_| before calling this method.
void ProcessTasks(bool force_async);
- // Returns true to indicate that |task| has been processed. Otherwise the task
- // will be added back to the front of the queue.
+ // Returns true to indicate that |task|/|message| has been processed.
bool ProcessNotifyErrorTask(Task* task, bool force_async);
- bool ProcessIncomingMessageTask(Task* task, bool force_async);
+ bool ProcessIncomingMessage(Message* message, bool force_async);
+ void MaybePostToProcessTasks(base::SingleThreadTaskRunner* task_runner);
void LockAndCallProcessTasks();
// Updates the state of |endpoint|. If both the endpoint and its peer have
@@ -188,6 +192,8 @@ class MultiplexRouter
void RaiseErrorInNonTestingMode();
+ InterfaceEndpoint* FindOrInsertEndpoint(InterfaceId id, bool* inserted);
+
// Whether to set the namespace bit when generating interface IDs. Please see
// comments of kInterfaceIdNamespaceMask.
const bool set_interface_id_namespace_bit_;
@@ -208,6 +214,8 @@ class MultiplexRouter
std::deque<scoped_ptr<Task>> tasks_;
+ bool posted_to_process_tasks_;
+
bool testing_mode_;
DISALLOW_COPY_AND_ASSIGN(MultiplexRouter);