Package flumotion :: Package component :: Package misc :: Package porter :: Module porter
[hide private]

Source Code for Module flumotion.component.misc.porter.porter

  1  # -*- Mode: Python; test-case-name: flumotion.test.test_porter -*- 
  2  # vi:si:et:sw=4:sts=4:ts=4 
  3  # 
  4  # Flumotion - a streaming media server 
  5  # Copyright (C) 2004,2005,2006,2007 Fluendo, S.L. (www.fluendo.com). 
  6  # All rights reserved. 
  7   
  8  # This file may be distributed and/or modified under the terms of 
  9  # the GNU General Public License version 2 as published by 
 10  # the Free Software Foundation. 
 11  # This file is distributed without any warranty; without even the implied 
 12  # warranty of merchantability or fitness for a particular purpose. 
 13  # See "LICENSE.GPL" in the source distribution for more information. 
 14   
 15  # Licensees having purchased or holding a valid Flumotion Advanced 
 16  # Streaming Server license may use this file in accordance with the 
 17  # Flumotion Advanced Streaming Server Commercial License Agreement. 
 18  # See "LICENSE.Flumotion" in the source distribution for more information. 
 19   
 20  # Headers in this file shall remain intact. 
 21   
 22  from urllib2 import urlparse 
 23   
 24  from twisted.internet import protocol, reactor, address, error, defer 
 25   
 26  from twisted.spread import pb 
 27  from twisted.cred import portal 
 28   
 29  from flumotion.common import medium, log, messages 
 30  from flumotion.twisted import credentials, fdserver, checkers 
 31  from flumotion.twisted import reflect  
 32   
 33  from flumotion.component import component 
 34  from flumotion.component.component import moods 
 35   
 36  import socket, string, os, random 
 37   
 38  from flumotion.common.messages import N_ 
 39  T_ = messages.gettexter('flumotion') 
 40   
41 -class PorterAvatar(pb.Avatar, log.Loggable):
42 """ 43 An Avatar in the porter representing a streamer 44 """
45 - def __init__(self, avatarId, porter, mind):
46 self.avatarId = avatarId 47 self.porter = porter 48 49 # The underlying transport is now accessible as 50 # self.mind.broker.transport, on which we can call sendFileDescriptor 51 self.mind = mind
52
53 - def isAttached(self):
54 return self.mind != None
55
56 - def logout(self):
57 self.mind = None
58
59 - def perspective_registerPath(self, path):
60 self.log("Perspective called: registering path \"%s\"" % path) 61 self.porter.registerPath(path, self)
62
63 - def perspective_deregisterPath(self, path):
64 self.log("Perspective called: deregistering path \"%s\"" % path) 65 self.porter.deregisterPath(path, self)
66
67 - def perspective_registerPrefix(self, prefix):
68 self.log("Perspective called: registering default") 69 self.porter.registerPrefix(prefix, self)
70
71 - def perspective_deregisterPrefix(self, prefix):
72 self.log("Perspective called: deregistering default") 73 self.porter.deregisterPrefix(prefix, self)
74
75 -class PorterRealm(log.Loggable):
76 """ 77 A Realm within the Porter that creates Avatars for streamers logging into 78 the porter. 79 """ 80 __implements__ = portal.IRealm 81
82 - def __init__(self, porter):
83 """ 84 @param porter: The porter that avatars created from here should use. 85 @type porter: L{Porter} 86 """ 87 self.porter = porter
88
89 - def requestAvatar(self, avatarId, mind, *interfaces):
90 self.log("Avatar requested for avatarId %s, mind %r, interfaces %r" % 91 (avatarId, mind, interfaces)) 92 if pb.IPerspective in interfaces: 93 avatar = PorterAvatar(avatarId, self.porter, mind) 94 return pb.IPerspective, avatar, avatar.logout 95 else: 96 raise NotImplementedError("no interface")
97
98 -class PorterMedium(component.BaseComponentMedium):
99
100 - def remote_getPorterDetails(self):
101 """ 102 Return the location, login username/password, and listening port 103 and interface for the porter as a tuple (path, username, 104 password, port, interface). 105 """ 106 return (self.comp._socketPath, self.comp._username, 107 self.comp._password, self.comp._iptablesPort, 108 self.comp._interface)
109
110 -class Porter(component.BaseComponent, log.Loggable):
111 """ 112 The porter optionally sits in front of a set of streamer components. 113 The porter is what actually deals with incoming connections on a TCP socket. 114 It decides which streamer to direct the connection to, then passes the FD 115 (along with some amount of already-read data) to the appropriate streamer. 116 """ 117 118 componentMediumClass = PorterMedium 119
120 - def init(self):
121 # We maintain a map of path -> avatar (the underlying transport is 122 # accessible from the avatar, we need this for FD-passing) 123 self._mappings = {} 124 self._prefixes = {} 125 126 self._socketlistener = None 127 128 self._socketPath = None 129 self._username = None 130 self._password = None 131 self._port = None 132 self._iptablesPort = None 133 self._porterProtocol = None 134 135 self._interface = ''
136
137 - def registerPath(self, path, avatar):
138 """ 139 Register a path as being served by a streamer represented by this 140 avatar. Will remove any previous registration at this path. 141 142 @param path: The path to register 143 @type path: str 144 @param avatar: The avatar representing the streamer to direct this path 145 to 146 @type avatar: L{PorterAvatar} 147 """ 148 self.debug("Registering porter path \"%s\" to %r" % (path, avatar)) 149 if self._mappings.has_key(path): 150 self.warning("Replacing existing mapping for path \"%s\"" % path) 151 152 self._mappings[path] = avatar
153
154 - def deregisterPath(self, path, avatar):
155 """ 156 Attempt to deregister the given path. A deregistration will only be 157 accepted if the mapping is to the avatar passed. 158 159 @param path: The path to deregister 160 @type path: str 161 @param avatar: The avatar representing the streamer being deregistered 162 @type avatar: L{PorterAvatar} 163 """ 164 if self._mappings.has_key(path): 165 if self._mappings[path] == avatar: 166 self.debug("Removing porter mapping for \"%s\"" % path) 167 del self._mappings[path] 168 else: 169 self.warning("Mapping not removed: refers to a different avatar") 170 else: 171 self.warning("Mapping not removed: no mapping found")
172
173 - def registerPrefix(self, prefix, avatar):
174 """ 175 Register a destination for all requests directed to anything beginning 176 with a specified prefix. Where there are multiple matching prefixes, the 177 longest is selected. 178 179 @param avatar: The avatar being registered 180 @type avatar: L{PorterAvatar} 181 """ 182 183 self.debug("Setting prefix \"%s\" for porter", prefix) 184 if prefix in self._prefixes: 185 self.warning("Overwriting prefix") 186 187 self._prefixes[prefix] = avatar
188
189 - def deregisterPrefix(self, prefix, avatar):
190 """ 191 Attempt to deregister a default destination for all requests not 192 directed to a specifically-mapped path. This will only succeed if the 193 default is currently equal to this avatar. 194 195 @param avatar: The avatar being deregistered 196 @type avatar: L{PorterAvatar} 197 """ 198 if prefix not in self._prefixes: 199 self.warning("Mapping not removed: no mapping found") 200 return 201 202 if self._prefixes[prefix] == avatar: 203 self.debug("Removing prefix destination from porter") 204 del self._prefixes[prefix] 205 else: 206 self.warning("Not removing prefix destination: expected avatar not found")
207
208 - def findPrefixMatch(self, path):
209 found = None 210 # TODO: Horribly inefficient. Figure out a smart algorithm 211 for prefix in self._prefixes.keys(): 212 self.debug("Checking: %r, %r" % (prefix, path)) 213 if (path.startswith(prefix) and (not found or len(found) < len(prefix))): 214 found = prefix 215 if found: 216 return self._prefixes[found] 217 else: 218 return None
219
220 - def findDestination(self, path):
221 """ 222 Find a destination Avatar for this path. 223 @returns: The Avatar for this mapping, or None. 224 """ 225 226 if self._mappings.has_key(path): 227 return self._mappings[path] 228 else: 229 return self.findPrefixMatch(path)
230 231
232 - def generateSocketPath(self):
233 """ 234 Generate a socket pathname in an appropriate location 235 """ 236 # Also see worker/worker.py:_getSocketPath(), and note that this suffers 237 # from the same potential race. 238 import tempfile 239 fd, name = tempfile.mkstemp('.%d' % os.getpid(), 'flumotion.porter.') 240 os.close(fd) 241 242 return name
243
244 - def generateRandomString(self, numchars):
245 """ 246 Generate a random US-ASCII string of length numchars 247 """ 248 str = "" 249 chars = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz" 250 for _ in range(numchars): 251 str += chars[random.randint(0, len(chars)-1)] 252 253 return str
254
255 - def do_setup(self):
256 props = self.config['properties'] 257 258 self.fixRenamedProperties(props, 259 [('socket_path', 'socket-path')]) 260 261 # We can operate in two modes: explicitly configured (neccesary if you 262 # want to handle connections from components in other managers), and 263 # self-configured (which is sufficient for slaving only streamers 264 # within this manager 265 if props.has_key('socket-path'): 266 # Explicitly configured 267 self._socketPath = props['socket-path'] 268 self._username = props['username'] 269 self._password = props['password'] 270 else: 271 # Self-configuring. Use a randomly create username/password, and 272 # a socket with a random name. 273 self._username = self.generateRandomString(12) 274 self._password = self.generateRandomString(12) 275 self._socketPath = self.generateSocketPath() 276 277 self._port = int(props['port']) 278 self._iptablesPort = int(props.get('iptables-port', self._port)) 279 self._porterProtocol = props.get('protocol', 280 'flumotion.component.misc.porter.porter.HTTPPorterProtocol') 281 self._interface = props.get('interface', '')
282
283 - def do_stop(self):
284 if self._socketlistener: 285 # stopListening() calls (via a callLater) connectionLost(), which 286 # would normally unlink our socket. However, if we stop the reactor 287 # before this happens, we leave a stale socket. So, we explicitly 288 # unlink it below, as well. 289 self._socketlistener.stopListening() 290 self._socketlistener = None 291 292 try: 293 os.unlink(self._socketPath) 294 except: 295 pass 296 297 return component.BaseComponent.do_stop(self)
298
299 - def do_start(self, *args, **kwargs):
300 # Create our combined PB-server/fd-passing channel 301 302 realm = PorterRealm(self) 303 checker = checkers.FlexibleCredentialsChecker() 304 checker.addUser(self._username, self._password) 305 306 p = portal.Portal(realm, [checker]) 307 serverfactory = pb.PBServerFactory(p) 308 309 try: 310 # Rather than a normal listenTCP() or listenUNIX(), we use 311 # listenWith so that we can specify our particular Port, which 312 # creates Transports that we know how to pass FDs over. 313 try: 314 os.unlink(self._socketPath) 315 except: 316 pass 317 318 self._socketlistener = reactor.listenWith( 319 fdserver.FDPort, self._socketPath, serverfactory) 320 self.debug("Now listening on socketPath %s" % self._socketPath) 321 except error.CannotListenError, e: 322 self.warning("Failed to create socket %s" % self._socketPath) 323 m = messages.Error(T_(N_( 324 "Network error: socket path %s is not available."), 325 self._socketPath)) 326 self.addMessage(m) 327 self.setMood(moods.sad) 328 return defer.fail(e) 329 330 # Create the class that deals with the specific protocol we're proxying 331 # in this porter. 332 try: 333 proto = reflect.namedAny(self._porterProtocol) 334 self.debug("Created proto %r" % proto) 335 except: 336 self.warning("Failed to import protocol '%s', defaulting to HTTP" % 337 self._porterProtocol) 338 proto = HTTPPorterProtocol 339 340 # And of course we also want to listen for incoming requests in the 341 # appropriate protocol (HTTP, RTSP, etc.) 342 factory = PorterProtocolFactory(self, proto) 343 try: 344 reactor.listenWith( 345 fdserver.PassableServerPort, self._port, factory, 346 interface=self._interface) 347 self.debug("Now listening on port %d" % self._port) 348 except error.CannotListenError, e: 349 self.warning("Failed to listen on port %d" % self._port) 350 m = messages.Error(T_(N_( 351 "Network error: TCP port %d is not available."), self._port)) 352 self.addMessage(m) 353 self.setMood(moods.sad) 354 return defer.fail(e) 355 356 return component.BaseComponent.do_start(self, *args, **kwargs)
357
358 -class PorterProtocolFactory(protocol.Factory):
359 - def __init__(self, porter, protocol):
360 self._porter = porter 361 self.protocol = protocol
362
363 - def buildProtocol(self, addr):
364 p = self.protocol(self._porter) 365 p.factory = self 366 return p
367
368 -class PorterProtocol(protocol.Protocol, log.Loggable):
369 """ 370 The base porter is capable of accepting HTTP-like protocols (including 371 RTSP) - it reads the first line of a request, and makes the decision 372 solely on that. 373 374 We can't guarantee that we read precisely a line, so the buffer we 375 accumulate will actually be larger than what we actually parse. 376 377 @cvar MAX_SIZE: the maximum number of bytes allowed for the first line 378 @cvar delimiters: a list of valid line delimiters I check for 379 """ 380 # Don't permit a first line longer than this. 381 MAX_SIZE = 4096 382 383 # In fact, because we check \r, we'll never need to check for \r\n - we 384 # leave this in as \r\n is the more correct form. At the other end, this 385 # gets processed by a full protocol implementation, so being flexible hurts 386 # us not at all 387 delimiters = ['\r\n', '\n', '\r'] 388
389 - def __init__(self, porter):
390 self._buffer = '' 391 self._porter = porter
392
393 - def dataReceived(self, data):
394 self._buffer = self._buffer + data 395 self.log("Got data, buffer now \"%s\"" % self._buffer) 396 # We accept more than just '\r\n' (the true HTTP line end) in the 397 # interests of compatibility. 398 for delim in self.delimiters: 399 try: 400 line, remaining = self._buffer.split(delim, 1) 401 break 402 except ValueError: 403 self.log("No line break found yet") 404 pass 405 else: 406 # Failed to find a valid delimiter. 407 self.log("No valid delimiter found") 408 if len(self._buffer) > self.MAX_SIZE: 409 self.log("Dropping connection!") 410 return self.transport.loseConnection() 411 else: 412 # TODO: Should we return anything? 413 return 414 415 # Got a line. self._buffer is still our entire buffer, should be 416 # provided to the slaved process. 417 identifier = self.parseLine(line) 418 419 if not identifier: 420 self.log("Couldn't find identifier in first line") 421 return self.transport.loseConnection() 422 423 # Ok, we have an identifier. Is it one we know about, or do we have 424 # a default destination? 425 destinationAvatar = self._porter.findDestination(identifier) 426 427 if not destinationAvatar or not destinationAvatar.isAttached(): 428 if destinationAvatar: 429 self.log("There was an avatar, but it logged out?") 430 self.log("No destination avatar found for \"%s\"" % identifier) 431 self.writeNotFoundResponse() 432 return self.transport.loseConnection() 433 434 # Transfer control over this FD. Pass all the data so-far received 435 # along in the same message. The receiver will push that data into 436 # the Twisted Protocol object as if it had been normally received, 437 # so it looks to the receiver like it has read the entire data stream 438 # itself. 439 440 # TODO: Check out blocking characteristics of sendFileDescriptor, fix 441 # if it blocks. 442 self.debug("Attempting to send FD: %d" % self.transport.fileno()) 443 destinationAvatar.mind.broker.transport.sendFileDescriptor( 444 self.transport.fileno(), self._buffer) 445 446 # After this, we don't want to do anything with the FD, other than 447 # close our reference to it - but not close the actual TCP connection. 448 # We set keepSocketAlive to make loseConnection() only call close() 449 # rather than shutdown() then close() 450 self.transport.keepSocketAlive = True 451 self.transport.loseConnection()
452
453 - def parseLine(self, line):
454 """ 455 Parse the initial line of the response. Return a string usable for 456 uniquely identifying the stream being requested, or None if the request 457 is unreadable. 458 459 Subclasses should override this. 460 """ 461 raise NotImplementedError
462
463 - def writeNotFoundResponse(self):
464 """ 465 Write a response indicating that the requested resource was not found 466 in this protocol. 467 468 Subclasses should override this to use the correct protocol. 469 """ 470 raise NotImplementedError
471
472 -class HTTPPorterProtocol(PorterProtocol):
473 scheme = 'http' 474 protos = ["HTTP/1.0", "HTTP/1.1"] 475
476 - def parseLine(self, line):
477 try: 478 (method, location, proto) = map(string.strip, line.split(' ', 2)) 479 480 if proto not in self.protos: 481 return None 482 483 # Currently, we just return the path part of the URL. 484 # Use the URL parsing from urllib2. 485 location = urlparse.urlparse(location, 'http')[2] 486 self.log('parsed %s %s %s' % (method, location, proto)) 487 if not location or location == '': 488 return None 489 490 return location 491 492 except ValueError: 493 return None
494
495 - def writeNotFoundResponse(self):
496 self.transport.write("HTTP/1.0 404 Not Found\r\n\r\nResource unknown")
497
498 -class RTSPPorterProtocol(HTTPPorterProtocol):
499 scheme = 'rtsp' 500 protos = ["RTSP/1.0"] 501
502 - def writeNotFoundResponse(self):
503 self.transport.write("RTSP/1.0 404 Not Found\r\n\r\nResource unknown")
504