diff options
author | haibinlu <haibinlu@chromium.org> | 2016-03-03 18:32:37 -0800 |
---|---|---|
committer | Commit bot <commit-bot@chromium.org> | 2016-03-04 02:33:48 +0000 |
commit | 3f60cfdc2d1304bfcb376efcd9d5da10aafe3b48 (patch) | |
tree | 332f9a1155bd3c4be3697ea17ac1eb953adf2353 /blimp/net | |
parent | 0621b14f1b6cf6d7354a8ed52f45f943628f0405 (diff) | |
download | chromium_src-3f60cfdc2d1304bfcb376efcd9d5da10aafe3b48.zip chromium_src-3f60cfdc2d1304bfcb376efcd9d5da10aafe3b48.tar.gz chromium_src-3f60cfdc2d1304bfcb376efcd9d5da10aafe3b48.tar.bz2 |
-- Removes duplicate feature registration code from client&engine session and NetworkComponents.
-- Adds ThreadPipeManager for managing thread pipes and registering features across UI and IO threads.
-- Makes ThreadPipeManager not depend on NetworkComponents.
-- Adds unit test for ThreadPipeManager
BUG=574884
Review URL: https://codereview.chromium.org/1647193003
Cr-Commit-Position: refs/heads/master@{#379182}
Diffstat (limited to 'blimp/net')
-rw-r--r-- | blimp/net/BUILD.gn | 3 | ||||
-rw-r--r-- | blimp/net/blimp_message_checkpointer.h | 2 | ||||
-rw-r--r-- | blimp/net/blimp_message_demultiplexer.h | 2 | ||||
-rw-r--r-- | blimp/net/blimp_message_multiplexer.h | 2 | ||||
-rw-r--r-- | blimp/net/blimp_message_output_buffer.h | 2 | ||||
-rw-r--r-- | blimp/net/browser_connection_handler.h | 4 | ||||
-rw-r--r-- | blimp/net/thread_pipe_manager.cc | 113 | ||||
-rw-r--r-- | blimp/net/thread_pipe_manager.h | 65 | ||||
-rw-r--r-- | blimp/net/thread_pipe_manager_unittest.cc | 180 |
9 files changed, 372 insertions, 1 deletions
diff --git a/blimp/net/BUILD.gn b/blimp/net/BUILD.gn index 28c00cd..0fe0cfd 100644 --- a/blimp/net/BUILD.gn +++ b/blimp/net/BUILD.gn @@ -54,6 +54,8 @@ component("blimp_net") { "tcp_client_transport.h", "tcp_engine_transport.cc", "tcp_engine_transport.h", + "thread_pipe_manager.cc", + "thread_pipe_manager.h", ] defines = [ "BLIMP_NET_IMPLEMENTATION=1" ] @@ -102,6 +104,7 @@ source_set("unit_tests") { "stream_packet_reader_unittest.cc", "stream_packet_writer_unittest.cc", "tcp_transport_unittest.cc", + "thread_pipe_manager_unittest.cc", ] deps = [ diff --git a/blimp/net/blimp_message_checkpointer.h b/blimp/net/blimp_message_checkpointer.h index 66c9894..cd5fd49 100644 --- a/blimp/net/blimp_message_checkpointer.h +++ b/blimp/net/blimp_message_checkpointer.h @@ -23,6 +23,8 @@ class BlimpMessageCheckpointObserver; // Checkpoint/ACK message dispatch may be deferred for a second or // two to avoid saturating the link with ACK traffic; feature implementations // need to account for this latency in their design. +// BlimpMessageCheckpointer is created on the UI thread, and then used and +// destroyed on the IO thread. class BLIMP_NET_EXPORT BlimpMessageCheckpointer : public BlimpMessageProcessor { public: BlimpMessageCheckpointer(BlimpMessageProcessor* incoming_processor, diff --git a/blimp/net/blimp_message_demultiplexer.h b/blimp/net/blimp_message_demultiplexer.h index eb6771eb..07854aa 100644 --- a/blimp/net/blimp_message_demultiplexer.h +++ b/blimp/net/blimp_message_demultiplexer.h @@ -18,6 +18,8 @@ namespace blimp { // Multiplexing BlimpMessageProcessor which routes BlimpMessages to message // processors based on |message.type|. +// BlimpMessageDemultiplexer is created on the UI thread, and then used and +// destroyed on the IO thread. class BLIMP_NET_EXPORT BlimpMessageDemultiplexer : public BlimpMessageProcessor { public: diff --git a/blimp/net/blimp_message_multiplexer.h b/blimp/net/blimp_message_multiplexer.h index 27713af..af4c4bb 100644 --- a/blimp/net/blimp_message_multiplexer.h +++ b/blimp/net/blimp_message_multiplexer.h @@ -17,6 +17,8 @@ class BlimpMessageProcessor; // Creates MessageProcessors that receive outgoing messages and put them // onto a multiplexed message stream. +// BlimpMessageMultiplexer is created on the UI thread, and then used and +// destroyed on the IO thread. class BLIMP_NET_EXPORT BlimpMessageMultiplexer { public: // |output_processor|: A pointer to the MessageProcessor that will receive the diff --git a/blimp/net/blimp_message_output_buffer.h b/blimp/net/blimp_message_output_buffer.h index c40aea3..97bd2ff 100644 --- a/blimp/net/blimp_message_output_buffer.h +++ b/blimp/net/blimp_message_output_buffer.h @@ -28,6 +28,8 @@ class BlimpConnection; // message acknowledgment. // (Redelivery will be used in a future CL to implement Fast Recovery // of dropped connections.) +// BlimpMessageOutputBuffer is created on the UI thread, and then used and +// destroyed on the IO thread. class BLIMP_NET_EXPORT BlimpMessageOutputBuffer : public BlimpMessageProcessor, public BlimpMessageCheckpointObserver { diff --git a/blimp/net/browser_connection_handler.h b/blimp/net/browser_connection_handler.h index 74bb7f2..0304ae1 100644 --- a/blimp/net/browser_connection_handler.h +++ b/blimp/net/browser_connection_handler.h @@ -25,6 +25,8 @@ class BlimpMessageProcessor; // messages out via underlying BlimpConnection. // A BrowserConnectionHandler is created on browser startup, and persists for // the lifetime of the application. +// BrowserConnectionHandler is created on the UI thread, and then used and +// destroyed on the IO thread. class BLIMP_NET_EXPORT BrowserConnectionHandler : public ConnectionHandler, public ConnectionErrorObserver { @@ -38,7 +40,7 @@ class BLIMP_NET_EXPORT BrowserConnectionHandler // this object is in-use. // // Returns a BlimpMessageProcessor object for sending messages of type |type|. - scoped_ptr<BlimpMessageProcessor> RegisterFeature( + virtual scoped_ptr<BlimpMessageProcessor> RegisterFeature( BlimpMessage::Type type, BlimpMessageProcessor* incoming_processor); diff --git a/blimp/net/thread_pipe_manager.cc b/blimp/net/thread_pipe_manager.cc new file mode 100644 index 0000000..95bc350 --- /dev/null +++ b/blimp/net/thread_pipe_manager.cc @@ -0,0 +1,113 @@ +// Copyright 2016 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "blimp/net/thread_pipe_manager.h" + +#include "base/location.h" +#include "base/sequenced_task_runner.h" +#include "blimp/net/blimp_message_processor.h" +#include "blimp/net/blimp_message_thread_pipe.h" +#include "blimp/net/browser_connection_handler.h" + +namespace blimp { + +// IoThreadPipeManager is created on the UI thread, and then used and destroyed +// on the IO thread. +// It works with |connection_handler| to register features on the IO thread, +// and manages IO-thread-side BlimpMessageThreadPipes. +class IoThreadPipeManager { + public: + explicit IoThreadPipeManager(BrowserConnectionHandler* connection_handler); + virtual ~IoThreadPipeManager(); + + // Connects message pipes between the specified feature and the network layer, + // using |incoming_proxy| as the incoming message processor, and connecting + // |outgoing_pipe| to the actual message sender. + void RegisterFeature(BlimpMessage::Type type, + scoped_ptr<BlimpMessageThreadPipe> outgoing_pipe, + scoped_ptr<BlimpMessageProcessor> incoming_proxy); + + private: + BrowserConnectionHandler* connection_handler_; + + // Container for the feature-specific MessageProcessors. + // IO-side proxy for sending messages to UI thread. + std::vector<scoped_ptr<BlimpMessageProcessor>> incoming_proxies_; + + // Containers for the MessageProcessors used to write feature-specific + // messages to the network, and the thread-pipe endpoints through which + // they are used from the UI thread. + std::vector<scoped_ptr<BlimpMessageProcessor>> outgoing_message_processors_; + std::vector<scoped_ptr<BlimpMessageThreadPipe>> outgoing_pipes_; + + DISALLOW_COPY_AND_ASSIGN(IoThreadPipeManager); +}; + +IoThreadPipeManager::IoThreadPipeManager( + BrowserConnectionHandler* connection_handler) + : connection_handler_(connection_handler) { + DCHECK(connection_handler_); +} + +IoThreadPipeManager::~IoThreadPipeManager() {} + +void IoThreadPipeManager::RegisterFeature( + BlimpMessage::Type type, + scoped_ptr<BlimpMessageThreadPipe> outgoing_pipe, + scoped_ptr<BlimpMessageProcessor> incoming_proxy) { + // Registers |incoming_proxy| as the message processor for incoming + // messages with |type|. Sets the returned outgoing message processor as the + // target of the |outgoing_pipe|. + scoped_ptr<BlimpMessageProcessor> outgoing_message_processor = + connection_handler_->RegisterFeature(type, incoming_proxy.get()); + outgoing_pipe->set_target_processor(outgoing_message_processor.get()); + + // This object manages the lifetimes of the pipe, proxy and target processor. + incoming_proxies_.push_back(std::move(incoming_proxy)); + outgoing_pipes_.push_back(std::move(outgoing_pipe)); + outgoing_message_processors_.push_back(std::move(outgoing_message_processor)); +} + +ThreadPipeManager::ThreadPipeManager( + const scoped_refptr<base::SequencedTaskRunner>& io_task_runner, + const scoped_refptr<base::SequencedTaskRunner>& ui_task_runner, + BrowserConnectionHandler* connection_handler) + : io_task_runner_(io_task_runner), + ui_task_runner_(ui_task_runner), + io_pipe_manager_(new IoThreadPipeManager(connection_handler)) {} + +ThreadPipeManager::~ThreadPipeManager() { + io_task_runner_->DeleteSoon(FROM_HERE, io_pipe_manager_.release()); +} + +scoped_ptr<BlimpMessageProcessor> ThreadPipeManager::RegisterFeature( + BlimpMessage::Type type, + BlimpMessageProcessor* incoming_processor) { + // Creates an outgoing pipe and a proxy for forwarding messages + // from features on the UI thread to network components on the IO thread. + scoped_ptr<BlimpMessageThreadPipe> outgoing_pipe( + new BlimpMessageThreadPipe(io_task_runner_)); + scoped_ptr<BlimpMessageProcessor> outgoing_proxy = + outgoing_pipe->CreateProxy(); + + // Creates an incoming pipe and a proxy for receiving messages + // from network components on the IO thread. + scoped_ptr<BlimpMessageThreadPipe> incoming_pipe( + new BlimpMessageThreadPipe(ui_task_runner_)); + incoming_pipe->set_target_processor(incoming_processor); + scoped_ptr<BlimpMessageProcessor> incoming_proxy = + incoming_pipe->CreateProxy(); + + // Finishes registration on IO thread. + io_task_runner_->PostTask( + FROM_HERE, base::Bind(&IoThreadPipeManager::RegisterFeature, + base::Unretained(io_pipe_manager_.get()), type, + base::Passed(std::move(outgoing_pipe)), + base::Passed(std::move(incoming_proxy)))); + + incoming_pipes_.push_back(std::move(incoming_pipe)); + return outgoing_proxy; +} + +} // namespace blimp diff --git a/blimp/net/thread_pipe_manager.h b/blimp/net/thread_pipe_manager.h new file mode 100644 index 0000000..55bf5b6 --- /dev/null +++ b/blimp/net/thread_pipe_manager.h @@ -0,0 +1,65 @@ +// Copyright 2016 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#ifndef BLIMP_NET_THREAD_PIPE_MANAGER_H_ +#define BLIMP_NET_THREAD_PIPE_MANAGER_H_ + +#include <vector> + +#include "base/macros.h" +#include "base/memory/ref_counted.h" +#include "base/memory/scoped_ptr.h" +#include "blimp/common/proto/blimp_message.pb.h" +#include "blimp/net/blimp_net_export.h" + +namespace base { +class SequencedTaskRunner; +} // namespace base + +namespace blimp { + +class BlimpMessageProcessor; +class BlimpMessageThreadPipe; +class BrowserConnectionHandler; +class IoThreadPipeManager; + +// This class is used on the UI thread for registering features and setting up +// BlimpMessageThreadPipes for communicating with |connection_handler| on the +// IO thread. +class BLIMP_NET_EXPORT ThreadPipeManager { + public: + // Caller is responsible for ensuring that |connection_handler| outlives + // |this|. + explicit ThreadPipeManager( + const scoped_refptr<base::SequencedTaskRunner>& io_task_runner, + const scoped_refptr<base::SequencedTaskRunner>& ui_task_runner, + BrowserConnectionHandler* connection_handler); + + ~ThreadPipeManager(); + + // Registers a message processor |incoming_processor| which will receive all + // messages of the |type| specified. Returns a BlimpMessageProcessor object + // for sending messages of type |type|. + scoped_ptr<BlimpMessageProcessor> RegisterFeature( + BlimpMessage::Type type, + BlimpMessageProcessor* incoming_processor); + + private: + scoped_refptr<base::SequencedTaskRunner> io_task_runner_; + scoped_refptr<base::SequencedTaskRunner> ui_task_runner_; + + // Container for BlimpMessageThreadPipes that are destroyed on IO thread. + scoped_ptr<IoThreadPipeManager> io_pipe_manager_; + + // Pipes for routing messages from the IO to the UI thread. + // Incoming messages are only routed to the UI thread since all features run + // there. + std::vector<scoped_ptr<BlimpMessageThreadPipe>> incoming_pipes_; + + DISALLOW_COPY_AND_ASSIGN(ThreadPipeManager); +}; + +} // namespace blimp + +#endif // BLIMP_NET_THREAD_PIPE_MANAGER_H_ diff --git a/blimp/net/thread_pipe_manager_unittest.cc b/blimp/net/thread_pipe_manager_unittest.cc new file mode 100644 index 0000000..f36ef48 --- /dev/null +++ b/blimp/net/thread_pipe_manager_unittest.cc @@ -0,0 +1,180 @@ +// Copyright 2016 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "blimp/net/thread_pipe_manager.h" + +#include "base/location.h" +#include "base/memory/ref_counted.h" +#include "base/message_loop/message_loop.h" +#include "base/threading/sequenced_task_runner_handle.h" +#include "base/threading/thread.h" +#include "blimp/net/blimp_message_thread_pipe.h" +#include "blimp/net/browser_connection_handler.h" +#include "blimp/net/test_common.h" +#include "net/base/net_errors.h" +#include "net/base/test_completion_callback.h" +#include "testing/gmock/include/gmock/gmock.h" +#include "testing/gtest/include/gtest/gtest.h" + +using testing::_; +using testing::SaveArg; + +namespace blimp { +namespace { + +scoped_ptr<BlimpMessage> CreateMessage(BlimpMessage::Type type) { + scoped_ptr<BlimpMessage> output(new BlimpMessage); + output->set_type(type); + return output; +} + +// A feature that registers itself with ThreadPipeManager. +class FakeFeature { + public: + FakeFeature(BlimpMessage::Type type, ThreadPipeManager* pipe_manager_) { + outgoing_message_processor_ = + pipe_manager_->RegisterFeature(type, &incoming_message_processor_); + } + + ~FakeFeature() {} + + BlimpMessageProcessor* outgoing_message_processor() { + return outgoing_message_processor_.get(); + } + + MockBlimpMessageProcessor* incoming_message_processor() { + return &incoming_message_processor_; + } + + private: + testing::StrictMock<MockBlimpMessageProcessor> incoming_message_processor_; + scoped_ptr<BlimpMessageProcessor> outgoing_message_processor_; +}; + +// A feature peer on |thread_| that forwards incoming messages to +// |message_processor|. +class FakeFeaturePeer : public BlimpMessageProcessor { + public: + FakeFeaturePeer(BlimpMessage::Type type, + BlimpMessageProcessor* message_processor, + const scoped_refptr<base::SequencedTaskRunner>& task_runner) + : type_(type), + message_processor_(message_processor), + task_runner_(task_runner) {} + + ~FakeFeaturePeer() override {} + + private: + void ForwardMessage(scoped_ptr<BlimpMessage> message) { + DCHECK(task_runner_->RunsTasksOnCurrentThread()); + message_processor_->ProcessMessage(std::move(message), + net::CompletionCallback()); + } + + // BlimpMessageProcessor implementation. + void ProcessMessage(scoped_ptr<BlimpMessage> message, + const net::CompletionCallback& callback) override { + DCHECK(task_runner_->RunsTasksOnCurrentThread()); + ASSERT_EQ(type_, message->type()); + base::MessageLoop::current()->PostTask( + FROM_HERE, base::Bind(&FakeFeaturePeer::ForwardMessage, + base::Unretained(this), base::Passed(&message))); + if (!callback.is_null()) + callback.Run(net::OK); + } + + BlimpMessage::Type type_; + BlimpMessageProcessor* message_processor_ = nullptr; + scoped_refptr<base::SequencedTaskRunner> task_runner_; +}; + +// A browser connection handler that returns FakeFeaturePeer to allow it +// forwarding message back so that FakeFeature can check message it receives +// with one it just sent. +class FakeBrowserConnectionHandler : public BrowserConnectionHandler { + public: + FakeBrowserConnectionHandler( + const scoped_refptr<base::SequencedTaskRunner>& task_runner) + : task_runner_(task_runner) {} + scoped_ptr<BlimpMessageProcessor> RegisterFeature( + BlimpMessage::Type type, + BlimpMessageProcessor* incoming_processor) override { + DCHECK(task_runner_->RunsTasksOnCurrentThread()); + return make_scoped_ptr( + new FakeFeaturePeer(type, incoming_processor, task_runner_)); + } + + private: + scoped_refptr<base::SequencedTaskRunner> task_runner_; +}; + +} // namespace + +class ThreadPipeManagerTest : public testing::Test { + public: + ThreadPipeManagerTest() : thread_("IoThread") {} + + ~ThreadPipeManagerTest() override {} + + void SetUp() override { + ASSERT_TRUE(thread_.Start()); + connection_handler_ = make_scoped_ptr( + new FakeBrowserConnectionHandler(thread_.task_runner())); + pipe_manager_ = make_scoped_ptr(new ThreadPipeManager( + thread_.task_runner(), base::SequencedTaskRunnerHandle::Get(), + connection_handler_.get())); + + input_feature_.reset( + new FakeFeature(BlimpMessage::INPUT, pipe_manager_.get())); + tab_control_feature_.reset( + new FakeFeature(BlimpMessage::TAB_CONTROL, pipe_manager_.get())); + } + + void TearDown() override { SynchronizeWithThread(); } + + // Synchronize with |thread_| to ensure that any pending work is done. + void SynchronizeWithThread() { + net::TestCompletionCallback cb; + thread_.task_runner()->PostTaskAndReply(FROM_HERE, + base::Bind(&base::DoNothing), + base::Bind(cb.callback(), net::OK)); + ASSERT_EQ(net::OK, cb.WaitForResult()); + } + + protected: + base::MessageLoop message_loop_; + scoped_ptr<BrowserConnectionHandler> connection_handler_; + scoped_ptr<ThreadPipeManager> pipe_manager_; + base::Thread thread_; + + scoped_ptr<FakeFeature> input_feature_; + scoped_ptr<FakeFeature> tab_control_feature_; +}; + +// Features send out message and receive the same message due to +// |FakeFeaturePeer| loops the message back on |thread_|. +TEST_F(ThreadPipeManagerTest, MessageSentIsReceived) { + scoped_ptr<BlimpMessage> input_message = CreateMessage(BlimpMessage::INPUT); + scoped_ptr<BlimpMessage> tab_control_message = + CreateMessage(BlimpMessage::TAB_CONTROL); + + EXPECT_CALL(*(input_feature_->incoming_message_processor()), + MockableProcessMessage(EqualsProto(*input_message), _)) + .RetiresOnSaturation(); + EXPECT_CALL(*(tab_control_feature_->incoming_message_processor()), + MockableProcessMessage(EqualsProto(*tab_control_message), _)) + .RetiresOnSaturation(); + + net::TestCompletionCallback cb1; + input_feature_->outgoing_message_processor()->ProcessMessage( + std::move(input_message), cb1.callback()); + net::TestCompletionCallback cb2; + tab_control_feature_->outgoing_message_processor()->ProcessMessage( + std::move(tab_control_message), cb2.callback()); + + EXPECT_EQ(net::OK, cb1.WaitForResult()); + EXPECT_EQ(net::OK, cb2.WaitForResult()); +} + +} // namespace blimp |