diff options
Diffstat (limited to 'lib/subunit/python/testtools/testsuite.py')
-rw-r--r-- | lib/subunit/python/testtools/testsuite.py | 74 |
1 files changed, 0 insertions, 74 deletions
diff --git a/lib/subunit/python/testtools/testsuite.py b/lib/subunit/python/testtools/testsuite.py deleted file mode 100644 index 26b193799b..0000000000 --- a/lib/subunit/python/testtools/testsuite.py +++ /dev/null @@ -1,74 +0,0 @@ -# Copyright (c) 2009 Jonathan M. Lange. See LICENSE for details. - -"""Test suites and related things.""" - -__metaclass__ = type -__all__ = [ - 'ConcurrentTestSuite', - ] - -try: - import Queue -except ImportError: - import queue as Queue -import threading -import unittest - -import testtools - - -class ConcurrentTestSuite(unittest.TestSuite): - """A TestSuite whose run() calls out to a concurrency strategy.""" - - def __init__(self, suite, make_tests): - """Create a ConcurrentTestSuite to execute suite. - - :param suite: A suite to run concurrently. - :param make_tests: A helper function to split the tests in the - ConcurrentTestSuite into some number of concurrently executing - sub-suites. make_tests must take a suite, and return an iterable - of TestCase-like object, each of which must have a run(result) - method. - """ - super(ConcurrentTestSuite, self).__init__([suite]) - self.make_tests = make_tests - - def run(self, result): - """Run the tests concurrently. - - This calls out to the provided make_tests helper, and then serialises - the results so that result only sees activity from one TestCase at - a time. - - ConcurrentTestSuite provides no special mechanism to stop the tests - returned by make_tests, it is up to the make_tests to honour the - shouldStop attribute on the result object they are run with, which will - be set if an exception is raised in the thread which - ConcurrentTestSuite.run is called in. - """ - tests = self.make_tests(self) - try: - threads = {} - queue = Queue.Queue() - result_semaphore = threading.Semaphore(1) - for test in tests: - process_result = testtools.ThreadsafeForwardingResult(result, - result_semaphore) - reader_thread = threading.Thread( - target=self._run_test, args=(test, process_result, queue)) - threads[test] = reader_thread, process_result - reader_thread.start() - while threads: - finished_test = queue.get() - threads[finished_test][0].join() - del threads[finished_test] - except: - for thread, process_result in threads.values(): - process_result.stop() - raise - - def _run_test(self, test, process_result, queue): - try: - test.run(process_result) - finally: - queue.put(test) |