summaryrefslogtreecommitdiffstats
path: root/mojo/system
diff options
context:
space:
mode:
authorviettrungluu@chromium.org <viettrungluu@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98>2013-10-15 19:53:54 +0000
committerviettrungluu@chromium.org <viettrungluu@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98>2013-10-15 19:53:54 +0000
commitda83553b1671cd51a986275947400d8cfc56883e (patch)
treeafdfbe6ebb6c28511dddfb8acec4b569ea76e610 /mojo/system
parent4b9d0863d0a116dbc747cc057dac0fd94604b8bd (diff)
downloadchromium_src-da83553b1671cd51a986275947400d8cfc56883e.zip
chromium_src-da83553b1671cd51a986275947400d8cfc56883e.tar.gz
chromium_src-da83553b1671cd51a986275947400d8cfc56883e.tar.bz2
Add handling of fatal errors to RawChannel/RawChannelPosix.
Also fix a race if |Shutdown()| were called simultaneously (on the I/O thread) with |WriteMessage()| being called (and posting |WaitToWrite()|). R=davemoore@chromium.org Review URL: https://codereview.chromium.org/27241003 git-svn-id: svn://svn.chromium.org/chrome/trunk/src@228745 0039d316-1c4b-4281-b951-d872f2087c98
Diffstat (limited to 'mojo/system')
-rw-r--r--mojo/system/raw_channel.h10
-rw-r--r--mojo/system/raw_channel_posix.cc93
-rw-r--r--mojo/system/raw_channel_posix_unittest.cc119
3 files changed, 189 insertions, 33 deletions
diff --git a/mojo/system/raw_channel.h b/mojo/system/raw_channel.h
index c56dbb5..0c19ad0 100644
--- a/mojo/system/raw_channel.h
+++ b/mojo/system/raw_channel.h
@@ -48,10 +48,20 @@ class MOJO_SYSTEM_EXPORT RawChannel {
// (passed in on creation).
class Delegate {
public:
+ enum FatalError {
+ FATAL_ERROR_UNKNOWN = 0,
+ FATAL_ERROR_FAILED_READ,
+ FATAL_ERROR_FAILED_WRITE
+ };
+
// Called when a message is read. This may call |Shutdown()| on the
// |RawChannel|, but must not destroy it.
virtual void OnReadMessage(const MessageInTransit& message) = 0;
+ // Called when there's a fatal error, which leads to the channel no longer
+ // being viable.
+ virtual void OnFatalError(FatalError fatal_error) = 0;
+
protected:
virtual ~Delegate() {}
};
diff --git a/mojo/system/raw_channel_posix.cc b/mojo/system/raw_channel_posix.cc
index d81d1bf..e218d84 100644
--- a/mojo/system/raw_channel_posix.cc
+++ b/mojo/system/raw_channel_posix.cc
@@ -18,6 +18,7 @@
#include "base/location.h"
#include "base/logging.h"
#include "base/memory/scoped_ptr.h"
+#include "base/memory/weak_ptr.h"
#include "base/message_loop/message_loop.h"
#include "base/posix/eintr_wrapper.h"
#include "base/synchronization/lock.h"
@@ -50,6 +51,10 @@ class RawChannelPosix : public RawChannel,
// Watches for |fd_| to become writable. Must be called on the I/O thread.
void WaitToWrite();
+ // Calls |delegate()->OnFatalError(fatal_error)|. Must be called on the I/O
+ // thread WITHOUT |write_lock_| held.
+ void CallOnFatalError(Delegate::FatalError fatal_error);
+
// Writes the message at the front of |write_message_queue_|, starting at
// |write_message_offset_|. It removes and destroys if the write completes and
// otherwise updates |write_message_offset_|. Returns true on success. Must be
@@ -57,15 +62,15 @@ class RawChannelPosix : public RawChannel,
bool WriteFrontMessageNoLock();
// Cancels all pending writes and destroys the contents of
- // |write_message_queue_|. Should only be called if |is_writable_| is true;
- // sets |is_writable_| to false. Must be called under |write_lock_|.
+ // |write_message_queue_|. Should only be called if |is_dead_| is false; sets
+ // |is_dead_| to true. Must be called under |write_lock_|.
void CancelPendingWritesNoLock();
base::MessageLoopForIO* message_loop_for_io() {
return static_cast<base::MessageLoopForIO*>(message_loop());
}
- const int fd_;
+ int fd_;
// Only used on the I/O thread:
scoped_ptr<base::MessageLoopForIO::FileDescriptorWatcher> read_watcher_;
@@ -79,9 +84,13 @@ class RawChannelPosix : public RawChannel,
size_t read_buffer_num_valid_bytes_;
base::Lock write_lock_; // Protects the following members.
- bool is_writable_;
+ bool is_dead_;
std::deque<MessageInTransit*> write_message_queue_;
size_t write_message_offset_;
+ // This is used for posting tasks from write threads to the I/O thread. It
+ // must only be accessed under |write_lock_|. The weak pointers it produces
+ // are only used/invalidated on the I/O thread.
+ base::WeakPtrFactory<RawChannelPosix> weak_ptr_factory_;
DISALLOW_COPY_AND_ASSIGN(RawChannelPosix);
};
@@ -92,13 +101,21 @@ RawChannelPosix::RawChannelPosix(const PlatformChannelHandle& handle,
: RawChannel(delegate, message_loop),
fd_(handle.fd),
read_buffer_num_valid_bytes_(0),
- is_writable_(true),
- write_message_offset_(0) {
+ is_dead_(false),
+ write_message_offset_(0),
+ weak_ptr_factory_(this) {
CHECK_EQ(RawChannel::message_loop()->type(), base::MessageLoop::TYPE_IO);
+ DCHECK_NE(fd_, -1);
}
RawChannelPosix::~RawChannelPosix() {
- close(fd_);
+ DCHECK(is_dead_);
+ DCHECK_EQ(fd_, -1);
+
+ // No need to take the |write_lock_| here -- if there are still weak pointers
+ // outstanding, then we're hosed anyway (since we wouldn't be able to
+ // invalidate them cleanly, since we might not be on the I/O thread).
+ DCHECK(!weak_ptr_factory_.HasWeakPtrs());
// These must have been shut down/destroyed on the I/O thread.
DCHECK(!read_watcher_.get());
@@ -125,9 +142,16 @@ void RawChannelPosix::Shutdown() {
DCHECK_EQ(base::MessageLoop::current(), message_loop());
base::AutoLock locker(write_lock_);
- if (is_writable_)
+ if (!is_dead_)
CancelPendingWritesNoLock();
+ DCHECK_NE(fd_, -1);
+ if (close(fd_) != 0)
+ PLOG(ERROR) << "close";
+ fd_ = -1;
+
+ weak_ptr_factory_.InvalidateWeakPtrs();
+
read_watcher_.reset(); // This will stop watching (if necessary).
write_watcher_.reset(); // This will stop watching (if necessary).
}
@@ -136,7 +160,7 @@ void RawChannelPosix::Shutdown() {
// success.
bool RawChannelPosix::WriteMessage(MessageInTransit* message) {
base::AutoLock locker(write_lock_);
- if (!is_writable_) {
+ if (is_dead_) {
message->Destroy();
return false;
}
@@ -151,7 +175,14 @@ bool RawChannelPosix::WriteMessage(MessageInTransit* message) {
bool result = WriteFrontMessageNoLock();
DCHECK(result || write_message_queue_.empty());
- if (!write_message_queue_.empty()) {
+ if (!result) {
+ // Even if we're on the I/O thread, don't call |OnFatalError()| in the
+ // nested context.
+ message_loop()->PostTask(FROM_HERE,
+ base::Bind(&RawChannelPosix::CallOnFatalError,
+ weak_ptr_factory_.GetWeakPtr(),
+ Delegate::FATAL_ERROR_FAILED_WRITE));
+ } else if (!write_message_queue_.empty()) {
// Set up to wait for the FD to become writable. If we're not on the I/O
// thread, we have to post a task to do this.
if (base::MessageLoop::current() == message_loop()) {
@@ -159,7 +190,7 @@ bool RawChannelPosix::WriteMessage(MessageInTransit* message) {
} else {
message_loop()->PostTask(FROM_HERE,
base::Bind(&RawChannelPosix::WaitToWrite,
- base::Unretained(this)));
+ weak_ptr_factory_.GetWeakPtr()));
}
}
@@ -196,10 +227,13 @@ void RawChannelPosix::OnFileCanReadWithoutBlocking(int fd) {
&read_buffer_[read_buffer_start + read_buffer_num_valid_bytes_],
kReadSize));
if (bytes_read < 0) {
- base::AutoLock locker(write_lock_);
if (errno != EAGAIN && errno != EWOULDBLOCK) {
PLOG(ERROR) << "read";
- CancelPendingWritesNoLock();
+ {
+ base::AutoLock locker(write_lock_);
+ CancelPendingWritesNoLock();
+ }
+ CallOnFatalError(Delegate::FATAL_ERROR_FAILED_READ);
return;
}
@@ -262,15 +296,22 @@ void RawChannelPosix::OnFileCanWriteWithoutBlocking(int fd) {
DCHECK_EQ(fd, fd_);
DCHECK_EQ(base::MessageLoop::current(), message_loop());
- base::AutoLock locker(write_lock_);
- DCHECK(is_writable_);
- DCHECK(!write_message_queue_.empty());
+ bool did_fail = false;
+ {
+ base::AutoLock locker(write_lock_);
+ DCHECK(!is_dead_);
+ DCHECK(!write_message_queue_.empty());
- bool result = WriteFrontMessageNoLock();
- DCHECK(result || write_message_queue_.empty());
+ bool result = WriteFrontMessageNoLock();
+ DCHECK(result || write_message_queue_.empty());
- if (!write_message_queue_.empty())
- WaitToWrite();
+ if (!result)
+ did_fail = true;
+ else if (!write_message_queue_.empty())
+ WaitToWrite();
+ }
+ if (did_fail)
+ CallOnFatalError(Delegate::FATAL_ERROR_FAILED_WRITE);
}
void RawChannelPosix::WaitToWrite() {
@@ -283,10 +324,16 @@ void RawChannelPosix::WaitToWrite() {
DCHECK(result);
}
+void RawChannelPosix::CallOnFatalError(Delegate::FatalError fatal_error) {
+ DCHECK_EQ(base::MessageLoop::current(), message_loop());
+ // TODO(vtl): Add a "write_lock_.AssertNotAcquired()"?
+ delegate()->OnFatalError(fatal_error);
+}
+
bool RawChannelPosix::WriteFrontMessageNoLock() {
write_lock_.AssertAcquired();
- DCHECK(is_writable_);
+ DCHECK(!is_dead_);
DCHECK(!write_message_queue_.empty());
MessageInTransit* message = write_message_queue_.front();
@@ -326,9 +373,9 @@ bool RawChannelPosix::WriteFrontMessageNoLock() {
void RawChannelPosix::CancelPendingWritesNoLock() {
write_lock_.AssertAcquired();
- DCHECK(is_writable_);
+ DCHECK(!is_dead_);
- is_writable_ = false;
+ is_dead_ = true;
for (std::deque<MessageInTransit*>::iterator it =
write_message_queue_.begin(); it != write_message_queue_.end();
++it) {
diff --git a/mojo/system/raw_channel_posix_unittest.cc b/mojo/system/raw_channel_posix_unittest.cc
index 8599996..94d9943 100644
--- a/mojo/system/raw_channel_posix_unittest.cc
+++ b/mojo/system/raw_channel_posix_unittest.cc
@@ -99,9 +99,9 @@ class RawChannelPosixTest : public testing::Test {
virtual void TearDown() OVERRIDE {
if (fds_[0] != -1)
- close(fds_[0]);
+ CHECK_EQ(close(fds_[0]), 0);
if (fds_[1] != -1)
- close(fds_[1]);
+ CHECK_EQ(close(fds_[1]), 0);
io_thread_.Stop();
}
@@ -130,6 +130,9 @@ class WriteOnlyRawChannelDelegate : public RawChannel::Delegate {
virtual void OnReadMessage(const MessageInTransit& /*message*/) OVERRIDE {
NOTREACHED();
}
+ virtual void OnFatalError(FatalError /*fatal_error*/) OVERRIDE {
+ NOTREACHED();
+ }
private:
DISALLOW_COPY_AND_ASSIGN(WriteOnlyRawChannelDelegate);
@@ -221,13 +224,13 @@ TEST_F(RawChannelPosixTest, WriteMessage) {
// Write and read, for a variety of sizes.
for (uint32_t size = 1; size < 5 * 1000 * 1000; size += size / 2 + 1) {
- rc->WriteMessage(MakeTestMessage(size));
+ EXPECT_TRUE(rc->WriteMessage(MakeTestMessage(size)));
EXPECT_TRUE(checker.ReadAndCheckNextMessage(size)) << size;
}
// Write/queue and read afterwards, for a variety of sizes.
for (uint32_t size = 1; size < 5 * 1000 * 1000; size += size / 2 + 1)
- rc->WriteMessage(MakeTestMessage(size));
+ EXPECT_TRUE(rc->WriteMessage(MakeTestMessage(size)));
for (uint32_t size = 1; size < 5 * 1000 * 1000; size += size / 2 + 1)
EXPECT_TRUE(checker.ReadAndCheckNextMessage(size)) << size;
@@ -246,8 +249,7 @@ class ReadCheckerRawChannelDelegate : public RawChannel::Delegate {
position_(0) {}
virtual ~ReadCheckerRawChannelDelegate() {}
- // |RawChannel::Delegate| implementation:
- // Called on the I/O thread.
+ // |RawChannel::Delegate| implementation (called on the I/O thread):
virtual void OnReadMessage(const MessageInTransit& message) OVERRIDE {
size_t position;
size_t expected_size;
@@ -271,6 +273,9 @@ class ReadCheckerRawChannelDelegate : public RawChannel::Delegate {
if (should_signal)
done_event_.Signal();
}
+ virtual void OnFatalError(FatalError /*fatal_error*/) OVERRIDE {
+ NOTREACHED();
+ }
// Wait for all the messages (of sizes |expected_sizes_|) to be seen.
void Wait() {
@@ -359,8 +364,8 @@ class RawChannelWriterThread : public base::SimpleThread {
static const int kMaxRandomMessageSize = 25000;
while (left_to_write_-- > 0) {
- raw_channel_->WriteMessage(MakeTestMessage(
- static_cast<uint32_t>(base::RandInt(1, kMaxRandomMessageSize))));
+ EXPECT_TRUE(raw_channel_->WriteMessage(MakeTestMessage(
+ static_cast<uint32_t>(base::RandInt(1, kMaxRandomMessageSize)))));
}
}
@@ -378,8 +383,7 @@ class ReadCountdownRawChannelDelegate : public RawChannel::Delegate {
count_(0) {}
virtual ~ReadCountdownRawChannelDelegate() {}
- // |RawChannel::Delegate| implementation:
- // Called on the I/O thread.
+ // |RawChannel::Delegate| implementation (called on the I/O thread):
virtual void OnReadMessage(const MessageInTransit& message) OVERRIDE {
EXPECT_LT(count_, expected_count_);
count_++;
@@ -389,6 +393,9 @@ class ReadCountdownRawChannelDelegate : public RawChannel::Delegate {
if (count_ >= expected_count_)
done_event_.Signal();
}
+ virtual void OnFatalError(FatalError /*fatal_error*/) OVERRIDE {
+ NOTREACHED();
+ }
// Wait for all the messages to have been seen.
void Wait() {
@@ -462,6 +469,98 @@ TEST_F(RawChannelPosixTest, WriteMessageAndOnReadMessage) {
base::Unretained(writer_rc.get())));
}
+// RawChannelPosixTest.OnFatalError --------------------------------------------
+
+class FatalErrorRecordingRawChannelDelegate : public RawChannel::Delegate {
+ public:
+ FatalErrorRecordingRawChannelDelegate()
+ : got_fatal_error_event_(false, false),
+ on_fatal_error_call_count_(0),
+ last_fatal_error_(FATAL_ERROR_UNKNOWN) {}
+ virtual ~FatalErrorRecordingRawChannelDelegate() {}
+
+ // |RawChannel::Delegate| implementation:
+ virtual void OnReadMessage(const MessageInTransit& /*message*/) OVERRIDE {
+ NOTREACHED();
+ }
+ virtual void OnFatalError(FatalError fatal_error) OVERRIDE {
+ CHECK_EQ(on_fatal_error_call_count_, 0);
+ on_fatal_error_call_count_++;
+ last_fatal_error_ = fatal_error;
+ got_fatal_error_event_.Signal();
+ }
+
+ FatalError WaitForFatalError() {
+ got_fatal_error_event_.Wait();
+ CHECK_EQ(on_fatal_error_call_count_, 1);
+ return last_fatal_error_;
+ }
+
+ private:
+ base::WaitableEvent got_fatal_error_event_;
+
+ int on_fatal_error_call_count_;
+ FatalError last_fatal_error_;
+
+ DISALLOW_COPY_AND_ASSIGN(FatalErrorRecordingRawChannelDelegate);
+};
+
+// Tests fatal errors.
+// TODO(vtl): Figure out how to make reading fail reliably. (I'm not convinced
+// that it does.)
+TEST_F(RawChannelPosixTest, OnFatalError) {
+ FatalErrorRecordingRawChannelDelegate delegate;
+ scoped_ptr<RawChannel> rc(RawChannel::Create(PlatformChannelHandle(fd(0)),
+ &delegate,
+ message_loop()));
+ // |RawChannel::Create()| takes ownership of the FD.
+ clear_fd(0);
+
+ PostTaskAndWait(message_loop(),
+ FROM_HERE,
+ base::Bind(&RawChannel::Init, base::Unretained(rc.get())));
+
+ // Close the other end, which should make writing fail.
+ CHECK_EQ(close(fd(1)), 0);
+ clear_fd(1);
+
+ EXPECT_FALSE(rc->WriteMessage(MakeTestMessage(1)));
+
+ // TODO(vtl): In theory, it's conceivable that closing the other end might
+ // lead to read failing. In practice, it doesn't seem to.
+ EXPECT_EQ(RawChannel::Delegate::FATAL_ERROR_FAILED_WRITE,
+ delegate.WaitForFatalError());
+
+ PostTaskAndWait(message_loop(),
+ FROM_HERE,
+ base::Bind(&RawChannel::Shutdown,
+ base::Unretained(rc.get())));
+
+}
+
+// RawChannelPosixTest.WriteMessageAfterShutdown -------------------------------
+
+// Makes sure that calling |WriteMessage()| after |Shutdown()| behaves
+// correctly.
+TEST_F(RawChannelPosixTest, WriteMessageAfterShutdown) {
+ WriteOnlyRawChannelDelegate delegate;
+ scoped_ptr<RawChannel> rc(RawChannel::Create(PlatformChannelHandle(fd(0)),
+ &delegate,
+ message_loop()));
+ // |RawChannel::Create()| takes ownership of the FD.
+ clear_fd(0);
+
+ PostTaskAndWait(message_loop(),
+ FROM_HERE,
+ base::Bind(&RawChannel::Init, base::Unretained(rc.get())));
+ PostTaskAndWait(message_loop(),
+ FROM_HERE,
+ base::Bind(&RawChannel::Shutdown,
+ base::Unretained(rc.get())));
+
+ EXPECT_FALSE(rc->WriteMessage(MakeTestMessage(1)));
+}
+
} // namespace
} // namespace system
} // namespace mojo