diff options
author | rvargas@google.com <rvargas@google.com@0039d316-1c4b-4281-b951-d872f2087c98> | 2008-11-07 21:52:15 +0000 |
---|---|---|
committer | rvargas@google.com <rvargas@google.com@0039d316-1c4b-4281-b951-d872f2087c98> | 2008-11-07 21:52:15 +0000 |
commit | 17b891482a081341470ead21ca7eda953d74dd69 (patch) | |
tree | 1ca1393109504064e44a31435714a238ca171477 /chrome/common/ipc_channel.cc | |
parent | 209c36546ae088f1cf76e7f72765ad92b3cdaa2e (diff) | |
download | chromium_src-17b891482a081341470ead21ca7eda953d74dd69.zip chromium_src-17b891482a081341470ead21ca7eda953d74dd69.tar.gz chromium_src-17b891482a081341470ead21ca7eda953d74dd69.tar.bz2 |
Switch MessagePumpForIO to use completion ports on Windows.
Cleanup the separation between MessagePumpForUI and
MessagePumpForIO, and convert the latter to use Completion
Ports instead of MsgWaitForMultipleobjects to sleep
when idle.
Remove all traces of Windows messages from MessagePumpForIO,
remove the transitional API of completion port notifications
and remove WatchObject API.
Modify all callers of RegisterIOHandler so that they are no
longer using RegisterIOContext, and also handle properly
the new semantics of completion ports (notifications even when
the IO completes immediately).
Add a new interface to allow proper cleanup of disk cache (to
replace code that was waiting for pending APCs from the destructor).
Add a way for the message pump to perform cleanup of abandoned IO.
BUG=B/1344358, 3497, 3630
TESt=unit tests
R=darin
Review URL: http://codereview.chromium.org/8156
git-svn-id: svn://svn.chromium.org/chrome/trunk/src@5021 0039d316-1c4b-4281-b951-d872f2087c98
Diffstat (limited to 'chrome/common/ipc_channel.cc')
-rw-r--r-- | chrome/common/ipc_channel.cc | 144 |
1 files changed, 72 insertions, 72 deletions
diff --git a/chrome/common/ipc_channel.cc b/chrome/common/ipc_channel.cc index a9d73e7..934ea9b 100644 --- a/chrome/common/ipc_channel.cc +++ b/chrome/common/ipc_channel.cc @@ -20,24 +20,21 @@ namespace IPC { //------------------------------------------------------------------------------ -Channel::State::State() - : is_pending(false) { - memset(&overlapped, 0, sizeof(overlapped)); - overlapped.hEvent = CreateEvent(NULL, // default security attributes - TRUE, // manual-reset event - TRUE, // initial state = signaled - NULL); // unnamed event object +Channel::State::State(Channel* channel) : is_pending(false) { + memset(&context.overlapped, 0, sizeof(context.overlapped)); + context.handler = channel; } Channel::State::~State() { - if (overlapped.hEvent) - CloseHandle(overlapped.hEvent); + COMPILE_ASSERT(!offsetof(Channel::State, context), starts_with_io_context); } //------------------------------------------------------------------------------ Channel::Channel(const wstring& channel_id, Mode mode, Listener* listener) - : pipe_(INVALID_HANDLE_VALUE), + : ALLOW_THIS_IN_INITIALIZER_LIST(input_state_(this)), + ALLOW_THIS_IN_INITIALIZER_LIST(output_state_(this)), + pipe_(INVALID_HANDLE_VALUE), listener_(listener), waiting_connect_(mode == MODE_SERVER), processing_incoming_(false), @@ -50,23 +47,29 @@ Channel::Channel(const wstring& channel_id, Mode mode, Listener* listener) } void Channel::Close() { - // make sure we are no longer watching the pipe events - MessageLoopForIO* loop = MessageLoopForIO::current(); - if (input_state_.is_pending) { - input_state_.is_pending = false; - loop->RegisterIOContext(&input_state_.overlapped, NULL); - } - - if (output_state_.is_pending) { - output_state_.is_pending = false; - loop->RegisterIOContext(&output_state_.overlapped, NULL); + bool waited = false; + if (input_state_.is_pending || output_state_.is_pending) { + CancelIo(pipe_); + waited = true; } + // Closing the handle at this point prevents us from issuing more requests + // form OnIOCompleted(). if (pipe_ != INVALID_HANDLE_VALUE) { CloseHandle(pipe_); pipe_ = INVALID_HANDLE_VALUE; } + // Make sure all IO has completed. + base::Time start = base::Time::Now(); + while (input_state_.is_pending || output_state_.is_pending) { + MessageLoopForIO::current()->WaitForIOCompletion(INFINITE, this); + } + if (waited) { + // We want to see if we block the message loop for too long. + UMA_HISTOGRAM_TIMES(L"AsyncIO.IPCChannelClose", base::Time::Now() - start); + } + while (!output_queue_.empty()) { Message* m = output_queue_.front(); output_queue_.pop(); @@ -175,7 +178,7 @@ bool Channel::Connect() { // to true, we indicate to OnIOCompleted that this is the special // initialization signal. MessageLoopForIO::current()->PostTask(FROM_HERE, factory_.NewRunnableMethod( - &Channel::OnIOCompleted, &input_state_.overlapped, 0, 0)); + &Channel::OnIOCompleted, &input_state_.context, 0, 0)); } if (!waiting_connect_) @@ -184,15 +187,14 @@ bool Channel::Connect() { } bool Channel::ProcessConnection() { - if (input_state_.is_pending) { + if (input_state_.is_pending) input_state_.is_pending = false; - MessageLoopForIO::current()->RegisterIOContext(&input_state_.overlapped, - NULL); - } // Do we have a client connected to our pipe? - DCHECK(pipe_ != INVALID_HANDLE_VALUE); - BOOL ok = ConnectNamedPipe(pipe_, &input_state_.overlapped); + if (INVALID_HANDLE_VALUE == pipe_) + return false; + + BOOL ok = ConnectNamedPipe(pipe_, &input_state_.context.overlapped); DWORD err = GetLastError(); if (ok) { @@ -205,8 +207,6 @@ bool Channel::ProcessConnection() { switch (err) { case ERROR_IO_PENDING: input_state_.is_pending = true; - MessageLoopForIO::current()->RegisterIOContext(&input_state_.overlapped, - this); break; case ERROR_PIPE_CONNECTED: waiting_connect_ = false; @@ -219,40 +219,41 @@ bool Channel::ProcessConnection() { return true; } -bool Channel::ProcessIncomingMessages(OVERLAPPED* context, +bool Channel::ProcessIncomingMessages(MessageLoopForIO::IOContext* context, DWORD bytes_read) { if (input_state_.is_pending) { input_state_.is_pending = false; DCHECK(context); - MessageLoopForIO::current()->RegisterIOContext(&input_state_.overlapped, - NULL); if (!context || !bytes_read) return false; } else { // This happens at channel initialization. - DCHECK(!bytes_read && context == &input_state_.overlapped); + DCHECK(!bytes_read && context == &input_state_.context); } for (;;) { if (bytes_read == 0) { + if (INVALID_HANDLE_VALUE == pipe_) + return false; + // Read from pipe... BOOL ok = ReadFile(pipe_, input_buf_, BUF_SIZE, &bytes_read, - &input_state_.overlapped); + &input_state_.context.overlapped); if (!ok) { DWORD err = GetLastError(); if (err == ERROR_IO_PENDING) { - MessageLoopForIO::current()->RegisterIOContext( - &input_state_.overlapped, this); input_state_.is_pending = true; return true; } LOG(ERROR) << "pipe error: " << err; return false; } + input_state_.is_pending = true; + return true; } DCHECK(bytes_read); @@ -303,15 +304,13 @@ bool Channel::ProcessIncomingMessages(OVERLAPPED* context, return true; } -bool Channel::ProcessOutgoingMessages(OVERLAPPED* context, +bool Channel::ProcessOutgoingMessages(MessageLoopForIO::IOContext* context, DWORD bytes_written) { DCHECK(!waiting_connect_); // Why are we trying to send messages if there's // no connection? if (output_state_.is_pending) { DCHECK(context); - MessageLoopForIO::current()->RegisterIOContext(&output_state_.overlapped, - NULL); output_state_.is_pending = false; if (!context || bytes_written == 0) { DWORD err = GetLastError(); @@ -325,42 +324,41 @@ bool Channel::ProcessOutgoingMessages(OVERLAPPED* context, delete m; } - while (!output_queue_.empty()) { - // Write to pipe... - Message* m = output_queue_.front(); - BOOL ok = WriteFile(pipe_, - m->data(), - m->size(), - &bytes_written, - &output_state_.overlapped); - if (!ok) { - DWORD err = GetLastError(); - if (err == ERROR_IO_PENDING) { - MessageLoopForIO::current()->RegisterIOContext( - &output_state_.overlapped, this); - output_state_.is_pending = true; + if (output_queue_.empty()) + return true; + + if (INVALID_HANDLE_VALUE == pipe_) + return false; + + // Write to pipe... + Message* m = output_queue_.front(); + BOOL ok = WriteFile(pipe_, + m->data(), + m->size(), + &bytes_written, + &output_state_.context.overlapped); + if (!ok) { + DWORD err = GetLastError(); + if (err == ERROR_IO_PENDING) { + output_state_.is_pending = true; #ifdef IPC_MESSAGE_DEBUG_EXTRA - DLOG(INFO) << "sent pending message @" << m << " on channel @" << - this << " with type " << m->type(); + DLOG(INFO) << "sent pending message @" << m << " on channel @" << + this << " with type " << m->type(); #endif - return true; - } - LOG(ERROR) << "pipe error: " << err; - return false; + return true; } - DCHECK(bytes_written == m->size()); - output_queue_.pop(); + LOG(ERROR) << "pipe error: " << err; + return false; + } #ifdef IPC_MESSAGE_DEBUG_EXTRA - DLOG(INFO) << "sent message @" << m << " on channel @" << this << - " with type " << m->type(); + DLOG(INFO) << "sent message @" << m << " on channel @" << this << + " with type " << m->type(); #endif - delete m; - } - + output_state_.is_pending = true; return true; } @@ -400,12 +398,13 @@ bool Channel::ProcessPendingMessages(DWORD max_wait_msec) { #endif } -void Channel::OnIOCompleted(OVERLAPPED* context, DWORD bytes_transfered, - DWORD error) { +void Channel::OnIOCompleted(MessageLoopForIO::IOContext* context, + DWORD bytes_transfered, DWORD error) { bool ok; - if (context == &input_state_.overlapped) { + if (context == &input_state_.context) { if (waiting_connect_) { - ProcessConnection(); + if (!ProcessConnection()) + return; // We may have some messages queued up to send... if (!output_queue_.empty() && !output_state_.is_pending) ProcessOutgoingMessages(NULL, 0); @@ -419,10 +418,11 @@ void Channel::OnIOCompleted(OVERLAPPED* context, DWORD bytes_transfered, ok = ProcessIncomingMessages(context, bytes_transfered); processing_incoming_ = false; } else { - DCHECK(context == &output_state_.overlapped); + DCHECK(context == &output_state_.context); ok = ProcessOutgoingMessages(context, bytes_transfered); } - if (!ok) { + if (!ok && INVALID_HANDLE_VALUE != pipe_) { + // We don't want to re-enter Close(). Close(); listener_->OnChannelError(); } |