diff options
author | rvargas@google.com <rvargas@google.com@0039d316-1c4b-4281-b951-d872f2087c98> | 2008-10-09 23:58:43 +0000 |
---|---|---|
committer | rvargas@google.com <rvargas@google.com@0039d316-1c4b-4281-b951-d872f2087c98> | 2008-10-09 23:58:43 +0000 |
commit | 32cda29d0266751c764b043d8aaec6dccc646e29 (patch) | |
tree | f7a4b556c69da92949236cb8eb9a5e4647147099 | |
parent | dd59411d090b2a6a7327f5c0d527321bdccd5e84 (diff) | |
download | chromium_src-32cda29d0266751c764b043d8aaec6dccc646e29.zip chromium_src-32cda29d0266751c764b043d8aaec6dccc646e29.tar.gz chromium_src-32cda29d0266751c764b043d8aaec6dccc646e29.tar.bz2 |
Add a way to register for completion-ports based async operations to be handled
through the windows version of the message pump.
As a first step, actual IO processing is still performed using WatchObject instead of
using completion ports.
Review URL: http://codereview.chromium.org/1950
git-svn-id: svn://svn.chromium.org/chrome/trunk/src@3157 0039d316-1c4b-4281-b951-d872f2087c98
-rw-r--r-- | base/message_loop.cc | 9 | ||||
-rw-r--r-- | base/message_loop.h | 3 | ||||
-rw-r--r-- | base/message_loop_unittest.cc | 115 | ||||
-rw-r--r-- | base/message_pump_win.cc | 69 | ||||
-rw-r--r-- | base/message_pump_win.h | 34 |
5 files changed, 223 insertions, 7 deletions
diff --git a/base/message_loop.cc b/base/message_loop.cc index 348648f..cc87fe6 100644 --- a/base/message_loop.cc +++ b/base/message_loop.cc @@ -584,6 +584,15 @@ void MessageLoopForIO::WatchObject(HANDLE object, Watcher* watcher) { pump_io()->WatchObject(object, watcher); } +void MessageLoopForIO::RegisterIOHandler(HANDLE file, IOHandler* handler) { + pump_io()->RegisterIOHandler(file, handler); +} + +void MessageLoopForIO::RegisterIOContext(OVERLAPPED* context, + IOHandler* handler) { + pump_io()->RegisterIOContext(context, handler); +} + #elif defined(OS_POSIX) void MessageLoopForIO::WatchSocket(int socket, short interest_mask, diff --git a/base/message_loop.h b/base/message_loop.h index cc67587..e13d568 100644 --- a/base/message_loop.h +++ b/base/message_loop.h @@ -459,9 +459,12 @@ class MessageLoopForIO : public MessageLoop { #if defined(OS_WIN) typedef base::MessagePumpForIO::Watcher Watcher; + typedef base::MessagePumpForIO::IOHandler IOHandler; // Please see MessagePumpWin for definitions of these methods. void WatchObject(HANDLE object, Watcher* watcher); + void RegisterIOHandler(HANDLE file_handle, IOHandler* handler); + void RegisterIOContext(OVERLAPPED* context, IOHandler* handler); protected: // TODO(rvargas): Make this platform independent. diff --git a/base/message_loop_unittest.cc b/base/message_loop_unittest.cc index 62f2ae9..b742d76 100644 --- a/base/message_loop_unittest.cc +++ b/base/message_loop_unittest.cc @@ -240,7 +240,8 @@ void RunTest_PostDelayedTask_InPostOrder(MessageLoop::Type message_loop_type) { EXPECT_TRUE(run_time1 < run_time2); } -void RunTest_PostDelayedTask_InPostOrder_2(MessageLoop::Type message_loop_type) { +void RunTest_PostDelayedTask_InPostOrder_2( + MessageLoop::Type message_loop_type) { MessageLoop loop(message_loop_type); // Test that a delayed task still runs after a normal tasks even if the @@ -265,7 +266,8 @@ void RunTest_PostDelayedTask_InPostOrder_2(MessageLoop::Type message_loop_type) EXPECT_LT(kPauseMS, (time_after_run - time_before_run).InMilliseconds()); } -void RunTest_PostDelayedTask_InPostOrder_3(MessageLoop::Type message_loop_type) { +void RunTest_PostDelayedTask_InPostOrder_3( + MessageLoop::Type message_loop_type) { MessageLoop loop(message_loop_type); // Test that a delayed task still runs after a pile of normal tasks. The key @@ -314,7 +316,7 @@ void RunTest_PostDelayedTask_SharedTimer(MessageLoop::Type message_loop_type) { // Ensure that we ran in far less time than the slower timer. TimeDelta total_time = Time::Now() - start_time; EXPECT_GT(5000, total_time.InMilliseconds()); - + // In case both timers somehow run at nearly the same time, sleep a little // and then run all pending to force them both to have run. This is just // encouraging flakiness if there is any. @@ -332,7 +334,7 @@ class SubPumpTask : public Task { virtual void Run() { MessageLoop::current()->SetNestableTasksAllowed(true); MSG msg; - while (GetMessage(&msg, NULL, 0, 0)) { + while (GetMessage(&msg, NULL, 0, 0)) { TranslateMessage(&msg); DispatchMessage(&msg); } @@ -550,7 +552,7 @@ void RunTest_Crasher(MessageLoop::Type message_loop_type) { void RunTest_CrasherNasty(MessageLoop::Type message_loop_type) { MessageLoop loop(message_loop_type); - + if (::IsDebuggerPresent()) return; @@ -856,7 +858,7 @@ void RunTest_RecursiveDenial1(MessageLoop::Type message_loop_type) { void RunTest_RecursiveSupport1(MessageLoop::Type message_loop_type) { MessageLoop loop(message_loop_type); - + TaskList order; MessageLoop::current()->PostTask(FROM_HERE, new RecursiveTask(2, &order, 1, true)); @@ -1054,7 +1056,7 @@ void RunTest_NonNestableInNestedLoop(MessageLoop::Type message_loop_type) { class AutoresetWatcher : public MessageLoopForIO::Watcher { public: - AutoresetWatcher(HANDLE signal) : signal_(signal) { + explicit AutoresetWatcher(HANDLE signal) : signal_(signal) { } virtual void OnObjectSignaled(HANDLE object); private: @@ -1144,6 +1146,101 @@ void RunTest_Dispatcher(MessageLoop::Type message_loop_type) { ASSERT_EQ(2, dispatcher.dispatch_count_); } +class TestIOHandler : public MessageLoopForIO::IOHandler { + public: + TestIOHandler(const wchar_t* name, HANDLE signal); + + virtual void OnIOCompleted(OVERLAPPED* context, DWORD bytes_transfered, + DWORD error); + + HANDLE file() { return file_.Get(); } + void* buffer() { return buffer_; } + OVERLAPPED* context() { return &context_; } + DWORD size() { return sizeof(buffer_); } + + private: + char buffer_[48]; + OVERLAPPED context_; + HANDLE signal_; + ScopedHandle file_; + ScopedHandle event_; +}; + +TestIOHandler::TestIOHandler(const wchar_t* name, HANDLE signal) + : signal_(signal) { + memset(buffer_, 0, sizeof(buffer_)); + memset(&context_, 0, sizeof(context_)); + + event_.Set(CreateEvent(NULL, TRUE, FALSE, NULL)); + EXPECT_TRUE(event_.IsValid()); + context_.hEvent = event_.Get(); + + file_.Set(CreateFile(name, GENERIC_READ, 0, NULL, OPEN_EXISTING, + FILE_FLAG_OVERLAPPED, NULL)); + EXPECT_TRUE(file_.IsValid()); +} + +void TestIOHandler::OnIOCompleted(OVERLAPPED* context, DWORD bytes_transfered, + DWORD error) { + ASSERT_TRUE(context == &context_); + MessageLoopForIO::current()->RegisterIOContext(context, NULL); + ASSERT_TRUE(SetEvent(signal_)); +} + +class IOHandlerTask : public Task { + public: + explicit IOHandlerTask(TestIOHandler* handler) : handler_(handler) {} + virtual void Run(); + + private: + TestIOHandler* handler_; +}; + +void IOHandlerTask::Run() { + MessageLoopForIO::current()->RegisterIOHandler(handler_->file(), handler_); + MessageLoopForIO::current()->RegisterIOContext(handler_->context(), handler_); + + DWORD read; + EXPECT_FALSE(ReadFile(handler_->file(), handler_->buffer(), handler_->size(), + &read, handler_->context())); + EXPECT_EQ(ERROR_IO_PENDING, GetLastError()); +} + +void RunTest_IOHandler() { + // This test requires an IO loop. + MessageLoop loop(MessageLoop::TYPE_IO); + + ScopedHandle callback_called(CreateEvent(NULL, TRUE, FALSE, NULL)); + ASSERT_TRUE(callback_called.IsValid()); + + const wchar_t* kPipeName = L"\\\\.\\pipe\\iohandler_pipe"; + ScopedHandle server(CreateNamedPipe(kPipeName, PIPE_ACCESS_OUTBOUND, 0, 1, + 0, 0, 0, NULL)); + ASSERT_TRUE(server.IsValid()); + + Thread thread("IOHandler test"); + Thread::Options options; + options.message_loop_type = MessageLoop::TYPE_IO; + ASSERT_TRUE(thread.StartWithOptions(options)); + + MessageLoop* thread_loop = thread.message_loop(); + ASSERT_TRUE(NULL != thread_loop); + + TestIOHandler handler(kPipeName, callback_called); + IOHandlerTask* task = new IOHandlerTask(&handler); + thread_loop->PostTask(FROM_HERE, task); + Sleep(100); // Make sure the thread runs and sleeps for lack of work. + + const char buffer[] = "Hello there!"; + DWORD written; + EXPECT_TRUE(WriteFile(server, buffer, sizeof(buffer), &written, NULL)); + + DWORD result = WaitForSingleObject(callback_called, 1000); + EXPECT_EQ(WAIT_OBJECT_0, result); + + thread.Stop(); +} + #endif // defined(OS_WIN) } // namespace @@ -1289,4 +1386,8 @@ TEST(MessageLoopTest, Dispatcher) { // This test requires a UI loop RunTest_Dispatcher(MessageLoop::TYPE_UI); } + +TEST(MessageLoopTest, IOHandler) { + RunTest_IOHandler(); +} #endif // defined(OS_WIN) diff --git a/base/message_pump_win.cc b/base/message_pump_win.cc index 4fff9d7..4eb0995 100644 --- a/base/message_pump_win.cc +++ b/base/message_pump_win.cc @@ -9,6 +9,41 @@ #include "base/histogram.h" #include "base/win_util.h" +namespace { + +class HandlerData : public base::MessagePumpForIO::Watcher { + public: + typedef base::MessagePumpForIO::IOHandler IOHandler; + HandlerData(OVERLAPPED* context, IOHandler* handler) + : context_(context), handler_(handler) {} + ~HandlerData() {} + + virtual void OnObjectSignaled(HANDLE object); + + private: + OVERLAPPED* context_; + IOHandler* handler_; + + DISALLOW_COPY_AND_ASSIGN(HandlerData); +}; + +void HandlerData::OnObjectSignaled(HANDLE object) { + DCHECK(object == context_->hEvent); + DWORD transfered; + DWORD error = ERROR_SUCCESS; + BOOL ret = GetOverlappedResult(NULL, context_, &transfered, FALSE); + if (!ret) { + error = GetLastError(); + DCHECK(ERROR_HANDLE_EOF == error || ERROR_BROKEN_PIPE == error); + transfered = 0; + } + + ResetEvent(context_->hEvent); + handler_->OnIOCompleted(context_, transfered, error); +} + +} // namespace + namespace base { static const wchar_t kWndClass[] = L"Chrome_MessagePumpWindow"; @@ -426,6 +461,40 @@ void MessagePumpForIO::WatchObject(HANDLE object, Watcher* watcher) { } } +void MessagePumpForIO::RegisterIOHandler(HANDLE file_handle, + IOHandler* handler) { +#if 0 + // TODO(rvargas): This is just to give an idea of what this code will look + // like when we actually move to completion ports. Of course, we cannot + // do this without calling GetQueuedCompletionStatus(). + ULONG_PTR key = reinterpret_cast<ULONG_PTR>(handler); + HANDLE port = CreateIoCompletionPort(file_handle, port_, key, 1); + if (!port_.IsValid()) + port_.Set(port); +#endif +} + +void MessagePumpForIO::RegisterIOContext(OVERLAPPED* context, + IOHandler* handler) { + DCHECK(context->hEvent); + if (handler) { + HandlerData* watcher = new HandlerData(context, handler); + WatchObject(context->hEvent, watcher); + } else { + std::vector<HANDLE>::iterator it = + find(objects_.begin(), objects_.end(), context->hEvent); + + if (it == objects_.end()) { + NOTREACHED(); + return; + } + + std::vector<HANDLE>::difference_type index = it - objects_.begin(); + objects_.erase(it); + delete watchers_[index]; + watchers_.erase(watchers_.begin() + index); + } +} //----------------------------------------------------------------------------- // MessagePumpForIO private: diff --git a/base/message_pump_win.h b/base/message_pump_win.h index 0192e3c..50a5223 100644 --- a/base/message_pump_win.h +++ b/base/message_pump_win.h @@ -12,6 +12,7 @@ #include "base/lock.h" #include "base/message_pump.h" #include "base/observer_list.h" +#include "base/scoped_handle.h" #include "base/time.h" namespace base { @@ -193,6 +194,20 @@ class MessagePumpForIO : public MessagePumpWin { virtual void OnObjectSignaled(HANDLE object) = 0; }; + // Clients interested in receiving OS notifications when asynchronous IO + // operations complete should implement this interface and register themselves + // with the message pump. + class IOHandler { + public: + virtual ~IOHandler() {} + // This will be called once the pending IO operation associated with + // |context| completes. |error| is the Win32 error code of the IO operation + // (ERROR_SUCCESS if there was no error). |bytes_transfered| will be zero + // on error. + virtual void OnIOCompleted(OVERLAPPED* context, DWORD bytes_transfered, + DWORD error) = 0; + }; + MessagePumpForIO() {} virtual ~MessagePumpForIO() {} @@ -200,6 +215,22 @@ class MessagePumpForIO : public MessagePumpWin { // Pass a null watcher to stop watching the object. void WatchObject(HANDLE, Watcher*); + // Register the handler to be used when asynchronous IO for the given file + // completes. The registration persists as long as |file_handle| is valid, so + // |handler| must be valid as long as there is pending IO for the given file. + void RegisterIOHandler(HANDLE file_handle, IOHandler* handler); + + // This is just a throw away function to ease transition to completion ports. + // Pass NULL for handler to stop tracking this request. WARNING: cancellation + // correctness is the responsibility of the caller. |context| must contain a + // valid manual reset event, but the caller should not interact directly with + // it. The registration can live across a single IO operation, or it can live + // across multiple IO operations without having to reset it after each IO + // completion callback. Internally, there will be a WatchObject registration + // alive as long as this context registration is in effect. It is an error + // to unregister a context that has not been registered before. + void RegisterIOContext(OVERLAPPED* context, IOHandler* handler); + private: virtual void DoRunLoop(); void WaitForWork(); @@ -210,6 +241,9 @@ class MessagePumpForIO : public MessagePumpWin { // serviced by this message pump. std::vector<HANDLE> objects_; std::vector<Watcher*> watchers_; + + // The completion port associated with this thread. + ScopedHandle port_; }; } // namespace base |