Package flumotion :: Package component :: Package misc :: Package porter :: Module porter
[hide private]

Source Code for Module flumotion.component.misc.porter.porter

  1  # -*- Mode: Python; test-case-name: flumotion.test.test_porter -*- 
  2  # vi:si:et:sw=4:sts=4:ts=4 
  3  # 
  4  # Flumotion - a streaming media server 
  5  # Copyright (C) 2004,2005,2006,2007 Fluendo, S.L. (www.fluendo.com). 
  6  # All rights reserved. 
  7   
  8  # This file may be distributed and/or modified under the terms of 
  9  # the GNU General Public License version 2 as published by 
 10  # the Free Software Foundation. 
 11  # This file is distributed without any warranty; without even the implied 
 12  # warranty of merchantability or fitness for a particular purpose. 
 13  # See "LICENSE.GPL" in the source distribution for more information. 
 14   
 15  # Licensees having purchased or holding a valid Flumotion Advanced 
 16  # Streaming Server license may use this file in accordance with the 
 17  # Flumotion Advanced Streaming Server Commercial License Agreement. 
 18  # See "LICENSE.Flumotion" in the source distribution for more information. 
 19   
 20  # Headers in this file shall remain intact. 
 21   
 22  import os 
 23  import random 
 24  import socket 
 25  import string 
 26  from urllib2 import urlparse 
 27   
 28  from twisted.cred import portal 
 29  from twisted.internet import protocol, reactor, address, error, defer 
 30  from twisted.spread import pb 
 31  from zope.interface import implements 
 32   
 33  from flumotion.common import medium, log, messages 
 34  from flumotion.common.i18n import N_, gettexter 
 35  from flumotion.component import component 
 36  from flumotion.component.component import moods 
 37  from flumotion.twisted import credentials, fdserver, checkers 
 38  from flumotion.twisted import reflect 
 39   
 40  __version__ = "$Rev: 6981 $" 
 41  T_ = gettexter() 
 42   
 43   
44 -class PorterAvatar(pb.Avatar, log.Loggable):
45 """ 46 An Avatar in the porter representing a streamer 47 """
48 - def __init__(self, avatarId, porter, mind):
49 self.avatarId = avatarId 50 self.porter = porter 51 52 # The underlying transport is now accessible as 53 # self.mind.broker.transport, on which we can call sendFileDescriptor 54 self.mind = mind
55
56 - def isAttached(self):
57 return self.mind != None
58
59 - def logout(self):
60 self.debug("porter client %s logging out", self.avatarId) 61 self.mind = None
62
63 - def perspective_registerPath(self, path):
64 self.log("Perspective called: registering path \"%s\"" % path) 65 self.porter.registerPath(path, self)
66
67 - def perspective_deregisterPath(self, path):
68 self.log("Perspective called: deregistering path \"%s\"" % path) 69 self.porter.deregisterPath(path, self)
70
71 - def perspective_registerPrefix(self, prefix):
72 self.log("Perspective called: registering default") 73 self.porter.registerPrefix(prefix, self)
74
75 - def perspective_deregisterPrefix(self, prefix):
76 self.log("Perspective called: deregistering default") 77 self.porter.deregisterPrefix(prefix, self)
78
79 -class PorterRealm(log.Loggable):
80 """ 81 A Realm within the Porter that creates Avatars for streamers logging into 82 the porter. 83 """ 84 implements(portal.IRealm) 85
86 - def __init__(self, porter):
87 """ 88 @param porter: The porter that avatars created from here should use. 89 @type porter: L{Porter} 90 """ 91 self.porter = porter
92
93 - def requestAvatar(self, avatarId, mind, *interfaces):
94 self.log("Avatar requested for avatarId %s, mind %r, interfaces %r", 95 avatarId, mind, interfaces) 96 if pb.IPerspective in interfaces: 97 avatar = PorterAvatar(avatarId, self.porter, mind) 98 return pb.IPerspective, avatar, avatar.logout 99 else: 100 raise NotImplementedError("no interface")
101
102 -class PorterMedium(component.BaseComponentMedium):
103
104 - def remote_getPorterDetails(self):
105 """ 106 Return the location, login username/password, and listening port 107 and interface for the porter as a tuple (path, username, 108 password, port, interface). 109 """ 110 return (self.comp._socketPath, self.comp._username, 111 self.comp._password, self.comp._iptablesPort, 112 self.comp._interface)
113
114 -class Porter(component.BaseComponent, log.Loggable):
115 """ 116 The porter optionally sits in front of a set of streamer components. 117 The porter is what actually deals with incoming connections on a TCP socket. 118 It decides which streamer to direct the connection to, then passes the FD 119 (along with some amount of already-read data) to the appropriate streamer. 120 """ 121 122 componentMediumClass = PorterMedium 123
124 - def init(self):
125 # We maintain a map of path -> avatar (the underlying transport is 126 # accessible from the avatar, we need this for FD-passing) 127 self._mappings = {} 128 self._prefixes = {} 129 130 self._socketlistener = None 131 132 self._socketPath = None 133 self._username = None 134 self._password = None 135 self._port = None 136 self._iptablesPort = None 137 self._porterProtocol = None 138 139 self._interface = ''
140
141 - def registerPath(self, path, avatar):
142 """ 143 Register a path as being served by a streamer represented by this 144 avatar. Will remove any previous registration at this path. 145 146 @param path: The path to register 147 @type path: str 148 @param avatar: The avatar representing the streamer to direct this path 149 to 150 @type avatar: L{PorterAvatar} 151 """ 152 self.debug("Registering porter path \"%s\" to %r" % (path, avatar)) 153 if self._mappings.has_key(path): 154 self.warning("Replacing existing mapping for path \"%s\"" % path) 155 156 self._mappings[path] = avatar
157
158 - def deregisterPath(self, path, avatar):
159 """ 160 Attempt to deregister the given path. A deregistration will only be 161 accepted if the mapping is to the avatar passed. 162 163 @param path: The path to deregister 164 @type path: str 165 @param avatar: The avatar representing the streamer being deregistered 166 @type avatar: L{PorterAvatar} 167 """ 168 if self._mappings.has_key(path): 169 if self._mappings[path] == avatar: 170 self.debug("Removing porter mapping for \"%s\"" % path) 171 del self._mappings[path] 172 else: 173 self.warning("Mapping not removed: refers to a different avatar") 174 else: 175 self.warning("Mapping not removed: no mapping found")
176
177 - def registerPrefix(self, prefix, avatar):
178 """ 179 Register a destination for all requests directed to anything beginning 180 with a specified prefix. Where there are multiple matching prefixes, the 181 longest is selected. 182 183 @param avatar: The avatar being registered 184 @type avatar: L{PorterAvatar} 185 """ 186 187 self.debug("Setting prefix \"%s\" for porter", prefix) 188 if prefix in self._prefixes: 189 self.warning("Overwriting prefix") 190 191 self._prefixes[prefix] = avatar
192
193 - def deregisterPrefix(self, prefix, avatar):
194 """ 195 Attempt to deregister a default destination for all requests not 196 directed to a specifically-mapped path. This will only succeed if the 197 default is currently equal to this avatar. 198 199 @param avatar: The avatar being deregistered 200 @type avatar: L{PorterAvatar} 201 """ 202 if prefix not in self._prefixes: 203 self.warning("Mapping not removed: no mapping found") 204 return 205 206 if self._prefixes[prefix] == avatar: 207 self.debug("Removing prefix destination from porter") 208 del self._prefixes[prefix] 209 else: 210 self.warning("Not removing prefix destination: expected avatar not found")
211
212 - def findPrefixMatch(self, path):
213 found = None 214 # TODO: Horribly inefficient. Replace with pathtree code. 215 for prefix in self._prefixes.keys(): 216 self.log("Checking: %r, %r" % (prefix, path)) 217 if (path.startswith(prefix) and (not found or len(found) < len(prefix))): 218 found = prefix 219 if found: 220 return self._prefixes[found] 221 else: 222 return None
223
224 - def findDestination(self, path):
225 """ 226 Find a destination Avatar for this path. 227 @returns: The Avatar for this mapping, or None. 228 """ 229 230 if self._mappings.has_key(path): 231 return self._mappings[path] 232 else: 233 return self.findPrefixMatch(path)
234 235
236 - def generateSocketPath(self):
237 """ 238 Generate a socket pathname in an appropriate location 239 """ 240 # Also see worker/worker.py:_getSocketPath(), and note that this suffers 241 # from the same potential race. 242 import tempfile 243 fd, name = tempfile.mkstemp('.%d' % os.getpid(), 'flumotion.porter.') 244 os.close(fd) 245 246 return name
247
248 - def generateRandomString(self, numchars):
249 """ 250 Generate a random US-ASCII string of length numchars 251 """ 252 string = "" 253 chars = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz" 254 for _ in range(numchars): 255 string += chars[random.randint(0, len(chars) - 1)] 256 257 return string
258
259 - def have_properties(self):
260 props = self.config['properties'] 261 262 self.fixRenamedProperties(props, 263 [('socket_path', 'socket-path')]) 264 265 # We can operate in two modes: explicitly configured (neccesary if you 266 # want to handle connections from components in other managers), and 267 # self-configured (which is sufficient for slaving only streamers 268 # within this manager 269 if props.has_key('socket-path'): 270 # Explicitly configured 271 self._socketPath = props['socket-path'] 272 self._username = props['username'] 273 self._password = props['password'] 274 else: 275 # Self-configuring. Use a randomly create username/password, and 276 # a socket with a random name. 277 self._username = self.generateRandomString(12) 278 self._password = self.generateRandomString(12) 279 self._socketPath = self.generateSocketPath() 280 281 self._port = int(props['port']) 282 self._iptablesPort = int(props.get('iptables-port', self._port)) 283 self._porterProtocol = props.get('protocol', 284 'flumotion.component.misc.porter.porter.HTTPPorterProtocol') 285 self._interface = props.get('interface', '')
286
287 - def do_stop(self):
288 d = None 289 if self._socketlistener: 290 # stopListening() calls (via a callLater) connectionLost(), which 291 # will unlink our socket, so we don't need to explicitly delete it. 292 d = self._socketlistener.stopListening() 293 self._socketlistener = None 294 return d
295
296 - def do_setup(self):
297 # Create our combined PB-server/fd-passing channel 298 self.have_properties() 299 realm = PorterRealm(self) 300 checker = checkers.FlexibleCredentialsChecker() 301 checker.addUser(self._username, self._password) 302 303 p = portal.Portal(realm, [checker]) 304 serverfactory = pb.PBServerFactory(p) 305 306 # FIXME: shouldn't we be raising handled errors here? 307 308 try: 309 # Rather than a normal listenTCP() or listenUNIX(), we use 310 # listenWith so that we can specify our particular Port, which 311 # creates Transports that we know how to pass FDs over. 312 try: 313 os.unlink(self._socketPath) 314 except OSError: 315 pass 316 317 self._socketlistener = reactor.listenWith( 318 fdserver.FDPort, self._socketPath, serverfactory) 319 self.debug("Now listening on socketPath %s" % self._socketPath) 320 except error.CannotListenError, e: 321 self.warning("Failed to create socket %s" % self._socketPath) 322 m = messages.Error(T_(N_( 323 "Network error: socket path %s is not available."), 324 self._socketPath)) 325 self.addMessage(m) 326 self.setMood(moods.sad) 327 return defer.fail(e) 328 329 # Create the class that deals with the specific protocol we're proxying 330 # in this porter. 331 try: 332 proto = reflect.namedAny(self._porterProtocol) 333 self.debug("Created proto %r" % proto) 334 except (ImportError, AttributeError): 335 self.warning("Failed to import protocol '%s', defaulting to HTTP" % 336 self._porterProtocol) 337 proto = HTTPPorterProtocol 338 339 # And of course we also want to listen for incoming requests in the 340 # appropriate protocol (HTTP, RTSP, etc.) 341 factory = PorterProtocolFactory(self, proto) 342 try: 343 reactor.listenWith( 344 fdserver.PassableServerPort, self._port, factory, 345 interface=self._interface) 346 self.debug("Now listening on port %d" % self._port) 347 except error.CannotListenError, e: 348 self.warning("Failed to listen on port %d" % self._port) 349 m = messages.Error(T_(N_( 350 "Network error: TCP port %d is not available."), self._port)) 351 self.addMessage(m) 352 self.setMood(moods.sad) 353 return defer.fail(e)
354
355 -class PorterProtocolFactory(protocol.Factory):
356 - def __init__(self, porter, protocol):
357 self._porter = porter 358 self.protocol = protocol
359
360 - def buildProtocol(self, addr):
361 p = self.protocol(self._porter) 362 p.factory = self 363 return p
364
365 -class PorterProtocol(protocol.Protocol, log.Loggable):
366 """ 367 The base porter is capable of accepting HTTP-like protocols (including 368 RTSP) - it reads the first line of a request, and makes the decision 369 solely on that. 370 371 We can't guarantee that we read precisely a line, so the buffer we 372 accumulate will actually be larger than what we actually parse. 373 374 @cvar MAX_SIZE: the maximum number of bytes allowed for the first line 375 @cvar delimiters: a list of valid line delimiters I check for 376 """ 377 # Don't permit a first line longer than this. 378 MAX_SIZE = 4096 379 380 # Timeout any client connected to the porter for longer than this. A normal 381 # client should only ever be connected for a fraction of a second. 382 PORTER_CLIENT_TIMEOUT = 30 383 384 # In fact, because we check \r, we'll never need to check for \r\n - we 385 # leave this in as \r\n is the more correct form. At the other end, this 386 # gets processed by a full protocol implementation, so being flexible hurts 387 # us not at all 388 delimiters = ['\r\n', '\n', '\r'] 389
390 - def __init__(self, porter):
391 self._buffer = '' 392 self._porter = porter 393 394 self.debug("Accepted connection") 395 396 self._timeoutDC = reactor.callLater(self.PORTER_CLIENT_TIMEOUT, 397 self._timeout)
398
399 - def _timeout(self):
400 self._timeoutDC = None 401 self.debug("Timing out porter client after %d seconds", 402 self.PORTER_CLIENT_TIMEOUT) 403 self.transport.loseConnection()
404
405 - def connectionLost(self, reason):
406 if self._timeoutDC: 407 self._timeoutDC.cancel() 408 self._timeoutDC = None
409
410 - def dataReceived(self, data):
411 self._buffer = self._buffer + data 412 self.log("Got data, buffer now \"%s\"" % self._buffer) 413 # We accept more than just '\r\n' (the true HTTP line end) in the 414 # interests of compatibility. 415 for delim in self.delimiters: 416 try: 417 line, remaining = self._buffer.split(delim, 1) 418 break 419 except ValueError: 420 # We didn't find this delimiter; continue with the others. 421 pass 422 else: 423 # Failed to find a valid delimiter. 424 self.log("No valid delimiter found") 425 if len(self._buffer) > self.MAX_SIZE: 426 self.log("Dropping connection!") 427 return self.transport.loseConnection() 428 else: 429 # No delimiter found; haven't reached the length limit yet. 430 # Wait for more data. 431 return 432 433 # Got a line. self._buffer is still our entire buffer, should be 434 # provided to the slaved process. 435 identifier = self.parseLine(line) 436 437 if not identifier: 438 self.log("Couldn't find identifier in first line") 439 return self.transport.loseConnection() 440 441 # Ok, we have an identifier. Is it one we know about, or do we have 442 # a default destination? 443 destinationAvatar = self._porter.findDestination(identifier) 444 445 if not destinationAvatar or not destinationAvatar.isAttached(): 446 if destinationAvatar: 447 self.debug("There was an avatar, but it logged out?") 448 self.debug("No destination avatar found for \"%s\"" % identifier) 449 self.writeNotFoundResponse() 450 return self.transport.loseConnection() 451 452 # Transfer control over this FD. Pass all the data so-far received 453 # along in the same message. The receiver will push that data into 454 # the Twisted Protocol object as if it had been normally received, 455 # so it looks to the receiver like it has read the entire data stream 456 # itself. 457 458 # TODO: Check out blocking characteristics of sendFileDescriptor, fix 459 # if it blocks. 460 self.debug("Attempting to send FD: %d" % self.transport.fileno()) 461 destinationAvatar.mind.broker.transport.sendFileDescriptor( 462 self.transport.fileno(), self._buffer) 463 464 # After this, we don't want to do anything with the FD, other than 465 # close our reference to it - but not close the actual TCP connection. 466 # We set keepSocketAlive to make loseConnection() only call close() 467 # rather than shutdown() then close() 468 self.transport.keepSocketAlive = True 469 self.transport.loseConnection()
470
471 - def parseLine(self, line):
472 """ 473 Parse the initial line of the response. Return a string usable for 474 uniquely identifying the stream being requested, or None if the request 475 is unreadable. 476 477 Subclasses should override this. 478 """ 479 raise NotImplementedError
480
481 - def writeNotFoundResponse(self):
482 """ 483 Write a response indicating that the requested resource was not found 484 in this protocol. 485 486 Subclasses should override this to use the correct protocol. 487 """ 488 raise NotImplementedError
489
490 -class HTTPPorterProtocol(PorterProtocol):
491 scheme = 'http' 492 protos = ["HTTP/1.0", "HTTP/1.1"] 493
494 - def parseLine(self, line):
495 try: 496 (method, location, proto) = map(string.strip, line.split(' ', 2)) 497 498 if proto not in self.protos: 499 return None 500 501 # Currently, we just return the path part of the URL. 502 # Use the URL parsing from urllib2. 503 location = urlparse.urlparse(location, 'http')[2] 504 self.log('parsed %s %s %s' % (method, location, proto)) 505 if not location or location == '': 506 return None 507 508 return location 509 510 except ValueError: 511 return None
512
513 - def writeNotFoundResponse(self):
514 self.transport.write("HTTP/1.0 404 Not Found\r\n\r\nResource unknown")
515
516 -class RTSPPorterProtocol(HTTPPorterProtocol):
517 scheme = 'rtsp' 518 protos = ["RTSP/1.0"] 519
520 - def writeNotFoundResponse(self):
521 self.transport.write("RTSP/1.0 404 Not Found\r\n\r\nResource unknown")
522