diff options
author | jeremy@chromium.org <jeremy@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98> | 2008-12-15 22:02:17 +0000 |
---|---|---|
committer | jeremy@chromium.org <jeremy@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98> | 2008-12-15 22:02:17 +0000 |
commit | e45e6c09f752dd23b2560cc64d990e6c03082083 (patch) | |
tree | 13b0e19b53b60f01e3528b20f74bf8173b4e4d7b /net | |
parent | bf54f6c60220a24ef1230f7c18153f2b077f5125 (diff) | |
download | chromium_src-e45e6c09f752dd23b2560cc64d990e6c03082083.zip chromium_src-e45e6c09f752dd23b2560cc64d990e6c03082083.tar.gz chromium_src-e45e6c09f752dd23b2560cc64d990e6c03082083.tar.bz2 |
message_pump_libevent refactor:
* Unify WatchSocket & WatchFileHandle.
* Better encapsulate libevent.
* Fix a bug with blocking writes in ipc_posix.cc
Review URL: http://codereview.chromium.org/13757
git-svn-id: svn://svn.chromium.org/chrome/trunk/src@7010 0039d316-1c4b-4281-b951-d872f2087c98
Diffstat (limited to 'net')
-rw-r--r-- | net/base/listen_socket.cc | 23 | ||||
-rw-r--r-- | net/base/listen_socket.h | 25 | ||||
-rw-r--r-- | net/base/tcp_client_socket.h | 13 | ||||
-rw-r--r-- | net/base/tcp_client_socket_libevent.cc | 61 |
4 files changed, 71 insertions, 51 deletions
diff --git a/net/base/listen_socket.cc b/net/base/listen_socket.cc index 8428e06..b2185a2 100644 --- a/net/base/listen_socket.cc +++ b/net/base/listen_socket.cc @@ -12,7 +12,6 @@ #include <errno.h> #include <sys/socket.h> #include <arpa/inet.h> -#include "base/message_loop.h" #include "net/base/net_errors.h" #include "third_party/libevent/event.h" #endif @@ -30,12 +29,7 @@ const int SOCKET_ERROR = -1; const int kReadBufSize = 200; ListenSocket::ListenSocket(SOCKET s, ListenSocketDelegate *del) -#if defined(OS_WIN) : socket_(s), -#elif defined(OS_POSIX) - : event_(new event), - socket_(s), -#endif socket_delegate_(del) { #if defined(OS_WIN) socket_event_ = WSACreateEvent(); @@ -177,8 +171,7 @@ void ListenSocket::UnwatchSocket() { #if defined(OS_WIN) watcher_.StopWatching(); #elif defined(OS_POSIX) - MessageLoopForIO::current()->UnwatchSocket(event_.get()); - wait_state_ = NOT_WAITING; + watcher_.StopWatchingFileDescriptor(); #endif } @@ -187,8 +180,9 @@ void ListenSocket::WatchSocket(WaitState state) { WSAEventSelect(socket_, socket_event_, FD_ACCEPT | FD_CLOSE | FD_READ); watcher_.StartWatching(socket_event_, this); #elif defined(OS_POSIX) - MessageLoopForIO::current()->WatchSocket( - socket_, EV_READ|EV_PERSIST, event_.get(),this); + // Implicitly calls StartWatchingFileDescriptor(). + MessageLoopForIO::current()->WatchFileDescriptor( + socket_, true, MessageLoopForIO::WATCH_READ, &watcher_, this); wait_state_ = state; #endif } @@ -250,7 +244,7 @@ void ListenSocket::OnObjectSignaled(HANDLE object) { } } #elif defined(OS_POSIX) -void ListenSocket::OnSocketReady(short flags) { +void ListenSocket::OnFileCanReadWithoutBlocking(int fd) { if (wait_state_ == WAITING_ACCEPT) { Accept(); } @@ -262,4 +256,11 @@ void ListenSocket::OnSocketReady(short flags) { // TODO(erikkay): this seems to get hit multiple times after the close } } + +void ListenSocket::OnFileCanWriteWithoutBlocking(int fd) { + // MessagePumpLibevent callback, we don't listen for write events + // so we shouldn't ever reach here. + NOTREACHED(); +} + #endif diff --git a/net/base/listen_socket.h b/net/base/listen_socket.h index 2b32b5b..e405940 100644 --- a/net/base/listen_socket.h +++ b/net/base/listen_socket.h @@ -13,13 +13,14 @@ #if defined(OS_WIN) #include <winsock2.h> +#endif +#include <string> +#if defined(OS_WIN) #include "base/object_watcher.h" #elif defined(OS_POSIX) #include "base/message_loop.h" #include "net/base/net_util.h" #include "net/base/net_errors.h" -#include "third_party/libevent/event.h" -#include "base/message_pump_libevent.h" #endif #include "base/basictypes.h" @@ -35,7 +36,7 @@ class ListenSocket : public base::RefCountedThreadSafe<ListenSocket>, #if defined(OS_WIN) public base::ObjectWatcher::Delegate #elif defined(OS_POSIX) - public base::MessagePumpLibevent::Watcher + public MessageLoopForIO::Watcher #endif { public: @@ -80,11 +81,11 @@ class ListenSocket : public base::RefCountedThreadSafe<ListenSocket>, NOT_WAITING = 0, WAITING_ACCEPT = 1, WAITING_READ = 3, - WAITING_CLOSE = 4 + WAITING_CLOSE = 4 }; - // Pass any value in case of Windows, because in Windows - // we are not using state. - void WatchSocket(WaitState state); + // Pass any value in case of Windows, because in Windows + // we are not using state. + void WatchSocket(WaitState state); void UnwatchSocket(); #if defined(OS_WIN) @@ -95,17 +96,17 @@ class ListenSocket : public base::RefCountedThreadSafe<ListenSocket>, #elif defined(OS_POSIX) WaitState wait_state_; // The socket's libevent wrapper - scoped_ptr<event> event_; + MessageLoopForIO::FileDescriptorWatcher watcher_; // Called by MessagePumpLibevent when the socket is ready to do I/O - void OnSocketReady(short flags); + void OnFileCanReadWithoutBlocking(int fd); + void OnFileCanWriteWithoutBlocking(int fd); #endif - SOCKET socket_; + SOCKET socket_; ListenSocketDelegate *socket_delegate_; private: DISALLOW_EVIL_CONSTRUCTORS(ListenSocket); }; -#endif // NET_BASE_SOCKET_H_ - +#endif // NET_BASE_SOCKET_H_ diff --git a/net/base/tcp_client_socket.h b/net/base/tcp_client_socket.h index 05a433e..5fca519 100644 --- a/net/base/tcp_client_socket.h +++ b/net/base/tcp_client_socket.h @@ -14,7 +14,7 @@ struct event; // From libevent #include <sys/socket.h> // for struct sockaddr #define SOCKET int -#include "base/message_pump_libevent.h" +#include "base/message_loop.h" #endif #include "base/scoped_ptr.h" @@ -26,7 +26,7 @@ namespace net { // A client socket that uses TCP as the transport layer. // -// NOTE: The windows implementation supports half duplex only. +// NOTE: The windows implementation supports half duplex only. // Read and Write calls must not be in progress at the same time. // The libevent implementation supports full duplex because that // made it slightly easier to implement ssl. @@ -34,7 +34,7 @@ class TCPClientSocket : public ClientSocket, #if defined(OS_WIN) public base::ObjectWatcher::Delegate #elif defined(OS_POSIX) - public base::MessagePumpLibevent::Watcher + public MessageLoopForIO::Watcher #endif { public: @@ -52,7 +52,7 @@ class TCPClientSocket : public ClientSocket, virtual bool IsConnected() const; // Socket methods: - // Multiple outstanding requests are not supported. + // Multiple outstanding requests are not supported. // Full duplex mode (reading and writing at the same time) is not supported // on Windows (but is supported on Linux and Mac for ease of implementation // of SSLClientSocket) @@ -97,10 +97,11 @@ class TCPClientSocket : public ClientSocket, bool waiting_connect_; // The socket's libevent wrapper - scoped_ptr<event> event_; + MessageLoopForIO::FileDescriptorWatcher socket_watcher_; // Called by MessagePumpLibevent when the socket is ready to do I/O - void OnSocketReady(short flags); + void OnFileCanReadWithoutBlocking(int fd); + void OnFileCanWriteWithoutBlocking(int fd); // The buffer used by OnSocketReady to retry Read requests char* buf_; diff --git a/net/base/tcp_client_socket_libevent.cc b/net/base/tcp_client_socket_libevent.cc index 79909c20..7a9084b 100644 --- a/net/base/tcp_client_socket_libevent.cc +++ b/net/base/tcp_client_socket_libevent.cc @@ -68,7 +68,6 @@ TCPClientSocket::TCPClientSocket(const AddressList& addresses) addresses_(addresses), current_ai_(addresses_.head()), waiting_connect_(false), - event_(new event), write_callback_(NULL), callback_(NULL) { } @@ -106,12 +105,17 @@ int TCPClientSocket::Connect(CompletionCallback* callback) { return MapPosixError(errno); } - // Initialize event_ and link it to our MessagePump. + // Initialize socket_watcher_ and link it to our MessagePump. // POLLOUT is set if the connection is established. - // POLLIN is set if the connection fails, - // so select for both read and write. - MessageLoopForIO::current()->WatchSocket( - socket_, EV_READ|EV_WRITE|EV_PERSIST, event_.get(), this); + // POLLIN is set if the connection fails. + if (!MessageLoopForIO::current()->WatchFileDescriptor( + socket_, true, MessageLoopForIO::WATCH_WRITE, &socket_watcher_, + this)) { + DLOG(INFO) << "WatchFileDescriptor failed: " << errno; + close(socket_); + socket_ = kInvalidSocket; + return MapPosixError(errno); + } waiting_connect_ = true; callback_ = callback; @@ -127,7 +131,7 @@ void TCPClientSocket::Disconnect() { if (socket_ == kInvalidSocket) return; - MessageLoopForIO::current()->UnwatchSocket(event_.get()); + socket_watcher_.StopWatchingFileDescriptor(); close(socket_); socket_ = kInvalidSocket; waiting_connect_ = false; @@ -170,8 +174,12 @@ int TCPClientSocket::Read(char* buf, return MapPosixError(errno); } - MessageLoopForIO::current()->WatchSocket( - socket_, EV_READ|EV_PERSIST, event_.get(), this); + if (!MessageLoopForIO::current()->WatchFileDescriptor( + socket_, true, MessageLoopForIO::WATCH_READ, &socket_watcher_, this)) + { + DLOG(INFO) << "WatchFileDescriptor failed on read, errno " << errno; + return MapPosixError(errno); + } buf_ = buf; buf_len_ = buf_len; @@ -196,8 +204,13 @@ int TCPClientSocket::Write(const char* buf, if (errno != EAGAIN && errno != EWOULDBLOCK) return MapPosixError(errno); - MessageLoopForIO::current()->WatchSocket( - socket_, EV_WRITE|EV_PERSIST, event_.get(), this); + if (!MessageLoopForIO::current()->WatchFileDescriptor( + socket_, true, MessageLoopForIO::WATCH_WRITE, &socket_watcher_, this)) + { + DLOG(INFO) << "WatchFileDescriptor failed on write, errno " << errno; + return MapPosixError(errno); + } + write_buf_ = buf; write_buf_len_ = buf_len; @@ -263,12 +276,13 @@ void TCPClientSocket::DidCompleteConnect() { result = Connect(callback_); } else { result = MapPosixError(error_code); - MessageLoopForIO::current()->UnwatchSocket(event_.get()); + socket_watcher_.StopWatchingFileDescriptor(); waiting_connect_ = false; } - if (result != ERR_IO_PENDING) + if (result != ERR_IO_PENDING) { DoCallback(result); + } } void TCPClientSocket::DidCompleteRead() { @@ -285,7 +299,7 @@ void TCPClientSocket::DidCompleteRead() { if (result != ERR_IO_PENDING) { buf_ = NULL; buf_len_ = 0; - MessageLoopForIO::current()->UnwatchSocket(event_.get()); + socket_watcher_.StopWatchingFileDescriptor(); DoCallback(result); } } @@ -304,21 +318,24 @@ void TCPClientSocket::DidCompleteWrite() { if (result != ERR_IO_PENDING) { write_buf_ = NULL; write_buf_len_ = 0; - MessageLoopForIO::current()->UnwatchSocket(event_.get()); + socket_watcher_.StopWatchingFileDescriptor(); DoWriteCallback(result); } } -void TCPClientSocket::OnSocketReady(short flags) { - // the only used bits of flags are EV_READ and EV_WRITE +void TCPClientSocket::OnFileCanReadWithoutBlocking(int fd) { + // When a socket connects it signals both Read and Write, we handle + // DidCompleteConnect() in the write handler. + if (!waiting_connect_ && callback_) { + DidCompleteRead(); + } +} +void TCPClientSocket::OnFileCanWriteWithoutBlocking(int fd) { if (waiting_connect_) { DidCompleteConnect(); - } else { - if ((flags & EV_WRITE) && write_callback_) - DidCompleteWrite(); - if ((flags & EV_READ) && callback_) - DidCompleteRead(); + } else if (write_callback_) { + DidCompleteWrite(); } } |