Package flumotion :: Package manager :: Module manager
[hide private]

Source Code for Module flumotion.manager.manager

   1  # -*- Mode: Python; test-case-name: flumotion.test.test_manager_manager -*- 
   2  # vi:si:et:sw=4:sts=4:ts=4 
   3  # 
   4  # Flumotion - a streaming media server 
   5  # Copyright (C) 2004,2005,2006,2007 Fluendo, S.L. (www.fluendo.com). 
   6  # All rights reserved. 
   7   
   8  # This file may be distributed and/or modified under the terms of 
   9  # the GNU General Public License version 2 as published by 
  10  # the Free Software Foundation. 
  11  # This file is distributed without any warranty; without even the implied 
  12  # warranty of merchantability or fitness for a particular purpose. 
  13  # See "LICENSE.GPL" in the source distribution for more information. 
  14   
  15  # Licensees having purchased or holding a valid Flumotion Advanced 
  16  # Streaming Server license may use this file in accordance with the 
  17  # Flumotion Advanced Streaming Server Commercial License Agreement. 
  18  # See "LICENSE.Flumotion" in the source distribution for more information. 
  19   
  20  # Headers in this file shall remain intact. 
  21   
  22  """ 
  23  manager implementation and related classes 
  24   
  25  API Stability: semi-stable 
  26   
  27  @var  LOCAL_IDENTITY: an identity for the manager itself; can be used 
  28                        to compare against to verify that the manager 
  29                        requested an action 
  30  @type LOCAL_IDENTITY: L{LocalIdentity} 
  31  """ 
  32   
  33  import os 
  34   
  35  from twisted.internet import reactor, defer 
  36  from twisted.python import components, failure 
  37  from twisted.spread import pb 
  38  from twisted.cred import portal 
  39  from zope.interface import implements 
  40   
  41  from flumotion.common import errors, interfaces, log, registry 
  42  from flumotion.common import planet, common, dag, messages, reflectcall, server 
  43  from flumotion.common.i18n import N_, gettexter 
  44  from flumotion.common.identity import RemoteIdentity, LocalIdentity 
  45  from flumotion.common.netutils import addressGetHost 
  46  from flumotion.common.planet import moods 
  47  from flumotion.configure import configure 
  48  from flumotion.manager import admin, component, worker, base, config 
  49  from flumotion.twisted import checkers 
  50  from flumotion.twisted import portal as fportal 
  51   
  52  __all__ = ['ManagerServerFactory', 'Vishnu'] 
  53  __version__ = "$Rev: 6770 $" 
  54  T_ = gettexter() 
  55  LOCAL_IDENTITY = LocalIdentity('manager') 
  56   
  57   
  58  # an internal class 
class Dispatcher(log.Loggable):
    """
    I implement L{twisted.cred.portal.IRealm}.
    Whenever a L{pb.Avatar} is requested through me, I make sure the
    Avatar that is handed back knows about the mind (client) that asked
    for it.
    """

    implements(portal.IRealm)

    logCategory = 'dispatcher'

    def __init__(self, computeIdentity):
        """
        @param computeIdentity: see L{Vishnu.computeIdentity}
        @type  computeIdentity: callable
        """
        self._computeIdentity = computeIdentity
        self._bouncer = None
        self._interfaceHeavens = {} # interface -> heaven
        self._avatarKeycards = {} # avatarId -> keycard

    def setBouncer(self, bouncer):
        """
        @param bouncer: the bouncer to authenticate with
        @type  bouncer: L{flumotion.component.bouncers.bouncer}
        """
        self._bouncer = bouncer

    def registerHeaven(self, heaven, interface):
        """
        Register a Heaven as managing components with the given interface.

        @type  interface: L{twisted.python.components.Interface}
        @param interface: a component interface to register the heaven with.
        """
        assert isinstance(heaven, base.ManagerHeaven)

        self._interfaceHeavens[interface] = heaven

    ### IRealm methods

    def requestAvatar(self, avatarId, keycard, mind, *ifaces):
        """
        Create an avatar for a client that has just logged in.

        Exactly two interfaces must be requested: L{pb.IPerspective} and
        one interface for which a heaven was registered; otherwise
        L{errors.NoPerspectiveError} is raised synchronously.

        @returns: a deferred firing a (pb.IPerspective, avatar, cleanup)
                  tuple; it fails with L{errors.AlreadyConnectedError} if
                  the heaven already holds an avatar with the same id.
        """
        def createAvatar(identity):
            return avatarClass.makeAvatar(heaven, avatarId, identity, mind)

        def avatarCreated(avatar):
            if avatar.avatarId in heaven.avatars:
                raise errors.AlreadyConnectedError(avatar.avatarId)
            heaven.avatars[avatar.avatarId] = avatar
            self._avatarKeycards[avatar.avatarId] = keycard

            # OK so this is byzantine, but test_manager_manager actually
            # uses these kwargs to set its own info. so don't change
            # these args or their order or you will break your test
            # suite.
            def cleanup(avatarId=avatar.avatarId, avatar=avatar, mind=mind):
                self.info('lost connection to client %r', avatar)
                del heaven.avatars[avatar.avatarId]
                avatar.onShutdown()
                # forget the keycard so it does not leak
                card = self._avatarKeycards.pop(avatarId)
                if self._bouncer:
                    try:
                        self._bouncer.removeKeycard(card)
                    except KeyError:
                        self.warning("bouncer forgot about keycard %r",
                                     card)

            return (pb.IPerspective, avatar, cleanup)

        def loginFailed(reason):
            # The connection has to be dropped, but the client should
            # still receive the failure.  So the failure is returned
            # first and loseConnection() is scheduled for later; it only
            # closes the transport once the pending data has been sent.
            reactor.callLater(0, mind.broker.transport.loseConnection)
            return reason

        # we need IPerspective plus the specific avatar interface
        if pb.IPerspective not in ifaces or len(ifaces) != 2:
            raise errors.NoPerspectiveError(avatarId)
        iface = [i for i in ifaces if i != pb.IPerspective][0]
        if iface not in self._interfaceHeavens:
            self.warning('unknown interface %r', iface)
            raise errors.NoPerspectiveError(avatarId)

        heaven = self._interfaceHeavens[iface]
        avatarClass = heaven.avatarClass
        peerHost = addressGetHost(mind.broker.transport.getPeer())
        d = self._computeIdentity(keycard, peerHost)
        d.addCallback(createAvatar)
        d.addCallbacks(avatarCreated, loginFailed)
        return d
class ComponentMapper:
    """
    I tie together the different objects that belong to one component.
    The vishnu keeps me as the value in its component lookup hash.

    @ivar state:    the ManagerComponentState; created first
    @ivar id:       avatarId of the eventual ComponentAvatar
    @ivar avatar:   the ComponentAvatar
    @ivar jobState: ManagerJobState of a running component
    """

    def __init__(self):
        # nothing is known yet; the vishnu fills these in as they appear
        self.id = None
        self.state = None
        self.jobState = None
        self.avatar = None
164
165 -class Vishnu(log.Loggable):
166 """ 167 I am the toplevel manager object that knows about all heavens and factories. 168 169 @cvar dispatcher: dispatcher to create avatars 170 @type dispatcher: L{Dispatcher} 171 @cvar workerHeaven: the worker heaven 172 @type workerHeaven: L{worker.WorkerHeaven} 173 @cvar componentHeaven: the component heaven 174 @type componentHeaven: L{component.ComponentHeaven} 175 @cvar adminHeaven: the admin heaven 176 @type adminHeaven: L{admin.AdminHeaven} 177 @cvar configDir: the configuration directory for this Vishnu's manager 178 @type configDir: str 179 """ 180 181 implements(server.IServable) 182 183 logCategory = "vishnu" 184
def __init__(self, name, unsafeTracebacks=0, configDir=None):
    """
    Set up the dispatcher, the three heavens, the planet state and the
    PB server factory.

    @param name:             name of the manager; stored as the 'name'
                             of the planet state and used for the
                             default configuration directory
    @type  name:             str
    @param unsafeTracebacks: passed on to L{pb.PBServerFactory}
    @param configDir:        the configuration directory; when None,
                             "managers/<name>" below
                             configure.configdir is used
    @type  configDir:        str or None
    """
    # create a Dispatcher which will hand out avatars to clients
    # connecting to me
    self.dispatcher = Dispatcher(self.computeIdentity)

    # the dispatcher has to exist first: _createHeaven registers each
    # heaven with it
    self.workerHeaven = self._createHeaven(interfaces.IWorkerMedium,
                                           worker.WorkerHeaven)
    self.componentHeaven = self._createHeaven(interfaces.IComponentMedium,
                                              component.ComponentHeaven)
    self.adminHeaven = self._createHeaven(interfaces.IAdminMedium,
                                          admin.AdminHeaven)

    # self.running flips to False just before the reactor shuts down
    self.running = True
    def setStopped():
        self.running = False
    reactor.addSystemEventTrigger('before', 'shutdown', setStopped)

    if configDir is not None:
        self.configDir = configDir
    else:
        self.configDir = os.path.join(configure.configdir,
                                      "managers", name)

    self.bouncer = None # used by manager to authenticate worker/component

    self.bundlerBasket = registry.getRegistry().makeBundlerBasket()

    self._componentMappers = {} # any object -> ComponentMapper

    self.state = planet.ManagerPlanetState()
    self.state.set('name', name)

    self.plugs = {} # socket -> list of plugs

    # create a portal so that I can be connected to, through our dispatcher
    # implementing the IRealm and a bouncer
    # (the bouncer is None for now; setBouncer() installs it later)
    self.portal = fportal.BouncerPortal(self.dispatcher, None)
    #unsafeTracebacks = 1 # for debugging tracebacks to clients
    self.factory = pb.PBServerFactory(self.portal,
        unsafeTracebacks=unsafeTracebacks)
    # filled with host/port/use_ssl keys, all None until
    # setConnectionInfo() is called with real values
    self.connectionInfo = {}
    self.setConnectionInfo(None, None, None)
227
def shutdown(self):
    """Cancel any pending operations in preparation for shutdown.

    This method is mostly useful for unit tests; currently, it is
    not called during normal operation. Note that the caller is
    responsible for stopping listening on the port, as the manager
    does not have a handle on the twisted port object.

    @returns: A deferred that will fire when the manager has shut
              down.
    """
    if not self.bouncer:
        # nothing to stop
        return defer.succeed(None)
    return self.bouncer.stop()
243
def setConnectionInfo(self, host, port, use_ssl):
    """
    Record the host, port and use_ssl values in self.connectionInfo,
    overwriting earlier values for these three keys.
    """
    self.connectionInfo.update({
        'host': host,
        'port': port,
        'use_ssl': use_ssl,
    })
247
def getConfiguration(self):
    """Returns the manager's configuration as a string suitable for
    importing via loadConfiguration().
    """
    planetXml = config.exportPlanetXml(self.state)
    return planetXml
253
def getBundlerBasket(self):
    """
    Return a bundler basket to unbundle from.
    If the registry files were updated since the last time, the
    bundlerbasket will be rebuilt.

    @since: 0.2.2
    @rtype: L{flumotion.common.bundle.BundlerBasket}
    """
    if not registry.getRegistry().rebuildNeeded():
        # the cached basket is still current
        return self.bundlerBasket

    self.info("Registry changed, rebuilding")
    registry.getRegistry().verify(force=True)
    self.bundlerBasket = registry.getRegistry().makeBundlerBasket()
    return self.bundlerBasket
268
def addMessage(self, level, id, format, *args, **kwargs):
    """
    Convenience method to construct a message and add it to the
    planet state. `format' should be marked as translatable in the
    source with N_, and *args will be stored as format arguments.
    Keyword arguments are passed on to the message constructor. See
    L{flumotion.common.messages.Message} for the meanings of the
    rest of the arguments.

    For example::

      self.addMessage(messages.WARNING, 'foo-warning',
                      N_('The answer is %d'), 42, debug='not really')
    """
    translatable = T_(format, *args)
    message = messages.Message(level, translatable, id=id, **kwargs)
    self.addMessageObject(message)
286
def addMessageObject(self, message):
    """
    Add a message to the planet state, keyed on the message's id.

    @type message: L{flumotion.common.messages.Message}
    """
    planetState = self.state
    planetState.setitem('messages', message.id, message)
294
def clearMessage(self, mid):
    """
    Clear any messages with the given message ID from the planet
    state.  Does nothing if no such message is present.

    @type mid: message ID, normally a str
    """
    if mid not in self.state.get('messages'):
        return
    self.state.delitem('messages', mid)
304
def adminAction(self, identity, message, args, kw):
    """
    Notify every loaded AdminAction plug of an action.

    Each plug registered on the
    'flumotion.component.plugs.adminaction.AdminAction' socket gets its
    action() method called, in the order the plugs were loaded.  Nothing
    happens when no plug is loaded on that socket.

    @param identity: the identity requesting the action
    @type  identity: L{flumotion.common.identity.Identity}
    @param message:  name of the action
    @param args:     positional arguments of the action
    @param kw:       keyword arguments of the action
    """
    socket = 'flumotion.component.plugs.adminaction.AdminAction'
    # .get() with an empty default replaces the deprecated has_key()
    for plug in self.plugs.get(socket, ()):
        plug.action(identity, message, args, kw)
313
def computeIdentity(self, keycard, remoteHost):
    """
    Compute a suitable identity for a remote host. First looks to
    see if there is a
    flumotion.component.plugs.identity.IdentityProvider plug
    installed on the manager, falling back to user@host.

    The identity is only used in the adminaction interface. An
    example of its use is when you have an adminaction plug that
    checks an admin's privileges before actually doing an action;
    the identity object you use here might store the privileges that
    the admin has.

    @param keycard:    the keycard that the remote host used to log in.
    @type  keycard:    L{flumotion.common.keycards.Keycard}
    @param remoteHost: the ip of the remote host
    @type  remoteHost: str

    @rtype: a deferred that will fire a
            L{flumotion.common.identity.RemoteIdentity}
    """
    socket = 'flumotion.component.plugs.identity.IdentityProvider'
    # .get() with an empty default replaces the deprecated has_key()
    for plug in self.plugs.get(socket, ()):
        # the first plug giving a true result wins; its result is
        # returned unchanged
        # NOTE(review): presumably plugs return a deferred, as the
        # documented return type requires -- confirm against the
        # IdentityProvider plug interface
        identity = plug.computeIdentity(keycard, remoteHost)
        if identity:
            return identity

    # keycards without a username attribute give a None username
    username = getattr(keycard, 'username', None)
    return defer.succeed(RemoteIdentity(username, remoteHost))
344
def _addComponent(self, conf, parent, identity):
    """
    Create a component state for the given component config entry,
    attach it to its parent and register a mapper for it.

    @param conf:     the config entry of the component
    @param parent:   the flow or atmosphere state to add the component to
    @param identity: the identity requesting the addition; the admin
                     action plugs are notified unless this is
                     LOCAL_IDENTITY

    @rtype: L{flumotion.common.planet.ManagerComponentState}
    """
    self.debug('adding component %s to %s'
               % (conf.name, parent.get('name')))

    if identity != LOCAL_IDENTITY:
        self.adminAction(identity, '_addComponent', (conf, parent), {})

    # a freshly added component starts out sleeping
    componentState = planet.ManagerComponentState()
    componentState.set('name', conf.name)
    componentState.set('type', conf.getType())
    componentState.set('workerRequested', conf.worker)
    componentState.setMood(moods.sleeping.value)
    componentState.set('config', conf.getConfigDict())

    componentState.set('parent', parent)
    parent.append('components', componentState)

    avatarId = conf.getConfigDict()['avatarId']

    # a previous load problem for this component is now obsolete
    self.clearMessage('loadComponent-%s' % avatarId)

    # FIXME: don't use configure.versionTuple, get the appropriate
    # version for conf['package']
    if not common.checkVersionsCompat(conf.getConfigDict()['version'],
                                      configure.versionTuple):
        versionWarning = messages.Warning(T_(N_(
            "This component is configured for "
            "Flumotion version %s, but you are running version %s.\n"
            "Please update the configuration of the component.\n"),
            common.versionTupleToString(conf.getConfigDict()['version']),
            configure.version))
        componentState.append('messages', versionWarning)

    # the mapper can be looked up by state as well as by avatarId
    mapper = ComponentMapper()
    mapper.state = componentState
    mapper.id = avatarId
    self._componentMappers[componentState] = mapper
    self._componentMappers[avatarId] = mapper

    return componentState
391
392 - def _updateStateFromConf(self, _, conf, identity):
393 """ 394 Add a new config object into the planet state. 395 396 @returns: a list of all components added 397 @rtype: list of L{flumotion.common.planet.ManagerComponentState} 398 """ 399 400 self.debug('syncing up planet state with config') 401 added = [] # added components while parsing 402 403 def checkNotRunning(comp, parentState): 404 name = comp.getName() 405 406 comps = dict([(x.get('name'), x) 407 for x in parentState.get('components')]) 408 runningComps = dict([(x.get('name'), x) 409 for x in parentState.get('components') 410 if x.get('mood') != moods.sleeping.value]) 411 if name not in comps: 412 # We don't have it at all; allow it 413 return True 414 elif name not in runningComps: 415 # We have it, but it's not running. Allow it after deleting 416 # the old one. 417 oldComp = comps[name] 418 self.deleteComponent(oldComp) 419 return True 420 421 # if we get here, the component is already running; warn if 422 # the running configuration is different. Return False in 423 # all cases. 424 parent = comps[name].get('parent').get('name') 425 newConf = c.getConfigDict() 426 oldConf = comps[name].get('config') 427 428 if newConf == oldConf: 429 self.debug('%s already has component %s running with ' 430 'same configuration', parent, name) 431 self.clearMessage('loadComponent-%s' % oldConf['avatarId']) 432 return False 433 434 self.info('%s already has component %s, but configuration ' 435 'not the same -- notifying admin', parent, name) 436 437 diff = config.dictDiff(oldConf, newConf) 438 diffMsg = config.dictDiffMessageString(diff, 'existing', 'new') 439 440 self.addMessage(messages.WARNING, 441 'loadComponent-%s' % oldConf['avatarId'], 442 N_('Could not load component %r into %r: ' 443 'a component is already running with ' 444 'this name, but has a different ' 445 'configuration.'), name, parent, 446 debug=diffMsg) 447 return False
448 449 state = self.state 450 atmosphere = state.get('atmosphere') 451 for c in conf.atmosphere.components.values(): 452 if checkNotRunning(c, atmosphere): 453 added.append(self._addComponent(c, atmosphere, identity)) 454 455 flows = dict([(x.get('name'), x) for x in state.get('flows')]) 456 for f in conf.flows: 457 if f.name in flows: 458 flow = flows[f.name] 459 else: 460 self.info('creating flow %r', f.name) 461 flow = planet.ManagerFlowState(name=f.name, parent=state) 462 state.append('flows', flow) 463 464 for c in f.components.values(): 465 if checkNotRunning(c, flow): 466 added.append(self._addComponent(c, flow, identity)) 467 468 return added 469
470 - def _startComponents(self, components, identity):
471 # now start all components that need starting -- collecting into 472 # an temporary dict of the form {workerId => [components]} 473 componentsToStart = {} 474 for c in components: 475 workerId = c.get('workerRequested') 476 if not workerId in componentsToStart: 477 componentsToStart[workerId] = [] 478 componentsToStart[workerId].append(c) 479 self.debug('_startComponents: componentsToStart %r' % componentsToStart) 480 481 for workerId, componentStates in componentsToStart.items(): 482 self._workerCreateComponents(workerId, componentStates)
483
def _loadComponentConfiguration(self, conf, identity):
    """
    Merge the parsed configuration into the planet state, then start
    the components that were added.

    @param conf:     the parsed planet configuration
    @param identity: the identity requesting the load

    @returns: a deferred, fired after both steps have been called
    """
    # an already-fired deferred chains the two steps, so an exception
    # raised by either step ends up as a failure on the returned deferred
    d = defer.succeed(None)
    d.addCallback(self._updateStateFromConf, conf, identity)
    # receives the list of added components from the previous step
    d.addCallback(self._startComponents, identity)
    return d
490
def loadComponentConfigurationXML(self, file, identity):
    """
    Load the configuration from the given XML, merging it on top of
    the currently running configuration.

    When loading fails, a warning message is added to the planet state
    and a failed deferred is returned.

    @param file:     file to parse, either as an open file object,
                     or as the name of a file to open
    @type  file:     str or file
    @param identity: The identity making this request. This is used by the
                     adminaction logging mechanism in order to say who is
                     performing the action.
    @type  identity: L{flumotion.common.identity.Identity}

    @rtype: L{twisted.internet.defer.Deferred}
    """
    self.debug('loading configuration')
    # message id under which problems with this load are reported;
    # loads from a named file each get their own id
    mid = 'loadComponent-parse-error'
    if isinstance(file, str):
        mid += '-%s' % file
    try:
        # drop the message left behind by an earlier failed attempt
        self.clearMessage(mid)
        conf = config.PlanetConfigParser(file)
        conf.parse()
        return self._loadComponentConfiguration(conf, identity)
    except errors.ConfigError, e:
        self.addMessage(messages.WARNING, mid,
                        N_('Invalid component configuration.'),
                        debug=e.args[0])
        return defer.fail(e)
    except errors.UnknownComponentError, e:
        if isinstance(file, str):
            debug = 'Configuration loaded from file %r' % file
        else:
            debug = 'Configuration loaded remotely'
        self.addMessage(messages.WARNING, mid,
                        N_('Unknown component in configuration: %s.'),
                        e.args[0], debug=debug)
        return defer.fail(e)
    except Exception, e:
        # anything else is reported too, instead of being raised to the
        # caller
        self.addMessage(messages.WARNING, mid,
                        N_('Unknown error while loading configuration.'),
                        debug=log.getExceptionMessage(e))
        return defer.fail(e)
532
533 - def _loadManagerPlugs(self, conf):
534 # Load plugs 535 for socket, plugs in conf.plugs.items(): 536 if not socket in self.plugs: 537 self.plugs[socket] = [] 538 539 for args in plugs: 540 self.debug('loading plug type %s for socket %s' 541 % (args['type'], socket)) 542 defs = registry.getRegistry().getPlug(args['type']) 543 e = defs.getEntry() 544 call = reflectcall.reflectCallCatching 545 546 plug = call(errors.ConfigError, 547 e.getModuleName(), e.getFunction(), args) 548 self.plugs[socket].append(plug)
549
def startManagerPlugs(self):
    """
    Start every loaded plug, passing myself to its start() method.
    """
    loaded = self.plugs
    for socket in loaded:
        for plug in loaded[socket]:
            self.debug('starting plug %r for socket %s', plug, socket)
            plug.start(self)
555
def _loadManagerBouncer(self, conf):
    """
    Create the bouncer named in the manager configuration and install
    it with setBouncer() once it is happy.

    @param conf: the parsed manager configuration

    @returns: a deferred firing None; it fires right away when no
              bouncer is configured.  A failure to start the bouncer
              is logged and not passed on.
    """
    if not conf.bouncer:
        self.warning('no bouncer defined, nothing can access the '
                     'manager')
        return defer.succeed(None)

    self.debug('going to start manager bouncer %s of type %s',
               conf.bouncer.name, conf.bouncer.type)

    defs = registry.getRegistry().getComponent(conf.bouncer.type)
    entry = defs.getEntryByType('component')
    # FIXME: use entry.getModuleName() (doesn't work atm?)
    moduleName = defs.getSource()
    methodName = entry.getFunction()
    bouncer = reflectcall.createComponent(moduleName, methodName,
                                          conf.bouncer.getConfigDict())

    def setupCallback(result):
        bouncer.debug('started')
        self.setBouncer(bouncer)

    def setupErrback(failure):
        # the failure is still swallowed, as before, but the log now
        # says what went wrong
        self.warning('Error starting manager bouncer: %s',
                     log.getFailureMessage(failure))

    d = bouncer.waitForHappy()
    d.addCallbacks(setupCallback, setupErrback)
    return d
def loadManagerConfigurationXML(self, file):
    """
    Load manager configuration from the given XML. The manager
    configuration is currently used to load the manager's bouncer
    and plugs, and is only run once at startup.

    @param file: file to parse, either as an open file object,
                 or as the name of a file to open
    @type  file: str or file
    """
    self.debug('loading configuration')
    parser = config.ManagerConfigParser(file)
    parser.parseBouncerAndPlugs()
    # plugs are loaded before the bouncer
    for load in (self._loadManagerPlugs, self._loadManagerBouncer):
        load(parser)
    parser.unlink()
597 598 __pychecker__ = 'maxargs=11' # hahaha
def loadComponent(self, identity, componentType, componentId,
                  componentLabel, properties, workerName,
                  plugs, eaters, isClockMaster, virtualFeeds):
    """
    Load a component into the manager configuration and ask for it to
    be started.

    See L{flumotion.manager.admin.AdminAvatar.perspective_loadComponent}
    for a definition of the argument types.

    @returns: the state of the newly added component
    @rtype:   L{flumotion.common.planet.ManagerComponentState}

    @raises NotImplementedError: for clock master components and for
            components that need synchronization
    @raises errors.ConfigError: if no worker name is given
    @raises errors.ComponentAlreadyExistsError: if the parent already
            has a component with this name
    """
    self.debug('loading %s component %s on %s',
               componentType, componentId, workerName)
    parentName, compName = common.parseComponentId(componentId)

    if isClockMaster:
        raise NotImplementedError("Clock master components are not "
                                  "yet supported")
    # check the argument; `worker' is the imported module and is
    # never None
    if workerName is None:
        raise errors.ConfigError("Component %r needs to specify the"
                                 " worker on which it should run"
                                 % componentId)

    state = self.state

    compConf = config.ConfigEntryComponent(compName, parentName,
                                           componentType,
                                           componentLabel,
                                           properties,
                                           plugs, workerName,
                                           eaters, isClockMaster,
                                           None, None, virtualFeeds)

    if compConf.defs.getNeedsSynchronization():
        raise NotImplementedError("Components that need "
                                  "synchronization are not yet "
                                  "supported")

    if parentName == 'atmosphere':
        parentState = state.get('atmosphere')
    else:
        flows = dict([(x.get('name'), x) for x in state.get('flows')])
        if parentName in flows:
            parentState = flows[parentName]
        else:
            # flows are created on demand
            self.info('creating flow %r', parentName)
            parentState = planet.ManagerFlowState(name=parentName,
                                                  parent=state)
            state.append('flows', parentState)

    components = [x.get('name') for x in parentState.get('components')]
    if compName in components:
        self.debug('%r already has component %r', parentName, compName)
        raise errors.ComponentAlreadyExistsError(compName)

    compState = self._addComponent(compConf, parentState, identity)

    self._startComponents([compState], identity)

    return compState
658
def _createHeaven(self, interface, klass):
    """
    Create a heaven of the given klass that will send avatars to clients
    implementing the given medium interface, and register it with the
    dispatcher.

    @param interface: the medium interface to create a heaven for
    @type  interface: L{flumotion.common.interfaces.IMedium}
    @param klass:     the type of heaven to create
    @type  klass:     an implementor of
                      L{flumotion.common.interfaces.IHeaven}

    @returns: the newly created heaven
    """
    assert issubclass(interface, interfaces.IMedium)
    # the heaven is constructed with me as its only argument
    newHeaven = klass(self)
    self.dispatcher.registerHeaven(newHeaven, interface)
    return newHeaven
673
def setBouncer(self, bouncer):
    """
    Install the bouncer on myself, my portal and my dispatcher.
    A bouncer that was set before is replaced, with a warning.

    @type bouncer: L{flumotion.component.bouncers.bouncer.Bouncer}
    """
    if self.bouncer:
        self.warning("manager already had a bouncer, setting anyway")

    self.bouncer = self.portal.bouncer = bouncer
    self.dispatcher.setBouncer(bouncer)
684
def getFactory(self):
    """
    Return the server factory created in my constructor.
    """
    factory = self.factory
    return factory
687
def componentCreate(self, componentState):
    """
    Create the given component.  This will currently also trigger
    a start eventually when the component avatar attaches.

    The component should be sleeping.
    The worker it should be started on should be present.

    @type componentState: L{planet.ManagerComponentState}

    @raises errors.ComponentMoodError: if the component is not sleeping,
            or already has a pending mood
    @raises errors.ComponentNoWorkerError: if the worker of the
            component is not logged in

    @returns: the result of L{_workerCreateComponents}
    """
    mood = componentState.get('mood')
    if mood != moods.sleeping.value:
        raise errors.ComponentMoodError("%r not sleeping but %s" % (
            componentState, moods.get(mood).name))

    pending = componentState.get('moodPending')
    if pending is not None:
        raise errors.ComponentMoodError(
            "%r already has a pending mood %s" % (
            componentState, moods.get(pending).name))

    # find a worker this component can start on
    workerId = (componentState.get('workerName')
                or componentState.get('workerRequested'))

    if workerId not in self.workerHeaven.avatars:
        raise errors.ComponentNoWorkerError(
            "worker %s is not logged in" % workerId)

    return self._workerCreateComponents(workerId, [componentState])
716
def _componentStopNoAvatar(self, componentState, avatarId):
    """
    Stop a component that has no avatar logged in.

    A sad component has its messages removed and is set to sleeping.
    A lost component is set to sleeping unless its worker reports that
    it is still running.  Any other mood gives a failure.

    @type  componentState: L{planet.ManagerComponentState}
    @param avatarId:       the avatarId of the component

    @rtype: L{twisted.internet.defer.Deferred}; fails with
            L{errors.ComponentMoodError} if the component cannot be
            stopped
    """
    # NB: reset moodPending if asked to stop without an avatar
    # because we changed above to allow stopping even if moodPending
    # is happy
    def stopSad():
        self.debug('asked to stop a sad component without avatar')
        # iterate over a copy, since we remove while looping
        for message in componentState.get('messages')[:]:
            self.debug("Deleting message %r", message)
            componentState.remove('messages', message)

        componentState.setMood(moods.sleeping.value)
        componentState.set('moodPending', None)
        return defer.succeed(None)

    def stopLost():
        def gotComponents(comps):
            return avatarId in comps

        def gotJobRunning(running):
            if running:
                self.warning('asked to stop lost component %r, but '
                             'it is still running', avatarId)
                # FIXME: put a message on the state to suggest a
                # kill?
                msg = "Cannot stop lost component which is still running."
                raise errors.ComponentMoodError(msg)
            else:
                # the %r in this format used to get no argument
                self.debug('component %r seems to be really lost, '
                           'setting to sleeping', avatarId)
                componentState.setMood(moods.sleeping.value)
                componentState.set('moodPending', None)
                return None

        self.debug('asked to stop a lost component without avatar')
        workerName = componentState.get('workerRequested')
        if workerName and self.workerHeaven.hasAvatar(workerName):
            self.debug('checking if component has job process running')
            d = self.workerHeaven.getAvatar(workerName).getComponents()
            d.addCallback(gotComponents)
            d.addCallback(gotJobRunning)
            return d
        else:
            self.debug('component lacks a worker, setting to sleeping')
            d = defer.maybeDeferred(gotJobRunning, False)
            return d

    def stopUnknown():
        msg = ('asked to stop a component without avatar in mood %s'
               % moods.get(mood))
        self.warning(msg)
        return defer.fail(errors.ComponentMoodError(msg))

    mood = componentState.get('mood')
    stoppers = {moods.sad.value: stopSad,
                moods.lost.value: stopLost}
    return stoppers.get(mood, stopUnknown)()
773 - def _componentStopWithAvatar(self, componentState, componentAvatar):
774 # FIXME: This deferred is just the remote call; there's no actual 775 # deferred for completion of shutdown. 776 d = componentAvatar.stop() 777 778 return d
779
def componentStop(self, componentState):
    """
    Stop the given component.
    If the component was sad, we clear its sad state as well,
    since the stop was explicitly requested by the admin.

    @type componentState: L{planet.ManagerComponentState}

    @raises errors.BusyComponentError: if the component has a pending
            mood other than happy
    @raises errors.UnknownComponentError: if no mapper exists for the
            component state

    @rtype: L{twisted.internet.defer.Deferred}
    """
    self.debug('componentStop(%r)', componentState)
    # A pending mood of happy does not block the stop: if the component
    # never makes it to happy, it has to be stoppable anyway.
    pending = componentState.get('moodPending')
    if pending != None and pending != moods.happy.value:
        self.debug("Pending mood is %r", pending)

        raise errors.BusyComponentError(componentState)

    mapper = self.getComponentMapper(componentState)
    if not mapper:
        # We have a stale componentState for an already-deleted
        # component
        self.warning("Component mapper for component state %r doesn't "
                     "exist", componentState)
        raise errors.UnknownComponentError(componentState)

    if mapper.avatar:
        return self._componentStopWithAvatar(componentState, mapper.avatar)
    return self._componentStopNoAvatar(componentState, mapper.id)
810
def componentAddMessage(self, avatarId, message):
    """
    Set the given message on the given component's state.
    Can be called e.g. by a worker to report on a crashed component.
    Sets the mood to sad if it is an error message.

    A message for a component without mapper is dropped with a warning.
    """
    if avatarId not in self._componentMappers:
        self.warning('asked to set a message on non-mapped component %s' %
                     avatarId)
        return

    componentState = self._componentMappers[avatarId].state
    componentState.append('messages', message)
    if message.level == messages.ERROR:
        self.debug('Error message makes component sad')
        componentState.setMood(moods.sad.value)
827 828 # FIXME: unify naming of stuff like this
def workerAttached(self, workerAvatar):
    """
    Called when a worker logs in.

    Asks the worker which components it is running, then creates the
    components assigned to the worker that it is not running yet,
    including those that were lost.  Also schedules a
    feedServerAvailable call on the component heaven.

    @param workerAvatar: the avatar of the worker that logged in
    """
    # called when a worker logs in
    workerId = workerAvatar.avatarId
    self.debug('vishnu.workerAttached(): id %s' % workerId)

    # Create all components assigned to this worker. Note that the
    # order of creation is unimportant, it's only the order of
    # starting that matters (and that's different code).
    # Components that requested no worker at all (None) are taken too.
    components = [c for c in self._getComponentsToCreate()
                  if c.get('workerRequested') in (workerId, None)]
    # So now, check what components worker is running
    # so we can remove them from this components list
    # also add components we have that are lost but not
    # in list given by worker
    d = workerAvatar.getComponents()
    def workerAvatarComponentListReceived(workerComponents):
        # list() is called to work around a pychecker bug. FIXME.
        lostComponents = list([c for c in self.getComponentStates()
                               if c.get('workerRequested') == workerId and \
                                  c.get('mood') == moods.lost.value])
        for comp in workerComponents:
            # comp is an avatarId string
            # components is a list of {ManagerComponentState}
            if comp in self._componentMappers:
                # the worker already runs this one; do not create it again
                compState = self._componentMappers[comp].state
                if compState in components:
                    components.remove(compState)
                if compState in lostComponents:
                    lostComponents.remove(compState)

        # what is left in lostComponents is not running on the worker
        for compState in lostComponents:
            self.info(
                "Restarting previously lost component %s on worker %s",
                self._componentMappers[compState].id, workerId)
            # We set mood to sleeping first. This allows things to
            # distinguish between a newly-started component and a lost
            # component logging back in.
            compState.set('moodPending', None)
            compState.setMood(moods.sleeping.value)

        allComponents = components + lostComponents

        if not allComponents:
            self.debug(
                "vishnu.workerAttached(): no components for this worker")
            return

        self._workerCreateComponents(workerId, allComponents)
    d.addCallback(workerAvatarComponentListReceived)

    # scheduled with a zero delay, so it runs after this method returns
    reactor.callLater(0, self.componentHeaven.feedServerAvailable,
                      workerId)
def _workerCreateComponents(self, workerId, components):
    """
    Create the list of components on the given worker, sequentially, but
    in no specific order.

    If the worker is not logged in, nothing is created.

    @param workerId:   avatarId of the worker
    @type  workerId:   string
    @param components: components to start
    @type  components: list of
                       L{flumotion.common.planet.ManagerComponentState}

    @rtype: L{twisted.internet.defer.Deferred}
    """
    self.debug("_workerCreateComponents: workerId %r, components %r" % (
        workerId, components))

    if workerId not in self.workerHeaven.avatars:
        self.debug('worker %s not logged in yet, delaying '
                   'component start' % workerId)
        return defer.succeed(None)

    workerAvatar = self.workerHeaven.avatars[workerId]

    def chainCompleted(result):
        return self.debug(
            '_workerCreateComponents(): completed setting up create chain')

    # one callback per component; they run in turn once the chain is
    # triggered below
    chain = defer.Deferred()
    for componentState in components:
        componentType = componentState.get('type')
        conf = componentState.get('config')
        self.debug('scheduling create of %s on %s'
                   % (conf['avatarId'], workerId))
        chain.addCallback(self._workerCreateComponentDelayed,
                          workerAvatar, componentState, componentType,
                          conf)
    chain.addCallback(chainCompleted)

    # now trigger the chain
    self.debug('_workerCreateComponents(): triggering create chain')
    chain.callback(None)
    #reactor.callLater(0, chain.callback, None)
    return chain
921
def _workerCreateComponentDelayed(self, result, workerAvatar,
                                  componentState, componentType, conf):
    """
    Ask a worker to create a single component.  Used as one link of the
    create chain, hence the ignored leading C{result} argument.

    @param result:         result of the previous link; not used
    @param workerAvatar:   avatar of the worker to create the component on
    @param componentState: state of the component to create
    @param componentType:  type of the component to create
    @param conf:           configuration of the component; must contain
                           'avatarId', may contain 'nice'

    @returns: None; the deferred of the create request is not returned.
    """
    componentId = conf['avatarId']
    niceLevel = conf.get('nice', 0)

    # marking the pending mood as happy makes sure that this component
    # is only asked to start once
    componentState.set('moodPending', moods.happy.value)

    creation = workerAvatar.createComponent(componentId, componentType,
                                            niceLevel, conf)
    # FIXME: here we get the avatar Id of the component we wanted
    # started, so now attach it to the planetState's component state
    creation.addCallback(self._createCallback, componentState)
    creation.addErrback(self._createErrback, componentState)

    # FIXME: shouldn't we return the deferred here to make sure
    # components wait on each other to be started ?
942 - def _createCallback(self, result, componentState):
943 self.debug('got avatarId %s for state %s' % (result, componentState)) 944 m = self._componentMappers[componentState] 945 assert result == m.id, "received id %s is not the expected id %s" % ( 946 result, m.id)
947
def _createErrback(self, failure, state):
    """
    Handle a failed component create request.

    A component that turns out to be running already is treated as lost,
    unless its mapper has a job state by now; any other failure makes
    the component sad and attaches an error message to its state.

    @param failure: the failure of the create request
    @param state:   state of the component that failed to be created

    @returns: None, so the failure is not propagated further.
    """
    # FIXME: make ConfigError copyable so we can .check() it here
    # and print a nicer warning
    self.warning('failed to create component %s: %s',
                 state.get('name'), log.getFailureMessage(failure))

    if not failure.check(errors.ComponentAlreadyRunningError):
        message = messages.Error(T_(
            N_("The component could not be started.")),
            debug=log.getFailureMessage(failure))

        state.setMood(moods.sad.value)
        state.append('messages', message)
        return None

    if self._componentMappers[state].jobState:
        self.info('component appears to have logged in in the '
                  'meantime')
        return None

    self.info('component appears to be running already; '
              'treating it as lost until it logs in')
    state.setMood(moods.lost.value)
    return None
971
def workerDetached(self, workerAvatar):
    """
    Called when a worker logs out; only reports the worker's id in the
    debug log.

    @param workerAvatar: avatar of the worker that logged out
    """
    self.debug('vishnu.workerDetached(): id %s' % workerAvatar.avatarId)
976
def addComponentToFlow(self, componentState, flowName):
    """
    Attach a component state to the flow with the given name, creating
    that flow in the planet state when there is none yet.

    @param componentState: state of the component to add
    @param flowName:       name of the flow; 'atmosphere' selects the
                           atmosphere, which is handled like a flow
                           although it is not one
    """
    if flowName != 'atmosphere':
        target = self._getFlowByName(flowName)
    else:
        target = self.state.get('atmosphere')

    if not target:
        # nothing to attach to yet, so add a fresh flow to the planet
        self.info('Creating flow "%s"' % flowName)
        target = planet.ManagerFlowState()
        target.set('name', flowName)
        target.set('parent', self.state)
        self.state.append('flows', target)

    componentState.set('parent', target)
    target.append('components', componentState)
993
def registerComponent(self, componentAvatar):
    """
    Record a component avatar in the mappers, reusing the mapper already
    known for its avatarId or creating a new one, and make that mapper
    reachable through the component state, job state, id and avatar.

    @param componentAvatar: avatar of the component to register
    """
    mapper = self.getComponentMapper(componentAvatar.avatarId)
    if not mapper:
        mapper = ComponentMapper()

    mapper.state = componentAvatar.componentState
    mapper.jobState = componentAvatar.jobState
    mapper.id = componentAvatar.avatarId
    mapper.avatar = componentAvatar

    # the same mapper is stored under each of its four lookup keys
    for key in (mapper.state, mapper.jobState, mapper.id, mapper.avatar):
        self._componentMappers[key] = mapper
1008
def unregisterComponent(self, componentAvatar):
    """
    Called when a component is logging out: drops the job state and
    avatar entries from the mappers and resets pid, workerName and
    moodPending on the component state.  The mapper stays registered
    under its other keys.

    @param componentAvatar: avatar of the component that is logging out

    @raises KeyError: if the avatar is not registered.
    """
    self.debug('unregisterComponent(%r): cleaning up state' %
               componentAvatar)

    mappers = self._componentMappers
    mapper = mappers[componentAvatar]

    # forget the job state; a missing entry is only worth a warning
    try:
        del mappers[mapper.jobState]
    except KeyError:
        self.warning('Could not remove jobState for %r' % componentAvatar)
    mapper.jobState = None

    for key in ('pid', 'workerName', 'moodPending'):
        mapper.state.set(key, None)

    # forget the avatar
    del mappers[mapper.avatar]
    mapper.avatar = None
1031
def getComponentStates(self):
    """
    Get the component states of the planet state, logging each of them
    and warning about every one that has no mood.

    @returns: whatever the planet state's getComponents() returns,
              unchanged.
    """
    cList = self.state.getComponents()
    self.debug('getComponentStates(): %d components' % len(cList))
    for c in cList:
        self.log(repr(c))
        # identity test: only a really missing mood is reported, a mood
        # with a falsy value is not
        if c.get('mood') is None:
            self.warning('%s has mood None' % c.get('name'))

    return cList
1042
def deleteComponent(self, componentState):
    """
    Empty the planet of the given component.

    @param componentState: state of the component to delete

    @raises errors.UnknownComponentError: if the state is not known to
            the component mappers.
    @raises errors.BusyComponentError:    if the component has a
            moodPending set or is not sleeping.

    @returns: a deferred that will fire when all listeners have been
              notified of the removal of the component.
    """
    self.debug('deleting component %r from state', componentState)
    c = componentState
    if c not in self._componentMappers:
        raise errors.UnknownComponentError(c)

    flow = componentState.get('parent')
    # mood values are compared by value; an identity test on them only
    # works as long as both sides happen to be the very same object
    if (c.get('moodPending') is not None
        or c.get('mood') != moods.sleeping.value):
        raise errors.BusyComponentError(c)

    # drop both the id and the state entry of the mapper
    del self._componentMappers[self._componentMappers[c].id]
    del self._componentMappers[c]
    return flow.remove('components', c)
1063
1064 - def _getFlowByName(self, flowName):
1065 for flow in self.state.get('flows'): 1066 if flow.get('name') == flowName: 1067 return flow
1068
def deleteFlow(self, flowName):
    """
    Empty the planet of a flow.

    All components of the flow are checked before anything is removed,
    so a busy component leaves the mappers untouched.

    @param flowName: name of the flow to delete

    @raises ValueError:                if there is no flow of that name.
    @raises errors.BusyComponentError: if any component of the flow has
            a moodPending set or is not sleeping.

    @returns: a deferred that will fire when the flow is removed.
    """

    flow = self._getFlowByName(flowName)
    if flow is None:
        raise ValueError("No flow called %s found" % (flowName,))

    components = flow.get('components')
    for c in components:
        # if any component is already in a mood change/command, fail;
        # mood values are compared by value, not by identity
        if (c.get('moodPending') is not None or
            c.get('mood') != moods.sleeping.value):
            raise errors.BusyComponentError(c)
    for c in components:
        # drop both the id and the state entry of the mapper
        del self._componentMappers[self._componentMappers[c].id]
        del self._componentMappers[c]
    d = flow.empty()
    d.addCallback(lambda _: self.state.remove('flows', flow))
    return d
1092
def emptyPlanet(self):
    """
    Empty the planet of all components, and flows. Also clears all
    messages.

    Every component that is not sleeping and still has an avatar is
    asked to stop, one after the other; L{_emptyPlanetCallback} then
    cleans up the rest of the planet state.  The chain is only triggered
    after this method has returned.

    @raises errors.BusyComponentError: if any component has a
            moodPending set.

    @returns: a deferred that will fire when the planet is empty.
    """
    # NOTE(review): keys() is kept on purpose; clearMessage presumably
    # modifies the mapping that is being walked -- confirm.
    for mid in self.state.get('messages').keys():
        self.clearMessage(mid)

    # if any component is already in a mood change/command, fail
    busy = [c for c in self.getComponentStates()
            if c.get('moodPending') is not None]
    if busy:
        state = busy[0]
        raise errors.BusyComponentError(
            state,
            "moodPending is %s" % moods.get(state.get('moodPending')))

    # filter out the ones that aren't sleeping and stop them; mood
    # values are compared by value, not by identity
    components = [c for c in self.getComponentStates()
                  if c.get('mood') != moods.sleeping.value]

    # create a big deferred for stopping everything
    d = defer.Deferred()

    self.debug('need to stop %d components: %r' % (
        len(components), components))

    for c in components:
        avatar = self._componentMappers[c].avatar
        # If this has logged out, but isn't sleeping (so is sad or lost),
        # we won't have an avatar. So, stop it if we can.
        if avatar:
            # the avatar is handed over as an extra callback argument so
            # that each link stops its own component
            d.addCallback(lambda result, a: a.stop(), avatar)
        else:
            assert c.get('mood') in (moods.sad.value, moods.lost.value)

    d.addCallback(self._emptyPlanetCallback)

    # trigger the deferred after returning
    reactor.callLater(0, d.callback, None)

    return d
1141
def _emptyPlanetCallback(self, result):
    """
    Clean up the rest of the planet state; gets called after all
    components have been asked to stop.

    Removes the id and state entries of every component from the
    mappers, then empties the atmosphere and empties and removes every
    flow.

    @param result: result of the previous callback; not used

    @returns: a DeferredList over the deferreds of all the empty and
              remove operations.
    """
    components = self.getComponentStates()
    self.debug('_emptyPlanetCallback: need to delete %d components' %
               len(components))

    for c in components:
        # mood values are compared by value, not by identity
        if c.get('mood') != moods.sleeping.value:
            self.warning('Component %s is not sleeping', c.get('name'))
        # clear mapper; remove componentstate and id
        m = self._componentMappers[c]
        del self._componentMappers[m.id]
        del self._componentMappers[c]

    # if anything's left, we have a mistake somewhere
    l = self._componentMappers.keys()
    if len(l) > 0:
        self.warning('mappers still has keys %r' % (repr(l)))

    dList = []

    dList.append(self.state.get('atmosphere').empty())

    # Walk a copy of the flows: each flow is removed from the planet
    # state inside the loop, and if get() hands out the very list that
    # remove() changes, walking it directly would skip flows.
    for f in list(self.state.get('flows')):
        self.debug('appending deferred for emptying flow %r' % f)
        dList.append(f.empty())
        self.debug('appending deferred for removing flow %r' % f)
        dList.append(self.state.remove('flows', f))
        self.debug('appended deferreds')

    dl = defer.DeferredList(dList)
    return dl
1175
1176 - def _getComponentsToCreate(self):
1177 """ 1178 @rtype: list of L{flumotion.common.planet.ManagerComponentState} 1179 """ 1180 # return a list of components that are sleeping 1181 components = self.state.getComponents() 1182 1183 # filter the ones that are sleeping 1184 # NOTE: now sleeping indicates that there is no existing job 1185 # as when jobs are created, mood becomes waking, so no need to 1186 # filter on moodPending 1187 isSleeping = lambda c: c.get('mood') == moods.sleeping.value 1188 components = filter(isSleeping, components) 1189 return components
1190
1191 - def _getWorker(self, workerName):
1192 # returns the WorkerAvatar with the given name 1193 if not workerName in self.workerHeaven.avatars: 1194 raise errors.ComponentNoWorkerError("Worker %s not logged in?" 1195 % workerName) 1196 1197 return self.workerHeaven.avatars[workerName]
1198
def getWorkerFeedServerPort(self, workerName):
    """
    Get the feed server port of a worker.

    @param workerName: name of the worker

    @returns: the feedServerPort of the worker's avatar, or None if the
              worker is not logged in.
    """
    if workerName not in self.workerHeaven.avatars:
        return None
    worker = self._getWorker(workerName)
    return worker.feedServerPort
1203
def reservePortsOnWorker(self, workerName, numPorts):
    """
    Requests a number of ports on the worker named workerName. The
    ports will be reserved for the use of the caller until
    releasePortsOnWorker is called.

    @param workerName: name of the worker to reserve the ports on
    @param numPorts:   number of ports to reserve

    @returns: a list of ports as integers
    """
    worker = self._getWorker(workerName)
    return worker.reservePorts(numPorts)
1213
1214 - def releasePortsOnWorker(self, workerName, ports):
1215 """ 1216 Tells the manager that the given ports are no longer being used, 1217 and may be returned to the allocation pool. 1218 """ 1219 try: 1220 return self._getWorker(workerName).releasePorts(ports) 1221 except errors.ComponentNoWorkerError, e: 1222 self.warning('could not release ports: %r' % e.args)
1223
def getComponentMapper(self, object):
    """
    Look up an object mapper given the object.

    @param object: one of the keys a mapper is registered under

    @rtype:   L{ComponentMapper} or None
    @returns: the mapper registered for the object, or None if there is
              none.
    """
    # A direct dictionary lookup, instead of a membership test on the
    # list of keys followed by a second lookup.
    try:
        return self._componentMappers.get(object)
    except TypeError:
        # an unhashable object can never be a key, so it has no mapper
        return None
1234
def getManagerComponentState(self, object):
    """
    Look up the component state of the mapper registered for the given
    object.

    @param object: one of the keys a mapper is registered under

    @returns: the C{state} attribute of the mapper registered for the
              object (presumably a
              L{flumotion.common.planet.ManagerComponentState}), or None
              if no mapper is registered for it.
    """
    # A direct dictionary lookup, instead of a membership test on the
    # list of keys followed by a second lookup.
    try:
        mapper = self._componentMappers.get(object)
    except TypeError:
        # an unhashable object can never be a key, so it has no mapper
        return None

    if mapper is None:
        return None
    return mapper.state
1245