summaryrefslogtreecommitdiffstats
path: root/google_apis
diff options
context:
space:
mode:
authorfgorski@chromium.org <fgorski@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98>2013-12-21 01:47:30 +0000
committerfgorski@chromium.org <fgorski@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98>2013-12-21 01:47:30 +0000
commit3f2dff75548beed15bc8ee3b43110cb93d4c72d3 (patch)
treebbbdbdc77c4fa3d3ddc520bef9afc2e4b4a82069 /google_apis
parentf597a330e1c21b0bdfdc516e5d766a367ab96d64 (diff)
downloadchromium_src-3f2dff75548beed15bc8ee3b43110cb93d4c72d3.zip
chromium_src-3f2dff75548beed15bc8ee3b43110cb93d4c72d3.tar.gz
chromium_src-3f2dff75548beed15bc8ee3b43110cb93d4c72d3.tar.bz2
Moving the RMQStore Creation and Loading outside of the MCSClient
BUG=284553 Review URL: https://codereview.chromium.org/106523005 git-svn-id: svn://svn.chromium.org/chrome/trunk/src@242243 0039d316-1c4b-4281-b951-d872f2087c98
Diffstat (limited to 'google_apis')
-rw-r--r--google_apis/gcm/engine/mcs_client.cc173
-rw-r--r--google_apis/gcm/engine/mcs_client.h18
-rw-r--r--google_apis/gcm/engine/mcs_client_unittest.cc30
-rw-r--r--google_apis/gcm/tools/mcs_probe.cc17
4 files changed, 119 insertions, 119 deletions
diff --git a/google_apis/gcm/engine/mcs_client.cc b/google_apis/gcm/engine/mcs_client.cc
index e8291f0..07f1004 100644
--- a/google_apis/gcm/engine/mcs_client.cc
+++ b/google_apis/gcm/engine/mcs_client.cc
@@ -86,10 +86,7 @@ ReliablePacketInfo::ReliablePacketInfo()
}
ReliablePacketInfo::~ReliablePacketInfo() {}
-MCSClient::MCSClient(
- const base::FilePath& rmq_path,
- ConnectionFactory* connection_factory,
- scoped_refptr<base::SequencedTaskRunner> blocking_task_runner)
+MCSClient::MCSClient(ConnectionFactory* connection_factory, RMQStore* rmq_store)
: state_(UNINITIALIZED),
android_id_(0),
security_token_(0),
@@ -99,11 +96,10 @@ MCSClient::MCSClient(
last_server_to_device_stream_id_received_(0),
stream_id_out_(0),
stream_id_in_(0),
- rmq_store_(rmq_path, blocking_task_runner),
+ rmq_store_(rmq_store),
heartbeat_interval_(
base::TimeDelta::FromSeconds(kHeartbeatDefaultSeconds)),
heartbeat_timer_(true, true),
- blocking_task_runner_(blocking_task_runner),
weak_ptr_factory_(this) {
}
@@ -113,16 +109,13 @@ MCSClient::~MCSClient() {
void MCSClient::Initialize(
const InitializationCompleteCallback& initialization_callback,
const OnMessageReceivedCallback& message_received_callback,
- const OnMessageSentCallback& message_sent_callback) {
+ const OnMessageSentCallback& message_sent_callback,
+ const RMQStore::LoadResult& load_result) {
DCHECK_EQ(state_, UNINITIALIZED);
initialization_callback_ = initialization_callback;
message_received_callback_ = message_received_callback;
message_sent_callback_ = message_sent_callback;
- state_ = LOADING;
- rmq_store_.Load(base::Bind(&MCSClient::OnRMQLoadFinished,
- weak_ptr_factory_.GetWeakPtr()));
-
connection_factory_->Initialize(
base::Bind(&MCSClient::ResetStateAndBuildLoginRequest,
weak_ptr_factory_.GetWeakPtr()),
@@ -131,6 +124,62 @@ void MCSClient::Initialize(
base::Bind(&MCSClient::MaybeSendMessage,
weak_ptr_factory_.GetWeakPtr()));
connection_handler_ = connection_factory_->GetConnectionHandler();
+
+ // TODO(fgorski): Likely this whole check will be done outside in GCMClient.
+ if (!load_result.success) {
+ state_ = UNINITIALIZED;
+ LOG(ERROR) << "Failed to load/create RMQ state. Not connecting.";
+ return;
+ }
+
+ state_ = LOADED;
+ stream_id_out_ = 1; // Login request is hardcoded to id 1.
+
+ // TODO(fgorski): android_id and secutiry_token will be moved to GCMClient.
+ if (load_result.device_android_id == 0 ||
+ load_result.device_security_token == 0) {
+ DVLOG(1) << "No device credentials found, assuming new client.";
+ initialization_callback_.Run(true, 0, 0);
+ return;
+ }
+
+ android_id_ = load_result.device_android_id;
+ security_token_ = load_result.device_security_token;
+
+ DVLOG(1) << "RMQ Load finished with " << load_result.incoming_messages.size()
+ << " incoming acks pending and "
+ << load_result.outgoing_messages.size()
+ << " outgoing messages pending.";
+
+ restored_unackeds_server_ids_ = load_result.incoming_messages;
+
+ // First go through and order the outgoing messages by recency.
+ std::map<uint64, google::protobuf::MessageLite*> ordered_messages;
+ for (std::map<PersistentId, google::protobuf::MessageLite*>::const_iterator
+ iter = load_result.outgoing_messages.begin();
+ iter != load_result.outgoing_messages.end(); ++iter) {
+ uint64 timestamp = 0;
+ if (!base::StringToUint64(iter->first, &timestamp)) {
+ LOG(ERROR) << "Invalid restored message.";
+ return;
+ }
+ ordered_messages[timestamp] = iter->second;
+ }
+
+ // Now go through and add the outgoing messages to the send queue in their
+ // appropriate order (oldest at front, most recent at back).
+ for (std::map<uint64, google::protobuf::MessageLite*>::const_iterator
+ iter = ordered_messages.begin();
+ iter != ordered_messages.end(); ++iter) {
+ ReliablePacketInfo* packet_info = new ReliablePacketInfo();
+ packet_info->protobuf.reset(iter->second);
+ packet_info->persistent_id = base::Uint64ToString(iter->first);
+ to_send_.push_back(make_linked_ptr(packet_info));
+ }
+
+ // TODO(fgorski): that is likely the only place where the initialization
+ // callback could be used.
+ initialization_callback_.Run(true, android_id_, security_token_);
}
void MCSClient::Login(uint64 android_id, uint64 security_token) {
@@ -141,10 +190,11 @@ void MCSClient::Login(uint64 android_id, uint64 security_token) {
DCHECK(restored_unackeds_server_ids_.empty());
android_id_ = android_id;
security_token_ = security_token;
- rmq_store_.SetDeviceCredentials(android_id_,
- security_token_,
- base::Bind(&MCSClient::OnRMQUpdateFinished,
- weak_ptr_factory_.GetWeakPtr()));
+ rmq_store_->SetDeviceCredentials(
+ android_id_,
+ security_token_,
+ base::Bind(&MCSClient::OnRMQUpdateFinished,
+ weak_ptr_factory_.GetWeakPtr()));
}
state_ = CONNECTING;
@@ -175,11 +225,11 @@ void MCSClient::SendMessage(const MCSMessage& message, bool use_rmq) {
packet_info->persistent_id = persistent_id;
SetPersistentId(persistent_id,
packet_info->protobuf.get());
- rmq_store_.AddOutgoingMessage(persistent_id,
- MCSMessage(message.tag(),
- *(packet_info->protobuf)),
- base::Bind(&MCSClient::OnRMQUpdateFinished,
- weak_ptr_factory_.GetWeakPtr()));
+ rmq_store_->AddOutgoingMessage(persistent_id,
+ MCSMessage(message.tag(),
+ *(packet_info->protobuf)),
+ base::Bind(&MCSClient::OnRMQUpdateFinished,
+ weak_ptr_factory_.GetWeakPtr()));
} else {
// Check that there is an active connection to the endpoint.
if (!connection_handler_->CanSendMessage()) {
@@ -194,8 +244,8 @@ void MCSClient::SendMessage(const MCSMessage& message, bool use_rmq) {
}
void MCSClient::Destroy() {
- rmq_store_.Destroy(base::Bind(&MCSClient::OnRMQUpdateFinished,
- weak_ptr_factory_.GetWeakPtr()));
+ rmq_store_->Destroy(base::Bind(&MCSClient::OnRMQUpdateFinished,
+ weak_ptr_factory_.GetWeakPtr()));
}
void MCSClient::ResetStateAndBuildLoginRequest(
@@ -259,58 +309,6 @@ void MCSClient::SendHeartbeat() {
false);
}
-void MCSClient::OnRMQLoadFinished(const RMQStore::LoadResult& result) {
- if (!result.success) {
- state_ = UNINITIALIZED;
- LOG(ERROR) << "Failed to load/create RMQ state. Not connecting.";
- initialization_callback_.Run(false, 0, 0);
- return;
- }
- state_ = LOADED;
- stream_id_out_ = 1; // Login request is hardcoded to id 1.
-
- if (result.device_android_id == 0 || result.device_security_token == 0) {
- DVLOG(1) << "No device credentials found, assuming new client.";
- initialization_callback_.Run(true, 0, 0);
- return;
- }
-
- android_id_ = result.device_android_id;
- security_token_ = result.device_security_token;
-
- DVLOG(1) << "RMQ Load finished with " << result.incoming_messages.size()
- << " incoming acks pending and " << result.outgoing_messages.size()
- << " outgoing messages pending.";
-
- restored_unackeds_server_ids_ = result.incoming_messages;
-
- // First go through and order the outgoing messages by recency.
- std::map<uint64, google::protobuf::MessageLite*> ordered_messages;
- for (std::map<PersistentId, google::protobuf::MessageLite*>::const_iterator
- iter = result.outgoing_messages.begin();
- iter != result.outgoing_messages.end(); ++iter) {
- uint64 timestamp = 0;
- if (!base::StringToUint64(iter->first, &timestamp)) {
- LOG(ERROR) << "Invalid restored message.";
- return;
- }
- ordered_messages[timestamp] = iter->second;
- }
-
- // Now go through and add the outgoing messages to the send queue in their
- // appropriate order (oldest at front, most recent at back).
- for (std::map<uint64, google::protobuf::MessageLite*>::const_iterator
- iter = ordered_messages.begin();
- iter != ordered_messages.end(); ++iter) {
- ReliablePacketInfo* packet_info = new ReliablePacketInfo();
- packet_info->protobuf.reset(iter->second);
- packet_info->persistent_id = base::Uint64ToString(iter->first);
- to_send_.push_back(make_linked_ptr(packet_info));
- }
-
- initialization_callback_.Run(true, android_id_, security_token_);
-}
-
void MCSClient::OnRMQUpdateFinished(bool success) {
LOG_IF(ERROR, !success) << "RMQ Update failed!";
// TODO(zea): Rebuild the store from scratch in case of persistence failure?
@@ -430,9 +428,9 @@ void MCSClient::HandlePacketFromWire(
++stream_id_in_;
if (!persistent_id.empty()) {
unacked_server_ids_[stream_id_in_] = persistent_id;
- rmq_store_.AddIncomingMessage(persistent_id,
- base::Bind(&MCSClient::OnRMQUpdateFinished,
- weak_ptr_factory_.GetWeakPtr()));
+ rmq_store_->AddIncomingMessage(persistent_id,
+ base::Bind(&MCSClient::OnRMQUpdateFinished,
+ weak_ptr_factory_.GetWeakPtr()));
}
DVLOG(1) << "Received message of type " << protobuf->GetTypeName()
@@ -571,9 +569,10 @@ void MCSClient::HandleStreamAck(StreamId last_stream_id_received) {
DVLOG(1) << "Server acked " << acked_outgoing_persistent_ids.size()
<< " outgoing messages, " << to_resend_.size()
<< " remaining unacked";
- rmq_store_.RemoveOutgoingMessages(acked_outgoing_persistent_ids,
- base::Bind(&MCSClient::OnRMQUpdateFinished,
- weak_ptr_factory_.GetWeakPtr()));
+ rmq_store_->RemoveOutgoingMessages(
+ acked_outgoing_persistent_ids,
+ base::Bind(&MCSClient::OnRMQUpdateFinished,
+ weak_ptr_factory_.GetWeakPtr()));
HandleServerConfirmedReceipt(last_stream_id_received);
}
@@ -614,9 +613,10 @@ void MCSClient::HandleSelectiveAck(const PersistentIdList& id_list) {
DVLOG(1) << "Server acked " << id_list.size()
<< " messages, " << to_resend_.size() << " remaining unacked.";
- rmq_store_.RemoveOutgoingMessages(id_list,
- base::Bind(&MCSClient::OnRMQUpdateFinished,
- weak_ptr_factory_.GetWeakPtr()));
+ rmq_store_->RemoveOutgoingMessages(
+ id_list,
+ base::Bind(&MCSClient::OnRMQUpdateFinished,
+ weak_ptr_factory_.GetWeakPtr()));
// Resend any remaining outgoing messages, as they were not received by the
// server.
@@ -647,9 +647,10 @@ void MCSClient::HandleServerConfirmedReceipt(StreamId device_stream_id) {
DVLOG(1) << "Server confirmed receipt of " << acked_incoming_ids.size()
<< " acknowledged server messages.";
- rmq_store_.RemoveIncomingMessages(acked_incoming_ids,
- base::Bind(&MCSClient::OnRMQUpdateFinished,
- weak_ptr_factory_.GetWeakPtr()));
+ rmq_store_->RemoveIncomingMessages(
+ acked_incoming_ids,
+ base::Bind(&MCSClient::OnRMQUpdateFinished,
+ weak_ptr_factory_.GetWeakPtr()));
}
MCSClient::PersistentId MCSClient::GetNextPersistentId() {
diff --git a/google_apis/gcm/engine/mcs_client.h b/google_apis/gcm/engine/mcs_client.h
index 4de62cb..8087e4e 100644
--- a/google_apis/gcm/engine/mcs_client.h
+++ b/google_apis/gcm/engine/mcs_client.h
@@ -42,7 +42,6 @@ class GCM_EXPORT MCSClient {
public:
enum State {
UNINITIALIZED, // Uninitialized.
- LOADING, // Waiting for RMQ load to finish.
LOADED, // RMQ Load finished, waiting to connect.
CONNECTING, // Connection in progress.
CONNECTED, // Connected and running.
@@ -65,9 +64,7 @@ class GCM_EXPORT MCSClient {
typedef base::Callback<void(const std::string& message_id)>
OnMessageSentCallback;
- MCSClient(const base::FilePath& rmq_path,
- ConnectionFactory* connection_factory,
- scoped_refptr<base::SequencedTaskRunner> blocking_task_runner);
+ MCSClient(ConnectionFactory* connection_factory, RMQStore* rmq_store);
virtual ~MCSClient();
// Initialize the client. Will load any previous id/token information as well
@@ -81,7 +78,8 @@ class GCM_EXPORT MCSClient {
// |initialization_callback| will be invoked with |success == false|.
void Initialize(const InitializationCompleteCallback& initialization_callback,
const OnMessageReceivedCallback& message_received_callback,
- const OnMessageSentCallback& message_sent_callback);
+ const OnMessageSentCallback& message_sent_callback,
+ const RMQStore::LoadResult& load_result);
// Logs the client into the server. Client must be initialized.
// |android_id| and |security_token| are optional if this is not a new
@@ -122,8 +120,7 @@ class GCM_EXPORT MCSClient {
// Send a heartbeat to the MCS server.
void SendHeartbeat();
- // RMQ Store callbacks.
- void OnRMQLoadFinished(const RMQStore::LoadResult& result);
+ // RMQ Store callback.
void OnRMQUpdateFinished(bool success);
// Attempt to send a message.
@@ -209,8 +206,8 @@ class GCM_EXPORT MCSClient {
// acknowledged on the next login attempt.
PersistentIdList restored_unackeds_server_ids_;
- // The reliable message queue persistent store.
- RMQStore rmq_store_;
+ // The reliable message queue persistent store. Owned by the caller.
+ RMQStore* rmq_store_;
// ----- Heartbeats -----
// The current heartbeat interval.
@@ -218,9 +215,6 @@ class GCM_EXPORT MCSClient {
// Timer for triggering heartbeats.
base::Timer heartbeat_timer_;
- // The task runner for blocking tasks (i.e. persisting RMQ state to disk).
- scoped_refptr<base::SequencedTaskRunner> blocking_task_runner_;
-
base::WeakPtrFactory<MCSClient> weak_ptr_factory_;
DISALLOW_COPY_AND_ASSIGN(MCSClient);
diff --git a/google_apis/gcm/engine/mcs_client_unittest.cc b/google_apis/gcm/engine/mcs_client_unittest.cc
index 6ef1405..0dcf298 100644
--- a/google_apis/gcm/engine/mcs_client_unittest.cc
+++ b/google_apis/gcm/engine/mcs_client_unittest.cc
@@ -47,10 +47,9 @@ MCSMessage BuildDataMessage(const std::string& from,
// MCSClient with overriden exposed persistent id logic.
class TestMCSClient : public MCSClient {
public:
- TestMCSClient(const base::FilePath& rmq_path,
- ConnectionFactory* connection_factory,
- scoped_refptr<base::SequencedTaskRunner> blocking_task_runner)
- : MCSClient(rmq_path, connection_factory, blocking_task_runner),
+ TestMCSClient(ConnectionFactory* connection_factory,
+ RMQStore* rmq_store)
+ : MCSClient(connection_factory, rmq_store),
next_id_(0) {
}
@@ -96,6 +95,7 @@ class MCSClientTest : public testing::Test {
base::ScopedTempDir temp_directory_;
base::MessageLoop message_loop_;
scoped_ptr<base::RunLoop> run_loop_;
+ scoped_ptr<RMQStore> rmq_store_;
FakeConnectionFactory connection_factory_;
scoped_ptr<TestMCSClient> mcs_client_;
@@ -123,19 +123,21 @@ MCSClientTest::MCSClientTest()
MCSClientTest::~MCSClientTest() {}
void MCSClientTest::BuildMCSClient() {
- mcs_client_.reset(
- new TestMCSClient(temp_directory_.path(),
- &connection_factory_,
- message_loop_.message_loop_proxy()));
+ rmq_store_.reset(new RMQStore(temp_directory_.path(),
+ message_loop_.message_loop_proxy()));
+ mcs_client_.reset(new TestMCSClient(&connection_factory_, rmq_store_.get()));
}
void MCSClientTest::InitializeClient() {
- mcs_client_->Initialize(base::Bind(&MCSClientTest::InitializationCallback,
- base::Unretained(this)),
- base::Bind(&MCSClientTest::MessageReceivedCallback,
- base::Unretained(this)),
- base::Bind(&MCSClientTest::MessageSentCallback,
- base::Unretained(this)));
+ rmq_store_->Load(
+ base::Bind(&MCSClient::Initialize,
+ base::Unretained(mcs_client_.get()),
+ base::Bind(&MCSClientTest::InitializationCallback,
+ base::Unretained(this)),
+ base::Bind(&MCSClientTest::MessageReceivedCallback,
+ base::Unretained(this)),
+ base::Bind(&MCSClientTest::MessageSentCallback,
+ base::Unretained(this))));
run_loop_->Run();
run_loop_.reset(new base::RunLoop());
}
diff --git a/google_apis/gcm/tools/mcs_probe.cc b/google_apis/gcm/tools/mcs_probe.cc
index bc4ad7c..e958bd4 100644
--- a/google_apis/gcm/tools/mcs_probe.cc
+++ b/google_apis/gcm/tools/mcs_probe.cc
@@ -195,6 +195,7 @@ class MCSProbe {
scoped_refptr<net::HttpNetworkSession> network_session_;
scoped_ptr<net::ProxyService> proxy_service_;
+ scoped_ptr<RMQStore> rmq_store_;
scoped_ptr<MCSClient> mcs_client_;
scoped_ptr<ConnectionFactoryImpl> connection_factory_;
@@ -249,14 +250,16 @@ void MCSProbe::Start() {
server_host_, server_port_).ToString()),
network_session_,
&net_log_));
- mcs_client_.reset(new MCSClient(rmq_path_,
- connection_factory_.get(),
- file_thread_.message_loop_proxy()));
+ rmq_store_.reset(new RMQStore(rmq_path_, file_thread_.message_loop_proxy()));
+ mcs_client_.reset(new MCSClient(connection_factory_.get(),
+ rmq_store_.get()));
run_loop_.reset(new base::RunLoop());
- mcs_client_->Initialize(base::Bind(&MCSProbe::InitializationCallback,
- base::Unretained(this)),
- base::Bind(&MessageReceivedCallback),
- base::Bind(&MessageSentCallback));
+ rmq_store_->Load(base::Bind(&MCSClient::Initialize,
+ base::Unretained(mcs_client_.get()),
+ base::Bind(&MCSProbe::InitializationCallback,
+ base::Unretained(this)),
+ base::Bind(&MessageReceivedCallback),
+ base::Bind(&MessageSentCallback)));
run_loop_->Run();
}