summaryrefslogtreecommitdiffstats
path: root/components/proximity_auth/e2e_test/cryptauth.py
blob: 7373fa9fb8f2f6e498826766b3ee00b70af286b5 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
# Copyright 2015 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

import httplib
import json
import logging
import pprint
import time

logger = logging.getLogger('proximity_auth.%s' % __name__)

_GOOGLE_APIS_URL = 'www.googleapis.com'
_REQUEST_PATH = '/cryptauth/v1/%s?alt=JSON'

class CryptAuthClient(object):
  """ A client for making blocking CryptAuth API calls. """

  def __init__(self, access_token, google_apis_url=_GOOGLE_APIS_URL):
    self._access_token = access_token
    self._google_apis_url = google_apis_url

  def GetMyDevices(self):
    """ Invokes the GetMyDevices API.

    Returns:
      A list of devices or None if the API call fails.
      Each device is a dictionary of the deserialized JSON returned by
      CryptAuth.
    """
    request_data = {
      'approvedForUnlockRequired': False,
      'allowStaleRead': False,
      'invocationReason': 13 # REASON_MANUAL
    }
    response = self._SendRequest('deviceSync/getmydevices', request_data)
    return response['devices'] if response is not None else None

  def GetUnlockKey(self):
    """
    Returns:
      The unlock key registered with CryptAuth if it exists or None.
      The device is a dictionary of the deserialized JSON returned by CryptAuth.
    """
    devices = self.GetMyDevices()
    if devices is None:
      return None

    for device in devices:
      if device['unlockKey']:
        return device
    return None

  def ToggleEasyUnlock(self, enable, public_key=''):
    """ Calls the ToggleEasyUnlock API.
    Args:
      enable: True to designate the device specified by |public_key| as an
          unlock key.
      public_key: The public key of the device to toggle. Ignored if |enable| is
          False, which toggles all unlock keys off.
    Returns:
      True upon success, else False.
    """
    request_data = { 'enable': enable, }
    if not enable:
      request_data['applyToAll'] = True
    else:
      request_data['publicKey'] = public_key
    response = self._SendRequest('deviceSync/toggleeasyunlock', request_data)
    return response is not None

  def FindEligibleUnlockDevices(self, time_delta_millis=None):
    """ Finds devices eligible to be an unlock key.
    Args:
      time_delta_millis: If specified, then only return eligible devices that
          have contacted CryptAuth in the last time delta.
    Returns:
      A tuple containing two lists, one of eligible devices and the other of
      ineligible devices.
      Each device is a dictionary of the deserialized JSON returned by
      CryptAuth.
    """
    request_data = {}
    if time_delta_millis is not None:
      request_data['maxLastUpdateTimeDeltaMillis'] = time_delta_millis * 1000;

    response = self._SendRequest(
        'deviceSync/findeligibleunlockdevices', request_data)
    if response is None:
      return None
    eligibleDevices = (
        response['eligibleDevices'] if 'eligibleDevices' in response else [])
    ineligibleDevices = (
        response['ineligibleDevices'] if (
            'ineligibleDevices' in response) else [])
    return eligibleDevices, ineligibleDevices

  def PingPhones(self, timeout_secs=10):
    """ Asks CryptAuth to ping registered phones and determine their status.
    Args:
      timeout_secs: The number of seconds to wait for phones to respond.
    Returns:
      A tuple containing two lists, one of eligible devices and the other of
      ineligible devices.
      Each device is a dictionary of the deserialized JSON returned by
      CryptAuth.
    """
    response = self._SendRequest(
        'deviceSync/senddevicesynctickle',
        { 'tickleType': 'updateEnrollment' })
    if response is None:
      return None
    # We wait for phones to update their status with CryptAuth.
    logger.info('Waiting for %s seconds for phone status...' % timeout_secs)
    time.sleep(timeout_secs)
    return self.FindEligibleUnlockDevices(time_delta_millis=timeout_secs)

  def _SendRequest(self, function_path, request_data):
    """ Sends an HTTP request to CryptAuth and returns the deserialized
        response.
    """
    conn = httplib.HTTPSConnection(self._google_apis_url)
    path = _REQUEST_PATH % function_path

    headers = {
      'authorization': 'Bearer ' + self._access_token,
      'Content-Type': 'application/json'
    }
    body = json.dumps(request_data)
    logger.info('Making request to %s with body:\n%s' % (
                path, pprint.pformat(request_data)))
    conn.request('POST', path, body, headers)

    response = conn.getresponse()
    if response.status == 204:
      return {}
    if response.status != 200:
      logger.warning('Request to %s failed: %s' % (path, response.status))
      return None
    return json.loads(response.read())