diff options
-rw-r--r-- | base/message_loop.cc | 22 | ||||
-rw-r--r-- | base/message_loop.h | 3 | ||||
-rw-r--r-- | base/message_pump_libevent.cc | 43 | ||||
-rw-r--r-- | base/message_pump_libevent.h | 30 |
4 files changed, 84 insertions, 14 deletions
diff --git a/base/message_loop.cc b/base/message_loop.cc index 1145439..898fbfa 100644 --- a/base/message_loop.cc +++ b/base/message_loop.cc @@ -193,7 +193,7 @@ void MessageLoop::RunInternal() { return; } #endif - + pump_->Run(this); } @@ -206,10 +206,10 @@ bool MessageLoop::ProcessNextDelayedNonNestableTask() { if (deferred_non_nestable_work_queue_.empty()) return false; - + Task* task = deferred_non_nestable_work_queue_.front().task; deferred_non_nestable_work_queue_.pop(); - + RunTask(task); return true; } @@ -420,7 +420,7 @@ bool MessageLoop::DoDelayedWork(Time* next_delayed_work_time) { *next_delayed_work_time = Time(); return false; } - + if (delayed_work_queue_.top().delayed_run_time > Time::Now()) { *next_delayed_work_time = delayed_work_queue_.top().delayed_run_time; return false; @@ -428,7 +428,7 @@ bool MessageLoop::DoDelayedWork(Time* next_delayed_work_time) { PendingTask pending_task = delayed_work_queue_.top(); delayed_work_queue_.pop(); - + if (!delayed_work_queue_.empty()) *next_delayed_work_time = delayed_work_queue_.top().delayed_run_time; @@ -593,12 +593,22 @@ bool MessageLoopForIO::WaitForIOCompletion(DWORD timeout, IOHandler* filter) { #elif defined(OS_POSIX) -void MessageLoopForIO::WatchSocket(int socket, short interest_mask, +void MessageLoopForIO::WatchSocket(int socket, short interest_mask, struct event* e, Watcher* watcher) { pump_libevent()->WatchSocket(socket, interest_mask, e, watcher); } +void MessageLoopForIO::WatchFileHandle(int fd, short interest_mask, + struct event* e, FileWatcher* watcher) { + pump_libevent()->WatchFileHandle(fd, interest_mask, e, watcher); +} + + void MessageLoopForIO::UnwatchSocket(struct event* e) { pump_libevent()->UnwatchSocket(e); } + +void MessageLoopForIO::UnwatchFileHandle(struct event* e) { + pump_libevent()->UnwatchFileHandle(e); +} #endif diff --git a/base/message_loop.h b/base/message_loop.h index 8f16d8e..69db8c1 100644 --- a/base/message_loop.h +++ b/base/message_loop.h @@ -473,11 +473,14 @@ class MessageLoopForIO : public MessageLoop { #elif defined(OS_POSIX) typedef base::MessagePumpLibevent::Watcher Watcher; + typedef base::MessagePumpLibevent::FileWatcher FileWatcher; // Please see MessagePumpLibevent for definitions of these methods. void WatchSocket(int socket, short interest_mask, struct event* e, Watcher* watcher); + void WatchFileHandle(int fd, short interest_mask, event* e, FileWatcher*); void UnwatchSocket(struct event* e); + void UnwatchFileHandle(event* e); #endif // defined(OS_POSIX) }; diff --git a/base/message_pump_libevent.cc b/base/message_pump_libevent.cc index 0dd533e..a281980 100644 --- a/base/message_pump_libevent.cc +++ b/base/message_pump_libevent.cc @@ -26,7 +26,7 @@ static int SetNonBlocking(int fd) // Called if a byte is received on the wakeup pipe. void MessagePumpLibevent::OnWakeup(int socket, short flags, void* context) { - base::MessagePumpLibevent* that = + base::MessagePumpLibevent* that = static_cast<base::MessagePumpLibevent*>(context); DCHECK(that->wakeup_pipe_out_ == socket); @@ -60,7 +60,7 @@ bool MessagePumpLibevent::Init() { wakeup_pipe_in_ = fds[1]; wakeup_event_ = new event; - event_set(wakeup_event_, wakeup_pipe_out_, EV_READ | EV_PERSIST, + event_set(wakeup_event_, wakeup_pipe_out_, EV_READ | EV_PERSIST, OnWakeup, this); event_base_set(event_base_, wakeup_event_); @@ -77,7 +77,7 @@ MessagePumpLibevent::~MessagePumpLibevent() { event_base_free(event_base_); } -void MessagePumpLibevent::WatchSocket(int socket, short interest_mask, +void MessagePumpLibevent::WatchSocket(int socket, short interest_mask, event* e, Watcher* watcher) { // Set current interest mask and message pump for this event @@ -91,13 +91,36 @@ void MessagePumpLibevent::WatchSocket(int socket, short interest_mask, NOTREACHED(); } +void MessagePumpLibevent::WatchFileHandle(int fd, short interest_mask, + event* e, FileWatcher* watcher) { + // Set current interest mask and message pump for this event + if ((interest_mask & EV_READ) != 0) { + event_set(e, fd, interest_mask, OnFileReadReadinessNotification, watcher); + } else { + event_set(e, fd, interest_mask, OnFileWriteReadinessNotification, watcher); + } + + // Tell libevent which message pump this fd will belong to when we add it. + event_base_set(event_base_, e); + + // Add this fd to the list of monitored sockets. + if (event_add(e, NULL)) + NOTREACHED(); +} + void MessagePumpLibevent::UnwatchSocket(event* e) { // Remove this socket from the list of monitored sockets. if (event_del(e)) NOTREACHED(); } -void MessagePumpLibevent::OnReadinessNotification(int socket, short flags, +void MessagePumpLibevent::UnwatchFileHandle(event* e) { + // Remove this fd from the list of monitored fds. + if (event_del(e)) + NOTREACHED(); +} + +void MessagePumpLibevent::OnReadinessNotification(int socket, short flags, void* context) { // The given socket is ready for I/O. // Tell the owner what kind of I/O the socket is ready for. @@ -105,6 +128,18 @@ void MessagePumpLibevent::OnReadinessNotification(int socket, short flags, watcher->OnSocketReady(flags); } +void MessagePumpLibevent::OnFileReadReadinessNotification(int fd, short flags, + void* context) { + FileWatcher* watcher = static_cast<FileWatcher*>(context); + watcher->OnFileReadReady(fd); +} + +void MessagePumpLibevent::OnFileWriteReadinessNotification(int fd, short flags, + void* context) { + FileWatcher* watcher = static_cast<FileWatcher*>(context); + watcher->OnFileWriteReady(fd); +} + // Reentrant! void MessagePumpLibevent::Run(Delegate* delegate) { DCHECK(keep_running_) << "Quit must have been called outside of Run!"; diff --git a/base/message_pump_libevent.h b/base/message_pump_libevent.h index 9934f31..b54bc36 100644 --- a/base/message_pump_libevent.h +++ b/base/message_pump_libevent.h @@ -18,7 +18,7 @@ namespace base { // TODO(dkegel): add support for background file IO somehow class MessagePumpLibevent : public MessagePump { public: - // Used with WatchObject to asynchronously monitor the I/O readiness of a + // Used with WatchSocket to asynchronously monitor the I/O readiness of a // socket. class Watcher { public: @@ -27,22 +27,38 @@ class MessagePumpLibevent : public MessagePump { virtual void OnSocketReady(short eventmask) = 0; }; + // Used with WatchFileHandle to monitor I/O readiness for a File Handle. + class FileWatcher { + public: + virtual ~FileWatcher() {} + // Called from MessageLoop::Run when a non-blocking read/write can be made. + virtual void OnFileReadReady(int fd) = 0; + virtual void OnFileWriteReady(int fd) = 0; + }; + MessagePumpLibevent(); virtual ~MessagePumpLibevent(); // Have the current thread's message loop watch for a ready socket. // Caller must provide a struct event for this socket for libevent's use. // The event and interest_mask fields are defined in libevent. - // Returns true on success. + // Returns true on success. // TODO(dkegel): hide libevent better; abstraction still too leaky // TODO(dkegel): better error handing // TODO(dkegel): switch to edge-triggered readiness notification void WatchSocket(int socket, short interest_mask, event* e, Watcher*); + // TODO(playmobil): Merge this with WatchSocket(). + void WatchFileHandle(int fd, short interest_mask, event* e, FileWatcher*); + // Stop watching a socket. // Event was previously initialized by WatchSocket. void UnwatchSocket(event* e); + // Stop watching a File Handle. + // Event was previously initialized by WatchFileHandle. + void UnwatchFileHandle(event* e); + // MessagePump methods: virtual void Run(Delegate* delegate); virtual void Quit(); @@ -63,17 +79,23 @@ class MessagePumpLibevent : public MessagePump { // The time at which we should call DoDelayedWork. Time delayed_work_time_; - // Libevent dispatcher. Watches all sockets registered with it, and sends + // Libevent dispatcher. Watches all sockets registered with it, and sends // readiness callbacks when a socket is ready for I/O. event_base* event_base_; // Called by libevent to tell us a registered socket is ready static void OnReadinessNotification(int socket, short flags, void* context); + // Called by libevent to tell us a registered fd is ready. + static void OnFileReadReadinessNotification(int fd, short flags, + void* context); + static void OnFileWriteReadinessNotification(int fd, short flags, + void* context); + // Unix pipe used to implement ScheduleWork() // ... callback; called by libevent inside Run() when pipe is ready to read static void OnWakeup(int socket, short flags, void* context); - // ... write end; ScheduleWork() writes a single byte to it + // ... write end; ScheduleWork() writes a single byte to it int wakeup_pipe_in_; // ... read end; OnWakeup reads it and then breaks Run() out of its sleep int wakeup_pipe_out_; |