Package flumotion :: Package component :: Package misc :: Package httpserver :: Module httpserver
[hide private]

Source Code for Module flumotion.component.misc.httpserver.httpserver

  1  # -*- Mode: Python; test-case-name: flumotion.test.test_component_httpserver -*- 
  2  # vi:si:et:sw=4:sts=4:ts=4 
  3  # 
  4  # Flumotion - a streaming media server 
  5  # Copyright (C) 2004,2005,2006,2007,2008 Fluendo, S.L. (www.fluendo.com). 
  6  # All rights reserved. 
  7   
  8  # This file may be distributed and/or modified under the terms of 
  9  # the GNU General Public License version 2 as published by 
 10  # the Free Software Foundation. 
 11  # This file is distributed without any warranty; without even the implied 
 12  # warranty of merchantability or fitness for a particular purpose. 
 13  # See "LICENSE.GPL" in the source distribution for more information. 
 14   
 15  # Licensees having purchased or holding a valid Flumotion Advanced 
 16  # Streaming Server license may use this file in accordance with the 
 17  # Flumotion Advanced Streaming Server Commercial License Agreement. 
 18  # See "LICENSE.Flumotion" in the source distribution for more information. 
 19   
 20  # Headers in this file shall remain intact. 
 21   
 22  import os 
 23  import string 
 24  import time 
 25   
 26  from twisted.web import server, http 
 27  from twisted.web.resource import Resource 
 28  from twisted.internet import defer, reactor, error 
 29  from twisted.cred import credentials 
 30  from zope.interface import implements 
 31   
 32  from flumotion.common import log, messages, errors, netutils, interfaces 
 33  from flumotion.common.i18n import N_, gettexter 
 34  from flumotion.component import component 
 35  from flumotion.component.base import http as httpbase 
 36  from flumotion.component.component import moods 
 37  from flumotion.component.misc.httpserver import httpfile 
 38  from flumotion.component.misc.porter import porterclient 
 39  from flumotion.twisted import fdserver 
 40   
 41  __version__ = "$Rev: 6984 $" 
 42  T_ = gettexter() 
 43   
 44   
class CancellableRequest(server.Request):
    """
    HTTP request that keeps per-request statistics (bytes written, start
    time, time of last write) and tells the owning component when it
    starts and when it is completed.

    The component is looked up through C{channel.factory.component}.
    """

    def __init__(self, channel, queued):
        """
        @param channel: the HTTP channel this request arrived on; its
                        factory must expose a C{component} attribute
        @param queued:  passed through to L{server.Request.__init__}
        """
        server.Request.__init__(self, channel, queued)

        now = time.time()

        self._component = channel.factory.component
        self._completed = False
        # read by the component's getLoadData() when it is set
        self._transfer = None

        self._bytes_written = 0
        self._start_time = now
        self._lastTimeWritten = now

        # Several things are indexed by fd.  Remember it here, because
        # connectionLost() and finish() can run after the transport's fd
        # has been closed and fileno() is no longer usable.
        self._fd = self.transport.fileno()

        self._component.requestStarted(self)

    def write(self, data):
        """
        Write data to the client, updating the byte counter and the
        last-written timestamp.
        """
        server.Request.write(self, data)

        self._bytes_written += len(data)
        self._lastTimeWritten = time.time()

    def finish(self):
        """
        Finish the request, close the connection and report completion.
        """
        # This may be called with the transport's fd already closed, if
        # the connection was lost early in the request handling.
        server.Request.finish(self)
        # We sent Connection: close, so we must close the connection
        self.transport.loseConnection()
        self.requestCompleted(self._fd)

    def connectionLost(self, reason):
        """
        Report completion when the connection goes away.
        """
        # By the time we get here self.transport.fileno() is not valid
        # anymore, hence the stored fd number.
        server.Request.connectionLost(self, reason)
        self.requestCompleted(self._fd)

    def requestCompleted(self, fd):
        """
        Notify the component, at most once, that this request is done.

        @param fd: the file descriptor the request was registered under
        """
        if self._completed:
            return

        self._component.requestFinished(
            self, self._bytes_written, time.time() - self._start_time, fd)
        self._completed = True
91 92
class Site(server.Site):
    """
    Site that creates L{CancellableRequest} objects and carries a
    reference to the component serving it.
    """

    requestFactory = CancellableRequest

    def __init__(self, resource, component):
        """
        @param resource:  root resource, handed to L{server.Site.__init__}
        @param component: the component owning this site; stored as
                          C{self.component}
        """
        server.Site.__init__(self, resource)

        self.component = component
100 101
class HTTPFileMedium(component.BaseComponentMedium):
    """
    Medium for the HTTP file streamer component.

    The authentication helpers forward to the remote side through
    C{callRemote}; the C{remote_*} methods delegate to the component.
    """

    def __init__(self, comp):
        """
        @type comp: L{HTTPFileStreamer}
        """
        component.BaseComponentMedium.__init__(self, comp)

    def authenticate(self, bouncerName, keycard):
        """
        Forward an 'authenticate' call for the given bouncer and keycard.

        @rtype: L{twisted.internet.defer.Deferred} firing a keycard or None.
        """
        return self.callRemote('authenticate', bouncerName, keycard)

    def keepAlive(self, bouncerName, issuerName, ttl):
        """
        Forward a 'keepAlive' call for the given bouncer, issuer and ttl.

        @rtype: L{twisted.internet.defer.Deferred}
        """
        return self.callRemote('keepAlive', bouncerName, issuerName, ttl)

    def removeKeycardId(self, bouncerName, keycardId):
        """
        Forward a 'removeKeycardId' call for the given bouncer and
        keycard id.

        @rtype: L{twisted.internet.defer.Deferred}
        """
        return self.callRemote('removeKeycardId', bouncerName, keycardId)

    def remote_expireKeycard(self, keycardId):
        """
        Delegate to the component's httpauth C{expireKeycard}.
        """
        return self.comp.httpauth.expireKeycard(keycardId)

    def remote_getStreamData(self):
        """
        Delegate to the component's C{getStreamData}.
        """
        return self.comp.getStreamData()

    def remote_getLoadData(self):
        """
        Delegate to the component's C{getLoadData}.
        """
        return self.comp.getLoadData()

    def remote_updatePorterDetails(self, path, username, password):
        """
        Delegate to the component's C{updatePorterDetails}.
        """
        return self.comp.updatePorterDetails(path, username, password)

    def remote_rotateLog(self):
        """
        Delegate to the component's C{rotateLog}.
        """
        return self.comp.rotateLog()
141 142
class HTTPFileStreamer(component.BaseComponent, log.Loggable):
    """
    Component serving a file, a directory, or a resource supplied through
    L{setRootResource} over HTTP.

    In 'master' mode (the default) it listens on its own TCP port; in
    'slave' mode it logs in to a porter and registers its mount point
    there.
    """
    implements(interfaces.IStreamingComponent)

    componentMediumClass = HTTPFileMedium

    REQUEST_TIMEOUT = 30 # Time out requests after this many seconds of
                         # inactivity

    def init(self):
        """
        Initialise instance state and register the uiState keys.
        """
        self.mountPoint = None
        self.type = None
        self.port = None
        self.hostname = None
        self._rateControlPlug = None
        self._loggers = []
        self._logfilter = None
        self.httpauth = None

        self._description = 'On-Demand Flumotion Stream'

        self._singleFile = False
        self._connected_clients = {} # fd -> CancellableRequest
        self._total_bytes_written = 0

        self._pbclient = None

        self._twistedPort = None
        self._timeoutRequestsCallLater = None

        # fd -> Deferred fired once the request on that fd has finished
        self._pendingDisconnects = {}
        self._rootResource = None

        # FIXME: maybe we want to allow the configuration to specify
        # additional mime -> File class mapping ?
        self._mimeToResource = {
            'video/x-flv': httpfile.FLVFile,
        }

        # store number of connected clients
        self.uiState.addKey("connected-clients", 0)
        self.uiState.addKey("bytes-transferred", 0)
        self.uiState.addKey('stream-url', None)

    def do_check(self):
        """
        Validate the configured properties.

        Renames old-style property names, checks that slave mode has all
        porter properties, and records whether 'path' is a single file or
        a directory.

        @returns: None when everything is fine, or a failed
                  L{twisted.internet.defer.Deferred} wrapping an
                  L{errors.ConfigError}.
        """
        props = self.config['properties']
        self.fixRenamedProperties(props, [
            ('issuer', 'issuer-class'),
            ('porter_socket_path', 'porter-socket-path'),
            ('porter_username', 'porter-username'),
            ('porter_password', 'porter-password'),
            ('mount_point', 'mount-point')
            ])

        if props.get('type', 'master') == 'slave':
            for k in 'socket-path', 'username', 'password':
                if 'porter-' + k not in props:
                    msg = 'slave mode, missing required property porter-%s' % k
                    return defer.fail(errors.ConfigError(msg))

        path = props.get('path', None)
        if path is None:
            return
        if os.path.isfile(path):
            self._singleFile = True
        elif os.path.isdir(path):
            self._singleFile = False
        else:
            msg = "the file or directory specified in 'path': %s does " \
                "not exist or is neither a file nor directory" % path
            return defer.fail(errors.ConfigError(msg))

    def have_properties(self, props):
        """
        Copy the configuration properties onto the component and set up
        authentication, log filtering and the optional rate controller.

        @param props: the component's properties
        @type  props: dict
        """
        desc = props.get('description', None)
        if desc:
            self._description = desc

        # always make sure the mount point starts with /
        mountPoint = props.get('mount-point', '/')
        if not mountPoint.startswith('/'):
            mountPoint = '/' + mountPoint
        self.mountPoint = mountPoint
        self.hostname = props.get('hostname', None)
        if not self.hostname:
            self.hostname = netutils.guess_public_hostname()

        self.filePath = props.get('path')
        self.type = props.get('type', 'master')
        self.port = props.get('port', 8801)
        if self.type == 'slave':
            # already checked for these in do_check
            self._porterPath = props['porter-socket-path']
            self._porterUsername = props['porter-username']
            self._porterPassword = props['porter-password']
        self._loggers = \
            self.plugs.get('flumotion.component.plugs.loggers.Logger', [])

        self.httpauth = httpbase.HTTPAuthentication(self)
        if 'bouncer' in props:
            self.httpauth.setBouncerName(props['bouncer'])
        if 'issuer-class' in props:
            self.httpauth.setIssuerClass(props['issuer-class'])
        if 'ip-filter' in props:
            # LogFilter comes from flumotion.component.base.http, imported
            # as httpbase; the bare name 'http' in this module is
            # twisted.web.http, which has no LogFilter.
            logFilter = httpbase.LogFilter()
            for f in props['ip-filter']:
                logFilter.addIPFilter(f)
            self._logfilter = logFilter

        socket = 'flumotion.component.misc.httpserver.ratecontroller.RateController'
        plugs = self.plugs.get(socket, [])
        if plugs:
            # Rate controller factory plug; only one supported.
            self._rateControlPlug = plugs[-1]

        # Update uiState
        self.uiState.set('stream-url', self.getUrl())

    def do_setup(self):
        """
        Build the site and start serving, either through a porter (slave
        mode) or by listening on our own TCP port.

        @raises errors.WrongStateError: if neither a root resource nor a
                                        'path' property has been set
        @rtype: L{twisted.internet.defer.Deferred}
        """
        self.have_properties(self.config['properties'])

        root = self._rootResource
        if root is None:
            root = self._getDefaultRootResource()

        if root is None:
            raise errors.WrongStateError(
                "a resource or path property must be set")

        site = Site(root, self)
        self._timeoutRequestsCallLater = reactor.callLater(
            self.REQUEST_TIMEOUT, self._timeoutRequests)

        d = defer.Deferred()
        if self.type == 'slave':
            # Streamer is slaved to a porter.
            if self._singleFile:
                self._pbclient = porterclient.HTTPPorterClientFactory(
                    site, [self.mountPoint], d)
            else:
                self._pbclient = porterclient.HTTPPorterClientFactory(
                    site, [], d,
                    prefixes=[self.mountPoint])
            creds = credentials.UsernamePassword(self._porterUsername,
                self._porterPassword)
            self._pbclient.startLogin(creds, self._pbclient.medium)
            self.debug("Starting porter login!")
            # This will eventually cause d to fire
            reactor.connectWith(fdserver.FDConnector, self._porterPath,
                self._pbclient, 10, checkPID=False)
        else:
            # File Streamer is standalone.
            try:
                self.debug('Going to listen on port %d' % self.port)
                iface = ""
                # we could be listening on port 0, in which case we need
                # to figure out the actual port we listen on
                self._twistedPort = reactor.listenTCP(self.port,
                    site, interface=iface)
                self.port = self._twistedPort.getHost().port
                self.debug('Listening on port %d' % self.port)
            except error.CannotListenError:
                t = 'Port %d is not available.' % self.port
                self.warning(t)
                m = messages.Error(T_(N_(
                    "Network error: TCP port %d is not available."), self.port))
                self.addMessage(m)
                self.setMood(moods.sad)
                return defer.fail(errors.ComponentStartHandledError(t))
            # fire callback so component gets happy
            d.callback(None)

        # we are responsible for setting component happy
        def setComponentHappy(result):
            self.httpauth.scheduleKeepAlive()
            self.setMood(moods.happy)
            return result
        d.addCallback(setComponentHappy)
        return d

    def do_stop(self):
        """
        Stop keep-alives and the request timeout, stop listening,
        disconnect all clients and deregister from the porter.

        @rtype: L{twisted.internet.defer.DeferredList}
        """
        if self.httpauth:
            self.httpauth.stopKeepAlive()
        if self._timeoutRequestsCallLater:
            self._timeoutRequestsCallLater.cancel()
            self._timeoutRequestsCallLater = None

        portStopped = None
        if self._twistedPort:
            # stopListening() may return a Deferred; keep it so that
            # stopping also waits for the port to be released.
            portStopped = defer.maybeDeferred(
                self._twistedPort.stopListening)

        l = [self.remove_all_clients()]
        if self.type == 'slave' and self._pbclient:
            if self._singleFile:
                l.append(self._pbclient.deregisterPath(self.mountPoint))
            else:
                l.append(self._pbclient.deregisterPrefix(self.mountPoint))
        if portStopped is not None:
            # appended last so the positions of the other results in the
            # DeferredList are unchanged
            l.append(portStopped)
        return defer.DeferredList(l)

    def updatePorterDetails(self, path, username, password):
        """
        Provide a new set of porter login information, for when we're in slave
        mode and the porter changes.
        If we're currently connected, this won't disconnect - it'll just change
        the information so that next time we try and connect we'll use the
        new ones
        @param path: new path
        @param username: new username
        @param password: new password
        @raises errors.WrongStateError: if the component is not in slave
                                        mode
        """
        if self.type != 'slave':
            raise errors.WrongStateError(
                "Can't specify porter details in master mode")

        self._porterUsername = username
        self._porterPassword = password

        creds = credentials.UsernamePassword(self._porterUsername,
            self._porterPassword)
        # Log in with the porter client's own medium, exactly as the
        # initial login in do_setup() does.
        self._pbclient.startLogin(creds, self._pbclient.medium)

        self._updatePath(path)

    def _updatePath(self, path):
        """
        Switch to a new porter socket path and reconnect; does nothing
        when the path is unchanged.

        @param path: new porter socket path
        """
        # If we've changed paths, we must do some extra work.
        if path == self._porterPath:
            return
        self._porterPath = path

        # Stop trying to connect with the old connector.
        self._pbclient.stopTrying()

        self._pbclient.resetDelay()
        reactor.connectWith(fdserver.FDConnector, self._porterPath,
            self._pbclient, 10, checkPID=False)

    def _timeoutRequests(self):
        """
        Drop every connection that has not been written to for more than
        REQUEST_TIMEOUT seconds, then reschedule this check.
        """
        now = time.time()
        # Iterate over a snapshot: dropping a connection can remove the
        # request from self._connected_clients through requestFinished().
        for request in list(self._connected_clients.values()):
            if now - request._lastTimeWritten > self.REQUEST_TIMEOUT:
                self.debug("Timing out connection")
                # Apparently this is private API. However, calling
                # loseConnection is not sufficient - it won't drop the
                # connection until the send queue is empty, which might never
                # happen for an uncooperative client
                request.channel.transport.connectionLost(
                    errors.TimeoutException())

        self._timeoutRequestsCallLater = reactor.callLater(
            self.REQUEST_TIMEOUT, self._timeoutRequests)

    def _getDefaultRootResource(self):
        """
        Build the root resource for the configured 'path'.

        @returns: the root resource, or None when no path is configured
        """
        if self.filePath is None:
            return None

        self.debug('Starting with mount point "%s"' % self.mountPoint)
        factory = httpfile.MimedFileFactory(self.httpauth,
            mimeToResource=self._mimeToResource,
            rateController=self._rateControlPlug)

        root = factory.create(self.filePath)
        if self.mountPoint != '/':
            root = self._createRootResourceForPath(self.mountPoint, root)

        return root

    def _createRootResourceForPath(self, path, fileResource):
        """
        Create a resource tree in which fileResource sits at the given
        path, with plain L{Resource} objects for the intermediate
        segments.

        @param path:         mount path, starting with '/'
        @param fileResource: the resource to place at the end of the path
        @returns: the root of the newly created tree
        """
        if path.endswith('/'):
            path = path[:-1]

        root = Resource()
        # str.split instead of the deprecated string.split function
        children = path[1:].split('/')
        parent = root
        for child in children[:-1]:
            resource = Resource()
            self.debug("Putting Resource at %s", child)
            parent.putChild(child, resource)
            parent = resource
        self.debug("Putting resource %r at %r", fileResource, children[-1])
        parent.putChild(children[-1], fileResource)
        return root

    def remove_client(self, fd):
        """
        Remove a client when requested.

        Used by keycard expiry.

        @param fd: file descriptor of the client to disconnect
        """
        if fd in self._connected_clients:
            request = self._connected_clients[fd]
            self.debug("Removing client for fd %d", fd)
            request.unregisterProducer()
            request.channel.transport.loseConnection()
        else:
            self.debug("No client with fd %d found", fd)

    def remove_all_clients(self):
        """
        Disconnect every connected client.

        @returns: a L{twisted.internet.defer.DeferredList} over one
                  Deferred per client, each fired from requestFinished()
        """
        l = []
        # Iterate over a snapshot: if a connection is torn down
        # synchronously, requestFinished() deletes its entry from
        # self._connected_clients while we are still looping.
        for fd, request in list(self._connected_clients.items()):
            d = defer.Deferred()
            self._pendingDisconnects[fd] = d
            l.append(d)

            request.unregisterProducer()
            request.channel.transport.loseConnection()

        self.debug("Waiting for %d clients to finish", len(l))
        return defer.DeferredList(l)

    def requestStarted(self, request):
        """
        Register a newly started request under its transport's fd.
        """
        fd = request.transport.fileno() # ugly!
        self._connected_clients[fd] = request
        self.uiState.set("connected-clients", len(self._connected_clients))

    def requestFinished(self, request, bytesWritten, timeConnected, fd):
        """
        Clean up after a finished request: release its authentication,
        log the session (unless the client IP is filtered out), update
        the statistics and fire any pending disconnect Deferred.

        @param request:       the finished request
        @param bytesWritten:  number of bytes written for this request
        @param timeConnected: seconds the request was active
        @param fd:            file descriptor the request was registered
                              under
        """
        self.httpauth.cleanupAuth(fd)
        headers = request.getAllHeaders()

        ip = request.getClientIP()
        if not self._logfilter or not self._logfilter.isInRange(ip):
            args = {'ip': ip,
                    'time': time.gmtime(),
                    'method': request.method,
                    'uri': request.uri,
                    'username': '-', # FIXME: put the httpauth name
                    'get-parameters': request.args,
                    'clientproto': request.clientproto,
                    'response': request.code,
                    'bytes-sent': bytesWritten,
                    'referer': headers.get('referer', None),
                    'user-agent': headers.get('user-agent', None),
                    'time-connected': timeConnected}

            l = []
            for logger in self._loggers:
                l.append(defer.maybeDeferred(
                    logger.event, 'http_session_completed', args))
            d = defer.DeferredList(l)
        else:
            d = defer.succeed(None)

        del self._connected_clients[fd]

        self.uiState.set("connected-clients", len(self._connected_clients))

        self._total_bytes_written += bytesWritten
        self.uiState.set("bytes-transferred", self._total_bytes_written)

        def firePendingDisconnect(_):
            self.debug("Logging completed")
            if fd in self._pendingDisconnects:
                pending = self._pendingDisconnects.pop(fd)
                self.debug("Firing pending disconnect deferred")
                pending.callback(None)
        d.addCallback(firePendingDisconnect)

    def getDescription(self):
        """
        @returns: the description of the stream
        """
        return self._description

    def getUrl(self):
        """
        @returns: the URL built from hostname, port and mount point
        """
        return "http://%s:%d%s" % (self.hostname, self.port, self.mountPoint)

    def getStreamData(self):
        """
        Return the stream data from the last StreamDataProvider plug, or
        a default dict with 'protocol', 'description' and 'url' keys when
        no such plug is present.
        """
        socket = 'flumotion.component.plugs.streamdata.StreamDataProvider'
        # .get() so that a missing socket is treated like an empty one
        # instead of raising KeyError
        plugs = self.plugs.get(socket, [])
        if plugs:
            plug = plugs[-1]
            return plug.getStreamData()
        else:
            return {
                'protocol': 'HTTP',
                'description': self._description,
                'url' : self.getUrl()
                }

    def getLoadData(self):
        """
        Return a tuple (deltaadded, deltaremoved, bytes_transferred,
        current_clients, current_load) of our current bandwidth and user values.
        The deltas and current_load are NOT currently implemented here, we set
        them as zero.
        """
        bytesTransferred = self._total_bytes_written
        for request in self._connected_clients.values():
            if request._transfer:
                bytesTransferred += request._transfer.bytesWritten

        return (0, 0, bytesTransferred, len(self._connected_clients), 0)

    def rotateLog(self):
        """
        Close the logfile, then reopen using the previous logfilename
        """
        for logger in self._loggers:
            self.debug('rotating logger %r' % logger)
            logger.rotate()

    def setRootResource(self, resource):
        """Attaches a root resource to this component. The root resource is the
        one which will be used when accessing the mount point.
        This is normally called from a plugs start() method.
        @param resource: root resource
        @type resource: L{twisted.web.resource.Resource}
        """
        # getMountPoint() returns the raw property, which may be missing
        # or lack the leading slash; normalise it the same way
        # have_properties() does before building the resource tree.
        mountPoint = self.getMountPoint() or '/'
        if not mountPoint.startswith('/'):
            mountPoint = '/' + mountPoint

        rootResource = self._createRootResourceForPath(
            mountPoint, resource)

        self._rootResource = rootResource

    def getMountPoint(self):
        """Get the mount point of this component
        @returns: the mount point as configured, or None if the
                  'mount-point' property is not set
        """
        # This is called early, before do_setup()
        return self.config['properties'].get('mount-point')