summaryrefslogtreecommitdiffstats
path: root/blimp/net
diff options
context:
space:
mode:
authorhaibinlu <haibinlu@chromium.org>2015-11-19 17:43:04 -0800
committerCommit bot <commit-bot@chromium.org>2015-11-20 01:43:47 +0000
commit0131077a38a9455e6270d4b1eb474bab34fae1fd (patch)
tree7301bcf0d8d7b4ffab3c5f38f18d9c062732cd0b /blimp/net
parenteb3cd8a10ed6065f7e5b1dbed076e4da208cf154 (diff)
downloadchromium_src-0131077a38a9455e6270d4b1eb474bab34fae1fd.zip
chromium_src-0131077a38a9455e6270d4b1eb474bab34fae1fd.tar.gz
chromium_src-0131077a38a9455e6270d4b1eb474bab34fae1fd.tar.bz2
[Blimp Net] Implement BlimpConnection.
Review URL: https://codereview.chromium.org/1460593002 Cr-Commit-Position: refs/heads/master@{#360712}
Diffstat (limited to 'blimp/net')
-rw-r--r--blimp/net/BUILD.gn1
-rw-r--r--blimp/net/blimp_connection.cc90
-rw-r--r--blimp/net/blimp_connection.h3
-rw-r--r--blimp/net/blimp_connection_unittest.cc196
-rw-r--r--blimp/net/blimp_message_pump_unittest.cc9
-rw-r--r--blimp/net/test_common.cc8
-rw-r--r--blimp/net/test_common.h39
7 files changed, 335 insertions, 11 deletions
diff --git a/blimp/net/BUILD.gn b/blimp/net/BUILD.gn
index cc0263e..e745099 100644
--- a/blimp/net/BUILD.gn
+++ b/blimp/net/BUILD.gn
@@ -52,6 +52,7 @@ source_set("unit_tests") {
testonly = true
sources = [
+ "blimp_connection_unittest.cc",
"blimp_message_demultiplexer_unittest.cc",
"blimp_message_multiplexer_unittest.cc",
"blimp_message_pump_unittest.cc",
diff --git a/blimp/net/blimp_connection.cc b/blimp/net/blimp_connection.cc
index e9fc41e..e7db062 100644
--- a/blimp/net/blimp_connection.cc
+++ b/blimp/net/blimp_connection.cc
@@ -4,20 +4,101 @@
#include "blimp/net/blimp_connection.h"
+#include "base/callback_helpers.h"
+#include "base/logging.h"
#include "base/macros.h"
+#include "base/message_loop/message_loop.h"
+#include "blimp/common/proto/blimp_message.pb.h"
#include "blimp/net/blimp_message_processor.h"
#include "blimp/net/blimp_message_pump.h"
+#include "blimp/net/common.h"
#include "blimp/net/connection_error_observer.h"
#include "blimp/net/packet_reader.h"
#include "blimp/net/packet_writer.h"
+#include "net/base/completion_callback.h"
namespace blimp {
+namespace {
+
+// Forwards incoming blimp messages to PacketWriter.
+class BlimpMessageSender : public BlimpMessageProcessor {
+ public:
+ explicit BlimpMessageSender(PacketWriter* writer);
+ ~BlimpMessageSender() override;
+
+ void set_error_observer(ConnectionErrorObserver* observer) {
+ error_observer_ = observer;
+ }
+
+ // BlimpMessageProcessor implementation.
+ void ProcessMessage(scoped_ptr<BlimpMessage> message,
+ const net::CompletionCallback& callback) override;
+
+ private:
+ void OnWritePacketComplete(int result);
+
+ PacketWriter* writer_;
+ ConnectionErrorObserver* error_observer_;
+ scoped_refptr<net::DrainableIOBuffer> buffer_;
+ net::CancelableCompletionCallback write_packet_callback_;
+ net::CompletionCallback pending_process_msg_callback_;
+
+ DISALLOW_COPY_AND_ASSIGN(BlimpMessageSender);
+};
+
+BlimpMessageSender::BlimpMessageSender(PacketWriter* writer)
+ : writer_(writer),
+ buffer_(new net::DrainableIOBuffer(
+ new net::IOBuffer(kMaxPacketPayloadSizeBytes),
+ kMaxPacketPayloadSizeBytes)) {
+ DCHECK(writer_);
+}
+
+BlimpMessageSender::~BlimpMessageSender() {}
+
+void BlimpMessageSender::ProcessMessage(
+ scoped_ptr<BlimpMessage> message,
+ const net::CompletionCallback& callback) {
+ if (message->ByteSize() > static_cast<int>(kMaxPacketPayloadSizeBytes)) {
+ DLOG(ERROR) << "Message is too big, size=" << message->ByteSize();
+ callback.Run(net::ERR_MSG_TOO_BIG);
+ return;
+ }
+
+ buffer_->SetOffset(0);
+ if (!message->SerializeToArray(buffer_->data(), message->ByteSize())) {
+ DLOG(ERROR) << "Failed to serialize message.";
+ callback.Run(net::ERR_INVALID_ARGUMENT);
+ return;
+ }
+
+ write_packet_callback_.Reset(base::Bind(
+ &BlimpMessageSender::OnWritePacketComplete, base::Unretained(this)));
+ pending_process_msg_callback_ = callback;
+ int result = writer_->WritePacket(buffer_, write_packet_callback_.callback());
+ if (result != net::ERR_IO_PENDING) {
+ base::MessageLoop::current()->PostTask(
+ FROM_HERE, base::Bind(write_packet_callback_.callback(), result));
+ }
+}
+
+void BlimpMessageSender::OnWritePacketComplete(int result) {
+ DCHECK_NE(net::ERR_IO_PENDING, result);
+ write_packet_callback_.Cancel();
+ base::ResetAndReturn(&pending_process_msg_callback_).Run(result);
+ if (result != net::OK) {
+ error_observer_->OnConnectionError(result);
+ }
+}
+
+} // namespace
BlimpConnection::BlimpConnection(scoped_ptr<PacketReader> reader,
scoped_ptr<PacketWriter> writer)
: reader_(reader.Pass()),
message_pump_(new BlimpMessagePump(reader_.get())),
- writer_(writer.Pass()) {
+ writer_(writer.Pass()),
+ outgoing_msg_processor_(new BlimpMessageSender(writer_.get())) {
DCHECK(writer_);
}
@@ -26,6 +107,9 @@ BlimpConnection::~BlimpConnection() {}
void BlimpConnection::SetConnectionErrorObserver(
ConnectionErrorObserver* observer) {
message_pump_->set_error_observer(observer);
+ BlimpMessageSender* sender =
+ static_cast<BlimpMessageSender*>(outgoing_msg_processor_.get());
+ sender->set_error_observer(observer);
}
void BlimpConnection::SetIncomingMessageProcessor(
@@ -33,4 +117,8 @@ void BlimpConnection::SetIncomingMessageProcessor(
message_pump_->SetMessageProcessor(processor);
}
+BlimpMessageProcessor* BlimpConnection::GetOutgoingMessageProcessor() const {
+ return outgoing_msg_processor_.get();
+}
+
} // namespace blimp
diff --git a/blimp/net/blimp_connection.h b/blimp/net/blimp_connection.h
index a48521a..9f48f50 100644
--- a/blimp/net/blimp_connection.h
+++ b/blimp/net/blimp_connection.h
@@ -35,12 +35,13 @@ class BLIMP_NET_EXPORT BlimpConnection {
void SetIncomingMessageProcessor(BlimpMessageProcessor* processor);
// Gets a processor for BrowserSession->BlimpConnection message routing.
- scoped_ptr<BlimpMessageProcessor> take_outgoing_message_processor() const;
+ BlimpMessageProcessor* GetOutgoingMessageProcessor() const;
private:
scoped_ptr<PacketReader> reader_;
scoped_ptr<BlimpMessagePump> message_pump_;
scoped_ptr<PacketWriter> writer_;
+ scoped_ptr<BlimpMessageProcessor> outgoing_msg_processor_;
DISALLOW_COPY_AND_ASSIGN(BlimpConnection);
};
diff --git a/blimp/net/blimp_connection_unittest.cc b/blimp/net/blimp_connection_unittest.cc
new file mode 100644
index 0000000..82a547b
--- /dev/null
+++ b/blimp/net/blimp_connection_unittest.cc
@@ -0,0 +1,196 @@
+// Copyright 2015 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include <stddef.h>
+#include <string>
+
+#include "base/callback_helpers.h"
+#include "base/message_loop/message_loop.h"
+#include "blimp/common/proto/blimp_message.pb.h"
+#include "blimp/net/blimp_connection.h"
+#include "blimp/net/common.h"
+#include "blimp/net/connection_error_observer.h"
+#include "blimp/net/test_common.h"
+#include "net/base/completion_callback.h"
+#include "net/base/io_buffer.h"
+#include "net/base/net_errors.h"
+#include "net/base/test_completion_callback.h"
+#include "testing/gmock/include/gmock/gmock.h"
+#include "testing/gtest/include/gtest/gtest.h"
+
+using testing::_;
+using testing::DoAll;
+using testing::InSequence;
+using testing::NotNull;
+using testing::Return;
+using testing::SaveArg;
+
+namespace blimp {
+namespace {
+
+class BlimpConnectionTest : public testing::Test {
+ public:
+ BlimpConnectionTest() {
+ message1_ = CreateInputMessage().Pass();
+ message2_ = CreateControlMessage().Pass();
+ scoped_ptr<testing::StrictMock<MockPacketReader>> reader(
+ new testing::StrictMock<MockPacketReader>);
+ reader_ = reader.get();
+ scoped_ptr<testing::StrictMock<MockPacketWriter>> writer(
+ new testing::StrictMock<MockPacketWriter>);
+ writer_ = writer.get();
+ connection_.reset(new BlimpConnection(reader.Pass(), writer.Pass()));
+ connection_->SetConnectionErrorObserver(&error_observer_);
+ }
+
+ ~BlimpConnectionTest() override {}
+
+ scoped_ptr<BlimpMessage> CreateInputMessage() {
+ scoped_ptr<BlimpMessage> msg(new BlimpMessage);
+ msg->set_type(BlimpMessage::INPUT);
+ return msg.Pass();
+ }
+
+ scoped_ptr<BlimpMessage> CreateControlMessage() {
+ scoped_ptr<BlimpMessage> msg(new BlimpMessage);
+ msg->set_type(BlimpMessage::CONTROL);
+ return msg.Pass();
+ }
+
+ protected:
+ base::MessageLoopForIO message_loop_;
+ scoped_ptr<BlimpMessage> message1_;
+ scoped_ptr<BlimpMessage> message2_;
+
+ testing::StrictMock<MockPacketReader>* reader_;
+ testing::StrictMock<MockPacketWriter>* writer_;
+ testing::StrictMock<MockConnectionErrorObserver> error_observer_;
+ testing::StrictMock<MockBlimpMessageProcessor> receiver_;
+ scoped_ptr<BlimpConnection> connection_;
+};
+
+// Reader completes reading one packet synchronously.
+// Two read cases here. BlimpMessagePumpTest covers other cases.
+TEST_F(BlimpConnectionTest, SyncPacketRead) {
+ EXPECT_CALL(receiver_, MockableProcessMessage(EqualsProto(*message1_), _));
+ EXPECT_CALL(*reader_, ReadPacket(NotNull(), _))
+ .WillOnce(DoAll(FillBufferFromMessage<0>(message1_.get()),
+ Return(message1_->ByteSize())));
+ connection_->SetIncomingMessageProcessor(&receiver_);
+}
+
+// Reader completes reading one packet synchronously withe error.
+TEST_F(BlimpConnectionTest, SyncPacketReadWithError) {
+ InSequence s;
+ EXPECT_CALL(*reader_, ReadPacket(NotNull(), _))
+ .WillOnce(Return(net::ERR_FAILED));
+ EXPECT_CALL(error_observer_, OnConnectionError(net::ERR_FAILED));
+ connection_->SetIncomingMessageProcessor(&receiver_);
+}
+
+// Writer completes writing two packets synchronously.
+TEST_F(BlimpConnectionTest, SyncTwoPacketsWrite) {
+ InSequence s;
+ EXPECT_CALL(*writer_, WritePacket(BufferEqualsProto(*message1_), _))
+ .WillOnce(Return(net::OK))
+ .RetiresOnSaturation();
+ EXPECT_CALL(*writer_, WritePacket(BufferEqualsProto(*message2_), _))
+ .WillOnce(Return(net::OK))
+ .RetiresOnSaturation();
+
+ BlimpMessageProcessor* sender = connection_->GetOutgoingMessageProcessor();
+ net::TestCompletionCallback complete_cb_1;
+ sender->ProcessMessage(CreateInputMessage().Pass(), complete_cb_1.callback());
+ EXPECT_EQ(net::OK, complete_cb_1.WaitForResult());
+ net::TestCompletionCallback complete_cb_2;
+ sender->ProcessMessage(CreateControlMessage().Pass(),
+ complete_cb_2.callback());
+ EXPECT_EQ(net::OK, complete_cb_2.WaitForResult());
+}
+
+// Writer completes writing two packets synchronously.
+// First write succeeds, second fails.
+TEST_F(BlimpConnectionTest, SyncTwoPacketsWriteWithError) {
+ InSequence s;
+ EXPECT_CALL(*writer_, WritePacket(BufferEqualsProto(*message1_), _))
+ .WillOnce(Return(net::OK))
+ .RetiresOnSaturation();
+ EXPECT_CALL(*writer_, WritePacket(BufferEqualsProto(*message2_), _))
+ .WillOnce(Return(net::ERR_FAILED))
+ .RetiresOnSaturation();
+ EXPECT_CALL(error_observer_, OnConnectionError(net::ERR_FAILED));
+
+ BlimpMessageProcessor* sender = connection_->GetOutgoingMessageProcessor();
+ net::TestCompletionCallback complete_cb_1;
+ sender->ProcessMessage(CreateInputMessage().Pass(), complete_cb_1.callback());
+ EXPECT_EQ(net::OK, complete_cb_1.WaitForResult());
+ net::TestCompletionCallback complete_cb_2;
+ sender->ProcessMessage(CreateControlMessage().Pass(),
+ complete_cb_2.callback());
+ EXPECT_EQ(net::ERR_FAILED, complete_cb_2.WaitForResult());
+}
+
+// Write completes writing two packets asynchronously.
+TEST_F(BlimpConnectionTest, AsyncTwoPacketsWrite) {
+ net::CompletionCallback write_packet_cb;
+
+ InSequence s;
+ EXPECT_CALL(*writer_, WritePacket(BufferEqualsProto(*message1_), _))
+ .WillOnce(
+ DoAll(SaveArg<1>(&write_packet_cb), Return(net::ERR_IO_PENDING)))
+ .RetiresOnSaturation();
+ EXPECT_CALL(*writer_, WritePacket(BufferEqualsProto(*message2_), _))
+ .WillOnce(
+ DoAll(SaveArg<1>(&write_packet_cb), Return(net::ERR_IO_PENDING)))
+ .RetiresOnSaturation();
+
+ BlimpMessageProcessor* sender = connection_->GetOutgoingMessageProcessor();
+ net::TestCompletionCallback complete_cb_1;
+ ASSERT_TRUE(write_packet_cb.is_null());
+ sender->ProcessMessage(CreateInputMessage().Pass(), complete_cb_1.callback());
+ ASSERT_FALSE(write_packet_cb.is_null());
+ base::ResetAndReturn(&write_packet_cb).Run(net::OK);
+ EXPECT_EQ(net::OK, complete_cb_1.WaitForResult());
+
+ net::TestCompletionCallback complete_cb_2;
+ ASSERT_TRUE(write_packet_cb.is_null());
+ sender->ProcessMessage(CreateControlMessage().Pass(),
+ complete_cb_2.callback());
+ ASSERT_FALSE(write_packet_cb.is_null());
+ base::ResetAndReturn(&write_packet_cb).Run(net::OK);
+ EXPECT_EQ(net::OK, complete_cb_2.WaitForResult());
+}
+
+// Writer completes writing two packets asynchronously.
+// First write succeeds, second fails.
+TEST_F(BlimpConnectionTest, AsyncTwoPacketsWriteWithError) {
+ net::CompletionCallback write_packet_cb;
+
+ InSequence s;
+ EXPECT_CALL(*writer_, WritePacket(BufferEqualsProto(*message1_), _))
+ .WillOnce(
+ DoAll(SaveArg<1>(&write_packet_cb), Return(net::ERR_IO_PENDING)))
+ .RetiresOnSaturation();
+ EXPECT_CALL(*writer_, WritePacket(BufferEqualsProto(*message2_), _))
+ .WillOnce(
+ DoAll(SaveArg<1>(&write_packet_cb), Return(net::ERR_IO_PENDING)))
+ .RetiresOnSaturation();
+ EXPECT_CALL(error_observer_, OnConnectionError(net::ERR_FAILED));
+
+ BlimpMessageProcessor* sender = connection_->GetOutgoingMessageProcessor();
+ net::TestCompletionCallback complete_cb_1;
+ sender->ProcessMessage(CreateInputMessage().Pass(), complete_cb_1.callback());
+ base::ResetAndReturn(&write_packet_cb).Run(net::OK);
+ EXPECT_EQ(net::OK, complete_cb_1.WaitForResult());
+
+ net::TestCompletionCallback complete_cb_2;
+ sender->ProcessMessage(CreateControlMessage().Pass(),
+ complete_cb_2.callback());
+ base::ResetAndReturn(&write_packet_cb).Run(net::ERR_FAILED);
+ EXPECT_EQ(net::ERR_FAILED, complete_cb_2.WaitForResult());
+}
+
+} // namespace
+
+} // namespace blimp
diff --git a/blimp/net/blimp_message_pump_unittest.cc b/blimp/net/blimp_message_pump_unittest.cc
index 65aaec9..ec7987e 100644
--- a/blimp/net/blimp_message_pump_unittest.cc
+++ b/blimp/net/blimp_message_pump_unittest.cc
@@ -27,11 +27,6 @@ using testing::SaveArg;
namespace blimp {
namespace {
-class MockConnectionErrorObserver : public ConnectionErrorObserver {
- public:
- MOCK_METHOD1(OnConnectionError, void(int error));
-};
-
class BlimpMessagePumpTest : public testing::Test {
public:
BlimpMessagePumpTest()
@@ -49,8 +44,8 @@ class BlimpMessagePumpTest : public testing::Test {
scoped_ptr<BlimpMessage> message2_;
testing::StrictMock<MockPacketReader> reader_;
- MockConnectionErrorObserver error_observer_;
- MockBlimpMessageProcessor receiver_;
+ testing::StrictMock<MockConnectionErrorObserver> error_observer_;
+ testing::StrictMock<MockBlimpMessageProcessor> receiver_;
scoped_ptr<BlimpMessagePump> message_pump_;
};
diff --git a/blimp/net/test_common.cc b/blimp/net/test_common.cc
index a3a73ad8..24f4ae3 100644
--- a/blimp/net/test_common.cc
+++ b/blimp/net/test_common.cc
@@ -21,6 +21,14 @@ MockPacketReader::MockPacketReader() {}
MockPacketReader::~MockPacketReader() {}
+MockPacketWriter::MockPacketWriter() {}
+
+MockPacketWriter::~MockPacketWriter() {}
+
+MockConnectionErrorObserver::MockConnectionErrorObserver() {}
+
+MockConnectionErrorObserver::~MockConnectionErrorObserver() {}
+
MockBlimpMessageProcessor::MockBlimpMessageProcessor() {}
MockBlimpMessageProcessor::~MockBlimpMessageProcessor() {}
diff --git a/blimp/net/test_common.h b/blimp/net/test_common.h
index eadc1e6..119f8ee 100644
--- a/blimp/net/test_common.h
+++ b/blimp/net/test_common.h
@@ -7,8 +7,11 @@
#include <string>
+#include "blimp/common/proto/blimp_message.pb.h"
#include "blimp/net/blimp_message_processor.h"
+#include "blimp/net/connection_error_observer.h"
#include "blimp/net/packet_reader.h"
+#include "blimp/net/packet_writer.h"
#include "net/socket/stream_socket.h"
#include "testing/gmock/include/gmock/gmock.h"
@@ -31,12 +34,26 @@ MATCHER_P(BufferEquals, expected, "") {
// Checks if two proto messages are the same.
// TODO(kmarshall): promote to a shared testing library.
MATCHER_P(EqualsProto, message, "") {
- std::string expected_serialized, actual_serialized;
+ std::string expected_serialized;
+ std::string actual_serialized;
message.SerializeToString(&expected_serialized);
arg.SerializeToString(&actual_serialized);
return expected_serialized == actual_serialized;
}
+// Checks if the contents of a buffer are an exact match with BlimpMessage.
+// arg (type: net::DrainableIOBuffer*) The buffer to check.
+// message (type: BlimpMessage) The message to compare with |arg|.
+MATCHER_P(BufferEqualsProto, message, "") {
+ BlimpMessage actual_message;
+ actual_message.ParseFromArray(arg->data(), arg->BytesRemaining());
+ std::string expected_serialized;
+ std::string actual_serialized;
+ message.SerializeToString(&expected_serialized);
+ actual_message.SerializeToString(&actual_serialized);
+ return expected_serialized == actual_serialized;
+}
+
// GMock action that writes data from a string to an IOBuffer.
//
// buf_idx (template parameter 0): 0-based index of the IOBuffer arg.
@@ -95,13 +112,31 @@ class MockStreamSocket : public net::StreamSocket {
class MockPacketReader : public PacketReader {
public:
MockPacketReader();
- virtual ~MockPacketReader();
+ ~MockPacketReader() override;
MOCK_METHOD2(ReadPacket,
int(const scoped_refptr<net::GrowableIOBuffer>&,
const net::CompletionCallback&));
};
+class MockPacketWriter : public PacketWriter {
+ public:
+ MockPacketWriter();
+ ~MockPacketWriter() override;
+
+ MOCK_METHOD2(WritePacket,
+ int(scoped_refptr<net::DrainableIOBuffer>,
+ const net::CompletionCallback&));
+};
+
+class MockConnectionErrorObserver : public ConnectionErrorObserver {
+ public:
+ MockConnectionErrorObserver();
+ ~MockConnectionErrorObserver() override;
+
+ MOCK_METHOD1(OnConnectionError, void(int error));
+};
+
class MockBlimpMessageProcessor : public BlimpMessageProcessor {
public:
MockBlimpMessageProcessor();