diff options
author | viettrungluu@chromium.org <viettrungluu@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98> | 2013-10-15 19:53:54 +0000 |
---|---|---|
committer | viettrungluu@chromium.org <viettrungluu@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98> | 2013-10-15 19:53:54 +0000 |
commit | da83553b1671cd51a986275947400d8cfc56883e (patch) | |
tree | afdfbe6ebb6c28511dddfb8acec4b569ea76e610 /mojo/system | |
parent | 4b9d0863d0a116dbc747cc057dac0fd94604b8bd (diff) | |
download | chromium_src-da83553b1671cd51a986275947400d8cfc56883e.zip chromium_src-da83553b1671cd51a986275947400d8cfc56883e.tar.gz chromium_src-da83553b1671cd51a986275947400d8cfc56883e.tar.bz2 |
Add handling of fatal errors to RawChannel/RawChannelPosix.
Also fix a race if |Shutdown()| were called simultaneously (on the I/O thread)
with |WriteMessage()| being called (and posting |WaitToWrite()|).
R=davemoore@chromium.org
Review URL: https://codereview.chromium.org/27241003
git-svn-id: svn://svn.chromium.org/chrome/trunk/src@228745 0039d316-1c4b-4281-b951-d872f2087c98
Diffstat (limited to 'mojo/system')
-rw-r--r-- | mojo/system/raw_channel.h | 10 | ||||
-rw-r--r-- | mojo/system/raw_channel_posix.cc | 93 | ||||
-rw-r--r-- | mojo/system/raw_channel_posix_unittest.cc | 119 |
3 files changed, 189 insertions, 33 deletions
diff --git a/mojo/system/raw_channel.h b/mojo/system/raw_channel.h index c56dbb5..0c19ad0 100644 --- a/mojo/system/raw_channel.h +++ b/mojo/system/raw_channel.h @@ -48,10 +48,20 @@ class MOJO_SYSTEM_EXPORT RawChannel { // (passed in on creation). class Delegate { public: + enum FatalError { + FATAL_ERROR_UNKNOWN = 0, + FATAL_ERROR_FAILED_READ, + FATAL_ERROR_FAILED_WRITE + }; + // Called when a message is read. This may call |Shutdown()| on the // |RawChannel|, but must not destroy it. virtual void OnReadMessage(const MessageInTransit& message) = 0; + // Called when there's a fatal error, which leads to the channel no longer + // being viable. + virtual void OnFatalError(FatalError fatal_error) = 0; + protected: virtual ~Delegate() {} }; diff --git a/mojo/system/raw_channel_posix.cc b/mojo/system/raw_channel_posix.cc index d81d1bf..e218d84 100644 --- a/mojo/system/raw_channel_posix.cc +++ b/mojo/system/raw_channel_posix.cc @@ -18,6 +18,7 @@ #include "base/location.h" #include "base/logging.h" #include "base/memory/scoped_ptr.h" +#include "base/memory/weak_ptr.h" #include "base/message_loop/message_loop.h" #include "base/posix/eintr_wrapper.h" #include "base/synchronization/lock.h" @@ -50,6 +51,10 @@ class RawChannelPosix : public RawChannel, // Watches for |fd_| to become writable. Must be called on the I/O thread. void WaitToWrite(); + // Calls |delegate()->OnFatalError(fatal_error)|. Must be called on the I/O + // thread WITHOUT |write_lock_| held. + void CallOnFatalError(Delegate::FatalError fatal_error); + // Writes the message at the front of |write_message_queue_|, starting at // |write_message_offset_|. It removes and destroys if the write completes and // otherwise updates |write_message_offset_|. Returns true on success. Must be @@ -57,15 +62,15 @@ class RawChannelPosix : public RawChannel, bool WriteFrontMessageNoLock(); // Cancels all pending writes and destroys the contents of - // |write_message_queue_|. Should only be called if |is_writable_| is true; - // sets |is_writable_| to false. Must be called under |write_lock_|. + // |write_message_queue_|. Should only be called if |is_dead_| is false; sets + // |is_dead_| to true. Must be called under |write_lock_|. void CancelPendingWritesNoLock(); base::MessageLoopForIO* message_loop_for_io() { return static_cast<base::MessageLoopForIO*>(message_loop()); } - const int fd_; + int fd_; // Only used on the I/O thread: scoped_ptr<base::MessageLoopForIO::FileDescriptorWatcher> read_watcher_; @@ -79,9 +84,13 @@ class RawChannelPosix : public RawChannel, size_t read_buffer_num_valid_bytes_; base::Lock write_lock_; // Protects the following members. - bool is_writable_; + bool is_dead_; std::deque<MessageInTransit*> write_message_queue_; size_t write_message_offset_; + // This is used for posting tasks from write threads to the I/O thread. It + // must only be accessed under |write_lock_|. The weak pointers it produces + // are only used/invalidated on the I/O thread. + base::WeakPtrFactory<RawChannelPosix> weak_ptr_factory_; DISALLOW_COPY_AND_ASSIGN(RawChannelPosix); }; @@ -92,13 +101,21 @@ RawChannelPosix::RawChannelPosix(const PlatformChannelHandle& handle, : RawChannel(delegate, message_loop), fd_(handle.fd), read_buffer_num_valid_bytes_(0), - is_writable_(true), - write_message_offset_(0) { + is_dead_(false), + write_message_offset_(0), + weak_ptr_factory_(this) { CHECK_EQ(RawChannel::message_loop()->type(), base::MessageLoop::TYPE_IO); + DCHECK_NE(fd_, -1); } RawChannelPosix::~RawChannelPosix() { - close(fd_); + DCHECK(is_dead_); + DCHECK_EQ(fd_, -1); + + // No need to take the |write_lock_| here -- if there are still weak pointers + // outstanding, then we're hosed anyway (since we wouldn't be able to + // invalidate them cleanly, since we might not be on the I/O thread). + DCHECK(!weak_ptr_factory_.HasWeakPtrs()); // These must have been shut down/destroyed on the I/O thread. DCHECK(!read_watcher_.get()); @@ -125,9 +142,16 @@ void RawChannelPosix::Shutdown() { DCHECK_EQ(base::MessageLoop::current(), message_loop()); base::AutoLock locker(write_lock_); - if (is_writable_) + if (!is_dead_) CancelPendingWritesNoLock(); + DCHECK_NE(fd_, -1); + if (close(fd_) != 0) + PLOG(ERROR) << "close"; + fd_ = -1; + + weak_ptr_factory_.InvalidateWeakPtrs(); + read_watcher_.reset(); // This will stop watching (if necessary). write_watcher_.reset(); // This will stop watching (if necessary). } @@ -136,7 +160,7 @@ void RawChannelPosix::Shutdown() { // success. bool RawChannelPosix::WriteMessage(MessageInTransit* message) { base::AutoLock locker(write_lock_); - if (!is_writable_) { + if (is_dead_) { message->Destroy(); return false; } @@ -151,7 +175,14 @@ bool RawChannelPosix::WriteMessage(MessageInTransit* message) { bool result = WriteFrontMessageNoLock(); DCHECK(result || write_message_queue_.empty()); - if (!write_message_queue_.empty()) { + if (!result) { + // Even if we're on the I/O thread, don't call |OnFatalError()| in the + // nested context. + message_loop()->PostTask(FROM_HERE, + base::Bind(&RawChannelPosix::CallOnFatalError, + weak_ptr_factory_.GetWeakPtr(), + Delegate::FATAL_ERROR_FAILED_WRITE)); + } else if (!write_message_queue_.empty()) { // Set up to wait for the FD to become writable. If we're not on the I/O // thread, we have to post a task to do this. if (base::MessageLoop::current() == message_loop()) { @@ -159,7 +190,7 @@ bool RawChannelPosix::WriteMessage(MessageInTransit* message) { } else { message_loop()->PostTask(FROM_HERE, base::Bind(&RawChannelPosix::WaitToWrite, - base::Unretained(this))); + weak_ptr_factory_.GetWeakPtr())); } } @@ -196,10 +227,13 @@ void RawChannelPosix::OnFileCanReadWithoutBlocking(int fd) { &read_buffer_[read_buffer_start + read_buffer_num_valid_bytes_], kReadSize)); if (bytes_read < 0) { - base::AutoLock locker(write_lock_); if (errno != EAGAIN && errno != EWOULDBLOCK) { PLOG(ERROR) << "read"; - CancelPendingWritesNoLock(); + { + base::AutoLock locker(write_lock_); + CancelPendingWritesNoLock(); + } + CallOnFatalError(Delegate::FATAL_ERROR_FAILED_READ); return; } @@ -262,15 +296,22 @@ void RawChannelPosix::OnFileCanWriteWithoutBlocking(int fd) { DCHECK_EQ(fd, fd_); DCHECK_EQ(base::MessageLoop::current(), message_loop()); - base::AutoLock locker(write_lock_); - DCHECK(is_writable_); - DCHECK(!write_message_queue_.empty()); + bool did_fail = false; + { + base::AutoLock locker(write_lock_); + DCHECK(!is_dead_); + DCHECK(!write_message_queue_.empty()); - bool result = WriteFrontMessageNoLock(); - DCHECK(result || write_message_queue_.empty()); + bool result = WriteFrontMessageNoLock(); + DCHECK(result || write_message_queue_.empty()); - if (!write_message_queue_.empty()) - WaitToWrite(); + if (!result) + did_fail = true; + else if (!write_message_queue_.empty()) + WaitToWrite(); + } + if (did_fail) + CallOnFatalError(Delegate::FATAL_ERROR_FAILED_WRITE); } void RawChannelPosix::WaitToWrite() { @@ -283,10 +324,16 @@ void RawChannelPosix::WaitToWrite() { DCHECK(result); } +void RawChannelPosix::CallOnFatalError(Delegate::FatalError fatal_error) { + DCHECK_EQ(base::MessageLoop::current(), message_loop()); + // TODO(vtl): Add a "write_lock_.AssertNotAcquired()"? + delegate()->OnFatalError(fatal_error); +} + bool RawChannelPosix::WriteFrontMessageNoLock() { write_lock_.AssertAcquired(); - DCHECK(is_writable_); + DCHECK(!is_dead_); DCHECK(!write_message_queue_.empty()); MessageInTransit* message = write_message_queue_.front(); @@ -326,9 +373,9 @@ bool RawChannelPosix::WriteFrontMessageNoLock() { void RawChannelPosix::CancelPendingWritesNoLock() { write_lock_.AssertAcquired(); - DCHECK(is_writable_); + DCHECK(!is_dead_); - is_writable_ = false; + is_dead_ = true; for (std::deque<MessageInTransit*>::iterator it = write_message_queue_.begin(); it != write_message_queue_.end(); ++it) { diff --git a/mojo/system/raw_channel_posix_unittest.cc b/mojo/system/raw_channel_posix_unittest.cc index 8599996..94d9943 100644 --- a/mojo/system/raw_channel_posix_unittest.cc +++ b/mojo/system/raw_channel_posix_unittest.cc @@ -99,9 +99,9 @@ class RawChannelPosixTest : public testing::Test { virtual void TearDown() OVERRIDE { if (fds_[0] != -1) - close(fds_[0]); + CHECK_EQ(close(fds_[0]), 0); if (fds_[1] != -1) - close(fds_[1]); + CHECK_EQ(close(fds_[1]), 0); io_thread_.Stop(); } @@ -130,6 +130,9 @@ class WriteOnlyRawChannelDelegate : public RawChannel::Delegate { virtual void OnReadMessage(const MessageInTransit& /*message*/) OVERRIDE { NOTREACHED(); } + virtual void OnFatalError(FatalError /*fatal_error*/) OVERRIDE { + NOTREACHED(); + } private: DISALLOW_COPY_AND_ASSIGN(WriteOnlyRawChannelDelegate); @@ -221,13 +224,13 @@ TEST_F(RawChannelPosixTest, WriteMessage) { // Write and read, for a variety of sizes. for (uint32_t size = 1; size < 5 * 1000 * 1000; size += size / 2 + 1) { - rc->WriteMessage(MakeTestMessage(size)); + EXPECT_TRUE(rc->WriteMessage(MakeTestMessage(size))); EXPECT_TRUE(checker.ReadAndCheckNextMessage(size)) << size; } // Write/queue and read afterwards, for a variety of sizes. for (uint32_t size = 1; size < 5 * 1000 * 1000; size += size / 2 + 1) - rc->WriteMessage(MakeTestMessage(size)); + EXPECT_TRUE(rc->WriteMessage(MakeTestMessage(size))); for (uint32_t size = 1; size < 5 * 1000 * 1000; size += size / 2 + 1) EXPECT_TRUE(checker.ReadAndCheckNextMessage(size)) << size; @@ -246,8 +249,7 @@ class ReadCheckerRawChannelDelegate : public RawChannel::Delegate { position_(0) {} virtual ~ReadCheckerRawChannelDelegate() {} - // |RawChannel::Delegate| implementation: - // Called on the I/O thread. + // |RawChannel::Delegate| implementation (called on the I/O thread): virtual void OnReadMessage(const MessageInTransit& message) OVERRIDE { size_t position; size_t expected_size; @@ -271,6 +273,9 @@ class ReadCheckerRawChannelDelegate : public RawChannel::Delegate { if (should_signal) done_event_.Signal(); } + virtual void OnFatalError(FatalError /*fatal_error*/) OVERRIDE { + NOTREACHED(); + } // Wait for all the messages (of sizes |expected_sizes_|) to be seen. void Wait() { @@ -359,8 +364,8 @@ class RawChannelWriterThread : public base::SimpleThread { static const int kMaxRandomMessageSize = 25000; while (left_to_write_-- > 0) { - raw_channel_->WriteMessage(MakeTestMessage( - static_cast<uint32_t>(base::RandInt(1, kMaxRandomMessageSize)))); + EXPECT_TRUE(raw_channel_->WriteMessage(MakeTestMessage( + static_cast<uint32_t>(base::RandInt(1, kMaxRandomMessageSize))))); } } @@ -378,8 +383,7 @@ class ReadCountdownRawChannelDelegate : public RawChannel::Delegate { count_(0) {} virtual ~ReadCountdownRawChannelDelegate() {} - // |RawChannel::Delegate| implementation: - // Called on the I/O thread. + // |RawChannel::Delegate| implementation (called on the I/O thread): virtual void OnReadMessage(const MessageInTransit& message) OVERRIDE { EXPECT_LT(count_, expected_count_); count_++; @@ -389,6 +393,9 @@ class ReadCountdownRawChannelDelegate : public RawChannel::Delegate { if (count_ >= expected_count_) done_event_.Signal(); } + virtual void OnFatalError(FatalError /*fatal_error*/) OVERRIDE { + NOTREACHED(); + } // Wait for all the messages to have been seen. void Wait() { @@ -462,6 +469,98 @@ TEST_F(RawChannelPosixTest, WriteMessageAndOnReadMessage) { base::Unretained(writer_rc.get()))); } +// RawChannelPosixTest.OnFatalError -------------------------------------------- + +class FatalErrorRecordingRawChannelDelegate : public RawChannel::Delegate { + public: + FatalErrorRecordingRawChannelDelegate() + : got_fatal_error_event_(false, false), + on_fatal_error_call_count_(0), + last_fatal_error_(FATAL_ERROR_UNKNOWN) {} + virtual ~FatalErrorRecordingRawChannelDelegate() {} + + // |RawChannel::Delegate| implementation: + virtual void OnReadMessage(const MessageInTransit& /*message*/) OVERRIDE { + NOTREACHED(); + } + virtual void OnFatalError(FatalError fatal_error) OVERRIDE { + CHECK_EQ(on_fatal_error_call_count_, 0); + on_fatal_error_call_count_++; + last_fatal_error_ = fatal_error; + got_fatal_error_event_.Signal(); + } + + FatalError WaitForFatalError() { + got_fatal_error_event_.Wait(); + CHECK_EQ(on_fatal_error_call_count_, 1); + return last_fatal_error_; + } + + private: + base::WaitableEvent got_fatal_error_event_; + + int on_fatal_error_call_count_; + FatalError last_fatal_error_; + + DISALLOW_COPY_AND_ASSIGN(FatalErrorRecordingRawChannelDelegate); +}; + +// Tests fatal errors. +// TODO(vtl): Figure out how to make reading fail reliably. (I'm not convinced +// that it does.) +TEST_F(RawChannelPosixTest, OnFatalError) { + FatalErrorRecordingRawChannelDelegate delegate; + scoped_ptr<RawChannel> rc(RawChannel::Create(PlatformChannelHandle(fd(0)), + &delegate, + message_loop())); + // |RawChannel::Create()| takes ownership of the FD. + clear_fd(0); + + PostTaskAndWait(message_loop(), + FROM_HERE, + base::Bind(&RawChannel::Init, base::Unretained(rc.get()))); + + // Close the other end, which should make writing fail. + CHECK_EQ(close(fd(1)), 0); + clear_fd(1); + + EXPECT_FALSE(rc->WriteMessage(MakeTestMessage(1))); + + // TODO(vtl): In theory, it's conceivable that closing the other end might + // lead to read failing. In practice, it doesn't seem to. + EXPECT_EQ(RawChannel::Delegate::FATAL_ERROR_FAILED_WRITE, + delegate.WaitForFatalError()); + + PostTaskAndWait(message_loop(), + FROM_HERE, + base::Bind(&RawChannel::Shutdown, + base::Unretained(rc.get()))); + +} + +// RawChannelPosixTest.WriteMessageAfterShutdown ------------------------------- + +// Makes sure that calling |WriteMessage()| after |Shutdown()| behaves +// correctly. +TEST_F(RawChannelPosixTest, WriteMessageAfterShutdown) { + WriteOnlyRawChannelDelegate delegate; + scoped_ptr<RawChannel> rc(RawChannel::Create(PlatformChannelHandle(fd(0)), + &delegate, + message_loop())); + // |RawChannel::Create()| takes ownership of the FD. + clear_fd(0); + + PostTaskAndWait(message_loop(), + FROM_HERE, + base::Bind(&RawChannel::Init, base::Unretained(rc.get()))); + PostTaskAndWait(message_loop(), + FROM_HERE, + base::Bind(&RawChannel::Shutdown, + base::Unretained(rc.get()))); + + EXPECT_FALSE(rc->WriteMessage(MakeTestMessage(1))); +} + } // namespace } // namespace system } // namespace mojo |