diff options
author | jeremy@chromium.org <jeremy@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98> | 2008-12-12 19:09:47 +0000 |
---|---|---|
committer | jeremy@chromium.org <jeremy@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98> | 2008-12-12 19:09:47 +0000 |
commit | eb8c76326fb39a429dc8bda38da22a86bb6802b3 (patch) | |
tree | 2155abfd972894d6ca5bfdf11a8fed37da065d4c | |
parent | b3d64d5c0d00415968935942b00d37704cd7507c (diff) | |
download | chromium_src-eb8c76326fb39a429dc8bda38da22a86bb6802b3.zip chromium_src-eb8c76326fb39a429dc8bda38da22a86bb6802b3.tar.gz chromium_src-eb8c76326fb39a429dc8bda38da22a86bb6802b3.tar.bz2 |
message_pump_libevent refactor:
* Unify WatchSocket & WatchFileHandle.
* Better encapsulate libevent.
* Fix a bug with blocking writes in ipc_posix.cc
Review URL: http://codereview.chromium.org/13757
git-svn-id: svn://svn.chromium.org/chrome/trunk/src@6911 0039d316-1c4b-4281-b951-d872f2087c98
-rw-r--r-- | base/message_loop.cc | 27 | ||||
-rw-r--r-- | base/message_loop.h | 23 | ||||
-rw-r--r-- | base/message_pump_libevent.cc | 134 | ||||
-rw-r--r-- | base/message_pump_libevent.h | 91 | ||||
-rw-r--r-- | chrome/common/ipc_channel_posix.cc | 109 | ||||
-rw-r--r-- | chrome/common/ipc_channel_posix.h | 29 | ||||
-rw-r--r-- | net/base/listen_socket.cc | 23 | ||||
-rw-r--r-- | net/base/listen_socket.h | 23 | ||||
-rw-r--r-- | net/base/tcp_client_socket.h | 13 | ||||
-rw-r--r-- | net/base/tcp_client_socket_libevent.cc | 39 |
10 files changed, 261 insertions, 250 deletions
diff --git a/base/message_loop.cc b/base/message_loop.cc index 898fbfa..12ad3fa 100644 --- a/base/message_loop.cc +++ b/base/message_loop.cc @@ -593,22 +593,17 @@ bool MessageLoopForIO::WaitForIOCompletion(DWORD timeout, IOHandler* filter) { #elif defined(OS_POSIX) -void MessageLoopForIO::WatchSocket(int socket, short interest_mask, - struct event* e, Watcher* watcher) { - pump_libevent()->WatchSocket(socket, interest_mask, e, watcher); +bool MessageLoopForIO::WatchFileDescriptor(int fd, + bool persistent, + Mode mode, + FileDescriptorWatcher *controller, + Watcher *delegate) { + return pump_libevent()->WatchFileDescriptor( + fd, + persistent, + static_cast<base::MessagePumpLibevent::Mode>(mode), + controller, + delegate); } -void MessageLoopForIO::WatchFileHandle(int fd, short interest_mask, - struct event* e, FileWatcher* watcher) { - pump_libevent()->WatchFileHandle(fd, interest_mask, e, watcher); -} - - -void MessageLoopForIO::UnwatchSocket(struct event* e) { - pump_libevent()->UnwatchSocket(e); -} - -void MessageLoopForIO::UnwatchFileHandle(struct event* e) { - pump_libevent()->UnwatchFileHandle(e); -} #endif diff --git a/base/message_loop.h b/base/message_loop.h index 69db8c1..6cd0ef4 100644 --- a/base/message_loop.h +++ b/base/message_loop.h @@ -473,14 +473,21 @@ class MessageLoopForIO : public MessageLoop { #elif defined(OS_POSIX) typedef base::MessagePumpLibevent::Watcher Watcher; - typedef base::MessagePumpLibevent::FileWatcher FileWatcher; - - // Please see MessagePumpLibevent for definitions of these methods. - void WatchSocket(int socket, short interest_mask, - struct event* e, Watcher* watcher); - void WatchFileHandle(int fd, short interest_mask, event* e, FileWatcher*); - void UnwatchSocket(struct event* e); - void UnwatchFileHandle(event* e); + typedef base::MessagePumpLibevent::FileDescriptorWatcher + FileDescriptorWatcher; + + enum Mode { + WATCH_READ = base::MessagePumpLibevent::WATCH_READ, + WATCH_WRITE = base::MessagePumpLibevent::WATCH_WRITE, + WATCH_READ_WRITE = base::MessagePumpLibevent::WATCH_READ_WRITE + }; + + // Please see MessagePumpLibevent for definition. + bool WatchFileDescriptor(int fd, + bool persistent, + Mode mode, + FileDescriptorWatcher *controller, + Watcher *delegate); #endif // defined(OS_POSIX) }; diff --git a/base/message_pump_libevent.cc b/base/message_pump_libevent.cc index a281980..9c13110 100644 --- a/base/message_pump_libevent.cc +++ b/base/message_pump_libevent.cc @@ -15,17 +15,48 @@ namespace base { // Return 0 on success // Too small a function to bother putting in a library? -static int SetNonBlocking(int fd) -{ - int flags = fcntl(fd, F_GETFL, 0); - if (-1 == flags) - flags = 0; - return fcntl(fd, F_SETFL, flags | O_NONBLOCK); +static int SetNonBlocking(int fd) { + int flags = fcntl(fd, F_GETFL, 0); + if (flags == -1) + flags = 0; + return fcntl(fd, F_SETFL, flags | O_NONBLOCK); +} + +MessagePumpLibevent::FileDescriptorWatcher::FileDescriptorWatcher() + : is_persistent_(false), + event_(NULL) { +} + +MessagePumpLibevent::FileDescriptorWatcher::~FileDescriptorWatcher() { + if (event_.get()) { + StopWatchingFileDescriptor(); + } +} + +void MessagePumpLibevent::FileDescriptorWatcher::Init(event *e, + bool is_persistent) { + DCHECK(e); + + // Cleanup any old event we might have been watching. + if (event_.get()) { + StopWatchingFileDescriptor(); + } + + is_persistent = is_persistent_; + event_.reset(e); +} + +bool MessagePumpLibevent::FileDescriptorWatcher::StopWatchingFileDescriptor() { + if (event_.get() == NULL) { + return true; + } + + // event_del() is a no-op of the event isn't active. + return (event_del(event_.get()) == 0); } // Called if a byte is received on the wakeup pipe. void MessagePumpLibevent::OnWakeup(int socket, short flags, void* context) { - base::MessagePumpLibevent* that = static_cast<base::MessagePumpLibevent*>(context); DCHECK(that->wakeup_pipe_out_ == socket); @@ -61,7 +92,7 @@ bool MessagePumpLibevent::Init() { wakeup_event_ = new event; event_set(wakeup_event_, wakeup_pipe_out_, EV_READ | EV_PERSIST, - OnWakeup, this); + OnWakeup, this); event_base_set(event_base_, wakeup_event_); if (event_add(wakeup_event_, 0)) @@ -77,67 +108,56 @@ MessagePumpLibevent::~MessagePumpLibevent() { event_base_free(event_base_); } -void MessagePumpLibevent::WatchSocket(int socket, short interest_mask, - event* e, Watcher* watcher) { +bool MessagePumpLibevent::WatchFileDescriptor(int fd, + bool persistent, + Mode mode, + FileDescriptorWatcher *controller, + Watcher *delegate) { + DCHECK(fd > 0); + DCHECK(controller); + DCHECK(delegate); + DCHECK(mode == WATCH_READ || mode == WATCH_WRITE || mode == WATCH_READ_WRITE); - // Set current interest mask and message pump for this event - event_set(e, socket, interest_mask, OnReadinessNotification, watcher); + int event_mask = persistent ? EV_PERSIST : 0; + if (mode == WATCH_READ || mode == WATCH_READ_WRITE) { + event_mask |= EV_READ; + } + if (mode == WATCH_WRITE || mode == WATCH_READ_WRITE) { + event_mask |= EV_WRITE; + } + + // Ownership is transferred to the controller. + scoped_ptr<event> evt(new event); + // Set current interest mask and message pump for this event. + event_set(evt.get(), fd, event_mask | EV_READ, OnLibeventNotification, + delegate); // Tell libevent which message pump this socket will belong to when we add it. - event_base_set(event_base_, e); + if (event_base_set(event_base_, evt.get()) != 0) { + return false; + } // Add this socket to the list of monitored sockets. - if (event_add(e, NULL)) - NOTREACHED(); -} - -void MessagePumpLibevent::WatchFileHandle(int fd, short interest_mask, - event* e, FileWatcher* watcher) { - // Set current interest mask and message pump for this event - if ((interest_mask & EV_READ) != 0) { - event_set(e, fd, interest_mask, OnFileReadReadinessNotification, watcher); - } else { - event_set(e, fd, interest_mask, OnFileWriteReadinessNotification, watcher); + if (event_add(evt.get(), NULL) != 0) { + return false; } - // Tell libevent which message pump this fd will belong to when we add it. - event_base_set(event_base_, e); - - // Add this fd to the list of monitored sockets. - if (event_add(e, NULL)) - NOTREACHED(); -} - -void MessagePumpLibevent::UnwatchSocket(event* e) { - // Remove this socket from the list of monitored sockets. - if (event_del(e)) - NOTREACHED(); + // Transfer ownership of e to controller. + controller->Init(evt.release(), persistent); + return true; } -void MessagePumpLibevent::UnwatchFileHandle(event* e) { - // Remove this fd from the list of monitored fds. - if (event_del(e)) - NOTREACHED(); -} -void MessagePumpLibevent::OnReadinessNotification(int socket, short flags, - void* context) { - // The given socket is ready for I/O. - // Tell the owner what kind of I/O the socket is ready for. +void MessagePumpLibevent::OnLibeventNotification(int fd, short flags, + void* context) { Watcher* watcher = static_cast<Watcher*>(context); - watcher->OnSocketReady(flags); -} -void MessagePumpLibevent::OnFileReadReadinessNotification(int fd, short flags, - void* context) { - FileWatcher* watcher = static_cast<FileWatcher*>(context); - watcher->OnFileReadReady(fd); -} - -void MessagePumpLibevent::OnFileWriteReadinessNotification(int fd, short flags, - void* context) { - FileWatcher* watcher = static_cast<FileWatcher*>(context); - watcher->OnFileWriteReady(fd); + if (flags & EV_WRITE) { + watcher->OnFileCanWriteWithoutBlocking(fd); + } + if (flags & EV_READ) { + watcher->OnFileCanReadWithoutBlocking(fd); + } } // Reentrant! diff --git a/base/message_pump_libevent.h b/base/message_pump_libevent.h index b54bc36..36475c1 100644 --- a/base/message_pump_libevent.h +++ b/base/message_pump_libevent.h @@ -6,6 +6,7 @@ #define BASE_MESSAGE_PUMP_LIBEVENT_H_ #include "base/message_pump.h" +#include "base/scoped_ptr.h" #include "base/time.h" // Declare structs we need from libevent.h rather than including it @@ -18,46 +19,65 @@ namespace base { // TODO(dkegel): add support for background file IO somehow class MessagePumpLibevent : public MessagePump { public: - // Used with WatchSocket to asynchronously monitor the I/O readiness of a - // socket. - class Watcher { - public: - virtual ~Watcher() {} - // Called from MessageLoop::Run when a ready socket is detected. - virtual void OnSocketReady(short eventmask) = 0; + + // Object returned by WatchFileDescriptor to manage further watching. + class FileDescriptorWatcher { + public: + FileDescriptorWatcher(); + ~FileDescriptorWatcher(); // Implicitly calls StopWatching. + + // NOTE: These methods aren't called StartWatching()/StopWatching() to + // avoid confusion with the win32 ObjectWatcher class. + + // Stop watching the FD, always safe to call. No-op if there's nothing + // to do. + bool StopWatchingFileDescriptor(); + + private: + // Called by MessagePumpLibevent, ownership of |e| is transferred to this + // object. + // If this FileWatcher is already watching an event, the previous event is + // terminated and cleaned up here. + void Init(event* e, bool is_persistent); + friend class MessagePumpLibevent; + + private: + bool is_persistent_; // false if this event is one-shot. + scoped_ptr<event> event_; + DISALLOW_COPY_AND_ASSIGN(FileDescriptorWatcher); }; - // Used with WatchFileHandle to monitor I/O readiness for a File Handle. - class FileWatcher { + // Used with WatchFileDescptor to asynchronously monitor the I/O readiness of + // a File Descriptor. + class Watcher { public: - virtual ~FileWatcher() {} - // Called from MessageLoop::Run when a non-blocking read/write can be made. - virtual void OnFileReadReady(int fd) = 0; - virtual void OnFileWriteReady(int fd) = 0; + virtual ~Watcher() {} + // Called from MessageLoop::Run when an FD can be read from/written to + // without blocking + virtual void OnFileCanReadWithoutBlocking(int fd) = 0; + virtual void OnFileCanWriteWithoutBlocking(int fd) = 0; }; MessagePumpLibevent(); virtual ~MessagePumpLibevent(); - // Have the current thread's message loop watch for a ready socket. - // Caller must provide a struct event for this socket for libevent's use. - // The event and interest_mask fields are defined in libevent. + enum Mode { + WATCH_READ, + WATCH_WRITE, + WATCH_READ_WRITE + }; + + // Have the current thread's message loop watch for a a situation in which + // reading/writing to the FD can be performed without Blocking. + // Callers must provide a preallocated FileDescriptorWatcher object which + // can later be used to manage the Lifetime of this event. // Returns true on success. - // TODO(dkegel): hide libevent better; abstraction still too leaky - // TODO(dkegel): better error handing // TODO(dkegel): switch to edge-triggered readiness notification - void WatchSocket(int socket, short interest_mask, event* e, Watcher*); - - // TODO(playmobil): Merge this with WatchSocket(). - void WatchFileHandle(int fd, short interest_mask, event* e, FileWatcher*); - - // Stop watching a socket. - // Event was previously initialized by WatchSocket. - void UnwatchSocket(event* e); - - // Stop watching a File Handle. - // Event was previously initialized by WatchFileHandle. - void UnwatchFileHandle(event* e); + bool WatchFileDescriptor(int fd, + bool persistent, + Mode mode, + FileDescriptorWatcher *controller, + Watcher *delegate); // MessagePump methods: virtual void Run(Delegate* delegate); @@ -83,14 +103,9 @@ class MessagePumpLibevent : public MessagePump { // readiness callbacks when a socket is ready for I/O. event_base* event_base_; - // Called by libevent to tell us a registered socket is ready - static void OnReadinessNotification(int socket, short flags, void* context); - - // Called by libevent to tell us a registered fd is ready. - static void OnFileReadReadinessNotification(int fd, short flags, - void* context); - static void OnFileWriteReadinessNotification(int fd, short flags, - void* context); + // Called by libevent to tell us a registered FD can be read/written to. + static void OnLibeventNotification(int fd, short flags, + void* context); // Unix pipe used to implement ScheduleWork() // ... callback; called by libevent inside Run() when pipe is ready to read diff --git a/chrome/common/ipc_channel_posix.cc b/chrome/common/ipc_channel_posix.cc index 9a30f80..74a2407 100644 --- a/chrome/common/ipc_channel_posix.cc +++ b/chrome/common/ipc_channel_posix.cc @@ -6,10 +6,16 @@ #include <errno.h> #include <fcntl.h> +#include <stddef.h> #include <sys/types.h> #include <sys/socket.h> #include <sys/stat.h> -#include <stddef.h> +#if defined(OS_LINUX) +#include <linux/un.h> +#elif defined(OS_MACOSX) +#include <sys/un.h> +#endif + #include "base/logging.h" #include "base/process_util.h" @@ -18,12 +24,6 @@ #include "chrome/common/chrome_counters.h" #include "chrome/common/ipc_message_utils.h" -#if defined(OS_LINUX) -#include <linux/un.h> -#elif defined(OS_MACOSX) -#include <sys/un.h> -#endif - namespace IPC { //------------------------------------------------------------------------------ @@ -150,9 +150,7 @@ bool ClientConnectToFifo(const std::string &pipe_name, int* client_socket) { Channel::ChannelImpl::ChannelImpl(const std::wstring& channel_id, Mode mode, Listener* listener) : mode_(mode), - server_listen_connection_event_(new EventHolder()), - read_event_(new EventHolder()), - write_event_(new EventHolder()), + is_blocked_on_write_(false), message_send_bytes_written_(0), server_listen_pipe_(-1), pipe_(-1), @@ -214,21 +212,22 @@ bool Channel::ChannelImpl::Connect() { if (server_listen_pipe_ == -1) { return false; } - event *ev = &(server_listen_connection_event_->event); - MessageLoopForIO::current()->WatchFileHandle(server_listen_pipe_, - EV_READ | EV_PERSIST, - ev, - this); - server_listen_connection_event_->is_active = true; + MessageLoopForIO::current()->WatchFileDescriptor( + server_listen_pipe_, + true, + MessageLoopForIO::WATCH_READ, + &server_listen_connection_watcher_, + this); } else { if (pipe_ == -1) { return false; } - MessageLoopForIO::current()->WatchFileHandle(pipe_, - EV_READ | EV_PERSIST, - &(read_event_->event), - this); - read_event_->is_active = true; + MessageLoopForIO::current()->WatchFileDescriptor( + pipe_, + true, + MessageLoopForIO::WATCH_READ, + &read_watcher_, + this); waiting_connect_ = false; } @@ -317,6 +316,7 @@ bool Channel::ChannelImpl::ProcessIncomingMessages() { bool Channel::ChannelImpl::ProcessOutgoingMessages() { DCHECK(!waiting_connect_); // Why are we trying to send messages if there's // no connection? + is_blocked_on_write_ = false; if (output_queue_.empty()) return true; @@ -324,15 +324,6 @@ bool Channel::ChannelImpl::ProcessOutgoingMessages() { if (pipe_ == -1) return false; - // If libevent was monitoring the socket for us (we blocked when trying to - // write a message last time), then delete the underlying libevent structure. - if (write_event_->is_active) { - // TODO(playmobil): This calls event_del(), but we can probably - // do with just calling event_add here. - MessageLoopForIO::current()->UnwatchFileHandle(&(write_event_->event)); - write_event_->is_active = false; - } - // Write out all the messages we can till the write blocks or there are no // more outgoing messages. while (!output_queue_.empty()) { @@ -355,12 +346,13 @@ bool Channel::ChannelImpl::ProcessOutgoingMessages() { message_send_bytes_written_ += bytes_written; // Tell libevent to call us back once things are unblocked. - MessageLoopForIO::current()->WatchFileHandle(server_listen_pipe_, - EV_WRITE, - &(write_event_->event), - this); - write_event_->is_active = true; - + is_blocked_on_write_ = true; + MessageLoopForIO::current()->WatchFileDescriptor( + pipe_, + false, // One shot + MessageLoopForIO::WATCH_WRITE, + &write_watcher_, + this); } else { message_send_bytes_written_ = 0; @@ -391,7 +383,7 @@ bool Channel::ChannelImpl::Send(Message* message) { output_queue_.push(message); if (!waiting_connect_) { - if (!write_event_->is_active) { + if (!is_blocked_on_write_) { if (!ProcessOutgoingMessages()) return false; } @@ -401,7 +393,7 @@ bool Channel::ChannelImpl::Send(Message* message) { } // Called by libevent when we can read from th pipe without blocking. -void Channel::ChannelImpl::OnFileReadReady(int fd) { +void Channel::ChannelImpl::OnFileCanReadWithoutBlocking(int fd) { bool send_server_hello_msg = false; if (waiting_connect_ && mode_ == MODE_SERVER) { if (!ServerAcceptFifoConnection(server_listen_pipe_, &pipe_)) { @@ -410,16 +402,16 @@ void Channel::ChannelImpl::OnFileReadReady(int fd) { // No need to watch the listening socket any longer since only one client // can connect. So unregister with libevent. - event *ev = &(server_listen_connection_event_->event); - MessageLoopForIO::current()->UnwatchFileHandle(ev); - server_listen_connection_event_->is_active = false; + server_listen_connection_watcher_.StopWatchingFileDescriptor(); // Start watching our end of the socket. - MessageLoopForIO::current()->WatchFileHandle(pipe_, - EV_READ | EV_PERSIST, - &(read_event_->event), - this); - read_event_->is_active = true; + MessageLoopForIO::current()->WatchFileDescriptor( + pipe_, + true, + MessageLoopForIO::WATCH_READ, + &read_watcher_, + this); + waiting_connect_ = false; send_server_hello_msg = true; } @@ -436,12 +428,14 @@ void Channel::ChannelImpl::OnFileReadReady(int fd) { // This gives us a chance to kill the client if the incoming handshake // is invalid. if (send_server_hello_msg) { + // This should be our first write so there' sno chance we can block here... + DCHECK(is_blocked_on_write_ == false); ProcessOutgoingMessages(); } } // Called by libevent when we can write to the pipe without blocking. -void Channel::ChannelImpl::OnFileWriteReady(int fd) { +void Channel::ChannelImpl::OnFileCanWriteWithoutBlocking(int fd) { if (!ProcessOutgoingMessages()) { Close(); listener_->OnChannelError(); @@ -453,11 +447,7 @@ void Channel::ChannelImpl::Close() { // idempotent. // Unregister libevent for the listening socket and close it. - if (server_listen_connection_event_ && - server_listen_connection_event_->is_active) { - MessageLoopForIO::current()->UnwatchFileHandle( - &(server_listen_connection_event_->event)); - } + server_listen_connection_watcher_.StopWatchingFileDescriptor(); if (server_listen_pipe_ != -1) { close(server_listen_pipe_); @@ -465,24 +455,13 @@ void Channel::ChannelImpl::Close() { } // Unregister libevent for the FIFO and close it. - if (read_event_ && read_event_->is_active) { - MessageLoopForIO::current()->UnwatchFileHandle(&(read_event_->event)); - } - if (write_event_ && write_event_->is_active) { - MessageLoopForIO::current()->UnwatchFileHandle(&(write_event_->event)); - } + read_watcher_.StopWatchingFileDescriptor(); + write_watcher_.StopWatchingFileDescriptor(); if (pipe_ != -1) { close(pipe_); pipe_ = -1; } - delete server_listen_connection_event_; - server_listen_connection_event_ = NULL; - delete read_event_; - read_event_ = NULL; - delete write_event_; - write_event_ = NULL; - // Unlink the FIFO unlink(pipe_name_.c_str()); diff --git a/chrome/common/ipc_channel_posix.h b/chrome/common/ipc_channel_posix.h index b2849dc..9b34494 100644 --- a/chrome/common/ipc_channel_posix.h +++ b/chrome/common/ipc_channel_posix.h @@ -11,11 +11,10 @@ #include <string> #include "base/message_loop.h" -#include "third_party/libevent/event.h" namespace IPC { -class Channel::ChannelImpl : public MessageLoopForIO::FileWatcher { +class Channel::ChannelImpl : public MessageLoopForIO::Watcher { public: // Mirror methods of Channel, see ipc_channel.h for description. ChannelImpl(const std::wstring& channel_id, Mode mode, Listener* listener); @@ -31,27 +30,19 @@ class Channel::ChannelImpl : public MessageLoopForIO::FileWatcher { bool ProcessIncomingMessages(); bool ProcessOutgoingMessages(); - void OnFileReadReady(int fd); - void OnFileWriteReady(int fd); + void OnFileCanReadWithoutBlocking(int fd); + void OnFileCanWriteWithoutBlocking(int fd); Mode mode_; - // Wrapper for Libevent event. - // TODO(playmobil): MessageLoopForIO needs to better encapsulate libevent. - struct EventHolder { - EventHolder() : is_active(false) {} - ~EventHolder() {} + // After accepting one client connection on our server socket we want to + // stop listening. + MessageLoopForIO::FileDescriptorWatcher server_listen_connection_watcher_; + MessageLoopForIO::FileDescriptorWatcher read_watcher_; + MessageLoopForIO::FileDescriptorWatcher write_watcher_; - bool is_active; - - // libevent's set functions set all the needed members of this struct, so no - // need to initialize before use. - struct event event; - }; - - EventHolder *server_listen_connection_event_; - EventHolder *read_event_; - EventHolder *write_event_; + // Are we currently blocked waiting for a write to complete. + bool is_blocked_on_write_; // If sending a message blocks then we use this variable // to keep track of where we are. diff --git a/net/base/listen_socket.cc b/net/base/listen_socket.cc index 8428e06..b2185a2 100644 --- a/net/base/listen_socket.cc +++ b/net/base/listen_socket.cc @@ -12,7 +12,6 @@ #include <errno.h> #include <sys/socket.h> #include <arpa/inet.h> -#include "base/message_loop.h" #include "net/base/net_errors.h" #include "third_party/libevent/event.h" #endif @@ -30,12 +29,7 @@ const int SOCKET_ERROR = -1; const int kReadBufSize = 200; ListenSocket::ListenSocket(SOCKET s, ListenSocketDelegate *del) -#if defined(OS_WIN) : socket_(s), -#elif defined(OS_POSIX) - : event_(new event), - socket_(s), -#endif socket_delegate_(del) { #if defined(OS_WIN) socket_event_ = WSACreateEvent(); @@ -177,8 +171,7 @@ void ListenSocket::UnwatchSocket() { #if defined(OS_WIN) watcher_.StopWatching(); #elif defined(OS_POSIX) - MessageLoopForIO::current()->UnwatchSocket(event_.get()); - wait_state_ = NOT_WAITING; + watcher_.StopWatchingFileDescriptor(); #endif } @@ -187,8 +180,9 @@ void ListenSocket::WatchSocket(WaitState state) { WSAEventSelect(socket_, socket_event_, FD_ACCEPT | FD_CLOSE | FD_READ); watcher_.StartWatching(socket_event_, this); #elif defined(OS_POSIX) - MessageLoopForIO::current()->WatchSocket( - socket_, EV_READ|EV_PERSIST, event_.get(),this); + // Implicitly calls StartWatchingFileDescriptor(). + MessageLoopForIO::current()->WatchFileDescriptor( + socket_, true, MessageLoopForIO::WATCH_READ, &watcher_, this); wait_state_ = state; #endif } @@ -250,7 +244,7 @@ void ListenSocket::OnObjectSignaled(HANDLE object) { } } #elif defined(OS_POSIX) -void ListenSocket::OnSocketReady(short flags) { +void ListenSocket::OnFileCanReadWithoutBlocking(int fd) { if (wait_state_ == WAITING_ACCEPT) { Accept(); } @@ -262,4 +256,11 @@ void ListenSocket::OnSocketReady(short flags) { // TODO(erikkay): this seems to get hit multiple times after the close } } + +void ListenSocket::OnFileCanWriteWithoutBlocking(int fd) { + // MessagePumpLibevent callback, we don't listen for write events + // so we shouldn't ever reach here. + NOTREACHED(); +} + #endif diff --git a/net/base/listen_socket.h b/net/base/listen_socket.h index 2b32b5b..f5f8977 100644 --- a/net/base/listen_socket.h +++ b/net/base/listen_socket.h @@ -11,6 +11,7 @@ #ifndef NET_BASE_SOCKET_H_ #define NET_BASE_SOCKET_H_ +#include <string> #if defined(OS_WIN) #include <winsock2.h> #include "base/object_watcher.h" @@ -18,8 +19,6 @@ #include "base/message_loop.h" #include "net/base/net_util.h" #include "net/base/net_errors.h" -#include "third_party/libevent/event.h" -#include "base/message_pump_libevent.h" #endif #include "base/basictypes.h" @@ -35,7 +34,7 @@ class ListenSocket : public base::RefCountedThreadSafe<ListenSocket>, #if defined(OS_WIN) public base::ObjectWatcher::Delegate #elif defined(OS_POSIX) - public base::MessagePumpLibevent::Watcher + public MessageLoopForIO::Watcher #endif { public: @@ -80,11 +79,11 @@ class ListenSocket : public base::RefCountedThreadSafe<ListenSocket>, NOT_WAITING = 0, WAITING_ACCEPT = 1, WAITING_READ = 3, - WAITING_CLOSE = 4 + WAITING_CLOSE = 4 }; - // Pass any value in case of Windows, because in Windows - // we are not using state. - void WatchSocket(WaitState state); + // Pass any value in case of Windows, because in Windows + // we are not using state. + void WatchSocket(WaitState state); void UnwatchSocket(); #if defined(OS_WIN) @@ -95,17 +94,17 @@ class ListenSocket : public base::RefCountedThreadSafe<ListenSocket>, #elif defined(OS_POSIX) WaitState wait_state_; // The socket's libevent wrapper - scoped_ptr<event> event_; + MessageLoopForIO::FileDescriptorWatcher watcher_; // Called by MessagePumpLibevent when the socket is ready to do I/O - void OnSocketReady(short flags); + void OnFileCanReadWithoutBlocking(int fd); + void OnFileCanWriteWithoutBlocking(int fd); #endif - SOCKET socket_; + SOCKET socket_; ListenSocketDelegate *socket_delegate_; private: DISALLOW_EVIL_CONSTRUCTORS(ListenSocket); }; -#endif // NET_BASE_SOCKET_H_ - +#endif // NET_BASE_SOCKET_H_ diff --git a/net/base/tcp_client_socket.h b/net/base/tcp_client_socket.h index 05a433e..5fca519 100644 --- a/net/base/tcp_client_socket.h +++ b/net/base/tcp_client_socket.h @@ -14,7 +14,7 @@ struct event; // From libevent #include <sys/socket.h> // for struct sockaddr #define SOCKET int -#include "base/message_pump_libevent.h" +#include "base/message_loop.h" #endif #include "base/scoped_ptr.h" @@ -26,7 +26,7 @@ namespace net { // A client socket that uses TCP as the transport layer. // -// NOTE: The windows implementation supports half duplex only. +// NOTE: The windows implementation supports half duplex only. // Read and Write calls must not be in progress at the same time. // The libevent implementation supports full duplex because that // made it slightly easier to implement ssl. @@ -34,7 +34,7 @@ class TCPClientSocket : public ClientSocket, #if defined(OS_WIN) public base::ObjectWatcher::Delegate #elif defined(OS_POSIX) - public base::MessagePumpLibevent::Watcher + public MessageLoopForIO::Watcher #endif { public: @@ -52,7 +52,7 @@ class TCPClientSocket : public ClientSocket, virtual bool IsConnected() const; // Socket methods: - // Multiple outstanding requests are not supported. + // Multiple outstanding requests are not supported. // Full duplex mode (reading and writing at the same time) is not supported // on Windows (but is supported on Linux and Mac for ease of implementation // of SSLClientSocket) @@ -97,10 +97,11 @@ class TCPClientSocket : public ClientSocket, bool waiting_connect_; // The socket's libevent wrapper - scoped_ptr<event> event_; + MessageLoopForIO::FileDescriptorWatcher socket_watcher_; // Called by MessagePumpLibevent when the socket is ready to do I/O - void OnSocketReady(short flags); + void OnFileCanReadWithoutBlocking(int fd); + void OnFileCanWriteWithoutBlocking(int fd); // The buffer used by OnSocketReady to retry Read requests char* buf_; diff --git a/net/base/tcp_client_socket_libevent.cc b/net/base/tcp_client_socket_libevent.cc index 79909c20..933e95b 100644 --- a/net/base/tcp_client_socket_libevent.cc +++ b/net/base/tcp_client_socket_libevent.cc @@ -68,7 +68,6 @@ TCPClientSocket::TCPClientSocket(const AddressList& addresses) addresses_(addresses), current_ai_(addresses_.head()), waiting_connect_(false), - event_(new event), write_callback_(NULL), callback_(NULL) { } @@ -110,8 +109,8 @@ int TCPClientSocket::Connect(CompletionCallback* callback) { // POLLOUT is set if the connection is established. // POLLIN is set if the connection fails, // so select for both read and write. - MessageLoopForIO::current()->WatchSocket( - socket_, EV_READ|EV_WRITE|EV_PERSIST, event_.get(), this); + MessageLoopForIO::current()->WatchFileDescriptor( + socket_, true, MessageLoopForIO::WATCH_READ_WRITE, &socket_watcher_, this); waiting_connect_ = true; callback_ = callback; @@ -127,7 +126,7 @@ void TCPClientSocket::Disconnect() { if (socket_ == kInvalidSocket) return; - MessageLoopForIO::current()->UnwatchSocket(event_.get()); + socket_watcher_.StopWatchingFileDescriptor(); close(socket_); socket_ = kInvalidSocket; waiting_connect_ = false; @@ -170,8 +169,8 @@ int TCPClientSocket::Read(char* buf, return MapPosixError(errno); } - MessageLoopForIO::current()->WatchSocket( - socket_, EV_READ|EV_PERSIST, event_.get(), this); + MessageLoopForIO::current()->WatchFileDescriptor( + socket_, true, MessageLoopForIO::WATCH_READ, &socket_watcher_, this); buf_ = buf; buf_len_ = buf_len; @@ -196,8 +195,9 @@ int TCPClientSocket::Write(const char* buf, if (errno != EAGAIN && errno != EWOULDBLOCK) return MapPosixError(errno); - MessageLoopForIO::current()->WatchSocket( - socket_, EV_WRITE|EV_PERSIST, event_.get(), this); + MessageLoopForIO::current()->WatchFileDescriptor( + socket_, true, MessageLoopForIO::WATCH_WRITE, &socket_watcher_, this); + write_buf_ = buf; write_buf_len_ = buf_len; @@ -263,7 +263,7 @@ void TCPClientSocket::DidCompleteConnect() { result = Connect(callback_); } else { result = MapPosixError(error_code); - MessageLoopForIO::current()->UnwatchSocket(event_.get()); + socket_watcher_.StopWatchingFileDescriptor(); waiting_connect_ = false; } @@ -285,7 +285,7 @@ void TCPClientSocket::DidCompleteRead() { if (result != ERR_IO_PENDING) { buf_ = NULL; buf_len_ = 0; - MessageLoopForIO::current()->UnwatchSocket(event_.get()); + socket_watcher_.StopWatchingFileDescriptor(); DoCallback(result); } } @@ -304,21 +304,24 @@ void TCPClientSocket::DidCompleteWrite() { if (result != ERR_IO_PENDING) { write_buf_ = NULL; write_buf_len_ = 0; - MessageLoopForIO::current()->UnwatchSocket(event_.get()); + socket_watcher_.StopWatchingFileDescriptor(); DoWriteCallback(result); } } -void TCPClientSocket::OnSocketReady(short flags) { - // the only used bits of flags are EV_READ and EV_WRITE +void TCPClientSocket::OnFileCanReadWithoutBlocking(int fd) { + if (waiting_connect_) { + DidCompleteConnect(); + } else if (callback_) { + DidCompleteRead(); + } +} +void TCPClientSocket::OnFileCanWriteWithoutBlocking(int fd) { if (waiting_connect_) { DidCompleteConnect(); - } else { - if ((flags & EV_WRITE) && write_callback_) - DidCompleteWrite(); - if ((flags & EV_READ) && callback_) - DidCompleteRead(); + } else if (write_callback_) { + DidCompleteWrite(); } } |