summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--base/message_loop.cc22
-rw-r--r--base/message_loop.h3
-rw-r--r--base/message_pump_libevent.cc43
-rw-r--r--base/message_pump_libevent.h30
4 files changed, 84 insertions, 14 deletions
diff --git a/base/message_loop.cc b/base/message_loop.cc
index 1145439..898fbfa 100644
--- a/base/message_loop.cc
+++ b/base/message_loop.cc
@@ -193,7 +193,7 @@ void MessageLoop::RunInternal() {
return;
}
#endif
-
+
pump_->Run(this);
}
@@ -206,10 +206,10 @@ bool MessageLoop::ProcessNextDelayedNonNestableTask() {
if (deferred_non_nestable_work_queue_.empty())
return false;
-
+
Task* task = deferred_non_nestable_work_queue_.front().task;
deferred_non_nestable_work_queue_.pop();
-
+
RunTask(task);
return true;
}
@@ -420,7 +420,7 @@ bool MessageLoop::DoDelayedWork(Time* next_delayed_work_time) {
*next_delayed_work_time = Time();
return false;
}
-
+
if (delayed_work_queue_.top().delayed_run_time > Time::Now()) {
*next_delayed_work_time = delayed_work_queue_.top().delayed_run_time;
return false;
@@ -428,7 +428,7 @@ bool MessageLoop::DoDelayedWork(Time* next_delayed_work_time) {
PendingTask pending_task = delayed_work_queue_.top();
delayed_work_queue_.pop();
-
+
if (!delayed_work_queue_.empty())
*next_delayed_work_time = delayed_work_queue_.top().delayed_run_time;
@@ -593,12 +593,22 @@ bool MessageLoopForIO::WaitForIOCompletion(DWORD timeout, IOHandler* filter) {
#elif defined(OS_POSIX)
-void MessageLoopForIO::WatchSocket(int socket, short interest_mask,
+void MessageLoopForIO::WatchSocket(int socket, short interest_mask,
struct event* e, Watcher* watcher) {
pump_libevent()->WatchSocket(socket, interest_mask, e, watcher);
}
+void MessageLoopForIO::WatchFileHandle(int fd, short interest_mask,
+ struct event* e, FileWatcher* watcher) {
+ pump_libevent()->WatchFileHandle(fd, interest_mask, e, watcher);
+}
+
+
void MessageLoopForIO::UnwatchSocket(struct event* e) {
pump_libevent()->UnwatchSocket(e);
}
+
+void MessageLoopForIO::UnwatchFileHandle(struct event* e) {
+ pump_libevent()->UnwatchFileHandle(e);
+}
#endif
diff --git a/base/message_loop.h b/base/message_loop.h
index 8f16d8e..69db8c1 100644
--- a/base/message_loop.h
+++ b/base/message_loop.h
@@ -473,11 +473,14 @@ class MessageLoopForIO : public MessageLoop {
#elif defined(OS_POSIX)
typedef base::MessagePumpLibevent::Watcher Watcher;
+ typedef base::MessagePumpLibevent::FileWatcher FileWatcher;
// Please see MessagePumpLibevent for definitions of these methods.
void WatchSocket(int socket, short interest_mask,
struct event* e, Watcher* watcher);
+ void WatchFileHandle(int fd, short interest_mask, event* e, FileWatcher*);
void UnwatchSocket(struct event* e);
+ void UnwatchFileHandle(event* e);
#endif // defined(OS_POSIX)
};
diff --git a/base/message_pump_libevent.cc b/base/message_pump_libevent.cc
index 0dd533e..a281980 100644
--- a/base/message_pump_libevent.cc
+++ b/base/message_pump_libevent.cc
@@ -26,7 +26,7 @@ static int SetNonBlocking(int fd)
// Called if a byte is received on the wakeup pipe.
void MessagePumpLibevent::OnWakeup(int socket, short flags, void* context) {
- base::MessagePumpLibevent* that =
+ base::MessagePumpLibevent* that =
static_cast<base::MessagePumpLibevent*>(context);
DCHECK(that->wakeup_pipe_out_ == socket);
@@ -60,7 +60,7 @@ bool MessagePumpLibevent::Init() {
wakeup_pipe_in_ = fds[1];
wakeup_event_ = new event;
- event_set(wakeup_event_, wakeup_pipe_out_, EV_READ | EV_PERSIST,
+ event_set(wakeup_event_, wakeup_pipe_out_, EV_READ | EV_PERSIST,
OnWakeup, this);
event_base_set(event_base_, wakeup_event_);
@@ -77,7 +77,7 @@ MessagePumpLibevent::~MessagePumpLibevent() {
event_base_free(event_base_);
}
-void MessagePumpLibevent::WatchSocket(int socket, short interest_mask,
+void MessagePumpLibevent::WatchSocket(int socket, short interest_mask,
event* e, Watcher* watcher) {
// Set current interest mask and message pump for this event
@@ -91,13 +91,36 @@ void MessagePumpLibevent::WatchSocket(int socket, short interest_mask,
NOTREACHED();
}
+void MessagePumpLibevent::WatchFileHandle(int fd, short interest_mask,
+ event* e, FileWatcher* watcher) {
+ // Set current interest mask and message pump for this event
+ if ((interest_mask & EV_READ) != 0) {
+ event_set(e, fd, interest_mask, OnFileReadReadinessNotification, watcher);
+ } else {
+ event_set(e, fd, interest_mask, OnFileWriteReadinessNotification, watcher);
+ }
+
+ // Tell libevent which message pump this fd will belong to when we add it.
+ event_base_set(event_base_, e);
+
+ // Add this fd to the list of monitored sockets.
+ if (event_add(e, NULL))
+ NOTREACHED();
+}
+
void MessagePumpLibevent::UnwatchSocket(event* e) {
// Remove this socket from the list of monitored sockets.
if (event_del(e))
NOTREACHED();
}
-void MessagePumpLibevent::OnReadinessNotification(int socket, short flags,
+void MessagePumpLibevent::UnwatchFileHandle(event* e) {
+ // Remove this fd from the list of monitored fds.
+ if (event_del(e))
+ NOTREACHED();
+}
+
+void MessagePumpLibevent::OnReadinessNotification(int socket, short flags,
void* context) {
// The given socket is ready for I/O.
// Tell the owner what kind of I/O the socket is ready for.
@@ -105,6 +128,18 @@ void MessagePumpLibevent::OnReadinessNotification(int socket, short flags,
watcher->OnSocketReady(flags);
}
+void MessagePumpLibevent::OnFileReadReadinessNotification(int fd, short flags,
+ void* context) {
+ FileWatcher* watcher = static_cast<FileWatcher*>(context);
+ watcher->OnFileReadReady(fd);
+}
+
+void MessagePumpLibevent::OnFileWriteReadinessNotification(int fd, short flags,
+ void* context) {
+ FileWatcher* watcher = static_cast<FileWatcher*>(context);
+ watcher->OnFileWriteReady(fd);
+}
+
// Reentrant!
void MessagePumpLibevent::Run(Delegate* delegate) {
DCHECK(keep_running_) << "Quit must have been called outside of Run!";
diff --git a/base/message_pump_libevent.h b/base/message_pump_libevent.h
index 9934f31..b54bc36 100644
--- a/base/message_pump_libevent.h
+++ b/base/message_pump_libevent.h
@@ -18,7 +18,7 @@ namespace base {
// TODO(dkegel): add support for background file IO somehow
class MessagePumpLibevent : public MessagePump {
public:
- // Used with WatchObject to asynchronously monitor the I/O readiness of a
+ // Used with WatchSocket to asynchronously monitor the I/O readiness of a
// socket.
class Watcher {
public:
@@ -27,22 +27,38 @@ class MessagePumpLibevent : public MessagePump {
virtual void OnSocketReady(short eventmask) = 0;
};
+ // Used with WatchFileHandle to monitor I/O readiness for a File Handle.
+ class FileWatcher {
+ public:
+ virtual ~FileWatcher() {}
+ // Called from MessageLoop::Run when a non-blocking read/write can be made.
+ virtual void OnFileReadReady(int fd) = 0;
+ virtual void OnFileWriteReady(int fd) = 0;
+ };
+
MessagePumpLibevent();
virtual ~MessagePumpLibevent();
// Have the current thread's message loop watch for a ready socket.
// Caller must provide a struct event for this socket for libevent's use.
// The event and interest_mask fields are defined in libevent.
- // Returns true on success.
+ // Returns true on success.
// TODO(dkegel): hide libevent better; abstraction still too leaky
// TODO(dkegel): better error handing
// TODO(dkegel): switch to edge-triggered readiness notification
void WatchSocket(int socket, short interest_mask, event* e, Watcher*);
+ // TODO(playmobil): Merge this with WatchSocket().
+ void WatchFileHandle(int fd, short interest_mask, event* e, FileWatcher*);
+
// Stop watching a socket.
// Event was previously initialized by WatchSocket.
void UnwatchSocket(event* e);
+ // Stop watching a File Handle.
+ // Event was previously initialized by WatchFileHandle.
+ void UnwatchFileHandle(event* e);
+
// MessagePump methods:
virtual void Run(Delegate* delegate);
virtual void Quit();
@@ -63,17 +79,23 @@ class MessagePumpLibevent : public MessagePump {
// The time at which we should call DoDelayedWork.
Time delayed_work_time_;
- // Libevent dispatcher. Watches all sockets registered with it, and sends
+ // Libevent dispatcher. Watches all sockets registered with it, and sends
// readiness callbacks when a socket is ready for I/O.
event_base* event_base_;
// Called by libevent to tell us a registered socket is ready
static void OnReadinessNotification(int socket, short flags, void* context);
+ // Called by libevent to tell us a registered fd is ready.
+ static void OnFileReadReadinessNotification(int fd, short flags,
+ void* context);
+ static void OnFileWriteReadinessNotification(int fd, short flags,
+ void* context);
+
// Unix pipe used to implement ScheduleWork()
// ... callback; called by libevent inside Run() when pipe is ready to read
static void OnWakeup(int socket, short flags, void* context);
- // ... write end; ScheduleWork() writes a single byte to it
+ // ... write end; ScheduleWork() writes a single byte to it
int wakeup_pipe_in_;
// ... read end; OnWakeup reads it and then breaks Run() out of its sleep
int wakeup_pipe_out_;