Package flumotion :: Package component :: Module feed
[hide private]

Source Code for Module flumotion.component.feed

  1  # -*- Mode: Python; test-case-name: flumotion.test.test_worker_feed -*- 
  2  # vi:si:et:sw=4:sts=4:ts=4 
  3  # 
  4  # Flumotion - a streaming media server 
  5  # Copyright (C) 2004,2005,2006,2007 Fluendo, S.L. (www.fluendo.com). 
  6  # All rights reserved. 
  7   
  8  # This file may be distributed and/or modified under the terms of 
  9  # the GNU General Public License version 2 as published by 
 10  # the Free Software Foundation. 
 11  # This file is distributed without any warranty; without even the implied 
 12  # warranty of merchantability or fitness for a particular purpose. 
 13  # See "LICENSE.GPL" in the source distribution for more information. 
 14   
 15  # Licensees having purchased or holding a valid Flumotion Advanced 
 16  # Streaming Server license may use this file in accordance with the 
 17  # Flumotion Advanced Streaming Server Commercial License Agreement. 
 18  # See "LICENSE.Flumotion" in the source distribution for more information. 
 19   
 20  # Headers in this file shall remain intact. 
 21   
 22  """ 
 23  implementation of a PB Client to interface with feedserver.py 
 24  """ 
 25   
 26  import socket 
 27  import os 
 28   
 29  from twisted.internet import reactor, main, defer, tcp 
 30  from twisted.python import failure 
 31  from zope.interface import implements 
 32   
 33  from flumotion.common import log, common, interfaces 
 34  from flumotion.twisted import pb as fpb 
 35   
 36  __version__ = "$Rev: 6561 $" 
 37   
 38   
 39  # copied from fdserver.py so that it can be bundled 
class _SocketMaybeCloser(tcp._SocketCloser):
    """
    Socket closer that can avoid shutting down the TCP channel.

    While L{keepSocketAlive} is False the socket is closed the normal
    way, through L{tcp._SocketCloser}.  Once the socket has been passed
    on via the FD-channel, L{keepSocketAlive} is set to True and the
    socket is only close()d.

    @cvar keepSocketAlive: whether to skip the regular close path and
                           only close() the socket
    @type keepSocketAlive: bool
    """
    keepSocketAlive = False

    def _closeSocket(self):
        """
        Close the socket, honouring L{keepSocketAlive}.

        Overrides the method from L{tcp._SocketCloser}.
        """
        if not self.keepSocketAlive:
            # normal case: close the socket properly
            tcp._SocketCloser._closeSocket(self)
            return

        # The socket was passed on, so just close() it; we do not call
        # shutdown(), which would close the TCP channel without closing
        # the FD itself.
        try:
            self.socket.close()
        except socket.error:
            pass
55
class PassableClientConnection(_SocketMaybeCloser, tcp.Client):
    """
    A TCP client connection combined with L{_SocketMaybeCloser}, so that
    its socket can be kept alive after being passed on.
    """
58
class PassableClientConnector(tcp.Connector):
    """
    A TCP connector whose transports are L{PassableClientConnection}s.
    """

    # It is unfortunate, but it seems that either we override this
    # private-ish method or reimplement BaseConnector.connect().  This is
    # the path that tcp.py takes, so we take it too.
    def _makeTransport(self):
        """
        Create the transport for this connector.

        @returns: a new connection built from this connector's host,
                  port, bind address and reactor
        @rtype:   L{PassableClientConnection}
        """
        transport = PassableClientConnection(
            self.host, self.port, self.bindAddress, self, self.reactor)
        return transport
67
class FeedClientFactory(fpb.FPBClientFactory, log.Loggable):
    """
    I am the PB client factory through which a feed component's medium
    logs into a worker and exchanges feeds.

    @ivar medium: the medium I was created for
    """
    perspectiveInterface = interfaces.IFeedMedium
    logCategory = 'feedclient'

    def __init__(self, medium):
        """
        @param medium: the medium that uses this factory
        """
        fpb.FPBClientFactory.__init__(self)
        self.medium = medium
# not a BaseMedium because we are going to do strange things to the transport
class FeedMedium(fpb.Referenceable):
    """
    I am a client for a Feed Server.

    I am used as the remote interface between a component and another
    component.

    @ivar component: the component this is a feed client for
    @type component: L{flumotion.component.feedcomponent.FeedComponent}
    @ivar remote:    a reference to a
                     L{flumotion.worker.feedserver.FeedAvatar}
    @type remote:    L{twisted.spread.pb.RemoteReference}
    """
    logCategory = 'feedmedium'
    remoteLogName = 'feedserver'
    implements(interfaces.IFeedMedium)

    remote = None

    def __init__(self, logName=None):
        """
        @param logName: optional name to log under
        @type  logName: str
        """
        if logName:
            assert isinstance(logName, str)
        self.logName = logName
        # factory of the connection started by startConnecting(), if any
        self._factory = None
        # fired from _doFeedTo() with (feedId, fd)
        self._feedToDeferred = defer.Deferred()

    def startConnecting(self, host, port, authenticator, timeout=30,
                        bindAddress=None):
        """Optional helper method to connect to a remote feed server.

        This method starts a client factory connecting via a
        L{PassableClientConnector}. It offers the possibility of
        cancelling an in-progress connection via the stopConnecting()
        method.

        @param host:          the remote host name
        @type  host:          str
        @param port:          the tcp port on which to connect
        @type  port:          int
        @param authenticator: the authenticator, normally provided by
                              the worker
        @type  authenticator: L{flumotion.twisted.pb.Authenticator}
        @param timeout:       connection timeout handed to the connector
                              (presumably in seconds; confirm against
                              the twisted version in use)
        @param bindAddress:   local address handed to the connector, or
                              None

        @returns: a deferred that will fire with the remote reference,
                  once we have authenticated.
        """
        # only one connection attempt per medium
        assert self._factory is None
        self._factory = FeedClientFactory(self)
        reactor.connectWith(PassableClientConnector, host, port,
                            self._factory, timeout, bindAddress)
        return self._factory.login(authenticator)

    def requestFeed(self, host, port, authenticator, fullFeedId):
        """Request a feed from a remote feed server.

        This helper method calls startConnecting() to make the
        connection and authenticate, and will return the feed file
        descriptor or an error. A pending connection attempt can be
        cancelled via stopConnecting().

        @param host:          the remote host name
        @type  host:          str
        @param port:          the tcp port on which to connect
        @type  port:          int
        @param authenticator: the authenticator, normally provided by
                              the worker
        @type  authenticator: L{flumotion.twisted.pb.Authenticator}
        @param fullFeedId:    the full feed id (/flow/component:feed)
                              offered by the remote side
        @type  fullFeedId:    str

        @returns: a deferred that, if successful, will fire with a pair
                  (feedId, fd). In an error case it will errback and
                  close the remote connection.
        """
        def connected(remote):
            self.setRemoteReference(remote)
            return remote.callRemote('sendFeed', fullFeedId)

        def feedSent(res):
            # res is None
            # either just before or just after this, we received a
            # sendFeedReply call from the feedserver. so now we're
            # waiting on the component to get its fd
            return self._feedToDeferred

        # the argument is not called "failure" so that it does not
        # shadow the twisted.python.failure module
        def error(reason):
            self.warning('failed to retrieve %s from %s:%d', fullFeedId,
                         host, port)
            self.debug('failure: %s', log.getFailureMessage(reason))
            self.debug('closing connection')
            self.stopConnecting()
            return reason

        d = self.startConnecting(host, port, authenticator)
        d.addCallback(connected)
        d.addCallback(feedSent)
        d.addErrback(error)
        return d

    def sendFeed(self, host, port, authenticator, fullFeedId):
        """Send a feed to a remote feed server.

        This helper method calls startConnecting() to make the
        connection and authenticate, and will return the feed file
        descriptor or an error. A pending connection attempt can be
        cancelled via stopConnecting().

        @param host:          the remote host name
        @type  host:          str
        @param port:          the tcp port on which to connect
        @type  port:          int
        @param authenticator: the authenticator, normally provided by
                              the worker
        @type  authenticator: L{flumotion.twisted.pb.Authenticator}
        @param fullFeedId:    the full feed id (/flow/component:eaterAlias)
                              to feed to on the remote side
        @type  fullFeedId:    str

        @returns: a deferred that, if successful, will fire with a pair
                  (fullFeedId, fd). In an error case it will errback and
                  close the remote connection.
        """
        def connected(remote):
            assert isinstance(remote.broker.transport, _SocketMaybeCloser)
            self.setRemoteReference(remote)
            return remote.callRemote('receiveFeed', fullFeedId)

        def feedSent(res):
            t = self.remote.broker.transport
            self.debug('stop reading from transport')
            t.stopReading()

            self.debug('flushing PB write queue')
            t.doWrite()
            self.debug('stop writing to transport')
            t.stopWriting()

            # make sure shutdown() is not called on the socket, then
            # take our own copy of the file descriptor
            t.keepSocketAlive = True
            fd = os.dup(t.fileno())

            # avoid refcount cycles
            self.setRemoteReference(None)

            d = defer.Deferred()

            def loseConnection():
                t.connectionLost(failure.Failure(main.CONNECTION_DONE))
                d.callback((fullFeedId, fd))

            # drop the PB connection from a later reactor iteration
            reactor.callLater(0, loseConnection)
            return d

        # the argument is not called "failure" so that it does not
        # shadow the twisted.python.failure module
        def error(reason):
            self.warning('failed to retrieve %s from %s:%d', fullFeedId,
                         host, port)
            self.debug('failure: %s', log.getFailureMessage(reason))
            self.debug('closing connection')
            self.stopConnecting()
            return reason

        d = self.startConnecting(host, port, authenticator)
        d.addCallback(connected)
        d.addCallback(feedSent)
        d.addErrback(error)
        return d

    def stopConnecting(self):
        """Stop a pending or established connection made via
        startConnecting().

        Stops any established or pending connection to a remote feed
        server started via the startConnecting() method. Safe to call
        even if connection has not been started.
        """
        if self._factory:
            self._factory.disconnect()
            self._factory = None
        # not sure if this is necessary; call it just in case, so we
        # don't leave a lingering reference cycle
        self.setRemoteReference(None)

    ### IMedium methods

    def setRemoteReference(self, remoteReference):
        """
        Store the reference to the remote side; None drops it.

        @type remoteReference: L{twisted.spread.pb.RemoteReference}
        """
        self.remote = remoteReference

    def hasRemoteReference(self):
        """
        @returns: whether a remote reference is currently set
        @rtype:   bool
        """
        return self.remote is not None

    def callRemote(self, name, *args, **kwargs):
        """
        Call the given remote method on the remote reference.

        @param name: name of the remote method
        @type  name: str

        @returns: whatever the remote reference's callRemote returns
        """
        # unpack the arguments; passing the tuple and dict themselves
        # would send them to the remote method as two positional
        # arguments
        return self.remote.callRemote(name, *args, **kwargs)

    def remote_sendFeedReply(self, fullFeedId):
        """
        Remote method telling us the feed is about to be sent; stop
        reading PB data and schedule the hand-over of the socket.

        @param fullFeedId: the full feed id of the feed
        @type  fullFeedId: str
        """
        t = self.remote.broker.transport
        # make sure we stop receiving PB messages
        self.debug('stop reading from transport')
        t.stopReading()
        reactor.callLater(0, self._doFeedTo, fullFeedId, t)

    def _doFeedTo(self, fullFeedId, t):
        """
        Take the file descriptor over from the PB transport and fire the
        deferred returned by requestFeed() with (feedId, fd).

        @param fullFeedId: the full feed id of the feed
        @type  fullFeedId: str
        @param t:          the transport of the PB connection
        """
        self.debug('flushing PB write queue')
        t.doWrite()
        self.debug('stop writing to transport')
        t.stopWriting()

        # make sure shutdown() is not called on the socket
        t.keepSocketAlive = True

        fd = os.dup(t.fileno())
        # Similar to feedserver._sendFeedReplyCb, but since we are in a
        # callLater, not doReadOrWrite, we call connectionLost directly
        # on the transport.
        t.connectionLost(failure.Failure(main.CONNECTION_DONE))

        # This medium object is of no use any more; drop our reference
        # to the remote so we can avoid cycles.
        self.setRemoteReference(None)

        (flowName, componentName, feedName) = common.parseFullFeedId(fullFeedId)
        feedId = common.feedId(componentName, feedName)

        self.debug('firing deferred with feedId %s on fd %d', feedId,
                   fd)
        self._feedToDeferred.callback((feedId, fd))
304