diff options
author | viettrungluu@chromium.org <viettrungluu@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98> | 2013-10-15 19:53:54 +0000 |
---|---|---|
committer | viettrungluu@chromium.org <viettrungluu@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98> | 2013-10-15 19:53:54 +0000 |
commit | da83553b1671cd51a986275947400d8cfc56883e (patch) | |
tree | afdfbe6ebb6c28511dddfb8acec4b569ea76e610 /mojo/system/raw_channel_posix.cc | |
parent | 4b9d0863d0a116dbc747cc057dac0fd94604b8bd (diff) | |
download | chromium_src-da83553b1671cd51a986275947400d8cfc56883e.zip chromium_src-da83553b1671cd51a986275947400d8cfc56883e.tar.gz chromium_src-da83553b1671cd51a986275947400d8cfc56883e.tar.bz2 |
Add handling of fatal errors to RawChannel/RawChannelPosix.
Also fix a race if |Shutdown()| were called simultaneously (on the I/O thread)
with |WriteMessage()| being called (and posting |WaitToWrite()|).
R=davemoore@chromium.org
Review URL: https://codereview.chromium.org/27241003
git-svn-id: svn://svn.chromium.org/chrome/trunk/src@228745 0039d316-1c4b-4281-b951-d872f2087c98
Diffstat (limited to 'mojo/system/raw_channel_posix.cc')
-rw-r--r-- | mojo/system/raw_channel_posix.cc | 93 |
1 files changed, 70 insertions, 23 deletions
diff --git a/mojo/system/raw_channel_posix.cc b/mojo/system/raw_channel_posix.cc index d81d1bf..e218d84 100644 --- a/mojo/system/raw_channel_posix.cc +++ b/mojo/system/raw_channel_posix.cc @@ -18,6 +18,7 @@ #include "base/location.h" #include "base/logging.h" #include "base/memory/scoped_ptr.h" +#include "base/memory/weak_ptr.h" #include "base/message_loop/message_loop.h" #include "base/posix/eintr_wrapper.h" #include "base/synchronization/lock.h" @@ -50,6 +51,10 @@ class RawChannelPosix : public RawChannel, // Watches for |fd_| to become writable. Must be called on the I/O thread. void WaitToWrite(); + // Calls |delegate()->OnFatalError(fatal_error)|. Must be called on the I/O + // thread WITHOUT |write_lock_| held. + void CallOnFatalError(Delegate::FatalError fatal_error); + // Writes the message at the front of |write_message_queue_|, starting at // |write_message_offset_|. It removes and destroys if the write completes and // otherwise updates |write_message_offset_|. Returns true on success. Must be @@ -57,15 +62,15 @@ class RawChannelPosix : public RawChannel, bool WriteFrontMessageNoLock(); // Cancels all pending writes and destroys the contents of - // |write_message_queue_|. Should only be called if |is_writable_| is true; - // sets |is_writable_| to false. Must be called under |write_lock_|. + // |write_message_queue_|. Should only be called if |is_dead_| is false; sets + // |is_dead_| to true. Must be called under |write_lock_|. void CancelPendingWritesNoLock(); base::MessageLoopForIO* message_loop_for_io() { return static_cast<base::MessageLoopForIO*>(message_loop()); } - const int fd_; + int fd_; // Only used on the I/O thread: scoped_ptr<base::MessageLoopForIO::FileDescriptorWatcher> read_watcher_; @@ -79,9 +84,13 @@ class RawChannelPosix : public RawChannel, size_t read_buffer_num_valid_bytes_; base::Lock write_lock_; // Protects the following members. - bool is_writable_; + bool is_dead_; std::deque<MessageInTransit*> write_message_queue_; size_t write_message_offset_; + // This is used for posting tasks from write threads to the I/O thread. It + // must only be accessed under |write_lock_|. The weak pointers it produces + // are only used/invalidated on the I/O thread. + base::WeakPtrFactory<RawChannelPosix> weak_ptr_factory_; DISALLOW_COPY_AND_ASSIGN(RawChannelPosix); }; @@ -92,13 +101,21 @@ RawChannelPosix::RawChannelPosix(const PlatformChannelHandle& handle, : RawChannel(delegate, message_loop), fd_(handle.fd), read_buffer_num_valid_bytes_(0), - is_writable_(true), - write_message_offset_(0) { + is_dead_(false), + write_message_offset_(0), + weak_ptr_factory_(this) { CHECK_EQ(RawChannel::message_loop()->type(), base::MessageLoop::TYPE_IO); + DCHECK_NE(fd_, -1); } RawChannelPosix::~RawChannelPosix() { - close(fd_); + DCHECK(is_dead_); + DCHECK_EQ(fd_, -1); + + // No need to take the |write_lock_| here -- if there are still weak pointers + // outstanding, then we're hosed anyway (since we wouldn't be able to + // invalidate them cleanly, since we might not be on the I/O thread). + DCHECK(!weak_ptr_factory_.HasWeakPtrs()); // These must have been shut down/destroyed on the I/O thread. DCHECK(!read_watcher_.get()); @@ -125,9 +142,16 @@ void RawChannelPosix::Shutdown() { DCHECK_EQ(base::MessageLoop::current(), message_loop()); base::AutoLock locker(write_lock_); - if (is_writable_) + if (!is_dead_) CancelPendingWritesNoLock(); + DCHECK_NE(fd_, -1); + if (close(fd_) != 0) + PLOG(ERROR) << "close"; + fd_ = -1; + + weak_ptr_factory_.InvalidateWeakPtrs(); + read_watcher_.reset(); // This will stop watching (if necessary). write_watcher_.reset(); // This will stop watching (if necessary). } @@ -136,7 +160,7 @@ void RawChannelPosix::Shutdown() { // success. bool RawChannelPosix::WriteMessage(MessageInTransit* message) { base::AutoLock locker(write_lock_); - if (!is_writable_) { + if (is_dead_) { message->Destroy(); return false; } @@ -151,7 +175,14 @@ bool RawChannelPosix::WriteMessage(MessageInTransit* message) { bool result = WriteFrontMessageNoLock(); DCHECK(result || write_message_queue_.empty()); - if (!write_message_queue_.empty()) { + if (!result) { + // Even if we're on the I/O thread, don't call |OnFatalError()| in the + // nested context. + message_loop()->PostTask(FROM_HERE, + base::Bind(&RawChannelPosix::CallOnFatalError, + weak_ptr_factory_.GetWeakPtr(), + Delegate::FATAL_ERROR_FAILED_WRITE)); + } else if (!write_message_queue_.empty()) { // Set up to wait for the FD to become writable. If we're not on the I/O // thread, we have to post a task to do this. if (base::MessageLoop::current() == message_loop()) { @@ -159,7 +190,7 @@ bool RawChannelPosix::WriteMessage(MessageInTransit* message) { } else { message_loop()->PostTask(FROM_HERE, base::Bind(&RawChannelPosix::WaitToWrite, - base::Unretained(this))); + weak_ptr_factory_.GetWeakPtr())); } } @@ -196,10 +227,13 @@ void RawChannelPosix::OnFileCanReadWithoutBlocking(int fd) { &read_buffer_[read_buffer_start + read_buffer_num_valid_bytes_], kReadSize)); if (bytes_read < 0) { - base::AutoLock locker(write_lock_); if (errno != EAGAIN && errno != EWOULDBLOCK) { PLOG(ERROR) << "read"; - CancelPendingWritesNoLock(); + { + base::AutoLock locker(write_lock_); + CancelPendingWritesNoLock(); + } + CallOnFatalError(Delegate::FATAL_ERROR_FAILED_READ); return; } @@ -262,15 +296,22 @@ void RawChannelPosix::OnFileCanWriteWithoutBlocking(int fd) { DCHECK_EQ(fd, fd_); DCHECK_EQ(base::MessageLoop::current(), message_loop()); - base::AutoLock locker(write_lock_); - DCHECK(is_writable_); - DCHECK(!write_message_queue_.empty()); + bool did_fail = false; + { + base::AutoLock locker(write_lock_); + DCHECK(!is_dead_); + DCHECK(!write_message_queue_.empty()); - bool result = WriteFrontMessageNoLock(); - DCHECK(result || write_message_queue_.empty()); + bool result = WriteFrontMessageNoLock(); + DCHECK(result || write_message_queue_.empty()); - if (!write_message_queue_.empty()) - WaitToWrite(); + if (!result) + did_fail = true; + else if (!write_message_queue_.empty()) + WaitToWrite(); + } + if (did_fail) + CallOnFatalError(Delegate::FATAL_ERROR_FAILED_WRITE); } void RawChannelPosix::WaitToWrite() { @@ -283,10 +324,16 @@ void RawChannelPosix::WaitToWrite() { DCHECK(result); } +void RawChannelPosix::CallOnFatalError(Delegate::FatalError fatal_error) { + DCHECK_EQ(base::MessageLoop::current(), message_loop()); + // TODO(vtl): Add a "write_lock_.AssertNotAcquired()"? + delegate()->OnFatalError(fatal_error); +} + bool RawChannelPosix::WriteFrontMessageNoLock() { write_lock_.AssertAcquired(); - DCHECK(is_writable_); + DCHECK(!is_dead_); DCHECK(!write_message_queue_.empty()); MessageInTransit* message = write_message_queue_.front(); @@ -326,9 +373,9 @@ bool RawChannelPosix::WriteFrontMessageNoLock() { void RawChannelPosix::CancelPendingWritesNoLock() { write_lock_.AssertAcquired(); - DCHECK(is_writable_); + DCHECK(!is_dead_); - is_writable_ = false; + is_dead_ = true; for (std::deque<MessageInTransit*>::iterator it = write_message_queue_.begin(); it != write_message_queue_.end(); ++it) { |