diff options
author | Jelmer Vernooij <jelmer@samba.org> | 2012-11-14 09:47:16 +0100 |
---|---|---|
committer | Jelmer Vernooij <jelmer@samba.org> | 2012-11-14 12:11:57 +0100 |
commit | a53caea7a27c8616cabfc2e5bdf91a90e35891d5 (patch) | |
tree | 5e1ad6e0387c076cc41d14213efbe4457d861690 /lib/subunit/python/subunit/test_results.py | |
parent | 7b654a8c180a6467147189332916a5e56634b5af (diff) | |
download | samba-a53caea7a27c8616cabfc2e5bdf91a90e35891d5.tar.gz samba-a53caea7a27c8616cabfc2e5bdf91a90e35891d5.tar.bz2 samba-a53caea7a27c8616cabfc2e5bdf91a90e35891d5.zip |
subunit: Update to latest upstream version.
Autobuild-User(master): Jelmer Vernooij <jelmer@samba.org>
Autobuild-Date(master): Wed Nov 14 12:11:58 CET 2012 on sn-devel-104
Diffstat (limited to 'lib/subunit/python/subunit/test_results.py')
-rw-r--r-- | lib/subunit/python/subunit/test_results.py | 404 |
1 files changed, 290 insertions, 114 deletions
diff --git a/lib/subunit/python/subunit/test_results.py b/lib/subunit/python/subunit/test_results.py index 33fb50e073..c00a2d3e97 100644 --- a/lib/subunit/python/subunit/test_results.py +++ b/lib/subunit/python/subunit/test_results.py @@ -16,9 +16,15 @@ """TestResult helper classes used to by subunit.""" +import csv import datetime import testtools +from testtools.compat import all +from testtools.content import ( + text_content, + TracebackContent, + ) from subunit import iso8601 @@ -34,6 +40,9 @@ class TestResultDecorator(object): or features by degrading them. """ + # XXX: Since lp:testtools r250, this is in testtools. Once it's released, + # we should gut this and just use that. + def __init__(self, decorated): """Create a TestResultDecorator forwarding to decorated.""" # Make every decorator degrade gracefully. @@ -200,34 +209,44 @@ class AutoTimingTestResultDecorator(HookedTestResultDecorator): return self.decorated.time(a_datetime) -class TagCollapsingDecorator(TestResultDecorator): - """Collapses many 'tags' calls into one where possible.""" +class TagsMixin(object): - def __init__(self, result): - super(TagCollapsingDecorator, self).__init__(result) - # The (new, gone) tags for the current test. - self._current_test_tags = None + def __init__(self): + self._clear_tags() - def startTest(self, test): - """Start a test. + def _clear_tags(self): + self._global_tags = set(), set() + self._test_tags = None - Not directly passed to the client, but used for handling of tags - correctly. - """ - self.decorated.startTest(test) - self._current_test_tags = set(), set() + def _get_active_tags(self): + global_new, global_gone = self._global_tags + if self._test_tags is None: + return set(global_new) + test_new, test_gone = self._test_tags + return global_new.difference(test_gone).union(test_new) - def stopTest(self, test): - """Stop a test. + def _get_current_scope(self): + if self._test_tags: + return self._test_tags + return self._global_tags - Not directly passed to the client, but used for handling of tags - correctly. - """ - # Tags to output for this test. - if self._current_test_tags[0] or self._current_test_tags[1]: - self.decorated.tags(*self._current_test_tags) - self.decorated.stopTest(test) - self._current_test_tags = None + def _flush_current_scope(self, tag_receiver): + new_tags, gone_tags = self._get_current_scope() + if new_tags or gone_tags: + tag_receiver.tags(new_tags, gone_tags) + if self._test_tags: + self._test_tags = set(), set() + else: + self._global_tags = set(), set() + + def startTestRun(self): + self._clear_tags() + + def startTest(self, test): + self._test_tags = set(), set() + + def stopTest(self, test): + self._test_tags = None def tags(self, new_tags, gone_tags): """Handle tag instructions. @@ -238,14 +257,25 @@ class TagCollapsingDecorator(TestResultDecorator): :param new_tags: Tags to add, :param gone_tags: Tags to remove. """ - if self._current_test_tags is not None: - # gather the tags until the test stops. - self._current_test_tags[0].update(new_tags) - self._current_test_tags[0].difference_update(gone_tags) - self._current_test_tags[1].update(gone_tags) - self._current_test_tags[1].difference_update(new_tags) - else: - return self.decorated.tags(new_tags, gone_tags) + current_new_tags, current_gone_tags = self._get_current_scope() + current_new_tags.update(new_tags) + current_new_tags.difference_update(gone_tags) + current_gone_tags.update(gone_tags) + current_gone_tags.difference_update(new_tags) + + +class TagCollapsingDecorator(HookedTestResultDecorator, TagsMixin): + """Collapses many 'tags' calls into one where possible.""" + + def __init__(self, result): + super(TagCollapsingDecorator, self).__init__(result) + self._clear_tags() + + def _before_event(self): + self._flush_current_scope(self.decorated) + + def tags(self, new_tags, gone_tags): + TagsMixin.tags(self, new_tags, gone_tags) class TimeCollapsingDecorator(HookedTestResultDecorator): @@ -273,93 +303,58 @@ class TimeCollapsingDecorator(HookedTestResultDecorator): self._last_received_time = a_time -def all_true(bools): - """Return True if all of 'bools' are True. False otherwise.""" - for b in bools: - if not b: - return False - return True +def and_predicates(predicates): + """Return a predicate that is true iff all predicates are true.""" + # XXX: Should probably be in testtools to be better used by matchers. jml + return lambda *args, **kwargs: all(p(*args, **kwargs) for p in predicates) -class TestResultFilter(TestResultDecorator): - """A pyunit TestResult interface implementation which filters tests. +def make_tag_filter(with_tags, without_tags): + """Make a callback that checks tests against tags.""" - Tests that pass the filter are handed on to another TestResult instance - for further processing/reporting. To obtain the filtered results, - the other instance must be interrogated. + with_tags = with_tags and set(with_tags) or None + without_tags = without_tags and set(without_tags) or None - :ivar result: The result that tests are passed to after filtering. - :ivar filter_predicate: The callback run to decide whether to pass - a result. - """ + def check_tags(test, outcome, err, details, tags): + if with_tags and not with_tags <= tags: + return False + if without_tags and bool(without_tags & tags): + return False + return True - def __init__(self, result, filter_error=False, filter_failure=False, - filter_success=True, filter_skip=False, filter_xfail=False, - filter_predicate=None, fixup_expected_failures=None): - """Create a FilterResult object filtering to result. + return check_tags - :param filter_error: Filter out errors. - :param filter_failure: Filter out failures. - :param filter_success: Filter out successful tests. - :param filter_skip: Filter out skipped tests. - :param filter_xfail: Filter out expected failure tests. - :param filter_predicate: A callable taking (test, outcome, err, - details) and returning True if the result should be passed - through. err and details may be none if no error or extra - metadata is available. outcome is the name of the outcome such - as 'success' or 'failure'. - :param fixup_expected_failures: Set of test ids to consider known - failing. - """ - super(TestResultFilter, self).__init__(result) + +class _PredicateFilter(TestResultDecorator, TagsMixin): + + def __init__(self, result, predicate): + super(_PredicateFilter, self).__init__(result) + self._clear_tags() self.decorated = TimeCollapsingDecorator( TagCollapsingDecorator(self.decorated)) - predicates = [] - if filter_error: - predicates.append(lambda t, outcome, e, d: outcome != 'error') - if filter_failure: - predicates.append(lambda t, outcome, e, d: outcome != 'failure') - if filter_success: - predicates.append(lambda t, outcome, e, d: outcome != 'success') - if filter_skip: - predicates.append(lambda t, outcome, e, d: outcome != 'skip') - if filter_xfail: - predicates.append(lambda t, outcome, e, d: outcome != 'expectedfailure') - if filter_predicate is not None: - predicates.append(filter_predicate) - self.filter_predicate = ( - lambda test, outcome, err, details: - all_true(p(test, outcome, err, details) for p in predicates)) + self._predicate = predicate # The current test (for filtering tags) self._current_test = None # Has the current test been filtered (for outputting test tags) self._current_test_filtered = None # Calls to this result that we don't know whether to forward on yet. self._buffered_calls = [] - if fixup_expected_failures is None: - self._fixup_expected_failures = frozenset() - else: - self._fixup_expected_failures = fixup_expected_failures + + def filter_predicate(self, test, outcome, error, details): + return self._predicate( + test, outcome, error, details, self._get_active_tags()) def addError(self, test, err=None, details=None): if (self.filter_predicate(test, 'error', err, details)): - if self._failure_expected(test): - self._buffered_calls.append( - ('addExpectedFailure', [test, err], {'details': details})) - else: - self._buffered_calls.append( - ('addError', [test, err], {'details': details})) + self._buffered_calls.append( + ('addError', [test, err], {'details': details})) else: self._filtered() def addFailure(self, test, err=None, details=None): if (self.filter_predicate(test, 'failure', err, details)): - if self._failure_expected(test): - self._buffered_calls.append( - ('addExpectedFailure', [test, err], {'details': details})) - else: - self._buffered_calls.append( - ('addFailure', [test, err], {'details': details})) + self._buffered_calls.append( + ('addFailure', [test, err], {'details': details})) else: self._filtered() @@ -370,17 +365,6 @@ class TestResultFilter(TestResultDecorator): else: self._filtered() - def addSuccess(self, test, details=None): - if (self.filter_predicate(test, 'success', None, details)): - if self._failure_expected(test): - self._buffered_calls.append( - ('addUnexpectedSuccess', [test], {'details': details})) - else: - self._buffered_calls.append( - ('addSuccess', [test], {'details': details})) - else: - self._filtered() - def addExpectedFailure(self, test, err=None, details=None): if self.filter_predicate(test, 'expectedfailure', err, details): self._buffered_calls.append( @@ -392,18 +376,23 @@ class TestResultFilter(TestResultDecorator): self._buffered_calls.append( ('addUnexpectedSuccess', [test], {'details': details})) + def addSuccess(self, test, details=None): + if (self.filter_predicate(test, 'success', None, details)): + self._buffered_calls.append( + ('addSuccess', [test], {'details': details})) + else: + self._filtered() + def _filtered(self): self._current_test_filtered = True - def _failure_expected(self, test): - return (test.id() in self._fixup_expected_failures) - def startTest(self, test): """Start a test. Not directly passed to the client, but used for handling of tags correctly. """ + TagsMixin.startTest(self, test) self._current_test = test self._current_test_filtered = False self._buffered_calls.append(('startTest', [test], {})) @@ -415,19 +404,23 @@ class TestResultFilter(TestResultDecorator): correctly. """ if not self._current_test_filtered: - # Tags to output for this test. for method, args, kwargs in self._buffered_calls: getattr(self.decorated, method)(*args, **kwargs) self.decorated.stopTest(test) self._current_test = None self._current_test_filtered = None self._buffered_calls = [] + TagsMixin.stopTest(self, test) - def time(self, a_time): + def tags(self, new_tags, gone_tags): + TagsMixin.tags(self, new_tags, gone_tags) if self._current_test is not None: - self._buffered_calls.append(('time', [a_time], {})) + self._buffered_calls.append(('tags', [new_tags, gone_tags], {})) else: - return self.decorated.time(a_time) + return super(_PredicateFilter, self).tags(new_tags, gone_tags) + + def time(self, a_time): + return self.decorated.time(a_time) def id_to_orig_id(self, id): if id.startswith("subunit.RemotedTestCase."): @@ -435,6 +428,95 @@ class TestResultFilter(TestResultDecorator): return id +class TestResultFilter(TestResultDecorator): + """A pyunit TestResult interface implementation which filters tests. + + Tests that pass the filter are handed on to another TestResult instance + for further processing/reporting. To obtain the filtered results, + the other instance must be interrogated. + + :ivar result: The result that tests are passed to after filtering. + :ivar filter_predicate: The callback run to decide whether to pass + a result. + """ + + def __init__(self, result, filter_error=False, filter_failure=False, + filter_success=True, filter_skip=False, filter_xfail=False, + filter_predicate=None, fixup_expected_failures=None): + """Create a FilterResult object filtering to result. + + :param filter_error: Filter out errors. + :param filter_failure: Filter out failures. + :param filter_success: Filter out successful tests. + :param filter_skip: Filter out skipped tests. + :param filter_xfail: Filter out expected failure tests. + :param filter_predicate: A callable taking (test, outcome, err, + details, tags) and returning True if the result should be passed + through. err and details may be none if no error or extra + metadata is available. outcome is the name of the outcome such + as 'success' or 'failure'. tags is new in 0.0.8; 0.0.7 filters + are still supported but should be updated to accept the tags + parameter for efficiency. + :param fixup_expected_failures: Set of test ids to consider known + failing. + """ + predicates = [] + if filter_error: + predicates.append( + lambda t, outcome, e, d, tags: outcome != 'error') + if filter_failure: + predicates.append( + lambda t, outcome, e, d, tags: outcome != 'failure') + if filter_success: + predicates.append( + lambda t, outcome, e, d, tags: outcome != 'success') + if filter_skip: + predicates.append( + lambda t, outcome, e, d, tags: outcome != 'skip') + if filter_xfail: + predicates.append( + lambda t, outcome, e, d, tags: outcome != 'expectedfailure') + if filter_predicate is not None: + def compat(test, outcome, error, details, tags): + # 0.0.7 and earlier did not support the 'tags' parameter. + try: + return filter_predicate( + test, outcome, error, details, tags) + except TypeError: + return filter_predicate(test, outcome, error, details) + predicates.append(compat) + predicate = and_predicates(predicates) + super(TestResultFilter, self).__init__( + _PredicateFilter(result, predicate)) + if fixup_expected_failures is None: + self._fixup_expected_failures = frozenset() + else: + self._fixup_expected_failures = fixup_expected_failures + + def addError(self, test, err=None, details=None): + if self._failure_expected(test): + self.addExpectedFailure(test, err=err, details=details) + else: + super(TestResultFilter, self).addError( + test, err=err, details=details) + + def addFailure(self, test, err=None, details=None): + if self._failure_expected(test): + self.addExpectedFailure(test, err=err, details=details) + else: + super(TestResultFilter, self).addFailure( + test, err=err, details=details) + + def addSuccess(self, test, details=None): + if self._failure_expected(test): + self.addUnexpectedSuccess(test, details=details) + else: + super(TestResultFilter, self).addSuccess(test, details=details) + + def _failure_expected(self, test): + return (test.id() in self._fixup_expected_failures) + + class TestIdPrintingResult(testtools.TestResult): def __init__(self, stream, show_times=False): @@ -493,3 +575,97 @@ class TestIdPrintingResult(testtools.TestResult): def wasSuccessful(self): "Tells whether or not this result was a success" return self.failed_tests == 0 + + +class TestByTestResult(testtools.TestResult): + """Call something every time a test completes.""" + +# XXX: In testtools since lp:testtools r249. Once that's released, just +# import that. + + def __init__(self, on_test): + """Construct a ``TestByTestResult``. + + :param on_test: A callable that take a test case, a status (one of + "success", "failure", "error", "skip", or "xfail"), a start time + (a ``datetime`` with timezone), a stop time, an iterable of tags, + and a details dict. Is called at the end of each test (i.e. on + ``stopTest``) with the accumulated values for that test. + """ + super(TestByTestResult, self).__init__() + self._on_test = on_test + + def startTest(self, test): + super(TestByTestResult, self).startTest(test) + self._start_time = self._now() + # There's no supported (i.e. tested) behaviour that relies on these + # being set, but it makes me more comfortable all the same. -- jml + self._status = None + self._details = None + self._stop_time = None + + def stopTest(self, test): + self._stop_time = self._now() + super(TestByTestResult, self).stopTest(test) + self._on_test( + test=test, + status=self._status, + start_time=self._start_time, + stop_time=self._stop_time, + # current_tags is new in testtools 0.9.13. + tags=getattr(self, 'current_tags', None), + details=self._details) + + def _err_to_details(self, test, err, details): + if details: + return details + return {'traceback': TracebackContent(err, test)} + + def addSuccess(self, test, details=None): + super(TestByTestResult, self).addSuccess(test) + self._status = 'success' + self._details = details + + def addFailure(self, test, err=None, details=None): + super(TestByTestResult, self).addFailure(test, err, details) + self._status = 'failure' + self._details = self._err_to_details(test, err, details) + + def addError(self, test, err=None, details=None): + super(TestByTestResult, self).addError(test, err, details) + self._status = 'error' + self._details = self._err_to_details(test, err, details) + + def addSkip(self, test, reason=None, details=None): + super(TestByTestResult, self).addSkip(test, reason, details) + self._status = 'skip' + if details is None: + details = {'reason': text_content(reason)} + elif reason: + # XXX: What if details already has 'reason' key? + details['reason'] = text_content(reason) + self._details = details + + def addExpectedFailure(self, test, err=None, details=None): + super(TestByTestResult, self).addExpectedFailure(test, err, details) + self._status = 'xfail' + self._details = self._err_to_details(test, err, details) + + def addUnexpectedSuccess(self, test, details=None): + super(TestByTestResult, self).addUnexpectedSuccess(test, details) + self._status = 'success' + self._details = details + + +class CsvResult(TestByTestResult): + + def __init__(self, stream): + super(CsvResult, self).__init__(self._on_test) + self._write_row = csv.writer(stream).writerow + + def _on_test(self, test, status, start_time, stop_time, tags, details): + self._write_row([test.id(), status, start_time, stop_time]) + + def startTestRun(self): + super(CsvResult, self).startTestRun() + self._write_row(['test', 'status', 'start_time', 'stop_time']) |