summaryrefslogtreecommitdiffstats
path: root/tools/android
diff options
context:
space:
mode:
authorpliard@chromium.org <pliard@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98>2014-01-16 05:37:37 +0000
committerpliard@chromium.org <pliard@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98>2014-01-16 05:37:37 +0000
commite10ada65e3e90d71af1cfb799ca620cbeb763bb1 (patch)
treee2868c811ee32cd7e79fab61ab5c4eec49efeb4f /tools/android
parent4ad25d6c98e037ffb915cdbbd94ec608b7278be3 (diff)
downloadchromium_src-e10ada65e3e90d71af1cfb799ca620cbeb763bb1.zip
chromium_src-e10ada65e3e90d71af1cfb799ca620cbeb763bb1.tar.gz
chromium_src-e10ada65e3e90d71af1cfb799ca620cbeb763bb1.tar.bz2
Revert "Revert 235213 "android: forwader2: Simplify Forwarder implementa...""
This relands digit@'s CL that completely reworks the traffic forwarding logic and that should guarantee in particular that select() never blocks without a good reason (i.e. that we are not leaking Forwarder instances due to the fact that their internal thread would be blocked on select()). The initial problem with r235213 was only the use of PipeNotifier which added two file descriptors for each Forwarder instance (in addition to the two sockets the Forwarder instance operates on). We were exceeding the file descriptor limit (1024) on the intl_ko_th_vi Telemetry page set which is not surprising since Chrome easily keeps 256 sockets around in its socket pool. Therefore this change simply removes the PipeNotifier since we were not (yet) using it anyway. The initial problem was reproduced and the fix was tested as well with: tools/perf/run_measurement -v --browser=android-chromium-testshell \ --show-stdout page_cycler tools/perf/page_sets/intl_ko_th_vi.json Note that a next step would be to have all the Forwarder instances operate on a common thread owned by DeviceListener so that we don't end up with 256 threads. BUG=332403 Review URL: https://codereview.chromium.org/137923004 git-svn-id: svn://svn.chromium.org/chrome/trunk/src@245144 0039d316-1c4b-4281-b951-d872f2087c98
Diffstat (limited to 'tools/android')
-rw-r--r--tools/android/forwarder2/forwarder.cc247
-rw-r--r--tools/android/forwarder2/forwarder.h1
-rw-r--r--tools/android/forwarder2/socket.cc18
-rw-r--r--tools/android/forwarder2/socket.h7
4 files changed, 191 insertions, 82 deletions
diff --git a/tools/android/forwarder2/forwarder.cc b/tools/android/forwarder2/forwarder.cc
index df4c29c..2f25b9c 100644
--- a/tools/android/forwarder2/forwarder.cc
+++ b/tools/android/forwarder2/forwarder.cc
@@ -8,77 +8,208 @@
#include "base/bind.h"
#include "base/logging.h"
#include "base/memory/ref_counted.h"
+#include "base/message_loop/message_loop_proxy.h"
#include "base/posix/eintr_wrapper.h"
#include "base/single_thread_task_runner.h"
+#include "base/threading/thread.h"
#include "tools/android/forwarder2/socket.h"
namespace forwarder2 {
namespace {
// Helper class to buffer reads and writes from one socket to another.
+// Each implements a small buffer connected two one input socket, and
+// one output socket.
+//
+// socket_from_ ---> [BufferedCopier] ---> socket_to_
+//
+// These objects are used in a pair to handle duplex traffic, as in:
+//
+// ------> [BufferedCopier_1] --->
+// / \
+// socket_1 * * socket_2
+// \ /
+// <------ [BufferedCopier_2] <----
+//
+// When a BufferedCopier is in the READING state (see below), it only listens
+// to events on its input socket, and won't detect when its output socket
+// disconnects. To work around this, its peer will call its Close() method
+// when that happens.
+
class BufferedCopier {
public:
+ // Possible states:
+ // READING - Empty buffer and Waiting for input.
+ // WRITING - Data in buffer, and waiting for output.
+ // CLOSING - Like WRITING, but do not try to read after that.
+ // CLOSED - Completely closed.
+ //
+ // State transitions are:
+ //
+ // T01: READING ---[receive data]---> WRITING
+ // T02: READING ---[error on input socket]---> CLOSED
+ // T03: READING ---[Close() call]---> CLOSED
+ //
+ // T04: WRITING ---[write partial data]---> WRITING
+ // T05: WRITING ---[write all data]----> READING
+ // T06: WRITING ---[error on output socket]----> CLOSED
+ // T07: WRITING ---[Close() call]---> CLOSING
+ //
+ // T08: CLOSING ---[write partial data]---> CLOSING
+ // T09: CLOSING ---[write all data]----> CLOSED
+ // T10: CLOSING ---[Close() call]---> CLOSING
+ // T11: CLOSING ---[error on output socket] ---> CLOSED
+ //
+ enum State {
+ STATE_READING = 0,
+ STATE_WRITING = 1,
+ STATE_CLOSING = 2,
+ STATE_CLOSED = 3,
+ };
+
// Does NOT own the pointers.
- BufferedCopier(Socket* socket_from,
- Socket* socket_to)
+ BufferedCopier(Socket* socket_from, Socket* socket_to)
: socket_from_(socket_from),
socket_to_(socket_to),
bytes_read_(0),
- write_offset_(0) {
- }
+ write_offset_(0),
+ peer_(NULL),
+ state_(STATE_READING) {}
- bool AddToReadSet(fd_set* read_fds) {
- if (bytes_read_ == 0)
- return socket_from_->AddFdToSet(read_fds);
- return false;
+ // Sets the 'peer_' field pointing to the other BufferedCopier in a pair.
+ void SetPeer(BufferedCopier* peer) {
+ DCHECK(!peer_);
+ peer_ = peer;
}
- bool AddToWriteSet(fd_set* write_fds) {
- if (write_offset_ < bytes_read_)
- return socket_to_->AddFdToSet(write_fds);
- return false;
+ // Gently asks to close a buffer. Called either by the peer or the forwarder.
+ void Close() {
+ switch (state_) {
+ case STATE_READING:
+ state_ = STATE_CLOSED; // T03
+ break;
+ case STATE_WRITING:
+ state_ = STATE_CLOSING; // T07
+ break;
+ case STATE_CLOSING:
+ break; // T10
+ case STATE_CLOSED:
+ ;
+ }
}
- bool TryRead(const fd_set& read_fds) {
- if (!socket_from_->IsFdInSet(read_fds))
- return false;
- if (bytes_read_ != 0) // Can't read.
- return false;
- int ret = socket_from_->Read(buffer_, kBufferSize);
- if (ret > 0) {
- bytes_read_ = ret;
- return true;
+ // Call this before select(). This updates |read_fds|,
+ // |write_fds| and |max_fd| appropriately *if* the buffer isn't closed.
+ void PrepareSelect(fd_set* read_fds, fd_set* write_fds, int* max_fd) {
+ int fd;
+ switch (state_) {
+ case STATE_READING:
+ DCHECK(bytes_read_ == 0);
+ DCHECK(write_offset_ == 0);
+ fd = socket_from_->fd();
+ if (fd < 0) {
+ ForceClose(); // T02
+ return;
+ }
+ FD_SET(fd, read_fds);
+ break;
+
+ case STATE_WRITING:
+ case STATE_CLOSING:
+ DCHECK(bytes_read_ > 0);
+ DCHECK(write_offset_ < bytes_read_);
+ fd = socket_to_->fd();
+ if (fd < 0) {
+ ForceClose(); // T06
+ return;
+ }
+ FD_SET(fd, write_fds);
+ break;
+
+ case STATE_CLOSED:
+ return;
}
- return false;
+ *max_fd = std::max(*max_fd, fd);
}
- bool TryWrite(const fd_set& write_fds) {
- if (!socket_to_->IsFdInSet(write_fds))
- return false;
- if (write_offset_ >= bytes_read_) // Nothing to write.
- return false;
- int ret = socket_to_->Write(buffer_ + write_offset_,
- bytes_read_ - write_offset_);
- if (ret > 0) {
- write_offset_ += ret;
- if (write_offset_ == bytes_read_) {
+ // Call this after a select() call to operate over the buffer.
+ void ProcessSelect(const fd_set& read_fds, const fd_set& write_fds) {
+ int fd, ret;
+ switch (state_) {
+ case STATE_READING:
+ fd = socket_from_->fd();
+ if (fd < 0) {
+ state_ = STATE_CLOSED; // T02
+ return;
+ }
+ if (!FD_ISSET(fd, &read_fds))
+ return;
+
+ ret = socket_from_->NonBlockingRead(buffer_, kBufferSize);
+ if (ret <= 0) {
+ ForceClose(); // T02
+ return;
+ }
+ bytes_read_ = ret;
+ write_offset_ = 0;
+ state_ = STATE_WRITING; // T01
+ break;
+
+ case STATE_WRITING:
+ case STATE_CLOSING:
+ fd = socket_to_->fd();
+ if (fd < 0) {
+ ForceClose(); // T06 + T11
+ return;
+ }
+ if (!FD_ISSET(fd, &write_fds))
+ return;
+
+ ret = socket_to_->NonBlockingWrite(buffer_ + write_offset_,
+ bytes_read_ - write_offset_);
+ if (ret <= 0) {
+ ForceClose(); // T06 + T11
+ return;
+ }
+
+ write_offset_ += ret;
+ if (write_offset_ < bytes_read_)
+ return; // T08 + T04
+
write_offset_ = 0;
bytes_read_ = 0;
- }
- return true;
+ if (state_ == STATE_CLOSING) {
+ ForceClose(); // T09
+ return;
+ }
+ state_ = STATE_READING; // T05
+ break;
+
+ case STATE_CLOSED:
+ ;
}
- return false;
}
private:
+ // Internal method used to close the buffer and notify the peer, if any.
+ void ForceClose() {
+ if (peer_) {
+ peer_->Close();
+ peer_ = NULL;
+ }
+ state_ = STATE_CLOSED;
+ }
+
// Not owned.
Socket* socket_from_;
Socket* socket_to_;
- // A big buffer to let our file-over-http bridge work more like real file.
+ // A big buffer to let the file-over-http bridge work more like real file.
static const int kBufferSize = 1024 * 128;
int bytes_read_;
int write_offset_;
+ BufferedCopier* peer_;
+ State state_;
char buffer_[kBufferSize];
DISALLOW_COPY_AND_ASSIGN(BufferedCopier);
@@ -94,12 +225,13 @@ class BufferedCopier {
// created it.
class Forwarder {
public:
+ // Create a new Forwarder instance. |socket1| and |socket2| are the two socket
+ // endpoints.
Forwarder(scoped_ptr<Socket> socket1, scoped_ptr<Socket> socket2)
: socket1_(socket1.Pass()),
socket2_(socket2.Pass()),
destructor_runner_(base::MessageLoopProxy::current()),
- thread_("ForwarderThread") {
- }
+ thread_("ForwarderThread") {}
void Start() {
thread_.Start();
@@ -110,40 +242,39 @@ class Forwarder {
private:
void ThreadHandler() {
- const int nfds = Socket::GetHighestFileDescriptor(*socket1_, *socket2_) + 1;
fd_set read_fds;
fd_set write_fds;
// Copy from socket1 to socket2
BufferedCopier buffer1(socket1_.get(), socket2_.get());
+
// Copy from socket2 to socket1
BufferedCopier buffer2(socket2_.get(), socket1_.get());
- bool run = true;
- while (run) {
+ buffer1.SetPeer(&buffer2);
+ buffer2.SetPeer(&buffer1);
+
+ for (;;) {
FD_ZERO(&read_fds);
FD_ZERO(&write_fds);
- buffer1.AddToReadSet(&read_fds);
- buffer2.AddToReadSet(&read_fds);
- buffer1.AddToWriteSet(&write_fds);
- buffer2.AddToWriteSet(&write_fds);
+ int max_fd = -1;
+ buffer1.PrepareSelect(&read_fds, &write_fds, &max_fd);
+ buffer2.PrepareSelect(&read_fds, &write_fds, &max_fd);
- if (HANDLE_EINTR(select(nfds, &read_fds, &write_fds, NULL, NULL)) <= 0) {
+ if (max_fd < 0) {
+ // Both buffers are closed. Exit immediately.
+ break;
+ }
+
+ if (HANDLE_EINTR(select(max_fd + 1, &read_fds, &write_fds, NULL, NULL)) <=
+ 0) {
PLOG(ERROR) << "select";
break;
}
- // When a socket in the read set closes the connection, select() returns
- // with that socket descriptor set as "ready to read". When we call
- // TryRead() below, it will return false, but the while loop will continue
- // to run until all the write operations are finished, to make sure the
- // buffers are completely flushed out.
-
- // Keep running while we have some operation to do.
- run = buffer1.TryRead(read_fds);
- run = run || buffer2.TryRead(read_fds);
- run = run || buffer1.TryWrite(write_fds);
- run = run || buffer2.TryWrite(write_fds);
+
+ buffer1.ProcessSelect(read_fds, write_fds);
+ buffer2.ProcessSelect(read_fds, write_fds);
}
// Note that the thread that |destruction_runner_| runs tasks on could be
@@ -152,7 +283,7 @@ class Forwarder {
socket1_.reset();
socket2_.reset();
- // Note that base::Thread must be destroyed on the thread it was created on.
+ // Ensure the object is destroyed on the thread that created it.
destructor_runner_->DeleteSoon(FROM_HERE, this);
}
diff --git a/tools/android/forwarder2/forwarder.h b/tools/android/forwarder2/forwarder.h
index 651b5e8..d4d2762 100644
--- a/tools/android/forwarder2/forwarder.h
+++ b/tools/android/forwarder2/forwarder.h
@@ -6,7 +6,6 @@
#define TOOLS_ANDROID_FORWARDER2_FORWARDER_H_
#include "base/memory/scoped_ptr.h"
-#include "base/threading/thread.h"
namespace forwarder2 {
diff --git a/tools/android/forwarder2/socket.cc b/tools/android/forwarder2/socket.cc
index d6a4737..fb0b48f 100644
--- a/tools/android/forwarder2/socket.cc
+++ b/tools/android/forwarder2/socket.cc
@@ -287,19 +287,6 @@ int Socket::GetPort() {
return port_;
}
-bool Socket::IsFdInSet(const fd_set& fds) const {
- if (IsClosed())
- return false;
- return FD_ISSET(socket_, &fds);
-}
-
-bool Socket::AddFdToSet(fd_set* fds) const {
- if (IsClosed())
- return false;
- FD_SET(socket_, fds);
- return true;
-}
-
int Socket::ReadNumBytes(void* buffer, size_t num_bytes) {
int bytes_read = 0;
int ret = 1;
@@ -442,11 +429,6 @@ bool Socket::WaitForEvent(EventType type, int timeout_secs) {
}
// static
-int Socket::GetHighestFileDescriptor(const Socket& s1, const Socket& s2) {
- return std::max(s1.socket_, s2.socket_);
-}
-
-// static
pid_t Socket::GetUnixDomainSocketProcessOwner(const std::string& path) {
Socket socket;
if (!socket.ConnectUnix(path))
diff --git a/tools/android/forwarder2/socket.h b/tools/android/forwarder2/socket.h
index 33d72fc..6047a1c 100644
--- a/tools/android/forwarder2/socket.h
+++ b/tools/android/forwarder2/socket.h
@@ -37,14 +37,13 @@ class Socket {
void Close();
bool IsClosed() const { return socket_ < 0; }
+ int fd() const { return socket_; }
+
bool Accept(Socket* new_socket);
// Returns the port allocated to this socket or zero on error.
int GetPort();
- bool IsFdInSet(const fd_set& fds) const;
- bool AddFdToSet(fd_set* fds) const;
-
// Just a wrapper around unix read() function.
// Reads up to buffer_size, but may read less then buffer_size.
// Returns the number of bytes read.
@@ -89,8 +88,6 @@ class Socket {
bool DidReceiveEvent() const;
- static int GetHighestFileDescriptor(const Socket& s1, const Socket& s2);
-
static pid_t GetUnixDomainSocketProcessOwner(const std::string& path);
private: