diff options
author | kmarshall <kmarshall@chromium.org> | 2015-11-05 13:55:53 -0800 |
---|---|---|
committer | Commit bot <commit-bot@chromium.org> | 2015-11-05 21:56:35 +0000 |
commit | 90e22697d8d163452c067e88c2bae8b476380077 (patch) | |
tree | a4660eecb68cebf2cc0d3c4d154efa62b1a96451 /blimp/net | |
parent | 738501d6098043ccd3cd84f6ba6a47cbdd3034d0 (diff) | |
download | chromium_src-90e22697d8d163452c067e88c2bae8b476380077.zip chromium_src-90e22697d8d163452c067e88c2bae8b476380077.tar.gz chromium_src-90e22697d8d163452c067e88c2bae8b476380077.tar.bz2 |
Change BlimpTransport to vend BlimpConnections.
This removes the dependency StreamSocket from BlimpTransport.
Make PacketReader and PacketWriter stream/datagram agnostic,
moving framing logic to the new classes StreamPacketReader and
StreamPacketWriter.
Add channel reader/writer objects and fields to BlimpConnection,
and create a specialization of BlimpConnection for receiving
ownership of StreamSocket.
R=wez@chromium.org
BUG=
Review URL: https://codereview.chromium.org/1430963002
Cr-Commit-Position: refs/heads/master@{#358142}
Diffstat (limited to 'blimp/net')
-rw-r--r-- | blimp/net/BUILD.gn | 12 | ||||
-rw-r--r-- | blimp/net/blimp_connection.cc | 11 | ||||
-rw-r--r-- | blimp/net/blimp_connection.h | 14 | ||||
-rw-r--r-- | blimp/net/blimp_transport.h | 9 | ||||
-rw-r--r-- | blimp/net/packet_reader.h | 69 | ||||
-rw-r--r-- | blimp/net/packet_writer.h | 60 | ||||
-rw-r--r-- | blimp/net/stream_packet_reader.cc (renamed from blimp/net/packet_reader.cc) | 41 | ||||
-rw-r--r-- | blimp/net/stream_packet_reader.h | 81 | ||||
-rw-r--r-- | blimp/net/stream_packet_reader_unittest.cc (renamed from blimp/net/packet_reader_unittest.cc) | 54 | ||||
-rw-r--r-- | blimp/net/stream_packet_writer.cc (renamed from blimp/net/packet_writer.cc) | 40 | ||||
-rw-r--r-- | blimp/net/stream_packet_writer.h | 78 | ||||
-rw-r--r-- | blimp/net/stream_packet_writer_unittest.cc (renamed from blimp/net/packet_writer_unittest.cc) | 36 | ||||
-rw-r--r-- | blimp/net/stream_socket_connection.cc | 22 | ||||
-rw-r--r-- | blimp/net/stream_socket_connection.h | 31 | ||||
-rw-r--r-- | blimp/net/tcp_client_transport.cc | 21 | ||||
-rw-r--r-- | blimp/net/tcp_client_transport.h | 7 |
16 files changed, 364 insertions, 222 deletions
diff --git a/blimp/net/BUILD.gn b/blimp/net/BUILD.gn index 1b4da84..5adb729 100644 --- a/blimp/net/BUILD.gn +++ b/blimp/net/BUILD.gn @@ -13,10 +13,14 @@ component("blimp_net") { "blimp_transport.h", "common.cc", "common.h", - "packet_reader.cc", "packet_reader.h", - "packet_writer.cc", "packet_writer.h", + "stream_packet_reader.cc", + "stream_packet_reader.h", + "stream_packet_writer.cc", + "stream_packet_writer.h", + "stream_socket_connection.cc", + "stream_socket_connection.h", "tcp_client_transport.cc", "tcp_client_transport.h", ] @@ -35,8 +39,8 @@ source_set("unit_tests") { sources = [ "blimp_message_dispatcher_unittest.cc", - "packet_reader_unittest.cc", - "packet_writer_unittest.cc", + "stream_packet_reader_unittest.cc", + "stream_packet_writer_unittest.cc", "test_common.cc", "test_common.h", ] diff --git a/blimp/net/blimp_connection.cc b/blimp/net/blimp_connection.cc index cdc5bc8..5b9c917 100644 --- a/blimp/net/blimp_connection.cc +++ b/blimp/net/blimp_connection.cc @@ -4,9 +4,18 @@ #include "blimp/net/blimp_connection.h" +#include "base/macros.h" +#include "blimp/net/packet_reader.h" +#include "blimp/net/packet_writer.h" + namespace blimp { -BlimpConnection::BlimpConnection() {} +BlimpConnection::BlimpConnection(scoped_ptr<PacketReader> reader, + scoped_ptr<PacketWriter> writer) + : reader_(reader.Pass()), writer_(writer.Pass()) { + DCHECK(reader_); + DCHECK(writer_); +} BlimpConnection::~BlimpConnection() {} diff --git a/blimp/net/blimp_connection.h b/blimp/net/blimp_connection.h index 35336c4..5bd115d 100644 --- a/blimp/net/blimp_connection.h +++ b/blimp/net/blimp_connection.h @@ -6,17 +6,25 @@ #define BLIMP_NET_BLIMP_CONNECTION_H_ #include "base/macros.h" +#include "base/memory/scoped_ptr.h" #include "blimp/net/blimp_net_export.h" namespace blimp { +class PacketReader; +class PacketWriter; + class BLIMP_NET_EXPORT BlimpConnection { public: - BlimpConnection(); - ~BlimpConnection(); + virtual ~BlimpConnection(); + + protected: + BlimpConnection(scoped_ptr<PacketReader> reader, + scoped_ptr<PacketWriter> writer); private: - DISALLOW_COPY_AND_ASSIGN(BlimpConnection); + scoped_ptr<PacketReader> reader_; + scoped_ptr<PacketWriter> writer_; }; } // namespace blimp diff --git a/blimp/net/blimp_transport.h b/blimp/net/blimp_transport.h index 5ee9fff..56d5e34 100644 --- a/blimp/net/blimp_transport.h +++ b/blimp/net/blimp_transport.h @@ -7,10 +7,11 @@ #include "base/memory/scoped_ptr.h" #include "net/base/completion_callback.h" -#include "net/socket/stream_socket.h" namespace blimp { +class BlimpConnection; + // An interface which encapsulates the transport-specific code for // establishing network connections between the client and engine. // Subclasses of BlimpTransport are responsible for defining their own @@ -26,12 +27,12 @@ class BlimpTransport { // Returns net::ERR_IO_PENDING if the connection is being established // asynchronously. |callback| is later invoked with the connection outcome. // - // If the connection is successful, the connected socket can be taken by + // If the connection is successful, the BlimpConnection can be taken by // calling TakeConnectedSocket(). virtual int Connect(const net::CompletionCallback& callback) = 0; - // Returns the connected socket following a successful Connect(). - virtual scoped_ptr<net::StreamSocket> TakeConnectedSocket() = 0; + // Returns the connection object after a successful Connect(). + virtual scoped_ptr<BlimpConnection> TakeConnection() = 0; }; } // namespace blimp diff --git a/blimp/net/packet_reader.h b/blimp/net/packet_reader.h index a9f948d..3c6abf8 100644 --- a/blimp/net/packet_reader.h +++ b/blimp/net/packet_reader.h @@ -5,81 +5,26 @@ #ifndef BLIMP_NET_PACKET_READER_H_ #define BLIMP_NET_PACKET_READER_H_ -#include "base/macros.h" -#include "base/memory/ref_counted.h" -#include "base/memory/weak_ptr.h" #include "blimp/net/blimp_net_export.h" #include "net/base/completion_callback.h" -#include "net/base/net_errors.h" - -namespace net { -class GrowableIOBuffer; -class StreamSocket; -} // namespace net +#include "net/base/io_buffer.h" namespace blimp { -// Reads opaque length-prefixed packets of bytes from a StreamSocket. -// The header segment is 32-bit, encoded in network byte order. -// The body segment length is specified in the header (capped at -// kMaxPacketPayloadSizeBytes). +// Interface to describe a reader which can read variable-length data packets +// from a connection. class BLIMP_NET_EXPORT PacketReader { public: - // |socket|: The socket to read packets from. The caller must ensure |socket| - // is valid while the reader is in-use (see ReadPacket below). - explicit PacketReader(net::StreamSocket* socket); - - ~PacketReader(); + virtual ~PacketReader() {} - // Reads a packet from the socket. + // Reads a packet from the connection. // Returns the size of the packet, in bytes, if the read operation executed // successfully. // Returns ERR_IO_PENDING if the operation will be executed asynchronously. // |cb| is later invoked with the packet size or an error code. // All other return values indicate errors. - // |socket_| must be alive when this is called, but may be deleted while there - // is an outstanding read. - int ReadPacket(const scoped_refptr<net::GrowableIOBuffer>& buf, - const net::CompletionCallback& cb); - - private: - enum class ReadState { - IDLE, - HEADER, - PAYLOAD, - }; - - friend std::ostream& operator<<(std::ostream& out, const ReadState state); - - // State machine implementation. - // |result| - the result value of the most recent network operation. - // See comments for ReadPacket() for documentation on return values. - int DoReadLoop(int result); - - // Reads the header and parses it when, done, to get the payload size. - int DoReadHeader(int result); - - // Reads payload bytes until the payload is complete. - int DoReadPayload(int result); - - // Processes an asynchronous header or payload read, and invokes |callback_| - // on packet read completion. - void OnReadComplete(int result); - - ReadState read_state_; - - net::StreamSocket* socket_; - - // The size of the payload, in bytes. - size_t payload_size_; - - scoped_refptr<net::GrowableIOBuffer> header_buffer_; - scoped_refptr<net::GrowableIOBuffer> payload_buffer_; - net::CompletionCallback callback_; - - base::WeakPtrFactory<PacketReader> weak_factory_; - - DISALLOW_COPY_AND_ASSIGN(PacketReader); + virtual int ReadPacket(const scoped_refptr<net::GrowableIOBuffer>& buf, + const net::CompletionCallback& cb) = 0; }; } // namespace blimp diff --git a/blimp/net/packet_writer.h b/blimp/net/packet_writer.h index 79fc766497..1ac2126 100644 --- a/blimp/net/packet_writer.h +++ b/blimp/net/packet_writer.h @@ -8,72 +8,26 @@ #include <string> #include "base/macros.h" -#include "base/memory/ref_counted.h" -#include "base/threading/thread_checker.h" #include "blimp/net/blimp_net_export.h" #include "net/base/completion_callback.h" +#include "net/base/io_buffer.h" #include "net/base/net_errors.h" -namespace net { -class DrainableIOBuffer; -class StreamSocket; -} // namespace net - namespace blimp { -// Writes opaque length-prefixed packets to a StreamSocket. -// The header segment is 32-bit, encoded in network byte order. -// The body segment length is specified in the header (should be capped at -// kMaxPacketPayloadSizeBytes). +// Interface to describe a writer which can write variable-length data packets +// to a connection. class BLIMP_NET_EXPORT PacketWriter { public: - // |socket|: The socket to write packets to. The caller must ensure |socket| - // is valid while the reader is in-use (see ReadPacket below). - explicit PacketWriter(net::StreamSocket* socket); - - ~PacketWriter(); + virtual ~PacketWriter() {} - // Writes a packet of at least one byte in size to |socket_|. + // Writes a packet of at least one byte in size to a connection. // // Returns net::OK or an error code if the operation executed successfully. // Returns ERR_IO_PENDING if the operation will be executed asynchronously. // |cb| is later invoked with net::OK or an error code. - int WritePacket(scoped_refptr<net::DrainableIOBuffer> data, - const net::CompletionCallback& callback); - - private: - enum class WriteState { - IDLE, - HEADER, - PAYLOAD, - }; - - friend std::ostream& operator<<(std::ostream& out, const WriteState state); - - // State machine implementation. - // |result| - the result value of the most recent network operation. - // See comments for WritePacket() for documentation on return values. - int DoWriteLoop(int result); - - int DoWriteHeader(int result); - - int DoWritePayload(int result); - - // Callback function to be invoked on asynchronous write completion. - // Invokes |callback_| on packet write completion or on error. - void OnWriteComplete(int result); - - WriteState write_state_; - - net::StreamSocket* socket_; - - scoped_refptr<net::DrainableIOBuffer> payload_buffer_; - scoped_refptr<net::DrainableIOBuffer> header_buffer_; - net::CompletionCallback callback_; - - base::WeakPtrFactory<PacketWriter> weak_factory_; - - DISALLOW_COPY_AND_ASSIGN(PacketWriter); + virtual int WritePacket(scoped_refptr<net::DrainableIOBuffer> data, + const net::CompletionCallback& callback) = 0; }; } // namespace blimp diff --git a/blimp/net/packet_reader.cc b/blimp/net/stream_packet_reader.cc index 37cd30d..8da6130 100644 --- a/blimp/net/packet_reader.cc +++ b/blimp/net/stream_packet_reader.cc @@ -2,7 +2,7 @@ // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. -#include "blimp/net/packet_reader.h" +#include "blimp/net/stream_packet_reader.h" #include <iostream> @@ -18,32 +18,33 @@ namespace blimp { std::ostream& operator<<(std::ostream& out, - const PacketReader::ReadState state) { + const StreamPacketReader::ReadState state) { switch (state) { - case PacketReader::ReadState::HEADER: + case StreamPacketReader::ReadState::HEADER: out << "HEADER"; break; - case PacketReader::ReadState::PAYLOAD: + case StreamPacketReader::ReadState::PAYLOAD: out << "PAYLOAD"; break; - case PacketReader::ReadState::IDLE: + case StreamPacketReader::ReadState::IDLE: out << "IDLE"; break; } return out; } -PacketReader::PacketReader(net::StreamSocket* socket) +StreamPacketReader::StreamPacketReader(net::StreamSocket* socket) : read_state_(ReadState::IDLE), socket_(socket), weak_factory_(this) { DCHECK(socket_); header_buffer_ = new net::GrowableIOBuffer; header_buffer_->SetCapacity(kPacketHeaderSizeBytes); } -PacketReader::~PacketReader() {} +StreamPacketReader::~StreamPacketReader() {} -int PacketReader::ReadPacket(const scoped_refptr<net::GrowableIOBuffer>& buf, - const net::CompletionCallback& callback) { +int StreamPacketReader::ReadPacket( + const scoped_refptr<net::GrowableIOBuffer>& buf, + const net::CompletionCallback& callback) { DCHECK_EQ(ReadState::IDLE, read_state_); DCHECK_GT(buf->capacity(), 0); @@ -66,7 +67,7 @@ int PacketReader::ReadPacket(const scoped_refptr<net::GrowableIOBuffer>& buf, return result; } -int PacketReader::DoReadLoop(int result) { +int StreamPacketReader::DoReadLoop(int result) { DCHECK_NE(net::ERR_IO_PENDING, result); DCHECK_GE(result, 0); DCHECK_NE(ReadState::IDLE, read_state_); @@ -92,7 +93,7 @@ int PacketReader::DoReadLoop(int result) { return result; } -int PacketReader::DoReadHeader(int result) { +int StreamPacketReader::DoReadHeader(int result) { DCHECK_EQ(ReadState::HEADER, read_state_); DCHECK_GT(kPacketHeaderSizeBytes, static_cast<size_t>(header_buffer_->offset())); @@ -101,9 +102,10 @@ int PacketReader::DoReadHeader(int result) { header_buffer_->set_offset(header_buffer_->offset() + result); if (static_cast<size_t>(header_buffer_->offset()) < kPacketHeaderSizeBytes) { // There is more header to read. - return socket_->Read( - header_buffer_.get(), kPacketHeaderSizeBytes - header_buffer_->offset(), - base::Bind(&PacketReader::OnReadComplete, weak_factory_.GetWeakPtr())); + return socket_->Read(header_buffer_.get(), + kPacketHeaderSizeBytes - header_buffer_->offset(), + base::Bind(&StreamPacketReader::OnReadComplete, + weak_factory_.GetWeakPtr())); } // Finished reading the header. Parse the size and prepare for payload read. @@ -118,15 +120,16 @@ int PacketReader::DoReadHeader(int result) { return net::OK; } -int PacketReader::DoReadPayload(int result) { +int StreamPacketReader::DoReadPayload(int result) { DCHECK_EQ(ReadState::PAYLOAD, read_state_); DCHECK_GE(result, 0); payload_buffer_->set_offset(payload_buffer_->offset() + result); if (static_cast<size_t>(payload_buffer_->offset()) < payload_size_) { - return socket_->Read( - payload_buffer_.get(), payload_size_ - payload_buffer_->offset(), - base::Bind(&PacketReader::OnReadComplete, weak_factory_.GetWeakPtr())); + return socket_->Read(payload_buffer_.get(), + payload_size_ - payload_buffer_->offset(), + base::Bind(&StreamPacketReader::OnReadComplete, + weak_factory_.GetWeakPtr())); } // Finished reading the payload. @@ -134,7 +137,7 @@ int PacketReader::DoReadPayload(int result) { return payload_size_; } -void PacketReader::OnReadComplete(int result) { +void StreamPacketReader::OnReadComplete(int result) { DCHECK_NE(net::ERR_IO_PENDING, result); // If the read was succesful, then process the result. diff --git a/blimp/net/stream_packet_reader.h b/blimp/net/stream_packet_reader.h new file mode 100644 index 0000000..4468638 --- /dev/null +++ b/blimp/net/stream_packet_reader.h @@ -0,0 +1,81 @@ +// Copyright 2015 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#ifndef BLIMP_NET_STREAM_PACKET_READER_H_ +#define BLIMP_NET_STREAM_PACKET_READER_H_ + +#include "base/macros.h" +#include "base/memory/ref_counted.h" +#include "base/memory/weak_ptr.h" +#include "blimp/net/blimp_net_export.h" +#include "blimp/net/packet_reader.h" +#include "net/base/completion_callback.h" +#include "net/base/net_errors.h" + +namespace net { +class GrowableIOBuffer; +class StreamSocket; +} // namespace net + +namespace blimp { + +// Reads opaque length-prefixed packets of bytes from a StreamSocket. +// The header segment is 32-bit, encoded in network byte order. +// The body segment length is specified in the header (capped at +// kMaxPacketPayloadSizeBytes). +class BLIMP_NET_EXPORT StreamPacketReader : public PacketReader { + public: + // |socket|: The socket to read packets from. The caller must ensure |socket| + // is valid while the reader is in-use (see ReadPacket below). + explicit StreamPacketReader(net::StreamSocket* socket); + + ~StreamPacketReader() override; + + // PacketReader implementation. + int ReadPacket(const scoped_refptr<net::GrowableIOBuffer>& buf, + const net::CompletionCallback& cb) override; + + private: + enum class ReadState { + IDLE, + HEADER, + PAYLOAD, + }; + + friend std::ostream& operator<<(std::ostream& out, const ReadState state); + + // State machine implementation. + // |result| - the result value of the most recent network operation. + // See comments for ReadPacket() for documentation on return values. + int DoReadLoop(int result); + + // Reads the header and parses it when, done, to get the payload size. + int DoReadHeader(int result); + + // Reads payload bytes until the payload is complete. + int DoReadPayload(int result); + + // Processes an asynchronous header or payload read, and invokes |callback_| + // on packet read completion. + void OnReadComplete(int result); + + ReadState read_state_; + + net::StreamSocket* socket_; + + // The size of the payload, in bytes. + size_t payload_size_; + + scoped_refptr<net::GrowableIOBuffer> header_buffer_; + scoped_refptr<net::GrowableIOBuffer> payload_buffer_; + net::CompletionCallback callback_; + + base::WeakPtrFactory<StreamPacketReader> weak_factory_; + + DISALLOW_COPY_AND_ASSIGN(StreamPacketReader); +}; + +} // namespace blimp + +#endif // BLIMP_NET_STREAM_PACKET_READER_H_ diff --git a/blimp/net/packet_reader_unittest.cc b/blimp/net/stream_packet_reader_unittest.cc index 5981d58..61419cf 100644 --- a/blimp/net/packet_reader_unittest.cc +++ b/blimp/net/stream_packet_reader_unittest.cc @@ -7,7 +7,7 @@ #include "base/sys_byteorder.h" #include "blimp/net/common.h" -#include "blimp/net/packet_reader.h" +#include "blimp/net/stream_packet_reader.h" #include "blimp/net/test_common.h" #include "net/base/completion_callback.h" #include "net/base/io_buffer.h" @@ -30,16 +30,16 @@ namespace { const size_t kTestMaxBufferSize = 1 << 16; // 64KB -class PacketReaderTest : public testing::Test { +class StreamPacketReaderTest : public testing::Test { public: - PacketReaderTest() + StreamPacketReaderTest() : buffer_(new net::GrowableIOBuffer), test_msg_("U WOT M8"), data_reader_(&socket_) { buffer_->SetCapacity(kTestMaxBufferSize); } - ~PacketReaderTest() override {} + ~StreamPacketReaderTest() override {} int ReadPacket() { return data_reader_.ReadPacket(buffer_, callback_.callback()); @@ -51,11 +51,11 @@ class PacketReaderTest : public testing::Test { net::TestCompletionCallback callback_; testing::StrictMock<MockStreamSocket> socket_; testing::InSequence sequence_; - PacketReader data_reader_; + StreamPacketReader data_reader_; }; // Successful read with 1 async header read and 1 async payload read. -TEST_F(PacketReaderTest, ReadAsyncHeaderAsyncPayload) { +TEST_F(StreamPacketReaderTest, ReadAsyncHeaderAsyncPayload) { net::CompletionCallback socket_cb; EXPECT_CALL(socket_, Read(NotNull(), kPacketHeaderSizeBytes, _)) @@ -75,7 +75,7 @@ TEST_F(PacketReaderTest, ReadAsyncHeaderAsyncPayload) { } // Successful read with 1 async header read and 1 sync payload read. -TEST_F(PacketReaderTest, ReadAsyncHeaderSyncPayload) { +TEST_F(StreamPacketReaderTest, ReadAsyncHeaderSyncPayload) { net::CompletionCallback socket_cb; // Asynchronous payload read expectation. @@ -100,7 +100,7 @@ TEST_F(PacketReaderTest, ReadAsyncHeaderSyncPayload) { } // Successful read with 1 sync header read and 1 async payload read. -TEST_F(PacketReaderTest, ReadSyncHeaderAsyncPayload) { +TEST_F(StreamPacketReaderTest, ReadSyncHeaderAsyncPayload) { net::CompletionCallback socket_cb; EXPECT_CALL(socket_, Read(NotNull(), kPacketHeaderSizeBytes, _)) @@ -120,7 +120,7 @@ TEST_F(PacketReaderTest, ReadSyncHeaderAsyncPayload) { } // Successful read with 1 sync header read and 1 sync payload read. -TEST_F(PacketReaderTest, ReadSyncHeaderSyncPayload) { +TEST_F(StreamPacketReaderTest, ReadSyncHeaderSyncPayload) { net::CompletionCallback socket_cb; EXPECT_CALL(socket_, Read(NotNull(), kPacketHeaderSizeBytes, _)) @@ -140,7 +140,7 @@ TEST_F(PacketReaderTest, ReadSyncHeaderSyncPayload) { // Successful read of 2 messages, header and payload reads all completing // synchronously with no partial results. -TEST_F(PacketReaderTest, ReadMultipleMessagesSync) { +TEST_F(StreamPacketReaderTest, ReadMultipleMessagesSync) { net::CompletionCallback socket_cb; std::string test_msg2 = test_msg_ + "SlightlyLongerString"; @@ -181,7 +181,7 @@ TEST_F(PacketReaderTest, ReadMultipleMessagesSync) { // Successful read of 2 messages, header and payload reads all completing // asynchronously with no partial results. -TEST_F(PacketReaderTest, ReadMultipleMessagesAsync) { +TEST_F(StreamPacketReaderTest, ReadMultipleMessagesAsync) { net::TestCompletionCallback read_cb1; net::TestCompletionCallback read_cb2; net::CompletionCallback socket_cb; @@ -233,7 +233,7 @@ TEST_F(PacketReaderTest, ReadMultipleMessagesAsync) { // Verify that partial header reads are supported. // Read #0: 1 header byte is read. // Read #1: Remainder of header bytes read. -TEST_F(PacketReaderTest, PartialHeaderReadAsync) { +TEST_F(StreamPacketReaderTest, PartialHeaderReadAsync) { net::CompletionCallback cb; std::string header = EncodeHeader(test_msg_.size()); @@ -259,7 +259,7 @@ TEST_F(PacketReaderTest, PartialHeaderReadAsync) { // Read #0: Header is fully read synchronously. // Read #1: First payload byte is read. (Um, it's an acoustic cup modem.) // Read #2: Remainder of payload bytes are read. -TEST_F(PacketReaderTest, PartialPayloadReadAsync) { +TEST_F(StreamPacketReaderTest, PartialPayloadReadAsync) { net::CompletionCallback cb; EXPECT_CALL(socket_, Read(NotNull(), kPacketHeaderSizeBytes, _)) @@ -283,7 +283,7 @@ TEST_F(PacketReaderTest, PartialPayloadReadAsync) { } // Verify that synchronous header read errors are reported correctly. -TEST_F(PacketReaderTest, ReadHeaderErrorSync) { +TEST_F(StreamPacketReaderTest, ReadHeaderErrorSync) { net::CompletionCallback cb; EXPECT_CALL(socket_, Read(NotNull(), kPacketHeaderSizeBytes, _)) .WillOnce(Return(net::ERR_FAILED)); @@ -291,7 +291,7 @@ TEST_F(PacketReaderTest, ReadHeaderErrorSync) { } // Verify that synchronous payload read errors are reported correctly. -TEST_F(PacketReaderTest, ReadPayloadErrorSync) { +TEST_F(StreamPacketReaderTest, ReadPayloadErrorSync) { net::CompletionCallback cb; EXPECT_CALL(socket_, Read(NotNull(), kPacketHeaderSizeBytes, _)) @@ -304,7 +304,7 @@ TEST_F(PacketReaderTest, ReadPayloadErrorSync) { } // Verify that async header read errors are reported correctly. -TEST_F(PacketReaderTest, ReadHeaderErrorAsync) { +TEST_F(StreamPacketReaderTest, ReadHeaderErrorAsync) { net::CompletionCallback cb; net::TestCompletionCallback test_cb; @@ -318,7 +318,7 @@ TEST_F(PacketReaderTest, ReadHeaderErrorAsync) { } // Verify that asynchronous paylod read errors are reported correctly. -TEST_F(PacketReaderTest, ReadPayloadErrorAsync) { +TEST_F(StreamPacketReaderTest, ReadPayloadErrorAsync) { net::CompletionCallback cb; EXPECT_CALL(socket_, Read(NotNull(), kPacketHeaderSizeBytes, _)) @@ -332,12 +332,12 @@ TEST_F(PacketReaderTest, ReadPayloadErrorAsync) { EXPECT_EQ(net::ERR_FAILED, callback_.WaitForResult()); } -// Verify that async header read completions don't break us if the PacketReader -// object was destroyed. -TEST_F(PacketReaderTest, ReaderDeletedDuringAsyncHeaderRead) { +// Verify that async header read completions don't break us if the +// StreamPacketReader object was destroyed. +TEST_F(StreamPacketReaderTest, ReaderDeletedDuringAsyncHeaderRead) { net::CompletionCallback cb; net::TestCompletionCallback test_cb; - scoped_ptr<PacketReader> reader(new PacketReader(&socket_)); + scoped_ptr<StreamPacketReader> reader(new StreamPacketReader(&socket_)); EXPECT_CALL(socket_, Read(NotNull(), kPacketHeaderSizeBytes, _)) .WillOnce(DoAll(FillBufferFromString<0>(EncodeHeader(test_msg_.size())), @@ -349,11 +349,11 @@ TEST_F(PacketReaderTest, ReaderDeletedDuringAsyncHeaderRead) { cb.Run(kPacketHeaderSizeBytes); // Complete the socket operation. } -// Verify that async payload read completions don't break us if the PacketReader -// object was destroyed. -TEST_F(PacketReaderTest, ReaderDeletedDuringAsyncPayloadRead) { +// Verify that async payload read completions don't break us if the +// StreamPacketReader object was destroyed. +TEST_F(StreamPacketReaderTest, ReaderDeletedDuringAsyncPayloadRead) { net::CompletionCallback cb; - scoped_ptr<PacketReader> reader(new PacketReader(&socket_)); + scoped_ptr<StreamPacketReader> reader(new StreamPacketReader(&socket_)); EXPECT_CALL(socket_, Read(NotNull(), kPacketHeaderSizeBytes, _)) .WillOnce(DoAll(FillBufferFromString<0>(EncodeHeader(test_msg_.size())), @@ -368,7 +368,7 @@ TEST_F(PacketReaderTest, ReaderDeletedDuringAsyncPayloadRead) { } // Verify that zero-length payload is reported as an erroneous input. -TEST_F(PacketReaderTest, ReadWhatIsThisAPacketForAnts) { +TEST_F(StreamPacketReaderTest, ReadWhatIsThisAPacketForAnts) { EXPECT_CALL(socket_, Read(NotNull(), kPacketHeaderSizeBytes, _)) .WillOnce(DoAll(FillBufferFromString<0>(EncodeHeader(0)), Return(kPacketHeaderSizeBytes))) @@ -378,7 +378,7 @@ TEST_F(PacketReaderTest, ReadWhatIsThisAPacketForAnts) { } // Verify that an illegally large payloads is reported as an erroneous inputs. -TEST_F(PacketReaderTest, ReadErrorIllegallyLargePayload) { +TEST_F(StreamPacketReaderTest, ReadErrorIllegallyLargePayload) { EXPECT_CALL(socket_, Read(NotNull(), kPacketHeaderSizeBytes, _)) .WillOnce( DoAll(FillBufferFromString<0>(EncodeHeader(kTestMaxBufferSize + 1)), diff --git a/blimp/net/packet_writer.cc b/blimp/net/stream_packet_writer.cc index 81a6d90..221cdd1 100644 --- a/blimp/net/packet_writer.cc +++ b/blimp/net/stream_packet_writer.cc @@ -2,7 +2,7 @@ // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. -#include "blimp/net/packet_writer.h" +#include "blimp/net/stream_packet_writer.h" #include <iostream> @@ -20,22 +20,22 @@ namespace blimp { std::ostream& operator<<(std::ostream& out, - const PacketWriter::WriteState state) { + const StreamPacketWriter::WriteState state) { switch (state) { - case PacketWriter::WriteState::IDLE: + case StreamPacketWriter::WriteState::IDLE: out << "IDLE"; break; - case PacketWriter::WriteState::HEADER: + case StreamPacketWriter::WriteState::HEADER: out << "HEADER"; break; - case PacketWriter::WriteState::PAYLOAD: + case StreamPacketWriter::WriteState::PAYLOAD: out << "PAYLOAD"; break; } return out; } -PacketWriter::PacketWriter(net::StreamSocket* socket) +StreamPacketWriter::StreamPacketWriter(net::StreamSocket* socket) : write_state_(WriteState::IDLE), socket_(socket), header_buffer_( @@ -45,10 +45,10 @@ PacketWriter::PacketWriter(net::StreamSocket* socket) DCHECK(socket_); } -PacketWriter::~PacketWriter() {} +StreamPacketWriter::~StreamPacketWriter() {} -int PacketWriter::WritePacket(scoped_refptr<net::DrainableIOBuffer> data, - const net::CompletionCallback& callback) { +int StreamPacketWriter::WritePacket(scoped_refptr<net::DrainableIOBuffer> data, + const net::CompletionCallback& callback) { DCHECK_EQ(WriteState::IDLE, write_state_); DCHECK(data); if (data->BytesRemaining() == 0) { @@ -77,7 +77,7 @@ int PacketWriter::WritePacket(scoped_refptr<net::DrainableIOBuffer> data, return result; } -int PacketWriter::DoWriteLoop(int result) { +int StreamPacketWriter::DoWriteLoop(int result) { DCHECK_NE(net::ERR_IO_PENDING, result); DCHECK_GE(result, 0); DCHECK_NE(WriteState::IDLE, write_state_); @@ -103,37 +103,39 @@ int PacketWriter::DoWriteLoop(int result) { return result; } -int PacketWriter::DoWriteHeader(int result) { +int StreamPacketWriter::DoWriteHeader(int result) { DCHECK_EQ(WriteState::HEADER, write_state_); DCHECK_GE(result, 0); header_buffer_->DidConsume(result); if (header_buffer_->BytesRemaining() > 0) { - return socket_->Write( - header_buffer_.get(), header_buffer_->BytesRemaining(), - base::Bind(&PacketWriter::OnWriteComplete, weak_factory_.GetWeakPtr())); + return socket_->Write(header_buffer_.get(), + header_buffer_->BytesRemaining(), + base::Bind(&StreamPacketWriter::OnWriteComplete, + weak_factory_.GetWeakPtr())); } write_state_ = WriteState::PAYLOAD; return net::OK; } -int PacketWriter::DoWritePayload(int result) { +int StreamPacketWriter::DoWritePayload(int result) { DCHECK_EQ(WriteState::PAYLOAD, write_state_); DCHECK_GE(result, 0); payload_buffer_->DidConsume(result); if (payload_buffer_->BytesRemaining() > 0) { - return socket_->Write( - payload_buffer_.get(), payload_buffer_->BytesRemaining(), - base::Bind(&PacketWriter::OnWriteComplete, weak_factory_.GetWeakPtr())); + return socket_->Write(payload_buffer_.get(), + payload_buffer_->BytesRemaining(), + base::Bind(&StreamPacketWriter::OnWriteComplete, + weak_factory_.GetWeakPtr())); } write_state_ = WriteState::IDLE; return net::OK; } -void PacketWriter::OnWriteComplete(int result) { +void StreamPacketWriter::OnWriteComplete(int result) { DCHECK_NE(net::ERR_IO_PENDING, result); // If the write was succesful, then process the result. diff --git a/blimp/net/stream_packet_writer.h b/blimp/net/stream_packet_writer.h new file mode 100644 index 0000000..c618eba --- /dev/null +++ b/blimp/net/stream_packet_writer.h @@ -0,0 +1,78 @@ +// Copyright 2015 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#ifndef BLIMP_NET_STREAM_PACKET_WRITER_H_ +#define BLIMP_NET_STREAM_PACKET_WRITER_H_ + +#include <string> + +#include "base/macros.h" +#include "base/memory/ref_counted.h" +#include "base/threading/thread_checker.h" +#include "blimp/net/blimp_net_export.h" +#include "blimp/net/packet_writer.h" +#include "net/base/completion_callback.h" +#include "net/base/net_errors.h" + +namespace net { +class DrainableIOBuffer; +class StreamSocket; +} // namespace net + +namespace blimp { + +// Writes opaque length-prefixed packets to a StreamSocket. +// The header segment is 32-bit, encoded in network byte order. +// The body segment length is specified in the header (should be capped at +// kMaxPacketPayloadSizeBytes). +class BLIMP_NET_EXPORT StreamPacketWriter : public PacketWriter { + public: + // |socket|: The socket to write packets to. The caller must ensure |socket| + // is valid while the reader is in-use (see ReadPacket below). + explicit StreamPacketWriter(net::StreamSocket* socket); + + ~StreamPacketWriter() override; + + // PacketWriter implementation. + int WritePacket(scoped_refptr<net::DrainableIOBuffer> data, + const net::CompletionCallback& callback) override; + + private: + enum class WriteState { + IDLE, + HEADER, + PAYLOAD, + }; + + friend std::ostream& operator<<(std::ostream& out, const WriteState state); + + // State machine implementation. + // |result| - the result value of the most recent network operation. + // See comments for WritePacket() for documentation on return values. + int DoWriteLoop(int result); + + int DoWriteHeader(int result); + + int DoWritePayload(int result); + + // Callback function to be invoked on asynchronous write completion. + // Invokes |callback_| on packet write completion or on error. + void OnWriteComplete(int result); + + WriteState write_state_; + + net::StreamSocket* socket_; + + scoped_refptr<net::DrainableIOBuffer> payload_buffer_; + scoped_refptr<net::DrainableIOBuffer> header_buffer_; + net::CompletionCallback callback_; + + base::WeakPtrFactory<StreamPacketWriter> weak_factory_; + + DISALLOW_COPY_AND_ASSIGN(StreamPacketWriter); +}; + +} // namespace blimp + +#endif // BLIMP_NET_STREAM_PACKET_WRITER_H_ diff --git a/blimp/net/packet_writer_unittest.cc b/blimp/net/stream_packet_writer_unittest.cc index 2554b47..c8014f9 100644 --- a/blimp/net/packet_writer_unittest.cc +++ b/blimp/net/stream_packet_writer_unittest.cc @@ -7,7 +7,7 @@ #include "base/message_loop/message_loop.h" #include "base/run_loop.h" #include "blimp/net/common.h" -#include "blimp/net/packet_writer.h" +#include "blimp/net/stream_packet_writer.h" #include "blimp/net/test_common.h" #include "net/base/io_buffer.h" #include "net/base/net_errors.h" @@ -27,31 +27,31 @@ using testing::SaveArg; namespace blimp { namespace { -class PacketWriterTest : public testing::Test { +class StreamPacketWriterTest : public testing::Test { public: - PacketWriterTest() + StreamPacketWriterTest() : test_data_( new net::DrainableIOBuffer(new net::StringIOBuffer(test_data_str_), test_data_str_.size())), message_writer_(&socket_) {} - ~PacketWriterTest() override {} + ~StreamPacketWriterTest() override {} protected: const std::string test_data_str_ = "U WOT M8"; scoped_refptr<net::DrainableIOBuffer> test_data_; MockStreamSocket socket_; - PacketWriter message_writer_; + StreamPacketWriter message_writer_; base::MessageLoop message_loop_; testing::InSequence mock_sequence_; private: - DISALLOW_COPY_AND_ASSIGN(PacketWriterTest); + DISALLOW_COPY_AND_ASSIGN(StreamPacketWriterTest); }; // Successful write with 1 async header write and 1 async payload write. -TEST_F(PacketWriterTest, TestWriteAsync) { +TEST_F(StreamPacketWriterTest, TestWriteAsync) { net::TestCompletionCallback writer_cb; net::CompletionCallback header_cb; net::CompletionCallback payload_cb; @@ -76,7 +76,7 @@ TEST_F(PacketWriterTest, TestWriteAsync) { } // Successful write with 2 async header writes and 2 async payload writes. -TEST_F(PacketWriterTest, TestPartialWriteAsync) { +TEST_F(StreamPacketWriterTest, TestPartialWriteAsync) { net::TestCompletionCallback writer_cb; net::CompletionCallback header_cb; net::CompletionCallback payload_cb; @@ -115,7 +115,7 @@ TEST_F(PacketWriterTest, TestPartialWriteAsync) { } // Async socket error while writing data. -TEST_F(PacketWriterTest, TestWriteErrorAsync) { +TEST_F(StreamPacketWriterTest, TestWriteErrorAsync) { net::TestCompletionCallback writer_cb; net::CompletionCallback header_cb; net::CompletionCallback payload_cb; @@ -136,7 +136,7 @@ TEST_F(PacketWriterTest, TestWriteErrorAsync) { } // Successful write with 1 sync header write and 1 sync payload write. -TEST_F(PacketWriterTest, TestWriteSync) { +TEST_F(StreamPacketWriterTest, TestWriteSync) { net::TestCompletionCallback writer_cb; EXPECT_CALL(socket_, Write(BufferEquals(EncodeHeader(test_data_str_.size())), kPacketHeaderSizeBytes, _)) @@ -150,7 +150,7 @@ TEST_F(PacketWriterTest, TestWriteSync) { } // Successful write with 2 sync header writes and 2 sync payload writes. -TEST_F(PacketWriterTest, TestPartialWriteSync) { +TEST_F(StreamPacketWriterTest, TestPartialWriteSync) { net::TestCompletionCallback writer_cb; std::string header = EncodeHeader(test_data_str_.size()); @@ -173,7 +173,7 @@ TEST_F(PacketWriterTest, TestPartialWriteSync) { } // Verify that zero-length packets are rejected. -TEST_F(PacketWriterTest, TestZeroLengthPacketsRejected) { +TEST_F(StreamPacketWriterTest, TestZeroLengthPacketsRejected) { net::TestCompletionCallback writer_cb; EXPECT_EQ(net::ERR_INVALID_ARGUMENT, @@ -185,7 +185,7 @@ TEST_F(PacketWriterTest, TestZeroLengthPacketsRejected) { } // Sync socket error while writing header data. -TEST_F(PacketWriterTest, TestWriteHeaderErrorSync) { +TEST_F(StreamPacketWriterTest, TestWriteHeaderErrorSync) { net::TestCompletionCallback writer_cb; EXPECT_CALL(socket_, Write(BufferEquals(EncodeHeader(test_data_str_.size())), @@ -201,7 +201,7 @@ TEST_F(PacketWriterTest, TestWriteHeaderErrorSync) { } // Sync socket error while writing payload data. -TEST_F(PacketWriterTest, TestWritePayloadErrorSync) { +TEST_F(StreamPacketWriterTest, TestWritePayloadErrorSync) { net::TestCompletionCallback writer_cb; EXPECT_CALL(socket_, Write(BufferEquals(EncodeHeader(test_data_str_.size())), @@ -218,11 +218,11 @@ TEST_F(PacketWriterTest, TestWritePayloadErrorSync) { // Verify that asynchronous header write completions don't cause a // use-after-free error if the writer object is deleted. -TEST_F(PacketWriterTest, DeletedDuringHeaderWrite) { +TEST_F(StreamPacketWriterTest, DeletedDuringHeaderWrite) { net::TestCompletionCallback writer_cb; net::CompletionCallback header_cb; net::CompletionCallback payload_cb; - scoped_ptr<PacketWriter> writer(new PacketWriter(&socket_)); + scoped_ptr<StreamPacketWriter> writer(new StreamPacketWriter(&socket_)); // Write header. EXPECT_CALL(socket_, Write(BufferEquals(EncodeHeader(test_data_str_.size())), @@ -239,11 +239,11 @@ TEST_F(PacketWriterTest, DeletedDuringHeaderWrite) { // Verify that asynchronous payload write completions don't cause a // use-after-free error if the writer object is deleted. -TEST_F(PacketWriterTest, DeletedDuringPayloadWrite) { +TEST_F(StreamPacketWriterTest, DeletedDuringPayloadWrite) { net::TestCompletionCallback writer_cb; net::CompletionCallback header_cb; net::CompletionCallback payload_cb; - scoped_ptr<PacketWriter> writer(new PacketWriter(&socket_)); + scoped_ptr<StreamPacketWriter> writer(new StreamPacketWriter(&socket_)); EXPECT_CALL(socket_, Write(BufferEquals(EncodeHeader(test_data_str_.size())), kPacketHeaderSizeBytes, _)) diff --git a/blimp/net/stream_socket_connection.cc b/blimp/net/stream_socket_connection.cc new file mode 100644 index 0000000..ec06206 --- /dev/null +++ b/blimp/net/stream_socket_connection.cc @@ -0,0 +1,22 @@ +// Copyright 2015 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "blimp/net/stream_socket_connection.h" + +#include "blimp/net/stream_packet_reader.h" +#include "blimp/net/stream_packet_writer.h" + +namespace blimp { + +StreamSocketConnection::StreamSocketConnection( + scoped_ptr<net::StreamSocket> socket) + : BlimpConnection(make_scoped_ptr(new StreamPacketReader(socket.get())), + make_scoped_ptr(new StreamPacketWriter(socket.get()))), + socket_(socket.Pass()) { + DCHECK(socket_); +} + +StreamSocketConnection::~StreamSocketConnection() {} + +} // namespace blimp diff --git a/blimp/net/stream_socket_connection.h b/blimp/net/stream_socket_connection.h new file mode 100644 index 0000000..9739a09 --- /dev/null +++ b/blimp/net/stream_socket_connection.h @@ -0,0 +1,31 @@ +// Copyright 2015 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#ifndef BLIMP_NET_STREAM_SOCKET_CONNECTION_H_ +#define BLIMP_NET_STREAM_SOCKET_CONNECTION_H_ + +#include "base/memory/scoped_ptr.h" +#include "blimp/net/blimp_connection.h" +#include "net/socket/stream_socket.h" + +namespace net { +class StreamSocket; +} // namespace net + +namespace blimp { + +// BlimpConnection specialization for StreamSocket-based connections. +class StreamSocketConnection : public BlimpConnection { + public: + explicit StreamSocketConnection(scoped_ptr<net::StreamSocket> socket); + + ~StreamSocketConnection() override; + + private: + scoped_ptr<net::StreamSocket> socket_; +}; + +} // namespace blimp + +#endif // BLIMP_NET_STREAM_SOCKET_CONNECTION_H_ diff --git a/blimp/net/tcp_client_transport.cc b/blimp/net/tcp_client_transport.cc index 3d52aa4..8ac24a8 100644 --- a/blimp/net/tcp_client_transport.cc +++ b/blimp/net/tcp_client_transport.cc @@ -7,6 +7,7 @@ #include "base/callback.h" #include "base/callback_helpers.h" #include "base/memory/scoped_ptr.h" +#include "blimp/net/stream_socket_connection.h" #include "net/socket/stream_socket.h" #include "net/socket/tcp_client_socket.h" @@ -19,34 +20,36 @@ TCPClientTransport::TCPClientTransport(const net::AddressList& addresses, TCPClientTransport::~TCPClientTransport() {} int TCPClientTransport::Connect(const net::CompletionCallback& callback) { - DCHECK(!socket_); + DCHECK(!connection_); DCHECK(!callback.is_null()); - socket_.reset( + scoped_ptr<net::StreamSocket> socket; + socket.reset( new net::TCPClientSocket(addresses_, net_log_, net::NetLog::Source())); net::CompletionCallback completion_callback = base::Bind( &TCPClientTransport::OnTCPConnectComplete, base::Unretained(this)); - int result = socket_->Connect(completion_callback); + int result = socket->Connect(completion_callback); + connection_.reset(new StreamSocketConnection(socket.Pass())); if (result == net::ERR_IO_PENDING) { connect_callback_ = callback; } else if (result != net::OK) { - socket_ = nullptr; + connection_ = nullptr; } return result; } -scoped_ptr<net::StreamSocket> TCPClientTransport::TakeConnectedSocket() { - DCHECK(socket_); +scoped_ptr<BlimpConnection> TCPClientTransport::TakeConnection() { + DCHECK(connection_); DCHECK(connect_callback_.is_null()); - return socket_.Pass(); + return connection_.Pass(); } void TCPClientTransport::OnTCPConnectComplete(int result) { - DCHECK(socket_); + DCHECK(connection_); if (result != net::OK) { - socket_ = nullptr; + connection_ = nullptr; } base::ResetAndReturn(&connect_callback_).Run(result); } diff --git a/blimp/net/tcp_client_transport.h b/blimp/net/tcp_client_transport.h index ac383d5..77ca287 100644 --- a/blimp/net/tcp_client_transport.h +++ b/blimp/net/tcp_client_transport.h @@ -14,11 +14,12 @@ namespace net { class NetLog; -class StreamSocket; } // namespace net namespace blimp { +class BlimpConnection; + // BlimpTransport which creates a TCP connection to one of the specified // |addresses| on each call to Connect(). class TCPClientTransport : public BlimpTransport { @@ -28,14 +29,14 @@ class TCPClientTransport : public BlimpTransport { // BlimpTransport implementation. int Connect(const net::CompletionCallback& callback) override; - scoped_ptr<net::StreamSocket> TakeConnectedSocket() override; + scoped_ptr<BlimpConnection> TakeConnection() override; private: void OnTCPConnectComplete(int result); net::AddressList addresses_; net::NetLog* net_log_; - scoped_ptr<net::StreamSocket> socket_; + scoped_ptr<BlimpConnection> connection_; net::CompletionCallback connect_callback_; DISALLOW_COPY_AND_ASSIGN(TCPClientTransport); |