Package flumotion :: Package manager :: Module manager
[hide private]

Source Code for Module flumotion.manager.manager

   1  # -*- Mode: Python; test-case-name: flumotion.test.test_manager_manager -*- 
   2  # vi:si:et:sw=4:sts=4:ts=4 
   3  # 
   4  # Flumotion - a streaming media server 
   5  # Copyright (C) 2004,2005,2006,2007 Fluendo, S.L. (www.fluendo.com). 
   6  # All rights reserved. 
   7   
   8  # This file may be distributed and/or modified under the terms of 
   9  # the GNU General Public License version 2 as published by 
  10  # the Free Software Foundation. 
  11  # This file is distributed without any warranty; without even the implied 
  12  # warranty of merchantability or fitness for a particular purpose. 
  13  # See "LICENSE.GPL" in the source distribution for more information. 
  14   
  15  # Licensees having purchased or holding a valid Flumotion Advanced 
  16  # Streaming Server license may use this file in accordance with the 
  17  # Flumotion Advanced Streaming Server Commercial License Agreement. 
  18  # See "LICENSE.Flumotion" in the source distribution for more information. 
  19   
  20  # Headers in this file shall remain intact. 
  21   
  22  """ 
  23  manager implementation and related classes 
  24   
  25  API Stability: semi-stable 
  26   
  27  @var  LOCAL_IDENTITY: an identity for the manager itself; can be used 
  28                        to compare against to verify that the manager 
  29                        requested an action 
  30  @type LOCAL_IDENTITY: L{LocalIdentity} 
  31  """ 
  32   
  33  __all__ = ['ManagerServerFactory', 'Vishnu'] 
  34   
  35  import os 
  36   
  37  from twisted.internet import reactor, defer 
  38  from twisted.cred import error 
  39  from twisted.python import components, failure 
  40  from twisted.spread import pb 
  41  from twisted.cred import portal 
  42   
  43  from flumotion.common import bundle, config, errors, interfaces, log, registry 
  44  from flumotion.common import planet, common, dag, messages, reflectcall, server 
  45  from flumotion.common.identity import RemoteIdentity, LocalIdentity 
  46  from flumotion.common.planet import moods 
  47  from flumotion.configure import configure 
  48  from flumotion.manager import admin, component, worker, base, depgraph 
  49  from flumotion.twisted import checkers 
  50  from flumotion.twisted import portal as fportal 
  51  from flumotion.twisted.defer import defer_generator_method 
  52  from flumotion.twisted.compat import implements 
  53  from flumotion.common.messages import N_ 
  54  T_ = messages.gettexter('flumotion') 
  55   
  56  LOCAL_IDENTITY = LocalIdentity('manager') 
  57   
58 -def _find(list, value, proc=lambda x: x):
59 return list[[proc(x) for x in list].index(value)]
60
61 -def _first(list, proc=lambda x: x):
62 for x in list: 63 if proc(x): return x
64
65 -def _any(list, proc=lambda x: x):
66 return filter(proc, list)
67
68 -def _fint(*procs):
69 # intersection of functions 70 def int(*args, **kwargs): 71 for p in procs: 72 if not p(*args, **kwargs): return False 73 return True
74 return int 75 76 77 # an internal class
class Dispatcher(log.Loggable):
    """
    I implement L{twisted.cred.portal.IRealm}.
    I make sure that when a L{pb.Avatar} is requested through me, the
    Avatar being returned knows about the mind (client) requesting
    the Avatar.

    Heavens are registered per interface with L{registerHeaven}; I keep
    track of which heaven created which avatar so that the avatar can be
    removed from the right heaven on logout.
    """

    implements(portal.IRealm)

    logCategory = 'dispatcher'

    def __init__(self, computeIdentity):
        """
        @param computeIdentity: see L{Vishnu.computeIdentity}
        @type  computeIdentity: callable
        """
        # FIXME: Is passing a callable to a constructor offending anyone
        # else's sense of aesthetics ?
        self._interfaceHeavens = {} # interface -> heaven
        self._avatarHeavens = {} # avatarId -> heaven
        self._computeIdentity = computeIdentity

    ### IRealm methods

    # requestAvatar gets called through ClientFactory.login()
    # An optional second argument can be passed to login, which should be
    # a L{twisted.spread.flavours.Referenceable}
    # A L{twisted.spread.pb.RemoteReference} to it is passed to
    # requestAvatar as mind.

    # So in short, the mind is a reference to the client passed in login()
    # on the peer, allowing any object that has the mind to call back
    # to the piece that called login(),
    # which in our case is a component or an admin client.
    def requestAvatar(self, avatarId, keycard, mind, *ifaces):
        """
        Create an avatar for the client behind C{mind}.

        @param avatarId: the name of the avatar to create
        @param keycard:  the credentials being used to log in
        @param mind:     remote reference to the client that logged in
        @param ifaces:   the interfaces the client asks an avatar for

        @returns: a deferred firing a
                  C{(pb.IPerspective, avatar, cleanup)} tuple; on
                  failure the original failure is passed on and the
                  connection is scheduled to be dropped.
        """
        def got_avatar(avatar):
            # OK so this is byzantine, but test_manager_manager actually
            # uses these kwargs to set its own info. so don't change
            # these args or their order or you will break your test
            # suite.
            def cleanup(avatarId=avatarId, avatar=avatar, mind=mind):
                self.removeAvatar(avatarId, avatar, mind)

            # schedule a perspective attached for after this function
            # FIXME: there needs to be a way to not have to do a callLater
            # blindly so cleanup can be guaranteed
            reactor.callLater(0, avatar.attached, mind)
            return (pb.IPerspective, avatar, cleanup)

        def got_error(failure):
            # If we failed for some reason, we want to drop the connection.
            # However, we want the failure to get to the client, so we don't
            # call loseConnection() immediately - we return the failure first.
            # loseConnection() will then not drop the connection until it has
            # finished sending the current data to the client.
            reactor.callLater(0, mind.broker.transport.loseConnection)
            return failure

        host = common.addressGetHost(mind.broker.transport.getPeer())
        d = self.createAvatarFor(avatarId, keycard, host, ifaces)
        d.addCallbacks(got_avatar, got_error)
        return d

    ### our methods

    def removeAvatar(self, avatarId, avatar, mind):
        """
        Remove an avatar because it logged out of the manager.

        This function is registered by requestAvatar.

        @raise KeyError: if no heaven is recorded for C{avatarId}
        """
        heaven = self._avatarHeavens[avatarId]
        del self._avatarHeavens[avatarId]

        avatar.detached(mind)
        heaven.removeAvatar(avatarId)

    def createAvatarFor(self, avatarId, keycard, remoteHost, ifaces):
        """
        Create an avatar from the heaven implementing the given interface.

        @type  avatarId:   str
        @param avatarId:   the name of the new avatar
        @type  keycard:    L{flumotion.common.keycards.Keycard}
        @param keycard:    the credentials being used to log in
        @type  remoteHost: str
        @param remoteHost: the remote host
        @type  ifaces:     tuple of interfaces linked to heaven
        @param ifaces:     a list of heaven interfaces to get avatar from,
                           including pb.IPerspective

        @returns: a deferred that will fire an avatar from
                  the heaven managing the given interface.

        @raise errors.NoPerspectiveError: synchronously when
               pb.IPerspective is not among C{ifaces}; the returned
               deferred fails with the same error type when none of the
               interfaces has a registered heaven.
        """
        def gotIdentity(identity):
            # the first requested interface with a registered heaven wins
            for iface in ifaces:
                heaven = self._interfaceHeavens.get(iface, None)
                if heaven:
                    avatar = heaven.createAvatar(avatarId, identity)
                    self.debug('Created avatar %r for identity %r' % (
                        avatar, identity))
                    self._avatarHeavens[avatarId] = heaven
                    return avatar
            # interpolate the message here; an exception constructor does
            # not apply format arguments the way the log methods do
            raise errors.NoPerspectiveError("%s requesting iface %r" % (
                avatarId, ifaces))

        if pb.IPerspective not in ifaces:
            raise errors.NoPerspectiveError(avatarId)
        d = self._computeIdentity(keycard, remoteHost)
        d.addCallback(gotIdentity)
        return d

    def registerHeaven(self, heaven, interface):
        """
        Register a Heaven as managing components with the given interface.

        @type  heaven:    L{base.ManagerHeaven}
        @param heaven:    the heaven that creates avatars for the interface
        @type  interface: L{twisted.python.components.Interface}
        @param interface: a component interface to register the heaven with.
        """
        assert isinstance(heaven, base.ManagerHeaven)

        self._interfaceHeavens[interface] = heaven
199
class ComponentMapper:
    """
    I am an object that ties together different objects related to a
    component. I am used as values in a lookup hash in the vishnu.

    @ivar state:    ManagerComponentState; created first
    @ivar id:       avatarId of the eventual ComponentAvatar
    @ivar avatar:   ComponentAvatar
    @ivar jobState: ManagerJobState of a running component
    """
    def __init__(self):
        # nothing is known yet; the vishnu fills these in over time
        self.state = self.id = self.avatar = self.jobState = None
210
211 -class Vishnu(log.Loggable):
212 """ 213 I am the toplevel manager object that knows about all heavens and factories. 214 215 @cvar dispatcher: dispatcher to create avatars 216 @type dispatcher: L{Dispatcher} 217 @cvar workerHeaven: the worker heaven 218 @type workerHeaven: L{worker.WorkerHeaven} 219 @cvar componentHeaven: the component heaven 220 @type componentHeaven: L{component.ComponentHeaven} 221 @cvar adminHeaven: the admin heaven 222 @type adminHeaven: L{admin.AdminHeaven} 223 """ 224 225 implements(server.IServable) 226 227 logCategory = "vishnu" 228
def __init__(self, name, unsafeTracebacks=0, configDir=None):
    """
    @param name:             name of this manager; stored as the 'name'
                             of the planet state and used in the default
                             configuration directory
    @param unsafeTracebacks: passed on unchanged to the
                             L{pb.PBServerFactory}
    @param configDir:        configuration directory to use; when None,
                             "managers/<name>" below
                             C{configure.configdir} is used
    """
    # create a Dispatcher which will hand out avatars to clients
    # connecting to me
    self.dispatcher = Dispatcher(self.computeIdentity)

    # each heaven is constructed with a reference to me and registered
    # with the dispatcher for its medium interface (see _createHeaven)
    self.workerHeaven = self._createHeaven(interfaces.IWorkerMedium,
                                           worker.WorkerHeaven)
    self.componentHeaven = self._createHeaven(interfaces.IComponentMedium,
                                              component.ComponentHeaven)
    self.adminHeaven = self._createHeaven(interfaces.IAdminMedium,
                                          admin.AdminHeaven)

    if configDir is not None:
        self.configDir = configDir
    else:
        self.configDir = os.path.join(configure.configdir,
                                      "managers", name)

    self.bouncer = None # used by manager to authenticate worker/component

    self.bundlerBasket = registry.getRegistry().makeBundlerBasket()

    self._componentMappers = {} # any object -> ComponentMapper

    self.state = planet.ManagerPlanetState()
    self.state.set('name', name)

    self.plugs = {} # socket -> list of plugs

    self._depgraph = depgraph.DepGraph()

    # create a portal so that I can be connected to, through our dispatcher
    # implementing the IRealm and a bouncer
    # FIXME: decide if we allow anonymous login in this small (?) window
    self.portal = fportal.BouncerPortal(self.dispatcher, None)
    #unsafeTracebacks = 1 # for debugging tracebacks to clients
    self.factory = pb.PBServerFactory(self.portal,
                                      unsafeTracebacks=unsafeTracebacks)

    # filled with host/port/use_ssl keys, all None until
    # setConnectionInfo is called with real values
    self.connectionInfo = {}
    self.setConnectionInfo(None, None, None)

    # the configuration object set by loadConfigurationXML
    self.configuration = None
272
def setConnectionInfo(self, host, port, use_ssl):
    """
    Record how this manager can be connected to.

    The values are stored under the 'host', 'port' and 'use_ssl' keys
    of C{self.connectionInfo}; other keys are left untouched.
    """
    self.connectionInfo.update(host=host, port=port, use_ssl=use_ssl)
276
def getConfiguration(self):
    """
    Returns the manager's configuration as exported by the loaded
    configuration object, suitable for importing via
    loadConfiguration().

    @returns: the result of the configuration's C{export()}, or None
              when no configuration is set.
    """
    conf = self.configuration
    if not conf:
        return None
    return conf.export()
285
def getBundlerBasket(self):
    """
    Return a bundler basket to unbundle from.
    If the registry reports that a rebuild is needed, the registry is
    verified and the bundlerbasket is rebuilt first.

    @since: 0.2.2
    @rtype: L{flumotion.common.bundle.BundlerBasket}
    """
    if not registry.getRegistry().rebuildNeeded():
        return self.bundlerBasket

    self.info("Registry changed, rebuilding")
    # the registry is looked up again after verify() rather than cached
    registry.getRegistry().verify()
    self.bundlerBasket = registry.getRegistry().makeBundlerBasket()
    return self.bundlerBasket
300
def adminAction(self, identity, message, args, kw):
    """
    Notify every installed AdminAction plug of an action.

    Each plug registered for the
    'flumotion.component.plugs.adminaction.AdminAction' socket has its
    C{action} method called, in order; nothing happens when no such
    plug is installed.

    @param identity: L{flumotion.common.identity.Identity}
    @param message:  name of the action being performed
    @param args:     positional arguments of the action
    @param kw:       keyword arguments of the action
    """
    socket = 'flumotion.component.plugs.adminaction.AdminAction'
    # membership is resolved through get() instead of the deprecated
    # dict.has_key()
    for plug in self.plugs.get(socket, ()):
        plug.action(identity, message, args, kw)
309
def computeIdentity(self, keycard, remoteHost):
    """
    Compute a suitable identity for a remote host. First looks to
    see if there is a
    flumotion.component.plugs.identity.IdentityProvider plug
    installed on the manager, falling back to user@host.

    The identity is only used in the adminaction interface. An
    example of its use is when you have an adminaction plug that
    checks an admin's privileges before actually doing an action;
    the identity object you use here might store the privileges that
    the admin has.

    @param keycard:    the keycard that the remote host used to log in.
    @type  keycard:    L{flumotion.common.keycards.Keycard}
    @param remoteHost: the ip of the remote host
    @type  remoteHost: str

    @rtype: a deferred that will fire a
            L{flumotion.common.identity.RemoteIdentity}
    """
    socket = 'flumotion.component.plugs.identity.IdentityProvider'
    # membership is resolved through get() instead of the deprecated
    # dict.has_key()
    for plug in self.plugs.get(socket, ()):
        # NOTE(review): the first true result is returned as-is, so
        # plugs are presumably expected to return a deferred themselves
        # -- confirm against the IdentityProvider plug contract.
        identity = plug.computeIdentity(keycard, remoteHost)
        if identity:
            return identity

    # no plug produced an identity: fall back to the keycard's username
    # (None when the keycard has none) at the remote host
    username = getattr(keycard, 'username', None)
    return defer.succeed(RemoteIdentity(username, remoteHost))
340
def _makeBouncer(self, conf, identity):
    """
    Create and set up the manager bouncer described in the config, if
    there is one.

    @param conf:     the parsed configuration
    @param identity: the identity requesting this; reported through
                     adminAction unless it is LOCAL_IDENTITY

    @returns: a deferred, always. It fires None immediately when the
              config has no manager bouncer; otherwise it is the
              deferred returned by the bouncer's setup(), with a
              ConfigError turned into a logged warning.
    """
    if not (conf.manager and conf.manager.bouncer):
        self.log('no bouncer in config')
        return defer.succeed(None)

    bouncerConf = conf.manager.bouncer
    self.debug('going to start manager bouncer %s of type %s' % (
        bouncerConf.name, bouncerConf.type))

    if identity != LOCAL_IDENTITY:
        self.adminAction(identity, '_makeBouncer', (conf,), {})

    defs = registry.getRegistry().getComponent(bouncerConf.type)
    entry = defs.getEntryByType('component')
    # FIXME: use entry.getModuleName() (doesn't work atm?)
    bouncer = reflectcall.createComponent(defs.getSource(),
                                          entry.getFunction())

    configDict = bouncerConf.getConfigDict()
    self.debug('setting up manager bouncer')

    def bouncerReady(result):
        # only install the bouncer once its setup has succeeded
        bouncer.debug('started')
        self.setBouncer(bouncer)

    def bouncerMisconfigured(f):
        # anything but a ConfigError is passed further down the chain
        f.trap(errors.ConfigError)
        self.warning('Configuration error in manager bouncer: %s' %
                     f.value.args[0])

    d = bouncer.setup(configDict)
    d.addCallback(bouncerReady)
    d.addErrback(bouncerMisconfigured)
    return d
def _addManagerPlug(self, socket, args, identity):
    """
    Instantiate one manager plug, register it and start it.

    @param socket:   the socket the plug is for; must already be a key
                     of C{self.plugs}
    @param args:     the plug's arguments; C{args['type']} names the
                     plug type to look up in the registry
    @type  args:     dict
    @param identity: the identity requesting this; reported through
                     adminAction unless it is LOCAL_IDENTITY
    """
    self.debug('loading plug type %s for socket %s'
               % (args['type'], socket))

    if identity != LOCAL_IDENTITY:
        self.adminAction(identity, '_addManagerPlug', (socket, args), {})

    entry = registry.getRegistry().getPlug(args['type']).getEntry()
    plug = reflectcall.reflectCallCatching(
        errors.ConfigError,
        entry.getModuleName(), entry.getFunction(), args)

    self.plugs[socket].append(plug)
    plug.start(self)
390
391 - def _addManagerPlugs(self, _, conf, identity):
392 if not conf.manager: 393 return 394 395 for socket, plugs in conf.manager.plugs.items(): 396 if not socket in self.plugs: 397 self.plugs[socket] = [] 398 399 for args in plugs: 400 self._addManagerPlug(socket, args, identity)
401
def _addComponent(self, conf, parent, identity):
    """
    Add a component state for the given component config entry.

    The new state starts out sleeping, is appended to the parent's
    'components' and is registered in the component mappers under both
    the state and its avatarId.

    @param conf:     the component's config entry
    @param parent:   the flow or atmosphere state to add the component to
    @param identity: the identity requesting this; reported through
                     adminAction unless it is LOCAL_IDENTITY

    @rtype: L{flumotion.common.planet.ManagerComponentState}
    """
    self.debug('adding component %s to %s'
               % (conf.name, parent.get('name')))

    if identity != LOCAL_IDENTITY:
        self.adminAction(identity, '_addComponent', (conf, parent), {})

    state = planet.ManagerComponentState()
    state.set('name', conf.name)
    state.set('type', conf.getType())
    state.set('workerRequested', conf.worker)
    state.setMood(moods.sleeping.value)
    state.set('config', conf.getConfigDict())

    # link the state and its parent both ways
    state.set('parent', parent)
    parent.append('components', state)

    avatarId = conf.getConfigDict()['avatarId']

    # warn when the config was written for another Flumotion version
    if conf.getConfigDict()['version'] != configure.versionTuple:
        configuredVersion = common.versionTupleToString(
            conf.getConfigDict()['version'])
        warning = messages.Warning(T_(N_("This component is configured for "
            "Flumotion version %s, but you are running version %s.\n"
            "Please update the configuration of the component.\n"),
            configuredVersion,
            configure.version))
        state.append('messages', warning)

    # add to mapper
    mapper = ComponentMapper()
    mapper.state = state
    mapper.id = avatarId
    self._componentMappers[state] = mapper
    self._componentMappers[avatarId] = mapper

    return state
443
def _updateFlowDependencies(self, state):
    """
    Register a component in the dependency graph, also as clock master
    when its config names the component itself as such.

    @param state: the component to register
    @type  state: L{flumotion.common.planet.ManagerComponentState}
    """
    self.debug('registering dependencies of %r' % state)

    self._depgraph.addComponent(state)

    conf = state.get('config')

    # If this component has the same id as the clock-master, then it is the
    # clock master; add to the dependency graph.
    parentName = state.get('parent').get('name')
    componentAvatarId = common.componentId(parentName, state.get('name'))

    if componentAvatarId != conf['clock-master']:
        return
    self._depgraph.addClockMaster(state)
458
def _updateStateFromConf(self, _, conf, identity):
    """
    Add a new config object into the planet state.

    Components and flows that are already known by name are left as
    they are; only missing ones are added.

    @param _:        ignored; lets me be used as a deferred callback
    @param conf:     the parsed configuration to merge in
    @param identity: the identity requesting this, passed on to
                     L{_addComponent}

    @returns: a list of all components added
    @rtype:   list of L{flumotion.common.planet.ManagerComponentState}

    @raise errors.ComponentConfigError: re-raised when eaters cannot be
           mapped to feeders, after the offending component has been
           given an error message and made sad
    """

    self.debug('syncing up planet state with config')
    added = [] # added components while parsing

    state = self.state
    atmosphere = state.get('atmosphere')
    for name, c in conf.atmosphere.components.items():
        if name in [x.get('name') for x in atmosphere.get('components')]:
            self.debug('atmosphere already has component %s' % name)
        else:
            added.append(self._addComponent(c, atmosphere, identity))

    # existing flows, by name
    flows = dict([(x.get('name'), x) for x in state.get('flows')])
    for f in conf.flows:
        try:
            flow = flows[f.name]
            self.debug('checking existing flow %s' % f.name)
        except KeyError:
            # unknown flow: create it and hang it under the planet
            self.info('creating flow "%s"' % f.name)
            flow = planet.ManagerFlowState(name=f.name, parent=state)
            state.append('flows', flow)

        components = [x.get('name') for x in flow.get('components')]
        for name, c in f.components.items():
            if name in components:
                self.debug('component %s already in flow %s'
                    % (c.name, f.name))
            else:
                added.append(self._addComponent(c, flow, identity))

    # dependencies are only registered once all new states exist
    for componentState in added:
        self._updateFlowDependencies(componentState)

    try:
        self._depgraph.mapEatersToFeeders()
    except errors.ComponentConfigError, e:
        # the exception carries the offending component state and a
        # debug string as its two arguments; note that 'state' is
        # rebound from the planet state to that component state here
        state = e.args[0]
        debug = e.args[1]
        message = messages.Error(T_(
            N_("The component is misconfigured.")),
                debug=debug)
        state.append('messages', message)
        state.setMood(moods.sad.value)
        raise e

    return added
512
513 - def _startComponents(self, components, conf, identity):
514 # now start all components that need starting -- collecting into 515 # an temporary dict of the form {workerId => [components]} 516 componentsToStart = {} 517 for c in components: 518 workerId = c.get('workerRequested') 519 if not workerId in componentsToStart: 520 componentsToStart[workerId] = [] 521 componentsToStart[workerId].append(c) 522 self.debug('_startComponents: componentsToStart %r' % componentsToStart) 523 524 for workerId, componentStates in componentsToStart.items(): 525 self._workerCreateComponents(workerId, componentStates)
526
527 - def _loadConfiguration(self, conf, identity):
528 # makeBouncer only makes a bouncer if there is one in the config 529 d = self._makeBouncer(conf, identity) 530 d.addCallback(self._addManagerPlugs, conf, identity) 531 d.addCallback(self._updateStateFromConf, conf, identity) 532 d.addCallback(self._startComponents, conf, identity) 533 return d
534
def loadConfigurationXML(self, file, identity):
    """
    Load the configuration from the given XML, merging it on top of
    the currently running configuration.

    @param file:     file to parse, either as an open file object,
                     or as the name of a file to open
    @type  file:     str or file
    @param identity: The identity making this request. This is used by
                     the adminaction logging mechanism in order to say
                     who is performing the action.
    @type  identity: L{flumotion.common.identity.Identity}

    @returns: the deferred returned by L{_loadConfiguration}
    """
    self.debug('loading configuration')
    conf = config.FlumotionConfigXML(file)
    # NOTE(review): the new configuration replaces self.configuration
    # before it has been parsed, so it stays in place even when parse()
    # raises -- confirm that this is intended.
    self.configuration = conf
    conf.parse()
    return self._loadConfiguration(conf, identity)
552
def _createHeaven(self, interface, klass):
    """
    Create a heaven of the given klass that will send avatars to clients
    implementing the given medium interface.

    The heaven is constructed with me as its only argument and is
    registered with my dispatcher for the interface.

    @param interface: the medium interface to create a heaven for
    @type  interface: L{flumotion.common.interfaces.IMedium}
    @param klass:     the type of heaven to create
    @type  klass:     an implementor of
                      L{flumotion.common.interfaces.IHeaven}

    @returns: the newly created heaven
    """
    assert issubclass(interface, interfaces.IMedium)

    newHeaven = klass(self)
    self.dispatcher.registerHeaven(newHeaven, interface)
    return newHeaven
567
def setBouncer(self, bouncer):
    """
    Install the bouncer on both the manager and its portal, warning
    when this replaces a bouncer that was already set.

    @type bouncer: L{flumotion.component.bouncers.bouncer.Bouncer}
    """
    if self.bouncer:
        self.warning("manager already had a bouncer, setting anyway")

    # targets are assigned left to right: the manager first, then the portal
    self.bouncer = self.portal.bouncer = bouncer
577
def getFactory(self):
    """
    Return the server factory that was created for this manager's
    portal.
    """
    factory = self.factory
    return factory
580
def componentCreate(self, componentState):
    """
    Create the given component. This will currently also trigger
    a start eventually when the component avatar attaches.

    The component should be sleeping.
    The worker it should be started on should be present.

    @raise errors.ComponentMoodError:     if the component is not
           sleeping, or already has a pending mood
    @raise errors.ComponentNoWorkerError: if the worker is not logged in

    @returns: the result of L{_workerCreateComponents}
    """
    mood = componentState.get('mood')
    if mood != moods.sleeping.value:
        raise errors.ComponentMoodError("%r not sleeping but %s" % (
            componentState, moods.get(mood).name))

    pending = componentState.get('moodPending')
    if pending is not None:
        raise errors.ComponentMoodError(
            "%r already has a pending mood %s" % (
            componentState, moods.get(pending).name))

    # find a worker this component can start on: the one it is known to
    # run on, or else the one it requested
    workerId = componentState.get('workerName')
    if not workerId:
        workerId = componentState.get('workerRequested')

    if workerId in self.workerHeaven.avatars:
        return self._workerCreateComponents(workerId, [componentState])

    raise errors.ComponentNoWorkerError(
        "worker %s is not logged in" % workerId)
609
def componentStop(self, componentState):
    """
    Stop the given component.
    If the component was sad, we clear its sad state as well,
    since the stop was explicitly requested by the admin.

    All messages on the component state are removed first.

    @type componentState: L{planet.ManagerComponentState}

    @raise errors.BusyComponentError: if the component has a pending
           mood other than happy

    @rtype:   L{defer.Deferred}
    @returns: a deferred for the stop; without an avatar it has already
              fired None for a sad or lost component (which is reset to
              sleeping), and has already failed with a ComponentError
              for any other mood
    """
    self.debug('componentStop(%r)' % componentState)
    # clear all messages; iterate over a snapshot, because removing
    # from the list that is being iterated would skip every other entry
    for m in list(componentState.get('messages')):
        self.debug('Removing message %r' % m)
        componentState.remove('messages', m)

    # We permit stopping a component even if it has a pending mood of
    # happy, so that if it never gets to happy, we can still stop it.
    pending = componentState.get('moodPending')
    if pending is not None and pending != moods.happy.value:
        self.debug("Pending mood is %r", pending)
        raise errors.BusyComponentError(componentState)

    avatar = self.getComponentMapper(componentState).avatar
    if not avatar:
        # reset moodPending if asked to stop without an avatar
        # because we changed above to allow stopping even if
        # moodPending is happy
        if componentState.get('mood') == moods.sad.value:
            self.debug('asked to stop a sad component without avatar')
            componentState.setMood(moods.sleeping.value)
            componentState.set('moodPending', None)
            return defer.succeed(None)
        if componentState.get('mood') == moods.lost.value:
            self.debug('asked to stop a lost component without avatar')
            componentState.setMood(moods.sleeping.value)
            componentState.set('moodPending', None)
            return defer.succeed(None)

        msg = 'asked to stop a component without avatar in mood %s' % \
            moods.get(componentState.get('mood'))
        self.warning(msg)
        return defer.fail(errors.ComponentError(msg))

    d = avatar.mindCallRemote('stop')

    def cleanupAndDisconnectComponent(result):
        # the component is no longer being started or set up
        avatar._starting = False
        avatar._beingSetup = False
        return avatar.disconnect()

    def setSleeping(result):
        if componentState.get('mood') == moods.sad.value:
            self.debug('clearing sad mood after having stopped component')
            componentState.setMood(moods.sleeping.value)

        return result

    d.addCallback(cleanupAndDisconnectComponent)
    d.addCallback(setSleeping)

    return d
def componentAddMessage(self, avatarId, message):
    """
    Set the given message on the given component's state.
    Can be called e.g. by a worker to report on a crashed component.
    Sets the mood to sad if it is an error message.

    A warning is logged and nothing else is done when no component is
    mapped under C{avatarId}.
    """
    try:
        mapper = self._componentMappers[avatarId]
    except KeyError:
        self.warning('asked to set a message on non-mapped component %s' %
                     avatarId)
        return

    mapper.state.append('messages', message)
    if message.level == messages.ERROR:
        self.debug('Error message makes component sad')
        mapper.state.setMood(moods.sad.value)
688 689 # FIXME: unify naming of stuff like this
def workerAttached(self, workerAvatar):
    """
    Called when a worker logs in.

    Marks the worker as started in the dependency graph and, once the
    worker has reported which components it is running, asks it to
    create the remaining components that are meant for it, including
    lost ones it no longer runs.

    @param workerAvatar: the avatar of the worker that logged in
    """
    # called when a worker logs in
    workerId = workerAvatar.avatarId
    self.debug('vishnu.workerAttached(): id %s' % workerId)

    self._depgraph.addWorker(workerId)
    self._depgraph.setWorkerStarted(workerId)

    # Create all components assigned to this worker. Note that the
    # order of creation is unimportant, it's only the order of
    # starting that matters (and that's different code).
    # Components that did not request any worker (None) are included.
    components = [c for c in self._getComponentsToCreate()
                  if c.get('workerRequested') in (workerId, None)]
    # So now, check what components worker is running
    # so we can remove them from this components list
    # also add components we have that are lost but not
    # in list given by worker
    d = workerAvatar.getComponents()
    def workerAvatarComponentListReceived(workerComponents):
        # list() is called to work around a pychecker bug. FIXME.
        lostComponents = list([c for c in self.getComponentStates()
                               if c.get('workerRequested') == workerId and \
                                  c.get('mood') == moods.lost.value])
        for comp in workerComponents:
            # comp is an avatarId string
            # components is a list of {ManagerComponentState}
            if comp in self._componentMappers:
                # already running on the worker: neither create nor
                # restart it
                compState = self._componentMappers[comp].state
                if compState in components:
                    components.remove(compState)
                if compState in lostComponents:
                    lostComponents.remove(compState)

        for compState in lostComponents:
            self.info(
                "Restarting previously lost component %s on worker %s",
                self._componentMappers[compState].id, workerId)
            # We set mood to sleeping first. This allows things to
            # distinguish between a newly-started component and a lost
            # component logging back in.
            compState.set('moodPending', None)
            compState.setMood(moods.sleeping.value)

        allComponents = components + lostComponents

        if not allComponents:
            self.debug(
                "vishnu.workerAttached(): no components for this worker")
            return

        self._workerCreateComponents(workerId, allComponents)
    # note: the deferred is not returned to the caller
    d.addCallback(workerAvatarComponentListReceived)
def _workerCreateComponents(self, workerId, components):
    """
    Create the list of components on the given worker, sequentially, but
    in no specific order.

    Nothing is created when the worker is not logged in.

    @param workerId:   avatarId of the worker
    @type  workerId:   string
    @param components: components to start
    @type  components: list of
                       L{flumotion.common.planet.ManagerComponentState}

    @returns: a deferred that has already been triggered
    """
    self.debug("_workerCreateComponents: workerId %r, components %r" % (
        workerId, components))

    if workerId not in self.workerHeaven.avatars:
        self.debug('worker %s not logged in yet, delaying '
                   'component start' % workerId)
        return defer.succeed(None)

    workerAvatar = self.workerHeaven.avatars[workerId]

    # one callback per component, run in order once the chain is fired
    d = defer.Deferred()
    for state in components:
        componentType = state.get('type')
        conf = state.get('config')
        self.debug('scheduling create of %s on %s'
                   % (conf['avatarId'], workerId))
        d.addCallback(self._workerCreateComponentDelayed,
                      workerAvatar, state, componentType, conf)

    def chainCompleted(result):
        return self.debug(
            '_workerCreateComponents(): completed setting up create chain')
    d.addCallback(chainCompleted)

    # now trigger the chain
    self.debug('_workerCreateComponents(): triggering create chain')
    d.callback(None)
    #reactor.callLater(0, d.callback, None)
    return d
782
def _workerCreateComponentDelayed(self, result, workerAvatar,
                                  componentState, type, conf):
    """
    Ask a worker to create one component.

    @param result:         ignored; lets me be used as a deferred
                           callback
    @param workerAvatar:   the avatar of the worker to create on
    @param componentState: the state of the component to create
    @param type:           the component type
    @param conf:           the component's config; C{'avatarId'} is
                           required, C{'nice'} defaults to 0
    @type  conf:           dict
    """
    avatarId, nice = conf['avatarId'], conf.get('nice', 0)

    # we set the moodPending to HAPPY, so this component only gets
    # asked to start once
    componentState.set('moodPending', moods.happy.value)

    creation = workerAvatar.createComponent(avatarId, type, nice)
    # FIXME: here we get the avatar Id of the component we wanted
    # started, so now attach it to the planetState's component state
    # The errback is added separately so that it also sees failures
    # raised by the callback.
    creation.addCallback(self._createCallback, componentState)
    creation.addErrback(self._createErrback, componentState)

    # FIXME: shouldn't we return d here to make sure components
    # wait on each other to be started ?
802 - def _createCallback(self, result, componentState):
803 self.debug('got avatarId %s for state %s' % (result, componentState)) 804 m = self._componentMappers[componentState] 805 assert result == m.id, "received id %s is not the expected id %s" % ( 806 result, m.id)
807
def _createErrback(self, failure, state):
    """
    Handle a failed component creation.

    A component that turns out to be running already is treated as
    lost, unless it has a job state by now; for any other failure the
    component is made sad and gets an error message.

    @param failure: the failure of the creation request
    @param state:   the state of the component that was to be created

    @returns: None, so that the failure counts as handled
    """
    # FIXME: make ConfigError copyable so we can .check() it here
    # and print a nicer warning
    self.warning('failed to create component %s: %s'
                 % (state.get('name'), log.getFailureMessage(failure)))

    if not failure.check(errors.ComponentAlreadyRunningError):
        message = messages.Error(T_(
            N_("The component could not be started.")),
            debug=log.getFailureMessage(failure))

        state.setMood(moods.sad.value)
        state.append('messages', message)
        return None

    if self._componentMappers[state].jobState:
        self.info('component appears to have logged in in the '
                  'meantime')
    else:
        self.info('component appears to be running already; '
                  'treating it as lost until it logs in')
        state.setMood(moods.lost.value)

    return None
831
def workerDetached(self, workerAvatar):
    """
    Called when a worker logs out; marks the worker as stopped in the
    dependency graph.

    @param workerAvatar: the avatar of the worker that logged out
    """
    loggedOutId = workerAvatar.avatarId
    self.debug('vishnu.workerDetached(): id %s' % loggedOutId)
    self._depgraph.setWorkerStopped(loggedOutId)
837
def _getComponentState(self, deferredListResult, avatar):
    """
    Tie a freshly logged-in component avatar to a component state.

    @param deferredListResult: the result of the DeferredList built in
                               L{componentAttached}: two
                               (success, value) pairs, holding the
                               component's config and its job state
    @param avatar:             the avatar of the component that logged
                               in
    """
    # a component just logged in with good credentials. we fetched
    # its config and job state. now there are two possibilities:
    # (1) we were waiting for such a component to start. There is a
    #     ManagerComponentState and an avatarId in the
    #     componentMappers waiting for us.
    # (2) we don't know anything about this component, but since it
    #     logged in, we will deal with it, at least allowing the
    #     admin to control it.

    def verifyExistingComponentState(jobState, state):
        # condition (1)
        # note: 'conf' is not an argument here; it is the name bound in
        # the enclosing function before this helper gets called
        state.setJobState(jobState)

        if conf and state.get('config') != conf:
            message = messages.Warning(T_(
                N_("Component logged in with stale configuration. "
                   "Consider stopping this component and restarting "
                   "the manager.")),
                debug=("Expected\n%r\n, but got\n%r;\n"
                       "updating internal state accordingly." %
                       (state.get('config'), conf)))
            self.warning('updating internal component state for %r '
                         '(changing config from %r to %r)', state,
                         state.get('config'), conf)
            # the config reported by the component wins
            state.set('config', conf)
            state.append('messages', message)
        # if conf is None, then we just created the component and
        # it's not set up yet

    def makeNewComponentState(conf):
        # condition (2)
        # note: 'jobState' is the name bound in the enclosing function
        state = planet.ManagerComponentState()
        state.setJobState(jobState)

        if conf:
            flowName, compName = conf['parent'], conf['name']
        else:
            # unfortunately there is a window in which a component does
            # not have a config. accept that so that an admin can stop
            # this component.
            flowName, compName = common.parseComponentId(avatar.avatarId)
            conf = {'name': compName,
                    'parent': flowName,
                    'type': 'unknown-component',
                    'avatarId': avatar.avatarId,
                    'properties': {}}

        state.set('name', compName)
        state.set('type', conf['type'])
        state.set('workerRequested', jobState.get('workerName'))
        state.set('config', conf)

        # check if we have this flow yet and add if not
        if flowName == 'atmosphere':
            # treat the atmosphere like a flow, although it's not
            flow = self.state.get('atmosphere')
        else:
            flow = _first(self.state.get('flows'),
                          lambda x: x.get('name') == flowName)
            if not flow:
                self.info('Creating flow "%s"' % flowName)
                flow = planet.ManagerFlowState()
                flow.set('name', flowName)
                flow.set('parent', self.state)
                self.state.append('flows', flow)

        state.set('parent', flow)
        flow.append('components', state)

        # add to mapper
        m = ComponentMapper()
        m.state = state
        m.id = avatar.avatarId
        self._componentMappers[m.state] = m
        self._componentMappers[m.id] = m

    # the success flags are not looked at
    (_success1, conf), (_success2, jobState) = deferredListResult
    m = self.getComponentMapper(avatar.avatarId)

    if m:
        verifyExistingComponentState(jobState, m.state)
    else:
        makeNewComponentState(conf)

    # look the mapper up again: it may only just have been created
    m = self.getComponentMapper(avatar.avatarId)

    # make sure the component is in the depgraph
    self._depgraph.addComponent(m.state)

    # the mapper can now also be found through the avatar and the job
    # state
    m.avatar = avatar
    self._componentMappers[m.avatar] = m
    avatar.componentState = m.state
    avatar.jobState = jobState
    m.jobState = jobState
    self._componentMappers[jobState] = m
def componentAttached(self, componentAvatar):
    """
    Handle a component that logged in and got a component avatar created.

    Asks the remote component for its config and for its job state, and
    hands the combined results to L{_getComponentState}.

    @param componentAvatar: the avatar of the component that attached

    @returns: the deferred list wrapping both remote calls, with
              _getComponentState added as a callback.  It is created
              with fireOnOneErrback=True.
    """
    avatarId = componentAvatar.avatarId
    self.debug("%s component attached", avatarId)

    # order matters: _getComponentState unpacks the results as
    # (config, jobState)
    remoteCalls = [
        componentAvatar.mindCallRemote('getConfig'),
        componentAvatar.mindCallRemote('getState'),
    ]
    d = defer.DeferredList(remoteCalls, fireOnOneErrback=True)
    d.addCallback(self._getComponentState, componentAvatar)
    return d
944
def componentDetached(self, componentAvatar):
    """
    Handle a component that has detached.

    Marks the component's job as stopped in the dependency graph, clears
    the pending mood on its component state, and finally removes the
    component state and job state from the avatar.

    @param componentAvatar: the avatar of the component that detached
    """
    self.debug("%s component detached", componentAvatar.avatarId)

    componentState = componentAvatar.componentState
    self._depgraph.setJobStopped(componentState)
    componentState.set('moodPending', None)

    # detach the component state and the job state from the avatar
    componentAvatar.componentState = componentAvatar.jobState = None
954
def registerComponent(self, componentAvatar):
    """
    Register a component; called when the job state is retrieved.

    The job is marked as started in the dependency graph.  When the
    component's mood is already happy or hungry (a reconnecting
    component that is running), the component is also marked as set up
    and started, and as having started its clock master if it is one.
    Afterwards the component heaven is asked to start what can be
    started.

    @param componentAvatar: the avatar of the component to register
    """
    self.debug('vishnu registering component %r' % componentAvatar)

    depgraph = self._depgraph
    state = componentAvatar.componentState
    depgraph.setJobStarted(state)

    # If mood is happy or hungry, then the component is running, so a
    # reconnecting component needs its started status restored too.
    mood = state.get('mood')
    if mood in (moods.happy.value, moods.hungry.value):
        self.debug("Component %s is already in mood %s. Set depgraph "
            "appropriately", componentAvatar.avatarId, moods.get(mood).name)
        depgraph.setComponentSetup(state)
        depgraph.setComponentStarted(state)
        if depgraph.isAClockMaster(state):
            self.log("Component %s is a clock master and is happy/hungry "
                "so must already be providing clock master",
                componentAvatar.avatarId)
            depgraph.setClockMasterStarted(state)

    self.debug('vishnu registered component %r' % componentAvatar)
    self.componentHeaven._tryWhatCanBeStarted()
978
def unregisterComponent(self, componentAvatar):
    """
    Clean up the job state and avatar of a component that is logging out.

    The mapper entries keyed on the job state and on the avatar are
    removed, and the pid, cpu, workerName and moodPending keys of the
    component state are reset to None.  An avatar that has no mapper
    entry is ignored with a warning.

    @param componentAvatar: the avatar of the component logging out
    """
    self.debug('unregisterComponent(%r): cleaning up state' %
               componentAvatar)

    mappers = self._componentMappers
    if componentAvatar not in mappers:
        self.warning("Component logging out that was incompletely logged "
                     " in, ignoring")
        return

    mapper = mappers[componentAvatar]

    # unmap jobstate; a missing entry is only worth a warning
    try:
        del mappers[mapper.jobState]
    except KeyError:
        self.warning('Could not remove jobState for %r' % componentAvatar)
    mapper.jobState = None

    for key in ('pid', 'cpu', 'workerName', 'moodPending'):
        mapper.state.set(key, None)

    # unmap avatar
    del mappers[mapper.avatar]
    mapper.avatar = None
1007
def getComponentStates(self):
    """
    Get the states of all components known to the planet state.

    Every state is logged, and a warning is emitted for each component
    whose mood is None.

    @returns: the object returned by self.state.getComponents(),
              unmodified
    """
    states = self.state.getComponents()
    self.debug('getComponentStates(): %d components' % len(states))
    for state in states:
        self.log(repr(state))
        # identity test: only a mood that really is None is reported
        if state.get('mood') is None:
            self.warning('%s has mood None' % state.get('name'))

    return states
1018
def deleteComponent(self, componentState):
    """
    Empty the planet of the given component.

    The component is removed from the dependency graph, its mapper
    entries (keyed on the component state and on the mapper id) are
    dropped, and it is removed from the components of its parent flow.

    @param componentState: the state of the component to delete

    @raises errors.BusyComponentError: if the component has a pending
            mood or its mood is not sleeping.

    @returns: a deferred that will fire when all listeners have been
              notified of the removal of the component.
    """
    self.debug('deleting component %r from state', componentState)
    flow = componentState.get('parent')
    # Compare moods by value, like registerComponent does; an identity
    # test only passes when the stored mood happens to be the very same
    # object as moods.sleeping.value.
    if (componentState.get('moodPending') is not None
        or componentState.get('mood') != moods.sleeping.value):
        raise errors.BusyComponentError(componentState)

    self._depgraph.removeComponent(componentState)

    mapper = self._componentMappers[componentState]
    del self._componentMappers[mapper.id]
    del self._componentMappers[componentState]
    return flow.remove('components', componentState)
1038
def deleteFlow(self, flowName):
    """
    Empty the planet of a flow.

    This is a generator wrapped by defer_generator_method: it yields the
    result of emptying the flow and then the result of removing the flow
    from the planet state.

    @param flowName: the name of the flow to delete

    @raises errors.BusyComponentError: raised inside the generator when
            a component of the flow matches the busy predicate; how the
            error reaches the caller depends on defer_generator_method.

    @returns: a deferred that will fire when the flow is removed.
    """
    flow = _find(self.state.get('flows'), flowName, lambda x: x.get('name'))
    components = flow.get('components')

    # if any component is already in a mood change/command, fail.
    # Moods are compared by value, like registerComponent does; an
    # identity test only passes when the stored mood happens to be the
    # very same object as moods.sleeping.value.
    isBusy = lambda c: c.get('moodPending') is not None
    isNotSleeping = lambda c: c.get('mood') != moods.sleeping.value
    # NOTE(review): _fint is defined outside this block.  If it ANDs the
    # predicates, a flow is only refused when a component is both pending
    # and not sleeping, whereas deleteComponent refuses on either
    # condition -- confirm which is intended.
    pred = _fint(isBusy, isNotSleeping)
    if _any(components, pred):
        raise errors.BusyComponentError(_first(components, pred))

    for c in components:
        self._depgraph.removeComponent(c)
        # the mapper is registered under both its id and the state
        del self._componentMappers[self._componentMappers[c].id]
        del self._componentMappers[c]
    yield flow.empty()
    yield self.state.remove('flows', flow)
deleteFlow = defer_generator_method(deleteFlow)
def emptyPlanet(self):
    """
    Empty the planet of all components, and flows.

    Every component that is not sleeping and still has an avatar is
    stopped through a chain of callbacks; L{_emptyPlanetCallback} then
    cleans up the rest of the planet state.

    @raises errors.BusyComponentError: if any component has a pending
            mood.

    @returns: a deferred that will fire when the planet is empty.
    """
    components = self.getComponentStates()

    # if any component is already in a mood change/command, fail
    pending = [c for c in components if c.get('moodPending') is not None]
    if pending:
        state = pending[0]
        raise errors.BusyComponentError(state,
            "moodPending is %s" % moods.get(state.get('moodPending')))

    # Filter out the ones that aren't sleeping and stop them.  Moods are
    # compared by value, like registerComponent does; an identity test
    # only passes when the stored mood happens to be the very same
    # object as the enum value.
    sleeping = moods.sleeping.value
    toStop = [c for c in components if c.get('mood') != sleeping]

    # create a big deferred for stopping everything
    d = defer.Deferred()

    self.debug('need to stop %d components: %r' % (
        len(toStop), toStop))

    # FIXME: we should shut components down in the correct order (according
    # to the dependency graph); this uses an undefined ordering.
    for c in toStop:
        avatar = self._componentMappers[c].avatar
        # If this has logged out, but isn't sleeping (so is sad or lost),
        # we won't have an avatar. So, stop it if we can.
        if avatar:
            # the avatar is passed as an extra callback argument, so each
            # callback stops its own avatar
            d.addCallback(lambda result, a: a.stop(), avatar)
        else:
            assert c.get('mood') in (moods.sad.value, moods.lost.value)

    d.addCallback(self._emptyPlanetCallback)

    # trigger the deferred after returning
    reactor.callLater(0, d.callback, None)

    return d
1111
def _emptyPlanetCallback(self, result):
    """
    Clean up the rest of the planet state.

    Gets called after all components have stopped.  Removes every
    component from the dependency graph and from the component mappers,
    then empties the atmosphere and empties and removes every flow.

    @param result: the result of the previous callback; not used

    @returns: a DeferredList over the deferreds returned by emptying the
              atmosphere and by emptying and removing each flow.
    """
    components = self.getComponentStates()
    self.debug('_emptyPlanetCallback: need to delete %d components' %
               len(components))

    for c in components:
        # remove from depgraph
        self._depgraph.removeComponent(c)

        # compare moods by value, like registerComponent does
        if c.get('mood') != moods.sleeping.value:
            self.warning('Component %s is not sleeping' % c.get('name'))
        # clear mapper; remove componentstate and id
        m = self._componentMappers[c]
        del self._componentMappers[m.id]
        del self._componentMappers[c]

    # if anything's left, we have a mistake somewhere
    leftover = list(self._componentMappers)
    if leftover:
        self.warning('mappers still has keys %r' % (repr(leftover)))

    deferreds = [self.state.get('atmosphere').empty()]

    # Iterate over a snapshot: flows are removed from the planet state
    # inside the loop, and if get('flows') hands out the live list,
    # removing while iterating would skip flows.
    for f in list(self.state.get('flows')):
        self.debug('appending deferred for emptying flow %r' % f)
        deferreds.append(f.empty())
        self.debug('appending deferred for removing flow %r' % f)
        deferreds.append(self.state.remove('flows', f))
        self.debug('appended deferreds')

    return defer.DeferredList(deferreds)
1148
def _getComponentsToCreate(self):
    """
    Get the components whose mood is sleeping.

    NOTE: sleeping now indicates that there is no existing job; when
    jobs are created the mood becomes waking, so there is no need to
    filter on moodPending.

    @rtype: list of L{flumotion.common.planet.ManagerComponentState}
    """
    sleeping = moods.sleeping.value
    return [c for c in self.state.getComponents()
            if c.get('mood') == sleeping]
1163
def _getWorker(self, workerName):
    """
    Look up the worker avatar with the given name in the worker heaven.

    @param workerName: the name of the worker to look up

    @raises errors.ComponentNoWorkerError: if no avatar is registered
            under that name.

    @returns: the entry stored under workerName in
              self.workerHeaven.avatars
    """
    avatars = self.workerHeaven.avatars
    if workerName in avatars:
        return avatars[workerName]

    raise errors.ComponentNoWorkerError("Worker %s not logged in?"
                                        % workerName)
1171
def getWorkerFeedServerPort(self, workerName):
    """
    Get the feed server port of the worker with the given name.

    @param workerName: the name of the worker to look up

    @returns: the feedServerPort attribute of the worker returned by
              L{_getWorker}; errors raised by _getWorker propagate.
    """
    worker = self._getWorker(workerName)
    return worker.feedServerPort
1174
def reservePortsOnWorker(self, workerName, numPorts):
    """
    Request a number of ports on the worker named workerName.

    The ports will be reserved for the use of the caller until
    releasePortsOnWorker is called.

    @param workerName: the name of the worker to reserve ports on
    @param numPorts:   how many ports to reserve

    @returns: a list of ports as integers
    """
    worker = self._getWorker(workerName)
    return worker.reservePorts(numPorts)
1184
def releasePortsOnWorker(self, workerName, ports):
    """
    Tell the manager that the given ports are no longer being used,
    and may be returned to the allocation pool.

    A worker that cannot be looked up is not an error here: the
    failure is logged as a warning and None is returned.

    @param workerName: the name of the worker the ports were reserved on
    @param ports:      the ports to release

    @returns: the result of the worker's releasePorts call, or None if
              the worker could not be found.
    """
    try:
        return self._getWorker(workerName).releasePorts(ports)
    # "except X as e" is accepted by Python 2.6+ and by Python 3; the
    # older "except X, e" spelling is a syntax error on Python 3.
    except errors.ComponentNoWorkerError as e:
        self.warning('could not release ports: %r' % e.args)
        return None
1194
def getComponentMapper(self, object):
    """
    Look up an object mapper given the object.

    @param object: any key the mapper was registered under in
                   self._componentMappers

    @rtype: L{ComponentMapper} or None
    """
    # A single hashed lookup instead of a scan over the list of keys
    # followed by a second lookup.
    try:
        return self._componentMappers.get(object)
    except TypeError:
        # an unhashable object can never be a key; the old list-based
        # membership test answered None for it, so keep doing that
        return None
1205