// Copyright 2013 The Chromium Authors. All rights reserved. // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. #include "mojo/message_pump/handle_watcher.h" #include #include "base/atomic_sequence_num.h" #include "base/bind.h" #include "base/lazy_instance.h" #include "base/logging.h" #include "base/macros.h" #include "base/memory/singleton.h" #include "base/memory/weak_ptr.h" #include "base/message_loop/message_loop.h" #include "base/profiler/scoped_tracker.h" #include "base/single_thread_task_runner.h" #include "base/synchronization/lock.h" #include "base/thread_task_runner_handle.h" #include "base/threading/thread.h" #include "base/time/time.h" #include "mojo/message_pump/message_pump_mojo.h" #include "mojo/message_pump/message_pump_mojo_handler.h" #include "mojo/message_pump/time_helper.h" namespace mojo { namespace common { typedef int WatcherID; namespace { const char kWatcherThreadName[] = "handle-watcher-thread"; base::TimeTicks MojoDeadlineToTimeTicks(MojoDeadline deadline) { return deadline == MOJO_DEADLINE_INDEFINITE ? base::TimeTicks() : internal::NowTicks() + base::TimeDelta::FromMicroseconds(deadline); } // Tracks the data for a single call to Start(). struct WatchData { WatchData() : id(0), handle_signals(MOJO_HANDLE_SIGNAL_NONE), task_runner(NULL) {} WatcherID id; Handle handle; MojoHandleSignals handle_signals; base::TimeTicks deadline; base::Callback callback; scoped_refptr task_runner; }; // WatcherBackend -------------------------------------------------------------- // WatcherBackend is responsible for managing the requests and interacting with // MessagePumpMojo. All access (outside of creation/destruction) is done on the // thread WatcherThreadManager creates. class WatcherBackend : public MessagePumpMojoHandler { public: WatcherBackend(); ~WatcherBackend() override; void StartWatching(const WatchData& data); void StopWatching(WatcherID watcher_id); private: typedef std::map HandleToWatchDataMap; // Invoked when a handle needs to be removed and notified. void RemoveAndNotify(const Handle& handle, MojoResult result); // Searches through |handle_to_data_| for |watcher_id|. Returns true if found // and sets |handle| to the Handle. Returns false if not a known id. bool GetMojoHandleByWatcherID(WatcherID watcher_id, Handle* handle) const; // MessagePumpMojoHandler overrides: void OnHandleReady(const Handle& handle) override; void OnHandleError(const Handle& handle, MojoResult result) override; // Maps from assigned id to WatchData. HandleToWatchDataMap handle_to_data_; DISALLOW_COPY_AND_ASSIGN(WatcherBackend); }; WatcherBackend::WatcherBackend() { } WatcherBackend::~WatcherBackend() { } void WatcherBackend::StartWatching(const WatchData& data) { RemoveAndNotify(data.handle, MOJO_RESULT_CANCELLED); DCHECK_EQ(0u, handle_to_data_.count(data.handle)); handle_to_data_[data.handle] = data; MessagePumpMojo::current()->AddHandler(this, data.handle, data.handle_signals, data.deadline); } void WatcherBackend::StopWatching(WatcherID watcher_id) { // Because of the thread hop it is entirely possible to get here and not // have a valid handle registered for |watcher_id|. Handle handle; if (!GetMojoHandleByWatcherID(watcher_id, &handle)) return; handle_to_data_.erase(handle); MessagePumpMojo::current()->RemoveHandler(handle); } void WatcherBackend::RemoveAndNotify(const Handle& handle, MojoResult result) { if (handle_to_data_.count(handle) == 0) return; const WatchData data(handle_to_data_[handle]); handle_to_data_.erase(handle); MessagePumpMojo::current()->RemoveHandler(handle); data.task_runner->PostTask(FROM_HERE, base::Bind(data.callback, result)); } bool WatcherBackend::GetMojoHandleByWatcherID(WatcherID watcher_id, Handle* handle) const { for (HandleToWatchDataMap::const_iterator i = handle_to_data_.begin(); i != handle_to_data_.end(); ++i) { if (i->second.id == watcher_id) { *handle = i->second.handle; return true; } } return false; } void WatcherBackend::OnHandleReady(const Handle& handle) { RemoveAndNotify(handle, MOJO_RESULT_OK); } void WatcherBackend::OnHandleError(const Handle& handle, MojoResult result) { RemoveAndNotify(handle, result); } // WatcherThreadManager -------------------------------------------------------- // WatcherThreadManager manages the background thread that listens for handles // to be ready. All requests are handled by WatcherBackend. class WatcherThreadManager { public: ~WatcherThreadManager(); // Returns the shared instance. static WatcherThreadManager* GetInstance(); // Starts watching the requested handle. Returns a unique ID that is used to // stop watching the handle. When the handle is ready |callback| is notified // on the thread StartWatching() was invoked on. // This may be invoked on any thread. WatcherID StartWatching(const Handle& handle, MojoHandleSignals handle_signals, base::TimeTicks deadline, const base::Callback& callback); // Stops watching a handle. // This may be invoked on any thread. void StopWatching(WatcherID watcher_id); private: enum RequestType { REQUEST_START, REQUEST_STOP, }; // See description of |requests_| for details. struct RequestData { RequestData() : type(REQUEST_START), stop_id(0) {} RequestType type; WatchData start_data; WatcherID stop_id; }; typedef std::vector Requests; friend struct base::DefaultSingletonTraits; WatcherThreadManager(); // Schedules a request on the background thread. See |requests_| for details. void AddRequest(const RequestData& data); // Processes requests added to |requests_|. This is invoked on the backend // thread. void ProcessRequestsOnBackendThread(); base::Thread thread_; base::AtomicSequenceNumber watcher_id_generator_; WatcherBackend backend_; // Protects |requests_|. base::Lock lock_; // Start/Stop result in adding a RequestData to |requests_| (protected by // |lock_|). When the background thread wakes up it processes the requests. Requests requests_; DISALLOW_COPY_AND_ASSIGN(WatcherThreadManager); }; WatcherThreadManager::~WatcherThreadManager() { thread_.Stop(); } WatcherThreadManager* WatcherThreadManager::GetInstance() { return base::Singleton::get(); } WatcherID WatcherThreadManager::StartWatching( const Handle& handle, MojoHandleSignals handle_signals, base::TimeTicks deadline, const base::Callback& callback) { RequestData request_data; request_data.type = REQUEST_START; request_data.start_data.id = watcher_id_generator_.GetNext(); request_data.start_data.handle = handle; request_data.start_data.callback = callback; request_data.start_data.handle_signals = handle_signals; request_data.start_data.deadline = deadline; request_data.start_data.task_runner = base::ThreadTaskRunnerHandle::Get(); AddRequest(request_data); return request_data.start_data.id; } void WatcherThreadManager::StopWatching(WatcherID watcher_id) { // Handle the case of StartWatching() followed by StopWatching() before // |thread_| woke up. { base::AutoLock auto_lock(lock_); for (Requests::iterator i = requests_.begin(); i != requests_.end(); ++i) { if (i->type == REQUEST_START && i->start_data.id == watcher_id) { // Watcher ids are not reused, so if we find it we can stop. requests_.erase(i); return; } } } // TODO(amistry): Remove ScopedTracker below once http://crbug.com/554761 is // fixed. tracked_objects::ScopedTracker tracking_profile( FROM_HERE_WITH_EXPLICIT_FUNCTION( "554761 WatcherThreadManager::StopWatching")); RequestData request_data; request_data.type = REQUEST_STOP; request_data.stop_id = watcher_id; AddRequest(request_data); } void WatcherThreadManager::AddRequest(const RequestData& data) { { base::AutoLock auto_lock(lock_); const bool was_empty = requests_.empty(); requests_.push_back(data); if (!was_empty) return; } // We outlive |thread_|, so it's safe to use Unretained() here. thread_.task_runner()->PostTask( FROM_HERE, base::Bind(&WatcherThreadManager::ProcessRequestsOnBackendThread, base::Unretained(this))); } void WatcherThreadManager::ProcessRequestsOnBackendThread() { DCHECK_EQ(thread_.message_loop(), base::MessageLoop::current()); Requests requests; { base::AutoLock auto_lock(lock_); requests_.swap(requests); } for (size_t i = 0; i < requests.size(); ++i) { if (requests[i].type == REQUEST_START) { backend_.StartWatching(requests[i].start_data); } else { backend_.StopWatching(requests[i].stop_id); } } } WatcherThreadManager::WatcherThreadManager() : thread_(kWatcherThreadName) { base::Thread::Options thread_options; thread_options.message_pump_factory = base::Bind(&MessagePumpMojo::Create); thread_.StartWithOptions(thread_options); } } // namespace // HandleWatcher::StateBase and subclasses ------------------------------------- // The base class of HandleWatcher's state. Owns the user's callback and // monitors the current thread's MessageLoop to know when to force the callback // to run (with an error) even though the pipe hasn't been signaled yet. class HandleWatcher::StateBase : public base::MessageLoop::DestructionObserver { public: StateBase(HandleWatcher* watcher, const base::Callback& callback) : watcher_(watcher), callback_(callback), got_ready_(false) { base::MessageLoop::current()->AddDestructionObserver(this); } ~StateBase() override { base::MessageLoop::current()->RemoveDestructionObserver(this); } protected: void NotifyHandleReady(MojoResult result) { got_ready_ = true; NotifyAndDestroy(result); } bool got_ready() const { return got_ready_; } private: void WillDestroyCurrentMessageLoop() override { // The current thread is exiting. Simulate a watch error. NotifyAndDestroy(MOJO_RESULT_ABORTED); } void NotifyAndDestroy(MojoResult result) { base::Callback callback = callback_; watcher_->Stop(); // Destroys |this|. callback.Run(result); } HandleWatcher* watcher_; base::Callback callback_; // Have we been notified that the handle is ready? bool got_ready_; DISALLOW_COPY_AND_ASSIGN(StateBase); }; // If the thread on which HandleWatcher is used runs MessagePumpMojo, // SameThreadWatchingState is used to directly watch the handle on the same // thread. class HandleWatcher::SameThreadWatchingState : public StateBase, public MessagePumpMojoHandler { public: SameThreadWatchingState(HandleWatcher* watcher, const Handle& handle, MojoHandleSignals handle_signals, MojoDeadline deadline, const base::Callback& callback) : StateBase(watcher, callback), handle_(handle) { DCHECK(MessagePumpMojo::IsCurrent()); MessagePumpMojo::current()->AddHandler( this, handle, handle_signals, MojoDeadlineToTimeTicks(deadline)); } ~SameThreadWatchingState() override { if (!got_ready()) MessagePumpMojo::current()->RemoveHandler(handle_); } private: // MessagePumpMojoHandler overrides: void OnHandleReady(const Handle& handle) override { StopWatchingAndNotifyReady(handle, MOJO_RESULT_OK); } void OnHandleError(const Handle& handle, MojoResult result) override { StopWatchingAndNotifyReady(handle, result); } void StopWatchingAndNotifyReady(const Handle& handle, MojoResult result) { DCHECK_EQ(handle.value(), handle_.value()); MessagePumpMojo::current()->RemoveHandler(handle_); NotifyHandleReady(result); } Handle handle_; DISALLOW_COPY_AND_ASSIGN(SameThreadWatchingState); }; // If the thread on which HandleWatcher is used runs a message pump different // from MessagePumpMojo, SecondaryThreadWatchingState is used to watch the // handle on the handle watcher thread. class HandleWatcher::SecondaryThreadWatchingState : public StateBase { public: SecondaryThreadWatchingState(HandleWatcher* watcher, const Handle& handle, MojoHandleSignals handle_signals, MojoDeadline deadline, const base::Callback& callback) : StateBase(watcher, callback), weak_factory_(this) { watcher_id_ = WatcherThreadManager::GetInstance()->StartWatching( handle, handle_signals, MojoDeadlineToTimeTicks(deadline), base::Bind(&SecondaryThreadWatchingState::NotifyHandleReady, weak_factory_.GetWeakPtr())); } ~SecondaryThreadWatchingState() override { // If we've been notified the handle is ready (|got_ready()| is true) then // the watch has been implicitly removed by // WatcherThreadManager/MessagePumpMojo and we don't have to call // StopWatching(). To do so would needlessly entail posting a task and // blocking until the background thread services it. if (!got_ready()) WatcherThreadManager::GetInstance()->StopWatching(watcher_id_); } private: WatcherID watcher_id_; // Used to weakly bind |this| to the WatcherThreadManager. base::WeakPtrFactory weak_factory_; DISALLOW_COPY_AND_ASSIGN(SecondaryThreadWatchingState); }; // HandleWatcher --------------------------------------------------------------- HandleWatcher::HandleWatcher() { } HandleWatcher::~HandleWatcher() { } void HandleWatcher::Start(const Handle& handle, MojoHandleSignals handle_signals, MojoDeadline deadline, const base::Callback& callback) { DCHECK(handle.is_valid()); DCHECK_NE(MOJO_HANDLE_SIGNAL_NONE, handle_signals); // Need to clear the state before creating a new one. state_.reset(); if (MessagePumpMojo::IsCurrent()) { state_.reset(new SameThreadWatchingState( this, handle, handle_signals, deadline, callback)); } else { state_.reset(new SecondaryThreadWatchingState( this, handle, handle_signals, deadline, callback)); } } void HandleWatcher::Stop() { state_.reset(); } } // namespace common } // namespace mojo