// Copyright 2013 The Chromium Authors. All rights reserved. // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. #include "mojo/system/local_data_pipe.h" #include #include "base/basictypes.h" #include "base/memory/ref_counted.h" #include "mojo/system/data_pipe.h" #include "mojo/system/waiter.h" #include "testing/gtest/include/gtest/gtest.h" namespace mojo { namespace system { namespace { const uint32_t kSizeOfOptions = static_cast(sizeof(MojoCreateDataPipeOptions)); // Validate options. TEST(LocalDataPipeTest, Creation) { // Create using default options. { // Get default options. MojoCreateDataPipeOptions default_options = { 0 }; EXPECT_EQ(MOJO_RESULT_OK, DataPipe::ValidateOptions(NULL, &default_options)); scoped_refptr dp(new LocalDataPipe(default_options)); dp->ProducerClose(); dp->ConsumerClose(); } // Create using non-default options. { const MojoCreateDataPipeOptions options = { kSizeOfOptions, // |struct_size|. MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_NONE, // |flags|. 1, // |element_num_bytes|. 1000 // |capacity_num_bytes|. }; MojoCreateDataPipeOptions validated_options = { 0 }; EXPECT_EQ(MOJO_RESULT_OK, DataPipe::ValidateOptions(&options, &validated_options)); scoped_refptr dp(new LocalDataPipe(validated_options)); dp->ProducerClose(); dp->ConsumerClose(); } { const MojoCreateDataPipeOptions options = { kSizeOfOptions, // |struct_size|. MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_NONE, // |flags|. 4, // |element_num_bytes|. 4000 // |capacity_num_bytes|. }; MojoCreateDataPipeOptions validated_options = { 0 }; EXPECT_EQ(MOJO_RESULT_OK, DataPipe::ValidateOptions(&options, &validated_options)); scoped_refptr dp(new LocalDataPipe(validated_options)); dp->ProducerClose(); dp->ConsumerClose(); } { const MojoCreateDataPipeOptions options = { kSizeOfOptions, // |struct_size|. MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_MAY_DISCARD, // |flags|. 7, // |element_num_bytes|. 7000000 // |capacity_num_bytes|. }; MojoCreateDataPipeOptions validated_options = { 0 }; EXPECT_EQ(MOJO_RESULT_OK, DataPipe::ValidateOptions(&options, &validated_options)); scoped_refptr dp(new LocalDataPipe(validated_options)); dp->ProducerClose(); dp->ConsumerClose(); } // Default capacity. { const MojoCreateDataPipeOptions options = { kSizeOfOptions, // |struct_size|. MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_MAY_DISCARD, // |flags|. 100, // |element_num_bytes|. 0 // |capacity_num_bytes|. }; MojoCreateDataPipeOptions validated_options = { 0 }; EXPECT_EQ(MOJO_RESULT_OK, DataPipe::ValidateOptions(&options, &validated_options)); scoped_refptr dp(new LocalDataPipe(validated_options)); dp->ProducerClose(); dp->ConsumerClose(); } } TEST(LocalDataPipeTest, SimpleReadWrite) { const MojoCreateDataPipeOptions options = { kSizeOfOptions, // |struct_size|. MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_NONE, // |flags|. static_cast(sizeof(int32_t)), // |element_num_bytes|. 1000 * sizeof(int32_t) // |capacity_num_bytes|. }; MojoCreateDataPipeOptions validated_options = { 0 }; EXPECT_EQ(MOJO_RESULT_OK, DataPipe::ValidateOptions(&options, &validated_options)); scoped_refptr dp(new LocalDataPipe(validated_options)); int32_t elements[10] = { 0 }; uint32_t num_bytes = 0; // Try reading; nothing there yet. num_bytes = static_cast(arraysize(elements) * sizeof(elements[0])); EXPECT_EQ(MOJO_RESULT_SHOULD_WAIT, dp->ConsumerReadData(elements, &num_bytes, false)); // Query; nothing there yet. num_bytes = 0; EXPECT_EQ(MOJO_RESULT_OK, dp->ConsumerQueryData(&num_bytes)); EXPECT_EQ(0u, num_bytes); // Discard; nothing there yet. num_bytes = static_cast(5u * sizeof(elements[0])); EXPECT_EQ(MOJO_RESULT_SHOULD_WAIT, dp->ConsumerDiscardData(&num_bytes, false)); // Read with invalid |num_bytes|. num_bytes = sizeof(elements[0]) + 1; EXPECT_EQ(MOJO_RESULT_INVALID_ARGUMENT, dp->ConsumerReadData(elements, &num_bytes, false)); // Write two elements. elements[0] = 123; elements[1] = 456; num_bytes = static_cast(2u * sizeof(elements[0])); EXPECT_EQ(MOJO_RESULT_OK, dp->ProducerWriteData(elements, &num_bytes, false)); // It should have written everything (even without "all or none"). EXPECT_EQ(2u * sizeof(elements[0]), num_bytes); // Query. num_bytes = 0; EXPECT_EQ(MOJO_RESULT_OK, dp->ConsumerQueryData(&num_bytes)); EXPECT_EQ(2 * sizeof(elements[0]), num_bytes); // Read one element. elements[0] = -1; elements[1] = -1; num_bytes = static_cast(1u * sizeof(elements[0])); EXPECT_EQ(MOJO_RESULT_OK, dp->ConsumerReadData(elements, &num_bytes, false)); EXPECT_EQ(1u * sizeof(elements[0]), num_bytes); EXPECT_EQ(123, elements[0]); EXPECT_EQ(-1, elements[1]); // Query. num_bytes = 0; EXPECT_EQ(MOJO_RESULT_OK, dp->ConsumerQueryData(&num_bytes)); EXPECT_EQ(1 * sizeof(elements[0]), num_bytes); // Try to read two elements, with "all or none". elements[0] = -1; elements[1] = -1; num_bytes = static_cast(2u * sizeof(elements[0])); EXPECT_EQ(MOJO_RESULT_OUT_OF_RANGE, dp->ConsumerReadData(elements, &num_bytes, true)); EXPECT_EQ(-1, elements[0]); EXPECT_EQ(-1, elements[1]); // Try to read two elements, without "all or none". elements[0] = -1; elements[1] = -1; num_bytes = static_cast(2u * sizeof(elements[0])); EXPECT_EQ(MOJO_RESULT_OK, dp->ConsumerReadData(elements, &num_bytes, false)); EXPECT_EQ(456, elements[0]); EXPECT_EQ(-1, elements[1]); // Query. num_bytes = 0; EXPECT_EQ(MOJO_RESULT_OK, dp->ConsumerQueryData(&num_bytes)); EXPECT_EQ(0u, num_bytes); dp->ProducerClose(); dp->ConsumerClose(); } // Note: The "basic" waiting tests test that the "wait states" are correct in // various situations; they don't test that waiters are properly awoken on state // changes. (For that, we need to use multiple threads.) TEST(LocalDataPipeTest, BasicProducerWaiting) { // Note: We take advantage of the fact that for |LocalDataPipe|, capacities // are strict maximums. This is not guaranteed by the API. const MojoCreateDataPipeOptions options = { kSizeOfOptions, // |struct_size|. MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_NONE, // |flags|. static_cast(sizeof(int32_t)), // |element_num_bytes|. 2 * sizeof(int32_t) // |capacity_num_bytes|. }; MojoCreateDataPipeOptions validated_options = { 0 }; EXPECT_EQ(MOJO_RESULT_OK, DataPipe::ValidateOptions(&options, &validated_options)); scoped_refptr dp(new LocalDataPipe(validated_options)); Waiter waiter; // Never readable. waiter.Init(); EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION, dp->ProducerAddWaiter(&waiter, MOJO_WAIT_FLAG_READABLE, 12)); // Already writable. waiter.Init(); EXPECT_EQ(MOJO_RESULT_ALREADY_EXISTS, dp->ProducerAddWaiter(&waiter, MOJO_WAIT_FLAG_WRITABLE, 34)); // Write two elements. int32_t elements[2] = { 123, 456 }; uint32_t num_bytes = static_cast(2u * sizeof(elements[0])); EXPECT_EQ(MOJO_RESULT_OK, dp->ProducerWriteData(elements, &num_bytes, true)); EXPECT_EQ(static_cast(2u * sizeof(elements[0])), num_bytes); // Adding a waiter should now succeed. waiter.Init(); EXPECT_EQ(MOJO_RESULT_OK, dp->ProducerAddWaiter(&waiter, MOJO_WAIT_FLAG_WRITABLE, 56)); // And it shouldn't be writable yet. EXPECT_EQ(MOJO_RESULT_DEADLINE_EXCEEDED, waiter.Wait(0)); dp->ProducerRemoveWaiter(&waiter); // Do it again. waiter.Init(); EXPECT_EQ(MOJO_RESULT_OK, dp->ProducerAddWaiter(&waiter, MOJO_WAIT_FLAG_WRITABLE, 78)); // Read one element. elements[0] = -1; elements[1] = -1; num_bytes = static_cast(1u * sizeof(elements[0])); EXPECT_EQ(MOJO_RESULT_OK, dp->ConsumerReadData(elements, &num_bytes, true)); EXPECT_EQ(static_cast(1u * sizeof(elements[0])), num_bytes); EXPECT_EQ(123, elements[0]); EXPECT_EQ(-1, elements[1]); // Waiting should now succeed. EXPECT_EQ(78, waiter.Wait(1000)); dp->ProducerRemoveWaiter(&waiter); // Try writing, using a two-phase write. void* buffer = NULL; num_bytes = static_cast(3u * sizeof(elements[0])); EXPECT_EQ(MOJO_RESULT_OK, dp->ProducerBeginWriteData(&buffer, &num_bytes, false)); EXPECT_TRUE(buffer != NULL); EXPECT_EQ(static_cast(1u * sizeof(elements[0])), num_bytes); static_cast(buffer)[0] = 789; EXPECT_EQ(MOJO_RESULT_OK, dp->ProducerEndWriteData( static_cast(1u * sizeof(elements[0])))); // Add a waiter. waiter.Init(); EXPECT_EQ(MOJO_RESULT_OK, dp->ProducerAddWaiter(&waiter, MOJO_WAIT_FLAG_WRITABLE, 90)); // Read one element, using a two-phase read. const void* read_buffer = NULL; num_bytes = 0u; EXPECT_EQ(MOJO_RESULT_OK, dp->ConsumerBeginReadData(&read_buffer, &num_bytes, false)); EXPECT_TRUE(read_buffer != NULL); // Since we only read one element (after having written three in all), the // two-phase read should only allow us to read one. This checks an // implementation detail! EXPECT_EQ(static_cast(1u * sizeof(elements[0])), num_bytes); EXPECT_EQ(456, static_cast(read_buffer)[0]); EXPECT_EQ(MOJO_RESULT_OK, dp->ConsumerEndReadData( static_cast(1u * sizeof(elements[0])))); // Waiting should succeed. EXPECT_EQ(90, waiter.Wait(1000)); dp->ProducerRemoveWaiter(&waiter); // Write one element. elements[0] = 123; num_bytes = static_cast(1u * sizeof(elements[0])); EXPECT_EQ(MOJO_RESULT_OK, dp->ProducerWriteData(elements, &num_bytes, false)); EXPECT_EQ(static_cast(1u * sizeof(elements[0])), num_bytes); // Add a waiter. waiter.Init(); EXPECT_EQ(MOJO_RESULT_OK, dp->ProducerAddWaiter(&waiter, MOJO_WAIT_FLAG_WRITABLE, 12)); // Close the consumer. dp->ConsumerClose(); // It should now be never-writable. EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION, waiter.Wait(1000)); dp->ProducerRemoveWaiter(&waiter); dp->ProducerClose(); } TEST(LocalDataPipeTest, BasicConsumerWaiting) { const MojoCreateDataPipeOptions options = { kSizeOfOptions, // |struct_size|. MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_NONE, // |flags|. static_cast(sizeof(int32_t)), // |element_num_bytes|. 1000 * sizeof(int32_t) // |capacity_num_bytes|. }; MojoCreateDataPipeOptions validated_options = { 0 }; EXPECT_EQ(MOJO_RESULT_OK, DataPipe::ValidateOptions(&options, &validated_options)); { scoped_refptr dp(new LocalDataPipe(validated_options)); Waiter waiter; // Never writable. waiter.Init(); EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION, dp->ConsumerAddWaiter(&waiter, MOJO_WAIT_FLAG_WRITABLE, 12)); // Not yet readable. waiter.Init(); EXPECT_EQ(MOJO_RESULT_OK, dp->ConsumerAddWaiter(&waiter, MOJO_WAIT_FLAG_READABLE, 34)); EXPECT_EQ(MOJO_RESULT_DEADLINE_EXCEEDED, waiter.Wait(0)); dp->ConsumerRemoveWaiter(&waiter); // Write two elements. int32_t elements[2] = { 123, 456 }; uint32_t num_bytes = static_cast(2u * sizeof(elements[0])); EXPECT_EQ(MOJO_RESULT_OK, dp->ProducerWriteData(elements, &num_bytes, true)); // Should already be readable. waiter.Init(); EXPECT_EQ(MOJO_RESULT_ALREADY_EXISTS, dp->ConsumerAddWaiter(&waiter, MOJO_WAIT_FLAG_READABLE, 56)); // Discard one element. num_bytes = static_cast(1u * sizeof(elements[0])); EXPECT_EQ(MOJO_RESULT_OK, dp->ConsumerDiscardData(&num_bytes, true)); EXPECT_EQ(static_cast(1u * sizeof(elements[0])), num_bytes); // Should still be readable. waiter.Init(); EXPECT_EQ(MOJO_RESULT_ALREADY_EXISTS, dp->ConsumerAddWaiter(&waiter, MOJO_WAIT_FLAG_READABLE, 78)); // Read one element. elements[0] = -1; elements[1] = -1; num_bytes = static_cast(1u * sizeof(elements[0])); EXPECT_EQ(MOJO_RESULT_OK, dp->ConsumerReadData(elements, &num_bytes, true)); EXPECT_EQ(static_cast(1u * sizeof(elements[0])), num_bytes); EXPECT_EQ(456, elements[0]); EXPECT_EQ(-1, elements[1]); // Adding a waiter should now succeed. waiter.Init(); EXPECT_EQ(MOJO_RESULT_OK, dp->ConsumerAddWaiter(&waiter, MOJO_WAIT_FLAG_READABLE, 90)); // Write one element. elements[0] = 789; elements[1] = -1; num_bytes = static_cast(1u * sizeof(elements[0])); EXPECT_EQ(MOJO_RESULT_OK, dp->ProducerWriteData(elements, &num_bytes, true)); // Waiting should now succeed. EXPECT_EQ(90, waiter.Wait(1000)); dp->ConsumerRemoveWaiter(&waiter); // Close the producer. dp->ProducerClose(); // Should still be readable. waiter.Init(); EXPECT_EQ(MOJO_RESULT_ALREADY_EXISTS, dp->ConsumerAddWaiter(&waiter, MOJO_WAIT_FLAG_READABLE, 12)); // Read one element. elements[0] = -1; elements[1] = -1; num_bytes = static_cast(1u * sizeof(elements[0])); EXPECT_EQ(MOJO_RESULT_OK, dp->ConsumerReadData(elements, &num_bytes, true)); EXPECT_EQ(static_cast(1u * sizeof(elements[0])), num_bytes); EXPECT_EQ(789, elements[0]); EXPECT_EQ(-1, elements[1]); // Should be never-readable. waiter.Init(); EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION, dp->ConsumerAddWaiter(&waiter, MOJO_WAIT_FLAG_READABLE, 34)); dp->ConsumerClose(); } // Test with two-phase APIs and closing the producer with an active consumer // waiter. { scoped_refptr dp(new LocalDataPipe(validated_options)); Waiter waiter; // Write two elements. int32_t* elements = NULL; void* buffer = NULL; // Request room for three (but we'll only write two). uint32_t num_bytes = static_cast(3u * sizeof(elements[0])); EXPECT_EQ(MOJO_RESULT_OK, dp->ProducerBeginWriteData(&buffer, &num_bytes, true)); EXPECT_TRUE(buffer != NULL); EXPECT_GE(num_bytes, static_cast(3u * sizeof(elements[0]))); elements = static_cast(buffer); elements[0] = 123; elements[1] = 456; EXPECT_EQ(MOJO_RESULT_OK, dp->ProducerEndWriteData( static_cast(2u * sizeof(elements[0])))); // Should already be readable. waiter.Init(); EXPECT_EQ(MOJO_RESULT_ALREADY_EXISTS, dp->ConsumerAddWaiter(&waiter, MOJO_WAIT_FLAG_READABLE, 12)); // Read one element. // Request two in all-or-none mode, but only read one. const void* read_buffer = NULL; num_bytes = static_cast(2u * sizeof(elements[0])); EXPECT_EQ(MOJO_RESULT_OK, dp->ConsumerBeginReadData(&read_buffer, &num_bytes, true)); EXPECT_TRUE(read_buffer != NULL); EXPECT_EQ(static_cast(2u * sizeof(elements[0])), num_bytes); const int32_t* read_elements = static_cast(read_buffer); EXPECT_EQ(123, read_elements[0]); EXPECT_EQ(MOJO_RESULT_OK, dp->ConsumerEndReadData( static_cast(1u * sizeof(elements[0])))); // Should still be readable. waiter.Init(); EXPECT_EQ(MOJO_RESULT_ALREADY_EXISTS, dp->ConsumerAddWaiter(&waiter, MOJO_WAIT_FLAG_READABLE, 34)); // Read one element. // Request three, but not in all-or-none mode. read_buffer = NULL; num_bytes = static_cast(3u * sizeof(elements[0])); EXPECT_EQ(MOJO_RESULT_OK, dp->ConsumerBeginReadData(&read_buffer, &num_bytes, false)); EXPECT_TRUE(read_buffer != NULL); EXPECT_EQ(static_cast(1u * sizeof(elements[0])), num_bytes); read_elements = static_cast(read_buffer); EXPECT_EQ(456, read_elements[0]); EXPECT_EQ(MOJO_RESULT_OK, dp->ConsumerEndReadData( static_cast(1u * sizeof(elements[0])))); // Adding a waiter should now succeed. waiter.Init(); EXPECT_EQ(MOJO_RESULT_OK, dp->ConsumerAddWaiter(&waiter, MOJO_WAIT_FLAG_READABLE, 56)); // Close the producer. dp->ProducerClose(); // Should be never-readable. EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION, waiter.Wait(1000)); dp->ConsumerRemoveWaiter(&waiter); dp->ConsumerClose(); } } // Tests that data pipes aren't writable/readable during two-phase writes/reads. TEST(LocalDataPipeTest, BasicTwoPhaseWaiting) { const MojoCreateDataPipeOptions options = { kSizeOfOptions, // |struct_size|. MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_NONE, // |flags|. static_cast(sizeof(int32_t)), // |element_num_bytes|. 1000 * sizeof(int32_t) // |capacity_num_bytes|. }; MojoCreateDataPipeOptions validated_options = { 0 }; EXPECT_EQ(MOJO_RESULT_OK, DataPipe::ValidateOptions(&options, &validated_options)); scoped_refptr dp(new LocalDataPipe(validated_options)); Waiter waiter; // It should be writable. waiter.Init(); EXPECT_EQ(MOJO_RESULT_ALREADY_EXISTS, dp->ProducerAddWaiter(&waiter, MOJO_WAIT_FLAG_WRITABLE, 0)); uint32_t num_bytes = static_cast(1u * sizeof(int32_t)); void* write_ptr = NULL; EXPECT_EQ(MOJO_RESULT_OK, dp->ProducerBeginWriteData(&write_ptr, &num_bytes, false)); EXPECT_TRUE(write_ptr != NULL); EXPECT_GE(num_bytes, static_cast(1u * sizeof(int32_t))); // At this point, it shouldn't be writable. waiter.Init(); EXPECT_EQ(MOJO_RESULT_OK, dp->ProducerAddWaiter(&waiter, MOJO_WAIT_FLAG_WRITABLE, 1)); EXPECT_EQ(MOJO_RESULT_DEADLINE_EXCEEDED, waiter.Wait(0)); dp->ProducerRemoveWaiter(&waiter); // It shouldn't be readable yet either. waiter.Init(); EXPECT_EQ(MOJO_RESULT_OK, dp->ConsumerAddWaiter(&waiter, MOJO_WAIT_FLAG_READABLE, 2)); EXPECT_EQ(MOJO_RESULT_DEADLINE_EXCEEDED, waiter.Wait(0)); dp->ConsumerRemoveWaiter(&waiter); static_cast(write_ptr)[0] = 123; EXPECT_EQ(MOJO_RESULT_OK, dp->ProducerEndWriteData( static_cast(1u * sizeof(int32_t)))); // It should be writable again. waiter.Init(); EXPECT_EQ(MOJO_RESULT_ALREADY_EXISTS, dp->ProducerAddWaiter(&waiter, MOJO_WAIT_FLAG_WRITABLE, 3)); // And readable. waiter.Init(); EXPECT_EQ(MOJO_RESULT_ALREADY_EXISTS, dp->ConsumerAddWaiter(&waiter, MOJO_WAIT_FLAG_READABLE, 4)); // Start another two-phase write and check that it's readable even in the // middle of it. num_bytes = static_cast(1u * sizeof(int32_t)); write_ptr = NULL; EXPECT_EQ(MOJO_RESULT_OK, dp->ProducerBeginWriteData(&write_ptr, &num_bytes, false)); EXPECT_TRUE(write_ptr != NULL); EXPECT_GE(num_bytes, static_cast(1u * sizeof(int32_t))); // It should be readable. waiter.Init(); EXPECT_EQ(MOJO_RESULT_ALREADY_EXISTS, dp->ConsumerAddWaiter(&waiter, MOJO_WAIT_FLAG_READABLE, 5)); // End the two-phase write without writing anything. EXPECT_EQ(MOJO_RESULT_OK, dp->ProducerEndWriteData(0u)); // Start a two-phase read. num_bytes = static_cast(1u * sizeof(int32_t)); const void* read_ptr = NULL; EXPECT_EQ(MOJO_RESULT_OK, dp->ConsumerBeginReadData(&read_ptr, &num_bytes, false)); EXPECT_TRUE(read_ptr != NULL); EXPECT_EQ(static_cast(1u * sizeof(int32_t)), num_bytes); // At this point, it should still be writable. waiter.Init(); EXPECT_EQ(MOJO_RESULT_ALREADY_EXISTS, dp->ProducerAddWaiter(&waiter, MOJO_WAIT_FLAG_WRITABLE, 6)); // But not readable. waiter.Init(); EXPECT_EQ(MOJO_RESULT_OK, dp->ConsumerAddWaiter(&waiter, MOJO_WAIT_FLAG_READABLE, 7)); EXPECT_EQ(MOJO_RESULT_DEADLINE_EXCEEDED, waiter.Wait(0)); dp->ConsumerRemoveWaiter(&waiter); // End the two-phase read without reading anything. EXPECT_EQ(MOJO_RESULT_OK, dp->ConsumerEndReadData(0u)); // It should be readable again. waiter.Init(); EXPECT_EQ(MOJO_RESULT_ALREADY_EXISTS, dp->ConsumerAddWaiter(&waiter, MOJO_WAIT_FLAG_READABLE, 8)); dp->ProducerClose(); dp->ConsumerClose(); } // Test that a "may discard" data pipe is writable even when it's full. TEST(LocalDataPipeTest, BasicMayDiscardWaiting) { const MojoCreateDataPipeOptions options = { kSizeOfOptions, // |struct_size|. MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_MAY_DISCARD, // |flags|. static_cast(sizeof(int32_t)), // |element_num_bytes|. 1 * sizeof(int32_t) // |capacity_num_bytes|. }; MojoCreateDataPipeOptions validated_options = { 0 }; EXPECT_EQ(MOJO_RESULT_OK, DataPipe::ValidateOptions(&options, &validated_options)); scoped_refptr dp(new LocalDataPipe(validated_options)); Waiter waiter; // Writable. waiter.Init(); EXPECT_EQ(MOJO_RESULT_ALREADY_EXISTS, dp->ProducerAddWaiter(&waiter, MOJO_WAIT_FLAG_WRITABLE, 0)); // Not readable. waiter.Init(); EXPECT_EQ(MOJO_RESULT_OK, dp->ConsumerAddWaiter(&waiter, MOJO_WAIT_FLAG_READABLE, 1)); EXPECT_EQ(MOJO_RESULT_DEADLINE_EXCEEDED, waiter.Wait(0)); dp->ConsumerRemoveWaiter(&waiter); uint32_t num_bytes = static_cast(sizeof(int32_t)); int32_t element = 123; EXPECT_EQ(MOJO_RESULT_OK, dp->ProducerWriteData(&element, &num_bytes, false)); EXPECT_EQ(static_cast(sizeof(int32_t)), num_bytes); // Still writable (even though it's full). waiter.Init(); EXPECT_EQ(MOJO_RESULT_ALREADY_EXISTS, dp->ProducerAddWaiter(&waiter, MOJO_WAIT_FLAG_WRITABLE, 2)); // Now readable. waiter.Init(); EXPECT_EQ(MOJO_RESULT_ALREADY_EXISTS, dp->ConsumerAddWaiter(&waiter, MOJO_WAIT_FLAG_READABLE, 3)); // Overwrite that element. num_bytes = static_cast(sizeof(int32_t)); element = 456; EXPECT_EQ(MOJO_RESULT_OK, dp->ProducerWriteData(&element, &num_bytes, false)); EXPECT_EQ(static_cast(sizeof(int32_t)), num_bytes); // Still writable. waiter.Init(); EXPECT_EQ(MOJO_RESULT_ALREADY_EXISTS, dp->ProducerAddWaiter(&waiter, MOJO_WAIT_FLAG_WRITABLE, 4)); // And still readable. waiter.Init(); EXPECT_EQ(MOJO_RESULT_ALREADY_EXISTS, dp->ConsumerAddWaiter(&waiter, MOJO_WAIT_FLAG_READABLE, 5)); // Read that element. num_bytes = static_cast(sizeof(int32_t)); element = 0; EXPECT_EQ(MOJO_RESULT_OK, dp->ConsumerReadData(&element, &num_bytes, false)); EXPECT_EQ(static_cast(sizeof(int32_t)), num_bytes); EXPECT_EQ(456, element); // Still writable. waiter.Init(); EXPECT_EQ(MOJO_RESULT_ALREADY_EXISTS, dp->ProducerAddWaiter(&waiter, MOJO_WAIT_FLAG_WRITABLE, 6)); // No longer readable. waiter.Init(); EXPECT_EQ(MOJO_RESULT_OK, dp->ConsumerAddWaiter(&waiter, MOJO_WAIT_FLAG_READABLE, 7)); EXPECT_EQ(MOJO_RESULT_DEADLINE_EXCEEDED, waiter.Wait(0)); dp->ConsumerRemoveWaiter(&waiter); dp->ProducerClose(); dp->ConsumerClose(); } void Seq(int32_t start, size_t count, int32_t* out) { for (size_t i = 0; i < count; i++) out[i] = start + static_cast(i); } TEST(LocalDataPipeTest, MayDiscard) { const MojoCreateDataPipeOptions options = { kSizeOfOptions, // |struct_size|. MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_MAY_DISCARD, // |flags|. static_cast(sizeof(int32_t)), // |element_num_bytes|. 10 * sizeof(int32_t) // |capacity_num_bytes|. }; MojoCreateDataPipeOptions validated_options = { 0 }; EXPECT_EQ(MOJO_RESULT_OK, DataPipe::ValidateOptions(&options, &validated_options)); scoped_refptr dp(new LocalDataPipe(validated_options)); int32_t buffer[100] = { 0 }; uint32_t num_bytes = 0; num_bytes = 20u * sizeof(int32_t); Seq(0, arraysize(buffer), buffer); // Try writing more than capacity. (This test relies on the implementation // enforcing the capacity strictly.) EXPECT_EQ(MOJO_RESULT_OK, dp->ProducerWriteData(buffer, &num_bytes, false)); EXPECT_EQ(10u * sizeof(int32_t), num_bytes); // Read half of what we wrote. num_bytes = 5u * sizeof(int32_t); memset(buffer, 0xab, sizeof(buffer)); EXPECT_EQ(MOJO_RESULT_OK, dp->ConsumerReadData(buffer, &num_bytes, false)); EXPECT_EQ(5u * sizeof(int32_t), num_bytes); int32_t expected_buffer[100]; memset(expected_buffer, 0xab, sizeof(expected_buffer)); Seq(0, 5u, expected_buffer); EXPECT_EQ(0, memcmp(buffer, expected_buffer, sizeof(buffer))); // Internally, a circular buffer would now look like: // -, -, -, -, -, 5, 6, 7, 8, 9 // Write a bit more than the space that's available. num_bytes = 8u * sizeof(int32_t); Seq(100, arraysize(buffer), buffer); EXPECT_EQ(MOJO_RESULT_OK, dp->ProducerWriteData(buffer, &num_bytes, false)); EXPECT_EQ(8u * sizeof(int32_t), num_bytes); // Internally, a circular buffer would now look like: // 100, 101, 102, 103, 104, 105, 106, 107, 8, 9 // Read half of what's available. num_bytes = 5u * sizeof(int32_t); memset(buffer, 0xab, sizeof(buffer)); EXPECT_EQ(MOJO_RESULT_OK, dp->ConsumerReadData(buffer, &num_bytes, false)); EXPECT_EQ(5u * sizeof(int32_t), num_bytes); memset(expected_buffer, 0xab, sizeof(expected_buffer)); expected_buffer[0] = 8; expected_buffer[1] = 9; expected_buffer[2] = 100; expected_buffer[3] = 101; expected_buffer[4] = 102; EXPECT_EQ(0, memcmp(buffer, expected_buffer, sizeof(buffer))); // Internally, a circular buffer would now look like: // -, -, -, 103, 104, 105, 106, 107, -, - // Write one integer. num_bytes = 1u * sizeof(int32_t); Seq(200, arraysize(buffer), buffer); EXPECT_EQ(MOJO_RESULT_OK, dp->ProducerWriteData(buffer, &num_bytes, false)); EXPECT_EQ(1u * sizeof(int32_t), num_bytes); // Internally, a circular buffer would now look like: // -, -, -, 103, 104, 105, 106, 107, 200, - // Write five more. num_bytes = 5u * sizeof(int32_t); Seq(300, arraysize(buffer), buffer); EXPECT_EQ(MOJO_RESULT_OK, dp->ProducerWriteData(buffer, &num_bytes, false)); EXPECT_EQ(5u * sizeof(int32_t), num_bytes); // Internally, a circular buffer would now look like: // 301, 302, 303, 304, 104, 105, 106, 107, 200, 300 // Read it all. num_bytes = sizeof(buffer); memset(buffer, 0xab, sizeof(buffer)); EXPECT_EQ(MOJO_RESULT_OK, dp->ConsumerReadData(buffer, &num_bytes, false)); EXPECT_EQ(10u * sizeof(int32_t), num_bytes); memset(expected_buffer, 0xab, sizeof(expected_buffer)); expected_buffer[0] = 104; expected_buffer[1] = 105; expected_buffer[2] = 106; expected_buffer[3] = 107; expected_buffer[4] = 200; expected_buffer[5] = 300; expected_buffer[6] = 301; expected_buffer[7] = 302; expected_buffer[8] = 303; expected_buffer[9] = 304; EXPECT_EQ(0, memcmp(buffer, expected_buffer, sizeof(buffer))); // Test two-phase writes, including in all-or-none mode. // Note: Again, the following depends on an implementation detail -- namely // that the write pointer will point at the 5th element of the buffer (and the // buffer has exactly the capacity requested). num_bytes = 0u; void* write_ptr = NULL; EXPECT_EQ(MOJO_RESULT_OK, dp->ProducerBeginWriteData(&write_ptr, &num_bytes, false)); EXPECT_TRUE(write_ptr != NULL); EXPECT_EQ(6u * sizeof(int32_t), num_bytes); Seq(400, 6, static_cast(write_ptr)); EXPECT_EQ(MOJO_RESULT_OK, dp->ProducerEndWriteData(6u * sizeof(int32_t))); // Internally, a circular buffer would now look like: // -, -, -, -, 400, 401, 402, 403, 404, 405 // |ProducerBeginWriteData()| ignores |*num_bytes| except in "all-or-none" // mode. num_bytes = 6u * sizeof(int32_t); write_ptr = NULL; EXPECT_EQ(MOJO_RESULT_OK, dp->ProducerBeginWriteData(&write_ptr, &num_bytes, false)); EXPECT_EQ(4u * sizeof(int32_t), num_bytes); static_cast(write_ptr)[0] = 500; EXPECT_EQ(MOJO_RESULT_OK, dp->ProducerEndWriteData(1u * sizeof(int32_t))); // Internally, a circular buffer would now look like: // 500, -, -, -, 400, 401, 402, 403, 404, 405 // Requesting a 10-element buffer in all-or-none mode fails at this point. num_bytes = 10u * sizeof(int32_t); write_ptr = NULL; EXPECT_EQ(MOJO_RESULT_OUT_OF_RANGE, dp->ProducerBeginWriteData(&write_ptr, &num_bytes, true)); // But requesting, say, a 5-element (up to 9, really) buffer should be okay. // It will discard two elements. num_bytes = 5u * sizeof(int32_t); write_ptr = NULL; EXPECT_EQ(MOJO_RESULT_OK, dp->ProducerBeginWriteData(&write_ptr, &num_bytes, true)); EXPECT_EQ(5u * sizeof(int32_t), num_bytes); // Only write 4 elements though. Seq(600, 4, static_cast(write_ptr)); EXPECT_EQ(MOJO_RESULT_OK, dp->ProducerEndWriteData(4u * sizeof(int32_t))); // Internally, a circular buffer would now look like: // 500, 600, 601, 602, 603, -, 402, 403, 404, 405 // Do this again. Make sure we can get a buffer all the way out to the end of // the internal buffer. num_bytes = 5u * sizeof(int32_t); write_ptr = NULL; EXPECT_EQ(MOJO_RESULT_OK, dp->ProducerBeginWriteData(&write_ptr, &num_bytes, true)); EXPECT_EQ(5u * sizeof(int32_t), num_bytes); // Only write 3 elements though. Seq(700, 3, static_cast(write_ptr)); EXPECT_EQ(MOJO_RESULT_OK, dp->ProducerEndWriteData(3u * sizeof(int32_t))); // Internally, a circular buffer would now look like: // 500, 600, 601, 602, 603, 700, 701, 702, -, - // Read everything. num_bytes = sizeof(buffer); memset(buffer, 0xab, sizeof(buffer)); EXPECT_EQ(MOJO_RESULT_OK, dp->ConsumerReadData(buffer, &num_bytes, false)); EXPECT_EQ(8u * sizeof(int32_t), num_bytes); memset(expected_buffer, 0xab, sizeof(expected_buffer)); expected_buffer[0] = 500; expected_buffer[1] = 600; expected_buffer[2] = 601; expected_buffer[3] = 602; expected_buffer[4] = 603; expected_buffer[5] = 700; expected_buffer[6] = 701; expected_buffer[7] = 702; EXPECT_EQ(0, memcmp(buffer, expected_buffer, sizeof(buffer))); dp->ProducerClose(); dp->ConsumerClose(); } TEST(LocalDataPipeTest, AllOrNone) { const MojoCreateDataPipeOptions options = { kSizeOfOptions, // |struct_size|. MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_NONE, // |flags|. static_cast(sizeof(int32_t)), // |element_num_bytes|. 10 * sizeof(int32_t) // |capacity_num_bytes|. }; MojoCreateDataPipeOptions validated_options = { 0 }; EXPECT_EQ(MOJO_RESULT_OK, DataPipe::ValidateOptions(&options, &validated_options)); scoped_refptr dp(new LocalDataPipe(validated_options)); // Try writing way too much. uint32_t num_bytes = 20u * sizeof(int32_t); int32_t buffer[100]; Seq(0, arraysize(buffer), buffer); EXPECT_EQ(MOJO_RESULT_OUT_OF_RANGE, dp->ProducerWriteData(buffer, &num_bytes, true)); // Should still be empty. num_bytes = ~0u; EXPECT_EQ(MOJO_RESULT_OK, dp->ConsumerQueryData(&num_bytes)); EXPECT_EQ(0u, num_bytes); // Write some data. num_bytes = 5u * sizeof(int32_t); Seq(100, arraysize(buffer), buffer); EXPECT_EQ(MOJO_RESULT_OK, dp->ProducerWriteData(buffer, &num_bytes, true)); EXPECT_EQ(5u * sizeof(int32_t), num_bytes); // Half full. num_bytes = 0u; EXPECT_EQ(MOJO_RESULT_OK, dp->ConsumerQueryData(&num_bytes)); EXPECT_EQ(5u * sizeof(int32_t), num_bytes); // Too much. num_bytes = 6u * sizeof(int32_t); Seq(200, arraysize(buffer), buffer); EXPECT_EQ(MOJO_RESULT_OUT_OF_RANGE, dp->ProducerWriteData(buffer, &num_bytes, true)); // Try reading too much. num_bytes = 11u * sizeof(int32_t); memset(buffer, 0xab, sizeof(buffer)); EXPECT_EQ(MOJO_RESULT_OUT_OF_RANGE, dp->ConsumerReadData(buffer, &num_bytes, true)); int32_t expected_buffer[100]; memset(expected_buffer, 0xab, sizeof(expected_buffer)); EXPECT_EQ(0, memcmp(buffer, expected_buffer, sizeof(buffer))); // Try discarding too much. num_bytes = 11u * sizeof(int32_t); EXPECT_EQ(MOJO_RESULT_OUT_OF_RANGE, dp->ConsumerDiscardData(&num_bytes, true)); // Just a little. num_bytes = 2u * sizeof(int32_t); Seq(300, arraysize(buffer), buffer); EXPECT_EQ(MOJO_RESULT_OK, dp->ProducerWriteData(buffer, &num_bytes, true)); EXPECT_EQ(2u * sizeof(int32_t), num_bytes); // Just right. num_bytes = 3u * sizeof(int32_t); Seq(400, arraysize(buffer), buffer); EXPECT_EQ(MOJO_RESULT_OK, dp->ProducerWriteData(buffer, &num_bytes, true)); EXPECT_EQ(3u * sizeof(int32_t), num_bytes); // Exactly full. num_bytes = 0u; EXPECT_EQ(MOJO_RESULT_OK, dp->ConsumerQueryData(&num_bytes)); EXPECT_EQ(10u * sizeof(int32_t), num_bytes); // Read half. num_bytes = 5u * sizeof(int32_t); memset(buffer, 0xab, sizeof(buffer)); EXPECT_EQ(MOJO_RESULT_OK, dp->ConsumerReadData(buffer, &num_bytes, true)); EXPECT_EQ(5u * sizeof(int32_t), num_bytes); memset(expected_buffer, 0xab, sizeof(expected_buffer)); Seq(100, 5, expected_buffer); EXPECT_EQ(0, memcmp(buffer, expected_buffer, sizeof(buffer))); // Try reading too much again. num_bytes = 6u * sizeof(int32_t); memset(buffer, 0xab, sizeof(buffer)); EXPECT_EQ(MOJO_RESULT_OUT_OF_RANGE, dp->ConsumerReadData(buffer, &num_bytes, true)); memset(expected_buffer, 0xab, sizeof(expected_buffer)); EXPECT_EQ(0, memcmp(buffer, expected_buffer, sizeof(buffer))); // Try discarding too much again. num_bytes = 6u * sizeof(int32_t); EXPECT_EQ(MOJO_RESULT_OUT_OF_RANGE, dp->ConsumerDiscardData(&num_bytes, true)); // Discard a little. num_bytes = 2u * sizeof(int32_t); EXPECT_EQ(MOJO_RESULT_OK, dp->ConsumerDiscardData(&num_bytes, true)); EXPECT_EQ(2u * sizeof(int32_t), num_bytes); // Three left. num_bytes = 0u; EXPECT_EQ(MOJO_RESULT_OK, dp->ConsumerQueryData(&num_bytes)); EXPECT_EQ(3u * sizeof(int32_t), num_bytes); // Close the producer, then test producer-closed cases. dp->ProducerClose(); // Try reading too much; "failed precondition" since the producer is closed. num_bytes = 4u * sizeof(int32_t); memset(buffer, 0xab, sizeof(buffer)); EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION, dp->ConsumerReadData(buffer, &num_bytes, true)); memset(expected_buffer, 0xab, sizeof(expected_buffer)); EXPECT_EQ(0, memcmp(buffer, expected_buffer, sizeof(buffer))); // Try discarding too much; "failed precondition" again. num_bytes = 4u * sizeof(int32_t); EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION, dp->ConsumerDiscardData(&num_bytes, true)); // Read a little. num_bytes = 2u * sizeof(int32_t); memset(buffer, 0xab, sizeof(buffer)); EXPECT_EQ(MOJO_RESULT_OK, dp->ConsumerReadData(buffer, &num_bytes, true)); EXPECT_EQ(2u * sizeof(int32_t), num_bytes); memset(expected_buffer, 0xab, sizeof(expected_buffer)); Seq(400, 2, expected_buffer); EXPECT_EQ(0, memcmp(buffer, expected_buffer, sizeof(buffer))); // Discard the remaining element. num_bytes = 1u * sizeof(int32_t); EXPECT_EQ(MOJO_RESULT_OK, dp->ConsumerDiscardData(&num_bytes, true)); EXPECT_EQ(1u * sizeof(int32_t), num_bytes); // Empty again. num_bytes = ~0u; EXPECT_EQ(MOJO_RESULT_OK, dp->ConsumerQueryData(&num_bytes)); EXPECT_EQ(0u, num_bytes); dp->ConsumerClose(); } TEST(LocalDataPipeTest, AllOrNoneMayDiscard) { const MojoCreateDataPipeOptions options = { kSizeOfOptions, // |struct_size|. MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_MAY_DISCARD, // |flags|. static_cast(sizeof(int32_t)), // |element_num_bytes|. 10 * sizeof(int32_t) // |capacity_num_bytes|. }; MojoCreateDataPipeOptions validated_options = { 0 }; EXPECT_EQ(MOJO_RESULT_OK, DataPipe::ValidateOptions(&options, &validated_options)); scoped_refptr dp(new LocalDataPipe(validated_options)); // Try writing way too much. uint32_t num_bytes = 20u * sizeof(int32_t); int32_t buffer[100]; Seq(0, arraysize(buffer), buffer); EXPECT_EQ(MOJO_RESULT_OUT_OF_RANGE, dp->ProducerWriteData(buffer, &num_bytes, true)); // Write some stuff. num_bytes = 5u * sizeof(int32_t); Seq(100, arraysize(buffer), buffer); EXPECT_EQ(MOJO_RESULT_OK, dp->ProducerWriteData(buffer, &num_bytes, true)); EXPECT_EQ(5u * sizeof(int32_t), num_bytes); // Write lots of stuff (discarding all but "104"). num_bytes = 9u * sizeof(int32_t); Seq(200, arraysize(buffer), buffer); EXPECT_EQ(MOJO_RESULT_OK, dp->ProducerWriteData(buffer, &num_bytes, true)); EXPECT_EQ(9u * sizeof(int32_t), num_bytes); // Read one. num_bytes = 1u * sizeof(int32_t); memset(buffer, 0xab, sizeof(buffer)); EXPECT_EQ(MOJO_RESULT_OK, dp->ConsumerReadData(buffer, &num_bytes, true)); EXPECT_EQ(1u * sizeof(int32_t), num_bytes); int32_t expected_buffer[100]; memset(expected_buffer, 0xab, sizeof(expected_buffer)); expected_buffer[0] = 104; EXPECT_EQ(0, memcmp(buffer, expected_buffer, sizeof(buffer))); // Try reading too many. num_bytes = 10u * sizeof(int32_t); memset(buffer, 0xab, sizeof(buffer)); EXPECT_EQ(MOJO_RESULT_OUT_OF_RANGE, dp->ConsumerReadData(buffer, &num_bytes, true)); memset(expected_buffer, 0xab, sizeof(expected_buffer)); EXPECT_EQ(0, memcmp(buffer, expected_buffer, sizeof(buffer))); // Try discarding too many. num_bytes = 10u * sizeof(int32_t); EXPECT_EQ(MOJO_RESULT_OUT_OF_RANGE, dp->ConsumerDiscardData(&num_bytes, true)); // Discard a bunch. num_bytes = 4u * sizeof(int32_t); EXPECT_EQ(MOJO_RESULT_OK, dp->ConsumerDiscardData(&num_bytes, true)); // Half full. num_bytes = 0u; EXPECT_EQ(MOJO_RESULT_OK, dp->ConsumerQueryData(&num_bytes)); EXPECT_EQ(5u * sizeof(int32_t), num_bytes); // Write as much as possible. num_bytes = 10u * sizeof(int32_t); Seq(300, arraysize(buffer), buffer); EXPECT_EQ(MOJO_RESULT_OK, dp->ProducerWriteData(buffer, &num_bytes, true)); EXPECT_EQ(10u * sizeof(int32_t), num_bytes); // Read everything. num_bytes = 10u * sizeof(int32_t); memset(buffer, 0xab, sizeof(buffer)); EXPECT_EQ(MOJO_RESULT_OK, dp->ConsumerReadData(buffer, &num_bytes, true)); memset(expected_buffer, 0xab, sizeof(expected_buffer)); EXPECT_EQ(10u * sizeof(int32_t), num_bytes); Seq(300, 10, expected_buffer); EXPECT_EQ(0, memcmp(buffer, expected_buffer, sizeof(buffer))); // Note: All-or-none two-phase writes on a "may discard" data pipe are tested // in LocalDataPipeTest.MayDiscard. dp->ProducerClose(); dp->ConsumerClose(); } TEST(LocalDataPipeTest, TwoPhaseAllOrNone) { const MojoCreateDataPipeOptions options = { kSizeOfOptions, // |struct_size|. MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_NONE, // |flags|. static_cast(sizeof(int32_t)), // |element_num_bytes|. 10 * sizeof(int32_t) // |capacity_num_bytes|. }; MojoCreateDataPipeOptions validated_options = { 0 }; EXPECT_EQ(MOJO_RESULT_OK, DataPipe::ValidateOptions(&options, &validated_options)); scoped_refptr dp(new LocalDataPipe(validated_options)); // Try writing way too much (two-phase). uint32_t num_bytes = 20u * sizeof(int32_t); void* write_ptr = NULL; EXPECT_EQ(MOJO_RESULT_OUT_OF_RANGE, dp->ProducerBeginWriteData(&write_ptr, &num_bytes, true)); // Try writing an amount which isn't a multiple of the element size // (two-phase). COMPILE_ASSERT(sizeof(int32_t) > 1u, wow_int32_ts_have_size_1); num_bytes = 1u; write_ptr = NULL; EXPECT_EQ(MOJO_RESULT_INVALID_ARGUMENT, dp->ProducerBeginWriteData(&write_ptr, &num_bytes, true)); // Try reading way too much (two-phase). num_bytes = 20u * sizeof(int32_t); const void* read_ptr = NULL; EXPECT_EQ(MOJO_RESULT_OUT_OF_RANGE, dp->ConsumerBeginReadData(&read_ptr, &num_bytes, true)); // Write half (two-phase). num_bytes = 5u * sizeof(int32_t); write_ptr = NULL; EXPECT_EQ(MOJO_RESULT_OK, dp->ProducerBeginWriteData(&write_ptr, &num_bytes, true)); // May provide more space than requested. EXPECT_GE(num_bytes, 5u * sizeof(int32_t)); EXPECT_TRUE(write_ptr != NULL); Seq(0, 5, static_cast(write_ptr)); EXPECT_EQ(MOJO_RESULT_OK, dp->ProducerEndWriteData(5u * sizeof(int32_t))); // Try reading an amount which isn't a multiple of the element size // (two-phase). num_bytes = 1u; read_ptr = NULL; EXPECT_EQ(MOJO_RESULT_INVALID_ARGUMENT, dp->ConsumerBeginReadData(&read_ptr, &num_bytes, true)); // Read one (two-phase). num_bytes = 1u * sizeof(int32_t); read_ptr = NULL; EXPECT_EQ(MOJO_RESULT_OK, dp->ConsumerBeginReadData(&read_ptr, &num_bytes, true)); EXPECT_GE(num_bytes, 1u * sizeof(int32_t)); EXPECT_EQ(0, static_cast(read_ptr)[0]); EXPECT_EQ(MOJO_RESULT_OK, dp->ConsumerEndReadData(1u * sizeof(int32_t))); // We should have four left, leaving room for six. num_bytes = 0u; EXPECT_EQ(MOJO_RESULT_OK, dp->ConsumerQueryData(&num_bytes)); EXPECT_EQ(4u * sizeof(int32_t), num_bytes); // Assuming a tight circular buffer of the specified capacity, we can't do a // two-phase write of six now. num_bytes = 6u * sizeof(int32_t); write_ptr = NULL; EXPECT_EQ(MOJO_RESULT_OUT_OF_RANGE, dp->ProducerBeginWriteData(&write_ptr, &num_bytes, true)); // Write six elements (simple), filling the buffer. num_bytes = 6u * sizeof(int32_t); int32_t buffer[100]; Seq(100, 6, buffer); EXPECT_EQ(MOJO_RESULT_OK, dp->ProducerWriteData(buffer, &num_bytes, true)); EXPECT_EQ(6u * sizeof(int32_t), num_bytes); // We have ten. num_bytes = 0u; EXPECT_EQ(MOJO_RESULT_OK, dp->ConsumerQueryData(&num_bytes)); EXPECT_EQ(10u * sizeof(int32_t), num_bytes); // But a two-phase read of ten should fail. num_bytes = 10u * sizeof(int32_t); read_ptr = NULL; EXPECT_EQ(MOJO_RESULT_OUT_OF_RANGE, dp->ConsumerBeginReadData(&read_ptr, &num_bytes, true)); // Close the producer. dp->ProducerClose(); // A two-phase read of nine should work. num_bytes = 9u * sizeof(int32_t); read_ptr = NULL; EXPECT_EQ(MOJO_RESULT_OK, dp->ConsumerBeginReadData(&read_ptr, &num_bytes, true)); EXPECT_GE(num_bytes, 9u * sizeof(int32_t)); EXPECT_EQ(1, static_cast(read_ptr)[0]); EXPECT_EQ(2, static_cast(read_ptr)[1]); EXPECT_EQ(3, static_cast(read_ptr)[2]); EXPECT_EQ(4, static_cast(read_ptr)[3]); EXPECT_EQ(100, static_cast(read_ptr)[4]); EXPECT_EQ(101, static_cast(read_ptr)[5]); EXPECT_EQ(102, static_cast(read_ptr)[6]); EXPECT_EQ(103, static_cast(read_ptr)[7]); EXPECT_EQ(104, static_cast(read_ptr)[8]); EXPECT_EQ(MOJO_RESULT_OK, dp->ConsumerEndReadData(9u * sizeof(int32_t))); // A two-phase read of two should fail, with "failed precondition". num_bytes = 2u * sizeof(int32_t); read_ptr = NULL; EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION, dp->ConsumerBeginReadData(&read_ptr, &num_bytes, true)); dp->ConsumerClose(); } // Tests that |ProducerWriteData()| and |ConsumerReadData()| writes and reads, // respectively, as much as possible, even if it has to "wrap around" the // internal circular buffer. (Note that the two-phase write and read do not do // this.) TEST(LocalDataPipeTest, WrapAround) { unsigned char test_data[1000]; for (size_t i = 0; i < arraysize(test_data); i++) test_data[i] = static_cast(i); const MojoCreateDataPipeOptions options = { kSizeOfOptions, // |struct_size|. MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_NONE, // |flags|. 1u, // |element_num_bytes|. 100u // |capacity_num_bytes|. }; MojoCreateDataPipeOptions validated_options = { 0 }; EXPECT_EQ(MOJO_RESULT_OK, DataPipe::ValidateOptions(&options, &validated_options)); // This test won't be valid if |ValidateOptions()| decides to give the pipe // more space. ASSERT_EQ(100u, validated_options.capacity_num_bytes); scoped_refptr dp(new LocalDataPipe(validated_options)); // Write 20 bytes. uint32_t num_bytes = 20u; EXPECT_EQ(MOJO_RESULT_OK, dp->ProducerWriteData(&test_data[0], &num_bytes, false)); EXPECT_EQ(20u, num_bytes); // Read 10 bytes. unsigned char read_buffer[1000] = { 0 }; num_bytes = 10u; EXPECT_EQ(MOJO_RESULT_OK, dp->ConsumerReadData(read_buffer, &num_bytes, false)); EXPECT_EQ(10u, num_bytes); EXPECT_EQ(0, memcmp(read_buffer, &test_data[0], 10u)); // Check that a two-phase write can now only write (at most) 80 bytes. (This // checks an implementation detail; this behavior is not guaranteed, but we // need it for this test.) void* write_buffer_ptr = NULL; num_bytes = 0u; EXPECT_EQ(MOJO_RESULT_OK, dp->ProducerBeginWriteData(&write_buffer_ptr, &num_bytes, false)); EXPECT_TRUE(write_buffer_ptr != NULL); EXPECT_EQ(80u, num_bytes); EXPECT_EQ(MOJO_RESULT_OK, dp->ProducerEndWriteData(0u)); // Write as much data as we can (using |ProducerWriteData()|). We should write // 90 bytes. num_bytes = 200u; EXPECT_EQ(MOJO_RESULT_OK, dp->ProducerWriteData(&test_data[20], &num_bytes, false)); EXPECT_EQ(90u, num_bytes); // Check that a two-phase read can now only read (at most) 90 bytes. (This // checks an implementation detail; this behavior is not guaranteed, but we // need it for this test.) const void* read_buffer_ptr = NULL; num_bytes = 0u; EXPECT_EQ(MOJO_RESULT_OK, dp->ConsumerBeginReadData(&read_buffer_ptr, &num_bytes, false)); EXPECT_TRUE(read_buffer_ptr != NULL); EXPECT_EQ(90u, num_bytes); EXPECT_EQ(MOJO_RESULT_OK, dp->ConsumerEndReadData(0u)); // Read as much as possible (using |ConsumerReadData()|). We should read 100 // bytes. num_bytes = static_cast(arraysize(read_buffer) * sizeof(read_buffer[0])); memset(read_buffer, 0, num_bytes); EXPECT_EQ(MOJO_RESULT_OK, dp->ConsumerReadData(read_buffer, &num_bytes, false)); EXPECT_EQ(100u, num_bytes); EXPECT_EQ(0, memcmp(read_buffer, &test_data[10], 100u)); dp->ProducerClose(); dp->ConsumerClose(); } // Tests the behavior of closing the producer or consumer with respect to // writes and reads (simple and two-phase). TEST(LocalDataPipeTest, CloseWriteRead) { const char kTestData[] = "hello world"; const uint32_t kTestDataSize = static_cast(sizeof(kTestData)); const MojoCreateDataPipeOptions options = { kSizeOfOptions, // |struct_size|. MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_NONE, // |flags|. 1u, // |element_num_bytes|. 1000u // |capacity_num_bytes|. }; MojoCreateDataPipeOptions validated_options = { 0 }; EXPECT_EQ(MOJO_RESULT_OK, DataPipe::ValidateOptions(&options, &validated_options)); // Close producer first, then consumer. { scoped_refptr dp(new LocalDataPipe(validated_options)); // Write some data, so we'll have something to read. uint32_t num_bytes = kTestDataSize; EXPECT_EQ(MOJO_RESULT_OK, dp->ProducerWriteData(kTestData, &num_bytes, false)); EXPECT_EQ(kTestDataSize, num_bytes); // Write it again, so we'll have something left over. num_bytes = kTestDataSize; EXPECT_EQ(MOJO_RESULT_OK, dp->ProducerWriteData(kTestData, &num_bytes, false)); EXPECT_EQ(kTestDataSize, num_bytes); // Start two-phase write. void* write_buffer_ptr = NULL; num_bytes = 0u; EXPECT_EQ(MOJO_RESULT_OK, dp->ProducerBeginWriteData(&write_buffer_ptr, &num_bytes, false)); EXPECT_TRUE(write_buffer_ptr != NULL); EXPECT_GT(num_bytes, 0u); // Start two-phase read. const void* read_buffer_ptr = NULL; num_bytes = 0u; EXPECT_EQ(MOJO_RESULT_OK, dp->ConsumerBeginReadData(&read_buffer_ptr, &num_bytes, false)); EXPECT_TRUE(read_buffer_ptr != NULL); EXPECT_EQ(2u * kTestDataSize, num_bytes); // Close the producer. dp->ProducerClose(); // The consumer can finish its two-phase read. EXPECT_EQ(0, memcmp(read_buffer_ptr, kTestData, kTestDataSize)); EXPECT_EQ(MOJO_RESULT_OK, dp->ConsumerEndReadData(kTestDataSize)); // And start another. read_buffer_ptr = NULL; num_bytes = 0u; EXPECT_EQ(MOJO_RESULT_OK, dp->ConsumerBeginReadData(&read_buffer_ptr, &num_bytes, false)); EXPECT_TRUE(read_buffer_ptr != NULL); EXPECT_EQ(kTestDataSize, num_bytes); // Close the consumer, which cancels the two-phase read. dp->ConsumerClose(); } // Close consumer first, then producer. { scoped_refptr dp(new LocalDataPipe(validated_options)); // Write some data, so we'll have something to read. uint32_t num_bytes = kTestDataSize; EXPECT_EQ(MOJO_RESULT_OK, dp->ProducerWriteData(kTestData, &num_bytes, false)); EXPECT_EQ(kTestDataSize, num_bytes); // Start two-phase write. void* write_buffer_ptr = NULL; num_bytes = 0u; EXPECT_EQ(MOJO_RESULT_OK, dp->ProducerBeginWriteData(&write_buffer_ptr, &num_bytes, false)); EXPECT_TRUE(write_buffer_ptr != NULL); ASSERT_GT(num_bytes, kTestDataSize); // Start two-phase read. const void* read_buffer_ptr = NULL; num_bytes = 0u; EXPECT_EQ(MOJO_RESULT_OK, dp->ConsumerBeginReadData(&read_buffer_ptr, &num_bytes, false)); EXPECT_TRUE(read_buffer_ptr != NULL); EXPECT_EQ(kTestDataSize, num_bytes); // Close the consumer. dp->ConsumerClose(); // Actually write some data. (Note: Premature freeing of the buffer would // probably only be detected under ASAN or similar.) memcpy(write_buffer_ptr, kTestData, kTestDataSize); // Note: Even though the consumer has been closed, ending the two-phase // write will report success. EXPECT_EQ(MOJO_RESULT_OK, dp->ProducerEndWriteData(kTestDataSize)); // But trying to write should result in failure. num_bytes = kTestDataSize; EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION, dp->ProducerWriteData(kTestData, &num_bytes, false)); // As will trying to start another two-phase write. write_buffer_ptr = NULL; num_bytes = 0u; EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION, dp->ProducerBeginWriteData(&write_buffer_ptr, &num_bytes, false)); dp->ProducerClose(); } // Test closing the consumer first, then the producer, with an active // two-phase write. { scoped_refptr dp(new LocalDataPipe(validated_options)); // Start two-phase write. void* write_buffer_ptr = NULL; uint32_t num_bytes = 0u; EXPECT_EQ(MOJO_RESULT_OK, dp->ProducerBeginWriteData(&write_buffer_ptr, &num_bytes, false)); EXPECT_TRUE(write_buffer_ptr != NULL); ASSERT_GT(num_bytes, kTestDataSize); dp->ConsumerClose(); dp->ProducerClose(); } // Test closing the producer and then trying to read (with no data). { scoped_refptr dp(new LocalDataPipe(validated_options)); // Write some data, so we'll have something to read. uint32_t num_bytes = kTestDataSize; EXPECT_EQ(MOJO_RESULT_OK, dp->ProducerWriteData(kTestData, &num_bytes, false)); EXPECT_EQ(kTestDataSize, num_bytes); // Close the producer. dp->ProducerClose(); // Read that data. char buffer[1000]; num_bytes = static_cast(sizeof(buffer)); EXPECT_EQ(MOJO_RESULT_OK, dp->ConsumerReadData(buffer, &num_bytes, false)); EXPECT_EQ(kTestDataSize, num_bytes); EXPECT_EQ(0, memcmp(buffer, kTestData, kTestDataSize)); // A second read should fail. num_bytes = static_cast(sizeof(buffer)); EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION, dp->ConsumerReadData(buffer, &num_bytes, false)); // A two-phase read should also fail. const void* read_buffer_ptr = NULL; num_bytes = 0u; EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION, dp->ConsumerBeginReadData(&read_buffer_ptr, &num_bytes, false)); // Ditto for discard. num_bytes = 10u; EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION, dp->ConsumerDiscardData(&num_bytes, false)); dp->ConsumerClose(); } } TEST(LocalDataPipeTest, TwoPhaseMoreInvalidArguments) { const MojoCreateDataPipeOptions options = { kSizeOfOptions, // |struct_size|. MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_NONE, // |flags|. static_cast(sizeof(int32_t)), // |element_num_bytes|. 10 * sizeof(int32_t) // |capacity_num_bytes|. }; MojoCreateDataPipeOptions validated_options = { 0 }; EXPECT_EQ(MOJO_RESULT_OK, DataPipe::ValidateOptions(&options, &validated_options)); scoped_refptr dp(new LocalDataPipe(validated_options)); // No data. uint32_t num_bytes = 1000u; EXPECT_EQ(MOJO_RESULT_OK, dp->ConsumerQueryData(&num_bytes)); EXPECT_EQ(0u, num_bytes); // Try "ending" a two-phase write when one isn't active. EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION, dp->ProducerEndWriteData(1u * sizeof(int32_t))); // Still no data. num_bytes = 1000u; EXPECT_EQ(MOJO_RESULT_OK, dp->ConsumerQueryData(&num_bytes)); EXPECT_EQ(0u, num_bytes); // Try ending a two-phase write with an invalid amount (too much). num_bytes = 0u; void* write_ptr = NULL; EXPECT_EQ(MOJO_RESULT_OK, dp->ProducerBeginWriteData(&write_ptr, &num_bytes, false)); EXPECT_EQ(MOJO_RESULT_INVALID_ARGUMENT, dp->ProducerEndWriteData( num_bytes + static_cast(sizeof(int32_t)))); // But the two-phase write still ended. EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION, dp->ProducerEndWriteData(0u)); // Still no data. num_bytes = 1000u; EXPECT_EQ(MOJO_RESULT_OK, dp->ConsumerQueryData(&num_bytes)); EXPECT_EQ(0u, num_bytes); // Try ending a two-phase write with an invalid amount (not a multiple of the // element size). num_bytes = 0u; write_ptr = NULL; EXPECT_EQ(MOJO_RESULT_OK, dp->ProducerBeginWriteData(&write_ptr, &num_bytes, false)); EXPECT_GE(num_bytes, 1u); EXPECT_EQ(MOJO_RESULT_INVALID_ARGUMENT, dp->ProducerEndWriteData(1u)); // But the two-phase write still ended. EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION, dp->ProducerEndWriteData(0u)); // Still no data. num_bytes = 1000u; EXPECT_EQ(MOJO_RESULT_OK, dp->ConsumerQueryData(&num_bytes)); EXPECT_EQ(0u, num_bytes); // Now write some data, so we'll be able to try reading. int32_t element = 123; num_bytes = 1u * sizeof(int32_t); EXPECT_EQ(MOJO_RESULT_OK, dp->ProducerWriteData(&element, &num_bytes, false)); // One element available. num_bytes = 0u; EXPECT_EQ(MOJO_RESULT_OK, dp->ConsumerQueryData(&num_bytes)); EXPECT_EQ(1u * sizeof(int32_t), num_bytes); // Try "ending" a two-phase read when one isn't active. EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION, dp->ConsumerEndReadData(1u * sizeof(int32_t))); // Still one element available. num_bytes = 0u; EXPECT_EQ(MOJO_RESULT_OK, dp->ConsumerQueryData(&num_bytes)); EXPECT_EQ(1u * sizeof(int32_t), num_bytes); // Try ending a two-phase read with an invalid amount (too much). num_bytes = 0u; const void* read_ptr = NULL; EXPECT_EQ(MOJO_RESULT_OK, dp->ConsumerBeginReadData(&read_ptr, &num_bytes, false)); EXPECT_EQ(MOJO_RESULT_INVALID_ARGUMENT, dp->ConsumerEndReadData( num_bytes + static_cast(sizeof(int32_t)))); // Still one element available. num_bytes = 0u; EXPECT_EQ(MOJO_RESULT_OK, dp->ConsumerQueryData(&num_bytes)); EXPECT_EQ(1u * sizeof(int32_t), num_bytes); // Try ending a two-phase read with an invalid amount (not a multiple of the // element size). num_bytes = 0u; read_ptr = NULL; EXPECT_EQ(MOJO_RESULT_OK, dp->ConsumerBeginReadData(&read_ptr, &num_bytes, false)); EXPECT_EQ(1u * sizeof(int32_t), num_bytes); EXPECT_EQ(123, static_cast(read_ptr)[0]); EXPECT_EQ(MOJO_RESULT_INVALID_ARGUMENT, dp->ConsumerEndReadData(1u)); // Still one element available. num_bytes = 0u; EXPECT_EQ(MOJO_RESULT_OK, dp->ConsumerQueryData(&num_bytes)); EXPECT_EQ(1u * sizeof(int32_t), num_bytes); dp->ProducerClose(); dp->ConsumerClose(); } // Tests that even with "may discard", the data won't change under a two-phase // read. // TODO(vtl): crbug.com/348644: We currently don't pass this. (There are two // related issues: First, we don't recognize that the data given to // |ConsumerBeginReadData()| isn't discardable until |ConsumerEndReadData()|, // and thus we erroneously allow |ProducerWriteData()| to succeed. Second, the // |ProducerWriteData()| then changes the data underneath the two-phase read.) TEST(LocalDataPipeTest, DISABLED_MayDiscardTwoPhaseConsistent) { const MojoCreateDataPipeOptions options = { kSizeOfOptions, // |struct_size|. MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_MAY_DISCARD, // |flags|. 1, // |element_num_bytes|. 2 // |capacity_num_bytes|. }; MojoCreateDataPipeOptions validated_options = { 0 }; EXPECT_EQ(MOJO_RESULT_OK, DataPipe::ValidateOptions(&options, &validated_options)); scoped_refptr dp(new LocalDataPipe(validated_options)); // Write some elements. char elements[2] = { 'a', 'b' }; uint32_t num_bytes = 2u; EXPECT_EQ(MOJO_RESULT_OK, dp->ProducerWriteData(elements, &num_bytes, false)); EXPECT_EQ(2u, num_bytes); // Begin reading. const void* read_ptr = NULL; num_bytes = 2u; EXPECT_EQ(MOJO_RESULT_OK, dp->ConsumerBeginReadData(&read_ptr, &num_bytes, false)); EXPECT_EQ(2u, num_bytes); EXPECT_EQ('a', static_cast(read_ptr)[0]); EXPECT_EQ('b', static_cast(read_ptr)[1]); // Try to write some more. But nothing should be discardable right now. elements[0] = 'x'; elements[1] = 'y'; num_bytes = 2u; // TODO(vtl): This should be: // EXPECT_EQ(MOJO_RESULT_SHOULD_WAIT, // dp->ProducerWriteData(elements, &num_bytes, false)); // but we incorrectly think that the bytes being read are discardable. Letting // this through reveals the significant consequence. EXPECT_EQ(MOJO_RESULT_OK, dp->ProducerWriteData(elements, &num_bytes, false)); // Check that our read buffer hasn't changed underneath us. EXPECT_EQ('a', static_cast(read_ptr)[0]); EXPECT_EQ('b', static_cast(read_ptr)[1]); // End reading. EXPECT_EQ(MOJO_RESULT_OK, dp->ConsumerEndReadData(2u)); // Now writing should succeed. EXPECT_EQ(MOJO_RESULT_OK, dp->ProducerWriteData(elements, &num_bytes, false)); // And if we read, we should get the new values. read_ptr = NULL; num_bytes = 2u; EXPECT_EQ(MOJO_RESULT_OK, dp->ConsumerBeginReadData(&read_ptr, &num_bytes, false)); EXPECT_EQ(2u, num_bytes); EXPECT_EQ('x', static_cast(read_ptr)[0]); EXPECT_EQ('y', static_cast(read_ptr)[1]); // End reading. EXPECT_EQ(MOJO_RESULT_OK, dp->ConsumerEndReadData(2u)); dp->ProducerClose(); dp->ConsumerClose(); } } // namespace } // namespace system } // namespace mojo