diff options
author | haibinlu <haibinlu@chromium.org> | 2016-03-03 18:32:37 -0800 |
---|---|---|
committer | Commit bot <commit-bot@chromium.org> | 2016-03-04 02:33:48 +0000 |
commit | 3f60cfdc2d1304bfcb376efcd9d5da10aafe3b48 (patch) | |
tree | 332f9a1155bd3c4be3697ea17ac1eb953adf2353 /blimp | |
parent | 0621b14f1b6cf6d7354a8ed52f45f943628f0405 (diff) | |
download | chromium_src-3f60cfdc2d1304bfcb376efcd9d5da10aafe3b48.zip chromium_src-3f60cfdc2d1304bfcb376efcd9d5da10aafe3b48.tar.gz chromium_src-3f60cfdc2d1304bfcb376efcd9d5da10aafe3b48.tar.bz2 |
-- Removes duplicate feature registration code from client&engine session and NetworkComponents.
-- Adds ThreadPipeManager for managing thread pipes and registering features across UI and IO threads.
-- Makes ThreadPipeManager not depend on NetworkComponents.
-- Adds unit test for ThreadPipeManager
BUG=574884
Review URL: https://codereview.chromium.org/1647193003
Cr-Commit-Position: refs/heads/master@{#379182}
Diffstat (limited to 'blimp')
-rw-r--r-- | blimp/client/session/blimp_client_session.cc | 117 | ||||
-rw-r--r-- | blimp/client/session/blimp_client_session.h | 15 | ||||
-rw-r--r-- | blimp/engine/session/blimp_engine_session.cc | 106 | ||||
-rw-r--r-- | blimp/engine/session/blimp_engine_session.h | 15 | ||||
-rw-r--r-- | blimp/net/BUILD.gn | 3 | ||||
-rw-r--r-- | blimp/net/blimp_message_checkpointer.h | 2 | ||||
-rw-r--r-- | blimp/net/blimp_message_demultiplexer.h | 2 | ||||
-rw-r--r-- | blimp/net/blimp_message_multiplexer.h | 2 | ||||
-rw-r--r-- | blimp/net/blimp_message_output_buffer.h | 2 | ||||
-rw-r--r-- | blimp/net/browser_connection_handler.h | 4 | ||||
-rw-r--r-- | blimp/net/thread_pipe_manager.cc | 113 | ||||
-rw-r--r-- | blimp/net/thread_pipe_manager.h | 65 | ||||
-rw-r--r-- | blimp/net/thread_pipe_manager_unittest.cc | 180 |
13 files changed, 448 insertions, 178 deletions
diff --git a/blimp/client/session/blimp_client_session.cc b/blimp/client/session/blimp_client_session.cc index 7846d59..1f05184 100644 --- a/blimp/client/session/blimp_client_session.cc +++ b/blimp/client/session/blimp_client_session.cc @@ -4,7 +4,6 @@ #include "blimp/client/session/blimp_client_session.h" -#include <string> #include <vector> #include "base/command_line.h" @@ -24,6 +23,7 @@ #include "blimp/net/null_blimp_message_processor.h" #include "blimp/net/ssl_client_transport.h" #include "blimp/net/tcp_client_transport.h" +#include "blimp/net/thread_pipe_manager.h" #include "net/base/address_list.h" #include "net/base/ip_address.h" #include "net/base/ip_endpoint.h" @@ -33,13 +33,11 @@ namespace client { // This class's functions and destruction are all invoked on the IO thread by // the BlimpClientSession. -// TODO(haibinlu): crbug/574884 class ClientNetworkComponents { public: // Can be created on any thread. - ClientNetworkComponents() {} - - ~ClientNetworkComponents() {} + ClientNetworkComponents(); + ~ClientNetworkComponents(); // Sets up network components. void Initialize(); @@ -48,32 +46,24 @@ class ClientNetworkComponents { // It is required to first call Initialize. void ConnectWithAssignment(const Assignment& assignment); - // Invoked by BlimpEngineSession to finish feature registration on IO thread: - // using |incoming_proxy| as the incoming message processor, and connecting - // |outgoing_pipe| to the actual message sender. - void RegisterFeature(BlimpMessage::Type type, - scoped_ptr<BlimpMessageThreadPipe> outgoing_pipe, - scoped_ptr<BlimpMessageProcessor> incoming_proxy); + BrowserConnectionHandler* GetBrowserConnectionHandler(); private: - scoped_ptr<BrowserConnectionHandler> browser_connection_handler_; + scoped_ptr<BrowserConnectionHandler> connection_handler_; scoped_ptr<ClientConnectionManager> connection_manager_; - // Container for the feature-specific MessageProcessors. - std::vector<scoped_ptr<BlimpMessageProcessor>> incoming_proxies_; - - // Containers for the MessageProcessors used to write feature-specific - // messages to the network, and the thread-pipe endpoints through which - // they are used from the UI thread. - std::vector<scoped_ptr<BlimpMessageThreadPipe>> outgoing_pipes_; - std::vector<scoped_ptr<BlimpMessageProcessor>> outgoing_message_processors_; DISALLOW_COPY_AND_ASSIGN(ClientNetworkComponents); }; +ClientNetworkComponents::ClientNetworkComponents() + : connection_handler_(new BrowserConnectionHandler) {} + +ClientNetworkComponents::~ClientNetworkComponents() {} + void ClientNetworkComponents::Initialize() { DCHECK(!connection_manager_); - connection_manager_ = make_scoped_ptr( - new ClientConnectionManager(browser_connection_handler_.get())); + connection_manager_ = + make_scoped_ptr(new ClientConnectionManager(connection_handler_.get())); } void ClientNetworkComponents::ConnectWithAssignment( @@ -99,24 +89,9 @@ void ClientNetworkComponents::ConnectWithAssignment( connection_manager_->Connect(); } -void ClientNetworkComponents::RegisterFeature( - BlimpMessage::Type type, - scoped_ptr<BlimpMessageThreadPipe> outgoing_pipe, - scoped_ptr<BlimpMessageProcessor> incoming_proxy) { - if (!browser_connection_handler_) { - browser_connection_handler_ = make_scoped_ptr(new BrowserConnectionHandler); - } - - // Registers |incoming_proxy| as the message processor for incoming - // messages with |type|. Sets the returned outgoing message processor as the - // actual sender of the |outgoing_pipe|. - scoped_ptr<BlimpMessageProcessor> outgoing_message_processor = - browser_connection_handler_->RegisterFeature(type, incoming_proxy.get()); - outgoing_pipe->set_target_processor(outgoing_message_processor.get()); - - incoming_proxies_.push_back(std::move(incoming_proxy)); - outgoing_pipes_.push_back(std::move(outgoing_pipe)); - outgoing_message_processors_.push_back(std::move(outgoing_message_processor)); +BrowserConnectionHandler* +ClientNetworkComponents::GetBrowserConnectionHandler() { + return connection_handler_.get(); } BlimpClientSession::BlimpClientSession() @@ -133,19 +108,7 @@ BlimpClientSession::BlimpClientSession() assignment_source_.reset( new AssignmentSource(io_thread_.task_runner(), io_thread_.task_runner())); - // Register features' message senders and receivers. - tab_control_feature_->set_outgoing_message_processor( - RegisterFeature(BlimpMessage::TAB_CONTROL, tab_control_feature_.get())); - navigation_feature_->set_outgoing_message_processor( - RegisterFeature(BlimpMessage::NAVIGATION, navigation_feature_.get())); - render_widget_feature_->set_outgoing_input_message_processor( - RegisterFeature(BlimpMessage::INPUT, render_widget_feature_.get())); - render_widget_feature_->set_outgoing_compositor_message_processor( - RegisterFeature(BlimpMessage::COMPOSITOR, render_widget_feature_.get())); - - // We don't expect to send any RenderWidget messages, so don't save the - // outgoing BlimpMessageProcessor in the RenderWidgetFeature. - RegisterFeature(BlimpMessage::RENDER_WIDGET, render_widget_feature_.get()); + RegisterFeatures(); // Initialize must only be posted after the RegisterFeature calls have // completed. @@ -182,33 +145,29 @@ void BlimpClientSession::ConnectWithAssignment(AssignmentSource::Result result, void BlimpClientSession::OnAssignmentConnectionAttempted( AssignmentSource::Result result) {} -scoped_ptr<BlimpMessageProcessor> BlimpClientSession::RegisterFeature( - BlimpMessage::Type type, - BlimpMessageProcessor* incoming_processor) { - // Creates an outgoing pipe and a proxy for forwarding messages - // from features on the UI thread to network components on the IO thread. - scoped_ptr<BlimpMessageThreadPipe> outgoing_pipe( - new BlimpMessageThreadPipe(io_thread_.task_runner())); - scoped_ptr<BlimpMessageProcessor> outgoing_message_proxy = - outgoing_pipe->CreateProxy(); - - // Creates an incoming pipe and a proxy for receiving messages - // from network components on the IO thread. - scoped_ptr<BlimpMessageThreadPipe> incoming_pipe( - new BlimpMessageThreadPipe(base::SequencedTaskRunnerHandle::Get())); - incoming_pipe->set_target_processor(incoming_processor); - scoped_ptr<BlimpMessageProcessor> incoming_proxy = - incoming_pipe->CreateProxy(); - - // Finishes registration on IO thread. - io_thread_.task_runner()->PostTask( - FROM_HERE, base::Bind(&ClientNetworkComponents::RegisterFeature, - base::Unretained(net_components_.get()), type, - base::Passed(std::move(outgoing_pipe)), - base::Passed(std::move(incoming_proxy)))); +void BlimpClientSession::RegisterFeatures() { + thread_pipe_manager_ = make_scoped_ptr(new ThreadPipeManager( + io_thread_.task_runner(), base::SequencedTaskRunnerHandle::Get(), + net_components_->GetBrowserConnectionHandler())); - incoming_pipes_.push_back(std::move(incoming_pipe)); - return outgoing_message_proxy; + // Register features' message senders and receivers. + tab_control_feature_->set_outgoing_message_processor( + thread_pipe_manager_->RegisterFeature(BlimpMessage::TAB_CONTROL, + tab_control_feature_.get())); + navigation_feature_->set_outgoing_message_processor( + thread_pipe_manager_->RegisterFeature(BlimpMessage::NAVIGATION, + navigation_feature_.get())); + render_widget_feature_->set_outgoing_input_message_processor( + thread_pipe_manager_->RegisterFeature(BlimpMessage::INPUT, + render_widget_feature_.get())); + render_widget_feature_->set_outgoing_compositor_message_processor( + thread_pipe_manager_->RegisterFeature(BlimpMessage::COMPOSITOR, + render_widget_feature_.get())); + + // Client will not send send any RenderWidget messages, so don't save the + // outgoing BlimpMessageProcessor in the RenderWidgetFeature. + thread_pipe_manager_->RegisterFeature(BlimpMessage::RENDER_WIDGET, + render_widget_feature_.get()); } TabControlFeature* BlimpClientSession::GetTabControlFeature() const { diff --git a/blimp/client/session/blimp_client_session.h b/blimp/client/session/blimp_client_session.h index b715453..5dc8277 100644 --- a/blimp/client/session/blimp_client_session.h +++ b/blimp/client/session/blimp_client_session.h @@ -5,6 +5,8 @@ #ifndef BLIMP_CLIENT_SESSION_BLIMP_CLIENT_SESSION_H_ #define BLIMP_CLIENT_SESSION_BLIMP_CLIENT_SESSION_H_ +#include <string> + #include "base/macros.h" #include "base/memory/ref_counted.h" #include "base/memory/scoped_ptr.h" @@ -24,6 +26,7 @@ class BlimpMessageProcessor; class BlimpMessageThreadPipe; class BrowserConnectionHandler; class ClientConnectionManager; +class ThreadPipeManager; namespace client { @@ -68,12 +71,7 @@ class BLIMP_CLIENT_EXPORT BlimpClientSession { virtual void OnAssignmentConnectionAttempted(AssignmentSource::Result result); private: - // Registers a message processor which will receive all messages of the |type| - // specified. Returns a BlimpMessageProcessor object for sending messages of - // type |type|. - scoped_ptr<BlimpMessageProcessor> RegisterFeature( - BlimpMessage::Type type, - BlimpMessageProcessor* incoming_processor); + void RegisterFeatures(); base::Thread io_thread_; scoped_ptr<TabControlFeature> tab_control_feature_; @@ -88,10 +86,7 @@ class BLIMP_CLIENT_EXPORT BlimpClientSession { // Must be deleted on the IO thread. scoped_ptr<ClientNetworkComponents> net_components_; - // Pipes for receiving BlimpMessages from IO thread. - // Incoming messages are only routed to the UI thread since all features run - // on the UI thread. - std::vector<scoped_ptr<BlimpMessageThreadPipe>> incoming_pipes_; + scoped_ptr<ThreadPipeManager> thread_pipe_manager_; base::WeakPtrFactory<BlimpClientSession> weak_factory_; diff --git a/blimp/engine/session/blimp_engine_session.cc b/blimp/engine/session/blimp_engine_session.cc index e867b37..5ba532c 100644 --- a/blimp/engine/session/blimp_engine_session.cc +++ b/blimp/engine/session/blimp_engine_session.cc @@ -24,6 +24,7 @@ #include "blimp/net/engine_connection_manager.h" #include "blimp/net/null_blimp_message_processor.h" #include "blimp/net/tcp_engine_transport.h" +#include "blimp/net/thread_pipe_manager.h" #include "content/public/browser/browser_context.h" #include "content/public/browser/browser_thread.h" #include "content/public/browser/navigation_controller.h" @@ -85,12 +86,7 @@ class EngineNetworkComponents { // received messages can be properly handled. void Initialize(const std::string& client_token); - // Connects message pipes between the specified feature and the network layer, - // using |incoming_proxy| as the incoming message processor, and connecting - // |outgoing_pipe| to the actual message sender. - void RegisterFeature(BlimpMessage::Type type, - scoped_ptr<BlimpMessageThreadPipe> outgoing_pipe, - scoped_ptr<BlimpMessageProcessor> incoming_proxy); + BrowserConnectionHandler* GetBrowserConnectionHandler(); private: net::NetLog* net_log_; @@ -98,20 +94,11 @@ class EngineNetworkComponents { scoped_ptr<EngineAuthenticationHandler> authentication_handler_; scoped_ptr<EngineConnectionManager> connection_manager_; - // Container for the feature-specific MessageProcessors. - std::vector<scoped_ptr<BlimpMessageProcessor>> incoming_proxies_; - - // Containers for the MessageProcessors used to write feature-specific - // messages to the network, and the thread-pipe endpoints through which - // they are used from the UI thread. - std::vector<scoped_ptr<BlimpMessageProcessor>> outgoing_message_processors_; - std::vector<scoped_ptr<BlimpMessageThreadPipe>> outgoing_pipes_; - DISALLOW_COPY_AND_ASSIGN(EngineNetworkComponents); }; EngineNetworkComponents::EngineNetworkComponents(net::NetLog* net_log) - : net_log_(net_log) {} + : net_log_(net_log), connection_handler_(new BrowserConnectionHandler) {} EngineNetworkComponents::~EngineNetworkComponents() { DCHECK_CURRENTLY_ON(content::BrowserThread::IO); @@ -119,8 +106,7 @@ EngineNetworkComponents::~EngineNetworkComponents() { void EngineNetworkComponents::Initialize(const std::string& client_token) { DCHECK_CURRENTLY_ON(content::BrowserThread::IO); - DCHECK(connection_handler_); - DCHECK(!authentication_handler_); + DCHECK(!connection_manager_); // Creates and connects net components. // A BlimpConnection flows from @@ -136,27 +122,9 @@ void EngineNetworkComponents::Initialize(const std::string& client_token) { make_scoped_ptr(new TCPEngineTransport(address, net_log_))); } -void EngineNetworkComponents::RegisterFeature( - BlimpMessage::Type type, - scoped_ptr<BlimpMessageThreadPipe> outgoing_pipe, - scoped_ptr<BlimpMessageProcessor> incoming_proxy) { - DCHECK_CURRENTLY_ON(content::BrowserThread::IO); - - if (!connection_handler_) { - connection_handler_ = make_scoped_ptr(new BrowserConnectionHandler); - } - - // Registers |incoming_proxy| as the message processor for incoming - // messages with |type|. Sets the returned outgoing message processor as the - // target of the |outgoing_pipe|. - scoped_ptr<BlimpMessageProcessor> outgoing_message_processor = - connection_handler_->RegisterFeature(type, incoming_proxy.get()); - outgoing_pipe->set_target_processor(outgoing_message_processor.get()); - - // This object manages the lifetimes of the pipe, proxy and target processor. - incoming_proxies_.push_back(std::move(incoming_proxy)); - outgoing_pipes_.push_back(std::move(outgoing_pipe)); - outgoing_message_processors_.push_back(std::move(outgoing_message_processor)); +BrowserConnectionHandler* +EngineNetworkComponents::GetBrowserConnectionHandler() { + return connection_handler_.get(); } BlimpEngineSession::BlimpEngineSession( @@ -204,16 +172,7 @@ void BlimpEngineSession::Initialize() { window_tree_host_->SetBounds(gfx::Rect(screen_->GetPrimaryDisplay().size())); - // Register features' message senders and receivers. - tab_control_message_sender_ = - RegisterFeature(BlimpMessage::TAB_CONTROL, this); - navigation_message_sender_ = RegisterFeature(BlimpMessage::NAVIGATION, this); - render_widget_feature_.set_render_widget_message_sender( - RegisterFeature(BlimpMessage::RENDER_WIDGET, &render_widget_feature_)); - render_widget_feature_.set_input_message_sender( - RegisterFeature(BlimpMessage::INPUT, &render_widget_feature_)); - render_widget_feature_.set_compositor_message_sender( - RegisterFeature(BlimpMessage::COMPOSITOR, &render_widget_feature_)); + RegisterFeatures(); // Initialize must only be posted after the RegisterFeature calls have // completed. @@ -224,37 +183,28 @@ void BlimpEngineSession::Initialize() { engine_config_->client_token())); } -scoped_ptr<BlimpMessageProcessor> BlimpEngineSession::RegisterFeature( - BlimpMessage::Type type, - BlimpMessageProcessor* incoming_processor) { - // Creates an outgoing pipe and a proxy for forwarding messages - // from features on the UI thread to network components on the IO thread. - scoped_refptr<base::SingleThreadTaskRunner> io_task_runner = +void BlimpEngineSession::RegisterFeatures() { + thread_pipe_manager_.reset(new ThreadPipeManager( content::BrowserThread::GetMessageLoopProxyForThread( - content::BrowserThread::IO); - scoped_ptr<BlimpMessageThreadPipe> outgoing_pipe( - new BlimpMessageThreadPipe(io_task_runner)); - scoped_ptr<BlimpMessageProcessor> outgoing_proxy = - outgoing_pipe->CreateProxy(); - - // Creates an incoming pipe and a proxy for receiving messages - // from network components on the IO thread. - scoped_ptr<BlimpMessageThreadPipe> incoming_pipe(new BlimpMessageThreadPipe( + content::BrowserThread::IO), content::BrowserThread::GetMessageLoopProxyForThread( - content::BrowserThread::UI))); - incoming_pipe->set_target_processor(incoming_processor); - scoped_ptr<BlimpMessageProcessor> incoming_proxy = - incoming_pipe->CreateProxy(); - - // Finishes registration on IO thread. - io_task_runner->PostTask( - FROM_HERE, base::Bind(&EngineNetworkComponents::RegisterFeature, - base::Unretained(net_components_.get()), type, - base::Passed(std::move(outgoing_pipe)), - base::Passed(std::move(incoming_proxy)))); - - incoming_pipes_.push_back(std::move(incoming_pipe)); - return outgoing_proxy; + content::BrowserThread::UI), + net_components_->GetBrowserConnectionHandler())); + + // Register features' message senders and receivers. + tab_control_message_sender_ = + thread_pipe_manager_->RegisterFeature(BlimpMessage::TAB_CONTROL, this); + navigation_message_sender_ = + thread_pipe_manager_->RegisterFeature(BlimpMessage::NAVIGATION, this); + render_widget_feature_.set_render_widget_message_sender( + thread_pipe_manager_->RegisterFeature(BlimpMessage::RENDER_WIDGET, + &render_widget_feature_)); + render_widget_feature_.set_input_message_sender( + thread_pipe_manager_->RegisterFeature(BlimpMessage::INPUT, + &render_widget_feature_)); + render_widget_feature_.set_compositor_message_sender( + thread_pipe_manager_->RegisterFeature(BlimpMessage::COMPOSITOR, + &render_widget_feature_)); } bool BlimpEngineSession::CreateWebContents(const int target_tab_id) { diff --git a/blimp/engine/session/blimp_engine_session.h b/blimp/engine/session/blimp_engine_session.h index ebab04d..ae527e5 100644 --- a/blimp/engine/session/blimp_engine_session.h +++ b/blimp/engine/session/blimp_engine_session.h @@ -53,6 +53,7 @@ namespace blimp { class BlimpConnection; class BlimpMessage; class BlimpMessageThreadPipe; +class ThreadPipeManager; namespace engine { @@ -88,12 +89,9 @@ class BlimpEngineSession const net::CompletionCallback& callback) override; private: - // Registers a message processor which will receive all messages of the |type| - // specified. Returns a BlimpMessageProcessor object for sending messages of - // type |type|. - scoped_ptr<BlimpMessageProcessor> RegisterFeature( - BlimpMessage::Type type, - BlimpMessageProcessor* incoming_processor); + // Creates ThreadPipeManager, registers features, and then starts to accept + // incoming connection. + void RegisterFeatures(); // TabControlMessage handler methods. // Creates a new WebContents, which will be indexed by |target_tab_id|. @@ -183,10 +181,7 @@ class BlimpEngineSession // this object is destroyed there. scoped_ptr<EngineNetworkComponents> net_components_; - // Pipes for receiving BlimpMessages from IO thread. - // Incoming messages are only routed to the UI thread since all features run - // there. - std::vector<scoped_ptr<BlimpMessageThreadPipe>> incoming_pipes_; + scoped_ptr<ThreadPipeManager> thread_pipe_manager_; // Used to send TAB_CONTROL or NAVIGATION messages to client. scoped_ptr<BlimpMessageProcessor> tab_control_message_sender_; diff --git a/blimp/net/BUILD.gn b/blimp/net/BUILD.gn index 28c00cd..0fe0cfd 100644 --- a/blimp/net/BUILD.gn +++ b/blimp/net/BUILD.gn @@ -54,6 +54,8 @@ component("blimp_net") { "tcp_client_transport.h", "tcp_engine_transport.cc", "tcp_engine_transport.h", + "thread_pipe_manager.cc", + "thread_pipe_manager.h", ] defines = [ "BLIMP_NET_IMPLEMENTATION=1" ] @@ -102,6 +104,7 @@ source_set("unit_tests") { "stream_packet_reader_unittest.cc", "stream_packet_writer_unittest.cc", "tcp_transport_unittest.cc", + "thread_pipe_manager_unittest.cc", ] deps = [ diff --git a/blimp/net/blimp_message_checkpointer.h b/blimp/net/blimp_message_checkpointer.h index 66c9894..cd5fd49 100644 --- a/blimp/net/blimp_message_checkpointer.h +++ b/blimp/net/blimp_message_checkpointer.h @@ -23,6 +23,8 @@ class BlimpMessageCheckpointObserver; // Checkpoint/ACK message dispatch may be deferred for a second or // two to avoid saturating the link with ACK traffic; feature implementations // need to account for this latency in their design. +// BlimpMessageCheckpointer is created on the UI thread, and then used and +// destroyed on the IO thread. class BLIMP_NET_EXPORT BlimpMessageCheckpointer : public BlimpMessageProcessor { public: BlimpMessageCheckpointer(BlimpMessageProcessor* incoming_processor, diff --git a/blimp/net/blimp_message_demultiplexer.h b/blimp/net/blimp_message_demultiplexer.h index eb6771eb..07854aa 100644 --- a/blimp/net/blimp_message_demultiplexer.h +++ b/blimp/net/blimp_message_demultiplexer.h @@ -18,6 +18,8 @@ namespace blimp { // Multiplexing BlimpMessageProcessor which routes BlimpMessages to message // processors based on |message.type|. +// BlimpMessageDemultiplexer is created on the UI thread, and then used and +// destroyed on the IO thread. class BLIMP_NET_EXPORT BlimpMessageDemultiplexer : public BlimpMessageProcessor { public: diff --git a/blimp/net/blimp_message_multiplexer.h b/blimp/net/blimp_message_multiplexer.h index 27713af..af4c4bb 100644 --- a/blimp/net/blimp_message_multiplexer.h +++ b/blimp/net/blimp_message_multiplexer.h @@ -17,6 +17,8 @@ class BlimpMessageProcessor; // Creates MessageProcessors that receive outgoing messages and put them // onto a multiplexed message stream. +// BlimpMessageMultiplexer is created on the UI thread, and then used and +// destroyed on the IO thread. class BLIMP_NET_EXPORT BlimpMessageMultiplexer { public: // |output_processor|: A pointer to the MessageProcessor that will receive the diff --git a/blimp/net/blimp_message_output_buffer.h b/blimp/net/blimp_message_output_buffer.h index c40aea3..97bd2ff 100644 --- a/blimp/net/blimp_message_output_buffer.h +++ b/blimp/net/blimp_message_output_buffer.h @@ -28,6 +28,8 @@ class BlimpConnection; // message acknowledgment. // (Redelivery will be used in a future CL to implement Fast Recovery // of dropped connections.) +// BlimpMessageOutputBuffer is created on the UI thread, and then used and +// destroyed on the IO thread. class BLIMP_NET_EXPORT BlimpMessageOutputBuffer : public BlimpMessageProcessor, public BlimpMessageCheckpointObserver { diff --git a/blimp/net/browser_connection_handler.h b/blimp/net/browser_connection_handler.h index 74bb7f2..0304ae1 100644 --- a/blimp/net/browser_connection_handler.h +++ b/blimp/net/browser_connection_handler.h @@ -25,6 +25,8 @@ class BlimpMessageProcessor; // messages out via underlying BlimpConnection. // A BrowserConnectionHandler is created on browser startup, and persists for // the lifetime of the application. +// BrowserConnectionHandler is created on the UI thread, and then used and +// destroyed on the IO thread. class BLIMP_NET_EXPORT BrowserConnectionHandler : public ConnectionHandler, public ConnectionErrorObserver { @@ -38,7 +40,7 @@ class BLIMP_NET_EXPORT BrowserConnectionHandler // this object is in-use. // // Returns a BlimpMessageProcessor object for sending messages of type |type|. - scoped_ptr<BlimpMessageProcessor> RegisterFeature( + virtual scoped_ptr<BlimpMessageProcessor> RegisterFeature( BlimpMessage::Type type, BlimpMessageProcessor* incoming_processor); diff --git a/blimp/net/thread_pipe_manager.cc b/blimp/net/thread_pipe_manager.cc new file mode 100644 index 0000000..95bc350 --- /dev/null +++ b/blimp/net/thread_pipe_manager.cc @@ -0,0 +1,113 @@ +// Copyright 2016 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "blimp/net/thread_pipe_manager.h" + +#include "base/location.h" +#include "base/sequenced_task_runner.h" +#include "blimp/net/blimp_message_processor.h" +#include "blimp/net/blimp_message_thread_pipe.h" +#include "blimp/net/browser_connection_handler.h" + +namespace blimp { + +// IoThreadPipeManager is created on the UI thread, and then used and destroyed +// on the IO thread. +// It works with |connection_handler| to register features on the IO thread, +// and manages IO-thread-side BlimpMessageThreadPipes. +class IoThreadPipeManager { + public: + explicit IoThreadPipeManager(BrowserConnectionHandler* connection_handler); + virtual ~IoThreadPipeManager(); + + // Connects message pipes between the specified feature and the network layer, + // using |incoming_proxy| as the incoming message processor, and connecting + // |outgoing_pipe| to the actual message sender. + void RegisterFeature(BlimpMessage::Type type, + scoped_ptr<BlimpMessageThreadPipe> outgoing_pipe, + scoped_ptr<BlimpMessageProcessor> incoming_proxy); + + private: + BrowserConnectionHandler* connection_handler_; + + // Container for the feature-specific MessageProcessors. + // IO-side proxy for sending messages to UI thread. + std::vector<scoped_ptr<BlimpMessageProcessor>> incoming_proxies_; + + // Containers for the MessageProcessors used to write feature-specific + // messages to the network, and the thread-pipe endpoints through which + // they are used from the UI thread. + std::vector<scoped_ptr<BlimpMessageProcessor>> outgoing_message_processors_; + std::vector<scoped_ptr<BlimpMessageThreadPipe>> outgoing_pipes_; + + DISALLOW_COPY_AND_ASSIGN(IoThreadPipeManager); +}; + +IoThreadPipeManager::IoThreadPipeManager( + BrowserConnectionHandler* connection_handler) + : connection_handler_(connection_handler) { + DCHECK(connection_handler_); +} + +IoThreadPipeManager::~IoThreadPipeManager() {} + +void IoThreadPipeManager::RegisterFeature( + BlimpMessage::Type type, + scoped_ptr<BlimpMessageThreadPipe> outgoing_pipe, + scoped_ptr<BlimpMessageProcessor> incoming_proxy) { + // Registers |incoming_proxy| as the message processor for incoming + // messages with |type|. Sets the returned outgoing message processor as the + // target of the |outgoing_pipe|. + scoped_ptr<BlimpMessageProcessor> outgoing_message_processor = + connection_handler_->RegisterFeature(type, incoming_proxy.get()); + outgoing_pipe->set_target_processor(outgoing_message_processor.get()); + + // This object manages the lifetimes of the pipe, proxy and target processor. + incoming_proxies_.push_back(std::move(incoming_proxy)); + outgoing_pipes_.push_back(std::move(outgoing_pipe)); + outgoing_message_processors_.push_back(std::move(outgoing_message_processor)); +} + +ThreadPipeManager::ThreadPipeManager( + const scoped_refptr<base::SequencedTaskRunner>& io_task_runner, + const scoped_refptr<base::SequencedTaskRunner>& ui_task_runner, + BrowserConnectionHandler* connection_handler) + : io_task_runner_(io_task_runner), + ui_task_runner_(ui_task_runner), + io_pipe_manager_(new IoThreadPipeManager(connection_handler)) {} + +ThreadPipeManager::~ThreadPipeManager() { + io_task_runner_->DeleteSoon(FROM_HERE, io_pipe_manager_.release()); +} + +scoped_ptr<BlimpMessageProcessor> ThreadPipeManager::RegisterFeature( + BlimpMessage::Type type, + BlimpMessageProcessor* incoming_processor) { + // Creates an outgoing pipe and a proxy for forwarding messages + // from features on the UI thread to network components on the IO thread. + scoped_ptr<BlimpMessageThreadPipe> outgoing_pipe( + new BlimpMessageThreadPipe(io_task_runner_)); + scoped_ptr<BlimpMessageProcessor> outgoing_proxy = + outgoing_pipe->CreateProxy(); + + // Creates an incoming pipe and a proxy for receiving messages + // from network components on the IO thread. + scoped_ptr<BlimpMessageThreadPipe> incoming_pipe( + new BlimpMessageThreadPipe(ui_task_runner_)); + incoming_pipe->set_target_processor(incoming_processor); + scoped_ptr<BlimpMessageProcessor> incoming_proxy = + incoming_pipe->CreateProxy(); + + // Finishes registration on IO thread. + io_task_runner_->PostTask( + FROM_HERE, base::Bind(&IoThreadPipeManager::RegisterFeature, + base::Unretained(io_pipe_manager_.get()), type, + base::Passed(std::move(outgoing_pipe)), + base::Passed(std::move(incoming_proxy)))); + + incoming_pipes_.push_back(std::move(incoming_pipe)); + return outgoing_proxy; +} + +} // namespace blimp diff --git a/blimp/net/thread_pipe_manager.h b/blimp/net/thread_pipe_manager.h new file mode 100644 index 0000000..55bf5b6 --- /dev/null +++ b/blimp/net/thread_pipe_manager.h @@ -0,0 +1,65 @@ +// Copyright 2016 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#ifndef BLIMP_NET_THREAD_PIPE_MANAGER_H_ +#define BLIMP_NET_THREAD_PIPE_MANAGER_H_ + +#include <vector> + +#include "base/macros.h" +#include "base/memory/ref_counted.h" +#include "base/memory/scoped_ptr.h" +#include "blimp/common/proto/blimp_message.pb.h" +#include "blimp/net/blimp_net_export.h" + +namespace base { +class SequencedTaskRunner; +} // namespace base + +namespace blimp { + +class BlimpMessageProcessor; +class BlimpMessageThreadPipe; +class BrowserConnectionHandler; +class IoThreadPipeManager; + +// This class is used on the UI thread for registering features and setting up +// BlimpMessageThreadPipes for communicating with |connection_handler| on the +// IO thread. +class BLIMP_NET_EXPORT ThreadPipeManager { + public: + // Caller is responsible for ensuring that |connection_handler| outlives + // |this|. + explicit ThreadPipeManager( + const scoped_refptr<base::SequencedTaskRunner>& io_task_runner, + const scoped_refptr<base::SequencedTaskRunner>& ui_task_runner, + BrowserConnectionHandler* connection_handler); + + ~ThreadPipeManager(); + + // Registers a message processor |incoming_processor| which will receive all + // messages of the |type| specified. Returns a BlimpMessageProcessor object + // for sending messages of type |type|. + scoped_ptr<BlimpMessageProcessor> RegisterFeature( + BlimpMessage::Type type, + BlimpMessageProcessor* incoming_processor); + + private: + scoped_refptr<base::SequencedTaskRunner> io_task_runner_; + scoped_refptr<base::SequencedTaskRunner> ui_task_runner_; + + // Container for BlimpMessageThreadPipes that are destroyed on IO thread. + scoped_ptr<IoThreadPipeManager> io_pipe_manager_; + + // Pipes for routing messages from the IO to the UI thread. + // Incoming messages are only routed to the UI thread since all features run + // there. + std::vector<scoped_ptr<BlimpMessageThreadPipe>> incoming_pipes_; + + DISALLOW_COPY_AND_ASSIGN(ThreadPipeManager); +}; + +} // namespace blimp + +#endif // BLIMP_NET_THREAD_PIPE_MANAGER_H_ diff --git a/blimp/net/thread_pipe_manager_unittest.cc b/blimp/net/thread_pipe_manager_unittest.cc new file mode 100644 index 0000000..f36ef48 --- /dev/null +++ b/blimp/net/thread_pipe_manager_unittest.cc @@ -0,0 +1,180 @@ +// Copyright 2016 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "blimp/net/thread_pipe_manager.h" + +#include "base/location.h" +#include "base/memory/ref_counted.h" +#include "base/message_loop/message_loop.h" +#include "base/threading/sequenced_task_runner_handle.h" +#include "base/threading/thread.h" +#include "blimp/net/blimp_message_thread_pipe.h" +#include "blimp/net/browser_connection_handler.h" +#include "blimp/net/test_common.h" +#include "net/base/net_errors.h" +#include "net/base/test_completion_callback.h" +#include "testing/gmock/include/gmock/gmock.h" +#include "testing/gtest/include/gtest/gtest.h" + +using testing::_; +using testing::SaveArg; + +namespace blimp { +namespace { + +scoped_ptr<BlimpMessage> CreateMessage(BlimpMessage::Type type) { + scoped_ptr<BlimpMessage> output(new BlimpMessage); + output->set_type(type); + return output; +} + +// A feature that registers itself with ThreadPipeManager. +class FakeFeature { + public: + FakeFeature(BlimpMessage::Type type, ThreadPipeManager* pipe_manager_) { + outgoing_message_processor_ = + pipe_manager_->RegisterFeature(type, &incoming_message_processor_); + } + + ~FakeFeature() {} + + BlimpMessageProcessor* outgoing_message_processor() { + return outgoing_message_processor_.get(); + } + + MockBlimpMessageProcessor* incoming_message_processor() { + return &incoming_message_processor_; + } + + private: + testing::StrictMock<MockBlimpMessageProcessor> incoming_message_processor_; + scoped_ptr<BlimpMessageProcessor> outgoing_message_processor_; +}; + +// A feature peer on |thread_| that forwards incoming messages to +// |message_processor|. +class FakeFeaturePeer : public BlimpMessageProcessor { + public: + FakeFeaturePeer(BlimpMessage::Type type, + BlimpMessageProcessor* message_processor, + const scoped_refptr<base::SequencedTaskRunner>& task_runner) + : type_(type), + message_processor_(message_processor), + task_runner_(task_runner) {} + + ~FakeFeaturePeer() override {} + + private: + void ForwardMessage(scoped_ptr<BlimpMessage> message) { + DCHECK(task_runner_->RunsTasksOnCurrentThread()); + message_processor_->ProcessMessage(std::move(message), + net::CompletionCallback()); + } + + // BlimpMessageProcessor implementation. + void ProcessMessage(scoped_ptr<BlimpMessage> message, + const net::CompletionCallback& callback) override { + DCHECK(task_runner_->RunsTasksOnCurrentThread()); + ASSERT_EQ(type_, message->type()); + base::MessageLoop::current()->PostTask( + FROM_HERE, base::Bind(&FakeFeaturePeer::ForwardMessage, + base::Unretained(this), base::Passed(&message))); + if (!callback.is_null()) + callback.Run(net::OK); + } + + BlimpMessage::Type type_; + BlimpMessageProcessor* message_processor_ = nullptr; + scoped_refptr<base::SequencedTaskRunner> task_runner_; +}; + +// A browser connection handler that returns FakeFeaturePeer to allow it +// forwarding message back so that FakeFeature can check message it receives +// with one it just sent. +class FakeBrowserConnectionHandler : public BrowserConnectionHandler { + public: + FakeBrowserConnectionHandler( + const scoped_refptr<base::SequencedTaskRunner>& task_runner) + : task_runner_(task_runner) {} + scoped_ptr<BlimpMessageProcessor> RegisterFeature( + BlimpMessage::Type type, + BlimpMessageProcessor* incoming_processor) override { + DCHECK(task_runner_->RunsTasksOnCurrentThread()); + return make_scoped_ptr( + new FakeFeaturePeer(type, incoming_processor, task_runner_)); + } + + private: + scoped_refptr<base::SequencedTaskRunner> task_runner_; +}; + +} // namespace + +class ThreadPipeManagerTest : public testing::Test { + public: + ThreadPipeManagerTest() : thread_("IoThread") {} + + ~ThreadPipeManagerTest() override {} + + void SetUp() override { + ASSERT_TRUE(thread_.Start()); + connection_handler_ = make_scoped_ptr( + new FakeBrowserConnectionHandler(thread_.task_runner())); + pipe_manager_ = make_scoped_ptr(new ThreadPipeManager( + thread_.task_runner(), base::SequencedTaskRunnerHandle::Get(), + connection_handler_.get())); + + input_feature_.reset( + new FakeFeature(BlimpMessage::INPUT, pipe_manager_.get())); + tab_control_feature_.reset( + new FakeFeature(BlimpMessage::TAB_CONTROL, pipe_manager_.get())); + } + + void TearDown() override { SynchronizeWithThread(); } + + // Synchronize with |thread_| to ensure that any pending work is done. + void SynchronizeWithThread() { + net::TestCompletionCallback cb; + thread_.task_runner()->PostTaskAndReply(FROM_HERE, + base::Bind(&base::DoNothing), + base::Bind(cb.callback(), net::OK)); + ASSERT_EQ(net::OK, cb.WaitForResult()); + } + + protected: + base::MessageLoop message_loop_; + scoped_ptr<BrowserConnectionHandler> connection_handler_; + scoped_ptr<ThreadPipeManager> pipe_manager_; + base::Thread thread_; + + scoped_ptr<FakeFeature> input_feature_; + scoped_ptr<FakeFeature> tab_control_feature_; +}; + +// Features send out message and receive the same message due to +// |FakeFeaturePeer| loops the message back on |thread_|. +TEST_F(ThreadPipeManagerTest, MessageSentIsReceived) { + scoped_ptr<BlimpMessage> input_message = CreateMessage(BlimpMessage::INPUT); + scoped_ptr<BlimpMessage> tab_control_message = + CreateMessage(BlimpMessage::TAB_CONTROL); + + EXPECT_CALL(*(input_feature_->incoming_message_processor()), + MockableProcessMessage(EqualsProto(*input_message), _)) + .RetiresOnSaturation(); + EXPECT_CALL(*(tab_control_feature_->incoming_message_processor()), + MockableProcessMessage(EqualsProto(*tab_control_message), _)) + .RetiresOnSaturation(); + + net::TestCompletionCallback cb1; + input_feature_->outgoing_message_processor()->ProcessMessage( + std::move(input_message), cb1.callback()); + net::TestCompletionCallback cb2; + tab_control_feature_->outgoing_message_processor()->ProcessMessage( + std::move(tab_control_message), cb2.callback()); + + EXPECT_EQ(net::OK, cb1.WaitForResult()); + EXPECT_EQ(net::OK, cb2.WaitForResult()); +} + +} // namespace blimp |