diff options
author | agl@chromium.org <agl@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98> | 2009-07-22 23:57:21 +0000 |
---|---|---|
committer | agl@chromium.org <agl@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98> | 2009-07-22 23:57:21 +0000 |
commit | 946d1b2c806795351598aeb9faaed797284a8ee3 (patch) | |
tree | d8d2695f73a56ec33ab068f9070fe93cb7c0e4a3 /ipc/ipc_channel_win.cc | |
parent | 00fceac62015db950f3dde84f5aeeacb82f1b2c6 (diff) | |
download | chromium_src-946d1b2c806795351598aeb9faaed797284a8ee3.zip chromium_src-946d1b2c806795351598aeb9faaed797284a8ee3.tar.gz chromium_src-946d1b2c806795351598aeb9faaed797284a8ee3.tar.bz2 |
Split the IPC code into ipc/
This splits the ipc code from the common project. The 'common' project pulls in
all of webkit, the v8 bindings, skia, googleurl, and a number of other projects
which makes it very difficult to deal with especially for external projects
wanting just to use some of Chromium's infrastructure. This puts the ipc code
into its top-level ipc/ directory with a dependency only on base. The common
project depends on the new ipc/ipc.gyp:ipc target so that all projects currently
pulling common in to get the IPC code still have it available. This mostly
follows agl's pre-gyp attempt to do this which was r13062.
Known issues:
- Currently a number of projects depend on chrome/chrome.gyp:common in order to
use the IPC infrastructure. Rather than fixing all of these dependencies I have
made common depend on ipc/ipc.gyp:ipc and added "ipc" to the include_rules
section of DEPS so that checkdeps.py doesn't complain. Over time projects that
need IPC should depend on the IPC project themselves and dependencies on common
removed, although I don't think many projects that need IPC will be able to get
away without common currently.
- ipc/ipc_message_macros.h still has #include "chrome/common/..." inside of a
ipc/ should not refer to files in chrome/... now. I'm not sure how to resolve
this since it's really an IDE bug
- the named pipe name (windows+linux) and the logging event name (all) + env
variable (posix) refer explicitly to 'Chrome' which somewhat hurts the illusion
of ipc/ being an independent library. I think this should be examined in a
subsequent, much smaller patch.
- I've eliminated the IPC.SendMsgCount counter since it was implemented in a way
to create a dependency from ipc/ to chrome/common/chrome_counters. This is the
same approach that r13062 took.
http://codereview.chromium.org/155905
(Patch from James Robinson)
git-svn-id: svn://svn.chromium.org/chrome/trunk/src@21342 0039d316-1c4b-4281-b951-d872f2087c98
Diffstat (limited to 'ipc/ipc_channel_win.cc')
-rw-r--r-- | ipc/ipc_channel_win.cc | 442 |
1 files changed, 442 insertions, 0 deletions
diff --git a/ipc/ipc_channel_win.cc b/ipc/ipc_channel_win.cc new file mode 100644 index 0000000..9296ea4 --- /dev/null +++ b/ipc/ipc_channel_win.cc @@ -0,0 +1,442 @@ +// Copyright (c) 2006-2008 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "ipc/ipc_channel_win.h" + +#include <windows.h> +#include <sstream> + +#include "base/compiler_specific.h" +#include "base/logging.h" +#include "base/non_thread_safe.h" +#include "base/stats_counters.h" +#include "base/win_util.h" +#include "ipc/ipc_logging.h" +#include "ipc/ipc_message_utils.h" + +namespace IPC { +//------------------------------------------------------------------------------ + +Channel::ChannelImpl::State::State(ChannelImpl* channel) : is_pending(false) { + memset(&context.overlapped, 0, sizeof(context.overlapped)); + context.handler = channel; +} + +Channel::ChannelImpl::State::~State() { + COMPILE_ASSERT(!offsetof(Channel::ChannelImpl::State, context), + starts_with_io_context); +} + +//------------------------------------------------------------------------------ + +Channel::ChannelImpl::ChannelImpl(const std::string& channel_id, Mode mode, + Listener* listener) + : ALLOW_THIS_IN_INITIALIZER_LIST(input_state_(this)), + ALLOW_THIS_IN_INITIALIZER_LIST(output_state_(this)), + pipe_(INVALID_HANDLE_VALUE), + listener_(listener), + waiting_connect_(mode == MODE_SERVER), + processing_incoming_(false), + ALLOW_THIS_IN_INITIALIZER_LIST(factory_(this)) { + if (!CreatePipe(channel_id, mode)) { + // The pipe may have been closed already. + LOG(WARNING) << "Unable to create pipe named \"" << channel_id << + "\" in " << (mode == 0 ? "server" : "client") << " mode."; + } +} + +void Channel::ChannelImpl::Close() { + if (thread_check_.get()) { + DCHECK(thread_check_->CalledOnValidThread()); + } + + bool waited = false; + if (input_state_.is_pending || output_state_.is_pending) { + CancelIo(pipe_); + waited = true; + } + + // Closing the handle at this point prevents us from issuing more requests + // form OnIOCompleted(). + if (pipe_ != INVALID_HANDLE_VALUE) { + CloseHandle(pipe_); + pipe_ = INVALID_HANDLE_VALUE; + } + + // Make sure all IO has completed. + base::Time start = base::Time::Now(); + while (input_state_.is_pending || output_state_.is_pending) { + MessageLoopForIO::current()->WaitForIOCompletion(INFINITE, this); + } + if (waited) { + // We want to see if we block the message loop for too long. + UMA_HISTOGRAM_TIMES("AsyncIO.IPCChannelClose", base::Time::Now() - start); + } + + while (!output_queue_.empty()) { + Message* m = output_queue_.front(); + output_queue_.pop(); + delete m; + } +} + +bool Channel::ChannelImpl::Send(Message* message) { + DCHECK(thread_check_->CalledOnValidThread()); +#ifdef IPC_MESSAGE_DEBUG_EXTRA + DLOG(INFO) << "sending message @" << message << " on channel @" << this + << " with type " << message->type() + << " (" << output_queue_.size() << " in queue)"; +#endif + +#ifdef IPC_MESSAGE_LOG_ENABLED + Logging::current()->OnSendMessage(message, ""); +#endif + + output_queue_.push(message); + // ensure waiting to write + if (!waiting_connect_) { + if (!output_state_.is_pending) { + if (!ProcessOutgoingMessages(NULL, 0)) + return false; + } + } + + return true; +} + +const std::wstring Channel::ChannelImpl::PipeName( + const std::string& channel_id) const { + std::wostringstream ss; + // XXX(darin): get application name from somewhere else + ss << L"\\\\.\\pipe\\chrome." << ASCIIToWide(channel_id); + return ss.str(); +} + +bool Channel::ChannelImpl::CreatePipe(const std::string& channel_id, + Mode mode) { + DCHECK(pipe_ == INVALID_HANDLE_VALUE); + const std::wstring pipe_name = PipeName(channel_id); + if (mode == MODE_SERVER) { + SECURITY_ATTRIBUTES security_attributes = {0}; + security_attributes.bInheritHandle = FALSE; + security_attributes.nLength = sizeof(SECURITY_ATTRIBUTES); + if (!win_util::GetLogonSessionOnlyDACL( + reinterpret_cast<SECURITY_DESCRIPTOR**>( + &security_attributes.lpSecurityDescriptor))) { + NOTREACHED(); + } + + pipe_ = CreateNamedPipeW(pipe_name.c_str(), + PIPE_ACCESS_DUPLEX | FILE_FLAG_OVERLAPPED | + FILE_FLAG_FIRST_PIPE_INSTANCE, + PIPE_TYPE_BYTE | PIPE_READMODE_BYTE, + 1, // number of pipe instances + // output buffer size (XXX tune) + Channel::kReadBufferSize, + // input buffer size (XXX tune) + Channel::kReadBufferSize, + 5000, // timeout in milliseconds (XXX tune) + &security_attributes); + LocalFree(security_attributes.lpSecurityDescriptor); + } else { + pipe_ = CreateFileW(pipe_name.c_str(), + GENERIC_READ | GENERIC_WRITE, + 0, + NULL, + OPEN_EXISTING, + SECURITY_SQOS_PRESENT | SECURITY_IDENTIFICATION | + FILE_FLAG_OVERLAPPED, + NULL); + } + if (pipe_ == INVALID_HANDLE_VALUE) { + // If this process is being closed, the pipe may be gone already. + LOG(WARNING) << "failed to create pipe: " << GetLastError(); + return false; + } + + // Create the Hello message to be sent when Connect is called + scoped_ptr<Message> m(new Message(MSG_ROUTING_NONE, + HELLO_MESSAGE_TYPE, + IPC::Message::PRIORITY_NORMAL)); + if (!m->WriteInt(GetCurrentProcessId())) { + CloseHandle(pipe_); + pipe_ = INVALID_HANDLE_VALUE; + return false; + } + + output_queue_.push(m.release()); + return true; +} + +bool Channel::ChannelImpl::Connect() { + DLOG(WARNING) << "Connect called twice"; + + if (!thread_check_.get()) + thread_check_.reset(new NonThreadSafe()); + + if (pipe_ == INVALID_HANDLE_VALUE) + return false; + + MessageLoopForIO::current()->RegisterIOHandler(pipe_, this); + + // Check to see if there is a client connected to our pipe... + if (waiting_connect_) + ProcessConnection(); + + if (!input_state_.is_pending) { + // Complete setup asynchronously. By not setting input_state_.is_pending + // to true, we indicate to OnIOCompleted that this is the special + // initialization signal. + MessageLoopForIO::current()->PostTask(FROM_HERE, factory_.NewRunnableMethod( + &Channel::ChannelImpl::OnIOCompleted, &input_state_.context, 0, 0)); + } + + if (!waiting_connect_) + ProcessOutgoingMessages(NULL, 0); + return true; +} + +bool Channel::ChannelImpl::ProcessConnection() { + DCHECK(thread_check_->CalledOnValidThread()); + if (input_state_.is_pending) + input_state_.is_pending = false; + + // Do we have a client connected to our pipe? + if (INVALID_HANDLE_VALUE == pipe_) + return false; + + BOOL ok = ConnectNamedPipe(pipe_, &input_state_.context.overlapped); + + DWORD err = GetLastError(); + if (ok) { + // Uhm, the API documentation says that this function should never + // return success when used in overlapped mode. + NOTREACHED(); + return false; + } + + switch (err) { + case ERROR_IO_PENDING: + input_state_.is_pending = true; + break; + case ERROR_PIPE_CONNECTED: + waiting_connect_ = false; + break; + case ERROR_NO_DATA: + // The pipe is being closed. + return false; + default: + NOTREACHED(); + return false; + } + + return true; +} + +bool Channel::ChannelImpl::ProcessIncomingMessages( + MessageLoopForIO::IOContext* context, + DWORD bytes_read) { + DCHECK(thread_check_->CalledOnValidThread()); + if (input_state_.is_pending) { + input_state_.is_pending = false; + DCHECK(context); + + if (!context || !bytes_read) + return false; + } else { + // This happens at channel initialization. + DCHECK(!bytes_read && context == &input_state_.context); + } + + for (;;) { + if (bytes_read == 0) { + if (INVALID_HANDLE_VALUE == pipe_) + return false; + + // Read from pipe... + BOOL ok = ReadFile(pipe_, + input_buf_, + Channel::kReadBufferSize, + &bytes_read, + &input_state_.context.overlapped); + if (!ok) { + DWORD err = GetLastError(); + if (err == ERROR_IO_PENDING) { + input_state_.is_pending = true; + return true; + } + LOG(ERROR) << "pipe error: " << err; + return false; + } + input_state_.is_pending = true; + return true; + } + DCHECK(bytes_read); + + // Process messages from input buffer. + + const char* p, *end; + if (input_overflow_buf_.empty()) { + p = input_buf_; + end = p + bytes_read; + } else { + if (input_overflow_buf_.size() > (kMaximumMessageSize - bytes_read)) { + input_overflow_buf_.clear(); + LOG(ERROR) << "IPC message is too big"; + return false; + } + input_overflow_buf_.append(input_buf_, bytes_read); + p = input_overflow_buf_.data(); + end = p + input_overflow_buf_.size(); + } + + while (p < end) { + const char* message_tail = Message::FindNext(p, end); + if (message_tail) { + int len = static_cast<int>(message_tail - p); + const Message m(p, len); +#ifdef IPC_MESSAGE_DEBUG_EXTRA + DLOG(INFO) << "received message on channel @" << this << + " with type " << m.type(); +#endif + if (m.routing_id() == MSG_ROUTING_NONE && + m.type() == HELLO_MESSAGE_TYPE) { + // The Hello message contains only the process id. + listener_->OnChannelConnected(MessageIterator(m).NextInt()); + } else { + listener_->OnMessageReceived(m); + } + p = message_tail; + } else { + // Last message is partial. + break; + } + } + input_overflow_buf_.assign(p, end - p); + + bytes_read = 0; // Get more data. + } + + return true; +} + +bool Channel::ChannelImpl::ProcessOutgoingMessages( + MessageLoopForIO::IOContext* context, + DWORD bytes_written) { + DCHECK(!waiting_connect_); // Why are we trying to send messages if there's + // no connection? + DCHECK(thread_check_->CalledOnValidThread()); + + if (output_state_.is_pending) { + DCHECK(context); + output_state_.is_pending = false; + if (!context || bytes_written == 0) { + DWORD err = GetLastError(); + LOG(ERROR) << "pipe error: " << err; + return false; + } + // Message was sent. + DCHECK(!output_queue_.empty()); + Message* m = output_queue_.front(); + output_queue_.pop(); + delete m; + } + + if (output_queue_.empty()) + return true; + + if (INVALID_HANDLE_VALUE == pipe_) + return false; + + // Write to pipe... + Message* m = output_queue_.front(); + BOOL ok = WriteFile(pipe_, + m->data(), + m->size(), + &bytes_written, + &output_state_.context.overlapped); + if (!ok) { + DWORD err = GetLastError(); + if (err == ERROR_IO_PENDING) { + output_state_.is_pending = true; + +#ifdef IPC_MESSAGE_DEBUG_EXTRA + DLOG(INFO) << "sent pending message @" << m << " on channel @" << + this << " with type " << m->type(); +#endif + + return true; + } + LOG(ERROR) << "pipe error: " << err; + return false; + } + +#ifdef IPC_MESSAGE_DEBUG_EXTRA + DLOG(INFO) << "sent message @" << m << " on channel @" << this << + " with type " << m->type(); +#endif + + output_state_.is_pending = true; + return true; +} + +void Channel::ChannelImpl::OnIOCompleted(MessageLoopForIO::IOContext* context, + DWORD bytes_transfered, DWORD error) { + bool ok; + DCHECK(thread_check_->CalledOnValidThread()); + if (context == &input_state_.context) { + if (waiting_connect_) { + if (!ProcessConnection()) + return; + // We may have some messages queued up to send... + if (!output_queue_.empty() && !output_state_.is_pending) + ProcessOutgoingMessages(NULL, 0); + if (input_state_.is_pending) + return; + // else, fall-through and look for incoming messages... + } + // we don't support recursion through OnMessageReceived yet! + DCHECK(!processing_incoming_); + processing_incoming_ = true; + ok = ProcessIncomingMessages(context, bytes_transfered); + processing_incoming_ = false; + } else { + DCHECK(context == &output_state_.context); + ok = ProcessOutgoingMessages(context, bytes_transfered); + } + if (!ok && INVALID_HANDLE_VALUE != pipe_) { + // We don't want to re-enter Close(). + Close(); + listener_->OnChannelError(); + } +} + +//------------------------------------------------------------------------------ +// Channel's methods simply call through to ChannelImpl. +Channel::Channel(const std::string& channel_id, Mode mode, + Listener* listener) + : channel_impl_(new ChannelImpl(channel_id, mode, listener)) { +} + +Channel::~Channel() { + delete channel_impl_; +} + +bool Channel::Connect() { + return channel_impl_->Connect(); +} + +void Channel::Close() { + channel_impl_->Close(); +} + +void Channel::set_listener(Listener* listener) { + channel_impl_->set_listener(listener); +} + +bool Channel::Send(Message* message) { + return channel_impl_->Send(message); +} + +} // namespace IPC |