| Trees | Indices | Help |
|---|
|
|
1 # -*- Mode: Python; test-case-name: flumotion.test.test_worker_feed -*- 2 # vi:si:et:sw=4:sts=4:ts=4 3 # 4 # Flumotion - a streaming media server 5 # Copyright (C) 2004,2005,2006,2007 Fluendo, S.L. (www.fluendo.com). 6 # All rights reserved. 7 8 # This file may be distributed and/or modified under the terms of 9 # the GNU General Public License version 2 as published by 10 # the Free Software Foundation. 11 # This file is distributed without any warranty; without even the implied 12 # warranty of merchantability or fitness for a particular purpose. 13 # See "LICENSE.GPL" in the source distribution for more information. 14 15 # Licensees having purchased or holding a valid Flumotion Advanced 16 # Streaming Server license may use this file in accordance with the 17 # Flumotion Advanced Streaming Server Commercial License Agreement. 18 # See "LICENSE.Flumotion" in the source distribution for more information. 19 20 # Headers in this file shall remain intact. 21 22 """ 23 implementation of a PB Server through which other components can request 24 to eat from or feed to this worker's components. 25 """ 26 27 from twisted.internet import reactor 28 from twisted.spread import pb 29 from twisted.cred import portal 30 from zope.interface import implements 31 32 from flumotion.common import log, common 33 from flumotion.twisted import fdserver 34 from flumotion.twisted import portal as fportal 35 from flumotion.twisted import pb as fpb 36 37 __version__ = "$Rev: 6561 $" 38 3941 """ 42 I am the feed server. PHEAR 43 """ 44 45 implements(portal.IRealm) 46 47 logCategory = 'dispatcher' 489550 """ 51 @param brain: L{flumotion.worker.worker.WorkerBrain} 52 """ 53 self._brain = brain 54 self._tport = None 55 self.listen(bouncer, portNum)5658 if not self._tport: 59 self.warning('not listening!') 60 return 0 61 return self._tport.getHost().port6264 portal = fportal.BouncerPortal(self, bouncer) 65 factory = pb.PBServerFactory(portal, 66 unsafeTracebacks=unsafeTracebacks) 67 68 tport = reactor.listenWith(fdserver.PassableServerPort, portNum, 69 factory) 70 71 self._tport = tport 72 self.debug('Listening for feed requests on TCP port %d', 73 self.getPortNum())74 79 80 ### IRealm method82 avatar = FeedAvatar(self, avatarId, mind) 83 return (pb.IPerspective, avatar, 84 lambda: self.avatarLogout(avatar))85 88 89 ## proxy these to the brain 9297 """ 98 I am an avatar in a FeedServer for components that log in and request 99 to eat from or feed to one of my components. 100 101 My mind is a reference to a L{flumotion.component.feed.FeedMedium} 102 """ 103 logCategory = "feedavatar" 104 remoteLogName = "feedmedium" 105175107 """ 108 """ 109 fpb.Avatar.__init__(self, avatarId) 110 self._transport = None 111 self.feedServer = feedServer 112 self.avatarId = avatarId 113 self.setMind(mind)114116 """ 117 Called when the PB client wants us to send them the given feed. 118 """ 119 # the PB message needs to be sent from the side that has the feeder 120 # for proper switching, so we call back as a reply 121 d = self.mindCallRemote('sendFeedReply', fullFeedId) 122 d.addCallback(self._sendFeedReplyCb, fullFeedId)123125 # compare with startStreaming in prototype 126 # Remove this from the reactor; we mustn't read or write from it from 127 # here on 128 t = self.mind.broker.transport 129 t.stopReading() 130 t.stopWriting() 131 132 # hand off the fd to the component 133 self.debug("Attempting to send FD: %d", t.fileno()) 134 135 (flowName, componentName, feedName) = common.parseFullFeedId(fullFeedId) 136 componentId = common.componentId(flowName, componentName) 137 138 if self.feedServer.feedToFD(componentId, feedName, t.fileno(), 139 self.avatarId): 140 t.keepSocketAlive = True 141 142 # We removed the transport from the reactor before sending the 143 # FD; now we want the socket cleaned up. 144 t.loseConnection()145147 """ 148 Called when the PB client wants to send the given feedId to the 149 given component 150 """ 151 # we need to make sure our result goes back, so only stop reading 152 t = self.mind.broker.transport 153 t.stopReading() 154 reactor.callLater(0, self._doReceiveFeed, fullFeedId)155157 t = self.mind.broker.transport 158 159 self.debug('flushing PB write queue') 160 t.doWrite() 161 self.debug('stop writing to transport') 162 t.stopWriting() 163 164 # hand off the fd to the component 165 self.debug("Attempting to send FD: %d", t.fileno()) 166 167 (flowName, componentName, eaterAlias) = common.parseFullFeedId(fullFeedId) 168 componentId = common.componentId(flowName, componentName) 169 170 if self.feedServer.eatFromFD(componentId, eaterAlias, t.fileno(), 171 self.avatarId): 172 t.keepSocketAlive = True 173 174 t.loseConnection()
| Trees | Indices | Help |
|---|
| Generated by Epydoc 3.0.1 on Sat Jul 26 09:43:19 2008 | http://epydoc.sourceforge.net |