From 514411fcb4657f575eeb3e39a9056478a28306a0 Mon Sep 17 00:00:00 2001 From: "jeremy@chromium.org" Date: Wed, 10 Dec 2008 22:28:11 +0000 Subject: Refactor IPC::Channel to have a common header. Review URL: http://codereview.chromium.org/11024 git-svn-id: svn://svn.chromium.org/chrome/trunk/src@6743 0039d316-1c4b-4281-b951-d872f2087c98 --- chrome/chrome.xcodeproj/project.pbxproj | 4 +- chrome/common/common.scons | 2 +- chrome/common/common.vcproj | 2 +- chrome/common/ipc_channel.cc | 395 ----------------------------- chrome/common/ipc_channel.h | 111 +-------- chrome/common/ipc_channel_posix.cc | 71 +++--- chrome/common/ipc_channel_posix.h | 93 +++++++ chrome/common/ipc_channel_proxy.h | 2 + chrome/common/ipc_channel_win.cc | 427 ++++++++++++++++++++++++++++++++ chrome/common/ipc_channel_win.h | 81 ++++++ chrome/common/ipc_fuzzing_tests.cc | 4 +- chrome/common/resource_dispatcher.cc | 1 + chrome/common/resource_dispatcher.h | 2 + 13 files changed, 667 insertions(+), 528 deletions(-) delete mode 100644 chrome/common/ipc_channel.cc create mode 100644 chrome/common/ipc_channel_posix.h create mode 100644 chrome/common/ipc_channel_win.cc create mode 100644 chrome/common/ipc_channel_win.h (limited to 'chrome') diff --git a/chrome/chrome.xcodeproj/project.pbxproj b/chrome/chrome.xcodeproj/project.pbxproj index 230c961..d2e7225 100644 --- a/chrome/chrome.xcodeproj/project.pbxproj +++ b/chrome/chrome.xcodeproj/project.pbxproj @@ -1272,7 +1272,6 @@ 4D7BFBA60E9D4C9F009A6919 /* env_vars.h */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.c.h; path = env_vars.h; sourceTree = ""; }; 4D7BFBA70E9D4C9F009A6919 /* filter_policy.h */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.c.h; path = filter_policy.h; sourceTree = ""; }; 4D7BFBA80E9D4C9F009A6919 /* gears_api.h */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.c.h; path = gears_api.h; sourceTree = ""; }; - 4D7BFBA90E9D4C9F009A6919 /* ipc_channel.cc */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.cpp.cpp; path = ipc_channel.cc; sourceTree = ""; }; 4D7BFBAA0E9D4C9F009A6919 /* ipc_channel.h */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.c.h; path = ipc_channel.h; sourceTree = ""; }; 4D7BFBAB0E9D4C9F009A6919 /* ipc_channel_proxy.cc */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.cpp.cpp; path = ipc_channel_proxy.cc; sourceTree = ""; }; 4D7BFBAC0E9D4C9F009A6919 /* ipc_channel_proxy.h */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.c.h; path = ipc_channel_proxy.h; sourceTree = ""; }; @@ -1423,6 +1422,7 @@ B54BD8FB0ED622C00093FD54 /* mach_message_source_mac.h */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.c.h; path = mach_message_source_mac.h; sourceTree = ""; }; B562C8410ED49C830077A23F /* mach_ipc_mac.h */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.c.h; path = mach_ipc_mac.h; sourceTree = ""; }; B562C8420ED49C830077A23F /* mach_ipc_mac.mm */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.cpp.objcpp; path = mach_ipc_mac.mm; sourceTree = ""; }; + B5D7CD350EF0702F00EE645F /* ipc_channel_posix.h */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.c.h; path = ipc_channel_posix.h; sourceTree = ""; }; B5FDBFAE0EE4623000BEC6E6 /* ipc_tests */ = {isa = PBXFileReference; explicitFileType = "compiled.mach-o.executable"; includeInIndex = 0; path = ipc_tests; sourceTree = BUILT_PRODUCTS_DIR; }; B5FDC0570EE488E500BEC6E6 /* ipc_channel_posix.cc */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.cpp.cpp; path = ipc_channel_posix.cc; sourceTree = ""; }; E45060E40EE87B86003BE099 /* Chromium.app */ = {isa = PBXFileReference; explicitFileType = wrapper.application; includeInIndex = 0; path = Chromium.app; sourceTree = BUILT_PRODUCTS_DIR; }; @@ -2294,9 +2294,9 @@ 4D7BFBA60E9D4C9F009A6919 /* env_vars.h */, 4D7BFBA70E9D4C9F009A6919 /* filter_policy.h */, 4D7BFBA80E9D4C9F009A6919 /* gears_api.h */, - 4D7BFBA90E9D4C9F009A6919 /* ipc_channel.cc */, 4D7BFBAA0E9D4C9F009A6919 /* ipc_channel.h */, B5FDC0570EE488E500BEC6E6 /* ipc_channel_posix.cc */, + B5D7CD350EF0702F00EE645F /* ipc_channel_posix.h */, 4D7BFBAB0E9D4C9F009A6919 /* ipc_channel_proxy.cc */, 4D7BFBAC0E9D4C9F009A6919 /* ipc_channel_proxy.h */, 4D7BFBAD0E9D4C9F009A6919 /* ipc_fuzzing_tests.cc */, diff --git a/chrome/common/common.scons b/chrome/common/common.scons index 0116edf..5f4d8fa 100644 --- a/chrome/common/common.scons +++ b/chrome/common/common.scons @@ -90,7 +90,7 @@ if env['PLATFORM'] == 'win32': 'gfx/icon_util.cc', 'gfx/path.cc', 'gfx/text_elider.cc', - 'ipc_channel.cc', + 'ipc_channel_win.cc', 'ipc_channel_proxy.cc', 'ipc_logging.cc', 'ipc_message_utils.cc', diff --git a/chrome/common/common.vcproj b/chrome/common/common.vcproj index 7293656..e1331e0 100644 --- a/chrome/common/common.vcproj +++ b/chrome/common/common.vcproj @@ -221,7 +221,7 @@ Name="ipc" > -#include - -#include "base/compiler_specific.h" -#include "base/logging.h" -#include "base/win_util.h" -#include "chrome/common/chrome_counters.h" -#include "chrome/common/ipc_logging.h" -#include "chrome/common/ipc_message_utils.h" - -using namespace std; - -namespace IPC { - -//------------------------------------------------------------------------------ - -Channel::State::State(Channel* channel) : is_pending(false) { - memset(&context.overlapped, 0, sizeof(context.overlapped)); - context.handler = channel; -} - -Channel::State::~State() { - COMPILE_ASSERT(!offsetof(Channel::State, context), starts_with_io_context); -} - -//------------------------------------------------------------------------------ - -Channel::Channel(const wstring& channel_id, Mode mode, Listener* listener) - : ALLOW_THIS_IN_INITIALIZER_LIST(input_state_(this)), - ALLOW_THIS_IN_INITIALIZER_LIST(output_state_(this)), - pipe_(INVALID_HANDLE_VALUE), - listener_(listener), - waiting_connect_(mode == MODE_SERVER), - processing_incoming_(false), - ALLOW_THIS_IN_INITIALIZER_LIST(factory_(this)) { - if (!CreatePipe(channel_id, mode)) { - // The pipe may have been closed already. - LOG(WARNING) << "Unable to create pipe named \"" << channel_id << - "\" in " << (mode == 0 ? "server" : "client") << " mode."; - } -} - -void Channel::Close() { - bool waited = false; - if (input_state_.is_pending || output_state_.is_pending) { - CancelIo(pipe_); - waited = true; - } - - // Closing the handle at this point prevents us from issuing more requests - // form OnIOCompleted(). - if (pipe_ != INVALID_HANDLE_VALUE) { - CloseHandle(pipe_); - pipe_ = INVALID_HANDLE_VALUE; - } - - // Make sure all IO has completed. - base::Time start = base::Time::Now(); - while (input_state_.is_pending || output_state_.is_pending) { - MessageLoopForIO::current()->WaitForIOCompletion(INFINITE, this); - } - if (waited) { - // We want to see if we block the message loop for too long. - UMA_HISTOGRAM_TIMES(L"AsyncIO.IPCChannelClose", base::Time::Now() - start); - } - - while (!output_queue_.empty()) { - Message* m = output_queue_.front(); - output_queue_.pop(); - delete m; - } -} - -bool Channel::Send(Message* message) { - chrome::Counters::ipc_send_counter().Increment(); -#ifdef IPC_MESSAGE_DEBUG_EXTRA - DLOG(INFO) << "sending message @" << message << " on channel @" << this - << " with type " << message->type() - << " (" << output_queue_.size() << " in queue)"; -#endif - -#ifdef IPC_MESSAGE_LOG_ENABLED - Logging::current()->OnSendMessage(message, L""); -#endif - - output_queue_.push(message); - // ensure waiting to write - if (!waiting_connect_) { - if (!output_state_.is_pending) { - if (!ProcessOutgoingMessages(NULL, 0)) - return false; - } - } - - return true; -} - -const wstring Channel::PipeName(const wstring& channel_id) const { - wostringstream ss; - // XXX(darin): get application name from somewhere else - ss << L"\\\\.\\pipe\\chrome." << channel_id; - return ss.str(); -} - -bool Channel::CreatePipe(const wstring& channel_id, Mode mode) { - DCHECK(pipe_ == INVALID_HANDLE_VALUE); - const wstring pipe_name = PipeName(channel_id); - if (mode == MODE_SERVER) { - SECURITY_ATTRIBUTES security_attributes = {0}; - security_attributes.bInheritHandle = FALSE; - security_attributes.nLength = sizeof(SECURITY_ATTRIBUTES); - if (!win_util::GetLogonSessionOnlyDACL( - reinterpret_cast( - &security_attributes.lpSecurityDescriptor))) { - NOTREACHED(); - } - - pipe_ = CreateNamedPipeW(pipe_name.c_str(), - PIPE_ACCESS_DUPLEX | FILE_FLAG_OVERLAPPED | - FILE_FLAG_FIRST_PIPE_INSTANCE, - PIPE_TYPE_BYTE | PIPE_READMODE_BYTE, - 1, // number of pipe instances - BUF_SIZE, // output buffer size (XXX tune) - BUF_SIZE, // input buffer size (XXX tune) - 5000, // timeout in milliseconds (XXX tune) - &security_attributes); - LocalFree(security_attributes.lpSecurityDescriptor); - } else { - pipe_ = CreateFileW(pipe_name.c_str(), - GENERIC_READ | GENERIC_WRITE, - 0, - NULL, - OPEN_EXISTING, - SECURITY_SQOS_PRESENT | SECURITY_IDENTIFICATION | - FILE_FLAG_OVERLAPPED, - NULL); - } - if (pipe_ == INVALID_HANDLE_VALUE) { - // If this process is being closed, the pipe may be gone already. - LOG(WARNING) << "failed to create pipe: " << GetLastError(); - return false; - } - - // Create the Hello message to be sent when Connect is called - scoped_ptr m(new Message(MSG_ROUTING_NONE, - HELLO_MESSAGE_TYPE, - IPC::Message::PRIORITY_NORMAL)); - if (!m->WriteInt(GetCurrentProcessId())) { - CloseHandle(pipe_); - pipe_ = INVALID_HANDLE_VALUE; - return false; - } - - output_queue_.push(m.release()); - return true; -} - -bool Channel::Connect() { - DLOG(WARNING) << "Connect called twice"; - - if (pipe_ == INVALID_HANDLE_VALUE) - return false; - - MessageLoopForIO::current()->RegisterIOHandler(pipe_, this); - - // Check to see if there is a client connected to our pipe... - if (waiting_connect_) - ProcessConnection(); - - if (!input_state_.is_pending) { - // Complete setup asynchronously. By not setting input_state_.is_pending - // to true, we indicate to OnIOCompleted that this is the special - // initialization signal. - MessageLoopForIO::current()->PostTask(FROM_HERE, factory_.NewRunnableMethod( - &Channel::OnIOCompleted, &input_state_.context, 0, 0)); - } - - if (!waiting_connect_) - ProcessOutgoingMessages(NULL, 0); - return true; -} - -bool Channel::ProcessConnection() { - if (input_state_.is_pending) - input_state_.is_pending = false; - - // Do we have a client connected to our pipe? - if (INVALID_HANDLE_VALUE == pipe_) - return false; - - BOOL ok = ConnectNamedPipe(pipe_, &input_state_.context.overlapped); - - DWORD err = GetLastError(); - if (ok) { - // Uhm, the API documentation says that this function should never - // return success when used in overlapped mode. - NOTREACHED(); - return false; - } - - switch (err) { - case ERROR_IO_PENDING: - input_state_.is_pending = true; - break; - case ERROR_PIPE_CONNECTED: - waiting_connect_ = false; - break; - default: - NOTREACHED(); - return false; - } - - return true; -} - -bool Channel::ProcessIncomingMessages(MessageLoopForIO::IOContext* context, - DWORD bytes_read) { - if (input_state_.is_pending) { - input_state_.is_pending = false; - DCHECK(context); - - if (!context || !bytes_read) - return false; - } else { - // This happens at channel initialization. - DCHECK(!bytes_read && context == &input_state_.context); - } - - for (;;) { - if (bytes_read == 0) { - if (INVALID_HANDLE_VALUE == pipe_) - return false; - - // Read from pipe... - BOOL ok = ReadFile(pipe_, - input_buf_, - BUF_SIZE, - &bytes_read, - &input_state_.context.overlapped); - if (!ok) { - DWORD err = GetLastError(); - if (err == ERROR_IO_PENDING) { - input_state_.is_pending = true; - return true; - } - LOG(ERROR) << "pipe error: " << err; - return false; - } - input_state_.is_pending = true; - return true; - } - DCHECK(bytes_read); - - // Process messages from input buffer. - - const char* p, *end; - if (input_overflow_buf_.empty()) { - p = input_buf_; - end = p + bytes_read; - } else { - if (input_overflow_buf_.size() > (kMaximumMessageSize - bytes_read)) { - input_overflow_buf_.clear(); - LOG(ERROR) << "IPC message is too big"; - return false; - } - input_overflow_buf_.append(input_buf_, bytes_read); - p = input_overflow_buf_.data(); - end = p + input_overflow_buf_.size(); - } - - while (p < end) { - const char* message_tail = Message::FindNext(p, end); - if (message_tail) { - int len = static_cast(message_tail - p); - const Message m(p, len); -#ifdef IPC_MESSAGE_DEBUG_EXTRA - DLOG(INFO) << "received message on channel @" << this << - " with type " << m.type(); -#endif - if (m.routing_id() == MSG_ROUTING_NONE && - m.type() == HELLO_MESSAGE_TYPE) { - // The Hello message contains only the process id. - listener_->OnChannelConnected(MessageIterator(m).NextInt()); - } else { - listener_->OnMessageReceived(m); - } - p = message_tail; - } else { - // Last message is partial. - break; - } - } - input_overflow_buf_.assign(p, end - p); - - bytes_read = 0; // Get more data. - } - - return true; -} - -bool Channel::ProcessOutgoingMessages(MessageLoopForIO::IOContext* context, - DWORD bytes_written) { - DCHECK(!waiting_connect_); // Why are we trying to send messages if there's - // no connection? - - if (output_state_.is_pending) { - DCHECK(context); - output_state_.is_pending = false; - if (!context || bytes_written == 0) { - DWORD err = GetLastError(); - LOG(ERROR) << "pipe error: " << err; - return false; - } - // Message was sent. - DCHECK(!output_queue_.empty()); - Message* m = output_queue_.front(); - output_queue_.pop(); - delete m; - } - - if (output_queue_.empty()) - return true; - - if (INVALID_HANDLE_VALUE == pipe_) - return false; - - // Write to pipe... - Message* m = output_queue_.front(); - BOOL ok = WriteFile(pipe_, - m->data(), - m->size(), - &bytes_written, - &output_state_.context.overlapped); - if (!ok) { - DWORD err = GetLastError(); - if (err == ERROR_IO_PENDING) { - output_state_.is_pending = true; - -#ifdef IPC_MESSAGE_DEBUG_EXTRA - DLOG(INFO) << "sent pending message @" << m << " on channel @" << - this << " with type " << m->type(); -#endif - - return true; - } - LOG(ERROR) << "pipe error: " << err; - return false; - } - -#ifdef IPC_MESSAGE_DEBUG_EXTRA - DLOG(INFO) << "sent message @" << m << " on channel @" << this << - " with type " << m->type(); -#endif - - output_state_.is_pending = true; - return true; -} - -void Channel::OnIOCompleted(MessageLoopForIO::IOContext* context, - DWORD bytes_transfered, DWORD error) { - bool ok; - if (context == &input_state_.context) { - if (waiting_connect_) { - if (!ProcessConnection()) - return; - // We may have some messages queued up to send... - if (!output_queue_.empty() && !output_state_.is_pending) - ProcessOutgoingMessages(NULL, 0); - if (input_state_.is_pending) - return; - // else, fall-through and look for incoming messages... - } - // we don't support recursion through OnMessageReceived yet! - DCHECK(!processing_incoming_); - processing_incoming_ = true; - ok = ProcessIncomingMessages(context, bytes_transfered); - processing_incoming_ = false; - } else { - DCHECK(context == &output_state_.context); - ok = ProcessOutgoingMessages(context, bytes_transfered); - } - if (!ok && INVALID_HANDLE_VALUE != pipe_) { - // We don't want to re-enter Close(). - Close(); - listener_->OnChannelError(); - } -} - -} diff --git a/chrome/common/ipc_channel.h b/chrome/common/ipc_channel.h index ac234b8..bd7bf05 100644 --- a/chrome/common/ipc_channel.h +++ b/chrome/common/ipc_channel.h @@ -5,22 +5,13 @@ #ifndef CHROME_COMMON_IPC_CHANNEL_H_ #define CHROME_COMMON_IPC_CHANNEL_H_ -#include - -#include "base/message_loop.h" #include "chrome/common/ipc_message.h" namespace IPC { //------------------------------------------------------------------------------ -class Channel : public Message::Sender, -#if defined(OS_WIN) - public MessageLoopForIO::IOHandler -#elif defined(OS_POSIX) - public MessageLoopForIO::FileWatcher -#endif - { +class Channel : public Message::Sender { // Security tests need access to the pipe handle. friend class ChannelTest; @@ -47,10 +38,13 @@ class Channel : public Message::Sender, MODE_CLIENT }; - // The maximum message size in bytes. Attempting to receive a - // message of this size or bigger results in a channel error. enum { - kMaximumMessageSize = 256 * 1024 * 1024 + // The maximum message size in bytes. Attempting to receive a + // message of this size or bigger results in a channel error. + kMaximumMessageSize = 256 * 1024 * 1024, + + // Ammount of data to read at once from the pipe. + kReadBufferSize = 4 * 1024 }; // Initialize a Channel. @@ -65,7 +59,7 @@ class Channel : public Message::Sender, // Channel(const std::wstring& channel_id, Mode mode, Listener* listener); - ~Channel() { Close(); } + ~Channel(); // Connect the pipe. On the server side, this will initiate // waiting for connections. On the client, it attempts to @@ -78,7 +72,7 @@ class Channel : public Message::Sender, void Close(); // Modify the Channel's listener. - void set_listener(Listener* listener) { listener_ = listener; } + void set_listener(Listener* listener); // Send a message over the Channel to the listener on the other end. // @@ -92,87 +86,9 @@ class Channel : public Message::Sender, virtual bool Send(Message* message); private: - const std::wstring PipeName(const std::wstring& channel_id) const; - bool CreatePipe(const std::wstring& channel_id, Mode mode); -#if defined(OS_WIN) - bool ProcessConnection(); - bool ProcessIncomingMessages(MessageLoopForIO::IOContext* context, - DWORD bytes_read); - bool ProcessOutgoingMessages(MessageLoopForIO::IOContext* context, - DWORD bytes_written); - - // MessageLoop::IOHandler implementation. - virtual void OnIOCompleted(MessageLoopForIO::IOContext* context, - DWORD bytes_transfered, DWORD error); - private: - enum { - BUF_SIZE = 4096 - }; - - struct State { - explicit State(Channel* channel); - ~State(); - MessageLoopForIO::IOContext context; - bool is_pending; - }; - - State input_state_; - State output_state_; - - HANDLE pipe_; -#elif defined(OS_POSIX) - bool ProcessIncomingMessages(); - bool ProcessOutgoingMessages(); - - void OnFileReadReady(int fd); - void OnFileWriteReady(int fd); - - - Mode mode_; - - // TODO(playmobil): do we need to change BUF_SIZE ? - private: - enum { - BUF_SIZE = 4096 - }; - - // PIMPL to encapsulate libevent structures. - struct EventHolder; - EventHolder *server_listen_connection_event_; - EventHolder *read_event_; - EventHolder *write_event_; - - // If sending a message blocks then we use this variable - // to keep track of where we are. - size_t message_send_bytes_written_; - - int server_listen_pipe_; - int pipe_; - std::string pipe_name_; -#endif // defined(OS_POSIX) - Listener* listener_; - - // Messages to be sent are queued here. - std::queue output_queue_; - - // We read from the pipe into this buffer - char input_buf_[BUF_SIZE]; - - // Large messages that span multiple pipe buffers, get built-up using - // this buffer. - std::string input_overflow_buf_; - - // In server-mode, we have to wait for the client to connect before we - // can begin reading. We make use of the input_state_ when performing - // the connect operation in overlapped mode. - bool waiting_connect_; - - // This flag is set when processing incoming messages. It is used to - // avoid recursing through ProcessIncomingMessages, which could cause - // problems. TODO(darin): make this unnecessary - bool processing_incoming_; - - ScopedRunnableMethodFactory factory_; + // PIMPL to which all channel calls are delegated. + class ChannelImpl; + ChannelImpl *channel_impl_; // The Hello message is internal to the Channel class. It is sent // by the peer when the channel is connected. The message contains @@ -186,7 +102,6 @@ class Channel : public Message::Sender, }; }; -} +} // namespace IPC #endif // CHROME_COMMON_IPC_CHANNEL_H_ - diff --git a/chrome/common/ipc_channel_posix.cc b/chrome/common/ipc_channel_posix.cc index 092e8a1..45573ae 100644 --- a/chrome/common/ipc_channel_posix.cc +++ b/chrome/common/ipc_channel_posix.cc @@ -2,7 +2,7 @@ // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. -#include "chrome/common/ipc_channel.h" +#include "chrome/common/ipc_channel_posix.h" #include #include @@ -17,7 +17,6 @@ #include "base/string_util.h" #include "chrome/common/chrome_counters.h" #include "chrome/common/ipc_message_utils.h" -#include "third_party/libevent/event.h" namespace IPC { @@ -140,25 +139,10 @@ bool ClientConnectToFifo(const std::string &pipe_name, int* client_socket) { } } // namespace - //------------------------------------------------------------------------------ -// PIMPL wrapper for libevent event. -// TODO(playmobil): MessageLoopForIO needs to better encapsulate libevent. -struct Channel::EventHolder { - EventHolder() : is_active(false) {} - ~EventHolder() {} - - bool is_active; - - // libevent's set functions set all the needed members of this struct, so no - // need to initialize before use. - struct event event; -}; - -//------------------------------------------------------------------------------ - -Channel::Channel(const std::wstring& channel_id, Mode mode, Listener* listener) +Channel::ChannelImpl::ChannelImpl(const std::wstring& channel_id, Mode mode, + Listener* listener) : mode_(mode), server_listen_connection_event_(new EventHolder()), read_event_(new EventHolder()), @@ -178,7 +162,8 @@ Channel::Channel(const std::wstring& channel_id, Mode mode, Listener* listener) } } -const std::wstring Channel::PipeName(const std::wstring& channel_id) const { +const std::wstring Channel::ChannelImpl::PipeName( + const std::wstring& channel_id) const { std::wostringstream ss; // TODO(playmobil): This should live in the Chrome user data directory. // TODO(playmobil): Cleanup any stale fifos. @@ -186,7 +171,8 @@ const std::wstring Channel::PipeName(const std::wstring& channel_id) const { return ss.str(); } -bool Channel::CreatePipe(const std::wstring& channel_id, Mode mode) { +bool Channel::ChannelImpl::CreatePipe(const std::wstring& channel_id, + Mode mode) { DCHECK(server_listen_pipe_ == -1 && pipe_ == -1); // TODO(playmobil): Should we just change pipe_name to be a normal string @@ -217,7 +203,7 @@ bool Channel::CreatePipe(const std::wstring& channel_id, Mode mode) { return true; } -bool Channel::Connect() { +bool Channel::ChannelImpl::Connect() { if (mode_ == MODE_SERVER) { if (server_listen_pipe_ == -1) { return false; @@ -245,7 +231,7 @@ bool Channel::Connect() { return true; } -bool Channel::ProcessIncomingMessages() { +bool Channel::ChannelImpl::ProcessIncomingMessages() { ssize_t bytes_read = 0; for (;;) { @@ -257,7 +243,7 @@ bool Channel::ProcessIncomingMessages() { // recv() returns 0 if the connection has closed or EAGAIN if no data is // waiting on the pipe. do { - bytes_read = read(pipe_, input_buf_, BUF_SIZE); + bytes_read = read(pipe_, input_buf_, Channel::kReadBufferSize); } while (bytes_read == -1 && errno == EINTR); if (bytes_read < 0) { if (errno == EAGAIN) { @@ -322,7 +308,7 @@ bool Channel::ProcessIncomingMessages() { return true; } -bool Channel::ProcessOutgoingMessages() { +bool Channel::ChannelImpl::ProcessOutgoingMessages() { DCHECK(!waiting_connect_); // Why are we trying to send messages if there's // no connection? @@ -384,7 +370,7 @@ bool Channel::ProcessOutgoingMessages() { return true; } -bool Channel::Send(Message* message) { +bool Channel::ChannelImpl::Send(Message* message) { chrome::Counters::ipc_send_counter().Increment(); #ifdef IPC_MESSAGE_DEBUG_EXTRA DLOG(INFO) << "sending message @" << message << " on channel @" << this @@ -409,7 +395,7 @@ bool Channel::Send(Message* message) { } // Called by libevent when we can read from th pipe without blocking. -void Channel::OnFileReadReady(int fd) { +void Channel::ChannelImpl::OnFileReadReady(int fd) { bool send_server_hello_msg = false; if (waiting_connect_ && mode_ == MODE_SERVER) { if (!ServerAcceptFifoConnection(server_listen_pipe_, &pipe_)) { @@ -449,14 +435,14 @@ void Channel::OnFileReadReady(int fd) { } // Called by libevent when we can write to the pipe without blocking. -void Channel::OnFileWriteReady(int fd) { +void Channel::ChannelImpl::OnFileWriteReady(int fd) { if (!ProcessOutgoingMessages()) { Close(); listener_->OnChannelError(); } } -void Channel::Close() { +void Channel::ChannelImpl::Close() { // Close can be called multiple time, so we need to make sure we're // idempotent. @@ -501,4 +487,31 @@ void Channel::Close() { } } +//------------------------------------------------------------------------------ +// Channel's methods simply call through to ChannelImpl. +Channel::Channel(const std::wstring& channel_id, Mode mode, + Listener* listener) + : channel_impl_(new ChannelImpl(channel_id, mode, listener)) { +} + +Channel::~Channel() { + delete channel_impl_; +} + +bool Channel::Connect() { + return channel_impl_->Connect(); +} + +void Channel::Close() { + channel_impl_->Close(); +} + +void Channel::set_listener(Listener* listener) { + channel_impl_->set_listener(listener); +} + +bool Channel::Send(Message* message) { + return channel_impl_->Send(message); +} + } // namespace IPC diff --git a/chrome/common/ipc_channel_posix.h b/chrome/common/ipc_channel_posix.h new file mode 100644 index 0000000..b2849dc --- /dev/null +++ b/chrome/common/ipc_channel_posix.h @@ -0,0 +1,93 @@ +// Copyright (c) 2008 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#ifndef CHROME_COMMON_IPC_CHANNEL_POSIX_H_ +#define CHROME_COMMON_IPC_CHANNEL_POSIX_H_ + +#include "chrome/common/ipc_channel.h" + +#include +#include + +#include "base/message_loop.h" +#include "third_party/libevent/event.h" + +namespace IPC { + +class Channel::ChannelImpl : public MessageLoopForIO::FileWatcher { + public: + // Mirror methods of Channel, see ipc_channel.h for description. + ChannelImpl(const std::wstring& channel_id, Mode mode, Listener* listener); + ~ChannelImpl() { Close(); } + bool Connect(); + void Close(); + void set_listener(Listener* listener) { listener_ = listener; } + bool Send(Message* message); + private: + const std::wstring PipeName(const std::wstring& channel_id) const; + bool CreatePipe(const std::wstring& channel_id, Mode mode); + + bool ProcessIncomingMessages(); + bool ProcessOutgoingMessages(); + + void OnFileReadReady(int fd); + void OnFileWriteReady(int fd); + + Mode mode_; + + // Wrapper for Libevent event. + // TODO(playmobil): MessageLoopForIO needs to better encapsulate libevent. + struct EventHolder { + EventHolder() : is_active(false) {} + ~EventHolder() {} + + bool is_active; + + // libevent's set functions set all the needed members of this struct, so no + // need to initialize before use. + struct event event; + }; + + EventHolder *server_listen_connection_event_; + EventHolder *read_event_; + EventHolder *write_event_; + + // If sending a message blocks then we use this variable + // to keep track of where we are. + size_t message_send_bytes_written_; + + int server_listen_pipe_; + int pipe_; + std::string pipe_name_; + + Listener* listener_; + + // Messages to be sent are queued here. + std::queue output_queue_; + + // We read from the pipe into this buffer + char input_buf_[Channel::kReadBufferSize]; + + // Large messages that span multiple pipe buffers, get built-up using + // this buffer. + std::string input_overflow_buf_; + + // In server-mode, we have to wait for the client to connect before we + // can begin reading. We make use of the input_state_ when performing + // the connect operation in overlapped mode. + bool waiting_connect_; + + // This flag is set when processing incoming messages. It is used to + // avoid recursing through ProcessIncomingMessages, which could cause + // problems. TODO(darin): make this unnecessary + bool processing_incoming_; + + ScopedRunnableMethodFactory factory_; + + DISALLOW_COPY_AND_ASSIGN(ChannelImpl); +}; + +} // namespace IPC + +#endif // CHROME_COMMON_IPC_CHANNEL_POSIX_H_ diff --git a/chrome/common/ipc_channel_proxy.h b/chrome/common/ipc_channel_proxy.h index 2b08c09..0b88cb7 100644 --- a/chrome/common/ipc_channel_proxy.h +++ b/chrome/common/ipc_channel_proxy.h @@ -10,6 +10,8 @@ #include "base/ref_counted.h" #include "chrome/common/ipc_channel.h" +class MessageLoop; + namespace IPC { //----------------------------------------------------------------------------- diff --git a/chrome/common/ipc_channel_win.cc b/chrome/common/ipc_channel_win.cc new file mode 100644 index 0000000..a38305a --- /dev/null +++ b/chrome/common/ipc_channel_win.cc @@ -0,0 +1,427 @@ +// Copyright (c) 2006-2008 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "chrome/common/ipc_channel_win.h" + +#include +#include + +#include "base/compiler_specific.h" +#include "base/logging.h" +#include "base/win_util.h" +#include "chrome/common/chrome_counters.h" +#include "chrome/common/ipc_logging.h" +#include "chrome/common/ipc_message_utils.h" + +namespace IPC { +//------------------------------------------------------------------------------ + +Channel::ChannelImpl::State::State(ChannelImpl* channel) : is_pending(false) { + memset(&context.overlapped, 0, sizeof(context.overlapped)); + context.handler = channel; +} + +Channel::ChannelImpl::State::~State() { + COMPILE_ASSERT(!offsetof(Channel::ChannelImpl::State, context), + starts_with_io_context); +} + +//------------------------------------------------------------------------------ + +Channel::ChannelImpl::ChannelImpl(const std::wstring& channel_id, Mode mode, + Listener* listener) + : ALLOW_THIS_IN_INITIALIZER_LIST(input_state_(this)), + ALLOW_THIS_IN_INITIALIZER_LIST(output_state_(this)), + pipe_(INVALID_HANDLE_VALUE), + listener_(listener), + waiting_connect_(mode == MODE_SERVER), + processing_incoming_(false), + ALLOW_THIS_IN_INITIALIZER_LIST(factory_(this)) { + if (!CreatePipe(channel_id, mode)) { + // The pipe may have been closed already. + LOG(WARNING) << "Unable to create pipe named \"" << channel_id << + "\" in " << (mode == 0 ? "server" : "client") << " mode."; + } +} + +void Channel::ChannelImpl::Close() { + bool waited = false; + if (input_state_.is_pending || output_state_.is_pending) { + CancelIo(pipe_); + waited = true; + } + + // Closing the handle at this point prevents us from issuing more requests + // form OnIOCompleted(). + if (pipe_ != INVALID_HANDLE_VALUE) { + CloseHandle(pipe_); + pipe_ = INVALID_HANDLE_VALUE; + } + + // Make sure all IO has completed. + base::Time start = base::Time::Now(); + while (input_state_.is_pending || output_state_.is_pending) { + MessageLoopForIO::current()->WaitForIOCompletion(INFINITE, this); + } + if (waited) { + // We want to see if we block the message loop for too long. + UMA_HISTOGRAM_TIMES(L"AsyncIO.IPCChannelClose", base::Time::Now() - start); + } + + while (!output_queue_.empty()) { + Message* m = output_queue_.front(); + output_queue_.pop(); + delete m; + } +} + +bool Channel::ChannelImpl::Send(Message* message) { + chrome::Counters::ipc_send_counter().Increment(); +#ifdef IPC_MESSAGE_DEBUG_EXTRA + DLOG(INFO) << "sending message @" << message << " on channel @" << this + << " with type " << message->type() + << " (" << output_queue_.size() << " in queue)"; +#endif + +#ifdef IPC_MESSAGE_LOG_ENABLED + Logging::current()->OnSendMessage(message, L""); +#endif + + output_queue_.push(message); + // ensure waiting to write + if (!waiting_connect_) { + if (!output_state_.is_pending) { + if (!ProcessOutgoingMessages(NULL, 0)) + return false; + } + } + + return true; +} + +const std::wstring Channel::ChannelImpl::PipeName(const std::wstring& channel_id) + const { + std::wostringstream ss; + // XXX(darin): get application name from somewhere else + ss << L"\\\\.\\pipe\\chrome." << channel_id; + return ss.str(); +} + +bool Channel::ChannelImpl::CreatePipe(const std::wstring& channel_id, + Mode mode) { + DCHECK(pipe_ == INVALID_HANDLE_VALUE); + const std::wstring pipe_name = PipeName(channel_id); + if (mode == MODE_SERVER) { + SECURITY_ATTRIBUTES security_attributes = {0}; + security_attributes.bInheritHandle = FALSE; + security_attributes.nLength = sizeof(SECURITY_ATTRIBUTES); + if (!win_util::GetLogonSessionOnlyDACL( + reinterpret_cast( + &security_attributes.lpSecurityDescriptor))) { + NOTREACHED(); + } + + pipe_ = CreateNamedPipeW(pipe_name.c_str(), + PIPE_ACCESS_DUPLEX | FILE_FLAG_OVERLAPPED | + FILE_FLAG_FIRST_PIPE_INSTANCE, + PIPE_TYPE_BYTE | PIPE_READMODE_BYTE, + 1, // number of pipe instances + // output buffer size (XXX tune) + Channel::kReadBufferSize, + // input buffer size (XXX tune) + Channel::kReadBufferSize, + 5000, // timeout in milliseconds (XXX tune) + &security_attributes); + LocalFree(security_attributes.lpSecurityDescriptor); + } else { + pipe_ = CreateFileW(pipe_name.c_str(), + GENERIC_READ | GENERIC_WRITE, + 0, + NULL, + OPEN_EXISTING, + SECURITY_SQOS_PRESENT | SECURITY_IDENTIFICATION | + FILE_FLAG_OVERLAPPED, + NULL); + } + if (pipe_ == INVALID_HANDLE_VALUE) { + // If this process is being closed, the pipe may be gone already. + LOG(WARNING) << "failed to create pipe: " << GetLastError(); + return false; + } + + // Create the Hello message to be sent when Connect is called + scoped_ptr m(new Message(MSG_ROUTING_NONE, + HELLO_MESSAGE_TYPE, + IPC::Message::PRIORITY_NORMAL)); + if (!m->WriteInt(GetCurrentProcessId())) { + CloseHandle(pipe_); + pipe_ = INVALID_HANDLE_VALUE; + return false; + } + + output_queue_.push(m.release()); + return true; +} + +bool Channel::ChannelImpl::Connect() { + DLOG(WARNING) << "Connect called twice"; + + if (pipe_ == INVALID_HANDLE_VALUE) + return false; + + MessageLoopForIO::current()->RegisterIOHandler(pipe_, this); + + // Check to see if there is a client connected to our pipe... + if (waiting_connect_) + ProcessConnection(); + + if (!input_state_.is_pending) { + // Complete setup asynchronously. By not setting input_state_.is_pending + // to true, we indicate to OnIOCompleted that this is the special + // initialization signal. + MessageLoopForIO::current()->PostTask(FROM_HERE, factory_.NewRunnableMethod( + &Channel::ChannelImpl::OnIOCompleted, &input_state_.context, 0, 0)); + } + + if (!waiting_connect_) + ProcessOutgoingMessages(NULL, 0); + return true; +} + +bool Channel::ChannelImpl::ProcessConnection() { + if (input_state_.is_pending) + input_state_.is_pending = false; + + // Do we have a client connected to our pipe? + if (INVALID_HANDLE_VALUE == pipe_) + return false; + + BOOL ok = ConnectNamedPipe(pipe_, &input_state_.context.overlapped); + + DWORD err = GetLastError(); + if (ok) { + // Uhm, the API documentation says that this function should never + // return success when used in overlapped mode. + NOTREACHED(); + return false; + } + + switch (err) { + case ERROR_IO_PENDING: + input_state_.is_pending = true; + break; + case ERROR_PIPE_CONNECTED: + waiting_connect_ = false; + break; + default: + NOTREACHED(); + return false; + } + + return true; +} + +bool Channel::ChannelImpl::ProcessIncomingMessages( + MessageLoopForIO::IOContext* context, + DWORD bytes_read) { + if (input_state_.is_pending) { + input_state_.is_pending = false; + DCHECK(context); + + if (!context || !bytes_read) + return false; + } else { + // This happens at channel initialization. + DCHECK(!bytes_read && context == &input_state_.context); + } + + for (;;) { + if (bytes_read == 0) { + if (INVALID_HANDLE_VALUE == pipe_) + return false; + + // Read from pipe... + BOOL ok = ReadFile(pipe_, + input_buf_, + Channel::kReadBufferSize, + &bytes_read, + &input_state_.context.overlapped); + if (!ok) { + DWORD err = GetLastError(); + if (err == ERROR_IO_PENDING) { + input_state_.is_pending = true; + return true; + } + LOG(ERROR) << "pipe error: " << err; + return false; + } + input_state_.is_pending = true; + return true; + } + DCHECK(bytes_read); + + // Process messages from input buffer. + + const char* p, *end; + if (input_overflow_buf_.empty()) { + p = input_buf_; + end = p + bytes_read; + } else { + if (input_overflow_buf_.size() > (kMaximumMessageSize - bytes_read)) { + input_overflow_buf_.clear(); + LOG(ERROR) << "IPC message is too big"; + return false; + } + input_overflow_buf_.append(input_buf_, bytes_read); + p = input_overflow_buf_.data(); + end = p + input_overflow_buf_.size(); + } + + while (p < end) { + const char* message_tail = Message::FindNext(p, end); + if (message_tail) { + int len = static_cast(message_tail - p); + const Message m(p, len); +#ifdef IPC_MESSAGE_DEBUG_EXTRA + DLOG(INFO) << "received message on channel @" << this << + " with type " << m.type(); +#endif + if (m.routing_id() == MSG_ROUTING_NONE && + m.type() == HELLO_MESSAGE_TYPE) { + // The Hello message contains only the process id. + listener_->OnChannelConnected(MessageIterator(m).NextInt()); + } else { + listener_->OnMessageReceived(m); + } + p = message_tail; + } else { + // Last message is partial. + break; + } + } + input_overflow_buf_.assign(p, end - p); + + bytes_read = 0; // Get more data. + } + + return true; +} + +bool Channel::ChannelImpl::ProcessOutgoingMessages( + MessageLoopForIO::IOContext* context, + DWORD bytes_written) { + DCHECK(!waiting_connect_); // Why are we trying to send messages if there's + // no connection? + + if (output_state_.is_pending) { + DCHECK(context); + output_state_.is_pending = false; + if (!context || bytes_written == 0) { + DWORD err = GetLastError(); + LOG(ERROR) << "pipe error: " << err; + return false; + } + // Message was sent. + DCHECK(!output_queue_.empty()); + Message* m = output_queue_.front(); + output_queue_.pop(); + delete m; + } + + if (output_queue_.empty()) + return true; + + if (INVALID_HANDLE_VALUE == pipe_) + return false; + + // Write to pipe... + Message* m = output_queue_.front(); + BOOL ok = WriteFile(pipe_, + m->data(), + m->size(), + &bytes_written, + &output_state_.context.overlapped); + if (!ok) { + DWORD err = GetLastError(); + if (err == ERROR_IO_PENDING) { + output_state_.is_pending = true; + +#ifdef IPC_MESSAGE_DEBUG_EXTRA + DLOG(INFO) << "sent pending message @" << m << " on channel @" << + this << " with type " << m->type(); +#endif + + return true; + } + LOG(ERROR) << "pipe error: " << err; + return false; + } + +#ifdef IPC_MESSAGE_DEBUG_EXTRA + DLOG(INFO) << "sent message @" << m << " on channel @" << this << + " with type " << m->type(); +#endif + + output_state_.is_pending = true; + return true; +} + +void Channel::ChannelImpl::OnIOCompleted(MessageLoopForIO::IOContext* context, + DWORD bytes_transfered, DWORD error) { + bool ok; + if (context == &input_state_.context) { + if (waiting_connect_) { + if (!ProcessConnection()) + return; + // We may have some messages queued up to send... + if (!output_queue_.empty() && !output_state_.is_pending) + ProcessOutgoingMessages(NULL, 0); + if (input_state_.is_pending) + return; + // else, fall-through and look for incoming messages... + } + // we don't support recursion through OnMessageReceived yet! + DCHECK(!processing_incoming_); + processing_incoming_ = true; + ok = ProcessIncomingMessages(context, bytes_transfered); + processing_incoming_ = false; + } else { + DCHECK(context == &output_state_.context); + ok = ProcessOutgoingMessages(context, bytes_transfered); + } + if (!ok && INVALID_HANDLE_VALUE != pipe_) { + // We don't want to re-enter Close(). + Close(); + listener_->OnChannelError(); + } +} + +//------------------------------------------------------------------------------ +// Channel's methods simply call through to ChannelImpl. +Channel::Channel(const std::wstring& channel_id, Mode mode, + Listener* listener) + : channel_impl_(new ChannelImpl(channel_id, mode, listener)) { +} + +Channel::~Channel() { + delete channel_impl_; +} + +bool Channel::Connect() { + return channel_impl_->Connect(); +} + +void Channel::Close() { + channel_impl_->Close(); +} + +void Channel::set_listener(Listener* listener) { + channel_impl_->set_listener(listener); +} + +bool Channel::Send(Message* message) { + return channel_impl_->Send(message); +} + +} // namespace IPC diff --git a/chrome/common/ipc_channel_win.h b/chrome/common/ipc_channel_win.h new file mode 100644 index 0000000..fcd8abe --- /dev/null +++ b/chrome/common/ipc_channel_win.h @@ -0,0 +1,81 @@ +// Copyright (c) 2008 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#ifndef CHROME_COMMON_IPC_CHANNEL_WIN_H_ +#define CHROME_COMMON_IPC_CHANNEL_WIN_H_ + +#include "chrome/common/ipc_channel.h" + +#include +#include + +#include "base/message_loop.h" + +namespace IPC { + +class Channel::ChannelImpl : public MessageLoopForIO::IOHandler { + public: + // Mirror methods of Channel, see ipc_channel.h for description. + ChannelImpl(const std::wstring& channel_id, Mode mode, Listener* listener); + ~ChannelImpl() { Close(); } + bool Connect(); + void Close(); + void set_listener(Listener* listener) { listener_ = listener; } + bool Send(Message* message); + private: + const std::wstring PipeName(const std::wstring& channel_id) const; + bool CreatePipe(const std::wstring& channel_id, Mode mode); + + bool ProcessConnection(); + bool ProcessIncomingMessages(MessageLoopForIO::IOContext* context, + DWORD bytes_read); + bool ProcessOutgoingMessages(MessageLoopForIO::IOContext* context, + DWORD bytes_written); + + // MessageLoop::IOHandler implementation. + virtual void OnIOCompleted(MessageLoopForIO::IOContext* context, + DWORD bytes_transfered, DWORD error); + private: + struct State { + explicit State(ChannelImpl* channel); + ~State(); + MessageLoopForIO::IOContext context; + bool is_pending; + }; + + State input_state_; + State output_state_; + + HANDLE pipe_; + + Listener* listener_; + + // Messages to be sent are queued here. + std::queue output_queue_; + + // We read from the pipe into this buffer + char input_buf_[Channel::kReadBufferSize]; + + // Large messages that span multiple pipe buffers, get built-up using + // this buffer. + std::string input_overflow_buf_; + + // In server-mode, we have to wait for the client to connect before we + // can begin reading. We make use of the input_state_ when performing + // the connect operation in overlapped mode. + bool waiting_connect_; + + // This flag is set when processing incoming messages. It is used to + // avoid recursing through ProcessIncomingMessages, which could cause + // problems. TODO(darin): make this unnecessary + bool processing_incoming_; + + ScopedRunnableMethodFactory factory_; + + DISALLOW_COPY_AND_ASSIGN(ChannelImpl); +}; + +} // namespace IPC + +#endif // CHROME_COMMON_IPC_CHANNEL_WIN_H_ diff --git a/chrome/common/ipc_fuzzing_tests.cc b/chrome/common/ipc_fuzzing_tests.cc index cec05fb..0c729f8 100644 --- a/chrome/common/ipc_fuzzing_tests.cc +++ b/chrome/common/ipc_fuzzing_tests.cc @@ -7,13 +7,13 @@ #include #include -#include "chrome/common/ipc_tests.h" - +#include "base/message_loop.h" #include "base/platform_thread.h" #include "base/process_util.h" #include "chrome/common/ipc_channel.h" #include "chrome/common/ipc_channel_proxy.h" #include "chrome/common/ipc_message_utils.h" +#include "chrome/common/ipc_tests.h" #include "testing/gtest/include/gtest/gtest.h" #include "testing/multiprocess_func_list.h" diff --git a/chrome/common/resource_dispatcher.cc b/chrome/common/resource_dispatcher.cc index d3c7a33..ecf58e6 100644 --- a/chrome/common/resource_dispatcher.cc +++ b/chrome/common/resource_dispatcher.cc @@ -7,6 +7,7 @@ #include "chrome/common/resource_dispatcher.h" #include "base/basictypes.h" +#include "base/message_loop.h" #include "base/shared_memory.h" #include "base/string_util.h" #include "chrome/common/render_messages.h" diff --git a/chrome/common/resource_dispatcher.h b/chrome/common/resource_dispatcher.h index c290e10..e948045 100644 --- a/chrome/common/resource_dispatcher.h +++ b/chrome/common/resource_dispatcher.h @@ -9,6 +9,8 @@ #include +#include + #include "base/hash_tables.h" #include "base/task.h" #include "chrome/common/ipc_channel.h" -- cgit v1.1