Package flumotion :: Package worker :: Module worker
[hide private]

Source Code for Module flumotion.worker.worker

   1  # -*- Mode: Python; test-case-name:flumotion.test.test_worker_worker -*- 
   2  # vi:si:et:sw=4:sts=4:ts=4 
   3  # 
   4  # Flumotion - a streaming media server 
   5  # Copyright (C) 2004,2005,2006,2007 Fluendo, S.L. (www.fluendo.com). 
   6  # All rights reserved. 
   7   
   8  # This file may be distributed and/or modified under the terms of 
   9  # the GNU General Public License version 2 as published by 
  10  # the Free Software Foundation. 
  11  # This file is distributed without any warranty; without even the implied 
  12  # warranty of merchantability or fitness for a particular purpose. 
  13  # See "LICENSE.GPL" in the source distribution for more information. 
  14   
  15  # Licensees having purchased or holding a valid Flumotion Advanced 
  16  # Streaming Server license may use this file in accordance with the 
  17  # Flumotion Advanced Streaming Server Commercial License Agreement. 
  18  # See "LICENSE.Flumotion" in the source distribution for more information. 
  19   
  20  # Headers in this file shall remain intact. 
  21   
  22  """ 
  23  worker-side objects to handle worker clients 
  24  """ 
  25   
  26  import os 
  27  import signal 
  28  import sys 
  29  import exceptions 
  30   
  31  import gst 
  32  import gst.interfaces 
  33   
  34  from twisted.cred import portal 
  35  from twisted.internet import defer, reactor 
  36  from twisted.spread import pb 
  37  import twisted.cred.error 
  38  from twisted.internet import error 
  39   
  40  from flumotion.common import errors, interfaces, log, bundleclient 
  41  from flumotion.common import common, medium, messages, worker 
  42  from flumotion.twisted import checkers, fdserver, compat 
  43  from flumotion.twisted import pb as fpb 
  44  from flumotion.twisted import defer as fdefer 
  45  from flumotion.twisted.defer import defer_generator_method 
  46  from flumotion.twisted.compat import implements 
  47  from flumotion.configure import configure 
  48  from flumotion.worker import feed 
  49   
  50  JOB_SHUTDOWN_TIMEOUT = 5 
  51   
  52  factoryClass = fpb.ReconnectingFPBClientFactory 
class WorkerClientFactory(factoryClass):
    """
    I am a client factory for the worker to log in to the manager.

    I hand the remote reference obtained on login to the worker medium, and
    I report connection and login problems through the log.
    """
    logCategory = 'worker'
    perspectiveInterface = interfaces.IWorkerMedium

    def __init__(self, brain):
        """
        @param brain: the brain providing the manager address and the medium
        @type  brain: L{flumotion.worker.worker.WorkerBrain}
        """
        self._managerHost = brain.managerHost
        self._managerPort = brain.managerPort
        self.medium = brain.medium
        # doing this as a class method triggers a doc error
        factoryClass.__init__(self)
        # maximum 10 second delay for workers to attempt to log in again
        self.maxDelay = 10

    def clientConnectionFailed(self, connector, reason):
        """
        Chain up to the parent class and log when we will retry.

        @param reason: L{twisted.spread.pb.failure.Failure}
        """
        # this method exists so that we log the failure
        fpb.ReconnectingFPBClientFactory.clientConnectionFailed(
            self, connector, reason)
        # the parent call has updated the delay by now
        retryDelay = self.delay
        self.debug("failed to connect, will try to reconnect in %f seconds"
                   % retryDelay)

    ### ReconnectingPBClientFactory methods

    def gotDeferredLogin(self, d):
        """
        Attach our login callback and error handlers to the login deferred.

        @param d: the deferred from the login that is now available
        """

        def onDisconnect(remoteReference):
            # a disconnect during shutdown is expected, so only log it
            if reactor.killed:
                self.log('Connection to manager lost due to shutdown')
                return
            self.warning('Lost connection to manager, '
                         'will attempt to reconnect')

        def onLogin(reference):
            self.info("Logged in to manager")
            self.debug("remote reference %r" % reference)

            self.medium.setRemoteReference(reference)
            reference.notifyOnDisconnect(onDisconnect)

        def onAccessDenied(failure):
            failure.trap(twisted.cred.error.UnauthorizedLogin)
            self.warning('Access denied.')

        def onConnectionRefused(failure):
            failure.trap(twisted.internet.error.ConnectionRefusedError)
            address = (self._managerHost, self._managerPort)
            self.warning('Connection to %s:%d refused.' % address)

        def onAlreadyConnected(failure):
            failure.trap(errors.AlreadyConnectedError)
            self.warning('A worker with the name "%s" is already connected.'
                         % failure.value)

        def onNoSuchMethod(failure):
            failure.trap(twisted.spread.flavors.NoSuchMethod)
            # failure.value is a str
            if failure.value.find('remote_getKeycardClasses') == -1:
                # some other missing method; pass it down the chain
                return failure

            address = (self._managerHost, self._managerPort)
            self.warning("Manager %s:%d is older than version 0.3.0. "
                         "Please upgrade." % address)
            return None

        def onLoginFailed(failure):
            # catch-all at the end of the chain
            self.warning('Login failed, reason: %s' % str(failure))

        d.addCallback(onLogin)
        # the order matters: each handler traps only its own failure type
        # and lets everything else fall through to the next one
        for handler in (onAccessDenied, onConnectionRefused,
                        onAlreadyConnected, onNoSuchMethod, onLoginFailed):
            d.addErrback(handler)
class WorkerMedium(medium.PingingMedium):
    """
    I am a medium interfacing with the manager-side WorkerAvatar.

    @ivar brain: the worker brain
    @type brain: L{WorkerBrain}
    """

    logCategory = 'workermedium'

    implements(interfaces.IWorkerMedium)

    def __init__(self, brain, ports):
        """
        @param brain: the worker brain
        @type  brain: L{WorkerBrain}
        @param ports: the feed ports handed out by L{remote_getPorts}
        """
        self.brain = brain
        self._ports = ports

    ### pb.Referenceable method for the manager's WorkerAvatar

    def remote_getPorts(self):
        """
        Gets the range of feed ports that this worker was configured to
        use.

        @rtype:  list of int
        @return: list of ports
        """
        return self._ports

    def remote_getFeedServerPort(self):
        """
        Return the TCP port the Feed Server is listening on.

        @rtype:  int, or NoneType
        @return: TCP port number, or None if there is no feed server
        """
        return self.brain.feedServerPort

    def remote_create(self, avatarId, type, moduleName, methodName, nice=0):
        """
        Start a component of the given type with the given nice level.
        Will spawn a new job process to run the component in.

        @param avatarId:   avatar identification string
        @type  avatarId:   str
        @param type:       type of the component to create
        @type  type:       str
        @param moduleName: name of the module to create the component from
        @type  moduleName: str
        @param methodName: the factory method to use to create the component
        @type  methodName: str
        @param nice:       nice level
        @type  nice:       int

        @returns: a deferred fired when the process has started and created
                  the component
        """
        self.info('Starting component "%s" of type "%s"' % (avatarId, type))

        # set up bundles as we need to have a pb connection to download
        # the modules -- can't do that in the kid yet.
        # FIXME: thomas: find a way to rebuild less so this doesn't take
        # excessive amounts of CPU time
        self.debug('setting up bundles for %s' % moduleName)
        bundlesDeferred = self.bundleLoader.getBundles(moduleName=moduleName)
        yield bundlesDeferred
        # check errors, will proxy to the manager
        bundles = bundlesDeferred.value()

        # this could throw ComponentAlreadyStartingError
        createDeferred = self.brain.deferredCreate(avatarId)
        if not createDeferred:
            raise errors.ComponentCreateError(
                "Component '%s' has already received a create request"
                % avatarId)

        # spawn the job process
        kindergarten = self.brain.kindergarten
        kindergarten.play(avatarId, type, moduleName, methodName, nice,
                          bundles)

        yield createDeferred

        try:
            result = createDeferred.value()
        except errors.ComponentCreateError:
            self.debug('create deferred for %s failed, forwarding error' %
                       avatarId)
            raise
        self.debug('create deferred for %s succeeded (%r)'
                   % (avatarId, result))
        yield result
    remote_create = defer_generator_method(remote_create)

    def remote_checkElements(self, elementNames):
        """
        Checks if one or more GStreamer elements are present and can be
        instantiated.

        @param elementNames: names of the Gstreamer elements
        @type  elementNames: list of str

        @rtype:   list of str
        @returns: a list of instantiatable element names
        """
        self.debug('remote_checkElements: element names to check %r' % (
            elementNames, ))

        available = []
        for name in elementNames:
            try:
                gst.element_factory_make(name)
            except gst.PluginNotFoundError:
                # not instantiatable here, leave it out of the result
                continue
            available.append(name)

        self.debug('remote_checkElements: returning elements names %r'
                   % available)
        return available

    def remote_checkImport(self, moduleName):
        """
        Checks if the given module can be imported, by importing it.

        @param moduleName: name of the module to check
        @type  moduleName: str

        @returns: None or Failure
        """
        self.debug('remote_checkImport: %s', moduleName)
        # FIXME: maybe find a nice way to check if we can import
        # without importing ?
        __import__(moduleName)

    def remote_runFunction(self, module, function, *args, **kwargs):
        """
        Runs the given function in the given module with the given arguments.

        @param module:   module the function lives in
        @type  module:   str
        @param function: function to run
        @type  function: str

        @returns: the return value of the given function in the module.
        """
        return self.runBundledFunction(module, function, *args, **kwargs)

    def remote_getComponents(self):
        """
        I return a list of componentAvatarIds, I have.  I am called by the
        manager soon after I attach to it.  This is needed on reconnects
        so that the manager knows what components it needs to start on me.

        @returns: a list of componentAvatarIds
        """
        kindergarten = self.brain.kindergarten
        return kindergarten.getKidAvatarIds()
296
class Kid:
    """
    I am an abstraction of a job process started by the worker.
    I only hold the values describing that process and its component.

    @cvar pid:        PID of the child process
    @type pid:        int
    @cvar avatarId:   avatar identification string
    @type avatarId:   str
    @cvar type:       type of the component to create
    @type type:       str
    @cvar moduleName: name of the module to create the component from
    @type moduleName: str
    @cvar methodName: the factory method to use to create the component
    @type methodName: str
    @cvar nice:       the nice level to run the kid as
    @type nice:       int
    @cvar bundles:    ordered list of (bundleName, bundlePath) needed to
                      create the component
    @type bundles:    list of (str, str)
    """

    def __init__(self, pid, avatarId, type, moduleName, methodName, nice,
                 bundles):
        # identity of the job process
        self.pid, self.avatarId = pid, avatarId
        # how the job should create its component
        self.type = type
        self.moduleName, self.methodName = moduleName, methodName
        self.bundles = bundles
        # process scheduling
        self.nice = nice
326
class JobProcessProtocol(worker.ProcessProtocol):
    """
    I am the process protocol for a job process spawned by the
    kindergarten.  The kindergarten is passed to my parent class and is
    reachable afterwards as self.loggable.
    """

    def __init__(self, kindergarten, avatarId):
        """
        @param kindergarten: the kindergarten spawning the job
        @type  kindergarten: L{Kindergarten}
        @param avatarId:     avatarId of the component run by the job
        @type  avatarId:     str
        """
        workerName = kindergarten.brain.workerName
        worker.ProcessProtocol.__init__(self, kindergarten, avatarId,
                                        'component', workerName)

    def sendMessage(self, message):
        """
        Forward a message about my component to the manager, through the
        brain.
        """
        brain = self.loggable.brain
        brain.callRemote('componentAddMessage', self.avatarId, message)

    def processEnded(self, status):
        """
        Clean up the worker's bookkeeping for this job now that its process
        has ended, then chain up.

        @param status: the failure describing how the process ended
        """
        kindergarten = self.loggable
        brain = kindergarten.brain
        signum = status.value.signal

        kindergarten.removeKidByPid(self.pid)

        # we need to trigger a failure on the create deferred
        # if the job failed before logging in to the worker;
        # otherwise the manager still thinks it's starting up when it's
        # dead.  If the job already attached to the worker however,
        # the create deferred will already have callbacked.
        if brain.deferredCreateRegistered(self.avatarId):
            reason = "unknown reason"
            if signum:
                reason = "received signal %d" % signum
            text = ("Component '%s' has exited early (%s). "
                    "This is sometimes triggered by a corrupt "
                    "GStreamer registry." % (self.avatarId, reason))
            brain.deferredCreateFailed(self.avatarId,
                                       errors.ComponentCreateError(text))

        brain.jobHeaven.lostAvatar(self.avatarId)
        if brain.deferredShutdownRegistered(self.avatarId):
            brain.deferredShutdownTrigger(self.avatarId)

        # chain up
        worker.ProcessProtocol.processEnded(self, status)
366
class Kindergarten(log.Loggable):
    """
    I spawn job processes.
    I live in the worker brain.
    """

    logCategory = 'workerbrain' # thomas: I don't like Kindergarten

    def __init__(self, options, socketPath, brain):
        """
        @param options:    the optparse option instance of command-line options
        @type  options:    dict
        @param socketPath: the path of the Unix domain socket for PB
        @type  socketPath: str
        @param brain:      a reference to the worker brain
        @type  brain:      L{WorkerBrain}
        """
        self.brain = brain
        self.options = options

        # If set, a deferred to fire when our last child process exits
        self._onShutdown = None

        self._kids = {} # avatarId -> Kid
        self._socketPath = socketPath

    def play(self, avatarId, type, moduleName, methodName, nice, bundles):
        """
        Create a kid and make it "play" by starting a job.
        Starts a component with the given name, of the given type, with
        the given nice level.

        This will spawn a new flumotion-job process.

        @param avatarId:   avatarId the component should use to log in
        @type  avatarId:   str
        @param type:       type of component to start
        @type  type:       str
        @param moduleName: name of the module to create the component from
        @type  moduleName: str
        @param methodName: the factory method to use to create the component
        @type  methodName: str
        @param nice:       nice level
        @type  nice:       int
        @param bundles:    ordered list of (bundleName, bundlePath) for this
                           component
        @type  bundles:    list of (str, str)
        """
        p = JobProcessProtocol(self, avatarId)
        # flumotion-job is looked up next to the running executable
        executable = os.path.join(os.path.dirname(sys.argv[0]),
                                  'flumotion-job')
        if not os.path.exists(executable):
            self.error("Trying to spawn job process, but '%s' does not "
                       "exist" % executable)
        # Evil FIXME: make argv[0] of the kid insult the user
        argv = [executable, avatarId, self._socketPath]

        realexecutable = executable

        # Run some jobs under valgrind, optionally. Would be nice to have the
        # arguments to run it with configurable, but this'll do for now.
        # FLU_VALGRIND_JOB takes a comma-separated list of full component
        # avatar IDs.
        if 'FLU_VALGRIND_JOB' in os.environ:
            jobnames = os.environ['FLU_VALGRIND_JOB'].split(',')
            if avatarId in jobnames:
                realexecutable = 'valgrind'
                # We can't just valgrind flumotion-job, we have to valgrind
                # python running flumotion-job, otherwise we'd need
                # --trace-children (not quite sure why), which we don't want
                argv = ['valgrind', '--leak-check=full', '--num-callers=24',
                        '--leak-resolution=high', '--show-reachable=yes',
                        'python'] + argv

        # the job shares our stdin, stdout and stderr
        childFDs = {0: 0, 1: 1, 2: 2}
        env = {}
        env.update(os.environ)
        # FIXME: publicize log._FLU_DEBUG ?
        env['FLU_DEBUG'] = log._FLU_DEBUG
        process = reactor.spawnProcess(p, realexecutable, env=env, args=argv,
                                       childFDs=childFDs)

        p.setPid(process.pid)

        self._kids[avatarId] = Kid(process.pid, avatarId, type, moduleName,
                                   methodName, nice, bundles)

    def setOnShutdown(self, d):
        """
        Set a deferred to fire when we have no children.
        If there are no children right now, the deferred is fired
        immediately.
        """
        if not self._kids:
            d.callback(None)
        else:
            self._onShutdown = d

    def getKid(self, avatarId):
        """
        @returns: the L{Kid} registered under the given avatarId
        @raises KeyError: if there is no kid with that avatarId
        """
        return self._kids[avatarId]

    def getKids(self):
        """
        @returns: all kids currently registered
        """
        return self._kids.values()

    def getKidAvatarIds(self):
        """
        @returns: the avatarIds of all kids currently registered
        """
        return self._kids.keys()

    def terminateAll(self):
        """
        Kill every registered kid through common.killPid.
        """
        self.warning("Killing all children immediately")
        for kid in self.getKids():
            self.debug("Sending SIGKILL to pid %d", kid.pid)
            common.killPid(kid.pid)

    def removeKidByPid(self, pid):
        """
        Remove the kid from the kindergarten based on the pid.
        Called from L{JobProcessProtocol.processEnded}.

        If this was the last kid and a shutdown deferred was set through
        L{setOnShutdown}, that deferred is fired.

        @returns: whether or not a kid with that pid was removed
        @rtype:   boolean
        """
        for path, kid in self._kids.items():
            if kid.pid == pid:
                self.debug('Removing kid with name %s and pid %d' % (
                    path, pid))
                del self._kids[path]
                if not self._kids and self._onShutdown:
                    self.debug("Last child exited")
                    # drop our reference before firing: a deferred can only
                    # be fired once, so it must not be reachable from here
                    # the next time the kindergarten empties
                    d, self._onShutdown = self._onShutdown, None
                    d.callback(None)

                return True

        self.warning('Asked to remove kid with pid %d but not found' % pid)
        return False
499
def _getSocketPath():
    """
    Reserve a unique file name for the worker's Unix domain socket.

    The name is obtained by creating a temporary file, which is left on
    disk: the caller needs to delete it before using the name.

    @returns: path of the temporary file
    @rtype:   str
    """
    # FIXME: there is no mkstemp for sockets, so we have a small window
    # here in which the socket could be created by something else
    # I didn't succeed in preparing a socket file with that name either
    import tempfile

    suffix = '.%d' % os.getpid()
    handle, path = tempfile.mkstemp(suffix, 'flumotion.worker.')
    # only the name is needed, not the open file
    os.close(handle)

    return path
511 512 # Similar to Vishnu, but for worker related classes
class WorkerBrain(log.Loggable):
    """
    I am the main object in the worker process, managing jobs and everything
    related.
    I live in the main worker process.

    @ivar authenticator:       authenticator worker used to log in to manager
    @type authenticator:       L{flumotion.twisted.pb.Authenticator}
    @ivar kindergarten:
    @type kindergarten:        L{Kindergarten}
    @ivar medium:
    @type medium:              L{WorkerMedium}
    @ivar jobHeaven:
    @type jobHeaven:           L{JobHeaven}
    @ivar workerClientFactory:
    @type workerClientFactory: L{WorkerClientFactory}
    @ivar feedServerPort:      TCP port the Feed Server is listening on
    @type feedServerPort:      int
    """

    compat.implements(interfaces.IFeedServerParent)

    logCategory = 'workerbrain'

    def __init__(self, options):
        """
        @param options: the optparsed dictionary of command-line options
        @type  options: an object with attributes
        """
        self.options = options
        self.workerName = options.name

        self.managerHost = options.host
        self.managerPort = options.port
        self.managerTransport = options.transport

        self.authenticator = None
        # the last one is reserved for our FeedServer
        ports = []
        if not self.options.randomFeederports:
            ports = self.options.feederports[:-1]
        self.medium = WorkerMedium(self, ports)
        self._socketPath = _getSocketPath()
        self.kindergarten = Kindergarten(options, self._socketPath, self)
        self.jobHeaven = JobHeaven(self)
        self.workerClientFactory = WorkerClientFactory(self)

        self._port = None # port for unix domain socket, set from _setup

        self._jobServerFactory = None
        self._jobServerPort = None
        self._feedServerFactory = feed.feedServerFactory(self)

        self._feedServerPort = None # twisted port
        self.feedServerPort = None # port number

        self._createDeferreds = {} # avatarId => deferred that will fire
                                   # when the job attaches
        self._shutdownDeferreds = {} # avatarId => deferred for shutting
                                     # down jobs; fires when job is reaped

    def listen(self):
        """
        Start listening on FeedServer (incoming eater requests) and
        JobServer (through which we communicate with our children) ports

        @returns: True if we successfully listened on both ports
        """
        # sys.exc_info() is used to get at the exception because it reads
        # the same under old and new "except" syntax

        # set up feed server if we have the feederports for it
        try:
            self._setupFeedServer()
        except error.CannotListenError:
            e = sys.exc_info()[1]
            self.warning("Failed to listen on feed server port: %r", e)
            return False

        try:
            self._jobServerFactory, self._jobServerPort = \
                self._setupJobServer()
        except error.CannotListenError:
            e = sys.exc_info()[1]
            self.warning("Failed to listen on job server port: %r", e)
            return False

        return True

    def login(self, authenticator):
        """
        Remember the authenticator and start logging in to the manager
        with it.
        """
        self.authenticator = authenticator
        self.workerClientFactory.startLogin(authenticator)

    def _setupJobServer(self):
        """
        Start listening for job processes on our Unix domain socket.

        @returns: (factory, port)
        """
        dispatcher = JobDispatcher(self.jobHeaven)
        # FIXME: we should hand a username and password to log in with to
        # the job process instead of allowing anonymous
        checker = checkers.FlexibleCredentialsChecker()
        checker.allowPasswordless(True)
        p = portal.Portal(dispatcher, [checker])
        f = pb.PBServerFactory(p)
        # the socket path must be free before we can listen on it; removing
        # it is best-effort, so a failure to unlink is ignored
        try:
            os.unlink(self._socketPath)
        except OSError:
            pass

        # Rather than a listenUNIX(), we use listenWith so that we can specify
        # our particular Port, which creates Transports that we know how to
        # pass FDs over.
        port = reactor.listenWith(fdserver.FDPort, self._socketPath, f)

        return f, port

    def _setupFeedServer(self):
        """
        Start listening for feed requests, if a port is available for it.

        The port is random when options.randomFeederports is set; otherwise
        it is the last of options.feederports.  On success the twisted port
        is stored in self._feedServerPort and its number in
        self.feedServerPort.  When no port is configured, nothing is
        started.

        @returns: None
        """
        port = None
        if self.options.randomFeederports:
            port = 0
        else:
            try:
                port = self.options.feederports[-1]
            except IndexError:
                self.info(
                    'Not starting feed server because no port is configured')
                return

        self._feedServerPort = reactor.listenWith(
            fdserver.PassableServerPort, port,
            self._feedServerFactory)

        # jumping through hoops is fun
        self.feedServerPort = self._feedServerPort.getHost().port
        self.debug('Listening for feed requests on TCP port %s' %
                   self.feedServerPort)

    # FIXME: this is only called from the tests
    def teardown(self):
        """
        Clean up after listen(): stop listening on the job server and feed
        server ports, for those that were set up.

        @returns: a L{twisted.internet.defer.Deferred} that fires when
                  the teardown is completed
        """
        self.debug("cleaning up port %r" % self._port)
        dl = []
        if self._jobServerPort:
            dl.append(self._jobServerPort.stopListening())
        if self._feedServerPort:
            dl.append(self._feedServerPort.stopListening())

        return defer.DeferredList(dl)

    def callRemote(self, methodName, *args, **kwargs):
        """
        Call the given remote method through my medium.
        """
        return self.medium.callRemote(methodName, *args, **kwargs)

    def shutdownHandler(self):
        """
        Shut down the job heaven and wait for all job processes to exit.
        Jobs still around after JOB_SHUTDOWN_TIMEOUT seconds are killed.

        @returns: the value of fdefer.defer_call_later for the combined
                  shutdown deferreds
        """
        self.info("Reactor shutting down, stopping jobHeaven")

        # If they fail to shut down nicely within the time, shut them down
        # less nicely
        reactor.callLater(JOB_SHUTDOWN_TIMEOUT, self.kindergarten.terminateAll)

        d = self.jobHeaven.shutdown()
        d2 = defer.Deferred()
        self.kindergarten.setOnShutdown(d2)
        dl = defer.DeferredList([d, d2])
        # Don't fire this other than from a callLater
        return fdefer.defer_call_later(dl)

    def deferredCreate(self, avatarId):
        """
        Create and register a deferred for creating the given component.
        This deferred will be fired when the JobAvatar has instructed the
        job to create the component.

        @raises errors.ComponentAlreadyStartingError: if a create is
                already pending for this avatarId
        @raises errors.ComponentAlreadyRunningError:  if a job with this
                avatarId is running and not shutting down

        @rtype: L{twisted.internet.defer.Deferred}
        """
        self.debug('making create deferred for %s' % avatarId)

        d = defer.Deferred()

        # the question of "what jobs do we know about" is answered in
        # three places: the create deferreds hash, the avatar list in
        # the jobheaven, and the shutdown deferreds hash. there are four
        # possible answers:
        if avatarId in self._createDeferreds:
            # (1) a job is already starting: it is in the
            # createdeferreds hash
            self.info('already have a create deferred for %s', avatarId)
            raise errors.ComponentAlreadyStartingError(avatarId)
        elif avatarId in self._shutdownDeferreds:
            # (2) a job is shutting down; note it is also in
            # heaven.avatars
            self.debug('waiting for previous %s to shut down like it '
                       'said it would', avatarId)

            # hold back the create result until the shutdown deferred has
            # fired, then pass the create result on unchanged
            def ensureShutdown(res,
                               shutdown=self._shutdownDeferreds[avatarId]):
                shutdown.addCallback(lambda _: res)
                return shutdown
            d.addCallback(ensureShutdown)
        elif avatarId in self.jobHeaven.avatars:
            # (3) a job is running fine
            self.info('avatar named %s already running', avatarId)
            raise errors.ComponentAlreadyRunningError(avatarId)
        else:
            # (4) it's new; we know of nothing with this avatarId
            pass

        self.debug('registering deferredCreate for %s', avatarId)
        self._createDeferreds[avatarId] = d
        return d

    def deferredCreateTrigger(self, avatarId):
        """
        Trigger a previously registered deferred for creating the given
        component, and unregister it.
        """
        self.debug('triggering create deferred for %s' % avatarId)
        if not avatarId in self._createDeferreds:
            self.warning('No create deferred registered for %s' % avatarId)
            return

        d = self._createDeferreds[avatarId]
        del self._createDeferreds[avatarId]
        # return the avatarId the component will use to the original caller
        d.callback(avatarId)

    def deferredCreateFailed(self, avatarId, exception):
        """
        Notify the caller that a create has failed, and remove the create
        from the list of pending creates.
        """
        self.debug('create deferred failed for %s' % avatarId)
        if not avatarId in self._createDeferreds:
            self.warning('No create deferred registered for %s' % avatarId)
            return

        d = self._createDeferreds[avatarId]
        del self._createDeferreds[avatarId]
        d.errback(exception)

    def deferredCreateRegistered(self, avatarId):
        """
        Check if a deferred create has been registered for the given avatarId.
        """
        return avatarId in self._createDeferreds

    def deferredShutdown(self, avatarId):
        """
        Create and register a deferred for notifying the worker of a
        clean job shutdown. This deferred will be fired when the job is
        reaped.  If one is already registered for this avatarId, that one
        is returned instead.

        @rtype: L{twisted.internet.defer.Deferred}
        """
        self.debug('making shutdown deferred for %s' % avatarId)

        if avatarId in self._shutdownDeferreds:
            self.warning('already have a shutdown deferred for %s',
                         avatarId)
            return self._shutdownDeferreds[avatarId]
        else:
            self.debug('registering deferredShutdown for %s', avatarId)
            d = defer.Deferred()
            self._shutdownDeferreds[avatarId] = d
            return d

    def deferredShutdownTrigger(self, avatarId):
        """
        Trigger a previously registered deferred for shutting down the
        given component, and unregister it.
        """
        self.debug('triggering shutdown deferred for %s', avatarId)
        if not avatarId in self._shutdownDeferreds:
            self.warning('No shutdown deferred registered for %s', avatarId)
            return

        d = self._shutdownDeferreds.pop(avatarId)
        d.callback(avatarId)

    def deferredShutdownRegistered(self, avatarId):
        """
        Check if a deferred shutdown has been registered for the given avatarId.
        """
        return avatarId in self._shutdownDeferreds

    ### IFeedServerParent methods

    def feedToFD(self, componentId, feedName, fd, eaterId):
        """
        Called from the FeedAvatar to pass a file descriptor on to
        the job running the component for this feeder.

        @returns: whether the fd was successfully handed off to the component.
        """
        if componentId not in self.jobHeaven.avatars:
            self.warning("No such component %s running", componentId)
            return False

        avatar = self.jobHeaven.avatars[componentId]
        return avatar.sendFeed(feedName, fd, eaterId)

    def eatFromFD(self, componentId, feedId, fd):
        """
        Called from the FeedAvatar to pass a file descriptor on to
        the job running the given component.

        @returns: whether the fd was successfully handed off to the component.
        """
        if componentId not in self.jobHeaven.avatars:
            self.warning("No such component %s running", componentId)
            return False

        avatar = self.jobHeaven.avatars[componentId]
        return avatar.receiveFeed(feedId, fd)
826
class JobDispatcher:
    """
    I am a Realm inside the worker for forked jobs to log in to.
    """
    implements(portal.IRealm)

    def __init__(self, root):
        """
        @param root: the heaven that creates the avatars I hand out
        @type  root: L{flumotion.worker.worker.JobHeaven}
        """
        self._root = root

    ### portal.IRealm methods

    # flumotion-worker job processes log in to us.
    # The mind is a RemoteReference which allows the brain to call back into
    # the job.
    # the avatar id is of the form /(parent)/(name)
    def requestAvatar(self, avatarId, mind, *interfaces):
        """
        Create an avatar for a job logging in, and schedule the avatar's
        attached() call with the job's mind.

        @raises NotImplementedError: if pb.IPerspective was not requested

        @returns: (pb.IPerspective, avatar, logout callable of the avatar)
        """
        if pb.IPerspective not in interfaces:
            raise NotImplementedError("no interface")

        avatar = self._root.createAvatar(avatarId)
        # attach from the reactor, not from within this call
        reactor.callLater(0, avatar.attached, mind)
        return pb.IPerspective, avatar, avatar.logout
851
852 -class JobAvatar(pb.Avatar, log.Loggable):
853 """ 854 I am an avatar for the job living in the worker. 855 """ 856 logCategory = 'job-avatar' 857
858 - def __init__(self, heaven, avatarId):
859 """ 860 @type heaven: L{flumotion.worker.worker.JobHeaven} 861 @type avatarId: str 862 """ 863 self.avatarId = avatarId 864 self.logName = avatarId 865 self._heaven = heaven 866 self._mind = None 867 self.debug("created new JobAvatar")
868
869 - def hasRemoteReference(self):
870 """ 871 Check if the avatar has a remote reference to the peer. 872 873 @rtype: boolean 874 """ 875 return self._mind != None
876
877 - def attached(self, mind):
878 """ 879 @param mind: reference to the job's JobMedium on which we can call 880 @type mind: L{twisted.spread.pb.RemoteReference} 881 882 I am scheduled from the dispatcher's requestAvatar method. 883 """ 884 self._mind = mind 885 self.log('Client attached mind %s' % mind) 886 host = self._heaven.brain.managerHost 887 port = self._heaven.brain.managerPort 888 transport = self._heaven.brain.managerTransport 889 890 kid = self._heaven.brain.kindergarten.getKid(self.avatarId) 891 892 d = self._mind.callRemote('bootstrap', self._heaven.getWorkerName(), 893 host, port, transport, self._heaven.getAuthenticator(), kid.bundles) 894 895 yield d 896 d.value() # allow exceptions 897 898 self.debug( 899 "asking job to create component with avatarId %s, type %s" % ( 900 kid.avatarId, kid.type)) 901 d = self._mind.callRemote('create', kid.avatarId, kid.type, 902 kid.moduleName, kid.methodName, kid.nice) 903 904 yield d 905 try: 906 d.value() # check for errors 907 self.debug('job started component with avatarId %s' % kid.avatarId) 908 self._heaven.brain.deferredCreateTrigger(kid.avatarId) 909 except errors.ComponentCreateError, e: 910 self.warning('could not create component %s of type %s: %r' 911 % (kid.avatarId, kid.type, e)) 912 self._heaven.brain.deferredCreateFailed(kid.avatarId, e) 913 except Exception, e: 914 self.warning('unhandled remote error: type %s, message %s' 915 % (e.__class__.__name__, e)) 916 self._heaven.brain.deferredCreateFailed(kid.avatarId, e)
917 attached = defer_generator_method(attached) 918
919 - def logout(self):
920 self.log('logout called, %s disconnected' % self.avatarId) 921 self._mind = None
922
923 - def stop(self):
924 """ 925 returns: a deferred marking completed stop. 926 """ 927 self.debug('stopping %s' % self.avatarId) 928 if not self._mind: 929 return defer.succeed(None) 930 931 return self._mind.callRemote('stop')
932
933 - def remote_ready(self):
934 pass
935
936 - def logTo(self, stdout, stderr):
937 """ 938 Tell the feeder to log to the given file descriptors. 939 """ 940 self.debug('Giving job new stdout and stderr') 941 if self._mind: 942 try: 943 self._mind.broker.transport.sendFileDescriptor( 944 stdout, "redirectStdout") 945 self._mind.broker.transport.sendFileDescriptor( 946 stderr, "redirectStderr") 947 except exceptions.RuntimeError, e: 948 # RuntimeError is what is thrown by the C code doing this 949 # when there are issues 950 self.debug("We got a Runtime Error %s sending file descriptors.", 951 log.getExceptionMessage(e)) 952 return False
953
954 - def sendFeed(self, feedName, fd, eaterId):
955 """ 956 Tell the feeder to send the given feed to the given fd. 957 958 @returns: whether the fd was successfully handed off to the component. 959 """ 960 self.debug('Sending FD %d to component job to feed %s to fd' % ( 961 fd, feedName)) 962 963 # it is possible that the component has logged out, in which case 964 # we don't have a _mind. Trying to check for this earlier only 965 # introduces a race, so we handle it here by triggering a disconnect 966 # on the fd. 967 if self._mind: 968 try: 969 self._mind.broker.transport.sendFileDescriptor( 970 fd, "sendFeed %s %s" % (feedName, eaterId)) 971 return True 972 except exceptions.RuntimeError, e: 973 # RuntimeError is what is thrown by the C code doing this 974 # when there are issues 975 self.debug("We got a Runtime Error %s sending file descriptors.", 976 log.getExceptionMessage(e)) 977 return False 978 self.debug('my mind is gone, trigger disconnect') 979 return False
980 981 # FIXME: why do we ignore return value of sendFileDescriptor???
def receiveFeed(self, feedId, fd):
    """
    Tell the feeder to receive the given feed from the given fd.

    @param feedId: id of the feed the component should eat
    @param fd:     file descriptor to hand to the component

    @returns: whether the fd was successfully handed off to the component.
    """
    self.debug('Sending FD %d to component job to eat %s from fd' % (
        fd, feedId))

    # The component may already have logged out, in which case _mind is
    # None. Report the failure through the return value, as sendFeed
    # does, instead of raising AttributeError on the dereference below.
    if not self._mind:
        self.debug('my mind is gone, trigger disconnect')
        return False

    try:
        self._mind.broker.transport.sendFileDescriptor(
            fd, "receiveFeed %s" % feedId)
    except exceptions.RuntimeError:
        # RuntimeError is what is thrown by the C code doing this
        # when there are issues.
        # sys.exc_info() yields the active exception without needing
        # version-specific 'except' binding syntax.
        e = sys.exc_info()[1]
        self.debug("We got a Runtime Error %s sending file descriptors.",
                   log.getExceptionMessage(e))
        return False
    return True
1000
def perspective_cleanShutdown(self):
    """
    Notification fired by the job process when it is shutting down.

    Although the process might still be around at this point, we then
    know it's OK to accept new start requests for this avatar ID, so
    the shutdown is passed on to the brain.
    """
    componentId = self.avatarId
    self.info("component %s shutting down cleanly", componentId)
    brain = self._heaven.brain
    brain.deferredShutdown(componentId)
1010 1011 ### this is a different kind of heaven, not IHeaven, for now...
class JobHeaven(pb.Root, log.Loggable):
    """
    I resemble a manager-side Heaven without being quite the same thing.
    I keep track of the avatars, inside the worker, of the job processes
    that the worker spawned.

    @ivar avatars: dict of avatarId -> avatar
    @type avatars: dict of str -> L{JobAvatar}
    @ivar brain:   the worker brain
    @type brain:   L{WorkerBrain}
    """
    logCategory = "job-heaven"

    def __init__(self, brain):
        """
        @type brain: L{WorkerBrain}
        """
        self.avatars = {} # componentId -> avatar
        self.brain = brain

        # Take over SIGHUP, remembering a previously installed handler
        # (if it was a real one) so it can still be chained to.
        previous = signal.signal(signal.SIGHUP, self._HUPHandler)
        if previous not in (signal.SIG_DFL, signal.SIG_IGN):
            self._oldHUPHandler = previous
        else:
            self._oldHUPHandler = None

    def _HUPHandler(self, signum, frame):
        """
        SIGHUP handler: call the handler we replaced, if there was one,
        then give every job the worker's current stdout and stderr.
        """
        chained = self._oldHUPHandler
        if chained:
            self.log('got SIGHUP, calling previous handler %r', chained)
            chained(signum, frame)

        self.debug('telling kids about new log file descriptors')
        for jobAvatar in self.avatars.values():
            jobAvatar.logTo(sys.stdout.fileno(), sys.stderr.fileno())

    def createAvatar(self, avatarId):
        """
        Create a L{JobAvatar} for the given id, register it and return it.
        """
        created = JobAvatar(self, avatarId)
        self.avatars[avatarId] = created
        return created

    def lostAvatar(self, avatarId):
        """
        Forget the avatar with the given id.

        @returns: the removed avatar, or None (after a warning) if the id
                  is not known.
        """
        if avatarId in self.avatars:
            return self.avatars.pop(avatarId)

        self.warning("some programmer is telling me about an avatar "
                     "I have no idea about: %s", avatarId)

    def shutdown(self):
        """
        Ask every job to stop.

        @returns: a L{defer.DeferredList} over the stop() results of all
                  avatars.
        """
        self.debug('Shutting down JobHeaven')
        self.debug('Stopping all jobs')

        def allStopped(result):
            return self.debug('Stopped all jobs')

        stopping = [jobAvatar.stop() for jobAvatar in self.avatars.values()]
        d = defer.DeferredList(stopping)
        d.addCallback(allStopped)
        return d

    def getAuthenticator(self):
        """
        Gets the authenticator that the worker used to log in to the manager.

        @rtype: L{flumotion.twisted.pb.Authenticator}
        """
        brain = self.brain
        return brain.authenticator

    def getWorkerName(self):
        """
        Gets the name of the worker that spawns the process.

        @rtype: str
        """
        brain = self.brain
        return brain.workerName
1079