Package flumotion :: Package component :: Package consumers :: Package httpstreamer :: Module resources
[hide private]

Source Code for Module flumotion.component.consumers.httpstreamer.resources

  1  # -*- Mode: Python; test-case-name: flumotion.test.test_http -*- 
  2  # vi:si:et:sw=4:sts=4:ts=4 
  3  # 
  4  # Flumotion - a streaming media server 
  5  # Copyright (C) 2004,2005,2006,2007 Fluendo, S.L. (www.fluendo.com). 
  6  # All rights reserved. 
  7   
  8  # This file may be distributed and/or modified under the terms of 
  9  # the GNU General Public License version 2 as published by 
 10  # the Free Software Foundation. 
 11  # This file is distributed without any warranty; without even the implied 
 12  # warranty of merchantability or fitness for a particular purpose. 
 13  # See "LICENSE.GPL" in the source distribution for more information. 
 14   
 15  # Licensees having purchased or holding a valid Flumotion Advanced 
 16  # Streaming Server license may use this file in accordance with the 
 17  # Flumotion Advanced Streaming Server Commercial License Agreement. 
 18  # See "LICENSE.Flumotion" in the source distribution for more information. 
 19   
 20  # Headers in this file shall remain intact. 
 21   
 22  import os 
 23  import socket 
 24  import time 
 25  import errno 
 26  import string 
 27  import resource 
 28  import fcntl 
 29   
 30  import gst 
 31   
 32  try: 
 33      from twisted.web import http 
 34  except ImportError: 
 35      from twisted.protocols import http 
 36   
 37  from twisted.web import server, resource as web_resource 
 38  from twisted.internet import reactor, defer 
 39  from twisted.python import reflect 
 40   
 41  from flumotion.configure import configure 
 42  from flumotion.common import errors 
 43   
 44  from flumotion.common import common, log, keycards 
 45   
 46  from flumotion.component.base import http as httpbase 
 47   
 48  __all__ = ['HTTPStreamingResource', 'MultifdSinkStreamer'] 
 49  __version__ = "$Rev: 6628 $" 
 50   
 51  HTTP_NAME = 'FlumotionHTTPServer' 
 52  HTTP_VERSION = configure.version 
 53   
 54  ERROR_TEMPLATE = """<!doctype html public "-//IETF//DTD HTML 2.0//EN"> 
 55  <html> 
 56  <head> 
 57    <title>%(code)d %(error)s</title> 
 58  </head> 
 59  <body> 
 60  <h2>%(code)d %(error)s</h2> 
 61  </body> 
 62  </html> 
 63  """ 
 64   
 65  HTTP_SERVER = '%s/%s' % (HTTP_NAME, HTTP_VERSION) 
 66   
 67  ### the Twisted resource that handles the base URL 
68 -class HTTPStreamingResource(web_resource.Resource, log.Loggable):
69 70 __reserve_fds__ = 50 # number of fd's to reserve for non-streaming 71 72 logCategory = 'httpstreamer' 73 74 # IResource interface variable; True means it will not chain requests 75 # further down the path to other resource providers through 76 # getChildWithDefault 77 isLeaf = True 78
79 - def __init__(self, streamer, httpauth):
80 """ 81 @param streamer: L{MultifdSinkStreamer} 82 """ 83 self.streamer = streamer 84 self.httpauth = httpauth 85 86 self._requests = {} # request fd -> Request 87 88 self.maxclients = self.getMaxAllowedClients(-1) 89 self.maxbandwidth = -1 # not limited by default 90 91 # If set, a URL to redirect a user to when the limits above are reached 92 self._redirectOnFull = None 93 94 self._removing = {} # Optional deferred notification of client removals. 95 96 self.loggers = \ 97 streamer.plugs['flumotion.component.plugs.loggers.Logger'] 98 99 self.logfilter = None 100 101 web_resource.Resource.__init__(self)
102
103 - def clientRemoved(self, sink, fd, reason, stats):
104 # this is the callback attached to our flumotion component, 105 # not the GStreamer element 106 if fd in self._requests: 107 request = self._requests[fd] 108 self._removeClient(request, fd, stats) 109 else: 110 self.warning('[fd %5d] not found in _requests' % fd)
111
112 - def removeAllClients(self):
113 """ 114 Start to remove all the clients connected (this will complete 115 asynchronously from another thread) 116 117 Returns a deferred that will fire once they're all removed. 118 """ 119 l = [] 120 for fd in self._requests: 121 self._removing[fd] = defer.Deferred() 122 l.append(self._removing[fd]) 123 self.streamer.remove_client(fd) 124 125 return defer.DeferredList(l)
126
127 - def setRoot(self, path):
128 self.putChild(path, self)
129
130 - def setLogFilter(self, logfilter):
131 self.logfilter = logfilter
132
133 - def rotateLogs(self):
134 """ 135 Close the logfile, then reopen using the previous logfilename 136 """ 137 for logger in self.loggers: 138 self.debug('rotating logger %r' % logger) 139 logger.rotate()
140
141 - def logWrite(self, fd, ip, request, stats):
142 143 headers = request.getAllHeaders() 144 145 if stats: 146 bytes_sent = stats[0] 147 time_connected = int(stats[3] / gst.SECOND) 148 else: 149 bytes_sent = -1 150 time_connected = -1 151 152 args = {'ip': ip, 153 'time': time.gmtime(), 154 'method': request.method, 155 'uri': request.uri, 156 'username': '-', # FIXME: put the httpauth name 157 'get-parameters': request.args, 158 'clientproto': request.clientproto, 159 'response': request.code, 160 'bytes-sent': bytes_sent, 161 'referer': headers.get('referer', None), 162 'user-agent': headers.get('user-agent', None), 163 'time-connected': time_connected} 164 165 l = [] 166 for logger in self.loggers: 167 l.append(defer.maybeDeferred( 168 logger.event, 'http_session_completed', args)) 169 170 return defer.DeferredList(l)
171
172 - def setUserLimit(self, limit):
173 self.info('setting maxclients to %d' % limit) 174 self.maxclients = self.getMaxAllowedClients(limit) 175 # Log what we actually managed to set it to. 176 self.info('set maxclients to %d' % self.maxclients)
177
178 - def setBandwidthLimit(self, limit):
179 self.maxbandwidth = limit 180 self.info("set maxbandwidth to %d", self.maxbandwidth)
181
182 - def setRedirectionOnLimits(self, url):
183 self._redirectOnFull = url
184 185 # FIXME: rename to writeHeaders
186 - def _writeHeaders(self, request):
187 """ 188 Write out the HTTP headers for the incoming HTTP request. 189 190 @rtype: boolean 191 @returns: whether or not the file descriptor can be used further. 192 """ 193 fd = request.transport.fileno() 194 fdi = request.fdIncoming 195 196 # the fd could have been closed, in which case it will be -1 197 if fd == -1: 198 self.info('[fd %5d] Client gone before writing header' % fdi) 199 # FIXME: do this ? del request 200 return False 201 if fd != request.fdIncoming: 202 self.warning('[fd %5d] does not match current fd %d' % (fdi, fd)) 203 # FIXME: do this ? del request 204 return False 205 206 headers = [] 207 208 def setHeader(field, name): 209 headers.append('%s: %s\r\n' % (field, name))
210 211 # Mimic Twisted as close as possible 212 content = self.streamer.get_content_type() 213 setHeader('Server', HTTP_SERVER) 214 setHeader('Date', http.datetimeToString()) 215 setHeader('Cache-Control', 'no-cache') 216 setHeader('Cache-Control', 'private') 217 setHeader('Content-type', content) 218 219 # ASF needs a Pragma header for live broadcasts 220 # Apparently ASF breaks on WMP port 80 if you use the pragma header 221 # - Sep 5 2006 222 #if content in [ 223 # "video/x-ms-asf", 224 # "audio/x-ms-asf", 225 #]: 226 #setHeader('Pragma', 'features=broadcast') 227 228 #self.debug('setting Content-type to %s' % mime) 229 ### FIXME: there's a window where Twisted could have removed the 230 # fd because the client disconnected. Catch EBADF correctly here. 231 try: 232 # TODO: This is a non-blocking socket, we really should check 233 # return values here, or just let twisted handle all of this 234 # normally, and not hand off the fd until after twisted has 235 # finished writing the headers. 236 os.write(fd, 'HTTP/1.0 200 OK\r\n%s\r\n' % ''.join(headers)) 237 # tell TwistedWeb we already wrote headers ourselves 238 request.startedWriting = True 239 return True 240 except OSError, (no, s): 241 if no == errno.EBADF: 242 self.info('[fd %5d] client gone before writing header' % fd) 243 elif no == errno.ECONNRESET: 244 self.info('[fd %5d] client reset connection writing header' % fd) 245 else: 246 self.info('[fd %5d] unhandled write error when writing header: %s' % (fd, s)) 247 # trigger cleanup of request 248 del request 249 return False
250
251 - def isReady(self):
252 if self.streamer.caps == None: 253 self.debug('We have no caps yet') 254 return False 255 256 return True
257
258 - def getMaxAllowedClients(self, maxclients):
259 """ 260 maximum number of allowed clients based on soft limit for number of 261 open file descriptors and fd reservation. Increases soft limit to 262 hard limit if possible. 263 """ 264 (softmax, hardmax) = resource.getrlimit(resource.RLIMIT_NOFILE) 265 import sys 266 version = sys.version_info 267 268 if maxclients != -1: 269 neededfds = maxclients + self.__reserve_fds__ 270 271 # Bug in python 2.4.3, see http://sourceforge.net/tracker/index.php?func=detail&aid=1494314&group_id=5470&atid=105470 272 if version[:3] == (2, 4, 3) and \ 273 not hasattr(socket, "has_2_4_3_patch"): 274 self.warning( 275 'Setting hardmax to 1024 due to python 2.4.3 bug') 276 hardmax = 1024 277 278 if neededfds > softmax: 279 lim = min(neededfds, hardmax) 280 resource.setrlimit(resource.RLIMIT_NOFILE, (lim, hardmax)) 281 return lim - self.__reserve_fds__ 282 else: 283 return maxclients 284 else: 285 return softmax - self.__reserve_fds__
286
287 - def reachedServerLimits(self):
288 if self.maxclients >= 0 and len(self._requests) >= self.maxclients: 289 return True 290 elif self.maxbandwidth >= 0: 291 # Reject if adding one more client would take us over the limit. 292 if ((len(self._requests) + 1) * 293 self.streamer.getCurrentBitrate() >= self.maxbandwidth): 294 return True 295 return False
296
297 - def _addClient(self, request):
298 """ 299 Add a request, so it can be used for statistics. 300 301 @param request: the request 302 @type request: twisted.protocol.http.Request 303 """ 304 305 fd = request.transport.fileno() 306 self._requests[fd] = request
307
308 - def _logRequestFromIP(self, ip):
309 """ 310 Returns whether we want to log a request from this IP; allows us to 311 filter requests from automated monitoring systems. 312 """ 313 if self.logfilter: 314 return not self.logfilter.isInRange(ip) 315 else: 316 return True
317
318 - def _removeClient(self, request, fd, stats):
319 """ 320 Removes a request and add logging. 321 Note that it does not disconnect the client; it is called in reaction 322 to a client disconnecting. 323 It also removes the keycard if one was created. 324 325 @param request: the request 326 @type request: L{twisted.protocols.http.Request} 327 @param fd: the file descriptor for the client being removed 328 @type fd: L{int} 329 @param stats: the statistics for the removed client 330 @type stats: GValueArray 331 """ 332 333 ip = request.getClientIP() 334 if self._logRequestFromIP(ip): 335 d = self.logWrite(fd, ip, request, stats) 336 else: 337 d = defer.succeed(True) 338 self.info('[fd %5d] Client from %s disconnected' % (fd, ip)) 339 340 # We can't call request.finish(), since we already "stole" the fd, we 341 # just loseConnection on the transport directly, and delete the 342 # Request object, after cleaning up the bouncer bits. 343 self.httpauth.cleanupAuth(fd) 344 345 self.debug('[fd %5d] closing transport %r' % (fd, request.transport)) 346 # This will close the underlying socket. We first remove the request 347 # from our fd->request map, since the moment we call this the fd might 348 # get re-added. 349 del self._requests[fd] 350 request.transport.loseConnection() 351 352 self.debug('[fd %5d] closed transport %r' % (fd, request.transport)) 353 354 def _done(_): 355 if fd in self._removing: 356 self.debug("client is removed; firing deferred") 357 removeD = self._removing.pop(fd) 358 removeD.callback(None)
359 d.addCallback(_done) 360 return d 361
362 - def handleAuthenticatedRequest(self, res, request):
363 if request.method == 'GET': 364 self._handleNewClient(request) 365 elif request.method == 'HEAD': 366 self.debug('handling HEAD request') 367 self._writeHeaders(request) 368 request.finish() 369 else: 370 raise AssertionError 371 372 return res
373 374 ### resource.Resource methods 375 376 # this is the callback receiving the request initially
377 - def _render(self, request):
378 fd = request.transport.fileno() 379 # we store the fd again in the request using it as an id for later 380 # on, so we can check when an fd went away (being -1) inbetween 381 request.fdIncoming = fd 382 383 self.info('[fd %5d] Incoming client connection from %s' % ( 384 fd, request.getClientIP())) 385 self.debug('[fd %5d] _render(): request %s' % ( 386 fd, request)) 387 388 if not self.isReady(): 389 return self._handleNotReady(request) 390 elif self.reachedServerLimits(): 391 return self._handleServerFull(request) 392 393 self.debug('_render(): asked for (possible) authentication') 394 d = self.httpauth.startAuthentication(request) 395 d.addCallback(self.handleAuthenticatedRequest, request) 396 # Authentication has failed and we've written a response; nothing 397 # more to do 398 d.addErrback(lambda x: None) 399 400 # we MUST return this from our _render. 401 return server.NOT_DONE_YET
402
403 - def _handleNotReady(self, request):
404 self.debug('Not sending data, it\'s not ready') 405 return server.NOT_DONE_YET
406
407 - def _handleServerFull(self, request):
408 if self._redirectOnFull: 409 self.debug("Redirecting client, client limit %d reached", 410 self.maxclients) 411 error_code = http.FOUND 412 request.setHeader('location', self._redirectOnFull) 413 else: 414 self.debug('Refusing clients, client limit %d reached' % 415 self.maxclients) 416 error_code = http.SERVICE_UNAVAILABLE 417 418 request.setHeader('content-type', 'text/html') 419 420 request.setHeader('server', HTTP_VERSION) 421 request.setResponseCode(error_code) 422 423 return ERROR_TEMPLATE % {'code': error_code, 424 'error': http.RESPONSES[error_code]}
425
426 - def _handleNewClient(self, request):
427 # everything fulfilled, serve to client 428 fdi = request.fdIncoming 429 if not self._writeHeaders(request): 430 self.debug("[fd %5d] not adding as a client" % fdi) 431 return 432 self._addClient(request) 433 434 # take over the file descriptor from Twisted by removing them from 435 # the reactor 436 # spiv told us to remove* on request.transport, and that works 437 # then we figured out that a new request is only a Reader, so we 438 # remove the removedWriter - this is because we never write to the 439 # socket through twisted, only with direct os.write() calls from 440 # _writeHeaders. 441 fd = fdi 442 self.debug("taking away [fd %5d] from Twisted" % fd) 443 reactor.removeReader(request.transport) 444 #reactor.removeWriter(request.transport) 445 446 # check if it's really an open fd (i.e. that twisted didn't close it 447 # before the removeReader() call) 448 try: 449 fcntl.fcntl(fd, fcntl.F_GETFL) 450 except IOError, e: 451 if e.errno == errno.EBADF: 452 self.warning("[fd %5d] is not actually open, ignoring" % fd) 453 else: 454 self.warning("[fd %5d] error during check: %s (%d)" % ( 455 fd, e.strerror, e.errno)) 456 return 457 458 # hand it to multifdsink 459 self.streamer.add_client(fd) 460 ip = request.getClientIP() 461 462 self.info('[fd %5d] Started streaming to %s' % (fd, ip))
463 464 render_GET = _render 465 render_HEAD = _render 466
class HTTPRoot(web_resource.Resource, log.Loggable):
    """
    Root resource that looks children up by their full path instead of
    one path segment at a time.
    """
    logCategory = "httproot"

    def getChildWithDefault(self, path, request):
        """
        Look up the child registered under the complete remaining path.

        @param path:    the first path segment of the request
        @param request: the request; its postpath holds the remaining
                        segments, which are appended to path with '/'

        @returns: whatever the base class lookup returns for the full path.
        """
        # we override this method so that we can look up tree resources
        # directly without having their parents.
        # There's probably a more Twisted way of doing this, but ...
        fullPath = path
        if request.postpath:
            # str.join replaces the deprecated string.join(seq, sep)
            fullPath += '/' + '/'.join(request.postpath)
        self.debug("Incoming request %r for path %s" % (request, fullPath))
        r = web_resource.Resource.getChildWithDefault(self, fullPath, request)
        self.debug("Returning resource %r" % r)
        return r