summaryrefslogtreecommitdiffstats
path: root/testing/legion
diff options
context:
space:
mode:
authormmeade <mmeade@chromium.org>2015-04-02 18:03:34 -0700
committerCommit bot <commit-bot@chromium.org>2015-04-03 01:04:06 +0000
commit6dc766ea57cd776e35ca0f6f6d89b42e613f7e05 (patch)
treecd0e500e95860c34fa2f103fe258b5880b9a545e /testing/legion
parentd0bdd942dd0f8e75d8c3bdac3dcf804925b95af3 (diff)
downloadchromium_src-6dc766ea57cd776e35ca0f6f6d89b42e613f7e05.zip
chromium_src-6dc766ea57cd776e35ca0f6f6d89b42e613f7e05.tar.gz
chromium_src-6dc766ea57cd776e35ca0f6f6d89b42e613f7e05.tar.bz2
Factoring out the subprocess code into a stand-alone module.
Also adding a controller-side wrapper to make interacting with the process more intuitive. BUG= Review URL: https://codereview.chromium.org/1057943002 Cr-Commit-Position: refs/heads/master@{#323576}
Diffstat (limited to 'testing/legion')
-rw-r--r--testing/legion/common_lib.py2
-rwxr-xr-xtesting/legion/examples/hello_world/controller_test.py12
-rwxr-xr-xtesting/legion/examples/subprocess/subprocess_test.py36
-rw-r--r--testing/legion/legion.isolate1
-rw-r--r--testing/legion/process.py247
-rw-r--r--testing/legion/rpc_methods.py169
-rw-r--r--testing/legion/task_controller.py5
7 files changed, 279 insertions, 193 deletions
diff --git a/testing/legion/common_lib.py b/testing/legion/common_lib.py
index c752e0f..d3d4c3c 100644
--- a/testing/legion/common_lib.py
+++ b/testing/legion/common_lib.py
@@ -40,4 +40,4 @@ def ConnectToServer(server):
"""Connect to an RPC server."""
addr = 'http://%s:%d' % (server, SERVER_PORT)
logging.debug('Connecting to RPC server at %s', addr)
- return xmlrpclib.Server(addr)
+ return xmlrpclib.Server(addr, allow_none=True)
diff --git a/testing/legion/examples/hello_world/controller_test.py b/testing/legion/examples/hello_world/controller_test.py
index 37afcef..3456662 100755
--- a/testing/legion/examples/hello_world/controller_test.py
+++ b/testing/legion/examples/hello_world/controller_test.py
@@ -32,7 +32,7 @@ class ExampleTestController(test_controller.TestController):
"""Create a task object and set the proper values."""
task = self.CreateNewTask(
isolated_hash=isolated_hash,
- dimensions={'os': 'Ubuntu-14.04', 'pool': 'Legion'}, priority=200,
+ dimensions={'os': 'Ubuntu-14.04'},
idle_timeout_secs=90, connection_timeout_secs=90,
verbosity=logging.INFO,
run_id=1)
@@ -70,11 +70,11 @@ class ExampleTestController(test_controller.TestController):
def CallTaskTest(self, task):
"""Call task_test.py name on a task."""
logging.info('Calling Subprocess to run "./task_test.py %s"', task.name)
- proc = task.rpc.subprocess.Popen(['./task_test.py', task.name])
- task.rpc.subprocess.Wait(proc)
- retcode = task.rpc.subprocess.GetReturncode(proc)
- stdout = task.rpc.subprocess.ReadStdout(proc)
- stderr = task.rpc.subprocess.ReadStderr(proc)
+ proc = task.Process(['./task_test.py', task.name])
+ proc.Wait()
+ retcode = proc.GetReturncode()
+ stdout = proc.ReadStdout()
+ stderr = proc.ReadStderr()
logging.info('retcode: %s, stdout: %s, stderr: %s', retcode, stdout, stderr)
diff --git a/testing/legion/examples/subprocess/subprocess_test.py b/testing/legion/examples/subprocess/subprocess_test.py
index 1a28ddd..7696ded 100755
--- a/testing/legion/examples/subprocess/subprocess_test.py
+++ b/testing/legion/examples/subprocess/subprocess_test.py
@@ -32,7 +32,7 @@ class ExampleTestController(test_controller.TestController):
self.task = self.CreateNewTask(
isolated_hash=args.task_hash,
- dimensions={'os': 'Ubuntu-14.04', 'pool': 'Chromoting'},
+ dimensions={'os': 'Ubuntu-14.04'},
idle_timeout_secs=90, connection_timeout_secs=90,
verbosity=logging.DEBUG)
self.task.Create()
@@ -47,42 +47,38 @@ class ExampleTestController(test_controller.TestController):
def TestMultipleProcesses(self):
start = time.time()
- sleep20 = self.task.rpc.subprocess.Process(['sleep', '20'])
- self.task.rpc.subprocess.Start(sleep20)
- sleep10 = self.task.rpc.subprocess.Process(['sleep', '10'])
- self.task.rpc.subprocess.Start(sleep10)
+ sleep10 = self.task.Process(['sleep', '10'])
+ sleep20 = self.task.Process(['sleep', '20'])
- self.task.rpc.subprocess.Wait(sleep10)
+ sleep10.Wait()
elapsed = time.time() - start
assert elapsed >= 10 and elapsed < 11
- self.task.rpc.subprocess.Wait(sleep20)
+ sleep20.Wait()
elapsed = time.time() - start
assert elapsed >= 20
- self.task.rpc.subprocess.Delete(sleep20)
- self.task.rpc.subprocess.Delete(sleep10)
+ sleep10.Delete()
+ sleep20.Delete()
def TestTerminate(self):
start = time.time()
- proc = self.task.rpc.subprocess.Process(['sleep', '20'])
- self.task.rpc.subprocess.Start(proc)
- self.task.rpc.subprocess.Terminate(proc)
+ sleep20 = self.task.Process(['sleep', '20'])
+ sleep20.Terminate()
try:
- self.task.rpc.subprocess.Wait(proc)
+ sleep20.Wait()
except xmlrpclib.Fault:
pass
finally:
- self.task.rpc.subprocess.Delete(proc)
+ sleep20.Delete()
assert time.time() - start < 20
def TestLs(self):
- proc = self.task.rpc.subprocess.Process(['ls'])
- self.task.rpc.subprocess.Start(proc)
- self.task.rpc.subprocess.Wait(proc)
- assert self.task.rpc.subprocess.GetReturncode(proc) == 0
- assert 'task.isolate' in self.task.rpc.subprocess.ReadStdout(proc)
- self.task.rpc.subprocess.Delete(proc)
+ ls = self.task.Process(['ls'])
+ ls.Wait()
+ assert ls.GetReturncode() == 0
+ assert 'task.isolate' in ls.ReadStdout()
+ ls.Delete()
if __name__ == '__main__':
diff --git a/testing/legion/legion.isolate b/testing/legion/legion.isolate
index 774b27a..63982e0 100644
--- a/testing/legion/legion.isolate
+++ b/testing/legion/legion.isolate
@@ -8,6 +8,7 @@
'__init__.py',
'common_lib.py',
'legion.isolate',
+ 'process.py',
'rpc_methods.py',
'rpc_server.py',
'run_task.py',
diff --git a/testing/legion/process.py b/testing/legion/process.py
new file mode 100644
index 0000000..356db61
--- /dev/null
+++ b/testing/legion/process.py
@@ -0,0 +1,247 @@
+# Copyright 2015 The Chromium Authors. All rights reserved.
+# Use of this source code is governed by a BSD-style license that can be
+# found in the LICENSE file.
+
+"""RPC compatible subprocess-type module.
+
+This module defined both a task-side process class as well as a controller-side
+process wrapper for easier access and usage of the task-side process.
+"""
+
+import logging
+import subprocess
+import sys
+import threading
+
+#pylint: disable=relative-import
+import common_lib
+
+# Map swarming_client to use subprocess42
+sys.path.append(common_lib.SWARMING_DIR)
+
+from utils import subprocess42
+
+
+class ControllerProcessWrapper(object):
+ """Controller-side process wrapper class.
+
+ This class provides a more intuitive interface to task-side processes
+ than calling the methods directly using the RPC object.
+ """
+
+ def __init__(self, rpc, cmd, verbose=False, detached=False, cwd=None):
+ self._rpc = rpc
+ self._id = rpc.subprocess.Process(cmd)
+ if verbose:
+ self._rpc.subprocess.SetVerbose(self._id)
+ if detached:
+ self._rpc.subprocess.SetDetached(self._id)
+ if cwd:
+ self._rpc.subprocess.SetCwd(self._rpc, cwd)
+ self._rpc.subprocess.Start(self._id)
+
+ def Terminate(self):
+ logging.debug('Terminating process %s', self._id)
+ return self._rpc.subprocess.Terminate(self._id)
+
+ def Kill(self):
+ logging.debug('Killing process %s', self._id)
+ self._rpc.subprocess.Kill(self._id)
+
+ def Delete(self):
+ return self._rpc.subprocess.Delete(self._id)
+
+ def GetReturncode(self):
+ return self._rpc.subprocess.GetReturncode(self._id)
+
+ def ReadStdout(self):
+ """Returns all stdout since the last call to ReadStdout.
+
+ This call allows the user to read stdout while the process is running.
+ However each call will flush the local stdout buffer. In order to make
+ multiple calls to ReadStdout and to retain the entire output the results
+ of this call will need to be buffered in the calling code.
+ """
+ return self._rpc.subprocess.ReadStdout(self._id)
+
+ def ReadStderr(self):
+ """Returns all stderr read since the last call to ReadStderr.
+
+ See ReadStdout for additional details.
+ """
+ return self._rpc.subprocess.ReadStderr(self._id)
+
+ def ReadOutput(self):
+ """Returns the (stdout, stderr) since the last Read* call.
+
+ See ReadStdout for additional details.
+ """
+ return self._rpc.subprocess.ReadOutput(self._id)
+
+ def Wait(self):
+ return self._rpc.subprocess.Wait(self._id)
+
+ def Poll(self):
+ return self._rpc.subprocess.Poll(self._id)
+
+ def GetPid(self):
+ return self._rpc.subprocess.GetPid(self._id)
+
+
+
+class Process(object):
+ """Implements a task-side non-blocking subprocess.
+
+ This non-blocking subprocess allows the caller to continue operating while
+ also able to interact with this subprocess based on a key returned to
+ the caller at the time of creation.
+
+ Creation args are set via Set* methods called after calling Process but
+ before calling Start. This is due to a limitation of the XML-RPC
+ implementation not supporting keyword arguments.
+ """
+
+ _processes = {}
+ _process_next_id = 0
+ _creation_lock = threading.Lock()
+
+ def __init__(self, cmd):
+ self.stdout = ''
+ self.stderr = ''
+ self.cmd = cmd
+ self.proc = None
+ self.cwd = None
+ self.verbose = False
+ self.detached = False
+ self.data_lock = threading.Lock()
+
+ def __str__(self):
+ return '%r, cwd=%r, verbose=%r, detached=%r' % (
+ self.cmd, self.cwd, self.verbose, self.detached)
+
+ def _reader(self):
+ for pipe, data in self.proc.yield_any():
+ with self.data_lock:
+ if pipe == 'stdout':
+ self.stdout += data
+ if self.verbose:
+ sys.stdout.write(data)
+ else:
+ self.stderr += data
+ if self.verbose:
+ sys.stderr.write(data)
+
+ @classmethod
+ def KillAll(cls):
+ for key in cls._processes:
+ cls.Kill(key)
+
+ @classmethod
+ def Process(cls, cmd):
+ with cls._creation_lock:
+ key = 'Process%d' % cls._process_next_id
+ cls._process_next_id += 1
+ logging.debug('Creating process %s with cmd %r', key, cmd)
+ process = cls(cmd)
+ cls._processes[key] = process
+ return key
+
+ def _Start(self):
+ logging.info('Starting process %s', self)
+ self.proc = subprocess42.Popen(self.cmd, stdout=subprocess42.PIPE,
+ stderr=subprocess42.PIPE,
+ detached=self.detached, cwd=self.cwd)
+ threading.Thread(target=self._reader).start()
+
+ @classmethod
+ def Start(cls, key):
+ cls._processes[key]._Start()
+
+ @classmethod
+ def SetCwd(cls, key, cwd):
+ """Sets the process's cwd."""
+ logging.debug('Setting %s cwd to %s', key, cwd)
+ cls._processes[key].cwd = cwd
+
+ @classmethod
+ def SetDetached(cls, key):
+ """Creates a detached process."""
+ logging.debug('Setting %s.detached = True', key)
+ cls._processes[key].detached = True
+
+ @classmethod
+ def SetVerbose(cls, key):
+ """Sets the stdout and stderr to be emitted locally."""
+ logging.debug('Setting %s.verbose = True', key)
+ cls._processes[key].verbose = True
+
+ @classmethod
+ def Terminate(cls, key):
+ logging.debug('Terminating process %s', key)
+ cls._processes[key].proc.terminate()
+
+ @classmethod
+ def Kill(cls, key):
+ logging.debug('Killing process %s', key)
+ cls._processes[key].proc.kill()
+
+ @classmethod
+ def Delete(cls, key):
+ if cls.GetReturncode(key) is None:
+ logging.warning('Killing %s before deleting it', key)
+ cls.Kill(key)
+ logging.debug('Deleting process %s', key)
+ cls._processes.pop(key)
+
+ @classmethod
+ def GetReturncode(cls, key):
+ return cls._processes[key].proc.returncode
+
+ @classmethod
+ def ReadStdout(cls, key):
+ """Returns all stdout since the last call to ReadStdout.
+
+ This call allows the user to read stdout while the process is running.
+ However each call will flush the local stdout buffer. In order to make
+ multiple calls to ReadStdout and to retain the entire output the results
+ of this call will need to be buffered in the calling code.
+ """
+ proc = cls._processes[key]
+ with proc.data_lock:
+ # Perform a "read" on the stdout data
+ stdout = proc.stdout
+ proc.stdout = ''
+ return stdout
+
+ @classmethod
+ def ReadStderr(cls, key):
+ """Returns all stderr read since the last call to ReadStderr.
+
+ See ReadStdout for additional details.
+ """
+ proc = cls._processes[key]
+ with proc.data_lock:
+ # Perform a "read" on the stderr data
+ stderr = proc.stderr
+ proc.stderr = ''
+ return stderr
+
+ @classmethod
+ def ReadOutput(cls, key):
+ """Returns the (stdout, stderr) since the last Read* call.
+
+ See ReadStdout for additional details.
+ """
+ return cls.ReadStdout(key), cls.ReadStderr(key)
+
+ @classmethod
+ def Wait(cls, key):
+ return cls._processes[key].proc.wait()
+
+ @classmethod
+ def Poll(cls, key):
+ return cls._processes[key].proc.poll()
+
+ @classmethod
+ def GetPid(cls, key):
+ return cls._processes[key].proc.pid
diff --git a/testing/legion/rpc_methods.py b/testing/legion/rpc_methods.py
index 1a59f28..24d0312 100644
--- a/testing/legion/rpc_methods.py
+++ b/testing/legion/rpc_methods.py
@@ -4,18 +4,13 @@
"""Defines the task RPC methods."""
+import logging
import os
import sys
-import logging
import threading
#pylint: disable=relative-import
-import common_lib
-
-# Map swarming_client to use subprocess42
-sys.path.append(common_lib.SWARMING_DIR)
-
-from utils import subprocess42
+import process
class RPCMethods(object):
@@ -25,7 +20,7 @@ class RPCMethods(object):
def __init__(self, server):
self._server = server
- self.subprocess = Subprocess
+ self.subprocess = process.Process
def _dispatch(self, method, params):
obj = self
@@ -54,161 +49,3 @@ class RPCMethods(object):
"""
t = threading.Thread(target=self._server.shutdown)
t.start()
-
-
-class Subprocess(object):
- """Implements a server-based non-blocking subprocess.
-
- This non-blocking subprocess allows the caller to continue operating while
- also able to interact with this subprocess based on a key returned to
- the caller at the time of creation.
-
- Creation args are set via Set* methods called after calling Process but
- before calling Start. This is due to a limitation of the XML-RPC
- implementation not supporting keyword arguments.
- """
-
- _processes = {}
- _process_next_id = 0
- _creation_lock = threading.Lock()
-
- def __init__(self, cmd):
- self.stdout = ''
- self.stderr = ''
- self.cmd = cmd
- self.proc = None
- self.cwd = None
- self.verbose = False
- self.detached = False
- self.data_lock = threading.Lock()
-
- def __str__(self):
- return '%r, cwd=%r, verbose=%r, detached=%r' % (
- self.cmd, self.cwd, self.verbose, self.detached)
-
- def _reader(self):
- for pipe, data in self.proc.yield_any():
- with self.data_lock:
- if pipe == 'stdout':
- self.stdout += data
- if self.verbose:
- sys.stdout.write(data)
- else:
- self.stderr += data
- if self.verbose:
- sys.stderr.write(data)
-
- @classmethod
- def KillAll(cls):
- for key in cls._processes:
- cls.Kill(key)
-
- @classmethod
- def Process(cls, cmd):
- with cls._creation_lock:
- key = 'Process%d' % cls._process_next_id
- cls._process_next_id += 1
- logging.debug('Creating process %s', key)
- process = cls(cmd)
- cls._processes[key] = process
- return key
-
- def _Start(self):
- logging.info('Starting process %s', self)
- self.proc = subprocess42.Popen(self.cmd, stdout=subprocess42.PIPE,
- stderr=subprocess42.PIPE,
- detached=self.detached, cwd=self.cwd)
- threading.Thread(target=self._reader).start()
-
- @classmethod
- def Start(cls, key):
- cls._processes[key]._Start()
-
- @classmethod
- def SetCwd(cls, key, cwd):
- """Sets the process's cwd."""
- logging.debug('Setting %s cwd to %s', key, cwd)
- cls._processes[key].cwd = cwd
-
- @classmethod
- def SetDetached(cls, key):
- """Creates a detached process."""
- logging.debug('Setting %s to run detached', key)
- cls._processes[key].detached = True
-
- @classmethod
- def SetVerbose(cls, key):
- """Sets the stdout and stderr to be emitted locally."""
- logging.debug('Setting %s to be verbose', key)
- cls._processes[key].verbose = True
-
- @classmethod
- def Terminate(cls, key):
- logging.debug('Terminating process %s', key)
- cls._processes[key].proc.terminate()
-
- @classmethod
- def Kill(cls, key):
- logging.debug('Killing process %s', key)
- cls._processes[key].proc.kill()
-
- @classmethod
- def Delete(cls, key):
- if cls.GetReturncode(key) is None:
- logging.warning('Killing %s before deleting it', key)
- cls.Kill(key)
- logging.debug('Deleting process %s', key)
- cls._processes.pop(key)
-
- @classmethod
- def GetReturncode(cls, key):
- return cls._processes[key].proc.returncode
-
- @classmethod
- def ReadStdout(cls, key):
- """Returns all stdout since the last call to ReadStdout.
-
- This call allows the user to read stdout while the process is running.
- However each call will flush the local stdout buffer. In order to make
- multiple calls to ReadStdout and to retain the entire output the results
- of this call will need to be buffered in the calling code.
- """
- proc = cls._processes[key]
- with proc.data_lock:
- # Perform a "read" on the stdout data
- stdout = proc.stdout
- proc.stdout = ''
- return stdout
-
- @classmethod
- def ReadStderr(cls, key):
- """Returns all stderr read since the last call to ReadStderr.
-
- See ReadStdout for additional details.
- """
- proc = cls._processes[key]
- with proc.data_lock:
- # Perform a "read" on the stderr data
- stderr = proc.stderr
- proc.stderr = ''
- return stderr
-
- @classmethod
- def ReadOutput(cls, key):
- """Returns the (stdout, stderr) since the last Read* call.
-
- See ReadStdout for additional details.
- """
- return cls.ReadStdout(key), cls.ReadStderr(key)
-
- @classmethod
- def Wait(cls, key):
- return cls._processes[key].proc.wait()
-
- @classmethod
- def Poll(cls, key):
- return cls._processes[key].proc.poll()
-
- @classmethod
- def GetPid(cls, key):
- return cls._processes[key].proc.pid
diff --git a/testing/legion/task_controller.py b/testing/legion/task_controller.py
index df9ddbd..9514f44 100644
--- a/testing/legion/task_controller.py
+++ b/testing/legion/task_controller.py
@@ -17,6 +17,7 @@ import xmlrpclib
#pylint: disable=relative-import
import common_lib
+import process
ISOLATE_PY = os.path.join(common_lib.SWARMING_DIR, 'isolate.py')
SWARMING_PY = os.path.join(common_lib.SWARMING_DIR, 'swarming.py')
@@ -125,6 +126,10 @@ class TaskController(object):
for task in cls._tasks:
task.Release()
+ def Process(self, cmd, verbose=False, detached=False, cwd=None):
+ return process.ControllerProcessWrapper(
+ self.rpc, cmd, verbose, detached, cwd)
+
def _CreateOTP(self):
"""Creates the OTP."""
controller_name = socket.gethostname()