summaryrefslogtreecommitdiffstats
path: root/google_apis/gcm
diff options
context:
space:
mode:
authorzea@chromium.org <zea@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98>2014-02-13 21:42:33 +0000
committerzea@chromium.org <zea@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98>2014-02-13 21:42:33 +0000
commit9eba93a294ccc08160be066c011f90bcae07bc00 (patch)
treedbe8a4e792401edd23fae93c8ccccb0d795891d2 /google_apis/gcm
parent47b779fca8a5b22ec61e2469d14c33c9cd5bf86c (diff)
downloadchromium_src-9eba93a294ccc08160be066c011f90bcae07bc00.zip
chromium_src-9eba93a294ccc08160be066c011f90bcae07bc00.tar.gz
chromium_src-9eba93a294ccc08160be066c011f90bcae07bc00.tar.bz2
[GCM] Add basic collapse key support for upstream
Messages are collapsed based on app + token as long as they have not been sent over the wire yet. If messages are resent (because original send failed for some reason) no collapsing is done at this point in time. BUG=284553 Review URL: https://codereview.chromium.org/148293002 git-svn-id: svn://svn.chromium.org/chrome/trunk/src@251135 0039d316-1c4b-4281-b951-d872f2087c98
Diffstat (limited to 'google_apis/gcm')
-rw-r--r--google_apis/gcm/engine/gcm_store.h3
-rw-r--r--google_apis/gcm/engine/gcm_store_impl.cc39
-rw-r--r--google_apis/gcm/engine/gcm_store_impl.h4
-rw-r--r--google_apis/gcm/engine/mcs_client.cc127
-rw-r--r--google_apis/gcm/engine/mcs_client.h8
-rw-r--r--google_apis/gcm/engine/mcs_client_unittest.cc266
6 files changed, 318 insertions, 129 deletions
diff --git a/google_apis/gcm/engine/gcm_store.h b/google_apis/gcm/engine/gcm_store.h
index ed97949..d725118 100644
--- a/google_apis/gcm/engine/gcm_store.h
+++ b/google_apis/gcm/engine/gcm_store.h
@@ -91,6 +91,9 @@ class GCM_EXPORT GCMStore {
virtual bool AddOutgoingMessage(const std::string& persistent_id,
const MCSMessage& message,
const UpdateCallback& callback) = 0;
+ virtual void OverwriteOutgoingMessage(const std::string& persistent_id,
+ const MCSMessage& message,
+ const UpdateCallback& callback) = 0;
virtual void RemoveOutgoingMessage(const std::string& persistent_id,
const UpdateCallback& callback) = 0;
virtual void RemoveOutgoingMessages(const PersistentIdList& persistent_ids,
diff --git a/google_apis/gcm/engine/gcm_store_impl.cc b/google_apis/gcm/engine/gcm_store_impl.cc
index daf7d36..9844f8a 100644
--- a/google_apis/gcm/engine/gcm_store_impl.cc
+++ b/google_apis/gcm/engine/gcm_store_impl.cc
@@ -372,11 +372,11 @@ void GCMStoreImpl::Backend::RemoveOutgoingMessages(
mcs_proto::DataMessageStanza data_message;
// Skip the initial tag byte and parse the rest to extract the message.
if (data_message.ParseFromString(outgoing_message.substr(1))) {
- DCHECK(!data_message.from().empty());
- if (removed_message_counts.count(data_message.from()) != 0)
- removed_message_counts[data_message.from()]++;
+ DCHECK(!data_message.category().empty());
+ if (removed_message_counts.count(data_message.category()) != 0)
+ removed_message_counts[data_message.category()]++;
else
- removed_message_counts[data_message.from()] = 1;
+ removed_message_counts[data_message.category()] = 1;
}
DVLOG(1) << "Removing outgoing message with id " << *iter;
s = db_->Delete(write_options, MakeSlice(key));
@@ -688,7 +688,7 @@ bool GCMStoreImpl::AddOutgoingMessage(const std::string& persistent_id,
const UpdateCallback& callback) {
DCHECK_EQ(message.tag(), kDataMessageStanzaTag);
std::string app_id = reinterpret_cast<const mcs_proto::DataMessageStanza*>(
- &message.GetProtobuf())->from();
+ &message.GetProtobuf())->category();
DCHECK(!app_id.empty());
if (app_message_counts_.count(app_id) == 0)
app_message_counts_[app_id] = 0;
@@ -710,6 +710,25 @@ bool GCMStoreImpl::AddOutgoingMessage(const std::string& persistent_id,
return false;
}
+void GCMStoreImpl::OverwriteOutgoingMessage(const std::string& persistent_id,
+ const MCSMessage& message,
+ const UpdateCallback& callback) {
+ DCHECK_EQ(message.tag(), kDataMessageStanzaTag);
+ std::string app_id = reinterpret_cast<const mcs_proto::DataMessageStanza*>(
+ &message.GetProtobuf())->category();
+ DCHECK(!app_id.empty());
+ // There should already be pending messages for this app.
+ DCHECK(app_message_counts_.count(app_id));
+ // TODO(zea): consider verifying the specific message already exists.
+ blocking_task_runner_->PostTask(
+ FROM_HERE,
+ base::Bind(&GCMStoreImpl::Backend::AddOutgoingMessage,
+ backend_,
+ persistent_id,
+ message,
+ callback));
+}
+
void GCMStoreImpl::RemoveOutgoingMessage(const std::string& persistent_id,
const UpdateCallback& callback) {
blocking_task_runner_->PostTask(
@@ -779,12 +798,12 @@ void GCMStoreImpl::LoadContinuation(const LoadCallback& callback,
iter != result->outgoing_messages.end(); ++iter) {
const mcs_proto::DataMessageStanza* data_message =
reinterpret_cast<mcs_proto::DataMessageStanza*>(iter->second.get());
- DCHECK(!data_message->from().empty());
- if (app_message_counts_.count(data_message->from()) == 0)
- app_message_counts_[data_message->from()] = 1;
+ DCHECK(!data_message->category().empty());
+ if (app_message_counts_.count(data_message->category()) == 0)
+ app_message_counts_[data_message->category()] = 1;
else
- app_message_counts_[data_message->from()]++;
- if (app_message_counts_[data_message->from()] == kMessagesPerAppLimit)
+ app_message_counts_[data_message->category()]++;
+ if (app_message_counts_[data_message->category()] == kMessagesPerAppLimit)
num_throttled_apps++;
}
UMA_HISTOGRAM_COUNTS("GCM.NumThrottledApps", num_throttled_apps);
diff --git a/google_apis/gcm/engine/gcm_store_impl.h b/google_apis/gcm/engine/gcm_store_impl.h
index 621bfe1..b8550da 100644
--- a/google_apis/gcm/engine/gcm_store_impl.h
+++ b/google_apis/gcm/engine/gcm_store_impl.h
@@ -55,6 +55,10 @@ class GCM_EXPORT GCMStoreImpl : public GCMStore {
virtual bool AddOutgoingMessage(const std::string& persistent_id,
const MCSMessage& message,
const UpdateCallback& callback) OVERRIDE;
+ virtual void OverwriteOutgoingMessage(const std::string& persistent_id,
+ const MCSMessage& message,
+ const UpdateCallback& callback)
+ OVERRIDE;
virtual void RemoveOutgoingMessage(const std::string& persistent_id,
const UpdateCallback& callback) OVERRIDE;
virtual void RemoveOutgoingMessages(const PersistentIdList& persistent_ids,
diff --git a/google_apis/gcm/engine/mcs_client.cc b/google_apis/gcm/engine/mcs_client.cc
index 0a0bfc9..2f4edf1 100644
--- a/google_apis/gcm/engine/mcs_client.cc
+++ b/google_apis/gcm/engine/mcs_client.cc
@@ -63,6 +63,47 @@ bool BuildPersistentIdListFromProto(const google::protobuf::string& bytes,
} // namespace
+class CollapseKey {
+ public:
+ explicit CollapseKey(const mcs_proto::DataMessageStanza& message);
+ ~CollapseKey();
+
+ // Comparison operator for use in maps.
+ bool operator<(const CollapseKey& right) const;
+
+ // Whether the message had a valid collapse key.
+ bool IsValid() const;
+
+ std::string token() const { return token_; }
+ std::string app_id() const { return app_id_; }
+ int64 device_user_id() const { return device_user_id_; }
+
+ private:
+ const std::string token_;
+ const std::string app_id_;
+ const int64 device_user_id_;
+};
+
+CollapseKey::CollapseKey(const mcs_proto::DataMessageStanza& message)
+ : token_(message.token()),
+ app_id_(message.category()),
+ device_user_id_(message.device_user_id()) {}
+
+CollapseKey::~CollapseKey() {}
+
+bool CollapseKey::IsValid() const {
+ // Device user id is optional, but the application id and token are not.
+ return !token_.empty() && !app_id_.empty();
+}
+
+bool CollapseKey::operator<(const CollapseKey& right) const {
+ if (device_user_id_ != right.device_user_id())
+ return device_user_id_ < right.device_user_id();
+ if (app_id_ != right.app_id())
+ return app_id_ < right.app_id();
+ return token_ < right.token();
+}
+
struct ReliablePacketInfo {
ReliablePacketInfo();
~ReliablePacketInfo();
@@ -189,6 +230,15 @@ void MCSClient::Initialize(
packet_info->tag = GetMCSProtoTag(*iter->second);
packet_info->persistent_id = base::Uint64ToString(iter->first);
to_send_.push_back(make_linked_ptr(packet_info));
+
+ if (packet_info->tag == kDataMessageStanzaTag) {
+ mcs_proto::DataMessageStanza* data_message =
+ reinterpret_cast<mcs_proto::DataMessageStanza*>(
+ packet_info->protobuf.get());
+ CollapseKey collapse_key(*data_message);
+ if (collapse_key.IsValid())
+ collapse_key_map_[collapse_key] = packet_info;
+ }
}
}
@@ -227,21 +277,47 @@ void MCSClient::SendMessage(const MCSMessage& message) {
packet_info->protobuf = message.CloneProtobuf();
if (ttl > 0) {
- PersistentId persistent_id = GetNextPersistentId();
- DVLOG(1) << "Setting persistent id to " << persistent_id;
- packet_info->persistent_id = persistent_id;
- SetPersistentId(persistent_id,
- packet_info->protobuf.get());
- if (!gcm_store_->AddOutgoingMessage(
- persistent_id,
- MCSMessage(message.tag(),
- *(packet_info->protobuf)),
- base::Bind(&MCSClient::OnGCMUpdateFinished,
- weak_ptr_factory_.GetWeakPtr()))) {
- NotifyMessageSendStatus(message.GetProtobuf(),
- APP_QUEUE_SIZE_LIMIT_REACHED);
+ DCHECK_EQ(message.tag(), kDataMessageStanzaTag);
+
+ // First check if this message should replace a pending message with the
+ // same collapse key.
+ mcs_proto::DataMessageStanza* data_message =
+ reinterpret_cast<mcs_proto::DataMessageStanza*>(
+ packet_info->protobuf.get());
+ CollapseKey collapse_key(*data_message);
+ if (collapse_key.IsValid() && collapse_key_map_.count(collapse_key) > 0) {
+ ReliablePacketInfo* original_packet = collapse_key_map_[collapse_key];
+ DVLOG(1) << "Found matching collapse key, Reusing persistent id of "
+ << original_packet->persistent_id;
+ original_packet->protobuf = packet_info->protobuf.Pass();
+ SetPersistentId(original_packet->persistent_id,
+ original_packet->protobuf.get());
+ gcm_store_->OverwriteOutgoingMessage(
+ original_packet->persistent_id,
+ message,
+ base::Bind(&MCSClient::OnGCMUpdateFinished,
+ weak_ptr_factory_.GetWeakPtr()));
+
+ // The message is already queued, return.
return;
+ } else {
+ PersistentId persistent_id = GetNextPersistentId();
+ DVLOG(1) << "Setting persistent id to " << persistent_id;
+ packet_info->persistent_id = persistent_id;
+ SetPersistentId(persistent_id, packet_info->protobuf.get());
+ if (!gcm_store_->AddOutgoingMessage(
+ persistent_id,
+ MCSMessage(message.tag(), *(packet_info->protobuf)),
+ base::Bind(&MCSClient::OnGCMUpdateFinished,
+ weak_ptr_factory_.GetWeakPtr()))) {
+ NotifyMessageSendStatus(message.GetProtobuf(),
+ APP_QUEUE_SIZE_LIMIT_REACHED);
+ return;
+ }
}
+
+ if (collapse_key.IsValid())
+ collapse_key_map_[collapse_key] = packet_info.get();
} else if (!connection_factory_->IsEndpointReachable()) {
DVLOG(1) << "No active connection, dropping message.";
NotifyMessageSendStatus(message.GetProtobuf(), NO_CONNECTION_ON_ZERO_TTL);
@@ -314,8 +390,7 @@ void MCSClient::ResetStateAndBuildLoginRequest(
std::deque<MCSPacketInternal> new_to_send;
std::vector<PersistentId> expired_ttl_ids;
while (!to_send_.empty()) {
- MCSPacketInternal packet = to_send_.front();
- to_send_.pop_front();
+ MCSPacketInternal packet = PopMessageForSend();
if (GetTTL(*packet->protobuf) > 0 &&
!HasTTLExpired(*packet->protobuf, clock_)) {
new_to_send.push_back(packet);
@@ -367,8 +442,7 @@ void MCSClient::MaybeSendMessage() {
if (!connection_factory_->IsEndpointReachable())
return;
- MCSPacketInternal packet = to_send_.front();
- to_send_.pop_front();
+ MCSPacketInternal packet = PopMessageForSend();
if (HasTTLExpired(*packet->protobuf, clock_)) {
DCHECK(!packet->persistent_id.empty());
DVLOG(1) << "Dropping expired message " << packet->persistent_id << ".";
@@ -678,7 +752,7 @@ void MCSClient::HandleSelectiveAck(const PersistentIdList& id_list) {
// queue (typically when a StreamAck confirms messages as part of a login
// response).
for (; iter != id_list.end() && !to_send_.empty(); ++iter) {
- const MCSPacketInternal& outgoing_packet = to_send_.front();
+ const MCSPacketInternal& outgoing_packet = PopMessageForSend();
DCHECK_EQ(outgoing_packet->persistent_id, *iter);
NotifyMessageSendStatus(*outgoing_packet->protobuf, SENT);
@@ -686,8 +760,6 @@ void MCSClient::HandleSelectiveAck(const PersistentIdList& id_list) {
// acknowledged.
StreamId device_stream_id = outgoing_packet->stream_id;
HandleServerConfirmedReceipt(device_stream_id);
-
- to_send_.pop_front();
}
DCHECK(iter == id_list.end());
@@ -755,4 +827,19 @@ void MCSClient::SetGCMStoreForTesting(GCMStore* gcm_store) {
gcm_store_ = gcm_store;
}
+MCSClient::MCSPacketInternal MCSClient::PopMessageForSend() {
+ MCSPacketInternal packet = to_send_.front();
+ to_send_.pop_front();
+
+ if (packet->tag == kDataMessageStanzaTag) {
+ mcs_proto::DataMessageStanza* data_message =
+ reinterpret_cast<mcs_proto::DataMessageStanza*>(packet->protobuf.get());
+ CollapseKey collapse_key(*data_message);
+ if (collapse_key.IsValid())
+ collapse_key_map_.erase(collapse_key);
+ }
+
+ return packet;
+}
+
} // namespace gcm
diff --git a/google_apis/gcm/engine/mcs_client.h b/google_apis/gcm/engine/mcs_client.h
index d7bd71b..1834b95 100644
--- a/google_apis/gcm/engine/mcs_client.h
+++ b/google_apis/gcm/engine/mcs_client.h
@@ -35,6 +35,7 @@ class LoginRequest;
namespace gcm {
+class CollapseKey;
class ConnectionFactory;
struct ReliablePacketInfo;
@@ -192,6 +193,10 @@ class GCM_EXPORT MCSClient {
void NotifyMessageSendStatus(const google::protobuf::MessageLite& protobuf,
MessageSendStatus status);
+ // Pops the next message from the front of the send queue (cleaning up
+ // any associated state).
+ MCSPacketInternal PopMessageForSend();
+
// Clock for enforcing TTL. Passed in for testing.
base::Clock* const clock_;
@@ -222,6 +227,9 @@ class GCM_EXPORT MCSClient {
std::deque<MCSPacketInternal> to_send_;
std::deque<MCSPacketInternal> to_resend_;
+ // Map of collapse keys to their pending messages.
+ std::map<CollapseKey, ReliablePacketInfo*> collapse_key_map_;
+
// Last device_to_server stream id acknowledged by the server.
StreamId last_device_to_server_stream_id_received_;
// Last server_to_device stream id acknowledged by this device.
diff --git a/google_apis/gcm/engine/mcs_client_unittest.cc b/google_apis/gcm/engine/mcs_client_unittest.cc
index cc2e0f9..085f136 100644
--- a/google_apis/gcm/engine/mcs_client_unittest.cc
+++ b/google_apis/gcm/engine/mcs_client_unittest.cc
@@ -43,7 +43,9 @@ MCSMessage BuildDataMessage(const std::string& from,
const std::string& persistent_id,
int ttl,
uint64 sent,
- int queued) {
+ int queued,
+ const std::string& token,
+ const uint64& user_id) {
mcs_proto::DataMessageStanza data_message;
data_message.set_id(message_id);
data_message.set_from(from);
@@ -54,6 +56,8 @@ MCSMessage BuildDataMessage(const std::string& from,
data_message.set_ttl(ttl);
data_message.set_sent(sent);
data_message.set_queued(queued);
+ data_message.set_token(token);
+ data_message.set_device_user_id(user_id);
return MCSMessage(kDataMessageStanzaTag, data_message);
}
@@ -286,7 +290,8 @@ TEST_F(MCSClientTest, SendMessageNoRMQ) {
BuildMCSClient();
InitializeClient();
LoginClient(std::vector<std::string>());
- MCSMessage message(BuildDataMessage("from", "category", "X", 1, "", 0, 1, 0));
+ MCSMessage message(
+ BuildDataMessage("from", "category", "X", 1, "", 0, 1, 0, "", 0));
GetFakeHandler()->ExpectOutgoingMessage(message);
mcs_client()->SendMessage(message);
EXPECT_TRUE(GetFakeHandler()->AllOutgoingMessagesReceived());
@@ -299,7 +304,8 @@ TEST_F(MCSClientTest, SendMessageNoRMQWhileDisconnected) {
InitializeClient();
EXPECT_TRUE(sent_message_id().empty());
- MCSMessage message(BuildDataMessage("from", "category", "X", 1, "", 0, 1, 0));
+ MCSMessage message(
+ BuildDataMessage("from", "category", "X", 1, "", 0, 1, 0, "", 0));
mcs_client()->SendMessage(message);
// Message sent callback should be invoked, but no message should actually
@@ -314,8 +320,8 @@ TEST_F(MCSClientTest, SendMessageRMQ) {
BuildMCSClient();
InitializeClient();
LoginClient(std::vector<std::string>());
- MCSMessage message(
- BuildDataMessage("from", "category", "X", 1, "1", kTTLValue, 1, 0));
+ MCSMessage message(BuildDataMessage(
+ "from", "category", "X", 1, "1", kTTLValue, 1, 0, "", 0));
GetFakeHandler()->ExpectOutgoingMessage(message);
mcs_client()->SendMessage(message);
EXPECT_TRUE(GetFakeHandler()->AllOutgoingMessagesReceived());
@@ -328,8 +334,8 @@ TEST_F(MCSClientTest, SendMessageRMQWhileDisconnected) {
InitializeClient();
LoginClient(std::vector<std::string>());
GetFakeHandler()->set_fail_send(true);
- MCSMessage message(
- BuildDataMessage("from", "category", "X", 1, "1", kTTLValue, 1, 0));
+ MCSMessage message(BuildDataMessage(
+ "from", "category", "X", 1, "1", kTTLValue, 1, 0, "", 0));
// The initial (failed) send.
GetFakeHandler()->ExpectOutgoingMessage(message);
@@ -340,14 +346,8 @@ TEST_F(MCSClientTest, SendMessageRMQWhileDisconnected) {
BuildLoginRequest(kAndroidId, kSecurityToken).
PassAs<const google::protobuf::MessageLite>()));
// The second (re)send.
- MCSMessage message2(BuildDataMessage("from",
- "category",
- "X",
- 1,
- "1",
- kTTLValue,
- 1,
- kTTLValue - 1));
+ MCSMessage message2(BuildDataMessage(
+ "from", "category", "X", 1, "1", kTTLValue, 1, kTTLValue - 1, "", 0));
GetFakeHandler()->ExpectOutgoingMessage(message2);
mcs_client()->SendMessage(message);
PumpLoop(); // Wait for the queuing to happen.
@@ -368,8 +368,8 @@ TEST_F(MCSClientTest, SendMessageRMQOnRestart) {
InitializeClient();
LoginClient(std::vector<std::string>());
GetFakeHandler()->set_fail_send(true);
- MCSMessage message(
- BuildDataMessage("from", "category", "X", 1, "1", kTTLValue, 1, 0));
+ MCSMessage message(BuildDataMessage(
+ "from", "category", "X", 1, "1", kTTLValue, 1, 0, "", 0));
// The initial (failed) send.
GetFakeHandler()->ExpectOutgoingMessage(message);
@@ -384,14 +384,8 @@ TEST_F(MCSClientTest, SendMessageRMQOnRestart) {
InitializeClient();
clock()->Advance(base::TimeDelta::FromSeconds(kTTLValue - 1));
- MCSMessage message2(BuildDataMessage("from",
- "category",
- "X",
- 1,
- "1",
- kTTLValue,
- 1,
- kTTLValue - 1));
+ MCSMessage message2(BuildDataMessage(
+ "from", "category", "X", 1, "1", kTTLValue, 1, kTTLValue - 1, "", 0));
LoginClient(std::vector<std::string>());
GetFakeHandler()->ExpectOutgoingMessage(message2);
PumpLoop();
@@ -407,15 +401,16 @@ TEST_F(MCSClientTest, SendMessageRMQWithStreamAck) {
// Send some messages.
for (int i = 1; i <= kMessageBatchSize; ++i) {
- MCSMessage message(
- BuildDataMessage("from",
- "category",
- "X",
- 1,
- base::IntToString(i),
- kTTLValue,
- 1,
- 0));
+ MCSMessage message(BuildDataMessage("from",
+ "category",
+ "X",
+ 1,
+ base::IntToString(i),
+ kTTLValue,
+ 1,
+ 0,
+ "",
+ 0));
GetFakeHandler()->ExpectOutgoingMessage(message);
mcs_client()->SendMessage(message);
PumpLoop();
@@ -449,15 +444,16 @@ TEST_F(MCSClientTest, SendMessageRMQAckOnReconnect) {
std::vector<std::string> id_list;
for (int i = 1; i <= kMessageBatchSize; ++i) {
id_list.push_back(base::IntToString(i));
- MCSMessage message(
- BuildDataMessage("from",
- "category",
- id_list.back(),
- 1,
- id_list.back(),
- kTTLValue,
- 1,
- 0));
+ MCSMessage message(BuildDataMessage("from",
+ "category",
+ id_list.back(),
+ 1,
+ id_list.back(),
+ kTTLValue,
+ 1,
+ 0,
+ "",
+ 0));
GetFakeHandler()->ExpectOutgoingMessage(message);
mcs_client()->SendMessage(message);
PumpLoop();
@@ -489,15 +485,16 @@ TEST_F(MCSClientTest, SendMessageRMQPartialAckOnReconnect) {
std::vector<std::string> id_list;
for (int i = 1; i <= kMessageBatchSize; ++i) {
id_list.push_back(base::IntToString(i));
- MCSMessage message(
- BuildDataMessage("from",
- "category",
- id_list.back(),
- 1,
- id_list.back(),
- kTTLValue,
- 1,
- 0));
+ MCSMessage message(BuildDataMessage("from",
+ "category",
+ id_list.back(),
+ 1,
+ id_list.back(),
+ kTTLValue,
+ 1,
+ 0,
+ "",
+ 0));
GetFakeHandler()->ExpectOutgoingMessage(message);
mcs_client()->SendMessage(message);
PumpLoop();
@@ -519,14 +516,16 @@ TEST_F(MCSClientTest, SendMessageRMQPartialAckOnReconnect) {
id_list.begin() + kMessageBatchSize / 2,
id_list.end());
for (int i = 1; i <= kMessageBatchSize / 2; ++i) {
- MCSMessage message(
- BuildDataMessage("from",
- "category",
- remaining_ids[i - 1],
- 2,
- remaining_ids[i - 1],
- kTTLValue,
- 1, 0));
+ MCSMessage message(BuildDataMessage("from",
+ "category",
+ remaining_ids[i - 1],
+ 2,
+ remaining_ids[i - 1],
+ kTTLValue,
+ 1,
+ 0,
+ "",
+ 0));
GetFakeHandler()->ExpectOutgoingMessage(message);
}
scoped_ptr<mcs_proto::IqStanza> ack(BuildSelectiveAck(acked_ids));
@@ -549,15 +548,8 @@ TEST_F(MCSClientTest, AckOnLogin) {
std::vector<std::string> id_list;
for (int i = 1; i <= kMessageBatchSize; ++i) {
id_list.push_back(base::IntToString(i));
- MCSMessage message(
- BuildDataMessage("from",
- "category",
- "X",
- 1,
- id_list.back(),
- kTTLValue,
- 1,
- 0));
+ MCSMessage message(BuildDataMessage(
+ "from", "category", "X", 1, id_list.back(), kTTLValue, 1, 0, "", 0));
GetFakeHandler()->ReceiveMessage(message);
WaitForMCSEvent();
PumpLoop();
@@ -581,28 +573,31 @@ TEST_F(MCSClientTest, AckOnSend) {
std::vector<std::string> id_list;
for (int i = 1; i <= kMessageBatchSize; ++i) {
id_list.push_back(base::IntToString(i));
- MCSMessage message(
- BuildDataMessage("from",
- "category",
- id_list.back(),
- 1,
- id_list.back(),
- kTTLValue,
- 1,
- 0));
+ MCSMessage message(BuildDataMessage("from",
+ "category",
+ id_list.back(),
+ 1,
+ id_list.back(),
+ kTTLValue,
+ 1,
+ 0,
+ "",
+ 0));
GetFakeHandler()->ReceiveMessage(message);
PumpLoop();
}
// Trigger a message send, which should acknowledge via stream ack.
- MCSMessage message(
- BuildDataMessage("from",
- "category",
- "X",
- kMessageBatchSize + 1,
- "1",
- kTTLValue,
- 1, 0));
+ MCSMessage message(BuildDataMessage("from",
+ "category",
+ "X",
+ kMessageBatchSize + 1,
+ "1",
+ kTTLValue,
+ 1,
+ 0,
+ "",
+ 0));
GetFakeHandler()->ExpectOutgoingMessage(message);
mcs_client()->SendMessage(message);
EXPECT_TRUE(GetFakeHandler()->AllOutgoingMessagesReceived());
@@ -626,15 +621,16 @@ TEST_F(MCSClientTest, AckWhenLimitReachedWithHeartbeat) {
std::vector<std::string> id_list;
for (int i = 1; i <= kAckLimitSize; ++i) {
id_list.push_back(base::IntToString(i));
- MCSMessage message(
- BuildDataMessage("from",
- "category",
- id_list.back(),
- 1,
- id_list.back(),
- kTTLValue,
- 1,
- 0));
+ MCSMessage message(BuildDataMessage("from",
+ "category",
+ id_list.back(),
+ 1,
+ id_list.back(),
+ kTTLValue,
+ 1,
+ 0,
+ "",
+ 0));
GetFakeHandler()->ReceiveMessage(message);
WaitForMCSEvent();
PumpLoop();
@@ -673,8 +669,8 @@ TEST_F(MCSClientTest, ExpiredTTLOnSend) {
BuildMCSClient();
InitializeClient();
LoginClient(std::vector<std::string>());
- MCSMessage message(
- BuildDataMessage("from", "category", "X", 1, "1", kTTLValue, 1, 0));
+ MCSMessage message(BuildDataMessage(
+ "from", "category", "X", 1, "1", kTTLValue, 1, 0, "", 0));
// Advance time to after the TTL.
clock()->Advance(base::TimeDelta::FromSeconds(kTTLValue + 2));
@@ -692,8 +688,8 @@ TEST_F(MCSClientTest, ExpiredTTLOnRestart) {
InitializeClient();
LoginClient(std::vector<std::string>());
GetFakeHandler()->set_fail_send(true);
- MCSMessage message(
- BuildDataMessage("from", "category", "X", 1, "1", kTTLValue, 1, 0));
+ MCSMessage message(BuildDataMessage(
+ "from", "category", "X", 1, "1", kTTLValue, 1, 0, "", 0));
// The initial (failed) send.
GetFakeHandler()->ExpectOutgoingMessage(message);
@@ -715,6 +711,78 @@ TEST_F(MCSClientTest, ExpiredTTLOnRestart) {
EXPECT_TRUE(GetFakeHandler()->AllOutgoingMessagesReceived());
}
+// Sending two messages with the same collapse key and same app id while
+// disconnected should only send the latter of the two on reconnection.
+TEST_F(MCSClientTest, CollapseKeysSameApp) {
+ BuildMCSClient();
+ InitializeClient();
+ MCSMessage message(BuildDataMessage(
+ "from", "app", "message id 1", 1, "1", kTTLValue, 1, 0, "token", 0));
+ mcs_client()->SendMessage(message);
+
+ MCSMessage message2(BuildDataMessage(
+ "from", "app", "message id 2", 1, "1", kTTLValue, 1, 0, "token", 0));
+ mcs_client()->SendMessage(message2);
+
+ LoginClient(std::vector<std::string>());
+ GetFakeHandler()->ExpectOutgoingMessage(message2);
+ PumpLoop();
+}
+
+// Sending two messages with the same collapse key and different app id while
+// disconnected should not perform any collapsing.
+TEST_F(MCSClientTest, CollapseKeysDifferentApp) {
+ BuildMCSClient();
+ InitializeClient();
+ MCSMessage message(BuildDataMessage(
+ "from", "app", "message id 1", 1, "1", kTTLValue, 1, 0, "token", 0));
+ mcs_client()->SendMessage(message);
+
+ MCSMessage message2(BuildDataMessage("from",
+ "app 2",
+ "message id 2",
+ 1,
+ "2",
+ kTTLValue,
+ 1,
+ 0,
+ "token",
+ 0));
+ mcs_client()->SendMessage(message2);
+
+ LoginClient(std::vector<std::string>());
+ GetFakeHandler()->ExpectOutgoingMessage(message);
+ GetFakeHandler()->ExpectOutgoingMessage(message2);
+ PumpLoop();
+}
+
+// Sending two messages with the same collapse key and app id, but different
+// user, while disconnected, should not perform any collapsing.
+TEST_F(MCSClientTest, CollapseKeysDifferentUser) {
+ BuildMCSClient();
+ InitializeClient();
+ MCSMessage message(BuildDataMessage(
+ "from", "app", "message id 1", 1, "1", kTTLValue, 1, 0, "token", 0));
+ mcs_client()->SendMessage(message);
+
+ MCSMessage message2(BuildDataMessage("from",
+ "app",
+ "message id 2",
+ 1,
+ "2",
+ kTTLValue,
+ 1,
+ 0,
+ "token",
+ 1));
+ mcs_client()->SendMessage(message2);
+
+ LoginClient(std::vector<std::string>());
+ GetFakeHandler()->ExpectOutgoingMessage(message);
+ GetFakeHandler()->ExpectOutgoingMessage(message2);
+ PumpLoop();
+}
+
} // namespace
} // namespace gcm