summaryrefslogtreecommitdiffstats
path: root/blimp
diff options
context:
space:
mode:
authorhaibinlu <haibinlu@chromium.org>2016-03-03 18:32:37 -0800
committerCommit bot <commit-bot@chromium.org>2016-03-04 02:33:48 +0000
commit3f60cfdc2d1304bfcb376efcd9d5da10aafe3b48 (patch)
tree332f9a1155bd3c4be3697ea17ac1eb953adf2353 /blimp
parent0621b14f1b6cf6d7354a8ed52f45f943628f0405 (diff)
downloadchromium_src-3f60cfdc2d1304bfcb376efcd9d5da10aafe3b48.zip
chromium_src-3f60cfdc2d1304bfcb376efcd9d5da10aafe3b48.tar.gz
chromium_src-3f60cfdc2d1304bfcb376efcd9d5da10aafe3b48.tar.bz2
-- Removes duplicate feature registration code from client&engine session and NetworkComponents.
-- Adds ThreadPipeManager for managing thread pipes and registering features across UI and IO threads. -- Makes ThreadPipeManager not depend on NetworkComponents. -- Adds unit test for ThreadPipeManager BUG=574884 Review URL: https://codereview.chromium.org/1647193003 Cr-Commit-Position: refs/heads/master@{#379182}
Diffstat (limited to 'blimp')
-rw-r--r--blimp/client/session/blimp_client_session.cc117
-rw-r--r--blimp/client/session/blimp_client_session.h15
-rw-r--r--blimp/engine/session/blimp_engine_session.cc106
-rw-r--r--blimp/engine/session/blimp_engine_session.h15
-rw-r--r--blimp/net/BUILD.gn3
-rw-r--r--blimp/net/blimp_message_checkpointer.h2
-rw-r--r--blimp/net/blimp_message_demultiplexer.h2
-rw-r--r--blimp/net/blimp_message_multiplexer.h2
-rw-r--r--blimp/net/blimp_message_output_buffer.h2
-rw-r--r--blimp/net/browser_connection_handler.h4
-rw-r--r--blimp/net/thread_pipe_manager.cc113
-rw-r--r--blimp/net/thread_pipe_manager.h65
-rw-r--r--blimp/net/thread_pipe_manager_unittest.cc180
13 files changed, 448 insertions, 178 deletions
diff --git a/blimp/client/session/blimp_client_session.cc b/blimp/client/session/blimp_client_session.cc
index 7846d59..1f05184 100644
--- a/blimp/client/session/blimp_client_session.cc
+++ b/blimp/client/session/blimp_client_session.cc
@@ -4,7 +4,6 @@
#include "blimp/client/session/blimp_client_session.h"
-#include <string>
#include <vector>
#include "base/command_line.h"
@@ -24,6 +23,7 @@
#include "blimp/net/null_blimp_message_processor.h"
#include "blimp/net/ssl_client_transport.h"
#include "blimp/net/tcp_client_transport.h"
+#include "blimp/net/thread_pipe_manager.h"
#include "net/base/address_list.h"
#include "net/base/ip_address.h"
#include "net/base/ip_endpoint.h"
@@ -33,13 +33,11 @@ namespace client {
// This class's functions and destruction are all invoked on the IO thread by
// the BlimpClientSession.
-// TODO(haibinlu): crbug/574884
class ClientNetworkComponents {
public:
// Can be created on any thread.
- ClientNetworkComponents() {}
-
- ~ClientNetworkComponents() {}
+ ClientNetworkComponents();
+ ~ClientNetworkComponents();
// Sets up network components.
void Initialize();
@@ -48,32 +46,24 @@ class ClientNetworkComponents {
// It is required to first call Initialize.
void ConnectWithAssignment(const Assignment& assignment);
- // Invoked by BlimpEngineSession to finish feature registration on IO thread:
- // using |incoming_proxy| as the incoming message processor, and connecting
- // |outgoing_pipe| to the actual message sender.
- void RegisterFeature(BlimpMessage::Type type,
- scoped_ptr<BlimpMessageThreadPipe> outgoing_pipe,
- scoped_ptr<BlimpMessageProcessor> incoming_proxy);
+ BrowserConnectionHandler* GetBrowserConnectionHandler();
private:
- scoped_ptr<BrowserConnectionHandler> browser_connection_handler_;
+ scoped_ptr<BrowserConnectionHandler> connection_handler_;
scoped_ptr<ClientConnectionManager> connection_manager_;
- // Container for the feature-specific MessageProcessors.
- std::vector<scoped_ptr<BlimpMessageProcessor>> incoming_proxies_;
-
- // Containers for the MessageProcessors used to write feature-specific
- // messages to the network, and the thread-pipe endpoints through which
- // they are used from the UI thread.
- std::vector<scoped_ptr<BlimpMessageThreadPipe>> outgoing_pipes_;
- std::vector<scoped_ptr<BlimpMessageProcessor>> outgoing_message_processors_;
DISALLOW_COPY_AND_ASSIGN(ClientNetworkComponents);
};
+ClientNetworkComponents::ClientNetworkComponents()
+ : connection_handler_(new BrowserConnectionHandler) {}
+
+ClientNetworkComponents::~ClientNetworkComponents() {}
+
void ClientNetworkComponents::Initialize() {
DCHECK(!connection_manager_);
- connection_manager_ = make_scoped_ptr(
- new ClientConnectionManager(browser_connection_handler_.get()));
+ connection_manager_ =
+ make_scoped_ptr(new ClientConnectionManager(connection_handler_.get()));
}
void ClientNetworkComponents::ConnectWithAssignment(
@@ -99,24 +89,9 @@ void ClientNetworkComponents::ConnectWithAssignment(
connection_manager_->Connect();
}
-void ClientNetworkComponents::RegisterFeature(
- BlimpMessage::Type type,
- scoped_ptr<BlimpMessageThreadPipe> outgoing_pipe,
- scoped_ptr<BlimpMessageProcessor> incoming_proxy) {
- if (!browser_connection_handler_) {
- browser_connection_handler_ = make_scoped_ptr(new BrowserConnectionHandler);
- }
-
- // Registers |incoming_proxy| as the message processor for incoming
- // messages with |type|. Sets the returned outgoing message processor as the
- // actual sender of the |outgoing_pipe|.
- scoped_ptr<BlimpMessageProcessor> outgoing_message_processor =
- browser_connection_handler_->RegisterFeature(type, incoming_proxy.get());
- outgoing_pipe->set_target_processor(outgoing_message_processor.get());
-
- incoming_proxies_.push_back(std::move(incoming_proxy));
- outgoing_pipes_.push_back(std::move(outgoing_pipe));
- outgoing_message_processors_.push_back(std::move(outgoing_message_processor));
+BrowserConnectionHandler*
+ClientNetworkComponents::GetBrowserConnectionHandler() {
+ return connection_handler_.get();
}
BlimpClientSession::BlimpClientSession()
@@ -133,19 +108,7 @@ BlimpClientSession::BlimpClientSession()
assignment_source_.reset(
new AssignmentSource(io_thread_.task_runner(), io_thread_.task_runner()));
- // Register features' message senders and receivers.
- tab_control_feature_->set_outgoing_message_processor(
- RegisterFeature(BlimpMessage::TAB_CONTROL, tab_control_feature_.get()));
- navigation_feature_->set_outgoing_message_processor(
- RegisterFeature(BlimpMessage::NAVIGATION, navigation_feature_.get()));
- render_widget_feature_->set_outgoing_input_message_processor(
- RegisterFeature(BlimpMessage::INPUT, render_widget_feature_.get()));
- render_widget_feature_->set_outgoing_compositor_message_processor(
- RegisterFeature(BlimpMessage::COMPOSITOR, render_widget_feature_.get()));
-
- // We don't expect to send any RenderWidget messages, so don't save the
- // outgoing BlimpMessageProcessor in the RenderWidgetFeature.
- RegisterFeature(BlimpMessage::RENDER_WIDGET, render_widget_feature_.get());
+ RegisterFeatures();
// Initialize must only be posted after the RegisterFeature calls have
// completed.
@@ -182,33 +145,29 @@ void BlimpClientSession::ConnectWithAssignment(AssignmentSource::Result result,
void BlimpClientSession::OnAssignmentConnectionAttempted(
AssignmentSource::Result result) {}
-scoped_ptr<BlimpMessageProcessor> BlimpClientSession::RegisterFeature(
- BlimpMessage::Type type,
- BlimpMessageProcessor* incoming_processor) {
- // Creates an outgoing pipe and a proxy for forwarding messages
- // from features on the UI thread to network components on the IO thread.
- scoped_ptr<BlimpMessageThreadPipe> outgoing_pipe(
- new BlimpMessageThreadPipe(io_thread_.task_runner()));
- scoped_ptr<BlimpMessageProcessor> outgoing_message_proxy =
- outgoing_pipe->CreateProxy();
-
- // Creates an incoming pipe and a proxy for receiving messages
- // from network components on the IO thread.
- scoped_ptr<BlimpMessageThreadPipe> incoming_pipe(
- new BlimpMessageThreadPipe(base::SequencedTaskRunnerHandle::Get()));
- incoming_pipe->set_target_processor(incoming_processor);
- scoped_ptr<BlimpMessageProcessor> incoming_proxy =
- incoming_pipe->CreateProxy();
-
- // Finishes registration on IO thread.
- io_thread_.task_runner()->PostTask(
- FROM_HERE, base::Bind(&ClientNetworkComponents::RegisterFeature,
- base::Unretained(net_components_.get()), type,
- base::Passed(std::move(outgoing_pipe)),
- base::Passed(std::move(incoming_proxy))));
+void BlimpClientSession::RegisterFeatures() {
+ thread_pipe_manager_ = make_scoped_ptr(new ThreadPipeManager(
+ io_thread_.task_runner(), base::SequencedTaskRunnerHandle::Get(),
+ net_components_->GetBrowserConnectionHandler()));
- incoming_pipes_.push_back(std::move(incoming_pipe));
- return outgoing_message_proxy;
+ // Register features' message senders and receivers.
+ tab_control_feature_->set_outgoing_message_processor(
+ thread_pipe_manager_->RegisterFeature(BlimpMessage::TAB_CONTROL,
+ tab_control_feature_.get()));
+ navigation_feature_->set_outgoing_message_processor(
+ thread_pipe_manager_->RegisterFeature(BlimpMessage::NAVIGATION,
+ navigation_feature_.get()));
+ render_widget_feature_->set_outgoing_input_message_processor(
+ thread_pipe_manager_->RegisterFeature(BlimpMessage::INPUT,
+ render_widget_feature_.get()));
+ render_widget_feature_->set_outgoing_compositor_message_processor(
+ thread_pipe_manager_->RegisterFeature(BlimpMessage::COMPOSITOR,
+ render_widget_feature_.get()));
+
+ // Client will not send send any RenderWidget messages, so don't save the
+ // outgoing BlimpMessageProcessor in the RenderWidgetFeature.
+ thread_pipe_manager_->RegisterFeature(BlimpMessage::RENDER_WIDGET,
+ render_widget_feature_.get());
}
TabControlFeature* BlimpClientSession::GetTabControlFeature() const {
diff --git a/blimp/client/session/blimp_client_session.h b/blimp/client/session/blimp_client_session.h
index b715453..5dc8277 100644
--- a/blimp/client/session/blimp_client_session.h
+++ b/blimp/client/session/blimp_client_session.h
@@ -5,6 +5,8 @@
#ifndef BLIMP_CLIENT_SESSION_BLIMP_CLIENT_SESSION_H_
#define BLIMP_CLIENT_SESSION_BLIMP_CLIENT_SESSION_H_
+#include <string>
+
#include "base/macros.h"
#include "base/memory/ref_counted.h"
#include "base/memory/scoped_ptr.h"
@@ -24,6 +26,7 @@ class BlimpMessageProcessor;
class BlimpMessageThreadPipe;
class BrowserConnectionHandler;
class ClientConnectionManager;
+class ThreadPipeManager;
namespace client {
@@ -68,12 +71,7 @@ class BLIMP_CLIENT_EXPORT BlimpClientSession {
virtual void OnAssignmentConnectionAttempted(AssignmentSource::Result result);
private:
- // Registers a message processor which will receive all messages of the |type|
- // specified. Returns a BlimpMessageProcessor object for sending messages of
- // type |type|.
- scoped_ptr<BlimpMessageProcessor> RegisterFeature(
- BlimpMessage::Type type,
- BlimpMessageProcessor* incoming_processor);
+ void RegisterFeatures();
base::Thread io_thread_;
scoped_ptr<TabControlFeature> tab_control_feature_;
@@ -88,10 +86,7 @@ class BLIMP_CLIENT_EXPORT BlimpClientSession {
// Must be deleted on the IO thread.
scoped_ptr<ClientNetworkComponents> net_components_;
- // Pipes for receiving BlimpMessages from IO thread.
- // Incoming messages are only routed to the UI thread since all features run
- // on the UI thread.
- std::vector<scoped_ptr<BlimpMessageThreadPipe>> incoming_pipes_;
+ scoped_ptr<ThreadPipeManager> thread_pipe_manager_;
base::WeakPtrFactory<BlimpClientSession> weak_factory_;
diff --git a/blimp/engine/session/blimp_engine_session.cc b/blimp/engine/session/blimp_engine_session.cc
index e867b37..5ba532c 100644
--- a/blimp/engine/session/blimp_engine_session.cc
+++ b/blimp/engine/session/blimp_engine_session.cc
@@ -24,6 +24,7 @@
#include "blimp/net/engine_connection_manager.h"
#include "blimp/net/null_blimp_message_processor.h"
#include "blimp/net/tcp_engine_transport.h"
+#include "blimp/net/thread_pipe_manager.h"
#include "content/public/browser/browser_context.h"
#include "content/public/browser/browser_thread.h"
#include "content/public/browser/navigation_controller.h"
@@ -85,12 +86,7 @@ class EngineNetworkComponents {
// received messages can be properly handled.
void Initialize(const std::string& client_token);
- // Connects message pipes between the specified feature and the network layer,
- // using |incoming_proxy| as the incoming message processor, and connecting
- // |outgoing_pipe| to the actual message sender.
- void RegisterFeature(BlimpMessage::Type type,
- scoped_ptr<BlimpMessageThreadPipe> outgoing_pipe,
- scoped_ptr<BlimpMessageProcessor> incoming_proxy);
+ BrowserConnectionHandler* GetBrowserConnectionHandler();
private:
net::NetLog* net_log_;
@@ -98,20 +94,11 @@ class EngineNetworkComponents {
scoped_ptr<EngineAuthenticationHandler> authentication_handler_;
scoped_ptr<EngineConnectionManager> connection_manager_;
- // Container for the feature-specific MessageProcessors.
- std::vector<scoped_ptr<BlimpMessageProcessor>> incoming_proxies_;
-
- // Containers for the MessageProcessors used to write feature-specific
- // messages to the network, and the thread-pipe endpoints through which
- // they are used from the UI thread.
- std::vector<scoped_ptr<BlimpMessageProcessor>> outgoing_message_processors_;
- std::vector<scoped_ptr<BlimpMessageThreadPipe>> outgoing_pipes_;
-
DISALLOW_COPY_AND_ASSIGN(EngineNetworkComponents);
};
EngineNetworkComponents::EngineNetworkComponents(net::NetLog* net_log)
- : net_log_(net_log) {}
+ : net_log_(net_log), connection_handler_(new BrowserConnectionHandler) {}
EngineNetworkComponents::~EngineNetworkComponents() {
DCHECK_CURRENTLY_ON(content::BrowserThread::IO);
@@ -119,8 +106,7 @@ EngineNetworkComponents::~EngineNetworkComponents() {
void EngineNetworkComponents::Initialize(const std::string& client_token) {
DCHECK_CURRENTLY_ON(content::BrowserThread::IO);
- DCHECK(connection_handler_);
- DCHECK(!authentication_handler_);
+ DCHECK(!connection_manager_);
// Creates and connects net components.
// A BlimpConnection flows from
@@ -136,27 +122,9 @@ void EngineNetworkComponents::Initialize(const std::string& client_token) {
make_scoped_ptr(new TCPEngineTransport(address, net_log_)));
}
-void EngineNetworkComponents::RegisterFeature(
- BlimpMessage::Type type,
- scoped_ptr<BlimpMessageThreadPipe> outgoing_pipe,
- scoped_ptr<BlimpMessageProcessor> incoming_proxy) {
- DCHECK_CURRENTLY_ON(content::BrowserThread::IO);
-
- if (!connection_handler_) {
- connection_handler_ = make_scoped_ptr(new BrowserConnectionHandler);
- }
-
- // Registers |incoming_proxy| as the message processor for incoming
- // messages with |type|. Sets the returned outgoing message processor as the
- // target of the |outgoing_pipe|.
- scoped_ptr<BlimpMessageProcessor> outgoing_message_processor =
- connection_handler_->RegisterFeature(type, incoming_proxy.get());
- outgoing_pipe->set_target_processor(outgoing_message_processor.get());
-
- // This object manages the lifetimes of the pipe, proxy and target processor.
- incoming_proxies_.push_back(std::move(incoming_proxy));
- outgoing_pipes_.push_back(std::move(outgoing_pipe));
- outgoing_message_processors_.push_back(std::move(outgoing_message_processor));
+BrowserConnectionHandler*
+EngineNetworkComponents::GetBrowserConnectionHandler() {
+ return connection_handler_.get();
}
BlimpEngineSession::BlimpEngineSession(
@@ -204,16 +172,7 @@ void BlimpEngineSession::Initialize() {
window_tree_host_->SetBounds(gfx::Rect(screen_->GetPrimaryDisplay().size()));
- // Register features' message senders and receivers.
- tab_control_message_sender_ =
- RegisterFeature(BlimpMessage::TAB_CONTROL, this);
- navigation_message_sender_ = RegisterFeature(BlimpMessage::NAVIGATION, this);
- render_widget_feature_.set_render_widget_message_sender(
- RegisterFeature(BlimpMessage::RENDER_WIDGET, &render_widget_feature_));
- render_widget_feature_.set_input_message_sender(
- RegisterFeature(BlimpMessage::INPUT, &render_widget_feature_));
- render_widget_feature_.set_compositor_message_sender(
- RegisterFeature(BlimpMessage::COMPOSITOR, &render_widget_feature_));
+ RegisterFeatures();
// Initialize must only be posted after the RegisterFeature calls have
// completed.
@@ -224,37 +183,28 @@ void BlimpEngineSession::Initialize() {
engine_config_->client_token()));
}
-scoped_ptr<BlimpMessageProcessor> BlimpEngineSession::RegisterFeature(
- BlimpMessage::Type type,
- BlimpMessageProcessor* incoming_processor) {
- // Creates an outgoing pipe and a proxy for forwarding messages
- // from features on the UI thread to network components on the IO thread.
- scoped_refptr<base::SingleThreadTaskRunner> io_task_runner =
+void BlimpEngineSession::RegisterFeatures() {
+ thread_pipe_manager_.reset(new ThreadPipeManager(
content::BrowserThread::GetMessageLoopProxyForThread(
- content::BrowserThread::IO);
- scoped_ptr<BlimpMessageThreadPipe> outgoing_pipe(
- new BlimpMessageThreadPipe(io_task_runner));
- scoped_ptr<BlimpMessageProcessor> outgoing_proxy =
- outgoing_pipe->CreateProxy();
-
- // Creates an incoming pipe and a proxy for receiving messages
- // from network components on the IO thread.
- scoped_ptr<BlimpMessageThreadPipe> incoming_pipe(new BlimpMessageThreadPipe(
+ content::BrowserThread::IO),
content::BrowserThread::GetMessageLoopProxyForThread(
- content::BrowserThread::UI)));
- incoming_pipe->set_target_processor(incoming_processor);
- scoped_ptr<BlimpMessageProcessor> incoming_proxy =
- incoming_pipe->CreateProxy();
-
- // Finishes registration on IO thread.
- io_task_runner->PostTask(
- FROM_HERE, base::Bind(&EngineNetworkComponents::RegisterFeature,
- base::Unretained(net_components_.get()), type,
- base::Passed(std::move(outgoing_pipe)),
- base::Passed(std::move(incoming_proxy))));
-
- incoming_pipes_.push_back(std::move(incoming_pipe));
- return outgoing_proxy;
+ content::BrowserThread::UI),
+ net_components_->GetBrowserConnectionHandler()));
+
+ // Register features' message senders and receivers.
+ tab_control_message_sender_ =
+ thread_pipe_manager_->RegisterFeature(BlimpMessage::TAB_CONTROL, this);
+ navigation_message_sender_ =
+ thread_pipe_manager_->RegisterFeature(BlimpMessage::NAVIGATION, this);
+ render_widget_feature_.set_render_widget_message_sender(
+ thread_pipe_manager_->RegisterFeature(BlimpMessage::RENDER_WIDGET,
+ &render_widget_feature_));
+ render_widget_feature_.set_input_message_sender(
+ thread_pipe_manager_->RegisterFeature(BlimpMessage::INPUT,
+ &render_widget_feature_));
+ render_widget_feature_.set_compositor_message_sender(
+ thread_pipe_manager_->RegisterFeature(BlimpMessage::COMPOSITOR,
+ &render_widget_feature_));
}
bool BlimpEngineSession::CreateWebContents(const int target_tab_id) {
diff --git a/blimp/engine/session/blimp_engine_session.h b/blimp/engine/session/blimp_engine_session.h
index ebab04d..ae527e5 100644
--- a/blimp/engine/session/blimp_engine_session.h
+++ b/blimp/engine/session/blimp_engine_session.h
@@ -53,6 +53,7 @@ namespace blimp {
class BlimpConnection;
class BlimpMessage;
class BlimpMessageThreadPipe;
+class ThreadPipeManager;
namespace engine {
@@ -88,12 +89,9 @@ class BlimpEngineSession
const net::CompletionCallback& callback) override;
private:
- // Registers a message processor which will receive all messages of the |type|
- // specified. Returns a BlimpMessageProcessor object for sending messages of
- // type |type|.
- scoped_ptr<BlimpMessageProcessor> RegisterFeature(
- BlimpMessage::Type type,
- BlimpMessageProcessor* incoming_processor);
+ // Creates ThreadPipeManager, registers features, and then starts to accept
+ // incoming connection.
+ void RegisterFeatures();
// TabControlMessage handler methods.
// Creates a new WebContents, which will be indexed by |target_tab_id|.
@@ -183,10 +181,7 @@ class BlimpEngineSession
// this object is destroyed there.
scoped_ptr<EngineNetworkComponents> net_components_;
- // Pipes for receiving BlimpMessages from IO thread.
- // Incoming messages are only routed to the UI thread since all features run
- // there.
- std::vector<scoped_ptr<BlimpMessageThreadPipe>> incoming_pipes_;
+ scoped_ptr<ThreadPipeManager> thread_pipe_manager_;
// Used to send TAB_CONTROL or NAVIGATION messages to client.
scoped_ptr<BlimpMessageProcessor> tab_control_message_sender_;
diff --git a/blimp/net/BUILD.gn b/blimp/net/BUILD.gn
index 28c00cd..0fe0cfd 100644
--- a/blimp/net/BUILD.gn
+++ b/blimp/net/BUILD.gn
@@ -54,6 +54,8 @@ component("blimp_net") {
"tcp_client_transport.h",
"tcp_engine_transport.cc",
"tcp_engine_transport.h",
+ "thread_pipe_manager.cc",
+ "thread_pipe_manager.h",
]
defines = [ "BLIMP_NET_IMPLEMENTATION=1" ]
@@ -102,6 +104,7 @@ source_set("unit_tests") {
"stream_packet_reader_unittest.cc",
"stream_packet_writer_unittest.cc",
"tcp_transport_unittest.cc",
+ "thread_pipe_manager_unittest.cc",
]
deps = [
diff --git a/blimp/net/blimp_message_checkpointer.h b/blimp/net/blimp_message_checkpointer.h
index 66c9894..cd5fd49 100644
--- a/blimp/net/blimp_message_checkpointer.h
+++ b/blimp/net/blimp_message_checkpointer.h
@@ -23,6 +23,8 @@ class BlimpMessageCheckpointObserver;
// Checkpoint/ACK message dispatch may be deferred for a second or
// two to avoid saturating the link with ACK traffic; feature implementations
// need to account for this latency in their design.
+// BlimpMessageCheckpointer is created on the UI thread, and then used and
+// destroyed on the IO thread.
class BLIMP_NET_EXPORT BlimpMessageCheckpointer : public BlimpMessageProcessor {
public:
BlimpMessageCheckpointer(BlimpMessageProcessor* incoming_processor,
diff --git a/blimp/net/blimp_message_demultiplexer.h b/blimp/net/blimp_message_demultiplexer.h
index eb6771eb..07854aa 100644
--- a/blimp/net/blimp_message_demultiplexer.h
+++ b/blimp/net/blimp_message_demultiplexer.h
@@ -18,6 +18,8 @@ namespace blimp {
// Multiplexing BlimpMessageProcessor which routes BlimpMessages to message
// processors based on |message.type|.
+// BlimpMessageDemultiplexer is created on the UI thread, and then used and
+// destroyed on the IO thread.
class BLIMP_NET_EXPORT BlimpMessageDemultiplexer
: public BlimpMessageProcessor {
public:
diff --git a/blimp/net/blimp_message_multiplexer.h b/blimp/net/blimp_message_multiplexer.h
index 27713af..af4c4bb 100644
--- a/blimp/net/blimp_message_multiplexer.h
+++ b/blimp/net/blimp_message_multiplexer.h
@@ -17,6 +17,8 @@ class BlimpMessageProcessor;
// Creates MessageProcessors that receive outgoing messages and put them
// onto a multiplexed message stream.
+// BlimpMessageMultiplexer is created on the UI thread, and then used and
+// destroyed on the IO thread.
class BLIMP_NET_EXPORT BlimpMessageMultiplexer {
public:
// |output_processor|: A pointer to the MessageProcessor that will receive the
diff --git a/blimp/net/blimp_message_output_buffer.h b/blimp/net/blimp_message_output_buffer.h
index c40aea3..97bd2ff 100644
--- a/blimp/net/blimp_message_output_buffer.h
+++ b/blimp/net/blimp_message_output_buffer.h
@@ -28,6 +28,8 @@ class BlimpConnection;
// message acknowledgment.
// (Redelivery will be used in a future CL to implement Fast Recovery
// of dropped connections.)
+// BlimpMessageOutputBuffer is created on the UI thread, and then used and
+// destroyed on the IO thread.
class BLIMP_NET_EXPORT BlimpMessageOutputBuffer
: public BlimpMessageProcessor,
public BlimpMessageCheckpointObserver {
diff --git a/blimp/net/browser_connection_handler.h b/blimp/net/browser_connection_handler.h
index 74bb7f2..0304ae1 100644
--- a/blimp/net/browser_connection_handler.h
+++ b/blimp/net/browser_connection_handler.h
@@ -25,6 +25,8 @@ class BlimpMessageProcessor;
// messages out via underlying BlimpConnection.
// A BrowserConnectionHandler is created on browser startup, and persists for
// the lifetime of the application.
+// BrowserConnectionHandler is created on the UI thread, and then used and
+// destroyed on the IO thread.
class BLIMP_NET_EXPORT BrowserConnectionHandler
: public ConnectionHandler,
public ConnectionErrorObserver {
@@ -38,7 +40,7 @@ class BLIMP_NET_EXPORT BrowserConnectionHandler
// this object is in-use.
//
// Returns a BlimpMessageProcessor object for sending messages of type |type|.
- scoped_ptr<BlimpMessageProcessor> RegisterFeature(
+ virtual scoped_ptr<BlimpMessageProcessor> RegisterFeature(
BlimpMessage::Type type,
BlimpMessageProcessor* incoming_processor);
diff --git a/blimp/net/thread_pipe_manager.cc b/blimp/net/thread_pipe_manager.cc
new file mode 100644
index 0000000..95bc350
--- /dev/null
+++ b/blimp/net/thread_pipe_manager.cc
@@ -0,0 +1,113 @@
+// Copyright 2016 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "blimp/net/thread_pipe_manager.h"
+
+#include "base/location.h"
+#include "base/sequenced_task_runner.h"
+#include "blimp/net/blimp_message_processor.h"
+#include "blimp/net/blimp_message_thread_pipe.h"
+#include "blimp/net/browser_connection_handler.h"
+
+namespace blimp {
+
+// IoThreadPipeManager is created on the UI thread, and then used and destroyed
+// on the IO thread.
+// It works with |connection_handler| to register features on the IO thread,
+// and manages IO-thread-side BlimpMessageThreadPipes.
+class IoThreadPipeManager {
+ public:
+ explicit IoThreadPipeManager(BrowserConnectionHandler* connection_handler);
+ virtual ~IoThreadPipeManager();
+
+ // Connects message pipes between the specified feature and the network layer,
+ // using |incoming_proxy| as the incoming message processor, and connecting
+ // |outgoing_pipe| to the actual message sender.
+ void RegisterFeature(BlimpMessage::Type type,
+ scoped_ptr<BlimpMessageThreadPipe> outgoing_pipe,
+ scoped_ptr<BlimpMessageProcessor> incoming_proxy);
+
+ private:
+ BrowserConnectionHandler* connection_handler_;
+
+ // Container for the feature-specific MessageProcessors.
+ // IO-side proxy for sending messages to UI thread.
+ std::vector<scoped_ptr<BlimpMessageProcessor>> incoming_proxies_;
+
+ // Containers for the MessageProcessors used to write feature-specific
+ // messages to the network, and the thread-pipe endpoints through which
+ // they are used from the UI thread.
+ std::vector<scoped_ptr<BlimpMessageProcessor>> outgoing_message_processors_;
+ std::vector<scoped_ptr<BlimpMessageThreadPipe>> outgoing_pipes_;
+
+ DISALLOW_COPY_AND_ASSIGN(IoThreadPipeManager);
+};
+
+IoThreadPipeManager::IoThreadPipeManager(
+ BrowserConnectionHandler* connection_handler)
+ : connection_handler_(connection_handler) {
+ DCHECK(connection_handler_);
+}
+
+IoThreadPipeManager::~IoThreadPipeManager() {}
+
+void IoThreadPipeManager::RegisterFeature(
+ BlimpMessage::Type type,
+ scoped_ptr<BlimpMessageThreadPipe> outgoing_pipe,
+ scoped_ptr<BlimpMessageProcessor> incoming_proxy) {
+ // Registers |incoming_proxy| as the message processor for incoming
+ // messages with |type|. Sets the returned outgoing message processor as the
+ // target of the |outgoing_pipe|.
+ scoped_ptr<BlimpMessageProcessor> outgoing_message_processor =
+ connection_handler_->RegisterFeature(type, incoming_proxy.get());
+ outgoing_pipe->set_target_processor(outgoing_message_processor.get());
+
+ // This object manages the lifetimes of the pipe, proxy and target processor.
+ incoming_proxies_.push_back(std::move(incoming_proxy));
+ outgoing_pipes_.push_back(std::move(outgoing_pipe));
+ outgoing_message_processors_.push_back(std::move(outgoing_message_processor));
+}
+
+ThreadPipeManager::ThreadPipeManager(
+ const scoped_refptr<base::SequencedTaskRunner>& io_task_runner,
+ const scoped_refptr<base::SequencedTaskRunner>& ui_task_runner,
+ BrowserConnectionHandler* connection_handler)
+ : io_task_runner_(io_task_runner),
+ ui_task_runner_(ui_task_runner),
+ io_pipe_manager_(new IoThreadPipeManager(connection_handler)) {}
+
+ThreadPipeManager::~ThreadPipeManager() {
+ io_task_runner_->DeleteSoon(FROM_HERE, io_pipe_manager_.release());
+}
+
+scoped_ptr<BlimpMessageProcessor> ThreadPipeManager::RegisterFeature(
+ BlimpMessage::Type type,
+ BlimpMessageProcessor* incoming_processor) {
+ // Creates an outgoing pipe and a proxy for forwarding messages
+ // from features on the UI thread to network components on the IO thread.
+ scoped_ptr<BlimpMessageThreadPipe> outgoing_pipe(
+ new BlimpMessageThreadPipe(io_task_runner_));
+ scoped_ptr<BlimpMessageProcessor> outgoing_proxy =
+ outgoing_pipe->CreateProxy();
+
+ // Creates an incoming pipe and a proxy for receiving messages
+ // from network components on the IO thread.
+ scoped_ptr<BlimpMessageThreadPipe> incoming_pipe(
+ new BlimpMessageThreadPipe(ui_task_runner_));
+ incoming_pipe->set_target_processor(incoming_processor);
+ scoped_ptr<BlimpMessageProcessor> incoming_proxy =
+ incoming_pipe->CreateProxy();
+
+ // Finishes registration on IO thread.
+ io_task_runner_->PostTask(
+ FROM_HERE, base::Bind(&IoThreadPipeManager::RegisterFeature,
+ base::Unretained(io_pipe_manager_.get()), type,
+ base::Passed(std::move(outgoing_pipe)),
+ base::Passed(std::move(incoming_proxy))));
+
+ incoming_pipes_.push_back(std::move(incoming_pipe));
+ return outgoing_proxy;
+}
+
+} // namespace blimp
diff --git a/blimp/net/thread_pipe_manager.h b/blimp/net/thread_pipe_manager.h
new file mode 100644
index 0000000..55bf5b6
--- /dev/null
+++ b/blimp/net/thread_pipe_manager.h
@@ -0,0 +1,65 @@
+// Copyright 2016 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef BLIMP_NET_THREAD_PIPE_MANAGER_H_
+#define BLIMP_NET_THREAD_PIPE_MANAGER_H_
+
+#include <vector>
+
+#include "base/macros.h"
+#include "base/memory/ref_counted.h"
+#include "base/memory/scoped_ptr.h"
+#include "blimp/common/proto/blimp_message.pb.h"
+#include "blimp/net/blimp_net_export.h"
+
+namespace base {
+class SequencedTaskRunner;
+} // namespace base
+
+namespace blimp {
+
+class BlimpMessageProcessor;
+class BlimpMessageThreadPipe;
+class BrowserConnectionHandler;
+class IoThreadPipeManager;
+
+// This class is used on the UI thread for registering features and setting up
+// BlimpMessageThreadPipes for communicating with |connection_handler| on the
+// IO thread.
+class BLIMP_NET_EXPORT ThreadPipeManager {
+ public:
+ // Caller is responsible for ensuring that |connection_handler| outlives
+ // |this|.
+ explicit ThreadPipeManager(
+ const scoped_refptr<base::SequencedTaskRunner>& io_task_runner,
+ const scoped_refptr<base::SequencedTaskRunner>& ui_task_runner,
+ BrowserConnectionHandler* connection_handler);
+
+ ~ThreadPipeManager();
+
+ // Registers a message processor |incoming_processor| which will receive all
+ // messages of the |type| specified. Returns a BlimpMessageProcessor object
+ // for sending messages of type |type|.
+ scoped_ptr<BlimpMessageProcessor> RegisterFeature(
+ BlimpMessage::Type type,
+ BlimpMessageProcessor* incoming_processor);
+
+ private:
+ scoped_refptr<base::SequencedTaskRunner> io_task_runner_;
+ scoped_refptr<base::SequencedTaskRunner> ui_task_runner_;
+
+ // Container for BlimpMessageThreadPipes that are destroyed on IO thread.
+ scoped_ptr<IoThreadPipeManager> io_pipe_manager_;
+
+ // Pipes for routing messages from the IO to the UI thread.
+ // Incoming messages are only routed to the UI thread since all features run
+ // there.
+ std::vector<scoped_ptr<BlimpMessageThreadPipe>> incoming_pipes_;
+
+ DISALLOW_COPY_AND_ASSIGN(ThreadPipeManager);
+};
+
+} // namespace blimp
+
+#endif // BLIMP_NET_THREAD_PIPE_MANAGER_H_
diff --git a/blimp/net/thread_pipe_manager_unittest.cc b/blimp/net/thread_pipe_manager_unittest.cc
new file mode 100644
index 0000000..f36ef48
--- /dev/null
+++ b/blimp/net/thread_pipe_manager_unittest.cc
@@ -0,0 +1,180 @@
+// Copyright 2016 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "blimp/net/thread_pipe_manager.h"
+
+#include "base/location.h"
+#include "base/memory/ref_counted.h"
+#include "base/message_loop/message_loop.h"
+#include "base/threading/sequenced_task_runner_handle.h"
+#include "base/threading/thread.h"
+#include "blimp/net/blimp_message_thread_pipe.h"
+#include "blimp/net/browser_connection_handler.h"
+#include "blimp/net/test_common.h"
+#include "net/base/net_errors.h"
+#include "net/base/test_completion_callback.h"
+#include "testing/gmock/include/gmock/gmock.h"
+#include "testing/gtest/include/gtest/gtest.h"
+
+using testing::_;
+using testing::SaveArg;
+
+namespace blimp {
+namespace {
+
+scoped_ptr<BlimpMessage> CreateMessage(BlimpMessage::Type type) {
+ scoped_ptr<BlimpMessage> output(new BlimpMessage);
+ output->set_type(type);
+ return output;
+}
+
+// A feature that registers itself with ThreadPipeManager.
+class FakeFeature {
+ public:
+ FakeFeature(BlimpMessage::Type type, ThreadPipeManager* pipe_manager_) {
+ outgoing_message_processor_ =
+ pipe_manager_->RegisterFeature(type, &incoming_message_processor_);
+ }
+
+ ~FakeFeature() {}
+
+ BlimpMessageProcessor* outgoing_message_processor() {
+ return outgoing_message_processor_.get();
+ }
+
+ MockBlimpMessageProcessor* incoming_message_processor() {
+ return &incoming_message_processor_;
+ }
+
+ private:
+ testing::StrictMock<MockBlimpMessageProcessor> incoming_message_processor_;
+ scoped_ptr<BlimpMessageProcessor> outgoing_message_processor_;
+};
+
+// A feature peer on |thread_| that forwards incoming messages to
+// |message_processor|.
+class FakeFeaturePeer : public BlimpMessageProcessor {
+ public:
+ FakeFeaturePeer(BlimpMessage::Type type,
+ BlimpMessageProcessor* message_processor,
+ const scoped_refptr<base::SequencedTaskRunner>& task_runner)
+ : type_(type),
+ message_processor_(message_processor),
+ task_runner_(task_runner) {}
+
+ ~FakeFeaturePeer() override {}
+
+ private:
+ void ForwardMessage(scoped_ptr<BlimpMessage> message) {
+ DCHECK(task_runner_->RunsTasksOnCurrentThread());
+ message_processor_->ProcessMessage(std::move(message),
+ net::CompletionCallback());
+ }
+
+ // BlimpMessageProcessor implementation.
+ void ProcessMessage(scoped_ptr<BlimpMessage> message,
+ const net::CompletionCallback& callback) override {
+ DCHECK(task_runner_->RunsTasksOnCurrentThread());
+ ASSERT_EQ(type_, message->type());
+ base::MessageLoop::current()->PostTask(
+ FROM_HERE, base::Bind(&FakeFeaturePeer::ForwardMessage,
+ base::Unretained(this), base::Passed(&message)));
+ if (!callback.is_null())
+ callback.Run(net::OK);
+ }
+
+ BlimpMessage::Type type_;
+ BlimpMessageProcessor* message_processor_ = nullptr;
+ scoped_refptr<base::SequencedTaskRunner> task_runner_;
+};
+
+// A browser connection handler that returns FakeFeaturePeer to allow it
+// forwarding message back so that FakeFeature can check message it receives
+// with one it just sent.
+class FakeBrowserConnectionHandler : public BrowserConnectionHandler {
+ public:
+ FakeBrowserConnectionHandler(
+ const scoped_refptr<base::SequencedTaskRunner>& task_runner)
+ : task_runner_(task_runner) {}
+ scoped_ptr<BlimpMessageProcessor> RegisterFeature(
+ BlimpMessage::Type type,
+ BlimpMessageProcessor* incoming_processor) override {
+ DCHECK(task_runner_->RunsTasksOnCurrentThread());
+ return make_scoped_ptr(
+ new FakeFeaturePeer(type, incoming_processor, task_runner_));
+ }
+
+ private:
+ scoped_refptr<base::SequencedTaskRunner> task_runner_;
+};
+
+} // namespace
+
+class ThreadPipeManagerTest : public testing::Test {
+ public:
+ ThreadPipeManagerTest() : thread_("IoThread") {}
+
+ ~ThreadPipeManagerTest() override {}
+
+ void SetUp() override {
+ ASSERT_TRUE(thread_.Start());
+ connection_handler_ = make_scoped_ptr(
+ new FakeBrowserConnectionHandler(thread_.task_runner()));
+ pipe_manager_ = make_scoped_ptr(new ThreadPipeManager(
+ thread_.task_runner(), base::SequencedTaskRunnerHandle::Get(),
+ connection_handler_.get()));
+
+ input_feature_.reset(
+ new FakeFeature(BlimpMessage::INPUT, pipe_manager_.get()));
+ tab_control_feature_.reset(
+ new FakeFeature(BlimpMessage::TAB_CONTROL, pipe_manager_.get()));
+ }
+
+ void TearDown() override { SynchronizeWithThread(); }
+
+ // Synchronize with |thread_| to ensure that any pending work is done.
+ void SynchronizeWithThread() {
+ net::TestCompletionCallback cb;
+ thread_.task_runner()->PostTaskAndReply(FROM_HERE,
+ base::Bind(&base::DoNothing),
+ base::Bind(cb.callback(), net::OK));
+ ASSERT_EQ(net::OK, cb.WaitForResult());
+ }
+
+ protected:
+ base::MessageLoop message_loop_;
+ scoped_ptr<BrowserConnectionHandler> connection_handler_;
+ scoped_ptr<ThreadPipeManager> pipe_manager_;
+ base::Thread thread_;
+
+ scoped_ptr<FakeFeature> input_feature_;
+ scoped_ptr<FakeFeature> tab_control_feature_;
+};
+
+// Features send out message and receive the same message due to
+// |FakeFeaturePeer| loops the message back on |thread_|.
+TEST_F(ThreadPipeManagerTest, MessageSentIsReceived) {
+ scoped_ptr<BlimpMessage> input_message = CreateMessage(BlimpMessage::INPUT);
+ scoped_ptr<BlimpMessage> tab_control_message =
+ CreateMessage(BlimpMessage::TAB_CONTROL);
+
+ EXPECT_CALL(*(input_feature_->incoming_message_processor()),
+ MockableProcessMessage(EqualsProto(*input_message), _))
+ .RetiresOnSaturation();
+ EXPECT_CALL(*(tab_control_feature_->incoming_message_processor()),
+ MockableProcessMessage(EqualsProto(*tab_control_message), _))
+ .RetiresOnSaturation();
+
+ net::TestCompletionCallback cb1;
+ input_feature_->outgoing_message_processor()->ProcessMessage(
+ std::move(input_message), cb1.callback());
+ net::TestCompletionCallback cb2;
+ tab_control_feature_->outgoing_message_processor()->ProcessMessage(
+ std::move(tab_control_message), cb2.callback());
+
+ EXPECT_EQ(net::OK, cb1.WaitForResult());
+ EXPECT_EQ(net::OK, cb2.WaitForResult());
+}
+
+} // namespace blimp