summaryrefslogtreecommitdiffstats
path: root/chrome/common
diff options
context:
space:
mode:
authorrvargas@google.com <rvargas@google.com@0039d316-1c4b-4281-b951-d872f2087c98>2008-11-07 21:52:15 +0000
committerrvargas@google.com <rvargas@google.com@0039d316-1c4b-4281-b951-d872f2087c98>2008-11-07 21:52:15 +0000
commit17b891482a081341470ead21ca7eda953d74dd69 (patch)
tree1ca1393109504064e44a31435714a238ca171477 /chrome/common
parent209c36546ae088f1cf76e7f72765ad92b3cdaa2e (diff)
downloadchromium_src-17b891482a081341470ead21ca7eda953d74dd69.zip
chromium_src-17b891482a081341470ead21ca7eda953d74dd69.tar.gz
chromium_src-17b891482a081341470ead21ca7eda953d74dd69.tar.bz2
Switch MessagePumpForIO to use completion ports on Windows.
Cleanup the separation between MessagePumpForUI and MessagePumpForIO, and convert the latter to use Completion Ports instead of MsgWaitForMultipleobjects to sleep when idle. Remove all traces of Windows messages from MessagePumpForIO, remove the transitional API of completion port notifications and remove WatchObject API. Modify all callers of RegisterIOHandler so that they are no longer using RegisterIOContext, and also handle properly the new semantics of completion ports (notifications even when the IO completes immediately). Add a new interface to allow proper cleanup of disk cache (to replace code that was waiting for pending APCs from the destructor). Add a way for the message pump to perform cleanup of abandoned IO. BUG=B/1344358, 3497, 3630 TESt=unit tests R=darin Review URL: http://codereview.chromium.org/8156 git-svn-id: svn://svn.chromium.org/chrome/trunk/src@5021 0039d316-1c4b-4281-b951-d872f2087c98
Diffstat (limited to 'chrome/common')
-rw-r--r--chrome/common/ipc_channel.cc144
-rw-r--r--chrome/common/ipc_channel.h14
-rw-r--r--chrome/common/ipc_sync_channel_unittest.cc8
3 files changed, 84 insertions, 82 deletions
diff --git a/chrome/common/ipc_channel.cc b/chrome/common/ipc_channel.cc
index a9d73e7..934ea9b 100644
--- a/chrome/common/ipc_channel.cc
+++ b/chrome/common/ipc_channel.cc
@@ -20,24 +20,21 @@ namespace IPC {
//------------------------------------------------------------------------------
-Channel::State::State()
- : is_pending(false) {
- memset(&overlapped, 0, sizeof(overlapped));
- overlapped.hEvent = CreateEvent(NULL, // default security attributes
- TRUE, // manual-reset event
- TRUE, // initial state = signaled
- NULL); // unnamed event object
+Channel::State::State(Channel* channel) : is_pending(false) {
+ memset(&context.overlapped, 0, sizeof(context.overlapped));
+ context.handler = channel;
}
Channel::State::~State() {
- if (overlapped.hEvent)
- CloseHandle(overlapped.hEvent);
+ COMPILE_ASSERT(!offsetof(Channel::State, context), starts_with_io_context);
}
//------------------------------------------------------------------------------
Channel::Channel(const wstring& channel_id, Mode mode, Listener* listener)
- : pipe_(INVALID_HANDLE_VALUE),
+ : ALLOW_THIS_IN_INITIALIZER_LIST(input_state_(this)),
+ ALLOW_THIS_IN_INITIALIZER_LIST(output_state_(this)),
+ pipe_(INVALID_HANDLE_VALUE),
listener_(listener),
waiting_connect_(mode == MODE_SERVER),
processing_incoming_(false),
@@ -50,23 +47,29 @@ Channel::Channel(const wstring& channel_id, Mode mode, Listener* listener)
}
void Channel::Close() {
- // make sure we are no longer watching the pipe events
- MessageLoopForIO* loop = MessageLoopForIO::current();
- if (input_state_.is_pending) {
- input_state_.is_pending = false;
- loop->RegisterIOContext(&input_state_.overlapped, NULL);
- }
-
- if (output_state_.is_pending) {
- output_state_.is_pending = false;
- loop->RegisterIOContext(&output_state_.overlapped, NULL);
+ bool waited = false;
+ if (input_state_.is_pending || output_state_.is_pending) {
+ CancelIo(pipe_);
+ waited = true;
}
+ // Closing the handle at this point prevents us from issuing more requests
+ // form OnIOCompleted().
if (pipe_ != INVALID_HANDLE_VALUE) {
CloseHandle(pipe_);
pipe_ = INVALID_HANDLE_VALUE;
}
+ // Make sure all IO has completed.
+ base::Time start = base::Time::Now();
+ while (input_state_.is_pending || output_state_.is_pending) {
+ MessageLoopForIO::current()->WaitForIOCompletion(INFINITE, this);
+ }
+ if (waited) {
+ // We want to see if we block the message loop for too long.
+ UMA_HISTOGRAM_TIMES(L"AsyncIO.IPCChannelClose", base::Time::Now() - start);
+ }
+
while (!output_queue_.empty()) {
Message* m = output_queue_.front();
output_queue_.pop();
@@ -175,7 +178,7 @@ bool Channel::Connect() {
// to true, we indicate to OnIOCompleted that this is the special
// initialization signal.
MessageLoopForIO::current()->PostTask(FROM_HERE, factory_.NewRunnableMethod(
- &Channel::OnIOCompleted, &input_state_.overlapped, 0, 0));
+ &Channel::OnIOCompleted, &input_state_.context, 0, 0));
}
if (!waiting_connect_)
@@ -184,15 +187,14 @@ bool Channel::Connect() {
}
bool Channel::ProcessConnection() {
- if (input_state_.is_pending) {
+ if (input_state_.is_pending)
input_state_.is_pending = false;
- MessageLoopForIO::current()->RegisterIOContext(&input_state_.overlapped,
- NULL);
- }
// Do we have a client connected to our pipe?
- DCHECK(pipe_ != INVALID_HANDLE_VALUE);
- BOOL ok = ConnectNamedPipe(pipe_, &input_state_.overlapped);
+ if (INVALID_HANDLE_VALUE == pipe_)
+ return false;
+
+ BOOL ok = ConnectNamedPipe(pipe_, &input_state_.context.overlapped);
DWORD err = GetLastError();
if (ok) {
@@ -205,8 +207,6 @@ bool Channel::ProcessConnection() {
switch (err) {
case ERROR_IO_PENDING:
input_state_.is_pending = true;
- MessageLoopForIO::current()->RegisterIOContext(&input_state_.overlapped,
- this);
break;
case ERROR_PIPE_CONNECTED:
waiting_connect_ = false;
@@ -219,40 +219,41 @@ bool Channel::ProcessConnection() {
return true;
}
-bool Channel::ProcessIncomingMessages(OVERLAPPED* context,
+bool Channel::ProcessIncomingMessages(MessageLoopForIO::IOContext* context,
DWORD bytes_read) {
if (input_state_.is_pending) {
input_state_.is_pending = false;
DCHECK(context);
- MessageLoopForIO::current()->RegisterIOContext(&input_state_.overlapped,
- NULL);
if (!context || !bytes_read)
return false;
} else {
// This happens at channel initialization.
- DCHECK(!bytes_read && context == &input_state_.overlapped);
+ DCHECK(!bytes_read && context == &input_state_.context);
}
for (;;) {
if (bytes_read == 0) {
+ if (INVALID_HANDLE_VALUE == pipe_)
+ return false;
+
// Read from pipe...
BOOL ok = ReadFile(pipe_,
input_buf_,
BUF_SIZE,
&bytes_read,
- &input_state_.overlapped);
+ &input_state_.context.overlapped);
if (!ok) {
DWORD err = GetLastError();
if (err == ERROR_IO_PENDING) {
- MessageLoopForIO::current()->RegisterIOContext(
- &input_state_.overlapped, this);
input_state_.is_pending = true;
return true;
}
LOG(ERROR) << "pipe error: " << err;
return false;
}
+ input_state_.is_pending = true;
+ return true;
}
DCHECK(bytes_read);
@@ -303,15 +304,13 @@ bool Channel::ProcessIncomingMessages(OVERLAPPED* context,
return true;
}
-bool Channel::ProcessOutgoingMessages(OVERLAPPED* context,
+bool Channel::ProcessOutgoingMessages(MessageLoopForIO::IOContext* context,
DWORD bytes_written) {
DCHECK(!waiting_connect_); // Why are we trying to send messages if there's
// no connection?
if (output_state_.is_pending) {
DCHECK(context);
- MessageLoopForIO::current()->RegisterIOContext(&output_state_.overlapped,
- NULL);
output_state_.is_pending = false;
if (!context || bytes_written == 0) {
DWORD err = GetLastError();
@@ -325,42 +324,41 @@ bool Channel::ProcessOutgoingMessages(OVERLAPPED* context,
delete m;
}
- while (!output_queue_.empty()) {
- // Write to pipe...
- Message* m = output_queue_.front();
- BOOL ok = WriteFile(pipe_,
- m->data(),
- m->size(),
- &bytes_written,
- &output_state_.overlapped);
- if (!ok) {
- DWORD err = GetLastError();
- if (err == ERROR_IO_PENDING) {
- MessageLoopForIO::current()->RegisterIOContext(
- &output_state_.overlapped, this);
- output_state_.is_pending = true;
+ if (output_queue_.empty())
+ return true;
+
+ if (INVALID_HANDLE_VALUE == pipe_)
+ return false;
+
+ // Write to pipe...
+ Message* m = output_queue_.front();
+ BOOL ok = WriteFile(pipe_,
+ m->data(),
+ m->size(),
+ &bytes_written,
+ &output_state_.context.overlapped);
+ if (!ok) {
+ DWORD err = GetLastError();
+ if (err == ERROR_IO_PENDING) {
+ output_state_.is_pending = true;
#ifdef IPC_MESSAGE_DEBUG_EXTRA
- DLOG(INFO) << "sent pending message @" << m << " on channel @" <<
- this << " with type " << m->type();
+ DLOG(INFO) << "sent pending message @" << m << " on channel @" <<
+ this << " with type " << m->type();
#endif
- return true;
- }
- LOG(ERROR) << "pipe error: " << err;
- return false;
+ return true;
}
- DCHECK(bytes_written == m->size());
- output_queue_.pop();
+ LOG(ERROR) << "pipe error: " << err;
+ return false;
+ }
#ifdef IPC_MESSAGE_DEBUG_EXTRA
- DLOG(INFO) << "sent message @" << m << " on channel @" << this <<
- " with type " << m->type();
+ DLOG(INFO) << "sent message @" << m << " on channel @" << this <<
+ " with type " << m->type();
#endif
- delete m;
- }
-
+ output_state_.is_pending = true;
return true;
}
@@ -400,12 +398,13 @@ bool Channel::ProcessPendingMessages(DWORD max_wait_msec) {
#endif
}
-void Channel::OnIOCompleted(OVERLAPPED* context, DWORD bytes_transfered,
- DWORD error) {
+void Channel::OnIOCompleted(MessageLoopForIO::IOContext* context,
+ DWORD bytes_transfered, DWORD error) {
bool ok;
- if (context == &input_state_.overlapped) {
+ if (context == &input_state_.context) {
if (waiting_connect_) {
- ProcessConnection();
+ if (!ProcessConnection())
+ return;
// We may have some messages queued up to send...
if (!output_queue_.empty() && !output_state_.is_pending)
ProcessOutgoingMessages(NULL, 0);
@@ -419,10 +418,11 @@ void Channel::OnIOCompleted(OVERLAPPED* context, DWORD bytes_transfered,
ok = ProcessIncomingMessages(context, bytes_transfered);
processing_incoming_ = false;
} else {
- DCHECK(context == &output_state_.overlapped);
+ DCHECK(context == &output_state_.context);
ok = ProcessOutgoingMessages(context, bytes_transfered);
}
- if (!ok) {
+ if (!ok && INVALID_HANDLE_VALUE != pipe_) {
+ // We don't want to re-enter Close().
Close();
listener_->OnChannelError();
}
diff --git a/chrome/common/ipc_channel.h b/chrome/common/ipc_channel.h
index e3e4ca2..b69f962 100644
--- a/chrome/common/ipc_channel.h
+++ b/chrome/common/ipc_channel.h
@@ -99,12 +99,14 @@ class Channel : public MessageLoopForIO::IOHandler,
const std::wstring PipeName(const std::wstring& channel_id) const;
bool CreatePipe(const std::wstring& channel_id, Mode mode);
bool ProcessConnection();
- bool ProcessIncomingMessages(OVERLAPPED* context, DWORD bytes_read);
- bool ProcessOutgoingMessages(OVERLAPPED* context, DWORD bytes_written);
+ bool ProcessIncomingMessages(MessageLoopForIO::IOContext* context,
+ DWORD bytes_read);
+ bool ProcessOutgoingMessages(MessageLoopForIO::IOContext* context,
+ DWORD bytes_written);
// MessageLoop::IOHandler implementation.
- virtual void OnIOCompleted(OVERLAPPED* context, DWORD bytes_transfered,
- DWORD error);
+ virtual void OnIOCompleted(MessageLoopForIO::IOContext* context,
+ DWORD bytes_transfered, DWORD error);
private:
enum {
@@ -112,9 +114,9 @@ class Channel : public MessageLoopForIO::IOHandler,
};
struct State {
- State();
+ explicit State(Channel* channel);
~State();
- OVERLAPPED overlapped;
+ MessageLoopForIO::IOContext context;
bool is_pending;
};
diff --git a/chrome/common/ipc_sync_channel_unittest.cc b/chrome/common/ipc_sync_channel_unittest.cc
index 1d28b06..749a507 100644
--- a/chrome/common/ipc_sync_channel_unittest.cc
+++ b/chrome/common/ipc_sync_channel_unittest.cc
@@ -104,7 +104,7 @@ class Worker : public Channel::Listener, public Message::Sender {
channel_->Close();
}
void Start() {
- StartThread(&listener_thread_);
+ StartThread(&listener_thread_, MessageLoop::TYPE_DEFAULT);
ListenerThread()->message_loop()->PostTask(FROM_HERE, NewRunnableMethod(
this, &Worker::OnStart));
}
@@ -149,7 +149,7 @@ class Worker : public Channel::Listener, public Message::Sender {
// Called on the listener thread to create the sync channel.
void OnStart() {
// Link ipc_thread_, listener_thread_ and channel_ altogether.
- StartThread(&ipc_thread_);
+ StartThread(&ipc_thread_, MessageLoop::TYPE_IO);
channel_.reset(new SyncChannel(
channel_name_, mode_, this, NULL, ipc_thread_.message_loop(), true,
TestProcess::GetShutDownEvent()));
@@ -178,9 +178,9 @@ class Worker : public Channel::Listener, public Message::Sender {
IPC_END_MESSAGE_MAP()
}
- void StartThread(base::Thread* thread) {
+ void StartThread(base::Thread* thread, MessageLoop::Type type) {
base::Thread::Options options;
- options.message_loop_type = MessageLoop::TYPE_IO;
+ options.message_loop_type = type;
thread->StartWithOptions(options);
}