diff options
author | sanjeevr@chromium.org <sanjeevr@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98> | 2011-04-07 22:27:43 +0000 |
---|---|---|
committer | sanjeevr@chromium.org <sanjeevr@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98> | 2011-04-07 22:27:43 +0000 |
commit | 942e202a50d64e0f21ebd872edcbfd857a320040 (patch) | |
tree | 36fa0759981ec2f4b1cba99bdb8b6b3d2e26627e /jingle | |
parent | 316d6c6b8c65f182af6f04bc68fbcd281cd43b28 (diff) | |
download | chromium_src-942e202a50d64e0f21ebd872edcbfd857a320040.zip chromium_src-942e202a50d64e0f21ebd872edcbfd857a320040.tar.gz chromium_src-942e202a50d64e0f21ebd872edcbfd857a320040.tar.bz2 |
Changed jingle to use the IO thread from the passed in URLRequestContextGetter for doing its I/O rather than creating its own worker thread.
BUG=None
TEST=Unit-tests, Cloud print notifications, sync notifications.
Review URL: http://codereview.chromium.org/6793047
git-svn-id: svn://svn.chromium.org/chrome/trunk/src@80853 0039d316-1c4b-4281-b951-d872f2087c98
Diffstat (limited to 'jingle')
-rw-r--r-- | jingle/notifier/base/notifier_options.cc | 2 | ||||
-rw-r--r-- | jingle/notifier/base/notifier_options.h | 8 | ||||
-rw-r--r-- | jingle/notifier/listener/mediator_thread_impl.cc | 328 | ||||
-rw-r--r-- | jingle/notifier/listener/mediator_thread_impl.h | 72 |
4 files changed, 214 insertions, 196 deletions
diff --git a/jingle/notifier/base/notifier_options.cc b/jingle/notifier/base/notifier_options.cc index 2b4b56a..dbb1d4c 100644 --- a/jingle/notifier/base/notifier_options.cc +++ b/jingle/notifier/base/notifier_options.cc @@ -15,4 +15,6 @@ NotifierOptions::NotifierOptions() notification_method(kDefaultNotificationMethod), auth_mechanism(GaiaTokenPreXmppAuth::kDefaultAuthMechanism) {} +NotifierOptions::~NotifierOptions() { } + } // namespace notifier diff --git a/jingle/notifier/base/notifier_options.h b/jingle/notifier/base/notifier_options.h index 5cd4243..0d7b471 100644 --- a/jingle/notifier/base/notifier_options.h +++ b/jingle/notifier/base/notifier_options.h @@ -1,4 +1,4 @@ -// Copyright (c) 2010 The Chromium Authors. All rights reserved. +// Copyright (c) 2011 The Chromium Authors. All rights reserved. // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. @@ -7,13 +7,16 @@ #include <string> +#include "base/memory/ref_counted.h" #include "jingle/notifier/base/notification_method.h" #include "net/base/host_port_pair.h" +#include "net/url_request/url_request_context_getter.h" namespace notifier { struct NotifierOptions { NotifierOptions(); + ~NotifierOptions(); // Indicates that the SSLTCP port (443) is to be tried before the the XMPP // port (5222) during login. @@ -37,6 +40,9 @@ struct NotifierOptions { // Specifies the auth mechanism to use ("X-GOOGLE-TOKEN", "X-OAUTH2", etc), std::string auth_mechanism; + + // The URLRequestContextGetter to use for doing I/O. + scoped_refptr<net::URLRequestContextGetter> request_context_getter; }; } // namespace notifier diff --git a/jingle/notifier/listener/mediator_thread_impl.cc b/jingle/notifier/listener/mediator_thread_impl.cc index 0d96d01..9ef6ea9 100644 --- a/jingle/notifier/listener/mediator_thread_impl.cc +++ b/jingle/notifier/listener/mediator_thread_impl.cc @@ -5,162 +5,121 @@ #include "jingle/notifier/listener/mediator_thread_impl.h" #include "base/logging.h" +#include "base/memory/scoped_ptr.h" +#include "base/memory/weak_ptr.h" #include "base/message_loop.h" +#include "base/observer_list_threadsafe.h" +#include "base/threading/thread.h" #include "jingle/notifier/base/const_communicator.h" #include "jingle/notifier/base/notifier_options_util.h" #include "jingle/notifier/base/task_pump.h" #include "jingle/notifier/communicator/connection_options.h" +#include "jingle/notifier/communicator/login.h" #include "jingle/notifier/communicator/xmpp_connection_generator.h" +#include "jingle/notifier/listener/push_notifications_listen_task.h" #include "jingle/notifier/listener/push_notifications_send_update_task.h" -#include "net/base/cert_verifier.h" +#include "jingle/notifier/listener/push_notifications_subscribe_task.h" #include "net/base/host_port_pair.h" -#include "net/base/host_resolver.h" +#include "net/url_request/url_request_context.h" +#include "net/url_request/url_request_context_getter.h" #include "talk/xmpp/xmppclientsettings.h" namespace notifier { -MediatorThreadImpl::MediatorThreadImpl( +class MediatorThreadImpl::Core + : public base::RefCountedThreadSafe<MediatorThreadImpl::Core>, + public LoginDelegate, + public PushNotificationsListenTaskDelegate, + public PushNotificationsSubscribeTaskDelegate { + public: + // Invoked on the caller thread. + explicit Core(const NotifierOptions& notifier_options); + void AddObserver(Observer* observer); + void RemoveObserver(Observer* observer); + + // Login::Delegate implementation. Called on I/O thread. + virtual void OnConnect(base::WeakPtr<talk_base::Task> base_task); + virtual void OnDisconnect(); + + // PushNotificationsListenTaskDelegate implementation. Called on I/O thread. + virtual void OnNotificationReceived( + const Notification& notification); + // PushNotificationsSubscribeTaskDelegate implementation. Called on I/O + // thread. + virtual void OnSubscribed(); + virtual void OnSubscriptionError(); + + // Helpers invoked on I/O thread. + void Login(const buzz::XmppClientSettings& settings); + void Disconnect(); + void ListenForPushNotifications(); + void SubscribeForPushNotifications( + const SubscriptionList& subscriptions); + void SendNotification(const Notification& data); + void UpdateXmppSettings(const buzz::XmppClientSettings& settings); + + private: + friend class base::RefCountedThreadSafe<MediatorThreadImpl::Core>; + // Invoked on either the caller thread or the I/O thread. + ~Core(); + scoped_refptr<ObserverListThreadSafe<Observer> > observers_; + base::WeakPtr<talk_base::Task> base_task_; + + const NotifierOptions notifier_options_; + + scoped_ptr<notifier::Login> login_; + DISALLOW_COPY_AND_ASSIGN(Core); +}; + +MediatorThreadImpl::Core::Core( const NotifierOptions& notifier_options) : observers_(new ObserverListThreadSafe<Observer>()), - construction_message_loop_(MessageLoop::current()), - method_message_loop_(NULL), - notifier_options_(notifier_options), - worker_thread_("MediatorThread worker thread") { - DCHECK(construction_message_loop_); + notifier_options_(notifier_options) { + DCHECK(notifier_options_.request_context_getter); } -MediatorThreadImpl::~MediatorThreadImpl() { - DCHECK_EQ(MessageLoop::current(), construction_message_loop_); - // If the worker thread is still around, we need to call Logout() so - // that all the variables living it get destroyed properly (i.e., on - // the worker thread). - if (worker_thread_.IsRunning()) { - Logout(); - } +MediatorThreadImpl::Core::~Core() { } -void MediatorThreadImpl::AddObserver(Observer* observer) { - CheckOrSetValidThread(); +void MediatorThreadImpl::Core::AddObserver(Observer* observer) { observers_->AddObserver(observer); } -void MediatorThreadImpl::RemoveObserver(Observer* observer) { - CheckOrSetValidThread(); +void MediatorThreadImpl::Core::RemoveObserver(Observer* observer) { observers_->RemoveObserver(observer); } -void MediatorThreadImpl::Start() { - DCHECK_EQ(MessageLoop::current(), construction_message_loop_); - // We create the worker thread as an IO thread in preparation for - // making this use Chrome sockets. - const base::Thread::Options options(MessageLoop::TYPE_IO, 0); - // TODO(akalin): Make this function return a bool and remove this - // CHECK(). - CHECK(worker_thread_.StartWithOptions(options)); -} - -void MediatorThreadImpl::Login(const buzz::XmppClientSettings& settings) { - CheckOrSetValidThread(); - - worker_message_loop()->PostTask( - FROM_HERE, - NewRunnableMethod(this, &MediatorThreadImpl::DoLogin, settings)); -} - -void MediatorThreadImpl::Logout() { - CheckOrSetValidThread(); - worker_message_loop()->PostTask( - FROM_HERE, - NewRunnableMethod(this, &MediatorThreadImpl::DoDisconnect)); - // TODO(akalin): Decomp this into a separate stop method. - worker_thread_.Stop(); - // worker_thread_ should have cleaned this up. It is OK to check this - // variable in this thread because worker_thread_ is gone by now. - CHECK(!login_.get()); -} - -void MediatorThreadImpl::ListenForUpdates() { - CheckOrSetValidThread(); - worker_message_loop()->PostTask( - FROM_HERE, - NewRunnableMethod(this, - &MediatorThreadImpl::ListenForPushNotifications)); -} - -void MediatorThreadImpl::SubscribeForUpdates( - const SubscriptionList& subscriptions) { - CheckOrSetValidThread(); - worker_message_loop()->PostTask( - FROM_HERE, - NewRunnableMethod( - this, - &MediatorThreadImpl::SubscribeForPushNotifications, - subscriptions)); -} - -void MediatorThreadImpl::SendNotification( - const Notification& data) { - CheckOrSetValidThread(); - worker_message_loop()->PostTask( - FROM_HERE, - NewRunnableMethod(this, &MediatorThreadImpl::DoSendNotification, - data)); -} - -void MediatorThreadImpl::UpdateXmppSettings( - const buzz::XmppClientSettings& settings) { - CheckOrSetValidThread(); - worker_message_loop()->PostTask( - FROM_HERE, - NewRunnableMethod(this, - &MediatorThreadImpl::DoUpdateXmppSettings, - settings)); -} - -MessageLoop* MediatorThreadImpl::worker_message_loop() { - MessageLoop* current_message_loop = MessageLoop::current(); - DCHECK(current_message_loop); - MessageLoop* worker_message_loop = worker_thread_.message_loop(); - DCHECK(worker_message_loop); - DCHECK(current_message_loop == method_message_loop_ || - current_message_loop == worker_message_loop); - return worker_message_loop; -} - - -void MediatorThreadImpl::DoLogin( - const buzz::XmppClientSettings& settings) { - DCHECK_EQ(MessageLoop::current(), worker_message_loop()); +void MediatorThreadImpl::Core::Login(const buzz::XmppClientSettings& settings) { + DCHECK(notifier_options_.request_context_getter->GetIOMessageLoopProxy()-> + BelongsToCurrentThread()); VLOG(1) << "P2P: Thread logging into talk network."; + // TODO(sanjeevr): Pass in the URLRequestContextGetter to Login. base_task_.reset(); - - host_resolver_.reset( - net::CreateSystemHostResolver(net::HostResolver::kDefaultParallelism, - NULL, NULL)); - cert_verifier_.reset(new net::CertVerifier); login_.reset(new notifier::Login(this, settings, notifier::ConnectionOptions(), - host_resolver_.get(), - cert_verifier_.get(), + notifier_options_.request_context_getter-> + GetURLRequestContext()->host_resolver(), + notifier_options_.request_context_getter-> + GetURLRequestContext()->cert_verifier(), GetServerList(notifier_options_), notifier_options_.try_ssltcp_first, notifier_options_.auth_mechanism)); login_->StartConnection(); } -void MediatorThreadImpl::DoDisconnect() { - DCHECK_EQ(MessageLoop::current(), worker_message_loop()); +void MediatorThreadImpl::Core::Disconnect() { + DCHECK(notifier_options_.request_context_getter->GetIOMessageLoopProxy()-> + BelongsToCurrentThread()); VLOG(1) << "P2P: Thread logging out of talk network."; login_.reset(); - cert_verifier_.reset(); - host_resolver_.reset(); base_task_.reset(); } -void MediatorThreadImpl::ListenForPushNotifications() { - DCHECK_EQ(MessageLoop::current(), worker_message_loop()); +void MediatorThreadImpl::Core::ListenForPushNotifications() { + DCHECK(notifier_options_.request_context_getter->GetIOMessageLoopProxy()-> + BelongsToCurrentThread()); if (!base_task_.get()) return; PushNotificationsListenTask* listener = @@ -168,36 +127,39 @@ void MediatorThreadImpl::ListenForPushNotifications() { listener->Start(); } -void MediatorThreadImpl::SubscribeForPushNotifications( +void MediatorThreadImpl::Core::SubscribeForPushNotifications( const SubscriptionList& subscriptions) { - DCHECK_EQ(MessageLoop::current(), worker_message_loop()); + DCHECK(notifier_options_.request_context_getter->GetIOMessageLoopProxy()-> + BelongsToCurrentThread()); if (!base_task_.get()) return; - DCHECK_EQ(MessageLoop::current(), worker_message_loop()); PushNotificationsSubscribeTask* subscribe_task = new PushNotificationsSubscribeTask(base_task_, subscriptions, this); subscribe_task->Start(); } -void MediatorThreadImpl::OnSubscribed() { - DCHECK_EQ(MessageLoop::current(), worker_message_loop()); +void MediatorThreadImpl::Core::OnSubscribed() { + DCHECK(notifier_options_.request_context_getter->GetIOMessageLoopProxy()-> + BelongsToCurrentThread()); observers_->Notify(&Observer::OnSubscriptionStateChange, true); } -void MediatorThreadImpl::OnSubscriptionError() { - DCHECK_EQ(MessageLoop::current(), worker_message_loop()); +void MediatorThreadImpl::Core::OnSubscriptionError() { + DCHECK(notifier_options_.request_context_getter->GetIOMessageLoopProxy()-> + BelongsToCurrentThread()); observers_->Notify(&Observer::OnSubscriptionStateChange, false); } -void MediatorThreadImpl::OnNotificationReceived( +void MediatorThreadImpl::Core::OnNotificationReceived( const Notification& notification) { - DCHECK_EQ(MessageLoop::current(), worker_message_loop()); + DCHECK(notifier_options_.request_context_getter->GetIOMessageLoopProxy()-> + BelongsToCurrentThread()); observers_->Notify(&Observer::OnIncomingNotification, notification); } -void MediatorThreadImpl::DoSendNotification( - const Notification& data) { - DCHECK_EQ(MessageLoop::current(), worker_message_loop()); +void MediatorThreadImpl::Core::SendNotification(const Notification& data) { + DCHECK(notifier_options_.request_context_getter->GetIOMessageLoopProxy()-> + BelongsToCurrentThread()); if (!base_task_.get()) { return; } @@ -208,9 +170,10 @@ void MediatorThreadImpl::DoSendNotification( observers_->Notify(&Observer::OnOutgoingNotification); } -void MediatorThreadImpl::DoUpdateXmppSettings( +void MediatorThreadImpl::Core::UpdateXmppSettings( const buzz::XmppClientSettings& settings) { - DCHECK_EQ(MessageLoop::current(), worker_message_loop()); + DCHECK(notifier_options_.request_context_getter->GetIOMessageLoopProxy()-> + BelongsToCurrentThread()); VLOG(1) << "P2P: Thread Updating login settings."; // The caller should only call UpdateXmppSettings after a Login call. if (login_.get()) @@ -220,24 +183,115 @@ void MediatorThreadImpl::DoUpdateXmppSettings( "P2P: Thread UpdateXmppSettings called when login_ was NULL"; } - -void MediatorThreadImpl::OnConnect(base::WeakPtr<talk_base::Task> base_task) { - DCHECK_EQ(MessageLoop::current(), worker_message_loop()); +void MediatorThreadImpl::Core::OnConnect( + base::WeakPtr<talk_base::Task> base_task) { + DCHECK(notifier_options_.request_context_getter->GetIOMessageLoopProxy()-> + BelongsToCurrentThread()); base_task_ = base_task; observers_->Notify(&Observer::OnConnectionStateChange, true); } -void MediatorThreadImpl::OnDisconnect() { - DCHECK_EQ(MessageLoop::current(), worker_message_loop()); +void MediatorThreadImpl::Core::OnDisconnect() { + DCHECK(notifier_options_.request_context_getter->GetIOMessageLoopProxy()-> + BelongsToCurrentThread()); base_task_.reset(); observers_->Notify(&Observer::OnConnectionStateChange, false); } + +MediatorThreadImpl::MediatorThreadImpl(const NotifierOptions& notifier_options) + : core_(new Core(notifier_options)), + construction_message_loop_proxy_( + base::MessageLoopProxy::CreateForCurrentThread()), + io_message_loop_proxy_( + notifier_options.request_context_getter->GetIOMessageLoopProxy()) { +} + +MediatorThreadImpl::~MediatorThreadImpl() { + DCHECK(construction_message_loop_proxy_->BelongsToCurrentThread()); + LogoutImpl(); +} + +void MediatorThreadImpl::AddObserver(Observer* observer) { + CheckOrSetValidThread(); + core_->AddObserver(observer); +} + +void MediatorThreadImpl::RemoveObserver(Observer* observer) { + CheckOrSetValidThread(); + core_->RemoveObserver(observer); +} + +void MediatorThreadImpl::Start() { + DCHECK(construction_message_loop_proxy_->BelongsToCurrentThread()); +} + +void MediatorThreadImpl::Login(const buzz::XmppClientSettings& settings) { + CheckOrSetValidThread(); + io_message_loop_proxy_->PostTask( + FROM_HERE, + NewRunnableMethod(core_.get(), + &MediatorThreadImpl::Core::Login, + settings)); +} + +void MediatorThreadImpl::Logout() { + CheckOrSetValidThread(); + LogoutImpl(); +} + +void MediatorThreadImpl::ListenForUpdates() { + CheckOrSetValidThread(); + io_message_loop_proxy_->PostTask( + FROM_HERE, + NewRunnableMethod(core_.get(), + &MediatorThreadImpl::Core::ListenForPushNotifications)); +} + +void MediatorThreadImpl::SubscribeForUpdates( + const SubscriptionList& subscriptions) { + CheckOrSetValidThread(); + io_message_loop_proxy_->PostTask( + FROM_HERE, + NewRunnableMethod( + core_.get(), + &MediatorThreadImpl::Core::SubscribeForPushNotifications, + subscriptions)); +} + +void MediatorThreadImpl::SendNotification( + const Notification& data) { + CheckOrSetValidThread(); + io_message_loop_proxy_->PostTask( + FROM_HERE, + NewRunnableMethod(core_.get(), + &MediatorThreadImpl::Core::SendNotification, + data)); +} + +void MediatorThreadImpl::UpdateXmppSettings( + const buzz::XmppClientSettings& settings) { + CheckOrSetValidThread(); + io_message_loop_proxy_->PostTask( + FROM_HERE, + NewRunnableMethod(core_.get(), + &MediatorThreadImpl::Core::UpdateXmppSettings, + settings)); +} + +void MediatorThreadImpl::LogoutImpl() { + io_message_loop_proxy_->PostTask( + FROM_HERE, + NewRunnableMethod(core_.get(), + &MediatorThreadImpl::Core::Disconnect)); +} + void MediatorThreadImpl::CheckOrSetValidThread() { - if (method_message_loop_) { - DCHECK_EQ(MessageLoop::current(), method_message_loop_); + if (method_message_loop_proxy_) { + DCHECK(method_message_loop_proxy_->BelongsToCurrentThread()); } else { - method_message_loop_ = MessageLoop::current(); + method_message_loop_proxy_ = + base::MessageLoopProxy::CreateForCurrentThread(); } } diff --git a/jingle/notifier/listener/mediator_thread_impl.h b/jingle/notifier/listener/mediator_thread_impl.h index 0d1f1c4..4015a81 100644 --- a/jingle/notifier/listener/mediator_thread_impl.h +++ b/jingle/notifier/listener/mediator_thread_impl.h @@ -25,32 +25,21 @@ #include "base/basictypes.h" #include "base/memory/ref_counted.h" -#include "base/memory/scoped_ptr.h" -#include "base/memory/weak_ptr.h" -#include "base/observer_list_threadsafe.h" #include "base/task.h" -#include "base/threading/thread.h" #include "jingle/notifier/base/notifier_options.h" -#include "jingle/notifier/communicator/login.h" #include "jingle/notifier/listener/mediator_thread.h" -#include "jingle/notifier/listener/push_notifications_listen_task.h" -#include "jingle/notifier/listener/push_notifications_subscribe_task.h" -class MessageLoop; +namespace base { +class MessageLoopProxy; +} namespace buzz { class XmppClientSettings; } // namespace buzz -namespace net { -class HostResolver; -} // namespace net - namespace notifier { -class MediatorThreadImpl : public MediatorThread, public LoginDelegate, - public PushNotificationsListenTaskDelegate, - public PushNotificationsSubscribeTaskDelegate { +class MediatorThreadImpl : public MediatorThread { public: explicit MediatorThreadImpl(const NotifierOptions& notifier_options); virtual ~MediatorThreadImpl(); @@ -70,54 +59,21 @@ class MediatorThreadImpl : public MediatorThread, public LoginDelegate, virtual void SendNotification(const Notification& data); virtual void UpdateXmppSettings(const buzz::XmppClientSettings& settings); - // Login::Delegate implementation. - virtual void OnConnect(base::WeakPtr<talk_base::Task> base_task); - virtual void OnDisconnect(); - - // PushNotificationsListenTaskDelegate implementation. - virtual void OnNotificationReceived( - const Notification& notification); - // PushNotificationsSubscribeTaskDelegate implementation. - virtual void OnSubscribed(); - virtual void OnSubscriptionError(); - - protected: - // Should only be called after Start(). - MessageLoop* worker_message_loop(); - - scoped_refptr<ObserverListThreadSafe<Observer> > observers_; - MessageLoop* construction_message_loop_; - MessageLoop* method_message_loop_; - base::WeakPtr<talk_base::Task> base_task_; - private: - void DoLogin(const buzz::XmppClientSettings& settings); - void DoDisconnect(); - - // Helpers invoked on worker thread. - void ListenForPushNotifications(); - void SubscribeForPushNotifications( - const SubscriptionList& subscriptions); - - void DoSendNotification( - const Notification& data); - void DoUpdateXmppSettings(const buzz::XmppClientSettings& settings); void CheckOrSetValidThread(); - - const NotifierOptions notifier_options_; - - base::Thread worker_thread_; - scoped_ptr<net::HostResolver> host_resolver_; - scoped_ptr<net::CertVerifier> cert_verifier_; - - scoped_ptr<notifier::Login> login_; - + // The logic of Logout without the thread check so it can be called in the + // d'tor. + void LogoutImpl(); + // The real guts of MediatorThreadImpl, which allows this class to not be + // refcounted. + class Core; + scoped_refptr<Core> core_; + scoped_refptr<base::MessageLoopProxy> construction_message_loop_proxy_; + scoped_refptr<base::MessageLoopProxy> method_message_loop_proxy_; + scoped_refptr<base::MessageLoopProxy> io_message_loop_proxy_; DISALLOW_COPY_AND_ASSIGN(MediatorThreadImpl); }; } // namespace notifier -// We manage the lifetime of notifier::MediatorThreadImpl ourselves. -DISABLE_RUNNABLE_METHOD_REFCOUNT(notifier::MediatorThreadImpl); - #endif // JINGLE_NOTIFIER_LISTENER_MEDIATOR_THREAD_IMPL_H_ |