diff options
Diffstat (limited to 'base/message_pump_libevent.cc')
-rw-r--r-- | base/message_pump_libevent.cc | 134 |
1 files changed, 77 insertions, 57 deletions
diff --git a/base/message_pump_libevent.cc b/base/message_pump_libevent.cc index a281980..9c13110 100644 --- a/base/message_pump_libevent.cc +++ b/base/message_pump_libevent.cc @@ -15,17 +15,48 @@ namespace base { // Return 0 on success // Too small a function to bother putting in a library? -static int SetNonBlocking(int fd) -{ - int flags = fcntl(fd, F_GETFL, 0); - if (-1 == flags) - flags = 0; - return fcntl(fd, F_SETFL, flags | O_NONBLOCK); +static int SetNonBlocking(int fd) { + int flags = fcntl(fd, F_GETFL, 0); + if (flags == -1) + flags = 0; + return fcntl(fd, F_SETFL, flags | O_NONBLOCK); +} + +MessagePumpLibevent::FileDescriptorWatcher::FileDescriptorWatcher() + : is_persistent_(false), + event_(NULL) { +} + +MessagePumpLibevent::FileDescriptorWatcher::~FileDescriptorWatcher() { + if (event_.get()) { + StopWatchingFileDescriptor(); + } +} + +void MessagePumpLibevent::FileDescriptorWatcher::Init(event *e, + bool is_persistent) { + DCHECK(e); + + // Cleanup any old event we might have been watching. + if (event_.get()) { + StopWatchingFileDescriptor(); + } + + is_persistent = is_persistent_; + event_.reset(e); +} + +bool MessagePumpLibevent::FileDescriptorWatcher::StopWatchingFileDescriptor() { + if (event_.get() == NULL) { + return true; + } + + // event_del() is a no-op of the event isn't active. + return (event_del(event_.get()) == 0); } // Called if a byte is received on the wakeup pipe. void MessagePumpLibevent::OnWakeup(int socket, short flags, void* context) { - base::MessagePumpLibevent* that = static_cast<base::MessagePumpLibevent*>(context); DCHECK(that->wakeup_pipe_out_ == socket); @@ -61,7 +92,7 @@ bool MessagePumpLibevent::Init() { wakeup_event_ = new event; event_set(wakeup_event_, wakeup_pipe_out_, EV_READ | EV_PERSIST, - OnWakeup, this); + OnWakeup, this); event_base_set(event_base_, wakeup_event_); if (event_add(wakeup_event_, 0)) @@ -77,67 +108,56 @@ MessagePumpLibevent::~MessagePumpLibevent() { event_base_free(event_base_); } -void MessagePumpLibevent::WatchSocket(int socket, short interest_mask, - event* e, Watcher* watcher) { +bool MessagePumpLibevent::WatchFileDescriptor(int fd, + bool persistent, + Mode mode, + FileDescriptorWatcher *controller, + Watcher *delegate) { + DCHECK(fd > 0); + DCHECK(controller); + DCHECK(delegate); + DCHECK(mode == WATCH_READ || mode == WATCH_WRITE || mode == WATCH_READ_WRITE); - // Set current interest mask and message pump for this event - event_set(e, socket, interest_mask, OnReadinessNotification, watcher); + int event_mask = persistent ? EV_PERSIST : 0; + if (mode == WATCH_READ || mode == WATCH_READ_WRITE) { + event_mask |= EV_READ; + } + if (mode == WATCH_WRITE || mode == WATCH_READ_WRITE) { + event_mask |= EV_WRITE; + } + + // Ownership is transferred to the controller. + scoped_ptr<event> evt(new event); + // Set current interest mask and message pump for this event. + event_set(evt.get(), fd, event_mask | EV_READ, OnLibeventNotification, + delegate); // Tell libevent which message pump this socket will belong to when we add it. - event_base_set(event_base_, e); + if (event_base_set(event_base_, evt.get()) != 0) { + return false; + } // Add this socket to the list of monitored sockets. - if (event_add(e, NULL)) - NOTREACHED(); -} - -void MessagePumpLibevent::WatchFileHandle(int fd, short interest_mask, - event* e, FileWatcher* watcher) { - // Set current interest mask and message pump for this event - if ((interest_mask & EV_READ) != 0) { - event_set(e, fd, interest_mask, OnFileReadReadinessNotification, watcher); - } else { - event_set(e, fd, interest_mask, OnFileWriteReadinessNotification, watcher); + if (event_add(evt.get(), NULL) != 0) { + return false; } - // Tell libevent which message pump this fd will belong to when we add it. - event_base_set(event_base_, e); - - // Add this fd to the list of monitored sockets. - if (event_add(e, NULL)) - NOTREACHED(); -} - -void MessagePumpLibevent::UnwatchSocket(event* e) { - // Remove this socket from the list of monitored sockets. - if (event_del(e)) - NOTREACHED(); + // Transfer ownership of e to controller. + controller->Init(evt.release(), persistent); + return true; } -void MessagePumpLibevent::UnwatchFileHandle(event* e) { - // Remove this fd from the list of monitored fds. - if (event_del(e)) - NOTREACHED(); -} -void MessagePumpLibevent::OnReadinessNotification(int socket, short flags, - void* context) { - // The given socket is ready for I/O. - // Tell the owner what kind of I/O the socket is ready for. +void MessagePumpLibevent::OnLibeventNotification(int fd, short flags, + void* context) { Watcher* watcher = static_cast<Watcher*>(context); - watcher->OnSocketReady(flags); -} -void MessagePumpLibevent::OnFileReadReadinessNotification(int fd, short flags, - void* context) { - FileWatcher* watcher = static_cast<FileWatcher*>(context); - watcher->OnFileReadReady(fd); -} - -void MessagePumpLibevent::OnFileWriteReadinessNotification(int fd, short flags, - void* context) { - FileWatcher* watcher = static_cast<FileWatcher*>(context); - watcher->OnFileWriteReady(fd); + if (flags & EV_WRITE) { + watcher->OnFileCanWriteWithoutBlocking(fd); + } + if (flags & EV_READ) { + watcher->OnFileCanReadWithoutBlocking(fd); + } } // Reentrant! |