diff options
Diffstat (limited to 'lib/subunit/python/subunit/tests')
-rwxr-xr-x | lib/subunit/python/subunit/tests/sample-script.py | 20 | ||||
-rwxr-xr-x | lib/subunit/python/subunit/tests/sample-two-script.py | 8 | ||||
-rw-r--r-- | lib/subunit/python/subunit/tests/test_run.py | 4 | ||||
-rw-r--r-- | lib/subunit/python/subunit/tests/test_subunit_filter.py | 170 | ||||
-rw-r--r-- | lib/subunit/python/subunit/tests/test_test_protocol.py | 38 | ||||
-rw-r--r-- | lib/subunit/python/subunit/tests/test_test_results.py | 272 |
6 files changed, 479 insertions, 33 deletions
diff --git a/lib/subunit/python/subunit/tests/sample-script.py b/lib/subunit/python/subunit/tests/sample-script.py index 618e4952d7..91838f6d6f 100755 --- a/lib/subunit/python/subunit/tests/sample-script.py +++ b/lib/subunit/python/subunit/tests/sample-script.py @@ -7,15 +7,15 @@ if len(sys.argv) == 2: # subunit.tests.test_test_protocol.TestExecTestCase.test_sample_method_args # uses this code path to be sure that the arguments were passed to # sample-script.py - print "test fail" - print "error fail" + print("test fail") + print("error fail") sys.exit(0) -print "test old mcdonald" -print "success old mcdonald" -print "test bing crosby" -print "failure bing crosby [" -print "foo.c:53:ERROR invalid state" -print "]" -print "test an error" -print "error an error" +print("test old mcdonald") +print("success old mcdonald") +print("test bing crosby") +print("failure bing crosby [") +print("foo.c:53:ERROR invalid state") +print("]") +print("test an error") +print("error an error") sys.exit(0) diff --git a/lib/subunit/python/subunit/tests/sample-two-script.py b/lib/subunit/python/subunit/tests/sample-two-script.py index d5550842bf..fc73dfc409 100755 --- a/lib/subunit/python/subunit/tests/sample-two-script.py +++ b/lib/subunit/python/subunit/tests/sample-two-script.py @@ -1,7 +1,7 @@ #!/usr/bin/env python import sys -print "test old mcdonald" -print "success old mcdonald" -print "test bing crosby" -print "success bing crosby" +print("test old mcdonald") +print("success old mcdonald") +print("test bing crosby") +print("success bing crosby") sys.exit(0) diff --git a/lib/subunit/python/subunit/tests/test_run.py b/lib/subunit/python/subunit/tests/test_run.py index 5a96bcf30e..10519ed086 100644 --- a/lib/subunit/python/subunit/tests/test_run.py +++ b/lib/subunit/python/subunit/tests/test_run.py @@ -14,7 +14,7 @@ # limitations under that license. # -from cStringIO import StringIO +from testtools.compat import BytesIO import unittest from testtools import PlaceHolder @@ -42,7 +42,7 @@ class TimeCollectingTestResult(unittest.TestResult): class TestSubunitTestRunner(unittest.TestCase): def test_includes_timing_output(self): - io = StringIO() + io = BytesIO() runner = SubunitTestRunner(stream=io) test = PlaceHolder('name') runner.run(test) diff --git a/lib/subunit/python/subunit/tests/test_subunit_filter.py b/lib/subunit/python/subunit/tests/test_subunit_filter.py index 06754840eb..33b924824d 100644 --- a/lib/subunit/python/subunit/tests/test_subunit_filter.py +++ b/lib/subunit/python/subunit/tests/test_subunit_filter.py @@ -17,15 +17,18 @@ """Tests for subunit.TestResultFilter.""" from datetime import datetime +import os +import subprocess +import sys from subunit import iso8601 import unittest from testtools import TestCase -from testtools.compat import _b, BytesIO, StringIO +from testtools.compat import _b, BytesIO from testtools.testresult.doubles import ExtendedTestResult import subunit -from subunit.test_results import TestResultFilter +from subunit.test_results import make_tag_filter, TestResultFilter class TestTestResultFilter(TestCase): @@ -77,6 +80,40 @@ xfail todo filtered_result.failures]) self.assertEqual(4, filtered_result.testsRun) + def test_tag_filter(self): + tag_filter = make_tag_filter(['global'], ['local']) + result = ExtendedTestResult() + result_filter = TestResultFilter( + result, filter_success=False, filter_predicate=tag_filter) + self.run_tests(result_filter) + tests_included = [ + event[1] for event in result._events if event[0] == 'startTest'] + tests_expected = list(map( + subunit.RemotedTestCase, + ['passed', 'error', 'skipped', 'todo'])) + self.assertEquals(tests_expected, tests_included) + + def test_tags_tracked_correctly(self): + tag_filter = make_tag_filter(['a'], []) + result = ExtendedTestResult() + result_filter = TestResultFilter( + result, filter_success=False, filter_predicate=tag_filter) + input_stream = _b( + "test: foo\n" + "tags: a\n" + "successful: foo\n" + "test: bar\n" + "successful: bar\n") + self.run_tests(result_filter, input_stream) + foo = subunit.RemotedTestCase('foo') + self.assertEquals( + [('startTest', foo), + ('tags', set(['a']), set()), + ('addSuccess', foo), + ('stopTest', foo), + ], + result._events) + def test_exclude_errors(self): filtered_result = unittest.TestResult() result_filter = TestResultFilter(filtered_result, filter_error=True) @@ -151,6 +188,8 @@ xfail todo def test_filter_predicate(self): """You can filter by predicate callbacks""" + # 0.0.7 and earlier did not support the 'tags' parameter, so we need + # to test that we still support behaviour without it. filtered_result = unittest.TestResult() def filter_cb(test, outcome, err, details): return outcome == 'success' @@ -161,6 +200,18 @@ xfail todo # Only success should pass self.assertEqual(1, filtered_result.testsRun) + def test_filter_predicate_with_tags(self): + """You can filter by predicate callbacks that accept tags""" + filtered_result = unittest.TestResult() + def filter_cb(test, outcome, err, details, tags): + return outcome == 'success' + result_filter = TestResultFilter(filtered_result, + filter_predicate=filter_cb, + filter_success=False) + self.run_tests(result_filter) + # Only success should pass + self.assertEqual(1, filtered_result.testsRun) + def test_time_ordering_preserved(self): # Passing a subunit stream through TestResultFilter preserves the # relative ordering of 'time' directives and any other subunit @@ -179,14 +230,41 @@ xfail todo result_filter = TestResultFilter(result) self.run_tests(result_filter, subunit_stream) foo = subunit.RemotedTestCase('foo') - self.assertEquals( + self.maxDiff = None + self.assertEqual( [('time', date_a), - ('startTest', foo), ('time', date_b), + ('startTest', foo), ('addError', foo, {}), ('stopTest', foo), ('time', date_c)], result._events) + def test_time_passes_through_filtered_tests(self): + # Passing a subunit stream through TestResultFilter preserves 'time' + # directives even if a specific test is filtered out. + date_a = datetime(year=2000, month=1, day=1, tzinfo=iso8601.UTC) + date_b = datetime(year=2000, month=1, day=2, tzinfo=iso8601.UTC) + date_c = datetime(year=2000, month=1, day=3, tzinfo=iso8601.UTC) + subunit_stream = _b('\n'.join([ + "time: %s", + "test: foo", + "time: %s", + "success: foo", + "time: %s", + ""]) % (date_a, date_b, date_c)) + result = ExtendedTestResult() + result_filter = TestResultFilter(result) + result_filter.startTestRun() + self.run_tests(result_filter, subunit_stream) + result_filter.stopTestRun() + foo = subunit.RemotedTestCase('foo') + self.maxDiff = None + self.assertEqual( + [('startTestRun',), + ('time', date_a), + ('time', date_c), + ('stopTestRun',),], result._events) + def test_skip_preserved(self): subunit_stream = _b('\n'.join([ "test: foo", @@ -201,6 +279,90 @@ xfail todo ('addSkip', foo, {}), ('stopTest', foo), ], result._events) + if sys.version_info < (2, 7): + # These tests require Python >=2.7. + del test_fixup_expected_failures, test_fixup_expected_errors, test_fixup_unexpected_success + + +class TestFilterCommand(TestCase): + + example_subunit_stream = _b("""\ +tags: global +test passed +success passed +test failed +tags: local +failure failed +test error +error error [ +error details +] +test skipped +skip skipped +test todo +xfail todo +""") + + def run_command(self, args, stream): + root = os.path.dirname( + os.path.dirname(os.path.dirname(os.path.dirname(__file__)))) + script_path = os.path.join(root, 'filters', 'subunit-filter') + command = [sys.executable, script_path] + list(args) + ps = subprocess.Popen( + command, stdin=subprocess.PIPE, stdout=subprocess.PIPE, + stderr=subprocess.PIPE) + out, err = ps.communicate(stream) + if ps.returncode != 0: + raise RuntimeError("%s failed: %s" % (command, err)) + return out + + def to_events(self, stream): + test = subunit.ProtocolTestCase(BytesIO(stream)) + result = ExtendedTestResult() + test.run(result) + return result._events + + def test_default(self): + output = self.run_command([], _b( + "test: foo\n" + "skip: foo\n" + )) + events = self.to_events(output) + foo = subunit.RemotedTestCase('foo') + self.assertEqual( + [('startTest', foo), + ('addSkip', foo, {}), + ('stopTest', foo)], + events) + + def test_tags(self): + output = self.run_command(['-s', '--with-tag', 'a'], _b( + "tags: a\n" + "test: foo\n" + "success: foo\n" + "tags: -a\n" + "test: bar\n" + "success: bar\n" + "test: baz\n" + "tags: a\n" + "success: baz\n" + )) + events = self.to_events(output) + foo = subunit.RemotedTestCase('foo') + baz = subunit.RemotedTestCase('baz') + self.assertEqual( + [('tags', set(['a']), set()), + ('startTest', foo), + ('addSuccess', foo), + ('stopTest', foo), + ('tags', set(), set(['a'])), + ('startTest', baz), + ('tags', set(['a']), set()), + ('addSuccess', baz), + ('stopTest', baz), + ], + events) + def test_suite(): loader = subunit.tests.TestUtil.TestLoader() diff --git a/lib/subunit/python/subunit/tests/test_test_protocol.py b/lib/subunit/python/subunit/tests/test_test_protocol.py index c93aabd80c..ec6830d03b 100644 --- a/lib/subunit/python/subunit/tests/test_test_protocol.py +++ b/lib/subunit/python/subunit/tests/test_test_protocol.py @@ -18,9 +18,9 @@ import datetime import unittest import os -from testtools import skipIf, TestCase -from testtools.compat import _b, _u, BytesIO, StringIO -from testtools.content import Content, TracebackContent +from testtools import skipIf, TestCase, TestResult +from testtools.compat import _b, _u, BytesIO +from testtools.content import Content, TracebackContent, text_content from testtools.content_type import ContentType try: from testtools.testresult.doubles import ( @@ -40,6 +40,10 @@ from subunit import _remote_exception_str, _remote_exception_str_chunked import subunit.iso8601 as iso8601 +def details_to_str(details): + return TestResult()._err_details_to_string(None, details=details) + + class TestTestImports(unittest.TestCase): def test_imports(self): @@ -87,11 +91,12 @@ class TestTestProtocolServerPipe(unittest.TestCase): def test_story(self): client = unittest.TestResult() protocol = subunit.TestProtocolServer(client) + traceback = "foo.c:53:ERROR invalid state\n" pipe = BytesIO(_b("test old mcdonald\n" "success old mcdonald\n" "test bing crosby\n" "failure bing crosby [\n" - "foo.c:53:ERROR invalid state\n" + + traceback + "]\n" "test an error\n" "error an error\n")) @@ -102,9 +107,8 @@ class TestTestProtocolServerPipe(unittest.TestCase): [(an_error, _remote_exception_str + '\n')]) self.assertEqual( client.failures, - [(bing, _remote_exception_str + ": Text attachment: traceback\n" - "------------\nfoo.c:53:ERROR invalid state\n" - "------------\n\n")]) + [(bing, _remote_exception_str + ": " + + details_to_str({'traceback': text_content(traceback)}) + "\n")]) self.assertEqual(client.testsRun, 3) def test_non_test_characters_forwarded_immediately(self): @@ -559,9 +563,7 @@ class TestTestProtocolServerAddxFail(unittest.TestCase): value = details else: if error_message is not None: - value = subunit.RemoteError(_u("Text attachment: traceback\n" - "------------\n") + _u(error_message) + - _u("------------\n")) + value = subunit.RemoteError(details_to_str(details)) else: value = subunit.RemoteError() self.assertEqual([ @@ -1299,6 +1301,22 @@ class TestTestProtocolClient(unittest.TestCase): "something\n" "F\r\nserialised\nform0\r\n]\n" % self.test.id())) + def test_tags_empty(self): + self.protocol.tags(set(), set()) + self.assertEqual(_b(""), self.io.getvalue()) + + def test_tags_add(self): + self.protocol.tags(set(['foo']), set()) + self.assertEqual(_b("tags: foo\n"), self.io.getvalue()) + + def test_tags_both(self): + self.protocol.tags(set(['quux']), set(['bar'])) + self.assertEqual(_b("tags: quux -bar\n"), self.io.getvalue()) + + def test_tags_gone(self): + self.protocol.tags(set(), set(['bar'])) + self.assertEqual(_b("tags: -bar\n"), self.io.getvalue()) + def test_suite(): loader = subunit.tests.TestUtil.TestLoader() diff --git a/lib/subunit/python/subunit/tests/test_test_results.py b/lib/subunit/python/subunit/tests/test_test_results.py index 94d22748e8..236dfa22e5 100644 --- a/lib/subunit/python/subunit/tests/test_test_results.py +++ b/lib/subunit/python/subunit/tests/test_test_results.py @@ -14,16 +14,25 @@ # limitations under that license. # +import csv import datetime +import sys import unittest from testtools import TestCase +from testtools.compat import StringIO +from testtools.content import ( + text_content, + TracebackContent, + ) from testtools.testresult.doubles import ExtendedTestResult import subunit import subunit.iso8601 as iso8601 import subunit.test_results +import testtools + class LoggingDecorator(subunit.test_results.HookedTestResultDecorator): @@ -192,12 +201,55 @@ class TestAutoTimingTestResultDecorator(unittest.TestCase): class TestTagCollapsingDecorator(TestCase): - def test_tags_forwarded_outside_of_tests(self): + def test_tags_collapsed_outside_of_tests(self): result = ExtendedTestResult() tag_collapser = subunit.test_results.TagCollapsingDecorator(result) - tag_collapser.tags(set(['a', 'b']), set()) + tag_collapser.tags(set(['a']), set()) + tag_collapser.tags(set(['b']), set()) + tag_collapser.startTest(self) self.assertEquals( - [('tags', set(['a', 'b']), set([]))], result._events) + [('tags', set(['a', 'b']), set([])), + ('startTest', self), + ], result._events) + + def test_tags_collapsed_outside_of_tests_are_flushed(self): + result = ExtendedTestResult() + tag_collapser = subunit.test_results.TagCollapsingDecorator(result) + tag_collapser.startTestRun() + tag_collapser.tags(set(['a']), set()) + tag_collapser.tags(set(['b']), set()) + tag_collapser.startTest(self) + tag_collapser.addSuccess(self) + tag_collapser.stopTest(self) + tag_collapser.stopTestRun() + self.assertEquals( + [('startTestRun',), + ('tags', set(['a', 'b']), set([])), + ('startTest', self), + ('addSuccess', self), + ('stopTest', self), + ('stopTestRun',), + ], result._events) + + def test_tags_forwarded_after_tests(self): + test = subunit.RemotedTestCase('foo') + result = ExtendedTestResult() + tag_collapser = subunit.test_results.TagCollapsingDecorator(result) + tag_collapser.startTestRun() + tag_collapser.startTest(test) + tag_collapser.addSuccess(test) + tag_collapser.stopTest(test) + tag_collapser.tags(set(['a']), set(['b'])) + tag_collapser.stopTestRun() + self.assertEqual( + [('startTestRun',), + ('startTest', test), + ('addSuccess', test), + ('stopTest', test), + ('tags', set(['a']), set(['b'])), + ('stopTestRun',), + ], + result._events) def test_tags_collapsed_inside_of_tests(self): result = ExtendedTestResult() @@ -229,6 +281,25 @@ class TestTagCollapsingDecorator(TestCase): ('stopTest', test)], result._events) + def test_tags_sent_before_result(self): + # Because addSuccess and friends tend to send subunit output + # immediately, and because 'tags:' before a result line means + # something different to 'tags:' after a result line, we need to be + # sure that tags are emitted before 'addSuccess' (or whatever). + result = ExtendedTestResult() + tag_collapser = subunit.test_results.TagCollapsingDecorator(result) + test = subunit.RemotedTestCase('foo') + tag_collapser.startTest(test) + tag_collapser.tags(set(['a']), set()) + tag_collapser.addSuccess(test) + tag_collapser.stopTest(test) + self.assertEquals( + [('startTest', test), + ('tags', set(['a']), set()), + ('addSuccess', test), + ('stopTest', test)], + result._events) + class TestTimeCollapsingDecorator(TestCase): @@ -294,6 +365,201 @@ class TestTimeCollapsingDecorator(TestCase): ('stopTest', foo)], result._events) +class TestByTestResultTests(testtools.TestCase): + + def setUp(self): + super(TestByTestResultTests, self).setUp() + self.log = [] + self.result = subunit.test_results.TestByTestResult(self.on_test) + if sys.version_info >= (3, 0): + self.result._now = iter(range(5)).__next__ + else: + self.result._now = iter(range(5)).next + + def assertCalled(self, **kwargs): + defaults = { + 'test': self, + 'tags': set(), + 'details': None, + 'start_time': 0, + 'stop_time': 1, + } + defaults.update(kwargs) + self.assertEqual([defaults], self.log) + + def on_test(self, **kwargs): + self.log.append(kwargs) + + def test_no_tests_nothing_reported(self): + self.result.startTestRun() + self.result.stopTestRun() + self.assertEqual([], self.log) + + def test_add_success(self): + self.result.startTest(self) + self.result.addSuccess(self) + self.result.stopTest(self) + self.assertCalled(status='success') + + def test_add_success_details(self): + self.result.startTest(self) + details = {'foo': 'bar'} + self.result.addSuccess(self, details=details) + self.result.stopTest(self) + self.assertCalled(status='success', details=details) + + def test_tags(self): + if not getattr(self.result, 'tags', None): + self.skipTest("No tags in testtools") + self.result.tags(['foo'], []) + self.result.startTest(self) + self.result.addSuccess(self) + self.result.stopTest(self) + self.assertCalled(status='success', tags=set(['foo'])) + + def test_add_error(self): + self.result.startTest(self) + try: + 1/0 + except ZeroDivisionError: + error = sys.exc_info() + self.result.addError(self, error) + self.result.stopTest(self) + self.assertCalled( + status='error', + details={'traceback': TracebackContent(error, self)}) + + def test_add_error_details(self): + self.result.startTest(self) + details = {"foo": text_content("bar")} + self.result.addError(self, details=details) + self.result.stopTest(self) + self.assertCalled(status='error', details=details) + + def test_add_failure(self): + self.result.startTest(self) + try: + self.fail("intentional failure") + except self.failureException: + failure = sys.exc_info() + self.result.addFailure(self, failure) + self.result.stopTest(self) + self.assertCalled( + status='failure', + details={'traceback': TracebackContent(failure, self)}) + + def test_add_failure_details(self): + self.result.startTest(self) + details = {"foo": text_content("bar")} + self.result.addFailure(self, details=details) + self.result.stopTest(self) + self.assertCalled(status='failure', details=details) + + def test_add_xfail(self): + self.result.startTest(self) + try: + 1/0 + except ZeroDivisionError: + error = sys.exc_info() + self.result.addExpectedFailure(self, error) + self.result.stopTest(self) + self.assertCalled( + status='xfail', + details={'traceback': TracebackContent(error, self)}) + + def test_add_xfail_details(self): + self.result.startTest(self) + details = {"foo": text_content("bar")} + self.result.addExpectedFailure(self, details=details) + self.result.stopTest(self) + self.assertCalled(status='xfail', details=details) + + def test_add_unexpected_success(self): + self.result.startTest(self) + details = {'foo': 'bar'} + self.result.addUnexpectedSuccess(self, details=details) + self.result.stopTest(self) + self.assertCalled(status='success', details=details) + + def test_add_skip_reason(self): + self.result.startTest(self) + reason = self.getUniqueString() + self.result.addSkip(self, reason) + self.result.stopTest(self) + self.assertCalled( + status='skip', details={'reason': text_content(reason)}) + + def test_add_skip_details(self): + self.result.startTest(self) + details = {'foo': 'bar'} + self.result.addSkip(self, details=details) + self.result.stopTest(self) + self.assertCalled(status='skip', details=details) + + def test_twice(self): + self.result.startTest(self) + self.result.addSuccess(self, details={'foo': 'bar'}) + self.result.stopTest(self) + self.result.startTest(self) + self.result.addSuccess(self) + self.result.stopTest(self) + self.assertEqual( + [{'test': self, + 'status': 'success', + 'start_time': 0, + 'stop_time': 1, + 'tags': set(), + 'details': {'foo': 'bar'}}, + {'test': self, + 'status': 'success', + 'start_time': 2, + 'stop_time': 3, + 'tags': set(), + 'details': None}, + ], + self.log) + + +class TestCsvResult(testtools.TestCase): + + def parse_stream(self, stream): + stream.seek(0) + reader = csv.reader(stream) + return list(reader) + + def test_csv_output(self): + stream = StringIO() + result = subunit.test_results.CsvResult(stream) + if sys.version_info >= (3, 0): + result._now = iter(range(5)).__next__ + else: + result._now = iter(range(5)).next + result.startTestRun() + result.startTest(self) + result.addSuccess(self) + result.stopTest(self) + result.stopTestRun() + self.assertEqual( + [['test', 'status', 'start_time', 'stop_time'], + [self.id(), 'success', '0', '1'], + ], + self.parse_stream(stream)) + + def test_just_header_when_no_tests(self): + stream = StringIO() + result = subunit.test_results.CsvResult(stream) + result.startTestRun() + result.stopTestRun() + self.assertEqual( + [['test', 'status', 'start_time', 'stop_time']], + self.parse_stream(stream)) + + def test_no_output_before_events(self): + stream = StringIO() + subunit.test_results.CsvResult(stream) + self.assertEqual([], self.parse_stream(stream)) + + def test_suite(): loader = subunit.tests.TestUtil.TestLoader() result = loader.loadTestsFromName(__name__) |