1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 """
23 manager-side objects for components
24
25 API Stability: semi-stable
26 """
27
28 import time
29
30 from twisted.spread import pb
31 from twisted.internet import reactor, defer
32 from twisted.internet import error as terror
33 from twisted.python.failure import Failure
34 from zope.interface import implements
35
36 from flumotion.configure import configure
37 from flumotion.manager import base, config
38 from flumotion.common import errors, interfaces, keycards, log, planet
39 from flumotion.common import messages, common
40 from flumotion.common.i18n import N_, gettexter
41 from flumotion.common.planet import moods
42 from flumotion.twisted import flavors
43
44 __version__ = "$Rev: 6947 $"
45 T_ = gettexter()
46
47
49 """
50 I am a Manager-side avatar for a component.
51 I live in the L{ComponentHeaven}.
52
53 Each component that logs in to the manager gets an avatar created for it
54 in the manager.
55
56 @cvar avatarId: the L{componentId<common.componentId>}
57 @type avatarId: str
58 @cvar jobState: job state of this avatar's component
59 @type jobState: L{flumotion.common.planet.ManagerJobState}
60 @cvar componentState: component state of this avatar's component
61 @type componentState: L{flumotion.common.planet.ManagerComponentState}
62 """
63
64 logCategory = 'comp-avatar'
65
66 - def __init__(self, heaven, avatarId, remoteIdentity, mind, conf,
67 jobState, clocking):
83
84
86 mood = '(unknown)'
87 if self.componentState:
88 moodValue = self.componentState.get('mood')
89 if moodValue is not None:
90 mood = moods.get(moodValue).name
91 return '<%s %s (mood %s)>' % (self.__class__.__name__,
92 self.avatarId, mood)
93
94
def gotStates(result):
    """
    Turn the gathered remote replies into avatar constructor arguments.

    @param result: three (success, value) pairs, one each for the
                   getConfig, getState and getMasterClockInfo remote
                   calls, in that order
    @type  result: list of (bool, object)
    @returns: the tuple (heaven, avatarId, remoteIdentity, mind, conf,
              jobState, clocking)
    """
    confReply, stateReply, clockReply = result
    confOk, conf = confReply
    stateOk, jobState = stateReply
    clockOk, clocking = clockReply
    # all three remote calls are expected to have succeeded
    assert confOk and stateOk and clockOk
    log.debug('component-avatar', 'got state information')
    return (heaven, avatarId, remoteIdentity, mind,
            conf, jobState, clocking)
103 log.debug('component-avatar', 'calling mind for state information')
104 d = defer.DeferredList([mind.callRemote('getConfig'),
105 mind.callRemote('getState'),
106 mind.callRemote('getMasterClockInfo')],
107 fireOnOneErrback=True)
108 d.addCallback(gotStates)
109 return d
110 makeAvatarInitArgs = classmethod(makeAvatarInitArgs)
111
138
139
def addMessage(self, level, id, format, *args, **kwargs):
    """
    Convenience method to construct a message and add it to the
    component state.

    `format' should be marked as translatable in the source with N_;
    *args are stored as its format arguments. Keyword arguments are
    passed on to the message constructor. See
    L{flumotion.common.messages.Message} for the meanings of the
    remaining arguments.

    For example::

      self.addMessage(messages.WARNING, 'foo-warning',
                      N_('The answer is %d'), 42, debug='not really')
    """
    translatable = T_(format, *args)
    message = messages.Message(level, translatable, id=id, **kwargs)
    self.addMessageObject(message)
157
159 """
160 Add a message to the planet state.
161
162 @type message: L{flumotion.common.messages.Message}
163 """
164 self.componentState.append('messages', message)
165
182
184
185
186
187
188
189
190
191
def verifyExistingComponentState(conf, state):
    """
    Adopt an already existing component state for this avatar and make
    sure the configuration stored in it equals the given one.

    If the two differ, a 'stale-config' warning message is added to
    the component and the stored configuration is overwritten with
    conf.

    @param conf:  the configuration to compare against and, on
                  mismatch, store in the state
    @param state: the existing component state to adopt
    """
    # hook the job state into the component state and adopt the latter
    state.setJobState(self.jobState)
    self.componentState = state

    self.upgradeConfig(state, conf)
    if state.get('config') != conf:
        # describe the differences for the debug part of the message
        diff = config.dictDiff(state.get('config'), conf)
        diffMsg = config.dictDiffMessageString(diff,
                                           'internal conf',
                                           'running conf')
        self.addMessage(messages.WARNING, 'stale-config',
                        N_("Component logged in with stale "
                           "configuration. To fix this, stop "
                           "this component and then restart "
                           "the manager."),
                        debug=("Updating internal conf from "
                               "running conf:\n" + diffMsg))
        self.warning('updating internal component state for %r',
                     state)
        self.debug('changes to conf: %s',
                   config.dictDiffMessageString(diff))
        # the given configuration replaces the stored one
        state.set('config', conf)
215
def makeNewComponentState(conf):
    """
    Create a new component state for this avatar, fill it in from the
    given configuration and the job state, and add it to its flow.

    @param conf: the component configuration; its 'parent', 'name' and
                 'type' keys are read
    @returns: the newly created component state
    """
    newState = planet.ManagerComponentState()
    newState.setJobState(self.jobState)
    self.componentState = newState

    self.upgradeConfig(newState, conf)

    flowName = conf['parent']
    compName = conf['name']

    newState.set('name', compName)
    newState.set('type', conf['type'])
    workerName = self.jobState.get('workerName')
    newState.set('workerRequested', workerName)
    newState.set('config', conf)
    # register the state under the flow named by the configuration
    self.vishnu.addComponentToFlow(newState, flowName)
    return newState
232
233 mState = self.vishnu.getManagerComponentState(self.avatarId)
234 if mState:
235 verifyExistingComponentState(conf, mState)
236 else:
237 makeNewComponentState(conf)
238
240 """
241 Tell the component to provide a master clock.
242
243 @rtype: L{twisted.internet.defer.Deferred}
244 """
def success(clocking):
    """
    Callback: remember the clocking information and announce to the
    heaven that this avatar's master clock is available.

    @param clocking: the result of the remote provideMasterClock call
    """
    self.clocking = clocking
    self.heaven.masterClockAvailable(self.avatarId, clocking)
248
def error(failure):
    """
    Errback: add a warning message to the component and release the
    port that was reserved for the master clock.

    Nothing is returned, so the failure is not passed further down
    the deferred chain.

    @param failure: the failure of the remote provideMasterClock call
    """
    debugText = log.getFailureMessage(failure)
    self.addMessage(messages.WARNING, 'provide-master-clock',
                    N_('Failed to provide the master clock'),
                    debug=debugText)
    # port comes from the enclosing scope, where it was reserved
    workerName = self.getWorkerName()
    self.vishnu.releasePortsOnWorker(workerName, [port])
254
255 if self.clocking:
256 self.heaven.masterClockAvailable(self.avatarId, self.clocking)
257 else:
258 (port,) = self.vishnu.reservePortsOnWorker(self.getWorkerName(), 1)
259 self.debug('provideMasterClock on port %d', port)
260
261 d = self.mindCallRemote('provideMasterClock', port)
262 d.addCallbacks(success, error)
263
265 """
266 Returns the port on which a feed server for this component is
267 listening on.
268
269 @rtype: int
270 """
271 return self.vishnu.getWorkerFeedServerPort(self.getWorkerName())
272
274 """
275 Get the IP address of the manager as seen by the component.
276
277 @rtype: str
278 """
279 return self.jobState.get('manager-ip')
280
282 """
283 Return the name of the worker.
284
285 @rtype: str
286 """
287 return self.jobState.get('workerName')
288
290 """
291 Return the PID of the component.
292
293 @rtype: int
294 """
295 return self.jobState.get('pid')
296
298 """
299 Get the name of the component.
300
301 @rtype: str
302 """
303 return self.componentState.get('name')
304
306 """
307 Get the name of the component's parent.
308
309 @rtype: str
310 """
311 return self.componentState.get('parent').get('name')
312
314 """
315 Get the component type name of the component.
316
317 @rtype: str
318 """
319 return self.componentState.get('type')
320
322 """
323 Get the set of eaters that this component eats from.
324
325 @rtype: dict of eaterName -> [(feedId, eaterAlias)]
326 """
327 return self.componentState.get('config').get('eater', {})
328
330 """
331 Get the list of feeders that this component provides.
332
333 @rtype: list of feederName
334 """
335 return self.componentState.get('config').get('feed', [])
336
338 """
339 Get the feedId of a feed provided or consumed by this component.
340
341 @param feedName: The name of the feed (i.e., eater alias or
342 feeder name)
343 @rtype: L{flumotion.common.common.feedId}
344 """
345 return common.feedId(self.getName(), feedName)
346
348 """
349 Get the full feedId of a feed provided or consumed by this
350 component.
351
352 @param feedName: The name of the feed (i.e., eater alias or
353 feeder name)
354 @rtype: L{flumotion.common.common.fullFeedId}
355 """
356 return common.fullFeedId(self.getParentName(), self.getName(), feedName)
357
359 """
360 Get the set of virtual feeds provided by this component.
361
362 @rtype: dict of fullFeedId -> (ComponentAvatar, feederName)
363 """
364 conf = self.componentState.get('config')
365 ret = {}
366 for feedId, feederName in conf.get('virtual-feeds', {}).items():
367 vComp, vFeed = common.parseFeedId(feedId)
368 ffid = common.fullFeedId(self.getParentName(), vComp, vFeed)
369 ret[ffid] = (self, feederName)
370 return ret
371
373 """
374 Get the worker that this component should run on.
375
376 @rtype: str
377 """
378 return self.componentState.get('workerRequested')
379
381 """
382 Get this component's clock master, if any.
383
384 @rtype: avatarId or None
385 """
386 return self.componentState.get('config')['clock-master']
387
389 """
390 Tell the remote component to shut down.
391 """
392 self._shutdownDeferred = defer.Deferred()
393
394 self.mindCallRemote('stop')
395
396 return self._shutdownDeferred
397
401
402 - def eatFrom(self, eaterAlias, fullFeedId, host, port):
406
407 - def feedTo(self, feederName, fullFeedId, host, port):
411
412
414 """
415 Authenticate the given keycard.
416 Gets proxied to L{flumotion.component.bouncers.bouncer.""" \
417 """BouncerMedium.remote_authenticate}
418 The component should be a subclass of
419 L{flumotion.component.bouncers.bouncer.Bouncer}
420
421 @type keycard: L{flumotion.common.keycards.Keycard}
422 """
423 return self.mindCallRemote('authenticate', keycard)
424
426 """
427 Remove a keycard managed by this bouncer because the requester
428 has gone.
429
430 @type keycardId: str
431 """
432 return self.mindCallRemote('removeKeycardId', keycardId)
433
435 """
436 Expire a keycard issued to this component because the bouncer decided
437 to.
438
439 @type keycardId: str
440 """
441 return self.mindCallRemote('expireKeycard', keycardId)
442
443
445 """
446 Called by a component to tell the manager that it's shutting down
447 cleanly (and thus should go to sleeping, rather than lost or sad)
448 """
449 self.debug("shutdown is clean, shouldn't go to lost")
450 self._shutdown_requested = True
451
468
470 """
471 Expire a keycard (and thus the requester's connection)
472 issued to the given requester.
473
474 This is called by the bouncer component that authenticated the keycard.
475
476 @param requesterId: name (avatarId) of the component that originally
477 requested authentication for the given keycardId
478 @type requesterId: str
479 @param keycardId: id of keycard to expire
480 @type keycardId: str
481 """
482
483 if not self.heaven.hasAvatar(requesterId):
484 self.warning('asked to expire keycard %s for requester %s, '
485 'but no such component registered',
486 keycardId, requesterId)
487 raise errors.UnknownComponentError(requesterId)
488
489 return self.heaven.getAvatar(requesterId).expireKeycard(keycardId)
490
def add(self, key, value):
    """
    Append value to the list stored under key, creating the list
    first when the key is not present yet.
    """
    self.setdefault(key, []).append(value)
496
def remove(self, key, value):
    """
    Remove value from the list stored under key; drop the key
    altogether once its list has become empty.

    Raises KeyError when key is not present, and ValueError when value
    is not in the list.
    """
    bucket = self[key]
    bucket.remove(value)
    if not bucket:
        del self[key]
501
502 -class FeedMap(object, log.Loggable):
503 logName = 'feed-map'
505 self.avatars = {}
506 self._ordered_avatars = []
507 self._dirty = True
508 self._recalc()
509
511 assert avatar.avatarId not in self.avatars
512 self.avatars[avatar.avatarId] = avatar
513 self._ordered_avatars.append(avatar)
514 self._dirty = True
515
517
518
519 del self.avatars[avatar.avatarId]
520 self._ordered_avatars.remove(avatar)
521 self._dirty = True
522
523
524 return [(a, f) for a, f in self.feedDeps.pop(avatar, [])
525 if a.avatarId in self.avatars]
526
540
542 if not self._dirty:
543 return
544 self.feedersForEaters = ffe = {}
545 self.eatersForFeeders = eff = dictlist()
546 self.feeds = dictlist()
547 self.feedDeps = dictlist()
548
549 for comp in self._ordered_avatars:
550 for feederName in comp.getFeeders():
551 self.feeds.add(comp.getFullFeedId(feederName),
552 (comp, feederName))
553 for ffid, pair in comp.getVirtualFeeds().items():
554 self.feeds.add(ffid, pair)
555
556 for eater in self.avatars.values():
557 for pairs in eater.getEaters().values():
558 for feedId, eName in pairs:
559 feeder, fName = self._getFeederAvatar(eater, feedId)
560 if feeder:
561 ffe[eater.getFullFeedId(eName)] = (eName, feeder, fName)
562 eff.add(feeder.getFullFeedId(fName),
563 (fName, eater, eName))
564 else:
565 self.debug('eater %s waiting for feed %s to log in',
566 eater.getFeedId(eName), feedId)
567 self._dirty = False
568
570 """Get the set of feeds that this component is eating from,
571 keyed by eater alias.
572
573 @return: a list of (eaterAlias, feederAvatar, feedName) tuples
574 @rtype: list of (str, ComponentAvatar, str)
575 """
576 self._recalc()
577 ret = []
578 for tups in avatar.getEaters().values():
579 for feedId, alias in tups:
580 ffid = avatar.getFullFeedId(alias)
581 if ffid in self.feedersForEaters:
582 ret.append(self.feedersForEaters[ffid])
583 return ret
584
586 """Get the set of feeds that this component is eating from
587 for the given feedId.
588
589 @param avatar: the eater component
590 @type avatar: L{ComponentAvatar}
591 @param ffid: full feed id for which to return feeders
592 @type ffid: str
593 @return: a list of (eaterAlias, feederAvatar, feedName) tuples
594 @rtype: list of (str, L{ComponentAvatar}, str)
595 """
596 self._recalc()
597 ret = []
598 for feeder, feedName in self.feeds.get(ffid, []):
599 rffid = feeder.getFullFeedId(feedName)
600 eff = self.eatersForFeeders.get(rffid, [])
601 for fName, eater, eaterName in eff:
602 if eater == avatar:
603 ret.append((eaterName, feeder, feedName))
604 return ret
605
607 """Get the set of eaters that this component feeds, keyed by
608 feeder name.
609
610 @return: a list of (feederName, eaterAvatar, eaterAlias) tuples
611 @rtype: list of (str, ComponentAvatar, str)
612 """
613 self._recalc()
614 ret = []
615 for feedName in avatar.getFeeders():
616 ffid = avatar.getFullFeedId(feedName)
617 if ffid in self.eatersForFeeders:
618 ret.extend(self.eatersForFeeders[ffid])
619 return ret
620
622 """
623 I handle all registered components and provide L{ComponentAvatar}s
624 for them.
625 """
626
627 implements(interfaces.IHeaven)
628 avatarClass = ComponentAvatar
629
630 logCategory = 'comp-heaven'
631
636
637
646
654
656 master = avatar.getClockMaster()
657 if master:
658 if master == avatar.avatarId:
659 self.debug('Need for %r to provide a clock master',
660 master)
661 avatar.provideMasterClock()
662 else:
663 self.debug('Need to synchronize with clock master %r',
664 master)
665
666
667
668
669 m = self.vishnu.getComponentMapper(master)
670 if m and m.avatar:
671 clocking = m.avatar.clocking
672 if clocking:
673 host, port, base_time = clocking
674 avatar.setClocking(host, port, base_time)
675 else:
676 self.warning('%r should provide a clock master '
677 'but is not doing so', master)
678
679 else:
680 self.debug('clock master not logged in yet, will '
681 'set clocking later')
682
689
691 assert avatar.avatarId not in self.avatars
692 compsNeedingReconnect = self.feedMap.componentDetached(avatar)
693 if self.vishnu.running:
694 self.debug('will reconnect: %r', compsNeedingReconnect)
695
696
697 for comp, ffid in compsNeedingReconnect:
698 self._connectEaters(comp, ffid)
699
701 toHost = toAvatar.getClientAddress()
702 toPort = toAvatar.getFeedServerPort()
703
704
705
706
707
708
709
710 fromHost = fromAvatar.mind.broker.transport.getPeer().host
711 if fromHost == toHost:
712 toHost = '127.0.0.1'
713
714 return toHost, toPort
715
725
727
def always(otherComp):
    """
    Predicate that is true for every peer component; otherComp is
    ignored.
    """
    return True
def never(otherComp):
    """
    Predicate that is false for every peer component; otherComp is
    ignored.
    """
    return False
732 directions = [(self.feedMap.getFeedersForEaters,
733 always, 'eatFrom', 'feedTo'),
734 (self.feedMap.getEatersForFeeders,
735 never, 'feedTo', 'eatFrom')]
736
737 myComp = avatar
738 for getPeers, initiate, directMethod, reversedMethod in directions:
739 for myFeedName, otherComp, otherFeedName in getPeers(myComp):
740 if initiate(otherComp):
741
742 self._connectFeederToEater(myComp, myFeedName, otherComp,
743 otherFeedName, directMethod)
744 else:
745
746 self._connectFeederToEater(otherComp, otherFeedName,
747 myComp, myFeedName,
748 reversedMethod)
749
751
752 ffe = self.feedMap.getFeedersForEater(avatar, ffid)
753 for myFeedName, otherComp, otherFeedName in ffe:
754 self._connectFeederToEater(avatar, myFeedName, otherComp,
755 otherFeedName, 'eatFrom')
756