diff options
author | ckehoe <ckehoe@chromium.org> | 2014-11-06 17:46:28 -0800 |
---|---|---|
committer | Commit bot <commit-bot@chromium.org> | 2014-11-07 01:46:57 +0000 |
commit | 38eadce9b59abe213e1ff1b830ad45d382114a23 (patch) | |
tree | b7181ffb2220775dfbe013bda9e93880ea4838f0 /components/copresence | |
parent | acd5661350286666f1ca0d513bef340a743ca651 (diff) | |
download | chromium_src-38eadce9b59abe213e1ff1b830ad45d382114a23.zip chromium_src-38eadce9b59abe213e1ff1b830ad45d382114a23.tar.gz chromium_src-38eadce9b59abe213e1ff1b830ad45d382114a23.tar.bz2 |
Moving request queuing to the RpcHandler (now with tests!). Also passing in the auth token with ExecuteReportRequest rather than through the delegate.
Review URL: https://codereview.chromium.org/705663005
Cr-Commit-Position: refs/heads/master@{#303159}
Diffstat (limited to 'components/copresence')
-rw-r--r-- | components/copresence/copresence_manager_impl.cc | 131 | ||||
-rw-r--r-- | components/copresence/copresence_manager_impl.h | 35 | ||||
-rw-r--r-- | components/copresence/public/copresence_constants.h | 6 | ||||
-rw-r--r-- | components/copresence/public/copresence_delegate.h | 1 | ||||
-rw-r--r-- | components/copresence/public/copresence_manager.h | 8 | ||||
-rw-r--r-- | components/copresence/rpc/rpc_handler.cc | 222 | ||||
-rw-r--r-- | components/copresence/rpc/rpc_handler.h | 50 | ||||
-rw-r--r-- | components/copresence/rpc/rpc_handler_unittest.cc | 171 | ||||
-rw-r--r-- | components/copresence/test/stub_whispernet_client.cc | 9 | ||||
-rw-r--r-- | components/copresence/test/stub_whispernet_client.h | 19 |
10 files changed, 370 insertions, 282 deletions
diff --git a/components/copresence/copresence_manager_impl.cc b/components/copresence/copresence_manager_impl.cc index 5ad6773..265b042 100644 --- a/components/copresence/copresence_manager_impl.cc +++ b/components/copresence/copresence_manager_impl.cc @@ -4,6 +4,8 @@ #include "components/copresence/copresence_manager_impl.h" +#include <vector> + #include "base/bind.h" #include "base/strings/stringprintf.h" #include "base/timer/timer.h" @@ -14,8 +16,6 @@ namespace { -// Number of characters of suffix to log for auth tokens -const int kTokenSuffix = 5; const int kPollTimerIntervalMs = 3000; // milliseconds. const int kAudioCheckIntervalMs = 1000; // milliseconds. @@ -23,25 +23,25 @@ const int kAudioCheckIntervalMs = 1000; // milliseconds. namespace copresence { -PendingRequest::PendingRequest(const ReportRequest& report, - const std::string& app_id, - const std::string& auth_token, - const StatusCallback& callback) - : report(new ReportRequest(report)), - app_id(app_id), - auth_token(auth_token), - callback(callback) {} - -PendingRequest::~PendingRequest() {} - -// static -scoped_ptr<CopresenceManager> CopresenceManager::Create( - CopresenceDelegate* delegate) { - return make_scoped_ptr(new CopresenceManagerImpl(delegate)); -} +// Public functions. +CopresenceManagerImpl::CopresenceManagerImpl(CopresenceDelegate* delegate) + : delegate_(delegate), + whispernet_init_callback_(base::Bind( + &CopresenceManagerImpl::WhispernetInitComplete, + // This callback gets cancelled when we are destroyed. + base::Unretained(this))), + directive_handler_(new DirectiveHandler), + poll_timer_(new base::RepeatingTimer<CopresenceManagerImpl>), + audio_check_timer_(new base::RepeatingTimer<CopresenceManagerImpl>), + init_failed_(false) { + DCHECK(delegate_); + DCHECK(delegate_->GetWhispernetClient()); + delegate_->GetWhispernetClient()->Initialize( + whispernet_init_callback_.callback()); -// Public functions. + rpc_handler_.reset(new RpcHandler(delegate_, directive_handler_.get())); +} CopresenceManagerImpl::~CopresenceManagerImpl() { whispernet_init_callback_.Cancel(); @@ -51,6 +51,7 @@ CopresenceManagerImpl::~CopresenceManagerImpl() { void CopresenceManagerImpl::ExecuteReportRequest( const ReportRequest& request, const std::string& app_id, + const std::string& auth_token, const StatusCallback& callback) { // If initialization has failed, reject all requests. if (init_failed_) { @@ -58,67 +59,19 @@ void CopresenceManagerImpl::ExecuteReportRequest( return; } - // Check if we are initialized enough to execute this request. - // If we haven't seen this auth token yet, we need to register for it. - // TODO(ckehoe): Queue per device ID instead of globally. - const std::string& auth_token = delegate_->GetAuthToken(); - if (!rpc_handler_->IsRegisteredForToken(auth_token)) { - std::string token_str = auth_token.empty() ? "(anonymous)" : - base::StringPrintf("(token ...%s)", - auth_token.substr(auth_token.length() - kTokenSuffix, - kTokenSuffix).c_str()); - rpc_handler_->RegisterForToken( - auth_token, - // The manager owns the RpcHandler, so this callback cannot outlive us. - base::Bind(&CopresenceManagerImpl::InitStepComplete, - base::Unretained(this), - "Device registration " + token_str)); - pending_init_operations_++; - } - - // Execute the request if possible, or queue it - // if initialization is still in progress. - if (pending_init_operations_) { - pending_requests_queue_.push_back( - new PendingRequest(request, app_id, auth_token, callback)); - } else { - rpc_handler_->SendReportRequest( - make_scoped_ptr(new ReportRequest(request)), - app_id, - auth_token, - callback); - } + // We'll need to modify the ReportRequest, so we make our own copy to send. + scoped_ptr<ReportRequest> request_copy(new ReportRequest(request)); + rpc_handler_->SendReportRequest( + request_copy.Pass(), app_id, auth_token, callback); } // Private functions. -CopresenceManagerImpl::CopresenceManagerImpl(CopresenceDelegate* delegate) - : delegate_(delegate), - pending_init_operations_(0), - // This callback gets cancelled when we are destroyed. - whispernet_init_callback_( - base::Bind(&CopresenceManagerImpl::InitStepComplete, - base::Unretained(this), - "Whispernet proxy initialization")), - init_failed_(false), - directive_handler_(new DirectiveHandler), - poll_timer_(new base::RepeatingTimer<CopresenceManagerImpl>), - audio_check_timer_(new base::RepeatingTimer<CopresenceManagerImpl>) { - DCHECK(delegate); - DCHECK(delegate->GetWhispernetClient()); - - rpc_handler_.reset(new RpcHandler(delegate, directive_handler_.get())); - delegate->GetWhispernetClient()->Initialize( - whispernet_init_callback_.callback()); - pending_init_operations_++; -} - -void CopresenceManagerImpl::CompleteInitialization() { - if (pending_init_operations_) - return; +void CopresenceManagerImpl::WhispernetInitComplete(bool success) { + if (success) { + DVLOG(3) << "Whispernet initialized successfully."; - if (!init_failed_) { // We destroy |directive_handler_| before |rpc_handler_|, hence passing // in |rpc_handler_|'s pointer is safe here. directive_handler_->Start(delegate_->GetWhispernetClient(), @@ -133,36 +86,10 @@ void CopresenceManagerImpl::CompleteInitialization() { audio_check_timer_->Start( FROM_HERE, base::TimeDelta::FromMilliseconds(kAudioCheckIntervalMs), base::Bind(&CopresenceManagerImpl::AudioCheck, base::Unretained(this))); - } - - // Not const because SendReportRequest takes ownership of the ReportRequests. - // This is ok though, as the entire queue is deleted afterwards. - for (PendingRequest* request : pending_requests_queue_) { - if (init_failed_) { - request->callback.Run(FAIL); - } else { - rpc_handler_->SendReportRequest( - request->report.Pass(), - request->app_id, - request->auth_token, - request->callback); - } - } - pending_requests_queue_.clear(); -} - -void CopresenceManagerImpl::InitStepComplete( - const std::string& step, bool success) { - if (!success) { - LOG(ERROR) << step << " failed!"; + } else { + LOG(ERROR) << "Whispernet initialization failed!"; init_failed_ = true; - // TODO(ckehoe): Retry for registration failures. But maybe not here. } - - DVLOG(3) << step << " complete."; - DCHECK(pending_init_operations_ > 0); - pending_init_operations_--; - CompleteInitialization(); } void CopresenceManagerImpl::PollForMessages() { diff --git a/components/copresence/copresence_manager_impl.h b/components/copresence/copresence_manager_impl.h index 4007ab3..12bc051 100644 --- a/components/copresence/copresence_manager_impl.h +++ b/components/copresence/copresence_manager_impl.h @@ -10,7 +10,6 @@ #include "base/cancelable_callback.h" #include "base/macros.h" #include "base/memory/scoped_ptr.h" -#include "base/memory/scoped_vector.h" #include "components/copresence/public/copresence_manager.h" namespace base { @@ -26,19 +25,7 @@ namespace copresence { class DirectiveHandler; class ReportRequest; class RpcHandler; - -struct PendingRequest { - PendingRequest(const ReportRequest& report, - const std::string& app_id, - const std::string& auth_token, - const StatusCallback& callback); - ~PendingRequest(); - - scoped_ptr<ReportRequest> report; - std::string app_id; - std::string auth_token; - StatusCallback callback; -}; +class WhispernetClient; // The implementation for CopresenceManager. Responsible primarily for // client-side initialization. The RpcHandler handles all the details @@ -46,18 +33,18 @@ struct PendingRequest { // TODO(rkc): Add tests for this class. class CopresenceManagerImpl : public CopresenceManager { public: + // The delegate is owned by the caller, and must outlive the manager. + explicit CopresenceManagerImpl(CopresenceDelegate* delegate); + ~CopresenceManagerImpl() override; + void ExecuteReportRequest(const ReportRequest& request, const std::string& app_id, + const std::string& auth_token, const StatusCallback& callback) override; private: - // Create managers with the CopresenceManager::Create() method. - friend class CopresenceManager; - CopresenceManagerImpl(CopresenceDelegate* delegate); - - void CompleteInitialization(); - void InitStepComplete(const std::string& step, bool success); + void WhispernetInitComplete(bool success); // This function will be called every kPollTimerIntervalMs milliseconds to // poll the server for new messages. @@ -70,11 +57,9 @@ class CopresenceManagerImpl : public CopresenceManager { // Belongs to the caller. CopresenceDelegate* const delegate_; - int pending_init_operations_; + // We use a CancelableCallback here because Whispernet + // does not provide a way to unregister its init callback. base::CancelableCallback<void(bool)> whispernet_init_callback_; - bool init_failed_; - - ScopedVector<PendingRequest> pending_requests_queue_; // The |directive handler_| needs to destruct before |rpc_handler_|, do not // change this order. @@ -84,6 +69,8 @@ class CopresenceManagerImpl : public CopresenceManager { scoped_ptr<base::Timer> poll_timer_; scoped_ptr<base::Timer> audio_check_timer_; + bool init_failed_; + DISALLOW_COPY_AND_ASSIGN(CopresenceManagerImpl); }; diff --git a/components/copresence/public/copresence_constants.h b/components/copresence/public/copresence_constants.h index 8172e56..68f8a90 100644 --- a/components/copresence/public/copresence_constants.h +++ b/components/copresence/public/copresence_constants.h @@ -2,8 +2,8 @@ // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. -#ifndef COMPONENTS_COPRESENCE_PUBLIC_COPRESENCE_CONSTANTS_ -#define COMPONENTS_COPRESENCE_PUBLIC_COPRESENCE_CONSTANTS_ +#ifndef COMPONENTS_COPRESENCE_PUBLIC_COPRESENCE_CONSTANTS_H_ +#define COMPONENTS_COPRESENCE_PUBLIC_COPRESENCE_CONSTANTS_H_ #include <string> #include <vector> @@ -74,4 +74,4 @@ using SamplesCallback = const scoped_refptr<media::AudioBusRefCounted>&)>; } // namespace copresence -#endif // COMPONENTS_COPRESENCE_PUBLIC_COPRESENCE_CONSTANTS_ +#endif // COMPONENTS_COPRESENCE_PUBLIC_COPRESENCE_CONSTANTS_H_ diff --git a/components/copresence/public/copresence_delegate.h b/components/copresence/public/copresence_delegate.h index 3a7aca6..b8a01bf 100644 --- a/components/copresence/public/copresence_delegate.h +++ b/components/copresence/public/copresence_delegate.h @@ -46,7 +46,6 @@ class CopresenceDelegate { virtual const std::string GetPlatformVersionString() const = 0; virtual const std::string GetAPIKey(const std::string& app_id) const = 0; - virtual const std::string GetAuthToken() const = 0; // Thw WhispernetClient must outlive the CopresenceManager. virtual WhispernetClient* GetWhispernetClient() = 0; diff --git a/components/copresence/public/copresence_manager.h b/components/copresence/public/copresence_manager.h index 1833f23..960c601 100644 --- a/components/copresence/public/copresence_manager.h +++ b/components/copresence/public/copresence_manager.h @@ -5,6 +5,8 @@ #ifndef COMPONENTS_COPRESENCE_PUBLIC_COPRESENCE_MANAGER_H_ #define COMPONENTS_COPRESENCE_PUBLIC_COPRESENCE_MANAGER_H_ +#include <string> + #include "base/memory/scoped_ptr.h" #include "components/copresence/public/copresence_delegate.h" @@ -28,14 +30,10 @@ class CopresenceManager { // to relay it to the requester. virtual void ExecuteReportRequest(const ReportRequest& request, const std::string& app_id, + const std::string& auth_token, const StatusCallback& callback) = 0; - // Factory method for CopresenceManagers. The delegate is owned - // by the caller, and must outlive the manager. - static scoped_ptr<CopresenceManager> Create(CopresenceDelegate* delegate); - private: - DISALLOW_COPY_AND_ASSIGN(CopresenceManager); }; diff --git a/components/copresence/rpc/rpc_handler.cc b/components/copresence/rpc/rpc_handler.cc index 50aebba..7c751ab 100644 --- a/components/copresence/rpc/rpc_handler.cc +++ b/components/copresence/rpc/rpc_handler.cc @@ -4,13 +4,12 @@ #include "components/copresence/rpc/rpc_handler.h" -#include <map> - #include "base/bind.h" #include "base/command_line.h" #include "base/guid.h" #include "base/logging.h" #include "base/strings/string_util.h" +#include "base/strings/stringprintf.h" // TODO(ckehoe): time.h includes windows.h, which #defines DeviceCapabilities // to DeviceCapabilitiesW. This breaks the pb.h headers below. For now, @@ -39,6 +38,9 @@ using google::protobuf::RepeatedPtrField; const char RpcHandler::kReportRequestRpcName[] = "report"; +// Number of characters of suffix to log for auth tokens +const int kTokenSuffix = 5; + namespace { // UrlSafe is defined as: @@ -59,7 +61,7 @@ const char kDefaultCopresenceServer[] = // Logging // Checks for a copresence error. If there is one, logs it and returns true. -bool CopresenceErrorLogged(const Status& status) { +bool IsErrorStatus(const Status& status) { if (status.code() != OK) { LOG(ERROR) << "Copresence error code " << status.code() << (status.message().empty() ? "" : ": " + status.message()); @@ -76,7 +78,7 @@ void LogIfErrorStatus(const util::error::Code& code, // If any errors occurred, logs them and returns true. bool ReportErrorLogged(const ReportResponse& response) { - bool result = CopresenceErrorLogged(response.header().status()); + bool result = IsErrorStatus(response.header().status()); // The Report fails or succeeds as a unit. If any responses had errors, // the header will too. Thus we don't need to propagate individual errors. @@ -147,19 +149,28 @@ void AddTokenToRequest(const AudioToken& token, ReportRequest* request) { signals->set_observed_time_millis(base::Time::Now().ToJsTime()); } +const std::string LoggingStrForToken(const std::string& auth_token) { + std::string token_str = auth_token.empty() ? "anonymous" : + base::StringPrintf("token ...%s", + auth_token.substr(auth_token.length() - kTokenSuffix, + kTokenSuffix).c_str()); + return token_str; +} + } // namespace -// Public methods + +// Public functions. RpcHandler::RpcHandler(CopresenceDelegate* delegate, DirectiveHandler* directive_handler, const PostCallback& server_post_callback) : delegate_(delegate), directive_handler_(directive_handler), + server_post_callback_(server_post_callback), invalid_audio_token_cache_( base::TimeDelta::FromMilliseconds(kInvalidTokenExpiryTimeMs), - kMaxInvalidTokens), - server_post_callback_(server_post_callback) { + kMaxInvalidTokens) { DCHECK(delegate_); DCHECK(directive_handler_); @@ -170,70 +181,35 @@ RpcHandler::RpcHandler(CopresenceDelegate* delegate, } RpcHandler::~RpcHandler() { - // Do not use |directive_handler_| here, it will already have been - // destructed. + // Do not use |directive_handler_| here. + // It will already have been destructed. for (HttpPost* post : pending_posts_) delete post; } -void RpcHandler::RegisterForToken(const std::string& auth_token, - const SuccessCallback& init_done_callback) { - if (IsRegisteredForToken(auth_token)) { - LOG(WARNING) << "Attempted re-registration for the same auth token."; - init_done_callback.Run(true); - return; - } - scoped_ptr<RegisterDeviceRequest> request(new RegisterDeviceRequest); - - request->mutable_push_service()->set_service(PUSH_SERVICE_NONE); - - DVLOG(2) << "Sending " << (auth_token.empty() ? "anonymous" : "authenticated") - << " registration to server."; - - // Only identify as a Chrome device if we're in anonymous mode. - // Authenticated calls come from a "GAIA device". - if (auth_token.empty()) { - Identity* identity = - request->mutable_device_identifiers()->mutable_registrant(); - identity->set_type(CHROME); - identity->set_chrome_id(base::GenerateGUID()); - } - - SendServerRequest( - kRegisterDeviceRpcName, - std::string(), // device ID - std::string(), // app ID - auth_token, - request.Pass(), - base::Bind(&RpcHandler::RegisterResponseHandler, - // On destruction, this request will be cancelled. - base::Unretained(this), - init_done_callback, - auth_token)); -} - -bool RpcHandler::IsRegisteredForToken(const std::string& auth_token) const { - return device_id_by_auth_token_.find(auth_token) != - device_id_by_auth_token_.end(); -} - -void RpcHandler::SendReportRequest(scoped_ptr<ReportRequest> request, - const std::string& auth_token) { - SendReportRequest(request.Pass(), - std::string(), - auth_token, - StatusCallback()); -} - void RpcHandler::SendReportRequest(scoped_ptr<ReportRequest> request, const std::string& app_id, const std::string& auth_token, const StatusCallback& status_callback) { DCHECK(request.get()); - auto registration_entry = device_id_by_auth_token_.find(auth_token); - DCHECK(registration_entry != device_id_by_auth_token_.end()) - << "RegisterForToken() must complete successfully " - << "for new tokens before calling SendReportRequest()."; + + // Check that we have a "device" registered for this auth token. + bool queue_request = true; + const auto& registration = device_id_by_auth_token_.find(auth_token); + if (registration == device_id_by_auth_token_.end()) { + // Not registered. + RegisterForToken(auth_token); + } else if (!registration->second.empty()) { + // Registration complete. + queue_request = false; + } + + // We're not registered, or registration is in progress. + if (queue_request) { + pending_requests_queue_.push_back(new PendingRequest( + request.Pass(), app_id, auth_token, status_callback)); + return; + } DVLOG(3) << "Sending ReportRequest to server."; @@ -247,7 +223,7 @@ void RpcHandler::SendReportRequest(scoped_ptr<ReportRequest> request, AddPlayingTokens(request.get()); SendServerRequest(kReportRequestRpcName, - registration_entry->second, + registration->second, app_id, auth_token, request.Pass(), @@ -284,41 +260,121 @@ void RpcHandler::ReportTokens(const std::vector<AudioToken>& tokens) { // Private methods +RpcHandler::PendingRequest::PendingRequest(scoped_ptr<ReportRequest> report, + const std::string& app_id, + const std::string& auth_token, + const StatusCallback& callback) + : report(report.Pass()), + app_id(app_id), + auth_token(auth_token), + callback(callback) {} + +RpcHandler::PendingRequest::~PendingRequest() {} + +void RpcHandler::RegisterForToken(const std::string& auth_token) { + DVLOG(2) << "Sending " << LoggingStrForToken(auth_token) + << " registration to server."; + + // Mark registration as in progress. + device_id_by_auth_token_[auth_token] = ""; + + scoped_ptr<RegisterDeviceRequest> request(new RegisterDeviceRequest); + request->mutable_push_service()->set_service(PUSH_SERVICE_NONE); + + // Only identify as a Chrome device if we're in anonymous mode. + // Authenticated calls come from a "GAIA device". + if (auth_token.empty()) { + Identity* identity = + request->mutable_device_identifiers()->mutable_registrant(); + identity->set_type(CHROME); + identity->set_chrome_id(base::GenerateGUID()); + } + + SendServerRequest( + kRegisterDeviceRpcName, + std::string(), // device ID + std::string(), // app ID + auth_token, + request.Pass(), + base::Bind(&RpcHandler::RegisterResponseHandler, + // On destruction, this request will be cancelled. + base::Unretained(this), + auth_token)); +} + +void RpcHandler::ProcessQueuedRequests(const std::string& auth_token) { + // Track requests that are not on this auth token. + ScopedVector<PendingRequest> still_pending_requests; + + // If there is no device ID for this auth token, registration failed. + bool registration_failed = + (device_id_by_auth_token_.count(auth_token) == 0); + + // We momentarily take ownership of all the pointers in the queue. + // They are either deleted here or passed on to a new queue. + for (PendingRequest* request : pending_requests_queue_) { + if (request->auth_token == auth_token) { + if (registration_failed) { + request->callback.Run(FAIL); + } else { + SendReportRequest(request->report.Pass(), + request->app_id, + request->auth_token, + request->callback); + } + delete request; + } else { + // The request is on a different auth token. + still_pending_requests.push_back(request); + } + } + + // Only keep the requests that weren't processed. + // All the pointers in the queue are now spoken for. + pending_requests_queue_.weak_clear(); + pending_requests_queue_ = still_pending_requests.Pass(); +} + +void RpcHandler::SendReportRequest(scoped_ptr<ReportRequest> request, + const std::string& auth_token) { + SendReportRequest(request.Pass(), + std::string(), + auth_token, + StatusCallback()); +} + void RpcHandler::RegisterResponseHandler( - const SuccessCallback& init_done_callback, const std::string& auth_token, HttpPost* completed_post, int http_status_code, const std::string& response_data) { if (completed_post) { int elements_erased = pending_posts_.erase(completed_post); - DCHECK(elements_erased); + DCHECK_GT(elements_erased, 0); delete completed_post; } - if (http_status_code != net::HTTP_OK) { - init_done_callback.Run(false); - return; - } + // Registration is no longer in progress. + // If it was successful, we'll update below. + device_id_by_auth_token_.erase(auth_token); RegisterDeviceResponse response; - if (!response.ParseFromString(response_data)) { + if (http_status_code != net::HTTP_OK) { + // TODO(ckehoe): Retry registration if appropriate. + LOG(ERROR) << LoggingStrForToken(auth_token) + << " device registration failed"; + } else if (!response.ParseFromString(response_data)) { LOG(ERROR) << "Invalid RegisterDeviceResponse:\n" << response_data; - init_done_callback.Run(false); - return; - } - - if (CopresenceErrorLogged(response.header().status())) { - init_done_callback.Run(false); - return; + } else if (!IsErrorStatus(response.header().status())) { + const std::string& device_id = response.registered_device_id(); + DCHECK(!device_id.empty()); + device_id_by_auth_token_[auth_token] = device_id; + DVLOG(2) << LoggingStrForToken(auth_token) + << " device registration successful. Id: " << device_id; } - const std::string& device_id = response.registered_device_id(); - DCHECK(!device_id.empty()); - device_id_by_auth_token_[auth_token] = device_id; - DVLOG(2) << (auth_token.empty() ? "Anonymous" : "Authenticated") - << " device registration successful: id " << device_id; - init_done_callback.Run(true); + // Send or fail requests on this auth token. + ProcessQueuedRequests(auth_token); } void RpcHandler::ReportResponseHandler(const StatusCallback& status_callback, diff --git a/components/copresence/rpc/rpc_handler.h b/components/copresence/rpc/rpc_handler.h index fca80eb..e2e999f 100644 --- a/components/copresence/rpc/rpc_handler.h +++ b/components/copresence/rpc/rpc_handler.h @@ -5,12 +5,14 @@ #ifndef COMPONENTS_COPRESENCE_RPC_RPC_HANDLER_H_ #define COMPONENTS_COPRESENCE_RPC_RPC_HANDLER_H_ +#include <map> #include <set> #include <string> #include <vector> #include "base/callback_forward.h" #include "base/memory/scoped_ptr.h" +#include "base/memory/scoped_vector.h" #include "components/copresence/proto/enums.pb.h" #include "components/copresence/public/copresence_delegate.h" #include "components/copresence/timed_map.h" @@ -60,27 +62,13 @@ class RpcHandler { // Constructor. |delegate| and |directive_handler| // are owned by the caller and must outlive the RpcHandler. + // |server_post_callback| should be set only by tests. RpcHandler(CopresenceDelegate* delegate, DirectiveHandler* directive_handler, const PostCallback& server_post_callback = PostCallback()); virtual ~RpcHandler(); - // Before accepting any other calls, the server requires registration, - // which is tied to the auth token (or lack thereof) used to call Report. - // Clients must call RegisterForToken() for each new token, - // *including the empty token*, they need to pass to SendReportRequest(), - // and then wait for |init_done_callback| to be invoked. - void RegisterForToken(const std::string& auth_token, - const SuccessCallback& init_done_callback); - - // Check if a given auth token is already active (registered). - bool IsRegisteredForToken(const std::string& auth_token) const; - - // Send a ReportRequest from Chrome itself, i.e. no app id. - void SendReportRequest(scoped_ptr<ReportRequest> request, - const std::string& auth_token); - // Send a ReportRequest from a specific app, and get notified of completion. void SendReportRequest(scoped_ptr<ReportRequest> request, const std::string& app_id, @@ -92,11 +80,35 @@ class RpcHandler { void ReportTokens(const std::vector<AudioToken>& tokens); private: + // A queued ReportRequest along with its metadata. + struct PendingRequest { + PendingRequest(scoped_ptr<ReportRequest> report, + const std::string& app_id, + const std::string& auth_token, + const StatusCallback& callback); + ~PendingRequest(); + + scoped_ptr<ReportRequest> report; + const std::string app_id; + const std::string auth_token; + const StatusCallback callback; + }; + friend class RpcHandlerTest; + // Before accepting any other calls, the server requires registration, + // which is tied to the auth token (or lack thereof) used to call Report. + void RegisterForToken(const std::string& auth_token); + + // Device registration has completed. Send the requests that it was blocking. + void ProcessQueuedRequests(const std::string& auth_token); + + // Send a ReportRequest from Chrome itself, i.e. no app id. + void SendReportRequest(scoped_ptr<ReportRequest> request, + const std::string& auth_token); + // Server call response handlers. - void RegisterResponseHandler(const SuccessCallback& init_done_callback, - const std::string& auth_token, + void RegisterResponseHandler(const std::string& auth_token, HttpPost* completed_post, int http_status_code, const std::string& response_data); @@ -143,10 +155,10 @@ class RpcHandler { CopresenceDelegate* delegate_; DirectiveHandler* directive_handler_; - TimedMap<std::string, bool> invalid_audio_token_cache_; - PostCallback server_post_callback_; + ScopedVector<PendingRequest> pending_requests_queue_; + TimedMap<std::string, bool> invalid_audio_token_cache_; std::map<std::string, std::string> device_id_by_auth_token_; std::set<HttpPost*> pending_posts_; diff --git a/components/copresence/rpc/rpc_handler_unittest.cc b/components/copresence/rpc/rpc_handler_unittest.cc index d9b3813..7efe2f6 100644 --- a/components/copresence/rpc/rpc_handler_unittest.cc +++ b/components/copresence/rpc/rpc_handler_unittest.cc @@ -52,6 +52,10 @@ class FakeDirectiveHandler final : public DirectiveHandler { return added_directives_; } + const std::vector<std::string>& removed_directives() const { + return removed_directives_; + } + void Start(WhispernetClient* /* whispernet_client */, const TokensCallback& /* tokens_cb */) override { NOTREACHED(); @@ -62,7 +66,7 @@ class FakeDirectiveHandler final : public DirectiveHandler { } void RemoveDirectives(const std::string& op_id) override { - NOTREACHED(); + removed_directives_.push_back(op_id); } const std::string GetCurrentAudioToken(AudioType type) const override { @@ -73,6 +77,7 @@ class FakeDirectiveHandler final : public DirectiveHandler { private: std::vector<std::string> added_directives_; + std::vector<std::string> removed_directives_; DISALLOW_COPY_AND_ASSIGN(FakeDirectiveHandler); }; @@ -114,28 +119,58 @@ class RpcHandlerTest : public testing::Test, public CopresenceDelegate { return app_id + " API Key"; } - const std::string GetAuthToken() const override { - return auth_token_; - } - WhispernetClient* GetWhispernetClient() override { return whispernet_client_.get(); } protected: - void InvokeReportResponseHandler(int status_code, - const std::string& response) { + + // Send test input to RpcHandler + + void RegisterForToken(const std::string& auth_token) { + rpc_handler_.RegisterForToken(auth_token); + } + + void SendRegisterResponse(const std::string& auth_token, + const std::string& device_id) { + RegisterDeviceResponse response; + response.set_registered_device_id(device_id); + response.mutable_header()->mutable_status()->set_code(OK); + + std::string serialized_response; + response.SerializeToString(&serialized_response); + rpc_handler_.RegisterResponseHandler( + auth_token, nullptr, net::HTTP_OK, serialized_response); + } + + void SendReport(scoped_ptr<ReportRequest> request, + const std::string& app_id, + const std::string& auth_token) { + rpc_handler_.SendReportRequest( + request.Pass(), app_id, auth_token, StatusCallback()); + } + + void SendReportResponse(int status_code, + scoped_ptr<ReportResponse> response) { + response->mutable_header()->mutable_status()->set_code(OK); + + std::string serialized_response; + response->SerializeToString(&serialized_response); rpc_handler_.ReportResponseHandler( base::Bind(&RpcHandlerTest::CaptureStatus, base::Unretained(this)), nullptr, status_code, - response); + serialized_response); } - void SetDeviceIdAndAuthToken(const std::string& device_id, - const std::string& auth_token) { - rpc_handler_.device_id_by_auth_token_[auth_token] = device_id; - auth_token_ = auth_token; + // Read and modify RpcHandler state + + const ScopedVector<RpcHandler::PendingRequest>& request_queue() const { + return rpc_handler_.pending_requests_queue_; + } + + std::map<std::string, std::string>& device_id_by_auth_token() { + return rpc_handler_.device_id_by_auth_token_; } void AddInvalidToken(const std::string& token) { @@ -179,9 +214,8 @@ class RpcHandlerTest : public testing::Test, public CopresenceDelegate { } }; -TEST_F(RpcHandlerTest, RegisterDevice) { - EXPECT_FALSE(rpc_handler_.IsRegisteredForToken("")); - rpc_handler_.RegisterForToken("", RpcHandler::SuccessCallback()); +TEST_F(RpcHandlerTest, RegisterForToken) { + RegisterForToken(""); EXPECT_THAT(request_protos_, SizeIs(1)); const RegisterDeviceRequest* registration = static_cast<RegisterDeviceRequest*>(request_protos_[0]); @@ -189,22 +223,84 @@ TEST_F(RpcHandlerTest, RegisterDevice) { EXPECT_EQ(CHROME, identity.type()); EXPECT_FALSE(identity.chrome_id().empty()); - EXPECT_FALSE(rpc_handler_.IsRegisteredForToken("abc")); - rpc_handler_.RegisterForToken("abc", RpcHandler::SuccessCallback()); + RegisterForToken("abc"); EXPECT_THAT(request_protos_, SizeIs(2)); registration = static_cast<RegisterDeviceRequest*>(request_protos_[1]); EXPECT_FALSE(registration->has_device_identifiers()); } -// TODO(ckehoe): Add a test for RegisterResponseHandler. +TEST_F(RpcHandlerTest, RequestQueuing) { + // Send a report. + ReportRequest* report = new ReportRequest; + report->mutable_manage_messages_request()->add_id_to_unpublish("unpublish"); + SendReport(make_scoped_ptr(report), "Q App ID", "Q Auth Token"); + EXPECT_THAT(request_queue(), SizeIs(1)); + EXPECT_EQ("Q Auth Token", request_queue()[0]->auth_token); + + // Check for registration request. + EXPECT_THAT(request_protos_, SizeIs(1)); + const RegisterDeviceRequest* registration = + static_cast<RegisterDeviceRequest*>(request_protos_[0]); + EXPECT_FALSE(registration->device_identifiers().has_registrant()); + EXPECT_EQ("Q Auth Token", auth_token_); + + // Send a second report. + report = new ReportRequest; + report->mutable_manage_subscriptions_request()->add_id_to_unsubscribe( + "unsubscribe"); + SendReport(make_scoped_ptr(report), "Q App ID", "Q Auth Token"); + EXPECT_THAT(request_protos_, SizeIs(1)); + EXPECT_THAT(request_queue(), SizeIs(2)); + EXPECT_EQ("Q Auth Token", request_queue()[1]->auth_token); + + // Send an anonymous report. + report = new ReportRequest; + report->mutable_update_signals_request()->add_token_observation() + ->set_token_id("Q Audio Token"); + SendReport(make_scoped_ptr(report), "Q App ID", ""); + EXPECT_THAT(request_queue(), SizeIs(3)); + EXPECT_EQ("", request_queue()[2]->auth_token); + + // Check for another registration request. + EXPECT_THAT(request_protos_, SizeIs(2)); + registration = static_cast<RegisterDeviceRequest*>(request_protos_[1]); + EXPECT_TRUE(registration->device_identifiers().has_registrant()); + EXPECT_EQ("", auth_token_); + + // Respond to the first registration. + SendRegisterResponse("Q Auth Token", "Q Auth Device ID"); + EXPECT_EQ("Q Auth Device ID", device_id_by_auth_token()["Q Auth Token"]); + + // Check that queued reports are sent. + EXPECT_THAT(request_protos_, SizeIs(4)); + EXPECT_THAT(request_queue(), SizeIs(1)); + EXPECT_THAT(directive_handler_.removed_directives(), + ElementsAre("unpublish", "unsubscribe")); + report = static_cast<ReportRequest*>(request_protos_[2]); + EXPECT_EQ("unpublish", report->manage_messages_request().id_to_unpublish(0)); + report = static_cast<ReportRequest*>(request_protos_[3]); + EXPECT_EQ("unsubscribe", + report->manage_subscriptions_request().id_to_unsubscribe(0)); + + // Respond to the second registration. + SendRegisterResponse("", "Q Anonymous Device ID"); + EXPECT_EQ("Q Anonymous Device ID", device_id_by_auth_token()[""]); + + // Check for last report. + EXPECT_THAT(request_protos_, SizeIs(5)); + EXPECT_TRUE(request_queue().empty()); + report = static_cast<ReportRequest*>(request_protos_[4]); + EXPECT_EQ("Q Audio Token", + report->update_signals_request().token_observation(0).token_id()); +} TEST_F(RpcHandlerTest, CreateRequestHeader) { - SetDeviceIdAndAuthToken("CreateRequestHeader Device ID", - "CreateRequestHeader Auth Token"); - rpc_handler_.SendReportRequest(make_scoped_ptr(new ReportRequest), - "CreateRequestHeader App", - "CreateRequestHeader Auth Token", - StatusCallback()); + device_id_by_auth_token()["CreateRequestHeader Auth Token"] = + "CreateRequestHeader Device ID"; + SendReport(make_scoped_ptr(new ReportRequest), + "CreateRequestHeader App", + "CreateRequestHeader Auth Token"); + EXPECT_EQ(RpcHandler::kReportRequestRpcName, rpc_name_); EXPECT_EQ("CreateRequestHeader App API Key", api_key_); EXPECT_EQ("CreateRequestHeader Auth Token", auth_token_); @@ -226,8 +322,8 @@ TEST_F(RpcHandlerTest, ReportTokens) { test_tokens.push_back(AudioToken("token 3", true)); AddInvalidToken("token 2"); - SetDeviceIdAndAuthToken("ReportTokens Device 1", ""); - SetDeviceIdAndAuthToken("ReportTokens Device 2", "ReportTokens Auth"); + device_id_by_auth_token()[""] = "ReportTokens Anonymous Device"; + device_id_by_auth_token()["ReportTokens Auth"] = "ReportTokens Auth Device"; rpc_handler_.ReportTokens(test_tokens); EXPECT_EQ(RpcHandler::kReportRequestRpcName, rpc_name_); @@ -245,24 +341,23 @@ TEST_F(RpcHandlerTest, ReportTokens) { TEST_F(RpcHandlerTest, ReportResponseHandler) { // Fail on HTTP status != 200. - ReportResponse empty_response; - empty_response.mutable_header()->mutable_status()->set_code(OK); - std::string serialized_empty_response; - ASSERT_TRUE(empty_response.SerializeToString(&serialized_empty_response)); + scoped_ptr<ReportResponse> response(new ReportResponse); status_ = SUCCESS; - InvokeReportResponseHandler(net::HTTP_BAD_REQUEST, serialized_empty_response); + SendReportResponse(net::HTTP_BAD_REQUEST, response.Pass()); EXPECT_EQ(FAIL, status_); + // Construct test subscriptions. std::vector<std::string> subscription_1(1, "Subscription 1"); std::vector<std::string> subscription_2(1, "Subscription 2"); std::vector<std::string> both_subscriptions; both_subscriptions.push_back("Subscription 1"); both_subscriptions.push_back("Subscription 2"); - ReportResponse test_response; - test_response.mutable_header()->mutable_status()->set_code(OK); + // Construct a test ReportResponse. + response.reset(new ReportResponse); + response->mutable_header()->mutable_status()->set_code(OK); UpdateSignalsResponse* update_response = - test_response.mutable_update_signals_response(); + response->mutable_update_signals_response(); update_response->set_status(util::error::OK); Token* invalid_token = update_response->add_token(); invalid_token->set_id("bad token"); @@ -276,20 +371,18 @@ TEST_F(RpcHandlerTest, ReportResponseHandler) { update_response->add_directive()->set_subscription_id("Subscription 1"); update_response->add_directive()->set_subscription_id("Subscription 2"); + // Process it. messages_by_subscription_.clear(); - std::string serialized_proto; - ASSERT_TRUE(test_response.SerializeToString(&serialized_proto)); status_ = FAIL; - InvokeReportResponseHandler(net::HTTP_OK, serialized_proto); + SendReportResponse(net::HTTP_OK, response.Pass()); + // Check processing. EXPECT_EQ(SUCCESS, status_); EXPECT_TRUE(TokenIsInvalid("bad token")); - EXPECT_THAT(messages_by_subscription_["Subscription 1"], ElementsAre("Message A", "Message C")); EXPECT_THAT(messages_by_subscription_["Subscription 2"], ElementsAre("Message B", "Message C")); - EXPECT_THAT(directive_handler_.added_directives(), ElementsAre("Subscription 1", "Subscription 2")); } diff --git a/components/copresence/test/stub_whispernet_client.cc b/components/copresence/test/stub_whispernet_client.cc index 955f6b9..ee67f9b 100644 --- a/components/copresence/test/stub_whispernet_client.cc +++ b/components/copresence/test/stub_whispernet_client.cc @@ -9,7 +9,8 @@ namespace copresence { -StubWhispernetClient::StubWhispernetClient() { +StubWhispernetClient::StubWhispernetClient(bool complete_initialization) + : complete_initialization_(complete_initialization) { tokens_.push_back(AudioToken("abcdef", true)); tokens_.push_back(AudioToken("123456", false)); samples_ = CreateRandomAudioRefCounted(0x123, 1, 0x321); @@ -18,6 +19,12 @@ StubWhispernetClient::StubWhispernetClient() { StubWhispernetClient::~StubWhispernetClient() { } +void StubWhispernetClient::Initialize(const SuccessCallback& init_callback) { + // TODO(ckehoe): Consider updating tests to use this. + if (complete_initialization_) + init_callback.Run(true); +} + void StubWhispernetClient::EncodeToken(const std::string& token, AudioType type) { if (!samples_cb_.is_null()) diff --git a/components/copresence/test/stub_whispernet_client.h b/components/copresence/test/stub_whispernet_client.h index f4d3536..836e443 100644 --- a/components/copresence/test/stub_whispernet_client.h +++ b/components/copresence/test/stub_whispernet_client.h @@ -5,34 +5,43 @@ #ifndef COMPONENTS_COPRESENCE_TEST_STUB_WHISPERNET_CLIENT_H_ #define COMPONENTS_COPRESENCE_TEST_STUB_WHISPERNET_CLIENT_H_ -#include "base/callback.h" -#include "base/macros.h" +#include <string> +#include <vector> +#include "base/callback_forward.h" +#include "base/macros.h" #include "components/copresence/public/whispernet_client.h" namespace copresence { -// An empty WhispernetClient for testing. +// A simple WhispernetClient for testing. class StubWhispernetClient final : public WhispernetClient { public: - StubWhispernetClient(); + // Constructor. The client can optionally be configured to respond + // as if Initialize() has completed. By default it does not. + explicit StubWhispernetClient(bool complete_initialization = false); + ~StubWhispernetClient() override; - void Initialize(const SuccessCallback& /* init_cb */) override {} + void Initialize(const SuccessCallback& init_callback) override; void Shutdown() override {} + void EncodeToken(const std::string& token, AudioType type) override; void DecodeSamples(AudioType type, const std::string& samples) override; void DetectBroadcast() override {} + void RegisterTokensCallback(const TokensCallback& tokens_cb) override; void RegisterSamplesCallback(const SamplesCallback& samples_cb) override; void RegisterDetectBroadcastCallback( const SuccessCallback& /* db_cb */) override {} + TokensCallback GetTokensCallback() override; SamplesCallback GetSamplesCallback() override; SuccessCallback GetDetectBroadcastCallback() override; SuccessCallback GetInitializedCallback() override; private: + bool complete_initialization_; TokensCallback tokens_cb_; SamplesCallback samples_cb_; std::vector<AudioToken> tokens_; |