diff options
author | jeremy@chromium.org <jeremy@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98> | 2008-12-15 22:02:17 +0000 |
---|---|---|
committer | jeremy@chromium.org <jeremy@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98> | 2008-12-15 22:02:17 +0000 |
commit | e45e6c09f752dd23b2560cc64d990e6c03082083 (patch) | |
tree | 13b0e19b53b60f01e3528b20f74bf8173b4e4d7b /base | |
parent | bf54f6c60220a24ef1230f7c18153f2b077f5125 (diff) | |
download | chromium_src-e45e6c09f752dd23b2560cc64d990e6c03082083.zip chromium_src-e45e6c09f752dd23b2560cc64d990e6c03082083.tar.gz chromium_src-e45e6c09f752dd23b2560cc64d990e6c03082083.tar.bz2 |
message_pump_libevent refactor:
* Unify WatchSocket & WatchFileHandle.
* Better encapsulate libevent.
* Fix a bug with blocking writes in ipc_posix.cc
Review URL: http://codereview.chromium.org/13757
git-svn-id: svn://svn.chromium.org/chrome/trunk/src@7010 0039d316-1c4b-4281-b951-d872f2087c98
Diffstat (limited to 'base')
-rw-r--r-- | base/message_loop.cc | 27 | ||||
-rw-r--r-- | base/message_loop.h | 23 | ||||
-rw-r--r-- | base/message_pump_libevent.cc | 150 | ||||
-rw-r--r-- | base/message_pump_libevent.h | 97 |
4 files changed, 178 insertions, 119 deletions
diff --git a/base/message_loop.cc b/base/message_loop.cc index 898fbfa..12ad3fa 100644 --- a/base/message_loop.cc +++ b/base/message_loop.cc @@ -593,22 +593,17 @@ bool MessageLoopForIO::WaitForIOCompletion(DWORD timeout, IOHandler* filter) { #elif defined(OS_POSIX) -void MessageLoopForIO::WatchSocket(int socket, short interest_mask, - struct event* e, Watcher* watcher) { - pump_libevent()->WatchSocket(socket, interest_mask, e, watcher); +bool MessageLoopForIO::WatchFileDescriptor(int fd, + bool persistent, + Mode mode, + FileDescriptorWatcher *controller, + Watcher *delegate) { + return pump_libevent()->WatchFileDescriptor( + fd, + persistent, + static_cast<base::MessagePumpLibevent::Mode>(mode), + controller, + delegate); } -void MessageLoopForIO::WatchFileHandle(int fd, short interest_mask, - struct event* e, FileWatcher* watcher) { - pump_libevent()->WatchFileHandle(fd, interest_mask, e, watcher); -} - - -void MessageLoopForIO::UnwatchSocket(struct event* e) { - pump_libevent()->UnwatchSocket(e); -} - -void MessageLoopForIO::UnwatchFileHandle(struct event* e) { - pump_libevent()->UnwatchFileHandle(e); -} #endif diff --git a/base/message_loop.h b/base/message_loop.h index 69db8c1..6cd0ef4 100644 --- a/base/message_loop.h +++ b/base/message_loop.h @@ -473,14 +473,21 @@ class MessageLoopForIO : public MessageLoop { #elif defined(OS_POSIX) typedef base::MessagePumpLibevent::Watcher Watcher; - typedef base::MessagePumpLibevent::FileWatcher FileWatcher; - - // Please see MessagePumpLibevent for definitions of these methods. - void WatchSocket(int socket, short interest_mask, - struct event* e, Watcher* watcher); - void WatchFileHandle(int fd, short interest_mask, event* e, FileWatcher*); - void UnwatchSocket(struct event* e); - void UnwatchFileHandle(event* e); + typedef base::MessagePumpLibevent::FileDescriptorWatcher + FileDescriptorWatcher; + + enum Mode { + WATCH_READ = base::MessagePumpLibevent::WATCH_READ, + WATCH_WRITE = base::MessagePumpLibevent::WATCH_WRITE, + WATCH_READ_WRITE = base::MessagePumpLibevent::WATCH_READ_WRITE + }; + + // Please see MessagePumpLibevent for definition. + bool WatchFileDescriptor(int fd, + bool persistent, + Mode mode, + FileDescriptorWatcher *controller, + Watcher *delegate); #endif // defined(OS_POSIX) }; diff --git a/base/message_pump_libevent.cc b/base/message_pump_libevent.cc index a281980..f31c01b 100644 --- a/base/message_pump_libevent.cc +++ b/base/message_pump_libevent.cc @@ -15,17 +15,48 @@ namespace base { // Return 0 on success // Too small a function to bother putting in a library? -static int SetNonBlocking(int fd) -{ - int flags = fcntl(fd, F_GETFL, 0); - if (-1 == flags) - flags = 0; - return fcntl(fd, F_SETFL, flags | O_NONBLOCK); +static int SetNonBlocking(int fd) { + int flags = fcntl(fd, F_GETFL, 0); + if (flags == -1) + flags = 0; + return fcntl(fd, F_SETFL, flags | O_NONBLOCK); +} + +MessagePumpLibevent::FileDescriptorWatcher::FileDescriptorWatcher() + : is_persistent_(false), + event_(NULL) { +} + +MessagePumpLibevent::FileDescriptorWatcher::~FileDescriptorWatcher() { + if (event_.get()) { + StopWatchingFileDescriptor(); + } +} + +void MessagePumpLibevent::FileDescriptorWatcher::Init(event *e, + bool is_persistent) { + DCHECK(e); + DCHECK(event_.get() == NULL); + + is_persistent_ = is_persistent; + event_.reset(e); +} + +event *MessagePumpLibevent::FileDescriptorWatcher::ReleaseEvent() { + return event_.release(); +} + +bool MessagePumpLibevent::FileDescriptorWatcher::StopWatchingFileDescriptor() { + if (event_.get() == NULL) { + return true; + } + + // event_del() is a no-op of the event isn't active. + return (event_del(event_.get()) == 0); } // Called if a byte is received on the wakeup pipe. void MessagePumpLibevent::OnWakeup(int socket, short flags, void* context) { - base::MessagePumpLibevent* that = static_cast<base::MessagePumpLibevent*>(context); DCHECK(that->wakeup_pipe_out_ == socket); @@ -61,7 +92,7 @@ bool MessagePumpLibevent::Init() { wakeup_event_ = new event; event_set(wakeup_event_, wakeup_pipe_out_, EV_READ | EV_PERSIST, - OnWakeup, this); + OnWakeup, this); event_base_set(event_base_, wakeup_event_); if (event_add(wakeup_event_, 0)) @@ -77,67 +108,72 @@ MessagePumpLibevent::~MessagePumpLibevent() { event_base_free(event_base_); } -void MessagePumpLibevent::WatchSocket(int socket, short interest_mask, - event* e, Watcher* watcher) { +bool MessagePumpLibevent::WatchFileDescriptor(int fd, + bool persistent, + Mode mode, + FileDescriptorWatcher *controller, + Watcher *delegate) { + DCHECK(fd > 0); + DCHECK(controller); + DCHECK(delegate); + DCHECK(mode == WATCH_READ || mode == WATCH_WRITE || mode == WATCH_READ_WRITE); - // Set current interest mask and message pump for this event - event_set(e, socket, interest_mask, OnReadinessNotification, watcher); + int event_mask = persistent ? EV_PERSIST : 0; + if ((mode & WATCH_READ) != 0) { + event_mask |= EV_READ; + } + if ((mode & WATCH_WRITE) != 0) { + event_mask |= EV_WRITE; + } - // Tell libevent which message pump this socket will belong to when we add it. - event_base_set(event_base_, e); + // |should_delete_event| is true if we're modifying an event that's currently + // active in |controller|. + // If we're modifying an existing event and there's an error then we need to + // tell libevent to clean it up via event_delete() before returning. + bool should_delete_event = true; + scoped_ptr<event> evt(controller->ReleaseEvent()); + if (evt.get() == NULL) { + should_delete_event = false; + // Ownership is transferred to the controller. + evt.reset(new event); + } - // Add this socket to the list of monitored sockets. - if (event_add(e, NULL)) - NOTREACHED(); -} + // Set current interest mask and message pump for this event. + event_set(evt.get(), fd, event_mask, OnLibeventNotification, + delegate); -void MessagePumpLibevent::WatchFileHandle(int fd, short interest_mask, - event* e, FileWatcher* watcher) { - // Set current interest mask and message pump for this event - if ((interest_mask & EV_READ) != 0) { - event_set(e, fd, interest_mask, OnFileReadReadinessNotification, watcher); - } else { - event_set(e, fd, interest_mask, OnFileWriteReadinessNotification, watcher); + // Tell libevent which message pump this socket will belong to when we add it. + if (event_base_set(event_base_, evt.get()) != 0) { + if (should_delete_event) { + event_del(evt.get()); + } + return false; } - // Tell libevent which message pump this fd will belong to when we add it. - event_base_set(event_base_, e); - - // Add this fd to the list of monitored sockets. - if (event_add(e, NULL)) - NOTREACHED(); -} + // Add this socket to the list of monitored sockets. + if (event_add(evt.get(), NULL) != 0) { + if (should_delete_event) { + event_del(evt.get()); + } + return false; + } -void MessagePumpLibevent::UnwatchSocket(event* e) { - // Remove this socket from the list of monitored sockets. - if (event_del(e)) - NOTREACHED(); + // Transfer ownership of e to controller. + controller->Init(evt.release(), persistent); + return true; } -void MessagePumpLibevent::UnwatchFileHandle(event* e) { - // Remove this fd from the list of monitored fds. - if (event_del(e)) - NOTREACHED(); -} -void MessagePumpLibevent::OnReadinessNotification(int socket, short flags, - void* context) { - // The given socket is ready for I/O. - // Tell the owner what kind of I/O the socket is ready for. +void MessagePumpLibevent::OnLibeventNotification(int fd, short flags, + void* context) { Watcher* watcher = static_cast<Watcher*>(context); - watcher->OnSocketReady(flags); -} -void MessagePumpLibevent::OnFileReadReadinessNotification(int fd, short flags, - void* context) { - FileWatcher* watcher = static_cast<FileWatcher*>(context); - watcher->OnFileReadReady(fd); -} - -void MessagePumpLibevent::OnFileWriteReadinessNotification(int fd, short flags, - void* context) { - FileWatcher* watcher = static_cast<FileWatcher*>(context); - watcher->OnFileWriteReady(fd); + if (flags & EV_WRITE) { + watcher->OnFileCanWriteWithoutBlocking(fd); + } + if (flags & EV_READ) { + watcher->OnFileCanReadWithoutBlocking(fd); + } } // Reentrant! diff --git a/base/message_pump_libevent.h b/base/message_pump_libevent.h index b54bc36..aa5d5f8 100644 --- a/base/message_pump_libevent.h +++ b/base/message_pump_libevent.h @@ -6,6 +6,7 @@ #define BASE_MESSAGE_PUMP_LIBEVENT_H_ #include "base/message_pump.h" +#include "base/scoped_ptr.h" #include "base/time.h" // Declare structs we need from libevent.h rather than including it @@ -18,46 +19,71 @@ namespace base { // TODO(dkegel): add support for background file IO somehow class MessagePumpLibevent : public MessagePump { public: - // Used with WatchSocket to asynchronously monitor the I/O readiness of a - // socket. - class Watcher { - public: - virtual ~Watcher() {} - // Called from MessageLoop::Run when a ready socket is detected. - virtual void OnSocketReady(short eventmask) = 0; + + // Object returned by WatchFileDescriptor to manage further watching. + class FileDescriptorWatcher { + public: + FileDescriptorWatcher(); + ~FileDescriptorWatcher(); // Implicitly calls StopWatchingFileDescriptor. + + // NOTE: These methods aren't called StartWatching()/StopWatching() to + // avoid confusion with the win32 ObjectWatcher class. + + // Stop watching the FD, always safe to call. No-op if there's nothing + // to do. + bool StopWatchingFileDescriptor(); + + private: + // Called by MessagePumpLibevent, ownership of |e| is transferred to this + // object. + void Init(event* e, bool is_persistent); + + // Used by MessagePumpLibevent to take ownership of event_. + event *ReleaseEvent(); + friend class MessagePumpLibevent; + + private: + bool is_persistent_; // false if this event is one-shot. + scoped_ptr<event> event_; + DISALLOW_COPY_AND_ASSIGN(FileDescriptorWatcher); }; - // Used with WatchFileHandle to monitor I/O readiness for a File Handle. - class FileWatcher { + // Used with WatchFileDescptor to asynchronously monitor the I/O readiness of + // a File Descriptor. + class Watcher { public: - virtual ~FileWatcher() {} - // Called from MessageLoop::Run when a non-blocking read/write can be made. - virtual void OnFileReadReady(int fd) = 0; - virtual void OnFileWriteReady(int fd) = 0; + virtual ~Watcher() {} + // Called from MessageLoop::Run when an FD can be read from/written to + // without blocking + virtual void OnFileCanReadWithoutBlocking(int fd) = 0; + virtual void OnFileCanWriteWithoutBlocking(int fd) = 0; }; MessagePumpLibevent(); virtual ~MessagePumpLibevent(); - // Have the current thread's message loop watch for a ready socket. - // Caller must provide a struct event for this socket for libevent's use. - // The event and interest_mask fields are defined in libevent. + enum Mode { + WATCH_READ = 1 << 0, + WATCH_WRITE = 1 << 1, + WATCH_READ_WRITE = WATCH_READ | WATCH_WRITE + }; + + // Have the current thread's message loop watch for a a situation in which + // reading/writing to the FD can be performed without Blocking. + // Callers must provide a preallocated FileDescriptorWatcher object which + // can later be used to manage the Lifetime of this event. + // If a FileDescriptorWatcher is passed in which is already attached to + // an event, then the effect is cumulative i.e. after the call |controller| + // will watch both the previous event and the new one. + // If an error occurs while calling this method in a cumulative fashion, the + // event previously attached to |controller| is aborted. // Returns true on success. - // TODO(dkegel): hide libevent better; abstraction still too leaky - // TODO(dkegel): better error handing // TODO(dkegel): switch to edge-triggered readiness notification - void WatchSocket(int socket, short interest_mask, event* e, Watcher*); - - // TODO(playmobil): Merge this with WatchSocket(). - void WatchFileHandle(int fd, short interest_mask, event* e, FileWatcher*); - - // Stop watching a socket. - // Event was previously initialized by WatchSocket. - void UnwatchSocket(event* e); - - // Stop watching a File Handle. - // Event was previously initialized by WatchFileHandle. - void UnwatchFileHandle(event* e); + bool WatchFileDescriptor(int fd, + bool persistent, + Mode mode, + FileDescriptorWatcher *controller, + Watcher *delegate); // MessagePump methods: virtual void Run(Delegate* delegate); @@ -83,14 +109,9 @@ class MessagePumpLibevent : public MessagePump { // readiness callbacks when a socket is ready for I/O. event_base* event_base_; - // Called by libevent to tell us a registered socket is ready - static void OnReadinessNotification(int socket, short flags, void* context); - - // Called by libevent to tell us a registered fd is ready. - static void OnFileReadReadinessNotification(int fd, short flags, - void* context); - static void OnFileWriteReadinessNotification(int fd, short flags, - void* context); + // Called by libevent to tell us a registered FD can be read/written to. + static void OnLibeventNotification(int fd, short flags, + void* context); // Unix pipe used to implement ScheduleWork() // ... callback; called by libevent inside Run() when pipe is ready to read |