diff options
Diffstat (limited to 'base/message_pump_libevent.cc')
-rw-r--r-- | base/message_pump_libevent.cc | 59 |
1 files changed, 50 insertions, 9 deletions
diff --git a/base/message_pump_libevent.cc b/base/message_pump_libevent.cc index 2ad1d97..c2390b4 100644 --- a/base/message_pump_libevent.cc +++ b/base/message_pump_libevent.cc @@ -7,9 +7,10 @@ #include <errno.h> #include <fcntl.h> -#include "eintr_wrapper.h" #include "base/auto_reset.h" +#include "base/eintr_wrapper.h" #include "base/logging.h" +#include "base/observer_list.h" #include "base/scoped_nsautorelease_pool.h" #include "base/scoped_ptr.h" #include "base/time.h" @@ -50,7 +51,9 @@ static int SetNonBlocking(int fd) { MessagePumpLibevent::FileDescriptorWatcher::FileDescriptorWatcher() : is_persistent_(false), - event_(NULL) { + event_(NULL), + pump_(NULL), + watcher_(NULL) { } MessagePumpLibevent::FileDescriptorWatcher::~FileDescriptorWatcher() { @@ -82,9 +85,25 @@ bool MessagePumpLibevent::FileDescriptorWatcher::StopWatchingFileDescriptor() { // event_del() is a no-op if the event isn't active. int rv = event_del(e); delete e; + pump_ = NULL; + watcher_ = NULL; return (rv == 0); } +void MessagePumpLibevent::FileDescriptorWatcher::OnFileCanReadWithoutBlocking( + int fd, MessagePumpLibevent* pump) { + pump->WillProcessIOEvent(); + watcher_->OnFileCanReadWithoutBlocking(fd); + pump->DidProcessIOEvent(); +} + +void MessagePumpLibevent::FileDescriptorWatcher::OnFileCanWriteWithoutBlocking( + int fd, MessagePumpLibevent* pump) { + pump->WillProcessIOEvent(); + watcher_->OnFileCanWriteWithoutBlocking(fd); + pump->DidProcessIOEvent(); +} + // Called if a byte is received on the wakeup pipe. void MessagePumpLibevent::OnWakeup(int socket, short flags, void* context) { base::MessagePumpLibevent* that = @@ -142,9 +161,9 @@ MessagePumpLibevent::~MessagePumpLibevent() { event_del(wakeup_event_); delete wakeup_event_; if (wakeup_pipe_in_ >= 0) - close(wakeup_pipe_in_); + HANDLE_EINTR(close(wakeup_pipe_in_)); if (wakeup_pipe_out_ >= 0) - close(wakeup_pipe_out_); + HANDLE_EINTR(close(wakeup_pipe_out_)); event_base_free(event_base_); } @@ -190,7 +209,7 @@ bool MessagePumpLibevent::WatchFileDescriptor(int fd, } // Set current interest mask and message pump for this event. - event_set(evt.get(), fd, event_mask, OnLibeventNotification, delegate); + event_set(evt.get(), fd, event_mask, OnLibeventNotification, controller); // Tell libevent which message pump this socket will belong to when we add it. if (event_base_set(event_base_, evt.get()) != 0) { @@ -204,19 +223,25 @@ bool MessagePumpLibevent::WatchFileDescriptor(int fd, // Transfer ownership of evt to controller. controller->Init(evt.release(), persistent); + + controller->set_watcher(delegate); + controller->set_pump(this); + return true; } - void MessagePumpLibevent::OnLibeventNotification(int fd, short flags, void* context) { - Watcher* watcher = static_cast<Watcher*>(context); + FileDescriptorWatcher* controller = + static_cast<FileDescriptorWatcher*>(context); + + MessagePumpLibevent* pump = controller->pump(); if (flags & EV_WRITE) { - watcher->OnFileCanWriteWithoutBlocking(fd); + controller->OnFileCanWriteWithoutBlocking(fd, pump); } if (flags & EV_READ) { - watcher->OnFileCanReadWithoutBlocking(fd); + controller->OnFileCanReadWithoutBlocking(fd, pump); } } @@ -304,4 +329,20 @@ void MessagePumpLibevent::ScheduleDelayedWork(const Time& delayed_work_time) { delayed_work_time_ = delayed_work_time; } +void MessagePumpLibevent::AddIOObserver(IOObserver *obs) { + io_observers_.AddObserver(obs); +} + +void MessagePumpLibevent::RemoveIOObserver(IOObserver *obs) { + io_observers_.RemoveObserver(obs); +} + +void MessagePumpLibevent::WillProcessIOEvent() { + FOR_EACH_OBSERVER(IOObserver, io_observers_, WillProcessIOEvent()); +} + +void MessagePumpLibevent::DidProcessIOEvent() { + FOR_EACH_OBSERVER(IOObserver, io_observers_, DidProcessIOEvent()); +} + } // namespace base |