summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authordkegel@google.com <dkegel@google.com@0039d316-1c4b-4281-b951-d872f2087c98>2008-09-18 00:10:38 +0000
committerdkegel@google.com <dkegel@google.com@0039d316-1c4b-4281-b951-d872f2087c98>2008-09-18 00:10:38 +0000
commitdab8619b4848bb10dfe78a3f0c59d6e661ec9610 (patch)
tree187818b2b685f5545832b45bbd20d01b53aca6d5
parent0fe41aecdbfd97633bda1a892b2f3efb60f1d3ed (diff)
downloadchromium_src-dab8619b4848bb10dfe78a3f0c59d6e661ec9610.zip
chromium_src-dab8619b4848bb10dfe78a3f0c59d6e661ec9610.tar.gz
chromium_src-dab8619b4848bb10dfe78a3f0c59d6e661ec9610.tar.bz2
Make tcp_client_socket_unittest pass on Linux.
Requires another changeset that puts libevent in third_party; I'll upload that next. This is not the final word; it makes too many syscalls per read. But it's a start. Review URL: http://codereview.chromium.org/3202 git-svn-id: svn://svn.chromium.org/chrome/trunk/src@2346 0039d316-1c4b-4281-b951-d872f2087c98
-rw-r--r--base/SConscript2
-rw-r--r--base/message_loop.cc22
-rw-r--r--base/message_loop.h15
-rw-r--r--base/message_pump_libevent.cc179
-rw-r--r--base/message_pump_libevent.h89
-rw-r--r--build/SConscript.main5
-rw-r--r--chrome/SConscript1
-rw-r--r--net/SConscript4
-rw-r--r--net/base/tcp_client_socket.h71
-rw-r--r--net/base/tcp_client_socket_libevent.cc288
10 files changed, 656 insertions, 20 deletions
diff --git a/base/SConscript b/base/SConscript
index 6012b38..dedcc3c 100644
--- a/base/SConscript
+++ b/base/SConscript
@@ -159,6 +159,7 @@ if env['PLATFORM'] == 'posix':
'base_paths_linux.cc',
'file_util_linux.cc',
'hmac_nss.cc',
+ 'message_pump_libevent.cc',
'nss_init.cc',
'sys_string_conversions_linux.cc',
'worker_pool.cc',
@@ -190,6 +191,7 @@ env_tests.Prepend(
LIBS = [
'base',
'base_gfx',
+ 'event',
'gtest',
'icuuc',
'libpng',
diff --git a/base/message_loop.cc b/base/message_loop.cc
index 4b95532..48e05eb 100644
--- a/base/message_loop.cc
+++ b/base/message_loop.cc
@@ -13,6 +13,10 @@
#include "base/string_util.h"
#include "base/thread_local.h"
+#if defined(OS_POSIX)
+#include "base/message_pump_libevent.h"
+#endif
+
// A lazily created thread local storage for quick access to a thread's message
// loop, if one exists. This should be safe and free of static constructors.
static base::LazyInstance<base::ThreadLocalPointer<MessageLoop> > lazy_tls_ptr(
@@ -78,6 +82,12 @@ MessageLoop::MessageLoop(Type type)
} else {
pump_ = new base::MessagePumpWin();
}
+#elif defined(OS_POSIX)
+ if (type_ == TYPE_IO) {
+ pump_ = new base::MessagePumpLibevent();
+ } else {
+ pump_ = new base::MessagePumpDefault();
+ }
#else
pump_ = new base::MessagePumpDefault();
#endif
@@ -561,4 +571,14 @@ void MessageLoopForIO::WatchObject(HANDLE object, Watcher* watcher) {
pump_win()->WatchObject(object, watcher);
}
-#endif // defined(OS_WIN)
+#elif defined(OS_POSIX)
+
+void MessageLoopForIO::WatchSocket(int socket, short interest_mask,
+ struct event* e, Watcher* watcher) {
+ pump_libevent()->WatchSocket(socket, interest_mask, e, watcher);
+}
+
+void MessageLoopForIO::UnwatchSocket(struct event* e) {
+ pump_libevent()->UnwatchSocket(e);
+}
+#endif
diff --git a/base/message_loop.h b/base/message_loop.h
index a65a2e8..51093c6 100644
--- a/base/message_loop.h
+++ b/base/message_loop.h
@@ -21,6 +21,8 @@
// We need this to declare base::MessagePumpWin::Dispatcher, which we should
// really just eliminate.
#include "base/message_pump_win.h"
+#elif defined(OS_POSIX)
+#include "base/message_pump_libevent.h"
#endif
// A MessageLoop is used to process events for a particular thread. There is
@@ -274,6 +276,11 @@ class MessageLoop : public base::MessagePump::Delegate {
base::MessagePumpWin* pump_win() {
return static_cast<base::MessagePumpWin*>(pump_.get());
}
+#elif defined(OS_POSIX)
+ base::MessagePumpLibevent* pump_libevent() {
+ return static_cast<base::MessagePumpLibevent*>(pump_.get());
+ }
+ protected:
#endif
// A function to encapsulate all the exception handling capability in the
@@ -450,6 +457,14 @@ class MessageLoopForIO : public MessageLoop {
// Please see MessagePumpWin for definitions of these methods.
void WatchObject(HANDLE object, Watcher* watcher);
+
+#elif defined(OS_POSIX)
+ typedef base::MessagePumpLibevent::Watcher Watcher;
+
+ // Please see MessagePumpLibevent for definitions of these methods.
+ void WatchSocket(int socket, short interest_mask,
+ struct event* e, Watcher* watcher);
+ void UnwatchSocket(struct event* e);
#endif // defined(OS_WIN)
};
diff --git a/base/message_pump_libevent.cc b/base/message_pump_libevent.cc
new file mode 100644
index 0000000..38e7db6
--- /dev/null
+++ b/base/message_pump_libevent.cc
@@ -0,0 +1,179 @@
+// Copyright (c) 2008 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "base/message_pump_libevent.h"
+
+#include "base/logging.h"
+#include "base/time.h"
+#include "third_party/libevent/event.h"
+
+#include <fcntl.h>
+
+namespace base {
+
+// Return 0 on success
+// Too small a function to bother putting in a library?
+static int SetNonBlocking(int fd)
+{
+ int flags = fcntl(fd, F_GETFL, 0);
+ if (-1 == flags)
+ flags = 0;
+ return fcntl(fd, F_SETFL, flags | O_NONBLOCK);
+}
+
+// Called if a byte is received on the wakeup pipe.
+void MessagePumpLibevent::OnWakeup(int socket, short flags, void* context) {
+
+ base::MessagePumpLibevent* that =
+ static_cast<base::MessagePumpLibevent*>(context);
+ DCHECK(that->wakeup_pipe_out_ == socket);
+
+ // Remove and discard the wakeup byte.
+ char buf;
+ int nread = read(socket, &buf, 1);
+ DCHECK(nread == 1);
+ // Tell libevent to break out of inner loop.
+ event_base_loopbreak(that->event_base_);
+}
+
+MessagePumpLibevent::MessagePumpLibevent()
+ : keep_running_(true),
+ in_run_(false),
+ event_base_(event_base_new()),
+ wakeup_pipe_in_(-1),
+ wakeup_pipe_out_(-1) {
+ if (!Init())
+ NOTREACHED();
+}
+
+bool MessagePumpLibevent::Init() {
+ int fds[2];
+ if (pipe(fds))
+ return false;
+ if (SetNonBlocking(fds[0]))
+ return false;
+ if (SetNonBlocking(fds[1]))
+ return false;
+ wakeup_pipe_out_ = fds[0];
+ wakeup_pipe_in_ = fds[1];
+
+ wakeup_event_ = new event;
+ event_set(wakeup_event_, wakeup_pipe_out_, EV_READ | EV_PERSIST,
+ OnWakeup, this);
+ event_base_set(event_base_, wakeup_event_);
+
+ if (event_add(wakeup_event_, 0))
+ return false;
+ return true;
+}
+
+MessagePumpLibevent::~MessagePumpLibevent() {
+ DCHECK(wakeup_event_);
+ DCHECK(event_base_);
+ event_del(wakeup_event_);
+ delete wakeup_event_;
+ event_base_free(event_base_);
+}
+
+void MessagePumpLibevent::WatchSocket(int socket, short interest_mask,
+ event* e, Watcher* watcher) {
+
+ // Set current interest mask and message pump for this event
+ event_set(e, socket, interest_mask, OnReadinessNotification, watcher);
+
+ // Tell libevent which message pump this socket will belong to when we add it.
+ event_base_set(event_base_, e);
+
+ // Add this socket to the list of monitored sockets.
+ if (event_add(e, NULL))
+ NOTREACHED();
+}
+
+void MessagePumpLibevent::UnwatchSocket(event* e) {
+ // Remove this socket from the list of monitored sockets.
+ if (event_del(e))
+ NOTREACHED();
+}
+
+void MessagePumpLibevent::OnReadinessNotification(int socket, short flags,
+ void* context) {
+ // The given socket is ready for I/O.
+ // Tell the owner what kind of I/O the socket is ready for.
+ Watcher* watcher = static_cast<Watcher*>(context);
+ watcher->OnSocketReady(flags);
+}
+
+// Reentrant!
+void MessagePumpLibevent::Run(Delegate* delegate) {
+ DCHECK(keep_running_) << "Quit must have been called outside of Run!";
+
+ bool old_in_run = in_run_;
+ in_run_ = true;
+
+ for (;;) {
+ bool did_work = delegate->DoWork();
+ if (!keep_running_)
+ break;
+
+ did_work |= delegate->DoDelayedWork(&delayed_work_time_);
+ if (!keep_running_)
+ break;
+
+ if (did_work)
+ continue;
+
+ did_work = delegate->DoIdleWork();
+ if (!keep_running_)
+ break;
+
+ if (did_work)
+ continue;
+
+ // EVLOOP_ONCE tells libevent to only block once,
+ // but to service all pending events when it wakes up.
+ if (delayed_work_time_.is_null()) {
+ event_base_loop(event_base_, EVLOOP_ONCE);
+ } else {
+ TimeDelta delay = delayed_work_time_ - Time::Now();
+ if (delay > TimeDelta()) {
+ struct timeval poll_tv;
+ poll_tv.tv_sec = delay.InSeconds();
+ poll_tv.tv_usec = delay.InMicroseconds() % Time::kMicrosecondsPerSecond;
+ event_base_loopexit(event_base_, &poll_tv);
+ event_base_loop(event_base_, EVLOOP_ONCE);
+ } else {
+ // It looks like delayed_work_time_ indicates a time in the past, so we
+ // need to call DoDelayedWork now.
+ delayed_work_time_ = Time();
+ }
+ }
+ }
+
+ keep_running_ = true;
+ in_run_ = old_in_run;
+}
+
+void MessagePumpLibevent::Quit() {
+ DCHECK(in_run_);
+ // Tell both libevent and Run that they should break out of their loops.
+ keep_running_ = false;
+ ScheduleWork();
+}
+
+void MessagePumpLibevent::ScheduleWork() {
+ // Tell libevent (in a threadsafe way) that it should break out of its loop.
+ char buf = 0;
+ int nwrite = write(wakeup_pipe_in_, &buf, 1);
+ DCHECK(nwrite == 1);
+}
+
+void MessagePumpLibevent::ScheduleDelayedWork(const Time& delayed_work_time) {
+ // We know that we can't be blocked on Wait right now since this method can
+ // only be called on the same thread as Run, so we only need to update our
+ // record of how long to sleep when we do sleep.
+ delayed_work_time_ = delayed_work_time;
+}
+
+} // namespace base
+
diff --git a/base/message_pump_libevent.h b/base/message_pump_libevent.h
new file mode 100644
index 0000000..7534e50
--- /dev/null
+++ b/base/message_pump_libevent.h
@@ -0,0 +1,89 @@
+// Copyright (c) 2006-2008 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef BASE_MESSAGE_PUMP_LIBEVENT_H_
+#define BASE_MESSAGE_PUMP_LIBEVENT_H_
+
+#include "base/message_pump.h"
+#include "base/time.h"
+
+// Declare structs we need from libevent.h rather than including it
+struct event_base;
+struct event;
+
+namespace base {
+
+// Class to monitor sockets and issue callbacks when sockets are ready for I/O
+// TODO(dkegel): add support for background file IO somehow
+class MessagePumpLibevent : public MessagePump {
+ public:
+ // Used with WatchObject to asynchronously monitor the I/O readiness of a
+ // socket.
+ class Watcher {
+ public:
+ virtual ~Watcher() {}
+ // Called from MessageLoop::Run when a ready socket is detected.
+ virtual void OnSocketReady(short eventmask) = 0;
+ };
+
+ MessagePumpLibevent();
+ virtual ~MessagePumpLibevent();
+
+ // Have the current thread's message loop watch for a ready socket.
+ // Caller must provide a struct event for this socket for libevent's use.
+ // The event and interest_mask fields are defined in libevent.
+ // Returns true on success.
+ // TODO(dkegel): hide libevent better; abstraction still too leaky
+ // TODO(dkegel): better error handing
+ // TODO(dkegel): switch to edge-triggered readiness notification
+ void WatchSocket(int socket, short interest_mask, event* e, Watcher*);
+
+ // Stop watching a socket.
+ // Event was previously initialized by WatchSocket.
+ void UnwatchSocket(event* e);
+
+ // MessagePump methods:
+ virtual void Run(Delegate* delegate);
+ virtual void Quit();
+ virtual void ScheduleWork();
+ virtual void ScheduleDelayedWork(const Time& delayed_work_time);
+
+ private:
+
+ // Risky part of constructor. Returns true on success.
+ bool Init();
+
+ // This flag is set to false when Run should return.
+ bool keep_running_;
+
+ // This flag is set when inside Run.
+ bool in_run_;
+
+ // The time at which we should call DoDelayedWork.
+ Time delayed_work_time_;
+
+ // Libevent dispatcher. Watches all sockets registered with it, and sends
+ // readiness callbacks when a socket is ready for I/O.
+ event_base* event_base_;
+
+ // Called by libevent to tell us a registered socket is ready
+ static void OnReadinessNotification(int socket, short flags, void* context);
+
+ // Unix pipe used to implement ScheduleWork()
+ // ... callback; called by libevent inside Run() when pipe is ready to read
+ static void OnWakeup(int socket, short flags, void* context);
+ // ... write end; ScheduleWork() writes a single byte to it
+ int wakeup_pipe_in_;
+ // ... read end; OnWakeup reads it and then breaks Run() out of its sleep
+ int wakeup_pipe_out_;
+ // ... libevent wrapper for read end
+ event* wakeup_event_;
+
+ DISALLOW_COPY_AND_ASSIGN(MessagePumpLibevent);
+};
+
+} // namespace base
+
+#endif // BASE_MESSAGE_PUMP_LIBEVENT_H_
+
diff --git a/build/SConscript.main b/build/SConscript.main
index 25b8fec..df6e80d 100644
--- a/build/SConscript.main
+++ b/build/SConscript.main
@@ -63,6 +63,7 @@ env = Environment(
BSPATCH_DIR = '$THIRD_PARTY_DIR/bspatch',
BZIP2_DIR = '$THIRD_PARTY_DIR/bzip2',
ICU38_DIR = '$THIRD_PARTY_DIR/icu38',
+ LIBEVENT_DIR = '$THIRD_PARTY_DIR/libevent',
LIBJPEG_DIR = '$THIRD_PARTY_DIR/libjpeg',
LIBPNG_DIR = '$THIRD_PARTY_DIR/libpng',
LIBXML_DIR = '$THIRD_PARTY_DIR/libxml',
@@ -500,6 +501,10 @@ if LoadComponent('third_party'):
'$LIBXML_DIR/SConscript',
'$LIBXSLT_DIR/SConscript',
])
+ if env['PLATFORM'] == 'posix':
+ sconscripts.extend([
+ '$LIBEVENT_DIR/SConscript',
+ ])
# This is temporary until we get this lib to build on other platforms.
if env['PLATFORM'] == 'win32':
sconscripts.extend([
diff --git a/chrome/SConscript b/chrome/SConscript
index 282e6c2..cef80b4c 100644
--- a/chrome/SConscript
+++ b/chrome/SConscript
@@ -186,6 +186,7 @@ libs = [
'plugin/plugin.lib',
'renderer/renderer.lib',
'third_party/hunspell/hunspell.lib',
+ 'third_party/libevent/libevent.lib',
'third_party/sqlite/sqlite.lib',
'views/views.lib',
'$V8_DIR/v8.lib',
diff --git a/net/SConscript b/net/SConscript
index b735894..98d0730 100644
--- a/net/SConscript
+++ b/net/SConscript
@@ -132,6 +132,7 @@ if env['PLATFORM'] == 'posix':
input_files.extend([
# TODO(tc): gnome-vfs? xdgmime? /etc/mime.types?
'base/platform_mime_util_linux.cc',
+ 'base/tcp_client_socket_libevent.cc',
])
if env['PLATFORM'] in ('darwin', 'posix'):
@@ -168,6 +169,7 @@ env_tests.Prepend(
'net', # net must come before base and modp_b64
'bzip2', # bzip2 must come before base
'base',
+ 'event',
'googleurl',
'gtest',
'icuuc',
@@ -223,6 +225,7 @@ unittest_files = [
'base/net_util_unittest.cc',
'base/registry_controlled_domain_unittest.cc',
'base/run_all_unittests.cc',
+ 'base/tcp_client_socket_unittest.cc',
'base/test_completion_callback_unittest.cc',
'disk_cache/addr_unittest.cc',
'disk_cache/block_files_unittest.cc',
@@ -241,7 +244,6 @@ if env['PLATFORM'] == 'win32':
'base/directory_lister_unittest.cc',
'base/ssl_config_service_unittest.cc',
'base/ssl_client_socket_unittest.cc',
- 'base/tcp_client_socket_unittest.cc',
'base/wininet_util_unittest.cc',
'disk_cache/backend_unittest.cc',
'http/http_cache_unittest.cc',
diff --git a/net/base/tcp_client_socket.h b/net/base/tcp_client_socket.h
index 06a57fe..32cd59a 100644
--- a/net/base/tcp_client_socket.h
+++ b/net/base/tcp_client_socket.h
@@ -5,9 +5,19 @@
#ifndef NET_BASE_TCP_CLIENT_SOCKET_H_
#define NET_BASE_TCP_CLIENT_SOCKET_H_
-#include <ws2tcpip.h>
+#include "build/build_config.h"
+#if defined(OS_WIN)
+#include <ws2tcpip.h>
#include "base/object_watcher.h"
+#elif defined(OS_POSIX)
+struct event; // From libevent
+#define SOCKET int
+#include "base/message_pump_libevent.h"
+#endif
+
+#include "base/completion_callback.h"
+#include "base/scoped_ptr.h"
#include "net/base/address_list.h"
#include "net/base/client_socket.h"
@@ -18,7 +28,12 @@ namespace net {
// NOTE: The implementation supports half duplex only. Read and Write calls
// must not be in progress at the same time.
class TCPClientSocket : public ClientSocket,
- public base::ObjectWatcher::Delegate {
+#if defined(OS_WIN)
+ public base::ObjectWatcher::Delegate
+#elif defined(OS_POSIX)
+ public base::MessagePumpLibevent::Watcher
+#endif
+{
public:
// The IP address(es) and port number to connect to. The TCP socket will try
// each IP address in the list until it succeeds in establishing a
@@ -34,31 +49,23 @@ class TCPClientSocket : public ClientSocket,
virtual bool IsConnected() const;
// Socket methods:
+ // Try to transfer buf_len bytes to/from socket.
+ // If a result is available now, return it; else call back later with one.
+ // Do not call again until a result is returned!
+ // If any bytes were transferred, the result is the byte count.
+ // On error, result is a negative error code; see net/base/net_error_list.h
+ // TODO: what would a zero return value indicate?
+ // TODO: support multiple outstanding requests?
virtual int Read(char* buf, int buf_len, CompletionCallback* callback);
virtual int Write(const char* buf, int buf_len, CompletionCallback* callback);
private:
- int CreateSocket(const struct addrinfo* ai);
- void DoCallback(int rv);
- void DidCompleteConnect();
- void DidCompleteIO();
-
- // base::ObjectWatcher::Delegate methods:
- virtual void OnObjectSignaled(HANDLE object);
-
SOCKET socket_;
- OVERLAPPED overlapped_;
- WSABUF buffer_;
-
- base::ObjectWatcher watcher_;
-
- CompletionCallback* callback_;
// The list of addresses we should try in order to establish a connection.
AddressList addresses_;
- // The addrinfo that we are attempting to use or NULL if all addrinfos have
- // been tried.
+ // Where we are in above list, or NULL if all addrinfos have been tried.
const struct addrinfo* current_ai_;
enum WaitState {
@@ -68,6 +75,34 @@ class TCPClientSocket : public ClientSocket,
WAITING_WRITE
};
WaitState wait_state_;
+
+#if defined(OS_WIN)
+ // base::ObjectWatcher::Delegate methods:
+ virtual void OnObjectSignaled(HANDLE object);
+
+ OVERLAPPED overlapped_;
+ WSABUF buffer_;
+
+ base::ObjectWatcher watcher_;
+#elif defined(OS_POSIX)
+ // The socket's libevent wrapper
+ scoped_ptr<event> event_;
+
+ // Called by MessagePumpLibevent when the socket is ready to do I/O
+ void OnSocketReady(short flags);
+
+ // The buffer used by OnSocketReady to retry Read and Write requests
+ char* buf_;
+ int buf_len_;
+#endif
+
+ // External callback; called when read or write is complete.
+ CompletionCallback* callback_;
+
+ int CreateSocket(const struct addrinfo* ai);
+ void DoCallback(int rv);
+ void DidCompleteConnect();
+ void DidCompleteIO();
};
} // namespace net
diff --git a/net/base/tcp_client_socket_libevent.cc b/net/base/tcp_client_socket_libevent.cc
new file mode 100644
index 0000000..7f815c3
--- /dev/null
+++ b/net/base/tcp_client_socket_libevent.cc
@@ -0,0 +1,288 @@
+// Copyright (c) 2006-2008 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "net/base/tcp_client_socket.h"
+
+#include <errno.h>
+#include <fcntl.h>
+#include <netdb.h>
+#include <sys/socket.h>
+
+#include "base/message_loop.h"
+#include "net/base/net_errors.h"
+#include "third_party/libevent/event.h"
+
+
+namespace net {
+
+const int kInvalidSocket = -1;
+
+// Return 0 on success
+// Too small a function to bother putting in a library?
+static int SetNonBlocking(int fd)
+{
+ int flags = fcntl(fd, F_GETFL, 0);
+ if (-1 == flags)
+ flags = 0;
+ return fcntl(fd, F_SETFL, flags | O_NONBLOCK);
+}
+
+// Convert values from <errno.h> to values from "net/base/net_errors.h"
+static int MapPosixError(int err) {
+ // There are numerous posix error codes, but these are the ones we thus far
+ // find interesting.
+ // TODO(port): fill this with a real conversion table
+ switch (err) {
+ case EWOULDBLOCK: return ERR_IO_PENDING;
+ default:
+ return ERR_FAILED;
+ }
+}
+
+//-----------------------------------------------------------------------------
+
+TCPClientSocket::TCPClientSocket(const AddressList& addresses)
+ : socket_(kInvalidSocket),
+ addresses_(addresses),
+ current_ai_(addresses_.head()),
+ wait_state_(NOT_WAITING),
+ event_(new event) {
+}
+
+TCPClientSocket::~TCPClientSocket() {
+ Disconnect();
+}
+
+int TCPClientSocket::Connect(CompletionCallback* callback) {
+
+ // If already connected, then just return OK.
+ if (socket_ != kInvalidSocket)
+ return OK;
+
+ DCHECK(wait_state_ == NOT_WAITING);
+
+ const addrinfo* ai = current_ai_;
+ DCHECK(ai);
+
+ int rv = CreateSocket(ai);
+ if (rv != OK)
+ return rv;
+
+ if (!connect(socket_, ai->ai_addr, static_cast<int>(ai->ai_addrlen))) {
+ // Connected without waiting!
+ return OK;
+ }
+
+ // Synchronous operation not supported
+ DCHECK(callback);
+
+ if (errno != EINPROGRESS && errno != EWOULDBLOCK) {
+ LOG(ERROR) << "connect failed: " << errno;
+ return MapPosixError(errno);
+ }
+
+ // Initialize event_ and link it to our MessagePump.
+ // POLLOUT is set if the connection is established.
+ // POLLIN is set if the connection fails,
+ // so select for both read and write.
+ MessageLoopForIO::current()->WatchSocket(
+ socket_, EV_READ|EV_WRITE|EV_PERSIST, event_.get(), this);
+
+ wait_state_ = WAITING_CONNECT;
+ callback_ = callback;
+ return ERR_IO_PENDING;
+}
+
+int TCPClientSocket::ReconnectIgnoringLastError(CompletionCallback* callback) {
+ // No ignorable errors!
+ return ERR_FAILED;
+}
+
+void TCPClientSocket::Disconnect() {
+ if (socket_ == kInvalidSocket)
+ return;
+
+ MessageLoopForIO::current()->UnwatchSocket(event_.get());
+ close(socket_);
+ socket_ = kInvalidSocket;
+
+ // Reset for next time.
+ current_ai_ = addresses_.head();
+}
+
+bool TCPClientSocket::IsConnected() const {
+ if (socket_ == kInvalidSocket || wait_state_ == WAITING_CONNECT)
+ return false;
+
+ // Check if connection is alive.
+ char c;
+ int rv = recv(socket_, &c, 1, MSG_PEEK);
+ if (rv == 0)
+ return false;
+
+ return true;
+}
+
+int TCPClientSocket::Read(char* buf,
+ int buf_len,
+ CompletionCallback* callback) {
+ DCHECK(socket_ != kInvalidSocket);
+ DCHECK(wait_state_ == NOT_WAITING);
+ DCHECK(!callback_);
+ // Synchronous operation not supported
+ DCHECK(callback);
+ DCHECK(buf_len > 0);
+
+ int nread = read(socket_, buf, buf_len);
+ if (nread > 0) {
+ return nread;
+ }
+ if (nread == -1 && errno != EWOULDBLOCK)
+ return MapPosixError(errno);
+
+ MessageLoopForIO::current()->WatchSocket(
+ socket_, EV_READ|EV_PERSIST, event_.get(), this);
+
+ buf_ = buf;
+ buf_len_ = buf_len;
+ wait_state_ = WAITING_READ;
+ callback_ = callback;
+ return ERR_IO_PENDING;
+}
+
+int TCPClientSocket::Write(const char* buf,
+ int buf_len,
+ CompletionCallback* callback) {
+ DCHECK(socket_ != kInvalidSocket);
+ DCHECK(wait_state_ == NOT_WAITING);
+ DCHECK(!callback_);
+ // Synchronous operation not supported
+ DCHECK(callback);
+ DCHECK(buf_len > 0);
+
+ int nwrite = write(socket_, buf, buf_len);
+ if (nwrite > 0) {
+ return nwrite;
+ }
+ if (nwrite == -1 && errno != EWOULDBLOCK)
+ return MapPosixError(errno);
+
+ MessageLoopForIO::current()->WatchSocket(
+ socket_, EV_WRITE|EV_PERSIST, event_.get(), this);
+
+ buf_ = const_cast<char*>(buf);
+ buf_len_ = buf_len;
+ wait_state_ = WAITING_WRITE;
+ callback_ = callback;
+ return ERR_IO_PENDING;
+}
+
+int TCPClientSocket::CreateSocket(const addrinfo* ai) {
+ socket_ = socket(ai->ai_family, ai->ai_socktype, ai->ai_protocol);
+ if (socket_ == kInvalidSocket)
+ return MapPosixError(errno);
+
+ // All our socket I/O is nonblocking
+ if (SetNonBlocking(socket_))
+ return MapPosixError(errno);
+
+ return OK;
+}
+
+void TCPClientSocket::DoCallback(int rv) {
+ DCHECK(rv != ERR_IO_PENDING);
+ DCHECK(callback_);
+
+ // since Run may result in Read being called, clear callback_ up front.
+ CompletionCallback* c = callback_;
+ callback_ = NULL;
+ c->Run(rv);
+}
+
+void TCPClientSocket::DidCompleteConnect() {
+ int result = ERR_UNEXPECTED;
+
+ wait_state_ = NOT_WAITING;
+
+ // Check to see if connect succeeded
+ int error_code = -1;
+ socklen_t len = sizeof(error_code);
+ if (getsockopt(socket_, SOL_SOCKET, SO_ERROR,
+ reinterpret_cast<char*>(&error_code), &len) < 0) {
+ result = MapPosixError(errno);
+ } else if (error_code == EINPROGRESS) {
+ result = ERR_IO_PENDING;
+ // And await next callback. Haven't seen this case yet myself.
+ } else if (current_ai_->ai_next && (
+ error_code == EADDRNOTAVAIL ||
+ error_code == EAFNOSUPPORT ||
+ error_code == ECONNREFUSED ||
+ error_code == ENETUNREACH ||
+ error_code == EHOSTUNREACH ||
+ error_code == ETIMEDOUT)) {
+ // This address failed, try next one in list.
+ const addrinfo* next = current_ai_->ai_next;
+ Disconnect();
+ current_ai_ = next;
+ result = Connect(callback_);
+ } else if (error_code) {
+ result = MapPosixError(error_code);
+ } else {
+ result = 0;
+ MessageLoopForIO::current()->UnwatchSocket(event_.get());
+ }
+
+ if (result != ERR_IO_PENDING)
+ DoCallback(result);
+}
+
+void TCPClientSocket::DidCompleteIO() {
+ int bytes_transferred;
+ switch (wait_state_) {
+ case WAITING_READ:
+ bytes_transferred = read(socket_, buf_, buf_len_);
+ break;
+ case WAITING_WRITE:
+ bytes_transferred = write(socket_, buf_, buf_len_);
+ break;
+ default:
+ NOTREACHED();
+ }
+
+ int result;
+ if (bytes_transferred > 0) {
+ result = bytes_transferred;
+ } else if (bytes_transferred == 0) {
+ // TODO(port): can we tell why it closed, and return a more informative
+ // message? And why does the unit test want to see zero?
+ //result = ERR_CONNECTION_CLOSED;
+ result = 0;
+ } else {
+ result = MapPosixError(errno);
+ }
+
+ if (result != ERR_IO_PENDING) {
+ wait_state_ = NOT_WAITING;
+ MessageLoopForIO::current()->UnwatchSocket(event_.get());
+ DoCallback(result);
+ }
+}
+
+void TCPClientSocket::OnSocketReady(short flags) {
+ switch (wait_state_) {
+ case WAITING_CONNECT:
+ DidCompleteConnect();
+ break;
+ case WAITING_READ:
+ case WAITING_WRITE:
+ DidCompleteIO();
+ break;
+ default:
+ NOTREACHED();
+ break;
+ }
+}
+
+} // namespace net
+