diff options
author | mnissler@chromium.org <mnissler@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98> | 2013-02-19 05:13:50 +0000 |
---|---|---|
committer | mnissler@chromium.org <mnissler@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98> | 2013-02-19 05:13:50 +0000 |
commit | 9238644f427502492f2f70ff275d4e8d97a28a57 (patch) | |
tree | 9c4232b792da0c673998e40fa66b2d76c898a42e /net/tools | |
parent | 313288e9bb0ca09e4a8c49c4afc478611e95e2d1 (diff) | |
download | chromium_src-9238644f427502492f2f70ff275d4e8d97a28a57.zip chromium_src-9238644f427502492f2f70ff275d4e8d97a28a57.tar.gz chromium_src-9238644f427502492f2f70ff275d4e8d97a28a57.tar.bz2 |
Split out policy code from net/tools/testserver.
This moves the policy-specific code to chrome/browser/policy/cloud/test
and adds LocalPolicyTestServer which simplifies policy test server use
in unit tests.
BUG=chromium:119403
TEST=unit tests
TBR=ben@chromium.org
Review URL: https://chromiumcodereview.appspot.com/12235003
git-svn-id: svn://svn.chromium.org/chrome/trunk/src@183159 0039d316-1c4b-4281-b951-d872f2087c98
Diffstat (limited to 'net/tools')
-rw-r--r-- | net/tools/testserver/OWNERS | 5 | ||||
-rw-r--r-- | net/tools/testserver/asn1der.py | 60 | ||||
-rw-r--r-- | net/tools/testserver/device_management.py | 686 | ||||
-rwxr-xr-x | net/tools/testserver/testserver.py | 39 |
4 files changed, 0 insertions, 790 deletions
diff --git a/net/tools/testserver/OWNERS b/net/tools/testserver/OWNERS index e9e43d6..78ad9aa 100644 --- a/net/tools/testserver/OWNERS +++ b/net/tools/testserver/OWNERS @@ -1,8 +1,3 @@ # net/ OWNERS can also review testserver changes # General reviewer. phajdan.jr@chromium.org - -# For changes to device_management.py. -joaodasilva@chromium.org -mnissler@chromium.org -pastarmovj@chromium.org diff --git a/net/tools/testserver/asn1der.py b/net/tools/testserver/asn1der.py deleted file mode 100644 index 9e70893..0000000 --- a/net/tools/testserver/asn1der.py +++ /dev/null @@ -1,60 +0,0 @@ -# Copyright (c) 2011 The Chromium Authors. All rights reserved. -# Use of this source code is governed by a BSD-style license that can be -# found in the LICENSE file. - -"""Helper module for ASN.1/DER encoding.""" - -import binascii -import struct - -# Tags as defined by ASN.1. -INTEGER = 2 -BIT_STRING = 3 -NULL = 5 -OBJECT_IDENTIFIER = 6 -SEQUENCE = 0x30 - -def Data(tag, data): - """Generic type-length-value encoder. - - Args: - tag: the tag. - data: the data for the given tag. - Returns: - encoded TLV value. - """ - if len(data) == 0: - return struct.pack(">BB", tag, 0); - assert len(data) <= 0xffff; - return struct.pack(">BBH", tag, 0x82, len(data)) + data; - -def Integer(value): - """Encodes an integer. - - Args: - value: the long value. - Returns: - encoded TLV value. - """ - data = '%x' % value - return Data(INTEGER, binascii.unhexlify('00' + '0' * (len(data) % 2) + data)) - -def Bitstring(value): - """Encodes a bit string. - - Args: - value: a string holding the binary data. - Returns: - encoded TLV value. - """ - return Data(BIT_STRING, '\x00' + value) - -def Sequence(values): - """Encodes a sequence of other values. - - Args: - values: the list of values, must be strings holding already encoded data. - Returns: - encoded TLV value. - """ - return Data(SEQUENCE, ''.join(values)) diff --git a/net/tools/testserver/device_management.py b/net/tools/testserver/device_management.py deleted file mode 100644 index 93d5b00..0000000 --- a/net/tools/testserver/device_management.py +++ /dev/null @@ -1,686 +0,0 @@ -# Copyright (c) 2012 The Chromium Authors. All rights reserved. -# Use of this source code is governed by a BSD-style license that can be -# found in the LICENSE file. - -"""A bare-bones test server for testing cloud policy support. - -This implements a simple cloud policy test server that can be used to test -chrome's device management service client. The policy information is read from -the file named device_management in the server's data directory. It contains -enforced and recommended policies for the device and user scope, and a list -of managed users. - -The format of the file is JSON. The root dictionary contains a list under the -key "managed_users". It contains auth tokens for which the server will claim -that the user is managed. The token string "*" indicates that all users are -claimed to be managed. Other keys in the root dictionary identify request -scopes. The user-request scope is described by a dictionary that holds two -sub-dictionaries: "mandatory" and "recommended". Both these hold the policy -definitions as key/value stores, their format is identical to what the Linux -implementation reads from /etc. -The device-scope holds the policy-definition directly as key/value stores in the -protobuf-format. - -Example: - -{ - "google/chromeos/device" : { - "guest_mode_enabled" : false - }, - "google/chromeos/user" : { - "mandatory" : { - "HomepageLocation" : "http://www.chromium.org", - "IncognitoEnabled" : false - }, - "recommended" : { - "JavascriptEnabled": false - } - }, - "google/chromeos/publicaccount/user@example.com" : { - "mandatory" : { - "HomepageLocation" : "http://www.chromium.org" - }, - "recommended" : { - } - }, - "managed_users" : [ - "secret123456" - ], - "current_key_index": 0 -} - -""" - -import cgi -import hashlib -import logging -import os -import random -import re -import sys -import time -import tlslite -import tlslite.api -import tlslite.utils - -# The name and availability of the json module varies in python versions. -try: - import simplejson as json -except ImportError: - try: - import json - except ImportError: - json = None - -import asn1der -import device_management_backend_pb2 as dm -import cloud_policy_pb2 as cp -import chrome_device_policy_pb2 as dp - -# ASN.1 object identifier for PKCS#1/RSA. -PKCS1_RSA_OID = '\x2a\x86\x48\x86\xf7\x0d\x01\x01\x01' - -# SHA256 sum of "0". -SHA256_0 = hashlib.sha256('0').digest() - -# List of bad machine identifiers that trigger the |valid_serial_number_missing| -# flag to be set set in the policy fetch response. -BAD_MACHINE_IDS = [ '123490EN400015' ]; - -# List of machines that trigger the server to send kiosk enrollment response -# for the register request. -KIOSK_MACHINE_IDS = [ 'KIOSK' ]; - -class RequestHandler(object): - """Decodes and handles device management requests from clients. - - The handler implements all the request parsing and protobuf message decoding - and encoding. It calls back into the server to lookup, register, and - unregister clients. - """ - - def __init__(self, server, path, headers, request): - """Initialize the handler. - - Args: - server: The TestServer object to use for (un)registering clients. - path: A string containing the request path and query parameters. - headers: A rfc822.Message-like object containing HTTP headers. - request: The request data received from the client as a string. - """ - self._server = server - self._path = path - self._headers = headers - self._request = request - self._params = None - - def GetUniqueParam(self, name): - """Extracts a unique query parameter from the request. - - Args: - name: Names the parameter to fetch. - Returns: - The parameter value or None if the parameter doesn't exist or is not - unique. - """ - if not self._params: - self._params = cgi.parse_qs(self._path[self._path.find('?') + 1:]) - - param_list = self._params.get(name, []) - if len(param_list) == 1: - return param_list[0] - return None; - - def HandleRequest(self): - """Handles a request. - - Parses the data supplied at construction time and returns a pair indicating - http status code and response data to be sent back to the client. - - Returns: - A tuple of HTTP status code and response data to send to the client. - """ - rmsg = dm.DeviceManagementRequest() - rmsg.ParseFromString(self._request) - - logging.debug('gaia auth token -> ' + - self._headers.getheader('Authorization', '')) - logging.debug('oauth token -> ' + str(self.GetUniqueParam('oauth_token'))) - logging.debug('deviceid -> ' + str(self.GetUniqueParam('deviceid'))) - self.DumpMessage('Request', rmsg) - - request_type = self.GetUniqueParam('request') - # Check server side requirements, as defined in - # device_management_backend.proto. - if (self.GetUniqueParam('devicetype') != '2' or - self.GetUniqueParam('apptype') != 'Chrome' or - (request_type != 'ping' and - len(self.GetUniqueParam('deviceid')) >= 64) or - len(self.GetUniqueParam('agent')) >= 64): - return (400, 'Invalid request parameter') - if request_type == 'register': - return self.ProcessRegister(rmsg.register_request) - elif request_type == 'unregister': - return self.ProcessUnregister(rmsg.unregister_request) - elif request_type == 'policy' or request_type == 'ping': - return self.ProcessPolicy(rmsg.policy_request, request_type) - elif request_type == 'enterprise_check': - return self.ProcessAutoEnrollment(rmsg.auto_enrollment_request) - else: - return (400, 'Invalid request parameter') - - def CheckGoogleLogin(self): - """Extracts the auth token from the request and returns it. The token may - either be a GoogleLogin token from an Authorization header, or an OAuth V2 - token from the oauth_token query parameter. Returns None if no token is - present. - """ - oauth_token = self.GetUniqueParam('oauth_token') - if oauth_token: - return oauth_token - - match = re.match('GoogleLogin auth=(\\w+)', - self._headers.getheader('Authorization', '')) - if match: - return match.group(1) - - return None - - def ProcessRegister(self, msg): - """Handles a register request. - - Checks the query for authorization and device identifier, registers the - device with the server and constructs a response. - - Args: - msg: The DeviceRegisterRequest message received from the client. - - Returns: - A tuple of HTTP status code and response data to send to the client. - """ - # Check the auth token and device ID. - auth = self.CheckGoogleLogin() - if not auth: - return (403, 'No authorization') - - policy = self._server.GetPolicies() - if ('*' not in policy['managed_users'] and - auth not in policy['managed_users']): - return (403, 'Unmanaged') - - device_id = self.GetUniqueParam('deviceid') - if not device_id: - return (400, 'Missing device identifier') - - token_info = self._server.RegisterDevice(device_id, - msg.machine_id, - msg.type) - - # Send back the reply. - response = dm.DeviceManagementResponse() - response.register_response.device_management_token = ( - token_info['device_token']) - response.register_response.machine_name = token_info['machine_name'] - response.register_response.enrollment_type = token_info['enrollment_mode'] - - self.DumpMessage('Response', response) - - return (200, response.SerializeToString()) - - def ProcessUnregister(self, msg): - """Handles a register request. - - Checks for authorization, unregisters the device and constructs the - response. - - Args: - msg: The DeviceUnregisterRequest message received from the client. - - Returns: - A tuple of HTTP status code and response data to send to the client. - """ - # Check the management token. - token, response = self.CheckToken(); - if not token: - return response - - # Unregister the device. - self._server.UnregisterDevice(token['device_token']); - - # Prepare and send the response. - response = dm.DeviceManagementResponse() - response.unregister_response.CopyFrom(dm.DeviceUnregisterResponse()) - - self.DumpMessage('Response', response) - - return (200, response.SerializeToString()) - - def ProcessPolicy(self, msg, request_type): - """Handles a policy request. - - Checks for authorization, encodes the policy into protobuf representation - and constructs the response. - - Args: - msg: The DevicePolicyRequest message received from the client. - - Returns: - A tuple of HTTP status code and response data to send to the client. - """ - for request in msg.request: - if (request.policy_type in - ('google/chrome/user', - 'google/chromeos/user', - 'google/chromeos/device', - 'google/chromeos/publicaccount')): - if request_type != 'policy': - return (400, 'Invalid request type') - else: - return self.ProcessCloudPolicy(request) - else: - return (400, 'Invalid policy_type') - - def ProcessAutoEnrollment(self, msg): - """Handles an auto-enrollment check request. - - The reply depends on the value of the modulus: - 1: replies with no new modulus and the sha256 hash of "0" - 2: replies with a new modulus, 4. - 4: replies with a new modulus, 2. - 8: fails with error 400. - 16: replies with a new modulus, 16. - 32: replies with a new modulus, 1. - anything else: replies with no new modulus and an empty list of hashes - - These allow the client to pick the testing scenario its wants to simulate. - - Args: - msg: The DeviceAutoEnrollmentRequest message received from the client. - - Returns: - A tuple of HTTP status code and response data to send to the client. - """ - auto_enrollment_response = dm.DeviceAutoEnrollmentResponse() - - if msg.modulus == 1: - auto_enrollment_response.hash.append(SHA256_0) - elif msg.modulus == 2: - auto_enrollment_response.expected_modulus = 4 - elif msg.modulus == 4: - auto_enrollment_response.expected_modulus = 2 - elif msg.modulus == 8: - return (400, 'Server error') - elif msg.modulus == 16: - auto_enrollment_response.expected_modulus = 16 - elif msg.modulus == 32: - auto_enrollment_response.expected_modulus = 1 - - response = dm.DeviceManagementResponse() - response.auto_enrollment_response.CopyFrom(auto_enrollment_response) - return (200, response.SerializeToString()) - - def SetProtobufMessageField(self, group_message, field, field_value): - '''Sets a field in a protobuf message. - - Args: - group_message: The protobuf message. - field: The field of the message to set, it should be a member of - group_message.DESCRIPTOR.fields. - field_value: The value to set. - ''' - if field.label == field.LABEL_REPEATED: - assert type(field_value) == list - entries = group_message.__getattribute__(field.name) - if field.message_type is None: - for list_item in field_value: - entries.append(list_item) - else: - # This field is itself a protobuf. - sub_type = field.message_type - for sub_value in field_value: - assert type(sub_value) == dict - # Add a new sub-protobuf per list entry. - sub_message = entries.add() - # Now iterate over its fields and recursively add them. - for sub_field in sub_message.DESCRIPTOR.fields: - if sub_field.name in sub_value: - value = sub_value[sub_field.name] - self.SetProtobufMessageField(sub_message, sub_field, value) - return - elif field.type == field.TYPE_BOOL: - assert type(field_value) == bool - elif field.type == field.TYPE_STRING: - assert type(field_value) == str or type(field_value) == unicode - elif field.type == field.TYPE_INT64: - assert type(field_value) == int - elif (field.type == field.TYPE_MESSAGE and - field.message_type.name == 'StringList'): - assert type(field_value) == list - entries = group_message.__getattribute__(field.name).entries - for list_item in field_value: - entries.append(list_item) - return - else: - raise Exception('Unknown field type %s' % field.type) - group_message.__setattr__(field.name, field_value) - - def GatherDevicePolicySettings(self, settings, policies): - '''Copies all the policies from a dictionary into a protobuf of type - CloudDeviceSettingsProto. - - Args: - settings: The destination ChromeDeviceSettingsProto protobuf. - policies: The source dictionary containing policies in JSON format. - ''' - for group in settings.DESCRIPTOR.fields: - # Create protobuf message for group. - group_message = eval('dp.' + group.message_type.name + '()') - # Indicates if at least one field was set in |group_message|. - got_fields = False - # Iterate over fields of the message and feed them from the - # policy config file. - for field in group_message.DESCRIPTOR.fields: - field_value = None - if field.name in policies: - got_fields = True - field_value = policies[field.name] - self.SetProtobufMessageField(group_message, field, field_value) - if got_fields: - settings.__getattribute__(group.name).CopyFrom(group_message) - - def GatherUserPolicySettings(self, settings, policies): - '''Copies all the policies from a dictionary into a protobuf of type - CloudPolicySettings. - - Args: - settings: The destination: a CloudPolicySettings protobuf. - policies: The source: a dictionary containing policies under keys - 'recommended' and 'mandatory'. - ''' - for field in settings.DESCRIPTOR.fields: - # |field| is the entry for a specific policy in the top-level - # CloudPolicySettings proto. - - # Look for this policy's value in the mandatory or recommended dicts. - if field.name in policies.get('mandatory', {}): - mode = cp.PolicyOptions.MANDATORY - value = policies['mandatory'][field.name] - elif field.name in policies.get('recommended', {}): - mode = cp.PolicyOptions.RECOMMENDED - value = policies['recommended'][field.name] - else: - continue - - # Create protobuf message for this policy. - policy_message = eval('cp.' + field.message_type.name + '()') - policy_message.policy_options.mode = mode - field_descriptor = policy_message.DESCRIPTOR.fields_by_name['value'] - self.SetProtobufMessageField(policy_message, field_descriptor, value) - settings.__getattribute__(field.name).CopyFrom(policy_message) - - def ProcessCloudPolicy(self, msg): - """Handles a cloud policy request. (New protocol for policy requests.) - - Checks for authorization, encodes the policy into protobuf representation, - signs it and constructs the repsonse. - - Args: - msg: The CloudPolicyRequest message received from the client. - - Returns: - A tuple of HTTP status code and response data to send to the client. - """ - - token_info, error = self.CheckToken() - if not token_info: - return error - - if msg.machine_id: - self._server.UpdateMachineId(token_info['device_token'], msg.machine_id) - - # Response is only given if the scope is specified in the config file. - # Normally 'google/chromeos/device', 'google/chromeos/user' and - # 'google/chromeos/publicaccount' should be accepted. - policy = self._server.GetPolicies() - policy_value = '' - policy_key = msg.policy_type - if msg.settings_entity_id: - policy_key += '/' + msg.settings_entity_id - if msg.policy_type in token_info['allowed_policy_types']: - if (msg.policy_type == 'google/chromeos/user' or - msg.policy_type == 'google/chrome/user' or - msg.policy_type == 'google/chromeos/publicaccount'): - settings = cp.CloudPolicySettings() - self.GatherUserPolicySettings(settings, policy.get(policy_key, {})) - elif msg.policy_type == 'google/chromeos/device': - settings = dp.ChromeDeviceSettingsProto() - self.GatherDevicePolicySettings(settings, policy.get(policy_key, {})) - - # Sign with 'current_key_index', defaulting to key 0. - signing_key = None - req_key = None - current_key_index = policy.get('current_key_index', 0) - nkeys = len(self._server.keys) - if (msg.signature_type == dm.PolicyFetchRequest.SHA1_RSA and - current_key_index in range(nkeys)): - signing_key = self._server.keys[current_key_index] - if msg.public_key_version in range(1, nkeys + 1): - # requested key exists, use for signing and rotate. - req_key = self._server.keys[msg.public_key_version - 1]['private_key'] - - # Fill the policy data protobuf. - policy_data = dm.PolicyData() - policy_data.policy_type = msg.policy_type - policy_data.timestamp = int(time.time() * 1000) - policy_data.request_token = token_info['device_token'] - policy_data.policy_value = settings.SerializeToString() - policy_data.machine_name = token_info['machine_name'] - policy_data.valid_serial_number_missing = ( - token_info['machine_id'] in BAD_MACHINE_IDS) - policy_data.settings_entity_id = msg.settings_entity_id - - if signing_key: - policy_data.public_key_version = current_key_index + 1 - if msg.policy_type == 'google/chromeos/publicaccount': - policy_data.username = msg.settings_entity_id - else: - # For regular user/device policy, there is no way for the testserver to - # know the user name belonging to the GAIA auth token we received (short - # of actually talking to GAIA). To address this, we read the username from - # the policy configuration dictionary, or use a default. - policy_data.username = policy.get('policy_user', 'user@example.com') - policy_data.device_id = token_info['device_id'] - signed_data = policy_data.SerializeToString() - - response = dm.DeviceManagementResponse() - fetch_response = response.policy_response.response.add() - fetch_response.policy_data = signed_data - if signing_key: - fetch_response.policy_data_signature = ( - signing_key['private_key'].hashAndSign(signed_data).tostring()) - if msg.public_key_version != current_key_index + 1: - fetch_response.new_public_key = signing_key['public_key'] - if req_key: - fetch_response.new_public_key_signature = ( - req_key.hashAndSign(fetch_response.new_public_key).tostring()) - - self.DumpMessage('Response', response) - - return (200, response.SerializeToString()) - - def CheckToken(self): - """Helper for checking whether the client supplied a valid DM token. - - Extracts the token from the request and passed to the server in order to - look up the client. - - Returns: - A pair of token information record and error response. If the first - element is None, then the second contains an error code to send back to - the client. Otherwise the first element is the same structure that is - returned by LookupToken(). - """ - error = 500 - dmtoken = None - request_device_id = self.GetUniqueParam('deviceid') - match = re.match('GoogleDMToken token=(\\w+)', - self._headers.getheader('Authorization', '')) - if match: - dmtoken = match.group(1) - if not dmtoken: - error = 401 - else: - token_info = self._server.LookupToken(dmtoken) - if (not token_info or - not request_device_id or - token_info['device_id'] != request_device_id): - error = 410 - else: - return (token_info, None) - - logging.debug('Token check failed with error %d' % error) - - return (None, (error, 'Server error %d' % error)) - - def DumpMessage(self, label, msg): - """Helper for logging an ASCII dump of a protobuf message.""" - logging.debug('%s\n%s' % (label, str(msg))) - -class TestServer(object): - """Handles requests and keeps global service state.""" - - def __init__(self, policy_path, private_key_paths): - """Initializes the server. - - Args: - policy_path: Names the file to read JSON-formatted policy from. - private_key_paths: List of paths to read private keys from. - """ - self._registered_tokens = {} - self.policy_path = policy_path - - self.keys = [] - if private_key_paths: - # Load specified keys from the filesystem. - for key_path in private_key_paths: - try: - key = tlslite.api.parsePEMKey(open(key_path).read(), private=True) - except IOError: - print 'Failed to load private key from %s' % key_path - continue - - assert key is not None - self.keys.append({ 'private_key' : key }) - else: - # Generate 2 private keys if none were passed from the command line. - for i in range(2): - key = tlslite.api.generateRSAKey(512) - assert key is not None - self.keys.append({ 'private_key' : key }) - - # Derive the public keys from the private keys. - for entry in self.keys: - key = entry['private_key'] - - algorithm = asn1der.Sequence( - [ asn1der.Data(asn1der.OBJECT_IDENTIFIER, PKCS1_RSA_OID), - asn1der.Data(asn1der.NULL, '') ]) - rsa_pubkey = asn1der.Sequence([ asn1der.Integer(key.n), - asn1der.Integer(key.e) ]) - pubkey = asn1der.Sequence([ algorithm, asn1der.Bitstring(rsa_pubkey) ]) - entry['public_key'] = pubkey; - - def GetPolicies(self): - """Returns the policies to be used, reloaded form the backend file every - time this is called. - """ - policy = {} - if json is None: - print 'No JSON module, cannot parse policy information' - else : - try: - policy = json.loads(open(self.policy_path).read()) - except IOError: - print 'Failed to load policy from %s' % self.policy_path - return policy - - def HandleRequest(self, path, headers, request): - """Handles a request. - - Args: - path: The request path and query parameters received from the client. - headers: A rfc822.Message-like object containing HTTP headers. - request: The request data received from the client as a string. - Returns: - A pair of HTTP status code and response data to send to the client. - """ - handler = RequestHandler(self, path, headers, request) - return handler.HandleRequest() - - def RegisterDevice(self, device_id, machine_id, type): - """Registers a device or user and generates a DM token for it. - - Args: - device_id: The device identifier provided by the client. - - Returns: - The newly generated device token for the device. - """ - dmtoken_chars = [] - while len(dmtoken_chars) < 32: - dmtoken_chars.append(random.choice('0123456789abcdef')) - dmtoken = ''.join(dmtoken_chars) - allowed_policy_types = { - dm.DeviceRegisterRequest.BROWSER: ['google/chrome/user'], - dm.DeviceRegisterRequest.USER: ['google/chromeos/user'], - dm.DeviceRegisterRequest.DEVICE: [ - 'google/chromeos/device', - 'google/chromeos/publicaccount' - ], - dm.DeviceRegisterRequest.TT: ['google/chromeos/user', - 'google/chrome/user'], - } - if machine_id in KIOSK_MACHINE_IDS: - enrollment_mode = dm.DeviceRegisterResponse.RETAIL - else: - enrollment_mode = dm.DeviceRegisterResponse.ENTERPRISE - self._registered_tokens[dmtoken] = { - 'device_id': device_id, - 'device_token': dmtoken, - 'allowed_policy_types': allowed_policy_types[type], - 'machine_name': 'chromeos-' + machine_id, - 'machine_id': machine_id, - 'enrollment_mode': enrollment_mode, - } - return self._registered_tokens[dmtoken] - - def UpdateMachineId(self, dmtoken, machine_id): - """Updates the machine identifier for a registered device. - - Args: - dmtoken: The device management token provided by the client. - machine_id: Updated hardware identifier value. - """ - if dmtoken in self._registered_tokens: - self._registered_tokens[dmtoken]['machine_id'] = machine_id - - def LookupToken(self, dmtoken): - """Looks up a device or a user by DM token. - - Args: - dmtoken: The device management token provided by the client. - - Returns: - A dictionary with information about a device or user that is registered by - dmtoken, or None if the token is not found. - """ - return self._registered_tokens.get(dmtoken, None) - - def UnregisterDevice(self, dmtoken): - """Unregisters a device identified by the given DM token. - - Args: - dmtoken: The device management token provided by the client. - """ - if dmtoken in self._registered_tokens.keys(): - del self._registered_tokens[dmtoken] diff --git a/net/tools/testserver/testserver.py b/net/tools/testserver/testserver.py index 0fe9bd7..54d8b19 100755 --- a/net/tools/testserver/testserver.py +++ b/net/tools/testserver/testserver.py @@ -272,7 +272,6 @@ class TestPageHandler(testserver_base.BasePageHandler): post_handlers = [ self.EchoTitleHandler, self.EchoHandler, - self.DeviceManagementHandler, self.PostOnlyFileHandler] + get_handlers put_handlers = [ self.EchoTitleHandler, @@ -1601,31 +1600,6 @@ class TestPageHandler(testserver_base.BasePageHandler): self.wfile.write(contents) return True - def DeviceManagementHandler(self): - """Delegates to the device management service used for cloud policy.""" - - if not self._ShouldHandleRequest("/device_management"): - return False - - raw_request = self.ReadRequestBody() - - if not self.server._device_management_handler: - import device_management - policy_path = os.path.join(self.server.data_dir, 'device_management') - self.server._device_management_handler = ( - device_management.TestServer(policy_path, self.server.policy_keys)) - - http_response, raw_reply = ( - self.server._device_management_handler.HandleRequest(self.path, - self.headers, - raw_request)) - self.send_response(http_response) - if (http_response == 200): - self.send_header('Content-Type', 'application/x-protobuffer') - self.end_headers() - self.wfile.write(raw_reply) - return True - # called by the redirect handling function when there is no parameter def sendRedirectHelp(self, redirect_name): self.send_response(200) @@ -1901,8 +1875,6 @@ class ServerRunner(testserver_base.TestServerRunner): server.data_dir = self.__make_data_dir() server.file_root_url = self.options.file_root_url server_data['port'] = server.server_port - server._device_management_handler = None - server.policy_keys = self.options.policy_keys elif self.options.server_type == SERVER_WEBSOCKET: # Launch pywebsocket via WebSocketServer. logger = logging.getLogger() @@ -2055,17 +2027,6 @@ class ServerRunner(testserver_base.TestServerRunner): 'multiple algorithms should be enabled.'); self.option_parser.add_option('--file-root-url', default='/files/', help='Specify a root URL for files served.') - self.option_parser.add_option('--policy-key', action='append', - dest='policy_keys', - help='Specify a path to a PEM-encoded ' - 'private key to use for policy signing. May ' - 'be specified multiple times in order to ' - 'load multipe keys into the server. If the ' - 'server has multiple keys, it will rotate ' - 'through them in at each request a ' - 'round-robin fashion. The server will ' - 'generate a random key if none is specified ' - 'on the command line.') if __name__ == '__main__': |