diff options
author | zea@chromium.org <zea@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98> | 2014-02-25 19:43:29 +0000 |
---|---|---|
committer | zea@chromium.org <zea@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98> | 2014-02-25 19:43:29 +0000 |
commit | 79ba4f72472d2408038390311bd9d0deda63e610 (patch) | |
tree | 7a41329ca9dee1625f0f7d1302492db092efcbc5 /google_apis | |
parent | 52222eeb9d3a567b7bd19be2814ec7d8668b9f71 (diff) | |
download | chromium_src-79ba4f72472d2408038390311bd9d0deda63e610.zip chromium_src-79ba4f72472d2408038390311bd9d0deda63e610.tar.gz chromium_src-79ba4f72472d2408038390311bd9d0deda63e610.tar.bz2 |
[GCM] Make RMQ ack logic more resilient
Redundant ids (those already acknowledged) are now supported during a stream
ack. Additionally, id order is no longer assumed.
BUG=343974
R=fgorski@chromium.org
Review URL: https://codereview.chromium.org/173713009
git-svn-id: svn://svn.chromium.org/chrome/trunk/src@253228 0039d316-1c4b-4281-b951-d872f2087c98
Diffstat (limited to 'google_apis')
-rw-r--r-- | google_apis/gcm/engine/mcs_client.cc | 58 |
1 files changed, 42 insertions, 16 deletions
diff --git a/google_apis/gcm/engine/mcs_client.cc b/google_apis/gcm/engine/mcs_client.cc index adae939..75fed0d 100644 --- a/google_apis/gcm/engine/mcs_client.cc +++ b/google_apis/gcm/engine/mcs_client.cc @@ -4,6 +4,8 @@ #include "google_apis/gcm/engine/mcs_client.h" +#include <set> + #include "base/basictypes.h" #include "base/message_loop/message_loop.h" #include "base/metrics/histogram.h" @@ -739,43 +741,67 @@ void MCSClient::HandleStreamAck(StreamId last_stream_id_received) { } void MCSClient::HandleSelectiveAck(const PersistentIdList& id_list) { - // First check the to_resend_ queue. Acknowledgments should always happen - // in the order they were sent, so if messages are present they should match - // the acknowledge list. - PersistentIdList::const_iterator iter = id_list.begin(); - for (; iter != id_list.end() && !to_resend_.empty(); ++iter) { + std::set<PersistentId> remaining_ids(id_list.begin(), id_list.end()); + + StreamId last_stream_id_received = -1; + + // First check the to_resend_ queue. Acknowledgments are always contiguous, + // so if there's a pending message that hasn't been acked, all newer messages + // must also be unacked. + while(!to_resend_.empty() && !remaining_ids.empty()) { const MCSPacketInternal& outgoing_packet = to_resend_.front(); - DCHECK_EQ(outgoing_packet->persistent_id, *iter); + if (remaining_ids.count(outgoing_packet->persistent_id) == 0) + break; // Newer message must be unacked too. + remaining_ids.erase(outgoing_packet->persistent_id); NotifyMessageSendStatus(*outgoing_packet->protobuf, SENT); // No need to re-acknowledge any server messages this message already // acknowledged. StreamId device_stream_id = outgoing_packet->stream_id; - HandleServerConfirmedReceipt(device_stream_id); - + if (device_stream_id > last_stream_id_received) + last_stream_id_received = device_stream_id; to_resend_.pop_front(); } // If the acknowledged ids aren't all there, they might be in the to_send_ - // queue (typically when a StreamAck confirms messages as part of a login + // queue (typically when a SelectiveAck confirms messages as part of a login // response). - for (; iter != id_list.end() && !to_send_.empty(); ++iter) { - const MCSPacketInternal& outgoing_packet = PopMessageForSend(); - DCHECK_EQ(outgoing_packet->persistent_id, *iter); + while (!to_send_.empty() && !remaining_ids.empty()) { + const MCSPacketInternal& outgoing_packet = to_send_.front(); + if (remaining_ids.count(outgoing_packet->persistent_id) == 0) + break; // Newer messages must be unacked too. + remaining_ids.erase(outgoing_packet->persistent_id); NotifyMessageSendStatus(*outgoing_packet->protobuf, SENT); // No need to re-acknowledge any server messages this message already // acknowledged. StreamId device_stream_id = outgoing_packet->stream_id; - HandleServerConfirmedReceipt(device_stream_id); + if (device_stream_id > last_stream_id_received) + last_stream_id_received = device_stream_id; + PopMessageForSend(); } - DCHECK(iter == id_list.end()); + // Only handle the largest stream id value. All other stream ids are + // implicitly handled. + if (last_stream_id_received > 0) + HandleServerConfirmedReceipt(last_stream_id_received); + + // At this point, all remaining acked ids are redundant. + PersistentIdList acked_ids; + if (remaining_ids.size() > 0) { + for (size_t i = 0; i < id_list.size(); ++i) { + if (remaining_ids.count(id_list[i]) > 0) + continue; + acked_ids.push_back(id_list[i]); + } + } else { + acked_ids = id_list; + } - DVLOG(1) << "Server acked " << id_list.size() + DVLOG(1) << "Server acked " << acked_ids.size() << " messages, " << to_resend_.size() << " remaining unacked."; gcm_store_->RemoveOutgoingMessages( - id_list, + acked_ids, base::Bind(&MCSClient::OnGCMUpdateFinished, weak_ptr_factory_.GetWeakPtr())); |