1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 """
18 Servicer object used in service scripts
19 """
20
21 import os
22 import glob
23 import time
24
25 from flumotion.configure import configure
26 from flumotion.common import errors, log
27 from flumotion.common.python import makedirs
28 from flumotion.common.process import checkPidRunning, deletePidFile, getPid, \
29 killPid, termPid, waitPidFile
30
31 __version__ = "$Rev: 6988 $"
32
33
35 """
36 I manage running managers and workers on behalf of a service script.
37 """
38
39 logCategory = 'servicer'
40
def __init__(self, configDir=None, logDir=None, runDir=None):
    """
    Set up the directories this servicer works with.

    @type  configDir: string
    @param configDir: overridden path to the configuration directory;
                      when not given, configure.configdir is used.
    @type  logDir:    string
    @param logDir:    overridden path to the log directory.
    @type  runDir:    string
    @param runDir:    overridden path to the run directory.
    """
    # The configDir override used to be accepted but never applied; the
    # managers and workers directories are now looked up below it.
    baseDir = configDir or configure.configdir
    self.managersDir = os.path.join(baseDir, 'managers')
    self.workersDir = os.path.join(baseDir, 'workers')
    # Entries left at None are skipped when the overrides are turned into
    # command line options by _getDirOptions().
    self._overrideDir = {
        'logdir': logDir,
        'rundir': runDir,
    }
56
58
59
def _parseManagersWorkers(self, command, args):
    """
    Work out which managers and workers a command applies to.

    @type  command: string
    @param command: name of the command being run; only used in the
                    error message asking for a name.
    @type  args:    list of string
    @param args:    empty to select every manager and worker, or
                    ['manager', name] / ['worker', name] to select one.

    @rtype:   tuple of (list of string, list of string)
    @returns: the manager names and the worker names to act on; both
              sorted when everything is selected.

    @raises errors.FatalError: when the kind is neither manager nor
                               worker, the name is missing, or nothing
                               is configured under that name.
    """
    # NOTE(review): the def line was missing from the reviewed text; name
    # and parameters are reconstructed from the callers, which all use
    # self._parseManagersWorkers(<command>, args).
    if not args:
        # sorted() works on the dict directly and on any list, so this
        # does not depend on keys() returning a sortable list.
        return (sorted(self.getManagers()), sorted(self.getWorkers()))

    which = args[0]
    if which not in ('manager', 'worker'):
        raise errors.FatalError('Please specify either manager or worker')

    if len(args) < 2:
        raise errors.FatalError(
            'Please specify which %s to %s' % (which, command))

    name = args[1]
    if which == 'manager':
        if name not in self.getManagers():
            raise errors.FatalError('No manager "%s"' % name)
        return ([name], [])

    if name not in self.getWorkers():
        raise errors.FatalError('No worker with name %s' % name)
    return ([], [name])
91
def _getDirOptions(self):
    """
    Return the override directories for configure.configure as a single
    string suitable for appending to a command line.

    Overrides without a value are left out. Options are emitted in sorted
    key order so the resulting command line is the same on every run.

    @rtype:   string
    @returns: space-separated --key=value options; empty when nothing is
              overridden.
    """
    # NOTE(review): the def line was missing from the reviewed text; the
    # name is reconstructed from the self._getDirOptions() call sites.
    options = ['--%s=%s' % (key, self._overrideDir[key])
               for key in sorted(self._overrideDir)
               if self._overrideDir[key]]
    return " ".join(options)
102
def getManagers(self):
    """
    Scan the managers directory for managers and their flows.

    Every entry directly under self.managersDir counts as a manager; its
    flows are the *.xml files in its 'flows' subdirectory, if any.

    @rtype:   dict of string -> list of string
    @returns: a dictionary of manager names -> sorted flow names; empty
              when the managers directory does not exist.
    """
    # NOTE(review): the def line was missing from the reviewed text; the
    # name is reconstructed from the self.getManagers() call sites.
    managers = {}

    self.log('getManagers()')
    if not os.path.exists(self.managersDir):
        return managers

    for managerDir in glob.glob(os.path.join(self.managersDir, '*')):
        # glob yields nothing for a missing flows directory, so no
        # separate existence check is needed.
        flowFiles = glob.glob(os.path.join(managerDir, 'flows', '*.xml'))
        # splitext only strips the final extension; splitting on ".xml"
        # cut names such as "a.xml.old.xml" down to "a".
        flows = sorted([os.path.splitext(os.path.basename(flowFile))[0]
                        for flowFile in flowFiles])
        managerName = os.path.basename(managerDir)
        self.log('Adding flows %r to manager %s' % (flows, managerName))
        managers[managerName] = flows
    self.log('returning managers: %r' % managers)
    return managers
128
def getWorkers(self):
    """
    Scan the workers directory for worker configuration files.

    @rtype:   list of string
    @returns: a sorted list of worker names, one per *.xml file in
              self.workersDir; empty when that directory does not exist.
    """
    # NOTE(review): the def line was missing from the reviewed text; the
    # name is reconstructed from the self.getWorkers() call sites.
    if not os.path.exists(self.workersDir):
        return []

    workerFiles = glob.glob(os.path.join(self.workersDir, '*.xml'))
    # splitext only strips the final extension; splitting on ".xml" cut
    # names that contain ".xml" in the middle.
    return sorted([os.path.splitext(os.path.basename(workerFile))[0]
                   for workerFile in workerFiles])
144
def start(self, args):
    """
    Start processes as given in the args.

    If nothing specified, start all managers and workers.
    If first argument is "manager", start given manager.
    If first argument is "worker", start given worker.

    @type  args: list of string
    @param args: selection as understood by _parseManagersWorkers.

    @rtype:   int
    @returns: an exit value reflecting the number of processes that failed
              to start
    """
    # NOTE(review): the def line was missing from the reviewed text; the
    # name and signature are inferred from the 'start' command string and
    # the parallel stop(self, args) method - confirm against the real file.
    (managers, workers) = self._parseManagersWorkers('start', args)
    self.debug("Start managers %r and workers %r" % (managers, workers))
    managersDict = self.getManagers()
    exitvalue = 0

    # Managers are started before workers.
    for name in managers:
        if not self.startManager(name, managersDict[name]):
            exitvalue += 1
    for name in workers:
        if not self.startWorker(name):
            exitvalue += 1

    return exitvalue
169
def stop(self, args):
    """
    Stop processes as given in the args.

    If nothing specified, stop all managers and workers.
    If first argument is "manager", stop given manager.
    If first argument is "worker", stop given worker.

    @type  args: list of string
    @param args: selection as understood by _parseManagersWorkers.

    @rtype:   int
    @returns: an exit value reflecting the number of processes that failed
              to stop
    """
    (managers, workers) = self._parseManagersWorkers('stop', args)
    self.debug("Stop managers %r and workers %r" % (managers, workers))

    exitvalue = 0

    # Workers are stopped before managers.
    for name in workers:
        if not self.stopWorker(name):
            exitvalue += 1
    for name in managers:
        if not self.stopManager(name):
            exitvalue += 1

    return exitvalue
194
def status(self, args):
    """
    Give status on processes as given in the args.

    Prints one line per selected manager and worker to standard output:
    not running, running with its pid, or dead with a stale pid.

    @type  args: list of string
    @param args: selection as understood by _parseManagersWorkers.
    """
    # NOTE(review): the def line was missing from the reviewed text; the
    # name and signature are inferred from the 'status' command string -
    # confirm against the real file.
    (managers, workers) = self._parseManagersWorkers('status', args)
    self.debug("Status managers %r and workers %r" % (managers, workers))
    for kind, names in [('manager', managers), ('worker', workers)]:
        for name in names:
            pid = getPid(kind, name)
            # print(...) with one pre-formatted string behaves the same
            # as a statement and as a function call.
            if not pid:
                print("%s %s not running" % (kind, name))
            elif checkPidRunning(pid):
                print("%s %s is running with pid %d" % (kind, name, pid))
            else:
                print("%s %s dead (stale pid %d)" % (kind, name, pid))
211
def clean(self, args):
    """
    Clean up dead process pid files as given in the args.

    For every selected manager and worker, the pid file is deleted when
    no pid can be read for it or when the recorded pid is not running.
    Pid files of running processes are left alone.

    @type  args: list of string
    @param args: selection as understood by _parseManagersWorkers.
    """
    # NOTE(review): the def line was missing from the reviewed text; the
    # name and signature are inferred from the 'clean' command string -
    # confirm against the real file.
    (managers, workers) = self._parseManagersWorkers('clean', args)
    self.debug("Clean managers %r and workers %r" % (managers, workers))
    for kind, names in [('manager', managers), ('worker', workers)]:
        for name in names:
            pid = getPid(kind, name)
            if not pid:
                # NOTE(review): this branch presumably also runs when
                # there is no pid file at all - confirm deletePidFile
                # copes with a missing file.
                print("deleting bogus pid file for %s %s" % (kind, name))
                deletePidFile(kind, name)
                continue
            if not checkPidRunning(pid):
                self.debug("Cleaning up stale pid %d for %s %s" % (
                    pid, kind, name))
                print("deleting stale pid file for %s %s" % (kind, name))
                deletePidFile(kind, name)
231
def condrestart(self, args):
    """
    Restart running processes as given in the args.

    If nothing specified, condrestart all managers and workers.
    If first argument is "manager", condrestart given manager.
    If first argument is "worker", condrestart given worker.

    Processes without a pid are skipped silently; processes with a stale
    pid are reported on standard output and not restarted.

    @type  args: list of string
    @param args: selection as understood by _parseManagersWorkers.

    @rtype:   int
    @returns: an exit value reflecting the number of processes that failed
              to stop or to start again
    """
    # NOTE(review): the def line was missing from the reviewed text; the
    # name and signature are inferred from the 'condrestart' command
    # string - confirm against the real file.
    (managers, workers) = self._parseManagersWorkers('condrestart', args)
    self.debug("condrestart managers %r and workers %r" % (
        managers, workers))
    managersDict = self.getManagers()
    exitvalue = 0

    for kind, names in [('manager', managers), ('worker', workers)]:
        for name in names:
            pid = getPid(kind, name)
            if not pid:
                continue
            if not checkPidRunning(pid):
                print("%s %s dead (stale pid %d)" % (kind, name, pid))
                continue

            if kind == 'manager':
                stopped = self.stopManager(name)
            else:
                stopped = self.stopWorker(name)
            if not stopped:
                # Do not try to start something that would not stop.
                exitvalue += 1
                continue

            if kind == 'manager':
                started = self.startManager(name, managersDict[name])
            else:
                started = self.startWorker(name)
            if not started:
                exitvalue += 1

    return exitvalue
271
273
274
275
276
def create(self, args):
    """
    Create a default manager or worker config.

    @type  args: list of string
    @param args: [kind, name] or [kind, name, port], where kind is
                 'manager' or 'worker'. The port defaults to 7531; for a
                 worker it is the manager port to connect to. Arguments
                 after the port are ignored.

    @raises errors.FatalError: when kind or name is missing or kind is
                               not 'manager' or 'worker'.
    @raises ValueError:        when the port is not an integer.
    """
    # NOTE(review): the def line was missing from the reviewed text; the
    # name and signature are inferred from the docstring and the use of
    # args - confirm against the real file.
    # Reject an unknown kind before asking for a name, so the user is not
    # told to name something that cannot be created.
    if not args or args[0] not in ('manager', 'worker'):
        raise errors.FatalError(
            "Please specify 'manager' or 'worker' to create.")
    kind = args[0]
    if len(args) < 2:
        raise errors.FatalError(
            "Please specify name of %s to create." % kind)
    name = args[1]

    port = 7531
    # ">= 3" so a port followed by extra arguments is still honoured.
    if len(args) >= 3:
        port = int(args[2])

    if kind == 'manager':
        self.createManager(name, port)
    else:
        self.createWorker(name, managerPort=port, randomFeederports=True)
300
def createManager(self, name, port=7531):
    """
    Create a sample manager.

    Creates the manager directory under self.managersDir, writes a
    planet.xml into it and, when configure.configdir has no default.pem
    yet, runs the make-dummy-cert script from configure.datadir.

    @type  name: string
    @param name: name of the manager; used as its directory name.
    @type  port: int
    @param port: port number written to planet.xml.

    @rtype:   bool
    @returns: whether or not the config was created.

    @raises errors.FatalError: when the manager directory already exists.
    """
    # NOTE(review): the def line was missing from the reviewed text. The
    # name and the (name, port) order come from the
    # self.createManager(name, port) call; the port default is an
    # assumption mirroring createWorker - confirm against the real file.
    self.info("Creating manager %s" % name)
    managerDir = os.path.join(self.managersDir, name)
    if os.path.exists(managerDir):
        raise errors.FatalError(
            "Manager directory %s already exists" % managerDir)
    makedirs(managerDir)

    planetFile = os.path.join(managerDir, 'planet.xml')

    # NOTE(review): indentation inside the template was lost in the
    # reviewed text and is re-created here; the CDATA payload is kept
    # flush left so no whitespace is added to it.
    handle = open(planetFile, 'w')
    try:
        handle.write("""<planet>
  <manager>
    <debug>4</debug>
    <host>localhost</host>
    <port>%(port)d</port>
    <transport>ssl</transport>
    <!-- certificate path can be relative to $sysconfdir/flumotion,
         or absolute -->
<!--
    <certificate>default.pem</certificate>
-->
    <component name="manager-bouncer" type="htpasswdcrypt-bouncer">
      <property name="data"><![CDATA[
user:PSfNpHTkpTx1M
]]></property>
    </component>
  </manager>
</planet>
""" % {'port': port})
    finally:
        # Close the file even when formatting or writing fails.
        handle.close()

    pemFile = os.path.join(configure.configdir, 'default.pem')
    if not os.path.exists(pemFile):
        # The script is run through sh; its exit status is not checked.
        os.system("sh %s %s" % (
            os.path.join(configure.datadir, 'make-dummy-cert'), pemFile))

    return True
347
def createWorker(self, name, managerPort=7531, randomFeederports=False):
    """
    Create a sample worker.

    Makes sure self.workersDir exists and writes <name>.xml into it.

    @type  name:              string
    @param name:              name of the worker; used as the file name.
    @type  managerPort:       int
    @param managerPort:       port of the manager the worker logs in to.
    @type  randomFeederports: bool
    @param randomFeederports: when true, write a feederports element with
                              random="True"; otherwise write a
                              commented-out example port range.

    @rtype:   bool
    @returns: whether or not the config was created.

    @raises errors.FatalError: when the worker file already exists.
    """
    makedirs(self.workersDir)
    self.info("Creating worker %s" % name)
    workerFile = os.path.join(self.workersDir, "%s.xml" % name)
    if os.path.exists(workerFile):
        raise errors.FatalError(
            "Worker file %s already exists." % workerFile)

    if randomFeederports:
        feederports = ' <feederports random="True" />'
    else:
        feederports = " <!-- <feederports>8600-8639</feederports> -->"

    # NOTE(review): indentation inside the template was lost in the
    # reviewed text and is re-created here.
    handle = open(workerFile, 'w')
    try:
        handle.write("""<worker>

  <debug>4</debug>

  <manager>
    <host>localhost</host>
    <port>%(managerPort)s</port>
  </manager>

  <authentication type="plaintext">
    <username>user</username>
    <password>test</password>
  </authentication>

 %(feederports)s

</worker>
""" % {'managerPort': managerPort, 'feederports': feederports})
    finally:
        # Close the file even when formatting or writing fails.
        handle.close()

    return True
387
388
def startManager(self, name, flowNames):
    """
    Start the manager as configured in the manager directory for the given
    manager name, together with the given flows.

    @type  name:      string
    @param name:      name of the manager; its directory under
                      self.managersDir must contain planet.xml.
    @type  flowNames: list of string
    @param flowNames: names of flows to load; each must exist as
                      flows/<name>.xml in the manager directory.

    @rtype:   bool
    @returns: whether or not the manager daemon started

    @raises errors.FatalError: when the planet file or a flow file is
                               missing, or a pid is already recorded for
                               this manager (running or stale).
    """
    # NOTE(review): the def line was missing from the reviewed text; name
    # and parameters are reconstructed from the
    # self.startManager(name, managersDict[name]) call sites.
    self.info("Starting manager %s" % name)
    self.debug("Starting manager with flows %r" % flowNames)
    managerDir = os.path.join(self.managersDir, name)
    planetFile = os.path.join(managerDir, 'planet.xml')
    if not os.path.exists(planetFile):
        raise errors.FatalError(
            "Planet file %s does not exist" % planetFile)
    self.info("Loading planet %s" % planetFile)

    flowsDir = os.path.join(managerDir, 'flows')
    flowFiles = []
    for flowName in flowNames:
        flowFile = os.path.join(flowsDir, "%s.xml" % flowName)
        if not os.path.exists(flowFile):
            raise errors.FatalError(
                "Flow file %s does not exist" % flowFile)
        flowFiles.append(flowFile)
        self.info("Loading flow %s" % flowFile)

    # Any recorded pid blocks the start: a stale one has to be cleaned up
    # first.
    pid = getPid('manager', name)
    if pid:
        if checkPidRunning(pid):
            raise errors.FatalError(
                "Manager %s is already running (with pid %d)" % (
                    name, pid))
        raise errors.FatalError(
            "Manager %s is dead (stale pid %d)" % (name, pid))

    dirOptions = self._getDirOptions()
    command = "flumotion-manager %s -D --daemonize-to %s " \
        "--service-name %s %s %s" % (
            dirOptions, configure.daemondir, name, planetFile,
            " ".join(flowFiles))
    self.debug("starting process %s" % command)
    retval = self.startProcess(command)

    if retval != 0:
        self.warning("manager %s could not start (return value %d)" % (
            name, retval))
        return False

    # The launcher returned success; the pid file tells whether the
    # daemon came up.
    self.debug("Waiting for pid for manager %s" % name)
    pid = waitPidFile('manager', name)
    if not pid:
        self.warning("manager %s could not start" % name)
        return False

    self.info("Started manager %s with pid %d" % (name, pid))
    return True
445
def startWorker(self, name):
    """
    Start the worker as configured in the worker directory for the given
    worker name.

    @type  name: string
    @param name: name of the worker; <name>.xml must exist in
                 self.workersDir.

    @rtype:   bool
    @returns: whether or not the worker daemon started

    @raises errors.FatalError: when the worker file is missing, or a pid
                               is already recorded for this worker
                               (running or stale).
    """
    # NOTE(review): the def line was missing from the reviewed text; the
    # name and parameter are reconstructed from the self.startWorker(name)
    # call sites.
    self.info("Starting worker %s" % name)
    workerFile = os.path.join(self.workersDir, "%s.xml" % name)
    if not os.path.exists(workerFile):
        raise errors.FatalError(
            "Worker file %s does not exist" % workerFile)

    # Any recorded pid blocks the start: a stale one has to be cleaned up
    # first.
    pid = getPid('worker', name)
    if pid:
        if checkPidRunning(pid):
            raise errors.FatalError(
                "Worker %s is already running (with pid %d)" % (
                    name, pid))
        raise errors.FatalError(
            "Worker %s is dead (stale pid %d)" % (name, pid))

    self.info("Loading worker %s" % workerFile)

    dirOptions = self._getDirOptions()
    command = "flumotion-worker %s -D --daemonize-to %s " \
        "--service-name %s %s" % (
            dirOptions, configure.daemondir, name, workerFile)
    self.debug("Running %s" % command)
    retval = self.startProcess(command)

    if retval != 0:
        self.warning("worker %s could not start (return value %d)" % (
            name, retval))
        return False

    # The launcher returned success; the pid file tells whether the
    # daemon came up.
    self.debug("Waiting for pid for worker %s" % name)
    pid = waitPidFile('worker', name)
    if not pid:
        self.warning("worker %s could not start" % name)
        return False

    self.info("Started worker %s with pid %d" % (name, pid))
    return True
491
def startProcess(self, command):
    """
    Start the given process and block.

    @type  command: string
    @param command: command line, run through os.system.

    @rtype:   int
    @returns: the exit status of the process, or -1 when it did not exit
              normally.
    """
    # NOTE(review): the def line was missing from the reviewed text; the
    # name and parameter are reconstructed from the
    # self.startProcess(command) call sites.
    status = os.system(command)
    if os.WIFEXITED(status):
        return os.WEXITSTATUS(status)

    # Not a normal exit, so there is no exit status to report.
    return -1
504
def stopManager(self, name):
    """
    Stop the given manager if it is running.

    @type  name: string
    @param name: name of the manager to stop.

    @rtype:   bool
    @returns: True when the manager was stopped or had no pid recorded;
              False when its pid is stale or the process could not be
              stopped.
    """
    # NOTE(review): the def line was missing from the reviewed text; the
    # name and parameter are reconstructed from the self.stopManager(name)
    # call sites.
    self.info("Stopping manager %s" % name)
    pid = getPid('manager', name)
    if not pid:
        # Same feedback as stopWorker gives for a worker without a pid.
        self.info("manager %s was not running" % name)
        return True

    if not checkPidRunning(pid):
        self.info("Manager %s is dead (stale pid %d)" % (name, pid))
        return False

    self.debug('Stopping manager %s with pid %d' % (name, pid))
    if not self.stopProcess(pid):
        return False

    self.info('Stopped manager %s with pid %d' % (name, pid))
    return True
525
def stopWorker(self, name):
    """
    Stop the given worker if it is running.

    @type  name: string
    @param name: name of the worker to stop.

    @rtype:   bool
    @returns: True when the worker was stopped or had no pid recorded;
              False when its pid is stale or the process could not be
              stopped.
    """
    # NOTE(review): the def line was missing from the reviewed text; the
    # name and parameter are reconstructed from the self.stopWorker(name)
    # call sites.
    self.info("Stopping worker %s" % name)
    pid = getPid('worker', name)
    if not pid:
        self.info("worker %s was not running" % name)
        return True

    if not checkPidRunning(pid):
        self.info("Worker %s is dead (stale pid %d)" % (name, pid))
        return False

    self.debug('Stopping worker %s with pid %d' % (name, pid))
    if not self.stopProcess(pid):
        return False

    self.info('Stopped worker %s with pid %d' % (name, pid))
    return True
547
def stopProcess(self, pid):
    """
    Stop the process with the given pid.
    Wait until the pid has disappeared.

    The process is first sent TERM. If it is still running
    configure.processTermWait seconds later it is sent KILL; if it is
    still running configure.processKillWait seconds after that, waiting
    is abandoned.

    @type  pid: int
    @param pid: pid of the process to stop.

    @rtype:   bool
    @returns: True when the process is gone; False when there was no such
              process or it survived KILL.
    """
    # NOTE(review): the def line was missing from the reviewed text; the
    # name and parameter are reconstructed from the self.stopProcess(pid)
    # call sites.
    # Wall-clock deadlines: time.clock() counts CPU time on Unix, so the
    # waits were not the configured number of seconds.
    termDeadline = time.time() + configure.processTermWait
    killDeadline = termDeadline + configure.processKillWait
    killSent = False

    self.debug('stopping process with pid %d' % pid)
    if not termPid(pid):
        self.warning('No process with pid %d' % pid)
        return False

    while checkPidRunning(pid):
        now = time.time()
        if not killSent and now > termDeadline:
            self.warning("Process with pid %d has not responded to TERM "
                "for %d seconds, killing" % (pid,
                    configure.processTermWait))
            killPid(pid)
            # KILL is sent only once.
            killSent = True

        if now > killDeadline:
            self.warning("Process with pid %d has not responded to KILL "
                "for %d seconds, stopping" % (pid,
                    configure.processKillWait))
            return False

        # Poll at a short interval instead of spinning at full CPU.
        time.sleep(0.1)

    return True
580
def list(self):
    """
    List all service parts managed.

    Prints every manager with its flows, then every worker, to standard
    output. Managers are listed in sorted order.
    """
    # NOTE(review): the def line was missing from the reviewed text; the
    # name is inferred from the docstring only - confirm against the real
    # file.
    managers = self.getManagers()
    for name in sorted(managers):
        print("manager %s" % name)
        for flow in managers[name]:
            print(" flow %s" % flow)

    for worker in self.getWorkers():
        print("worker %s" % worker)
596