diff options
author | yzshen@chromium.org <yzshen@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98> | 2014-03-04 07:44:13 +0000 |
---|---|---|
committer | yzshen@chromium.org <yzshen@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98> | 2014-03-04 07:44:13 +0000 |
commit | 44992e712496b368a207685f5cc1c46a3b2f8a57 (patch) | |
tree | 3bb47deef06f1031bcaf858897bdcd5686ac5519 /mojo/system | |
parent | f02c6ccd70c378739e1afb38beebe7e25cc08cab (diff) | |
download | chromium_src-44992e712496b368a207685f5cc1c46a3b2f8a57.zip chromium_src-44992e712496b368a207685f5cc1c46a3b2f8a57.tar.gz chromium_src-44992e712496b368a207685f5cc1c46a3b2f8a57.tar.bz2 |
Implement RawChannel on Windows.
- add RawChannelWin class;
- move read/write buffer managing code to RawChannel so that it can be shared by both RawChannelWin and RawChannelPosix.
- refactor unittests.
TEST=raw_channel_unittests.cc
Review URL: https://codereview.chromium.org/166293002
git-svn-id: svn://svn.chromium.org/chrome/trunk/src@254710 0039d316-1c4b-4281-b951-d872f2087c98
Diffstat (limited to 'mojo/system')
-rw-r--r-- | mojo/system/message_in_transit.h | 2 | ||||
-rw-r--r-- | mojo/system/raw_channel_unittest.cc (renamed from mojo/system/raw_channel_posix_unittest.cc) | 121 | ||||
-rw-r--r-- | mojo/system/raw_channel_win.cc | 525 |
3 files changed, 604 insertions, 44 deletions
diff --git a/mojo/system/message_in_transit.h b/mojo/system/message_in_transit.h index 66f374d..cbb7271 100644 --- a/mojo/system/message_in_transit.h +++ b/mojo/system/message_in_transit.h @@ -68,7 +68,7 @@ class MOJO_SYSTEM_IMPL_EXPORT MessageInTransit { struct Header; public: // This represents a view of serialized message data in a raw buffer. - class View { + class MOJO_SYSTEM_IMPL_EXPORT View { public: // Constructs a view from the given buffer of the given size. (The size must // be as provided by |MessageInTransit::GetNextMessageSize()|.) The buffer diff --git a/mojo/system/raw_channel_posix_unittest.cc b/mojo/system/raw_channel_unittest.cc index b0d5fc6..5723f1c 100644 --- a/mojo/system/raw_channel_posix_unittest.cc +++ b/mojo/system/raw_channel_unittest.cc @@ -1,14 +1,9 @@ -// Copyright 2013 The Chromium Authors. All rights reserved. +// Copyright 2014 The Chromium Authors. All rights reserved. // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. -// TODO(vtl): Factor out the remaining POSIX-specific bits of this test (once we -// have a non-POSIX implementation). - #include "mojo/system/raw_channel.h" -#include <sys/socket.h> - #include <vector> #include "base/basictypes.h" @@ -25,6 +20,7 @@ #include "base/threading/platform_thread.h" // For |Sleep()|. #include "base/threading/simple_thread.h" #include "base/time/time.h" +#include "build/build_config.h" #include "mojo/common/test/test_utils.h" #include "mojo/system/embedder/platform_channel_pair.h" #include "mojo/system/embedder/platform_handle.h" @@ -32,6 +28,10 @@ #include "mojo/system/message_in_transit.h" #include "mojo/system/test_utils.h" +#if defined(OS_POSIX) +#include <sys/socket.h> +#endif + namespace mojo { namespace system { namespace { @@ -71,10 +71,10 @@ bool WriteTestMessageToHandle(const embedder::PlatformHandle& handle, // ----------------------------------------------------------------------------- -class RawChannelPosixTest : public test::TestWithIOThreadBase { +class RawChannelTest : public test::TestWithIOThreadBase { public: - RawChannelPosixTest() {} - virtual ~RawChannelPosixTest() {} + RawChannelTest() {} + virtual ~RawChannelTest() {} virtual void SetUp() OVERRIDE { test::TestWithIOThreadBase::SetUp(); @@ -95,10 +95,10 @@ class RawChannelPosixTest : public test::TestWithIOThreadBase { embedder::ScopedPlatformHandle handles[2]; private: - DISALLOW_COPY_AND_ASSIGN(RawChannelPosixTest); + DISALLOW_COPY_AND_ASSIGN(RawChannelTest); }; -// RawChannelPosixTest.WriteMessage -------------------------------------------- +// RawChannelTest.WriteMessage ------------------------------------------------- class WriteOnlyRawChannelDelegate : public RawChannel::Delegate { public: @@ -187,7 +187,7 @@ class TestMessageReaderAndChecker { }; // Tests writing (and verifies reading using our own custom reader). -TEST_F(RawChannelPosixTest, WriteMessage) { +TEST_F(RawChannelTest, WriteMessage) { WriteOnlyRawChannelDelegate delegate; scoped_ptr<RawChannel> rc(RawChannel::Create(handles[0].Pass(), &delegate, @@ -217,7 +217,7 @@ TEST_F(RawChannelPosixTest, WriteMessage) { base::Unretained(rc.get()))); } -// RawChannelPosixTest.OnReadMessage ------------------------------------------- +// RawChannelTest.OnReadMessage ------------------------------------------------ class ReadCheckerRawChannelDelegate : public RawChannel::Delegate { public: @@ -278,7 +278,7 @@ class ReadCheckerRawChannelDelegate : public RawChannel::Delegate { }; // Tests reading (writing using our own custom writer). -TEST_F(RawChannelPosixTest, OnReadMessage) { +TEST_F(RawChannelTest, OnReadMessage) { ReadCheckerRawChannelDelegate delegate; scoped_ptr<RawChannel> rc(RawChannel::Create(handles[0].Pass(), &delegate, @@ -313,7 +313,7 @@ TEST_F(RawChannelPosixTest, OnReadMessage) { base::Unretained(rc.get()))); } -// RawChannelPosixTest.WriteMessageAndOnReadMessage ---------------------------- +// RawChannelTest.WriteMessageAndOnReadMessage --------------------------------- class RawChannelWriterThread : public base::SimpleThread { public: @@ -380,7 +380,7 @@ class ReadCountdownRawChannelDelegate : public RawChannel::Delegate { DISALLOW_COPY_AND_ASSIGN(ReadCountdownRawChannelDelegate); }; -TEST_F(RawChannelPosixTest, WriteMessageAndOnReadMessage) { +TEST_F(RawChannelTest, WriteMessageAndOnReadMessage) { static const size_t kNumWriterThreads = 10; static const size_t kNumWriteMessagesPerThread = 4000; @@ -433,37 +433,46 @@ TEST_F(RawChannelPosixTest, WriteMessageAndOnReadMessage) { base::Unretained(writer_rc.get()))); } -// RawChannelPosixTest.OnFatalError -------------------------------------------- +// RawChannelTest.OnFatalError ------------------------------------------------- class FatalErrorRecordingRawChannelDelegate : public ReadCountdownRawChannelDelegate { public: - FatalErrorRecordingRawChannelDelegate(size_t expected_read_count_) - : ReadCountdownRawChannelDelegate(expected_read_count_), + FatalErrorRecordingRawChannelDelegate(size_t expected_read_count, + bool expect_read_error, + bool expect_write_error) + : ReadCountdownRawChannelDelegate(expected_read_count), got_fatal_error_event_(false, false), - on_fatal_error_call_count_(0), - last_fatal_error_(FATAL_ERROR_UNKNOWN) {} + expecting_read_error_(expect_read_error), + expecting_write_error_(expect_write_error) { + } virtual ~FatalErrorRecordingRawChannelDelegate() {} virtual void OnFatalError(FatalError fatal_error) OVERRIDE { - CHECK_EQ(on_fatal_error_call_count_, 0); - on_fatal_error_call_count_++; - last_fatal_error_ = fatal_error; - got_fatal_error_event_.Signal(); + if (fatal_error == FATAL_ERROR_FAILED_READ) { + ASSERT_TRUE(expecting_read_error_); + expecting_read_error_ = false; + } else if (fatal_error == FATAL_ERROR_FAILED_WRITE) { + ASSERT_TRUE(expecting_write_error_); + expecting_write_error_ = false; + } else { + ASSERT_TRUE(false); + } + + if (!expecting_read_error_ && !expecting_write_error_) + got_fatal_error_event_.Signal(); } - FatalError WaitForFatalError() { + void WaitForFatalError() { got_fatal_error_event_.Wait(); - CHECK_EQ(on_fatal_error_call_count_, 1); - return last_fatal_error_; } private: base::WaitableEvent got_fatal_error_event_; - int on_fatal_error_call_count_; - FatalError last_fatal_error_; + bool expecting_read_error_; + bool expecting_write_error_; DISALLOW_COPY_AND_ASSIGN(FatalErrorRecordingRawChannelDelegate); }; @@ -471,10 +480,48 @@ class FatalErrorRecordingRawChannelDelegate // Tests fatal errors. // TODO(vtl): Figure out how to make reading fail reliably. (I'm not convinced // that it does.) -TEST_F(RawChannelPosixTest, OnFatalError) { +TEST_F(RawChannelTest, OnFatalError) { + FatalErrorRecordingRawChannelDelegate delegate(0, false, true); + + scoped_ptr<RawChannel> rc(RawChannel::Create(handles[0].Pass(), + &delegate, + io_thread_message_loop())); + + test::PostTaskAndWait(io_thread_task_runner(), + FROM_HERE, + base::Bind(&InitOnIOThread, rc.get())); + + // Close the handle of the other end, which should make writing fail. + handles[1].reset(); + + EXPECT_FALSE(rc->WriteMessage(MakeTestMessage(1))); + + // TODO(vtl): In theory, it's conceivable that closing the other end might + // lead to read failing. In practice, it doesn't seem to. + delegate.WaitForFatalError(); + + EXPECT_FALSE(rc->WriteMessage(MakeTestMessage(2))); + + // Sleep a bit, to make sure we don't get another |OnFatalError()| + // notification. (If we actually get another one, |OnFatalError()| crashes.) + base::PlatformThread::Sleep(base::TimeDelta::FromMilliseconds(100)); + + test::PostTaskAndWait(io_thread_task_runner(), + FROM_HERE, + base::Bind(&RawChannel::Shutdown, + base::Unretained(rc.get()))); +} + +#if defined(OS_POSIX) +// RawChannelTest.ReadUnaffectedByWriteFatalError ------------------------------ + +// TODO(yzshen): On Windows, I haven't figured out a way to shut down one +// direction of the named pipe. +TEST_F(RawChannelTest, ReadUnaffectedByWriteFatalError) { const size_t kMessageCount = 5; - FatalErrorRecordingRawChannelDelegate delegate(2 * kMessageCount); + FatalErrorRecordingRawChannelDelegate delegate(2 * kMessageCount, false, + true); scoped_ptr<RawChannel> rc(RawChannel::Create(handles[0].Pass(), &delegate, io_thread_message_loop())); @@ -495,10 +542,7 @@ TEST_F(RawChannelPosixTest, OnFatalError) { EXPECT_FALSE(rc->WriteMessage(MakeTestMessage(1))); - // TODO(vtl): In theory, it's conceivable that closing the other end might - // lead to read failing. In practice, it doesn't seem to. - EXPECT_EQ(RawChannel::Delegate::FATAL_ERROR_FAILED_WRITE, - delegate.WaitForFatalError()); + delegate.WaitForFatalError(); EXPECT_FALSE(rc->WriteMessage(MakeTestMessage(2))); @@ -519,12 +563,13 @@ TEST_F(RawChannelPosixTest, OnFatalError) { base::Bind(&RawChannel::Shutdown, base::Unretained(rc.get()))); } +#endif // defined(OS_POSIX) -// RawChannelPosixTest.WriteMessageAfterShutdown ------------------------------- +// RawChannelTest.WriteMessageAfterShutdown ------------------------------------ // Makes sure that calling |WriteMessage()| after |Shutdown()| behaves // correctly. -TEST_F(RawChannelPosixTest, WriteMessageAfterShutdown) { +TEST_F(RawChannelTest, WriteMessageAfterShutdown) { WriteOnlyRawChannelDelegate delegate; scoped_ptr<RawChannel> rc(RawChannel::Create(handles[0].Pass(), &delegate, diff --git a/mojo/system/raw_channel_win.cc b/mojo/system/raw_channel_win.cc index 7fe2f94..e329d8d 100644 --- a/mojo/system/raw_channel_win.cc +++ b/mojo/system/raw_channel_win.cc @@ -4,23 +4,538 @@ #include "mojo/system/raw_channel.h" -#include <stddef.h> +#include <windows.h> +#include "base/basictypes.h" +#include "base/bind.h" +#include "base/compiler_specific.h" +#include "base/lazy_instance.h" +#include "base/location.h" #include "base/logging.h" +#include "base/memory/scoped_ptr.h" +#include "base/message_loop/message_loop.h" +#include "base/synchronization/lock.h" +#include "base/win/windows_version.h" +#include "mojo/system/embedder/platform_handle.h" namespace mojo { namespace system { +namespace { + +class VistaOrHigherFunctions { + public: + VistaOrHigherFunctions(); + + bool is_vista_or_higher() const { return is_vista_or_higher_; } + + BOOL SetFileCompletionNotificationModes(HANDLE handle, UCHAR flags) { + return set_file_completion_notification_modes_(handle, flags); + } + + BOOL CancelIoEx(HANDLE handle, LPOVERLAPPED overlapped) { + return cancel_io_ex_(handle, overlapped); + } + + private: + typedef BOOL (WINAPI *SetFileCompletionNotificationModesFunc)(HANDLE, UCHAR); + typedef BOOL (WINAPI *CancelIoExFunc)(HANDLE, LPOVERLAPPED); + + bool is_vista_or_higher_; + SetFileCompletionNotificationModesFunc + set_file_completion_notification_modes_; + CancelIoExFunc cancel_io_ex_; +}; + +VistaOrHigherFunctions::VistaOrHigherFunctions() + : is_vista_or_higher_(base::win::GetVersion() >= base::win::VERSION_VISTA), + set_file_completion_notification_modes_(NULL), + cancel_io_ex_(NULL) { + if (!is_vista_or_higher_) + return; + + HMODULE module = GetModuleHandleW(L"kernel32.dll"); + set_file_completion_notification_modes_ = + reinterpret_cast<SetFileCompletionNotificationModesFunc>( + GetProcAddress(module, "SetFileCompletionNotificationModes")); + DCHECK(set_file_completion_notification_modes_); + + cancel_io_ex_ = reinterpret_cast<CancelIoExFunc>( + GetProcAddress(module, "CancelIoEx")); + DCHECK(cancel_io_ex_); +} + +base::LazyInstance<VistaOrHigherFunctions> g_vista_or_higher_functions = + LAZY_INSTANCE_INITIALIZER; + +class RawChannelWin : public RawChannel { + public: + RawChannelWin(embedder::ScopedPlatformHandle handle, + Delegate* delegate, + base::MessageLoopForIO* message_loop); + virtual ~RawChannelWin(); + + private: + // RawChannelIOHandler receives OS notifications for I/O completion. It must + // be created on the I/O thread. + // + // It manages its own destruction. Destruction happens on the I/O thread when + // all the following conditions are satisfied: + // - |DetachFromOwnerNoLock()| has been called; + // - there is no pending read; + // - there is no pending write. + class RawChannelIOHandler : public base::MessageLoopForIO::IOHandler { + public: + RawChannelIOHandler(RawChannelWin* owner, + embedder::ScopedPlatformHandle handle); + + HANDLE handle() const { return handle_.get().handle; } + + // The following methods are only called by the owner on the I/O thread. + bool pending_read() const; + base::MessageLoopForIO::IOContext* read_context(); + // Instructs the object to wait for an |OnIOCompleted()| notification. + void OnPendingReadStarted(); + + // The following methods are only called by the owner under + // |owner_->write_lock()|. + bool pending_write_no_lock() const; + base::MessageLoopForIO::IOContext* write_context_no_lock(); + // Instructs the object to wait for an |OnIOCompleted()| notification. + void OnPendingWriteStartedNoLock(); + + // |base::MessageLoopForIO::IOHandler| implementation: + // Must be called on the I/O thread. It could be called before or after + // detached from the owner. + virtual void OnIOCompleted(base::MessageLoopForIO::IOContext* context, + DWORD bytes_transferred, + DWORD error) OVERRIDE; + + // Must be called on the I/O thread under |owner_->write_lock()|. + // After this call, the owner must not make any further calls on this + // object, and therefore the object is used on the I/O thread exclusively + // (if it stays alive). + void DetachFromOwnerNoLock(scoped_ptr<ReadBuffer> read_buffer, + scoped_ptr<WriteBuffer> write_buffer); + + private: + virtual ~RawChannelIOHandler(); + + // Returns true if |owner_| has been reset and there is not pending read or + // write. + // Must be called on the I/O thread. + bool ShouldSelfDestruct() const; + + // Must be called on the I/O thread. It could be called before or after + // detached from the owner. + void OnReadCompleted(DWORD bytes_read, DWORD error); + // Must be called on the I/O thread. It could be called before or after + // detached from the owner. + void OnWriteCompleted(DWORD bytes_written, DWORD error); + + embedder::ScopedPlatformHandle handle_; + + // |owner_| is reset on the I/O thread under |owner_->write_lock()|. + // Therefore, it may be used on any thread under lock; or on the I/O thread + // without locking. + RawChannelWin* owner_; + + // The following members must be used on the I/O thread. + scoped_ptr<ReadBuffer> preserved_read_buffer_after_detach_; + scoped_ptr<WriteBuffer> preserved_write_buffer_after_detach_; + + bool pending_read_; + base::MessageLoopForIO::IOContext read_context_; + + // The following members must be used under |owner_->write_lock()| while the + // object is still attached to the owner, and only on the I/O thread + // afterwards. + bool pending_write_; + base::MessageLoopForIO::IOContext write_context_; + + DISALLOW_COPY_AND_ASSIGN(RawChannelIOHandler); + }; + + // |RawChannel| implementation: + virtual IOResult Read(size_t* bytes_read) OVERRIDE; + virtual IOResult ScheduleRead() OVERRIDE; + virtual IOResult WriteNoLock(size_t* bytes_written) OVERRIDE; + virtual IOResult ScheduleWriteNoLock() OVERRIDE; + virtual bool OnInit() OVERRIDE; + virtual void OnShutdownNoLock( + scoped_ptr<ReadBuffer> read_buffer, + scoped_ptr<WriteBuffer> write_buffer) OVERRIDE; + + // Passed to |io_handler_| during initialization. + embedder::ScopedPlatformHandle handle_; + + RawChannelIOHandler* io_handler_; + + const bool skip_completion_port_on_success_; + + DISALLOW_COPY_AND_ASSIGN(RawChannelWin); +}; + +RawChannelWin::RawChannelIOHandler::RawChannelIOHandler( + RawChannelWin* owner, + embedder::ScopedPlatformHandle handle) : handle_(handle.Pass()), + owner_(owner), + pending_read_(false), + pending_write_(false) { + memset(&read_context_.overlapped, 0, sizeof(read_context_.overlapped)); + read_context_.handler = this; + memset(&write_context_.overlapped, 0, sizeof(write_context_.overlapped)); + write_context_.handler = this; + + owner_->message_loop_for_io()->RegisterIOHandler(handle_.get().handle, this); +} + +RawChannelWin::RawChannelIOHandler::~RawChannelIOHandler() { + DCHECK(ShouldSelfDestruct()); +} + +bool RawChannelWin::RawChannelIOHandler::pending_read() const { + DCHECK(owner_); + DCHECK_EQ(base::MessageLoop::current(), owner_->message_loop_for_io()); + return pending_read_; +} + +base::MessageLoopForIO::IOContext* + RawChannelWin::RawChannelIOHandler::read_context() { + DCHECK(owner_); + DCHECK_EQ(base::MessageLoop::current(), owner_->message_loop_for_io()); + return &read_context_; +} + +void RawChannelWin::RawChannelIOHandler::OnPendingReadStarted() { + DCHECK(owner_); + DCHECK_EQ(base::MessageLoop::current(), owner_->message_loop_for_io()); + DCHECK(!pending_read_); + pending_read_ = true; +} + +bool RawChannelWin::RawChannelIOHandler::pending_write_no_lock() const { + DCHECK(owner_); + owner_->write_lock().AssertAcquired(); + return pending_write_; +} + +base::MessageLoopForIO::IOContext* + RawChannelWin::RawChannelIOHandler::write_context_no_lock() { + DCHECK(owner_); + owner_->write_lock().AssertAcquired(); + return &write_context_; +} + +void RawChannelWin::RawChannelIOHandler::OnPendingWriteStartedNoLock() { + DCHECK(owner_); + owner_->write_lock().AssertAcquired(); + DCHECK(!pending_write_); + pending_write_ = true; +} + +void RawChannelWin::RawChannelIOHandler::OnIOCompleted( + base::MessageLoopForIO::IOContext* context, + DWORD bytes_transferred, + DWORD error) { + DCHECK(!owner_ || + base::MessageLoop::current() == owner_->message_loop_for_io()); + + if (context == &read_context_) + OnReadCompleted(bytes_transferred, error); + else if (context == &write_context_) + OnWriteCompleted(bytes_transferred, error); + else + NOTREACHED(); + + if (ShouldSelfDestruct()) + delete this; +} + +void RawChannelWin::RawChannelIOHandler::DetachFromOwnerNoLock( + scoped_ptr<ReadBuffer> read_buffer, + scoped_ptr<WriteBuffer> write_buffer) { + DCHECK(owner_); + DCHECK_EQ(base::MessageLoop::current(), owner_->message_loop_for_io()); + owner_->write_lock().AssertAcquired(); + + // If read/write is pending, we have to retain the corresponding buffer. + if (pending_read_) + preserved_read_buffer_after_detach_ = read_buffer.Pass(); + if (pending_write_) + preserved_write_buffer_after_detach_ = write_buffer.Pass(); + + owner_ = NULL; + if (ShouldSelfDestruct()) + delete this; +} + +bool RawChannelWin::RawChannelIOHandler::ShouldSelfDestruct() const { + if (owner_) + return false; + + // Note: Detached, hence no lock needed for |pending_write_|. + return !pending_read_ && !pending_write_; +} + +void RawChannelWin::RawChannelIOHandler::OnReadCompleted(DWORD bytes_read, + DWORD error) { + DCHECK(!owner_ || + base::MessageLoop::current() == owner_->message_loop_for_io()); + + CHECK(pending_read_); + pending_read_ = false; + if (!owner_) + return; + + if (error != ERROR_SUCCESS) { + DCHECK_EQ(bytes_read, 0u); + + // ERROR_BROKEN_PIPE indicates that the other side has closed the handle. + // Don't regard that as a fatal error. We simply don't tell RawChannel that + // the read has actually completed. + if (error != ERROR_BROKEN_PIPE) { + LOG(ERROR) << "ReadFile failed: " << error; + owner_->OnReadCompleted(false, 0); + } + } else { + owner_->OnReadCompleted(true, bytes_read); + } +} + +void RawChannelWin::RawChannelIOHandler::OnWriteCompleted(DWORD bytes_written, + DWORD error) { + DCHECK(!owner_ || + base::MessageLoop::current() == owner_->message_loop_for_io()); + + if (!owner_) { + // No lock needed. + CHECK(pending_write_); + pending_write_ = false; + return; + } + + { + base::AutoLock locker(owner_->write_lock()); + CHECK(pending_write_); + pending_write_ = false; + } + + if (error != ERROR_SUCCESS) { + LOG(ERROR) << "WriteFile failed: " << error; + owner_->OnWriteCompleted(false, 0); + } else { + owner_->OnWriteCompleted(true, bytes_written); + } +} + +RawChannelWin::RawChannelWin(embedder::ScopedPlatformHandle handle, + Delegate* delegate, + base::MessageLoopForIO* message_loop) + : RawChannel(delegate, message_loop), + handle_(handle.Pass()), + io_handler_(NULL), + skip_completion_port_on_success_( + g_vista_or_higher_functions.Get().is_vista_or_higher()) { + DCHECK(handle_.is_valid()); +} + +RawChannelWin::~RawChannelWin() { + DCHECK(!io_handler_); +} + +RawChannel::IOResult RawChannelWin::Read(size_t* bytes_read) { + DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io()); + DCHECK(io_handler_); + DCHECK(!io_handler_->pending_read()); + + char* buffer = NULL; + size_t bytes_to_read = 0; + read_buffer()->GetBuffer(&buffer, &bytes_to_read); + + DWORD bytes_read_dword = 0; + BOOL result = ReadFile(io_handler_->handle(), + buffer, + static_cast<DWORD>(bytes_to_read), + &bytes_read_dword, + &io_handler_->read_context()->overlapped); + if (!result) { + DCHECK_EQ(bytes_read_dword, 0u); + DWORD error = GetLastError(); + if (error != ERROR_IO_PENDING) { + if (error != ERROR_BROKEN_PIPE) { + PLOG(ERROR) << "ReadFile"; + return IO_FAILED; + } + + // Please see comments in |RawChannelIOHandler::OnReadCompleted()| for why + // handling ERROR_BROKEN_PIPE differently. + return IO_PENDING; + } + } + + if (result && skip_completion_port_on_success_) { + *bytes_read = bytes_read_dword; + return IO_SUCCEEDED; + } + + // If the read is pending or the read has succeeded but we don't skip + // completion port on success, instruct |io_handler_| to wait for the + // completion packet. + // + // TODO(yzshen): It seems there isn't document saying that all error cases + // (other than ERROR_IO_PENDING) are guaranteed to *not* queue a completion + // packet. If we do get one for errors, |RawChannelIOHandler::OnIOCompleted()| + // will crash so we will learn about it. + + io_handler_->OnPendingReadStarted(); + return IO_PENDING; +} + +RawChannel::IOResult RawChannelWin::ScheduleRead() { + DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io()); + DCHECK(io_handler_); + DCHECK(!io_handler_->pending_read()); + + size_t bytes_read = 0; + IOResult io_result = Read(&bytes_read); + if (io_result == IO_SUCCEEDED) { + DCHECK(skip_completion_port_on_success_); + + // We have finished reading successfully. Queue a notification manually. + io_handler_->OnPendingReadStarted(); + // |io_handler_| won't go away before the task is run, so it is safe to use + // |base::Unretained()|. + message_loop_for_io()->PostTask( + FROM_HERE, + base::Bind(&RawChannelIOHandler::OnIOCompleted, + base::Unretained(io_handler_), + base::Unretained(io_handler_->read_context()), + static_cast<DWORD>(bytes_read), + ERROR_SUCCESS)); + return IO_PENDING; + } + + return io_result; +} + +RawChannel::IOResult RawChannelWin::WriteNoLock(size_t* bytes_written) { + write_lock().AssertAcquired(); + + DCHECK(io_handler_); + DCHECK(!io_handler_->pending_write_no_lock()); + + std::vector<WriteBuffer::Buffer> buffers; + write_buffer_no_lock()->GetBuffers(&buffers); + DCHECK(!buffers.empty()); + + // TODO(yzshen): Handle multi-segment writes more efficiently. + DWORD bytes_written_dword = 0; + BOOL result = WriteFile(io_handler_->handle(), + buffers[0].addr, + static_cast<DWORD>(buffers[0].size), + &bytes_written_dword, + &io_handler_->write_context_no_lock()->overlapped); + if (!result && GetLastError() != ERROR_IO_PENDING) { + PLOG(ERROR) << "WriteFile"; + return IO_FAILED; + } + + if (result && skip_completion_port_on_success_) { + *bytes_written = bytes_written_dword; + return IO_SUCCEEDED; + } + + // If the write is pending or the write has succeeded but we don't skip + // completion port on success, instruct |io_handler_| to wait for the + // completion packet. + // + // TODO(yzshen): it seems there isn't document saying that all error cases + // (other than ERROR_IO_PENDING) are guaranteed to *not* queue a completion + // packet. If we do get one for errors, |RawChannelIOHandler::OnIOCompleted()| + // will crash so we will learn about it. + + io_handler_->OnPendingWriteStartedNoLock(); + return IO_PENDING; +} + +RawChannel::IOResult RawChannelWin::ScheduleWriteNoLock() { + write_lock().AssertAcquired(); + + DCHECK(io_handler_); + DCHECK(!io_handler_->pending_write_no_lock()); + + size_t bytes_written = 0; + IOResult io_result = WriteNoLock(&bytes_written); + if (io_result == IO_SUCCEEDED) { + DCHECK(skip_completion_port_on_success_); + + // We have finished writing successfully. Queue a notification manually. + io_handler_->OnPendingWriteStartedNoLock(); + // |io_handler_| won't go away before that task is run, so it is safe to use + // |base::Unretained()|. + message_loop_for_io()->PostTask( + FROM_HERE, + base::Bind(&RawChannelIOHandler::OnIOCompleted, + base::Unretained(io_handler_), + base::Unretained(io_handler_->write_context_no_lock()), + static_cast<DWORD>(bytes_written), + ERROR_SUCCESS)); + return IO_PENDING; + } + + return io_result; +} + +bool RawChannelWin::OnInit() { + DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io()); + + DCHECK(handle_.is_valid()); + if (skip_completion_port_on_success_ && + !g_vista_or_higher_functions.Get().SetFileCompletionNotificationModes( + handle_.get().handle, FILE_SKIP_COMPLETION_PORT_ON_SUCCESS)) { + return false; + } + + DCHECK(!io_handler_); + io_handler_ = new RawChannelIOHandler(this, handle_.Pass()); + + return true; +} + +void RawChannelWin::OnShutdownNoLock(scoped_ptr<ReadBuffer> read_buffer, + scoped_ptr<WriteBuffer> write_buffer) { + DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io()); + DCHECK(io_handler_); + + write_lock().AssertAcquired(); + + if (io_handler_->pending_read() || io_handler_->pending_write_no_lock()) { + // |io_handler_| will be alive until pending read/write (if any) completes. + // Call |CancelIoEx()| or |CancelIo()| so that resources can be freed up as + // soon as possible. + // Note: |CancelIo()| only cancels read/write requests started from this + // thread. + if (g_vista_or_higher_functions.Get().is_vista_or_higher()) + g_vista_or_higher_functions.Get().CancelIoEx(io_handler_->handle(), NULL); + else + CancelIo(io_handler_->handle()); + } + + io_handler_->DetachFromOwnerNoLock(read_buffer.Pass(), write_buffer.Pass()); + io_handler_ = NULL; +} + +} // namespace + // ----------------------------------------------------------------------------- // Static factory method declared in raw_channel.h. // static RawChannel* RawChannel::Create(embedder::ScopedPlatformHandle handle, Delegate* delegate, - base::MessageLoopForIO* message_loop_for_io) { - // TODO(vtl) - NOTIMPLEMENTED(); - return NULL; + base::MessageLoopForIO* message_loop) { + return new RawChannelWin(handle.Pass(), delegate, message_loop); } } // namespace system |