diff options
author | pliard@chromium.org <pliard@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98> | 2013-07-30 12:13:28 +0000 |
---|---|---|
committer | pliard@chromium.org <pliard@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98> | 2013-07-30 12:13:28 +0000 |
commit | 7f09ee436f706a22b254d3fee7eb20beb53fdad8 (patch) | |
tree | 5e56d40ab8f68524220d3f19c808e01d1e3fecd6 /tools/android | |
parent | 672346f283ee7946a5d1dea1b0a0152a5ceafeea (diff) | |
download | chromium_src-7f09ee436f706a22b254d3fee7eb20beb53fdad8.zip chromium_src-7f09ee436f706a22b254d3fee7eb20beb53fdad8.tar.gz chromium_src-7f09ee436f706a22b254d3fee7eb20beb53fdad8.tar.bz2 |
Remove Thread wrapper class in forwarder2.
This class unnecessarily reinvented the wheel in a non-RAII way that proved to
be harmful (see r210830).
Also, this class wasn't playing nicely with Chromium's base idioms. For
instance it was impossible to do task posting with it. This implied some
non-idiomatic threading code that was hard to read and reason about (e.g. using
locks, pthread_condition_t...) thus hard to maintain and debug. See
device_listener.cc for instance.
This CL also refactors the shutdown code to ensure that in general all the
objects are always destroyed on the thread they were created on. This wasn't
the case before.
BUG=242846
R=bulach@chromium.org, digit@google.com
Review URL: https://codereview.chromium.org/19478003
git-svn-id: svn://svn.chromium.org/chrome/trunk/src@214338 0039d316-1c4b-4281-b951-d872f2087c98
Diffstat (limited to 'tools/android')
-rw-r--r-- | tools/android/forwarder2/command.h | 6 | ||||
-rw-r--r-- | tools/android/forwarder2/device_controller.cc | 224 | ||||
-rw-r--r-- | tools/android/forwarder2/device_controller.h | 41 | ||||
-rw-r--r-- | tools/android/forwarder2/device_forwarder_main.cc | 41 | ||||
-rw-r--r-- | tools/android/forwarder2/device_listener.cc | 260 | ||||
-rw-r--r-- | tools/android/forwarder2/device_listener.h | 120 | ||||
-rw-r--r-- | tools/android/forwarder2/forwarder.cc | 137 | ||||
-rw-r--r-- | tools/android/forwarder2/forwarder.gyp | 2 | ||||
-rw-r--r-- | tools/android/forwarder2/forwarder.h | 28 | ||||
-rw-r--r-- | tools/android/forwarder2/host_controller.cc | 186 | ||||
-rw-r--r-- | tools/android/forwarder2/host_controller.h | 68 | ||||
-rw-r--r-- | tools/android/forwarder2/host_forwarder_main.cc | 69 | ||||
-rw-r--r-- | tools/android/forwarder2/thread.cc | 40 | ||||
-rw-r--r-- | tools/android/forwarder2/thread.h | 35 |
14 files changed, 637 insertions, 620 deletions
diff --git a/tools/android/forwarder2/command.h b/tools/android/forwarder2/command.h index dfbd7e0..8e222ef 100644 --- a/tools/android/forwarder2/command.h +++ b/tools/android/forwarder2/command.h @@ -26,9 +26,9 @@ enum Type { HOST_SERVER_SUCCESS, KILL_ALL_LISTENERS, LISTEN, - UNMAP_PORT, - UNMAP_PORT_ERROR, - UNMAP_PORT_SUCCESS, + UNLISTEN, + UNLISTEN_ERROR, + UNLISTEN_SUCCESS, }; } // namespace command diff --git a/tools/android/forwarder2/device_controller.cc b/tools/android/forwarder2/device_controller.cc index e9ce9cb..87d0e17 100644 --- a/tools/android/forwarder2/device_controller.cc +++ b/tools/android/forwarder2/device_controller.cc @@ -4,139 +4,151 @@ #include "tools/android/forwarder2/device_controller.h" -#include <errno.h> -#include <stdlib.h> +#include <utility> +#include "base/bind.h" +#include "base/bind_helpers.h" #include "base/logging.h" #include "base/memory/scoped_ptr.h" -#include "base/safe_strerror_posix.h" +#include "base/message_loop/message_loop_proxy.h" +#include "base/single_thread_task_runner.h" #include "tools/android/forwarder2/command.h" #include "tools/android/forwarder2/device_listener.h" #include "tools/android/forwarder2/socket.h" namespace forwarder2 { -DeviceController::DeviceController(int exit_notifier_fd) - : exit_notifier_fd_(exit_notifier_fd) { - kickstart_adb_socket_.AddEventFd(exit_notifier_fd); +// static +scoped_ptr<DeviceController> DeviceController::Create( + const std::string& adb_unix_socket, + int exit_notifier_fd) { + scoped_ptr<DeviceController> device_controller; + scoped_ptr<Socket> host_socket(new Socket()); + if (!host_socket->BindUnix(adb_unix_socket)) { + PLOG(ERROR) << "Could not BindAndListen DeviceController socket on port " + << adb_unix_socket << ": "; + return device_controller.Pass(); + } + LOG(INFO) << "Listening on Unix Domain Socket " << adb_unix_socket; + device_controller.reset( + new DeviceController(host_socket.Pass(), exit_notifier_fd)); + return device_controller.Pass(); } DeviceController::~DeviceController() { - KillAllListeners(); - CleanUpDeadListeners(); - CHECK_EQ(0, listeners_.size()); + DCHECK(construction_task_runner_->RunsTasksOnCurrentThread()); } -void DeviceController::CleanUpDeadListeners() { - // Clean up dead listeners. - for (ListenersMap::iterator it(&listeners_); !it.IsAtEnd(); it.Advance()) { - if (!it.GetCurrentValue()->is_alive()) - // Remove deletes the listener. - listeners_.Remove(it.GetCurrentKey()); - } +void DeviceController::Start() { + AcceptHostCommandSoon(); } -void DeviceController::KillAllListeners() { - for (ListenersMap::iterator it(&listeners_); !it.IsAtEnd(); it.Advance()) - it.GetCurrentValue()->ForceExit(); - for (ListenersMap::iterator it(&listeners_); !it.IsAtEnd(); it.Advance()) { - it.GetCurrentValue()->Join(); - CHECK(!it.GetCurrentValue()->is_alive()); - } +DeviceController::DeviceController(scoped_ptr<Socket> host_socket, + int exit_notifier_fd) + : host_socket_(host_socket.Pass()), + exit_notifier_fd_(exit_notifier_fd), + construction_task_runner_(base::MessageLoopProxy::current()), + weak_ptr_factory_(this) { + host_socket_->AddEventFd(exit_notifier_fd); } -bool DeviceController::Init(const std::string& adb_unix_socket) { - if (!kickstart_adb_socket_.BindUnix(adb_unix_socket)) { - LOG(ERROR) << "Could not BindAndListen DeviceController socket on port " - << adb_unix_socket << ": " << safe_strerror(errno); - return false; - } - LOG(INFO) << "Listening on Unix Domain Socket " << adb_unix_socket; - return true; +void DeviceController::AcceptHostCommandSoon() { + base::MessageLoopProxy::current()->PostTask( + FROM_HERE, + base::Bind(&DeviceController::AcceptHostCommandInternal, + base::Unretained(this))); } -void DeviceController::Start() { - while (true) { - CleanUpDeadListeners(); - scoped_ptr<Socket> socket(new Socket); - if (!kickstart_adb_socket_.Accept(socket.get())) { - if (!kickstart_adb_socket_.DidReceiveEvent()) { - LOG(ERROR) << "Could not Accept DeviceController socket: " - << safe_strerror(errno); - } else { - LOG(INFO) << "Received exit notification"; +void DeviceController::AcceptHostCommandInternal() { + scoped_ptr<Socket> socket(new Socket); + if (!host_socket_->Accept(socket.get())) { + if (!host_socket_->DidReceiveEvent()) + PLOG(ERROR) << "Could not Accept DeviceController socket"; + else + LOG(INFO) << "Received exit notification"; + return; + } + base::ScopedClosureRunner accept_next_client( + base::Bind(&DeviceController::AcceptHostCommandSoon, + base::Unretained(this))); + // So that |socket| doesn't block on read if it has notifications. + socket->AddEventFd(exit_notifier_fd_); + int port; + command::Type command; + if (!ReadCommand(socket.get(), &port, &command)) { + LOG(ERROR) << "Invalid command received."; + return; + } + const ListenersMap::iterator listener_it = listeners_.find(port); + DeviceListener* const listener = listener_it == listeners_.end() + ? static_cast<DeviceListener*>(NULL) : listener_it->second.get(); + switch (command) { + case command::LISTEN: { + if (listener != NULL) { + LOG(WARNING) << "Already forwarding port " << port + << ". Attempting to restart the listener.\n"; + // Note that this deletes the listener object. + listeners_.erase(listener_it); } + scoped_ptr<DeviceListener> new_listener( + DeviceListener::Create( + socket.Pass(), port, base::Bind(&DeviceController::DeleteListener, + weak_ptr_factory_.GetWeakPtr()))); + if (!new_listener) + return; + new_listener->Start(); + // |port| can be zero, to allow dynamically allocated port, so instead, we + // call DeviceListener::listener_port() to retrieve the currently + // allocated port to this new listener. + const int listener_port = new_listener->listener_port(); + listeners_.insert( + std::make_pair(listener_port, + linked_ptr<DeviceListener>(new_listener.release()))); + LOG(INFO) << "Forwarding device port " << listener_port << " to host."; break; } - // So that |socket| doesn't block on read if it has notifications. - socket->AddEventFd(exit_notifier_fd_); - int port; - command::Type command; - if (!ReadCommand(socket.get(), &port, &command)) { - LOG(ERROR) << "Invalid command received."; - continue; - } - DeviceListener* listener = listeners_.Lookup(port); - switch (command) { - case command::LISTEN: { - if (listener != NULL) { - LOG(WARNING) << "Already forwarding port " << port - << ". Attempting to restart the listener.\n"; - listener->ForceExit(); - listener->Join(); - CHECK(!listener->is_alive()); - // Remove deletes the listener object. - listeners_.Remove(port); - } - scoped_ptr<DeviceListener> new_listener( - new DeviceListener(socket.Pass(), port)); - if (!new_listener->BindListenerSocket()) - continue; - new_listener->Start(); - // |port| can be zero, to allow dynamically allocated port, so instead, - // we call DeviceListener::listener_port() to retrieve the currently - // allocated port to this new listener, which has been set by the - // BindListenerSocket() method in case of success. - const int listener_port = new_listener->listener_port(); - // |new_listener| is now owned by listeners_ map. - listeners_.AddWithID(new_listener.release(), listener_port); - LOG(INFO) << "Forwarding device port " << listener_port << " to host."; + case command::DATA_CONNECTION: + if (listener == NULL) { + LOG(ERROR) << "Data Connection command received, but " + << "listener has not been set up yet for port " << port; + // After this point it is assumed that, once we close our Adb Data + // socket, the Adb forwarder command will propagate the closing of + // sockets all the way to the host side. break; } - case command::DATA_CONNECTION: - if (listener == NULL) { - LOG(ERROR) << "Data Connection command received, but " - << "listener has not been set up yet for port " << port; - // After this point it is assumed that, once we close our Adb Data - // socket, the Adb forwarder command will propagate the closing of - // sockets all the way to the host side. - continue; - } else if (!listener->SetAdbDataSocket(socket.Pass())) { - LOG(ERROR) << "Could not set Adb Data Socket for port: " << port; - // Same assumption as above, but in this case the socket is closed - // inside SetAdbDataSocket. - continue; - } - break; - case command::UNMAP_PORT: - if (!listener) { - SendCommand(command::UNMAP_PORT_ERROR, port, socket.get()); - break; - } - listener->ForceExit(); - listener->Join(); - CHECK(!listener->is_alive()); - listeners_.Remove(port); - SendCommand(command::UNMAP_PORT_SUCCESS, port, socket.get()); + listener->SetAdbDataSocket(socket.Pass()); + break; + case command::UNLISTEN: + if (!listener) { + SendCommand(command::UNLISTEN_ERROR, port, socket.get()); break; - default: - // TODO(felipeg): add a KillAllListeners command. - LOG(ERROR) << "Invalid command received. Port: " << port - << " Command: " << command; - } + } + listeners_.erase(listener_it); + SendCommand(command::UNLISTEN_SUCCESS, port, socket.get()); + break; + default: + // TODO(felipeg): add a KillAllListeners command. + LOG(ERROR) << "Invalid command received. Port: " << port + << " Command: " << command; } - KillAllListeners(); - CleanUpDeadListeners(); +} + +// static +void DeviceController::DeleteListener( + const base::WeakPtr<DeviceController>& device_controller_ptr, + int listener_port) { + DeviceController* const controller = device_controller_ptr.get(); + if (!controller) + return; + DCHECK(controller->construction_task_runner_->RunsTasksOnCurrentThread()); + const ListenersMap::iterator listener_it = controller->listeners_.find( + listener_port); + if (listener_it == controller->listeners_.end()) + return; + const linked_ptr<DeviceListener> listener = listener_it->second; + // Note that the listener is removed from the map before it gets destroyed in + // case its destructor would access the map. + controller->listeners_.erase(listener_it); } } // namespace forwarder diff --git a/tools/android/forwarder2/device_controller.h b/tools/android/forwarder2/device_controller.h index 4c36698..3daedb3 100644 --- a/tools/android/forwarder2/device_controller.h +++ b/tools/android/forwarder2/device_controller.h @@ -5,39 +5,58 @@ #ifndef TOOLS_ANDROID_FORWARDER2_DEVICE_CONTROLLER_H_ #define TOOLS_ANDROID_FORWARDER2_DEVICE_CONTROLLER_H_ -#include <hash_map> #include <string> #include "base/basictypes.h" -#include "base/id_map.h" +#include "base/containers/hash_tables.h" #include "base/memory/linked_ptr.h" +#include "base/memory/ref_counted.h" +#include "base/memory/scoped_ptr.h" +#include "base/memory/weak_ptr.h" #include "tools/android/forwarder2/socket.h" +namespace base { +class SingleThreadTaskRunner; +} // namespace base + namespace forwarder2 { class DeviceListener; +// There is a single DeviceController per device_forwarder process, and it is in +// charge of managing all active redirections on the device side (one +// DeviceListener each). class DeviceController { public: - explicit DeviceController(int exit_notifier_fd); + static scoped_ptr<DeviceController> Create(const std::string& adb_unix_socket, + int exit_notifier_fd); ~DeviceController(); - bool Init(const std::string& adb_unix_socket); - void Start(); private: - void KillAllListeners(); - void CleanUpDeadListeners(); + typedef base::hash_map< + int /* port */, linked_ptr<DeviceListener> > ListenersMap; - // Map from Port to DeviceListener objects (owns the pointer). - typedef IDMap<DeviceListener, IDMapOwnPointer> ListenersMap; + DeviceController(scoped_ptr<Socket> host_socket, int exit_notifier_fd); - ListenersMap listeners_; - Socket kickstart_adb_socket_; + void AcceptHostCommandSoon(); + void AcceptHostCommandInternal(); + // Note that this can end up being called after the DeviceController is + // destroyed which is why a weak pointer is used. + static void DeleteListener( + const base::WeakPtr<DeviceController>& device_controller_ptr, + int listener_port); + + const scoped_ptr<Socket> host_socket_; // Used to notify the controller to exit. const int exit_notifier_fd_; + // Lets ensure DeviceListener instances are deleted on the thread they were + // created on. + const scoped_refptr<base::SingleThreadTaskRunner> construction_task_runner_; + base::WeakPtrFactory<DeviceController> weak_ptr_factory_; + ListenersMap listeners_; DISALLOW_COPY_AND_ASSIGN(DeviceController); }; diff --git a/tools/android/forwarder2/device_forwarder_main.cc b/tools/android/forwarder2/device_forwarder_main.cc index 5c94769..cad46f4 100644 --- a/tools/android/forwarder2/device_forwarder_main.cc +++ b/tools/android/forwarder2/device_forwarder_main.cc @@ -48,6 +48,25 @@ class ServerDelegate : public Daemon::ServerDelegate { public: ServerDelegate() : initialized_(false) {} + virtual ~ServerDelegate() { + if (!controller_thread_.get()) + return; + // The DeviceController instance, if any, is constructed on the controller + // thread. Make sure that it gets deleted on that same thread. Note that + // DeleteSoon() is not used here since it would imply reading |controller_| + // from the main thread while it's set on the internal thread. + controller_thread_->message_loop_proxy()->PostTask( + FROM_HERE, + base::Bind(&ServerDelegate::DeleteControllerOnInternalThread, + base::Unretained(this))); + } + + void DeleteControllerOnInternalThread() { + DCHECK( + controller_thread_->message_loop_proxy()->RunsTasksOnCurrentThread()); + controller_.reset(); + } + // Daemon::ServerDelegate: virtual void Init() OVERRIDE { DCHECK(!g_notifier); @@ -65,31 +84,30 @@ class ServerDelegate : public Daemon::ServerDelegate { } controller_thread_->message_loop()->PostTask( FROM_HERE, - base::Bind(&ServerDelegate::StartController, GetExitNotifierFD(), - base::Passed(&client_socket))); + base::Bind(&ServerDelegate::StartController, base::Unretained(this), + GetExitNotifierFD(), base::Passed(&client_socket))); initialized_ = true; } private: - static void StartController(int exit_notifier_fd, - scoped_ptr<Socket> client_socket) { - forwarder2::DeviceController controller(exit_notifier_fd); - if (!controller.Init(kUnixDomainSocketPath)) { + void StartController(int exit_notifier_fd, scoped_ptr<Socket> client_socket) { + DCHECK(!controller_.get()); + scoped_ptr<DeviceController> controller( + DeviceController::Create(kUnixDomainSocketPath, exit_notifier_fd)); + if (!controller.get()) { client_socket->WriteString( base::StringPrintf("ERROR: Could not initialize device controller " "with ADB socket path: %s", kUnixDomainSocketPath)); return; } + controller_.swap(controller); + controller_->Start(); client_socket->WriteString("OK"); client_socket->Close(); - // Note that the following call is blocking which explains why the device - // controller has to live on a separate thread (so that the daemon command - // server is not blocked). - controller.Start(); } - base::AtExitManager at_exit_manager_; // Used by base::Thread. + scoped_ptr<DeviceController> controller_; scoped_ptr<base::Thread> controller_thread_; bool initialized_; }; @@ -128,6 +146,7 @@ int RunDeviceForwarder(int argc, char** argv) { std::cerr << "Usage: device_forwarder [--kill-server]" << std::endl; return 1; } + base::AtExitManager at_exit_manager; // Used by base::Thread. ClientDelegate client_delegate; ServerDelegate daemon_delegate; const char kLogFilePath[] = ""; // Log to logcat. diff --git a/tools/android/forwarder2/device_listener.cc b/tools/android/forwarder2/device_listener.cc index 4b6265d..1819a8a 100644 --- a/tools/android/forwarder2/device_listener.cc +++ b/tools/android/forwarder2/device_listener.cc @@ -4,185 +4,145 @@ #include "tools/android/forwarder2/device_listener.h" -#include <errno.h> -#include <signal.h> -#include <stdio.h> -#include <stdlib.h> - -#include <string> - +#include "base/bind.h" +#include "base/bind_helpers.h" +#include "base/callback.h" #include "base/logging.h" #include "base/memory/scoped_ptr.h" +#include "base/message_loop/message_loop_proxy.h" +#include "base/single_thread_task_runner.h" #include "tools/android/forwarder2/command.h" #include "tools/android/forwarder2/forwarder.h" #include "tools/android/forwarder2/socket.h" -namespace { -// Timeout for the Listener to be waiting for a new Adb Data Connection. -const int kDeviceListenerTimeoutSeconds = 10; -} - namespace forwarder2 { -DeviceListener::DeviceListener(scoped_ptr<Socket> adb_control_socket, int port) - : adb_control_socket_(adb_control_socket.Pass()), - listener_port_(port), - is_alive_(false), - must_exit_(false) { - CHECK(adb_control_socket_.get()); - adb_control_socket_->AddEventFd(exit_notifier_.receiver_fd()); - listener_socket_.AddEventFd(exit_notifier_.receiver_fd()); - pthread_mutex_init(&adb_data_socket_mutex_, NULL); - pthread_cond_init(&adb_data_socket_cond_, NULL); +// static +scoped_ptr<DeviceListener> DeviceListener::Create( + scoped_ptr<Socket> host_socket, + int listener_port, + const DeleteCallback& delete_callback) { + scoped_ptr<Socket> listener_socket(new Socket()); + scoped_ptr<DeviceListener> device_listener; + if (!listener_socket->BindTcp("", listener_port)) { + LOG(ERROR) << "Device could not bind and listen to local port " + << listener_port; + SendCommand(command::BIND_ERROR, listener_port, host_socket.get()); + return device_listener.Pass(); + } + // In case the |listener_port_| was zero, GetPort() will return the + // currently (non-zero) allocated port for this socket. + listener_port = listener_socket->GetPort(); + SendCommand(command::BIND_SUCCESS, listener_port, host_socket.get()); + device_listener.reset( + new DeviceListener( + scoped_ptr<PipeNotifier>(new PipeNotifier()), listener_socket.Pass(), + host_socket.Pass(), listener_port, delete_callback)); + return device_listener.Pass(); } DeviceListener::~DeviceListener() { - CHECK(!is_alive()); - adb_control_socket_->Close(); - listener_socket_.Close(); + DCHECK(deletion_task_runner_->RunsTasksOnCurrentThread()); + exit_notifier_->Notify(); } -void DeviceListener::SetMustExitLocked() { - must_exit_ = true; - exit_notifier_.Notify(); +void DeviceListener::Start() { + thread_.Start(); + AcceptNextClientSoon(); } -void DeviceListener::ForceExit() { - // Set must_exit and wake up the threads waiting. - pthread_mutex_lock(&adb_data_socket_mutex_); - SetMustExitLocked(); - pthread_cond_broadcast(&adb_data_socket_cond_); - pthread_mutex_unlock(&adb_data_socket_mutex_); +void DeviceListener::SetAdbDataSocket(scoped_ptr<Socket> adb_data_socket) { + thread_.message_loop_proxy()->PostTask( + FROM_HERE, + base::Bind(&DeviceListener::OnAdbDataSocketReceivedOnInternalThread, + base::Unretained(this), base::Passed(&adb_data_socket))); } -bool DeviceListener::WaitForAdbDataSocket() { - timespec time_to_wait = {}; - time_to_wait.tv_sec = time(NULL) + kDeviceListenerTimeoutSeconds; - - pthread_mutex_lock(&adb_data_socket_mutex_); - int ret = 0; - while (!must_exit_ && adb_data_socket_.get() == NULL && ret == 0) { - ret = pthread_cond_timedwait(&adb_data_socket_cond_, - &adb_data_socket_mutex_, - &time_to_wait); - if (ret != 0) { - if (adb_data_socket_.get()) { - adb_data_socket_->Close(); - adb_data_socket_.reset(); - } - LOG(ERROR) << "Error while waiting for Adb Data Socket."; - SetMustExitLocked(); - } - } - pthread_mutex_unlock(&adb_data_socket_mutex_); - if (ret == ETIMEDOUT) { - LOG(ERROR) << "DeviceListener timeout while waiting for " - << "the Adb Data Socket for port: " << listener_port_; - } - // Check both return value and also if set_must_exit() was called. - return ret == 0 && !must_exit_ && adb_data_socket_.get(); +DeviceListener::DeviceListener(scoped_ptr<PipeNotifier> pipe_notifier, + scoped_ptr<Socket> listener_socket, + scoped_ptr<Socket> host_socket, + int port, + const DeleteCallback& delete_callback) + : exit_notifier_(pipe_notifier.Pass()), + listener_socket_(listener_socket.Pass()), + host_socket_(host_socket.Pass()), + listener_port_(port), + delete_callback_(delete_callback), + deletion_task_runner_(base::MessageLoopProxy::current()), + thread_("DeviceListener") { + CHECK(host_socket_.get()); + DCHECK(deletion_task_runner_.get()); + DCHECK(exit_notifier_.get()); + host_socket_->AddEventFd(exit_notifier_->receiver_fd()); + listener_socket_->AddEventFd(exit_notifier_->receiver_fd()); } -bool DeviceListener::SetAdbDataSocket(scoped_ptr<Socket> adb_data_socket) { - CHECK(adb_data_socket.get()); - pthread_mutex_lock(&adb_data_socket_mutex_); - adb_data_socket_.swap(adb_data_socket); - int ret = pthread_cond_broadcast(&adb_data_socket_cond_); - if (ret != 0) - SetMustExitLocked(); - - // We must check |must_exit_| while still in the lock, since the ownership of - // the adb_data_socket_ must be transactionally transferred to the other - // thread. - if (must_exit_ && adb_data_socket_.get()) { - adb_data_socket_->Close(); - adb_data_socket_.reset(); - } - pthread_mutex_unlock(&adb_data_socket_mutex_); - return ret == 0; +void DeviceListener::AcceptNextClientSoon() { + thread_.message_loop_proxy()->PostTask( + FROM_HERE, + base::Bind(&DeviceListener::AcceptClientOnInternalThread, + base::Unretained(this))); } -void DeviceListener::RunInternal() { - while (!must_exit_) { - scoped_ptr<Socket> device_data_socket(new Socket); - if (!listener_socket_.Accept(device_data_socket.get())) { - if (listener_socket_.DidReceiveEvent()) { - LOG(INFO) << "Received exit notification, stopped accepting clients."; - break; - } - LOG(WARNING) << "Could not Accept in ListenerSocket."; - SendCommand(command::ACCEPT_ERROR, - listener_port_, - adb_control_socket_.get()); - break; - } - SendCommand(command::ACCEPT_SUCCESS, - listener_port_, - adb_control_socket_.get()); - if (!ReceivedCommand(command::HOST_SERVER_SUCCESS, - adb_control_socket_.get())) { - SendCommand(command::ACK, listener_port_, adb_control_socket_.get()); - LOG(ERROR) << "Host could not connect to server."; - device_data_socket->Close(); - if (adb_control_socket_->has_error()) { - LOG(ERROR) << "Adb Control connection lost. " - << "Listener port: " << listener_port_; - break; - } - // It can continue if the host forwarder could not connect to the host - // server but the control connection is still alive (no errors). The - // device acknowledged that (above), and it can re-try later. - continue; +void DeviceListener::AcceptClientOnInternalThread() { + device_data_socket_.reset(new Socket()); + if (!listener_socket_->Accept(device_data_socket_.get())) { + if (listener_socket_->DidReceiveEvent()) { + LOG(INFO) << "Received exit notification, stopped accepting clients."; + SelfDelete(); + return; } - if (!WaitForAdbDataSocket()) { - LOG(ERROR) << "Device could not receive an Adb Data connection."; - SendCommand(command::ADB_DATA_SOCKET_ERROR, - listener_port_, - adb_control_socket_.get()); - device_data_socket->Close(); - continue; + LOG(WARNING) << "Could not Accept in ListenerSocket."; + SendCommand(command::ACCEPT_ERROR, listener_port_, host_socket_.get()); + SelfDelete(); + return; + } + SendCommand(command::ACCEPT_SUCCESS, listener_port_, host_socket_.get()); + if (!ReceivedCommand(command::HOST_SERVER_SUCCESS, + host_socket_.get())) { + SendCommand(command::ACK, listener_port_, host_socket_.get()); + LOG(ERROR) << "Host could not connect to server."; + device_data_socket_->Close(); + if (host_socket_->has_error()) { + LOG(ERROR) << "Adb Control connection lost. " + << "Listener port: " << listener_port_; + SelfDelete(); + return; } - SendCommand(command::ADB_DATA_SOCKET_SUCCESS, - listener_port_, - adb_control_socket_.get()); - CHECK(adb_data_socket_.get()); - // Forwarder object will self delete after returning from the Run() call. - Forwarder* forwarder = new Forwarder(device_data_socket.Pass(), - adb_data_socket_.Pass()); - forwarder->Start(); + // It can continue if the host forwarder could not connect to the host + // server but the control connection is still alive (no errors). The device + // acknowledged that (above), and it can re-try later. + AcceptNextClientSoon(); + return; } } -bool DeviceListener::BindListenerSocket() { - bool success = listener_socket_.BindTcp("", listener_port_); - if (success) { - // In case the |listener_port_| was zero, GetPort() will return the - // currently (non-zero) allocated port for this socket. - listener_port_ = listener_socket_.GetPort(); - SendCommand(command::BIND_SUCCESS, - listener_port_, - adb_control_socket_.get()); - is_alive_ = true; - } else { - LOG(ERROR) << "Device could not bind and listen to local port " - << listener_port_; - SendCommand(command::BIND_ERROR, - listener_port_, - adb_control_socket_.get()); - adb_control_socket_->Close(); +void DeviceListener::OnAdbDataSocketReceivedOnInternalThread( + scoped_ptr<Socket> adb_data_socket) { + adb_data_socket_.swap(adb_data_socket); + SendCommand(command::ADB_DATA_SOCKET_SUCCESS, listener_port_, + host_socket_.get()); + CHECK(adb_data_socket_.get()); + StartForwarder(device_data_socket_.Pass(), adb_data_socket_.Pass()); + AcceptNextClientSoon(); +} + +void DeviceListener::SelfDelete() { + if (!deletion_task_runner_->RunsTasksOnCurrentThread()) { + deletion_task_runner_->PostTask( + FROM_HERE, + base::Bind(&DeviceListener::SelfDeleteOnDeletionTaskRunner, + delete_callback_, listener_port_)); + return; } - return success; + SelfDeleteOnDeletionTaskRunner(delete_callback_, listener_port_); } -void DeviceListener::Run() { - if (is_alive_) - RunInternal(); - adb_control_socket_->Close(); - listener_socket_.Close(); - // This must be the last statement of the Run() method of the DeviceListener - // class, since the main thread checks if this thread is dead and deletes - // the object. It will be a race condition otherwise. - is_alive_ = false; +// static +void DeviceListener::SelfDeleteOnDeletionTaskRunner( + const DeleteCallback& delete_callback, + int listener_port) { + delete_callback.Run(listener_port); } } // namespace forwarder diff --git a/tools/android/forwarder2/device_listener.h b/tools/android/forwarder2/device_listener.h index 44d37aa..2a69823 100644 --- a/tools/android/forwarder2/device_listener.h +++ b/tools/android/forwarder2/device_listener.h @@ -5,76 +5,106 @@ #ifndef TOOLS_ANDROID_FORWARDER2_DEVICE_LISTENER_H_ #define TOOLS_ANDROID_FORWARDER2_DEVICE_LISTENER_H_ -#include <pthread.h> - #include "base/basictypes.h" +#include "base/callback.h" #include "base/compiler_specific.h" #include "base/logging.h" +#include "base/memory/ref_counted.h" #include "base/memory/scoped_ptr.h" +#include "base/threading/thread.h" #include "tools/android/forwarder2/pipe_notifier.h" #include "tools/android/forwarder2/socket.h" -#include "tools/android/forwarder2/thread.h" + +namespace base { +class SingleThreadTaskRunner; +} // namespace base namespace forwarder2 { class Forwarder; -class DeviceListener : public Thread { +// A DeviceListener instance is used in the device_forwarder program to bind to +// a specific device-side |port| and wait for client connections. When a +// connection happens, it informs the corresponding HostController instance +// running on the host, through |host_socket|. Then the class expects a call to +// its SetAdbDataSocket() method (performed by the device controller) once the +// host opened a new connection to the device. When this happens, a new internal +// Forwarder instance is started. +// Note that instances of this class are owned by the device controller which +// creates and destroys them on the same thread. In case an internal error +// happens on the DeviceListener's internal thread, the DeviceListener +// can also self-delete by executing the user-provided callback on the thread +// the DeviceListener was created on. +// Note that the DeviceListener's destructor joins its internal thread (i.e. +// waits for its completion) which means that the internal thread is guaranteed +// not to be running anymore once the object is deleted. +class DeviceListener { public: - DeviceListener(scoped_ptr<Socket> adb_control_socket, int port); - virtual ~DeviceListener(); + // Callback that is used for self-deletion as a way to let the device + // controller perform some additional cleanup work (e.g. removing the device + // listener instance from its internal map before deleting it). + typedef base::Callback<void (int /* listener port */)> DeleteCallback; - bool WaitForAdbDataSocket(); + static scoped_ptr<DeviceListener> Create( + scoped_ptr<Socket> host_socket, + int port, + const DeleteCallback& delete_callback); - bool SetAdbDataSocket(scoped_ptr<Socket> adb_data_socket); + ~DeviceListener(); - bool BindListenerSocket(); + void Start(); - // |is_alive_| is set only on BindAndListenSocket and written once when Run() - // terminates. So even in case of a race condition, the worst that could - // happen is for the main thread to see the listener alive when it isn't. And - // also, this is not a problem since the main thread checks the liveliness of - // the listeners in a loop. - bool is_alive() const { return is_alive_; } - void ForceExit(); + void SetAdbDataSocket(scoped_ptr<Socket> adb_data_socket); int listener_port() const { return listener_port_; } - protected: - // Thread: - virtual void Run() OVERRIDE; - private: - void RunInternal(); - - // Must be called after successfully acquired mutex. - void SetMustExitLocked(); - - // The listener socket for sending control commands. - scoped_ptr<Socket> adb_control_socket_; - + DeviceListener(scoped_ptr<PipeNotifier> pipe_notifier, + scoped_ptr<Socket> listener_socket, + scoped_ptr<Socket> host_socket, + int port, + const DeleteCallback& delete_callback); + + // Pushes an AcceptClientOnInternalThread() task to the internal thread's + // message queue in order to wait for a new client soon. + void AcceptNextClientSoon(); + + void AcceptClientOnInternalThread(); + + void OnAdbDataSocketReceivedOnInternalThread( + scoped_ptr<Socket> adb_data_socket); + + void SelfDelete(); + + // Note that this can be called after the DeviceListener instance gets deleted + // which is why this method is static. + static void SelfDeleteOnDeletionTaskRunner( + const DeleteCallback& delete_callback, + int listener_port); + + // Used for the listener thread to be notified on destruction. We have one + // notifier per Listener thread since each Listener thread may be requested to + // exit for different reasons independently from each other and independent + // from the main program, ex. when the host requests to forward/listen the + // same port again. Both the |host_socket_| and |listener_socket_| + // must share the same receiver file descriptor from |exit_notifier_| and it + // is set in the constructor. + const scoped_ptr<PipeNotifier> exit_notifier_; // The local device listener socket for accepting connections from the local // port (listener_port_). - Socket listener_socket_; - + const scoped_ptr<Socket> listener_socket_; + // The listener socket for sending control commands. + const scoped_ptr<Socket> host_socket_; + scoped_ptr<Socket> device_data_socket_; // This is the adb connection to transport the actual data, used for creating // the forwarder. Ownership transferred to the Forwarder. scoped_ptr<Socket> adb_data_socket_; - - int listener_port_; - pthread_mutex_t adb_data_socket_mutex_; - pthread_cond_t adb_data_socket_cond_; - bool is_alive_; - bool must_exit_; - - // Used for the listener thread to be notified from ForceExit() which is - // called from the main thread. We have one notifier per Listener thread since - // each Listener thread may be requested to exit for different reasons - // independently from each other and independent from the main program, - // ex. when the host requests to forward/listen the same port again. Both the - // |adb_control_socket_| and |listener_socket_| must share the same receiver - // file descriptor from |exit_notifier_| and it is set in the constructor. - PipeNotifier exit_notifier_; + const int listener_port_; + const DeleteCallback delete_callback_; + // Task runner used for deletion set at construction time (i.e. the object is + // deleted on the same thread it is created on). + scoped_refptr<base::SingleThreadTaskRunner> deletion_task_runner_; + base::Thread thread_; DISALLOW_COPY_AND_ASSIGN(DeviceListener); }; diff --git a/tools/android/forwarder2/forwarder.cc b/tools/android/forwarder2/forwarder.cc index 5068572..df4c29c 100644 --- a/tools/android/forwarder2/forwarder.cc +++ b/tools/android/forwarder2/forwarder.cc @@ -4,18 +4,15 @@ #include "tools/android/forwarder2/forwarder.h" -#include <errno.h> -#include <stdio.h> -#include <stdlib.h> -#include <string.h> - +#include "base/basictypes.h" +#include "base/bind.h" #include "base/logging.h" +#include "base/memory/ref_counted.h" #include "base/posix/eintr_wrapper.h" -#include "base/safe_strerror_posix.h" +#include "base/single_thread_task_runner.h" #include "tools/android/forwarder2/socket.h" namespace forwarder2 { - namespace { // Helper class to buffer reads and writes from one socket to another. @@ -87,62 +84,88 @@ class BufferedCopier { DISALLOW_COPY_AND_ASSIGN(BufferedCopier); }; -} // namespace - -Forwarder::Forwarder(scoped_ptr<Socket> socket1, scoped_ptr<Socket> socket2) - : socket1_(socket1.Pass()), - socket2_(socket2.Pass()) { - DCHECK(socket1_.get()); - DCHECK(socket2_.get()); -} - -Forwarder::~Forwarder() { - Detach(); -} - -void Forwarder::Run() { - const int nfds = Socket::GetHighestFileDescriptor(*socket1_, *socket2_) + 1; - fd_set read_fds; - fd_set write_fds; - - // Copy from socket1 to socket2 - BufferedCopier buffer1(socket1_.get(), socket2_.get()); +// Internal class that wraps a helper thread to forward traffic between +// |socket1| and |socket2|. After creating a new instance, call its Start() +// method to launch operations. Thread stops automatically if one of the socket +// disconnects, but ensures that all buffered writes to the other, still alive, +// socket, are written first. When this happens, the instance will delete itself +// automatically. +// Note that the instance will always be destroyed on the same thread that +// created it. +class Forwarder { + public: + Forwarder(scoped_ptr<Socket> socket1, scoped_ptr<Socket> socket2) + : socket1_(socket1.Pass()), + socket2_(socket2.Pass()), + destructor_runner_(base::MessageLoopProxy::current()), + thread_("ForwarderThread") { + } - // Copy from socket2 to socket1 - BufferedCopier buffer2(socket2_.get(), socket1_.get()); + void Start() { + thread_.Start(); + thread_.message_loop_proxy()->PostTask( + FROM_HERE, + base::Bind(&Forwarder::ThreadHandler, base::Unretained(this))); + } - bool run = true; - while (run) { - FD_ZERO(&read_fds); - FD_ZERO(&write_fds); + private: + void ThreadHandler() { + const int nfds = Socket::GetHighestFileDescriptor(*socket1_, *socket2_) + 1; + fd_set read_fds; + fd_set write_fds; + + // Copy from socket1 to socket2 + BufferedCopier buffer1(socket1_.get(), socket2_.get()); + // Copy from socket2 to socket1 + BufferedCopier buffer2(socket2_.get(), socket1_.get()); + + bool run = true; + while (run) { + FD_ZERO(&read_fds); + FD_ZERO(&write_fds); + + buffer1.AddToReadSet(&read_fds); + buffer2.AddToReadSet(&read_fds); + buffer1.AddToWriteSet(&write_fds); + buffer2.AddToWriteSet(&write_fds); + + if (HANDLE_EINTR(select(nfds, &read_fds, &write_fds, NULL, NULL)) <= 0) { + PLOG(ERROR) << "select"; + break; + } + // When a socket in the read set closes the connection, select() returns + // with that socket descriptor set as "ready to read". When we call + // TryRead() below, it will return false, but the while loop will continue + // to run until all the write operations are finished, to make sure the + // buffers are completely flushed out. + + // Keep running while we have some operation to do. + run = buffer1.TryRead(read_fds); + run = run || buffer2.TryRead(read_fds); + run = run || buffer1.TryWrite(write_fds); + run = run || buffer2.TryWrite(write_fds); + } - buffer1.AddToReadSet(&read_fds); - buffer2.AddToReadSet(&read_fds); - buffer1.AddToWriteSet(&write_fds); - buffer2.AddToWriteSet(&write_fds); + // Note that the thread that |destruction_runner_| runs tasks on could be + // temporarily blocked on I/O (e.g. select()) therefore it is safer to close + // the sockets now rather than relying on the destructor. + socket1_.reset(); + socket2_.reset(); - if (HANDLE_EINTR(select(nfds, &read_fds, &write_fds, NULL, NULL)) <= 0) { - LOG(ERROR) << "Select error: " << safe_strerror(errno); - break; - } - // When a socket in the read set closes the connection, select() returns - // with that socket descriptor set as "ready to read". When we call - // TryRead() below, it will return false, but the while loop will continue - // to run until all the write operations are finished, to make sure the - // buffers are completely flushed out. - - // Keep running while we have some operation to do. - run = buffer1.TryRead(read_fds); - run = run || buffer2.TryRead(read_fds); - run = run || buffer1.TryWrite(write_fds); - run = run || buffer2.TryWrite(write_fds); + // Note that base::Thread must be destroyed on the thread it was created on. + destructor_runner_->DeleteSoon(FROM_HERE, this); } - delete this; -} + scoped_ptr<Socket> socket1_; + scoped_ptr<Socket> socket2_; + scoped_refptr<base::SingleThreadTaskRunner> destructor_runner_; + base::Thread thread_; +}; + +} // namespace -void Forwarder::Join() { - NOTREACHED() << "Can't Join a Forwarder thread."; +void StartForwarder(scoped_ptr<Socket> socket1, scoped_ptr<Socket> socket2) { + (new Forwarder(socket1.Pass(), socket2.Pass()))->Start(); } -} // namespace forwarder +} // namespace forwarder2 diff --git a/tools/android/forwarder2/forwarder.gyp b/tools/android/forwarder2/forwarder.gyp index f12174a..fdc19aa4 100644 --- a/tools/android/forwarder2/forwarder.gyp +++ b/tools/android/forwarder2/forwarder.gyp @@ -53,7 +53,6 @@ 'forwarder.cc', 'pipe_notifier.cc', 'socket.cc', - 'thread.cc', ], }, { @@ -76,7 +75,6 @@ 'host_forwarder_main.cc', 'pipe_notifier.cc', 'socket.cc', - 'thread.cc', # TODO(pliard): Remove this. This is needed to avoid undefined # references at link time. '../../../base/message_loop/message_pump_glib.cc', diff --git a/tools/android/forwarder2/forwarder.h b/tools/android/forwarder2/forwarder.h index 1ba09232..651b5e8 100644 --- a/tools/android/forwarder2/forwarder.h +++ b/tools/android/forwarder2/forwarder.h @@ -5,35 +5,15 @@ #ifndef TOOLS_ANDROID_FORWARDER2_FORWARDER_H_ #define TOOLS_ANDROID_FORWARDER2_FORWARDER_H_ -#include "base/compiler_specific.h" -#include "base/logging.h" #include "base/memory/scoped_ptr.h" -#include "tools/android/forwarder2/socket.h" -#include "tools/android/forwarder2/thread.h" +#include "base/threading/thread.h" namespace forwarder2 { -class Forwarder : public Thread { - public: - Forwarder(scoped_ptr<Socket> socket1, scoped_ptr<Socket> socket2); - virtual ~Forwarder(); +class Socket; - // This object self deletes after running, so one cannot join. - // Thread: - virtual void Join() OVERRIDE; +void StartForwarder(scoped_ptr<Socket> socket1, scoped_ptr<Socket> socket2); - protected: - // Thread: - // This object self deletes after running. - virtual void Run() OVERRIDE; - - private: - scoped_ptr<Socket> socket1_; - scoped_ptr<Socket> socket2_; - - DISALLOW_COPY_AND_ASSIGN(Forwarder); -}; - -} // namespace forwarder +} // namespace forwarder2 #endif // TOOLS_ANDROID_FORWARDER2_FORWARDER_H_ diff --git a/tools/android/forwarder2/host_controller.cc b/tools/android/forwarder2/host_controller.cc index d98a4ad2..1588e72 100644 --- a/tools/android/forwarder2/host_controller.cc +++ b/tools/android/forwarder2/host_controller.cc @@ -16,95 +16,109 @@ namespace forwarder2 { -HostController::HostController(int device_port, - const std::string& forward_to_host, - int forward_to_host_port, - int adb_port, - int exit_notifier_fd, - const DeleteCallback& delete_callback) - : device_port_(device_port), - forward_to_host_(forward_to_host), - forward_to_host_port_(forward_to_host_port), - adb_port_(adb_port), - global_exit_notifier_fd_(exit_notifier_fd), - delete_callback_(delete_callback), - ready_(false), - thread_("HostControllerThread") { - adb_control_socket_.AddEventFd(global_exit_notifier_fd_); - adb_control_socket_.AddEventFd(delete_controller_notifier_.receiver_fd()); -} - -HostController::~HostController() { - delete_controller_notifier_.Notify(); - // Note that the Forwarder instance (that also received a delete notification) - // might still be running on its own thread at this point. This is not a - // problem since it will self-delete once the socket that it is operating on - // is closed. -} - -bool HostController::Connect() { - if (!adb_control_socket_.ConnectTcp("", adb_port_)) { +// static +scoped_ptr<HostController> HostController::Create( + int device_port, + int host_port, + int adb_port, + int exit_notifier_fd, + const DeletionCallback& deletion_callback) { + scoped_ptr<HostController> host_controller; + scoped_ptr<PipeNotifier> delete_controller_notifier(new PipeNotifier()); + scoped_ptr<Socket> adb_control_socket(new Socket()); + adb_control_socket->AddEventFd(exit_notifier_fd); + adb_control_socket->AddEventFd(delete_controller_notifier->receiver_fd()); + if (!adb_control_socket->ConnectTcp(std::string(), adb_port)) { LOG(ERROR) << "Could not connect HostController socket on port: " - << adb_port_; - SelfDelete(); - return false; + << adb_port; + return host_controller.Pass(); } // Send the command to the device start listening to the "device_forward_port" bool send_command_success = SendCommand( - command::LISTEN, device_port_, &adb_control_socket_); + command::LISTEN, device_port, adb_control_socket.get()); CHECK(send_command_success); int device_port_allocated; command::Type command; - if (!ReadCommand(&adb_control_socket_, &device_port_allocated, &command) || + if (!ReadCommand( + adb_control_socket.get(), &device_port_allocated, &command) || command != command::BIND_SUCCESS) { - LOG(ERROR) << "Device binding error using port " << device_port_; - SelfDelete(); - return false; + LOG(ERROR) << "Device binding error using port " << device_port; + return host_controller.Pass(); } - // When doing dynamically allocation of port, we get the port from the - // BIND_SUCCESS command we received above. - device_port_ = device_port_allocated; - ready_ = true; - return true; + host_controller.reset( + new HostController( + device_port_allocated, host_port, adb_port, exit_notifier_fd, + deletion_callback, adb_control_socket.Pass(), + delete_controller_notifier.Pass())); + return host_controller.Pass(); +} + +HostController::~HostController() { + DCHECK(deletion_task_runner_->RunsTasksOnCurrentThread()); + delete_controller_notifier_->Notify(); + // Note that the Forwarder instance (that also received a delete notification) + // might still be running on its own thread at this point. This is not a + // problem since it will self-delete once the socket that it is operating on + // is closed. } void HostController::Start() { thread_.Start(); + ReadNextCommandSoon(); +} + +HostController::HostController( + int device_port, + int host_port, + int adb_port, + int exit_notifier_fd, + const DeletionCallback& deletion_callback, + scoped_ptr<Socket> adb_control_socket, + scoped_ptr<PipeNotifier> delete_controller_notifier) + : device_port_(device_port), + host_port_(host_port), + adb_port_(adb_port), + global_exit_notifier_fd_(exit_notifier_fd), + deletion_callback_(deletion_callback), + adb_control_socket_(adb_control_socket.Pass()), + delete_controller_notifier_(delete_controller_notifier.Pass()), + deletion_task_runner_(base::MessageLoopProxy::current()), + thread_("HostControllerThread") { +} + +void HostController::ReadNextCommandSoon() { thread_.message_loop_proxy()->PostTask( FROM_HERE, - base::Bind(&HostController::ThreadHandler, base::Unretained(this))); + base::Bind(&HostController::ReadCommandOnInternalThread, + base::Unretained(this))); } -void HostController::ThreadHandler() { - CHECK(ready_) << "HostController not ready. Must call Connect() first."; - while (true) { - if (!ReceivedCommand(command::ACCEPT_SUCCESS, &adb_control_socket_)) { - SelfDelete(); - return; - } - // Try to connect to host server. - scoped_ptr<Socket> host_server_data_socket(CreateSocket()); - if (!host_server_data_socket->ConnectTcp( - forward_to_host_, forward_to_host_port_)) { - LOG(ERROR) << "Could not Connect HostServerData socket on port: " - << forward_to_host_port_; - SendCommand(command::HOST_SERVER_ERROR, - device_port_, - &adb_control_socket_); - if (ReceivedCommand(command::ACK, &adb_control_socket_)) { - // It can continue if the host forwarder could not connect to the host - // server but the device acknowledged that, so that the device could - // re-try later. - continue; - } - SelfDelete(); +void HostController::ReadCommandOnInternalThread() { + if (!ReceivedCommand(command::ACCEPT_SUCCESS, adb_control_socket_.get())) { + SelfDelete(); + return; + } + // Try to connect to host server. + scoped_ptr<Socket> host_server_data_socket(CreateSocket()); + if (!host_server_data_socket->ConnectTcp(std::string(), host_port_)) { + LOG(ERROR) << "Could not Connect HostServerData socket on port: " + << host_port_; + SendCommand( + command::HOST_SERVER_ERROR, device_port_, adb_control_socket_.get()); + if (ReceivedCommand(command::ACK, adb_control_socket_.get())) { + // It can continue if the host forwarder could not connect to the host + // server but the device acknowledged that, so that the device could + // re-try later. + ReadNextCommandSoon(); return; } - SendCommand(command::HOST_SERVER_SUCCESS, - device_port_, - &adb_control_socket_); - StartForwarder(host_server_data_socket.Pass()); + SelfDelete(); + return; } + SendCommand( + command::HOST_SERVER_SUCCESS, device_port_, adb_control_socket_.get()); + StartForwarder(host_server_data_socket.Pass()); + ReadNextCommandSoon(); } void HostController::StartForwarder( @@ -117,37 +131,34 @@ void HostController::StartForwarder( } // Open the Adb data connection, and send a command with the // |device_forward_port| as a way for the device to identify the connection. - SendCommand(command::DATA_CONNECTION, - device_port_, - adb_data_socket.get()); + SendCommand(command::DATA_CONNECTION, device_port_, adb_data_socket.get()); // Check that the device received the new Adb Data Connection. Note that this // check is done through the |adb_control_socket_| that is handled in the // DeviceListener thread just after the call to WaitForAdbDataSocket(). if (!ReceivedCommand(command::ADB_DATA_SOCKET_SUCCESS, - &adb_control_socket_)) { + adb_control_socket_.get())) { LOG(ERROR) << "Device could not handle the new Adb Data Connection."; SelfDelete(); return; } - Forwarder* forwarder = new Forwarder(host_server_data_socket.Pass(), - adb_data_socket.Pass()); - // Forwarder object will self delete after returning. - forwarder->Start(); + forwarder2::StartForwarder( + host_server_data_socket.Pass(), adb_data_socket.Pass()); } scoped_ptr<Socket> HostController::CreateSocket() { scoped_ptr<Socket> socket(new Socket()); socket->AddEventFd(global_exit_notifier_fd_); - socket->AddEventFd(delete_controller_notifier_.receiver_fd()); + socket->AddEventFd(delete_controller_notifier_->receiver_fd()); return socket.Pass(); } void HostController::SelfDelete() { - base::ScopedClosureRunner delete_runner( - base::Bind( - &DeleteCallback::Run, base::Unretained(&delete_callback_), - base::Unretained(this))); + scoped_ptr<HostController> self_deleter(this); + deletion_task_runner_->PostTask( + FROM_HERE, + base::Bind(&HostController::SelfDeleteOnDeletionTaskRunner, + deletion_callback_, base::Passed(&self_deleter))); // Tell the device to delete its corresponding controller instance before we // self-delete. Socket socket; @@ -155,14 +166,21 @@ void HostController::SelfDelete() { LOG(ERROR) << "Could not connect to device on port " << adb_port_; return; } - if (!SendCommand(command::UNMAP_PORT, device_port_, &socket)) { + if (!SendCommand(command::UNLISTEN, device_port_, &socket)) { LOG(ERROR) << "Could not send unmap command for port " << device_port_; return; } - if (!ReceivedCommand(command::UNMAP_PORT_SUCCESS, &socket)) { + if (!ReceivedCommand(command::UNLISTEN_SUCCESS, &socket)) { LOG(ERROR) << "Unamp command failed for port " << device_port_; return; } } +// static +void HostController::SelfDeleteOnDeletionTaskRunner( + const DeletionCallback& deletion_callback, + scoped_ptr<HostController> controller) { + deletion_callback.Run(controller.Pass()); +} + } // namespace forwarder2 diff --git a/tools/android/forwarder2/host_controller.h b/tools/android/forwarder2/host_controller.h index dc75f88..aaedb94 100644 --- a/tools/android/forwarder2/host_controller.h +++ b/tools/android/forwarder2/host_controller.h @@ -17,42 +17,53 @@ namespace forwarder2 { -class HostProxy; - +// This class partners with DeviceController and has the same lifetime and +// threading characteristics as DeviceListener. In a nutshell, this class +// operates on its own thread and is destroyed on the thread it was constructed +// on. The class' deletion can happen in two different ways: +// - Its destructor was called by its owner (HostControllersManager). +// - Its internal thread requested self-deletion after an error happened. In +// this case the owner (HostControllersManager) is notified on the +// construction thread through the provided DeletionCallback invoked with the +// HostController instance. When this callback is invoked, it's up to the +// owner to delete the instance. class HostController { public: // Callback used for self-deletion that lets the client perform some cleanup // work before deleting the HostController instance. - typedef base::Callback<void (HostController*)> DeleteCallback; + typedef base::Callback<void (scoped_ptr<HostController>)> DeletionCallback; - // If |device_port| is zero, it dynamically allocates a port. - HostController(int device_port, - const std::string& forward_to_host, - int forward_to_host_port, - int adb_port, - int exit_notifier_fd, - const DeleteCallback& delete_callback); + // If |device_port| is zero then a dynamic port is allocated (and retrievable + // through device_port() below). + static scoped_ptr<HostController> Create( + int device_port, + int host_port, + int adb_port, + int exit_notifier_fd, + const DeletionCallback& deletion_callback); ~HostController(); - // Connects to the device forwarder app and sets the |device_port_| to the - // dynamically allocated port (when the provided |device_port| is zero). - // Returns true on success. Clients must call Connect() before calling - // Start(). - bool Connect(); - // Starts the internal controller thread. void Start(); int adb_port() const { return adb_port_; } - // Gets the current device allocated port. Must be called after Connect(). int device_port() const { return device_port_; } private: - void ThreadHandler(); + HostController(int device_port, + int host_port, + int adb_port, + int exit_notifier_fd, + const DeletionCallback& deletion_callback, + scoped_ptr<Socket> adb_control_socket, + scoped_ptr<PipeNotifier> delete_controller_notifier); - void StartForwarder(scoped_ptr<Socket> host_server_data_socket_ptr); + void ReadNextCommandSoon(); + void ReadCommandOnInternalThread(); + + void StartForwarder(scoped_ptr<Socket> host_server_data_socket); // Helper method that creates a socket and adds the appropriate event file // descriptors. @@ -60,19 +71,24 @@ class HostController { void SelfDelete(); - Socket adb_control_socket_; - int device_port_; - const std::string forward_to_host_; - const int forward_to_host_port_; + static void SelfDeleteOnDeletionTaskRunner( + const DeletionCallback& deletion_callback, + scoped_ptr<HostController> controller); + + const int device_port_; + const int host_port_; const int adb_port_; // Used to notify the controller when the process is killed. const int global_exit_notifier_fd_; // Used to let the client delete the instance in case an error happened. - const DeleteCallback delete_callback_; + const DeletionCallback deletion_callback_; + scoped_ptr<Socket> adb_control_socket_; + scoped_ptr<PipeNotifier> delete_controller_notifier_; // Used to cancel the pending blocking IO operations when the host controller // instance is deleted. - PipeNotifier delete_controller_notifier_; - bool ready_; + // Task runner used for deletion set at construction time (i.e. the object is + // deleted on the same thread it is created on). + const scoped_refptr<base::SingleThreadTaskRunner> deletion_task_runner_; base::Thread thread_; DISALLOW_COPY_AND_ASSIGN(HostController); diff --git a/tools/android/forwarder2/host_forwarder_main.cc b/tools/android/forwarder2/host_forwarder_main.cc index e38d8cb..1071aa7 100644 --- a/tools/android/forwarder2/host_forwarder_main.cc +++ b/tools/android/forwarder2/host_forwarder_main.cc @@ -16,6 +16,7 @@ #include <vector> #include "base/at_exit.h" +#include "base/basictypes.h" #include "base/bind.h" #include "base/command_line.h" #include "base/compiler_specific.h" @@ -25,6 +26,7 @@ #include "base/logging.h" #include "base/memory/linked_ptr.h" #include "base/memory/scoped_vector.h" +#include "base/memory/weak_ptr.h" #include "base/pickle.h" #include "base/posix/eintr_wrapper.h" #include "base/safe_strerror_posix.h" @@ -80,9 +82,24 @@ void KillHandler(int signal_number) { exit(1); } +// Manages HostController instances. There is one HostController instance for +// each connection being forwarded. Note that forwarding can happen with many +// devices (identified with a serial id). class HostControllersManager { public: - HostControllersManager() : has_failed_(false) {} + HostControllersManager() + : weak_ptr_factory_(this), + controllers_(new HostControllerMap()), + has_failed_(false) { + } + + ~HostControllersManager() { + if (!thread_.get()) + return; + // Delete the controllers on the thread they were created on. + thread_->message_loop_proxy()->DeleteSoon( + FROM_HERE, controllers_.release()); + } void HandleRequest(const std::string& device_serial, int device_port, @@ -117,25 +134,24 @@ class HostControllersManager { } // Invoked when a HostController instance reports an error (e.g. due to a - // device connectivity issue). - void DeleteHostController(HostController* host_controller) { - if (!thread_->message_loop_proxy()->RunsTasksOnCurrentThread()) { - // This can be invoked from the host controller internal thread. - thread_->message_loop_proxy()->PostTask( - FROM_HERE, - base::Bind( - &HostControllersManager::DeleteHostControllerOnInternalThread, - base::Unretained(this), host_controller)); + // device connectivity issue). Note that this could be called after the + // controller manager was destroyed which is why a weak pointer is used. + static void DeleteHostController( + const base::WeakPtr<HostControllersManager>& manager_ptr, + scoped_ptr<HostController> host_controller) { + HostController* const controller = host_controller.release(); + HostControllersManager* const manager = manager_ptr.get(); + if (!manager) { + // Note that |controller| is not leaked in this case since the host + // controllers manager owns the controllers. If the manager was deleted + // then all the controllers (including |controller|) were also deleted. return; } - DeleteHostControllerOnInternalThread(host_controller); - } - - void DeleteHostControllerOnInternalThread(HostController* host_controller) { - // Note that this will delete |host_controller| which is owned by the map. - controllers_.erase( - MakeHostControllerMapKey(host_controller->adb_port(), - host_controller->device_port())); + DCHECK(manager->thread_->message_loop_proxy()->RunsTasksOnCurrentThread()); + // Note that this will delete |controller| which is owned by the map. + manager->controllers_->erase( + MakeHostControllerMapKey(controller->adb_port(), + controller->device_port())); } void HandleRequestOnInternalThread(const std::string& device_serial, @@ -154,7 +170,7 @@ class HostControllersManager { // Remove the previously created host controller. const std::string controller_key = MakeHostControllerMapKey( adb_port, -device_port); - const HostControllerMap::size_type removed_elements = controllers_.erase( + const HostControllerMap::size_type removed_elements = controllers_->erase( controller_key); SendMessage( !removed_elements ? "ERROR: could not unmap port" : "OK", @@ -169,7 +185,7 @@ class HostControllersManager { if (!use_dynamic_port_allocation) { const std::string controller_key = MakeHostControllerMapKey( adb_port, device_port); - if (controllers_.find(controller_key) != controllers_.end()) { + if (controllers_->find(controller_key) != controllers_->end()) { LOG(INFO) << "Already forwarding device port " << device_port << " to host port " << host_port; SendMessage(base::StringPrintf("%d:%d", device_port, host_port), @@ -179,11 +195,11 @@ class HostControllersManager { } // Create a new host controller. scoped_ptr<HostController> host_controller( - new HostController( - device_port, "127.0.0.1", host_port, adb_port, GetExitNotifierFD(), + HostController::Create( + device_port, host_port, adb_port, GetExitNotifierFD(), base::Bind(&HostControllersManager::DeleteHostController, - base::Unretained(this)))); - if (!host_controller->Connect()) { + weak_ptr_factory_.GetWeakPtr()))); + if (!host_controller.get()) { has_failed_ = true; SendMessage("ERROR: Connection to device failed.", client_socket.get()); return; @@ -196,7 +212,7 @@ class HostControllersManager { if (!SendMessage(msg, client_socket.get())) return; host_controller->Start(); - controllers_.insert( + controllers_->insert( std::make_pair(MakeHostControllerMapKey(adb_port, device_port), linked_ptr<HostController>(host_controller.release()))); } @@ -232,8 +248,9 @@ class HostControllersManager { return result; } + base::WeakPtrFactory<HostControllersManager> weak_ptr_factory_; base::hash_map<std::string, int> device_serial_to_adb_port_map_; - HostControllerMap controllers_; + scoped_ptr<HostControllerMap> controllers_; bool has_failed_; scoped_ptr<base::AtExitManager> at_exit_manager_; // Needed by base::Thread. scoped_ptr<base::Thread> thread_; diff --git a/tools/android/forwarder2/thread.cc b/tools/android/forwarder2/thread.cc deleted file mode 100644 index 45b7558..0000000 --- a/tools/android/forwarder2/thread.cc +++ /dev/null @@ -1,40 +0,0 @@ -// Copyright (c) 2012 The Chromium Authors. All rights reserved. -// Use of this source code is governed by a BSD-style license that can be -// found in the LICENSE file. - -#include "tools/android/forwarder2/thread.h" - -#include "base/logging.h" - -namespace forwarder2 { - -Thread::Thread() : thread_(static_cast<pthread_t>(-1)) {} - -Thread::~Thread() {} - -void Thread::Start() { - int ret = pthread_create(&thread_, NULL, &ThreadCallback, this); - CHECK_EQ(0, ret); -} - -void Thread::Detach() { - CHECK_NE(static_cast<pthread_t>(-1), thread_); - int ret = pthread_detach(thread_); - CHECK_EQ(0, ret); -} - -void Thread::Join() { - CHECK_NE(static_cast<pthread_t>(-1), thread_); - int ret = pthread_join(thread_, NULL); - CHECK_EQ(0, ret); -} - -// static -void* Thread::ThreadCallback(void* arg) { - CHECK(arg); - Thread* obj = reinterpret_cast<Thread*>(arg); - obj->Run(); - return NULL; -} - -} // namespace forwarder diff --git a/tools/android/forwarder2/thread.h b/tools/android/forwarder2/thread.h deleted file mode 100644 index 72a8f52..0000000 --- a/tools/android/forwarder2/thread.h +++ /dev/null @@ -1,35 +0,0 @@ -// Copyright (c) 2012 The Chromium Authors. All rights reserved. -// Use of this source code is governed by a BSD-style license that can be -// found in the LICENSE file. - -#ifndef TOOLS_ANDROID_FORWARDER2_THREAD_H_ -#define TOOLS_ANDROID_FORWARDER2_THREAD_H_ - -#include <pthread.h> - -namespace forwarder2 { - -// Helper abstract class to create a new thread and curry the -// call to the Run() method in an object from the new thread. -class Thread { - public: - Thread(); - virtual ~Thread(); - - // Start thread calling Run(). - void Start(); - void Detach(); - virtual void Join(); - - protected: - virtual void Run() = 0; - - private: - static void* ThreadCallback(void* arg); - - pthread_t thread_; -}; - -} // namespace forwarder - -#endif // TOOLS_ANDROID_FORWARDER2_THREAD_H_ |