Package flumotion :: Package twisted :: Module integration
[hide private]

Source Code for Module flumotion.twisted.integration

  1  # -*- Mode: Python -*- 
  2  # vi:si:et:sw=4:sts=4:ts=4 
  3  # 
  4  # Flumotion - a streaming media server 
  5  # Copyright (C) 2004,2005,2006,2007 Fluendo, S.L. (www.fluendo.com). 
  6  # All rights reserved. 
  7   
  8  # This file may be distributed and/or modified under the terms of 
  9  # the GNU General Public License version 2 as published by 
 10  # the Free Software Foundation. 
 11  # This file is distributed without any warranty; without even the implied 
 12  # warranty of merchantability or fitness for a particular purpose. 
 13  # See "LICENSE.GPL" in the source distribution for more information. 
 14   
 15  # Licensees having purchased or holding a valid Flumotion Advanced 
 16  # Streaming Server license may use this file in accordance with the 
 17  # Flumotion Advanced Streaming Server Commercial License Agreement. 
 18  # See "LICENSE.Flumotion" in the source distribution for more information. 
 19   
 20  # Headers in this file shall remain intact. 
 21   
 22  """ 
 23  Framework for writing automated integration tests. 
 24   
 25  This module provides a way of writing automated integration tests from 
 26  within Twisted's unit testing framework, trial. Test cases are 
 27  constructed as subclasses of the normal trial 
 28  L{twisted.trial.unittest.TestCase} class. 
 29   
 30  Integration tests look like normal test methods, except that they are 
 31  decorated with L{integration.test}, take an extra "plan" argument, and 
 32  do not return anything. For example:: 
 33   
 34    from twisted.trial import unittest 
 35    from flumotion.twisted import integration 
 36   
 37    class IntegrationTestExample(unittest.TestCase): 
 38        @integration.test 
 39        def testEchoFunctionality(self, plan): 
 40            process = plan.spawn('echo', 'hello world') 
 41            plan.wait(process, 0) 
 42   
 43  This example will spawn a process, as if you typed "echo 'hello world'" 
 44  at the shell prompt. It then waits for the process to exit, expecting 
 45  the exit status to be 0. 
 46   
 47  The example illustrates two of the fundamental plan operators, spawn and 
 48  wait. "spawn" spawns a process. "wait" waits for a process to finish. 
 49  The other operators are "spawnPar", which spawns a number of processes 
 50  in parallel, "waitPar", which waits for a number of processes in 
 51  parallel, and "kill", which kills one or more processes via SIGTERM and 
 52  then waits for them to exit. 
 53   
 54  It is evident that this framework is most appropriate for testing the 
 55  integration of multiple processes, and is not suitable for in-process 
 56  tests. The plan that is built up is only executed after the test method 
 57  exits, via the L{integration.test} decorator; the writer of the 
 58  integration test does not have access to the plan's state. 
 59   
 60  Note that all process exits must be anticipated. If at any point the 
 61  integration tester receives SIGCHLD, the next operation must be a wait 
 62  for that process. If this is not the case, the test is interpreted as 
 63  having failed. 
 64   
 65  Also note that while the test is running, the stdout and stderr of each 
 66  spawned process is redirected into log files in a subdirectory of where 
 67  the test is located. For example, in the previous example, the following 
 68  files will be created:: 
 69   
 70    $testdir/IntegrationTestExample-$date/testEchoFunctionality/echo.stdout 
 71    $testdir/IntegrationTestExample-$date/testEchoFunctionality/echo.stderr 
 72   
 73  In the case that multiple echo commands are run in the same plan, the 
 74  subsequent commands will be named as echo-1, echo-2, and the like. Upon 
 75  successful completion of the test case, the log directory will be 
 76  deleted. 
 77  """ 
 78   
 79  import os 
 80  import signal 
 81   
 82  from twisted.python import failure 
 83  from twisted.internet import reactor, protocol, defer 
 84  from flumotion.common import log as flog 
 85   
 86  __version__ = "$Rev: 6561 $" 
 87   
 88   
 89  # Twisted's reactor.iterate() is defined like this: 
 90  # 
 91  #     def iterate(self, delay=0): 
 92  #        """See twisted.internet.interfaces.IReactorCore.iterate. 
 93  #        """ 
 94  #        self.runUntilCurrent() 
 95  #        self.doIteration(delay) 
 96  # 
 97  # runUntilCurrent runs all the procs on the threadCallQueue. So if 
 98  # something is added to the threadCallQueue between runUntilCurrent() 
 99  # and doIteration(), the reactor needs to have an fd ready for reading 
100  # to shortcut the select(). This is done by callFromThread() calling 
101  # reactor.wakeUp(), which will write on the wakeup FD. 
102  # 
103  # HOWEVER. For some reason reactor.wakeUp() only writes on the fd if it 
104  # is being called from another thread. This is obviously borked in the 
105  # signal-handling case, when a signal arrives between runUntilCurrent() 
106  # and doIteration(), and is processed via reactor.callFromThread(), as 
107  # is the case with SIGCHLD. So we monkeypatch the reactor to always wake 
108  # the waker. This is twisted bug #1997. 
109  reactor.wakeUp = lambda: reactor.waker and reactor.waker.wakeUp() 
110   
111 -def log(format, *args):
112 flog.doLog(flog.LOG, None, 'integration', format, args, -2)
113 -def debug(format, *args):
114 flog.doLog(flog.DEBUG, None, 'integration', format, args, -2)
115 -def info(format, *args):
116 flog.doLog(flog.INFO, None, 'integration', format, args, -2)
117 -def warning(format, *args):
118 flog.doLog(flog.WARN, None, 'integration', format, args, -2)
119 -def error(format, *args):
120 flog.doLog(flog.ERROR, None, 'integration', format, args, -2)
121
122 -def _which(executable):
123 if os.sep in executable: 124 if os.access(os.path.abspath(executable), os.X_OK): 125 return os.path.abspath(executable) 126 elif os.getenv('PATH'): 127 for path in os.getenv('PATH').split(os.pathsep): 128 if os.access(os.path.join(path, executable), os.X_OK): 129 return os.path.join(path, executable) 130 raise CommandNotFoundException(executable)
131
132 -class UnexpectedExitCodeException(Exception):
133 - def __init__(self, process, expectedCode, actualCode):
134 Exception.__init__(self) 135 self.process = process 136 self.expected = expectedCode 137 self.actual = actualCode
138 - def __str__(self):
139 return ('Expected exit code %r from %r, but got %r' 140 % (self.expected, self.process, self.actual))
141
142 -class UnexpectedExitException(Exception):
143 - def __init__(self, process):
144 Exception.__init__(self) 145 self.process = process
146 - def __str__(self):
147 return 'The process %r exited prematurely.' % self.process
148
149 -class CommandNotFoundException(Exception):
150 - def __init__(self, command):
151 Exception.__init__(self) 152 self.command = command
153 - def __str__(self):
154 return 'Command %r not found in the PATH.' % self.command
155
156 -class ProcessesStillRunningException(Exception):
157 - def __init__(self, processes):
158 Exception.__init__(self) 159 self.processes = processes
160 - def __str__(self):
161 return ('Processes still running at end of test: %r' 162 % (self.processes,))
163
164 -class TimeoutException(Exception):
165 - def __init__(self, process, status):
166 self.process = process 167 self.status = status
168
169 - def __str__(self):
170 return ('Timed out waiting for %r to exit with status %r' 171 % (self.process, self.status))
172
173 -class ProcessProtocol(protocol.ProcessProtocol):
174 - def __init__(self):
175 self.exitDeferred = defer.Deferred() 176 self.timedOut = False
177
178 - def getDeferred(self):
179 return self.exitDeferred
180
181 - def timeout(self, process, status):
182 info('forcing timeout for process protocol %r', self) 183 self.timedOut = True 184 self.exitDeferred.errback(TimeoutException(process, status))
185
186 - def processEnded(self, status):
187 info('process ended with status %r, exit code %r', status, status.value.exitCode) 188 if self.timedOut: 189 warning('already timed out??') 190 print 'already timed out quoi?' 191 else: 192 info('process ended with status %r, exit code %r', status, status.value.exitCode) 193 self.exitDeferred.callback(status.value.exitCode)
194
195 -class Process:
196 NOT_STARTED, STARTED, STOPPED = 'NOT-STARTED', 'STARTED', 'STOPPED' 197
198 - def __init__(self, name, argv, testDir):
199 self.name = name 200 self.argv = (_which(argv[0]),) + argv[1:] 201 self.testDir = testDir 202 203 self.pid = None 204 self.protocol = None 205 self.state = self.NOT_STARTED 206 self._timeoutDC = None 207 208 log('created process object %r', self)
209
210 - def start(self):
211 assert self.state == self.NOT_STARTED 212 213 self.protocol = ProcessProtocol() 214 215 stdout = open(os.path.join(self.testDir, self.name + '.stdout'), 'w') 216 stderr = open(os.path.join(self.testDir, self.name + '.stderr'), 'w') 217 # don't give it a stdin, output to log files 218 childFDs = {1: stdout.fileno(), 2: stderr.fileno()} 219 # There's a race condition in twisted.internet.process, whereby 220 # signals received between the fork() and exec() in the child 221 # are handled with the twisted handlers, i.e. postponed, but 222 # they never get called because of the exec(). The end is they 223 # are ignored. 224 # 225 # So, work around that by resetting the sigterm handler to the 226 # default so if we self.kill() immediately after self.start(), 227 # that the subprocess won't ignore the signal. This is a window 228 # in the parent in which SIGTERM will cause immediate 229 # termination instead of the twisted nice termination, but 230 # that's better than the kid missing the signal. 231 info('spawning process %r, argv=%r', self, self.argv) 232 termHandler = signal.signal(signal.SIGTERM, signal.SIG_DFL) 233 env = dict(os.environ) 234 env['FLU_DEBUG'] = '5' 235 process = reactor.spawnProcess(self.protocol, self.argv[0], 236 env=env, args=self.argv, 237 childFDs=childFDs) 238 signal.signal(signal.SIGTERM, termHandler) 239 # close our handles on the log files 240 stdout.close() 241 stderr.close() 242 243 # it's possible the process *already* exited, from within the 244 # spawnProcess itself. So set our state to STARTED, *then* 245 # attach the callback. 246 self.pid = process.pid 247 self.state = self.STARTED 248 249 def got_exit(res): 250 self.state = self.STOPPED 251 info('process %r has stopped', self) 252 return res
253 self.protocol.getDeferred().addCallback(got_exit)
254
255 - def kill(self, sig=signal.SIGTERM):
256 assert self.state == self.STARTED 257 info('killing process %r, signal %d', self, sig) 258 os.kill(self.pid, sig)
259
260 - def wait(self, status, timeout=20):
261 assert self.state != self.NOT_STARTED 262 info('waiting for process %r to exit', self) 263 d = self.protocol.getDeferred() 264 def got_exit(res): 265 debug('process %r exited with status %r', self, res) 266 if res != status: 267 warning('expected exit code %r for process %r, but got %r', 268 status, self, res) 269 raise UnexpectedExitCodeException(self, status, res)
270 d.addCallback(got_exit) 271 if self.state == self.STARTED: 272 self._timeoutDC = reactor.callLater(timeout, 273 self.protocol.timeout, 274 self, 275 status) 276 def cancel_timeout(res): 277 debug('cancelling timeout for %r', self) 278 if self._timeoutDC.active(): 279 self._timeoutDC.cancel() 280 return res 281 d.addCallbacks(cancel_timeout, cancel_timeout) 282 return d 283
284 - def __repr__(self):
285 return '<Process %s in state %s>' % (self.name, self.state)
286
287 -class PlanExecutor:
288 # both the vm and its ops 289
290 - def __init__(self):
291 self.processes = [] 292 self.timeout = 20
293
294 - def spawn(self, process):
295 assert process not in self.processes 296 self.processes.append(process) 297 process.start() 298 return defer.succeed(True)
299
300 - def checkExits(self, expectedExits):
301 for process in self.processes: 302 if (process.state != process.STARTED 303 and process not in expectedExits): 304 raise UnexpectedExitException(process)
305
306 - def kill(self, process):
307 assert process in self.processes 308 process.kill() 309 return defer.succeed(True)
310
311 - def wait(self, process, exitCode):
312 assert process in self.processes 313 def remove_from_processes_list(_): 314 self.processes.remove(process)
315 d = process.wait(exitCode, timeout=self.timeout) 316 d.addCallback(remove_from_processes_list) 317 return d
318
319 - def _checkProcesses(self, failure=None):
320 if self.processes: 321 warning('processes still running at end of test: %r', 322 self.processes) 323 e = ProcessesStillRunningException(self.processes) 324 dlist = [] 325 # reap all processes, and once we have them reaped, errback 326 for p in self.processes: 327 if p.state != p.STARTED: 328 continue 329 d = defer.Deferred() 330 dlist.append(d) 331 def callbacker(d): 332 return lambda status: d.callback(status.value.exitCode)
333 p.protocol.processEnded = callbacker(d) 334 p.kill(sig=signal.SIGKILL) 335 d = defer.DeferredList(dlist) 336 def error(_): 337 if failure: 338 return failure 339 else: 340 raise e 341 d.addCallback(error) 342 return d 343 return failure 344
345 - def run(self, ops, timeout=20):
346 self.timeout = timeout 347 d = defer.Deferred() 348 def run_op(_, op): 349 # print 'Last result: %r' % (_,) 350 # print 'Now running: %s(%r)' % (op[0].__name__, op[1:]) 351 return op[0](*op[1:])
352 for op in ops: 353 d.addCallback(run_op, op) 354 d.addCallbacks(lambda _: self._checkProcesses(failure=None), 355 lambda failure: self._checkProcesses(failure=failure)) 356 357 # We should only spawn processes when twisted has set up its 358 # sighandlers. It does that *after* firing the reactor startup 359 # event and before entering the reactor loop. So, make sure 360 # twisted is ready for us by firing the plan in a callLater. 361 reactor.callLater(0, d.callback, None) 362 return d 363
364 -class Plan:
365 - def __init__(self, testCase, testName):
366 self.name = testName 367 self.testCaseName = testCase.__class__.__name__ 368 self.processes = {} 369 self.outputDir = self._makeOutputDir(os.getcwd()) 370 371 # put your boots on monterey jacks, cause this gravy just made a 372 # virtual machine whose instructions are python methods 373 self.vm = PlanExecutor() 374 self.ops = [] 375 self.timeout = 20
376
377 - def _makeOutputDir(self, testDir):
378 # ensure that testDir exists 379 try: 380 os.mkdir(testDir) 381 except OSError: 382 pass 383 tail = '%s-%s' % (self.testCaseName, self.name) 384 outputDir = os.path.join(testDir, tail) 385 os.mkdir(outputDir) 386 return outputDir
387
388 - def _cleanOutputDir(self):
389 for root, dirs, files in os.walk(self.outputDir, topdown=False): 390 for name in files: 391 os.remove(os.path.join(root, name)) 392 for name in dirs: 393 os.rmdir(os.path.join(root, name)) 394 os.rmdir(self.outputDir) 395 self.outputDir = None
396
397 - def _allocProcess(self, args):
398 command = args[0] 399 name = command 400 i = 0 401 while name in self.processes: 402 i += 1 403 name = '%s-%d' % (command, i) 404 process = Process(name, args, self.outputDir) 405 self.processes[name] = process 406 return process
407
408 - def _appendOp(self, *args):
409 self.ops.append(args)
410
411 - def setTimeout(self, timeout):
412 self.timeout = timeout
413
414 - def spawn(self, command, *args):
415 allArgs = (command,) + args 416 process, = self.spawnPar(allArgs) 417 return process
418
419 - def spawnPar(self, *argvs):
420 processes = [] 421 self._appendOp(self.vm.checkExits, ()) 422 for argv in argvs: 423 assert isinstance(argv, tuple), \ 424 'all arguments to spawnPar must be tuples' 425 for arg in argv: 426 assert isinstance(arg, str), \ 427 'all subarguments to spawnPar must be strings' 428 processes.append(self._allocProcess(argv)) 429 for process in processes: 430 self._appendOp(self.vm.spawn, process) 431 return tuple(processes)
432
433 - def wait(self, process, status):
434 self.waitPar((process, status))
435
436 - def waitPar(self, *processStatusPairs):
437 processes = tuple([p for p,s in processStatusPairs]) 438 self._appendOp(self.vm.checkExits, processes) 439 for process, status in processStatusPairs: 440 self._appendOp(self.vm.wait, process, status)
441
442 - def kill(self, process, status=None):
443 self._appendOp(self.vm.checkExits, ()) 444 self._appendOp(self.vm.kill, process) 445 self._appendOp(self.vm.wait, process, status)
446
447 - def execute(self):
448 d = self.vm.run(self.ops, timeout=self.timeout) 449 d.addCallback(lambda _: self._cleanOutputDir()) 450 return d
451
452 -def test(proc):
453 testName = proc.__name__ 454 def wrappedtest(self): 455 plan = Plan(self, testName) 456 proc(self, plan) 457 return plan.execute()
458 try: 459 wrappedtest.__name__ = testName 460 except TypeError: 461 # can only set procedure names in python >= 2.4 462 pass 463 # trial seems to require a timeout, at least in twisted 2.4, so give 464 # it a nice one 465 wrappedtest.timeout = 666 466 return wrappedtest 467