1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 import os
23 import random
24 import socket
25 import string
26 from urllib2 import urlparse
27
28 from twisted.cred import portal
29 from twisted.internet import protocol, reactor, address, error, defer
30 from twisted.spread import pb
31 from zope.interface import implements
32
33 from flumotion.common import medium, log, messages
34 from flumotion.common.i18n import N_, gettexter
35 from flumotion.component import component
36 from flumotion.component.component import moods
37 from flumotion.twisted import credentials, fdserver, checkers
38 from flumotion.twisted import reflect
39
40 __version__ = "$Rev: 6981 $"
41 T_ = gettexter()
42
43
45 """
46 An Avatar in the porter representing a streamer
47 """
48 - def __init__(self, avatarId, porter, mind):
55
def isAttached(self):
    """
    Tell whether a remote peer is still attached to this avatar.

    @returns: True while self.mind is set, False once it has been cleared
              (logging out resets self.mind to None).
    @rtype:   bool
    """
    # Identity test: never dispatch to a comparison method on the mind
    # object just to find out whether it is None.
    return self.mind is not None
58
60 self.debug("porter client %s logging out", self.avatarId)
61 self.mind = None
62
66
70
74
78
80 """
81 A Realm within the Porter that creates Avatars for streamers logging into
82 the porter.
83 """
84 implements(portal.IRealm)
85
87 """
88 @param porter: The porter that avatars created from here should use.
89 @type porter: L{Porter}
90 """
91 self.porter = porter
92
101
103
105 """
Return the socket path, login username/password, and port and
interface for the porter as a tuple (path, username, password, port,
interface). The port reported is the porter's iptables port.
109 """
110 return (self.comp._socketPath, self.comp._username,
111 self.comp._password, self.comp._iptablesPort,
112 self.comp._interface)
113
114 -class Porter(component.BaseComponent, log.Loggable):
115 """
116 The porter optionally sits in front of a set of streamer components.
117 The porter is what actually deals with incoming connections on a TCP socket.
118 It decides which streamer to direct the connection to, then passes the FD
119 (along with some amount of already-read data) to the appropriate streamer.
120 """
121
122 componentMediumClass = PorterMedium
123
125
126
127 self._mappings = {}
128 self._prefixes = {}
129
130 self._socketlistener = None
131
132 self._socketPath = None
133 self._username = None
134 self._password = None
135 self._port = None
136 self._iptablesPort = None
137 self._porterProtocol = None
138
139 self._interface = ''
140
142 """
143 Register a path as being served by a streamer represented by this
144 avatar. Will remove any previous registration at this path.
145
146 @param path: The path to register
147 @type path: str
148 @param avatar: The avatar representing the streamer to direct this path
149 to
150 @type avatar: L{PorterAvatar}
151 """
152 self.debug("Registering porter path \"%s\" to %r" % (path, avatar))
153 if self._mappings.has_key(path):
154 self.warning("Replacing existing mapping for path \"%s\"" % path)
155
156 self._mappings[path] = avatar
157
159 """
160 Attempt to deregister the given path. A deregistration will only be
161 accepted if the mapping is to the avatar passed.
162
163 @param path: The path to deregister
164 @type path: str
165 @param avatar: The avatar representing the streamer being deregistered
166 @type avatar: L{PorterAvatar}
167 """
168 if self._mappings.has_key(path):
169 if self._mappings[path] == avatar:
170 self.debug("Removing porter mapping for \"%s\"" % path)
171 del self._mappings[path]
172 else:
173 self.warning("Mapping not removed: refers to a different avatar")
174 else:
175 self.warning("Mapping not removed: no mapping found")
176
178 """
Register a destination for all requests directed to anything beginning
with a specified prefix. Where there are multiple matching prefixes, the
longest is selected. Registering a prefix that is already present
overwrites the earlier registration.

@param prefix: The path prefix to register
@type prefix: str
@param avatar: The avatar being registered
@type avatar: L{PorterAvatar}
185 """
186
187 self.debug("Setting prefix \"%s\" for porter", prefix)
188 if prefix in self._prefixes:
189 self.warning("Overwriting prefix")
190
191 self._prefixes[prefix] = avatar
192
194 """
Attempt to deregister the destination registered for the given prefix.
This will only succeed if the prefix is currently mapped to this avatar.

@param prefix: The path prefix to deregister
@type prefix: str
@param avatar: The avatar being deregistered
@type avatar: L{PorterAvatar}
201 """
202 if prefix not in self._prefixes:
203 self.warning("Mapping not removed: no mapping found")
204 return
205
206 if self._prefixes[prefix] == avatar:
207 self.debug("Removing prefix destination from porter")
208 del self._prefixes[prefix]
209 else:
210 self.warning("Not removing prefix destination: expected avatar not found")
211
def findPrefixMatch(self, path):
    """
    Find the avatar registered for the longest prefix that path starts with.

    @param path: The request path to match against the registered prefixes
    @type path: str

    @returns: The avatar mapped to the longest matching prefix, or None if
              no registered prefix matches.
    """
    longest = None

    for prefix in self._prefixes:
        self.log("Checking: %r, %r" % (prefix, path))
        if not path.startswith(prefix):
            continue
        # Compare against None instead of relying on truthiness, so that an
        # empty-string (catch-all) prefix can still be selected.
        if longest is None or len(longest) < len(prefix):
            longest = prefix

    if longest is None:
        return None
    return self._prefixes[longest]
223
def findDestination(self, path):
    """
    Find a destination Avatar for this path.

    An exact path registration takes precedence; when there is none, the
    lookup falls back to the registered prefixes.

    @param path: The path to look up
    @type path: str

    @returns: The Avatar for this mapping, or None.
    """
    # 'in' replaces the deprecated dict.has_key().
    if path in self._mappings:
        return self._mappings[path]
    return self.findPrefixMatch(path)
234
235
def generateSocketPath(self):
    """
    Generate a socket pathname in an appropriate location.

    A uniquely named temporary file is created with the prefix
    'flumotion.porter.' and the current process id as its suffix; its
    descriptor is closed again straight away and only the name is kept.

    @returns: the path of the newly created temporary file
    @rtype: str
    """
    import tempfile

    pidSuffix = '.%d' % os.getpid()
    handle, socketPath = tempfile.mkstemp(pidSuffix, 'flumotion.porter.')
    # Only the unique name is of interest, not the open descriptor.
    os.close(handle)

    return socketPath
247
def generateRandomString(self, numchars):
    """
    Generate a random US-ASCII string of length numchars.

    The result is made up of the letters A-Z and a-z only. Because the
    porter uses these strings as generated login credentials, the letters
    are drawn from the operating system's entropy source
    (random.SystemRandom) rather than the default pseudo-random generator.

    @param numchars: the number of characters to generate
    @type numchars: int

    @returns: a string of numchars random ASCII letters
    @rtype: str
    """
    alphabet = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz"
    rng = random.SystemRandom()
    # Join once instead of growing a string character by character; this
    # also avoids a local named 'string' shadowing the imported module.
    return "".join([rng.choice(alphabet) for _ in range(numchars)])
258
def have_properties(self):
    """
    Read the component properties and store the porter settings on self.

    Sets _socketPath, _username, _password, _port, _iptablesPort,
    _porterProtocol and _interface from self.config['properties'].

    If a 'socket-path' property is given, 'username' and 'password' must
    be given as well and all three are taken from the configuration.
    Otherwise a random 12-character username and password and a fresh
    socket path are generated.

    'port' is required. 'iptables-port' defaults to the port, 'protocol'
    defaults to the HTTP porter protocol and 'interface' defaults to ''.
    A missing required property fails in the mapping lookup (KeyError for
    a plain dict).
    """
    props = self.config['properties']

    # NOTE(review): presumably maps the legacy 'socket_path' spelling onto
    # 'socket-path' - confirm against fixRenamedProperties.
    self.fixRenamedProperties(props,
        [('socket_path', 'socket-path')])

    # 'in' replaces the deprecated dict.has_key().
    if 'socket-path' in props:
        # Explicitly configured socket: the credentials come with it.
        self._socketPath = props['socket-path']
        self._username = props['username']
        self._password = props['password']
    else:
        # Nothing configured: make up credentials and a socket path.
        self._username = self.generateRandomString(12)
        self._password = self.generateRandomString(12)
        self._socketPath = self.generateSocketPath()

    self._port = int(props['port'])
    self._iptablesPort = int(props.get('iptables-port', self._port))
    self._porterProtocol = props.get('protocol',
        'flumotion.component.misc.porter.porter.HTTPPorterProtocol')
    self._interface = props.get('interface', '')
286
288 d = None
289 if self._socketlistener:
290
291
292 d = self._socketlistener.stopListening()
293 self._socketlistener = None
294 return d
295
297
298 self.have_properties()
299 realm = PorterRealm(self)
300 checker = checkers.FlexibleCredentialsChecker()
301 checker.addUser(self._username, self._password)
302
303 p = portal.Portal(realm, [checker])
304 serverfactory = pb.PBServerFactory(p)
305
306
307
308 try:
309
310
311
312 try:
313 os.unlink(self._socketPath)
314 except OSError:
315 pass
316
317 self._socketlistener = reactor.listenWith(
318 fdserver.FDPort, self._socketPath, serverfactory)
319 self.debug("Now listening on socketPath %s" % self._socketPath)
320 except error.CannotListenError, e:
321 self.warning("Failed to create socket %s" % self._socketPath)
322 m = messages.Error(T_(N_(
323 "Network error: socket path %s is not available."),
324 self._socketPath))
325 self.addMessage(m)
326 self.setMood(moods.sad)
327 return defer.fail(e)
328
329
330
331 try:
332 proto = reflect.namedAny(self._porterProtocol)
333 self.debug("Created proto %r" % proto)
334 except (ImportError, AttributeError):
335 self.warning("Failed to import protocol '%s', defaulting to HTTP" %
336 self._porterProtocol)
337 proto = HTTPPorterProtocol
338
339
340
341 factory = PorterProtocolFactory(self, proto)
342 try:
343 reactor.listenWith(
344 fdserver.PassableServerPort, self._port, factory,
345 interface=self._interface)
346 self.debug("Now listening on port %d" % self._port)
347 except error.CannotListenError, e:
348 self.warning("Failed to listen on port %d" % self._port)
349 m = messages.Error(T_(N_(
350 "Network error: TCP port %d is not available."), self._port))
351 self.addMessage(m)
352 self.setMood(moods.sad)
353 return defer.fail(e)
354
357 self._porter = porter
358 self.protocol = protocol
359
361 p = self.protocol(self._porter)
362 p.factory = self
363 return p
364
366 """
367 The base porter is capable of accepting HTTP-like protocols (including
368 RTSP) - it reads the first line of a request, and makes the decision
369 solely on that.
370
371 We can't guarantee that we read precisely a line, so the buffer we
372 accumulate will actually be larger than what we actually parse.
373
374 @cvar MAX_SIZE: the maximum number of bytes allowed for the first line
375 @cvar delimiters: a list of valid line delimiters I check for
376 """
377
378 MAX_SIZE = 4096
379
380
381
382 PORTER_CLIENT_TIMEOUT = 30
383
384
385
386
387
388 delimiters = ['\r\n', '\n', '\r']
389
398
400 self._timeoutDC = None
401 self.debug("Timing out porter client after %d seconds",
402 self.PORTER_CLIENT_TIMEOUT)
403 self.transport.loseConnection()
404
406 if self._timeoutDC:
407 self._timeoutDC.cancel()
408 self._timeoutDC = None
409
411 self._buffer = self._buffer + data
412 self.log("Got data, buffer now \"%s\"" % self._buffer)
413
414
415 for delim in self.delimiters:
416 try:
417 line, remaining = self._buffer.split(delim, 1)
418 break
419 except ValueError:
420
421 pass
422 else:
423
424 self.log("No valid delimiter found")
425 if len(self._buffer) > self.MAX_SIZE:
426 self.log("Dropping connection!")
427 return self.transport.loseConnection()
428 else:
429
430
431 return
432
433
434
435 identifier = self.parseLine(line)
436
437 if not identifier:
438 self.log("Couldn't find identifier in first line")
439 return self.transport.loseConnection()
440
441
442
443 destinationAvatar = self._porter.findDestination(identifier)
444
445 if not destinationAvatar or not destinationAvatar.isAttached():
446 if destinationAvatar:
447 self.debug("There was an avatar, but it logged out?")
448 self.debug("No destination avatar found for \"%s\"" % identifier)
449 self.writeNotFoundResponse()
450 return self.transport.loseConnection()
451
452
453
454
455
456
457
458
459
460 self.debug("Attempting to send FD: %d" % self.transport.fileno())
461 destinationAvatar.mind.broker.transport.sendFileDescriptor(
462 self.transport.fileno(), self._buffer)
463
464
465
466
467
468 self.transport.keepSocketAlive = True
469 self.transport.loseConnection()
470
472 """
Parse the initial line of the request. Return a string usable for
uniquely identifying the stream being requested, or None if the request
is unreadable.
476
477 Subclasses should override this.
478 """
479 raise NotImplementedError
480
482 """
483 Write a response indicating that the requested resource was not found
484 in this protocol.
485
486 Subclasses should override this to use the correct protocol.
487 """
488 raise NotImplementedError
489
491 scheme = 'http'
492 protos = ["HTTP/1.0", "HTTP/1.1"]
493
512
514 self.transport.write("HTTP/1.0 404 Not Found\r\n\r\nResource unknown")
515
517 scheme = 'rtsp'
518 protos = ["RTSP/1.0"]
519
521 self.transport.write("RTSP/1.0 404 Not Found\r\n\r\nResource unknown")
522