// Copyright 2013 The Chromium Authors. All rights reserved. // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. // NOTE(vtl): Some of these tests are inherently flaky (e.g., if run on a // heavily-loaded system). Sorry. |test::EpsilonTimeout()| may be increased to // increase tolerance and reduce observed flakiness (though doing so reduces the // meaningfulness of the test). #include "mojo/system/message_pipe_dispatcher.h" #include <string.h> #include <limits> #include "base/memory/ref_counted.h" #include "base/memory/scoped_vector.h" #include "base/rand_util.h" #include "base/threading/platform_thread.h" // For |Sleep()|. #include "base/threading/simple_thread.h" #include "base/time/time.h" #include "mojo/system/message_pipe.h" #include "mojo/system/test_utils.h" #include "mojo/system/waiter.h" #include "mojo/system/waiter_test_utils.h" #include "testing/gtest/include/gtest/gtest.h" namespace mojo { namespace system { namespace { TEST(MessagePipeDispatcherTest, Basic) { test::Stopwatch stopwatch; int32_t buffer[1]; const uint32_t kBufferSize = static_cast<uint32_t>(sizeof(buffer)); uint32_t buffer_size; // Run this test both with |d0| as port 0, |d1| as port 1 and vice versa. for (unsigned i = 0; i < 2; i++) { scoped_refptr<MessagePipeDispatcher> d0(new MessagePipeDispatcher( MessagePipeDispatcher::kDefaultCreateOptions)); EXPECT_EQ(Dispatcher::kTypeMessagePipe, d0->GetType()); scoped_refptr<MessagePipeDispatcher> d1(new MessagePipeDispatcher( MessagePipeDispatcher::kDefaultCreateOptions)); { scoped_refptr<MessagePipe> mp(new MessagePipe()); d0->Init(mp, i); // 0, 1. d1->Init(mp, i ^ 1); // 1, 0. } Waiter w; uint32_t context = 0; HandleSignalsState hss; // Try adding a writable waiter when already writable. w.Init(); hss = HandleSignalsState(); EXPECT_EQ(MOJO_RESULT_ALREADY_EXISTS, d0->AddWaiter(&w, MOJO_HANDLE_SIGNAL_WRITABLE, 0, &hss)); EXPECT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE, hss.satisfied_signals); EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE, hss.satisfiable_signals); // Shouldn't need to remove the waiter (it was not added). // Add a readable waiter to |d0|, then make it readable (by writing to // |d1|), then wait. w.Init(); ASSERT_EQ(MOJO_RESULT_OK, d0->AddWaiter(&w, MOJO_HANDLE_SIGNAL_READABLE, 1, NULL)); buffer[0] = 123456789; EXPECT_EQ(MOJO_RESULT_OK, d1->WriteMessage(UserPointer<const void>(buffer), kBufferSize, NULL, MOJO_WRITE_MESSAGE_FLAG_NONE)); stopwatch.Start(); EXPECT_EQ(MOJO_RESULT_OK, w.Wait(MOJO_DEADLINE_INDEFINITE, &context)); EXPECT_EQ(1u, context); EXPECT_LT(stopwatch.Elapsed(), test::EpsilonTimeout()); hss = HandleSignalsState(); d0->RemoveWaiter(&w, &hss); EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE, hss.satisfied_signals); EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE, hss.satisfiable_signals); // Try adding a readable waiter when already readable (from above). w.Init(); hss = HandleSignalsState(); EXPECT_EQ(MOJO_RESULT_ALREADY_EXISTS, d0->AddWaiter(&w, MOJO_HANDLE_SIGNAL_READABLE, 2, &hss)); EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE, hss.satisfied_signals); EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE, hss.satisfiable_signals); // Shouldn't need to remove the waiter (it was not added). // Make |d0| no longer readable (by reading from it). buffer[0] = 0; buffer_size = kBufferSize; EXPECT_EQ(MOJO_RESULT_OK, d0->ReadMessage(UserPointer<void>(buffer), MakeUserPointer(&buffer_size), 0, NULL, MOJO_READ_MESSAGE_FLAG_NONE)); EXPECT_EQ(kBufferSize, buffer_size); EXPECT_EQ(123456789, buffer[0]); // Wait for zero time for readability on |d0| (will time out). w.Init(); ASSERT_EQ(MOJO_RESULT_OK, d0->AddWaiter(&w, MOJO_HANDLE_SIGNAL_READABLE, 3, NULL)); stopwatch.Start(); EXPECT_EQ(MOJO_RESULT_DEADLINE_EXCEEDED, w.Wait(0, NULL)); EXPECT_LT(stopwatch.Elapsed(), test::EpsilonTimeout()); hss = HandleSignalsState(); d0->RemoveWaiter(&w, &hss); EXPECT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE, hss.satisfied_signals); EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE, hss.satisfiable_signals); // Wait for non-zero, finite time for readability on |d0| (will time out). w.Init(); ASSERT_EQ(MOJO_RESULT_OK, d0->AddWaiter(&w, MOJO_HANDLE_SIGNAL_READABLE, 3, NULL)); stopwatch.Start(); EXPECT_EQ(MOJO_RESULT_DEADLINE_EXCEEDED, w.Wait(2 * test::EpsilonTimeout().InMicroseconds(), NULL)); base::TimeDelta elapsed = stopwatch.Elapsed(); EXPECT_GT(elapsed, (2 - 1) * test::EpsilonTimeout()); EXPECT_LT(elapsed, (2 + 1) * test::EpsilonTimeout()); hss = HandleSignalsState(); d0->RemoveWaiter(&w, &hss); EXPECT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE, hss.satisfied_signals); EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE, hss.satisfiable_signals); EXPECT_EQ(MOJO_RESULT_OK, d0->Close()); EXPECT_EQ(MOJO_RESULT_OK, d1->Close()); } } TEST(MessagePipeDispatcherTest, InvalidParams) { char buffer[1]; scoped_refptr<MessagePipeDispatcher> d0( new MessagePipeDispatcher(MessagePipeDispatcher::kDefaultCreateOptions)); scoped_refptr<MessagePipeDispatcher> d1( new MessagePipeDispatcher(MessagePipeDispatcher::kDefaultCreateOptions)); { scoped_refptr<MessagePipe> mp(new MessagePipe()); d0->Init(mp, 0); d1->Init(mp, 1); } // |WriteMessage|: // Huge buffer size. EXPECT_EQ(MOJO_RESULT_RESOURCE_EXHAUSTED, d0->WriteMessage(UserPointer<const void>(buffer), std::numeric_limits<uint32_t>::max(), NULL, MOJO_WRITE_MESSAGE_FLAG_NONE)); EXPECT_EQ(MOJO_RESULT_OK, d0->Close()); EXPECT_EQ(MOJO_RESULT_OK, d1->Close()); } // These test invalid arguments that should cause death if we're being paranoid // about checking arguments (which we would want to do if, e.g., we were in a // true "kernel" situation, but we might not want to do otherwise for // performance reasons). Probably blatant errors like passing in null pointers // (for required pointer arguments) will still cause death, but perhaps not // predictably. TEST(MessagePipeDispatcherTest, InvalidParamsDeath) { const char kMemoryCheckFailedRegex[] = "Check failed"; scoped_refptr<MessagePipeDispatcher> d0( new MessagePipeDispatcher(MessagePipeDispatcher::kDefaultCreateOptions)); scoped_refptr<MessagePipeDispatcher> d1( new MessagePipeDispatcher(MessagePipeDispatcher::kDefaultCreateOptions)); { scoped_refptr<MessagePipe> mp(new MessagePipe()); d0->Init(mp, 0); d1->Init(mp, 1); } // |WriteMessage|: // Null buffer with nonzero buffer size. EXPECT_DEATH_IF_SUPPORTED( d0->WriteMessage( NullUserPointer(), 1, NULL, MOJO_WRITE_MESSAGE_FLAG_NONE), kMemoryCheckFailedRegex); // |ReadMessage|: // Null buffer with nonzero buffer size. // First write something so that we actually have something to read. EXPECT_EQ( MOJO_RESULT_OK, d1->WriteMessage( UserPointer<const void>("x"), 1, NULL, MOJO_WRITE_MESSAGE_FLAG_NONE)); uint32_t buffer_size = 1; EXPECT_DEATH_IF_SUPPORTED(d0->ReadMessage(NullUserPointer(), MakeUserPointer(&buffer_size), 0, NULL, MOJO_READ_MESSAGE_FLAG_NONE), kMemoryCheckFailedRegex); EXPECT_EQ(MOJO_RESULT_OK, d0->Close()); EXPECT_EQ(MOJO_RESULT_OK, d1->Close()); } // Test what happens when one end is closed (single-threaded test). TEST(MessagePipeDispatcherTest, BasicClosed) { int32_t buffer[1]; const uint32_t kBufferSize = static_cast<uint32_t>(sizeof(buffer)); uint32_t buffer_size; // Run this test both with |d0| as port 0, |d1| as port 1 and vice versa. for (unsigned i = 0; i < 2; i++) { scoped_refptr<MessagePipeDispatcher> d0(new MessagePipeDispatcher( MessagePipeDispatcher::kDefaultCreateOptions)); scoped_refptr<MessagePipeDispatcher> d1(new MessagePipeDispatcher( MessagePipeDispatcher::kDefaultCreateOptions)); { scoped_refptr<MessagePipe> mp(new MessagePipe()); d0->Init(mp, i); // 0, 1. d1->Init(mp, i ^ 1); // 1, 0. } Waiter w; HandleSignalsState hss; // Write (twice) to |d1|. buffer[0] = 123456789; EXPECT_EQ(MOJO_RESULT_OK, d1->WriteMessage(UserPointer<const void>(buffer), kBufferSize, NULL, MOJO_WRITE_MESSAGE_FLAG_NONE)); buffer[0] = 234567890; EXPECT_EQ(MOJO_RESULT_OK, d1->WriteMessage(UserPointer<const void>(buffer), kBufferSize, NULL, MOJO_WRITE_MESSAGE_FLAG_NONE)); // Try waiting for readable on |d0|; should fail (already satisfied). w.Init(); hss = HandleSignalsState(); EXPECT_EQ(MOJO_RESULT_ALREADY_EXISTS, d0->AddWaiter(&w, MOJO_HANDLE_SIGNAL_READABLE, 0, &hss)); EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE, hss.satisfied_signals); EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE, hss.satisfiable_signals); // Try reading from |d1|; should fail (nothing to read). buffer[0] = 0; buffer_size = kBufferSize; EXPECT_EQ(MOJO_RESULT_SHOULD_WAIT, d1->ReadMessage(UserPointer<void>(buffer), MakeUserPointer(&buffer_size), 0, NULL, MOJO_READ_MESSAGE_FLAG_NONE)); // Close |d1|. EXPECT_EQ(MOJO_RESULT_OK, d1->Close()); // Try waiting for readable on |d0|; should fail (already satisfied). w.Init(); hss = HandleSignalsState(); EXPECT_EQ(MOJO_RESULT_ALREADY_EXISTS, d0->AddWaiter(&w, MOJO_HANDLE_SIGNAL_READABLE, 1, &hss)); EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE, hss.satisfied_signals); EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE, hss.satisfiable_signals); // Read from |d0|. buffer[0] = 0; buffer_size = kBufferSize; EXPECT_EQ(MOJO_RESULT_OK, d0->ReadMessage(UserPointer<void>(buffer), MakeUserPointer(&buffer_size), 0, NULL, MOJO_READ_MESSAGE_FLAG_NONE)); EXPECT_EQ(kBufferSize, buffer_size); EXPECT_EQ(123456789, buffer[0]); // Try waiting for readable on |d0|; should fail (already satisfied). w.Init(); hss = HandleSignalsState(); EXPECT_EQ(MOJO_RESULT_ALREADY_EXISTS, d0->AddWaiter(&w, MOJO_HANDLE_SIGNAL_READABLE, 2, &hss)); EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE, hss.satisfied_signals); EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE, hss.satisfiable_signals); // Read again from |d0|. buffer[0] = 0; buffer_size = kBufferSize; EXPECT_EQ(MOJO_RESULT_OK, d0->ReadMessage(UserPointer<void>(buffer), MakeUserPointer(&buffer_size), 0, NULL, MOJO_READ_MESSAGE_FLAG_NONE)); EXPECT_EQ(kBufferSize, buffer_size); EXPECT_EQ(234567890, buffer[0]); // Try waiting for readable on |d0|; should fail (unsatisfiable). w.Init(); hss = HandleSignalsState(); EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION, d0->AddWaiter(&w, MOJO_HANDLE_SIGNAL_READABLE, 3, &hss)); EXPECT_EQ(0u, hss.satisfied_signals); EXPECT_EQ(0u, hss.satisfiable_signals); // Try waiting for writable on |d0|; should fail (unsatisfiable). w.Init(); hss = HandleSignalsState(); EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION, d0->AddWaiter(&w, MOJO_HANDLE_SIGNAL_WRITABLE, 4, &hss)); EXPECT_EQ(0u, hss.satisfied_signals); EXPECT_EQ(0u, hss.satisfiable_signals); // Try reading from |d0|; should fail (nothing to read and other end // closed). buffer[0] = 0; buffer_size = kBufferSize; EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION, d0->ReadMessage(UserPointer<void>(buffer), MakeUserPointer(&buffer_size), 0, NULL, MOJO_READ_MESSAGE_FLAG_NONE)); // Try writing to |d0|; should fail (other end closed). buffer[0] = 345678901; EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION, d0->WriteMessage(UserPointer<const void>(buffer), kBufferSize, NULL, MOJO_WRITE_MESSAGE_FLAG_NONE)); EXPECT_EQ(MOJO_RESULT_OK, d0->Close()); } } #if defined(OS_WIN) // http://crbug.com/396386 #define MAYBE_BasicThreaded DISABLED_BasicThreaded #else #define MAYBE_BasicThreaded BasicThreaded #endif TEST(MessagePipeDispatcherTest, MAYBE_BasicThreaded) { test::Stopwatch stopwatch; int32_t buffer[1]; const uint32_t kBufferSize = static_cast<uint32_t>(sizeof(buffer)); uint32_t buffer_size; base::TimeDelta elapsed; bool did_wait; MojoResult result; uint32_t context; HandleSignalsState hss; // Run this test both with |d0| as port 0, |d1| as port 1 and vice versa. for (unsigned i = 0; i < 2; i++) { scoped_refptr<MessagePipeDispatcher> d0(new MessagePipeDispatcher( MessagePipeDispatcher::kDefaultCreateOptions)); scoped_refptr<MessagePipeDispatcher> d1(new MessagePipeDispatcher( MessagePipeDispatcher::kDefaultCreateOptions)); { scoped_refptr<MessagePipe> mp(new MessagePipe()); d0->Init(mp, i); // 0, 1. d1->Init(mp, i ^ 1); // 1, 0. } // Wait for readable on |d1|, which will become readable after some time. { test::WaiterThread thread(d1, MOJO_HANDLE_SIGNAL_READABLE, MOJO_DEADLINE_INDEFINITE, 1, &did_wait, &result, &context, &hss); stopwatch.Start(); thread.Start(); base::PlatformThread::Sleep(2 * test::EpsilonTimeout()); // Wake it up by writing to |d0|. buffer[0] = 123456789; EXPECT_EQ(MOJO_RESULT_OK, d0->WriteMessage(UserPointer<const void>(buffer), kBufferSize, NULL, MOJO_WRITE_MESSAGE_FLAG_NONE)); } // Joins the thread. elapsed = stopwatch.Elapsed(); EXPECT_GT(elapsed, (2 - 1) * test::EpsilonTimeout()); EXPECT_LT(elapsed, (2 + 1) * test::EpsilonTimeout()); EXPECT_TRUE(did_wait); EXPECT_EQ(MOJO_RESULT_OK, result); EXPECT_EQ(1u, context); EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE, hss.satisfied_signals); EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE, hss.satisfiable_signals); // Now |d1| is already readable. Try waiting for it again. { test::WaiterThread thread(d1, MOJO_HANDLE_SIGNAL_READABLE, MOJO_DEADLINE_INDEFINITE, 2, &did_wait, &result, &context, &hss); stopwatch.Start(); thread.Start(); } // Joins the thread. EXPECT_LT(stopwatch.Elapsed(), test::EpsilonTimeout()); EXPECT_FALSE(did_wait); EXPECT_EQ(MOJO_RESULT_ALREADY_EXISTS, result); EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE, hss.satisfied_signals); EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE, hss.satisfiable_signals); // Consume what we wrote to |d0|. buffer[0] = 0; buffer_size = kBufferSize; EXPECT_EQ(MOJO_RESULT_OK, d1->ReadMessage(UserPointer<void>(buffer), MakeUserPointer(&buffer_size), 0, NULL, MOJO_READ_MESSAGE_FLAG_NONE)); EXPECT_EQ(kBufferSize, buffer_size); EXPECT_EQ(123456789, buffer[0]); // Wait for readable on |d1| and close |d0| after some time, which should // cancel that wait. { test::WaiterThread thread(d1, MOJO_HANDLE_SIGNAL_READABLE, MOJO_DEADLINE_INDEFINITE, 3, &did_wait, &result, &context, &hss); stopwatch.Start(); thread.Start(); base::PlatformThread::Sleep(2 * test::EpsilonTimeout()); EXPECT_EQ(MOJO_RESULT_OK, d0->Close()); } // Joins the thread. elapsed = stopwatch.Elapsed(); EXPECT_GT(elapsed, (2 - 1) * test::EpsilonTimeout()); EXPECT_LT(elapsed, (2 + 1) * test::EpsilonTimeout()); EXPECT_TRUE(did_wait); EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION, result); EXPECT_EQ(3u, context); EXPECT_EQ(0u, hss.satisfied_signals); EXPECT_EQ(0u, hss.satisfiable_signals); EXPECT_EQ(MOJO_RESULT_OK, d1->Close()); } for (unsigned i = 0; i < 2; i++) { scoped_refptr<MessagePipeDispatcher> d0(new MessagePipeDispatcher( MessagePipeDispatcher::kDefaultCreateOptions)); scoped_refptr<MessagePipeDispatcher> d1(new MessagePipeDispatcher( MessagePipeDispatcher::kDefaultCreateOptions)); { scoped_refptr<MessagePipe> mp(new MessagePipe()); d0->Init(mp, i); // 0, 1. d1->Init(mp, i ^ 1); // 1, 0. } // Wait for readable on |d1| and close |d1| after some time, which should // cancel that wait. { test::WaiterThread thread(d1, MOJO_HANDLE_SIGNAL_READABLE, MOJO_DEADLINE_INDEFINITE, 4, &did_wait, &result, &context, &hss); stopwatch.Start(); thread.Start(); base::PlatformThread::Sleep(2 * test::EpsilonTimeout()); EXPECT_EQ(MOJO_RESULT_OK, d1->Close()); } // Joins the thread. elapsed = stopwatch.Elapsed(); EXPECT_GT(elapsed, (2 - 1) * test::EpsilonTimeout()); EXPECT_LT(elapsed, (2 + 1) * test::EpsilonTimeout()); EXPECT_TRUE(did_wait); EXPECT_EQ(MOJO_RESULT_CANCELLED, result); EXPECT_EQ(4u, context); EXPECT_EQ(0u, hss.satisfied_signals); EXPECT_EQ(0u, hss.satisfiable_signals); EXPECT_EQ(MOJO_RESULT_OK, d0->Close()); } } // Stress test ----------------------------------------------------------------- const size_t kMaxMessageSize = 2000; class WriterThread : public base::SimpleThread { public: // |*messages_written| and |*bytes_written| belong to the thread while it's // alive. WriterThread(scoped_refptr<Dispatcher> write_dispatcher, size_t* messages_written, size_t* bytes_written) : base::SimpleThread("writer_thread"), write_dispatcher_(write_dispatcher), messages_written_(messages_written), bytes_written_(bytes_written) { *messages_written_ = 0; *bytes_written_ = 0; } virtual ~WriterThread() { Join(); } private: virtual void Run() OVERRIDE { // Make some data to write. unsigned char buffer[kMaxMessageSize]; for (size_t i = 0; i < kMaxMessageSize; i++) buffer[i] = static_cast<unsigned char>(i); // Number of messages to write. *messages_written_ = static_cast<size_t>(base::RandInt(1000, 6000)); // Write messages. for (size_t i = 0; i < *messages_written_; i++) { uint32_t bytes_to_write = static_cast<uint32_t>( base::RandInt(1, static_cast<int>(kMaxMessageSize))); EXPECT_EQ(MOJO_RESULT_OK, write_dispatcher_->WriteMessage(UserPointer<const void>(buffer), bytes_to_write, NULL, MOJO_WRITE_MESSAGE_FLAG_NONE)); *bytes_written_ += bytes_to_write; } // Write one last "quit" message. EXPECT_EQ(MOJO_RESULT_OK, write_dispatcher_->WriteMessage(UserPointer<const void>("quit"), 4, NULL, MOJO_WRITE_MESSAGE_FLAG_NONE)); } const scoped_refptr<Dispatcher> write_dispatcher_; size_t* const messages_written_; size_t* const bytes_written_; DISALLOW_COPY_AND_ASSIGN(WriterThread); }; class ReaderThread : public base::SimpleThread { public: // |*messages_read| and |*bytes_read| belong to the thread while it's alive. ReaderThread(scoped_refptr<Dispatcher> read_dispatcher, size_t* messages_read, size_t* bytes_read) : base::SimpleThread("reader_thread"), read_dispatcher_(read_dispatcher), messages_read_(messages_read), bytes_read_(bytes_read) { *messages_read_ = 0; *bytes_read_ = 0; } virtual ~ReaderThread() { Join(); } private: virtual void Run() OVERRIDE { unsigned char buffer[kMaxMessageSize]; Waiter w; HandleSignalsState hss; MojoResult result; // Read messages. for (;;) { // Wait for it to be readable. w.Init(); hss = HandleSignalsState(); result = read_dispatcher_->AddWaiter(&w, MOJO_HANDLE_SIGNAL_READABLE, 0, &hss); EXPECT_TRUE(result == MOJO_RESULT_OK || result == MOJO_RESULT_ALREADY_EXISTS) << "result: " << result; if (result == MOJO_RESULT_OK) { // Actually need to wait. EXPECT_EQ(MOJO_RESULT_OK, w.Wait(MOJO_DEADLINE_INDEFINITE, NULL)); read_dispatcher_->RemoveWaiter(&w, &hss); } // We may not actually be readable, since we're racing with other threads. EXPECT_TRUE((hss.satisfiable_signals & MOJO_HANDLE_SIGNAL_READABLE)); // Now, try to do the read. // Clear the buffer so that we can check the result. memset(buffer, 0, sizeof(buffer)); uint32_t buffer_size = static_cast<uint32_t>(sizeof(buffer)); result = read_dispatcher_->ReadMessage(UserPointer<void>(buffer), MakeUserPointer(&buffer_size), 0, NULL, MOJO_READ_MESSAGE_FLAG_NONE); EXPECT_TRUE(result == MOJO_RESULT_OK || result == MOJO_RESULT_SHOULD_WAIT) << "result: " << result; // We're racing with others to read, so maybe we failed. if (result == MOJO_RESULT_SHOULD_WAIT) continue; // In which case, try again. // Check for quit. if (buffer_size == 4 && memcmp("quit", buffer, 4) == 0) return; EXPECT_GE(buffer_size, 1u); EXPECT_LE(buffer_size, kMaxMessageSize); EXPECT_TRUE(IsValidMessage(buffer, buffer_size)); (*messages_read_)++; *bytes_read_ += buffer_size; } } static bool IsValidMessage(const unsigned char* buffer, uint32_t message_size) { size_t i; for (i = 0; i < message_size; i++) { if (buffer[i] != static_cast<unsigned char>(i)) return false; } // Check that the remaining bytes weren't stomped on. for (; i < kMaxMessageSize; i++) { if (buffer[i] != 0) return false; } return true; } const scoped_refptr<Dispatcher> read_dispatcher_; size_t* const messages_read_; size_t* const bytes_read_; DISALLOW_COPY_AND_ASSIGN(ReaderThread); }; TEST(MessagePipeDispatcherTest, Stress) { static const size_t kNumWriters = 30; static const size_t kNumReaders = kNumWriters; scoped_refptr<MessagePipeDispatcher> d_write( new MessagePipeDispatcher(MessagePipeDispatcher::kDefaultCreateOptions)); scoped_refptr<MessagePipeDispatcher> d_read( new MessagePipeDispatcher(MessagePipeDispatcher::kDefaultCreateOptions)); { scoped_refptr<MessagePipe> mp(new MessagePipe()); d_write->Init(mp, 0); d_read->Init(mp, 1); } size_t messages_written[kNumWriters]; size_t bytes_written[kNumWriters]; size_t messages_read[kNumReaders]; size_t bytes_read[kNumReaders]; { // Make writers. ScopedVector<WriterThread> writers; for (size_t i = 0; i < kNumWriters; i++) { writers.push_back( new WriterThread(d_write, &messages_written[i], &bytes_written[i])); } // Make readers. ScopedVector<ReaderThread> readers; for (size_t i = 0; i < kNumReaders; i++) { readers.push_back( new ReaderThread(d_read, &messages_read[i], &bytes_read[i])); } // Start writers. for (size_t i = 0; i < kNumWriters; i++) writers[i]->Start(); // Start readers. for (size_t i = 0; i < kNumReaders; i++) readers[i]->Start(); // TODO(vtl): Maybe I should have an event that triggers all the threads to // start doing stuff for real (so that the first ones created/started aren't // advantaged). } // Joins all the threads. size_t total_messages_written = 0; size_t total_bytes_written = 0; for (size_t i = 0; i < kNumWriters; i++) { total_messages_written += messages_written[i]; total_bytes_written += bytes_written[i]; } size_t total_messages_read = 0; size_t total_bytes_read = 0; for (size_t i = 0; i < kNumReaders; i++) { total_messages_read += messages_read[i]; total_bytes_read += bytes_read[i]; // We'd have to be really unlucky to have read no messages on a thread. EXPECT_GT(messages_read[i], 0u) << "reader: " << i; EXPECT_GE(bytes_read[i], messages_read[i]) << "reader: " << i; } EXPECT_EQ(total_messages_written, total_messages_read); EXPECT_EQ(total_bytes_written, total_bytes_read); EXPECT_EQ(MOJO_RESULT_OK, d_write->Close()); EXPECT_EQ(MOJO_RESULT_OK, d_read->Close()); } } // namespace } // namespace system } // namespace mojo