summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorpliard@chromium.org <pliard@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98>2013-11-18 13:49:46 +0000
committerpliard@chromium.org <pliard@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98>2013-11-18 13:49:46 +0000
commit715d972e1cd8f4606bb6fe20db4326aca15f248c (patch)
tree12844bb93aef7542120da084e492df1358b35759
parent3e00d6a31bc49f33df3348eba43e8832d9730e8a (diff)
downloadchromium_src-715d972e1cd8f4606bb6fe20db4326aca15f248c.zip
chromium_src-715d972e1cd8f4606bb6fe20db4326aca15f248c.tar.gz
chromium_src-715d972e1cd8f4606bb6fe20db4326aca15f248c.tar.bz2
Revert 235213 "android: forwader2: Simplify Forwarder implementa..."
> android: forwader2: Simplify Forwarder implementation. > > This patch does several things to the Forwarder class used in the 'forwarder2' > tool: > > - Add a Stop() method to gently ask the forwarder to shut-down. This stops > the forwarder from listening from incoming data, but keeps it running > until it could flush its buffer properly, to avoid packet loss. > > - Simplify / refactor the buffer management code inside a Forwarder > instance, to make the state of its buffer, and their transitions, > more explict. > > BUG=313809 > R=pliard@chromium.org > > Review URL: https://codereview.chromium.org/61793013 TBR=digit@chromium.org Review URL: https://codereview.chromium.org/70193013 git-svn-id: svn://svn.chromium.org/chrome/trunk/src@235729 0039d316-1c4b-4281-b951-d872f2087c98
-rw-r--r--tools/android/forwarder2/forwarder.cc261
-rw-r--r--tools/android/forwarder2/forwarder.h1
-rw-r--r--tools/android/forwarder2/socket.cc18
-rw-r--r--tools/android/forwarder2/socket.h7
4 files changed, 83 insertions, 204 deletions
diff --git a/tools/android/forwarder2/forwarder.cc b/tools/android/forwarder2/forwarder.cc
index f662e12..df4c29c 100644
--- a/tools/android/forwarder2/forwarder.cc
+++ b/tools/android/forwarder2/forwarder.cc
@@ -8,206 +8,77 @@
#include "base/bind.h"
#include "base/logging.h"
#include "base/memory/ref_counted.h"
-#include "base/message_loop/message_loop_proxy.h"
#include "base/posix/eintr_wrapper.h"
#include "base/single_thread_task_runner.h"
-#include "base/threading/thread.h"
-#include "tools/android/forwarder2/pipe_notifier.h"
#include "tools/android/forwarder2/socket.h"
namespace forwarder2 {
namespace {
// Helper class to buffer reads and writes from one socket to another.
-// Each implements a small buffer connected two one input socket, and
-// one output socket.
-//
-// socket_from_ ---> [BufferedCopier] ---> socket_to_
-//
-// These objects are used in a pair to handle duplex traffic, as in:
-//
-// ------> [BufferedCopier_1] --->
-// / \
-// socket_1 * * socket_2
-// \ /
-// <------ [BufferedCopier_2] <----
-//
-// When a BufferedCopier is in the READING state (see below), it only listens
-// to events on its input socket, and won't detect when its output socket
-// disconnects. To work around this, its peer will call its Close() method
-// when that happens.
-
class BufferedCopier {
public:
- // Possible states:
- // READING - Empty buffer and Waiting for input.
- // WRITING - Data in buffer, and waiting for output.
- // CLOSING - Like WRITING, but do not try to read after that.
- // CLOSED - Completely closed.
- //
- // State transitions are:
- //
- // T01: READING ---[receive data]---> WRITING
- // T02: READING ---[error on input socket]---> CLOSED
- // T03: READING ---[Close() call]---> CLOSED
- //
- // T04: WRITING ---[write partial data]---> WRITING
- // T05: WRITING ---[write all data]----> READING
- // T06: WRITING ---[error on output socket]----> CLOSED
- // T07: WRITING ---[Close() call]---> CLOSING
- //
- // T08: CLOSING ---[write partial data]---> CLOSING
- // T09: CLOSING ---[write all data]----> CLOSED
- // T10: CLOSING ---[Close() call]---> CLOSING
- // T11: CLOSING ---[error on output socket] ---> CLOSED
- //
- enum State {
- STATE_READING = 0,
- STATE_WRITING = 1,
- STATE_CLOSING = 2,
- STATE_CLOSED = 3,
- };
-
// Does NOT own the pointers.
- BufferedCopier(Socket* socket_from, Socket* socket_to)
+ BufferedCopier(Socket* socket_from,
+ Socket* socket_to)
: socket_from_(socket_from),
socket_to_(socket_to),
bytes_read_(0),
- write_offset_(0),
- peer_(NULL),
- state_(STATE_READING) {}
-
- // Sets the 'peer_' field pointing to the other BufferedCopier in a pair.
- void SetPeer(BufferedCopier* peer) { peer_ = peer; }
-
- // Gently asks to close a buffer. Called either by the peer or the forwarder.
- void Close() {
- switch (state_) {
- case STATE_READING:
- state_ = STATE_CLOSED; // T03
- break;
- case STATE_WRITING:
- state_ = STATE_CLOSING; // T07
- break;
- case STATE_CLOSING:
- break; // T10
- case STATE_CLOSED:
- ;
- }
+ write_offset_(0) {
}
- // Call this before select(). This updates |read_fds|,
- // |write_fds| and |max_fd| appropriately *if* the buffer isn't closed.
- void PrepareSelect(fd_set* read_fds, fd_set* write_fds, int* max_fd) {
- int fd;
- switch (state_) {
- case STATE_READING:
- DCHECK(bytes_read_ == 0);
- DCHECK(write_offset_ == 0);
- fd = socket_from_->fd();
- if (fd < 0) {
- ForceClose(); // T02
- return;
- }
- FD_SET(fd, read_fds);
- break;
+ bool AddToReadSet(fd_set* read_fds) {
+ if (bytes_read_ == 0)
+ return socket_from_->AddFdToSet(read_fds);
+ return false;
+ }
- case STATE_WRITING:
- case STATE_CLOSING:
- DCHECK(bytes_read_ > 0);
- DCHECK(write_offset_ < bytes_read_);
- fd = socket_to_->fd();
- if (fd < 0) {
- ForceClose(); // T06
- return;
- }
- FD_SET(fd, write_fds);
- break;
+ bool AddToWriteSet(fd_set* write_fds) {
+ if (write_offset_ < bytes_read_)
+ return socket_to_->AddFdToSet(write_fds);
+ return false;
+ }
- case STATE_CLOSED:
- return;
+ bool TryRead(const fd_set& read_fds) {
+ if (!socket_from_->IsFdInSet(read_fds))
+ return false;
+ if (bytes_read_ != 0) // Can't read.
+ return false;
+ int ret = socket_from_->Read(buffer_, kBufferSize);
+ if (ret > 0) {
+ bytes_read_ = ret;
+ return true;
}
- *max_fd = std::max(*max_fd, fd);
+ return false;
}
- // Call this after a select() call to operate over the buffer.
- void ProcessSelect(const fd_set& read_fds, const fd_set& write_fds) {
- int fd, ret;
- switch (state_) {
- case STATE_READING:
- fd = socket_from_->fd();
- if (fd < 0) {
- state_ = STATE_CLOSED; // T02
- return;
- }
- if (!FD_ISSET(fd, &read_fds))
- return;
-
- ret = socket_from_->NonBlockingRead(buffer_, kBufferSize);
- if (ret <= 0) {
- ForceClose(); // T02
- return;
- }
- bytes_read_ = ret;
- write_offset_ = 0;
- state_ = STATE_WRITING; // T01
- break;
-
- case STATE_WRITING:
- case STATE_CLOSING:
- fd = socket_to_->fd();
- if (fd < 0) {
- ForceClose(); // T06 + T11
- return;
- }
- if (!FD_ISSET(fd, &write_fds))
- return;
-
- ret = socket_to_->NonBlockingWrite(buffer_ + write_offset_,
- bytes_read_ - write_offset_);
- if (ret <= 0) {
- ForceClose(); // T06 + T11
- return;
- }
-
- write_offset_ += ret;
- if (write_offset_ < bytes_read_)
- return; // T08 + T04
-
+ bool TryWrite(const fd_set& write_fds) {
+ if (!socket_to_->IsFdInSet(write_fds))
+ return false;
+ if (write_offset_ >= bytes_read_) // Nothing to write.
+ return false;
+ int ret = socket_to_->Write(buffer_ + write_offset_,
+ bytes_read_ - write_offset_);
+ if (ret > 0) {
+ write_offset_ += ret;
+ if (write_offset_ == bytes_read_) {
write_offset_ = 0;
bytes_read_ = 0;
- if (state_ == STATE_CLOSING) {
- ForceClose(); // T09
- return;
- }
- state_ = STATE_READING; // T05
- break;
-
- case STATE_CLOSED:
- ;
+ }
+ return true;
}
+ return false;
}
private:
- // Internal method used to close the buffer and notify the peer, if any.
- void ForceClose() {
- if (peer_) {
- peer_->Close();
- peer_ = NULL;
- }
- state_ = STATE_CLOSED;
- }
-
// Not owned.
Socket* socket_from_;
Socket* socket_to_;
- // A big buffer to let the file-over-http bridge work more like real file.
+ // A big buffer to let our file-over-http bridge work more like real file.
static const int kBufferSize = 1024 * 128;
int bytes_read_;
int write_offset_;
- BufferedCopier* peer_;
- State state_;
char buffer_[kBufferSize];
DISALLOW_COPY_AND_ASSIGN(BufferedCopier);
@@ -223,13 +94,12 @@ class BufferedCopier {
// created it.
class Forwarder {
public:
- // Create a new Forwarder instance. |socket1| and |socket2| are the two socket
- // endpoints.
Forwarder(scoped_ptr<Socket> socket1, scoped_ptr<Socket> socket2)
: socket1_(socket1.Pass()),
socket2_(socket2.Pass()),
destructor_runner_(base::MessageLoopProxy::current()),
- thread_("ForwarderThread") {}
+ thread_("ForwarderThread") {
+ }
void Start() {
thread_.Start();
@@ -238,54 +108,42 @@ class Forwarder {
base::Bind(&Forwarder::ThreadHandler, base::Unretained(this)));
}
- void Stop() { deletion_notifier_.Notify(); }
-
private:
void ThreadHandler() {
+ const int nfds = Socket::GetHighestFileDescriptor(*socket1_, *socket2_) + 1;
fd_set read_fds;
fd_set write_fds;
// Copy from socket1 to socket2
BufferedCopier buffer1(socket1_.get(), socket2_.get());
-
// Copy from socket2 to socket1
BufferedCopier buffer2(socket2_.get(), socket1_.get());
- buffer1.SetPeer(&buffer2);
- buffer2.SetPeer(&buffer1);
-
- for (;;) {
+ bool run = true;
+ while (run) {
FD_ZERO(&read_fds);
FD_ZERO(&write_fds);
- int max_fd = -1;
- buffer1.PrepareSelect(&read_fds, &write_fds, &max_fd);
- buffer2.PrepareSelect(&read_fds, &write_fds, &max_fd);
+ buffer1.AddToReadSet(&read_fds);
+ buffer2.AddToReadSet(&read_fds);
+ buffer1.AddToWriteSet(&write_fds);
+ buffer2.AddToWriteSet(&write_fds);
- if (max_fd < 0) {
- // Both buffers are closed. Exit immediately.
- break;
- }
-
- const int deletion_fd = deletion_notifier_.receiver_fd();
- if (deletion_fd >= 0) {
- FD_SET(deletion_fd, &read_fds);
- max_fd = std::max(max_fd, deletion_fd);
- }
-
- if (HANDLE_EINTR(select(max_fd + 1, &read_fds, &write_fds, NULL, NULL)) <=
- 0) {
+ if (HANDLE_EINTR(select(nfds, &read_fds, &write_fds, NULL, NULL)) <= 0) {
PLOG(ERROR) << "select";
break;
}
-
- buffer1.ProcessSelect(read_fds, write_fds);
- buffer2.ProcessSelect(read_fds, write_fds);
-
- if (deletion_fd >= 0 && FD_ISSET(deletion_fd, &read_fds)) {
- buffer1.Close();
- buffer2.Close();
- }
+ // When a socket in the read set closes the connection, select() returns
+ // with that socket descriptor set as "ready to read". When we call
+ // TryRead() below, it will return false, but the while loop will continue
+ // to run until all the write operations are finished, to make sure the
+ // buffers are completely flushed out.
+
+ // Keep running while we have some operation to do.
+ run = buffer1.TryRead(read_fds);
+ run = run || buffer2.TryRead(read_fds);
+ run = run || buffer1.TryWrite(write_fds);
+ run = run || buffer2.TryWrite(write_fds);
}
// Note that the thread that |destruction_runner_| runs tasks on could be
@@ -294,13 +152,12 @@ class Forwarder {
socket1_.reset();
socket2_.reset();
- // Ensure the object is destroyed on the thread that created it.
+ // Note that base::Thread must be destroyed on the thread it was created on.
destructor_runner_->DeleteSoon(FROM_HERE, this);
}
scoped_ptr<Socket> socket1_;
scoped_ptr<Socket> socket2_;
- PipeNotifier deletion_notifier_;
scoped_refptr<base::SingleThreadTaskRunner> destructor_runner_;
base::Thread thread_;
};
diff --git a/tools/android/forwarder2/forwarder.h b/tools/android/forwarder2/forwarder.h
index d4d2762..651b5e8 100644
--- a/tools/android/forwarder2/forwarder.h
+++ b/tools/android/forwarder2/forwarder.h
@@ -6,6 +6,7 @@
#define TOOLS_ANDROID_FORWARDER2_FORWARDER_H_
#include "base/memory/scoped_ptr.h"
+#include "base/threading/thread.h"
namespace forwarder2 {
diff --git a/tools/android/forwarder2/socket.cc b/tools/android/forwarder2/socket.cc
index fb0b48f..d6a4737 100644
--- a/tools/android/forwarder2/socket.cc
+++ b/tools/android/forwarder2/socket.cc
@@ -287,6 +287,19 @@ int Socket::GetPort() {
return port_;
}
+bool Socket::IsFdInSet(const fd_set& fds) const {
+ if (IsClosed())
+ return false;
+ return FD_ISSET(socket_, &fds);
+}
+
+bool Socket::AddFdToSet(fd_set* fds) const {
+ if (IsClosed())
+ return false;
+ FD_SET(socket_, fds);
+ return true;
+}
+
int Socket::ReadNumBytes(void* buffer, size_t num_bytes) {
int bytes_read = 0;
int ret = 1;
@@ -429,6 +442,11 @@ bool Socket::WaitForEvent(EventType type, int timeout_secs) {
}
// static
+int Socket::GetHighestFileDescriptor(const Socket& s1, const Socket& s2) {
+ return std::max(s1.socket_, s2.socket_);
+}
+
+// static
pid_t Socket::GetUnixDomainSocketProcessOwner(const std::string& path) {
Socket socket;
if (!socket.ConnectUnix(path))
diff --git a/tools/android/forwarder2/socket.h b/tools/android/forwarder2/socket.h
index 6047a1c..33d72fc 100644
--- a/tools/android/forwarder2/socket.h
+++ b/tools/android/forwarder2/socket.h
@@ -37,13 +37,14 @@ class Socket {
void Close();
bool IsClosed() const { return socket_ < 0; }
- int fd() const { return socket_; }
-
bool Accept(Socket* new_socket);
// Returns the port allocated to this socket or zero on error.
int GetPort();
+ bool IsFdInSet(const fd_set& fds) const;
+ bool AddFdToSet(fd_set* fds) const;
+
// Just a wrapper around unix read() function.
// Reads up to buffer_size, but may read less then buffer_size.
// Returns the number of bytes read.
@@ -88,6 +89,8 @@ class Socket {
bool DidReceiveEvent() const;
+ static int GetHighestFileDescriptor(const Socket& s1, const Socket& s2);
+
static pid_t GetUnixDomainSocketProcessOwner(const std::string& path);
private: