summaryrefslogtreecommitdiffstats
path: root/chrome
diff options
context:
space:
mode:
authorjeremy@chromium.org <jeremy@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98>2008-12-12 19:09:47 +0000
committerjeremy@chromium.org <jeremy@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98>2008-12-12 19:09:47 +0000
commiteb8c76326fb39a429dc8bda38da22a86bb6802b3 (patch)
tree2155abfd972894d6ca5bfdf11a8fed37da065d4c /chrome
parentb3d64d5c0d00415968935942b00d37704cd7507c (diff)
downloadchromium_src-eb8c76326fb39a429dc8bda38da22a86bb6802b3.zip
chromium_src-eb8c76326fb39a429dc8bda38da22a86bb6802b3.tar.gz
chromium_src-eb8c76326fb39a429dc8bda38da22a86bb6802b3.tar.bz2
message_pump_libevent refactor:
* Unify WatchSocket & WatchFileHandle. * Better encapsulate libevent. * Fix a bug with blocking writes in ipc_posix.cc Review URL: http://codereview.chromium.org/13757 git-svn-id: svn://svn.chromium.org/chrome/trunk/src@6911 0039d316-1c4b-4281-b951-d872f2087c98
Diffstat (limited to 'chrome')
-rw-r--r--chrome/common/ipc_channel_posix.cc109
-rw-r--r--chrome/common/ipc_channel_posix.h29
2 files changed, 54 insertions, 84 deletions
diff --git a/chrome/common/ipc_channel_posix.cc b/chrome/common/ipc_channel_posix.cc
index 9a30f80..74a2407 100644
--- a/chrome/common/ipc_channel_posix.cc
+++ b/chrome/common/ipc_channel_posix.cc
@@ -6,10 +6,16 @@
#include <errno.h>
#include <fcntl.h>
+#include <stddef.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <sys/stat.h>
-#include <stddef.h>
+#if defined(OS_LINUX)
+#include <linux/un.h>
+#elif defined(OS_MACOSX)
+#include <sys/un.h>
+#endif
+
#include "base/logging.h"
#include "base/process_util.h"
@@ -18,12 +24,6 @@
#include "chrome/common/chrome_counters.h"
#include "chrome/common/ipc_message_utils.h"
-#if defined(OS_LINUX)
-#include <linux/un.h>
-#elif defined(OS_MACOSX)
-#include <sys/un.h>
-#endif
-
namespace IPC {
//------------------------------------------------------------------------------
@@ -150,9 +150,7 @@ bool ClientConnectToFifo(const std::string &pipe_name, int* client_socket) {
Channel::ChannelImpl::ChannelImpl(const std::wstring& channel_id, Mode mode,
Listener* listener)
: mode_(mode),
- server_listen_connection_event_(new EventHolder()),
- read_event_(new EventHolder()),
- write_event_(new EventHolder()),
+ is_blocked_on_write_(false),
message_send_bytes_written_(0),
server_listen_pipe_(-1),
pipe_(-1),
@@ -214,21 +212,22 @@ bool Channel::ChannelImpl::Connect() {
if (server_listen_pipe_ == -1) {
return false;
}
- event *ev = &(server_listen_connection_event_->event);
- MessageLoopForIO::current()->WatchFileHandle(server_listen_pipe_,
- EV_READ | EV_PERSIST,
- ev,
- this);
- server_listen_connection_event_->is_active = true;
+ MessageLoopForIO::current()->WatchFileDescriptor(
+ server_listen_pipe_,
+ true,
+ MessageLoopForIO::WATCH_READ,
+ &server_listen_connection_watcher_,
+ this);
} else {
if (pipe_ == -1) {
return false;
}
- MessageLoopForIO::current()->WatchFileHandle(pipe_,
- EV_READ | EV_PERSIST,
- &(read_event_->event),
- this);
- read_event_->is_active = true;
+ MessageLoopForIO::current()->WatchFileDescriptor(
+ pipe_,
+ true,
+ MessageLoopForIO::WATCH_READ,
+ &read_watcher_,
+ this);
waiting_connect_ = false;
}
@@ -317,6 +316,7 @@ bool Channel::ChannelImpl::ProcessIncomingMessages() {
bool Channel::ChannelImpl::ProcessOutgoingMessages() {
DCHECK(!waiting_connect_); // Why are we trying to send messages if there's
// no connection?
+ is_blocked_on_write_ = false;
if (output_queue_.empty())
return true;
@@ -324,15 +324,6 @@ bool Channel::ChannelImpl::ProcessOutgoingMessages() {
if (pipe_ == -1)
return false;
- // If libevent was monitoring the socket for us (we blocked when trying to
- // write a message last time), then delete the underlying libevent structure.
- if (write_event_->is_active) {
- // TODO(playmobil): This calls event_del(), but we can probably
- // do with just calling event_add here.
- MessageLoopForIO::current()->UnwatchFileHandle(&(write_event_->event));
- write_event_->is_active = false;
- }
-
// Write out all the messages we can till the write blocks or there are no
// more outgoing messages.
while (!output_queue_.empty()) {
@@ -355,12 +346,13 @@ bool Channel::ChannelImpl::ProcessOutgoingMessages() {
message_send_bytes_written_ += bytes_written;
// Tell libevent to call us back once things are unblocked.
- MessageLoopForIO::current()->WatchFileHandle(server_listen_pipe_,
- EV_WRITE,
- &(write_event_->event),
- this);
- write_event_->is_active = true;
-
+ is_blocked_on_write_ = true;
+ MessageLoopForIO::current()->WatchFileDescriptor(
+ pipe_,
+ false, // One shot
+ MessageLoopForIO::WATCH_WRITE,
+ &write_watcher_,
+ this);
} else {
message_send_bytes_written_ = 0;
@@ -391,7 +383,7 @@ bool Channel::ChannelImpl::Send(Message* message) {
output_queue_.push(message);
if (!waiting_connect_) {
- if (!write_event_->is_active) {
+ if (!is_blocked_on_write_) {
if (!ProcessOutgoingMessages())
return false;
}
@@ -401,7 +393,7 @@ bool Channel::ChannelImpl::Send(Message* message) {
}
// Called by libevent when we can read from th pipe without blocking.
-void Channel::ChannelImpl::OnFileReadReady(int fd) {
+void Channel::ChannelImpl::OnFileCanReadWithoutBlocking(int fd) {
bool send_server_hello_msg = false;
if (waiting_connect_ && mode_ == MODE_SERVER) {
if (!ServerAcceptFifoConnection(server_listen_pipe_, &pipe_)) {
@@ -410,16 +402,16 @@ void Channel::ChannelImpl::OnFileReadReady(int fd) {
// No need to watch the listening socket any longer since only one client
// can connect. So unregister with libevent.
- event *ev = &(server_listen_connection_event_->event);
- MessageLoopForIO::current()->UnwatchFileHandle(ev);
- server_listen_connection_event_->is_active = false;
+ server_listen_connection_watcher_.StopWatchingFileDescriptor();
// Start watching our end of the socket.
- MessageLoopForIO::current()->WatchFileHandle(pipe_,
- EV_READ | EV_PERSIST,
- &(read_event_->event),
- this);
- read_event_->is_active = true;
+ MessageLoopForIO::current()->WatchFileDescriptor(
+ pipe_,
+ true,
+ MessageLoopForIO::WATCH_READ,
+ &read_watcher_,
+ this);
+
waiting_connect_ = false;
send_server_hello_msg = true;
}
@@ -436,12 +428,14 @@ void Channel::ChannelImpl::OnFileReadReady(int fd) {
// This gives us a chance to kill the client if the incoming handshake
// is invalid.
if (send_server_hello_msg) {
+ // This should be our first write so there' sno chance we can block here...
+ DCHECK(is_blocked_on_write_ == false);
ProcessOutgoingMessages();
}
}
// Called by libevent when we can write to the pipe without blocking.
-void Channel::ChannelImpl::OnFileWriteReady(int fd) {
+void Channel::ChannelImpl::OnFileCanWriteWithoutBlocking(int fd) {
if (!ProcessOutgoingMessages()) {
Close();
listener_->OnChannelError();
@@ -453,11 +447,7 @@ void Channel::ChannelImpl::Close() {
// idempotent.
// Unregister libevent for the listening socket and close it.
- if (server_listen_connection_event_ &&
- server_listen_connection_event_->is_active) {
- MessageLoopForIO::current()->UnwatchFileHandle(
- &(server_listen_connection_event_->event));
- }
+ server_listen_connection_watcher_.StopWatchingFileDescriptor();
if (server_listen_pipe_ != -1) {
close(server_listen_pipe_);
@@ -465,24 +455,13 @@ void Channel::ChannelImpl::Close() {
}
// Unregister libevent for the FIFO and close it.
- if (read_event_ && read_event_->is_active) {
- MessageLoopForIO::current()->UnwatchFileHandle(&(read_event_->event));
- }
- if (write_event_ && write_event_->is_active) {
- MessageLoopForIO::current()->UnwatchFileHandle(&(write_event_->event));
- }
+ read_watcher_.StopWatchingFileDescriptor();
+ write_watcher_.StopWatchingFileDescriptor();
if (pipe_ != -1) {
close(pipe_);
pipe_ = -1;
}
- delete server_listen_connection_event_;
- server_listen_connection_event_ = NULL;
- delete read_event_;
- read_event_ = NULL;
- delete write_event_;
- write_event_ = NULL;
-
// Unlink the FIFO
unlink(pipe_name_.c_str());
diff --git a/chrome/common/ipc_channel_posix.h b/chrome/common/ipc_channel_posix.h
index b2849dc..9b34494 100644
--- a/chrome/common/ipc_channel_posix.h
+++ b/chrome/common/ipc_channel_posix.h
@@ -11,11 +11,10 @@
#include <string>
#include "base/message_loop.h"
-#include "third_party/libevent/event.h"
namespace IPC {
-class Channel::ChannelImpl : public MessageLoopForIO::FileWatcher {
+class Channel::ChannelImpl : public MessageLoopForIO::Watcher {
public:
// Mirror methods of Channel, see ipc_channel.h for description.
ChannelImpl(const std::wstring& channel_id, Mode mode, Listener* listener);
@@ -31,27 +30,19 @@ class Channel::ChannelImpl : public MessageLoopForIO::FileWatcher {
bool ProcessIncomingMessages();
bool ProcessOutgoingMessages();
- void OnFileReadReady(int fd);
- void OnFileWriteReady(int fd);
+ void OnFileCanReadWithoutBlocking(int fd);
+ void OnFileCanWriteWithoutBlocking(int fd);
Mode mode_;
- // Wrapper for Libevent event.
- // TODO(playmobil): MessageLoopForIO needs to better encapsulate libevent.
- struct EventHolder {
- EventHolder() : is_active(false) {}
- ~EventHolder() {}
+ // After accepting one client connection on our server socket we want to
+ // stop listening.
+ MessageLoopForIO::FileDescriptorWatcher server_listen_connection_watcher_;
+ MessageLoopForIO::FileDescriptorWatcher read_watcher_;
+ MessageLoopForIO::FileDescriptorWatcher write_watcher_;
- bool is_active;
-
- // libevent's set functions set all the needed members of this struct, so no
- // need to initialize before use.
- struct event event;
- };
-
- EventHolder *server_listen_connection_event_;
- EventHolder *read_event_;
- EventHolder *write_event_;
+ // Are we currently blocked waiting for a write to complete.
+ bool is_blocked_on_write_;
// If sending a message blocks then we use this variable
// to keep track of where we are.