Package flumotion :: Package component :: Package consumers :: Package httpstreamer :: Module resources
[hide private]

Source Code for Module flumotion.component.consumers.httpstreamer.resources

  1  # -*- Mode: Python; test-case-name: flumotion.test.test_http -*- 
  2  # vi:si:et:sw=4:sts=4:ts=4 
  3  # 
  4  # Flumotion - a streaming media server 
  5  # Copyright (C) 2004,2005,2006,2007 Fluendo, S.L. (www.fluendo.com). 
  6  # All rights reserved. 
  7   
  8  # This file may be distributed and/or modified under the terms of 
  9  # the GNU General Public License version 2 as published by 
 10  # the Free Software Foundation. 
 11  # This file is distributed without any warranty; without even the implied 
 12  # warranty of merchantability or fitness for a particular purpose. 
 13  # See "LICENSE.GPL" in the source distribution for more information. 
 14   
 15  # Licensees having purchased or holding a valid Flumotion Advanced 
 16  # Streaming Server license may use this file in accordance with the 
 17  # Flumotion Advanced Streaming Server Commercial License Agreement. 
 18  # See "LICENSE.Flumotion" in the source distribution for more information. 
 19   
 20  # Headers in this file shall remain intact. 
 21   
 22  import os 
 23  import socket 
 24  import time 
 25  import errno 
 26  import string 
 27  import resource 
 28   
 29  import gst 
 30   
 31  try: 
 32      from twisted.web import http 
 33  except ImportError: 
 34      from twisted.protocols import http 
 35   
 36  from twisted.web import server, resource as web_resource 
 37  from twisted.internet import reactor, defer 
 38  from twisted.python import reflect 
 39   
 40  from flumotion.configure import configure 
 41  from flumotion.common import errors 
 42   
 43  from flumotion.common import common, log, keycards 
 44   
 45  from flumotion.component.base import http as httpbase 
 46   
 47  __all__ = ['HTTPStreamingResource', 'MultifdSinkStreamer'] 
 48   
 49  HTTP_NAME = 'FlumotionHTTPServer' 
 50  HTTP_VERSION = configure.version 
 51   
 52  ERROR_TEMPLATE = """<!doctype html public "-//IETF//DTD HTML 2.0//EN"> 
 53  <html> 
 54  <head> 
 55    <title>%(code)d %(error)s</title> 
 56  </head> 
 57  <body> 
 58  <h2>%(code)d %(error)s</h2> 
 59  </body> 
 60  </html> 
 61  """ 
 62   
 63  HTTP_SERVER = '%s/%s' % (HTTP_NAME, HTTP_VERSION) 
 64   
 65  ### the Twisted resource that handles the base URL 
66 -class HTTPStreamingResource(web_resource.Resource, httpbase.HTTPAuthentication, 67 log.Loggable):
68 69 __reserve_fds__ = 50 # number of fd's to reserve for non-streaming 70 71 logCategory = 'httpstreamer' 72 73 # IResource interface variable; True means it will not chain requests 74 # further down the path to other resource providers through 75 # getChildWithDefault 76 isLeaf = True 77
78 - def __init__(self, streamer):
79 """ 80 @param streamer: L{MultifdSinkStreamer} 81 """ 82 streamer.connect('client-removed', self._streamer_client_removed_cb) 83 self.streamer = streamer 84 85 self._requests = {} # request fd -> Request 86 87 self.maxclients = self.getMaxAllowedClients(-1) 88 89 self.loggers = \ 90 streamer.plugs['flumotion.component.plugs.loggers.Logger'] 91 92 self.logfilter = None 93 94 web_resource.Resource.__init__(self) 95 httpbase.HTTPAuthentication.__init__(self, streamer)
96
97 - def _streamer_client_removed_cb(self, streamer, sink, fd, reason, stats):
98 # this is the callback attached to our flumotion component, 99 # not the GStreamer element 100 if fd in self._requests: 101 request = self._requests[fd] 102 self._removeClient(request, fd, stats) 103 else: 104 self.warning('[fd %5d] not found in _requests' % fd)
105
106 - def setRoot(self, path):
107 self.putChild(path, self)
108
109 - def setLogFilter(self, logfilter):
110 self.logfilter = logfilter
111
112 - def rotateLogs(self):
113 """ 114 Close the logfile, then reopen using the previous logfilename 115 """ 116 for logger in self.loggers: 117 self.debug('rotating logger %r' % logger) 118 logger.rotate()
119
120 - def logWrite(self, fd, ip, request, stats):
121 122 headers = request.getAllHeaders() 123 124 if stats: 125 bytes_sent = stats[0] 126 time_connected = int(stats[3] / gst.SECOND) 127 else: 128 bytes_sent = -1 129 time_connected = -1 130 131 args = {'ip': ip, 132 'time': time.gmtime(), 133 'method': request.method, 134 'uri': request.uri, 135 'username': '-', # FIXME: put the httpauth name 136 'get-parameters': request.args, 137 'clientproto': request.clientproto, 138 'response': request.code, 139 'bytes-sent': bytes_sent, 140 'referer': headers.get('referer', None), 141 'user-agent': headers.get('user-agent', None), 142 'time-connected': time_connected} 143 144 for logger in self.loggers: 145 logger.event('http_session_completed', args)
146
147 - def setUserLimit(self, limit):
148 self.info('setting maxclients to %d' % limit) 149 self.maxclients = self.getMaxAllowedClients(limit) 150 # Log what we actually managed to set it to. 151 self.info('set maxclients to %d' % self.maxclients)
152 153 # FIXME: rename to writeHeaders 154 """ 155 Write out the HTTP headers for the incoming HTTP request. 156 157 @rtype: boolean 158 @returns: whether or not the file descriptor can be used further. 159 """
160 - def _writeHeaders(self, request):
161 fd = request.transport.fileno() 162 fdi = request.fdIncoming 163 164 # the fd could have been closed, in which case it will be -1 165 if fd == -1: 166 self.info('[fd %5d] Client gone before writing header' % fdi) 167 # FIXME: do this ? del request 168 return False 169 if fd != request.fdIncoming: 170 self.warning('[fd %5d] does not match current fd %d' % (fdi, fd)) 171 # FIXME: do this ? del request 172 return False 173 174 headers = [] 175 176 def setHeader(field, name): 177 headers.append('%s: %s\r\n' % (field, name))
178 179 # Mimic Twisted as close as possible 180 content = self.streamer.get_content_type() 181 setHeader('Server', HTTP_SERVER) 182 setHeader('Date', http.datetimeToString()) 183 setHeader('Cache-Control', 'no-cache') 184 setHeader('Cache-Control', 'private') 185 setHeader('Content-type', content) 186 187 # ASF needs a Pragma header for live broadcasts 188 # Apparently ASF breaks on WMP port 80 if you use the pragma header 189 # - Sep 5 2006 190 #if content in [ 191 # "video/x-ms-asf", 192 # "audio/x-ms-asf", 193 #]: 194 #setHeader('Pragma', 'features=broadcast') 195 196 #self.debug('setting Content-type to %s' % mime) 197 ### FIXME: there's a window where Twisted could have removed the 198 # fd because the client disconnected. Catch EBADF correctly here. 199 try: 200 # TODO: This is a non-blocking socket, we really should check 201 # return values here, or just let twisted handle all of this 202 # normally, and not hand off the fd until after twisted has 203 # finished writing the headers. 204 os.write(fd, 'HTTP/1.0 200 OK\r\n%s\r\n' % ''.join(headers)) 205 # tell TwistedWeb we already wrote headers ourselves 206 request.startedWriting = True 207 return True 208 except OSError, (no, s): 209 if no == errno.EBADF: 210 self.info('[fd %5d] client gone before writing header' % fd) 211 elif no == errno.ECONNRESET: 212 self.info('[fd %5d] client reset connection writing header' % fd) 213 else: 214 self.info('[fd %5d] unhandled write error when writing header: %s' % (fd, s)) 215 # trigger cleanup of request 216 del request 217 return False
218
219 - def isReady(self):
220 if self.streamer.caps == None: 221 self.debug('We have no caps yet') 222 return False 223 224 return True
225
226 - def getMaxAllowedClients(self, maxclients):
227 """ 228 maximum number of allowed clients based on soft limit for number of 229 open file descriptors and fd reservation. Increases soft limit to 230 hard limit if possible. 231 """ 232 (softmax, hardmax) = resource.getrlimit(resource.RLIMIT_NOFILE) 233 import sys 234 version = sys.version_info 235 236 if maxclients != -1: 237 neededfds = maxclients + self.__reserve_fds__ 238 239 # Bug in python 2.4.3, see http://sourceforge.net/tracker/index.php?func=detail&aid=1494314&group_id=5470&atid=105470 240 if version[:3] == (2,4,3) and not hasattr(socket,"has_2_4_3_patch"): 241 hardmax = 1024 242 243 if neededfds > softmax: 244 lim = min(neededfds, hardmax) 245 resource.setrlimit(resource.RLIMIT_NOFILE, (lim, hardmax)) 246 return lim - self.__reserve_fds__ 247 else: 248 return maxclients 249 else: 250 return softmax - self.__reserve_fds__
251
252 - def reachedMaxClients(self):
253 return len(self._requests) >= self.maxclients and self.maxclients >= 0
254
255 - def _addClient(self, request):
256 """ 257 Add a request, so it can be used for statistics. 258 259 @param request: the request 260 @type request: twisted.protocol.http.Request 261 """ 262 263 fd = request.transport.fileno() 264 self._requests[fd] = request
265
266 - def _logRequestFromIP(self, ip):
267 """ 268 Returns whether we want to log a request from this IP; allows us to 269 filter requests from automated monitoring systems. 270 """ 271 if self.logfilter: 272 return not self.logfilter.isInRange(ip) 273 else: 274 return True
275
276 - def _removeClient(self, request, fd, stats):
277 """ 278 Removes a request and add logging. 279 Note that it does not disconnect the client; it is called in reaction 280 to a client disconnecting. 281 It also removes the keycard if one was created. 282 283 @param request: the request 284 @type request: L{twisted.protocols.http.Request} 285 @param fd: the file descriptor for the client being removed 286 @type fd: L{int} 287 @param stats: the statistics for the removed client 288 @type stats: GValueArray 289 """ 290 291 ip = request.getClientIP() 292 if self._logRequestFromIP(ip): 293 self.logWrite(fd, ip, request, stats) 294 self.info('[fd %5d] Client from %s disconnected' % (fd, ip)) 295 296 # We can't call request.finish(), since we already "stole" the fd, we 297 # just loseConnection on the transport directly, and delete the 298 # Request object, after cleaning up the bouncer bits. 299 self.cleanupAuth(fd) 300 301 self.debug('[fd %5d] closing transport %r' % (fd, request.transport)) 302 # This will close the underlying socket. We first remove the request 303 # from our fd->request map, since the moment we call this the fd might 304 # get re-added. 305 del self._requests[fd] 306 request.transport.loseConnection() 307 308 self.debug('[fd %5d] closed transport %r' % (fd, request.transport))
309
310 - def clientDone(self, fd):
311 self.streamer.remove_client(fd)
312
313 - def handleAuthenticatedRequest(self, res, request):
314 if request.method == 'GET': 315 self._handleNewClient(request) 316 elif request.method == 'HEAD': 317 self.debug('handling HEAD request') 318 self._writeHeaders(request) 319 request.finish() 320 else: 321 raise AssertionError 322 323 return res
324 325 ### resource.Resource methods 326 327 # this is the callback receiving the request initially
328 - def _render(self, request):
329 fd = request.transport.fileno() 330 # we store the fd again in the request using it as an id for later 331 # on, so we can check when an fd went away (being -1) inbetween 332 request.fdIncoming = fd 333 334 self.info('[fd %5d] Incoming client connection from %s' % ( 335 fd, request.getClientIP())) 336 self.debug('[fd %5d] _render(): request %s' % ( 337 fd, request)) 338 339 if not self.isReady(): 340 return self._handleNotReady(request) 341 elif self.reachedMaxClients(): 342 return self._handleMaxClients(request) 343 344 self.debug('_render(): asked for (possible) authentication') 345 d = self.startAuthentication(request) 346 d.addCallback(self.handleAuthenticatedRequest, request) 347 348 # we MUST return this from our _render. 349 return server.NOT_DONE_YET
350
351 - def authenticateKeycard(self, bouncerName, keycard):
352 return self.streamer.medium.authenticate(bouncerName, keycard)
353
354 - def cleanupKeycard(self, bouncerName, keycard):
355 return self.streamer.medium.removeKeycardId(bouncerName, keycard.id)
356
357 - def _handleNotReady(self, request):
358 self.debug('Not sending data, it\'s not ready') 359 return server.NOT_DONE_YET
360
361 - def _handleMaxClients(self, request):
362 self.debug('Refusing clients, client limit %d reached' % 363 self.maxclients) 364 365 request.setHeader('content-type', 'text/html') 366 request.setHeader('server', HTTP_VERSION) 367 368 error_code = http.SERVICE_UNAVAILABLE 369 request.setResponseCode(error_code) 370 371 return ERROR_TEMPLATE % {'code': error_code, 372 'error': http.RESPONSES[error_code]}
373
374 - def _handleNewClient(self, request):
375 # everything fulfilled, serve to client 376 fdi = request.fdIncoming 377 if not self._writeHeaders(request): 378 self.debug("[fd %5d] not adding as a client" % fdi) 379 return 380 self._addClient(request) 381 382 # take over the file descriptor from Twisted by removing them from 383 # the reactor 384 # spiv told us to remove* on request.transport, and that works 385 # then we figured out that a new request is only a Reader, so we 386 # remove the removedWriter - this is because we never write to the 387 # socket through twisted, only with direct os.write() calls from 388 # _writeHeaders. 389 fd = fdi 390 self.debug("taking away [fd %5d] from Twisted" % fd) 391 reactor.removeReader(request.transport) 392 #reactor.removeWriter(request.transport) 393 394 # check if it's really an open fd (i.e. that twisted didn't close it 395 # before the removeReader() call) 396 import fcntl 397 try: 398 fcntl.fcntl(fd, fcntl.F_GETFL) 399 except IOError, e: 400 if e.errno == errno.EBADF: 401 self.warning("[fd %5d] is not actually open, ignoring" % fd) 402 else: 403 self.warning("[fd %5d] error during check: %s (%d)" % ( 404 fd, e.strerror, e.errno)) 405 return 406 407 # hand it to multifdsink 408 self.streamer.add_client(fd) 409 ip = request.getClientIP() 410 411 self.info('[fd %5d] Started streaming to %s' % (fd, ip))
412 413 render_GET = _render 414 render_HEAD = _render 415
416 -class HTTPRoot(web_resource.Resource, log.Loggable):
417 logCategory = "httproot" 418
419 - def getChildWithDefault(self, path, request):
420 # we override this method so that we can look up tree resources 421 # directly without having their parents. 422 # There's probably a more Twisted way of doing this, but ... 423 fullPath = path 424 if request.postpath: 425 fullPath += '/' + string.join(request.postpath, '/') 426 self.debug("Incoming request %r for path %s" % (request, fullPath)) 427 r = web_resource.Resource.getChildWithDefault(self, fullPath, request) 428 self.debug("Returning resource %r" % r) 429 return r
430