summaryrefslogtreecommitdiffstats
path: root/google_apis
diff options
context:
space:
mode:
authorzea@chromium.org <zea@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98>2014-02-25 19:43:29 +0000
committerzea@chromium.org <zea@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98>2014-02-25 19:43:29 +0000
commit79ba4f72472d2408038390311bd9d0deda63e610 (patch)
tree7a41329ca9dee1625f0f7d1302492db092efcbc5 /google_apis
parent52222eeb9d3a567b7bd19be2814ec7d8668b9f71 (diff)
downloadchromium_src-79ba4f72472d2408038390311bd9d0deda63e610.zip
chromium_src-79ba4f72472d2408038390311bd9d0deda63e610.tar.gz
chromium_src-79ba4f72472d2408038390311bd9d0deda63e610.tar.bz2
[GCM] Make RMQ ack logic more resilient
Redundant ids (those already acknowledged) are now supported during a stream ack. Additionally, id order is no longer assumed. BUG=343974 R=fgorski@chromium.org Review URL: https://codereview.chromium.org/173713009 git-svn-id: svn://svn.chromium.org/chrome/trunk/src@253228 0039d316-1c4b-4281-b951-d872f2087c98
Diffstat (limited to 'google_apis')
-rw-r--r--google_apis/gcm/engine/mcs_client.cc58
1 files changed, 42 insertions, 16 deletions
diff --git a/google_apis/gcm/engine/mcs_client.cc b/google_apis/gcm/engine/mcs_client.cc
index adae939..75fed0d 100644
--- a/google_apis/gcm/engine/mcs_client.cc
+++ b/google_apis/gcm/engine/mcs_client.cc
@@ -4,6 +4,8 @@
#include "google_apis/gcm/engine/mcs_client.h"
+#include <set>
+
#include "base/basictypes.h"
#include "base/message_loop/message_loop.h"
#include "base/metrics/histogram.h"
@@ -739,43 +741,67 @@ void MCSClient::HandleStreamAck(StreamId last_stream_id_received) {
}
void MCSClient::HandleSelectiveAck(const PersistentIdList& id_list) {
- // First check the to_resend_ queue. Acknowledgments should always happen
- // in the order they were sent, so if messages are present they should match
- // the acknowledge list.
- PersistentIdList::const_iterator iter = id_list.begin();
- for (; iter != id_list.end() && !to_resend_.empty(); ++iter) {
+ std::set<PersistentId> remaining_ids(id_list.begin(), id_list.end());
+
+ StreamId last_stream_id_received = -1;
+
+ // First check the to_resend_ queue. Acknowledgments are always contiguous,
+ // so if there's a pending message that hasn't been acked, all newer messages
+ // must also be unacked.
+ while(!to_resend_.empty() && !remaining_ids.empty()) {
const MCSPacketInternal& outgoing_packet = to_resend_.front();
- DCHECK_EQ(outgoing_packet->persistent_id, *iter);
+ if (remaining_ids.count(outgoing_packet->persistent_id) == 0)
+ break; // Newer message must be unacked too.
+ remaining_ids.erase(outgoing_packet->persistent_id);
NotifyMessageSendStatus(*outgoing_packet->protobuf, SENT);
// No need to re-acknowledge any server messages this message already
// acknowledged.
StreamId device_stream_id = outgoing_packet->stream_id;
- HandleServerConfirmedReceipt(device_stream_id);
-
+ if (device_stream_id > last_stream_id_received)
+ last_stream_id_received = device_stream_id;
to_resend_.pop_front();
}
// If the acknowledged ids aren't all there, they might be in the to_send_
- // queue (typically when a StreamAck confirms messages as part of a login
+ // queue (typically when a SelectiveAck confirms messages as part of a login
// response).
- for (; iter != id_list.end() && !to_send_.empty(); ++iter) {
- const MCSPacketInternal& outgoing_packet = PopMessageForSend();
- DCHECK_EQ(outgoing_packet->persistent_id, *iter);
+ while (!to_send_.empty() && !remaining_ids.empty()) {
+ const MCSPacketInternal& outgoing_packet = to_send_.front();
+ if (remaining_ids.count(outgoing_packet->persistent_id) == 0)
+ break; // Newer messages must be unacked too.
+ remaining_ids.erase(outgoing_packet->persistent_id);
NotifyMessageSendStatus(*outgoing_packet->protobuf, SENT);
// No need to re-acknowledge any server messages this message already
// acknowledged.
StreamId device_stream_id = outgoing_packet->stream_id;
- HandleServerConfirmedReceipt(device_stream_id);
+ if (device_stream_id > last_stream_id_received)
+ last_stream_id_received = device_stream_id;
+ PopMessageForSend();
}
- DCHECK(iter == id_list.end());
+ // Only handle the largest stream id value. All other stream ids are
+ // implicitly handled.
+ if (last_stream_id_received > 0)
+ HandleServerConfirmedReceipt(last_stream_id_received);
+
+ // At this point, all remaining acked ids are redundant.
+ PersistentIdList acked_ids;
+ if (remaining_ids.size() > 0) {
+ for (size_t i = 0; i < id_list.size(); ++i) {
+ if (remaining_ids.count(id_list[i]) > 0)
+ continue;
+ acked_ids.push_back(id_list[i]);
+ }
+ } else {
+ acked_ids = id_list;
+ }
- DVLOG(1) << "Server acked " << id_list.size()
+ DVLOG(1) << "Server acked " << acked_ids.size()
<< " messages, " << to_resend_.size() << " remaining unacked.";
gcm_store_->RemoveOutgoingMessages(
- id_list,
+ acked_ids,
base::Bind(&MCSClient::OnGCMUpdateFinished,
weak_ptr_factory_.GetWeakPtr()));