1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
|
// Copyright 2014 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "components/copresence/rpc/rpc_handler.h"
#include "base/bind.h"
#include "base/command_line.h"
#include "base/logging.h"
#include "base/strings/string_util.h"
#include "base/strings/stringprintf.h"
// TODO(ckehoe): time.h includes windows.h, which #defines DeviceCapabilities
// to DeviceCapabilitiesW. This breaks the pb.h headers below. For now,
// we fix this with an #undef.
#include "base/time/time.h"
#if defined(OS_WIN)
#undef DeviceCapabilities
#endif
#include "components/audio_modem/public/audio_modem_types.h"
#include "components/copresence/copresence_state_impl.h"
#include "components/copresence/copresence_switches.h"
#include "components/copresence/handlers/directive_handler.h"
#include "components/copresence/handlers/gcm_handler.h"
#include "components/copresence/proto/codes.pb.h"
#include "components/copresence/proto/data.pb.h"
#include "components/copresence/proto/rpcs.pb.h"
#include "components/copresence/public/copresence_constants.h"
#include "components/copresence/public/copresence_delegate.h"
#include "components/copresence/rpc/http_post.h"
#include "net/http/http_status_code.h"
using google::protobuf::MessageLite;
using audio_modem::AUDIBLE;
using audio_modem::AudioToken;
using audio_modem::INAUDIBLE;
// TODO(ckehoe): Return error messages for bad requests.
namespace copresence {
// RPC name that SendReportRequest() passes to SendServerRequest().
const char RpcHandler::kReportRequestRpcName[] = "report";
namespace {
// Number of trailing characters of an auth token that LoggingStrForToken()
// includes in log output.
const int kTokenLoggingSuffix = 5;
// Converted to a base::TimeDelta and handed to |invalid_audio_token_cache_|
// by the RpcHandler constructor; presumably the lifetime of a cache entry.
const int kInvalidTokenExpiryTimeMs = 10 * 60 * 1000; // 10 minutes.
// Second constructor argument of |invalid_audio_token_cache_|; presumably
// the maximum number of tokens the cache holds.
const int kMaxInvalidTokens = 10000;
// RPC name used by RegisterDevice() for device registration requests.
const char kRegisterDeviceRpcName[] = "registerdevice";
// Server URL used by SendHttpPost() when the kCopresenceServer switch is
// not present on the command line.
const char kDefaultCopresenceServer[] =
"https://www.googleapis.com/copresence/v2/copresence";
// Returns the UrlSafe form of |token|, in which every '/' is written as '_'
// and every '+' is written as '-'.
// TODO(rkc): Move this to the wrapper.
std::string ToUrlSafe(std::string token) {
  // The two substitutions touch disjoint characters, so their relative order
  // does not matter.
  base::ReplaceChars(token, "/", "_", &token);
  base::ReplaceChars(token, "+", "-", &token);
  return token;
}
// Logging
// Checks for a copresence error. If there is one, logs it and returns true.
bool IsErrorStatus(const Status& status) {
if (status.code() != OK) {
LOG(ERROR) << "Copresence error code " << status.code()
<< (status.message().empty() ? "" : ": " + status.message());
}
return status.code() != OK;
}
// Logs an error that names |context| when |code| is anything other than
// util::error::OK. Does nothing for an OK code.
void LogIfErrorStatus(const util::error::Code& code,
                      const std::string& context) {
  if (code == util::error::OK)
    return;

  LOG(ERROR) << context << " error " << code << ". See "
             << "cs/google3/util/task/codes.proto for more info.";
}
// Logs every error found in |response|. Returns true when the response
// header carries an error status.
bool ReportErrorLogged(const ReportResponse& response) {
  const bool header_has_error = IsErrorStatus(response.header().status());

  // The Report fails or succeeds as a unit. If any responses had errors,
  // the header will too. The per-operation statuses below are therefore only
  // logged and never change the return value.
  if (response.has_update_signals_response()) {
    LogIfErrorStatus(response.update_signals_response().status(), "Update");
  }
  if (response.has_manage_messages_response()) {
    LogIfErrorStatus(response.manage_messages_response().status(), "Publish");
  }
  if (response.has_manage_subscriptions_response()) {
    LogIfErrorStatus(response.manage_subscriptions_response().status(),
                     "Subscribe");
  }

  return header_has_error;
}
// Returns a short, log-friendly description of |auth_token|: "anonymous" for
// an empty token, otherwise "token ..." followed by at most the last
// |kTokenLoggingSuffix| characters of the token.
const std::string LoggingStrForToken(const std::string& auth_token) {
  if (auth_token.empty())
    return "anonymous";

  // Clamp the start index: for a token shorter than the suffix length the
  // unsigned subtraction would wrap around and hand substr() an out-of-range
  // position.
  const size_t max_suffix = static_cast<size_t>(kTokenLoggingSuffix);
  const size_t suffix_start =
      auth_token.length() > max_suffix ? auth_token.length() - max_suffix : 0;
  const std::string token_suffix = auth_token.substr(suffix_start);
  return base::StringPrintf("token ...%s", token_suffix.c_str());
}
// Request construction
// TODO(ckehoe): Move these into a separate file?
// Returns the broadcast/scan configuration stored in |msg|'s token exchange
// strategy, or BROADCAST_SCAN_CONFIGURATION_UNKNOWN when |msg| has no
// strategy or the strategy has no such configuration.
template <typename T>
BroadcastScanConfiguration GetBroadcastScanConfig(const T& msg) {
  if (!msg.has_token_exchange_strategy())
    return BROADCAST_SCAN_CONFIGURATION_UNKNOWN;

  if (!msg.token_exchange_strategy().has_broadcast_scan_configuration())
    return BROADCAST_SCAN_CONFIGURATION_UNKNOWN;

  return msg.token_exchange_strategy().broadcast_scan_configuration();
}
// Builds a DeviceState advertising two token technologies, each able to both
// TRANSMIT and RECEIVE: AUDIO_ULTRASOUND_PASSBAND first, then
// AUDIO_AUDIBLE_DTMF. |request| is not consulted.
scoped_ptr<DeviceState> GetDeviceCapabilities(const ReportRequest& request) {
  scoped_ptr<DeviceState> device_state(new DeviceState);
  auto* capabilities = device_state->mutable_capabilities();

  TokenTechnology* inaudible_tech = capabilities->add_token_technology();
  inaudible_tech->set_medium(AUDIO_ULTRASOUND_PASSBAND);
  inaudible_tech->add_instruction_type(TRANSMIT);
  inaudible_tech->add_instruction_type(RECEIVE);

  TokenTechnology* audible_tech = capabilities->add_token_technology();
  audible_tech->set_medium(AUDIO_AUDIBLE_DTMF);
  audible_tech->add_instruction_type(TRANSMIT);
  audible_tech->add_instruction_type(RECEIVE);

  return device_state.Pass();
}
// Allocates a ClientVersion holding |client| and |version_name|. The caller
// owns the returned object.
// TODO(ckehoe): We're keeping this code in a separate function for now
// because we get a version string from Chrome, but the proto expects
// an int64 version. We should probably change the version proto
// to handle a more detailed version.
ClientVersion* CreateVersion(const std::string& client,
                             const std::string& version_name) {
  scoped_ptr<ClientVersion> client_version(new ClientVersion);
  client_version->set_client(client);
  client_version->set_version_name(version_name);
  return client_version.release();
}
void AddTokenToRequest(const AudioToken& token, ReportRequest* request) {
TokenObservation* token_observation =
request->mutable_update_signals_request()->add_token_observation();
token_observation->set_token_id(ToUrlSafe(token.token));
TokenSignals* signals = token_observation->add_signals();
signals->set_medium(token.audible ? AUDIO_AUDIBLE_DTMF
: AUDIO_ULTRASOUND_PASSBAND);
signals->set_observed_time_millis(base::Time::Now().ToJsTime());
}
} // namespace
// Public functions.
// Stores the collaborators and callbacks, sets up the invalid-token cache,
// and asks the GCM handler (when there is one) for a GCM ID.
// |delegate| and |directive_handler| are required; |gcm_handler| is optional.
// A null |server_post_callback| selects SendHttpPost().
RpcHandler::RpcHandler(CopresenceDelegate* delegate,
                       CopresenceStateImpl* state,
                       DirectiveHandler* directive_handler,
                       GCMHandler* gcm_handler,
                       const MessagesCallback& new_messages_callback,
                       const PostCallback& server_post_callback)
    : delegate_(delegate),
      state_(state),
      directive_handler_(directive_handler),
      gcm_handler_(gcm_handler),
      new_messages_callback_(new_messages_callback),
      server_post_callback_(server_post_callback),
      invalid_audio_token_cache_(
          base::TimeDelta::FromMilliseconds(kInvalidTokenExpiryTimeMs),
          kMaxInvalidTokens) {
  DCHECK(delegate_);
  DCHECK(directive_handler_);

  // Install the default poster before anything below can issue a request.
  if (server_post_callback_.is_null()) {
    server_post_callback_ =
        base::Bind(&RpcHandler::SendHttpPost, base::Unretained(this));
  }

  // |gcm_handler_| is optional.
  if (!gcm_handler_)
    return;

  gcm_handler_->GetGcmId(
      base::Bind(&RpcHandler::RegisterGcmId, base::Unretained(this)));
}
// Deletes every HttpPost that is still pending.
RpcHandler::~RpcHandler() {
  // Do not use |directive_handler_| or |gcm_handler_| here.
  // They will already have been destructed.
  for (auto it = pending_posts_.begin(); it != pending_posts_.end(); ++it) {
    delete *it;
  }
}
// Sends |request| to the server on behalf of |app_id|, or queues it until a
// device ID is available for the request's authentication state.
//
// |auth_token| selects the authentication state: a non-empty token makes the
// request authenticated and is remembered in |auth_token_|.
// |status_callback| may be null. When non-null it is run with FAIL if the app
// has neither an API key nor an auth token; otherwise it is forwarded to
// ReportResponseHandler() or stored with the queued request.
void RpcHandler::SendReportRequest(scoped_ptr<ReportRequest> request,
                                   const std::string& app_id,
                                   const std::string& auth_token,
                                   const StatusCallback& status_callback) {
  DCHECK(request.get());

  // Check that the app, if any, has some kind of authentication token.
  // Don't allow it to piggyback on Chrome's credentials.
  if (!app_id.empty() && delegate_->GetAPIKey(app_id).empty() &&
      auth_token.empty()) {
    LOG(ERROR) << "App " << app_id << " has no API key or auth token";
    // Callers are allowed to pass a null callback (ReportOnAllDevices() does),
    // so guard the call the same way ReportResponseHandler() does.
    if (!status_callback.is_null())
      status_callback.Run(FAIL);
    return;
  }

  // Store just one auth token since we should have only one account
  // per instance of the copresence component.
  // TODO(ckehoe): We may eventually need to support multiple auth tokens.
  const bool authenticated = !auth_token.empty();
  if (authenticated && auth_token != auth_token_) {
    LOG_IF(ERROR, !auth_token_.empty())
        << "Overwriting old auth token: " << LoggingStrForToken(auth_token);
    auth_token_ = auth_token;
  }

  // Check that we have a "device" registered for this authentication state.
  const std::string device_id = delegate_->GetDeviceId(authenticated);
  if (device_id.empty()) {
    // We're not registered, or registration is in progress. Start a
    // registration unless one is already pending, then park the request
    // until ProcessQueuedRequests() picks it up.
    if (pending_registrations_.count(authenticated) == 0)
      RegisterDevice(authenticated);
    pending_requests_queue_.push_back(new PendingRequest(
        request.Pass(), app_id, authenticated, status_callback));
    return;
  }

  DVLOG(3) << "Sending ReportRequest to server.";

  // If we are unpublishing or unsubscribing, we need to stop those publish or
  // subscribes right away, we don't need to wait for the server to tell us.
  ProcessRemovedOperations(*request);

  request->mutable_update_signals_request()->set_allocated_state(
      GetDeviceCapabilities(*request).release());

  AddPlayingTokens(request.get());

  SendServerRequest(kReportRequestRpcName,
                    device_id,
                    app_id,
                    authenticated,
                    request.Pass(),
                    // On destruction, this request will be cancelled.
                    base::Bind(&RpcHandler::ReportResponseHandler,
                               base::Unretained(this),
                               status_callback));
}
void RpcHandler::ReportTokens(const std::vector<AudioToken>& tokens) {
DCHECK(!tokens.empty());
scoped_ptr<ReportRequest> request(new ReportRequest);
for (const AudioToken& token : tokens) {
if (invalid_audio_token_cache_.HasKey(ToUrlSafe(token.token)))
continue;
DVLOG(3) << "Sending token " << token.token << " to server";
AddTokenToRequest(token, request.get());
}
ReportOnAllDevices(request.Pass());
}
// Private functions.
// Bundles a report with the app ID, authentication state and status callback
// it was submitted with. SendReportRequest() creates these while no device ID
// is available; ProcessQueuedRequests() later sends or fails them.
RpcHandler::PendingRequest::PendingRequest(scoped_ptr<ReportRequest> report,
const std::string& app_id,
bool authenticated,
const StatusCallback& callback)
// Ownership of |report| is taken via Pass().
: report(report.Pass()),
app_id(app_id),
authenticated(authenticated),
callback(callback) {}
// The destructor body is empty.
RpcHandler::PendingRequest::~PendingRequest() {}
void RpcHandler::RegisterDevice(bool authenticated) {
DVLOG(2) << "Sending " << (authenticated ? "authenticated" : "anonymous")
<< " registration to server.";
scoped_ptr<RegisterDeviceRequest> request(new RegisterDeviceRequest);
// Add a GCM ID for authenticated registration, if we have one.
if (!authenticated || gcm_id_.empty()) {
request->mutable_push_service()->set_service(PUSH_SERVICE_NONE);
} else {
DVLOG(2) << "Registering GCM ID with " << LoggingStrForToken(auth_token_);
request->mutable_push_service()->set_service(GCM);
request->mutable_push_service()->mutable_gcm_registration()
->set_device_token(gcm_id_);
}
// Only identify as a Chrome device if we're in anonymous mode.
// Authenticated calls come from a "GAIA device".
if (!authenticated) {
// Make sure this isn't a duplicate anonymous registration.
// Duplicate authenticated registrations are allowed, to update the GCM ID.
DCHECK(delegate_->GetDeviceId(false).empty())
<< "Attempted anonymous re-registration";
Identity* identity =
request->mutable_device_identifiers()->mutable_registrant();
identity->set_type(CHROME);
}
bool gcm_pending = authenticated && gcm_handler_ && gcm_id_.empty();
pending_registrations_.insert(authenticated);
SendServerRequest(
kRegisterDeviceRpcName,
// The device is empty on first registration.
// When re-registering to pass on the GCM ID, it will be present.
delegate_->GetDeviceId(authenticated),
std::string(), // app ID
authenticated,
request.Pass(),
base::Bind(&RpcHandler::RegisterResponseHandler,
// On destruction, this request will be cancelled.
base::Unretained(this),
authenticated,
gcm_pending));
}
void RpcHandler::ProcessQueuedRequests(bool authenticated) {
// Track requests that are not in this auth state.
ScopedVector<PendingRequest> still_pending_requests;
// If there is no device ID for this auth state, registration failed.
bool registration_failed = delegate_->GetDeviceId(authenticated).empty();
// We momentarily take ownership of all the pointers in the queue.
// They are either deleted here or passed on to a new queue.
for (PendingRequest* request : pending_requests_queue_) {
if (request->authenticated == authenticated) {
if (registration_failed) {
request->callback.Run(FAIL);
} else {
if (request->authenticated)
DCHECK(!auth_token_.empty());
SendReportRequest(request->report.Pass(),
request->app_id,
request->authenticated ? auth_token_ : std::string(),
request->callback);
}
delete request;
} else {
// The request is in a different auth state.
still_pending_requests.push_back(request);
}
}
// Only keep the requests that weren't processed.
// All the pointers in the queue are now spoken for.
pending_requests_queue_.weak_clear();
pending_requests_queue_ = still_pending_requests.Pass();
}
// Sends a copy of |request| once per registered device: first for the
// authenticated device (needs both an auth token and a device ID), then for
// the anonymous device. Does nothing but log when neither is registered.
void RpcHandler::ReportOnAllDevices(scoped_ptr<ReportRequest> request) {
  const bool report_authenticated =
      !auth_token_.empty() && !delegate_->GetDeviceId(true).empty();
  const bool report_anonymous = !delegate_->GetDeviceId(false).empty();

  if (!report_authenticated && !report_anonymous) {
    VLOG(2) << "Skipping reporting because no device IDs are registered";
    return;
  }

  // No app ID and no status callback are attached to these reports.
  if (report_authenticated) {
    SendReportRequest(make_scoped_ptr(new ReportRequest(*request)),
                      std::string(),
                      auth_token_,
                      StatusCallback());
  }
  if (report_anonymous) {
    SendReportRequest(make_scoped_ptr(new ReportRequest(*request)),
                      std::string(),
                      std::string(),
                      StatusCallback());
  }
}
// Store a GCM ID and send it to the server if needed. The constructor passes
// this callback to the GCMHandler to receive the ID whenever it's ready.
// It may be returned immediately, if the ID is cached, or require a server
// round-trip. This ID must then be passed along to the copresence server.
// There are a few ways this can happen:
//
// 1. The GCM ID is available when we first register, and is passed along
// with the RegisterDeviceRequest.
//
// 2. The GCM ID becomes available after the RegisterDeviceRequest has
// completed. Then this function will invoke RegisterDevice()
// again to pass on the ID.
//
// 3. The GCM ID becomes available after the RegisterDeviceRequest is sent,
// but before it completes. In this case, the gcm_pending flag is passed
// through to the RegisterResponseHandler, which invokes RegisterDevice()
// again to pass on the ID. This function must skip pending registrations,
// as the device ID will be empty.
//
// TODO(ckehoe): Add tests for these scenarios.
void RpcHandler::RegisterGcmId(const std::string& gcm_id) {
  gcm_id_ = gcm_id;
  if (gcm_id.empty())
    return;

  // Re-register only when an authenticated device already exists. An empty
  // device ID means registration has not completed, so it is skipped here.
  const bool has_authenticated_device = !delegate_->GetDeviceId(true).empty();
  if (auth_token_.empty() || !has_authenticated_device)
    return;

  RegisterDevice(true);
}
// Handles the server's reply to a registration sent by RegisterDevice().
// Releases |completed_post|, clears the in-flight marker for |authenticated|,
// saves the device ID on success, and finally sends or fails the requests
// queued for this authentication state.
void RpcHandler::RegisterResponseHandler(
    bool authenticated,
    bool gcm_pending,
    HttpPost* completed_post,
    int http_status_code,
    const std::string& response_data) {
  if (completed_post) {
    int posts_erased = pending_posts_.erase(completed_post);
    DCHECK_GT(posts_erased, 0);
    delete completed_post;
  }

  int registrations_completed = pending_registrations_.erase(authenticated);
  DCHECK_GT(registrations_completed, 0);

  const std::string token_str =
      LoggingStrForToken(authenticated ? auth_token_ : std::string());

  // Each check below runs only if the previous one passed.
  RegisterDeviceResponse response;
  bool registered = false;
  if (http_status_code != net::HTTP_OK) {
    // TODO(ckehoe): Retry registration if appropriate.
    LOG(ERROR) << token_str << " device registration failed";
  } else if (!response.ParseFromString(response_data)) {
    LOG(ERROR) << "Invalid RegisterDeviceResponse:\n" << response_data;
  } else {
    registered = !IsErrorStatus(response.header().status());
  }

  if (registered) {
    const std::string& device_id = response.registered_device_id();
    DCHECK(!device_id.empty());
    delegate_->SaveDeviceId(authenticated, device_id);
    DVLOG(2) << token_str << " device registration successful. Id: "
             << device_id;

    // If we have a GCM ID now, and didn't before, pass it on to the server.
    if (gcm_pending && !gcm_id_.empty())
      RegisterDevice(authenticated);
  }

  // Send or fail requests on this auth token.
  ProcessQueuedRequests(authenticated);
}
void RpcHandler::ReportResponseHandler(const StatusCallback& status_callback,
HttpPost* completed_post,
int http_status_code,
const std::string& response_data) {
if (completed_post) {
int elements_erased = pending_posts_.erase(completed_post);
DCHECK(elements_erased);
delete completed_post;
}
if (http_status_code != net::HTTP_OK) {
if (!status_callback.is_null())
status_callback.Run(FAIL);
return;
}
DVLOG(3) << "Received ReportResponse.";
ReportResponse response;
if (!response.ParseFromString(response_data)) {
LOG(ERROR) << "Invalid ReportResponse";
if (!status_callback.is_null())
status_callback.Run(FAIL);
return;
}
if (ReportErrorLogged(response)) {
if (!status_callback.is_null())
status_callback.Run(FAIL);
return;
}
for (const MessageResult& result :
response.manage_messages_response().published_message_result()) {
DVLOG(2) << "Published message with id " << result.published_message_id();
}
for (const SubscriptionResult& result :
response.manage_subscriptions_response().subscription_result()) {
DVLOG(2) << "Created subscription with id " << result.subscription_id();
}
if (response.has_update_signals_response()) {
const UpdateSignalsResponse& update_response =
response.update_signals_response();
new_messages_callback_.Run(update_response.message());
for (const Directive& directive : update_response.directive())
directive_handler_->AddDirective(directive);
for (const Token& token : update_response.token()) {
state_->UpdateTokenStatus(token.id(), token.status());
switch (token.status()) {
case VALID:
// TODO(rkc/ckehoe): Store the token in a |valid_token_cache_| with a
// short TTL (like 10s) and send it up with every report request.
// Then we'll still get messages while we're waiting to hear it again.
VLOG(1) << "Got valid token " << token.id();
break;
case INVALID:
DVLOG(3) << "Discarding invalid token " << token.id();
invalid_audio_token_cache_.Add(token.id(), true);
break;
default:
DVLOG(2) << "Token " << token.id() << " has status code "
<< token.status();
}
}
}
// TODO(ckehoe): Return a more detailed status response.
if (!status_callback.is_null())
status_callback.Run(SUCCESS);
}
// Removes the directives of every message ID that |request| unpublishes and
// every subscription ID that it unsubscribes.
void RpcHandler::ProcessRemovedOperations(const ReportRequest& request) {
  // Remove unpublishes.
  if (request.has_manage_messages_request()) {
    const auto& messages_request = request.manage_messages_request();
    for (const std::string& message_id : messages_request.id_to_unpublish())
      directive_handler_->RemoveDirectives(message_id);
  }

  // Remove unsubscribes.
  if (request.has_manage_subscriptions_request()) {
    const auto& subscriptions_request = request.manage_subscriptions_request();
    for (const std::string& subscription_id :
         subscriptions_request.id_to_unsubscribe()) {
      directive_handler_->RemoveDirectives(subscription_id);
    }
  }
}
// Adds the audio tokens the directive handler currently reports, audible
// first and then inaudible, to |request|. Empty tokens are skipped.
void RpcHandler::AddPlayingTokens(ReportRequest* request) {
  const std::string& current_audible =
      directive_handler_->GetCurrentAudioToken(AUDIBLE);
  const std::string& current_inaudible =
      directive_handler_->GetCurrentAudioToken(INAUDIBLE);

  if (!current_audible.empty()) {
    const AudioToken audible(current_audible, true);
    AddTokenToRequest(audible, request);
  }

  if (!current_inaudible.empty()) {
    const AudioToken inaudible(current_inaudible, false);
    AddTokenToRequest(inaudible, request);
  }
}
// Allocates a RequestHeader carrying the framework version, the current
// time, a device fingerprint and, when non-empty, the client version for
// |app_id| and the registered |device_id|. The caller owns the result.
// TODO(ckehoe): Pass in the version string and
// group this with the local functions up top.
RequestHeader* RpcHandler::CreateRequestHeader(
    const std::string& app_id,
    const std::string& device_id) const {
  scoped_ptr<RequestHeader> request_header(new RequestHeader);

  request_header->set_allocated_framework_version(
      CreateVersion("Chrome", delegate_->GetPlatformVersionString()));

  // The client version is attached only when an app made the request.
  if (!app_id.empty()) {
    request_header->set_allocated_client_version(
        CreateVersion(app_id, std::string()));
  }

  request_header->set_current_time_millis(base::Time::Now().ToJsTime());

  if (!device_id.empty())
    request_header->set_registered_device_id(device_id);

  scoped_ptr<DeviceFingerprint> device_fingerprint(new DeviceFingerprint);
  device_fingerprint->set_platform_version(
      delegate_->GetPlatformVersionString());
  device_fingerprint->set_type(CHROME_PLATFORM_TYPE);
  request_header->set_allocated_device_fingerprint(
      device_fingerprint.release());

  return request_header.release();
}
// Attaches a freshly built header to |request| and hands it to
// |server_post_callback_| together with the API key for |app_id| and, for
// authenticated requests, |auth_token_|. |response_handler| is passed through
// to the post callback.
template <class T>
void RpcHandler::SendServerRequest(
    const std::string& rpc_name,
    const std::string& device_id,
    const std::string& app_id,
    bool authenticated,
    scoped_ptr<T> request,
    const PostCleanupCallback& response_handler) {
  request->set_allocated_header(CreateRequestHeader(app_id, device_id));

  // An authenticated request requires a stored auth token.
  DCHECK(!authenticated || !auth_token_.empty());

  // Anonymous requests are sent with an empty token.
  const std::string request_token = authenticated ? auth_token_ : std::string();
  server_post_callback_.Run(delegate_->GetRequestContext(),
                            rpc_name,
                            delegate_->GetAPIKey(app_id),
                            request_token,
                            make_scoped_ptr<MessageLite>(request.release()),
                            response_handler);
}
// Default post callback: creates an HttpPost for |request_proto|, starts it
// with |callback| bound to the post object, and records it in
// |pending_posts_| until a response handler removes it.
void RpcHandler::SendHttpPost(net::URLRequestContextGetter* url_context_getter,
                              const std::string& rpc_name,
                              const std::string& api_key,
                              const std::string& auth_token,
                              scoped_ptr<MessageLite> request_proto,
                              const PostCleanupCallback& callback) {
  base::CommandLine* command_line = base::CommandLine::ForCurrentProcess();

  // Create the base URL to call. The command-line switch, when present,
  // overrides the default server.
  std::string server_host = kDefaultCopresenceServer;
  if (command_line->HasSwitch(switches::kCopresenceServer)) {
    server_host =
        command_line->GetSwitchValueASCII(switches::kCopresenceServer);
  }

  const std::string tracing_token =
      command_line->GetSwitchValueASCII(switches::kCopresenceTracingToken);

  // Create the request and keep a pointer until it completes.
  HttpPost* post = new HttpPost(url_context_getter,
                                server_host,
                                rpc_name,
                                api_key,
                                auth_token,
                                tracing_token,
                                *request_proto);

  post->Start(base::Bind(callback, post));
  pending_posts_.insert(post);
}
} // namespace copresence
|