diff options
author | kmarshall <kmarshall@chromium.org> | 2015-10-27 16:30:01 -0700 |
---|---|---|
committer | Commit bot <commit-bot@chromium.org> | 2015-10-27 23:31:16 +0000 |
commit | a0921ade6d8b06be814772050307e0d02cef3e85 (patch) | |
tree | 4cf90001cbe4eb229f6bfc9541302800fbd954ef /blimp/net | |
parent | b02d1809b3742b407ffc39a8203df2c9e8e0a7d1 (diff) | |
download | chromium_src-a0921ade6d8b06be814772050307e0d02cef3e85.zip chromium_src-a0921ade6d8b06be814772050307e0d02cef3e85.tar.gz chromium_src-a0921ade6d8b06be814772050307e0d02cef3e85.tar.bz2 |
Created BlimpMessageWriter, which frames and sends data over a Socket.
The conversion from BlimpMessages to bytes will be handled
in another class as part of a followup CL.
Added unit tests for BlimpMessageWriter.
R=wez@chromium.org
CC=dtrainor@chromium.org,haibinlu@chromium.org,nyquist@chromium.org
BUG=
Review URL: https://codereview.chromium.org/1392753002
Cr-Commit-Position: refs/heads/master@{#356431}
Diffstat (limited to 'blimp/net')
-rw-r--r-- | blimp/net/BUILD.gn | 4 | ||||
-rw-r--r-- | blimp/net/common.cc | 2 | ||||
-rw-r--r-- | blimp/net/common.h | 1 | ||||
-rw-r--r-- | blimp/net/packet_writer.cc | 152 | ||||
-rw-r--r-- | blimp/net/packet_writer.h | 81 | ||||
-rw-r--r-- | blimp/net/packet_writer_unittest.cc | 267 |
6 files changed, 504 insertions, 3 deletions
diff --git a/blimp/net/BUILD.gn b/blimp/net/BUILD.gn index d3a6619..b4d2375 100644 --- a/blimp/net/BUILD.gn +++ b/blimp/net/BUILD.gn @@ -14,6 +14,8 @@ component("blimp_net") { "common.h", "packet_reader.cc", "packet_reader.h", + "packet_writer.cc", + "packet_writer.h", ] defines = [ "BLIMP_NET_IMPLEMENTATION=1" ] @@ -31,7 +33,7 @@ source_set("unit_tests") { sources = [ "blimp_message_dispatcher_unittest.cc", "packet_reader_unittest.cc", - "packet_reader_unittest.h", + "packet_writer_unittest.cc", "test_common.cc", "test_common.h", ] diff --git a/blimp/net/common.cc b/blimp/net/common.cc index e5b0dc6..1663b30 100644 --- a/blimp/net/common.cc +++ b/blimp/net/common.cc @@ -6,7 +6,7 @@ namespace blimp { -const size_t kMaxPacketPayloadSizeBytes = 1 << 16; // 64kb +const size_t kMaxPacketPayloadSizeBytes = 1 << 16; // 64KB const size_t kPacketHeaderSizeBytes = 4; } // namespace blimp diff --git a/blimp/net/common.h b/blimp/net/common.h index 96e911a..eff9921 100644 --- a/blimp/net/common.h +++ b/blimp/net/common.h @@ -8,7 +8,6 @@ #include <string> #include "blimp/net/blimp_net_export.h" -#include "net/base/net_errors.h" namespace blimp { diff --git a/blimp/net/packet_writer.cc b/blimp/net/packet_writer.cc new file mode 100644 index 0000000..81a6d90 --- /dev/null +++ b/blimp/net/packet_writer.cc @@ -0,0 +1,152 @@ +// Copyright 2015 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "blimp/net/packet_writer.h" + +#include <iostream> + +#include "base/callback_helpers.h" +#include "base/logging.h" +#include "base/memory/ref_counted.h" +#include "base/message_loop/message_loop.h" +#include "base/sys_byteorder.h" +#include "blimp/common/proto/blimp_message.pb.h" +#include "blimp/net/common.h" +#include "net/base/io_buffer.h" +#include "net/base/net_errors.h" +#include "net/socket/stream_socket.h" + +namespace blimp { + +std::ostream& operator<<(std::ostream& out, + const PacketWriter::WriteState state) { + switch (state) { + case PacketWriter::WriteState::IDLE: + out << "IDLE"; + break; + case PacketWriter::WriteState::HEADER: + out << "HEADER"; + break; + case PacketWriter::WriteState::PAYLOAD: + out << "PAYLOAD"; + break; + } + return out; +} + +PacketWriter::PacketWriter(net::StreamSocket* socket) + : write_state_(WriteState::IDLE), + socket_(socket), + header_buffer_( + new net::DrainableIOBuffer(new net::IOBuffer(kPacketHeaderSizeBytes), + kPacketHeaderSizeBytes)), + weak_factory_(this) { + DCHECK(socket_); +} + +PacketWriter::~PacketWriter() {} + +int PacketWriter::WritePacket(scoped_refptr<net::DrainableIOBuffer> data, + const net::CompletionCallback& callback) { + DCHECK_EQ(WriteState::IDLE, write_state_); + DCHECK(data); + if (data->BytesRemaining() == 0) { + // The packet is empty; your argument is invalid. + DLOG(ERROR) << "Attempted to write zero-length packet."; + return net::ERR_INVALID_ARGUMENT; + } + + write_state_ = WriteState::HEADER; + header_buffer_->SetOffset(0); + *reinterpret_cast<uint32*>(header_buffer_->data()) = + base::HostToNet32(data->BytesRemaining()); + payload_buffer_ = data; + + int result = DoWriteLoop(false); + if (result == net::ERR_IO_PENDING) { + // Store the completion callback to invoke when DoWriteLoop completes + // asynchronously. + callback_ = callback; + } else { + // Release the payload buffer, since the write operation has completed + // synchronously. + payload_buffer_ = nullptr; + } + + return result; +} + +int PacketWriter::DoWriteLoop(int result) { + DCHECK_NE(net::ERR_IO_PENDING, result); + DCHECK_GE(result, 0); + DCHECK_NE(WriteState::IDLE, write_state_); + + while (result >= 0 && write_state_ != WriteState::IDLE) { + VLOG(2) << "DoWriteLoop (state=" << write_state_ << ", result=" << result + << ")"; + + switch (write_state_) { + case WriteState::HEADER: + result = DoWriteHeader(result); + break; + case WriteState::PAYLOAD: + result = DoWritePayload(result); + break; + case WriteState::IDLE: + NOTREACHED(); + result = net::ERR_UNEXPECTED; + break; + } + } + + return result; +} + +int PacketWriter::DoWriteHeader(int result) { + DCHECK_EQ(WriteState::HEADER, write_state_); + DCHECK_GE(result, 0); + + header_buffer_->DidConsume(result); + if (header_buffer_->BytesRemaining() > 0) { + return socket_->Write( + header_buffer_.get(), header_buffer_->BytesRemaining(), + base::Bind(&PacketWriter::OnWriteComplete, weak_factory_.GetWeakPtr())); + } + + write_state_ = WriteState::PAYLOAD; + return net::OK; +} + +int PacketWriter::DoWritePayload(int result) { + DCHECK_EQ(WriteState::PAYLOAD, write_state_); + DCHECK_GE(result, 0); + + payload_buffer_->DidConsume(result); + if (payload_buffer_->BytesRemaining() > 0) { + return socket_->Write( + payload_buffer_.get(), payload_buffer_->BytesRemaining(), + base::Bind(&PacketWriter::OnWriteComplete, weak_factory_.GetWeakPtr())); + } + + write_state_ = WriteState::IDLE; + return net::OK; +} + +void PacketWriter::OnWriteComplete(int result) { + DCHECK_NE(net::ERR_IO_PENDING, result); + + // If the write was succesful, then process the result. + if (result > 0) { + result = DoWriteLoop(result); + } + + // If the write finished, either successfully or by error, inform the + // caller. + if (result != net::ERR_IO_PENDING) { + payload_buffer_ = nullptr; + base::ResetAndReturn(&callback_).Run(result); + } +} + +} // namespace blimp diff --git a/blimp/net/packet_writer.h b/blimp/net/packet_writer.h new file mode 100644 index 0000000..79fc766497 --- /dev/null +++ b/blimp/net/packet_writer.h @@ -0,0 +1,81 @@ +// Copyright 2015 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#ifndef BLIMP_NET_PACKET_WRITER_H_ +#define BLIMP_NET_PACKET_WRITER_H_ + +#include <string> + +#include "base/macros.h" +#include "base/memory/ref_counted.h" +#include "base/threading/thread_checker.h" +#include "blimp/net/blimp_net_export.h" +#include "net/base/completion_callback.h" +#include "net/base/net_errors.h" + +namespace net { +class DrainableIOBuffer; +class StreamSocket; +} // namespace net + +namespace blimp { + +// Writes opaque length-prefixed packets to a StreamSocket. +// The header segment is 32-bit, encoded in network byte order. +// The body segment length is specified in the header (should be capped at +// kMaxPacketPayloadSizeBytes). +class BLIMP_NET_EXPORT PacketWriter { + public: + // |socket|: The socket to write packets to. The caller must ensure |socket| + // is valid while the reader is in-use (see ReadPacket below). + explicit PacketWriter(net::StreamSocket* socket); + + ~PacketWriter(); + + // Writes a packet of at least one byte in size to |socket_|. + // + // Returns net::OK or an error code if the operation executed successfully. + // Returns ERR_IO_PENDING if the operation will be executed asynchronously. + // |cb| is later invoked with net::OK or an error code. + int WritePacket(scoped_refptr<net::DrainableIOBuffer> data, + const net::CompletionCallback& callback); + + private: + enum class WriteState { + IDLE, + HEADER, + PAYLOAD, + }; + + friend std::ostream& operator<<(std::ostream& out, const WriteState state); + + // State machine implementation. + // |result| - the result value of the most recent network operation. + // See comments for WritePacket() for documentation on return values. + int DoWriteLoop(int result); + + int DoWriteHeader(int result); + + int DoWritePayload(int result); + + // Callback function to be invoked on asynchronous write completion. + // Invokes |callback_| on packet write completion or on error. + void OnWriteComplete(int result); + + WriteState write_state_; + + net::StreamSocket* socket_; + + scoped_refptr<net::DrainableIOBuffer> payload_buffer_; + scoped_refptr<net::DrainableIOBuffer> header_buffer_; + net::CompletionCallback callback_; + + base::WeakPtrFactory<PacketWriter> weak_factory_; + + DISALLOW_COPY_AND_ASSIGN(PacketWriter); +}; + +} // namespace blimp + +#endif // BLIMP_NET_PACKET_WRITER_H_ diff --git a/blimp/net/packet_writer_unittest.cc b/blimp/net/packet_writer_unittest.cc new file mode 100644 index 0000000..2554b47 --- /dev/null +++ b/blimp/net/packet_writer_unittest.cc @@ -0,0 +1,267 @@ +// Copyright 2015 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include <string> + +#include "base/message_loop/message_loop.h" +#include "base/run_loop.h" +#include "blimp/net/common.h" +#include "blimp/net/packet_writer.h" +#include "blimp/net/test_common.h" +#include "net/base/io_buffer.h" +#include "net/base/net_errors.h" +#include "net/base/test_completion_callback.h" +#include "net/socket/socket.h" +#include "testing/gmock/include/gmock/gmock.h" +#include "testing/gtest/include/gtest/gtest.h" + +using testing::_; +using testing::DoAll; +using testing::InSequence; +using testing::Mock; +using testing::NotNull; +using testing::Return; +using testing::SaveArg; + +namespace blimp { +namespace { + +class PacketWriterTest : public testing::Test { + public: + PacketWriterTest() + : test_data_( + new net::DrainableIOBuffer(new net::StringIOBuffer(test_data_str_), + test_data_str_.size())), + message_writer_(&socket_) {} + + ~PacketWriterTest() override {} + + protected: + const std::string test_data_str_ = "U WOT M8"; + scoped_refptr<net::DrainableIOBuffer> test_data_; + + MockStreamSocket socket_; + PacketWriter message_writer_; + base::MessageLoop message_loop_; + testing::InSequence mock_sequence_; + + private: + DISALLOW_COPY_AND_ASSIGN(PacketWriterTest); +}; + +// Successful write with 1 async header write and 1 async payload write. +TEST_F(PacketWriterTest, TestWriteAsync) { + net::TestCompletionCallback writer_cb; + net::CompletionCallback header_cb; + net::CompletionCallback payload_cb; + + // Write header. + EXPECT_CALL(socket_, Write(BufferEquals(EncodeHeader(test_data_str_.size())), + kPacketHeaderSizeBytes, _)) + .WillOnce(DoAll(SaveArg<2>(&header_cb), Return(net::ERR_IO_PENDING))); + EXPECT_EQ(net::ERR_IO_PENDING, + message_writer_.WritePacket(test_data_, writer_cb.callback())); + Mock::VerifyAndClearExpectations(&socket_); + + // Write payload. + EXPECT_CALL(socket_, + Write(BufferEquals(test_data_str_), test_data_str_.size(), _)) + .WillOnce(DoAll(SaveArg<2>(&payload_cb), Return(net::ERR_IO_PENDING))); + header_cb.Run(kPacketHeaderSizeBytes); + Mock::VerifyAndClearExpectations(&socket_); + + payload_cb.Run(test_data_str_.size()); + EXPECT_EQ(net::OK, writer_cb.WaitForResult()); +} + +// Successful write with 2 async header writes and 2 async payload writes. +TEST_F(PacketWriterTest, TestPartialWriteAsync) { + net::TestCompletionCallback writer_cb; + net::CompletionCallback header_cb; + net::CompletionCallback payload_cb; + + std::string header = EncodeHeader(test_data_str_.size()); + std::string payload = test_data_str_; + + EXPECT_CALL(socket_, Write(BufferEquals(header), kPacketHeaderSizeBytes, _)) + .WillOnce(DoAll(SaveArg<2>(&header_cb), Return(net::ERR_IO_PENDING))) + .RetiresOnSaturation(); + EXPECT_CALL(socket_, Write(BufferEquals(header.substr(1, header.size())), + header.size() - 1, _)) + .WillOnce(DoAll(SaveArg<2>(&header_cb), Return(net::ERR_IO_PENDING))) + .RetiresOnSaturation(); + EXPECT_CALL(socket_, Write(BufferEquals(payload), payload.size(), _)) + .WillOnce(DoAll(SaveArg<2>(&payload_cb), Return(net::ERR_IO_PENDING))) + .RetiresOnSaturation(); + EXPECT_CALL(socket_, + Write(BufferEquals(payload.substr(1, payload.size() - 1)), + payload.size() - 1, _)) + .WillOnce(DoAll(SaveArg<2>(&payload_cb), Return(net::ERR_IO_PENDING))) + .RetiresOnSaturation(); + + EXPECT_EQ(net::ERR_IO_PENDING, + message_writer_.WritePacket(test_data_, writer_cb.callback())); + + // Header is written - first one byte, then the remainder. + header_cb.Run(1); + header_cb.Run(header.size() - 1); + + // Payload is written - first one byte, then the remainder. + payload_cb.Run(1); + payload_cb.Run(payload.size() - 1); + + EXPECT_EQ(net::OK, writer_cb.WaitForResult()); +} + +// Async socket error while writing data. +TEST_F(PacketWriterTest, TestWriteErrorAsync) { + net::TestCompletionCallback writer_cb; + net::CompletionCallback header_cb; + net::CompletionCallback payload_cb; + + EXPECT_CALL(socket_, Write(BufferEquals(EncodeHeader(test_data_str_.size())), + kPacketHeaderSizeBytes, _)) + .WillOnce(DoAll(SaveArg<2>(&header_cb), Return(net::ERR_IO_PENDING))); + EXPECT_CALL(socket_, + Write(BufferEquals(test_data_str_), test_data_str_.size(), _)) + .WillOnce(DoAll(SaveArg<2>(&payload_cb), Return(net::ERR_IO_PENDING))); + + EXPECT_EQ(net::ERR_IO_PENDING, + message_writer_.WritePacket(test_data_, writer_cb.callback())); + header_cb.Run(kPacketHeaderSizeBytes); + payload_cb.Run(net::ERR_CONNECTION_RESET); + + EXPECT_EQ(net::ERR_CONNECTION_RESET, writer_cb.WaitForResult()); +} + +// Successful write with 1 sync header write and 1 sync payload write. +TEST_F(PacketWriterTest, TestWriteSync) { + net::TestCompletionCallback writer_cb; + EXPECT_CALL(socket_, Write(BufferEquals(EncodeHeader(test_data_str_.size())), + kPacketHeaderSizeBytes, _)) + .WillOnce(Return(kPacketHeaderSizeBytes)); + EXPECT_CALL(socket_, + Write(BufferEquals(test_data_str_), test_data_str_.size(), _)) + .WillOnce(Return(test_data_str_.size())); + EXPECT_EQ(net::OK, + message_writer_.WritePacket(test_data_, writer_cb.callback())); + EXPECT_FALSE(writer_cb.have_result()); +} + +// Successful write with 2 sync header writes and 2 sync payload writes. +TEST_F(PacketWriterTest, TestPartialWriteSync) { + net::TestCompletionCallback writer_cb; + + std::string header = EncodeHeader(test_data_str_.size()); + std::string payload = test_data_str_; + + EXPECT_CALL(socket_, Write(BufferEquals(header), header.size(), _)) + .WillOnce(Return(1)); + EXPECT_CALL(socket_, Write(BufferEquals(header.substr(1, header.size() - 1)), + header.size() - 1, _)) + .WillOnce(Return(header.size() - 1)); + EXPECT_CALL(socket_, Write(BufferEquals(payload), payload.size(), _)) + .WillOnce(Return(1)); + EXPECT_CALL(socket_, Write(BufferEquals(payload.substr(1, payload.size())), + payload.size() - 1, _)) + .WillOnce(Return(payload.size() - 1)); + + EXPECT_EQ(net::OK, + message_writer_.WritePacket(test_data_, writer_cb.callback())); + EXPECT_FALSE(writer_cb.have_result()); +} + +// Verify that zero-length packets are rejected. +TEST_F(PacketWriterTest, TestZeroLengthPacketsRejected) { + net::TestCompletionCallback writer_cb; + + EXPECT_EQ(net::ERR_INVALID_ARGUMENT, + message_writer_.WritePacket( + new net::DrainableIOBuffer(new net::IOBuffer(0), 0), + writer_cb.callback())); + + EXPECT_FALSE(writer_cb.have_result()); +} + +// Sync socket error while writing header data. +TEST_F(PacketWriterTest, TestWriteHeaderErrorSync) { + net::TestCompletionCallback writer_cb; + + EXPECT_CALL(socket_, Write(BufferEquals(EncodeHeader(test_data_str_.size())), + kPacketHeaderSizeBytes, _)) + .WillOnce(Return(net::ERR_FAILED)); + + EXPECT_EQ(net::ERR_FAILED, + message_writer_.WritePacket(test_data_, writer_cb.callback())); + + EXPECT_EQ(net::ERR_EMPTY_RESPONSE, + writer_cb.GetResult(net::ERR_EMPTY_RESPONSE)); + EXPECT_FALSE(writer_cb.have_result()); +} + +// Sync socket error while writing payload data. +TEST_F(PacketWriterTest, TestWritePayloadErrorSync) { + net::TestCompletionCallback writer_cb; + + EXPECT_CALL(socket_, Write(BufferEquals(EncodeHeader(test_data_str_.size())), + kPacketHeaderSizeBytes, _)) + .WillOnce(Return(kPacketHeaderSizeBytes)); + EXPECT_CALL(socket_, + Write(BufferEquals(test_data_str_), test_data_str_.size(), _)) + .WillOnce(Return(net::ERR_FAILED)); + + EXPECT_EQ(net::ERR_FAILED, + message_writer_.WritePacket(test_data_, writer_cb.callback())); + EXPECT_FALSE(writer_cb.have_result()); +} + +// Verify that asynchronous header write completions don't cause a +// use-after-free error if the writer object is deleted. +TEST_F(PacketWriterTest, DeletedDuringHeaderWrite) { + net::TestCompletionCallback writer_cb; + net::CompletionCallback header_cb; + net::CompletionCallback payload_cb; + scoped_ptr<PacketWriter> writer(new PacketWriter(&socket_)); + + // Write header. + EXPECT_CALL(socket_, Write(BufferEquals(EncodeHeader(test_data_str_.size())), + kPacketHeaderSizeBytes, _)) + .WillOnce(DoAll(SaveArg<2>(&header_cb), Return(net::ERR_IO_PENDING))); + EXPECT_EQ(net::ERR_IO_PENDING, + writer->WritePacket(test_data_, writer_cb.callback())); + Mock::VerifyAndClearExpectations(&socket_); + + // Header write completion callback is invoked after the writer died. + writer.reset(); + header_cb.Run(kPacketHeaderSizeBytes); +} + +// Verify that asynchronous payload write completions don't cause a +// use-after-free error if the writer object is deleted. +TEST_F(PacketWriterTest, DeletedDuringPayloadWrite) { + net::TestCompletionCallback writer_cb; + net::CompletionCallback header_cb; + net::CompletionCallback payload_cb; + scoped_ptr<PacketWriter> writer(new PacketWriter(&socket_)); + + EXPECT_CALL(socket_, Write(BufferEquals(EncodeHeader(test_data_str_.size())), + kPacketHeaderSizeBytes, _)) + .WillOnce(DoAll(SaveArg<2>(&header_cb), Return(net::ERR_IO_PENDING))); + EXPECT_CALL(socket_, + Write(BufferEquals(test_data_str_), test_data_str_.size(), _)) + .WillOnce(DoAll(SaveArg<2>(&payload_cb), Return(net::ERR_IO_PENDING))); + + EXPECT_EQ(net::ERR_IO_PENDING, + writer->WritePacket(test_data_, writer_cb.callback())); + + // Header write completes successfully. + header_cb.Run(kPacketHeaderSizeBytes); + + // Payload write completion callback is invoked after the writer died. + writer.reset(); + payload_cb.Run(test_data_str_.size()); +} + +} // namespace +} // namespace blimp |