diff options
-rw-r--r-- | base/message_loop.cc | 11 | ||||
-rw-r--r-- | base/message_loop.h | 7 | ||||
-rw-r--r-- | base/message_loop_unittest.cc | 186 | ||||
-rw-r--r-- | base/message_pump_win.cc | 666 | ||||
-rw-r--r-- | base/message_pump_win.h | 290 | ||||
-rw-r--r-- | chrome/common/ipc_channel.cc | 144 | ||||
-rw-r--r-- | chrome/common/ipc_channel.h | 14 | ||||
-rw-r--r-- | chrome/common/ipc_sync_channel_unittest.cc | 8 | ||||
-rw-r--r-- | net/base/file_stream_win.cc | 73 | ||||
-rw-r--r-- | net/disk_cache/cache_util_win.cc | 12 | ||||
-rw-r--r-- | net/disk_cache/disk_cache_perftest.cc | 8 | ||||
-rw-r--r-- | net/disk_cache/disk_cache_test_base.cc | 5 | ||||
-rw-r--r-- | net/disk_cache/disk_cache_test_base.h | 14 | ||||
-rw-r--r-- | net/disk_cache/entry_impl.cc | 39 | ||||
-rw-r--r-- | net/disk_cache/file_win.cc | 265 | ||||
-rw-r--r-- | net/disk_cache/mapped_file_unittest.cc | 6 |
16 files changed, 816 insertions, 932 deletions
diff --git a/base/message_loop.cc b/base/message_loop.cc index 73cb7da..1145439 100644 --- a/base/message_loop.cc +++ b/base/message_loop.cc @@ -573,7 +573,7 @@ void MessageLoopForUI::DidProcessMessage(const MSG& message) { pump_win()->DidProcessMessage(message); } void MessageLoopForUI::PumpOutPendingPaintMessages() { - pump_win()->PumpOutPendingPaintMessages(); + pump_ui()->PumpOutPendingPaintMessages(); } #endif // defined(OS_WIN) @@ -583,17 +583,12 @@ void MessageLoopForUI::PumpOutPendingPaintMessages() { #if defined(OS_WIN) -void MessageLoopForIO::WatchObject(HANDLE object, Watcher* watcher) { - pump_io()->WatchObject(object, watcher); -} - void MessageLoopForIO::RegisterIOHandler(HANDLE file, IOHandler* handler) { pump_io()->RegisterIOHandler(file, handler); } -void MessageLoopForIO::RegisterIOContext(OVERLAPPED* context, - IOHandler* handler) { - pump_io()->RegisterIOContext(context, handler); +bool MessageLoopForIO::WaitForIOCompletion(DWORD timeout, IOHandler* filter) { + return pump_io()->WaitForIOCompletion(timeout, filter); } #elif defined(OS_POSIX) diff --git a/base/message_loop.h b/base/message_loop.h index 2d38853..8f16d8e 100644 --- a/base/message_loop.h +++ b/base/message_loop.h @@ -426,7 +426,7 @@ class MessageLoopForUI : public MessageLoop { protected: // TODO(rvargas): Make this platform independent. - base::MessagePumpWin* pump_ui() { + base::MessagePumpForUI* pump_ui() { return static_cast<base::MessagePumpForUI*>(pump_.get()); } #endif // defined(OS_WIN) @@ -458,13 +458,12 @@ class MessageLoopForIO : public MessageLoop { } #if defined(OS_WIN) - typedef base::MessagePumpForIO::Watcher Watcher; typedef base::MessagePumpForIO::IOHandler IOHandler; + typedef base::MessagePumpForIO::IOContext IOContext; // Please see MessagePumpWin for definitions of these methods. - void WatchObject(HANDLE object, Watcher* watcher); void RegisterIOHandler(HANDLE file_handle, IOHandler* handler); - void RegisterIOContext(OVERLAPPED* context, IOHandler* handler); + bool WaitForIOCompletion(DWORD timeout, IOHandler* filter); protected: // TODO(rvargas): Make this platform independent. diff --git a/base/message_loop_unittest.cc b/base/message_loop_unittest.cc index 83be2b1..d959659 100644 --- a/base/message_loop_unittest.cc +++ b/base/message_loop_unittest.cc @@ -1056,68 +1056,6 @@ void RunTest_NonNestableInNestedLoop(MessageLoop::Type message_loop_type) { #if defined(OS_WIN) -class AutoresetWatcher : public MessageLoopForIO::Watcher { - public: - explicit AutoresetWatcher(HANDLE signal) : signal_(signal) { - } - virtual void OnObjectSignaled(HANDLE object); - private: - HANDLE signal_; -}; - -void AutoresetWatcher::OnObjectSignaled(HANDLE object) { - MessageLoopForIO::current()->WatchObject(object, NULL); - ASSERT_TRUE(SetEvent(signal_)); -} - -class AutoresetTask : public Task { - public: - AutoresetTask(HANDLE object, MessageLoopForIO::Watcher* watcher) - : object_(object), watcher_(watcher) {} - virtual void Run() { - MessageLoopForIO::current()->WatchObject(object_, watcher_); - } - - private: - HANDLE object_; - MessageLoopForIO::Watcher* watcher_; -}; - -void RunTest_AutoresetEvents(MessageLoop::Type message_loop_type) { - MessageLoop loop(message_loop_type); - - SECURITY_ATTRIBUTES attributes; - attributes.nLength = sizeof(attributes); - attributes.bInheritHandle = false; - attributes.lpSecurityDescriptor = NULL; - - // Init an autoreset and a manual reset events. - HANDLE autoreset = CreateEvent(&attributes, FALSE, FALSE, NULL); - HANDLE callback_called = CreateEvent(&attributes, TRUE, FALSE, NULL); - ASSERT_TRUE(NULL != autoreset); - ASSERT_TRUE(NULL != callback_called); - - Thread thread("Autoreset test"); - Thread::Options options; - options.message_loop_type = message_loop_type; - ASSERT_TRUE(thread.StartWithOptions(options)); - - MessageLoop* thread_loop = thread.message_loop(); - ASSERT_TRUE(NULL != thread_loop); - - AutoresetWatcher watcher(callback_called); - AutoresetTask* task = new AutoresetTask(autoreset, &watcher); - thread_loop->PostTask(FROM_HERE, task); - Sleep(100); // Make sure the thread runs and sleeps for lack of work. - - ASSERT_TRUE(SetEvent(autoreset)); - - DWORD result = WaitForSingleObject(callback_called, 1000); - EXPECT_EQ(WAIT_OBJECT_0, result); - - thread.Stop(); -} - class DispatcherImpl : public MessageLoopForUI::Dispatcher { public: DispatcherImpl() : dispatch_count_(0) {} @@ -1150,68 +1088,68 @@ void RunTest_Dispatcher(MessageLoop::Type message_loop_type) { class TestIOHandler : public MessageLoopForIO::IOHandler { public: - TestIOHandler(const wchar_t* name, HANDLE signal); + TestIOHandler(const wchar_t* name, HANDLE signal, bool wait); - virtual void OnIOCompleted(OVERLAPPED* context, DWORD bytes_transfered, - DWORD error); + virtual void OnIOCompleted(MessageLoopForIO::IOContext* context, + DWORD bytes_transfered, DWORD error); - HANDLE file() { return file_.Get(); } - void* buffer() { return buffer_; } - OVERLAPPED* context() { return &context_; } + void Init(); + void WaitForIO(); + OVERLAPPED* context() { return &context_.overlapped; } DWORD size() { return sizeof(buffer_); } private: char buffer_[48]; - OVERLAPPED context_; + MessageLoopForIO::IOContext context_; HANDLE signal_; ScopedHandle file_; - ScopedHandle event_; + bool wait_; }; -TestIOHandler::TestIOHandler(const wchar_t* name, HANDLE signal) - : signal_(signal) { +TestIOHandler::TestIOHandler(const wchar_t* name, HANDLE signal, bool wait) + : signal_(signal), wait_(wait) { memset(buffer_, 0, sizeof(buffer_)); memset(&context_, 0, sizeof(context_)); - - event_.Set(CreateEvent(NULL, TRUE, FALSE, NULL)); - EXPECT_TRUE(event_.IsValid()); - context_.hEvent = event_.Get(); + context_.handler = this; file_.Set(CreateFile(name, GENERIC_READ, 0, NULL, OPEN_EXISTING, FILE_FLAG_OVERLAPPED, NULL)); EXPECT_TRUE(file_.IsValid()); } -void TestIOHandler::OnIOCompleted(OVERLAPPED* context, DWORD bytes_transfered, - DWORD error) { +void TestIOHandler::Init() { + MessageLoopForIO::current()->RegisterIOHandler(file_, this); + + DWORD read; + EXPECT_FALSE(ReadFile(file_, buffer_, size(), &read, context())); + EXPECT_EQ(ERROR_IO_PENDING, GetLastError()); + if (wait_) + WaitForIO(); +} + +void TestIOHandler::OnIOCompleted(MessageLoopForIO::IOContext* context, + DWORD bytes_transfered, DWORD error) { ASSERT_TRUE(context == &context_); - MessageLoopForIO::current()->RegisterIOContext(context, NULL); ASSERT_TRUE(SetEvent(signal_)); } +void TestIOHandler::WaitForIO() { + EXPECT_TRUE(MessageLoopForIO::current()->WaitForIOCompletion(300, this)); + EXPECT_TRUE(MessageLoopForIO::current()->WaitForIOCompletion(400, this)); +} + class IOHandlerTask : public Task { public: explicit IOHandlerTask(TestIOHandler* handler) : handler_(handler) {} - virtual void Run(); + virtual void Run() { + handler_->Init(); + } private: TestIOHandler* handler_; }; -void IOHandlerTask::Run() { - MessageLoopForIO::current()->RegisterIOHandler(handler_->file(), handler_); - MessageLoopForIO::current()->RegisterIOContext(handler_->context(), handler_); - - DWORD read; - EXPECT_FALSE(ReadFile(handler_->file(), handler_->buffer(), handler_->size(), - &read, handler_->context())); - EXPECT_EQ(ERROR_IO_PENDING, GetLastError()); -} - void RunTest_IOHandler() { - // This test requires an IO loop. - MessageLoop loop(MessageLoop::TYPE_IO); - ScopedHandle callback_called(CreateEvent(NULL, TRUE, FALSE, NULL)); ASSERT_TRUE(callback_called.IsValid()); @@ -1228,7 +1166,7 @@ void RunTest_IOHandler() { MessageLoop* thread_loop = thread.message_loop(); ASSERT_TRUE(NULL != thread_loop); - TestIOHandler handler(kPipeName, callback_called); + TestIOHandler handler(kPipeName, callback_called, false); IOHandlerTask* task = new IOHandlerTask(&handler); thread_loop->PostTask(FROM_HERE, task); Sleep(100); // Make sure the thread runs and sleeps for lack of work. @@ -1243,6 +1181,57 @@ void RunTest_IOHandler() { thread.Stop(); } +void RunTest_WaitForIO() { + ScopedHandle callback1_called(CreateEvent(NULL, TRUE, FALSE, NULL)); + ScopedHandle callback2_called(CreateEvent(NULL, TRUE, FALSE, NULL)); + ASSERT_TRUE(callback1_called.IsValid()); + ASSERT_TRUE(callback2_called.IsValid()); + + const wchar_t* kPipeName1 = L"\\\\.\\pipe\\iohandler_pipe1"; + const wchar_t* kPipeName2 = L"\\\\.\\pipe\\iohandler_pipe2"; + ScopedHandle server1(CreateNamedPipe(kPipeName1, PIPE_ACCESS_OUTBOUND, 0, 1, + 0, 0, 0, NULL)); + ScopedHandle server2(CreateNamedPipe(kPipeName2, PIPE_ACCESS_OUTBOUND, 0, 1, + 0, 0, 0, NULL)); + ASSERT_TRUE(server1.IsValid()); + ASSERT_TRUE(server2.IsValid()); + + Thread thread("IOHandler test"); + Thread::Options options; + options.message_loop_type = MessageLoop::TYPE_IO; + ASSERT_TRUE(thread.StartWithOptions(options)); + + MessageLoop* thread_loop = thread.message_loop(); + ASSERT_TRUE(NULL != thread_loop); + + TestIOHandler handler1(kPipeName1, callback1_called, false); + TestIOHandler handler2(kPipeName2, callback2_called, true); + IOHandlerTask* task1 = new IOHandlerTask(&handler1); + IOHandlerTask* task2 = new IOHandlerTask(&handler2); + thread_loop->PostTask(FROM_HERE, task1); + Sleep(100); // Make sure the thread runs and sleeps for lack of work. + thread_loop->PostTask(FROM_HERE, task2); + Sleep(100); + + // At this time handler1 is waiting to be called, and the thread is waiting + // on the Init method of handler2, filtering only handler2 callbacks. + + const char buffer[] = "Hello there!"; + DWORD written; + EXPECT_TRUE(WriteFile(server1, buffer, sizeof(buffer), &written, NULL)); + Sleep(200); + EXPECT_EQ(WAIT_TIMEOUT, WaitForSingleObject(callback1_called, 0)) << + "handler1 has not been called"; + + EXPECT_TRUE(WriteFile(server2, buffer, sizeof(buffer), &written, NULL)); + + HANDLE objects[2] = { callback1_called.Get(), callback2_called.Get() }; + DWORD result = WaitForMultipleObjects(2, objects, TRUE, 1000); + EXPECT_EQ(WAIT_OBJECT_0, result); + + thread.Stop(); +} + #endif // defined(OS_WIN) } // namespace @@ -1379,11 +1368,6 @@ TEST(MessageLoopTest, NonNestableInNestedLoop) { } #if defined(OS_WIN) -TEST(MessageLoopTest, AutoresetEvents) { - // This test requires an IO loop - RunTest_AutoresetEvents(MessageLoop::TYPE_IO); -} - TEST(MessageLoopTest, Dispatcher) { // This test requires a UI loop RunTest_Dispatcher(MessageLoop::TYPE_UI); @@ -1392,4 +1376,8 @@ TEST(MessageLoopTest, Dispatcher) { TEST(MessageLoopTest, IOHandler) { RunTest_IOHandler(); } + +TEST(MessageLoopTest, WaitForIO) { + RunTest_WaitForIO(); +} #endif // defined(OS_WIN) diff --git a/base/message_pump_win.cc b/base/message_pump_win.cc index 542fcf1..7ada5eb7 100644 --- a/base/message_pump_win.cc +++ b/base/message_pump_win.cc @@ -11,41 +11,6 @@ using base::Time; -namespace { - -class HandlerData : public base::MessagePumpForIO::Watcher { - public: - typedef base::MessagePumpForIO::IOHandler IOHandler; - HandlerData(OVERLAPPED* context, IOHandler* handler) - : context_(context), handler_(handler) {} - ~HandlerData() {} - - virtual void OnObjectSignaled(HANDLE object); - - private: - OVERLAPPED* context_; - IOHandler* handler_; - - DISALLOW_COPY_AND_ASSIGN(HandlerData); -}; - -void HandlerData::OnObjectSignaled(HANDLE object) { - DCHECK(object == context_->hEvent); - DWORD transfered; - DWORD error = ERROR_SUCCESS; - BOOL ret = GetOverlappedResult(NULL, context_, &transfered, FALSE); - if (!ret) { - error = GetLastError(); - DCHECK(ERROR_HANDLE_EOF == error || ERROR_BROKEN_PIPE == error); - transfered = 0; - } - - ResetEvent(context_->hEvent); - handler_->OnIOCompleted(context_, transfered, error); -} - -} // namespace - namespace base { static const wchar_t kWndClass[] = L"Chrome_MessagePumpWindow"; @@ -54,24 +19,9 @@ static const wchar_t kWndClass[] = L"Chrome_MessagePumpWindow"; // task (a series of such messages creates a continuous task pump). static const int kMsgHaveWork = WM_USER + 1; -#ifndef NDEBUG -// Force exercise of polling model. -static const int kMaxWaitObjects = 8; -#else -static const int kMaxWaitObjects = MAXIMUM_WAIT_OBJECTS; -#endif - //----------------------------------------------------------------------------- // MessagePumpWin public: -MessagePumpWin::MessagePumpWin() : have_work_(0), state_(NULL) { - InitMessageWnd(); -} - -MessagePumpWin::~MessagePumpWin() { - DestroyWindow(message_hwnd_); -} - void MessagePumpWin::AddObserver(Observer* observer) { observers_.AddObserver(observer); } @@ -88,36 +38,6 @@ void MessagePumpWin::DidProcessMessage(const MSG& msg) { FOR_EACH_OBSERVER(Observer, observers_, DidProcessMessage(msg)); } -void MessagePumpWin::PumpOutPendingPaintMessages() { - // If we are being called outside of the context of Run, then don't try to do - // any work. - if (!state_) - return; - - // Create a mini-message-pump to force immediate processing of only Windows - // WM_PAINT messages. Don't provide an infinite loop, but do enough peeking - // to get the job done. Actual common max is 4 peeks, but we'll be a little - // safe here. - const int kMaxPeekCount = 20; - bool win2k = win_util::GetWinVersion() <= win_util::WINVERSION_2000; - int peek_count; - for (peek_count = 0; peek_count < kMaxPeekCount; ++peek_count) { - MSG msg; - if (win2k) { - if (!PeekMessage(&msg, NULL, WM_PAINT, WM_PAINT, PM_REMOVE)) - break; - } else { - if (!PeekMessage(&msg, NULL, 0, 0, PM_REMOVE | PM_QS_PAINT)) - break; - } - ProcessMessageHelper(msg); - if (state_->should_quit) // Handle WM_QUIT. - break; - } - // Histogram what was really being used, to help to adjust kMaxPeekCount. - DHISTOGRAM_COUNTS(L"Loop.PumpOutPendingPaintMessages Peeks", peek_count); -} - void MessagePumpWin::RunWithDispatcher( Delegate* delegate, Dispatcher* dispatcher) { RunState s; @@ -139,7 +59,38 @@ void MessagePumpWin::Quit() { state_->should_quit = true; } -void MessagePumpWin::ScheduleWork() { +//----------------------------------------------------------------------------- +// MessagePumpWin protected: + +int MessagePumpWin::GetCurrentDelay() const { + if (delayed_work_time_.is_null()) + return -1; + + // Be careful here. TimeDelta has a precision of microseconds, but we want a + // value in milliseconds. If there are 5.5ms left, should the delay be 5 or + // 6? It should be 6 to avoid executing delayed work too early. + double timeout = ceil((delayed_work_time_ - Time::Now()).InMillisecondsF()); + + // If this value is negative, then we need to run delayed work soon. + int delay = static_cast<int>(timeout); + if (delay < 0) + delay = 0; + + return delay; +} + +//----------------------------------------------------------------------------- +// MessagePumpForUI public: + +MessagePumpForUI::MessagePumpForUI() { + InitMessageWnd(); +} + +MessagePumpForUI::~MessagePumpForUI() { + DestroyWindow(message_hwnd_); +} + +void MessagePumpForUI::ScheduleWork() { if (InterlockedExchange(&have_work_, 1)) return; // Someone else continued the pumping. @@ -147,7 +98,7 @@ void MessagePumpWin::ScheduleWork() { PostMessage(message_hwnd_, kMsgHaveWork, reinterpret_cast<WPARAM>(this), 0); } -void MessagePumpWin::ScheduleDelayedWork(const Time& delayed_work_time) { +void MessagePumpForUI::ScheduleDelayedWork(const Time& delayed_work_time) { // // We would *like* to provide high resolution timers. Windows timers using // SetTimer() have a 10ms granularity. We have to use WM_TIMER as a wakeup @@ -180,24 +131,110 @@ void MessagePumpWin::ScheduleDelayedWork(const Time& delayed_work_time) { SetTimer(message_hwnd_, reinterpret_cast<UINT_PTR>(this), delay_msec, NULL); } +void MessagePumpForUI::PumpOutPendingPaintMessages() { + // If we are being called outside of the context of Run, then don't try to do + // any work. + if (!state_) + return; + + // Create a mini-message-pump to force immediate processing of only Windows + // WM_PAINT messages. Don't provide an infinite loop, but do enough peeking + // to get the job done. Actual common max is 4 peeks, but we'll be a little + // safe here. + const int kMaxPeekCount = 20; + bool win2k = win_util::GetWinVersion() <= win_util::WINVERSION_2000; + int peek_count; + for (peek_count = 0; peek_count < kMaxPeekCount; ++peek_count) { + MSG msg; + if (win2k) { + if (!PeekMessage(&msg, NULL, WM_PAINT, WM_PAINT, PM_REMOVE)) + break; + } else { + if (!PeekMessage(&msg, NULL, 0, 0, PM_REMOVE | PM_QS_PAINT)) + break; + } + ProcessMessageHelper(msg); + if (state_->should_quit) // Handle WM_QUIT. + break; + } + // Histogram what was really being used, to help to adjust kMaxPeekCount. + DHISTOGRAM_COUNTS(L"Loop.PumpOutPendingPaintMessages Peeks", peek_count); +} + //----------------------------------------------------------------------------- -// MessagePumpWin protected: +// MessagePumpForUI private: // static -LRESULT CALLBACK MessagePumpWin::WndProcThunk( +LRESULT CALLBACK MessagePumpForUI::WndProcThunk( HWND hwnd, UINT message, WPARAM wparam, LPARAM lparam) { switch (message) { case kMsgHaveWork: - reinterpret_cast<MessagePumpWin*>(wparam)->HandleWorkMessage(); + reinterpret_cast<MessagePumpForUI*>(wparam)->HandleWorkMessage(); break; case WM_TIMER: - reinterpret_cast<MessagePumpWin*>(wparam)->HandleTimerMessage(); + reinterpret_cast<MessagePumpForUI*>(wparam)->HandleTimerMessage(); break; } return DefWindowProc(hwnd, message, wparam, lparam); } -void MessagePumpWin::InitMessageWnd() { +void MessagePumpForUI::DoRunLoop() { + // IF this was just a simple PeekMessage() loop (servicing all possible work + // queues), then Windows would try to achieve the following order according + // to MSDN documentation about PeekMessage with no filter): + // * Sent messages + // * Posted messages + // * Sent messages (again) + // * WM_PAINT messages + // * WM_TIMER messages + // + // Summary: none of the above classes is starved, and sent messages has twice + // the chance of being processed (i.e., reduced service time). + + for (;;) { + // If we do any work, we may create more messages etc., and more work may + // possibly be waiting in another task group. When we (for example) + // ProcessNextWindowsMessage(), there is a good chance there are still more + // messages waiting. On the other hand, when any of these methods return + // having done no work, then it is pretty unlikely that calling them again + // quickly will find any work to do. Finally, if they all say they had no + // work, then it is a good time to consider sleeping (waiting) for more + // work. + + bool more_work_is_plausible = ProcessNextWindowsMessage(); + if (state_->should_quit) + break; + + more_work_is_plausible |= state_->delegate->DoWork(); + if (state_->should_quit) + break; + + more_work_is_plausible |= + state_->delegate->DoDelayedWork(&delayed_work_time_); + // If we did not process any delayed work, then we can assume that our + // existing WM_TIMER if any will fire when delayed work should run. We + // don't want to disturb that timer if it is already in flight. However, + // if we did do all remaining delayed work, then lets kill the WM_TIMER. + if (more_work_is_plausible && delayed_work_time_.is_null()) + KillTimer(message_hwnd_, reinterpret_cast<UINT_PTR>(this)); + if (state_->should_quit) + break; + + if (more_work_is_plausible) + continue; + + more_work_is_plausible = state_->delegate->DoIdleWork(); + if (state_->should_quit) + break; + + if (more_work_is_plausible) + continue; + + WaitForWork(); // Wait (sleep) until we have work to do again. + } +} + +void MessagePumpForUI::InitMessageWnd() { HINSTANCE hinst = GetModuleHandle(NULL); WNDCLASSEX wc = {0}; @@ -212,7 +249,41 @@ void MessagePumpWin::InitMessageWnd() { DCHECK(message_hwnd_); } -void MessagePumpWin::HandleWorkMessage() { +void MessagePumpForUI::WaitForWork() { + // Wait until a message is available, up to the time needed by the timer + // manager to fire the next set of timers. + int delay = GetCurrentDelay(); + if (delay < 0) // Negative value means no timers waiting. + delay = INFINITE; + + DWORD result; + result = MsgWaitForMultipleObjectsEx(0, NULL, delay, QS_ALLINPUT, + MWMO_INPUTAVAILABLE); + + if (WAIT_OBJECT_0 == result) { + // A WM_* message is available. + // If a parent child relationship exists between windows across threads + // then their thread inputs are implicitly attached. + // This causes the MsgWaitForMultipleObjectsEx API to return indicating + // that messages are ready for processing (specifically mouse messages + // intended for the child window. Occurs if the child window has capture) + // The subsequent PeekMessages call fails to return any messages thus + // causing us to enter a tight loop at times. + // The WaitMessage call below is a workaround to give the child window + // sometime to process its input messages. + MSG msg = {0}; + DWORD queue_status = GetQueueStatus(QS_MOUSE); + if (HIWORD(queue_status) & QS_MOUSE && + !PeekMessage(&msg, NULL, WM_MOUSEFIRST, WM_MOUSELAST, PM_NOREMOVE)) { + WaitMessage(); + } + return; + } + + DCHECK_NE(WAIT_FAILED, result) << GetLastError(); +} + +void MessagePumpForUI::HandleWorkMessage() { // If we are being called outside of the context of Run, then don't try to do // any work. This could correspond to a MessageBox call or something of that // sort. @@ -233,7 +304,7 @@ void MessagePumpWin::HandleWorkMessage() { ScheduleWork(); } -void MessagePumpWin::HandleTimerMessage() { +void MessagePumpForUI::HandleTimerMessage() { KillTimer(message_hwnd_, reinterpret_cast<UINT_PTR>(this)); // If we are being called outside of the context of Run, then don't do @@ -249,7 +320,7 @@ void MessagePumpWin::HandleTimerMessage() { } } -bool MessagePumpWin::ProcessNextWindowsMessage() { +bool MessagePumpForUI::ProcessNextWindowsMessage() { // If there are sent messages in the queue then PeekMessage internally // dispatches the message and returns false. We return true in this // case to ensure that the message loop peeks again instead of calling @@ -266,7 +337,7 @@ bool MessagePumpWin::ProcessNextWindowsMessage() { return sent_messages_in_queue; } -bool MessagePumpWin::ProcessMessageHelper(const MSG& msg) { +bool MessagePumpForUI::ProcessMessageHelper(const MSG& msg) { if (WM_QUIT == msg.message) { // Repost the QUIT message so that it will be retrieved by the primary // GetMessage() loop. @@ -293,7 +364,7 @@ bool MessagePumpWin::ProcessMessageHelper(const MSG& msg) { return true; } -bool MessagePumpWin::ProcessPumpReplacementMessage() { +bool MessagePumpForUI::ProcessPumpReplacementMessage() { // When we encounter a kMsgHaveWork message, this method is called to peek // and process a replacement message, such as a WM_PAINT or WM_TIMER. The // goal is to make the kMsgHaveWork as non-intrusive as possible, even though @@ -307,7 +378,7 @@ bool MessagePumpWin::ProcessPumpReplacementMessage() { bool have_message = (0 != PeekMessage(&msg, NULL, 0, 0, PM_REMOVE)); DCHECK(!have_message || kMsgHaveWork != msg.message || msg.hwnd != message_hwnd_); - + // Since we discarded a kMsgHaveWork message, we must update the flag. InterlockedExchange(&have_work_, 0); @@ -318,233 +389,63 @@ bool MessagePumpWin::ProcessPumpReplacementMessage() { return have_message && ProcessMessageHelper(msg); } -int MessagePumpWin::GetCurrentDelay() const { - if (delayed_work_time_.is_null()) - return -1; - - // Be careful here. TimeDelta has a precision of microseconds, but we want a - // value in milliseconds. If there are 5.5ms left, should the delay be 5 or - // 6? It should be 6 to avoid executing delayed work too early. - double timeout = ceil((delayed_work_time_ - Time::Now()).InMillisecondsF()); - - // If this value is negative, then we need to run delayed work soon. - int delay = static_cast<int>(timeout); - if (delay < 0) - delay = 0; - - return delay; -} - //----------------------------------------------------------------------------- -// MessagePumpForUI private: - -void MessagePumpForUI::DoRunLoop() { - // IF this was just a simple PeekMessage() loop (servicing all possible work - // queues), then Windows would try to achieve the following order according - // to MSDN documentation about PeekMessage with no filter): - // * Sent messages - // * Posted messages - // * Sent messages (again) - // * WM_PAINT messages - // * WM_TIMER messages - // - // Summary: none of the above classes is starved, and sent messages has twice - // the chance of being processed (i.e., reduced service time). - - for (;;) { - // If we do any work, we may create more messages etc., and more work may - // possibly be waiting in another task group. When we (for example) - // ProcessNextWindowsMessage(), there is a good chance there are still more - // messages waiting. On the other hand, when any of these methods return - // having done no work, then it is pretty unlikely that calling them again - // quickly will find any work to do. Finally, if they all say they had no - // work, then it is a good time to consider sleeping (waiting) for more - // work. - - bool more_work_is_plausible = ProcessNextWindowsMessage(); - if (state_->should_quit) - break; - - more_work_is_plausible |= state_->delegate->DoWork(); - if (state_->should_quit) - break; - - more_work_is_plausible |= - state_->delegate->DoDelayedWork(&delayed_work_time_); - // If we did not process any delayed work, then we can assume that our - // existing WM_TIMER if any will fire when delayed work should run. We - // don't want to disturb that timer if it is already in flight. However, - // if we did do all remaining delayed work, then lets kill the WM_TIMER. - if (more_work_is_plausible && delayed_work_time_.is_null()) - KillTimer(message_hwnd_, reinterpret_cast<UINT_PTR>(this)); - if (state_->should_quit) - break; - - if (more_work_is_plausible) - continue; - - more_work_is_plausible = state_->delegate->DoIdleWork(); - if (state_->should_quit) - break; - - if (more_work_is_plausible) - continue; +// MessagePumpForIO public: - WaitForWork(); // Wait (sleep) until we have work to do again. - } +MessagePumpForIO::MessagePumpForIO() { + port_.Set(CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, NULL, 1)); + DCHECK(port_.IsValid()); } -void MessagePumpForUI::WaitForWork() { - // Wait until a message is available, up to the time needed by the timer - // manager to fire the next set of timers. - int delay = GetCurrentDelay(); - if (delay < 0) // Negative value means no timers waiting. - delay = INFINITE; - - DWORD result; - result = MsgWaitForMultipleObjectsEx(0, NULL, delay, QS_ALLINPUT, - MWMO_INPUTAVAILABLE); - - if (WAIT_OBJECT_0 == result) { - // A WM_* message is available. - // If a parent child relationship exists between windows across threads - // then their thread inputs are implicitly attached. - // This causes the MsgWaitForMultipleObjectsEx API to return indicating - // that messages are ready for processing (specifically mouse messages - // intended for the child window. Occurs if the child window has capture) - // The subsequent PeekMessages call fails to return any messages thus - // causing us to enter a tight loop at times. - // The WaitMessage call below is a workaround to give the child window - // sometime to process its input messages. - MSG msg = {0}; - DWORD queue_status = GetQueueStatus(QS_MOUSE); - if (HIWORD(queue_status) & QS_MOUSE && - !PeekMessage(&msg, NULL, WM_MOUSEFIRST, WM_MOUSELAST, PM_NOREMOVE)) { - WaitMessage(); - } - return; - } +void MessagePumpForIO::ScheduleWork() { + if (InterlockedExchange(&have_work_, 1)) + return; // Someone else continued the pumping. - DCHECK_NE(WAIT_FAILED, result) << GetLastError(); + // Make sure the MessagePump does some work for us. + BOOL ret = PostQueuedCompletionStatus(port_, 0, + reinterpret_cast<ULONG_PTR>(this), + reinterpret_cast<OVERLAPPED*>(this)); + DCHECK(ret); } -//----------------------------------------------------------------------------- -// MessagePumpForIO public: - -void MessagePumpForIO::WatchObject(HANDLE object, Watcher* watcher) { - DCHECK(object); - DCHECK_NE(object, INVALID_HANDLE_VALUE); - - std::vector<HANDLE>::iterator it = - find(objects_.begin(), objects_.end(), object); - if (watcher) { - if (it == objects_.end()) { - static size_t warning_multiple = 1; - if (objects_.size() >= warning_multiple * MAXIMUM_WAIT_OBJECTS / 2) { - LOG(INFO) << "More than " << warning_multiple * MAXIMUM_WAIT_OBJECTS / 2 - << " objects being watched"; - // This DCHECK() is an artificial limitation, meant to warn us if we - // start creating too many objects. It can safely be raised to a higher - // level, and the program is designed to handle much larger values. - // Before raising this limit, make sure that there is a very good reason - // (in your debug testing) to be watching this many objects. - DCHECK(2 <= warning_multiple); - ++warning_multiple; - } - objects_.push_back(object); - watchers_.push_back(watcher); - } else { - watchers_[it - objects_.begin()] = watcher; - } - } else if (it != objects_.end()) { - std::vector<HANDLE>::difference_type index = it - objects_.begin(); - objects_.erase(it); - watchers_.erase(watchers_.begin() + index); - } +void MessagePumpForIO::ScheduleDelayedWork(const Time& delayed_work_time) { + // We know that we can't be blocked right now since this method can only be + // called on the same thread as Run, so we only need to update our record of + // how long to sleep when we do sleep. + delayed_work_time_ = delayed_work_time; } void MessagePumpForIO::RegisterIOHandler(HANDLE file_handle, IOHandler* handler) { -#if 0 - // TODO(rvargas): This is just to give an idea of what this code will look - // like when we actually move to completion ports. Of course, we cannot - // do this without calling GetQueuedCompletionStatus(). ULONG_PTR key = reinterpret_cast<ULONG_PTR>(handler); HANDLE port = CreateIoCompletionPort(file_handle, port_, key, 1); - if (!port_.IsValid()) - port_.Set(port); -#endif -} - -void MessagePumpForIO::RegisterIOContext(OVERLAPPED* context, - IOHandler* handler) { - DCHECK(context->hEvent); - if (handler) { - HandlerData* watcher = new HandlerData(context, handler); - WatchObject(context->hEvent, watcher); - } else { - std::vector<HANDLE>::iterator it = - find(objects_.begin(), objects_.end(), context->hEvent); - - if (it == objects_.end()) { - NOTREACHED(); - return; - } - - std::vector<HANDLE>::difference_type index = it - objects_.begin(); - objects_.erase(it); - delete watchers_[index]; - watchers_.erase(watchers_.begin() + index); - } + DCHECK(port == port_.Get()); } //----------------------------------------------------------------------------- // MessagePumpForIO private: void MessagePumpForIO::DoRunLoop() { - // IF this was just a simple PeekMessage() loop (servicing all possible work - // queues), then Windows would try to achieve the following order according - // to MSDN documentation about PeekMessage with no filter): - // * Sent messages - // * Posted messages - // * Sent messages (again) - // * WM_PAINT messages - // * WM_TIMER messages - // - // Summary: none of the above classes is starved, and sent messages has twice - // the chance of being processed (i.e., reduced service time). - for (;;) { // If we do any work, we may create more messages etc., and more work may // possibly be waiting in another task group. When we (for example) - // ProcessNextWindowsMessage(), there is a good chance there are still more - // messages waiting (same thing for ProcessNextObject(), which responds to - // only one signaled object; etc.). On the other hand, when any of these - // methods return having done no work, then it is pretty unlikely that - // calling them again quickly will find any work to do. Finally, if they - // all say they had no work, then it is a good time to consider sleeping - // (waiting) for more work. - - bool more_work_is_plausible = ProcessNextWindowsMessage(); - if (state_->should_quit) - break; + // WaitForIOCompletion(), there is a good chance there are still more + // messages waiting. On the other hand, when any of these methods return + // having done no work, then it is pretty unlikely that calling them + // again quickly will find any work to do. Finally, if they all say they + // had no work, then it is a good time to consider sleeping (waiting) for + // more work. - more_work_is_plausible |= state_->delegate->DoWork(); + bool more_work_is_plausible = state_->delegate->DoWork(); if (state_->should_quit) break; - more_work_is_plausible |= ProcessNextObject(); + more_work_is_plausible |= WaitForIOCompletion(0, NULL); if (state_->should_quit) break; more_work_is_plausible |= state_->delegate->DoDelayedWork(&delayed_work_time_); - // If we did not process any delayed work, then we can assume that our - // existing WM_TIMER if any will fire when delayed work should run. We - // don't want to disturb that timer if it is already in flight. However, - // if we did do all remaining delayed work, then lets kill the WM_TIMER. - if (more_work_is_plausible && delayed_work_time_.is_null()) - KillTimer(message_hwnd_, reinterpret_cast<UINT_PTR>(this)); if (state_->should_quit) break; @@ -558,141 +459,92 @@ void MessagePumpForIO::DoRunLoop() { if (more_work_is_plausible) continue; - // We service APCs in WaitForWork, without returning. WaitForWork(); // Wait (sleep) until we have work to do again. } } -// If we handle more than the OS limit on the number of objects that can be -// waited for, we'll need to poll (sequencing through subsets of the objects -// that can be passed in a single OS wait call). The following is the polling -// interval used in that (unusual) case. (I don't have a lot of justifcation -// for the specific value, but it needed to be short enough that it would not -// add a lot of latency, and long enough that we wouldn't thrash the CPU for no -// reason... especially considering the silly user probably has a million tabs -// open, etc.) -static const int kMultipleWaitPollingInterval = 20; - +// Wait until IO completes, up to the time needed by the timer manager to fire +// the next set of timers. void MessagePumpForIO::WaitForWork() { - // Wait until either an object is signaled or a message is available. Handle - // (without returning) any APCs (only the IO thread currently has APCs.) - - // We do not support nested message loops when we have watched objects. This - // is to avoid messy recursion problems. - DCHECK(objects_.empty() || state_->run_depth == 1) << - "Cannot nest a message loop when there are watched objects!"; + // We do not support nested IO message loops. This is to avoid messy + // recursion problems. + DCHECK(state_->run_depth == 1) << "Cannot nest an IO message loop!"; - int wait_flags = MWMO_ALERTABLE | MWMO_INPUTAVAILABLE; + int timeout = GetCurrentDelay(); + if (timeout < 0) // Negative value means no timers waiting. + timeout = INFINITE; - bool use_polling = false; // Poll if too many objects for one OS Wait call. - for (;;) { - // Do initialization here, in case APC modifies object list. - size_t total_objs = objects_.size(); - - int delay; - size_t polling_index = 0; // The first unprocessed object index. - do { - size_t objs_len = - (polling_index < total_objs) ? total_objs - polling_index : 0; - if (objs_len >= MAXIMUM_WAIT_OBJECTS) { - objs_len = MAXIMUM_WAIT_OBJECTS - 1; - use_polling = true; - } - HANDLE* objs = objs_len ? polling_index + &objects_.front() : NULL; - - // Only wait up to the time needed by the timer manager to fire the next - // set of timers. - delay = GetCurrentDelay(); - if (use_polling && delay > kMultipleWaitPollingInterval) - delay = kMultipleWaitPollingInterval; - if (delay < 0) // Negative value means no timers waiting. - delay = INFINITE; - - DWORD result; - result = MsgWaitForMultipleObjectsEx(static_cast<DWORD>(objs_len), objs, - delay, QS_ALLINPUT, wait_flags); - - if (WAIT_IO_COMPLETION == result) { - // We'll loop here when we service an APC. At it currently stands, - // *ONLY* the IO thread uses *any* APCs, so this should have no impact - // on the UI thread. - break; // Break to outer loop, and waitforwork() again. - } - - // Use unsigned type to simplify range detection; - size_t signaled_index = result - WAIT_OBJECT_0; - if (signaled_index < objs_len) { - SignalWatcher(polling_index + signaled_index); - return; // We serviced a signaled object. - } - - if (objs_len == signaled_index) - return; // A WM_* message is available. - - DCHECK_NE(WAIT_FAILED, result) << GetLastError(); - - DCHECK(!objs || result == WAIT_TIMEOUT); - if (!use_polling) - return; - polling_index += objs_len; - } while (polling_index < total_objs); - // For compatibility, we didn't return sooner. This made us do *some* wait - // call(s) before returning. This will probably change in next rev. - if (!delay || !GetCurrentDelay()) - return; // No work done, but timer is ready to fire. - } + WaitForIOCompletion(timeout, NULL); } -bool MessagePumpForIO::ProcessNextObject() { - size_t total_objs = objects_.size(); - if (!total_objs) { - return false; +bool MessagePumpForIO::WaitForIOCompletion(DWORD timeout, IOHandler* filter) { + IOItem item; + if (completed_io_.empty() || !MatchCompletedIOItem(filter, &item)) { + // We have to ask the system for another IO completion. + if (!GetIOItem(timeout, &item)) + return false; + + if (ProcessInternalIOItem(item)) + return true; } - size_t polling_index = 0; // The first unprocessed object index. - do { - DCHECK(polling_index < total_objs); - size_t objs_len = total_objs - polling_index; - if (objs_len >= kMaxWaitObjects) - objs_len = kMaxWaitObjects - 1; - HANDLE* objs = polling_index + &objects_.front(); - - // Identify 1 pending object, or allow an IO APC to be completed. - DWORD result = WaitForMultipleObjectsEx(static_cast<DWORD>(objs_len), objs, - FALSE, // 1 signal is sufficient. - 0, // Wait 0ms. - false); // Not alertable (no APC). - - // Use unsigned type to simplify range detection; - size_t signaled_index = result - WAIT_OBJECT_0; - if (signaled_index < objs_len) { - SignalWatcher(polling_index + signaled_index); - return true; // We serviced a signaled object. + if (item.context->handler) { + if (filter && item.handler != filter) { + // Save this item for later + completed_io_.push_back(item); + } else { + DCHECK(item.context->handler == item.handler); + item.handler->OnIOCompleted(item.context, item.bytes_transfered, + item.error); } - - // If an handle is invalid, it will be WAIT_FAILED. - DCHECK_EQ(WAIT_TIMEOUT, result) << GetLastError(); - polling_index += objs_len; - } while (polling_index < total_objs); - return false; // We serviced nothing. + } else { + // The handler must be gone by now, just cleanup the mess. + delete item.context; + } + return true; } -bool MessagePumpForIO::SignalWatcher(size_t object_index) { - // Signal the watcher corresponding to the given index. - - DCHECK(objects_.size() > object_index); +// Asks the OS for another IO completion result. +bool MessagePumpForIO::GetIOItem(DWORD timeout, IOItem* item) { + memset(item, 0, sizeof(*item)); + ULONG_PTR key = NULL; + OVERLAPPED* overlapped = NULL; + if (!GetQueuedCompletionStatus(port_.Get(), &item->bytes_transfered, &key, + &overlapped, timeout)) { + if (!overlapped) + return false; // Nothing in the queue. + item->error = GetLastError(); + item->bytes_transfered = 0; + } - // On reception of OnObjectSignaled() to a Watcher object, it may call - // WatchObject(). watchers_ and objects_ will be modified. This is expected, - // so don't be afraid if, while tracing a OnObjectSignaled() function, the - // corresponding watchers_[result] is non-existant. - watchers_[object_index]->OnObjectSignaled(objects_[object_index]); + item->handler = reinterpret_cast<IOHandler*>(key); + item->context = reinterpret_cast<IOContext*>(overlapped); + return true; +} - // Signaled objects tend to be removed from the watch list, and then added - // back (appended). As a result, they move to the end of the objects_ array, - // and this should make their service "fair" (no HANDLEs should be starved). +bool MessagePumpForIO::ProcessInternalIOItem(const IOItem& item) { + if (this == reinterpret_cast<MessagePumpForIO*>(item.context) && + this == reinterpret_cast<MessagePumpForIO*>(item.handler)) { + // This is our internal completion. + DCHECK(!item.bytes_transfered); + InterlockedExchange(&have_work_, 0); + return true; + } + return false; +} - return true; +// Returns a completion item that was previously received. +bool MessagePumpForIO::MatchCompletedIOItem(IOHandler* filter, IOItem* item) { + DCHECK(!completed_io_.empty()); + for (std::list<IOItem>::iterator it = completed_io_.begin(); + it != completed_io_.end(); ++it) { + if (!filter || it->handler == filter) { + *item = *it; + completed_io_.erase(it); + return true; + } + } + return false; } } // namespace base diff --git a/base/message_pump_win.h b/base/message_pump_win.h index eb4253f..63222a8 100644 --- a/base/message_pump_win.h +++ b/base/message_pump_win.h @@ -5,10 +5,10 @@ #ifndef BASE_MESSAGE_PUMP_WIN_H_ #define BASE_MESSAGE_PUMP_WIN_H_ -#include <vector> - #include <windows.h> +#include <list> + #include "base/lock.h" #include "base/message_pump.h" #include "base/observer_list.h" @@ -17,50 +17,9 @@ namespace base { -// MessagePumpWin implements a "traditional" Windows message pump. It contains -// a nearly infinite loop that peeks out messages, and then dispatches them. -// Intermixed with those peeks are callouts to DoWork for pending tasks, -// DoDelayedWork for pending timers, and OnObjectSignaled for signaled objects. -// When there are no events to be serviced, this pump goes into a wait state. -// In most cases, this message pump handles all processing. -// -// However, when a task, or windows event, invokes on the stack a native dialog -// box or such, that window typically provides a bare bones (native?) message -// pump. That bare-bones message pump generally supports little more than a -// peek of the Windows message queue, followed by a dispatch of the peeked -// message. MessageLoop extends that bare-bones message pump to also service -// Tasks, at the cost of some complexity. -// -// The basic structure of the extension (refered to as a sub-pump) is that a -// special message, kMsgHaveWork, is repeatedly injected into the Windows -// Message queue. Each time the kMsgHaveWork message is peeked, checks are -// made for an extended set of events, including the availability of Tasks to -// run. -// -// After running a task, the special message kMsgHaveWork is again posted to -// the Windows Message queue, ensuring a future time slice for processing a -// future event. To prevent flooding the Windows Message queue, care is taken -// to be sure that at most one kMsgHaveWork message is EVER pending in the -// Window's Message queue. -// -// There are a few additional complexities in this system where, when there are -// no Tasks to run, this otherwise infinite stream of messages which drives the -// sub-pump is halted. The pump is automatically re-started when Tasks are -// queued. -// -// A second complexity is that the presence of this stream of posted tasks may -// prevent a bare-bones message pump from ever peeking a WM_PAINT or WM_TIMER. -// Such paint and timer events always give priority to a posted message, such as -// kMsgHaveWork messages. As a result, care is taken to do some peeking in -// between the posting of each kMsgHaveWork message (i.e., after kMsgHaveWork -// is peeked, and before a replacement kMsgHaveWork is posted). -// -// NOTE: Although it may seem odd that messages are used to start and stop this -// flow (as opposed to signaling objects, etc.), it should be understood that -// the native message pump will *only* respond to messages. As a result, it is -// an excellent choice. It is also helpful that the starter messages that are -// placed in the queue when new task arrive also awakens DoRunLoop. -// +// MessagePumpWin serves as the base for specialized versions of the MessagePump +// for Windows. It provides basic functionality like handling of observers and +// controlling the lifetime of the message pump. class MessagePumpWin : public MessagePump { public: // An Observer is an object that receives global notifications from the @@ -97,8 +56,8 @@ class MessagePumpWin : public MessagePump { virtual bool Dispatch(const MSG& msg) = 0; }; - MessagePumpWin(); - virtual ~MessagePumpWin(); + MessagePumpWin() : have_work_(0), state_(NULL) {} + virtual ~MessagePumpWin() {} // Add an Observer, which will start receiving notifications immediately. void AddObserver(Observer* observer); @@ -112,19 +71,12 @@ class MessagePumpWin : public MessagePump { void WillProcessMessage(const MSG& msg); void DidProcessMessage(const MSG& msg); - // Applications can call this to encourage us to process all pending WM_PAINT - // messages. This method will process all paint messages the Windows Message - // queue can provide, up to some fixed number (to avoid any infinite loops). - void PumpOutPendingPaintMessages(); - // Like MessagePump::Run, but MSG objects are routed through dispatcher. void RunWithDispatcher(Delegate* delegate, Dispatcher* dispatcher); // MessagePump methods: virtual void Run(Delegate* delegate) { RunWithDispatcher(delegate, NULL); } virtual void Quit(); - virtual void ScheduleWork(); - virtual void ScheduleDelayedWork(const Time& delayed_work_time); protected: struct RunState { @@ -138,20 +90,9 @@ class MessagePumpWin : public MessagePump { int run_depth; }; - static LRESULT CALLBACK WndProcThunk( - HWND hwnd, UINT message, WPARAM wparam, LPARAM lparam); virtual void DoRunLoop() = 0; - void InitMessageWnd(); - void HandleWorkMessage(); - void HandleTimerMessage(); - bool ProcessNextWindowsMessage(); - bool ProcessMessageHelper(const MSG& msg); - bool ProcessPumpReplacementMessage(); int GetCurrentDelay() const; - // A hidden message-only window. - HWND message_hwnd_; - ObserverList<Observer> observers_; // The time at which delayed work should run. @@ -170,33 +111,164 @@ class MessagePumpWin : public MessagePump { // MessagePumpForUI extends MessagePumpWin with methods that are particular to a // MessageLoop instantiated with TYPE_UI. // +// MessagePumpForUI implements a "traditional" Windows message pump. It contains +// a nearly infinite loop that peeks out messages, and then dispatches them. +// Intermixed with those peeks are callouts to DoWork for pending tasks, and +// DoDelayedWork for pending timers. When there are no events to be serviced, +// this pump goes into a wait state. In most cases, this message pump handles +// all processing. +// +// However, when a task, or windows event, invokes on the stack a native dialog +// box or such, that window typically provides a bare bones (native?) message +// pump. That bare-bones message pump generally supports little more than a +// peek of the Windows message queue, followed by a dispatch of the peeked +// message. MessageLoop extends that bare-bones message pump to also service +// Tasks, at the cost of some complexity. +// +// The basic structure of the extension (refered to as a sub-pump) is that a +// special message, kMsgHaveWork, is repeatedly injected into the Windows +// Message queue. Each time the kMsgHaveWork message is peeked, checks are +// made for an extended set of events, including the availability of Tasks to +// run. +// +// After running a task, the special message kMsgHaveWork is again posted to +// the Windows Message queue, ensuring a future time slice for processing a +// future event. To prevent flooding the Windows Message queue, care is taken +// to be sure that at most one kMsgHaveWork message is EVER pending in the +// Window's Message queue. +// +// There are a few additional complexities in this system where, when there are +// no Tasks to run, this otherwise infinite stream of messages which drives the +// sub-pump is halted. The pump is automatically re-started when Tasks are +// queued. +// +// A second complexity is that the presence of this stream of posted tasks may +// prevent a bare-bones message pump from ever peeking a WM_PAINT or WM_TIMER. +// Such paint and timer events always give priority to a posted message, such as +// kMsgHaveWork messages. As a result, care is taken to do some peeking in +// between the posting of each kMsgHaveWork message (i.e., after kMsgHaveWork +// is peeked, and before a replacement kMsgHaveWork is posted). +// +// NOTE: Although it may seem odd that messages are used to start and stop this +// flow (as opposed to signaling objects, etc.), it should be understood that +// the native message pump will *only* respond to messages. As a result, it is +// an excellent choice. It is also helpful that the starter messages that are +// placed in the queue when new task arrive also awakens DoRunLoop. +// class MessagePumpForUI : public MessagePumpWin { public: - MessagePumpForUI() {} - virtual ~MessagePumpForUI() {} + MessagePumpForUI(); + virtual ~MessagePumpForUI(); + + // MessagePump methods: + virtual void ScheduleWork(); + virtual void ScheduleDelayedWork(const Time& delayed_work_time); + + // Applications can call this to encourage us to process all pending WM_PAINT + // messages. This method will process all paint messages the Windows Message + // queue can provide, up to some fixed number (to avoid any infinite loops). + void PumpOutPendingPaintMessages(); + private: + static LRESULT CALLBACK WndProcThunk( + HWND hwnd, UINT message, WPARAM wparam, LPARAM lparam); virtual void DoRunLoop(); + void InitMessageWnd(); void WaitForWork(); + void HandleWorkMessage(); + void HandleTimerMessage(); + bool ProcessNextWindowsMessage(); + bool ProcessMessageHelper(const MSG& msg); + bool ProcessPumpReplacementMessage(); + + // A hidden message-only window. + HWND message_hwnd_; }; //----------------------------------------------------------------------------- // MessagePumpForIO extends MessagePumpWin with methods that are particular to a -// MessageLoop instantiated with TYPE_IO. +// MessageLoop instantiated with TYPE_IO. This version of MessagePump does not +// deal with Windows mesagges, and instead has a Run loop based on Completion +// Ports so it is better suited for IO operations. // class MessagePumpForIO : public MessagePumpWin { public: - // Used with WatchObject to asynchronously monitor the signaled state of a - // HANDLE object. - class Watcher { - public: - virtual ~Watcher() {} - // Called from MessageLoop::Run when a signalled object is detected. - virtual void OnObjectSignaled(HANDLE object) = 0; - }; + struct IOContext; // Clients interested in receiving OS notifications when asynchronous IO // operations complete should implement this interface and register themselves // with the message pump. + // + // Typical use #1: + // // Use only when there are no user's buffers involved on the actual IO, + // // so that all the cleanup can be done by the message pump. + // class MyFile : public IOHandler { + // MyFile() { + // ... + // context_ = new IOContext; + // context_->handler = this; + // message_pump->RegisterIOHandler(file_, this); + // } + // ~MyFile() { + // if (pending_) { + // // By setting the handler to NULL, we're asking for this context + // // to be deleted when received, without calling back to us. + // context_->handler = NULL; + // } else { + // delete context_; + // } + // } + // virtual void OnIOCompleted(IOContext* context, DWORD bytes_transfered, + // DWORD error) { + // pending_ = false; + // } + // void DoSomeIo() { + // ... + // // The only buffer required for this operation is the overlapped + // // structure. + // ConnectNamedPipe(file_, &context_->overlapped); + // pending_ = true; + // } + // bool pending_; + // IOContext* context_; + // HANDLE file_; + // }; + // + // Typical use #2: + // class MyFile : public IOHandler { + // MyFile() { + // ... + // message_pump->RegisterIOHandler(file_, this); + // } + // // Plus some code to make sure that this destructor is not called + // // while there are pending IO operations. + // ~MyFile() { + // } + // virtual void OnIOCompleted(IOContext* context, DWORD bytes_transfered, + // DWORD error) { + // ... + // delete context; + // } + // void DoSomeIo() { + // ... + // IOContext* context = new IOContext; + // // This is not used for anything. It just prevents the context from + // // being considered "abandoned". + // context->handler = this; + // ReadFile(file_, buffer, num_bytes, &read, &context->overlapped); + // } + // HANDLE file_; + // }; + // + // Typical use #3: + // Same as the previous example, except that in order to deal with the + // requirement stated for the destructor, the class calls WaitForIOCompletion + // from the destructor to block until all IO finishes. + // ~MyFile() { + // while(pending_) + // message_pump->WaitForIOCompletion(INFINITE, this); + // } + // class IOHandler { public: virtual ~IOHandler() {} @@ -204,46 +276,66 @@ class MessagePumpForIO : public MessagePumpWin { // |context| completes. |error| is the Win32 error code of the IO operation // (ERROR_SUCCESS if there was no error). |bytes_transfered| will be zero // on error. - virtual void OnIOCompleted(OVERLAPPED* context, DWORD bytes_transfered, + virtual void OnIOCompleted(IOContext* context, DWORD bytes_transfered, DWORD error) = 0; }; - MessagePumpForIO() {} + // The extended context that should be used as the base structure on every + // overlapped IO operation. |handler| must be set to the registered IOHandler + // for the given file when the operation is started, and it can be set to NULL + // before the operation completes to indicate that the handler should not be + // called anymore, and instead, the IOContext should be deleted when the OS + // notifies the completion of this operation. Please remember that any buffers + // involved with an IO operation should be around until the callback is + // received, so this technique can only be used for IO that do not involve + // additional buffers (other than the overlapped structure itself). + struct IOContext { + OVERLAPPED overlapped; + IOHandler* handler; + }; + + MessagePumpForIO(); virtual ~MessagePumpForIO() {} - // Have the current thread's message loop watch for a signaled object. - // Pass a null watcher to stop watching the object. - void WatchObject(HANDLE, Watcher*); + // MessagePump methods: + virtual void ScheduleWork(); + virtual void ScheduleDelayedWork(const Time& delayed_work_time); // Register the handler to be used when asynchronous IO for the given file // completes. The registration persists as long as |file_handle| is valid, so // |handler| must be valid as long as there is pending IO for the given file. void RegisterIOHandler(HANDLE file_handle, IOHandler* handler); - // This is just a throw away function to ease transition to completion ports. - // Pass NULL for handler to stop tracking this request. WARNING: cancellation - // correctness is the responsibility of the caller. |context| must contain a - // valid manual reset event, but the caller should not interact directly with - // it. The registration can live across a single IO operation, or it can live - // across multiple IO operations without having to reset it after each IO - // completion callback. Internally, there will be a WatchObject registration - // alive as long as this context registration is in effect. It is an error - // to unregister a context that has not been registered before. - void RegisterIOContext(OVERLAPPED* context, IOHandler* handler); + // Waits for the next IO completion that should be processed by |filter|, for + // up to |timeout| milliseconds. Return true if any IO operation completed, + // regardless of the involved handler, and false if the timeout expired. If + // the completion port received any message and the involved IO handler + // matches |filter|, the callback is called before returning from this code; + // if the handler is not the one that we are looking for, the callback will + // be postponed for another time, so reentrancy problems can be avoided. + // External use of this method should be reserved for the rare case when the + // caller is willing to allow pausing regular task dispatching on this thread. + bool WaitForIOCompletion(DWORD timeout, IOHandler* filter); private: + struct IOItem { + IOHandler* handler; + IOContext* context; + DWORD bytes_transfered; + DWORD error; + }; + virtual void DoRunLoop(); void WaitForWork(); - bool ProcessNextObject(); - bool SignalWatcher(size_t object_index); - - // A vector of objects (and corresponding watchers) that are routinely - // serviced by this message pump. - std::vector<HANDLE> objects_; - std::vector<Watcher*> watchers_; + bool MatchCompletedIOItem(IOHandler* filter, IOItem* item); + bool GetIOItem(DWORD timeout, IOItem* item); + bool ProcessInternalIOItem(const IOItem& item); // The completion port associated with this thread. ScopedHandle port_; + // This list will be empty almost always. It stores IO completions that have + // not been delivered yet because somebody was doing cleanup. + std::list<IOItem> completed_io_; }; } // namespace base diff --git a/chrome/common/ipc_channel.cc b/chrome/common/ipc_channel.cc index a9d73e7..934ea9b 100644 --- a/chrome/common/ipc_channel.cc +++ b/chrome/common/ipc_channel.cc @@ -20,24 +20,21 @@ namespace IPC { //------------------------------------------------------------------------------ -Channel::State::State() - : is_pending(false) { - memset(&overlapped, 0, sizeof(overlapped)); - overlapped.hEvent = CreateEvent(NULL, // default security attributes - TRUE, // manual-reset event - TRUE, // initial state = signaled - NULL); // unnamed event object +Channel::State::State(Channel* channel) : is_pending(false) { + memset(&context.overlapped, 0, sizeof(context.overlapped)); + context.handler = channel; } Channel::State::~State() { - if (overlapped.hEvent) - CloseHandle(overlapped.hEvent); + COMPILE_ASSERT(!offsetof(Channel::State, context), starts_with_io_context); } //------------------------------------------------------------------------------ Channel::Channel(const wstring& channel_id, Mode mode, Listener* listener) - : pipe_(INVALID_HANDLE_VALUE), + : ALLOW_THIS_IN_INITIALIZER_LIST(input_state_(this)), + ALLOW_THIS_IN_INITIALIZER_LIST(output_state_(this)), + pipe_(INVALID_HANDLE_VALUE), listener_(listener), waiting_connect_(mode == MODE_SERVER), processing_incoming_(false), @@ -50,23 +47,29 @@ Channel::Channel(const wstring& channel_id, Mode mode, Listener* listener) } void Channel::Close() { - // make sure we are no longer watching the pipe events - MessageLoopForIO* loop = MessageLoopForIO::current(); - if (input_state_.is_pending) { - input_state_.is_pending = false; - loop->RegisterIOContext(&input_state_.overlapped, NULL); - } - - if (output_state_.is_pending) { - output_state_.is_pending = false; - loop->RegisterIOContext(&output_state_.overlapped, NULL); + bool waited = false; + if (input_state_.is_pending || output_state_.is_pending) { + CancelIo(pipe_); + waited = true; } + // Closing the handle at this point prevents us from issuing more requests + // form OnIOCompleted(). if (pipe_ != INVALID_HANDLE_VALUE) { CloseHandle(pipe_); pipe_ = INVALID_HANDLE_VALUE; } + // Make sure all IO has completed. + base::Time start = base::Time::Now(); + while (input_state_.is_pending || output_state_.is_pending) { + MessageLoopForIO::current()->WaitForIOCompletion(INFINITE, this); + } + if (waited) { + // We want to see if we block the message loop for too long. + UMA_HISTOGRAM_TIMES(L"AsyncIO.IPCChannelClose", base::Time::Now() - start); + } + while (!output_queue_.empty()) { Message* m = output_queue_.front(); output_queue_.pop(); @@ -175,7 +178,7 @@ bool Channel::Connect() { // to true, we indicate to OnIOCompleted that this is the special // initialization signal. MessageLoopForIO::current()->PostTask(FROM_HERE, factory_.NewRunnableMethod( - &Channel::OnIOCompleted, &input_state_.overlapped, 0, 0)); + &Channel::OnIOCompleted, &input_state_.context, 0, 0)); } if (!waiting_connect_) @@ -184,15 +187,14 @@ bool Channel::Connect() { } bool Channel::ProcessConnection() { - if (input_state_.is_pending) { + if (input_state_.is_pending) input_state_.is_pending = false; - MessageLoopForIO::current()->RegisterIOContext(&input_state_.overlapped, - NULL); - } // Do we have a client connected to our pipe? - DCHECK(pipe_ != INVALID_HANDLE_VALUE); - BOOL ok = ConnectNamedPipe(pipe_, &input_state_.overlapped); + if (INVALID_HANDLE_VALUE == pipe_) + return false; + + BOOL ok = ConnectNamedPipe(pipe_, &input_state_.context.overlapped); DWORD err = GetLastError(); if (ok) { @@ -205,8 +207,6 @@ bool Channel::ProcessConnection() { switch (err) { case ERROR_IO_PENDING: input_state_.is_pending = true; - MessageLoopForIO::current()->RegisterIOContext(&input_state_.overlapped, - this); break; case ERROR_PIPE_CONNECTED: waiting_connect_ = false; @@ -219,40 +219,41 @@ bool Channel::ProcessConnection() { return true; } -bool Channel::ProcessIncomingMessages(OVERLAPPED* context, +bool Channel::ProcessIncomingMessages(MessageLoopForIO::IOContext* context, DWORD bytes_read) { if (input_state_.is_pending) { input_state_.is_pending = false; DCHECK(context); - MessageLoopForIO::current()->RegisterIOContext(&input_state_.overlapped, - NULL); if (!context || !bytes_read) return false; } else { // This happens at channel initialization. - DCHECK(!bytes_read && context == &input_state_.overlapped); + DCHECK(!bytes_read && context == &input_state_.context); } for (;;) { if (bytes_read == 0) { + if (INVALID_HANDLE_VALUE == pipe_) + return false; + // Read from pipe... BOOL ok = ReadFile(pipe_, input_buf_, BUF_SIZE, &bytes_read, - &input_state_.overlapped); + &input_state_.context.overlapped); if (!ok) { DWORD err = GetLastError(); if (err == ERROR_IO_PENDING) { - MessageLoopForIO::current()->RegisterIOContext( - &input_state_.overlapped, this); input_state_.is_pending = true; return true; } LOG(ERROR) << "pipe error: " << err; return false; } + input_state_.is_pending = true; + return true; } DCHECK(bytes_read); @@ -303,15 +304,13 @@ bool Channel::ProcessIncomingMessages(OVERLAPPED* context, return true; } -bool Channel::ProcessOutgoingMessages(OVERLAPPED* context, +bool Channel::ProcessOutgoingMessages(MessageLoopForIO::IOContext* context, DWORD bytes_written) { DCHECK(!waiting_connect_); // Why are we trying to send messages if there's // no connection? if (output_state_.is_pending) { DCHECK(context); - MessageLoopForIO::current()->RegisterIOContext(&output_state_.overlapped, - NULL); output_state_.is_pending = false; if (!context || bytes_written == 0) { DWORD err = GetLastError(); @@ -325,42 +324,41 @@ bool Channel::ProcessOutgoingMessages(OVERLAPPED* context, delete m; } - while (!output_queue_.empty()) { - // Write to pipe... - Message* m = output_queue_.front(); - BOOL ok = WriteFile(pipe_, - m->data(), - m->size(), - &bytes_written, - &output_state_.overlapped); - if (!ok) { - DWORD err = GetLastError(); - if (err == ERROR_IO_PENDING) { - MessageLoopForIO::current()->RegisterIOContext( - &output_state_.overlapped, this); - output_state_.is_pending = true; + if (output_queue_.empty()) + return true; + + if (INVALID_HANDLE_VALUE == pipe_) + return false; + + // Write to pipe... + Message* m = output_queue_.front(); + BOOL ok = WriteFile(pipe_, + m->data(), + m->size(), + &bytes_written, + &output_state_.context.overlapped); + if (!ok) { + DWORD err = GetLastError(); + if (err == ERROR_IO_PENDING) { + output_state_.is_pending = true; #ifdef IPC_MESSAGE_DEBUG_EXTRA - DLOG(INFO) << "sent pending message @" << m << " on channel @" << - this << " with type " << m->type(); + DLOG(INFO) << "sent pending message @" << m << " on channel @" << + this << " with type " << m->type(); #endif - return true; - } - LOG(ERROR) << "pipe error: " << err; - return false; + return true; } - DCHECK(bytes_written == m->size()); - output_queue_.pop(); + LOG(ERROR) << "pipe error: " << err; + return false; + } #ifdef IPC_MESSAGE_DEBUG_EXTRA - DLOG(INFO) << "sent message @" << m << " on channel @" << this << - " with type " << m->type(); + DLOG(INFO) << "sent message @" << m << " on channel @" << this << + " with type " << m->type(); #endif - delete m; - } - + output_state_.is_pending = true; return true; } @@ -400,12 +398,13 @@ bool Channel::ProcessPendingMessages(DWORD max_wait_msec) { #endif } -void Channel::OnIOCompleted(OVERLAPPED* context, DWORD bytes_transfered, - DWORD error) { +void Channel::OnIOCompleted(MessageLoopForIO::IOContext* context, + DWORD bytes_transfered, DWORD error) { bool ok; - if (context == &input_state_.overlapped) { + if (context == &input_state_.context) { if (waiting_connect_) { - ProcessConnection(); + if (!ProcessConnection()) + return; // We may have some messages queued up to send... if (!output_queue_.empty() && !output_state_.is_pending) ProcessOutgoingMessages(NULL, 0); @@ -419,10 +418,11 @@ void Channel::OnIOCompleted(OVERLAPPED* context, DWORD bytes_transfered, ok = ProcessIncomingMessages(context, bytes_transfered); processing_incoming_ = false; } else { - DCHECK(context == &output_state_.overlapped); + DCHECK(context == &output_state_.context); ok = ProcessOutgoingMessages(context, bytes_transfered); } - if (!ok) { + if (!ok && INVALID_HANDLE_VALUE != pipe_) { + // We don't want to re-enter Close(). Close(); listener_->OnChannelError(); } diff --git a/chrome/common/ipc_channel.h b/chrome/common/ipc_channel.h index e3e4ca2..b69f962 100644 --- a/chrome/common/ipc_channel.h +++ b/chrome/common/ipc_channel.h @@ -99,12 +99,14 @@ class Channel : public MessageLoopForIO::IOHandler, const std::wstring PipeName(const std::wstring& channel_id) const; bool CreatePipe(const std::wstring& channel_id, Mode mode); bool ProcessConnection(); - bool ProcessIncomingMessages(OVERLAPPED* context, DWORD bytes_read); - bool ProcessOutgoingMessages(OVERLAPPED* context, DWORD bytes_written); + bool ProcessIncomingMessages(MessageLoopForIO::IOContext* context, + DWORD bytes_read); + bool ProcessOutgoingMessages(MessageLoopForIO::IOContext* context, + DWORD bytes_written); // MessageLoop::IOHandler implementation. - virtual void OnIOCompleted(OVERLAPPED* context, DWORD bytes_transfered, - DWORD error); + virtual void OnIOCompleted(MessageLoopForIO::IOContext* context, + DWORD bytes_transfered, DWORD error); private: enum { @@ -112,9 +114,9 @@ class Channel : public MessageLoopForIO::IOHandler, }; struct State { - State(); + explicit State(Channel* channel); ~State(); - OVERLAPPED overlapped; + MessageLoopForIO::IOContext context; bool is_pending; }; diff --git a/chrome/common/ipc_sync_channel_unittest.cc b/chrome/common/ipc_sync_channel_unittest.cc index 1d28b06..749a507 100644 --- a/chrome/common/ipc_sync_channel_unittest.cc +++ b/chrome/common/ipc_sync_channel_unittest.cc @@ -104,7 +104,7 @@ class Worker : public Channel::Listener, public Message::Sender { channel_->Close(); } void Start() { - StartThread(&listener_thread_); + StartThread(&listener_thread_, MessageLoop::TYPE_DEFAULT); ListenerThread()->message_loop()->PostTask(FROM_HERE, NewRunnableMethod( this, &Worker::OnStart)); } @@ -149,7 +149,7 @@ class Worker : public Channel::Listener, public Message::Sender { // Called on the listener thread to create the sync channel. void OnStart() { // Link ipc_thread_, listener_thread_ and channel_ altogether. - StartThread(&ipc_thread_); + StartThread(&ipc_thread_, MessageLoop::TYPE_IO); channel_.reset(new SyncChannel( channel_name_, mode_, this, NULL, ipc_thread_.message_loop(), true, TestProcess::GetShutDownEvent())); @@ -178,9 +178,9 @@ class Worker : public Channel::Listener, public Message::Sender { IPC_END_MESSAGE_MAP() } - void StartThread(base::Thread* thread) { + void StartThread(base::Thread* thread, MessageLoop::Type type) { base::Thread::Options options; - options.message_loop_type = MessageLoop::TYPE_IO; + options.message_loop_type = type; thread->StartWithOptions(options); } diff --git a/net/base/file_stream_win.cc b/net/base/file_stream_win.cc index d7b3d1d..56d0368 100644 --- a/net/base/file_stream_win.cc +++ b/net/base/file_stream_win.cc @@ -50,55 +50,55 @@ static int MapErrorCode(DWORD err) { class FileStream::AsyncContext : public MessageLoopForIO::IOHandler { public: AsyncContext(FileStream* owner) - : owner_(owner), overlapped_(), callback_(NULL) { - overlapped_.hEvent = CreateEvent(NULL, TRUE, FALSE, NULL); - } - - ~AsyncContext() { - if (callback_) - MessageLoopForIO::current()->RegisterIOContext(&overlapped_, NULL); - CloseHandle(overlapped_.hEvent); + : owner_(owner), context_(), callback_(NULL) { + context_.handler = this; } + ~AsyncContext(); void IOCompletionIsPending(CompletionCallback* callback); - - OVERLAPPED* overlapped() { return &overlapped_; } + + OVERLAPPED* overlapped() { return &context_.overlapped; } CompletionCallback* callback() const { return callback_; } private: - // MessageLoopForIO::IOHandler implementation: - virtual void OnIOCompleted(OVERLAPPED* context, DWORD num_bytes, - DWORD error); + virtual void OnIOCompleted(MessageLoopForIO::IOContext* context, + DWORD bytes_read, DWORD error); FileStream* owner_; - OVERLAPPED overlapped_; + MessageLoopForIO::IOContext context_; CompletionCallback* callback_; }; +FileStream::AsyncContext::~AsyncContext() { + bool waited = false; + base::Time start = base::Time::Now(); + while (callback_) { + waited = true; + MessageLoopForIO::current()->WaitForIOCompletion(INFINITE, this); + } + if (waited) { + // We want to see if we block the message loop for too long. + UMA_HISTOGRAM_TIMES(L"AsyncIO.FileStreamClose", base::Time::Now() - start); + } +} + void FileStream::AsyncContext::IOCompletionIsPending( CompletionCallback* callback) { DCHECK(!callback_); callback_ = callback; - - MessageLoopForIO::current()->RegisterIOContext(&overlapped_, this); } -void FileStream::AsyncContext::OnIOCompleted(OVERLAPPED* context, - DWORD num_bytes, - DWORD error) { - DCHECK(&overlapped_ == context); +void FileStream::AsyncContext::OnIOCompleted( + MessageLoopForIO::IOContext* context, DWORD bytes_read, DWORD error) { + DCHECK(&context_ == context); DCHECK(callback_); - MessageLoopForIO::current()->RegisterIOContext(&overlapped_, NULL); - - HANDLE handle = owner_->file_; - - int result = static_cast<int>(num_bytes); + int result = static_cast<int>(bytes_read); if (error && error != ERROR_HANDLE_EOF) result = MapErrorCode(error); - - if (num_bytes) - IncrementOffset(&overlapped_, num_bytes); + + if (bytes_read) + IncrementOffset(&context->overlapped, bytes_read); CompletionCallback* temp = NULL; std::swap(temp, callback_); @@ -115,11 +115,14 @@ FileStream::~FileStream() { } void FileStream::Close() { + if (file_ != INVALID_HANDLE_VALUE) + CancelIo(file_); + + async_context_.reset(); if (file_ != INVALID_HANDLE_VALUE) { CloseHandle(file_); file_ = INVALID_HANDLE_VALUE; } - async_context_.reset(); } int FileStream::Open(const std::wstring& path, int open_flags) { @@ -211,9 +214,10 @@ int FileStream::Read( LOG(WARNING) << "ReadFile failed: " << error; rv = MapErrorCode(error); } + } else if (overlapped) { + async_context_->IOCompletionIsPending(callback); + rv = ERR_IO_PENDING; } else { - if (overlapped) - IncrementOffset(overlapped, bytes_read); rv = static_cast<int>(bytes_read); } return rv; @@ -224,7 +228,7 @@ int FileStream::Write( if (!IsOpen()) return ERR_UNEXPECTED; DCHECK(open_flags_ & base::PLATFORM_FILE_WRITE); - + OVERLAPPED* overlapped = NULL; if (async_context_.get()) { DCHECK(!async_context_->callback()); @@ -242,9 +246,10 @@ int FileStream::Write( LOG(WARNING) << "WriteFile failed: " << error; rv = MapErrorCode(error); } + } else if (overlapped) { + async_context_->IOCompletionIsPending(callback); + rv = ERR_IO_PENDING; } else { - if (overlapped) - IncrementOffset(overlapped, bytes_written); rv = static_cast<int>(bytes_written); } return rv; diff --git a/net/disk_cache/cache_util_win.cc b/net/disk_cache/cache_util_win.cc index 377cbbf..adb9d8a5 100644 --- a/net/disk_cache/cache_util_win.cc +++ b/net/disk_cache/cache_util_win.cc @@ -7,6 +7,7 @@ #include <windows.h> #include "base/logging.h" +#include "base/message_loop.h" #include "base/scoped_handle.h" #include "base/file_util.h" @@ -38,6 +39,9 @@ void DeleteFiles(const wchar_t* path, const wchar_t* search_name) { namespace disk_cache { +// Implemented in file_win.cc. +MessageLoopForIO::IOHandler* GetFileIOHandler(); + bool MoveCache(const std::wstring& from_path, const std::wstring& to_path) { // I don't want to use the shell version of move because if something goes // wrong, that version will attempt to move file by file and fail at the end. @@ -63,12 +67,8 @@ bool DeleteCacheFile(const std::wstring& name) { void WaitForPendingIO(int* num_pending_io) { while (*num_pending_io) { // Asynchronous IO operations may be in flight and the completion may end - // up calling us back so let's wait for them (we need an alertable wait). - // The idea is to let other threads do usefull work and at the same time - // allow more than one IO to finish... 20 mS later, we process all queued - // APCs and see if we have to repeat the wait. - Sleep(20); - SleepEx(0, TRUE); + // up calling us back so let's wait for them. + MessageLoopForIO::current()->WaitForIOCompletion(100, GetFileIOHandler()); } } diff --git a/net/disk_cache/disk_cache_perftest.cc b/net/disk_cache/disk_cache_perftest.cc index c160677..33cad54 100644 --- a/net/disk_cache/disk_cache_perftest.cc +++ b/net/disk_cache/disk_cache_perftest.cc @@ -9,6 +9,7 @@ #include "base/basictypes.h" #include "base/file_util.h" #include "base/perftimer.h" +#include "base/platform_test.h" #if defined(OS_WIN) #include "base/scoped_handle.h" #endif @@ -17,7 +18,6 @@ #include "net/base/net_errors.h" #include "net/disk_cache/block_files.h" #include "net/disk_cache/disk_cache.h" -#include "net/disk_cache/disk_cache_test_base.h" #include "net/disk_cache/disk_cache_test_util.h" #include "net/disk_cache/hash.h" #include "testing/gtest/include/gtest/gtest.h" @@ -28,6 +28,8 @@ extern int g_cache_tests_max_id; extern volatile int g_cache_tests_received; extern volatile bool g_cache_tests_error; +typedef PlatformTest DiskCacheTest; + namespace { bool EvictFileFromSystemCache(const wchar_t* name) { @@ -226,6 +228,7 @@ TEST_F(DiskCacheTest, CacheBackendPerformance) { int ret = TimeWrite(num_entries, cache, &entries); EXPECT_EQ(ret, g_cache_tests_received); + MessageLoop::current()->RunAllPending(); delete cache; std::wstring filename(path); @@ -257,6 +260,7 @@ TEST_F(DiskCacheTest, CacheBackendPerformance) { ret = TimeRead(num_entries, cache, entries, false); EXPECT_EQ(ret, g_cache_tests_received); + MessageLoop::current()->RunAllPending(); delete cache; } @@ -266,6 +270,7 @@ TEST_F(DiskCacheTest, CacheBackendPerformance) { // fragmented, or if we have multiple files. This test measures that scenario, // by using multiple, highly fragmented files. TEST_F(DiskCacheTest, BlockFilesPerformance) { + MessageLoopForIO message_loop; std::wstring path = GetCachePath(); ASSERT_TRUE(DeleteCache(path.c_str())); @@ -303,4 +308,5 @@ TEST_F(DiskCacheTest, BlockFilesPerformance) { } timer2.Done(); + MessageLoop::current()->RunAllPending(); } diff --git a/net/disk_cache/disk_cache_test_base.cc b/net/disk_cache/disk_cache_test_base.cc index 417792f..0a3e07b 100644 --- a/net/disk_cache/disk_cache_test_base.cc +++ b/net/disk_cache/disk_cache_test_base.cc @@ -8,6 +8,10 @@ #include "net/disk_cache/disk_cache_test_util.h" #include "net/disk_cache/mem_backend_impl.h" +void DiskCacheTest::TearDown() { + MessageLoop::current()->RunAllPending(); +} + void DiskCacheTestWithCache::SetMaxSize(int size) { size_ = size; if (cache_impl_) @@ -73,6 +77,7 @@ void DiskCacheTestWithCache::InitDiskCache() { void DiskCacheTestWithCache::TearDown() { + MessageLoop::current()->RunAllPending(); delete cache_; if (!memory_only_) { diff --git a/net/disk_cache/disk_cache_test_base.h b/net/disk_cache/disk_cache_test_base.h index ca56be1f..6b75361 100644 --- a/net/disk_cache/disk_cache_test_base.h +++ b/net/disk_cache/disk_cache_test_base.h @@ -9,12 +9,6 @@ #include "base/platform_test.h" #include "testing/gtest/include/gtest/gtest.h" -// These tests can use the path service, which uses autoreleased objects on the -// Mac, so this needs to be a PlatformTest. Even tests that do not require a -// cache (and that do not need to be a DiskCacheTestWithCache) are susceptible -// to this problem; all such tests should use TEST_F(DiskCacheTest, ...). -typedef PlatformTest DiskCacheTest; - namespace disk_cache { class Backend; @@ -23,6 +17,14 @@ class MemBackendImpl; } // namespace disk_cache +// These tests can use the path service, which uses autoreleased objects on the +// Mac, so this needs to be a PlatformTest. Even tests that do not require a +// cache (and that do not need to be a DiskCacheTestWithCache) are susceptible +// to this problem; all such tests should use TEST_F(DiskCacheTest, ...). +class DiskCacheTest : public PlatformTest { + virtual void TearDown(); +}; + // Provides basic support for cache related tests. class DiskCacheTestWithCache : public DiskCacheTest { protected: diff --git a/net/disk_cache/entry_impl.cc b/net/disk_cache/entry_impl.cc index 844ff40..2821540 100644 --- a/net/disk_cache/entry_impl.cc +++ b/net/disk_cache/entry_impl.cc @@ -16,25 +16,8 @@ using base::TimeDelta; namespace { -// This is a simple Task to execute the callback (from the message loop instead -// of the APC). -class InvokeCallback : public Task { - public: - InvokeCallback(net::CompletionCallback* callback, int argument) - : callback_(callback), argument_(argument) {} - - virtual void Run() { - callback_->Run(argument_); - } - - private: - net::CompletionCallback* callback_; - int argument_; - DISALLOW_EVIL_CONSTRUCTORS(InvokeCallback); -}; - -// This class implements FileIOCallback to buffer the callback from an IO -// operation from the actual IO class. +// This class implements FileIOCallback to buffer the callback from a file IO +// operation from the actual net class. class SyncCallback: public disk_cache::FileIOCallback { public: SyncCallback(disk_cache::EntryImpl* entry, @@ -57,10 +40,8 @@ class SyncCallback: public disk_cache::FileIOCallback { void SyncCallback::OnFileIOComplete(int bytes_copied) { entry_->DecrementIoCount(); entry_->Release(); - if (callback_) { - InvokeCallback* task = new InvokeCallback(callback_, bytes_copied); - MessageLoop::current()->PostTask(FROM_HERE, task); - } + if (callback_) + callback_->Run(bytes_copied); delete this; } @@ -556,9 +537,11 @@ void EntryImpl::DeleteData(Addr address, int index) { if (files_[index]) files_[index] = NULL; // Releases the object. - if (!DeleteCacheFile(backend_->GetFileName(address))) + if (!DeleteCacheFile(backend_->GetFileName(address))) { + UMA_HISTOGRAM_COUNTS(L"DiskCache.DeleteFailed", 1); LOG(ERROR) << "Failed to delete " << backend_->GetFileName(address) << " from the cache."; + } } else { backend_->DeleteBlock(address, true); } @@ -711,7 +694,6 @@ bool EntryImpl::ImportSeparateFile(int index, int offset, int buf_len) { return true; } - // The common scenario is that this is called from the destructor of the entry, // to write to disk what we have buffered. We don't want to hold the destructor // until the actual IO finishes, so we'll send an asynchronous write that will @@ -744,6 +726,13 @@ bool EntryImpl::Flush(int index, int size, bool async) { if (!file) return false; + // TODO(rvargas): figure out if it's worth to re-enable posting operations. + // Right now it is only used from GrowUserBuffer, not the destructor, and + // it is not accounted for from the point of view of the total number of + // pending operations of the cache. It is also racing with the actual write + // on the GrowUserBuffer path because there is no code to exclude the range + // that is going to be written. + async = false; if (async) { if (!file->PostWrite(user_buffers_[index].get(), len, offset)) return false; diff --git a/net/disk_cache/file_win.cc b/net/disk_cache/file_win.cc index 0e8e3f3..bc89975 100644 --- a/net/disk_cache/file_win.cc +++ b/net/disk_cache/file_win.cc @@ -4,62 +4,39 @@ #include "net/disk_cache/file.h" +#include "base/message_loop.h" +#include "base/singleton.h" #include "net/disk_cache/disk_cache.h" namespace { -// This class implements FileIOCallback to perform IO operations -// when the callback parameter of the operation is NULL. -class SyncCallback: public disk_cache::FileIOCallback { - public: - SyncCallback() : called_(false) {} - ~SyncCallback() {} - - virtual void OnFileIOComplete(int bytes_copied); - void WaitForResult(int* bytes_copied); - private: - bool called_; - int actual_; -}; - -void SyncCallback::OnFileIOComplete(int bytes_copied) { - actual_ = bytes_copied; - called_ = true; -} - -// Waits for the IO operation to complete. -void SyncCallback::WaitForResult(int* bytes_copied) { - for (;;) { - SleepEx(INFINITE, TRUE); - if (called_) - break; - } - *bytes_copied = actual_; -} - // Structure used for asynchronous operations. struct MyOverlapped { - OVERLAPPED overlapped; - disk_cache::File* file; - disk_cache::FileIOCallback* callback; - const void* buffer; - DWORD actual_bytes; - bool async; // Invoke the callback form the completion. - bool called; // Completion received. - bool delete_buffer; // Delete the user buffer at completion. -}; + MyOverlapped(disk_cache::File* file, size_t offset, + disk_cache::FileIOCallback* callback); + ~MyOverlapped(); + OVERLAPPED* overlapped() { + return &context_.overlapped; + } -COMPILE_ASSERT(!offsetof(MyOverlapped, overlapped), starts_with_overlapped); + MessageLoopForIO::IOContext context_; + scoped_refptr<disk_cache::File> file_; + disk_cache::FileIOCallback* callback_; + const void* buffer_; + bool delete_buffer_; // Delete the user buffer at completion. +}; -} // namespace +COMPILE_ASSERT(!offsetof(MyOverlapped, context_), starts_with_overlapped); -namespace disk_cache { +// Helper class to handle the IO completion notifications from the message loop. +class CompletionHandler : public MessageLoopForIO::IOHandler { + virtual void OnIOCompleted(MessageLoopForIO::IOContext* context, + DWORD actual_bytes, DWORD error); +}; -// SyncCallback to be invoked as an APC when the asynchronous operation -// completes. -void CALLBACK IoCompletion(DWORD error, DWORD actual_bytes, - OVERLAPPED* overlapped) { - MyOverlapped* data = reinterpret_cast<MyOverlapped*>(overlapped); +void CompletionHandler::OnIOCompleted(MessageLoopForIO::IOContext* context, + DWORD actual_bytes, DWORD error) { + MyOverlapped* data = reinterpret_cast<MyOverlapped*>(context); if (error) { DCHECK(!actual_bytes); @@ -67,29 +44,39 @@ void CALLBACK IoCompletion(DWORD error, DWORD actual_bytes, NOTREACHED(); } - if (data->delete_buffer) { - DCHECK(!data->callback); - data->file->Release(); - delete data->buffer; - delete data; - return; - } + if (data->callback_) + data->callback_->OnFileIOComplete(static_cast<int>(actual_bytes)); - if (data->async) { - data->callback->OnFileIOComplete(static_cast<int>(actual_bytes)); - data->file->Release(); - delete data; - } else { - // Somebody is waiting for this so don't delete data and instead notify - // that we were called. - data->actual_bytes = actual_bytes; - data->file->Release(); - data->called = true; + delete data; +} + +MyOverlapped::MyOverlapped(disk_cache::File* file, size_t offset, + disk_cache::FileIOCallback* callback) { + memset(this, 0, sizeof(*this)); + context_.handler = Singleton<CompletionHandler>::get(); + context_.overlapped.Offset = static_cast<DWORD>(offset); + file_ = file; + callback_ = callback; +} + +MyOverlapped::~MyOverlapped() { + if (delete_buffer_) { + DCHECK(!callback_); + delete buffer_; } } +} // namespace + +namespace disk_cache { + +// Used from WaitForPendingIO() when the cache is being destroyed. +MessageLoopForIO::IOHandler* GetFileIOHandler() { + return Singleton<CompletionHandler>::get(); +} + File::File(base::PlatformFile file) - : init_(true), mixed_(true), platform_file_(INVALID_HANDLE_VALUE), + : init_(true), platform_file_(INVALID_HANDLE_VALUE), sync_platform_file_(file) { } @@ -99,23 +86,22 @@ bool File::Init(const std::wstring& name) { return false; platform_file_ = CreateFile(name.c_str(), GENERIC_READ | GENERIC_WRITE, - FILE_SHARE_READ | FILE_SHARE_WRITE, NULL, OPEN_EXISTING, - FILE_FLAG_OVERLAPPED, NULL); + FILE_SHARE_READ | FILE_SHARE_WRITE, NULL, + OPEN_EXISTING, FILE_FLAG_OVERLAPPED, NULL); if (INVALID_HANDLE_VALUE == platform_file_) return false; + MessageLoopForIO::current()->RegisterIOHandler( + platform_file_, Singleton<CompletionHandler>::get()); + init_ = true; - if (mixed_) { - sync_platform_file_ = CreateFile(name.c_str(), GENERIC_READ | GENERIC_WRITE, - FILE_SHARE_READ | FILE_SHARE_WRITE, NULL, - OPEN_EXISTING, 0, NULL); - - if (INVALID_HANDLE_VALUE == sync_platform_file_) - return false; - } else { - sync_platform_file_ = INVALID_HANDLE_VALUE; - } + sync_platform_file_ = CreateFile(name.c_str(), GENERIC_READ | GENERIC_WRITE, + FILE_SHARE_READ | FILE_SHARE_WRITE, NULL, + OPEN_EXISTING, 0, NULL); + + if (INVALID_HANDLE_VALUE == sync_platform_file_) + return false; return true; } @@ -126,13 +112,13 @@ File::~File() { if (INVALID_HANDLE_VALUE != platform_file_) CloseHandle(platform_file_); - if (mixed_ && INVALID_HANDLE_VALUE != sync_platform_file_) + if (INVALID_HANDLE_VALUE != sync_platform_file_) CloseHandle(sync_platform_file_); } base::PlatformFile File::platform_file() const { DCHECK(init_); - return (INVALID_HANDLE_VALUE == platform_file_) ? sync_platform_file_ : + return (INVALID_HANDLE_VALUE == platform_file_) ? sync_platform_file_ : platform_file_; } @@ -145,13 +131,11 @@ bool File::IsValid() const { bool File::Read(void* buffer, size_t buffer_len, size_t offset) { DCHECK(init_); - if (!mixed_ || buffer_len > ULONG_MAX || offset > LONG_MAX) + if (buffer_len > ULONG_MAX || offset > LONG_MAX) return false; - DWORD ret = SetFilePointer(sync_platform_file_, - static_cast<LONG>(offset), - NULL, - FILE_BEGIN); + DWORD ret = SetFilePointer(sync_platform_file_, static_cast<LONG>(offset), + NULL, FILE_BEGIN); if (INVALID_SET_FILE_POINTER == ret) return false; @@ -164,13 +148,11 @@ bool File::Read(void* buffer, size_t buffer_len, size_t offset) { bool File::Write(const void* buffer, size_t buffer_len, size_t offset) { DCHECK(init_); - if (!mixed_ || buffer_len > ULONG_MAX || offset > ULONG_MAX) + if (buffer_len > ULONG_MAX || offset > ULONG_MAX) return false; - DWORD ret = SetFilePointer(sync_platform_file_, - static_cast<LONG>(offset), - NULL, - FILE_BEGIN); + DWORD ret = SetFilePointer(sync_platform_file_, static_cast<LONG>(offset), + NULL, FILE_BEGIN); if (INVALID_SET_FILE_POINTER == ret) return false; @@ -187,55 +169,38 @@ bool File::Write(const void* buffer, size_t buffer_len, size_t offset) { bool File::Read(void* buffer, size_t buffer_len, size_t offset, FileIOCallback* callback, bool* completed) { DCHECK(init_); + if (!callback) + return Read(buffer, buffer_len, offset); + if (buffer_len > ULONG_MAX || offset > ULONG_MAX) return false; - MyOverlapped* data = new MyOverlapped; - memset(data, 0, sizeof(*data)); - - SyncCallback local_callback; - data->overlapped.Offset = static_cast<DWORD>(offset); - data->callback = callback ? callback : &local_callback; - data->file = this; - + MyOverlapped* data = new MyOverlapped(this, offset, callback); DWORD size = static_cast<DWORD>(buffer_len); - AddRef(); - if (!ReadFileEx(platform_file_, buffer, size, &data->overlapped, - &IoCompletion)) { - Release(); + DWORD actual; + if (!ReadFile(platform_file_, buffer, size, &actual, data->overlapped())) { + *completed = false; + if (GetLastError() == ERROR_IO_PENDING) + return true; delete data; return false; } - if (callback) { - *completed = false; - // Let's check if the operation is already finished. - SleepEx(0, TRUE); - if (data->called) { - *completed = (data->actual_bytes == size); - DCHECK(data->actual_bytes == size); - delete data; - return *completed; - } - data->async = true; - } else { - // Invoke the callback and perform cleanup on the APC. - data->async = true; - int bytes_copied; - local_callback.WaitForResult(&bytes_copied); - if (static_cast<int>(buffer_len) != bytes_copied) { - NOTREACHED(); - return false; - } - } - - return true; + // The operation completed already. We'll be called back anyway. + *completed = (actual == size); + DCHECK(actual == size); + data->callback_ = NULL; + data->file_ = NULL; // There is no reason to hold on to this anymore. + return *completed; } bool File::Write(const void* buffer, size_t buffer_len, size_t offset, FileIOCallback* callback, bool* completed) { DCHECK(init_); + if (!callback) + return Write(buffer, buffer_len, offset); + return AsyncWrite(buffer, buffer_len, offset, true, callback, completed); } @@ -250,50 +215,32 @@ bool File::AsyncWrite(const void* buffer, size_t buffer_len, size_t offset, if (buffer_len > ULONG_MAX || offset > ULONG_MAX) return false; - MyOverlapped* data = new MyOverlapped; - memset(data, 0, sizeof(*data)); - - SyncCallback local_callback; - data->overlapped.Offset = static_cast<DWORD>(offset); - data->callback = callback ? callback : &local_callback; - data->file = this; - if (!callback && !notify) { - data->delete_buffer = true; - data->callback = NULL; - data->buffer = buffer; + MyOverlapped* data = new MyOverlapped(this, offset, callback); + bool dummy_completed; + if (!callback) { + DCHECK(!notify); + data->delete_buffer_ = true; + data->buffer_ = buffer; + completed = &dummy_completed; } DWORD size = static_cast<DWORD>(buffer_len); - AddRef(); - if (!WriteFileEx(platform_file_, buffer, size, &data->overlapped, - &IoCompletion)) { - Release(); + DWORD actual; + if (!WriteFile(platform_file_, buffer, size, &actual, data->overlapped())) { + *completed = false; + if (GetLastError() == ERROR_IO_PENDING) + return true; delete data; return false; } - if (callback) { - *completed = false; - SleepEx(0, TRUE); - if (data->called) { - *completed = (data->actual_bytes == size); - DCHECK(data->actual_bytes == size); - delete data; - return *completed; - } - data->async = true; - } else if (notify) { - data->async = true; - int bytes_copied; - local_callback.WaitForResult(&bytes_copied); - if (static_cast<int>(buffer_len) != bytes_copied) { - NOTREACHED(); - return false; - } - } - - return true; + // The operation completed already. We'll be called back anyway. + *completed = (actual == size); + DCHECK(actual == size); + data->callback_ = NULL; + data->file_ = NULL; // There is no reason to hold on to this anymore. + return *completed; } bool File::SetLength(size_t length) { diff --git a/net/disk_cache/mapped_file_unittest.cc b/net/disk_cache/mapped_file_unittest.cc index fe090e5..28eeff5 100644 --- a/net/disk_cache/mapped_file_unittest.cc +++ b/net/disk_cache/mapped_file_unittest.cc @@ -95,6 +95,8 @@ TEST_F(DiskCacheTest, MappedFile_AsyncIO) { g_cache_tests_max_id = 0; g_cache_tests_received = 0; + MessageLoopHelper helper; + char buffer1[20]; char buffer2[20]; CacheTestFillBuffer(buffer1, sizeof(buffer1), false); @@ -105,14 +107,14 @@ TEST_F(DiskCacheTest, MappedFile_AsyncIO) { int expected = completed ? 0 : 1; g_cache_tests_max_id = 1; - WaitForCallbacks(expected); + helper.WaitUntilCacheIoFinished(expected); EXPECT_TRUE(file->Read(buffer2, sizeof(buffer2), 1024 * 1024, &callback, &completed)); if (!completed) expected++; - WaitForCallbacks(expected); + helper.WaitUntilCacheIoFinished(expected); EXPECT_EQ(expected, g_cache_tests_received); EXPECT_FALSE(g_cache_tests_error); |