Package flumotion :: Package manager :: Module component
[hide private]

Source Code for Module flumotion.manager.component

  1  # -*- Mode: Python; test-case-name: flumotion.test.test_manager_component -*- 
  2  # vi:si:et:sw=4:sts=4:ts=4 
  3  # 
  4  # Flumotion - a streaming media server 
  5  # Copyright (C) 2004,2005,2006,2007,2008 Fluendo, S.L. (www.fluendo.com). 
  6  # All rights reserved. 
  7   
  8  # This file may be distributed and/or modified under the terms of 
  9  # the GNU General Public License version 2 as published by 
 10  # the Free Software Foundation. 
 11  # This file is distributed without any warranty; without even the implied 
 12  # warranty of merchantability or fitness for a particular purpose. 
 13  # See "LICENSE.GPL" in the source distribution for more information. 
 14   
 15  # Licensees having purchased or holding a valid Flumotion Advanced 
 16  # Streaming Server license may use this file in accordance with the 
 17  # Flumotion Advanced Streaming Server Commercial License Agreement. 
 18  # See "LICENSE.Flumotion" in the source distribution for more information. 
 19   
 20  # Headers in this file shall remain intact. 
 21   
 22  """ 
 23  manager-side objects for components 
 24   
 25  API Stability: semi-stable 
 26  """ 
 27   
 28  import time 
 29   
 30  from twisted.spread import pb 
 31  from twisted.internet import reactor, defer 
 32  from twisted.internet import error as terror 
 33  from twisted.python.failure import Failure 
 34  from zope.interface import implements 
 35   
 36  from flumotion.configure import configure 
 37  from flumotion.manager import base, config 
 38  from flumotion.common import errors, interfaces, keycards, log, planet 
 39  from flumotion.common import messages, common 
 40  from flumotion.common.i18n import N_, gettexter 
 41  from flumotion.common.planet import moods 
 42  from flumotion.twisted import flavors 
 43   
 44  __version__ = "$Rev: 6947 $" 
 45  T_ = gettexter() 
 46   
 47   
48 -class ComponentAvatar(base.ManagerAvatar):
49 """ 50 I am a Manager-side avatar for a component. 51 I live in the L{ComponentHeaven}. 52 53 Each component that logs in to the manager gets an avatar created for it 54 in the manager. 55 56 @cvar avatarId: the L{componentId<common.componentId>} 57 @type avatarId: str 58 @cvar jobState: job state of this avatar's component 59 @type jobState: L{flumotion.common.planet.ManagerJobState} 60 @cvar componentState: component state of this avatar's component 61 @type componentState: L{flumotion.common.planet.ManagerComponentState} 62 """ 63 64 logCategory = 'comp-avatar' 65
66 - def __init__(self, heaven, avatarId, remoteIdentity, mind, conf, 67 jobState, clocking):
68 # doc in base class 69 base.ManagerAvatar.__init__(self, heaven, avatarId, 70 remoteIdentity, mind) 71 72 self.jobState = jobState 73 self.makeComponentState(conf) 74 self.clocking = clocking 75 76 self._shutdown_requested = False 77 self._shutdownDeferred = None 78 79 self.vishnu.registerComponent(self) 80 # calllater to allow the component a chance to receive its 81 # avatar, so that it has set medium.remote 82 reactor.callLater(0, self.heaven.componentAttached, self)
83 84 ### python methods
85 - def __repr__(self):
86 mood = '(unknown)' 87 if self.componentState: 88 moodValue = self.componentState.get('mood') 89 if moodValue is not None: 90 mood = moods.get(moodValue).name 91 return '<%s %s (mood %s)>' % (self.__class__.__name__, 92 self.avatarId, mood)
93 94 ### ComponentAvatar methods
95 - def makeAvatarInitArgs(klass, heaven, avatarId, remoteIdentity, 96 mind):
97 def gotStates(result): 98 (_s1, conf), (_s2, jobState), (_s3, clocking) = result 99 assert _s1 and _s2 and _s3 # fireOnErrback=1 100 log.debug('component-avatar', 'got state information') 101 return (heaven, avatarId, remoteIdentity, mind, 102 conf, jobState, clocking)
103 log.debug('component-avatar', 'calling mind for state information') 104 d = defer.DeferredList([mind.callRemote('getConfig'), 105 mind.callRemote('getState'), 106 mind.callRemote('getMasterClockInfo')], 107 fireOnOneErrback=True) 108 d.addCallback(gotStates) 109 return d
110 makeAvatarInitArgs = classmethod(makeAvatarInitArgs) 111
112 - def onShutdown(self):
113 # doc in base class 114 self.info('component "%s" logged out', self.avatarId) 115 116 self.vishnu.unregisterComponent(self) 117 118 if self.clocking: 119 ip, port, base_time = self.clocking 120 self.vishnu.releasePortsOnWorker(self.getWorkerName(), 121 [port]) 122 123 self.componentState.clearJobState(self._shutdown_requested) 124 125 # FIXME: why? 126 self.componentState.set('moodPending', None) 127 128 self.componentState = None 129 self.jobState = None 130 131 self.heaven.componentDetached(self) 132 133 if self._shutdownDeferred: 134 reactor.callLater(0, self._shutdownDeferred.callback, True) 135 self._shutdownDeferred = None 136 137 base.ManagerAvatar.onShutdown(self)
138 139 # my methods
140 - def addMessage(self, level, id, format, *args, **kwargs):
141 """ 142 Convenience message to construct a message and add it to the 143 component state. `format' should be marked as translatable in 144 the source with N_, and *args will be stored as format 145 arguments. Keyword arguments are passed on to the message 146 constructor. See L{flumotion.common.messages.Message} for the 147 meanings of the rest of the arguments. 148 149 For example:: 150 151 self.addMessage(messages.WARNING, 'foo-warning', 152 N_('The answer is %d'), 42, debug='not really') 153 """ 154 self.addMessageObject(messages.Message(level, 155 T_(format, *args), 156 id=id, **kwargs))
157
158 - def addMessageObject(self, message):
159 """ 160 Add a message to the planet state. 161 162 @type message: L{flumotion.common.messages.Message} 163 """ 164 self.componentState.append('messages', message)
165
166 - def upgradeConfig(self, state, conf):
167 # different from conf['version'], eh... 168 version = conf.get('config-version', 0) 169 while version < config.CURRENT_VERSION: 170 try: 171 config.UPGRADERS[version](conf) 172 version += 1 173 conf['config-version'] = version 174 except Exception, e: 175 self.addMessage(messages.WARNING, 176 'upgrade-%d' % version, 177 N_("Failed to upgrade config %r " 178 "from version %d. Please file " 179 "a bug."), conf, version, 180 debug=log.getExceptionMessage(e)) 181 return
182
183 - def makeComponentState(self, conf):
184 # the component just logged in with good credentials. we fetched 185 # its config and job state. now there are two possibilities: 186 # (1) we were waiting for such a component to start. There was 187 # a ManagerComponentState and an avatarId in the 188 # componentMappers waiting for us. 189 # (2) we don't know anything about this component, but it has a 190 # state and config. We deal with it, creating all the 191 # neccesary internal state. 192 def verifyExistingComponentState(conf, state): 193 # condition (1) 194 state.setJobState(self.jobState) 195 self.componentState = state 196 197 self.upgradeConfig(state, conf) 198 if state.get('config') != conf: 199 diff = config.dictDiff(state.get('config'), conf) 200 diffMsg = config.dictDiffMessageString(diff, 201 'internal conf', 202 'running conf') 203 self.addMessage(messages.WARNING, 'stale-config', 204 N_("Component logged in with stale " 205 "configuration. To fix this, stop " 206 "this component and then restart " 207 "the manager."), 208 debug=("Updating internal conf from " 209 "running conf:\n" + diffMsg)) 210 self.warning('updating internal component state for %r', 211 state) 212 self.debug('changes to conf: %s', 213 config.dictDiffMessageString(diff)) 214 state.set('config', conf)
215 216 def makeNewComponentState(conf): 217 # condition (2) 218 state = planet.ManagerComponentState() 219 state.setJobState(self.jobState) 220 self.componentState = state 221 222 self.upgradeConfig(state, conf) 223 224 flowName, compName = conf['parent'], conf['name'] 225 226 state.set('name', compName) 227 state.set('type', conf['type']) 228 state.set('workerRequested', self.jobState.get('workerName')) 229 state.set('config', conf) 230 self.vishnu.addComponentToFlow(state, flowName) 231 return state 232 233 mState = self.vishnu.getManagerComponentState(self.avatarId) 234 if mState: 235 verifyExistingComponentState(conf, mState) 236 else: 237 makeNewComponentState(conf) 238
239 - def provideMasterClock(self):
240 """ 241 Tell the component to provide a master clock. 242 243 @rtype: L{twisted.internet.defer.Deferred} 244 """ 245 def success(clocking): 246 self.clocking = clocking 247 self.heaven.masterClockAvailable(self.avatarId, clocking)
248 249 def error(failure): 250 self.addMessage(messages.WARNING, 'provide-master-clock', 251 N_('Failed to provide the master clock'), 252 debug=log.getFailureMessage(failure)) 253 self.vishnu.releasePortsOnWorker(self.getWorkerName(), [port]) 254 255 if self.clocking: 256 self.heaven.masterClockAvailable(self.avatarId, self.clocking) 257 else: 258 (port,) = self.vishnu.reservePortsOnWorker(self.getWorkerName(), 1) 259 self.debug('provideMasterClock on port %d', port) 260 261 d = self.mindCallRemote('provideMasterClock', port) 262 d.addCallbacks(success, error) 263
264 - def getFeedServerPort(self):
265 """ 266 Returns the port on which a feed server for this component is 267 listening on. 268 269 @rtype: int 270 """ 271 return self.vishnu.getWorkerFeedServerPort(self.getWorkerName())
272
273 - def getRemoteManagerIP(self):
274 """ 275 Get the IP address of the manager as seen by the component. 276 277 @rtype: str 278 """ 279 return self.jobState.get('manager-ip')
280
281 - def getWorkerName(self):
282 """ 283 Return the name of the worker. 284 285 @rtype: str 286 """ 287 return self.jobState.get('workerName')
288
289 - def getPid(self):
290 """ 291 Return the PID of the component. 292 293 @rtype: int 294 """ 295 return self.jobState.get('pid')
296
297 - def getName(self):
298 """ 299 Get the name of the component. 300 301 @rtype: str 302 """ 303 return self.componentState.get('name')
304
305 - def getParentName(self):
306 """ 307 Get the name of the component's parent. 308 309 @rtype: str 310 """ 311 return self.componentState.get('parent').get('name')
312
313 - def getType(self):
314 """ 315 Get the component type name of the component. 316 317 @rtype: str 318 """ 319 return self.componentState.get('type')
320
321 - def getEaters(self):
322 """ 323 Get the set of eaters that this component eats from. 324 325 @rtype: dict of eaterName -> [(feedId, eaterAlias)] 326 """ 327 return self.componentState.get('config').get('eater', {})
328
329 - def getFeeders(self):
330 """ 331 Get the list of feeders that this component provides. 332 333 @rtype: list of feederName 334 """ 335 return self.componentState.get('config').get('feed', [])
336
337 - def getFeedId(self, feedName):
338 """ 339 Get the feedId of a feed provided or consumed by this component. 340 341 @param feedName: The name of the feed (i.e., eater alias or 342 feeder name) 343 @rtype: L{flumotion.common.common.feedId} 344 """ 345 return common.feedId(self.getName(), feedName)
346
347 - def getFullFeedId(self, feedName):
348 """ 349 Get the full feedId of a feed provided or consumed by this 350 component. 351 352 @param feedName: The name of the feed (i.e., eater alias or 353 feeder name) 354 @rtype: L{flumotion.common.common.fullFeedId} 355 """ 356 return common.fullFeedId(self.getParentName(), self.getName(), feedName)
357
358 - def getVirtualFeeds(self):
359 """ 360 Get the set of virtual feeds provided by this component. 361 362 @rtype: dict of fullFeedId -> (ComponentAvatar, feederName) 363 """ 364 conf = self.componentState.get('config') 365 ret = {} 366 for feedId, feederName in conf.get('virtual-feeds', {}).items(): 367 vComp, vFeed = common.parseFeedId(feedId) 368 ffid = common.fullFeedId(self.getParentName(), vComp, vFeed) 369 ret[ffid] = (self, feederName) 370 return ret
371
372 - def getWorker(self):
373 """ 374 Get the worker that this component should run on. 375 376 @rtype: str 377 """ 378 return self.componentState.get('workerRequested')
379
380 - def getClockMaster(self):
381 """ 382 Get this component's clock master, if any. 383 384 @rtype: avatarId or None 385 """ 386 return self.componentState.get('config')['clock-master']
387
388 - def stop(self):
389 """ 390 Tell the remote component to shut down. 391 """ 392 self._shutdownDeferred = defer.Deferred() 393 394 self.mindCallRemote('stop') 395 396 return self._shutdownDeferred
397
398 - def setClocking(self, host, port, base_time):
399 # setMood on error? 400 return self.mindCallRemote('setMasterClock', host, port, base_time)
401
402 - def eatFrom(self, eaterAlias, fullFeedId, host, port):
403 self.debug('connecting eater %s to feed %s', eaterAlias, fullFeedId) 404 return self.mindCallRemote('eatFrom', eaterAlias, fullFeedId, 405 host, port)
406
407 - def feedTo(self, feederName, fullFeedId, host, port):
408 self.debug('connecting feeder %s to feed %s', feederName, fullFeedId) 409 return self.mindCallRemote('feedTo', feederName, fullFeedId, 410 host, port)
411 412 # FIXME: maybe make a BouncerComponentAvatar subclass ?
413 - def authenticate(self, keycard):
414 """ 415 Authenticate the given keycard. 416 Gets proxied to L{flumotion.component.bouncers.bouncer.""" \ 417 """BouncerMedium.remote_authenticate} 418 The component should be a subclass of 419 L{flumotion.component.bouncers.bouncer.Bouncer} 420 421 @type keycard: L{flumotion.common.keycards.Keycard} 422 """ 423 return self.mindCallRemote('authenticate', keycard)
424
425 - def removeKeycardId(self, keycardId):
426 """ 427 Remove a keycard managed by this bouncer because the requester 428 has gone. 429 430 @type keycardId: str 431 """ 432 return self.mindCallRemote('removeKeycardId', keycardId)
433
434 - def expireKeycard(self, keycardId):
435 """ 436 Expire a keycard issued to this component because the bouncer decided 437 to. 438 439 @type keycardId: str 440 """ 441 return self.mindCallRemote('expireKeycard', keycardId)
442 443 ### IPerspective methods, called by the worker's component
444 - def perspective_cleanShutdown(self):
445 """ 446 Called by a component to tell the manager that it's shutting down 447 cleanly (and thus should go to sleeping, rather than lost or sad) 448 """ 449 self.debug("shutdown is clean, shouldn't go to lost") 450 self._shutdown_requested = True
451
452 - def perspective_removeKeycardId(self, bouncerName, keycardId):
453 """ 454 Remove a keycard on the given bouncer on behalf of a component's medium. 455 456 This is requested by a component that created the keycard. 457 458 @type bouncerName: str 459 @param keycardId: id of keycard to remove 460 @type keycardId: str 461 """ 462 avatarId = common.componentId('atmosphere', bouncerName) 463 if not self.heaven.hasAvatar(avatarId): 464 self.warning('No bouncer with id %s registered', avatarId) 465 raise errors.UnknownComponentError(avatarId) 466 467 return self.heaven.getAvatar(avatarId).removeKeycardId(keycardId)
468
469 - def perspective_expireKeycard(self, requesterId, keycardId):
470 """ 471 Expire a keycard (and thus the requester's connection) 472 issued to the given requester. 473 474 This is called by the bouncer component that authenticated the keycard. 475 476 @param requesterId: name (avatarId) of the component that originally 477 requested authentication for the given keycardId 478 @type requesterId: str 479 @param keycardId: id of keycard to expire 480 @type keycardId: str 481 """ 482 # FIXME: we should also be able to expire manager bouncer keycards 483 if not self.heaven.hasAvatar(requesterId): 484 self.warning('asked to expire keycard %s for requester %s, ' 485 'but no such component registered', 486 keycardId, requesterId) 487 raise errors.UnknownComponentError(requesterId) 488 489 return self.heaven.getAvatar(requesterId).expireKeycard(keycardId)
490
491 -class dictlist(dict):
492 - def add(self, key, value):
493 if key not in self: 494 self[key] = [] 495 self[key].append(value)
496
497 - def remove(self, key, value):
498 self[key].remove(value) 499 if not self[key]: 500 del self[key]
501
502 -class FeedMap(object, log.Loggable):
503 logName = 'feed-map'
504 - def __init__(self):
505 self.avatars = {} 506 self._ordered_avatars = [] 507 self._dirty = True 508 self._recalc()
509
510 - def componentAttached(self, avatar):
511 assert avatar.avatarId not in self.avatars 512 self.avatars[avatar.avatarId] = avatar 513 self._ordered_avatars.append(avatar) 514 self._dirty = True
515
516 - def componentDetached(self, avatar):
517 # returns the a list of other components that will need to be 518 # reconnected 519 del self.avatars[avatar.avatarId] 520 self._ordered_avatars.remove(avatar) 521 self._dirty = True 522 # NB, feedDeps is dirty. Scrub it of avatars that have logged 523 # out 524 return [(a, f) for a, f in self.feedDeps.pop(avatar, []) 525 if a.avatarId in self.avatars]
526
527 - def _getFeederAvatar(self, eater, feedId):
528 # FIXME: 'get' part is confusing - this methods _modifies_ structures! 529 flowName = eater.getParentName() 530 compName, feedName = common.parseFeedId(feedId) 531 ffid = common.fullFeedId(flowName, compName, feedName) 532 feeder = None 533 if ffid in self.feeds: 534 feeder, feedName = self.feeds[ffid][0] 535 self.feedDeps.add(feeder, (eater, ffid)) 536 if feeder.getFeedId(feedName) != feedId: 537 self.debug('chose %s for feed %s', 538 feeder.getFeedId(feedName), feedId) 539 return feeder, feedName
540
541 - def _recalc(self):
542 if not self._dirty: 543 return 544 self.feedersForEaters = ffe = {} 545 self.eatersForFeeders = eff = dictlist() 546 self.feeds = dictlist() 547 self.feedDeps = dictlist() 548 549 for comp in self._ordered_avatars: 550 for feederName in comp.getFeeders(): 551 self.feeds.add(comp.getFullFeedId(feederName), 552 (comp, feederName)) 553 for ffid, pair in comp.getVirtualFeeds().items(): 554 self.feeds.add(ffid, pair) 555 556 for eater in self.avatars.values(): 557 for pairs in eater.getEaters().values(): 558 for feedId, eName in pairs: 559 feeder, fName = self._getFeederAvatar(eater, feedId) 560 if feeder: 561 ffe[eater.getFullFeedId(eName)] = (eName, feeder, fName) 562 eff.add(feeder.getFullFeedId(fName), 563 (fName, eater, eName)) 564 else: 565 self.debug('eater %s waiting for feed %s to log in', 566 eater.getFeedId(eName), feedId) 567 self._dirty = False
568
569 - def getFeedersForEaters(self, avatar):
570 """Get the set of feeds that this component is eating from, 571 keyed by eater alias. 572 573 @return: a list of (eaterAlias, feederAvatar, feedName) tuples 574 @rtype: list of (str, ComponentAvatar, str) 575 """ 576 self._recalc() 577 ret = [] 578 for tups in avatar.getEaters().values(): 579 for feedId, alias in tups: 580 ffid = avatar.getFullFeedId(alias) 581 if ffid in self.feedersForEaters: 582 ret.append(self.feedersForEaters[ffid]) 583 return ret
584
585 - def getFeedersForEater(self, avatar, ffid):
586 """Get the set of feeds that this component is eating from 587 for the given feedId. 588 589 @param avatar: the eater component 590 @type avatar: L{ComponentAvatar} 591 @param ffid: full feed id for which to return feeders 592 @type ffid: str 593 @return: a list of (eaterAlias, feederAvatar, feedName) tuples 594 @rtype: list of (str, L{ComponentAvatar}, str) 595 """ 596 self._recalc() 597 ret = [] 598 for feeder, feedName in self.feeds.get(ffid, []): 599 rffid = feeder.getFullFeedId(feedName) 600 eff = self.eatersForFeeders.get(rffid, []) 601 for fName, eater, eaterName in eff: 602 if eater == avatar: 603 ret.append((eaterName, feeder, feedName)) 604 return ret
605
606 - def getEatersForFeeders(self, avatar):
607 """Get the set of eaters that this component feeds, keyed by 608 feeder name. 609 610 @return: a list of (feederName, eaterAvatar, eaterAlias) tuples 611 @rtype: list of (str, ComponentAvatar, str) 612 """ 613 self._recalc() 614 ret = [] 615 for feedName in avatar.getFeeders(): 616 ffid = avatar.getFullFeedId(feedName) 617 if ffid in self.eatersForFeeders: 618 ret.extend(self.eatersForFeeders[ffid]) 619 return ret
620
621 -class ComponentHeaven(base.ManagerHeaven):
622 """ 623 I handle all registered components and provide L{ComponentAvatar}s 624 for them. 625 """ 626 627 implements(interfaces.IHeaven) 628 avatarClass = ComponentAvatar 629 630 logCategory = 'comp-heaven' 631
632 - def __init__(self, vishnu):
633 # doc in base class 634 base.ManagerHeaven.__init__(self, vishnu) 635 self.feedMap = FeedMap()
636 637 ### our methods
638 - def feedServerAvailable(self, workerName):
639 self.debug('feed server %s logged in, we can connect to its port', 640 workerName) 641 # can be made more efficient 642 for avatar in self.avatars.values(): 643 if avatar.getWorkerName() == workerName: 644 self._setupClocking(avatar) 645 self._connectEatersAndFeeders(avatar)
646
647 - def masterClockAvailable(self, avatarId, clocking):
648 self.debug('master clock for %r provided on %r', avatarId, 649 clocking) 650 # can be made more efficient 651 for avatar in self.avatars.values(): 652 if avatar.avatarId != avatarId: 653 self._setupClocking(avatar)
654
655 - def _setupClocking(self, avatar):
656 master = avatar.getClockMaster() 657 if master: 658 if master == avatar.avatarId: 659 self.debug('Need for %r to provide a clock master', 660 master) 661 avatar.provideMasterClock() 662 else: 663 self.debug('Need to synchronize with clock master %r', 664 master) 665 # if master in self.avatars would be natural, but it seems 666 # that for now due to the getClocking() calls etc we need to 667 # check against the componentMapper set. could (and probably 668 # should) be fixed in the future. 669 m = self.vishnu.getComponentMapper(master) 670 if m and m.avatar: 671 clocking = m.avatar.clocking 672 if clocking: 673 host, port, base_time = clocking 674 avatar.setClocking(host, port, base_time) 675 else: 676 self.warning('%r should provide a clock master ' 677 'but is not doing so', master) 678 # should we componentAvatar.provideMasterClock() ? 679 else: 680 self.debug('clock master not logged in yet, will ' 681 'set clocking later')
682
683 - def componentAttached(self, avatar):
684 # No need to wait for any of this, they are not interdependent 685 assert avatar.avatarId in self.avatars 686 self.feedMap.componentAttached(avatar) 687 self._setupClocking(avatar) 688 self._connectEatersAndFeeders(avatar)
689
690 - def componentDetached(self, avatar):
691 assert avatar.avatarId not in self.avatars 692 compsNeedingReconnect = self.feedMap.componentDetached(avatar) 693 if self.vishnu.running: 694 self.debug('will reconnect: %r', compsNeedingReconnect) 695 # FIXME: this will need revision when we have the 'feedTo' 696 # direction working 697 for comp, ffid in compsNeedingReconnect: 698 self._connectEaters(comp, ffid)
699
700 - def mapNetFeed(self, fromAvatar, toAvatar):
701 toHost = toAvatar.getClientAddress() 702 toPort = toAvatar.getFeedServerPort() # can be None 703 704 # FIXME: until network map is implemented, hack to assume that 705 # connections from what appears to us to be the same IP go 706 # through localhost instead. Allows connections between 707 # components on a worker behind a firewall, but not between 708 # components running on different workers, both behind a 709 # firewall 710 fromHost = fromAvatar.mind.broker.transport.getPeer().host 711 if fromHost == toHost: 712 toHost = '127.0.0.1' 713 714 return toHost, toPort
715
716 - def _connectFeederToEater(self, fromComp, fromFeed, toComp, toFeed, method):
717 host, port = self.mapNetFeed(fromComp, toComp) 718 if port: 719 fullFeedId = toComp.getFullFeedId(toFeed) 720 proc = getattr(fromComp, method) 721 proc(fromFeed, fullFeedId, host, port) 722 else: 723 self.debug('postponing connection to %s: feed server ' 724 'unavailable', toComp.getFeedId(toFeed))
725
726 - def _connectEatersAndFeeders(self, avatar):
727 # FIXME: all connections are upstream for now 728 def always(otherComp): 729 return True
730 def never(otherComp): 731 return False
732 directions = [(self.feedMap.getFeedersForEaters, 733 always, 'eatFrom', 'feedTo'), 734 (self.feedMap.getEatersForFeeders, 735 never, 'feedTo', 'eatFrom')] 736 737 myComp = avatar 738 for getPeers, initiate, directMethod, reversedMethod in directions: 739 for myFeedName, otherComp, otherFeedName in getPeers(myComp): 740 if initiate(otherComp): 741 # we initiate the connection 742 self._connectFeederToEater(myComp, myFeedName, otherComp, 743 otherFeedName, directMethod) 744 else: 745 # make the other component initiate connection 746 self._connectFeederToEater(otherComp, otherFeedName, 747 myComp, myFeedName, 748 reversedMethod) 749
750 - def _connectEaters(self, avatar, ffid):
751 # FIXME: all connections are upstream for now 752 ffe = self.feedMap.getFeedersForEater(avatar, ffid) 753 for myFeedName, otherComp, otherFeedName in ffe: 754 self._connectFeederToEater(avatar, myFeedName, otherComp, 755 otherFeedName, 'eatFrom')
756