1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 import os
23 import string
24 import time
25
26 from twisted.web import server, http
27 from twisted.web.resource import Resource
28 from twisted.internet import defer, reactor, error
29 from twisted.cred import credentials
30 from zope.interface import implements
31
32 from flumotion.common import log, messages, errors, netutils, interfaces
33 from flumotion.common.i18n import N_, gettexter
34 from flumotion.component import component
35 from flumotion.component.base import http as httpbase
36 from flumotion.component.component import moods
37 from flumotion.component.misc.httpserver import httpfile
38 from flumotion.component.misc.porter import porterclient
39 from flumotion.twisted import fdserver
40
41 __version__ = "$Rev: 6984 $"
42 T_ = gettexter()
43
44
46
48 server.Request.__init__(self, channel, queued)
49
50 self._component = channel.factory.component
51 self._completed = False
52 self._transfer = None
53
54 self._bytes_written = 0
55 self._start_time = time.time()
56 self._lastTimeWritten = self._start_time
57
58
59
60
61 self._fd = self.transport.fileno()
62
63 self._component.requestStarted(self)
64
70
79
85
87 if not self._completed:
88 self._component.requestFinished(self, self._bytes_written,
89 time.time() - self._start_time, fd)
90 self._completed = True
91
92
93 -class Site(server.Site):
100
101
108
110 """
111 @rtype: L{twisted.internet.defer.Deferred} firing a keycard or None.
112 """
113 return self.callRemote('authenticate', bouncerName, keycard)
114
115 - def keepAlive(self, bouncerName, issuerName, ttl):
116 """
117 @rtype: L{twisted.internet.defer.Deferred}
118 """
119 return self.callRemote('keepAlive', bouncerName, issuerName, ttl)
120
122 """
123 @rtype: L{twisted.internet.defer.Deferred}
124 """
125 return self.callRemote('removeKeycardId', bouncerName, keycardId)
126
129
132
135
138
141
142
144 implements(interfaces.IStreamingComponent)
145
146 componentMediumClass = HTTPFileMedium
147
148 REQUEST_TIMEOUT = 30
149
150
152 self.mountPoint = None
153 self.type = None
154 self.port = None
155 self.hostname = None
156 self._rateControlPlug = None
157 self._loggers = []
158 self._logfilter = None
159 self.httpauth = None
160
161 self._description = 'On-Demand Flumotion Stream'
162
163 self._singleFile = False
164 self._connected_clients = {}
165 self._total_bytes_written = 0
166
167 self._pbclient = None
168
169 self._twistedPort = None
170 self._timeoutRequestsCallLater = None
171
172 self._pendingDisconnects = {}
173 self._rootResource = None
174
175
176
177 self._mimeToResource = {
178 'video/x-flv': httpfile.FLVFile,
179 }
180
181
182 self.uiState.addKey("connected-clients", 0)
183 self.uiState.addKey("bytes-transferred", 0)
184 self.uiState.addKey('stream-url', None)
185
187 props = self.config['properties']
188 self.fixRenamedProperties(props, [
189 ('issuer', 'issuer-class'),
190 ('porter_socket_path', 'porter-socket-path'),
191 ('porter_username', 'porter-username'),
192 ('porter_password', 'porter-password'),
193 ('mount_point', 'mount-point')
194 ])
195
196 if props.get('type', 'master') == 'slave':
197 for k in 'socket-path', 'username', 'password':
198 if not 'porter-' + k in props:
199 msg = 'slave mode, missing required property porter-%s' % k
200 return defer.fail(errors.ConfigError(msg))
201
202 path = props.get('path', None)
203 if path is None:
204 return
205 if os.path.isfile(path):
206 self._singleFile = True
207 elif os.path.isdir(path):
208 self._singleFile = False
209 else:
210 msg = "the file or directory specified in 'path': %s does " \
211 "not exist or is neither a file nor directory" % path
212 return defer.fail(errors.ConfigError(msg))
213
215 desc = props.get('description', None)
216 if desc:
217 self._description = desc
218
219
220 mountPoint = props.get('mount-point', '/')
221 if not mountPoint.startswith('/'):
222 mountPoint = '/' + mountPoint
223 self.mountPoint = mountPoint
224 self.hostname = props.get('hostname', None)
225 if not self.hostname:
226 self.hostname = netutils.guess_public_hostname()
227
228 self.filePath = props.get('path')
229 self.type = props.get('type', 'master')
230 self.port = props.get('port', 8801)
231 if self.type == 'slave':
232
233 self._porterPath = props['porter-socket-path']
234 self._porterUsername = props['porter-username']
235 self._porterPassword = props['porter-password']
236 self._loggers = \
237 self.plugs.get('flumotion.component.plugs.loggers.Logger', [])
238
239 self.httpauth = httpbase.HTTPAuthentication(self)
240 if 'bouncer' in props:
241 self.httpauth.setBouncerName(props['bouncer'])
242 if 'issuer-class' in props:
243 self.httpauth.setIssuerClass(props['issuer-class'])
244 if 'ip-filter' in props:
245 logFilter = http.LogFilter()
246 for f in props['ip-filter']:
247 logFilter.addIPFilter(f)
248 self._logfilter = logFilter
249
250 socket = 'flumotion.component.misc.httpserver.ratecontroller.RateController'
251 plugs = self.plugs.get(socket, [])
252 if plugs:
253
254 self._rateControlPlug = self.plugs[socket][-1]
255
256
257 self.uiState.set('stream-url', self.getUrl())
258
260 self.have_properties(self.config['properties'])
261
262 root = self._rootResource
263 if root is None:
264 root = self._getDefaultRootResource()
265
266 if root is None:
267 raise errors.WrongStateError(
268 "a resource or path property must be set")
269
270 site = Site(root, self)
271 self._timeoutRequestsCallLater = reactor.callLater(
272 self.REQUEST_TIMEOUT, self._timeoutRequests)
273
274 d = defer.Deferred()
275 if self.type == 'slave':
276
277 if self._singleFile:
278 self._pbclient = porterclient.HTTPPorterClientFactory(
279 site, [self.mountPoint], d)
280 else:
281 self._pbclient = porterclient.HTTPPorterClientFactory(
282 site, [], d,
283 prefixes=[self.mountPoint])
284 creds = credentials.UsernamePassword(self._porterUsername,
285 self._porterPassword)
286 self._pbclient.startLogin(creds, self._pbclient.medium)
287 self.debug("Starting porter login!")
288
289 reactor.connectWith(fdserver.FDConnector, self._porterPath,
290 self._pbclient, 10, checkPID=False)
291 else:
292
293 try:
294 self.debug('Going to listen on port %d' % self.port)
295 iface = ""
296
297
298 self._twistedPort = reactor.listenTCP(self.port,
299 site, interface=iface)
300 self.port = self._twistedPort.getHost().port
301 self.debug('Listening on port %d' % self.port)
302 except error.CannotListenError:
303 t = 'Port %d is not available.' % self.port
304 self.warning(t)
305 m = messages.Error(T_(N_(
306 "Network error: TCP port %d is not available."), self.port))
307 self.addMessage(m)
308 self.setMood(moods.sad)
309 return defer.fail(errors.ComponentStartHandledError(t))
310
311 d.callback(None)
312
313 def setComponentHappy(result):
314 self.httpauth.scheduleKeepAlive()
315 self.setMood(moods.happy)
316 return result
317 d.addCallback(setComponentHappy)
318 return d
319
321 if self.httpauth:
322 self.httpauth.stopKeepAlive()
323 if self._timeoutRequestsCallLater:
324 self._timeoutRequestsCallLater.cancel()
325 self._timeoutRequestsCallLater = None
326 if self._twistedPort:
327 self._twistedPort.stopListening()
328
329 l = [self.remove_all_clients()]
330 if self.type == 'slave' and self._pbclient:
331 if self._singleFile:
332 l.append(self._pbclient.deregisterPath(self.mountPoint))
333 else:
334 l.append(self._pbclient.deregisterPrefix(self.mountPoint))
335 return defer.DeferredList(l)
336
338 """
339 Provide a new set of porter login information, for when we're in slave
340 mode and the porter changes.
341 If we're currently connected, this won't disconnect - it'll just change
342 the information so that next time we try and connect we'll use the
343 new ones
344 @param path: new path
345 @param username: new username
346 @param password: new password
347 """
348 if self.type != 'slave':
349 raise errors.WrongStateError(
350 "Can't specify porter details in master mode")
351
352 self._porterUsername = username
353 self._porterPassword = password
354
355 creds = credentials.UsernamePassword(self._porterUsername,
356 self._porterPassword)
357 self._pbclient.startLogin(creds, self.medium)
358
359 self._updatePath(path)
360
362
363 if path == self._porterPath:
364 return
365 self._porterPath = path
366
367
368 self._pbclient.stopTrying()
369
370 self._pbclient.resetDelay()
371 reactor.connectWith(fdserver.FDConnector, self._porterPath,
372 self._pbclient, 10, checkPID=False)
373
388
390 if self.filePath is None:
391 return None
392
393 self.debug('Starting with mount point "%s"' % self.mountPoint)
394 factory = httpfile.MimedFileFactory(self.httpauth,
395 mimeToResource=self._mimeToResource,
396 rateController=self._rateControlPlug)
397
398 root = factory.create(self.filePath)
399 if self.mountPoint != '/':
400 root = self._createRootResourceForPath(self.mountPoint, root)
401
402 return root
403
405 if path.endswith('/'):
406 path = path[:-1]
407
408 root = Resource()
409 children = string.split(path[1:], '/')
410 parent = root
411 for child in children[:-1]:
412 resource = Resource()
413 self.debug("Putting Resource at %s", child)
414 parent.putChild(child, resource)
415 parent = resource
416 self.debug("Putting resource %r at %r", fileResource, children[-1])
417 parent.putChild(children[-1], fileResource)
418 return root
419
421 """
422 Remove a client when requested.
423
424 Used by keycard expiry.
425 """
426 if fd in self._connected_clients:
427 request = self._connected_clients[fd]
428 self.debug("Removing client for fd %d", fd)
429 request.unregisterProducer()
430 request.channel.transport.loseConnection()
431 else:
432 self.debug("No client with fd %d found", fd)
433
435 l = []
436 for fd in self._connected_clients:
437 d = defer.Deferred()
438 self._pendingDisconnects[fd] = d
439 l.append(d)
440
441 request = self._connected_clients[fd]
442 request.unregisterProducer()
443 request.channel.transport.loseConnection()
444
445 self.debug("Waiting for %d clients to finish", len(l))
446 return defer.DeferredList(l)
447
449 fd = request.transport.fileno()
450 self._connected_clients[fd] = request
451 self.uiState.set("connected-clients", len(self._connected_clients))
452
454 self.httpauth.cleanupAuth(fd)
455 headers = request.getAllHeaders()
456
457 ip = request.getClientIP()
458 if not self._logfilter or not self._logfilter.isInRange(ip):
459 args = {'ip': ip,
460 'time': time.gmtime(),
461 'method': request.method,
462 'uri': request.uri,
463 'username': '-',
464 'get-parameters': request.args,
465 'clientproto': request.clientproto,
466 'response': request.code,
467 'bytes-sent': bytesWritten,
468 'referer': headers.get('referer', None),
469 'user-agent': headers.get('user-agent', None),
470 'time-connected': timeConnected}
471
472 l = []
473 for logger in self._loggers:
474 l.append(defer.maybeDeferred(
475 logger.event, 'http_session_completed', args))
476 d = defer.DeferredList(l)
477 else:
478 d = defer.succeed(None)
479
480 del self._connected_clients[fd]
481
482 self.uiState.set("connected-clients", len(self._connected_clients))
483
484 self._total_bytes_written += bytesWritten
485 self.uiState.set("bytes-transferred", self._total_bytes_written)
486
487 def firePendingDisconnect(_):
488 self.debug("Logging completed")
489 if fd in self._pendingDisconnects:
490 pending = self._pendingDisconnects.pop(fd)
491 self.debug("Firing pending disconnect deferred")
492 pending.callback(None)
493 d.addCallback(firePendingDisconnect)
494
496 return self._description
497
499 return "http://%s:%d%s" % (self.hostname, self.port, self.mountPoint)
500
502 socket = 'flumotion.component.plugs.streamdata.StreamDataProvider'
503 if self.plugs[socket]:
504 plug = self.plugs[socket][-1]
505 return plug.getStreamData()
506 else:
507 return {
508 'protocol': 'HTTP',
509 'description': self._description,
510 'url' : self.getUrl()
511 }
512
514 """
515 Return a tuple (deltaadded, deltaremoved, bytes_transferred,
516 current_clients, current_load) of our current bandwidth and user values.
517 The deltas and current_load are NOT currently implemented here, we set
518 them as zero.
519 """
520 bytesTransferred = self._total_bytes_written
521 for request in self._connected_clients.values():
522 if request._transfer:
523 bytesTransferred += request._transfer.bytesWritten
524
525 return (0, 0, bytesTransferred, len(self._connected_clients), 0)
526
528 """
529 Close the logfile, then reopen using the previous logfilename
530 """
531 for logger in self._loggers:
532 self.debug('rotating logger %r' % logger)
533 logger.rotate()
534
536 """Attaches a root resource to this component. The root resource is the
537 once which will be used when accessing the mount point.
538 This is normally called from a plugs start() method.
539 @param resource: root resource
540 @type resource: L{twisted.web.resource.Resource}
541 """
542 rootResource = self._createRootResourceForPath(
543 self.getMountPoint(), resource)
544
545 self._rootResource = rootResource
546
548 """Get the mount point of this component
549 @returns: the mount point
550 """
551
552 return self.config['properties'].get('mount-point')
553