1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 """
23 Framework for writing automated integration tests.
24
25 This module provides a way of writing automated integration tests from
26 within Twisted's unit testing framework, trial. Test cases are
27 constructed as subclasses of the normal trial
28 L{twisted.trial.unittest.TestCase} class.
29
30 Integration tests look like normal test methods, except that they are
31 decorated with L{integration.test}, take an extra "plan" argument, and
32 do not return anything. For example::
33
34     from twisted.trial import unittest
35     from flumotion.twisted import integration
36
37     class IntegrationTestExample(unittest.TestCase):
38         @integration.test
39         def testEchoFunctionality(self, plan):
40             process = plan.spawn('echo', 'hello world')
41             plan.wait(process, 0)
42
43 This example will spawn a process, as if you typed "echo 'hello world'"
44 at the shell prompt. It then waits for the process to exit, expecting
45 the exit status to be 0.
46
47 The example illustrates two of the fundamental plan operators, spawn and
48 wait. "spawn" spawns a process. "wait" waits for a process to finish.
49 The other operators are "spawnPar", which spawns a number of processes
50 in parallel, "waitPar", which waits for a number of processes in
51 parallel, and "kill", which kills one or more processes via SIGTERM and
52 then waits for them to exit.
53
54 It is evident that this framework is most appropriate for testing the
55 integration of multiple processes, and is not suitable for in-process
56 tests. The plan that is built up is only executed after the test method
57 exits, via the L{integration.test} decorator; the writer of the
58 integration test does not have access to the plan's state.
59
60 Note that all process exits must be anticipated. If at any point the
61 integration tester receives SIGCHLD, the next operation must be a wait
62 for that process. If this is not the case, the test is interpreted as
63 having failed.
64
65 Also note that while the test is running, the stdout and stderr of each
66 spawned process are redirected into log files in a subdirectory of where
67 the test is located. For example, in the previous example, the following
68 files will be created::
69
70     $testdir/IntegrationTestExample-$date/testEchoFunctionality/echo.stdout
71     $testdir/IntegrationTestExample-$date/testEchoFunctionality/echo.stderr
72
73 In the case that multiple echo commands are run in the same plan, the
74 subsequent commands will be named as echo-1, echo-2, and the like. Upon
75 successful completion of the test case, the log directory will be
76 deleted.
77 """
78
79 import os
80 import signal
81
82 from twisted.python import failure
83 from twisted.internet import reactor, protocol, defer
84 from flumotion.common import log as flog
85
86 __version__ = "$Rev: 6561 $"
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
# Monkeypatch the reactor: replace reactor.wakeUp with a callable that
# invokes reactor.waker.wakeUp() whenever reactor.waker is set, and does
# nothing (returning the falsy waker) otherwise.
# NOTE(review): the reason for overriding Twisted's own wakeUp is not
# visible here -- presumably related to signal (SIGCHLD) handling; confirm.
109 reactor.wakeUp = lambda: reactor.waker and reactor.waker.wakeUp()
110
111 -def log(format, *args):
113 -def debug(format, *args):
115 -def info(format, *args):
119 -def error(format, *args):
121
123 if os.sep in executable:
124 if os.access(os.path.abspath(executable), os.X_OK):
125 return os.path.abspath(executable)
126 elif os.getenv('PATH'):
127 for path in os.getenv('PATH').split(os.pathsep):
128 if os.access(os.path.join(path, executable), os.X_OK):
129 return os.path.join(path, executable)
130 raise CommandNotFoundException(executable)
131
133 - def __init__(self, process, expectedCode, actualCode):
134 Exception.__init__(self)
135 self.process = process
136 self.expected = expectedCode
137 self.actual = actualCode
139 return ('Expected exit code %r from %r, but got %r'
140 % (self.expected, self.process, self.actual))
141
147 return 'The process %r exited prematurely.' % self.process
148
154 return 'Command %r not found in the PATH.' % self.command
155
158 Exception.__init__(self)
159 self.processes = processes
161 return ('Processes still running at end of test: %r'
162 % (self.processes,))
163
168
170 return ('Timed out waiting for %r to exit with status %r'
171 % (self.process, self.status))
172
175 self.exitDeferred = defer.Deferred()
176 self.timedOut = False
177
179 return self.exitDeferred
180
181 - def timeout(self, process, status):
185
187 info('process ended with status %r, exit code %r', status, status.value.exitCode)
188 if self.timedOut:
189 warning('already timed out??')
190 print 'already timed out quoi?'
191 else:
192 info('process ended with status %r, exit code %r', status, status.value.exitCode)
193 self.exitDeferred.callback(status.value.exitCode)
194
196 NOT_STARTED, STARTED, STOPPED = 'NOT-STARTED', 'STARTED', 'STOPPED'
197
def __init__(self, name, argv, testDir):
    """
    Describe a child process that has not been started yet.

    @param name:    label for the process; used in its repr and as the
                    stem of the log file names opened when it starts.
    @param argv:    command line as a sequence of strings. The first
                    item is resolved through L{_which}; the rest are
                    kept as arguments. Any sequence is accepted, not
                    only a tuple.
    @param testDir: directory in which the process's log files are
                    created when it starts.

    Any exception raised by L{_which} for the command propagates to
    the caller.
    """
    self.name = name
    # tuple() keeps the concatenation valid when argv is a list; for a
    # tuple argv the result is unchanged.
    self.argv = (_which(argv[0]),) + tuple(argv[1:])
    self.testDir = testDir

    # Runtime state, filled in once the process is started.
    self.pid = None
    self.protocol = None
    self.state = self.NOT_STARTED
    self._timeoutDC = None

    log('created process object %r', self)
209
211 assert self.state == self.NOT_STARTED
212
213 self.protocol = ProcessProtocol()
214
215 stdout = open(os.path.join(self.testDir, self.name + '.stdout'), 'w')
216 stderr = open(os.path.join(self.testDir, self.name + '.stderr'), 'w')
217
218 childFDs = {1: stdout.fileno(), 2: stderr.fileno()}
219
220
221
222
223
224
225
226
227
228
229
230
231 info('spawning process %r, argv=%r', self, self.argv)
232 termHandler = signal.signal(signal.SIGTERM, signal.SIG_DFL)
233 env = dict(os.environ)
234 env['FLU_DEBUG'] = '5'
235 process = reactor.spawnProcess(self.protocol, self.argv[0],
236 env=env, args=self.argv,
237 childFDs=childFDs)
238 signal.signal(signal.SIGTERM, termHandler)
239
240 stdout.close()
241 stderr.close()
242
243
244
245
246 self.pid = process.pid
247 self.state = self.STARTED
248
249 def got_exit(res):
250 self.state = self.STOPPED
251 info('process %r has stopped', self)
252 return res
253 self.protocol.getDeferred().addCallback(got_exit)
254
def kill(self, sig=signal.SIGTERM):
    """
    Deliver a signal to the child process.

    The process must currently be in the STARTED state.

    @param sig: signal number to send; defaults to SIGTERM.
    """
    assert self.state == self.STARTED
    target = self.pid
    info('killing process %r, signal %d', self, sig)
    os.kill(target, sig)
259
260 - def wait(self, status, timeout=20):
270 d.addCallback(got_exit)
271 if self.state == self.STARTED:
272 self._timeoutDC = reactor.callLater(timeout,
273 self.protocol.timeout,
274 self,
275 status)
276 def cancel_timeout(res):
277 debug('cancelling timeout for %r', self)
278 if self._timeoutDC.active():
279 self._timeoutDC.cancel()
280 return res
281 d.addCallbacks(cancel_timeout, cancel_timeout)
282 return d
283
285 return '<Process %s in state %s>' % (self.name, self.state)
286
288
289
291 self.processes = []
292 self.timeout = 20
293
294 - def spawn(self, process):
299
305
306 - def kill(self, process):
310
def wait(self, process, exitCode):
    """
    Wait for a tracked process to exit and then stop tracking it.

    @param process:  a process currently present in C{self.processes}.
    @param exitCode: status handed to C{process.wait}.
    @returns: the deferred obtained from C{process.wait}, with a
              callback appended that removes the process from
              C{self.processes}.
    """
    assert process in self.processes
    exited = process.wait(exitCode, timeout=self.timeout)
    # list.remove() returns None, so the callback result is None just
    # like an explicit helper without a return statement.
    exited.addCallback(lambda _: self.processes.remove(process))
    return exited
318
333 p.protocol.processEnded = callbacker(d)
334 p.kill(sig=signal.SIGKILL)
335 d = defer.DeferredList(dlist)
336 def error(_):
337 if failure:
338 return failure
339 else:
340 raise e
341 d.addCallback(error)
342 return d
343 return failure
344
def run(self, ops, timeout=20):
    """
    Run a sequence of plan operations one after another.

    @param ops:     iterable of sequences; the first item of each is a
                    callable and the remaining items are its arguments.
    @param timeout: stored on C{self.timeout} before the ops run.
    @returns: a deferred that, after every op has run or one of them
              has failed, continues with the result of
              C{self._checkProcesses}.
    """
    self.timeout = timeout

    def invoke(_, op):
        # The previous op's result is ignored; only the op itself matters.
        func, args = op[0], op[1:]
        return func(*args)

    def finished(_):
        return self._checkProcesses(failure=None)

    def failed(reason):
        return self._checkProcesses(failure=reason)

    chain = defer.Deferred()
    for op in ops:
        chain.addCallback(invoke, op)
    chain.addCallbacks(finished, failed)

    # Start the chain from a zero-delay callLater rather than firing it
    # synchronously inside this call.
    reactor.callLater(0, chain.callback, None)
    return chain
363
def __init__(self, testCase, testName):
    """
    Create an empty plan for one test method.

    @param testCase: the test case instance; only its class name is
                     recorded.
    @param testName: name of the test method the plan belongs to.

    An output directory is created via C{self._makeOutputDir}, based on
    the current working directory, as part of construction.
    """
    self.name = testName
    self.testCaseName = testCase.__class__.__name__
    self.processes = {}

    # The directory name is built from the two names assigned above.
    workingDir = os.getcwd()
    self.outputDir = self._makeOutputDir(workingDir)

    self.vm = PlanExecutor()
    self.ops = []
    self.timeout = 20
376
378
379 try:
380 os.mkdir(testDir)
381 except OSError:
382 pass
383 tail = '%s-%s' % (self.testCaseName, self.name)
384 outputDir = os.path.join(testDir, tail)
385 os.mkdir(outputDir)
386 return outputDir
387
389 for root, dirs, files in os.walk(self.outputDir, topdown=False):
390 for name in files:
391 os.remove(os.path.join(root, name))
392 for name in dirs:
393 os.rmdir(os.path.join(root, name))
394 os.rmdir(self.outputDir)
395 self.outputDir = None
396
407
410
413
414 - def spawn(self, command, *args):
418
420 processes = []
421 self._appendOp(self.vm.checkExits, ())
422 for argv in argvs:
423 assert isinstance(argv, tuple), \
424 'all arguments to spawnPar must be tuples'
425 for arg in argv:
426 assert isinstance(arg, str), \
427 'all subarguments to spawnPar must be strings'
428 processes.append(self._allocProcess(argv))
429 for process in processes:
430 self._appendOp(self.vm.spawn, process)
431 return tuple(processes)
432
433 - def wait(self, process, status):
435
436 - def waitPar(self, *processStatusPairs):
441
442 - def kill(self, process, status=None):
446
451
453 testName = proc.__name__
454 def wrappedtest(self):
455 plan = Plan(self, testName)
456 proc(self, plan)
457 return plan.execute()
458 try:
459 wrappedtest.__name__ = testName
460 except TypeError:
461
462 pass
463
464
465 wrappedtest.timeout = 666
466 return wrappedtest
467