diff options
Diffstat (limited to 'mojo/public/cpp/bindings/lib/connector.cc')
-rw-r--r-- | mojo/public/cpp/bindings/lib/connector.cc | 122 |
1 files changed, 100 insertions, 22 deletions
diff --git a/mojo/public/cpp/bindings/lib/connector.cc b/mojo/public/cpp/bindings/lib/connector.cc index ceccaa9..07c1094 100644 --- a/mojo/public/cpp/bindings/lib/connector.cc +++ b/mojo/public/cpp/bindings/lib/connector.cc @@ -7,9 +7,11 @@ #include <stdint.h> #include <utility> +#include "base/bind.h" #include "base/logging.h" #include "base/macros.h" #include "base/synchronization/lock.h" +#include "mojo/public/cpp/bindings/lib/sync_handle_watcher.h" namespace mojo { namespace internal { @@ -52,8 +54,11 @@ Connector::Connector(ScopedMessagePipeHandle message_pipe, drop_writes_(false), enforce_errors_from_incoming_receiver_(true), paused_(false), - destroyed_flag_(nullptr), - lock_(config == MULTI_THREADED_SEND ? new base::Lock : nullptr) { + lock_(config == MULTI_THREADED_SEND ? new base::Lock : nullptr), + register_sync_handle_watch_count_(0), + registered_with_sync_handle_watcher_(false), + sync_handle_watcher_callback_count_(0), + weak_factory_(this) { // Even though we don't have an incoming receiver, we still want to monitor // the message pipe to know if is closed or encounters an error. WaitToReadMore(); @@ -62,9 +67,6 @@ Connector::Connector(ScopedMessagePipeHandle message_pipe, Connector::~Connector() { DCHECK(thread_checker_.CalledOnValidThread()); - if (destroyed_flag_) - *destroyed_flag_ = true; - CancelWait(); } @@ -191,17 +193,84 @@ bool Connector::Accept(Message* message) { return true; } +bool Connector::RegisterSyncHandleWatch() { + DCHECK(thread_checker_.CalledOnValidThread()); + + if (error_) + return false; + + register_sync_handle_watch_count_++; + + if (!registered_with_sync_handle_watcher_ && !paused_) { + registered_with_sync_handle_watcher_ = + SyncHandleWatcher::current()->RegisterHandle( + message_pipe_.get(), MOJO_HANDLE_SIGNAL_READABLE, + base::Bind(&Connector::OnSyncHandleWatcherHandleReady, + base::Unretained(this))); + } + return true; +} + +void Connector::UnregisterSyncHandleWatch() { + DCHECK(thread_checker_.CalledOnValidThread()); + + if (register_sync_handle_watch_count_ == 0) { + NOTREACHED(); + return; + } + + register_sync_handle_watch_count_--; + if (register_sync_handle_watch_count_ > 0) + return; + + if (registered_with_sync_handle_watcher_) { + SyncHandleWatcher::current()->UnregisterHandle(message_pipe_.get()); + registered_with_sync_handle_watcher_ = false; + } +} + +bool Connector::RunSyncHandleWatch(const bool* should_stop) { + DCHECK(thread_checker_.CalledOnValidThread()); + DCHECK_GT(register_sync_handle_watch_count_, 0u); + + if (error_) + return false; + + ResumeIncomingMethodCallProcessing(); + + if (!should_stop_sync_handle_watch_) + should_stop_sync_handle_watch_ = new base::RefCountedData<bool>(false); + + // This object may be destroyed during the WatchAllHandles() call. So we have + // to preserve the boolean that WatchAllHandles uses. + scoped_refptr<base::RefCountedData<bool>> preserver = + should_stop_sync_handle_watch_; + const bool* should_stop_array[] = {should_stop, + &should_stop_sync_handle_watch_->data}; + return SyncHandleWatcher::current()->WatchAllHandles(should_stop_array, 2); +} + // static void Connector::CallOnHandleReady(void* closure, MojoResult result) { Connector* self = static_cast<Connector*>(closure); - self->OnHandleReady(result); + CHECK(self->async_wait_id_ != 0); + self->async_wait_id_ = 0; + self->OnHandleReadyInternal(result); } -void Connector::OnHandleReady(MojoResult result) { +void Connector::OnSyncHandleWatcherHandleReady(MojoResult result) { + base::WeakPtr<Connector> weak_self(weak_factory_.GetWeakPtr()); + + sync_handle_watcher_callback_count_++; + OnHandleReadyInternal(result); + // At this point, this object might have been deleted. + if (weak_self) + sync_handle_watcher_callback_count_--; +} + +void Connector::OnHandleReadyInternal(MojoResult result) { DCHECK(thread_checker_.CalledOnValidThread()); - CHECK(async_wait_id_ != 0); - async_wait_id_ = 0; if (result != MOJO_RESULT_OK) { HandleError(result != MOJO_RESULT_FAILED_PRECONDITION, false); return; @@ -218,6 +287,15 @@ void Connector::WaitToReadMore() { MOJO_DEADLINE_INDEFINITE, &Connector::CallOnHandleReady, this); + + if (register_sync_handle_watch_count_ > 0 && + !registered_with_sync_handle_watcher_) { + registered_with_sync_handle_watcher_ = + SyncHandleWatcher::current()->RegisterHandle( + message_pipe_.get(), MOJO_HANDLE_SIGNAL_READABLE, + base::Bind(&Connector::OnSyncHandleWatcherHandleReady, + base::Unretained(this))); + } } bool Connector::ReadSingleMessage(MojoResult* read_result) { @@ -227,9 +305,7 @@ bool Connector::ReadSingleMessage(MojoResult* read_result) { // Detect if |this| was destroyed during message dispatch. Allow for the // possibility of re-entering ReadMore() through message dispatch. - bool was_destroyed_during_dispatch = false; - bool* previous_destroyed_flag = destroyed_flag_; - destroyed_flag_ = &was_destroyed_during_dispatch; + base::WeakPtr<Connector> weak_self = weak_factory_.GetWeakPtr(); Message message; const MojoResult rv = ReadMessage(message_pipe_.get(), &message); @@ -247,13 +323,8 @@ bool Connector::ReadSingleMessage(MojoResult* read_result) { incoming_receiver_ && incoming_receiver_->Accept(&message); } - if (was_destroyed_during_dispatch) { - if (previous_destroyed_flag) - *previous_destroyed_flag = true; // Propagate flag. + if (!weak_self) return false; - } - - destroyed_flag_ = previous_destroyed_flag; if (rv == MOJO_RESULT_SHOULD_WAIT) return true; @@ -295,11 +366,18 @@ void Connector::ReadAllAvailableMessages() { } void Connector::CancelWait() { - if (!async_wait_id_) - return; + if (async_wait_id_) { + waiter_->CancelWait(async_wait_id_); + async_wait_id_ = 0; + } + + if (registered_with_sync_handle_watcher_) { + SyncHandleWatcher::current()->UnregisterHandle(message_pipe_.get()); + registered_with_sync_handle_watcher_ = false; + } - waiter_->CancelWait(async_wait_id_); - async_wait_id_ = 0; + if (should_stop_sync_handle_watch_) + should_stop_sync_handle_watch_->data = true; } void Connector::HandleError(bool force_pipe_reset, bool force_async_handler) { |