diff options
Diffstat (limited to 'mojo/public/cpp/bindings/lib/router.cc')
-rw-r--r-- | mojo/public/cpp/bindings/lib/router.cc | 128 |
1 files changed, 28 insertions, 100 deletions
diff --git a/mojo/public/cpp/bindings/lib/router.cc b/mojo/public/cpp/bindings/lib/router.cc index a78cdc7..9bc9aa1 100644 --- a/mojo/public/cpp/bindings/lib/router.cc +++ b/mojo/public/cpp/bindings/lib/router.cc @@ -7,10 +7,7 @@ #include <stdint.h> #include <utility> -#include "base/bind.h" #include "base/logging.h" -#include "base/message_loop/message_loop.h" -#include "base/stl_util.h" namespace mojo { namespace internal { @@ -63,13 +60,6 @@ class ResponderThunk : public MessageReceiverWithStatus { // ---------------------------------------------------------------------------- -Router::SyncResponseInfo::SyncResponseInfo(bool* in_response_received) - : response_received(in_response_received) {} - -Router::SyncResponseInfo::~SyncResponseInfo() {} - -// ---------------------------------------------------------------------------- - Router::HandleIncomingMessageThunk::HandleIncomingMessageThunk(Router* router) : router_(router) { } @@ -85,7 +75,6 @@ bool Router::HandleIncomingMessageThunk::Accept(Message* message) { Router::Router(ScopedMessagePipeHandle message_pipe, FilterChain filters, - bool expects_sync_requests, const MojoAsyncWaiter* waiter) : thunk_(this), filters_(std::move(filters)), @@ -95,15 +84,19 @@ Router::Router(ScopedMessagePipeHandle message_pipe, incoming_receiver_(nullptr), next_request_id_(0), testing_mode_(false), - pending_task_for_messages_(false), weak_factory_(this) { filters_.SetSink(&thunk_); - if (expects_sync_requests) - connector_.RegisterSyncHandleWatch(); connector_.set_incoming_receiver(filters_.GetHead()); } -Router::~Router() {} +Router::~Router() { + weak_factory_.InvalidateWeakPtrs(); + + for (auto& pair : async_responders_) + delete pair.second; + for (auto& pair : sync_responders_) + delete pair.second; +} bool Router::Accept(Message* message) { DCHECK(thread_checker_.CalledOnValidThread()); @@ -126,21 +119,17 @@ bool Router::AcceptWithResponder(Message* message, MessageReceiver* responder) { if (!message->has_flag(kMessageIsSync)) { // We assume ownership of |responder|. - async_responders_[request_id] = make_scoped_ptr(responder); + async_responders_[request_id] = responder; return true; } - if (!connector_.RegisterSyncHandleWatch()) - return false; - - bool response_received = false; - scoped_ptr<MessageReceiver> sync_responder(responder); - sync_responses_.insert(std::make_pair( - request_id, make_scoped_ptr(new SyncResponseInfo(&response_received)))); - + sync_responders_[request_id] = responder; base::WeakPtr<Router> weak_self = weak_factory_.GetWeakPtr(); - do { - bool result = connector_.RunSyncHandleWatch(&response_received); + for (;;) { + // TODO(yzshen): Here we should allow incoming sync requests to re-enter and + // block async messages. + bool result = WaitForIncomingMessage(MOJO_DEADLINE_INDEFINITE); + // The message pipe has disconnected. if (!result) break; @@ -149,17 +138,9 @@ bool Router::AcceptWithResponder(Message* message, MessageReceiver* responder) { break; // The corresponding response message has arrived. - DCHECK(response_received); - DCHECK(ContainsKey(sync_responses_, request_id)); - auto iter = sync_responses_.find(request_id); - DCHECK_EQ(&response_received, iter->second->response_received); - scoped_ptr<Message> response = std::move(iter->second->response); - sync_responses_.erase(iter); - ignore_result(sync_responder->Accept(response.get())); - } while (false); - - if (weak_self) - connector_.UnregisterSyncHandleWatch(); + if (sync_responders_.find(request_id) == sync_responders_.end()) + break; + } // Return true means that we take ownership of |responder|. return true; @@ -173,51 +154,6 @@ void Router::EnableTestingMode() { bool Router::HandleIncomingMessage(Message* message) { DCHECK(thread_checker_.CalledOnValidThread()); - - const bool during_sync_call = - connector_.during_sync_handle_watcher_callback(); - if (!message->has_flag(kMessageIsSync) && - (during_sync_call || !pending_messages_.empty())) { - scoped_ptr<Message> pending_message(new Message); - message->MoveTo(pending_message.get()); - pending_messages_.push(std::move(pending_message)); - - if (!pending_task_for_messages_) { - pending_task_for_messages_ = true; - base::MessageLoop::current()->PostTask( - FROM_HERE, base::Bind(&Router::HandleQueuedMessages, - weak_factory_.GetWeakPtr())); - } - - return true; - } - - return HandleMessageInternal(message); -} - -void Router::HandleQueuedMessages() { - DCHECK(thread_checker_.CalledOnValidThread()); - DCHECK(pending_task_for_messages_); - - base::WeakPtr<Router> weak_self = weak_factory_.GetWeakPtr(); - while (!pending_messages_.empty()) { - scoped_ptr<Message> message(std::move(pending_messages_.front())); - pending_messages_.pop(); - - bool result = HandleMessageInternal(message.get()); - if (!weak_self) - return; - - if (!result && !testing_mode_) { - connector_.RaiseError(); - break; - } - } - - pending_task_for_messages_ = false; -} - -bool Router::HandleMessageInternal(Message* message) { if (message->has_flag(kMessageExpectsResponse)) { if (!incoming_receiver_) return false; @@ -230,28 +166,20 @@ bool Router::HandleMessageInternal(Message* message) { return ok; } else if (message->has_flag(kMessageIsResponse)) { + ResponderMap& responder_map = message->has_flag(kMessageIsSync) + ? sync_responders_ + : async_responders_; uint64_t request_id = message->request_id(); - - if (message->has_flag(kMessageIsSync)) { - auto it = sync_responses_.find(request_id); - if (it == sync_responses_.end()) { - DCHECK(testing_mode_); - return false; - } - it->second->response.reset(new Message()); - message->MoveTo(it->second->response.get()); - *it->second->response_received = true; - return true; - } - - auto it = async_responders_.find(request_id); - if (it == async_responders_.end()) { + ResponderMap::iterator it = responder_map.find(request_id); + if (it == responder_map.end()) { DCHECK(testing_mode_); return false; } - scoped_ptr<MessageReceiver> responder = std::move(it->second); - async_responders_.erase(it); - return responder->Accept(message); + MessageReceiver* responder = it->second; + responder_map.erase(it); + bool ok = responder->Accept(message); + delete responder; + return ok; } else { if (!incoming_receiver_) return false; |