diff options
Diffstat (limited to 'mojo/public/cpp/bindings/lib/router.cc')
-rw-r--r-- | mojo/public/cpp/bindings/lib/router.cc | 133 |
1 files changed, 99 insertions, 34 deletions
diff --git a/mojo/public/cpp/bindings/lib/router.cc b/mojo/public/cpp/bindings/lib/router.cc index 9bc9aa1..bb83477 100644 --- a/mojo/public/cpp/bindings/lib/router.cc +++ b/mojo/public/cpp/bindings/lib/router.cc @@ -7,7 +7,10 @@ #include <stdint.h> #include <utility> +#include "base/bind.h" #include "base/logging.h" +#include "base/message_loop/message_loop.h" +#include "base/stl_util.h" namespace mojo { namespace internal { @@ -60,6 +63,13 @@ class ResponderThunk : public MessageReceiverWithStatus { // ---------------------------------------------------------------------------- +Router::SyncResponseInfo::SyncResponseInfo(bool* in_response_received) + : response_received(in_response_received) {} + +Router::SyncResponseInfo::~SyncResponseInfo() {} + +// ---------------------------------------------------------------------------- + Router::HandleIncomingMessageThunk::HandleIncomingMessageThunk(Router* router) : router_(router) { } @@ -75,6 +85,7 @@ bool Router::HandleIncomingMessageThunk::Accept(Message* message) { Router::Router(ScopedMessagePipeHandle message_pipe, FilterChain filters, + bool expects_sync_requests, const MojoAsyncWaiter* waiter) : thunk_(this), filters_(std::move(filters)), @@ -84,19 +95,15 @@ Router::Router(ScopedMessagePipeHandle message_pipe, incoming_receiver_(nullptr), next_request_id_(0), testing_mode_(false), + pending_task_for_messages_(false), weak_factory_(this) { filters_.SetSink(&thunk_); + if (expects_sync_requests) + connector_.RegisterSyncHandleWatch(); connector_.set_incoming_receiver(filters_.GetHead()); } -Router::~Router() { - weak_factory_.InvalidateWeakPtrs(); - - for (auto& pair : async_responders_) - delete pair.second; - for (auto& pair : sync_responders_) - delete pair.second; -} +Router::~Router() {} bool Router::Accept(Message* message) { DCHECK(thread_checker_.CalledOnValidThread()); @@ -119,27 +126,32 @@ bool Router::AcceptWithResponder(Message* message, MessageReceiver* responder) { if (!message->has_flag(kMessageIsSync)) { // We assume ownership of |responder|. - async_responders_[request_id] = responder; + async_responders_[request_id] = make_scoped_ptr(responder); return true; } - sync_responders_[request_id] = responder; - base::WeakPtr<Router> weak_self = weak_factory_.GetWeakPtr(); - for (;;) { - // TODO(yzshen): Here we should allow incoming sync requests to re-enter and - // block async messages. - bool result = WaitForIncomingMessage(MOJO_DEADLINE_INDEFINITE); - // The message pipe has disconnected. - if (!result) - break; + if (!connector_.RegisterSyncHandleWatch()) + return false; - // This instance has been destroyed. - if (!weak_self) - break; + bool response_received = false; + scoped_ptr<MessageReceiver> sync_responder(responder); + sync_responses_.insert(std::make_pair( + request_id, make_scoped_ptr(new SyncResponseInfo(&response_received)))); - // The corresponding response message has arrived. - if (sync_responders_.find(request_id) == sync_responders_.end()) - break; + base::WeakPtr<Router> weak_self = weak_factory_.GetWeakPtr(); + bool result = connector_.RunSyncHandleWatch(&response_received); + // Make sure that this instance hasn't been destroyed. + if (weak_self) { + DCHECK(ContainsKey(sync_responses_, request_id)); + auto iter = sync_responses_.find(request_id); + DCHECK_EQ(&response_received, iter->second->response_received); + if (result && response_received) { + scoped_ptr<Message> response = std::move(iter->second->response); + ignore_result(sync_responder->Accept(response.get())); + } + sync_responses_.erase(iter); + + connector_.UnregisterSyncHandleWatch(); } // Return true means that we take ownership of |responder|. @@ -154,6 +166,51 @@ void Router::EnableTestingMode() { bool Router::HandleIncomingMessage(Message* message) { DCHECK(thread_checker_.CalledOnValidThread()); + + const bool during_sync_call = + connector_.during_sync_handle_watcher_callback(); + if (!message->has_flag(kMessageIsSync) && + (during_sync_call || !pending_messages_.empty())) { + scoped_ptr<Message> pending_message(new Message); + message->MoveTo(pending_message.get()); + pending_messages_.push(std::move(pending_message)); + + if (!pending_task_for_messages_) { + pending_task_for_messages_ = true; + base::MessageLoop::current()->PostTask( + FROM_HERE, base::Bind(&Router::HandleQueuedMessages, + weak_factory_.GetWeakPtr())); + } + + return true; + } + + return HandleMessageInternal(message); +} + +void Router::HandleQueuedMessages() { + DCHECK(thread_checker_.CalledOnValidThread()); + DCHECK(pending_task_for_messages_); + + base::WeakPtr<Router> weak_self = weak_factory_.GetWeakPtr(); + while (!pending_messages_.empty()) { + scoped_ptr<Message> message(std::move(pending_messages_.front())); + pending_messages_.pop(); + + bool result = HandleMessageInternal(message.get()); + if (!weak_self) + return; + + if (!result && !testing_mode_) { + connector_.RaiseError(); + break; + } + } + + pending_task_for_messages_ = false; +} + +bool Router::HandleMessageInternal(Message* message) { if (message->has_flag(kMessageExpectsResponse)) { if (!incoming_receiver_) return false; @@ -166,20 +223,28 @@ bool Router::HandleIncomingMessage(Message* message) { return ok; } else if (message->has_flag(kMessageIsResponse)) { - ResponderMap& responder_map = message->has_flag(kMessageIsSync) - ? sync_responders_ - : async_responders_; uint64_t request_id = message->request_id(); - ResponderMap::iterator it = responder_map.find(request_id); - if (it == responder_map.end()) { + + if (message->has_flag(kMessageIsSync)) { + auto it = sync_responses_.find(request_id); + if (it == sync_responses_.end()) { + DCHECK(testing_mode_); + return false; + } + it->second->response.reset(new Message()); + message->MoveTo(it->second->response.get()); + *it->second->response_received = true; + return true; + } + + auto it = async_responders_.find(request_id); + if (it == async_responders_.end()) { DCHECK(testing_mode_); return false; } - MessageReceiver* responder = it->second; - responder_map.erase(it); - bool ok = responder->Accept(message); - delete responder; - return ok; + scoped_ptr<MessageReceiver> responder = std::move(it->second); + async_responders_.erase(it); + return responder->Accept(message); } else { if (!incoming_receiver_) return false; |