summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorrvargas@google.com <rvargas@google.com@0039d316-1c4b-4281-b951-d872f2087c98>2008-10-09 23:58:43 +0000
committerrvargas@google.com <rvargas@google.com@0039d316-1c4b-4281-b951-d872f2087c98>2008-10-09 23:58:43 +0000
commit32cda29d0266751c764b043d8aaec6dccc646e29 (patch)
treef7a4b556c69da92949236cb8eb9a5e4647147099
parentdd59411d090b2a6a7327f5c0d527321bdccd5e84 (diff)
downloadchromium_src-32cda29d0266751c764b043d8aaec6dccc646e29.zip
chromium_src-32cda29d0266751c764b043d8aaec6dccc646e29.tar.gz
chromium_src-32cda29d0266751c764b043d8aaec6dccc646e29.tar.bz2
Add a way to register for completion-ports based async operations to be handled
through the windows version of the message pump. As a first step, actual IO processing is still performed using WatchObject instead of using completion ports. Review URL: http://codereview.chromium.org/1950 git-svn-id: svn://svn.chromium.org/chrome/trunk/src@3157 0039d316-1c4b-4281-b951-d872f2087c98
-rw-r--r--base/message_loop.cc9
-rw-r--r--base/message_loop.h3
-rw-r--r--base/message_loop_unittest.cc115
-rw-r--r--base/message_pump_win.cc69
-rw-r--r--base/message_pump_win.h34
5 files changed, 223 insertions, 7 deletions
diff --git a/base/message_loop.cc b/base/message_loop.cc
index 348648f..cc87fe6 100644
--- a/base/message_loop.cc
+++ b/base/message_loop.cc
@@ -584,6 +584,15 @@ void MessageLoopForIO::WatchObject(HANDLE object, Watcher* watcher) {
pump_io()->WatchObject(object, watcher);
}
+void MessageLoopForIO::RegisterIOHandler(HANDLE file, IOHandler* handler) {
+ pump_io()->RegisterIOHandler(file, handler);
+}
+
+void MessageLoopForIO::RegisterIOContext(OVERLAPPED* context,
+ IOHandler* handler) {
+ pump_io()->RegisterIOContext(context, handler);
+}
+
#elif defined(OS_POSIX)
void MessageLoopForIO::WatchSocket(int socket, short interest_mask,
diff --git a/base/message_loop.h b/base/message_loop.h
index cc67587..e13d568 100644
--- a/base/message_loop.h
+++ b/base/message_loop.h
@@ -459,9 +459,12 @@ class MessageLoopForIO : public MessageLoop {
#if defined(OS_WIN)
typedef base::MessagePumpForIO::Watcher Watcher;
+ typedef base::MessagePumpForIO::IOHandler IOHandler;
// Please see MessagePumpWin for definitions of these methods.
void WatchObject(HANDLE object, Watcher* watcher);
+ void RegisterIOHandler(HANDLE file_handle, IOHandler* handler);
+ void RegisterIOContext(OVERLAPPED* context, IOHandler* handler);
protected:
// TODO(rvargas): Make this platform independent.
diff --git a/base/message_loop_unittest.cc b/base/message_loop_unittest.cc
index 62f2ae9..b742d76 100644
--- a/base/message_loop_unittest.cc
+++ b/base/message_loop_unittest.cc
@@ -240,7 +240,8 @@ void RunTest_PostDelayedTask_InPostOrder(MessageLoop::Type message_loop_type) {
EXPECT_TRUE(run_time1 < run_time2);
}
-void RunTest_PostDelayedTask_InPostOrder_2(MessageLoop::Type message_loop_type) {
+void RunTest_PostDelayedTask_InPostOrder_2(
+ MessageLoop::Type message_loop_type) {
MessageLoop loop(message_loop_type);
// Test that a delayed task still runs after a normal tasks even if the
@@ -265,7 +266,8 @@ void RunTest_PostDelayedTask_InPostOrder_2(MessageLoop::Type message_loop_type)
EXPECT_LT(kPauseMS, (time_after_run - time_before_run).InMilliseconds());
}
-void RunTest_PostDelayedTask_InPostOrder_3(MessageLoop::Type message_loop_type) {
+void RunTest_PostDelayedTask_InPostOrder_3(
+ MessageLoop::Type message_loop_type) {
MessageLoop loop(message_loop_type);
// Test that a delayed task still runs after a pile of normal tasks. The key
@@ -314,7 +316,7 @@ void RunTest_PostDelayedTask_SharedTimer(MessageLoop::Type message_loop_type) {
// Ensure that we ran in far less time than the slower timer.
TimeDelta total_time = Time::Now() - start_time;
EXPECT_GT(5000, total_time.InMilliseconds());
-
+
// In case both timers somehow run at nearly the same time, sleep a little
// and then run all pending to force them both to have run. This is just
// encouraging flakiness if there is any.
@@ -332,7 +334,7 @@ class SubPumpTask : public Task {
virtual void Run() {
MessageLoop::current()->SetNestableTasksAllowed(true);
MSG msg;
- while (GetMessage(&msg, NULL, 0, 0)) {
+ while (GetMessage(&msg, NULL, 0, 0)) {
TranslateMessage(&msg);
DispatchMessage(&msg);
}
@@ -550,7 +552,7 @@ void RunTest_Crasher(MessageLoop::Type message_loop_type) {
void RunTest_CrasherNasty(MessageLoop::Type message_loop_type) {
MessageLoop loop(message_loop_type);
-
+
if (::IsDebuggerPresent())
return;
@@ -856,7 +858,7 @@ void RunTest_RecursiveDenial1(MessageLoop::Type message_loop_type) {
void RunTest_RecursiveSupport1(MessageLoop::Type message_loop_type) {
MessageLoop loop(message_loop_type);
-
+
TaskList order;
MessageLoop::current()->PostTask(FROM_HERE,
new RecursiveTask(2, &order, 1, true));
@@ -1054,7 +1056,7 @@ void RunTest_NonNestableInNestedLoop(MessageLoop::Type message_loop_type) {
class AutoresetWatcher : public MessageLoopForIO::Watcher {
public:
- AutoresetWatcher(HANDLE signal) : signal_(signal) {
+ explicit AutoresetWatcher(HANDLE signal) : signal_(signal) {
}
virtual void OnObjectSignaled(HANDLE object);
private:
@@ -1144,6 +1146,101 @@ void RunTest_Dispatcher(MessageLoop::Type message_loop_type) {
ASSERT_EQ(2, dispatcher.dispatch_count_);
}
+class TestIOHandler : public MessageLoopForIO::IOHandler {
+ public:
+ TestIOHandler(const wchar_t* name, HANDLE signal);
+
+ virtual void OnIOCompleted(OVERLAPPED* context, DWORD bytes_transfered,
+ DWORD error);
+
+ HANDLE file() { return file_.Get(); }
+ void* buffer() { return buffer_; }
+ OVERLAPPED* context() { return &context_; }
+ DWORD size() { return sizeof(buffer_); }
+
+ private:
+ char buffer_[48];
+ OVERLAPPED context_;
+ HANDLE signal_;
+ ScopedHandle file_;
+ ScopedHandle event_;
+};
+
+TestIOHandler::TestIOHandler(const wchar_t* name, HANDLE signal)
+ : signal_(signal) {
+ memset(buffer_, 0, sizeof(buffer_));
+ memset(&context_, 0, sizeof(context_));
+
+ event_.Set(CreateEvent(NULL, TRUE, FALSE, NULL));
+ EXPECT_TRUE(event_.IsValid());
+ context_.hEvent = event_.Get();
+
+ file_.Set(CreateFile(name, GENERIC_READ, 0, NULL, OPEN_EXISTING,
+ FILE_FLAG_OVERLAPPED, NULL));
+ EXPECT_TRUE(file_.IsValid());
+}
+
+void TestIOHandler::OnIOCompleted(OVERLAPPED* context, DWORD bytes_transfered,
+ DWORD error) {
+ ASSERT_TRUE(context == &context_);
+ MessageLoopForIO::current()->RegisterIOContext(context, NULL);
+ ASSERT_TRUE(SetEvent(signal_));
+}
+
+class IOHandlerTask : public Task {
+ public:
+ explicit IOHandlerTask(TestIOHandler* handler) : handler_(handler) {}
+ virtual void Run();
+
+ private:
+ TestIOHandler* handler_;
+};
+
+void IOHandlerTask::Run() {
+ MessageLoopForIO::current()->RegisterIOHandler(handler_->file(), handler_);
+ MessageLoopForIO::current()->RegisterIOContext(handler_->context(), handler_);
+
+ DWORD read;
+ EXPECT_FALSE(ReadFile(handler_->file(), handler_->buffer(), handler_->size(),
+ &read, handler_->context()));
+ EXPECT_EQ(ERROR_IO_PENDING, GetLastError());
+}
+
+void RunTest_IOHandler() {
+ // This test requires an IO loop.
+ MessageLoop loop(MessageLoop::TYPE_IO);
+
+ ScopedHandle callback_called(CreateEvent(NULL, TRUE, FALSE, NULL));
+ ASSERT_TRUE(callback_called.IsValid());
+
+ const wchar_t* kPipeName = L"\\\\.\\pipe\\iohandler_pipe";
+ ScopedHandle server(CreateNamedPipe(kPipeName, PIPE_ACCESS_OUTBOUND, 0, 1,
+ 0, 0, 0, NULL));
+ ASSERT_TRUE(server.IsValid());
+
+ Thread thread("IOHandler test");
+ Thread::Options options;
+ options.message_loop_type = MessageLoop::TYPE_IO;
+ ASSERT_TRUE(thread.StartWithOptions(options));
+
+ MessageLoop* thread_loop = thread.message_loop();
+ ASSERT_TRUE(NULL != thread_loop);
+
+ TestIOHandler handler(kPipeName, callback_called);
+ IOHandlerTask* task = new IOHandlerTask(&handler);
+ thread_loop->PostTask(FROM_HERE, task);
+ Sleep(100); // Make sure the thread runs and sleeps for lack of work.
+
+ const char buffer[] = "Hello there!";
+ DWORD written;
+ EXPECT_TRUE(WriteFile(server, buffer, sizeof(buffer), &written, NULL));
+
+ DWORD result = WaitForSingleObject(callback_called, 1000);
+ EXPECT_EQ(WAIT_OBJECT_0, result);
+
+ thread.Stop();
+}
+
#endif // defined(OS_WIN)
} // namespace
@@ -1289,4 +1386,8 @@ TEST(MessageLoopTest, Dispatcher) {
// This test requires a UI loop
RunTest_Dispatcher(MessageLoop::TYPE_UI);
}
+
+TEST(MessageLoopTest, IOHandler) {
+ RunTest_IOHandler();
+}
#endif // defined(OS_WIN)
diff --git a/base/message_pump_win.cc b/base/message_pump_win.cc
index 4fff9d7..4eb0995 100644
--- a/base/message_pump_win.cc
+++ b/base/message_pump_win.cc
@@ -9,6 +9,41 @@
#include "base/histogram.h"
#include "base/win_util.h"
+namespace {
+
+class HandlerData : public base::MessagePumpForIO::Watcher {
+ public:
+ typedef base::MessagePumpForIO::IOHandler IOHandler;
+ HandlerData(OVERLAPPED* context, IOHandler* handler)
+ : context_(context), handler_(handler) {}
+ ~HandlerData() {}
+
+ virtual void OnObjectSignaled(HANDLE object);
+
+ private:
+ OVERLAPPED* context_;
+ IOHandler* handler_;
+
+ DISALLOW_COPY_AND_ASSIGN(HandlerData);
+};
+
+void HandlerData::OnObjectSignaled(HANDLE object) {
+ DCHECK(object == context_->hEvent);
+ DWORD transfered;
+ DWORD error = ERROR_SUCCESS;
+ BOOL ret = GetOverlappedResult(NULL, context_, &transfered, FALSE);
+ if (!ret) {
+ error = GetLastError();
+ DCHECK(ERROR_HANDLE_EOF == error || ERROR_BROKEN_PIPE == error);
+ transfered = 0;
+ }
+
+ ResetEvent(context_->hEvent);
+ handler_->OnIOCompleted(context_, transfered, error);
+}
+
+} // namespace
+
namespace base {
static const wchar_t kWndClass[] = L"Chrome_MessagePumpWindow";
@@ -426,6 +461,40 @@ void MessagePumpForIO::WatchObject(HANDLE object, Watcher* watcher) {
}
}
+void MessagePumpForIO::RegisterIOHandler(HANDLE file_handle,
+ IOHandler* handler) {
+#if 0
+ // TODO(rvargas): This is just to give an idea of what this code will look
+ // like when we actually move to completion ports. Of course, we cannot
+ // do this without calling GetQueuedCompletionStatus().
+ ULONG_PTR key = reinterpret_cast<ULONG_PTR>(handler);
+ HANDLE port = CreateIoCompletionPort(file_handle, port_, key, 1);
+ if (!port_.IsValid())
+ port_.Set(port);
+#endif
+}
+
+void MessagePumpForIO::RegisterIOContext(OVERLAPPED* context,
+ IOHandler* handler) {
+ DCHECK(context->hEvent);
+ if (handler) {
+ HandlerData* watcher = new HandlerData(context, handler);
+ WatchObject(context->hEvent, watcher);
+ } else {
+ std::vector<HANDLE>::iterator it =
+ find(objects_.begin(), objects_.end(), context->hEvent);
+
+ if (it == objects_.end()) {
+ NOTREACHED();
+ return;
+ }
+
+ std::vector<HANDLE>::difference_type index = it - objects_.begin();
+ objects_.erase(it);
+ delete watchers_[index];
+ watchers_.erase(watchers_.begin() + index);
+ }
+}
//-----------------------------------------------------------------------------
// MessagePumpForIO private:
diff --git a/base/message_pump_win.h b/base/message_pump_win.h
index 0192e3c..50a5223 100644
--- a/base/message_pump_win.h
+++ b/base/message_pump_win.h
@@ -12,6 +12,7 @@
#include "base/lock.h"
#include "base/message_pump.h"
#include "base/observer_list.h"
+#include "base/scoped_handle.h"
#include "base/time.h"
namespace base {
@@ -193,6 +194,20 @@ class MessagePumpForIO : public MessagePumpWin {
virtual void OnObjectSignaled(HANDLE object) = 0;
};
+ // Clients interested in receiving OS notifications when asynchronous IO
+ // operations complete should implement this interface and register themselves
+ // with the message pump.
+ class IOHandler {
+ public:
+ virtual ~IOHandler() {}
+ // This will be called once the pending IO operation associated with
+ // |context| completes. |error| is the Win32 error code of the IO operation
+ // (ERROR_SUCCESS if there was no error). |bytes_transfered| will be zero
+ // on error.
+ virtual void OnIOCompleted(OVERLAPPED* context, DWORD bytes_transfered,
+ DWORD error) = 0;
+ };
+
MessagePumpForIO() {}
virtual ~MessagePumpForIO() {}
@@ -200,6 +215,22 @@ class MessagePumpForIO : public MessagePumpWin {
// Pass a null watcher to stop watching the object.
void WatchObject(HANDLE, Watcher*);
+ // Register the handler to be used when asynchronous IO for the given file
+ // completes. The registration persists as long as |file_handle| is valid, so
+ // |handler| must be valid as long as there is pending IO for the given file.
+ void RegisterIOHandler(HANDLE file_handle, IOHandler* handler);
+
+ // This is just a throw away function to ease transition to completion ports.
+ // Pass NULL for handler to stop tracking this request. WARNING: cancellation
+ // correctness is the responsibility of the caller. |context| must contain a
+ // valid manual reset event, but the caller should not interact directly with
+ // it. The registration can live across a single IO operation, or it can live
+ // across multiple IO operations without having to reset it after each IO
+ // completion callback. Internally, there will be a WatchObject registration
+ // alive as long as this context registration is in effect. It is an error
+ // to unregister a context that has not been registered before.
+ void RegisterIOContext(OVERLAPPED* context, IOHandler* handler);
+
private:
virtual void DoRunLoop();
void WaitForWork();
@@ -210,6 +241,9 @@ class MessagePumpForIO : public MessagePumpWin {
// serviced by this message pump.
std::vector<HANDLE> objects_;
std::vector<Watcher*> watchers_;
+
+ // The completion port associated with this thread.
+ ScopedHandle port_;
};
} // namespace base