path: root/blimp/net
diff options
authorkmarshall <>2015-10-27 16:30:01 -0700
committerCommit bot <>2015-10-27 23:31:16 +0000
commita0921ade6d8b06be814772050307e0d02cef3e85 (patch)
tree4cf90001cbe4eb229f6bfc9541302800fbd954ef /blimp/net
parentb02d1809b3742b407ffc39a8203df2c9e8e0a7d1 (diff)
Created BlimpMessageWriter, which frames and sends data over a Socket.
The conversion from BlimpMessages to bytes will be handled in another class as part of a followup CL. Added unit tests for BlimpMessageWriter.,, BUG= Review URL: Cr-Commit-Position: refs/heads/master@{#356431}
Diffstat (limited to 'blimp/net')
6 files changed, 504 insertions, 3 deletions
diff --git a/blimp/net/ b/blimp/net/
index d3a6619..b4d2375 100644
--- a/blimp/net/
+++ b/blimp/net/
@@ -14,6 +14,8 @@ component("blimp_net") {
+ "",
+ "packet_writer.h",
@@ -31,7 +33,7 @@ source_set("unit_tests") {
sources = [
- "packet_reader_unittest.h",
+ "",
diff --git a/blimp/net/ b/blimp/net/
index e5b0dc6..1663b30 100644
--- a/blimp/net/
+++ b/blimp/net/
@@ -6,7 +6,7 @@
namespace blimp {
-const size_t kMaxPacketPayloadSizeBytes = 1 << 16; // 64kb
+const size_t kMaxPacketPayloadSizeBytes = 1 << 16; // 64KB
const size_t kPacketHeaderSizeBytes = 4;
} // namespace blimp
diff --git a/blimp/net/common.h b/blimp/net/common.h
index 96e911a..eff9921 100644
--- a/blimp/net/common.h
+++ b/blimp/net/common.h
@@ -8,7 +8,6 @@
#include <string>
#include "blimp/net/blimp_net_export.h"
-#include "net/base/net_errors.h"
namespace blimp {
diff --git a/blimp/net/ b/blimp/net/
new file mode 100644
index 0000000..81a6d90
--- /dev/null
+++ b/blimp/net/
@@ -0,0 +1,152 @@
+// Copyright 2015 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+#include "blimp/net/packet_writer.h"
+#include <iostream>
+#include "base/callback_helpers.h"
+#include "base/logging.h"
+#include "base/memory/ref_counted.h"
+#include "base/message_loop/message_loop.h"
+#include "base/sys_byteorder.h"
+#include "blimp/common/proto/blimp_message.pb.h"
+#include "blimp/net/common.h"
+#include "net/base/io_buffer.h"
+#include "net/base/net_errors.h"
+#include "net/socket/stream_socket.h"
+namespace blimp {
+std::ostream& operator<<(std::ostream& out,
+ const PacketWriter::WriteState state) {
+ switch (state) {
+ case PacketWriter::WriteState::IDLE:
+ out << "IDLE";
+ break;
+ case PacketWriter::WriteState::HEADER:
+ out << "HEADER";
+ break;
+ case PacketWriter::WriteState::PAYLOAD:
+ out << "PAYLOAD";
+ break;
+ }
+ return out;
+PacketWriter::PacketWriter(net::StreamSocket* socket)
+ : write_state_(WriteState::IDLE),
+ socket_(socket),
+ header_buffer_(
+ new net::DrainableIOBuffer(new net::IOBuffer(kPacketHeaderSizeBytes),
+ kPacketHeaderSizeBytes)),
+ weak_factory_(this) {
+ DCHECK(socket_);
+PacketWriter::~PacketWriter() {}
+int PacketWriter::WritePacket(scoped_refptr<net::DrainableIOBuffer> data,
+ const net::CompletionCallback& callback) {
+ DCHECK_EQ(WriteState::IDLE, write_state_);
+ DCHECK(data);
+ if (data->BytesRemaining() == 0) {
+ // The packet is empty; your argument is invalid.
+ DLOG(ERROR) << "Attempted to write zero-length packet.";
+ }
+ write_state_ = WriteState::HEADER;
+ header_buffer_->SetOffset(0);
+ *reinterpret_cast<uint32*>(header_buffer_->data()) =
+ base::HostToNet32(data->BytesRemaining());
+ payload_buffer_ = data;
+ int result = DoWriteLoop(false);
+ if (result == net::ERR_IO_PENDING) {
+ // Store the completion callback to invoke when DoWriteLoop completes
+ // asynchronously.
+ callback_ = callback;
+ } else {
+ // Release the payload buffer, since the write operation has completed
+ // synchronously.
+ payload_buffer_ = nullptr;
+ }
+ return result;
+int PacketWriter::DoWriteLoop(int result) {
+ DCHECK_NE(net::ERR_IO_PENDING, result);
+ DCHECK_GE(result, 0);
+ DCHECK_NE(WriteState::IDLE, write_state_);
+ while (result >= 0 && write_state_ != WriteState::IDLE) {
+ VLOG(2) << "DoWriteLoop (state=" << write_state_ << ", result=" << result
+ << ")";
+ switch (write_state_) {
+ case WriteState::HEADER:
+ result = DoWriteHeader(result);
+ break;
+ case WriteState::PAYLOAD:
+ result = DoWritePayload(result);
+ break;
+ case WriteState::IDLE:
+ result = net::ERR_UNEXPECTED;
+ break;
+ }
+ }
+ return result;
+int PacketWriter::DoWriteHeader(int result) {
+ DCHECK_EQ(WriteState::HEADER, write_state_);
+ DCHECK_GE(result, 0);
+ header_buffer_->DidConsume(result);
+ if (header_buffer_->BytesRemaining() > 0) {
+ return socket_->Write(
+ header_buffer_.get(), header_buffer_->BytesRemaining(),
+ base::Bind(&PacketWriter::OnWriteComplete, weak_factory_.GetWeakPtr()));
+ }
+ write_state_ = WriteState::PAYLOAD;
+ return net::OK;
+int PacketWriter::DoWritePayload(int result) {
+ DCHECK_EQ(WriteState::PAYLOAD, write_state_);
+ DCHECK_GE(result, 0);
+ payload_buffer_->DidConsume(result);
+ if (payload_buffer_->BytesRemaining() > 0) {
+ return socket_->Write(
+ payload_buffer_.get(), payload_buffer_->BytesRemaining(),
+ base::Bind(&PacketWriter::OnWriteComplete, weak_factory_.GetWeakPtr()));
+ }
+ write_state_ = WriteState::IDLE;
+ return net::OK;
+void PacketWriter::OnWriteComplete(int result) {
+ DCHECK_NE(net::ERR_IO_PENDING, result);
+ // If the write was succesful, then process the result.
+ if (result > 0) {
+ result = DoWriteLoop(result);
+ }
+ // If the write finished, either successfully or by error, inform the
+ // caller.
+ if (result != net::ERR_IO_PENDING) {
+ payload_buffer_ = nullptr;
+ base::ResetAndReturn(&callback_).Run(result);
+ }
+} // namespace blimp
diff --git a/blimp/net/packet_writer.h b/blimp/net/packet_writer.h
new file mode 100644
index 0000000..79fc766497
--- /dev/null
+++ b/blimp/net/packet_writer.h
@@ -0,0 +1,81 @@
+// Copyright 2015 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+#include <string>
+#include "base/macros.h"
+#include "base/memory/ref_counted.h"
+#include "base/threading/thread_checker.h"
+#include "blimp/net/blimp_net_export.h"
+#include "net/base/completion_callback.h"
+#include "net/base/net_errors.h"
+namespace net {
+class DrainableIOBuffer;
+class StreamSocket;
+} // namespace net
+namespace blimp {
+// Writes opaque length-prefixed packets to a StreamSocket.
+// The header segment is 32-bit, encoded in network byte order.
+// The body segment length is specified in the header (should be capped at
+// kMaxPacketPayloadSizeBytes).
+class BLIMP_NET_EXPORT PacketWriter {
+ public:
+ // |socket|: The socket to write packets to. The caller must ensure |socket|
+ // is valid while the reader is in-use (see ReadPacket below).
+ explicit PacketWriter(net::StreamSocket* socket);
+ ~PacketWriter();
+ // Writes a packet of at least one byte in size to |socket_|.
+ //
+ // Returns net::OK or an error code if the operation executed successfully.
+ // Returns ERR_IO_PENDING if the operation will be executed asynchronously.
+ // |cb| is later invoked with net::OK or an error code.
+ int WritePacket(scoped_refptr<net::DrainableIOBuffer> data,
+ const net::CompletionCallback& callback);
+ private:
+ enum class WriteState {
+ };
+ friend std::ostream& operator<<(std::ostream& out, const WriteState state);
+ // State machine implementation.
+ // |result| - the result value of the most recent network operation.
+ // See comments for WritePacket() for documentation on return values.
+ int DoWriteLoop(int result);
+ int DoWriteHeader(int result);
+ int DoWritePayload(int result);
+ // Callback function to be invoked on asynchronous write completion.
+ // Invokes |callback_| on packet write completion or on error.
+ void OnWriteComplete(int result);
+ WriteState write_state_;
+ net::StreamSocket* socket_;
+ scoped_refptr<net::DrainableIOBuffer> payload_buffer_;
+ scoped_refptr<net::DrainableIOBuffer> header_buffer_;
+ net::CompletionCallback callback_;
+ base::WeakPtrFactory<PacketWriter> weak_factory_;
+} // namespace blimp
diff --git a/blimp/net/ b/blimp/net/
new file mode 100644
index 0000000..2554b47
--- /dev/null
+++ b/blimp/net/
@@ -0,0 +1,267 @@
+// Copyright 2015 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+#include <string>
+#include "base/message_loop/message_loop.h"
+#include "base/run_loop.h"
+#include "blimp/net/common.h"
+#include "blimp/net/packet_writer.h"
+#include "blimp/net/test_common.h"
+#include "net/base/io_buffer.h"
+#include "net/base/net_errors.h"
+#include "net/base/test_completion_callback.h"
+#include "net/socket/socket.h"
+#include "testing/gmock/include/gmock/gmock.h"
+#include "testing/gtest/include/gtest/gtest.h"
+using testing::_;
+using testing::DoAll;
+using testing::InSequence;
+using testing::Mock;
+using testing::NotNull;
+using testing::Return;
+using testing::SaveArg;
+namespace blimp {
+namespace {
+class PacketWriterTest : public testing::Test {
+ public:
+ PacketWriterTest()
+ : test_data_(
+ new net::DrainableIOBuffer(new net::StringIOBuffer(test_data_str_),
+ test_data_str_.size())),
+ message_writer_(&socket_) {}
+ ~PacketWriterTest() override {}
+ protected:
+ const std::string test_data_str_ = "U WOT M8";
+ scoped_refptr<net::DrainableIOBuffer> test_data_;
+ MockStreamSocket socket_;
+ PacketWriter message_writer_;
+ base::MessageLoop message_loop_;
+ testing::InSequence mock_sequence_;
+ private:
+// Successful write with 1 async header write and 1 async payload write.
+TEST_F(PacketWriterTest, TestWriteAsync) {
+ net::TestCompletionCallback writer_cb;
+ net::CompletionCallback header_cb;
+ net::CompletionCallback payload_cb;
+ // Write header.
+ EXPECT_CALL(socket_, Write(BufferEquals(EncodeHeader(test_data_str_.size())),
+ kPacketHeaderSizeBytes, _))
+ .WillOnce(DoAll(SaveArg<2>(&header_cb), Return(net::ERR_IO_PENDING)));
+ message_writer_.WritePacket(test_data_, writer_cb.callback()));
+ Mock::VerifyAndClearExpectations(&socket_);
+ // Write payload.
+ EXPECT_CALL(socket_,
+ Write(BufferEquals(test_data_str_), test_data_str_.size(), _))
+ .WillOnce(DoAll(SaveArg<2>(&payload_cb), Return(net::ERR_IO_PENDING)));
+ header_cb.Run(kPacketHeaderSizeBytes);
+ Mock::VerifyAndClearExpectations(&socket_);
+ payload_cb.Run(test_data_str_.size());
+ EXPECT_EQ(net::OK, writer_cb.WaitForResult());
+// Successful write with 2 async header writes and 2 async payload writes.
+TEST_F(PacketWriterTest, TestPartialWriteAsync) {
+ net::TestCompletionCallback writer_cb;
+ net::CompletionCallback header_cb;
+ net::CompletionCallback payload_cb;
+ std::string header = EncodeHeader(test_data_str_.size());
+ std::string payload = test_data_str_;
+ EXPECT_CALL(socket_, Write(BufferEquals(header), kPacketHeaderSizeBytes, _))
+ .WillOnce(DoAll(SaveArg<2>(&header_cb), Return(net::ERR_IO_PENDING)))
+ .RetiresOnSaturation();
+ EXPECT_CALL(socket_, Write(BufferEquals(header.substr(1, header.size())),
+ header.size() - 1, _))
+ .WillOnce(DoAll(SaveArg<2>(&header_cb), Return(net::ERR_IO_PENDING)))
+ .RetiresOnSaturation();
+ EXPECT_CALL(socket_, Write(BufferEquals(payload), payload.size(), _))
+ .WillOnce(DoAll(SaveArg<2>(&payload_cb), Return(net::ERR_IO_PENDING)))
+ .RetiresOnSaturation();
+ EXPECT_CALL(socket_,
+ Write(BufferEquals(payload.substr(1, payload.size() - 1)),
+ payload.size() - 1, _))
+ .WillOnce(DoAll(SaveArg<2>(&payload_cb), Return(net::ERR_IO_PENDING)))
+ .RetiresOnSaturation();
+ message_writer_.WritePacket(test_data_, writer_cb.callback()));
+ // Header is written - first one byte, then the remainder.
+ header_cb.Run(1);
+ header_cb.Run(header.size() - 1);
+ // Payload is written - first one byte, then the remainder.
+ payload_cb.Run(1);
+ payload_cb.Run(payload.size() - 1);
+ EXPECT_EQ(net::OK, writer_cb.WaitForResult());
+// Async socket error while writing data.
+TEST_F(PacketWriterTest, TestWriteErrorAsync) {
+ net::TestCompletionCallback writer_cb;
+ net::CompletionCallback header_cb;
+ net::CompletionCallback payload_cb;
+ EXPECT_CALL(socket_, Write(BufferEquals(EncodeHeader(test_data_str_.size())),
+ kPacketHeaderSizeBytes, _))
+ .WillOnce(DoAll(SaveArg<2>(&header_cb), Return(net::ERR_IO_PENDING)));
+ EXPECT_CALL(socket_,
+ Write(BufferEquals(test_data_str_), test_data_str_.size(), _))
+ .WillOnce(DoAll(SaveArg<2>(&payload_cb), Return(net::ERR_IO_PENDING)));
+ message_writer_.WritePacket(test_data_, writer_cb.callback()));
+ header_cb.Run(kPacketHeaderSizeBytes);
+ payload_cb.Run(net::ERR_CONNECTION_RESET);
+ EXPECT_EQ(net::ERR_CONNECTION_RESET, writer_cb.WaitForResult());
+// Successful write with 1 sync header write and 1 sync payload write.
+TEST_F(PacketWriterTest, TestWriteSync) {
+ net::TestCompletionCallback writer_cb;
+ EXPECT_CALL(socket_, Write(BufferEquals(EncodeHeader(test_data_str_.size())),
+ kPacketHeaderSizeBytes, _))
+ .WillOnce(Return(kPacketHeaderSizeBytes));
+ EXPECT_CALL(socket_,
+ Write(BufferEquals(test_data_str_), test_data_str_.size(), _))
+ .WillOnce(Return(test_data_str_.size()));
+ EXPECT_EQ(net::OK,
+ message_writer_.WritePacket(test_data_, writer_cb.callback()));
+ EXPECT_FALSE(writer_cb.have_result());
+// Successful write with 2 sync header writes and 2 sync payload writes.
+TEST_F(PacketWriterTest, TestPartialWriteSync) {
+ net::TestCompletionCallback writer_cb;
+ std::string header = EncodeHeader(test_data_str_.size());
+ std::string payload = test_data_str_;
+ EXPECT_CALL(socket_, Write(BufferEquals(header), header.size(), _))
+ .WillOnce(Return(1));
+ EXPECT_CALL(socket_, Write(BufferEquals(header.substr(1, header.size() - 1)),
+ header.size() - 1, _))
+ .WillOnce(Return(header.size() - 1));
+ EXPECT_CALL(socket_, Write(BufferEquals(payload), payload.size(), _))
+ .WillOnce(Return(1));
+ EXPECT_CALL(socket_, Write(BufferEquals(payload.substr(1, payload.size())),
+ payload.size() - 1, _))
+ .WillOnce(Return(payload.size() - 1));
+ EXPECT_EQ(net::OK,
+ message_writer_.WritePacket(test_data_, writer_cb.callback()));
+ EXPECT_FALSE(writer_cb.have_result());
+// Verify that zero-length packets are rejected.
+TEST_F(PacketWriterTest, TestZeroLengthPacketsRejected) {
+ net::TestCompletionCallback writer_cb;
+ message_writer_.WritePacket(
+ new net::DrainableIOBuffer(new net::IOBuffer(0), 0),
+ writer_cb.callback()));
+ EXPECT_FALSE(writer_cb.have_result());
+// Sync socket error while writing header data.
+TEST_F(PacketWriterTest, TestWriteHeaderErrorSync) {
+ net::TestCompletionCallback writer_cb;
+ EXPECT_CALL(socket_, Write(BufferEquals(EncodeHeader(test_data_str_.size())),
+ kPacketHeaderSizeBytes, _))
+ .WillOnce(Return(net::ERR_FAILED));
+ message_writer_.WritePacket(test_data_, writer_cb.callback()));
+ writer_cb.GetResult(net::ERR_EMPTY_RESPONSE));
+ EXPECT_FALSE(writer_cb.have_result());
+// Sync socket error while writing payload data.
+TEST_F(PacketWriterTest, TestWritePayloadErrorSync) {
+ net::TestCompletionCallback writer_cb;
+ EXPECT_CALL(socket_, Write(BufferEquals(EncodeHeader(test_data_str_.size())),
+ kPacketHeaderSizeBytes, _))
+ .WillOnce(Return(kPacketHeaderSizeBytes));
+ EXPECT_CALL(socket_,
+ Write(BufferEquals(test_data_str_), test_data_str_.size(), _))
+ .WillOnce(Return(net::ERR_FAILED));
+ message_writer_.WritePacket(test_data_, writer_cb.callback()));
+ EXPECT_FALSE(writer_cb.have_result());
+// Verify that asynchronous header write completions don't cause a
+// use-after-free error if the writer object is deleted.
+TEST_F(PacketWriterTest, DeletedDuringHeaderWrite) {
+ net::TestCompletionCallback writer_cb;
+ net::CompletionCallback header_cb;
+ net::CompletionCallback payload_cb;
+ scoped_ptr<PacketWriter> writer(new PacketWriter(&socket_));
+ // Write header.
+ EXPECT_CALL(socket_, Write(BufferEquals(EncodeHeader(test_data_str_.size())),
+ kPacketHeaderSizeBytes, _))
+ .WillOnce(DoAll(SaveArg<2>(&header_cb), Return(net::ERR_IO_PENDING)));
+ writer->WritePacket(test_data_, writer_cb.callback()));
+ Mock::VerifyAndClearExpectations(&socket_);
+ // Header write completion callback is invoked after the writer died.
+ writer.reset();
+ header_cb.Run(kPacketHeaderSizeBytes);
+// Verify that asynchronous payload write completions don't cause a
+// use-after-free error if the writer object is deleted.
+TEST_F(PacketWriterTest, DeletedDuringPayloadWrite) {
+ net::TestCompletionCallback writer_cb;
+ net::CompletionCallback header_cb;
+ net::CompletionCallback payload_cb;
+ scoped_ptr<PacketWriter> writer(new PacketWriter(&socket_));
+ EXPECT_CALL(socket_, Write(BufferEquals(EncodeHeader(test_data_str_.size())),
+ kPacketHeaderSizeBytes, _))
+ .WillOnce(DoAll(SaveArg<2>(&header_cb), Return(net::ERR_IO_PENDING)));
+ EXPECT_CALL(socket_,
+ Write(BufferEquals(test_data_str_), test_data_str_.size(), _))
+ .WillOnce(DoAll(SaveArg<2>(&payload_cb), Return(net::ERR_IO_PENDING)));
+ writer->WritePacket(test_data_, writer_cb.callback()));
+ // Header write completes successfully.
+ header_cb.Run(kPacketHeaderSizeBytes);
+ // Payload write completion callback is invoked after the writer died.
+ writer.reset();
+ payload_cb.Run(test_data_str_.size());
+} // namespace
+} // namespace blimp