diff options
-rw-r--r-- | testing/legion/common_lib.py | 8 | ||||
-rwxr-xr-x | testing/legion/examples/subprocess/subprocess_test.py | 36 | ||||
-rw-r--r-- | testing/legion/legion_test_case.py | 7 | ||||
-rw-r--r-- | testing/legion/process.py | 68 | ||||
-rw-r--r-- | testing/legion/rpc_methods.py | 28 | ||||
-rw-r--r-- | testing/legion/task_controller.py | 26 | ||||
-rwxr-xr-x | testing/legion/tools/legion.py | 2 |
7 files changed, 148 insertions, 27 deletions
diff --git a/testing/legion/common_lib.py b/testing/legion/common_lib.py index f8169d5..863f887 100644 --- a/testing/legion/common_lib.py +++ b/testing/legion/common_lib.py @@ -33,3 +33,11 @@ def InitLogging(): logging.basicConfig( format='%(asctime)s %(filename)s:%(lineno)s %(levelname)s] %(message)s', datefmt='%H:%M:%S', level=args.verbosity) + + +def GetOutputDir(): + """Get the isolated output directory specified on the command line.""" + parser = argparse.ArgumentParser() + parser.add_argument('--output-dir') + args, _ = parser.parse_known_args() + return args.output_dir diff --git a/testing/legion/examples/subprocess/subprocess_test.py b/testing/legion/examples/subprocess/subprocess_test.py index cea871a..4b7fb6f 100755 --- a/testing/legion/examples/subprocess/subprocess_test.py +++ b/testing/legion/examples/subprocess/subprocess_test.py @@ -19,6 +19,7 @@ TESTING_DIR = os.path.join( sys.path.append(TESTING_DIR) from legion import legion_test_case +from legion import jsonrpclib class ExampleTestController(legion_test_case.TestCase): @@ -90,6 +91,41 @@ class ExampleTestController(legion_test_case.TestCase): self.assertEqual(ls.GetReturncode(), 0) self.assertIn('task.isolate', ls.ReadStdout()) + def testProcessOutput(self): + """Tests that a process's output gets logged to a file in the output-dir.""" + code = ('import sys\n' + 'sys.stdout.write("Hello stdout")\n' + 'sys.stderr.write("Hello stderr")') + self.task.rpc.WriteFile('test.py', code) + proc = self.task.Process(['python', 'test.py'],) + + self.CheckProcessOutput('stdout', proc.key, 'Hello stdout') + self.CheckProcessOutput('stderr', proc.key, 'Hello stderr') + + def testCustomKey(self): + """Tests that a custom key passed to a process works correctly.""" + code = ('import sys\n' + 'sys.stdout.write("Hello CustomKey stdout")\n' + 'sys.stderr.write("Hello CustomKey stderr")') + self.task.rpc.WriteFile('test.py', code) + self.task.Process(['python', 'test.py'], key='CustomKey') + + self.CheckProcessOutput('stdout', 'CustomKey', 'Hello CustomKey stdout') + self.CheckProcessOutput('stderr', 'CustomKey', 'Hello CustomKey stderr') + + def testKeyReuse(self): + """Tests that a key cannot be reused.""" + self.task.Process(['ls'], key='KeyReuse') + self.assertRaises(jsonrpclib.Fault, self.task.Process, ['ls'], + key='KeyReuse') + + def CheckProcessOutput(self, pipe, key, expected): + """Checks that a process' output files are correct.""" + logging.info('Reading output file') + output_dir = self.task.rpc.GetOutputDir() + path = self.task.rpc.PathJoin(output_dir, '%s.%s' % (key, pipe)) + actual = self.task.rpc.ReadFile(path) + self.assertEqual(expected, actual) if __name__ == '__main__': legion_test_case.main() diff --git a/testing/legion/legion_test_case.py b/testing/legion/legion_test_case.py index 9514e8e..c70d154 100644 --- a/testing/legion/legion_test_case.py +++ b/testing/legion/legion_test_case.py @@ -36,6 +36,13 @@ class TestCase(unittest.TestCase): # Install the _RunTest method self._TestMethod = method setattr(self, test_name, self._RunTest) + self._output_dir = None + + @property + def output_dir(self): + if not self._output_dir: + self._output_dir = self.rpc.GetOutputDir() + return self._output_dir def _RunTest(self): """Runs the test method and provides banner info and error reporting.""" diff --git a/testing/legion/process.py b/testing/legion/process.py index 356db61..f3cabb5 100644 --- a/testing/legion/process.py +++ b/testing/legion/process.py @@ -9,6 +9,7 @@ process wrapper for easier access and usage of the task-side process. """ import logging +import os import subprocess import sys import threading @@ -29,30 +30,37 @@ class ControllerProcessWrapper(object): than calling the methods directly using the RPC object. """ - def __init__(self, rpc, cmd, verbose=False, detached=False, cwd=None): + def __init__(self, rpc, cmd, verbose=False, detached=False, cwd=None, + key=None): + logging.info('Creating a process with cmd=%s', cmd) self._rpc = rpc - self._id = rpc.subprocess.Process(cmd) + self._key = rpc.subprocess.Process(cmd, key) + logging.info('Process created with key=%s', self._key) if verbose: - self._rpc.subprocess.SetVerbose(self._id) + self._rpc.subprocess.SetVerbose(self._key) if detached: - self._rpc.subprocess.SetDetached(self._id) + self._rpc.subprocess.SetDetached(self._key) if cwd: self._rpc.subprocess.SetCwd(self._rpc, cwd) - self._rpc.subprocess.Start(self._id) + self._rpc.subprocess.Start(self._key) + + @property + def key(self): + return self._key def Terminate(self): - logging.debug('Terminating process %s', self._id) - return self._rpc.subprocess.Terminate(self._id) + logging.debug('Terminating process %s', self._key) + return self._rpc.subprocess.Terminate(self._key) def Kill(self): - logging.debug('Killing process %s', self._id) - self._rpc.subprocess.Kill(self._id) + logging.debug('Killing process %s', self._key) + self._rpc.subprocess.Kill(self._key) def Delete(self): - return self._rpc.subprocess.Delete(self._id) + return self._rpc.subprocess.Delete(self._key) def GetReturncode(self): - return self._rpc.subprocess.GetReturncode(self._id) + return self._rpc.subprocess.GetReturncode(self._key) def ReadStdout(self): """Returns all stdout since the last call to ReadStdout. @@ -62,30 +70,30 @@ class ControllerProcessWrapper(object): multiple calls to ReadStdout and to retain the entire output the results of this call will need to be buffered in the calling code. """ - return self._rpc.subprocess.ReadStdout(self._id) + return self._rpc.subprocess.ReadStdout(self._key) def ReadStderr(self): """Returns all stderr read since the last call to ReadStderr. See ReadStdout for additional details. """ - return self._rpc.subprocess.ReadStderr(self._id) + return self._rpc.subprocess.ReadStderr(self._key) def ReadOutput(self): """Returns the (stdout, stderr) since the last Read* call. See ReadStdout for additional details. """ - return self._rpc.subprocess.ReadOutput(self._id) + return self._rpc.subprocess.ReadOutput(self._key) def Wait(self): - return self._rpc.subprocess.Wait(self._id) + return self._rpc.subprocess.Wait(self._key) def Poll(self): - return self._rpc.subprocess.Poll(self._id) + return self._rpc.subprocess.Poll(self._key) def GetPid(self): - return self._rpc.subprocess.GetPid(self._id) + return self._rpc.subprocess.GetPid(self._key) @@ -105,15 +113,21 @@ class Process(object): _process_next_id = 0 _creation_lock = threading.Lock() - def __init__(self, cmd): + def __init__(self, cmd, key): self.stdout = '' self.stderr = '' + self.key = key self.cmd = cmd self.proc = None self.cwd = None self.verbose = False self.detached = False self.data_lock = threading.Lock() + self.stdout_file = open(self._CreateOutputFilename('stdout'), 'wb+') + self.stderr_file = open(self._CreateOutputFilename('stderr'), 'wb+') + + def _CreateOutputFilename(self, fname): + return os.path.join(common_lib.GetOutputDir(), '%s.%s' % (self.key, fname)) def __str__(self): return '%r, cwd=%r, verbose=%r, detached=%r' % ( @@ -124,10 +138,14 @@ class Process(object): with self.data_lock: if pipe == 'stdout': self.stdout += data + self.stdout_file.write(data) + self.stdout_file.flush() if self.verbose: sys.stdout.write(data) else: self.stderr += data + self.stderr_file.write(data) + self.stderr_file.flush() if self.verbose: sys.stderr.write(data) @@ -137,13 +155,15 @@ class Process(object): cls.Kill(key) @classmethod - def Process(cls, cmd): + def Process(cls, cmd, key=None): with cls._creation_lock: - key = 'Process%d' % cls._process_next_id - cls._process_next_id += 1 - logging.debug('Creating process %s with cmd %r', key, cmd) - process = cls(cmd) - cls._processes[key] = process + if not key: + key = 'Process%d' % cls._process_next_id + cls._process_next_id += 1 + if key in cls._processes: + raise KeyError('Key %s already in use' % key) + logging.debug('Creating process %s with cmd %r', key, cmd) + cls._processes[key] = cls(cmd, key) return key def _Start(self): diff --git a/testing/legion/rpc_methods.py b/testing/legion/rpc_methods.py index 24d0312..aaab7e0 100644 --- a/testing/legion/rpc_methods.py +++ b/testing/legion/rpc_methods.py @@ -10,6 +10,7 @@ import sys import threading #pylint: disable=relative-import +import common_lib import process @@ -49,3 +50,30 @@ class RPCMethods(object): """ t = threading.Thread(target=self._server.shutdown) t.start() + + def GetOutputDir(self): + """Returns the isolated output directory on the task machine.""" + return common_lib.GetOutputDir() + + def WriteFile(self, path, text, mode='wb+'): + """Writes a file on the task machine.""" + with open(path, mode) as fh: + fh.write(text) + + def ReadFile(self, path, mode='rb'): + """Reads a file from the local task machine.""" + with open(path, mode) as fh: + return fh.read() + + def PathJoin(self, *parts): + """Performs an os.path.join on the task machine. + + This is needed due to the fact that there is no guarantee that os.sep will + be the same across all machines in a particular test. This method will + join the path parts locally to ensure the correct separator is used. + """ + return os.path.join(*parts) + + def ListDir(self, path): + """Returns the results of os.listdir.""" + return os.listdir(path) diff --git a/testing/legion/task_controller.py b/testing/legion/task_controller.py index cb7ab15..7e0aa99 100644 --- a/testing/legion/task_controller.py +++ b/testing/legion/task_controller.py @@ -69,6 +69,7 @@ class TaskController(object): self._ip_address = None self._otp = self._CreateOTP() self._rpc = None + self._output_dir = None run_id = run_id or datetime.datetime.now().strftime('%Y-%m-%d-%H-%M-%S') self._task_name = '%s/%s/%s' % ( @@ -122,14 +123,19 @@ class TaskController(object): level = logging.getLevelName(level) self._verbosity = level #pylint: disable=attribute-defined-outside-init + @property + def output_dir(self): + if not self._output_dir: + self._output_dir = self.rpc.GetOutputDir() + return self._output_dir + @classmethod def ReleaseAllTasks(cls): for task in cls._tasks: task.Release() - def Process(self, cmd, verbose=False, detached=False, cwd=None): - return process.ControllerProcessWrapper( - self.rpc, cmd, verbose, detached, cwd) + def Process(self, cmd, *args, **kwargs): + return process.ControllerProcessWrapper(self.rpc, cmd, *args, **kwargs) def _CreateOTP(self): """Creates the OTP.""" @@ -161,6 +167,8 @@ class TaskController(object): def Release(self): """Quits the task's RPC server so it can release the machine.""" if self._rpc is not None and self._connected: + logging.info('Copying output-dir files to controller') + self.RetrieveOutputFiles() logging.info('Releasing %s', self._name) try: self._rpc.Quit() @@ -193,6 +201,7 @@ class TaskController(object): '--otp', self._otp, '--verbosity', self._verbosity, '--idle-timeout', str(self._idle_timeout_secs), + '--output-dir', '${ISOLATED_OUTDIR}' ]) self._ExecuteProcess(cmd) @@ -212,3 +221,14 @@ class TaskController(object): self._rpc = ssl_util.SslRpcServer.Connect(self._ip_address) logging.info('%s connected from %s', self._name, ip_address) self._connect_event.set() + + def RetrieveOutputFiles(self): + """Retrieves all files in the output-dir.""" + files = self.rpc.ListDir(self.output_dir) + for fname in files: + remote_path = self.rpc.PathJoin(self.output_dir, fname) + local_name = os.path.join(common_lib.GetOutputDir(), + '%s.%s' % (self.name, fname)) + contents = self.rpc.ReadFile(remote_path) + with open(local_name, 'wb+') as fh: + fh.write(contents) diff --git a/testing/legion/tools/legion.py b/testing/legion/tools/legion.py index 7b67133..4ddfc5e 100755 --- a/testing/legion/tools/legion.py +++ b/testing/legion/tools/legion.py @@ -133,6 +133,8 @@ def GetSwarmingCommandLine(args): cmd.append('--') + # Specify the output dir + cmd.extend(['--output-dir', '${ISOLATED_OUTDIR}']) # Task name/hash values for name, isolated in args.tasks: cmd.extend(['--' + name, Archive(isolated, args.isolate_server)]) |