diff options
author | jeremy@chromium.org <jeremy@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98> | 2008-12-12 19:09:47 +0000 |
---|---|---|
committer | jeremy@chromium.org <jeremy@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98> | 2008-12-12 19:09:47 +0000 |
commit | eb8c76326fb39a429dc8bda38da22a86bb6802b3 (patch) | |
tree | 2155abfd972894d6ca5bfdf11a8fed37da065d4c /chrome | |
parent | b3d64d5c0d00415968935942b00d37704cd7507c (diff) | |
download | chromium_src-eb8c76326fb39a429dc8bda38da22a86bb6802b3.zip chromium_src-eb8c76326fb39a429dc8bda38da22a86bb6802b3.tar.gz chromium_src-eb8c76326fb39a429dc8bda38da22a86bb6802b3.tar.bz2 |
message_pump_libevent refactor:
* Unify WatchSocket & WatchFileHandle.
* Better encapsulate libevent.
* Fix a bug with blocking writes in ipc_posix.cc
Review URL: http://codereview.chromium.org/13757
git-svn-id: svn://svn.chromium.org/chrome/trunk/src@6911 0039d316-1c4b-4281-b951-d872f2087c98
Diffstat (limited to 'chrome')
-rw-r--r-- | chrome/common/ipc_channel_posix.cc | 109 | ||||
-rw-r--r-- | chrome/common/ipc_channel_posix.h | 29 |
2 files changed, 54 insertions, 84 deletions
diff --git a/chrome/common/ipc_channel_posix.cc b/chrome/common/ipc_channel_posix.cc index 9a30f80..74a2407 100644 --- a/chrome/common/ipc_channel_posix.cc +++ b/chrome/common/ipc_channel_posix.cc @@ -6,10 +6,16 @@ #include <errno.h> #include <fcntl.h> +#include <stddef.h> #include <sys/types.h> #include <sys/socket.h> #include <sys/stat.h> -#include <stddef.h> +#if defined(OS_LINUX) +#include <linux/un.h> +#elif defined(OS_MACOSX) +#include <sys/un.h> +#endif + #include "base/logging.h" #include "base/process_util.h" @@ -18,12 +24,6 @@ #include "chrome/common/chrome_counters.h" #include "chrome/common/ipc_message_utils.h" -#if defined(OS_LINUX) -#include <linux/un.h> -#elif defined(OS_MACOSX) -#include <sys/un.h> -#endif - namespace IPC { //------------------------------------------------------------------------------ @@ -150,9 +150,7 @@ bool ClientConnectToFifo(const std::string &pipe_name, int* client_socket) { Channel::ChannelImpl::ChannelImpl(const std::wstring& channel_id, Mode mode, Listener* listener) : mode_(mode), - server_listen_connection_event_(new EventHolder()), - read_event_(new EventHolder()), - write_event_(new EventHolder()), + is_blocked_on_write_(false), message_send_bytes_written_(0), server_listen_pipe_(-1), pipe_(-1), @@ -214,21 +212,22 @@ bool Channel::ChannelImpl::Connect() { if (server_listen_pipe_ == -1) { return false; } - event *ev = &(server_listen_connection_event_->event); - MessageLoopForIO::current()->WatchFileHandle(server_listen_pipe_, - EV_READ | EV_PERSIST, - ev, - this); - server_listen_connection_event_->is_active = true; + MessageLoopForIO::current()->WatchFileDescriptor( + server_listen_pipe_, + true, + MessageLoopForIO::WATCH_READ, + &server_listen_connection_watcher_, + this); } else { if (pipe_ == -1) { return false; } - MessageLoopForIO::current()->WatchFileHandle(pipe_, - EV_READ | EV_PERSIST, - &(read_event_->event), - this); - read_event_->is_active = true; + MessageLoopForIO::current()->WatchFileDescriptor( + pipe_, + true, + MessageLoopForIO::WATCH_READ, + &read_watcher_, + this); waiting_connect_ = false; } @@ -317,6 +316,7 @@ bool Channel::ChannelImpl::ProcessIncomingMessages() { bool Channel::ChannelImpl::ProcessOutgoingMessages() { DCHECK(!waiting_connect_); // Why are we trying to send messages if there's // no connection? + is_blocked_on_write_ = false; if (output_queue_.empty()) return true; @@ -324,15 +324,6 @@ bool Channel::ChannelImpl::ProcessOutgoingMessages() { if (pipe_ == -1) return false; - // If libevent was monitoring the socket for us (we blocked when trying to - // write a message last time), then delete the underlying libevent structure. - if (write_event_->is_active) { - // TODO(playmobil): This calls event_del(), but we can probably - // do with just calling event_add here. - MessageLoopForIO::current()->UnwatchFileHandle(&(write_event_->event)); - write_event_->is_active = false; - } - // Write out all the messages we can till the write blocks or there are no // more outgoing messages. while (!output_queue_.empty()) { @@ -355,12 +346,13 @@ bool Channel::ChannelImpl::ProcessOutgoingMessages() { message_send_bytes_written_ += bytes_written; // Tell libevent to call us back once things are unblocked. - MessageLoopForIO::current()->WatchFileHandle(server_listen_pipe_, - EV_WRITE, - &(write_event_->event), - this); - write_event_->is_active = true; - + is_blocked_on_write_ = true; + MessageLoopForIO::current()->WatchFileDescriptor( + pipe_, + false, // One shot + MessageLoopForIO::WATCH_WRITE, + &write_watcher_, + this); } else { message_send_bytes_written_ = 0; @@ -391,7 +383,7 @@ bool Channel::ChannelImpl::Send(Message* message) { output_queue_.push(message); if (!waiting_connect_) { - if (!write_event_->is_active) { + if (!is_blocked_on_write_) { if (!ProcessOutgoingMessages()) return false; } @@ -401,7 +393,7 @@ bool Channel::ChannelImpl::Send(Message* message) { } // Called by libevent when we can read from th pipe without blocking. -void Channel::ChannelImpl::OnFileReadReady(int fd) { +void Channel::ChannelImpl::OnFileCanReadWithoutBlocking(int fd) { bool send_server_hello_msg = false; if (waiting_connect_ && mode_ == MODE_SERVER) { if (!ServerAcceptFifoConnection(server_listen_pipe_, &pipe_)) { @@ -410,16 +402,16 @@ void Channel::ChannelImpl::OnFileReadReady(int fd) { // No need to watch the listening socket any longer since only one client // can connect. So unregister with libevent. - event *ev = &(server_listen_connection_event_->event); - MessageLoopForIO::current()->UnwatchFileHandle(ev); - server_listen_connection_event_->is_active = false; + server_listen_connection_watcher_.StopWatchingFileDescriptor(); // Start watching our end of the socket. - MessageLoopForIO::current()->WatchFileHandle(pipe_, - EV_READ | EV_PERSIST, - &(read_event_->event), - this); - read_event_->is_active = true; + MessageLoopForIO::current()->WatchFileDescriptor( + pipe_, + true, + MessageLoopForIO::WATCH_READ, + &read_watcher_, + this); + waiting_connect_ = false; send_server_hello_msg = true; } @@ -436,12 +428,14 @@ void Channel::ChannelImpl::OnFileReadReady(int fd) { // This gives us a chance to kill the client if the incoming handshake // is invalid. if (send_server_hello_msg) { + // This should be our first write so there' sno chance we can block here... + DCHECK(is_blocked_on_write_ == false); ProcessOutgoingMessages(); } } // Called by libevent when we can write to the pipe without blocking. -void Channel::ChannelImpl::OnFileWriteReady(int fd) { +void Channel::ChannelImpl::OnFileCanWriteWithoutBlocking(int fd) { if (!ProcessOutgoingMessages()) { Close(); listener_->OnChannelError(); @@ -453,11 +447,7 @@ void Channel::ChannelImpl::Close() { // idempotent. // Unregister libevent for the listening socket and close it. - if (server_listen_connection_event_ && - server_listen_connection_event_->is_active) { - MessageLoopForIO::current()->UnwatchFileHandle( - &(server_listen_connection_event_->event)); - } + server_listen_connection_watcher_.StopWatchingFileDescriptor(); if (server_listen_pipe_ != -1) { close(server_listen_pipe_); @@ -465,24 +455,13 @@ void Channel::ChannelImpl::Close() { } // Unregister libevent for the FIFO and close it. - if (read_event_ && read_event_->is_active) { - MessageLoopForIO::current()->UnwatchFileHandle(&(read_event_->event)); - } - if (write_event_ && write_event_->is_active) { - MessageLoopForIO::current()->UnwatchFileHandle(&(write_event_->event)); - } + read_watcher_.StopWatchingFileDescriptor(); + write_watcher_.StopWatchingFileDescriptor(); if (pipe_ != -1) { close(pipe_); pipe_ = -1; } - delete server_listen_connection_event_; - server_listen_connection_event_ = NULL; - delete read_event_; - read_event_ = NULL; - delete write_event_; - write_event_ = NULL; - // Unlink the FIFO unlink(pipe_name_.c_str()); diff --git a/chrome/common/ipc_channel_posix.h b/chrome/common/ipc_channel_posix.h index b2849dc..9b34494 100644 --- a/chrome/common/ipc_channel_posix.h +++ b/chrome/common/ipc_channel_posix.h @@ -11,11 +11,10 @@ #include <string> #include "base/message_loop.h" -#include "third_party/libevent/event.h" namespace IPC { -class Channel::ChannelImpl : public MessageLoopForIO::FileWatcher { +class Channel::ChannelImpl : public MessageLoopForIO::Watcher { public: // Mirror methods of Channel, see ipc_channel.h for description. ChannelImpl(const std::wstring& channel_id, Mode mode, Listener* listener); @@ -31,27 +30,19 @@ class Channel::ChannelImpl : public MessageLoopForIO::FileWatcher { bool ProcessIncomingMessages(); bool ProcessOutgoingMessages(); - void OnFileReadReady(int fd); - void OnFileWriteReady(int fd); + void OnFileCanReadWithoutBlocking(int fd); + void OnFileCanWriteWithoutBlocking(int fd); Mode mode_; - // Wrapper for Libevent event. - // TODO(playmobil): MessageLoopForIO needs to better encapsulate libevent. - struct EventHolder { - EventHolder() : is_active(false) {} - ~EventHolder() {} + // After accepting one client connection on our server socket we want to + // stop listening. + MessageLoopForIO::FileDescriptorWatcher server_listen_connection_watcher_; + MessageLoopForIO::FileDescriptorWatcher read_watcher_; + MessageLoopForIO::FileDescriptorWatcher write_watcher_; - bool is_active; - - // libevent's set functions set all the needed members of this struct, so no - // need to initialize before use. - struct event event; - }; - - EventHolder *server_listen_connection_event_; - EventHolder *read_event_; - EventHolder *write_event_; + // Are we currently blocked waiting for a write to complete. + bool is_blocked_on_write_; // If sending a message blocks then we use this variable // to keep track of where we are. |