summaryrefslogtreecommitdiffstats
path: root/remoting
diff options
context:
space:
mode:
authorsergeyu@chromium.org <sergeyu@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98>2010-08-28 02:14:12 +0000
committersergeyu@chromium.org <sergeyu@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98>2010-08-28 02:14:12 +0000
commit0f935491a6bb96ebad37266ce4f5fb98657c1391 (patch)
treef2ba72e83ef7f9782e80643ccebacae11116b3fc /remoting
parent087574ee809adadd4001fceb330e7954a1c65d8e (diff)
downloadchromium_src-0f935491a6bb96ebad37266ce4f5fb98657c1391.zip
chromium_src-0f935491a6bb96ebad37266ce4f5fb98657c1391.tar.gz
chromium_src-0f935491a6bb96ebad37266ce4f5fb98657c1391.tar.bz2
Jingle_glue bugfixes.
Fixed Closed() methods in JingleChannel and JingleClient so that they are not blocking and guarantee that callback is not called afterwards. Changed JingleThread shutdown mechanism so that the thread doesn't stop until all tasks are finished. JingleClient now uses LocalTunnelSessionClient that allows local connections. BUG=52889 TEST=unittests Review URL: http://codereview.chromium.org/3167047 git-svn-id: svn://svn.chromium.org/chrome/trunk/src@57770 0039d316-1c4b-4281-b951-d872f2087c98
Diffstat (limited to 'remoting')
-rw-r--r--remoting/client/chromoting_client.cc3
-rw-r--r--remoting/jingle_glue/jingle_channel.cc65
-rw-r--r--remoting/jingle_glue/jingle_channel.h34
-rw-r--r--remoting/jingle_glue/jingle_channel_unittest.cc164
-rw-r--r--remoting/jingle_glue/jingle_client.cc201
-rw-r--r--remoting/jingle_glue/jingle_client.h25
-rw-r--r--remoting/jingle_glue/jingle_client_unittest.cc108
-rw-r--r--remoting/jingle_glue/jingle_test_client.cc29
-rw-r--r--remoting/jingle_glue/jingle_thread.cc80
-rw-r--r--remoting/jingle_glue/jingle_thread.h18
-rw-r--r--remoting/remoting.gyp3
11 files changed, 525 insertions, 205 deletions
diff --git a/remoting/client/chromoting_client.cc b/remoting/client/chromoting_client.cc
index 9226186..a9cf5ba 100644
--- a/remoting/client/chromoting_client.cc
+++ b/remoting/client/chromoting_client.cc
@@ -60,9 +60,6 @@ void ChromotingClient::Stop() {
connection_->Disconnect();
view_->TearDown();
-
- // Quit the current message loop.
- message_loop()->PostTask(FROM_HERE, new MessageLoop::QuitTask());
}
void ChromotingClient::ClientDone() {
diff --git a/remoting/jingle_glue/jingle_channel.cc b/remoting/jingle_glue/jingle_channel.cc
index 0ff634c..9dc545f 100644
--- a/remoting/jingle_glue/jingle_channel.cc
+++ b/remoting/jingle_glue/jingle_channel.cc
@@ -22,6 +22,7 @@ const size_t kReadBufferSize = 4096;
JingleChannel::JingleChannel(Callback* callback)
: state_(INITIALIZING),
callback_(callback),
+ closed_(false),
event_handler_(this),
write_buffer_size_(0),
current_write_buf_pos_(0) {
@@ -31,12 +32,13 @@ JingleChannel::JingleChannel(Callback* callback)
// This constructor is only used in unit test.
JingleChannel::JingleChannel()
: state_(CLOSED),
+ closed_(false),
write_buffer_size_(0),
current_write_buf_pos_(0) {
}
JingleChannel::~JingleChannel() {
- DCHECK_EQ(CLOSED, state_);
+ DCHECK(closed_ || stream_ == NULL);
}
void JingleChannel::Init(JingleThread* thread,
@@ -101,7 +103,12 @@ void JingleChannel::DoRead() {
case talk_base::SR_SUCCESS: {
DCHECK_GT(bytes_read, 0U);
buffer->SetDataSize(bytes_read);
- callback_->OnPacketReceived(this, buffer);
+ {
+ AutoLock auto_lock(state_lock_);
+ // Drop received data if the channel is already closed.
+ if (!closed_)
+ callback_->OnPacketReceived(this, buffer);
+ }
break;
}
case talk_base::SR_BLOCK: {
@@ -177,25 +184,53 @@ void JingleChannel::OnStreamEvent(talk_base::StreamInterface* stream,
SetState(CLOSED);
}
-void JingleChannel::SetState(State state) {
- if (state == state_)
- return;
- state_ = state;
- callback_->OnStateChange(this, state);
+void JingleChannel::SetState(State new_state) {
+ if (new_state != state_) {
+ state_ = new_state;
+ {
+ AutoLock auto_lock(state_lock_);
+ if (!closed_)
+ callback_->OnStateChange(this, new_state);
+ }
+ }
}
void JingleChannel::Close() {
- base::WaitableEvent event(true, false);
+ Close(NULL);
+}
+
+void JingleChannel::Close(Task* closed_task) {
+ {
+ AutoLock auto_lock(state_lock_);
+ if (closed_) {
+ // We are already closed.
+ if (closed_task)
+ thread_->message_loop()->PostTask(FROM_HERE, closed_task);
+ return;
+ }
+ closed_ = true;
+ if (closed_task)
+ closed_task_.reset(closed_task);
+ }
+
thread_->message_loop()->PostTask(
- FROM_HERE, NewRunnableMethod(this, &JingleChannel::DoClose, &event));
- event.Wait();
+ FROM_HERE, NewRunnableMethod(this, &JingleChannel::DoClose));
}
-void JingleChannel::DoClose(base::WaitableEvent* done_event) {
- if (stream_.get())
- stream_->Close();
- SetState(CLOSED);
- done_event->Signal();
+
+void JingleChannel::DoClose() {
+ DCHECK(closed_);
+ stream_->Close();
+ stream_.reset();
+
+ // TODO(sergeyu): Even though we have called Close() for the stream, it
+ // doesn't mean that the p2p sessions has been closed. I.e. |closed_task_|
+ // is called too early. If the client is closed right after that the other
+ // side will not receive notification that the channel was closed.
+ if (closed_task_.get()) {
+ closed_task_->Run();
+ closed_task_.reset();
+ }
}
size_t JingleChannel::write_buffer_size() {
diff --git a/remoting/jingle_glue/jingle_channel.h b/remoting/jingle_glue/jingle_channel.h
index 0ba1a75..ee53760 100644
--- a/remoting/jingle_glue/jingle_channel.h
+++ b/remoting/jingle_glue/jingle_channel.h
@@ -14,6 +14,7 @@
#include "base/lock.h"
#include "base/ref_counted.h"
#include "base/scoped_ptr.h"
+#include "base/task.h"
#include "third_party/libjingle/source/talk/base/sigslot.h"
namespace base {
@@ -59,8 +60,10 @@ class JingleChannel : public base::RefCountedThreadSafe<JingleChannel> {
// Puts data to the write buffer.
virtual void Write(scoped_refptr<media::DataBuffer> data);
- // Closes the tunnel.
+ // Closes the tunnel. If specified, |closed_task| is executed after the
+ // connection is successfully closed.
virtual void Close();
+ virtual void Close(Task* closed_task);
// Current state of the tunnel.
State state() const { return state_; }
@@ -87,17 +90,12 @@ class JingleChannel : public base::RefCountedThreadSafe<JingleChannel> {
// caller. Ownership of |thread| is not.
void Init(JingleThread* thread, talk_base::StreamInterface* stream,
const std::string& jid);
- void SetState(State state);
-
- JingleThread* thread_;
- scoped_ptr<talk_base::StreamInterface> stream_;
- State state_;
private:
+ friend class JingleChannelTest;
FRIEND_TEST_ALL_PREFIXES(JingleChannelTest, Init);
FRIEND_TEST_ALL_PREFIXES(JingleChannelTest, Write);
FRIEND_TEST_ALL_PREFIXES(JingleChannelTest, Read);
- FRIEND_TEST_ALL_PREFIXES(JingleChannelTest, Close);
typedef std::deque<scoped_refptr<media::DataBuffer> > DataQueue;
@@ -132,11 +130,31 @@ class JingleChannel : public base::RefCountedThreadSafe<JingleChannel> {
// Called from OnStreamEvent() in the jingle thread.
void DoRead();
- void DoClose(base::WaitableEvent* done_event);
+ // Used by Close() to actually close the channel.
+ void DoClose();
+
+ // Updates state and calels |callback_| if neccessary.
+ void SetState(State new_state);
+
+ // The thread this channel runs on.
+ JingleThread* thread_;
+
+ // The stream of this channel.
+ scoped_ptr<talk_base::StreamInterface> stream_;
+
+ // Current state of the channel.
+ State state_;
// Callback that is called on channel events. Initialized in the constructor.
+ // Must not be called if |closed_| is set to true.
Callback* callback_;
+ // |closed_| must be set to true after Close() is called. |state_lock_| must
+ // be locked whenever closed_ is accessed.
+ Lock state_lock_;
+ bool closed_;
+ scoped_ptr<Task> closed_task_;
+
// Event handler for stream events.
EventHandler event_handler_;
diff --git a/remoting/jingle_glue/jingle_channel_unittest.cc b/remoting/jingle_glue/jingle_channel_unittest.cc
index 94bb137..175d69c 100644
--- a/remoting/jingle_glue/jingle_channel_unittest.cc
+++ b/remoting/jingle_glue/jingle_channel_unittest.cc
@@ -3,6 +3,7 @@
// found in the LICENSE file.
#include "base/ref_counted.h"
+#include "base/waitable_event.h"
#include "media/base/data_buffer.h"
#include "remoting/jingle_glue/jingle_channel.h"
#include "remoting/jingle_glue/jingle_thread.h"
@@ -22,7 +23,7 @@ namespace {
const size_t kBufferSize = 100;
} // namespace
-class MockCallback : public JingleChannel::Callback {
+class MockJingleChannelCallback : public JingleChannel::Callback {
public:
MOCK_METHOD2(OnStateChange, void(JingleChannel*, JingleChannel::State));
MOCK_METHOD2(OnPacketReceived, void(JingleChannel*,
@@ -44,107 +45,146 @@ class MockStream : public talk_base::StreamInterface {
MOCK_METHOD2(PostEvent, void(int, int));
};
-TEST(JingleChannelTest, Init) {
- JingleThread thread;
-
- MockStream* stream = new MockStream();
- MockCallback callback;
+class JingleChannelTest : public testing::Test {
+ public:
+ virtual ~JingleChannelTest() { }
+
+ // A helper that calls OnStreamEvent(). Need this because we want
+ // to call it on the jingle thread.
+ static void StreamEvent(JingleChannel* channel,
+ talk_base::StreamInterface* stream,
+ int event, int error,
+ base::WaitableEvent* done_event) {
+ channel->OnStreamEvent(stream, event, error);
+ if (done_event)
+ done_event->Signal();
+ }
+
+ static void OnClosed(bool* called) {
+ *called = true;
+ }
+
+ protected:
+ virtual void SetUp() {
+ stream_ = new MockStream(); // Freed by the channel.
+ channel_ = new JingleChannel(&callback_);
+ channel_->thread_ = &thread_;
+ channel_->stream_.reset(stream_);
+ }
+
+ JingleThread thread_;
+ MockStream* stream_;
+ MockJingleChannelCallback callback_;
+ scoped_refptr<JingleChannel> channel_;
+};
- EXPECT_CALL(*stream, GetState())
+TEST_F(JingleChannelTest, Init) {
+ EXPECT_CALL(*stream_, GetState())
.Times(1)
.WillRepeatedly(Return(talk_base::SS_OPENING));
- scoped_refptr<JingleChannel> channel = new JingleChannel(&callback);
-
- EXPECT_CALL(callback, OnStateChange(channel.get(), JingleChannel::CONNECTING))
+ EXPECT_CALL(callback_, OnStateChange(channel_.get(),
+ JingleChannel::CONNECTING))
.Times(1);
- thread.Start();
+ thread_.Start();
- EXPECT_EQ(JingleChannel::INITIALIZING, channel->state());
- channel->Init(&thread, stream, "user@domain.com");
- EXPECT_EQ(JingleChannel::CONNECTING, channel->state());
- channel->state_ = JingleChannel::CLOSED;
+ EXPECT_EQ(JingleChannel::INITIALIZING, channel_->state());
+ channel_->Init(&thread_, stream_, "user@domain.com");
+ EXPECT_EQ(JingleChannel::CONNECTING, channel_->state());
+ channel_->closed_ = true;
- thread.Stop();
+ thread_.Stop();
}
-TEST(JingleChannelTest, Write) {
- JingleThread thread;
- MockStream* stream = new MockStream(); // Freed by the channel.
- MockCallback callback;
-
+TEST_F(JingleChannelTest, Write) {
scoped_refptr<media::DataBuffer> data = new media::DataBuffer(kBufferSize);
data->SetDataSize(kBufferSize);
- EXPECT_CALL(*stream, Write(static_cast<const void*>(data->GetData()),
+ EXPECT_CALL(*stream_, Write(static_cast<const void*>(data->GetData()),
kBufferSize, _, _))
.WillOnce(DoAll(SetArgumentPointee<2>(kBufferSize),
Return(talk_base::SR_SUCCESS)));
- scoped_refptr<JingleChannel> channel = new JingleChannel(&callback);
-
- channel->thread_ = &thread;
- channel->stream_.reset(stream);
- channel->state_ = JingleChannel::OPEN;
- thread.Start();
- channel->Write(data);
- thread.Stop();
- channel->state_ = JingleChannel::CLOSED;
+ channel_->state_ = JingleChannel::OPEN;
+ thread_.Start();
+ channel_->Write(data);
+ thread_.Stop();
+ channel_->closed_ = true;
}
-TEST(JingleChannelTest, Read) {
- JingleThread thread;
- MockStream* stream = new MockStream(); // Freed by the channel.
- MockCallback callback;
-
+TEST_F(JingleChannelTest, Read) {
scoped_refptr<media::DataBuffer> data = new media::DataBuffer(kBufferSize);
data->SetDataSize(kBufferSize);
- scoped_refptr<JingleChannel> channel = new JingleChannel(&callback);
-
- EXPECT_CALL(callback, OnPacketReceived(channel.get(), _))
+ EXPECT_CALL(callback_, OnPacketReceived(channel_.get(), _))
.Times(1);
- EXPECT_CALL(*stream, GetAvailable(_))
+ EXPECT_CALL(*stream_, GetAvailable(_))
.WillOnce(DoAll(SetArgumentPointee<0>(kBufferSize),
Return(true)))
.WillOnce(DoAll(SetArgumentPointee<0>(0),
Return(true)));
- EXPECT_CALL(*stream, Read(_, kBufferSize, _, _))
+ EXPECT_CALL(*stream_, Read(_, kBufferSize, _, _))
.WillOnce(DoAll(SetArgumentPointee<2>(kBufferSize),
Return(talk_base::SR_SUCCESS)));
- channel->thread_ = &thread;
- channel->stream_.reset(stream);
- channel->state_ = JingleChannel::OPEN;
- thread.Start();
- channel->OnStreamEvent(stream, talk_base::SE_READ, 0);
- thread.Stop();
- channel->state_ = JingleChannel::CLOSED;
+ channel_->state_ = JingleChannel::OPEN;
+ thread_.Start();
+
+ base::WaitableEvent done_event(true, false);
+ thread_.message_loop()->PostTask(FROM_HERE, NewRunnableFunction(
+ &JingleChannelTest::StreamEvent, channel_.get(), stream_,
+ talk_base::SE_READ, 0, &done_event));
+ done_event.Wait();
+
+ thread_.Stop();
+
+ channel_->closed_ = true;
}
-TEST(JingleChannelTest, Close) {
- JingleThread thread;
- MockStream* stream = new MockStream(); // Freed by the channel.
- MockCallback callback;
+TEST_F(JingleChannelTest, Close) {
+ EXPECT_CALL(*stream_, Close()).Times(1);
+ // Don't expect any calls except Close().
+ EXPECT_CALL(*stream_, GetAvailable(_)).Times(0);
+ EXPECT_CALL(*stream_, Read(_, _, _, _)).Times(0);
+ EXPECT_CALL(callback_, OnPacketReceived(_, _)).Times(0);
+
+ thread_.Start();
+ channel_->Close();
+ // Verify that the channel doesn't call callback anymore.
+ thread_.message_loop()->PostTask(FROM_HERE, NewRunnableFunction(
+ &JingleChannelTest::StreamEvent, channel_.get(), stream_,
+ talk_base::SE_READ, 0, static_cast<base::WaitableEvent*>(NULL)));
+ thread_.Stop();
+}
- EXPECT_CALL(*stream, Close())
+TEST_F(JingleChannelTest, ClosedTask) {
+ EXPECT_CALL(*stream_, Close())
.Times(1);
- scoped_refptr<JingleChannel> channel = new JingleChannel(&callback);
-
- channel->thread_ = &thread;
- channel->stream_.reset(stream);
- channel->state_ = JingleChannel::OPEN;
+ thread_.Start();
+ bool closed = false;
+ channel_->Close(NewRunnableFunction(&JingleChannelTest::OnClosed,
+ &closed));
+ thread_.Stop();
+ EXPECT_TRUE(closed);
+}
- EXPECT_CALL(callback, OnStateChange(channel.get(), JingleChannel::CLOSED))
+TEST_F(JingleChannelTest, DoubleClose) {
+ EXPECT_CALL(*stream_, Close())
.Times(1);
- thread.Start();
- channel->Close();
- thread.Stop();
+ thread_.Start();
+ bool closed1 = false;
+ channel_->Close(NewRunnableFunction(&JingleChannelTest::OnClosed,
+ &closed1));
+ bool closed2 = false;
+ channel_->Close(NewRunnableFunction(&JingleChannelTest::OnClosed,
+ &closed2));
+ thread_.Stop();
+ EXPECT_TRUE(closed1 && closed2);
}
} // namespace remoting
diff --git a/remoting/jingle_glue/jingle_client.cc b/remoting/jingle_glue/jingle_client.cc
index 938664e..01e607b 100644
--- a/remoting/jingle_glue/jingle_client.cc
+++ b/remoting/jingle_glue/jingle_client.cc
@@ -2,9 +2,6 @@
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
-// TODO(ajwong): We assign and read from a few of the member variables on
-// two threads. We need to audit this for thread safety.
-
#include "remoting/jingle_glue/jingle_client.h"
#include "base/logging.h"
@@ -17,92 +14,64 @@
#include "third_party/libjingle/source/talk/base/asyncsocket.h"
#include "third_party/libjingle/source/talk/base/ssladapter.h"
#include "third_party/libjingle/source/talk/p2p/base/sessionmanager.h"
+#include "third_party/libjingle/source/talk/p2p/base/transport.h"
#include "third_party/libjingle/source/talk/p2p/client/sessionmanagertask.h"
-#ifdef USE_SSL_TUNNEL
-#include "third_party/libjingle/source/talk/session/tunnel/securetunnelsessionclient.h"
-#endif
#include "third_party/libjingle/source/talk/session/tunnel/tunnelsessionclient.h"
#include "third_party/libjingle/source/talk/xmpp/prexmppauth.h"
#include "third_party/libjingle/source/talk/xmpp/saslcookiemechanism.h"
namespace remoting {
+namespace {
+// A TunnelSessionClient that allows local connections.
+class LocalTunnelSessionClient : public cricket::TunnelSessionClient {
+ public:
+ LocalTunnelSessionClient(const buzz::Jid& jid,
+ cricket::SessionManager* manager)
+ : TunnelSessionClient(jid, manager) {
+ }
+
+ protected:
+ virtual cricket::TunnelSession* MakeTunnelSession(
+ cricket::Session* session, talk_base::Thread* stream_thread,
+ cricket::TunnelSessionRole role) {
+ session->transport()->set_allow_local_ips(true);
+ return new cricket::TunnelSession(this, session, stream_thread);
+ }
+};
+} // namespace
+
JingleClient::JingleClient(JingleThread* thread)
- : client_(NULL),
- thread_(thread),
- state_(CREATED),
- callback_(NULL) {
+ : thread_(thread),
+ callback_(NULL),
+ client_(NULL),
+ state_(START),
+ initialized_(false),
+ closed_(false) {
}
JingleClient::~JingleClient() {
- // JingleClient can be destroyed only after it's closed.
- DCHECK(state_ == CLOSED || state_ == CREATED);
+ AutoLock auto_lock(state_lock_);
+ DCHECK(!initialized_ || closed_);
}
void JingleClient::Init(
const std::string& username, const std::string& auth_token,
const std::string& auth_token_service, Callback* callback) {
DCHECK_NE(username, "");
- DCHECK(callback != NULL);
- DCHECK(state_ == CREATED);
-
- callback_ = callback;
- message_loop()->PostTask(
- FROM_HERE, NewRunnableMethod(this, &JingleClient::DoInitialize,
- username, auth_token, auth_token_service));
- state_ = INITIALIZED;
-}
-
-JingleChannel* JingleClient::Connect(const std::string& host_jid,
- JingleChannel::Callback* callback) {
- // Ownership if channel is given to DoConnect.
- scoped_refptr<JingleChannel> channel = new JingleChannel(callback);
- message_loop()->PostTask(
- FROM_HERE, NewRunnableMethod(this, &JingleClient::DoConnect,
- channel, host_jid, callback));
- return channel;
-}
-
-void JingleClient::DoConnect(scoped_refptr<JingleChannel> channel,
- const std::string& host_jid,
- JingleChannel::Callback* callback) {
- DCHECK_EQ(message_loop(), MessageLoop::current());
-
- talk_base::StreamInterface* stream =
- tunnel_session_client_->CreateTunnel(buzz::Jid(host_jid), "");
- DCHECK(stream != NULL);
- channel->Init(thread_, stream, host_jid);
-}
+ {
+ AutoLock auto_lock(state_lock_);
+ DCHECK(!initialized_ && !closed_);
+ initialized_ = true;
-void JingleClient::Close() {
- // Once we are closed we really shouldn't talk to the callback again. In the
- // case when JingleClient outlives the owner access the callback is not safe.
- // TODO(hclam): We need to lock to reset callback.
- callback_ = NULL;
+ DCHECK(callback != NULL);
+ callback_ = callback;
+ }
message_loop()->PostTask(
- FROM_HERE, NewRunnableMethod(this, &JingleClient::DoClose));
-}
-
-void JingleClient::DoClose() {
- DCHECK_EQ(message_loop(), MessageLoop::current());
-
- // If we have not yet initialized and the client is already closed then
- // don't close again.
- if (state_ == CLOSED)
- return;
-
- if (client_) {
- client_->Disconnect();
- // Client is deleted by TaskRunner.
- client_ = NULL;
- }
- tunnel_session_client_.reset();
- port_allocator_.reset();
- session_manager_.reset();
- network_manager_.reset();
- UpdateState(CLOSED);
+ FROM_HERE, NewRunnableMethod(this, &JingleClient::DoInitialize,
+ username, auth_token, auth_token_service));
}
void JingleClient::DoInitialize(const std::string& username,
@@ -138,18 +107,10 @@ void JingleClient::DoInitialize(const std::string& username,
port_allocator->SetJingleInfo(client_);
session_manager_.reset(new cricket::SessionManager(port_allocator_.get()));
-#ifdef USE_SSL_TUNNEL
- cricket::SecureTunnelSessionClient* session_client =
- new cricket::SecureTunnelSessionClient(client_->jid(),
- session_manager_.get());
- if (!session_client->GenerateIdentity())
- return false;
- tunnel_session_client_.reset(session_client);
-#else // !USE_SSL_TUNNEL
+
tunnel_session_client_.reset(
- new cricket::TunnelSessionClient(client_->jid(),
- session_manager_.get()));
-#endif // USE_SSL_TUNNEL
+ new LocalTunnelSessionClient(client_->jid(),
+ session_manager_.get()));
cricket::SessionManagerTask* receiver =
new cricket::SessionManagerTask(client_, session_manager_.get());
@@ -160,6 +121,72 @@ void JingleClient::DoInitialize(const std::string& username,
this, &JingleClient::OnIncomingTunnel);
}
+JingleChannel* JingleClient::Connect(const std::string& host_jid,
+ JingleChannel::Callback* callback) {
+ DCHECK(initialized_ && !closed_);
+
+ // Ownership if channel is given to DoConnect.
+ scoped_refptr<JingleChannel> channel = new JingleChannel(callback);
+ message_loop()->PostTask(
+ FROM_HERE, NewRunnableMethod(this, &JingleClient::DoConnect,
+ channel, host_jid, callback));
+ return channel;
+}
+
+void JingleClient::DoConnect(scoped_refptr<JingleChannel> channel,
+ const std::string& host_jid,
+ JingleChannel::Callback* callback) {
+ DCHECK_EQ(message_loop(), MessageLoop::current());
+
+ talk_base::StreamInterface* stream =
+ tunnel_session_client_->CreateTunnel(buzz::Jid(host_jid), "");
+ DCHECK(stream != NULL);
+
+ channel->Init(thread_, stream, host_jid);
+}
+
+void JingleClient::Close() {
+ Close(NULL);
+}
+
+void JingleClient::Close(Task* closed_task) {
+ {
+ AutoLock auto_lock(state_lock_);
+ // If the client is already closed then don't close again.
+ if (closed_) {
+ if (closed_task)
+ thread_->message_loop()->PostTask(FROM_HERE, closed_task);
+ return;
+ }
+ closed_task_.reset(closed_task);
+ closed_ = true;
+ }
+
+ message_loop()->PostTask(
+ FROM_HERE, NewRunnableMethod(this, &JingleClient::DoClose));
+}
+
+void JingleClient::DoClose() {
+ DCHECK_EQ(message_loop(), MessageLoop::current());
+ DCHECK(closed_);
+
+ tunnel_session_client_.reset();
+ session_manager_.reset();
+ port_allocator_.reset();
+ network_manager_.reset();
+
+ if (client_) {
+ client_->Disconnect();
+ // Client is deleted by TaskRunner.
+ client_ = NULL;
+ }
+
+ if (closed_task_.get()) {
+ closed_task_->Run();
+ closed_task_.reset();
+ }
+}
+
std::string JingleClient::GetFullJid() {
AutoLock auto_lock(full_jid_lock_);
return full_jid_;
@@ -176,7 +203,7 @@ MessageLoop* JingleClient::message_loop() {
void JingleClient::OnConnectionStateChanged(buzz::XmppEngine::State state) {
switch (state) {
case buzz::XmppEngine::STATE_START:
- UpdateState(INITIALIZED);
+ UpdateState(START);
break;
case buzz::XmppEngine::STATE_OPENING:
UpdateState(CONNECTING);
@@ -200,8 +227,11 @@ void JingleClient::OnConnectionStateChanged(buzz::XmppEngine::State state) {
void JingleClient::OnIncomingTunnel(
cricket::TunnelSessionClient* client, buzz::Jid jid,
std::string description, cricket::Session* session) {
- // Decline connection if we don't have callback.
- if (!callback_) {
+ // Must always lock state and check closed_ when calling callback.
+ AutoLock auto_lock(state_lock_);
+
+ if (closed_) {
+ // Always reject connection if we are closed.
client->DeclineTunnel(session);
return;
}
@@ -221,8 +251,11 @@ void JingleClient::OnIncomingTunnel(
void JingleClient::UpdateState(State new_state) {
if (new_state != state_) {
state_ = new_state;
- if (callback_)
- callback_->OnStateChange(this, new_state);
+ {
+ AutoLock auto_lock(state_lock_);
+ if (!closed_)
+ callback_->OnStateChange(this, new_state);
+ }
}
}
diff --git a/remoting/jingle_glue/jingle_client.h b/remoting/jingle_glue/jingle_client.h
index bac15d8..9d7b844 100644
--- a/remoting/jingle_glue/jingle_client.h
+++ b/remoting/jingle_glue/jingle_client.h
@@ -7,6 +7,7 @@
#include <string>
+#include "base/waitable_event.h"
#include "remoting/jingle_glue/jingle_channel.h"
#include "third_party/libjingle/source/talk/xmpp/xmppclient.h"
@@ -36,8 +37,7 @@ class JingleClient : public base::RefCountedThreadSafe<JingleClient>,
public sigslot::has_slots<> {
public:
enum State {
- CREATED, // Initial state.
- INITIALIZED,
+ START, // Initial state.
CONNECTING,
CONNECTED,
CLOSED,
@@ -84,8 +84,10 @@ class JingleClient : public base::RefCountedThreadSafe<JingleClient>,
JingleChannel::Callback* callback);
// Closes XMPP connection and stops the thread. Must be called before the
- // object is destroyed.
+ // object is destroyed. If specified, |closed_task| is executed after the
+ // connection is successfully closed.
void Close();
+ void Close(Task* closed_task);
// Returns JID with resource ID. Empty string is returned if full JID is not
// known yet, i.e. authentication hasn't finished.
@@ -95,7 +97,7 @@ class JingleClient : public base::RefCountedThreadSafe<JingleClient>,
// is transfered to the caller.
virtual IqRequest* CreateIqRequest();
- // Current state of the client.
+ // Current connection state of the client.
State state() { return state_; }
// Returns XmppClient object for the xmpp connection or NULL if not connected.
@@ -106,6 +108,7 @@ class JingleClient : public base::RefCountedThreadSafe<JingleClient>,
private:
friend class HeartbeatSenderTest;
+ friend class JingleClientTest;
void OnConnectionStateChanged(buzz::XmppEngine::State state);
@@ -131,14 +134,24 @@ class JingleClient : public base::RefCountedThreadSafe<JingleClient>,
buzz::PreXmppAuth* CreatePreXmppAuth(
const buzz::XmppClientSettings& settings);
- buzz::XmppClient* client_;
+ // JingleThread used for the connection. Set in the constructor.
JingleThread* thread_;
- State state_;
+
+ // Callback for this object. Callback must not be called if closed_ == true.
Callback* callback_;
+ // The XmppClient and its state and jid.
+ buzz::XmppClient* client_;
+ State state_;
Lock full_jid_lock_;
std::string full_jid_;
+ // Current state of the object.
+ Lock state_lock_; // Must be locked when accessing initialized_ or closed_.
+ bool initialized_;
+ bool closed_;
+ scoped_ptr<Task> closed_task_;
+
scoped_ptr<talk_base::NetworkManager> network_manager_;
scoped_ptr<cricket::BasicPortAllocator> port_allocator_;
scoped_ptr<cricket::SessionManager> session_manager_;
diff --git a/remoting/jingle_glue/jingle_client_unittest.cc b/remoting/jingle_glue/jingle_client_unittest.cc
new file mode 100644
index 0000000..4d6d55f
--- /dev/null
+++ b/remoting/jingle_glue/jingle_client_unittest.cc
@@ -0,0 +1,108 @@
+// Copyright (c) 2010 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "base/waitable_event.h"
+#include "remoting/jingle_glue/jingle_client.h"
+#include "remoting/jingle_glue/jingle_thread.h"
+#include "testing/gmock/include/gmock/gmock.h"
+#include "testing/gtest/include/gtest/gtest.h"
+
+using testing::_;
+
+namespace remoting {
+
+class MockJingleClientCallback : public JingleClient::Callback {
+ public:
+ ~MockJingleClientCallback() { }
+
+ MOCK_METHOD2(OnStateChange, void(JingleClient*, JingleClient::State));
+ MOCK_METHOD3(OnAcceptConnection, bool(
+ JingleClient*, const std::string&,
+ JingleChannel::Callback**_callback));
+ MOCK_METHOD2(OnNewConnection, void(
+ JingleClient*, scoped_refptr<JingleChannel>));
+};
+
+class JingleClientTest : public testing::Test {
+ public:
+ virtual ~JingleClientTest() { }
+
+ static void OnClosed(bool* called) {
+ *called = true;
+ }
+
+ // A helper that calls OnConnectionStateChanged(). Need this because we want
+ // to call it on the jingle thread.
+ static void ChangeState(JingleClient* client, buzz::XmppEngine::State state,
+ base::WaitableEvent* done_event) {
+ client->OnConnectionStateChanged(state);
+ if (done_event)
+ done_event->Signal();
+ }
+
+ protected:
+ virtual void SetUp() {
+ client_ = new JingleClient(&thread_);
+ // Fake initialization
+ client_->initialized_ = true;
+ client_->callback_ = &callback_;
+ }
+
+ JingleThread thread_;
+ scoped_refptr<JingleClient> client_;
+ MockJingleClientCallback callback_;
+};
+
+TEST_F(JingleClientTest, OnStateChanged) {
+ EXPECT_CALL(callback_, OnStateChange(_, JingleClient::CONNECTING))
+ .Times(1);
+
+ thread_.Start();
+
+ base::WaitableEvent state_changed_event(true, false);
+ thread_.message_loop()->PostTask(FROM_HERE, NewRunnableFunction(
+ &JingleClientTest::ChangeState, client_.get(),
+ buzz::XmppEngine::STATE_OPENING, &state_changed_event));
+ state_changed_event.Wait();
+
+ client_->Close();
+
+ thread_.Stop();
+}
+
+TEST_F(JingleClientTest, Close) {
+ EXPECT_CALL(callback_, OnStateChange(_, _))
+ .Times(0);
+ thread_.Start();
+ client_->Close();
+ // Verify that the channel doesn't call callback anymore.
+ thread_.message_loop()->PostTask(FROM_HERE, NewRunnableFunction(
+ &JingleClientTest::ChangeState, client_.get(),
+ buzz::XmppEngine::STATE_OPENING,
+ static_cast<base::WaitableEvent*>(NULL)));
+ thread_.Stop();
+}
+
+TEST_F(JingleClientTest, ClosedTask) {
+ thread_.Start();
+ bool closed = false;
+ client_->Close(NewRunnableFunction(&JingleClientTest::OnClosed,
+ &closed));
+ thread_.Stop();
+ EXPECT_TRUE(closed);
+}
+
+TEST_F(JingleClientTest, DoubleClose) {
+ thread_.Start();
+ bool closed1 = false;
+ client_->Close(NewRunnableFunction(&JingleClientTest::OnClosed,
+ &closed1));
+ bool closed2 = false;
+ client_->Close(NewRunnableFunction(&JingleClientTest::OnClosed,
+ &closed2));
+ thread_.Stop();
+ EXPECT_TRUE(closed1 && closed2);
+}
+
+} // namespace remoting
diff --git a/remoting/jingle_glue/jingle_test_client.cc b/remoting/jingle_glue/jingle_test_client.cc
index 255a931..4157930b 100644
--- a/remoting/jingle_glue/jingle_test_client.cc
+++ b/remoting/jingle_glue/jingle_test_client.cc
@@ -14,6 +14,8 @@ extern "C" {
#include <list>
#include "base/at_exit.h"
+#include "base/nss_util.h"
+#include "base/time.h"
#include "media/base/data_buffer.h"
#include "remoting/base/constants.h"
#include "remoting/jingle_glue/jingle_channel.h"
@@ -25,13 +27,17 @@ using remoting::JingleChannel;
using remoting::kChromotingTokenServiceName;
class JingleTestClient : public JingleChannel::Callback,
- public JingleClient::Callback {
+ public JingleClient::Callback,
+ public base::RefCountedThreadSafe<JingleTestClient> {
public:
+ JingleTestClient()
+ : closed_event_(true, false) {
+ }
+
virtual ~JingleTestClient() {}
void Run(const std::string& username, const std::string& auth_token,
const std::string& host_jid) {
- // TODO(hclam): Fix the threading problem.
remoting::JingleThread jingle_thread;
jingle_thread.Start();
client_ = new JingleClient(&jingle_thread);
@@ -63,7 +69,12 @@ class JingleTestClient : public JingleChannel::Callback,
}
while (!channels_.empty()) {
- channels_.front()->Close();
+ closed_event_.Reset();
+ channels_.front()->Close(
+ NewRunnableMethod(this, &JingleTestClient::OnClosed));
+ // Wait until channel is closed. If it is not closed within 0.1 seconds
+ // continue closing everything else.
+ closed_event_.TimedWait(base::TimeDelta::FromMilliseconds(100));
channels_.pop_front();
}
@@ -106,12 +117,17 @@ class JingleTestClient : public JingleChannel::Callback,
channels_.push_back(channel);
}
+ void OnClosed() {
+ closed_event_.Signal();
+ }
+
private:
typedef std::list<scoped_refptr<JingleChannel> > ChannelsList;
scoped_refptr<JingleClient> client_;
ChannelsList channels_;
Lock channels_lock_;
+ base::WaitableEvent closed_event_;
};
int main(int argc, char** argv) {
@@ -120,6 +136,9 @@ int main(int argc, char** argv) {
base::AtExitManager exit_manager;
+ base::EnsureNSPRInit();
+ base::EnsureNSSInit();
+
std::string host_jid = argc == 2 ? argv[1] : "";
std::string username;
@@ -130,9 +149,9 @@ int main(int argc, char** argv) {
std::cout << "Auth token: ";
std::cin >> auth_token;
- JingleTestClient client;
+ scoped_refptr<JingleTestClient> client = new JingleTestClient();
- client.Run(username, auth_token, host_jid);
+ client->Run(username, auth_token, host_jid);
return 0;
}
diff --git a/remoting/jingle_glue/jingle_thread.cc b/remoting/jingle_glue/jingle_thread.cc
index a00ce59..0037066 100644
--- a/remoting/jingle_glue/jingle_thread.cc
+++ b/remoting/jingle_glue/jingle_thread.cc
@@ -4,13 +4,41 @@
#include "remoting/jingle_glue/jingle_thread.h"
+#include "base/basictypes.h"
#include "base/logging.h"
-#include "base/message_loop.h"
+#include "base/message_pump.h"
+#include "base/time.h"
#include "third_party/libjingle/source/talk/base/ssladapter.h"
namespace remoting {
-const int kRunTasksMessageId = 1;
+const uint32 kRunTasksMessageId = 1;
+const uint32 kStopMessageId = 2;
+
+class JingleThread::JingleMessagePump : public base::MessagePump {
+ public:
+ JingleMessagePump(JingleThread* thread) : thread_(thread) { }
+
+ virtual void Run(Delegate* delegate) { NOTIMPLEMENTED() ;}
+ virtual void Quit() { NOTIMPLEMENTED(); }
+ virtual void ScheduleWork() {
+ thread_->Post(thread_, kRunTasksMessageId);
+ }
+ virtual void ScheduleDelayedWork(const base::Time& time) {
+ NOTIMPLEMENTED();
+ }
+
+ private:
+ JingleThread* thread_;
+};
+
+class JingleThread::JingleMessageLoop : public MessageLoop {
+ public:
+ JingleMessageLoop(JingleThread* thread)
+ : MessageLoop(MessageLoop::TYPE_IO) {
+ pump_ = new JingleMessagePump(thread);
+ }
+};
TaskPump::TaskPump() {
}
@@ -30,6 +58,7 @@ void TaskPump::OnMessage(talk_base::Message* pmsg) {
JingleThread::JingleThread()
: task_pump_(NULL),
started_event_(true, false),
+ stopped_event_(true, false),
message_loop_(NULL) {
}
@@ -41,7 +70,7 @@ void JingleThread::Start() {
}
void JingleThread::Run() {
- MessageLoopForIO message_loop;
+ JingleMessageLoop message_loop(this);
message_loop_ = &message_loop;
TaskPump task_pump;
@@ -50,28 +79,51 @@ void JingleThread::Run() {
// Signal after we've initialized |message_loop_| and |task_pump_|.
started_event_.Signal();
- Post(this, kRunTasksMessageId);
-
Thread::Run();
- message_loop.RunAllPending();
+ stopped_event_.Signal();
task_pump_ = NULL;
message_loop_ = NULL;
}
-// This method is called every 20ms to process tasks from |message_loop_|
-// on this thread.
-// TODO(sergeyu): Remove it when JingleThread moved to Chromium's base::Thread.
-void JingleThread::PumpAuxiliaryLoops() {
- MessageLoop::current()->RunAllPending();
+void JingleThread::Stop() {
+ // Shutdown gracefully: make sure that we excute all messages left in the
+ // queue before exiting. Thread::Stop() would not do that.
+ Post(this, kStopMessageId);
+ stopped_event_.Wait();
+}
+
+MessageLoop* JingleThread::message_loop() {
+ return message_loop_;
+}
- // Schedule next execution 20ms from now.
- PostDelayed(20, this, kRunTasksMessageId);
+ // Returns task pump if the thread is running, otherwise NULL is returned.
+TaskPump* JingleThread::task_pump() {
+ return task_pump_;
}
void JingleThread::OnMessage(talk_base::Message* msg) {
- PumpAuxiliaryLoops();
+ if (msg->message_id == kRunTasksMessageId) {
+ // This code is executed whenever we get new message in |message_loop_|.
+ // JingleMessagePump posts new tasks in the jingle thread.
+ // TODO(sergeyu): Remove it when JingleThread moved on Chromium's
+ // base::Thread.
+ base::MessagePump::Delegate* delegate = message_loop_;
+ // Loop until we run out of work.
+ while (true) {
+ if (!delegate->DoWork())
+ break;
+ }
+ } else if (msg->message_id == kStopMessageId) {
+ // Stop the thread only if there are no more messages left in the queue,
+ // otherwise post another task to try again later.
+ if (msgq_.size() > 0 || fPeekKeep_) {
+ Post(this, kStopMessageId);
+ } else {
+ MessageQueue::Quit();
+ }
+ }
}
} // namespace remoting
diff --git a/remoting/jingle_glue/jingle_thread.h b/remoting/jingle_glue/jingle_thread.h
index 2aa23f3..139b053 100644
--- a/remoting/jingle_glue/jingle_thread.h
+++ b/remoting/jingle_glue/jingle_thread.h
@@ -5,14 +5,13 @@
#ifndef REMOTING_JINGLE_GLUE_JINGLE_THREAD_H_
#define REMOTING_JINGLE_GLUE_JINGLE_THREAD_H_
+#include "base/message_loop.h"
#include "base/tracked_objects.h"
#include "base/waitable_event.h"
#include "third_party/libjingle/source/talk/base/messagequeue.h"
#include "third_party/libjingle/source/talk/base/taskrunner.h"
#include "third_party/libjingle/source/talk/base/thread.h"
-class MessageLoop;
-
namespace buzz {
class XmppClient;
}
@@ -35,7 +34,7 @@ class TaskPump : public talk_base::MessageHandler,
// TODO(sergeyu): This class should be changed to inherit from Chromiums
// base::Thread instead of libjingle's thread.
class JingleThread : public talk_base::Thread,
- private talk_base::MessageHandler {
+ public talk_base::MessageHandler {
public:
JingleThread();
virtual ~JingleThread();
@@ -45,23 +44,28 @@ class JingleThread : public talk_base::Thread,
// Main function for the thread. Should not be called directly.
void Run();
+ // Stop the thread.
+ void Stop();
+
// Returns Chromiums message loop for this thread.
// TODO(sergeyu): remove this method when we use base::Thread instead of
// talk_base::Thread
- MessageLoop* message_loop() { return message_loop_; }
+ MessageLoop* message_loop();
// Returns task pump if the thread is running, otherwise NULL is returned.
- TaskPump* task_pump() { return task_pump_; }
+ TaskPump* task_pump();
private:
+ class JingleMessageLoop;
+ class JingleMessagePump;
+
friend class HeartbeatSenderTest;
virtual void OnMessage(talk_base::Message* msg);
- void PumpAuxiliaryLoops();
-
TaskPump* task_pump_;
base::WaitableEvent started_event_;
+ base::WaitableEvent stopped_event_;
MessageLoop* message_loop_;
DISALLOW_COPY_AND_ASSIGN(JingleThread);
diff --git a/remoting/remoting.gyp b/remoting/remoting.gyp
index eed4d58..9754ddf2 100644
--- a/remoting/remoting.gyp
+++ b/remoting/remoting.gyp
@@ -370,8 +370,9 @@
'host/mock_objects.h',
'host/session_manager_unittest.cc',
'host/test_key_pair.h',
- 'jingle_glue/jingle_thread_unittest.cc',
+ 'jingle_glue/jingle_client_unittest.cc',
'jingle_glue/jingle_channel_unittest.cc',
+ 'jingle_glue/jingle_thread_unittest.cc',
'jingle_glue/iq_request_unittest.cc',
'jingle_glue/mock_objects.h',
'run_all_unittests.cc',