diff options
author | rvargas@google.com <rvargas@google.com@0039d316-1c4b-4281-b951-d872f2087c98> | 2008-11-07 21:52:15 +0000 |
---|---|---|
committer | rvargas@google.com <rvargas@google.com@0039d316-1c4b-4281-b951-d872f2087c98> | 2008-11-07 21:52:15 +0000 |
commit | 17b891482a081341470ead21ca7eda953d74dd69 (patch) | |
tree | 1ca1393109504064e44a31435714a238ca171477 | |
parent | 209c36546ae088f1cf76e7f72765ad92b3cdaa2e (diff) | |
download | chromium_src-17b891482a081341470ead21ca7eda953d74dd69.zip chromium_src-17b891482a081341470ead21ca7eda953d74dd69.tar.gz chromium_src-17b891482a081341470ead21ca7eda953d74dd69.tar.bz2 |
Switch MessagePumpForIO to use completion ports on Windows.
Cleanup the separation between MessagePumpForUI and
MessagePumpForIO, and convert the latter to use Completion
Ports instead of MsgWaitForMultipleobjects to sleep
when idle.
Remove all traces of Windows messages from MessagePumpForIO,
remove the transitional API of completion port notifications
and remove WatchObject API.
Modify all callers of RegisterIOHandler so that they are no
longer using RegisterIOContext, and also handle properly
the new semantics of completion ports (notifications even when
the IO completes immediately).
Add a new interface to allow proper cleanup of disk cache (to
replace code that was waiting for pending APCs from the destructor).
Add a way for the message pump to perform cleanup of abandoned IO.
BUG=B/1344358, 3497, 3630
TESt=unit tests
R=darin
Review URL: http://codereview.chromium.org/8156
git-svn-id: svn://svn.chromium.org/chrome/trunk/src@5021 0039d316-1c4b-4281-b951-d872f2087c98
-rw-r--r-- | base/message_loop.cc | 11 | ||||
-rw-r--r-- | base/message_loop.h | 7 | ||||
-rw-r--r-- | base/message_loop_unittest.cc | 186 | ||||
-rw-r--r-- | base/message_pump_win.cc | 666 | ||||
-rw-r--r-- | base/message_pump_win.h | 290 | ||||
-rw-r--r-- | chrome/common/ipc_channel.cc | 144 | ||||
-rw-r--r-- | chrome/common/ipc_channel.h | 14 | ||||
-rw-r--r-- | chrome/common/ipc_sync_channel_unittest.cc | 8 | ||||
-rw-r--r-- | net/base/file_stream_win.cc | 73 | ||||
-rw-r--r-- | net/disk_cache/cache_util_win.cc | 12 | ||||
-rw-r--r-- | net/disk_cache/disk_cache_perftest.cc | 8 | ||||
-rw-r--r-- | net/disk_cache/disk_cache_test_base.cc | 5 | ||||
-rw-r--r-- | net/disk_cache/disk_cache_test_base.h | 14 | ||||
-rw-r--r-- | net/disk_cache/entry_impl.cc | 39 | ||||
-rw-r--r-- | net/disk_cache/file_win.cc | 265 | ||||
-rw-r--r-- | net/disk_cache/mapped_file_unittest.cc | 6 |
16 files changed, 816 insertions, 932 deletions
diff --git a/base/message_loop.cc b/base/message_loop.cc index 73cb7da..1145439 100644 --- a/base/message_loop.cc +++ b/base/message_loop.cc @@ -573,7 +573,7 @@ void MessageLoopForUI::DidProcessMessage(const MSG& message) { pump_win()->DidProcessMessage(message); } void MessageLoopForUI::PumpOutPendingPaintMessages() { - pump_win()->PumpOutPendingPaintMessages(); + pump_ui()->PumpOutPendingPaintMessages(); } #endif // defined(OS_WIN) @@ -583,17 +583,12 @@ void MessageLoopForUI::PumpOutPendingPaintMessages() { #if defined(OS_WIN) -void MessageLoopForIO::WatchObject(HANDLE object, Watcher* watcher) { - pump_io()->WatchObject(object, watcher); -} - void MessageLoopForIO::RegisterIOHandler(HANDLE file, IOHandler* handler) { pump_io()->RegisterIOHandler(file, handler); } -void MessageLoopForIO::RegisterIOContext(OVERLAPPED* context, - IOHandler* handler) { - pump_io()->RegisterIOContext(context, handler); +bool MessageLoopForIO::WaitForIOCompletion(DWORD timeout, IOHandler* filter) { + return pump_io()->WaitForIOCompletion(timeout, filter); } #elif defined(OS_POSIX) diff --git a/base/message_loop.h b/base/message_loop.h index 2d38853..8f16d8e 100644 --- a/base/message_loop.h +++ b/base/message_loop.h @@ -426,7 +426,7 @@ class MessageLoopForUI : public MessageLoop { protected: // TODO(rvargas): Make this platform independent. - base::MessagePumpWin* pump_ui() { + base::MessagePumpForUI* pump_ui() { return static_cast<base::MessagePumpForUI*>(pump_.get()); } #endif // defined(OS_WIN) @@ -458,13 +458,12 @@ class MessageLoopForIO : public MessageLoop { } #if defined(OS_WIN) - typedef base::MessagePumpForIO::Watcher Watcher; typedef base::MessagePumpForIO::IOHandler IOHandler; + typedef base::MessagePumpForIO::IOContext IOContext; // Please see MessagePumpWin for definitions of these methods. - void WatchObject(HANDLE object, Watcher* watcher); void RegisterIOHandler(HANDLE file_handle, IOHandler* handler); - void RegisterIOContext(OVERLAPPED* context, IOHandler* handler); + bool WaitForIOCompletion(DWORD timeout, IOHandler* filter); protected: // TODO(rvargas): Make this platform independent. diff --git a/base/message_loop_unittest.cc b/base/message_loop_unittest.cc index 83be2b1..d959659 100644 --- a/base/message_loop_unittest.cc +++ b/base/message_loop_unittest.cc @@ -1056,68 +1056,6 @@ void RunTest_NonNestableInNestedLoop(MessageLoop::Type message_loop_type) { #if defined(OS_WIN) -class AutoresetWatcher : public MessageLoopForIO::Watcher { - public: - explicit AutoresetWatcher(HANDLE signal) : signal_(signal) { - } - virtual void OnObjectSignaled(HANDLE object); - private: - HANDLE signal_; -}; - -void AutoresetWatcher::OnObjectSignaled(HANDLE object) { - MessageLoopForIO::current()->WatchObject(object, NULL); - ASSERT_TRUE(SetEvent(signal_)); -} - -class AutoresetTask : public Task { - public: - AutoresetTask(HANDLE object, MessageLoopForIO::Watcher* watcher) - : object_(object), watcher_(watcher) {} - virtual void Run() { - MessageLoopForIO::current()->WatchObject(object_, watcher_); - } - - private: - HANDLE object_; - MessageLoopForIO::Watcher* watcher_; -}; - -void RunTest_AutoresetEvents(MessageLoop::Type message_loop_type) { - MessageLoop loop(message_loop_type); - - SECURITY_ATTRIBUTES attributes; - attributes.nLength = sizeof(attributes); - attributes.bInheritHandle = false; - attributes.lpSecurityDescriptor = NULL; - - // Init an autoreset and a manual reset events. - HANDLE autoreset = CreateEvent(&attributes, FALSE, FALSE, NULL); - HANDLE callback_called = CreateEvent(&attributes, TRUE, FALSE, NULL); - ASSERT_TRUE(NULL != autoreset); - ASSERT_TRUE(NULL != callback_called); - - Thread thread("Autoreset test"); - Thread::Options options; - options.message_loop_type = message_loop_type; - ASSERT_TRUE(thread.StartWithOptions(options)); - - MessageLoop* thread_loop = thread.message_loop(); - ASSERT_TRUE(NULL != thread_loop); - - AutoresetWatcher watcher(callback_called); - AutoresetTask* task = new AutoresetTask(autoreset, &watcher); - thread_loop->PostTask(FROM_HERE, task); - Sleep(100); // Make sure the thread runs and sleeps for lack of work. - - ASSERT_TRUE(SetEvent(autoreset)); - - DWORD result = WaitForSingleObject(callback_called, 1000); - EXPECT_EQ(WAIT_OBJECT_0, result); - - thread.Stop(); -} - class DispatcherImpl : public MessageLoopForUI::Dispatcher { public: DispatcherImpl() : dispatch_count_(0) {} @@ -1150,68 +1088,68 @@ void RunTest_Dispatcher(MessageLoop::Type message_loop_type) { class TestIOHandler : public MessageLoopForIO::IOHandler { public: - TestIOHandler(const wchar_t* name, HANDLE signal); + TestIOHandler(const wchar_t* name, HANDLE signal, bool wait); - virtual void OnIOCompleted(OVERLAPPED* context, DWORD bytes_transfered, - DWORD error); + virtual void OnIOCompleted(MessageLoopForIO::IOContext* context, + DWORD bytes_transfered, DWORD error); - HANDLE file() { return file_.Get(); } - void* buffer() { return buffer_; } - OVERLAPPED* context() { return &context_; } + void Init(); + void WaitForIO(); + OVERLAPPED* context() { return &context_.overlapped; } DWORD size() { return sizeof(buffer_); } private: char buffer_[48]; - OVERLAPPED context_; + MessageLoopForIO::IOContext context_; HANDLE signal_; ScopedHandle file_; - ScopedHandle event_; + bool wait_; }; -TestIOHandler::TestIOHandler(const wchar_t* name, HANDLE signal) - : signal_(signal) { +TestIOHandler::TestIOHandler(const wchar_t* name, HANDLE signal, bool wait) + : signal_(signal), wait_(wait) { memset(buffer_, 0, sizeof(buffer_)); memset(&context_, 0, sizeof(context_)); - - event_.Set(CreateEvent(NULL, TRUE, FALSE, NULL)); - EXPECT_TRUE(event_.IsValid()); - context_.hEvent = event_.Get(); + context_.handler = this; file_.Set(CreateFile(name, GENERIC_READ, 0, NULL, OPEN_EXISTING, FILE_FLAG_OVERLAPPED, NULL)); EXPECT_TRUE(file_.IsValid()); } -void TestIOHandler::OnIOCompleted(OVERLAPPED* context, DWORD bytes_transfered, - DWORD error) { +void TestIOHandler::Init() { + MessageLoopForIO::current()->RegisterIOHandler(file_, this); + + DWORD read; + EXPECT_FALSE(ReadFile(file_, buffer_, size(), &read, context())); + EXPECT_EQ(ERROR_IO_PENDING, GetLastError()); + if (wait_) + WaitForIO(); +} + +void TestIOHandler::OnIOCompleted(MessageLoopForIO::IOContext* context, + DWORD bytes_transfered, DWORD error) { ASSERT_TRUE(context == &context_); - MessageLoopForIO::current()->RegisterIOContext(context, NULL); ASSERT_TRUE(SetEvent(signal_)); } +void TestIOHandler::WaitForIO() { + EXPECT_TRUE(MessageLoopForIO::current()->WaitForIOCompletion(300, this)); + EXPECT_TRUE(MessageLoopForIO::current()->WaitForIOCompletion(400, this)); +} + class IOHandlerTask : public Task { public: explicit IOHandlerTask(TestIOHandler* handler) : handler_(handler) {} - virtual void Run(); + virtual void Run() { + handler_->Init(); + } private: TestIOHandler* handler_; }; -void IOHandlerTask::Run() { - MessageLoopForIO::current()->RegisterIOHandler(handler_->file(), handler_); - MessageLoopForIO::current()->RegisterIOContext(handler_->context(), handler_); - - DWORD read; - EXPECT_FALSE(ReadFile(handler_->file(), handler_->buffer(), handler_->size(), - &read, handler_->context())); - EXPECT_EQ(ERROR_IO_PENDING, GetLastError()); -} - void RunTest_IOHandler() { - // This test requires an IO loop. - MessageLoop loop(MessageLoop::TYPE_IO); - ScopedHandle callback_called(CreateEvent(NULL, TRUE, FALSE, NULL)); ASSERT_TRUE(callback_called.IsValid()); @@ -1228,7 +1166,7 @@ void RunTest_IOHandler() { MessageLoop* thread_loop = thread.message_loop(); ASSERT_TRUE(NULL != thread_loop); - TestIOHandler handler(kPipeName, callback_called); + TestIOHandler handler(kPipeName, callback_called, false); IOHandlerTask* task = new IOHandlerTask(&handler); thread_loop->PostTask(FROM_HERE, task); Sleep(100); // Make sure the thread runs and sleeps for lack of work. @@ -1243,6 +1181,57 @@ void RunTest_IOHandler() { thread.Stop(); } +void RunTest_WaitForIO() { + ScopedHandle callback1_called(CreateEvent(NULL, TRUE, FALSE, NULL)); + ScopedHandle callback2_called(CreateEvent(NULL, TRUE, FALSE, NULL)); + ASSERT_TRUE(callback1_called.IsValid()); + ASSERT_TRUE(callback2_called.IsValid()); + + const wchar_t* kPipeName1 = L"\\\\.\\pipe\\iohandler_pipe1"; + const wchar_t* kPipeName2 = L"\\\\.\\pipe\\iohandler_pipe2"; + ScopedHandle server1(CreateNamedPipe(kPipeName1, PIPE_ACCESS_OUTBOUND, 0, 1, + 0, 0, 0, NULL)); + ScopedHandle server2(CreateNamedPipe(kPipeName2, PIPE_ACCESS_OUTBOUND, 0, 1, + 0, 0, 0, NULL)); + ASSERT_TRUE(server1.IsValid()); + ASSERT_TRUE(server2.IsValid()); + + Thread thread("IOHandler test"); + Thread::Options options; + options.message_loop_type = MessageLoop::TYPE_IO; + ASSERT_TRUE(thread.StartWithOptions(options)); + + MessageLoop* thread_loop = thread.message_loop(); + ASSERT_TRUE(NULL != thread_loop); + + TestIOHandler handler1(kPipeName1, callback1_called, false); + TestIOHandler handler2(kPipeName2, callback2_called, true); + IOHandlerTask* task1 = new IOHandlerTask(&handler1); + IOHandlerTask* task2 = new IOHandlerTask(&handler2); + thread_loop->PostTask(FROM_HERE, task1); + Sleep(100); // Make sure the thread runs and sleeps for lack of work. + thread_loop->PostTask(FROM_HERE, task2); + Sleep(100); + + // At this time handler1 is waiting to be called, and the thread is waiting + // on the Init method of handler2, filtering only handler2 callbacks. + + const char buffer[] = "Hello there!"; + DWORD written; + EXPECT_TRUE(WriteFile(server1, buffer, sizeof(buffer), &written, NULL)); + Sleep(200); + EXPECT_EQ(WAIT_TIMEOUT, WaitForSingleObject(callback1_called, 0)) << + "handler1 has not been called"; + + EXPECT_TRUE(WriteFile(server2, buffer, sizeof(buffer), &written, NULL)); + + HANDLE objects[2] = { callback1_called.Get(), callback2_called.Get() }; + DWORD result = WaitForMultipleObjects(2, objects, TRUE, 1000); + EXPECT_EQ(WAIT_OBJECT_0, result); + + thread.Stop(); +} + #endif // defined(OS_WIN) } // namespace @@ -1379,11 +1368,6 @@ TEST(MessageLoopTest, NonNestableInNestedLoop) { } #if defined(OS_WIN) -TEST(MessageLoopTest, AutoresetEvents) { - // This test requires an IO loop - RunTest_AutoresetEvents(MessageLoop::TYPE_IO); -} - TEST(MessageLoopTest, Dispatcher) { // This test requires a UI loop RunTest_Dispatcher(MessageLoop::TYPE_UI); @@ -1392,4 +1376,8 @@ TEST(MessageLoopTest, Dispatcher) { TEST(MessageLoopTest, IOHandler) { RunTest_IOHandler(); } + +TEST(MessageLoopTest, WaitForIO) { + RunTest_WaitForIO(); +} #endif // defined(OS_WIN) diff --git a/base/message_pump_win.cc b/base/message_pump_win.cc index 542fcf1..7ada5eb7 100644 --- a/base/message_pump_win.cc +++ b/base/message_pump_win.cc @@ -11,41 +11,6 @@ using base::Time; -namespace { - -class HandlerData : public base::MessagePumpForIO::Watcher { - public: - typedef base::MessagePumpForIO::IOHandler IOHandler; - HandlerData(OVERLAPPED* context, IOHandler* handler) - : context_(context), handler_(handler) {} - ~HandlerData() {} - - virtual void OnObjectSignaled(HANDLE object); - - private: - OVERLAPPED* context_; - IOHandler* handler_; - - DISALLOW_COPY_AND_ASSIGN(HandlerData); -}; - -void HandlerData::OnObjectSignaled(HANDLE object) { - DCHECK(object == context_->hEvent); - DWORD transfered; - DWORD error = ERROR_SUCCESS; - BOOL ret = GetOverlappedResult(NULL, context_, &transfered, FALSE); - if (!ret) { - error = GetLastError(); - DCHECK(ERROR_HANDLE_EOF == error || ERROR_BROKEN_PIPE == error); - transfered = 0; - } - - ResetEvent(context_->hEvent); - handler_->OnIOCompleted(context_, transfered, error); -} - -} // namespace - namespace base { static const wchar_t kWndClass[] = L"Chrome_MessagePumpWindow"; @@ -54,24 +19,9 @@ static const wchar_t kWndClass[] = L"Chrome_MessagePumpWindow"; // task (a series of such messages creates a continuous task pump). static const int kMsgHaveWork = WM_USER + 1; -#ifndef NDEBUG -// Force exercise of polling model. -static const int kMaxWaitObjects = 8; -#else -static const int kMaxWaitObjects = MAXIMUM_WAIT_OBJECTS; -#endif - //----------------------------------------------------------------------------- // MessagePumpWin public: -MessagePumpWin::MessagePumpWin() : have_work_(0), state_(NULL) { - InitMessageWnd(); -} - -MessagePumpWin::~MessagePumpWin() { - DestroyWindow(message_hwnd_); -} - void MessagePumpWin::AddObserver(Observer* observer) { observers_.AddObserver(observer); } @@ -88,36 +38,6 @@ void MessagePumpWin::DidProcessMessage(const MSG& msg) { FOR_EACH_OBSERVER(Observer, observers_, DidProcessMessage(msg)); } -void MessagePumpWin::PumpOutPendingPaintMessages() { - // If we are being called outside of the context of Run, then don't try to do - // any work. - if (!state_) - return; - - // Create a mini-message-pump to force immediate processing of only Windows - // WM_PAINT messages. Don't provide an infinite loop, but do enough peeking - // to get the job done. Actual common max is 4 peeks, but we'll be a little - // safe here. - const int kMaxPeekCount = 20; - bool win2k = win_util::GetWinVersion() <= win_util::WINVERSION_2000; - int peek_count; - for (peek_count = 0; peek_count < kMaxPeekCount; ++peek_count) { - MSG msg; - if (win2k) { - if (!PeekMessage(&msg, NULL, WM_PAINT, WM_PAINT, PM_REMOVE)) - break; - } else { - if (!PeekMessage(&msg, NULL, 0, 0, PM_REMOVE | PM_QS_PAINT)) - break; - } - ProcessMessageHelper(msg); - if (state_->should_quit) // Handle WM_QUIT. - break; - } - // Histogram what was really being used, to help to adjust kMaxPeekCount. - DHISTOGRAM_COUNTS(L"Loop.PumpOutPendingPaintMessages Peeks", peek_count); -} - void MessagePumpWin::RunWithDispatcher( Delegate* delegate, Dispatcher* dispatcher) { RunState s; @@ -139,7 +59,38 @@ void MessagePumpWin::Quit() { state_->should_quit = true; } -void MessagePumpWin::ScheduleWork() { +//----------------------------------------------------------------------------- +// MessagePumpWin protected: + +int MessagePumpWin::GetCurrentDelay() const { + if (delayed_work_time_.is_null()) + return -1; + + // Be careful here. TimeDelta has a precision of microseconds, but we want a + // value in milliseconds. If there are 5.5ms left, should the delay be 5 or + // 6? It should be 6 to avoid executing delayed work too early. + double timeout = ceil((delayed_work_time_ - Time::Now()).InMillisecondsF()); + + // If this value is negative, then we need to run delayed work soon. + int delay = static_cast<int>(timeout); + if (delay < 0) + delay = 0; + + return delay; +} + +//----------------------------------------------------------------------------- +// MessagePumpForUI public: + +MessagePumpForUI::MessagePumpForUI() { + InitMessageWnd(); +} + +MessagePumpForUI::~MessagePumpForUI() { + DestroyWindow(message_hwnd_); +} + +void MessagePumpForUI::ScheduleWork() { if (InterlockedExchange(&have_work_, 1)) return; // Someone else continued the pumping. @@ -147,7 +98,7 @@ void MessagePumpWin::ScheduleWork() { PostMessage(message_hwnd_, kMsgHaveWork, reinterpret_cast<WPARAM>(this), 0); } -void MessagePumpWin::ScheduleDelayedWork(const Time& delayed_work_time) { +void MessagePumpForUI::ScheduleDelayedWork(const Time& delayed_work_time) { // // We would *like* to provide high resolution timers. Windows timers using // SetTimer() have a 10ms granularity. We have to use WM_TIMER as a wakeup @@ -180,24 +131,110 @@ void MessagePumpWin::ScheduleDelayedWork(const Time& delayed_work_time) { SetTimer(message_hwnd_, reinterpret_cast<UINT_PTR>(this), delay_msec, NULL); } +void MessagePumpForUI::PumpOutPendingPaintMessages() { + // If we are being called outside of the context of Run, then don't try to do + // any work. + if (!state_) + return; + + // Create a mini-message-pump to force immediate processing of only Windows + // WM_PAINT messages. Don't provide an infinite loop, but do enough peeking + // to get the job done. Actual common max is 4 peeks, but we'll be a little + // safe here. + const int kMaxPeekCount = 20; + bool win2k = win_util::GetWinVersion() <= win_util::WINVERSION_2000; + int peek_count; + for (peek_count = 0; peek_count < kMaxPeekCount; ++peek_count) { + MSG msg; + if (win2k) { + if (!PeekMessage(&msg, NULL, WM_PAINT, WM_PAINT, PM_REMOVE)) + break; + } else { + if (!PeekMessage(&msg, NULL, 0, 0, PM_REMOVE | PM_QS_PAINT)) + break; + } + ProcessMessageHelper(msg); + if (state_->should_quit) // Handle WM_QUIT. + break; + } + // Histogram what was really being used, to help to adjust kMaxPeekCount. + DHISTOGRAM_COUNTS(L"Loop.PumpOutPendingPaintMessages Peeks", peek_count); +} + //----------------------------------------------------------------------------- -// MessagePumpWin protected: +// MessagePumpForUI private: // static -LRESULT CALLBACK MessagePumpWin::WndProcThunk( +LRESULT CALLBACK MessagePumpForUI::WndProcThunk( HWND hwnd, UINT message, WPARAM wparam, LPARAM lparam) { switch (message) { case kMsgHaveWork: - reinterpret_cast<MessagePumpWin*>(wparam)->HandleWorkMessage(); + reinterpret_cast<MessagePumpForUI*>(wparam)->HandleWorkMessage(); break; case WM_TIMER: - reinterpret_cast<MessagePumpWin*>(wparam)->HandleTimerMessage(); + reinterpret_cast<MessagePumpForUI*>(wparam)->HandleTimerMessage(); break; } return DefWindowProc(hwnd, message, wparam, lparam); } -void MessagePumpWin::InitMessageWnd() { +void MessagePumpForUI::DoRunLoop() { + // IF this was just a simple PeekMessage() loop (servicing all possible work + // queues), then Windows would try to achieve the following order according + // to MSDN documentation about PeekMessage with no filter): + // * Sent messages + // * Posted messages + // * Sent messages (again) + // * WM_PAINT messages + // * WM_TIMER messages + // + // Summary: none of the above classes is starved, and sent messages has twice + // the chance of being processed (i.e., reduced service time). + + for (;;) { + // If we do any work, we may create more messages etc., and more work may + // possibly be waiting in another task group. When we (for example) + // ProcessNextWindowsMessage(), there is a good chance there are still more + // messages waiting. On the other hand, when any of these methods return + // having done no work, then it is pretty unlikely that calling them again + // quickly will find any work to do. Finally, if they all say they had no + // work, then it is a good time to consider sleeping (waiting) for more + // work. + + bool more_work_is_plausible = ProcessNextWindowsMessage(); + if (state_->should_quit) + break; + + more_work_is_plausible |= state_->delegate->DoWork(); + if (state_->should_quit) + break; + + more_work_is_plausible |= + state_->delegate->DoDelayedWork(&delayed_work_time_); + // If we did not process any delayed work, then we can assume that our + // existing WM_TIMER if any will fire when delayed work should run. We + // don't want to disturb that timer if it is already in flight. However, + // if we did do all remaining delayed work, then lets kill the WM_TIMER. + if (more_work_is_plausible && delayed_work_time_.is_null()) + KillTimer(message_hwnd_, reinterpret_cast<UINT_PTR>(this)); + if (state_->should_quit) + break; + + if (more_work_is_plausible) + continue; + + more_work_is_plausible = state_->delegate->DoIdleWork(); + if (state_->should_quit) + break; + + if (more_work_is_plausible) + continue; + + WaitForWork(); // Wait (sleep) until we have work to do again. + } +} + +void MessagePumpForUI::InitMessageWnd() { HINSTANCE hinst = GetModuleHandle(NULL); WNDCLASSEX wc = {0}; @@ -212,7 +249,41 @@ void MessagePumpWin::InitMessageWnd() { DCHECK(message_hwnd_); } -void MessagePumpWin::HandleWorkMessage() { +void MessagePumpForUI::WaitForWork() { + // Wait until a message is available, up to the time needed by the timer + // manager to fire the next set of timers. + int delay = GetCurrentDelay(); + if (delay < 0) // Negative value means no timers waiting. + delay = INFINITE; + + DWORD result; + result = MsgWaitForMultipleObjectsEx(0, NULL, delay, QS_ALLINPUT, + MWMO_INPUTAVAILABLE); + + if (WAIT_OBJECT_0 == result) { + // A WM_* message is available. + // If a parent child relationship exists between windows across threads + // then their thread inputs are implicitly attached. + // This causes the MsgWaitForMultipleObjectsEx API to return indicating + // that messages are ready for processing (specifically mouse messages + // intended for the child window. Occurs if the child window has capture) + // The subsequent PeekMessages call fails to return any messages thus + // causing us to enter a tight loop at times. + // The WaitMessage call below is a workaround to give the child window + // sometime to process its input messages. + MSG msg = {0}; + DWORD queue_status = GetQueueStatus(QS_MOUSE); + if (HIWORD(queue_status) & QS_MOUSE && + !PeekMessage(&msg, NULL, WM_MOUSEFIRST, WM_MOUSELAST, PM_NOREMOVE)) { + WaitMessage(); + } + return; + } + + DCHECK_NE(WAIT_FAILED, result) << GetLastError(); +} + +void MessagePumpForUI::HandleWorkMessage() { // If we are being called outside of the context of Run, then don't try to do // any work. This could correspond to a MessageBox call or something of that // sort. @@ -233,7 +304,7 @@ void MessagePumpWin::HandleWorkMessage() { ScheduleWork(); } -void MessagePumpWin::HandleTimerMessage() { +void MessagePumpForUI::HandleTimerMessage() { KillTimer(message_hwnd_, reinterpret_cast<UINT_PTR>(this)); // If we are being called outside of the context of Run, then don't do @@ -249,7 +320,7 @@ void MessagePumpWin::HandleTimerMessage() { } } -bool MessagePumpWin::ProcessNextWindowsMessage() { +bool MessagePumpForUI::ProcessNextWindowsMessage() { // If there are sent messages in the queue then PeekMessage internally // dispatches the message and returns false. We return true in this // case to ensure that the message loop peeks again instead of calling @@ -266,7 +337,7 @@ bool MessagePumpWin::ProcessNextWindowsMessage() { return sent_messages_in_queue; } -bool MessagePumpWin::ProcessMessageHelper(const MSG& msg) { +bool MessagePumpForUI::ProcessMessageHelper(const MSG& msg) { if (WM_QUIT == msg.message) { // Repost the QUIT message so that it will be retrieved by the primary // GetMessage() loop. @@ -293,7 +364,7 @@ bool MessagePumpWin::ProcessMessageHelper(const MSG& msg) { return true; } -bool MessagePumpWin::ProcessPumpReplacementMessage() { +bool MessagePumpForUI::ProcessPumpReplacementMessage() { // When we encounter a kMsgHaveWork message, this method is called to peek // and process a replacement message, such as a WM_PAINT or WM_TIMER. The // goal is to make the kMsgHaveWork as non-intrusive as possible, even though @@ -307,7 +378,7 @@ bool MessagePumpWin::ProcessPumpReplacementMessage() { bool have_message = (0 != PeekMessage(&msg, NULL, 0, 0, PM_REMOVE)); DCHECK(!have_message || kMsgHaveWork != msg.message || msg.hwnd != message_hwnd_); - + // Since we discarded a kMsgHaveWork message, we must update the flag. InterlockedExchange(&have_work_, 0); @@ -318,233 +389,63 @@ bool MessagePumpWin::ProcessPumpReplacementMessage() { return have_message && ProcessMessageHelper(msg); } -int MessagePumpWin::GetCurrentDelay() const { - if (delayed_work_time_.is_null()) - return -1; - - // Be careful here. TimeDelta has a precision of microseconds, but we want a - // value in milliseconds. If there are 5.5ms left, should the delay be 5 or - // 6? It should be 6 to avoid executing delayed work too early. - double timeout = ceil((delayed_work_time_ - Time::Now()).InMillisecondsF()); - - // If this value is negative, then we need to run delayed work soon. - int delay = static_cast<int>(timeout); - if (delay < 0) - delay = 0; - - return delay; -} - //----------------------------------------------------------------------------- -// MessagePumpForUI private: - -void MessagePumpForUI::DoRunLoop() { - // IF this was just a simple PeekMessage() loop (servicing all possible work - // queues), then Windows would try to achieve the following order according - // to MSDN documentation about PeekMessage with no filter): - // * Sent messages - // * Posted messages - // * Sent messages (again) - // * WM_PAINT messages - // * WM_TIMER messages - // - // Summary: none of the above classes is starved, and sent messages has twice - // the chance of being processed (i.e., reduced service time). - - for (;;) { - // If we do any work, we may create more messages etc., and more work may - // possibly be waiting in another task group. When we (for example) - // ProcessNextWindowsMessage(), there is a good chance there are still more - // messages waiting. On the other hand, when any of these methods return - // having done no work, then it is pretty unlikely that calling them again - // quickly will find any work to do. Finally, if they all say they had no - // work, then it is a good time to consider sleeping (waiting) for more - // work. - - bool more_work_is_plausible = ProcessNextWindowsMessage(); - if (state_->should_quit) - break; - - more_work_is_plausible |= state_->delegate->DoWork(); - if (state_->should_quit) - break; - - more_work_is_plausible |= - state_->delegate->DoDelayedWork(&delayed_work_time_); - // If we did not process any delayed work, then we can assume that our - // existing WM_TIMER if any will fire when delayed work should run. We - // don't want to disturb that timer if it is already in flight. However, - // if we did do all remaining delayed work, then lets kill the WM_TIMER. - if (more_work_is_plausible && delayed_work_time_.is_null()) - KillTimer(message_hwnd_, reinterpret_cast<UINT_PTR>(this)); - if (state_->should_quit) - break; - - if (more_work_is_plausible) - continue; - - more_work_is_plausible = state_->delegate->DoIdleWork(); - if (state_->should_quit) - break; - - if (more_work_is_plausible) - continue; +// MessagePumpForIO public: - WaitForWork(); // Wait (sleep) until we have work to do again. - } +MessagePumpForIO::MessagePumpForIO() { + port_.Set(CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, NULL, 1)); + DCHECK(port_.IsValid()); } -void MessagePumpForUI::WaitForWork() { - // Wait until a message is available, up to the time needed by the timer - // manager to fire the next set of timers. - int delay = GetCurrentDelay(); - if (delay < 0) // Negative value means no timers waiting. - delay = INFINITE; - - DWORD result; - result = MsgWaitForMultipleObjectsEx(0, NULL, delay, QS_ALLINPUT, - MWMO_INPUTAVAILABLE); - - if (WAIT_OBJECT_0 == result) { - // A WM_* message is available. - // If a parent child relationship exists between windows across threads - // then their thread inputs are implicitly attached. - // This causes the MsgWaitForMultipleObjectsEx API to return indicating - // that messages are ready for processing (specifically mouse messages - // intended for the child window. Occurs if the child window has capture) - // The subsequent PeekMessages call fails to return any messages thus - // causing us to enter a tight loop at times. - // The WaitMessage call below is a workaround to give the child window - // sometime to process its input messages. - MSG msg = {0}; - DWORD queue_status = GetQueueStatus(QS_MOUSE); - if (HIWORD(queue_status) & QS_MOUSE && - !PeekMessage(&msg, NULL, WM_MOUSEFIRST, WM_MOUSELAST, PM_NOREMOVE)) { - WaitMessage(); - } - return; - } +void MessagePumpForIO::ScheduleWork() { + if (InterlockedExchange(&have_work_, 1)) + return; // Someone else continued the pumping. - DCHECK_NE(WAIT_FAILED, result) << GetLastError(); + // Make sure the MessagePump does some work for us. + BOOL ret = PostQueuedCompletionStatus(port_, 0, + reinterpret_cast<ULONG_PTR>(this), + reinterpret_cast<OVERLAPPED*>(this)); + DCHECK(ret); } -//----------------------------------------------------------------------------- -// MessagePumpForIO public: - -void MessagePumpForIO::WatchObject(HANDLE object, Watcher* watcher) { - DCHECK(object); - DCHECK_NE(object, INVALID_HANDLE_VALUE); - - std::vector<HANDLE>::iterator it = - find(objects_.begin(), objects_.end(), object); - if (watcher) { - if (it == objects_.end()) { - static size_t warning_multiple = 1; - if (objects_.size() >= warning_multiple * MAXIMUM_WAIT_OBJECTS / 2) { - LOG(INFO) << "More than " << warning_multiple * MAXIMUM_WAIT_OBJECTS / 2 - << " objects being watched"; - // This DCHECK() is an artificial limitation, meant to warn us if we - // start creating too many objects. It can safely be raised to a higher - // level, and the program is designed to handle much larger values. - // Before raising this limit, make sure that there is a very good reason - // (in your debug testing) to be watching this many objects. - DCHECK(2 <= warning_multiple); - ++warning_multiple; - } - objects_.push_back(object); - watchers_.push_back(watcher); - } else { - watchers_[it - objects_.begin()] = watcher; - } - } else if (it != objects_.end()) { - std::vector<HANDLE>::difference_type index = it - objects_.begin(); - objects_.erase(it); - watchers_.erase(watchers_.begin() + index); - } +void MessagePumpForIO::ScheduleDelayedWork(const Time& delayed_work_time) { + // We know that we can't be blocked right now since this method can only be + // called on the same thread as Run, so we only need to update our record of + // how long to sleep when we do sleep. + delayed_work_time_ = delayed_work_time; } void MessagePumpForIO::RegisterIOHandler(HANDLE file_handle, IOHandler* handler) { -#if 0 - // TODO(rvargas): This is just to give an idea of what this code will look - // like when we actually move to completion ports. Of course, we cannot - // do this without calling GetQueuedCompletionStatus(). ULONG_PTR key = reinterpret_cast<ULONG_PTR>(handler); HANDLE port = CreateIoCompletionPort(file_handle, port_, key, 1); - if (!port_.IsValid()) - port_.Set(port); -#endif -} - -void MessagePumpForIO::RegisterIOContext(OVERLAPPED* context, - IOHandler* handler) { - DCHECK(context->hEvent); - if (handler) { - HandlerData* watcher = new HandlerData(context, handler); - WatchObject(context->hEvent, watcher); - } else { - std::vector<HANDLE>::iterator it = - find(objects_.begin(), objects_.end(), context->hEvent); - - if (it == objects_.end()) { - NOTREACHED(); - return; - } - - std::vector<HANDLE>::difference_type index = it - objects_.begin(); - objects_.erase(it); - delete watchers_[index]; - watchers_.erase(watchers_.begin() + index); - } + DCHECK(port == port_.Get()); } //----------------------------------------------------------------------------- // MessagePumpForIO private: void MessagePumpForIO::DoRunLoop() { - // IF this was just a simple PeekMessage() loop (servicing all possible work - // queues), then Windows would try to achieve the following order according - // to MSDN documentation about PeekMessage with no filter): - // * Sent messages - // * Posted messages - // * Sent messages (again) - // * WM_PAINT messages - // * WM_TIMER messages - // - // Summary: none of the above classes is starved, and sent messages has twice - // the chance of being processed (i.e., reduced service time). - for (;;) { // If we do any work, we may create more messages etc., and more work may // possibly be waiting in another task group. When we (for example) - // ProcessNextWindowsMessage(), there is a good chance there are still more - // messages waiting (same thing for ProcessNextObject(), which responds to - // only one signaled object; etc.). On the other hand, when any of these - // methods return having done no work, then it is pretty unlikely that - // calling them again quickly will find any work to do. Finally, if they - // all say they had no work, then it is a good time to consider sleeping - // (waiting) for more work. - - bool more_work_is_plausible = ProcessNextWindowsMessage(); - if (state_->should_quit) - break; + // WaitForIOCompletion(), there is a good chance there are still more + // messages waiting. On the other hand, when any of these methods return + // having done no work, then it is pretty unlikely that calling them + // again quickly will find any work to do. Finally, if they all say they + // had no work, then it is a good time to consider sleeping (waiting) for + // more work. - more_work_is_plausible |= state_->delegate->DoWork(); + bool more_work_is_plausible = state_->delegate->DoWork(); if (state_->should_quit) break; - more_work_is_plausible |= ProcessNextObject(); + more_work_is_plausible |= WaitForIOCompletion(0, NULL); if (state_->should_quit) break; more_work_is_plausible |= state_->delegate->DoDelayedWork(&delayed_work_time_); - // If we did not process any delayed work, then we can assume that our - // existing WM_TIMER if any will fire when delayed work should run. We - // don't want to disturb that timer if it is already in flight. However, - // if we did do all remaining delayed work, then lets kill the WM_TIMER. - if (more_work_is_plausible && delayed_work_time_.is_null()) - KillTimer(message_hwnd_, reinterpret_cast<UINT_PTR>(this)); if (state_->should_quit) break; @@ -558,141 +459,92 @@ void MessagePumpForIO::DoRunLoop() { if (more_work_is_plausible) continue; - // We service APCs in WaitForWork, without returning. WaitForWork(); // Wait (sleep) until we have work to do again. } } -// If we handle more than the OS limit on the number of objects that can be -// waited for, we'll need to poll (sequencing through subsets of the objects -// that can be passed in a single OS wait call). The following is the polling -// interval used in that (unusual) case. (I don't have a lot of justifcation -// for the specific value, but it needed to be short enough that it would not -// add a lot of latency, and long enough that we wouldn't thrash the CPU for no -// reason... especially considering the silly user probably has a million tabs -// open, etc.) -static const int kMultipleWaitPollingInterval = 20; - +// Wait until IO completes, up to the time needed by the timer manager to fire +// the next set of timers. void MessagePumpForIO::WaitForWork() { - // Wait until either an object is signaled or a message is available. Handle - // (without returning) any APCs (only the IO thread currently has APCs.) - - // We do not support nested message loops when we have watched objects. This - // is to avoid messy recursion problems. - DCHECK(objects_.empty() || state_->run_depth == 1) << - "Cannot nest a message loop when there are watched objects!"; + // We do not support nested IO message loops. This is to avoid messy + // recursion problems. + DCHECK(state_->run_depth == 1) << "Cannot nest an IO message loop!"; - int wait_flags = MWMO_ALERTABLE | MWMO_INPUTAVAILABLE; + int timeout = GetCurrentDelay(); + if (timeout < 0) // Negative value means no timers waiting. + timeout = INFINITE; - bool use_polling = false; // Poll if too many objects for one OS Wait call. - for (;;) { - // Do initialization here, in case APC modifies object list. - size_t total_objs = objects_.size(); - - int delay; - size_t polling_index = 0; // The first unprocessed object index. - do { - size_t objs_len = - (polling_index < total_objs) ? total_objs - polling_index : 0; - if (objs_len >= MAXIMUM_WAIT_OBJECTS) { - objs_len = MAXIMUM_WAIT_OBJECTS - 1; - use_polling = true; - } - HANDLE* objs = objs_len ? polling_index + &objects_.front() : NULL; - - // Only wait up to the time needed by the timer manager to fire the next - // set of timers. - delay = GetCurrentDelay(); - if (use_polling && delay > kMultipleWaitPollingInterval) - delay = kMultipleWaitPollingInterval; - if (delay < 0) // Negative value means no timers waiting. - delay = INFINITE; - - DWORD result; - result = MsgWaitForMultipleObjectsEx(static_cast<DWORD>(objs_len), objs, - delay, QS_ALLINPUT, wait_flags); - - if (WAIT_IO_COMPLETION == result) { - // We'll loop here when we service an APC. At it currently stands, - // *ONLY* the IO thread uses *any* APCs, so this should have no impact - // on the UI thread. - break; // Break to outer loop, and waitforwork() again. - } - - // Use unsigned type to simplify range detection; - size_t signaled_index = result - WAIT_OBJECT_0; - if (signaled_index < objs_len) { - SignalWatcher(polling_index + signaled_index); - return; // We serviced a signaled object. - } - - if (objs_len == signaled_index) - return; // A WM_* message is available. - - DCHECK_NE(WAIT_FAILED, result) << GetLastError(); - - DCHECK(!objs || result == WAIT_TIMEOUT); - if (!use_polling) - return; - polling_index += objs_len; - } while (polling_index < total_objs); - // For compatibility, we didn't return sooner. This made us do *some* wait - // call(s) before returning. This will probably change in next rev. - if (!delay || !GetCurrentDelay()) - return; // No work done, but timer is ready to fire. - } + WaitForIOCompletion(timeout, NULL); } -bool MessagePumpForIO::ProcessNextObject() { - size_t total_objs = objects_.size(); - if (!total_objs) { - return false; +bool MessagePumpForIO::WaitForIOCompletion(DWORD timeout, IOHandler* filter) { + IOItem item; + if (completed_io_.empty() || !MatchCompletedIOItem(filter, &item)) { + // We have to ask the system for another IO completion. + if (!GetIOItem(timeout, &item)) + return false; + + if (ProcessInternalIOItem(item)) + return true; } - size_t polling_index = 0; // The first unprocessed object index. - do { - DCHECK(polling_index < total_objs); - size_t objs_len = total_objs - polling_index; - if (objs_len >= kMaxWaitObjects) - objs_len = kMaxWaitObjects - 1; - HANDLE* objs = polling_index + &objects_.front(); - - // Identify 1 pending object, or allow an IO APC to be completed. - DWORD result = WaitForMultipleObjectsEx(static_cast<DWORD>(objs_len), objs, - FALSE, // 1 signal is sufficient. - 0, // Wait 0ms. - false); // Not alertable (no APC). - - // Use unsigned type to simplify range detection; - size_t signaled_index = result - WAIT_OBJECT_0; - if (signaled_index < objs_len) { - SignalWatcher(polling_index + signaled_index); - return true; // We serviced a signaled object. + if (item.context->handler) { + if (filter && item.handler != filter) { + // Save this item for later + completed_io_.push_back(item); + } else { + DCHECK(item.context->handler == item.handler); + item.handler->OnIOCompleted(item.context, item.bytes_transfered, + item.error); } - - // If an handle is invalid, it will be WAIT_FAILED. - DCHECK_EQ(WAIT_TIMEOUT, result) << GetLastError(); - polling_index += objs_len; - } while (polling_index < total_objs); - return false; // We serviced nothing. + } else { + // The handler must be gone by now, just cleanup the mess. + delete item.context; + } + return true; } -bool MessagePumpForIO::SignalWatcher(size_t object_index) { - // Signal the watcher corresponding to the given index. - - DCHECK(objects_.size() > object_index); +// Asks the OS for another IO completion result. +bool MessagePumpForIO::GetIOItem(DWORD timeout, IOItem* item) { + memset(item, 0, sizeof(*item)); + ULONG_PTR key = NULL; + OVERLAPPED* overlapped = NULL; + if (!GetQueuedCompletionStatus(port_.Get(), &item->bytes_transfered, &key, + &overlapped, timeout)) { + if (!overlapped) + return false; // Nothing in the queue. + item->error = GetLastError(); + item->bytes_transfered = 0; + } - // On reception of OnObjectSignaled() to a Watcher object, it may call - // WatchObject(). watchers_ and objects_ will be modified. This is expected, - // so don't be afraid if, while tracing a OnObjectSignaled() function, the - // corresponding watchers_[result] is non-existant. - watchers_[object_index]->OnObjectSignaled(objects_[object_index]); + item->handler = reinterpret_cast<IOHandler*>(key); + item->context = reinterpret_cast<IOContext*>(overlapped); + return true; +} - // Signaled objects tend to be removed from the watch list, and then added - // back (appended). As a result, they move to the end of the objects_ array, - // and this should make their service "fair" (no HANDLEs should be starved). +bool MessagePumpForIO::ProcessInternalIOItem(const IOItem& item) { + if (this == reinterpret_cast<MessagePumpForIO*>(item.context) && + this == reinterpret_cast<MessagePumpForIO*>(item.handler)) { + // This is our internal completion. + DCHECK(!item.bytes_transfered); + InterlockedExchange(&have_work_, 0); + return true; + } + return false; +} - return true; +// Returns a completion item that was previously received. +bool MessagePumpForIO::MatchCompletedIOItem(IOHandler* filter, IOItem* item) { + DCHECK(!completed_io_.empty()); + for (std::list<IOItem>::iterator it = completed_io_.begin(); + it != completed_io_.end(); ++it) { + if (!filter || it->handler == filter) { + *item = *it; + completed_io_.erase(it); + return true; + } + } + return false; } } // namespace base diff --git a/base/message_pump_win.h b/base/message_pump_win.h index eb4253f..63222a8 100644 --- a/base/message_pump_win.h +++ b/base/message_pump_win.h @@ -5,10 +5,10 @@ #ifndef BASE_MESSAGE_PUMP_WIN_H_ #define BASE_MESSAGE_PUMP_WIN_H_ -#include <vector> - #include <windows.h> +#include <list> + #include "base/lock.h" #include "base/message_pump.h" #include "base/observer_list.h" @@ -17,50 +17,9 @@ namespace base { -// MessagePumpWin implements a "traditional" Windows message pump. It contains -// a nearly infinite loop that peeks out messages, and then dispatches them. -// Intermixed with those peeks are callouts to DoWork for pending tasks, -// DoDelayedWork for pending timers, and OnObjectSignaled for signaled objects. -// When there are no events to be serviced, this pump goes into a wait state. -// In most cases, this message pump handles all processing. -// -// However, when a task, or windows event, invokes on the stack a native dialog -// box or such, that window typically provides a bare bones (native?) message -// pump. That bare-bones message pump generally supports little more than a -// peek of the Windows message queue, followed by a dispatch of the peeked -// message. MessageLoop extends that bare-bones message pump to also service -// Tasks, at the cost of some complexity. -// -// The basic structure of the extension (refered to as a sub-pump) is that a -// special message, kMsgHaveWork, is repeatedly injected into the Windows -// Message queue. Each time the kMsgHaveWork message is peeked, checks are -// made for an extended set of events, including the availability of Tasks to -// run. -// -// After running a task, the special message kMsgHaveWork is again posted to -// the Windows Message queue, ensuring a future time slice for processing a -// future event. To prevent flooding the Windows Message queue, care is taken -// to be sure that at most one kMsgHaveWork message is EVER pending in the -// Window's Message queue. -// -// There are a few additional complexities in this system where, when there are -// no Tasks to run, this otherwise infinite stream of messages which drives the -// sub-pump is halted. The pump is automatically re-started when Tasks are -// queued. -// -// A second complexity is that the presence of this stream of posted tasks may -// prevent a bare-bones message pump from ever peeking a WM_PAINT or WM_TIMER. -// Such paint and timer events always give priority to a posted message, such as -// kMsgHaveWork messages. As a result, care is taken to do some peeking in -// between the posting of each kMsgHaveWork message (i.e., after kMsgHaveWork -// is peeked, and before a replacement kMsgHaveWork is posted). -// -// NOTE: Although it may seem odd that messages are used to start and stop this -// flow (as opposed to signaling objects, etc.), it should be understood that -// the native message pump will *only* respond to messages. As a result, it is -// an excellent choice. It is also helpful that the starter messages that are -// placed in the queue when new task arrive also awakens DoRunLoop. -// +// MessagePumpWin serves as the base for specialized versions of the MessagePump +// for Windows. It provides basic functionality like handling of observers and +// controlling the lifetime of the message pump. class MessagePumpWin : public MessagePump { public: // An Observer is an object that receives global notifications from the @@ -97,8 +56,8 @@ class MessagePumpWin : public MessagePump { virtual bool Dispatch(const MSG& msg) = 0; }; - MessagePumpWin(); - virtual ~MessagePumpWin(); + MessagePumpWin() : have_work_(0), state_(NULL) {} + virtual ~MessagePumpWin() {} // Add an Observer, which will start receiving notifications immediately. void AddObserver(Observer* observer); @@ -112,19 +71,12 @@ class MessagePumpWin : public MessagePump { void WillProcessMessage(const MSG& msg); void DidProcessMessage(const MSG& msg); - // Applications can call this to encourage us to process all pending WM_PAINT - // messages. This method will process all paint messages the Windows Message - // queue can provide, up to some fixed number (to avoid any infinite loops). - void PumpOutPendingPaintMessages(); - // Like MessagePump::Run, but MSG objects are routed through dispatcher. void RunWithDispatcher(Delegate* delegate, Dispatcher* dispatcher); // MessagePump methods: virtual void Run(Delegate* delegate) { RunWithDispatcher(delegate, NULL); } virtual void Quit(); - virtual void ScheduleWork(); - virtual void ScheduleDelayedWork(const Time& delayed_work_time); protected: struct RunState { @@ -138,20 +90,9 @@ class MessagePumpWin : public MessagePump { int run_depth; }; - static LRESULT CALLBACK WndProcThunk( - HWND hwnd, UINT message, WPARAM wparam, LPARAM lparam); virtual void DoRunLoop() = 0; - void InitMessageWnd(); - void HandleWorkMessage(); - void HandleTimerMessage(); - bool ProcessNextWindowsMessage(); - bool ProcessMessageHelper(const MSG& msg); - bool ProcessPumpReplacementMessage(); int GetCurrentDelay() const; - // A hidden message-only window. - HWND message_hwnd_; - ObserverList<Observer> observers_; // The time at which delayed work should run. @@ -170,33 +111,164 @@ class MessagePumpWin : public MessagePump { // MessagePumpForUI extends MessagePumpWin with methods that are particular to a // MessageLoop instantiated with TYPE_UI. // +// MessagePumpForUI implements a "traditional" Windows message pump. It contains +// a nearly infinite loop that peeks out messages, and then dispatches them. +// Intermixed with those peeks are callouts to DoWork for pending tasks, and +// DoDelayedWork for pending timers. When there are no events to be serviced, +// this pump goes into a wait state. In most cases, this message pump handles +// all processing. +// +// However, when a task, or windows event, invokes on the stack a native dialog +// box or such, that window typically provides a bare bones (native?) message +// pump. That bare-bones message pump generally supports little more than a +// peek of the Windows message queue, followed by a dispatch of the peeked +// message. MessageLoop extends that bare-bones message pump to also service +// Tasks, at the cost of some complexity. +// +// The basic structure of the extension (refered to as a sub-pump) is that a +// special message, kMsgHaveWork, is repeatedly injected into the Windows +// Message queue. Each time the kMsgHaveWork message is peeked, checks are +// made for an extended set of events, including the availability of Tasks to +// run. +// +// After running a task, the special message kMsgHaveWork is again posted to +// the Windows Message queue, ensuring a future time slice for processing a +// future event. To prevent flooding the Windows Message queue, care is taken +// to be sure that at most one kMsgHaveWork message is EVER pending in the +// Window's Message queue. +// +// There are a few additional complexities in this system where, when there are +// no Tasks to run, this otherwise infinite stream of messages which drives the +// sub-pump is halted. The pump is automatically re-started when Tasks are +// queued. +// +// A second complexity is that the presence of this stream of posted tasks may +// prevent a bare-bones message pump from ever peeking a WM_PAINT or WM_TIMER. +// Such paint and timer events always give priority to a posted message, such as +// kMsgHaveWork messages. As a result, care is taken to do some peeking in +// between the posting of each kMsgHaveWork message (i.e., after kMsgHaveWork +// is peeked, and before a replacement kMsgHaveWork is posted). +// +// NOTE: Although it may seem odd that messages are used to start and stop this +// flow (as opposed to signaling objects, etc.), it should be understood that +// the native message pump will *only* respond to messages. As a result, it is +// an excellent choice. It is also helpful that the starter messages that are +// placed in the queue when new task arrive also awakens DoRunLoop. +// class MessagePumpForUI : public MessagePumpWin { public: - MessagePumpForUI() {} - virtual ~MessagePumpForUI() {} + MessagePumpForUI(); + virtual ~MessagePumpForUI(); + + // MessagePump methods: + virtual void ScheduleWork(); + virtual void ScheduleDelayedWork(const Time& delayed_work_time); + + // Applications can call this to encourage us to process all pending WM_PAINT + // messages. This method will process all paint messages the Windows Message + // queue can provide, up to some fixed number (to avoid any infinite loops). + void PumpOutPendingPaintMessages(); + private: + static LRESULT CALLBACK WndProcThunk( + HWND hwnd, UINT message, WPARAM wparam, LPARAM lparam); virtual void DoRunLoop(); + void InitMessageWnd(); void WaitForWork(); + void HandleWorkMessage(); + void HandleTimerMessage(); + bool ProcessNextWindowsMessage(); + bool ProcessMessageHelper(const MSG& msg); + bool ProcessPumpReplacementMessage(); + + // A hidden message-only window. + HWND message_hwnd_; }; //----------------------------------------------------------------------------- // MessagePumpForIO extends MessagePumpWin with methods that are particular to a -// MessageLoop instantiated with TYPE_IO. +// MessageLoop instantiated with TYPE_IO. This version of MessagePump does not +// deal with Windows mesagges, and instead has a Run loop based on Completion +// Ports so it is better suited for IO operations. // class MessagePumpForIO : public MessagePumpWin { public: - // Used with WatchObject to asynchronously monitor the signaled state of a - // HANDLE object. - class Watcher { - public: - virtual ~Watcher() {} - // Called from MessageLoop::Run when a signalled object is detected. - virtual void OnObjectSignaled(HANDLE object) = 0; - }; + struct IOContext; // Clients interested in receiving OS notifications when asynchronous IO // operations complete should implement this interface and register themselves // with the message pump. + // + // Typical use #1: + // // Use only when there are no user's buffers involved on the actual IO, + // // so that all the cleanup can be done by the message pump. + // class MyFile : public IOHandler { + // MyFile() { + // ... + // context_ = new IOContext; + // context_->handler = this; + // message_pump->RegisterIOHandler(file_, this); + // } + // ~MyFile() { + // if (pending_) { + // // By setting the handler to NULL, we're asking for this context + // // to be deleted when received, without calling back to us. + // context_->handler = NULL; + // } else { + // delete context_; + // } + // } + // virtual void OnIOCompleted(IOContext* context, DWORD bytes_transfered, + // DWORD error) { + // pending_ = false; + // } + // void DoSomeIo() { + // ... + // // The only buffer required for this operation is the overlapped + // // structure. + // ConnectNamedPipe(file_, &context_->overlapped); + // pending_ = true; + // } + // bool pending_; + // IOContext* context_; + // HANDLE file_; + // }; + // + // Typical use #2: + // class MyFile : public IOHandler { + // MyFile() { + // ... + // message_pump->RegisterIOHandler(file_, this); + // } + // // Plus some code to make sure that this destructor is not called + // // while there are pending IO operations. + // ~MyFile() { + // } + // virtual void OnIOCompleted(IOContext* context, DWORD bytes_transfered, + // DWORD error) { + // ... + // delete context; + // } + // void DoSomeIo() { + // ... + // IOContext* context = new IOContext; + // // This is not used for anything. It just prevents the context from + // // being considered "abandoned". + // context->handler = this; + // ReadFile(file_, buffer, num_bytes, &read, &context->overlapped); + // } + // HANDLE file_; + // }; + // + // Typical use #3: + // Same as the previous example, except that in order to deal with the + // requirement stated for the destructor, the class calls WaitForIOCompletion + // from the destructor to block until all IO finishes. + // ~MyFile() { + // while(pending_) + // message_pump->WaitForIOCompletion(INFINITE, this); + // } + // class IOHandler { public: virtual ~IOHandler() {} @@ -204,46 +276,66 @@ class MessagePumpForIO : public MessagePumpWin { // |context| completes. |error| is the Win32 error code of the IO operation // (ERROR_SUCCESS if there was no error). |bytes_transfered| will be zero // on error. - virtual void OnIOCompleted(OVERLAPPED* context, DWORD bytes_transfered, + virtual void OnIOCompleted(IOContext* context, DWORD bytes_transfered, DWORD error) = 0; }; - MessagePumpForIO() {} + // The extended context that should be used as the base structure on every + // overlapped IO operation. |handler| must be set to the registered IOHandler + // for the given file when the operation is started, and it can be set to NULL + // before the operation completes to indicate that the handler should not be + // called anymore, and instead, the IOContext should be deleted when the OS + // notifies the completion of this operation. Please remember that any buffers + // involved with an IO operation should be around until the callback is + // received, so this technique can only be used for IO that do not involve + // additional buffers (other than the overlapped structure itself). + struct IOContext { + OVERLAPPED overlapped; + IOHandler* handler; + }; + + MessagePumpForIO(); virtual ~MessagePumpForIO() {} - // Have the current thread's message loop watch for a signaled object. - // Pass a null watcher to stop watching the object. - void WatchObject(HANDLE, Watcher*); + // MessagePump methods: + virtual void ScheduleWork(); + virtual void ScheduleDelayedWork(const Time& delayed_work_time); // Register the handler to be used when asynchronous IO for the given file // completes. The registration persists as long as |file_handle| is valid, so // |handler| must be valid as long as there is pending IO for the given file. void RegisterIOHandler(HANDLE file_handle, IOHandler* handler); - // This is just a throw away function to ease transition to completion ports. - // Pass NULL for handler to stop tracking this request. WARNING: cancellation - // correctness is the responsibility of the caller. |context| must contain a - // valid manual reset event, but the caller should not interact directly with - // it. The registration can live across a single IO operation, or it can live - // across multiple IO operations without having to reset it after each IO - // completion callback. Internally, there will be a WatchObject registration - // alive as long as this context registration is in effect. It is an error - // to unregister a context that has not been registered before. - void RegisterIOContext(OVERLAPPED* context, IOHandler* handler); + // Waits for the next IO completion that should be processed by |filter|, for + // up to |timeout| milliseconds. Return true if any IO operation completed, + // regardless of the involved handler, and false if the timeout expired. If + // the completion port received any message and the involved IO handler + // matches |filter|, the callback is called before returning from this code; + // if the handler is not the one that we are looking for, the callback will + // be postponed for another time, so reentrancy problems can be avoided. + // External use of this method should be reserved for the rare case when the + // caller is willing to allow pausing regular task dispatching on this thread. + bool WaitForIOCompletion(DWORD timeout, IOHandler* filter); private: + struct IOItem { + IOHandler* handler; + IOContext* context; + DWORD bytes_transfered; + DWORD error; + }; + virtual void DoRunLoop(); void WaitForWork(); - bool ProcessNextObject(); - bool SignalWatcher(size_t object_index); - - // A vector of objects (and corresponding watchers) that are routinely - // serviced by this message pump. - std::vector<HANDLE> objects_; - std::vector<Watcher*> watchers_; + bool MatchCompletedIOItem(IOHandler* filter, IOItem* item); + bool GetIOItem(DWORD timeout, IOItem* item); + bool ProcessInternalIOItem(const IOItem& item); // The completion port associated with this thread. ScopedHandle port_; + // This list will be empty almost always. It stores IO completions that have + // not been delivered yet because somebody was doing cleanup. + std::list<IOItem> completed_io_; }; } // namespace base diff --git a/chrome/common/ipc_channel.cc b/chrome/common/ipc_channel.cc index a9d73e7..934ea9b 100644 --- a/chrome/common/ipc_channel.cc +++ b/chrome/common/ipc_channel.cc @@ -20,24 +20,21 @@ namespace IPC { //------------------------------------------------------------------------------ -Channel::State::State() - : is_pending(false) { - memset(&overlapped, 0, sizeof(overlapped)); - overlapped.hEvent = CreateEvent(NULL, // default security attributes - TRUE, // manual-reset event - TRUE, // initial state = signaled - NULL); // unnamed event object +Channel::State::State(Channel* channel) : is_pending(false) { + memset(&context.overlapped, 0, sizeof(context.overlapped)); + context.handler = channel; } Channel::State::~State() { - if (overlapped.hEvent) - CloseHandle(overlapped.hEvent); + COMPILE_ASSERT(!offsetof(Channel::State, context), starts_with_io_context); } //------------------------------------------------------------------------------ Channel::Channel(const wstring& channel_id, Mode mode, Listener* listener) - : pipe_(INVALID_HANDLE_VALUE), + : ALLOW_THIS_IN_INITIALIZER_LIST(input_state_(this)), + ALLOW_THIS_IN_INITIALIZER_LIST(output_state_(this)), + pipe_(INVALID_HANDLE_VALUE), listener_(listener), waiting_connect_(mode == MODE_SERVER), processing_incoming_(false), @@ -50,23 +47,29 @@ Channel::Channel(const wstring& channel_id, Mode mode, Listener* listener) } void Channel::Close() { - // make sure we are no longer watching the pipe events - MessageLoopForIO* loop = MessageLoopForIO::current(); - if (input_state_.is_pending) { - input_state_.is_pending = false; - loop->RegisterIOContext(&input_state_.overlapped, NULL); - } - - if (output_state_.is_pending) { - output_state_.is_pending = false; - loop->RegisterIOContext(&output_state_.overlapped, NULL); + bool waited = false; + if (input_state_.is_pending || output_state_.is_pending) { + CancelIo(pipe_); + waited = true; } + // Closing the handle at this point prevents us from issuing more requests + // form OnIOCompleted(). if (pipe_ != INVALID_HANDLE_VALUE) { CloseHandle(pipe_); pipe_ = INVALID_HANDLE_VALUE; } + // Make sure all IO has completed. + base::Time start = base::Time::Now(); + while (input_state_.is_pending || output_state_.is_pending) { + MessageLoopForIO::current()->WaitForIOCompletion(INFINITE, this); + } + if (waited) { + // We want to see if we block the message loop for too long. + UMA_HISTOGRAM_TIMES(L"AsyncIO.IPCChannelClose", base::Time::Now() - start); + } + while (!output_queue_.empty()) { Message* m = output_queue_.front(); output_queue_.pop(); @@ -175,7 +178,7 @@ bool Channel::Connect() { // to true, we indicate to OnIOCompleted that this is the special // initialization signal. MessageLoopForIO::current()->PostTask(FROM_HERE, factory_.NewRunnableMethod( - &Channel::OnIOCompleted, &input_state_.overlapped, 0, 0)); + &Channel::OnIOCompleted, &input_state_.context, 0, 0)); } if (!waiting_connect_) @@ -184,15 +187,14 @@ bool Channel::Connect() { } bool Channel::ProcessConnection() { - if (input_state_.is_pending) { + if (input_state_.is_pending) input_state_.is_pending = false; - MessageLoopForIO::current()->RegisterIOContext(&input_state_.overlapped, - NULL); - } // Do we have a client connected to our pipe? - DCHECK(pipe_ != INVALID_HANDLE_VALUE); - BOOL ok = ConnectNamedPipe(pipe_, &input_state_.overlapped); + if (INVALID_HANDLE_VALUE == pipe_) + return false; + + BOOL ok = ConnectNamedPipe(pipe_, &input_state_.context.overlapped); DWORD err = GetLastError(); if (ok) { @@ -205,8 +207,6 @@ bool Channel::ProcessConnection() { switch (err) { case ERROR_IO_PENDING: input_state_.is_pending = true; - MessageLoopForIO::current()->RegisterIOContext(&input_state_.overlapped, - this); break; case ERROR_PIPE_CONNECTED: waiting_connect_ = false; @@ -219,40 +219,41 @@ bool Channel::ProcessConnection() { return true; } -bool Channel::ProcessIncomingMessages(OVERLAPPED* context, +bool Channel::ProcessIncomingMessages(MessageLoopForIO::IOContext* context, DWORD bytes_read) { if (input_state_.is_pending) { input_state_.is_pending = false; DCHECK(context); - MessageLoopForIO::current()->RegisterIOContext(&input_state_.overlapped, - NULL); if (!context || !bytes_read) return false; } else { // This happens at channel initialization. - DCHECK(!bytes_read && context == &input_state_.overlapped); + DCHECK(!bytes_read && context == &input_state_.context); } for (;;) { if (bytes_read == 0) { + if (INVALID_HANDLE_VALUE == pipe_) + return false; + // Read from pipe... BOOL ok = ReadFile(pipe_, input_buf_, BUF_SIZE, &bytes_read, - &input_state_.overlapped); + &input_state_.context.overlapped); if (!ok) { DWORD err = GetLastError(); if (err == ERROR_IO_PENDING) { - MessageLoopForIO::current()->RegisterIOContext( - &input_state_.overlapped, this); input_state_.is_pending = true; return true; } LOG(ERROR) << "pipe error: " << err; return false; } + input_state_.is_pending = true; + return true; } DCHECK(bytes_read); @@ -303,15 +304,13 @@ bool Channel::ProcessIncomingMessages(OVERLAPPED* context, return true; } -bool Channel::ProcessOutgoingMessages(OVERLAPPED* context, +bool Channel::ProcessOutgoingMessages(MessageLoopForIO::IOContext* context, DWORD bytes_written) { DCHECK(!waiting_connect_); // Why are we trying to send messages if there's // no connection? if (output_state_.is_pending) { DCHECK(context); - MessageLoopForIO::current()->RegisterIOContext(&output_state_.overlapped, - NULL); output_state_.is_pending = false; if (!context || bytes_written == 0) { DWORD err = GetLastError(); @@ -325,42 +324,41 @@ bool Channel::ProcessOutgoingMessages(OVERLAPPED* context, delete m; } - while (!output_queue_.empty()) { - // Write to pipe... - Message* m = output_queue_.front(); - BOOL ok = WriteFile(pipe_, - m->data(), - m->size(), - &bytes_written, - &output_state_.overlapped); - if (!ok) { - DWORD err = GetLastError(); - if (err == ERROR_IO_PENDING) { - MessageLoopForIO::current()->RegisterIOContext( - &output_state_.overlapped, this); - output_state_.is_pending = true; + if (output_queue_.empty()) + return true; + + if (INVALID_HANDLE_VALUE == pipe_) + return false; + + // Write to pipe... + Message* m = output_queue_.front(); + BOOL ok = WriteFile(pipe_, + m->data(), + m->size(), + &bytes_written, + &output_state_.context.overlapped); + if (!ok) { + DWORD err = GetLastError(); + if (err == ERROR_IO_PENDING) { + output_state_.is_pending = true; #ifdef IPC_MESSAGE_DEBUG_EXTRA - DLOG(INFO) << "sent pending message @" << m << " on channel @" << - this << " with type " << m->type(); + DLOG(INFO) << "sent pending message @" << m << " on channel @" << + this << " with type " << m->type(); #endif - return true; - } - LOG(ERROR) << "pipe error: " << err; - return false; + return true; } - DCHECK(bytes_written == m->size()); - output_queue_.pop(); + LOG(ERROR) << "pipe error: " << err; + return false; + } #ifdef IPC_MESSAGE_DEBUG_EXTRA - DLOG(INFO) << "sent message @" << m << " on channel @" << this << - " with type " << m->type(); + DLOG(INFO) << "sent message @" << m << " on channel @" << this << + " with type " << m->type(); #endif - delete m; - } - + output_state_.is_pending = true; return true; } @@ -400,12 +398,13 @@ bool Channel::ProcessPendingMessages(DWORD max_wait_msec) { #endif } -void Channel::OnIOCompleted(OVERLAPPED* context, DWORD bytes_transfered, - DWORD error) { +void Channel::OnIOCompleted(MessageLoopForIO::IOContext* context, + DWORD bytes_transfered, DWORD error) { bool ok; - if (context == &input_state_.overlapped) { + if (context == &input_state_.context) { if (waiting_connect_) { - ProcessConnection(); + if (!ProcessConnection()) + return; // We may have some messages queued up to send... if (!output_queue_.empty() && !output_state_.is_pending) ProcessOutgoingMessages(NULL, 0); @@ -419,10 +418,11 @@ void Channel::OnIOCompleted(OVERLAPPED* context, DWORD bytes_transfered, ok = ProcessIncomingMessages(context, bytes_transfered); processing_incoming_ = false; } else { - DCHECK(context == &output_state_.overlapped); + DCHECK(context == &output_state_.context); ok = ProcessOutgoingMessages(context, bytes_transfered); } - if (!ok) { + if (!ok && INVALID_HANDLE_VALUE != pipe_) { + // We don't want to re-enter Close(). Close(); listener_->OnChannelError(); } diff --git a/chrome/common/ipc_channel.h b/chrome/common/ipc_channel.h index e3e4ca2..b69f962 100644 --- a/chrome/common/ipc_channel.h +++ b/chrome/common/ipc_channel.h @@ -99,12 +99,14 @@ class Channel : public MessageLoopForIO::IOHandler, const std::wstring PipeName(const std::wstring& channel_id) const; bool CreatePipe(const std::wstring& channel_id, Mode mode); bool ProcessConnection(); - bool ProcessIncomingMessages(OVERLAPPED* context, DWORD bytes_read); - bool ProcessOutgoingMessages(OVERLAPPED* context, DWORD bytes_written); + bool ProcessIncomingMessages(MessageLoopForIO::IOContext* context, + DWORD bytes_read); + bool ProcessOutgoingMessages(MessageLoopForIO::IOContext* context, + DWORD bytes_written); // MessageLoop::IOHandler implementation. - virtual void OnIOCompleted(OVERLAPPED* context, DWORD bytes_transfered, - DWORD error); + virtual void OnIOCompleted(MessageLoopForIO::IOContext* context, + DWORD bytes_transfered, DWORD error); private: enum { @@ -112,9 +114,9 @@ class Channel : public MessageLoopForIO::IOHandler, }; struct State { - State(); + explicit State(Channel* channel); ~State(); - OVERLAPPED overlapped; + MessageLoopForIO::IOContext context; bool is_pending; }; diff --git a/chrome/common/ipc_sync_channel_unittest.cc b/chrome/common/ipc_sync_channel_unittest.cc index 1d28b06..749a507 100644 --- a/chrome/common/ipc_sync_channel_unittest.cc +++ b/chrome/common/ipc_sync_channel_unittest.cc @@ -104,7 +104,7 @@ class Worker : public Channel::Listener, public Message::Sender { channel_->Close(); } void Start() { - StartThread(&listener_thread_); + StartThread(&listener_thread_, MessageLoop::TYPE_DEFAULT); ListenerThread()->message_loop()->PostTask(FROM_HERE, NewRunnableMethod( this, &Worker::OnStart)); } @@ -149,7 +149,7 @@ class Worker : public Channel::Listener, public Message::Sender { // Called on the listener thread to create the sync channel. void OnStart() { // Link ipc_thread_, listener_thread_ and channel_ altogether. - StartThread(&ipc_thread_); + StartThread(&ipc_thread_, MessageLoop::TYPE_IO); channel_.reset(new SyncChannel( channel_name_, mode_, this, NULL, ipc_thread_.message_loop(), true, TestProcess::GetShutDownEvent())); @@ -178,9 +178,9 @@ class Worker : public Channel::Listener, public Message::Sender { IPC_END_MESSAGE_MAP() } - void StartThread(base::Thread* thread) { + void StartThread(base::Thread* thread, MessageLoop::Type type) { base::Thread::Options options; - options.message_loop_type = MessageLoop::TYPE_IO; + options.message_loop_type = type; thread->StartWithOptions(options); } diff --git a/net/base/file_stream_win.cc b/net/base/file_stream_win.cc index d7b3d1d..56d0368 100644 --- a/net/base/file_stream_win.cc +++ b/net/base/file_stream_win.cc @@ -50,55 +50,55 @@ static int MapErrorCode(DWORD err) { class FileStream::AsyncContext : public MessageLoopForIO::IOHandler { public: AsyncContext(FileStream* owner) - : owner_(owner), overlapped_(), callback_(NULL) { - overlapped_.hEvent = CreateEvent(NULL, TRUE, FALSE, NULL); - } - - ~AsyncContext() { - if (callback_) - MessageLoopForIO::current()->RegisterIOContext(&overlapped_, NULL); - CloseHandle(overlapped_.hEvent); + : owner_(owner), context_(), callback_(NULL) { + context_.handler = this; } + ~AsyncContext(); void IOCompletionIsPending(CompletionCallback* callback); - - OVERLAPPED* overlapped() { return &overlapped_; } + + OVERLAPPED* overlapped() { return &context_.overlapped; } CompletionCallback* callback() const { return callback_; } private: - // MessageLoopForIO::IOHandler implementation: - virtual void OnIOCompleted(OVERLAPPED* context, DWORD num_bytes, - DWORD error); + virtual void OnIOCompleted(MessageLoopForIO::IOContext* context, + DWORD bytes_read, DWORD error); FileStream* owner_; - OVERLAPPED overlapped_; + MessageLoopForIO::IOContext context_; CompletionCallback* callback_; }; +FileStream::AsyncContext::~AsyncContext() { + bool waited = false; + base::Time start = base::Time::Now(); + while (callback_) { + waited = true; + MessageLoopForIO::current()->WaitForIOCompletion(INFINITE, this); + } + if (waited) { + // We want to see if we block the message loop for too long. + UMA_HISTOGRAM_TIMES(L"AsyncIO.FileStreamClose", base::Time::Now() - start); + } +} + void FileStream::AsyncContext::IOCompletionIsPending( CompletionCallback* callback) { DCHECK(!callback_); callback_ = callback; - - MessageLoopForIO::current()->RegisterIOContext(&overlapped_, this); } -void FileStream::AsyncContext::OnIOCompleted(OVERLAPPED* context, - DWORD num_bytes, - DWORD error) { - DCHECK(&overlapped_ == context); +void FileStream::AsyncContext::OnIOCompleted( + MessageLoopForIO::IOContext* context, DWORD bytes_read, DWORD error) { + DCHECK(&context_ == context); DCHECK(callback_); - MessageLoopForIO::current()->RegisterIOContext(&overlapped_, NULL); - - HANDLE handle = owner_->file_; - - int result = static_cast<int>(num_bytes); + int result = static_cast<int>(bytes_read); if (error && error != ERROR_HANDLE_EOF) result = MapErrorCode(error); - - if (num_bytes) - IncrementOffset(&overlapped_, num_bytes); + + if (bytes_read) + IncrementOffset(&context->overlapped, bytes_read); CompletionCallback* temp = NULL; std::swap(temp, callback_); @@ -115,11 +115,14 @@ FileStream::~FileStream() { } void FileStream::Close() { + if (file_ != INVALID_HANDLE_VALUE) + CancelIo(file_); + + async_context_.reset(); if (file_ != INVALID_HANDLE_VALUE) { CloseHandle(file_); file_ = INVALID_HANDLE_VALUE; } - async_context_.reset(); } int FileStream::Open(const std::wstring& path, int open_flags) { @@ -211,9 +214,10 @@ int FileStream::Read( LOG(WARNING) << "ReadFile failed: " << error; rv = MapErrorCode(error); } + } else if (overlapped) { + async_context_->IOCompletionIsPending(callback); + rv = ERR_IO_PENDING; } else { - if (overlapped) - IncrementOffset(overlapped, bytes_read); rv = static_cast<int>(bytes_read); } return rv; @@ -224,7 +228,7 @@ int FileStream::Write( if (!IsOpen()) return ERR_UNEXPECTED; DCHECK(open_flags_ & base::PLATFORM_FILE_WRITE); - + OVERLAPPED* overlapped = NULL; if (async_context_.get()) { DCHECK(!async_context_->callback()); @@ -242,9 +246,10 @@ int FileStream::Write( LOG(WARNING) << "WriteFile failed: " << error; rv = MapErrorCode(error); } + } else if (overlapped) { + async_context_->IOCompletionIsPending(callback); + rv = ERR_IO_PENDING; } else { - if (overlapped) - IncrementOffset(overlapped, bytes_written); rv = static_cast<int>(bytes_written); } return rv; diff --git a/net/disk_cache/cache_util_win.cc b/net/disk_cache/cache_util_win.cc index 377cbbf..adb9d8a5 100644 --- a/net/disk_cache/cache_util_win.cc +++ b/net/disk_cache/cache_util_win.cc @@ -7,6 +7,7 @@ #include <windows.h> #include "base/logging.h" +#include "base/message_loop.h" #include "base/scoped_handle.h" #include "base/file_util.h" @@ -38,6 +39,9 @@ void DeleteFiles(const wchar_t* path, const wchar_t* search_name) { namespace disk_cache { +// Implemented in file_win.cc. +MessageLoopForIO::IOHandler* GetFileIOHandler(); + bool MoveCache(const std::wstring& from_path, const std::wstring& to_path) { // I don't want to use the shell version of move because if something goes // wrong, that version will attempt to move file by file and fail at the end. @@ -63,12 +67,8 @@ bool DeleteCacheFile(const std::wstring& name) { void WaitForPendingIO(int* num_pending_io) { while (*num_pending_io) { // Asynchronous IO operations may be in flight and the completion may end - // up calling us back so let's wait for them (we need an alertable wait). - // The idea is to let other threads do usefull work and at the same time - // allow more than one IO to finish... 20 mS later, we process all queued - // APCs and see if we have to repeat the wait. - Sleep(20); - SleepEx(0, TRUE); + // up calling us back so let's wait for them. + MessageLoopForIO::current()->WaitForIOCompletion(100, GetFileIOHandler()); } } diff --git a/net/disk_cache/disk_cache_perftest.cc b/net/disk_cache/disk_cache_perftest.cc index c160677..33cad54 100644 --- a/net/disk_cache/disk_cache_perftest.cc +++ b/net/disk_cache/disk_cache_perftest.cc @@ -9,6 +9,7 @@ #include "base/basictypes.h" #include "base/file_util.h" #include "base/perftimer.h" +#include "base/platform_test.h" #if defined(OS_WIN) #include "base/scoped_handle.h" #endif @@ -17,7 +18,6 @@ #include "net/base/net_errors.h" #include "net/disk_cache/block_files.h" #include "net/disk_cache/disk_cache.h" -#include "net/disk_cache/disk_cache_test_base.h" #include "net/disk_cache/disk_cache_test_util.h" #include "net/disk_cache/hash.h" #include "testing/gtest/include/gtest/gtest.h" @@ -28,6 +28,8 @@ extern int g_cache_tests_max_id; extern volatile int g_cache_tests_received; extern volatile bool g_cache_tests_error; +typedef PlatformTest DiskCacheTest; + namespace { bool EvictFileFromSystemCache(const wchar_t* name) { @@ -226,6 +228,7 @@ TEST_F(DiskCacheTest, CacheBackendPerformance) { int ret = TimeWrite(num_entries, cache, &entries); EXPECT_EQ(ret, g_cache_tests_received); + MessageLoop::current()->RunAllPending(); delete cache; std::wstring filename(path); @@ -257,6 +260,7 @@ TEST_F(DiskCacheTest, CacheBackendPerformance) { ret = TimeRead(num_entries, cache, entries, false); EXPECT_EQ(ret, g_cache_tests_received); + MessageLoop::current()->RunAllPending(); delete cache; } @@ -266,6 +270,7 @@ TEST_F(DiskCacheTest, CacheBackendPerformance) { // fragmented, or if we have multiple files. This test measures that scenario, // by using multiple, highly fragmented files. TEST_F(DiskCacheTest, BlockFilesPerformance) { + MessageLoopForIO message_loop; std::wstring path = GetCachePath(); ASSERT_TRUE(DeleteCache(path.c_str())); @@ -303,4 +308,5 @@ TEST_F(DiskCacheTest, BlockFilesPerformance) { } timer2.Done(); + MessageLoop::current()->RunAllPending(); } diff --git a/net/disk_cache/disk_cache_test_base.cc b/net/disk_cache/disk_cache_test_base.cc index 417792f..0a3e07b 100644 --- a/net/disk_cache/disk_cache_test_base.cc +++ b/net/disk_cache/disk_cache_test_base.cc @@ -8,6 +8,10 @@ #include "net/disk_cache/disk_cache_test_util.h" #include "net/disk_cache/mem_backend_impl.h" +void DiskCacheTest::TearDown() { + MessageLoop::current()->RunAllPending(); +} + void DiskCacheTestWithCache::SetMaxSize(int size) { size_ = size; if (cache_impl_) @@ -73,6 +77,7 @@ void DiskCacheTestWithCache::InitDiskCache() { void DiskCacheTestWithCache::TearDown() { + MessageLoop::current()->RunAllPending(); delete cache_; if (!memory_only_) { diff --git a/net/disk_cache/disk_cache_test_base.h b/net/disk_cache/disk_cache_test_base.h index ca56be1f..6b75361 100644 --- a/net/disk_cache/disk_cache_test_base.h +++ b/net/disk_cache/disk_cache_test_base.h @@ -9,12 +9,6 @@ #include "base/platform_test.h" #include "testing/gtest/include/gtest/gtest.h" -// These tests can use the path service, which uses autoreleased objects on the -// Mac, so this needs to be a PlatformTest. Even tests that do not require a -// cache (and that do not need to be a DiskCacheTestWithCache) are susceptible -// to this problem; all such tests should use TEST_F(DiskCacheTest, ...). -typedef PlatformTest DiskCacheTest; - namespace disk_cache { class Backend; @@ -23,6 +17,14 @@ class MemBackendImpl; } // namespace disk_cache +// These tests can use the path service, which uses autoreleased objects on the +// Mac, so this needs to be a PlatformTest. Even tests that do not require a +// cache (and that do not need to be a DiskCacheTestWithCache) are susceptible +// to this problem; all such tests should use TEST_F(DiskCacheTest, ...). +class DiskCacheTest : public PlatformTest { + virtual void TearDown(); +}; + // Provides basic support for cache related tests. class DiskCacheTestWithCache : public DiskCacheTest { protected: diff --git a/net/disk_cache/entry_impl.cc b/net/disk_cache/entry_impl.cc index 844ff40..2821540 100644 --- a/net/disk_cache/entry_impl.cc +++ b/net/disk_cache/entry_impl.cc @@ -16,25 +16,8 @@ using base::TimeDelta; namespace { -// This is a simple Task to execute the callback (from the message loop instead -// of the APC). -class InvokeCallback : public Task { - public: - InvokeCallback(net::CompletionCallback* callback, int argument) - : callback_(callback), argument_(argument) {} - - virtual void Run() { - callback_->Run(argument_); - } - - private: - net::CompletionCallback* callback_; - int argument_; - DISALLOW_EVIL_CONSTRUCTORS(InvokeCallback); -}; - -// This class implements FileIOCallback to buffer the callback from an IO -// operation from the actual IO class. +// This class implements FileIOCallback to buffer the callback from a file IO +// operation from the actual net class. class SyncCallback: public disk_cache::FileIOCallback { public: SyncCallback(disk_cache::EntryImpl* entry, @@ -57,10 +40,8 @@ class SyncCallback: public disk_cache::FileIOCallback { void SyncCallback::OnFileIOComplete(int bytes_copied) { entry_->DecrementIoCount(); entry_->Release(); - if (callback_) { - InvokeCallback* task = new InvokeCallback(callback_, bytes_copied); - MessageLoop::current()->PostTask(FROM_HERE, task); - } + if (callback_) + callback_->Run(bytes_copied); delete this; } @@ -556,9 +537,11 @@ void EntryImpl::DeleteData(Addr address, int index) { if (files_[index]) files_[index] = NULL; // Releases the object. - if (!DeleteCacheFile(backend_->GetFileName(address))) + if (!DeleteCacheFile(backend_->GetFileName(address))) { + UMA_HISTOGRAM_COUNTS(L"DiskCache.DeleteFailed", 1); LOG(ERROR) << "Failed to delete " << backend_->GetFileName(address) << " from the cache."; + } } else { backend_->DeleteBlock(address, true); } @@ -711,7 +694,6 @@ bool EntryImpl::ImportSeparateFile(int index, int offset, int buf_len) { return true; } - // The common scenario is that this is called from the destructor of the entry, // to write to disk what we have buffered. We don't want to hold the destructor // until the actual IO finishes, so we'll send an asynchronous write that will @@ -744,6 +726,13 @@ bool EntryImpl::Flush(int index, int size, bool async) { if (!file) return false; + // TODO(rvargas): figure out if it's worth to re-enable posting operations. + // Right now it is only used from GrowUserBuffer, not the destructor, and + // it is not accounted for from the point of view of the total number of + // pending operations of the cache. It is also racing with the actual write + // on the GrowUserBuffer path because there is no code to exclude the range + // that is going to be written. + async = false; if (async) { if (!file->PostWrite(user_buffers_[index].get(), len, offset)) return false; diff --git a/net/disk_cache/file_win.cc b/net/disk_cache/file_win.cc index 0e8e3f3..bc89975 100644 --- a/net/disk_cache/file_win.cc +++ b/net/disk_cache/file_win.cc @@ -4,62 +4,39 @@ #include "net/disk_cache/file.h" +#include "base/message_loop.h" +#include "base/singleton.h" #include "net/disk_cache/disk_cache.h" namespace { -// This class implements FileIOCallback to perform IO operations -// when the callback parameter of the operation is NULL. -class SyncCallback: public disk_cache::FileIOCallback { - public: - SyncCallback() : called_(false) {} - ~SyncCallback() {} - - virtual void OnFileIOComplete(int bytes_copied); - void WaitForResult(int* bytes_copied); - private: - bool called_; - int actual_; -}; - -void SyncCallback::OnFileIOComplete(int bytes_copied) { - actual_ = bytes_copied; - called_ = true; -} - -// Waits for the IO operation to complete. -void SyncCallback::WaitForResult(int* bytes_copied) { - for (;;) { - SleepEx(INFINITE, TRUE); - if (called_) - break; - } - *bytes_copied = actual_; -} - // Structure used for asynchronous operations. struct MyOverlapped { - OVERLAPPED overlapped; - disk_cache::File* file; - disk_cache::FileIOCallback* callback; - const void* buffer; - DWORD actual_bytes; - bool async; // Invoke the callback form the completion. - bool called; // Completion received. - bool delete_buffer; // Delete the user buffer at completion. -}; + MyOverlapped(disk_cache::File* file, size_t offset, + disk_cache::FileIOCallback* callback); + ~MyOverlapped(); + OVERLAPPED* overlapped() { + return &context_.overlapped; + } -COMPILE_ASSERT(!offsetof(MyOverlapped, overlapped), starts_with_overlapped); + MessageLoopForIO::IOContext context_; + scoped_refptr<disk_cache::File> file_; + disk_cache::FileIOCallback* callback_; + const void* buffer_; + bool delete_buffer_; // Delete the user buffer at completion. +}; -} // namespace +COMPILE_ASSERT(!offsetof(MyOverlapped, context_), starts_with_overlapped); -namespace disk_cache { +// Helper class to handle the IO completion notifications from the message loop. +class CompletionHandler : public MessageLoopForIO::IOHandler { + virtual void OnIOCompleted(MessageLoopForIO::IOContext* context, + DWORD actual_bytes, DWORD error); +}; -// SyncCallback to be invoked as an APC when the asynchronous operation -// completes. -void CALLBACK IoCompletion(DWORD error, DWORD actual_bytes, - OVERLAPPED* overlapped) { - MyOverlapped* data = reinterpret_cast<MyOverlapped*>(overlapped); +void CompletionHandler::OnIOCompleted(MessageLoopForIO::IOContext* context, + DWORD actual_bytes, DWORD error) { + MyOverlapped* data = reinterpret_cast<MyOverlapped*>(context); if (error) { DCHECK(!actual_bytes); @@ -67,29 +44,39 @@ void CALLBACK IoCompletion(DWORD error, DWORD actual_bytes, NOTREACHED(); } - if (data->delete_buffer) { - DCHECK(!data->callback); - data->file->Release(); - delete data->buffer; - delete data; - return; - } + if (data->callback_) + data->callback_->OnFileIOComplete(static_cast<int>(actual_bytes)); - if (data->async) { - data->callback->OnFileIOComplete(static_cast<int>(actual_bytes)); - data->file->Release(); - delete data; - } else { - // Somebody is waiting for this so don't delete data and instead notify - // that we were called. - data->actual_bytes = actual_bytes; - data->file->Release(); - data->called = true; + delete data; +} + +MyOverlapped::MyOverlapped(disk_cache::File* file, size_t offset, + disk_cache::FileIOCallback* callback) { + memset(this, 0, sizeof(*this)); + context_.handler = Singleton<CompletionHandler>::get(); + context_.overlapped.Offset = static_cast<DWORD>(offset); + file_ = file; + callback_ = callback; +} + +MyOverlapped::~MyOverlapped() { + if (delete_buffer_) { + DCHECK(!callback_); + delete buffer_; } } +} // namespace + +namespace disk_cache { + +// Used from WaitForPendingIO() when the cache is being destroyed. +MessageLoopForIO::IOHandler* GetFileIOHandler() { + return Singleton<CompletionHandler>::get(); +} + File::File(base::PlatformFile file) - : init_(true), mixed_(true), platform_file_(INVALID_HANDLE_VALUE), + : init_(true), platform_file_(INVALID_HANDLE_VALUE), sync_platform_file_(file) { } @@ -99,23 +86,22 @@ bool File::Init(const std::wstring& name) { return false; platform_file_ = CreateFile(name.c_str(), GENERIC_READ | GENERIC_WRITE, - FILE_SHARE_READ | FILE_SHARE_WRITE, NULL, OPEN_EXISTING, - FILE_FLAG_OVERLAPPED, NULL); + FILE_SHARE_READ | FILE_SHARE_WRITE, NULL, + OPEN_EXISTING, FILE_FLAG_OVERLAPPED, NULL); if (INVALID_HANDLE_VALUE == platform_file_) return false; + MessageLoopForIO::current()->RegisterIOHandler( + platform_file_, Singleton<CompletionHandler>::get()); + init_ = true; - if (mixed_) { - sync_platform_file_ = CreateFile(name.c_str(), GENERIC_READ | GENERIC_WRITE, - FILE_SHARE_READ | FILE_SHARE_WRITE, NULL, - OPEN_EXISTING, 0, NULL); - - if (INVALID_HANDLE_VALUE == sync_platform_file_) - return false; - } else { - sync_platform_file_ = INVALID_HANDLE_VALUE; - } + sync_platform_file_ = CreateFile(name.c_str(), GENERIC_READ | GENERIC_WRITE, + FILE_SHARE_READ | FILE_SHARE_WRITE, NULL, + OPEN_EXISTING, 0, NULL); + + if (INVALID_HANDLE_VALUE == sync_platform_file_) + return false; return true; } @@ -126,13 +112,13 @@ File::~File() { if (INVALID_HANDLE_VALUE != platform_file_) CloseHandle(platform_file_); - if (mixed_ && INVALID_HANDLE_VALUE != sync_platform_file_) + if (INVALID_HANDLE_VALUE != sync_platform_file_) CloseHandle(sync_platform_file_); } base::PlatformFile File::platform_file() const { DCHECK(init_); - return (INVALID_HANDLE_VALUE == platform_file_) ? sync_platform_file_ : + return (INVALID_HANDLE_VALUE == platform_file_) ? sync_platform_file_ : platform_file_; } @@ -145,13 +131,11 @@ bool File::IsValid() const { bool File::Read(void* buffer, size_t buffer_len, size_t offset) { DCHECK(init_); - if (!mixed_ || buffer_len > ULONG_MAX || offset > LONG_MAX) + if (buffer_len > ULONG_MAX || offset > LONG_MAX) return false; - DWORD ret = SetFilePointer(sync_platform_file_, - static_cast<LONG>(offset), - NULL, - FILE_BEGIN); + DWORD ret = SetFilePointer(sync_platform_file_, static_cast<LONG>(offset), + NULL, FILE_BEGIN); if (INVALID_SET_FILE_POINTER == ret) return false; @@ -164,13 +148,11 @@ bool File::Read(void* buffer, size_t buffer_len, size_t offset) { bool File::Write(const void* buffer, size_t buffer_len, size_t offset) { DCHECK(init_); - if (!mixed_ || buffer_len > ULONG_MAX || offset > ULONG_MAX) + if (buffer_len > ULONG_MAX || offset > ULONG_MAX) return false; - DWORD ret = SetFilePointer(sync_platform_file_, - static_cast<LONG>(offset), - NULL, - FILE_BEGIN); + DWORD ret = SetFilePointer(sync_platform_file_, static_cast<LONG>(offset), + NULL, FILE_BEGIN); if (INVALID_SET_FILE_POINTER == ret) return false; @@ -187,55 +169,38 @@ bool File::Write(const void* buffer, size_t buffer_len, size_t offset) { bool File::Read(void* buffer, size_t buffer_len, size_t offset, FileIOCallback* callback, bool* completed) { DCHECK(init_); + if (!callback) + return Read(buffer, buffer_len, offset); + if (buffer_len > ULONG_MAX || offset > ULONG_MAX) return false; - MyOverlapped* data = new MyOverlapped; - memset(data, 0, sizeof(*data)); - - SyncCallback local_callback; - data->overlapped.Offset = static_cast<DWORD>(offset); - data->callback = callback ? callback : &local_callback; - data->file = this; - + MyOverlapped* data = new MyOverlapped(this, offset, callback); DWORD size = static_cast<DWORD>(buffer_len); - AddRef(); - if (!ReadFileEx(platform_file_, buffer, size, &data->overlapped, - &IoCompletion)) { - Release(); + DWORD actual; + if (!ReadFile(platform_file_, buffer, size, &actual, data->overlapped())) { + *completed = false; + if (GetLastError() == ERROR_IO_PENDING) + return true; delete data; return false; } - if (callback) { - *completed = false; - // Let's check if the operation is already finished. - SleepEx(0, TRUE); - if (data->called) { - *completed = (data->actual_bytes == size); - DCHECK(data->actual_bytes == size); - delete data; - return *completed; - } - data->async = true; - } else { - // Invoke the callback and perform cleanup on the APC. - data->async = true; - int bytes_copied; - local_callback.WaitForResult(&bytes_copied); - if (static_cast<int>(buffer_len) != bytes_copied) { - NOTREACHED(); - return false; - } - } - - return true; + // The operation completed already. We'll be called back anyway. + *completed = (actual == size); + DCHECK(actual == size); + data->callback_ = NULL; + data->file_ = NULL; // There is no reason to hold on to this anymore. + return *completed; } bool File::Write(const void* buffer, size_t buffer_len, size_t offset, FileIOCallback* callback, bool* completed) { DCHECK(init_); + if (!callback) + return Write(buffer, buffer_len, offset); + return AsyncWrite(buffer, buffer_len, offset, true, callback, completed); } @@ -250,50 +215,32 @@ bool File::AsyncWrite(const void* buffer, size_t buffer_len, size_t offset, if (buffer_len > ULONG_MAX || offset > ULONG_MAX) return false; - MyOverlapped* data = new MyOverlapped; - memset(data, 0, sizeof(*data)); - - SyncCallback local_callback; - data->overlapped.Offset = static_cast<DWORD>(offset); - data->callback = callback ? callback : &local_callback; - data->file = this; - if (!callback && !notify) { - data->delete_buffer = true; - data->callback = NULL; - data->buffer = buffer; + MyOverlapped* data = new MyOverlapped(this, offset, callback); + bool dummy_completed; + if (!callback) { + DCHECK(!notify); + data->delete_buffer_ = true; + data->buffer_ = buffer; + completed = &dummy_completed; } DWORD size = static_cast<DWORD>(buffer_len); - AddRef(); - if (!WriteFileEx(platform_file_, buffer, size, &data->overlapped, - &IoCompletion)) { - Release(); + DWORD actual; + if (!WriteFile(platform_file_, buffer, size, &actual, data->overlapped())) { + *completed = false; + if (GetLastError() == ERROR_IO_PENDING) + return true; delete data; return false; } - if (callback) { - *completed = false; - SleepEx(0, TRUE); - if (data->called) { - *completed = (data->actual_bytes == size); - DCHECK(data->actual_bytes == size); - delete data; - return *completed; - } - data->async = true; - } else if (notify) { - data->async = true; - int bytes_copied; - local_callback.WaitForResult(&bytes_copied); - if (static_cast<int>(buffer_len) != bytes_copied) { - NOTREACHED(); - return false; - } - } - - return true; + // The operation completed already. We'll be called back anyway. + *completed = (actual == size); + DCHECK(actual == size); + data->callback_ = NULL; + data->file_ = NULL; // There is no reason to hold on to this anymore. + return *completed; } bool File::SetLength(size_t length) { diff --git a/net/disk_cache/mapped_file_unittest.cc b/net/disk_cache/mapped_file_unittest.cc index fe090e5..28eeff5 100644 --- a/net/disk_cache/mapped_file_unittest.cc +++ b/net/disk_cache/mapped_file_unittest.cc @@ -95,6 +95,8 @@ TEST_F(DiskCacheTest, MappedFile_AsyncIO) { g_cache_tests_max_id = 0; g_cache_tests_received = 0; + MessageLoopHelper helper; + char buffer1[20]; char buffer2[20]; CacheTestFillBuffer(buffer1, sizeof(buffer1), false); @@ -105,14 +107,14 @@ TEST_F(DiskCacheTest, MappedFile_AsyncIO) { int expected = completed ? 0 : 1; g_cache_tests_max_id = 1; - WaitForCallbacks(expected); + helper.WaitUntilCacheIoFinished(expected); EXPECT_TRUE(file->Read(buffer2, sizeof(buffer2), 1024 * 1024, &callback, &completed)); if (!completed) expected++; - WaitForCallbacks(expected); + helper.WaitUntilCacheIoFinished(expected); EXPECT_EQ(expected, g_cache_tests_received); EXPECT_FALSE(g_cache_tests_error); |