diff options
author | Jelmer Vernooij <jelmer@samba.org> | 2012-11-14 09:47:16 +0100 |
---|---|---|
committer | Jelmer Vernooij <jelmer@samba.org> | 2012-11-14 12:11:57 +0100 |
commit | a53caea7a27c8616cabfc2e5bdf91a90e35891d5 (patch) | |
tree | 5e1ad6e0387c076cc41d14213efbe4457d861690 /lib/subunit/python | |
parent | 7b654a8c180a6467147189332916a5e56634b5af (diff) | |
download | samba-a53caea7a27c8616cabfc2e5bdf91a90e35891d5.tar.gz samba-a53caea7a27c8616cabfc2e5bdf91a90e35891d5.tar.bz2 samba-a53caea7a27c8616cabfc2e5bdf91a90e35891d5.zip |
subunit: Update to latest upstream version.
Autobuild-User(master): Jelmer Vernooij <jelmer@samba.org>
Autobuild-Date(master): Wed Nov 14 12:11:58 CET 2012 on sn-devel-104
Diffstat (limited to 'lib/subunit/python')
-rw-r--r-- | lib/subunit/python/subunit/__init__.py | 71 | ||||
-rw-r--r-- | lib/subunit/python/subunit/filters.py | 125 | ||||
-rw-r--r-- | lib/subunit/python/subunit/iso8601.py | 2 | ||||
-rw-r--r-- | lib/subunit/python/subunit/test_results.py | 404 | ||||
-rwxr-xr-x | lib/subunit/python/subunit/tests/sample-script.py | 20 | ||||
-rwxr-xr-x | lib/subunit/python/subunit/tests/sample-two-script.py | 8 | ||||
-rw-r--r-- | lib/subunit/python/subunit/tests/test_run.py | 4 | ||||
-rw-r--r-- | lib/subunit/python/subunit/tests/test_subunit_filter.py | 170 | ||||
-rw-r--r-- | lib/subunit/python/subunit/tests/test_test_protocol.py | 38 | ||||
-rw-r--r-- | lib/subunit/python/subunit/tests/test_test_results.py | 272 |
10 files changed, 950 insertions, 164 deletions
diff --git a/lib/subunit/python/subunit/__init__.py b/lib/subunit/python/subunit/__init__.py index b4c939756f..6015c0e68c 100644 --- a/lib/subunit/python/subunit/__init__.py +++ b/lib/subunit/python/subunit/__init__.py @@ -121,8 +121,14 @@ import re import subprocess import sys import unittest +if sys.version_info > (3, 0): + from io import UnsupportedOperation as _UnsupportedOperation +else: + _UnsupportedOperation = AttributeError + from testtools import content, content_type, ExtendedToOriginalDecorator +from testtools.content import TracebackContent from testtools.compat import _b, _u, BytesIO, StringIO try: from testtools.testresult.real import _StringException @@ -182,9 +188,15 @@ def tags_to_new_gone(tags): class DiscardStream(object): """A filelike object which discards what is written to it.""" + def fileno(self): + raise _UnsupportedOperation() + def write(self, bytes): pass + def read(self, len=0): + return _b('') + class _ParserState(object): """State for the subunit parser.""" @@ -599,8 +611,8 @@ class TestProtocolClient(testresult.TestResult): def __init__(self, stream): testresult.TestResult.__init__(self) + stream = _make_stream_binary(stream) self._stream = stream - _make_stream_binary(stream) self._progress_fmt = _b("progress: ") self._bytes_eol = _b("\n") self._progress_plus = _b("+") @@ -682,10 +694,9 @@ class TestProtocolClient(testresult.TestResult): raise ValueError if error is not None: self._stream.write(self._start_simple) - # XXX: this needs to be made much stricter, along the lines of - # Martin[gz]'s work in testtools. Perhaps subunit can use that? - for line in self._exc_info_to_unicode(error, test).splitlines(): - self._stream.write(("%s\n" % line).encode('utf8')) + tb_content = TracebackContent(error, test) + for bytes in tb_content.iter_bytes(): + self._stream.write(bytes) elif details is not None: self._write_details(details) else: @@ -755,6 +766,15 @@ class TestProtocolClient(testresult.TestResult): self._stream.write(self._progress_fmt + prefix + offset + self._bytes_eol) + def tags(self, new_tags, gone_tags): + """Inform the client about tags added/removed from the stream.""" + if not new_tags and not gone_tags: + return + tags = set([tag.encode('utf8') for tag in new_tags]) + tags.update([_b("-") + tag.encode('utf8') for tag in gone_tags]) + tag_line = _b("tags: ") + _b(" ").join(tags) + _b("\n") + self._stream.write(tag_line) + def time(self, a_datetime): """Inform the client of the time. @@ -1122,7 +1142,7 @@ class ProtocolTestCase(object): :seealso: TestProtocolServer (the subunit wire protocol parser). """ - def __init__(self, stream, passthrough=None, forward=False): + def __init__(self, stream, passthrough=None, forward=None): """Create a ProtocolTestCase reading from stream. :param stream: A filelike object which a subunit stream can be read @@ -1132,9 +1152,11 @@ class ProtocolTestCase(object): :param forward: A stream to pass subunit input on to. If not supplied subunit input is not forwarded. """ + stream = _make_stream_binary(stream) self._stream = stream - _make_stream_binary(stream) self._passthrough = passthrough + if forward is not None: + forward = _make_stream_binary(forward) self._forward = forward def __call__(self, result=None): @@ -1217,11 +1239,6 @@ def get_default_formatter(): return stream -if sys.version_info > (3, 0): - from io import UnsupportedOperation as _NoFilenoError -else: - _NoFilenoError = AttributeError - def read_test_list(path): """Read a list of test ids from a file on disk. @@ -1236,15 +1253,37 @@ def read_test_list(path): def _make_stream_binary(stream): - """Ensure that a stream will be binary safe. See _make_binary_on_windows.""" + """Ensure that a stream will be binary safe. See _make_binary_on_windows. + + :return: A binary version of the same stream (some streams cannot be + 'fixed' but can be unwrapped). + """ try: fileno = stream.fileno() - except _NoFilenoError: - return - _make_binary_on_windows(fileno) + except _UnsupportedOperation: + pass + else: + _make_binary_on_windows(fileno) + return _unwrap_text(stream) def _make_binary_on_windows(fileno): """Win32 mangles \r\n to \n and that breaks streams. See bug lp:505078.""" if sys.platform == "win32": import msvcrt msvcrt.setmode(fileno, os.O_BINARY) + + +def _unwrap_text(stream): + """Unwrap stream if it is a text stream to get the original buffer.""" + if sys.version_info > (3, 0): + try: + # Read streams + if type(stream.read(0)) is str: + return stream.buffer + except (_UnsupportedOperation, IOError): + # Cannot read from the stream: try via writes + try: + stream.write(_b('')) + except TypeError: + return stream.buffer + return stream diff --git a/lib/subunit/python/subunit/filters.py b/lib/subunit/python/subunit/filters.py new file mode 100644 index 0000000000..dc3fd8aedb --- /dev/null +++ b/lib/subunit/python/subunit/filters.py @@ -0,0 +1,125 @@ +# subunit: extensions to python unittest to get test results from subprocesses. +# Copyright (C) 2009 Robert Collins <robertc@robertcollins.net> +# +# Licensed under either the Apache License, Version 2.0 or the BSD 3-clause +# license at the users choice. A copy of both licenses are available in the +# project source as Apache-2.0 and BSD. You may not use this file except in +# compliance with one of these two licences. +# +# Unless required by applicable law or agreed to in writing, software +# distributed under these licenses is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# license you chose for the specific language governing permissions and +# limitations under that license. +# + + +from optparse import OptionParser +import sys + +from subunit import DiscardStream, ProtocolTestCase + + +def make_options(description): + parser = OptionParser(description=description) + parser.add_option( + "--no-passthrough", action="store_true", + help="Hide all non subunit input.", default=False, + dest="no_passthrough") + parser.add_option( + "-o", "--output-to", + help="Send the output to this path rather than stdout.") + parser.add_option( + "-f", "--forward", action="store_true", default=False, + help="Forward subunit stream on stdout.") + return parser + + +def run_tests_from_stream(input_stream, result, passthrough_stream=None, + forward_stream=None): + """Run tests from a subunit input stream through 'result'. + + :param input_stream: A stream containing subunit input. + :param result: A TestResult that will receive the test events. + :param passthrough_stream: All non-subunit input received will be + sent to this stream. If not provided, uses the ``TestProtocolServer`` + default, which is ``sys.stdout``. + :param forward_stream: All subunit input received will be forwarded + to this stream. If not provided, uses the ``TestProtocolServer`` + default, which is to not forward any input. + """ + test = ProtocolTestCase( + input_stream, passthrough=passthrough_stream, + forward=forward_stream) + result.startTestRun() + test.run(result) + result.stopTestRun() + + +def filter_by_result(result_factory, output_path, passthrough, forward, + input_stream=sys.stdin): + """Filter an input stream using a test result. + + :param result_factory: A callable that when passed an output stream + returns a TestResult. It is expected that this result will output + to the given stream. + :param output_path: A path send output to. If None, output will be go + to ``sys.stdout``. + :param passthrough: If True, all non-subunit input will be sent to + ``sys.stdout``. If False, that input will be discarded. + :param forward: If True, all subunit input will be forwarded directly to + ``sys.stdout`` as well as to the ``TestResult``. + :param input_stream: The source of subunit input. Defaults to + ``sys.stdin``. + :return: A test result with the resultts of the run. + """ + if passthrough: + passthrough_stream = sys.stdout + else: + passthrough_stream = DiscardStream() + + if forward: + forward_stream = sys.stdout + else: + forward_stream = DiscardStream() + + if output_path is None: + output_to = sys.stdout + else: + output_to = file(output_path, 'wb') + + try: + result = result_factory(output_to) + run_tests_from_stream( + input_stream, result, passthrough_stream, forward_stream) + finally: + if output_path: + output_to.close() + return result + + +def run_filter_script(result_factory, description, post_run_hook=None): + """Main function for simple subunit filter scripts. + + Many subunit filter scripts take a stream of subunit input and use a + TestResult to handle the events generated by that stream. This function + wraps a lot of the boiler-plate around that by making a script with + options for handling passthrough information and stream forwarding, and + that will exit with a successful return code (i.e. 0) if the input stream + represents a successful test run. + + :param result_factory: A callable that takes an output stream and returns + a test result that outputs to that stream. + :param description: A description of the filter script. + """ + parser = make_options(description) + (options, args) = parser.parse_args() + result = filter_by_result( + result_factory, options.output_to, not options.no_passthrough, + options.forward) + if post_run_hook: + post_run_hook(result) + if result.wasSuccessful(): + sys.exit(0) + else: + sys.exit(1) diff --git a/lib/subunit/python/subunit/iso8601.py b/lib/subunit/python/subunit/iso8601.py index cbe9a3b3eb..07855d0975 100644 --- a/lib/subunit/python/subunit/iso8601.py +++ b/lib/subunit/python/subunit/iso8601.py @@ -127,7 +127,7 @@ def parse_date(datestring, default_timezone=UTC): if groups["fraction"] is None: groups["fraction"] = 0 else: - groups["fraction"] = int(float("0.%s" % groups["fraction"]) * 1e6) + groups["fraction"] = int(float("0.%s" % groups["fraction"].decode()) * 1e6) return datetime(int(groups["year"]), int(groups["month"]), int(groups["day"]), int(groups["hour"]), int(groups["minute"]), int(groups["second"]), int(groups["fraction"]), tz) diff --git a/lib/subunit/python/subunit/test_results.py b/lib/subunit/python/subunit/test_results.py index 33fb50e073..c00a2d3e97 100644 --- a/lib/subunit/python/subunit/test_results.py +++ b/lib/subunit/python/subunit/test_results.py @@ -16,9 +16,15 @@ """TestResult helper classes used to by subunit.""" +import csv import datetime import testtools +from testtools.compat import all +from testtools.content import ( + text_content, + TracebackContent, + ) from subunit import iso8601 @@ -34,6 +40,9 @@ class TestResultDecorator(object): or features by degrading them. """ + # XXX: Since lp:testtools r250, this is in testtools. Once it's released, + # we should gut this and just use that. + def __init__(self, decorated): """Create a TestResultDecorator forwarding to decorated.""" # Make every decorator degrade gracefully. @@ -200,34 +209,44 @@ class AutoTimingTestResultDecorator(HookedTestResultDecorator): return self.decorated.time(a_datetime) -class TagCollapsingDecorator(TestResultDecorator): - """Collapses many 'tags' calls into one where possible.""" +class TagsMixin(object): - def __init__(self, result): - super(TagCollapsingDecorator, self).__init__(result) - # The (new, gone) tags for the current test. - self._current_test_tags = None + def __init__(self): + self._clear_tags() - def startTest(self, test): - """Start a test. + def _clear_tags(self): + self._global_tags = set(), set() + self._test_tags = None - Not directly passed to the client, but used for handling of tags - correctly. - """ - self.decorated.startTest(test) - self._current_test_tags = set(), set() + def _get_active_tags(self): + global_new, global_gone = self._global_tags + if self._test_tags is None: + return set(global_new) + test_new, test_gone = self._test_tags + return global_new.difference(test_gone).union(test_new) - def stopTest(self, test): - """Stop a test. + def _get_current_scope(self): + if self._test_tags: + return self._test_tags + return self._global_tags - Not directly passed to the client, but used for handling of tags - correctly. - """ - # Tags to output for this test. - if self._current_test_tags[0] or self._current_test_tags[1]: - self.decorated.tags(*self._current_test_tags) - self.decorated.stopTest(test) - self._current_test_tags = None + def _flush_current_scope(self, tag_receiver): + new_tags, gone_tags = self._get_current_scope() + if new_tags or gone_tags: + tag_receiver.tags(new_tags, gone_tags) + if self._test_tags: + self._test_tags = set(), set() + else: + self._global_tags = set(), set() + + def startTestRun(self): + self._clear_tags() + + def startTest(self, test): + self._test_tags = set(), set() + + def stopTest(self, test): + self._test_tags = None def tags(self, new_tags, gone_tags): """Handle tag instructions. @@ -238,14 +257,25 @@ class TagCollapsingDecorator(TestResultDecorator): :param new_tags: Tags to add, :param gone_tags: Tags to remove. """ - if self._current_test_tags is not None: - # gather the tags until the test stops. - self._current_test_tags[0].update(new_tags) - self._current_test_tags[0].difference_update(gone_tags) - self._current_test_tags[1].update(gone_tags) - self._current_test_tags[1].difference_update(new_tags) - else: - return self.decorated.tags(new_tags, gone_tags) + current_new_tags, current_gone_tags = self._get_current_scope() + current_new_tags.update(new_tags) + current_new_tags.difference_update(gone_tags) + current_gone_tags.update(gone_tags) + current_gone_tags.difference_update(new_tags) + + +class TagCollapsingDecorator(HookedTestResultDecorator, TagsMixin): + """Collapses many 'tags' calls into one where possible.""" + + def __init__(self, result): + super(TagCollapsingDecorator, self).__init__(result) + self._clear_tags() + + def _before_event(self): + self._flush_current_scope(self.decorated) + + def tags(self, new_tags, gone_tags): + TagsMixin.tags(self, new_tags, gone_tags) class TimeCollapsingDecorator(HookedTestResultDecorator): @@ -273,93 +303,58 @@ class TimeCollapsingDecorator(HookedTestResultDecorator): self._last_received_time = a_time -def all_true(bools): - """Return True if all of 'bools' are True. False otherwise.""" - for b in bools: - if not b: - return False - return True +def and_predicates(predicates): + """Return a predicate that is true iff all predicates are true.""" + # XXX: Should probably be in testtools to be better used by matchers. jml + return lambda *args, **kwargs: all(p(*args, **kwargs) for p in predicates) -class TestResultFilter(TestResultDecorator): - """A pyunit TestResult interface implementation which filters tests. +def make_tag_filter(with_tags, without_tags): + """Make a callback that checks tests against tags.""" - Tests that pass the filter are handed on to another TestResult instance - for further processing/reporting. To obtain the filtered results, - the other instance must be interrogated. + with_tags = with_tags and set(with_tags) or None + without_tags = without_tags and set(without_tags) or None - :ivar result: The result that tests are passed to after filtering. - :ivar filter_predicate: The callback run to decide whether to pass - a result. - """ + def check_tags(test, outcome, err, details, tags): + if with_tags and not with_tags <= tags: + return False + if without_tags and bool(without_tags & tags): + return False + return True - def __init__(self, result, filter_error=False, filter_failure=False, - filter_success=True, filter_skip=False, filter_xfail=False, - filter_predicate=None, fixup_expected_failures=None): - """Create a FilterResult object filtering to result. + return check_tags - :param filter_error: Filter out errors. - :param filter_failure: Filter out failures. - :param filter_success: Filter out successful tests. - :param filter_skip: Filter out skipped tests. - :param filter_xfail: Filter out expected failure tests. - :param filter_predicate: A callable taking (test, outcome, err, - details) and returning True if the result should be passed - through. err and details may be none if no error or extra - metadata is available. outcome is the name of the outcome such - as 'success' or 'failure'. - :param fixup_expected_failures: Set of test ids to consider known - failing. - """ - super(TestResultFilter, self).__init__(result) + +class _PredicateFilter(TestResultDecorator, TagsMixin): + + def __init__(self, result, predicate): + super(_PredicateFilter, self).__init__(result) + self._clear_tags() self.decorated = TimeCollapsingDecorator( TagCollapsingDecorator(self.decorated)) - predicates = [] - if filter_error: - predicates.append(lambda t, outcome, e, d: outcome != 'error') - if filter_failure: - predicates.append(lambda t, outcome, e, d: outcome != 'failure') - if filter_success: - predicates.append(lambda t, outcome, e, d: outcome != 'success') - if filter_skip: - predicates.append(lambda t, outcome, e, d: outcome != 'skip') - if filter_xfail: - predicates.append(lambda t, outcome, e, d: outcome != 'expectedfailure') - if filter_predicate is not None: - predicates.append(filter_predicate) - self.filter_predicate = ( - lambda test, outcome, err, details: - all_true(p(test, outcome, err, details) for p in predicates)) + self._predicate = predicate # The current test (for filtering tags) self._current_test = None # Has the current test been filtered (for outputting test tags) self._current_test_filtered = None # Calls to this result that we don't know whether to forward on yet. self._buffered_calls = [] - if fixup_expected_failures is None: - self._fixup_expected_failures = frozenset() - else: - self._fixup_expected_failures = fixup_expected_failures + + def filter_predicate(self, test, outcome, error, details): + return self._predicate( + test, outcome, error, details, self._get_active_tags()) def addError(self, test, err=None, details=None): if (self.filter_predicate(test, 'error', err, details)): - if self._failure_expected(test): - self._buffered_calls.append( - ('addExpectedFailure', [test, err], {'details': details})) - else: - self._buffered_calls.append( - ('addError', [test, err], {'details': details})) + self._buffered_calls.append( + ('addError', [test, err], {'details': details})) else: self._filtered() def addFailure(self, test, err=None, details=None): if (self.filter_predicate(test, 'failure', err, details)): - if self._failure_expected(test): - self._buffered_calls.append( - ('addExpectedFailure', [test, err], {'details': details})) - else: - self._buffered_calls.append( - ('addFailure', [test, err], {'details': details})) + self._buffered_calls.append( + ('addFailure', [test, err], {'details': details})) else: self._filtered() @@ -370,17 +365,6 @@ class TestResultFilter(TestResultDecorator): else: self._filtered() - def addSuccess(self, test, details=None): - if (self.filter_predicate(test, 'success', None, details)): - if self._failure_expected(test): - self._buffered_calls.append( - ('addUnexpectedSuccess', [test], {'details': details})) - else: - self._buffered_calls.append( - ('addSuccess', [test], {'details': details})) - else: - self._filtered() - def addExpectedFailure(self, test, err=None, details=None): if self.filter_predicate(test, 'expectedfailure', err, details): self._buffered_calls.append( @@ -392,18 +376,23 @@ class TestResultFilter(TestResultDecorator): self._buffered_calls.append( ('addUnexpectedSuccess', [test], {'details': details})) + def addSuccess(self, test, details=None): + if (self.filter_predicate(test, 'success', None, details)): + self._buffered_calls.append( + ('addSuccess', [test], {'details': details})) + else: + self._filtered() + def _filtered(self): self._current_test_filtered = True - def _failure_expected(self, test): - return (test.id() in self._fixup_expected_failures) - def startTest(self, test): """Start a test. Not directly passed to the client, but used for handling of tags correctly. """ + TagsMixin.startTest(self, test) self._current_test = test self._current_test_filtered = False self._buffered_calls.append(('startTest', [test], {})) @@ -415,19 +404,23 @@ class TestResultFilter(TestResultDecorator): correctly. """ if not self._current_test_filtered: - # Tags to output for this test. for method, args, kwargs in self._buffered_calls: getattr(self.decorated, method)(*args, **kwargs) self.decorated.stopTest(test) self._current_test = None self._current_test_filtered = None self._buffered_calls = [] + TagsMixin.stopTest(self, test) - def time(self, a_time): + def tags(self, new_tags, gone_tags): + TagsMixin.tags(self, new_tags, gone_tags) if self._current_test is not None: - self._buffered_calls.append(('time', [a_time], {})) + self._buffered_calls.append(('tags', [new_tags, gone_tags], {})) else: - return self.decorated.time(a_time) + return super(_PredicateFilter, self).tags(new_tags, gone_tags) + + def time(self, a_time): + return self.decorated.time(a_time) def id_to_orig_id(self, id): if id.startswith("subunit.RemotedTestCase."): @@ -435,6 +428,95 @@ class TestResultFilter(TestResultDecorator): return id +class TestResultFilter(TestResultDecorator): + """A pyunit TestResult interface implementation which filters tests. + + Tests that pass the filter are handed on to another TestResult instance + for further processing/reporting. To obtain the filtered results, + the other instance must be interrogated. + + :ivar result: The result that tests are passed to after filtering. + :ivar filter_predicate: The callback run to decide whether to pass + a result. + """ + + def __init__(self, result, filter_error=False, filter_failure=False, + filter_success=True, filter_skip=False, filter_xfail=False, + filter_predicate=None, fixup_expected_failures=None): + """Create a FilterResult object filtering to result. + + :param filter_error: Filter out errors. + :param filter_failure: Filter out failures. + :param filter_success: Filter out successful tests. + :param filter_skip: Filter out skipped tests. + :param filter_xfail: Filter out expected failure tests. + :param filter_predicate: A callable taking (test, outcome, err, + details, tags) and returning True if the result should be passed + through. err and details may be none if no error or extra + metadata is available. outcome is the name of the outcome such + as 'success' or 'failure'. tags is new in 0.0.8; 0.0.7 filters + are still supported but should be updated to accept the tags + parameter for efficiency. + :param fixup_expected_failures: Set of test ids to consider known + failing. + """ + predicates = [] + if filter_error: + predicates.append( + lambda t, outcome, e, d, tags: outcome != 'error') + if filter_failure: + predicates.append( + lambda t, outcome, e, d, tags: outcome != 'failure') + if filter_success: + predicates.append( + lambda t, outcome, e, d, tags: outcome != 'success') + if filter_skip: + predicates.append( + lambda t, outcome, e, d, tags: outcome != 'skip') + if filter_xfail: + predicates.append( + lambda t, outcome, e, d, tags: outcome != 'expectedfailure') + if filter_predicate is not None: + def compat(test, outcome, error, details, tags): + # 0.0.7 and earlier did not support the 'tags' parameter. + try: + return filter_predicate( + test, outcome, error, details, tags) + except TypeError: + return filter_predicate(test, outcome, error, details) + predicates.append(compat) + predicate = and_predicates(predicates) + super(TestResultFilter, self).__init__( + _PredicateFilter(result, predicate)) + if fixup_expected_failures is None: + self._fixup_expected_failures = frozenset() + else: + self._fixup_expected_failures = fixup_expected_failures + + def addError(self, test, err=None, details=None): + if self._failure_expected(test): + self.addExpectedFailure(test, err=err, details=details) + else: + super(TestResultFilter, self).addError( + test, err=err, details=details) + + def addFailure(self, test, err=None, details=None): + if self._failure_expected(test): + self.addExpectedFailure(test, err=err, details=details) + else: + super(TestResultFilter, self).addFailure( + test, err=err, details=details) + + def addSuccess(self, test, details=None): + if self._failure_expected(test): + self.addUnexpectedSuccess(test, details=details) + else: + super(TestResultFilter, self).addSuccess(test, details=details) + + def _failure_expected(self, test): + return (test.id() in self._fixup_expected_failures) + + class TestIdPrintingResult(testtools.TestResult): def __init__(self, stream, show_times=False): @@ -493,3 +575,97 @@ class TestIdPrintingResult(testtools.TestResult): def wasSuccessful(self): "Tells whether or not this result was a success" return self.failed_tests == 0 + + +class TestByTestResult(testtools.TestResult): + """Call something every time a test completes.""" + +# XXX: In testtools since lp:testtools r249. Once that's released, just +# import that. + + def __init__(self, on_test): + """Construct a ``TestByTestResult``. + + :param on_test: A callable that take a test case, a status (one of + "success", "failure", "error", "skip", or "xfail"), a start time + (a ``datetime`` with timezone), a stop time, an iterable of tags, + and a details dict. Is called at the end of each test (i.e. on + ``stopTest``) with the accumulated values for that test. + """ + super(TestByTestResult, self).__init__() + self._on_test = on_test + + def startTest(self, test): + super(TestByTestResult, self).startTest(test) + self._start_time = self._now() + # There's no supported (i.e. tested) behaviour that relies on these + # being set, but it makes me more comfortable all the same. -- jml + self._status = None + self._details = None + self._stop_time = None + + def stopTest(self, test): + self._stop_time = self._now() + super(TestByTestResult, self).stopTest(test) + self._on_test( + test=test, + status=self._status, + start_time=self._start_time, + stop_time=self._stop_time, + # current_tags is new in testtools 0.9.13. + tags=getattr(self, 'current_tags', None), + details=self._details) + + def _err_to_details(self, test, err, details): + if details: + return details + return {'traceback': TracebackContent(err, test)} + + def addSuccess(self, test, details=None): + super(TestByTestResult, self).addSuccess(test) + self._status = 'success' + self._details = details + + def addFailure(self, test, err=None, details=None): + super(TestByTestResult, self).addFailure(test, err, details) + self._status = 'failure' + self._details = self._err_to_details(test, err, details) + + def addError(self, test, err=None, details=None): + super(TestByTestResult, self).addError(test, err, details) + self._status = 'error' + self._details = self._err_to_details(test, err, details) + + def addSkip(self, test, reason=None, details=None): + super(TestByTestResult, self).addSkip(test, reason, details) + self._status = 'skip' + if details is None: + details = {'reason': text_content(reason)} + elif reason: + # XXX: What if details already has 'reason' key? + details['reason'] = text_content(reason) + self._details = details + + def addExpectedFailure(self, test, err=None, details=None): + super(TestByTestResult, self).addExpectedFailure(test, err, details) + self._status = 'xfail' + self._details = self._err_to_details(test, err, details) + + def addUnexpectedSuccess(self, test, details=None): + super(TestByTestResult, self).addUnexpectedSuccess(test, details) + self._status = 'success' + self._details = details + + +class CsvResult(TestByTestResult): + + def __init__(self, stream): + super(CsvResult, self).__init__(self._on_test) + self._write_row = csv.writer(stream).writerow + + def _on_test(self, test, status, start_time, stop_time, tags, details): + self._write_row([test.id(), status, start_time, stop_time]) + + def startTestRun(self): + super(CsvResult, self).startTestRun() + self._write_row(['test', 'status', 'start_time', 'stop_time']) diff --git a/lib/subunit/python/subunit/tests/sample-script.py b/lib/subunit/python/subunit/tests/sample-script.py index 618e4952d7..91838f6d6f 100755 --- a/lib/subunit/python/subunit/tests/sample-script.py +++ b/lib/subunit/python/subunit/tests/sample-script.py @@ -7,15 +7,15 @@ if len(sys.argv) == 2: # subunit.tests.test_test_protocol.TestExecTestCase.test_sample_method_args # uses this code path to be sure that the arguments were passed to # sample-script.py - print "test fail" - print "error fail" + print("test fail") + print("error fail") sys.exit(0) -print "test old mcdonald" -print "success old mcdonald" -print "test bing crosby" -print "failure bing crosby [" -print "foo.c:53:ERROR invalid state" -print "]" -print "test an error" -print "error an error" +print("test old mcdonald") +print("success old mcdonald") +print("test bing crosby") +print("failure bing crosby [") +print("foo.c:53:ERROR invalid state") +print("]") +print("test an error") +print("error an error") sys.exit(0) diff --git a/lib/subunit/python/subunit/tests/sample-two-script.py b/lib/subunit/python/subunit/tests/sample-two-script.py index d5550842bf..fc73dfc409 100755 --- a/lib/subunit/python/subunit/tests/sample-two-script.py +++ b/lib/subunit/python/subunit/tests/sample-two-script.py @@ -1,7 +1,7 @@ #!/usr/bin/env python import sys -print "test old mcdonald" -print "success old mcdonald" -print "test bing crosby" -print "success bing crosby" +print("test old mcdonald") +print("success old mcdonald") +print("test bing crosby") +print("success bing crosby") sys.exit(0) diff --git a/lib/subunit/python/subunit/tests/test_run.py b/lib/subunit/python/subunit/tests/test_run.py index 5a96bcf30e..10519ed086 100644 --- a/lib/subunit/python/subunit/tests/test_run.py +++ b/lib/subunit/python/subunit/tests/test_run.py @@ -14,7 +14,7 @@ # limitations under that license. # -from cStringIO import StringIO +from testtools.compat import BytesIO import unittest from testtools import PlaceHolder @@ -42,7 +42,7 @@ class TimeCollectingTestResult(unittest.TestResult): class TestSubunitTestRunner(unittest.TestCase): def test_includes_timing_output(self): - io = StringIO() + io = BytesIO() runner = SubunitTestRunner(stream=io) test = PlaceHolder('name') runner.run(test) diff --git a/lib/subunit/python/subunit/tests/test_subunit_filter.py b/lib/subunit/python/subunit/tests/test_subunit_filter.py index 06754840eb..33b924824d 100644 --- a/lib/subunit/python/subunit/tests/test_subunit_filter.py +++ b/lib/subunit/python/subunit/tests/test_subunit_filter.py @@ -17,15 +17,18 @@ """Tests for subunit.TestResultFilter.""" from datetime import datetime +import os +import subprocess +import sys from subunit import iso8601 import unittest from testtools import TestCase -from testtools.compat import _b, BytesIO, StringIO +from testtools.compat import _b, BytesIO from testtools.testresult.doubles import ExtendedTestResult import subunit -from subunit.test_results import TestResultFilter +from subunit.test_results import make_tag_filter, TestResultFilter class TestTestResultFilter(TestCase): @@ -77,6 +80,40 @@ xfail todo filtered_result.failures]) self.assertEqual(4, filtered_result.testsRun) + def test_tag_filter(self): + tag_filter = make_tag_filter(['global'], ['local']) + result = ExtendedTestResult() + result_filter = TestResultFilter( + result, filter_success=False, filter_predicate=tag_filter) + self.run_tests(result_filter) + tests_included = [ + event[1] for event in result._events if event[0] == 'startTest'] + tests_expected = list(map( + subunit.RemotedTestCase, + ['passed', 'error', 'skipped', 'todo'])) + self.assertEquals(tests_expected, tests_included) + + def test_tags_tracked_correctly(self): + tag_filter = make_tag_filter(['a'], []) + result = ExtendedTestResult() + result_filter = TestResultFilter( + result, filter_success=False, filter_predicate=tag_filter) + input_stream = _b( + "test: foo\n" + "tags: a\n" + "successful: foo\n" + "test: bar\n" + "successful: bar\n") + self.run_tests(result_filter, input_stream) + foo = subunit.RemotedTestCase('foo') + self.assertEquals( + [('startTest', foo), + ('tags', set(['a']), set()), + ('addSuccess', foo), + ('stopTest', foo), + ], + result._events) + def test_exclude_errors(self): filtered_result = unittest.TestResult() result_filter = TestResultFilter(filtered_result, filter_error=True) @@ -151,6 +188,8 @@ xfail todo def test_filter_predicate(self): """You can filter by predicate callbacks""" + # 0.0.7 and earlier did not support the 'tags' parameter, so we need + # to test that we still support behaviour without it. filtered_result = unittest.TestResult() def filter_cb(test, outcome, err, details): return outcome == 'success' @@ -161,6 +200,18 @@ xfail todo # Only success should pass self.assertEqual(1, filtered_result.testsRun) + def test_filter_predicate_with_tags(self): + """You can filter by predicate callbacks that accept tags""" + filtered_result = unittest.TestResult() + def filter_cb(test, outcome, err, details, tags): + return outcome == 'success' + result_filter = TestResultFilter(filtered_result, + filter_predicate=filter_cb, + filter_success=False) + self.run_tests(result_filter) + # Only success should pass + self.assertEqual(1, filtered_result.testsRun) + def test_time_ordering_preserved(self): # Passing a subunit stream through TestResultFilter preserves the # relative ordering of 'time' directives and any other subunit @@ -179,14 +230,41 @@ xfail todo result_filter = TestResultFilter(result) self.run_tests(result_filter, subunit_stream) foo = subunit.RemotedTestCase('foo') - self.assertEquals( + self.maxDiff = None + self.assertEqual( [('time', date_a), - ('startTest', foo), ('time', date_b), + ('startTest', foo), ('addError', foo, {}), ('stopTest', foo), ('time', date_c)], result._events) + def test_time_passes_through_filtered_tests(self): + # Passing a subunit stream through TestResultFilter preserves 'time' + # directives even if a specific test is filtered out. + date_a = datetime(year=2000, month=1, day=1, tzinfo=iso8601.UTC) + date_b = datetime(year=2000, month=1, day=2, tzinfo=iso8601.UTC) + date_c = datetime(year=2000, month=1, day=3, tzinfo=iso8601.UTC) + subunit_stream = _b('\n'.join([ + "time: %s", + "test: foo", + "time: %s", + "success: foo", + "time: %s", + ""]) % (date_a, date_b, date_c)) + result = ExtendedTestResult() + result_filter = TestResultFilter(result) + result_filter.startTestRun() + self.run_tests(result_filter, subunit_stream) + result_filter.stopTestRun() + foo = subunit.RemotedTestCase('foo') + self.maxDiff = None + self.assertEqual( + [('startTestRun',), + ('time', date_a), + ('time', date_c), + ('stopTestRun',),], result._events) + def test_skip_preserved(self): subunit_stream = _b('\n'.join([ "test: foo", @@ -201,6 +279,90 @@ xfail todo ('addSkip', foo, {}), ('stopTest', foo), ], result._events) + if sys.version_info < (2, 7): + # These tests require Python >=2.7. + del test_fixup_expected_failures, test_fixup_expected_errors, test_fixup_unexpected_success + + +class TestFilterCommand(TestCase): + + example_subunit_stream = _b("""\ +tags: global +test passed +success passed +test failed +tags: local +failure failed +test error +error error [ +error details +] +test skipped +skip skipped +test todo +xfail todo +""") + + def run_command(self, args, stream): + root = os.path.dirname( + os.path.dirname(os.path.dirname(os.path.dirname(__file__)))) + script_path = os.path.join(root, 'filters', 'subunit-filter') + command = [sys.executable, script_path] + list(args) + ps = subprocess.Popen( + command, stdin=subprocess.PIPE, stdout=subprocess.PIPE, + stderr=subprocess.PIPE) + out, err = ps.communicate(stream) + if ps.returncode != 0: + raise RuntimeError("%s failed: %s" % (command, err)) + return out + + def to_events(self, stream): + test = subunit.ProtocolTestCase(BytesIO(stream)) + result = ExtendedTestResult() + test.run(result) + return result._events + + def test_default(self): + output = self.run_command([], _b( + "test: foo\n" + "skip: foo\n" + )) + events = self.to_events(output) + foo = subunit.RemotedTestCase('foo') + self.assertEqual( + [('startTest', foo), + ('addSkip', foo, {}), + ('stopTest', foo)], + events) + + def test_tags(self): + output = self.run_command(['-s', '--with-tag', 'a'], _b( + "tags: a\n" + "test: foo\n" + "success: foo\n" + "tags: -a\n" + "test: bar\n" + "success: bar\n" + "test: baz\n" + "tags: a\n" + "success: baz\n" + )) + events = self.to_events(output) + foo = subunit.RemotedTestCase('foo') + baz = subunit.RemotedTestCase('baz') + self.assertEqual( + [('tags', set(['a']), set()), + ('startTest', foo), + ('addSuccess', foo), + ('stopTest', foo), + ('tags', set(), set(['a'])), + ('startTest', baz), + ('tags', set(['a']), set()), + ('addSuccess', baz), + ('stopTest', baz), + ], + events) + def test_suite(): loader = subunit.tests.TestUtil.TestLoader() diff --git a/lib/subunit/python/subunit/tests/test_test_protocol.py b/lib/subunit/python/subunit/tests/test_test_protocol.py index c93aabd80c..ec6830d03b 100644 --- a/lib/subunit/python/subunit/tests/test_test_protocol.py +++ b/lib/subunit/python/subunit/tests/test_test_protocol.py @@ -18,9 +18,9 @@ import datetime import unittest import os -from testtools import skipIf, TestCase -from testtools.compat import _b, _u, BytesIO, StringIO -from testtools.content import Content, TracebackContent +from testtools import skipIf, TestCase, TestResult +from testtools.compat import _b, _u, BytesIO +from testtools.content import Content, TracebackContent, text_content from testtools.content_type import ContentType try: from testtools.testresult.doubles import ( @@ -40,6 +40,10 @@ from subunit import _remote_exception_str, _remote_exception_str_chunked import subunit.iso8601 as iso8601 +def details_to_str(details): + return TestResult()._err_details_to_string(None, details=details) + + class TestTestImports(unittest.TestCase): def test_imports(self): @@ -87,11 +91,12 @@ class TestTestProtocolServerPipe(unittest.TestCase): def test_story(self): client = unittest.TestResult() protocol = subunit.TestProtocolServer(client) + traceback = "foo.c:53:ERROR invalid state\n" pipe = BytesIO(_b("test old mcdonald\n" "success old mcdonald\n" "test bing crosby\n" "failure bing crosby [\n" - "foo.c:53:ERROR invalid state\n" + + traceback + "]\n" "test an error\n" "error an error\n")) @@ -102,9 +107,8 @@ class TestTestProtocolServerPipe(unittest.TestCase): [(an_error, _remote_exception_str + '\n')]) self.assertEqual( client.failures, - [(bing, _remote_exception_str + ": Text attachment: traceback\n" - "------------\nfoo.c:53:ERROR invalid state\n" - "------------\n\n")]) + [(bing, _remote_exception_str + ": " + + details_to_str({'traceback': text_content(traceback)}) + "\n")]) self.assertEqual(client.testsRun, 3) def test_non_test_characters_forwarded_immediately(self): @@ -559,9 +563,7 @@ class TestTestProtocolServerAddxFail(unittest.TestCase): value = details else: if error_message is not None: - value = subunit.RemoteError(_u("Text attachment: traceback\n" - "------------\n") + _u(error_message) + - _u("------------\n")) + value = subunit.RemoteError(details_to_str(details)) else: value = subunit.RemoteError() self.assertEqual([ @@ -1299,6 +1301,22 @@ class TestTestProtocolClient(unittest.TestCase): "something\n" "F\r\nserialised\nform0\r\n]\n" % self.test.id())) + def test_tags_empty(self): + self.protocol.tags(set(), set()) + self.assertEqual(_b(""), self.io.getvalue()) + + def test_tags_add(self): + self.protocol.tags(set(['foo']), set()) + self.assertEqual(_b("tags: foo\n"), self.io.getvalue()) + + def test_tags_both(self): + self.protocol.tags(set(['quux']), set(['bar'])) + self.assertEqual(_b("tags: quux -bar\n"), self.io.getvalue()) + + def test_tags_gone(self): + self.protocol.tags(set(), set(['bar'])) + self.assertEqual(_b("tags: -bar\n"), self.io.getvalue()) + def test_suite(): loader = subunit.tests.TestUtil.TestLoader() diff --git a/lib/subunit/python/subunit/tests/test_test_results.py b/lib/subunit/python/subunit/tests/test_test_results.py index 94d22748e8..236dfa22e5 100644 --- a/lib/subunit/python/subunit/tests/test_test_results.py +++ b/lib/subunit/python/subunit/tests/test_test_results.py @@ -14,16 +14,25 @@ # limitations under that license. # +import csv import datetime +import sys import unittest from testtools import TestCase +from testtools.compat import StringIO +from testtools.content import ( + text_content, + TracebackContent, + ) from testtools.testresult.doubles import ExtendedTestResult import subunit import subunit.iso8601 as iso8601 import subunit.test_results +import testtools + class LoggingDecorator(subunit.test_results.HookedTestResultDecorator): @@ -192,12 +201,55 @@ class TestAutoTimingTestResultDecorator(unittest.TestCase): class TestTagCollapsingDecorator(TestCase): - def test_tags_forwarded_outside_of_tests(self): + def test_tags_collapsed_outside_of_tests(self): result = ExtendedTestResult() tag_collapser = subunit.test_results.TagCollapsingDecorator(result) - tag_collapser.tags(set(['a', 'b']), set()) + tag_collapser.tags(set(['a']), set()) + tag_collapser.tags(set(['b']), set()) + tag_collapser.startTest(self) self.assertEquals( - [('tags', set(['a', 'b']), set([]))], result._events) + [('tags', set(['a', 'b']), set([])), + ('startTest', self), + ], result._events) + + def test_tags_collapsed_outside_of_tests_are_flushed(self): + result = ExtendedTestResult() + tag_collapser = subunit.test_results.TagCollapsingDecorator(result) + tag_collapser.startTestRun() + tag_collapser.tags(set(['a']), set()) + tag_collapser.tags(set(['b']), set()) + tag_collapser.startTest(self) + tag_collapser.addSuccess(self) + tag_collapser.stopTest(self) + tag_collapser.stopTestRun() + self.assertEquals( + [('startTestRun',), + ('tags', set(['a', 'b']), set([])), + ('startTest', self), + ('addSuccess', self), + ('stopTest', self), + ('stopTestRun',), + ], result._events) + + def test_tags_forwarded_after_tests(self): + test = subunit.RemotedTestCase('foo') + result = ExtendedTestResult() + tag_collapser = subunit.test_results.TagCollapsingDecorator(result) + tag_collapser.startTestRun() + tag_collapser.startTest(test) + tag_collapser.addSuccess(test) + tag_collapser.stopTest(test) + tag_collapser.tags(set(['a']), set(['b'])) + tag_collapser.stopTestRun() + self.assertEqual( + [('startTestRun',), + ('startTest', test), + ('addSuccess', test), + ('stopTest', test), + ('tags', set(['a']), set(['b'])), + ('stopTestRun',), + ], + result._events) def test_tags_collapsed_inside_of_tests(self): result = ExtendedTestResult() @@ -229,6 +281,25 @@ class TestTagCollapsingDecorator(TestCase): ('stopTest', test)], result._events) + def test_tags_sent_before_result(self): + # Because addSuccess and friends tend to send subunit output + # immediately, and because 'tags:' before a result line means + # something different to 'tags:' after a result line, we need to be + # sure that tags are emitted before 'addSuccess' (or whatever). + result = ExtendedTestResult() + tag_collapser = subunit.test_results.TagCollapsingDecorator(result) + test = subunit.RemotedTestCase('foo') + tag_collapser.startTest(test) + tag_collapser.tags(set(['a']), set()) + tag_collapser.addSuccess(test) + tag_collapser.stopTest(test) + self.assertEquals( + [('startTest', test), + ('tags', set(['a']), set()), + ('addSuccess', test), + ('stopTest', test)], + result._events) + class TestTimeCollapsingDecorator(TestCase): @@ -294,6 +365,201 @@ class TestTimeCollapsingDecorator(TestCase): ('stopTest', foo)], result._events) +class TestByTestResultTests(testtools.TestCase): + + def setUp(self): + super(TestByTestResultTests, self).setUp() + self.log = [] + self.result = subunit.test_results.TestByTestResult(self.on_test) + if sys.version_info >= (3, 0): + self.result._now = iter(range(5)).__next__ + else: + self.result._now = iter(range(5)).next + + def assertCalled(self, **kwargs): + defaults = { + 'test': self, + 'tags': set(), + 'details': None, + 'start_time': 0, + 'stop_time': 1, + } + defaults.update(kwargs) + self.assertEqual([defaults], self.log) + + def on_test(self, **kwargs): + self.log.append(kwargs) + + def test_no_tests_nothing_reported(self): + self.result.startTestRun() + self.result.stopTestRun() + self.assertEqual([], self.log) + + def test_add_success(self): + self.result.startTest(self) + self.result.addSuccess(self) + self.result.stopTest(self) + self.assertCalled(status='success') + + def test_add_success_details(self): + self.result.startTest(self) + details = {'foo': 'bar'} + self.result.addSuccess(self, details=details) + self.result.stopTest(self) + self.assertCalled(status='success', details=details) + + def test_tags(self): + if not getattr(self.result, 'tags', None): + self.skipTest("No tags in testtools") + self.result.tags(['foo'], []) + self.result.startTest(self) + self.result.addSuccess(self) + self.result.stopTest(self) + self.assertCalled(status='success', tags=set(['foo'])) + + def test_add_error(self): + self.result.startTest(self) + try: + 1/0 + except ZeroDivisionError: + error = sys.exc_info() + self.result.addError(self, error) + self.result.stopTest(self) + self.assertCalled( + status='error', + details={'traceback': TracebackContent(error, self)}) + + def test_add_error_details(self): + self.result.startTest(self) + details = {"foo": text_content("bar")} + self.result.addError(self, details=details) + self.result.stopTest(self) + self.assertCalled(status='error', details=details) + + def test_add_failure(self): + self.result.startTest(self) + try: + self.fail("intentional failure") + except self.failureException: + failure = sys.exc_info() + self.result.addFailure(self, failure) + self.result.stopTest(self) + self.assertCalled( + status='failure', + details={'traceback': TracebackContent(failure, self)}) + + def test_add_failure_details(self): + self.result.startTest(self) + details = {"foo": text_content("bar")} + self.result.addFailure(self, details=details) + self.result.stopTest(self) + self.assertCalled(status='failure', details=details) + + def test_add_xfail(self): + self.result.startTest(self) + try: + 1/0 + except ZeroDivisionError: + error = sys.exc_info() + self.result.addExpectedFailure(self, error) + self.result.stopTest(self) + self.assertCalled( + status='xfail', + details={'traceback': TracebackContent(error, self)}) + + def test_add_xfail_details(self): + self.result.startTest(self) + details = {"foo": text_content("bar")} + self.result.addExpectedFailure(self, details=details) + self.result.stopTest(self) + self.assertCalled(status='xfail', details=details) + + def test_add_unexpected_success(self): + self.result.startTest(self) + details = {'foo': 'bar'} + self.result.addUnexpectedSuccess(self, details=details) + self.result.stopTest(self) + self.assertCalled(status='success', details=details) + + def test_add_skip_reason(self): + self.result.startTest(self) + reason = self.getUniqueString() + self.result.addSkip(self, reason) + self.result.stopTest(self) + self.assertCalled( + status='skip', details={'reason': text_content(reason)}) + + def test_add_skip_details(self): + self.result.startTest(self) + details = {'foo': 'bar'} + self.result.addSkip(self, details=details) + self.result.stopTest(self) + self.assertCalled(status='skip', details=details) + + def test_twice(self): + self.result.startTest(self) + self.result.addSuccess(self, details={'foo': 'bar'}) + self.result.stopTest(self) + self.result.startTest(self) + self.result.addSuccess(self) + self.result.stopTest(self) + self.assertEqual( + [{'test': self, + 'status': 'success', + 'start_time': 0, + 'stop_time': 1, + 'tags': set(), + 'details': {'foo': 'bar'}}, + {'test': self, + 'status': 'success', + 'start_time': 2, + 'stop_time': 3, + 'tags': set(), + 'details': None}, + ], + self.log) + + +class TestCsvResult(testtools.TestCase): + + def parse_stream(self, stream): + stream.seek(0) + reader = csv.reader(stream) + return list(reader) + + def test_csv_output(self): + stream = StringIO() + result = subunit.test_results.CsvResult(stream) + if sys.version_info >= (3, 0): + result._now = iter(range(5)).__next__ + else: + result._now = iter(range(5)).next + result.startTestRun() + result.startTest(self) + result.addSuccess(self) + result.stopTest(self) + result.stopTestRun() + self.assertEqual( + [['test', 'status', 'start_time', 'stop_time'], + [self.id(), 'success', '0', '1'], + ], + self.parse_stream(stream)) + + def test_just_header_when_no_tests(self): + stream = StringIO() + result = subunit.test_results.CsvResult(stream) + result.startTestRun() + result.stopTestRun() + self.assertEqual( + [['test', 'status', 'start_time', 'stop_time']], + self.parse_stream(stream)) + + def test_no_output_before_events(self): + stream = StringIO() + subunit.test_results.CsvResult(stream) + self.assertEqual([], self.parse_stream(stream)) + + def test_suite(): loader = subunit.tests.TestUtil.TestLoader() result = loader.loadTestsFromName(__name__) |