summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorpliard@chromium.org <pliard@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98>2014-01-29 16:15:15 +0000
committerpliard@chromium.org <pliard@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98>2014-01-29 16:15:15 +0000
commitf1ba82468de0925f41e6f8d6f2bf0bf0347efdd8 (patch)
tree7466eca6fb829f5295183420d7c7df9f21bc3cfb
parent656eb9cb4ec1f046cf4a091e80d424872b7b5195 (diff)
downloadchromium_src-f1ba82468de0925f41e6f8d6f2bf0bf0347efdd8.zip
chromium_src-f1ba82468de0925f41e6f8d6f2bf0bf0347efdd8.tar.gz
chromium_src-f1ba82468de0925f41e6f8d6f2bf0bf0347efdd8.tar.bz2
forwarder2: Make the Forwarder instances operate on a single thread.
The Forwarder class used to operate on its own internal thread. This was leading to e.g. 256 threads being spawned since Chrome keeps a lot of sockets around in its socket pool. This CL makes the ForwarderManager have an internal thread performing a single select() on all the socket file descriptors and it notifies all the Forwarder instances when some events occur. The fact that ForwarderManager has an internal thread that joins on destruction also makes the interactions between ForwarderManager and its clients much simpler. In particular it allows us to remove its ref-counted thread-safe delegate. This CL should make the device_forwarder daemon more lightweight and make the results in page_cyclers hopefully more stable. The internal buffer used to forward the traffic is also shrunk to 32 KBytes since 128 KBytes was way too big (read() rarely returns more than 16 KBytes). BUG=332403 Review URL: https://codereview.chromium.org/148113003 git-svn-id: svn://svn.chromium.org/chrome/trunk/src@247694 0039d316-1c4b-4281-b951-d872f2087c98
-rw-r--r--tools/android/forwarder2/device_listener.h2
-rw-r--r--tools/android/forwarder2/forwarder.cc110
-rw-r--r--tools/android/forwarder2/forwarder.h49
-rw-r--r--tools/android/forwarder2/forwarders_manager.cc145
-rw-r--r--tools/android/forwarder2/forwarders_manager.h57
-rw-r--r--tools/android/forwarder2/host_controller.h8
-rw-r--r--tools/android/forwarder2/pipe_notifier.cc13
-rw-r--r--tools/android/forwarder2/pipe_notifier.h2
8 files changed, 175 insertions, 211 deletions
diff --git a/tools/android/forwarder2/device_listener.h b/tools/android/forwarder2/device_listener.h
index 1daf3f6..c7724f4 100644
--- a/tools/android/forwarder2/device_listener.h
+++ b/tools/android/forwarder2/device_listener.h
@@ -95,8 +95,6 @@ class DeviceListener {
// Task runner used for deletion set at construction time (i.e. the object is
// deleted on the same thread it is created on).
scoped_refptr<base::SingleThreadTaskRunner> deletion_task_runner_;
- // See host_controller.h for explanations about lifetime and threading
- // interactions between this thread and the ForwardersManager below.
base::Thread thread_;
ForwardersManager forwarders_manager_;
diff --git a/tools/android/forwarder2/forwarder.cc b/tools/android/forwarder2/forwarder.cc
index d747dbe..1e0bcd0 100644
--- a/tools/android/forwarder2/forwarder.cc
+++ b/tools/android/forwarder2/forwarder.cc
@@ -5,19 +5,18 @@
#include "tools/android/forwarder2/forwarder.h"
#include "base/basictypes.h"
-#include "base/bind.h"
#include "base/logging.h"
-#include "base/memory/ref_counted.h"
-#include "base/message_loop/message_loop_proxy.h"
#include "base/posix/eintr_wrapper.h"
-#include "base/single_thread_task_runner.h"
-#include "base/threading/thread.h"
-#include "tools/android/forwarder2/pipe_notifier.h"
#include "tools/android/forwarder2/socket.h"
namespace forwarder2 {
namespace {
+const int kBufferSize = 32 * 1024;
+
+} // namespace
+
+
// Helper class to buffer reads and writes from one socket to another.
// Each implements a small buffer connected two one input socket, and
// one output socket.
@@ -37,7 +36,7 @@ namespace {
// disconnects. To work around this, its peer will call its Close() method
// when that happens.
-class BufferedCopier {
+class Forwarder::BufferedCopier {
public:
// Possible states:
// READING - Empty buffer and Waiting for input.
@@ -83,6 +82,8 @@ class BufferedCopier {
peer_ = peer;
}
+ bool is_closed() const { return state_ == STATE_CLOSED; }
+
// Gently asks to close a buffer. Called either by the peer or the forwarder.
void Close() {
switch (state_) {
@@ -205,8 +206,6 @@ class BufferedCopier {
Socket* socket_from_;
Socket* socket_to_;
- // A big buffer to let the file-over-http bridge work more like real file.
- static const int kBufferSize = 1024 * 128;
int bytes_read_;
int write_offset_;
BufferedCopier* peer_;
@@ -216,82 +215,41 @@ class BufferedCopier {
DISALLOW_COPY_AND_ASSIGN(BufferedCopier);
};
-} // namespace
-
Forwarder::Forwarder(scoped_ptr<Socket> socket1,
- scoped_ptr<Socket> socket2,
- PipeNotifier* deletion_notifier,
- const ErrorCallback& error_callback)
- : self_deleter_helper_(this, error_callback),
- deletion_notifier_(deletion_notifier),
- socket1_(socket1.Pass()),
+ scoped_ptr<Socket> socket2)
+ : socket1_(socket1.Pass()),
socket2_(socket2.Pass()),
- thread_("ForwarderThread") {
- DCHECK(deletion_notifier_);
+ buffer1_(new BufferedCopier(socket1_.get(), socket2_.get())),
+ buffer2_(new BufferedCopier(socket2_.get(), socket1_.get())) {
+ buffer1_->SetPeer(buffer2_.get());
+ buffer2_->SetPeer(buffer1_.get());
}
-Forwarder::~Forwarder() {}
-
-void Forwarder::Start() {
- thread_.Start();
- thread_.message_loop_proxy()->PostTask(
- FROM_HERE,
- base::Bind(&Forwarder::ThreadHandler, base::Unretained(this)));
+Forwarder::~Forwarder() {
+ DCHECK(thread_checker_.CalledOnValidThread());
}
-void Forwarder::ThreadHandler() {
- fd_set read_fds;
- fd_set write_fds;
-
- // Copy from socket1 to socket2
- BufferedCopier buffer1(socket1_.get(), socket2_.get());
- // Copy from socket2 to socket1
- BufferedCopier buffer2(socket2_.get(), socket1_.get());
-
- buffer1.SetPeer(&buffer2);
- buffer2.SetPeer(&buffer1);
-
- for (;;) {
- FD_ZERO(&read_fds);
- FD_ZERO(&write_fds);
-
- int max_fd = -1;
- buffer1.PrepareSelect(&read_fds, &write_fds, &max_fd);
- buffer2.PrepareSelect(&read_fds, &write_fds, &max_fd);
-
- if (max_fd < 0) {
- // Both buffers are closed. Exit immediately.
- break;
- }
-
- const int deletion_fd = deletion_notifier_->receiver_fd();
- if (deletion_fd >= 0) {
- FD_SET(deletion_fd, &read_fds);
- max_fd = std::max(max_fd, deletion_fd);
- }
-
- if (HANDLE_EINTR(select(max_fd + 1, &read_fds, &write_fds, NULL, NULL)) <=
- 0) {
- PLOG(ERROR) << "select";
- break;
- }
-
- buffer1.ProcessSelect(read_fds, write_fds);
- buffer2.ProcessSelect(read_fds, write_fds);
+void Forwarder::RegisterFDs(fd_set* read_fds, fd_set* write_fds, int* max_fd) {
+ DCHECK(thread_checker_.CalledOnValidThread());
+ buffer1_->PrepareSelect(read_fds, write_fds, max_fd);
+ buffer2_->PrepareSelect(read_fds, write_fds, max_fd);
+}
- if (deletion_fd >= 0 && FD_ISSET(deletion_fd, &read_fds)) {
- buffer1.Close();
- buffer2.Close();
- }
- }
+void Forwarder::ProcessEvents(const fd_set& read_fds, const fd_set& write_fds) {
+ DCHECK(thread_checker_.CalledOnValidThread());
+ buffer1_->ProcessSelect(read_fds, write_fds);
+ buffer2_->ProcessSelect(read_fds, write_fds);
+}
- // Note that the thread that the destructor will run on could be temporarily
- // blocked on I/O (e.g. select()) therefore it is safer to close the sockets
- // now rather than relying on the destructor.
- socket1_.reset();
- socket2_.reset();
+bool Forwarder::IsClosed() const {
+ DCHECK(thread_checker_.CalledOnValidThread());
+ return buffer1_->is_closed() && buffer2_->is_closed();
+}
- self_deleter_helper_.MaybeSelfDeleteSoon();
+void Forwarder::Shutdown() {
+ DCHECK(thread_checker_.CalledOnValidThread());
+ buffer1_->Close();
+ buffer2_->Close();
}
} // namespace forwarder2
diff --git a/tools/android/forwarder2/forwarder.h b/tools/android/forwarder2/forwarder.h
index 5ad1b1a..0be86fcc 100644
--- a/tools/android/forwarder2/forwarder.h
+++ b/tools/android/forwarder2/forwarder.h
@@ -5,48 +5,41 @@
#ifndef TOOLS_ANDROID_FORWARDER2_FORWARDER_H_
#define TOOLS_ANDROID_FORWARDER2_FORWARDER_H_
-#include "base/callback.h"
+#include <sys/select.h>
+
#include "base/memory/scoped_ptr.h"
-#include "base/threading/thread.h"
-#include "tools/android/forwarder2/self_deleter_helper.h"
+#include "base/threading/thread_checker.h"
namespace forwarder2 {
-class PipeNotifier;
class Socket;
-// Internal class that wraps a helper thread to forward traffic between
-// |socket1| and |socket2|. After creating a new instance, call its Start()
-// method to launch operations. Thread stops automatically if one of the socket
-// disconnects, but ensures that all buffered writes to the other, still alive,
-// socket, are written first. When this happens, the instance will delete itself
-// automatically.
-// Note that the instance will always be destroyed on the same thread that
-// created it.
+// Internal class that forwards traffic between |socket1| and |socket2|. Note
+// that this class is not thread-safe.
class Forwarder {
public:
- // Callback used on error invoked by the Forwarder to self-delete.
- typedef base::Callback<void (scoped_ptr<Forwarder>)> ErrorCallback;
-
- Forwarder(scoped_ptr<Socket> socket1,
- scoped_ptr<Socket> socket2,
- PipeNotifier* deletion_notifier,
- const ErrorCallback& error_callback);
+ Forwarder(scoped_ptr<Socket> socket1, scoped_ptr<Socket> socket2);
~Forwarder();
- void Start();
+ void RegisterFDs(fd_set* read_fds, fd_set* write_fds, int* max_fd);
- private:
- void ThreadHandler();
+ void ProcessEvents(const fd_set& read_fds, const fd_set& write_fds);
- void OnThreadHandlerCompletion(const ErrorCallback& error_callback);
+ bool IsClosed() const;
- SelfDeleterHelper<Forwarder> self_deleter_helper_;
- PipeNotifier* const deletion_notifier_;
- scoped_ptr<Socket> socket1_;
- scoped_ptr<Socket> socket2_;
- base::Thread thread_;
+ void Shutdown();
+
+ private:
+ class BufferedCopier;
+
+ base::ThreadChecker thread_checker_;
+ const scoped_ptr<Socket> socket1_;
+ const scoped_ptr<Socket> socket2_;
+ // Copies data from socket1 to socket2.
+ const scoped_ptr<BufferedCopier> buffer1_;
+ // Copies data from socket2 to socket1.
+ const scoped_ptr<BufferedCopier> buffer2_;
};
} // namespace forwarder2
diff --git a/tools/android/forwarder2/forwarders_manager.cc b/tools/android/forwarder2/forwarders_manager.cc
index 1b3cd2c..1795cb5 100644
--- a/tools/android/forwarder2/forwarders_manager.cc
+++ b/tools/android/forwarder2/forwarders_manager.cc
@@ -4,88 +4,129 @@
#include "tools/android/forwarder2/forwarders_manager.h"
+#include <sys/select.h>
+#include <unistd.h>
+
#include <algorithm>
#include "base/basictypes.h"
#include "base/bind.h"
+#include "base/callback_helpers.h"
#include "base/location.h"
#include "base/logging.h"
#include "base/message_loop/message_loop_proxy.h"
+#include "base/posix/eintr_wrapper.h"
#include "tools/android/forwarder2/forwarder.h"
#include "tools/android/forwarder2/socket.h"
namespace forwarder2 {
-ForwardersManager::ForwardersManager() : delegate_(new Delegate()) {}
+ForwardersManager::ForwardersManager() : thread_("ForwardersManagerThread") {
+ thread_.Start();
+ WaitForEventsOnInternalThreadSoon();
+}
+
ForwardersManager::~ForwardersManager() {
- delegate_->Clear();
+ deletion_notifier_.Notify();
}
void ForwardersManager::CreateAndStartNewForwarder(scoped_ptr<Socket> socket1,
scoped_ptr<Socket> socket2) {
- delegate_->CreateAndStartNewForwarder(socket1.Pass(), socket2.Pass());
+ // Note that the internal Forwarder vector is populated on the internal thread
+ // which is the only thread from which it's accessed.
+ thread_.message_loop_proxy()->PostTask(
+ FROM_HERE,
+ base::Bind(&ForwardersManager::CreateNewForwarderOnInternalThread,
+ base::Unretained(this), base::Passed(&socket1),
+ base::Passed(&socket2)));
+
+ // Guarantees that the CreateNewForwarderOnInternalThread callback posted to
+ // the internal thread gets executed immediately.
+ wakeup_notifier_.Notify();
}
-ForwardersManager::Delegate::Delegate() {}
+void ForwardersManager::CreateNewForwarderOnInternalThread(
+ scoped_ptr<Socket> socket1,
+ scoped_ptr<Socket> socket2) {
+ DCHECK(thread_.message_loop_proxy()->RunsTasksOnCurrentThread());
+ forwarders_.push_back(new Forwarder(socket1.Pass(), socket2.Pass()));
+}
-ForwardersManager::Delegate::~Delegate() {
- // The forwarder instances should already have been deleted on their
- // construction thread. Deleting them here would be unsafe since we don't know
- // which thread this destructor is called on.
- DCHECK(forwarders_.empty());
+void ForwardersManager::WaitForEventsOnInternalThreadSoon() {
+ thread_.message_loop_proxy()->PostTask(
+ FROM_HERE,
+ base::Bind(&ForwardersManager::WaitForEventsOnInternalThread,
+ base::Unretained(this)));
}
-void ForwardersManager::Delegate::Clear() {
- if (!forwarders_constructor_runner_) {
- DCHECK(forwarders_.empty());
- return;
+void ForwardersManager::WaitForEventsOnInternalThread() {
+ DCHECK(thread_.message_loop_proxy()->RunsTasksOnCurrentThread());
+ fd_set read_fds;
+ fd_set write_fds;
+
+ FD_ZERO(&read_fds);
+ FD_ZERO(&write_fds);
+
+ // Populate the file descriptor sets.
+ int max_fd = -1;
+ for (ScopedVector<Forwarder>::iterator it = forwarders_.begin();
+ it != forwarders_.end(); ++it) {
+ Forwarder* const forwarder = *it;
+ forwarder->RegisterFDs(&read_fds, &write_fds, &max_fd);
+ }
+
+ const int notifier_fds[] = {
+ wakeup_notifier_.receiver_fd(),
+ deletion_notifier_.receiver_fd(),
+ };
+
+ for (int i = 0; i < arraysize(notifier_fds); ++i) {
+ const int notifier_fd = notifier_fds[i];
+ DCHECK_GT(notifier_fd, -1);
+ FD_SET(notifier_fd, &read_fds);
+ max_fd = std::max(max_fd, notifier_fd);
}
- if (forwarders_constructor_runner_->RunsTasksOnCurrentThread()) {
- ClearOnForwarderConstructorThread();
+
+ const int ret = HANDLE_EINTR(
+ select(max_fd + 1, &read_fds, &write_fds, NULL, NULL));
+ if (ret < 0) {
+ PLOG(ERROR) << "select";
return;
}
- forwarders_constructor_runner_->PostTask(
- FROM_HERE,
- base::Bind(
- &ForwardersManager::Delegate::ClearOnForwarderConstructorThread,
- this));
-}
-void ForwardersManager::Delegate::CreateAndStartNewForwarder(
- scoped_ptr<Socket> socket1,
- scoped_ptr<Socket> socket2) {
- const scoped_refptr<base::SingleThreadTaskRunner> current_task_runner(
- base::MessageLoopProxy::current());
- DCHECK(current_task_runner);
- if (forwarders_constructor_runner_) {
- DCHECK_EQ(current_task_runner, forwarders_constructor_runner_);
- } else {
- forwarders_constructor_runner_ = current_task_runner;
+ const bool must_shutdown = FD_ISSET(
+ deletion_notifier_.receiver_fd(), &read_fds);
+ if (must_shutdown && forwarders_.empty())
+ return;
+
+ base::ScopedClosureRunner wait_for_events_soon(
+ base::Bind(&ForwardersManager::WaitForEventsOnInternalThreadSoon,
+ base::Unretained(this)));
+
+ if (FD_ISSET(wakeup_notifier_.receiver_fd(), &read_fds)) {
+ // Note that the events on FDs other than the wakeup notifier one, if any,
+ // will be processed upon the next select().
+ wakeup_notifier_.Reset();
+ return;
}
- forwarders_.push_back(
- new Forwarder(socket1.Pass(), socket2.Pass(),
- &deletion_notifier_,
- base::Bind(&ForwardersManager::Delegate::OnForwarderError,
- this)));
- forwarders_.back()->Start();
-}
-void ForwardersManager::Delegate::OnForwarderError(
- scoped_ptr<Forwarder> forwarder) {
- DCHECK(forwarders_constructor_runner_->RunsTasksOnCurrentThread());
- const ScopedVector<Forwarder>::iterator it = std::find(
- forwarders_.begin(), forwarders_.end(), forwarder.get());
- DCHECK(it != forwarders_.end());
- std::swap(*it, forwarders_.back());
- forwarders_.pop_back();
- ignore_result(forwarder.release()); // Deleted by the pop_back() above.
-}
+ // Notify the Forwarder instances and remove the ones that are closed.
+ for (size_t i = 0; i < forwarders_.size(); ) {
+ Forwarder* const forwarder = forwarders_[i];
+ forwarder->ProcessEvents(read_fds, write_fds);
-void ForwardersManager::Delegate::ClearOnForwarderConstructorThread() {
- DCHECK(forwarders_constructor_runner_->RunsTasksOnCurrentThread());
- deletion_notifier_.Notify();
- forwarders_.clear();
+ if (must_shutdown)
+ forwarder->Shutdown();
+
+ if (!forwarder->IsClosed()) {
+ ++i;
+ continue;
+ }
+
+ std::swap(forwarders_[i], forwarders_.back());
+ forwarders_.pop_back();
+ }
}
} // namespace forwarder2
diff --git a/tools/android/forwarder2/forwarders_manager.h b/tools/android/forwarder2/forwarders_manager.h
index 303ec5c..4c6dea6 100644
--- a/tools/android/forwarder2/forwarders_manager.h
+++ b/tools/android/forwarder2/forwarders_manager.h
@@ -5,70 +5,39 @@
#ifndef TOOLS_ANDROID_FORWARDER2_FORWARDERS_MANAGER_H_
#define TOOLS_ANDROID_FORWARDER2_FORWARDERS_MANAGER_H_
-#include "base/memory/ref_counted.h"
#include "base/memory/scoped_ptr.h"
#include "base/memory/scoped_vector.h"
+#include "base/threading/thread.h"
#include "tools/android/forwarder2/pipe_notifier.h"
-namespace base {
-
-class SingleThreadTaskRunner;
-
-} // namespace base
-
namespace forwarder2 {
class Forwarder;
class Socket;
-// Creates and manages lifetime of a set of Forwarder instances. This class
-// ensures in particular that the forwarder instances are deleted on their
-// construction thread no matter which thread ~ForwardersManager() is called on.
+// Creates, owns and notifies Forwarder instances on its own internal thread.
class ForwardersManager {
public:
- // Can be called on any thread.
ForwardersManager();
+
+ // Must be called on the thread the constructor was called on.
~ForwardersManager();
- // Note that the (unique) thread that this method is called on must outlive
- // the ForwardersManager instance. This is needed since the Forwarder
- // instances are deleted on the thread they were constructed on.
+ // Can be called on any thread.
void CreateAndStartNewForwarder(scoped_ptr<Socket> socket1,
scoped_ptr<Socket> socket2);
private:
- // Ref-counted thread-safe delegate that can (by definition) outlive its outer
- // ForwarderManager instance. This is needed since the forwarder instances are
- // destroyed on a thread other than the one ~ForwardersManager() is called on.
- // The forwarder instances can also call OnForwarderError() below at any time
- // on their construction thread (which is the same as their destruction
- // thread). This means that a WeakPtr to ForwardersManager can't be used
- // instead of ref-counting since checking the validity of the WeakPtr on a
- // thread other than the one ~ForwardersManager() is called on would be racy.
- class Delegate : public base::RefCountedThreadSafe<Delegate> {
- public:
- Delegate();
-
- void Clear();
-
- void CreateAndStartNewForwarder(scoped_ptr<Socket> socket1,
- scoped_ptr<Socket> socket2);
-
- private:
- friend class base::RefCountedThreadSafe<Delegate>;
-
- virtual ~Delegate();
-
- void OnForwarderError(scoped_ptr<Forwarder> forwarder);
-
- void ClearOnForwarderConstructorThread();
+ void CreateNewForwarderOnInternalThread(scoped_ptr<Socket> socket1,
+ scoped_ptr<Socket> socket2);
- scoped_refptr<base::SingleThreadTaskRunner> forwarders_constructor_runner_;
- PipeNotifier deletion_notifier_;
- ScopedVector<Forwarder> forwarders_;
- };
+ void WaitForEventsOnInternalThreadSoon();
+ void WaitForEventsOnInternalThread();
- const scoped_refptr<Delegate> delegate_;
+ ScopedVector<Forwarder> forwarders_;
+ PipeNotifier deletion_notifier_;
+ PipeNotifier wakeup_notifier_;
+ base::Thread thread_;
};
} // namespace forwarder2
diff --git a/tools/android/forwarder2/host_controller.h b/tools/android/forwarder2/host_controller.h
index fbf3847..d228bcc 100644
--- a/tools/android/forwarder2/host_controller.h
+++ b/tools/android/forwarder2/host_controller.h
@@ -85,14 +85,6 @@ class HostController {
// Task runner used for deletion set at deletion time (i.e. the object is
// deleted on the same thread it is created on).
const scoped_refptr<base::SingleThreadTaskRunner> deletion_task_runner_;
- // Note that this thread must outlive |forwarders_manager_| below. The
- // ForwardersManager's internal delegate (outliving ForwardersManager) may
- // post a task to this thread when it gets deleted. Also note that
- // base::Thread joins on destruction which means that ~Thread() will only
- // return after it executed all the tasks enqueued in its message loop. This
- // also means that it is guaranteed that all the Forwarder instances owned by
- // the ForwardersManager's internal delegate are destructed before
- // ~HostController() returns.
base::Thread thread_;
ForwardersManager forwarders_manager_;
diff --git a/tools/android/forwarder2/pipe_notifier.cc b/tools/android/forwarder2/pipe_notifier.cc
index 9110fff..02842bd 100644
--- a/tools/android/forwarder2/pipe_notifier.cc
+++ b/tools/android/forwarder2/pipe_notifier.cc
@@ -34,10 +34,21 @@ bool PipeNotifier::Notify() {
errno = 0;
int ret = HANDLE_EINTR(write(sender_fd_, "1", 1));
if (ret < 0) {
- LOG(WARNING) << "Error while notifying pipe. " << safe_strerror(errno);
+ PLOG(ERROR) << "write";
return false;
}
return true;
}
+void PipeNotifier::Reset() {
+ char c;
+ int ret = HANDLE_EINTR(read(receiver_fd_, &c, 1));
+ if (ret < 0) {
+ PLOG(ERROR) << "read";
+ return;
+ }
+ DCHECK_EQ(1, ret);
+ DCHECK_EQ('1', c);
+}
+
} // namespace forwarder
diff --git a/tools/android/forwarder2/pipe_notifier.h b/tools/android/forwarder2/pipe_notifier.h
index efe303b..aadb269 100644
--- a/tools/android/forwarder2/pipe_notifier.h
+++ b/tools/android/forwarder2/pipe_notifier.h
@@ -23,6 +23,8 @@ class PipeNotifier {
int receiver_fd() const { return receiver_fd_; }
+ void Reset();
+
private:
int sender_fd_;
int receiver_fd_;