summaryrefslogtreecommitdiffstats
path: root/google_apis
diff options
context:
space:
mode:
authorzea@chromium.org <zea@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98>2013-12-11 01:41:40 +0000
committerzea@chromium.org <zea@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98>2013-12-11 01:41:40 +0000
commit39fe4b2597a8a9b0ba1036663dcaf7307964ac0d (patch)
tree1354d8d3b026ea651d8fe3186e87c3b593893a2d /google_apis
parenta209538a03abf337dad4215aac2965419c0d0767 (diff)
downloadchromium_src-39fe4b2597a8a9b0ba1036663dcaf7307964ac0d.zip
chromium_src-39fe4b2597a8a9b0ba1036663dcaf7307964ac0d.tar.gz
chromium_src-39fe4b2597a8a9b0ba1036663dcaf7307964ac0d.tar.bz2
[GCM] Add a working MCS client and probe tool.
The MCS client maintains the connection with the MCS endpoint and handles all reliability issues. This is the meat and potatoes of the GCM logic. The probe tool allows testing the MCS client against a production server using real connections and arbitrary credentials. BUG=284553 Review URL: https://codereview.chromium.org/89143002 git-svn-id: svn://svn.chromium.org/chrome/trunk/src@239940 0039d316-1c4b-4281-b951-d872f2087c98
Diffstat (limited to 'google_apis')
-rw-r--r--google_apis/gcm/base/mcs_message_unittest.cc92
-rw-r--r--google_apis/gcm/base/mcs_util.cc19
-rw-r--r--google_apis/gcm/base/mcs_util.h2
-rw-r--r--google_apis/gcm/engine/connection_factory.h26
-rw-r--r--google_apis/gcm/engine/connection_factory_impl.cc35
-rw-r--r--google_apis/gcm/engine/connection_factory_impl.h10
-rw-r--r--google_apis/gcm/engine/connection_factory_impl_unittest.cc36
-rw-r--r--google_apis/gcm/engine/fake_connection_factory.cc46
-rw-r--r--google_apis/gcm/engine/fake_connection_factory.h42
-rw-r--r--google_apis/gcm/engine/fake_connection_handler.cc86
-rw-r--r--google_apis/gcm/engine/fake_connection_handler.h74
-rw-r--r--google_apis/gcm/engine/mcs_client.cc659
-rw-r--r--google_apis/gcm/engine/mcs_client.h231
-rw-r--r--google_apis/gcm/engine/mcs_client_unittest.cc540
-rw-r--r--google_apis/gcm/gcm.gyp32
-rw-r--r--google_apis/gcm/tools/mcs_probe.cc372
16 files changed, 2257 insertions, 45 deletions
diff --git a/google_apis/gcm/base/mcs_message_unittest.cc b/google_apis/gcm/base/mcs_message_unittest.cc
new file mode 100644
index 0000000..4d4ef59
--- /dev/null
+++ b/google_apis/gcm/base/mcs_message_unittest.cc
@@ -0,0 +1,92 @@
+// Copyright 2013 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "google_apis/gcm/base/mcs_message.h"
+
+#include "base/logging.h"
+#include "base/message_loop/message_loop.h"
+#include "google_apis/gcm/base/mcs_util.h"
+#include "google_apis/gcm/protocol/mcs.pb.h"
+#include "testing/gtest/include/gtest/gtest.h"
+
+namespace gcm {
+
+const uint64 kAndroidId = 12345;
+const uint64 kSecret = 54321;
+
+class MCSMessageTest : public testing::Test {
+ public:
+ MCSMessageTest();
+ virtual ~MCSMessageTest();
+ private:
+ base::MessageLoop message_loop_;
+};
+
+MCSMessageTest::MCSMessageTest() {
+}
+
+MCSMessageTest::~MCSMessageTest() {
+}
+
+TEST_F(MCSMessageTest, Invalid) {
+ MCSMessage message;
+ EXPECT_FALSE(message.IsValid());
+}
+
+TEST_F(MCSMessageTest, InitInferTag) {
+ scoped_ptr<mcs_proto::LoginRequest> login_request(
+ BuildLoginRequest(kAndroidId, kSecret));
+ scoped_ptr<google::protobuf::MessageLite> login_copy(
+ new mcs_proto::LoginRequest(*login_request));
+ MCSMessage message(*login_copy);
+ login_copy.reset();
+ ASSERT_TRUE(message.IsValid());
+ EXPECT_EQ(kLoginRequestTag, message.tag());
+ EXPECT_EQ(login_request->ByteSize(), message.size());
+ EXPECT_EQ(login_request->SerializeAsString(), message.SerializeAsString());
+ EXPECT_EQ(login_request->SerializeAsString(),
+ message.GetProtobuf().SerializeAsString());
+ login_copy = message.CloneProtobuf();
+ EXPECT_EQ(login_request->SerializeAsString(),
+ login_copy->SerializeAsString());
+}
+
+TEST_F(MCSMessageTest, InitWithTag) {
+ scoped_ptr<mcs_proto::LoginRequest> login_request(
+ BuildLoginRequest(kAndroidId, kSecret));
+ scoped_ptr<google::protobuf::MessageLite> login_copy(
+ new mcs_proto::LoginRequest(*login_request));
+ MCSMessage message(kLoginRequestTag, *login_copy);
+ login_copy.reset();
+ ASSERT_TRUE(message.IsValid());
+ EXPECT_EQ(kLoginRequestTag, message.tag());
+ EXPECT_EQ(login_request->ByteSize(), message.size());
+ EXPECT_EQ(login_request->SerializeAsString(), message.SerializeAsString());
+ EXPECT_EQ(login_request->SerializeAsString(),
+ message.GetProtobuf().SerializeAsString());
+ login_copy = message.CloneProtobuf();
+ EXPECT_EQ(login_request->SerializeAsString(),
+ login_copy->SerializeAsString());
+}
+
+TEST_F(MCSMessageTest, InitPassOwnership) {
+ scoped_ptr<mcs_proto::LoginRequest> login_request(
+ BuildLoginRequest(kAndroidId, kSecret));
+ scoped_ptr<google::protobuf::MessageLite> login_copy(
+ new mcs_proto::LoginRequest(*login_request));
+ MCSMessage message(kLoginRequestTag,
+ login_copy.PassAs<const google::protobuf::MessageLite>());
+ EXPECT_FALSE(login_copy.get());
+ ASSERT_TRUE(message.IsValid());
+ EXPECT_EQ(kLoginRequestTag, message.tag());
+ EXPECT_EQ(login_request->ByteSize(), message.size());
+ EXPECT_EQ(login_request->SerializeAsString(), message.SerializeAsString());
+ EXPECT_EQ(login_request->SerializeAsString(),
+ message.GetProtobuf().SerializeAsString());
+ login_copy = message.CloneProtobuf();
+ EXPECT_EQ(login_request->SerializeAsString(),
+ login_copy->SerializeAsString());
+}
+
+} // namespace gcm
diff --git a/google_apis/gcm/base/mcs_util.cc b/google_apis/gcm/base/mcs_util.cc
index b52d429..7365560 100644
--- a/google_apis/gcm/base/mcs_util.cc
+++ b/google_apis/gcm/base/mcs_util.cc
@@ -46,9 +46,8 @@ const char kLoginSettingValue[] = "1";
} // namespace
-scoped_ptr<mcs_proto::LoginRequest> BuildLoginRequest(
- uint64 auth_id,
- uint64 auth_token) {
+scoped_ptr<mcs_proto::LoginRequest> BuildLoginRequest(uint64 auth_id,
+ uint64 auth_token) {
// Create a hex encoded auth id for the device id field.
std::string auth_id_hex;
auth_id_hex = base::StringPrintf("%" PRIx64, auth_id);
@@ -87,6 +86,20 @@ scoped_ptr<mcs_proto::IqStanza> BuildStreamAck() {
return stream_ack_iq.Pass();
}
+scoped_ptr<mcs_proto::IqStanza> BuildSelectiveAck(
+ const std::vector<std::string>& acked_ids) {
+ scoped_ptr<mcs_proto::IqStanza> selective_ack_iq(new mcs_proto::IqStanza());
+ selective_ack_iq->set_type(mcs_proto::IqStanza::SET);
+ selective_ack_iq->set_id("");
+ selective_ack_iq->mutable_extension()->set_id(kSelectiveAck);
+ mcs_proto::SelectiveAck selective_ack;
+ for (size_t i = 0; i < acked_ids.size(); ++i)
+ selective_ack.add_id(acked_ids[i]);
+ selective_ack_iq->mutable_extension()->set_data(
+ selective_ack.SerializeAsString());
+ return selective_ack_iq.Pass();
+}
+
// Utility method to build a google::protobuf::MessageLite object from a MCS
// tag.
scoped_ptr<google::protobuf::MessageLite> BuildProtobufFromTag(uint8 tag) {
diff --git a/google_apis/gcm/base/mcs_util.h b/google_apis/gcm/base/mcs_util.h
index d125af7..7f92564 100644
--- a/google_apis/gcm/base/mcs_util.h
+++ b/google_apis/gcm/base/mcs_util.h
@@ -56,6 +56,8 @@ GCM_EXPORT scoped_ptr<mcs_proto::LoginRequest> BuildLoginRequest(
// Builds a StreamAck IqStanza message.
GCM_EXPORT scoped_ptr<mcs_proto::IqStanza> BuildStreamAck();
+GCM_EXPORT scoped_ptr<mcs_proto::IqStanza> BuildSelectiveAck(
+ const std::vector<std::string>& acked_ids);
// Utility methods for building and identifying MCS protobufs.
GCM_EXPORT scoped_ptr<google::protobuf::MessageLite>
diff --git a/google_apis/gcm/engine/connection_factory.h b/google_apis/gcm/engine/connection_factory.h
index 598c211..3cff482 100644
--- a/google_apis/gcm/engine/connection_factory.h
+++ b/google_apis/gcm/engine/connection_factory.h
@@ -20,26 +20,34 @@ namespace gcm {
// backoff policies when attempting connections.
class GCM_EXPORT ConnectionFactory {
public:
+ typedef base::Callback<void(mcs_proto::LoginRequest* login_request)>
+ BuildLoginRequestCallback;
+
ConnectionFactory();
virtual ~ConnectionFactory();
- // Create a new uninitialized connection handler. Should only be called once.
- // The factory will retain ownership of the connection handler.
+ // Initialize the factory, creating a connection handler with a disconnected
+ // socket. Should only be called once.
+ // Upon connection:
// |read_callback| will be invoked with the contents of any received protobuf
// message.
// |write_callback| will be invoked anytime a message has been successfully
// sent. Note: this just means the data was sent to the wire, not that the
// other end received it.
- virtual ConnectionHandler* BuildConnectionHandler(
+ virtual void Initialize(
+ const BuildLoginRequestCallback& request_builder,
const ConnectionHandler::ProtoReceivedCallback& read_callback,
const ConnectionHandler::ProtoSentCallback& write_callback) = 0;
- // Opens a new connection for use by the locally owned connection handler
- // (created via BuildConnectionHandler), and initiates login handshake using
- // |login_request|. Upon completion of the handshake, |read_callback|
- // will be invoked with a valid mcs_proto::LoginResponse.
- // Note: BuildConnectionHandler must have already been invoked.
- virtual void Connect(const mcs_proto::LoginRequest& login_request) = 0;
+ // Get the connection handler for this factory. Initialize(..) must have
+ // been called.
+ virtual ConnectionHandler* GetConnectionHandler() const = 0;
+
+ // Opens a new connection and initiates login handshake. Upon completion of
+ // the handshake, |read_callback| will be invoked with a valid
+ // mcs_proto::LoginResponse.
+ // Note: Initialize must have already been invoked.
+ virtual void Connect() = 0;
// Whether or not the MCS endpoint is currently reachable with an active
// connection.
diff --git a/google_apis/gcm/engine/connection_factory_impl.cc b/google_apis/gcm/engine/connection_factory_impl.cc
index 0a87acc..388b9dc 100644
--- a/google_apis/gcm/engine/connection_factory_impl.cc
+++ b/google_apis/gcm/engine/connection_factory_impl.cc
@@ -64,12 +64,14 @@ ConnectionFactoryImpl::ConnectionFactoryImpl(
ConnectionFactoryImpl::~ConnectionFactoryImpl() {
}
-ConnectionHandler* ConnectionFactoryImpl::BuildConnectionHandler(
- const ConnectionHandler::ProtoReceivedCallback& read_callback,
- const ConnectionHandler::ProtoSentCallback& write_callback) {
+void ConnectionFactoryImpl::Initialize(
+ const BuildLoginRequestCallback& request_builder,
+ const ConnectionHandler::ProtoReceivedCallback& read_callback,
+ const ConnectionHandler::ProtoSentCallback& write_callback) {
DCHECK(!connection_handler_);
backoff_entry_ = CreateBackoffEntry(&kConnectionBackoffPolicy);
+ request_builder_ = request_builder;
net::NetworkChangeNotifier::AddIPAddressObserver(this);
net::NetworkChangeNotifier::AddConnectionTypeObserver(this);
@@ -80,19 +82,16 @@ ConnectionHandler* ConnectionFactoryImpl::BuildConnectionHandler(
write_callback,
base::Bind(&ConnectionFactoryImpl::ConnectionHandlerCallback,
weak_ptr_factory_.GetWeakPtr())));
+}
+
+ConnectionHandler* ConnectionFactoryImpl::GetConnectionHandler() const {
return connection_handler_.get();
}
-void ConnectionFactoryImpl::Connect(
- const mcs_proto::LoginRequest& login_request) {
+void ConnectionFactoryImpl::Connect() {
DCHECK(connection_handler_);
DCHECK(!IsEndpointReachable());
- if (login_request.IsInitialized()) {
- DCHECK(!login_request_.IsInitialized());
- login_request_ = login_request;
- }
-
if (backoff_entry_->ShouldRejectRequest()) {
DVLOG(1) << "Delaying MCS endpoint connection for "
<< backoff_entry_->GetTimeUntilRelease().InMilliseconds()
@@ -100,8 +99,7 @@ void ConnectionFactoryImpl::Connect(
base::MessageLoop::current()->PostDelayedTask(
FROM_HERE,
base::Bind(&ConnectionFactoryImpl::Connect,
- weak_ptr_factory_.GetWeakPtr(),
- login_request_),
+ weak_ptr_factory_.GetWeakPtr()),
NextRetryAttempt() - base::TimeTicks::Now());
return;
}
@@ -165,7 +163,14 @@ void ConnectionFactoryImpl::ConnectImpl() {
}
void ConnectionFactoryImpl::InitHandler() {
- connection_handler_->Init(login_request_, socket_handle_.PassSocket());
+ // May be null in tests.
+ mcs_proto::LoginRequest login_request;
+ if (!request_builder_.is_null()) {
+ request_builder_.Run(&login_request);
+ DCHECK(login_request.IsInitialized());
+ }
+
+ connection_handler_->Init(login_request, socket_handle_.PassSocket());
}
scoped_ptr<net::BackoffEntry> ConnectionFactoryImpl::CreateBackoffEntry(
@@ -177,7 +182,7 @@ void ConnectionFactoryImpl::OnConnectDone(int result) {
if (result != net::OK) {
LOG(ERROR) << "Failed to connect to MCS endpoint with error " << result;
backoff_entry_->InformOfRequest(false);
- Connect(mcs_proto::LoginRequest());
+ Connect();
return;
}
@@ -194,7 +199,7 @@ void ConnectionFactoryImpl::ConnectionHandlerCallback(int result) {
// user intervention (login page, etc.).
LOG(ERROR) << "Connection reset with error " << result;
backoff_entry_->InformOfRequest(false);
- Connect(mcs_proto::LoginRequest());
+ Connect();
}
} // namespace gcm
diff --git a/google_apis/gcm/engine/connection_factory_impl.h b/google_apis/gcm/engine/connection_factory_impl.h
index 0e40521..d807270 100644
--- a/google_apis/gcm/engine/connection_factory_impl.h
+++ b/google_apis/gcm/engine/connection_factory_impl.h
@@ -35,10 +35,12 @@ class GCM_EXPORT ConnectionFactoryImpl :
virtual ~ConnectionFactoryImpl();
// ConnectionFactory implementation.
- virtual ConnectionHandler* BuildConnectionHandler(
+ virtual void Initialize(
+ const BuildLoginRequestCallback& request_builder,
const ConnectionHandler::ProtoReceivedCallback& read_callback,
const ConnectionHandler::ProtoSentCallback& write_callback) OVERRIDE;
- virtual void Connect(const mcs_proto::LoginRequest& login_request) OVERRIDE;
+ virtual ConnectionHandler* GetConnectionHandler() const OVERRIDE;
+ virtual void Connect() OVERRIDE;
virtual bool IsEndpointReachable() const OVERRIDE;
virtual base::TimeTicks NextRetryAttempt() const OVERRIDE;
@@ -86,8 +88,8 @@ class GCM_EXPORT ConnectionFactoryImpl :
// The current connection handler, if one exists.
scoped_ptr<ConnectionHandlerImpl> connection_handler_;
- // The current login request if a connection attempt is in progress/pending.
- mcs_proto::LoginRequest login_request_;
+ // Builder for generating new login requests.
+ BuildLoginRequestCallback request_builder_;
base::WeakPtrFactory<ConnectionFactoryImpl> weak_ptr_factory_;
diff --git a/google_apis/gcm/engine/connection_factory_impl_unittest.cc b/google_apis/gcm/engine/connection_factory_impl_unittest.cc
index 40adcf2..1e0ccef 100644
--- a/google_apis/gcm/engine/connection_factory_impl_unittest.cc
+++ b/google_apis/gcm/engine/connection_factory_impl_unittest.cc
@@ -192,43 +192,48 @@ void ConnectionFactoryImplTest::ConnectionsComplete() {
}
// Verify building a connection handler works.
-TEST_F(ConnectionFactoryImplTest, BuildConnectionHandler) {
+TEST_F(ConnectionFactoryImplTest, Initialize) {
EXPECT_FALSE(factory()->IsEndpointReachable());
- ConnectionHandler* handler = factory()->BuildConnectionHandler(
+ factory()->Initialize(
+ ConnectionFactory::BuildLoginRequestCallback(),
base::Bind(&ReadContinuation),
base::Bind(&WriteContinuation));
+ ConnectionHandler* handler = factory()->GetConnectionHandler();
ASSERT_TRUE(handler);
EXPECT_FALSE(factory()->IsEndpointReachable());
}
// An initial successful connection should not result in backoff.
TEST_F(ConnectionFactoryImplTest, ConnectSuccess) {
- factory()->BuildConnectionHandler(
+ factory()->Initialize(
+ ConnectionFactory::BuildLoginRequestCallback(),
ConnectionHandler::ProtoReceivedCallback(),
ConnectionHandler::ProtoSentCallback());
factory()->SetConnectResult(net::OK);
- factory()->Connect(mcs_proto::LoginRequest());
+ factory()->Connect();
EXPECT_TRUE(factory()->NextRetryAttempt().is_null());
}
// A connection failure should result in backoff.
TEST_F(ConnectionFactoryImplTest, ConnectFail) {
- factory()->BuildConnectionHandler(
+ factory()->Initialize(
+ ConnectionFactory::BuildLoginRequestCallback(),
ConnectionHandler::ProtoReceivedCallback(),
ConnectionHandler::ProtoSentCallback());
factory()->SetConnectResult(net::ERR_CONNECTION_FAILED);
- factory()->Connect(mcs_proto::LoginRequest());
+ factory()->Connect();
EXPECT_FALSE(factory()->NextRetryAttempt().is_null());
}
// A connection success after a failure should reset backoff.
TEST_F(ConnectionFactoryImplTest, FailThenSucceed) {
- factory()->BuildConnectionHandler(
+ factory()->Initialize(
+ ConnectionFactory::BuildLoginRequestCallback(),
ConnectionHandler::ProtoReceivedCallback(),
ConnectionHandler::ProtoSentCallback());
factory()->SetConnectResult(net::ERR_CONNECTION_FAILED);
base::TimeTicks connect_time = base::TimeTicks::Now();
- factory()->Connect(mcs_proto::LoginRequest());
+ factory()->Connect();
WaitForConnections();
base::TimeTicks retry_time = factory()->NextRetryAttempt();
EXPECT_FALSE(retry_time.is_null());
@@ -241,7 +246,8 @@ TEST_F(ConnectionFactoryImplTest, FailThenSucceed) {
// Multiple connection failures should retry with an exponentially increasing
// backoff, then reset on success.
TEST_F(ConnectionFactoryImplTest, MultipleFailuresThenSucceed) {
- factory()->BuildConnectionHandler(
+ factory()->Initialize(
+ ConnectionFactory::BuildLoginRequestCallback(),
ConnectionHandler::ProtoReceivedCallback(),
ConnectionHandler::ProtoSentCallback());
@@ -250,7 +256,7 @@ TEST_F(ConnectionFactoryImplTest, MultipleFailuresThenSucceed) {
kNumAttempts);
base::TimeTicks connect_time = base::TimeTicks::Now();
- factory()->Connect(mcs_proto::LoginRequest());
+ factory()->Connect();
WaitForConnections();
base::TimeTicks retry_time = factory()->NextRetryAttempt();
EXPECT_FALSE(retry_time.is_null());
@@ -264,11 +270,12 @@ TEST_F(ConnectionFactoryImplTest, MultipleFailuresThenSucceed) {
// IP events should reset backoff.
TEST_F(ConnectionFactoryImplTest, FailThenIPEvent) {
- factory()->BuildConnectionHandler(
+ factory()->Initialize(
+ ConnectionFactory::BuildLoginRequestCallback(),
ConnectionHandler::ProtoReceivedCallback(),
ConnectionHandler::ProtoSentCallback());
factory()->SetConnectResult(net::ERR_CONNECTION_FAILED);
- factory()->Connect(mcs_proto::LoginRequest());
+ factory()->Connect();
WaitForConnections();
EXPECT_FALSE(factory()->NextRetryAttempt().is_null());
@@ -278,11 +285,12 @@ TEST_F(ConnectionFactoryImplTest, FailThenIPEvent) {
// Connection type events should reset backoff.
TEST_F(ConnectionFactoryImplTest, FailThenConnectionTypeEvent) {
- factory()->BuildConnectionHandler(
+ factory()->Initialize(
+ ConnectionFactory::BuildLoginRequestCallback(),
ConnectionHandler::ProtoReceivedCallback(),
ConnectionHandler::ProtoSentCallback());
factory()->SetConnectResult(net::ERR_CONNECTION_FAILED);
- factory()->Connect(mcs_proto::LoginRequest());
+ factory()->Connect();
WaitForConnections();
EXPECT_FALSE(factory()->NextRetryAttempt().is_null());
diff --git a/google_apis/gcm/engine/fake_connection_factory.cc b/google_apis/gcm/engine/fake_connection_factory.cc
new file mode 100644
index 0000000..54b3423
--- /dev/null
+++ b/google_apis/gcm/engine/fake_connection_factory.cc
@@ -0,0 +1,46 @@
+// Copyright 2013 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "google_apis/gcm/engine/fake_connection_factory.h"
+
+#include "google_apis/gcm/engine/fake_connection_handler.h"
+#include "google_apis/gcm/protocol/mcs.pb.h"
+#include "net/socket/stream_socket.h"
+
+namespace gcm {
+
+FakeConnectionFactory::FakeConnectionFactory() {
+}
+
+FakeConnectionFactory::~FakeConnectionFactory() {
+}
+
+void FakeConnectionFactory::Initialize(
+ const BuildLoginRequestCallback& request_builder,
+ const ConnectionHandler::ProtoReceivedCallback& read_callback,
+ const ConnectionHandler::ProtoSentCallback& write_callback) {
+ request_builder_ = request_builder;
+ connection_handler_.reset(new FakeConnectionHandler(read_callback,
+ write_callback));
+}
+
+ConnectionHandler* FakeConnectionFactory::GetConnectionHandler() const {
+ return connection_handler_.get();
+}
+
+void FakeConnectionFactory::Connect() {
+ mcs_proto::LoginRequest login_request;
+ request_builder_.Run(&login_request);
+ connection_handler_->Init(login_request, scoped_ptr<net::StreamSocket>());
+}
+
+bool FakeConnectionFactory::IsEndpointReachable() const {
+ return connection_handler_.get() && connection_handler_->CanSendMessage();
+}
+
+base::TimeTicks FakeConnectionFactory::NextRetryAttempt() const {
+ return base::TimeTicks();
+}
+
+} // namespace gcm
diff --git a/google_apis/gcm/engine/fake_connection_factory.h b/google_apis/gcm/engine/fake_connection_factory.h
new file mode 100644
index 0000000..60b10e1
--- /dev/null
+++ b/google_apis/gcm/engine/fake_connection_factory.h
@@ -0,0 +1,42 @@
+// Copyright 2013 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef GOOGLE_APIS_GCM_ENGINE_FAKE_CONNECTION_FACTORY_H_
+#define GOOGLE_APIS_GCM_ENGINE_FAKE_CONNECTION_FACTORY_H_
+
+#include "base/memory/scoped_ptr.h"
+#include "google_apis/gcm/engine/connection_factory.h"
+
+namespace gcm {
+
+class FakeConnectionHandler;
+
+// A connection factory that mocks out real connections, using a fake connection
+// handler instead.
+class FakeConnectionFactory : public ConnectionFactory {
+ public:
+ FakeConnectionFactory();
+ virtual ~FakeConnectionFactory();
+
+ // ConnectionFactory implementation.
+ virtual void Initialize(
+ const BuildLoginRequestCallback& request_builder,
+ const ConnectionHandler::ProtoReceivedCallback& read_callback,
+ const ConnectionHandler::ProtoSentCallback& write_callback) OVERRIDE;
+ virtual ConnectionHandler* GetConnectionHandler() const OVERRIDE;
+ virtual void Connect() OVERRIDE;
+ virtual bool IsEndpointReachable() const OVERRIDE;
+ virtual base::TimeTicks NextRetryAttempt() const OVERRIDE;
+
+ private:
+ scoped_ptr<FakeConnectionHandler> connection_handler_;
+
+ BuildLoginRequestCallback request_builder_;
+
+ DISALLOW_COPY_AND_ASSIGN(FakeConnectionFactory);
+};
+
+} // namespace gcm
+
+#endif // GOOGLE_APIS_GCM_ENGINE_FAKE_CONNECTION_FACTORY_H_
diff --git a/google_apis/gcm/engine/fake_connection_handler.cc b/google_apis/gcm/engine/fake_connection_handler.cc
new file mode 100644
index 0000000..0663933
--- /dev/null
+++ b/google_apis/gcm/engine/fake_connection_handler.cc
@@ -0,0 +1,86 @@
+// Copyright 2013 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "google_apis/gcm/engine/fake_connection_handler.h"
+
+#include "base/logging.h"
+#include "google_apis/gcm/base/mcs_util.h"
+#include "net/socket/stream_socket.h"
+#include "testing/gtest/include/gtest/gtest.h"
+
+namespace gcm {
+
+namespace {
+
+// Build a basic login response.
+scoped_ptr<google::protobuf::MessageLite> BuildLoginResponse(bool fail_login) {
+ scoped_ptr<mcs_proto::LoginResponse> login_response(
+ new mcs_proto::LoginResponse());
+ login_response->set_id("id");
+ if (fail_login)
+ login_response->mutable_error()->set_code(1);
+ return login_response.PassAs<google::protobuf::MessageLite>();
+}
+
+} // namespace
+
+FakeConnectionHandler::FakeConnectionHandler(
+ const ConnectionHandler::ProtoReceivedCallback& read_callback,
+ const ConnectionHandler::ProtoSentCallback& write_callback)
+ : read_callback_(read_callback),
+ write_callback_(write_callback),
+ fail_login_(false),
+ fail_send_(false),
+ initialized_(false) {
+}
+
+FakeConnectionHandler::~FakeConnectionHandler() {
+}
+
+void FakeConnectionHandler::Init(const mcs_proto::LoginRequest& login_request,
+ scoped_ptr<net::StreamSocket> socket) {
+ EXPECT_EQ(expected_outgoing_messages_.front().SerializeAsString(),
+ login_request.SerializeAsString());
+ expected_outgoing_messages_.pop_front();
+ DVLOG(1) << "Received init call.";
+ read_callback_.Run(BuildLoginResponse(fail_login_));
+ initialized_ = !fail_login_;
+}
+
+bool FakeConnectionHandler::CanSendMessage() const {
+ return initialized_;
+}
+
+void FakeConnectionHandler::SendMessage(
+ const google::protobuf::MessageLite& message) {
+ if (expected_outgoing_messages_.empty())
+ FAIL() << "Unexpected message sent.";
+ EXPECT_EQ(expected_outgoing_messages_.front().SerializeAsString(),
+ message.SerializeAsString());
+ expected_outgoing_messages_.pop_front();
+ DVLOG(1) << "Received message, "
+ << (fail_send_ ? " failing send." : "calling back.");
+ if (!fail_send_)
+ write_callback_.Run();
+ else
+ initialized_ = false; // Prevent future messages until reconnect.
+}
+
+void FakeConnectionHandler::ExpectOutgoingMessage(const MCSMessage& message) {
+ expected_outgoing_messages_.push_back(message);
+}
+
+void FakeConnectionHandler::ResetOutgoingMessageExpectations() {
+ expected_outgoing_messages_.clear();
+}
+
+bool FakeConnectionHandler::AllOutgoingMessagesReceived() const {
+ return expected_outgoing_messages_.empty();
+}
+
+void FakeConnectionHandler::ReceiveMessage(const MCSMessage& message) {
+ read_callback_.Run(message.CloneProtobuf());
+}
+
+} // namespace gcm
diff --git a/google_apis/gcm/engine/fake_connection_handler.h b/google_apis/gcm/engine/fake_connection_handler.h
new file mode 100644
index 0000000..5356b77
--- /dev/null
+++ b/google_apis/gcm/engine/fake_connection_handler.h
@@ -0,0 +1,74 @@
+// Copyright 2013 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef GOOGLE_APIS_GCM_ENGINE_FAKE_CONNECTION_HANDLER_H_
+#define GOOGLE_APIS_GCM_ENGINE_FAKE_CONNECTION_HANDLER_H_
+
+#include <list>
+
+#include "google_apis/gcm/base/mcs_message.h"
+#include "google_apis/gcm/engine/connection_handler.h"
+
+namespace gcm {
+
+// A fake implementation of a ConnectionHandler that can arbitrarily receive
+// messages and verify expectations for outgoing messages.
+class FakeConnectionHandler : public ConnectionHandler {
+ public:
+ FakeConnectionHandler(
+ const ConnectionHandler::ProtoReceivedCallback& read_callback,
+ const ConnectionHandler::ProtoSentCallback& write_callback);
+ virtual ~FakeConnectionHandler();
+
+ // ConnectionHandler implementation.
+ virtual void Init(const mcs_proto::LoginRequest& login_request,
+ scoped_ptr<net::StreamSocket> socket) OVERRIDE;
+ virtual bool CanSendMessage() const OVERRIDE;
+ virtual void SendMessage(const google::protobuf::MessageLite& message)
+ OVERRIDE;
+
+ // EXPECT's receipt of |message| via SendMessage(..).
+ void ExpectOutgoingMessage(const MCSMessage& message);
+
+ // Reset the expected outgoing messages.
+ void ResetOutgoingMessageExpectations();
+
+ // Whether all expected outgoing messages have been received;
+ bool AllOutgoingMessagesReceived() const;
+
+ // Passes on |message| to |write_callback_|.
+ void ReceiveMessage(const MCSMessage& message);
+
+ // Whether to return an error with the next login response.
+ void set_fail_login(bool fail_login) {
+ fail_login_ = fail_login;
+ }
+
+ // Whether to invoke the write callback on the next send attempt or fake a
+ // connection error instead.
+ void set_fail_send(bool fail_send) {
+ fail_send_ = fail_send;
+ }
+
+ private:
+ ConnectionHandler::ProtoReceivedCallback read_callback_;
+ ConnectionHandler::ProtoSentCallback write_callback_;
+
+ std::list<MCSMessage> expected_outgoing_messages_;
+
+ // Whether to fail the login or not.
+ bool fail_login_;
+
+ // Whether to fail a SendMessage call or not.
+ bool fail_send_;
+
+ // Whether a successful login has completed.
+ bool initialized_;
+
+ DISALLOW_COPY_AND_ASSIGN(FakeConnectionHandler);
+};
+
+} // namespace gcm
+
+#endif // GOOGLE_APIS_GCM_ENGINE_FAKE_CONNECTION_HANDLER_H_
diff --git a/google_apis/gcm/engine/mcs_client.cc b/google_apis/gcm/engine/mcs_client.cc
new file mode 100644
index 0000000..f0af051
--- /dev/null
+++ b/google_apis/gcm/engine/mcs_client.cc
@@ -0,0 +1,659 @@
+// Copyright 2013 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "google_apis/gcm/engine/mcs_client.h"
+
+#include "base/basictypes.h"
+#include "base/message_loop/message_loop.h"
+#include "base/strings/string_number_conversions.h"
+#include "google_apis/gcm/base/mcs_util.h"
+#include "google_apis/gcm/base/socket_stream.h"
+#include "google_apis/gcm/engine/connection_factory.h"
+#include "google_apis/gcm/engine/rmq_store.h"
+
+using namespace google::protobuf::io;
+
+namespace gcm {
+
+namespace {
+
+typedef scoped_ptr<google::protobuf::MessageLite> MCSProto;
+
+// TODO(zea): get these values from MCS settings.
+const int64 kHeartbeatDefaultSeconds = 60 * 15; // 15 minutes.
+
+// The category of messages intended for the GCM client itself from MCS.
+const char kMCSCategory[] = "com.google.android.gsf.gtalkservice";
+
+// The from field for messages originating in the GCM client.
+const char kGCMFromField[] = "gcm@android.com";
+
+// MCS status message types.
+const char kIdleNotification[] = "IdleNotification";
+// TODO(zea): consume the following message types:
+// const char kAlwaysShowOnIdle[] = "ShowAwayOnIdle";
+// const char kPowerNotification[] = "PowerNotification";
+// const char kDataActiveNotification[] = "DataActiveNotification";
+
+// The number of unacked messages to allow before sending a stream ack.
+// Applies to both incoming and outgoing messages.
+// TODO(zea): make this server configurable.
+const int kUnackedMessageBeforeStreamAck = 10;
+
+// The global maximum number of pending messages to have in the send queue.
+const size_t kMaxSendQueueSize = 10 * 1024;
+
+// The maximum message size that can be sent to the server.
+const int kMaxMessageBytes = 4 * 1024; // 4KB, like the server.
+
+// Helper for converting a proto persistent id list to a vector of strings.
+bool BuildPersistentIdListFromProto(const google::protobuf::string& bytes,
+ std::vector<std::string>* id_list) {
+ mcs_proto::SelectiveAck selective_ack;
+ if (!selective_ack.ParseFromString(bytes))
+ return false;
+ std::vector<std::string> new_list;
+ for (int i = 0; i < selective_ack.id_size(); ++i) {
+ DCHECK(!selective_ack.id(i).empty());
+ new_list.push_back(selective_ack.id(i));
+ }
+ id_list->swap(new_list);
+ return true;
+}
+
+} // namespace
+
+struct ReliablePacketInfo {
+ ReliablePacketInfo();
+ ~ReliablePacketInfo();
+
+ // The stream id with which the message was sent.
+ uint32 stream_id;
+
+ // If reliable delivery was requested, the persistent id of the message.
+ std::string persistent_id;
+
+ // The type of message itself (for easier lookup).
+ uint8 tag;
+
+ // The protobuf of the message itself.
+ MCSProto protobuf;
+};
+
+ReliablePacketInfo::ReliablePacketInfo()
+ : stream_id(0), tag(0) {
+}
+ReliablePacketInfo::~ReliablePacketInfo() {}
+
+MCSClient::MCSClient(
+ const base::FilePath& rmq_path,
+ ConnectionFactory* connection_factory,
+ scoped_refptr<base::SequencedTaskRunner> blocking_task_runner)
+ : state_(UNINITIALIZED),
+ android_id_(0),
+ security_token_(0),
+ connection_factory_(connection_factory),
+ connection_handler_(NULL),
+ last_device_to_server_stream_id_received_(0),
+ last_server_to_device_stream_id_received_(0),
+ stream_id_out_(0),
+ stream_id_in_(0),
+ rmq_store_(rmq_path, blocking_task_runner),
+ heartbeat_interval_(
+ base::TimeDelta::FromSeconds(kHeartbeatDefaultSeconds)),
+ heartbeat_timer_(true, true),
+ blocking_task_runner_(blocking_task_runner),
+ weak_ptr_factory_(this) {
+}
+
+MCSClient::~MCSClient() {
+}
+
+void MCSClient::Initialize(
+ const InitializationCompleteCallback& initialization_callback,
+ const OnMessageReceivedCallback& message_received_callback,
+ const OnMessageSentCallback& message_sent_callback) {
+ DCHECK_EQ(state_, UNINITIALIZED);
+ initialization_callback_ = initialization_callback;
+ message_received_callback_ = message_received_callback;
+ message_sent_callback_ = message_sent_callback;
+
+ state_ = LOADING;
+ rmq_store_.Load(base::Bind(&MCSClient::OnRMQLoadFinished,
+ weak_ptr_factory_.GetWeakPtr()));
+
+ connection_factory_->Initialize(
+ base::Bind(&MCSClient::ResetStateAndBuildLoginRequest,
+ weak_ptr_factory_.GetWeakPtr()),
+ base::Bind(&MCSClient::HandlePacketFromWire,
+ weak_ptr_factory_.GetWeakPtr()),
+ base::Bind(&MCSClient::MaybeSendMessage,
+ weak_ptr_factory_.GetWeakPtr()));
+ connection_handler_ = connection_factory_->GetConnectionHandler();
+}
+
+void MCSClient::Login(uint64 android_id, uint64 security_token) {
+ DCHECK_EQ(state_, LOADED);
+ if (android_id != android_id_ && security_token != security_token_) {
+ DCHECK(android_id);
+ DCHECK(security_token);
+ DCHECK(restored_unackeds_server_ids_.empty());
+ android_id_ = android_id;
+ security_token_ = security_token;
+ rmq_store_.SetDeviceCredentials(android_id_,
+ security_token_,
+ base::Bind(&MCSClient::OnRMQUpdateFinished,
+ weak_ptr_factory_.GetWeakPtr()));
+ }
+
+ state_ = CONNECTING;
+ connection_factory_->Connect();
+}
+
+void MCSClient::SendMessage(const MCSMessage& message, bool use_rmq) {
+ DCHECK_EQ(state_, CONNECTED);
+ if (to_send_.size() > kMaxSendQueueSize) {
+ base::MessageLoop::current()->PostTask(
+ FROM_HERE,
+ base::Bind(message_sent_callback_, "Message queue full."));
+ return;
+ }
+ if (message.size() > kMaxMessageBytes) {
+ base::MessageLoop::current()->PostTask(
+ FROM_HERE,
+ base::Bind(message_sent_callback_, "Message too large."));
+ return;
+ }
+
+ ReliablePacketInfo* packet_info = new ReliablePacketInfo();
+ packet_info->protobuf = message.CloneProtobuf();
+
+ if (use_rmq) {
+ PersistentId persistent_id = GetNextPersistentId();
+ DVLOG(1) << "Setting persistent id to " << persistent_id;
+ packet_info->persistent_id = persistent_id;
+ SetPersistentId(persistent_id,
+ packet_info->protobuf.get());
+ rmq_store_.AddOutgoingMessage(persistent_id,
+ MCSMessage(message.tag(),
+ *(packet_info->protobuf)),
+ base::Bind(&MCSClient::OnRMQUpdateFinished,
+ weak_ptr_factory_.GetWeakPtr()));
+ } else {
+ // Check that there is an active connection to the endpoint.
+ if (!connection_handler_->CanSendMessage()) {
+ base::MessageLoop::current()->PostTask(
+ FROM_HERE,
+ base::Bind(message_sent_callback_, "Unable to reach endpoint"));
+ return;
+ }
+ }
+ to_send_.push_back(make_linked_ptr(packet_info));
+ MaybeSendMessage();
+}
+
+void MCSClient::Destroy() {
+ rmq_store_.Destroy(base::Bind(&MCSClient::OnRMQUpdateFinished,
+ weak_ptr_factory_.GetWeakPtr()));
+}
+
+void MCSClient::ResetStateAndBuildLoginRequest(
+ mcs_proto::LoginRequest* request) {
+ DCHECK(android_id_);
+ DCHECK(security_token_);
+ stream_id_in_ = 0;
+ stream_id_out_ = 1;
+ last_device_to_server_stream_id_received_ = 0;
+ last_server_to_device_stream_id_received_ = 0;
+
+ // TODO(zea): expire all messages older than their TTL.
+
+ // Add any pending acknowledgments to the list of ids.
+ for (StreamIdToPersistentIdMap::const_iterator iter =
+ unacked_server_ids_.begin();
+ iter != unacked_server_ids_.end(); ++iter) {
+ restored_unackeds_server_ids_.push_back(iter->second);
+ }
+ unacked_server_ids_.clear();
+
+ // Any acknowledged server ids which have not been confirmed by the server
+ // are treated like unacknowledged ids.
+ for (std::map<StreamId, PersistentIdList>::const_iterator iter =
+ acked_server_ids_.begin();
+ iter != acked_server_ids_.end(); ++iter) {
+ restored_unackeds_server_ids_.insert(restored_unackeds_server_ids_.end(),
+ iter->second.begin(),
+ iter->second.end());
+ }
+ acked_server_ids_.clear();
+
+ // Then build the request, consuming all pending acknowledgments.
+ request->Swap(BuildLoginRequest(android_id_, security_token_).get());
+ for (PersistentIdList::const_iterator iter =
+ restored_unackeds_server_ids_.begin();
+ iter != restored_unackeds_server_ids_.end(); ++iter) {
+ request->add_received_persistent_id(*iter);
+ }
+ acked_server_ids_[stream_id_out_] = restored_unackeds_server_ids_;
+ restored_unackeds_server_ids_.clear();
+
+ // Push all unacknowledged messages to front of send queue. No need to save
+ // to RMQ, as all messages that reach this point should already have been
+ // saved as necessary.
+ while (!to_resend_.empty()) {
+ to_send_.push_front(to_resend_.back());
+ to_resend_.pop_back();
+ }
+ DVLOG(1) << "Resetting state, with " << request->received_persistent_id_size()
+ << " incoming acks pending, and " << to_send_.size()
+ << " pending outgoing messages.";
+
+ heartbeat_timer_.Stop();
+
+ state_ = CONNECTING;
+}
+
+void MCSClient::SendHeartbeat() {
+ SendMessage(MCSMessage(kHeartbeatPingTag, mcs_proto::HeartbeatPing()),
+ false);
+}
+
+void MCSClient::OnRMQLoadFinished(const RMQStore::LoadResult& result) {
+ if (!result.success) {
+ state_ = UNINITIALIZED;
+ LOG(ERROR) << "Failed to load/create RMQ state. Not connecting.";
+ initialization_callback_.Run(false, 0, 0);
+ return;
+ }
+ state_ = LOADED;
+ stream_id_out_ = 1; // Login request is hardcoded to id 1.
+
+ if (result.device_android_id == 0 || result.device_security_token == 0) {
+ DVLOG(1) << "No device credentials found, assuming new client.";
+ initialization_callback_.Run(true, 0, 0);
+ return;
+ }
+
+ android_id_ = result.device_android_id;
+ security_token_ = result.device_security_token;
+
+ DVLOG(1) << "RMQ Load finished with " << result.incoming_messages.size()
+ << " incoming acks pending and " << result.outgoing_messages.size()
+ << " outgoing messages pending.";
+
+ restored_unackeds_server_ids_ = result.incoming_messages;
+
+ // First go through and order the outgoing messages by recency.
+ std::map<uint64, google::protobuf::MessageLite*> ordered_messages;
+ for (std::map<PersistentId, google::protobuf::MessageLite*>::const_iterator
+ iter = result.outgoing_messages.begin();
+ iter != result.outgoing_messages.end(); ++iter) {
+ uint64 timestamp = 0;
+ if (!base::StringToUint64(iter->first, &timestamp)) {
+ LOG(ERROR) << "Invalid restored message.";
+ return;
+ }
+ ordered_messages[timestamp] = iter->second;
+ }
+
+ // Now go through and add the outgoing messages to the send queue in their
+ // appropriate order (oldest at front, most recent at back).
+ for (std::map<uint64, google::protobuf::MessageLite*>::const_iterator
+ iter = ordered_messages.begin();
+ iter != ordered_messages.end(); ++iter) {
+ ReliablePacketInfo* packet_info = new ReliablePacketInfo();
+ packet_info->protobuf.reset(iter->second);
+ packet_info->persistent_id = base::Uint64ToString(iter->first);
+ to_send_.push_back(make_linked_ptr(packet_info));
+ }
+
+ initialization_callback_.Run(true, android_id_, security_token_);
+}
+
+void MCSClient::OnRMQUpdateFinished(bool success) {
+ LOG_IF(ERROR, !success) << "RMQ Update failed!";
+ // TODO(zea): Rebuild the store from scratch in case of persistence failure?
+}
+
+void MCSClient::MaybeSendMessage() {
+ if (to_send_.empty())
+ return;
+
+ if (!connection_handler_->CanSendMessage())
+ return;
+
+ // TODO(zea): drop messages older than their TTL.
+
+ DVLOG(1) << "Pending output message found, sending.";
+ MCSPacketInternal packet = to_send_.front();
+ to_send_.pop_front();
+ if (!packet->persistent_id.empty())
+ to_resend_.push_back(packet);
+ SendPacketToWire(packet.get());
+}
+
+void MCSClient::SendPacketToWire(ReliablePacketInfo* packet_info) {
+ // Reset the heartbeat interval.
+ heartbeat_timer_.Reset();
+ packet_info->stream_id = ++stream_id_out_;
+ DVLOG(1) << "Sending packet of type " << packet_info->protobuf->GetTypeName();
+
+ // Set the proper last received stream id to acknowledge received server
+ // packets.
+ DVLOG(1) << "Setting last stream id received to "
+ << stream_id_in_;
+ SetLastStreamIdReceived(stream_id_in_,
+ packet_info->protobuf.get());
+ if (stream_id_in_ != last_server_to_device_stream_id_received_) {
+ last_server_to_device_stream_id_received_ = stream_id_in_;
+ // Mark all acknowledged server messages as such. Note: they're not dropped,
+ // as it may be that they'll need to be re-acked if this message doesn't
+ // make it.
+ PersistentIdList persistent_id_list;
+ for (StreamIdToPersistentIdMap::const_iterator iter =
+ unacked_server_ids_.begin();
+ iter != unacked_server_ids_.end(); ++iter) {
+ DCHECK_LE(iter->first, last_server_to_device_stream_id_received_);
+ persistent_id_list.push_back(iter->second);
+ }
+ unacked_server_ids_.clear();
+ acked_server_ids_[stream_id_out_] = persistent_id_list;
+ }
+
+ connection_handler_->SendMessage(*packet_info->protobuf);
+}
+
+void MCSClient::HandleMCSDataMesssage(
+ scoped_ptr<google::protobuf::MessageLite> protobuf) {
+ mcs_proto::DataMessageStanza* data_message =
+ reinterpret_cast<mcs_proto::DataMessageStanza*>(protobuf.get());
+ // TODO(zea): implement a proper status manager rather than hardcoding these
+ // values.
+ scoped_ptr<mcs_proto::DataMessageStanza> response(
+ new mcs_proto::DataMessageStanza());
+ response->set_from(kGCMFromField);
+ bool send = false;
+ for (int i = 0; i < data_message->app_data_size(); ++i) {
+ const mcs_proto::AppData& app_data = data_message->app_data(i);
+ if (app_data.key() == kIdleNotification) {
+ // Tell the MCS server the client is not idle.
+ send = true;
+ mcs_proto::AppData data;
+ data.set_key(kIdleNotification);
+ data.set_value("false");
+ response->add_app_data()->CopyFrom(data);
+ response->set_category(kMCSCategory);
+ }
+ }
+
+ if (send) {
+ SendMessage(
+ MCSMessage(kDataMessageStanzaTag,
+ response.PassAs<const google::protobuf::MessageLite>()),
+ false);
+ }
+}
+
+void MCSClient::HandlePacketFromWire(
+ scoped_ptr<google::protobuf::MessageLite> protobuf) {
+ if (!protobuf.get())
+ return;
+ uint8 tag = GetMCSProtoTag(*protobuf);
+ PersistentId persistent_id = GetPersistentId(*protobuf);
+ StreamId last_stream_id_received = GetLastStreamIdReceived(*protobuf);
+
+ if (last_stream_id_received != 0) {
+ last_device_to_server_stream_id_received_ = last_stream_id_received;
+
+ // Process device to server messages that have now been acknowledged by the
+ // server. Because messages are stored in order, just pop off all that have
+ // a stream id lower than server's last received stream id.
+ HandleStreamAck(last_stream_id_received);
+
+ // Process server_to_device_messages that the server now knows were
+ // acknowledged. Again, they're in order, so just keep going until the
+ // stream id is reached.
+ StreamIdList acked_stream_ids_to_remove;
+ for (std::map<StreamId, PersistentIdList>::iterator iter =
+ acked_server_ids_.begin();
+ iter != acked_server_ids_.end() &&
+ iter->first <= last_stream_id_received; ++iter) {
+ acked_stream_ids_to_remove.push_back(iter->first);
+ }
+ for (StreamIdList::iterator iter = acked_stream_ids_to_remove.begin();
+ iter != acked_stream_ids_to_remove.end(); ++iter) {
+ acked_server_ids_.erase(*iter);
+ }
+ }
+
+ ++stream_id_in_;
+ if (!persistent_id.empty()) {
+ unacked_server_ids_[stream_id_in_] = persistent_id;
+ rmq_store_.AddIncomingMessage(persistent_id,
+ base::Bind(&MCSClient::OnRMQUpdateFinished,
+ weak_ptr_factory_.GetWeakPtr()));
+ }
+
+ DVLOG(1) << "Received message of type " << protobuf->GetTypeName()
+ << " with persistent id "
+ << (persistent_id.empty() ? "NULL" : persistent_id)
+ << ", stream id " << stream_id_in_ << " and last stream id received "
+ << last_stream_id_received;
+
+ if (unacked_server_ids_.size() > 0 &&
+ unacked_server_ids_.size() % kUnackedMessageBeforeStreamAck == 0) {
+ SendMessage(MCSMessage(kIqStanzaTag,
+ BuildStreamAck().
+ PassAs<const google::protobuf::MessageLite>()),
+ false);
+ }
+
+ switch (tag) {
+ case kLoginResponseTag: {
+ mcs_proto::LoginResponse* login_response =
+ reinterpret_cast<mcs_proto::LoginResponse*>(protobuf.get());
+ DVLOG(1) << "Received login response:";
+ DVLOG(1) << " Id: " << login_response->id();
+ DVLOG(1) << " Timestamp: " << login_response->server_timestamp();
+ if (login_response->has_error()) {
+ state_ = UNINITIALIZED;
+ DVLOG(1) << " Error code: " << login_response->error().code();
+ DVLOG(1) << " Error message: " << login_response->error().message();
+ initialization_callback_.Run(false, 0, 0);
+ return;
+ }
+
+ state_ = CONNECTED;
+ stream_id_in_ = 1; // To account for the login response.
+ DCHECK_EQ(1U, stream_id_out_);
+
+ // Pass the login response on up.
+ base::MessageLoop::current()->PostTask(
+ FROM_HERE,
+ base::Bind(message_received_callback_,
+ MCSMessage(tag,
+ protobuf.PassAs<
+ const google::protobuf::MessageLite>())));
+
+ // If there are pending messages, attempt to send one.
+ if (!to_send_.empty()) {
+ base::MessageLoop::current()->PostTask(
+ FROM_HERE,
+ base::Bind(&MCSClient::MaybeSendMessage,
+ weak_ptr_factory_.GetWeakPtr()));
+ }
+
+ heartbeat_timer_.Start(FROM_HERE,
+ heartbeat_interval_,
+ base::Bind(&MCSClient::SendHeartbeat,
+ weak_ptr_factory_.GetWeakPtr()));
+ return;
+ }
+ case kHeartbeatPingTag:
+ DCHECK_GE(stream_id_in_, 1U);
+ DVLOG(1) << "Received heartbeat ping, sending ack.";
+ SendMessage(
+ MCSMessage(kHeartbeatAckTag, mcs_proto::HeartbeatAck()), false);
+ return;
+ case kHeartbeatAckTag:
+ DCHECK_GE(stream_id_in_, 1U);
+ DVLOG(1) << "Received heartbeat ack.";
+ // TODO(zea): add logic to reconnect if no ack received within a certain
+ // timeout (with backoff).
+ return;
+ case kCloseTag:
+ LOG(ERROR) << "Received close command, closing connection.";
+ state_ = UNINITIALIZED;
+ initialization_callback_.Run(false, 0, 0);
+ // TODO(zea): should this happen in non-error cases? Reconnect?
+ return;
+ case kIqStanzaTag: {
+ DCHECK_GE(stream_id_in_, 1U);
+ mcs_proto::IqStanza* iq_stanza =
+ reinterpret_cast<mcs_proto::IqStanza*>(protobuf.get());
+ const mcs_proto::Extension& iq_extension = iq_stanza->extension();
+ switch (iq_extension.id()) {
+ case kSelectiveAck: {
+ PersistentIdList acked_ids;
+ if (BuildPersistentIdListFromProto(iq_extension.data(),
+ &acked_ids)) {
+ HandleSelectiveAck(acked_ids);
+ }
+ return;
+ }
+ case kStreamAck:
+ // Do nothing. The last received stream id is always processed if it's
+ // present.
+ return;
+ default:
+ LOG(WARNING) << "Received invalid iq stanza extension "
+ << iq_extension.id();
+ return;
+ }
+ }
+ case kDataMessageStanzaTag: {
+ DCHECK_GE(stream_id_in_, 1U);
+ mcs_proto::DataMessageStanza* data_message =
+ reinterpret_cast<mcs_proto::DataMessageStanza*>(protobuf.get());
+ if (data_message->category() == kMCSCategory) {
+ HandleMCSDataMesssage(protobuf.Pass());
+ return;
+ }
+
+ DCHECK(protobuf.get());
+ base::MessageLoop::current()->PostTask(
+ FROM_HERE,
+ base::Bind(message_received_callback_,
+ MCSMessage(tag,
+ protobuf.PassAs<
+ const google::protobuf::MessageLite>())));
+ return;
+ }
+ default:
+ LOG(ERROR) << "Received unexpected message of type "
+ << static_cast<int>(tag);
+ return;
+ }
+}
+
+void MCSClient::HandleStreamAck(StreamId last_stream_id_received) {
+ PersistentIdList acked_outgoing_persistent_ids;
+ StreamIdList acked_outgoing_stream_ids;
+ while (!to_resend_.empty() &&
+ to_resend_.front()->stream_id <= last_stream_id_received) {
+ const MCSPacketInternal& outgoing_packet = to_resend_.front();
+ acked_outgoing_persistent_ids.push_back(outgoing_packet->persistent_id);
+ acked_outgoing_stream_ids.push_back(outgoing_packet->stream_id);
+ to_resend_.pop_front();
+ }
+
+ DVLOG(1) << "Server acked " << acked_outgoing_persistent_ids.size()
+ << " outgoing messages, " << to_resend_.size()
+ << " remaining unacked";
+ rmq_store_.RemoveOutgoingMessages(acked_outgoing_persistent_ids,
+ base::Bind(&MCSClient::OnRMQUpdateFinished,
+ weak_ptr_factory_.GetWeakPtr()));
+
+ HandleServerConfirmedReceipt(last_stream_id_received);
+}
+
+void MCSClient::HandleSelectiveAck(const PersistentIdList& id_list) {
+ // First check the to_resend_ queue. Acknowledgments should always happen
+ // in the order they were sent, so if messages are present they should match
+ // the acknowledge list.
+ PersistentIdList::const_iterator iter = id_list.begin();
+ for (; iter != id_list.end() && !to_resend_.empty(); ++iter) {
+ const MCSPacketInternal& outgoing_packet = to_resend_.front();
+ DCHECK_EQ(outgoing_packet->persistent_id, *iter);
+
+ // No need to re-acknowledge any server messages this message already
+ // acknowledged.
+ StreamId device_stream_id = outgoing_packet->stream_id;
+ HandleServerConfirmedReceipt(device_stream_id);
+
+ to_resend_.pop_front();
+ }
+
+ // If the acknowledged ids aren't all there, they might be in the to_send_
+ // queue (typically when a StreamAck confirms messages as part of a login
+ // response).
+ for (; iter != id_list.end() && !to_send_.empty(); ++iter) {
+ const MCSPacketInternal& outgoing_packet = to_send_.front();
+ DCHECK_EQ(outgoing_packet->persistent_id, *iter);
+
+ // No need to re-acknowledge any server messages this message already
+ // acknowledged.
+ StreamId device_stream_id = outgoing_packet->stream_id;
+ HandleServerConfirmedReceipt(device_stream_id);
+
+ to_send_.pop_front();
+ }
+
+ DCHECK(iter == id_list.end());
+
+ DVLOG(1) << "Server acked " << id_list.size()
+ << " messages, " << to_resend_.size() << " remaining unacked.";
+ rmq_store_.RemoveOutgoingMessages(id_list,
+ base::Bind(&MCSClient::OnRMQUpdateFinished,
+ weak_ptr_factory_.GetWeakPtr()));
+
+ // Resend any remaining outgoing messages, as they were not received by the
+ // server.
+ DVLOG(1) << "Resending " << to_resend_.size() << " messages.";
+ while (!to_resend_.empty()) {
+ to_send_.push_front(to_resend_.back());
+ to_resend_.pop_back();
+ }
+}
+
+void MCSClient::HandleServerConfirmedReceipt(StreamId device_stream_id) {
+ // TODO(zea): use a message id the sender understands.
+ base::MessageLoop::current()->PostTask(
+ FROM_HERE,
+ base::Bind(message_sent_callback_,
+ "Message " + base::UintToString(device_stream_id) + " sent."));
+
+ PersistentIdList acked_incoming_ids;
+ for (std::map<StreamId, PersistentIdList>::iterator iter =
+ acked_server_ids_.begin();
+ iter != acked_server_ids_.end() &&
+ iter->first <= device_stream_id;) {
+ acked_incoming_ids.insert(acked_incoming_ids.end(),
+ iter->second.begin(),
+ iter->second.end());
+ acked_server_ids_.erase(iter++);
+ }
+
+ DVLOG(1) << "Server confirmed receipt of " << acked_incoming_ids.size()
+ << " acknowledged server messages.";
+ rmq_store_.RemoveIncomingMessages(acked_incoming_ids,
+ base::Bind(&MCSClient::OnRMQUpdateFinished,
+ weak_ptr_factory_.GetWeakPtr()));
+}
+
+MCSClient::PersistentId MCSClient::GetNextPersistentId() {
+ return base::Uint64ToString(base::TimeTicks::Now().ToInternalValue());
+}
+
+} // namespace gcm
diff --git a/google_apis/gcm/engine/mcs_client.h b/google_apis/gcm/engine/mcs_client.h
new file mode 100644
index 0000000..4de62cb
--- /dev/null
+++ b/google_apis/gcm/engine/mcs_client.h
@@ -0,0 +1,231 @@
+// Copyright 2013 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef GOOGLE_APIS_GCM_ENGINE_MCS_CLIENT_H_
+#define GOOGLE_APIS_GCM_ENGINE_MCS_CLIENT_H_
+
+#include <deque>
+#include <map>
+#include <string>
+#include <vector>
+
+#include "base/files/file_path.h"
+#include "base/memory/linked_ptr.h"
+#include "base/memory/weak_ptr.h"
+#include "base/timer/timer.h"
+#include "google_apis/gcm/base/gcm_export.h"
+#include "google_apis/gcm/base/mcs_message.h"
+#include "google_apis/gcm/engine/connection_handler.h"
+#include "google_apis/gcm/engine/rmq_store.h"
+
+namespace google {
+namespace protobuf {
+class MessageLite;
+} // namespace protobuf
+} // namespace google
+
+namespace mcs_proto {
+class LoginRequest;
+}
+
+namespace gcm {
+
+class ConnectionFactory;
+struct ReliablePacketInfo;
+
+// An MCS client. This client is in charge of all communications with an
+// MCS endpoint, and is capable of reliably sending/receiving GCM messages.
+// NOTE: Not thread safe. This class should live on the same thread as that
+// network requests are performed on.
+class GCM_EXPORT MCSClient {
+ public:
+ enum State {
+ UNINITIALIZED, // Uninitialized.
+ LOADING, // Waiting for RMQ load to finish.
+ LOADED, // RMQ Load finished, waiting to connect.
+ CONNECTING, // Connection in progress.
+ CONNECTED, // Connected and running.
+ };
+
+ // Callback for informing MCSClient status. It is valid for this to be
+ // invoked more than once if a permanent error is encountered after a
+ // successful login was initiated.
+ typedef base::Callback<
+ void(bool success,
+ uint64 restored_android_id,
+ uint64 restored_security_token)> InitializationCompleteCallback;
+ // Callback when a message is received.
+ typedef base::Callback<void(const MCSMessage& message)>
+ OnMessageReceivedCallback;
+ // Callback when a message is sent (and receipt has been acknowledged by
+ // the MCS endpoint).
+ // TODO(zea): pass some sort of structure containing more details about
+ // send failures.
+ typedef base::Callback<void(const std::string& message_id)>
+ OnMessageSentCallback;
+
+ MCSClient(const base::FilePath& rmq_path,
+ ConnectionFactory* connection_factory,
+ scoped_refptr<base::SequencedTaskRunner> blocking_task_runner);
+ virtual ~MCSClient();
+
+ // Initialize the client. Will load any previous id/token information as well
+ // as unacknowledged message information from the RMQ storage, if it exists,
+ // passing the id/token information back via |initialization_callback| along
+ // with a |success == true| result. If no RMQ information is present (and
+ // this is therefore a fresh client), a clean RMQ store will be created and
+ // values of 0 will be returned via |initialization_callback| with
+ // |success == true|.
+ /// If an error loading the RMQ store is encountered,
+ // |initialization_callback| will be invoked with |success == false|.
+ void Initialize(const InitializationCompleteCallback& initialization_callback,
+ const OnMessageReceivedCallback& message_received_callback,
+ const OnMessageSentCallback& message_sent_callback);
+
+ // Logs the client into the server. Client must be initialized.
+ // |android_id| and |security_token| are optional if this is not a new
+ // client, else they must be non-zero.
+ // Successful login will result in |message_received_callback| being invoked
+ // with a valid LoginResponse.
+ // Login failure (typically invalid id/token) will shut down the client, and
+ // |initialization_callback| to be invoked with |success = false|.
+ void Login(uint64 android_id, uint64 security_token);
+
+ // Sends a message, with or without reliable message queueing (RMQ) support.
+ // Will asynchronously invoke the OnMessageSent callback regardless.
+ // TODO(zea): support TTL.
+ void SendMessage(const MCSMessage& message, bool use_rmq);
+
+ // Disconnects the client and permanently destroys the persistent RMQ store.
+ // WARNING: This is permanent, and the client must be recreated with new
+ // credentials afterwards.
+ void Destroy();
+
+ // Returns the current state of the client.
+ State state() const { return state_; }
+
+ private:
+ typedef uint32 StreamId;
+ typedef std::string PersistentId;
+ typedef std::vector<StreamId> StreamIdList;
+ typedef std::vector<PersistentId> PersistentIdList;
+ typedef std::map<StreamId, PersistentId> StreamIdToPersistentIdMap;
+ typedef linked_ptr<ReliablePacketInfo> MCSPacketInternal;
+
+ // Resets the internal state and builds a new login request, acknowledging
+ // any pending server-to-device messages and rebuilding the send queue
+ // from all unacknowledged device-to-server messages.
+ // Should only be called when the connection has been reset.
+ void ResetStateAndBuildLoginRequest(mcs_proto::LoginRequest* request);
+
+ // Send a heartbeat to the MCS server.
+ void SendHeartbeat();
+
+ // RMQ Store callbacks.
+ void OnRMQLoadFinished(const RMQStore::LoadResult& result);
+ void OnRMQUpdateFinished(bool success);
+
+ // Attempt to send a message.
+ void MaybeSendMessage();
+
+ // Helper for sending a protobuf along with any unacknowledged ids to the
+ // wire.
+ void SendPacketToWire(ReliablePacketInfo* packet_info);
+
+ // Handle a data message sent to the MCS client system from the MCS server.
+ void HandleMCSDataMesssage(
+ scoped_ptr<google::protobuf::MessageLite> protobuf);
+
+ // Handle a packet received over the wire.
+ void HandlePacketFromWire(scoped_ptr<google::protobuf::MessageLite> protobuf);
+
+ // ReliableMessageQueue acknowledgment helpers.
+ // Handle a StreamAck sent by the server confirming receipt of all
+ // messages up to the message with stream id |last_stream_id_received|.
+ void HandleStreamAck(StreamId last_stream_id_received_);
+ // Handle a SelectiveAck sent by the server confirming all messages
+ // in |id_list|.
+ void HandleSelectiveAck(const PersistentIdList& id_list);
+ // Handle server confirmation of a device message, including device's
+ // acknowledgment of receipt of messages.
+ void HandleServerConfirmedReceipt(StreamId device_stream_id);
+
+ // Generates a new persistent id for messages.
+ // Virtual for testing.
+ virtual PersistentId GetNextPersistentId();
+
+ // Client state.
+ State state_;
+
+ // Callbacks for owner.
+ InitializationCompleteCallback initialization_callback_;
+ OnMessageReceivedCallback message_received_callback_;
+ OnMessageSentCallback message_sent_callback_;
+
+ // The android id and security token in use by this device.
+ uint64 android_id_;
+ uint64 security_token_;
+
+ // Factory for creating new connections and connection handlers.
+ ConnectionFactory* connection_factory_;
+
+ // Connection handler to handle all over-the-wire protocol communication
+ // with the mobile connection server.
+ ConnectionHandler* connection_handler_;
+
+ // ----- Reliablie Message Queue section -----
+ // Note: all queues/maps are ordered from oldest (front/begin) message to
+ // most recent (back/end).
+
+ // Send/acknowledge queues.
+ std::deque<MCSPacketInternal> to_send_;
+ std::deque<MCSPacketInternal> to_resend_;
+
+ // Last device_to_server stream id acknowledged by the server.
+ StreamId last_device_to_server_stream_id_received_;
+ // Last server_to_device stream id acknowledged by this device.
+ StreamId last_server_to_device_stream_id_received_;
+ // The stream id for the last sent message. A new message should consume
+ // stream_id_out_ + 1.
+ StreamId stream_id_out_;
+ // The stream id of the last received message. The LoginResponse will always
+ // have a stream id of 1, and stream ids increment by 1 for each received
+ // message.
+ StreamId stream_id_in_;
+
+ // The server messages that have not been acked by the device yet. Keyed by
+ // server stream id.
+ StreamIdToPersistentIdMap unacked_server_ids_;
+
+ // Those server messages that have been acked. They must remain tracked
+ // until the ack message is itself confirmed. The list of all message ids
+ // acknowledged are keyed off the device stream id of the message that
+ // acknowledged them.
+ std::map<StreamId, PersistentIdList> acked_server_ids_;
+
+ // Those server messages from a previous connection that were not fully
+ // acknowledged. They do not have associated stream ids, and will be
+ // acknowledged on the next login attempt.
+ PersistentIdList restored_unackeds_server_ids_;
+
+ // The reliable message queue persistent store.
+ RMQStore rmq_store_;
+
+ // ----- Heartbeats -----
+ // The current heartbeat interval.
+ base::TimeDelta heartbeat_interval_;
+ // Timer for triggering heartbeats.
+ base::Timer heartbeat_timer_;
+
+ // The task runner for blocking tasks (i.e. persisting RMQ state to disk).
+ scoped_refptr<base::SequencedTaskRunner> blocking_task_runner_;
+
+ base::WeakPtrFactory<MCSClient> weak_ptr_factory_;
+
+ DISALLOW_COPY_AND_ASSIGN(MCSClient);
+};
+
+} // namespace gcm
+
+#endif // GOOGLE_APIS_GCM_ENGINE_MCS_CLIENT_H_
diff --git a/google_apis/gcm/engine/mcs_client_unittest.cc b/google_apis/gcm/engine/mcs_client_unittest.cc
new file mode 100644
index 0000000..6ef1405
--- /dev/null
+++ b/google_apis/gcm/engine/mcs_client_unittest.cc
@@ -0,0 +1,540 @@
+// Copyright 2013 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "google_apis/gcm/engine/mcs_client.h"
+
+#include "base/files/scoped_temp_dir.h"
+#include "base/message_loop/message_loop.h"
+#include "base/run_loop.h"
+#include "base/strings/string_number_conversions.h"
+#include "components/webdata/encryptor/encryptor.h"
+#include "google_apis/gcm/base/mcs_util.h"
+#include "google_apis/gcm/engine/fake_connection_factory.h"
+#include "google_apis/gcm/engine/fake_connection_handler.h"
+#include "testing/gtest/include/gtest/gtest.h"
+
+namespace gcm {
+
+namespace {
+
+const uint64 kAndroidId = 54321;
+const uint64 kSecurityToken = 12345;
+
+// Number of messages to send when testing batching.
+// Note: must be even for tests that split batches in half.
+const int kMessageBatchSize = 6;
+
+// The number of unacked messages the client will receive before sending a
+// stream ack.
+// TODO(zea): get this (and other constants) directly from the mcs client.
+const int kAckLimitSize = 10;
+
+// Helper for building arbitrary data messages.
+MCSMessage BuildDataMessage(const std::string& from,
+ const std::string& category,
+ int last_stream_id_received,
+ const std::string persistent_id) {
+ mcs_proto::DataMessageStanza data_message;
+ data_message.set_from(from);
+ data_message.set_category(category);
+ data_message.set_last_stream_id_received(last_stream_id_received);
+ if (!persistent_id.empty())
+ data_message.set_persistent_id(persistent_id);
+ return MCSMessage(kDataMessageStanzaTag, data_message);
+}
+
+// MCSClient with overriden exposed persistent id logic.
+class TestMCSClient : public MCSClient {
+ public:
+ TestMCSClient(const base::FilePath& rmq_path,
+ ConnectionFactory* connection_factory,
+ scoped_refptr<base::SequencedTaskRunner> blocking_task_runner)
+ : MCSClient(rmq_path, connection_factory, blocking_task_runner),
+ next_id_(0) {
+ }
+
+ virtual std::string GetNextPersistentId() OVERRIDE {
+ return base::UintToString(++next_id_);
+ }
+
+ private:
+ uint32 next_id_;
+};
+
+class MCSClientTest : public testing::Test {
+ public:
+ MCSClientTest();
+ virtual ~MCSClientTest();
+
+ void BuildMCSClient();
+ void InitializeClient();
+ void LoginClient(const std::vector<std::string>& acknowledged_ids);
+
+ TestMCSClient* mcs_client() const { return mcs_client_.get(); }
+ FakeConnectionFactory* connection_factory() {
+ return &connection_factory_;
+ }
+ bool init_success() const { return init_success_; }
+ uint64 restored_android_id() const { return restored_android_id_; }
+ uint64 restored_security_token() const { return restored_security_token_; }
+ MCSMessage* received_message() const { return received_message_.get(); }
+ std::string sent_message_id() const { return sent_message_id_;}
+
+ FakeConnectionHandler* GetFakeHandler() const;
+
+ void WaitForMCSEvent();
+ void PumpLoop();
+
+ private:
+ void InitializationCallback(bool success,
+ uint64 restored_android_id,
+ uint64 restored_security_token);
+ void MessageReceivedCallback(const MCSMessage& message);
+ void MessageSentCallback(const std::string& message_id);
+
+ base::ScopedTempDir temp_directory_;
+ base::MessageLoop message_loop_;
+ scoped_ptr<base::RunLoop> run_loop_;
+
+ FakeConnectionFactory connection_factory_;
+ scoped_ptr<TestMCSClient> mcs_client_;
+ bool init_success_;
+ uint64 restored_android_id_;
+ uint64 restored_security_token_;
+ scoped_ptr<MCSMessage> received_message_;
+ std::string sent_message_id_;
+};
+
+MCSClientTest::MCSClientTest()
+ : run_loop_(new base::RunLoop()),
+ init_success_(false),
+ restored_android_id_(0),
+ restored_security_token_(0) {
+ EXPECT_TRUE(temp_directory_.CreateUniqueTempDir());
+ run_loop_.reset(new base::RunLoop());
+
+ // On OSX, prevent the Keychain permissions popup during unit tests.
+#if defined(OS_MACOSX)
+ Encryptor::UseMockKeychain(true);
+#endif
+}
+
+MCSClientTest::~MCSClientTest() {}
+
+void MCSClientTest::BuildMCSClient() {
+ mcs_client_.reset(
+ new TestMCSClient(temp_directory_.path(),
+ &connection_factory_,
+ message_loop_.message_loop_proxy()));
+}
+
+void MCSClientTest::InitializeClient() {
+ mcs_client_->Initialize(base::Bind(&MCSClientTest::InitializationCallback,
+ base::Unretained(this)),
+ base::Bind(&MCSClientTest::MessageReceivedCallback,
+ base::Unretained(this)),
+ base::Bind(&MCSClientTest::MessageSentCallback,
+ base::Unretained(this)));
+ run_loop_->Run();
+ run_loop_.reset(new base::RunLoop());
+}
+
+void MCSClientTest::LoginClient(
+ const std::vector<std::string>& acknowledged_ids) {
+ scoped_ptr<mcs_proto::LoginRequest> login_request =
+ BuildLoginRequest(kAndroidId, kSecurityToken);
+ for (size_t i = 0; i < acknowledged_ids.size(); ++i)
+ login_request->add_received_persistent_id(acknowledged_ids[i]);
+ GetFakeHandler()->ExpectOutgoingMessage(
+ MCSMessage(kLoginRequestTag,
+ login_request.PassAs<const google::protobuf::MessageLite>()));
+ mcs_client_->Login(kAndroidId, kSecurityToken);
+ run_loop_->Run();
+ run_loop_.reset(new base::RunLoop());
+}
+
+FakeConnectionHandler* MCSClientTest::GetFakeHandler() const {
+ return reinterpret_cast<FakeConnectionHandler*>(
+ connection_factory_.GetConnectionHandler());
+}
+
+void MCSClientTest::WaitForMCSEvent() {
+ run_loop_->Run();
+ run_loop_.reset(new base::RunLoop());
+}
+
+void MCSClientTest::PumpLoop() {
+ run_loop_->RunUntilIdle();
+ run_loop_.reset(new base::RunLoop());
+}
+
+void MCSClientTest::InitializationCallback(bool success,
+ uint64 restored_android_id,
+ uint64 restored_security_token) {
+ init_success_ = success;
+ restored_android_id_ = restored_android_id;
+ restored_security_token_ = restored_security_token;
+ DVLOG(1) << "Initialization callback invoked, killing loop.";
+ run_loop_->Quit();
+}
+
+void MCSClientTest::MessageReceivedCallback(const MCSMessage& message) {
+ received_message_.reset(new MCSMessage(message));
+ DVLOG(1) << "Message received callback invoked, killing loop.";
+ run_loop_->Quit();
+}
+
+void MCSClientTest::MessageSentCallback(const std::string& message_id) {
+ DVLOG(1) << "Message sent callback invoked, killing loop.";
+ run_loop_->Quit();
+}
+
+// Initialize a new client.
+TEST_F(MCSClientTest, InitializeNew) {
+ BuildMCSClient();
+ InitializeClient();
+ EXPECT_EQ(0U, restored_android_id());
+ EXPECT_EQ(0U, restored_security_token());
+ EXPECT_TRUE(init_success());
+}
+
+// Initialize a new client, shut it down, then restart the client. Should
+// reload the existing device credentials.
+TEST_F(MCSClientTest, InitializeExisting) {
+ BuildMCSClient();
+ InitializeClient();
+ LoginClient(std::vector<std::string>());
+
+ // Rebuild the client, to reload from the RMQ.
+ BuildMCSClient();
+ InitializeClient();
+ EXPECT_EQ(kAndroidId, restored_android_id());
+ EXPECT_EQ(kSecurityToken, restored_security_token());
+ EXPECT_TRUE(init_success());
+}
+
+// Log in successfully to the MCS endpoint.
+TEST_F(MCSClientTest, LoginSuccess) {
+ BuildMCSClient();
+ InitializeClient();
+ LoginClient(std::vector<std::string>());
+ EXPECT_TRUE(connection_factory()->IsEndpointReachable());
+ EXPECT_TRUE(init_success());
+ ASSERT_TRUE(received_message());
+ EXPECT_EQ(kLoginResponseTag, received_message()->tag());
+}
+
+// Encounter a server error during the login attempt.
+TEST_F(MCSClientTest, FailLogin) {
+ BuildMCSClient();
+ InitializeClient();
+ GetFakeHandler()->set_fail_login(true);
+ LoginClient(std::vector<std::string>());
+ EXPECT_FALSE(connection_factory()->IsEndpointReachable());
+ EXPECT_FALSE(init_success());
+ EXPECT_FALSE(received_message());
+}
+
+// Send a message without RMQ support.
+TEST_F(MCSClientTest, SendMessageNoRMQ) {
+ BuildMCSClient();
+ InitializeClient();
+ LoginClient(std::vector<std::string>());
+ MCSMessage message(BuildDataMessage("from", "category", 1, ""));
+ GetFakeHandler()->ExpectOutgoingMessage(message);
+ mcs_client()->SendMessage(message, false);
+ EXPECT_TRUE(GetFakeHandler()->
+ AllOutgoingMessagesReceived());
+}
+
+// Send a message with RMQ support.
+TEST_F(MCSClientTest, SendMessageRMQ) {
+ BuildMCSClient();
+ InitializeClient();
+ LoginClient(std::vector<std::string>());
+ MCSMessage message(BuildDataMessage("from", "category", 1, "1"));
+ GetFakeHandler()->ExpectOutgoingMessage(message);
+ mcs_client()->SendMessage(message, true);
+ EXPECT_TRUE(GetFakeHandler()->
+ AllOutgoingMessagesReceived());
+}
+
+// Send a message with RMQ support while disconnected. On reconnect, the message
+// should be resent.
+TEST_F(MCSClientTest, SendMessageRMQWhileDisconnected) {
+ BuildMCSClient();
+ InitializeClient();
+ LoginClient(std::vector<std::string>());
+ GetFakeHandler()->set_fail_send(true);
+ MCSMessage message(BuildDataMessage("from", "category", 1, "1"));
+
+ // The initial (failed) send.
+ GetFakeHandler()->ExpectOutgoingMessage(message);
+ // The login request.
+ GetFakeHandler()->ExpectOutgoingMessage(
+ MCSMessage(kLoginRequestTag,
+ BuildLoginRequest(kAndroidId, kSecurityToken).
+ PassAs<const google::protobuf::MessageLite>()));
+ // The second (re)send.
+ GetFakeHandler()->ExpectOutgoingMessage(message);
+ mcs_client()->SendMessage(message, true);
+ EXPECT_FALSE(GetFakeHandler()->
+ AllOutgoingMessagesReceived());
+ GetFakeHandler()->set_fail_send(false);
+ connection_factory()->Connect();
+ WaitForMCSEvent(); // Wait for the login to finish.
+ PumpLoop(); // Wait for the send to happen.
+ EXPECT_TRUE(GetFakeHandler()->
+ AllOutgoingMessagesReceived());
+}
+
+// Send a message with RMQ support without receiving an acknowledgement. On
+// restart the message should be resent.
+TEST_F(MCSClientTest, SendMessageRMQOnRestart) {
+ BuildMCSClient();
+ InitializeClient();
+ LoginClient(std::vector<std::string>());
+ GetFakeHandler()->set_fail_send(true);
+ MCSMessage message(BuildDataMessage("from", "category", 1, "1"));
+
+ // The initial (failed) send.
+ GetFakeHandler()->ExpectOutgoingMessage(message);
+ GetFakeHandler()->set_fail_send(false);
+ mcs_client()->SendMessage(message, true);
+ EXPECT_TRUE(GetFakeHandler()->
+ AllOutgoingMessagesReceived());
+
+ // Rebuild the client, which should resend the old message.
+ BuildMCSClient();
+ InitializeClient();
+ LoginClient(std::vector<std::string>());
+ GetFakeHandler()->ExpectOutgoingMessage(message);
+ PumpLoop();
+ EXPECT_TRUE(GetFakeHandler()->
+ AllOutgoingMessagesReceived());
+}
+
+// Send messages with RMQ support, followed by receiving a stream ack. On
+// restart nothing should be recent.
+TEST_F(MCSClientTest, SendMessageRMQWithStreamAck) {
+ BuildMCSClient();
+ InitializeClient();
+ LoginClient(std::vector<std::string>());
+
+ // Send some messages.
+ for (int i = 1; i <= kMessageBatchSize; ++i) {
+ MCSMessage message(
+ BuildDataMessage("from", "category", 1, base::IntToString(i)));
+ GetFakeHandler()->ExpectOutgoingMessage(message);
+ mcs_client()->SendMessage(message, true);
+ }
+ EXPECT_TRUE(GetFakeHandler()->
+ AllOutgoingMessagesReceived());
+
+ // Receive the ack.
+ scoped_ptr<mcs_proto::IqStanza> ack = BuildStreamAck();
+ ack->set_last_stream_id_received(kMessageBatchSize + 1);
+ GetFakeHandler()->ReceiveMessage(
+ MCSMessage(kIqStanzaTag,
+ ack.PassAs<const google::protobuf::MessageLite>()));
+ WaitForMCSEvent();
+
+ // Reconnect and ensure no messages are resent.
+ BuildMCSClient();
+ InitializeClient();
+ LoginClient(std::vector<std::string>());
+ PumpLoop();
+}
+
+// Send messages with RMQ support. On restart, receive a SelectiveAck with
+// the login response. No messages should be resent.
+TEST_F(MCSClientTest, SendMessageRMQAckOnReconnect) {
+ BuildMCSClient();
+ InitializeClient();
+ LoginClient(std::vector<std::string>());
+
+ // Send some messages.
+ std::vector<std::string> id_list;
+ for (int i = 1; i <= kMessageBatchSize; ++i) {
+ id_list.push_back(base::IntToString(i));
+ MCSMessage message(
+ BuildDataMessage("from", "category", 1, id_list.back()));
+ GetFakeHandler()->ExpectOutgoingMessage(message);
+ mcs_client()->SendMessage(message, true);
+ }
+ EXPECT_TRUE(GetFakeHandler()->
+ AllOutgoingMessagesReceived());
+
+ // Rebuild the client, and receive an acknowledgment for the messages as
+ // part of the login response.
+ BuildMCSClient();
+ InitializeClient();
+ LoginClient(std::vector<std::string>());
+ scoped_ptr<mcs_proto::IqStanza> ack(BuildSelectiveAck(id_list));
+ GetFakeHandler()->ReceiveMessage(
+ MCSMessage(kIqStanzaTag,
+ ack.PassAs<const google::protobuf::MessageLite>()));
+ WaitForMCSEvent();
+ EXPECT_TRUE(GetFakeHandler()->
+ AllOutgoingMessagesReceived());
+}
+
+// Send messages with RMQ support. On restart, receive a SelectiveAck with
+// the login response that only acks some messages. The unacked messages should
+// be resent.
+TEST_F(MCSClientTest, SendMessageRMQPartialAckOnReconnect) {
+ BuildMCSClient();
+ InitializeClient();
+ LoginClient(std::vector<std::string>());
+
+ // Send some messages.
+ std::vector<std::string> id_list;
+ for (int i = 1; i <= kMessageBatchSize; ++i) {
+ id_list.push_back(base::IntToString(i));
+ MCSMessage message(
+ BuildDataMessage("from", "category", 1, id_list.back()));
+ GetFakeHandler()->ExpectOutgoingMessage(message);
+ mcs_client()->SendMessage(message, true);
+ }
+ EXPECT_TRUE(GetFakeHandler()->
+ AllOutgoingMessagesReceived());
+
+ // Rebuild the client, and receive an acknowledgment for the messages as
+ // part of the login response.
+ BuildMCSClient();
+ InitializeClient();
+ LoginClient(std::vector<std::string>());
+
+ std::vector<std::string> acked_ids, remaining_ids;
+ acked_ids.insert(acked_ids.end(),
+ id_list.begin(),
+ id_list.begin() + kMessageBatchSize / 2);
+ remaining_ids.insert(remaining_ids.end(),
+ id_list.begin() + kMessageBatchSize / 2,
+ id_list.end());
+ for (int i = 1; i <= kMessageBatchSize / 2; ++i) {
+ MCSMessage message(
+ BuildDataMessage("from",
+ "category",
+ 2,
+ remaining_ids[i - 1]));
+ GetFakeHandler()->ExpectOutgoingMessage(message);
+ }
+ scoped_ptr<mcs_proto::IqStanza> ack(BuildSelectiveAck(acked_ids));
+ GetFakeHandler()->ReceiveMessage(
+ MCSMessage(kIqStanzaTag,
+ ack.PassAs<const google::protobuf::MessageLite>()));
+ WaitForMCSEvent();
+ EXPECT_TRUE(GetFakeHandler()->
+ AllOutgoingMessagesReceived());
+}
+
+// Receive some messages. On restart, the login request should contain the
+// appropriate acknowledged ids.
+TEST_F(MCSClientTest, AckOnLogin) {
+ BuildMCSClient();
+ InitializeClient();
+ LoginClient(std::vector<std::string>());
+
+ // Receive some messages.
+ std::vector<std::string> id_list;
+ for (int i = 1; i <= kMessageBatchSize; ++i) {
+ id_list.push_back(base::IntToString(i));
+ MCSMessage message(
+ BuildDataMessage("from", "category", i, id_list.back()));
+ GetFakeHandler()->ReceiveMessage(message);
+ WaitForMCSEvent();
+ PumpLoop();
+ }
+
+ // Restart the client.
+ BuildMCSClient();
+ InitializeClient();
+ LoginClient(id_list);
+}
+
+// Receive some messages. On the next send, the outgoing message should contain
+// the appropriate last stream id received field to ack the received messages.
+TEST_F(MCSClientTest, AckOnSend) {
+ BuildMCSClient();
+ InitializeClient();
+ LoginClient(std::vector<std::string>());
+
+ // Receive some messages.
+ std::vector<std::string> id_list;
+ for (int i = 1; i <= kMessageBatchSize; ++i) {
+ id_list.push_back(base::IntToString(i));
+ MCSMessage message(
+ BuildDataMessage("from", "category", i, id_list.back()));
+ GetFakeHandler()->ReceiveMessage(message);
+ WaitForMCSEvent();
+ PumpLoop();
+ }
+
+ // Trigger a message send, which should acknowledge via stream ack.
+ MCSMessage message(
+ BuildDataMessage("from", "category", kMessageBatchSize + 1, "1"));
+ GetFakeHandler()->ExpectOutgoingMessage(message);
+ mcs_client()->SendMessage(message, true);
+ EXPECT_TRUE(GetFakeHandler()->
+ AllOutgoingMessagesReceived());
+}
+
+// Receive the ack limit in messages, which should trigger an automatic
+// stream ack. Receive a heartbeat to confirm the ack.
+TEST_F(MCSClientTest, AckWhenLimitReachedWithHeartbeat) {
+ BuildMCSClient();
+ InitializeClient();
+ LoginClient(std::vector<std::string>());
+
+ // The stream ack.
+ scoped_ptr<mcs_proto::IqStanza> ack = BuildStreamAck();
+ ack->set_last_stream_id_received(kAckLimitSize + 1);
+ GetFakeHandler()->ExpectOutgoingMessage(
+ MCSMessage(kIqStanzaTag,
+ ack.PassAs<const google::protobuf::MessageLite>()));
+
+ // Receive some messages.
+ std::vector<std::string> id_list;
+ for (int i = 1; i <= kAckLimitSize; ++i) {
+ id_list.push_back(base::IntToString(i));
+ MCSMessage message(
+ BuildDataMessage("from", "category", i, id_list.back()));
+ GetFakeHandler()->ReceiveMessage(message);
+ WaitForMCSEvent();
+ PumpLoop();
+ }
+ EXPECT_TRUE(GetFakeHandler()->
+ AllOutgoingMessagesReceived());
+
+ // Receive a heartbeat confirming the ack (and receive the heartbeat ack).
+ scoped_ptr<mcs_proto::HeartbeatPing> heartbeat(
+ new mcs_proto::HeartbeatPing());
+ heartbeat->set_last_stream_id_received(2);
+
+ scoped_ptr<mcs_proto::HeartbeatAck> heartbeat_ack(
+ new mcs_proto::HeartbeatAck());
+ heartbeat_ack->set_last_stream_id_received(kAckLimitSize + 2);
+ GetFakeHandler()->ExpectOutgoingMessage(
+ MCSMessage(kHeartbeatAckTag,
+ heartbeat_ack.PassAs<const google::protobuf::MessageLite>()));
+
+ GetFakeHandler()->ReceiveMessage(
+ MCSMessage(kHeartbeatPingTag,
+ heartbeat.PassAs<const google::protobuf::MessageLite>()));
+ WaitForMCSEvent();
+ EXPECT_TRUE(GetFakeHandler()->
+ AllOutgoingMessagesReceived());
+
+ // Rebuild the client. Nothing should be sent on login.
+ BuildMCSClient();
+ InitializeClient();
+ LoginClient(std::vector<std::string>());
+ EXPECT_TRUE(GetFakeHandler()->
+ AllOutgoingMessagesReceived());
+}
+
+} // namespace
+
+} // namespace gcm
diff --git a/google_apis/gcm/gcm.gyp b/google_apis/gcm/gcm.gyp
index 833f2c8..f81c4ef 100644
--- a/google_apis/gcm/gcm.gyp
+++ b/google_apis/gcm/gcm.gyp
@@ -52,6 +52,8 @@
'engine/connection_handler.cc',
'engine/connection_handler_impl.h',
'engine/connection_handler_impl.cc',
+ 'engine/mcs_client.h',
+ 'engine/mcs_client.cc',
'engine/rmq_store.h',
'engine/rmq_store.cc',
'gcm_client.cc',
@@ -65,6 +67,26 @@
],
},
+ # A standalone MCS (mobile connection server) client.
+ {
+ 'target_name': 'mcs_probe',
+ 'type': 'executable',
+ 'variables': { 'enable_wexit_time_destructors': 1, },
+ 'include_dirs': [
+ '../..',
+ ],
+ 'dependencies': [
+ '../../base/base.gyp:base',
+ '../../net/net.gyp:net',
+ '../../net/net.gyp:net_test_support',
+ '../../third_party/protobuf/protobuf.gyp:protobuf_lite',
+ 'gcm'
+ ],
+ 'sources': [
+ 'tools/mcs_probe.cc',
+ ],
+ },
+
# The main GCM unit tests.
{
'target_name': 'gcm_unit_tests',
@@ -73,20 +95,30 @@
'include_dirs': [
'../..',
],
+ 'export_dependent_settings': [
+ '../../third_party/protobuf/protobuf.gyp:protobuf_lite'
+ ],
'dependencies': [
'../../base/base.gyp:run_all_unittests',
'../../base/base.gyp:base',
'../../components/components.gyp:encryptor',
+ '../../net/net.gyp:net',
'../../net/net.gyp:net_test_support',
'../../testing/gtest.gyp:gtest',
'../../third_party/protobuf/protobuf.gyp:protobuf_lite',
'gcm'
],
'sources': [
+ 'base/mcs_message_unittest.cc',
'base/mcs_util_unittest.cc',
'base/socket_stream_unittest.cc',
'engine/connection_factory_impl_unittest.cc',
'engine/connection_handler_impl_unittest.cc',
+ 'engine/fake_connection_factory.h',
+ 'engine/fake_connection_factory.cc',
+ 'engine/fake_connection_handler.h',
+ 'engine/fake_connection_handler.cc',
+ 'engine/mcs_client_unittest.cc',
'engine/rmq_store_unittest.cc',
]
},
diff --git a/google_apis/gcm/tools/mcs_probe.cc b/google_apis/gcm/tools/mcs_probe.cc
new file mode 100644
index 0000000..bc4ad7c
--- /dev/null
+++ b/google_apis/gcm/tools/mcs_probe.cc
@@ -0,0 +1,372 @@
+// Copyright 2013 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+//
+// A standalone tool for testing MCS connections and the MCS client on their
+// own.
+
+#include <cstddef>
+#include <cstdio>
+#include <string>
+
+#include "base/at_exit.h"
+#include "base/command_line.h"
+#include "base/compiler_specific.h"
+#include "base/logging.h"
+#include "base/memory/ref_counted.h"
+#include "base/memory/scoped_ptr.h"
+#include "base/message_loop/message_loop.h"
+#include "base/run_loop.h"
+#include "base/strings/string_number_conversions.h"
+#include "base/threading/thread.h"
+#include "base/threading/worker_pool.h"
+#include "base/values.h"
+#include "google_apis/gcm/base/mcs_message.h"
+#include "google_apis/gcm/base/mcs_util.h"
+#include "google_apis/gcm/engine/connection_factory_impl.h"
+#include "google_apis/gcm/engine/mcs_client.h"
+#include "net/base/host_mapping_rules.h"
+#include "net/base/net_log_logger.h"
+#include "net/cert/cert_verifier.h"
+#include "net/dns/host_resolver.h"
+#include "net/http/http_auth_handler_factory.h"
+#include "net/http/http_network_session.h"
+#include "net/http/http_server_properties_impl.h"
+#include "net/http/transport_security_state.h"
+#include "net/socket/client_socket_factory.h"
+#include "net/socket/ssl_client_socket.h"
+#include "net/ssl/default_server_bound_cert_store.h"
+#include "net/ssl/server_bound_cert_service.h"
+#include "net/url_request/url_request_test_util.h"
+
+#if defined(OS_MACOSX)
+#include "base/mac/scoped_nsautorelease_pool.h"
+#endif
+
+// This is a simple utility that initializes an mcs client and
+// prints out any events.
+namespace gcm {
+namespace {
+
+// The default server to communicate with.
+const char kMCSServerHost[] = "mtalk.google.com";
+const uint16 kMCSServerPort = 5228;
+
+// Command line switches.
+const char kRMQFileName[] = "rmq_file";
+const char kAndroidIdSwitch[] = "android_id";
+const char kSecretSwitch[] = "secret";
+const char kLogFileSwitch[] = "log-file";
+const char kIgnoreCertSwitch[] = "ignore-certs";
+const char kServerHostSwitch[] = "host";
+const char kServerPortSwitch[] = "port";
+
+void MessageReceivedCallback(const MCSMessage& message) {
+ LOG(INFO) << "Received message with id "
+ << GetPersistentId(message.GetProtobuf()) << " and tag "
+ << static_cast<int>(message.tag());
+
+ if (message.tag() == kDataMessageStanzaTag) {
+ const mcs_proto::DataMessageStanza& data_message =
+ reinterpret_cast<const mcs_proto::DataMessageStanza&>(
+ message.GetProtobuf());
+ DVLOG(1) << " to: " << data_message.to();
+ DVLOG(1) << " from: " << data_message.from();
+ DVLOG(1) << " category: " << data_message.category();
+ DVLOG(1) << " sent: " << data_message.sent();
+ for (int i = 0; i < data_message.app_data_size(); ++i) {
+ DVLOG(1) << " App data " << i << " "
+ << data_message.app_data(i).key() << " : "
+ << data_message.app_data(i).value();
+ }
+ }
+}
+
+void MessageSentCallback(const std::string& local_id) {
+ LOG(INFO) << "Message sent. Status: " << local_id;
+}
+
+// Needed to use a real host resolver.
+class MyTestURLRequestContext : public net::TestURLRequestContext {
+ public:
+ MyTestURLRequestContext() : TestURLRequestContext(true) {
+ context_storage_.set_host_resolver(
+ net::HostResolver::CreateDefaultResolver(NULL));
+ context_storage_.set_transport_security_state(
+ new net::TransportSecurityState());
+ Init();
+ }
+
+ virtual ~MyTestURLRequestContext() {}
+};
+
+class MyTestURLRequestContextGetter : public net::TestURLRequestContextGetter {
+ public:
+ explicit MyTestURLRequestContextGetter(
+ const scoped_refptr<base::MessageLoopProxy>& io_message_loop_proxy)
+ : TestURLRequestContextGetter(io_message_loop_proxy) {}
+
+ virtual net::TestURLRequestContext* GetURLRequestContext() OVERRIDE {
+ // Construct |context_| lazily so it gets constructed on the right
+ // thread (the IO thread).
+ if (!context_)
+ context_.reset(new MyTestURLRequestContext());
+ return context_.get();
+ }
+
+ private:
+ virtual ~MyTestURLRequestContextGetter() {}
+
+ scoped_ptr<MyTestURLRequestContext> context_;
+};
+
+// A net log that logs all events by default.
+class MyTestNetLog : public net::NetLog {
+ public:
+ MyTestNetLog() {
+ SetBaseLogLevel(LOG_ALL);
+ }
+ virtual ~MyTestNetLog() {}
+};
+
+// A cert verifier that access all certificates.
+class MyTestCertVerifier : public net::CertVerifier {
+ public:
+ MyTestCertVerifier() {}
+ virtual ~MyTestCertVerifier() {}
+
+ virtual int Verify(net::X509Certificate* cert,
+ const std::string& hostname,
+ int flags,
+ net::CRLSet* crl_set,
+ net::CertVerifyResult* verify_result,
+ const net::CompletionCallback& callback,
+ RequestHandle* out_req,
+ const net::BoundNetLog& net_log) OVERRIDE {
+ return net::OK;
+ }
+
+ virtual void CancelRequest(RequestHandle req) OVERRIDE {
+ // Do nothing.
+ }
+};
+
+class MCSProbe {
+ public:
+ MCSProbe(
+ const CommandLine& command_line,
+ scoped_refptr<net::URLRequestContextGetter> url_request_context_getter);
+ ~MCSProbe();
+
+ void Start();
+
+ uint64 android_id() const { return android_id_; }
+ uint64 secret() const { return secret_; }
+
+ private:
+ void InitializeNetworkState();
+ void BuildNetworkSession();
+
+ void InitializationCallback(bool success,
+ uint64 restored_android_id,
+ uint64 restored_security_token);
+
+ CommandLine command_line_;
+
+ base::FilePath rmq_path_;
+ uint64 android_id_;
+ uint64 secret_;
+ std::string server_host_;
+ int server_port_;
+
+ // Network state.
+ scoped_refptr<net::URLRequestContextGetter> url_request_context_getter_;
+ MyTestNetLog net_log_;
+ scoped_ptr<net::NetLogLogger> logger_;
+ scoped_ptr<base::Value> net_constants_;
+ scoped_ptr<net::HostResolver> host_resolver_;
+ scoped_ptr<net::CertVerifier> cert_verifier_;
+ scoped_ptr<net::ServerBoundCertService> system_server_bound_cert_service_;
+ scoped_ptr<net::TransportSecurityState> transport_security_state_;
+ scoped_ptr<net::URLSecurityManager> url_security_manager_;
+ scoped_ptr<net::HttpAuthHandlerFactory> http_auth_handler_factory_;
+ scoped_ptr<net::HttpServerPropertiesImpl> http_server_properties_;
+ scoped_ptr<net::HostMappingRules> host_mapping_rules_;
+ scoped_refptr<net::HttpNetworkSession> network_session_;
+ scoped_ptr<net::ProxyService> proxy_service_;
+
+ scoped_ptr<MCSClient> mcs_client_;
+
+ scoped_ptr<ConnectionFactoryImpl> connection_factory_;
+
+ base::Thread file_thread_;
+
+ scoped_ptr<base::RunLoop> run_loop_;
+};
+
+MCSProbe::MCSProbe(
+ const CommandLine& command_line,
+ scoped_refptr<net::URLRequestContextGetter> url_request_context_getter)
+ : command_line_(command_line),
+ rmq_path_(base::FilePath(FILE_PATH_LITERAL("gcm_rmq_store"))),
+ android_id_(0),
+ secret_(0),
+ server_port_(0),
+ url_request_context_getter_(url_request_context_getter),
+ file_thread_("FileThread") {
+ if (command_line.HasSwitch(kRMQFileName)) {
+ rmq_path_ = command_line.GetSwitchValuePath(kRMQFileName);
+ }
+ if (command_line.HasSwitch(kAndroidIdSwitch)) {
+ base::StringToUint64(command_line.GetSwitchValueASCII(kAndroidIdSwitch),
+ &android_id_);
+ }
+ if (command_line.HasSwitch(kSecretSwitch)) {
+ base::StringToUint64(command_line.GetSwitchValueASCII(kSecretSwitch),
+ &secret_);
+ }
+ server_host_ = kMCSServerHost;
+ if (command_line.HasSwitch(kServerHostSwitch)) {
+ server_host_ = command_line.GetSwitchValueASCII(kServerHostSwitch);
+ }
+ server_port_ = kMCSServerPort;
+ if (command_line.HasSwitch(kServerPortSwitch)) {
+ base::StringToInt(command_line.GetSwitchValueASCII(kServerPortSwitch),
+ &server_port_);
+ }
+}
+
+MCSProbe::~MCSProbe() {
+ file_thread_.Stop();
+}
+
+void MCSProbe::Start() {
+ file_thread_.Start();
+ InitializeNetworkState();
+ BuildNetworkSession();
+ connection_factory_.reset(
+ new ConnectionFactoryImpl(GURL("https://" + net::HostPortPair(
+ server_host_, server_port_).ToString()),
+ network_session_,
+ &net_log_));
+ mcs_client_.reset(new MCSClient(rmq_path_,
+ connection_factory_.get(),
+ file_thread_.message_loop_proxy()));
+ run_loop_.reset(new base::RunLoop());
+ mcs_client_->Initialize(base::Bind(&MCSProbe::InitializationCallback,
+ base::Unretained(this)),
+ base::Bind(&MessageReceivedCallback),
+ base::Bind(&MessageSentCallback));
+ run_loop_->Run();
+}
+
+void MCSProbe::InitializeNetworkState() {
+ FILE* log_file = NULL;
+ if (command_line_.HasSwitch(kLogFileSwitch)) {
+ base::FilePath log_path = command_line_.GetSwitchValuePath(kLogFileSwitch);
+#if defined(OS_WIN)
+ log_file = _wfopen(log_path.value().c_str(), L"w");
+#elif defined(OS_POSIX)
+ log_file = fopen(log_path.value().c_str(), "w");
+#endif
+ }
+ net_constants_.reset(net::NetLogLogger::GetConstants());
+ if (log_file != NULL) {
+ logger_.reset(new net::NetLogLogger(log_file, *net_constants_));
+ logger_->StartObserving(&net_log_);
+ }
+
+ host_resolver_ = net::HostResolver::CreateDefaultResolver(&net_log_);
+
+ if (command_line_.HasSwitch(kIgnoreCertSwitch)) {
+ cert_verifier_.reset(new MyTestCertVerifier());
+ } else {
+ cert_verifier_.reset(net::CertVerifier::CreateDefault());
+ }
+ system_server_bound_cert_service_.reset(
+ new net::ServerBoundCertService(
+ new net::DefaultServerBoundCertStore(NULL),
+ base::WorkerPool::GetTaskRunner(true)));
+
+ transport_security_state_.reset(new net::TransportSecurityState());
+ url_security_manager_.reset(net::URLSecurityManager::Create(NULL, NULL));
+ http_auth_handler_factory_.reset(
+ net::HttpAuthHandlerRegistryFactory::Create(
+ std::vector<std::string>(1, "basic"),
+ url_security_manager_.get(),
+ host_resolver_.get(),
+ std::string(),
+ false,
+ false));
+ http_server_properties_.reset(new net::HttpServerPropertiesImpl());
+ host_mapping_rules_.reset(new net::HostMappingRules());
+ proxy_service_.reset(net::ProxyService::CreateDirectWithNetLog(&net_log_));
+}
+
+void MCSProbe::BuildNetworkSession() {
+ net::HttpNetworkSession::Params session_params;
+ session_params.host_resolver = host_resolver_.get();
+ session_params.cert_verifier = cert_verifier_.get();
+ session_params.server_bound_cert_service =
+ system_server_bound_cert_service_.get();
+ session_params.transport_security_state = transport_security_state_.get();
+ session_params.ssl_config_service = new net::SSLConfigServiceDefaults();
+ session_params.http_auth_handler_factory = http_auth_handler_factory_.get();
+ session_params.http_server_properties =
+ http_server_properties_->GetWeakPtr();
+ session_params.network_delegate = NULL; // TODO(zea): implement?
+ session_params.host_mapping_rules = host_mapping_rules_.get();
+ session_params.ignore_certificate_errors = true;
+ session_params.http_pipelining_enabled = false;
+ session_params.testing_fixed_http_port = 0;
+ session_params.testing_fixed_https_port = 0;
+ session_params.net_log = &net_log_;
+ session_params.proxy_service = proxy_service_.get();
+
+ network_session_ = new net::HttpNetworkSession(session_params);
+}
+
+void MCSProbe::InitializationCallback(bool success,
+ uint64 restored_android_id,
+ uint64 restored_security_token) {
+ LOG(INFO) << "Initialization " << (success ? "success!" : "failure!");
+ if (restored_android_id && restored_security_token) {
+ android_id_ = restored_android_id;
+ secret_ = restored_security_token;
+ }
+ if (success)
+ mcs_client_->Login(android_id_, secret_);
+}
+
+int MCSProbeMain(int argc, char* argv[]) {
+ base::AtExitManager exit_manager;
+
+ CommandLine::Init(argc, argv);
+ logging::LoggingSettings settings;
+ settings.logging_dest = logging::LOG_TO_SYSTEM_DEBUG_LOG;
+ logging::InitLogging(settings);
+
+ base::MessageLoopForIO message_loop;
+
+ // For check-in and creating registration ids.
+ const scoped_refptr<MyTestURLRequestContextGetter> context_getter =
+ new MyTestURLRequestContextGetter(
+ base::MessageLoop::current()->message_loop_proxy());
+
+ const CommandLine& command_line = *CommandLine::ForCurrentProcess();
+
+ MCSProbe mcs_probe(command_line, context_getter);
+ mcs_probe.Start();
+
+ base::RunLoop run_loop;
+ run_loop.Run();
+
+ return 0;
+}
+
+} // namespace
+} // namespace gcm
+
+int main(int argc, char* argv[]) {
+ return gcm::MCSProbeMain(argc, argv);
+}