diff options
author | mmeade <mmeade@chromium.org> | 2015-04-02 18:03:34 -0700 |
---|---|---|
committer | Commit bot <commit-bot@chromium.org> | 2015-04-03 01:04:06 +0000 |
commit | 6dc766ea57cd776e35ca0f6f6d89b42e613f7e05 (patch) | |
tree | cd0e500e95860c34fa2f103fe258b5880b9a545e /testing/legion | |
parent | d0bdd942dd0f8e75d8c3bdac3dcf804925b95af3 (diff) | |
download | chromium_src-6dc766ea57cd776e35ca0f6f6d89b42e613f7e05.zip chromium_src-6dc766ea57cd776e35ca0f6f6d89b42e613f7e05.tar.gz chromium_src-6dc766ea57cd776e35ca0f6f6d89b42e613f7e05.tar.bz2 |
Factoring out the subprocess code into a stand-alone module.
Also adding a controller-side wrapper to make interacting with the process more intuitive.
BUG=
Review URL: https://codereview.chromium.org/1057943002
Cr-Commit-Position: refs/heads/master@{#323576}
Diffstat (limited to 'testing/legion')
-rw-r--r-- | testing/legion/common_lib.py | 2 | ||||
-rwxr-xr-x | testing/legion/examples/hello_world/controller_test.py | 12 | ||||
-rwxr-xr-x | testing/legion/examples/subprocess/subprocess_test.py | 36 | ||||
-rw-r--r-- | testing/legion/legion.isolate | 1 | ||||
-rw-r--r-- | testing/legion/process.py | 247 | ||||
-rw-r--r-- | testing/legion/rpc_methods.py | 169 | ||||
-rw-r--r-- | testing/legion/task_controller.py | 5 |
7 files changed, 279 insertions, 193 deletions
diff --git a/testing/legion/common_lib.py b/testing/legion/common_lib.py index c752e0f..d3d4c3c 100644 --- a/testing/legion/common_lib.py +++ b/testing/legion/common_lib.py @@ -40,4 +40,4 @@ def ConnectToServer(server): """Connect to an RPC server.""" addr = 'http://%s:%d' % (server, SERVER_PORT) logging.debug('Connecting to RPC server at %s', addr) - return xmlrpclib.Server(addr) + return xmlrpclib.Server(addr, allow_none=True) diff --git a/testing/legion/examples/hello_world/controller_test.py b/testing/legion/examples/hello_world/controller_test.py index 37afcef..3456662 100755 --- a/testing/legion/examples/hello_world/controller_test.py +++ b/testing/legion/examples/hello_world/controller_test.py @@ -32,7 +32,7 @@ class ExampleTestController(test_controller.TestController): """Create a task object and set the proper values.""" task = self.CreateNewTask( isolated_hash=isolated_hash, - dimensions={'os': 'Ubuntu-14.04', 'pool': 'Legion'}, priority=200, + dimensions={'os': 'Ubuntu-14.04'}, idle_timeout_secs=90, connection_timeout_secs=90, verbosity=logging.INFO, run_id=1) @@ -70,11 +70,11 @@ class ExampleTestController(test_controller.TestController): def CallTaskTest(self, task): """Call task_test.py name on a task.""" logging.info('Calling Subprocess to run "./task_test.py %s"', task.name) - proc = task.rpc.subprocess.Popen(['./task_test.py', task.name]) - task.rpc.subprocess.Wait(proc) - retcode = task.rpc.subprocess.GetReturncode(proc) - stdout = task.rpc.subprocess.ReadStdout(proc) - stderr = task.rpc.subprocess.ReadStderr(proc) + proc = task.Process(['./task_test.py', task.name]) + proc.Wait() + retcode = proc.GetReturncode() + stdout = proc.ReadStdout() + stderr = proc.ReadStderr() logging.info('retcode: %s, stdout: %s, stderr: %s', retcode, stdout, stderr) diff --git a/testing/legion/examples/subprocess/subprocess_test.py b/testing/legion/examples/subprocess/subprocess_test.py index 1a28ddd..7696ded 100755 --- a/testing/legion/examples/subprocess/subprocess_test.py +++ b/testing/legion/examples/subprocess/subprocess_test.py @@ -32,7 +32,7 @@ class ExampleTestController(test_controller.TestController): self.task = self.CreateNewTask( isolated_hash=args.task_hash, - dimensions={'os': 'Ubuntu-14.04', 'pool': 'Chromoting'}, + dimensions={'os': 'Ubuntu-14.04'}, idle_timeout_secs=90, connection_timeout_secs=90, verbosity=logging.DEBUG) self.task.Create() @@ -47,42 +47,38 @@ class ExampleTestController(test_controller.TestController): def TestMultipleProcesses(self): start = time.time() - sleep20 = self.task.rpc.subprocess.Process(['sleep', '20']) - self.task.rpc.subprocess.Start(sleep20) - sleep10 = self.task.rpc.subprocess.Process(['sleep', '10']) - self.task.rpc.subprocess.Start(sleep10) + sleep10 = self.task.Process(['sleep', '10']) + sleep20 = self.task.Process(['sleep', '20']) - self.task.rpc.subprocess.Wait(sleep10) + sleep10.Wait() elapsed = time.time() - start assert elapsed >= 10 and elapsed < 11 - self.task.rpc.subprocess.Wait(sleep20) + sleep20.Wait() elapsed = time.time() - start assert elapsed >= 20 - self.task.rpc.subprocess.Delete(sleep20) - self.task.rpc.subprocess.Delete(sleep10) + sleep10.Delete() + sleep20.Delete() def TestTerminate(self): start = time.time() - proc = self.task.rpc.subprocess.Process(['sleep', '20']) - self.task.rpc.subprocess.Start(proc) - self.task.rpc.subprocess.Terminate(proc) + sleep20 = self.task.Process(['sleep', '20']) + sleep20.Terminate() try: - self.task.rpc.subprocess.Wait(proc) + sleep20.Wait() except xmlrpclib.Fault: pass finally: - self.task.rpc.subprocess.Delete(proc) + sleep20.Delete() assert time.time() - start < 20 def TestLs(self): - proc = self.task.rpc.subprocess.Process(['ls']) - self.task.rpc.subprocess.Start(proc) - self.task.rpc.subprocess.Wait(proc) - assert self.task.rpc.subprocess.GetReturncode(proc) == 0 - assert 'task.isolate' in self.task.rpc.subprocess.ReadStdout(proc) - self.task.rpc.subprocess.Delete(proc) + ls = self.task.Process(['ls']) + ls.Wait() + assert ls.GetReturncode() == 0 + assert 'task.isolate' in ls.ReadStdout() + ls.Delete() if __name__ == '__main__': diff --git a/testing/legion/legion.isolate b/testing/legion/legion.isolate index 774b27a..63982e0 100644 --- a/testing/legion/legion.isolate +++ b/testing/legion/legion.isolate @@ -8,6 +8,7 @@ '__init__.py', 'common_lib.py', 'legion.isolate', + 'process.py', 'rpc_methods.py', 'rpc_server.py', 'run_task.py', diff --git a/testing/legion/process.py b/testing/legion/process.py new file mode 100644 index 0000000..356db61 --- /dev/null +++ b/testing/legion/process.py @@ -0,0 +1,247 @@ +# Copyright 2015 The Chromium Authors. All rights reserved. +# Use of this source code is governed by a BSD-style license that can be +# found in the LICENSE file. + +"""RPC compatible subprocess-type module. + +This module defined both a task-side process class as well as a controller-side +process wrapper for easier access and usage of the task-side process. +""" + +import logging +import subprocess +import sys +import threading + +#pylint: disable=relative-import +import common_lib + +# Map swarming_client to use subprocess42 +sys.path.append(common_lib.SWARMING_DIR) + +from utils import subprocess42 + + +class ControllerProcessWrapper(object): + """Controller-side process wrapper class. + + This class provides a more intuitive interface to task-side processes + than calling the methods directly using the RPC object. + """ + + def __init__(self, rpc, cmd, verbose=False, detached=False, cwd=None): + self._rpc = rpc + self._id = rpc.subprocess.Process(cmd) + if verbose: + self._rpc.subprocess.SetVerbose(self._id) + if detached: + self._rpc.subprocess.SetDetached(self._id) + if cwd: + self._rpc.subprocess.SetCwd(self._rpc, cwd) + self._rpc.subprocess.Start(self._id) + + def Terminate(self): + logging.debug('Terminating process %s', self._id) + return self._rpc.subprocess.Terminate(self._id) + + def Kill(self): + logging.debug('Killing process %s', self._id) + self._rpc.subprocess.Kill(self._id) + + def Delete(self): + return self._rpc.subprocess.Delete(self._id) + + def GetReturncode(self): + return self._rpc.subprocess.GetReturncode(self._id) + + def ReadStdout(self): + """Returns all stdout since the last call to ReadStdout. + + This call allows the user to read stdout while the process is running. + However each call will flush the local stdout buffer. In order to make + multiple calls to ReadStdout and to retain the entire output the results + of this call will need to be buffered in the calling code. + """ + return self._rpc.subprocess.ReadStdout(self._id) + + def ReadStderr(self): + """Returns all stderr read since the last call to ReadStderr. + + See ReadStdout for additional details. + """ + return self._rpc.subprocess.ReadStderr(self._id) + + def ReadOutput(self): + """Returns the (stdout, stderr) since the last Read* call. + + See ReadStdout for additional details. + """ + return self._rpc.subprocess.ReadOutput(self._id) + + def Wait(self): + return self._rpc.subprocess.Wait(self._id) + + def Poll(self): + return self._rpc.subprocess.Poll(self._id) + + def GetPid(self): + return self._rpc.subprocess.GetPid(self._id) + + + +class Process(object): + """Implements a task-side non-blocking subprocess. + + This non-blocking subprocess allows the caller to continue operating while + also able to interact with this subprocess based on a key returned to + the caller at the time of creation. + + Creation args are set via Set* methods called after calling Process but + before calling Start. This is due to a limitation of the XML-RPC + implementation not supporting keyword arguments. + """ + + _processes = {} + _process_next_id = 0 + _creation_lock = threading.Lock() + + def __init__(self, cmd): + self.stdout = '' + self.stderr = '' + self.cmd = cmd + self.proc = None + self.cwd = None + self.verbose = False + self.detached = False + self.data_lock = threading.Lock() + + def __str__(self): + return '%r, cwd=%r, verbose=%r, detached=%r' % ( + self.cmd, self.cwd, self.verbose, self.detached) + + def _reader(self): + for pipe, data in self.proc.yield_any(): + with self.data_lock: + if pipe == 'stdout': + self.stdout += data + if self.verbose: + sys.stdout.write(data) + else: + self.stderr += data + if self.verbose: + sys.stderr.write(data) + + @classmethod + def KillAll(cls): + for key in cls._processes: + cls.Kill(key) + + @classmethod + def Process(cls, cmd): + with cls._creation_lock: + key = 'Process%d' % cls._process_next_id + cls._process_next_id += 1 + logging.debug('Creating process %s with cmd %r', key, cmd) + process = cls(cmd) + cls._processes[key] = process + return key + + def _Start(self): + logging.info('Starting process %s', self) + self.proc = subprocess42.Popen(self.cmd, stdout=subprocess42.PIPE, + stderr=subprocess42.PIPE, + detached=self.detached, cwd=self.cwd) + threading.Thread(target=self._reader).start() + + @classmethod + def Start(cls, key): + cls._processes[key]._Start() + + @classmethod + def SetCwd(cls, key, cwd): + """Sets the process's cwd.""" + logging.debug('Setting %s cwd to %s', key, cwd) + cls._processes[key].cwd = cwd + + @classmethod + def SetDetached(cls, key): + """Creates a detached process.""" + logging.debug('Setting %s.detached = True', key) + cls._processes[key].detached = True + + @classmethod + def SetVerbose(cls, key): + """Sets the stdout and stderr to be emitted locally.""" + logging.debug('Setting %s.verbose = True', key) + cls._processes[key].verbose = True + + @classmethod + def Terminate(cls, key): + logging.debug('Terminating process %s', key) + cls._processes[key].proc.terminate() + + @classmethod + def Kill(cls, key): + logging.debug('Killing process %s', key) + cls._processes[key].proc.kill() + + @classmethod + def Delete(cls, key): + if cls.GetReturncode(key) is None: + logging.warning('Killing %s before deleting it', key) + cls.Kill(key) + logging.debug('Deleting process %s', key) + cls._processes.pop(key) + + @classmethod + def GetReturncode(cls, key): + return cls._processes[key].proc.returncode + + @classmethod + def ReadStdout(cls, key): + """Returns all stdout since the last call to ReadStdout. + + This call allows the user to read stdout while the process is running. + However each call will flush the local stdout buffer. In order to make + multiple calls to ReadStdout and to retain the entire output the results + of this call will need to be buffered in the calling code. + """ + proc = cls._processes[key] + with proc.data_lock: + # Perform a "read" on the stdout data + stdout = proc.stdout + proc.stdout = '' + return stdout + + @classmethod + def ReadStderr(cls, key): + """Returns all stderr read since the last call to ReadStderr. + + See ReadStdout for additional details. + """ + proc = cls._processes[key] + with proc.data_lock: + # Perform a "read" on the stderr data + stderr = proc.stderr + proc.stderr = '' + return stderr + + @classmethod + def ReadOutput(cls, key): + """Returns the (stdout, stderr) since the last Read* call. + + See ReadStdout for additional details. + """ + return cls.ReadStdout(key), cls.ReadStderr(key) + + @classmethod + def Wait(cls, key): + return cls._processes[key].proc.wait() + + @classmethod + def Poll(cls, key): + return cls._processes[key].proc.poll() + + @classmethod + def GetPid(cls, key): + return cls._processes[key].proc.pid diff --git a/testing/legion/rpc_methods.py b/testing/legion/rpc_methods.py index 1a59f28..24d0312 100644 --- a/testing/legion/rpc_methods.py +++ b/testing/legion/rpc_methods.py @@ -4,18 +4,13 @@ """Defines the task RPC methods.""" +import logging import os import sys -import logging import threading #pylint: disable=relative-import -import common_lib - -# Map swarming_client to use subprocess42 -sys.path.append(common_lib.SWARMING_DIR) - -from utils import subprocess42 +import process class RPCMethods(object): @@ -25,7 +20,7 @@ class RPCMethods(object): def __init__(self, server): self._server = server - self.subprocess = Subprocess + self.subprocess = process.Process def _dispatch(self, method, params): obj = self @@ -54,161 +49,3 @@ class RPCMethods(object): """ t = threading.Thread(target=self._server.shutdown) t.start() - - -class Subprocess(object): - """Implements a server-based non-blocking subprocess. - - This non-blocking subprocess allows the caller to continue operating while - also able to interact with this subprocess based on a key returned to - the caller at the time of creation. - - Creation args are set via Set* methods called after calling Process but - before calling Start. This is due to a limitation of the XML-RPC - implementation not supporting keyword arguments. - """ - - _processes = {} - _process_next_id = 0 - _creation_lock = threading.Lock() - - def __init__(self, cmd): - self.stdout = '' - self.stderr = '' - self.cmd = cmd - self.proc = None - self.cwd = None - self.verbose = False - self.detached = False - self.data_lock = threading.Lock() - - def __str__(self): - return '%r, cwd=%r, verbose=%r, detached=%r' % ( - self.cmd, self.cwd, self.verbose, self.detached) - - def _reader(self): - for pipe, data in self.proc.yield_any(): - with self.data_lock: - if pipe == 'stdout': - self.stdout += data - if self.verbose: - sys.stdout.write(data) - else: - self.stderr += data - if self.verbose: - sys.stderr.write(data) - - @classmethod - def KillAll(cls): - for key in cls._processes: - cls.Kill(key) - - @classmethod - def Process(cls, cmd): - with cls._creation_lock: - key = 'Process%d' % cls._process_next_id - cls._process_next_id += 1 - logging.debug('Creating process %s', key) - process = cls(cmd) - cls._processes[key] = process - return key - - def _Start(self): - logging.info('Starting process %s', self) - self.proc = subprocess42.Popen(self.cmd, stdout=subprocess42.PIPE, - stderr=subprocess42.PIPE, - detached=self.detached, cwd=self.cwd) - threading.Thread(target=self._reader).start() - - @classmethod - def Start(cls, key): - cls._processes[key]._Start() - - @classmethod - def SetCwd(cls, key, cwd): - """Sets the process's cwd.""" - logging.debug('Setting %s cwd to %s', key, cwd) - cls._processes[key].cwd = cwd - - @classmethod - def SetDetached(cls, key): - """Creates a detached process.""" - logging.debug('Setting %s to run detached', key) - cls._processes[key].detached = True - - @classmethod - def SetVerbose(cls, key): - """Sets the stdout and stderr to be emitted locally.""" - logging.debug('Setting %s to be verbose', key) - cls._processes[key].verbose = True - - @classmethod - def Terminate(cls, key): - logging.debug('Terminating process %s', key) - cls._processes[key].proc.terminate() - - @classmethod - def Kill(cls, key): - logging.debug('Killing process %s', key) - cls._processes[key].proc.kill() - - @classmethod - def Delete(cls, key): - if cls.GetReturncode(key) is None: - logging.warning('Killing %s before deleting it', key) - cls.Kill(key) - logging.debug('Deleting process %s', key) - cls._processes.pop(key) - - @classmethod - def GetReturncode(cls, key): - return cls._processes[key].proc.returncode - - @classmethod - def ReadStdout(cls, key): - """Returns all stdout since the last call to ReadStdout. - - This call allows the user to read stdout while the process is running. - However each call will flush the local stdout buffer. In order to make - multiple calls to ReadStdout and to retain the entire output the results - of this call will need to be buffered in the calling code. - """ - proc = cls._processes[key] - with proc.data_lock: - # Perform a "read" on the stdout data - stdout = proc.stdout - proc.stdout = '' - return stdout - - @classmethod - def ReadStderr(cls, key): - """Returns all stderr read since the last call to ReadStderr. - - See ReadStdout for additional details. - """ - proc = cls._processes[key] - with proc.data_lock: - # Perform a "read" on the stderr data - stderr = proc.stderr - proc.stderr = '' - return stderr - - @classmethod - def ReadOutput(cls, key): - """Returns the (stdout, stderr) since the last Read* call. - - See ReadStdout for additional details. - """ - return cls.ReadStdout(key), cls.ReadStderr(key) - - @classmethod - def Wait(cls, key): - return cls._processes[key].proc.wait() - - @classmethod - def Poll(cls, key): - return cls._processes[key].proc.poll() - - @classmethod - def GetPid(cls, key): - return cls._processes[key].proc.pid diff --git a/testing/legion/task_controller.py b/testing/legion/task_controller.py index df9ddbd..9514f44 100644 --- a/testing/legion/task_controller.py +++ b/testing/legion/task_controller.py @@ -17,6 +17,7 @@ import xmlrpclib #pylint: disable=relative-import import common_lib +import process ISOLATE_PY = os.path.join(common_lib.SWARMING_DIR, 'isolate.py') SWARMING_PY = os.path.join(common_lib.SWARMING_DIR, 'swarming.py') @@ -125,6 +126,10 @@ class TaskController(object): for task in cls._tasks: task.Release() + def Process(self, cmd, verbose=False, detached=False, cwd=None): + return process.ControllerProcessWrapper( + self.rpc, cmd, verbose, detached, cwd) + def _CreateOTP(self): """Creates the OTP.""" controller_name = socket.gethostname() |