1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
|
#! /usr/bin/env python
# Copyright (C) 2002, 2003 by Martin Pool <mbp@samba.org>
# Copyright (C) 2003 by Tim Potter <tpot@samba.org>
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License as
# published by the Free Software Foundation; either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
# USA
"""comfychair: a Python-based instrument of software torture.
Copyright (C) 2002, 2003 by Martin Pool <mbp@samba.org>
Copyright (C) 2003 by Tim Potter <tpot@samba.org>
This is a test framework designed for testing programs written in
Python, or (through a fork/exec interface) any other language.
For more information, see the file README.comfychair.
To run a test suite based on ComfyChair, just run it as a program.
"""
import sys, re
class TestCase:
    """A base class for tests.

    This class defines the hooks (setup, runtest, teardown) which can
    optionally be overridden by subclasses.  It also provides utility
    functions for checking prerequisites, making assertions, running
    external commands and recording a per-test log.

    Constructing an instance has side effects: it creates and enters a
    scratch directory and snapshots os.environ.  Both are undone by the
    queued cleanups, which the test runner pops and calls in reverse order
    of registration.
    """

    def __init__(self):
        self.test_log = ""
        self.background_pids = []
        self._cleanups = []
        self._enter_rundir()
        self._save_environment()
        # Registered last so that it is popped (and therefore run) first.
        self.add_cleanup(self.teardown)

    # --------------------------------------------------
    # Save and restore directory

    def _enter_rundir(self):
        """Create a fresh scratch directory for this test and chdir into it.

        The directory is <cwd>/testtmp/<ClassName>; anything left there by
        a previous run is removed first.  A 'tmp' subdirectory is created
        inside it and recorded as self.tmpdir.
        """
        import os
        import shutil
        self.basedir = os.getcwd()
        self.add_cleanup(self._restore_directory)
        self.rundir = os.path.join(self.basedir,
                                   'testtmp',
                                   self.__class__.__name__)
        self.tmpdir = os.path.join(self.rundir, 'tmp')
        # Remove leftovers without going through the shell, so that paths
        # containing spaces or shell metacharacters are handled correctly.
        if os.path.islink(self.rundir) or os.path.isfile(self.rundir):
            os.remove(self.rundir)
        elif os.path.isdir(self.rundir):
            shutil.rmtree(self.rundir)
        # makedirs creates self.rundir as a parent of self.tmpdir.
        os.makedirs(self.tmpdir)
        os.chdir(self.rundir)

    def _restore_directory(self):
        """Return to the directory that was current before the test."""
        import os
        os.chdir(self.basedir)

    # --------------------------------------------------
    # Save and restore environment

    def _save_environment(self):
        """Snapshot os.environ and queue a cleanup that restores it."""
        import os
        self._saved_environ = os.environ.copy()
        self.add_cleanup(self._restore_environment)

    def _restore_environment(self):
        """Reset os.environ to the snapshot taken by _save_environment."""
        import os
        os.environ.clear()
        os.environ.update(self._saved_environ)

    def setup(self):
        """Set up test fixture."""
        pass

    def teardown(self):
        """Tear down test fixture."""
        pass

    def runtest(self):
        """Run the test."""
        pass

    def add_cleanup(self, c):
        """Queue a cleanup to be run when the test is complete.

        c is a callable taking no arguments.
        """
        self._cleanups.append(c)

    def fail(self, reason = ""):
        """Say the test failed."""
        raise AssertionError(reason)

    #############################################################
    # Requisition methods

    def require(self, predicate, message):
        """Check a predicate for running this test.

        If the predicate value is not true, the test is skipped with a
        message explaining why.

        Raises:
          NotRunError if predicate is false
        """
        if not predicate:
            raise NotRunError(message)

    def require_root(self):
        """Skip this test unless run by root."""
        import os
        self.require(os.getuid() == 0,
                     "must be root to run this test")

    #############################################################
    # Assertion methods

    def assert_(self, expr, reason = ""):
        """Fail with AssertionError(reason) unless expr is true."""
        if not expr:
            raise AssertionError(reason)

    def assert_equal(self, a, b):
        """Fail with AssertionError unless a == b."""
        if not a == b:
            raise AssertionError("assertEquals failed: %s" % repr((a, b)))

    def assert_notequal(self, a, b):
        """Fail with AssertionError if a == b."""
        if a == b:
            raise AssertionError("assertNotEqual failed: %s" % repr((a, b)))

    def assert_re_match(self, pattern, s):
        """Assert that a string matches a particular pattern

        Inputs:
          pattern      string: regular expression
          s            string: to be matched

        Raises:
          AssertionError if not matched
        """
        if not re.match(pattern, s):
            raise AssertionError("string does not match regexp\n"
                                 " string: %s\n"
                                 " re: %s" % (repr(s), repr(pattern)))

    def assert_re_search(self, pattern, s):
        """Assert that a string *contains* a particular pattern

        Inputs:
          pattern      string: regular expression
          s            string: to be searched

        Raises:
          AssertionError if not matched
        """
        if not re.search(pattern, s):
            raise AssertionError("string does not contain regexp\n"
                                 " string: %s\n"
                                 " re: %s" % (repr(s), repr(pattern)))

    def assert_no_file(self, filename):
        """Fail with AssertionError if filename exists."""
        import os.path
        # Raise explicitly: an assert statement would be stripped under -O.
        if os.path.exists(filename):
            raise AssertionError("file exists but should not: %s" % filename)

    #############################################################
    # Methods for running programs

    def runcmd_background(self, cmd):
        """Start a shell command in a forked child without waiting for it.

        Inputs:
          cmd          string: command line passed to /bin/sh -c

        Returns:
          pid of the child process
        """
        import os
        self.test_log = self.test_log + "Run in background:\n" + repr(cmd) + "\n"
        pid = os.fork()
        if pid == 0:
            # child
            try:
                os.execvp("/bin/sh", ["/bin/sh", "-c", cmd])
            finally:
                # Only reached if exec failed; never return into the
                # parent's code from the child.
                os._exit(127)
        self.test_log = self.test_log + "pid: %d\n" % pid
        return pid

    def runcmd(self, cmd, expectedResult = 0):
        """Run a command, fail if the command returns an unexpected exit
        code.

        Returns:
          (stdout, stderr) produced by the command

        Raises:
          AssertionError if the exit code differs from expectedResult
        """
        rc, output, stderr = self.runcmd_unchecked(cmd)
        if rc != expectedResult:
            raise AssertionError(
                "command returned %d; expected %s: \"%s\"\n"
                "stdout:\n"
                "%s\n"
                "stderr:\n"
                "%s" % (rc, expectedResult, cmd, output, stderr))
        return output, stderr

    def run_captured(self, cmd):
        """Run a command, capturing stdout and stderr.

        Based in part on popen2.py

        Inputs:
          cmd          a string (run through /bin/sh -c) or an argv
                       sequence (executed directly)

        The child redirects its output to '<pid>.out' and '<pid>.err' in
        the current directory; those files are read back by the parent and
        left in place.

        Returns (waitstatus, stdout, stderr)."""
        import os
        pid = os.fork()
        if pid == 0:
            # child
            try:
                pid = os.getpid()
                openmode = os.O_WRONLY | os.O_CREAT | os.O_TRUNC

                outfd = os.open('%d.out' % pid, openmode, 0o666)
                os.dup2(outfd, 1)
                os.close(outfd)

                errfd = os.open('%d.err' % pid, openmode, 0o666)
                os.dup2(errfd, 2)
                os.close(errfd)

                if isinstance(cmd, str):
                    cmd = ['/bin/sh', '-c', cmd]

                os.execvp(cmd[0], cmd)
            finally:
                # Only reached if setup or exec failed.
                os._exit(127)
        else:
            # parent
            exited_pid, waitstatus = os.waitpid(pid, 0)
            # Close the capture files promptly instead of leaking handles.
            with open('%d.out' % pid) as out_file:
                stdout = out_file.read()
            with open('%d.err' % pid) as err_file:
                stderr = err_file.read()
            return waitstatus, stdout, stderr

    def runcmd_unchecked(self, cmd, skip_on_noexec = 0):
        """Invoke a command; return (exitcode, stdout, stderr)

        The command, its wait status and its output are appended to the
        test log.

        Inputs:
          cmd             command as accepted by run_captured
          skip_on_noexec  if true, an exit code of 127 skips the test

        Raises:
          AssertionError if the command was terminated by a signal
          NotRunError    if skip_on_noexec is set and the exit code is 127
        """
        import os
        waitstatus, stdout, stderr = self.run_captured(cmd)
        # Raise explicitly: an assert statement would be stripped under -O.
        if os.WIFSIGNALED(waitstatus):
            raise AssertionError("%s terminated with signal %d"
                                 % (repr(cmd), os.WTERMSIG(waitstatus)))
        rc = os.WEXITSTATUS(waitstatus)
        self.test_log = self.test_log + (
            "Run command: %s\n"
            "Wait status: %#x (exit code %d, signal %d)\n"
            "stdout:\n"
            "%s\n"
            "stderr:\n"
            "%s" % (cmd, waitstatus, rc, os.WTERMSIG(waitstatus),
                    stdout, stderr))
        if skip_on_noexec and rc == 127:
            # Either we could not execute the command or the command
            # returned exit code 127.  According to system(3) we can't
            # tell the difference.
            raise NotRunError("could not execute %s" % repr(cmd))
        return rc, stdout, stderr

    def explain_failure(self, exc_info = None):
        """Print the accumulated test log to stdout.

        exc_info is accepted but not used by this base implementation.
        """
        print("test_log:")
        print(self.test_log)

    def log(self, msg):
        """Log a message to the test log.  This message is displayed if
        the test fails, or when the runtests function is invoked with
        the verbose option."""
        self.test_log = self.test_log + msg + "\n"
class NotRunError(Exception):
    """Raised if a test must be skipped because of missing resources.

    The explanation is available both as the 'value' attribute and through
    the normal exception machinery (str(), args, tracebacks).
    """
    def __init__(self, value = None):
        # Initialise the base class so the message is not lost when the
        # exception is printed; with no value, keep args empty so str()
        # stays '' rather than 'None'.
        if value is None:
            Exception.__init__(self)
        else:
            Exception.__init__(self, value)
        self.value = value
def _report_error(case, debugger):
"""Ask the test case to explain failure, and optionally run a debugger
Input:
case TestCase instance
debugger if true, a debugger function to be applied to the traceback
"""
import sys
ex = sys.exc_info()
print "-----------------------------------------------------------------"
if ex:
import traceback
traceback.print_exc(file=sys.stdout)
case.explain_failure()
print "-----------------------------------------------------------------"
if debugger:
tb = ex[2]
debugger(tb)
def runtests(test_list, verbose = 0, debugger = None):
    """Run a series of tests.

    Each class is instantiated, then setup() and runtest() are called;
    afterwards the cleanups queued on the instance are run in reverse
    order of registration, whether or not the test succeeded.

    Inputs:
      test_list    sequence of TestCase classes
      verbose      print more information as testing proceeds
      debugger     debugger object to be applied to errors

    Returns:
      unix return code: 0 if nothing failed (skipped tests do not count
      as failures), 1 if a test or one of its cleanups failed, 2 if the
      run was interrupted
    """
    ret = 0
    for test_class in test_list:
        # The trailing space separates the name from the result printed
        # on the same line below.
        sys.stdout.write("%-30s " % _test_name(test_class))
        # flush now so that long running tests are easier to follow
        sys.stdout.flush()

        obj = None
        interrupted = False
        try:
            try: # run test and show result
                obj = test_class()
                obj.setup()
                obj.runtest()
                print("OK")
            except KeyboardInterrupt:
                print("INTERRUPT")
                _report_error(obj, debugger)
                ret = 2
                break
            except NotRunError as msg:
                print("NOTRUN, %s" % msg.value)
            except:
                # Deliberately broad: whatever a test raises counts as a
                # failure of that test, not of the runner.
                print("FAIL")
                _report_error(obj, debugger)
                ret = 1
        finally:
            while obj and obj._cleanups:
                try:
                    cleanup = obj._cleanups.pop()
                    cleanup()
                except KeyboardInterrupt:
                    print("interrupted during teardown")
                    _report_error(obj, debugger)
                    ret = 2
                    interrupted = True
                    break
                except:
                    print("error during teardown")
                    _report_error(obj, debugger)
                    ret = 1
        if interrupted:
            # Treat an interrupt during teardown like one during the test:
            # stop the run instead of carrying on with the next test.
            break
        # Display log file if we're verbose.  obj is None when the
        # constructor raised (e.g. NotRunError), so there is no log to show.
        if ret == 0 and verbose and obj is not None:
            obj.explain_failure()

    return ret
def _test_name(test_class):
"""Return a human-readable name for a test class.
"""
try:
return test_class.__name__
except:
return `test_class`
def print_help():
    """Help for people running tests

    Prints the usage message to stdout, with the program name taken from
    sys.argv[0].
    """
    import sys
    # Single-argument print(...) is valid as both a Python 2 print
    # statement and a Python 3 function call.
    print("""%s: software test suite based on ComfyChair
usage:
To run all tests, just run this program. To run particular tests,
list them on the command line.
options:
--help show usage message
--list list available tests
--verbose, -v show more information while running tests
--post-mortem, -p enter Python debugger on error
""" % sys.argv[0])
def print_list(test_list):
    """Show list of available tests

    Inputs:
      test_list    sequence of test classes; one name is printed per line
    """
    for test_class in test_list:
        # Single-argument print(...) works on both Python 2 and Python 3.
        print(" %s" % _test_name(test_class))
def main(tests, extra_tests=None):
    """Main entry point for test suites based on ComfyChair.

    inputs:
      tests        Sequence of TestCase subclasses to be run by default.
      extra_tests  Sequence of TestCase subclasses that are available but
                   not run by default.

    Test suites should contain this boilerplate:

        if __name__ == '__main__':
            comfychair.main(tests)

    This function handles standard options such as --help and --list, and
    by default runs all tests in the suggested order.  Tests named on the
    command line are run instead of the default set, and may come from
    either tests or extra_tests.

    Calls sys.exit() with the result of runtests() once tests have been
    run, or with status 2 for an unrecognised option or test name.  After
    --help or --list it simply returns.
    """
    import getopt, sys

    # Avoid a shared mutable default, and accept any mix of sequence types.
    if extra_tests is None:
        extra_tests = []
    all_tests = list(tests) + list(extra_tests)

    opt_verbose = 0
    debugger = None

    try:
        opts, args = getopt.getopt(sys.argv[1:], 'pv',
                                   ['help', 'list', 'verbose', 'post-mortem'])
    except getopt.GetoptError as err:
        # Report a bad option as a usage error rather than a traceback.
        sys.stderr.write("%s: %s\n" % (sys.argv[0], err))
        print_help()
        sys.exit(2)

    for opt, opt_arg in opts:
        if opt == '--help':
            print_help()
            return
        elif opt == '--list':
            print_list(all_tests)
            return
        elif opt == '--verbose' or opt == '-v':
            opt_verbose = 1
        elif opt == '--post-mortem' or opt == '-p':
            import pdb
            debugger = pdb.post_mortem

    if args:
        by_name = {}
        for t in all_tests:
            by_name[_test_name(t)] = t
        unknown = [name for name in args if name not in by_name]
        if unknown:
            # Name the offending arguments instead of failing with KeyError.
            sys.stderr.write("%s: unknown test(s): %s\n"
                             % (sys.argv[0], ", ".join(unknown)))
            sys.exit(2)
        which_tests = [by_name[name] for name in args]
    else:
        which_tests = tests

    sys.exit(runtests(which_tests, verbose=opt_verbose,
                      debugger=debugger))
if __name__ == '__main__':
    # Running the framework module itself just shows its description;
    # single-argument print(...) works on both Python 2 and Python 3.
    print(__doc__)
|