diff options
author | dkegel@google.com <dkegel@google.com@0039d316-1c4b-4281-b951-d872f2087c98> | 2008-09-18 00:10:38 +0000 |
---|---|---|
committer | dkegel@google.com <dkegel@google.com@0039d316-1c4b-4281-b951-d872f2087c98> | 2008-09-18 00:10:38 +0000 |
commit | dab8619b4848bb10dfe78a3f0c59d6e661ec9610 (patch) | |
tree | 187818b2b685f5545832b45bbd20d01b53aca6d5 | |
parent | 0fe41aecdbfd97633bda1a892b2f3efb60f1d3ed (diff) | |
download | chromium_src-dab8619b4848bb10dfe78a3f0c59d6e661ec9610.zip chromium_src-dab8619b4848bb10dfe78a3f0c59d6e661ec9610.tar.gz chromium_src-dab8619b4848bb10dfe78a3f0c59d6e661ec9610.tar.bz2 |
Make tcp_client_socket_unittest pass on Linux.
Requires another changeset that puts libevent in third_party;
I'll upload that next.
This is not the final word; it makes too many syscalls
per read. But it's a start.
Review URL: http://codereview.chromium.org/3202
git-svn-id: svn://svn.chromium.org/chrome/trunk/src@2346 0039d316-1c4b-4281-b951-d872f2087c98
-rw-r--r-- | base/SConscript | 2 | ||||
-rw-r--r-- | base/message_loop.cc | 22 | ||||
-rw-r--r-- | base/message_loop.h | 15 | ||||
-rw-r--r-- | base/message_pump_libevent.cc | 179 | ||||
-rw-r--r-- | base/message_pump_libevent.h | 89 | ||||
-rw-r--r-- | build/SConscript.main | 5 | ||||
-rw-r--r-- | chrome/SConscript | 1 | ||||
-rw-r--r-- | net/SConscript | 4 | ||||
-rw-r--r-- | net/base/tcp_client_socket.h | 71 | ||||
-rw-r--r-- | net/base/tcp_client_socket_libevent.cc | 288 |
10 files changed, 656 insertions, 20 deletions
diff --git a/base/SConscript b/base/SConscript index 6012b38..dedcc3c 100644 --- a/base/SConscript +++ b/base/SConscript @@ -159,6 +159,7 @@ if env['PLATFORM'] == 'posix': 'base_paths_linux.cc', 'file_util_linux.cc', 'hmac_nss.cc', + 'message_pump_libevent.cc', 'nss_init.cc', 'sys_string_conversions_linux.cc', 'worker_pool.cc', @@ -190,6 +191,7 @@ env_tests.Prepend( LIBS = [ 'base', 'base_gfx', + 'event', 'gtest', 'icuuc', 'libpng', diff --git a/base/message_loop.cc b/base/message_loop.cc index 4b95532..48e05eb 100644 --- a/base/message_loop.cc +++ b/base/message_loop.cc @@ -13,6 +13,10 @@ #include "base/string_util.h" #include "base/thread_local.h" +#if defined(OS_POSIX) +#include "base/message_pump_libevent.h" +#endif + // A lazily created thread local storage for quick access to a thread's message // loop, if one exists. This should be safe and free of static constructors. static base::LazyInstance<base::ThreadLocalPointer<MessageLoop> > lazy_tls_ptr( @@ -78,6 +82,12 @@ MessageLoop::MessageLoop(Type type) } else { pump_ = new base::MessagePumpWin(); } +#elif defined(OS_POSIX) + if (type_ == TYPE_IO) { + pump_ = new base::MessagePumpLibevent(); + } else { + pump_ = new base::MessagePumpDefault(); + } #else pump_ = new base::MessagePumpDefault(); #endif @@ -561,4 +571,14 @@ void MessageLoopForIO::WatchObject(HANDLE object, Watcher* watcher) { pump_win()->WatchObject(object, watcher); } -#endif // defined(OS_WIN) +#elif defined(OS_POSIX) + +void MessageLoopForIO::WatchSocket(int socket, short interest_mask, + struct event* e, Watcher* watcher) { + pump_libevent()->WatchSocket(socket, interest_mask, e, watcher); +} + +void MessageLoopForIO::UnwatchSocket(struct event* e) { + pump_libevent()->UnwatchSocket(e); +} +#endif diff --git a/base/message_loop.h b/base/message_loop.h index a65a2e8..51093c6 100644 --- a/base/message_loop.h +++ b/base/message_loop.h @@ -21,6 +21,8 @@ // We need this to declare base::MessagePumpWin::Dispatcher, which we should // really just eliminate. #include "base/message_pump_win.h" +#elif defined(OS_POSIX) +#include "base/message_pump_libevent.h" #endif // A MessageLoop is used to process events for a particular thread. There is @@ -274,6 +276,11 @@ class MessageLoop : public base::MessagePump::Delegate { base::MessagePumpWin* pump_win() { return static_cast<base::MessagePumpWin*>(pump_.get()); } +#elif defined(OS_POSIX) + base::MessagePumpLibevent* pump_libevent() { + return static_cast<base::MessagePumpLibevent*>(pump_.get()); + } + protected: #endif // A function to encapsulate all the exception handling capability in the @@ -450,6 +457,14 @@ class MessageLoopForIO : public MessageLoop { // Please see MessagePumpWin for definitions of these methods. void WatchObject(HANDLE object, Watcher* watcher); + +#elif defined(OS_POSIX) + typedef base::MessagePumpLibevent::Watcher Watcher; + + // Please see MessagePumpLibevent for definitions of these methods. + void WatchSocket(int socket, short interest_mask, + struct event* e, Watcher* watcher); + void UnwatchSocket(struct event* e); #endif // defined(OS_WIN) }; diff --git a/base/message_pump_libevent.cc b/base/message_pump_libevent.cc new file mode 100644 index 0000000..38e7db6 --- /dev/null +++ b/base/message_pump_libevent.cc @@ -0,0 +1,179 @@ +// Copyright (c) 2008 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "base/message_pump_libevent.h" + +#include "base/logging.h" +#include "base/time.h" +#include "third_party/libevent/event.h" + +#include <fcntl.h> + +namespace base { + +// Return 0 on success +// Too small a function to bother putting in a library? +static int SetNonBlocking(int fd) +{ + int flags = fcntl(fd, F_GETFL, 0); + if (-1 == flags) + flags = 0; + return fcntl(fd, F_SETFL, flags | O_NONBLOCK); +} + +// Called if a byte is received on the wakeup pipe. +void MessagePumpLibevent::OnWakeup(int socket, short flags, void* context) { + + base::MessagePumpLibevent* that = + static_cast<base::MessagePumpLibevent*>(context); + DCHECK(that->wakeup_pipe_out_ == socket); + + // Remove and discard the wakeup byte. + char buf; + int nread = read(socket, &buf, 1); + DCHECK(nread == 1); + // Tell libevent to break out of inner loop. + event_base_loopbreak(that->event_base_); +} + +MessagePumpLibevent::MessagePumpLibevent() + : keep_running_(true), + in_run_(false), + event_base_(event_base_new()), + wakeup_pipe_in_(-1), + wakeup_pipe_out_(-1) { + if (!Init()) + NOTREACHED(); +} + +bool MessagePumpLibevent::Init() { + int fds[2]; + if (pipe(fds)) + return false; + if (SetNonBlocking(fds[0])) + return false; + if (SetNonBlocking(fds[1])) + return false; + wakeup_pipe_out_ = fds[0]; + wakeup_pipe_in_ = fds[1]; + + wakeup_event_ = new event; + event_set(wakeup_event_, wakeup_pipe_out_, EV_READ | EV_PERSIST, + OnWakeup, this); + event_base_set(event_base_, wakeup_event_); + + if (event_add(wakeup_event_, 0)) + return false; + return true; +} + +MessagePumpLibevent::~MessagePumpLibevent() { + DCHECK(wakeup_event_); + DCHECK(event_base_); + event_del(wakeup_event_); + delete wakeup_event_; + event_base_free(event_base_); +} + +void MessagePumpLibevent::WatchSocket(int socket, short interest_mask, + event* e, Watcher* watcher) { + + // Set current interest mask and message pump for this event + event_set(e, socket, interest_mask, OnReadinessNotification, watcher); + + // Tell libevent which message pump this socket will belong to when we add it. + event_base_set(event_base_, e); + + // Add this socket to the list of monitored sockets. + if (event_add(e, NULL)) + NOTREACHED(); +} + +void MessagePumpLibevent::UnwatchSocket(event* e) { + // Remove this socket from the list of monitored sockets. + if (event_del(e)) + NOTREACHED(); +} + +void MessagePumpLibevent::OnReadinessNotification(int socket, short flags, + void* context) { + // The given socket is ready for I/O. + // Tell the owner what kind of I/O the socket is ready for. + Watcher* watcher = static_cast<Watcher*>(context); + watcher->OnSocketReady(flags); +} + +// Reentrant! +void MessagePumpLibevent::Run(Delegate* delegate) { + DCHECK(keep_running_) << "Quit must have been called outside of Run!"; + + bool old_in_run = in_run_; + in_run_ = true; + + for (;;) { + bool did_work = delegate->DoWork(); + if (!keep_running_) + break; + + did_work |= delegate->DoDelayedWork(&delayed_work_time_); + if (!keep_running_) + break; + + if (did_work) + continue; + + did_work = delegate->DoIdleWork(); + if (!keep_running_) + break; + + if (did_work) + continue; + + // EVLOOP_ONCE tells libevent to only block once, + // but to service all pending events when it wakes up. + if (delayed_work_time_.is_null()) { + event_base_loop(event_base_, EVLOOP_ONCE); + } else { + TimeDelta delay = delayed_work_time_ - Time::Now(); + if (delay > TimeDelta()) { + struct timeval poll_tv; + poll_tv.tv_sec = delay.InSeconds(); + poll_tv.tv_usec = delay.InMicroseconds() % Time::kMicrosecondsPerSecond; + event_base_loopexit(event_base_, &poll_tv); + event_base_loop(event_base_, EVLOOP_ONCE); + } else { + // It looks like delayed_work_time_ indicates a time in the past, so we + // need to call DoDelayedWork now. + delayed_work_time_ = Time(); + } + } + } + + keep_running_ = true; + in_run_ = old_in_run; +} + +void MessagePumpLibevent::Quit() { + DCHECK(in_run_); + // Tell both libevent and Run that they should break out of their loops. + keep_running_ = false; + ScheduleWork(); +} + +void MessagePumpLibevent::ScheduleWork() { + // Tell libevent (in a threadsafe way) that it should break out of its loop. + char buf = 0; + int nwrite = write(wakeup_pipe_in_, &buf, 1); + DCHECK(nwrite == 1); +} + +void MessagePumpLibevent::ScheduleDelayedWork(const Time& delayed_work_time) { + // We know that we can't be blocked on Wait right now since this method can + // only be called on the same thread as Run, so we only need to update our + // record of how long to sleep when we do sleep. + delayed_work_time_ = delayed_work_time; +} + +} // namespace base + diff --git a/base/message_pump_libevent.h b/base/message_pump_libevent.h new file mode 100644 index 0000000..7534e50 --- /dev/null +++ b/base/message_pump_libevent.h @@ -0,0 +1,89 @@ +// Copyright (c) 2006-2008 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#ifndef BASE_MESSAGE_PUMP_LIBEVENT_H_ +#define BASE_MESSAGE_PUMP_LIBEVENT_H_ + +#include "base/message_pump.h" +#include "base/time.h" + +// Declare structs we need from libevent.h rather than including it +struct event_base; +struct event; + +namespace base { + +// Class to monitor sockets and issue callbacks when sockets are ready for I/O +// TODO(dkegel): add support for background file IO somehow +class MessagePumpLibevent : public MessagePump { + public: + // Used with WatchObject to asynchronously monitor the I/O readiness of a + // socket. + class Watcher { + public: + virtual ~Watcher() {} + // Called from MessageLoop::Run when a ready socket is detected. + virtual void OnSocketReady(short eventmask) = 0; + }; + + MessagePumpLibevent(); + virtual ~MessagePumpLibevent(); + + // Have the current thread's message loop watch for a ready socket. + // Caller must provide a struct event for this socket for libevent's use. + // The event and interest_mask fields are defined in libevent. + // Returns true on success. + // TODO(dkegel): hide libevent better; abstraction still too leaky + // TODO(dkegel): better error handing + // TODO(dkegel): switch to edge-triggered readiness notification + void WatchSocket(int socket, short interest_mask, event* e, Watcher*); + + // Stop watching a socket. + // Event was previously initialized by WatchSocket. + void UnwatchSocket(event* e); + + // MessagePump methods: + virtual void Run(Delegate* delegate); + virtual void Quit(); + virtual void ScheduleWork(); + virtual void ScheduleDelayedWork(const Time& delayed_work_time); + + private: + + // Risky part of constructor. Returns true on success. + bool Init(); + + // This flag is set to false when Run should return. + bool keep_running_; + + // This flag is set when inside Run. + bool in_run_; + + // The time at which we should call DoDelayedWork. + Time delayed_work_time_; + + // Libevent dispatcher. Watches all sockets registered with it, and sends + // readiness callbacks when a socket is ready for I/O. + event_base* event_base_; + + // Called by libevent to tell us a registered socket is ready + static void OnReadinessNotification(int socket, short flags, void* context); + + // Unix pipe used to implement ScheduleWork() + // ... callback; called by libevent inside Run() when pipe is ready to read + static void OnWakeup(int socket, short flags, void* context); + // ... write end; ScheduleWork() writes a single byte to it + int wakeup_pipe_in_; + // ... read end; OnWakeup reads it and then breaks Run() out of its sleep + int wakeup_pipe_out_; + // ... libevent wrapper for read end + event* wakeup_event_; + + DISALLOW_COPY_AND_ASSIGN(MessagePumpLibevent); +}; + +} // namespace base + +#endif // BASE_MESSAGE_PUMP_LIBEVENT_H_ + diff --git a/build/SConscript.main b/build/SConscript.main index 25b8fec..df6e80d 100644 --- a/build/SConscript.main +++ b/build/SConscript.main @@ -63,6 +63,7 @@ env = Environment( BSPATCH_DIR = '$THIRD_PARTY_DIR/bspatch', BZIP2_DIR = '$THIRD_PARTY_DIR/bzip2', ICU38_DIR = '$THIRD_PARTY_DIR/icu38', + LIBEVENT_DIR = '$THIRD_PARTY_DIR/libevent', LIBJPEG_DIR = '$THIRD_PARTY_DIR/libjpeg', LIBPNG_DIR = '$THIRD_PARTY_DIR/libpng', LIBXML_DIR = '$THIRD_PARTY_DIR/libxml', @@ -500,6 +501,10 @@ if LoadComponent('third_party'): '$LIBXML_DIR/SConscript', '$LIBXSLT_DIR/SConscript', ]) + if env['PLATFORM'] == 'posix': + sconscripts.extend([ + '$LIBEVENT_DIR/SConscript', + ]) # This is temporary until we get this lib to build on other platforms. if env['PLATFORM'] == 'win32': sconscripts.extend([ diff --git a/chrome/SConscript b/chrome/SConscript index 282e6c2..cef80b4c 100644 --- a/chrome/SConscript +++ b/chrome/SConscript @@ -186,6 +186,7 @@ libs = [ 'plugin/plugin.lib', 'renderer/renderer.lib', 'third_party/hunspell/hunspell.lib', + 'third_party/libevent/libevent.lib', 'third_party/sqlite/sqlite.lib', 'views/views.lib', '$V8_DIR/v8.lib', diff --git a/net/SConscript b/net/SConscript index b735894..98d0730 100644 --- a/net/SConscript +++ b/net/SConscript @@ -132,6 +132,7 @@ if env['PLATFORM'] == 'posix': input_files.extend([ # TODO(tc): gnome-vfs? xdgmime? /etc/mime.types? 'base/platform_mime_util_linux.cc', + 'base/tcp_client_socket_libevent.cc', ]) if env['PLATFORM'] in ('darwin', 'posix'): @@ -168,6 +169,7 @@ env_tests.Prepend( 'net', # net must come before base and modp_b64 'bzip2', # bzip2 must come before base 'base', + 'event', 'googleurl', 'gtest', 'icuuc', @@ -223,6 +225,7 @@ unittest_files = [ 'base/net_util_unittest.cc', 'base/registry_controlled_domain_unittest.cc', 'base/run_all_unittests.cc', + 'base/tcp_client_socket_unittest.cc', 'base/test_completion_callback_unittest.cc', 'disk_cache/addr_unittest.cc', 'disk_cache/block_files_unittest.cc', @@ -241,7 +244,6 @@ if env['PLATFORM'] == 'win32': 'base/directory_lister_unittest.cc', 'base/ssl_config_service_unittest.cc', 'base/ssl_client_socket_unittest.cc', - 'base/tcp_client_socket_unittest.cc', 'base/wininet_util_unittest.cc', 'disk_cache/backend_unittest.cc', 'http/http_cache_unittest.cc', diff --git a/net/base/tcp_client_socket.h b/net/base/tcp_client_socket.h index 06a57fe..32cd59a 100644 --- a/net/base/tcp_client_socket.h +++ b/net/base/tcp_client_socket.h @@ -5,9 +5,19 @@ #ifndef NET_BASE_TCP_CLIENT_SOCKET_H_ #define NET_BASE_TCP_CLIENT_SOCKET_H_ -#include <ws2tcpip.h> +#include "build/build_config.h" +#if defined(OS_WIN) +#include <ws2tcpip.h> #include "base/object_watcher.h" +#elif defined(OS_POSIX) +struct event; // From libevent +#define SOCKET int +#include "base/message_pump_libevent.h" +#endif + +#include "base/completion_callback.h" +#include "base/scoped_ptr.h" #include "net/base/address_list.h" #include "net/base/client_socket.h" @@ -18,7 +28,12 @@ namespace net { // NOTE: The implementation supports half duplex only. Read and Write calls // must not be in progress at the same time. class TCPClientSocket : public ClientSocket, - public base::ObjectWatcher::Delegate { +#if defined(OS_WIN) + public base::ObjectWatcher::Delegate +#elif defined(OS_POSIX) + public base::MessagePumpLibevent::Watcher +#endif +{ public: // The IP address(es) and port number to connect to. The TCP socket will try // each IP address in the list until it succeeds in establishing a @@ -34,31 +49,23 @@ class TCPClientSocket : public ClientSocket, virtual bool IsConnected() const; // Socket methods: + // Try to transfer buf_len bytes to/from socket. + // If a result is available now, return it; else call back later with one. + // Do not call again until a result is returned! + // If any bytes were transferred, the result is the byte count. + // On error, result is a negative error code; see net/base/net_error_list.h + // TODO: what would a zero return value indicate? + // TODO: support multiple outstanding requests? virtual int Read(char* buf, int buf_len, CompletionCallback* callback); virtual int Write(const char* buf, int buf_len, CompletionCallback* callback); private: - int CreateSocket(const struct addrinfo* ai); - void DoCallback(int rv); - void DidCompleteConnect(); - void DidCompleteIO(); - - // base::ObjectWatcher::Delegate methods: - virtual void OnObjectSignaled(HANDLE object); - SOCKET socket_; - OVERLAPPED overlapped_; - WSABUF buffer_; - - base::ObjectWatcher watcher_; - - CompletionCallback* callback_; // The list of addresses we should try in order to establish a connection. AddressList addresses_; - // The addrinfo that we are attempting to use or NULL if all addrinfos have - // been tried. + // Where we are in above list, or NULL if all addrinfos have been tried. const struct addrinfo* current_ai_; enum WaitState { @@ -68,6 +75,34 @@ class TCPClientSocket : public ClientSocket, WAITING_WRITE }; WaitState wait_state_; + +#if defined(OS_WIN) + // base::ObjectWatcher::Delegate methods: + virtual void OnObjectSignaled(HANDLE object); + + OVERLAPPED overlapped_; + WSABUF buffer_; + + base::ObjectWatcher watcher_; +#elif defined(OS_POSIX) + // The socket's libevent wrapper + scoped_ptr<event> event_; + + // Called by MessagePumpLibevent when the socket is ready to do I/O + void OnSocketReady(short flags); + + // The buffer used by OnSocketReady to retry Read and Write requests + char* buf_; + int buf_len_; +#endif + + // External callback; called when read or write is complete. + CompletionCallback* callback_; + + int CreateSocket(const struct addrinfo* ai); + void DoCallback(int rv); + void DidCompleteConnect(); + void DidCompleteIO(); }; } // namespace net diff --git a/net/base/tcp_client_socket_libevent.cc b/net/base/tcp_client_socket_libevent.cc new file mode 100644 index 0000000..7f815c3 --- /dev/null +++ b/net/base/tcp_client_socket_libevent.cc @@ -0,0 +1,288 @@ +// Copyright (c) 2006-2008 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "net/base/tcp_client_socket.h" + +#include <errno.h> +#include <fcntl.h> +#include <netdb.h> +#include <sys/socket.h> + +#include "base/message_loop.h" +#include "net/base/net_errors.h" +#include "third_party/libevent/event.h" + + +namespace net { + +const int kInvalidSocket = -1; + +// Return 0 on success +// Too small a function to bother putting in a library? +static int SetNonBlocking(int fd) +{ + int flags = fcntl(fd, F_GETFL, 0); + if (-1 == flags) + flags = 0; + return fcntl(fd, F_SETFL, flags | O_NONBLOCK); +} + +// Convert values from <errno.h> to values from "net/base/net_errors.h" +static int MapPosixError(int err) { + // There are numerous posix error codes, but these are the ones we thus far + // find interesting. + // TODO(port): fill this with a real conversion table + switch (err) { + case EWOULDBLOCK: return ERR_IO_PENDING; + default: + return ERR_FAILED; + } +} + +//----------------------------------------------------------------------------- + +TCPClientSocket::TCPClientSocket(const AddressList& addresses) + : socket_(kInvalidSocket), + addresses_(addresses), + current_ai_(addresses_.head()), + wait_state_(NOT_WAITING), + event_(new event) { +} + +TCPClientSocket::~TCPClientSocket() { + Disconnect(); +} + +int TCPClientSocket::Connect(CompletionCallback* callback) { + + // If already connected, then just return OK. + if (socket_ != kInvalidSocket) + return OK; + + DCHECK(wait_state_ == NOT_WAITING); + + const addrinfo* ai = current_ai_; + DCHECK(ai); + + int rv = CreateSocket(ai); + if (rv != OK) + return rv; + + if (!connect(socket_, ai->ai_addr, static_cast<int>(ai->ai_addrlen))) { + // Connected without waiting! + return OK; + } + + // Synchronous operation not supported + DCHECK(callback); + + if (errno != EINPROGRESS && errno != EWOULDBLOCK) { + LOG(ERROR) << "connect failed: " << errno; + return MapPosixError(errno); + } + + // Initialize event_ and link it to our MessagePump. + // POLLOUT is set if the connection is established. + // POLLIN is set if the connection fails, + // so select for both read and write. + MessageLoopForIO::current()->WatchSocket( + socket_, EV_READ|EV_WRITE|EV_PERSIST, event_.get(), this); + + wait_state_ = WAITING_CONNECT; + callback_ = callback; + return ERR_IO_PENDING; +} + +int TCPClientSocket::ReconnectIgnoringLastError(CompletionCallback* callback) { + // No ignorable errors! + return ERR_FAILED; +} + +void TCPClientSocket::Disconnect() { + if (socket_ == kInvalidSocket) + return; + + MessageLoopForIO::current()->UnwatchSocket(event_.get()); + close(socket_); + socket_ = kInvalidSocket; + + // Reset for next time. + current_ai_ = addresses_.head(); +} + +bool TCPClientSocket::IsConnected() const { + if (socket_ == kInvalidSocket || wait_state_ == WAITING_CONNECT) + return false; + + // Check if connection is alive. + char c; + int rv = recv(socket_, &c, 1, MSG_PEEK); + if (rv == 0) + return false; + + return true; +} + +int TCPClientSocket::Read(char* buf, + int buf_len, + CompletionCallback* callback) { + DCHECK(socket_ != kInvalidSocket); + DCHECK(wait_state_ == NOT_WAITING); + DCHECK(!callback_); + // Synchronous operation not supported + DCHECK(callback); + DCHECK(buf_len > 0); + + int nread = read(socket_, buf, buf_len); + if (nread > 0) { + return nread; + } + if (nread == -1 && errno != EWOULDBLOCK) + return MapPosixError(errno); + + MessageLoopForIO::current()->WatchSocket( + socket_, EV_READ|EV_PERSIST, event_.get(), this); + + buf_ = buf; + buf_len_ = buf_len; + wait_state_ = WAITING_READ; + callback_ = callback; + return ERR_IO_PENDING; +} + +int TCPClientSocket::Write(const char* buf, + int buf_len, + CompletionCallback* callback) { + DCHECK(socket_ != kInvalidSocket); + DCHECK(wait_state_ == NOT_WAITING); + DCHECK(!callback_); + // Synchronous operation not supported + DCHECK(callback); + DCHECK(buf_len > 0); + + int nwrite = write(socket_, buf, buf_len); + if (nwrite > 0) { + return nwrite; + } + if (nwrite == -1 && errno != EWOULDBLOCK) + return MapPosixError(errno); + + MessageLoopForIO::current()->WatchSocket( + socket_, EV_WRITE|EV_PERSIST, event_.get(), this); + + buf_ = const_cast<char*>(buf); + buf_len_ = buf_len; + wait_state_ = WAITING_WRITE; + callback_ = callback; + return ERR_IO_PENDING; +} + +int TCPClientSocket::CreateSocket(const addrinfo* ai) { + socket_ = socket(ai->ai_family, ai->ai_socktype, ai->ai_protocol); + if (socket_ == kInvalidSocket) + return MapPosixError(errno); + + // All our socket I/O is nonblocking + if (SetNonBlocking(socket_)) + return MapPosixError(errno); + + return OK; +} + +void TCPClientSocket::DoCallback(int rv) { + DCHECK(rv != ERR_IO_PENDING); + DCHECK(callback_); + + // since Run may result in Read being called, clear callback_ up front. + CompletionCallback* c = callback_; + callback_ = NULL; + c->Run(rv); +} + +void TCPClientSocket::DidCompleteConnect() { + int result = ERR_UNEXPECTED; + + wait_state_ = NOT_WAITING; + + // Check to see if connect succeeded + int error_code = -1; + socklen_t len = sizeof(error_code); + if (getsockopt(socket_, SOL_SOCKET, SO_ERROR, + reinterpret_cast<char*>(&error_code), &len) < 0) { + result = MapPosixError(errno); + } else if (error_code == EINPROGRESS) { + result = ERR_IO_PENDING; + // And await next callback. Haven't seen this case yet myself. + } else if (current_ai_->ai_next && ( + error_code == EADDRNOTAVAIL || + error_code == EAFNOSUPPORT || + error_code == ECONNREFUSED || + error_code == ENETUNREACH || + error_code == EHOSTUNREACH || + error_code == ETIMEDOUT)) { + // This address failed, try next one in list. + const addrinfo* next = current_ai_->ai_next; + Disconnect(); + current_ai_ = next; + result = Connect(callback_); + } else if (error_code) { + result = MapPosixError(error_code); + } else { + result = 0; + MessageLoopForIO::current()->UnwatchSocket(event_.get()); + } + + if (result != ERR_IO_PENDING) + DoCallback(result); +} + +void TCPClientSocket::DidCompleteIO() { + int bytes_transferred; + switch (wait_state_) { + case WAITING_READ: + bytes_transferred = read(socket_, buf_, buf_len_); + break; + case WAITING_WRITE: + bytes_transferred = write(socket_, buf_, buf_len_); + break; + default: + NOTREACHED(); + } + + int result; + if (bytes_transferred > 0) { + result = bytes_transferred; + } else if (bytes_transferred == 0) { + // TODO(port): can we tell why it closed, and return a more informative + // message? And why does the unit test want to see zero? + //result = ERR_CONNECTION_CLOSED; + result = 0; + } else { + result = MapPosixError(errno); + } + + if (result != ERR_IO_PENDING) { + wait_state_ = NOT_WAITING; + MessageLoopForIO::current()->UnwatchSocket(event_.get()); + DoCallback(result); + } +} + +void TCPClientSocket::OnSocketReady(short flags) { + switch (wait_state_) { + case WAITING_CONNECT: + DidCompleteConnect(); + break; + case WAITING_READ: + case WAITING_WRITE: + DidCompleteIO(); + break; + default: + NOTREACHED(); + break; + } +} + +} // namespace net + |