1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 import time
23
24 import gst
25 from twisted.cred import credentials
26 from twisted.internet import reactor, error, defer
27 from twisted.web import server
28 from zope.interface import implements
29
30 from flumotion.common import gstreamer, errors
31 from flumotion.common import messages, netutils, interfaces
32 from flumotion.common.format import formatStorage, formatTime
33 from flumotion.common.i18n import N_, gettexter
34 from flumotion.component import feedcomponent
35 from flumotion.component.base import http
36 from flumotion.component.component import moods
37 from flumotion.component.consumers.httpstreamer import resources
38 from flumotion.component.misc.porter import porterclient
39 from flumotion.twisted import fdserver
40
41 __all__ = ['HTTPMedium', 'MultifdSinkStreamer']
42 __version__ = "$Rev: 6981 $"
43 T_ = gettexter()
44 STATS_POLL_INTERVAL = 10
45 UI_UPDATE_THROTTLE_PERIOD = 2.0
46
47
48
49
52 self.sink = sink
53
54 self.no_clients = 0
55 self.clients_added_count = 0
56 self.clients_removed_count = 0
57 self.start_time = time.time()
58
59 self.peak_client_number = 0
60 self.peak_epoch = self.start_time
61 self.load_deltas = [0, 0]
62 self._load_deltas_period = 10
63 self._load_deltas_ongoing = [time.time(), 0, 0]
64 self._currentBitrate = -1
65 self._lastBytesReceived = -1
66
67
68 self.average_client_number = 0
69 self.average_time = self.start_time
70
71 self.hostname = "localhost"
72 self.port = 0
73 self.mountPoint = "/"
74
76
77 now = time.time()
78
79 dt1 = self.average_time - self.start_time
80 dc1 = self.average_client_number
81 dt2 = now - self.average_time
82 dc2 = self.no_clients
83 self.average_time = now
84 if dt1 == 0:
85
86 self.average_client_number = 0
87 else:
88 dt = dt1 + dt2
89 before = (dc1 * dt1) / dt
90 after = dc2 * dt2 / dt
91 self.average_client_number = before + after
92
94 self._updateAverage()
95
96 self.no_clients += 1
97 self.clients_added_count +=1
98
99
100 if self.no_clients >= self.peak_client_number:
101 self.peak_epoch = time.time()
102 self.peak_client_number = self.no_clients
103
105 self._updateAverage()
106 self.no_clients -= 1
107 self.clients_removed_count +=1
108
110 """
111 Periodically, update our statistics on load deltas, and update the
112 UIState with new values for total bytes, bitrate, etc.
113 """
114
115 oldtime, oldadd, oldremove = self._load_deltas_ongoing
116 add, remove = self.clients_added_count, self.clients_removed_count
117 now = time.time()
118 diff = float(now - oldtime)
119
120 self.load_deltas = [(add-oldadd)/diff, (remove-oldremove)/diff]
121 self._load_deltas_ongoing = [now, add, remove]
122
123 bytesReceived = self.getBytesReceived()
124 if self._lastBytesReceived >= 0:
125 self._currentBitrate = ((bytesReceived - self._lastBytesReceived) *
126 8 / STATS_POLL_INTERVAL)
127 self._lastBytesReceived = bytesReceived
128
129 self.update_ui_state()
130
131 self._updateCallLaterId = reactor.callLater(STATS_POLL_INTERVAL,
132 self._updateStats)
133
135 if self._currentBitrate >= 0:
136 return self._currentBitrate
137 else:
138 return self.getBytesReceived() * 8 / self.getUptime()
139
141 return self.sink.get_property('bytes-served')
142
144 return self.sink.get_property('bytes-to-serve')
145
147 return time.time() - self.start_time
148
150 return self.no_clients
151
153 return self.peak_client_number
154
156 return self.peak_epoch
157
159 return self.average_client_number
160
162 return "http://%s:%d%s" % (self.hostname, self.port, self.mountPoint)
163
165 return self.load_deltas
166
168 c = self
169
170 bytes_sent = c.getBytesSent()
171 bytes_received = c.getBytesReceived()
172 uptime = c.getUptime()
173
174 set('stream-mime', c.get_mime())
175 set('stream-url', c.getUrl())
176 set('stream-uptime', formatTime(uptime))
177 bitspeed = bytes_received * 8 / uptime
178 currentbitrate = self.getCurrentBitrate()
179 set('stream-bitrate', formatStorage(bitspeed) + 'bit/s')
180 set('stream-current-bitrate',
181 formatStorage(currentbitrate) + 'bit/s')
182 set('stream-totalbytes', formatStorage(bytes_received) + 'Byte')
183 set('stream-bitrate-raw', bitspeed)
184 set('stream-totalbytes-raw', bytes_received)
185
186 set('clients-current', str(c.getClients()))
187 set('clients-max', str(c.getMaxClients()))
188 set('clients-peak', str(c.getPeakClients()))
189 set('clients-peak-time', c.getPeakEpoch())
190 set('clients-average', str(int(c.getAverageClients())))
191
192 bitspeed = bytes_sent * 8 / uptime
193 set('consumption-bitrate', formatStorage(bitspeed) + 'bit/s')
194 set('consumption-bitrate-current',
195 formatStorage(currentbitrate * c.getClients()) + 'bit/s')
196 set('consumption-totalbytes', formatStorage(bytes_sent) + 'Byte')
197 set('consumption-bitrate-raw', bitspeed)
198 set('consumption-totalbytes-raw', bytes_sent)
199
200
201 -class HTTPMedium(feedcomponent.FeedComponentMedium):
207
209 """
210 @rtype: L{twisted.internet.defer.Deferred} firing a keycard or None.
211 """
212 d = self.callRemote('authenticate', bouncerName, keycard)
213 return d
214
215 - def keepAlive(self, bouncerName, issuerName, ttl):
216 """
217 @rtype: L{twisted.internet.defer.Deferred}
218 """
219 return self.callRemote('keepAlive', bouncerName, issuerName, ttl)
220
222 """
223 @rtype: L{twisted.internet.defer.Deferred}
224 """
225 return self.callRemote('removeKeycardId', bouncerName, keycardId)
226
227
230
233
236
239
242
245
246
247
249 implements(interfaces.IStreamingComponent)
250
251 checkOffset = True
252
253
254 logCategory = 'cons-http'
255
256 pipe_template = 'multifdsink name=sink ' + \
257 'sync=false ' + \
258 'recover-policy=3'
259
260 componentMediumClass = HTTPMedium
261
263 reactor.debug = True
264 self.debug("HTTP streamer initialising")
265
266 self.caps = None
267 self.resource = None
268 self.httpauth = None
269 self.mountPoint = None
270 self.burst_on_connect = False
271
272 self.description = None
273
274 self.type = None
275
276
277 self._pbclient = None
278 self._porterUsername = None
279 self._porterPassword = None
280 self._porterPath = None
281
282
283
284 self.port = None
285
286 self.iface = None
287
288 self._tport = None
289
290 self._updateCallLaterId = None
291 self._lastUpdate = 0
292 self._updateUI_DC = None
293
294 self._pending_removals = {}
295
296 for i in ('stream-mime', 'stream-uptime', 'stream-current-bitrate',
297 'stream-bitrate', 'stream-totalbytes', 'clients-current',
298 'clients-max', 'clients-peak', 'clients-peak-time',
299 'clients-average', 'consumption-bitrate',
300 'consumption-bitrate-current',
301 'consumption-totalbytes', 'stream-bitrate-raw',
302 'stream-totalbytes-raw', 'consumption-bitrate-raw',
303 'consumption-totalbytes-raw', 'stream-url'):
304 self.uiState.addKey(i, None)
305
308
311
313
314
315 self.fixRenamedProperties(props, [
316 ('issuer', 'issuer-class'),
317 ('mount_point', 'mount-point'),
318 ('porter_socket_path', 'porter-socket-path'),
319 ('porter_username', 'porter-username'),
320 ('porter_password', 'porter-password'),
321 ('user_limit', 'client-limit'),
322 ('bandwidth_limit', 'bandwidth-limit'),
323 ('burst_on_connect', 'burst-on-connect'),
324 ('burst_size', 'burst-size'),
325 ])
326
327 if props.get('type', 'master') == 'slave':
328 for k in 'socket-path', 'username', 'password':
329 if not 'porter-' + k in props:
330 raise errors.ConfigError("slave mode, missing required"
331 " property 'porter-%s'" % k)
332
333 if 'burst-size' in props and 'burst-time' in props:
334 raise errors.ConfigError('both burst-size and burst-time '
335 'set, cannot satisfy')
336
337
338 version = gstreamer.get_plugin_version('tcp')
339 if version < (0, 10, 9, 1):
340 m = messages.Error(T_(N_(
341 "Version %s of the '%s' GStreamer plug-in is too old.\n"),
342 ".".join(map(str, version)), 'multifdsink'))
343 m.add(T_(N_("Please upgrade '%s' to version %s."),
344 'gst-plugins-base', '0.10.10'))
345 addMessage(m)
346
348 try:
349 sink.get_property('units-max')
350 return True
351 except TypeError:
352 return False
353
355 if self.burst_on_connect:
356 if self.burst_time and self.time_bursting_supported(sink):
357 self.debug("Configuring burst mode for %f second burst",
358 self.burst_time)
359
360
361 sink.set_property('sync-method', 4)
362 sink.set_property('burst-unit', 2)
363 sink.set_property('burst-value',
364 long(self.burst_time * gst.SECOND))
365
366
367
368
369 sink.set_property('time-min',
370 long((self.burst_time + 5) * gst.SECOND))
371
372 sink.set_property('unit-type', 2)
373 sink.set_property('units-soft-max',
374 long((self.burst_time + 8) * gst.SECOND))
375 sink.set_property('units-max',
376 long((self.burst_time + 10) * gst.SECOND))
377 elif self.burst_size:
378 self.debug("Configuring burst mode for %d kB burst",
379 self.burst_size)
380
381
382
383
384
385 sink.set_property('sync-method', 'burst-keyframe')
386 sink.set_property('burst-unit', 'bytes')
387 sink.set_property('burst-value', self.burst_size * 1024)
388
389
390
391
392 sink.set_property('bytes-min', (self.burst_size + 512) * 1024)
393
394
395
396
397
398
399
400 sink.set_property('buffers-soft-max',
401 (self.burst_size + 1024) / 4)
402 sink.set_property('buffers-max',
403 (self.burst_size + 2048) / 4)
404
405 else:
406
407 self.debug("simple burst-on-connect, setting sync-method 2")
408 sink.set_property('sync-method', 2)
409
410 sink.set_property('buffers-soft-max', 250)
411 sink.set_property('buffers-max', 500)
412 else:
413 self.debug("no burst-on-connect, setting sync-method 0")
414 sink.set_property('sync-method', 0)
415
416 sink.set_property('buffers-soft-max', 250)
417 sink.set_property('buffers-max', 500)
418
524
526 return '<MultifdSinkStreamer (%s)>' % self.name
527
529 return self.resource.maxclients
530
532 if self.caps:
533 return self.caps.get_structure(0).get_name()
534
536 mime = self.get_mime()
537 if mime == 'multipart/x-mixed-replace':
538 mime += ";boundary=ThisRandomString"
539 return mime
540
542 return "http://%s:%d%s" % (self.hostname, self.port, self.mountPoint)
543
545 socket = 'flumotion.component.plugs.streamdata.StreamDataProvider'
546 if self.plugs[socket]:
547 plug = self.plugs[socket][-1]
548 return plug.getStreamData()
549 else:
550 return {
551 'protocol': 'HTTP',
552 'description': self.description,
553 'url' : self.getUrl()
554 }
555
557 """Return a tuple (deltaadded, deltaremoved, bytes_transferred,
558 current_clients, current_load) of our current bandwidth and user values.
559 The deltas are estimates of how much bitrate is added, removed
560 due to client connections, disconnections, per second.
561 """
562
563
564 deltaadded, deltaremoved = self.getLoadDeltas()
565
566 bytes_received = self.getBytesReceived()
567 uptime = self.getUptime()
568 bitrate = bytes_received * 8 / uptime
569
570 bytes_sent = self.getBytesSent()
571 clients_connected = self.getClients()
572 current_load = bitrate * clients_connected
573
574 return (deltaadded * bitrate, deltaremoved * bitrate, bytes_sent,
575 clients_connected, current_load)
576
580
584
586 """Remove all the clients.
587
588 Returns a deferred fired once all clients have been removed.
589 """
590 if self.resource:
591
592 self.debug("Asking for all clients to be removed")
593 return self.resource.removeAllClients()
594
596 """Update the uiState object.
597 Such updates (through this function) are throttled to a maximum rate,
598 to avoid saturating admin clients with traffic when many clients are
599 connecting/disconnecting.
600 """
601 def setIfChanged(k, v):
602 if self.uiState.get(k) != v:
603 self.uiState.set(k, v)
604
605 def update_ui_state_later():
606 self._updateUI_DC = None
607 self.update_ui_state()
608
609 now = time.time()
610
611
612 if now - self._lastUpdate >= UI_UPDATE_THROTTLE_PERIOD:
613 if self._updateUI_DC:
614 self._updateUI_DC.cancel()
615 self._updateUI_DC = None
616
617 self._lastUpdate = now
618
619
620 self.updateState(setIfChanged)
621 elif not self._updateUI_DC:
622
623
624 self._updateUI_DC = reactor.callLater(UI_UPDATE_THROTTLE_PERIOD,
625 update_ui_state_later)
626
631
633 self.log('[fd %5d] client_removed_handler, reason %s', fd, reason)
634 if reason.value_name == 'GST_CLIENT_STATUS_ERROR':
635 self.warning('[fd %5d] Client removed because of write error' % fd)
636
637 self.resource.clientRemoved(sink, fd, reason, stats)
638 Stats.clientRemoved(self)
639 self.update_ui_state()
640
641
642
644 caps = pad.get_negotiated_caps()
645 if caps == None:
646 return
647
648 caps_str = gstreamer.caps_repr(caps)
649 self.debug('Got caps: %s' % caps_str)
650
651 if not self.caps == None:
652 self.warning('Already had caps: %s, replacing' % caps_str)
653
654 self.debug('Storing caps: %s' % caps_str)
655 self.caps = caps
656
657 reactor.callFromThread(self.update_ui_state)
658
659
660
661
662
663
665 stats = sink.emit('get-stats', fd)
666 self._pending_removals[fd] = (stats, reason)
667
668
670 (stats, reason) = self._pending_removals.pop(fd)
671
672 reactor.callFromThread(self._client_removed_handler, sink, fd,
673 reason, stats)
674
675
676
678 if self._updateCallLaterId:
679 self._updateCallLaterId.cancel()
680 self._updateCallLaterId = None
681
682 if self.httpauth:
683 self.httpauth.stopKeepAlive()
684
685 if self._tport:
686 self._tport.stopListening()
687
688 l = []
689
690
691 clients = self.remove_all_clients()
692 if clients:
693 l.append(clients)
694
695 if self.type == 'slave' and self._pbclient:
696 l.append(self._pbclient.deregisterPath(self.mountPoint))
697 return defer.DeferredList(l)
698
700 """Provide a new set of porter login information, for when we're
701 in slave mode and the porter changes.
702 If we're currently connected, this won't disconnect - it'll just change
703 the information so that next time we try and connect we'll use the
704 new ones
705 """
706 if self.type == 'slave':
707 self._porterUsername = username
708 self._porterPassword = password
709
710 creds = credentials.UsernamePassword(self._porterUsername,
711 self._porterPassword)
712
713 self._pbclient.startLogin(creds, self._pbclient.medium)
714
715
716 if path != self._porterPath:
717 self.debug("Changing porter login to use \"%s\"", path)
718 self._porterPath = path
719 self._pbclient.stopTrying()
720
721 self._pbclient.resetDelay()
722 reactor.connectWith(
723 fdserver.FDConnector, self._porterPath,
724 self._pbclient, 10, checkPID=False)
725 else:
726 raise errors.WrongStateError(
727 "Can't specify porter details in master mode")
728
739
741 root = resources.HTTPRoot()
742
743 mount = self.mountPoint[1:]
744 root.putChild(mount, self.resource)
745 if self.type == 'slave':
746
747
748
749
750
751
752
753
754
755
756
757
758
759 self._porterDeferred = d = defer.Deferred()
760 mountpoints = [self.mountPoint]
761 self._pbclient = porterclient.HTTPPorterClientFactory(
762 server.Site(resource=root), mountpoints, d)
763
764 creds = credentials.UsernamePassword(self._porterUsername,
765 self._porterPassword)
766 self._pbclient.startLogin(creds, self._pbclient.medium)
767
768 self.debug("Starting porter login at \"%s\"", self._porterPath)
769
770 reactor.connectWith(
771 fdserver.FDConnector, self._porterPath,
772 self._pbclient, 10, checkPID=False)
773 else:
774
775 try:
776 self.debug('Listening on %d' % self.port)
777 iface = self.iface or ""
778 self._tport = reactor.listenTCP(self.port, server.Site(resource=root),
779 interface=iface)
780 except error.CannotListenError:
781 t = 'Port %d is not available.' % self.port
782 self.warning(t)
783 m = messages.Error(T_(N_(
784 "Network error: TCP port %d is not available."), self.port))
785 self.addMessage(m)
786 self.setMood(moods.sad)
787 return defer.fail(errors.ComponentStartHandledError(t))
788