Package flumotion :: Package service :: Module service
[hide private]

Source Code for Module flumotion.service.service

  1  # -*- Mode: Python -*- 
  2  # vi:si:et:sw=4:sts=4:ts=4 
  3  # 
  4  # Flumotion - a streaming media server 
  5  # Copyright (C) 2004,2005,2006,2007,2008 Fluendo, S.L. (www.fluendo.com). 
  6  # All rights reserved. 
  7   
  8  # This program is free software; you can redistribute it and/or modify 
  9  # it under the terms of the GNU General Public License as published by 
 10  # the Free Software Foundation; either version 2 of the License, or 
 11  # (at your option) any later version. 
 12  # See "LICENSE.GPL" in the source distribution for more information. 
 13   
 14  # This program is also licensed under the Flumotion license. 
 15  # See "LICENSE.Flumotion" in the source distribution for more information. 
 16   
 17  """ 
 18  Servicer object used in service scripts 
 19  """ 
 20   
 21  import os 
 22  import glob 
 23  import time 
 24   
 25  from flumotion.configure import configure 
 26  from flumotion.common import errors, log 
 27  from flumotion.common.python import makedirs 
 28  from flumotion.common.process import checkPidRunning, deletePidFile, getPid, \ 
 29       killPid, termPid, waitPidFile 
 30   
 31  __version__ = "$Rev: 6988 $" 
 32   
 33   
class Servicer(log.Loggable):
    """
    I manage running managers and workers on behalf of a service script.

    I look up manager and worker configuration files on disk, start and stop
    the matching daemons, report on their state and clean up their pid files.
    """

    # log category picked up through log.Loggable
    # (presumably tags my log output -- confirm against flumotion.common.log)
    logCategory = 'servicer'

41 - def __init__(self, configDir=None, logDir=None, runDir=None):
42 """ 43 @type configDir: string 44 @param configDir: overridden path to the configuration directory. 45 @type logDir: string 46 @param logDir: overridden path to the log directory. 47 @type runDir: string 48 @param runDir: overridden path to the run directory. 49 """ 50 self.managersDir = os.path.join(configure.configdir, 'managers') 51 self.workersDir = os.path.join(configure.configdir, 'workers') 52 self._overrideDir = { 53 'logdir': logDir, 54 'rundir': runDir, 55 }
56
57 - def _parseManagersWorkers(self, command, args):
58 # parse the given args and return two sorted lists; 59 # one of manager names to act on and one of worker names 60 managers = [] 61 workers = [] 62 63 if not args: 64 managers = self.getManagers().keys() 65 managers.sort() 66 workers = self.getWorkers() 67 workers.sort() 68 return (managers, workers) 69 70 which = args[0] 71 if which not in ['manager', 'worker']: 72 raise errors.FatalError, 'Please specify either manager or worker' 73 74 if len(args) < 2: 75 raise errors.FatalError, 'Please specify which %s to %s' % ( 76 which, command) 77 78 name = args[1] 79 if which == 'manager': 80 managers = self.getManagers() 81 if not managers.has_key(name): 82 raise errors.FatalError, 'No manager "%s"' % name 83 managers = [name, ] 84 elif which == 'worker': 85 workers = self.getWorkers() 86 if not name in workers: 87 raise errors.FatalError, 'No worker with name %s' % name 88 workers = [name, ] 89 90 return (managers, workers)
91
92 - def _getDirOptions(self):
93 """ 94 Return a list of override directories for configure.configure 95 suitable for appending to a command line. 96 """ 97 args = [] 98 for key, value in self._overrideDir.items(): 99 if value: 100 args.append('--%s=%s' % (key, value)) 101 return " ".join(args)
102
103 - def getManagers(self):
104 """ 105 @returns: a dictionary of manager names -> flow names 106 """ 107 managers = {} 108 109 self.log('getManagers()') 110 if not os.path.exists(self.managersDir): 111 return managers 112 113 for managerDir in glob.glob(os.path.join(self.managersDir, '*')): 114 flows = [] # names of flows 115 # find flow files 116 flowsDir = os.path.join(managerDir, 'flows') 117 if os.path.exists(flowsDir): 118 flowFiles = glob.glob(os.path.join(flowsDir, '*.xml')) 119 for flowFile in flowFiles: 120 filename = os.path.split(flowFile)[1] 121 name = filename.split(".xml")[0] 122 flows.append(name) 123 managerName = os.path.split(managerDir)[1] 124 self.log('Adding flows %r to manager %s' % (flows, managerName)) 125 managers[managerName] = flows 126 self.log('returning managers: %r' % managers) 127 return managers
128
129 - def getWorkers(self):
130 """ 131 @returns: a list of worker names 132 """ 133 workers = [] 134 135 if not os.path.exists(self.workersDir): 136 return workers 137 138 for workerFile in glob.glob(os.path.join(self.workersDir, '*.xml')): 139 filename = os.path.split(workerFile)[1] 140 name = filename.split(".xml")[0] 141 workers.append(name) 142 workers.sort() 143 return workers
144
145 - def start(self, args):
146 """ 147 Start processes as given in the args. 148 149 If nothing specified, start all managers and workers. 150 If first argument is "manager", start given manager. 151 If first argument is "worker", start given worker. 152 153 @returns: an exit value reflecting the number of processes that failed 154 to start 155 """ 156 (managers, workers) = self._parseManagersWorkers('start', args) 157 self.debug("Start managers %r and workers %r" % (managers, workers)) 158 managersDict = self.getManagers() 159 exitvalue = 0 160 161 for name in managers: 162 if not self.startManager(name, managersDict[name]): 163 exitvalue += 1 164 for name in workers: 165 if not self.startWorker(name): 166 exitvalue += 1 167 168 return exitvalue
169
170 - def stop(self, args):
171 """ 172 Stop processes as given in the args. 173 174 If nothing specified, stop all managers and workers. 175 If first argument is "manager", stop given manager. 176 If first argument is "worker", stop given worker. 177 178 @returns: an exit value reflecting the number of processes that failed 179 to stop 180 """ 181 (managers, workers) = self._parseManagersWorkers('stop', args) 182 self.debug("Stop managers %r and workers %r" % (managers, workers)) 183 184 exitvalue = 0 185 186 for name in workers: 187 if not self.stopWorker(name): 188 exitvalue += 1 189 for name in managers: 190 if not self.stopManager(name): 191 exitvalue += 1 192 193 return exitvalue
194
195 - def status(self, args):
196 """ 197 Give status on processes as given in the args. 198 """ 199 (managers, workers) = self._parseManagersWorkers('status', args) 200 self.debug("Status managers %r and workers %r" % (managers, workers)) 201 for kind, names in [('manager', managers), ('worker', workers)]: 202 for name in names: 203 pid = getPid(kind, name) 204 if not pid: 205 print "%s %s not running" % (kind, name) 206 continue 207 if checkPidRunning(pid): 208 print "%s %s is running with pid %d" % (kind, name, pid) 209 else: 210 print "%s %s dead (stale pid %d)" % (kind, name, pid)
211
212 - def clean(self, args):
213 """ 214 Clean up dead process pid files as given in the args. 215 """ 216 (managers, workers) = self._parseManagersWorkers('clean', args) 217 self.debug("Clean managers %r and workers %r" % (managers, workers)) 218 for kind, names in [('manager', managers), ('worker', workers)]: 219 for name in names: 220 pid = getPid(kind, name) 221 if not pid: 222 # may be a file that contains bogus data 223 print "deleting bogus pid file for %s %s" % (kind, name) 224 deletePidFile(kind, name) 225 continue 226 if not checkPidRunning(pid): 227 self.debug("Cleaning up stale pid %d for %s %s" % ( 228 pid, kind, name)) 229 print "deleting stale pid file for %s %s" % (kind, name) 230 deletePidFile(kind, name)
231
232 - def condrestart(self, args):
233 """ 234 Restart running processes as given in the args. 235 236 If nothing specified, condrestart all managers and workers. 237 If first argument is "manager", condrestart given manager. 238 If first argument is "worker", condrestart given worker. 239 240 @returns: an exit value reflecting the number of processes that failed 241 to start 242 """ 243 (managers, workers) = self._parseManagersWorkers('condrestart', args) 244 self.debug("condrestart managers %r and workers %r" % ( 245 managers, workers)) 246 managersDict = self.getManagers() 247 exitvalue = 0 248 249 for kind, names in [('manager', managers), ('worker', workers)]: 250 for name in names: 251 pid = getPid(kind, name) 252 if not pid: 253 continue 254 if checkPidRunning(pid): 255 if kind == 'manager': 256 if not self.stopManager(name): 257 exitvalue += 1 258 continue 259 if not self.startManager(name, managersDict[name]): 260 exitvalue += 1 261 elif kind == 'worker': 262 if not self.stopWorker(name): 263 exitvalue += 1 264 continue 265 if not self.startWorker(name): 266 exitvalue += 1 267 else: 268 print "%s %s dead (stale pid %d)" % (kind, name, pid) 269 270 return exitvalue
271
272 - def create(self, args):
273 # TODO: Andy suggested we should be able to customize the 274 # configuration this generates. 275 # For that we maybe first want to use the Command class way of 276 # writing the service script. 277 """ 278 Create a default manager or worker config. 279 """ 280 if len(args) == 0: 281 raise errors.FatalError, \ 282 "Please specify 'manager' or 'worker' to create." 283 kind = args[0] 284 if len(args) == 1: 285 raise errors.FatalError, \ 286 "Please specify name of %s to create." % kind 287 name = args[1] 288 289 port = 7531 290 if len(args) == 3: 291 port = int(args[2]) 292 293 if kind == 'manager': 294 self.createManager(name, port) 295 elif kind == 'worker': 296 self.createWorker(name, managerPort=port, randomFeederports=True) 297 else: 298 raise errors.FatalError, \ 299 "Please specify 'manager' or 'worker' to create."
300
301 - def createManager(self, name, port=7531):
302 """ 303 Create a sample manager. 304 305 @returns: whether or not the config was created. 306 """ 307 self.info("Creating manager %s" % name) 308 managerDir = os.path.join(self.managersDir, name) 309 if os.path.exists(managerDir): 310 raise errors.FatalError, \ 311 "Manager directory %s already exists" % managerDir 312 makedirs(managerDir) 313 314 planetFile = os.path.join(managerDir, 'planet.xml') 315 316 # generate the file 317 handle = open(planetFile, 'w') 318 handle.write("""<planet> 319 <manager> 320 <debug>4</debug> 321 <host>localhost</host> 322 <port>%(port)d</port> 323 <transport>ssl</transport> 324 <!-- certificate path can be relative to $sysconfdir/flumotion, 325 or absolute --> 326 <!-- 327 <certificate>default.pem</certificate> 328 --> 329 <component name="manager-bouncer" type="htpasswdcrypt-bouncer"> 330 <property name="data"><![CDATA[ 331 user:PSfNpHTkpTx1M 332 ]]></property> 333 </component> 334 </manager> 335 </planet> 336 """ % locals()) 337 handle.close() 338 339 # create a default.pem file if it doesn't exist yet 340 pemFile = os.path.join(configure.configdir, 'default.pem') 341 if not os.path.exists(pemFile): 342 # files in datadir are usually not executable, so call through sh 343 os.system("sh %s %s" % ( 344 os.path.join(configure.datadir, 'make-dummy-cert'), pemFile)) 345 346 return True
347
348 - def createWorker(self, name, managerPort=7531, randomFeederports=False):
349 """ 350 Create a sample worker. 351 352 @returns: whether or not the config was created. 353 """ 354 makedirs(self.workersDir) 355 self.info("Creating worker %s" % name) 356 workerFile = os.path.join(self.workersDir, "%s.xml" % name) 357 if os.path.exists(workerFile): 358 raise errors.FatalError, \ 359 "Worker file %s already exists." % workerFile 360 361 feederports = " <!-- <feederports>8600-8639</feederports> -->" 362 if randomFeederports: 363 feederports = ' <feederports random="True" />' 364 # generate the file 365 handle = open(workerFile, 'w') 366 handle.write("""<worker> 367 368 <debug>4</debug> 369 370 <manager> 371 <host>localhost</host> 372 <port>%(managerPort)s</port> 373 </manager> 374 375 <authentication type="plaintext"> 376 <username>user</username> 377 <password>test</password> 378 </authentication> 379 380 %(feederports)s 381 382 </worker> 383 """ % locals()) 384 handle.close() 385 386 return True
387 388
389 - def startManager(self, name, flowNames):
390 """ 391 Start the manager as configured in the manager directory for the given 392 manager name, together with the given flows. 393 394 @returns: whether or not the manager daemon started 395 """ 396 self.info("Starting manager %s" % name) 397 self.debug("Starting manager with flows %r" % flowNames) 398 managerDir = os.path.join(self.managersDir, name) 399 planetFile = os.path.join(managerDir, 'planet.xml') 400 if not os.path.exists(planetFile): 401 raise errors.FatalError, \ 402 "Planet file %s does not exist" % planetFile 403 self.info("Loading planet %s" % planetFile) 404 405 flowsDir = os.path.join(managerDir, 'flows') 406 flowFiles = [] 407 for flowName in flowNames: 408 flowFile = os.path.join(flowsDir, "%s.xml" % flowName) 409 if not os.path.exists(flowFile): 410 raise errors.FatalError, \ 411 "Flow file %s does not exist" % flowFile 412 flowFiles.append(flowFile) 413 self.info("Loading flow %s" % flowFile) 414 415 pid = getPid('manager', name) 416 if pid: 417 if checkPidRunning(pid): 418 raise errors.FatalError, \ 419 "Manager %s is already running (with pid %d)" % (name, pid) 420 else: 421 raise errors.FatalError, \ 422 "Manager %s is dead (stale pid %d)" % (name, pid) 423 424 dirOptions = self._getDirOptions() 425 command = "flumotion-manager %s -D --daemonize-to %s " \ 426 "--service-name %s %s %s" % ( 427 dirOptions, configure.daemondir, name, planetFile, 428 " ".join(flowFiles)) 429 self.debug("starting process %s" % command) 430 retval = self.startProcess(command) 431 432 if retval == 0: 433 self.debug("Waiting for pid for manager %s" % name) 434 pid = waitPidFile('manager', name) 435 if pid: 436 self.info("Started manager %s with pid %d" % (name, pid)) 437 return True 438 else: 439 self.warning("manager %s could not start" % name) 440 return False 441 442 self.warning("manager %s could not start (return value %d)" % ( 443 name, retval)) 444 return False
445
446 - def startWorker(self, name):
447 """ 448 Start the worker as configured in the worker directory for the given 449 worker name. 450 451 @returns: whether or not the worker daemon started 452 """ 453 self.info("Starting worker %s" % name) 454 workerFile = os.path.join(self.workersDir, "%s.xml" % name) 455 if not os.path.exists(workerFile): 456 raise errors.FatalError, \ 457 "Worker file %s does not exist" % workerFile 458 459 pid = getPid('worker', name) 460 if pid: 461 if checkPidRunning(pid): 462 raise errors.FatalError, \ 463 "Worker %s is already running (with pid %d)" % (name, pid) 464 else: 465 raise errors.FatalError, \ 466 "Worker %s is dead (stale pid %d)" % (name, pid) 467 468 # we are sure the worker is not running and there's no pid file 469 self.info("Loading worker %s" % workerFile) 470 471 dirOptions = self._getDirOptions() 472 command = "flumotion-worker %s -D --daemonize-to %s " \ 473 "--service-name %s %s" % ( 474 dirOptions, configure.daemondir, name, workerFile) 475 self.debug("Running %s" % command) 476 retval = self.startProcess(command) 477 478 if retval == 0: 479 self.debug("Waiting for pid for worker %s" % name) 480 pid = waitPidFile('worker', name) 481 if pid: 482 self.info("Started worker %s with pid %d" % (name, pid)) 483 return True 484 else: 485 self.warning("worker %s could not start" % name) 486 return False 487 488 self.warning("worker %s could not start (return value %d)" % ( 489 name, retval)) 490 return False
491
492 - def startProcess(self, command):
493 """ 494 Start the given process and block. 495 Returns the exit status of the process, or -1 in case of another error. 496 """ 497 status = os.system(command) 498 if os.WIFEXITED(status): 499 retval = os.WEXITSTATUS(status) 500 return retval 501 502 # definately something wrong 503 return -1
504
505 - def stopManager(self, name):
506 """ 507 Stop the given manager if it is running. 508 """ 509 self.info("Stopping manager %s" % name) 510 pid = getPid('manager', name) 511 if not pid: 512 return True 513 514 # FIXME: ensure a correct process is running this pid 515 if not checkPidRunning(pid): 516 self.info("Manager %s is dead (stale pid %d)" % (name, pid)) 517 return False 518 519 self.debug('Stopping manager %s with pid %d' % (name, pid)) 520 if not self.stopProcess(pid): 521 return False 522 523 self.info('Stopped manager %s with pid %d' % (name, pid)) 524 return True
525
526 - def stopWorker(self, name):
527 """ 528 Stop the given worker if it is running. 529 """ 530 self.info("Stopping worker %s" % name) 531 pid = getPid('worker', name) 532 if not pid: 533 self.info("worker %s was not running" % name) 534 return True 535 536 # FIXME: ensure a correct process is running this pid 537 if not checkPidRunning(pid): 538 self.info("Worker %s is dead (stale pid %d)" % (name, pid)) 539 return False 540 541 self.debug('Stopping worker %s with pid %d' % (name, pid)) 542 if not self.stopProcess(pid): 543 return False 544 545 self.info('Stopped worker %s with pid %d' % (name, pid)) 546 return True
547
548 - def stopProcess(self, pid):
549 """ 550 Stop the process with the given pid. 551 Wait until the pid has disappeared. 552 """ 553 startClock = time.clock() 554 termClock = startClock + configure.processTermWait 555 killClock = termClock + configure.processKillWait 556 557 self.debug('stopping process with pid %d' % pid) 558 if not termPid(pid): 559 self.warning('No process with pid %d' % pid) 560 return False 561 562 # wait for the kill 563 while (checkPidRunning(pid)): 564 if time.clock() > termClock: 565 self.warning("Process with pid %d has not responded to TERM " \ 566 "for %d seconds, killing" % (pid, 567 configure.processTermWait)) 568 killPid(pid) 569 termClock = killClock + 1.0 # so it does not get triggered again 570 571 if time.clock() > killClock: 572 self.warning("Process with pid %d has not responded to KILL " \ 573 "for %d seconds, stopping" % (pid, 574 configure.processKillWait)) 575 return False 576 577 # busy loop until kill is done 578 579 return True
580
581 - def list(self):
582 """ 583 List all service parts managed. 584 """ 585 managers = self.getManagers() 586 for name in managers.keys(): 587 flows = managers[name] 588 print "manager %s" % name 589 if flows: 590 for flow in flows: 591 print " flow %s" % flow 592 593 workers = self.getWorkers() 594 for worker in workers: 595 print "worker %s" % worker
596