diff options
author | ojan@google.com <ojan@google.com@0039d316-1c4b-4281-b951-d872f2087c98> | 2008-12-12 19:55:17 +0000 |
---|---|---|
committer | ojan@google.com <ojan@google.com@0039d316-1c4b-4281-b951-d872f2087c98> | 2008-12-12 19:55:17 +0000 |
commit | 900342a3e34354312a2ce854196e95fa8a0c07c4 (patch) | |
tree | 8d73bce7518e4077c805053cb553ef11b3b1949d | |
parent | ab844167875b2003cb44081c6b64c0097df30b48 (diff) | |
download | chromium_src-900342a3e34354312a2ce854196e95fa8a0c07c4.zip chromium_src-900342a3e34354312a2ce854196e95fa8a0c07c4.tar.gz chromium_src-900342a3e34354312a2ce854196e95fa8a0c07c4.tar.bz2 |
Reverting 6911.
Review URL: http://codereview.chromium.org/14068
git-svn-id: svn://svn.chromium.org/chrome/trunk/src@6916 0039d316-1c4b-4281-b951-d872f2087c98
-rw-r--r-- | base/message_loop.cc | 27 | ||||
-rw-r--r-- | base/message_loop.h | 23 | ||||
-rw-r--r-- | base/message_pump_libevent.cc | 134 | ||||
-rw-r--r-- | base/message_pump_libevent.h | 91 | ||||
-rw-r--r-- | chrome/common/ipc_channel_posix.cc | 109 | ||||
-rw-r--r-- | chrome/common/ipc_channel_posix.h | 29 | ||||
-rw-r--r-- | net/base/listen_socket.cc | 23 | ||||
-rw-r--r-- | net/base/listen_socket.h | 23 | ||||
-rw-r--r-- | net/base/tcp_client_socket.h | 13 | ||||
-rw-r--r-- | net/base/tcp_client_socket_libevent.cc | 39 |
10 files changed, 250 insertions, 261 deletions
diff --git a/base/message_loop.cc b/base/message_loop.cc index 12ad3fa..898fbfa 100644 --- a/base/message_loop.cc +++ b/base/message_loop.cc @@ -593,17 +593,22 @@ bool MessageLoopForIO::WaitForIOCompletion(DWORD timeout, IOHandler* filter) { #elif defined(OS_POSIX) -bool MessageLoopForIO::WatchFileDescriptor(int fd, - bool persistent, - Mode mode, - FileDescriptorWatcher *controller, - Watcher *delegate) { - return pump_libevent()->WatchFileDescriptor( - fd, - persistent, - static_cast<base::MessagePumpLibevent::Mode>(mode), - controller, - delegate); +void MessageLoopForIO::WatchSocket(int socket, short interest_mask, + struct event* e, Watcher* watcher) { + pump_libevent()->WatchSocket(socket, interest_mask, e, watcher); } +void MessageLoopForIO::WatchFileHandle(int fd, short interest_mask, + struct event* e, FileWatcher* watcher) { + pump_libevent()->WatchFileHandle(fd, interest_mask, e, watcher); +} + + +void MessageLoopForIO::UnwatchSocket(struct event* e) { + pump_libevent()->UnwatchSocket(e); +} + +void MessageLoopForIO::UnwatchFileHandle(struct event* e) { + pump_libevent()->UnwatchFileHandle(e); +} #endif diff --git a/base/message_loop.h b/base/message_loop.h index 6cd0ef4..69db8c1 100644 --- a/base/message_loop.h +++ b/base/message_loop.h @@ -473,21 +473,14 @@ class MessageLoopForIO : public MessageLoop { #elif defined(OS_POSIX) typedef base::MessagePumpLibevent::Watcher Watcher; - typedef base::MessagePumpLibevent::FileDescriptorWatcher - FileDescriptorWatcher; - - enum Mode { - WATCH_READ = base::MessagePumpLibevent::WATCH_READ, - WATCH_WRITE = base::MessagePumpLibevent::WATCH_WRITE, - WATCH_READ_WRITE = base::MessagePumpLibevent::WATCH_READ_WRITE - }; - - // Please see MessagePumpLibevent for definition. - bool WatchFileDescriptor(int fd, - bool persistent, - Mode mode, - FileDescriptorWatcher *controller, - Watcher *delegate); + typedef base::MessagePumpLibevent::FileWatcher FileWatcher; + + // Please see MessagePumpLibevent for definitions of these methods. + void WatchSocket(int socket, short interest_mask, + struct event* e, Watcher* watcher); + void WatchFileHandle(int fd, short interest_mask, event* e, FileWatcher*); + void UnwatchSocket(struct event* e); + void UnwatchFileHandle(event* e); #endif // defined(OS_POSIX) }; diff --git a/base/message_pump_libevent.cc b/base/message_pump_libevent.cc index 9c13110..a281980 100644 --- a/base/message_pump_libevent.cc +++ b/base/message_pump_libevent.cc @@ -15,48 +15,17 @@ namespace base { // Return 0 on success // Too small a function to bother putting in a library? -static int SetNonBlocking(int fd) { - int flags = fcntl(fd, F_GETFL, 0); - if (flags == -1) - flags = 0; - return fcntl(fd, F_SETFL, flags | O_NONBLOCK); -} - -MessagePumpLibevent::FileDescriptorWatcher::FileDescriptorWatcher() - : is_persistent_(false), - event_(NULL) { -} - -MessagePumpLibevent::FileDescriptorWatcher::~FileDescriptorWatcher() { - if (event_.get()) { - StopWatchingFileDescriptor(); - } -} - -void MessagePumpLibevent::FileDescriptorWatcher::Init(event *e, - bool is_persistent) { - DCHECK(e); - - // Cleanup any old event we might have been watching. - if (event_.get()) { - StopWatchingFileDescriptor(); - } - - is_persistent = is_persistent_; - event_.reset(e); -} - -bool MessagePumpLibevent::FileDescriptorWatcher::StopWatchingFileDescriptor() { - if (event_.get() == NULL) { - return true; - } - - // event_del() is a no-op of the event isn't active. - return (event_del(event_.get()) == 0); +static int SetNonBlocking(int fd) +{ + int flags = fcntl(fd, F_GETFL, 0); + if (-1 == flags) + flags = 0; + return fcntl(fd, F_SETFL, flags | O_NONBLOCK); } // Called if a byte is received on the wakeup pipe. void MessagePumpLibevent::OnWakeup(int socket, short flags, void* context) { + base::MessagePumpLibevent* that = static_cast<base::MessagePumpLibevent*>(context); DCHECK(that->wakeup_pipe_out_ == socket); @@ -92,7 +61,7 @@ bool MessagePumpLibevent::Init() { wakeup_event_ = new event; event_set(wakeup_event_, wakeup_pipe_out_, EV_READ | EV_PERSIST, - OnWakeup, this); + OnWakeup, this); event_base_set(event_base_, wakeup_event_); if (event_add(wakeup_event_, 0)) @@ -108,56 +77,67 @@ MessagePumpLibevent::~MessagePumpLibevent() { event_base_free(event_base_); } -bool MessagePumpLibevent::WatchFileDescriptor(int fd, - bool persistent, - Mode mode, - FileDescriptorWatcher *controller, - Watcher *delegate) { - DCHECK(fd > 0); - DCHECK(controller); - DCHECK(delegate); - DCHECK(mode == WATCH_READ || mode == WATCH_WRITE || mode == WATCH_READ_WRITE); +void MessagePumpLibevent::WatchSocket(int socket, short interest_mask, + event* e, Watcher* watcher) { - int event_mask = persistent ? EV_PERSIST : 0; - if (mode == WATCH_READ || mode == WATCH_READ_WRITE) { - event_mask |= EV_READ; - } - if (mode == WATCH_WRITE || mode == WATCH_READ_WRITE) { - event_mask |= EV_WRITE; - } - - // Ownership is transferred to the controller. - scoped_ptr<event> evt(new event); - // Set current interest mask and message pump for this event. - event_set(evt.get(), fd, event_mask | EV_READ, OnLibeventNotification, - delegate); + // Set current interest mask and message pump for this event + event_set(e, socket, interest_mask, OnReadinessNotification, watcher); // Tell libevent which message pump this socket will belong to when we add it. - if (event_base_set(event_base_, evt.get()) != 0) { - return false; - } + event_base_set(event_base_, e); // Add this socket to the list of monitored sockets. - if (event_add(evt.get(), NULL) != 0) { - return false; + if (event_add(e, NULL)) + NOTREACHED(); +} + +void MessagePumpLibevent::WatchFileHandle(int fd, short interest_mask, + event* e, FileWatcher* watcher) { + // Set current interest mask and message pump for this event + if ((interest_mask & EV_READ) != 0) { + event_set(e, fd, interest_mask, OnFileReadReadinessNotification, watcher); + } else { + event_set(e, fd, interest_mask, OnFileWriteReadinessNotification, watcher); } - // Transfer ownership of e to controller. - controller->Init(evt.release(), persistent); - return true; + // Tell libevent which message pump this fd will belong to when we add it. + event_base_set(event_base_, e); + + // Add this fd to the list of monitored sockets. + if (event_add(e, NULL)) + NOTREACHED(); } +void MessagePumpLibevent::UnwatchSocket(event* e) { + // Remove this socket from the list of monitored sockets. + if (event_del(e)) + NOTREACHED(); +} + +void MessagePumpLibevent::UnwatchFileHandle(event* e) { + // Remove this fd from the list of monitored fds. + if (event_del(e)) + NOTREACHED(); +} -void MessagePumpLibevent::OnLibeventNotification(int fd, short flags, - void* context) { +void MessagePumpLibevent::OnReadinessNotification(int socket, short flags, + void* context) { + // The given socket is ready for I/O. + // Tell the owner what kind of I/O the socket is ready for. Watcher* watcher = static_cast<Watcher*>(context); + watcher->OnSocketReady(flags); +} - if (flags & EV_WRITE) { - watcher->OnFileCanWriteWithoutBlocking(fd); - } - if (flags & EV_READ) { - watcher->OnFileCanReadWithoutBlocking(fd); - } +void MessagePumpLibevent::OnFileReadReadinessNotification(int fd, short flags, + void* context) { + FileWatcher* watcher = static_cast<FileWatcher*>(context); + watcher->OnFileReadReady(fd); +} + +void MessagePumpLibevent::OnFileWriteReadinessNotification(int fd, short flags, + void* context) { + FileWatcher* watcher = static_cast<FileWatcher*>(context); + watcher->OnFileWriteReady(fd); } // Reentrant! diff --git a/base/message_pump_libevent.h b/base/message_pump_libevent.h index 36475c1..b54bc36 100644 --- a/base/message_pump_libevent.h +++ b/base/message_pump_libevent.h @@ -6,7 +6,6 @@ #define BASE_MESSAGE_PUMP_LIBEVENT_H_ #include "base/message_pump.h" -#include "base/scoped_ptr.h" #include "base/time.h" // Declare structs we need from libevent.h rather than including it @@ -19,65 +18,46 @@ namespace base { // TODO(dkegel): add support for background file IO somehow class MessagePumpLibevent : public MessagePump { public: - - // Object returned by WatchFileDescriptor to manage further watching. - class FileDescriptorWatcher { - public: - FileDescriptorWatcher(); - ~FileDescriptorWatcher(); // Implicitly calls StopWatching. - - // NOTE: These methods aren't called StartWatching()/StopWatching() to - // avoid confusion with the win32 ObjectWatcher class. - - // Stop watching the FD, always safe to call. No-op if there's nothing - // to do. - bool StopWatchingFileDescriptor(); - - private: - // Called by MessagePumpLibevent, ownership of |e| is transferred to this - // object. - // If this FileWatcher is already watching an event, the previous event is - // terminated and cleaned up here. - void Init(event* e, bool is_persistent); - friend class MessagePumpLibevent; - - private: - bool is_persistent_; // false if this event is one-shot. - scoped_ptr<event> event_; - DISALLOW_COPY_AND_ASSIGN(FileDescriptorWatcher); - }; - - // Used with WatchFileDescptor to asynchronously monitor the I/O readiness of - // a File Descriptor. + // Used with WatchSocket to asynchronously monitor the I/O readiness of a + // socket. class Watcher { public: virtual ~Watcher() {} - // Called from MessageLoop::Run when an FD can be read from/written to - // without blocking - virtual void OnFileCanReadWithoutBlocking(int fd) = 0; - virtual void OnFileCanWriteWithoutBlocking(int fd) = 0; + // Called from MessageLoop::Run when a ready socket is detected. + virtual void OnSocketReady(short eventmask) = 0; + }; + + // Used with WatchFileHandle to monitor I/O readiness for a File Handle. + class FileWatcher { + public: + virtual ~FileWatcher() {} + // Called from MessageLoop::Run when a non-blocking read/write can be made. + virtual void OnFileReadReady(int fd) = 0; + virtual void OnFileWriteReady(int fd) = 0; }; MessagePumpLibevent(); virtual ~MessagePumpLibevent(); - enum Mode { - WATCH_READ, - WATCH_WRITE, - WATCH_READ_WRITE - }; - - // Have the current thread's message loop watch for a a situation in which - // reading/writing to the FD can be performed without Blocking. - // Callers must provide a preallocated FileDescriptorWatcher object which - // can later be used to manage the Lifetime of this event. + // Have the current thread's message loop watch for a ready socket. + // Caller must provide a struct event for this socket for libevent's use. + // The event and interest_mask fields are defined in libevent. // Returns true on success. + // TODO(dkegel): hide libevent better; abstraction still too leaky + // TODO(dkegel): better error handing // TODO(dkegel): switch to edge-triggered readiness notification - bool WatchFileDescriptor(int fd, - bool persistent, - Mode mode, - FileDescriptorWatcher *controller, - Watcher *delegate); + void WatchSocket(int socket, short interest_mask, event* e, Watcher*); + + // TODO(playmobil): Merge this with WatchSocket(). + void WatchFileHandle(int fd, short interest_mask, event* e, FileWatcher*); + + // Stop watching a socket. + // Event was previously initialized by WatchSocket. + void UnwatchSocket(event* e); + + // Stop watching a File Handle. + // Event was previously initialized by WatchFileHandle. + void UnwatchFileHandle(event* e); // MessagePump methods: virtual void Run(Delegate* delegate); @@ -103,9 +83,14 @@ class MessagePumpLibevent : public MessagePump { // readiness callbacks when a socket is ready for I/O. event_base* event_base_; - // Called by libevent to tell us a registered FD can be read/written to. - static void OnLibeventNotification(int fd, short flags, - void* context); + // Called by libevent to tell us a registered socket is ready + static void OnReadinessNotification(int socket, short flags, void* context); + + // Called by libevent to tell us a registered fd is ready. + static void OnFileReadReadinessNotification(int fd, short flags, + void* context); + static void OnFileWriteReadinessNotification(int fd, short flags, + void* context); // Unix pipe used to implement ScheduleWork() // ... callback; called by libevent inside Run() when pipe is ready to read diff --git a/chrome/common/ipc_channel_posix.cc b/chrome/common/ipc_channel_posix.cc index 74a2407..9a30f80 100644 --- a/chrome/common/ipc_channel_posix.cc +++ b/chrome/common/ipc_channel_posix.cc @@ -6,16 +6,10 @@ #include <errno.h> #include <fcntl.h> -#include <stddef.h> #include <sys/types.h> #include <sys/socket.h> #include <sys/stat.h> -#if defined(OS_LINUX) -#include <linux/un.h> -#elif defined(OS_MACOSX) -#include <sys/un.h> -#endif - +#include <stddef.h> #include "base/logging.h" #include "base/process_util.h" @@ -24,6 +18,12 @@ #include "chrome/common/chrome_counters.h" #include "chrome/common/ipc_message_utils.h" +#if defined(OS_LINUX) +#include <linux/un.h> +#elif defined(OS_MACOSX) +#include <sys/un.h> +#endif + namespace IPC { //------------------------------------------------------------------------------ @@ -150,7 +150,9 @@ bool ClientConnectToFifo(const std::string &pipe_name, int* client_socket) { Channel::ChannelImpl::ChannelImpl(const std::wstring& channel_id, Mode mode, Listener* listener) : mode_(mode), - is_blocked_on_write_(false), + server_listen_connection_event_(new EventHolder()), + read_event_(new EventHolder()), + write_event_(new EventHolder()), message_send_bytes_written_(0), server_listen_pipe_(-1), pipe_(-1), @@ -212,22 +214,21 @@ bool Channel::ChannelImpl::Connect() { if (server_listen_pipe_ == -1) { return false; } - MessageLoopForIO::current()->WatchFileDescriptor( - server_listen_pipe_, - true, - MessageLoopForIO::WATCH_READ, - &server_listen_connection_watcher_, - this); + event *ev = &(server_listen_connection_event_->event); + MessageLoopForIO::current()->WatchFileHandle(server_listen_pipe_, + EV_READ | EV_PERSIST, + ev, + this); + server_listen_connection_event_->is_active = true; } else { if (pipe_ == -1) { return false; } - MessageLoopForIO::current()->WatchFileDescriptor( - pipe_, - true, - MessageLoopForIO::WATCH_READ, - &read_watcher_, - this); + MessageLoopForIO::current()->WatchFileHandle(pipe_, + EV_READ | EV_PERSIST, + &(read_event_->event), + this); + read_event_->is_active = true; waiting_connect_ = false; } @@ -316,7 +317,6 @@ bool Channel::ChannelImpl::ProcessIncomingMessages() { bool Channel::ChannelImpl::ProcessOutgoingMessages() { DCHECK(!waiting_connect_); // Why are we trying to send messages if there's // no connection? - is_blocked_on_write_ = false; if (output_queue_.empty()) return true; @@ -324,6 +324,15 @@ bool Channel::ChannelImpl::ProcessOutgoingMessages() { if (pipe_ == -1) return false; + // If libevent was monitoring the socket for us (we blocked when trying to + // write a message last time), then delete the underlying libevent structure. + if (write_event_->is_active) { + // TODO(playmobil): This calls event_del(), but we can probably + // do with just calling event_add here. + MessageLoopForIO::current()->UnwatchFileHandle(&(write_event_->event)); + write_event_->is_active = false; + } + // Write out all the messages we can till the write blocks or there are no // more outgoing messages. while (!output_queue_.empty()) { @@ -346,13 +355,12 @@ bool Channel::ChannelImpl::ProcessOutgoingMessages() { message_send_bytes_written_ += bytes_written; // Tell libevent to call us back once things are unblocked. - is_blocked_on_write_ = true; - MessageLoopForIO::current()->WatchFileDescriptor( - pipe_, - false, // One shot - MessageLoopForIO::WATCH_WRITE, - &write_watcher_, - this); + MessageLoopForIO::current()->WatchFileHandle(server_listen_pipe_, + EV_WRITE, + &(write_event_->event), + this); + write_event_->is_active = true; + } else { message_send_bytes_written_ = 0; @@ -383,7 +391,7 @@ bool Channel::ChannelImpl::Send(Message* message) { output_queue_.push(message); if (!waiting_connect_) { - if (!is_blocked_on_write_) { + if (!write_event_->is_active) { if (!ProcessOutgoingMessages()) return false; } @@ -393,7 +401,7 @@ bool Channel::ChannelImpl::Send(Message* message) { } // Called by libevent when we can read from th pipe without blocking. -void Channel::ChannelImpl::OnFileCanReadWithoutBlocking(int fd) { +void Channel::ChannelImpl::OnFileReadReady(int fd) { bool send_server_hello_msg = false; if (waiting_connect_ && mode_ == MODE_SERVER) { if (!ServerAcceptFifoConnection(server_listen_pipe_, &pipe_)) { @@ -402,16 +410,16 @@ void Channel::ChannelImpl::OnFileCanReadWithoutBlocking(int fd) { // No need to watch the listening socket any longer since only one client // can connect. So unregister with libevent. - server_listen_connection_watcher_.StopWatchingFileDescriptor(); + event *ev = &(server_listen_connection_event_->event); + MessageLoopForIO::current()->UnwatchFileHandle(ev); + server_listen_connection_event_->is_active = false; // Start watching our end of the socket. - MessageLoopForIO::current()->WatchFileDescriptor( - pipe_, - true, - MessageLoopForIO::WATCH_READ, - &read_watcher_, - this); - + MessageLoopForIO::current()->WatchFileHandle(pipe_, + EV_READ | EV_PERSIST, + &(read_event_->event), + this); + read_event_->is_active = true; waiting_connect_ = false; send_server_hello_msg = true; } @@ -428,14 +436,12 @@ void Channel::ChannelImpl::OnFileCanReadWithoutBlocking(int fd) { // This gives us a chance to kill the client if the incoming handshake // is invalid. if (send_server_hello_msg) { - // This should be our first write so there' sno chance we can block here... - DCHECK(is_blocked_on_write_ == false); ProcessOutgoingMessages(); } } // Called by libevent when we can write to the pipe without blocking. -void Channel::ChannelImpl::OnFileCanWriteWithoutBlocking(int fd) { +void Channel::ChannelImpl::OnFileWriteReady(int fd) { if (!ProcessOutgoingMessages()) { Close(); listener_->OnChannelError(); @@ -447,7 +453,11 @@ void Channel::ChannelImpl::Close() { // idempotent. // Unregister libevent for the listening socket and close it. - server_listen_connection_watcher_.StopWatchingFileDescriptor(); + if (server_listen_connection_event_ && + server_listen_connection_event_->is_active) { + MessageLoopForIO::current()->UnwatchFileHandle( + &(server_listen_connection_event_->event)); + } if (server_listen_pipe_ != -1) { close(server_listen_pipe_); @@ -455,13 +465,24 @@ void Channel::ChannelImpl::Close() { } // Unregister libevent for the FIFO and close it. - read_watcher_.StopWatchingFileDescriptor(); - write_watcher_.StopWatchingFileDescriptor(); + if (read_event_ && read_event_->is_active) { + MessageLoopForIO::current()->UnwatchFileHandle(&(read_event_->event)); + } + if (write_event_ && write_event_->is_active) { + MessageLoopForIO::current()->UnwatchFileHandle(&(write_event_->event)); + } if (pipe_ != -1) { close(pipe_); pipe_ = -1; } + delete server_listen_connection_event_; + server_listen_connection_event_ = NULL; + delete read_event_; + read_event_ = NULL; + delete write_event_; + write_event_ = NULL; + // Unlink the FIFO unlink(pipe_name_.c_str()); diff --git a/chrome/common/ipc_channel_posix.h b/chrome/common/ipc_channel_posix.h index 9b34494..b2849dc 100644 --- a/chrome/common/ipc_channel_posix.h +++ b/chrome/common/ipc_channel_posix.h @@ -11,10 +11,11 @@ #include <string> #include "base/message_loop.h" +#include "third_party/libevent/event.h" namespace IPC { -class Channel::ChannelImpl : public MessageLoopForIO::Watcher { +class Channel::ChannelImpl : public MessageLoopForIO::FileWatcher { public: // Mirror methods of Channel, see ipc_channel.h for description. ChannelImpl(const std::wstring& channel_id, Mode mode, Listener* listener); @@ -30,19 +31,27 @@ class Channel::ChannelImpl : public MessageLoopForIO::Watcher { bool ProcessIncomingMessages(); bool ProcessOutgoingMessages(); - void OnFileCanReadWithoutBlocking(int fd); - void OnFileCanWriteWithoutBlocking(int fd); + void OnFileReadReady(int fd); + void OnFileWriteReady(int fd); Mode mode_; - // After accepting one client connection on our server socket we want to - // stop listening. - MessageLoopForIO::FileDescriptorWatcher server_listen_connection_watcher_; - MessageLoopForIO::FileDescriptorWatcher read_watcher_; - MessageLoopForIO::FileDescriptorWatcher write_watcher_; + // Wrapper for Libevent event. + // TODO(playmobil): MessageLoopForIO needs to better encapsulate libevent. + struct EventHolder { + EventHolder() : is_active(false) {} + ~EventHolder() {} - // Are we currently blocked waiting for a write to complete. - bool is_blocked_on_write_; + bool is_active; + + // libevent's set functions set all the needed members of this struct, so no + // need to initialize before use. + struct event event; + }; + + EventHolder *server_listen_connection_event_; + EventHolder *read_event_; + EventHolder *write_event_; // If sending a message blocks then we use this variable // to keep track of where we are. diff --git a/net/base/listen_socket.cc b/net/base/listen_socket.cc index b2185a2..8428e06 100644 --- a/net/base/listen_socket.cc +++ b/net/base/listen_socket.cc @@ -12,6 +12,7 @@ #include <errno.h> #include <sys/socket.h> #include <arpa/inet.h> +#include "base/message_loop.h" #include "net/base/net_errors.h" #include "third_party/libevent/event.h" #endif @@ -29,7 +30,12 @@ const int SOCKET_ERROR = -1; const int kReadBufSize = 200; ListenSocket::ListenSocket(SOCKET s, ListenSocketDelegate *del) +#if defined(OS_WIN) : socket_(s), +#elif defined(OS_POSIX) + : event_(new event), + socket_(s), +#endif socket_delegate_(del) { #if defined(OS_WIN) socket_event_ = WSACreateEvent(); @@ -171,7 +177,8 @@ void ListenSocket::UnwatchSocket() { #if defined(OS_WIN) watcher_.StopWatching(); #elif defined(OS_POSIX) - watcher_.StopWatchingFileDescriptor(); + MessageLoopForIO::current()->UnwatchSocket(event_.get()); + wait_state_ = NOT_WAITING; #endif } @@ -180,9 +187,8 @@ void ListenSocket::WatchSocket(WaitState state) { WSAEventSelect(socket_, socket_event_, FD_ACCEPT | FD_CLOSE | FD_READ); watcher_.StartWatching(socket_event_, this); #elif defined(OS_POSIX) - // Implicitly calls StartWatchingFileDescriptor(). - MessageLoopForIO::current()->WatchFileDescriptor( - socket_, true, MessageLoopForIO::WATCH_READ, &watcher_, this); + MessageLoopForIO::current()->WatchSocket( + socket_, EV_READ|EV_PERSIST, event_.get(),this); wait_state_ = state; #endif } @@ -244,7 +250,7 @@ void ListenSocket::OnObjectSignaled(HANDLE object) { } } #elif defined(OS_POSIX) -void ListenSocket::OnFileCanReadWithoutBlocking(int fd) { +void ListenSocket::OnSocketReady(short flags) { if (wait_state_ == WAITING_ACCEPT) { Accept(); } @@ -256,11 +262,4 @@ void ListenSocket::OnFileCanReadWithoutBlocking(int fd) { // TODO(erikkay): this seems to get hit multiple times after the close } } - -void ListenSocket::OnFileCanWriteWithoutBlocking(int fd) { - // MessagePumpLibevent callback, we don't listen for write events - // so we shouldn't ever reach here. - NOTREACHED(); -} - #endif diff --git a/net/base/listen_socket.h b/net/base/listen_socket.h index f5f8977..2b32b5b 100644 --- a/net/base/listen_socket.h +++ b/net/base/listen_socket.h @@ -11,7 +11,6 @@ #ifndef NET_BASE_SOCKET_H_ #define NET_BASE_SOCKET_H_ -#include <string> #if defined(OS_WIN) #include <winsock2.h> #include "base/object_watcher.h" @@ -19,6 +18,8 @@ #include "base/message_loop.h" #include "net/base/net_util.h" #include "net/base/net_errors.h" +#include "third_party/libevent/event.h" +#include "base/message_pump_libevent.h" #endif #include "base/basictypes.h" @@ -34,7 +35,7 @@ class ListenSocket : public base::RefCountedThreadSafe<ListenSocket>, #if defined(OS_WIN) public base::ObjectWatcher::Delegate #elif defined(OS_POSIX) - public MessageLoopForIO::Watcher + public base::MessagePumpLibevent::Watcher #endif { public: @@ -79,11 +80,11 @@ class ListenSocket : public base::RefCountedThreadSafe<ListenSocket>, NOT_WAITING = 0, WAITING_ACCEPT = 1, WAITING_READ = 3, - WAITING_CLOSE = 4 + WAITING_CLOSE = 4 }; - // Pass any value in case of Windows, because in Windows - // we are not using state. - void WatchSocket(WaitState state); + // Pass any value in case of Windows, because in Windows + // we are not using state. + void WatchSocket(WaitState state); void UnwatchSocket(); #if defined(OS_WIN) @@ -94,17 +95,17 @@ class ListenSocket : public base::RefCountedThreadSafe<ListenSocket>, #elif defined(OS_POSIX) WaitState wait_state_; // The socket's libevent wrapper - MessageLoopForIO::FileDescriptorWatcher watcher_; + scoped_ptr<event> event_; // Called by MessagePumpLibevent when the socket is ready to do I/O - void OnFileCanReadWithoutBlocking(int fd); - void OnFileCanWriteWithoutBlocking(int fd); + void OnSocketReady(short flags); #endif - SOCKET socket_; + SOCKET socket_; ListenSocketDelegate *socket_delegate_; private: DISALLOW_EVIL_CONSTRUCTORS(ListenSocket); }; -#endif // NET_BASE_SOCKET_H_ +#endif // NET_BASE_SOCKET_H_ + diff --git a/net/base/tcp_client_socket.h b/net/base/tcp_client_socket.h index 5fca519..05a433e 100644 --- a/net/base/tcp_client_socket.h +++ b/net/base/tcp_client_socket.h @@ -14,7 +14,7 @@ struct event; // From libevent #include <sys/socket.h> // for struct sockaddr #define SOCKET int -#include "base/message_loop.h" +#include "base/message_pump_libevent.h" #endif #include "base/scoped_ptr.h" @@ -26,7 +26,7 @@ namespace net { // A client socket that uses TCP as the transport layer. // -// NOTE: The windows implementation supports half duplex only. +// NOTE: The windows implementation supports half duplex only. // Read and Write calls must not be in progress at the same time. // The libevent implementation supports full duplex because that // made it slightly easier to implement ssl. @@ -34,7 +34,7 @@ class TCPClientSocket : public ClientSocket, #if defined(OS_WIN) public base::ObjectWatcher::Delegate #elif defined(OS_POSIX) - public MessageLoopForIO::Watcher + public base::MessagePumpLibevent::Watcher #endif { public: @@ -52,7 +52,7 @@ class TCPClientSocket : public ClientSocket, virtual bool IsConnected() const; // Socket methods: - // Multiple outstanding requests are not supported. + // Multiple outstanding requests are not supported. // Full duplex mode (reading and writing at the same time) is not supported // on Windows (but is supported on Linux and Mac for ease of implementation // of SSLClientSocket) @@ -97,11 +97,10 @@ class TCPClientSocket : public ClientSocket, bool waiting_connect_; // The socket's libevent wrapper - MessageLoopForIO::FileDescriptorWatcher socket_watcher_; + scoped_ptr<event> event_; // Called by MessagePumpLibevent when the socket is ready to do I/O - void OnFileCanReadWithoutBlocking(int fd); - void OnFileCanWriteWithoutBlocking(int fd); + void OnSocketReady(short flags); // The buffer used by OnSocketReady to retry Read requests char* buf_; diff --git a/net/base/tcp_client_socket_libevent.cc b/net/base/tcp_client_socket_libevent.cc index 933e95b..79909c20 100644 --- a/net/base/tcp_client_socket_libevent.cc +++ b/net/base/tcp_client_socket_libevent.cc @@ -68,6 +68,7 @@ TCPClientSocket::TCPClientSocket(const AddressList& addresses) addresses_(addresses), current_ai_(addresses_.head()), waiting_connect_(false), + event_(new event), write_callback_(NULL), callback_(NULL) { } @@ -109,8 +110,8 @@ int TCPClientSocket::Connect(CompletionCallback* callback) { // POLLOUT is set if the connection is established. // POLLIN is set if the connection fails, // so select for both read and write. - MessageLoopForIO::current()->WatchFileDescriptor( - socket_, true, MessageLoopForIO::WATCH_READ_WRITE, &socket_watcher_, this); + MessageLoopForIO::current()->WatchSocket( + socket_, EV_READ|EV_WRITE|EV_PERSIST, event_.get(), this); waiting_connect_ = true; callback_ = callback; @@ -126,7 +127,7 @@ void TCPClientSocket::Disconnect() { if (socket_ == kInvalidSocket) return; - socket_watcher_.StopWatchingFileDescriptor(); + MessageLoopForIO::current()->UnwatchSocket(event_.get()); close(socket_); socket_ = kInvalidSocket; waiting_connect_ = false; @@ -169,8 +170,8 @@ int TCPClientSocket::Read(char* buf, return MapPosixError(errno); } - MessageLoopForIO::current()->WatchFileDescriptor( - socket_, true, MessageLoopForIO::WATCH_READ, &socket_watcher_, this); + MessageLoopForIO::current()->WatchSocket( + socket_, EV_READ|EV_PERSIST, event_.get(), this); buf_ = buf; buf_len_ = buf_len; @@ -195,9 +196,8 @@ int TCPClientSocket::Write(const char* buf, if (errno != EAGAIN && errno != EWOULDBLOCK) return MapPosixError(errno); - MessageLoopForIO::current()->WatchFileDescriptor( - socket_, true, MessageLoopForIO::WATCH_WRITE, &socket_watcher_, this); - + MessageLoopForIO::current()->WatchSocket( + socket_, EV_WRITE|EV_PERSIST, event_.get(), this); write_buf_ = buf; write_buf_len_ = buf_len; @@ -263,7 +263,7 @@ void TCPClientSocket::DidCompleteConnect() { result = Connect(callback_); } else { result = MapPosixError(error_code); - socket_watcher_.StopWatchingFileDescriptor(); + MessageLoopForIO::current()->UnwatchSocket(event_.get()); waiting_connect_ = false; } @@ -285,7 +285,7 @@ void TCPClientSocket::DidCompleteRead() { if (result != ERR_IO_PENDING) { buf_ = NULL; buf_len_ = 0; - socket_watcher_.StopWatchingFileDescriptor(); + MessageLoopForIO::current()->UnwatchSocket(event_.get()); DoCallback(result); } } @@ -304,24 +304,21 @@ void TCPClientSocket::DidCompleteWrite() { if (result != ERR_IO_PENDING) { write_buf_ = NULL; write_buf_len_ = 0; - socket_watcher_.StopWatchingFileDescriptor(); + MessageLoopForIO::current()->UnwatchSocket(event_.get()); DoWriteCallback(result); } } -void TCPClientSocket::OnFileCanReadWithoutBlocking(int fd) { - if (waiting_connect_) { - DidCompleteConnect(); - } else if (callback_) { - DidCompleteRead(); - } -} +void TCPClientSocket::OnSocketReady(short flags) { + // the only used bits of flags are EV_READ and EV_WRITE -void TCPClientSocket::OnFileCanWriteWithoutBlocking(int fd) { if (waiting_connect_) { DidCompleteConnect(); - } else if (write_callback_) { - DidCompleteWrite(); + } else { + if ((flags & EV_WRITE) && write_callback_) + DidCompleteWrite(); + if ((flags & EV_READ) && callback_) + DidCompleteRead(); } } |