summaryrefslogtreecommitdiffstats
path: root/third_party/typ
diff options
context:
space:
mode:
authordpranke <dpranke@chromium.org>2014-10-14 19:21:46 -0700
committerCommit bot <commit-bot@chromium.org>2014-10-15 02:24:42 +0000
commit8b924ce2f4c05da093daf504a298f6dcff4b9575 (patch)
tree37dd9709a12fa8394c6c2a3783660cf064b63f21 /third_party/typ
parent40e54e0ca748d0a7a79bfa6db4a4677925b7e1fc (diff)
downloadchromium_src-8b924ce2f4c05da093daf504a298f6dcff4b9575.zip
chromium_src-8b924ce2f4c05da093daf504a298f6dcff4b9575.tar.gz
chromium_src-8b924ce2f4c05da093daf504a298f6dcff4b9575.tar.bz2
Rev typ to 0.8.4.
This change pulls in the new --all flag for typ (so that we can run disabled tests), and pulls in the ability to set default values for the command line arguments. It also adds a bunch more test coverage, but that testing didn't expose any common bugs as I recall ... TBR=dtu@chromium.org BUG=402172 Review URL: https://codereview.chromium.org/654123003 Cr-Commit-Position: refs/heads/master@{#299630}
Diffstat (limited to 'third_party/typ')
-rw-r--r--third_party/typ/README.chromium4
-rw-r--r--third_party/typ/README.rst3
-rwxr-xr-xthird_party/typ/run31
-rwxr-xr-xthird_party/typ/tools/cov.py5
-rw-r--r--third_party/typ/typ/__init__.py3
-rw-r--r--third_party/typ/typ/__main__.py2
-rw-r--r--third_party/typ/typ/arg_parser.py22
-rw-r--r--third_party/typ/typ/cmdline.py30
-rw-r--r--third_party/typ/typ/fakes/host_fake.py91
-rw-r--r--third_party/typ/typ/fakes/tests/host_fake_test.py37
-rw-r--r--third_party/typ/typ/fakes/unittest_fakes.py176
-rw-r--r--third_party/typ/typ/host.py30
-rw-r--r--third_party/typ/typ/json_results.py3
-rw-r--r--third_party/typ/typ/pool.py134
-rw-r--r--third_party/typ/typ/runner.py209
-rw-r--r--third_party/typ/typ/test_case.py16
-rw-r--r--third_party/typ/typ/tests/cmdline_test.py45
-rw-r--r--third_party/typ/typ/tests/host_test.py41
-rw-r--r--third_party/typ/typ/tests/main_test.py173
-rw-r--r--third_party/typ/typ/tests/pool_test.py131
-rw-r--r--third_party/typ/typ/tests/runner_test.py88
-rw-r--r--third_party/typ/typ/version.py2
22 files changed, 697 insertions, 579 deletions
diff --git a/third_party/typ/README.chromium b/third_party/typ/README.chromium
index 7181783..a6fcca5 100644
--- a/third_party/typ/README.chromium
+++ b/third_party/typ/README.chromium
@@ -1,7 +1,7 @@
Name: typ
URL: https://github.com/dpranke/typ.git
-Version: 0.8.1
-Revision: 2cb7549d40852df0b9f9e323e0d31ff5bfcbace7
+Version: 0.8.4
+Revision: 5e73c13993ecb44b00f93ca2fe97a7c18158f7ff
Security Critical: no
License: Apache 2.0
License File: NOT_SHIPPED
diff --git a/third_party/typ/README.rst b/third_party/typ/README.rst
index 217683f..dc03841 100644
--- a/third_party/typ/README.rst
+++ b/third_party/typ/README.rst
@@ -46,8 +46,6 @@ Work remaining
typ is still a work in progress, but it's getting close to being done.
Things remaining for 1.0, roughly in priority order:
-- Add input validation on all of the public APIs.
-- Get test coverage for the remaining untested code.
- Implement a non-python file format for testing command line interfaces
- Write documentation
@@ -62,7 +60,6 @@ Possible future work
- --debugger improvements:
- make it skip the initial breakpoint?
- - make it play nicely w/ TestCase.check()?
- Support testing javascript, java, c++/gtest-style binaries?
- Support for test sharding in addition to parallel execution (so that
diff --git a/third_party/typ/run b/third_party/typ/run
index 22ea7ad..c825055 100755
--- a/third_party/typ/run
+++ b/third_party/typ/run
@@ -12,9 +12,12 @@ from tools import cov
is_python3 = bool(sys.version_info.major == 3)
has_python34 = False
+verbose = False
def call(*args, **kwargs):
+ if verbose:
+ print(' '.join(args[0]))
ret = subprocess.call(*args, **kwargs)
if ret != 0:
sys.exit(ret)
@@ -22,6 +25,9 @@ def call(*args, **kwargs):
def main(argv):
parser = argparse.ArgumentParser()
+ parser.add_argument('--no3', action='store_true',
+ help='Do not run the tests under Python 3.')
+ parser.add_argument('-v', '--verbose', action='store_true')
subps = parser.add_subparsers()
subp = subps.add_parser('build', help='build the package')
@@ -69,13 +75,16 @@ def main(argv):
args = parser.parse_args(argv)
+ global verbose
+ if args.verbose:
+ verbose = True
global has_python34
- try:
- if subprocess.checkout(['python3', '--version']).startswith(
- 'Python 3.4'):
- has_python34 = True
- except:
- pass
+ if not args.no3:
+ try:
+ ver = subprocess.check_output(['python3', '--version'])
+ has_python34 = ver.split()[1] >= '3.4'
+ except:
+ pass
args.func(args)
@@ -95,7 +104,7 @@ def run_coverage(args):
if not args.source:
args.source = [os.path.join(repo_dir, 'typ')]
argv = cov.argv_from_args(args)
- cov_args = ['-m', 'typ', '-q', '-j', '1']
+ cov_args = ['-m', 'typ', '-j', '1']
call(['python', path_to_cov] + argv + cov_args)
if has_python34:
call(['python3', path_to_cov] + argv + cov_args)
@@ -130,19 +139,19 @@ def run_lint(args):
def run_tests(args):
# Tests that we can run the command line directly if typ is in sys.path.
- call(['python', os.path.join('typ', 'cmdline.py'), '-q',
+ call(['python', os.path.join('typ', 'cmdline.py'),
'typ.tests.main_test.TestMain.test_basic'])
# Test that we can run the command line directly if typ is not in sys.path.
repo_dir = os.path.abspath(os.path.dirname(__file__))
home_dir = os.environ['HOME']
- call(['python', os.path.join(repo_dir, 'typ', 'cmdline.py'), '-q',
+ call(['python', os.path.join(repo_dir, 'typ', 'cmdline.py'),
'typ.tests.main_test.TestMain.test_basic'], cwd=home_dir)
# Now run all the tests under Python2 and Python3.
- call(['python', '-m', 'typ', '-q'])
+ call(['python', '-m', 'typ'])
if has_python34:
- call(['python3', '-m', 'typ', '-q'])
+ call(['python3', '-m', 'typ'])
if __name__ == '__main__':
diff --git a/third_party/typ/tools/cov.py b/third_party/typ/tools/cov.py
index f3a11c0..8e78fc4 100755
--- a/third_party/typ/tools/cov.py
+++ b/third_party/typ/tools/cov.py
@@ -112,6 +112,8 @@ def main(argv=None):
args.pragma = args.pragma or DEFAULT_PRAGMAS
+ if args.show:
+ args.show_missing = True
for pragma in args.show:
if pragma in args.pragma:
args.pragma.remove(pragma)
@@ -123,12 +125,13 @@ def main(argv=None):
cov.start()
try:
if remaining_args[0] == '-m':
- run_python_module(remaining_args[1], remaining_args)
+ run_python_module(remaining_args[1], remaining_args[1:])
else:
run_python_file(remaining_args[0], remaining_args)
except SystemExit as e:
ret = e.code
cov.stop()
+ cov.save()
cov.report(show_missing=args.show_missing)
return ret
diff --git a/third_party/typ/typ/__init__.py b/third_party/typ/typ/__init__.py
index 9107aee..6013a3a 100644
--- a/third_party/typ/typ/__init__.py
+++ b/third_party/typ/typ/__init__.py
@@ -63,7 +63,7 @@ from typ.cmdline import main, spawn_main
from typ.json_results import exit_code_from_full_results
from typ.json_results import make_full_results, make_upload_request
from typ.json_results import Result, ResultSet, ResultType
-from typ.runner import Runner, TestInput
+from typ.runner import Runner, TestInput, TestSet
from typ.stats import Stats
from typ.printer import Printer
from typ.test_case import convert_newlines, TestCase, MainTestCase
@@ -83,6 +83,7 @@ __all__ = [
'Stats',
'TestCase',
'TestInput',
+ 'TestSet',
'VERSION',
'convert_newlines',
'exit_code_from_full_results',
diff --git a/third_party/typ/typ/__main__.py b/third_party/typ/typ/__main__.py
index e664f3a..3b8b541 100644
--- a/third_party/typ/typ/__main__.py
+++ b/third_party/typ/typ/__main__.py
@@ -19,6 +19,6 @@ from typ import main, spawn_main
if __name__ == '__main__':
if sys.platform == 'win32': # pragma: win32
- sys.exit(spawn_main())
+ sys.exit(spawn_main(sys.argv[1:], sys.stdout, sys.stderr))
else: # pragma: no win32
sys.exit(main())
diff --git a/third_party/typ/typ/arg_parser.py b/third_party/typ/typ/arg_parser.py
index 3327e06..ac2d33e 100644
--- a/third_party/typ/typ/arg_parser.py
+++ b/third_party/typ/typ/arg_parser.py
@@ -58,14 +58,17 @@ class ArgumentParser(argparse.ArgumentParser):
action='store',
help=('Takes the list of tests from the file '
'(use "-" for stdin).'))
+ self.add_argument('--all', action='store_true',
+ help=('Run all the tests, including the ones '
+ 'normally skipped.'))
self.add_argument('--isolate', metavar='glob', default=[],
action='append',
help=('Globs of tests to run in isolation '
'(serially).'))
self.add_argument('--skip', metavar='glob', default=[],
action='append',
- help=('Globs of test names to skip (can specify '
- 'multiple times).'))
+ help=('Globs of test names to skip ('
+ 'defaults to %(default)s).'))
self.add_argument('--suffixes', metavar='glob', default=[],
action='append',
help=('Globs of test filenames to look for ('
@@ -153,9 +156,6 @@ class ArgumentParser(argparse.ArgumentParser):
self.add_argument('--no-overwrite', action='store_false',
dest='overwrite', default=None,
help=argparse.SUPPRESS)
- self.add_argument('--setup', help=argparse.SUPPRESS)
- self.add_argument('--teardown', help=argparse.SUPPRESS)
- self.add_argument('--context', help=argparse.SUPPRESS)
if discovery or running:
self.add_argument('-P', '--path', action='append', default=[],
@@ -197,7 +197,7 @@ class ArgumentParser(argparse.ArgumentParser):
if not rargs.coverage_omit:
rargs.coverage_omit = DEFAULT_COVERAGE_OMIT
- if rargs.debugger: # pragma: untested
+ if rargs.debugger: # pragma: no cover
rargs.jobs = 1
rargs.passthrough = True
@@ -214,14 +214,16 @@ class ArgumentParser(argparse.ArgumentParser):
def print_help(self, file=None):
self._print_message(msg=self.format_help(), file=file)
- def error(self, message):
- self.exit(2, '%s: error: %s\n' % (self.prog, message))
+ def error(self, message, bailout=True): # pylint: disable=W0221
+ self.exit(2, '%s: error: %s\n' % (self.prog, message), bailout=bailout)
- def exit(self, status=0, message=None):
+ def exit(self, status=0, message=None, # pylint: disable=W0221
+ bailout=True):
self.exit_status = status
if message:
self._print_message(message, file=self._host.stderr)
- raise _Bailout()
+ if bailout:
+ raise _Bailout()
def optparse_options(self, skip=None):
skip = skip or []
diff --git a/third_party/typ/typ/cmdline.py b/third_party/typ/typ/cmdline.py
index c3f67e2..4bb718f 100644
--- a/third_party/typ/typ/cmdline.py
+++ b/third_party/typ/typ/cmdline.py
@@ -15,29 +15,27 @@
import os
import subprocess
import sys
-import unittest
+
# This ensures that absolute imports of typ modules will work when
# running typ/cmdline.py as a script even if typ is not installed.
# We need this entry in addition to the one in __main__.py to ensure
# that typ/cmdline.py works when invoked via subprocess on windows in
# _spawn_main().
-dir_above_typ = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
-if dir_above_typ not in sys.path: # pragma: untested
+path_to_file = os.path.realpath(__file__)
+dir_above_typ = os.path.dirname(os.path.dirname(path_to_file))
+if dir_above_typ not in sys.path: # pragma: no cover
sys.path.append(dir_above_typ)
-from typ.host import Host
from typ.runner import Runner
-def main(argv=None, host=None, loader=None):
- host = host or Host()
- loader = loader or unittest.loader.TestLoader()
- runner = Runner(host=host, loader=loader)
- return runner.main(argv)
+def main(argv=None, host=None, **defaults):
+ runner = Runner(host=host)
+ return runner.main(argv, **defaults)
-def spawn_main(): # pragma: untested
+def spawn_main(argv, stdout, stderr):
# This function is called from __main__.py when running 'python -m typ' on
# windows.
#
@@ -50,15 +48,9 @@ def spawn_main(): # pragma: untested
# We don't want to always spawn a subprocess, because doing so is more
# heavyweight than it needs to be on other platforms (and can make
# debugging a bit more annoying).
- proc = subprocess.Popen([sys.executable, __file__] + sys.argv[1:])
- try:
- proc.wait()
- except KeyboardInterrupt:
- # We may need a second wait in order to make sure the subprocess exits
- # completely.
- proc.wait()
- return proc.returncode
+ return subprocess.call([sys.executable, path_to_file] + argv,
+ stdout=stdout, stderr=stderr)
-if __name__ == '__main__': # pragma: untested
+if __name__ == '__main__': # pragma: no cover
sys.exit(main())
diff --git a/third_party/typ/typ/fakes/host_fake.py b/third_party/typ/typ/fakes/host_fake.py
index e758144..cb74d27 100644
--- a/third_party/typ/typ/fakes/host_fake.py
+++ b/third_party/typ/typ/fakes/host_fake.py
@@ -14,8 +14,18 @@
import copy
import io
+import logging
import sys
+from typ.host import _TeedStream
+
+
+is_python3 = bool(sys.version_info.major == 3)
+
+if is_python3: # pragma: python3
+ # redefining built-in 'unicode' pylint: disable=W0622
+ unicode = str
+
class FakeHost(object):
# "too many instance attributes" pylint: disable=R0902
@@ -26,6 +36,7 @@ class FakeHost(object):
is_python3 = bool(sys.version_info.major == 3)
def __init__(self):
+ self.logger = logging.getLogger()
self.stdin = io.StringIO()
self.stdout = io.StringIO()
self.stderr = io.StringIO()
@@ -41,17 +52,21 @@ class FakeHost(object):
self.mtimes = {}
self.cmds = []
self.cwd = '/tmp'
+ self._orig_logging_handlers = []
- def __getstate__(self): # pragma: untested
+ def __getstate__(self):
d = copy.copy(self.__dict__)
del d['stderr']
del d['stdout']
del d['stdin']
+ del d['logger']
+ del d['_orig_logging_handlers']
return d
- def __setstate__(self, d): # pragma: untested
+ def __setstate__(self, d):
for k, v in d.items():
setattr(self, k, v)
+ self.logger = logging.getLogger()
self.stdin = io.StringIO()
self.stdout = io.StringIO()
self.stderr = io.StringIO()
@@ -76,7 +91,7 @@ class FakeHost(object):
def chdir(self, *comps):
path = self.join(*comps)
- if not path.startswith('/'): # pragma: untested
+ if not path.startswith('/'):
path = self.join(self.cwd, path)
self.cwd = path
@@ -108,7 +123,7 @@ class FakeHost(object):
def getenv(self, key, default=None):
return self.env.get(key, default)
- def getpid(self): # pragma: untested
+ def getpid(self):
return 1
def isdir(self, *comps):
@@ -122,7 +137,7 @@ class FakeHost(object):
def join(self, *comps):
p = ''
for c in comps:
- if c in ('', '.'): # pragma: untested
+ if c in ('', '.'):
continue
elif c.startswith('/'):
p = c
@@ -135,7 +150,7 @@ class FakeHost(object):
p = p.replace('/./', '/')
# Handle ../
- while '/..' in p: # pragma: untested
+ while '/..' in p:
comps = p.split('/')
idx = comps.index('..')
comps = comps[:idx-1] + comps[idx+1:]
@@ -161,8 +176,6 @@ class FakeHost(object):
def print_(self, msg='', end='\n', stream=None):
stream = stream or self.stdout
- if not self.is_python3 and isinstance(msg, str): # pragma: untested
- msg = unicode(msg)
stream.write(msg + end)
stream.flush()
@@ -175,6 +188,9 @@ class FakeHost(object):
def _read(self, comps):
return self.files[self.abspath(*comps)]
+ def realpath(self, *comps):
+ return self.abspath(*comps)
+
def relpath(self, path, start):
return path.replace(start + '/', '')
@@ -215,21 +231,19 @@ class FakeHost(object):
self.files[full_path] = contents
self.written_files[full_path] = contents
- def fetch(self, url, data=None, headers=None): # pragma: untested
- resp = self.fetch_responses.get(url, FakeResponse('', url))
+ def fetch(self, url, data=None, headers=None):
+ resp = self.fetch_responses.get(url, FakeResponse(unicode(''), url))
self.fetches.append((url, data, headers, resp))
return resp
- def _tap_output(self): # pragma: untested
- # TODO: assigning to sys.stdout/sys.stderr confuses the debugger
- # with some sort of str/unicode problem.
+ def _tap_output(self):
self.stdout = _TeedStream(self.stdout)
self.stderr = _TeedStream(self.stderr)
if True:
sys.stdout = self.stdout
sys.stderr = self.stderr
- def _untap_output(self): # pragma: untested
+ def _untap_output(self):
assert isinstance(self.stdout, _TeedStream)
self.stdout = self.stdout.stream
self.stderr = self.stderr.stream
@@ -237,54 +251,23 @@ class FakeHost(object):
sys.stdout = self.stdout
sys.stderr = self.stderr
- def capture_output(self, divert=True): # pragma: untested
+ def capture_output(self, divert=True):
self._tap_output()
- self.stdout.capture(divert)
- self.stderr.capture(divert)
+ self._orig_logging_handlers = self.logger.handlers
+ if self._orig_logging_handlers:
+ self.logger.handlers = [logging.StreamHandler(self.stderr)]
+ self.stdout.capture(divert=divert)
+ self.stderr.capture(divert=divert)
- def restore_output(self): # pragma: untested
+ def restore_output(self):
assert isinstance(self.stdout, _TeedStream)
out, err = (self.stdout.restore(), self.stderr.restore())
+ self.logger.handlers = self._orig_logging_handlers
self._untap_output()
return out, err
-class _TeedStream(io.StringIO): # pragma: untested
-
- def __init__(self, stream):
- super(_TeedStream, self).__init__()
- self.stream = stream
- self.capturing = False
- self.diverting = False
-
- def write(self, msg, *args, **kwargs):
- if self.capturing:
- if sys.version_info.major == 2 and isinstance(msg, str):
- msg = unicode(msg)
- super(_TeedStream, self).write(msg, *args, **kwargs)
- if not self.diverting:
- self.stream.write(msg, *args, **kwargs)
-
- def flush(self):
- if self.capturing:
- super(_TeedStream, self).flush()
- if not self.diverting:
- self.stream.flush()
-
- def capture(self, divert=True):
- self.truncate(0)
- self.capturing = True
- self.diverting = divert
-
- def restore(self):
- msg = self.getvalue()
- self.truncate(0)
- self.capturing = False
- self.diverting = False
- return msg
-
-
-class FakeResponse(io.StringIO): # pragma: untested
+class FakeResponse(io.StringIO):
def __init__(self, response, url, code=200):
io.StringIO.__init__(self, response)
diff --git a/third_party/typ/typ/fakes/tests/host_fake_test.py b/third_party/typ/typ/fakes/tests/host_fake_test.py
index f89b850..02f74e7 100644
--- a/third_party/typ/typ/fakes/tests/host_fake_test.py
+++ b/third_party/typ/typ/fakes/tests/host_fake_test.py
@@ -15,8 +15,13 @@
import sys
from typ.tests import host_test
-from typ.fakes.host_fake import FakeHost
+from typ.fakes.host_fake import FakeHost, FakeResponse
+is_python3 = bool(sys.version_info.major == 3)
+
+if is_python3: # pragma: python3
+ # redefining built-in 'unicode' pylint: disable=W0622
+ unicode = str
class TestFakeHost(host_test.TestHost):
@@ -38,6 +43,36 @@ class TestFakeHost(host_test.TestHost):
self.assertEqual(err, '')
self.assertEqual(h.cmds, [['echo', 'hello, world']])
+ def test_capture_output(self):
+ h = self.host()
+ self.host = lambda: h
+ super(TestFakeHost, self).test_capture_output()
+
+ # This tests that the super-method only tested the
+ # divert=True case, and things were diverted properly.
+ self.assertEqual(h.stdout.getvalue(), '')
+ self.assertEqual(h.stderr.getvalue(), '')
+
+ h.capture_output(divert=False)
+ h.print_('on stdout')
+ h.print_('on stderr', stream=h.stderr)
+ out, err = h.restore_output()
+ self.assertEqual(out, 'on stdout\n')
+ self.assertEqual(err, 'on stderr\n')
+ self.assertEqual(h.stdout.getvalue(), 'on stdout\n')
+ self.assertEqual(h.stderr.getvalue(), 'on stderr\n')
+
def test_for_mp(self):
h = self.host()
self.assertNotEqual(h.for_mp(), None)
+
+ def test_fetch(self):
+ h = self.host()
+ url = 'http://localhost/test'
+ resp = FakeResponse(unicode('foo'), url)
+ h.fetch_responses[url] = resp
+ actual_resp = h.fetch(url)
+ self.assertEqual(actual_resp.geturl(), url)
+ self.assertEqual(actual_resp.getcode(), 200)
+ self.assertEqual(resp, actual_resp)
+ self.assertEqual(h.fetches, [(url, None, None, actual_resp)])
diff --git a/third_party/typ/typ/fakes/unittest_fakes.py b/third_party/typ/typ/fakes/unittest_fakes.py
deleted file mode 100644
index 51fc870..0000000
--- a/third_party/typ/typ/fakes/unittest_fakes.py
+++ /dev/null
@@ -1,176 +0,0 @@
-# Copyright 2014 Dirk Pranke. All rights reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-import fnmatch
-import re
-import sys
-import unittest
-
-from typ.host import Host
-
-
-class FakeTestLoader(object):
- # invalid names pylint: disable=C0103
- # protected member _tests pylint: disable=W0212
- # unused args pylint: disable=W0613
-
- def __init__(self, host, orig_sys_path):
- self._host = host
- self.orig_sys_path = orig_sys_path
-
- def __getstate__(self):
- return {'orig_sys_path': self.orig_sys_path, '_host': None}
-
- def host(self):
- if not self._host:
- self._host = Host()
- return self._host
-
- def discover(self, start_dir, pattern='test*.py', top_level_dir=None):
- h = self.host()
- all_files = h.files_under(start_dir)
- matching_files = [f for f in all_files if
- fnmatch.fnmatch(h.basename(f), pattern)]
- suite = unittest.TestSuite()
- for f in matching_files:
- suite.addTests(self._loadTestsFromFile(h.join(start_dir, f),
- top_level_dir))
- return suite
-
- def _loadTestsFromFile(self, path, top_level_dir='.'):
- h = self.host()
- rpath = h.relpath(path, top_level_dir)
- module_name = (h.splitext(rpath)[0]).replace(h.sep, '.')
- class_name = ''
- suite = unittest.TestSuite()
- for l in h.read_text_file(path).splitlines():
- m = re.match('class (.+)\(', l)
- if m:
- class_name = m.group(1)
- m = re.match('.+def (.+)\(', l)
- if m:
- method_name = m.group(1)
- tc = FakeTestCase(h, '%s.%s.%s' % (module_name, class_name,
- method_name))
- suite.addTest(tc)
- return suite
-
- def loadTestsFromName(self, name, module=None): # pragma: untested
- h = self.host()
- comps = name.split('.')
- path = '/'.join(comps)
- test_path_dirs = [d for d in sys.path if d not in self.orig_sys_path]
-
- if len(comps) == 1:
- if h.isdir(path):
- # package
- return self.discover(path)
- if h.isfile(path + '.py'):
- # module
- return self._loadTestsFromFile(path + '.py')
- for d in test_path_dirs:
- path = h.join(d, comps[0] + '.py')
- if h.isfile(path):
- # module
- suite = self._loadTestsFromFile(path, d)
- matching_tests = [t for t in suite._tests if
- t.id().startswith(name)]
- if not matching_tests:
- raise AttributeError()
- return unittest.TestSuite(matching_tests)
- if h.isdir(d, path):
- # package
- return self.discover(path)
- raise ImportError()
-
- if len(comps) == 2:
- if h.isfile(comps[0] + '.py'):
- # module + class
- suite = self._loadTestsFromFile(comps[0] + '.py')
- matching_tests = [t for t in suite._tests if
- t.id().startswith(name)]
- if not matching_tests:
- raise AttributeError()
- return unittest.TestSuite(matching_tests)
-
- for d in test_path_dirs:
- path = h.join(d, comps[0], comps[1] + '.py')
- if h.isfile(path):
- # package + module
- suite = self._loadTestsFromFile(path, d)
- return unittest.TestSuite([t for t in suite._tests if
- t.id().startswith(name)])
- if h.isdir(d, comps[0], comps[1]):
- # package
- return self.discover(path)
-
- # no match
- raise ImportError()
-
- module_name = '.'.join(comps[:-2])
- fname = module_name.replace('.', h.sep) + '.py'
-
- for d in test_path_dirs:
- path = h.join(d, fname)
- if h.isfile(path):
- # module + class + method
- suite = self._loadTestsFromFile(path, d)
- return unittest.TestSuite([t for t in suite._tests if
- t.id() == name])
- if h.isdir(d, comps[0], comps[1]):
- # package
- return self.discover(h.join(d, comps[0], comps[1]))
-
- fname = module_name.replace('.', h.sep) + '.' + comps[-2] + '.py'
- if h.isfile(h.join(d, fname)):
- # module + class
- suite = self._loadTestsFromFile(comps[0] + '.py', d)
- return unittest.TestSuite([t for t in suite._tests if
- t.id().startswith(name)])
-
- # no match
- return unittest.TestSuite()
-
-
-class FakeTestCase(unittest.TestCase):
-
- def __init__(self, host, name):
- self._host = host
- self._name = name
- comps = self._name.split('.')
- self._class_name = comps[:-1]
- method_name = comps[-1]
- setattr(self, method_name, self._run)
- super(FakeTestCase, self).__init__(method_name)
-
- def id(self):
- return self._name
-
- def __str__(self): # pragma: untested
- return "%s (%s)" % (self._testMethodName, self._class_name)
-
- def __repr__(self): # pragma: untested
- return "%s testMethod=%s" % (self._class_name, self._testMethodName)
-
- def _run(self):
- if '_fail' in self._testMethodName:
- self.fail()
- if '_out' in self._testMethodName: # pragma: untested
- self._host.stdout.write('hello on stdout')
- self._host.stdout.flush()
- if '_err' in self._testMethodName: # pragma: untested
- self._host.stderr.write('hello on stderr')
- self._host.stderr.flush()
- if '_interrupt' in self._testMethodName:
- raise KeyboardInterrupt()
diff --git a/third_party/typ/typ/host.py b/third_party/typ/typ/host.py
index 5738e3a..4859d9c 100644
--- a/third_party/typ/typ/host.py
+++ b/third_party/typ/typ/host.py
@@ -31,9 +31,6 @@ else: # pragma: python3
from urllib.request import urlopen, Request # pylint: disable=F0401,E0611
-is_debugging = False
-
-
class Host(object):
python_interpreter = sys.executable
is_python3 = bool(sys.version_info.major == 3)
@@ -52,14 +49,6 @@ class Host(object):
self.stdin = sys.stdin
self.env = os.environ
- def set_debugging(self, flag): # pragma: untested
- # TODO: We currently use this to work around typ's brokenness
- # when running -d under python3. We may or may not actually need
- # this hook.
- # pylint: disable=W0603
- global is_debugging
- is_debugging = flag
-
def abspath(self, *comps):
return os.path.abspath(self.join(*comps))
@@ -130,7 +119,7 @@ class Host(object):
def maybe_mkdir(self, *comps):
path = self.abspath(self.join(*comps))
if not self.exists(path):
- os.mkdir(path)
+ os.makedirs(path)
def mkdtemp(self, **kwargs):
return tempfile.mkdtemp(**kwargs)
@@ -154,6 +143,9 @@ class Host(object):
with open(path, mode) as f:
return f.read()
+ def realpath(self, *comps):
+ return os.path.realpath(os.path.join(*comps))
+
def relpath(self, path, start):
return os.path.relpath(path, start)
@@ -216,7 +208,7 @@ class Host(object):
termios.TIOCGWINSZ, '\0' * 8)
_, columns, _, _ = struct.unpack('HHHH', packed)
return columns
- except Exception: # pragma: untested
+ except Exception:
return 0
def _tap_output(self):
@@ -230,12 +222,9 @@ class Host(object):
def capture_output(self, divert=True):
self._tap_output()
-
- # TODO: Make log capture more robust.
self._orig_logging_handlers = self.logger.handlers
- if self._orig_logging_handlers: # pragma: untested
+ if self._orig_logging_handlers:
self.logger.handlers = [logging.StreamHandler(self.stderr)]
-
self.stdout.capture(divert)
self.stderr.capture(divert)
@@ -255,15 +244,16 @@ class _TeedStream(io.StringIO):
self.capturing = False
self.diverting = False
- def write(self, msg, *args, **kwargs): # pragma: untested
+ def write(self, msg, *args, **kwargs):
if self.capturing:
- if sys.version_info.major == 2 and isinstance(msg, str):
+ if (sys.version_info.major == 2 and
+ isinstance(msg, str)): # pragma: python2
msg = unicode(msg)
super(_TeedStream, self).write(msg, *args, **kwargs)
if not self.diverting:
self.stream.write(msg, *args, **kwargs)
- def flush(self): # pragma: untested
+ def flush(self):
if self.capturing:
super(_TeedStream, self).flush()
if not self.diverting:
diff --git a/third_party/typ/typ/json_results.py b/third_party/typ/typ/json_results.py
index 8e03f6f..5e95ed5 100644
--- a/third_party/typ/typ/json_results.py
+++ b/third_party/typ/typ/json_results.py
@@ -127,8 +127,7 @@ def failed_test_names(results):
for r in results.results:
if r.actual == ResultType.Failure:
names.add(r.name)
- elif (r.actual == ResultType.Pass and
- r.name in names): # pragma: untested
+ elif (r.actual == ResultType.Pass and r.name in names):
names.remove(r.name)
return names
diff --git a/third_party/typ/typ/pool.py b/third_party/typ/typ/pool.py
index 11a3320..1afcd1c 100644
--- a/third_party/typ/typ/pool.py
+++ b/third_party/typ/typ/pool.py
@@ -19,6 +19,14 @@ import pickle
from typ.host import Host
+def make_pool(host, jobs, callback, context, pre_fn, post_fn):
+ _validate_args(context, pre_fn, post_fn)
+ if jobs > 1:
+ return _ProcessPool(host, jobs, callback, context, pre_fn, post_fn)
+ else:
+ return _AsyncPool(host, jobs, callback, context, pre_fn, post_fn)
+
+
class _MessageType(object):
Request = 'Request'
Response = 'Response'
@@ -30,25 +38,23 @@ class _MessageType(object):
values = [Request, Response, Close, Done, Error, Interrupt]
-def make_pool(host, jobs, callback, context, pre_fn, post_fn):
+def _validate_args(context, pre_fn, post_fn):
try:
_ = pickle.dumps(context)
- except Exception as e: # pragma: untested
+ except Exception as e:
raise ValueError('context passed to make_pool is not picklable: %s'
% str(e))
try:
_ = pickle.dumps(pre_fn)
- except pickle.PickleError: # pragma: untested
+ except pickle.PickleError:
raise ValueError('pre_fn passed to make_pool is not picklable')
try:
_ = pickle.dumps(post_fn)
- except pickle.PickleError: # pragma: untested
+ except pickle.PickleError:
raise ValueError('post_fn passed to make_pool is not picklable')
- cls = ProcessPool if jobs > 1 else AsyncPool
- return cls(host, jobs, callback, context, pre_fn, post_fn)
-class ProcessPool(object):
+class _ProcessPool(object):
def __init__(self, host, jobs, callback, context, pre_fn, post_fn):
self.host = host
@@ -56,6 +62,7 @@ class ProcessPool(object):
self.requests = multiprocessing.Queue()
self.responses = multiprocessing.Queue()
self.workers = []
+ self.discarded_responses = []
self.closed = False
self.erred = False
for worker_num in range(1, jobs + 1):
@@ -70,11 +77,11 @@ class ProcessPool(object):
def send(self, msg):
self.requests.put((_MessageType.Request, msg))
- def get(self, block=True, timeout=None):
- msg_type, resp = self.responses.get(block, timeout)
- if msg_type == _MessageType.Error: # pragma: untested
+ def get(self):
+ msg_type, resp = self.responses.get()
+ if msg_type == _MessageType.Error:
self._handle_error(resp)
- elif msg_type == _MessageType.Interrupt: # pragma: untested
+ elif msg_type == _MessageType.Interrupt:
raise KeyboardInterrupt
assert msg_type == _MessageType.Response
return resp
@@ -82,43 +89,89 @@ class ProcessPool(object):
def close(self):
for _ in self.workers:
self.requests.put((_MessageType.Close, None))
- self.requests.close()
self.closed = True
def join(self):
+ # TODO: one would think that we could close self.requests in close(),
+ # above, and close self.responses below, but if we do, we get
+ # weird tracebacks in the daemon threads multiprocessing starts up.
+ # Instead, we have to hack the innards of multiprocessing. It
+ # seems likely that there's a bug somewhere, either in this module or
+ # in multiprocessing.
+ if self.host.is_python3: # pragma: python3
+ multiprocessing.queues.is_exiting = lambda: True
+ else: # pragma: python2
+ multiprocessing.util._exiting = True
+
if not self.closed:
# We must be aborting; terminate the workers rather than
# shutting down cleanly.
- self.requests.close()
for w in self.workers:
w.terminate()
w.join()
- self.responses.close()
return []
final_responses = []
+ error = None
+ interrupted = None
for w in self.workers:
while True:
- msg_type, resp = self.responses.get(True)
- if msg_type == _MessageType.Error: # pragma: untested
- self._handle_error(resp)
- elif msg_type == _MessageType.Interrupt: # pragma: untested
- raise KeyboardInterrupt
- elif msg_type == _MessageType.Done:
+ msg_type, resp = self.responses.get()
+ if msg_type == _MessageType.Error:
+ error = resp
break
- # TODO: log something about discarding messages?
- final_responses.append(resp)
+ if msg_type == _MessageType.Interrupt:
+ interrupted = True
+ break
+ if msg_type == _MessageType.Done:
+ final_responses.append(resp[1])
+ break
+ self.discarded_responses.append(resp)
+
+ for w in self.workers:
w.join()
- self.responses.close()
+
+ # TODO: See comment above at the beginning of the function for
+ # why this is commented out.
+ # self.responses.close()
+
+ if error:
+ self._handle_error(error)
+ if interrupted:
+ raise KeyboardInterrupt
return final_responses
- def _handle_error(self, msg): # pragma: untested
+ def _handle_error(self, msg):
worker_num, ex_str = msg
self.erred = True
raise Exception("error from worker %d: %s" % (worker_num, ex_str))
-class AsyncPool(object):
+# 'Too many arguments' pylint: disable=R0913
+
+def _loop(requests, responses, host, worker_num,
+ callback, context, pre_fn, post_fn, should_loop=True):
+ host = host or Host()
+ try:
+ context_after_pre = pre_fn(host, worker_num, context)
+ keep_looping = True
+ while keep_looping:
+ message_type, args = requests.get(block=True)
+ if message_type == _MessageType.Close:
+ responses.put((_MessageType.Done,
+ (worker_num, post_fn(context_after_pre))))
+ break
+ assert message_type == _MessageType.Request
+ resp = callback(context_after_pre, args)
+ responses.put((_MessageType.Response, resp))
+ keep_looping = should_loop
+ except KeyboardInterrupt as e:
+ responses.put((_MessageType.Interrupt, (worker_num, str(e))))
+ except Exception as e:
+ responses.put((_MessageType.Error, (worker_num, str(e))))
+
+
+class _AsyncPool(object):
def __init__(self, host, jobs, callback, context, pre_fn, post_fn):
self.host = host or Host()
@@ -134,8 +187,7 @@ class AsyncPool(object):
def send(self, msg):
self.msgs.append(msg)
- def get(self, block=True, timeout=None):
- # unused pylint: disable=W0613
+ def get(self):
return self.callback(self.context_after_pre, self.msgs.pop(0))
def close(self):
@@ -146,31 +198,3 @@ class AsyncPool(object):
if not self.closed:
self.close()
return [self.final_context]
-
-
-def _loop(requests, responses, host, worker_num,
- callback, context, pre_fn, post_fn): # pragma: untested
- # TODO: Figure out how to get coverage to work w/ subprocesses.
- host = host or Host()
- erred = False
- try:
- context_after_pre = pre_fn(host, worker_num, context)
- while True:
- message_type, args = requests.get(block=True)
- if message_type == _MessageType.Close:
- break
- assert message_type == _MessageType.Request
- resp = callback(context_after_pre, args)
- responses.put((_MessageType.Response, resp))
- except KeyboardInterrupt as e:
- erred = True
- responses.put((_MessageType.Interrupt, (worker_num, str(e))))
- except Exception as e:
- erred = True
- responses.put((_MessageType.Error, (worker_num, str(e))))
-
- try:
- if not erred:
- responses.put((_MessageType.Done, post_fn(context_after_pre)))
- except Exception:
- pass
diff --git a/third_party/typ/typ/runner.py b/third_party/typ/typ/runner.py
index 638c94d..67cfd26 100644
--- a/third_party/typ/typ/runner.py
+++ b/third_party/typ/typ/runner.py
@@ -70,9 +70,9 @@ class _AddTestsError(Exception):
class Runner(object):
- def __init__(self, host=None, loader=None):
+ def __init__(self, host=None):
self.host = host or Host()
- self.loader = loader or unittest.loader.TestLoader()
+ self.loader = unittest.loader.TestLoader()
self.printer = None
self.stats = None
self.cov = None
@@ -84,9 +84,9 @@ class Runner(object):
parser = ArgumentParser(self.host)
self.parse_args(parser, [])
- def main(self, argv=None):
+ def main(self, argv=None, **defaults):
parser = ArgumentParser(self.host)
- self.parse_args(parser, argv)
+ self.parse_args(parser, argv, **defaults)
if parser.exit_status is not None:
return parser.exit_status
@@ -97,7 +97,13 @@ class Runner(object):
self.print_("interrupted, exiting", stream=self.host.stderr)
return 130
- def parse_args(self, parser, argv):
+ def parse_args(self, parser, argv, **defaults):
+ for attrname in defaults:
+ if not hasattr(self.args, attrname):
+ parser.error("Unknown default argument name '%s'" % attrname,
+ bailout=False)
+ return
+ parser.set_defaults(**defaults)
self.args = parser.parse_args(args=argv)
if parser.exit_status is not None:
return
@@ -105,8 +111,8 @@ class Runner(object):
def print_(self, msg='', end='\n', stream=None):
self.host.print_(msg, end, stream=stream)
- def run(self, test_set=None, classifier=None, context=None,
- setup_fn=None, teardown_fn=None):
+ def run(self, test_set=None, classifier=None,
+ context=None, setup_fn=None, teardown_fn=None):
ret = 0
h = self.host
@@ -115,7 +121,7 @@ class Runner(object):
return ret, None, None
ret = self._set_up_runner()
- if ret: # pragma: untested
+ if ret: # pragma: no cover
return ret, None, None
find_start = h.time()
@@ -168,16 +174,19 @@ class Runner(object):
self.top_level_dir = args.top_level_dir
if not self.top_level_dir:
- if args.tests and h.exists(args.tests[0]):
+ if args.tests and h.isdir(args.tests[0]):
# TODO: figure out what to do if multiple files are
# specified and they don't all have the same correct
# top level dir.
- top_dir = h.dirname(args.tests[0])
+ if h.exists(h.dirname(args.tests[0]), '__init__.py'):
+ top_dir = h.dirname(args.tests[0])
+ else:
+ top_dir = args.tests[0]
else:
top_dir = h.getcwd()
while h.exists(top_dir, '__init__.py'):
top_dir = h.dirname(top_dir)
- self.top_level_dir = h.abspath(top_dir)
+ self.top_level_dir = h.realpath(top_dir)
h.add_to_path(self.top_level_dir)
@@ -201,38 +210,41 @@ class Runner(object):
def find_tests(self, args, classifier=None,
context=None, setup_fn=None, teardown_fn=None):
- if not context and self.args.context: # pragma: untested
- context = json.loads(self.args.context)
- if not setup_fn and self.args.setup: # pragma: untested
- setup_fn = _import_name(self.args.setup)
- if not teardown_fn and self.args.teardown: # pragma: untested
- teardown_fn = _import_name(self.args.teardown)
-
test_set = self._make_test_set(context=context,
setup_fn=setup_fn,
teardown_fn=teardown_fn)
- names = self._name_list_from_args(args)
- classifier = classifier or _default_classifier(args)
+ orig_skip = unittest.skip
+ orig_skip_if = unittest.skipIf
+ if args.all:
+ unittest.skip = lambda reason: lambda x: x
+ unittest.skipIf = lambda condition, reason: lambda x: x
- for name in names:
- try:
- self._add_tests_to_set(test_set, args.suffixes,
- self.top_level_dir, classifier, name)
- except (AttributeError, ImportError, SyntaxError
- ) as e: # pragma: untested
- self.print_('Failed to load "%s": %s' % (name, e))
- return 1, None
- except _AddTestsError as e: # pragma: untested
- self.print_(str(e))
- return 1, None
-
- # TODO: Add support for discovering setupProcess/teardownProcess?
-
- test_set.parallel_tests = _sort_inputs(test_set.parallel_tests)
- test_set.isolated_tests = _sort_inputs(test_set.isolated_tests)
- test_set.tests_to_skip = _sort_inputs(test_set.tests_to_skip)
- return 0, test_set
+ try:
+ names = self._name_list_from_args(args)
+ classifier = classifier or _default_classifier(args)
+
+ for name in names:
+ try:
+ self._add_tests_to_set(test_set, args.suffixes,
+ self.top_level_dir, classifier,
+ name)
+ except (AttributeError, ImportError, SyntaxError) as e:
+ self.print_('Failed to load "%s": %s' % (name, e))
+ return 1, None
+ except _AddTestsError as e:
+ self.print_(str(e))
+ return 1, None
+
+ # TODO: Add support for discovering setupProcess/teardownProcess?
+
+ test_set.parallel_tests = _sort_inputs(test_set.parallel_tests)
+ test_set.isolated_tests = _sort_inputs(test_set.isolated_tests)
+ test_set.tests_to_skip = _sort_inputs(test_set.tests_to_skip)
+ return 0, test_set
+ finally:
+ unittest.skip = orig_skip
+ unittest.skipIf = orig_skip_if
def _name_list_from_args(self, args):
if args.tests:
@@ -244,7 +256,7 @@ class Runner(object):
s = self.host.read_text_file(args.file_list)
names = [line.strip() for line in s.splitlines()]
else:
- names = ['.']
+ names = [self.top_level_dir]
return names
def _add_tests_to_set(self, test_set, suffixes, top_level_dir, classifier,
@@ -399,14 +411,16 @@ class Runner(object):
def _print_test_finished(self, stats, result):
stats.add_time()
+
+ assert result.actual in [ResultType.Failure, ResultType.Skip,
+ ResultType.Pass]
if result.actual == ResultType.Failure:
result_str = ' failed'
elif result.actual == ResultType.Skip:
result_str = ' was skipped'
elif result.actual == ResultType.Pass:
result_str = ' passed'
- else: # pragma: untested
- raise ValueError('Unimplemented result type %s' % result)
+
if result.unexpected:
result_str += ' unexpectedly'
if self.args.timing:
@@ -420,16 +434,16 @@ class Runner(object):
if out or err:
suffix += ':\n'
self.update(stats.format() + result.name + suffix, elide=False)
- for l in out.splitlines(): # pragma: untested
+ for l in out.splitlines():
self.print_(' %s' % l)
- for l in err.splitlines(): # pragma: untested
+ for l in err.splitlines():
self.print_(' %s' % l)
elif not self.args.quiet:
- if self.args.verbose > 1 and (out or err): # pragma: untested
+ if self.args.verbose > 1 and (out or err):
suffix += ':\n'
self.update(stats.format() + result.name + suffix,
elide=(not self.args.verbose))
- if self.args.verbose > 1: # pragma: untested
+ if self.args.verbose > 1:
for l in out.splitlines():
self.print_(' %s' % l)
for l in err.splitlines():
@@ -552,7 +566,7 @@ def _matches(name, globs):
def _default_classifier(args):
def default_classifier(test_set, test):
name = test.id()
- if _matches(name, args.skip):
+ if not args.all and _matches(name, args.skip):
test_set.tests_to_skip.append(TestInput(name,
'skipped by request'))
elif _matches(name, args.isolate):
@@ -568,8 +582,7 @@ def _test_adder(test_set, classifier):
for el in obj:
add_tests(el)
elif (obj.id().startswith('unittest.loader.LoadTestsFailure') or
- obj.id().startswith('unittest.loader.ModuleImportFailure')
- ): # pragma: untested
+ obj.id().startswith('unittest.loader.ModuleImportFailure')):
# Access to protected member pylint: disable=W0212
module_name = obj._testMethodName
try:
@@ -592,6 +605,7 @@ class _Child(object):
def __init__(self, parent, loader, test_set):
self.host = None
self.worker_num = None
+ self.all = parent.args.all
self.debugger = parent.args.debugger
self.coverage = parent.args.coverage and parent.args.jobs > 1
self.coverage_source = parent.coverage_source
@@ -611,14 +625,14 @@ def _setup_process(host, worker_num, child):
child.host = host
child.worker_num = worker_num
- if child.coverage: # pragma: untested
+ if child.coverage: # pragma: no cover
import coverage
child.cov = coverage.coverage(source=child.coverage_source,
data_suffix=True)
child.cov._warn_no_data = False
child.cov.start()
- if child.setup_fn: # pragma: untested
+ if child.setup_fn:
child.context_after_setup = child.setup_fn(child, child.context)
else:
child.context_after_setup = child.context
@@ -626,36 +640,24 @@ def _setup_process(host, worker_num, child):
def _teardown_process(child):
- if child.teardown_fn: # pragma: untested
+ if child.teardown_fn:
child.teardown_fn(child, child.context_after_setup)
# TODO: Return a more structured result, including something from
# the teardown function?
- if child.cov: # pragma: untested
+ if child.cov: # pragma: no cover
child.cov.stop()
child.cov.save()
return child.worker_num
-def _import_name(name): # pragma: untested
- module_name, function_name = name.rsplit('.', 1)
- module = importlib.import_module(module_name)
- return getattr(module, function_name)
-
-
def _run_one_test(child, test_input):
h = child.host
pid = h.getpid()
test_name = test_input.name
start = h.time()
- if child.dry_run:
- return Result(test_name, ResultType.Pass, start, 0, child.worker_num,
- pid=pid)
-
- if h.is_python3 and child.debugger: # pragma: untested
- h.set_debugging(True)
# It is important to capture the output before loading the test
# to ensure that
@@ -667,19 +669,27 @@ def _run_one_test(child, test_input):
h.capture_output(divert=not child.passthrough)
try:
- suite = child.loader.loadTestsFromName(test_name)
- except Exception as e: # pragma: untested
- suite = _load_via_load_tests(child, test_name)
- if not suite:
- # TODO: Figure out how to handle failures here.
- err = 'failed to load %s: %s' % (test_name, str(e))
- h.restore_output()
- return Result(test_name, ResultType.Failure, start, 0,
- child.worker_num, unexpected=True, code=1,
- err=err, pid=pid)
+ orig_skip = unittest.skip
+ orig_skip_if = unittest.skipIf
+ if child.all:
+ unittest.skip = lambda reason: lambda x: x
+ unittest.skipIf = lambda condition, reason: lambda x: x
+ try:
+ suite = child.loader.loadTestsFromName(test_name)
+ except Exception:
+ suite = _load_via_load_tests(child, test_name)
+ finally:
+ unittest.skip = orig_skip
+ unittest.skipIf = orig_skip_if
tests = list(suite)
- assert len(tests) == 1
+ if len(tests) != 1:
+ err = 'failed to load %s' % test_name
+ h.restore_output()
+ return Result(test_name, ResultType.Failure, start, 0,
+ child.worker_num, unexpected=True, code=1,
+ err=err, pid=pid)
+
test_case = tests[0]
if isinstance(test_case, TypTestCase):
test_case.child = child
@@ -689,14 +699,10 @@ def _run_one_test(child, test_input):
out = ''
err = ''
try:
- if child.debugger: # pragma: untested
- # Access to protected member pylint: disable=W0212
- test_func = getattr(test_case, test_case._testMethodName)
- fname = inspect.getsourcefile(test_func)
- lineno = inspect.getsourcelines(test_func)[1] + 1
- dbg = pdb.Pdb(stdout=h.stdout)
- dbg.set_break(fname, lineno)
- dbg.runcall(suite.run, test_result)
+ if child.dry_run:
+ pass
+ elif child.debugger: # pragma: no cover
+ _run_under_debugger(h, test_case, suite, test_result)
else:
suite.run(test_result)
finally:
@@ -707,6 +713,17 @@ def _run_one_test(child, test_input):
err, child.worker_num, pid)
+def _run_under_debugger(host, test_case, suite,
+ test_result): # pragma: no cover
+ # Access to protected member pylint: disable=W0212
+ test_func = getattr(test_case, test_case._testMethodName)
+ fname = inspect.getsourcefile(test_func)
+ lineno = inspect.getsourcelines(test_func)[1] + 1
+ dbg = pdb.Pdb(stdout=host.stdout.stream)
+ dbg.set_break(fname, lineno)
+ dbg.runcall(suite.run, test_result)
+
+
def _result_from_test_result(test_result, test_name, start, took, out, err,
worker_num, pid):
flaky = False
@@ -716,25 +733,25 @@ def _result_from_test_result(test_result, test_name, start, took, out, err,
code = 1
unexpected = True
err = err + test_result.failures[0][1]
- elif test_result.errors: # pragma: untested
+ elif test_result.errors:
expected = [ResultType.Pass]
actual = ResultType.Failure
code = 1
unexpected = True
err = err + test_result.errors[0][1]
- elif test_result.skipped: # pragma: untested
+ elif test_result.skipped:
expected = [ResultType.Skip]
actual = ResultType.Skip
err = err + test_result.skipped[0][1]
code = 0
unexpected = False
- elif test_result.expectedFailures: # pragma: untested
+ elif test_result.expectedFailures:
expected = [ResultType.Failure]
actual = ResultType.Failure
code = 1
err = err + test_result.expectedFailures[0][1]
unexpected = False
- elif test_result.unexpectedSuccesses: # pragma: untested
+ elif test_result.unexpectedSuccesses:
expected = [ResultType.Failure]
actual = ResultType.Pass
code = 0
@@ -749,12 +766,13 @@ def _result_from_test_result(test_result, test_name, start, took, out, err,
expected, unexpected, flaky, code, out, err, pid)
-def _load_via_load_tests(child, test_name): # pragma: untested
+def _load_via_load_tests(child, test_name):
# If we couldn't import a test directly, the test may be only loadable
# via unittest's load_tests protocol. See if we can find a load_tests
# entry point that will work for this test.
loader = child.loader
comps = test_name.split('.')
+ new_suite = unittest.TestSuite()
while comps:
name = '.'.join(comps)
@@ -766,24 +784,17 @@ def _load_via_load_tests(child, test_name): # pragma: untested
except ImportError:
pass
if module:
- try:
- suite = loader.loadTestsFromModule(module)
- except Exception:
- # TODO: Figure out how to handle errors here
- pass
+ suite = loader.loadTestsFromModule(module)
child.loaded_suites[name] = suite
suite = child.loaded_suites[name]
if suite:
for test_case in suite:
- if not isinstance(test_case, unittest.TestCase):
- pass # pdb.set_trace()
+ assert isinstance(test_case, unittest.TestCase)
if test_case.id() == test_name:
- new_suite = unittest.TestSuite()
new_suite.addTest(test_case)
- return new_suite
+ break
comps.pop()
- if not comps:
- return None
+ return new_suite
def _sort_inputs(inps):
diff --git a/third_party/typ/typ/test_case.py b/third_party/typ/typ/test_case.py
index 2bf995b..5be83ae 100644
--- a/third_party/typ/typ/test_case.py
+++ b/third_party/typ/typ/test_case.py
@@ -12,6 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
+import fnmatch
import shlex
import unittest
@@ -29,6 +30,7 @@ class TestCase(unittest.TestCase):
class MainTestCase(TestCase):
prog = None
+ files_to_ignore = []
def _write_files(self, host, files):
for path, contents in list(files.items()):
@@ -40,6 +42,8 @@ class MainTestCase(TestCase):
def _read_files(self, host, tmpdir):
out_files = {}
for f in host.files_under(tmpdir):
+ if any(fnmatch.fnmatch(f, pat) for pat in self.files_to_ignore):
+ continue
key = f.replace(host.sep, '/')
out_files[key] = host.read_text_file(tmpdir, f)
return out_files
@@ -56,7 +60,6 @@ class MainTestCase(TestCase):
# If we are ever called by unittest directly, and not through typ,
# this will probably fail.
assert(self.child)
-
return self.child.host
def call(self, host, argv, stdin, env):
@@ -85,14 +88,23 @@ class MainTestCase(TestCase):
env = host.env.copy()
env.update(aenv)
+ if self.child.debugger: # pragma: no cover
+ host.print_('')
+ host.print_('cd %s' % tmpdir, stream=host.stdout.stream)
+ host.print_(' '.join(prog + argv), stream=host.stdout.stream)
+ host.print_('')
+ import pdb
+ dbg = pdb.Pdb(stdout=host.stdout.stream)
+ dbg.set_trace()
+
result = self.call(host, prog + argv, stdin=stdin, env=env)
actual_ret, actual_out, actual_err = result
actual_files = self._read_files(host, tmpdir)
finally:
+ host.chdir(orig_wd)
if tmpdir:
host.rmtree(tmpdir)
- host.chdir(orig_wd)
if universal_newlines:
actual_out = convert_newlines(actual_out)
diff --git a/third_party/typ/typ/tests/cmdline_test.py b/third_party/typ/typ/tests/cmdline_test.py
new file mode 100644
index 0000000..fbb908c
--- /dev/null
+++ b/third_party/typ/typ/tests/cmdline_test.py
@@ -0,0 +1,45 @@
+# Copyright 2014 Dirk Pranke. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import os
+import tempfile
+
+from typ import spawn_main
+from typ import test_case
+from typ import VERSION
+
+
+class TestSpawnMain(test_case.MainTestCase):
+ def call(self, host, argv, stdin, env):
+ out = err = None
+ out_str = err_str = ''
+ try:
+ out = tempfile.NamedTemporaryFile(delete=False)
+ err = tempfile.NamedTemporaryFile(delete=False)
+ ret = spawn_main(argv, stdout=out, stderr=err)
+ out.close()
+ out_str = open(out.name).read()
+ err.close()
+ err_str = open(err.name).read()
+ finally:
+ if out:
+ out.close()
+ os.remove(out.name)
+ if err:
+ err.close()
+ os.remove(err.name)
+ return ret, out_str, err_str
+
+ def test_version(self):
+ self.check(['--version'], ret=0, out=VERSION + '\n')
diff --git a/third_party/typ/typ/tests/host_test.py b/third_party/typ/typ/tests/host_test.py
index a0479b2..8bb94f8 100644
--- a/third_party/typ/typ/tests/host_test.py
+++ b/third_party/typ/typ/tests/host_test.py
@@ -12,6 +12,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.
+import logging
+import pickle
import sys
import unittest
@@ -23,6 +25,35 @@ class TestHost(unittest.TestCase):
def host(self):
return Host()
+ def test_capture_output(self):
+ try:
+ logging.basicConfig()
+ h = self.host()
+ h.capture_output()
+ h.print_('on stdout')
+ h.print_('on stderr', stream=h.stderr)
+ logging.critical('critical log failure')
+ out, err = h.restore_output()
+ self.assertEqual(out, 'on stdout\n')
+ self.assertEqual(err, 'on stderr\ncritical log failure\n')
+ finally:
+ h.logger.handlers = []
+
+ # TODO: Add tests for divert=False or eliminate the flag?
+
+ def test_abspath_and_realpath(self):
+ h = self.host()
+ self.assertNotEqual(h.abspath(h.getcwd()), None)
+ self.assertNotEqual(h.realpath(h.getcwd()), None)
+
+ def test_chdir(self):
+ h = self.host()
+ orig_cwd = h.getcwd()
+ h.chdir('.')
+ self.assertEqual(orig_cwd, h.getcwd())
+ h.chdir('..')
+ self.assertNotEqual(orig_cwd, h.getcwd())
+
def test_files(self):
h = self.host()
orig_cwd = h.getcwd()
@@ -76,9 +107,11 @@ class TestHost(unittest.TestCase):
h = self.host()
self.assertGreaterEqual(h.terminal_width(), 0)
- def test_for_mp(self):
+ def test_for_mp_and_pickling(self):
h = self.host()
- self.assertEqual(h.for_mp(), None)
+ mp_host = h.for_mp()
+ s = pickle.dumps(mp_host)
+ pickle.loads(s)
def test_cpu_count(self):
h = self.host()
@@ -88,6 +121,10 @@ class TestHost(unittest.TestCase):
h = self.host()
self.assertNotEqual(h.getenv('PATH', ''), None)
+ def test_getpid(self):
+ h = self.host()
+ self.assertNotEqual(h.getpid(), 0)
+
def test_basename(self):
h = self.host()
self.assertEqual(h.basename('foo.txt'), 'foo.txt')
diff --git a/third_party/typ/typ/tests/main_test.py b/third_party/typ/typ/tests/main_test.py
index e5883d9..ff9b975 100644
--- a/third_party/typ/typ/tests/main_test.py
+++ b/third_party/typ/typ/tests/main_test.py
@@ -24,7 +24,6 @@ from typ import test_case
from typ import Host
from typ import VERSION
from typ.fakes import test_result_server_fake
-from typ.fakes.unittest_fakes import FakeTestLoader
is_python3 = bool(sys.version_info.major == 3)
@@ -140,47 +139,6 @@ class ExpectedFailures(unittest.TestCase):
SF_TEST_FILES = {'sf_test.py': SF_TEST_PY}
-ST_TEST_PY = """
-import unittest
-from typ import test_case as typ_test_case
-
-def setupProcess(child, context):
- if context is None:
- context = {'calls': 0}
- child.host.print_('setupProcess(%d): %s' % (child.worker_num, context))
- context['calls'] += 1
- return context
-
-
-def teardownProcess(child, context):
- child.host.print_('\\nteardownProcess(%d): %s' %
- (child.worker_num, context))
-
-
-class UnitTest(unittest.TestCase):
- def test_one(self):
- self.assertFalse(hasattr(self, 'host'))
- self.assertFalse(hasattr(self, 'context'))
-
- def test_two(self):
- pass
-
-
-class TypTest(typ_test_case.TestCase):
- def test_one(self):
- self.assertNotEquals(self.child, None)
- self.assertGreaterEqual(self.context['calls'], 1)
- self.context['calls'] += 1
-
- def test_two(self):
- self.assertNotEquals(self.context, None)
- self.assertGreaterEqual(self.context['calls'], 1)
- self.context['calls'] += 1
-"""
-
-
-ST_TEST_FILES = {'st_test.py': ST_TEST_PY}
-
LOAD_TEST_PY = """
import unittest
def load_tests(_, _2, _3):
@@ -211,7 +169,8 @@ path_to_main = os.path.join(
class TestCli(test_case.MainTestCase):
- prog = [sys.executable, '-B', path_to_main]
+ prog = [sys.executable, path_to_main]
+ files_to_ignore = ['*.pyc']
def test_bad_arg(self):
self.check(['--bad-arg'], ret=2, out='',
@@ -287,6 +246,29 @@ class TestCli(test_case.MainTestCase):
self.assertIn('fail_test.FailingTest.test_fail failed unexpectedly',
out)
+ def test_fail_then_pass(self):
+ files = {'fail_then_pass_test.py': d("""\
+ import unittest
+ count = 0
+ class FPTest(unittest.TestCase):
+ def test_count(self):
+ global count
+ count += 1
+ if count == 1:
+ self.fail()
+ """)}
+ _, out, _, files = self.check(['--retry-limit', '3',
+ '--write-full-results-to',
+ 'full_results.json'],
+ files=files, ret=0, err='')
+ self.assertIn('Retrying failed tests (attempt #1 of 3)', out)
+ self.assertNotIn('Retrying failed tests (attempt #2 of 3)', out)
+ self.assertIn('1 test run, 0 failures.\n', out)
+ results = json.loads(files['full_results.json'])
+ self.assertEqual(results['tests'][
+ 'fail_then_pass_test']['FPTest']['test_count']['actual'],
+ 'FAIL PASS')
+
def test_failures_are_not_elided(self):
_, out, _, _ = self.check(['--terminal-width=20'],
files=FAIL_TEST_FILES, ret=1, err='')
@@ -461,22 +443,6 @@ class TestCli(test_case.MainTestCase):
if 'test_fail failed unexpectedly:' in l]),
3)
- def test_setup_and_teardown_single_child(self):
- self.check(['--jobs', '1',
- '--setup', 'st_test.setupProcess',
- '--teardown', 'st_test.teardownProcess'],
- files=ST_TEST_FILES, ret=0, err='',
- out=d("""\
- setupProcess(1): {'calls': 0}
- [1/4] st_test.TypTest.test_one passed
- [2/4] st_test.TypTest.test_two passed
- [3/4] st_test.UnitTest.test_one passed
- [4/4] st_test.UnitTest.test_two passed
- teardownProcess(1): {'calls': 3}
-
- 4 tests run, 0 failures.
- """))
-
def test_skip(self):
self.check(['--skip', '*test_fail*'], files=FAIL_TEST_FILES, ret=1,
out='No tests to run.\n', err='')
@@ -539,6 +505,39 @@ class TestCli(test_case.MainTestCase):
' setup failed\n'
'9 tests run, 4 failures.\n'), out)
+ def test_skip_and_all(self):
+ # --all should override --skip
+ self.check(['-l', '--skip', '*test_pass'],
+ files=PASS_TEST_FILES, ret=1, err='',
+ out='No tests to run.\n')
+ self.check(['-l', '--all', '--skip', '*test_pass'],
+ files=PASS_TEST_FILES, ret=0, err='',
+ out='pass_test.PassingTest.test_pass\n')
+
+ def test_skip_decorators_and_all(self):
+ _, out, _, _ = self.check(['--all', '-j', '1', '-v', '-v'],
+ files=SF_TEST_FILES, ret=1, err='')
+ self.assertIn('sf_test.SkipClass.test_method failed', out)
+ self.assertIn('sf_test.SkipMethods.test_reason failed', out)
+ self.assertIn('sf_test.SkipMethods.test_skip_if_true failed', out)
+ self.assertIn('sf_test.SkipMethods.test_skip_if_false failed', out)
+
+ # --all does not override explicit calls to skipTest(), only
+ # the decorators.
+ self.assertIn('sf_test.SkipSetup.test_notrun was skipped', out)
+
+ def test_subdir(self):
+ files = {
+ 'foo/__init__.py': '',
+ 'foo/bar/__init__.py': '',
+ 'foo/bar/pass_test.py': PASS_TEST_PY
+ }
+ self.check(['foo/bar'], files=files, ret=0, err='',
+ out=d("""\
+ [1/1] foo.bar.pass_test.PassingTest.test_pass passed
+ 1 test run, 0 failures.
+ """))
+
def test_timing(self):
self.check(['-t'], files=PASS_TEST_FILES, ret=0, err='',
rout=('\[1/1\] pass_test.PassingTest.test_pass passed '
@@ -658,60 +657,24 @@ class TestMain(TestCli):
host.stdin = io.StringIO(stdin)
if env:
host.getenv = env.get
- host.capture_output(divert=not self.child.debugger)
+ host.capture_output()
orig_sys_path = sys.path[:]
- loader = FakeTestLoader(host, orig_sys_path)
+ orig_sys_modules = list(sys.modules.keys())
try:
- ret = main(argv + ['-j', '1'], host, loader)
+ ret = main(argv + ['-j', '1'], host)
finally:
out, err = host.restore_output()
+ modules_to_unload = []
+ for k in sys.modules:
+ if k not in orig_sys_modules:
+ modules_to_unload.append(k)
+ for k in modules_to_unload:
+ del sys.modules[k]
sys.path = orig_sys_path
return ret, out, err
- # TODO: figure out how to make these tests pass w/ trapping output.
def test_debugger(self):
- pass
-
- def test_coverage(self):
- pass
-
- def test_error(self):
- pass
-
- def test_output_for_failures(self):
- pass
-
- def test_verbose(self):
- pass
-
- # TODO: These tests need to execute the real tests (they can't use a
- # FakeTestLoader and FakeTestCase) because we're testing
- # the side effects the tests have on setup and teardown.
- def test_import_failure_missing_file(self):
- pass
-
- def test_import_failure_missing_package(self):
- pass
-
- def test_import_failure_no_tests(self):
- pass
-
- def test_import_failure_syntax_error(self):
- pass
-
- def test_load_tests_failure(self):
- pass
-
- def test_load_tests_single_worker(self):
- pass
-
- def test_load_tests_multiple_workers(self):
- pass
-
- def test_setup_and_teardown_single_child(self):
- pass
-
- def test_skips_and_failures(self):
+ # TODO: this test seems to hang under coverage.
pass
diff --git a/third_party/typ/typ/tests/pool_test.py b/third_party/typ/typ/tests/pool_test.py
index ef9cc46..fdb3f39 100644
--- a/third_party/typ/typ/tests/pool_test.py
+++ b/third_party/typ/typ/tests/pool_test.py
@@ -14,29 +14,41 @@
from typ import test_case
from typ.host import Host
-from typ.pool import make_pool
+from typ.pool import make_pool, _MessageType, _ProcessPool, _loop
-def setup_fn(host, worker_num, context): # pylint: disable=W0613
- context['setup'] = True
+def _pre(host, worker_num, context): # pylint: disable=W0613
+ context['pre'] = True
return context
-def teardown_fn(context):
- context['teardown'] = True
+def _post(context):
+ context['post'] = True
return context
-def echo_fn(context, msg):
- return '%s/%s/%s' % (context['setup'], context['teardown'], msg)
+def _echo(context, msg):
+ return '%s/%s/%s' % (context['pre'], context['post'], msg)
+
+
+def _error(context, msg): # pylint: disable=W0613
+ raise Exception('_error() raised Exception')
+
+
+def _interrupt(context, msg): # pylint: disable=W0613
+ raise KeyboardInterrupt()
+
+
+def _stub(*args): # pylint: disable=W0613
+ return None
class TestPool(test_case.TestCase):
def run_basic_test(self, jobs):
host = Host()
- context = {'setup': False, 'teardown': False}
- pool = make_pool(host, jobs, echo_fn, context, setup_fn, teardown_fn)
+ context = {'pre': False, 'post': False}
+ pool = make_pool(host, jobs, _echo, context, _pre, _post)
pool.send('hello')
pool.send('world')
msg1 = pool.get()
@@ -46,19 +58,110 @@ class TestPool(test_case.TestCase):
self.assertEqual(set([msg1, msg2]),
set(['True/False/hello',
'True/False/world']))
- expected_context = {'setup': True, 'teardown': True}
+ expected_context = {'pre': True, 'post': True}
expected_final_contexts = [expected_context for _ in range(jobs)]
self.assertEqual(final_contexts, expected_final_contexts)
- def test_single_job(self):
+ def run_through_loop(self, callback=None, pool=None):
+ callback = callback or _stub
+ if pool:
+ host = pool.host
+ else:
+ host = Host()
+ pool = _ProcessPool(host, 0, _stub, None, _stub, _stub)
+ pool.send('hello')
+
+ worker_num = 1
+ _loop(pool.requests, pool.responses, host, worker_num, callback,
+ None, _stub, _stub, should_loop=False)
+ return pool
+
+ def test_async_close(self):
+ host = Host()
+ pool = make_pool(host, 1, _echo, None, _stub, _stub)
+ pool.join()
+
+ def test_basic_one_job(self):
self.run_basic_test(1)
- def test_two_jobs(self):
+ def test_basic_two_jobs(self):
self.run_basic_test(2)
+ def test_join_discards_messages(self):
+ host = Host()
+ context = {'pre': False, 'post': False}
+ pool = make_pool(host, 2, _echo, context, _pre, _post)
+ pool.send('hello')
+ pool.close()
+ pool.join()
+ self.assertEqual(len(pool.discarded_responses), 1)
+
+ def test_join_gets_an_error(self):
+ host = Host()
+ pool = make_pool(host, 2, _error, None, _stub, _stub)
+ pool.send('hello')
+ pool.close()
+ try:
+ pool.join()
+ except Exception as e:
+ self.assertIn('_error() raised Exception', str(e))
+
+ def test_join_gets_an_interrupt(self):
+ host = Host()
+ pool = make_pool(host, 2, _interrupt, None, _stub, _stub)
+ pool.send('hello')
+ pool.close()
+ self.assertRaises(KeyboardInterrupt, pool.join)
+
+ def test_loop(self):
+ pool = self.run_through_loop()
+ resp = pool.get()
+ self.assertEqual(resp, None)
+ pool.requests.put((_MessageType.Close, None))
+ pool.close()
+ self.run_through_loop(pool=pool)
+ pool.join()
+
+ def test_loop_fails_to_respond(self):
+ # This tests what happens if _loop() tries to send a response
+ # on a closed queue; we can't simulate this directly through the
+ # api in a single thread.
+ pool = self.run_through_loop()
+ pool.requests.put((_MessageType.Request, None))
+ pool.requests.put((_MessageType.Close, None))
+ self.run_through_loop(pool=pool)
+ pool.join()
+
+ def test_loop_get_raises_error(self):
+ pool = self.run_through_loop(_error)
+ self.assertRaises(Exception, pool.get)
+ pool.requests.put((_MessageType.Close, None))
+ pool.close()
+ pool.join()
+
+ def test_loop_get_raises_interrupt(self):
+ pool = self.run_through_loop(_interrupt)
+ self.assertRaises(KeyboardInterrupt, pool.get)
+ pool.requests.put((_MessageType.Close, None))
+ pool.close()
+ pool.join()
+
+ def test_pickling_errors(self):
+ def unpicklable_fn(): # pragma: no cover
+ pass
+
+ host = Host()
+ jobs = 2
+ self.assertRaises(ValueError, make_pool,
+ host, jobs, _stub, unpicklable_fn, None, None)
+ self.assertRaises(ValueError, make_pool,
+ host, jobs, _stub, None, unpicklable_fn, None)
+ self.assertRaises(ValueError, make_pool,
+ host, jobs, _stub, None, None, unpicklable_fn)
+
def test_no_close(self):
host = Host()
- context = {'setup': False, 'teardown': False}
- pool = make_pool(host, 2, echo_fn, context, setup_fn, teardown_fn)
+ context = {'pre': False, 'post': False}
+ pool = make_pool(host, 2, _echo, context, _pre, _post)
final_contexts = pool.join()
self.assertEqual(final_contexts, [])
diff --git a/third_party/typ/typ/tests/runner_test.py b/third_party/typ/typ/tests/runner_test.py
new file mode 100644
index 0000000..3409879
--- /dev/null
+++ b/third_party/typ/typ/tests/runner_test.py
@@ -0,0 +1,88 @@
+# Copyright 2014 Dirk Pranke. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from textwrap import dedent as d
+
+
+from typ import Host, Runner, TestCase, TestSet, TestInput
+
+
+def _setup_process(child, context): # pylint: disable=W0613
+ return context
+
+
+def _teardown_process(child, context): # pylint: disable=W0613
+ return context
+
+
+class RunnerTests(TestCase):
+ def test_context(self):
+ r = Runner()
+ r.args.tests = ['typ.tests.runner_test.ContextTests']
+ ret, _, _ = r.run(context={'foo': 'bar'}, setup_fn=_setup_process,
+ teardown_fn=_teardown_process)
+ self.assertEqual(ret, 0)
+
+ def test_bad_default(self):
+ r = Runner()
+ ret = r.main(foo='bar')
+ self.assertEqual(ret, 2)
+
+ def test_good_default(self):
+ r = Runner()
+ ret = r.main(tests=['typ.tests.runner_test.ContextTests'])
+ self.assertEqual(ret, 0)
+
+
+class TestSetTests(TestCase):
+ # This class exists to test the failures that can come up if you
+ # create your own test sets and bypass find_tests(); failures that
+ # would normally be caught there can occur later during test execution.
+
+ def test_missing_name(self):
+ test_set = TestSet()
+ test_set.parallel_tests = [TestInput('nonexistent test')]
+ r = Runner()
+ ret, _, _ = r.run(test_set)
+ self.assertEqual(ret, 1)
+
+ def test_failing_load_test(self):
+ h = Host()
+ orig_wd = h.getcwd()
+ tmpdir = None
+ try:
+ tmpdir = h.mkdtemp()
+ h.chdir(tmpdir)
+ h.write_text_file('load_test.py', d("""\
+ import unittest
+ def load_tests(_, _2, _3):
+ assert False
+ """))
+ test_set = TestSet()
+ test_set.parallel_tests = [TestInput('load_test.BaseTest.test_x')]
+ r = Runner()
+ ret, _, _ = r.run(test_set)
+ self.assertEqual(ret, 1)
+ finally:
+ h.chdir(orig_wd)
+ if tmpdir:
+ h.rmtree(tmpdir)
+
+
+class ContextTests(TestCase):
+ def test_context(self):
+ # This test is mostly intended to be called by
+ # RunnerTests.test_context, above. It is not interesting on its own.
+ if self.context:
+ self.assertEquals(self.context['foo'], 'bar')
diff --git a/third_party/typ/typ/version.py b/third_party/typ/typ/version.py
index 8157d02..58b2468 100644
--- a/third_party/typ/typ/version.py
+++ b/third_party/typ/typ/version.py
@@ -12,4 +12,4 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-VERSION = '0.8.1'
+VERSION = '0.8.4'