summaryrefslogtreecommitdiffstats
path: root/blimp/net
diff options
context:
space:
mode:
authorkmarshall <kmarshall@chromium.org>2015-12-02 16:27:30 -0800
committerCommit bot <commit-bot@chromium.org>2015-12-03 00:30:56 +0000
commit840a0fe2a4cab8709b5d29b89cdb968b620953ff (patch)
treee3f6703a0e5e7cbc31b90e1575ab8a4eee30ba7d /blimp/net
parenta18e826a670891d5804c8aced282761834741e8b (diff)
downloadchromium_src-840a0fe2a4cab8709b5d29b89cdb968b620953ff.zip
chromium_src-840a0fe2a4cab8709b5d29b89cdb968b620953ff.tar.gz
chromium_src-840a0fe2a4cab8709b5d29b89cdb968b620953ff.tar.bz2
Make PacketReader/PacketWriter interfaces async-only.
Adapts synchronous I/O completion to use callbacks, so downstream callers are insulated from having to manage the complexities of mixed sync and async I/O. Updated logic of all PacketReader/PacketWriter dependents. R=haibinlu@chromium.org,wez@chromium.org BUG=558643 Review URL: https://codereview.chromium.org/1452823011 Cr-Commit-Position: refs/heads/master@{#362833}
Diffstat (limited to 'blimp/net')
-rw-r--r--blimp/net/blimp_connection.cc12
-rw-r--r--blimp/net/blimp_connection_unittest.cc105
-rw-r--r--blimp/net/blimp_message_pump.cc40
-rw-r--r--blimp/net/blimp_message_pump.h15
-rw-r--r--blimp/net/blimp_message_pump_unittest.cc121
-rw-r--r--blimp/net/packet_reader.h15
-rw-r--r--blimp/net/packet_writer.h13
-rw-r--r--blimp/net/stream_packet_reader.cc19
-rw-r--r--blimp/net/stream_packet_reader.h4
-rw-r--r--blimp/net/stream_packet_reader_unittest.cc99
-rw-r--r--blimp/net/stream_packet_writer.cc26
-rw-r--r--blimp/net/stream_packet_writer.h4
-rw-r--r--blimp/net/stream_packet_writer_unittest.cc58
-rw-r--r--blimp/net/test_common.h22
14 files changed, 190 insertions, 363 deletions
diff --git a/blimp/net/blimp_connection.cc b/blimp/net/blimp_connection.cc
index a79ffe0..c5b0700 100644
--- a/blimp/net/blimp_connection.cc
+++ b/blimp/net/blimp_connection.cc
@@ -40,7 +40,6 @@ class BlimpMessageSender : public BlimpMessageProcessor {
PacketWriter* writer_;
ConnectionErrorObserver* error_observer_;
scoped_refptr<net::DrainableIOBuffer> buffer_;
- net::CancelableCompletionCallback write_packet_callback_;
net::CompletionCallback pending_process_msg_callback_;
DISALLOW_COPY_AND_ASSIGN(BlimpMessageSender);
@@ -72,19 +71,14 @@ void BlimpMessageSender::ProcessMessage(
return;
}
- write_packet_callback_.Reset(base::Bind(
- &BlimpMessageSender::OnWritePacketComplete, base::Unretained(this)));
pending_process_msg_callback_ = callback;
- int result = writer_->WritePacket(buffer_, write_packet_callback_.callback());
- if (result != net::ERR_IO_PENDING) {
- base::MessageLoop::current()->PostTask(
- FROM_HERE, base::Bind(write_packet_callback_.callback(), result));
- }
+ writer_->WritePacket(buffer_,
+ base::Bind(&BlimpMessageSender::OnWritePacketComplete,
+ base::Unretained(this)));
}
void BlimpMessageSender::OnWritePacketComplete(int result) {
DCHECK_NE(net::ERR_IO_PENDING, result);
- write_packet_callback_.Cancel();
base::ResetAndReturn(&pending_process_msg_callback_).Run(result);
if (result != net::OK) {
error_observer_->OnConnectionError(result);
diff --git a/blimp/net/blimp_connection_unittest.cc b/blimp/net/blimp_connection_unittest.cc
index 66d0658..759b426 100644
--- a/blimp/net/blimp_connection_unittest.cc
+++ b/blimp/net/blimp_connection_unittest.cc
@@ -20,9 +20,7 @@
#include "testing/gtest/include/gtest/gtest.h"
using testing::_;
-using testing::DoAll;
using testing::InSequence;
-using testing::NotNull;
using testing::Return;
using testing::SaveArg;
@@ -32,21 +30,17 @@ namespace {
class BlimpConnectionTest : public testing::Test {
public:
BlimpConnectionTest() {
- message1_ = CreateInputMessage();
- message2_ = CreateControlMessage();
- scoped_ptr<testing::StrictMock<MockPacketReader>> reader(
- new testing::StrictMock<MockPacketReader>);
- reader_ = reader.get();
scoped_ptr<testing::StrictMock<MockPacketWriter>> writer(
new testing::StrictMock<MockPacketWriter>);
writer_ = writer.get();
- connection_.reset(new BlimpConnection(std::move(reader),
+ connection_.reset(new BlimpConnection(make_scoped_ptr(new MockPacketReader),
std::move(writer)));
connection_->SetConnectionErrorObserver(&error_observer_);
}
~BlimpConnectionTest() override {}
+ protected:
scoped_ptr<BlimpMessage> CreateInputMessage() {
scoped_ptr<BlimpMessage> msg(new BlimpMessage);
msg->set_type(BlimpMessage::INPUT);
@@ -59,93 +53,25 @@ class BlimpConnectionTest : public testing::Test {
return msg;
}
- protected:
- base::MessageLoopForIO message_loop_;
- scoped_ptr<BlimpMessage> message1_;
- scoped_ptr<BlimpMessage> message2_;
-
- testing::StrictMock<MockPacketReader>* reader_;
+ base::MessageLoop message_loop_;
testing::StrictMock<MockPacketWriter>* writer_;
testing::StrictMock<MockConnectionErrorObserver> error_observer_;
testing::StrictMock<MockBlimpMessageProcessor> receiver_;
scoped_ptr<BlimpConnection> connection_;
};
-// Reader completes reading one packet synchronously.
-// Two read cases here. BlimpMessagePumpTest covers other cases.
-TEST_F(BlimpConnectionTest, SyncPacketRead) {
- EXPECT_CALL(receiver_, MockableProcessMessage(EqualsProto(*message1_), _));
- EXPECT_CALL(*reader_, ReadPacket(NotNull(), _))
- .WillOnce(DoAll(FillBufferFromMessage<0>(message1_.get()),
- Return(message1_->ByteSize())));
- connection_->SetIncomingMessageProcessor(&receiver_);
-}
-
-// Reader completes reading one packet synchronously withe error.
-TEST_F(BlimpConnectionTest, SyncPacketReadWithError) {
- InSequence s;
- EXPECT_CALL(*reader_, ReadPacket(NotNull(), _))
- .WillOnce(Return(net::ERR_FAILED));
- EXPECT_CALL(error_observer_, OnConnectionError(net::ERR_FAILED));
- connection_->SetIncomingMessageProcessor(&receiver_);
-}
-
-// Writer completes writing two packets synchronously.
-TEST_F(BlimpConnectionTest, SyncTwoPacketsWrite) {
- InSequence s;
- EXPECT_CALL(*writer_, WritePacket(BufferEqualsProto(*message1_), _))
- .WillOnce(Return(net::OK))
- .RetiresOnSaturation();
- EXPECT_CALL(*writer_, WritePacket(BufferEqualsProto(*message2_), _))
- .WillOnce(Return(net::OK))
- .RetiresOnSaturation();
-
- BlimpMessageProcessor* sender = connection_->GetOutgoingMessageProcessor();
- net::TestCompletionCallback complete_cb_1;
- sender->ProcessMessage(CreateInputMessage(),
- complete_cb_1.callback());
- EXPECT_EQ(net::OK, complete_cb_1.WaitForResult());
- net::TestCompletionCallback complete_cb_2;
- sender->ProcessMessage(CreateControlMessage(),
- complete_cb_2.callback());
- EXPECT_EQ(net::OK, complete_cb_2.WaitForResult());
-}
-
-// Writer completes writing two packets synchronously.
-// First write succeeds, second fails.
-TEST_F(BlimpConnectionTest, SyncTwoPacketsWriteWithError) {
- InSequence s;
- EXPECT_CALL(*writer_, WritePacket(BufferEqualsProto(*message1_), _))
- .WillOnce(Return(net::OK))
- .RetiresOnSaturation();
- EXPECT_CALL(*writer_, WritePacket(BufferEqualsProto(*message2_), _))
- .WillOnce(Return(net::ERR_FAILED))
- .RetiresOnSaturation();
- EXPECT_CALL(error_observer_, OnConnectionError(net::ERR_FAILED));
-
- BlimpMessageProcessor* sender = connection_->GetOutgoingMessageProcessor();
- net::TestCompletionCallback complete_cb_1;
- sender->ProcessMessage(CreateInputMessage(),
- complete_cb_1.callback());
- EXPECT_EQ(net::OK, complete_cb_1.WaitForResult());
- net::TestCompletionCallback complete_cb_2;
- sender->ProcessMessage(CreateControlMessage(),
- complete_cb_2.callback());
- EXPECT_EQ(net::ERR_FAILED, complete_cb_2.WaitForResult());
-}
-
// Write completes writing two packets asynchronously.
TEST_F(BlimpConnectionTest, AsyncTwoPacketsWrite) {
net::CompletionCallback write_packet_cb;
InSequence s;
- EXPECT_CALL(*writer_, WritePacket(BufferEqualsProto(*message1_), _))
- .WillOnce(
- DoAll(SaveArg<1>(&write_packet_cb), Return(net::ERR_IO_PENDING)))
+ EXPECT_CALL(*writer_,
+ WritePacket(BufferEqualsProto(*CreateInputMessage()), _))
+ .WillOnce(SaveArg<1>(&write_packet_cb))
.RetiresOnSaturation();
- EXPECT_CALL(*writer_, WritePacket(BufferEqualsProto(*message2_), _))
- .WillOnce(
- DoAll(SaveArg<1>(&write_packet_cb), Return(net::ERR_IO_PENDING)))
+ EXPECT_CALL(*writer_,
+ WritePacket(BufferEqualsProto(*CreateControlMessage()), _))
+ .WillOnce(SaveArg<1>(&write_packet_cb))
.RetiresOnSaturation();
BlimpMessageProcessor* sender = connection_->GetOutgoingMessageProcessor();
@@ -172,13 +98,13 @@ TEST_F(BlimpConnectionTest, AsyncTwoPacketsWriteWithError) {
net::CompletionCallback write_packet_cb;
InSequence s;
- EXPECT_CALL(*writer_, WritePacket(BufferEqualsProto(*message1_), _))
- .WillOnce(
- DoAll(SaveArg<1>(&write_packet_cb), Return(net::ERR_IO_PENDING)))
+ EXPECT_CALL(*writer_,
+ WritePacket(BufferEqualsProto(*CreateInputMessage()), _))
+ .WillOnce(SaveArg<1>(&write_packet_cb))
.RetiresOnSaturation();
- EXPECT_CALL(*writer_, WritePacket(BufferEqualsProto(*message2_), _))
- .WillOnce(
- DoAll(SaveArg<1>(&write_packet_cb), Return(net::ERR_IO_PENDING)))
+ EXPECT_CALL(*writer_,
+ WritePacket(BufferEqualsProto(*CreateControlMessage()), _))
+ .WillOnce(SaveArg<1>(&write_packet_cb))
.RetiresOnSaturation();
EXPECT_CALL(error_observer_, OnConnectionError(net::ERR_FAILED));
@@ -197,5 +123,4 @@ TEST_F(BlimpConnectionTest, AsyncTwoPacketsWriteWithError) {
}
} // namespace
-
} // namespace blimp
diff --git a/blimp/net/blimp_message_pump.cc b/blimp/net/blimp_message_pump.cc
index d9a250a..79082e0 100644
--- a/blimp/net/blimp_message_pump.cc
+++ b/blimp/net/blimp_message_pump.cc
@@ -21,58 +21,46 @@ BlimpMessagePump::BlimpMessagePump(PacketReader* reader)
processor_(nullptr),
buffer_(new net::GrowableIOBuffer) {
DCHECK(reader_);
+ buffer_->SetCapacity(kMaxPacketPayloadSizeBytes);
}
BlimpMessagePump::~BlimpMessagePump() {}
void BlimpMessagePump::SetMessageProcessor(BlimpMessageProcessor* processor) {
- process_msg_callback_.Cancel();
+ DCHECK(!processor_);
processor_ = processor;
- if (!processor_) {
- read_packet_callback_.Cancel();
- } else if (read_packet_callback_.IsCancelled()) {
- buffer_->SetCapacity(kMaxPacketPayloadSizeBytes);
- ReadNextPacket();
- }
+ ReadNextPacket();
}
void BlimpMessagePump::ReadNextPacket() {
DCHECK(processor_);
buffer_->set_offset(0);
- read_packet_callback_.Reset(base::Bind(
- &BlimpMessagePump::OnReadPacketComplete, base::Unretained(this)));
- int result =
- reader_->ReadPacket(buffer_.get(), read_packet_callback_.callback());
- if (result != net::ERR_IO_PENDING) {
- // Read completed synchronously.
- OnReadPacketComplete(result);
- }
+ read_callback_.Reset(base::Bind(&BlimpMessagePump::OnReadPacketComplete,
+ base::Unretained(this)));
+ reader_->ReadPacket(buffer_.get(), read_callback_.callback());
}
void BlimpMessagePump::OnReadPacketComplete(int result) {
- read_packet_callback_.Cancel();
- if (result > 0) {
- // The result is the size of the packet in bytes.
+ if (result == net::OK) {
scoped_ptr<BlimpMessage> message(new BlimpMessage);
- bool parse_result =
- message->ParseFromArray(buffer_->StartOfBuffer(), result);
- if (parse_result) {
+ if (message->ParseFromArray(buffer_->StartOfBuffer(), buffer_->offset())) {
process_msg_callback_.Reset(base::Bind(
&BlimpMessagePump::OnProcessMessageComplete, base::Unretained(this)));
processor_->ProcessMessage(std::move(message),
process_msg_callback_.callback());
- return;
+ } else {
+ result = net::ERR_FAILED;
}
- result = net::ERR_FAILED;
}
- if (error_observer_)
+
+ if (result != net::OK) {
error_observer_->OnConnectionError(result);
+ }
}
void BlimpMessagePump::OnProcessMessageComplete(int result) {
// No error is expected from the message receiver.
- DCHECK_EQ(result, net::OK);
- process_msg_callback_.Cancel();
+ DCHECK_EQ(net::OK, result);
ReadNextPacket();
}
diff --git a/blimp/net/blimp_message_pump.h b/blimp/net/blimp_message_pump.h
index e8fc0aa..d86b750 100644
--- a/blimp/net/blimp_message_pump.h
+++ b/blimp/net/blimp_message_pump.h
@@ -26,13 +26,12 @@ class PacketReader;
// message, the BlimpMessagePump reads the next packet.
class BLIMP_NET_EXPORT BlimpMessagePump {
public:
- // Caller ensures |reader| outlive this object.
+ // Caller ensures that |reader| outlives this object.
explicit BlimpMessagePump(PacketReader* reader);
~BlimpMessagePump();
- // Sets the processor which will take blimp messages.
- // Can be set multiple times, but previously set processors are discarded.
+ // Sets the processor which will take BlimpMessages. Can only be set once.
// Caller retains the ownership of |processor|.
void SetMessageProcessor(BlimpMessageProcessor* processor);
@@ -47,16 +46,22 @@ class BLIMP_NET_EXPORT BlimpMessagePump {
// Callback when next packet is ready in |buffer_|.
void OnReadPacketComplete(int result);
- // Callback when |processor_| finishes processing a blimp message.
+ // Callback when |processor_| finishes processing a BlimpMessage.
void OnProcessMessageComplete(int result);
PacketReader* reader_;
ConnectionErrorObserver* error_observer_;
BlimpMessageProcessor* processor_;
scoped_refptr<net::GrowableIOBuffer> buffer_;
- net::CancelableCompletionCallback read_packet_callback_;
+
+ // Cancelled in the event that the connection is destroyed (along with
+ // |this|) while a inflight callback is held by |processor_|.
net::CancelableCompletionCallback process_msg_callback_;
+ // Cancelled to guard against |this| being called back from a completed read
+ // operation.
+ net::CancelableCompletionCallback read_callback_;
+
DISALLOW_COPY_AND_ASSIGN(BlimpMessagePump);
};
diff --git a/blimp/net/blimp_message_pump_unittest.cc b/blimp/net/blimp_message_pump_unittest.cc
index 7f9ab11..20760af 100644
--- a/blimp/net/blimp_message_pump_unittest.cc
+++ b/blimp/net/blimp_message_pump_unittest.cc
@@ -50,60 +50,34 @@ class BlimpMessagePumpTest : public testing::Test {
scoped_ptr<BlimpMessagePump> message_pump_;
};
-// Reader completes reading one packet synchronously.
-TEST_F(BlimpMessagePumpTest, SyncPacketRead) {
- EXPECT_CALL(receiver_, MockableProcessMessage(EqualsProto(*message1_), _));
- EXPECT_CALL(reader_, ReadPacket(NotNull(), _))
- .WillOnce(DoAll(FillBufferFromMessage<0>(message1_.get()),
- Return(message1_->ByteSize())));
- message_pump_->SetMessageProcessor(&receiver_);
-}
-
-// Reader completes reading two packets synchronously.
-TEST_F(BlimpMessagePumpTest, SyncTwoPacketsRead) {
- EXPECT_CALL(reader_, ReadPacket(NotNull(), _))
- .WillOnce(DoAll(FillBufferFromMessage<0>(message1_.get()),
- Return(message1_->ByteSize())))
- .WillOnce(DoAll(FillBufferFromMessage<0>(message2_.get()),
- Return(message2_->ByteSize())));
- net::CompletionCallback process_msg_cb;
- {
- InSequence s;
- EXPECT_CALL(receiver_, MockableProcessMessage(EqualsProto(*message1_), _))
- .WillOnce(SaveArg<1>(&process_msg_cb))
- .RetiresOnSaturation();
- EXPECT_CALL(receiver_, MockableProcessMessage(EqualsProto(*message2_), _));
- }
- message_pump_->SetMessageProcessor(&receiver_);
-
- // Trigger next packet read
- process_msg_cb.Run(net::OK);
-}
-
// Reader completes reading one packet asynchronously.
-TEST_F(BlimpMessagePumpTest, AsyncPacketRead) {
+TEST_F(BlimpMessagePumpTest, ReadPacket) {
net::CompletionCallback read_packet_cb;
+ EXPECT_CALL(reader_, ReadPacket(NotNull(), _));
EXPECT_CALL(reader_, ReadPacket(NotNull(), _))
.WillOnce(DoAll(FillBufferFromMessage<0>(message1_.get()),
- SaveArg<1>(&read_packet_cb), Return(net::ERR_IO_PENDING)))
- .WillOnce(Return(net::ERR_IO_PENDING));
+ SetBufferOffset<0>(message1_->ByteSize()),
+ SaveArg<1>(&read_packet_cb)))
+ .RetiresOnSaturation();
net::CompletionCallback process_msg_cb;
EXPECT_CALL(receiver_, MockableProcessMessage(EqualsProto(*message1_), _))
.WillOnce(SaveArg<1>(&process_msg_cb));
message_pump_->SetMessageProcessor(&receiver_);
- read_packet_cb.Run(message1_->ByteSize());
+ ASSERT_FALSE(read_packet_cb.is_null());
+ base::ResetAndReturn(&read_packet_cb).Run(net::OK);
process_msg_cb.Run(net::OK);
}
// Reader completes reading two packets asynchronously.
-TEST_F(BlimpMessagePumpTest, AsyncTwoPacketsRead) {
+TEST_F(BlimpMessagePumpTest, ReadTwoPackets) {
net::CompletionCallback read_packet_cb;
EXPECT_CALL(reader_, ReadPacket(NotNull(), _))
.WillOnce(DoAll(FillBufferFromMessage<0>(message1_.get()),
- SaveArg<1>(&read_packet_cb), Return(net::ERR_IO_PENDING)))
+ SetBufferOffset<0>(message1_->ByteSize()),
+ SaveArg<1>(&read_packet_cb)))
.WillOnce(DoAll(FillBufferFromMessage<0>(message2_.get()),
- SaveArg<1>(&read_packet_cb),
- Return(net::ERR_IO_PENDING)));
+ SetBufferOffset<0>(message2_->ByteSize()),
+ SaveArg<1>(&read_packet_cb)));
net::CompletionCallback process_msg_cb;
{
InSequence s;
@@ -113,77 +87,54 @@ TEST_F(BlimpMessagePumpTest, AsyncTwoPacketsRead) {
EXPECT_CALL(receiver_, MockableProcessMessage(EqualsProto(*message2_), _));
}
message_pump_->SetMessageProcessor(&receiver_);
- read_packet_cb.Run(message1_->ByteSize());
+ ASSERT_FALSE(read_packet_cb.is_null());
+ base::ResetAndReturn(&read_packet_cb).Run(net::OK);
// Trigger next packet read
process_msg_cb.Run(net::OK);
- read_packet_cb.Run(message2_->ByteSize());
+ ASSERT_FALSE(read_packet_cb.is_null());
+ base::ResetAndReturn(&read_packet_cb).Run(net::OK);
}
// Reader completes reading two packets asynchronously.
// The first read succeeds, and the second fails.
-TEST_F(BlimpMessagePumpTest, AsyncTwoPacketsReadWithError) {
+TEST_F(BlimpMessagePumpTest, ReadTwoPacketsWithError) {
+ net::CompletionCallback process_msg_cb;
net::CompletionCallback read_packet_cb;
EXPECT_CALL(reader_, ReadPacket(NotNull(), _))
.WillOnce(DoAll(FillBufferFromMessage<0>(message1_.get()),
- SaveArg<1>(&read_packet_cb), Return(net::ERR_IO_PENDING)))
+ SetBufferOffset<0>(message1_->ByteSize()),
+ SaveArg<1>(&read_packet_cb)))
.WillOnce(DoAll(FillBufferFromMessage<0>(message2_.get()),
- SaveArg<1>(&read_packet_cb),
- Return(net::ERR_IO_PENDING)));
- net::CompletionCallback process_msg_cb;
- {
- InSequence s;
- EXPECT_CALL(receiver_, MockableProcessMessage(EqualsProto(*message1_), _))
- .WillOnce(SaveArg<1>(&process_msg_cb));
- EXPECT_CALL(error_observer_, OnConnectionError(net::ERR_FAILED));
- }
+ SetBufferOffset<0>(message2_->ByteSize()),
+ SaveArg<1>(&read_packet_cb)));
+ EXPECT_CALL(receiver_, MockableProcessMessage(EqualsProto(*message1_), _))
+ .WillOnce(SaveArg<1>(&process_msg_cb));
+ EXPECT_CALL(error_observer_, OnConnectionError(net::ERR_FAILED));
+
message_pump_->SetMessageProcessor(&receiver_);
- read_packet_cb.Run(message1_->ByteSize());
+ ASSERT_FALSE(read_packet_cb.is_null());
+ base::ResetAndReturn(&read_packet_cb).Run(net::OK);
// Trigger next packet read
process_msg_cb.Run(net::OK);
- read_packet_cb.Run(net::ERR_FAILED);
+ ASSERT_FALSE(read_packet_cb.is_null());
+ base::ResetAndReturn(&read_packet_cb).Run(net::ERR_FAILED);
}
// Reader completes reading one packet synchronously, but packet is invalid
TEST_F(BlimpMessagePumpTest, InvalidPacket) {
+ net::CompletionCallback read_packet_cb;
std::string test_msg("msg");
EXPECT_CALL(reader_, ReadPacket(NotNull(), _))
- .WillOnce(DoAll(FillBufferFromString<0>(test_msg), Return(1)));
+ .WillOnce(DoAll(FillBufferFromString<0>(test_msg),
+ SetBufferOffset<0>(test_msg.size()),
+ SaveArg<1>(&read_packet_cb)));
EXPECT_CALL(error_observer_, OnConnectionError(net::ERR_FAILED));
- message_pump_->SetMessageProcessor(&receiver_);
-}
-
-TEST_F(BlimpMessagePumpTest, ClearMessageProcessorAfterRead) {
- net::CompletionCallback read_packet_cb;
- EXPECT_CALL(reader_, ReadPacket(NotNull(), _))
- .WillOnce(DoAll(FillBufferFromMessage<0>(message1_.get()),
- SaveArg<1>(&read_packet_cb),
- Return(net::ERR_IO_PENDING)));
- net::CompletionCallback process_msg_cb;
- EXPECT_CALL(receiver_, MockableProcessMessage(EqualsProto(*message1_), _))
- .WillOnce(SaveArg<1>(&process_msg_cb));
-
- message_pump_->SetMessageProcessor(&receiver_);
- base::ResetAndReturn(&read_packet_cb).Run(message1_->ByteSize());
-
- message_pump_->SetMessageProcessor(nullptr);
-
- // Completing message processing will not trigger next packet read.
- base::ResetAndReturn(&process_msg_cb).Run(net::OK);
-}
-
-TEST_F(BlimpMessagePumpTest, ClearMessageProcessorDuringRead) {
- net::CompletionCallback read_packet_cb;
- EXPECT_CALL(reader_, ReadPacket(NotNull(), _))
- .WillOnce(DoAll(FillBufferFromMessage<0>(message1_.get()),
- SaveArg<1>(&read_packet_cb),
- Return(net::ERR_IO_PENDING)));
- // Receiver will not get any message.
message_pump_->SetMessageProcessor(&receiver_);
- message_pump_->SetMessageProcessor(nullptr);
- base::ResetAndReturn(&read_packet_cb).Run(message1_->ByteSize());
+ ASSERT_FALSE(read_packet_cb.is_null());
+ base::ResetAndReturn(&read_packet_cb).Run(net::OK);
}
} // namespace
diff --git a/blimp/net/packet_reader.h b/blimp/net/packet_reader.h
index 407e2fd..c000ab0 100644
--- a/blimp/net/packet_reader.h
+++ b/blimp/net/packet_reader.h
@@ -18,14 +18,13 @@ class BLIMP_NET_EXPORT PacketReader {
virtual ~PacketReader() {}
// Reads a packet from the connection.
- // Returns the size of the packet, in bytes, if the read operation executed
- // successfully.
- // Returns ERR_IO_PENDING if the operation will be executed asynchronously.
- // |cb| is later invoked with the packet size or an error code.
- // All other return values indicate errors and caller should stop using this
- // reader.
- virtual int ReadPacket(const scoped_refptr<net::GrowableIOBuffer>& buf,
- const net::CompletionCallback& cb) = 0;
+ // Passes net::OK to |cb| if the read operation executed successfully.
+ // Sets |buf.offset()| to the received message's size, and invokes |cb| with
+ // net::OK result on success.
+ // All other values indicate errors.
+ // |callback| will not be invoked if |this| is deleted.
+ virtual void ReadPacket(const scoped_refptr<net::GrowableIOBuffer>& buf,
+ const net::CompletionCallback& cb) = 0;
};
} // namespace blimp
diff --git a/blimp/net/packet_writer.h b/blimp/net/packet_writer.h
index e5dbb9a..8c7b9ca 100644
--- a/blimp/net/packet_writer.h
+++ b/blimp/net/packet_writer.h
@@ -21,14 +21,11 @@ class BLIMP_NET_EXPORT PacketWriter {
public:
virtual ~PacketWriter() {}
- // Writes a packet of at least one byte in size to a connection.
- //
- // Returns net::OK or an error code if the operation executed successfully.
- // Returns ERR_IO_PENDING if the operation will be executed asynchronously.
- // |cb| is later invoked with net::OK or an error code.
- // An error code indicates that caller should stop using this writer.
- virtual int WritePacket(scoped_refptr<net::DrainableIOBuffer> data,
- const net::CompletionCallback& callback) = 0;
+ // Invokes |cb| with net::OK if the write operation executed successfully.
+ // All other values indicate unrecoverable errors.
+ // |callback| must not be invoked if |this| is deleted.
+ virtual void WritePacket(scoped_refptr<net::DrainableIOBuffer> data,
+ const net::CompletionCallback& callback) = 0;
};
} // namespace blimp
diff --git a/blimp/net/stream_packet_reader.cc b/blimp/net/stream_packet_reader.cc
index 8da6130..e500748 100644
--- a/blimp/net/stream_packet_reader.cc
+++ b/blimp/net/stream_packet_reader.cc
@@ -9,6 +9,7 @@
#include "base/callback_helpers.h"
#include "base/logging.h"
#include "base/memory/weak_ptr.h"
+#include "base/message_loop/message_loop.h"
#include "base/sys_byteorder.h"
#include "blimp/net/common.h"
#include "net/base/io_buffer.h"
@@ -42,7 +43,7 @@ StreamPacketReader::StreamPacketReader(net::StreamSocket* socket)
StreamPacketReader::~StreamPacketReader() {}
-int StreamPacketReader::ReadPacket(
+void StreamPacketReader::ReadPacket(
const scoped_refptr<net::GrowableIOBuffer>& buf,
const net::CompletionCallback& callback) {
DCHECK_EQ(ReadState::IDLE, read_state_);
@@ -54,17 +55,17 @@ int StreamPacketReader::ReadPacket(
read_state_ = ReadState::HEADER;
int result = DoReadLoop(net::OK);
- if (result == net::ERR_IO_PENDING) {
- // Store the completion callback to invoke when read completes
- // asynchronously.
- callback_ = callback;
- } else {
+ if (result != net::ERR_IO_PENDING) {
// Release the payload buffer, since the read operation has completed
// synchronously.
payload_buffer_ = nullptr;
- }
- return result;
+ // Adapt synchronous completion to an asynchronous style.
+ base::MessageLoop::current()->PostTask(FROM_HERE,
+ base::Bind(callback, result));
+ } else {
+ callback_ = callback;
+ }
}
int StreamPacketReader::DoReadLoop(int result) {
@@ -134,7 +135,7 @@ int StreamPacketReader::DoReadPayload(int result) {
// Finished reading the payload.
read_state_ = ReadState::IDLE;
- return payload_size_;
+ return net::OK;
}
void StreamPacketReader::OnReadComplete(int result) {
diff --git a/blimp/net/stream_packet_reader.h b/blimp/net/stream_packet_reader.h
index 4468638..21a6dab 100644
--- a/blimp/net/stream_packet_reader.h
+++ b/blimp/net/stream_packet_reader.h
@@ -33,8 +33,8 @@ class BLIMP_NET_EXPORT StreamPacketReader : public PacketReader {
~StreamPacketReader() override;
// PacketReader implementation.
- int ReadPacket(const scoped_refptr<net::GrowableIOBuffer>& buf,
- const net::CompletionCallback& cb) override;
+ void ReadPacket(const scoped_refptr<net::GrowableIOBuffer>& buf,
+ const net::CompletionCallback& cb) override;
private:
enum class ReadState {
diff --git a/blimp/net/stream_packet_reader_unittest.cc b/blimp/net/stream_packet_reader_unittest.cc
index 61419cf..5777d4d 100644
--- a/blimp/net/stream_packet_reader_unittest.cc
+++ b/blimp/net/stream_packet_reader_unittest.cc
@@ -5,6 +5,7 @@
#include <stddef.h>
#include <string>
+#include "base/message_loop/message_loop.h"
#include "base/sys_byteorder.h"
#include "blimp/net/common.h"
#include "blimp/net/stream_packet_reader.h"
@@ -41,11 +42,10 @@ class StreamPacketReaderTest : public testing::Test {
~StreamPacketReaderTest() override {}
- int ReadPacket() {
- return data_reader_.ReadPacket(buffer_, callback_.callback());
- }
+ void ReadPacket() { data_reader_.ReadPacket(buffer_, callback_.callback()); }
protected:
+ base::MessageLoop message_loop_;
scoped_refptr<net::GrowableIOBuffer> buffer_;
std::string test_msg_;
net::TestCompletionCallback callback_;
@@ -65,12 +65,11 @@ TEST_F(StreamPacketReaderTest, ReadAsyncHeaderAsyncPayload) {
.WillOnce(DoAll(FillBufferFromString<0>(test_msg_),
SaveArg<2>(&socket_cb), Return(net::ERR_IO_PENDING)));
- EXPECT_EQ(net::ERR_IO_PENDING, ReadPacket());
+ ReadPacket();
socket_cb.Run(kPacketHeaderSizeBytes);
socket_cb.Run(test_msg_.size());
- int rv = callback_.WaitForResult();
-
- EXPECT_EQ(rv, static_cast<int>(test_msg_.size()));
+ EXPECT_EQ(net::OK, callback_.WaitForResult());
+ EXPECT_EQ(static_cast<int>(test_msg_.size()), buffer_->offset());
EXPECT_TRUE(BufferStartsWith(buffer_.get(), test_msg_));
}
@@ -89,13 +88,12 @@ TEST_F(StreamPacketReaderTest, ReadAsyncHeaderSyncPayload) {
.WillOnce(
DoAll(FillBufferFromString<0>(test_msg_), Return(test_msg_.size())));
- EXPECT_EQ(net::ERR_IO_PENDING, ReadPacket());
+ ReadPacket();
EXPECT_FALSE(callback_.have_result());
socket_cb.Run(kPacketHeaderSizeBytes);
- int rv = callback_.WaitForResult();
- EXPECT_GT(rv, 0);
- EXPECT_EQ(rv, static_cast<int>(test_msg_.size()));
+ EXPECT_EQ(net::OK, callback_.WaitForResult());
+ EXPECT_EQ(static_cast<int>(test_msg_.size()), buffer_->offset());
EXPECT_TRUE(BufferStartsWith(buffer_.get(), test_msg_));
}
@@ -110,12 +108,10 @@ TEST_F(StreamPacketReaderTest, ReadSyncHeaderAsyncPayload) {
.WillOnce(DoAll(FillBufferFromString<0>(test_msg_),
SaveArg<2>(&socket_cb), Return(net::ERR_IO_PENDING)));
- EXPECT_EQ(net::ERR_IO_PENDING, ReadPacket());
+ ReadPacket();
socket_cb.Run(test_msg_.size());
- int rv = callback_.WaitForResult();
-
- EXPECT_GT(rv, 0);
- EXPECT_EQ(rv, static_cast<int>(test_msg_.size()));
+ EXPECT_EQ(net::OK, callback_.WaitForResult());
+ EXPECT_EQ(static_cast<int>(test_msg_.size()), buffer_->offset());
EXPECT_TRUE(BufferStartsWith(buffer_.get(), test_msg_));
}
@@ -130,12 +126,10 @@ TEST_F(StreamPacketReaderTest, ReadSyncHeaderSyncPayload) {
.WillOnce(
DoAll(FillBufferFromString<0>(test_msg_), Return(test_msg_.size())));
- int rv = ReadPacket();
-
- EXPECT_GT(rv, 0);
- EXPECT_EQ(rv, static_cast<int>(test_msg_.size()));
+ ReadPacket();
+ EXPECT_EQ(net::OK, callback_.WaitForResult());
+ EXPECT_EQ(static_cast<int>(test_msg_.size()), buffer_->offset());
EXPECT_TRUE(BufferStartsWith(buffer_.get(), test_msg_));
- EXPECT_FALSE(callback_.have_result());
}
// Successful read of 2 messages, header and payload reads all completing
@@ -168,13 +162,14 @@ TEST_F(StreamPacketReaderTest, ReadMultipleMessagesSync) {
DoAll(FillBufferFromString<0>(test_msg2), Return(test_msg2.size())))
.RetiresOnSaturation();
- int rv = ReadPacket();
+ ReadPacket();
+ EXPECT_EQ(net::OK, callback_.WaitForResult());
+ EXPECT_EQ(static_cast<int>(test_msg_.size()), buffer_->offset());
+
+ ReadPacket();
+ EXPECT_EQ(net::OK, callback_.WaitForResult());
+ EXPECT_EQ(static_cast<int>(test_msg2.size()), buffer_->offset());
- EXPECT_GT(rv, 0);
- EXPECT_EQ(rv, static_cast<int>(test_msg_.size()));
- rv = ReadPacket();
- EXPECT_GT(rv, 0);
- EXPECT_EQ(rv, static_cast<int>(test_msg2.size()));
EXPECT_TRUE(BufferStartsWith(buffer_.get(), test_msg_));
EXPECT_FALSE(callback_.have_result());
}
@@ -210,23 +205,17 @@ TEST_F(StreamPacketReaderTest, ReadMultipleMessagesAsync) {
SaveArg<2>(&socket_cb), Return(net::ERR_IO_PENDING)))
.RetiresOnSaturation();
- EXPECT_EQ(net::ERR_IO_PENDING,
- data_reader_.ReadPacket(buffer_, read_cb1.callback()));
+ data_reader_.ReadPacket(buffer_, read_cb1.callback());
socket_cb.Run(kPacketHeaderSizeBytes);
socket_cb.Run(test_msg_.size());
- int rv = read_cb1.WaitForResult();
+ EXPECT_EQ(net::OK, read_cb1.WaitForResult());
+ EXPECT_EQ(static_cast<int>(test_msg_.size()), buffer_->offset());
- EXPECT_GT(rv, 0);
- EXPECT_EQ(rv, static_cast<int>(test_msg_.size()));
-
- EXPECT_EQ(net::ERR_IO_PENDING,
- data_reader_.ReadPacket(buffer_, read_cb2.callback()));
+ data_reader_.ReadPacket(buffer_, read_cb2.callback());
socket_cb.Run(kPacketHeaderSizeBytes);
socket_cb.Run(test_msg_.size());
- rv = read_cb2.WaitForResult();
-
- EXPECT_GT(rv, 0);
- EXPECT_EQ(rv, static_cast<int>(test_msg_.size()));
+ EXPECT_EQ(net::OK, read_cb2.WaitForResult());
+ EXPECT_EQ(static_cast<int>(test_msg_.size()), buffer_->offset());
EXPECT_TRUE(BufferStartsWith(buffer_.get(), test_msg_));
}
@@ -250,7 +239,7 @@ TEST_F(StreamPacketReaderTest, PartialHeaderReadAsync) {
EXPECT_CALL(socket_, Read(NotNull(), test_msg_.size(), _))
.WillOnce(Return(net::ERR_IO_PENDING));
- EXPECT_EQ(net::ERR_IO_PENDING, ReadPacket());
+ ReadPacket();
cb.Run(1);
cb.Run(kPacketHeaderSizeBytes - 1);
}
@@ -268,7 +257,7 @@ TEST_F(StreamPacketReaderTest, PartialPayloadReadAsync) {
EXPECT_CALL(socket_, Read(NotNull(), test_msg_.size(), _))
.WillOnce(DoAll(FillBufferFromString<0>(test_msg_.substr(0, 1)),
SaveArg<2>(&cb), Return(net::ERR_IO_PENDING)));
- EXPECT_EQ(net::ERR_IO_PENDING, ReadPacket());
+ ReadPacket();
EXPECT_CALL(socket_, Read(NotNull(), test_msg_.size() - 1, _))
.WillOnce(DoAll(
FillBufferFromString<0>(test_msg_.substr(1, test_msg_.size() - 1)),
@@ -276,10 +265,8 @@ TEST_F(StreamPacketReaderTest, PartialPayloadReadAsync) {
cb.Run(1);
cb.Run(test_msg_.size() - 1);
- int rv = callback_.WaitForResult();
-
- EXPECT_GT(rv, 0);
- EXPECT_EQ(static_cast<int>(test_msg_.size()), rv);
+ EXPECT_EQ(net::OK, callback_.WaitForResult());
+ EXPECT_EQ(static_cast<int>(test_msg_.size()), buffer_->offset());
}
// Verify that synchronous header read errors are reported correctly.
@@ -287,7 +274,8 @@ TEST_F(StreamPacketReaderTest, ReadHeaderErrorSync) {
net::CompletionCallback cb;
EXPECT_CALL(socket_, Read(NotNull(), kPacketHeaderSizeBytes, _))
.WillOnce(Return(net::ERR_FAILED));
- EXPECT_EQ(net::ERR_FAILED, ReadPacket());
+ ReadPacket();
+ EXPECT_EQ(net::ERR_FAILED, callback_.WaitForResult());
}
// Verify that synchronous payload read errors are reported correctly.
@@ -300,7 +288,8 @@ TEST_F(StreamPacketReaderTest, ReadPayloadErrorSync) {
EXPECT_CALL(socket_, Read(NotNull(), test_msg_.size(), _))
.WillOnce(Return(net::ERR_FAILED));
- EXPECT_EQ(net::ERR_FAILED, ReadPacket());
+ ReadPacket();
+ EXPECT_EQ(net::ERR_FAILED, callback_.WaitForResult());
}
// Verify that async header read errors are reported correctly.
@@ -312,7 +301,7 @@ TEST_F(StreamPacketReaderTest, ReadHeaderErrorAsync) {
.WillOnce(DoAll(FillBufferFromString<0>(EncodeHeader(test_msg_.size())),
SaveArg<2>(&cb), Return(net::ERR_IO_PENDING)));
- EXPECT_EQ(net::ERR_IO_PENDING, ReadPacket());
+ ReadPacket();
cb.Run(net::ERR_FAILED);
EXPECT_EQ(net::ERR_FAILED, callback_.WaitForResult());
}
@@ -327,7 +316,7 @@ TEST_F(StreamPacketReaderTest, ReadPayloadErrorAsync) {
EXPECT_CALL(socket_, Read(NotNull(), test_msg_.size(), _))
.WillOnce(DoAll(SaveArg<2>(&cb), Return(net::ERR_IO_PENDING)));
- EXPECT_EQ(net::ERR_IO_PENDING, ReadPacket());
+ ReadPacket();
cb.Run(net::ERR_FAILED);
EXPECT_EQ(net::ERR_FAILED, callback_.WaitForResult());
}
@@ -343,8 +332,7 @@ TEST_F(StreamPacketReaderTest, ReaderDeletedDuringAsyncHeaderRead) {
.WillOnce(DoAll(FillBufferFromString<0>(EncodeHeader(test_msg_.size())),
SaveArg<2>(&cb), Return(net::ERR_IO_PENDING)));
- EXPECT_EQ(net::ERR_IO_PENDING,
- reader->ReadPacket(buffer_, callback_.callback()));
+ reader->ReadPacket(buffer_, callback_.callback());
reader.reset(); // Delete the reader object.
cb.Run(kPacketHeaderSizeBytes); // Complete the socket operation.
}
@@ -360,8 +348,7 @@ TEST_F(StreamPacketReaderTest, ReaderDeletedDuringAsyncPayloadRead) {
Return(kPacketHeaderSizeBytes)));
EXPECT_CALL(socket_, Read(NotNull(), test_msg_.size(), _))
.WillOnce(DoAll(SaveArg<2>(&cb), Return(net::ERR_IO_PENDING)));
- EXPECT_EQ(net::ERR_IO_PENDING,
- reader->ReadPacket(buffer_, callback_.callback()));
+ reader->ReadPacket(buffer_, callback_.callback());
reader.reset(); // Delete the reader object.
cb.Run(net::ERR_FAILED); // Complete the socket operation.
@@ -374,7 +361,8 @@ TEST_F(StreamPacketReaderTest, ReadWhatIsThisAPacketForAnts) {
Return(kPacketHeaderSizeBytes)))
.RetiresOnSaturation();
- EXPECT_EQ(net::ERR_INVALID_RESPONSE, ReadPacket());
+ ReadPacket();
+ EXPECT_EQ(net::ERR_INVALID_RESPONSE, callback_.WaitForResult());
}
// Verify that an illegally large payloads is reported as an erroneous inputs.
@@ -384,7 +372,8 @@ TEST_F(StreamPacketReaderTest, ReadErrorIllegallyLargePayload) {
DoAll(FillBufferFromString<0>(EncodeHeader(kTestMaxBufferSize + 1)),
Return(kPacketHeaderSizeBytes)));
- EXPECT_EQ(net::ERR_INVALID_RESPONSE, ReadPacket());
+ ReadPacket();
+ EXPECT_EQ(net::ERR_INVALID_RESPONSE, callback_.WaitForResult());
}
} // namespace
diff --git a/blimp/net/stream_packet_writer.cc b/blimp/net/stream_packet_writer.cc
index 221cdd1..943f9a9 100644
--- a/blimp/net/stream_packet_writer.cc
+++ b/blimp/net/stream_packet_writer.cc
@@ -47,15 +47,11 @@ StreamPacketWriter::StreamPacketWriter(net::StreamSocket* socket)
StreamPacketWriter::~StreamPacketWriter() {}
-int StreamPacketWriter::WritePacket(scoped_refptr<net::DrainableIOBuffer> data,
- const net::CompletionCallback& callback) {
+void StreamPacketWriter::WritePacket(scoped_refptr<net::DrainableIOBuffer> data,
+ const net::CompletionCallback& callback) {
DCHECK_EQ(WriteState::IDLE, write_state_);
DCHECK(data);
- if (data->BytesRemaining() == 0) {
- // The packet is empty; your argument is invalid.
- DLOG(ERROR) << "Attempted to write zero-length packet.";
- return net::ERR_INVALID_ARGUMENT;
- }
+ CHECK(data->BytesRemaining());
write_state_ = WriteState::HEADER;
header_buffer_->SetOffset(0);
@@ -63,18 +59,18 @@ int StreamPacketWriter::WritePacket(scoped_refptr<net::DrainableIOBuffer> data,
base::HostToNet32(data->BytesRemaining());
payload_buffer_ = data;
- int result = DoWriteLoop(false);
- if (result == net::ERR_IO_PENDING) {
- // Store the completion callback to invoke when DoWriteLoop completes
- // asynchronously.
- callback_ = callback;
- } else {
+ int result = DoWriteLoop(net::OK);
+ if (result != net::ERR_IO_PENDING) {
// Release the payload buffer, since the write operation has completed
// synchronously.
payload_buffer_ = nullptr;
- }
- return result;
+ // Adapt synchronous completion to an asynchronous style.
+ base::MessageLoop::current()->PostTask(FROM_HERE,
+ base::Bind(callback, result));
+ } else {
+ callback_ = callback;
+ }
}
int StreamPacketWriter::DoWriteLoop(int result) {
diff --git a/blimp/net/stream_packet_writer.h b/blimp/net/stream_packet_writer.h
index c618eba..071a8f0 100644
--- a/blimp/net/stream_packet_writer.h
+++ b/blimp/net/stream_packet_writer.h
@@ -35,8 +35,8 @@ class BLIMP_NET_EXPORT StreamPacketWriter : public PacketWriter {
~StreamPacketWriter() override;
// PacketWriter implementation.
- int WritePacket(scoped_refptr<net::DrainableIOBuffer> data,
- const net::CompletionCallback& callback) override;
+ void WritePacket(scoped_refptr<net::DrainableIOBuffer> data,
+ const net::CompletionCallback& callback) override;
private:
enum class WriteState {
diff --git a/blimp/net/stream_packet_writer_unittest.cc b/blimp/net/stream_packet_writer_unittest.cc
index c8014f9..3212e928 100644
--- a/blimp/net/stream_packet_writer_unittest.cc
+++ b/blimp/net/stream_packet_writer_unittest.cc
@@ -35,15 +35,13 @@ class StreamPacketWriterTest : public testing::Test {
test_data_str_.size())),
message_writer_(&socket_) {}
- ~StreamPacketWriterTest() override {}
-
protected:
const std::string test_data_str_ = "U WOT M8";
scoped_refptr<net::DrainableIOBuffer> test_data_;
+ base::MessageLoop message_loop_;
MockStreamSocket socket_;
StreamPacketWriter message_writer_;
- base::MessageLoop message_loop_;
testing::InSequence mock_sequence_;
private:
@@ -56,20 +54,14 @@ TEST_F(StreamPacketWriterTest, TestWriteAsync) {
net::CompletionCallback header_cb;
net::CompletionCallback payload_cb;
- // Write header.
EXPECT_CALL(socket_, Write(BufferEquals(EncodeHeader(test_data_str_.size())),
kPacketHeaderSizeBytes, _))
.WillOnce(DoAll(SaveArg<2>(&header_cb), Return(net::ERR_IO_PENDING)));
- EXPECT_EQ(net::ERR_IO_PENDING,
- message_writer_.WritePacket(test_data_, writer_cb.callback()));
- Mock::VerifyAndClearExpectations(&socket_);
-
- // Write payload.
+ message_writer_.WritePacket(test_data_, writer_cb.callback());
EXPECT_CALL(socket_,
Write(BufferEquals(test_data_str_), test_data_str_.size(), _))
.WillOnce(DoAll(SaveArg<2>(&payload_cb), Return(net::ERR_IO_PENDING)));
header_cb.Run(kPacketHeaderSizeBytes);
- Mock::VerifyAndClearExpectations(&socket_);
payload_cb.Run(test_data_str_.size());
EXPECT_EQ(net::OK, writer_cb.WaitForResult());
@@ -100,8 +92,7 @@ TEST_F(StreamPacketWriterTest, TestPartialWriteAsync) {
.WillOnce(DoAll(SaveArg<2>(&payload_cb), Return(net::ERR_IO_PENDING)))
.RetiresOnSaturation();
- EXPECT_EQ(net::ERR_IO_PENDING,
- message_writer_.WritePacket(test_data_, writer_cb.callback()));
+ message_writer_.WritePacket(test_data_, writer_cb.callback());
// Header is written - first one byte, then the remainder.
header_cb.Run(1);
@@ -127,8 +118,7 @@ TEST_F(StreamPacketWriterTest, TestWriteErrorAsync) {
Write(BufferEquals(test_data_str_), test_data_str_.size(), _))
.WillOnce(DoAll(SaveArg<2>(&payload_cb), Return(net::ERR_IO_PENDING)));
- EXPECT_EQ(net::ERR_IO_PENDING,
- message_writer_.WritePacket(test_data_, writer_cb.callback()));
+ message_writer_.WritePacket(test_data_, writer_cb.callback());
header_cb.Run(kPacketHeaderSizeBytes);
payload_cb.Run(net::ERR_CONNECTION_RESET);
@@ -138,15 +128,16 @@ TEST_F(StreamPacketWriterTest, TestWriteErrorAsync) {
// Successful write with 1 sync header write and 1 sync payload write.
TEST_F(StreamPacketWriterTest, TestWriteSync) {
net::TestCompletionCallback writer_cb;
+
EXPECT_CALL(socket_, Write(BufferEquals(EncodeHeader(test_data_str_.size())),
kPacketHeaderSizeBytes, _))
.WillOnce(Return(kPacketHeaderSizeBytes));
EXPECT_CALL(socket_,
Write(BufferEquals(test_data_str_), test_data_str_.size(), _))
.WillOnce(Return(test_data_str_.size()));
- EXPECT_EQ(net::OK,
- message_writer_.WritePacket(test_data_, writer_cb.callback()));
- EXPECT_FALSE(writer_cb.have_result());
+
+ message_writer_.WritePacket(test_data_, writer_cb.callback());
+ EXPECT_EQ(net::OK, writer_cb.WaitForResult());
}
// Successful write with 2 sync header writes and 2 sync payload writes.
@@ -167,21 +158,8 @@ TEST_F(StreamPacketWriterTest, TestPartialWriteSync) {
payload.size() - 1, _))
.WillOnce(Return(payload.size() - 1));
- EXPECT_EQ(net::OK,
- message_writer_.WritePacket(test_data_, writer_cb.callback()));
- EXPECT_FALSE(writer_cb.have_result());
-}
-
-// Verify that zero-length packets are rejected.
-TEST_F(StreamPacketWriterTest, TestZeroLengthPacketsRejected) {
- net::TestCompletionCallback writer_cb;
-
- EXPECT_EQ(net::ERR_INVALID_ARGUMENT,
- message_writer_.WritePacket(
- new net::DrainableIOBuffer(new net::IOBuffer(0), 0),
- writer_cb.callback()));
-
- EXPECT_FALSE(writer_cb.have_result());
+ message_writer_.WritePacket(test_data_, writer_cb.callback());
+ EXPECT_EQ(net::OK, writer_cb.WaitForResult());
}
// Sync socket error while writing header data.
@@ -192,9 +170,8 @@ TEST_F(StreamPacketWriterTest, TestWriteHeaderErrorSync) {
kPacketHeaderSizeBytes, _))
.WillOnce(Return(net::ERR_FAILED));
- EXPECT_EQ(net::ERR_FAILED,
- message_writer_.WritePacket(test_data_, writer_cb.callback()));
-
+ message_writer_.WritePacket(test_data_, writer_cb.callback());
+ EXPECT_EQ(net::ERR_FAILED, writer_cb.WaitForResult());
EXPECT_EQ(net::ERR_EMPTY_RESPONSE,
writer_cb.GetResult(net::ERR_EMPTY_RESPONSE));
EXPECT_FALSE(writer_cb.have_result());
@@ -211,9 +188,8 @@ TEST_F(StreamPacketWriterTest, TestWritePayloadErrorSync) {
Write(BufferEquals(test_data_str_), test_data_str_.size(), _))
.WillOnce(Return(net::ERR_FAILED));
- EXPECT_EQ(net::ERR_FAILED,
- message_writer_.WritePacket(test_data_, writer_cb.callback()));
- EXPECT_FALSE(writer_cb.have_result());
+ message_writer_.WritePacket(test_data_, writer_cb.callback());
+ EXPECT_EQ(net::ERR_FAILED, writer_cb.WaitForResult());
}
// Verify that asynchronous header write completions don't cause a
@@ -228,8 +204,7 @@ TEST_F(StreamPacketWriterTest, DeletedDuringHeaderWrite) {
EXPECT_CALL(socket_, Write(BufferEquals(EncodeHeader(test_data_str_.size())),
kPacketHeaderSizeBytes, _))
.WillOnce(DoAll(SaveArg<2>(&header_cb), Return(net::ERR_IO_PENDING)));
- EXPECT_EQ(net::ERR_IO_PENDING,
- writer->WritePacket(test_data_, writer_cb.callback()));
+ writer->WritePacket(test_data_, writer_cb.callback());
Mock::VerifyAndClearExpectations(&socket_);
// Header write completion callback is invoked after the writer died.
@@ -252,8 +227,7 @@ TEST_F(StreamPacketWriterTest, DeletedDuringPayloadWrite) {
Write(BufferEquals(test_data_str_), test_data_str_.size(), _))
.WillOnce(DoAll(SaveArg<2>(&payload_cb), Return(net::ERR_IO_PENDING)));
- EXPECT_EQ(net::ERR_IO_PENDING,
- writer->WritePacket(test_data_, writer_cb.callback()));
+ writer->WritePacket(test_data_, writer_cb.callback());
// Header write completes successfully.
header_cb.Run(kPacketHeaderSizeBytes);
diff --git a/blimp/net/test_common.h b/blimp/net/test_common.h
index be66ab2..58ee2f6 100644
--- a/blimp/net/test_common.h
+++ b/blimp/net/test_common.h
@@ -35,7 +35,6 @@ MATCHER_P(BufferEquals, expected, "") {
}
// Checks if two proto messages are the same.
-// TODO(kmarshall): promote to a shared testing library.
MATCHER_P(EqualsProto, message, "") {
std::string expected_serialized;
std::string actual_serialized;
@@ -49,7 +48,7 @@ MATCHER_P(EqualsProto, message, "") {
// message (type: BlimpMessage) The message to compare with |arg|.
MATCHER_P(BufferEqualsProto, message, "") {
BlimpMessage actual_message;
- actual_message.ParseFromArray(arg->data(), arg->BytesRemaining());
+ actual_message.ParseFromArray(arg->data(), message.ByteSize());
std::string expected_serialized;
std::string actual_serialized;
message.SerializeToString(&expected_serialized);
@@ -71,7 +70,8 @@ ACTION_TEMPLATE(FillBufferFromString,
// Behavior is undefined if len(buf) < len(str).
bool BufferStartsWith(net::GrowableIOBuffer* buf, const std::string& str);
-// GMock action that writes data from a blimp message to an IOBuffer .
+// GMock action that writes data from a BlimpMessage to a GrowableIOBuffer.
+// Advances the buffer's |offset| to the end of the message.
//
// buf_idx (template parameter 0): 0-based index of the IOBuffer arg.
// message: the blimp message containing data to be written to the IOBuffer
@@ -80,6 +80,14 @@ ACTION_TEMPLATE(FillBufferFromMessage,
AND_1_VALUE_PARAMS(message)) {
message->SerializeToArray(testing::get<buf_idx>(args)->data(),
message->ByteSize());
+ testing::get<buf_idx>(args)->set_offset(message->ByteSize());
+}
+
+// Calls |set_offset()| for a GrowableIOBuffer.
+ACTION_TEMPLATE(SetBufferOffset,
+ HAS_1_TEMPLATE_PARAMS(int, buf_idx),
+ AND_1_VALUE_PARAMS(offset)) {
+ testing::get<buf_idx>(args)->set_offset(offset);
}
// Formats a string-based representation of a BlimpMessage header.
@@ -143,8 +151,8 @@ class MockPacketReader : public PacketReader {
~MockPacketReader() override;
MOCK_METHOD2(ReadPacket,
- int(const scoped_refptr<net::GrowableIOBuffer>&,
- const net::CompletionCallback&));
+ void(const scoped_refptr<net::GrowableIOBuffer>&,
+ const net::CompletionCallback&));
};
class MockPacketWriter : public PacketWriter {
@@ -153,8 +161,8 @@ class MockPacketWriter : public PacketWriter {
~MockPacketWriter() override;
MOCK_METHOD2(WritePacket,
- int(scoped_refptr<net::DrainableIOBuffer>,
- const net::CompletionCallback&));
+ void(scoped_refptr<net::DrainableIOBuffer>,
+ const net::CompletionCallback&));
};
class MockConnectionErrorObserver : public ConnectionErrorObserver {