Package flumotion :: Package service :: Module service
[hide private]

Source Code for Module flumotion.service.service

  1  # -*- Mode: Python -*- 
  2  # vi:si:et:sw=4:sts=4:ts=4 
  3  # 
  4  # Flumotion - a streaming media server 
  5  # Copyright (C) 2004,2005,2006,2007 Fluendo, S.L. (www.fluendo.com). 
  6  # All rights reserved. 
  7   
  8  # This program is free software; you can redistribute it and/or modify 
  9  # it under the terms of the GNU General Public License as published by 
 10  # the Free Software Foundation; either version 2 of the License, or 
 11  # (at your option) any later version. 
 12  # See "LICENSE.GPL" in the source distribution for more information. 
 13   
 14  # This program is also licensed under the Flumotion license. 
 15  # See "LICENSE.Flumotion" in the source distribution for more information. 
 16   
 17  import os 
 18  import glob 
 19  import time 
 20   
 21  from flumotion.configure import configure 
 22  from flumotion.common import common, errors, log 
 23   
 24  """ 
 25  Servicer object used in service scripts 
 26  """ 
class Servicer(log.Loggable):
    """
    I manage running managers and workers on behalf of a service script.
    """

    logCategory = 'servicer'

    def __init__(self, configDir=None, logDir=None, runDir=None):
        """
        @type  configDir: string
        @param configDir: overridden path to the configuration directory;
                          configure.configdir is used when not given.
        @type  logDir:    string
        @param logDir:    overridden path to the log directory.
        @type  runDir:    string
        @param runDir:    overridden path to the run directory.
        """
        # configDir used to be accepted but silently ignored; honor it so a
        # service script can point at an alternate configuration tree.
        if not configDir:
            configDir = configure.configdir
        self.configDir = configDir
        self.managersDir = os.path.join(configDir, 'managers')
        self.workersDir = os.path.join(configDir, 'workers')
        # forwarded to spawned daemons as --logdir/--rundir options
        self._overrideDir = {
            'logdir': logDir,
            'rundir': runDir,
        }

    @staticmethod
    def _getXmlBasename(path):
        """
        Return the file name of path with one trailing '.xml' removed.

        Unlike splitting on ".xml", this keeps names such as
        'my.xmlconf.xml' intact ('my.xmlconf').
        """
        filename = os.path.basename(path)
        if filename.endswith('.xml'):
            filename = filename[:-len('.xml')]
        return filename

    def _parseManagersWorkers(self, command, args):
        """
        Parse service-script arguments into the processes to act on.

        With no arguments, every configured manager and worker is selected.
        Otherwise, args must be C{['manager', name]} or C{['worker', name]},
        and only that configured process is selected.

        @param command: the command being run; only used in error messages.
        @param args:    the remaining command-line arguments.

        @returns: tuple of (sorted manager names, sorted worker names)
        @raises errors.SystemError: if the process type is missing or
                                    unknown, the name is missing, or no such
                                    process is configured.
        """
        if not args:
            # getWorkers() already returns a sorted list
            return (sorted(self.getManagers()), self.getWorkers())

        which = args[0]
        if which not in ('manager', 'worker'):
            raise errors.SystemError('Please specify either manager or worker')

        if len(args) < 2:
            raise errors.SystemError('Please specify which %s to %s' % (
                which, command))

        name = args[1]
        if which == 'manager':
            if name not in self.getManagers():
                raise errors.SystemError('No manager "%s"' % name)
            return ([name], [])

        if name not in self.getWorkers():
            raise errors.SystemError('No worker with name %s' % name)
        return ([], [name])

    def _getDirOptions(self):
        """
        Build the options that forward the overridden directories to a
        spawned flumotion-manager or flumotion-worker.

        @rtype:   string
        @returns: space-separated C{--key=value} options, one for every
                  override that was set; the empty string if none were.
        """
        # sorted so the generated command line is deterministic
        return " ".join(['--%s=%s' % (key, self._overrideDir[key])
                         for key in sorted(self._overrideDir)
                         if self._overrideDir[key]])

    def getManagers(self):
        """
        Find the configured managers and their flows.

        Every subdirectory of the managers directory is a manager. The
        C{*.xml} files in its C{flows} subdirectory are its flows.

        @returns: a dictionary of manager names -> sorted lists of flow names
        """
        managers = {}

        self.log('getManagers()')
        if not os.path.exists(self.managersDir):
            return managers

        for managerDir in glob.glob(os.path.join(self.managersDir, '*')):
            # skip stray files (READMEs, editor backups, ...) next to the
            # manager directories; they would fail later on planet.xml
            if not os.path.isdir(managerDir):
                continue
            flows = []  # names of flows
            flowsDir = os.path.join(managerDir, 'flows')
            if os.path.exists(flowsDir):
                flowFiles = glob.glob(os.path.join(flowsDir, '*.xml'))
                # glob order is arbitrary; sort for a deterministic order
                flows = sorted([self._getXmlBasename(flowFile)
                                for flowFile in flowFiles])
            managerName = os.path.basename(managerDir)
            self.log('Adding flows %r to manager %s' % (flows, managerName))
            managers[managerName] = flows
        self.log('returning managers: %r' % managers)
        return managers

    def getWorkers(self):
        """
        Find the configured workers, one per C{*.xml} file in the workers
        directory.

        @returns: a sorted list of worker names
        """
        if not os.path.exists(self.workersDir):
            return []

        workerFiles = glob.glob(os.path.join(self.workersDir, '*.xml'))
        return sorted([self._getXmlBasename(workerFile)
                       for workerFile in workerFiles])

    def start(self, args):
        """
        Start processes as given in the args.

        If nothing is specified, start all managers and workers.
        If the first argument is "manager" or "worker", the second argument
        must name the single manager or worker to start.

        @returns: an exit value reflecting the number of processes that failed
                  to start
        @raises errors.SystemError: on invalid args, or when a process is
                                    misconfigured or already has a pid file.
        """
        (managers, workers) = self._parseManagersWorkers('start', args)
        self.debug("Start managers %r and workers %r" % (managers, workers))
        managersDict = self.getManagers()
        exitvalue = 0

        for name in managers:
            if not self.startManager(name, managersDict[name]):
                exitvalue += 1
        for name in workers:
            if not self.startWorker(name):
                exitvalue += 1

        return exitvalue

    def stop(self, args):
        """
        Stop processes as given in the args.

        If nothing is specified, stop all managers and workers.
        If the first argument is "manager" or "worker", the second argument
        must name the single manager or worker to stop.

        @returns: an exit value reflecting the number of processes that failed
                  to stop
        """
        (managers, workers) = self._parseManagersWorkers('stop', args)
        self.debug("Stop managers %r and workers %r" % (managers, workers))

        exitvalue = 0

        # workers are stopped before the managers
        for name in workers:
            if not self.stopWorker(name):
                exitvalue += 1
        for name in managers:
            if not self.stopManager(name):
                exitvalue += 1

        return exitvalue

    def status(self, args):
        """
        Print the status of the processes given in the args (all of them if
        none are given): not running, running with a pid, or dead with a
        stale pid.
        """
        (managers, workers) = self._parseManagersWorkers('status', args)
        self.debug("Status managers %r and workers %r" % (managers, workers))
        for kind, names in (('manager', managers), ('worker', workers)):
            for name in names:
                pid = common.getPid(kind, name)
                if not pid:
                    print("%s %s not running" % (kind, name))
                elif common.checkPidRunning(pid):
                    print("%s %s is running with pid %d" % (kind, name, pid))
                else:
                    print("%s %s dead (stale pid %d)" % (kind, name, pid))

    def clean(self, args):
        """
        Clean up dead process pid files as given in the args (all processes
        if none are given).
        """
        import errno
        import sys

        (managers, workers) = self._parseManagersWorkers('clean', args)
        self.debug("Clean managers %r and workers %r" % (managers, workers))
        for kind, names in (('manager', managers), ('worker', workers)):
            for name in names:
                pid = common.getPid(kind, name)
                if not pid:
                    # may be a file that contains bogus data, or
                    # (presumably) no pid file at all
                    try:
                        common.deletePidFile(kind, name)
                    except OSError:
                        # NOTE(review): assumes deletePidFile raises
                        # OSError(ENOENT) when there is no pid file; a
                        # missing file is nothing to clean, so move on
                        # instead of aborting the remaining processes.
                        if sys.exc_info()[1].errno != errno.ENOENT:
                            raise
                        continue
                    print("deleting bogus pid file for %s %s" % (kind, name))
                    continue
                if not common.checkPidRunning(pid):
                    self.debug("Cleaning up stale pid %d for %s %s" % (
                        pid, kind, name))
                    print("deleting stale pid file for %s %s" % (kind, name))
                    common.deletePidFile(kind, name)

    def create(self, args):
        """
        Create a default manager or worker config.

        args is C{[type, name]} or C{[type, name, port]}. type is 'manager'
        or 'worker'. port (default 7531) is the port the new manager listens
        on, or the manager port the new worker connects to.

        @raises errors.SystemError: on missing or invalid arguments.
        """
        # TODO: Andy suggested we should be able to customize the
        # configuration this generates.
        # For that we maybe first want to use the Command class way of
        # writing the service script.

        # validate the type before asking for a name, so that
        # "create foo" does not ask for the "name of foo"
        if not args or args[0] not in ('manager', 'worker'):
            raise errors.SystemError(
                "Please specify 'manager' or 'worker' to create.")
        kind = args[0]
        if len(args) == 1:
            raise errors.SystemError(
                "Please specify name of %s to create." % kind)
        name = args[1]

        port = 7531
        if len(args) > 2:
            try:
                port = int(args[2])
            except ValueError:
                raise errors.SystemError(
                    "Please specify a numeric port, not %r." % args[2])

        if kind == 'manager':
            self.createManager(name, port)
        else:
            self.createWorker(name, managerPort=port, randomFeederports=True)

    def createManager(self, name, port=7531):
        """
        Create a sample manager.

        @returns: whether or not the config was created.
        @raises errors.SystemError: if the manager directory already exists.
        """
        self.info("Creating manager %s" % name)
        managerDir = os.path.join(self.managersDir, name)
        if os.path.exists(managerDir):
            raise errors.SystemError(
                "Manager directory %s already exists" % managerDir)
        os.makedirs(managerDir)

        planetFile = os.path.join(managerDir, 'planet.xml')

        # generate the file; close it even if writing fails
        handle = open(planetFile, 'w')
        try:
            handle.write("""<planet>
  <manager>
    <debug>4</debug>
    <host>localhost</host>
    <port>%(port)d</port>
    <transport>ssl</transport>
    <!-- certificate path can be relative to $sysconfdir/flumotion,
         or absolute -->
    <!--
    <certificate>default.pem</certificate>
    -->
    <component name="manager-bouncer" type="htpasswdcrypt-bouncer">
      <property name="data"><![CDATA[
user:PSfNpHTkpTx1M
]]></property>
    </component>
  </manager>
</planet>
""" % {'port': port})
        finally:
            handle.close()

        # create a default.pem file if it doesn't exist yet
        pemFile = os.path.join(configure.configdir, 'default.pem')
        if not os.path.exists(pemFile):
            retval = os.system("%s %s" % (
                os.path.join(configure.datadir, 'make-dummy-cert'), pemFile))
            if retval != 0:
                self.warning("Could not create certificate %s" % pemFile)

        return True

    def createWorker(self, name, managerPort=7531, randomFeederports=False):
        """
        Create a sample worker.

        @returns: whether or not the config was created.
        @raises errors.SystemError: if the worker file already exists.
        """
        self.info("Creating worker %s" % name)
        # all workers share this directory; os.makedirs raises if it already
        # exists, which used to make every worker after the first one fail
        if not os.path.exists(self.workersDir):
            os.makedirs(self.workersDir)
        workerFile = os.path.join(self.workersDir, "%s.xml" % name)
        if os.path.exists(workerFile):
            raise errors.SystemError(
                "Worker file %s already exists." % workerFile)

        feederports = "  <!-- <feederports>8600-8639</feederports> -->"
        if randomFeederports:
            feederports = '  <feederports random="True" />'
        # generate the file; close it even if writing fails
        handle = open(workerFile, 'w')
        try:
            handle.write("""<worker>

  <debug>4</debug>

  <manager>
    <host>localhost</host>
    <port>%(managerPort)s</port>
  </manager>

  <authentication type="plaintext">
    <username>user</username>
    <password>test</password>
  </authentication>

%(feederports)s

</worker>
""" % {'managerPort': managerPort, 'feederports': feederports})
        finally:
            handle.close()

        return True

    def startManager(self, name, flowNames):
        """
        Start the manager as configured in the manager directory for the given
        manager name, together with the given flows.

        @returns: whether or not the manager daemon started
        @raises errors.SystemError: if the planet or a flow file is missing,
                                    or the manager already has a pid file.
        """
        self.info("Starting manager %s" % name)
        self.debug("Starting manager with flows %r" % flowNames)
        managerDir = os.path.join(self.managersDir, name)
        planetFile = os.path.join(managerDir, 'planet.xml')
        if not os.path.exists(planetFile):
            raise errors.SystemError(
                "Planet file %s does not exist" % planetFile)
        self.info("Loading planet %s" % planetFile)

        flowsDir = os.path.join(managerDir, 'flows')
        flowFiles = []
        for flowName in flowNames:
            flowFile = os.path.join(flowsDir, "%s.xml" % flowName)
            if not os.path.exists(flowFile):
                raise errors.SystemError(
                    "Flow file %s does not exist" % flowFile)
            flowFiles.append(flowFile)
            self.info("Loading flow %s" % flowFile)

        pid = common.getPid('manager', name)
        if pid:
            if common.checkPidRunning(pid):
                raise errors.SystemError(
                    "Manager %s is already running (with pid %d)" % (
                        name, pid))
            raise errors.SystemError(
                "Manager %s is dead (stale pid %d)" % (name, pid))

        dirOptions = self._getDirOptions()
        command = "flumotion-manager %s -D --daemonize-to %s " \
            "--service-name %s %s %s" % (
                dirOptions, configure.daemondir, name, planetFile,
                " ".join(flowFiles))
        self.debug("starting process %s" % command)
        retval = self.startProcess(command)

        if retval != 0:
            self.warning("manager %s could not start (return value %d)" % (
                name, retval))
            return False

        self.debug("Waiting for pid for manager %s" % name)
        pid = common.waitPidFile('manager', name)
        if not pid:
            self.warning("manager %s could not start" % name)
            return False

        self.info("Started manager %s with pid %d" % (name, pid))
        return True

    def startWorker(self, name):
        """
        Start the worker as configured in the worker directory for the given
        worker name.

        @returns: whether or not the worker daemon started
        @raises errors.SystemError: if the worker file is missing, or the
                                    worker already has a pid file.
        """
        self.info("Starting worker %s" % name)
        workerFile = os.path.join(self.workersDir, "%s.xml" % name)
        if not os.path.exists(workerFile):
            raise errors.SystemError(
                "Worker file %s does not exist" % workerFile)

        pid = common.getPid('worker', name)
        if pid:
            if common.checkPidRunning(pid):
                raise errors.SystemError(
                    "Worker %s is already running (with pid %d)" % (
                        name, pid))
            raise errors.SystemError(
                "Worker %s is dead (stale pid %d)" % (name, pid))

        # we are sure the worker is not running and there's no pid file
        self.info("Loading worker %s" % workerFile)

        dirOptions = self._getDirOptions()
        command = "flumotion-worker %s -D --daemonize-to %s " \
            "--service-name %s %s" % (
                dirOptions, configure.daemondir, name, workerFile)
        self.debug("Running %s" % command)
        retval = self.startProcess(command)

        if retval != 0:
            self.warning("worker %s could not start (return value %d)" % (
                name, retval))
            return False

        self.debug("Waiting for pid for worker %s" % name)
        pid = common.waitPidFile('worker', name)
        if not pid:
            self.warning("worker %s could not start" % name)
            return False

        self.info("Started worker %s with pid %d" % (name, pid))
        return True

    def startProcess(self, command):
        """
        Run the given shell command and block until it exits.

        @returns: the exit status of the process, or -1 if it did not exit
                  normally (e.g. it was killed by a signal).
        """
        status = os.system(command)
        if os.WIFEXITED(status):
            return os.WEXITSTATUS(status)

        # definitely something wrong
        return -1

    def stopManager(self, name):
        """
        Stop the given manager if it is running.

        @returns: True if the manager was not running or has been stopped;
                  False if its pid is stale or it could not be stopped.
        """
        self.info("Stopping manager %s" % name)
        pid = common.getPid('manager', name)
        if not pid:
            self.info("manager %s was not running" % name)
            return True

        # FIXME: ensure a correct process is running this pid
        if not common.checkPidRunning(pid):
            self.info("Manager %s is dead (stale pid %d)" % (name, pid))
            return False

        self.debug('Stopping manager %s with pid %d' % (name, pid))
        if not self.stopProcess(pid):
            return False

        self.info('Stopped manager %s with pid %d' % (name, pid))
        return True

    def stopWorker(self, name):
        """
        Stop the given worker if it is running.

        @returns: True if the worker was not running or has been stopped;
                  False if its pid is stale or it could not be stopped.
        """
        self.info("Stopping worker %s" % name)
        pid = common.getPid('worker', name)
        if not pid:
            self.info("worker %s was not running" % name)
            return True

        # FIXME: ensure a correct process is running this pid
        if not common.checkPidRunning(pid):
            self.info("Worker %s is dead (stale pid %d)" % (name, pid))
            return False

        self.debug('Stopping worker %s with pid %d' % (name, pid))
        if not self.stopProcess(pid):
            return False

        self.info('Stopped worker %s with pid %d' % (name, pid))
        return True

    def stopProcess(self, pid):
        """
        Stop the process with the given pid and wait until it has gone.

        TERM is sent first. If the pid is still running after
        configure.processTermWait seconds, KILL is sent. The process then
        gets configure.processKillWait more seconds.

        @returns: True if the process went away; False if it could not be
                  signalled or it survived the KILL wait.
        """
        # time.clock() measures processor time on Unix, not elapsed time, so
        # it is unsuitable for timeouts; use the wall clock instead.
        termDeadline = time.time() + configure.processTermWait
        killDeadline = termDeadline + configure.processKillWait
        killed = False

        self.debug('stopping process with pid %d' % pid)
        if not common.termPid(pid):
            self.warning('No process with pid %d' % pid)
            return False

        # wait for the process to disappear
        while common.checkPidRunning(pid):
            now = time.time()
            if not killed and now > termDeadline:
                self.warning("Process with pid %d has not responded to TERM "
                    "for %d seconds, killing" % (pid,
                        configure.processTermWait))
                common.killPid(pid)
                killed = True

            if now > killDeadline:
                self.warning("Process with pid %d has not responded to KILL "
                    "for %d seconds, stopping" % (pid,
                        configure.processKillWait))
                return False

            # poll instead of spinning at 100% CPU
            time.sleep(0.1)

        return True

    def list(self):
        """
        Print every configured manager (sorted by name) with its flows,
        followed by every configured worker.
        """
        managers = self.getManagers()
        for name in sorted(managers):
            print("manager %s" % name)
            for flow in managers[name]:
                print("  flow %s" % flow)

        for worker in self.getWorkers():
            print("worker %s" % worker)
552