1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 """
23 manager implementation and related classes
24
25 API Stability: semi-stable
26
27 @var LOCAL_IDENTITY: an identity for the manager itself; can be used
28 to compare against to verify that the manager
29 requested an action
30 @type LOCAL_IDENTITY: L{LocalIdentity}
31 """
32
33 import os
34
35 from twisted.internet import reactor, defer
36 from twisted.python import components, failure
37 from twisted.spread import pb
38 from twisted.cred import portal
39 from zope.interface import implements
40
41 from flumotion.common import errors, interfaces, log, registry
42 from flumotion.common import planet, common, dag, messages, reflectcall, server
43 from flumotion.common.i18n import N_, gettexter
44 from flumotion.common.identity import RemoteIdentity, LocalIdentity
45 from flumotion.common.netutils import addressGetHost
46 from flumotion.common.planet import moods
47 from flumotion.configure import configure
48 from flumotion.manager import admin, component, worker, base, config
49 from flumotion.twisted import checkers
50 from flumotion.twisted import portal as fportal
51
52 __all__ = ['ManagerServerFactory', 'Vishnu']
53 __version__ = "$Rev: 6770 $"
54 T_ = gettexter()
55 LOCAL_IDENTITY = LocalIdentity('manager')
56
57
58
60 """
61 I implement L{twisted.cred.portal.IRealm}.
62 I make sure that when a L{pb.Avatar} is requested through me, the
63 Avatar being returned knows about the mind (client) requesting
64 the Avatar.
65 """
66
67 implements(portal.IRealm)
68
69 logCategory = 'dispatcher'
70
72 """
73 @param computeIdentity: see L{Vishnu.computeIdentity}
74 @type computeIdentity: callable
75 """
76 self._interfaceHeavens = {}
77 self._computeIdentity = computeIdentity
78 self._bouncer = None
79 self._avatarKeycards = {}
80
82 """
83 @param bouncer: the bouncer to authenticate with
84 @type bouncer: L{flumotion.component.bouncers.bouncer}
85 """
86 self._bouncer = bouncer
87
89 """
90 Register a Heaven as managing components with the given interface.
91
92 @type interface: L{twisted.python.components.Interface}
93 @param interface: a component interface to register the heaven with.
94 """
95 assert isinstance(heaven, base.ManagerHeaven)
96
97 self._interfaceHeavens[interface] = heaven
98
99
123
124 return (pb.IPerspective, avatar, cleanup)
125
def got_error(failure):
    """
    Errback used when creating the avatar failed.

    Schedules the client's connection to be dropped on the next reactor
    iteration and hands the failure back unchanged, so that it keeps
    travelling down the errback chain.

    @param failure: the failure that made avatar creation fail
    @returns:       the same failure
    """
    transport = mind.broker.transport
    reactor.callLater(0, transport.loseConnection)
    return failure
134
135 if pb.IPerspective not in ifaces:
136 raise errors.NoPerspectiveError(avatarId)
137 if len(ifaces) != 2:
138
139 raise errors.NoPerspectiveError(avatarId)
140 iface = [x for x in ifaces if x != pb.IPerspective][0]
141 if iface not in self._interfaceHeavens:
142 self.warning('unknown interface %r', iface)
143 raise errors.NoPerspectiveError(avatarId)
144
145 heaven = self._interfaceHeavens[iface]
146 klass = heaven.avatarClass
147 host = addressGetHost(mind.broker.transport.getPeer())
148 d = self._computeIdentity(keycard, host)
149 d.addCallback(lambda identity: \
150 klass.makeAvatar(heaven, avatarId, identity, mind))
151 d.addCallbacks(got_avatar, got_error)
152 return d
153
155 """
156 I am an object that ties together different objects related to a
157 component. I am used as values in a lookup hash in the vishnu.
158 """
160 self.state = None
161 self.id = None
162 self.avatar = None
163 self.jobState = None
164
166 """
167 I am the toplevel manager object that knows about all heavens and factories.
168
169 @cvar dispatcher: dispatcher to create avatars
170 @type dispatcher: L{Dispatcher}
171 @cvar workerHeaven: the worker heaven
172 @type workerHeaven: L{worker.WorkerHeaven}
173 @cvar componentHeaven: the component heaven
174 @type componentHeaven: L{component.ComponentHeaven}
175 @cvar adminHeaven: the admin heaven
176 @type adminHeaven: L{admin.AdminHeaven}
177 @cvar configDir: the configuration directory for this Vishnu's manager
178 @type configDir: str
179 """
180
181 implements(server.IServable)
182
183 logCategory = "vishnu"
184
185 - def __init__(self, name, unsafeTracebacks=0, configDir=None):
200 reactor.addSystemEventTrigger('before', 'shutdown', setStopped)
201
202 if configDir is not None:
203 self.configDir = configDir
204 else:
205 self.configDir = os.path.join(configure.configdir,
206 "managers", name)
207
208 self.bouncer = None
209
210 self.bundlerBasket = registry.getRegistry().makeBundlerBasket()
211
212 self._componentMappers = {}
213
214 self.state = planet.ManagerPlanetState()
215 self.state.set('name', name)
216
217 self.plugs = {}
218
219
220
221 self.portal = fportal.BouncerPortal(self.dispatcher, None)
222
223 self.factory = pb.PBServerFactory(self.portal,
224 unsafeTracebacks=unsafeTracebacks)
225 self.connectionInfo = {}
226 self.setConnectionInfo(None, None, None)
227
229 """Cancel any pending operations in preparation for shutdown.
230
231 This method is mostly useful for unit tests; currently, it is
232 not called during normal operation. Note that the caller is
233 responsible for stopping listening on the port, as the
234 manager does not have a handle on the twisted port object.
235
236 @returns: A deferred that will fire when the manager has shut
237 down.
238 """
239 if self.bouncer:
240 return self.bouncer.stop()
241 else:
242 return defer.succeed(None)
243
247
249 """Returns the manager's configuration as a string suitable for
250 importing via loadConfiguration().
251 """
252 return config.exportPlanetXml(self.state)
253
268
def addMessage(self, level, id, format, *args, **kwargs):
    """
    Convenience method to construct a message and add it to the
    planet state.

    `format' should be marked as translatable in the source with N_;
    *args are stored as its format arguments. Keyword arguments are
    handed to the message constructor untouched. See
    L{flumotion.common.messages.Message} for the meaning of the
    remaining arguments.

    For example::

      self.addMessage(messages.WARNING, 'foo-warning',
                      N_('The answer is %d'), 42, debug='not really')
    """
    translatable = T_(format, *args)
    message = messages.Message(level, translatable, id=id, **kwargs)
    self.addMessageObject(message)
286
288 """
289 Add a message to the planet state.
290
291 @type message: L{flumotion.common.messages.Message}
292 """
293 self.state.setitem('messages', message.id, message)
294
296 """
297 Clear any messages with the given message ID from the planet
298 state.
299
300 @type mid: message ID, normally a str
301 """
302 if mid in self.state.get('messages'):
303 self.state.delitem('messages', mid)
304
306 """
307 @param identity: L{flumotion.common.identity.Identity}
308 """
309 socket = 'flumotion.component.plugs.adminaction.AdminAction'
310 if self.plugs.has_key(socket):
311 for plug in self.plugs[socket]:
312 plug.action(identity, message, args, kw)
313
315 """
316 Compute a suitable identity for a remote host. First looks to
317 see if there is a
318 flumotion.component.plugs.identity.IdentityProvider plug
319 installed on the manager, falling back to user@host.
320
321 The identity is only used in the adminaction interface. An
322 example of its use is when you have an adminaction plug that
323 checks an admin's privileges before actually doing an action;
324 the identity object you use here might store the privileges that
325 the admin has.
326
327 @param keycard: the keycard that the remote host used to log in.
328 @type keycard: L{flumotion.common.keycards.Keycard}
329 @param remoteHost: the ip of the remote host
330 @type remoteHost: str
331
332 @rtype: a deferred that will fire a
333 L{flumotion.common.identity.RemoteIdentity}
334 """
335
336 socket = 'flumotion.component.plugs.identity.IdentityProvider'
337 if self.plugs.has_key(socket):
338 for plug in self.plugs[socket]:
339 identity = plug.computeIdentity(keycard, remoteHost)
340 if identity:
341 return identity
342 username = getattr(keycard, 'username', None)
343 return defer.succeed(RemoteIdentity(username, remoteHost))
344
346 """
347 Add a component state for the given component config entry.
348
349 @rtype: L{flumotion.common.planet.ManagerComponentState}
350 """
351
352 self.debug('adding component %s to %s'
353 % (conf.name, parent.get('name')))
354
355 if identity != LOCAL_IDENTITY:
356 self.adminAction(identity, '_addComponent', (conf, parent), {})
357
358 state = planet.ManagerComponentState()
359 state.set('name', conf.name)
360 state.set('type', conf.getType())
361 state.set('workerRequested', conf.worker)
362 state.setMood(moods.sleeping.value)
363 state.set('config', conf.getConfigDict())
364
365 state.set('parent', parent)
366 parent.append('components', state)
367
368 avatarId = conf.getConfigDict()['avatarId']
369
370 self.clearMessage('loadComponent-%s' % avatarId)
371
372
373
374 if not common.checkVersionsCompat(conf.getConfigDict()['version'],
375 configure.versionTuple):
376 m = messages.Warning(T_(N_("This component is configured for "
377 "Flumotion version %s, but you are running version %s.\n"
378 "Please update the configuration of the component.\n"),
379 common.versionTupleToString(conf.getConfigDict()['version']),
380 configure.version))
381 state.append('messages', m)
382
383
384 m = ComponentMapper()
385 m.state = state
386 m.id = avatarId
387 self._componentMappers[state] = m
388 self._componentMappers[avatarId] = m
389
390 return state
391
393 """
394 Add a new config object into the planet state.
395
396 @returns: a list of all components added
397 @rtype: list of L{flumotion.common.planet.ManagerComponentState}
398 """
399
400 self.debug('syncing up planet state with config')
401 added = []
402
def checkNotRunning(comp, parentState):
    """
    Decide whether the component config entry may be added to the
    given parent (atmosphere or flow) state.

    A sleeping component with the same name is deleted so that it can
    be replaced. A non-sleeping one blocks the load: if its
    configuration is identical any stale load warning is cleared,
    otherwise a warning message describing the difference is added.

    @param comp:        the component config entry to be loaded
    @param parentState: the state whose 'components' are inspected

    @returns: True if the component should be added, False otherwise
    @rtype:   bool
    """
    name = comp.getName()

    siblings = parentState.get('components')
    comps = dict([(x.get('name'), x) for x in siblings])
    if name not in comps:
        # nothing with this name yet, so it can simply be added
        return True

    runningNames = [x.get('name') for x in siblings
                    if x.get('mood') != moods.sleeping.value]
    if name not in runningNames:
        # the existing component is sleeping; replace it
        self.deleteComponent(comps[name])
        return True

    # a non-sleeping component with this name exists; compare configs
    existing = comps[name]
    parent = existing.get('parent').get('name')
    # use our own argument here, not the loop variable of the caller
    newConf = comp.getConfigDict()
    oldConf = existing.get('config')
    mid = 'loadComponent-%s' % oldConf['avatarId']

    if newConf == oldConf:
        self.debug('%s already has component %s running with '
                   'same configuration', parent, name)
        self.clearMessage(mid)
        return False

    self.info('%s already has component %s, but configuration '
              'not the same -- notifying admin', parent, name)

    diff = config.dictDiff(oldConf, newConf)
    diffMsg = config.dictDiffMessageString(diff, 'existing', 'new')

    self.addMessage(messages.WARNING, mid,
                    N_('Could not load component %r into %r: '
                       'a component is already running with '
                       'this name, but has a different '
                       'configuration.'), name, parent,
                    debug=diffMsg)
    return False
448
449 state = self.state
450 atmosphere = state.get('atmosphere')
451 for c in conf.atmosphere.components.values():
452 if checkNotRunning(c, atmosphere):
453 added.append(self._addComponent(c, atmosphere, identity))
454
455 flows = dict([(x.get('name'), x) for x in state.get('flows')])
456 for f in conf.flows:
457 if f.name in flows:
458 flow = flows[f.name]
459 else:
460 self.info('creating flow %r', f.name)
461 flow = planet.ManagerFlowState(name=f.name, parent=state)
462 state.append('flows', flow)
463
464 for c in f.components.values():
465 if checkNotRunning(c, flow):
466 added.append(self._addComponent(c, flow, identity))
467
468 return added
469
471
472
473 componentsToStart = {}
474 for c in components:
475 workerId = c.get('workerRequested')
476 if not workerId in componentsToStart:
477 componentsToStart[workerId] = []
478 componentsToStart[workerId].append(c)
479 self.debug('_startComponents: componentsToStart %r' % componentsToStart)
480
481 for workerId, componentStates in componentsToStart.items():
482 self._workerCreateComponents(workerId, componentStates)
483
490
492 """
493 Load the configuration from the given XML, merging it on top of
494 the currently running configuration.
495
496 @param file: file to parse, either as an open file object,
497 or as the name of a file to open
498 @type file: str or file
499 @param identity: The identity making this request. This is used by the
500 adminaction logging mechanism in order to say who is
501 performing the action.
502 @type identity: L{flumotion.common.identity.Identity}
503 """
504 self.debug('loading configuration')
505 mid = 'loadComponent-parse-error'
506 if isinstance(file, str):
507 mid += '-%s' % file
508 try:
509 self.clearMessage(mid)
510 conf = config.PlanetConfigParser(file)
511 conf.parse()
512 return self._loadComponentConfiguration(conf, identity)
513 except errors.ConfigError, e:
514 self.addMessage(messages.WARNING, mid,
515 N_('Invalid component configuration.'),
516 debug=e.args[0])
517 return defer.fail(e)
518 except errors.UnknownComponentError, e:
519 if isinstance(file, str):
520 debug = 'Configuration loaded from file %r' % file
521 else:
522 debug = 'Configuration loaded remotely'
523 self.addMessage(messages.WARNING, mid,
524 N_('Unknown component in configuration: %s.'),
525 e.args[0], debug=debug)
526 return defer.fail(e)
527 except Exception, e:
528 self.addMessage(messages.WARNING, mid,
529 N_('Unknown error while loading configuration.'),
530 debug=log.getExceptionMessage(e))
531 return defer.fail(e)
532
549
555
def setupErrback(failure):
    """
    Errback for the bouncer setup: log a warning.

    The failure is not passed on; None is returned instead.

    @param failure: the failure raised while setting up the bouncer
    """
    self.warning('Error starting manager bouncer')
    return None
578 d.addCallbacks(setupCallback, setupErrback)
579 return d
580
597
598 __pychecker__ = 'maxargs=11'
def loadComponent(self, identity, componentType, componentId,
                  componentLabel, properties, workerName,
                  plugs, eaters, isClockMaster, virtualFeeds):
    """
    Load a component into the manager configuration and ask for it to
    be started.

    See L{flumotion.manager.admin.AdminAvatar.perspective_loadComponent}
    for a definition of the argument types.

    The flow named in componentId is created if it does not exist yet.

    @returns: the state of the component that was added
    @rtype:   L{flumotion.common.planet.ManagerComponentState}

    @raises NotImplementedError: if the component is a clock master or
                                 needs synchronization
    @raises errors.ConfigError:  if no worker name is given
    @raises errors.ComponentAlreadyExistsError: if the parent already
                                 has a component with this name
    """
    self.debug('loading %s component %s on %s',
               componentType, componentId, workerName)
    parentName, compName = common.parseComponentId(componentId)

    if isClockMaster:
        raise NotImplementedError("Clock master components are not "
                                  "yet supported")
    # This used to test the imported `worker' module, which is never
    # None, so the check could not trigger; the argument is what matters.
    if workerName is None:
        raise errors.ConfigError("Component %r needs to specify the"
                                 " worker on which it should run"
                                 % componentId)

    state = self.state

    compConf = config.ConfigEntryComponent(compName, parentName,
                                           componentType,
                                           componentLabel,
                                           properties,
                                           plugs, workerName,
                                           eaters, isClockMaster,
                                           None, None, virtualFeeds)

    if compConf.defs.getNeedsSynchronization():
        raise NotImplementedError("Components that need "
                                  "synchronization are not yet "
                                  "supported")

    if parentName == 'atmosphere':
        parentState = state.get('atmosphere')
    else:
        flows = dict([(x.get('name'), x) for x in state.get('flows')])
        if parentName in flows:
            parentState = flows[parentName]
        else:
            self.info('creating flow %r', parentName)
            parentState = planet.ManagerFlowState(name=parentName,
                                                  parent=state)
            state.append('flows', parentState)

    # local name chosen so the imported `components' module is not hidden
    existingNames = [x.get('name')
                     for x in parentState.get('components')]
    if compName in existingNames:
        self.debug('%r already has component %r', parentName, compName)
        raise errors.ComponentAlreadyExistsError(compName)

    compState = self._addComponent(compConf, parentState, identity)

    self._startComponents([compState], identity)

    return compState
658
660 """
661 Create a heaven of the given klass that will send avatars to clients
662 implementing the given medium interface.
663
664 @param interface: the medium interface to create a heaven for
665 @type interface: L{flumotion.common.interfaces.IMedium}
666 @param klass: the type of heaven to create
667 @type klass: an implementor of L{flumotion.common.interfaces.IHeaven}
668 """
669 assert issubclass(interface, interfaces.IMedium)
670 heaven = klass(self)
671 self.dispatcher.registerHeaven(heaven, interface)
672 return heaven
673
684
687
689 """
690 Create the given component. This will currently also trigger
691 a start eventually when the component avatar attaches.
692
693 The component should be sleeping.
694 The worker it should be started on should be present.
695 """
696 m = componentState.get('mood')
697 if m != moods.sleeping.value:
698 raise errors.ComponentMoodError("%r not sleeping but %s" % (
699 componentState, moods.get(m).name))
700
701 p = componentState.get('moodPending')
702 if p != None:
703 raise errors.ComponentMoodError(
704 "%r already has a pending mood %s" % (
705 componentState, moods.get(p).name))
706
707
708 workerId = (componentState.get('workerName')
709 or componentState.get('workerRequested'))
710
711 if not workerId in self.workerHeaven.avatars:
712 raise errors.ComponentNoWorkerError(
713 "worker %s is not logged in" % workerId)
714 else:
715 return self._workerCreateComponents(workerId, [componentState])
716
718
719
720
def stopSad():
    """
    Stop a sad component that has no avatar.

    Removes every message from the component state, puts the component
    to sleep and clears its pending mood.

    @returns: a deferred that has already fired with None
    """
    self.debug('asked to stop a sad component without avatar')
    # loop over a copy, since entries are removed from the state
    # while we go
    snapshot = componentState.get('messages')[:]
    for message in snapshot:
        self.debug("Deleting message %r", message)
        componentState.remove('messages', message)

    sleeping = moods.sleeping.value
    componentState.setMood(sleeping)
    componentState.set('moodPending', None)
    return defer.succeed(None)
730
def stopLost():
    """
    Stop a lost component that has no avatar.

    If the requested worker is logged in, it is asked for its list of
    components. When the component is still in that list, stopping is
    refused; otherwise, or when there is no such worker, the component
    is put to sleep and its pending mood is cleared.

    @returns: a deferred firing None once the component is sleeping,
              or failing with L{errors.ComponentMoodError} if the
              component is still running on the worker
    """
    def gotComponents(comps):
        # is our component among those the worker reported?
        return avatarId in comps

    def gotJobRunning(running):
        if running:
            self.warning('asked to stop lost component %r, but '
                         'it is still running', avatarId)
            msg = "Cannot stop lost component which is still running."
            raise errors.ComponentMoodError(msg)

        # the format string has a %r placeholder, so the avatarId
        # has to be passed along with it
        self.debug('component %r seems to be really lost, '
                   'setting to sleeping', avatarId)
        componentState.setMood(moods.sleeping.value)
        componentState.set('moodPending', None)
        return None

    self.debug('asked to stop a lost component without avatar')
    workerName = componentState.get('workerRequested')
    if workerName and self.workerHeaven.hasAvatar(workerName):
        self.debug('checking if component has job process running')
        d = self.workerHeaven.getAvatar(workerName).getComponents()
        d.addCallback(gotComponents)
        d.addCallback(gotJobRunning)
        return d

    self.debug('component lacks a worker, setting to sleeping')
    return defer.maybeDeferred(gotJobRunning, False)
761
def stopUnknown():
    """
    Refuse to stop a component without avatar whose mood has no
    dedicated stopper.

    @returns: a deferred that has already failed with
              L{errors.ComponentMoodError}
    """
    current = moods.get(mood)
    msg = 'asked to stop a component without avatar in mood %s' % current
    self.warning(msg)
    error = errors.ComponentMoodError(msg)
    return defer.fail(error)
767
768 mood = componentState.get('mood')
769 stoppers = {moods.sad.value: stopSad,
770 moods.lost.value: stopLost}
771 return stoppers.get(mood, stopUnknown)()
772
774
775
776 d = componentAvatar.stop()
777
778 return d
779
781 """
782 Stop the given component.
783 If the component was sad, we clear its sad state as well,
784 since the stop was explicitly requested by the admin.
785
786 @type componentState: L{planet.ManagerComponentState}
787
788 @rtype: L{twisted.internet.defer.Deferred}
789 """
790 self.debug('componentStop(%r)', componentState)
791
792
793 if (componentState.get('moodPending') != None and
794 componentState.get('moodPending') != moods.happy.value):
795 self.debug("Pending mood is %r", componentState.get('moodPending'))
796
797 raise errors.BusyComponentError(componentState)
798
799 m = self.getComponentMapper(componentState)
800 if not m:
801
802
803 self.warning("Component mapper for component state %r doesn't "
804 "exist", componentState)
805 raise errors.UnknownComponentError(componentState)
806 elif not m.avatar:
807 return self._componentStopNoAvatar(componentState, m.id)
808 else:
809 return self._componentStopWithAvatar(componentState, m.avatar)
810
812 """
813 Set the given message on the given component's state.
814 Can be called e.g. by a worker to report on a crashed component.
815 Sets the mood to sad if it is an error message.
816 """
817 if not avatarId in self._componentMappers:
818 self.warning('asked to set a message on non-mapped component %s' %
819 avatarId)
820 return
821
822 m = self._componentMappers[avatarId]
823 m.state.append('messages', message)
824 if message.level == messages.ERROR:
825 self.debug('Error message makes component sad')
826 m.state.setMood(moods.sad.value)
827
828
830
831 workerId = workerAvatar.avatarId
832 self.debug('vishnu.workerAttached(): id %s' % workerId)
833
834
835
836
837 components = [c for c in self._getComponentsToCreate()
838 if c.get('workerRequested') in (workerId, None)]
839
840
841
842
843 d = workerAvatar.getComponents()
# Callback for the result of workerAvatar.getComponents(): works out which
# component states still have to be created on the worker that just
# attached, and asks for them to be created.
#
# workerComponents: the result fired by workerAvatar.getComponents(); each
#                   entry is looked up as a key in self._componentMappers
#                   (presumably avatarId strings -- TODO confirm)
# Returns None.
844 def workerAvatarComponentListReceived(workerComponents):
845
# states requested on this worker whose mood is currently lost
846 lostComponents = list([c for c in self.getComponentStates()
847 if c.get('workerRequested') == workerId and \
848 c.get('mood') == moods.lost.value])
# anything the worker reports is dropped from both lists
# NOTE(review): indentation is lost in this listing; the two membership
# tests below are assumed to be siblings -- confirm against the real file
849 for comp in workerComponents:
850
851
852 if comp in self._componentMappers:
853 compState = self._componentMappers[comp].state
854 if compState in components:
855 components.remove(compState)
856 if compState in lostComponents:
857 lostComponents.remove(compState)
858
# lost components that the worker did not report are reset to sleeping
# with no pending mood before being created again
859 for compState in lostComponents:
860 self.info(
861 "Restarting previously lost component %s on worker %s",
862 self._componentMappers[compState].id, workerId)
863
864
865
866 compState.set('moodPending', None)
867 compState.setMood(moods.sleeping.value)
868
869 allComponents = components + lostComponents
870
871 if not allComponents:
872 self.debug(
873 "vishnu.workerAttached(): no components for this worker")
874 return
875
876 self._workerCreateComponents(workerId, allComponents)
877 d.addCallback(workerAvatarComponentListReceived)
878
879 reactor.callLater(0, self.componentHeaven.feedServerAvailable,
880 workerId)
881
883 """
884 Create the list of components on the given worker, sequentially, but
885 in no specific order.
886
887 @param workerId: avatarId of the worker
888 @type workerId: string
889 @param components: components to start
890 @type components: list of
891 L{flumotion.common.planet.ManagerComponentState}
892 """
893 self.debug("_workerCreateComponents: workerId %r, components %r" % (
894 workerId, components))
895
896 if not workerId in self.workerHeaven.avatars:
897 self.debug('worker %s not logged in yet, delaying '
898 'component start' % workerId)
899 return defer.succeed(None)
900
901 workerAvatar = self.workerHeaven.avatars[workerId]
902
903 d = defer.Deferred()
904
905 for c in components:
906 componentType = c.get('type')
907 conf = c.get('config')
908 self.debug('scheduling create of %s on %s'
909 % (conf['avatarId'], workerId))
910 d.addCallback(self._workerCreateComponentDelayed,
911 workerAvatar, c, componentType, conf)
912
913 d.addCallback(lambda result: self.debug(
914 '_workerCreateComponents(): completed setting up create chain'))
915
916
917 self.debug('_workerCreateComponents(): triggering create chain')
918 d.callback(None)
919
920 return d
921
938
939
940
941
943 self.debug('got avatarId %s for state %s' % (result, componentState))
944 m = self._componentMappers[componentState]
945 assert result == m.id, "received id %s is not the expected id %s" % (
946 result, m.id)
947
971
973
974 workerId = workerAvatar.avatarId
975 self.debug('vishnu.workerDetached(): id %s' % workerId)
976
978
979 if flowName == 'atmosphere':
980
981 flow = self.state.get('atmosphere')
982 else:
983 flow = self._getFlowByName(flowName)
984 if not flow:
985 self.info('Creating flow "%s"' % flowName)
986 flow = planet.ManagerFlowState()
987 flow.set('name', flowName)
988 flow.set('parent', self.state)
989 self.state.append('flows', flow)
990
991 componentState.set('parent', flow)
992 flow.append('components', componentState)
993
995
996 m = (self.getComponentMapper(componentAvatar.avatarId)
997 or ComponentMapper())
998
999 m.state = componentAvatar.componentState
1000 m.jobState = componentAvatar.jobState
1001 m.id = componentAvatar.avatarId
1002 m.avatar = componentAvatar
1003
1004 self._componentMappers[m.state] = m
1005 self._componentMappers[m.jobState] = m
1006 self._componentMappers[m.id] = m
1007 self._componentMappers[m.avatar] = m
1008
1010
1011
1012 self.debug('unregisterComponent(%r): cleaning up state' %
1013 componentAvatar)
1014
1015 m = self._componentMappers[componentAvatar]
1016
1017
1018 try:
1019 del self._componentMappers[m.jobState]
1020 except KeyError:
1021 self.warning('Could not remove jobState for %r' % componentAvatar)
1022 m.jobState = None
1023
1024 m.state.set('pid', None)
1025 m.state.set('workerName', None)
1026 m.state.set('moodPending', None)
1027
1028
1029 del self._componentMappers[m.avatar]
1030 m.avatar = None
1031
1033 cList = self.state.getComponents()
1034 self.debug('getComponentStates(): %d components' % len(cList))
1035 for c in cList:
1036 self.log(repr(c))
1037 mood = c.get('mood')
1038 if mood == None:
1039 self.warning('%s has mood None' % c.get('name'))
1040
1041 return cList
1042
1044 """
1045 Empty the planet of the given component.
1046
1047 @returns: a deferred that will fire when all listeners have been
1048 notified of the removal of the component.
1049 """
1050 self.debug('deleting component %r from state', componentState)
1051 c = componentState
1052 if c not in self._componentMappers:
1053 raise errors.UnknownComponentError(c)
1054
1055 flow = componentState.get('parent')
1056 if (c.get('moodPending') != None
1057 or c.get('mood') is not moods.sleeping.value):
1058 raise errors.BusyComponentError(c)
1059
1060 del self._componentMappers[self._componentMappers[c].id]
1061 del self._componentMappers[c]
1062 return flow.remove('components', c)
1063
1065 for flow in self.state.get('flows'):
1066 if flow.get('name') == flowName:
1067 return flow
1068
1070 """
1071 Empty the planet of a flow.
1072
1073 @returns: a deferred that will fire when the flow is removed.
1074 """
1075
1076 flow = self._getFlowByName(flowName)
1077 if flow is None:
1078 raise ValueError("No flow called %s found" % (flowName,))
1079
1080 components = flow.get('components')
1081 for c in components:
1082
1083 if (c.get('moodPending') != None or
1084 c.get('mood') is not moods.sleeping.value):
1085 raise errors.BusyComponentError(c)
1086 for c in components:
1087 del self._componentMappers[self._componentMappers[c].id]
1088 del self._componentMappers[c]
1089 d = flow.empty()
1090 d.addCallback(lambda _: self.state.remove('flows', flow))
1091 return d
1092
1094 """
1095 Empty the planet of all components, and flows. Also clears all
1096 messages.
1097
1098 @returns: a deferred that will fire when the planet is empty.
1099 """
1100 for mid in self.state.get('messages').keys():
1101 self.clearMessage(mid)
1102
1103
1104 components = self.getComponentStates()
1105
1106
1107 components = [c for c in components
1108 if c.get('moodPending') != None]
1109 if components:
1110 state = components[0]
1111 raise errors.BusyComponentError(
1112 state,
1113 "moodPending is %s" % moods.get(state.get('moodPending')))
1114
1115
1116 components = [c for c in self.getComponentStates()
1117 if c.get('mood') is not moods.sleeping.value]
1118
1119
1120 d = defer.Deferred()
1121
1122 self.debug('need to stop %d components: %r' % (
1123 len(components), components))
1124
1125 for c in components:
1126 avatar = self._componentMappers[c].avatar
1127
1128
1129 if avatar:
1130 d.addCallback(lambda result, a: a.stop(), avatar)
1131 else:
1132 assert (c.get('mood') is moods.sad.value or
1133 c.get('mood') is moods.lost.value)
1134
1135 d.addCallback(self._emptyPlanetCallback)
1136
1137
1138 reactor.callLater(0, d.callback, None)
1139
1140 return d
1141
1143
1144
1145 components = self.getComponentStates()
1146 self.debug('_emptyPlanetCallback: need to delete %d components' %
1147 len(components))
1148
1149 for c in components:
1150 if c.get('mood') is not moods.sleeping.value:
1151 self.warning('Component %s is not sleeping', c.get('name'))
1152
1153 m = self._componentMappers[c]
1154 del self._componentMappers[m.id]
1155 del self._componentMappers[c]
1156
1157
1158 l = self._componentMappers.keys()
1159 if len(l) > 0:
1160 self.warning('mappers still has keys %r' % (repr(l)))
1161
1162 dList = []
1163
1164 dList.append(self.state.get('atmosphere').empty())
1165
1166 for f in self.state.get('flows'):
1167 self.debug('appending deferred for emptying flow %r' % f)
1168 dList.append(f.empty())
1169 self.debug('appending deferred for removing flow %r' % f)
1170 dList.append(self.state.remove('flows', f))
1171 self.debug('appended deferreds')
1172
1173 dl = defer.DeferredList(dList)
1174 return dl
1175
1177 """
1178 @rtype: list of L{flumotion.common.planet.ManagerComponentState}
1179 """
1180
1181 components = self.state.getComponents()
1182
1183
1184
1185
1186
1187 isSleeping = lambda c: c.get('mood') == moods.sleeping.value
1188 components = filter(isSleeping, components)
1189 return components
1190
1192
1193 if not workerName in self.workerHeaven.avatars:
1194 raise errors.ComponentNoWorkerError("Worker %s not logged in?"
1195 % workerName)
1196
1197 return self.workerHeaven.avatars[workerName]
1198
1200 if workerName in self.workerHeaven.avatars:
1201 return self._getWorker(workerName).feedServerPort
1202 return None
1203
1205 """
1206 Requests a number of ports on the worker named workerName. The
1207 ports will be reserved for the use of the caller until
1208 releasePortsOnWorker is called.
1209
1210 @returns: a list of ports as integers
1211 """
1212 return self._getWorker(workerName).reservePorts(numPorts)
1213
1215 """
1216 Tells the manager that the given ports are no longer being used,
1217 and may be returned to the allocation pool.
1218 """
1219 try:
1220 return self._getWorker(workerName).releasePorts(ports)
1221 except errors.ComponentNoWorkerError, e:
1222 self.warning('could not release ports: %r' % e.args)
1223
1225 """
1226 Look up an object mapper given the object.
1227
1228 @rtype: L{ComponentMapper} or None
1229 """
1230 if object in self._componentMappers.keys():
1231 return self._componentMappers[object]
1232
1233 return None
1234
1236 """
1237 Look up the component state held by the mapper for the given object.
1238
1239 @rtype: L{flumotion.common.planet.ManagerComponentState} or None
1240 """
1241 if object in self._componentMappers.keys():
1242 return self._componentMappers[object].state
1243
1244 return None
1245