From bd01a8e79faa3d657f01529c063cd0e09d711880 Mon Sep 17 00:00:00 2001 From: Jelmer Vernooij Date: Sat, 27 Aug 2011 16:07:38 +0200 Subject: subunit: Update to latest upstream snapshot. --- lib/subunit/python/subunit/test_results.py | 225 ++++++++++++++++++++++------- 1 file changed, 169 insertions(+), 56 deletions(-) (limited to 'lib/subunit/python/subunit/test_results.py') diff --git a/lib/subunit/python/subunit/test_results.py b/lib/subunit/python/subunit/test_results.py index 1c91daadc6..33fb50e073 100644 --- a/lib/subunit/python/subunit/test_results.py +++ b/lib/subunit/python/subunit/test_results.py @@ -18,9 +18,10 @@ import datetime -import iso8601 import testtools +from subunit import iso8601 + # NOT a TestResult, because we are implementing the interface, not inheriting # it. @@ -81,8 +82,12 @@ class TestResultDecorator(object): def stop(self): return self.decorated.stop() + @property + def testsRun(self): + return self.decorated.testsRun + def tags(self, new_tags, gone_tags): - return self.decorated.time(new_tags, gone_tags) + return self.decorated.tags(new_tags, gone_tags) def time(self, a_datetime): return self.decorated.time(a_datetime) @@ -195,6 +200,87 @@ class AutoTimingTestResultDecorator(HookedTestResultDecorator): return self.decorated.time(a_datetime) +class TagCollapsingDecorator(TestResultDecorator): + """Collapses many 'tags' calls into one where possible.""" + + def __init__(self, result): + super(TagCollapsingDecorator, self).__init__(result) + # The (new, gone) tags for the current test. + self._current_test_tags = None + + def startTest(self, test): + """Start a test. + + Not directly passed to the client, but used for handling of tags + correctly. + """ + self.decorated.startTest(test) + self._current_test_tags = set(), set() + + def stopTest(self, test): + """Stop a test. + + Not directly passed to the client, but used for handling of tags + correctly. + """ + # Tags to output for this test. + if self._current_test_tags[0] or self._current_test_tags[1]: + self.decorated.tags(*self._current_test_tags) + self.decorated.stopTest(test) + self._current_test_tags = None + + def tags(self, new_tags, gone_tags): + """Handle tag instructions. + + Adds and removes tags as appropriate. If a test is currently running, + tags are not affected for subsequent tests. + + :param new_tags: Tags to add, + :param gone_tags: Tags to remove. + """ + if self._current_test_tags is not None: + # gather the tags until the test stops. + self._current_test_tags[0].update(new_tags) + self._current_test_tags[0].difference_update(gone_tags) + self._current_test_tags[1].update(gone_tags) + self._current_test_tags[1].difference_update(new_tags) + else: + return self.decorated.tags(new_tags, gone_tags) + + +class TimeCollapsingDecorator(HookedTestResultDecorator): + """Only pass on the first and last of a consecutive sequence of times.""" + + def __init__(self, decorated): + super(TimeCollapsingDecorator, self).__init__(decorated) + self._last_received_time = None + self._last_sent_time = None + + def _before_event(self): + if self._last_received_time is None: + return + if self._last_received_time != self._last_sent_time: + self.decorated.time(self._last_received_time) + self._last_sent_time = self._last_received_time + self._last_received_time = None + + def time(self, a_time): + # Don't upcall, because we don't want to call _before_event, it's only + # for non-time events. + if self._last_received_time is None: + self.decorated.time(a_time) + self._last_sent_time = a_time + self._last_received_time = a_time + + +def all_true(bools): + """Return True if all of 'bools' are True. False otherwise.""" + for b in bools: + if not b: + return False + return True + + class TestResultFilter(TestResultDecorator): """A pyunit TestResult interface implementation which filters tests. @@ -208,82 +294,110 @@ class TestResultFilter(TestResultDecorator): """ def __init__(self, result, filter_error=False, filter_failure=False, - filter_success=True, filter_skip=False, - filter_predicate=None): + filter_success=True, filter_skip=False, filter_xfail=False, + filter_predicate=None, fixup_expected_failures=None): """Create a FilterResult object filtering to result. :param filter_error: Filter out errors. :param filter_failure: Filter out failures. :param filter_success: Filter out successful tests. :param filter_skip: Filter out skipped tests. + :param filter_xfail: Filter out expected failure tests. :param filter_predicate: A callable taking (test, outcome, err, details) and returning True if the result should be passed through. err and details may be none if no error or extra metadata is available. outcome is the name of the outcome such as 'success' or 'failure'. + :param fixup_expected_failures: Set of test ids to consider known + failing. """ - TestResultDecorator.__init__(self, result) - self._filter_error = filter_error - self._filter_failure = filter_failure - self._filter_success = filter_success - self._filter_skip = filter_skip - if filter_predicate is None: - filter_predicate = lambda test, outcome, err, details: True - self.filter_predicate = filter_predicate + super(TestResultFilter, self).__init__(result) + self.decorated = TimeCollapsingDecorator( + TagCollapsingDecorator(self.decorated)) + predicates = [] + if filter_error: + predicates.append(lambda t, outcome, e, d: outcome != 'error') + if filter_failure: + predicates.append(lambda t, outcome, e, d: outcome != 'failure') + if filter_success: + predicates.append(lambda t, outcome, e, d: outcome != 'success') + if filter_skip: + predicates.append(lambda t, outcome, e, d: outcome != 'skip') + if filter_xfail: + predicates.append(lambda t, outcome, e, d: outcome != 'expectedfailure') + if filter_predicate is not None: + predicates.append(filter_predicate) + self.filter_predicate = ( + lambda test, outcome, err, details: + all_true(p(test, outcome, err, details) for p in predicates)) # The current test (for filtering tags) self._current_test = None # Has the current test been filtered (for outputting test tags) self._current_test_filtered = None - # The (new, gone) tags for the current test. - self._current_test_tags = None + # Calls to this result that we don't know whether to forward on yet. + self._buffered_calls = [] + if fixup_expected_failures is None: + self._fixup_expected_failures = frozenset() + else: + self._fixup_expected_failures = fixup_expected_failures def addError(self, test, err=None, details=None): - if (not self._filter_error and - self.filter_predicate(test, 'error', err, details)): - self.decorated.startTest(test) - self.decorated.addError(test, err, details=details) + if (self.filter_predicate(test, 'error', err, details)): + if self._failure_expected(test): + self._buffered_calls.append( + ('addExpectedFailure', [test, err], {'details': details})) + else: + self._buffered_calls.append( + ('addError', [test, err], {'details': details})) else: self._filtered() def addFailure(self, test, err=None, details=None): - if (not self._filter_failure and - self.filter_predicate(test, 'failure', err, details)): - self.decorated.startTest(test) - self.decorated.addFailure(test, err, details=details) + if (self.filter_predicate(test, 'failure', err, details)): + if self._failure_expected(test): + self._buffered_calls.append( + ('addExpectedFailure', [test, err], {'details': details})) + else: + self._buffered_calls.append( + ('addFailure', [test, err], {'details': details})) else: self._filtered() def addSkip(self, test, reason=None, details=None): - if (not self._filter_skip and - self.filter_predicate(test, 'skip', reason, details)): - self.decorated.startTest(test) - self.decorated.addSkip(test, reason, details=details) + if (self.filter_predicate(test, 'skip', reason, details)): + self._buffered_calls.append( + ('addSkip', [test, reason], {'details': details})) else: self._filtered() def addSuccess(self, test, details=None): - if (not self._filter_success and - self.filter_predicate(test, 'success', None, details)): - self.decorated.startTest(test) - self.decorated.addSuccess(test, details=details) + if (self.filter_predicate(test, 'success', None, details)): + if self._failure_expected(test): + self._buffered_calls.append( + ('addUnexpectedSuccess', [test], {'details': details})) + else: + self._buffered_calls.append( + ('addSuccess', [test], {'details': details})) else: self._filtered() def addExpectedFailure(self, test, err=None, details=None): if self.filter_predicate(test, 'expectedfailure', err, details): - self.decorated.startTest(test) - return self.decorated.addExpectedFailure(test, err, - details=details) + self._buffered_calls.append( + ('addExpectedFailure', [test, err], {'details': details})) else: self._filtered() def addUnexpectedSuccess(self, test, details=None): - self.decorated.startTest(test) - return self.decorated.addUnexpectedSuccess(test, details=details) + self._buffered_calls.append( + ('addUnexpectedSuccess', [test], {'details': details})) def _filtered(self): self._current_test_filtered = True + def _failure_expected(self, test): + return (test.id() in self._fixup_expected_failures) + def startTest(self, test): """Start a test. @@ -292,7 +406,7 @@ class TestResultFilter(TestResultDecorator): """ self._current_test = test self._current_test_filtered = False - self._current_test_tags = set(), set() + self._buffered_calls.append(('startTest', [test], {})) def stopTest(self, test): """Stop a test. @@ -302,29 +416,18 @@ class TestResultFilter(TestResultDecorator): """ if not self._current_test_filtered: # Tags to output for this test. - if self._current_test_tags[0] or self._current_test_tags[1]: - self.decorated.tags(*self._current_test_tags) + for method, args, kwargs in self._buffered_calls: + getattr(self.decorated, method)(*args, **kwargs) self.decorated.stopTest(test) self._current_test = None self._current_test_filtered = None - self._current_test_tags = None + self._buffered_calls = [] - def tags(self, new_tags, gone_tags): - """Handle tag instructions. - - Adds and removes tags as appropriate. If a test is currently running, - tags are not affected for subsequent tests. - - :param new_tags: Tags to add, - :param gone_tags: Tags to remove. - """ + def time(self, a_time): if self._current_test is not None: - # gather the tags until the test stops. - self._current_test_tags[0].update(new_tags) - self._current_test_tags[0].difference_update(gone_tags) - self._current_test_tags[1].update(gone_tags) - self._current_test_tags[1].difference_update(new_tags) - return self.decorated.tags(new_tags, gone_tags) + self._buffered_calls.append(('time', [a_time], {})) + else: + return self.decorated.time(a_time) def id_to_orig_id(self, id): if id.startswith("subunit.RemotedTestCase."): @@ -336,10 +439,10 @@ class TestIdPrintingResult(testtools.TestResult): def __init__(self, stream, show_times=False): """Create a FilterResult object outputting to stream.""" - testtools.TestResult.__init__(self) + super(TestIdPrintingResult, self).__init__() self._stream = stream self.failed_tests = 0 - self.__time = 0 + self.__time = None self.show_times = show_times self._test = None self._test_duration = 0 @@ -355,6 +458,16 @@ class TestIdPrintingResult(testtools.TestResult): def addSuccess(self, test): self._test = test + def addSkip(self, test, reason=None, details=None): + self._test = test + + def addUnexpectedSuccess(self, test, details=None): + self.failed_tests += 1 + self._test = test + + def addExpectedFailure(self, test, err=None, details=None): + self._test = test + def reportTest(self, test, duration): if self.show_times: seconds = duration.seconds -- cgit