diff options
author | fgorski@chromium.org <fgorski@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98> | 2013-12-21 01:47:30 +0000 |
---|---|---|
committer | fgorski@chromium.org <fgorski@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98> | 2013-12-21 01:47:30 +0000 |
commit | 3f2dff75548beed15bc8ee3b43110cb93d4c72d3 (patch) | |
tree | bbbdbdc77c4fa3d3ddc520bef9afc2e4b4a82069 | |
parent | f597a330e1c21b0bdfdc516e5d766a367ab96d64 (diff) | |
download | chromium_src-3f2dff75548beed15bc8ee3b43110cb93d4c72d3.zip chromium_src-3f2dff75548beed15bc8ee3b43110cb93d4c72d3.tar.gz chromium_src-3f2dff75548beed15bc8ee3b43110cb93d4c72d3.tar.bz2 |
Moving the RMQStore Creation and Loading outside of the MCSClient
BUG=284553
Review URL: https://codereview.chromium.org/106523005
git-svn-id: svn://svn.chromium.org/chrome/trunk/src@242243 0039d316-1c4b-4281-b951-d872f2087c98
-rw-r--r-- | google_apis/gcm/engine/mcs_client.cc | 173 | ||||
-rw-r--r-- | google_apis/gcm/engine/mcs_client.h | 18 | ||||
-rw-r--r-- | google_apis/gcm/engine/mcs_client_unittest.cc | 30 | ||||
-rw-r--r-- | google_apis/gcm/tools/mcs_probe.cc | 17 |
4 files changed, 119 insertions, 119 deletions
diff --git a/google_apis/gcm/engine/mcs_client.cc b/google_apis/gcm/engine/mcs_client.cc index e8291f0..07f1004 100644 --- a/google_apis/gcm/engine/mcs_client.cc +++ b/google_apis/gcm/engine/mcs_client.cc @@ -86,10 +86,7 @@ ReliablePacketInfo::ReliablePacketInfo() } ReliablePacketInfo::~ReliablePacketInfo() {} -MCSClient::MCSClient( - const base::FilePath& rmq_path, - ConnectionFactory* connection_factory, - scoped_refptr<base::SequencedTaskRunner> blocking_task_runner) +MCSClient::MCSClient(ConnectionFactory* connection_factory, RMQStore* rmq_store) : state_(UNINITIALIZED), android_id_(0), security_token_(0), @@ -99,11 +96,10 @@ MCSClient::MCSClient( last_server_to_device_stream_id_received_(0), stream_id_out_(0), stream_id_in_(0), - rmq_store_(rmq_path, blocking_task_runner), + rmq_store_(rmq_store), heartbeat_interval_( base::TimeDelta::FromSeconds(kHeartbeatDefaultSeconds)), heartbeat_timer_(true, true), - blocking_task_runner_(blocking_task_runner), weak_ptr_factory_(this) { } @@ -113,16 +109,13 @@ MCSClient::~MCSClient() { void MCSClient::Initialize( const InitializationCompleteCallback& initialization_callback, const OnMessageReceivedCallback& message_received_callback, - const OnMessageSentCallback& message_sent_callback) { + const OnMessageSentCallback& message_sent_callback, + const RMQStore::LoadResult& load_result) { DCHECK_EQ(state_, UNINITIALIZED); initialization_callback_ = initialization_callback; message_received_callback_ = message_received_callback; message_sent_callback_ = message_sent_callback; - state_ = LOADING; - rmq_store_.Load(base::Bind(&MCSClient::OnRMQLoadFinished, - weak_ptr_factory_.GetWeakPtr())); - connection_factory_->Initialize( base::Bind(&MCSClient::ResetStateAndBuildLoginRequest, weak_ptr_factory_.GetWeakPtr()), @@ -131,6 +124,62 @@ void MCSClient::Initialize( base::Bind(&MCSClient::MaybeSendMessage, weak_ptr_factory_.GetWeakPtr())); connection_handler_ = connection_factory_->GetConnectionHandler(); + + // TODO(fgorski): Likely this whole check will be done outside in GCMClient. + if (!load_result.success) { + state_ = UNINITIALIZED; + LOG(ERROR) << "Failed to load/create RMQ state. Not connecting."; + return; + } + + state_ = LOADED; + stream_id_out_ = 1; // Login request is hardcoded to id 1. + + // TODO(fgorski): android_id and secutiry_token will be moved to GCMClient. + if (load_result.device_android_id == 0 || + load_result.device_security_token == 0) { + DVLOG(1) << "No device credentials found, assuming new client."; + initialization_callback_.Run(true, 0, 0); + return; + } + + android_id_ = load_result.device_android_id; + security_token_ = load_result.device_security_token; + + DVLOG(1) << "RMQ Load finished with " << load_result.incoming_messages.size() + << " incoming acks pending and " + << load_result.outgoing_messages.size() + << " outgoing messages pending."; + + restored_unackeds_server_ids_ = load_result.incoming_messages; + + // First go through and order the outgoing messages by recency. + std::map<uint64, google::protobuf::MessageLite*> ordered_messages; + for (std::map<PersistentId, google::protobuf::MessageLite*>::const_iterator + iter = load_result.outgoing_messages.begin(); + iter != load_result.outgoing_messages.end(); ++iter) { + uint64 timestamp = 0; + if (!base::StringToUint64(iter->first, ×tamp)) { + LOG(ERROR) << "Invalid restored message."; + return; + } + ordered_messages[timestamp] = iter->second; + } + + // Now go through and add the outgoing messages to the send queue in their + // appropriate order (oldest at front, most recent at back). + for (std::map<uint64, google::protobuf::MessageLite*>::const_iterator + iter = ordered_messages.begin(); + iter != ordered_messages.end(); ++iter) { + ReliablePacketInfo* packet_info = new ReliablePacketInfo(); + packet_info->protobuf.reset(iter->second); + packet_info->persistent_id = base::Uint64ToString(iter->first); + to_send_.push_back(make_linked_ptr(packet_info)); + } + + // TODO(fgorski): that is likely the only place where the initialization + // callback could be used. + initialization_callback_.Run(true, android_id_, security_token_); } void MCSClient::Login(uint64 android_id, uint64 security_token) { @@ -141,10 +190,11 @@ void MCSClient::Login(uint64 android_id, uint64 security_token) { DCHECK(restored_unackeds_server_ids_.empty()); android_id_ = android_id; security_token_ = security_token; - rmq_store_.SetDeviceCredentials(android_id_, - security_token_, - base::Bind(&MCSClient::OnRMQUpdateFinished, - weak_ptr_factory_.GetWeakPtr())); + rmq_store_->SetDeviceCredentials( + android_id_, + security_token_, + base::Bind(&MCSClient::OnRMQUpdateFinished, + weak_ptr_factory_.GetWeakPtr())); } state_ = CONNECTING; @@ -175,11 +225,11 @@ void MCSClient::SendMessage(const MCSMessage& message, bool use_rmq) { packet_info->persistent_id = persistent_id; SetPersistentId(persistent_id, packet_info->protobuf.get()); - rmq_store_.AddOutgoingMessage(persistent_id, - MCSMessage(message.tag(), - *(packet_info->protobuf)), - base::Bind(&MCSClient::OnRMQUpdateFinished, - weak_ptr_factory_.GetWeakPtr())); + rmq_store_->AddOutgoingMessage(persistent_id, + MCSMessage(message.tag(), + *(packet_info->protobuf)), + base::Bind(&MCSClient::OnRMQUpdateFinished, + weak_ptr_factory_.GetWeakPtr())); } else { // Check that there is an active connection to the endpoint. if (!connection_handler_->CanSendMessage()) { @@ -194,8 +244,8 @@ void MCSClient::SendMessage(const MCSMessage& message, bool use_rmq) { } void MCSClient::Destroy() { - rmq_store_.Destroy(base::Bind(&MCSClient::OnRMQUpdateFinished, - weak_ptr_factory_.GetWeakPtr())); + rmq_store_->Destroy(base::Bind(&MCSClient::OnRMQUpdateFinished, + weak_ptr_factory_.GetWeakPtr())); } void MCSClient::ResetStateAndBuildLoginRequest( @@ -259,58 +309,6 @@ void MCSClient::SendHeartbeat() { false); } -void MCSClient::OnRMQLoadFinished(const RMQStore::LoadResult& result) { - if (!result.success) { - state_ = UNINITIALIZED; - LOG(ERROR) << "Failed to load/create RMQ state. Not connecting."; - initialization_callback_.Run(false, 0, 0); - return; - } - state_ = LOADED; - stream_id_out_ = 1; // Login request is hardcoded to id 1. - - if (result.device_android_id == 0 || result.device_security_token == 0) { - DVLOG(1) << "No device credentials found, assuming new client."; - initialization_callback_.Run(true, 0, 0); - return; - } - - android_id_ = result.device_android_id; - security_token_ = result.device_security_token; - - DVLOG(1) << "RMQ Load finished with " << result.incoming_messages.size() - << " incoming acks pending and " << result.outgoing_messages.size() - << " outgoing messages pending."; - - restored_unackeds_server_ids_ = result.incoming_messages; - - // First go through and order the outgoing messages by recency. - std::map<uint64, google::protobuf::MessageLite*> ordered_messages; - for (std::map<PersistentId, google::protobuf::MessageLite*>::const_iterator - iter = result.outgoing_messages.begin(); - iter != result.outgoing_messages.end(); ++iter) { - uint64 timestamp = 0; - if (!base::StringToUint64(iter->first, ×tamp)) { - LOG(ERROR) << "Invalid restored message."; - return; - } - ordered_messages[timestamp] = iter->second; - } - - // Now go through and add the outgoing messages to the send queue in their - // appropriate order (oldest at front, most recent at back). - for (std::map<uint64, google::protobuf::MessageLite*>::const_iterator - iter = ordered_messages.begin(); - iter != ordered_messages.end(); ++iter) { - ReliablePacketInfo* packet_info = new ReliablePacketInfo(); - packet_info->protobuf.reset(iter->second); - packet_info->persistent_id = base::Uint64ToString(iter->first); - to_send_.push_back(make_linked_ptr(packet_info)); - } - - initialization_callback_.Run(true, android_id_, security_token_); -} - void MCSClient::OnRMQUpdateFinished(bool success) { LOG_IF(ERROR, !success) << "RMQ Update failed!"; // TODO(zea): Rebuild the store from scratch in case of persistence failure? @@ -430,9 +428,9 @@ void MCSClient::HandlePacketFromWire( ++stream_id_in_; if (!persistent_id.empty()) { unacked_server_ids_[stream_id_in_] = persistent_id; - rmq_store_.AddIncomingMessage(persistent_id, - base::Bind(&MCSClient::OnRMQUpdateFinished, - weak_ptr_factory_.GetWeakPtr())); + rmq_store_->AddIncomingMessage(persistent_id, + base::Bind(&MCSClient::OnRMQUpdateFinished, + weak_ptr_factory_.GetWeakPtr())); } DVLOG(1) << "Received message of type " << protobuf->GetTypeName() @@ -571,9 +569,10 @@ void MCSClient::HandleStreamAck(StreamId last_stream_id_received) { DVLOG(1) << "Server acked " << acked_outgoing_persistent_ids.size() << " outgoing messages, " << to_resend_.size() << " remaining unacked"; - rmq_store_.RemoveOutgoingMessages(acked_outgoing_persistent_ids, - base::Bind(&MCSClient::OnRMQUpdateFinished, - weak_ptr_factory_.GetWeakPtr())); + rmq_store_->RemoveOutgoingMessages( + acked_outgoing_persistent_ids, + base::Bind(&MCSClient::OnRMQUpdateFinished, + weak_ptr_factory_.GetWeakPtr())); HandleServerConfirmedReceipt(last_stream_id_received); } @@ -614,9 +613,10 @@ void MCSClient::HandleSelectiveAck(const PersistentIdList& id_list) { DVLOG(1) << "Server acked " << id_list.size() << " messages, " << to_resend_.size() << " remaining unacked."; - rmq_store_.RemoveOutgoingMessages(id_list, - base::Bind(&MCSClient::OnRMQUpdateFinished, - weak_ptr_factory_.GetWeakPtr())); + rmq_store_->RemoveOutgoingMessages( + id_list, + base::Bind(&MCSClient::OnRMQUpdateFinished, + weak_ptr_factory_.GetWeakPtr())); // Resend any remaining outgoing messages, as they were not received by the // server. @@ -647,9 +647,10 @@ void MCSClient::HandleServerConfirmedReceipt(StreamId device_stream_id) { DVLOG(1) << "Server confirmed receipt of " << acked_incoming_ids.size() << " acknowledged server messages."; - rmq_store_.RemoveIncomingMessages(acked_incoming_ids, - base::Bind(&MCSClient::OnRMQUpdateFinished, - weak_ptr_factory_.GetWeakPtr())); + rmq_store_->RemoveIncomingMessages( + acked_incoming_ids, + base::Bind(&MCSClient::OnRMQUpdateFinished, + weak_ptr_factory_.GetWeakPtr())); } MCSClient::PersistentId MCSClient::GetNextPersistentId() { diff --git a/google_apis/gcm/engine/mcs_client.h b/google_apis/gcm/engine/mcs_client.h index 4de62cb..8087e4e 100644 --- a/google_apis/gcm/engine/mcs_client.h +++ b/google_apis/gcm/engine/mcs_client.h @@ -42,7 +42,6 @@ class GCM_EXPORT MCSClient { public: enum State { UNINITIALIZED, // Uninitialized. - LOADING, // Waiting for RMQ load to finish. LOADED, // RMQ Load finished, waiting to connect. CONNECTING, // Connection in progress. CONNECTED, // Connected and running. @@ -65,9 +64,7 @@ class GCM_EXPORT MCSClient { typedef base::Callback<void(const std::string& message_id)> OnMessageSentCallback; - MCSClient(const base::FilePath& rmq_path, - ConnectionFactory* connection_factory, - scoped_refptr<base::SequencedTaskRunner> blocking_task_runner); + MCSClient(ConnectionFactory* connection_factory, RMQStore* rmq_store); virtual ~MCSClient(); // Initialize the client. Will load any previous id/token information as well @@ -81,7 +78,8 @@ class GCM_EXPORT MCSClient { // |initialization_callback| will be invoked with |success == false|. void Initialize(const InitializationCompleteCallback& initialization_callback, const OnMessageReceivedCallback& message_received_callback, - const OnMessageSentCallback& message_sent_callback); + const OnMessageSentCallback& message_sent_callback, + const RMQStore::LoadResult& load_result); // Logs the client into the server. Client must be initialized. // |android_id| and |security_token| are optional if this is not a new @@ -122,8 +120,7 @@ class GCM_EXPORT MCSClient { // Send a heartbeat to the MCS server. void SendHeartbeat(); - // RMQ Store callbacks. - void OnRMQLoadFinished(const RMQStore::LoadResult& result); + // RMQ Store callback. void OnRMQUpdateFinished(bool success); // Attempt to send a message. @@ -209,8 +206,8 @@ class GCM_EXPORT MCSClient { // acknowledged on the next login attempt. PersistentIdList restored_unackeds_server_ids_; - // The reliable message queue persistent store. - RMQStore rmq_store_; + // The reliable message queue persistent store. Owned by the caller. + RMQStore* rmq_store_; // ----- Heartbeats ----- // The current heartbeat interval. @@ -218,9 +215,6 @@ class GCM_EXPORT MCSClient { // Timer for triggering heartbeats. base::Timer heartbeat_timer_; - // The task runner for blocking tasks (i.e. persisting RMQ state to disk). - scoped_refptr<base::SequencedTaskRunner> blocking_task_runner_; - base::WeakPtrFactory<MCSClient> weak_ptr_factory_; DISALLOW_COPY_AND_ASSIGN(MCSClient); diff --git a/google_apis/gcm/engine/mcs_client_unittest.cc b/google_apis/gcm/engine/mcs_client_unittest.cc index 6ef1405..0dcf298 100644 --- a/google_apis/gcm/engine/mcs_client_unittest.cc +++ b/google_apis/gcm/engine/mcs_client_unittest.cc @@ -47,10 +47,9 @@ MCSMessage BuildDataMessage(const std::string& from, // MCSClient with overriden exposed persistent id logic. class TestMCSClient : public MCSClient { public: - TestMCSClient(const base::FilePath& rmq_path, - ConnectionFactory* connection_factory, - scoped_refptr<base::SequencedTaskRunner> blocking_task_runner) - : MCSClient(rmq_path, connection_factory, blocking_task_runner), + TestMCSClient(ConnectionFactory* connection_factory, + RMQStore* rmq_store) + : MCSClient(connection_factory, rmq_store), next_id_(0) { } @@ -96,6 +95,7 @@ class MCSClientTest : public testing::Test { base::ScopedTempDir temp_directory_; base::MessageLoop message_loop_; scoped_ptr<base::RunLoop> run_loop_; + scoped_ptr<RMQStore> rmq_store_; FakeConnectionFactory connection_factory_; scoped_ptr<TestMCSClient> mcs_client_; @@ -123,19 +123,21 @@ MCSClientTest::MCSClientTest() MCSClientTest::~MCSClientTest() {} void MCSClientTest::BuildMCSClient() { - mcs_client_.reset( - new TestMCSClient(temp_directory_.path(), - &connection_factory_, - message_loop_.message_loop_proxy())); + rmq_store_.reset(new RMQStore(temp_directory_.path(), + message_loop_.message_loop_proxy())); + mcs_client_.reset(new TestMCSClient(&connection_factory_, rmq_store_.get())); } void MCSClientTest::InitializeClient() { - mcs_client_->Initialize(base::Bind(&MCSClientTest::InitializationCallback, - base::Unretained(this)), - base::Bind(&MCSClientTest::MessageReceivedCallback, - base::Unretained(this)), - base::Bind(&MCSClientTest::MessageSentCallback, - base::Unretained(this))); + rmq_store_->Load( + base::Bind(&MCSClient::Initialize, + base::Unretained(mcs_client_.get()), + base::Bind(&MCSClientTest::InitializationCallback, + base::Unretained(this)), + base::Bind(&MCSClientTest::MessageReceivedCallback, + base::Unretained(this)), + base::Bind(&MCSClientTest::MessageSentCallback, + base::Unretained(this)))); run_loop_->Run(); run_loop_.reset(new base::RunLoop()); } diff --git a/google_apis/gcm/tools/mcs_probe.cc b/google_apis/gcm/tools/mcs_probe.cc index bc4ad7c..e958bd4 100644 --- a/google_apis/gcm/tools/mcs_probe.cc +++ b/google_apis/gcm/tools/mcs_probe.cc @@ -195,6 +195,7 @@ class MCSProbe { scoped_refptr<net::HttpNetworkSession> network_session_; scoped_ptr<net::ProxyService> proxy_service_; + scoped_ptr<RMQStore> rmq_store_; scoped_ptr<MCSClient> mcs_client_; scoped_ptr<ConnectionFactoryImpl> connection_factory_; @@ -249,14 +250,16 @@ void MCSProbe::Start() { server_host_, server_port_).ToString()), network_session_, &net_log_)); - mcs_client_.reset(new MCSClient(rmq_path_, - connection_factory_.get(), - file_thread_.message_loop_proxy())); + rmq_store_.reset(new RMQStore(rmq_path_, file_thread_.message_loop_proxy())); + mcs_client_.reset(new MCSClient(connection_factory_.get(), + rmq_store_.get())); run_loop_.reset(new base::RunLoop()); - mcs_client_->Initialize(base::Bind(&MCSProbe::InitializationCallback, - base::Unretained(this)), - base::Bind(&MessageReceivedCallback), - base::Bind(&MessageSentCallback)); + rmq_store_->Load(base::Bind(&MCSClient::Initialize, + base::Unretained(mcs_client_.get()), + base::Bind(&MCSProbe::InitializationCallback, + base::Unretained(this)), + base::Bind(&MessageReceivedCallback), + base::Bind(&MessageSentCallback))); run_loop_->Run(); } |