summaryrefslogtreecommitdiffstats
path: root/mojo/public/cpp/bindings/lib/router.cc
diff options
context:
space:
mode:
Diffstat (limited to 'mojo/public/cpp/bindings/lib/router.cc')
-rw-r--r--mojo/public/cpp/bindings/lib/router.cc128
1 files changed, 28 insertions, 100 deletions
diff --git a/mojo/public/cpp/bindings/lib/router.cc b/mojo/public/cpp/bindings/lib/router.cc
index a78cdc7..9bc9aa1 100644
--- a/mojo/public/cpp/bindings/lib/router.cc
+++ b/mojo/public/cpp/bindings/lib/router.cc
@@ -7,10 +7,7 @@
#include <stdint.h>
#include <utility>
-#include "base/bind.h"
#include "base/logging.h"
-#include "base/message_loop/message_loop.h"
-#include "base/stl_util.h"
namespace mojo {
namespace internal {
@@ -63,13 +60,6 @@ class ResponderThunk : public MessageReceiverWithStatus {
// ----------------------------------------------------------------------------
-Router::SyncResponseInfo::SyncResponseInfo(bool* in_response_received)
- : response_received(in_response_received) {}
-
-Router::SyncResponseInfo::~SyncResponseInfo() {}
-
-// ----------------------------------------------------------------------------
-
Router::HandleIncomingMessageThunk::HandleIncomingMessageThunk(Router* router)
: router_(router) {
}
@@ -85,7 +75,6 @@ bool Router::HandleIncomingMessageThunk::Accept(Message* message) {
Router::Router(ScopedMessagePipeHandle message_pipe,
FilterChain filters,
- bool expects_sync_requests,
const MojoAsyncWaiter* waiter)
: thunk_(this),
filters_(std::move(filters)),
@@ -95,15 +84,19 @@ Router::Router(ScopedMessagePipeHandle message_pipe,
incoming_receiver_(nullptr),
next_request_id_(0),
testing_mode_(false),
- pending_task_for_messages_(false),
weak_factory_(this) {
filters_.SetSink(&thunk_);
- if (expects_sync_requests)
- connector_.RegisterSyncHandleWatch();
connector_.set_incoming_receiver(filters_.GetHead());
}
-Router::~Router() {}
+Router::~Router() {
+ weak_factory_.InvalidateWeakPtrs();
+
+ for (auto& pair : async_responders_)
+ delete pair.second;
+ for (auto& pair : sync_responders_)
+ delete pair.second;
+}
bool Router::Accept(Message* message) {
DCHECK(thread_checker_.CalledOnValidThread());
@@ -126,21 +119,17 @@ bool Router::AcceptWithResponder(Message* message, MessageReceiver* responder) {
if (!message->has_flag(kMessageIsSync)) {
// We assume ownership of |responder|.
- async_responders_[request_id] = make_scoped_ptr(responder);
+ async_responders_[request_id] = responder;
return true;
}
- if (!connector_.RegisterSyncHandleWatch())
- return false;
-
- bool response_received = false;
- scoped_ptr<MessageReceiver> sync_responder(responder);
- sync_responses_.insert(std::make_pair(
- request_id, make_scoped_ptr(new SyncResponseInfo(&response_received))));
-
+ sync_responders_[request_id] = responder;
base::WeakPtr<Router> weak_self = weak_factory_.GetWeakPtr();
- do {
- bool result = connector_.RunSyncHandleWatch(&response_received);
+ for (;;) {
+ // TODO(yzshen): Here we should allow incoming sync requests to re-enter and
+ // block async messages.
+ bool result = WaitForIncomingMessage(MOJO_DEADLINE_INDEFINITE);
+ // The message pipe has disconnected.
if (!result)
break;
@@ -149,17 +138,9 @@ bool Router::AcceptWithResponder(Message* message, MessageReceiver* responder) {
break;
// The corresponding response message has arrived.
- DCHECK(response_received);
- DCHECK(ContainsKey(sync_responses_, request_id));
- auto iter = sync_responses_.find(request_id);
- DCHECK_EQ(&response_received, iter->second->response_received);
- scoped_ptr<Message> response = std::move(iter->second->response);
- sync_responses_.erase(iter);
- ignore_result(sync_responder->Accept(response.get()));
- } while (false);
-
- if (weak_self)
- connector_.UnregisterSyncHandleWatch();
+ if (sync_responders_.find(request_id) == sync_responders_.end())
+ break;
+ }
// Return true means that we take ownership of |responder|.
return true;
@@ -173,51 +154,6 @@ void Router::EnableTestingMode() {
bool Router::HandleIncomingMessage(Message* message) {
DCHECK(thread_checker_.CalledOnValidThread());
-
- const bool during_sync_call =
- connector_.during_sync_handle_watcher_callback();
- if (!message->has_flag(kMessageIsSync) &&
- (during_sync_call || !pending_messages_.empty())) {
- scoped_ptr<Message> pending_message(new Message);
- message->MoveTo(pending_message.get());
- pending_messages_.push(std::move(pending_message));
-
- if (!pending_task_for_messages_) {
- pending_task_for_messages_ = true;
- base::MessageLoop::current()->PostTask(
- FROM_HERE, base::Bind(&Router::HandleQueuedMessages,
- weak_factory_.GetWeakPtr()));
- }
-
- return true;
- }
-
- return HandleMessageInternal(message);
-}
-
-void Router::HandleQueuedMessages() {
- DCHECK(thread_checker_.CalledOnValidThread());
- DCHECK(pending_task_for_messages_);
-
- base::WeakPtr<Router> weak_self = weak_factory_.GetWeakPtr();
- while (!pending_messages_.empty()) {
- scoped_ptr<Message> message(std::move(pending_messages_.front()));
- pending_messages_.pop();
-
- bool result = HandleMessageInternal(message.get());
- if (!weak_self)
- return;
-
- if (!result && !testing_mode_) {
- connector_.RaiseError();
- break;
- }
- }
-
- pending_task_for_messages_ = false;
-}
-
-bool Router::HandleMessageInternal(Message* message) {
if (message->has_flag(kMessageExpectsResponse)) {
if (!incoming_receiver_)
return false;
@@ -230,28 +166,20 @@ bool Router::HandleMessageInternal(Message* message) {
return ok;
} else if (message->has_flag(kMessageIsResponse)) {
+ ResponderMap& responder_map = message->has_flag(kMessageIsSync)
+ ? sync_responders_
+ : async_responders_;
uint64_t request_id = message->request_id();
-
- if (message->has_flag(kMessageIsSync)) {
- auto it = sync_responses_.find(request_id);
- if (it == sync_responses_.end()) {
- DCHECK(testing_mode_);
- return false;
- }
- it->second->response.reset(new Message());
- message->MoveTo(it->second->response.get());
- *it->second->response_received = true;
- return true;
- }
-
- auto it = async_responders_.find(request_id);
- if (it == async_responders_.end()) {
+ ResponderMap::iterator it = responder_map.find(request_id);
+ if (it == responder_map.end()) {
DCHECK(testing_mode_);
return false;
}
- scoped_ptr<MessageReceiver> responder = std::move(it->second);
- async_responders_.erase(it);
- return responder->Accept(message);
+ MessageReceiver* responder = it->second;
+ responder_map.erase(it);
+ bool ok = responder->Accept(message);
+ delete responder;
+ return ok;
} else {
if (!incoming_receiver_)
return false;