summaryrefslogtreecommitdiffstats
path: root/google_apis
diff options
context:
space:
mode:
authorzea@chromium.org <zea@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98>2014-01-03 00:53:34 +0000
committerzea@chromium.org <zea@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98>2014-01-03 00:53:34 +0000
commit079631dd15199a1463505b7c51570c27ba0e13c8 (patch)
treeffca7f0f37754f3ed403ff3daff04ff5fef9031f /google_apis
parentd3c6e6956c72c19d7d06021c87304728db1615be (diff)
downloadchromium_src-079631dd15199a1463505b7c51570c27ba0e13c8.zip
chromium_src-079631dd15199a1463505b7c51570c27ba0e13c8.tar.gz
chromium_src-079631dd15199a1463505b7c51570c27ba0e13c8.tar.bz2
[GCM] Add heartbeat manager and reconnection logic due to heartbeat failure
The heartbeat manager maintains the heartbeat timer, handles heartbeat interval updates from the server, and automatically triggers connection resets if the heartbeat isn't properly acknowledged in a timely manner. Also fixes an issue in reconnection where the client socket handle doesn't deal well with a connection reset due to passing ownership of the socket. BUG=284553 Review URL: https://codereview.chromium.org/118133003 git-svn-id: svn://svn.chromium.org/chrome/trunk/src@242848 0039d316-1c4b-4281-b951-d872f2087c98
Diffstat (limited to 'google_apis')
-rw-r--r--google_apis/gcm/engine/connection_factory_impl.cc10
-rw-r--r--google_apis/gcm/engine/connection_handler.h2
-rw-r--r--google_apis/gcm/engine/connection_handler_impl.cc10
-rw-r--r--google_apis/gcm/engine/connection_handler_impl.h4
-rw-r--r--google_apis/gcm/engine/connection_handler_impl_unittest.cc2
-rw-r--r--google_apis/gcm/engine/fake_connection_factory.cc2
-rw-r--r--google_apis/gcm/engine/fake_connection_handler.cc2
-rw-r--r--google_apis/gcm/engine/fake_connection_handler.h2
-rw-r--r--google_apis/gcm/engine/heartbeat_manager.cc119
-rw-r--r--google_apis/gcm/engine/heartbeat_manager.h82
-rw-r--r--google_apis/gcm/engine/heartbeat_manager_unittest.cc176
-rw-r--r--google_apis/gcm/engine/mcs_client.cc41
-rw-r--r--google_apis/gcm/engine/mcs_client.h12
-rw-r--r--google_apis/gcm/gcm.gyp3
14 files changed, 427 insertions, 40 deletions
diff --git a/google_apis/gcm/engine/connection_factory_impl.cc b/google_apis/gcm/engine/connection_factory_impl.cc
index d577a68..31c225b 100644
--- a/google_apis/gcm/engine/connection_factory_impl.cc
+++ b/google_apis/gcm/engine/connection_factory_impl.cc
@@ -117,7 +117,9 @@ void ConnectionFactoryImpl::Connect() {
}
bool ConnectionFactoryImpl::IsEndpointReachable() const {
- return connection_handler_ && connection_handler_->CanSendMessage();
+ return connection_handler_ &&
+ connection_handler_->CanSendMessage() &&
+ !connecting_;
}
void ConnectionFactoryImpl::SignalConnectionReset() {
@@ -164,6 +166,10 @@ void ConnectionFactoryImpl::OnIPAddressChanged() {
void ConnectionFactoryImpl::ConnectImpl() {
DCHECK(!IsEndpointReachable());
+ if (socket_handle_.socket() && socket_handle_.socket()->IsConnected())
+ socket_handle_.socket()->Disconnect();
+ socket_handle_.Reset();
+
// TODO(zea): resolve proxies.
net::ProxyInfo proxy_info;
proxy_info.UseDirect();
@@ -193,7 +199,7 @@ void ConnectionFactoryImpl::InitHandler() {
DCHECK(login_request.IsInitialized());
}
- connection_handler_->Init(login_request, socket_handle_.PassSocket());
+ connection_handler_->Init(login_request, socket_handle_.socket());
}
scoped_ptr<net::BackoffEntry> ConnectionFactoryImpl::CreateBackoffEntry(
diff --git a/google_apis/gcm/engine/connection_handler.h b/google_apis/gcm/engine/connection_handler.h
index 5b9ea71..50b880d 100644
--- a/google_apis/gcm/engine/connection_handler.h
+++ b/google_apis/gcm/engine/connection_handler.h
@@ -48,7 +48,7 @@ class GCM_EXPORT ConnectionHandler {
// Note: It is correct and expected to call Init more than once, as connection
// issues are encountered and new connections must be made.
virtual void Init(const mcs_proto::LoginRequest& login_request,
- scoped_ptr<net::StreamSocket> socket) = 0;
+ net::StreamSocket* socket) = 0;
// Checks that a handshake has been completed and a message is not already
// in flight.
diff --git a/google_apis/gcm/engine/connection_handler_impl.cc b/google_apis/gcm/engine/connection_handler_impl.cc
index 9772000..aea1683 100644
--- a/google_apis/gcm/engine/connection_handler_impl.cc
+++ b/google_apis/gcm/engine/connection_handler_impl.cc
@@ -52,7 +52,7 @@ ConnectionHandlerImpl::~ConnectionHandlerImpl() {
void ConnectionHandlerImpl::Init(
const mcs_proto::LoginRequest& login_request,
- scoped_ptr<net::StreamSocket> socket) {
+ net::StreamSocket* socket) {
DCHECK(!read_callback_.is_null());
DCHECK(!write_callback_.is_null());
DCHECK(!connection_callback_.is_null());
@@ -63,9 +63,9 @@ void ConnectionHandlerImpl::Init(
handshake_complete_ = false;
message_tag_ = 0;
message_size_ = 0;
- socket_ = socket.Pass();
- input_stream_.reset(new SocketInputStream(socket_.get()));
- output_stream_.reset(new SocketOutputStream(socket_.get()));
+ socket_ = socket;
+ input_stream_.reset(new SocketInputStream(socket_));
+ output_stream_.reset(new SocketOutputStream(socket_));
Login(login_request);
}
@@ -393,8 +393,6 @@ void ConnectionHandlerImpl::OnTimeout() {
void ConnectionHandlerImpl::CloseConnection() {
DVLOG(1) << "Closing connection.";
- read_callback_.Reset();
- write_callback_.Reset();
read_timeout_timer_.Stop();
socket_->Disconnect();
input_stream_.reset();
diff --git a/google_apis/gcm/engine/connection_handler_impl.h b/google_apis/gcm/engine/connection_handler_impl.h
index 110cdcd..c4efe66 100644
--- a/google_apis/gcm/engine/connection_handler_impl.h
+++ b/google_apis/gcm/engine/connection_handler_impl.h
@@ -34,7 +34,7 @@ class GCM_EXPORT ConnectionHandlerImpl : public ConnectionHandler {
// ConnectionHandler implementation.
virtual void Init(const mcs_proto::LoginRequest& login_request,
- scoped_ptr<net::StreamSocket> socket) OVERRIDE;
+ net::StreamSocket* socket) OVERRIDE;
virtual bool CanSendMessage() const OVERRIDE;
virtual void SendMessage(const google::protobuf::MessageLite& message)
OVERRIDE;
@@ -96,7 +96,7 @@ class GCM_EXPORT ConnectionHandlerImpl : public ConnectionHandler {
base::OneShotTimer<ConnectionHandlerImpl> read_timeout_timer_;
// This connection's socket and the input/output streams attached to it.
- scoped_ptr<net::StreamSocket> socket_;
+ net::StreamSocket* socket_;
scoped_ptr<SocketInputStream> input_stream_;
scoped_ptr<SocketOutputStream> output_stream_;
diff --git a/google_apis/gcm/engine/connection_handler_impl_unittest.cc b/google_apis/gcm/engine/connection_handler_impl_unittest.cc
index 0cdcdc6..70a3660 100644
--- a/google_apis/gcm/engine/connection_handler_impl_unittest.cc
+++ b/google_apis/gcm/engine/connection_handler_impl_unittest.cc
@@ -199,7 +199,7 @@ void GCMConnectionHandlerImplTest::Connect(
base::Unretained(this))));
EXPECT_FALSE(connection_handler()->CanSendMessage());
connection_handler_->Init(*BuildLoginRequest(kAuthId, kAuthToken),
- socket_.Pass());
+ socket_.get());
}
void GCMConnectionHandlerImplTest::ReadContinuation(
diff --git a/google_apis/gcm/engine/fake_connection_factory.cc b/google_apis/gcm/engine/fake_connection_factory.cc
index 93e66cb..084ff66 100644
--- a/google_apis/gcm/engine/fake_connection_factory.cc
+++ b/google_apis/gcm/engine/fake_connection_factory.cc
@@ -32,7 +32,7 @@ ConnectionHandler* FakeConnectionFactory::GetConnectionHandler() const {
void FakeConnectionFactory::Connect() {
mcs_proto::LoginRequest login_request;
request_builder_.Run(&login_request);
- connection_handler_->Init(login_request, scoped_ptr<net::StreamSocket>());
+ connection_handler_->Init(login_request, NULL);
}
bool FakeConnectionFactory::IsEndpointReachable() const {
diff --git a/google_apis/gcm/engine/fake_connection_handler.cc b/google_apis/gcm/engine/fake_connection_handler.cc
index 0663933..6a239ea 100644
--- a/google_apis/gcm/engine/fake_connection_handler.cc
+++ b/google_apis/gcm/engine/fake_connection_handler.cc
@@ -39,7 +39,7 @@ FakeConnectionHandler::~FakeConnectionHandler() {
}
void FakeConnectionHandler::Init(const mcs_proto::LoginRequest& login_request,
- scoped_ptr<net::StreamSocket> socket) {
+ net::StreamSocket* socket) {
EXPECT_EQ(expected_outgoing_messages_.front().SerializeAsString(),
login_request.SerializeAsString());
expected_outgoing_messages_.pop_front();
diff --git a/google_apis/gcm/engine/fake_connection_handler.h b/google_apis/gcm/engine/fake_connection_handler.h
index 5356b77..ffb71af 100644
--- a/google_apis/gcm/engine/fake_connection_handler.h
+++ b/google_apis/gcm/engine/fake_connection_handler.h
@@ -23,7 +23,7 @@ class FakeConnectionHandler : public ConnectionHandler {
// ConnectionHandler implementation.
virtual void Init(const mcs_proto::LoginRequest& login_request,
- scoped_ptr<net::StreamSocket> socket) OVERRIDE;
+ net::StreamSocket* socket) OVERRIDE;
virtual bool CanSendMessage() const OVERRIDE;
virtual void SendMessage(const google::protobuf::MessageLite& message)
OVERRIDE;
diff --git a/google_apis/gcm/engine/heartbeat_manager.cc b/google_apis/gcm/engine/heartbeat_manager.cc
new file mode 100644
index 0000000..5b5ae47
--- /dev/null
+++ b/google_apis/gcm/engine/heartbeat_manager.cc
@@ -0,0 +1,119 @@
+// Copyright 2014 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "google_apis/gcm/engine/heartbeat_manager.h"
+
+#include "google_apis/gcm/protocol/mcs.pb.h"
+#include "net/base/network_change_notifier.h"
+
+namespace gcm {
+
+namespace {
+// The default heartbeat when on a mobile or unknown network .
+const int64 kCellHeartbeatDefaultMs = 1000 * 60 * 28; // 28 minutes.
+// The default heartbeat when on WiFi (also used for ethernet).
+const int64 kWifiHeartbeatDefaultMs = 1000 * 60 * 15; // 15 minutes.
+// The default heartbeat ack interval.
+const int64 kHeartbeatAckDefaultMs = 1000 * 60 * 1; // 1 minute.
+} // namespace
+
+HeartbeatManager::HeartbeatManager()
+ : waiting_for_ack_(false),
+ heartbeat_interval_ms_(0),
+ server_interval_ms_(0),
+ heartbeat_timer_(true /* retain user task */,
+ false /* not repeating */),
+ weak_ptr_factory_(this) {}
+
+HeartbeatManager::~HeartbeatManager() {}
+
+void HeartbeatManager::Start(
+ const base::Closure& send_heartbeat_callback,
+ const base::Closure& trigger_reconnect_callback) {
+ DCHECK(!send_heartbeat_callback.is_null());
+ DCHECK(!trigger_reconnect_callback.is_null());
+ send_heartbeat_callback_ = send_heartbeat_callback;
+ trigger_reconnect_callback_ = trigger_reconnect_callback;
+
+ // Kicks off the timer.
+ waiting_for_ack_ = false;
+ RestartTimer();
+}
+
+void HeartbeatManager::Stop() {
+ heartbeat_timer_.Stop();
+ waiting_for_ack_ = false;
+}
+
+void HeartbeatManager::OnHeartbeatAcked() {
+ if (!heartbeat_timer_.IsRunning())
+ return;
+
+ DCHECK(!send_heartbeat_callback_.is_null());
+ DCHECK(!trigger_reconnect_callback_.is_null());
+ waiting_for_ack_ = false;
+ RestartTimer();
+}
+
+void HeartbeatManager::UpdateHeartbeatConfig(
+ const mcs_proto::HeartbeatConfig& config) {
+ if (!config.IsInitialized() ||
+ !config.has_interval_ms() ||
+ config.interval_ms() <= 0) {
+ return;
+ }
+ DVLOG(1) << "Updating heartbeat interval to " << config.interval_ms();
+ server_interval_ms_ = config.interval_ms();
+}
+
+base::TimeTicks HeartbeatManager::GetNextHeartbeatTime() const {
+ if (heartbeat_timer_.IsRunning())
+ return heartbeat_timer_.desired_run_time();
+ else
+ return base::TimeTicks();
+}
+
+void HeartbeatManager::OnHeartbeatTriggered() {
+ if (waiting_for_ack_) {
+ LOG(WARNING) << "Lost connection to MCS, reconnecting.";
+ Stop();
+ trigger_reconnect_callback_.Run();
+ return;
+ }
+
+ waiting_for_ack_ = true;
+ RestartTimer();
+ send_heartbeat_callback_.Run();
+}
+
+void HeartbeatManager::RestartTimer() {
+ if (!waiting_for_ack_) {
+ // Recalculate the timer interval based network type.
+ if (server_interval_ms_ != 0) {
+ // If a server interval is set, it overrides any local one.
+ heartbeat_interval_ms_ = server_interval_ms_;
+ } else if (net::NetworkChangeNotifier::GetConnectionType() ==
+ net::NetworkChangeNotifier::CONNECTION_WIFI ||
+ net::NetworkChangeNotifier::GetConnectionType() ==
+ net::NetworkChangeNotifier::CONNECTION_ETHERNET) {
+ heartbeat_interval_ms_ = kWifiHeartbeatDefaultMs;
+ } else {
+ // For unknown connections, use the longer cellular heartbeat interval.
+ heartbeat_interval_ms_ = kCellHeartbeatDefaultMs;
+ }
+ DVLOG(1) << "Sending next heartbeat in "
+ << heartbeat_interval_ms_ << " ms.";
+ } else {
+ heartbeat_interval_ms_ = kHeartbeatAckDefaultMs;
+ DVLOG(1) << "Resetting timer for ack with "
+ << heartbeat_interval_ms_ << " ms interval.";
+ }
+ heartbeat_timer_.Start(FROM_HERE,
+ base::TimeDelta::FromMilliseconds(
+ heartbeat_interval_ms_),
+ base::Bind(&HeartbeatManager::OnHeartbeatTriggered,
+ weak_ptr_factory_.GetWeakPtr()));
+}
+
+} // namespace gcm
diff --git a/google_apis/gcm/engine/heartbeat_manager.h b/google_apis/gcm/engine/heartbeat_manager.h
new file mode 100644
index 0000000..9266b98
--- /dev/null
+++ b/google_apis/gcm/engine/heartbeat_manager.h
@@ -0,0 +1,82 @@
+// Copyright 2014 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef GOOGLE_APIS_GCM_ENGINE_HEARTBEAT_MANAGER_H_
+#define GOOGLE_APIS_GCM_ENGINE_HEARTBEAT_MANAGER_H_
+
+#include "base/callback.h"
+#include "base/logging.h"
+#include "base/memory/weak_ptr.h"
+#include "base/timer/timer.h"
+#include "google_apis/gcm/base/gcm_export.h"
+
+namespace mcs_proto {
+class HeartbeatConfig;
+}
+
+namespace gcm {
+
+// A heartbeat management class, capable of sending and handling heartbeat
+// receipt/failures and triggering reconnection as necessary.
+class GCM_EXPORT HeartbeatManager {
+ public:
+ HeartbeatManager();
+ ~HeartbeatManager();
+
+ // Start the heartbeat logic.
+ // |send_heartbeat_callback_| is the callback the HeartbeatManager uses to
+ // send new heartbeats. Only one heartbeat can be outstanding at a time.
+ void Start(const base::Closure& send_heartbeat_callback,
+ const base::Closure& trigger_reconnect_callback);
+
+ // Stop the timer. Start(..) must be called again to begin sending heartbeats
+ // afterwards.
+ void Stop();
+
+ // Reset the heartbeat timer. It is valid to call this even if no heartbeat
+ // is associated with the ack (for example if another signal is used to
+ // determine that the connection is alive).
+ void OnHeartbeatAcked();
+
+ // Updates the current heartbeat interval.
+ void UpdateHeartbeatConfig(const mcs_proto::HeartbeatConfig& config);
+
+ // Returns the next scheduled heartbeat time. A null time means
+ // no heartbeat is pending. If non-null and less than the
+ // current time (in ticks), the heartbeat has been triggered and an ack is
+ // pending.
+ base::TimeTicks GetNextHeartbeatTime() const;
+
+ protected:
+ // Helper method to send heartbeat on timer trigger.
+ void OnHeartbeatTriggered();
+
+ private:
+ // Restarts the heartbeat timer.
+ void RestartTimer();
+
+ // Whether the last heartbeat ping sent has been acknowledged or not.
+ bool waiting_for_ack_;
+
+ // The current heartbeat interval.
+ int heartbeat_interval_ms_;
+ // The most recent server-provided heartbeat interval (0 if none has been
+ // provided).
+ int server_interval_ms_;
+
+ // Timer for triggering heartbeats.
+ base::Timer heartbeat_timer_;
+
+ // Callbacks for interacting with the the connection.
+ base::Closure send_heartbeat_callback_;
+ base::Closure trigger_reconnect_callback_;
+
+ base::WeakPtrFactory<HeartbeatManager> weak_ptr_factory_;
+
+ DISALLOW_COPY_AND_ASSIGN(HeartbeatManager);
+};
+
+} // namespace gcm
+
+#endif // GOOGLE_APIS_GCM_ENGINE_HEARTBEAT_MANAGER_H_
diff --git a/google_apis/gcm/engine/heartbeat_manager_unittest.cc b/google_apis/gcm/engine/heartbeat_manager_unittest.cc
new file mode 100644
index 0000000..2c3b668
--- /dev/null
+++ b/google_apis/gcm/engine/heartbeat_manager_unittest.cc
@@ -0,0 +1,176 @@
+// Copyright 2014 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "google_apis/gcm/engine/heartbeat_manager.h"
+
+#include "base/message_loop/message_loop.h"
+#include "base/time/time.h"
+#include "google_apis/gcm/protocol/mcs.pb.h"
+#include "testing/gtest/include/gtest/gtest.h"
+
+namespace gcm {
+
+namespace {
+
+mcs_proto::HeartbeatConfig BuildHeartbeatConfig(int interval_ms) {
+ mcs_proto::HeartbeatConfig config;
+ config.set_interval_ms(interval_ms);
+ return config;
+}
+
+class TestHeartbeatManager : public HeartbeatManager {
+ public:
+ TestHeartbeatManager() {}
+ virtual ~TestHeartbeatManager() {}
+
+ // Bypass the heartbeat timer, and send the heartbeat now.
+ void TriggerHearbeat();
+};
+
+void TestHeartbeatManager::TriggerHearbeat() {
+ OnHeartbeatTriggered();
+}
+
+class HeartbeatManagerTest : public testing::Test {
+ public:
+ HeartbeatManagerTest();
+ virtual ~HeartbeatManagerTest() {}
+
+ TestHeartbeatManager* manager() const { return manager_.get(); }
+ int heartbeats_sent() const { return heartbeats_sent_; }
+ int reconnects_triggered() const { return reconnects_triggered_; }
+
+ // Starts the heartbeat manager.
+ void StartManager();
+
+ private:
+ // Helper functions for verifying heartbeat manager effects.
+ void SendHeartbeatClosure();
+ void TriggerReconnectClosure();
+
+ scoped_ptr<TestHeartbeatManager> manager_;
+
+ int heartbeats_sent_;
+ int reconnects_triggered_;
+
+ base::MessageLoop message_loop_;
+};
+
+HeartbeatManagerTest::HeartbeatManagerTest()
+ : manager_(new TestHeartbeatManager()),
+ heartbeats_sent_(0),
+ reconnects_triggered_(0) {
+}
+
+void HeartbeatManagerTest::StartManager() {
+ manager_->Start(base::Bind(&HeartbeatManagerTest::SendHeartbeatClosure,
+ base::Unretained(this)),
+ base::Bind(&HeartbeatManagerTest::TriggerReconnectClosure,
+ base::Unretained(this)));
+}
+
+void HeartbeatManagerTest::SendHeartbeatClosure() {
+ heartbeats_sent_++;
+}
+
+void HeartbeatManagerTest::TriggerReconnectClosure() {
+ reconnects_triggered_++;
+}
+
+// Basic initialization. No heartbeat should be pending.
+TEST_F(HeartbeatManagerTest, Init) {
+ EXPECT_TRUE(manager()->GetNextHeartbeatTime().is_null());
+}
+
+// Acknowledging a heartbeat before starting the manager should have no effect.
+TEST_F(HeartbeatManagerTest, AckBeforeStart) {
+ manager()->OnHeartbeatAcked();
+ EXPECT_TRUE(manager()->GetNextHeartbeatTime().is_null());
+}
+
+// Starting the manager should start the heartbeat timer.
+TEST_F(HeartbeatManagerTest, Start) {
+ StartManager();
+ EXPECT_GT(manager()->GetNextHeartbeatTime(), base::TimeTicks::Now());
+ EXPECT_EQ(0, heartbeats_sent());
+ EXPECT_EQ(0, reconnects_triggered());
+}
+
+// Acking the heartbeat should trigger a new heartbeat timer.
+TEST_F(HeartbeatManagerTest, AckedHeartbeat) {
+ StartManager();
+ manager()->TriggerHearbeat();
+ base::TimeTicks heartbeat = manager()->GetNextHeartbeatTime();
+ EXPECT_GT(heartbeat, base::TimeTicks::Now());
+ EXPECT_EQ(1, heartbeats_sent());
+ EXPECT_EQ(0, reconnects_triggered());
+
+ manager()->OnHeartbeatAcked();
+ EXPECT_LT(heartbeat, manager()->GetNextHeartbeatTime());
+ EXPECT_EQ(1, heartbeats_sent());
+ EXPECT_EQ(0, reconnects_triggered());
+
+ manager()->TriggerHearbeat();
+ EXPECT_EQ(2, heartbeats_sent());
+ EXPECT_EQ(0, reconnects_triggered());
+}
+
+// Trigger a heartbeat when one was outstanding should reset the connection.
+TEST_F(HeartbeatManagerTest, UnackedHeartbeat) {
+ StartManager();
+ manager()->TriggerHearbeat();
+ EXPECT_EQ(1, heartbeats_sent());
+ EXPECT_EQ(0, reconnects_triggered());
+
+ manager()->TriggerHearbeat();
+ EXPECT_EQ(1, heartbeats_sent());
+ EXPECT_EQ(1, reconnects_triggered());
+}
+
+// Updating the heartbeat interval before starting should result in the new
+// interval being used at Start time.
+TEST_F(HeartbeatManagerTest, UpdateIntervalThenStart) {
+ const int kIntervalMs = 60 * 1000; // 60 seconds.
+ manager()->UpdateHeartbeatConfig(BuildHeartbeatConfig(kIntervalMs));
+ EXPECT_TRUE(manager()->GetNextHeartbeatTime().is_null());
+ StartManager();
+ EXPECT_LE(manager()->GetNextHeartbeatTime() - base::TimeTicks::Now(),
+ base::TimeDelta::FromMilliseconds(kIntervalMs));
+}
+
+// Updating the heartbeat interval after starting should only use the new
+// interval on the next heartbeat.
+TEST_F(HeartbeatManagerTest, StartThenUpdateInterval) {
+ const int kIntervalMs = 60 * 1000; // 60 seconds.
+ StartManager();
+ base::TimeTicks heartbeat = manager()->GetNextHeartbeatTime();
+ EXPECT_GT(heartbeat - base::TimeTicks::Now(),
+ base::TimeDelta::FromMilliseconds(kIntervalMs));
+
+ // Updating the interval should not affect an outstanding heartbeat.
+ manager()->UpdateHeartbeatConfig(BuildHeartbeatConfig(kIntervalMs));
+ EXPECT_EQ(heartbeat, manager()->GetNextHeartbeatTime());
+
+ // Triggering and acking the heartbeat should result in a heartbeat being
+ // posted with the new interval.
+ manager()->TriggerHearbeat();
+ manager()->OnHeartbeatAcked();
+
+ EXPECT_LE(manager()->GetNextHeartbeatTime() - base::TimeTicks::Now(),
+ base::TimeDelta::FromMilliseconds(kIntervalMs));
+ EXPECT_NE(heartbeat, manager()->GetNextHeartbeatTime());
+}
+
+// Stopping the manager should reset the heartbeat timer.
+TEST_F(HeartbeatManagerTest, Stop) {
+ StartManager();
+ EXPECT_GT(manager()->GetNextHeartbeatTime(), base::TimeTicks::Now());
+
+ manager()->Stop();
+ EXPECT_TRUE(manager()->GetNextHeartbeatTime().is_null());
+}
+
+} // namespace
+
+} // namespace gcm
diff --git a/google_apis/gcm/engine/mcs_client.cc b/google_apis/gcm/engine/mcs_client.cc
index 07f1004..9b639c3 100644
--- a/google_apis/gcm/engine/mcs_client.cc
+++ b/google_apis/gcm/engine/mcs_client.cc
@@ -20,9 +20,6 @@ namespace {
typedef scoped_ptr<google::protobuf::MessageLite> MCSProto;
-// TODO(zea): get these values from MCS settings.
-const int64 kHeartbeatDefaultSeconds = 60 * 15; // 15 minutes.
-
// The category of messages intended for the GCM client itself from MCS.
const char kMCSCategory[] = "com.google.android.gsf.gtalkservice";
@@ -30,8 +27,8 @@ const char kMCSCategory[] = "com.google.android.gsf.gtalkservice";
const char kGCMFromField[] = "gcm@android.com";
// MCS status message types.
+// TODO(zea): handle these at the GCMClient layer.
const char kIdleNotification[] = "IdleNotification";
-// TODO(zea): consume the following message types:
// const char kAlwaysShowOnIdle[] = "ShowAwayOnIdle";
// const char kPowerNotification[] = "PowerNotification";
// const char kDataActiveNotification[] = "DataActiveNotification";
@@ -97,9 +94,6 @@ MCSClient::MCSClient(ConnectionFactory* connection_factory, RMQStore* rmq_store)
stream_id_out_(0),
stream_id_in_(0),
rmq_store_(rmq_store),
- heartbeat_interval_(
- base::TimeDelta::FromSeconds(kHeartbeatDefaultSeconds)),
- heartbeat_timer_(true, true),
weak_ptr_factory_(this) {
}
@@ -183,7 +177,6 @@ void MCSClient::Initialize(
}
void MCSClient::Login(uint64 android_id, uint64 security_token) {
- DCHECK_EQ(state_, LOADED);
if (android_id != android_id_ && security_token != security_token_) {
DCHECK(android_id);
DCHECK(security_token);
@@ -257,6 +250,8 @@ void MCSClient::ResetStateAndBuildLoginRequest(
last_device_to_server_stream_id_received_ = 0;
last_server_to_device_stream_id_received_ = 0;
+ heartbeat_manager_.Stop();
+
// TODO(zea): expire all messages older than their TTL.
// Add any pending acknowledgments to the list of ids.
@@ -299,8 +294,6 @@ void MCSClient::ResetStateAndBuildLoginRequest(
<< " incoming acks pending, and " << to_send_.size()
<< " pending outgoing messages.";
- heartbeat_timer_.Stop();
-
state_ = CONNECTING;
}
@@ -332,8 +325,6 @@ void MCSClient::MaybeSendMessage() {
}
void MCSClient::SendPacketToWire(ReliablePacketInfo* packet_info) {
- // Reset the heartbeat interval.
- heartbeat_timer_.Reset();
packet_info->stream_id = ++stream_id_out_;
DVLOG(1) << "Sending packet of type " << packet_info->protobuf->GetTypeName();
@@ -447,6 +438,9 @@ void MCSClient::HandlePacketFromWire(
false);
}
+ // The connection is alive, treat this message as a heartbeat ack.
+ heartbeat_manager_.OnHeartbeatAcked();
+
switch (tag) {
case kLoginResponseTag: {
DCHECK_EQ(CONNECTING, state_);
@@ -455,7 +449,7 @@ void MCSClient::HandlePacketFromWire(
DVLOG(1) << "Received login response:";
DVLOG(1) << " Id: " << login_response->id();
DVLOG(1) << " Timestamp: " << login_response->server_timestamp();
- if (login_response->has_error()) {
+ if (login_response->has_error() && login_response->error().code() != 0) {
state_ = UNINITIALIZED;
DVLOG(1) << " Error code: " << login_response->error().code();
DVLOG(1) << " Error message: " << login_response->error().message();
@@ -463,6 +457,11 @@ void MCSClient::HandlePacketFromWire(
return;
}
+ if (login_response->has_heartbeat_config()) {
+ heartbeat_manager_.UpdateHeartbeatConfig(
+ login_response->heartbeat_config());
+ }
+
state_ = CONNECTED;
stream_id_in_ = 1; // To account for the login response.
DCHECK_EQ(1U, stream_id_out_);
@@ -483,10 +482,11 @@ void MCSClient::HandlePacketFromWire(
weak_ptr_factory_.GetWeakPtr()));
}
- heartbeat_timer_.Start(FROM_HERE,
- heartbeat_interval_,
- base::Bind(&MCSClient::SendHeartbeat,
- weak_ptr_factory_.GetWeakPtr()));
+ heartbeat_manager_.Start(
+ base::Bind(&MCSClient::SendHeartbeat,
+ weak_ptr_factory_.GetWeakPtr()),
+ base::Bind(&MCSClient::OnConnectionResetByHeartbeat,
+ weak_ptr_factory_.GetWeakPtr()));
return;
}
case kHeartbeatPingTag:
@@ -498,8 +498,7 @@ void MCSClient::HandlePacketFromWire(
case kHeartbeatAckTag:
DCHECK_GE(stream_id_in_, 1U);
DVLOG(1) << "Received heartbeat ack.";
- // TODO(zea): add logic to reconnect if no ack received within a certain
- // timeout (with backoff).
+ // Do nothing else, all messages act as heartbeat acks.
return;
case kCloseTag:
LOG(ERROR) << "Received close command, resetting connection.";
@@ -657,4 +656,8 @@ MCSClient::PersistentId MCSClient::GetNextPersistentId() {
return base::Uint64ToString(base::TimeTicks::Now().ToInternalValue());
}
+void MCSClient::OnConnectionResetByHeartbeat() {
+ connection_factory_->SignalConnectionReset();
+}
+
} // namespace gcm
diff --git a/google_apis/gcm/engine/mcs_client.h b/google_apis/gcm/engine/mcs_client.h
index 8087e4e..741c51e 100644
--- a/google_apis/gcm/engine/mcs_client.h
+++ b/google_apis/gcm/engine/mcs_client.h
@@ -13,10 +13,10 @@
#include "base/files/file_path.h"
#include "base/memory/linked_ptr.h"
#include "base/memory/weak_ptr.h"
-#include "base/timer/timer.h"
#include "google_apis/gcm/base/gcm_export.h"
#include "google_apis/gcm/base/mcs_message.h"
#include "google_apis/gcm/engine/connection_handler.h"
+#include "google_apis/gcm/engine/heartbeat_manager.h"
#include "google_apis/gcm/engine/rmq_store.h"
namespace google {
@@ -152,6 +152,9 @@ class GCM_EXPORT MCSClient {
// Virtual for testing.
virtual PersistentId GetNextPersistentId();
+ // Helper for the heartbeat manager to signal a connection reset.
+ void OnConnectionResetByHeartbeat();
+
// Client state.
State state_;
@@ -209,11 +212,8 @@ class GCM_EXPORT MCSClient {
// The reliable message queue persistent store. Owned by the caller.
RMQStore* rmq_store_;
- // ----- Heartbeats -----
- // The current heartbeat interval.
- base::TimeDelta heartbeat_interval_;
- // Timer for triggering heartbeats.
- base::Timer heartbeat_timer_;
+ // Manager to handle triggering/detecting heartbeats.
+ HeartbeatManager heartbeat_manager_;
base::WeakPtrFactory<MCSClient> weak_ptr_factory_;
diff --git a/google_apis/gcm/gcm.gyp b/google_apis/gcm/gcm.gyp
index f81c4ef..79ecd0e 100644
--- a/google_apis/gcm/gcm.gyp
+++ b/google_apis/gcm/gcm.gyp
@@ -52,6 +52,8 @@
'engine/connection_handler.cc',
'engine/connection_handler_impl.h',
'engine/connection_handler_impl.cc',
+ 'engine/heartbeat_manager.h',
+ 'engine/heartbeat_manager.cc',
'engine/mcs_client.h',
'engine/mcs_client.cc',
'engine/rmq_store.h',
@@ -118,6 +120,7 @@
'engine/fake_connection_factory.cc',
'engine/fake_connection_handler.h',
'engine/fake_connection_handler.cc',
+ 'engine/heartbeat_manager_unittest.cc',
'engine/mcs_client_unittest.cc',
'engine/rmq_store_unittest.cc',
]