summaryrefslogtreecommitdiffstats
path: root/base/message_pump_libevent.cc
diff options
context:
space:
mode:
authordkegel@google.com <dkegel@google.com@0039d316-1c4b-4281-b951-d872f2087c98>2008-09-18 18:46:26 +0000
committerdkegel@google.com <dkegel@google.com@0039d316-1c4b-4281-b951-d872f2087c98>2008-09-18 18:46:26 +0000
commit36987e9fae95a51a980b4f0d71ca2df1a630407e (patch)
tree8d77b940020eb84b1926b41c8e5cec964299038c /base/message_pump_libevent.cc
parent5a22409d7b625fa1f2a03194ff6c011f82f150dc (diff)
downloadchromium_src-36987e9fae95a51a980b4f0d71ca2df1a630407e.zip
chromium_src-36987e9fae95a51a980b4f0d71ca2df1a630407e.tar.gz
chromium_src-36987e9fae95a51a980b4f0d71ca2df1a630407e.tar.bz2
Use libevent, second try. Changes this time:
- remove bogus include of base/completion_callback.h - add DEPS rules to allow including third_party/libevent Review URL: http://codereview.chromium.org/2964 git-svn-id: svn://svn.chromium.org/chrome/trunk/src@2371 0039d316-1c4b-4281-b951-d872f2087c98
Diffstat (limited to 'base/message_pump_libevent.cc')
-rw-r--r--base/message_pump_libevent.cc179
1 files changed, 179 insertions, 0 deletions
diff --git a/base/message_pump_libevent.cc b/base/message_pump_libevent.cc
new file mode 100644
index 0000000..38e7db6
--- /dev/null
+++ b/base/message_pump_libevent.cc
@@ -0,0 +1,179 @@
+// Copyright (c) 2008 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "base/message_pump_libevent.h"
+
+#include "base/logging.h"
+#include "base/time.h"
+#include "third_party/libevent/event.h"
+
+#include <fcntl.h>
+
+namespace base {
+
+// Return 0 on success
+// Too small a function to bother putting in a library?
+static int SetNonBlocking(int fd)
+{
+ int flags = fcntl(fd, F_GETFL, 0);
+ if (-1 == flags)
+ flags = 0;
+ return fcntl(fd, F_SETFL, flags | O_NONBLOCK);
+}
+
+// Called if a byte is received on the wakeup pipe.
+void MessagePumpLibevent::OnWakeup(int socket, short flags, void* context) {
+
+ base::MessagePumpLibevent* that =
+ static_cast<base::MessagePumpLibevent*>(context);
+ DCHECK(that->wakeup_pipe_out_ == socket);
+
+ // Remove and discard the wakeup byte.
+ char buf;
+ int nread = read(socket, &buf, 1);
+ DCHECK(nread == 1);
+ // Tell libevent to break out of inner loop.
+ event_base_loopbreak(that->event_base_);
+}
+
+MessagePumpLibevent::MessagePumpLibevent()
+ : keep_running_(true),
+ in_run_(false),
+ event_base_(event_base_new()),
+ wakeup_pipe_in_(-1),
+ wakeup_pipe_out_(-1) {
+ if (!Init())
+ NOTREACHED();
+}
+
+bool MessagePumpLibevent::Init() {
+ int fds[2];
+ if (pipe(fds))
+ return false;
+ if (SetNonBlocking(fds[0]))
+ return false;
+ if (SetNonBlocking(fds[1]))
+ return false;
+ wakeup_pipe_out_ = fds[0];
+ wakeup_pipe_in_ = fds[1];
+
+ wakeup_event_ = new event;
+ event_set(wakeup_event_, wakeup_pipe_out_, EV_READ | EV_PERSIST,
+ OnWakeup, this);
+ event_base_set(event_base_, wakeup_event_);
+
+ if (event_add(wakeup_event_, 0))
+ return false;
+ return true;
+}
+
+MessagePumpLibevent::~MessagePumpLibevent() {
+ DCHECK(wakeup_event_);
+ DCHECK(event_base_);
+ event_del(wakeup_event_);
+ delete wakeup_event_;
+ event_base_free(event_base_);
+}
+
+void MessagePumpLibevent::WatchSocket(int socket, short interest_mask,
+ event* e, Watcher* watcher) {
+
+ // Set current interest mask and message pump for this event
+ event_set(e, socket, interest_mask, OnReadinessNotification, watcher);
+
+ // Tell libevent which message pump this socket will belong to when we add it.
+ event_base_set(event_base_, e);
+
+ // Add this socket to the list of monitored sockets.
+ if (event_add(e, NULL))
+ NOTREACHED();
+}
+
+void MessagePumpLibevent::UnwatchSocket(event* e) {
+ // Remove this socket from the list of monitored sockets.
+ if (event_del(e))
+ NOTREACHED();
+}
+
+void MessagePumpLibevent::OnReadinessNotification(int socket, short flags,
+ void* context) {
+ // The given socket is ready for I/O.
+ // Tell the owner what kind of I/O the socket is ready for.
+ Watcher* watcher = static_cast<Watcher*>(context);
+ watcher->OnSocketReady(flags);
+}
+
+// Reentrant!
+void MessagePumpLibevent::Run(Delegate* delegate) {
+ DCHECK(keep_running_) << "Quit must have been called outside of Run!";
+
+ bool old_in_run = in_run_;
+ in_run_ = true;
+
+ for (;;) {
+ bool did_work = delegate->DoWork();
+ if (!keep_running_)
+ break;
+
+ did_work |= delegate->DoDelayedWork(&delayed_work_time_);
+ if (!keep_running_)
+ break;
+
+ if (did_work)
+ continue;
+
+ did_work = delegate->DoIdleWork();
+ if (!keep_running_)
+ break;
+
+ if (did_work)
+ continue;
+
+ // EVLOOP_ONCE tells libevent to only block once,
+ // but to service all pending events when it wakes up.
+ if (delayed_work_time_.is_null()) {
+ event_base_loop(event_base_, EVLOOP_ONCE);
+ } else {
+ TimeDelta delay = delayed_work_time_ - Time::Now();
+ if (delay > TimeDelta()) {
+ struct timeval poll_tv;
+ poll_tv.tv_sec = delay.InSeconds();
+ poll_tv.tv_usec = delay.InMicroseconds() % Time::kMicrosecondsPerSecond;
+ event_base_loopexit(event_base_, &poll_tv);
+ event_base_loop(event_base_, EVLOOP_ONCE);
+ } else {
+ // It looks like delayed_work_time_ indicates a time in the past, so we
+ // need to call DoDelayedWork now.
+ delayed_work_time_ = Time();
+ }
+ }
+ }
+
+ keep_running_ = true;
+ in_run_ = old_in_run;
+}
+
+void MessagePumpLibevent::Quit() {
+ DCHECK(in_run_);
+ // Tell both libevent and Run that they should break out of their loops.
+ keep_running_ = false;
+ ScheduleWork();
+}
+
+void MessagePumpLibevent::ScheduleWork() {
+ // Tell libevent (in a threadsafe way) that it should break out of its loop.
+ char buf = 0;
+ int nwrite = write(wakeup_pipe_in_, &buf, 1);
+ DCHECK(nwrite == 1);
+}
+
+void MessagePumpLibevent::ScheduleDelayedWork(const Time& delayed_work_time) {
+ // We know that we can't be blocked on Wait right now since this method can
+ // only be called on the same thread as Run, so we only need to update our
+ // record of how long to sleep when we do sleep.
+ delayed_work_time_ = delayed_work_time;
+}
+
+} // namespace base
+