summaryrefslogtreecommitdiffstats
path: root/base/message_loop/message_pump_libevent.cc
diff options
context:
space:
mode:
Diffstat (limited to 'base/message_loop/message_pump_libevent.cc')
-rw-r--r--base/message_loop/message_pump_libevent.cc375
1 files changed, 375 insertions, 0 deletions
diff --git a/base/message_loop/message_pump_libevent.cc b/base/message_loop/message_pump_libevent.cc
new file mode 100644
index 0000000..8de0db2
--- /dev/null
+++ b/base/message_loop/message_pump_libevent.cc
@@ -0,0 +1,375 @@
+// Copyright (c) 2012 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "base/message_loop/message_pump_libevent.h"
+
+#include <errno.h>
+#include <fcntl.h>
+#include <unistd.h>
+
+#include "base/auto_reset.h"
+#include "base/compiler_specific.h"
+#include "base/logging.h"
+#include "base/memory/scoped_ptr.h"
+#include "base/observer_list.h"
+#include "base/posix/eintr_wrapper.h"
+#include "base/time.h"
+#include "third_party/libevent/event.h"
+
+#if defined(OS_MACOSX)
+#include "base/mac/scoped_nsautorelease_pool.h"
+#endif
+
+// Lifecycle of struct event
+// Libevent uses two main data structures:
+// struct event_base (of which there is one per message pump), and
+// struct event (of which there is roughly one per socket).
+// The socket's struct event is created in
+// MessagePumpLibevent::WatchFileDescriptor(),
+// is owned by the FileDescriptorWatcher, and is destroyed in
+// StopWatchingFileDescriptor().
+// It is moved into and out of lists in struct event_base by
+// the libevent functions event_add() and event_del().
+//
+// TODO(dkegel):
+// At the moment bad things happen if a FileDescriptorWatcher
+// is active after its MessagePumpLibevent has been destroyed.
+// See MessageLoopTest.FileDescriptorWatcherOutlivesMessageLoop
+// Not clear yet whether that situation occurs in practice,
+// but if it does, we need to fix it.
+
+namespace base {
+
+// Return 0 on success
+// Too small a function to bother putting in a library?
+static int SetNonBlocking(int fd) {
+ int flags = fcntl(fd, F_GETFL, 0);
+ if (flags == -1)
+ flags = 0;
+ return fcntl(fd, F_SETFL, flags | O_NONBLOCK);
+}
+
+MessagePumpLibevent::FileDescriptorWatcher::FileDescriptorWatcher()
+ : event_(NULL),
+ pump_(NULL),
+ watcher_(NULL),
+ weak_factory_(this) {
+}
+
+MessagePumpLibevent::FileDescriptorWatcher::~FileDescriptorWatcher() {
+ if (event_) {
+ StopWatchingFileDescriptor();
+ }
+}
+
+bool MessagePumpLibevent::FileDescriptorWatcher::StopWatchingFileDescriptor() {
+ event* e = ReleaseEvent();
+ if (e == NULL)
+ return true;
+
+ // event_del() is a no-op if the event isn't active.
+ int rv = event_del(e);
+ delete e;
+ pump_ = NULL;
+ watcher_ = NULL;
+ return (rv == 0);
+}
+
+void MessagePumpLibevent::FileDescriptorWatcher::Init(event *e) {
+ DCHECK(e);
+ DCHECK(!event_);
+
+ event_ = e;
+}
+
+event *MessagePumpLibevent::FileDescriptorWatcher::ReleaseEvent() {
+ struct event *e = event_;
+ event_ = NULL;
+ return e;
+}
+
+void MessagePumpLibevent::FileDescriptorWatcher::OnFileCanReadWithoutBlocking(
+ int fd, MessagePumpLibevent* pump) {
+ // Since OnFileCanWriteWithoutBlocking() gets called first, it can stop
+ // watching the file descriptor.
+ if (!watcher_)
+ return;
+ pump->WillProcessIOEvent();
+ watcher_->OnFileCanReadWithoutBlocking(fd);
+ pump->DidProcessIOEvent();
+}
+
+void MessagePumpLibevent::FileDescriptorWatcher::OnFileCanWriteWithoutBlocking(
+ int fd, MessagePumpLibevent* pump) {
+ DCHECK(watcher_);
+ pump->WillProcessIOEvent();
+ watcher_->OnFileCanWriteWithoutBlocking(fd);
+ pump->DidProcessIOEvent();
+}
+
+MessagePumpLibevent::MessagePumpLibevent()
+ : keep_running_(true),
+ in_run_(false),
+ processed_io_events_(false),
+ event_base_(event_base_new()),
+ wakeup_pipe_in_(-1),
+ wakeup_pipe_out_(-1) {
+ if (!Init())
+ NOTREACHED();
+}
+
+MessagePumpLibevent::~MessagePumpLibevent() {
+ DCHECK(wakeup_event_);
+ DCHECK(event_base_);
+ event_del(wakeup_event_);
+ delete wakeup_event_;
+ if (wakeup_pipe_in_ >= 0) {
+ if (HANDLE_EINTR(close(wakeup_pipe_in_)) < 0)
+ DPLOG(ERROR) << "close";
+ }
+ if (wakeup_pipe_out_ >= 0) {
+ if (HANDLE_EINTR(close(wakeup_pipe_out_)) < 0)
+ DPLOG(ERROR) << "close";
+ }
+ event_base_free(event_base_);
+}
+
+bool MessagePumpLibevent::WatchFileDescriptor(int fd,
+ bool persistent,
+ int mode,
+ FileDescriptorWatcher *controller,
+ Watcher *delegate) {
+ DCHECK_GE(fd, 0);
+ DCHECK(controller);
+ DCHECK(delegate);
+ DCHECK(mode == WATCH_READ || mode == WATCH_WRITE || mode == WATCH_READ_WRITE);
+ // WatchFileDescriptor should be called on the pump thread. It is not
+ // threadsafe, and your watcher may never be registered.
+ DCHECK(watch_file_descriptor_caller_checker_.CalledOnValidThread());
+
+ int event_mask = persistent ? EV_PERSIST : 0;
+ if (mode & WATCH_READ) {
+ event_mask |= EV_READ;
+ }
+ if (mode & WATCH_WRITE) {
+ event_mask |= EV_WRITE;
+ }
+
+ scoped_ptr<event> evt(controller->ReleaseEvent());
+ if (evt.get() == NULL) {
+ // Ownership is transferred to the controller.
+ evt.reset(new event);
+ } else {
+ // Make sure we don't pick up any funky internal libevent masks.
+ int old_interest_mask = evt.get()->ev_events &
+ (EV_READ | EV_WRITE | EV_PERSIST);
+
+ // Combine old/new event masks.
+ event_mask |= old_interest_mask;
+
+ // Must disarm the event before we can reuse it.
+ event_del(evt.get());
+
+ // It's illegal to use this function to listen on 2 separate fds with the
+ // same |controller|.
+ if (EVENT_FD(evt.get()) != fd) {
+ NOTREACHED() << "FDs don't match" << EVENT_FD(evt.get()) << "!=" << fd;
+ return false;
+ }
+ }
+
+ // Set current interest mask and message pump for this event.
+ event_set(evt.get(), fd, event_mask, OnLibeventNotification, controller);
+
+ // Tell libevent which message pump this socket will belong to when we add it.
+ if (event_base_set(event_base_, evt.get())) {
+ return false;
+ }
+
+ // Add this socket to the list of monitored sockets.
+ if (event_add(evt.get(), NULL)) {
+ return false;
+ }
+
+ // Transfer ownership of evt to controller.
+ controller->Init(evt.release());
+
+ controller->set_watcher(delegate);
+ controller->set_pump(this);
+
+ return true;
+}
+
+void MessagePumpLibevent::AddIOObserver(IOObserver *obs) {
+ io_observers_.AddObserver(obs);
+}
+
+void MessagePumpLibevent::RemoveIOObserver(IOObserver *obs) {
+ io_observers_.RemoveObserver(obs);
+}
+
+// Tell libevent to break out of inner loop.
+static void timer_callback(int fd, short events, void *context)
+{
+ event_base_loopbreak((struct event_base *)context);
+}
+
+// Reentrant!
+void MessagePumpLibevent::Run(Delegate* delegate) {
+ DCHECK(keep_running_) << "Quit must have been called outside of Run!";
+ AutoReset<bool> auto_reset_in_run(&in_run_, true);
+
+ // event_base_loopexit() + EVLOOP_ONCE is leaky, see http://crbug.com/25641.
+ // Instead, make our own timer and reuse it on each call to event_base_loop().
+ scoped_ptr<event> timer_event(new event);
+
+ for (;;) {
+#if defined(OS_MACOSX)
+ mac::ScopedNSAutoreleasePool autorelease_pool;
+#endif
+
+ bool did_work = delegate->DoWork();
+ if (!keep_running_)
+ break;
+
+ event_base_loop(event_base_, EVLOOP_NONBLOCK);
+ did_work |= processed_io_events_;
+ processed_io_events_ = false;
+ if (!keep_running_)
+ break;
+
+ did_work |= delegate->DoDelayedWork(&delayed_work_time_);
+ if (!keep_running_)
+ break;
+
+ if (did_work)
+ continue;
+
+ did_work = delegate->DoIdleWork();
+ if (!keep_running_)
+ break;
+
+ if (did_work)
+ continue;
+
+ // EVLOOP_ONCE tells libevent to only block once,
+ // but to service all pending events when it wakes up.
+ if (delayed_work_time_.is_null()) {
+ event_base_loop(event_base_, EVLOOP_ONCE);
+ } else {
+ TimeDelta delay = delayed_work_time_ - TimeTicks::Now();
+ if (delay > TimeDelta()) {
+ struct timeval poll_tv;
+ poll_tv.tv_sec = delay.InSeconds();
+ poll_tv.tv_usec = delay.InMicroseconds() % Time::kMicrosecondsPerSecond;
+ event_set(timer_event.get(), -1, 0, timer_callback, event_base_);
+ event_base_set(event_base_, timer_event.get());
+ event_add(timer_event.get(), &poll_tv);
+ event_base_loop(event_base_, EVLOOP_ONCE);
+ event_del(timer_event.get());
+ } else {
+ // It looks like delayed_work_time_ indicates a time in the past, so we
+ // need to call DoDelayedWork now.
+ delayed_work_time_ = TimeTicks();
+ }
+ }
+ }
+
+ keep_running_ = true;
+}
+
+void MessagePumpLibevent::Quit() {
+ DCHECK(in_run_);
+ // Tell both libevent and Run that they should break out of their loops.
+ keep_running_ = false;
+ ScheduleWork();
+}
+
+void MessagePumpLibevent::ScheduleWork() {
+ // Tell libevent (in a threadsafe way) that it should break out of its loop.
+ char buf = 0;
+ int nwrite = HANDLE_EINTR(write(wakeup_pipe_in_, &buf, 1));
+ DCHECK(nwrite == 1 || errno == EAGAIN)
+ << "[nwrite:" << nwrite << "] [errno:" << errno << "]";
+}
+
+void MessagePumpLibevent::ScheduleDelayedWork(
+ const TimeTicks& delayed_work_time) {
+ // We know that we can't be blocked on Wait right now since this method can
+ // only be called on the same thread as Run, so we only need to update our
+ // record of how long to sleep when we do sleep.
+ delayed_work_time_ = delayed_work_time;
+}
+
+void MessagePumpLibevent::WillProcessIOEvent() {
+ FOR_EACH_OBSERVER(IOObserver, io_observers_, WillProcessIOEvent());
+}
+
+void MessagePumpLibevent::DidProcessIOEvent() {
+ FOR_EACH_OBSERVER(IOObserver, io_observers_, DidProcessIOEvent());
+}
+
+bool MessagePumpLibevent::Init() {
+ int fds[2];
+ if (pipe(fds)) {
+ DLOG(ERROR) << "pipe() failed, errno: " << errno;
+ return false;
+ }
+ if (SetNonBlocking(fds[0])) {
+ DLOG(ERROR) << "SetNonBlocking for pipe fd[0] failed, errno: " << errno;
+ return false;
+ }
+ if (SetNonBlocking(fds[1])) {
+ DLOG(ERROR) << "SetNonBlocking for pipe fd[1] failed, errno: " << errno;
+ return false;
+ }
+ wakeup_pipe_out_ = fds[0];
+ wakeup_pipe_in_ = fds[1];
+
+ wakeup_event_ = new event;
+ event_set(wakeup_event_, wakeup_pipe_out_, EV_READ | EV_PERSIST,
+ OnWakeup, this);
+ event_base_set(event_base_, wakeup_event_);
+
+ if (event_add(wakeup_event_, 0))
+ return false;
+ return true;
+}
+
+// static
+void MessagePumpLibevent::OnLibeventNotification(int fd, short flags,
+ void* context) {
+ WeakPtr<FileDescriptorWatcher> controller =
+ static_cast<FileDescriptorWatcher*>(context)->weak_factory_.GetWeakPtr();
+ DCHECK(controller.get());
+
+ MessagePumpLibevent* pump = controller->pump();
+ pump->processed_io_events_ = true;
+
+ if (flags & EV_WRITE) {
+ controller->OnFileCanWriteWithoutBlocking(fd, pump);
+ }
+ // Check |controller| in case it's been deleted in
+ // controller->OnFileCanWriteWithoutBlocking().
+ if (controller.get() && flags & EV_READ) {
+ controller->OnFileCanReadWithoutBlocking(fd, pump);
+ }
+}
+
+// Called if a byte is received on the wakeup pipe.
+// static
+void MessagePumpLibevent::OnWakeup(int socket, short flags, void* context) {
+ MessagePumpLibevent* that = static_cast<MessagePumpLibevent*>(context);
+ DCHECK(that->wakeup_pipe_out_ == socket);
+
+ // Remove and discard the wakeup byte.
+ char buf;
+ int nread = HANDLE_EINTR(read(socket, &buf, 1));
+ DCHECK_EQ(nread, 1);
+ that->processed_io_events_ = true;
+ // Tell libevent to break out of inner loop.
+ event_base_loopbreak(that->event_base_);
+}
+
+} // namespace base