summaryrefslogtreecommitdiffstats
path: root/blimp/net
diff options
context:
space:
mode:
authorhaibinlu <haibinlu@chromium.org>2016-03-03 18:32:37 -0800
committerCommit bot <commit-bot@chromium.org>2016-03-04 02:33:48 +0000
commit3f60cfdc2d1304bfcb376efcd9d5da10aafe3b48 (patch)
tree332f9a1155bd3c4be3697ea17ac1eb953adf2353 /blimp/net
parent0621b14f1b6cf6d7354a8ed52f45f943628f0405 (diff)
downloadchromium_src-3f60cfdc2d1304bfcb376efcd9d5da10aafe3b48.zip
chromium_src-3f60cfdc2d1304bfcb376efcd9d5da10aafe3b48.tar.gz
chromium_src-3f60cfdc2d1304bfcb376efcd9d5da10aafe3b48.tar.bz2
-- Removes duplicate feature registration code from client&engine session and NetworkComponents.
-- Adds ThreadPipeManager for managing thread pipes and registering features across UI and IO threads. -- Makes ThreadPipeManager not depend on NetworkComponents. -- Adds unit test for ThreadPipeManager BUG=574884 Review URL: https://codereview.chromium.org/1647193003 Cr-Commit-Position: refs/heads/master@{#379182}
Diffstat (limited to 'blimp/net')
-rw-r--r--blimp/net/BUILD.gn3
-rw-r--r--blimp/net/blimp_message_checkpointer.h2
-rw-r--r--blimp/net/blimp_message_demultiplexer.h2
-rw-r--r--blimp/net/blimp_message_multiplexer.h2
-rw-r--r--blimp/net/blimp_message_output_buffer.h2
-rw-r--r--blimp/net/browser_connection_handler.h4
-rw-r--r--blimp/net/thread_pipe_manager.cc113
-rw-r--r--blimp/net/thread_pipe_manager.h65
-rw-r--r--blimp/net/thread_pipe_manager_unittest.cc180
9 files changed, 372 insertions, 1 deletions
diff --git a/blimp/net/BUILD.gn b/blimp/net/BUILD.gn
index 28c00cd..0fe0cfd 100644
--- a/blimp/net/BUILD.gn
+++ b/blimp/net/BUILD.gn
@@ -54,6 +54,8 @@ component("blimp_net") {
"tcp_client_transport.h",
"tcp_engine_transport.cc",
"tcp_engine_transport.h",
+ "thread_pipe_manager.cc",
+ "thread_pipe_manager.h",
]
defines = [ "BLIMP_NET_IMPLEMENTATION=1" ]
@@ -102,6 +104,7 @@ source_set("unit_tests") {
"stream_packet_reader_unittest.cc",
"stream_packet_writer_unittest.cc",
"tcp_transport_unittest.cc",
+ "thread_pipe_manager_unittest.cc",
]
deps = [
diff --git a/blimp/net/blimp_message_checkpointer.h b/blimp/net/blimp_message_checkpointer.h
index 66c9894..cd5fd49 100644
--- a/blimp/net/blimp_message_checkpointer.h
+++ b/blimp/net/blimp_message_checkpointer.h
@@ -23,6 +23,8 @@ class BlimpMessageCheckpointObserver;
// Checkpoint/ACK message dispatch may be deferred for a second or
// two to avoid saturating the link with ACK traffic; feature implementations
// need to account for this latency in their design.
+// BlimpMessageCheckpointer is created on the UI thread, and then used and
+// destroyed on the IO thread.
class BLIMP_NET_EXPORT BlimpMessageCheckpointer : public BlimpMessageProcessor {
public:
BlimpMessageCheckpointer(BlimpMessageProcessor* incoming_processor,
diff --git a/blimp/net/blimp_message_demultiplexer.h b/blimp/net/blimp_message_demultiplexer.h
index eb6771eb..07854aa 100644
--- a/blimp/net/blimp_message_demultiplexer.h
+++ b/blimp/net/blimp_message_demultiplexer.h
@@ -18,6 +18,8 @@ namespace blimp {
// Multiplexing BlimpMessageProcessor which routes BlimpMessages to message
// processors based on |message.type|.
+// BlimpMessageDemultiplexer is created on the UI thread, and then used and
+// destroyed on the IO thread.
class BLIMP_NET_EXPORT BlimpMessageDemultiplexer
: public BlimpMessageProcessor {
public:
diff --git a/blimp/net/blimp_message_multiplexer.h b/blimp/net/blimp_message_multiplexer.h
index 27713af..af4c4bb 100644
--- a/blimp/net/blimp_message_multiplexer.h
+++ b/blimp/net/blimp_message_multiplexer.h
@@ -17,6 +17,8 @@ class BlimpMessageProcessor;
// Creates MessageProcessors that receive outgoing messages and put them
// onto a multiplexed message stream.
+// BlimpMessageMultiplexer is created on the UI thread, and then used and
+// destroyed on the IO thread.
class BLIMP_NET_EXPORT BlimpMessageMultiplexer {
public:
// |output_processor|: A pointer to the MessageProcessor that will receive the
diff --git a/blimp/net/blimp_message_output_buffer.h b/blimp/net/blimp_message_output_buffer.h
index c40aea3..97bd2ff 100644
--- a/blimp/net/blimp_message_output_buffer.h
+++ b/blimp/net/blimp_message_output_buffer.h
@@ -28,6 +28,8 @@ class BlimpConnection;
// message acknowledgment.
// (Redelivery will be used in a future CL to implement Fast Recovery
// of dropped connections.)
+// BlimpMessageOutputBuffer is created on the UI thread, and then used and
+// destroyed on the IO thread.
class BLIMP_NET_EXPORT BlimpMessageOutputBuffer
: public BlimpMessageProcessor,
public BlimpMessageCheckpointObserver {
diff --git a/blimp/net/browser_connection_handler.h b/blimp/net/browser_connection_handler.h
index 74bb7f2..0304ae1 100644
--- a/blimp/net/browser_connection_handler.h
+++ b/blimp/net/browser_connection_handler.h
@@ -25,6 +25,8 @@ class BlimpMessageProcessor;
// messages out via underlying BlimpConnection.
// A BrowserConnectionHandler is created on browser startup, and persists for
// the lifetime of the application.
+// BrowserConnectionHandler is created on the UI thread, and then used and
+// destroyed on the IO thread.
class BLIMP_NET_EXPORT BrowserConnectionHandler
: public ConnectionHandler,
public ConnectionErrorObserver {
@@ -38,7 +40,7 @@ class BLIMP_NET_EXPORT BrowserConnectionHandler
// this object is in-use.
//
// Returns a BlimpMessageProcessor object for sending messages of type |type|.
- scoped_ptr<BlimpMessageProcessor> RegisterFeature(
+ virtual scoped_ptr<BlimpMessageProcessor> RegisterFeature(
BlimpMessage::Type type,
BlimpMessageProcessor* incoming_processor);
diff --git a/blimp/net/thread_pipe_manager.cc b/blimp/net/thread_pipe_manager.cc
new file mode 100644
index 0000000..95bc350
--- /dev/null
+++ b/blimp/net/thread_pipe_manager.cc
@@ -0,0 +1,113 @@
+// Copyright 2016 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "blimp/net/thread_pipe_manager.h"
+
+#include "base/location.h"
+#include "base/sequenced_task_runner.h"
+#include "blimp/net/blimp_message_processor.h"
+#include "blimp/net/blimp_message_thread_pipe.h"
+#include "blimp/net/browser_connection_handler.h"
+
+namespace blimp {
+
+// IoThreadPipeManager is created on the UI thread, and then used and destroyed
+// on the IO thread.
+// It works with |connection_handler| to register features on the IO thread,
+// and manages IO-thread-side BlimpMessageThreadPipes.
+class IoThreadPipeManager {
+ public:
+ explicit IoThreadPipeManager(BrowserConnectionHandler* connection_handler);
+ virtual ~IoThreadPipeManager();
+
+ // Connects message pipes between the specified feature and the network layer,
+ // using |incoming_proxy| as the incoming message processor, and connecting
+ // |outgoing_pipe| to the actual message sender.
+ void RegisterFeature(BlimpMessage::Type type,
+ scoped_ptr<BlimpMessageThreadPipe> outgoing_pipe,
+ scoped_ptr<BlimpMessageProcessor> incoming_proxy);
+
+ private:
+ BrowserConnectionHandler* connection_handler_;
+
+ // Container for the feature-specific MessageProcessors.
+ // IO-side proxy for sending messages to UI thread.
+ std::vector<scoped_ptr<BlimpMessageProcessor>> incoming_proxies_;
+
+ // Containers for the MessageProcessors used to write feature-specific
+ // messages to the network, and the thread-pipe endpoints through which
+ // they are used from the UI thread.
+ std::vector<scoped_ptr<BlimpMessageProcessor>> outgoing_message_processors_;
+ std::vector<scoped_ptr<BlimpMessageThreadPipe>> outgoing_pipes_;
+
+ DISALLOW_COPY_AND_ASSIGN(IoThreadPipeManager);
+};
+
+IoThreadPipeManager::IoThreadPipeManager(
+ BrowserConnectionHandler* connection_handler)
+ : connection_handler_(connection_handler) {
+ DCHECK(connection_handler_);
+}
+
+IoThreadPipeManager::~IoThreadPipeManager() {}
+
+void IoThreadPipeManager::RegisterFeature(
+ BlimpMessage::Type type,
+ scoped_ptr<BlimpMessageThreadPipe> outgoing_pipe,
+ scoped_ptr<BlimpMessageProcessor> incoming_proxy) {
+ // Registers |incoming_proxy| as the message processor for incoming
+ // messages with |type|. Sets the returned outgoing message processor as the
+ // target of the |outgoing_pipe|.
+ scoped_ptr<BlimpMessageProcessor> outgoing_message_processor =
+ connection_handler_->RegisterFeature(type, incoming_proxy.get());
+ outgoing_pipe->set_target_processor(outgoing_message_processor.get());
+
+ // This object manages the lifetimes of the pipe, proxy and target processor.
+ incoming_proxies_.push_back(std::move(incoming_proxy));
+ outgoing_pipes_.push_back(std::move(outgoing_pipe));
+ outgoing_message_processors_.push_back(std::move(outgoing_message_processor));
+}
+
+ThreadPipeManager::ThreadPipeManager(
+ const scoped_refptr<base::SequencedTaskRunner>& io_task_runner,
+ const scoped_refptr<base::SequencedTaskRunner>& ui_task_runner,
+ BrowserConnectionHandler* connection_handler)
+ : io_task_runner_(io_task_runner),
+ ui_task_runner_(ui_task_runner),
+ io_pipe_manager_(new IoThreadPipeManager(connection_handler)) {}
+
+ThreadPipeManager::~ThreadPipeManager() {
+ io_task_runner_->DeleteSoon(FROM_HERE, io_pipe_manager_.release());
+}
+
+scoped_ptr<BlimpMessageProcessor> ThreadPipeManager::RegisterFeature(
+ BlimpMessage::Type type,
+ BlimpMessageProcessor* incoming_processor) {
+ // Creates an outgoing pipe and a proxy for forwarding messages
+ // from features on the UI thread to network components on the IO thread.
+ scoped_ptr<BlimpMessageThreadPipe> outgoing_pipe(
+ new BlimpMessageThreadPipe(io_task_runner_));
+ scoped_ptr<BlimpMessageProcessor> outgoing_proxy =
+ outgoing_pipe->CreateProxy();
+
+ // Creates an incoming pipe and a proxy for receiving messages
+ // from network components on the IO thread.
+ scoped_ptr<BlimpMessageThreadPipe> incoming_pipe(
+ new BlimpMessageThreadPipe(ui_task_runner_));
+ incoming_pipe->set_target_processor(incoming_processor);
+ scoped_ptr<BlimpMessageProcessor> incoming_proxy =
+ incoming_pipe->CreateProxy();
+
+ // Finishes registration on IO thread.
+ io_task_runner_->PostTask(
+ FROM_HERE, base::Bind(&IoThreadPipeManager::RegisterFeature,
+ base::Unretained(io_pipe_manager_.get()), type,
+ base::Passed(std::move(outgoing_pipe)),
+ base::Passed(std::move(incoming_proxy))));
+
+ incoming_pipes_.push_back(std::move(incoming_pipe));
+ return outgoing_proxy;
+}
+
+} // namespace blimp
diff --git a/blimp/net/thread_pipe_manager.h b/blimp/net/thread_pipe_manager.h
new file mode 100644
index 0000000..55bf5b6
--- /dev/null
+++ b/blimp/net/thread_pipe_manager.h
@@ -0,0 +1,65 @@
+// Copyright 2016 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef BLIMP_NET_THREAD_PIPE_MANAGER_H_
+#define BLIMP_NET_THREAD_PIPE_MANAGER_H_
+
+#include <vector>
+
+#include "base/macros.h"
+#include "base/memory/ref_counted.h"
+#include "base/memory/scoped_ptr.h"
+#include "blimp/common/proto/blimp_message.pb.h"
+#include "blimp/net/blimp_net_export.h"
+
+namespace base {
+class SequencedTaskRunner;
+} // namespace base
+
+namespace blimp {
+
+class BlimpMessageProcessor;
+class BlimpMessageThreadPipe;
+class BrowserConnectionHandler;
+class IoThreadPipeManager;
+
+// This class is used on the UI thread for registering features and setting up
+// BlimpMessageThreadPipes for communicating with |connection_handler| on the
+// IO thread.
+class BLIMP_NET_EXPORT ThreadPipeManager {
+ public:
+ // Caller is responsible for ensuring that |connection_handler| outlives
+ // |this|.
+ explicit ThreadPipeManager(
+ const scoped_refptr<base::SequencedTaskRunner>& io_task_runner,
+ const scoped_refptr<base::SequencedTaskRunner>& ui_task_runner,
+ BrowserConnectionHandler* connection_handler);
+
+ ~ThreadPipeManager();
+
+ // Registers a message processor |incoming_processor| which will receive all
+ // messages of the |type| specified. Returns a BlimpMessageProcessor object
+ // for sending messages of type |type|.
+ scoped_ptr<BlimpMessageProcessor> RegisterFeature(
+ BlimpMessage::Type type,
+ BlimpMessageProcessor* incoming_processor);
+
+ private:
+ scoped_refptr<base::SequencedTaskRunner> io_task_runner_;
+ scoped_refptr<base::SequencedTaskRunner> ui_task_runner_;
+
+ // Container for BlimpMessageThreadPipes that are destroyed on IO thread.
+ scoped_ptr<IoThreadPipeManager> io_pipe_manager_;
+
+ // Pipes for routing messages from the IO to the UI thread.
+ // Incoming messages are only routed to the UI thread since all features run
+ // there.
+ std::vector<scoped_ptr<BlimpMessageThreadPipe>> incoming_pipes_;
+
+ DISALLOW_COPY_AND_ASSIGN(ThreadPipeManager);
+};
+
+} // namespace blimp
+
+#endif // BLIMP_NET_THREAD_PIPE_MANAGER_H_
diff --git a/blimp/net/thread_pipe_manager_unittest.cc b/blimp/net/thread_pipe_manager_unittest.cc
new file mode 100644
index 0000000..f36ef48
--- /dev/null
+++ b/blimp/net/thread_pipe_manager_unittest.cc
@@ -0,0 +1,180 @@
+// Copyright 2016 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "blimp/net/thread_pipe_manager.h"
+
+#include "base/location.h"
+#include "base/memory/ref_counted.h"
+#include "base/message_loop/message_loop.h"
+#include "base/threading/sequenced_task_runner_handle.h"
+#include "base/threading/thread.h"
+#include "blimp/net/blimp_message_thread_pipe.h"
+#include "blimp/net/browser_connection_handler.h"
+#include "blimp/net/test_common.h"
+#include "net/base/net_errors.h"
+#include "net/base/test_completion_callback.h"
+#include "testing/gmock/include/gmock/gmock.h"
+#include "testing/gtest/include/gtest/gtest.h"
+
+using testing::_;
+using testing::SaveArg;
+
+namespace blimp {
+namespace {
+
+scoped_ptr<BlimpMessage> CreateMessage(BlimpMessage::Type type) {
+ scoped_ptr<BlimpMessage> output(new BlimpMessage);
+ output->set_type(type);
+ return output;
+}
+
+// A feature that registers itself with ThreadPipeManager.
+class FakeFeature {
+ public:
+ FakeFeature(BlimpMessage::Type type, ThreadPipeManager* pipe_manager_) {
+ outgoing_message_processor_ =
+ pipe_manager_->RegisterFeature(type, &incoming_message_processor_);
+ }
+
+ ~FakeFeature() {}
+
+ BlimpMessageProcessor* outgoing_message_processor() {
+ return outgoing_message_processor_.get();
+ }
+
+ MockBlimpMessageProcessor* incoming_message_processor() {
+ return &incoming_message_processor_;
+ }
+
+ private:
+ testing::StrictMock<MockBlimpMessageProcessor> incoming_message_processor_;
+ scoped_ptr<BlimpMessageProcessor> outgoing_message_processor_;
+};
+
+// A feature peer on |thread_| that forwards incoming messages to
+// |message_processor|.
+class FakeFeaturePeer : public BlimpMessageProcessor {
+ public:
+ FakeFeaturePeer(BlimpMessage::Type type,
+ BlimpMessageProcessor* message_processor,
+ const scoped_refptr<base::SequencedTaskRunner>& task_runner)
+ : type_(type),
+ message_processor_(message_processor),
+ task_runner_(task_runner) {}
+
+ ~FakeFeaturePeer() override {}
+
+ private:
+ void ForwardMessage(scoped_ptr<BlimpMessage> message) {
+ DCHECK(task_runner_->RunsTasksOnCurrentThread());
+ message_processor_->ProcessMessage(std::move(message),
+ net::CompletionCallback());
+ }
+
+ // BlimpMessageProcessor implementation.
+ void ProcessMessage(scoped_ptr<BlimpMessage> message,
+ const net::CompletionCallback& callback) override {
+ DCHECK(task_runner_->RunsTasksOnCurrentThread());
+ ASSERT_EQ(type_, message->type());
+ base::MessageLoop::current()->PostTask(
+ FROM_HERE, base::Bind(&FakeFeaturePeer::ForwardMessage,
+ base::Unretained(this), base::Passed(&message)));
+ if (!callback.is_null())
+ callback.Run(net::OK);
+ }
+
+ BlimpMessage::Type type_;
+ BlimpMessageProcessor* message_processor_ = nullptr;
+ scoped_refptr<base::SequencedTaskRunner> task_runner_;
+};
+
+// A browser connection handler that returns FakeFeaturePeer to allow it
+// forwarding message back so that FakeFeature can check message it receives
+// with one it just sent.
+class FakeBrowserConnectionHandler : public BrowserConnectionHandler {
+ public:
+ FakeBrowserConnectionHandler(
+ const scoped_refptr<base::SequencedTaskRunner>& task_runner)
+ : task_runner_(task_runner) {}
+ scoped_ptr<BlimpMessageProcessor> RegisterFeature(
+ BlimpMessage::Type type,
+ BlimpMessageProcessor* incoming_processor) override {
+ DCHECK(task_runner_->RunsTasksOnCurrentThread());
+ return make_scoped_ptr(
+ new FakeFeaturePeer(type, incoming_processor, task_runner_));
+ }
+
+ private:
+ scoped_refptr<base::SequencedTaskRunner> task_runner_;
+};
+
+} // namespace
+
+class ThreadPipeManagerTest : public testing::Test {
+ public:
+ ThreadPipeManagerTest() : thread_("IoThread") {}
+
+ ~ThreadPipeManagerTest() override {}
+
+ void SetUp() override {
+ ASSERT_TRUE(thread_.Start());
+ connection_handler_ = make_scoped_ptr(
+ new FakeBrowserConnectionHandler(thread_.task_runner()));
+ pipe_manager_ = make_scoped_ptr(new ThreadPipeManager(
+ thread_.task_runner(), base::SequencedTaskRunnerHandle::Get(),
+ connection_handler_.get()));
+
+ input_feature_.reset(
+ new FakeFeature(BlimpMessage::INPUT, pipe_manager_.get()));
+ tab_control_feature_.reset(
+ new FakeFeature(BlimpMessage::TAB_CONTROL, pipe_manager_.get()));
+ }
+
+ void TearDown() override { SynchronizeWithThread(); }
+
+ // Synchronize with |thread_| to ensure that any pending work is done.
+ void SynchronizeWithThread() {
+ net::TestCompletionCallback cb;
+ thread_.task_runner()->PostTaskAndReply(FROM_HERE,
+ base::Bind(&base::DoNothing),
+ base::Bind(cb.callback(), net::OK));
+ ASSERT_EQ(net::OK, cb.WaitForResult());
+ }
+
+ protected:
+ base::MessageLoop message_loop_;
+ scoped_ptr<BrowserConnectionHandler> connection_handler_;
+ scoped_ptr<ThreadPipeManager> pipe_manager_;
+ base::Thread thread_;
+
+ scoped_ptr<FakeFeature> input_feature_;
+ scoped_ptr<FakeFeature> tab_control_feature_;
+};
+
+// Features send out message and receive the same message due to
+// |FakeFeaturePeer| loops the message back on |thread_|.
+TEST_F(ThreadPipeManagerTest, MessageSentIsReceived) {
+ scoped_ptr<BlimpMessage> input_message = CreateMessage(BlimpMessage::INPUT);
+ scoped_ptr<BlimpMessage> tab_control_message =
+ CreateMessage(BlimpMessage::TAB_CONTROL);
+
+ EXPECT_CALL(*(input_feature_->incoming_message_processor()),
+ MockableProcessMessage(EqualsProto(*input_message), _))
+ .RetiresOnSaturation();
+ EXPECT_CALL(*(tab_control_feature_->incoming_message_processor()),
+ MockableProcessMessage(EqualsProto(*tab_control_message), _))
+ .RetiresOnSaturation();
+
+ net::TestCompletionCallback cb1;
+ input_feature_->outgoing_message_processor()->ProcessMessage(
+ std::move(input_message), cb1.callback());
+ net::TestCompletionCallback cb2;
+ tab_control_feature_->outgoing_message_processor()->ProcessMessage(
+ std::move(tab_control_message), cb2.callback());
+
+ EXPECT_EQ(net::OK, cb1.WaitForResult());
+ EXPECT_EQ(net::OK, cb2.WaitForResult());
+}
+
+} // namespace blimp