Package flumotion :: Package manager :: Module component
[hide private]

Source Code for Module flumotion.manager.component

   1  # -*- Mode: Python; test-case-name: flumotion.test.test_manager_manager -*- 
   2  # vi:si:et:sw=4:sts=4:ts=4 
   3  # 
   4  # Flumotion - a streaming media server 
   5  # Copyright (C) 2004,2005,2006,2007 Fluendo, S.L. (www.fluendo.com). 
   6  # All rights reserved. 
   7   
   8  # This file may be distributed and/or modified under the terms of 
   9  # the GNU General Public License version 2 as published by 
  10  # the Free Software Foundation. 
  11  # This file is distributed without any warranty; without even the implied 
  12  # warranty of merchantability or fitness for a particular purpose. 
  13  # See "LICENSE.GPL" in the source distribution for more information. 
  14   
  15  # Licensees having purchased or holding a valid Flumotion Advanced 
  16  # Streaming Server license may use this file in accordance with the 
  17  # Flumotion Advanced Streaming Server Commercial License Agreement. 
  18  # See "LICENSE.Flumotion" in the source distribution for more information. 
  19   
  20  # Headers in this file shall remain intact. 
  21   
  22  """ 
  23  manager-side objects for components 
  24   
  25  API Stability: semi-stable 
  26  """ 
  27   
  28  import time 
  29   
  30  from twisted.spread import pb 
  31  from twisted.internet import reactor, defer, error 
  32  from twisted.python.failure import Failure 
  33   
  34  from flumotion.configure import configure 
  35  # rename to base 
  36  from flumotion.manager import base 
  37  from flumotion.common import errors, interfaces, keycards, log, config, planet 
  38  from flumotion.common import messages, common 
  39  from flumotion.twisted import flavors 
  40  from flumotion.twisted.defer import defer_generator_method 
  41  from flumotion.twisted.compat import implements 
  42  from flumotion.common.planet import moods 
  43   
  44  from flumotion.common.messages import N_ 
  45  T_ = messages.gettexter('flumotion') 
  46   
47 -class ComponentAvatar(base.ManagerAvatar):
48 """ 49 I am a Manager-side avatar for a component. 50 I live in the L{ComponentHeaven}. 51 52 Each component that logs in to the manager gets an avatar created for it 53 in the manager. 54 55 @cvar avatarId: the L{componentId<common.componentId>} 56 @type avatarId: str 57 @cvar jobState: job state of this avatar's component 58 @type jobState: L{flumotion.common.planet.ManagerJobState} 59 @cvar componentState: component state of this avatar's component 60 @type componentState: L{flumotion.common.planet.ManagerComponentState} 61 """ 62 63 logCategory = 'comp-avatar' 64
65 - def __init__(self, *args, **kwargs):
66 # doc in base class 67 base.ManagerAvatar.__init__(self, *args, **kwargs) 68 69 self.componentState = None # set by the vishnu by componentAttached 70 self.jobState = None # set by the vishnu by componentAttached 71 72 # these flags say when this component is in the middle of doing stuff 73 # starting, setup and providing master clock respectively 74 self._starting = False 75 self._beingSetup = False 76 self._providingClock = False 77 78 self._ports = {} 79 80 self._shutdown_requested = False 81 82 self._happydefers = [] # deferreds to call when mood changes to happy 83 self.feeder_names = [] 84 self.eater_names = []
85 86 ### python methods
87 - def __repr__(self):
88 mood = '(unknown)' 89 if self.componentState: 90 moodValue = self.componentState.get('mood') 91 if moodValue is not None: 92 mood = moods.get(moodValue).name 93 return '<%s %s (mood %s)>' % (self.__class__.__name__, 94 self.avatarId, mood)
95 96 ### ComponentAvatar methods
97 - def cleanup(self):
98 """ 99 Clean up when detaching. 100 """ 101 if self._ports: 102 self.vishnu.releasePortsOnWorker(self.getWorkerName(), 103 self._ports.values()) 104 105 self._ports = {} 106 107 self.jobState = None
108
109 - def _setMood(self, mood):
110 if not self.componentState: 111 return 112 113 if not self.componentState.get('mood') == mood.value: 114 self.debug('Setting mood to %r' % mood) 115 self.componentState.setMood(mood.value)
116
117 - def _setMoodValue(self, moodValue):
118 mood = moods.get(moodValue) 119 self._setMood(mood)
120
121 - def _getMoodValue(self):
122 if not self.componentState: 123 return 124 return self.componentState.get('mood')
125
126 - def _addMessage(self, message):
127 if not self.componentState: 128 return 129 130 self.componentState.append('messages', message)
131 132 # general fallback for unhandled errors so we detect them 133 # FIXME: we can't use this since we want a PropertyError to fall through 134 # after going through the PropertyErrback.
135 - def _mindErrback(self, failure, *ignores):
136 if ignores: 137 if failure.check(*ignores): 138 return failure 139 self.warning("Unhandled remote call error: %s" % 140 failure.getErrorMessage()) 141 self.warning("raising '%s'" % str(failure.type)) 142 return failure
143 144 # we create this errback just so we can interject a message inbetween 145 # to make it clear the Traceback line is fine. 146 # When this is fixed in Twisted we can just remove the errback and 147 # the error will still get sent back correctly to admin.
148 - def _mindPropertyErrback(self, failure):
149 failure.trap(errors.PropertyError) 150 print "Ignore the following Traceback line, issue in Twisted" 151 return failure
152
153 - def attached(self, mind):
154 # doc in base class 155 self.info('component "%s" logged in' % self.avatarId) 156 base.ManagerAvatar.attached(self, mind) # sets self.mind 157 158 d = self.vishnu.componentAttached(self) 159 160 def checkInitialMood(_): 161 # If we're already set to happy, ensure that we trigger our 162 # deferreds 163 if self.jobState.get('mood') == moods.happy.value: 164 for d in self._happydefers: 165 d.callback(True) 166 self._happydefers = []
167 168 d.addCallback(checkInitialMood) 169 # listen to the mood so we can tell the depgraph 170 d.addCallback(lambda _: self.jobState.addListener(self, 171 set=self.stateSet)) 172 # make heaven register component 173 d.addCallback(lambda _: self.heaven.registerComponent(self)) 174 d.addCallback(lambda _: self.vishnu.registerComponent(self)) 175 return d
176
177 - def detached(self, mind):
178 # doc in base class 179 self.vishnu.unregisterComponent(self) 180 self.heaven.unregisterComponent(self) 181 182 self.info('component "%s" logged out' % self.avatarId) 183 184 self.componentState.clearJobState() 185 186 # Now, we're detached: set our state to sleeping (or lost). 187 # Do this before vishnu.componentDetached() severs our association 188 # with our componentState. Also, don't ever remove 'sad' here. 189 # If we shut down due to an explicit manager request, go to sleeping. 190 # Otherwise, go to lost, because it got disconnected for an unknown 191 # reason (probably network related) 192 if not self._getMoodValue() == moods.sad.value: 193 if self._shutdown_requested: 194 self.debug("Shutdown was requested, component now sleeping") 195 self._setMood(moods.sleeping) 196 else: 197 self.debug("Shutdown was NOT requested, component now lost") 198 self._setMood(moods.lost) 199 200 self.vishnu.componentDetached(self) 201 base.ManagerAvatar.detached(self, mind) 202 203 self.cleanup() # callback done at end
204 205 # IStateListener methods
206 - def stateSet(self, state, key, value):
207 self.log("state set on %r: %s now %r" % (state, key, value)) 208 if key == 'mood': 209 self.info('Mood changed to %s' % moods.get(value).name) 210 211 if value == moods.happy.value: 212 self.vishnu._depgraph.setComponentStarted(self.componentState) 213 # component not starting anymore 214 self._starting = False 215 # callback any deferreds waiting on this 216 for d in self._happydefers: 217 d.callback(True) 218 self._happydefers = []
219 220 # my methods
221 - def parseEaterConfig(self, eater_config):
222 # the source feeder names come from the config 223 # they are specified under <component> as <source> elements in XML 224 # so if they don't specify a feed name, use "default" as the feed name 225 eater_names = [] 226 for block in eater_config: 227 eater_name = block 228 if block.find(':') == -1: 229 eater_name = block + ':default' 230 eater_names.append(eater_name) 231 self.debug('parsed eater config, eaters %r' % eater_names) 232 self.eater_names = eater_names
233
234 - def parseFeederConfig(self, feeder_config):
235 # for pipeline components, in the case there is only one 236 # feeder, <feed></feed> still needs to be listed explicitly 237 238 # the feed names come from the config 239 # they are specified under <component> as <feed> elements in XML 240 feed_names = feeder_config 241 #self.debug("parseFeederConfig: feed_names: %r" % self.feed_names) 242 name = self.componentState.get('name') 243 # we create feeder names this component contains based on feed names 244 self.feeder_names = map(lambda n: name + ':' + n, feed_names) 245 self.debug('parsed feeder config, feeders %r' % self.feeder_names)
246 247 # FIXME: rename to something like getEaterFeeders()
248 - def getEaters(self):
249 """ 250 Get a list of L{feedId<flumotion.common.common.feedId>}s 251 for feeds this component wants to eat from. 252 253 @return: a list of feedId's, or the empty list 254 @rtype: list of str 255 """ 256 if not self.eater_names: 257 return [] 258 259 # this gets created and added by feedcomponent.py 260 return self.eater_names
261
262 - def getFeeders(self):
263 """ 264 Get a list of L{feedId<flumotion.common.common.feedId>}s that this 265 component has feeders for. 266 267 Obviously, the componentName part will be the same for all of them, 268 since it's the name of this component, but we return the feedId to be 269 similar to getEaters. 270 271 @return: a list of feedId's, or the empty list 272 @rtype: list of str 273 """ 274 # non-feed components don't have these keys 275 # FIXME: feederNames need to be renamed, either feedIds or feedNames 276 if not self.feeder_names: 277 self.warning('no feederNames key, so no feeders') 278 return [] 279 280 return self.feeder_names
281
282 - def getFeedServerPort(self):
283 """ 284 Returns the port on which a feed server for this component is 285 listening on. 286 287 @rtype: int 288 """ 289 return self.vishnu.getWorkerFeedServerPort(self.getWorkerName())
290
291 - def getRemoteManagerIP(self):
292 """ 293 Get the IP address of the manager as seen by the component. 294 295 @rtype: str 296 """ 297 return self.jobState.get('manager-ip')
298
299 - def getWorkerName(self):
300 """ 301 Return the name of the worker. 302 303 @rtype: str 304 """ 305 return self.jobState.get('workerName')
306
307 - def getPid(self):
308 """ 309 Return the PID of the component. 310 311 @rtype: int 312 """ 313 return self.jobState.get('pid')
314
315 - def getName(self):
316 """ 317 Get the name of the component. 318 319 @rtype: str 320 """ 321 return self.componentState.get('name')
322
323 - def getParentName(self):
324 """ 325 Get the name of the component's parent. 326 327 @rtype: str 328 """ 329 return self.componentState.get('parent').get('name')
330
331 - def getType(self):
332 """ 333 Get the component type name of the component. 334 335 @rtype: str 336 """ 337 return self.componentState.get('type')
338
339 - def stop(self):
340 """ 341 Tell the avatar to stop the component. 342 """ 343 d = self.mindCallRemote('stop') 344 # FIXME: real error handling 345 d.addErrback(lambda x: None) 346 return d
347
348 - def setup(self, conf):
349 """ 350 Set up the component with the given config. 351 Proxies to 352 L{flumotion.component.component.BaseComponentMedium.remote_setup} 353 354 @type conf: dict 355 """ 356 def _setupErrback(failure): 357 self._setMood(moods.sad) 358 return failure
359 360 d = self.mindCallRemote('setup', conf) 361 d.addErrback(_setupErrback) 362 return d 363 364 # This function tells the component to start 365 # feedcomponents will: 366 # - get their eaters connected to the feeders 367 # - start up their feeders
368 - def start(self):
369 """ 370 Tell the component to start, possibly linking to other components. 371 """ 372 self.debug('start') 373 conf = self.componentState.get('config') 374 master = conf['clock-master'] # avatarId of the clock master comp 375 clocking = None 376 if master != self.avatarId and master != None: 377 self.debug('Need to synchronize with clock master %r' % master) 378 d = self.heaven.getMasterClockInfo(master, self.avatarId) 379 yield d 380 try: 381 clocking = d.value() 382 self.debug('Got master clock info %r' % (clocking, )) 383 host, port, base_time = clocking 384 # FIXME: the host we get is as seen from the component, so lo 385 # mangle it here 386 # if the clock master is local (which is what we assume for now) 387 # and the slave is not, then we need to tell the slave our 388 # IP 389 if (not self.heaven._componentIsLocal(self) 390 and host == '127.0.0.1'): 391 host = self.getRemoteManagerIP() 392 self.debug('Overriding clock master host to %s' % host) 393 clocking = (host, port, base_time) 394 395 if master == self.avatarId: 396 self.debug('we are the master, so reset to None') 397 # we needed to wait for the set_master to complete, 398 # but we don't slave the clock to itself... 399 clocking = None 400 except Exception, e: 401 self.error("Could not make component start, reason %s" 402 % log.getExceptionMessage(e)) 403 404 self.debug('calling remote_start on component %r' % self) 405 d = self.mindCallRemote('start', clocking) 406 yield d 407 try: 408 d.value() 409 except errors.ComponentStartHandledError, e: 410 self.debug('already handled error while starting: %s' % 411 log.getExceptionMessage(e)) 412 except Exception, e: 413 m = messages.Error(T_(N_("Could not start component.")), 414 debug = log.getExceptionMessage(e), 415 id="component-start") 416 self._addMessage(m) 417 self.warning("Could not make component start, reason %s" 418 % log.getExceptionMessage(e)) 419 self._setMood(moods.sad) 420 raise
421 start = defer_generator_method(start) 422
423 - def eatFrom(self, fullFeedId, host, port):
424 d = self.mindCallRemote('eatFrom', fullFeedId, host, port) 425 return d
426
427 - def feedTo(self, componentId, feedId, host, port):
428 d = self.mindCallRemote('feedTo', componentId, feedId, host, port) 429 return d
430
431 - def setElementProperty(self, element, property, value):
432 """ 433 Set a property on an element. 434 435 @param element: the element to set the property on 436 @type element: str 437 @param property: the property to set 438 @type property: str 439 @param value: the value to set the property to 440 @type value: mixed 441 """ 442 if not element: 443 msg = "%s: no element specified" % self.avatarId 444 self.warning(msg) 445 raise errors.PropertyError(msg) 446 if not element in self.jobState.get('elements'): 447 msg = "%s: element '%s' does not exist" % (self.avatarId, element) 448 self.warning(msg) 449 raise errors.PropertyError(msg) 450 if not property: 451 msg = "%s: no property specified" % self.avatarId 452 self.warning(msg) 453 raise errors.PropertyError(msg) 454 self.debug("setting property '%s' on element '%s'" % (property, element)) 455 456 d = self.mindCallRemote('setElementProperty', element, property, value) 457 d.addErrback(self._mindPropertyErrback) 458 d.addErrback(self._mindErrback, errors.PropertyError) 459 return d
460
461 - def getElementProperty(self, element, property):
462 """ 463 Get a property of an element. 464 465 @param element: the element to get the property of 466 @type element: str 467 @param property: the property to get 468 @type property: str 469 """ 470 if not element: 471 msg = "%s: no element specified" % self.avatarId 472 self.warning(msg) 473 raise errors.PropertyError(msg) 474 # FIXME: this is wrong, since it's not dynamic. Elements can be 475 # renamed 476 # this will work automatically though if the component updates its 477 # state 478 if not element in self.jobState.get('elements'): 479 msg = "%s: element '%s' does not exist" % (self.avatarId, element) 480 self.warning(msg) 481 raise errors.PropertyError(msg) 482 if not property: 483 msg = "%s: no property specified" % self.avatarId 484 self.warning(msg) 485 raise errors.PropertyError(msg) 486 self.debug("getting property %s on element %s" % (element, property)) 487 d = self.mindCallRemote('getElementProperty', element, property) 488 d.addErrback(self._mindPropertyErrback) 489 d.addErrback(self._mindErrback, errors.PropertyError) 490 return d
491
492 - def reloadComponent(self):
493 """ 494 Tell the component to reload itself. 495 496 @rtype: L{twisted.internet.defer.Deferred} 497 """ 498 def _reloadComponentErrback(failure, self): 499 failure.trap(errors.ReloadSyntaxError) 500 self.warning(failure.getErrorMessage()) 501 print "Ignore the following Traceback line, issue in Twisted" 502 return failure
503 504 d = self.mindCallRemote('reloadComponent') 505 d.addErrback(_reloadComponentErrback, self) 506 d.addErrback(self._mindErrback, errors.ReloadSyntaxError) 507 return d 508 509 # FIXME: maybe make a BouncerComponentAvatar subclass ?
510 - def authenticate(self, keycard):
511 """ 512 Authenticate the given keycard. 513 Gets proxied to L{flumotion.component.bouncers.bouncer.""" \ 514 """BouncerMedium.remote_authenticate} 515 The component should be a subclass of 516 L{flumotion.component.bouncers.bouncer.Bouncer} 517 518 @type keycard: L{flumotion.common.keycards.Keycard} 519 """ 520 d = self.mindCallRemote('authenticate', keycard) 521 d.addErrback(self._mindErrback) 522 return d
523
524 - def removeKeycardId(self, keycardId):
525 """ 526 Remove a keycard managed by this bouncer because the requester 527 has gone. 528 529 @type keycardId: str 530 """ 531 self.debug('remotecalling removeKeycardId with id %s' % keycardId) 532 d = self.mindCallRemote('removeKeycardId', keycardId) 533 d.addErrback(self._mindErrback) 534 return d
535
536 - def expireKeycard(self, keycardId):
537 """ 538 Expire a keycard issued to this component because the bouncer decided 539 to. 540 541 @type keycardId: str 542 """ 543 self.debug('remotecalling expireKeycard with id %s' % keycardId) 544 d = self.mindCallRemote('expireKeycard', keycardId) 545 d.addErrback(self._mindErrback) 546 return d
547 548 ### IPerspective methods, called by the worker's component
549 - def perspective_feedReady(self, feedName, isReady):
550 """ 551 Called by the component to tell the manager that a given feed is 552 ready or not. Will notify other components depending on this 553 feeder, starting them if all of their dependencies are ready. 554 555 @param feedName: name of the feeder, e.g. "default". 556 @type feedName: str 557 @param isReady: True if the feed is now ready, False otherwise. 558 @type isReady: bool 559 @deprecated Don't call this! 560 """ 561 assert isinstance(feedName, str)
562
563 - def perspective_cleanShutdown(self):
564 """ 565 Called by a component to tell the manager that it's shutting down 566 cleanly (and thus should go to sleeping, rather than lost or sad) 567 """ 568 self.debug("shutdown is clean, shouldn't go to lost") 569 self._shutdown_requested = True
570
571 - def perspective_error(self, element, error):
572 self.error('error element=%s string=%s' % (element, error)) 573 self.heaven.removeComponent(self)
574
575 - def perspective_removeKeycardId(self, bouncerName, keycardId):
576 """ 577 Remove a keycard on the given bouncer on behalf of a component's medium. 578 579 This is requested by a component that created the keycard. 580 581 @type bouncerName: str 582 @param keycardId: id of keycard to remove 583 @type keycardId: str 584 """ 585 self.debug('asked to remove keycard %s on bouncer %s' % ( 586 keycardId, bouncerName)) 587 avatarId = '/atmosphere/%s' % bouncerName 588 if not self.heaven.hasAvatar(avatarId): 589 self.warning('No bouncer with id %s registered' % avatarId) 590 raise errors.UnknownComponentError(avatarId) 591 592 bouncerAvatar = self.heaven.getAvatar(avatarId) 593 return bouncerAvatar.removeKeycardId(keycardId)
594
595 - def perspective_expireKeycard(self, requesterId, keycardId):
596 """ 597 Expire a keycard (and thus the requester's connection) 598 issued to the given requester. 599 600 This is called by the bouncer component that authenticated the keycard. 601 602 603 @param requesterId: name (avatarId) of the component that originally 604 requested authentication for the given keycardId 605 @type requesterId: str 606 @param keycardId: id of keycard to expire 607 @type keycardId: str 608 """ 609 # FIXME: we should also be able to expire manager bouncer keycards 610 if not self.heaven.hasAvatar(requesterId): 611 self.warning('asked to expire keycard %s for requester %s, ' % ( 612 keycardId, requesterId) + 613 'but no such component registered') 614 raise errors.UnknownComponentError(requesterId) 615 616 componentAvatar = self.heaven.getAvatar(requesterId) 617 return componentAvatar.expireKeycard(keycardId)
618
619 - def perspective_reservePortsOnWorker(self, workerName, numberOfPorts):
620 """ 621 Request reservation a number of ports on a particular worker. 622 This can be called from a job if it needs some ports itself. 623 624 @param workerName: name of the worker to reserve ports on 625 @type workerName: str 626 @param numberOfPorts: the number of ports to reserve 627 @type numberOfPorts: int 628 """ 629 ports = self.heaven.vishnu.reservePortsOnWorker(workerName, 630 numberOfPorts) 631 return ports
632
class ComponentHeaven(base.ManagerHeaven):
    """
    I am the heaven for components: I keep track of every registered
    component and hand out a L{ComponentAvatar} for each of them.
    """

    implements(interfaces.IHeaven)
    avatarClass = ComponentAvatar

    logCategory = 'comp-heaven'

    def __init__(self, vishnu):
        # doc in base class
        base.ManagerHeaven.__init__(self, vishnu)

        # dict of clock master avatarId ->
        # list of (deferreds, avatarId) created by getMasterClockInfo
        self._clockMasterWaiters = {}
        # NOTE(review): presumably keyed by clock master avatarId as well;
        # confirm against the methods that fill it
        self._masterClockInfo = {}
652 653 ### our methods
654 - def _componentIsLocal(self, componentAvatar):
655 # gets what we think is the other side's address 656 host = componentAvatar.getClientAddress() 657 658 if host == '127.0.0.1': 659 return True 660 else: 661 return False
662
663 - def removeComponent(self, componentAvatar):
664 """ 665 Remove a component avatar from the heaven. 666 667 @param componentAvatar: the component to remove 668 @type componentAvatar: L{flumotion.manager.component.ComponentAvatar} 669 """ 670 self.removeAvatar(componentAvatar.avatarId)
671
672 - def _getComponentEatersData(self, componentAvatar):
673 """ 674 Retrieve the information about the feeders this component's eaters 675 are eating from. 676 677 @param componentAvatar: the component 678 @type componentAvatar: L{flumotion.manager.component.ComponentAvatar} 679 680 @returns: list of fullFeedIds 681 """ 682 componentId = componentAvatar.avatarId 683 eaterFeedIds = componentAvatar.getEaters() 684 self.debug('feeds we eat: %r' % eaterFeedIds) 685 686 retval = [] 687 for feedId in eaterFeedIds: 688 (componentName, feedName) = common.parseFeedId(feedId) 689 flowName = common.parseComponentId(componentId)[0] 690 fullFeedId = common.fullFeedId(flowName, componentName, feedName) 691 692 retval.append(fullFeedId) 693 694 return retval
695
696 - def _getComponentFeedersData(self, component):
697 """ 698 Retrieves the data of feeders (feed producer elements) for a component. 699 700 @param component: the component 701 @type component: L{flumotion.manager.component.ComponentAvatar} 702 703 @returns: tuple of (feedId, host, port) for each feeder 704 @rtype: tuple of (str, str, int) tuple 705 """ 706 # FIXME: host and port are constant for all the feedIds, so 707 # maybe we should return host, port, list-of-feeders 708 709 # get what we think is the IP address where the component is running 710 host = component.getClientAddress() 711 port = component.getFeedServerPort() 712 feedIds = component.getFeeders() 713 self.debug('returning data for feeders: %r', feedIds) 714 return map(lambda f: (f, host, port), feedIds)
715
716 - def _startComponent(self, componentAvatar):
717 state = componentAvatar.componentState 718 conf = state.get('config') 719 720 # connect the component's eaters 721 eatersData = self._getComponentEatersData(componentAvatar) 722 for fullFeedId in eatersData: 723 self.debug('connecting eater of feed %s' % fullFeedId) 724 # FIXME: ideally we would get this from the config 725 # downstream makes more sense since it's more likely 726 # for a producer to be behind NAT 727 connection = "upstream" 728 729 if connection == "upstream": 730 self.debug('connecting from eater to feeder') 731 # find avatar that feeds this feed 732 (flowName, componentName, feedName) = common.parseFullFeedId( 733 fullFeedId) 734 avatarId = common.componentId(flowName, componentName) 735 feederAvatar = self.getAvatar(avatarId) 736 if not feederAvatar: 737 m = messages.Error(T_( 738 N_("Configuration problem.")), 739 debug="No component '%s'." % avatarId, 740 id="component-start-%s" % fullFeedId) 741 # FIXME: make addMessage and setMood public 742 componentAvatar._addMessage(m) 743 componentAvatar._setMood(moods.sad) 744 745 # FIXME: get from network map instead 746 host = feederAvatar.getClientAddress() 747 port = feederAvatar.getFeedServerPort() 748 749 # FIXME: until network map is implemented, hack to 750 # assume that connections from what appears to us to be 751 # the same IP go through localhost instead. 
Allows 752 # connections between components on a worker behind a 753 # firewall, but not between components running on 754 # different workers, both behind a firewall 755 eaterHost = componentAvatar.mind.broker.transport.getPeer().host 756 if eaterHost == host: 757 host = '127.0.0.1' 758 759 d = componentAvatar.eatFrom(fullFeedId, host, port) 760 yield d 761 try: 762 d.value() 763 except (error.ConnectError, error.ConnectionRefusedError), e: 764 m = messages.Error(T_( 765 N_("Could not connect component to %s:%d for feed %s."), 766 host, port, fullFeedId), 767 debug=log.getExceptionMessage(e, filename='component'), 768 id="component-start-%s" % fullFeedId) 769 # FIXME: make addMessage and setMood public 770 componentAvatar._addMessage(m) 771 componentAvatar._setMood(moods.sad) 772 raise errors.ComponentStartHandledError(e) 773 elif connection == "downstream": 774 self.debug('connecting from feeder to eater') 775 # find avatar that feeds this feed 776 (flowName, componentName, feedName) = common.parseFullFeedId( 777 fullFeedId) 778 feederAvatarId = common.componentId(flowName, componentName) 779 feederAvatar = self.getAvatar(feederAvatarId) 780 # FIXME: get from network map instead 781 host = componentAvatar.getClientAddress() 782 port = componentAvatar.getFeedServerPort() 783 d = feederAvatar.feedTo(componentAvatar.avatarId, 784 common.feedId(componentName, feedName), host, port) 785 yield d 786 try: 787 d.value() 788 except error.ConnectionRefusedError, e: 789 m = messages.Error(T_( 790 N_("Could not connect to %s:%d for feed %s."), 791 host, port, fullFeedId), 792 debug=log.getExceptionMessage(e), 793 id="component-start-%s" % fullFeedId) 794 self._addMessage(m) 795 self._setMood(moods.sad) 796 raise errors.ComponentStartHandledError 797 798 componentAvatar.debug('starting component') 799 try: 800 componentAvatar.start() 801 except errors.ComponentStartHandledError, e: 802 pass
803 _startComponent = defer_generator_method(_startComponent) 804
805 - def _tryWhatCanBeStarted(self, result=True):
806 """ 807 I try to start nodes in the depgraph if they should be started. I am 808 a recursive method, because the depgraph's list of what should be 809 started may change when nodes start/stop. 810 811 @param result: only needed because this method is added as a callback 812 """ 813 814 # Generic failure handler for 815 # synchronous and asynchronous errors 816 def handleFailure(failure, avatar, message, id_template): 817 log.warningFailure(failure, swallow=False) 818 if failure.check(errors.HandledException): 819 self.debug('failure %r already handled' % failure) 820 return 821 self.debug('showing error message for failure %r' % failure) 822 m = messages.Error(message, 823 id=id_template % componentAvatar.avatarId, 824 debug=log.getFailureMessage(failure)) 825 avatar._addMessage(m) 826 avatar._setMood(moods.sad)
827 828 self.debug("tryWhatCanBeStarted") 829 deplist = self.vishnu._depgraph.whatShouldBeStarted() 830 self.debug("Listing deplist") 831 832 if not deplist: 833 self.debug("Nothing needs to be setup or started!") 834 return 835 for dep in deplist: 836 self.debug("Deplist: %r,%r" % (dep[0], dep[1])) 837 838 # we handle all direct dependencies; 839 # an error for one of them shouldn't stop handling the others 840 for dep, deptype in deplist: 841 if dep: 842 if deptype == "COMPONENTSETUP": 843 self.debug("Component %s to be setup" % dep.get("name")) 844 componentAvatar = self.getComponentAvatarForState(dep) 845 if componentAvatar: 846 if not componentAvatar._beingSetup: 847 componentAvatar._beingSetup = True 848 # specific setup failure handler 849 def componentSetupFailed(failure): 850 componentAvatar._beingSetup = False 851 handleFailure(failure, componentAvatar, 852 T_(N_("Could not setup component.")), 853 "component-setup-%s")
854 try: 855 d = self.setupComponent(componentAvatar) 856 except: 857 # give feedback of synchronous failures 858 # to the componentAvatar, and resume the loop 859 componentSetupFailed(Failure()) 860 continue 861 # add callback because nodes that can be 862 # started as a result of this component being 863 # setup may not be in the current list, and 864 # add errback to be able to give feedback 865 # of asynchronous failures to the componentAvatar. 866 def setupErrback(failure): 867 componentSetupFailed(failure) 868 raise errors.ComponentSetupHandledError(failure) 869 d.addCallbacks(self._tryWhatCanBeStarted, 870 setupErrback) 871 else: 872 self.debug( 873 "Component %s already on way to being setup", 874 dep.get("name")) 875 else: 876 self.debug( 877 "Component %s to be setup but has no avatar yet", 878 dep.get("name")) 879 elif deptype == "COMPONENTSTART": 880 self.debug("Component %s to be started" % dep.get("name")) 881 componentAvatar = self.getComponentAvatarForState(dep) 882 if not componentAvatar._starting: 883 componentAvatar._starting = True 884 happyd = defer.Deferred() 885 # since we've reached happy, we should clear the pending 886 # mood - it is done transitioning 887 happyd.addCallback(lambda r, s: s.set( 888 'moodPending', None), 889 componentAvatar.componentState) 890 # add callback because nodes that can be 891 # started as a result of this component being 892 # happy may not be in the current list. 
893 happyd.addCallback(self._tryWhatCanBeStarted) 894 componentAvatar._happydefers.append(happyd) 895 896 # specific startup failure handler 897 def componentStartupFailed(failure): 898 componentAvatar._starting = False 899 handleFailure(failure, componentAvatar, 900 T_(N_("Could not start component.")), 901 "component-start-%s") 902 try: 903 d = self._startComponent(componentAvatar) 904 except: 905 # give feedback of synchronous failures 906 # to the componentAvatar, and resume the loop 907 componentStartupFailed(Failure()) 908 continue 909 # add errback to be able to give feedback 910 # of asynchronous failures to the componentAvatar. 911 def startErrback(failure): 912 componentStartupFailed(failure) 913 raise errors.ComponentStartHandledError(failure) 914 d.addErrback(startErrback) 915 else: 916 self.log("Component is already starting") 917 elif deptype == "CLOCKMASTER": 918 self.debug("Component %s to be clock master!", 919 dep.get("name")) 920 componentAvatar = self.getComponentAvatarForState(dep) 921 if componentAvatar: 922 if not componentAvatar._providingClock: 923 componentAvatar._providingClock = True 924 # specific master clock failure handler 925 def componentMasterClockFailed(failure): 926 componentAvatar._providingClock = False 927 handleFailure(failure, componentAvatar, 928 T_(N_("Could not setup component's master clock.")), 929 "component-clock-%s") 930 try: 931 d = self.provideMasterClock(componentAvatar) 932 except: 933 # give feedback of synchronous failures 934 # to the componentAvatar and resume the loop 935 componentMasterClockFailed(Failure()) 936 continue 937 # add callback because nodes that can be 938 # started as a result of this component providing 939 # master clock may not be in the current list, and 940 # add errback to be able to give feedback 941 # of asynchronous failures to the componentAvatar. 
942 def clockMasterErrback(failure): 943 componentMasterClockFailed(failure) 944 raise errors.ComponentStartHandledError(failure) 945 d.addCallbacks(self._tryWhatCanBeStarted, 946 clockMasterErrback) 947 else: 948 self.debug( 949 "Component %s already on way to clock master", 950 dep.get("name")) 951 else: 952 self.debug("Unknown dependency type") 953
954 - def _setupComponent(self, componentAvatar):
955 # set up the component 956 state = componentAvatar.componentState 957 conf = state.get('config') 958 eater_config = conf.get('source', []) 959 feeder_config = conf.get('feed', []) 960 componentAvatar.parseEaterConfig(eater_config) 961 componentAvatar.parseFeederConfig(feeder_config) 962 963 self.debug('setting up componentAvatar %r' % componentAvatar) 964 d = componentAvatar.setup(conf) 965 yield d 966 967 try: 968 d.value() 969 self.debug("componentAvatar %r now setup" % componentAvatar) 970 self.vishnu._depgraph.setComponentSetup(state) 971 # now not being setup 972 componentAvatar._beingSetup = False 973 except errors.HandledException, e: 974 self.warning('setup failed, already handled: %s' % 975 log.getExceptionMessage(e)) 976 raise e 977 except Exception, e: 978 self.warning('setup failed: %s' % log.getExceptionMessage(e)) 979 m = messages.Error(T_( 980 N_("Could not setup component.")), 981 debug=log.getExceptionMessage(e), 982 id="component-setup-%s" % componentAvatar.avatarId) 983 componentAvatar._addMessage(m) 984 componentAvatar._setMood(moods.sad) 985 raise errors.FlumotionError('Could not set up component')
986 987 setupComponent = defer_generator_method(_setupComponent) 988
def registerComponent(self, componentAvatar):
    """
    Register a component in the heaven.
    Called when the component's mind gets attached.

    Besides logging the registration there is currently nothing to do.

    @param componentAvatar: the component to register
    @type  componentAvatar: L{flumotion.manager.component.ComponentAvatar}
    """
    message = 'heaven registering component %r' % componentAvatar
    self.debug(message)
def unregisterComponent(self, componentAvatar):
    """
    Unregister a component from the heaven.
    Called when the component's mind gets detached.

    If the component's config names the component itself as clock master,
    its master clock is removed as well.

    @param componentAvatar: the component to unregister
    @type  componentAvatar: L{flumotion.manager.component.ComponentAvatar}
    """
    componentAvatar.debug('unregistering component')

    config = componentAvatar.componentState.get('config')
    isClockMaster = config['clock-master'] == componentAvatar.avatarId
    if isClockMaster:
        # this component was providing the master clock; clean that up
        self.removeMasterClock(componentAvatar)
1014
def provideMasterClock(self, componentAvatar):
    """
    Tell the given component to provide a master clock.
    Trigger all deferreds waiting on this componentAvatar to provide
    a master clock.

    A port is reserved on the component's worker for the clock; it is
    released again if the component fails to provide the clock.

    @type componentAvatar: L{ComponentAvatar}

    @returns: a deferred that fires (with None) once the clock info is
              stored and all waiters are woken, or errbacks with the
              original failure so callers can react to it.
    @rtype: L{twisted.internet.defer.Deferred}
    """
    avatarId = componentAvatar.avatarId
    self.debug('provideMasterClock on component %s' % avatarId)

    def setMasterClockInfo(result):
        # we get host, port, base_time
        # FIXME: host is the default from NetClock, so the local IP,
        # always. A little inconvenient.
        self._masterClockInfo[avatarId] = result
        self.vishnu._depgraph.setClockMasterStarted(
            componentAvatar.componentState)
        # not in process of providing anymore
        componentAvatar._providingClock = False
        return result

    def wakeClockMasterWaiters(result):
        self.info('Received master clock info from clock master %s' %
            avatarId)
        self.debug('got master clock info: %r' % (result, ))

        # wake all components waiting on the clock master info
        if avatarId in self._clockMasterWaiters:
            waiters = self._clockMasterWaiters[avatarId]
            del self._clockMasterWaiters[avatarId]
            for d, waiterId in waiters:
                self.debug(
                    'giving master clock info to waiting component %s' %
                    waiterId)
                d.callback(result)

    workerName = componentAvatar.getWorkerName()
    port = self.vishnu.reservePortsOnWorker(workerName, 1)[0]

    def failedToProvideMasterClock(failure):
        # check if we actually did provide master clock and failed
        # to give to a waiting component or we actually failed to
        # provide a master clock
        if avatarId in self._masterClockInfo:
            self.warning('Failed to provide master clock info to a '
                'component waiting for it')
        else:
            self.warning('Failed to provide master clock itself')
            self.debug('Going to release port')
            self.vishnu.releasePortsOnWorker(workerName, [port])
        self.warning(failure.getErrorMessage())
        # Hand the failure on instead of swallowing it: an errback that
        # returns None flips the deferred back to the callback path, so
        # errbacks added by our callers would never run and they would
        # carry on as if the clock had been provided.
        return failure

    if avatarId in self._masterClockInfo:
        # stale info from an earlier run; drop it before asking again
        self.warning('component %s already has master clock info: %r'
            % (avatarId, self._masterClockInfo[avatarId]))
        del self._masterClockInfo[avatarId]

    d = componentAvatar.mindCallRemote('provideMasterClock', port)
    d.addCallback(setMasterClockInfo)
    d.addCallback(wakeClockMasterWaiters)
    d.addErrback(failedToProvideMasterClock)
    return d
def removeMasterClock(self, componentAvatar):
    """
    Tell the given component to stop providing a master clock.

    Components still waiting for clock info from this clock master get
    their deferreds errbacked, and the port used for the clock is released
    on the component's worker.

    @type componentAvatar: L{ComponentAvatar}
    """
    avatarId = componentAvatar.avatarId
    workerName = componentAvatar.getWorkerName()

    # errback everybody who was still waiting on master clock info from
    # this clock master
    pendingWaiters = self._clockMasterWaiters.pop(avatarId, [])
    for waiterDeferred, waiterId in pendingWaiters:
        self.debug('telling waiting component %s that '
            'the clock master %s is gone' % (waiterId, avatarId))
        waiterDeferred.errback(errors.ComponentStartError(
            'clock master component start cancelled'))

    if avatarId not in self._masterClockInfo:
        self.warning('component %s has no master clock info'
            % (avatarId,))
        return

    # release our clock port; the info is stored as (host, port, base_time)
    clockInfo = self._masterClockInfo[avatarId]
    if clockInfo:
        clockPort = clockInfo[1]
        self.vishnu.releasePortsOnWorker(workerName, [clockPort])
    else:
        self.debug('avatarId %r has None masterClockInfo' % avatarId)
    del self._masterClockInfo[avatarId]
1113
def getMasterClockInfo(self, avatarId, waiterId=None):
    """
    Get the master clock information from the given clock master component.

    If the information is not known yet, the request is queued and the
    returned deferred only fires once the clock master has provided it.

    @param avatarId: the id of the clock master
    @type  avatarId: str
    @param waiterId: the id of the requesting component
    @type  waiterId: str

    @returns: a deferred firing an (ip, port, base_time) triple.
    @rtype: L{twisted.internet.defer.Deferred}
    """
    self.debug('getting master clock info for component %s' % avatarId)

    # already known: answer right away
    if avatarId in self._masterClockInfo:
        return defer.succeed(self._masterClockInfo[avatarId])

    # not known yet: queue a deferred together with the id of the waiter
    pending = defer.Deferred()
    waiters = self._clockMasterWaiters.setdefault(avatarId, [])
    waiters.append((pending, waiterId))
    return pending
1139
def getComponentAvatarForState(self, state):
    """
    Look up the component avatar belonging to the given state.

    @type state: L{flumotion.common.planet.ManagerComponentState}

    @returns: the avatar mapped to the state, or None if the state has no
              component mapper.
    @rtype: L{ComponentAvatar}
    """
    mappers = self.vishnu._componentMappers
    if state not in mappers:
        return None
    return mappers[state].avatar
1152