Package flumotion :: Package worker :: Module feed
[hide private]

Source Code for Module flumotion.worker.feed

  1  # -*- Mode: Python; test-case-name: flumotion.test.test_worker_feed -*- 
  2  # vi:si:et:sw=4:sts=4:ts=4 
  3  # 
  4  # Flumotion - a streaming media server 
  5  # Copyright (C) 2004,2005,2006,2007 Fluendo, S.L. (www.fluendo.com). 
  6  # All rights reserved. 
  7   
  8  # This file may be distributed and/or modified under the terms of 
  9  # the GNU General Public License version 2 as published by 
 10  # the Free Software Foundation. 
 11  # This file is distributed without any warranty; without even the implied 
 12  # warranty of merchantability or fitness for a particular purpose. 
 13  # See "LICENSE.GPL" in the source distribution for more information. 
 14   
 15  # Licensees having purchased or holding a valid Flumotion Advanced 
 16  # Streaming Server license may use this file in accordance with the 
 17  # Flumotion Advanced Streaming Server Commercial License Agreement. 
 18  # See "LICENSE.Flumotion" in the source distribution for more information. 
 19   
 20  # Headers in this file shall remain intact. 
 21   
 22  """ 
 23  implementation of a PB Server through which other components can request 
 24  to eat from or feed to this worker's components. 
 25  """ 
 26   
 27  from twisted.internet import reactor, defer, main 
 28  from twisted.cred import error 
 29  from twisted.python import components, failure, reflect 
 30  from twisted.spread import pb 
 31  from twisted.cred import portal 
 32   
 33  from flumotion.configure import configure 
 34  from flumotion.common import log, common, interfaces 
 35  from flumotion.twisted import checkers, compat 
 36  from flumotion.twisted import portal as fportal 
 37  from flumotion.twisted import pb as fpb 
 38  from flumotion.twisted.defer import defer_generator_method 
 39   
40 -class ProxyManagerBouncer(log.Loggable):
41 logCategory = "proxymanagerbouncer" 42 # should be set as soon as we can ask the manager's bouncer 43 keycardClasses = () 44 45 """ 46 I proxy authenticate calls to the manager's bouncer. 47 """
48 - def __init__(self, remote):
49 """ 50 @param remote: an object that has .callRemote() 51 """ 52 self._remote = remote
53
54 - def getKeycardClasses(self):
55 """ 56 Call me before asking me to authenticate, so I know what I can 57 authenticate. 58 """ 59 def getKeycardClassesCb(classes): 60 self.keycardClasses = [reflect.namedObject(n) for n in classes] 61 self.log('set proxied keycardClasses to %r', self.keycardClasses) 62 return classes
63 64 d = self._remote.callRemote('getKeycardClasses') 65 d.addCallback(getKeycardClassesCb) 66 return d
67
68 - def authenticate(self, keycard):
69 self.debug("Authenticating keycard %r against manager" % keycard) 70 return self._remote.callRemote('authenticate', None, keycard)
71
72 -class ProxyManagerBouncerPortal(fportal.BouncerPortal):
73 - def getKeycardClasses(self):
74 self.debug('proxy getting keycardclasses') 75 d = self.bouncer.getKeycardClasses() 76 return d
77
78 -class FeedAvatar(fpb.Avatar):
79 """ 80 I am an avatar in a FeedServer for components that log in and request 81 to eat from or feed to one of my components. 82 83 My mind is a reference to a L{FeedMedium} 84 """ 85 logCategory = "feedavatar" 86 remoteLogName = "feedmedium" 87
88 - def __init__(self, feedServerParent, avatarId):
89 """ 90 @param feedServerParent: the parent of the feed server 91 @type feedServerParent: implementor of 92 L{interfaces.IFeedServerParent} 93 """ 94 self._transport = None 95 self._feedServerParent = feedServerParent 96 self.avatarId = avatarId
97
98 - def attached(self, mind):
99 self.debug("mind %s attached" % mind) 100 self._mind = mind
101
102 - def detached(self):
103 self.debug("mind %s detached" % self._mind) 104 self._mind = None
105
106 - def perspective_sendFeed(self, fullFeedId):
107 """ 108 Called when the PB client wants us to send them the given feed. 109 """ 110 # the PB message needs to be sent from the side that has the feeder 111 # for proper switching, so we call back as a reply 112 d = self._mind.callRemote('sendFeedReply', fullFeedId) 113 d.addCallback(self._sendFeedReplyCb, fullFeedId)
114
115 - def _sendFeedReplyCb(self, result, fullFeedId):
116 # compare with startStreaming in prototype 117 # Remove this from the reactor; we mustn't read or write from it from 118 # here on 119 t = self._mind.broker.transport 120 t.stopReading() 121 t.stopWriting() 122 123 # hand off the fd to the component 124 self.debug("Attempting to send FD: %d" % t.fileno()) 125 126 (flowName, componentName, feedName) = common.parseFullFeedId(fullFeedId) 127 componentId = common.componentId(flowName, componentName) 128 129 if self._feedServerParent.feedToFD(componentId, feedName, 130 t.fileno(), self.avatarId): 131 t.keepSocketAlive = True 132 133 # We removed the transport from the reactor before sending the FD; now 134 # we want a complete and immediate cleanup of the socket, which 135 # loseConnection() doesn't do. 136 t.connectionLost(failure.Failure(main.CONNECTION_DONE))
137 138 # TODO: receiveFeed is bitrotten. Clean it up.
139 - def perspective_receiveFeed(self, componentId, feedId):
140 """ 141 Called when the PB client wants to send the given feedId to the 142 given component 143 """ 144 # we need to make sure our result goes back, so only stop reading 145 t = self._mind.broker.transport 146 t.stopReading() 147 reactor.callLater(0, self._doReceiveFeed, componentId, feedId)
148 149 # FIXME: receiveFeed is broken and this method below will 150 # probably leak fds. Fix before using.
151 - def _doReceiveFeed(self, componentId, feedId):
152 t = self._mind.broker.transport 153 self.debug('flushing PB write queue') 154 t.doWrite() 155 self.debug('stop writing to transport') 156 t.stopWriting() 157 # this keeps a ref around, so the socket will not get closed 158 self._transport = t 159 self._mind.broker.transport = None 160 161 # pass the fd to the component to eat from 162 fd = t.fileno() 163 self.debug('telling component %s to eat feedId %s from fd %d' % ( 164 componentId, feedId, fd)) 165 if not self._feedServerParent.eatFromFD(componentId, feedId, fd): 166 self.debug("unsuccessful request to eatFromFD.")
167 168 # an internal class; used by the worker to create avatars for Feed clients
169 -class _WorkerFeedDispatcher(log.Loggable):
170 """ 171 I implement L{portal.IRealm}. 172 I make sure that when a L{pb.Avatar} is requested through me, the 173 Avatar being returned knows about the mind (client) requesting 174 the Avatar. 175 """ 176 177 __implements__ = portal.IRealm 178 179 logCategory = 'dispatcher' 180
181 - def __init__(self, brain):
182 """ 183 @param brain: L{flumotion.worker.worker.WorkerBrain} 184 """ 185 self._brain = brain
186 187 ### IRealm methods 188
189 - def requestAvatar(self, avatarId, keycard, mind, *ifaces):
190 avatar = FeedAvatar(self._brain, avatarId) 191 # schedule a perspective attached for after this function 192 # FIXME: there needs to be a way to not have to do a callLater 193 # blindly so cleanup can be guaranteed 194 dc = reactor.callLater(0, avatar.attached, mind) 195 196 # FIXME FIXME: Need to fix #566; this is a workaround for the 197 # race condition until a proper cleanup can be fully tested 198 def logout(): 199 if dc.active(): 200 dc.cancel() 201 else: 202 avatar.detached()
203 204 return (pb.IPerspective, avatar, logout)
205
206 -def feedServerFactory(brain, unsafeTracebacks=0):
207 """ 208 Create and return an FPB server factory. 209 210 @param brain: L{flumotion.worker.worker.WorkerBrain} 211 """ 212 # create a Dispatcher which will hand out avatars to clients 213 # connecting to me 214 dispatcher = _WorkerFeedDispatcher(brain) 215 216 # create a portal so that I can be connected to, through our dispatcher 217 # implementing the IRealm and a bouncer 218 # FIXME: decide if we allow anonymous login in this small (?) window 219 bouncer = ProxyManagerBouncer(brain) 220 portal = ProxyManagerBouncerPortal(dispatcher, bouncer) 221 #unsafeTracebacks = 1 # for debugging tracebacks to clients 222 factory = pb.PBServerFactory(portal, unsafeTracebacks=unsafeTracebacks) 223 return factory
224
225 -class FeedClientFactory(fpb.FPBClientFactory, log.Loggable):
226 """ 227 I am a client factory used by a feed component's medium to log into 228 a worker and exchange feeds. 229 """ 230 logCategory = 'feedclient' 231 perspectiveInterface = interfaces.IFeedMedium 232
233 - def __init__(self, medium):
234 fpb.FPBClientFactory.__init__(self) 235 self.medium = medium
236 237 # not a BaseMedium because we are going to do strange things to the transport
238 -class FeedMedium(fpb.Referenceable):
239 """ 240 I am a client for a Feed Server. 241 242 I am used as the remote interface between a component and another 243 component. 244 245 @ivar component: the component this is a feed client for 246 @type component: L{flumotion.component.feedcomponent.FeedComponent} 247 @ivar remote: a reference to a L{FeedAvatar} 248 @type remote: L{twisted.spread.pb.RemoteReference} 249 """ 250 logCategory = 'feedmedium' 251 remoteLogName = 'feedserver' 252 compat.implements(interfaces.IFeedMedium) 253 254 remote = None 255
256 - def __init__(self, component):
257 """ 258 @param component: the component this is a feed client for 259 @type component: L{flumotion.component.feedcomponent.FeedComponent} 260 """ 261 self.component = component 262 self.logName = component.name 263 self._transports = {} # fullFeedId -> transport
264 265 ### IMedium methods
266 - def setRemoteReference(self, remoteReference):
267 self.remote = remoteReference
268
269 - def hasRemoteReference(self):
270 return self.remote is not None
271
272 - def callRemote(self, name, *args, **kwargs):
273 return self.remote.callRemote(name, args, kwargs)
274
275 - def remote_sendFeedReply(self, fullFeedId):
276 t = self.remote.broker.transport 277 # make sure we stop receiving PB messages 278 self.debug('stop reading from transport') 279 t.stopReading() 280 reactor.callLater(0, self._doFeedTo, fullFeedId, t)
281
282 - def _doFeedTo(self, fullFeedId, t):
283 self.debug('flushing PB write queue') 284 t.doWrite() 285 self.debug('stop writing to transport') 286 t.stopWriting() 287 # store the transport so a ref to the socket is kept around. If we 288 # get reconnected, this'll be overwritten, and the socket will be 289 # collected, and closed 290 self._transports[fullFeedId] = t 291 self.remote.broker.transport = None 292 # pass the fd to the component to eat from 293 fd = t.fileno() 294 self.debug('telling component to eat from fd %d' % fd) 295 (flowName, componentName, feedName) = common.parseFullFeedId(fullFeedId) 296 self.component.eatFromFD(common.feedId(componentName, feedName), fd)
297