From 0131077a38a9455e6270d4b1eb474bab34fae1fd Mon Sep 17 00:00:00 2001 From: haibinlu Date: Thu, 19 Nov 2015 17:43:04 -0800 Subject: [Blimp Net] Implement BlimpConnection. Review URL: https://codereview.chromium.org/1460593002 Cr-Commit-Position: refs/heads/master@{#360712} --- blimp/net/BUILD.gn | 1 + blimp/net/blimp_connection.cc | 90 +++++++++++++- blimp/net/blimp_connection.h | 3 +- blimp/net/blimp_connection_unittest.cc | 196 +++++++++++++++++++++++++++++++ blimp/net/blimp_message_pump_unittest.cc | 9 +- blimp/net/test_common.cc | 8 ++ blimp/net/test_common.h | 39 +++++- 7 files changed, 335 insertions(+), 11 deletions(-) create mode 100644 blimp/net/blimp_connection_unittest.cc (limited to 'blimp/net') diff --git a/blimp/net/BUILD.gn b/blimp/net/BUILD.gn index cc0263e..e745099 100644 --- a/blimp/net/BUILD.gn +++ b/blimp/net/BUILD.gn @@ -52,6 +52,7 @@ source_set("unit_tests") { testonly = true sources = [ + "blimp_connection_unittest.cc", "blimp_message_demultiplexer_unittest.cc", "blimp_message_multiplexer_unittest.cc", "blimp_message_pump_unittest.cc", diff --git a/blimp/net/blimp_connection.cc b/blimp/net/blimp_connection.cc index e9fc41e..e7db062 100644 --- a/blimp/net/blimp_connection.cc +++ b/blimp/net/blimp_connection.cc @@ -4,20 +4,101 @@ #include "blimp/net/blimp_connection.h" +#include "base/callback_helpers.h" +#include "base/logging.h" #include "base/macros.h" +#include "base/message_loop/message_loop.h" +#include "blimp/common/proto/blimp_message.pb.h" #include "blimp/net/blimp_message_processor.h" #include "blimp/net/blimp_message_pump.h" +#include "blimp/net/common.h" #include "blimp/net/connection_error_observer.h" #include "blimp/net/packet_reader.h" #include "blimp/net/packet_writer.h" +#include "net/base/completion_callback.h" namespace blimp { +namespace { + +// Forwards incoming blimp messages to PacketWriter. +class BlimpMessageSender : public BlimpMessageProcessor { + public: + explicit BlimpMessageSender(PacketWriter* writer); + ~BlimpMessageSender() override; + + void set_error_observer(ConnectionErrorObserver* observer) { + error_observer_ = observer; + } + + // BlimpMessageProcessor implementation. + void ProcessMessage(scoped_ptr message, + const net::CompletionCallback& callback) override; + + private: + void OnWritePacketComplete(int result); + + PacketWriter* writer_; + ConnectionErrorObserver* error_observer_; + scoped_refptr buffer_; + net::CancelableCompletionCallback write_packet_callback_; + net::CompletionCallback pending_process_msg_callback_; + + DISALLOW_COPY_AND_ASSIGN(BlimpMessageSender); +}; + +BlimpMessageSender::BlimpMessageSender(PacketWriter* writer) + : writer_(writer), + buffer_(new net::DrainableIOBuffer( + new net::IOBuffer(kMaxPacketPayloadSizeBytes), + kMaxPacketPayloadSizeBytes)) { + DCHECK(writer_); +} + +BlimpMessageSender::~BlimpMessageSender() {} + +void BlimpMessageSender::ProcessMessage( + scoped_ptr message, + const net::CompletionCallback& callback) { + if (message->ByteSize() > static_cast(kMaxPacketPayloadSizeBytes)) { + DLOG(ERROR) << "Message is too big, size=" << message->ByteSize(); + callback.Run(net::ERR_MSG_TOO_BIG); + return; + } + + buffer_->SetOffset(0); + if (!message->SerializeToArray(buffer_->data(), message->ByteSize())) { + DLOG(ERROR) << "Failed to serialize message."; + callback.Run(net::ERR_INVALID_ARGUMENT); + return; + } + + write_packet_callback_.Reset(base::Bind( + &BlimpMessageSender::OnWritePacketComplete, base::Unretained(this))); + pending_process_msg_callback_ = callback; + int result = writer_->WritePacket(buffer_, write_packet_callback_.callback()); + if (result != net::ERR_IO_PENDING) { + base::MessageLoop::current()->PostTask( + FROM_HERE, base::Bind(write_packet_callback_.callback(), result)); + } +} + +void BlimpMessageSender::OnWritePacketComplete(int result) { + DCHECK_NE(net::ERR_IO_PENDING, result); + write_packet_callback_.Cancel(); + base::ResetAndReturn(&pending_process_msg_callback_).Run(result); + if (result != net::OK) { + error_observer_->OnConnectionError(result); + } +} + +} // namespace BlimpConnection::BlimpConnection(scoped_ptr reader, scoped_ptr writer) : reader_(reader.Pass()), message_pump_(new BlimpMessagePump(reader_.get())), - writer_(writer.Pass()) { + writer_(writer.Pass()), + outgoing_msg_processor_(new BlimpMessageSender(writer_.get())) { DCHECK(writer_); } @@ -26,6 +107,9 @@ BlimpConnection::~BlimpConnection() {} void BlimpConnection::SetConnectionErrorObserver( ConnectionErrorObserver* observer) { message_pump_->set_error_observer(observer); + BlimpMessageSender* sender = + static_cast(outgoing_msg_processor_.get()); + sender->set_error_observer(observer); } void BlimpConnection::SetIncomingMessageProcessor( @@ -33,4 +117,8 @@ void BlimpConnection::SetIncomingMessageProcessor( message_pump_->SetMessageProcessor(processor); } +BlimpMessageProcessor* BlimpConnection::GetOutgoingMessageProcessor() const { + return outgoing_msg_processor_.get(); +} + } // namespace blimp diff --git a/blimp/net/blimp_connection.h b/blimp/net/blimp_connection.h index a48521a..9f48f50 100644 --- a/blimp/net/blimp_connection.h +++ b/blimp/net/blimp_connection.h @@ -35,12 +35,13 @@ class BLIMP_NET_EXPORT BlimpConnection { void SetIncomingMessageProcessor(BlimpMessageProcessor* processor); // Gets a processor for BrowserSession->BlimpConnection message routing. - scoped_ptr take_outgoing_message_processor() const; + BlimpMessageProcessor* GetOutgoingMessageProcessor() const; private: scoped_ptr reader_; scoped_ptr message_pump_; scoped_ptr writer_; + scoped_ptr outgoing_msg_processor_; DISALLOW_COPY_AND_ASSIGN(BlimpConnection); }; diff --git a/blimp/net/blimp_connection_unittest.cc b/blimp/net/blimp_connection_unittest.cc new file mode 100644 index 0000000..82a547b --- /dev/null +++ b/blimp/net/blimp_connection_unittest.cc @@ -0,0 +1,196 @@ +// Copyright 2015 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include +#include + +#include "base/callback_helpers.h" +#include "base/message_loop/message_loop.h" +#include "blimp/common/proto/blimp_message.pb.h" +#include "blimp/net/blimp_connection.h" +#include "blimp/net/common.h" +#include "blimp/net/connection_error_observer.h" +#include "blimp/net/test_common.h" +#include "net/base/completion_callback.h" +#include "net/base/io_buffer.h" +#include "net/base/net_errors.h" +#include "net/base/test_completion_callback.h" +#include "testing/gmock/include/gmock/gmock.h" +#include "testing/gtest/include/gtest/gtest.h" + +using testing::_; +using testing::DoAll; +using testing::InSequence; +using testing::NotNull; +using testing::Return; +using testing::SaveArg; + +namespace blimp { +namespace { + +class BlimpConnectionTest : public testing::Test { + public: + BlimpConnectionTest() { + message1_ = CreateInputMessage().Pass(); + message2_ = CreateControlMessage().Pass(); + scoped_ptr> reader( + new testing::StrictMock); + reader_ = reader.get(); + scoped_ptr> writer( + new testing::StrictMock); + writer_ = writer.get(); + connection_.reset(new BlimpConnection(reader.Pass(), writer.Pass())); + connection_->SetConnectionErrorObserver(&error_observer_); + } + + ~BlimpConnectionTest() override {} + + scoped_ptr CreateInputMessage() { + scoped_ptr msg(new BlimpMessage); + msg->set_type(BlimpMessage::INPUT); + return msg.Pass(); + } + + scoped_ptr CreateControlMessage() { + scoped_ptr msg(new BlimpMessage); + msg->set_type(BlimpMessage::CONTROL); + return msg.Pass(); + } + + protected: + base::MessageLoopForIO message_loop_; + scoped_ptr message1_; + scoped_ptr message2_; + + testing::StrictMock* reader_; + testing::StrictMock* writer_; + testing::StrictMock error_observer_; + testing::StrictMock receiver_; + scoped_ptr connection_; +}; + +// Reader completes reading one packet synchronously. +// Two read cases here. BlimpMessagePumpTest covers other cases. +TEST_F(BlimpConnectionTest, SyncPacketRead) { + EXPECT_CALL(receiver_, MockableProcessMessage(EqualsProto(*message1_), _)); + EXPECT_CALL(*reader_, ReadPacket(NotNull(), _)) + .WillOnce(DoAll(FillBufferFromMessage<0>(message1_.get()), + Return(message1_->ByteSize()))); + connection_->SetIncomingMessageProcessor(&receiver_); +} + +// Reader completes reading one packet synchronously withe error. +TEST_F(BlimpConnectionTest, SyncPacketReadWithError) { + InSequence s; + EXPECT_CALL(*reader_, ReadPacket(NotNull(), _)) + .WillOnce(Return(net::ERR_FAILED)); + EXPECT_CALL(error_observer_, OnConnectionError(net::ERR_FAILED)); + connection_->SetIncomingMessageProcessor(&receiver_); +} + +// Writer completes writing two packets synchronously. +TEST_F(BlimpConnectionTest, SyncTwoPacketsWrite) { + InSequence s; + EXPECT_CALL(*writer_, WritePacket(BufferEqualsProto(*message1_), _)) + .WillOnce(Return(net::OK)) + .RetiresOnSaturation(); + EXPECT_CALL(*writer_, WritePacket(BufferEqualsProto(*message2_), _)) + .WillOnce(Return(net::OK)) + .RetiresOnSaturation(); + + BlimpMessageProcessor* sender = connection_->GetOutgoingMessageProcessor(); + net::TestCompletionCallback complete_cb_1; + sender->ProcessMessage(CreateInputMessage().Pass(), complete_cb_1.callback()); + EXPECT_EQ(net::OK, complete_cb_1.WaitForResult()); + net::TestCompletionCallback complete_cb_2; + sender->ProcessMessage(CreateControlMessage().Pass(), + complete_cb_2.callback()); + EXPECT_EQ(net::OK, complete_cb_2.WaitForResult()); +} + +// Writer completes writing two packets synchronously. +// First write succeeds, second fails. +TEST_F(BlimpConnectionTest, SyncTwoPacketsWriteWithError) { + InSequence s; + EXPECT_CALL(*writer_, WritePacket(BufferEqualsProto(*message1_), _)) + .WillOnce(Return(net::OK)) + .RetiresOnSaturation(); + EXPECT_CALL(*writer_, WritePacket(BufferEqualsProto(*message2_), _)) + .WillOnce(Return(net::ERR_FAILED)) + .RetiresOnSaturation(); + EXPECT_CALL(error_observer_, OnConnectionError(net::ERR_FAILED)); + + BlimpMessageProcessor* sender = connection_->GetOutgoingMessageProcessor(); + net::TestCompletionCallback complete_cb_1; + sender->ProcessMessage(CreateInputMessage().Pass(), complete_cb_1.callback()); + EXPECT_EQ(net::OK, complete_cb_1.WaitForResult()); + net::TestCompletionCallback complete_cb_2; + sender->ProcessMessage(CreateControlMessage().Pass(), + complete_cb_2.callback()); + EXPECT_EQ(net::ERR_FAILED, complete_cb_2.WaitForResult()); +} + +// Write completes writing two packets asynchronously. +TEST_F(BlimpConnectionTest, AsyncTwoPacketsWrite) { + net::CompletionCallback write_packet_cb; + + InSequence s; + EXPECT_CALL(*writer_, WritePacket(BufferEqualsProto(*message1_), _)) + .WillOnce( + DoAll(SaveArg<1>(&write_packet_cb), Return(net::ERR_IO_PENDING))) + .RetiresOnSaturation(); + EXPECT_CALL(*writer_, WritePacket(BufferEqualsProto(*message2_), _)) + .WillOnce( + DoAll(SaveArg<1>(&write_packet_cb), Return(net::ERR_IO_PENDING))) + .RetiresOnSaturation(); + + BlimpMessageProcessor* sender = connection_->GetOutgoingMessageProcessor(); + net::TestCompletionCallback complete_cb_1; + ASSERT_TRUE(write_packet_cb.is_null()); + sender->ProcessMessage(CreateInputMessage().Pass(), complete_cb_1.callback()); + ASSERT_FALSE(write_packet_cb.is_null()); + base::ResetAndReturn(&write_packet_cb).Run(net::OK); + EXPECT_EQ(net::OK, complete_cb_1.WaitForResult()); + + net::TestCompletionCallback complete_cb_2; + ASSERT_TRUE(write_packet_cb.is_null()); + sender->ProcessMessage(CreateControlMessage().Pass(), + complete_cb_2.callback()); + ASSERT_FALSE(write_packet_cb.is_null()); + base::ResetAndReturn(&write_packet_cb).Run(net::OK); + EXPECT_EQ(net::OK, complete_cb_2.WaitForResult()); +} + +// Writer completes writing two packets asynchronously. +// First write succeeds, second fails. +TEST_F(BlimpConnectionTest, AsyncTwoPacketsWriteWithError) { + net::CompletionCallback write_packet_cb; + + InSequence s; + EXPECT_CALL(*writer_, WritePacket(BufferEqualsProto(*message1_), _)) + .WillOnce( + DoAll(SaveArg<1>(&write_packet_cb), Return(net::ERR_IO_PENDING))) + .RetiresOnSaturation(); + EXPECT_CALL(*writer_, WritePacket(BufferEqualsProto(*message2_), _)) + .WillOnce( + DoAll(SaveArg<1>(&write_packet_cb), Return(net::ERR_IO_PENDING))) + .RetiresOnSaturation(); + EXPECT_CALL(error_observer_, OnConnectionError(net::ERR_FAILED)); + + BlimpMessageProcessor* sender = connection_->GetOutgoingMessageProcessor(); + net::TestCompletionCallback complete_cb_1; + sender->ProcessMessage(CreateInputMessage().Pass(), complete_cb_1.callback()); + base::ResetAndReturn(&write_packet_cb).Run(net::OK); + EXPECT_EQ(net::OK, complete_cb_1.WaitForResult()); + + net::TestCompletionCallback complete_cb_2; + sender->ProcessMessage(CreateControlMessage().Pass(), + complete_cb_2.callback()); + base::ResetAndReturn(&write_packet_cb).Run(net::ERR_FAILED); + EXPECT_EQ(net::ERR_FAILED, complete_cb_2.WaitForResult()); +} + +} // namespace + +} // namespace blimp diff --git a/blimp/net/blimp_message_pump_unittest.cc b/blimp/net/blimp_message_pump_unittest.cc index 65aaec9..ec7987e 100644 --- a/blimp/net/blimp_message_pump_unittest.cc +++ b/blimp/net/blimp_message_pump_unittest.cc @@ -27,11 +27,6 @@ using testing::SaveArg; namespace blimp { namespace { -class MockConnectionErrorObserver : public ConnectionErrorObserver { - public: - MOCK_METHOD1(OnConnectionError, void(int error)); -}; - class BlimpMessagePumpTest : public testing::Test { public: BlimpMessagePumpTest() @@ -49,8 +44,8 @@ class BlimpMessagePumpTest : public testing::Test { scoped_ptr message2_; testing::StrictMock reader_; - MockConnectionErrorObserver error_observer_; - MockBlimpMessageProcessor receiver_; + testing::StrictMock error_observer_; + testing::StrictMock receiver_; scoped_ptr message_pump_; }; diff --git a/blimp/net/test_common.cc b/blimp/net/test_common.cc index a3a73ad8..24f4ae3 100644 --- a/blimp/net/test_common.cc +++ b/blimp/net/test_common.cc @@ -21,6 +21,14 @@ MockPacketReader::MockPacketReader() {} MockPacketReader::~MockPacketReader() {} +MockPacketWriter::MockPacketWriter() {} + +MockPacketWriter::~MockPacketWriter() {} + +MockConnectionErrorObserver::MockConnectionErrorObserver() {} + +MockConnectionErrorObserver::~MockConnectionErrorObserver() {} + MockBlimpMessageProcessor::MockBlimpMessageProcessor() {} MockBlimpMessageProcessor::~MockBlimpMessageProcessor() {} diff --git a/blimp/net/test_common.h b/blimp/net/test_common.h index eadc1e6..119f8ee 100644 --- a/blimp/net/test_common.h +++ b/blimp/net/test_common.h @@ -7,8 +7,11 @@ #include +#include "blimp/common/proto/blimp_message.pb.h" #include "blimp/net/blimp_message_processor.h" +#include "blimp/net/connection_error_observer.h" #include "blimp/net/packet_reader.h" +#include "blimp/net/packet_writer.h" #include "net/socket/stream_socket.h" #include "testing/gmock/include/gmock/gmock.h" @@ -31,12 +34,26 @@ MATCHER_P(BufferEquals, expected, "") { // Checks if two proto messages are the same. // TODO(kmarshall): promote to a shared testing library. MATCHER_P(EqualsProto, message, "") { - std::string expected_serialized, actual_serialized; + std::string expected_serialized; + std::string actual_serialized; message.SerializeToString(&expected_serialized); arg.SerializeToString(&actual_serialized); return expected_serialized == actual_serialized; } +// Checks if the contents of a buffer are an exact match with BlimpMessage. +// arg (type: net::DrainableIOBuffer*) The buffer to check. +// message (type: BlimpMessage) The message to compare with |arg|. +MATCHER_P(BufferEqualsProto, message, "") { + BlimpMessage actual_message; + actual_message.ParseFromArray(arg->data(), arg->BytesRemaining()); + std::string expected_serialized; + std::string actual_serialized; + message.SerializeToString(&expected_serialized); + actual_message.SerializeToString(&actual_serialized); + return expected_serialized == actual_serialized; +} + // GMock action that writes data from a string to an IOBuffer. // // buf_idx (template parameter 0): 0-based index of the IOBuffer arg. @@ -95,13 +112,31 @@ class MockStreamSocket : public net::StreamSocket { class MockPacketReader : public PacketReader { public: MockPacketReader(); - virtual ~MockPacketReader(); + ~MockPacketReader() override; MOCK_METHOD2(ReadPacket, int(const scoped_refptr&, const net::CompletionCallback&)); }; +class MockPacketWriter : public PacketWriter { + public: + MockPacketWriter(); + ~MockPacketWriter() override; + + MOCK_METHOD2(WritePacket, + int(scoped_refptr, + const net::CompletionCallback&)); +}; + +class MockConnectionErrorObserver : public ConnectionErrorObserver { + public: + MockConnectionErrorObserver(); + ~MockConnectionErrorObserver() override; + + MOCK_METHOD1(OnConnectionError, void(int error)); +}; + class MockBlimpMessageProcessor : public BlimpMessageProcessor { public: MockBlimpMessageProcessor(); -- cgit v1.1