Package flumotion :: Package component :: Package base :: Module http
[hide private]

Source Code for Module flumotion.component.base.http

  1  # -*- Mode: Python; test-case-name: flumotion.test.test_http -*- 
  2  # vi:si:et:sw=4:sts=4:ts=4 
  3  # 
  4  # Flumotion - a streaming media server 
  5  # Copyright (C) 2004,2005,2006,2007 Fluendo, S.L. (www.fluendo.com). 
  6  # All rights reserved. 
  7   
  8  # This file may be distributed and/or modified under the terms of 
  9  # the GNU General Public License version 2 as published by 
 10  # the Free Software Foundation. 
 11  # This file is distributed without any warranty; without even the implied 
 12  # warranty of merchantability or fitness for a particular purpose. 
 13  # See "LICENSE.GPL" in the source distribution for more information. 
 14   
 15  # Licensees having purchased or holding a valid Flumotion Advanced 
 16  # Streaming Server license may use this file in accordance with the 
 17  # Flumotion Advanced Streaming Server Commercial License Agreement. 
 18  # See "LICENSE.Flumotion" in the source distribution for more information. 
 19   
 20  # Headers in this file shall remain intact. 
 21   
 22  import struct 
 23  import socket 
 24   
 25  from twisted.web import http, server 
 26  from twisted.web import resource as web_resource 
 27  from twisted.internet import reactor, defer 
 28  from twisted.python import reflect, failure 
 29   
 30  from flumotion.configure import configure 
 31  from flumotion.common import errors 
 32  from flumotion.twisted.credentials import cryptChallenge 
 33   
 34  from flumotion.common import common, log, keycards 
 35   
 36  #__all__ = ['HTTPStreamingResource', 'MultifdSinkStreamer'] 
 37  __version__ = "$Rev: 6628 $" 
 38   
 39   
 40  HTTP_SERVER_NAME = 'FlumotionHTTPServer' 
 41  HTTP_SERVER_VERSION = configure.version 
 42   
 43  ERROR_TEMPLATE = """<!doctype html public "-//IETF//DTD HTML 2.0//EN"> 
 44  <html> 
 45  <head> 
 46    <title>%(code)d %(error)s</title> 
 47  </head> 
 48  <body> 
 49  <h2>%(code)d %(error)s</h2> 
 50  </body> 
 51  </html> 
 52  """ 
 53   
 54  HTTP_SERVER = '%s/%s' % (HTTP_SERVER_NAME, HTTP_SERVER_VERSION) 
 55   
 56  ### This is new Issuer code that eventually should move to e.g. 
 57  ### flumotion.common.keycards or related 
 58   
59 -class Issuer(log.Loggable):
60 """ 61 I am a base class for all Issuers. 62 An issuer issues keycards of a given class based on an object 63 (incoming HTTP request, ...) 64 """
65 - def issue(self, *args, **kwargs):
66 """ 67 Return a keycard, or None, based on the given arguments. 68 """ 69 raise NotImplementedError
70
71 -class HTTPGenericIssuer(Issuer):
72 """ 73 I create L{flumotion.common.keycards.Keycard} based on just a 74 standard HTTP request. Useful for authenticating based on 75 server-side checks such as time, rather than client credentials. 76 """
77 - def issue(self, request):
78 keycard = keycards.KeycardGeneric() 79 self.debug("Asking for authentication, generic HTTP") 80 return keycard
81
82 -class HTTPAuthIssuer(Issuer):
83 """ 84 I create L{flumotion.common.keycards.KeycardUACPP} keycards based on 85 an incoming L{twisted.protocols.http.Request} request's standard 86 HTTP authentication information. 87 """
88 - def issue(self, request):
89 # for now, we're happy with a UACPP keycard; the password arrives 90 # plaintext anyway 91 keycard = keycards.KeycardUACPP( 92 request.getUser(), 93 request.getPassword(), request.getClientIP()) 94 self.debug('Asking for authentication, user %s, password %s, ip %s' % ( 95 keycard.username, keycard.password, keycard.address)) 96 return keycard
97
98 -class HTTPTokenIssuer(Issuer):
99 """ 100 I create L{flumotion.common.keycards.KeycardToken} keycards based on 101 an incoming L{twisted.protocols.http.Request} request's GET "token" 102 parameter. 103 """
104 - def issue(self, request):
105 if not 'token' in request.args.keys(): 106 return None 107 108 # args can have lists as values, if more than one specified 109 token = request.args['token'] 110 if not isinstance(token, str): 111 token = token[0] 112 113 keycard = keycards.KeycardToken(token, 114 request.getClientIP(), request.path) 115 return keycard
116 117 BOUNCER_SOCKET = 'flumotion.component.bouncers.plug.BouncerPlug' 118
119 -class HTTPAuthentication(log.Loggable):
120 """ 121 Helper object for handling HTTP authentication for twisted.web 122 Resources, using issuers and bouncers. 123 """ 124 125 logCategory = 'httpauth' 126 127 KEYCARD_TTL = 60 * 60 128 KEYCARD_KEEPALIVE_INTERVAL = 20 * 60 129 KEYCARD_TRYAGAIN_INTERVAL = 1 * 60 130
131 - def __init__(self, component):
132 self.component = component 133 self._fdToKeycard = {} # request fd -> Keycard 134 self._idToKeycard = {} # keycard id -> Keycard 135 self._fdToDurationCall = {} # request fd -> IDelayedCall for duration 136 self._domain = None # used for auth challenge and on keycard 137 self._issuer = HTTPAuthIssuer() # issues keycards; default for compat 138 self.bouncerName = None 139 self.setRequesterId(component.getName()) 140 self._defaultDuration = None # default duration to use if the keycard 141 # doesn't specify one. 142 self._pendingCleanups = [] 143 self._keepAlive = None 144 145 if (BOUNCER_SOCKET in self.component.plugs 146 and self.component.plugs[BOUNCER_SOCKET]): 147 assert len(self.component.plugs[BOUNCER_SOCKET]) == 1 148 self.plug = self.component.plugs[BOUNCER_SOCKET][0] 149 else: 150 self.plug = None
151
152 - def scheduleKeepAlive(self, tryingAgain=False):
153 def timeout(): 154 def reschedule(res): 155 if isinstance(res, failure.Failure): 156 self.info('keepAlive failed, rescheduling in %d ' 157 'seconds', self.KEYCARD_TRYAGAIN_INTERVAL) 158 self._keepAlive = None 159 self.scheduleKeepAlive(tryingAgain=True) 160 else: 161 self.info('keepAlive successful') 162 self._keepAlive = None 163 self.scheduleKeepAlive(tryingAgain=False)
164 165 if self.bouncerName is not None: 166 self.debug('calling keepAlive on bouncer %s', 167 self.bouncerName) 168 d = self.keepAlive(self.bouncerName, self.issuerName, 169 self.KEYCARD_TTL) 170 d.addCallbacks(reschedule, reschedule) 171 else: 172 self.scheduleKeepAlive()
173 174 if tryingAgain: 175 self._keepAlive = reactor.callLater(self.KEYCARD_TRYAGAIN_INTERVAL, 176 timeout) 177 else: 178 self._keepAlive = reactor.callLater(self.KEYCARD_KEEPALIVE_INTERVAL, 179 timeout) 180
181 - def stopKeepAlive(self):
182 if self._keepAlive is not None: 183 self._keepAlive.cancel() 184 self._keepAlive = None
185
186 - def setDomain(self, domain):
187 """ 188 Set a domain name on the resource, used in HTTP auth challenges and 189 on the keycard. 190 191 @type domain: string 192 """ 193 self._domain = domain
194
195 - def setBouncerName(self, bouncerName):
196 self.bouncerName = bouncerName
197
198 - def setRequesterId(self, requesterId):
199 self.requesterId = requesterId 200 # make something uniquey 201 self.issuerName = str(self.requesterId) + '-' + cryptChallenge()
202
203 - def setDefaultDuration(self, defaultDuration):
204 self._defaultDuration = defaultDuration
205
206 - def setIssuerClass(self, issuerClass):
207 # FIXME: in the future, we want to make this pluggable and have it 208 # look up somewhere ? 209 if issuerClass == 'HTTPTokenIssuer': 210 self._issuer = HTTPTokenIssuer() 211 elif issuerClass == 'HTTPAuthIssuer': 212 self._issuer = HTTPAuthIssuer() 213 elif issuerClass == 'HTTPGenericIssuer': 214 self._issuer = HTTPGenericIssuer() 215 else: 216 raise ValueError, "issuerClass %s not accepted" % issuerClass
217
218 - def authenticate(self, request):
219 """ 220 Returns: a deferred returning a keycard or None 221 """ 222 keycard = self._issuer.issue(request) 223 if not keycard: 224 self.debug('no keycard from issuer, firing None') 225 return defer.succeed(None) 226 227 keycard.requesterId = self.requesterId 228 keycard.issuerName = self.issuerName 229 keycard._fd = request.transport.fileno() 230 keycard.setDomain(self._domain) 231 232 if self.plug: 233 self.debug('authenticating against plug') 234 return self.plug.authenticate(keycard) 235 elif self.bouncerName == None: 236 self.debug('no bouncer, accepting') 237 return defer.succeed(keycard) 238 else: 239 keycard.ttl = self.KEYCARD_TTL 240 self.debug('sending keycard to remote bouncer %r', 241 self.bouncerName) 242 return self.authenticateKeycard(self.bouncerName, keycard)
243
244 - def authenticateKeycard(self, bouncerName, keycard):
245 return self.component.medium.authenticate(bouncerName, keycard)
246
247 - def keepAlive(self, bouncerName, issuerName, ttl):
248 return self.component.medium.keepAlive(bouncerName, issuerName, ttl)
249
250 - def cleanupKeycard(self, bouncerName, keycard):
251 return self.component.medium.removeKeycardId(bouncerName, keycard.id)
252 253 # FIXME: check this
254 - def clientDone(self, fd):
255 return self.component.remove_client(fd)
256
257 - def doCleanupKeycard(self, bouncerName, keycard):
258 # cleanup this one keycard, and take the opportunity to retry 259 # previous failed cleanups 260 def cleanup(bouncerName, keycard): 261 def cleanupLater(res, pair): 262 self.log('failed to clean up keycard %r, will do ' 263 'so later', keycard) 264 self._pendingCleanups.append(pair)
265 d = self.cleanupKeycard(bouncerName, keycard) 266 d.addErrback(cleanupLater, (bouncerName, keycard)) 267 pending = self._pendingCleanups 268 self._pendingCleanups = [] 269 cleanup(bouncerName, keycard) 270 for bouncerName, keycard in pending: 271 cleanup(bouncerName, keycard) 272 273 # public
274 - def cleanupAuth(self, fd):
275 if self.bouncerName and self._fdToKeycard.has_key(fd): 276 keycard = self._fdToKeycard[fd] 277 del self._fdToKeycard[fd] 278 del self._idToKeycard[keycard.id] 279 self.debug('[fd %5d] asking bouncer %s to remove keycard id %s', 280 fd, self.bouncerName, keycard.id) 281 self.doCleanupKeycard(self.bouncerName, keycard) 282 if self._fdToDurationCall.has_key(fd): 283 self.debug('[fd %5d] canceling later expiration call' % fd) 284 self._fdToDurationCall[fd].cancel() 285 del self._fdToDurationCall[fd]
286
287 - def _durationCallLater(self, fd):
288 """ 289 Expire a client due to a duration expiration. 290 """ 291 self.debug('[fd %5d] duration exceeded, expiring client' % fd) 292 293 # we're called from a callLater, so we've already run; just delete 294 if self._fdToDurationCall.has_key(fd): 295 del self._fdToDurationCall[fd] 296 297 self.debug('[fd %5d] asking streamer to remove client' % fd) 298 self.clientDone(fd)
299
300 - def expireKeycard(self, keycardId):
301 """ 302 Expire a client's connection associated with the keycard Id. 303 """ 304 keycard = self._idToKeycard[keycardId] 305 fd = keycard._fd 306 307 self.debug('[fd %5d] expiring client' % fd) 308 309 if self._fdToDurationCall.has_key(fd): 310 self.debug('[fd %5d] canceling later expiration call' % fd) 311 self._fdToDurationCall[fd].cancel() 312 del self._fdToDurationCall[fd] 313 314 self.debug('[fd %5d] asking streamer to remove client' % fd) 315 self.clientDone(fd)
316 317 ### resource.Resource methods 318
319 - def startAuthentication(self, request):
320 d = self.authenticate(request) 321 d.addCallback(self._authenticatedCallback, request) 322 d.addErrback(self._authenticatedErrback, request) 323 d.addErrback(self._defaultErrback, request) 324 325 return d
326
327 - def _authenticatedCallback(self, keycard, request):
328 # !: since we are a callback, the incoming fd might have gone away 329 # and closed 330 self.debug('_authenticatedCallback: keycard %r' % keycard) 331 if not keycard: 332 raise errors.NotAuthenticatedError() 333 334 # properly authenticated 335 if request.method == 'GET': 336 fd = request.transport.fileno() 337 338 if self.bouncerName: 339 if keycard.id in self._idToKeycard: 340 self.warning("Duplicate keycard id: refusing") 341 raise errors.NotAuthenticatedError() 342 343 self._fdToKeycard[fd] = keycard 344 self._idToKeycard[keycard.id] = keycard 345 346 duration = keycard.duration or self._defaultDuration 347 348 if duration: 349 self.debug('new connection on %d will expire in %f seconds' % ( 350 fd, duration)) 351 self._fdToDurationCall[fd] = reactor.callLater( 352 duration, self._durationCallLater, fd) 353 354 return None
355
356 - def _authenticatedErrback(self, failure, request):
357 failure.trap(errors.UnknownComponentError, errors.NotAuthenticatedError) 358 self._handleUnauthorized(request, http.UNAUTHORIZED) 359 return failure
360
361 - def _defaultErrback(self, failure, request):
362 if failure.check(errors.UnknownComponentError, 363 errors.NotAuthenticatedError) is None: 364 # If something else went wrong, we want to disconnect the client and 365 # give them a 500 Internal Server Error. 366 self._handleUnauthorized(request, http.INTERNAL_SERVER_ERROR) 367 return failure
368
369 - def _handleUnauthorized(self, request, code):
370 self.debug('client from %s is unauthorized' % (request.getClientIP())) 371 request.setHeader('content-type', 'text/html') 372 request.setHeader('server', HTTP_SERVER_VERSION) 373 if self._domain and code == http.UNAUTHORIZED: 374 request.setHeader('WWW-Authenticate', 375 'Basic realm="%s"' % self._domain) 376 377 request.setResponseCode(code) 378 379 # we have to write data ourselves, 380 # since we already returned NOT_DONE_YET 381 html = ERROR_TEMPLATE % {'code': code, 382 'error': http.RESPONSES[code]} 383 request.write(html) 384 request.finish()
385
386 -class LogFilter:
387 - def __init__(self):
388 self.filters = [] # list of (network, mask)
389
390 - def addIPFilter(self, filter):
391 """ 392 Add an IP filter of the form IP/prefix-length (CIDR syntax), or just 393 a single IP address 394 """ 395 definition = filter.split('/') 396 if len(definition) == 2: 397 (net, prefixlen) = definition 398 prefixlen = int(prefixlen) 399 elif len(definition) == 1: 400 net = definition[0] 401 prefixlen = 32 402 else: 403 raise errors.ConfigError( 404 "Cannot parse filter definition %s" % filter) 405 406 if prefixlen < 0 or prefixlen > 32: 407 raise errors.ConfigError("Invalid prefix length") 408 409 mask = ~((1 << (32 - prefixlen)) - 1) 410 try: 411 net = struct.unpack(">I", socket.inet_pton(socket.AF_INET, net))[0] 412 except socket.error: 413 raise errors.ConfigError("Failed to parse network address %s" % net) 414 net = net & mask # just in case 415 416 self.filters.append((net, mask))
417
418 - def isInRange(self, ip):
419 """ 420 Return true if ip is in any of the defined network(s) for this filter 421 """ 422 # Handles IPv4 only. 423 realip = struct.unpack(">I", socket.inet_pton(socket.AF_INET, ip))[0] 424 for f in self.filters: 425 if (realip & f[1]) == f[0]: 426 return True 427 return False
428