diff options
author | dkegel@google.com <dkegel@google.com@0039d316-1c4b-4281-b951-d872f2087c98> | 2008-09-18 00:10:38 +0000 |
---|---|---|
committer | dkegel@google.com <dkegel@google.com@0039d316-1c4b-4281-b951-d872f2087c98> | 2008-09-18 00:10:38 +0000 |
commit | dab8619b4848bb10dfe78a3f0c59d6e661ec9610 (patch) | |
tree | 187818b2b685f5545832b45bbd20d01b53aca6d5 /base/message_pump_libevent.cc | |
parent | 0fe41aecdbfd97633bda1a892b2f3efb60f1d3ed (diff) | |
download | chromium_src-dab8619b4848bb10dfe78a3f0c59d6e661ec9610.zip chromium_src-dab8619b4848bb10dfe78a3f0c59d6e661ec9610.tar.gz chromium_src-dab8619b4848bb10dfe78a3f0c59d6e661ec9610.tar.bz2 |
Make tcp_client_socket_unittest pass on Linux.
Requires another changeset that puts libevent in third_party;
I'll upload that next.
This is not the final word; it makes too many syscalls
per read. But it's a start.
Review URL: http://codereview.chromium.org/3202
git-svn-id: svn://svn.chromium.org/chrome/trunk/src@2346 0039d316-1c4b-4281-b951-d872f2087c98
Diffstat (limited to 'base/message_pump_libevent.cc')
-rw-r--r-- | base/message_pump_libevent.cc | 179 |
1 files changed, 179 insertions, 0 deletions
diff --git a/base/message_pump_libevent.cc b/base/message_pump_libevent.cc new file mode 100644 index 0000000..38e7db6 --- /dev/null +++ b/base/message_pump_libevent.cc @@ -0,0 +1,179 @@ +// Copyright (c) 2008 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "base/message_pump_libevent.h" + +#include "base/logging.h" +#include "base/time.h" +#include "third_party/libevent/event.h" + +#include <fcntl.h> + +namespace base { + +// Return 0 on success +// Too small a function to bother putting in a library? +static int SetNonBlocking(int fd) +{ + int flags = fcntl(fd, F_GETFL, 0); + if (-1 == flags) + flags = 0; + return fcntl(fd, F_SETFL, flags | O_NONBLOCK); +} + +// Called if a byte is received on the wakeup pipe. +void MessagePumpLibevent::OnWakeup(int socket, short flags, void* context) { + + base::MessagePumpLibevent* that = + static_cast<base::MessagePumpLibevent*>(context); + DCHECK(that->wakeup_pipe_out_ == socket); + + // Remove and discard the wakeup byte. + char buf; + int nread = read(socket, &buf, 1); + DCHECK(nread == 1); + // Tell libevent to break out of inner loop. + event_base_loopbreak(that->event_base_); +} + +MessagePumpLibevent::MessagePumpLibevent() + : keep_running_(true), + in_run_(false), + event_base_(event_base_new()), + wakeup_pipe_in_(-1), + wakeup_pipe_out_(-1) { + if (!Init()) + NOTREACHED(); +} + +bool MessagePumpLibevent::Init() { + int fds[2]; + if (pipe(fds)) + return false; + if (SetNonBlocking(fds[0])) + return false; + if (SetNonBlocking(fds[1])) + return false; + wakeup_pipe_out_ = fds[0]; + wakeup_pipe_in_ = fds[1]; + + wakeup_event_ = new event; + event_set(wakeup_event_, wakeup_pipe_out_, EV_READ | EV_PERSIST, + OnWakeup, this); + event_base_set(event_base_, wakeup_event_); + + if (event_add(wakeup_event_, 0)) + return false; + return true; +} + +MessagePumpLibevent::~MessagePumpLibevent() { + DCHECK(wakeup_event_); + DCHECK(event_base_); + event_del(wakeup_event_); + delete wakeup_event_; + event_base_free(event_base_); +} + +void MessagePumpLibevent::WatchSocket(int socket, short interest_mask, + event* e, Watcher* watcher) { + + // Set current interest mask and message pump for this event + event_set(e, socket, interest_mask, OnReadinessNotification, watcher); + + // Tell libevent which message pump this socket will belong to when we add it. + event_base_set(event_base_, e); + + // Add this socket to the list of monitored sockets. + if (event_add(e, NULL)) + NOTREACHED(); +} + +void MessagePumpLibevent::UnwatchSocket(event* e) { + // Remove this socket from the list of monitored sockets. + if (event_del(e)) + NOTREACHED(); +} + +void MessagePumpLibevent::OnReadinessNotification(int socket, short flags, + void* context) { + // The given socket is ready for I/O. + // Tell the owner what kind of I/O the socket is ready for. + Watcher* watcher = static_cast<Watcher*>(context); + watcher->OnSocketReady(flags); +} + +// Reentrant! +void MessagePumpLibevent::Run(Delegate* delegate) { + DCHECK(keep_running_) << "Quit must have been called outside of Run!"; + + bool old_in_run = in_run_; + in_run_ = true; + + for (;;) { + bool did_work = delegate->DoWork(); + if (!keep_running_) + break; + + did_work |= delegate->DoDelayedWork(&delayed_work_time_); + if (!keep_running_) + break; + + if (did_work) + continue; + + did_work = delegate->DoIdleWork(); + if (!keep_running_) + break; + + if (did_work) + continue; + + // EVLOOP_ONCE tells libevent to only block once, + // but to service all pending events when it wakes up. + if (delayed_work_time_.is_null()) { + event_base_loop(event_base_, EVLOOP_ONCE); + } else { + TimeDelta delay = delayed_work_time_ - Time::Now(); + if (delay > TimeDelta()) { + struct timeval poll_tv; + poll_tv.tv_sec = delay.InSeconds(); + poll_tv.tv_usec = delay.InMicroseconds() % Time::kMicrosecondsPerSecond; + event_base_loopexit(event_base_, &poll_tv); + event_base_loop(event_base_, EVLOOP_ONCE); + } else { + // It looks like delayed_work_time_ indicates a time in the past, so we + // need to call DoDelayedWork now. + delayed_work_time_ = Time(); + } + } + } + + keep_running_ = true; + in_run_ = old_in_run; +} + +void MessagePumpLibevent::Quit() { + DCHECK(in_run_); + // Tell both libevent and Run that they should break out of their loops. + keep_running_ = false; + ScheduleWork(); +} + +void MessagePumpLibevent::ScheduleWork() { + // Tell libevent (in a threadsafe way) that it should break out of its loop. + char buf = 0; + int nwrite = write(wakeup_pipe_in_, &buf, 1); + DCHECK(nwrite == 1); +} + +void MessagePumpLibevent::ScheduleDelayedWork(const Time& delayed_work_time) { + // We know that we can't be blocked on Wait right now since this method can + // only be called on the same thread as Run, so we only need to update our + // record of how long to sleep when we do sleep. + delayed_work_time_ = delayed_work_time; +} + +} // namespace base + |