Package flumotion :: Package component :: Package misc :: Package httpfile :: Module httpfile
[hide private]

Source Code for Module flumotion.component.misc.httpfile.httpfile

  1  # -*- Mode: Python -*- 
  2  # vi:si:et:sw=4:sts=4:ts=4 
  3  # 
  4  # Flumotion - a streaming media server 
  5  # Copyright (C) 2004,2005,2006,2007 Fluendo, S.L. (www.fluendo.com). 
  6  # All rights reserved. 
  7   
  8  # This file may be distributed and/or modified under the terms of 
  9  # the GNU General Public License version 2 as published by 
 10  # the Free Software Foundation. 
 11  # This file is distributed without any warranty; without even the implied 
 12  # warranty of merchantability or fitness for a particular purpose. 
 13  # See "LICENSE.GPL" in the source distribution for more information. 
 14   
 15  # Licensees having purchased or holding a valid Flumotion Advanced 
 16  # Streaming Server license may use this file in accordance with the 
 17  # Flumotion Advanced Streaming Server Commercial License Agreement. 
 18  # See "LICENSE.Flumotion" in the source distribution for more information. 
 19   
 20  # Headers in this file shall remain intact. 
 21  import os 
 22  import time 
 23  import string 
 24   
 25  from flumotion.component import component 
 26  from flumotion.common import log, messages, errors, netutils, interfaces 
 27  from flumotion.component.component import moods 
 28  from flumotion.component.misc.porter import porterclient 
 29  from flumotion.component.base import http as httpbase 
 30  from twisted.web import resource, static, server, http 
 31  from twisted.web import error as weberror 
 32  from twisted.internet import defer, reactor, error 
 33  from flumotion.twisted import fdserver 
 34  from flumotion.twisted.compat import implements 
 35  from twisted.cred import credentials 
 36   
 37  from flumotion.component.misc.httpfile import file 
 38   
 39  from flumotion.common.messages import N_ 
 40  T_ = messages.gettexter('flumotion') 
 41   
42 -class CancellableRequest(server.Request):
43
44 - def __init__(self, channel, queued):
45 server.Request.__init__(self, channel, queued) 46 47 self._component = channel.factory.component 48 self._completed = False 49 self._transfer = None 50 51 self._bytes_written = 0 52 self._start_time = time.time() 53 self._lastTimeWritten = self._start_time 54 55 self._component.requestStarted(self)
56
57 - def write(self, data):
58 server.Request.write(self, data) 59 60 self._bytes_written += len(data) 61 self._lastTimeWritten = time.time()
62
63 - def finish(self):
64 server.Request.finish(self) 65 66 # We sent Connection: close, so we must close the connection 67 self.transport.loseConnection() 68 self.requestCompleted()
69
70 - def connectionLost(self, reason):
71 server.Request.connectionLost(self, reason) 72 self.requestCompleted()
73
74 - def requestCompleted(self):
75 if not self._completed: 76 self._component.requestFinished(self, self._bytes_written, 77 time.time() - self._start_time) 78 self._completed = True
79
80 -class Site(server.Site):
81 requestFactory = CancellableRequest 82
83 - def __init__(self, resource, component):
84 server.Site.__init__(self, resource) 85 86 self.component = component
87
88 -class HTTPFileMedium(component.BaseComponentMedium):
89 - def __init__(self, comp):
90 """ 91 @type comp: L{HTTPFileStreamer} 92 """ 93 component.BaseComponentMedium.__init__(self, comp)
94
95 - def authenticate(self, bouncerName, keycard):
96 """ 97 @rtype: L{twisted.internet.defer.Deferred} firing a keycard or None. 98 """ 99 return self.callRemote('authenticate', bouncerName, keycard)
100
101 - def removeKeycardId(self, bouncerName, keycardId):
102 """ 103 @rtype: L{twisted.internet.defer.Deferred} 104 """ 105 return self.callRemote('removeKeycardId', bouncerName, keycardId)
106
107 - def remote_getStreamData(self):
108 return self.comp.getStreamData()
109
110 - def remote_getLoadData(self):
111 return self.comp.getLoadData()
112
113 - def remote_updatePorterDetails(self, path, username, password):
114 return self.comp.updatePorterDetails(path, username, password)
115
116 - def remote_rotateLog(self):
117 return self.comp.rotateLog()
118
119 -class HTTPFileStreamer(component.BaseComponent, httpbase.HTTPAuthentication, 120 log.Loggable):
121 implements(interfaces.IStreamingComponent) 122 123 componentMediumClass = HTTPFileMedium 124 125 REQUEST_TIMEOUT = 30 # Time out requests after this many seconds of 126 # inactivity 127
128 - def __init__(self):
129 component.BaseComponent.__init__(self) 130 httpbase.HTTPAuthentication.__init__(self, self)
131
132 - def init(self):
133 self.mountPoint = None 134 self.type = None 135 self.port = None 136 self.hostname = None 137 self.loggers = [] 138 self.logfilter = None 139 140 self.description = 'On-Demand Flumotion Stream', 141 142 self._singleFile = False 143 self._connected_clients = [] 144 self._total_bytes_written = 0 145 146 self._pbclient = None 147 148 # store number of connected clients 149 self.uiState.addKey("connected-clients", 0) 150 self.uiState.addKey("bytes-transferred", 0)
151
152 - def getDescription(self):
153 return self.description
154
155 - def do_setup(self):
156 props = self.config['properties'] 157 158 mountPoint = props.get('mount-point', '') 159 if not mountPoint.startswith('/'): 160 mountPoint = '/' + mountPoint 161 self.mountPoint = mountPoint 162 self.hostname = props.get('hostname', None) 163 if not self.hostname: 164 self.hostname = netutils.guess_public_hostname() 165 166 self.filePath = props.get('path') 167 self.type = props.get('type', 'master') 168 self.port = props.get('port', 8801) 169 if self.type == 'slave': 170 # already checked for these in do_check 171 self._porterPath = props['porter-socket-path'] 172 self._porterUsername = props['porter-username'] 173 self._porterPassword = props['porter-password'] 174 self.loggers = \ 175 self.plugs['flumotion.component.plugs.loggers.Logger'] 176 177 if 'bouncer' in props: 178 self.setBouncerName(props['bouncer']) 179 if 'issuer-class' in props: 180 self.setIssuerClass(props['issuer-class']) 181 if 'ip-filter' in props: 182 filter = http.LogFilter() 183 for f in props['ip-filter']: 184 filter.addIPFilter(f) 185 self.logfilter = filter
186
187 - def do_stop(self):
188 if self.type == 'slave' and self._pbclient: 189 return self._pbclient.deregisterPath(self.mountPoint) 190 191 return component.BaseComponent.do_stop(self)
192
193 - def updatePorterDetails(self, path, username, password):
194 """ 195 Provide a new set of porter login information, for when we're in slave 196 mode and the porter changes. 197 If we're currently connected, this won't disconnect - it'll just change 198 the information so that next time we try and connect we'll use the 199 new ones 200 """ 201 if self.type == 'slave': 202 self._porterUsername = username 203 self._porterPassword = password 204 205 creds = credentials.UsernamePassword(self._porterUsername, 206 self._porterPassword) 207 self._pbclient.startLogin(creds, self.medium) 208 209 # If we've changed paths, we must do some extra work. 210 if path != self._porterPath: 211 self._porterPath = path 212 self._pbclient.stopTrying() # Stop trying to connect with the 213 # old connector. 214 self._pbclient.resetDelay() 215 reactor.connectWith( 216 fdserver.FDConnector, self._porterPath, 217 self._pbclient, 10, checkPID=False) 218 else: 219 raise errors.WrongStateError( 220 "Can't specify porter details in master mode")
221
222 - def do_start(self, *args, **kwargs):
223 #root = HTTPRoot() 224 root = resource.Resource() 225 # TwistedWeb wants the child path to not include the leading / 226 mount = self.mountPoint[1:] 227 # split path on / and add iteratively twisted.web resources 228 children = string.split(mount, '/') 229 current_resource = root 230 for child in children[:-1]: 231 res = resource.Resource() 232 current_resource.putChild(child, res) 233 current_resource = res 234 fileResource = file.File(self.filePath, self) 235 self.debug("Putting File resource at %r", children[-1:][0]) 236 current_resource.putChild(children[-1:][0], fileResource) 237 238 reactor.callLater(self.REQUEST_TIMEOUT, self._timeoutRequests) 239 240 d = defer.Deferred() 241 if self.type == 'slave': 242 # Streamer is slaved to a porter. 243 if self._singleFile: 244 self._pbclient = porterclient.HTTPPorterClientFactory( 245 Site(root, self), [self.mountPoint], d) 246 else: 247 self._pbclient = porterclient.HTTPPorterClientFactory( 248 Site(root, self), [], d, 249 prefixes=[self.mountPoint]) 250 creds = credentials.UsernamePassword(self._porterUsername, 251 self._porterPassword) 252 self._pbclient.startLogin(creds, self.medium) 253 self.debug("Starting porter login!") 254 # This will eventually cause d to fire 255 reactor.connectWith(fdserver.FDConnector, self._porterPath, 256 self._pbclient, 10, checkPID=False) 257 else: 258 # File Streamer is standalone. 259 try: 260 self.debug('Listening on %s' % self.port) 261 iface = "" 262 reactor.listenTCP(self.port, Site(root, self), 263 interface=iface) 264 except error.CannotListenError: 265 t = 'Port %d is not available.' % self.port 266 self.warning(t) 267 m = messages.Error(T_(N_( 268 "Network error: TCP port %d is not available."), self.port)) 269 self.addMessage(m) 270 self.setMood(moods.sad) 271 return defer.fail(errors.ComponentStartHandledError(t)) 272 # fire callback so component gets happy 273 d.callback(None) 274 # we are responsible for setting component happy 275 def setComponentHappy(result): 276 self.setMood(moods.happy) 277 return result
278 d.addCallback(setComponentHappy) 279 return d
280
281 - def do_check(self):
282 props = self.config['properties'] 283 self.fixRenamedProperties(props, [ 284 ('issuer', 'issuer-class'), 285 ('porter_socket_path', 'porter-socket-path'), 286 ('porter_username', 'porter-username'), 287 ('porter_password', 'porter-password'), 288 ('mount_point', 'mount-point') 289 ]) 290 291 if props.get('type', 'master') == 'slave': 292 for k in 'socket-path', 'username', 'password': 293 if not 'porter-' + k in props: 294 msg = 'slave mode, missing required property porter-%s' % k 295 return defer.fail(errors.ConfigError(msg)) 296 else: 297 if not 'port' in props: 298 msg = "master mode, missing required property 'port'" 299 return defer.fail(errors.ConfigError(msg)) 300 301 if props.get('mount-point', None) is not None: 302 if props['mount-point'] == '/': 303 return defer.fail(errors.ConfigError( 304 "A mount-point of / is not supported in this release")) 305 306 path = props.get('path', None) 307 if path is None: 308 msg = "missing required property 'path'" 309 return defer.fail(errors.ConfigError(msg)) 310 if os.path.isfile(path): 311 self._singleFile = True 312 elif os.path.isdir(path): 313 self._singleFile = False 314 else: 315 msg = "the file or directory specified in 'path': %s does " \ 316 "not exist or is neither a file nor directory" % path 317 return defer.fail(errors.ConfigError(msg))
318
319 - def _timeoutRequests(self):
320 now = time.time() 321 for request in self._connected_clients: 322 if now - request._lastTimeWritten > self.REQUEST_TIMEOUT: 323 self.debug("Timing out connection") 324 # Apparently this is private API. However, calling 325 # loseConnection is not sufficient - it won't drop the 326 # connection until the send queue is empty, which might never 327 # happen for an uncooperative client 328 request.channel.transport.connectionLost( 329 errors.TimeoutException()) 330 331 reactor.callLater(self.REQUEST_TIMEOUT, self._timeoutRequests)
332
333 - def requestStarted(self, request):
334 self._connected_clients.append(request) 335 self.uiState.set("connected-clients", self._connected_clients)
336
337 - def requestFinished(self, request, bytesWritten, timeConnected):
338 headers = request.getAllHeaders() 339 340 ip = request.getClientIP() 341 if not self.logfilter or not self.logfilter.isInRange(ip): 342 args = {'ip': ip, 343 'time': time.gmtime(), 344 'method': request.method, 345 'uri': request.uri, 346 'username': '-', # FIXME: put the httpauth name 347 'get-parameters': request.args, 348 'clientproto': request.clientproto, 349 'response': request.code, 350 'bytes-sent': bytesWritten, 351 'referer': headers.get('referer', None), 352 'user-agent': headers.get('user-agent', None), 353 'time-connected': timeConnected} 354 355 for logger in self.loggers: 356 logger.event('http_session_completed', args) 357 358 self._connected_clients.remove(request) 359 360 self.uiState.set("connected-clients", len(self._connected_clients)) 361 362 self._total_bytes_written += bytesWritten 363 self.uiState.set("bytes-transferred", self._total_bytes_written)
364
365 - def getUrl(self):
366 return "http://%s:%d%s" % (self.hostname, self.port, self.mountPoint)
367
368 - def getStreamData(self):
369 socket = 'flumotion.component.plugs.streamdata.StreamDataProvider' 370 if self.plugs[socket]: 371 plug = self.plugs[socket][-1] 372 return plug.getStreamData() 373 else: 374 return { 375 'protocol': 'HTTP', 376 'description': self.description, 377 'url' : self.getUrl() 378 }
379
380 - def getLoadData(self):
381 """ 382 Return a tuple (deltaadded, deltaremoved, bytes_transferred, 383 current_clients, current_load) of our current bandwidth and user values. 384 The deltas and current_load are NOT currently implemented here, we set 385 them as zero. 386 """ 387 bytesTransferred = self._total_bytes_written 388 for request in self._connected_clients: 389 if request._transfer: 390 bytesTransferred += request._transfer.bytesSent 391 392 return (0, 0, bytesTransferred, len(self._connected_clients), 0)
393 394 # Override HTTPAuthentication methods
395 - def authenticateKeycard(self, bouncerName, keycard):
396 return self.medium.authenticate(bouncerName, keycard)
397
398 - def cleanupKeycard(self, bouncerName, keycard):
399 return self.medium.removeKeycardId(bouncerName, keycard.id)
400
401 - def clientDone(self, fd):
402 # TODO: implement this properly. 403 self.warning ("Expiring clients is not implemented for static " 404 "fileserving")
405
406 - def rotateLog(self):
407 """ 408 Close the logfile, then reopen using the previous logfilename 409 """ 410 for logger in self.loggers: 411 self.debug('rotating logger %r' % logger) 412 logger.rotate()
413