diff options
author | kmarshall <kmarshall@chromium.org> | 2015-12-02 16:27:30 -0800 |
---|---|---|
committer | Commit bot <commit-bot@chromium.org> | 2015-12-03 00:30:56 +0000 |
commit | 840a0fe2a4cab8709b5d29b89cdb968b620953ff (patch) | |
tree | e3f6703a0e5e7cbc31b90e1575ab8a4eee30ba7d /blimp | |
parent | a18e826a670891d5804c8aced282761834741e8b (diff) | |
download | chromium_src-840a0fe2a4cab8709b5d29b89cdb968b620953ff.zip chromium_src-840a0fe2a4cab8709b5d29b89cdb968b620953ff.tar.gz chromium_src-840a0fe2a4cab8709b5d29b89cdb968b620953ff.tar.bz2 |
Make PacketReader/PacketWriter interfaces async-only.
Adapts synchronous I/O completion to use callbacks, so downstream
callers are insulated from having to manage the complexities of
mixed sync and async I/O.
Updated logic of all PacketReader/PacketWriter dependents.
R=haibinlu@chromium.org,wez@chromium.org
BUG=558643
Review URL: https://codereview.chromium.org/1452823011
Cr-Commit-Position: refs/heads/master@{#362833}
Diffstat (limited to 'blimp')
-rw-r--r-- | blimp/net/blimp_connection.cc | 12 | ||||
-rw-r--r-- | blimp/net/blimp_connection_unittest.cc | 105 | ||||
-rw-r--r-- | blimp/net/blimp_message_pump.cc | 40 | ||||
-rw-r--r-- | blimp/net/blimp_message_pump.h | 15 | ||||
-rw-r--r-- | blimp/net/blimp_message_pump_unittest.cc | 121 | ||||
-rw-r--r-- | blimp/net/packet_reader.h | 15 | ||||
-rw-r--r-- | blimp/net/packet_writer.h | 13 | ||||
-rw-r--r-- | blimp/net/stream_packet_reader.cc | 19 | ||||
-rw-r--r-- | blimp/net/stream_packet_reader.h | 4 | ||||
-rw-r--r-- | blimp/net/stream_packet_reader_unittest.cc | 99 | ||||
-rw-r--r-- | blimp/net/stream_packet_writer.cc | 26 | ||||
-rw-r--r-- | blimp/net/stream_packet_writer.h | 4 | ||||
-rw-r--r-- | blimp/net/stream_packet_writer_unittest.cc | 58 | ||||
-rw-r--r-- | blimp/net/test_common.h | 22 |
14 files changed, 190 insertions, 363 deletions
diff --git a/blimp/net/blimp_connection.cc b/blimp/net/blimp_connection.cc index a79ffe0..c5b0700 100644 --- a/blimp/net/blimp_connection.cc +++ b/blimp/net/blimp_connection.cc @@ -40,7 +40,6 @@ class BlimpMessageSender : public BlimpMessageProcessor { PacketWriter* writer_; ConnectionErrorObserver* error_observer_; scoped_refptr<net::DrainableIOBuffer> buffer_; - net::CancelableCompletionCallback write_packet_callback_; net::CompletionCallback pending_process_msg_callback_; DISALLOW_COPY_AND_ASSIGN(BlimpMessageSender); @@ -72,19 +71,14 @@ void BlimpMessageSender::ProcessMessage( return; } - write_packet_callback_.Reset(base::Bind( - &BlimpMessageSender::OnWritePacketComplete, base::Unretained(this))); pending_process_msg_callback_ = callback; - int result = writer_->WritePacket(buffer_, write_packet_callback_.callback()); - if (result != net::ERR_IO_PENDING) { - base::MessageLoop::current()->PostTask( - FROM_HERE, base::Bind(write_packet_callback_.callback(), result)); - } + writer_->WritePacket(buffer_, + base::Bind(&BlimpMessageSender::OnWritePacketComplete, + base::Unretained(this))); } void BlimpMessageSender::OnWritePacketComplete(int result) { DCHECK_NE(net::ERR_IO_PENDING, result); - write_packet_callback_.Cancel(); base::ResetAndReturn(&pending_process_msg_callback_).Run(result); if (result != net::OK) { error_observer_->OnConnectionError(result); diff --git a/blimp/net/blimp_connection_unittest.cc b/blimp/net/blimp_connection_unittest.cc index 66d0658..759b426 100644 --- a/blimp/net/blimp_connection_unittest.cc +++ b/blimp/net/blimp_connection_unittest.cc @@ -20,9 +20,7 @@ #include "testing/gtest/include/gtest/gtest.h" using testing::_; -using testing::DoAll; using testing::InSequence; -using testing::NotNull; using testing::Return; using testing::SaveArg; @@ -32,21 +30,17 @@ namespace { class BlimpConnectionTest : public testing::Test { public: BlimpConnectionTest() { - message1_ = CreateInputMessage(); - message2_ = CreateControlMessage(); - scoped_ptr<testing::StrictMock<MockPacketReader>> reader( - new testing::StrictMock<MockPacketReader>); - reader_ = reader.get(); scoped_ptr<testing::StrictMock<MockPacketWriter>> writer( new testing::StrictMock<MockPacketWriter>); writer_ = writer.get(); - connection_.reset(new BlimpConnection(std::move(reader), + connection_.reset(new BlimpConnection(make_scoped_ptr(new MockPacketReader), std::move(writer))); connection_->SetConnectionErrorObserver(&error_observer_); } ~BlimpConnectionTest() override {} + protected: scoped_ptr<BlimpMessage> CreateInputMessage() { scoped_ptr<BlimpMessage> msg(new BlimpMessage); msg->set_type(BlimpMessage::INPUT); @@ -59,93 +53,25 @@ class BlimpConnectionTest : public testing::Test { return msg; } - protected: - base::MessageLoopForIO message_loop_; - scoped_ptr<BlimpMessage> message1_; - scoped_ptr<BlimpMessage> message2_; - - testing::StrictMock<MockPacketReader>* reader_; + base::MessageLoop message_loop_; testing::StrictMock<MockPacketWriter>* writer_; testing::StrictMock<MockConnectionErrorObserver> error_observer_; testing::StrictMock<MockBlimpMessageProcessor> receiver_; scoped_ptr<BlimpConnection> connection_; }; -// Reader completes reading one packet synchronously. -// Two read cases here. BlimpMessagePumpTest covers other cases. -TEST_F(BlimpConnectionTest, SyncPacketRead) { - EXPECT_CALL(receiver_, MockableProcessMessage(EqualsProto(*message1_), _)); - EXPECT_CALL(*reader_, ReadPacket(NotNull(), _)) - .WillOnce(DoAll(FillBufferFromMessage<0>(message1_.get()), - Return(message1_->ByteSize()))); - connection_->SetIncomingMessageProcessor(&receiver_); -} - -// Reader completes reading one packet synchronously withe error. -TEST_F(BlimpConnectionTest, SyncPacketReadWithError) { - InSequence s; - EXPECT_CALL(*reader_, ReadPacket(NotNull(), _)) - .WillOnce(Return(net::ERR_FAILED)); - EXPECT_CALL(error_observer_, OnConnectionError(net::ERR_FAILED)); - connection_->SetIncomingMessageProcessor(&receiver_); -} - -// Writer completes writing two packets synchronously. -TEST_F(BlimpConnectionTest, SyncTwoPacketsWrite) { - InSequence s; - EXPECT_CALL(*writer_, WritePacket(BufferEqualsProto(*message1_), _)) - .WillOnce(Return(net::OK)) - .RetiresOnSaturation(); - EXPECT_CALL(*writer_, WritePacket(BufferEqualsProto(*message2_), _)) - .WillOnce(Return(net::OK)) - .RetiresOnSaturation(); - - BlimpMessageProcessor* sender = connection_->GetOutgoingMessageProcessor(); - net::TestCompletionCallback complete_cb_1; - sender->ProcessMessage(CreateInputMessage(), - complete_cb_1.callback()); - EXPECT_EQ(net::OK, complete_cb_1.WaitForResult()); - net::TestCompletionCallback complete_cb_2; - sender->ProcessMessage(CreateControlMessage(), - complete_cb_2.callback()); - EXPECT_EQ(net::OK, complete_cb_2.WaitForResult()); -} - -// Writer completes writing two packets synchronously. -// First write succeeds, second fails. -TEST_F(BlimpConnectionTest, SyncTwoPacketsWriteWithError) { - InSequence s; - EXPECT_CALL(*writer_, WritePacket(BufferEqualsProto(*message1_), _)) - .WillOnce(Return(net::OK)) - .RetiresOnSaturation(); - EXPECT_CALL(*writer_, WritePacket(BufferEqualsProto(*message2_), _)) - .WillOnce(Return(net::ERR_FAILED)) - .RetiresOnSaturation(); - EXPECT_CALL(error_observer_, OnConnectionError(net::ERR_FAILED)); - - BlimpMessageProcessor* sender = connection_->GetOutgoingMessageProcessor(); - net::TestCompletionCallback complete_cb_1; - sender->ProcessMessage(CreateInputMessage(), - complete_cb_1.callback()); - EXPECT_EQ(net::OK, complete_cb_1.WaitForResult()); - net::TestCompletionCallback complete_cb_2; - sender->ProcessMessage(CreateControlMessage(), - complete_cb_2.callback()); - EXPECT_EQ(net::ERR_FAILED, complete_cb_2.WaitForResult()); -} - // Write completes writing two packets asynchronously. TEST_F(BlimpConnectionTest, AsyncTwoPacketsWrite) { net::CompletionCallback write_packet_cb; InSequence s; - EXPECT_CALL(*writer_, WritePacket(BufferEqualsProto(*message1_), _)) - .WillOnce( - DoAll(SaveArg<1>(&write_packet_cb), Return(net::ERR_IO_PENDING))) + EXPECT_CALL(*writer_, + WritePacket(BufferEqualsProto(*CreateInputMessage()), _)) + .WillOnce(SaveArg<1>(&write_packet_cb)) .RetiresOnSaturation(); - EXPECT_CALL(*writer_, WritePacket(BufferEqualsProto(*message2_), _)) - .WillOnce( - DoAll(SaveArg<1>(&write_packet_cb), Return(net::ERR_IO_PENDING))) + EXPECT_CALL(*writer_, + WritePacket(BufferEqualsProto(*CreateControlMessage()), _)) + .WillOnce(SaveArg<1>(&write_packet_cb)) .RetiresOnSaturation(); BlimpMessageProcessor* sender = connection_->GetOutgoingMessageProcessor(); @@ -172,13 +98,13 @@ TEST_F(BlimpConnectionTest, AsyncTwoPacketsWriteWithError) { net::CompletionCallback write_packet_cb; InSequence s; - EXPECT_CALL(*writer_, WritePacket(BufferEqualsProto(*message1_), _)) - .WillOnce( - DoAll(SaveArg<1>(&write_packet_cb), Return(net::ERR_IO_PENDING))) + EXPECT_CALL(*writer_, + WritePacket(BufferEqualsProto(*CreateInputMessage()), _)) + .WillOnce(SaveArg<1>(&write_packet_cb)) .RetiresOnSaturation(); - EXPECT_CALL(*writer_, WritePacket(BufferEqualsProto(*message2_), _)) - .WillOnce( - DoAll(SaveArg<1>(&write_packet_cb), Return(net::ERR_IO_PENDING))) + EXPECT_CALL(*writer_, + WritePacket(BufferEqualsProto(*CreateControlMessage()), _)) + .WillOnce(SaveArg<1>(&write_packet_cb)) .RetiresOnSaturation(); EXPECT_CALL(error_observer_, OnConnectionError(net::ERR_FAILED)); @@ -197,5 +123,4 @@ TEST_F(BlimpConnectionTest, AsyncTwoPacketsWriteWithError) { } } // namespace - } // namespace blimp diff --git a/blimp/net/blimp_message_pump.cc b/blimp/net/blimp_message_pump.cc index d9a250a..79082e0 100644 --- a/blimp/net/blimp_message_pump.cc +++ b/blimp/net/blimp_message_pump.cc @@ -21,58 +21,46 @@ BlimpMessagePump::BlimpMessagePump(PacketReader* reader) processor_(nullptr), buffer_(new net::GrowableIOBuffer) { DCHECK(reader_); + buffer_->SetCapacity(kMaxPacketPayloadSizeBytes); } BlimpMessagePump::~BlimpMessagePump() {} void BlimpMessagePump::SetMessageProcessor(BlimpMessageProcessor* processor) { - process_msg_callback_.Cancel(); + DCHECK(!processor_); processor_ = processor; - if (!processor_) { - read_packet_callback_.Cancel(); - } else if (read_packet_callback_.IsCancelled()) { - buffer_->SetCapacity(kMaxPacketPayloadSizeBytes); - ReadNextPacket(); - } + ReadNextPacket(); } void BlimpMessagePump::ReadNextPacket() { DCHECK(processor_); buffer_->set_offset(0); - read_packet_callback_.Reset(base::Bind( - &BlimpMessagePump::OnReadPacketComplete, base::Unretained(this))); - int result = - reader_->ReadPacket(buffer_.get(), read_packet_callback_.callback()); - if (result != net::ERR_IO_PENDING) { - // Read completed synchronously. - OnReadPacketComplete(result); - } + read_callback_.Reset(base::Bind(&BlimpMessagePump::OnReadPacketComplete, + base::Unretained(this))); + reader_->ReadPacket(buffer_.get(), read_callback_.callback()); } void BlimpMessagePump::OnReadPacketComplete(int result) { - read_packet_callback_.Cancel(); - if (result > 0) { - // The result is the size of the packet in bytes. + if (result == net::OK) { scoped_ptr<BlimpMessage> message(new BlimpMessage); - bool parse_result = - message->ParseFromArray(buffer_->StartOfBuffer(), result); - if (parse_result) { + if (message->ParseFromArray(buffer_->StartOfBuffer(), buffer_->offset())) { process_msg_callback_.Reset(base::Bind( &BlimpMessagePump::OnProcessMessageComplete, base::Unretained(this))); processor_->ProcessMessage(std::move(message), process_msg_callback_.callback()); - return; + } else { + result = net::ERR_FAILED; } - result = net::ERR_FAILED; } - if (error_observer_) + + if (result != net::OK) { error_observer_->OnConnectionError(result); + } } void BlimpMessagePump::OnProcessMessageComplete(int result) { // No error is expected from the message receiver. - DCHECK_EQ(result, net::OK); - process_msg_callback_.Cancel(); + DCHECK_EQ(net::OK, result); ReadNextPacket(); } diff --git a/blimp/net/blimp_message_pump.h b/blimp/net/blimp_message_pump.h index e8fc0aa..d86b750 100644 --- a/blimp/net/blimp_message_pump.h +++ b/blimp/net/blimp_message_pump.h @@ -26,13 +26,12 @@ class PacketReader; // message, the BlimpMessagePump reads the next packet. class BLIMP_NET_EXPORT BlimpMessagePump { public: - // Caller ensures |reader| outlive this object. + // Caller ensures that |reader| outlives this object. explicit BlimpMessagePump(PacketReader* reader); ~BlimpMessagePump(); - // Sets the processor which will take blimp messages. - // Can be set multiple times, but previously set processors are discarded. + // Sets the processor which will take BlimpMessages. Can only be set once. // Caller retains the ownership of |processor|. void SetMessageProcessor(BlimpMessageProcessor* processor); @@ -47,16 +46,22 @@ class BLIMP_NET_EXPORT BlimpMessagePump { // Callback when next packet is ready in |buffer_|. void OnReadPacketComplete(int result); - // Callback when |processor_| finishes processing a blimp message. + // Callback when |processor_| finishes processing a BlimpMessage. void OnProcessMessageComplete(int result); PacketReader* reader_; ConnectionErrorObserver* error_observer_; BlimpMessageProcessor* processor_; scoped_refptr<net::GrowableIOBuffer> buffer_; - net::CancelableCompletionCallback read_packet_callback_; + + // Cancelled in the event that the connection is destroyed (along with + // |this|) while a inflight callback is held by |processor_|. net::CancelableCompletionCallback process_msg_callback_; + // Cancelled to guard against |this| being called back from a completed read + // operation. + net::CancelableCompletionCallback read_callback_; + DISALLOW_COPY_AND_ASSIGN(BlimpMessagePump); }; diff --git a/blimp/net/blimp_message_pump_unittest.cc b/blimp/net/blimp_message_pump_unittest.cc index 7f9ab11..20760af 100644 --- a/blimp/net/blimp_message_pump_unittest.cc +++ b/blimp/net/blimp_message_pump_unittest.cc @@ -50,60 +50,34 @@ class BlimpMessagePumpTest : public testing::Test { scoped_ptr<BlimpMessagePump> message_pump_; }; -// Reader completes reading one packet synchronously. -TEST_F(BlimpMessagePumpTest, SyncPacketRead) { - EXPECT_CALL(receiver_, MockableProcessMessage(EqualsProto(*message1_), _)); - EXPECT_CALL(reader_, ReadPacket(NotNull(), _)) - .WillOnce(DoAll(FillBufferFromMessage<0>(message1_.get()), - Return(message1_->ByteSize()))); - message_pump_->SetMessageProcessor(&receiver_); -} - -// Reader completes reading two packets synchronously. -TEST_F(BlimpMessagePumpTest, SyncTwoPacketsRead) { - EXPECT_CALL(reader_, ReadPacket(NotNull(), _)) - .WillOnce(DoAll(FillBufferFromMessage<0>(message1_.get()), - Return(message1_->ByteSize()))) - .WillOnce(DoAll(FillBufferFromMessage<0>(message2_.get()), - Return(message2_->ByteSize()))); - net::CompletionCallback process_msg_cb; - { - InSequence s; - EXPECT_CALL(receiver_, MockableProcessMessage(EqualsProto(*message1_), _)) - .WillOnce(SaveArg<1>(&process_msg_cb)) - .RetiresOnSaturation(); - EXPECT_CALL(receiver_, MockableProcessMessage(EqualsProto(*message2_), _)); - } - message_pump_->SetMessageProcessor(&receiver_); - - // Trigger next packet read - process_msg_cb.Run(net::OK); -} - // Reader completes reading one packet asynchronously. -TEST_F(BlimpMessagePumpTest, AsyncPacketRead) { +TEST_F(BlimpMessagePumpTest, ReadPacket) { net::CompletionCallback read_packet_cb; + EXPECT_CALL(reader_, ReadPacket(NotNull(), _)); EXPECT_CALL(reader_, ReadPacket(NotNull(), _)) .WillOnce(DoAll(FillBufferFromMessage<0>(message1_.get()), - SaveArg<1>(&read_packet_cb), Return(net::ERR_IO_PENDING))) - .WillOnce(Return(net::ERR_IO_PENDING)); + SetBufferOffset<0>(message1_->ByteSize()), + SaveArg<1>(&read_packet_cb))) + .RetiresOnSaturation(); net::CompletionCallback process_msg_cb; EXPECT_CALL(receiver_, MockableProcessMessage(EqualsProto(*message1_), _)) .WillOnce(SaveArg<1>(&process_msg_cb)); message_pump_->SetMessageProcessor(&receiver_); - read_packet_cb.Run(message1_->ByteSize()); + ASSERT_FALSE(read_packet_cb.is_null()); + base::ResetAndReturn(&read_packet_cb).Run(net::OK); process_msg_cb.Run(net::OK); } // Reader completes reading two packets asynchronously. -TEST_F(BlimpMessagePumpTest, AsyncTwoPacketsRead) { +TEST_F(BlimpMessagePumpTest, ReadTwoPackets) { net::CompletionCallback read_packet_cb; EXPECT_CALL(reader_, ReadPacket(NotNull(), _)) .WillOnce(DoAll(FillBufferFromMessage<0>(message1_.get()), - SaveArg<1>(&read_packet_cb), Return(net::ERR_IO_PENDING))) + SetBufferOffset<0>(message1_->ByteSize()), + SaveArg<1>(&read_packet_cb))) .WillOnce(DoAll(FillBufferFromMessage<0>(message2_.get()), - SaveArg<1>(&read_packet_cb), - Return(net::ERR_IO_PENDING))); + SetBufferOffset<0>(message2_->ByteSize()), + SaveArg<1>(&read_packet_cb))); net::CompletionCallback process_msg_cb; { InSequence s; @@ -113,77 +87,54 @@ TEST_F(BlimpMessagePumpTest, AsyncTwoPacketsRead) { EXPECT_CALL(receiver_, MockableProcessMessage(EqualsProto(*message2_), _)); } message_pump_->SetMessageProcessor(&receiver_); - read_packet_cb.Run(message1_->ByteSize()); + ASSERT_FALSE(read_packet_cb.is_null()); + base::ResetAndReturn(&read_packet_cb).Run(net::OK); // Trigger next packet read process_msg_cb.Run(net::OK); - read_packet_cb.Run(message2_->ByteSize()); + ASSERT_FALSE(read_packet_cb.is_null()); + base::ResetAndReturn(&read_packet_cb).Run(net::OK); } // Reader completes reading two packets asynchronously. // The first read succeeds, and the second fails. -TEST_F(BlimpMessagePumpTest, AsyncTwoPacketsReadWithError) { +TEST_F(BlimpMessagePumpTest, ReadTwoPacketsWithError) { + net::CompletionCallback process_msg_cb; net::CompletionCallback read_packet_cb; EXPECT_CALL(reader_, ReadPacket(NotNull(), _)) .WillOnce(DoAll(FillBufferFromMessage<0>(message1_.get()), - SaveArg<1>(&read_packet_cb), Return(net::ERR_IO_PENDING))) + SetBufferOffset<0>(message1_->ByteSize()), + SaveArg<1>(&read_packet_cb))) .WillOnce(DoAll(FillBufferFromMessage<0>(message2_.get()), - SaveArg<1>(&read_packet_cb), - Return(net::ERR_IO_PENDING))); - net::CompletionCallback process_msg_cb; - { - InSequence s; - EXPECT_CALL(receiver_, MockableProcessMessage(EqualsProto(*message1_), _)) - .WillOnce(SaveArg<1>(&process_msg_cb)); - EXPECT_CALL(error_observer_, OnConnectionError(net::ERR_FAILED)); - } + SetBufferOffset<0>(message2_->ByteSize()), + SaveArg<1>(&read_packet_cb))); + EXPECT_CALL(receiver_, MockableProcessMessage(EqualsProto(*message1_), _)) + .WillOnce(SaveArg<1>(&process_msg_cb)); + EXPECT_CALL(error_observer_, OnConnectionError(net::ERR_FAILED)); + message_pump_->SetMessageProcessor(&receiver_); - read_packet_cb.Run(message1_->ByteSize()); + ASSERT_FALSE(read_packet_cb.is_null()); + base::ResetAndReturn(&read_packet_cb).Run(net::OK); // Trigger next packet read process_msg_cb.Run(net::OK); - read_packet_cb.Run(net::ERR_FAILED); + ASSERT_FALSE(read_packet_cb.is_null()); + base::ResetAndReturn(&read_packet_cb).Run(net::ERR_FAILED); } // Reader completes reading one packet synchronously, but packet is invalid TEST_F(BlimpMessagePumpTest, InvalidPacket) { + net::CompletionCallback read_packet_cb; std::string test_msg("msg"); EXPECT_CALL(reader_, ReadPacket(NotNull(), _)) - .WillOnce(DoAll(FillBufferFromString<0>(test_msg), Return(1))); + .WillOnce(DoAll(FillBufferFromString<0>(test_msg), + SetBufferOffset<0>(test_msg.size()), + SaveArg<1>(&read_packet_cb))); EXPECT_CALL(error_observer_, OnConnectionError(net::ERR_FAILED)); - message_pump_->SetMessageProcessor(&receiver_); -} - -TEST_F(BlimpMessagePumpTest, ClearMessageProcessorAfterRead) { - net::CompletionCallback read_packet_cb; - EXPECT_CALL(reader_, ReadPacket(NotNull(), _)) - .WillOnce(DoAll(FillBufferFromMessage<0>(message1_.get()), - SaveArg<1>(&read_packet_cb), - Return(net::ERR_IO_PENDING))); - net::CompletionCallback process_msg_cb; - EXPECT_CALL(receiver_, MockableProcessMessage(EqualsProto(*message1_), _)) - .WillOnce(SaveArg<1>(&process_msg_cb)); - - message_pump_->SetMessageProcessor(&receiver_); - base::ResetAndReturn(&read_packet_cb).Run(message1_->ByteSize()); - - message_pump_->SetMessageProcessor(nullptr); - - // Completing message processing will not trigger next packet read. - base::ResetAndReturn(&process_msg_cb).Run(net::OK); -} - -TEST_F(BlimpMessagePumpTest, ClearMessageProcessorDuringRead) { - net::CompletionCallback read_packet_cb; - EXPECT_CALL(reader_, ReadPacket(NotNull(), _)) - .WillOnce(DoAll(FillBufferFromMessage<0>(message1_.get()), - SaveArg<1>(&read_packet_cb), - Return(net::ERR_IO_PENDING))); - // Receiver will not get any message. message_pump_->SetMessageProcessor(&receiver_); - message_pump_->SetMessageProcessor(nullptr); - base::ResetAndReturn(&read_packet_cb).Run(message1_->ByteSize()); + ASSERT_FALSE(read_packet_cb.is_null()); + base::ResetAndReturn(&read_packet_cb).Run(net::OK); } } // namespace diff --git a/blimp/net/packet_reader.h b/blimp/net/packet_reader.h index 407e2fd..c000ab0 100644 --- a/blimp/net/packet_reader.h +++ b/blimp/net/packet_reader.h @@ -18,14 +18,13 @@ class BLIMP_NET_EXPORT PacketReader { virtual ~PacketReader() {} // Reads a packet from the connection. - // Returns the size of the packet, in bytes, if the read operation executed - // successfully. - // Returns ERR_IO_PENDING if the operation will be executed asynchronously. - // |cb| is later invoked with the packet size or an error code. - // All other return values indicate errors and caller should stop using this - // reader. - virtual int ReadPacket(const scoped_refptr<net::GrowableIOBuffer>& buf, - const net::CompletionCallback& cb) = 0; + // Passes net::OK to |cb| if the read operation executed successfully. + // Sets |buf.offset()| to the received message's size, and invokes |cb| with + // net::OK result on success. + // All other values indicate errors. + // |callback| will not be invoked if |this| is deleted. + virtual void ReadPacket(const scoped_refptr<net::GrowableIOBuffer>& buf, + const net::CompletionCallback& cb) = 0; }; } // namespace blimp diff --git a/blimp/net/packet_writer.h b/blimp/net/packet_writer.h index e5dbb9a..8c7b9ca 100644 --- a/blimp/net/packet_writer.h +++ b/blimp/net/packet_writer.h @@ -21,14 +21,11 @@ class BLIMP_NET_EXPORT PacketWriter { public: virtual ~PacketWriter() {} - // Writes a packet of at least one byte in size to a connection. - // - // Returns net::OK or an error code if the operation executed successfully. - // Returns ERR_IO_PENDING if the operation will be executed asynchronously. - // |cb| is later invoked with net::OK or an error code. - // An error code indicates that caller should stop using this writer. - virtual int WritePacket(scoped_refptr<net::DrainableIOBuffer> data, - const net::CompletionCallback& callback) = 0; + // Invokes |cb| with net::OK if the write operation executed successfully. + // All other values indicate unrecoverable errors. + // |callback| must not be invoked if |this| is deleted. + virtual void WritePacket(scoped_refptr<net::DrainableIOBuffer> data, + const net::CompletionCallback& callback) = 0; }; } // namespace blimp diff --git a/blimp/net/stream_packet_reader.cc b/blimp/net/stream_packet_reader.cc index 8da6130..e500748 100644 --- a/blimp/net/stream_packet_reader.cc +++ b/blimp/net/stream_packet_reader.cc @@ -9,6 +9,7 @@ #include "base/callback_helpers.h" #include "base/logging.h" #include "base/memory/weak_ptr.h" +#include "base/message_loop/message_loop.h" #include "base/sys_byteorder.h" #include "blimp/net/common.h" #include "net/base/io_buffer.h" @@ -42,7 +43,7 @@ StreamPacketReader::StreamPacketReader(net::StreamSocket* socket) StreamPacketReader::~StreamPacketReader() {} -int StreamPacketReader::ReadPacket( +void StreamPacketReader::ReadPacket( const scoped_refptr<net::GrowableIOBuffer>& buf, const net::CompletionCallback& callback) { DCHECK_EQ(ReadState::IDLE, read_state_); @@ -54,17 +55,17 @@ int StreamPacketReader::ReadPacket( read_state_ = ReadState::HEADER; int result = DoReadLoop(net::OK); - if (result == net::ERR_IO_PENDING) { - // Store the completion callback to invoke when read completes - // asynchronously. - callback_ = callback; - } else { + if (result != net::ERR_IO_PENDING) { // Release the payload buffer, since the read operation has completed // synchronously. payload_buffer_ = nullptr; - } - return result; + // Adapt synchronous completion to an asynchronous style. + base::MessageLoop::current()->PostTask(FROM_HERE, + base::Bind(callback, result)); + } else { + callback_ = callback; + } } int StreamPacketReader::DoReadLoop(int result) { @@ -134,7 +135,7 @@ int StreamPacketReader::DoReadPayload(int result) { // Finished reading the payload. read_state_ = ReadState::IDLE; - return payload_size_; + return net::OK; } void StreamPacketReader::OnReadComplete(int result) { diff --git a/blimp/net/stream_packet_reader.h b/blimp/net/stream_packet_reader.h index 4468638..21a6dab 100644 --- a/blimp/net/stream_packet_reader.h +++ b/blimp/net/stream_packet_reader.h @@ -33,8 +33,8 @@ class BLIMP_NET_EXPORT StreamPacketReader : public PacketReader { ~StreamPacketReader() override; // PacketReader implementation. - int ReadPacket(const scoped_refptr<net::GrowableIOBuffer>& buf, - const net::CompletionCallback& cb) override; + void ReadPacket(const scoped_refptr<net::GrowableIOBuffer>& buf, + const net::CompletionCallback& cb) override; private: enum class ReadState { diff --git a/blimp/net/stream_packet_reader_unittest.cc b/blimp/net/stream_packet_reader_unittest.cc index 61419cf..5777d4d 100644 --- a/blimp/net/stream_packet_reader_unittest.cc +++ b/blimp/net/stream_packet_reader_unittest.cc @@ -5,6 +5,7 @@ #include <stddef.h> #include <string> +#include "base/message_loop/message_loop.h" #include "base/sys_byteorder.h" #include "blimp/net/common.h" #include "blimp/net/stream_packet_reader.h" @@ -41,11 +42,10 @@ class StreamPacketReaderTest : public testing::Test { ~StreamPacketReaderTest() override {} - int ReadPacket() { - return data_reader_.ReadPacket(buffer_, callback_.callback()); - } + void ReadPacket() { data_reader_.ReadPacket(buffer_, callback_.callback()); } protected: + base::MessageLoop message_loop_; scoped_refptr<net::GrowableIOBuffer> buffer_; std::string test_msg_; net::TestCompletionCallback callback_; @@ -65,12 +65,11 @@ TEST_F(StreamPacketReaderTest, ReadAsyncHeaderAsyncPayload) { .WillOnce(DoAll(FillBufferFromString<0>(test_msg_), SaveArg<2>(&socket_cb), Return(net::ERR_IO_PENDING))); - EXPECT_EQ(net::ERR_IO_PENDING, ReadPacket()); + ReadPacket(); socket_cb.Run(kPacketHeaderSizeBytes); socket_cb.Run(test_msg_.size()); - int rv = callback_.WaitForResult(); - - EXPECT_EQ(rv, static_cast<int>(test_msg_.size())); + EXPECT_EQ(net::OK, callback_.WaitForResult()); + EXPECT_EQ(static_cast<int>(test_msg_.size()), buffer_->offset()); EXPECT_TRUE(BufferStartsWith(buffer_.get(), test_msg_)); } @@ -89,13 +88,12 @@ TEST_F(StreamPacketReaderTest, ReadAsyncHeaderSyncPayload) { .WillOnce( DoAll(FillBufferFromString<0>(test_msg_), Return(test_msg_.size()))); - EXPECT_EQ(net::ERR_IO_PENDING, ReadPacket()); + ReadPacket(); EXPECT_FALSE(callback_.have_result()); socket_cb.Run(kPacketHeaderSizeBytes); - int rv = callback_.WaitForResult(); - EXPECT_GT(rv, 0); - EXPECT_EQ(rv, static_cast<int>(test_msg_.size())); + EXPECT_EQ(net::OK, callback_.WaitForResult()); + EXPECT_EQ(static_cast<int>(test_msg_.size()), buffer_->offset()); EXPECT_TRUE(BufferStartsWith(buffer_.get(), test_msg_)); } @@ -110,12 +108,10 @@ TEST_F(StreamPacketReaderTest, ReadSyncHeaderAsyncPayload) { .WillOnce(DoAll(FillBufferFromString<0>(test_msg_), SaveArg<2>(&socket_cb), Return(net::ERR_IO_PENDING))); - EXPECT_EQ(net::ERR_IO_PENDING, ReadPacket()); + ReadPacket(); socket_cb.Run(test_msg_.size()); - int rv = callback_.WaitForResult(); - - EXPECT_GT(rv, 0); - EXPECT_EQ(rv, static_cast<int>(test_msg_.size())); + EXPECT_EQ(net::OK, callback_.WaitForResult()); + EXPECT_EQ(static_cast<int>(test_msg_.size()), buffer_->offset()); EXPECT_TRUE(BufferStartsWith(buffer_.get(), test_msg_)); } @@ -130,12 +126,10 @@ TEST_F(StreamPacketReaderTest, ReadSyncHeaderSyncPayload) { .WillOnce( DoAll(FillBufferFromString<0>(test_msg_), Return(test_msg_.size()))); - int rv = ReadPacket(); - - EXPECT_GT(rv, 0); - EXPECT_EQ(rv, static_cast<int>(test_msg_.size())); + ReadPacket(); + EXPECT_EQ(net::OK, callback_.WaitForResult()); + EXPECT_EQ(static_cast<int>(test_msg_.size()), buffer_->offset()); EXPECT_TRUE(BufferStartsWith(buffer_.get(), test_msg_)); - EXPECT_FALSE(callback_.have_result()); } // Successful read of 2 messages, header and payload reads all completing @@ -168,13 +162,14 @@ TEST_F(StreamPacketReaderTest, ReadMultipleMessagesSync) { DoAll(FillBufferFromString<0>(test_msg2), Return(test_msg2.size()))) .RetiresOnSaturation(); - int rv = ReadPacket(); + ReadPacket(); + EXPECT_EQ(net::OK, callback_.WaitForResult()); + EXPECT_EQ(static_cast<int>(test_msg_.size()), buffer_->offset()); + + ReadPacket(); + EXPECT_EQ(net::OK, callback_.WaitForResult()); + EXPECT_EQ(static_cast<int>(test_msg2.size()), buffer_->offset()); - EXPECT_GT(rv, 0); - EXPECT_EQ(rv, static_cast<int>(test_msg_.size())); - rv = ReadPacket(); - EXPECT_GT(rv, 0); - EXPECT_EQ(rv, static_cast<int>(test_msg2.size())); EXPECT_TRUE(BufferStartsWith(buffer_.get(), test_msg_)); EXPECT_FALSE(callback_.have_result()); } @@ -210,23 +205,17 @@ TEST_F(StreamPacketReaderTest, ReadMultipleMessagesAsync) { SaveArg<2>(&socket_cb), Return(net::ERR_IO_PENDING))) .RetiresOnSaturation(); - EXPECT_EQ(net::ERR_IO_PENDING, - data_reader_.ReadPacket(buffer_, read_cb1.callback())); + data_reader_.ReadPacket(buffer_, read_cb1.callback()); socket_cb.Run(kPacketHeaderSizeBytes); socket_cb.Run(test_msg_.size()); - int rv = read_cb1.WaitForResult(); + EXPECT_EQ(net::OK, read_cb1.WaitForResult()); + EXPECT_EQ(static_cast<int>(test_msg_.size()), buffer_->offset()); - EXPECT_GT(rv, 0); - EXPECT_EQ(rv, static_cast<int>(test_msg_.size())); - - EXPECT_EQ(net::ERR_IO_PENDING, - data_reader_.ReadPacket(buffer_, read_cb2.callback())); + data_reader_.ReadPacket(buffer_, read_cb2.callback()); socket_cb.Run(kPacketHeaderSizeBytes); socket_cb.Run(test_msg_.size()); - rv = read_cb2.WaitForResult(); - - EXPECT_GT(rv, 0); - EXPECT_EQ(rv, static_cast<int>(test_msg_.size())); + EXPECT_EQ(net::OK, read_cb2.WaitForResult()); + EXPECT_EQ(static_cast<int>(test_msg_.size()), buffer_->offset()); EXPECT_TRUE(BufferStartsWith(buffer_.get(), test_msg_)); } @@ -250,7 +239,7 @@ TEST_F(StreamPacketReaderTest, PartialHeaderReadAsync) { EXPECT_CALL(socket_, Read(NotNull(), test_msg_.size(), _)) .WillOnce(Return(net::ERR_IO_PENDING)); - EXPECT_EQ(net::ERR_IO_PENDING, ReadPacket()); + ReadPacket(); cb.Run(1); cb.Run(kPacketHeaderSizeBytes - 1); } @@ -268,7 +257,7 @@ TEST_F(StreamPacketReaderTest, PartialPayloadReadAsync) { EXPECT_CALL(socket_, Read(NotNull(), test_msg_.size(), _)) .WillOnce(DoAll(FillBufferFromString<0>(test_msg_.substr(0, 1)), SaveArg<2>(&cb), Return(net::ERR_IO_PENDING))); - EXPECT_EQ(net::ERR_IO_PENDING, ReadPacket()); + ReadPacket(); EXPECT_CALL(socket_, Read(NotNull(), test_msg_.size() - 1, _)) .WillOnce(DoAll( FillBufferFromString<0>(test_msg_.substr(1, test_msg_.size() - 1)), @@ -276,10 +265,8 @@ TEST_F(StreamPacketReaderTest, PartialPayloadReadAsync) { cb.Run(1); cb.Run(test_msg_.size() - 1); - int rv = callback_.WaitForResult(); - - EXPECT_GT(rv, 0); - EXPECT_EQ(static_cast<int>(test_msg_.size()), rv); + EXPECT_EQ(net::OK, callback_.WaitForResult()); + EXPECT_EQ(static_cast<int>(test_msg_.size()), buffer_->offset()); } // Verify that synchronous header read errors are reported correctly. @@ -287,7 +274,8 @@ TEST_F(StreamPacketReaderTest, ReadHeaderErrorSync) { net::CompletionCallback cb; EXPECT_CALL(socket_, Read(NotNull(), kPacketHeaderSizeBytes, _)) .WillOnce(Return(net::ERR_FAILED)); - EXPECT_EQ(net::ERR_FAILED, ReadPacket()); + ReadPacket(); + EXPECT_EQ(net::ERR_FAILED, callback_.WaitForResult()); } // Verify that synchronous payload read errors are reported correctly. @@ -300,7 +288,8 @@ TEST_F(StreamPacketReaderTest, ReadPayloadErrorSync) { EXPECT_CALL(socket_, Read(NotNull(), test_msg_.size(), _)) .WillOnce(Return(net::ERR_FAILED)); - EXPECT_EQ(net::ERR_FAILED, ReadPacket()); + ReadPacket(); + EXPECT_EQ(net::ERR_FAILED, callback_.WaitForResult()); } // Verify that async header read errors are reported correctly. @@ -312,7 +301,7 @@ TEST_F(StreamPacketReaderTest, ReadHeaderErrorAsync) { .WillOnce(DoAll(FillBufferFromString<0>(EncodeHeader(test_msg_.size())), SaveArg<2>(&cb), Return(net::ERR_IO_PENDING))); - EXPECT_EQ(net::ERR_IO_PENDING, ReadPacket()); + ReadPacket(); cb.Run(net::ERR_FAILED); EXPECT_EQ(net::ERR_FAILED, callback_.WaitForResult()); } @@ -327,7 +316,7 @@ TEST_F(StreamPacketReaderTest, ReadPayloadErrorAsync) { EXPECT_CALL(socket_, Read(NotNull(), test_msg_.size(), _)) .WillOnce(DoAll(SaveArg<2>(&cb), Return(net::ERR_IO_PENDING))); - EXPECT_EQ(net::ERR_IO_PENDING, ReadPacket()); + ReadPacket(); cb.Run(net::ERR_FAILED); EXPECT_EQ(net::ERR_FAILED, callback_.WaitForResult()); } @@ -343,8 +332,7 @@ TEST_F(StreamPacketReaderTest, ReaderDeletedDuringAsyncHeaderRead) { .WillOnce(DoAll(FillBufferFromString<0>(EncodeHeader(test_msg_.size())), SaveArg<2>(&cb), Return(net::ERR_IO_PENDING))); - EXPECT_EQ(net::ERR_IO_PENDING, - reader->ReadPacket(buffer_, callback_.callback())); + reader->ReadPacket(buffer_, callback_.callback()); reader.reset(); // Delete the reader object. cb.Run(kPacketHeaderSizeBytes); // Complete the socket operation. } @@ -360,8 +348,7 @@ TEST_F(StreamPacketReaderTest, ReaderDeletedDuringAsyncPayloadRead) { Return(kPacketHeaderSizeBytes))); EXPECT_CALL(socket_, Read(NotNull(), test_msg_.size(), _)) .WillOnce(DoAll(SaveArg<2>(&cb), Return(net::ERR_IO_PENDING))); - EXPECT_EQ(net::ERR_IO_PENDING, - reader->ReadPacket(buffer_, callback_.callback())); + reader->ReadPacket(buffer_, callback_.callback()); reader.reset(); // Delete the reader object. cb.Run(net::ERR_FAILED); // Complete the socket operation. @@ -374,7 +361,8 @@ TEST_F(StreamPacketReaderTest, ReadWhatIsThisAPacketForAnts) { Return(kPacketHeaderSizeBytes))) .RetiresOnSaturation(); - EXPECT_EQ(net::ERR_INVALID_RESPONSE, ReadPacket()); + ReadPacket(); + EXPECT_EQ(net::ERR_INVALID_RESPONSE, callback_.WaitForResult()); } // Verify that an illegally large payloads is reported as an erroneous inputs. @@ -384,7 +372,8 @@ TEST_F(StreamPacketReaderTest, ReadErrorIllegallyLargePayload) { DoAll(FillBufferFromString<0>(EncodeHeader(kTestMaxBufferSize + 1)), Return(kPacketHeaderSizeBytes))); - EXPECT_EQ(net::ERR_INVALID_RESPONSE, ReadPacket()); + ReadPacket(); + EXPECT_EQ(net::ERR_INVALID_RESPONSE, callback_.WaitForResult()); } } // namespace diff --git a/blimp/net/stream_packet_writer.cc b/blimp/net/stream_packet_writer.cc index 221cdd1..943f9a9 100644 --- a/blimp/net/stream_packet_writer.cc +++ b/blimp/net/stream_packet_writer.cc @@ -47,15 +47,11 @@ StreamPacketWriter::StreamPacketWriter(net::StreamSocket* socket) StreamPacketWriter::~StreamPacketWriter() {} -int StreamPacketWriter::WritePacket(scoped_refptr<net::DrainableIOBuffer> data, - const net::CompletionCallback& callback) { +void StreamPacketWriter::WritePacket(scoped_refptr<net::DrainableIOBuffer> data, + const net::CompletionCallback& callback) { DCHECK_EQ(WriteState::IDLE, write_state_); DCHECK(data); - if (data->BytesRemaining() == 0) { - // The packet is empty; your argument is invalid. - DLOG(ERROR) << "Attempted to write zero-length packet."; - return net::ERR_INVALID_ARGUMENT; - } + CHECK(data->BytesRemaining()); write_state_ = WriteState::HEADER; header_buffer_->SetOffset(0); @@ -63,18 +59,18 @@ int StreamPacketWriter::WritePacket(scoped_refptr<net::DrainableIOBuffer> data, base::HostToNet32(data->BytesRemaining()); payload_buffer_ = data; - int result = DoWriteLoop(false); - if (result == net::ERR_IO_PENDING) { - // Store the completion callback to invoke when DoWriteLoop completes - // asynchronously. - callback_ = callback; - } else { + int result = DoWriteLoop(net::OK); + if (result != net::ERR_IO_PENDING) { // Release the payload buffer, since the write operation has completed // synchronously. payload_buffer_ = nullptr; - } - return result; + // Adapt synchronous completion to an asynchronous style. + base::MessageLoop::current()->PostTask(FROM_HERE, + base::Bind(callback, result)); + } else { + callback_ = callback; + } } int StreamPacketWriter::DoWriteLoop(int result) { diff --git a/blimp/net/stream_packet_writer.h b/blimp/net/stream_packet_writer.h index c618eba..071a8f0 100644 --- a/blimp/net/stream_packet_writer.h +++ b/blimp/net/stream_packet_writer.h @@ -35,8 +35,8 @@ class BLIMP_NET_EXPORT StreamPacketWriter : public PacketWriter { ~StreamPacketWriter() override; // PacketWriter implementation. - int WritePacket(scoped_refptr<net::DrainableIOBuffer> data, - const net::CompletionCallback& callback) override; + void WritePacket(scoped_refptr<net::DrainableIOBuffer> data, + const net::CompletionCallback& callback) override; private: enum class WriteState { diff --git a/blimp/net/stream_packet_writer_unittest.cc b/blimp/net/stream_packet_writer_unittest.cc index c8014f9..3212e928 100644 --- a/blimp/net/stream_packet_writer_unittest.cc +++ b/blimp/net/stream_packet_writer_unittest.cc @@ -35,15 +35,13 @@ class StreamPacketWriterTest : public testing::Test { test_data_str_.size())), message_writer_(&socket_) {} - ~StreamPacketWriterTest() override {} - protected: const std::string test_data_str_ = "U WOT M8"; scoped_refptr<net::DrainableIOBuffer> test_data_; + base::MessageLoop message_loop_; MockStreamSocket socket_; StreamPacketWriter message_writer_; - base::MessageLoop message_loop_; testing::InSequence mock_sequence_; private: @@ -56,20 +54,14 @@ TEST_F(StreamPacketWriterTest, TestWriteAsync) { net::CompletionCallback header_cb; net::CompletionCallback payload_cb; - // Write header. EXPECT_CALL(socket_, Write(BufferEquals(EncodeHeader(test_data_str_.size())), kPacketHeaderSizeBytes, _)) .WillOnce(DoAll(SaveArg<2>(&header_cb), Return(net::ERR_IO_PENDING))); - EXPECT_EQ(net::ERR_IO_PENDING, - message_writer_.WritePacket(test_data_, writer_cb.callback())); - Mock::VerifyAndClearExpectations(&socket_); - - // Write payload. + message_writer_.WritePacket(test_data_, writer_cb.callback()); EXPECT_CALL(socket_, Write(BufferEquals(test_data_str_), test_data_str_.size(), _)) .WillOnce(DoAll(SaveArg<2>(&payload_cb), Return(net::ERR_IO_PENDING))); header_cb.Run(kPacketHeaderSizeBytes); - Mock::VerifyAndClearExpectations(&socket_); payload_cb.Run(test_data_str_.size()); EXPECT_EQ(net::OK, writer_cb.WaitForResult()); @@ -100,8 +92,7 @@ TEST_F(StreamPacketWriterTest, TestPartialWriteAsync) { .WillOnce(DoAll(SaveArg<2>(&payload_cb), Return(net::ERR_IO_PENDING))) .RetiresOnSaturation(); - EXPECT_EQ(net::ERR_IO_PENDING, - message_writer_.WritePacket(test_data_, writer_cb.callback())); + message_writer_.WritePacket(test_data_, writer_cb.callback()); // Header is written - first one byte, then the remainder. header_cb.Run(1); @@ -127,8 +118,7 @@ TEST_F(StreamPacketWriterTest, TestWriteErrorAsync) { Write(BufferEquals(test_data_str_), test_data_str_.size(), _)) .WillOnce(DoAll(SaveArg<2>(&payload_cb), Return(net::ERR_IO_PENDING))); - EXPECT_EQ(net::ERR_IO_PENDING, - message_writer_.WritePacket(test_data_, writer_cb.callback())); + message_writer_.WritePacket(test_data_, writer_cb.callback()); header_cb.Run(kPacketHeaderSizeBytes); payload_cb.Run(net::ERR_CONNECTION_RESET); @@ -138,15 +128,16 @@ TEST_F(StreamPacketWriterTest, TestWriteErrorAsync) { // Successful write with 1 sync header write and 1 sync payload write. TEST_F(StreamPacketWriterTest, TestWriteSync) { net::TestCompletionCallback writer_cb; + EXPECT_CALL(socket_, Write(BufferEquals(EncodeHeader(test_data_str_.size())), kPacketHeaderSizeBytes, _)) .WillOnce(Return(kPacketHeaderSizeBytes)); EXPECT_CALL(socket_, Write(BufferEquals(test_data_str_), test_data_str_.size(), _)) .WillOnce(Return(test_data_str_.size())); - EXPECT_EQ(net::OK, - message_writer_.WritePacket(test_data_, writer_cb.callback())); - EXPECT_FALSE(writer_cb.have_result()); + + message_writer_.WritePacket(test_data_, writer_cb.callback()); + EXPECT_EQ(net::OK, writer_cb.WaitForResult()); } // Successful write with 2 sync header writes and 2 sync payload writes. @@ -167,21 +158,8 @@ TEST_F(StreamPacketWriterTest, TestPartialWriteSync) { payload.size() - 1, _)) .WillOnce(Return(payload.size() - 1)); - EXPECT_EQ(net::OK, - message_writer_.WritePacket(test_data_, writer_cb.callback())); - EXPECT_FALSE(writer_cb.have_result()); -} - -// Verify that zero-length packets are rejected. -TEST_F(StreamPacketWriterTest, TestZeroLengthPacketsRejected) { - net::TestCompletionCallback writer_cb; - - EXPECT_EQ(net::ERR_INVALID_ARGUMENT, - message_writer_.WritePacket( - new net::DrainableIOBuffer(new net::IOBuffer(0), 0), - writer_cb.callback())); - - EXPECT_FALSE(writer_cb.have_result()); + message_writer_.WritePacket(test_data_, writer_cb.callback()); + EXPECT_EQ(net::OK, writer_cb.WaitForResult()); } // Sync socket error while writing header data. @@ -192,9 +170,8 @@ TEST_F(StreamPacketWriterTest, TestWriteHeaderErrorSync) { kPacketHeaderSizeBytes, _)) .WillOnce(Return(net::ERR_FAILED)); - EXPECT_EQ(net::ERR_FAILED, - message_writer_.WritePacket(test_data_, writer_cb.callback())); - + message_writer_.WritePacket(test_data_, writer_cb.callback()); + EXPECT_EQ(net::ERR_FAILED, writer_cb.WaitForResult()); EXPECT_EQ(net::ERR_EMPTY_RESPONSE, writer_cb.GetResult(net::ERR_EMPTY_RESPONSE)); EXPECT_FALSE(writer_cb.have_result()); @@ -211,9 +188,8 @@ TEST_F(StreamPacketWriterTest, TestWritePayloadErrorSync) { Write(BufferEquals(test_data_str_), test_data_str_.size(), _)) .WillOnce(Return(net::ERR_FAILED)); - EXPECT_EQ(net::ERR_FAILED, - message_writer_.WritePacket(test_data_, writer_cb.callback())); - EXPECT_FALSE(writer_cb.have_result()); + message_writer_.WritePacket(test_data_, writer_cb.callback()); + EXPECT_EQ(net::ERR_FAILED, writer_cb.WaitForResult()); } // Verify that asynchronous header write completions don't cause a @@ -228,8 +204,7 @@ TEST_F(StreamPacketWriterTest, DeletedDuringHeaderWrite) { EXPECT_CALL(socket_, Write(BufferEquals(EncodeHeader(test_data_str_.size())), kPacketHeaderSizeBytes, _)) .WillOnce(DoAll(SaveArg<2>(&header_cb), Return(net::ERR_IO_PENDING))); - EXPECT_EQ(net::ERR_IO_PENDING, - writer->WritePacket(test_data_, writer_cb.callback())); + writer->WritePacket(test_data_, writer_cb.callback()); Mock::VerifyAndClearExpectations(&socket_); // Header write completion callback is invoked after the writer died. @@ -252,8 +227,7 @@ TEST_F(StreamPacketWriterTest, DeletedDuringPayloadWrite) { Write(BufferEquals(test_data_str_), test_data_str_.size(), _)) .WillOnce(DoAll(SaveArg<2>(&payload_cb), Return(net::ERR_IO_PENDING))); - EXPECT_EQ(net::ERR_IO_PENDING, - writer->WritePacket(test_data_, writer_cb.callback())); + writer->WritePacket(test_data_, writer_cb.callback()); // Header write completes successfully. header_cb.Run(kPacketHeaderSizeBytes); diff --git a/blimp/net/test_common.h b/blimp/net/test_common.h index be66ab2..58ee2f6 100644 --- a/blimp/net/test_common.h +++ b/blimp/net/test_common.h @@ -35,7 +35,6 @@ MATCHER_P(BufferEquals, expected, "") { } // Checks if two proto messages are the same. -// TODO(kmarshall): promote to a shared testing library. MATCHER_P(EqualsProto, message, "") { std::string expected_serialized; std::string actual_serialized; @@ -49,7 +48,7 @@ MATCHER_P(EqualsProto, message, "") { // message (type: BlimpMessage) The message to compare with |arg|. MATCHER_P(BufferEqualsProto, message, "") { BlimpMessage actual_message; - actual_message.ParseFromArray(arg->data(), arg->BytesRemaining()); + actual_message.ParseFromArray(arg->data(), message.ByteSize()); std::string expected_serialized; std::string actual_serialized; message.SerializeToString(&expected_serialized); @@ -71,7 +70,8 @@ ACTION_TEMPLATE(FillBufferFromString, // Behavior is undefined if len(buf) < len(str). bool BufferStartsWith(net::GrowableIOBuffer* buf, const std::string& str); -// GMock action that writes data from a blimp message to an IOBuffer . +// GMock action that writes data from a BlimpMessage to a GrowableIOBuffer. +// Advances the buffer's |offset| to the end of the message. // // buf_idx (template parameter 0): 0-based index of the IOBuffer arg. // message: the blimp message containing data to be written to the IOBuffer @@ -80,6 +80,14 @@ ACTION_TEMPLATE(FillBufferFromMessage, AND_1_VALUE_PARAMS(message)) { message->SerializeToArray(testing::get<buf_idx>(args)->data(), message->ByteSize()); + testing::get<buf_idx>(args)->set_offset(message->ByteSize()); +} + +// Calls |set_offset()| for a GrowableIOBuffer. +ACTION_TEMPLATE(SetBufferOffset, + HAS_1_TEMPLATE_PARAMS(int, buf_idx), + AND_1_VALUE_PARAMS(offset)) { + testing::get<buf_idx>(args)->set_offset(offset); } // Formats a string-based representation of a BlimpMessage header. @@ -143,8 +151,8 @@ class MockPacketReader : public PacketReader { ~MockPacketReader() override; MOCK_METHOD2(ReadPacket, - int(const scoped_refptr<net::GrowableIOBuffer>&, - const net::CompletionCallback&)); + void(const scoped_refptr<net::GrowableIOBuffer>&, + const net::CompletionCallback&)); }; class MockPacketWriter : public PacketWriter { @@ -153,8 +161,8 @@ class MockPacketWriter : public PacketWriter { ~MockPacketWriter() override; MOCK_METHOD2(WritePacket, - int(scoped_refptr<net::DrainableIOBuffer>, - const net::CompletionCallback&)); + void(scoped_refptr<net::DrainableIOBuffer>, + const net::CompletionCallback&)); }; class MockConnectionErrorObserver : public ConnectionErrorObserver { |