diff options
author | sergeyu@chromium.org <sergeyu@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98> | 2012-08-18 04:17:23 +0000 |
---|---|---|
committer | sergeyu@chromium.org <sergeyu@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98> | 2012-08-18 04:17:23 +0000 |
commit | 3e9187d08a26fb43a360a468c18b670bae2dd3b0 (patch) | |
tree | 7e3f0071f447b2ef76c2e6d11c08a9bc25e90911 /remoting/protocol | |
parent | 2c539b896f7cd8e1ff6a1209418a5c7d15b3b6bc (diff) | |
download | chromium_src-3e9187d08a26fb43a360a468c18b670bae2dd3b0.zip chromium_src-3e9187d08a26fb43a360a468c18b670bae2dd3b0.tar.gz chromium_src-3e9187d08a26fb43a360a468c18b670bae2dd3b0.tar.bz2 |
Add support for multiplexed channels in remoting::protocol::Session interface.
Now the Session interface has two methods that return channel factories - one for regular channels and one for multiplexed channels.
Also refactored AudioReader and AudioWriter to inherit from
ChannelDispatcherBase.
BUG=137135
Review URL: https://chromiumcodereview.appspot.com/10823323
git-svn-id: svn://svn.chromium.org/chrome/trunk/src@152240 0039d316-1c4b-4281-b951-d872f2087c98
Diffstat (limited to 'remoting/protocol')
20 files changed, 230 insertions, 221 deletions
diff --git a/remoting/protocol/audio_reader.cc b/remoting/protocol/audio_reader.cc index 4269176..9723201 100644 --- a/remoting/protocol/audio_reader.cc +++ b/remoting/protocol/audio_reader.cc @@ -14,14 +14,12 @@ namespace remoting { namespace protocol { AudioReader::AudioReader(AudioPacket::Encoding encoding) - : session_(NULL), + : ChannelDispatcherBase(kAudioChannelName), encoding_(encoding), audio_stub_(NULL) { } AudioReader::~AudioReader() { - if (session_) - session_->CancelChannelCreation(kAudioChannelName); } // static @@ -32,33 +30,9 @@ scoped_ptr<AudioReader> AudioReader::Create(const SessionConfig& config) { return scoped_ptr<AudioReader>(new AudioReader(AudioPacket::ENCODING_RAW)); } -void AudioReader::Init(protocol::Session* session, - AudioStub* audio_stub, - const InitializedCallback& callback) { - session_ = session; - initialized_callback_ = callback; - audio_stub_ = audio_stub; - - session_->CreateStreamChannel( - kAudioChannelName, - base::Bind(&AudioReader::OnChannelReady, base::Unretained(this))); -} - -bool AudioReader::is_connected() { - return channel_.get() != NULL; -} - -void AudioReader::OnChannelReady(scoped_ptr<net::StreamSocket> socket) { - if (!socket.get()) { - initialized_callback_.Run(false); - return; - } - - DCHECK(!channel_.get()); - channel_ = socket.Pass(); - reader_.Init(channel_.get(), base::Bind(&AudioReader::OnNewData, - base::Unretained(this))); - initialized_callback_.Run(true); +void AudioReader::OnInitialized() { + reader_.Init(channel(), base::Bind(&AudioReader::OnNewData, + base::Unretained(this))); } void AudioReader::OnNewData(scoped_ptr<AudioPacket> packet, diff --git a/remoting/protocol/audio_reader.h b/remoting/protocol/audio_reader.h index 2d708db..fe7b806 100644 --- a/remoting/protocol/audio_reader.h +++ b/remoting/protocol/audio_reader.h @@ -10,6 +10,7 @@ #include "remoting/proto/audio.pb.h" #include "remoting/protocol/audio_stub.h" #include "remoting/protocol/message_reader.h" +#include "remoting/protocol/channel_dispatcher_base.h" namespace net { class StreamSocket; @@ -21,38 +22,25 @@ namespace protocol { class Session; class SessionConfig; -class AudioReader { +class AudioReader : public ChannelDispatcherBase { public: - // The callback is called when initialization is finished. The - // parameter is set to true on success. - typedef base::Callback<void(bool)> InitializedCallback; + static scoped_ptr<AudioReader> Create(const SessionConfig& config); virtual ~AudioReader(); - static scoped_ptr<AudioReader> Create(const SessionConfig& config); + void set_audio_stub(AudioStub* audio_stub) { audio_stub_ = audio_stub; } - // Initializies the reader. - void Init(Session* session, - AudioStub* audio_stub, - const InitializedCallback& callback); - bool is_connected(); + protected: + virtual void OnInitialized() OVERRIDE; private: explicit AudioReader(AudioPacket::Encoding encoding); - void OnChannelReady(scoped_ptr<net::StreamSocket> socket); void OnNewData(scoped_ptr<AudioPacket> packet, const base::Closure& done_task); - Session* session_; - - InitializedCallback initialized_callback_; - AudioPacket::Encoding encoding_; - // TODO(sergeyu): Remove |channel_| and let |reader_| own it. - scoped_ptr<net::StreamSocket> channel_; - ProtobufMessageReader<AudioPacket> reader_; // The stub that processes all received packets. diff --git a/remoting/protocol/audio_writer.cc b/remoting/protocol/audio_writer.cc index 4b786a1..37c90ed 100644 --- a/remoting/protocol/audio_writer.cc +++ b/remoting/protocol/audio_writer.cc @@ -16,53 +16,20 @@ namespace remoting { namespace protocol { AudioWriter::AudioWriter() - : session_(NULL) { + : ChannelDispatcherBase(kAudioChannelName) { } AudioWriter::~AudioWriter() { - Close(); } -void AudioWriter::Init(protocol::Session* session, - const InitializedCallback& callback) { - session_ = session; - initialized_callback_ = callback; - - session_->CreateStreamChannel( - kAudioChannelName, - base::Bind(&AudioWriter::OnChannelReady, base::Unretained(this))); -} - -void AudioWriter::OnChannelReady(scoped_ptr<net::StreamSocket> socket) { - if (!socket.get()) { - initialized_callback_.Run(false); - return; - } - - DCHECK(!channel_.get()); - channel_ = socket.Pass(); - // TODO(sergeyu): Provide WriteFailedCallback for the buffered writer. +void AudioWriter::OnInitialized() { + // TODO(sergeyu): Provide a non-null WriteFailedCallback for the writer. buffered_writer_.Init( - channel_.get(), BufferedSocketWriter::WriteFailedCallback()); - - initialized_callback_.Run(true); -} - -void AudioWriter::Close() { - buffered_writer_.Close(); - channel_.reset(); - if (session_) { - session_->CancelChannelCreation(kAudioChannelName); - session_ = NULL; - } -} - -bool AudioWriter::is_connected() { - return channel_.get() != NULL; + channel(), BufferedSocketWriter::WriteFailedCallback()); } void AudioWriter::ProcessAudioPacket(scoped_ptr<AudioPacket> packet, - const base::Closure& done) { + const base::Closure& done) { buffered_writer_.Write(SerializeAndFrameMessage(*packet), done); } diff --git a/remoting/protocol/audio_writer.h b/remoting/protocol/audio_writer.h index 34bd8f9..0ec008e 100644 --- a/remoting/protocol/audio_writer.h +++ b/remoting/protocol/audio_writer.h @@ -13,6 +13,7 @@ #include "base/memory/scoped_ptr.h" #include "remoting/protocol/audio_stub.h" #include "remoting/protocol/buffered_socket_writer.h" +#include "remoting/protocol/channel_dispatcher_base.h" namespace net { class StreamSocket; @@ -24,42 +25,25 @@ namespace protocol { class Session; class SessionConfig; -class AudioWriter : public AudioStub { +class AudioWriter : public ChannelDispatcherBase, + public AudioStub { public: - virtual ~AudioWriter(); - - // The callback is called when initialization is finished. The - // parameter is set to true on success. - typedef base::Callback<void(bool)> InitializedCallback; - + // Once AudioWriter is created, the Init() method of ChannelDispatcherBase + // should be used to initialize it for the session. static scoped_ptr<AudioWriter> Create(const SessionConfig& config); - // Initializes the writer. - void Init(Session* session, const InitializedCallback& callback); - - // Stops writing. Must be called on the network thread before this - // object is destroyed. - void Close(); - - // Returns true if the channel is connected. - bool is_connected(); + virtual ~AudioWriter(); // AudioStub interface. virtual void ProcessAudioPacket(scoped_ptr<AudioPacket> packet, const base::Closure& done) OVERRIDE; + protected: + virtual void OnInitialized() OVERRIDE; + private: AudioWriter(); - void OnChannelReady(scoped_ptr<net::StreamSocket> socket); - - Session* session_; - - InitializedCallback initialized_callback_; - - // TODO(sergeyu): Remove |channel_| and let |buffered_writer_| own it. - scoped_ptr<net::StreamSocket> channel_; - BufferedSocketWriter buffered_writer_; DISALLOW_COPY_AND_ASSIGN(AudioWriter); diff --git a/remoting/protocol/channel_dispatcher_base.cc b/remoting/protocol/channel_dispatcher_base.cc index ca97ecb..7eb88e0 100644 --- a/remoting/protocol/channel_dispatcher_base.cc +++ b/remoting/protocol/channel_dispatcher_base.cc @@ -6,6 +6,7 @@ #include "base/bind.h" #include "net/socket/stream_socket.h" +#include "remoting/protocol/channel_factory.h" #include "remoting/protocol/session.h" namespace remoting { @@ -13,21 +14,21 @@ namespace protocol { ChannelDispatcherBase::ChannelDispatcherBase(const char* channel_name) : channel_name_(channel_name), - session_(NULL) { + channel_factory_(NULL) { } ChannelDispatcherBase::~ChannelDispatcherBase() { - if (session_) - session_->CancelChannelCreation(channel_name_); + if (channel_factory_) + channel_factory_->CancelChannelCreation(channel_name_); } void ChannelDispatcherBase::Init(Session* session, const InitializedCallback& callback) { DCHECK(session); - session_ = session; + channel_factory_ = session->GetTransportChannelFactory(); initialized_callback_ = callback; - session_->CreateStreamChannel(channel_name_, base::Bind( + channel_factory_->CreateStreamChannel(channel_name_, base::Bind( &ChannelDispatcherBase::OnChannelReady, base::Unretained(this))); } diff --git a/remoting/protocol/channel_dispatcher_base.h b/remoting/protocol/channel_dispatcher_base.h index 999b2b93..3a663fe 100644 --- a/remoting/protocol/channel_dispatcher_base.h +++ b/remoting/protocol/channel_dispatcher_base.h @@ -18,6 +18,7 @@ class StreamSocket; namespace remoting { namespace protocol { +class ChannelFactory; class Session; // Base class for channel message dispatchers. It's responsible for @@ -52,7 +53,7 @@ class ChannelDispatcherBase { void OnChannelReady(scoped_ptr<net::StreamSocket> socket); std::string channel_name_; - Session* session_; + ChannelFactory* channel_factory_; InitializedCallback initialized_callback_; scoped_ptr<net::StreamSocket> channel_; diff --git a/remoting/protocol/channel_multiplexer.cc b/remoting/protocol/channel_multiplexer.cc index 71647bf..3ad87bc 100644 --- a/remoting/protocol/channel_multiplexer.cc +++ b/remoting/protocol/channel_multiplexer.cc @@ -365,10 +365,6 @@ ChannelMultiplexer::ChannelMultiplexer(ChannelFactory* factory, base_channel_name_(base_channel_name), next_channel_id_(0), destroyed_flag_(NULL) { - factory->CreateStreamChannel( - base_channel_name, - base::Bind(&ChannelMultiplexer::OnBaseChannelReady, - base::Unretained(this))); } ChannelMultiplexer::~ChannelMultiplexer() { @@ -396,6 +392,14 @@ void ChannelMultiplexer::CreateStreamChannel( } else { // Still waiting for the |base_channel_|. pending_channels_.push_back(PendingChannel(name, callback)); + + // If this is the first multiplexed channel then create the base channel. + if (pending_channels_.size() == 1U) { + base_channel_factory_->CreateStreamChannel( + base_channel_name_, + base::Bind(&ChannelMultiplexer::OnBaseChannelReady, + base::Unretained(this))); + } } } diff --git a/remoting/protocol/channel_multiplexer_unittest.cc b/remoting/protocol/channel_multiplexer_unittest.cc index 11a459b..c4c8179 100644 --- a/remoting/protocol/channel_multiplexer_unittest.cc +++ b/remoting/protocol/channel_multiplexer_unittest.cc @@ -56,6 +56,11 @@ class ChannelMultiplexerTest : public testing::Test { host_mux_.reset(new ChannelMultiplexer(&host_session_, kMuxChannelName)); client_mux_.reset(new ChannelMultiplexer(&client_session_, kMuxChannelName)); + } + + // Connect sockets to each other. Must be called after we've created at least + // one channel with each multiplexer. + void ConnectSockets() { FakeSocket* host_socket = host_session_.GetStreamChannel(ChannelMultiplexer::kMuxChannelName); FakeSocket* client_socket = @@ -123,6 +128,8 @@ TEST_F(ChannelMultiplexerTest, OneChannel) { scoped_ptr<net::StreamSocket> client_socket; ASSERT_NO_FATAL_FAILURE(CreateChannel("test", &host_socket, &client_socket)); + ConnectSockets(); + StreamConnectionTester tester(host_socket.get(), client_socket.get(), kMessageSize, kMessages); tester.Start(); @@ -141,6 +148,8 @@ TEST_F(ChannelMultiplexerTest, TwoChannels) { ASSERT_NO_FATAL_FAILURE( CreateChannel("ch2", &host_socket2_, &client_socket2_)); + ConnectSockets(); + StreamConnectionTester tester1(host_socket1_.get(), client_socket1_.get(), kMessageSize, kMessages); StreamConnectionTester tester2(host_socket2_.get(), client_socket2_.get(), @@ -176,6 +185,8 @@ TEST_F(ChannelMultiplexerTest, FourChannels) { ASSERT_NO_FATAL_FAILURE( CreateChannel("ch4", &host_socket4, &client_socket4)); + ConnectSockets(); + StreamConnectionTester tester1(host_socket1_.get(), client_socket1_.get(), kMessageSize, kMessages); StreamConnectionTester tester2(host_socket2_.get(), client_socket2_.get(), @@ -209,6 +220,8 @@ TEST_F(ChannelMultiplexerTest, SyncFail) { ASSERT_NO_FATAL_FAILURE( CreateChannel("ch2", &host_socket2_, &client_socket2_)); + ConnectSockets(); + host_session_.GetStreamChannel(kMuxChannelName)-> set_next_write_error(net::ERR_FAILED); host_session_.GetStreamChannel(kMuxChannelName)-> @@ -239,6 +252,8 @@ TEST_F(ChannelMultiplexerTest, AsyncFail) { ASSERT_NO_FATAL_FAILURE( CreateChannel("ch2", &host_socket2_, &client_socket2_)); + ConnectSockets(); + host_session_.GetStreamChannel(kMuxChannelName)-> set_next_write_error(net::ERR_FAILED); host_session_.GetStreamChannel(kMuxChannelName)-> @@ -267,6 +282,8 @@ TEST_F(ChannelMultiplexerTest, DeleteWhenFailed) { ASSERT_NO_FATAL_FAILURE( CreateChannel("ch2", &host_socket2_, &client_socket2_)); + ConnectSockets(); + host_session_.GetStreamChannel(kMuxChannelName)-> set_next_write_error(net::ERR_FAILED); host_session_.GetStreamChannel(kMuxChannelName)-> diff --git a/remoting/protocol/connection_to_host.cc b/remoting/protocol/connection_to_host.cc index 5326660..2064077 100644 --- a/remoting/protocol/connection_to_host.cc +++ b/remoting/protocol/connection_to_host.cc @@ -171,8 +171,9 @@ void ConnectionToHost::OnSessionStateChange( audio_reader_ = AudioReader::Create(session_->config()); if (audio_reader_.get()) { - audio_reader_->Init(session_.get(), audio_stub_, base::Bind( + audio_reader_->Init(session_.get(), base::Bind( &ConnectionToHost::OnChannelInitialized, base::Unretained(this))); + audio_reader_->set_audio_stub(audio_stub_); } control_dispatcher_.reset(new ClientControlDispatcher()); diff --git a/remoting/protocol/fake_session.cc b/remoting/protocol/fake_session.cc index dc64a1d..4d547430 100644 --- a/remoting/protocol/fake_session.cc +++ b/remoting/protocol/fake_session.cc @@ -308,23 +308,6 @@ ErrorCode FakeSession::error() { return error_; } -void FakeSession::CreateStreamChannel( - const std::string& name, const StreamChannelCallback& callback) { - scoped_ptr<FakeSocket> channel(new FakeSocket()); - stream_channels_[name] = channel.get(); - callback.Run(channel.PassAs<net::StreamSocket>()); -} - -void FakeSession::CreateDatagramChannel( - const std::string& name, const DatagramChannelCallback& callback) { - scoped_ptr<FakeUdpSocket> channel(new FakeUdpSocket()); - datagram_channels_[name] = channel.get(); - callback.Run(channel.PassAs<net::Socket>()); -} - -void FakeSession::CancelChannelCreation(const std::string& name) { -} - const std::string& FakeSession::jid() { return jid_; } @@ -341,9 +324,34 @@ void FakeSession::set_config(const SessionConfig& config) { config_ = config; } +ChannelFactory* FakeSession::GetTransportChannelFactory() { + return this; +} + +ChannelFactory* FakeSession::GetMultiplexedChannelFactory() { + return this; +} + void FakeSession::Close() { closed_ = true; } +void FakeSession::CreateStreamChannel( + const std::string& name, const StreamChannelCallback& callback) { + scoped_ptr<FakeSocket> channel(new FakeSocket()); + stream_channels_[name] = channel.get(); + callback.Run(channel.PassAs<net::StreamSocket>()); +} + +void FakeSession::CreateDatagramChannel( + const std::string& name, const DatagramChannelCallback& callback) { + scoped_ptr<FakeUdpSocket> channel(new FakeUdpSocket()); + datagram_channels_[name] = channel.get(); + callback.Run(channel.PassAs<net::Socket>()); +} + +void FakeSession::CancelChannelCreation(const std::string& name) { +} + } // namespace protocol } // namespace remoting diff --git a/remoting/protocol/fake_session.h b/remoting/protocol/fake_session.h index 848a2bf..5683241 100644 --- a/remoting/protocol/fake_session.h +++ b/remoting/protocol/fake_session.h @@ -14,6 +14,7 @@ #include "net/base/completion_callback.h" #include "net/socket/socket.h" #include "net/socket/stream_socket.h" +#include "remoting/protocol/channel_factory.h" #include "remoting/protocol/session.h" class MessageLoop; @@ -146,7 +147,8 @@ class FakeUdpSocket : public net::Socket { // FakeSession is a dummy protocol::Session that uses FakeSocket for all // channels. -class FakeSession : public Session { +class FakeSession : public Session, + public ChannelFactory { public: FakeSession(); virtual ~FakeSession(); @@ -164,11 +166,18 @@ class FakeSession : public Session { FakeSocket* GetStreamChannel(const std::string& name); FakeUdpSocket* GetDatagramChannel(const std::string& name); - // Session implementation. + // Session interface. virtual void SetEventHandler(EventHandler* event_handler) OVERRIDE; - virtual ErrorCode error() OVERRIDE; + virtual const std::string& jid() OVERRIDE; + virtual const CandidateSessionConfig* candidate_config() OVERRIDE; + virtual const SessionConfig& config() OVERRIDE; + virtual void set_config(const SessionConfig& config) OVERRIDE; + virtual ChannelFactory* GetTransportChannelFactory() OVERRIDE; + virtual ChannelFactory* GetMultiplexedChannelFactory() OVERRIDE; + virtual void Close() OVERRIDE; + // ChannelFactory interface. virtual void CreateStreamChannel( const std::string& name, const StreamChannelCallback& callback) OVERRIDE; virtual void CreateDatagramChannel( @@ -176,14 +185,6 @@ class FakeSession : public Session { const DatagramChannelCallback& callback) OVERRIDE; virtual void CancelChannelCreation(const std::string& name) OVERRIDE; - virtual const std::string& jid() OVERRIDE; - - virtual const CandidateSessionConfig* candidate_config() OVERRIDE; - virtual const SessionConfig& config() OVERRIDE; - virtual void set_config(const SessionConfig& config) OVERRIDE; - - virtual void Close() OVERRIDE; - public: EventHandler* event_handler_; scoped_ptr<const CandidateSessionConfig> candidate_config_; diff --git a/remoting/protocol/jingle_session.cc b/remoting/protocol/jingle_session.cc index 814e09f..bcf105b 100644 --- a/remoting/protocol/jingle_session.cc +++ b/remoting/protocol/jingle_session.cc @@ -13,6 +13,7 @@ #include "remoting/jingle_glue/iq_sender.h" #include "remoting/protocol/authenticator.h" #include "remoting/protocol/channel_authenticator.h" +#include "remoting/protocol/channel_multiplexer.h" #include "remoting/protocol/content_description.h" #include "remoting/protocol/jingle_messages.h" #include "remoting/protocol/jingle_session_manager.h" @@ -38,6 +39,9 @@ const int kTransportInfoSendDelayMs = 2; // |transport-info|. const int kMessageResponseTimeoutSeconds = 10; +// Name of the multiplexed channel. +const char kMuxChannelName[] = "mux"; + ErrorCode AuthRejectionReasonToErrorCode( Authenticator::RejectionReason reason) { switch (reason) { @@ -61,6 +65,7 @@ JingleSession::JingleSession(JingleSessionManager* session_manager) } JingleSession::~JingleSession() { + channel_multiplexer_.reset(); STLDeleteContainerPointers(pending_requests_.begin(), pending_requests_.end()); STLDeleteContainerPairSecondPointers(channels_.begin(), channels_.end()); @@ -170,6 +175,46 @@ void JingleSession::AcceptIncomingConnection( return; } +const std::string& JingleSession::jid() { + DCHECK(CalledOnValidThread()); + return peer_jid_; +} + +const CandidateSessionConfig* JingleSession::candidate_config() { + DCHECK(CalledOnValidThread()); + return candidate_config_.get(); +} + +const SessionConfig& JingleSession::config() { + DCHECK(CalledOnValidThread()); + return config_; +} + +void JingleSession::set_config(const SessionConfig& config) { + DCHECK(CalledOnValidThread()); + DCHECK(!config_is_set_); + config_ = config; + config_is_set_ = true; +} + +ChannelFactory* JingleSession::GetTransportChannelFactory() { + DCHECK(CalledOnValidThread()); + return this; +} + +ChannelFactory* JingleSession::GetMultiplexedChannelFactory() { + DCHECK(CalledOnValidThread()); + if (!channel_multiplexer_.get()) + channel_multiplexer_.reset(new ChannelMultiplexer(this, kMuxChannelName)); + return channel_multiplexer_.get(); +} + +void JingleSession::Close() { + DCHECK(CalledOnValidThread()); + + CloseInternal(OK); +} + void JingleSession::CreateStreamChannel( const std::string& name, const StreamChannelCallback& callback) { @@ -206,34 +251,6 @@ void JingleSession::CancelChannelCreation(const std::string& name) { } } -const std::string& JingleSession::jid() { - DCHECK(CalledOnValidThread()); - return peer_jid_; -} - -const CandidateSessionConfig* JingleSession::candidate_config() { - DCHECK(CalledOnValidThread()); - return candidate_config_.get(); -} - -const SessionConfig& JingleSession::config() { - DCHECK(CalledOnValidThread()); - return config_; -} - -void JingleSession::set_config(const SessionConfig& config) { - DCHECK(CalledOnValidThread()); - DCHECK(!config_is_set_); - config_ = config; - config_is_set_ = true; -} - -void JingleSession::Close() { - DCHECK(CalledOnValidThread()); - - CloseInternal(OK); -} - void JingleSession::OnTransportCandidate(Transport* transport, const cricket::Candidate& candidate) { pending_candidates_.push_back(JingleMessage::NamedCandidate( diff --git a/remoting/protocol/jingle_session.h b/remoting/protocol/jingle_session.h index 202fb05..7743229 100644 --- a/remoting/protocol/jingle_session.h +++ b/remoting/protocol/jingle_session.h @@ -15,6 +15,7 @@ #include "net/base/completion_callback.h" #include "remoting/jingle_glue/iq_sender.h" #include "remoting/protocol/authenticator.h" +#include "remoting/protocol/channel_factory.h" #include "remoting/protocol/jingle_messages.h" #include "remoting/protocol/session.h" #include "remoting/protocol/session_config.h" @@ -28,12 +29,14 @@ class StreamSocket; namespace remoting { namespace protocol { +class ChannelMultiplexer; class JingleSessionManager; // JingleSessionManager and JingleSession implement the subset of the // Jingle protocol used in Chromoting. Instances of this class are // created by the JingleSessionManager. class JingleSession : public Session, + public ChannelFactory, public Transport::EventHandler { public: virtual ~JingleSession(); @@ -41,6 +44,15 @@ class JingleSession : public Session, // Session interface. virtual void SetEventHandler(Session::EventHandler* event_handler) OVERRIDE; virtual ErrorCode error() OVERRIDE; + virtual const std::string& jid() OVERRIDE; + virtual const CandidateSessionConfig* candidate_config() OVERRIDE; + virtual const SessionConfig& config() OVERRIDE; + virtual void set_config(const SessionConfig& config) OVERRIDE; + virtual ChannelFactory* GetTransportChannelFactory() OVERRIDE; + virtual ChannelFactory* GetMultiplexedChannelFactory() OVERRIDE; + virtual void Close() OVERRIDE; + + // ChannelFactory interface. virtual void CreateStreamChannel( const std::string& name, const StreamChannelCallback& callback) OVERRIDE; @@ -48,11 +60,6 @@ class JingleSession : public Session, const std::string& name, const DatagramChannelCallback& callback) OVERRIDE; virtual void CancelChannelCreation(const std::string& name) OVERRIDE; - virtual const std::string& jid() OVERRIDE; - virtual const CandidateSessionConfig* candidate_config() OVERRIDE; - virtual const SessionConfig& config() OVERRIDE; - virtual void set_config(const SessionConfig& config) OVERRIDE; - virtual void Close() OVERRIDE; // Transport::EventHandler interface. virtual void OnTransportCandidate( @@ -142,6 +149,7 @@ class JingleSession : public Session, std::list<IqRequest*> pending_requests_; ChannelsMap channels_; + scoped_ptr<ChannelMultiplexer> channel_multiplexer_; base::OneShotTimer<JingleSession> transport_infos_timer_; std::list<JingleMessage::NamedCandidate> pending_candidates_; diff --git a/remoting/protocol/jingle_session_unittest.cc b/remoting/protocol/jingle_session_unittest.cc index 0942934..623de44 100644 --- a/remoting/protocol/jingle_session_unittest.cc +++ b/remoting/protocol/jingle_session_unittest.cc @@ -228,13 +228,15 @@ class JingleSessionTest : public testing::Test { } void CreateChannel() { - client_session_->CreateStreamChannel(kChannelName, base::Bind( - &JingleSessionTest::OnClientChannelCreated, base::Unretained(this))); - host_session_->CreateStreamChannel(kChannelName, base::Bind( - &JingleSessionTest::OnHostChannelCreated, base::Unretained(this))); + client_session_->GetTransportChannelFactory()->CreateStreamChannel( + kChannelName, base::Bind(&JingleSessionTest::OnClientChannelCreated, + base::Unretained(this))); + host_session_->GetTransportChannelFactory()->CreateStreamChannel( + kChannelName, base::Bind(&JingleSessionTest::OnHostChannelCreated, + base::Unretained(this))); int counter = 2; - ExpectRouteChange(); + ExpectRouteChange(kChannelName); EXPECT_CALL(client_channel_callback_, OnDone(_)) .WillOnce(QuitThreadOnCounter(&counter)); EXPECT_CALL(host_channel_callback_, OnDone(_)) @@ -245,12 +247,12 @@ class JingleSessionTest : public testing::Test { EXPECT_TRUE(host_socket_.get()); } - void ExpectRouteChange() { + void ExpectRouteChange(const std::string& channel_name) { EXPECT_CALL(host_session_event_handler_, - OnSessionRouteChange(kChannelName, _)) + OnSessionRouteChange(channel_name, _)) .Times(AtLeast(1)); EXPECT_CALL(client_session_event_handler_, - OnSessionRouteChange(kChannelName, _)) + OnSessionRouteChange(channel_name, _)) .Times(AtLeast(1)); } @@ -357,6 +359,37 @@ TEST_F(JingleSessionTest, TestStreamChannel) { tester.CheckResults(); } +// Verify that data can be sent over a multiplexed channel. +TEST_F(JingleSessionTest, TestMuxStreamChannel) { + CreateSessionManagers(1, FakeAuthenticator::ACCEPT); + ASSERT_NO_FATAL_FAILURE( + InitiateConnection(1, FakeAuthenticator::ACCEPT, false)); + + client_session_->GetMultiplexedChannelFactory()->CreateStreamChannel( + kChannelName, base::Bind(&JingleSessionTest::OnClientChannelCreated, + base::Unretained(this))); + host_session_->GetMultiplexedChannelFactory()->CreateStreamChannel( + kChannelName, base::Bind(&JingleSessionTest::OnHostChannelCreated, + base::Unretained(this))); + + int counter = 2; + ExpectRouteChange("mux"); + EXPECT_CALL(client_channel_callback_, OnDone(_)) + .WillOnce(QuitThreadOnCounter(&counter)); + EXPECT_CALL(host_channel_callback_, OnDone(_)) + .WillOnce(QuitThreadOnCounter(&counter)); + message_loop_->Run(); + + EXPECT_TRUE(client_socket_.get()); + EXPECT_TRUE(host_socket_.get()); + + StreamConnectionTester tester(host_socket_.get(), client_socket_.get(), + kMessageSize, kMessages); + tester.Start(); + message_loop_->Run(); + tester.CheckResults(); +} + // Verify that we can connect channels with multistep auth. TEST_F(JingleSessionTest, TestMultistepAuthStreamChannel) { CreateSessionManagers(3, FakeAuthenticator::ACCEPT); @@ -378,10 +411,12 @@ TEST_F(JingleSessionTest, TestFailedChannelAuth) { ASSERT_NO_FATAL_FAILURE( InitiateConnection(1, FakeAuthenticator::ACCEPT, false)); - client_session_->CreateStreamChannel(kChannelName, base::Bind( - &JingleSessionTest::OnClientChannelCreated, base::Unretained(this))); - host_session_->CreateStreamChannel(kChannelName, base::Bind( - &JingleSessionTest::OnHostChannelCreated, base::Unretained(this))); + client_session_->GetTransportChannelFactory()->CreateStreamChannel( + kChannelName, base::Bind(&JingleSessionTest::OnClientChannelCreated, + base::Unretained(this))); + host_session_->GetTransportChannelFactory()->CreateStreamChannel( + kChannelName, base::Bind(&JingleSessionTest::OnHostChannelCreated, + base::Unretained(this))); // Terminate the message loop when we get rejection notification // from the host. @@ -389,7 +424,7 @@ TEST_F(JingleSessionTest, TestFailedChannelAuth) { .WillOnce(QuitThread()); EXPECT_CALL(client_channel_callback_, OnDone(_)) .Times(AtMost(1)); - ExpectRouteChange(); + ExpectRouteChange(kChannelName); message_loop_->Run(); diff --git a/remoting/protocol/protobuf_video_reader.cc b/remoting/protocol/protobuf_video_reader.cc index 827a63f..178a5de 100644 --- a/remoting/protocol/protobuf_video_reader.cc +++ b/remoting/protocol/protobuf_video_reader.cc @@ -8,30 +8,31 @@ #include "net/socket/stream_socket.h" #include "remoting/base/constants.h" #include "remoting/proto/video.pb.h" +#include "remoting/protocol/channel_factory.h" #include "remoting/protocol/session.h" namespace remoting { namespace protocol { ProtobufVideoReader::ProtobufVideoReader(VideoPacketFormat::Encoding encoding) - : session_(NULL), - encoding_(encoding), + : encoding_(encoding), + channel_factory_(NULL), video_stub_(NULL) { } ProtobufVideoReader::~ProtobufVideoReader() { - if (session_) - session_->CancelChannelCreation(kVideoChannelName); + if (channel_factory_) + channel_factory_->CancelChannelCreation(kVideoChannelName); } void ProtobufVideoReader::Init(protocol::Session* session, VideoStub* video_stub, const InitializedCallback& callback) { - session_ = session; + channel_factory_ = session->GetTransportChannelFactory(); initialized_callback_ = callback; video_stub_ = video_stub; - session_->CreateStreamChannel( + channel_factory_->CreateStreamChannel( kVideoChannelName, base::Bind(&ProtobufVideoReader::OnChannelReady, base::Unretained(this))); } diff --git a/remoting/protocol/protobuf_video_reader.h b/remoting/protocol/protobuf_video_reader.h index 8bf07b2..f6bb55c 100644 --- a/remoting/protocol/protobuf_video_reader.h +++ b/remoting/protocol/protobuf_video_reader.h @@ -17,6 +17,7 @@ class StreamSocket; namespace remoting { namespace protocol { +class ChannelFactory; class Session; class ProtobufVideoReader : public VideoReader { @@ -35,13 +36,11 @@ class ProtobufVideoReader : public VideoReader { void OnNewData(scoped_ptr<VideoPacket> packet, const base::Closure& done_task); - Session* session_; - InitializedCallback initialized_callback_; VideoPacketFormat::Encoding encoding_; - // TODO(sergeyu): Remove |channel_| and let |reader_| own it. + ChannelFactory* channel_factory_; scoped_ptr<net::StreamSocket> channel_; ProtobufMessageReader<VideoPacket> reader_; diff --git a/remoting/protocol/protobuf_video_writer.cc b/remoting/protocol/protobuf_video_writer.cc index 12dd6611a..e1318c0 100644 --- a/remoting/protocol/protobuf_video_writer.cc +++ b/remoting/protocol/protobuf_video_writer.cc @@ -8,6 +8,7 @@ #include "net/socket/stream_socket.h" #include "remoting/base/constants.h" #include "remoting/proto/video.pb.h" +#include "remoting/protocol/channel_factory.h" #include "remoting/protocol/session.h" #include "remoting/protocol/util.h" @@ -15,7 +16,7 @@ namespace remoting { namespace protocol { ProtobufVideoWriter::ProtobufVideoWriter() - : session_(NULL) { + : channel_factory_(NULL) { } ProtobufVideoWriter::~ProtobufVideoWriter() { @@ -24,10 +25,10 @@ ProtobufVideoWriter::~ProtobufVideoWriter() { void ProtobufVideoWriter::Init(protocol::Session* session, const InitializedCallback& callback) { - session_ = session; + channel_factory_ = session->GetTransportChannelFactory(); initialized_callback_ = callback; - session_->CreateStreamChannel( + channel_factory_->CreateStreamChannel( kVideoChannelName, base::Bind(&ProtobufVideoWriter::OnChannelReady, base::Unretained(this))); } @@ -50,9 +51,9 @@ void ProtobufVideoWriter::OnChannelReady(scoped_ptr<net::StreamSocket> socket) { void ProtobufVideoWriter::Close() { buffered_writer_.Close(); channel_.reset(); - if (session_) { - session_->CancelChannelCreation(kVideoChannelName); - session_ = NULL; + if (channel_factory_) { + channel_factory_->CancelChannelCreation(kVideoChannelName); + channel_factory_ = NULL; } } diff --git a/remoting/protocol/protobuf_video_writer.h b/remoting/protocol/protobuf_video_writer.h index 83f551c..9c373170 100644 --- a/remoting/protocol/protobuf_video_writer.h +++ b/remoting/protocol/protobuf_video_writer.h @@ -20,6 +20,7 @@ class StreamSocket; namespace remoting { namespace protocol { +class ChannelFactory; class Session; class ProtobufVideoWriter : public VideoWriter { @@ -41,11 +42,9 @@ class ProtobufVideoWriter : public VideoWriter { private: void OnChannelReady(scoped_ptr<net::StreamSocket> socket); - Session* session_; - InitializedCallback initialized_callback_; - // TODO(sergeyu): Remove |channel_| and let |buffered_writer_| own it. + ChannelFactory* channel_factory_; scoped_ptr<net::StreamSocket> channel_; BufferedSocketWriter buffered_writer_; diff --git a/remoting/protocol/protocol_mock_objects.h b/remoting/protocol/protocol_mock_objects.h index 16337a6..9e64bc2 100644 --- a/remoting/protocol/protocol_mock_objects.h +++ b/remoting/protocol/protocol_mock_objects.h @@ -152,11 +152,8 @@ class MockSession : public Session { MOCK_METHOD1(SetEventHandler, void(Session::EventHandler* event_handler)); MOCK_METHOD0(error, ErrorCode()); - MOCK_METHOD2(CreateStreamChannel, void( - const std::string& name, const StreamChannelCallback& callback)); - MOCK_METHOD2(CreateDatagramChannel, void( - const std::string& name, const DatagramChannelCallback& callback)); - MOCK_METHOD1(CancelChannelCreation, void(const std::string& name)); + MOCK_METHOD0(GetTransportChannelFactory, ChannelFactory*()); + MOCK_METHOD0(GetMultiplexedChannelFactory, ChannelFactory*()); MOCK_METHOD0(jid, const std::string&()); MOCK_METHOD0(candidate_config, const CandidateSessionConfig*()); MOCK_METHOD0(config, const SessionConfig&()); diff --git a/remoting/protocol/session.h b/remoting/protocol/session.h index 7041486..daa12e4 100644 --- a/remoting/protocol/session.h +++ b/remoting/protocol/session.h @@ -7,7 +7,6 @@ #include <string> -#include "remoting/protocol/channel_factory.h" #include "remoting/protocol/errors.h" #include "remoting/protocol/session_config.h" @@ -18,12 +17,13 @@ class IPEndPoint; namespace remoting { namespace protocol { +class ChannelFactory; struct TransportRoute; // Generic interface for Chromotocol connection used by both client and host. // Provides access to the connection channels, but doesn't depend on the // protocol used for each channel. -class Session : public ChannelFactory { +class Session { public: enum State { // Created, but not connecting yet. @@ -98,6 +98,12 @@ class Session : public ChannelFactory { // ChromotocolServer::IncomingConnectionCallback. virtual void set_config(const SessionConfig& config) = 0; + // GetTransportChannelFactory() returns a factory that creates a new transport + // channel for each logical channel. GetMultiplexedChannelFactory() channels + // share a single underlying transport channel + virtual ChannelFactory* GetTransportChannelFactory() = 0; + virtual ChannelFactory* GetMultiplexedChannelFactory() = 0; + // Closes connection. Callbacks are guaranteed not to be called // after this method returns. Must be called before the object is // destroyed, unless the state is set to FAILED or CLOSED. |