summaryrefslogtreecommitdiffstats
path: root/components/copresence
diff options
context:
space:
mode:
authorckehoe <ckehoe@chromium.org>2014-11-06 17:46:28 -0800
committerCommit bot <commit-bot@chromium.org>2014-11-07 01:46:57 +0000
commit38eadce9b59abe213e1ff1b830ad45d382114a23 (patch)
treeb7181ffb2220775dfbe013bda9e93880ea4838f0 /components/copresence
parentacd5661350286666f1ca0d513bef340a743ca651 (diff)
downloadchromium_src-38eadce9b59abe213e1ff1b830ad45d382114a23.zip
chromium_src-38eadce9b59abe213e1ff1b830ad45d382114a23.tar.gz
chromium_src-38eadce9b59abe213e1ff1b830ad45d382114a23.tar.bz2
Moving request queuing to the RpcHandler (now with tests!). Also passing in the auth token with ExecuteReportRequest rather than through the delegate.
Review URL: https://codereview.chromium.org/705663005 Cr-Commit-Position: refs/heads/master@{#303159}
Diffstat (limited to 'components/copresence')
-rw-r--r--components/copresence/copresence_manager_impl.cc131
-rw-r--r--components/copresence/copresence_manager_impl.h35
-rw-r--r--components/copresence/public/copresence_constants.h6
-rw-r--r--components/copresence/public/copresence_delegate.h1
-rw-r--r--components/copresence/public/copresence_manager.h8
-rw-r--r--components/copresence/rpc/rpc_handler.cc222
-rw-r--r--components/copresence/rpc/rpc_handler.h50
-rw-r--r--components/copresence/rpc/rpc_handler_unittest.cc171
-rw-r--r--components/copresence/test/stub_whispernet_client.cc9
-rw-r--r--components/copresence/test/stub_whispernet_client.h19
10 files changed, 370 insertions, 282 deletions
diff --git a/components/copresence/copresence_manager_impl.cc b/components/copresence/copresence_manager_impl.cc
index 5ad6773..265b042 100644
--- a/components/copresence/copresence_manager_impl.cc
+++ b/components/copresence/copresence_manager_impl.cc
@@ -4,6 +4,8 @@
#include "components/copresence/copresence_manager_impl.h"
+#include <vector>
+
#include "base/bind.h"
#include "base/strings/stringprintf.h"
#include "base/timer/timer.h"
@@ -14,8 +16,6 @@
namespace {
-// Number of characters of suffix to log for auth tokens
-const int kTokenSuffix = 5;
const int kPollTimerIntervalMs = 3000; // milliseconds.
const int kAudioCheckIntervalMs = 1000; // milliseconds.
@@ -23,25 +23,25 @@ const int kAudioCheckIntervalMs = 1000; // milliseconds.
namespace copresence {
-PendingRequest::PendingRequest(const ReportRequest& report,
- const std::string& app_id,
- const std::string& auth_token,
- const StatusCallback& callback)
- : report(new ReportRequest(report)),
- app_id(app_id),
- auth_token(auth_token),
- callback(callback) {}
-
-PendingRequest::~PendingRequest() {}
-
-// static
-scoped_ptr<CopresenceManager> CopresenceManager::Create(
- CopresenceDelegate* delegate) {
- return make_scoped_ptr(new CopresenceManagerImpl(delegate));
-}
+// Public functions.
+CopresenceManagerImpl::CopresenceManagerImpl(CopresenceDelegate* delegate)
+ : delegate_(delegate),
+ whispernet_init_callback_(base::Bind(
+ &CopresenceManagerImpl::WhispernetInitComplete,
+ // This callback gets cancelled when we are destroyed.
+ base::Unretained(this))),
+ directive_handler_(new DirectiveHandler),
+ poll_timer_(new base::RepeatingTimer<CopresenceManagerImpl>),
+ audio_check_timer_(new base::RepeatingTimer<CopresenceManagerImpl>),
+ init_failed_(false) {
+ DCHECK(delegate_);
+ DCHECK(delegate_->GetWhispernetClient());
+ delegate_->GetWhispernetClient()->Initialize(
+ whispernet_init_callback_.callback());
-// Public functions.
+ rpc_handler_.reset(new RpcHandler(delegate_, directive_handler_.get()));
+}
CopresenceManagerImpl::~CopresenceManagerImpl() {
whispernet_init_callback_.Cancel();
@@ -51,6 +51,7 @@ CopresenceManagerImpl::~CopresenceManagerImpl() {
void CopresenceManagerImpl::ExecuteReportRequest(
const ReportRequest& request,
const std::string& app_id,
+ const std::string& auth_token,
const StatusCallback& callback) {
// If initialization has failed, reject all requests.
if (init_failed_) {
@@ -58,67 +59,19 @@ void CopresenceManagerImpl::ExecuteReportRequest(
return;
}
- // Check if we are initialized enough to execute this request.
- // If we haven't seen this auth token yet, we need to register for it.
- // TODO(ckehoe): Queue per device ID instead of globally.
- const std::string& auth_token = delegate_->GetAuthToken();
- if (!rpc_handler_->IsRegisteredForToken(auth_token)) {
- std::string token_str = auth_token.empty() ? "(anonymous)" :
- base::StringPrintf("(token ...%s)",
- auth_token.substr(auth_token.length() - kTokenSuffix,
- kTokenSuffix).c_str());
- rpc_handler_->RegisterForToken(
- auth_token,
- // The manager owns the RpcHandler, so this callback cannot outlive us.
- base::Bind(&CopresenceManagerImpl::InitStepComplete,
- base::Unretained(this),
- "Device registration " + token_str));
- pending_init_operations_++;
- }
-
- // Execute the request if possible, or queue it
- // if initialization is still in progress.
- if (pending_init_operations_) {
- pending_requests_queue_.push_back(
- new PendingRequest(request, app_id, auth_token, callback));
- } else {
- rpc_handler_->SendReportRequest(
- make_scoped_ptr(new ReportRequest(request)),
- app_id,
- auth_token,
- callback);
- }
+ // We'll need to modify the ReportRequest, so we make our own copy to send.
+ scoped_ptr<ReportRequest> request_copy(new ReportRequest(request));
+ rpc_handler_->SendReportRequest(
+ request_copy.Pass(), app_id, auth_token, callback);
}
// Private functions.
-CopresenceManagerImpl::CopresenceManagerImpl(CopresenceDelegate* delegate)
- : delegate_(delegate),
- pending_init_operations_(0),
- // This callback gets cancelled when we are destroyed.
- whispernet_init_callback_(
- base::Bind(&CopresenceManagerImpl::InitStepComplete,
- base::Unretained(this),
- "Whispernet proxy initialization")),
- init_failed_(false),
- directive_handler_(new DirectiveHandler),
- poll_timer_(new base::RepeatingTimer<CopresenceManagerImpl>),
- audio_check_timer_(new base::RepeatingTimer<CopresenceManagerImpl>) {
- DCHECK(delegate);
- DCHECK(delegate->GetWhispernetClient());
-
- rpc_handler_.reset(new RpcHandler(delegate, directive_handler_.get()));
- delegate->GetWhispernetClient()->Initialize(
- whispernet_init_callback_.callback());
- pending_init_operations_++;
-}
-
-void CopresenceManagerImpl::CompleteInitialization() {
- if (pending_init_operations_)
- return;
+void CopresenceManagerImpl::WhispernetInitComplete(bool success) {
+ if (success) {
+ DVLOG(3) << "Whispernet initialized successfully.";
- if (!init_failed_) {
// We destroy |directive_handler_| before |rpc_handler_|, hence passing
// in |rpc_handler_|'s pointer is safe here.
directive_handler_->Start(delegate_->GetWhispernetClient(),
@@ -133,36 +86,10 @@ void CopresenceManagerImpl::CompleteInitialization() {
audio_check_timer_->Start(
FROM_HERE, base::TimeDelta::FromMilliseconds(kAudioCheckIntervalMs),
base::Bind(&CopresenceManagerImpl::AudioCheck, base::Unretained(this)));
- }
-
- // Not const because SendReportRequest takes ownership of the ReportRequests.
- // This is ok though, as the entire queue is deleted afterwards.
- for (PendingRequest* request : pending_requests_queue_) {
- if (init_failed_) {
- request->callback.Run(FAIL);
- } else {
- rpc_handler_->SendReportRequest(
- request->report.Pass(),
- request->app_id,
- request->auth_token,
- request->callback);
- }
- }
- pending_requests_queue_.clear();
-}
-
-void CopresenceManagerImpl::InitStepComplete(
- const std::string& step, bool success) {
- if (!success) {
- LOG(ERROR) << step << " failed!";
+ } else {
+ LOG(ERROR) << "Whispernet initialization failed!";
init_failed_ = true;
- // TODO(ckehoe): Retry for registration failures. But maybe not here.
}
-
- DVLOG(3) << step << " complete.";
- DCHECK(pending_init_operations_ > 0);
- pending_init_operations_--;
- CompleteInitialization();
}
void CopresenceManagerImpl::PollForMessages() {
diff --git a/components/copresence/copresence_manager_impl.h b/components/copresence/copresence_manager_impl.h
index 4007ab3..12bc051 100644
--- a/components/copresence/copresence_manager_impl.h
+++ b/components/copresence/copresence_manager_impl.h
@@ -10,7 +10,6 @@
#include "base/cancelable_callback.h"
#include "base/macros.h"
#include "base/memory/scoped_ptr.h"
-#include "base/memory/scoped_vector.h"
#include "components/copresence/public/copresence_manager.h"
namespace base {
@@ -26,19 +25,7 @@ namespace copresence {
class DirectiveHandler;
class ReportRequest;
class RpcHandler;
-
-struct PendingRequest {
- PendingRequest(const ReportRequest& report,
- const std::string& app_id,
- const std::string& auth_token,
- const StatusCallback& callback);
- ~PendingRequest();
-
- scoped_ptr<ReportRequest> report;
- std::string app_id;
- std::string auth_token;
- StatusCallback callback;
-};
+class WhispernetClient;
// The implementation for CopresenceManager. Responsible primarily for
// client-side initialization. The RpcHandler handles all the details
@@ -46,18 +33,18 @@ struct PendingRequest {
// TODO(rkc): Add tests for this class.
class CopresenceManagerImpl : public CopresenceManager {
public:
+ // The delegate is owned by the caller, and must outlive the manager.
+ explicit CopresenceManagerImpl(CopresenceDelegate* delegate);
+
~CopresenceManagerImpl() override;
+
void ExecuteReportRequest(const ReportRequest& request,
const std::string& app_id,
+ const std::string& auth_token,
const StatusCallback& callback) override;
private:
- // Create managers with the CopresenceManager::Create() method.
- friend class CopresenceManager;
- CopresenceManagerImpl(CopresenceDelegate* delegate);
-
- void CompleteInitialization();
- void InitStepComplete(const std::string& step, bool success);
+ void WhispernetInitComplete(bool success);
// This function will be called every kPollTimerIntervalMs milliseconds to
// poll the server for new messages.
@@ -70,11 +57,9 @@ class CopresenceManagerImpl : public CopresenceManager {
// Belongs to the caller.
CopresenceDelegate* const delegate_;
- int pending_init_operations_;
+ // We use a CancelableCallback here because Whispernet
+ // does not provide a way to unregister its init callback.
base::CancelableCallback<void(bool)> whispernet_init_callback_;
- bool init_failed_;
-
- ScopedVector<PendingRequest> pending_requests_queue_;
// The |directive handler_| needs to destruct before |rpc_handler_|, do not
// change this order.
@@ -84,6 +69,8 @@ class CopresenceManagerImpl : public CopresenceManager {
scoped_ptr<base::Timer> poll_timer_;
scoped_ptr<base::Timer> audio_check_timer_;
+ bool init_failed_;
+
DISALLOW_COPY_AND_ASSIGN(CopresenceManagerImpl);
};
diff --git a/components/copresence/public/copresence_constants.h b/components/copresence/public/copresence_constants.h
index 8172e56..68f8a90 100644
--- a/components/copresence/public/copresence_constants.h
+++ b/components/copresence/public/copresence_constants.h
@@ -2,8 +2,8 @@
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
-#ifndef COMPONENTS_COPRESENCE_PUBLIC_COPRESENCE_CONSTANTS_
-#define COMPONENTS_COPRESENCE_PUBLIC_COPRESENCE_CONSTANTS_
+#ifndef COMPONENTS_COPRESENCE_PUBLIC_COPRESENCE_CONSTANTS_H_
+#define COMPONENTS_COPRESENCE_PUBLIC_COPRESENCE_CONSTANTS_H_
#include <string>
#include <vector>
@@ -74,4 +74,4 @@ using SamplesCallback =
const scoped_refptr<media::AudioBusRefCounted>&)>;
} // namespace copresence
-#endif // COMPONENTS_COPRESENCE_PUBLIC_COPRESENCE_CONSTANTS_
+#endif // COMPONENTS_COPRESENCE_PUBLIC_COPRESENCE_CONSTANTS_H_
diff --git a/components/copresence/public/copresence_delegate.h b/components/copresence/public/copresence_delegate.h
index 3a7aca6..b8a01bf 100644
--- a/components/copresence/public/copresence_delegate.h
+++ b/components/copresence/public/copresence_delegate.h
@@ -46,7 +46,6 @@ class CopresenceDelegate {
virtual const std::string GetPlatformVersionString() const = 0;
virtual const std::string GetAPIKey(const std::string& app_id) const = 0;
- virtual const std::string GetAuthToken() const = 0;
// Thw WhispernetClient must outlive the CopresenceManager.
virtual WhispernetClient* GetWhispernetClient() = 0;
diff --git a/components/copresence/public/copresence_manager.h b/components/copresence/public/copresence_manager.h
index 1833f23..960c601 100644
--- a/components/copresence/public/copresence_manager.h
+++ b/components/copresence/public/copresence_manager.h
@@ -5,6 +5,8 @@
#ifndef COMPONENTS_COPRESENCE_PUBLIC_COPRESENCE_MANAGER_H_
#define COMPONENTS_COPRESENCE_PUBLIC_COPRESENCE_MANAGER_H_
+#include <string>
+
#include "base/memory/scoped_ptr.h"
#include "components/copresence/public/copresence_delegate.h"
@@ -28,14 +30,10 @@ class CopresenceManager {
// to relay it to the requester.
virtual void ExecuteReportRequest(const ReportRequest& request,
const std::string& app_id,
+ const std::string& auth_token,
const StatusCallback& callback) = 0;
- // Factory method for CopresenceManagers. The delegate is owned
- // by the caller, and must outlive the manager.
- static scoped_ptr<CopresenceManager> Create(CopresenceDelegate* delegate);
-
private:
-
DISALLOW_COPY_AND_ASSIGN(CopresenceManager);
};
diff --git a/components/copresence/rpc/rpc_handler.cc b/components/copresence/rpc/rpc_handler.cc
index 50aebba..7c751ab 100644
--- a/components/copresence/rpc/rpc_handler.cc
+++ b/components/copresence/rpc/rpc_handler.cc
@@ -4,13 +4,12 @@
#include "components/copresence/rpc/rpc_handler.h"
-#include <map>
-
#include "base/bind.h"
#include "base/command_line.h"
#include "base/guid.h"
#include "base/logging.h"
#include "base/strings/string_util.h"
+#include "base/strings/stringprintf.h"
// TODO(ckehoe): time.h includes windows.h, which #defines DeviceCapabilities
// to DeviceCapabilitiesW. This breaks the pb.h headers below. For now,
@@ -39,6 +38,9 @@ using google::protobuf::RepeatedPtrField;
const char RpcHandler::kReportRequestRpcName[] = "report";
+// Number of characters of suffix to log for auth tokens
+const int kTokenSuffix = 5;
+
namespace {
// UrlSafe is defined as:
@@ -59,7 +61,7 @@ const char kDefaultCopresenceServer[] =
// Logging
// Checks for a copresence error. If there is one, logs it and returns true.
-bool CopresenceErrorLogged(const Status& status) {
+bool IsErrorStatus(const Status& status) {
if (status.code() != OK) {
LOG(ERROR) << "Copresence error code " << status.code()
<< (status.message().empty() ? "" : ": " + status.message());
@@ -76,7 +78,7 @@ void LogIfErrorStatus(const util::error::Code& code,
// If any errors occurred, logs them and returns true.
bool ReportErrorLogged(const ReportResponse& response) {
- bool result = CopresenceErrorLogged(response.header().status());
+ bool result = IsErrorStatus(response.header().status());
// The Report fails or succeeds as a unit. If any responses had errors,
// the header will too. Thus we don't need to propagate individual errors.
@@ -147,19 +149,28 @@ void AddTokenToRequest(const AudioToken& token, ReportRequest* request) {
signals->set_observed_time_millis(base::Time::Now().ToJsTime());
}
+const std::string LoggingStrForToken(const std::string& auth_token) {
+ std::string token_str = auth_token.empty() ? "anonymous" :
+ base::StringPrintf("token ...%s",
+ auth_token.substr(auth_token.length() - kTokenSuffix,
+ kTokenSuffix).c_str());
+ return token_str;
+}
+
} // namespace
-// Public methods
+
+// Public functions.
RpcHandler::RpcHandler(CopresenceDelegate* delegate,
DirectiveHandler* directive_handler,
const PostCallback& server_post_callback)
: delegate_(delegate),
directive_handler_(directive_handler),
+ server_post_callback_(server_post_callback),
invalid_audio_token_cache_(
base::TimeDelta::FromMilliseconds(kInvalidTokenExpiryTimeMs),
- kMaxInvalidTokens),
- server_post_callback_(server_post_callback) {
+ kMaxInvalidTokens) {
DCHECK(delegate_);
DCHECK(directive_handler_);
@@ -170,70 +181,35 @@ RpcHandler::RpcHandler(CopresenceDelegate* delegate,
}
RpcHandler::~RpcHandler() {
- // Do not use |directive_handler_| here, it will already have been
- // destructed.
+ // Do not use |directive_handler_| here.
+ // It will already have been destructed.
for (HttpPost* post : pending_posts_)
delete post;
}
-void RpcHandler::RegisterForToken(const std::string& auth_token,
- const SuccessCallback& init_done_callback) {
- if (IsRegisteredForToken(auth_token)) {
- LOG(WARNING) << "Attempted re-registration for the same auth token.";
- init_done_callback.Run(true);
- return;
- }
- scoped_ptr<RegisterDeviceRequest> request(new RegisterDeviceRequest);
-
- request->mutable_push_service()->set_service(PUSH_SERVICE_NONE);
-
- DVLOG(2) << "Sending " << (auth_token.empty() ? "anonymous" : "authenticated")
- << " registration to server.";
-
- // Only identify as a Chrome device if we're in anonymous mode.
- // Authenticated calls come from a "GAIA device".
- if (auth_token.empty()) {
- Identity* identity =
- request->mutable_device_identifiers()->mutable_registrant();
- identity->set_type(CHROME);
- identity->set_chrome_id(base::GenerateGUID());
- }
-
- SendServerRequest(
- kRegisterDeviceRpcName,
- std::string(), // device ID
- std::string(), // app ID
- auth_token,
- request.Pass(),
- base::Bind(&RpcHandler::RegisterResponseHandler,
- // On destruction, this request will be cancelled.
- base::Unretained(this),
- init_done_callback,
- auth_token));
-}
-
-bool RpcHandler::IsRegisteredForToken(const std::string& auth_token) const {
- return device_id_by_auth_token_.find(auth_token) !=
- device_id_by_auth_token_.end();
-}
-
-void RpcHandler::SendReportRequest(scoped_ptr<ReportRequest> request,
- const std::string& auth_token) {
- SendReportRequest(request.Pass(),
- std::string(),
- auth_token,
- StatusCallback());
-}
-
void RpcHandler::SendReportRequest(scoped_ptr<ReportRequest> request,
const std::string& app_id,
const std::string& auth_token,
const StatusCallback& status_callback) {
DCHECK(request.get());
- auto registration_entry = device_id_by_auth_token_.find(auth_token);
- DCHECK(registration_entry != device_id_by_auth_token_.end())
- << "RegisterForToken() must complete successfully "
- << "for new tokens before calling SendReportRequest().";
+
+ // Check that we have a "device" registered for this auth token.
+ bool queue_request = true;
+ const auto& registration = device_id_by_auth_token_.find(auth_token);
+ if (registration == device_id_by_auth_token_.end()) {
+ // Not registered.
+ RegisterForToken(auth_token);
+ } else if (!registration->second.empty()) {
+ // Registration complete.
+ queue_request = false;
+ }
+
+ // We're not registered, or registration is in progress.
+ if (queue_request) {
+ pending_requests_queue_.push_back(new PendingRequest(
+ request.Pass(), app_id, auth_token, status_callback));
+ return;
+ }
DVLOG(3) << "Sending ReportRequest to server.";
@@ -247,7 +223,7 @@ void RpcHandler::SendReportRequest(scoped_ptr<ReportRequest> request,
AddPlayingTokens(request.get());
SendServerRequest(kReportRequestRpcName,
- registration_entry->second,
+ registration->second,
app_id,
auth_token,
request.Pass(),
@@ -284,41 +260,121 @@ void RpcHandler::ReportTokens(const std::vector<AudioToken>& tokens) {
// Private methods
+RpcHandler::PendingRequest::PendingRequest(scoped_ptr<ReportRequest> report,
+ const std::string& app_id,
+ const std::string& auth_token,
+ const StatusCallback& callback)
+ : report(report.Pass()),
+ app_id(app_id),
+ auth_token(auth_token),
+ callback(callback) {}
+
+RpcHandler::PendingRequest::~PendingRequest() {}
+
+void RpcHandler::RegisterForToken(const std::string& auth_token) {
+ DVLOG(2) << "Sending " << LoggingStrForToken(auth_token)
+ << " registration to server.";
+
+ // Mark registration as in progress.
+ device_id_by_auth_token_[auth_token] = "";
+
+ scoped_ptr<RegisterDeviceRequest> request(new RegisterDeviceRequest);
+ request->mutable_push_service()->set_service(PUSH_SERVICE_NONE);
+
+ // Only identify as a Chrome device if we're in anonymous mode.
+ // Authenticated calls come from a "GAIA device".
+ if (auth_token.empty()) {
+ Identity* identity =
+ request->mutable_device_identifiers()->mutable_registrant();
+ identity->set_type(CHROME);
+ identity->set_chrome_id(base::GenerateGUID());
+ }
+
+ SendServerRequest(
+ kRegisterDeviceRpcName,
+ std::string(), // device ID
+ std::string(), // app ID
+ auth_token,
+ request.Pass(),
+ base::Bind(&RpcHandler::RegisterResponseHandler,
+ // On destruction, this request will be cancelled.
+ base::Unretained(this),
+ auth_token));
+}
+
+void RpcHandler::ProcessQueuedRequests(const std::string& auth_token) {
+ // Track requests that are not on this auth token.
+ ScopedVector<PendingRequest> still_pending_requests;
+
+ // If there is no device ID for this auth token, registration failed.
+ bool registration_failed =
+ (device_id_by_auth_token_.count(auth_token) == 0);
+
+ // We momentarily take ownership of all the pointers in the queue.
+ // They are either deleted here or passed on to a new queue.
+ for (PendingRequest* request : pending_requests_queue_) {
+ if (request->auth_token == auth_token) {
+ if (registration_failed) {
+ request->callback.Run(FAIL);
+ } else {
+ SendReportRequest(request->report.Pass(),
+ request->app_id,
+ request->auth_token,
+ request->callback);
+ }
+ delete request;
+ } else {
+ // The request is on a different auth token.
+ still_pending_requests.push_back(request);
+ }
+ }
+
+ // Only keep the requests that weren't processed.
+ // All the pointers in the queue are now spoken for.
+ pending_requests_queue_.weak_clear();
+ pending_requests_queue_ = still_pending_requests.Pass();
+}
+
+void RpcHandler::SendReportRequest(scoped_ptr<ReportRequest> request,
+ const std::string& auth_token) {
+ SendReportRequest(request.Pass(),
+ std::string(),
+ auth_token,
+ StatusCallback());
+}
+
void RpcHandler::RegisterResponseHandler(
- const SuccessCallback& init_done_callback,
const std::string& auth_token,
HttpPost* completed_post,
int http_status_code,
const std::string& response_data) {
if (completed_post) {
int elements_erased = pending_posts_.erase(completed_post);
- DCHECK(elements_erased);
+ DCHECK_GT(elements_erased, 0);
delete completed_post;
}
- if (http_status_code != net::HTTP_OK) {
- init_done_callback.Run(false);
- return;
- }
+ // Registration is no longer in progress.
+ // If it was successful, we'll update below.
+ device_id_by_auth_token_.erase(auth_token);
RegisterDeviceResponse response;
- if (!response.ParseFromString(response_data)) {
+ if (http_status_code != net::HTTP_OK) {
+ // TODO(ckehoe): Retry registration if appropriate.
+ LOG(ERROR) << LoggingStrForToken(auth_token)
+ << " device registration failed";
+ } else if (!response.ParseFromString(response_data)) {
LOG(ERROR) << "Invalid RegisterDeviceResponse:\n" << response_data;
- init_done_callback.Run(false);
- return;
- }
-
- if (CopresenceErrorLogged(response.header().status())) {
- init_done_callback.Run(false);
- return;
+ } else if (!IsErrorStatus(response.header().status())) {
+ const std::string& device_id = response.registered_device_id();
+ DCHECK(!device_id.empty());
+ device_id_by_auth_token_[auth_token] = device_id;
+ DVLOG(2) << LoggingStrForToken(auth_token)
+ << " device registration successful. Id: " << device_id;
}
- const std::string& device_id = response.registered_device_id();
- DCHECK(!device_id.empty());
- device_id_by_auth_token_[auth_token] = device_id;
- DVLOG(2) << (auth_token.empty() ? "Anonymous" : "Authenticated")
- << " device registration successful: id " << device_id;
- init_done_callback.Run(true);
+ // Send or fail requests on this auth token.
+ ProcessQueuedRequests(auth_token);
}
void RpcHandler::ReportResponseHandler(const StatusCallback& status_callback,
diff --git a/components/copresence/rpc/rpc_handler.h b/components/copresence/rpc/rpc_handler.h
index fca80eb..e2e999f 100644
--- a/components/copresence/rpc/rpc_handler.h
+++ b/components/copresence/rpc/rpc_handler.h
@@ -5,12 +5,14 @@
#ifndef COMPONENTS_COPRESENCE_RPC_RPC_HANDLER_H_
#define COMPONENTS_COPRESENCE_RPC_RPC_HANDLER_H_
+#include <map>
#include <set>
#include <string>
#include <vector>
#include "base/callback_forward.h"
#include "base/memory/scoped_ptr.h"
+#include "base/memory/scoped_vector.h"
#include "components/copresence/proto/enums.pb.h"
#include "components/copresence/public/copresence_delegate.h"
#include "components/copresence/timed_map.h"
@@ -60,27 +62,13 @@ class RpcHandler {
// Constructor. |delegate| and |directive_handler|
// are owned by the caller and must outlive the RpcHandler.
+ // |server_post_callback| should be set only by tests.
RpcHandler(CopresenceDelegate* delegate,
DirectiveHandler* directive_handler,
const PostCallback& server_post_callback = PostCallback());
virtual ~RpcHandler();
- // Before accepting any other calls, the server requires registration,
- // which is tied to the auth token (or lack thereof) used to call Report.
- // Clients must call RegisterForToken() for each new token,
- // *including the empty token*, they need to pass to SendReportRequest(),
- // and then wait for |init_done_callback| to be invoked.
- void RegisterForToken(const std::string& auth_token,
- const SuccessCallback& init_done_callback);
-
- // Check if a given auth token is already active (registered).
- bool IsRegisteredForToken(const std::string& auth_token) const;
-
- // Send a ReportRequest from Chrome itself, i.e. no app id.
- void SendReportRequest(scoped_ptr<ReportRequest> request,
- const std::string& auth_token);
-
// Send a ReportRequest from a specific app, and get notified of completion.
void SendReportRequest(scoped_ptr<ReportRequest> request,
const std::string& app_id,
@@ -92,11 +80,35 @@ class RpcHandler {
void ReportTokens(const std::vector<AudioToken>& tokens);
private:
+ // A queued ReportRequest along with its metadata.
+ struct PendingRequest {
+ PendingRequest(scoped_ptr<ReportRequest> report,
+ const std::string& app_id,
+ const std::string& auth_token,
+ const StatusCallback& callback);
+ ~PendingRequest();
+
+ scoped_ptr<ReportRequest> report;
+ const std::string app_id;
+ const std::string auth_token;
+ const StatusCallback callback;
+ };
+
friend class RpcHandlerTest;
+ // Before accepting any other calls, the server requires registration,
+ // which is tied to the auth token (or lack thereof) used to call Report.
+ void RegisterForToken(const std::string& auth_token);
+
+ // Device registration has completed. Send the requests that it was blocking.
+ void ProcessQueuedRequests(const std::string& auth_token);
+
+ // Send a ReportRequest from Chrome itself, i.e. no app id.
+ void SendReportRequest(scoped_ptr<ReportRequest> request,
+ const std::string& auth_token);
+
// Server call response handlers.
- void RegisterResponseHandler(const SuccessCallback& init_done_callback,
- const std::string& auth_token,
+ void RegisterResponseHandler(const std::string& auth_token,
HttpPost* completed_post,
int http_status_code,
const std::string& response_data);
@@ -143,10 +155,10 @@ class RpcHandler {
CopresenceDelegate* delegate_;
DirectiveHandler* directive_handler_;
- TimedMap<std::string, bool> invalid_audio_token_cache_;
-
PostCallback server_post_callback_;
+ ScopedVector<PendingRequest> pending_requests_queue_;
+ TimedMap<std::string, bool> invalid_audio_token_cache_;
std::map<std::string, std::string> device_id_by_auth_token_;
std::set<HttpPost*> pending_posts_;
diff --git a/components/copresence/rpc/rpc_handler_unittest.cc b/components/copresence/rpc/rpc_handler_unittest.cc
index d9b3813..7efe2f6 100644
--- a/components/copresence/rpc/rpc_handler_unittest.cc
+++ b/components/copresence/rpc/rpc_handler_unittest.cc
@@ -52,6 +52,10 @@ class FakeDirectiveHandler final : public DirectiveHandler {
return added_directives_;
}
+ const std::vector<std::string>& removed_directives() const {
+ return removed_directives_;
+ }
+
void Start(WhispernetClient* /* whispernet_client */,
const TokensCallback& /* tokens_cb */) override {
NOTREACHED();
@@ -62,7 +66,7 @@ class FakeDirectiveHandler final : public DirectiveHandler {
}
void RemoveDirectives(const std::string& op_id) override {
- NOTREACHED();
+ removed_directives_.push_back(op_id);
}
const std::string GetCurrentAudioToken(AudioType type) const override {
@@ -73,6 +77,7 @@ class FakeDirectiveHandler final : public DirectiveHandler {
private:
std::vector<std::string> added_directives_;
+ std::vector<std::string> removed_directives_;
DISALLOW_COPY_AND_ASSIGN(FakeDirectiveHandler);
};
@@ -114,28 +119,58 @@ class RpcHandlerTest : public testing::Test, public CopresenceDelegate {
return app_id + " API Key";
}
- const std::string GetAuthToken() const override {
- return auth_token_;
- }
-
WhispernetClient* GetWhispernetClient() override {
return whispernet_client_.get();
}
protected:
- void InvokeReportResponseHandler(int status_code,
- const std::string& response) {
+
+ // Send test input to RpcHandler
+
+ void RegisterForToken(const std::string& auth_token) {
+ rpc_handler_.RegisterForToken(auth_token);
+ }
+
+ void SendRegisterResponse(const std::string& auth_token,
+ const std::string& device_id) {
+ RegisterDeviceResponse response;
+ response.set_registered_device_id(device_id);
+ response.mutable_header()->mutable_status()->set_code(OK);
+
+ std::string serialized_response;
+ response.SerializeToString(&serialized_response);
+ rpc_handler_.RegisterResponseHandler(
+ auth_token, nullptr, net::HTTP_OK, serialized_response);
+ }
+
+ void SendReport(scoped_ptr<ReportRequest> request,
+ const std::string& app_id,
+ const std::string& auth_token) {
+ rpc_handler_.SendReportRequest(
+ request.Pass(), app_id, auth_token, StatusCallback());
+ }
+
+ void SendReportResponse(int status_code,
+ scoped_ptr<ReportResponse> response) {
+ response->mutable_header()->mutable_status()->set_code(OK);
+
+ std::string serialized_response;
+ response->SerializeToString(&serialized_response);
rpc_handler_.ReportResponseHandler(
base::Bind(&RpcHandlerTest::CaptureStatus, base::Unretained(this)),
nullptr,
status_code,
- response);
+ serialized_response);
}
- void SetDeviceIdAndAuthToken(const std::string& device_id,
- const std::string& auth_token) {
- rpc_handler_.device_id_by_auth_token_[auth_token] = device_id;
- auth_token_ = auth_token;
+ // Read and modify RpcHandler state
+
+ const ScopedVector<RpcHandler::PendingRequest>& request_queue() const {
+ return rpc_handler_.pending_requests_queue_;
+ }
+
+ std::map<std::string, std::string>& device_id_by_auth_token() {
+ return rpc_handler_.device_id_by_auth_token_;
}
void AddInvalidToken(const std::string& token) {
@@ -179,9 +214,8 @@ class RpcHandlerTest : public testing::Test, public CopresenceDelegate {
}
};
-TEST_F(RpcHandlerTest, RegisterDevice) {
- EXPECT_FALSE(rpc_handler_.IsRegisteredForToken(""));
- rpc_handler_.RegisterForToken("", RpcHandler::SuccessCallback());
+TEST_F(RpcHandlerTest, RegisterForToken) {
+ RegisterForToken("");
EXPECT_THAT(request_protos_, SizeIs(1));
const RegisterDeviceRequest* registration =
static_cast<RegisterDeviceRequest*>(request_protos_[0]);
@@ -189,22 +223,84 @@ TEST_F(RpcHandlerTest, RegisterDevice) {
EXPECT_EQ(CHROME, identity.type());
EXPECT_FALSE(identity.chrome_id().empty());
- EXPECT_FALSE(rpc_handler_.IsRegisteredForToken("abc"));
- rpc_handler_.RegisterForToken("abc", RpcHandler::SuccessCallback());
+ RegisterForToken("abc");
EXPECT_THAT(request_protos_, SizeIs(2));
registration = static_cast<RegisterDeviceRequest*>(request_protos_[1]);
EXPECT_FALSE(registration->has_device_identifiers());
}
-// TODO(ckehoe): Add a test for RegisterResponseHandler.
+TEST_F(RpcHandlerTest, RequestQueuing) {
+ // Send a report.
+ ReportRequest* report = new ReportRequest;
+ report->mutable_manage_messages_request()->add_id_to_unpublish("unpublish");
+ SendReport(make_scoped_ptr(report), "Q App ID", "Q Auth Token");
+ EXPECT_THAT(request_queue(), SizeIs(1));
+ EXPECT_EQ("Q Auth Token", request_queue()[0]->auth_token);
+
+ // Check for registration request.
+ EXPECT_THAT(request_protos_, SizeIs(1));
+ const RegisterDeviceRequest* registration =
+ static_cast<RegisterDeviceRequest*>(request_protos_[0]);
+ EXPECT_FALSE(registration->device_identifiers().has_registrant());
+ EXPECT_EQ("Q Auth Token", auth_token_);
+
+ // Send a second report.
+ report = new ReportRequest;
+ report->mutable_manage_subscriptions_request()->add_id_to_unsubscribe(
+ "unsubscribe");
+ SendReport(make_scoped_ptr(report), "Q App ID", "Q Auth Token");
+ EXPECT_THAT(request_protos_, SizeIs(1));
+ EXPECT_THAT(request_queue(), SizeIs(2));
+ EXPECT_EQ("Q Auth Token", request_queue()[1]->auth_token);
+
+ // Send an anonymous report.
+ report = new ReportRequest;
+ report->mutable_update_signals_request()->add_token_observation()
+ ->set_token_id("Q Audio Token");
+ SendReport(make_scoped_ptr(report), "Q App ID", "");
+ EXPECT_THAT(request_queue(), SizeIs(3));
+ EXPECT_EQ("", request_queue()[2]->auth_token);
+
+ // Check for another registration request.
+ EXPECT_THAT(request_protos_, SizeIs(2));
+ registration = static_cast<RegisterDeviceRequest*>(request_protos_[1]);
+ EXPECT_TRUE(registration->device_identifiers().has_registrant());
+ EXPECT_EQ("", auth_token_);
+
+ // Respond to the first registration.
+ SendRegisterResponse("Q Auth Token", "Q Auth Device ID");
+ EXPECT_EQ("Q Auth Device ID", device_id_by_auth_token()["Q Auth Token"]);
+
+ // Check that queued reports are sent.
+ EXPECT_THAT(request_protos_, SizeIs(4));
+ EXPECT_THAT(request_queue(), SizeIs(1));
+ EXPECT_THAT(directive_handler_.removed_directives(),
+ ElementsAre("unpublish", "unsubscribe"));
+ report = static_cast<ReportRequest*>(request_protos_[2]);
+ EXPECT_EQ("unpublish", report->manage_messages_request().id_to_unpublish(0));
+ report = static_cast<ReportRequest*>(request_protos_[3]);
+ EXPECT_EQ("unsubscribe",
+ report->manage_subscriptions_request().id_to_unsubscribe(0));
+
+ // Respond to the second registration.
+ SendRegisterResponse("", "Q Anonymous Device ID");
+ EXPECT_EQ("Q Anonymous Device ID", device_id_by_auth_token()[""]);
+
+ // Check for last report.
+ EXPECT_THAT(request_protos_, SizeIs(5));
+ EXPECT_TRUE(request_queue().empty());
+ report = static_cast<ReportRequest*>(request_protos_[4]);
+ EXPECT_EQ("Q Audio Token",
+ report->update_signals_request().token_observation(0).token_id());
+}
TEST_F(RpcHandlerTest, CreateRequestHeader) {
- SetDeviceIdAndAuthToken("CreateRequestHeader Device ID",
- "CreateRequestHeader Auth Token");
- rpc_handler_.SendReportRequest(make_scoped_ptr(new ReportRequest),
- "CreateRequestHeader App",
- "CreateRequestHeader Auth Token",
- StatusCallback());
+ device_id_by_auth_token()["CreateRequestHeader Auth Token"] =
+ "CreateRequestHeader Device ID";
+ SendReport(make_scoped_ptr(new ReportRequest),
+ "CreateRequestHeader App",
+ "CreateRequestHeader Auth Token");
+
EXPECT_EQ(RpcHandler::kReportRequestRpcName, rpc_name_);
EXPECT_EQ("CreateRequestHeader App API Key", api_key_);
EXPECT_EQ("CreateRequestHeader Auth Token", auth_token_);
@@ -226,8 +322,8 @@ TEST_F(RpcHandlerTest, ReportTokens) {
test_tokens.push_back(AudioToken("token 3", true));
AddInvalidToken("token 2");
- SetDeviceIdAndAuthToken("ReportTokens Device 1", "");
- SetDeviceIdAndAuthToken("ReportTokens Device 2", "ReportTokens Auth");
+ device_id_by_auth_token()[""] = "ReportTokens Anonymous Device";
+ device_id_by_auth_token()["ReportTokens Auth"] = "ReportTokens Auth Device";
rpc_handler_.ReportTokens(test_tokens);
EXPECT_EQ(RpcHandler::kReportRequestRpcName, rpc_name_);
@@ -245,24 +341,23 @@ TEST_F(RpcHandlerTest, ReportTokens) {
TEST_F(RpcHandlerTest, ReportResponseHandler) {
// Fail on HTTP status != 200.
- ReportResponse empty_response;
- empty_response.mutable_header()->mutable_status()->set_code(OK);
- std::string serialized_empty_response;
- ASSERT_TRUE(empty_response.SerializeToString(&serialized_empty_response));
+ scoped_ptr<ReportResponse> response(new ReportResponse);
status_ = SUCCESS;
- InvokeReportResponseHandler(net::HTTP_BAD_REQUEST, serialized_empty_response);
+ SendReportResponse(net::HTTP_BAD_REQUEST, response.Pass());
EXPECT_EQ(FAIL, status_);
+ // Construct test subscriptions.
std::vector<std::string> subscription_1(1, "Subscription 1");
std::vector<std::string> subscription_2(1, "Subscription 2");
std::vector<std::string> both_subscriptions;
both_subscriptions.push_back("Subscription 1");
both_subscriptions.push_back("Subscription 2");
- ReportResponse test_response;
- test_response.mutable_header()->mutable_status()->set_code(OK);
+ // Construct a test ReportResponse.
+ response.reset(new ReportResponse);
+ response->mutable_header()->mutable_status()->set_code(OK);
UpdateSignalsResponse* update_response =
- test_response.mutable_update_signals_response();
+ response->mutable_update_signals_response();
update_response->set_status(util::error::OK);
Token* invalid_token = update_response->add_token();
invalid_token->set_id("bad token");
@@ -276,20 +371,18 @@ TEST_F(RpcHandlerTest, ReportResponseHandler) {
update_response->add_directive()->set_subscription_id("Subscription 1");
update_response->add_directive()->set_subscription_id("Subscription 2");
+ // Process it.
messages_by_subscription_.clear();
- std::string serialized_proto;
- ASSERT_TRUE(test_response.SerializeToString(&serialized_proto));
status_ = FAIL;
- InvokeReportResponseHandler(net::HTTP_OK, serialized_proto);
+ SendReportResponse(net::HTTP_OK, response.Pass());
+ // Check processing.
EXPECT_EQ(SUCCESS, status_);
EXPECT_TRUE(TokenIsInvalid("bad token"));
-
EXPECT_THAT(messages_by_subscription_["Subscription 1"],
ElementsAre("Message A", "Message C"));
EXPECT_THAT(messages_by_subscription_["Subscription 2"],
ElementsAre("Message B", "Message C"));
-
EXPECT_THAT(directive_handler_.added_directives(),
ElementsAre("Subscription 1", "Subscription 2"));
}
diff --git a/components/copresence/test/stub_whispernet_client.cc b/components/copresence/test/stub_whispernet_client.cc
index 955f6b9..ee67f9b 100644
--- a/components/copresence/test/stub_whispernet_client.cc
+++ b/components/copresence/test/stub_whispernet_client.cc
@@ -9,7 +9,8 @@
namespace copresence {
-StubWhispernetClient::StubWhispernetClient() {
+StubWhispernetClient::StubWhispernetClient(bool complete_initialization)
+ : complete_initialization_(complete_initialization) {
tokens_.push_back(AudioToken("abcdef", true));
tokens_.push_back(AudioToken("123456", false));
samples_ = CreateRandomAudioRefCounted(0x123, 1, 0x321);
@@ -18,6 +19,12 @@ StubWhispernetClient::StubWhispernetClient() {
StubWhispernetClient::~StubWhispernetClient() {
}
+void StubWhispernetClient::Initialize(const SuccessCallback& init_callback) {
+ // TODO(ckehoe): Consider updating tests to use this.
+ if (complete_initialization_)
+ init_callback.Run(true);
+}
+
void StubWhispernetClient::EncodeToken(const std::string& token,
AudioType type) {
if (!samples_cb_.is_null())
diff --git a/components/copresence/test/stub_whispernet_client.h b/components/copresence/test/stub_whispernet_client.h
index f4d3536..836e443 100644
--- a/components/copresence/test/stub_whispernet_client.h
+++ b/components/copresence/test/stub_whispernet_client.h
@@ -5,34 +5,43 @@
#ifndef COMPONENTS_COPRESENCE_TEST_STUB_WHISPERNET_CLIENT_H_
#define COMPONENTS_COPRESENCE_TEST_STUB_WHISPERNET_CLIENT_H_
-#include "base/callback.h"
-#include "base/macros.h"
+#include <string>
+#include <vector>
+#include "base/callback_forward.h"
+#include "base/macros.h"
#include "components/copresence/public/whispernet_client.h"
namespace copresence {
-// An empty WhispernetClient for testing.
+// A simple WhispernetClient for testing.
class StubWhispernetClient final : public WhispernetClient {
public:
- StubWhispernetClient();
+ // Constructor. The client can optionally be configured to respond
+ // as if Initialize() has completed. By default it does not.
+ explicit StubWhispernetClient(bool complete_initialization = false);
+
~StubWhispernetClient() override;
- void Initialize(const SuccessCallback& /* init_cb */) override {}
+ void Initialize(const SuccessCallback& init_callback) override;
void Shutdown() override {}
+
void EncodeToken(const std::string& token, AudioType type) override;
void DecodeSamples(AudioType type, const std::string& samples) override;
void DetectBroadcast() override {}
+
void RegisterTokensCallback(const TokensCallback& tokens_cb) override;
void RegisterSamplesCallback(const SamplesCallback& samples_cb) override;
void RegisterDetectBroadcastCallback(
const SuccessCallback& /* db_cb */) override {}
+
TokensCallback GetTokensCallback() override;
SamplesCallback GetSamplesCallback() override;
SuccessCallback GetDetectBroadcastCallback() override;
SuccessCallback GetInitializedCallback() override;
private:
+ bool complete_initialization_;
TokensCallback tokens_cb_;
SamplesCallback samples_cb_;
std::vector<AudioToken> tokens_;