1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
|
# Copyright 2015 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
import httplib
import json
import logging
import pprint
import time
logger = logging.getLogger('proximity_auth.%s' % __name__)
_GOOGLE_APIS_URL = 'www.googleapis.com'
_REQUEST_PATH = '/cryptauth/v1/%s?alt=JSON'
class CryptAuthClient(object):
""" A client for making blocking CryptAuth API calls. """
def __init__(self, access_token, google_apis_url=_GOOGLE_APIS_URL):
self._access_token = access_token
self._google_apis_url = google_apis_url
def GetMyDevices(self):
""" Invokes the GetMyDevices API.
Returns:
A list of devices or None if the API call fails.
Each device is a dictionary of the deserialized JSON returned by
CryptAuth.
"""
request_data = {
'approvedForUnlockRequired': False,
'allowStaleRead': False,
'invocationReason': 13 # REASON_MANUAL
}
response = self._SendRequest('deviceSync/getmydevices', request_data)
return response['devices'] if response is not None else None
def GetUnlockKey(self):
"""
Returns:
The unlock key registered with CryptAuth if it exists or None.
The device is a dictionary of the deserialized JSON returned by CryptAuth.
"""
devices = self.GetMyDevices()
if devices is None:
return None
for device in devices:
if device['unlockKey']:
return device
return None
def ToggleEasyUnlock(self, enable, public_key=''):
""" Calls the ToggleEasyUnlock API.
Args:
enable: True to designate the device specified by |public_key| as an
unlock key.
public_key: The public key of the device to toggle. Ignored if |enable| is
False, which toggles all unlock keys off.
Returns:
True upon success, else False.
"""
request_data = { 'enable': enable, }
if not enable:
request_data['applyToAll'] = True
else:
request_data['publicKey'] = public_key
response = self._SendRequest('deviceSync/toggleeasyunlock', request_data)
return response is not None
def FindEligibleUnlockDevices(self, time_delta_millis=None):
""" Finds devices eligible to be an unlock key.
Args:
time_delta_millis: If specified, then only return eligible devices that
have contacted CryptAuth in the last time delta.
Returns:
A tuple containing two lists, one of eligible devices and the other of
ineligible devices.
Each device is a dictionary of the deserialized JSON returned by
CryptAuth.
"""
request_data = {}
if time_delta_millis is not None:
request_data['maxLastUpdateTimeDeltaMillis'] = time_delta_millis * 1000;
response = self._SendRequest(
'deviceSync/findeligibleunlockdevices', request_data)
if response is None:
return None
eligibleDevices = (
response['eligibleDevices'] if 'eligibleDevices' in response else [])
ineligibleDevices = (
response['ineligibleDevices'] if (
'ineligibleDevices' in response) else [])
return eligibleDevices, ineligibleDevices
def PingPhones(self, timeout_secs=10):
""" Asks CryptAuth to ping registered phones and determine their status.
Args:
timeout_secs: The number of seconds to wait for phones to respond.
Returns:
A tuple containing two lists, one of eligible devices and the other of
ineligible devices.
Each device is a dictionary of the deserialized JSON returned by
CryptAuth.
"""
response = self._SendRequest(
'deviceSync/senddevicesynctickle',
{ 'tickleType': 'updateEnrollment' })
if response is None:
return None
# We wait for phones to update their status with CryptAuth.
logger.info('Waiting for %s seconds for phone status...' % timeout_secs)
time.sleep(timeout_secs)
return self.FindEligibleUnlockDevices(time_delta_millis=timeout_secs)
def _SendRequest(self, function_path, request_data):
""" Sends an HTTP request to CryptAuth and returns the deserialized
response.
"""
conn = httplib.HTTPSConnection(self._google_apis_url)
path = _REQUEST_PATH % function_path
headers = {
'authorization': 'Bearer ' + self._access_token,
'Content-Type': 'application/json'
}
body = json.dumps(request_data)
logger.info('Making request to %s with body:\n%s' % (
path, pprint.pformat(request_data)))
conn.request('POST', path, body, headers)
response = conn.getresponse()
if response.status == 204:
return {}
if response.status != 200:
logger.warning('Request to %s failed: %s' % (path, response.status))
return None
return json.loads(response.read())
|