summaryrefslogtreecommitdiffstats
path: root/mojo/system
diff options
context:
space:
mode:
authoryzshen@chromium.org <yzshen@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98>2014-03-04 07:44:13 +0000
committeryzshen@chromium.org <yzshen@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98>2014-03-04 07:44:13 +0000
commit44992e712496b368a207685f5cc1c46a3b2f8a57 (patch)
tree3bb47deef06f1031bcaf858897bdcd5686ac5519 /mojo/system
parentf02c6ccd70c378739e1afb38beebe7e25cc08cab (diff)
downloadchromium_src-44992e712496b368a207685f5cc1c46a3b2f8a57.zip
chromium_src-44992e712496b368a207685f5cc1c46a3b2f8a57.tar.gz
chromium_src-44992e712496b368a207685f5cc1c46a3b2f8a57.tar.bz2
Implement RawChannel on Windows.
- add RawChannelWin class; - move read/write buffer managing code to RawChannel so that it can be shared by both RawChannelWin and RawChannelPosix. - refactor unittests. TEST=raw_channel_unittests.cc Review URL: https://codereview.chromium.org/166293002 git-svn-id: svn://svn.chromium.org/chrome/trunk/src@254710 0039d316-1c4b-4281-b951-d872f2087c98
Diffstat (limited to 'mojo/system')
-rw-r--r--mojo/system/message_in_transit.h2
-rw-r--r--mojo/system/raw_channel_unittest.cc (renamed from mojo/system/raw_channel_posix_unittest.cc)121
-rw-r--r--mojo/system/raw_channel_win.cc525
3 files changed, 604 insertions, 44 deletions
diff --git a/mojo/system/message_in_transit.h b/mojo/system/message_in_transit.h
index 66f374d..cbb7271 100644
--- a/mojo/system/message_in_transit.h
+++ b/mojo/system/message_in_transit.h
@@ -68,7 +68,7 @@ class MOJO_SYSTEM_IMPL_EXPORT MessageInTransit {
struct Header;
public:
// This represents a view of serialized message data in a raw buffer.
- class View {
+ class MOJO_SYSTEM_IMPL_EXPORT View {
public:
// Constructs a view from the given buffer of the given size. (The size must
// be as provided by |MessageInTransit::GetNextMessageSize()|.) The buffer
diff --git a/mojo/system/raw_channel_posix_unittest.cc b/mojo/system/raw_channel_unittest.cc
index b0d5fc6..5723f1c 100644
--- a/mojo/system/raw_channel_posix_unittest.cc
+++ b/mojo/system/raw_channel_unittest.cc
@@ -1,14 +1,9 @@
-// Copyright 2013 The Chromium Authors. All rights reserved.
+// Copyright 2014 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
-// TODO(vtl): Factor out the remaining POSIX-specific bits of this test (once we
-// have a non-POSIX implementation).
-
#include "mojo/system/raw_channel.h"
-#include <sys/socket.h>
-
#include <vector>
#include "base/basictypes.h"
@@ -25,6 +20,7 @@
#include "base/threading/platform_thread.h" // For |Sleep()|.
#include "base/threading/simple_thread.h"
#include "base/time/time.h"
+#include "build/build_config.h"
#include "mojo/common/test/test_utils.h"
#include "mojo/system/embedder/platform_channel_pair.h"
#include "mojo/system/embedder/platform_handle.h"
@@ -32,6 +28,10 @@
#include "mojo/system/message_in_transit.h"
#include "mojo/system/test_utils.h"
+#if defined(OS_POSIX)
+#include <sys/socket.h>
+#endif
+
namespace mojo {
namespace system {
namespace {
@@ -71,10 +71,10 @@ bool WriteTestMessageToHandle(const embedder::PlatformHandle& handle,
// -----------------------------------------------------------------------------
-class RawChannelPosixTest : public test::TestWithIOThreadBase {
+class RawChannelTest : public test::TestWithIOThreadBase {
public:
- RawChannelPosixTest() {}
- virtual ~RawChannelPosixTest() {}
+ RawChannelTest() {}
+ virtual ~RawChannelTest() {}
virtual void SetUp() OVERRIDE {
test::TestWithIOThreadBase::SetUp();
@@ -95,10 +95,10 @@ class RawChannelPosixTest : public test::TestWithIOThreadBase {
embedder::ScopedPlatformHandle handles[2];
private:
- DISALLOW_COPY_AND_ASSIGN(RawChannelPosixTest);
+ DISALLOW_COPY_AND_ASSIGN(RawChannelTest);
};
-// RawChannelPosixTest.WriteMessage --------------------------------------------
+// RawChannelTest.WriteMessage -------------------------------------------------
class WriteOnlyRawChannelDelegate : public RawChannel::Delegate {
public:
@@ -187,7 +187,7 @@ class TestMessageReaderAndChecker {
};
// Tests writing (and verifies reading using our own custom reader).
-TEST_F(RawChannelPosixTest, WriteMessage) {
+TEST_F(RawChannelTest, WriteMessage) {
WriteOnlyRawChannelDelegate delegate;
scoped_ptr<RawChannel> rc(RawChannel::Create(handles[0].Pass(),
&delegate,
@@ -217,7 +217,7 @@ TEST_F(RawChannelPosixTest, WriteMessage) {
base::Unretained(rc.get())));
}
-// RawChannelPosixTest.OnReadMessage -------------------------------------------
+// RawChannelTest.OnReadMessage ------------------------------------------------
class ReadCheckerRawChannelDelegate : public RawChannel::Delegate {
public:
@@ -278,7 +278,7 @@ class ReadCheckerRawChannelDelegate : public RawChannel::Delegate {
};
// Tests reading (writing using our own custom writer).
-TEST_F(RawChannelPosixTest, OnReadMessage) {
+TEST_F(RawChannelTest, OnReadMessage) {
ReadCheckerRawChannelDelegate delegate;
scoped_ptr<RawChannel> rc(RawChannel::Create(handles[0].Pass(),
&delegate,
@@ -313,7 +313,7 @@ TEST_F(RawChannelPosixTest, OnReadMessage) {
base::Unretained(rc.get())));
}
-// RawChannelPosixTest.WriteMessageAndOnReadMessage ----------------------------
+// RawChannelTest.WriteMessageAndOnReadMessage ---------------------------------
class RawChannelWriterThread : public base::SimpleThread {
public:
@@ -380,7 +380,7 @@ class ReadCountdownRawChannelDelegate : public RawChannel::Delegate {
DISALLOW_COPY_AND_ASSIGN(ReadCountdownRawChannelDelegate);
};
-TEST_F(RawChannelPosixTest, WriteMessageAndOnReadMessage) {
+TEST_F(RawChannelTest, WriteMessageAndOnReadMessage) {
static const size_t kNumWriterThreads = 10;
static const size_t kNumWriteMessagesPerThread = 4000;
@@ -433,37 +433,46 @@ TEST_F(RawChannelPosixTest, WriteMessageAndOnReadMessage) {
base::Unretained(writer_rc.get())));
}
-// RawChannelPosixTest.OnFatalError --------------------------------------------
+// RawChannelTest.OnFatalError -------------------------------------------------
class FatalErrorRecordingRawChannelDelegate
: public ReadCountdownRawChannelDelegate {
public:
- FatalErrorRecordingRawChannelDelegate(size_t expected_read_count_)
- : ReadCountdownRawChannelDelegate(expected_read_count_),
+ FatalErrorRecordingRawChannelDelegate(size_t expected_read_count,
+ bool expect_read_error,
+ bool expect_write_error)
+ : ReadCountdownRawChannelDelegate(expected_read_count),
got_fatal_error_event_(false, false),
- on_fatal_error_call_count_(0),
- last_fatal_error_(FATAL_ERROR_UNKNOWN) {}
+ expecting_read_error_(expect_read_error),
+ expecting_write_error_(expect_write_error) {
+ }
virtual ~FatalErrorRecordingRawChannelDelegate() {}
virtual void OnFatalError(FatalError fatal_error) OVERRIDE {
- CHECK_EQ(on_fatal_error_call_count_, 0);
- on_fatal_error_call_count_++;
- last_fatal_error_ = fatal_error;
- got_fatal_error_event_.Signal();
+ if (fatal_error == FATAL_ERROR_FAILED_READ) {
+ ASSERT_TRUE(expecting_read_error_);
+ expecting_read_error_ = false;
+ } else if (fatal_error == FATAL_ERROR_FAILED_WRITE) {
+ ASSERT_TRUE(expecting_write_error_);
+ expecting_write_error_ = false;
+ } else {
+ ASSERT_TRUE(false);
+ }
+
+ if (!expecting_read_error_ && !expecting_write_error_)
+ got_fatal_error_event_.Signal();
}
- FatalError WaitForFatalError() {
+ void WaitForFatalError() {
got_fatal_error_event_.Wait();
- CHECK_EQ(on_fatal_error_call_count_, 1);
- return last_fatal_error_;
}
private:
base::WaitableEvent got_fatal_error_event_;
- int on_fatal_error_call_count_;
- FatalError last_fatal_error_;
+ bool expecting_read_error_;
+ bool expecting_write_error_;
DISALLOW_COPY_AND_ASSIGN(FatalErrorRecordingRawChannelDelegate);
};
@@ -471,10 +480,48 @@ class FatalErrorRecordingRawChannelDelegate
// Tests fatal errors.
// TODO(vtl): Figure out how to make reading fail reliably. (I'm not convinced
// that it does.)
-TEST_F(RawChannelPosixTest, OnFatalError) {
+TEST_F(RawChannelTest, OnFatalError) {
+ FatalErrorRecordingRawChannelDelegate delegate(0, false, true);
+
+ scoped_ptr<RawChannel> rc(RawChannel::Create(handles[0].Pass(),
+ &delegate,
+ io_thread_message_loop()));
+
+ test::PostTaskAndWait(io_thread_task_runner(),
+ FROM_HERE,
+ base::Bind(&InitOnIOThread, rc.get()));
+
+ // Close the handle of the other end, which should make writing fail.
+ handles[1].reset();
+
+ EXPECT_FALSE(rc->WriteMessage(MakeTestMessage(1)));
+
+ // TODO(vtl): In theory, it's conceivable that closing the other end might
+ // lead to read failing. In practice, it doesn't seem to.
+ delegate.WaitForFatalError();
+
+ EXPECT_FALSE(rc->WriteMessage(MakeTestMessage(2)));
+
+ // Sleep a bit, to make sure we don't get another |OnFatalError()|
+ // notification. (If we actually get another one, |OnFatalError()| crashes.)
+ base::PlatformThread::Sleep(base::TimeDelta::FromMilliseconds(100));
+
+ test::PostTaskAndWait(io_thread_task_runner(),
+ FROM_HERE,
+ base::Bind(&RawChannel::Shutdown,
+ base::Unretained(rc.get())));
+}
+
+#if defined(OS_POSIX)
+// RawChannelTest.ReadUnaffectedByWriteFatalError ------------------------------
+
+// TODO(yzshen): On Windows, I haven't figured out a way to shut down one
+// direction of the named pipe.
+TEST_F(RawChannelTest, ReadUnaffectedByWriteFatalError) {
const size_t kMessageCount = 5;
- FatalErrorRecordingRawChannelDelegate delegate(2 * kMessageCount);
+ FatalErrorRecordingRawChannelDelegate delegate(2 * kMessageCount, false,
+ true);
scoped_ptr<RawChannel> rc(RawChannel::Create(handles[0].Pass(),
&delegate,
io_thread_message_loop()));
@@ -495,10 +542,7 @@ TEST_F(RawChannelPosixTest, OnFatalError) {
EXPECT_FALSE(rc->WriteMessage(MakeTestMessage(1)));
- // TODO(vtl): In theory, it's conceivable that closing the other end might
- // lead to read failing. In practice, it doesn't seem to.
- EXPECT_EQ(RawChannel::Delegate::FATAL_ERROR_FAILED_WRITE,
- delegate.WaitForFatalError());
+ delegate.WaitForFatalError();
EXPECT_FALSE(rc->WriteMessage(MakeTestMessage(2)));
@@ -519,12 +563,13 @@ TEST_F(RawChannelPosixTest, OnFatalError) {
base::Bind(&RawChannel::Shutdown,
base::Unretained(rc.get())));
}
+#endif // defined(OS_POSIX)
-// RawChannelPosixTest.WriteMessageAfterShutdown -------------------------------
+// RawChannelTest.WriteMessageAfterShutdown ------------------------------------
// Makes sure that calling |WriteMessage()| after |Shutdown()| behaves
// correctly.
-TEST_F(RawChannelPosixTest, WriteMessageAfterShutdown) {
+TEST_F(RawChannelTest, WriteMessageAfterShutdown) {
WriteOnlyRawChannelDelegate delegate;
scoped_ptr<RawChannel> rc(RawChannel::Create(handles[0].Pass(),
&delegate,
diff --git a/mojo/system/raw_channel_win.cc b/mojo/system/raw_channel_win.cc
index 7fe2f94..e329d8d 100644
--- a/mojo/system/raw_channel_win.cc
+++ b/mojo/system/raw_channel_win.cc
@@ -4,23 +4,538 @@
#include "mojo/system/raw_channel.h"
-#include <stddef.h>
+#include <windows.h>
+#include "base/basictypes.h"
+#include "base/bind.h"
+#include "base/compiler_specific.h"
+#include "base/lazy_instance.h"
+#include "base/location.h"
#include "base/logging.h"
+#include "base/memory/scoped_ptr.h"
+#include "base/message_loop/message_loop.h"
+#include "base/synchronization/lock.h"
+#include "base/win/windows_version.h"
+#include "mojo/system/embedder/platform_handle.h"
namespace mojo {
namespace system {
+namespace {
+
+class VistaOrHigherFunctions {
+ public:
+ VistaOrHigherFunctions();
+
+ bool is_vista_or_higher() const { return is_vista_or_higher_; }
+
+ BOOL SetFileCompletionNotificationModes(HANDLE handle, UCHAR flags) {
+ return set_file_completion_notification_modes_(handle, flags);
+ }
+
+ BOOL CancelIoEx(HANDLE handle, LPOVERLAPPED overlapped) {
+ return cancel_io_ex_(handle, overlapped);
+ }
+
+ private:
+ typedef BOOL (WINAPI *SetFileCompletionNotificationModesFunc)(HANDLE, UCHAR);
+ typedef BOOL (WINAPI *CancelIoExFunc)(HANDLE, LPOVERLAPPED);
+
+ bool is_vista_or_higher_;
+ SetFileCompletionNotificationModesFunc
+ set_file_completion_notification_modes_;
+ CancelIoExFunc cancel_io_ex_;
+};
+
+VistaOrHigherFunctions::VistaOrHigherFunctions()
+ : is_vista_or_higher_(base::win::GetVersion() >= base::win::VERSION_VISTA),
+ set_file_completion_notification_modes_(NULL),
+ cancel_io_ex_(NULL) {
+ if (!is_vista_or_higher_)
+ return;
+
+ HMODULE module = GetModuleHandleW(L"kernel32.dll");
+ set_file_completion_notification_modes_ =
+ reinterpret_cast<SetFileCompletionNotificationModesFunc>(
+ GetProcAddress(module, "SetFileCompletionNotificationModes"));
+ DCHECK(set_file_completion_notification_modes_);
+
+ cancel_io_ex_ = reinterpret_cast<CancelIoExFunc>(
+ GetProcAddress(module, "CancelIoEx"));
+ DCHECK(cancel_io_ex_);
+}
+
+base::LazyInstance<VistaOrHigherFunctions> g_vista_or_higher_functions =
+ LAZY_INSTANCE_INITIALIZER;
+
+class RawChannelWin : public RawChannel {
+ public:
+ RawChannelWin(embedder::ScopedPlatformHandle handle,
+ Delegate* delegate,
+ base::MessageLoopForIO* message_loop);
+ virtual ~RawChannelWin();
+
+ private:
+ // RawChannelIOHandler receives OS notifications for I/O completion. It must
+ // be created on the I/O thread.
+ //
+ // It manages its own destruction. Destruction happens on the I/O thread when
+ // all the following conditions are satisfied:
+ // - |DetachFromOwnerNoLock()| has been called;
+ // - there is no pending read;
+ // - there is no pending write.
+ class RawChannelIOHandler : public base::MessageLoopForIO::IOHandler {
+ public:
+ RawChannelIOHandler(RawChannelWin* owner,
+ embedder::ScopedPlatformHandle handle);
+
+ HANDLE handle() const { return handle_.get().handle; }
+
+ // The following methods are only called by the owner on the I/O thread.
+ bool pending_read() const;
+ base::MessageLoopForIO::IOContext* read_context();
+ // Instructs the object to wait for an |OnIOCompleted()| notification.
+ void OnPendingReadStarted();
+
+ // The following methods are only called by the owner under
+ // |owner_->write_lock()|.
+ bool pending_write_no_lock() const;
+ base::MessageLoopForIO::IOContext* write_context_no_lock();
+ // Instructs the object to wait for an |OnIOCompleted()| notification.
+ void OnPendingWriteStartedNoLock();
+
+ // |base::MessageLoopForIO::IOHandler| implementation:
+ // Must be called on the I/O thread. It could be called before or after
+ // detached from the owner.
+ virtual void OnIOCompleted(base::MessageLoopForIO::IOContext* context,
+ DWORD bytes_transferred,
+ DWORD error) OVERRIDE;
+
+ // Must be called on the I/O thread under |owner_->write_lock()|.
+ // After this call, the owner must not make any further calls on this
+ // object, and therefore the object is used on the I/O thread exclusively
+ // (if it stays alive).
+ void DetachFromOwnerNoLock(scoped_ptr<ReadBuffer> read_buffer,
+ scoped_ptr<WriteBuffer> write_buffer);
+
+ private:
+ virtual ~RawChannelIOHandler();
+
+ // Returns true if |owner_| has been reset and there is not pending read or
+ // write.
+ // Must be called on the I/O thread.
+ bool ShouldSelfDestruct() const;
+
+ // Must be called on the I/O thread. It could be called before or after
+ // detached from the owner.
+ void OnReadCompleted(DWORD bytes_read, DWORD error);
+ // Must be called on the I/O thread. It could be called before or after
+ // detached from the owner.
+ void OnWriteCompleted(DWORD bytes_written, DWORD error);
+
+ embedder::ScopedPlatformHandle handle_;
+
+ // |owner_| is reset on the I/O thread under |owner_->write_lock()|.
+ // Therefore, it may be used on any thread under lock; or on the I/O thread
+ // without locking.
+ RawChannelWin* owner_;
+
+ // The following members must be used on the I/O thread.
+ scoped_ptr<ReadBuffer> preserved_read_buffer_after_detach_;
+ scoped_ptr<WriteBuffer> preserved_write_buffer_after_detach_;
+
+ bool pending_read_;
+ base::MessageLoopForIO::IOContext read_context_;
+
+ // The following members must be used under |owner_->write_lock()| while the
+ // object is still attached to the owner, and only on the I/O thread
+ // afterwards.
+ bool pending_write_;
+ base::MessageLoopForIO::IOContext write_context_;
+
+ DISALLOW_COPY_AND_ASSIGN(RawChannelIOHandler);
+ };
+
+ // |RawChannel| implementation:
+ virtual IOResult Read(size_t* bytes_read) OVERRIDE;
+ virtual IOResult ScheduleRead() OVERRIDE;
+ virtual IOResult WriteNoLock(size_t* bytes_written) OVERRIDE;
+ virtual IOResult ScheduleWriteNoLock() OVERRIDE;
+ virtual bool OnInit() OVERRIDE;
+ virtual void OnShutdownNoLock(
+ scoped_ptr<ReadBuffer> read_buffer,
+ scoped_ptr<WriteBuffer> write_buffer) OVERRIDE;
+
+ // Passed to |io_handler_| during initialization.
+ embedder::ScopedPlatformHandle handle_;
+
+ RawChannelIOHandler* io_handler_;
+
+ const bool skip_completion_port_on_success_;
+
+ DISALLOW_COPY_AND_ASSIGN(RawChannelWin);
+};
+
+RawChannelWin::RawChannelIOHandler::RawChannelIOHandler(
+ RawChannelWin* owner,
+ embedder::ScopedPlatformHandle handle) : handle_(handle.Pass()),
+ owner_(owner),
+ pending_read_(false),
+ pending_write_(false) {
+ memset(&read_context_.overlapped, 0, sizeof(read_context_.overlapped));
+ read_context_.handler = this;
+ memset(&write_context_.overlapped, 0, sizeof(write_context_.overlapped));
+ write_context_.handler = this;
+
+ owner_->message_loop_for_io()->RegisterIOHandler(handle_.get().handle, this);
+}
+
+RawChannelWin::RawChannelIOHandler::~RawChannelIOHandler() {
+ DCHECK(ShouldSelfDestruct());
+}
+
+bool RawChannelWin::RawChannelIOHandler::pending_read() const {
+ DCHECK(owner_);
+ DCHECK_EQ(base::MessageLoop::current(), owner_->message_loop_for_io());
+ return pending_read_;
+}
+
+base::MessageLoopForIO::IOContext*
+ RawChannelWin::RawChannelIOHandler::read_context() {
+ DCHECK(owner_);
+ DCHECK_EQ(base::MessageLoop::current(), owner_->message_loop_for_io());
+ return &read_context_;
+}
+
+void RawChannelWin::RawChannelIOHandler::OnPendingReadStarted() {
+ DCHECK(owner_);
+ DCHECK_EQ(base::MessageLoop::current(), owner_->message_loop_for_io());
+ DCHECK(!pending_read_);
+ pending_read_ = true;
+}
+
+bool RawChannelWin::RawChannelIOHandler::pending_write_no_lock() const {
+ DCHECK(owner_);
+ owner_->write_lock().AssertAcquired();
+ return pending_write_;
+}
+
+base::MessageLoopForIO::IOContext*
+ RawChannelWin::RawChannelIOHandler::write_context_no_lock() {
+ DCHECK(owner_);
+ owner_->write_lock().AssertAcquired();
+ return &write_context_;
+}
+
+void RawChannelWin::RawChannelIOHandler::OnPendingWriteStartedNoLock() {
+ DCHECK(owner_);
+ owner_->write_lock().AssertAcquired();
+ DCHECK(!pending_write_);
+ pending_write_ = true;
+}
+
+void RawChannelWin::RawChannelIOHandler::OnIOCompleted(
+ base::MessageLoopForIO::IOContext* context,
+ DWORD bytes_transferred,
+ DWORD error) {
+ DCHECK(!owner_ ||
+ base::MessageLoop::current() == owner_->message_loop_for_io());
+
+ if (context == &read_context_)
+ OnReadCompleted(bytes_transferred, error);
+ else if (context == &write_context_)
+ OnWriteCompleted(bytes_transferred, error);
+ else
+ NOTREACHED();
+
+ if (ShouldSelfDestruct())
+ delete this;
+}
+
+void RawChannelWin::RawChannelIOHandler::DetachFromOwnerNoLock(
+ scoped_ptr<ReadBuffer> read_buffer,
+ scoped_ptr<WriteBuffer> write_buffer) {
+ DCHECK(owner_);
+ DCHECK_EQ(base::MessageLoop::current(), owner_->message_loop_for_io());
+ owner_->write_lock().AssertAcquired();
+
+ // If read/write is pending, we have to retain the corresponding buffer.
+ if (pending_read_)
+ preserved_read_buffer_after_detach_ = read_buffer.Pass();
+ if (pending_write_)
+ preserved_write_buffer_after_detach_ = write_buffer.Pass();
+
+ owner_ = NULL;
+ if (ShouldSelfDestruct())
+ delete this;
+}
+
+bool RawChannelWin::RawChannelIOHandler::ShouldSelfDestruct() const {
+ if (owner_)
+ return false;
+
+ // Note: Detached, hence no lock needed for |pending_write_|.
+ return !pending_read_ && !pending_write_;
+}
+
+void RawChannelWin::RawChannelIOHandler::OnReadCompleted(DWORD bytes_read,
+ DWORD error) {
+ DCHECK(!owner_ ||
+ base::MessageLoop::current() == owner_->message_loop_for_io());
+
+ CHECK(pending_read_);
+ pending_read_ = false;
+ if (!owner_)
+ return;
+
+ if (error != ERROR_SUCCESS) {
+ DCHECK_EQ(bytes_read, 0u);
+
+ // ERROR_BROKEN_PIPE indicates that the other side has closed the handle.
+ // Don't regard that as a fatal error. We simply don't tell RawChannel that
+ // the read has actually completed.
+ if (error != ERROR_BROKEN_PIPE) {
+ LOG(ERROR) << "ReadFile failed: " << error;
+ owner_->OnReadCompleted(false, 0);
+ }
+ } else {
+ owner_->OnReadCompleted(true, bytes_read);
+ }
+}
+
+void RawChannelWin::RawChannelIOHandler::OnWriteCompleted(DWORD bytes_written,
+ DWORD error) {
+ DCHECK(!owner_ ||
+ base::MessageLoop::current() == owner_->message_loop_for_io());
+
+ if (!owner_) {
+ // No lock needed.
+ CHECK(pending_write_);
+ pending_write_ = false;
+ return;
+ }
+
+ {
+ base::AutoLock locker(owner_->write_lock());
+ CHECK(pending_write_);
+ pending_write_ = false;
+ }
+
+ if (error != ERROR_SUCCESS) {
+ LOG(ERROR) << "WriteFile failed: " << error;
+ owner_->OnWriteCompleted(false, 0);
+ } else {
+ owner_->OnWriteCompleted(true, bytes_written);
+ }
+}
+
+RawChannelWin::RawChannelWin(embedder::ScopedPlatformHandle handle,
+ Delegate* delegate,
+ base::MessageLoopForIO* message_loop)
+ : RawChannel(delegate, message_loop),
+ handle_(handle.Pass()),
+ io_handler_(NULL),
+ skip_completion_port_on_success_(
+ g_vista_or_higher_functions.Get().is_vista_or_higher()) {
+ DCHECK(handle_.is_valid());
+}
+
+RawChannelWin::~RawChannelWin() {
+ DCHECK(!io_handler_);
+}
+
+RawChannel::IOResult RawChannelWin::Read(size_t* bytes_read) {
+ DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io());
+ DCHECK(io_handler_);
+ DCHECK(!io_handler_->pending_read());
+
+ char* buffer = NULL;
+ size_t bytes_to_read = 0;
+ read_buffer()->GetBuffer(&buffer, &bytes_to_read);
+
+ DWORD bytes_read_dword = 0;
+ BOOL result = ReadFile(io_handler_->handle(),
+ buffer,
+ static_cast<DWORD>(bytes_to_read),
+ &bytes_read_dword,
+ &io_handler_->read_context()->overlapped);
+ if (!result) {
+ DCHECK_EQ(bytes_read_dword, 0u);
+ DWORD error = GetLastError();
+ if (error != ERROR_IO_PENDING) {
+ if (error != ERROR_BROKEN_PIPE) {
+ PLOG(ERROR) << "ReadFile";
+ return IO_FAILED;
+ }
+
+ // Please see comments in |RawChannelIOHandler::OnReadCompleted()| for why
+ // handling ERROR_BROKEN_PIPE differently.
+ return IO_PENDING;
+ }
+ }
+
+ if (result && skip_completion_port_on_success_) {
+ *bytes_read = bytes_read_dword;
+ return IO_SUCCEEDED;
+ }
+
+ // If the read is pending or the read has succeeded but we don't skip
+ // completion port on success, instruct |io_handler_| to wait for the
+ // completion packet.
+ //
+ // TODO(yzshen): It seems there isn't document saying that all error cases
+ // (other than ERROR_IO_PENDING) are guaranteed to *not* queue a completion
+ // packet. If we do get one for errors, |RawChannelIOHandler::OnIOCompleted()|
+ // will crash so we will learn about it.
+
+ io_handler_->OnPendingReadStarted();
+ return IO_PENDING;
+}
+
+RawChannel::IOResult RawChannelWin::ScheduleRead() {
+ DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io());
+ DCHECK(io_handler_);
+ DCHECK(!io_handler_->pending_read());
+
+ size_t bytes_read = 0;
+ IOResult io_result = Read(&bytes_read);
+ if (io_result == IO_SUCCEEDED) {
+ DCHECK(skip_completion_port_on_success_);
+
+ // We have finished reading successfully. Queue a notification manually.
+ io_handler_->OnPendingReadStarted();
+ // |io_handler_| won't go away before the task is run, so it is safe to use
+ // |base::Unretained()|.
+ message_loop_for_io()->PostTask(
+ FROM_HERE,
+ base::Bind(&RawChannelIOHandler::OnIOCompleted,
+ base::Unretained(io_handler_),
+ base::Unretained(io_handler_->read_context()),
+ static_cast<DWORD>(bytes_read),
+ ERROR_SUCCESS));
+ return IO_PENDING;
+ }
+
+ return io_result;
+}
+
+RawChannel::IOResult RawChannelWin::WriteNoLock(size_t* bytes_written) {
+ write_lock().AssertAcquired();
+
+ DCHECK(io_handler_);
+ DCHECK(!io_handler_->pending_write_no_lock());
+
+ std::vector<WriteBuffer::Buffer> buffers;
+ write_buffer_no_lock()->GetBuffers(&buffers);
+ DCHECK(!buffers.empty());
+
+ // TODO(yzshen): Handle multi-segment writes more efficiently.
+ DWORD bytes_written_dword = 0;
+ BOOL result = WriteFile(io_handler_->handle(),
+ buffers[0].addr,
+ static_cast<DWORD>(buffers[0].size),
+ &bytes_written_dword,
+ &io_handler_->write_context_no_lock()->overlapped);
+ if (!result && GetLastError() != ERROR_IO_PENDING) {
+ PLOG(ERROR) << "WriteFile";
+ return IO_FAILED;
+ }
+
+ if (result && skip_completion_port_on_success_) {
+ *bytes_written = bytes_written_dword;
+ return IO_SUCCEEDED;
+ }
+
+ // If the write is pending or the write has succeeded but we don't skip
+ // completion port on success, instruct |io_handler_| to wait for the
+ // completion packet.
+ //
+ // TODO(yzshen): it seems there isn't document saying that all error cases
+ // (other than ERROR_IO_PENDING) are guaranteed to *not* queue a completion
+ // packet. If we do get one for errors, |RawChannelIOHandler::OnIOCompleted()|
+ // will crash so we will learn about it.
+
+ io_handler_->OnPendingWriteStartedNoLock();
+ return IO_PENDING;
+}
+
+RawChannel::IOResult RawChannelWin::ScheduleWriteNoLock() {
+ write_lock().AssertAcquired();
+
+ DCHECK(io_handler_);
+ DCHECK(!io_handler_->pending_write_no_lock());
+
+ size_t bytes_written = 0;
+ IOResult io_result = WriteNoLock(&bytes_written);
+ if (io_result == IO_SUCCEEDED) {
+ DCHECK(skip_completion_port_on_success_);
+
+ // We have finished writing successfully. Queue a notification manually.
+ io_handler_->OnPendingWriteStartedNoLock();
+ // |io_handler_| won't go away before that task is run, so it is safe to use
+ // |base::Unretained()|.
+ message_loop_for_io()->PostTask(
+ FROM_HERE,
+ base::Bind(&RawChannelIOHandler::OnIOCompleted,
+ base::Unretained(io_handler_),
+ base::Unretained(io_handler_->write_context_no_lock()),
+ static_cast<DWORD>(bytes_written),
+ ERROR_SUCCESS));
+ return IO_PENDING;
+ }
+
+ return io_result;
+}
+
+bool RawChannelWin::OnInit() {
+ DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io());
+
+ DCHECK(handle_.is_valid());
+ if (skip_completion_port_on_success_ &&
+ !g_vista_or_higher_functions.Get().SetFileCompletionNotificationModes(
+ handle_.get().handle, FILE_SKIP_COMPLETION_PORT_ON_SUCCESS)) {
+ return false;
+ }
+
+ DCHECK(!io_handler_);
+ io_handler_ = new RawChannelIOHandler(this, handle_.Pass());
+
+ return true;
+}
+
+void RawChannelWin::OnShutdownNoLock(scoped_ptr<ReadBuffer> read_buffer,
+ scoped_ptr<WriteBuffer> write_buffer) {
+ DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io());
+ DCHECK(io_handler_);
+
+ write_lock().AssertAcquired();
+
+ if (io_handler_->pending_read() || io_handler_->pending_write_no_lock()) {
+ // |io_handler_| will be alive until pending read/write (if any) completes.
+ // Call |CancelIoEx()| or |CancelIo()| so that resources can be freed up as
+ // soon as possible.
+ // Note: |CancelIo()| only cancels read/write requests started from this
+ // thread.
+ if (g_vista_or_higher_functions.Get().is_vista_or_higher())
+ g_vista_or_higher_functions.Get().CancelIoEx(io_handler_->handle(), NULL);
+ else
+ CancelIo(io_handler_->handle());
+ }
+
+ io_handler_->DetachFromOwnerNoLock(read_buffer.Pass(), write_buffer.Pass());
+ io_handler_ = NULL;
+}
+
+} // namespace
+
// -----------------------------------------------------------------------------
// Static factory method declared in raw_channel.h.
// static
RawChannel* RawChannel::Create(embedder::ScopedPlatformHandle handle,
Delegate* delegate,
- base::MessageLoopForIO* message_loop_for_io) {
- // TODO(vtl)
- NOTIMPLEMENTED();
- return NULL;
+ base::MessageLoopForIO* message_loop) {
+ return new RawChannelWin(handle.Pass(), delegate, message_loop);
}
} // namespace system