summaryrefslogtreecommitdiffstats
path: root/chrome/common
diff options
context:
space:
mode:
authorakalin@chromium.org <akalin@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98>2010-06-02 23:32:22 +0000
committerakalin@chromium.org <akalin@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98>2010-06-02 23:32:22 +0000
commit1d874a809da9edd362288409233a842bee7c94a9 (patch)
tree4a9fcccee24a211bc156b54519dfee5aa9e83eae /chrome/common
parent0717207534670b8e4ca8e33244baf46371b7d851 (diff)
downloadchromium_src-1d874a809da9edd362288409233a842bee7c94a9.zip
chromium_src-1d874a809da9edd362288409233a842bee7c94a9.tar.gz
chromium_src-1d874a809da9edd362288409233a842bee7c94a9.tar.bz2
Refactored MediatorThread to use Chrome threads primarily. This is in preparation for using Chrome sockets.
Made TalkMediator explicitly non-thread-safe. BUG=45612 TEST=manual testing of notifications Review URL: http://codereview.chromium.org/2471006 git-svn-id: svn://svn.chromium.org/chrome/trunk/src@48791 0039d316-1c4b-4281-b951-d872f2087c98
Diffstat (limited to 'chrome/common')
-rw-r--r--chrome/common/net/notifier/communicator/login_failure.h9
-rw-r--r--chrome/common/net/notifier/listener/mediator_thread_impl.cc288
-rw-r--r--chrome/common/net/notifier/listener/mediator_thread_impl.h116
-rw-r--r--chrome/common/net/notifier/listener/talk_mediator_impl.cc95
-rw-r--r--chrome/common/net/notifier/listener/talk_mediator_impl.h8
-rw-r--r--chrome/common/net/notifier/listener/talk_mediator_unittest.cc4
6 files changed, 272 insertions, 248 deletions
diff --git a/chrome/common/net/notifier/communicator/login_failure.h b/chrome/common/net/notifier/communicator/login_failure.h
index 89c629b..05505ce 100644
--- a/chrome/common/net/notifier/communicator/login_failure.h
+++ b/chrome/common/net/notifier/communicator/login_failure.h
@@ -5,7 +5,6 @@
#ifndef CHROME_COMMON_NET_NOTIFIER_COMMUNICATOR_LOGIN_FAILURE_H_
#define CHROME_COMMON_NET_NOTIFIER_COMMUNICATOR_LOGIN_FAILURE_H_
-#include "talk/base/common.h"
#include "talk/xmpp/xmppengine.h"
namespace notifier {
@@ -41,11 +40,9 @@ class LoginFailure {
buzz::XmppEngine::Error xmpp_error() const;
private:
- LoginError error_;
- buzz::XmppEngine::Error xmpp_error_;
- int subcode_;
-
- DISALLOW_COPY_AND_ASSIGN(LoginFailure);
+ const LoginError error_;
+ const buzz::XmppEngine::Error xmpp_error_;
+ const int subcode_;
};
} // namespace notifier
diff --git a/chrome/common/net/notifier/listener/mediator_thread_impl.cc b/chrome/common/net/notifier/listener/mediator_thread_impl.cc
index f13dd25..ac88983 100644
--- a/chrome/common/net/notifier/listener/mediator_thread_impl.cc
+++ b/chrome/common/net/notifier/listener/mediator_thread_impl.cc
@@ -6,7 +6,7 @@
#include "base/logging.h"
#include "base/message_loop.h"
-#include "base/platform_thread.h"
+#include "base/task.h"
#include "chrome/common/net/network_change_notifier_proxy.h"
#include "chrome/common/net/notifier/base/task_pump.h"
#include "chrome/common/net/notifier/communicator/connection_options.h"
@@ -17,11 +17,17 @@
#include "chrome/common/net/notifier/listener/subscribe_task.h"
#include "net/base/host_port_pair.h"
#include "net/base/host_resolver.h"
+#include "talk/base/physicalsocketserver.h"
#include "talk/base/thread.h"
#include "talk/xmpp/xmppclient.h"
#include "talk/xmpp/xmppclientsettings.h"
-using std::string;
+// We manage the lifetime of notifier::MediatorThreadImpl ourselves.
+template <>
+struct RunnableMethodTraits<notifier::MediatorThreadImpl> {
+ void RetainCallee(notifier::MediatorThreadImpl*) {}
+ void ReleaseCallee(notifier::MediatorThreadImpl*) {}
+};
namespace notifier {
@@ -29,111 +35,123 @@ MediatorThreadImpl::MediatorThreadImpl(
chrome_common_net::NetworkChangeNotifierThread*
network_change_notifier_thread)
: delegate_(NULL),
- network_change_notifier_thread_(network_change_notifier_thread) {
+ parent_message_loop_(MessageLoop::current()),
+ network_change_notifier_thread_(network_change_notifier_thread),
+ worker_thread_("MediatorThread worker thread") {
+ DCHECK(parent_message_loop_);
DCHECK(network_change_notifier_thread_);
}
MediatorThreadImpl::~MediatorThreadImpl() {
+ DCHECK_EQ(MessageLoop::current(), parent_message_loop_);
}
void MediatorThreadImpl::SetDelegate(Delegate* delegate) {
+ DCHECK_EQ(MessageLoop::current(), parent_message_loop_);
delegate_ = delegate;
}
void MediatorThreadImpl::Start() {
- talk_base::Thread::Start();
+ DCHECK_EQ(MessageLoop::current(), parent_message_loop_);
+ // We create the worker thread as an IO thread in preparation for
+ // making this use Chrome sockets.
+ const base::Thread::Options options(MessageLoop::TYPE_IO, 0);
+ // TODO(akalin): Make this function return a bool and remove this
+ // CHECK().
+ CHECK(worker_thread_.StartWithOptions(options));
+ worker_message_loop()->PostTask(
+ FROM_HERE,
+ NewRunnableMethod(this, &MediatorThreadImpl::StartLibjingleThread));
}
-void MediatorThreadImpl::Run() {
- PlatformThread::SetName("Notifier_MediatorThread");
- MessageLoop message_loop;
- Post(this, CMD_PUMP_AUXILIARY_LOOPS);
- ProcessMessages(talk_base::kForever);
+void MediatorThreadImpl::StartLibjingleThread() {
+ DCHECK_EQ(MessageLoop::current(), worker_message_loop());
+ socket_server_.reset(new talk_base::PhysicalSocketServer());
+ libjingle_thread_.reset(new talk_base::Thread());
+ talk_base::ThreadManager::SetCurrent(libjingle_thread_.get());
+ worker_message_loop()->PostTask(
+ FROM_HERE,
+ NewRunnableMethod(this, &MediatorThreadImpl::PumpLibjingleLoop));
}
-void MediatorThreadImpl::PumpAuxiliaryLoops() {
- MessageLoop::current()->RunAllPending();
- // We want to pump auxiliary loops every 100ms until this thread is stopped,
- // at which point this call will do nothing.
- PostDelayed(100, this, CMD_PUMP_AUXILIARY_LOOPS);
+void MediatorThreadImpl::StopLibjingleThread() {
+ DCHECK_EQ(MessageLoop::current(), worker_message_loop());
+ talk_base::ThreadManager::SetCurrent(NULL);
+ libjingle_thread_.reset();
+ socket_server_.reset();
}
-void MediatorThreadImpl::Login(const buzz::XmppClientSettings& settings) {
- Post(this, CMD_LOGIN, new LoginData(settings));
+void MediatorThreadImpl::PumpLibjingleLoop() {
+ DCHECK_EQ(MessageLoop::current(), worker_message_loop());
+ // Pump the libjingle message loop 100ms at a time.
+ if (!libjingle_thread_.get()) {
+ // StopLibjingleThread() was called.
+ return;
+ }
+ libjingle_thread_->ProcessMessages(100);
+ worker_message_loop()->PostTask(
+ FROM_HERE,
+ NewRunnableMethod(this, &MediatorThreadImpl::PumpLibjingleLoop));
}
-void MediatorThreadImpl::Stop() {
- Thread::Stop();
- CHECK(!login_.get() && !network_change_notifier_.get() && !pump_.get())
- << "Logout should be called prior to message queue exit.";
+void MediatorThreadImpl::Login(const buzz::XmppClientSettings& settings) {
+ DCHECK_EQ(MessageLoop::current(), parent_message_loop_);
+ worker_message_loop()->PostTask(
+ FROM_HERE,
+ NewRunnableMethod(this, &MediatorThreadImpl::DoLogin, settings));
}
void MediatorThreadImpl::Logout() {
- CHECK(!IsQuitting())
- << "Logout should be called prior to message queue exit.";
- Post(this, CMD_DISCONNECT);
- Stop();
+ DCHECK_EQ(MessageLoop::current(), parent_message_loop_);
+ worker_message_loop()->PostTask(
+ FROM_HERE,
+ NewRunnableMethod(this, &MediatorThreadImpl::DoDisconnect));
+ worker_message_loop()->PostTask(
+ FROM_HERE,
+ NewRunnableMethod(this, &MediatorThreadImpl::StopLibjingleThread));
+ // TODO(akalin): Decomp this into a separate stop method.
+ worker_thread_.Stop();
+ // Process any messages the worker thread may be posted on our
+ // thread.
+ bool old_state = parent_message_loop_->NestableTasksAllowed();
+ parent_message_loop_->SetNestableTasksAllowed(true);
+ parent_message_loop_->RunAllPending();
+ parent_message_loop_->SetNestableTasksAllowed(old_state);
+ // worker_thread_ should have cleaned all this up.
+ CHECK(!login_.get());
+ CHECK(!network_change_notifier_.get());
+ CHECK(!pump_.get());
}
void MediatorThreadImpl::ListenForUpdates() {
- Post(this, CMD_LISTEN_FOR_UPDATES);
+ DCHECK_EQ(MessageLoop::current(), parent_message_loop_);
+ worker_message_loop()->PostTask(
+ FROM_HERE,
+ NewRunnableMethod(this, &MediatorThreadImpl::DoListenForUpdates));
}
void MediatorThreadImpl::SubscribeForUpdates(
const std::vector<std::string>& subscribed_services_list) {
- Post(this, CMD_SUBSCRIBE_FOR_UPDATES,
- new SubscriptionData(subscribed_services_list));
+ DCHECK_EQ(MessageLoop::current(), parent_message_loop_);
+ worker_message_loop()->PostTask(
+ FROM_HERE,
+ NewRunnableMethod(this, &MediatorThreadImpl::DoSubscribeForUpdates,
+ subscribed_services_list));
}
void MediatorThreadImpl::SendNotification(
const OutgoingNotificationData& data) {
- Post(this, CMD_SEND_NOTIFICATION, new OutgoingNotificationMessageData(data));
-}
-
-void MediatorThreadImpl::ProcessMessages(int milliseconds) {
- talk_base::Thread::ProcessMessages(milliseconds);
+ DCHECK_EQ(MessageLoop::current(), parent_message_loop_);
+ worker_message_loop()->PostTask(
+ FROM_HERE,
+ NewRunnableMethod(this, &MediatorThreadImpl::DoSendNotification,
+ data));
}
-void MediatorThreadImpl::OnMessage(talk_base::Message* msg) {
- scoped_ptr<LoginData> data;
- switch (msg->message_id) {
- case CMD_LOGIN:
- DCHECK(msg->pdata);
- data.reset(reinterpret_cast<LoginData*>(msg->pdata));
- DoLogin(data.get());
- break;
- case CMD_DISCONNECT:
- DoDisconnect();
- break;
- case CMD_LISTEN_FOR_UPDATES:
- DoListenForUpdates();
- break;
- case CMD_SEND_NOTIFICATION: {
- DCHECK(msg->pdata);
- scoped_ptr<OutgoingNotificationMessageData> data(
- reinterpret_cast<OutgoingNotificationMessageData*>(msg->pdata));
- DoSendNotification(*data);
- break;
- }
- case CMD_SUBSCRIBE_FOR_UPDATES: {
- DCHECK(msg->pdata);
- scoped_ptr<SubscriptionData> subscription_data(
- reinterpret_cast<SubscriptionData*>(msg->pdata));
- DoSubscribeForUpdates(*subscription_data);
- break;
- }
- case CMD_PUMP_AUXILIARY_LOOPS:
- PumpAuxiliaryLoops();
- break;
- default:
- LOG(ERROR) << "P2P: Someone passed a bad message to the thread.";
- break;
- }
-}
-
-void MediatorThreadImpl::DoLogin(LoginData* login_data) {
+void MediatorThreadImpl::DoLogin(
+ const buzz::XmppClientSettings& settings) {
+ DCHECK_EQ(MessageLoop::current(), worker_message_loop());
LOG(INFO) << "P2P: Thread logging into talk network.";
- buzz::XmppClientSettings& user_settings = login_data->user_settings;
network_change_notifier_.reset(
new chrome_common_net::NetworkChangeNotifierProxy(
@@ -164,7 +182,7 @@ void MediatorThreadImpl::DoLogin(LoginData* login_data) {
// Language is not used in the stanza so we default to |en|.
std::string lang = "en";
login_.reset(new notifier::Login(pump_.get(),
- user_settings,
+ settings,
options,
lang,
host_resolver_.get(),
@@ -189,17 +207,8 @@ void MediatorThreadImpl::DoLogin(LoginData* login_data) {
login_->StartConnection();
}
-void MediatorThreadImpl::OnInputDebug(const char* msg, int length) {
- string output(msg, length);
- LOG(INFO) << "P2P: OnInputDebug:" << output << ".";
-}
-
-void MediatorThreadImpl::OnOutputDebug(const char* msg, int length) {
- string output(msg, length);
- LOG(INFO) << "P2P: OnOutputDebug:" << output << ".";
-}
-
void MediatorThreadImpl::DoDisconnect() {
+ DCHECK_EQ(MessageLoop::current(), worker_message_loop());
LOG(INFO) << "P2P: Thread logging out of talk network.";
login_.reset();
// Delete the old pump while on the thread to ensure that everything is
@@ -211,14 +220,10 @@ void MediatorThreadImpl::DoDisconnect() {
}
void MediatorThreadImpl::DoSubscribeForUpdates(
- const SubscriptionData& subscription_data) {
- buzz::XmppClient* client = xmpp_client();
- // If there isn't an active xmpp client, return.
- if (!client) {
- return;
- }
+ const std::vector<std::string>& subscribed_services_list) {
+ DCHECK_EQ(MessageLoop::current(), worker_message_loop());
SubscribeTask* subscription =
- new SubscribeTask(client, subscription_data.subscribed_services_list);
+ new SubscribeTask(xmpp_client(), subscribed_services_list);
subscription->SignalStatusUpdate.connect(
this,
&MediatorThreadImpl::OnSubscriptionStateChange);
@@ -226,40 +231,56 @@ void MediatorThreadImpl::DoSubscribeForUpdates(
}
void MediatorThreadImpl::DoListenForUpdates() {
- buzz::XmppClient* client = xmpp_client();
- // If there isn't an active xmpp client, return.
- if (!client) {
- return;
- }
- ListenTask* listener = new ListenTask(client);
+ DCHECK_EQ(MessageLoop::current(), worker_message_loop());
+ ListenTask* listener = new ListenTask(xmpp_client());
listener->SignalUpdateAvailable.connect(
this,
- &MediatorThreadImpl::OnUpdateListenerMessage);
+ &MediatorThreadImpl::OnIncomingNotification);
listener->Start();
}
void MediatorThreadImpl::DoSendNotification(
- const OutgoingNotificationMessageData& data) {
- buzz::XmppClient* client = xmpp_client();
- // If there isn't an active xmpp client, return.
- if (!client) {
- return;
- }
- SendUpdateTask* task = new SendUpdateTask(client, data.notification_data);
+ const OutgoingNotificationData& data) {
+ DCHECK_EQ(MessageLoop::current(), worker_message_loop());
+ SendUpdateTask* task = new SendUpdateTask(xmpp_client(), data);
task->SignalStatusUpdate.connect(
this,
- &MediatorThreadImpl::OnUpdateNotificationSent);
+ &MediatorThreadImpl::OnOutgoingNotification);
task->Start();
}
-void MediatorThreadImpl::OnUpdateListenerMessage(
+void MediatorThreadImpl::OnIncomingNotification(
const IncomingNotificationData& notification_data) {
+ DCHECK_EQ(MessageLoop::current(), worker_message_loop());
+ parent_message_loop_->PostTask(
+ FROM_HERE,
+ NewRunnableMethod(
+ this,
+ &MediatorThreadImpl::OnIncomingNotificationOnParentThread,
+ notification_data));
+}
+
+void MediatorThreadImpl::OnIncomingNotificationOnParentThread(
+ const IncomingNotificationData& notification_data) {
+ DCHECK_EQ(MessageLoop::current(), parent_message_loop_);
if (delegate_) {
delegate_->OnIncomingNotification(notification_data);
}
}
-void MediatorThreadImpl::OnUpdateNotificationSent(bool success) {
+void MediatorThreadImpl::OnOutgoingNotification(bool success) {
+ DCHECK_EQ(MessageLoop::current(), worker_message_loop());
+ parent_message_loop_->PostTask(
+ FROM_HERE,
+ NewRunnableMethod(
+ this,
+ &MediatorThreadImpl::OnOutgoingNotificationOnParentThread,
+ success));
+}
+
+void MediatorThreadImpl::OnOutgoingNotificationOnParentThread(
+ bool success) {
+ DCHECK_EQ(MessageLoop::current(), parent_message_loop_);
if (delegate_ && success) {
delegate_->OnOutgoingNotification();
}
@@ -267,6 +288,18 @@ void MediatorThreadImpl::OnUpdateNotificationSent(bool success) {
void MediatorThreadImpl::OnLoginFailureMessage(
const notifier::LoginFailure& failure) {
+ DCHECK_EQ(MessageLoop::current(), worker_message_loop());
+ parent_message_loop_->PostTask(
+ FROM_HERE,
+ NewRunnableMethod(
+ this,
+ &MediatorThreadImpl::OnLoginFailureMessageOnParentThread,
+ failure));
+}
+
+void MediatorThreadImpl::OnLoginFailureMessageOnParentThread(
+ const notifier::LoginFailure& failure) {
+ DCHECK_EQ(MessageLoop::current(), parent_message_loop_);
if (delegate_) {
delegate_->OnConnectionStateChange(false);
}
@@ -274,6 +307,18 @@ void MediatorThreadImpl::OnLoginFailureMessage(
void MediatorThreadImpl::OnClientStateChangeMessage(
LoginConnectionState state) {
+ DCHECK_EQ(MessageLoop::current(), worker_message_loop());
+ parent_message_loop_->PostTask(
+ FROM_HERE,
+ NewRunnableMethod(
+ this,
+ &MediatorThreadImpl::OnClientStateChangeMessageOnParentThread,
+ state));
+}
+
+void MediatorThreadImpl::OnClientStateChangeMessageOnParentThread(
+ LoginConnectionState state) {
+ DCHECK_EQ(MessageLoop::current(), parent_message_loop_);
switch (state) {
case STATE_CLOSED:
if (delegate_) {
@@ -301,16 +346,39 @@ void MediatorThreadImpl::OnClientStateChangeMessage(
}
void MediatorThreadImpl::OnSubscriptionStateChange(bool success) {
+ DCHECK_EQ(MessageLoop::current(), worker_message_loop());
+ parent_message_loop_->PostTask(
+ FROM_HERE,
+ NewRunnableMethod(
+ this,
+ &MediatorThreadImpl::OnSubscriptionStateChangeOnParentThread,
+ success));
+}
+
+void MediatorThreadImpl::OnSubscriptionStateChangeOnParentThread(
+ bool success) {
+ DCHECK_EQ(MessageLoop::current(), parent_message_loop_);
if (delegate_) {
delegate_->OnSubscriptionStateChange(success);
}
}
+MessageLoop* MediatorThreadImpl::worker_message_loop() {
+ MessageLoop* current_message_loop = MessageLoop::current();
+ DCHECK(current_message_loop);
+ MessageLoop* worker_message_loop = worker_thread_.message_loop();
+ DCHECK(worker_message_loop);
+ DCHECK(current_message_loop == parent_message_loop_ ||
+ current_message_loop == worker_message_loop);
+ return worker_message_loop;
+}
+
buzz::XmppClient* MediatorThreadImpl::xmpp_client() {
- if (!login_.get()) {
- return NULL;
- }
- return login_->xmpp_client();
+ DCHECK_EQ(MessageLoop::current(), worker_message_loop());
+ DCHECK(login_.get());
+ buzz::XmppClient* xmpp_client = login_->xmpp_client();
+ DCHECK(xmpp_client);
+ return xmpp_client;
}
} // namespace notifier
diff --git a/chrome/common/net/notifier/listener/mediator_thread_impl.h b/chrome/common/net/notifier/listener/mediator_thread_impl.h
index 0a35051..30683e8 100644
--- a/chrome/common/net/notifier/listener/mediator_thread_impl.h
+++ b/chrome/common/net/notifier/listener/mediator_thread_impl.h
@@ -26,13 +26,20 @@
#include "base/logging.h"
#include "base/ref_counted.h"
#include "base/scoped_ptr.h"
+#include "base/thread.h"
#include "chrome/common/net/notifier/communicator/login.h"
+#include "chrome/common/net/notifier/communicator/login_connection_state.h"
#include "chrome/common/net/notifier/communicator/login_failure.h"
#include "chrome/common/net/notifier/listener/mediator_thread.h"
#include "talk/base/sigslot.h"
-#include "talk/base/thread.h"
#include "talk/xmpp/xmppclientsettings.h"
+class MessageLoop;
+
+namespace buzz {
+class XmppClient;
+} // namespace buzz
+
namespace chrome_common_net {
class NetworkChangeNotifierThread;
} // namespace chrome_common_net
@@ -46,64 +53,16 @@ namespace notifier {
class TaskPump;
} // namespace notifier
-namespace buzz {
-class XmppClient;
-} // namespace buzz
-
namespace talk_base {
class SocketServer;
+class Thread;
} // namespace talk_base
namespace notifier {
-enum MEDIATOR_CMD {
- CMD_LOGIN,
- CMD_DISCONNECT,
- CMD_LISTEN_FOR_UPDATES,
- CMD_SEND_NOTIFICATION,
- CMD_SUBSCRIBE_FOR_UPDATES,
- CMD_PUMP_AUXILIARY_LOOPS,
-};
-
-// Used to pass authentication information from the mediator to the thread.
-// Use new to allocate it on the heap, the thread will delete it for you.
-struct LoginData : public talk_base::MessageData {
- explicit LoginData(const buzz::XmppClientSettings& settings)
- : user_settings(settings) {
- }
- virtual ~LoginData() {}
-
- buzz::XmppClientSettings user_settings;
-};
-
-// Used to pass subscription information from the mediator to the thread.
-// Use new to allocate it on the heap, the thread will delete it for you.
-struct SubscriptionData : public talk_base::MessageData {
- explicit SubscriptionData(const std::vector<std::string>& services)
- : subscribed_services_list(services) {
- }
- virtual ~SubscriptionData() {}
-
- std::vector<std::string> subscribed_services_list;
-};
-
-// Used to pass outgoing notification information from the mediator to the
-// thread. Use new to allocate it on the heap, the thread will delete it
-// for you.
-struct OutgoingNotificationMessageData : public talk_base::MessageData {
- explicit OutgoingNotificationMessageData(
- const OutgoingNotificationData& data) : notification_data(data) {
- }
- virtual ~OutgoingNotificationMessageData() {}
-
- OutgoingNotificationData notification_data;
-};
-
class MediatorThreadImpl
: public MediatorThread,
- public sigslot::has_slots<>,
- public talk_base::MessageHandler,
- public talk_base::Thread {
+ public sigslot::has_slots<> {
public:
explicit MediatorThreadImpl(
chrome_common_net::NetworkChangeNotifierThread*
@@ -114,47 +73,58 @@ class MediatorThreadImpl
// Start the thread.
virtual void Start();
- virtual void Stop();
- virtual void Run();
// These are called from outside threads, by the talk mediator object.
// They add messages to a queue which we poll in this thread.
- void Login(const buzz::XmppClientSettings& settings);
- void Logout();
- void ListenForUpdates();
- void SubscribeForUpdates(
+ virtual void Login(const buzz::XmppClientSettings& settings);
+ virtual void Logout();
+ virtual void ListenForUpdates();
+ virtual void SubscribeForUpdates(
const std::vector<std::string>& subscribed_services_list);
- void SendNotification(const OutgoingNotificationData& data);
- void LogStanzas();
+ virtual void SendNotification(const OutgoingNotificationData& data);
private:
+ void StartLibjingleThread();
+ void PumpLibjingleLoop();
+ void StopLibjingleThread();
+
// Called from within the thread on internal events.
- void ProcessMessages(int cms);
- void OnMessage(talk_base::Message* msg);
- void DoLogin(LoginData* login_data);
+ void DoLogin(const buzz::XmppClientSettings& settings);
void DoDisconnect();
- void DoSubscribeForUpdates(const SubscriptionData& subscription_data);
+ void DoSubscribeForUpdates(
+ const std::vector<std::string>& subscribed_services_list);
void DoListenForUpdates();
void DoSendNotification(
- const OutgoingNotificationMessageData& data);
- void DoStanzaLogging();
- void PumpAuxiliaryLoops();
+ const OutgoingNotificationData& data);
- // These handle messages indicating an event happened in the outside world.
- void OnUpdateListenerMessage(
+ // These handle messages indicating an event happened in the outside
+ // world. These are all called from the worker thread.
+ void OnIncomingNotification(
const IncomingNotificationData& notification_data);
- void OnUpdateNotificationSent(bool success);
+ void OnOutgoingNotification(bool success);
void OnLoginFailureMessage(const notifier::LoginFailure& failure);
void OnClientStateChangeMessage(LoginConnectionState state);
void OnSubscriptionStateChange(bool success);
- void OnInputDebug(const char* msg, int length);
- void OnOutputDebug(const char* msg, int length);
+ // Equivalents of the above functions called from the parent thread.
+ void OnIncomingNotificationOnParentThread(
+ const IncomingNotificationData& notification_data);
+ void OnOutgoingNotificationOnParentThread(bool success);
+ void OnLoginFailureMessageOnParentThread(
+ const notifier::LoginFailure& failure);
+ void OnClientStateChangeMessageOnParentThread(
+ LoginConnectionState state);
+ void OnSubscriptionStateChangeOnParentThread(
+ bool success);
+
+ MessageLoop* worker_message_loop();
buzz::XmppClient* xmpp_client();
Delegate* delegate_;
+ MessageLoop* parent_message_loop_;
chrome_common_net::NetworkChangeNotifierThread*
network_change_notifier_thread_;
+ base::Thread worker_thread_;
scoped_ptr<net::NetworkChangeNotifier> network_change_notifier_;
scoped_refptr<net::HostResolver> host_resolver_;
@@ -164,6 +134,10 @@ class MediatorThreadImpl
// complete or the pump shuts down.
scoped_ptr<notifier::TaskPump> pump_;
scoped_ptr<notifier::Login> login_;
+
+ scoped_ptr<talk_base::SocketServer> socket_server_;
+ scoped_ptr<talk_base::Thread> libjingle_thread_;
+
DISALLOW_COPY_AND_ASSIGN(MediatorThreadImpl);
};
diff --git a/chrome/common/net/notifier/listener/talk_mediator_impl.cc b/chrome/common/net/notifier/listener/talk_mediator_impl.cc
index fbf9da7..2e977fc 100644
--- a/chrome/common/net/notifier/listener/talk_mediator_impl.cc
+++ b/chrome/common/net/notifier/listener/talk_mediator_impl.cc
@@ -46,6 +46,7 @@ TalkMediatorImpl::TalkMediatorImpl(
mediator_thread_(
new MediatorThreadImpl(network_change_notifier_thread)),
invalidate_xmpp_auth_token_(invalidate_xmpp_auth_token) {
+ DCHECK(non_thread_safe_.CalledOnValidThread());
// Ensure the SSL library is initialized.
SslInitializationSingleton::GetInstance()->RegisterClient();
@@ -57,11 +58,13 @@ TalkMediatorImpl::TalkMediatorImpl(MediatorThread *thread)
: delegate_(NULL),
mediator_thread_(thread),
invalidate_xmpp_auth_token_(false) {
+ DCHECK(non_thread_safe_.CalledOnValidThread());
// When testing we do not initialize the SSL library.
TalkMediatorInitialization(true);
}
void TalkMediatorImpl::TalkMediatorInitialization(bool should_connect) {
+ DCHECK(non_thread_safe_.CalledOnValidThread());
if (should_connect) {
mediator_thread_->SetDelegate(this);
state_.connected = 1;
@@ -71,76 +74,64 @@ void TalkMediatorImpl::TalkMediatorInitialization(bool should_connect) {
}
TalkMediatorImpl::~TalkMediatorImpl() {
+ DCHECK(non_thread_safe_.CalledOnValidThread());
if (state_.started) {
Logout();
}
}
bool TalkMediatorImpl::Login() {
- bool should_log_in = false;
- buzz::XmppClientSettings xmpp_settings;
- {
- AutoLock lock(mutex_);
- // Connect to the mediator thread and start processing messages.
- if (!state_.connected) {
- mediator_thread_->SetDelegate(this);
- state_.connected = 1;
- }
- should_log_in =
- state_.initialized && !state_.logging_in && !state_.logged_in;
- state_.logging_in = should_log_in;
- xmpp_settings = xmpp_settings_;
+ DCHECK(non_thread_safe_.CalledOnValidThread());
+ // Connect to the mediator thread and start processing messages.
+ if (!state_.connected) {
+ mediator_thread_->SetDelegate(this);
+ state_.connected = 1;
}
- if (should_log_in) {
- mediator_thread_->Login(xmpp_settings);
+ if (state_.initialized && !state_.logging_in && !state_.logged_in) {
+ state_.logging_in = true;
+ mediator_thread_->Login(xmpp_settings_);
+ return true;
}
- return should_log_in;
+ return false;
}
bool TalkMediatorImpl::Logout() {
- bool logging_out = false;
- {
- AutoLock lock(mutex_);
- if (state_.connected) {
- state_.connected = 0;
- }
- logging_out = state_.started;
- if (logging_out) {
- state_.started = 0;
- state_.logging_in = 0;
- state_.logged_in = 0;
- state_.subscribed = 0;
- }
+ DCHECK(non_thread_safe_.CalledOnValidThread());
+ if (state_.connected) {
+ state_.connected = 0;
}
- if (logging_out) {
+ if (state_.started) {
+ state_.started = 0;
+ state_.logging_in = 0;
+ state_.logged_in = 0;
+ state_.subscribed = 0;
// We do not want to be called back during logout since we may be
// closing.
mediator_thread_->SetDelegate(NULL);
mediator_thread_->Logout();
+ return true;
}
- return logging_out;
+ return false;
}
bool TalkMediatorImpl::SendNotification(const OutgoingNotificationData& data) {
- bool can_send_notification = false;
- {
- AutoLock lock(mutex_);
- can_send_notification = state_.logged_in && state_.subscribed;
- }
- if (can_send_notification) {
+ DCHECK(non_thread_safe_.CalledOnValidThread());
+ if (state_.logged_in && state_.subscribed) {
mediator_thread_->SendNotification(data);
+ return true;
}
- return can_send_notification;
+ return false;
}
void TalkMediatorImpl::SetDelegate(TalkMediator::Delegate* delegate) {
+ DCHECK(non_thread_safe_.CalledOnValidThread());
delegate_ = delegate;
}
bool TalkMediatorImpl::SetAuthToken(const std::string& email,
const std::string& token,
const std::string& token_service) {
- AutoLock lock(mutex_);
+ DCHECK(non_thread_safe_.CalledOnValidThread());
// Verify that we can create a JID from the email provided.
buzz::Jid jid = buzz::Jid(email);
@@ -163,13 +154,9 @@ bool TalkMediatorImpl::SetAuthToken(const std::string& email,
void TalkMediatorImpl::AddSubscribedServiceUrl(
const std::string& service_url) {
- bool logged_in = false;
- {
- AutoLock lock(mutex_);
- subscribed_services_list_.push_back(service_url);
- logged_in = state_.logged_in;
- }
- if (logged_in) {
+ DCHECK(non_thread_safe_.CalledOnValidThread());
+ subscribed_services_list_.push_back(service_url);
+ if (state_.logged_in) {
LOG(INFO) << "Resubscribing for updates, a new service got added";
mediator_thread_->SubscribeForUpdates(subscribed_services_list_);
}
@@ -177,11 +164,9 @@ void TalkMediatorImpl::AddSubscribedServiceUrl(
void TalkMediatorImpl::OnConnectionStateChange(bool logged_in) {
- {
- AutoLock lock(mutex_);
- state_.logging_in = 0;
- state_.logged_in = logged_in;
- }
+ DCHECK(non_thread_safe_.CalledOnValidThread());
+ state_.logging_in = 0;
+ state_.logged_in = logged_in;
if (logged_in) {
LOG(INFO) << "P2P: Logged in.";
// ListenForUpdates enables the ListenTask. This is done before
@@ -196,10 +181,8 @@ void TalkMediatorImpl::OnConnectionStateChange(bool logged_in) {
}
void TalkMediatorImpl::OnSubscriptionStateChange(bool subscribed) {
- {
- AutoLock lock(mutex_);
- state_.subscribed = subscribed;
- }
+ DCHECK(non_thread_safe_.CalledOnValidThread());
+ state_.subscribed = subscribed;
LOG(INFO) << "P2P: " << (subscribed ? "subscribed" : "unsubscribed");
if (delegate_) {
delegate_->OnNotificationStateChange(subscribed);
@@ -208,6 +191,7 @@ void TalkMediatorImpl::OnSubscriptionStateChange(bool subscribed) {
void TalkMediatorImpl::OnIncomingNotification(
const IncomingNotificationData& notification_data) {
+ DCHECK(non_thread_safe_.CalledOnValidThread());
LOG(INFO) << "P2P: Updates are available on the server.";
if (delegate_) {
delegate_->OnIncomingNotification(notification_data);
@@ -215,6 +199,7 @@ void TalkMediatorImpl::OnIncomingNotification(
}
void TalkMediatorImpl::OnOutgoingNotification() {
+ DCHECK(non_thread_safe_.CalledOnValidThread());
LOG(INFO) <<
"P2P: Peers were notified that updates are available on the server.";
if (delegate_) {
diff --git a/chrome/common/net/notifier/listener/talk_mediator_impl.h b/chrome/common/net/notifier/listener/talk_mediator_impl.h
index 4ff4681..8625fa4 100644
--- a/chrome/common/net/notifier/listener/talk_mediator_impl.h
+++ b/chrome/common/net/notifier/listener/talk_mediator_impl.h
@@ -12,7 +12,7 @@
#include <string>
#include <vector>
-#include "base/lock.h"
+#include "base/non_thread_safe.h"
#include "base/scoped_ptr.h"
#include "chrome/common/net/notifier/listener/mediator_thread.h"
#include "chrome/common/net/notifier/listener/talk_mediator.h"
@@ -82,11 +82,7 @@ class TalkMediatorImpl
// mediator thread's SignalStateChange object.
void TalkMediatorInitialization(bool should_connect);
- // Protects state_, xmpp_settings_, and subscribed_services_list_.
- //
- // TODO(akalin): Remove this once we use this class from one thread
- // only.
- Lock mutex_;
+ NonThreadSafe non_thread_safe_;
// Delegate, which we don't own. May be NULL.
TalkMediator::Delegate* delegate_;
diff --git a/chrome/common/net/notifier/listener/talk_mediator_unittest.cc b/chrome/common/net/notifier/listener/talk_mediator_unittest.cc
index 1a8b0b4..ea9d222 100644
--- a/chrome/common/net/notifier/listener/talk_mediator_unittest.cc
+++ b/chrome/common/net/notifier/listener/talk_mediator_unittest.cc
@@ -6,6 +6,7 @@
#include "base/basictypes.h"
#include "base/logging.h"
+#include "base/message_loop.h"
#include "chrome/common/net/fake_network_change_notifier_thread.h"
#include "chrome/common/net/notifier/listener/mediator_thread_mock.h"
#include "chrome/common/net/notifier/listener/talk_mediator_impl.h"
@@ -42,6 +43,9 @@ class TalkMediatorImplTest : public testing::Test {
int last_message_;
private:
+ // TalkMediatorImpl expects a message loop.
+ MessageLoop message_loop_;
+
DISALLOW_COPY_AND_ASSIGN(TalkMediatorImplTest);
};