diff options
author | pliard@chromium.org <pliard@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98> | 2013-11-18 13:49:46 +0000 |
---|---|---|
committer | pliard@chromium.org <pliard@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98> | 2013-11-18 13:49:46 +0000 |
commit | 715d972e1cd8f4606bb6fe20db4326aca15f248c (patch) | |
tree | 12844bb93aef7542120da084e492df1358b35759 | |
parent | 3e00d6a31bc49f33df3348eba43e8832d9730e8a (diff) | |
download | chromium_src-715d972e1cd8f4606bb6fe20db4326aca15f248c.zip chromium_src-715d972e1cd8f4606bb6fe20db4326aca15f248c.tar.gz chromium_src-715d972e1cd8f4606bb6fe20db4326aca15f248c.tar.bz2 |
Revert 235213 "android: forwader2: Simplify Forwarder implementa..."
> android: forwader2: Simplify Forwarder implementation.
>
> This patch does several things to the Forwarder class used in the 'forwarder2'
> tool:
>
> - Add a Stop() method to gently ask the forwarder to shut-down. This stops
> the forwarder from listening from incoming data, but keeps it running
> until it could flush its buffer properly, to avoid packet loss.
>
> - Simplify / refactor the buffer management code inside a Forwarder
> instance, to make the state of its buffer, and their transitions,
> more explict.
>
> BUG=313809
> R=pliard@chromium.org
>
> Review URL: https://codereview.chromium.org/61793013
TBR=digit@chromium.org
Review URL: https://codereview.chromium.org/70193013
git-svn-id: svn://svn.chromium.org/chrome/trunk/src@235729 0039d316-1c4b-4281-b951-d872f2087c98
-rw-r--r-- | tools/android/forwarder2/forwarder.cc | 261 | ||||
-rw-r--r-- | tools/android/forwarder2/forwarder.h | 1 | ||||
-rw-r--r-- | tools/android/forwarder2/socket.cc | 18 | ||||
-rw-r--r-- | tools/android/forwarder2/socket.h | 7 |
4 files changed, 83 insertions, 204 deletions
diff --git a/tools/android/forwarder2/forwarder.cc b/tools/android/forwarder2/forwarder.cc index f662e12..df4c29c 100644 --- a/tools/android/forwarder2/forwarder.cc +++ b/tools/android/forwarder2/forwarder.cc @@ -8,206 +8,77 @@ #include "base/bind.h" #include "base/logging.h" #include "base/memory/ref_counted.h" -#include "base/message_loop/message_loop_proxy.h" #include "base/posix/eintr_wrapper.h" #include "base/single_thread_task_runner.h" -#include "base/threading/thread.h" -#include "tools/android/forwarder2/pipe_notifier.h" #include "tools/android/forwarder2/socket.h" namespace forwarder2 { namespace { // Helper class to buffer reads and writes from one socket to another. -// Each implements a small buffer connected two one input socket, and -// one output socket. -// -// socket_from_ ---> [BufferedCopier] ---> socket_to_ -// -// These objects are used in a pair to handle duplex traffic, as in: -// -// ------> [BufferedCopier_1] ---> -// / \ -// socket_1 * * socket_2 -// \ / -// <------ [BufferedCopier_2] <---- -// -// When a BufferedCopier is in the READING state (see below), it only listens -// to events on its input socket, and won't detect when its output socket -// disconnects. To work around this, its peer will call its Close() method -// when that happens. - class BufferedCopier { public: - // Possible states: - // READING - Empty buffer and Waiting for input. - // WRITING - Data in buffer, and waiting for output. - // CLOSING - Like WRITING, but do not try to read after that. - // CLOSED - Completely closed. - // - // State transitions are: - // - // T01: READING ---[receive data]---> WRITING - // T02: READING ---[error on input socket]---> CLOSED - // T03: READING ---[Close() call]---> CLOSED - // - // T04: WRITING ---[write partial data]---> WRITING - // T05: WRITING ---[write all data]----> READING - // T06: WRITING ---[error on output socket]----> CLOSED - // T07: WRITING ---[Close() call]---> CLOSING - // - // T08: CLOSING ---[write partial data]---> CLOSING - // T09: CLOSING ---[write all data]----> CLOSED - // T10: CLOSING ---[Close() call]---> CLOSING - // T11: CLOSING ---[error on output socket] ---> CLOSED - // - enum State { - STATE_READING = 0, - STATE_WRITING = 1, - STATE_CLOSING = 2, - STATE_CLOSED = 3, - }; - // Does NOT own the pointers. - BufferedCopier(Socket* socket_from, Socket* socket_to) + BufferedCopier(Socket* socket_from, + Socket* socket_to) : socket_from_(socket_from), socket_to_(socket_to), bytes_read_(0), - write_offset_(0), - peer_(NULL), - state_(STATE_READING) {} - - // Sets the 'peer_' field pointing to the other BufferedCopier in a pair. - void SetPeer(BufferedCopier* peer) { peer_ = peer; } - - // Gently asks to close a buffer. Called either by the peer or the forwarder. - void Close() { - switch (state_) { - case STATE_READING: - state_ = STATE_CLOSED; // T03 - break; - case STATE_WRITING: - state_ = STATE_CLOSING; // T07 - break; - case STATE_CLOSING: - break; // T10 - case STATE_CLOSED: - ; - } + write_offset_(0) { } - // Call this before select(). This updates |read_fds|, - // |write_fds| and |max_fd| appropriately *if* the buffer isn't closed. - void PrepareSelect(fd_set* read_fds, fd_set* write_fds, int* max_fd) { - int fd; - switch (state_) { - case STATE_READING: - DCHECK(bytes_read_ == 0); - DCHECK(write_offset_ == 0); - fd = socket_from_->fd(); - if (fd < 0) { - ForceClose(); // T02 - return; - } - FD_SET(fd, read_fds); - break; + bool AddToReadSet(fd_set* read_fds) { + if (bytes_read_ == 0) + return socket_from_->AddFdToSet(read_fds); + return false; + } - case STATE_WRITING: - case STATE_CLOSING: - DCHECK(bytes_read_ > 0); - DCHECK(write_offset_ < bytes_read_); - fd = socket_to_->fd(); - if (fd < 0) { - ForceClose(); // T06 - return; - } - FD_SET(fd, write_fds); - break; + bool AddToWriteSet(fd_set* write_fds) { + if (write_offset_ < bytes_read_) + return socket_to_->AddFdToSet(write_fds); + return false; + } - case STATE_CLOSED: - return; + bool TryRead(const fd_set& read_fds) { + if (!socket_from_->IsFdInSet(read_fds)) + return false; + if (bytes_read_ != 0) // Can't read. + return false; + int ret = socket_from_->Read(buffer_, kBufferSize); + if (ret > 0) { + bytes_read_ = ret; + return true; } - *max_fd = std::max(*max_fd, fd); + return false; } - // Call this after a select() call to operate over the buffer. - void ProcessSelect(const fd_set& read_fds, const fd_set& write_fds) { - int fd, ret; - switch (state_) { - case STATE_READING: - fd = socket_from_->fd(); - if (fd < 0) { - state_ = STATE_CLOSED; // T02 - return; - } - if (!FD_ISSET(fd, &read_fds)) - return; - - ret = socket_from_->NonBlockingRead(buffer_, kBufferSize); - if (ret <= 0) { - ForceClose(); // T02 - return; - } - bytes_read_ = ret; - write_offset_ = 0; - state_ = STATE_WRITING; // T01 - break; - - case STATE_WRITING: - case STATE_CLOSING: - fd = socket_to_->fd(); - if (fd < 0) { - ForceClose(); // T06 + T11 - return; - } - if (!FD_ISSET(fd, &write_fds)) - return; - - ret = socket_to_->NonBlockingWrite(buffer_ + write_offset_, - bytes_read_ - write_offset_); - if (ret <= 0) { - ForceClose(); // T06 + T11 - return; - } - - write_offset_ += ret; - if (write_offset_ < bytes_read_) - return; // T08 + T04 - + bool TryWrite(const fd_set& write_fds) { + if (!socket_to_->IsFdInSet(write_fds)) + return false; + if (write_offset_ >= bytes_read_) // Nothing to write. + return false; + int ret = socket_to_->Write(buffer_ + write_offset_, + bytes_read_ - write_offset_); + if (ret > 0) { + write_offset_ += ret; + if (write_offset_ == bytes_read_) { write_offset_ = 0; bytes_read_ = 0; - if (state_ == STATE_CLOSING) { - ForceClose(); // T09 - return; - } - state_ = STATE_READING; // T05 - break; - - case STATE_CLOSED: - ; + } + return true; } + return false; } private: - // Internal method used to close the buffer and notify the peer, if any. - void ForceClose() { - if (peer_) { - peer_->Close(); - peer_ = NULL; - } - state_ = STATE_CLOSED; - } - // Not owned. Socket* socket_from_; Socket* socket_to_; - // A big buffer to let the file-over-http bridge work more like real file. + // A big buffer to let our file-over-http bridge work more like real file. static const int kBufferSize = 1024 * 128; int bytes_read_; int write_offset_; - BufferedCopier* peer_; - State state_; char buffer_[kBufferSize]; DISALLOW_COPY_AND_ASSIGN(BufferedCopier); @@ -223,13 +94,12 @@ class BufferedCopier { // created it. class Forwarder { public: - // Create a new Forwarder instance. |socket1| and |socket2| are the two socket - // endpoints. Forwarder(scoped_ptr<Socket> socket1, scoped_ptr<Socket> socket2) : socket1_(socket1.Pass()), socket2_(socket2.Pass()), destructor_runner_(base::MessageLoopProxy::current()), - thread_("ForwarderThread") {} + thread_("ForwarderThread") { + } void Start() { thread_.Start(); @@ -238,54 +108,42 @@ class Forwarder { base::Bind(&Forwarder::ThreadHandler, base::Unretained(this))); } - void Stop() { deletion_notifier_.Notify(); } - private: void ThreadHandler() { + const int nfds = Socket::GetHighestFileDescriptor(*socket1_, *socket2_) + 1; fd_set read_fds; fd_set write_fds; // Copy from socket1 to socket2 BufferedCopier buffer1(socket1_.get(), socket2_.get()); - // Copy from socket2 to socket1 BufferedCopier buffer2(socket2_.get(), socket1_.get()); - buffer1.SetPeer(&buffer2); - buffer2.SetPeer(&buffer1); - - for (;;) { + bool run = true; + while (run) { FD_ZERO(&read_fds); FD_ZERO(&write_fds); - int max_fd = -1; - buffer1.PrepareSelect(&read_fds, &write_fds, &max_fd); - buffer2.PrepareSelect(&read_fds, &write_fds, &max_fd); + buffer1.AddToReadSet(&read_fds); + buffer2.AddToReadSet(&read_fds); + buffer1.AddToWriteSet(&write_fds); + buffer2.AddToWriteSet(&write_fds); - if (max_fd < 0) { - // Both buffers are closed. Exit immediately. - break; - } - - const int deletion_fd = deletion_notifier_.receiver_fd(); - if (deletion_fd >= 0) { - FD_SET(deletion_fd, &read_fds); - max_fd = std::max(max_fd, deletion_fd); - } - - if (HANDLE_EINTR(select(max_fd + 1, &read_fds, &write_fds, NULL, NULL)) <= - 0) { + if (HANDLE_EINTR(select(nfds, &read_fds, &write_fds, NULL, NULL)) <= 0) { PLOG(ERROR) << "select"; break; } - - buffer1.ProcessSelect(read_fds, write_fds); - buffer2.ProcessSelect(read_fds, write_fds); - - if (deletion_fd >= 0 && FD_ISSET(deletion_fd, &read_fds)) { - buffer1.Close(); - buffer2.Close(); - } + // When a socket in the read set closes the connection, select() returns + // with that socket descriptor set as "ready to read". When we call + // TryRead() below, it will return false, but the while loop will continue + // to run until all the write operations are finished, to make sure the + // buffers are completely flushed out. + + // Keep running while we have some operation to do. + run = buffer1.TryRead(read_fds); + run = run || buffer2.TryRead(read_fds); + run = run || buffer1.TryWrite(write_fds); + run = run || buffer2.TryWrite(write_fds); } // Note that the thread that |destruction_runner_| runs tasks on could be @@ -294,13 +152,12 @@ class Forwarder { socket1_.reset(); socket2_.reset(); - // Ensure the object is destroyed on the thread that created it. + // Note that base::Thread must be destroyed on the thread it was created on. destructor_runner_->DeleteSoon(FROM_HERE, this); } scoped_ptr<Socket> socket1_; scoped_ptr<Socket> socket2_; - PipeNotifier deletion_notifier_; scoped_refptr<base::SingleThreadTaskRunner> destructor_runner_; base::Thread thread_; }; diff --git a/tools/android/forwarder2/forwarder.h b/tools/android/forwarder2/forwarder.h index d4d2762..651b5e8 100644 --- a/tools/android/forwarder2/forwarder.h +++ b/tools/android/forwarder2/forwarder.h @@ -6,6 +6,7 @@ #define TOOLS_ANDROID_FORWARDER2_FORWARDER_H_ #include "base/memory/scoped_ptr.h" +#include "base/threading/thread.h" namespace forwarder2 { diff --git a/tools/android/forwarder2/socket.cc b/tools/android/forwarder2/socket.cc index fb0b48f..d6a4737 100644 --- a/tools/android/forwarder2/socket.cc +++ b/tools/android/forwarder2/socket.cc @@ -287,6 +287,19 @@ int Socket::GetPort() { return port_; } +bool Socket::IsFdInSet(const fd_set& fds) const { + if (IsClosed()) + return false; + return FD_ISSET(socket_, &fds); +} + +bool Socket::AddFdToSet(fd_set* fds) const { + if (IsClosed()) + return false; + FD_SET(socket_, fds); + return true; +} + int Socket::ReadNumBytes(void* buffer, size_t num_bytes) { int bytes_read = 0; int ret = 1; @@ -429,6 +442,11 @@ bool Socket::WaitForEvent(EventType type, int timeout_secs) { } // static +int Socket::GetHighestFileDescriptor(const Socket& s1, const Socket& s2) { + return std::max(s1.socket_, s2.socket_); +} + +// static pid_t Socket::GetUnixDomainSocketProcessOwner(const std::string& path) { Socket socket; if (!socket.ConnectUnix(path)) diff --git a/tools/android/forwarder2/socket.h b/tools/android/forwarder2/socket.h index 6047a1c..33d72fc 100644 --- a/tools/android/forwarder2/socket.h +++ b/tools/android/forwarder2/socket.h @@ -37,13 +37,14 @@ class Socket { void Close(); bool IsClosed() const { return socket_ < 0; } - int fd() const { return socket_; } - bool Accept(Socket* new_socket); // Returns the port allocated to this socket or zero on error. int GetPort(); + bool IsFdInSet(const fd_set& fds) const; + bool AddFdToSet(fd_set* fds) const; + // Just a wrapper around unix read() function. // Reads up to buffer_size, but may read less then buffer_size. // Returns the number of bytes read. @@ -88,6 +89,8 @@ class Socket { bool DidReceiveEvent() const; + static int GetHighestFileDescriptor(const Socket& s1, const Socket& s2); + static pid_t GetUnixDomainSocketProcessOwner(const std::string& path); private: |