summaryrefslogtreecommitdiffstats
path: root/chrome/test/pyautolib/policy_base.py
blob: 023dfca0a624d35d49b9c1fb5e112c39e4b11d52 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
# Copyright (c) 2012 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

"""Base class for tests that need to update the policies enforced by Chrome.

Subclasses can call SetUserPolicy (ChromeOS, Linux, Mac, Windows) and
SetDevicePolicy (ChromeOS only) with a dictionary of the policies to install.

The current implementation depends on the platform. The implementations might
change in the future, but tests relying on the above calls will keep working.
"""

# On ChromeOS, a mock DMServer is started and enterprise enrollment faked
# against it. The mock DMServer then serves user and device policy to Chrome.
#
# For this setup to work, the DNS, GAIA and TPM (if present) are mocked as well:
#
# * The mock DNS resolves all addresses to 127.0.0.1. This allows the mock GAIA
#   to handle all login attempts. It also eliminates the impact of flaky network
#   connections on tests. Beware though that no cloud services can be accessed
#   due to this DNS redirect.
#
# * The mock GAIA permits login with arbitrary credentials and accepts any OAuth
#   tokens sent to it for verification as valid.
#
# * When running on a real device, its TPM is disabled. If the TPM were enabled,
#   enrollment could not be undone without a reboot. Disabling the TPM makes
#   cryptohomed behave as if no TPM was present, allowing enrollment to be
#   undone by removing the install attributes.
#
# To disable the TPM, 0 must be written to /sys/class/misc/tpm0/device/enabled.
# Since this file is not writeable, a tmpfs is mounted that shadows the file
# with a writeable copy.

import json
import logging
import os
import subprocess

import pyauto

if pyauto.PyUITest.IsChromeOS():
  import sys
  import warnings

  import pyauto_paths

  # Ignore deprecation warnings, they make our output more cluttered.
  warnings.filterwarnings('ignore', category=DeprecationWarning)

  # Find the path to the pyproto and add it to sys.path.
  # Prepend it so that google.protobuf is loaded from here.
  for path in pyauto_paths.GetBuildDirs():
    p = os.path.join(path, 'pyproto')
    if os.path.isdir(p):
      sys.path = [p, os.path.join(p, 'chrome', 'browser', 'policy',
                                  'proto')] + sys.path
      break
  sys.path.append('/usr/local')  # to import autotest libs.

  import dbus
  import device_management_backend_pb2 as dm
  import pyauto_utils
  import string
  import tempfile
  import urllib
  import urllib2
  import uuid
  from autotest.cros import auth_server
  from autotest.cros import constants
  from autotest.cros import cros_ui
  from autotest.cros import dns_server
elif pyauto.PyUITest.IsWin():
  import _winreg as winreg
elif pyauto.PyUITest.IsMac():
  import getpass
  import plistlib

# ASN.1 object identifier for PKCS#1/RSA.
PKCS1_RSA_OID = '\x2a\x86\x48\x86\xf7\x0d\x01\x01\x01'

# sysfs directory of the TPM. Its existence is how the ChromeOS setup and
# teardown code decides whether a TPM is present.
TPM_SYSFS_PATH = '/sys/class/misc/tpm0'
# File below the TPM sysfs directory that the ChromeOS setup code overwrites
# with '0' (after mounting a tmpfs over the directory) to disable the TPM.
TPM_SYSFS_ENABLED_FILE = os.path.join(TPM_SYSFS_PATH, 'device/enabled')


class PolicyTestBase(pyauto.PyUITest):
  """A base class for tests that need to set up and modify policies.

  Subclasses can use the methods SetUserPolicy (ChromeOS, Linux, Mac, Windows)
  and SetDevicePolicy (ChromeOS only) to set the policies seen by Chrome.
  """

  if pyauto.PyUITest.IsChromeOS():
    # TODO(bartfab): Extend the C++ wrapper that starts the mock DMServer so
    # that an owner can be passed in. Without this, the server will assume that
    # the owner is user@example.com and for consistency, so must we.
    owner = 'user@example.com'
    # Subclasses may override these credentials to fake enrollment into another
    # mode or use different device and machine IDs.
    mode = 'enterprise'
    device_id = string.upper(str(uuid.uuid4()))
    machine_id = 'CROSTEST'

    _auth_server = None
    _dns_server = None

  def ShouldAutoLogin(self):
    """Always returns False.

    NOTE(review): presumably overrides a PyUITest hook of the same name so
    that policy tests start without an automatic login; confirm in pyauto.
    """
    return False

  @staticmethod
  def _Call(command, check=False):
    """Runs a command with its stdout discarded.

    Args:
      command: The command line as a single string. It is split on single
               spaces, so arguments must not contain spaces themselves.
      check: If True, a non-zero exit status raises
             subprocess.CalledProcessError instead of being returned.

    Returns:
      The exit status of the command.
    """
    runner = subprocess.check_call if check else subprocess.call
    with open(os.devnull, 'w') as sink:
      return runner(command.split(' '), stdout=sink)

  def _WriteFile(self, path, content):
    """Writes content to path, creating any intermediary directories.

    Args:
      path: Path of the file to write. An existing file is overwritten.
      content: The string to write to the file.
    """
    directory = os.path.dirname(path)
    # dirname is empty for a bare file name, and os.makedirs('') would raise,
    # so only create the directory when there is one to create.
    if directory and not os.path.exists(directory):
      os.makedirs(directory)
    # The with statement closes the file even if the write fails.
    with open(path, 'w') as f:
      f.write(content)

  def _GetTestServerPoliciesFilePath(self):
    """Returns the path of the policy file read by the mock DMServer.

    The file is named 'device_management' and lives in the temporary data
    directory created during the ChromeOS setup.
    """
    assert self.IsChromeOS()
    data_dir = self._temp_data_dir
    return os.path.join(data_dir, 'device_management')

  def _GetHttpURLForDeviceManagement(self):
    """Returns the 'device_management' URL of the test HTTP server."""
    assert self.IsChromeOS()
    dm_url = self._http_server.GetURL('device_management')
    return dm_url.spec()

  def _RemoveIfExists(self, filename):
    """Deletes filename; does nothing if no such path exists."""
    if not os.path.exists(filename):
      return
    os.remove(filename)

  def _StartSessionManagerAndChrome(self):
    """Starts the session manager and, through it, Chrome.

    Requires that the session manager be stopped already.
    """
    # The session manager refuses to spawn Chrome while the magic file exists.
    # That normally keeps the automation channel open, but here Chrome really
    # has to be restarted, so the file is taken out of the way for the duration
    # of the restart. PyUITest.setUp() runs after the restart and sets up the
    # automation channel again.
    magic_file = constants.DISABLE_BROWSER_RESTART_MAGIC_FILE
    had_magic_file = os.path.exists(magic_file)
    if had_magic_file:
      logging.debug('DISABLE_BROWSER_RESTART_MAGIC_FILE found. '
                    'Removing temporarily for the next restart.')
      os.remove(magic_file)
      assert not os.path.exists(magic_file)

    logging.debug('Starting session manager again')
    cros_ui.start()

    # cros_ui.start() waits for the login prompt to be visible, so Chrome is
    # already running by now and the magic file can be put back safely.
    if had_magic_file:
      with open(magic_file, 'w'):
        pass
      assert os.path.exists(magic_file)

  def _WritePolicyOnChromeOS(self):
    """Serializes the current policies into the mock DMServer's input file.

    The device policy is written as is; the user policy is written as
    mandatory policy with an empty set of recommended policies.
    """
    assert self.IsChromeOS()
    served = {'google/chromeos/device': self._device_policy}
    served['google/chromeos/user'] = {
      'mandatory': self._user_policy,
      'recommended': {},
    }
    served['managed_users'] = ['*']

    target = self._GetTestServerPoliciesFilePath()
    serialized = json.dumps(served, sort_keys=True, indent=2)
    self._WriteFile(target, serialized + '\n')

  @staticmethod
  def _IsCryptohomedReadyOnChromeOS():
    """Reports whether the cryptohomed DBus object can be reached.

    Returns:
      True if the Cryptohome object and its interface could be obtained from
      the system bus, False if that raised a DBusException.
    """
    assert pyauto.PyUITest.IsChromeOS()
    try:
      cryptohome = dbus.SystemBus().get_object('org.chromium.Cryptohome',
                                               '/org/chromium/Cryptohome')
      dbus.Interface(cryptohome, 'org.chromium.CryptohomeInterface')
    except dbus.DBusException:
      return False
    else:
      return True

  def _ClearInstallAttributesOnChromeOS(self):
    """Deletes the install attributes file and restarts cryptohomed.

    Blocks until cryptohomed answers on DBus again and asserts that it does.
    """
    assert self.IsChromeOS()
    attributes_file = '/home/.shadow/install_attributes.pb'
    self._RemoveIfExists(attributes_file)
    self._Call('restart cryptohomed', check=True)
    cryptohomed_ready = self.WaitUntil(self._IsCryptohomedReadyOnChromeOS)
    assert cryptohomed_ready

  def _DMPostRequest(self, request_type, request, headers):
    """Posts a request to the mock DMServer and parses the reply.

    Args:
      request_type: Value of the 'request' query parameter, e.g. 'register'.
      request: The request message; its serialized form is the POST body.
      headers: A dict of extra HTTP headers to send.

    Returns:
      The reply parsed into a dm.DeviceManagementResponse.
    """
    assert self.IsChromeOS()
    base_url = self._GetHttpURLForDeviceManagement()
    query = urllib.urlencode({
      'deviceid': self.device_id,
      'oauth_token': 'dummy_oauth_token_that_is_not_checked_anyway',
      'request': request_type,
      'devicetype': 2,
      'apptype': 'Chrome',
      'agent': 'Chrome',
    })
    response = dm.DeviceManagementResponse()
    http_request = urllib2.Request(base_url + '?' + query,
                                   request.SerializeToString(), headers)
    payload = urllib2.urlopen(http_request).read()
    response.ParseFromString(payload)
    return response

  def _DMRegisterDevice(self):
    """Registers this device with the mock DMServer.

    Returns:
      The device management token from the server's register response.
    """
    assert self.IsChromeOS()
    dm_request = dm.DeviceManagementRequest()
    registration = dm_request.register_request
    registration.type = dm.DeviceRegisterRequest.DEVICE
    registration.machine_id = self.machine_id
    reply = self._DMPostRequest('register', dm_request, {})
    return reply.register_response.device_management_token

  def _DMFetchPolicy(self, dm_token):
    """Fetches the signed device policy from the mock DMServer.

    Args:
      dm_token: The device management token, sent in the Authorization header.

    Returns:
      The first policy fetch response. It is asserted to carry policy data, a
      signature for that data and a new public key.
    """
    assert self.IsChromeOS()
    dm_request = dm.DeviceManagementRequest()
    fetch = dm_request.policy_request.request.add()
    fetch.policy_type = 'google/chromeos/device'
    fetch.signature_type = dm.PolicyFetchRequest.SHA1_RSA
    auth_headers = {'Authorization': 'GoogleDMToken token=' + dm_token}
    reply = self._DMPostRequest('policy', dm_request, auth_headers)
    response = reply.policy_response.response[0]
    for field in ('policy_data', 'policy_data_signature', 'new_public_key'):
      assert getattr(response, field)
    return response

  def ExtraChromeFlags(self):
    """Returns the Chrome flags, adjusted for cloud policy on ChromeOS.

    On ChromeOS, every '--skip-oauth-login' flag is dropped, Chrome is pointed
    at the mock DMServer and sync is disabled. Other platforms get the flags
    of the base class unchanged.
    """
    flags = pyauto.PyUITest.ExtraChromeFlags(self)
    if not self.IsChromeOS():
      return flags

    while '--skip-oauth-login' in flags:
      flags.remove('--skip-oauth-login')
    dm_url = self._GetHttpURLForDeviceManagement()
    flags.extend(['--device-management-url=' + dm_url, '--disable-sync'])
    return flags

  def _SetUpWithSessionManagerStopped(self):
    """Sets up the test environment after stopping the session manager.

    In order: stops the session manager, starts the mock GAIA server, disables
    the TPM if there is one, fakes enterprise enrollment through the install
    attributes, starts the mock DNS server and the mock DMServer, serves empty
    user and device policy, writes the initial signed device policy blob and
    owner key to disk, removes all cryptohome vaults and finally restarts the
    session manager and Chrome.
    """
    assert self.IsChromeOS()
    logging.debug('Stopping session manager')
    cros_ui.stop(allow_fail=True)

    # Start mock GAIA server.
    self._auth_server = auth_server.GoogleAuthServer()
    self._auth_server.run()

    # Disable TPM if present.
    if os.path.exists(TPM_SYSFS_PATH):
      # Mount a tmpfs over the resolved TPM sysfs directory so that the
      # 'enabled' file written next is a copy living in that tmpfs.
      self._Call('mount -t tmpfs -o size=1k tmpfs %s'
          % os.path.realpath(TPM_SYSFS_PATH), check=True)
      self._WriteFile(TPM_SYSFS_ENABLED_FILE, '0')

    # Clear install attributes and restart cryptohomed to pick up the change.
    self._ClearInstallAttributesOnChromeOS()

    # Set install attributes to mock enterprise enrollment.
    bus = dbus.SystemBus()
    proxy = bus.get_object('org.chromium.Cryptohome',
                            '/org/chromium/Cryptohome')
    install_attributes = {
      'enterprise.device_id': self.device_id,
      # The domain is whatever follows the last '@' of the owner's address.
      'enterprise.domain': string.split(self.owner, '@')[-1],
      'enterprise.mode': self.mode,
      'enterprise.owned': 'true',
      'enterprise.user': self.owner
    }
    interface = dbus.Interface(proxy, 'org.chromium.CryptohomeInterface')
    for name, value in install_attributes.iteritems():
      # NOTE(review): each value is sent with a trailing NUL byte; presumably
      # cryptohomed expects NUL-terminated values - confirm.
      interface.InstallAttributesSet(name, '%s\0' % value)
    interface.InstallAttributesFinalize()

    # Start mock DNS server that redirects all traffic to 127.0.0.1.
    self._dns_server = dns_server.LocalDns()
    self._dns_server.run()

    # Start mock DMServer.
    # Its data directory is a fresh temporary directory created directly below
    # the source directory; only the directory's base name is handed to
    # StartHTTPServer.
    source_dir = os.path.normpath(pyauto_paths.GetSourceDir())
    self._temp_data_dir = tempfile.mkdtemp(dir=source_dir)
    logging.debug('TestServer input path: %s' % self._temp_data_dir)
    relative_temp_data_dir = os.path.basename(self._temp_data_dir)
    self._http_server = self.StartHTTPServer(relative_temp_data_dir)

    # Initialize the policy served.
    self._device_policy = {}
    self._user_policy = {}
    self._WritePolicyOnChromeOS()

    # Register with mock DMServer and retrieve initial device policy blob.
    dm_token = self._DMRegisterDevice()
    policy = self._DMFetchPolicy(dm_token)

    # Write the initial device policy blob.
    self._WriteFile(constants.OWNER_KEY_FILE, policy.new_public_key)
    self._WriteFile(constants.SIGNED_POLICY_FILE, policy.SerializeToString())

    # Remove any existing vaults.
    self.RemoveAllCryptohomeVaultsOnChromeOS()

    # Restart session manager and Chrome.
    self._StartSessionManagerAndChrome()

  def _tearDownWithSessionManagerStopped(self):
    """Resets the test environment after stopping the session manager.

    Undoes the steps of the ChromeOS setup: stops the mock GAIA, DNS and
    DMServer instances, unmounts the tmpfs shadowing the TPM sysfs directory,
    clears the install attributes, deletes the served policy data, the owner
    key and the signed policy blob, removes all cryptohome vaults and finally
    restarts the session manager and Chrome.
    """
    assert self.IsChromeOS()
    logging.debug('Stopping session manager')
    cros_ui.stop(allow_fail=True)

    # Stop mock GAIA server.
    if self._auth_server:
      self._auth_server.stop()

    # Reenable TPM if present.
    if os.path.exists(TPM_SYSFS_PATH):
      # Called without check=True, so a failing umount does not raise.
      self._Call('umount %s' % os.path.realpath(TPM_SYSFS_PATH))

    # Clear install attributes and restart cryptohomed to pick up the change.
    self._ClearInstallAttributesOnChromeOS()

    # Stop mock DNS server.
    if self._dns_server:
      self._dns_server.stop()

    # Stop mock DMServer.
    self.StopHTTPServer(self._http_server)

    # Clear the policy served.
    pyauto_utils.RemovePath(self._temp_data_dir)

    # Remove the device policy blob.
    self._RemoveIfExists(constants.OWNER_KEY_FILE)
    self._RemoveIfExists(constants.SIGNED_POLICY_FILE)

    # Remove any existing vaults.
    self.RemoveAllCryptohomeVaultsOnChromeOS()

    # Restart session manager and Chrome.
    self._StartSessionManagerAndChrome()

  def setUp(self):
    """Sets up the platform for policy testing.

    On ChromeOS, part of the setup involves restarting the session manager to
    inject an initial device policy blob. If that part fails, the mock GAIA
    and DNS servers that were already started are stopped and the exception
    is re-raised.
    """
    if self.IsChromeOS():
      # Perform the remainder of the setup with the session manager stopped.
      try:
        self.WaitForSessionManagerRestart(
            self._SetUpWithSessionManagerStopped)
      except:
        # The bare except is deliberate: whatever went wrong is re-raised
        # below, after the cleanup.
        # Destroy the non re-entrant services.
        if self._auth_server:
          self._auth_server.stop()
        if self._dns_server:
          self._dns_server.stop()
        raise

    pyauto.PyUITest.setUp(self)
    # Cached for the Windows, Linux and Mac policy writers, which choose the
    # policy location based on the branding.
    self._branding = self.GetBrowserInfo()['properties']['branding']

  def tearDown(self):
    """Cleans up the policies and related files created in tests."""
    if not self.IsChromeOS():
      # Outside of ChromeOS, there is only user policy to clear.
      self.SetUserPolicy(refresh=False)
    else:
      # Perform the cleanup with the session manager stopped.
      self.WaitForSessionManagerRestart(self._tearDownWithSessionManagerStopped)

    pyauto.PyUITest.tearDown(self)

  def LoginWithTestAccount(self, account='prod_enterprise_test_user'):
    """Logs in with one of the test accounts and asserts that it worked.

    Args:
      account: Key of the account in the dict returned by GetPrivateInfo().
    """
    assert self.IsChromeOS()
    account_info = self.GetPrivateInfo()[account]
    username = account_info['username']
    password = account_info['password']
    self.Login(username, password)
    login_state = self.GetLoginInfo()
    assert login_state['is_logged_in']

  def _GetCurrentLoginScreenId(self):
    """Returns the id of the screen currently shown by the OOBE WebUI."""
    script = """window.domAutomationController.send(
               String(cr.ui.Oobe.getInstance().currentScreen.id));
        """
    return self.ExecuteJavascriptInOOBEWebUI(script)

  def _WaitForLoginScreenId(self, id):
    """Waits for a given login screen and fails the test if it never shows.

    Args:
      id: The id of the OOBE screen that is expected to become current.
    """
    screen_shown = self.WaitUntil(function=self._GetCurrentLoginScreenId,
                                  expect_retval=id)
    self.assertTrue(screen_shown,
                    msg='Expected login screen "%s" to be visible.' % id)

  def _CheckLoginFormLoading(self):
    """Returns the 'loading' property of the current OOBE screen."""
    script = """window.domAutomationController.send(
               cr.ui.Oobe.getInstance().currentScreen.loading);
        """
    return self.ExecuteJavascriptInOOBEWebUI(script)

  def PrepareToWaitForLoginFormReload(self):
    """Arms the detection of a login form reload.

    Verifies that the login form is showing and has finished loading, then
    injects a 'reload_started' sentinel into the screen that
    WaitForLoginFormReload() can observe later.
    """
    current_screen = self._GetCurrentLoginScreenId()
    self.assertEqual('gaia-signin', current_screen,
                     msg='Expected the login form to be visible.')
    finished_loading = self.WaitUntil(function=self._CheckLoginFormLoading,
                                      expect_retval=False)
    self.assertTrue(finished_loading,
                    msg='Expected the login form to finish loading.')
    # The sentinel starts out false. The screen's loadAuthExtension_ is wrapped
    # (only once) so that the sentinel flips to true when a call to it leaves
    # the screen in the loading state.
    install_sentinel = (
        """var screen = cr.ui.Oobe.getInstance().currentScreen;
           if (!('reload_started' in screen)) {
             screen.orig_loadAuthExtension_ = screen.loadAuthExtension_;
             screen.loadAuthExtension_ = function(data) {
               this.orig_loadAuthExtension_(data);
               if (this.loading)
                 this.reload_started = true;
             }
           }
           screen.reload_started = false;
           window.domAutomationController.send(true);""")
    self.ExecuteJavascriptInOOBEWebUI(install_sentinel)

  def _CheckLoginFormReloaded(self):
    """Returns whether a reload of the login form has started and finished.

    That is, whether the current screen's 'reload_started' sentinel is set
    while its 'loading' property is not.
    """
    script = """window.domAutomationController.send(
               cr.ui.Oobe.getInstance().currentScreen.reload_started &&
               !cr.ui.Oobe.getInstance().currentScreen.loading);
        """
    return self.ExecuteJavascriptInOOBEWebUI(script)

  def WaitForLoginFormReload(self):
    """Waits until the login form has reloaded; fails the test otherwise.

    Relies on the 'reload_started' sentinel, so
    PrepareToWaitForLoginFormReload() has to be called beforehand.
    """
    current_screen = self._GetCurrentLoginScreenId()
    self.assertEqual('gaia-signin', current_screen,
                     msg='Expected the login form to be visible.')
    reloaded = self.WaitUntil(function=self._CheckLoginFormReloaded)
    self.assertTrue(reloaded,
                    msg='Expected the login form to finish reloading.')

  def _SetUserPolicyChromeOS(self, user_policy=None):
    """Makes the mock DMServer serve the given user policy.

    Args:
      user_policy: A dict of user policies. None or an empty dict clears the
                   user policy.
    """
    self._user_policy = user_policy if user_policy else {}
    self._WritePolicyOnChromeOS()

  def _SetUserPolicyWin(self, user_policy=None):
    """Writes the given user policy to the Windows registry.

    Any existing policy key for the current branding is deleted first, so the
    registry afterwards holds exactly the given policy.

    Args:
      user_policy: A dict mapping policy names to int, string or list values.
                   List entries may be of any of these types themselves. None
                   only removes the existing policy.

    Raises:
      TypeError: A policy value is of an unsupported type.
    """
    def SetValueEx(key, sub_key, value):
      """Stores value as sub_key of key, recursing into lists."""
      if isinstance(value, int):
        winreg.SetValueEx(key, sub_key, 0, winreg.REG_DWORD, int(value))
      elif isinstance(value, basestring):
        winreg.SetValueEx(key, sub_key, 0, winreg.REG_SZ, value.encode('ascii'))
      elif isinstance(value, list):
        # A list becomes a sub key whose values are named '1', '2', ...
        k = winreg.CreateKey(key, sub_key)
        try:
          for index, v in enumerate(value, 1):
            SetValueEx(k, str(index), v)
        finally:
          # Release the handle even if an entry has an unsupported type.
          winreg.CloseKey(k)
      else:
        # Wrapped in a tuple so that tuple values do not break the formatting.
        raise TypeError('Unsupported data type: "%s"' % (value,))

    assert self.IsWin()
    if self._branding == 'Google Chrome':
      reg_base = r'SOFTWARE\Policies\Google\Chrome'
    else:
      reg_base = r'SOFTWARE\Policies\Chromium'

    # Remove the policy key left behind by earlier tests, if there is one.
    if subprocess.call(
        r'reg query HKEY_LOCAL_MACHINE\%s' % reg_base) == 0:
      logging.debug(r'Removing %s' % reg_base)
      subprocess.call(r'reg delete HKLM\%s /f' % reg_base)

    if user_policy is not None:
      root_key = winreg.CreateKey(winreg.HKEY_LOCAL_MACHINE, reg_base)
      try:
        for k, v in user_policy.iteritems():
          SetValueEx(root_key, k, v)
      finally:
        # Release the handle even if writing one of the policies failed.
        winreg.CloseKey(root_key)

  def _SetUserPolicyLinux(self, user_policy=None):
    """Installs the given user policy as the JSON policy file read by Chrome.

    The existing policy directory for the current branding is removed first.
    All privileged file operations are delegated to policy_posix_util.py,
    which is run through suid-python.

    Args:
      user_policy: A dict of user policies. None only removes the existing
                   policy.
    """
    assert self.IsLinux()
    helper_script = os.path.join(os.path.dirname(__file__),
                                 'policy_posix_util.py')

    def RunHelper(*helper_args):
      """Runs policy_posix_util.py through suid-python with helper_args."""
      return subprocess.call(['suid-python', helper_script] + list(helper_args))

    if self._branding == 'Google Chrome':
      policies_location_base = '/etc/opt/chrome'
    else:
      policies_location_base = '/etc/chromium'

    if os.path.exists(policies_location_base):
      logging.debug('Removing directory %s' % policies_location_base)
      RunHelper('remove_dir', policies_location_base)

    if user_policy is None:
      return

    # The policy is staged in /tmp and then copied into the managed policy
    # directory by the helper script.
    staging_file = '/tmp/chrome.json'
    serialized = json.dumps(user_policy, sort_keys=True, indent=2)
    self._WriteFile(staging_file, serialized + '\n')

    policies_location = '%s/policies/managed' % policies_location_base
    RunHelper('setup_dir', policies_location)
    RunHelper('perm_dir', policies_location)
    RunHelper('copy', staging_file, policies_location)
    os.remove(staging_file)

  def _SetUserPolicyMac(self, user_policy=None):
    """Installs the given user policy as the plist policy file read by Chrome.

    The current user's managed preferences directory is removed first. All
    privileged file operations are delegated to policy_posix_util.py, which
    is run through suid-python.

    Args:
      user_policy: A dict of user policies. None only removes the existing
                   policy.
    """
    assert self.IsMac()
    helper_script = os.path.join(os.path.dirname(__file__),
                                 'policy_posix_util.py')

    def RunHelper(*helper_args):
      """Runs policy_posix_util.py through suid-python with helper_args."""
      return subprocess.call(['suid-python', helper_script] + list(helper_args))

    if self._branding == 'Google Chrome':
      policies_file_base = 'com.google.Chrome.plist'
    else:
      policies_file_base = 'org.chromium.Chromium.plist'

    policies_location = os.path.join('/Library', 'Managed Preferences',
                                     getpass.getuser())

    if os.path.exists(policies_location):
      logging.debug('Removing directory %s' % policies_location)
      RunHelper('remove_dir', policies_location)

    if user_policy is None:
      return

    # The plist is staged in /tmp and then copied into the managed preferences
    # directory by the helper script.
    staging_file = os.path.join('/tmp', policies_file_base)
    plistlib.writePlist(user_policy, staging_file)
    RunHelper('setup_dir', policies_location)
    RunHelper('copy', staging_file, policies_location)
    os.remove(staging_file)

  def SetUserPolicy(self, user_policy=None, refresh=True):
    """Sets the user policy provided as a dict.

    Args:
      user_policy: The user policy to set. None clears it.
      refresh: If True, Chrome will refresh and apply the new policy.
               Requires Chrome to be alive for it.

    Raises:
      NotImplementedError: The current platform is none of ChromeOS, Windows,
                           Linux or Mac.
    """
    # Checked in order; the first platform check that passes picks the writer.
    platform_writers = (
        (self.IsChromeOS, self._SetUserPolicyChromeOS),
        (self.IsWin, self._SetUserPolicyWin),
        (self.IsLinux, self._SetUserPolicyLinux),
        (self.IsMac, self._SetUserPolicyMac),
    )
    for is_platform, write_policy in platform_writers:
      if is_platform():
        write_policy(user_policy=user_policy)
        break
    else:
      raise NotImplementedError('Not available on this platform.')

    if refresh:
      self.RefreshPolicies()

  def SetDevicePolicy(self, device_policy=None, refresh=True):
    """Sets the device policy provided as a dict. ChromeOS only.

    Args:
      device_policy: The device policy to set. None clears it.
      refresh: If True, Chrome will refresh and apply the new policy.
               Requires Chrome to be alive for it.
    """
    assert self.IsChromeOS()
    self._device_policy = device_policy if device_policy else {}
    self._WritePolicyOnChromeOS()
    if not refresh:
      return
    self.RefreshPolicies()