From f9facb51207713d1f294038e68909ba7701c61e4 Mon Sep 17 00:00:00 2001 From: Jelmer Vernooij Date: Wed, 8 Oct 2008 02:17:47 +0200 Subject: Move all subunit files to lib directory. --- lib/subunit/python/__init__.py | 388 +++++++++++++ lib/subunit/python/tests/TestUtil.py | 80 +++ lib/subunit/python/tests/__init__.py | 25 + lib/subunit/python/tests/sample-script.py | 11 + lib/subunit/python/tests/sample-two-script.py | 7 + lib/subunit/python/tests/test_test_protocol.py | 730 +++++++++++++++++++++++++ 6 files changed, 1241 insertions(+) create mode 100644 lib/subunit/python/__init__.py create mode 100644 lib/subunit/python/tests/TestUtil.py create mode 100644 lib/subunit/python/tests/__init__.py create mode 100755 lib/subunit/python/tests/sample-script.py create mode 100755 lib/subunit/python/tests/sample-two-script.py create mode 100644 lib/subunit/python/tests/test_test_protocol.py (limited to 'lib/subunit') diff --git a/lib/subunit/python/__init__.py b/lib/subunit/python/__init__.py new file mode 100644 index 0000000000..406cd8765b --- /dev/null +++ b/lib/subunit/python/__init__.py @@ -0,0 +1,388 @@ +# +# subunit: extensions to python unittest to get test results from subprocesses. +# Copyright (C) 2005 Robert Collins +# Copyright (C) 2007 Jelmer Vernooij +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +# + +import os +from StringIO import StringIO +import sys +import unittest + +def test_suite(): + import subunit.tests + return subunit.tests.test_suite() + + +def join_dir(base_path, path): + """ + Returns an absolute path to C{path}, calculated relative to the parent + of C{base_path}. + + @param base_path: A path to a file or directory. + @param path: An absolute path, or a path relative to the containing + directory of C{base_path}. + + @return: An absolute path to C{path}. + """ + return os.path.join(os.path.dirname(os.path.abspath(base_path)), path) + + +class TestProtocolServer(object): + """A class for receiving results from a TestProtocol client.""" + + OUTSIDE_TEST = 0 + TEST_STARTED = 1 + READING_FAILURE = 2 + READING_ERROR = 3 + + def __init__(self, client, stream=sys.stdout): + """Create a TestProtocol server instance. + + client should be an object that provides + - startTest + - addSuccess + - addFailure + - addError + - stopTest + methods, i.e. a TestResult. + """ + self.state = TestProtocolServer.OUTSIDE_TEST + self.client = client + self._stream = stream + + def _addError(self, offset, line): + if (self.state == TestProtocolServer.TEST_STARTED and + self.current_test_description == line[offset:-1]): + self.state = TestProtocolServer.OUTSIDE_TEST + self.current_test_description = None + self.client.addError(self._current_test, RemoteError("")) + self.client.stopTest(self._current_test) + self._current_test = None + elif (self.state == TestProtocolServer.TEST_STARTED and + self.current_test_description + " [" == line[offset:-1]): + self.state = TestProtocolServer.READING_ERROR + self._message = "" + else: + self.stdOutLineReceived(line) + + def _addFailure(self, offset, line): + if (self.state == TestProtocolServer.TEST_STARTED and + self.current_test_description == line[offset:-1]): + self.state = TestProtocolServer.OUTSIDE_TEST + self.current_test_description = None + self.client.addFailure(self._current_test, RemoteError()) + self.client.stopTest(self._current_test) + elif (self.state == TestProtocolServer.TEST_STARTED and + self.current_test_description + " [" == line[offset:-1]): + self.state = TestProtocolServer.READING_FAILURE + self._message = "" + else: + self.stdOutLineReceived(line) + + def _addSuccess(self, offset, line): + if (self.state == TestProtocolServer.TEST_STARTED and + self.current_test_description == line[offset:-1]): + self.client.addSuccess(self._current_test) + self.client.stopTest(self._current_test) + self.current_test_description = None + self._current_test = None + self.state = TestProtocolServer.OUTSIDE_TEST + else: + self.stdOutLineReceived(line) + + def _appendMessage(self, line): + if line[0:2] == " ]": + # quoted ] start + self._message += line[1:] + else: + self._message += line + + def endQuote(self, line): + if self.state == TestProtocolServer.READING_FAILURE: + self.state = TestProtocolServer.OUTSIDE_TEST + self.current_test_description = None + self.client.addFailure(self._current_test, + RemoteError(self._message)) + self.client.stopTest(self._current_test) + elif self.state == TestProtocolServer.READING_ERROR: + self.state = TestProtocolServer.OUTSIDE_TEST + self.current_test_description = None + self.client.addError(self._current_test, + RemoteError(self._message)) + self.client.stopTest(self._current_test) + else: + self.stdOutLineReceived(line) + + def lineReceived(self, line): + """Call the appropriate local method for the received line.""" + if line == "]\n": + self.endQuote(line) + elif (self.state == TestProtocolServer.READING_FAILURE or + self.state == TestProtocolServer.READING_ERROR): + self._appendMessage(line) + else: + parts = line.split(None, 1) + if len(parts) == 2: + cmd, rest = parts + offset = len(cmd) + 1 + cmd = cmd.strip(':') + if cmd in ('test', 'testing'): + self._startTest(offset, line) + elif cmd == 'error': + self._addError(offset, line) + elif cmd == 'failure': + self._addFailure(offset, line) + elif cmd in ('success', 'successful'): + self._addSuccess(offset, line) + else: + self.stdOutLineReceived(line) + else: + self.stdOutLineReceived(line) + + def lostConnection(self): + """The input connection has finished.""" + if self.state == TestProtocolServer.TEST_STARTED: + self.client.addError(self._current_test, + RemoteError("lost connection during test '%s'" + % self.current_test_description)) + self.client.stopTest(self._current_test) + elif self.state == TestProtocolServer.READING_ERROR: + self.client.addError(self._current_test, + RemoteError("lost connection during " + "error report of test " + "'%s'" % + self.current_test_description)) + self.client.stopTest(self._current_test) + elif self.state == TestProtocolServer.READING_FAILURE: + self.client.addError(self._current_test, + RemoteError("lost connection during " + "failure report of test " + "'%s'" % + self.current_test_description)) + self.client.stopTest(self._current_test) + + def readFrom(self, pipe): + for line in pipe.readlines(): + self.lineReceived(line) + self.lostConnection() + + def _startTest(self, offset, line): + """Internal call to change state machine. Override startTest().""" + if self.state == TestProtocolServer.OUTSIDE_TEST: + self.state = TestProtocolServer.TEST_STARTED + self._current_test = RemotedTestCase(line[offset:-1]) + self.current_test_description = line[offset:-1] + self.client.startTest(self._current_test) + else: + self.stdOutLineReceived(line) + + def stdOutLineReceived(self, line): + self._stream.write(line) + + +class RemoteException(Exception): + """An exception that occured remotely to python.""" + + def __eq__(self, other): + try: + return self.args == other.args + except AttributeError: + return False + + +class TestProtocolClient(unittest.TestResult): + """A class that looks like a TestResult and informs a TestProtocolServer.""" + + def __init__(self, stream): + super(TestProtocolClient, self).__init__() + self._stream = stream + + def addError(self, test, error): + """Report an error in test test.""" + self._stream.write("error: %s [\n" % (test.shortDescription() or str(test))) + for line in self._exc_info_to_string(error, test).splitlines(): + self._stream.write("%s\n" % line) + self._stream.write("]\n") + super(TestProtocolClient, self).addError(test, error) + + def addFailure(self, test, error): + """Report a failure in test test.""" + self._stream.write("failure: %s [\n" % (test.shortDescription() or str(test))) + for line in self._exc_info_to_string(error, test).splitlines(): + self._stream.write("%s\n" % line) + self._stream.write("]\n") + super(TestProtocolClient, self).addFailure(test, error) + + def addSuccess(self, test): + """Report a success in a test.""" + self._stream.write("successful: %s\n" % (test.shortDescription() or str(test))) + super(TestProtocolClient, self).addSuccess(test) + + def startTest(self, test): + """Mark a test as starting its test run.""" + self._stream.write("test: %s\n" % (test.shortDescription() or str(test))) + super(TestProtocolClient, self).startTest(test) + + +def RemoteError(description=""): + if description == "": + description = "\n" + return (RemoteException, RemoteException(description), None) + + +class RemotedTestCase(unittest.TestCase): + """A class to represent test cases run in child processes.""" + + def __eq__ (self, other): + try: + return self.__description == other.__description + except AttributeError: + return False + + def __init__(self, description): + """Create a psuedo test case with description description.""" + self.__description = description + + def error(self, label): + raise NotImplementedError("%s on RemotedTestCases is not permitted." % + label) + + def setUp(self): + self.error("setUp") + + def tearDown(self): + self.error("tearDown") + + def shortDescription(self): + return self.__description + + def id(self): + return "%s.%s" % (self._strclass(), self.__description) + + def __str__(self): + return "%s (%s)" % (self.__description, self._strclass()) + + def __repr__(self): + return "<%s description='%s'>" % \ + (self._strclass(), self.__description) + + def run(self, result=None): + if result is None: result = self.defaultTestResult() + result.startTest(self) + result.addError(self, RemoteError("Cannot run RemotedTestCases.\n")) + result.stopTest(self) + + def _strclass(self): + cls = self.__class__ + return "%s.%s" % (cls.__module__, cls.__name__) + + +class ExecTestCase(unittest.TestCase): + """A test case which runs external scripts for test fixtures.""" + + def __init__(self, methodName='runTest'): + """Create an instance of the class that will use the named test + method when executed. Raises a ValueError if the instance does + not have a method with the specified name. + """ + unittest.TestCase.__init__(self, methodName) + testMethod = getattr(self, methodName) + self.script = join_dir(sys.modules[self.__class__.__module__].__file__, + testMethod.__doc__) + + def countTestCases(self): + return 1 + + def run(self, result=None): + if result is None: result = self.defaultTestResult() + self._run(result) + + def debug(self): + """Run the test without collecting errors in a TestResult""" + self._run(unittest.TestResult()) + + def _run(self, result): + protocol = TestProtocolServer(result) + output = os.popen(self.script, mode='r') + protocol.readFrom(output) + + +class IsolatedTestCase(unittest.TestCase): + """A TestCase which runs its tests in a forked process.""" + + def run(self, result=None): + if result is None: result = self.defaultTestResult() + run_isolated(unittest.TestCase, self, result) + + +class IsolatedTestSuite(unittest.TestSuite): + """A TestCase which runs its tests in a forked process.""" + + def run(self, result=None): + if result is None: result = unittest.TestResult() + run_isolated(unittest.TestSuite, self, result) + + +def run_isolated(klass, self, result): + """Run a test suite or case in a subprocess, using the run method on klass. + """ + c2pread, c2pwrite = os.pipe() + # fixme - error -> result + # now fork + pid = os.fork() + if pid == 0: + # Child + # Close parent's pipe ends + os.close(c2pread) + # Dup fds for child + os.dup2(c2pwrite, 1) + # Close pipe fds. + os.close(c2pwrite) + + # at this point, sys.stdin is redirected, now we want + # to filter it to escape ]'s. + ### XXX: test and write that bit. + + result = TestProtocolClient(sys.stdout) + klass.run(self, result) + sys.stdout.flush() + sys.stderr.flush() + # exit HARD, exit NOW. + os._exit(0) + else: + # Parent + # Close child pipe ends + os.close(c2pwrite) + # hookup a protocol engine + protocol = TestProtocolServer(result) + protocol.readFrom(os.fdopen(c2pread, 'rU')) + os.waitpid(pid, 0) + # TODO return code evaluation. + return result + + +class SubunitTestRunner(object): + def __init__(self, stream=sys.stdout): + self.stream = stream + + def run(self, test): + "Run the given test case or test suite." + result = TestProtocolClient(self.stream) + test(result) + return result + diff --git a/lib/subunit/python/tests/TestUtil.py b/lib/subunit/python/tests/TestUtil.py new file mode 100644 index 0000000000..1b5ba9c293 --- /dev/null +++ b/lib/subunit/python/tests/TestUtil.py @@ -0,0 +1,80 @@ +# Copyright (c) 2004 Canonical Limited +# Author: Robert Collins +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +# + +import sys +import logging +import unittest + + +class LogCollector(logging.Handler): + def __init__(self): + logging.Handler.__init__(self) + self.records=[] + def emit(self, record): + self.records.append(record.getMessage()) + + +def makeCollectingLogger(): + """I make a logger instance that collects its logs for programmatic analysis + -> (logger, collector)""" + logger=logging.Logger("collector") + handler=LogCollector() + handler.setFormatter(logging.Formatter("%(levelname)s: %(message)s")) + logger.addHandler(handler) + return logger, handler + + +def visitTests(suite, visitor): + """A foreign method for visiting the tests in a test suite.""" + for test in suite._tests: + #Abusing types to avoid monkey patching unittest.TestCase. + # Maybe that would be better? + try: + test.visit(visitor) + except AttributeError: + if isinstance(test, unittest.TestCase): + visitor.visitCase(test) + elif isinstance(test, unittest.TestSuite): + visitor.visitSuite(test) + visitTests(test, visitor) + else: + print "unvisitable non-unittest.TestCase element %r (%r)" % (test, test.__class__) + + +class TestSuite(unittest.TestSuite): + """I am an extended TestSuite with a visitor interface. + This is primarily to allow filtering of tests - and suites or + more in the future. An iterator of just tests wouldn't scale...""" + + def visit(self, visitor): + """visit the composite. Visiting is depth-first. + current callbacks are visitSuite and visitCase.""" + visitor.visitSuite(self) + visitTests(self, visitor) + + +class TestLoader(unittest.TestLoader): + """Custome TestLoader to set the right TestSuite class.""" + suiteClass = TestSuite + +class TestVisitor(object): + """A visitor for Tests""" + def visitSuite(self, aTestSuite): + pass + def visitCase(self, aTestCase): + pass diff --git a/lib/subunit/python/tests/__init__.py b/lib/subunit/python/tests/__init__.py new file mode 100644 index 0000000000..544d0e704f --- /dev/null +++ b/lib/subunit/python/tests/__init__.py @@ -0,0 +1,25 @@ +# +# subunit: extensions to python unittest to get test results from subprocesses. +# Copyright (C) 2005 Robert Collins +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +# + +from subunit.tests import TestUtil, test_test_protocol + +def test_suite(): + result = TestUtil.TestSuite() + result.addTest(test_test_protocol.test_suite()) + return result diff --git a/lib/subunit/python/tests/sample-script.py b/lib/subunit/python/tests/sample-script.py new file mode 100755 index 0000000000..223d2f5d9f --- /dev/null +++ b/lib/subunit/python/tests/sample-script.py @@ -0,0 +1,11 @@ +#!/usr/bin/env python +import sys +print "test old mcdonald" +print "success old mcdonald" +print "test bing crosby" +print "failure bing crosby [" +print "foo.c:53:ERROR invalid state" +print "]" +print "test an error" +print "error an error" +sys.exit(0) diff --git a/lib/subunit/python/tests/sample-two-script.py b/lib/subunit/python/tests/sample-two-script.py new file mode 100755 index 0000000000..d5550842bf --- /dev/null +++ b/lib/subunit/python/tests/sample-two-script.py @@ -0,0 +1,7 @@ +#!/usr/bin/env python +import sys +print "test old mcdonald" +print "success old mcdonald" +print "test bing crosby" +print "success bing crosby" +sys.exit(0) diff --git a/lib/subunit/python/tests/test_test_protocol.py b/lib/subunit/python/tests/test_test_protocol.py new file mode 100644 index 0000000000..af31584a97 --- /dev/null +++ b/lib/subunit/python/tests/test_test_protocol.py @@ -0,0 +1,730 @@ +# +# subunit: extensions to python unittest to get test results from subprocesses. +# Copyright (C) 2005 Robert Collins +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +# + +import unittest +from StringIO import StringIO +import os +import subunit +import sys + +try: + class MockTestProtocolServerClient(object): + """A mock protocol server client to test callbacks.""" + + def __init__(self): + self.end_calls = [] + self.error_calls = [] + self.failure_calls = [] + self.start_calls = [] + self.success_calls = [] + super(MockTestProtocolServerClient, self).__init__() + + def addError(self, test, error): + self.error_calls.append((test, error)) + + def addFailure(self, test, error): + self.failure_calls.append((test, error)) + + def addSuccess(self, test): + self.success_calls.append(test) + + def stopTest(self, test): + self.end_calls.append(test) + + def startTest(self, test): + self.start_calls.append(test) + +except AttributeError: + MockTestProtocolServer = None + + +class TestMockTestProtocolServer(unittest.TestCase): + + def test_start_test(self): + protocol = MockTestProtocolServerClient() + protocol.startTest(subunit.RemotedTestCase("test old mcdonald")) + self.assertEqual(protocol.start_calls, + [subunit.RemotedTestCase("test old mcdonald")]) + self.assertEqual(protocol.end_calls, []) + self.assertEqual(protocol.error_calls, []) + self.assertEqual(protocol.failure_calls, []) + self.assertEqual(protocol.success_calls, []) + + def test_add_error(self): + protocol = MockTestProtocolServerClient() + protocol.addError(subunit.RemotedTestCase("old mcdonald"), + subunit.RemoteError("omg it works")) + self.assertEqual(protocol.start_calls, []) + self.assertEqual(protocol.end_calls, []) + self.assertEqual(protocol.error_calls, [( + subunit.RemotedTestCase("old mcdonald"), + subunit.RemoteError("omg it works"))]) + self.assertEqual(protocol.failure_calls, []) + self.assertEqual(protocol.success_calls, []) + + def test_add_failure(self): + protocol = MockTestProtocolServerClient() + protocol.addFailure(subunit.RemotedTestCase("old mcdonald"), + subunit.RemoteError("omg it works")) + self.assertEqual(protocol.start_calls, []) + self.assertEqual(protocol.end_calls, []) + self.assertEqual(protocol.error_calls, []) + self.assertEqual(protocol.failure_calls, [ + (subunit.RemotedTestCase("old mcdonald"), + subunit.RemoteError("omg it works"))]) + self.assertEqual(protocol.success_calls, []) + + def test_add_success(self): + protocol = MockTestProtocolServerClient() + protocol.addSuccess(subunit.RemotedTestCase("test old mcdonald")) + self.assertEqual(protocol.start_calls, []) + self.assertEqual(protocol.end_calls, []) + self.assertEqual(protocol.error_calls, []) + self.assertEqual(protocol.failure_calls, []) + self.assertEqual(protocol.success_calls, + [subunit.RemotedTestCase("test old mcdonald")]) + + def test_end_test(self): + protocol = MockTestProtocolServerClient() + protocol.stopTest(subunit.RemotedTestCase("test old mcdonald")) + self.assertEqual(protocol.end_calls, + [subunit.RemotedTestCase("test old mcdonald")]) + self.assertEqual(protocol.error_calls, []) + self.assertEqual(protocol.failure_calls, []) + self.assertEqual(protocol.success_calls, []) + self.assertEqual(protocol.start_calls, []) + + +class TestTestImports(unittest.TestCase): + + def test_imports(self): + from subunit import TestProtocolServer + from subunit import RemotedTestCase + from subunit import RemoteError + from subunit import ExecTestCase + from subunit import IsolatedTestCase + from subunit import TestProtocolClient + + +class TestTestProtocolServerPipe(unittest.TestCase): + + def test_story(self): + client = unittest.TestResult() + protocol = subunit.TestProtocolServer(client) + pipe = StringIO("test old mcdonald\n" + "success old mcdonald\n" + "test bing crosby\n" + "failure bing crosby [\n" + "foo.c:53:ERROR invalid state\n" + "]\n" + "test an error\n" + "error an error\n") + protocol.readFrom(pipe) + mcdonald = subunit.RemotedTestCase("old mcdonald") + bing = subunit.RemotedTestCase("bing crosby") + an_error = subunit.RemotedTestCase("an error") + self.assertEqual(client.errors, + [(an_error, 'RemoteException: \n\n')]) + self.assertEqual( + client.failures, + [(bing, "RemoteException: foo.c:53:ERROR invalid state\n\n")]) + self.assertEqual(client.testsRun, 3) + + +class TestTestProtocolServerStartTest(unittest.TestCase): + + def setUp(self): + self.client = MockTestProtocolServerClient() + self.protocol = subunit.TestProtocolServer(self.client) + + def test_start_test(self): + self.protocol.lineReceived("test old mcdonald\n") + self.assertEqual(self.client.start_calls, + [subunit.RemotedTestCase("old mcdonald")]) + + def test_start_testing(self): + self.protocol.lineReceived("testing old mcdonald\n") + self.assertEqual(self.client.start_calls, + [subunit.RemotedTestCase("old mcdonald")]) + + def test_start_test_colon(self): + self.protocol.lineReceived("test: old mcdonald\n") + self.assertEqual(self.client.start_calls, + [subunit.RemotedTestCase("old mcdonald")]) + + def test_start_testing_colon(self): + self.protocol.lineReceived("testing: old mcdonald\n") + self.assertEqual(self.client.start_calls, + [subunit.RemotedTestCase("old mcdonald")]) + + +class TestTestProtocolServerPassThrough(unittest.TestCase): + + def setUp(self): + from StringIO import StringIO + self.stdout = StringIO() + self.test = subunit.RemotedTestCase("old mcdonald") + self.client = MockTestProtocolServerClient() + self.protocol = subunit.TestProtocolServer(self.client, self.stdout) + + def keywords_before_test(self): + self.protocol.lineReceived("failure a\n") + self.protocol.lineReceived("failure: a\n") + self.protocol.lineReceived("error a\n") + self.protocol.lineReceived("error: a\n") + self.protocol.lineReceived("success a\n") + self.protocol.lineReceived("success: a\n") + self.protocol.lineReceived("successful a\n") + self.protocol.lineReceived("successful: a\n") + self.protocol.lineReceived("]\n") + self.assertEqual(self.stdout.getvalue(), "failure a\n" + "failure: a\n" + "error a\n" + "error: a\n" + "success a\n" + "success: a\n" + "successful a\n" + "successful: a\n" + "]\n") + + def test_keywords_before_test(self): + self.keywords_before_test() + self.assertEqual(self.client.start_calls, []) + self.assertEqual(self.client.error_calls, []) + self.assertEqual(self.client.failure_calls, []) + self.assertEqual(self.client.success_calls, []) + + def test_keywords_after_error(self): + self.protocol.lineReceived("test old mcdonald\n") + self.protocol.lineReceived("error old mcdonald\n") + self.keywords_before_test() + self.assertEqual(self.client.start_calls, [self.test]) + self.assertEqual(self.client.end_calls, [self.test]) + self.assertEqual(self.client.error_calls, + [(self.test, subunit.RemoteError(""))]) + self.assertEqual(self.client.failure_calls, []) + self.assertEqual(self.client.success_calls, []) + + def test_keywords_after_failure(self): + self.protocol.lineReceived("test old mcdonald\n") + self.protocol.lineReceived("failure old mcdonald\n") + self.keywords_before_test() + self.assertEqual(self.client.start_calls, [self.test]) + self.assertEqual(self.client.end_calls, [self.test]) + self.assertEqual(self.client.error_calls, []) + self.assertEqual(self.client.failure_calls, + [(self.test, subunit.RemoteError())]) + self.assertEqual(self.client.success_calls, []) + + def test_keywords_after_success(self): + self.protocol.lineReceived("test old mcdonald\n") + self.protocol.lineReceived("success old mcdonald\n") + self.keywords_before_test() + self.assertEqual(self.client.start_calls, [self.test]) + self.assertEqual(self.client.end_calls, [self.test]) + self.assertEqual(self.client.error_calls, []) + self.assertEqual(self.client.failure_calls, []) + self.assertEqual(self.client.success_calls, [self.test]) + + def test_keywords_after_test(self): + self.protocol.lineReceived("test old mcdonald\n") + self.protocol.lineReceived("test old mcdonald\n") + self.protocol.lineReceived("failure a\n") + self.protocol.lineReceived("failure: a\n") + self.protocol.lineReceived("error a\n") + self.protocol.lineReceived("error: a\n") + self.protocol.lineReceived("success a\n") + self.protocol.lineReceived("success: a\n") + self.protocol.lineReceived("successful a\n") + self.protocol.lineReceived("successful: a\n") + self.protocol.lineReceived("]\n") + self.protocol.lineReceived("failure old mcdonald\n") + self.assertEqual(self.stdout.getvalue(), "test old mcdonald\n" + "failure a\n" + "failure: a\n" + "error a\n" + "error: a\n" + "success a\n" + "success: a\n" + "successful a\n" + "successful: a\n" + "]\n") + self.assertEqual(self.client.start_calls, [self.test]) + self.assertEqual(self.client.end_calls, [self.test]) + self.assertEqual(self.client.failure_calls, + [(self.test, subunit.RemoteError())]) + self.assertEqual(self.client.error_calls, []) + self.assertEqual(self.client.success_calls, []) + + def test_keywords_during_failure(self): + self.protocol.lineReceived("test old mcdonald\n") + self.protocol.lineReceived("failure: old mcdonald [\n") + self.protocol.lineReceived("test old mcdonald\n") + self.protocol.lineReceived("failure a\n") + self.protocol.lineReceived("failure: a\n") + self.protocol.lineReceived("error a\n") + self.protocol.lineReceived("error: a\n") + self.protocol.lineReceived("success a\n") + self.protocol.lineReceived("success: a\n") + self.protocol.lineReceived("successful a\n") + self.protocol.lineReceived("successful: a\n") + self.protocol.lineReceived(" ]\n") + self.protocol.lineReceived("]\n") + self.assertEqual(self.stdout.getvalue(), "") + self.assertEqual(self.client.start_calls, [self.test]) + self.assertEqual(self.client.failure_calls, + [(self.test, subunit.RemoteError("test old mcdonald\n" + "failure a\n" + "failure: a\n" + "error a\n" + "error: a\n" + "success a\n" + "success: a\n" + "successful a\n" + "successful: a\n" + "]\n"))]) + self.assertEqual(self.client.end_calls, [self.test]) + self.assertEqual(self.client.error_calls, []) + self.assertEqual(self.client.success_calls, []) + + def test_stdout_passthrough(self): + """Lines received which cannot be interpreted as any protocol action + should be passed through to sys.stdout. + """ + bytes = "randombytes\n" + self.protocol.lineReceived(bytes) + self.assertEqual(self.stdout.getvalue(), bytes) + + +class TestTestProtocolServerLostConnection(unittest.TestCase): + + def setUp(self): + self.client = MockTestProtocolServerClient() + self.protocol = subunit.TestProtocolServer(self.client) + self.test = subunit.RemotedTestCase("old mcdonald") + + def test_lost_connection_no_input(self): + self.protocol.lostConnection() + self.assertEqual(self.client.start_calls, []) + self.assertEqual(self.client.error_calls, []) + self.assertEqual(self.client.failure_calls, []) + self.assertEqual(self.client.success_calls, []) + + def test_lost_connection_after_start(self): + self.protocol.lineReceived("test old mcdonald\n") + self.protocol.lostConnection() + self.assertEqual(self.client.start_calls, [self.test]) + self.assertEqual(self.client.end_calls, [self.test]) + self.assertEqual(self.client.error_calls, [ + (self.test, subunit.RemoteError("lost connection during " + "test 'old mcdonald'"))]) + self.assertEqual(self.client.failure_calls, []) + self.assertEqual(self.client.success_calls, []) + + def test_lost_connected_after_error(self): + self.protocol.lineReceived("test old mcdonald\n") + self.protocol.lineReceived("error old mcdonald\n") + self.protocol.lostConnection() + self.assertEqual(self.client.start_calls, [self.test]) + self.assertEqual(self.client.failure_calls, []) + self.assertEqual(self.client.end_calls, [self.test]) + self.assertEqual(self.client.error_calls, [ + (self.test, subunit.RemoteError(""))]) + self.assertEqual(self.client.success_calls, []) + + def test_lost_connection_during_error(self): + self.protocol.lineReceived("test old mcdonald\n") + self.protocol.lineReceived("error old mcdonald [\n") + self.protocol.lostConnection() + self.assertEqual(self.client.start_calls, [self.test]) + self.assertEqual(self.client.end_calls, [self.test]) + self.assertEqual(self.client.error_calls, [ + (self.test, subunit.RemoteError("lost connection during error " + "report of test 'old mcdonald'"))]) + self.assertEqual(self.client.failure_calls, []) + self.assertEqual(self.client.success_calls, []) + + def test_lost_connected_after_failure(self): + self.protocol.lineReceived("test old mcdonald\n") + self.protocol.lineReceived("failure old mcdonald\n") + self.protocol.lostConnection() + test = subunit.RemotedTestCase("old mcdonald") + self.assertEqual(self.client.start_calls, [self.test]) + self.assertEqual(self.client.end_calls, [self.test]) + self.assertEqual(self.client.error_calls, []) + self.assertEqual(self.client.failure_calls, + [(self.test, subunit.RemoteError())]) + self.assertEqual(self.client.success_calls, []) + + def test_lost_connection_during_failure(self): + self.protocol.lineReceived("test old mcdonald\n") + self.protocol.lineReceived("failure old mcdonald [\n") + self.protocol.lostConnection() + self.assertEqual(self.client.start_calls, [self.test]) + self.assertEqual(self.client.end_calls, [self.test]) + self.assertEqual(self.client.error_calls, + [(self.test, + subunit.RemoteError("lost connection during " + "failure report" + " of test 'old mcdonald'"))]) + self.assertEqual(self.client.failure_calls, []) + self.assertEqual(self.client.success_calls, []) + + def test_lost_connection_after_success(self): + self.protocol.lineReceived("test old mcdonald\n") + self.protocol.lineReceived("success old mcdonald\n") + self.protocol.lostConnection() + self.assertEqual(self.client.start_calls, [self.test]) + self.assertEqual(self.client.end_calls, [self.test]) + self.assertEqual(self.client.error_calls, []) + self.assertEqual(self.client.failure_calls, []) + self.assertEqual(self.client.success_calls, [self.test]) + + +class TestTestProtocolServerAddError(unittest.TestCase): + + def setUp(self): + self.client = MockTestProtocolServerClient() + self.protocol = subunit.TestProtocolServer(self.client) + self.protocol.lineReceived("test mcdonalds farm\n") + self.test = subunit.RemotedTestCase("mcdonalds farm") + + def simple_error_keyword(self, keyword): + self.protocol.lineReceived("%s mcdonalds farm\n" % keyword) + self.assertEqual(self.client.start_calls, [self.test]) + self.assertEqual(self.client.end_calls, [self.test]) + self.assertEqual(self.client.error_calls, [ + (self.test, subunit.RemoteError(""))]) + self.assertEqual(self.client.failure_calls, []) + + def test_simple_error(self): + self.simple_error_keyword("error") + + def test_simple_error_colon(self): + self.simple_error_keyword("error:") + + def test_error_empty_message(self): + self.protocol.lineReceived("error mcdonalds farm [\n") + self.protocol.lineReceived("]\n") + self.assertEqual(self.client.start_calls, [self.test]) + self.assertEqual(self.client.end_calls, [self.test]) + self.assertEqual(self.client.error_calls, [ + (self.test, subunit.RemoteError(""))]) + self.assertEqual(self.client.failure_calls, []) + + def error_quoted_bracket(self, keyword): + self.protocol.lineReceived("%s mcdonalds farm [\n" % keyword) + self.protocol.lineReceived(" ]\n") + self.protocol.lineReceived("]\n") + self.assertEqual(self.client.start_calls, [self.test]) + self.assertEqual(self.client.end_calls, [self.test]) + self.assertEqual(self.client.error_calls, [ + (self.test, subunit.RemoteError("]\n"))]) + self.assertEqual(self.client.failure_calls, []) + + def test_error_quoted_bracket(self): + self.error_quoted_bracket("error") + + def test_error_colon_quoted_bracket(self): + self.error_quoted_bracket("error:") + + +class TestTestProtocolServerAddFailure(unittest.TestCase): + + def setUp(self): + self.client = MockTestProtocolServerClient() + self.protocol = subunit.TestProtocolServer(self.client) + self.protocol.lineReceived("test mcdonalds farm\n") + self.test = subunit.RemotedTestCase("mcdonalds farm") + + def simple_failure_keyword(self, keyword): + self.protocol.lineReceived("%s mcdonalds farm\n" % keyword) + self.assertEqual(self.client.start_calls, [self.test]) + self.assertEqual(self.client.end_calls, [self.test]) + self.assertEqual(self.client.error_calls, []) + self.assertEqual(self.client.failure_calls, + [(self.test, subunit.RemoteError())]) + + def test_simple_failure(self): + self.simple_failure_keyword("failure") + + def test_simple_failure_colon(self): + self.simple_failure_keyword("failure:") + + def test_failure_empty_message(self): + self.protocol.lineReceived("failure mcdonalds farm [\n") + self.protocol.lineReceived("]\n") + self.assertEqual(self.client.start_calls, [self.test]) + self.assertEqual(self.client.end_calls, [self.test]) + self.assertEqual(self.client.error_calls, []) + self.assertEqual(self.client.failure_calls, + [(self.test, subunit.RemoteError())]) + + def failure_quoted_bracket(self, keyword): + self.protocol.lineReceived("%s mcdonalds farm [\n" % keyword) + self.protocol.lineReceived(" ]\n") + self.protocol.lineReceived("]\n") + self.assertEqual(self.client.start_calls, [self.test]) + self.assertEqual(self.client.end_calls, [self.test]) + self.assertEqual(self.client.error_calls, []) + self.assertEqual(self.client.failure_calls, + [(self.test, subunit.RemoteError("]\n"))]) + + def test_failure_quoted_bracket(self): + self.failure_quoted_bracket("failure") + + def test_failure_colon_quoted_bracket(self): + self.failure_quoted_bracket("failure:") + + +class TestTestProtocolServerAddSuccess(unittest.TestCase): + + def setUp(self): + self.client = MockTestProtocolServerClient() + self.protocol = subunit.TestProtocolServer(self.client) + self.protocol.lineReceived("test mcdonalds farm\n") + self.test = subunit.RemotedTestCase("mcdonalds farm") + + def simple_success_keyword(self, keyword): + self.protocol.lineReceived("%s mcdonalds farm\n" % keyword) + self.assertEqual(self.client.start_calls, [self.test]) + self.assertEqual(self.client.end_calls, [self.test]) + self.assertEqual(self.client.error_calls, []) + self.assertEqual(self.client.success_calls, [self.test]) + + def test_simple_success(self): + self.simple_success_keyword("failure") + + def test_simple_success_colon(self): + self.simple_success_keyword("failure:") + + def test_simple_success(self): + self.simple_success_keyword("successful") + + def test_simple_success_colon(self): + self.simple_success_keyword("successful:") + + +class TestRemotedTestCase(unittest.TestCase): + + def test_simple(self): + test = subunit.RemotedTestCase("A test description") + self.assertRaises(NotImplementedError, test.setUp) + self.assertRaises(NotImplementedError, test.tearDown) + self.assertEqual("A test description", + test.shortDescription()) + self.assertEqual("subunit.RemotedTestCase.A test description", + test.id()) + self.assertEqual("A test description (subunit.RemotedTestCase)", "%s" % test) + self.assertEqual("", "%r" % test) + result = unittest.TestResult() + test.run(result) + self.assertEqual([(test, "RemoteException: " + "Cannot run RemotedTestCases.\n\n")], + result.errors) + self.assertEqual(1, result.testsRun) + another_test = subunit.RemotedTestCase("A test description") + self.assertEqual(test, another_test) + different_test = subunit.RemotedTestCase("ofo") + self.assertNotEqual(test, different_test) + self.assertNotEqual(another_test, different_test) + + +class TestRemoteError(unittest.TestCase): + + def test_eq(self): + error = subunit.RemoteError("Something went wrong") + another_error = subunit.RemoteError("Something went wrong") + different_error = subunit.RemoteError("boo!") + self.assertEqual(error, another_error) + self.assertNotEqual(error, different_error) + self.assertNotEqual(different_error, another_error) + + def test_empty_constructor(self): + self.assertEqual(subunit.RemoteError(), subunit.RemoteError("")) + + +class TestExecTestCase(unittest.TestCase): + + class SampleExecTestCase(subunit.ExecTestCase): + + def test_sample_method(self): + """sample-script.py""" + # the sample script runs three tests, one each + # that fails, errors and succeeds + + + def test_construct(self): + test = self.SampleExecTestCase("test_sample_method") + self.assertEqual(test.script, + subunit.join_dir(__file__, 'sample-script.py')) + + def test_run(self): + runner = MockTestProtocolServerClient() + test = self.SampleExecTestCase("test_sample_method") + test.run(runner) + mcdonald = subunit.RemotedTestCase("old mcdonald") + bing = subunit.RemotedTestCase("bing crosby") + an_error = subunit.RemotedTestCase("an error") + self.assertEqual(runner.error_calls, + [(an_error, subunit.RemoteError())]) + self.assertEqual(runner.failure_calls, + [(bing, + subunit.RemoteError( + "foo.c:53:ERROR invalid state\n"))]) + self.assertEqual(runner.start_calls, [mcdonald, bing, an_error]) + self.assertEqual(runner.end_calls, [mcdonald, bing, an_error]) + + def test_debug(self): + test = self.SampleExecTestCase("test_sample_method") + test.debug() + + def test_count_test_cases(self): + """TODO run the child process and count responses to determine the count.""" + + def test_join_dir(self): + sibling = subunit.join_dir(__file__, 'foo') + expected = '%s/foo' % (os.path.split(__file__)[0],) + self.assertEqual(sibling, expected) + + +class DoExecTestCase(subunit.ExecTestCase): + + def test_working_script(self): + """sample-two-script.py""" + + +class TestIsolatedTestCase(unittest.TestCase): + + class SampleIsolatedTestCase(subunit.IsolatedTestCase): + + SETUP = False + TEARDOWN = False + TEST = False + + def setUp(self): + TestIsolatedTestCase.SampleIsolatedTestCase.SETUP = True + + def tearDown(self): + TestIsolatedTestCase.SampleIsolatedTestCase.TEARDOWN = True + + def test_sets_global_state(self): + TestIsolatedTestCase.SampleIsolatedTestCase.TEST = True + + + def test_construct(self): + test = self.SampleIsolatedTestCase("test_sets_global_state") + + def test_run(self): + result = unittest.TestResult() + test = self.SampleIsolatedTestCase("test_sets_global_state") + test.run(result) + self.assertEqual(result.testsRun, 1) + self.assertEqual(self.SampleIsolatedTestCase.SETUP, False) + self.assertEqual(self.SampleIsolatedTestCase.TEARDOWN, False) + self.assertEqual(self.SampleIsolatedTestCase.TEST, False) + + def test_debug(self): + pass + #test = self.SampleExecTestCase("test_sample_method") + #test.debug() + + +class TestIsolatedTestSuite(unittest.TestCase): + + class SampleTestToIsolate(unittest.TestCase): + + SETUP = False + TEARDOWN = False + TEST = False + + def setUp(self): + TestIsolatedTestSuite.SampleTestToIsolate.SETUP = True + + def tearDown(self): + TestIsolatedTestSuite.SampleTestToIsolate.TEARDOWN = True + + def test_sets_global_state(self): + TestIsolatedTestSuite.SampleTestToIsolate.TEST = True + + + def test_construct(self): + suite = subunit.IsolatedTestSuite() + + def test_run(self): + result = unittest.TestResult() + suite = subunit.IsolatedTestSuite() + sub_suite = unittest.TestSuite() + sub_suite.addTest(self.SampleTestToIsolate("test_sets_global_state")) + sub_suite.addTest(self.SampleTestToIsolate("test_sets_global_state")) + suite.addTest(sub_suite) + suite.addTest(self.SampleTestToIsolate("test_sets_global_state")) + suite.run(result) + self.assertEqual(result.testsRun, 3) + self.assertEqual(self.SampleTestToIsolate.SETUP, False) + self.assertEqual(self.SampleTestToIsolate.TEARDOWN, False) + self.assertEqual(self.SampleTestToIsolate.TEST, False) + + +class TestTestProtocolClient(unittest.TestCase): + + def setUp(self): + self.io = StringIO() + self.protocol = subunit.TestProtocolClient(self.io) + self.test = TestTestProtocolClient("test_start_test") + + + def test_start_test(self): + """Test startTest on a TestProtocolClient.""" + self.protocol.startTest(self.test) + self.assertEqual(self.io.getvalue(), "test: %s\n" % self.test.id()) + + def test_stop_test(self): + """Test stopTest on a TestProtocolClient.""" + self.protocol.stopTest(self.test) + self.assertEqual(self.io.getvalue(), "") + + def test_add_success(self): + """Test addSuccess on a TestProtocolClient.""" + self.protocol.addSuccess(self.test) + self.assertEqual( + self.io.getvalue(), "successful: %s\n" % self.test.id()) + + def test_add_failure(self): + """Test addFailure on a TestProtocolClient.""" + self.protocol.addFailure(self.test, subunit.RemoteError("boo")) + self.assertEqual( + self.io.getvalue(), + 'failure: %s [\nRemoteException: boo\n]\n' % self.test.id()) + + def test_add_error(self): + """Test stopTest on a TestProtocolClient.""" + self.protocol.addError(self.test, subunit.RemoteError("phwoar")) + self.assertEqual( + self.io.getvalue(), + 'error: %s [\n' + "RemoteException: phwoar\n" + "]\n" % self.test.id()) + + +def test_suite(): + loader = subunit.tests.TestUtil.TestLoader() + result = loader.loadTestsFromName(__name__) + return result -- cgit