diff options
| author | rvargas@google.com <rvargas@google.com@0039d316-1c4b-4281-b951-d872f2087c98> | 2008-10-09 23:58:43 +0000 | 
|---|---|---|
| committer | rvargas@google.com <rvargas@google.com@0039d316-1c4b-4281-b951-d872f2087c98> | 2008-10-09 23:58:43 +0000 | 
| commit | 32cda29d0266751c764b043d8aaec6dccc646e29 (patch) | |
| tree | f7a4b556c69da92949236cb8eb9a5e4647147099 | |
| parent | dd59411d090b2a6a7327f5c0d527321bdccd5e84 (diff) | |
| download | chromium_src-32cda29d0266751c764b043d8aaec6dccc646e29.zip chromium_src-32cda29d0266751c764b043d8aaec6dccc646e29.tar.gz chromium_src-32cda29d0266751c764b043d8aaec6dccc646e29.tar.bz2 | |
Add a way to register for completion-ports based async operations to be handled
through the windows version of the message pump.
As a first step, actual IO processing is still performed using WatchObject instead of
using completion ports.
Review URL: http://codereview.chromium.org/1950
git-svn-id: svn://svn.chromium.org/chrome/trunk/src@3157 0039d316-1c4b-4281-b951-d872f2087c98
| -rw-r--r-- | base/message_loop.cc | 9 | ||||
| -rw-r--r-- | base/message_loop.h | 3 | ||||
| -rw-r--r-- | base/message_loop_unittest.cc | 115 | ||||
| -rw-r--r-- | base/message_pump_win.cc | 69 | ||||
| -rw-r--r-- | base/message_pump_win.h | 34 | 
5 files changed, 223 insertions, 7 deletions
| diff --git a/base/message_loop.cc b/base/message_loop.cc index 348648f..cc87fe6 100644 --- a/base/message_loop.cc +++ b/base/message_loop.cc @@ -584,6 +584,15 @@ void MessageLoopForIO::WatchObject(HANDLE object, Watcher* watcher) {    pump_io()->WatchObject(object, watcher);  } +void MessageLoopForIO::RegisterIOHandler(HANDLE file, IOHandler* handler) { +  pump_io()->RegisterIOHandler(file, handler); +} + +void MessageLoopForIO::RegisterIOContext(OVERLAPPED* context, +                                         IOHandler* handler) { +  pump_io()->RegisterIOContext(context, handler); +} +  #elif defined(OS_POSIX)  void MessageLoopForIO::WatchSocket(int socket, short interest_mask,  diff --git a/base/message_loop.h b/base/message_loop.h index cc67587..e13d568 100644 --- a/base/message_loop.h +++ b/base/message_loop.h @@ -459,9 +459,12 @@ class MessageLoopForIO : public MessageLoop {  #if defined(OS_WIN)    typedef base::MessagePumpForIO::Watcher Watcher; +  typedef base::MessagePumpForIO::IOHandler IOHandler;    // Please see MessagePumpWin for definitions of these methods.    void WatchObject(HANDLE object, Watcher* watcher); +  void RegisterIOHandler(HANDLE file_handle, IOHandler* handler); +  void RegisterIOContext(OVERLAPPED* context, IOHandler* handler);   protected:    // TODO(rvargas): Make this platform independent. diff --git a/base/message_loop_unittest.cc b/base/message_loop_unittest.cc index 62f2ae9..b742d76 100644 --- a/base/message_loop_unittest.cc +++ b/base/message_loop_unittest.cc @@ -240,7 +240,8 @@ void RunTest_PostDelayedTask_InPostOrder(MessageLoop::Type message_loop_type) {    EXPECT_TRUE(run_time1 < run_time2);  } -void RunTest_PostDelayedTask_InPostOrder_2(MessageLoop::Type message_loop_type) { +void RunTest_PostDelayedTask_InPostOrder_2( +    MessageLoop::Type message_loop_type) {    MessageLoop loop(message_loop_type);    // Test that a delayed task still runs after a normal tasks even if the @@ -265,7 +266,8 @@ void RunTest_PostDelayedTask_InPostOrder_2(MessageLoop::Type message_loop_type)    EXPECT_LT(kPauseMS, (time_after_run - time_before_run).InMilliseconds());  } -void RunTest_PostDelayedTask_InPostOrder_3(MessageLoop::Type message_loop_type) { +void RunTest_PostDelayedTask_InPostOrder_3( +    MessageLoop::Type message_loop_type) {    MessageLoop loop(message_loop_type);    // Test that a delayed task still runs after a pile of normal tasks.  The key @@ -314,7 +316,7 @@ void RunTest_PostDelayedTask_SharedTimer(MessageLoop::Type message_loop_type) {    // Ensure that we ran in far less time than the slower timer.    TimeDelta total_time = Time::Now() - start_time;    EXPECT_GT(5000, total_time.InMilliseconds()); -   +    // In case both timers somehow run at nearly the same time, sleep a little    // and then run all pending to force them both to have run.  This is just    // encouraging flakiness if there is any. @@ -332,7 +334,7 @@ class SubPumpTask : public Task {    virtual void Run() {      MessageLoop::current()->SetNestableTasksAllowed(true);      MSG msg; -    while (GetMessage(&msg, NULL, 0, 0)) {  +    while (GetMessage(&msg, NULL, 0, 0)) {        TranslateMessage(&msg);        DispatchMessage(&msg);      } @@ -550,7 +552,7 @@ void RunTest_Crasher(MessageLoop::Type message_loop_type) {  void RunTest_CrasherNasty(MessageLoop::Type message_loop_type) {    MessageLoop loop(message_loop_type); -   +    if (::IsDebuggerPresent())      return; @@ -856,7 +858,7 @@ void RunTest_RecursiveDenial1(MessageLoop::Type message_loop_type) {  void RunTest_RecursiveSupport1(MessageLoop::Type message_loop_type) {    MessageLoop loop(message_loop_type); -   +    TaskList order;    MessageLoop::current()->PostTask(FROM_HERE,                                     new RecursiveTask(2, &order, 1, true)); @@ -1054,7 +1056,7 @@ void RunTest_NonNestableInNestedLoop(MessageLoop::Type message_loop_type) {  class AutoresetWatcher : public MessageLoopForIO::Watcher {   public: -  AutoresetWatcher(HANDLE signal) : signal_(signal) { +  explicit AutoresetWatcher(HANDLE signal) : signal_(signal) {    }    virtual void OnObjectSignaled(HANDLE object);   private: @@ -1144,6 +1146,101 @@ void RunTest_Dispatcher(MessageLoop::Type message_loop_type) {    ASSERT_EQ(2, dispatcher.dispatch_count_);  } +class TestIOHandler : public MessageLoopForIO::IOHandler { + public: +  TestIOHandler(const wchar_t* name, HANDLE signal); + +  virtual void OnIOCompleted(OVERLAPPED* context, DWORD bytes_transfered, +                             DWORD error); + +  HANDLE file() { return file_.Get(); } +  void* buffer() { return buffer_; } +  OVERLAPPED* context() { return &context_; } +  DWORD size() { return sizeof(buffer_); } + + private: +  char buffer_[48]; +  OVERLAPPED context_; +  HANDLE signal_; +  ScopedHandle file_; +  ScopedHandle event_; +}; + +TestIOHandler::TestIOHandler(const wchar_t* name, HANDLE signal) +    : signal_(signal) { +  memset(buffer_, 0, sizeof(buffer_)); +  memset(&context_, 0, sizeof(context_)); + +  event_.Set(CreateEvent(NULL, TRUE, FALSE, NULL)); +  EXPECT_TRUE(event_.IsValid()); +  context_.hEvent =  event_.Get(); + +  file_.Set(CreateFile(name, GENERIC_READ, 0, NULL, OPEN_EXISTING, +                       FILE_FLAG_OVERLAPPED, NULL)); +  EXPECT_TRUE(file_.IsValid()); +} + +void TestIOHandler::OnIOCompleted(OVERLAPPED* context, DWORD bytes_transfered, +                                  DWORD error) { +  ASSERT_TRUE(context == &context_); +  MessageLoopForIO::current()->RegisterIOContext(context, NULL); +  ASSERT_TRUE(SetEvent(signal_)); +} + +class IOHandlerTask : public Task { + public: +  explicit IOHandlerTask(TestIOHandler* handler) : handler_(handler) {} +  virtual void Run(); + + private: +  TestIOHandler* handler_; +}; + +void IOHandlerTask::Run() { +  MessageLoopForIO::current()->RegisterIOHandler(handler_->file(), handler_); +  MessageLoopForIO::current()->RegisterIOContext(handler_->context(), handler_); + +  DWORD read; +  EXPECT_FALSE(ReadFile(handler_->file(), handler_->buffer(), handler_->size(), +               &read, handler_->context())); +  EXPECT_EQ(ERROR_IO_PENDING, GetLastError()); +} + +void RunTest_IOHandler() { +  // This test requires an IO loop. +  MessageLoop loop(MessageLoop::TYPE_IO); + +  ScopedHandle callback_called(CreateEvent(NULL, TRUE, FALSE, NULL)); +  ASSERT_TRUE(callback_called.IsValid()); + +  const wchar_t* kPipeName = L"\\\\.\\pipe\\iohandler_pipe"; +  ScopedHandle server(CreateNamedPipe(kPipeName, PIPE_ACCESS_OUTBOUND, 0, 1, +                                      0, 0, 0, NULL)); +  ASSERT_TRUE(server.IsValid()); + +  Thread thread("IOHandler test"); +  Thread::Options options; +  options.message_loop_type = MessageLoop::TYPE_IO; +  ASSERT_TRUE(thread.StartWithOptions(options)); + +  MessageLoop* thread_loop = thread.message_loop(); +  ASSERT_TRUE(NULL != thread_loop); + +  TestIOHandler handler(kPipeName, callback_called); +  IOHandlerTask* task = new IOHandlerTask(&handler); +  thread_loop->PostTask(FROM_HERE, task); +  Sleep(100);  // Make sure the thread runs and sleeps for lack of work. + +  const char buffer[] = "Hello there!"; +  DWORD written; +  EXPECT_TRUE(WriteFile(server, buffer, sizeof(buffer), &written, NULL)); + +  DWORD result = WaitForSingleObject(callback_called, 1000); +  EXPECT_EQ(WAIT_OBJECT_0, result); + +  thread.Stop(); +} +  #endif  // defined(OS_WIN)  }  // namespace @@ -1289,4 +1386,8 @@ TEST(MessageLoopTest, Dispatcher) {    // This test requires a UI loop    RunTest_Dispatcher(MessageLoop::TYPE_UI);  } + +TEST(MessageLoopTest, IOHandler) { +  RunTest_IOHandler(); +}  #endif  // defined(OS_WIN) diff --git a/base/message_pump_win.cc b/base/message_pump_win.cc index 4fff9d7..4eb0995 100644 --- a/base/message_pump_win.cc +++ b/base/message_pump_win.cc @@ -9,6 +9,41 @@  #include "base/histogram.h"  #include "base/win_util.h" +namespace { + +class HandlerData : public base::MessagePumpForIO::Watcher { + public: +  typedef base::MessagePumpForIO::IOHandler IOHandler; +  HandlerData(OVERLAPPED* context, IOHandler* handler) +      : context_(context), handler_(handler) {} +  ~HandlerData() {} + +  virtual void OnObjectSignaled(HANDLE object); + + private: +  OVERLAPPED* context_; +  IOHandler* handler_; +   +  DISALLOW_COPY_AND_ASSIGN(HandlerData); +}; + +void HandlerData::OnObjectSignaled(HANDLE object) { +  DCHECK(object == context_->hEvent); +  DWORD transfered; +  DWORD error = ERROR_SUCCESS; +  BOOL ret = GetOverlappedResult(NULL, context_, &transfered, FALSE); +  if (!ret) { +    error = GetLastError(); +    DCHECK(ERROR_HANDLE_EOF == error || ERROR_BROKEN_PIPE == error); +    transfered = 0; +  } + +  ResetEvent(context_->hEvent); +  handler_->OnIOCompleted(context_, transfered, error); +} + +}  // namespace +  namespace base {  static const wchar_t kWndClass[] = L"Chrome_MessagePumpWindow"; @@ -426,6 +461,40 @@ void MessagePumpForIO::WatchObject(HANDLE object, Watcher* watcher) {    }  } +void MessagePumpForIO::RegisterIOHandler(HANDLE file_handle, +                                         IOHandler* handler) { +#if 0 +  // TODO(rvargas): This is just to give an idea of what this code will look +  // like when we actually move to completion ports. Of course, we cannot +  // do this without calling GetQueuedCompletionStatus(). +  ULONG_PTR key = reinterpret_cast<ULONG_PTR>(handler); +  HANDLE port = CreateIoCompletionPort(file_handle, port_, key, 1); +  if (!port_.IsValid()) +    port_.Set(port); +#endif +} + +void MessagePumpForIO::RegisterIOContext(OVERLAPPED* context, +                                         IOHandler* handler) { +  DCHECK(context->hEvent); +  if (handler) { +    HandlerData* watcher = new HandlerData(context, handler); +    WatchObject(context->hEvent, watcher); +  } else { +    std::vector<HANDLE>::iterator it = +      find(objects_.begin(), objects_.end(), context->hEvent); + +    if (it == objects_.end()) { +      NOTREACHED(); +      return; +    } + +    std::vector<HANDLE>::difference_type index = it - objects_.begin(); +    objects_.erase(it); +    delete watchers_[index]; +    watchers_.erase(watchers_.begin() + index); +  } +}  //-----------------------------------------------------------------------------  // MessagePumpForIO private: diff --git a/base/message_pump_win.h b/base/message_pump_win.h index 0192e3c..50a5223 100644 --- a/base/message_pump_win.h +++ b/base/message_pump_win.h @@ -12,6 +12,7 @@  #include "base/lock.h"  #include "base/message_pump.h"  #include "base/observer_list.h" +#include "base/scoped_handle.h"  #include "base/time.h"  namespace base { @@ -193,6 +194,20 @@ class MessagePumpForIO : public MessagePumpWin {      virtual void OnObjectSignaled(HANDLE object) = 0;    }; +  // Clients interested in receiving OS notifications when asynchronous IO +  // operations complete should implement this interface and register themselves +  // with the message pump. +  class IOHandler { +   public: +    virtual ~IOHandler() {} +    // This will be called once the pending IO operation associated with +    // |context| completes. |error| is the Win32 error code of the IO operation +    // (ERROR_SUCCESS if there was no error). |bytes_transfered| will be zero +    // on error. +    virtual void OnIOCompleted(OVERLAPPED* context, DWORD bytes_transfered, +                               DWORD error) = 0; +  }; +    MessagePumpForIO() {}    virtual ~MessagePumpForIO() {} @@ -200,6 +215,22 @@ class MessagePumpForIO : public MessagePumpWin {    // Pass a null watcher to stop watching the object.    void WatchObject(HANDLE, Watcher*); +  // Register the handler to be used when asynchronous IO for the given file +  // completes. The registration persists as long as |file_handle| is valid, so +  // |handler| must be valid as long as there is pending IO for the given file. +  void RegisterIOHandler(HANDLE file_handle, IOHandler* handler); + +  // This is just a throw away function to ease transition to completion ports. +  // Pass NULL for handler to stop tracking this request. WARNING: cancellation +  // correctness is the responsibility of the caller. |context| must contain a +  // valid manual reset event, but the caller should not interact directly with +  // it. The registration can live across a single IO operation, or it can live +  // across multiple IO operations without having to reset it after each IO +  // completion callback. Internally, there will be a WatchObject registration +  // alive as long as this context registration is in effect. It is an error +  // to unregister a context that has not been registered before. +  void RegisterIOContext(OVERLAPPED* context, IOHandler* handler); +   private:    virtual void DoRunLoop();    void WaitForWork(); @@ -210,6 +241,9 @@ class MessagePumpForIO : public MessagePumpWin {    // serviced by this message pump.    std::vector<HANDLE> objects_;    std::vector<Watcher*> watchers_; + +  // The completion port associated with this thread. +  ScopedHandle port_;  };  }  // namespace base | 
