diff options
author | sergeyu@chromium.org <sergeyu@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98> | 2011-07-06 17:25:25 +0000 |
---|---|---|
committer | sergeyu@chromium.org <sergeyu@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98> | 2011-07-06 17:25:25 +0000 |
commit | 964bd398fa49519c9afa19c66d4de56d842e884d (patch) | |
tree | 108abba4d394d8e29d181c3de23f4e401c663998 | |
parent | 7d2382564745c529ee4fe746df57ccaa2b636e40 (diff) | |
download | chromium_src-964bd398fa49519c9afa19c66d4de56d842e884d.zip chromium_src-964bd398fa49519c9afa19c66d4de56d842e884d.tar.gz chromium_src-964bd398fa49519c9afa19c66d4de56d842e884d.tar.bz2 |
Make Session and SessionManager not thread-safe and not ref-counted.
BUG=None
TEST=Unittests.
Review URL: http://codereview.chromium.org/7278013
git-svn-id: svn://svn.chromium.org/chrome/trunk/src@91571 0039d316-1c4b-4281-b951-d872f2087c98
21 files changed, 353 insertions, 421 deletions
diff --git a/remoting/host/chromoting_host.cc b/remoting/host/chromoting_host.cc index fcd4e92..f5f0fa9 100644 --- a/remoting/host/chromoting_host.cc +++ b/remoting/host/chromoting_host.cc @@ -160,13 +160,23 @@ void ChromotingHost::Shutdown(Task* shutdown_task) { clients_.clear(); // Stop chromotocol session manager. - if (session_manager_) { - session_manager_->Close( - NewRunnableMethod(this, &ChromotingHost::ShutdownSignaling)); - session_manager_ = NULL; - } else { - ShutdownSignaling(); + if (session_manager_.get()) { + session_manager_->Close(); + session_manager_.reset(); } + + // Stop XMPP connection. + if (signal_strategy_.get()) { + signal_strategy_->Close(); + signal_strategy_.reset(); + + for (StatusObserverList::iterator it = status_observers_.begin(); + it != status_observers_.end(); ++it) { + (*it)->OnSignallingDisconnected(); + } + } + + ShutdownRecorder(); } void ChromotingHost::AddStatusObserver(HostStatusObserver* observer) { @@ -230,8 +240,7 @@ void ChromotingHost::OnStateChange( // Create and start session manager. protocol::JingleSessionManager* server = - new protocol::JingleSessionManager(context_->network_message_loop(), - NULL, NULL, NULL); + new protocol::JingleSessionManager(NULL, NULL, NULL); // TODO(ajwong): Make this a command switch when we're more stable. server->set_allow_local_ips(true); @@ -244,7 +253,7 @@ void ChromotingHost::OnStateChange( NewCallback(this, &ChromotingHost::OnNewClientSession), key_pair.CopyPrivateKey(), key_pair.GenerateCertificate()); - session_manager_ = server; + session_manager_.reset(server); for (StatusObserverList::iterator it = status_observers_.begin(); it != status_observers_.end(); ++it) { @@ -630,25 +639,6 @@ void ChromotingHost::ContinueWindowTimerFunc() { ShowContinueWindow(true); } -void ChromotingHost::ShutdownSignaling() { - if (MessageLoop::current() != context_->network_message_loop()) { - context_->network_message_loop()->PostTask( - FROM_HERE, base::Bind(&ChromotingHost::ShutdownSignaling, this)); - return; - } - - if (signal_strategy_.get()) { - signal_strategy_->Close(); - signal_strategy_.reset(); - } - - for (StatusObserverList::iterator it = status_observers_.begin(); - it != status_observers_.end(); ++it) { - (*it)->OnSignallingDisconnected(); - } - ShutdownRecorder(); -} - void ChromotingHost::ShutdownRecorder() { if (MessageLoop::current() != context_->main_message_loop()) { context_->main_message_loop()->PostTask( diff --git a/remoting/host/chromoting_host.h b/remoting/host/chromoting_host.h index 760d895..50dac75 100644 --- a/remoting/host/chromoting_host.h +++ b/remoting/host/chromoting_host.h @@ -192,7 +192,6 @@ class ChromotingHost : public base::RefCountedThreadSafe<ChromotingHost>, void ContinueWindowTimerFunc(); // The following methods are called during shutdown. - void ShutdownSignaling(); void ShutdownRecorder(); void ShutdownFinish(); @@ -207,7 +206,7 @@ class ChromotingHost : public base::RefCountedThreadSafe<ChromotingHost>, std::string local_jid_; - scoped_refptr<protocol::SessionManager> session_manager_; + scoped_ptr<protocol::SessionManager> session_manager_; StatusObserverList status_observers_; diff --git a/remoting/host/chromoting_host_unittest.cc b/remoting/host/chromoting_host_unittest.cc index fbf0b8f..29a7f7b 100644 --- a/remoting/host/chromoting_host_unittest.cc +++ b/remoting/host/chromoting_host_unittest.cc @@ -112,8 +112,8 @@ class ChromotingHostTest : public testing::Test { &message_loop_, &handler_, &host_stub_, event_executor_); connection2_ = new MockConnectionToClient( &message_loop_, &handler_, &host_stub2_, &event_executor2_); - session_ = new MockSession(); - session2_ = new MockSession(); + session_.reset(new MockSession()); + session2_.reset(new MockSession()); session_config_.reset(SessionConfig::CreateDefault()); session_config2_.reset(SessionConfig::CreateDefault()); @@ -128,13 +128,13 @@ class ChromotingHostTest : public testing::Test { ON_CALL(*connection_.get(), client_stub()) .WillByDefault(Return(&client_stub_)); ON_CALL(*connection_.get(), session()) - .WillByDefault(Return(session_)); + .WillByDefault(Return(session_.get())); ON_CALL(*connection2_.get(), video_stub()) .WillByDefault(Return(&video_stub2_)); ON_CALL(*connection2_.get(), client_stub()) .WillByDefault(Return(&client_stub2_)); ON_CALL(*connection2_.get(), session()) - .WillByDefault(Return(session2_)); + .WillByDefault(Return(session2_.get())); ON_CALL(*session_.get(), config()) .WillByDefault(Return(session_config_.get())); ON_CALL(*session2_.get(), config()) @@ -212,13 +212,13 @@ class ChromotingHostTest : public testing::Test { MockChromotingHostContext context_; protocol::LocalLoginCredentials credentials_; scoped_refptr<MockConnectionToClient> connection_; - scoped_refptr<MockSession> session_; + scoped_ptr<MockSession> session_; scoped_ptr<SessionConfig> session_config_; MockVideoStub video_stub_; MockClientStub client_stub_; MockHostStub host_stub_; scoped_refptr<MockConnectionToClient> connection2_; - scoped_refptr<MockSession> session2_; + scoped_ptr<MockSession> session2_; scoped_ptr<SessionConfig> session_config2_; MockVideoStub video_stub2_; MockClientStub client_stub2_; diff --git a/remoting/protocol/connection_to_client.cc b/remoting/protocol/connection_to_client.cc index 16795b7..4810d27 100644 --- a/remoting/protocol/connection_to_client.cc +++ b/remoting/protocol/connection_to_client.cc @@ -37,15 +37,14 @@ ConnectionToClient::~ConnectionToClient() { } void ConnectionToClient::Init(protocol::Session* session) { - DCHECK_EQ(session->message_loop(), MessageLoop::current()); - - session_ = session; + DCHECK_EQ(loop_, MessageLoop::current()); + session_.reset(session); session_->SetStateChangeCallback( NewCallback(this, &ConnectionToClient::OnSessionStateChange)); } protocol::Session* ConnectionToClient::session() { - return session_; + return session_.get(); } void ConnectionToClient::Disconnect() { @@ -60,9 +59,9 @@ void ConnectionToClient::Disconnect() { CloseChannels(); // If there is a channel then close it and release the reference. - if (session_) { - session_->Close(NewRunnableMethod(this, &ConnectionToClient::OnClosed)); - session_ = NULL; + if (session_.get()) { + session_->Close(); + session_.reset(); } } @@ -99,7 +98,7 @@ void ConnectionToClient::OnSessionStateChange(protocol::Session::State state) { client_control_sender_.reset( new ClientControlSender(session_->control_channel())); video_writer_.reset(VideoWriter::Create(session_->config())); - video_writer_->Init(session_); + video_writer_->Init(session_.get()); dispatcher_.reset(new HostMessageDispatcher()); dispatcher_->Initialize(this, host_stub_, input_stub_); @@ -127,9 +126,5 @@ void ConnectionToClient::CloseChannels() { client_control_sender_->Close(); } -// OnClosed() is used as a callback for protocol::Session::Close(). -void ConnectionToClient::OnClosed() { -} - } // namespace protocol } // namespace remoting diff --git a/remoting/protocol/connection_to_client.h b/remoting/protocol/connection_to_client.h index 1201e78..296ce63 100644 --- a/remoting/protocol/connection_to_client.h +++ b/remoting/protocol/connection_to_client.h @@ -93,10 +93,8 @@ class ConnectionToClient : // Stops writing in the channels. void CloseChannels(); - void OnClosed(); - // The libjingle channel used to send and receive data from the remote client. - scoped_refptr<Session> session_; + scoped_ptr<Session> session_; scoped_ptr<VideoWriter> video_writer_; diff --git a/remoting/protocol/connection_to_client_unittest.cc b/remoting/protocol/connection_to_client_unittest.cc index 378ac3e..c5db404 100644 --- a/remoting/protocol/connection_to_client_unittest.cc +++ b/remoting/protocol/connection_to_client_unittest.cc @@ -43,7 +43,8 @@ class ConnectionToClientTest : public testing::Test { MockInputStub input_stub_; scoped_refptr<ConnectionToClient> viewer_; - scoped_refptr<protocol::FakeSession> session_; + // Owned by |viewer_|. + protocol::FakeSession* session_; private: DISALLOW_COPY_AND_ASSIGN(ConnectionToClientTest); @@ -57,14 +58,14 @@ TEST_F(ConnectionToClientTest, SendUpdateStream) { message_loop_.RunAllPending(); + // Verify that something has been written. + // TODO(sergeyu): Verify that the correct data has been written. + EXPECT_GT(session_->video_channel()->written_data().size(), 0u); + // And then close the connection to ConnectionToClient. viewer_->Disconnect(); message_loop_.RunAllPending(); - - // Verify that something has been written. - // TODO(sergeyu): Verify that the correct data has been written. - EXPECT_GT(session_->video_channel()->written_data().size(), 0u); } TEST_F(ConnectionToClientTest, NoWriteAfterDisconnect) { @@ -76,10 +77,10 @@ TEST_F(ConnectionToClientTest, NoWriteAfterDisconnect) { // And then close the connection to ConnectionToClient. viewer_->Disconnect(); + // The test will crash if data writer tries to write data to the + // channel socket. + // TODO(sergeyu): Use MockSession to verify that no data is written? message_loop_.RunAllPending(); - - // Nothing should be written because connection has been closed. - EXPECT_EQ(session_->video_channel()->written_data().size(), 0u); } TEST_F(ConnectionToClientTest, StateChange) { diff --git a/remoting/protocol/connection_to_host.cc b/remoting/protocol/connection_to_host.cc index ae9b9997..a632e40 100644 --- a/remoting/protocol/connection_to_host.cc +++ b/remoting/protocol/connection_to_host.cc @@ -86,13 +86,22 @@ void ConnectionToHost::Disconnect(const base::Closure& shutdown_task) { CloseChannels(); - if (session_) { - session_->Close( - NewRunnableMethod(this, &ConnectionToHost::OnDisconnected, - shutdown_task)); - } else { - OnDisconnected(shutdown_task); + if (session_.get()) { + session_->Close(); + session_.reset(); } + + if (session_manager_.get()) { + session_manager_->Close(); + session_manager_.reset(); + } + + if (signal_strategy_.get()) { + signal_strategy_->Close(); + signal_strategy_.reset(); + } + + shutdown_task.Run(); } void ConnectionToHost::InitSession() { @@ -100,7 +109,7 @@ void ConnectionToHost::InitSession() { // Initialize chromotocol |session_manager_|. JingleSessionManager* session_manager = - new JingleSessionManager(message_loop_, network_manager_.release(), + new JingleSessionManager(network_manager_.release(), socket_factory_.release(), port_allocator_session_factory_.release()); @@ -109,7 +118,7 @@ void ConnectionToHost::InitSession() { session_manager->Init( local_jid_, signal_strategy_.get(), NewCallback(this, &ConnectionToHost::OnNewSession), NULL, NULL); - session_manager_ = session_manager; + session_manager_.reset(session_manager); CandidateSessionConfig* candidate_config = CandidateSessionConfig::CreateDefault(); @@ -118,43 +127,15 @@ void ConnectionToHost::InitSession() { protocol::GenerateSupportAuthToken(local_jid_, access_code_); // Initialize |session_|. - session_ = session_manager_->Connect( + session_.reset(session_manager_->Connect( host_jid_, host_public_key_, client_token, candidate_config, - NewCallback(this, &ConnectionToHost::OnSessionStateChange)); -} - -void ConnectionToHost::OnDisconnected(const base::Closure& shutdown_task) { - DCHECK_EQ(message_loop_, MessageLoop::current()); - - session_ = NULL; - - if (session_manager_) { - session_manager_->Close( - NewRunnableMethod(this, &ConnectionToHost::OnServerClosed, - shutdown_task)); - } else { - OnServerClosed(shutdown_task); - } -} - -void ConnectionToHost::OnServerClosed(const base::Closure& shutdown_task) { - DCHECK_EQ(message_loop_, MessageLoop::current()); - - session_manager_ = NULL; - - if (signal_strategy_.get()) { - signal_strategy_->Close(); - signal_strategy_.reset(); - } - - shutdown_task.Run(); + NewCallback(this, &ConnectionToHost::OnSessionStateChange))); } const SessionConfig* ConnectionToHost::config() { return session_->config(); } - void ConnectionToHost::OnStateChange( SignalStrategy::StatusObserver::State state) { DCHECK_EQ(message_loop_, MessageLoop::current()); @@ -203,7 +184,7 @@ void ConnectionToHost::OnSessionStateChange( state_ = STATE_CONNECTED; // Initialize reader and writer. video_reader_.reset(VideoReader::Create(session_->config())); - video_reader_->Init(session_, video_stub_); + video_reader_->Init(session_.get(), video_stub_); host_control_sender_.reset( new HostControlSender(session_->control_channel())); dispatcher_->Initialize(session_.get(), client_stub_); diff --git a/remoting/protocol/connection_to_host.h b/remoting/protocol/connection_to_host.h index a943ff6..e3d8c5c 100644 --- a/remoting/protocol/connection_to_host.h +++ b/remoting/protocol/connection_to_host.h @@ -125,11 +125,6 @@ class ConnectionToHost : public SignalStrategy::StatusObserver { // Stops writing in the channels. void CloseChannels(); - // Used by Disconnect() to disconnect chromoting connection, stop chromoting - // server, and then disconnect XMPP connection. - void OnDisconnected(const base::Closure& shutdown_task); - void OnServerClosed(const base::Closure& shutdown_task); - // Internal state of the connection. State state_; @@ -141,8 +136,8 @@ class ConnectionToHost : public SignalStrategy::StatusObserver { scoped_ptr<SignalStrategy> signal_strategy_; std::string local_jid_; - scoped_refptr<SessionManager> session_manager_; - scoped_refptr<Session> session_; + scoped_ptr<SessionManager> session_manager_; + scoped_ptr<Session> session_; scoped_ptr<VideoReader> video_reader_; diff --git a/remoting/protocol/fake_session.cc b/remoting/protocol/fake_session.cc index fe50f2b..c11fa34 100644 --- a/remoting/protocol/fake_session.cc +++ b/remoting/protocol/fake_session.cc @@ -164,10 +164,6 @@ const std::string& FakeSession::jid() { return jid_; } -MessageLoop* FakeSession::message_loop() { - return message_loop_; -} - const CandidateSessionConfig* FakeSession::candidate_config() { return candidate_config_.get(); } @@ -197,10 +193,8 @@ void FakeSession::set_receiver_token(const std::string& receiver_token) { receiver_token_ = receiver_token; } -void FakeSession::Close(Task* closed_task) { +void FakeSession::Close() { closed_ = true; - closed_task->Run(); - delete closed_task; } } // namespace protocol diff --git a/remoting/protocol/fake_session.h b/remoting/protocol/fake_session.h index b5c69e0..1427bf3 100644 --- a/remoting/protocol/fake_session.h +++ b/remoting/protocol/fake_session.h @@ -114,7 +114,6 @@ class FakeSession : public Session { virtual const std::string& jid(); - virtual MessageLoop* message_loop(); virtual const CandidateSessionConfig* candidate_config(); virtual const SessionConfig* config(); virtual void set_config(const SessionConfig* config); @@ -124,7 +123,7 @@ class FakeSession : public Session { virtual const std::string& receiver_token(); virtual void set_receiver_token(const std::string& receiver_token); - virtual void Close(Task* closed_task); + virtual void Close(); public: scoped_ptr<StateChangeCallback> callback_; diff --git a/remoting/protocol/jingle_session.cc b/remoting/protocol/jingle_session.cc index a045e30..f97a73a 100644 --- a/remoting/protocol/jingle_session.cc +++ b/remoting/protocol/jingle_session.cc @@ -146,7 +146,8 @@ JingleSession::JingleSession( ALLOW_THIS_IN_INITIALIZER_LIST(connect_callback_( this, &JingleSession::OnConnect)), ALLOW_THIS_IN_INITIALIZER_LIST(ssl_connect_callback_( - this, &JingleSession::OnSSLConnect)) { + this, &JingleSession::OnSSLConnect)), + ALLOW_THIS_IN_INITIALIZER_LIST(task_factory_(this)) { // TODO(hclam): Need a better way to clone a key. if (local_private_key) { std::vector<uint8> key_bytes; @@ -159,10 +160,11 @@ JingleSession::JingleSession( JingleSession::~JingleSession() { DCHECK(closed_); + jingle_session_manager_->SessionDestroyed(this); } void JingleSession::Init(cricket::Session* cricket_session) { - DCHECK_EQ(jingle_session_manager_->message_loop(), MessageLoop::current()); + DCHECK(CalledOnValidThread()); cricket_session_ = cricket_session; jid_ = cricket_session_->remote_name(); @@ -174,11 +176,12 @@ void JingleSession::Init(cricket::Session* cricket_session) { } std::string JingleSession::GetEncryptedMasterKey() const { + DCHECK(CalledOnValidThread()); return EncryptMasterKey(peer_public_key_, master_key_); } void JingleSession::CloseInternal(int result, bool failed) { - DCHECK_EQ(jingle_session_manager_->message_loop(), MessageLoop::current()); + DCHECK(CalledOnValidThread()); if (!closed_ && !closing_) { closing_ = true; @@ -223,11 +226,12 @@ void JingleSession::CloseInternal(int result, bool failed) { } bool JingleSession::HasSession(cricket::Session* cricket_session) { + DCHECK(CalledOnValidThread()); return cricket_session_ == cricket_session; } cricket::Session* JingleSession::ReleaseSession() { - DCHECK_EQ(jingle_session_manager_->message_loop(), MessageLoop::current()); + DCHECK(CalledOnValidThread()); // Session may be destroyed only after it is closed. DCHECK(closed_); @@ -240,107 +244,105 @@ cricket::Session* JingleSession::ReleaseSession() { } void JingleSession::SetStateChangeCallback(StateChangeCallback* callback) { - DCHECK_EQ(jingle_session_manager_->message_loop(), MessageLoop::current()); + DCHECK(CalledOnValidThread()); DCHECK(callback); state_change_callback_.reset(callback); } net::Socket* JingleSession::control_channel() { - DCHECK_EQ(jingle_session_manager_->message_loop(), MessageLoop::current()); + DCHECK(CalledOnValidThread()); return control_socket_wrapper_.get(); } net::Socket* JingleSession::event_channel() { - DCHECK_EQ(jingle_session_manager_->message_loop(), MessageLoop::current()); + DCHECK(CalledOnValidThread()); return event_socket_wrapper_.get(); } // TODO(sergeyu): Remove this method after we switch to RTP. net::Socket* JingleSession::video_channel() { - DCHECK_EQ(jingle_session_manager_->message_loop(), MessageLoop::current()); + DCHECK(CalledOnValidThread()); return video_socket_wrapper_.get(); } net::Socket* JingleSession::video_rtp_channel() { - DCHECK_EQ(jingle_session_manager_->message_loop(), MessageLoop::current()); + DCHECK(CalledOnValidThread()); return video_rtp_channel_.get(); } net::Socket* JingleSession::video_rtcp_channel() { - DCHECK_EQ(jingle_session_manager_->message_loop(), MessageLoop::current()); + DCHECK(CalledOnValidThread()); return video_rtcp_channel_.get(); } const std::string& JingleSession::jid() { + DCHECK(CalledOnValidThread()); // No synchronization is needed because jid_ is not changed // after new connection is passed to JingleChromotocolServer callback. return jid_; } -MessageLoop* JingleSession::message_loop() { - return jingle_session_manager_->message_loop(); -} - const CandidateSessionConfig* JingleSession::candidate_config() { + DCHECK(CalledOnValidThread()); DCHECK(candidate_config_.get()); return candidate_config_.get(); } void JingleSession::set_candidate_config( const CandidateSessionConfig* candidate_config) { + DCHECK(CalledOnValidThread()); DCHECK(!candidate_config_.get()); DCHECK(candidate_config); candidate_config_.reset(candidate_config); } scoped_refptr<net::X509Certificate> JingleSession::local_certificate() const { + DCHECK(CalledOnValidThread()); return local_cert_; } const SessionConfig* JingleSession::config() { + DCHECK(CalledOnValidThread()); DCHECK(config_.get()); return config_.get(); } void JingleSession::set_config(const SessionConfig* config) { + DCHECK(CalledOnValidThread()); DCHECK(!config_.get()); DCHECK(config); config_.reset(config); } const std::string& JingleSession::initiator_token() { + DCHECK(CalledOnValidThread()); return initiator_token_; } void JingleSession::set_initiator_token(const std::string& initiator_token) { + DCHECK(CalledOnValidThread()); initiator_token_ = initiator_token; } const std::string& JingleSession::receiver_token() { + DCHECK(CalledOnValidThread()); return receiver_token_; } void JingleSession::set_receiver_token(const std::string& receiver_token) { + DCHECK(CalledOnValidThread()); receiver_token_ = receiver_token; } -void JingleSession::Close(Task* closed_task) { - if (MessageLoop::current() != jingle_session_manager_->message_loop()) { - jingle_session_manager_->message_loop()->PostTask( - FROM_HERE, NewRunnableMethod(this, &JingleSession::Close, closed_task)); - return; - } +void JingleSession::Close() { + DCHECK(CalledOnValidThread()); CloseInternal(net::ERR_CONNECTION_CLOSED, false); - - if (closed_task) { - closed_task->Run(); - delete closed_task; - } } void JingleSession::OnSessionState( BaseSession* session, BaseSession::State state) { + DCHECK(CalledOnValidThread()); DCHECK_EQ(cricket_session_, session); if (closed_) { @@ -379,6 +381,7 @@ void JingleSession::OnSessionState( void JingleSession::OnSessionError( BaseSession* session, BaseSession::Error error) { + DCHECK(CalledOnValidThread()); DCHECK_EQ(cricket_session_, session); if (error != cricket::Session::ERROR_NONE) { @@ -387,6 +390,7 @@ void JingleSession::OnSessionError( } void JingleSession::OnInitiate() { + DCHECK(CalledOnValidThread()); jid_ = cricket_session_->remote_name(); const cricket::SessionDescription* session_description; @@ -561,6 +565,8 @@ void JingleSession::InitializeChannels() { } void JingleSession::OnAccept() { + DCHECK(CalledOnValidThread()); + // If we initiated the session, store the candidate configuration that the // host responded with, to refer to later. if (cricket_session_->initiator()) { @@ -578,15 +584,18 @@ void JingleSession::OnAccept() { // P2P channel). By posting a task here we can call it at the right // moment. This problem will go away when we switch to Pepper P2P // API. - jingle_session_manager_->message_loop()->PostTask( - FROM_HERE, NewRunnableMethod(this, &JingleSession::InitializeChannels)); + MessageLoop::current()->PostTask(FROM_HERE, task_factory_.NewRunnableMethod( + &JingleSession::InitializeChannels)); } void JingleSession::OnTerminate() { + DCHECK(CalledOnValidThread()); CloseInternal(net::ERR_CONNECTION_ABORTED, false); } void JingleSession::OnConnect(int result) { + DCHECK(CalledOnValidThread()); + if (result != net::OK) { LOG(ERROR) << "PseudoTCP connection failed: " << result; CloseInternal(result, true); @@ -620,6 +629,8 @@ void JingleSession::OnConnect(int result) { } void JingleSession::OnSSLConnect(int result) { + DCHECK(CalledOnValidThread()); + DCHECK(!closed_); if (result != net::OK) { LOG(ERROR) << "Error during SSL connection: " << result; @@ -648,6 +659,8 @@ void JingleSession::OnSSLConnect(int result) { } void JingleSession::SetState(State new_state) { + DCHECK(CalledOnValidThread()); + if (new_state != state_) { DCHECK_NE(state_, CLOSED); DCHECK_NE(state_, FAILED); diff --git a/remoting/protocol/jingle_session.h b/remoting/protocol/jingle_session.h index 0abe34e..b05fcf6 100644 --- a/remoting/protocol/jingle_session.h +++ b/remoting/protocol/jingle_session.h @@ -6,6 +6,7 @@ #define REMOTING_PROTOCOL_JINGLE_SESSION_H_ #include "base/memory/ref_counted.h" +#include "base/task.h" #include "crypto/rsa_private_key.h" #include "net/base/completion_callback.h" #include "remoting/protocol/session.h" @@ -52,7 +53,6 @@ class JingleSession : public protocol::Session, virtual net::Socket* video_rtcp_channel(); virtual const std::string& jid(); - virtual MessageLoop* message_loop(); virtual const CandidateSessionConfig* candidate_config(); @@ -64,7 +64,7 @@ class JingleSession : public protocol::Session, virtual const std::string& receiver_token(); virtual void set_receiver_token(const std::string& receiver_token); - virtual void Close(Task* closed_task); + virtual void Close(); private: friend class JingleSessionManager; @@ -138,8 +138,9 @@ class JingleSession : public protocol::Session, void SetState(State new_state); - // JingleSessionManager that created this session. - scoped_refptr<JingleSessionManager> jingle_session_manager_; + // JingleSessionManager that created this session. Guaranteed to + // exist throughout the lifetime of the session. + JingleSessionManager* jingle_session_manager_; // Certificates used for connection. Currently only receiving side // has a certificate. @@ -219,6 +220,8 @@ class JingleSession : public protocol::Session, net::CompletionCallbackImpl<JingleSession> connect_callback_; net::CompletionCallbackImpl<JingleSession> ssl_connect_callback_; + ScopedRunnableMethodFactory<JingleSession> task_factory_; + DISALLOW_COPY_AND_ASSIGN(JingleSession); }; diff --git a/remoting/protocol/jingle_session_manager.cc b/remoting/protocol/jingle_session_manager.cc index 76ad55b..93ae125 100644 --- a/remoting/protocol/jingle_session_manager.cc +++ b/remoting/protocol/jingle_session_manager.cc @@ -8,6 +8,7 @@ #include "base/message_loop.h" #include "base/string_util.h" +#include "base/task.h" #include "remoting/base/constants.h" #include "remoting/jingle_glue/http_port_allocator.h" #include "remoting/jingle_glue/jingle_info_request.h" @@ -26,20 +27,18 @@ namespace remoting { namespace protocol { JingleSessionManager::JingleSessionManager( - MessageLoop* message_loop, talk_base::NetworkManager* network_manager, talk_base::PacketSocketFactory* socket_factory, PortAllocatorSessionFactory* port_allocator_session_factory) - : message_loop_(message_loop), - network_manager_(network_manager), + : network_manager_(network_manager), socket_factory_(socket_factory), port_allocator_session_factory_(port_allocator_session_factory), signal_strategy_(NULL), enable_nat_traversing_(false), allow_local_ips_(false), http_port_allocator_(NULL), - closed_(false) { - DCHECK(message_loop); + closed_(false), + ALLOW_THIS_IN_INITIALIZER_LIST(task_factory_(this)) { } JingleSessionManager::~JingleSessionManager() { @@ -52,14 +51,7 @@ void JingleSessionManager::Init( IncomingSessionCallback* incoming_session_callback, crypto::RSAPrivateKey* private_key, scoped_refptr<net::X509Certificate> certificate) { - if (MessageLoop::current() != message_loop_) { - message_loop_->PostTask( - FROM_HERE, NewRunnableMethod( - this, &JingleSessionManager::Init, - local_jid, signal_strategy, incoming_session_callback, - private_key, certificate)); - return; - } + DCHECK(CalledOnValidThread()); DCHECK(signal_strategy); DCHECK(incoming_session_callback); @@ -91,7 +83,8 @@ void JingleSessionManager::Init( jingle_info_request_->SetCallback( NewCallback(this, &JingleSessionManager::OnJingleInfo)); jingle_info_request_->Run( - NewRunnableMethod(this, &JingleSessionManager::DoStartSessionManager)); + task_factory_.NewRunnableMethod( + &JingleSessionManager::DoStartSessionManager)); } else { port_allocator_.reset( new cricket::BasicPortAllocator( @@ -102,62 +95,53 @@ void JingleSessionManager::Init( } } -void JingleSessionManager::Close(Task* closed_task) { - if (MessageLoop::current() != message_loop_) { - message_loop_->PostTask( - FROM_HERE, NewRunnableMethod(this, &JingleSessionManager::Close, - closed_task)); - return; - } +void JingleSessionManager::Close() { + DCHECK(CalledOnValidThread()); + + // Close() can be called only after all sessions are destroyed. + DCHECK(sessions_.empty()); if (!closed_) { - // Close all connections. cricket_session_manager_->RemoveClient(kChromotingXmlNamespace); - while (!sessions_.empty()) { - cricket::Session* session = sessions_.front()->ReleaseSession(); - cricket_session_manager_->DestroySession(session); - sessions_.pop_front(); - } jingle_signaling_connector_.reset(); cricket_session_manager_.reset(); closed_ = true; } - - closed_task->Run(); - delete closed_task; } void JingleSessionManager::set_allow_local_ips(bool allow_local_ips) { allow_local_ips_ = allow_local_ips; } -scoped_refptr<protocol::Session> JingleSessionManager::Connect( +Session* JingleSessionManager::Connect( const std::string& host_jid, const std::string& host_public_key, const std::string& receiver_token, CandidateSessionConfig* candidate_config, - protocol::Session::StateChangeCallback* state_change_callback) { + Session::StateChangeCallback* state_change_callback) { + DCHECK(CalledOnValidThread()); + // Can be called from any thread. - scoped_refptr<JingleSession> jingle_session( - JingleSession::CreateClientSession(this, host_public_key)); + JingleSession* jingle_session = + JingleSession::CreateClientSession(this, host_public_key); jingle_session->set_candidate_config(candidate_config); jingle_session->set_receiver_token(receiver_token); - message_loop_->PostTask( - FROM_HERE, NewRunnableMethod(this, &JingleSessionManager::DoConnect, - jingle_session, host_jid, - host_public_key, receiver_token, - state_change_callback)); + // TODO(sergeyu): Does this need to be asynchronous? + MessageLoop::current()->PostTask(FROM_HERE, task_factory_.NewRunnableMethod( + &JingleSessionManager::DoConnect, jingle_session, host_jid, + host_public_key, receiver_token, state_change_callback)); return jingle_session; } void JingleSessionManager::DoConnect( - scoped_refptr<JingleSession> jingle_session, + JingleSession* jingle_session, const std::string& host_jid, const std::string& host_public_key, const std::string& receiver_token, - protocol::Session::StateChangeCallback* state_change_callback) { - DCHECK_EQ(message_loop_, MessageLoop::current()); + Session::StateChangeCallback* state_change_callback) { + DCHECK(CalledOnValidThread()); + cricket::Session* cricket_session = cricket_session_manager_->CreateSession( local_jid_, kChromotingXmlNamespace); @@ -171,13 +155,9 @@ void JingleSessionManager::DoConnect( jingle_session->GetEncryptedMasterKey())); } -MessageLoop* JingleSessionManager::message_loop() { - return message_loop_; -} - void JingleSessionManager::OnSessionCreate( cricket::Session* cricket_session, bool incoming) { - DCHECK_EQ(message_loop_, MessageLoop::current()); + DCHECK(CalledOnValidThread()); // Allow local connections if neccessary. cricket_session->set_allow_local_ips(allow_local_ips_); @@ -189,19 +169,18 @@ void JingleSessionManager::OnSessionCreate( JingleSession* jingle_session = JingleSession::CreateServerSession( this, certificate_, private_key_.get()); - sessions_.push_back(make_scoped_refptr(jingle_session)); + sessions_.push_back(jingle_session); jingle_session->Init(cricket_session); } } void JingleSessionManager::OnSessionDestroy(cricket::Session* cricket_session) { - DCHECK_EQ(message_loop_, MessageLoop::current()); + DCHECK(CalledOnValidThread()); - std::list<scoped_refptr<JingleSession> >::iterator it; + std::list<JingleSession*>::iterator it; for (it = sessions_.begin(); it != sessions_.end(); ++it) { if ((*it)->HasSession(cricket_session)) { (*it)->ReleaseSession(); - sessions_.erase(it); return; } } @@ -210,7 +189,7 @@ void JingleSessionManager::OnSessionDestroy(cricket::Session* cricket_session) { void JingleSessionManager::AcceptConnection( JingleSession* jingle_session, cricket::Session* cricket_session) { - DCHECK_EQ(message_loop_, MessageLoop::current()); + DCHECK(CalledOnValidThread()); // Reject connection if we are closed. if (closed_) { @@ -251,11 +230,17 @@ void JingleSessionManager::AcceptConnection( case protocol::SessionManager::INCOMPATIBLE: { cricket_session->Reject(cricket::STR_TERMINATE_INCOMPATIBLE_PARAMETERS); + jingle_session->ReleaseSession(); + jingle_session->Close(); + delete jingle_session; break; } case protocol::SessionManager::DECLINE: { cricket_session->Reject(cricket::STR_TERMINATE_DECLINE); + jingle_session->ReleaseSession(); + jingle_session->Close(); + delete jingle_session; break; } @@ -265,20 +250,21 @@ void JingleSessionManager::AcceptConnection( } } -// Parse content description generated by WriteContent(). -bool JingleSessionManager::ParseContent( - cricket::SignalingProtocol protocol, - const XmlElement* element, - const cricket::ContentDescription** content, - cricket::ParseError* error) { - *content = ContentDescription::ParseXml(element); - return *content != NULL; +void JingleSessionManager::SessionDestroyed(JingleSession* jingle_session) { + std::list<JingleSession*>::iterator it = + std::find(sessions_.begin(), sessions_.end(), jingle_session); + CHECK(it != sessions_.end()); + cricket::Session* cricket_session = jingle_session->ReleaseSession(); + cricket_session_manager_->DestroySession(cricket_session); + sessions_.erase(it); } void JingleSessionManager::OnJingleInfo( const std::string& token, const std::vector<std::string>& relay_hosts, const std::vector<talk_base::SocketAddress>& stun_hosts) { + DCHECK(CalledOnValidThread()); + if (port_allocator_.get()) { // TODO(ajwong): Avoid string processing if log-level is low. std::string stun_servers; @@ -297,6 +283,8 @@ void JingleSessionManager::OnJingleInfo( } void JingleSessionManager::DoStartSessionManager() { + DCHECK(CalledOnValidThread()); + cricket_session_manager_.reset( new cricket::SessionManager(port_allocator_.get())); cricket_session_manager_->AddClient(kChromotingXmlNamespace, this); @@ -305,6 +293,16 @@ void JingleSessionManager::DoStartSessionManager() { signal_strategy_, cricket_session_manager_.get())); } +// Parse content description generated by WriteContent(). +bool JingleSessionManager::ParseContent( + cricket::SignalingProtocol protocol, + const XmlElement* element, + const cricket::ContentDescription** content, + cricket::ParseError* error) { + *content = ContentDescription::ParseXml(element); + return *content != NULL; +} + bool JingleSessionManager::WriteContent( cricket::SignalingProtocol protocol, const cricket::ContentDescription* content, @@ -317,6 +315,7 @@ bool JingleSessionManager::WriteContent( return true; } +// static cricket::SessionDescription* JingleSessionManager::CreateClientSessionDescription( const CandidateSessionConfig* config, @@ -329,6 +328,7 @@ JingleSessionManager::CreateClientSessionDescription( return desc; } +// static cricket::SessionDescription* JingleSessionManager::CreateHostSessionDescription( const CandidateSessionConfig* config, scoped_refptr<net::X509Certificate> certificate) { diff --git a/remoting/protocol/jingle_session_manager.h b/remoting/protocol/jingle_session_manager.h index 994fb69..7dcc703 100644 --- a/remoting/protocol/jingle_session_manager.h +++ b/remoting/protocol/jingle_session_manager.h @@ -16,8 +16,6 @@ #include "third_party/libjingle/source/talk/p2p/base/session.h" #include "third_party/libjingle/source/talk/p2p/base/sessionclient.h" -class MessageLoop; - namespace cricket { class PortAllocator; class SessionManager; @@ -36,14 +34,14 @@ namespace protocol { // server that accepts chromoting connections and can also make new connections // to other hosts. class JingleSessionManager - : public protocol::SessionManager, + : public SessionManager, public cricket::SessionClient { public: JingleSessionManager( - MessageLoop* message_loop, talk_base::NetworkManager* network_manager, talk_base::PacketSocketFactory* socket_factory, PortAllocatorSessionFactory* port_allocator_session_factory); + virtual ~JingleSessionManager(); // SessionManager interface. virtual void Init(const std::string& local_jid, @@ -51,49 +49,46 @@ class JingleSessionManager IncomingSessionCallback* incoming_session_callback, crypto::RSAPrivateKey* private_key, scoped_refptr<net::X509Certificate> certificate) OVERRIDE; - virtual scoped_refptr<protocol::Session> Connect( + virtual Session* Connect( const std::string& host_jid, const std::string& host_public_key, const std::string& client_token, CandidateSessionConfig* config, - protocol::Session::StateChangeCallback* state_change_callback) OVERRIDE; - virtual void Close(Task* closed_task) OVERRIDE; + Session::StateChangeCallback* state_change_callback) OVERRIDE; + virtual void Close() OVERRIDE; void set_allow_local_ips(bool allow_local_ips); // cricket::SessionClient interface. virtual void OnSessionCreate(cricket::Session* cricket_session, - bool received_initiate); - virtual void OnSessionDestroy(cricket::Session* cricket_session); + bool received_initiate) OVERRIDE; + virtual void OnSessionDestroy(cricket::Session* cricket_session) OVERRIDE; virtual bool ParseContent(cricket::SignalingProtocol protocol, const buzz::XmlElement* elem, const cricket::ContentDescription** content, - cricket::ParseError* error); + cricket::ParseError* error) OVERRIDE; virtual bool WriteContent(cricket::SignalingProtocol protocol, const cricket::ContentDescription* content, buzz::XmlElement** elem, - cricket::WriteError* error); - - protected: - virtual ~JingleSessionManager(); + cricket::WriteError* error) OVERRIDE; private: friend class JingleSession; - // Message loop that corresponds to the network thread. - MessageLoop* message_loop(); - - // Called by JingleChromotocolConnection when a new connection is initiated. + // Called by JingleSession when a new connection is initiated. void AcceptConnection(JingleSession* jingle_session, cricket::Session* cricket_session); + // Called by JingleSession when it is being destroyed. + void SessionDestroyed(JingleSession* jingle_session); + void DoConnect( - scoped_refptr<JingleSession> jingle_session, + JingleSession* jingle_session, const std::string& host_jid, const std::string& host_public_key, const std::string& client_token, - protocol::Session::StateChangeCallback* state_change_callback); + Session::StateChangeCallback* state_change_callback); // Callback for JingleInfoRequest. void OnJingleInfo( @@ -105,16 +100,15 @@ class JingleSessionManager void DoStartSessionManager(); // Creates session description for outgoing session. - cricket::SessionDescription* CreateClientSessionDescription( + static cricket::SessionDescription* CreateClientSessionDescription( const CandidateSessionConfig* candidate_config, const std::string& auth_token, const std::string& master_key); // Creates session description for incoming session. - cricket::SessionDescription* CreateHostSessionDescription( + static cricket::SessionDescription* CreateHostSessionDescription( const CandidateSessionConfig* candidate_config, scoped_refptr<net::X509Certificate> certificate); - MessageLoop* message_loop_; scoped_ptr<talk_base::NetworkManager> network_manager_; scoped_ptr<talk_base::PacketSocketFactory> socket_factory_; scoped_ptr<PortAllocatorSessionFactory> port_allocator_session_factory_; @@ -140,7 +134,9 @@ class JingleSessionManager bool closed_; - std::list<scoped_refptr<JingleSession> > sessions_; + std::list<JingleSession*> sessions_; + + ScopedRunnableMethodFactory<JingleSessionManager> task_factory_; DISALLOW_COPY_AND_ASSIGN(JingleSessionManager); }; diff --git a/remoting/protocol/jingle_session_unittest.cc b/remoting/protocol/jingle_session_unittest.cc index 03c5be4..0097e36 100644 --- a/remoting/protocol/jingle_session_unittest.cc +++ b/remoting/protocol/jingle_session_unittest.cc @@ -2,11 +2,11 @@ // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. +#include "base/bind.h" #include "base/file_path.h" #include "base/file_util.h" #include "base/path_service.h" #include "base/time.h" -#include "base/synchronization/waitable_event.h" #include "base/test/test_timeouts.h" #include "crypto/nss_util.h" #include "net/base/completion_callback.h" @@ -22,6 +22,7 @@ #include "third_party/libjingle/source/talk/p2p/client/basicportallocator.h" using testing::_; +using testing::DeleteArg; using testing::DoAll; using testing::Invoke; using testing::InvokeWithoutArgs; @@ -42,6 +43,7 @@ namespace remoting { namespace protocol { namespace { + // Send 100 messages 1024 bytes each. UDP messages are sent with 10ms delay // between messages (about 1 second for 100 messages). const int kMessageSize = 1024; @@ -60,7 +62,30 @@ const char kTestHostPublicKey[] = "iAvjrvkNDlfiEk7tiY7YzD9zTi3146GX6KLz5GQAd/3I8I5QW3ftF1s/m93AHuc383GZ" "A78Oi+IbcJf/jJUZO119VNnRKGiPsf5GZIoHyXX8O5OUQk5soKdQPeK1FwWkeZu6fuXl" "QoU12I6podD6xMFa/PA/xefMwcpmuWTRhcso9bp10zVFGQIDAQAB"; -} // namespace + +void QuitCurrentThread() { + MessageLoop::current()->PostTask(FROM_HERE, new MessageLoop::QuitTask()); +} + +void OnTimeoutTerminateThread(bool* timeout) { + *timeout = true; + QuitCurrentThread(); +} + +bool RunMessageLoopWithTimeout(int timeout_ms) { + bool timeout = false; + MessageLoop::current()->PostDelayedTask( + FROM_HERE, base::Bind(OnTimeoutTerminateThread, &timeout), timeout_ms); + MessageLoop::current()->Run(); + return !timeout; +} + +ACTION_P(QuitThreadOnCounter, counter) { + (*counter)--; + EXPECT_GE(*counter, 0); + if (*counter == 0) + QuitCurrentThread(); +} class MockSessionManagerCallback { public: @@ -74,12 +99,18 @@ class MockSessionCallback { MOCK_METHOD1(OnStateChange, void(Session::State)); }; +} // namespace + class JingleSessionTest : public testing::Test { public: + JingleSessionTest() + : message_loop_(talk_base::Thread::Current()) { + } + // Helper method to copy to set value of client_connection_. void SetHostSession(Session* session) { DCHECK(session); - host_session_ = session; + host_session_.reset(session); host_session_->SetStateChangeCallback( NewCallback(&host_connection_callback_, &MockSessionCallback::OnStateChange)); @@ -89,30 +120,27 @@ class JingleSessionTest : public testing::Test { protected: virtual void SetUp() { - thread_.Start(); } virtual void TearDown() { CloseSessions(); - thread_.message_loop()->PostTask( - FROM_HERE, NewRunnableMethod( - this, &JingleSessionTest::DoCloseSessionManager)); - thread_.Stop(); + CloseSessionManager(); } void CreateServerPair() { // Sessions must be initialized on the jingle thread. - thread_.message_loop()->PostTask( - FROM_HERE, NewRunnableMethod( - this, &JingleSessionTest::DoCreateServerPair)); - SyncWithJingleThread(); + DoCreateServerPair(); } void CloseSessions() { - thread_.message_loop()->PostTask( - FROM_HERE, NewRunnableMethod( - this, &JingleSessionTest::DoCloseSessions)); - SyncWithJingleThread(); + if (host_session_.get()) { + host_session_->Close(); + host_session_.reset(); + } + if (client_session_.get()) { + client_session_->Close(); + client_session_.reset(); + } } void DoCreateServerPair() { @@ -146,8 +174,7 @@ class JingleSessionTest : public testing::Test { FakeSignalStrategy::Connect(host_signal_strategy_.get(), client_signal_strategy_.get()); - host_server_ = - new JingleSessionManager(thread_.message_loop(), NULL, NULL, NULL); + host_server_.reset(new JingleSessionManager(NULL, NULL, NULL)); host_server_->set_allow_local_ips(true); host_server_->Init( kHostJid, host_signal_strategy_.get(), @@ -156,8 +183,7 @@ class JingleSessionTest : public testing::Test { private_key.release(), cert); - client_server_ = - new JingleSessionManager(thread_.message_loop(), NULL, NULL, NULL); + client_server_.reset(new JingleSessionManager(NULL, NULL, NULL)); client_server_->set_allow_local_ips(true); client_server_->Init( kClientJid, client_signal_strategy_.get(), @@ -166,31 +192,22 @@ class JingleSessionTest : public testing::Test { NULL, NULL); } - void DoCloseSessions() { - if (host_session_) { - host_session_->Close(NewRunnableFunction( - &JingleSessionTest::DoNothing)); + void CloseSessionManager() { + if (host_server_.get()) { + host_server_->Close(); + host_server_.reset(); } - if (client_session_) { - client_session_->Close(NewRunnableFunction( - &JingleSessionTest::DoNothing)); - } - } - - void DoCloseSessionManager() { - if (host_server_) { - host_server_->Close(NewRunnableFunction(&JingleSessionTest::DoNothing)); - host_server_ = NULL; - } - if (client_server_) { - client_server_->Close(NewRunnableFunction(&JingleSessionTest::DoNothing)); - client_server_ = NULL; + if (client_server_.get()) { + client_server_->Close(); + client_server_.reset(); } host_signal_strategy_.reset(); client_signal_strategy_.reset(); } bool InitiateConnection() { + int not_connected_peers = 2; + EXPECT_CALL(host_server_callback_, OnIncomingSession(_, _)) .WillOnce(DoAll( WithArg<0>(Invoke( @@ -204,8 +221,7 @@ class JingleSessionTest : public testing::Test { EXPECT_CALL(host_connection_callback_, OnStateChange(Session::CONNECTED)) .Times(1) - .WillOnce(InvokeWithoutArgs(&host_connected_event, - &base::WaitableEvent::Signal)); + .WillOnce(QuitThreadOnCounter(¬_connected_peers)); // Expect that the connection will be closed eventually. EXPECT_CALL(host_connection_callback_, OnStateChange(Session::CLOSED)) @@ -218,50 +234,36 @@ class JingleSessionTest : public testing::Test { EXPECT_CALL(client_connection_callback_, OnStateChange(Session::CONNECTED)) .Times(1) - .WillOnce(InvokeWithoutArgs(&client_connected_event, - &base::WaitableEvent::Signal)); + .WillOnce(QuitThreadOnCounter(¬_connected_peers)); // Expect that the connection will be closed eventually. EXPECT_CALL(client_connection_callback_, OnStateChange(Session::CLOSED)) .Times(1); - client_session_ = client_server_->Connect( + client_session_.reset(client_server_->Connect( kHostJid, kTestHostPublicKey, kTestToken, CandidateSessionConfig::CreateDefault(), NewCallback(&client_connection_callback_, - &MockSessionCallback::OnStateChange)); - - return host_connected_event.TimedWait(base::TimeDelta::FromMilliseconds( - TestTimeouts::action_max_timeout_ms())) && - client_connected_event.TimedWait(base::TimeDelta::FromMilliseconds( - TestTimeouts::action_max_timeout_ms())); - } + &MockSessionCallback::OnStateChange))); - static void SignalEvent(base::WaitableEvent* event) { - event->Signal(); + return RunMessageLoopWithTimeout(TestTimeouts::action_max_timeout_ms()); } static void DoNothing() { } - void SyncWithJingleThread() { - base::WaitableEvent event(true, false); - thread_.message_loop()->PostTask( - FROM_HERE, NewRunnableFunction(&SignalEvent, &event)); - event.Wait(); - } + JingleThreadMessageLoop message_loop_; - JingleThread thread_; scoped_ptr<FakeSignalStrategy> host_signal_strategy_; scoped_ptr<FakeSignalStrategy> client_signal_strategy_; - scoped_refptr<JingleSessionManager> host_server_; + scoped_ptr<JingleSessionManager> host_server_; MockSessionManagerCallback host_server_callback_; - scoped_refptr<JingleSessionManager> client_server_; + scoped_ptr<JingleSessionManager> client_server_; MockSessionManagerCallback client_server_callback_; - scoped_refptr<Session> host_session_; + scoped_ptr<Session> host_session_; MockSessionCallback host_connection_callback_; - scoped_refptr<Session> client_session_; + scoped_ptr<Session> client_session_; MockSessionCallback client_connection_callback_; }; @@ -275,32 +277,29 @@ class ChannelTesterBase : public base::RefCountedThreadSafe<ChannelTesterBase> { VIDEO_RTCP, }; - ChannelTesterBase(MessageLoop* message_loop, - Session* host_session, + ChannelTesterBase(Session* host_session, Session* client_session) - : message_loop_(message_loop), - host_session_(host_session), + : host_session_(host_session), client_session_(client_session), - done_event_(true, false) { + done_(false) { } virtual ~ChannelTesterBase() { } void Start(ChannelType channel) { - message_loop_->PostTask( + MessageLoop::current()->PostTask( FROM_HERE, NewRunnableMethod(this, &ChannelTesterBase::DoStart, channel)); } bool WaitFinished() { - return done_event_.TimedWait(base::TimeDelta::FromMilliseconds( - TestTimeouts::action_max_timeout_ms())); + return RunMessageLoopWithTimeout(TestTimeouts::action_max_timeout_ms()); } virtual void CheckResults() = 0; protected: - void DoStart(ChannelType channel) { + void DoStart(ChannelType channel) { socket_1_ = SelectChannel(host_session_, channel); socket_2_ = SelectChannel(client_session_, channel); @@ -309,6 +308,11 @@ class ChannelTesterBase : public base::RefCountedThreadSafe<ChannelTesterBase> { DoWrite(); } + void Done() { + done_ = true; + MessageLoop::current()->PostTask(FROM_HERE, base::Bind(&QuitCurrentThread)); + } + virtual void InitBuffers() = 0; virtual void DoWrite() = 0; virtual void DoRead() = 0; @@ -332,21 +336,20 @@ class ChannelTesterBase : public base::RefCountedThreadSafe<ChannelTesterBase> { } } - MessageLoop* message_loop_; - scoped_refptr<Session> host_session_; - scoped_refptr<Session> client_session_; + Session* host_session_; + Session* client_session_; net::Socket* socket_1_; net::Socket* socket_2_; - base::WaitableEvent done_event_; + bool done_; }; class TCPChannelTester : public ChannelTesterBase { public: - TCPChannelTester(MessageLoop* message_loop, - Session* host_session, + TCPChannelTester(Session* host_session, Session* client_session, - int message_size, int message_count) - : ChannelTesterBase(message_loop, host_session, client_session), + int message_size, + int message_count) + : ChannelTesterBase(host_session, client_session), ALLOW_THIS_IN_INITIALIZER_LIST( write_cb_(this, &TCPChannelTester::OnWritten)), ALLOW_THIS_IN_INITIALIZER_LIST( @@ -403,7 +406,7 @@ class TCPChannelTester : public ChannelTesterBase { if (result <= 0 && result != net::ERR_IO_PENDING) { LOG(ERROR) << "Received error " << result << " when trying to write"; write_errors_++; - done_event_.Signal(); + Done(); } else if (result > 0) { output_buffer_->DidConsume(result); } @@ -420,22 +423,22 @@ class TCPChannelTester : public ChannelTesterBase { void OnRead(int result) { HandleReadResult(result); - if (!done_event_.IsSignaled()) + if (!done_) DoRead(); // Don't try to read again when we are done reading. } void HandleReadResult(int result) { if (result <= 0 && result != net::ERR_IO_PENDING) { - if (!done_event_.IsSignaled()) { + if (!done_) { LOG(ERROR) << "Received error " << result << " when trying to read"; read_errors_++; - done_event_.Signal(); + Done(); } } else if (result > 0) { // Allocate memory for the next read. input_buffer_->set_offset(input_buffer_->offset() + result); if (input_buffer_->offset() == test_data_size_) - done_event_.Signal(); + Done(); } } @@ -453,12 +456,10 @@ class TCPChannelTester : public ChannelTesterBase { class ChannelSpeedTester : public TCPChannelTester { public: - ChannelSpeedTester(MessageLoop* message_loop, - Session* host_session, - Session* client_session, - int message_size) - : TCPChannelTester(message_loop, host_session, - client_session, message_size, 1) { + ChannelSpeedTester(Session* host_session, + Session* client_session, + int message_size) + : TCPChannelTester(host_session, client_session, message_size, 1) { CHECK(message_size >= 8); } @@ -483,10 +484,9 @@ class ChannelSpeedTester : public TCPChannelTester { class UDPChannelTester : public ChannelTesterBase { public: - UDPChannelTester(MessageLoop* message_loop, - Session* host_session, + UDPChannelTester(Session* host_session, Session* client_session) - : ChannelTesterBase(message_loop, host_session, client_session), + : ChannelTesterBase(host_session, client_session), ALLOW_THIS_IN_INITIALIZER_LIST( write_cb_(this, &UDPChannelTester::OnWritten)), ALLOW_THIS_IN_INITIALIZER_LIST( @@ -518,7 +518,7 @@ class UDPChannelTester : public ChannelTesterBase { virtual void DoWrite() { if (packets_sent_ >= kMessages) { - done_event_.Signal(); + Done(); return; } @@ -540,11 +540,11 @@ class UDPChannelTester : public ChannelTesterBase { if (result <= 0 && result != net::ERR_IO_PENDING) { LOG(ERROR) << "Received error " << result << " when trying to write"; write_errors_++; - done_event_.Signal(); + Done(); } else if (result > 0) { EXPECT_EQ(kMessageSize, result); packets_sent_++; - message_loop_->PostDelayedTask( + MessageLoop::current()->PostDelayedTask( FROM_HERE, NewRunnableMethod(this, &UDPChannelTester::DoWrite), kUdpWriteDelayMs); } @@ -569,10 +569,10 @@ class UDPChannelTester : public ChannelTesterBase { void HandleReadResult(int result) { if (result <= 0 && result != net::ERR_IO_PENDING) { // Error will be received after the socket is closed. - if (!done_event_.IsSignaled()) { + if (!done_) { LOG(ERROR) << "Received error " << result << " when trying to read"; read_errors_++; - done_event_.Signal(); + Done(); } } else if (result > 0) { packets_received_++; @@ -625,24 +625,21 @@ TEST_F(JingleSessionTest, RejectConnection) { EXPECT_CALL(host_server_callback_, OnIncomingSession(_, _)) .WillOnce(SetArgumentPointee<1>(protocol::SessionManager::DECLINE)); - base::WaitableEvent done_event(true, false); EXPECT_CALL(client_connection_callback_, OnStateChange(Session::CONNECTING)) .Times(1); EXPECT_CALL(client_connection_callback_, OnStateChange(Session::CLOSED)) .Times(1) - .WillOnce(InvokeWithoutArgs(&done_event, &base::WaitableEvent::Signal)); + .WillOnce(InvokeWithoutArgs(&QuitCurrentThread)); - client_session_ = client_server_->Connect( + client_session_.reset(client_server_->Connect( kHostJid, kTestHostPublicKey, kTestToken, CandidateSessionConfig::CreateDefault(), NewCallback(&client_connection_callback_, - &MockSessionCallback::OnStateChange)); + &MockSessionCallback::OnStateChange))); - ASSERT_TRUE( - done_event.TimedWait(base::TimeDelta::FromMilliseconds( - TestTimeouts::action_max_timeout_ms()))); + ASSERT_TRUE(RunMessageLoopWithTimeout(TestTimeouts::action_max_timeout_ms())); } // Verify that we can connect two endpoints. @@ -656,8 +653,8 @@ TEST_F(JingleSessionTest, TestControlChannel) { CreateServerPair(); ASSERT_TRUE(InitiateConnection()); scoped_refptr<TCPChannelTester> tester( - new TCPChannelTester(thread_.message_loop(), host_session_, - client_session_, kMessageSize, kMessages)); + new TCPChannelTester(host_session_.get(), client_session_.get(), + kMessageSize, kMessages)); tester->Start(ChannelTesterBase::CONTROL); ASSERT_TRUE(tester->WaitFinished()); tester->CheckResults(); @@ -671,8 +668,8 @@ TEST_F(JingleSessionTest, TestVideoChannel) { CreateServerPair(); ASSERT_TRUE(InitiateConnection()); scoped_refptr<TCPChannelTester> tester( - new TCPChannelTester(thread_.message_loop(), host_session_, - client_session_, kMessageSize, kMessageSize)); + new TCPChannelTester(host_session_.get(), client_session_.get(), + kMessageSize, kMessageSize)); tester->Start(ChannelTesterBase::VIDEO); ASSERT_TRUE(tester->WaitFinished()); tester->CheckResults(); @@ -686,8 +683,8 @@ TEST_F(JingleSessionTest, TestEventChannel) { CreateServerPair(); ASSERT_TRUE(InitiateConnection()); scoped_refptr<TCPChannelTester> tester( - new TCPChannelTester(thread_.message_loop(), host_session_, - client_session_, kMessageSize, kMessageSize)); + new TCPChannelTester(host_session_.get(), client_session_.get(), + kMessageSize, kMessageSize)); tester->Start(ChannelTesterBase::EVENT); ASSERT_TRUE(tester->WaitFinished()); tester->CheckResults(); @@ -701,8 +698,7 @@ TEST_F(JingleSessionTest, TestVideoRtpChannel) { CreateServerPair(); ASSERT_TRUE(InitiateConnection()); scoped_refptr<UDPChannelTester> tester( - new UDPChannelTester(thread_.message_loop(), host_session_, - client_session_)); + new UDPChannelTester(host_session_.get(), client_session_.get())); tester->Start(ChannelTesterBase::VIDEO_RTP); ASSERT_TRUE(tester->WaitFinished()); tester->CheckResults(); @@ -718,29 +714,29 @@ TEST_F(JingleSessionTest, DISABLED_TestSpeed) { ASSERT_TRUE(InitiateConnection()); scoped_refptr<ChannelSpeedTester> tester; - tester = new ChannelSpeedTester(thread_.message_loop(), host_session_, - client_session_, 512); + tester = new ChannelSpeedTester(host_session_.get(), + client_session_.get(), 512); tester->Start(ChannelTesterBase::VIDEO); ASSERT_TRUE(tester->WaitFinished()); LOG(INFO) << "Time for 512 bytes " << tester->GetElapsedTime().InMilliseconds() << " ms."; - tester = new ChannelSpeedTester(thread_.message_loop(), host_session_, - client_session_, 1024); + tester = new ChannelSpeedTester(host_session_.get(), + client_session_.get(), 1024); tester->Start(ChannelTesterBase::VIDEO); ASSERT_TRUE(tester->WaitFinished()); LOG(INFO) << "Time for 1024 bytes " << tester->GetElapsedTime().InMilliseconds() << " ms."; - tester = new ChannelSpeedTester(thread_.message_loop(), host_session_, - client_session_, 51200); + tester = new ChannelSpeedTester(host_session_.get(), + client_session_.get(), 51200); tester->Start(ChannelTesterBase::VIDEO); ASSERT_TRUE(tester->WaitFinished()); LOG(INFO) << "Time for 50k bytes " << tester->GetElapsedTime().InMilliseconds() << " ms."; - tester = new ChannelSpeedTester(thread_.message_loop(), host_session_, - client_session_, 512000); + tester = new ChannelSpeedTester(host_session_.get(), + client_session_.get(), 512000); tester->Start(ChannelTesterBase::VIDEO); ASSERT_TRUE(tester->WaitFinished()); LOG(INFO) << "Time for 500k bytes " diff --git a/remoting/protocol/protocol_mock_objects.h b/remoting/protocol/protocol_mock_objects.h index 04105b9..274b304 100644 --- a/remoting/protocol/protocol_mock_objects.h +++ b/remoting/protocol/protocol_mock_objects.h @@ -126,7 +126,7 @@ class MockSession : public Session { MOCK_METHOD1(set_initiator_token, void(const std::string& initiator_token)); MOCK_METHOD0(receiver_token, const std::string&()); MOCK_METHOD1(set_receiver_token, void(const std::string& receiver_token)); - MOCK_METHOD1(Close, void(Task* closed_task)); + MOCK_METHOD0(Close, void()); private: DISALLOW_COPY_AND_ASSIGN(MockSession); diff --git a/remoting/protocol/protocol_test_client.cc b/remoting/protocol/protocol_test_client.cc index 4a1a1b1..e20facc 100644 --- a/remoting/protocol/protocol_test_client.cc +++ b/remoting/protocol/protocol_test_client.cc @@ -49,8 +49,7 @@ class ProtocolTestConnection write_cb_(this, &ProtocolTestConnection::OnWritten)), pending_write_(false), ALLOW_THIS_IN_INITIALIZER_LIST( - read_cb_(this, &ProtocolTestConnection::OnRead)), - closed_event_(true, false) { + read_cb_(this, &ProtocolTestConnection::OnRead)) { } virtual ~ProtocolTestConnection() {} @@ -71,24 +70,20 @@ class ProtocolTestConnection void OnWritten(int result); void OnRead(int result); - void OnFinishedClosing(); - ProtocolTestClient* client_; MessageLoop* message_loop_; - scoped_refptr<Session> session_; + scoped_ptr<Session> session_; net::CompletionCallbackImpl<ProtocolTestConnection> write_cb_; bool pending_write_; net::CompletionCallbackImpl<ProtocolTestConnection> read_cb_; scoped_refptr<net::IOBuffer> read_buffer_; - base::WaitableEvent closed_event_; }; class ProtocolTestClient : public SignalStrategy::StatusObserver, public base::RefCountedThreadSafe<ProtocolTestClient> { public: - ProtocolTestClient() - : closed_event_(true, false) { + ProtocolTestClient() { } virtual ~ProtocolTestClient() {} @@ -111,20 +106,17 @@ class ProtocolTestClient private: typedef std::list<scoped_refptr<ProtocolTestConnection> > ConnectionsList; - void OnFinishedClosing(); - std::string host_jid_; scoped_ptr<SignalStrategy> signal_strategy_; std::string local_jid_; - scoped_refptr<JingleSessionManager> session_manager_; + scoped_ptr<JingleSessionManager> session_manager_; ConnectionsList connections_; base::Lock connections_lock_; - base::WaitableEvent closed_event_; }; void ProtocolTestConnection::Init(Session* session) { - session_ = session; + session_.reset(session); } void ProtocolTestConnection::Write(const std::string& str) { @@ -181,13 +173,7 @@ void ProtocolTestConnection::DoRead() { } void ProtocolTestConnection::Close() { - session_->Close( - NewRunnableMethod(this, &ProtocolTestConnection::OnFinishedClosing)); - closed_event_.Wait(); -} - -void ProtocolTestConnection::OnFinishedClosing() { - closed_event_.Signal(); + session_->Close(); } void ProtocolTestConnection::OnStateChange(Session::State state) { @@ -234,8 +220,7 @@ void ProtocolTestClient::Run(const std::string& username, new XmppSignalStrategy(&jingle_thread, username, auth_token, auth_service)); signal_strategy_->Init(this); - session_manager_ = new JingleSessionManager(jingle_thread.message_loop(), - NULL, NULL, NULL); + session_manager_.reset(new JingleSessionManager(NULL, NULL, NULL)); host_jid_ = host_jid; @@ -262,10 +247,9 @@ void ProtocolTestClient::Run(const std::string& username, connections_.pop_front(); } - if (session_manager_) { - session_manager_->Close( - NewRunnableMethod(this, &ProtocolTestClient::OnFinishedClosing)); - closed_event_.Wait(); + if (session_manager_.get()) { + session_manager_->Close(); + session_manager_.reset(); } signal_strategy_->Close(); @@ -328,10 +312,6 @@ void ProtocolTestClient::OnNewSession( connections_.push_back(make_scoped_refptr(test_connection)); } -void ProtocolTestClient::OnFinishedClosing() { - closed_event_.Signal(); -} - } // namespace protocol } // namespace remoting diff --git a/remoting/protocol/rtp_video_reader_unittest.cc b/remoting/protocol/rtp_video_reader_unittest.cc index 0281c34..2475d6a 100644 --- a/remoting/protocol/rtp_video_reader_unittest.cc +++ b/remoting/protocol/rtp_video_reader_unittest.cc @@ -63,9 +63,9 @@ class RtpVideoReaderTest : public testing::Test, } void Reset() { - session_ = new FakeSession(); + session_.reset(new FakeSession()); reader_.reset(new RtpVideoReader()); - reader_->Init(session_, this); + reader_->Init(session_.get(), this); received_packets_.clear(); } @@ -129,7 +129,7 @@ class RtpVideoReaderTest : public testing::Test, MessageLoop message_loop_; - scoped_refptr<FakeSession> session_; + scoped_ptr<FakeSession> session_; scoped_ptr<RtpVideoReader> reader_; vector<char> data_; diff --git a/remoting/protocol/rtp_video_writer_unittest.cc b/remoting/protocol/rtp_video_writer_unittest.cc index 74b4ae3..2724927 100644 --- a/remoting/protocol/rtp_video_writer_unittest.cc +++ b/remoting/protocol/rtp_video_writer_unittest.cc @@ -56,8 +56,8 @@ class RtpVideoWriterTest : public testing::Test { }; virtual void SetUp() { - session_ = new FakeSession(); - writer_.Init(session_); + session_.reset(new FakeSession()); + writer_.Init(session_.get()); } void InitData(int size) { @@ -109,7 +109,7 @@ class RtpVideoWriterTest : public testing::Test { MessageLoop message_loop_; - scoped_refptr<FakeSession> session_; + scoped_ptr<FakeSession> session_; RtpVideoWriter writer_; vector<char> data_; diff --git a/remoting/protocol/session.h b/remoting/protocol/session.h index 3051a68..754b223 100644 --- a/remoting/protocol/session.h +++ b/remoting/protocol/session.h @@ -8,10 +8,10 @@ #include <string> #include "base/callback.h" +#include "base/threading/non_thread_safe.h" #include "remoting/protocol/buffered_socket_writer.h" #include "remoting/protocol/session_config.h" -class MessageLoop; class Task; namespace net { @@ -25,7 +25,7 @@ namespace protocol { // Provides access to the connection channels, but doesn't depend on the // protocol used for each channel. // TODO(sergeyu): Remove refcounting? -class Session : public base::RefCountedThreadSafe<Session> { +class Session : public base::NonThreadSafe { public: enum State { INITIALIZING, @@ -37,6 +37,9 @@ class Session : public base::RefCountedThreadSafe<Session> { typedef Callback1<State>::Type StateChangeCallback; + Session() { } + virtual ~Session() { } + // Set callback that is called when state of the connection is changed. // Must be called on the jingle thread only. virtual void SetStateChangeCallback(StateChangeCallback* callback) = 0; @@ -58,9 +61,6 @@ class Session : public base::RefCountedThreadSafe<Session> { // JID of the other side. virtual const std::string& jid() = 0; - // Message loop that must be used to access the channels of this connection. - virtual MessageLoop* message_loop() = 0; - // Configuration of the protocol that was sent or received in the // session-initiate jingle message. Returned pointer is valid until // connection is closed. @@ -83,15 +83,9 @@ class Session : public base::RefCountedThreadSafe<Session> { virtual void set_receiver_token(const std::string& receiver_token) = 0; // Closes connection. Callbacks are guaranteed not to be called - // after |closed_task| is executed. Must be called before the object - // is destroyed, unless the state is set to FAILED or CLOSED. - virtual void Close(Task* closed_task) = 0; - - protected: - friend class base::RefCountedThreadSafe<Session>; - - Session() { } - virtual ~Session() { } + // after this method returns. Must be called before the object is + // destroyed, unless the state is set to FAILED or CLOSED. + virtual void Close() = 0; private: DISALLOW_COPY_AND_ASSIGN(Session); diff --git a/remoting/protocol/session_manager.h b/remoting/protocol/session_manager.h index e1cb990..376559d 100644 --- a/remoting/protocol/session_manager.h +++ b/remoting/protocol/session_manager.h @@ -50,6 +50,7 @@ #include "base/callback.h" #include "base/memory/ref_counted.h" +#include "base/threading/non_thread_safe.h" #include "remoting/protocol/session.h" class Task; @@ -69,21 +70,25 @@ class SignalStrategy; namespace protocol { // Generic interface for Chromoting session manager. -class SessionManager : public base::RefCountedThreadSafe<SessionManager> { +class SessionManager : public base::NonThreadSafe { public: + SessionManager() { } + virtual ~SessionManager() { } + enum IncomingSessionResponse { ACCEPT, INCOMPATIBLE, DECLINE, }; - // IncomingSessionCallback is called when a new session is received. If - // the callback decides to accept the session it should set the second - // argument to ACCEPT. Otherwise it should set it to DECLINE, or - // INCOMPATIBLE. INCOMPATIBLE indicates that the session has incompatible - // configuration, and cannot be accepted. - // If the callback accepts session then it must also set configuration - // for the new session using Session::set_config(). + // IncomingSessionCallback is called when a new session is + // received. If the callback decides to accept the session it should + // set the second argument to ACCEPT. Otherwise it should set it to + // DECLINE, or INCOMPATIBLE. INCOMPATIBLE indicates that the session + // has incompatible configuration, and cannot be accepted. If the + // callback accepts session then it must also set configuration for + // the new session using Session::set_config(). The callback must + // take ownership of the session if it accepts connection. typedef Callback2<Session*, IncomingSessionResponse*>::Type IncomingSessionCallback; @@ -113,23 +118,16 @@ class SessionManager : public base::RefCountedThreadSafe<SessionManager> { // is invoked on the network thread. // // Ownership of the |config| is passed to the new session. - virtual scoped_refptr<Session> Connect( + virtual Session* Connect( const std::string& host_jid, const std::string& host_public_key, const std::string& client_token, CandidateSessionConfig* config, Session::StateChangeCallback* state_change_callback) = 0; - // Close session manager and all current sessions. |close_task| is executed - // after the session client is actually closed. No callbacks are called after - // |closed_task| is executed. - virtual void Close(Task* closed_task) = 0; - - protected: - friend class base::RefCountedThreadSafe<SessionManager>; - - SessionManager() { } - virtual ~SessionManager() { } + // Close session manager and all current sessions. No callbacks are + // called after this method returns. + virtual void Close() = 0; private: DISALLOW_COPY_AND_ASSIGN(SessionManager); |