summaryrefslogtreecommitdiffstats
path: root/mojo/system/raw_channel_posix.cc
diff options
context:
space:
mode:
authorviettrungluu@chromium.org <viettrungluu@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98>2013-10-15 19:53:54 +0000
committerviettrungluu@chromium.org <viettrungluu@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98>2013-10-15 19:53:54 +0000
commitda83553b1671cd51a986275947400d8cfc56883e (patch)
treeafdfbe6ebb6c28511dddfb8acec4b569ea76e610 /mojo/system/raw_channel_posix.cc
parent4b9d0863d0a116dbc747cc057dac0fd94604b8bd (diff)
downloadchromium_src-da83553b1671cd51a986275947400d8cfc56883e.zip
chromium_src-da83553b1671cd51a986275947400d8cfc56883e.tar.gz
chromium_src-da83553b1671cd51a986275947400d8cfc56883e.tar.bz2
Add handling of fatal errors to RawChannel/RawChannelPosix.
Also fix a race if |Shutdown()| were called simultaneously (on the I/O thread) with |WriteMessage()| being called (and posting |WaitToWrite()|). R=davemoore@chromium.org Review URL: https://codereview.chromium.org/27241003 git-svn-id: svn://svn.chromium.org/chrome/trunk/src@228745 0039d316-1c4b-4281-b951-d872f2087c98
Diffstat (limited to 'mojo/system/raw_channel_posix.cc')
-rw-r--r--mojo/system/raw_channel_posix.cc93
1 files changed, 70 insertions, 23 deletions
diff --git a/mojo/system/raw_channel_posix.cc b/mojo/system/raw_channel_posix.cc
index d81d1bf..e218d84 100644
--- a/mojo/system/raw_channel_posix.cc
+++ b/mojo/system/raw_channel_posix.cc
@@ -18,6 +18,7 @@
#include "base/location.h"
#include "base/logging.h"
#include "base/memory/scoped_ptr.h"
+#include "base/memory/weak_ptr.h"
#include "base/message_loop/message_loop.h"
#include "base/posix/eintr_wrapper.h"
#include "base/synchronization/lock.h"
@@ -50,6 +51,10 @@ class RawChannelPosix : public RawChannel,
// Watches for |fd_| to become writable. Must be called on the I/O thread.
void WaitToWrite();
+ // Calls |delegate()->OnFatalError(fatal_error)|. Must be called on the I/O
+ // thread WITHOUT |write_lock_| held.
+ void CallOnFatalError(Delegate::FatalError fatal_error);
+
// Writes the message at the front of |write_message_queue_|, starting at
// |write_message_offset_|. It removes and destroys if the write completes and
// otherwise updates |write_message_offset_|. Returns true on success. Must be
@@ -57,15 +62,15 @@ class RawChannelPosix : public RawChannel,
bool WriteFrontMessageNoLock();
// Cancels all pending writes and destroys the contents of
- // |write_message_queue_|. Should only be called if |is_writable_| is true;
- // sets |is_writable_| to false. Must be called under |write_lock_|.
+ // |write_message_queue_|. Should only be called if |is_dead_| is false; sets
+ // |is_dead_| to true. Must be called under |write_lock_|.
void CancelPendingWritesNoLock();
base::MessageLoopForIO* message_loop_for_io() {
return static_cast<base::MessageLoopForIO*>(message_loop());
}
- const int fd_;
+ int fd_;
// Only used on the I/O thread:
scoped_ptr<base::MessageLoopForIO::FileDescriptorWatcher> read_watcher_;
@@ -79,9 +84,13 @@ class RawChannelPosix : public RawChannel,
size_t read_buffer_num_valid_bytes_;
base::Lock write_lock_; // Protects the following members.
- bool is_writable_;
+ bool is_dead_;
std::deque<MessageInTransit*> write_message_queue_;
size_t write_message_offset_;
+ // This is used for posting tasks from write threads to the I/O thread. It
+ // must only be accessed under |write_lock_|. The weak pointers it produces
+ // are only used/invalidated on the I/O thread.
+ base::WeakPtrFactory<RawChannelPosix> weak_ptr_factory_;
DISALLOW_COPY_AND_ASSIGN(RawChannelPosix);
};
@@ -92,13 +101,21 @@ RawChannelPosix::RawChannelPosix(const PlatformChannelHandle& handle,
: RawChannel(delegate, message_loop),
fd_(handle.fd),
read_buffer_num_valid_bytes_(0),
- is_writable_(true),
- write_message_offset_(0) {
+ is_dead_(false),
+ write_message_offset_(0),
+ weak_ptr_factory_(this) {
CHECK_EQ(RawChannel::message_loop()->type(), base::MessageLoop::TYPE_IO);
+ DCHECK_NE(fd_, -1);
}
RawChannelPosix::~RawChannelPosix() {
- close(fd_);
+ DCHECK(is_dead_);
+ DCHECK_EQ(fd_, -1);
+
+ // No need to take the |write_lock_| here -- if there are still weak pointers
+ // outstanding, then we're hosed anyway (since we wouldn't be able to
+ // invalidate them cleanly, since we might not be on the I/O thread).
+ DCHECK(!weak_ptr_factory_.HasWeakPtrs());
// These must have been shut down/destroyed on the I/O thread.
DCHECK(!read_watcher_.get());
@@ -125,9 +142,16 @@ void RawChannelPosix::Shutdown() {
DCHECK_EQ(base::MessageLoop::current(), message_loop());
base::AutoLock locker(write_lock_);
- if (is_writable_)
+ if (!is_dead_)
CancelPendingWritesNoLock();
+ DCHECK_NE(fd_, -1);
+ if (close(fd_) != 0)
+ PLOG(ERROR) << "close";
+ fd_ = -1;
+
+ weak_ptr_factory_.InvalidateWeakPtrs();
+
read_watcher_.reset(); // This will stop watching (if necessary).
write_watcher_.reset(); // This will stop watching (if necessary).
}
@@ -136,7 +160,7 @@ void RawChannelPosix::Shutdown() {
// success.
bool RawChannelPosix::WriteMessage(MessageInTransit* message) {
base::AutoLock locker(write_lock_);
- if (!is_writable_) {
+ if (is_dead_) {
message->Destroy();
return false;
}
@@ -151,7 +175,14 @@ bool RawChannelPosix::WriteMessage(MessageInTransit* message) {
bool result = WriteFrontMessageNoLock();
DCHECK(result || write_message_queue_.empty());
- if (!write_message_queue_.empty()) {
+ if (!result) {
+ // Even if we're on the I/O thread, don't call |OnFatalError()| in the
+ // nested context.
+ message_loop()->PostTask(FROM_HERE,
+ base::Bind(&RawChannelPosix::CallOnFatalError,
+ weak_ptr_factory_.GetWeakPtr(),
+ Delegate::FATAL_ERROR_FAILED_WRITE));
+ } else if (!write_message_queue_.empty()) {
// Set up to wait for the FD to become writable. If we're not on the I/O
// thread, we have to post a task to do this.
if (base::MessageLoop::current() == message_loop()) {
@@ -159,7 +190,7 @@ bool RawChannelPosix::WriteMessage(MessageInTransit* message) {
} else {
message_loop()->PostTask(FROM_HERE,
base::Bind(&RawChannelPosix::WaitToWrite,
- base::Unretained(this)));
+ weak_ptr_factory_.GetWeakPtr()));
}
}
@@ -196,10 +227,13 @@ void RawChannelPosix::OnFileCanReadWithoutBlocking(int fd) {
&read_buffer_[read_buffer_start + read_buffer_num_valid_bytes_],
kReadSize));
if (bytes_read < 0) {
- base::AutoLock locker(write_lock_);
if (errno != EAGAIN && errno != EWOULDBLOCK) {
PLOG(ERROR) << "read";
- CancelPendingWritesNoLock();
+ {
+ base::AutoLock locker(write_lock_);
+ CancelPendingWritesNoLock();
+ }
+ CallOnFatalError(Delegate::FATAL_ERROR_FAILED_READ);
return;
}
@@ -262,15 +296,22 @@ void RawChannelPosix::OnFileCanWriteWithoutBlocking(int fd) {
DCHECK_EQ(fd, fd_);
DCHECK_EQ(base::MessageLoop::current(), message_loop());
- base::AutoLock locker(write_lock_);
- DCHECK(is_writable_);
- DCHECK(!write_message_queue_.empty());
+ bool did_fail = false;
+ {
+ base::AutoLock locker(write_lock_);
+ DCHECK(!is_dead_);
+ DCHECK(!write_message_queue_.empty());
- bool result = WriteFrontMessageNoLock();
- DCHECK(result || write_message_queue_.empty());
+ bool result = WriteFrontMessageNoLock();
+ DCHECK(result || write_message_queue_.empty());
- if (!write_message_queue_.empty())
- WaitToWrite();
+ if (!result)
+ did_fail = true;
+ else if (!write_message_queue_.empty())
+ WaitToWrite();
+ }
+ if (did_fail)
+ CallOnFatalError(Delegate::FATAL_ERROR_FAILED_WRITE);
}
void RawChannelPosix::WaitToWrite() {
@@ -283,10 +324,16 @@ void RawChannelPosix::WaitToWrite() {
DCHECK(result);
}
+void RawChannelPosix::CallOnFatalError(Delegate::FatalError fatal_error) {
+ DCHECK_EQ(base::MessageLoop::current(), message_loop());
+ // TODO(vtl): Add a "write_lock_.AssertNotAcquired()"?
+ delegate()->OnFatalError(fatal_error);
+}
+
bool RawChannelPosix::WriteFrontMessageNoLock() {
write_lock_.AssertAcquired();
- DCHECK(is_writable_);
+ DCHECK(!is_dead_);
DCHECK(!write_message_queue_.empty());
MessageInTransit* message = write_message_queue_.front();
@@ -326,9 +373,9 @@ bool RawChannelPosix::WriteFrontMessageNoLock() {
void RawChannelPosix::CancelPendingWritesNoLock() {
write_lock_.AssertAcquired();
- DCHECK(is_writable_);
+ DCHECK(!is_dead_);
- is_writable_ = false;
+ is_dead_ = true;
for (std::deque<MessageInTransit*>::iterator it =
write_message_queue_.begin(); it != write_message_queue_.end();
++it) {