summaryrefslogtreecommitdiffstats
path: root/base
diff options
context:
space:
mode:
authorjeremy@chromium.org <jeremy@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98>2008-12-15 22:02:17 +0000
committerjeremy@chromium.org <jeremy@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98>2008-12-15 22:02:17 +0000
commite45e6c09f752dd23b2560cc64d990e6c03082083 (patch)
tree13b0e19b53b60f01e3528b20f74bf8173b4e4d7b /base
parentbf54f6c60220a24ef1230f7c18153f2b077f5125 (diff)
downloadchromium_src-e45e6c09f752dd23b2560cc64d990e6c03082083.zip
chromium_src-e45e6c09f752dd23b2560cc64d990e6c03082083.tar.gz
chromium_src-e45e6c09f752dd23b2560cc64d990e6c03082083.tar.bz2
message_pump_libevent refactor:
* Unify WatchSocket & WatchFileHandle. * Better encapsulate libevent. * Fix a bug with blocking writes in ipc_posix.cc Review URL: http://codereview.chromium.org/13757 git-svn-id: svn://svn.chromium.org/chrome/trunk/src@7010 0039d316-1c4b-4281-b951-d872f2087c98
Diffstat (limited to 'base')
-rw-r--r--base/message_loop.cc27
-rw-r--r--base/message_loop.h23
-rw-r--r--base/message_pump_libevent.cc150
-rw-r--r--base/message_pump_libevent.h97
4 files changed, 178 insertions, 119 deletions
diff --git a/base/message_loop.cc b/base/message_loop.cc
index 898fbfa..12ad3fa 100644
--- a/base/message_loop.cc
+++ b/base/message_loop.cc
@@ -593,22 +593,17 @@ bool MessageLoopForIO::WaitForIOCompletion(DWORD timeout, IOHandler* filter) {
#elif defined(OS_POSIX)
-void MessageLoopForIO::WatchSocket(int socket, short interest_mask,
- struct event* e, Watcher* watcher) {
- pump_libevent()->WatchSocket(socket, interest_mask, e, watcher);
+bool MessageLoopForIO::WatchFileDescriptor(int fd,
+ bool persistent,
+ Mode mode,
+ FileDescriptorWatcher *controller,
+ Watcher *delegate) {
+ return pump_libevent()->WatchFileDescriptor(
+ fd,
+ persistent,
+ static_cast<base::MessagePumpLibevent::Mode>(mode),
+ controller,
+ delegate);
}
-void MessageLoopForIO::WatchFileHandle(int fd, short interest_mask,
- struct event* e, FileWatcher* watcher) {
- pump_libevent()->WatchFileHandle(fd, interest_mask, e, watcher);
-}
-
-
-void MessageLoopForIO::UnwatchSocket(struct event* e) {
- pump_libevent()->UnwatchSocket(e);
-}
-
-void MessageLoopForIO::UnwatchFileHandle(struct event* e) {
- pump_libevent()->UnwatchFileHandle(e);
-}
#endif
diff --git a/base/message_loop.h b/base/message_loop.h
index 69db8c1..6cd0ef4 100644
--- a/base/message_loop.h
+++ b/base/message_loop.h
@@ -473,14 +473,21 @@ class MessageLoopForIO : public MessageLoop {
#elif defined(OS_POSIX)
typedef base::MessagePumpLibevent::Watcher Watcher;
- typedef base::MessagePumpLibevent::FileWatcher FileWatcher;
-
- // Please see MessagePumpLibevent for definitions of these methods.
- void WatchSocket(int socket, short interest_mask,
- struct event* e, Watcher* watcher);
- void WatchFileHandle(int fd, short interest_mask, event* e, FileWatcher*);
- void UnwatchSocket(struct event* e);
- void UnwatchFileHandle(event* e);
+ typedef base::MessagePumpLibevent::FileDescriptorWatcher
+ FileDescriptorWatcher;
+
+ enum Mode {
+ WATCH_READ = base::MessagePumpLibevent::WATCH_READ,
+ WATCH_WRITE = base::MessagePumpLibevent::WATCH_WRITE,
+ WATCH_READ_WRITE = base::MessagePumpLibevent::WATCH_READ_WRITE
+ };
+
+ // Please see MessagePumpLibevent for definition.
+ bool WatchFileDescriptor(int fd,
+ bool persistent,
+ Mode mode,
+ FileDescriptorWatcher *controller,
+ Watcher *delegate);
#endif // defined(OS_POSIX)
};
diff --git a/base/message_pump_libevent.cc b/base/message_pump_libevent.cc
index a281980..f31c01b 100644
--- a/base/message_pump_libevent.cc
+++ b/base/message_pump_libevent.cc
@@ -15,17 +15,48 @@ namespace base {
// Return 0 on success
// Too small a function to bother putting in a library?
-static int SetNonBlocking(int fd)
-{
- int flags = fcntl(fd, F_GETFL, 0);
- if (-1 == flags)
- flags = 0;
- return fcntl(fd, F_SETFL, flags | O_NONBLOCK);
+static int SetNonBlocking(int fd) {
+ int flags = fcntl(fd, F_GETFL, 0);
+ if (flags == -1)
+ flags = 0;
+ return fcntl(fd, F_SETFL, flags | O_NONBLOCK);
+}
+
+MessagePumpLibevent::FileDescriptorWatcher::FileDescriptorWatcher()
+ : is_persistent_(false),
+ event_(NULL) {
+}
+
+MessagePumpLibevent::FileDescriptorWatcher::~FileDescriptorWatcher() {
+ if (event_.get()) {
+ StopWatchingFileDescriptor();
+ }
+}
+
+void MessagePumpLibevent::FileDescriptorWatcher::Init(event *e,
+ bool is_persistent) {
+ DCHECK(e);
+ DCHECK(event_.get() == NULL);
+
+ is_persistent_ = is_persistent;
+ event_.reset(e);
+}
+
+event *MessagePumpLibevent::FileDescriptorWatcher::ReleaseEvent() {
+ return event_.release();
+}
+
+bool MessagePumpLibevent::FileDescriptorWatcher::StopWatchingFileDescriptor() {
+ if (event_.get() == NULL) {
+ return true;
+ }
+
+ // event_del() is a no-op of the event isn't active.
+ return (event_del(event_.get()) == 0);
}
// Called if a byte is received on the wakeup pipe.
void MessagePumpLibevent::OnWakeup(int socket, short flags, void* context) {
-
base::MessagePumpLibevent* that =
static_cast<base::MessagePumpLibevent*>(context);
DCHECK(that->wakeup_pipe_out_ == socket);
@@ -61,7 +92,7 @@ bool MessagePumpLibevent::Init() {
wakeup_event_ = new event;
event_set(wakeup_event_, wakeup_pipe_out_, EV_READ | EV_PERSIST,
- OnWakeup, this);
+ OnWakeup, this);
event_base_set(event_base_, wakeup_event_);
if (event_add(wakeup_event_, 0))
@@ -77,67 +108,72 @@ MessagePumpLibevent::~MessagePumpLibevent() {
event_base_free(event_base_);
}
-void MessagePumpLibevent::WatchSocket(int socket, short interest_mask,
- event* e, Watcher* watcher) {
+bool MessagePumpLibevent::WatchFileDescriptor(int fd,
+ bool persistent,
+ Mode mode,
+ FileDescriptorWatcher *controller,
+ Watcher *delegate) {
+ DCHECK(fd > 0);
+ DCHECK(controller);
+ DCHECK(delegate);
+ DCHECK(mode == WATCH_READ || mode == WATCH_WRITE || mode == WATCH_READ_WRITE);
- // Set current interest mask and message pump for this event
- event_set(e, socket, interest_mask, OnReadinessNotification, watcher);
+ int event_mask = persistent ? EV_PERSIST : 0;
+ if ((mode & WATCH_READ) != 0) {
+ event_mask |= EV_READ;
+ }
+ if ((mode & WATCH_WRITE) != 0) {
+ event_mask |= EV_WRITE;
+ }
- // Tell libevent which message pump this socket will belong to when we add it.
- event_base_set(event_base_, e);
+ // |should_delete_event| is true if we're modifying an event that's currently
+ // active in |controller|.
+ // If we're modifying an existing event and there's an error then we need to
+ // tell libevent to clean it up via event_delete() before returning.
+ bool should_delete_event = true;
+ scoped_ptr<event> evt(controller->ReleaseEvent());
+ if (evt.get() == NULL) {
+ should_delete_event = false;
+ // Ownership is transferred to the controller.
+ evt.reset(new event);
+ }
- // Add this socket to the list of monitored sockets.
- if (event_add(e, NULL))
- NOTREACHED();
-}
+ // Set current interest mask and message pump for this event.
+ event_set(evt.get(), fd, event_mask, OnLibeventNotification,
+ delegate);
-void MessagePumpLibevent::WatchFileHandle(int fd, short interest_mask,
- event* e, FileWatcher* watcher) {
- // Set current interest mask and message pump for this event
- if ((interest_mask & EV_READ) != 0) {
- event_set(e, fd, interest_mask, OnFileReadReadinessNotification, watcher);
- } else {
- event_set(e, fd, interest_mask, OnFileWriteReadinessNotification, watcher);
+ // Tell libevent which message pump this socket will belong to when we add it.
+ if (event_base_set(event_base_, evt.get()) != 0) {
+ if (should_delete_event) {
+ event_del(evt.get());
+ }
+ return false;
}
- // Tell libevent which message pump this fd will belong to when we add it.
- event_base_set(event_base_, e);
-
- // Add this fd to the list of monitored sockets.
- if (event_add(e, NULL))
- NOTREACHED();
-}
+ // Add this socket to the list of monitored sockets.
+ if (event_add(evt.get(), NULL) != 0) {
+ if (should_delete_event) {
+ event_del(evt.get());
+ }
+ return false;
+ }
-void MessagePumpLibevent::UnwatchSocket(event* e) {
- // Remove this socket from the list of monitored sockets.
- if (event_del(e))
- NOTREACHED();
+ // Transfer ownership of e to controller.
+ controller->Init(evt.release(), persistent);
+ return true;
}
-void MessagePumpLibevent::UnwatchFileHandle(event* e) {
- // Remove this fd from the list of monitored fds.
- if (event_del(e))
- NOTREACHED();
-}
-void MessagePumpLibevent::OnReadinessNotification(int socket, short flags,
- void* context) {
- // The given socket is ready for I/O.
- // Tell the owner what kind of I/O the socket is ready for.
+void MessagePumpLibevent::OnLibeventNotification(int fd, short flags,
+ void* context) {
Watcher* watcher = static_cast<Watcher*>(context);
- watcher->OnSocketReady(flags);
-}
-void MessagePumpLibevent::OnFileReadReadinessNotification(int fd, short flags,
- void* context) {
- FileWatcher* watcher = static_cast<FileWatcher*>(context);
- watcher->OnFileReadReady(fd);
-}
-
-void MessagePumpLibevent::OnFileWriteReadinessNotification(int fd, short flags,
- void* context) {
- FileWatcher* watcher = static_cast<FileWatcher*>(context);
- watcher->OnFileWriteReady(fd);
+ if (flags & EV_WRITE) {
+ watcher->OnFileCanWriteWithoutBlocking(fd);
+ }
+ if (flags & EV_READ) {
+ watcher->OnFileCanReadWithoutBlocking(fd);
+ }
}
// Reentrant!
diff --git a/base/message_pump_libevent.h b/base/message_pump_libevent.h
index b54bc36..aa5d5f8 100644
--- a/base/message_pump_libevent.h
+++ b/base/message_pump_libevent.h
@@ -6,6 +6,7 @@
#define BASE_MESSAGE_PUMP_LIBEVENT_H_
#include "base/message_pump.h"
+#include "base/scoped_ptr.h"
#include "base/time.h"
// Declare structs we need from libevent.h rather than including it
@@ -18,46 +19,71 @@ namespace base {
// TODO(dkegel): add support for background file IO somehow
class MessagePumpLibevent : public MessagePump {
public:
- // Used with WatchSocket to asynchronously monitor the I/O readiness of a
- // socket.
- class Watcher {
- public:
- virtual ~Watcher() {}
- // Called from MessageLoop::Run when a ready socket is detected.
- virtual void OnSocketReady(short eventmask) = 0;
+
+ // Object returned by WatchFileDescriptor to manage further watching.
+ class FileDescriptorWatcher {
+ public:
+ FileDescriptorWatcher();
+ ~FileDescriptorWatcher(); // Implicitly calls StopWatchingFileDescriptor.
+
+ // NOTE: These methods aren't called StartWatching()/StopWatching() to
+ // avoid confusion with the win32 ObjectWatcher class.
+
+ // Stop watching the FD, always safe to call. No-op if there's nothing
+ // to do.
+ bool StopWatchingFileDescriptor();
+
+ private:
+ // Called by MessagePumpLibevent, ownership of |e| is transferred to this
+ // object.
+ void Init(event* e, bool is_persistent);
+
+ // Used by MessagePumpLibevent to take ownership of event_.
+ event *ReleaseEvent();
+ friend class MessagePumpLibevent;
+
+ private:
+ bool is_persistent_; // false if this event is one-shot.
+ scoped_ptr<event> event_;
+ DISALLOW_COPY_AND_ASSIGN(FileDescriptorWatcher);
};
- // Used with WatchFileHandle to monitor I/O readiness for a File Handle.
- class FileWatcher {
+ // Used with WatchFileDescptor to asynchronously monitor the I/O readiness of
+ // a File Descriptor.
+ class Watcher {
public:
- virtual ~FileWatcher() {}
- // Called from MessageLoop::Run when a non-blocking read/write can be made.
- virtual void OnFileReadReady(int fd) = 0;
- virtual void OnFileWriteReady(int fd) = 0;
+ virtual ~Watcher() {}
+ // Called from MessageLoop::Run when an FD can be read from/written to
+ // without blocking
+ virtual void OnFileCanReadWithoutBlocking(int fd) = 0;
+ virtual void OnFileCanWriteWithoutBlocking(int fd) = 0;
};
MessagePumpLibevent();
virtual ~MessagePumpLibevent();
- // Have the current thread's message loop watch for a ready socket.
- // Caller must provide a struct event for this socket for libevent's use.
- // The event and interest_mask fields are defined in libevent.
+ enum Mode {
+ WATCH_READ = 1 << 0,
+ WATCH_WRITE = 1 << 1,
+ WATCH_READ_WRITE = WATCH_READ | WATCH_WRITE
+ };
+
+ // Have the current thread's message loop watch for a a situation in which
+ // reading/writing to the FD can be performed without Blocking.
+ // Callers must provide a preallocated FileDescriptorWatcher object which
+ // can later be used to manage the Lifetime of this event.
+ // If a FileDescriptorWatcher is passed in which is already attached to
+ // an event, then the effect is cumulative i.e. after the call |controller|
+ // will watch both the previous event and the new one.
+ // If an error occurs while calling this method in a cumulative fashion, the
+ // event previously attached to |controller| is aborted.
// Returns true on success.
- // TODO(dkegel): hide libevent better; abstraction still too leaky
- // TODO(dkegel): better error handing
// TODO(dkegel): switch to edge-triggered readiness notification
- void WatchSocket(int socket, short interest_mask, event* e, Watcher*);
-
- // TODO(playmobil): Merge this with WatchSocket().
- void WatchFileHandle(int fd, short interest_mask, event* e, FileWatcher*);
-
- // Stop watching a socket.
- // Event was previously initialized by WatchSocket.
- void UnwatchSocket(event* e);
-
- // Stop watching a File Handle.
- // Event was previously initialized by WatchFileHandle.
- void UnwatchFileHandle(event* e);
+ bool WatchFileDescriptor(int fd,
+ bool persistent,
+ Mode mode,
+ FileDescriptorWatcher *controller,
+ Watcher *delegate);
// MessagePump methods:
virtual void Run(Delegate* delegate);
@@ -83,14 +109,9 @@ class MessagePumpLibevent : public MessagePump {
// readiness callbacks when a socket is ready for I/O.
event_base* event_base_;
- // Called by libevent to tell us a registered socket is ready
- static void OnReadinessNotification(int socket, short flags, void* context);
-
- // Called by libevent to tell us a registered fd is ready.
- static void OnFileReadReadinessNotification(int fd, short flags,
- void* context);
- static void OnFileWriteReadinessNotification(int fd, short flags,
- void* context);
+ // Called by libevent to tell us a registered FD can be read/written to.
+ static void OnLibeventNotification(int fd, short flags,
+ void* context);
// Unix pipe used to implement ScheduleWork()
// ... callback; called by libevent inside Run() when pipe is ready to read