summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--base/message_loop.cc11
-rw-r--r--base/message_loop.h7
-rw-r--r--base/message_loop_unittest.cc186
-rw-r--r--base/message_pump_win.cc666
-rw-r--r--base/message_pump_win.h290
-rw-r--r--chrome/common/ipc_channel.cc144
-rw-r--r--chrome/common/ipc_channel.h14
-rw-r--r--chrome/common/ipc_sync_channel_unittest.cc8
-rw-r--r--net/base/file_stream_win.cc73
-rw-r--r--net/disk_cache/cache_util_win.cc12
-rw-r--r--net/disk_cache/disk_cache_perftest.cc8
-rw-r--r--net/disk_cache/disk_cache_test_base.cc5
-rw-r--r--net/disk_cache/disk_cache_test_base.h14
-rw-r--r--net/disk_cache/entry_impl.cc39
-rw-r--r--net/disk_cache/file_win.cc265
-rw-r--r--net/disk_cache/mapped_file_unittest.cc6
16 files changed, 816 insertions, 932 deletions
diff --git a/base/message_loop.cc b/base/message_loop.cc
index 73cb7da..1145439 100644
--- a/base/message_loop.cc
+++ b/base/message_loop.cc
@@ -573,7 +573,7 @@ void MessageLoopForUI::DidProcessMessage(const MSG& message) {
pump_win()->DidProcessMessage(message);
}
void MessageLoopForUI::PumpOutPendingPaintMessages() {
- pump_win()->PumpOutPendingPaintMessages();
+ pump_ui()->PumpOutPendingPaintMessages();
}
#endif // defined(OS_WIN)
@@ -583,17 +583,12 @@ void MessageLoopForUI::PumpOutPendingPaintMessages() {
#if defined(OS_WIN)
-void MessageLoopForIO::WatchObject(HANDLE object, Watcher* watcher) {
- pump_io()->WatchObject(object, watcher);
-}
-
void MessageLoopForIO::RegisterIOHandler(HANDLE file, IOHandler* handler) {
pump_io()->RegisterIOHandler(file, handler);
}
-void MessageLoopForIO::RegisterIOContext(OVERLAPPED* context,
- IOHandler* handler) {
- pump_io()->RegisterIOContext(context, handler);
+bool MessageLoopForIO::WaitForIOCompletion(DWORD timeout, IOHandler* filter) {
+ return pump_io()->WaitForIOCompletion(timeout, filter);
}
#elif defined(OS_POSIX)
diff --git a/base/message_loop.h b/base/message_loop.h
index 2d38853..8f16d8e 100644
--- a/base/message_loop.h
+++ b/base/message_loop.h
@@ -426,7 +426,7 @@ class MessageLoopForUI : public MessageLoop {
protected:
// TODO(rvargas): Make this platform independent.
- base::MessagePumpWin* pump_ui() {
+ base::MessagePumpForUI* pump_ui() {
return static_cast<base::MessagePumpForUI*>(pump_.get());
}
#endif // defined(OS_WIN)
@@ -458,13 +458,12 @@ class MessageLoopForIO : public MessageLoop {
}
#if defined(OS_WIN)
- typedef base::MessagePumpForIO::Watcher Watcher;
typedef base::MessagePumpForIO::IOHandler IOHandler;
+ typedef base::MessagePumpForIO::IOContext IOContext;
// Please see MessagePumpWin for definitions of these methods.
- void WatchObject(HANDLE object, Watcher* watcher);
void RegisterIOHandler(HANDLE file_handle, IOHandler* handler);
- void RegisterIOContext(OVERLAPPED* context, IOHandler* handler);
+ bool WaitForIOCompletion(DWORD timeout, IOHandler* filter);
protected:
// TODO(rvargas): Make this platform independent.
diff --git a/base/message_loop_unittest.cc b/base/message_loop_unittest.cc
index 83be2b1..d959659 100644
--- a/base/message_loop_unittest.cc
+++ b/base/message_loop_unittest.cc
@@ -1056,68 +1056,6 @@ void RunTest_NonNestableInNestedLoop(MessageLoop::Type message_loop_type) {
#if defined(OS_WIN)
-class AutoresetWatcher : public MessageLoopForIO::Watcher {
- public:
- explicit AutoresetWatcher(HANDLE signal) : signal_(signal) {
- }
- virtual void OnObjectSignaled(HANDLE object);
- private:
- HANDLE signal_;
-};
-
-void AutoresetWatcher::OnObjectSignaled(HANDLE object) {
- MessageLoopForIO::current()->WatchObject(object, NULL);
- ASSERT_TRUE(SetEvent(signal_));
-}
-
-class AutoresetTask : public Task {
- public:
- AutoresetTask(HANDLE object, MessageLoopForIO::Watcher* watcher)
- : object_(object), watcher_(watcher) {}
- virtual void Run() {
- MessageLoopForIO::current()->WatchObject(object_, watcher_);
- }
-
- private:
- HANDLE object_;
- MessageLoopForIO::Watcher* watcher_;
-};
-
-void RunTest_AutoresetEvents(MessageLoop::Type message_loop_type) {
- MessageLoop loop(message_loop_type);
-
- SECURITY_ATTRIBUTES attributes;
- attributes.nLength = sizeof(attributes);
- attributes.bInheritHandle = false;
- attributes.lpSecurityDescriptor = NULL;
-
- // Init an autoreset and a manual reset events.
- HANDLE autoreset = CreateEvent(&attributes, FALSE, FALSE, NULL);
- HANDLE callback_called = CreateEvent(&attributes, TRUE, FALSE, NULL);
- ASSERT_TRUE(NULL != autoreset);
- ASSERT_TRUE(NULL != callback_called);
-
- Thread thread("Autoreset test");
- Thread::Options options;
- options.message_loop_type = message_loop_type;
- ASSERT_TRUE(thread.StartWithOptions(options));
-
- MessageLoop* thread_loop = thread.message_loop();
- ASSERT_TRUE(NULL != thread_loop);
-
- AutoresetWatcher watcher(callback_called);
- AutoresetTask* task = new AutoresetTask(autoreset, &watcher);
- thread_loop->PostTask(FROM_HERE, task);
- Sleep(100); // Make sure the thread runs and sleeps for lack of work.
-
- ASSERT_TRUE(SetEvent(autoreset));
-
- DWORD result = WaitForSingleObject(callback_called, 1000);
- EXPECT_EQ(WAIT_OBJECT_0, result);
-
- thread.Stop();
-}
-
class DispatcherImpl : public MessageLoopForUI::Dispatcher {
public:
DispatcherImpl() : dispatch_count_(0) {}
@@ -1150,68 +1088,68 @@ void RunTest_Dispatcher(MessageLoop::Type message_loop_type) {
class TestIOHandler : public MessageLoopForIO::IOHandler {
public:
- TestIOHandler(const wchar_t* name, HANDLE signal);
+ TestIOHandler(const wchar_t* name, HANDLE signal, bool wait);
- virtual void OnIOCompleted(OVERLAPPED* context, DWORD bytes_transfered,
- DWORD error);
+ virtual void OnIOCompleted(MessageLoopForIO::IOContext* context,
+ DWORD bytes_transfered, DWORD error);
- HANDLE file() { return file_.Get(); }
- void* buffer() { return buffer_; }
- OVERLAPPED* context() { return &context_; }
+ void Init();
+ void WaitForIO();
+ OVERLAPPED* context() { return &context_.overlapped; }
DWORD size() { return sizeof(buffer_); }
private:
char buffer_[48];
- OVERLAPPED context_;
+ MessageLoopForIO::IOContext context_;
HANDLE signal_;
ScopedHandle file_;
- ScopedHandle event_;
+ bool wait_;
};
-TestIOHandler::TestIOHandler(const wchar_t* name, HANDLE signal)
- : signal_(signal) {
+TestIOHandler::TestIOHandler(const wchar_t* name, HANDLE signal, bool wait)
+ : signal_(signal), wait_(wait) {
memset(buffer_, 0, sizeof(buffer_));
memset(&context_, 0, sizeof(context_));
-
- event_.Set(CreateEvent(NULL, TRUE, FALSE, NULL));
- EXPECT_TRUE(event_.IsValid());
- context_.hEvent = event_.Get();
+ context_.handler = this;
file_.Set(CreateFile(name, GENERIC_READ, 0, NULL, OPEN_EXISTING,
FILE_FLAG_OVERLAPPED, NULL));
EXPECT_TRUE(file_.IsValid());
}
-void TestIOHandler::OnIOCompleted(OVERLAPPED* context, DWORD bytes_transfered,
- DWORD error) {
+void TestIOHandler::Init() {
+ MessageLoopForIO::current()->RegisterIOHandler(file_, this);
+
+ DWORD read;
+ EXPECT_FALSE(ReadFile(file_, buffer_, size(), &read, context()));
+ EXPECT_EQ(ERROR_IO_PENDING, GetLastError());
+ if (wait_)
+ WaitForIO();
+}
+
+void TestIOHandler::OnIOCompleted(MessageLoopForIO::IOContext* context,
+ DWORD bytes_transfered, DWORD error) {
ASSERT_TRUE(context == &context_);
- MessageLoopForIO::current()->RegisterIOContext(context, NULL);
ASSERT_TRUE(SetEvent(signal_));
}
+void TestIOHandler::WaitForIO() {
+ EXPECT_TRUE(MessageLoopForIO::current()->WaitForIOCompletion(300, this));
+ EXPECT_TRUE(MessageLoopForIO::current()->WaitForIOCompletion(400, this));
+}
+
class IOHandlerTask : public Task {
public:
explicit IOHandlerTask(TestIOHandler* handler) : handler_(handler) {}
- virtual void Run();
+ virtual void Run() {
+ handler_->Init();
+ }
private:
TestIOHandler* handler_;
};
-void IOHandlerTask::Run() {
- MessageLoopForIO::current()->RegisterIOHandler(handler_->file(), handler_);
- MessageLoopForIO::current()->RegisterIOContext(handler_->context(), handler_);
-
- DWORD read;
- EXPECT_FALSE(ReadFile(handler_->file(), handler_->buffer(), handler_->size(),
- &read, handler_->context()));
- EXPECT_EQ(ERROR_IO_PENDING, GetLastError());
-}
-
void RunTest_IOHandler() {
- // This test requires an IO loop.
- MessageLoop loop(MessageLoop::TYPE_IO);
-
ScopedHandle callback_called(CreateEvent(NULL, TRUE, FALSE, NULL));
ASSERT_TRUE(callback_called.IsValid());
@@ -1228,7 +1166,7 @@ void RunTest_IOHandler() {
MessageLoop* thread_loop = thread.message_loop();
ASSERT_TRUE(NULL != thread_loop);
- TestIOHandler handler(kPipeName, callback_called);
+ TestIOHandler handler(kPipeName, callback_called, false);
IOHandlerTask* task = new IOHandlerTask(&handler);
thread_loop->PostTask(FROM_HERE, task);
Sleep(100); // Make sure the thread runs and sleeps for lack of work.
@@ -1243,6 +1181,57 @@ void RunTest_IOHandler() {
thread.Stop();
}
+void RunTest_WaitForIO() {
+ ScopedHandle callback1_called(CreateEvent(NULL, TRUE, FALSE, NULL));
+ ScopedHandle callback2_called(CreateEvent(NULL, TRUE, FALSE, NULL));
+ ASSERT_TRUE(callback1_called.IsValid());
+ ASSERT_TRUE(callback2_called.IsValid());
+
+ const wchar_t* kPipeName1 = L"\\\\.\\pipe\\iohandler_pipe1";
+ const wchar_t* kPipeName2 = L"\\\\.\\pipe\\iohandler_pipe2";
+ ScopedHandle server1(CreateNamedPipe(kPipeName1, PIPE_ACCESS_OUTBOUND, 0, 1,
+ 0, 0, 0, NULL));
+ ScopedHandle server2(CreateNamedPipe(kPipeName2, PIPE_ACCESS_OUTBOUND, 0, 1,
+ 0, 0, 0, NULL));
+ ASSERT_TRUE(server1.IsValid());
+ ASSERT_TRUE(server2.IsValid());
+
+ Thread thread("IOHandler test");
+ Thread::Options options;
+ options.message_loop_type = MessageLoop::TYPE_IO;
+ ASSERT_TRUE(thread.StartWithOptions(options));
+
+ MessageLoop* thread_loop = thread.message_loop();
+ ASSERT_TRUE(NULL != thread_loop);
+
+ TestIOHandler handler1(kPipeName1, callback1_called, false);
+ TestIOHandler handler2(kPipeName2, callback2_called, true);
+ IOHandlerTask* task1 = new IOHandlerTask(&handler1);
+ IOHandlerTask* task2 = new IOHandlerTask(&handler2);
+ thread_loop->PostTask(FROM_HERE, task1);
+ Sleep(100); // Make sure the thread runs and sleeps for lack of work.
+ thread_loop->PostTask(FROM_HERE, task2);
+ Sleep(100);
+
+ // At this time handler1 is waiting to be called, and the thread is waiting
+ // on the Init method of handler2, filtering only handler2 callbacks.
+
+ const char buffer[] = "Hello there!";
+ DWORD written;
+ EXPECT_TRUE(WriteFile(server1, buffer, sizeof(buffer), &written, NULL));
+ Sleep(200);
+ EXPECT_EQ(WAIT_TIMEOUT, WaitForSingleObject(callback1_called, 0)) <<
+ "handler1 has not been called";
+
+ EXPECT_TRUE(WriteFile(server2, buffer, sizeof(buffer), &written, NULL));
+
+ HANDLE objects[2] = { callback1_called.Get(), callback2_called.Get() };
+ DWORD result = WaitForMultipleObjects(2, objects, TRUE, 1000);
+ EXPECT_EQ(WAIT_OBJECT_0, result);
+
+ thread.Stop();
+}
+
#endif // defined(OS_WIN)
} // namespace
@@ -1379,11 +1368,6 @@ TEST(MessageLoopTest, NonNestableInNestedLoop) {
}
#if defined(OS_WIN)
-TEST(MessageLoopTest, AutoresetEvents) {
- // This test requires an IO loop
- RunTest_AutoresetEvents(MessageLoop::TYPE_IO);
-}
-
TEST(MessageLoopTest, Dispatcher) {
// This test requires a UI loop
RunTest_Dispatcher(MessageLoop::TYPE_UI);
@@ -1392,4 +1376,8 @@ TEST(MessageLoopTest, Dispatcher) {
TEST(MessageLoopTest, IOHandler) {
RunTest_IOHandler();
}
+
+TEST(MessageLoopTest, WaitForIO) {
+ RunTest_WaitForIO();
+}
#endif // defined(OS_WIN)
diff --git a/base/message_pump_win.cc b/base/message_pump_win.cc
index 542fcf1..7ada5eb7 100644
--- a/base/message_pump_win.cc
+++ b/base/message_pump_win.cc
@@ -11,41 +11,6 @@
using base::Time;
-namespace {
-
-class HandlerData : public base::MessagePumpForIO::Watcher {
- public:
- typedef base::MessagePumpForIO::IOHandler IOHandler;
- HandlerData(OVERLAPPED* context, IOHandler* handler)
- : context_(context), handler_(handler) {}
- ~HandlerData() {}
-
- virtual void OnObjectSignaled(HANDLE object);
-
- private:
- OVERLAPPED* context_;
- IOHandler* handler_;
-
- DISALLOW_COPY_AND_ASSIGN(HandlerData);
-};
-
-void HandlerData::OnObjectSignaled(HANDLE object) {
- DCHECK(object == context_->hEvent);
- DWORD transfered;
- DWORD error = ERROR_SUCCESS;
- BOOL ret = GetOverlappedResult(NULL, context_, &transfered, FALSE);
- if (!ret) {
- error = GetLastError();
- DCHECK(ERROR_HANDLE_EOF == error || ERROR_BROKEN_PIPE == error);
- transfered = 0;
- }
-
- ResetEvent(context_->hEvent);
- handler_->OnIOCompleted(context_, transfered, error);
-}
-
-} // namespace
-
namespace base {
static const wchar_t kWndClass[] = L"Chrome_MessagePumpWindow";
@@ -54,24 +19,9 @@ static const wchar_t kWndClass[] = L"Chrome_MessagePumpWindow";
// task (a series of such messages creates a continuous task pump).
static const int kMsgHaveWork = WM_USER + 1;
-#ifndef NDEBUG
-// Force exercise of polling model.
-static const int kMaxWaitObjects = 8;
-#else
-static const int kMaxWaitObjects = MAXIMUM_WAIT_OBJECTS;
-#endif
-
//-----------------------------------------------------------------------------
// MessagePumpWin public:
-MessagePumpWin::MessagePumpWin() : have_work_(0), state_(NULL) {
- InitMessageWnd();
-}
-
-MessagePumpWin::~MessagePumpWin() {
- DestroyWindow(message_hwnd_);
-}
-
void MessagePumpWin::AddObserver(Observer* observer) {
observers_.AddObserver(observer);
}
@@ -88,36 +38,6 @@ void MessagePumpWin::DidProcessMessage(const MSG& msg) {
FOR_EACH_OBSERVER(Observer, observers_, DidProcessMessage(msg));
}
-void MessagePumpWin::PumpOutPendingPaintMessages() {
- // If we are being called outside of the context of Run, then don't try to do
- // any work.
- if (!state_)
- return;
-
- // Create a mini-message-pump to force immediate processing of only Windows
- // WM_PAINT messages. Don't provide an infinite loop, but do enough peeking
- // to get the job done. Actual common max is 4 peeks, but we'll be a little
- // safe here.
- const int kMaxPeekCount = 20;
- bool win2k = win_util::GetWinVersion() <= win_util::WINVERSION_2000;
- int peek_count;
- for (peek_count = 0; peek_count < kMaxPeekCount; ++peek_count) {
- MSG msg;
- if (win2k) {
- if (!PeekMessage(&msg, NULL, WM_PAINT, WM_PAINT, PM_REMOVE))
- break;
- } else {
- if (!PeekMessage(&msg, NULL, 0, 0, PM_REMOVE | PM_QS_PAINT))
- break;
- }
- ProcessMessageHelper(msg);
- if (state_->should_quit) // Handle WM_QUIT.
- break;
- }
- // Histogram what was really being used, to help to adjust kMaxPeekCount.
- DHISTOGRAM_COUNTS(L"Loop.PumpOutPendingPaintMessages Peeks", peek_count);
-}
-
void MessagePumpWin::RunWithDispatcher(
Delegate* delegate, Dispatcher* dispatcher) {
RunState s;
@@ -139,7 +59,38 @@ void MessagePumpWin::Quit() {
state_->should_quit = true;
}
-void MessagePumpWin::ScheduleWork() {
+//-----------------------------------------------------------------------------
+// MessagePumpWin protected:
+
+int MessagePumpWin::GetCurrentDelay() const {
+ if (delayed_work_time_.is_null())
+ return -1;
+
+ // Be careful here. TimeDelta has a precision of microseconds, but we want a
+ // value in milliseconds. If there are 5.5ms left, should the delay be 5 or
+ // 6? It should be 6 to avoid executing delayed work too early.
+ double timeout = ceil((delayed_work_time_ - Time::Now()).InMillisecondsF());
+
+ // If this value is negative, then we need to run delayed work soon.
+ int delay = static_cast<int>(timeout);
+ if (delay < 0)
+ delay = 0;
+
+ return delay;
+}
+
+//-----------------------------------------------------------------------------
+// MessagePumpForUI public:
+
+MessagePumpForUI::MessagePumpForUI() {
+ InitMessageWnd();
+}
+
+MessagePumpForUI::~MessagePumpForUI() {
+ DestroyWindow(message_hwnd_);
+}
+
+void MessagePumpForUI::ScheduleWork() {
if (InterlockedExchange(&have_work_, 1))
return; // Someone else continued the pumping.
@@ -147,7 +98,7 @@ void MessagePumpWin::ScheduleWork() {
PostMessage(message_hwnd_, kMsgHaveWork, reinterpret_cast<WPARAM>(this), 0);
}
-void MessagePumpWin::ScheduleDelayedWork(const Time& delayed_work_time) {
+void MessagePumpForUI::ScheduleDelayedWork(const Time& delayed_work_time) {
//
// We would *like* to provide high resolution timers. Windows timers using
// SetTimer() have a 10ms granularity. We have to use WM_TIMER as a wakeup
@@ -180,24 +131,110 @@ void MessagePumpWin::ScheduleDelayedWork(const Time& delayed_work_time) {
SetTimer(message_hwnd_, reinterpret_cast<UINT_PTR>(this), delay_msec, NULL);
}
+void MessagePumpForUI::PumpOutPendingPaintMessages() {
+ // If we are being called outside of the context of Run, then don't try to do
+ // any work.
+ if (!state_)
+ return;
+
+ // Create a mini-message-pump to force immediate processing of only Windows
+ // WM_PAINT messages. Don't provide an infinite loop, but do enough peeking
+ // to get the job done. Actual common max is 4 peeks, but we'll be a little
+ // safe here.
+ const int kMaxPeekCount = 20;
+ bool win2k = win_util::GetWinVersion() <= win_util::WINVERSION_2000;
+ int peek_count;
+ for (peek_count = 0; peek_count < kMaxPeekCount; ++peek_count) {
+ MSG msg;
+ if (win2k) {
+ if (!PeekMessage(&msg, NULL, WM_PAINT, WM_PAINT, PM_REMOVE))
+ break;
+ } else {
+ if (!PeekMessage(&msg, NULL, 0, 0, PM_REMOVE | PM_QS_PAINT))
+ break;
+ }
+ ProcessMessageHelper(msg);
+ if (state_->should_quit) // Handle WM_QUIT.
+ break;
+ }
+ // Histogram what was really being used, to help to adjust kMaxPeekCount.
+ DHISTOGRAM_COUNTS(L"Loop.PumpOutPendingPaintMessages Peeks", peek_count);
+}
+
//-----------------------------------------------------------------------------
-// MessagePumpWin protected:
+// MessagePumpForUI private:
// static
-LRESULT CALLBACK MessagePumpWin::WndProcThunk(
+LRESULT CALLBACK MessagePumpForUI::WndProcThunk(
HWND hwnd, UINT message, WPARAM wparam, LPARAM lparam) {
switch (message) {
case kMsgHaveWork:
- reinterpret_cast<MessagePumpWin*>(wparam)->HandleWorkMessage();
+ reinterpret_cast<MessagePumpForUI*>(wparam)->HandleWorkMessage();
break;
case WM_TIMER:
- reinterpret_cast<MessagePumpWin*>(wparam)->HandleTimerMessage();
+ reinterpret_cast<MessagePumpForUI*>(wparam)->HandleTimerMessage();
break;
}
return DefWindowProc(hwnd, message, wparam, lparam);
}
-void MessagePumpWin::InitMessageWnd() {
+void MessagePumpForUI::DoRunLoop() {
+ // IF this was just a simple PeekMessage() loop (servicing all possible work
+ // queues), then Windows would try to achieve the following order according
+ // to MSDN documentation about PeekMessage with no filter):
+ // * Sent messages
+ // * Posted messages
+ // * Sent messages (again)
+ // * WM_PAINT messages
+ // * WM_TIMER messages
+ //
+ // Summary: none of the above classes is starved, and sent messages has twice
+ // the chance of being processed (i.e., reduced service time).
+
+ for (;;) {
+ // If we do any work, we may create more messages etc., and more work may
+ // possibly be waiting in another task group. When we (for example)
+ // ProcessNextWindowsMessage(), there is a good chance there are still more
+ // messages waiting. On the other hand, when any of these methods return
+ // having done no work, then it is pretty unlikely that calling them again
+ // quickly will find any work to do. Finally, if they all say they had no
+ // work, then it is a good time to consider sleeping (waiting) for more
+ // work.
+
+ bool more_work_is_plausible = ProcessNextWindowsMessage();
+ if (state_->should_quit)
+ break;
+
+ more_work_is_plausible |= state_->delegate->DoWork();
+ if (state_->should_quit)
+ break;
+
+ more_work_is_plausible |=
+ state_->delegate->DoDelayedWork(&delayed_work_time_);
+ // If we did not process any delayed work, then we can assume that our
+ // existing WM_TIMER if any will fire when delayed work should run. We
+ // don't want to disturb that timer if it is already in flight. However,
+ // if we did do all remaining delayed work, then lets kill the WM_TIMER.
+ if (more_work_is_plausible && delayed_work_time_.is_null())
+ KillTimer(message_hwnd_, reinterpret_cast<UINT_PTR>(this));
+ if (state_->should_quit)
+ break;
+
+ if (more_work_is_plausible)
+ continue;
+
+ more_work_is_plausible = state_->delegate->DoIdleWork();
+ if (state_->should_quit)
+ break;
+
+ if (more_work_is_plausible)
+ continue;
+
+ WaitForWork(); // Wait (sleep) until we have work to do again.
+ }
+}
+
+void MessagePumpForUI::InitMessageWnd() {
HINSTANCE hinst = GetModuleHandle(NULL);
WNDCLASSEX wc = {0};
@@ -212,7 +249,41 @@ void MessagePumpWin::InitMessageWnd() {
DCHECK(message_hwnd_);
}
-void MessagePumpWin::HandleWorkMessage() {
+void MessagePumpForUI::WaitForWork() {
+ // Wait until a message is available, up to the time needed by the timer
+ // manager to fire the next set of timers.
+ int delay = GetCurrentDelay();
+ if (delay < 0) // Negative value means no timers waiting.
+ delay = INFINITE;
+
+ DWORD result;
+ result = MsgWaitForMultipleObjectsEx(0, NULL, delay, QS_ALLINPUT,
+ MWMO_INPUTAVAILABLE);
+
+ if (WAIT_OBJECT_0 == result) {
+ // A WM_* message is available.
+ // If a parent child relationship exists between windows across threads
+ // then their thread inputs are implicitly attached.
+ // This causes the MsgWaitForMultipleObjectsEx API to return indicating
+ // that messages are ready for processing (specifically mouse messages
+ // intended for the child window. Occurs if the child window has capture)
+ // The subsequent PeekMessages call fails to return any messages thus
+ // causing us to enter a tight loop at times.
+ // The WaitMessage call below is a workaround to give the child window
+ // sometime to process its input messages.
+ MSG msg = {0};
+ DWORD queue_status = GetQueueStatus(QS_MOUSE);
+ if (HIWORD(queue_status) & QS_MOUSE &&
+ !PeekMessage(&msg, NULL, WM_MOUSEFIRST, WM_MOUSELAST, PM_NOREMOVE)) {
+ WaitMessage();
+ }
+ return;
+ }
+
+ DCHECK_NE(WAIT_FAILED, result) << GetLastError();
+}
+
+void MessagePumpForUI::HandleWorkMessage() {
// If we are being called outside of the context of Run, then don't try to do
// any work. This could correspond to a MessageBox call or something of that
// sort.
@@ -233,7 +304,7 @@ void MessagePumpWin::HandleWorkMessage() {
ScheduleWork();
}
-void MessagePumpWin::HandleTimerMessage() {
+void MessagePumpForUI::HandleTimerMessage() {
KillTimer(message_hwnd_, reinterpret_cast<UINT_PTR>(this));
// If we are being called outside of the context of Run, then don't do
@@ -249,7 +320,7 @@ void MessagePumpWin::HandleTimerMessage() {
}
}
-bool MessagePumpWin::ProcessNextWindowsMessage() {
+bool MessagePumpForUI::ProcessNextWindowsMessage() {
// If there are sent messages in the queue then PeekMessage internally
// dispatches the message and returns false. We return true in this
// case to ensure that the message loop peeks again instead of calling
@@ -266,7 +337,7 @@ bool MessagePumpWin::ProcessNextWindowsMessage() {
return sent_messages_in_queue;
}
-bool MessagePumpWin::ProcessMessageHelper(const MSG& msg) {
+bool MessagePumpForUI::ProcessMessageHelper(const MSG& msg) {
if (WM_QUIT == msg.message) {
// Repost the QUIT message so that it will be retrieved by the primary
// GetMessage() loop.
@@ -293,7 +364,7 @@ bool MessagePumpWin::ProcessMessageHelper(const MSG& msg) {
return true;
}
-bool MessagePumpWin::ProcessPumpReplacementMessage() {
+bool MessagePumpForUI::ProcessPumpReplacementMessage() {
// When we encounter a kMsgHaveWork message, this method is called to peek
// and process a replacement message, such as a WM_PAINT or WM_TIMER. The
// goal is to make the kMsgHaveWork as non-intrusive as possible, even though
@@ -307,7 +378,7 @@ bool MessagePumpWin::ProcessPumpReplacementMessage() {
bool have_message = (0 != PeekMessage(&msg, NULL, 0, 0, PM_REMOVE));
DCHECK(!have_message || kMsgHaveWork != msg.message ||
msg.hwnd != message_hwnd_);
-
+
// Since we discarded a kMsgHaveWork message, we must update the flag.
InterlockedExchange(&have_work_, 0);
@@ -318,233 +389,63 @@ bool MessagePumpWin::ProcessPumpReplacementMessage() {
return have_message && ProcessMessageHelper(msg);
}
-int MessagePumpWin::GetCurrentDelay() const {
- if (delayed_work_time_.is_null())
- return -1;
-
- // Be careful here. TimeDelta has a precision of microseconds, but we want a
- // value in milliseconds. If there are 5.5ms left, should the delay be 5 or
- // 6? It should be 6 to avoid executing delayed work too early.
- double timeout = ceil((delayed_work_time_ - Time::Now()).InMillisecondsF());
-
- // If this value is negative, then we need to run delayed work soon.
- int delay = static_cast<int>(timeout);
- if (delay < 0)
- delay = 0;
-
- return delay;
-}
-
//-----------------------------------------------------------------------------
-// MessagePumpForUI private:
-
-void MessagePumpForUI::DoRunLoop() {
- // IF this was just a simple PeekMessage() loop (servicing all possible work
- // queues), then Windows would try to achieve the following order according
- // to MSDN documentation about PeekMessage with no filter):
- // * Sent messages
- // * Posted messages
- // * Sent messages (again)
- // * WM_PAINT messages
- // * WM_TIMER messages
- //
- // Summary: none of the above classes is starved, and sent messages has twice
- // the chance of being processed (i.e., reduced service time).
-
- for (;;) {
- // If we do any work, we may create more messages etc., and more work may
- // possibly be waiting in another task group. When we (for example)
- // ProcessNextWindowsMessage(), there is a good chance there are still more
- // messages waiting. On the other hand, when any of these methods return
- // having done no work, then it is pretty unlikely that calling them again
- // quickly will find any work to do. Finally, if they all say they had no
- // work, then it is a good time to consider sleeping (waiting) for more
- // work.
-
- bool more_work_is_plausible = ProcessNextWindowsMessage();
- if (state_->should_quit)
- break;
-
- more_work_is_plausible |= state_->delegate->DoWork();
- if (state_->should_quit)
- break;
-
- more_work_is_plausible |=
- state_->delegate->DoDelayedWork(&delayed_work_time_);
- // If we did not process any delayed work, then we can assume that our
- // existing WM_TIMER if any will fire when delayed work should run. We
- // don't want to disturb that timer if it is already in flight. However,
- // if we did do all remaining delayed work, then lets kill the WM_TIMER.
- if (more_work_is_plausible && delayed_work_time_.is_null())
- KillTimer(message_hwnd_, reinterpret_cast<UINT_PTR>(this));
- if (state_->should_quit)
- break;
-
- if (more_work_is_plausible)
- continue;
-
- more_work_is_plausible = state_->delegate->DoIdleWork();
- if (state_->should_quit)
- break;
-
- if (more_work_is_plausible)
- continue;
+// MessagePumpForIO public:
- WaitForWork(); // Wait (sleep) until we have work to do again.
- }
+MessagePumpForIO::MessagePumpForIO() {
+ port_.Set(CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, NULL, 1));
+ DCHECK(port_.IsValid());
}
-void MessagePumpForUI::WaitForWork() {
- // Wait until a message is available, up to the time needed by the timer
- // manager to fire the next set of timers.
- int delay = GetCurrentDelay();
- if (delay < 0) // Negative value means no timers waiting.
- delay = INFINITE;
-
- DWORD result;
- result = MsgWaitForMultipleObjectsEx(0, NULL, delay, QS_ALLINPUT,
- MWMO_INPUTAVAILABLE);
-
- if (WAIT_OBJECT_0 == result) {
- // A WM_* message is available.
- // If a parent child relationship exists between windows across threads
- // then their thread inputs are implicitly attached.
- // This causes the MsgWaitForMultipleObjectsEx API to return indicating
- // that messages are ready for processing (specifically mouse messages
- // intended for the child window. Occurs if the child window has capture)
- // The subsequent PeekMessages call fails to return any messages thus
- // causing us to enter a tight loop at times.
- // The WaitMessage call below is a workaround to give the child window
- // sometime to process its input messages.
- MSG msg = {0};
- DWORD queue_status = GetQueueStatus(QS_MOUSE);
- if (HIWORD(queue_status) & QS_MOUSE &&
- !PeekMessage(&msg, NULL, WM_MOUSEFIRST, WM_MOUSELAST, PM_NOREMOVE)) {
- WaitMessage();
- }
- return;
- }
+void MessagePumpForIO::ScheduleWork() {
+ if (InterlockedExchange(&have_work_, 1))
+ return; // Someone else continued the pumping.
- DCHECK_NE(WAIT_FAILED, result) << GetLastError();
+ // Make sure the MessagePump does some work for us.
+ BOOL ret = PostQueuedCompletionStatus(port_, 0,
+ reinterpret_cast<ULONG_PTR>(this),
+ reinterpret_cast<OVERLAPPED*>(this));
+ DCHECK(ret);
}
-//-----------------------------------------------------------------------------
-// MessagePumpForIO public:
-
-void MessagePumpForIO::WatchObject(HANDLE object, Watcher* watcher) {
- DCHECK(object);
- DCHECK_NE(object, INVALID_HANDLE_VALUE);
-
- std::vector<HANDLE>::iterator it =
- find(objects_.begin(), objects_.end(), object);
- if (watcher) {
- if (it == objects_.end()) {
- static size_t warning_multiple = 1;
- if (objects_.size() >= warning_multiple * MAXIMUM_WAIT_OBJECTS / 2) {
- LOG(INFO) << "More than " << warning_multiple * MAXIMUM_WAIT_OBJECTS / 2
- << " objects being watched";
- // This DCHECK() is an artificial limitation, meant to warn us if we
- // start creating too many objects. It can safely be raised to a higher
- // level, and the program is designed to handle much larger values.
- // Before raising this limit, make sure that there is a very good reason
- // (in your debug testing) to be watching this many objects.
- DCHECK(2 <= warning_multiple);
- ++warning_multiple;
- }
- objects_.push_back(object);
- watchers_.push_back(watcher);
- } else {
- watchers_[it - objects_.begin()] = watcher;
- }
- } else if (it != objects_.end()) {
- std::vector<HANDLE>::difference_type index = it - objects_.begin();
- objects_.erase(it);
- watchers_.erase(watchers_.begin() + index);
- }
+void MessagePumpForIO::ScheduleDelayedWork(const Time& delayed_work_time) {
+ // We know that we can't be blocked right now since this method can only be
+ // called on the same thread as Run, so we only need to update our record of
+ // how long to sleep when we do sleep.
+ delayed_work_time_ = delayed_work_time;
}
void MessagePumpForIO::RegisterIOHandler(HANDLE file_handle,
IOHandler* handler) {
-#if 0
- // TODO(rvargas): This is just to give an idea of what this code will look
- // like when we actually move to completion ports. Of course, we cannot
- // do this without calling GetQueuedCompletionStatus().
ULONG_PTR key = reinterpret_cast<ULONG_PTR>(handler);
HANDLE port = CreateIoCompletionPort(file_handle, port_, key, 1);
- if (!port_.IsValid())
- port_.Set(port);
-#endif
-}
-
-void MessagePumpForIO::RegisterIOContext(OVERLAPPED* context,
- IOHandler* handler) {
- DCHECK(context->hEvent);
- if (handler) {
- HandlerData* watcher = new HandlerData(context, handler);
- WatchObject(context->hEvent, watcher);
- } else {
- std::vector<HANDLE>::iterator it =
- find(objects_.begin(), objects_.end(), context->hEvent);
-
- if (it == objects_.end()) {
- NOTREACHED();
- return;
- }
-
- std::vector<HANDLE>::difference_type index = it - objects_.begin();
- objects_.erase(it);
- delete watchers_[index];
- watchers_.erase(watchers_.begin() + index);
- }
+ DCHECK(port == port_.Get());
}
//-----------------------------------------------------------------------------
// MessagePumpForIO private:
void MessagePumpForIO::DoRunLoop() {
- // IF this was just a simple PeekMessage() loop (servicing all possible work
- // queues), then Windows would try to achieve the following order according
- // to MSDN documentation about PeekMessage with no filter):
- // * Sent messages
- // * Posted messages
- // * Sent messages (again)
- // * WM_PAINT messages
- // * WM_TIMER messages
- //
- // Summary: none of the above classes is starved, and sent messages has twice
- // the chance of being processed (i.e., reduced service time).
-
for (;;) {
// If we do any work, we may create more messages etc., and more work may
// possibly be waiting in another task group. When we (for example)
- // ProcessNextWindowsMessage(), there is a good chance there are still more
- // messages waiting (same thing for ProcessNextObject(), which responds to
- // only one signaled object; etc.). On the other hand, when any of these
- // methods return having done no work, then it is pretty unlikely that
- // calling them again quickly will find any work to do. Finally, if they
- // all say they had no work, then it is a good time to consider sleeping
- // (waiting) for more work.
-
- bool more_work_is_plausible = ProcessNextWindowsMessage();
- if (state_->should_quit)
- break;
+ // WaitForIOCompletion(), there is a good chance there are still more
+ // messages waiting. On the other hand, when any of these methods return
+ // having done no work, then it is pretty unlikely that calling them
+ // again quickly will find any work to do. Finally, if they all say they
+ // had no work, then it is a good time to consider sleeping (waiting) for
+ // more work.
- more_work_is_plausible |= state_->delegate->DoWork();
+ bool more_work_is_plausible = state_->delegate->DoWork();
if (state_->should_quit)
break;
- more_work_is_plausible |= ProcessNextObject();
+ more_work_is_plausible |= WaitForIOCompletion(0, NULL);
if (state_->should_quit)
break;
more_work_is_plausible |=
state_->delegate->DoDelayedWork(&delayed_work_time_);
- // If we did not process any delayed work, then we can assume that our
- // existing WM_TIMER if any will fire when delayed work should run. We
- // don't want to disturb that timer if it is already in flight. However,
- // if we did do all remaining delayed work, then lets kill the WM_TIMER.
- if (more_work_is_plausible && delayed_work_time_.is_null())
- KillTimer(message_hwnd_, reinterpret_cast<UINT_PTR>(this));
if (state_->should_quit)
break;
@@ -558,141 +459,92 @@ void MessagePumpForIO::DoRunLoop() {
if (more_work_is_plausible)
continue;
- // We service APCs in WaitForWork, without returning.
WaitForWork(); // Wait (sleep) until we have work to do again.
}
}
-// If we handle more than the OS limit on the number of objects that can be
-// waited for, we'll need to poll (sequencing through subsets of the objects
-// that can be passed in a single OS wait call). The following is the polling
-// interval used in that (unusual) case. (I don't have a lot of justifcation
-// for the specific value, but it needed to be short enough that it would not
-// add a lot of latency, and long enough that we wouldn't thrash the CPU for no
-// reason... especially considering the silly user probably has a million tabs
-// open, etc.)
-static const int kMultipleWaitPollingInterval = 20;
-
+// Wait until IO completes, up to the time needed by the timer manager to fire
+// the next set of timers.
void MessagePumpForIO::WaitForWork() {
- // Wait until either an object is signaled or a message is available. Handle
- // (without returning) any APCs (only the IO thread currently has APCs.)
-
- // We do not support nested message loops when we have watched objects. This
- // is to avoid messy recursion problems.
- DCHECK(objects_.empty() || state_->run_depth == 1) <<
- "Cannot nest a message loop when there are watched objects!";
+ // We do not support nested IO message loops. This is to avoid messy
+ // recursion problems.
+ DCHECK(state_->run_depth == 1) << "Cannot nest an IO message loop!";
- int wait_flags = MWMO_ALERTABLE | MWMO_INPUTAVAILABLE;
+ int timeout = GetCurrentDelay();
+ if (timeout < 0) // Negative value means no timers waiting.
+ timeout = INFINITE;
- bool use_polling = false; // Poll if too many objects for one OS Wait call.
- for (;;) {
- // Do initialization here, in case APC modifies object list.
- size_t total_objs = objects_.size();
-
- int delay;
- size_t polling_index = 0; // The first unprocessed object index.
- do {
- size_t objs_len =
- (polling_index < total_objs) ? total_objs - polling_index : 0;
- if (objs_len >= MAXIMUM_WAIT_OBJECTS) {
- objs_len = MAXIMUM_WAIT_OBJECTS - 1;
- use_polling = true;
- }
- HANDLE* objs = objs_len ? polling_index + &objects_.front() : NULL;
-
- // Only wait up to the time needed by the timer manager to fire the next
- // set of timers.
- delay = GetCurrentDelay();
- if (use_polling && delay > kMultipleWaitPollingInterval)
- delay = kMultipleWaitPollingInterval;
- if (delay < 0) // Negative value means no timers waiting.
- delay = INFINITE;
-
- DWORD result;
- result = MsgWaitForMultipleObjectsEx(static_cast<DWORD>(objs_len), objs,
- delay, QS_ALLINPUT, wait_flags);
-
- if (WAIT_IO_COMPLETION == result) {
- // We'll loop here when we service an APC. At it currently stands,
- // *ONLY* the IO thread uses *any* APCs, so this should have no impact
- // on the UI thread.
- break; // Break to outer loop, and waitforwork() again.
- }
-
- // Use unsigned type to simplify range detection;
- size_t signaled_index = result - WAIT_OBJECT_0;
- if (signaled_index < objs_len) {
- SignalWatcher(polling_index + signaled_index);
- return; // We serviced a signaled object.
- }
-
- if (objs_len == signaled_index)
- return; // A WM_* message is available.
-
- DCHECK_NE(WAIT_FAILED, result) << GetLastError();
-
- DCHECK(!objs || result == WAIT_TIMEOUT);
- if (!use_polling)
- return;
- polling_index += objs_len;
- } while (polling_index < total_objs);
- // For compatibility, we didn't return sooner. This made us do *some* wait
- // call(s) before returning. This will probably change in next rev.
- if (!delay || !GetCurrentDelay())
- return; // No work done, but timer is ready to fire.
- }
+ WaitForIOCompletion(timeout, NULL);
}
-bool MessagePumpForIO::ProcessNextObject() {
- size_t total_objs = objects_.size();
- if (!total_objs) {
- return false;
+bool MessagePumpForIO::WaitForIOCompletion(DWORD timeout, IOHandler* filter) {
+ IOItem item;
+ if (completed_io_.empty() || !MatchCompletedIOItem(filter, &item)) {
+ // We have to ask the system for another IO completion.
+ if (!GetIOItem(timeout, &item))
+ return false;
+
+ if (ProcessInternalIOItem(item))
+ return true;
}
- size_t polling_index = 0; // The first unprocessed object index.
- do {
- DCHECK(polling_index < total_objs);
- size_t objs_len = total_objs - polling_index;
- if (objs_len >= kMaxWaitObjects)
- objs_len = kMaxWaitObjects - 1;
- HANDLE* objs = polling_index + &objects_.front();
-
- // Identify 1 pending object, or allow an IO APC to be completed.
- DWORD result = WaitForMultipleObjectsEx(static_cast<DWORD>(objs_len), objs,
- FALSE, // 1 signal is sufficient.
- 0, // Wait 0ms.
- false); // Not alertable (no APC).
-
- // Use unsigned type to simplify range detection;
- size_t signaled_index = result - WAIT_OBJECT_0;
- if (signaled_index < objs_len) {
- SignalWatcher(polling_index + signaled_index);
- return true; // We serviced a signaled object.
+ if (item.context->handler) {
+ if (filter && item.handler != filter) {
+ // Save this item for later
+ completed_io_.push_back(item);
+ } else {
+ DCHECK(item.context->handler == item.handler);
+ item.handler->OnIOCompleted(item.context, item.bytes_transfered,
+ item.error);
}
-
- // If an handle is invalid, it will be WAIT_FAILED.
- DCHECK_EQ(WAIT_TIMEOUT, result) << GetLastError();
- polling_index += objs_len;
- } while (polling_index < total_objs);
- return false; // We serviced nothing.
+ } else {
+ // The handler must be gone by now, just cleanup the mess.
+ delete item.context;
+ }
+ return true;
}
-bool MessagePumpForIO::SignalWatcher(size_t object_index) {
- // Signal the watcher corresponding to the given index.
-
- DCHECK(objects_.size() > object_index);
+// Asks the OS for another IO completion result.
+bool MessagePumpForIO::GetIOItem(DWORD timeout, IOItem* item) {
+ memset(item, 0, sizeof(*item));
+ ULONG_PTR key = NULL;
+ OVERLAPPED* overlapped = NULL;
+ if (!GetQueuedCompletionStatus(port_.Get(), &item->bytes_transfered, &key,
+ &overlapped, timeout)) {
+ if (!overlapped)
+ return false; // Nothing in the queue.
+ item->error = GetLastError();
+ item->bytes_transfered = 0;
+ }
- // On reception of OnObjectSignaled() to a Watcher object, it may call
- // WatchObject(). watchers_ and objects_ will be modified. This is expected,
- // so don't be afraid if, while tracing a OnObjectSignaled() function, the
- // corresponding watchers_[result] is non-existant.
- watchers_[object_index]->OnObjectSignaled(objects_[object_index]);
+ item->handler = reinterpret_cast<IOHandler*>(key);
+ item->context = reinterpret_cast<IOContext*>(overlapped);
+ return true;
+}
- // Signaled objects tend to be removed from the watch list, and then added
- // back (appended). As a result, they move to the end of the objects_ array,
- // and this should make their service "fair" (no HANDLEs should be starved).
+bool MessagePumpForIO::ProcessInternalIOItem(const IOItem& item) {
+ if (this == reinterpret_cast<MessagePumpForIO*>(item.context) &&
+ this == reinterpret_cast<MessagePumpForIO*>(item.handler)) {
+ // This is our internal completion.
+ DCHECK(!item.bytes_transfered);
+ InterlockedExchange(&have_work_, 0);
+ return true;
+ }
+ return false;
+}
- return true;
+// Returns a completion item that was previously received.
+bool MessagePumpForIO::MatchCompletedIOItem(IOHandler* filter, IOItem* item) {
+ DCHECK(!completed_io_.empty());
+ for (std::list<IOItem>::iterator it = completed_io_.begin();
+ it != completed_io_.end(); ++it) {
+ if (!filter || it->handler == filter) {
+ *item = *it;
+ completed_io_.erase(it);
+ return true;
+ }
+ }
+ return false;
}
} // namespace base
diff --git a/base/message_pump_win.h b/base/message_pump_win.h
index eb4253f..63222a8 100644
--- a/base/message_pump_win.h
+++ b/base/message_pump_win.h
@@ -5,10 +5,10 @@
#ifndef BASE_MESSAGE_PUMP_WIN_H_
#define BASE_MESSAGE_PUMP_WIN_H_
-#include <vector>
-
#include <windows.h>
+#include <list>
+
#include "base/lock.h"
#include "base/message_pump.h"
#include "base/observer_list.h"
@@ -17,50 +17,9 @@
namespace base {
-// MessagePumpWin implements a "traditional" Windows message pump. It contains
-// a nearly infinite loop that peeks out messages, and then dispatches them.
-// Intermixed with those peeks are callouts to DoWork for pending tasks,
-// DoDelayedWork for pending timers, and OnObjectSignaled for signaled objects.
-// When there are no events to be serviced, this pump goes into a wait state.
-// In most cases, this message pump handles all processing.
-//
-// However, when a task, or windows event, invokes on the stack a native dialog
-// box or such, that window typically provides a bare bones (native?) message
-// pump. That bare-bones message pump generally supports little more than a
-// peek of the Windows message queue, followed by a dispatch of the peeked
-// message. MessageLoop extends that bare-bones message pump to also service
-// Tasks, at the cost of some complexity.
-//
-// The basic structure of the extension (refered to as a sub-pump) is that a
-// special message, kMsgHaveWork, is repeatedly injected into the Windows
-// Message queue. Each time the kMsgHaveWork message is peeked, checks are
-// made for an extended set of events, including the availability of Tasks to
-// run.
-//
-// After running a task, the special message kMsgHaveWork is again posted to
-// the Windows Message queue, ensuring a future time slice for processing a
-// future event. To prevent flooding the Windows Message queue, care is taken
-// to be sure that at most one kMsgHaveWork message is EVER pending in the
-// Window's Message queue.
-//
-// There are a few additional complexities in this system where, when there are
-// no Tasks to run, this otherwise infinite stream of messages which drives the
-// sub-pump is halted. The pump is automatically re-started when Tasks are
-// queued.
-//
-// A second complexity is that the presence of this stream of posted tasks may
-// prevent a bare-bones message pump from ever peeking a WM_PAINT or WM_TIMER.
-// Such paint and timer events always give priority to a posted message, such as
-// kMsgHaveWork messages. As a result, care is taken to do some peeking in
-// between the posting of each kMsgHaveWork message (i.e., after kMsgHaveWork
-// is peeked, and before a replacement kMsgHaveWork is posted).
-//
-// NOTE: Although it may seem odd that messages are used to start and stop this
-// flow (as opposed to signaling objects, etc.), it should be understood that
-// the native message pump will *only* respond to messages. As a result, it is
-// an excellent choice. It is also helpful that the starter messages that are
-// placed in the queue when new task arrive also awakens DoRunLoop.
-//
+// MessagePumpWin serves as the base for specialized versions of the MessagePump
+// for Windows. It provides basic functionality like handling of observers and
+// controlling the lifetime of the message pump.
class MessagePumpWin : public MessagePump {
public:
// An Observer is an object that receives global notifications from the
@@ -97,8 +56,8 @@ class MessagePumpWin : public MessagePump {
virtual bool Dispatch(const MSG& msg) = 0;
};
- MessagePumpWin();
- virtual ~MessagePumpWin();
+ MessagePumpWin() : have_work_(0), state_(NULL) {}
+ virtual ~MessagePumpWin() {}
// Add an Observer, which will start receiving notifications immediately.
void AddObserver(Observer* observer);
@@ -112,19 +71,12 @@ class MessagePumpWin : public MessagePump {
void WillProcessMessage(const MSG& msg);
void DidProcessMessage(const MSG& msg);
- // Applications can call this to encourage us to process all pending WM_PAINT
- // messages. This method will process all paint messages the Windows Message
- // queue can provide, up to some fixed number (to avoid any infinite loops).
- void PumpOutPendingPaintMessages();
-
// Like MessagePump::Run, but MSG objects are routed through dispatcher.
void RunWithDispatcher(Delegate* delegate, Dispatcher* dispatcher);
// MessagePump methods:
virtual void Run(Delegate* delegate) { RunWithDispatcher(delegate, NULL); }
virtual void Quit();
- virtual void ScheduleWork();
- virtual void ScheduleDelayedWork(const Time& delayed_work_time);
protected:
struct RunState {
@@ -138,20 +90,9 @@ class MessagePumpWin : public MessagePump {
int run_depth;
};
- static LRESULT CALLBACK WndProcThunk(
- HWND hwnd, UINT message, WPARAM wparam, LPARAM lparam);
virtual void DoRunLoop() = 0;
- void InitMessageWnd();
- void HandleWorkMessage();
- void HandleTimerMessage();
- bool ProcessNextWindowsMessage();
- bool ProcessMessageHelper(const MSG& msg);
- bool ProcessPumpReplacementMessage();
int GetCurrentDelay() const;
- // A hidden message-only window.
- HWND message_hwnd_;
-
ObserverList<Observer> observers_;
// The time at which delayed work should run.
@@ -170,33 +111,164 @@ class MessagePumpWin : public MessagePump {
// MessagePumpForUI extends MessagePumpWin with methods that are particular to a
// MessageLoop instantiated with TYPE_UI.
//
+// MessagePumpForUI implements a "traditional" Windows message pump. It contains
+// a nearly infinite loop that peeks out messages, and then dispatches them.
+// Intermixed with those peeks are callouts to DoWork for pending tasks, and
+// DoDelayedWork for pending timers. When there are no events to be serviced,
+// this pump goes into a wait state. In most cases, this message pump handles
+// all processing.
+//
+// However, when a task, or windows event, invokes on the stack a native dialog
+// box or such, that window typically provides a bare bones (native?) message
+// pump. That bare-bones message pump generally supports little more than a
+// peek of the Windows message queue, followed by a dispatch of the peeked
+// message. MessageLoop extends that bare-bones message pump to also service
+// Tasks, at the cost of some complexity.
+//
+// The basic structure of the extension (refered to as a sub-pump) is that a
+// special message, kMsgHaveWork, is repeatedly injected into the Windows
+// Message queue. Each time the kMsgHaveWork message is peeked, checks are
+// made for an extended set of events, including the availability of Tasks to
+// run.
+//
+// After running a task, the special message kMsgHaveWork is again posted to
+// the Windows Message queue, ensuring a future time slice for processing a
+// future event. To prevent flooding the Windows Message queue, care is taken
+// to be sure that at most one kMsgHaveWork message is EVER pending in the
+// Window's Message queue.
+//
+// There are a few additional complexities in this system where, when there are
+// no Tasks to run, this otherwise infinite stream of messages which drives the
+// sub-pump is halted. The pump is automatically re-started when Tasks are
+// queued.
+//
+// A second complexity is that the presence of this stream of posted tasks may
+// prevent a bare-bones message pump from ever peeking a WM_PAINT or WM_TIMER.
+// Such paint and timer events always give priority to a posted message, such as
+// kMsgHaveWork messages. As a result, care is taken to do some peeking in
+// between the posting of each kMsgHaveWork message (i.e., after kMsgHaveWork
+// is peeked, and before a replacement kMsgHaveWork is posted).
+//
+// NOTE: Although it may seem odd that messages are used to start and stop this
+// flow (as opposed to signaling objects, etc.), it should be understood that
+// the native message pump will *only* respond to messages. As a result, it is
+// an excellent choice. It is also helpful that the starter messages that are
+// placed in the queue when new task arrive also awakens DoRunLoop.
+//
class MessagePumpForUI : public MessagePumpWin {
public:
- MessagePumpForUI() {}
- virtual ~MessagePumpForUI() {}
+ MessagePumpForUI();
+ virtual ~MessagePumpForUI();
+
+ // MessagePump methods:
+ virtual void ScheduleWork();
+ virtual void ScheduleDelayedWork(const Time& delayed_work_time);
+
+ // Applications can call this to encourage us to process all pending WM_PAINT
+ // messages. This method will process all paint messages the Windows Message
+ // queue can provide, up to some fixed number (to avoid any infinite loops).
+ void PumpOutPendingPaintMessages();
+
private:
+ static LRESULT CALLBACK WndProcThunk(
+ HWND hwnd, UINT message, WPARAM wparam, LPARAM lparam);
virtual void DoRunLoop();
+ void InitMessageWnd();
void WaitForWork();
+ void HandleWorkMessage();
+ void HandleTimerMessage();
+ bool ProcessNextWindowsMessage();
+ bool ProcessMessageHelper(const MSG& msg);
+ bool ProcessPumpReplacementMessage();
+
+ // A hidden message-only window.
+ HWND message_hwnd_;
};
//-----------------------------------------------------------------------------
// MessagePumpForIO extends MessagePumpWin with methods that are particular to a
-// MessageLoop instantiated with TYPE_IO.
+// MessageLoop instantiated with TYPE_IO. This version of MessagePump does not
+// deal with Windows mesagges, and instead has a Run loop based on Completion
+// Ports so it is better suited for IO operations.
//
class MessagePumpForIO : public MessagePumpWin {
public:
- // Used with WatchObject to asynchronously monitor the signaled state of a
- // HANDLE object.
- class Watcher {
- public:
- virtual ~Watcher() {}
- // Called from MessageLoop::Run when a signalled object is detected.
- virtual void OnObjectSignaled(HANDLE object) = 0;
- };
+ struct IOContext;
// Clients interested in receiving OS notifications when asynchronous IO
// operations complete should implement this interface and register themselves
// with the message pump.
+ //
+ // Typical use #1:
+ // // Use only when there are no user's buffers involved on the actual IO,
+ // // so that all the cleanup can be done by the message pump.
+ // class MyFile : public IOHandler {
+ // MyFile() {
+ // ...
+ // context_ = new IOContext;
+ // context_->handler = this;
+ // message_pump->RegisterIOHandler(file_, this);
+ // }
+ // ~MyFile() {
+ // if (pending_) {
+ // // By setting the handler to NULL, we're asking for this context
+ // // to be deleted when received, without calling back to us.
+ // context_->handler = NULL;
+ // } else {
+ // delete context_;
+ // }
+ // }
+ // virtual void OnIOCompleted(IOContext* context, DWORD bytes_transfered,
+ // DWORD error) {
+ // pending_ = false;
+ // }
+ // void DoSomeIo() {
+ // ...
+ // // The only buffer required for this operation is the overlapped
+ // // structure.
+ // ConnectNamedPipe(file_, &context_->overlapped);
+ // pending_ = true;
+ // }
+ // bool pending_;
+ // IOContext* context_;
+ // HANDLE file_;
+ // };
+ //
+ // Typical use #2:
+ // class MyFile : public IOHandler {
+ // MyFile() {
+ // ...
+ // message_pump->RegisterIOHandler(file_, this);
+ // }
+ // // Plus some code to make sure that this destructor is not called
+ // // while there are pending IO operations.
+ // ~MyFile() {
+ // }
+ // virtual void OnIOCompleted(IOContext* context, DWORD bytes_transfered,
+ // DWORD error) {
+ // ...
+ // delete context;
+ // }
+ // void DoSomeIo() {
+ // ...
+ // IOContext* context = new IOContext;
+ // // This is not used for anything. It just prevents the context from
+ // // being considered "abandoned".
+ // context->handler = this;
+ // ReadFile(file_, buffer, num_bytes, &read, &context->overlapped);
+ // }
+ // HANDLE file_;
+ // };
+ //
+ // Typical use #3:
+ // Same as the previous example, except that in order to deal with the
+ // requirement stated for the destructor, the class calls WaitForIOCompletion
+ // from the destructor to block until all IO finishes.
+ // ~MyFile() {
+ // while(pending_)
+ // message_pump->WaitForIOCompletion(INFINITE, this);
+ // }
+ //
class IOHandler {
public:
virtual ~IOHandler() {}
@@ -204,46 +276,66 @@ class MessagePumpForIO : public MessagePumpWin {
// |context| completes. |error| is the Win32 error code of the IO operation
// (ERROR_SUCCESS if there was no error). |bytes_transfered| will be zero
// on error.
- virtual void OnIOCompleted(OVERLAPPED* context, DWORD bytes_transfered,
+ virtual void OnIOCompleted(IOContext* context, DWORD bytes_transfered,
DWORD error) = 0;
};
- MessagePumpForIO() {}
+ // The extended context that should be used as the base structure on every
+ // overlapped IO operation. |handler| must be set to the registered IOHandler
+ // for the given file when the operation is started, and it can be set to NULL
+ // before the operation completes to indicate that the handler should not be
+ // called anymore, and instead, the IOContext should be deleted when the OS
+ // notifies the completion of this operation. Please remember that any buffers
+ // involved with an IO operation should be around until the callback is
+ // received, so this technique can only be used for IO that do not involve
+ // additional buffers (other than the overlapped structure itself).
+ struct IOContext {
+ OVERLAPPED overlapped;
+ IOHandler* handler;
+ };
+
+ MessagePumpForIO();
virtual ~MessagePumpForIO() {}
- // Have the current thread's message loop watch for a signaled object.
- // Pass a null watcher to stop watching the object.
- void WatchObject(HANDLE, Watcher*);
+ // MessagePump methods:
+ virtual void ScheduleWork();
+ virtual void ScheduleDelayedWork(const Time& delayed_work_time);
// Register the handler to be used when asynchronous IO for the given file
// completes. The registration persists as long as |file_handle| is valid, so
// |handler| must be valid as long as there is pending IO for the given file.
void RegisterIOHandler(HANDLE file_handle, IOHandler* handler);
- // This is just a throw away function to ease transition to completion ports.
- // Pass NULL for handler to stop tracking this request. WARNING: cancellation
- // correctness is the responsibility of the caller. |context| must contain a
- // valid manual reset event, but the caller should not interact directly with
- // it. The registration can live across a single IO operation, or it can live
- // across multiple IO operations without having to reset it after each IO
- // completion callback. Internally, there will be a WatchObject registration
- // alive as long as this context registration is in effect. It is an error
- // to unregister a context that has not been registered before.
- void RegisterIOContext(OVERLAPPED* context, IOHandler* handler);
+ // Waits for the next IO completion that should be processed by |filter|, for
+ // up to |timeout| milliseconds. Return true if any IO operation completed,
+ // regardless of the involved handler, and false if the timeout expired. If
+ // the completion port received any message and the involved IO handler
+ // matches |filter|, the callback is called before returning from this code;
+ // if the handler is not the one that we are looking for, the callback will
+ // be postponed for another time, so reentrancy problems can be avoided.
+ // External use of this method should be reserved for the rare case when the
+ // caller is willing to allow pausing regular task dispatching on this thread.
+ bool WaitForIOCompletion(DWORD timeout, IOHandler* filter);
private:
+ struct IOItem {
+ IOHandler* handler;
+ IOContext* context;
+ DWORD bytes_transfered;
+ DWORD error;
+ };
+
virtual void DoRunLoop();
void WaitForWork();
- bool ProcessNextObject();
- bool SignalWatcher(size_t object_index);
-
- // A vector of objects (and corresponding watchers) that are routinely
- // serviced by this message pump.
- std::vector<HANDLE> objects_;
- std::vector<Watcher*> watchers_;
+ bool MatchCompletedIOItem(IOHandler* filter, IOItem* item);
+ bool GetIOItem(DWORD timeout, IOItem* item);
+ bool ProcessInternalIOItem(const IOItem& item);
// The completion port associated with this thread.
ScopedHandle port_;
+ // This list will be empty almost always. It stores IO completions that have
+ // not been delivered yet because somebody was doing cleanup.
+ std::list<IOItem> completed_io_;
};
} // namespace base
diff --git a/chrome/common/ipc_channel.cc b/chrome/common/ipc_channel.cc
index a9d73e7..934ea9b 100644
--- a/chrome/common/ipc_channel.cc
+++ b/chrome/common/ipc_channel.cc
@@ -20,24 +20,21 @@ namespace IPC {
//------------------------------------------------------------------------------
-Channel::State::State()
- : is_pending(false) {
- memset(&overlapped, 0, sizeof(overlapped));
- overlapped.hEvent = CreateEvent(NULL, // default security attributes
- TRUE, // manual-reset event
- TRUE, // initial state = signaled
- NULL); // unnamed event object
+Channel::State::State(Channel* channel) : is_pending(false) {
+ memset(&context.overlapped, 0, sizeof(context.overlapped));
+ context.handler = channel;
}
Channel::State::~State() {
- if (overlapped.hEvent)
- CloseHandle(overlapped.hEvent);
+ COMPILE_ASSERT(!offsetof(Channel::State, context), starts_with_io_context);
}
//------------------------------------------------------------------------------
Channel::Channel(const wstring& channel_id, Mode mode, Listener* listener)
- : pipe_(INVALID_HANDLE_VALUE),
+ : ALLOW_THIS_IN_INITIALIZER_LIST(input_state_(this)),
+ ALLOW_THIS_IN_INITIALIZER_LIST(output_state_(this)),
+ pipe_(INVALID_HANDLE_VALUE),
listener_(listener),
waiting_connect_(mode == MODE_SERVER),
processing_incoming_(false),
@@ -50,23 +47,29 @@ Channel::Channel(const wstring& channel_id, Mode mode, Listener* listener)
}
void Channel::Close() {
- // make sure we are no longer watching the pipe events
- MessageLoopForIO* loop = MessageLoopForIO::current();
- if (input_state_.is_pending) {
- input_state_.is_pending = false;
- loop->RegisterIOContext(&input_state_.overlapped, NULL);
- }
-
- if (output_state_.is_pending) {
- output_state_.is_pending = false;
- loop->RegisterIOContext(&output_state_.overlapped, NULL);
+ bool waited = false;
+ if (input_state_.is_pending || output_state_.is_pending) {
+ CancelIo(pipe_);
+ waited = true;
}
+ // Closing the handle at this point prevents us from issuing more requests
+ // form OnIOCompleted().
if (pipe_ != INVALID_HANDLE_VALUE) {
CloseHandle(pipe_);
pipe_ = INVALID_HANDLE_VALUE;
}
+ // Make sure all IO has completed.
+ base::Time start = base::Time::Now();
+ while (input_state_.is_pending || output_state_.is_pending) {
+ MessageLoopForIO::current()->WaitForIOCompletion(INFINITE, this);
+ }
+ if (waited) {
+ // We want to see if we block the message loop for too long.
+ UMA_HISTOGRAM_TIMES(L"AsyncIO.IPCChannelClose", base::Time::Now() - start);
+ }
+
while (!output_queue_.empty()) {
Message* m = output_queue_.front();
output_queue_.pop();
@@ -175,7 +178,7 @@ bool Channel::Connect() {
// to true, we indicate to OnIOCompleted that this is the special
// initialization signal.
MessageLoopForIO::current()->PostTask(FROM_HERE, factory_.NewRunnableMethod(
- &Channel::OnIOCompleted, &input_state_.overlapped, 0, 0));
+ &Channel::OnIOCompleted, &input_state_.context, 0, 0));
}
if (!waiting_connect_)
@@ -184,15 +187,14 @@ bool Channel::Connect() {
}
bool Channel::ProcessConnection() {
- if (input_state_.is_pending) {
+ if (input_state_.is_pending)
input_state_.is_pending = false;
- MessageLoopForIO::current()->RegisterIOContext(&input_state_.overlapped,
- NULL);
- }
// Do we have a client connected to our pipe?
- DCHECK(pipe_ != INVALID_HANDLE_VALUE);
- BOOL ok = ConnectNamedPipe(pipe_, &input_state_.overlapped);
+ if (INVALID_HANDLE_VALUE == pipe_)
+ return false;
+
+ BOOL ok = ConnectNamedPipe(pipe_, &input_state_.context.overlapped);
DWORD err = GetLastError();
if (ok) {
@@ -205,8 +207,6 @@ bool Channel::ProcessConnection() {
switch (err) {
case ERROR_IO_PENDING:
input_state_.is_pending = true;
- MessageLoopForIO::current()->RegisterIOContext(&input_state_.overlapped,
- this);
break;
case ERROR_PIPE_CONNECTED:
waiting_connect_ = false;
@@ -219,40 +219,41 @@ bool Channel::ProcessConnection() {
return true;
}
-bool Channel::ProcessIncomingMessages(OVERLAPPED* context,
+bool Channel::ProcessIncomingMessages(MessageLoopForIO::IOContext* context,
DWORD bytes_read) {
if (input_state_.is_pending) {
input_state_.is_pending = false;
DCHECK(context);
- MessageLoopForIO::current()->RegisterIOContext(&input_state_.overlapped,
- NULL);
if (!context || !bytes_read)
return false;
} else {
// This happens at channel initialization.
- DCHECK(!bytes_read && context == &input_state_.overlapped);
+ DCHECK(!bytes_read && context == &input_state_.context);
}
for (;;) {
if (bytes_read == 0) {
+ if (INVALID_HANDLE_VALUE == pipe_)
+ return false;
+
// Read from pipe...
BOOL ok = ReadFile(pipe_,
input_buf_,
BUF_SIZE,
&bytes_read,
- &input_state_.overlapped);
+ &input_state_.context.overlapped);
if (!ok) {
DWORD err = GetLastError();
if (err == ERROR_IO_PENDING) {
- MessageLoopForIO::current()->RegisterIOContext(
- &input_state_.overlapped, this);
input_state_.is_pending = true;
return true;
}
LOG(ERROR) << "pipe error: " << err;
return false;
}
+ input_state_.is_pending = true;
+ return true;
}
DCHECK(bytes_read);
@@ -303,15 +304,13 @@ bool Channel::ProcessIncomingMessages(OVERLAPPED* context,
return true;
}
-bool Channel::ProcessOutgoingMessages(OVERLAPPED* context,
+bool Channel::ProcessOutgoingMessages(MessageLoopForIO::IOContext* context,
DWORD bytes_written) {
DCHECK(!waiting_connect_); // Why are we trying to send messages if there's
// no connection?
if (output_state_.is_pending) {
DCHECK(context);
- MessageLoopForIO::current()->RegisterIOContext(&output_state_.overlapped,
- NULL);
output_state_.is_pending = false;
if (!context || bytes_written == 0) {
DWORD err = GetLastError();
@@ -325,42 +324,41 @@ bool Channel::ProcessOutgoingMessages(OVERLAPPED* context,
delete m;
}
- while (!output_queue_.empty()) {
- // Write to pipe...
- Message* m = output_queue_.front();
- BOOL ok = WriteFile(pipe_,
- m->data(),
- m->size(),
- &bytes_written,
- &output_state_.overlapped);
- if (!ok) {
- DWORD err = GetLastError();
- if (err == ERROR_IO_PENDING) {
- MessageLoopForIO::current()->RegisterIOContext(
- &output_state_.overlapped, this);
- output_state_.is_pending = true;
+ if (output_queue_.empty())
+ return true;
+
+ if (INVALID_HANDLE_VALUE == pipe_)
+ return false;
+
+ // Write to pipe...
+ Message* m = output_queue_.front();
+ BOOL ok = WriteFile(pipe_,
+ m->data(),
+ m->size(),
+ &bytes_written,
+ &output_state_.context.overlapped);
+ if (!ok) {
+ DWORD err = GetLastError();
+ if (err == ERROR_IO_PENDING) {
+ output_state_.is_pending = true;
#ifdef IPC_MESSAGE_DEBUG_EXTRA
- DLOG(INFO) << "sent pending message @" << m << " on channel @" <<
- this << " with type " << m->type();
+ DLOG(INFO) << "sent pending message @" << m << " on channel @" <<
+ this << " with type " << m->type();
#endif
- return true;
- }
- LOG(ERROR) << "pipe error: " << err;
- return false;
+ return true;
}
- DCHECK(bytes_written == m->size());
- output_queue_.pop();
+ LOG(ERROR) << "pipe error: " << err;
+ return false;
+ }
#ifdef IPC_MESSAGE_DEBUG_EXTRA
- DLOG(INFO) << "sent message @" << m << " on channel @" << this <<
- " with type " << m->type();
+ DLOG(INFO) << "sent message @" << m << " on channel @" << this <<
+ " with type " << m->type();
#endif
- delete m;
- }
-
+ output_state_.is_pending = true;
return true;
}
@@ -400,12 +398,13 @@ bool Channel::ProcessPendingMessages(DWORD max_wait_msec) {
#endif
}
-void Channel::OnIOCompleted(OVERLAPPED* context, DWORD bytes_transfered,
- DWORD error) {
+void Channel::OnIOCompleted(MessageLoopForIO::IOContext* context,
+ DWORD bytes_transfered, DWORD error) {
bool ok;
- if (context == &input_state_.overlapped) {
+ if (context == &input_state_.context) {
if (waiting_connect_) {
- ProcessConnection();
+ if (!ProcessConnection())
+ return;
// We may have some messages queued up to send...
if (!output_queue_.empty() && !output_state_.is_pending)
ProcessOutgoingMessages(NULL, 0);
@@ -419,10 +418,11 @@ void Channel::OnIOCompleted(OVERLAPPED* context, DWORD bytes_transfered,
ok = ProcessIncomingMessages(context, bytes_transfered);
processing_incoming_ = false;
} else {
- DCHECK(context == &output_state_.overlapped);
+ DCHECK(context == &output_state_.context);
ok = ProcessOutgoingMessages(context, bytes_transfered);
}
- if (!ok) {
+ if (!ok && INVALID_HANDLE_VALUE != pipe_) {
+ // We don't want to re-enter Close().
Close();
listener_->OnChannelError();
}
diff --git a/chrome/common/ipc_channel.h b/chrome/common/ipc_channel.h
index e3e4ca2..b69f962 100644
--- a/chrome/common/ipc_channel.h
+++ b/chrome/common/ipc_channel.h
@@ -99,12 +99,14 @@ class Channel : public MessageLoopForIO::IOHandler,
const std::wstring PipeName(const std::wstring& channel_id) const;
bool CreatePipe(const std::wstring& channel_id, Mode mode);
bool ProcessConnection();
- bool ProcessIncomingMessages(OVERLAPPED* context, DWORD bytes_read);
- bool ProcessOutgoingMessages(OVERLAPPED* context, DWORD bytes_written);
+ bool ProcessIncomingMessages(MessageLoopForIO::IOContext* context,
+ DWORD bytes_read);
+ bool ProcessOutgoingMessages(MessageLoopForIO::IOContext* context,
+ DWORD bytes_written);
// MessageLoop::IOHandler implementation.
- virtual void OnIOCompleted(OVERLAPPED* context, DWORD bytes_transfered,
- DWORD error);
+ virtual void OnIOCompleted(MessageLoopForIO::IOContext* context,
+ DWORD bytes_transfered, DWORD error);
private:
enum {
@@ -112,9 +114,9 @@ class Channel : public MessageLoopForIO::IOHandler,
};
struct State {
- State();
+ explicit State(Channel* channel);
~State();
- OVERLAPPED overlapped;
+ MessageLoopForIO::IOContext context;
bool is_pending;
};
diff --git a/chrome/common/ipc_sync_channel_unittest.cc b/chrome/common/ipc_sync_channel_unittest.cc
index 1d28b06..749a507 100644
--- a/chrome/common/ipc_sync_channel_unittest.cc
+++ b/chrome/common/ipc_sync_channel_unittest.cc
@@ -104,7 +104,7 @@ class Worker : public Channel::Listener, public Message::Sender {
channel_->Close();
}
void Start() {
- StartThread(&listener_thread_);
+ StartThread(&listener_thread_, MessageLoop::TYPE_DEFAULT);
ListenerThread()->message_loop()->PostTask(FROM_HERE, NewRunnableMethod(
this, &Worker::OnStart));
}
@@ -149,7 +149,7 @@ class Worker : public Channel::Listener, public Message::Sender {
// Called on the listener thread to create the sync channel.
void OnStart() {
// Link ipc_thread_, listener_thread_ and channel_ altogether.
- StartThread(&ipc_thread_);
+ StartThread(&ipc_thread_, MessageLoop::TYPE_IO);
channel_.reset(new SyncChannel(
channel_name_, mode_, this, NULL, ipc_thread_.message_loop(), true,
TestProcess::GetShutDownEvent()));
@@ -178,9 +178,9 @@ class Worker : public Channel::Listener, public Message::Sender {
IPC_END_MESSAGE_MAP()
}
- void StartThread(base::Thread* thread) {
+ void StartThread(base::Thread* thread, MessageLoop::Type type) {
base::Thread::Options options;
- options.message_loop_type = MessageLoop::TYPE_IO;
+ options.message_loop_type = type;
thread->StartWithOptions(options);
}
diff --git a/net/base/file_stream_win.cc b/net/base/file_stream_win.cc
index d7b3d1d..56d0368 100644
--- a/net/base/file_stream_win.cc
+++ b/net/base/file_stream_win.cc
@@ -50,55 +50,55 @@ static int MapErrorCode(DWORD err) {
class FileStream::AsyncContext : public MessageLoopForIO::IOHandler {
public:
AsyncContext(FileStream* owner)
- : owner_(owner), overlapped_(), callback_(NULL) {
- overlapped_.hEvent = CreateEvent(NULL, TRUE, FALSE, NULL);
- }
-
- ~AsyncContext() {
- if (callback_)
- MessageLoopForIO::current()->RegisterIOContext(&overlapped_, NULL);
- CloseHandle(overlapped_.hEvent);
+ : owner_(owner), context_(), callback_(NULL) {
+ context_.handler = this;
}
+ ~AsyncContext();
void IOCompletionIsPending(CompletionCallback* callback);
-
- OVERLAPPED* overlapped() { return &overlapped_; }
+
+ OVERLAPPED* overlapped() { return &context_.overlapped; }
CompletionCallback* callback() const { return callback_; }
private:
- // MessageLoopForIO::IOHandler implementation:
- virtual void OnIOCompleted(OVERLAPPED* context, DWORD num_bytes,
- DWORD error);
+ virtual void OnIOCompleted(MessageLoopForIO::IOContext* context,
+ DWORD bytes_read, DWORD error);
FileStream* owner_;
- OVERLAPPED overlapped_;
+ MessageLoopForIO::IOContext context_;
CompletionCallback* callback_;
};
+FileStream::AsyncContext::~AsyncContext() {
+ bool waited = false;
+ base::Time start = base::Time::Now();
+ while (callback_) {
+ waited = true;
+ MessageLoopForIO::current()->WaitForIOCompletion(INFINITE, this);
+ }
+ if (waited) {
+ // We want to see if we block the message loop for too long.
+ UMA_HISTOGRAM_TIMES(L"AsyncIO.FileStreamClose", base::Time::Now() - start);
+ }
+}
+
void FileStream::AsyncContext::IOCompletionIsPending(
CompletionCallback* callback) {
DCHECK(!callback_);
callback_ = callback;
-
- MessageLoopForIO::current()->RegisterIOContext(&overlapped_, this);
}
-void FileStream::AsyncContext::OnIOCompleted(OVERLAPPED* context,
- DWORD num_bytes,
- DWORD error) {
- DCHECK(&overlapped_ == context);
+void FileStream::AsyncContext::OnIOCompleted(
+ MessageLoopForIO::IOContext* context, DWORD bytes_read, DWORD error) {
+ DCHECK(&context_ == context);
DCHECK(callback_);
- MessageLoopForIO::current()->RegisterIOContext(&overlapped_, NULL);
-
- HANDLE handle = owner_->file_;
-
- int result = static_cast<int>(num_bytes);
+ int result = static_cast<int>(bytes_read);
if (error && error != ERROR_HANDLE_EOF)
result = MapErrorCode(error);
-
- if (num_bytes)
- IncrementOffset(&overlapped_, num_bytes);
+
+ if (bytes_read)
+ IncrementOffset(&context->overlapped, bytes_read);
CompletionCallback* temp = NULL;
std::swap(temp, callback_);
@@ -115,11 +115,14 @@ FileStream::~FileStream() {
}
void FileStream::Close() {
+ if (file_ != INVALID_HANDLE_VALUE)
+ CancelIo(file_);
+
+ async_context_.reset();
if (file_ != INVALID_HANDLE_VALUE) {
CloseHandle(file_);
file_ = INVALID_HANDLE_VALUE;
}
- async_context_.reset();
}
int FileStream::Open(const std::wstring& path, int open_flags) {
@@ -211,9 +214,10 @@ int FileStream::Read(
LOG(WARNING) << "ReadFile failed: " << error;
rv = MapErrorCode(error);
}
+ } else if (overlapped) {
+ async_context_->IOCompletionIsPending(callback);
+ rv = ERR_IO_PENDING;
} else {
- if (overlapped)
- IncrementOffset(overlapped, bytes_read);
rv = static_cast<int>(bytes_read);
}
return rv;
@@ -224,7 +228,7 @@ int FileStream::Write(
if (!IsOpen())
return ERR_UNEXPECTED;
DCHECK(open_flags_ & base::PLATFORM_FILE_WRITE);
-
+
OVERLAPPED* overlapped = NULL;
if (async_context_.get()) {
DCHECK(!async_context_->callback());
@@ -242,9 +246,10 @@ int FileStream::Write(
LOG(WARNING) << "WriteFile failed: " << error;
rv = MapErrorCode(error);
}
+ } else if (overlapped) {
+ async_context_->IOCompletionIsPending(callback);
+ rv = ERR_IO_PENDING;
} else {
- if (overlapped)
- IncrementOffset(overlapped, bytes_written);
rv = static_cast<int>(bytes_written);
}
return rv;
diff --git a/net/disk_cache/cache_util_win.cc b/net/disk_cache/cache_util_win.cc
index 377cbbf..adb9d8a5 100644
--- a/net/disk_cache/cache_util_win.cc
+++ b/net/disk_cache/cache_util_win.cc
@@ -7,6 +7,7 @@
#include <windows.h>
#include "base/logging.h"
+#include "base/message_loop.h"
#include "base/scoped_handle.h"
#include "base/file_util.h"
@@ -38,6 +39,9 @@ void DeleteFiles(const wchar_t* path, const wchar_t* search_name) {
namespace disk_cache {
+// Implemented in file_win.cc.
+MessageLoopForIO::IOHandler* GetFileIOHandler();
+
bool MoveCache(const std::wstring& from_path, const std::wstring& to_path) {
// I don't want to use the shell version of move because if something goes
// wrong, that version will attempt to move file by file and fail at the end.
@@ -63,12 +67,8 @@ bool DeleteCacheFile(const std::wstring& name) {
void WaitForPendingIO(int* num_pending_io) {
while (*num_pending_io) {
// Asynchronous IO operations may be in flight and the completion may end
- // up calling us back so let's wait for them (we need an alertable wait).
- // The idea is to let other threads do usefull work and at the same time
- // allow more than one IO to finish... 20 mS later, we process all queued
- // APCs and see if we have to repeat the wait.
- Sleep(20);
- SleepEx(0, TRUE);
+ // up calling us back so let's wait for them.
+ MessageLoopForIO::current()->WaitForIOCompletion(100, GetFileIOHandler());
}
}
diff --git a/net/disk_cache/disk_cache_perftest.cc b/net/disk_cache/disk_cache_perftest.cc
index c160677..33cad54 100644
--- a/net/disk_cache/disk_cache_perftest.cc
+++ b/net/disk_cache/disk_cache_perftest.cc
@@ -9,6 +9,7 @@
#include "base/basictypes.h"
#include "base/file_util.h"
#include "base/perftimer.h"
+#include "base/platform_test.h"
#if defined(OS_WIN)
#include "base/scoped_handle.h"
#endif
@@ -17,7 +18,6 @@
#include "net/base/net_errors.h"
#include "net/disk_cache/block_files.h"
#include "net/disk_cache/disk_cache.h"
-#include "net/disk_cache/disk_cache_test_base.h"
#include "net/disk_cache/disk_cache_test_util.h"
#include "net/disk_cache/hash.h"
#include "testing/gtest/include/gtest/gtest.h"
@@ -28,6 +28,8 @@ extern int g_cache_tests_max_id;
extern volatile int g_cache_tests_received;
extern volatile bool g_cache_tests_error;
+typedef PlatformTest DiskCacheTest;
+
namespace {
bool EvictFileFromSystemCache(const wchar_t* name) {
@@ -226,6 +228,7 @@ TEST_F(DiskCacheTest, CacheBackendPerformance) {
int ret = TimeWrite(num_entries, cache, &entries);
EXPECT_EQ(ret, g_cache_tests_received);
+ MessageLoop::current()->RunAllPending();
delete cache;
std::wstring filename(path);
@@ -257,6 +260,7 @@ TEST_F(DiskCacheTest, CacheBackendPerformance) {
ret = TimeRead(num_entries, cache, entries, false);
EXPECT_EQ(ret, g_cache_tests_received);
+ MessageLoop::current()->RunAllPending();
delete cache;
}
@@ -266,6 +270,7 @@ TEST_F(DiskCacheTest, CacheBackendPerformance) {
// fragmented, or if we have multiple files. This test measures that scenario,
// by using multiple, highly fragmented files.
TEST_F(DiskCacheTest, BlockFilesPerformance) {
+ MessageLoopForIO message_loop;
std::wstring path = GetCachePath();
ASSERT_TRUE(DeleteCache(path.c_str()));
@@ -303,4 +308,5 @@ TEST_F(DiskCacheTest, BlockFilesPerformance) {
}
timer2.Done();
+ MessageLoop::current()->RunAllPending();
}
diff --git a/net/disk_cache/disk_cache_test_base.cc b/net/disk_cache/disk_cache_test_base.cc
index 417792f..0a3e07b 100644
--- a/net/disk_cache/disk_cache_test_base.cc
+++ b/net/disk_cache/disk_cache_test_base.cc
@@ -8,6 +8,10 @@
#include "net/disk_cache/disk_cache_test_util.h"
#include "net/disk_cache/mem_backend_impl.h"
+void DiskCacheTest::TearDown() {
+ MessageLoop::current()->RunAllPending();
+}
+
void DiskCacheTestWithCache::SetMaxSize(int size) {
size_ = size;
if (cache_impl_)
@@ -73,6 +77,7 @@ void DiskCacheTestWithCache::InitDiskCache() {
void DiskCacheTestWithCache::TearDown() {
+ MessageLoop::current()->RunAllPending();
delete cache_;
if (!memory_only_) {
diff --git a/net/disk_cache/disk_cache_test_base.h b/net/disk_cache/disk_cache_test_base.h
index ca56be1f..6b75361 100644
--- a/net/disk_cache/disk_cache_test_base.h
+++ b/net/disk_cache/disk_cache_test_base.h
@@ -9,12 +9,6 @@
#include "base/platform_test.h"
#include "testing/gtest/include/gtest/gtest.h"
-// These tests can use the path service, which uses autoreleased objects on the
-// Mac, so this needs to be a PlatformTest. Even tests that do not require a
-// cache (and that do not need to be a DiskCacheTestWithCache) are susceptible
-// to this problem; all such tests should use TEST_F(DiskCacheTest, ...).
-typedef PlatformTest DiskCacheTest;
-
namespace disk_cache {
class Backend;
@@ -23,6 +17,14 @@ class MemBackendImpl;
} // namespace disk_cache
+// These tests can use the path service, which uses autoreleased objects on the
+// Mac, so this needs to be a PlatformTest. Even tests that do not require a
+// cache (and that do not need to be a DiskCacheTestWithCache) are susceptible
+// to this problem; all such tests should use TEST_F(DiskCacheTest, ...).
+class DiskCacheTest : public PlatformTest {
+ virtual void TearDown();
+};
+
// Provides basic support for cache related tests.
class DiskCacheTestWithCache : public DiskCacheTest {
protected:
diff --git a/net/disk_cache/entry_impl.cc b/net/disk_cache/entry_impl.cc
index 844ff40..2821540 100644
--- a/net/disk_cache/entry_impl.cc
+++ b/net/disk_cache/entry_impl.cc
@@ -16,25 +16,8 @@ using base::TimeDelta;
namespace {
-// This is a simple Task to execute the callback (from the message loop instead
-// of the APC).
-class InvokeCallback : public Task {
- public:
- InvokeCallback(net::CompletionCallback* callback, int argument)
- : callback_(callback), argument_(argument) {}
-
- virtual void Run() {
- callback_->Run(argument_);
- }
-
- private:
- net::CompletionCallback* callback_;
- int argument_;
- DISALLOW_EVIL_CONSTRUCTORS(InvokeCallback);
-};
-
-// This class implements FileIOCallback to buffer the callback from an IO
-// operation from the actual IO class.
+// This class implements FileIOCallback to buffer the callback from a file IO
+// operation from the actual net class.
class SyncCallback: public disk_cache::FileIOCallback {
public:
SyncCallback(disk_cache::EntryImpl* entry,
@@ -57,10 +40,8 @@ class SyncCallback: public disk_cache::FileIOCallback {
void SyncCallback::OnFileIOComplete(int bytes_copied) {
entry_->DecrementIoCount();
entry_->Release();
- if (callback_) {
- InvokeCallback* task = new InvokeCallback(callback_, bytes_copied);
- MessageLoop::current()->PostTask(FROM_HERE, task);
- }
+ if (callback_)
+ callback_->Run(bytes_copied);
delete this;
}
@@ -556,9 +537,11 @@ void EntryImpl::DeleteData(Addr address, int index) {
if (files_[index])
files_[index] = NULL; // Releases the object.
- if (!DeleteCacheFile(backend_->GetFileName(address)))
+ if (!DeleteCacheFile(backend_->GetFileName(address))) {
+ UMA_HISTOGRAM_COUNTS(L"DiskCache.DeleteFailed", 1);
LOG(ERROR) << "Failed to delete " << backend_->GetFileName(address) <<
" from the cache.";
+ }
} else {
backend_->DeleteBlock(address, true);
}
@@ -711,7 +694,6 @@ bool EntryImpl::ImportSeparateFile(int index, int offset, int buf_len) {
return true;
}
-
// The common scenario is that this is called from the destructor of the entry,
// to write to disk what we have buffered. We don't want to hold the destructor
// until the actual IO finishes, so we'll send an asynchronous write that will
@@ -744,6 +726,13 @@ bool EntryImpl::Flush(int index, int size, bool async) {
if (!file)
return false;
+ // TODO(rvargas): figure out if it's worth to re-enable posting operations.
+ // Right now it is only used from GrowUserBuffer, not the destructor, and
+ // it is not accounted for from the point of view of the total number of
+ // pending operations of the cache. It is also racing with the actual write
+ // on the GrowUserBuffer path because there is no code to exclude the range
+ // that is going to be written.
+ async = false;
if (async) {
if (!file->PostWrite(user_buffers_[index].get(), len, offset))
return false;
diff --git a/net/disk_cache/file_win.cc b/net/disk_cache/file_win.cc
index 0e8e3f3..bc89975 100644
--- a/net/disk_cache/file_win.cc
+++ b/net/disk_cache/file_win.cc
@@ -4,62 +4,39 @@
#include "net/disk_cache/file.h"
+#include "base/message_loop.h"
+#include "base/singleton.h"
#include "net/disk_cache/disk_cache.h"
namespace {
-// This class implements FileIOCallback to perform IO operations
-// when the callback parameter of the operation is NULL.
-class SyncCallback: public disk_cache::FileIOCallback {
- public:
- SyncCallback() : called_(false) {}
- ~SyncCallback() {}
-
- virtual void OnFileIOComplete(int bytes_copied);
- void WaitForResult(int* bytes_copied);
- private:
- bool called_;
- int actual_;
-};
-
-void SyncCallback::OnFileIOComplete(int bytes_copied) {
- actual_ = bytes_copied;
- called_ = true;
-}
-
-// Waits for the IO operation to complete.
-void SyncCallback::WaitForResult(int* bytes_copied) {
- for (;;) {
- SleepEx(INFINITE, TRUE);
- if (called_)
- break;
- }
- *bytes_copied = actual_;
-}
-
// Structure used for asynchronous operations.
struct MyOverlapped {
- OVERLAPPED overlapped;
- disk_cache::File* file;
- disk_cache::FileIOCallback* callback;
- const void* buffer;
- DWORD actual_bytes;
- bool async; // Invoke the callback form the completion.
- bool called; // Completion received.
- bool delete_buffer; // Delete the user buffer at completion.
-};
+ MyOverlapped(disk_cache::File* file, size_t offset,
+ disk_cache::FileIOCallback* callback);
+ ~MyOverlapped();
+ OVERLAPPED* overlapped() {
+ return &context_.overlapped;
+ }
-COMPILE_ASSERT(!offsetof(MyOverlapped, overlapped), starts_with_overlapped);
+ MessageLoopForIO::IOContext context_;
+ scoped_refptr<disk_cache::File> file_;
+ disk_cache::FileIOCallback* callback_;
+ const void* buffer_;
+ bool delete_buffer_; // Delete the user buffer at completion.
+};
-} // namespace
+COMPILE_ASSERT(!offsetof(MyOverlapped, context_), starts_with_overlapped);
-namespace disk_cache {
+// Helper class to handle the IO completion notifications from the message loop.
+class CompletionHandler : public MessageLoopForIO::IOHandler {
+ virtual void OnIOCompleted(MessageLoopForIO::IOContext* context,
+ DWORD actual_bytes, DWORD error);
+};
-// SyncCallback to be invoked as an APC when the asynchronous operation
-// completes.
-void CALLBACK IoCompletion(DWORD error, DWORD actual_bytes,
- OVERLAPPED* overlapped) {
- MyOverlapped* data = reinterpret_cast<MyOverlapped*>(overlapped);
+void CompletionHandler::OnIOCompleted(MessageLoopForIO::IOContext* context,
+ DWORD actual_bytes, DWORD error) {
+ MyOverlapped* data = reinterpret_cast<MyOverlapped*>(context);
if (error) {
DCHECK(!actual_bytes);
@@ -67,29 +44,39 @@ void CALLBACK IoCompletion(DWORD error, DWORD actual_bytes,
NOTREACHED();
}
- if (data->delete_buffer) {
- DCHECK(!data->callback);
- data->file->Release();
- delete data->buffer;
- delete data;
- return;
- }
+ if (data->callback_)
+ data->callback_->OnFileIOComplete(static_cast<int>(actual_bytes));
- if (data->async) {
- data->callback->OnFileIOComplete(static_cast<int>(actual_bytes));
- data->file->Release();
- delete data;
- } else {
- // Somebody is waiting for this so don't delete data and instead notify
- // that we were called.
- data->actual_bytes = actual_bytes;
- data->file->Release();
- data->called = true;
+ delete data;
+}
+
+MyOverlapped::MyOverlapped(disk_cache::File* file, size_t offset,
+ disk_cache::FileIOCallback* callback) {
+ memset(this, 0, sizeof(*this));
+ context_.handler = Singleton<CompletionHandler>::get();
+ context_.overlapped.Offset = static_cast<DWORD>(offset);
+ file_ = file;
+ callback_ = callback;
+}
+
+MyOverlapped::~MyOverlapped() {
+ if (delete_buffer_) {
+ DCHECK(!callback_);
+ delete buffer_;
}
}
+} // namespace
+
+namespace disk_cache {
+
+// Used from WaitForPendingIO() when the cache is being destroyed.
+MessageLoopForIO::IOHandler* GetFileIOHandler() {
+ return Singleton<CompletionHandler>::get();
+}
+
File::File(base::PlatformFile file)
- : init_(true), mixed_(true), platform_file_(INVALID_HANDLE_VALUE),
+ : init_(true), platform_file_(INVALID_HANDLE_VALUE),
sync_platform_file_(file) {
}
@@ -99,23 +86,22 @@ bool File::Init(const std::wstring& name) {
return false;
platform_file_ = CreateFile(name.c_str(), GENERIC_READ | GENERIC_WRITE,
- FILE_SHARE_READ | FILE_SHARE_WRITE, NULL, OPEN_EXISTING,
- FILE_FLAG_OVERLAPPED, NULL);
+ FILE_SHARE_READ | FILE_SHARE_WRITE, NULL,
+ OPEN_EXISTING, FILE_FLAG_OVERLAPPED, NULL);
if (INVALID_HANDLE_VALUE == platform_file_)
return false;
+ MessageLoopForIO::current()->RegisterIOHandler(
+ platform_file_, Singleton<CompletionHandler>::get());
+
init_ = true;
- if (mixed_) {
- sync_platform_file_ = CreateFile(name.c_str(), GENERIC_READ | GENERIC_WRITE,
- FILE_SHARE_READ | FILE_SHARE_WRITE, NULL,
- OPEN_EXISTING, 0, NULL);
-
- if (INVALID_HANDLE_VALUE == sync_platform_file_)
- return false;
- } else {
- sync_platform_file_ = INVALID_HANDLE_VALUE;
- }
+ sync_platform_file_ = CreateFile(name.c_str(), GENERIC_READ | GENERIC_WRITE,
+ FILE_SHARE_READ | FILE_SHARE_WRITE, NULL,
+ OPEN_EXISTING, 0, NULL);
+
+ if (INVALID_HANDLE_VALUE == sync_platform_file_)
+ return false;
return true;
}
@@ -126,13 +112,13 @@ File::~File() {
if (INVALID_HANDLE_VALUE != platform_file_)
CloseHandle(platform_file_);
- if (mixed_ && INVALID_HANDLE_VALUE != sync_platform_file_)
+ if (INVALID_HANDLE_VALUE != sync_platform_file_)
CloseHandle(sync_platform_file_);
}
base::PlatformFile File::platform_file() const {
DCHECK(init_);
- return (INVALID_HANDLE_VALUE == platform_file_) ? sync_platform_file_ :
+ return (INVALID_HANDLE_VALUE == platform_file_) ? sync_platform_file_ :
platform_file_;
}
@@ -145,13 +131,11 @@ bool File::IsValid() const {
bool File::Read(void* buffer, size_t buffer_len, size_t offset) {
DCHECK(init_);
- if (!mixed_ || buffer_len > ULONG_MAX || offset > LONG_MAX)
+ if (buffer_len > ULONG_MAX || offset > LONG_MAX)
return false;
- DWORD ret = SetFilePointer(sync_platform_file_,
- static_cast<LONG>(offset),
- NULL,
- FILE_BEGIN);
+ DWORD ret = SetFilePointer(sync_platform_file_, static_cast<LONG>(offset),
+ NULL, FILE_BEGIN);
if (INVALID_SET_FILE_POINTER == ret)
return false;
@@ -164,13 +148,11 @@ bool File::Read(void* buffer, size_t buffer_len, size_t offset) {
bool File::Write(const void* buffer, size_t buffer_len, size_t offset) {
DCHECK(init_);
- if (!mixed_ || buffer_len > ULONG_MAX || offset > ULONG_MAX)
+ if (buffer_len > ULONG_MAX || offset > ULONG_MAX)
return false;
- DWORD ret = SetFilePointer(sync_platform_file_,
- static_cast<LONG>(offset),
- NULL,
- FILE_BEGIN);
+ DWORD ret = SetFilePointer(sync_platform_file_, static_cast<LONG>(offset),
+ NULL, FILE_BEGIN);
if (INVALID_SET_FILE_POINTER == ret)
return false;
@@ -187,55 +169,38 @@ bool File::Write(const void* buffer, size_t buffer_len, size_t offset) {
bool File::Read(void* buffer, size_t buffer_len, size_t offset,
FileIOCallback* callback, bool* completed) {
DCHECK(init_);
+ if (!callback)
+ return Read(buffer, buffer_len, offset);
+
if (buffer_len > ULONG_MAX || offset > ULONG_MAX)
return false;
- MyOverlapped* data = new MyOverlapped;
- memset(data, 0, sizeof(*data));
-
- SyncCallback local_callback;
- data->overlapped.Offset = static_cast<DWORD>(offset);
- data->callback = callback ? callback : &local_callback;
- data->file = this;
-
+ MyOverlapped* data = new MyOverlapped(this, offset, callback);
DWORD size = static_cast<DWORD>(buffer_len);
- AddRef();
- if (!ReadFileEx(platform_file_, buffer, size, &data->overlapped,
- &IoCompletion)) {
- Release();
+ DWORD actual;
+ if (!ReadFile(platform_file_, buffer, size, &actual, data->overlapped())) {
+ *completed = false;
+ if (GetLastError() == ERROR_IO_PENDING)
+ return true;
delete data;
return false;
}
- if (callback) {
- *completed = false;
- // Let's check if the operation is already finished.
- SleepEx(0, TRUE);
- if (data->called) {
- *completed = (data->actual_bytes == size);
- DCHECK(data->actual_bytes == size);
- delete data;
- return *completed;
- }
- data->async = true;
- } else {
- // Invoke the callback and perform cleanup on the APC.
- data->async = true;
- int bytes_copied;
- local_callback.WaitForResult(&bytes_copied);
- if (static_cast<int>(buffer_len) != bytes_copied) {
- NOTREACHED();
- return false;
- }
- }
-
- return true;
+ // The operation completed already. We'll be called back anyway.
+ *completed = (actual == size);
+ DCHECK(actual == size);
+ data->callback_ = NULL;
+ data->file_ = NULL; // There is no reason to hold on to this anymore.
+ return *completed;
}
bool File::Write(const void* buffer, size_t buffer_len, size_t offset,
FileIOCallback* callback, bool* completed) {
DCHECK(init_);
+ if (!callback)
+ return Write(buffer, buffer_len, offset);
+
return AsyncWrite(buffer, buffer_len, offset, true, callback, completed);
}
@@ -250,50 +215,32 @@ bool File::AsyncWrite(const void* buffer, size_t buffer_len, size_t offset,
if (buffer_len > ULONG_MAX || offset > ULONG_MAX)
return false;
- MyOverlapped* data = new MyOverlapped;
- memset(data, 0, sizeof(*data));
-
- SyncCallback local_callback;
- data->overlapped.Offset = static_cast<DWORD>(offset);
- data->callback = callback ? callback : &local_callback;
- data->file = this;
- if (!callback && !notify) {
- data->delete_buffer = true;
- data->callback = NULL;
- data->buffer = buffer;
+ MyOverlapped* data = new MyOverlapped(this, offset, callback);
+ bool dummy_completed;
+ if (!callback) {
+ DCHECK(!notify);
+ data->delete_buffer_ = true;
+ data->buffer_ = buffer;
+ completed = &dummy_completed;
}
DWORD size = static_cast<DWORD>(buffer_len);
- AddRef();
- if (!WriteFileEx(platform_file_, buffer, size, &data->overlapped,
- &IoCompletion)) {
- Release();
+ DWORD actual;
+ if (!WriteFile(platform_file_, buffer, size, &actual, data->overlapped())) {
+ *completed = false;
+ if (GetLastError() == ERROR_IO_PENDING)
+ return true;
delete data;
return false;
}
- if (callback) {
- *completed = false;
- SleepEx(0, TRUE);
- if (data->called) {
- *completed = (data->actual_bytes == size);
- DCHECK(data->actual_bytes == size);
- delete data;
- return *completed;
- }
- data->async = true;
- } else if (notify) {
- data->async = true;
- int bytes_copied;
- local_callback.WaitForResult(&bytes_copied);
- if (static_cast<int>(buffer_len) != bytes_copied) {
- NOTREACHED();
- return false;
- }
- }
-
- return true;
+ // The operation completed already. We'll be called back anyway.
+ *completed = (actual == size);
+ DCHECK(actual == size);
+ data->callback_ = NULL;
+ data->file_ = NULL; // There is no reason to hold on to this anymore.
+ return *completed;
}
bool File::SetLength(size_t length) {
diff --git a/net/disk_cache/mapped_file_unittest.cc b/net/disk_cache/mapped_file_unittest.cc
index fe090e5..28eeff5 100644
--- a/net/disk_cache/mapped_file_unittest.cc
+++ b/net/disk_cache/mapped_file_unittest.cc
@@ -95,6 +95,8 @@ TEST_F(DiskCacheTest, MappedFile_AsyncIO) {
g_cache_tests_max_id = 0;
g_cache_tests_received = 0;
+ MessageLoopHelper helper;
+
char buffer1[20];
char buffer2[20];
CacheTestFillBuffer(buffer1, sizeof(buffer1), false);
@@ -105,14 +107,14 @@ TEST_F(DiskCacheTest, MappedFile_AsyncIO) {
int expected = completed ? 0 : 1;
g_cache_tests_max_id = 1;
- WaitForCallbacks(expected);
+ helper.WaitUntilCacheIoFinished(expected);
EXPECT_TRUE(file->Read(buffer2, sizeof(buffer2), 1024 * 1024, &callback,
&completed));
if (!completed)
expected++;
- WaitForCallbacks(expected);
+ helper.WaitUntilCacheIoFinished(expected);
EXPECT_EQ(expected, g_cache_tests_received);
EXPECT_FALSE(g_cache_tests_error);