summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--remoting/client/chromoting_client.cc3
-rw-r--r--remoting/jingle_glue/jingle_channel.cc65
-rw-r--r--remoting/jingle_glue/jingle_channel.h34
-rw-r--r--remoting/jingle_glue/jingle_channel_unittest.cc164
-rw-r--r--remoting/jingle_glue/jingle_client.cc201
-rw-r--r--remoting/jingle_glue/jingle_client.h25
-rw-r--r--remoting/jingle_glue/jingle_client_unittest.cc108
-rw-r--r--remoting/jingle_glue/jingle_test_client.cc29
-rw-r--r--remoting/jingle_glue/jingle_thread.cc80
-rw-r--r--remoting/jingle_glue/jingle_thread.h18
-rw-r--r--remoting/remoting.gyp3
11 files changed, 525 insertions, 205 deletions
diff --git a/remoting/client/chromoting_client.cc b/remoting/client/chromoting_client.cc
index 9226186..a9cf5ba 100644
--- a/remoting/client/chromoting_client.cc
+++ b/remoting/client/chromoting_client.cc
@@ -60,9 +60,6 @@ void ChromotingClient::Stop() {
connection_->Disconnect();
view_->TearDown();
-
- // Quit the current message loop.
- message_loop()->PostTask(FROM_HERE, new MessageLoop::QuitTask());
}
void ChromotingClient::ClientDone() {
diff --git a/remoting/jingle_glue/jingle_channel.cc b/remoting/jingle_glue/jingle_channel.cc
index 0ff634c..9dc545f 100644
--- a/remoting/jingle_glue/jingle_channel.cc
+++ b/remoting/jingle_glue/jingle_channel.cc
@@ -22,6 +22,7 @@ const size_t kReadBufferSize = 4096;
JingleChannel::JingleChannel(Callback* callback)
: state_(INITIALIZING),
callback_(callback),
+ closed_(false),
event_handler_(this),
write_buffer_size_(0),
current_write_buf_pos_(0) {
@@ -31,12 +32,13 @@ JingleChannel::JingleChannel(Callback* callback)
// This constructor is only used in unit test.
JingleChannel::JingleChannel()
: state_(CLOSED),
+ closed_(false),
write_buffer_size_(0),
current_write_buf_pos_(0) {
}
JingleChannel::~JingleChannel() {
- DCHECK_EQ(CLOSED, state_);
+ DCHECK(closed_ || stream_ == NULL);
}
void JingleChannel::Init(JingleThread* thread,
@@ -101,7 +103,12 @@ void JingleChannel::DoRead() {
case talk_base::SR_SUCCESS: {
DCHECK_GT(bytes_read, 0U);
buffer->SetDataSize(bytes_read);
- callback_->OnPacketReceived(this, buffer);
+ {
+ AutoLock auto_lock(state_lock_);
+ // Drop received data if the channel is already closed.
+ if (!closed_)
+ callback_->OnPacketReceived(this, buffer);
+ }
break;
}
case talk_base::SR_BLOCK: {
@@ -177,25 +184,53 @@ void JingleChannel::OnStreamEvent(talk_base::StreamInterface* stream,
SetState(CLOSED);
}
-void JingleChannel::SetState(State state) {
- if (state == state_)
- return;
- state_ = state;
- callback_->OnStateChange(this, state);
+void JingleChannel::SetState(State new_state) {
+ if (new_state != state_) {
+ state_ = new_state;
+ {
+ AutoLock auto_lock(state_lock_);
+ if (!closed_)
+ callback_->OnStateChange(this, new_state);
+ }
+ }
}
void JingleChannel::Close() {
- base::WaitableEvent event(true, false);
+ Close(NULL);
+}
+
+void JingleChannel::Close(Task* closed_task) {
+ {
+ AutoLock auto_lock(state_lock_);
+ if (closed_) {
+ // We are already closed.
+ if (closed_task)
+ thread_->message_loop()->PostTask(FROM_HERE, closed_task);
+ return;
+ }
+ closed_ = true;
+ if (closed_task)
+ closed_task_.reset(closed_task);
+ }
+
thread_->message_loop()->PostTask(
- FROM_HERE, NewRunnableMethod(this, &JingleChannel::DoClose, &event));
- event.Wait();
+ FROM_HERE, NewRunnableMethod(this, &JingleChannel::DoClose));
}
-void JingleChannel::DoClose(base::WaitableEvent* done_event) {
- if (stream_.get())
- stream_->Close();
- SetState(CLOSED);
- done_event->Signal();
+
+void JingleChannel::DoClose() {
+ DCHECK(closed_);
+ stream_->Close();
+ stream_.reset();
+
+ // TODO(sergeyu): Even though we have called Close() for the stream, it
+ // doesn't mean that the p2p sessions has been closed. I.e. |closed_task_|
+ // is called too early. If the client is closed right after that the other
+ // side will not receive notification that the channel was closed.
+ if (closed_task_.get()) {
+ closed_task_->Run();
+ closed_task_.reset();
+ }
}
size_t JingleChannel::write_buffer_size() {
diff --git a/remoting/jingle_glue/jingle_channel.h b/remoting/jingle_glue/jingle_channel.h
index 0ba1a75..ee53760 100644
--- a/remoting/jingle_glue/jingle_channel.h
+++ b/remoting/jingle_glue/jingle_channel.h
@@ -14,6 +14,7 @@
#include "base/lock.h"
#include "base/ref_counted.h"
#include "base/scoped_ptr.h"
+#include "base/task.h"
#include "third_party/libjingle/source/talk/base/sigslot.h"
namespace base {
@@ -59,8 +60,10 @@ class JingleChannel : public base::RefCountedThreadSafe<JingleChannel> {
// Puts data to the write buffer.
virtual void Write(scoped_refptr<media::DataBuffer> data);
- // Closes the tunnel.
+ // Closes the tunnel. If specified, |closed_task| is executed after the
+ // connection is successfully closed.
virtual void Close();
+ virtual void Close(Task* closed_task);
// Current state of the tunnel.
State state() const { return state_; }
@@ -87,17 +90,12 @@ class JingleChannel : public base::RefCountedThreadSafe<JingleChannel> {
// caller. Ownership of |thread| is not.
void Init(JingleThread* thread, talk_base::StreamInterface* stream,
const std::string& jid);
- void SetState(State state);
-
- JingleThread* thread_;
- scoped_ptr<talk_base::StreamInterface> stream_;
- State state_;
private:
+ friend class JingleChannelTest;
FRIEND_TEST_ALL_PREFIXES(JingleChannelTest, Init);
FRIEND_TEST_ALL_PREFIXES(JingleChannelTest, Write);
FRIEND_TEST_ALL_PREFIXES(JingleChannelTest, Read);
- FRIEND_TEST_ALL_PREFIXES(JingleChannelTest, Close);
typedef std::deque<scoped_refptr<media::DataBuffer> > DataQueue;
@@ -132,11 +130,31 @@ class JingleChannel : public base::RefCountedThreadSafe<JingleChannel> {
// Called from OnStreamEvent() in the jingle thread.
void DoRead();
- void DoClose(base::WaitableEvent* done_event);
+ // Used by Close() to actually close the channel.
+ void DoClose();
+
+ // Updates state and calels |callback_| if neccessary.
+ void SetState(State new_state);
+
+ // The thread this channel runs on.
+ JingleThread* thread_;
+
+ // The stream of this channel.
+ scoped_ptr<talk_base::StreamInterface> stream_;
+
+ // Current state of the channel.
+ State state_;
// Callback that is called on channel events. Initialized in the constructor.
+ // Must not be called if |closed_| is set to true.
Callback* callback_;
+ // |closed_| must be set to true after Close() is called. |state_lock_| must
+ // be locked whenever closed_ is accessed.
+ Lock state_lock_;
+ bool closed_;
+ scoped_ptr<Task> closed_task_;
+
// Event handler for stream events.
EventHandler event_handler_;
diff --git a/remoting/jingle_glue/jingle_channel_unittest.cc b/remoting/jingle_glue/jingle_channel_unittest.cc
index 94bb137..175d69c 100644
--- a/remoting/jingle_glue/jingle_channel_unittest.cc
+++ b/remoting/jingle_glue/jingle_channel_unittest.cc
@@ -3,6 +3,7 @@
// found in the LICENSE file.
#include "base/ref_counted.h"
+#include "base/waitable_event.h"
#include "media/base/data_buffer.h"
#include "remoting/jingle_glue/jingle_channel.h"
#include "remoting/jingle_glue/jingle_thread.h"
@@ -22,7 +23,7 @@ namespace {
const size_t kBufferSize = 100;
} // namespace
-class MockCallback : public JingleChannel::Callback {
+class MockJingleChannelCallback : public JingleChannel::Callback {
public:
MOCK_METHOD2(OnStateChange, void(JingleChannel*, JingleChannel::State));
MOCK_METHOD2(OnPacketReceived, void(JingleChannel*,
@@ -44,107 +45,146 @@ class MockStream : public talk_base::StreamInterface {
MOCK_METHOD2(PostEvent, void(int, int));
};
-TEST(JingleChannelTest, Init) {
- JingleThread thread;
-
- MockStream* stream = new MockStream();
- MockCallback callback;
+class JingleChannelTest : public testing::Test {
+ public:
+ virtual ~JingleChannelTest() { }
+
+ // A helper that calls OnStreamEvent(). Need this because we want
+ // to call it on the jingle thread.
+ static void StreamEvent(JingleChannel* channel,
+ talk_base::StreamInterface* stream,
+ int event, int error,
+ base::WaitableEvent* done_event) {
+ channel->OnStreamEvent(stream, event, error);
+ if (done_event)
+ done_event->Signal();
+ }
+
+ static void OnClosed(bool* called) {
+ *called = true;
+ }
+
+ protected:
+ virtual void SetUp() {
+ stream_ = new MockStream(); // Freed by the channel.
+ channel_ = new JingleChannel(&callback_);
+ channel_->thread_ = &thread_;
+ channel_->stream_.reset(stream_);
+ }
+
+ JingleThread thread_;
+ MockStream* stream_;
+ MockJingleChannelCallback callback_;
+ scoped_refptr<JingleChannel> channel_;
+};
- EXPECT_CALL(*stream, GetState())
+TEST_F(JingleChannelTest, Init) {
+ EXPECT_CALL(*stream_, GetState())
.Times(1)
.WillRepeatedly(Return(talk_base::SS_OPENING));
- scoped_refptr<JingleChannel> channel = new JingleChannel(&callback);
-
- EXPECT_CALL(callback, OnStateChange(channel.get(), JingleChannel::CONNECTING))
+ EXPECT_CALL(callback_, OnStateChange(channel_.get(),
+ JingleChannel::CONNECTING))
.Times(1);
- thread.Start();
+ thread_.Start();
- EXPECT_EQ(JingleChannel::INITIALIZING, channel->state());
- channel->Init(&thread, stream, "user@domain.com");
- EXPECT_EQ(JingleChannel::CONNECTING, channel->state());
- channel->state_ = JingleChannel::CLOSED;
+ EXPECT_EQ(JingleChannel::INITIALIZING, channel_->state());
+ channel_->Init(&thread_, stream_, "user@domain.com");
+ EXPECT_EQ(JingleChannel::CONNECTING, channel_->state());
+ channel_->closed_ = true;
- thread.Stop();
+ thread_.Stop();
}
-TEST(JingleChannelTest, Write) {
- JingleThread thread;
- MockStream* stream = new MockStream(); // Freed by the channel.
- MockCallback callback;
-
+TEST_F(JingleChannelTest, Write) {
scoped_refptr<media::DataBuffer> data = new media::DataBuffer(kBufferSize);
data->SetDataSize(kBufferSize);
- EXPECT_CALL(*stream, Write(static_cast<const void*>(data->GetData()),
+ EXPECT_CALL(*stream_, Write(static_cast<const void*>(data->GetData()),
kBufferSize, _, _))
.WillOnce(DoAll(SetArgumentPointee<2>(kBufferSize),
Return(talk_base::SR_SUCCESS)));
- scoped_refptr<JingleChannel> channel = new JingleChannel(&callback);
-
- channel->thread_ = &thread;
- channel->stream_.reset(stream);
- channel->state_ = JingleChannel::OPEN;
- thread.Start();
- channel->Write(data);
- thread.Stop();
- channel->state_ = JingleChannel::CLOSED;
+ channel_->state_ = JingleChannel::OPEN;
+ thread_.Start();
+ channel_->Write(data);
+ thread_.Stop();
+ channel_->closed_ = true;
}
-TEST(JingleChannelTest, Read) {
- JingleThread thread;
- MockStream* stream = new MockStream(); // Freed by the channel.
- MockCallback callback;
-
+TEST_F(JingleChannelTest, Read) {
scoped_refptr<media::DataBuffer> data = new media::DataBuffer(kBufferSize);
data->SetDataSize(kBufferSize);
- scoped_refptr<JingleChannel> channel = new JingleChannel(&callback);
-
- EXPECT_CALL(callback, OnPacketReceived(channel.get(), _))
+ EXPECT_CALL(callback_, OnPacketReceived(channel_.get(), _))
.Times(1);
- EXPECT_CALL(*stream, GetAvailable(_))
+ EXPECT_CALL(*stream_, GetAvailable(_))
.WillOnce(DoAll(SetArgumentPointee<0>(kBufferSize),
Return(true)))
.WillOnce(DoAll(SetArgumentPointee<0>(0),
Return(true)));
- EXPECT_CALL(*stream, Read(_, kBufferSize, _, _))
+ EXPECT_CALL(*stream_, Read(_, kBufferSize, _, _))
.WillOnce(DoAll(SetArgumentPointee<2>(kBufferSize),
Return(talk_base::SR_SUCCESS)));
- channel->thread_ = &thread;
- channel->stream_.reset(stream);
- channel->state_ = JingleChannel::OPEN;
- thread.Start();
- channel->OnStreamEvent(stream, talk_base::SE_READ, 0);
- thread.Stop();
- channel->state_ = JingleChannel::CLOSED;
+ channel_->state_ = JingleChannel::OPEN;
+ thread_.Start();
+
+ base::WaitableEvent done_event(true, false);
+ thread_.message_loop()->PostTask(FROM_HERE, NewRunnableFunction(
+ &JingleChannelTest::StreamEvent, channel_.get(), stream_,
+ talk_base::SE_READ, 0, &done_event));
+ done_event.Wait();
+
+ thread_.Stop();
+
+ channel_->closed_ = true;
}
-TEST(JingleChannelTest, Close) {
- JingleThread thread;
- MockStream* stream = new MockStream(); // Freed by the channel.
- MockCallback callback;
+TEST_F(JingleChannelTest, Close) {
+ EXPECT_CALL(*stream_, Close()).Times(1);
+ // Don't expect any calls except Close().
+ EXPECT_CALL(*stream_, GetAvailable(_)).Times(0);
+ EXPECT_CALL(*stream_, Read(_, _, _, _)).Times(0);
+ EXPECT_CALL(callback_, OnPacketReceived(_, _)).Times(0);
+
+ thread_.Start();
+ channel_->Close();
+ // Verify that the channel doesn't call callback anymore.
+ thread_.message_loop()->PostTask(FROM_HERE, NewRunnableFunction(
+ &JingleChannelTest::StreamEvent, channel_.get(), stream_,
+ talk_base::SE_READ, 0, static_cast<base::WaitableEvent*>(NULL)));
+ thread_.Stop();
+}
- EXPECT_CALL(*stream, Close())
+TEST_F(JingleChannelTest, ClosedTask) {
+ EXPECT_CALL(*stream_, Close())
.Times(1);
- scoped_refptr<JingleChannel> channel = new JingleChannel(&callback);
-
- channel->thread_ = &thread;
- channel->stream_.reset(stream);
- channel->state_ = JingleChannel::OPEN;
+ thread_.Start();
+ bool closed = false;
+ channel_->Close(NewRunnableFunction(&JingleChannelTest::OnClosed,
+ &closed));
+ thread_.Stop();
+ EXPECT_TRUE(closed);
+}
- EXPECT_CALL(callback, OnStateChange(channel.get(), JingleChannel::CLOSED))
+TEST_F(JingleChannelTest, DoubleClose) {
+ EXPECT_CALL(*stream_, Close())
.Times(1);
- thread.Start();
- channel->Close();
- thread.Stop();
+ thread_.Start();
+ bool closed1 = false;
+ channel_->Close(NewRunnableFunction(&JingleChannelTest::OnClosed,
+ &closed1));
+ bool closed2 = false;
+ channel_->Close(NewRunnableFunction(&JingleChannelTest::OnClosed,
+ &closed2));
+ thread_.Stop();
+ EXPECT_TRUE(closed1 && closed2);
}
} // namespace remoting
diff --git a/remoting/jingle_glue/jingle_client.cc b/remoting/jingle_glue/jingle_client.cc
index 938664e..01e607b 100644
--- a/remoting/jingle_glue/jingle_client.cc
+++ b/remoting/jingle_glue/jingle_client.cc
@@ -2,9 +2,6 @@
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
-// TODO(ajwong): We assign and read from a few of the member variables on
-// two threads. We need to audit this for thread safety.
-
#include "remoting/jingle_glue/jingle_client.h"
#include "base/logging.h"
@@ -17,92 +14,64 @@
#include "third_party/libjingle/source/talk/base/asyncsocket.h"
#include "third_party/libjingle/source/talk/base/ssladapter.h"
#include "third_party/libjingle/source/talk/p2p/base/sessionmanager.h"
+#include "third_party/libjingle/source/talk/p2p/base/transport.h"
#include "third_party/libjingle/source/talk/p2p/client/sessionmanagertask.h"
-#ifdef USE_SSL_TUNNEL
-#include "third_party/libjingle/source/talk/session/tunnel/securetunnelsessionclient.h"
-#endif
#include "third_party/libjingle/source/talk/session/tunnel/tunnelsessionclient.h"
#include "third_party/libjingle/source/talk/xmpp/prexmppauth.h"
#include "third_party/libjingle/source/talk/xmpp/saslcookiemechanism.h"
namespace remoting {
+namespace {
+// A TunnelSessionClient that allows local connections.
+class LocalTunnelSessionClient : public cricket::TunnelSessionClient {
+ public:
+ LocalTunnelSessionClient(const buzz::Jid& jid,
+ cricket::SessionManager* manager)
+ : TunnelSessionClient(jid, manager) {
+ }
+
+ protected:
+ virtual cricket::TunnelSession* MakeTunnelSession(
+ cricket::Session* session, talk_base::Thread* stream_thread,
+ cricket::TunnelSessionRole role) {
+ session->transport()->set_allow_local_ips(true);
+ return new cricket::TunnelSession(this, session, stream_thread);
+ }
+};
+} // namespace
+
JingleClient::JingleClient(JingleThread* thread)
- : client_(NULL),
- thread_(thread),
- state_(CREATED),
- callback_(NULL) {
+ : thread_(thread),
+ callback_(NULL),
+ client_(NULL),
+ state_(START),
+ initialized_(false),
+ closed_(false) {
}
JingleClient::~JingleClient() {
- // JingleClient can be destroyed only after it's closed.
- DCHECK(state_ == CLOSED || state_ == CREATED);
+ AutoLock auto_lock(state_lock_);
+ DCHECK(!initialized_ || closed_);
}
void JingleClient::Init(
const std::string& username, const std::string& auth_token,
const std::string& auth_token_service, Callback* callback) {
DCHECK_NE(username, "");
- DCHECK(callback != NULL);
- DCHECK(state_ == CREATED);
-
- callback_ = callback;
- message_loop()->PostTask(
- FROM_HERE, NewRunnableMethod(this, &JingleClient::DoInitialize,
- username, auth_token, auth_token_service));
- state_ = INITIALIZED;
-}
-
-JingleChannel* JingleClient::Connect(const std::string& host_jid,
- JingleChannel::Callback* callback) {
- // Ownership if channel is given to DoConnect.
- scoped_refptr<JingleChannel> channel = new JingleChannel(callback);
- message_loop()->PostTask(
- FROM_HERE, NewRunnableMethod(this, &JingleClient::DoConnect,
- channel, host_jid, callback));
- return channel;
-}
-
-void JingleClient::DoConnect(scoped_refptr<JingleChannel> channel,
- const std::string& host_jid,
- JingleChannel::Callback* callback) {
- DCHECK_EQ(message_loop(), MessageLoop::current());
-
- talk_base::StreamInterface* stream =
- tunnel_session_client_->CreateTunnel(buzz::Jid(host_jid), "");
- DCHECK(stream != NULL);
- channel->Init(thread_, stream, host_jid);
-}
+ {
+ AutoLock auto_lock(state_lock_);
+ DCHECK(!initialized_ && !closed_);
+ initialized_ = true;
-void JingleClient::Close() {
- // Once we are closed we really shouldn't talk to the callback again. In the
- // case when JingleClient outlives the owner access the callback is not safe.
- // TODO(hclam): We need to lock to reset callback.
- callback_ = NULL;
+ DCHECK(callback != NULL);
+ callback_ = callback;
+ }
message_loop()->PostTask(
- FROM_HERE, NewRunnableMethod(this, &JingleClient::DoClose));
-}
-
-void JingleClient::DoClose() {
- DCHECK_EQ(message_loop(), MessageLoop::current());
-
- // If we have not yet initialized and the client is already closed then
- // don't close again.
- if (state_ == CLOSED)
- return;
-
- if (client_) {
- client_->Disconnect();
- // Client is deleted by TaskRunner.
- client_ = NULL;
- }
- tunnel_session_client_.reset();
- port_allocator_.reset();
- session_manager_.reset();
- network_manager_.reset();
- UpdateState(CLOSED);
+ FROM_HERE, NewRunnableMethod(this, &JingleClient::DoInitialize,
+ username, auth_token, auth_token_service));
}
void JingleClient::DoInitialize(const std::string& username,
@@ -138,18 +107,10 @@ void JingleClient::DoInitialize(const std::string& username,
port_allocator->SetJingleInfo(client_);
session_manager_.reset(new cricket::SessionManager(port_allocator_.get()));
-#ifdef USE_SSL_TUNNEL
- cricket::SecureTunnelSessionClient* session_client =
- new cricket::SecureTunnelSessionClient(client_->jid(),
- session_manager_.get());
- if (!session_client->GenerateIdentity())
- return false;
- tunnel_session_client_.reset(session_client);
-#else // !USE_SSL_TUNNEL
+
tunnel_session_client_.reset(
- new cricket::TunnelSessionClient(client_->jid(),
- session_manager_.get()));
-#endif // USE_SSL_TUNNEL
+ new LocalTunnelSessionClient(client_->jid(),
+ session_manager_.get()));
cricket::SessionManagerTask* receiver =
new cricket::SessionManagerTask(client_, session_manager_.get());
@@ -160,6 +121,72 @@ void JingleClient::DoInitialize(const std::string& username,
this, &JingleClient::OnIncomingTunnel);
}
+JingleChannel* JingleClient::Connect(const std::string& host_jid,
+ JingleChannel::Callback* callback) {
+ DCHECK(initialized_ && !closed_);
+
+ // Ownership if channel is given to DoConnect.
+ scoped_refptr<JingleChannel> channel = new JingleChannel(callback);
+ message_loop()->PostTask(
+ FROM_HERE, NewRunnableMethod(this, &JingleClient::DoConnect,
+ channel, host_jid, callback));
+ return channel;
+}
+
+void JingleClient::DoConnect(scoped_refptr<JingleChannel> channel,
+ const std::string& host_jid,
+ JingleChannel::Callback* callback) {
+ DCHECK_EQ(message_loop(), MessageLoop::current());
+
+ talk_base::StreamInterface* stream =
+ tunnel_session_client_->CreateTunnel(buzz::Jid(host_jid), "");
+ DCHECK(stream != NULL);
+
+ channel->Init(thread_, stream, host_jid);
+}
+
+void JingleClient::Close() {
+ Close(NULL);
+}
+
+void JingleClient::Close(Task* closed_task) {
+ {
+ AutoLock auto_lock(state_lock_);
+ // If the client is already closed then don't close again.
+ if (closed_) {
+ if (closed_task)
+ thread_->message_loop()->PostTask(FROM_HERE, closed_task);
+ return;
+ }
+ closed_task_.reset(closed_task);
+ closed_ = true;
+ }
+
+ message_loop()->PostTask(
+ FROM_HERE, NewRunnableMethod(this, &JingleClient::DoClose));
+}
+
+void JingleClient::DoClose() {
+ DCHECK_EQ(message_loop(), MessageLoop::current());
+ DCHECK(closed_);
+
+ tunnel_session_client_.reset();
+ session_manager_.reset();
+ port_allocator_.reset();
+ network_manager_.reset();
+
+ if (client_) {
+ client_->Disconnect();
+ // Client is deleted by TaskRunner.
+ client_ = NULL;
+ }
+
+ if (closed_task_.get()) {
+ closed_task_->Run();
+ closed_task_.reset();
+ }
+}
+
std::string JingleClient::GetFullJid() {
AutoLock auto_lock(full_jid_lock_);
return full_jid_;
@@ -176,7 +203,7 @@ MessageLoop* JingleClient::message_loop() {
void JingleClient::OnConnectionStateChanged(buzz::XmppEngine::State state) {
switch (state) {
case buzz::XmppEngine::STATE_START:
- UpdateState(INITIALIZED);
+ UpdateState(START);
break;
case buzz::XmppEngine::STATE_OPENING:
UpdateState(CONNECTING);
@@ -200,8 +227,11 @@ void JingleClient::OnConnectionStateChanged(buzz::XmppEngine::State state) {
void JingleClient::OnIncomingTunnel(
cricket::TunnelSessionClient* client, buzz::Jid jid,
std::string description, cricket::Session* session) {
- // Decline connection if we don't have callback.
- if (!callback_) {
+ // Must always lock state and check closed_ when calling callback.
+ AutoLock auto_lock(state_lock_);
+
+ if (closed_) {
+ // Always reject connection if we are closed.
client->DeclineTunnel(session);
return;
}
@@ -221,8 +251,11 @@ void JingleClient::OnIncomingTunnel(
void JingleClient::UpdateState(State new_state) {
if (new_state != state_) {
state_ = new_state;
- if (callback_)
- callback_->OnStateChange(this, new_state);
+ {
+ AutoLock auto_lock(state_lock_);
+ if (!closed_)
+ callback_->OnStateChange(this, new_state);
+ }
}
}
diff --git a/remoting/jingle_glue/jingle_client.h b/remoting/jingle_glue/jingle_client.h
index bac15d8..9d7b844 100644
--- a/remoting/jingle_glue/jingle_client.h
+++ b/remoting/jingle_glue/jingle_client.h
@@ -7,6 +7,7 @@
#include <string>
+#include "base/waitable_event.h"
#include "remoting/jingle_glue/jingle_channel.h"
#include "third_party/libjingle/source/talk/xmpp/xmppclient.h"
@@ -36,8 +37,7 @@ class JingleClient : public base::RefCountedThreadSafe<JingleClient>,
public sigslot::has_slots<> {
public:
enum State {
- CREATED, // Initial state.
- INITIALIZED,
+ START, // Initial state.
CONNECTING,
CONNECTED,
CLOSED,
@@ -84,8 +84,10 @@ class JingleClient : public base::RefCountedThreadSafe<JingleClient>,
JingleChannel::Callback* callback);
// Closes XMPP connection and stops the thread. Must be called before the
- // object is destroyed.
+ // object is destroyed. If specified, |closed_task| is executed after the
+ // connection is successfully closed.
void Close();
+ void Close(Task* closed_task);
// Returns JID with resource ID. Empty string is returned if full JID is not
// known yet, i.e. authentication hasn't finished.
@@ -95,7 +97,7 @@ class JingleClient : public base::RefCountedThreadSafe<JingleClient>,
// is transfered to the caller.
virtual IqRequest* CreateIqRequest();
- // Current state of the client.
+ // Current connection state of the client.
State state() { return state_; }
// Returns XmppClient object for the xmpp connection or NULL if not connected.
@@ -106,6 +108,7 @@ class JingleClient : public base::RefCountedThreadSafe<JingleClient>,
private:
friend class HeartbeatSenderTest;
+ friend class JingleClientTest;
void OnConnectionStateChanged(buzz::XmppEngine::State state);
@@ -131,14 +134,24 @@ class JingleClient : public base::RefCountedThreadSafe<JingleClient>,
buzz::PreXmppAuth* CreatePreXmppAuth(
const buzz::XmppClientSettings& settings);
- buzz::XmppClient* client_;
+ // JingleThread used for the connection. Set in the constructor.
JingleThread* thread_;
- State state_;
+
+ // Callback for this object. Callback must not be called if closed_ == true.
Callback* callback_;
+ // The XmppClient and its state and jid.
+ buzz::XmppClient* client_;
+ State state_;
Lock full_jid_lock_;
std::string full_jid_;
+ // Current state of the object.
+ Lock state_lock_; // Must be locked when accessing initialized_ or closed_.
+ bool initialized_;
+ bool closed_;
+ scoped_ptr<Task> closed_task_;
+
scoped_ptr<talk_base::NetworkManager> network_manager_;
scoped_ptr<cricket::BasicPortAllocator> port_allocator_;
scoped_ptr<cricket::SessionManager> session_manager_;
diff --git a/remoting/jingle_glue/jingle_client_unittest.cc b/remoting/jingle_glue/jingle_client_unittest.cc
new file mode 100644
index 0000000..4d6d55f
--- /dev/null
+++ b/remoting/jingle_glue/jingle_client_unittest.cc
@@ -0,0 +1,108 @@
+// Copyright (c) 2010 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "base/waitable_event.h"
+#include "remoting/jingle_glue/jingle_client.h"
+#include "remoting/jingle_glue/jingle_thread.h"
+#include "testing/gmock/include/gmock/gmock.h"
+#include "testing/gtest/include/gtest/gtest.h"
+
+using testing::_;
+
+namespace remoting {
+
+class MockJingleClientCallback : public JingleClient::Callback {
+ public:
+ ~MockJingleClientCallback() { }
+
+ MOCK_METHOD2(OnStateChange, void(JingleClient*, JingleClient::State));
+ MOCK_METHOD3(OnAcceptConnection, bool(
+ JingleClient*, const std::string&,
+ JingleChannel::Callback**_callback));
+ MOCK_METHOD2(OnNewConnection, void(
+ JingleClient*, scoped_refptr<JingleChannel>));
+};
+
+class JingleClientTest : public testing::Test {
+ public:
+ virtual ~JingleClientTest() { }
+
+ static void OnClosed(bool* called) {
+ *called = true;
+ }
+
+ // A helper that calls OnConnectionStateChanged(). Need this because we want
+ // to call it on the jingle thread.
+ static void ChangeState(JingleClient* client, buzz::XmppEngine::State state,
+ base::WaitableEvent* done_event) {
+ client->OnConnectionStateChanged(state);
+ if (done_event)
+ done_event->Signal();
+ }
+
+ protected:
+ virtual void SetUp() {
+ client_ = new JingleClient(&thread_);
+ // Fake initialization
+ client_->initialized_ = true;
+ client_->callback_ = &callback_;
+ }
+
+ JingleThread thread_;
+ scoped_refptr<JingleClient> client_;
+ MockJingleClientCallback callback_;
+};
+
+TEST_F(JingleClientTest, OnStateChanged) {
+ EXPECT_CALL(callback_, OnStateChange(_, JingleClient::CONNECTING))
+ .Times(1);
+
+ thread_.Start();
+
+ base::WaitableEvent state_changed_event(true, false);
+ thread_.message_loop()->PostTask(FROM_HERE, NewRunnableFunction(
+ &JingleClientTest::ChangeState, client_.get(),
+ buzz::XmppEngine::STATE_OPENING, &state_changed_event));
+ state_changed_event.Wait();
+
+ client_->Close();
+
+ thread_.Stop();
+}
+
+TEST_F(JingleClientTest, Close) {
+ EXPECT_CALL(callback_, OnStateChange(_, _))
+ .Times(0);
+ thread_.Start();
+ client_->Close();
+ // Verify that the channel doesn't call callback anymore.
+ thread_.message_loop()->PostTask(FROM_HERE, NewRunnableFunction(
+ &JingleClientTest::ChangeState, client_.get(),
+ buzz::XmppEngine::STATE_OPENING,
+ static_cast<base::WaitableEvent*>(NULL)));
+ thread_.Stop();
+}
+
+TEST_F(JingleClientTest, ClosedTask) {
+ thread_.Start();
+ bool closed = false;
+ client_->Close(NewRunnableFunction(&JingleClientTest::OnClosed,
+ &closed));
+ thread_.Stop();
+ EXPECT_TRUE(closed);
+}
+
+TEST_F(JingleClientTest, DoubleClose) {
+ thread_.Start();
+ bool closed1 = false;
+ client_->Close(NewRunnableFunction(&JingleClientTest::OnClosed,
+ &closed1));
+ bool closed2 = false;
+ client_->Close(NewRunnableFunction(&JingleClientTest::OnClosed,
+ &closed2));
+ thread_.Stop();
+ EXPECT_TRUE(closed1 && closed2);
+}
+
+} // namespace remoting
diff --git a/remoting/jingle_glue/jingle_test_client.cc b/remoting/jingle_glue/jingle_test_client.cc
index 255a931..4157930b 100644
--- a/remoting/jingle_glue/jingle_test_client.cc
+++ b/remoting/jingle_glue/jingle_test_client.cc
@@ -14,6 +14,8 @@ extern "C" {
#include <list>
#include "base/at_exit.h"
+#include "base/nss_util.h"
+#include "base/time.h"
#include "media/base/data_buffer.h"
#include "remoting/base/constants.h"
#include "remoting/jingle_glue/jingle_channel.h"
@@ -25,13 +27,17 @@ using remoting::JingleChannel;
using remoting::kChromotingTokenServiceName;
class JingleTestClient : public JingleChannel::Callback,
- public JingleClient::Callback {
+ public JingleClient::Callback,
+ public base::RefCountedThreadSafe<JingleTestClient> {
public:
+ JingleTestClient()
+ : closed_event_(true, false) {
+ }
+
virtual ~JingleTestClient() {}
void Run(const std::string& username, const std::string& auth_token,
const std::string& host_jid) {
- // TODO(hclam): Fix the threading problem.
remoting::JingleThread jingle_thread;
jingle_thread.Start();
client_ = new JingleClient(&jingle_thread);
@@ -63,7 +69,12 @@ class JingleTestClient : public JingleChannel::Callback,
}
while (!channels_.empty()) {
- channels_.front()->Close();
+ closed_event_.Reset();
+ channels_.front()->Close(
+ NewRunnableMethod(this, &JingleTestClient::OnClosed));
+ // Wait until channel is closed. If it is not closed within 0.1 seconds
+ // continue closing everything else.
+ closed_event_.TimedWait(base::TimeDelta::FromMilliseconds(100));
channels_.pop_front();
}
@@ -106,12 +117,17 @@ class JingleTestClient : public JingleChannel::Callback,
channels_.push_back(channel);
}
+ void OnClosed() {
+ closed_event_.Signal();
+ }
+
private:
typedef std::list<scoped_refptr<JingleChannel> > ChannelsList;
scoped_refptr<JingleClient> client_;
ChannelsList channels_;
Lock channels_lock_;
+ base::WaitableEvent closed_event_;
};
int main(int argc, char** argv) {
@@ -120,6 +136,9 @@ int main(int argc, char** argv) {
base::AtExitManager exit_manager;
+ base::EnsureNSPRInit();
+ base::EnsureNSSInit();
+
std::string host_jid = argc == 2 ? argv[1] : "";
std::string username;
@@ -130,9 +149,9 @@ int main(int argc, char** argv) {
std::cout << "Auth token: ";
std::cin >> auth_token;
- JingleTestClient client;
+ scoped_refptr<JingleTestClient> client = new JingleTestClient();
- client.Run(username, auth_token, host_jid);
+ client->Run(username, auth_token, host_jid);
return 0;
}
diff --git a/remoting/jingle_glue/jingle_thread.cc b/remoting/jingle_glue/jingle_thread.cc
index a00ce59..0037066 100644
--- a/remoting/jingle_glue/jingle_thread.cc
+++ b/remoting/jingle_glue/jingle_thread.cc
@@ -4,13 +4,41 @@
#include "remoting/jingle_glue/jingle_thread.h"
+#include "base/basictypes.h"
#include "base/logging.h"
-#include "base/message_loop.h"
+#include "base/message_pump.h"
+#include "base/time.h"
#include "third_party/libjingle/source/talk/base/ssladapter.h"
namespace remoting {
-const int kRunTasksMessageId = 1;
+const uint32 kRunTasksMessageId = 1;
+const uint32 kStopMessageId = 2;
+
+class JingleThread::JingleMessagePump : public base::MessagePump {
+ public:
+ JingleMessagePump(JingleThread* thread) : thread_(thread) { }
+
+ virtual void Run(Delegate* delegate) { NOTIMPLEMENTED() ;}
+ virtual void Quit() { NOTIMPLEMENTED(); }
+ virtual void ScheduleWork() {
+ thread_->Post(thread_, kRunTasksMessageId);
+ }
+ virtual void ScheduleDelayedWork(const base::Time& time) {
+ NOTIMPLEMENTED();
+ }
+
+ private:
+ JingleThread* thread_;
+};
+
+class JingleThread::JingleMessageLoop : public MessageLoop {
+ public:
+ JingleMessageLoop(JingleThread* thread)
+ : MessageLoop(MessageLoop::TYPE_IO) {
+ pump_ = new JingleMessagePump(thread);
+ }
+};
TaskPump::TaskPump() {
}
@@ -30,6 +58,7 @@ void TaskPump::OnMessage(talk_base::Message* pmsg) {
JingleThread::JingleThread()
: task_pump_(NULL),
started_event_(true, false),
+ stopped_event_(true, false),
message_loop_(NULL) {
}
@@ -41,7 +70,7 @@ void JingleThread::Start() {
}
void JingleThread::Run() {
- MessageLoopForIO message_loop;
+ JingleMessageLoop message_loop(this);
message_loop_ = &message_loop;
TaskPump task_pump;
@@ -50,28 +79,51 @@ void JingleThread::Run() {
// Signal after we've initialized |message_loop_| and |task_pump_|.
started_event_.Signal();
- Post(this, kRunTasksMessageId);
-
Thread::Run();
- message_loop.RunAllPending();
+ stopped_event_.Signal();
task_pump_ = NULL;
message_loop_ = NULL;
}
-// This method is called every 20ms to process tasks from |message_loop_|
-// on this thread.
-// TODO(sergeyu): Remove it when JingleThread moved to Chromium's base::Thread.
-void JingleThread::PumpAuxiliaryLoops() {
- MessageLoop::current()->RunAllPending();
+void JingleThread::Stop() {
+ // Shutdown gracefully: make sure that we excute all messages left in the
+ // queue before exiting. Thread::Stop() would not do that.
+ Post(this, kStopMessageId);
+ stopped_event_.Wait();
+}
+
+MessageLoop* JingleThread::message_loop() {
+ return message_loop_;
+}
- // Schedule next execution 20ms from now.
- PostDelayed(20, this, kRunTasksMessageId);
+ // Returns task pump if the thread is running, otherwise NULL is returned.
+TaskPump* JingleThread::task_pump() {
+ return task_pump_;
}
void JingleThread::OnMessage(talk_base::Message* msg) {
- PumpAuxiliaryLoops();
+ if (msg->message_id == kRunTasksMessageId) {
+ // This code is executed whenever we get new message in |message_loop_|.
+ // JingleMessagePump posts new tasks in the jingle thread.
+ // TODO(sergeyu): Remove it when JingleThread moved on Chromium's
+ // base::Thread.
+ base::MessagePump::Delegate* delegate = message_loop_;
+ // Loop until we run out of work.
+ while (true) {
+ if (!delegate->DoWork())
+ break;
+ }
+ } else if (msg->message_id == kStopMessageId) {
+ // Stop the thread only if there are no more messages left in the queue,
+ // otherwise post another task to try again later.
+ if (msgq_.size() > 0 || fPeekKeep_) {
+ Post(this, kStopMessageId);
+ } else {
+ MessageQueue::Quit();
+ }
+ }
}
} // namespace remoting
diff --git a/remoting/jingle_glue/jingle_thread.h b/remoting/jingle_glue/jingle_thread.h
index 2aa23f3..139b053 100644
--- a/remoting/jingle_glue/jingle_thread.h
+++ b/remoting/jingle_glue/jingle_thread.h
@@ -5,14 +5,13 @@
#ifndef REMOTING_JINGLE_GLUE_JINGLE_THREAD_H_
#define REMOTING_JINGLE_GLUE_JINGLE_THREAD_H_
+#include "base/message_loop.h"
#include "base/tracked_objects.h"
#include "base/waitable_event.h"
#include "third_party/libjingle/source/talk/base/messagequeue.h"
#include "third_party/libjingle/source/talk/base/taskrunner.h"
#include "third_party/libjingle/source/talk/base/thread.h"
-class MessageLoop;
-
namespace buzz {
class XmppClient;
}
@@ -35,7 +34,7 @@ class TaskPump : public talk_base::MessageHandler,
// TODO(sergeyu): This class should be changed to inherit from Chromiums
// base::Thread instead of libjingle's thread.
class JingleThread : public talk_base::Thread,
- private talk_base::MessageHandler {
+ public talk_base::MessageHandler {
public:
JingleThread();
virtual ~JingleThread();
@@ -45,23 +44,28 @@ class JingleThread : public talk_base::Thread,
// Main function for the thread. Should not be called directly.
void Run();
+ // Stop the thread.
+ void Stop();
+
// Returns Chromiums message loop for this thread.
// TODO(sergeyu): remove this method when we use base::Thread instead of
// talk_base::Thread
- MessageLoop* message_loop() { return message_loop_; }
+ MessageLoop* message_loop();
// Returns task pump if the thread is running, otherwise NULL is returned.
- TaskPump* task_pump() { return task_pump_; }
+ TaskPump* task_pump();
private:
+ class JingleMessageLoop;
+ class JingleMessagePump;
+
friend class HeartbeatSenderTest;
virtual void OnMessage(talk_base::Message* msg);
- void PumpAuxiliaryLoops();
-
TaskPump* task_pump_;
base::WaitableEvent started_event_;
+ base::WaitableEvent stopped_event_;
MessageLoop* message_loop_;
DISALLOW_COPY_AND_ASSIGN(JingleThread);
diff --git a/remoting/remoting.gyp b/remoting/remoting.gyp
index eed4d58..9754ddf2 100644
--- a/remoting/remoting.gyp
+++ b/remoting/remoting.gyp
@@ -370,8 +370,9 @@
'host/mock_objects.h',
'host/session_manager_unittest.cc',
'host/test_key_pair.h',
- 'jingle_glue/jingle_thread_unittest.cc',
+ 'jingle_glue/jingle_client_unittest.cc',
'jingle_glue/jingle_channel_unittest.cc',
+ 'jingle_glue/jingle_thread_unittest.cc',
'jingle_glue/iq_request_unittest.cc',
'jingle_glue/mock_objects.h',
'run_all_unittests.cc',