diff options
-rw-r--r-- | remoting/client/chromoting_client.cc | 3 | ||||
-rw-r--r-- | remoting/jingle_glue/jingle_channel.cc | 65 | ||||
-rw-r--r-- | remoting/jingle_glue/jingle_channel.h | 34 | ||||
-rw-r--r-- | remoting/jingle_glue/jingle_channel_unittest.cc | 164 | ||||
-rw-r--r-- | remoting/jingle_glue/jingle_client.cc | 201 | ||||
-rw-r--r-- | remoting/jingle_glue/jingle_client.h | 25 | ||||
-rw-r--r-- | remoting/jingle_glue/jingle_client_unittest.cc | 108 | ||||
-rw-r--r-- | remoting/jingle_glue/jingle_test_client.cc | 29 | ||||
-rw-r--r-- | remoting/jingle_glue/jingle_thread.cc | 80 | ||||
-rw-r--r-- | remoting/jingle_glue/jingle_thread.h | 18 | ||||
-rw-r--r-- | remoting/remoting.gyp | 3 |
11 files changed, 525 insertions, 205 deletions
diff --git a/remoting/client/chromoting_client.cc b/remoting/client/chromoting_client.cc index 9226186..a9cf5ba 100644 --- a/remoting/client/chromoting_client.cc +++ b/remoting/client/chromoting_client.cc @@ -60,9 +60,6 @@ void ChromotingClient::Stop() { connection_->Disconnect(); view_->TearDown(); - - // Quit the current message loop. - message_loop()->PostTask(FROM_HERE, new MessageLoop::QuitTask()); } void ChromotingClient::ClientDone() { diff --git a/remoting/jingle_glue/jingle_channel.cc b/remoting/jingle_glue/jingle_channel.cc index 0ff634c..9dc545f 100644 --- a/remoting/jingle_glue/jingle_channel.cc +++ b/remoting/jingle_glue/jingle_channel.cc @@ -22,6 +22,7 @@ const size_t kReadBufferSize = 4096; JingleChannel::JingleChannel(Callback* callback) : state_(INITIALIZING), callback_(callback), + closed_(false), event_handler_(this), write_buffer_size_(0), current_write_buf_pos_(0) { @@ -31,12 +32,13 @@ JingleChannel::JingleChannel(Callback* callback) // This constructor is only used in unit test. JingleChannel::JingleChannel() : state_(CLOSED), + closed_(false), write_buffer_size_(0), current_write_buf_pos_(0) { } JingleChannel::~JingleChannel() { - DCHECK_EQ(CLOSED, state_); + DCHECK(closed_ || stream_ == NULL); } void JingleChannel::Init(JingleThread* thread, @@ -101,7 +103,12 @@ void JingleChannel::DoRead() { case talk_base::SR_SUCCESS: { DCHECK_GT(bytes_read, 0U); buffer->SetDataSize(bytes_read); - callback_->OnPacketReceived(this, buffer); + { + AutoLock auto_lock(state_lock_); + // Drop received data if the channel is already closed. + if (!closed_) + callback_->OnPacketReceived(this, buffer); + } break; } case talk_base::SR_BLOCK: { @@ -177,25 +184,53 @@ void JingleChannel::OnStreamEvent(talk_base::StreamInterface* stream, SetState(CLOSED); } -void JingleChannel::SetState(State state) { - if (state == state_) - return; - state_ = state; - callback_->OnStateChange(this, state); +void JingleChannel::SetState(State new_state) { + if (new_state != state_) { + state_ = new_state; + { + AutoLock auto_lock(state_lock_); + if (!closed_) + callback_->OnStateChange(this, new_state); + } + } } void JingleChannel::Close() { - base::WaitableEvent event(true, false); + Close(NULL); +} + +void JingleChannel::Close(Task* closed_task) { + { + AutoLock auto_lock(state_lock_); + if (closed_) { + // We are already closed. + if (closed_task) + thread_->message_loop()->PostTask(FROM_HERE, closed_task); + return; + } + closed_ = true; + if (closed_task) + closed_task_.reset(closed_task); + } + thread_->message_loop()->PostTask( - FROM_HERE, NewRunnableMethod(this, &JingleChannel::DoClose, &event)); - event.Wait(); + FROM_HERE, NewRunnableMethod(this, &JingleChannel::DoClose)); } -void JingleChannel::DoClose(base::WaitableEvent* done_event) { - if (stream_.get()) - stream_->Close(); - SetState(CLOSED); - done_event->Signal(); + +void JingleChannel::DoClose() { + DCHECK(closed_); + stream_->Close(); + stream_.reset(); + + // TODO(sergeyu): Even though we have called Close() for the stream, it + // doesn't mean that the p2p sessions has been closed. I.e. |closed_task_| + // is called too early. If the client is closed right after that the other + // side will not receive notification that the channel was closed. + if (closed_task_.get()) { + closed_task_->Run(); + closed_task_.reset(); + } } size_t JingleChannel::write_buffer_size() { diff --git a/remoting/jingle_glue/jingle_channel.h b/remoting/jingle_glue/jingle_channel.h index 0ba1a75..ee53760 100644 --- a/remoting/jingle_glue/jingle_channel.h +++ b/remoting/jingle_glue/jingle_channel.h @@ -14,6 +14,7 @@ #include "base/lock.h" #include "base/ref_counted.h" #include "base/scoped_ptr.h" +#include "base/task.h" #include "third_party/libjingle/source/talk/base/sigslot.h" namespace base { @@ -59,8 +60,10 @@ class JingleChannel : public base::RefCountedThreadSafe<JingleChannel> { // Puts data to the write buffer. virtual void Write(scoped_refptr<media::DataBuffer> data); - // Closes the tunnel. + // Closes the tunnel. If specified, |closed_task| is executed after the + // connection is successfully closed. virtual void Close(); + virtual void Close(Task* closed_task); // Current state of the tunnel. State state() const { return state_; } @@ -87,17 +90,12 @@ class JingleChannel : public base::RefCountedThreadSafe<JingleChannel> { // caller. Ownership of |thread| is not. void Init(JingleThread* thread, talk_base::StreamInterface* stream, const std::string& jid); - void SetState(State state); - - JingleThread* thread_; - scoped_ptr<talk_base::StreamInterface> stream_; - State state_; private: + friend class JingleChannelTest; FRIEND_TEST_ALL_PREFIXES(JingleChannelTest, Init); FRIEND_TEST_ALL_PREFIXES(JingleChannelTest, Write); FRIEND_TEST_ALL_PREFIXES(JingleChannelTest, Read); - FRIEND_TEST_ALL_PREFIXES(JingleChannelTest, Close); typedef std::deque<scoped_refptr<media::DataBuffer> > DataQueue; @@ -132,11 +130,31 @@ class JingleChannel : public base::RefCountedThreadSafe<JingleChannel> { // Called from OnStreamEvent() in the jingle thread. void DoRead(); - void DoClose(base::WaitableEvent* done_event); + // Used by Close() to actually close the channel. + void DoClose(); + + // Updates state and calels |callback_| if neccessary. + void SetState(State new_state); + + // The thread this channel runs on. + JingleThread* thread_; + + // The stream of this channel. + scoped_ptr<talk_base::StreamInterface> stream_; + + // Current state of the channel. + State state_; // Callback that is called on channel events. Initialized in the constructor. + // Must not be called if |closed_| is set to true. Callback* callback_; + // |closed_| must be set to true after Close() is called. |state_lock_| must + // be locked whenever closed_ is accessed. + Lock state_lock_; + bool closed_; + scoped_ptr<Task> closed_task_; + // Event handler for stream events. EventHandler event_handler_; diff --git a/remoting/jingle_glue/jingle_channel_unittest.cc b/remoting/jingle_glue/jingle_channel_unittest.cc index 94bb137..175d69c 100644 --- a/remoting/jingle_glue/jingle_channel_unittest.cc +++ b/remoting/jingle_glue/jingle_channel_unittest.cc @@ -3,6 +3,7 @@ // found in the LICENSE file. #include "base/ref_counted.h" +#include "base/waitable_event.h" #include "media/base/data_buffer.h" #include "remoting/jingle_glue/jingle_channel.h" #include "remoting/jingle_glue/jingle_thread.h" @@ -22,7 +23,7 @@ namespace { const size_t kBufferSize = 100; } // namespace -class MockCallback : public JingleChannel::Callback { +class MockJingleChannelCallback : public JingleChannel::Callback { public: MOCK_METHOD2(OnStateChange, void(JingleChannel*, JingleChannel::State)); MOCK_METHOD2(OnPacketReceived, void(JingleChannel*, @@ -44,107 +45,146 @@ class MockStream : public talk_base::StreamInterface { MOCK_METHOD2(PostEvent, void(int, int)); }; -TEST(JingleChannelTest, Init) { - JingleThread thread; - - MockStream* stream = new MockStream(); - MockCallback callback; +class JingleChannelTest : public testing::Test { + public: + virtual ~JingleChannelTest() { } + + // A helper that calls OnStreamEvent(). Need this because we want + // to call it on the jingle thread. + static void StreamEvent(JingleChannel* channel, + talk_base::StreamInterface* stream, + int event, int error, + base::WaitableEvent* done_event) { + channel->OnStreamEvent(stream, event, error); + if (done_event) + done_event->Signal(); + } + + static void OnClosed(bool* called) { + *called = true; + } + + protected: + virtual void SetUp() { + stream_ = new MockStream(); // Freed by the channel. + channel_ = new JingleChannel(&callback_); + channel_->thread_ = &thread_; + channel_->stream_.reset(stream_); + } + + JingleThread thread_; + MockStream* stream_; + MockJingleChannelCallback callback_; + scoped_refptr<JingleChannel> channel_; +}; - EXPECT_CALL(*stream, GetState()) +TEST_F(JingleChannelTest, Init) { + EXPECT_CALL(*stream_, GetState()) .Times(1) .WillRepeatedly(Return(talk_base::SS_OPENING)); - scoped_refptr<JingleChannel> channel = new JingleChannel(&callback); - - EXPECT_CALL(callback, OnStateChange(channel.get(), JingleChannel::CONNECTING)) + EXPECT_CALL(callback_, OnStateChange(channel_.get(), + JingleChannel::CONNECTING)) .Times(1); - thread.Start(); + thread_.Start(); - EXPECT_EQ(JingleChannel::INITIALIZING, channel->state()); - channel->Init(&thread, stream, "user@domain.com"); - EXPECT_EQ(JingleChannel::CONNECTING, channel->state()); - channel->state_ = JingleChannel::CLOSED; + EXPECT_EQ(JingleChannel::INITIALIZING, channel_->state()); + channel_->Init(&thread_, stream_, "user@domain.com"); + EXPECT_EQ(JingleChannel::CONNECTING, channel_->state()); + channel_->closed_ = true; - thread.Stop(); + thread_.Stop(); } -TEST(JingleChannelTest, Write) { - JingleThread thread; - MockStream* stream = new MockStream(); // Freed by the channel. - MockCallback callback; - +TEST_F(JingleChannelTest, Write) { scoped_refptr<media::DataBuffer> data = new media::DataBuffer(kBufferSize); data->SetDataSize(kBufferSize); - EXPECT_CALL(*stream, Write(static_cast<const void*>(data->GetData()), + EXPECT_CALL(*stream_, Write(static_cast<const void*>(data->GetData()), kBufferSize, _, _)) .WillOnce(DoAll(SetArgumentPointee<2>(kBufferSize), Return(talk_base::SR_SUCCESS))); - scoped_refptr<JingleChannel> channel = new JingleChannel(&callback); - - channel->thread_ = &thread; - channel->stream_.reset(stream); - channel->state_ = JingleChannel::OPEN; - thread.Start(); - channel->Write(data); - thread.Stop(); - channel->state_ = JingleChannel::CLOSED; + channel_->state_ = JingleChannel::OPEN; + thread_.Start(); + channel_->Write(data); + thread_.Stop(); + channel_->closed_ = true; } -TEST(JingleChannelTest, Read) { - JingleThread thread; - MockStream* stream = new MockStream(); // Freed by the channel. - MockCallback callback; - +TEST_F(JingleChannelTest, Read) { scoped_refptr<media::DataBuffer> data = new media::DataBuffer(kBufferSize); data->SetDataSize(kBufferSize); - scoped_refptr<JingleChannel> channel = new JingleChannel(&callback); - - EXPECT_CALL(callback, OnPacketReceived(channel.get(), _)) + EXPECT_CALL(callback_, OnPacketReceived(channel_.get(), _)) .Times(1); - EXPECT_CALL(*stream, GetAvailable(_)) + EXPECT_CALL(*stream_, GetAvailable(_)) .WillOnce(DoAll(SetArgumentPointee<0>(kBufferSize), Return(true))) .WillOnce(DoAll(SetArgumentPointee<0>(0), Return(true))); - EXPECT_CALL(*stream, Read(_, kBufferSize, _, _)) + EXPECT_CALL(*stream_, Read(_, kBufferSize, _, _)) .WillOnce(DoAll(SetArgumentPointee<2>(kBufferSize), Return(talk_base::SR_SUCCESS))); - channel->thread_ = &thread; - channel->stream_.reset(stream); - channel->state_ = JingleChannel::OPEN; - thread.Start(); - channel->OnStreamEvent(stream, talk_base::SE_READ, 0); - thread.Stop(); - channel->state_ = JingleChannel::CLOSED; + channel_->state_ = JingleChannel::OPEN; + thread_.Start(); + + base::WaitableEvent done_event(true, false); + thread_.message_loop()->PostTask(FROM_HERE, NewRunnableFunction( + &JingleChannelTest::StreamEvent, channel_.get(), stream_, + talk_base::SE_READ, 0, &done_event)); + done_event.Wait(); + + thread_.Stop(); + + channel_->closed_ = true; } -TEST(JingleChannelTest, Close) { - JingleThread thread; - MockStream* stream = new MockStream(); // Freed by the channel. - MockCallback callback; +TEST_F(JingleChannelTest, Close) { + EXPECT_CALL(*stream_, Close()).Times(1); + // Don't expect any calls except Close(). + EXPECT_CALL(*stream_, GetAvailable(_)).Times(0); + EXPECT_CALL(*stream_, Read(_, _, _, _)).Times(0); + EXPECT_CALL(callback_, OnPacketReceived(_, _)).Times(0); + + thread_.Start(); + channel_->Close(); + // Verify that the channel doesn't call callback anymore. + thread_.message_loop()->PostTask(FROM_HERE, NewRunnableFunction( + &JingleChannelTest::StreamEvent, channel_.get(), stream_, + talk_base::SE_READ, 0, static_cast<base::WaitableEvent*>(NULL))); + thread_.Stop(); +} - EXPECT_CALL(*stream, Close()) +TEST_F(JingleChannelTest, ClosedTask) { + EXPECT_CALL(*stream_, Close()) .Times(1); - scoped_refptr<JingleChannel> channel = new JingleChannel(&callback); - - channel->thread_ = &thread; - channel->stream_.reset(stream); - channel->state_ = JingleChannel::OPEN; + thread_.Start(); + bool closed = false; + channel_->Close(NewRunnableFunction(&JingleChannelTest::OnClosed, + &closed)); + thread_.Stop(); + EXPECT_TRUE(closed); +} - EXPECT_CALL(callback, OnStateChange(channel.get(), JingleChannel::CLOSED)) +TEST_F(JingleChannelTest, DoubleClose) { + EXPECT_CALL(*stream_, Close()) .Times(1); - thread.Start(); - channel->Close(); - thread.Stop(); + thread_.Start(); + bool closed1 = false; + channel_->Close(NewRunnableFunction(&JingleChannelTest::OnClosed, + &closed1)); + bool closed2 = false; + channel_->Close(NewRunnableFunction(&JingleChannelTest::OnClosed, + &closed2)); + thread_.Stop(); + EXPECT_TRUE(closed1 && closed2); } } // namespace remoting diff --git a/remoting/jingle_glue/jingle_client.cc b/remoting/jingle_glue/jingle_client.cc index 938664e..01e607b 100644 --- a/remoting/jingle_glue/jingle_client.cc +++ b/remoting/jingle_glue/jingle_client.cc @@ -2,9 +2,6 @@ // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. -// TODO(ajwong): We assign and read from a few of the member variables on -// two threads. We need to audit this for thread safety. - #include "remoting/jingle_glue/jingle_client.h" #include "base/logging.h" @@ -17,92 +14,64 @@ #include "third_party/libjingle/source/talk/base/asyncsocket.h" #include "third_party/libjingle/source/talk/base/ssladapter.h" #include "third_party/libjingle/source/talk/p2p/base/sessionmanager.h" +#include "third_party/libjingle/source/talk/p2p/base/transport.h" #include "third_party/libjingle/source/talk/p2p/client/sessionmanagertask.h" -#ifdef USE_SSL_TUNNEL -#include "third_party/libjingle/source/talk/session/tunnel/securetunnelsessionclient.h" -#endif #include "third_party/libjingle/source/talk/session/tunnel/tunnelsessionclient.h" #include "third_party/libjingle/source/talk/xmpp/prexmppauth.h" #include "third_party/libjingle/source/talk/xmpp/saslcookiemechanism.h" namespace remoting { +namespace { +// A TunnelSessionClient that allows local connections. +class LocalTunnelSessionClient : public cricket::TunnelSessionClient { + public: + LocalTunnelSessionClient(const buzz::Jid& jid, + cricket::SessionManager* manager) + : TunnelSessionClient(jid, manager) { + } + + protected: + virtual cricket::TunnelSession* MakeTunnelSession( + cricket::Session* session, talk_base::Thread* stream_thread, + cricket::TunnelSessionRole role) { + session->transport()->set_allow_local_ips(true); + return new cricket::TunnelSession(this, session, stream_thread); + } +}; +} // namespace + JingleClient::JingleClient(JingleThread* thread) - : client_(NULL), - thread_(thread), - state_(CREATED), - callback_(NULL) { + : thread_(thread), + callback_(NULL), + client_(NULL), + state_(START), + initialized_(false), + closed_(false) { } JingleClient::~JingleClient() { - // JingleClient can be destroyed only after it's closed. - DCHECK(state_ == CLOSED || state_ == CREATED); + AutoLock auto_lock(state_lock_); + DCHECK(!initialized_ || closed_); } void JingleClient::Init( const std::string& username, const std::string& auth_token, const std::string& auth_token_service, Callback* callback) { DCHECK_NE(username, ""); - DCHECK(callback != NULL); - DCHECK(state_ == CREATED); - - callback_ = callback; - message_loop()->PostTask( - FROM_HERE, NewRunnableMethod(this, &JingleClient::DoInitialize, - username, auth_token, auth_token_service)); - state_ = INITIALIZED; -} - -JingleChannel* JingleClient::Connect(const std::string& host_jid, - JingleChannel::Callback* callback) { - // Ownership if channel is given to DoConnect. - scoped_refptr<JingleChannel> channel = new JingleChannel(callback); - message_loop()->PostTask( - FROM_HERE, NewRunnableMethod(this, &JingleClient::DoConnect, - channel, host_jid, callback)); - return channel; -} - -void JingleClient::DoConnect(scoped_refptr<JingleChannel> channel, - const std::string& host_jid, - JingleChannel::Callback* callback) { - DCHECK_EQ(message_loop(), MessageLoop::current()); - - talk_base::StreamInterface* stream = - tunnel_session_client_->CreateTunnel(buzz::Jid(host_jid), ""); - DCHECK(stream != NULL); - channel->Init(thread_, stream, host_jid); -} + { + AutoLock auto_lock(state_lock_); + DCHECK(!initialized_ && !closed_); + initialized_ = true; -void JingleClient::Close() { - // Once we are closed we really shouldn't talk to the callback again. In the - // case when JingleClient outlives the owner access the callback is not safe. - // TODO(hclam): We need to lock to reset callback. - callback_ = NULL; + DCHECK(callback != NULL); + callback_ = callback; + } message_loop()->PostTask( - FROM_HERE, NewRunnableMethod(this, &JingleClient::DoClose)); -} - -void JingleClient::DoClose() { - DCHECK_EQ(message_loop(), MessageLoop::current()); - - // If we have not yet initialized and the client is already closed then - // don't close again. - if (state_ == CLOSED) - return; - - if (client_) { - client_->Disconnect(); - // Client is deleted by TaskRunner. - client_ = NULL; - } - tunnel_session_client_.reset(); - port_allocator_.reset(); - session_manager_.reset(); - network_manager_.reset(); - UpdateState(CLOSED); + FROM_HERE, NewRunnableMethod(this, &JingleClient::DoInitialize, + username, auth_token, auth_token_service)); } void JingleClient::DoInitialize(const std::string& username, @@ -138,18 +107,10 @@ void JingleClient::DoInitialize(const std::string& username, port_allocator->SetJingleInfo(client_); session_manager_.reset(new cricket::SessionManager(port_allocator_.get())); -#ifdef USE_SSL_TUNNEL - cricket::SecureTunnelSessionClient* session_client = - new cricket::SecureTunnelSessionClient(client_->jid(), - session_manager_.get()); - if (!session_client->GenerateIdentity()) - return false; - tunnel_session_client_.reset(session_client); -#else // !USE_SSL_TUNNEL + tunnel_session_client_.reset( - new cricket::TunnelSessionClient(client_->jid(), - session_manager_.get())); -#endif // USE_SSL_TUNNEL + new LocalTunnelSessionClient(client_->jid(), + session_manager_.get())); cricket::SessionManagerTask* receiver = new cricket::SessionManagerTask(client_, session_manager_.get()); @@ -160,6 +121,72 @@ void JingleClient::DoInitialize(const std::string& username, this, &JingleClient::OnIncomingTunnel); } +JingleChannel* JingleClient::Connect(const std::string& host_jid, + JingleChannel::Callback* callback) { + DCHECK(initialized_ && !closed_); + + // Ownership if channel is given to DoConnect. + scoped_refptr<JingleChannel> channel = new JingleChannel(callback); + message_loop()->PostTask( + FROM_HERE, NewRunnableMethod(this, &JingleClient::DoConnect, + channel, host_jid, callback)); + return channel; +} + +void JingleClient::DoConnect(scoped_refptr<JingleChannel> channel, + const std::string& host_jid, + JingleChannel::Callback* callback) { + DCHECK_EQ(message_loop(), MessageLoop::current()); + + talk_base::StreamInterface* stream = + tunnel_session_client_->CreateTunnel(buzz::Jid(host_jid), ""); + DCHECK(stream != NULL); + + channel->Init(thread_, stream, host_jid); +} + +void JingleClient::Close() { + Close(NULL); +} + +void JingleClient::Close(Task* closed_task) { + { + AutoLock auto_lock(state_lock_); + // If the client is already closed then don't close again. + if (closed_) { + if (closed_task) + thread_->message_loop()->PostTask(FROM_HERE, closed_task); + return; + } + closed_task_.reset(closed_task); + closed_ = true; + } + + message_loop()->PostTask( + FROM_HERE, NewRunnableMethod(this, &JingleClient::DoClose)); +} + +void JingleClient::DoClose() { + DCHECK_EQ(message_loop(), MessageLoop::current()); + DCHECK(closed_); + + tunnel_session_client_.reset(); + session_manager_.reset(); + port_allocator_.reset(); + network_manager_.reset(); + + if (client_) { + client_->Disconnect(); + // Client is deleted by TaskRunner. + client_ = NULL; + } + + if (closed_task_.get()) { + closed_task_->Run(); + closed_task_.reset(); + } +} + std::string JingleClient::GetFullJid() { AutoLock auto_lock(full_jid_lock_); return full_jid_; @@ -176,7 +203,7 @@ MessageLoop* JingleClient::message_loop() { void JingleClient::OnConnectionStateChanged(buzz::XmppEngine::State state) { switch (state) { case buzz::XmppEngine::STATE_START: - UpdateState(INITIALIZED); + UpdateState(START); break; case buzz::XmppEngine::STATE_OPENING: UpdateState(CONNECTING); @@ -200,8 +227,11 @@ void JingleClient::OnConnectionStateChanged(buzz::XmppEngine::State state) { void JingleClient::OnIncomingTunnel( cricket::TunnelSessionClient* client, buzz::Jid jid, std::string description, cricket::Session* session) { - // Decline connection if we don't have callback. - if (!callback_) { + // Must always lock state and check closed_ when calling callback. + AutoLock auto_lock(state_lock_); + + if (closed_) { + // Always reject connection if we are closed. client->DeclineTunnel(session); return; } @@ -221,8 +251,11 @@ void JingleClient::OnIncomingTunnel( void JingleClient::UpdateState(State new_state) { if (new_state != state_) { state_ = new_state; - if (callback_) - callback_->OnStateChange(this, new_state); + { + AutoLock auto_lock(state_lock_); + if (!closed_) + callback_->OnStateChange(this, new_state); + } } } diff --git a/remoting/jingle_glue/jingle_client.h b/remoting/jingle_glue/jingle_client.h index bac15d8..9d7b844 100644 --- a/remoting/jingle_glue/jingle_client.h +++ b/remoting/jingle_glue/jingle_client.h @@ -7,6 +7,7 @@ #include <string> +#include "base/waitable_event.h" #include "remoting/jingle_glue/jingle_channel.h" #include "third_party/libjingle/source/talk/xmpp/xmppclient.h" @@ -36,8 +37,7 @@ class JingleClient : public base::RefCountedThreadSafe<JingleClient>, public sigslot::has_slots<> { public: enum State { - CREATED, // Initial state. - INITIALIZED, + START, // Initial state. CONNECTING, CONNECTED, CLOSED, @@ -84,8 +84,10 @@ class JingleClient : public base::RefCountedThreadSafe<JingleClient>, JingleChannel::Callback* callback); // Closes XMPP connection and stops the thread. Must be called before the - // object is destroyed. + // object is destroyed. If specified, |closed_task| is executed after the + // connection is successfully closed. void Close(); + void Close(Task* closed_task); // Returns JID with resource ID. Empty string is returned if full JID is not // known yet, i.e. authentication hasn't finished. @@ -95,7 +97,7 @@ class JingleClient : public base::RefCountedThreadSafe<JingleClient>, // is transfered to the caller. virtual IqRequest* CreateIqRequest(); - // Current state of the client. + // Current connection state of the client. State state() { return state_; } // Returns XmppClient object for the xmpp connection or NULL if not connected. @@ -106,6 +108,7 @@ class JingleClient : public base::RefCountedThreadSafe<JingleClient>, private: friend class HeartbeatSenderTest; + friend class JingleClientTest; void OnConnectionStateChanged(buzz::XmppEngine::State state); @@ -131,14 +134,24 @@ class JingleClient : public base::RefCountedThreadSafe<JingleClient>, buzz::PreXmppAuth* CreatePreXmppAuth( const buzz::XmppClientSettings& settings); - buzz::XmppClient* client_; + // JingleThread used for the connection. Set in the constructor. JingleThread* thread_; - State state_; + + // Callback for this object. Callback must not be called if closed_ == true. Callback* callback_; + // The XmppClient and its state and jid. + buzz::XmppClient* client_; + State state_; Lock full_jid_lock_; std::string full_jid_; + // Current state of the object. + Lock state_lock_; // Must be locked when accessing initialized_ or closed_. + bool initialized_; + bool closed_; + scoped_ptr<Task> closed_task_; + scoped_ptr<talk_base::NetworkManager> network_manager_; scoped_ptr<cricket::BasicPortAllocator> port_allocator_; scoped_ptr<cricket::SessionManager> session_manager_; diff --git a/remoting/jingle_glue/jingle_client_unittest.cc b/remoting/jingle_glue/jingle_client_unittest.cc new file mode 100644 index 0000000..4d6d55f --- /dev/null +++ b/remoting/jingle_glue/jingle_client_unittest.cc @@ -0,0 +1,108 @@ +// Copyright (c) 2010 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "base/waitable_event.h" +#include "remoting/jingle_glue/jingle_client.h" +#include "remoting/jingle_glue/jingle_thread.h" +#include "testing/gmock/include/gmock/gmock.h" +#include "testing/gtest/include/gtest/gtest.h" + +using testing::_; + +namespace remoting { + +class MockJingleClientCallback : public JingleClient::Callback { + public: + ~MockJingleClientCallback() { } + + MOCK_METHOD2(OnStateChange, void(JingleClient*, JingleClient::State)); + MOCK_METHOD3(OnAcceptConnection, bool( + JingleClient*, const std::string&, + JingleChannel::Callback**_callback)); + MOCK_METHOD2(OnNewConnection, void( + JingleClient*, scoped_refptr<JingleChannel>)); +}; + +class JingleClientTest : public testing::Test { + public: + virtual ~JingleClientTest() { } + + static void OnClosed(bool* called) { + *called = true; + } + + // A helper that calls OnConnectionStateChanged(). Need this because we want + // to call it on the jingle thread. + static void ChangeState(JingleClient* client, buzz::XmppEngine::State state, + base::WaitableEvent* done_event) { + client->OnConnectionStateChanged(state); + if (done_event) + done_event->Signal(); + } + + protected: + virtual void SetUp() { + client_ = new JingleClient(&thread_); + // Fake initialization + client_->initialized_ = true; + client_->callback_ = &callback_; + } + + JingleThread thread_; + scoped_refptr<JingleClient> client_; + MockJingleClientCallback callback_; +}; + +TEST_F(JingleClientTest, OnStateChanged) { + EXPECT_CALL(callback_, OnStateChange(_, JingleClient::CONNECTING)) + .Times(1); + + thread_.Start(); + + base::WaitableEvent state_changed_event(true, false); + thread_.message_loop()->PostTask(FROM_HERE, NewRunnableFunction( + &JingleClientTest::ChangeState, client_.get(), + buzz::XmppEngine::STATE_OPENING, &state_changed_event)); + state_changed_event.Wait(); + + client_->Close(); + + thread_.Stop(); +} + +TEST_F(JingleClientTest, Close) { + EXPECT_CALL(callback_, OnStateChange(_, _)) + .Times(0); + thread_.Start(); + client_->Close(); + // Verify that the channel doesn't call callback anymore. + thread_.message_loop()->PostTask(FROM_HERE, NewRunnableFunction( + &JingleClientTest::ChangeState, client_.get(), + buzz::XmppEngine::STATE_OPENING, + static_cast<base::WaitableEvent*>(NULL))); + thread_.Stop(); +} + +TEST_F(JingleClientTest, ClosedTask) { + thread_.Start(); + bool closed = false; + client_->Close(NewRunnableFunction(&JingleClientTest::OnClosed, + &closed)); + thread_.Stop(); + EXPECT_TRUE(closed); +} + +TEST_F(JingleClientTest, DoubleClose) { + thread_.Start(); + bool closed1 = false; + client_->Close(NewRunnableFunction(&JingleClientTest::OnClosed, + &closed1)); + bool closed2 = false; + client_->Close(NewRunnableFunction(&JingleClientTest::OnClosed, + &closed2)); + thread_.Stop(); + EXPECT_TRUE(closed1 && closed2); +} + +} // namespace remoting diff --git a/remoting/jingle_glue/jingle_test_client.cc b/remoting/jingle_glue/jingle_test_client.cc index 255a931..4157930b 100644 --- a/remoting/jingle_glue/jingle_test_client.cc +++ b/remoting/jingle_glue/jingle_test_client.cc @@ -14,6 +14,8 @@ extern "C" { #include <list> #include "base/at_exit.h" +#include "base/nss_util.h" +#include "base/time.h" #include "media/base/data_buffer.h" #include "remoting/base/constants.h" #include "remoting/jingle_glue/jingle_channel.h" @@ -25,13 +27,17 @@ using remoting::JingleChannel; using remoting::kChromotingTokenServiceName; class JingleTestClient : public JingleChannel::Callback, - public JingleClient::Callback { + public JingleClient::Callback, + public base::RefCountedThreadSafe<JingleTestClient> { public: + JingleTestClient() + : closed_event_(true, false) { + } + virtual ~JingleTestClient() {} void Run(const std::string& username, const std::string& auth_token, const std::string& host_jid) { - // TODO(hclam): Fix the threading problem. remoting::JingleThread jingle_thread; jingle_thread.Start(); client_ = new JingleClient(&jingle_thread); @@ -63,7 +69,12 @@ class JingleTestClient : public JingleChannel::Callback, } while (!channels_.empty()) { - channels_.front()->Close(); + closed_event_.Reset(); + channels_.front()->Close( + NewRunnableMethod(this, &JingleTestClient::OnClosed)); + // Wait until channel is closed. If it is not closed within 0.1 seconds + // continue closing everything else. + closed_event_.TimedWait(base::TimeDelta::FromMilliseconds(100)); channels_.pop_front(); } @@ -106,12 +117,17 @@ class JingleTestClient : public JingleChannel::Callback, channels_.push_back(channel); } + void OnClosed() { + closed_event_.Signal(); + } + private: typedef std::list<scoped_refptr<JingleChannel> > ChannelsList; scoped_refptr<JingleClient> client_; ChannelsList channels_; Lock channels_lock_; + base::WaitableEvent closed_event_; }; int main(int argc, char** argv) { @@ -120,6 +136,9 @@ int main(int argc, char** argv) { base::AtExitManager exit_manager; + base::EnsureNSPRInit(); + base::EnsureNSSInit(); + std::string host_jid = argc == 2 ? argv[1] : ""; std::string username; @@ -130,9 +149,9 @@ int main(int argc, char** argv) { std::cout << "Auth token: "; std::cin >> auth_token; - JingleTestClient client; + scoped_refptr<JingleTestClient> client = new JingleTestClient(); - client.Run(username, auth_token, host_jid); + client->Run(username, auth_token, host_jid); return 0; } diff --git a/remoting/jingle_glue/jingle_thread.cc b/remoting/jingle_glue/jingle_thread.cc index a00ce59..0037066 100644 --- a/remoting/jingle_glue/jingle_thread.cc +++ b/remoting/jingle_glue/jingle_thread.cc @@ -4,13 +4,41 @@ #include "remoting/jingle_glue/jingle_thread.h" +#include "base/basictypes.h" #include "base/logging.h" -#include "base/message_loop.h" +#include "base/message_pump.h" +#include "base/time.h" #include "third_party/libjingle/source/talk/base/ssladapter.h" namespace remoting { -const int kRunTasksMessageId = 1; +const uint32 kRunTasksMessageId = 1; +const uint32 kStopMessageId = 2; + +class JingleThread::JingleMessagePump : public base::MessagePump { + public: + JingleMessagePump(JingleThread* thread) : thread_(thread) { } + + virtual void Run(Delegate* delegate) { NOTIMPLEMENTED() ;} + virtual void Quit() { NOTIMPLEMENTED(); } + virtual void ScheduleWork() { + thread_->Post(thread_, kRunTasksMessageId); + } + virtual void ScheduleDelayedWork(const base::Time& time) { + NOTIMPLEMENTED(); + } + + private: + JingleThread* thread_; +}; + +class JingleThread::JingleMessageLoop : public MessageLoop { + public: + JingleMessageLoop(JingleThread* thread) + : MessageLoop(MessageLoop::TYPE_IO) { + pump_ = new JingleMessagePump(thread); + } +}; TaskPump::TaskPump() { } @@ -30,6 +58,7 @@ void TaskPump::OnMessage(talk_base::Message* pmsg) { JingleThread::JingleThread() : task_pump_(NULL), started_event_(true, false), + stopped_event_(true, false), message_loop_(NULL) { } @@ -41,7 +70,7 @@ void JingleThread::Start() { } void JingleThread::Run() { - MessageLoopForIO message_loop; + JingleMessageLoop message_loop(this); message_loop_ = &message_loop; TaskPump task_pump; @@ -50,28 +79,51 @@ void JingleThread::Run() { // Signal after we've initialized |message_loop_| and |task_pump_|. started_event_.Signal(); - Post(this, kRunTasksMessageId); - Thread::Run(); - message_loop.RunAllPending(); + stopped_event_.Signal(); task_pump_ = NULL; message_loop_ = NULL; } -// This method is called every 20ms to process tasks from |message_loop_| -// on this thread. -// TODO(sergeyu): Remove it when JingleThread moved to Chromium's base::Thread. -void JingleThread::PumpAuxiliaryLoops() { - MessageLoop::current()->RunAllPending(); +void JingleThread::Stop() { + // Shutdown gracefully: make sure that we excute all messages left in the + // queue before exiting. Thread::Stop() would not do that. + Post(this, kStopMessageId); + stopped_event_.Wait(); +} + +MessageLoop* JingleThread::message_loop() { + return message_loop_; +} - // Schedule next execution 20ms from now. - PostDelayed(20, this, kRunTasksMessageId); + // Returns task pump if the thread is running, otherwise NULL is returned. +TaskPump* JingleThread::task_pump() { + return task_pump_; } void JingleThread::OnMessage(talk_base::Message* msg) { - PumpAuxiliaryLoops(); + if (msg->message_id == kRunTasksMessageId) { + // This code is executed whenever we get new message in |message_loop_|. + // JingleMessagePump posts new tasks in the jingle thread. + // TODO(sergeyu): Remove it when JingleThread moved on Chromium's + // base::Thread. + base::MessagePump::Delegate* delegate = message_loop_; + // Loop until we run out of work. + while (true) { + if (!delegate->DoWork()) + break; + } + } else if (msg->message_id == kStopMessageId) { + // Stop the thread only if there are no more messages left in the queue, + // otherwise post another task to try again later. + if (msgq_.size() > 0 || fPeekKeep_) { + Post(this, kStopMessageId); + } else { + MessageQueue::Quit(); + } + } } } // namespace remoting diff --git a/remoting/jingle_glue/jingle_thread.h b/remoting/jingle_glue/jingle_thread.h index 2aa23f3..139b053 100644 --- a/remoting/jingle_glue/jingle_thread.h +++ b/remoting/jingle_glue/jingle_thread.h @@ -5,14 +5,13 @@ #ifndef REMOTING_JINGLE_GLUE_JINGLE_THREAD_H_ #define REMOTING_JINGLE_GLUE_JINGLE_THREAD_H_ +#include "base/message_loop.h" #include "base/tracked_objects.h" #include "base/waitable_event.h" #include "third_party/libjingle/source/talk/base/messagequeue.h" #include "third_party/libjingle/source/talk/base/taskrunner.h" #include "third_party/libjingle/source/talk/base/thread.h" -class MessageLoop; - namespace buzz { class XmppClient; } @@ -35,7 +34,7 @@ class TaskPump : public talk_base::MessageHandler, // TODO(sergeyu): This class should be changed to inherit from Chromiums // base::Thread instead of libjingle's thread. class JingleThread : public talk_base::Thread, - private talk_base::MessageHandler { + public talk_base::MessageHandler { public: JingleThread(); virtual ~JingleThread(); @@ -45,23 +44,28 @@ class JingleThread : public talk_base::Thread, // Main function for the thread. Should not be called directly. void Run(); + // Stop the thread. + void Stop(); + // Returns Chromiums message loop for this thread. // TODO(sergeyu): remove this method when we use base::Thread instead of // talk_base::Thread - MessageLoop* message_loop() { return message_loop_; } + MessageLoop* message_loop(); // Returns task pump if the thread is running, otherwise NULL is returned. - TaskPump* task_pump() { return task_pump_; } + TaskPump* task_pump(); private: + class JingleMessageLoop; + class JingleMessagePump; + friend class HeartbeatSenderTest; virtual void OnMessage(talk_base::Message* msg); - void PumpAuxiliaryLoops(); - TaskPump* task_pump_; base::WaitableEvent started_event_; + base::WaitableEvent stopped_event_; MessageLoop* message_loop_; DISALLOW_COPY_AND_ASSIGN(JingleThread); diff --git a/remoting/remoting.gyp b/remoting/remoting.gyp index eed4d58..9754ddf2 100644 --- a/remoting/remoting.gyp +++ b/remoting/remoting.gyp @@ -370,8 +370,9 @@ 'host/mock_objects.h', 'host/session_manager_unittest.cc', 'host/test_key_pair.h', - 'jingle_glue/jingle_thread_unittest.cc', + 'jingle_glue/jingle_client_unittest.cc', 'jingle_glue/jingle_channel_unittest.cc', + 'jingle_glue/jingle_thread_unittest.cc', 'jingle_glue/iq_request_unittest.cc', 'jingle_glue/mock_objects.h', 'run_all_unittests.cc', |