Package flumotion :: Package twisted :: Module integration
[hide private]

Source Code for Module flumotion.twisted.integration

  1  # -*- Mode: Python -*- 
  2  # vi:si:et:sw=4:sts=4:ts=4 
  3  # 
  4  # Flumotion - a streaming media server 
  5  # Copyright (C) 2004,2005,2006,2007 Fluendo, S.L. (www.fluendo.com). 
  6  # All rights reserved. 
  7   
  8  # This file may be distributed and/or modified under the terms of 
  9  # the GNU General Public License version 2 as published by 
 10  # the Free Software Foundation. 
 11  # This file is distributed without any warranty; without even the implied 
 12  # warranty of merchantability or fitness for a particular purpose. 
 13  # See "LICENSE.GPL" in the source distribution for more information. 
 14   
 15  # Licensees having purchased or holding a valid Flumotion Advanced 
 16  # Streaming Server license may use this file in accordance with the 
 17  # Flumotion Advanced Streaming Server Commercial License Agreement. 
 18  # See "LICENSE.Flumotion" in the source distribution for more information. 
 19   
 20  # Headers in this file shall remain intact. 
 21   
 22  import os 
 23  import signal 
 24   
 25  from twisted.python import failure 
 26  import twisted.copyright 
 27  from twisted.internet import reactor, protocol, defer 
 28  from flumotion.common import log as flog 
 29   
 30  """ 
 31  Framework for writing automated integration tests. 
 32   
 33  This module provides a way of writing automated integration tests from 
 34  within Twisted's unit testing framework, trial. Test cases are 
 35  constructed as subclasses of the normal trial 
 36  L{twisted.trial.unittest.TestCase} class. 
 37   
 38  Integration tests look like normal test methods, except that they are 
 39  decorated with L{integration.test}, take an extra "plan" argument, and 
 40  do not return anything. For example: 
 41   
 42    from twisted.trial import unittest 
 43    from flumotion.twisted import integration 
 44   
 45    class IntegrationTestExample(unittest.TestCase): 
 46        @integration.test 
 47        def testEchoFunctionality(self, plan): 
 48            process = plan.spawn('echo', 'hello world') 
 49            plan.wait(process, 0) 
 50   
 51  This example will spawn a process, as if you typed "echo 'hello world'" 
 52  at the shell prompt. It then waits for the process to exit, expecting 
 53  the exit status to be 0. 
 54   
 55  The example illustrates two of the fundamental plan operators, spawn and 
 56  wait. "spawn" spawns a process. "wait" waits for a process to finish. 
 57  The other operators are "spawnPar", which spawns a number of processes 
 58  in parallel, "waitPar", which waits for a number of processes in 
 59  parallel, and "kill", which kills one or more processes via SIGTERM and 
 60  then waits for them to exit. 
 61   
 62  It is evident that this framework is most appropriate for testing the 
 63  integration of multiple processes, and is not suitable for in-process 
 64  tests. The plan that is built up is only executed after the test method 
 65  exits, via the L{integration.test} decorator; the writer of the 
 66  integration test does not have access to the plan's state. 
 67   
 68  Note that all process exits must be anticipated. If at any point the 
 69  integration tester receives SIGCHLD, the next operation must be a wait 
 70  for that process. If this is not the case, the test is interpreted as 
 71  having failed. 
 72   
 73  Also note that while the test is running, the stdout and stderr of each 
 74  spawned process are redirected into log files in a subdirectory of where
 75  the test is located. For example, in the previous example, the following 
 76  files will be created: 
 77   
 78    $testdir/IntegrationTestExample-testEchoFunctionality/echo.stdout
 79    $testdir/IntegrationTestExample-testEchoFunctionality/echo.stderr
 80   
 81  In the case that multiple echo commands are run in the same plan, the 
 82  subsequent commands will be named as echo-1, echo-2, and the like. Upon 
 83  successful completion of the test case, the log directory will be 
 84  deleted. 
 85  """ 
 86   
 87  # Twisted's reactor.iterate() is defined like this: 
 88  # 
 89  #     def iterate(self, delay=0): 
 90  #        """See twisted.internet.interfaces.IReactorCore.iterate. 
 91  #        """ 
 92  #        self.runUntilCurrent() 
 93  #        self.doIteration(delay) 
 94  # 
 95  # runUntilCurrent runs all the procs on the threadCallQueue. So if 
 96  # something is added to the threadCallQueue between runUntilCurrent() 
 97  # and doIteration(), the reactor needs to have an fd ready for reading 
 98  # to shortcut the select(). This is done by callFromThread() calling 
 99  # reactor.wakeUp(), which will write on the wakeup FD. 
100  # 
101  # HOWEVER. For some reason reactor.wakeUp() only writes on the fd if it 
102  # is being called from another thread. This is obviously borked in the 
103  # signal-handling case, when a signal arrives between runUntilCurrent() 
104  # and doIteration(), and is processed via reactor.callFromThread(), as 
105  # is the case with SIGCHLD. So we monkeypatch the reactor to always wake 
106  # the waker. This is twisted bug #1997. 
# Replace reactor.wakeUp so that it pokes the waker whenever one exists,
# regardless of the calling thread (workaround for twisted bug #1997,
# described above). This is a module-level side effect of importing this
# module.
reactor.wakeUp = lambda: reactor.waker and reactor.waker.wakeUp()
108   
def log(format, *args):
    """
    Log a message at LOG level in the 'integration' category.

    @param format: format string for the message
    @param args:   arguments for the format string, handed to
                   flog.doLog as a tuple
    """
    # NOTE(review): the trailing -2 is forwarded to flog.doLog unchanged;
    # presumably a stack-frame offset identifying our caller -- confirm
    # against flumotion.common.log before restructuring this call.
    flog.doLog(flog.LOG, None, 'integration', format, args, -2)
def debug(format, *args):
    """
    Log a message at DEBUG level in the 'integration' category.

    @param format: format string for the message
    @param args:   arguments for the format string, handed to
                   flog.doLog as a tuple
    """
    # NOTE(review): the trailing -2 is forwarded to flog.doLog unchanged;
    # presumably a stack-frame offset identifying our caller -- confirm
    # against flumotion.common.log before restructuring this call.
    flog.doLog(flog.DEBUG, None, 'integration', format, args, -2)
def info(format, *args):
    """
    Log a message at INFO level in the 'integration' category.

    @param format: format string for the message
    @param args:   arguments for the format string, handed to
                   flog.doLog as a tuple
    """
    # NOTE(review): the trailing -2 is forwarded to flog.doLog unchanged;
    # presumably a stack-frame offset identifying our caller -- confirm
    # against flumotion.common.log before restructuring this call.
    flog.doLog(flog.INFO, None, 'integration', format, args, -2)
def warning(format, *args):
    """
    Log a message at WARN level in the 'integration' category.

    @param format: format string for the message
    @param args:   arguments for the format string, handed to
                   flog.doLog as a tuple
    """
    # NOTE(review): the trailing -2 is forwarded to flog.doLog unchanged;
    # presumably a stack-frame offset identifying our caller -- confirm
    # against flumotion.common.log before restructuring this call.
    flog.doLog(flog.WARN, None, 'integration', format, args, -2)
def error(format, *args):
    """
    Log a message at ERROR level in the 'integration' category.

    @param format: format string for the message
    @param args:   arguments for the format string, handed to
                   flog.doLog as a tuple
    """
    # NOTE(review): the trailing -2 is forwarded to flog.doLog unchanged;
    # presumably a stack-frame offset identifying our caller -- confirm
    # against flumotion.common.log before restructuring this call.
    flog.doLog(flog.ERROR, None, 'integration', format, args, -2)
119
120 -def _which(executable):
121 if os.sep in executable: 122 if os.access(os.path.abspath(executable), os.X_OK): 123 return os.path.abspath(executable) 124 elif os.getenv('PATH'): 125 for path in os.getenv('PATH').split(os.pathsep): 126 if os.access(os.path.join(path, executable), os.X_OK): 127 return os.path.join(path, executable) 128 raise CommandNotFoundException(executable)
129
class UnexpectedExitCodeException(Exception):
    """
    A process exited with a different exit code than the one expected.

    @ivar process:  the process that exited
    @ivar expected: the exit code that was expected
    @ivar actual:   the exit code the process actually returned
    """

    def __init__(self, process, expectedCode, actualCode):
        Exception.__init__(self)
        self.process, self.expected, self.actual = (
            process, expectedCode, actualCode)

    def __str__(self):
        details = (self.expected, self.process, self.actual)
        return 'Expected exit code %r from %r, but got %r' % details
139
class UnexpectedExitException(Exception):
    """
    A process exited at a point where its exit was not anticipated.

    @ivar process: the process that exited
    """

    def __init__(self, process):
        Exception.__init__(self)
        self.process = process

    def __str__(self):
        template = 'The process %r exited prematurely.'
        return template % self.process
146
class CommandNotFoundException(Exception):
    """
    A command could not be resolved to an executable.

    @ivar command: the command that was looked up
    """

    def __init__(self, command):
        Exception.__init__(self)
        self.command = command

    def __str__(self):
        template = 'Command %r not found in the PATH.'
        return template % self.command
153
class ProcessesStillRunningException(Exception):
    """
    Processes were still being tracked when the test plan finished.

    @ivar processes: the processes that were left over
    """

    def __init__(self, processes):
        Exception.__init__(self)
        self.processes = processes

    def __str__(self):
        # wrap in a 1-tuple so a tuple of processes is formatted as one value
        leftovers = (self.processes,)
        return 'Processes still running at end of test: %r' % leftovers
161
class TimeoutException(Exception):
    """
    A process did not exit within the time allowed for a wait.

    @ivar process: the process that was being waited for
    @ivar status:  the exit status the waiter was expecting
    """

    def __init__(self, process, status):
        # Initialise the base class the same way the other exceptions in
        # this module do, so that all of them expose the same (empty) args.
        Exception.__init__(self)
        self.process = process
        self.status = status

    def __str__(self):
        return ('Timed out waiting for %r to exit with status %r'
                % (self.process, self.status))
170
class ProcessProtocol(protocol.ProcessProtocol):
    """
    Process protocol that reports the end of a child process through a
    deferred.

    The deferred is called back with the exit code when the process
    ends, or errbacked with a L{TimeoutException} if L{timeout} is
    invoked first.

    @ivar exitDeferred: deferred fired on process exit or on timeout
    @ivar timedOut:     whether L{timeout} has already fired the deferred
    """

    def __init__(self):
        self.exitDeferred = defer.Deferred()
        # set by timeout() so that a later processEnded() does not try to
        # fire exitDeferred a second time
        self.timedOut = False

    def getDeferred(self):
        """
        @returns: the deferred fired on process exit or on timeout
        """
        return self.exitDeferred

    def timeout(self, process, status):
        """
        Give up waiting: errback the exit deferred with a
        L{TimeoutException}.

        @param process: the process that was being waited for
        @param status:  the exit status the waiter was expecting
        """
        info('forcing timeout for process protocol %r', self)
        self.timedOut = True
        self.exitDeferred.errback(TimeoutException(process, status))

    def processEnded(self, status):
        """
        Fire the exit deferred with the exit code of the ended process,
        unless the deferred was already fired by L{timeout}.

        @param status: object whose C{value.exitCode} holds the exit code
        """
        exitCode = status.value.exitCode
        # logged exactly once, whichever branch is taken below
        info('process ended with status %r, exit code %r', status, exitCode)
        if self.timedOut:
            # exitDeferred was already errbacked by timeout()
            warning('already timed out??')
        else:
            self.exitDeferred.callback(exitCode)
192
class Process:
    """
    A child process that is spawned, waited for and killed on behalf of
    an integration test plan.

    @cvar NOT_STARTED: state before L{start} has been called
    @cvar STARTED:     state once the process has been spawned
    @cvar STOPPED:     state once the process's exit has been reported
    @ivar name:        name of the process; also the base name of its
                       .stdout and .stderr log files
    @ivar argv:        argument tuple, with argv[0] resolved by _which
    @ivar testDir:     directory in which the log files are written
    @ivar pid:         pid of the spawned process, or None
    @ivar protocol:    the L{ProcessProtocol} of the spawned process
    @ivar state:       one of NOT_STARTED, STARTED, STOPPED
    """
    NOT_STARTED, STARTED, STOPPED = 'NOT-STARTED', 'STARTED', 'STOPPED'

    def __init__(self, name, argv, testDir):
        """
        @param name:    name for the process and its log files
        @param argv:    command followed by its arguments
        @type  argv:    tuple of str
        @param testDir: directory in which to write the log files

        @raises CommandNotFoundException: if argv[0] cannot be resolved
        """
        self.name = name
        self.argv = (_which(argv[0]),) + argv[1:]
        self.testDir = testDir

        self.pid = None
        self.protocol = None
        self.state = self.NOT_STARTED
        self._timeoutDC = None

        log('created process object %r', self)

    def start(self):
        """
        Spawn the process, with stdout and stderr redirected to
        <name>.stdout and <name>.stderr in the test directory.
        """
        assert self.state == self.NOT_STARTED

        self.protocol = ProcessProtocol()

        stdout = open(os.path.join(self.testDir, self.name + '.stdout'), 'w')
        try:
            stderr = open(os.path.join(self.testDir, self.name + '.stderr'),
                          'w')
            try:
                # don't give it a stdin, output to log files
                childFDs = {1: stdout.fileno(), 2: stderr.fileno()}
                # There's a race condition in twisted.internet.process,
                # whereby signals received between the fork() and exec()
                # in the child are handled with the twisted handlers,
                # i.e. postponed, but they never get called because of
                # the exec(). The end is they are ignored.
                #
                # So, work around that by resetting the sigterm handler
                # to the default so if we self.kill() immediately after
                # self.start(), that the subprocess won't ignore the
                # signal. This is a window in the parent in which SIGTERM
                # will cause immediate termination instead of the twisted
                # nice termination, but that's better than the kid
                # missing the signal.
                info('spawning process %r, argv=%r', self, self.argv)
                termHandler = signal.signal(signal.SIGTERM, signal.SIG_DFL)
                try:
                    env = dict(os.environ)
                    env['FLU_DEBUG'] = '5'
                    process = reactor.spawnProcess(self.protocol,
                                                   self.argv[0],
                                                   env=env, args=self.argv,
                                                   childFDs=childFDs)
                finally:
                    # Put the previous handler back even if spawning
                    # failed. signal.signal() returns None for a handler
                    # that was not installed from Python, and refuses to
                    # take None back.
                    if termHandler is not None:
                        signal.signal(signal.SIGTERM, termHandler)
            finally:
                # close our handles on the log files, spawned or not
                stderr.close()
        finally:
            stdout.close()

        # it's possible the process *already* exited, from within the
        # spawnProcess itself. So set our state to STARTED, *then*
        # attach the callback.
        self.pid = process.pid
        self.state = self.STARTED

        def got_exit(res):
            self.state = self.STOPPED
            info('process %r has stopped', self)
            return res
        self.protocol.getDeferred().addCallback(got_exit)

    def kill(self, sig=signal.SIGTERM):
        """
        Send a signal to the started process.

        @param sig: the signal number to send; SIGTERM by default
        """
        assert self.state == self.STARTED
        info('killing process %r, signal %d', self, sig)
        os.kill(self.pid, sig)

    def wait(self, status, timeout=20):
        """
        Wait for the process to exit with the given exit code.

        @param status:  the expected exit code
        @param timeout: seconds after which to give up on a process that
                        is still running

        @returns: the protocol's exit deferred. It fails with
                  L{UnexpectedExitCodeException} if the exit code is not
                  C{status}, and with L{TimeoutException} if the process
                  does not exit in time.
        """
        assert self.state != self.NOT_STARTED
        info('waiting for process %r to exit', self)
        d = self.protocol.getDeferred()

        def got_exit(res):
            debug('process %r exited with status %r', self, res)
            if res != status:
                warning('expected exit code %r for process %r, but got %r',
                        status, self, res)
                raise UnexpectedExitCodeException(self, status, res)
        d.addCallback(got_exit)

        # only a process that is still running needs a timeout
        if self.state == self.STARTED:
            self._timeoutDC = reactor.callLater(timeout,
                                                self.protocol.timeout,
                                                self,
                                                status)

            def cancel_timeout(res):
                debug('cancelling timeout for %r', self)
                if self._timeoutDC.active():
                    self._timeoutDC.cancel()
                return res
            # runs on success and on failure, passing the result through
            d.addCallbacks(cancel_timeout, cancel_timeout)
        return d

    def __repr__(self):
        return '<Process %s in state %s>' % (self.name, self.state)
284
class PlanExecutor:
    """
    Both the virtual machine that runs a plan and the operations
    (spawn, checkExits, kill, wait) that the plan is made of.

    @ivar processes: processes that were spawned and not yet waited for
    @ivar timeout:   timeout in seconds handed to each process wait
    """

    def __init__(self):
        self.processes = []
        self.timeout = 20

    def spawn(self, process):
        """
        Start tracking C{process} and start it.

        @returns: an already-fired deferred
        """
        assert process not in self.processes
        self.processes.append(process)
        process.start()
        return defer.succeed(True)

    def checkExits(self, expectedExits):
        """
        Verify that every tracked process is either still in the STARTED
        state or listed in C{expectedExits}.

        @raises UnexpectedExitException: for the first process that is
                                         neither
        """
        for candidate in self.processes:
            if candidate.state == candidate.STARTED:
                continue
            if candidate in expectedExits:
                continue
            raise UnexpectedExitException(candidate)

    def kill(self, process):
        """
        Kill a tracked process.

        @returns: an already-fired deferred
        """
        assert process in self.processes
        process.kill()
        return defer.succeed(True)

    def wait(self, process, exitCode):
        """
        Wait for a tracked process to exit with C{exitCode}, and stop
        tracking it once it has done so.

        @returns: the deferred of the process's wait
        """
        assert process in self.processes
        exited = process.wait(exitCode, timeout=self.timeout)
        # only reached on success; after a failed wait the process stays
        # tracked
        exited.addCallback(lambda _: self.processes.remove(process))
        return exited

    def _checkProcesses(self, failure=None):
        """
        End-of-plan check: SIGKILL every tracked process that is still in
        the STARTED state, then report.

        @param failure: the failure the plan ended with, if any

        @returns: C{failure} itself when no processes are tracked.
                  Otherwise a deferred that fires once the killed
                  processes have ended, yielding C{failure} if one was
                  given and failing with
                  L{ProcessesStillRunningException} if not.
        """
        if not self.processes:
            return failure

        warning('processes still running at end of test: %r',
                self.processes)
        stillRunning = ProcessesStillRunningException(self.processes)

        def reaper(reaped):
            # bind the deferred now; the returned function takes over as
            # the protocol's processEnded
            return lambda status: reaped.callback(status.value.exitCode)

        # reap all processes, and once we have them reaped, errback
        pending = []
        for proc in self.processes:
            if proc.state == proc.STARTED:
                reaped = defer.Deferred()
                pending.append(reaped)
                proc.protocol.processEnded = reaper(reaped)
                proc.kill(sig=signal.SIGKILL)

        def report(_):
            if failure:
                return failure
            raise stillRunning

        allReaped = defer.DeferredList(pending)
        allReaped.addCallback(report)
        return allReaped

    def run(self, ops, timeout=20):
        """
        Run the operations of a plan in sequence.

        @param ops:     sequence of operations, each a sequence of a
                        callable followed by its arguments
        @param timeout: timeout in seconds for each process wait

        @returns: a deferred firing when all operations and the final
                  process check are done
        """
        self.timeout = timeout
        chain = defer.Deferred()

        def invoke(_, op):
            operator, operands = op[0], op[1:]
            return operator(*operands)

        for op in ops:
            chain.addCallback(invoke, op)
        chain.addCallbacks(
            lambda _: self._checkProcesses(failure=None),
            lambda reason: self._checkProcesses(failure=reason))

        # We should only spawn processes when twisted has set up its
        # sighandlers. It does that *after* firing the reactor startup
        # event and before entering the reactor loop. So, make sure
        # twisted is ready for us by firing the plan in a callLater.
        reactor.callLater(0, chain.callback, None)
        return chain
class Plan:
    """
    A sequence of process operations recorded by an integration test
    method, to be executed afterwards by a L{PlanExecutor}.

    @ivar name:         name of the test method
    @ivar testCaseName: class name of the test case
    @ivar processes:    dict of process name -> L{Process}
    @ivar outputDir:    directory holding the process log files, or None
                        once it has been cleaned up
    @ivar vm:           the L{PlanExecutor} that will run the operations
    @ivar ops:          list of recorded operations, each a tuple of a
                        callable followed by its arguments
    @ivar timeout:      timeout in seconds handed to the executor
    """

    def __init__(self, testCase, testName):
        """
        @param testCase: the test case instance the plan belongs to
        @param testName: name of the test method
        """
        self.name = testName
        self.testCaseName = testCase.__class__.__name__
        self.processes = {}
        self.outputDir = self._makeOutputDir(os.getcwd())

        # put your boots on monterey jacks, cause this gravy just made a
        # virtual machine whose instructions are python methods
        self.vm = PlanExecutor()
        self.ops = []
        self.timeout = 20

    def _makeOutputDir(self, testDir):
        """
        Make sure the output directory <testCaseName>-<name> exists
        under C{testDir}.

        @returns: the path of the output directory
        """
        # ensure that testDir exists
        try:
            os.mkdir(testDir)
        except OSError:
            pass
        tail = '%s-%s' % (self.testCaseName, self.name)
        outputDir = os.path.join(testDir, tail)
        # The directory is only removed after a successful run, so it can
        # be left over from an earlier failed one. Reuse it instead of
        # failing; log files are opened for writing, so same-named logs
        # are overwritten.
        if not os.path.isdir(outputDir):
            os.mkdir(outputDir)
        return outputDir

    def _cleanOutputDir(self):
        """
        Remove the output directory and everything in it.
        """
        # walk bottom-up so directories are empty when they are removed
        for root, dirs, files in os.walk(self.outputDir, topdown=False):
            for name in files:
                os.remove(os.path.join(root, name))
            for name in dirs:
                os.rmdir(os.path.join(root, name))
        os.rmdir(self.outputDir)
        self.outputDir = None

    def _allocProcess(self, args):
        """
        Create a uniquely named L{Process} for the given argument tuple.

        @param args: command followed by its arguments
        @type  args: tuple of str
        """
        # Name the process after the command's base name: the name is
        # used as a file name inside the output directory, so it must
        # not contain path separators.
        command = os.path.basename(args[0]) or args[0]
        name = command
        i = 0
        while name in self.processes:
            i += 1
            name = '%s-%d' % (command, i)
        process = Process(name, args, self.outputDir)
        self.processes[name] = process
        return process

    def _appendOp(self, *args):
        """
        Record an operation: a callable followed by its arguments.
        """
        self.ops.append(args)

    def setTimeout(self, timeout):
        """
        Set the timeout, in seconds, handed to the executor.
        """
        self.timeout = timeout

    def spawn(self, command, *args):
        """
        Plan the spawning of a single process.

        @param command: the command to run
        @param args:    the arguments to the command

        @returns: the L{Process} that will be spawned
        """
        allArgs = (command,) + args
        process, = self.spawnPar(allArgs)
        return process

    def spawnPar(self, *argvs):
        """
        Plan the spawning of a number of processes.

        @param argvs: one tuple of strings (command, arg, ...) per process

        @returns: tuple of the L{Process} objects that will be spawned
        """
        processes = []
        # nothing may have exited before we spawn
        self._appendOp(self.vm.checkExits, ())
        for argv in argvs:
            assert isinstance(argv, tuple), \
                'all arguments to spawnPar must be tuples'
            for arg in argv:
                assert isinstance(arg, str), \
                    'all subarguments to spawnPar must be strings'
            processes.append(self._allocProcess(argv))
        for process in processes:
            self._appendOp(self.vm.spawn, process)
        return tuple(processes)

    def wait(self, process, status):
        """
        Plan a wait for C{process} to exit with exit code C{status}.
        """
        self.waitPar((process, status))

    def waitPar(self, *processStatusPairs):
        """
        Plan a wait for a number of processes.

        @param processStatusPairs: one (process, expected exit code)
                                   pair per process
        """
        processes = tuple([process for process, _ in processStatusPairs])
        # only the processes being waited for are allowed to have exited
        self._appendOp(self.vm.checkExits, processes)
        for process, status in processStatusPairs:
            self._appendOp(self.vm.wait, process, status)

    def kill(self, process, status=None):
        """
        Plan to kill C{process} and then wait for it to exit.

        @param status: the exit code expected from the killed process
        """
        self._appendOp(self.vm.checkExits, ())
        self._appendOp(self.vm.kill, process)
        self._appendOp(self.vm.wait, process, status)

    def execute(self):
        """
        Run the recorded operations.

        @returns: a deferred firing when the plan is done; the output
                  directory is removed only if the plan succeeded
        """
        d = self.vm.run(self.ops, timeout=self.timeout)
        d.addCallback(lambda _: self._cleanOutputDir())
        return d
449
450 -def test(proc):
451 testName = proc.__name__ 452 def wrappedtest(self): 453 plan = Plan(self, testName) 454 proc(self, plan) 455 if twisted.copyright.version < '2.0': 456 # FIXME T1.3 457 info('using deferredResult for old trial') 458 from twisted.trial import unittest 459 return unittest.deferredResult(plan.execute()) 460 else: 461 return plan.execute()
462 try: 463 wrappedtest.__name__ = testName 464 except Exception: 465 # can only set procedure names in python >= 2.4 466 pass 467 # trial seems to require a timeout, at least in twisted 2.4, so give 468 # it a nice one 469 wrappedtest.timeout = 666 470 return wrappedtest 471