summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--base/message_loop.cc27
-rw-r--r--base/message_loop.h23
-rw-r--r--base/message_pump_libevent.cc150
-rw-r--r--base/message_pump_libevent.h97
-rw-r--r--chrome/common/ipc_channel_posix.cc123
-rw-r--r--chrome/common/ipc_channel_posix.h29
-rw-r--r--net/base/listen_socket.cc23
-rw-r--r--net/base/listen_socket.h25
-rw-r--r--net/base/tcp_client_socket.h13
-rw-r--r--net/base/tcp_client_socket_libevent.cc61
10 files changed, 310 insertions, 261 deletions
diff --git a/base/message_loop.cc b/base/message_loop.cc
index 898fbfa..12ad3fa 100644
--- a/base/message_loop.cc
+++ b/base/message_loop.cc
@@ -593,22 +593,17 @@ bool MessageLoopForIO::WaitForIOCompletion(DWORD timeout, IOHandler* filter) {
#elif defined(OS_POSIX)
-void MessageLoopForIO::WatchSocket(int socket, short interest_mask,
- struct event* e, Watcher* watcher) {
- pump_libevent()->WatchSocket(socket, interest_mask, e, watcher);
+bool MessageLoopForIO::WatchFileDescriptor(int fd,
+ bool persistent,
+ Mode mode,
+ FileDescriptorWatcher *controller,
+ Watcher *delegate) {
+ return pump_libevent()->WatchFileDescriptor(
+ fd,
+ persistent,
+ static_cast<base::MessagePumpLibevent::Mode>(mode),
+ controller,
+ delegate);
}
-void MessageLoopForIO::WatchFileHandle(int fd, short interest_mask,
- struct event* e, FileWatcher* watcher) {
- pump_libevent()->WatchFileHandle(fd, interest_mask, e, watcher);
-}
-
-
-void MessageLoopForIO::UnwatchSocket(struct event* e) {
- pump_libevent()->UnwatchSocket(e);
-}
-
-void MessageLoopForIO::UnwatchFileHandle(struct event* e) {
- pump_libevent()->UnwatchFileHandle(e);
-}
#endif
diff --git a/base/message_loop.h b/base/message_loop.h
index 69db8c1..6cd0ef4 100644
--- a/base/message_loop.h
+++ b/base/message_loop.h
@@ -473,14 +473,21 @@ class MessageLoopForIO : public MessageLoop {
#elif defined(OS_POSIX)
typedef base::MessagePumpLibevent::Watcher Watcher;
- typedef base::MessagePumpLibevent::FileWatcher FileWatcher;
-
- // Please see MessagePumpLibevent for definitions of these methods.
- void WatchSocket(int socket, short interest_mask,
- struct event* e, Watcher* watcher);
- void WatchFileHandle(int fd, short interest_mask, event* e, FileWatcher*);
- void UnwatchSocket(struct event* e);
- void UnwatchFileHandle(event* e);
+ typedef base::MessagePumpLibevent::FileDescriptorWatcher
+ FileDescriptorWatcher;
+
+ enum Mode {
+ WATCH_READ = base::MessagePumpLibevent::WATCH_READ,
+ WATCH_WRITE = base::MessagePumpLibevent::WATCH_WRITE,
+ WATCH_READ_WRITE = base::MessagePumpLibevent::WATCH_READ_WRITE
+ };
+
+ // Please see MessagePumpLibevent for definition.
+ bool WatchFileDescriptor(int fd,
+ bool persistent,
+ Mode mode,
+ FileDescriptorWatcher *controller,
+ Watcher *delegate);
#endif // defined(OS_POSIX)
};
diff --git a/base/message_pump_libevent.cc b/base/message_pump_libevent.cc
index a281980..f31c01b 100644
--- a/base/message_pump_libevent.cc
+++ b/base/message_pump_libevent.cc
@@ -15,17 +15,48 @@ namespace base {
// Return 0 on success
// Too small a function to bother putting in a library?
-static int SetNonBlocking(int fd)
-{
- int flags = fcntl(fd, F_GETFL, 0);
- if (-1 == flags)
- flags = 0;
- return fcntl(fd, F_SETFL, flags | O_NONBLOCK);
+static int SetNonBlocking(int fd) {
+ int flags = fcntl(fd, F_GETFL, 0);
+ if (flags == -1)
+ flags = 0;
+ return fcntl(fd, F_SETFL, flags | O_NONBLOCK);
+}
+
+MessagePumpLibevent::FileDescriptorWatcher::FileDescriptorWatcher()
+ : is_persistent_(false),
+ event_(NULL) {
+}
+
+MessagePumpLibevent::FileDescriptorWatcher::~FileDescriptorWatcher() {
+ if (event_.get()) {
+ StopWatchingFileDescriptor();
+ }
+}
+
+void MessagePumpLibevent::FileDescriptorWatcher::Init(event *e,
+ bool is_persistent) {
+ DCHECK(e);
+ DCHECK(event_.get() == NULL);
+
+ is_persistent_ = is_persistent;
+ event_.reset(e);
+}
+
+event *MessagePumpLibevent::FileDescriptorWatcher::ReleaseEvent() {
+ return event_.release();
+}
+
+bool MessagePumpLibevent::FileDescriptorWatcher::StopWatchingFileDescriptor() {
+ if (event_.get() == NULL) {
+ return true;
+ }
+
+ // event_del() is a no-op of the event isn't active.
+ return (event_del(event_.get()) == 0);
}
// Called if a byte is received on the wakeup pipe.
void MessagePumpLibevent::OnWakeup(int socket, short flags, void* context) {
-
base::MessagePumpLibevent* that =
static_cast<base::MessagePumpLibevent*>(context);
DCHECK(that->wakeup_pipe_out_ == socket);
@@ -61,7 +92,7 @@ bool MessagePumpLibevent::Init() {
wakeup_event_ = new event;
event_set(wakeup_event_, wakeup_pipe_out_, EV_READ | EV_PERSIST,
- OnWakeup, this);
+ OnWakeup, this);
event_base_set(event_base_, wakeup_event_);
if (event_add(wakeup_event_, 0))
@@ -77,67 +108,72 @@ MessagePumpLibevent::~MessagePumpLibevent() {
event_base_free(event_base_);
}
-void MessagePumpLibevent::WatchSocket(int socket, short interest_mask,
- event* e, Watcher* watcher) {
+bool MessagePumpLibevent::WatchFileDescriptor(int fd,
+ bool persistent,
+ Mode mode,
+ FileDescriptorWatcher *controller,
+ Watcher *delegate) {
+ DCHECK(fd > 0);
+ DCHECK(controller);
+ DCHECK(delegate);
+ DCHECK(mode == WATCH_READ || mode == WATCH_WRITE || mode == WATCH_READ_WRITE);
- // Set current interest mask and message pump for this event
- event_set(e, socket, interest_mask, OnReadinessNotification, watcher);
+ int event_mask = persistent ? EV_PERSIST : 0;
+ if ((mode & WATCH_READ) != 0) {
+ event_mask |= EV_READ;
+ }
+ if ((mode & WATCH_WRITE) != 0) {
+ event_mask |= EV_WRITE;
+ }
- // Tell libevent which message pump this socket will belong to when we add it.
- event_base_set(event_base_, e);
+ // |should_delete_event| is true if we're modifying an event that's currently
+ // active in |controller|.
+ // If we're modifying an existing event and there's an error then we need to
+ // tell libevent to clean it up via event_delete() before returning.
+ bool should_delete_event = true;
+ scoped_ptr<event> evt(controller->ReleaseEvent());
+ if (evt.get() == NULL) {
+ should_delete_event = false;
+ // Ownership is transferred to the controller.
+ evt.reset(new event);
+ }
- // Add this socket to the list of monitored sockets.
- if (event_add(e, NULL))
- NOTREACHED();
-}
+ // Set current interest mask and message pump for this event.
+ event_set(evt.get(), fd, event_mask, OnLibeventNotification,
+ delegate);
-void MessagePumpLibevent::WatchFileHandle(int fd, short interest_mask,
- event* e, FileWatcher* watcher) {
- // Set current interest mask and message pump for this event
- if ((interest_mask & EV_READ) != 0) {
- event_set(e, fd, interest_mask, OnFileReadReadinessNotification, watcher);
- } else {
- event_set(e, fd, interest_mask, OnFileWriteReadinessNotification, watcher);
+ // Tell libevent which message pump this socket will belong to when we add it.
+ if (event_base_set(event_base_, evt.get()) != 0) {
+ if (should_delete_event) {
+ event_del(evt.get());
+ }
+ return false;
}
- // Tell libevent which message pump this fd will belong to when we add it.
- event_base_set(event_base_, e);
-
- // Add this fd to the list of monitored sockets.
- if (event_add(e, NULL))
- NOTREACHED();
-}
+ // Add this socket to the list of monitored sockets.
+ if (event_add(evt.get(), NULL) != 0) {
+ if (should_delete_event) {
+ event_del(evt.get());
+ }
+ return false;
+ }
-void MessagePumpLibevent::UnwatchSocket(event* e) {
- // Remove this socket from the list of monitored sockets.
- if (event_del(e))
- NOTREACHED();
+ // Transfer ownership of e to controller.
+ controller->Init(evt.release(), persistent);
+ return true;
}
-void MessagePumpLibevent::UnwatchFileHandle(event* e) {
- // Remove this fd from the list of monitored fds.
- if (event_del(e))
- NOTREACHED();
-}
-void MessagePumpLibevent::OnReadinessNotification(int socket, short flags,
- void* context) {
- // The given socket is ready for I/O.
- // Tell the owner what kind of I/O the socket is ready for.
+void MessagePumpLibevent::OnLibeventNotification(int fd, short flags,
+ void* context) {
Watcher* watcher = static_cast<Watcher*>(context);
- watcher->OnSocketReady(flags);
-}
-void MessagePumpLibevent::OnFileReadReadinessNotification(int fd, short flags,
- void* context) {
- FileWatcher* watcher = static_cast<FileWatcher*>(context);
- watcher->OnFileReadReady(fd);
-}
-
-void MessagePumpLibevent::OnFileWriteReadinessNotification(int fd, short flags,
- void* context) {
- FileWatcher* watcher = static_cast<FileWatcher*>(context);
- watcher->OnFileWriteReady(fd);
+ if (flags & EV_WRITE) {
+ watcher->OnFileCanWriteWithoutBlocking(fd);
+ }
+ if (flags & EV_READ) {
+ watcher->OnFileCanReadWithoutBlocking(fd);
+ }
}
// Reentrant!
diff --git a/base/message_pump_libevent.h b/base/message_pump_libevent.h
index b54bc36..aa5d5f8 100644
--- a/base/message_pump_libevent.h
+++ b/base/message_pump_libevent.h
@@ -6,6 +6,7 @@
#define BASE_MESSAGE_PUMP_LIBEVENT_H_
#include "base/message_pump.h"
+#include "base/scoped_ptr.h"
#include "base/time.h"
// Declare structs we need from libevent.h rather than including it
@@ -18,46 +19,71 @@ namespace base {
// TODO(dkegel): add support for background file IO somehow
class MessagePumpLibevent : public MessagePump {
public:
- // Used with WatchSocket to asynchronously monitor the I/O readiness of a
- // socket.
- class Watcher {
- public:
- virtual ~Watcher() {}
- // Called from MessageLoop::Run when a ready socket is detected.
- virtual void OnSocketReady(short eventmask) = 0;
+
+ // Object returned by WatchFileDescriptor to manage further watching.
+ class FileDescriptorWatcher {
+ public:
+ FileDescriptorWatcher();
+ ~FileDescriptorWatcher(); // Implicitly calls StopWatchingFileDescriptor.
+
+ // NOTE: These methods aren't called StartWatching()/StopWatching() to
+ // avoid confusion with the win32 ObjectWatcher class.
+
+ // Stop watching the FD, always safe to call. No-op if there's nothing
+ // to do.
+ bool StopWatchingFileDescriptor();
+
+ private:
+ // Called by MessagePumpLibevent, ownership of |e| is transferred to this
+ // object.
+ void Init(event* e, bool is_persistent);
+
+ // Used by MessagePumpLibevent to take ownership of event_.
+ event *ReleaseEvent();
+ friend class MessagePumpLibevent;
+
+ private:
+ bool is_persistent_; // false if this event is one-shot.
+ scoped_ptr<event> event_;
+ DISALLOW_COPY_AND_ASSIGN(FileDescriptorWatcher);
};
- // Used with WatchFileHandle to monitor I/O readiness for a File Handle.
- class FileWatcher {
+ // Used with WatchFileDescptor to asynchronously monitor the I/O readiness of
+ // a File Descriptor.
+ class Watcher {
public:
- virtual ~FileWatcher() {}
- // Called from MessageLoop::Run when a non-blocking read/write can be made.
- virtual void OnFileReadReady(int fd) = 0;
- virtual void OnFileWriteReady(int fd) = 0;
+ virtual ~Watcher() {}
+ // Called from MessageLoop::Run when an FD can be read from/written to
+ // without blocking
+ virtual void OnFileCanReadWithoutBlocking(int fd) = 0;
+ virtual void OnFileCanWriteWithoutBlocking(int fd) = 0;
};
MessagePumpLibevent();
virtual ~MessagePumpLibevent();
- // Have the current thread's message loop watch for a ready socket.
- // Caller must provide a struct event for this socket for libevent's use.
- // The event and interest_mask fields are defined in libevent.
+ enum Mode {
+ WATCH_READ = 1 << 0,
+ WATCH_WRITE = 1 << 1,
+ WATCH_READ_WRITE = WATCH_READ | WATCH_WRITE
+ };
+
+ // Have the current thread's message loop watch for a a situation in which
+ // reading/writing to the FD can be performed without Blocking.
+ // Callers must provide a preallocated FileDescriptorWatcher object which
+ // can later be used to manage the Lifetime of this event.
+ // If a FileDescriptorWatcher is passed in which is already attached to
+ // an event, then the effect is cumulative i.e. after the call |controller|
+ // will watch both the previous event and the new one.
+ // If an error occurs while calling this method in a cumulative fashion, the
+ // event previously attached to |controller| is aborted.
// Returns true on success.
- // TODO(dkegel): hide libevent better; abstraction still too leaky
- // TODO(dkegel): better error handing
// TODO(dkegel): switch to edge-triggered readiness notification
- void WatchSocket(int socket, short interest_mask, event* e, Watcher*);
-
- // TODO(playmobil): Merge this with WatchSocket().
- void WatchFileHandle(int fd, short interest_mask, event* e, FileWatcher*);
-
- // Stop watching a socket.
- // Event was previously initialized by WatchSocket.
- void UnwatchSocket(event* e);
-
- // Stop watching a File Handle.
- // Event was previously initialized by WatchFileHandle.
- void UnwatchFileHandle(event* e);
+ bool WatchFileDescriptor(int fd,
+ bool persistent,
+ Mode mode,
+ FileDescriptorWatcher *controller,
+ Watcher *delegate);
// MessagePump methods:
virtual void Run(Delegate* delegate);
@@ -83,14 +109,9 @@ class MessagePumpLibevent : public MessagePump {
// readiness callbacks when a socket is ready for I/O.
event_base* event_base_;
- // Called by libevent to tell us a registered socket is ready
- static void OnReadinessNotification(int socket, short flags, void* context);
-
- // Called by libevent to tell us a registered fd is ready.
- static void OnFileReadReadinessNotification(int fd, short flags,
- void* context);
- static void OnFileWriteReadinessNotification(int fd, short flags,
- void* context);
+ // Called by libevent to tell us a registered FD can be read/written to.
+ static void OnLibeventNotification(int fd, short flags,
+ void* context);
// Unix pipe used to implement ScheduleWork()
// ... callback; called by libevent inside Run() when pipe is ready to read
diff --git a/chrome/common/ipc_channel_posix.cc b/chrome/common/ipc_channel_posix.cc
index 9a30f80..f2591f6 100644
--- a/chrome/common/ipc_channel_posix.cc
+++ b/chrome/common/ipc_channel_posix.cc
@@ -6,10 +6,16 @@
#include <errno.h>
#include <fcntl.h>
+#include <stddef.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <sys/stat.h>
-#include <stddef.h>
+#if defined(OS_LINUX)
+#include <linux/un.h>
+#elif defined(OS_MACOSX)
+#include <sys/un.h>
+#endif
+
#include "base/logging.h"
#include "base/process_util.h"
@@ -18,12 +24,6 @@
#include "chrome/common/chrome_counters.h"
#include "chrome/common/ipc_message_utils.h"
-#if defined(OS_LINUX)
-#include <linux/un.h>
-#elif defined(OS_MACOSX)
-#include <sys/un.h>
-#endif
-
namespace IPC {
//------------------------------------------------------------------------------
@@ -150,16 +150,14 @@ bool ClientConnectToFifo(const std::string &pipe_name, int* client_socket) {
Channel::ChannelImpl::ChannelImpl(const std::wstring& channel_id, Mode mode,
Listener* listener)
: mode_(mode),
- server_listen_connection_event_(new EventHolder()),
- read_event_(new EventHolder()),
- write_event_(new EventHolder()),
- message_send_bytes_written_(0),
- server_listen_pipe_(-1),
- pipe_(-1),
- listener_(listener),
- waiting_connect_(true),
- processing_incoming_(false),
- factory_(this) {
+ is_blocked_on_write_(false),
+ message_send_bytes_written_(0),
+ server_listen_pipe_(-1),
+ pipe_(-1),
+ listener_(listener),
+ waiting_connect_(true),
+ processing_incoming_(false),
+ factory_(this) {
if (!CreatePipe(channel_id, mode)) {
// The pipe may have been closed already.
LOG(WARNING) << "Unable to create pipe named \"" << channel_id <<
@@ -214,21 +212,22 @@ bool Channel::ChannelImpl::Connect() {
if (server_listen_pipe_ == -1) {
return false;
}
- event *ev = &(server_listen_connection_event_->event);
- MessageLoopForIO::current()->WatchFileHandle(server_listen_pipe_,
- EV_READ | EV_PERSIST,
- ev,
- this);
- server_listen_connection_event_->is_active = true;
+ MessageLoopForIO::current()->WatchFileDescriptor(
+ server_listen_pipe_,
+ true,
+ MessageLoopForIO::WATCH_READ,
+ &server_listen_connection_watcher_,
+ this);
} else {
if (pipe_ == -1) {
return false;
}
- MessageLoopForIO::current()->WatchFileHandle(pipe_,
- EV_READ | EV_PERSIST,
- &(read_event_->event),
- this);
- read_event_->is_active = true;
+ MessageLoopForIO::current()->WatchFileDescriptor(
+ pipe_,
+ true,
+ MessageLoopForIO::WATCH_READ,
+ &read_watcher_,
+ this);
waiting_connect_ = false;
}
@@ -317,6 +316,7 @@ bool Channel::ChannelImpl::ProcessIncomingMessages() {
bool Channel::ChannelImpl::ProcessOutgoingMessages() {
DCHECK(!waiting_connect_); // Why are we trying to send messages if there's
// no connection?
+ is_blocked_on_write_ = false;
if (output_queue_.empty())
return true;
@@ -324,15 +324,6 @@ bool Channel::ChannelImpl::ProcessOutgoingMessages() {
if (pipe_ == -1)
return false;
- // If libevent was monitoring the socket for us (we blocked when trying to
- // write a message last time), then delete the underlying libevent structure.
- if (write_event_->is_active) {
- // TODO(playmobil): This calls event_del(), but we can probably
- // do with just calling event_add here.
- MessageLoopForIO::current()->UnwatchFileHandle(&(write_event_->event));
- write_event_->is_active = false;
- }
-
// Write out all the messages we can till the write blocks or there are no
// more outgoing messages.
while (!output_queue_.empty()) {
@@ -355,12 +346,13 @@ bool Channel::ChannelImpl::ProcessOutgoingMessages() {
message_send_bytes_written_ += bytes_written;
// Tell libevent to call us back once things are unblocked.
- MessageLoopForIO::current()->WatchFileHandle(server_listen_pipe_,
- EV_WRITE,
- &(write_event_->event),
- this);
- write_event_->is_active = true;
-
+ is_blocked_on_write_ = true;
+ MessageLoopForIO::current()->WatchFileDescriptor(
+ pipe_,
+ false, // One shot
+ MessageLoopForIO::WATCH_WRITE,
+ &write_watcher_,
+ this);
} else {
message_send_bytes_written_ = 0;
@@ -391,7 +383,7 @@ bool Channel::ChannelImpl::Send(Message* message) {
output_queue_.push(message);
if (!waiting_connect_) {
- if (!write_event_->is_active) {
+ if (!is_blocked_on_write_) {
if (!ProcessOutgoingMessages())
return false;
}
@@ -401,7 +393,7 @@ bool Channel::ChannelImpl::Send(Message* message) {
}
// Called by libevent when we can read from th pipe without blocking.
-void Channel::ChannelImpl::OnFileReadReady(int fd) {
+void Channel::ChannelImpl::OnFileCanReadWithoutBlocking(int fd) {
bool send_server_hello_msg = false;
if (waiting_connect_ && mode_ == MODE_SERVER) {
if (!ServerAcceptFifoConnection(server_listen_pipe_, &pipe_)) {
@@ -410,16 +402,16 @@ void Channel::ChannelImpl::OnFileReadReady(int fd) {
// No need to watch the listening socket any longer since only one client
// can connect. So unregister with libevent.
- event *ev = &(server_listen_connection_event_->event);
- MessageLoopForIO::current()->UnwatchFileHandle(ev);
- server_listen_connection_event_->is_active = false;
+ server_listen_connection_watcher_.StopWatchingFileDescriptor();
// Start watching our end of the socket.
- MessageLoopForIO::current()->WatchFileHandle(pipe_,
- EV_READ | EV_PERSIST,
- &(read_event_->event),
- this);
- read_event_->is_active = true;
+ MessageLoopForIO::current()->WatchFileDescriptor(
+ pipe_,
+ true,
+ MessageLoopForIO::WATCH_READ,
+ &read_watcher_,
+ this);
+
waiting_connect_ = false;
send_server_hello_msg = true;
}
@@ -436,12 +428,14 @@ void Channel::ChannelImpl::OnFileReadReady(int fd) {
// This gives us a chance to kill the client if the incoming handshake
// is invalid.
if (send_server_hello_msg) {
+ // This should be our first write so there' sno chance we can block here...
+ DCHECK(is_blocked_on_write_ == false);
ProcessOutgoingMessages();
}
}
// Called by libevent when we can write to the pipe without blocking.
-void Channel::ChannelImpl::OnFileWriteReady(int fd) {
+void Channel::ChannelImpl::OnFileCanWriteWithoutBlocking(int fd) {
if (!ProcessOutgoingMessages()) {
Close();
listener_->OnChannelError();
@@ -453,11 +447,7 @@ void Channel::ChannelImpl::Close() {
// idempotent.
// Unregister libevent for the listening socket and close it.
- if (server_listen_connection_event_ &&
- server_listen_connection_event_->is_active) {
- MessageLoopForIO::current()->UnwatchFileHandle(
- &(server_listen_connection_event_->event));
- }
+ server_listen_connection_watcher_.StopWatchingFileDescriptor();
if (server_listen_pipe_ != -1) {
close(server_listen_pipe_);
@@ -465,24 +455,13 @@ void Channel::ChannelImpl::Close() {
}
// Unregister libevent for the FIFO and close it.
- if (read_event_ && read_event_->is_active) {
- MessageLoopForIO::current()->UnwatchFileHandle(&(read_event_->event));
- }
- if (write_event_ && write_event_->is_active) {
- MessageLoopForIO::current()->UnwatchFileHandle(&(write_event_->event));
- }
+ read_watcher_.StopWatchingFileDescriptor();
+ write_watcher_.StopWatchingFileDescriptor();
if (pipe_ != -1) {
close(pipe_);
pipe_ = -1;
}
- delete server_listen_connection_event_;
- server_listen_connection_event_ = NULL;
- delete read_event_;
- read_event_ = NULL;
- delete write_event_;
- write_event_ = NULL;
-
// Unlink the FIFO
unlink(pipe_name_.c_str());
diff --git a/chrome/common/ipc_channel_posix.h b/chrome/common/ipc_channel_posix.h
index b2849dc..e788963 100644
--- a/chrome/common/ipc_channel_posix.h
+++ b/chrome/common/ipc_channel_posix.h
@@ -11,11 +11,10 @@
#include <string>
#include "base/message_loop.h"
-#include "third_party/libevent/event.h"
namespace IPC {
-class Channel::ChannelImpl : public MessageLoopForIO::FileWatcher {
+class Channel::ChannelImpl : public MessageLoopForIO::Watcher {
public:
// Mirror methods of Channel, see ipc_channel.h for description.
ChannelImpl(const std::wstring& channel_id, Mode mode, Listener* listener);
@@ -31,27 +30,19 @@ class Channel::ChannelImpl : public MessageLoopForIO::FileWatcher {
bool ProcessIncomingMessages();
bool ProcessOutgoingMessages();
- void OnFileReadReady(int fd);
- void OnFileWriteReady(int fd);
+ void OnFileCanReadWithoutBlocking(int fd);
+ void OnFileCanWriteWithoutBlocking(int fd);
Mode mode_;
- // Wrapper for Libevent event.
- // TODO(playmobil): MessageLoopForIO needs to better encapsulate libevent.
- struct EventHolder {
- EventHolder() : is_active(false) {}
- ~EventHolder() {}
+ // After accepting one client connection on our server socket we want to
+ // stop listening.
+ MessageLoopForIO::FileDescriptorWatcher server_listen_connection_watcher_;
+ MessageLoopForIO::FileDescriptorWatcher read_watcher_;
+ MessageLoopForIO::FileDescriptorWatcher write_watcher_;
- bool is_active;
-
- // libevent's set functions set all the needed members of this struct, so no
- // need to initialize before use.
- struct event event;
- };
-
- EventHolder *server_listen_connection_event_;
- EventHolder *read_event_;
- EventHolder *write_event_;
+ // Indicates whether we're currently blocked waiting for a write to complete.
+ bool is_blocked_on_write_;
// If sending a message blocks then we use this variable
// to keep track of where we are.
diff --git a/net/base/listen_socket.cc b/net/base/listen_socket.cc
index 8428e06..b2185a2 100644
--- a/net/base/listen_socket.cc
+++ b/net/base/listen_socket.cc
@@ -12,7 +12,6 @@
#include <errno.h>
#include <sys/socket.h>
#include <arpa/inet.h>
-#include "base/message_loop.h"
#include "net/base/net_errors.h"
#include "third_party/libevent/event.h"
#endif
@@ -30,12 +29,7 @@ const int SOCKET_ERROR = -1;
const int kReadBufSize = 200;
ListenSocket::ListenSocket(SOCKET s, ListenSocketDelegate *del)
-#if defined(OS_WIN)
: socket_(s),
-#elif defined(OS_POSIX)
- : event_(new event),
- socket_(s),
-#endif
socket_delegate_(del) {
#if defined(OS_WIN)
socket_event_ = WSACreateEvent();
@@ -177,8 +171,7 @@ void ListenSocket::UnwatchSocket() {
#if defined(OS_WIN)
watcher_.StopWatching();
#elif defined(OS_POSIX)
- MessageLoopForIO::current()->UnwatchSocket(event_.get());
- wait_state_ = NOT_WAITING;
+ watcher_.StopWatchingFileDescriptor();
#endif
}
@@ -187,8 +180,9 @@ void ListenSocket::WatchSocket(WaitState state) {
WSAEventSelect(socket_, socket_event_, FD_ACCEPT | FD_CLOSE | FD_READ);
watcher_.StartWatching(socket_event_, this);
#elif defined(OS_POSIX)
- MessageLoopForIO::current()->WatchSocket(
- socket_, EV_READ|EV_PERSIST, event_.get(),this);
+ // Implicitly calls StartWatchingFileDescriptor().
+ MessageLoopForIO::current()->WatchFileDescriptor(
+ socket_, true, MessageLoopForIO::WATCH_READ, &watcher_, this);
wait_state_ = state;
#endif
}
@@ -250,7 +244,7 @@ void ListenSocket::OnObjectSignaled(HANDLE object) {
}
}
#elif defined(OS_POSIX)
-void ListenSocket::OnSocketReady(short flags) {
+void ListenSocket::OnFileCanReadWithoutBlocking(int fd) {
if (wait_state_ == WAITING_ACCEPT) {
Accept();
}
@@ -262,4 +256,11 @@ void ListenSocket::OnSocketReady(short flags) {
// TODO(erikkay): this seems to get hit multiple times after the close
}
}
+
+void ListenSocket::OnFileCanWriteWithoutBlocking(int fd) {
+ // MessagePumpLibevent callback, we don't listen for write events
+ // so we shouldn't ever reach here.
+ NOTREACHED();
+}
+
#endif
diff --git a/net/base/listen_socket.h b/net/base/listen_socket.h
index 2b32b5b..e405940 100644
--- a/net/base/listen_socket.h
+++ b/net/base/listen_socket.h
@@ -13,13 +13,14 @@
#if defined(OS_WIN)
#include <winsock2.h>
+#endif
+#include <string>
+#if defined(OS_WIN)
#include "base/object_watcher.h"
#elif defined(OS_POSIX)
#include "base/message_loop.h"
#include "net/base/net_util.h"
#include "net/base/net_errors.h"
-#include "third_party/libevent/event.h"
-#include "base/message_pump_libevent.h"
#endif
#include "base/basictypes.h"
@@ -35,7 +36,7 @@ class ListenSocket : public base::RefCountedThreadSafe<ListenSocket>,
#if defined(OS_WIN)
public base::ObjectWatcher::Delegate
#elif defined(OS_POSIX)
- public base::MessagePumpLibevent::Watcher
+ public MessageLoopForIO::Watcher
#endif
{
public:
@@ -80,11 +81,11 @@ class ListenSocket : public base::RefCountedThreadSafe<ListenSocket>,
NOT_WAITING = 0,
WAITING_ACCEPT = 1,
WAITING_READ = 3,
- WAITING_CLOSE = 4
+ WAITING_CLOSE = 4
};
- // Pass any value in case of Windows, because in Windows
- // we are not using state.
- void WatchSocket(WaitState state);
+ // Pass any value in case of Windows, because in Windows
+ // we are not using state.
+ void WatchSocket(WaitState state);
void UnwatchSocket();
#if defined(OS_WIN)
@@ -95,17 +96,17 @@ class ListenSocket : public base::RefCountedThreadSafe<ListenSocket>,
#elif defined(OS_POSIX)
WaitState wait_state_;
// The socket's libevent wrapper
- scoped_ptr<event> event_;
+ MessageLoopForIO::FileDescriptorWatcher watcher_;
// Called by MessagePumpLibevent when the socket is ready to do I/O
- void OnSocketReady(short flags);
+ void OnFileCanReadWithoutBlocking(int fd);
+ void OnFileCanWriteWithoutBlocking(int fd);
#endif
- SOCKET socket_;
+ SOCKET socket_;
ListenSocketDelegate *socket_delegate_;
private:
DISALLOW_EVIL_CONSTRUCTORS(ListenSocket);
};
-#endif // NET_BASE_SOCKET_H_
-
+#endif // NET_BASE_SOCKET_H_
diff --git a/net/base/tcp_client_socket.h b/net/base/tcp_client_socket.h
index 05a433e..5fca519 100644
--- a/net/base/tcp_client_socket.h
+++ b/net/base/tcp_client_socket.h
@@ -14,7 +14,7 @@
struct event; // From libevent
#include <sys/socket.h> // for struct sockaddr
#define SOCKET int
-#include "base/message_pump_libevent.h"
+#include "base/message_loop.h"
#endif
#include "base/scoped_ptr.h"
@@ -26,7 +26,7 @@ namespace net {
// A client socket that uses TCP as the transport layer.
//
-// NOTE: The windows implementation supports half duplex only.
+// NOTE: The windows implementation supports half duplex only.
// Read and Write calls must not be in progress at the same time.
// The libevent implementation supports full duplex because that
// made it slightly easier to implement ssl.
@@ -34,7 +34,7 @@ class TCPClientSocket : public ClientSocket,
#if defined(OS_WIN)
public base::ObjectWatcher::Delegate
#elif defined(OS_POSIX)
- public base::MessagePumpLibevent::Watcher
+ public MessageLoopForIO::Watcher
#endif
{
public:
@@ -52,7 +52,7 @@ class TCPClientSocket : public ClientSocket,
virtual bool IsConnected() const;
// Socket methods:
- // Multiple outstanding requests are not supported.
+ // Multiple outstanding requests are not supported.
// Full duplex mode (reading and writing at the same time) is not supported
// on Windows (but is supported on Linux and Mac for ease of implementation
// of SSLClientSocket)
@@ -97,10 +97,11 @@ class TCPClientSocket : public ClientSocket,
bool waiting_connect_;
// The socket's libevent wrapper
- scoped_ptr<event> event_;
+ MessageLoopForIO::FileDescriptorWatcher socket_watcher_;
// Called by MessagePumpLibevent when the socket is ready to do I/O
- void OnSocketReady(short flags);
+ void OnFileCanReadWithoutBlocking(int fd);
+ void OnFileCanWriteWithoutBlocking(int fd);
// The buffer used by OnSocketReady to retry Read requests
char* buf_;
diff --git a/net/base/tcp_client_socket_libevent.cc b/net/base/tcp_client_socket_libevent.cc
index 79909c20..7a9084b 100644
--- a/net/base/tcp_client_socket_libevent.cc
+++ b/net/base/tcp_client_socket_libevent.cc
@@ -68,7 +68,6 @@ TCPClientSocket::TCPClientSocket(const AddressList& addresses)
addresses_(addresses),
current_ai_(addresses_.head()),
waiting_connect_(false),
- event_(new event),
write_callback_(NULL),
callback_(NULL) {
}
@@ -106,12 +105,17 @@ int TCPClientSocket::Connect(CompletionCallback* callback) {
return MapPosixError(errno);
}
- // Initialize event_ and link it to our MessagePump.
+ // Initialize socket_watcher_ and link it to our MessagePump.
// POLLOUT is set if the connection is established.
- // POLLIN is set if the connection fails,
- // so select for both read and write.
- MessageLoopForIO::current()->WatchSocket(
- socket_, EV_READ|EV_WRITE|EV_PERSIST, event_.get(), this);
+ // POLLIN is set if the connection fails.
+ if (!MessageLoopForIO::current()->WatchFileDescriptor(
+ socket_, true, MessageLoopForIO::WATCH_WRITE, &socket_watcher_,
+ this)) {
+ DLOG(INFO) << "WatchFileDescriptor failed: " << errno;
+ close(socket_);
+ socket_ = kInvalidSocket;
+ return MapPosixError(errno);
+ }
waiting_connect_ = true;
callback_ = callback;
@@ -127,7 +131,7 @@ void TCPClientSocket::Disconnect() {
if (socket_ == kInvalidSocket)
return;
- MessageLoopForIO::current()->UnwatchSocket(event_.get());
+ socket_watcher_.StopWatchingFileDescriptor();
close(socket_);
socket_ = kInvalidSocket;
waiting_connect_ = false;
@@ -170,8 +174,12 @@ int TCPClientSocket::Read(char* buf,
return MapPosixError(errno);
}
- MessageLoopForIO::current()->WatchSocket(
- socket_, EV_READ|EV_PERSIST, event_.get(), this);
+ if (!MessageLoopForIO::current()->WatchFileDescriptor(
+ socket_, true, MessageLoopForIO::WATCH_READ, &socket_watcher_, this))
+ {
+ DLOG(INFO) << "WatchFileDescriptor failed on read, errno " << errno;
+ return MapPosixError(errno);
+ }
buf_ = buf;
buf_len_ = buf_len;
@@ -196,8 +204,13 @@ int TCPClientSocket::Write(const char* buf,
if (errno != EAGAIN && errno != EWOULDBLOCK)
return MapPosixError(errno);
- MessageLoopForIO::current()->WatchSocket(
- socket_, EV_WRITE|EV_PERSIST, event_.get(), this);
+ if (!MessageLoopForIO::current()->WatchFileDescriptor(
+ socket_, true, MessageLoopForIO::WATCH_WRITE, &socket_watcher_, this))
+ {
+ DLOG(INFO) << "WatchFileDescriptor failed on write, errno " << errno;
+ return MapPosixError(errno);
+ }
+
write_buf_ = buf;
write_buf_len_ = buf_len;
@@ -263,12 +276,13 @@ void TCPClientSocket::DidCompleteConnect() {
result = Connect(callback_);
} else {
result = MapPosixError(error_code);
- MessageLoopForIO::current()->UnwatchSocket(event_.get());
+ socket_watcher_.StopWatchingFileDescriptor();
waiting_connect_ = false;
}
- if (result != ERR_IO_PENDING)
+ if (result != ERR_IO_PENDING) {
DoCallback(result);
+ }
}
void TCPClientSocket::DidCompleteRead() {
@@ -285,7 +299,7 @@ void TCPClientSocket::DidCompleteRead() {
if (result != ERR_IO_PENDING) {
buf_ = NULL;
buf_len_ = 0;
- MessageLoopForIO::current()->UnwatchSocket(event_.get());
+ socket_watcher_.StopWatchingFileDescriptor();
DoCallback(result);
}
}
@@ -304,21 +318,24 @@ void TCPClientSocket::DidCompleteWrite() {
if (result != ERR_IO_PENDING) {
write_buf_ = NULL;
write_buf_len_ = 0;
- MessageLoopForIO::current()->UnwatchSocket(event_.get());
+ socket_watcher_.StopWatchingFileDescriptor();
DoWriteCallback(result);
}
}
-void TCPClientSocket::OnSocketReady(short flags) {
- // the only used bits of flags are EV_READ and EV_WRITE
+void TCPClientSocket::OnFileCanReadWithoutBlocking(int fd) {
+ // When a socket connects it signals both Read and Write, we handle
+ // DidCompleteConnect() in the write handler.
+ if (!waiting_connect_ && callback_) {
+ DidCompleteRead();
+ }
+}
+void TCPClientSocket::OnFileCanWriteWithoutBlocking(int fd) {
if (waiting_connect_) {
DidCompleteConnect();
- } else {
- if ((flags & EV_WRITE) && write_callback_)
- DidCompleteWrite();
- if ((flags & EV_READ) && callback_)
- DidCompleteRead();
+ } else if (write_callback_) {
+ DidCompleteWrite();
}
}