1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 import gst
23 import gobject
24
25 import os
26 import time
27
28 from twisted.internet import reactor, defer
29
30 from flumotion.common import common, errors, pygobject, messages, log
31 from flumotion.common import gstreamer
32 from flumotion.common.i18n import N_, gettexter
33 from flumotion.common.planet import moods
34 from flumotion.component import component as basecomponent
35 from flumotion.component import feed, padmonitor
36 from flumotion.component.feeder import Feeder
37 from flumotion.component.eater import Eater
38
39 __version__ = "$Rev: 6695 $"
40 T_ = gettexter()
41
42
44 """
45 I am a base class for all Flumotion feed components.
46 """
47
48
49 FEEDER_STATS_UPDATE_FREQUENCY = 12.5
50
51 logCategory = 'feedcomponent'
52
53
55
56 self.feeders = {}
57 self.eaters = {}
58 self.uiState.addListKey('feeders')
59 self.uiState.addListKey('eaters')
60
61 self.pipeline = None
62 self.pipeline_signals = []
63 self.bus_signal_id = None
64 self.effects = {}
65 self._feeder_probe_cl = None
66
67 self._pad_monitors = padmonitor.PadMonitorSet(
68 lambda: self.setMood(moods.happy),
69 lambda: self.setMood(moods.hungry))
70
71 self._clock_slaved = False
72 self.clock_provider = None
73 self._master_clock_info = None
74
75
76 self._change_monitor = gstreamer.StateChangeMonitor()
77
78
79 self._get_stats_supported = (gstreamer.get_plugin_version('tcp')
80 >= (0, 10, 11, 0))
81
83 """
84 Sets up component.
85
86 Invokes the L{create_pipeline} and L{set_pipeline} vmethods,
87 which subclasses can provide.
88 """
89 config = self.config
90 eater_config = config.get('eater', {})
91 feeder_config = config.get('feed', [])
92 source_config = config.get('source', [])
93
94 self.debug("FeedComponent.do_setup(): eater_config %r", eater_config)
95 self.debug("FeedComponent.do_setup(): feeder_config %r", feeder_config)
96 self.debug("FeedComponent.do_setup(): source_config %r", source_config)
97
98
99
100 if eater_config == {} and source_config != []:
101 eater_config = {'default': [(x, 'default') for x in source_config]}
102
103 for eaterName in eater_config:
104 for feedId, eaterAlias in eater_config[eaterName]:
105 self.eaters[eaterAlias] = Eater(eaterAlias, eaterName)
106 self.uiState.append('eaters', self.eaters[eaterAlias].uiState)
107
108 for feederName in feeder_config:
109 self.feeders[feederName] = Feeder(feederName)
110 self.uiState.append('feeders',
111 self.feeders[feederName].uiState)
112
113 clockMaster = config.get('clock-master', None)
114 if clockMaster:
115 self._clock_slaved = clockMaster != config['avatarId']
116 else:
117 self._clock_slaved = False
118
119 pipeline = self.create_pipeline()
120 self.connect_feeders(pipeline)
121 self.set_pipeline(pipeline)
122
123 self.debug("FeedComponent.do_setup(): setup finished")
124
125 self.try_start_pipeline()
126
127
128 d = self._change_monitor.add(gst.STATE_CHANGE_PAUSED_TO_PLAYING)
129 d.addCallback(lambda x: self.do_pipeline_playing())
130
132
133
134 self.debug("Setup completed")
135
136
138 """
139 Subclasses have to implement this method.
140
141 @rtype: L{gst.Pipeline}
142 """
143 raise NotImplementedError, "subclass must implement create_pipeline"
144
154
156 elementName = self.feeders[feederName].payName
157 element = self.pipeline.get_by_name(elementName)
158 if not element:
159 raise errors.ComponentError("No such feeder %s" % feederName)
160
161 pad = element.get_pad('src')
162 self._pad_monitors.attach(pad, elementName)
163
164
168
179
180 for feeder in self.feeders.values():
181 element = pipeline.get_by_name(feeder.elementName)
182 element.connect('client-fd-removed', client_fd_removed,
183 feeder)
184 self.debug("Connected to client-fd-removed on %r", feeder)
185
188
190 """
191 Invoked when the pipeline has changed the state to playing.
192 The default implementation sets the component's mood to HAPPY.
193 """
194 self.setMood(moods.happy)
195
197 """Make a flumotion error message to show to the user.
198
199 This method may be overridden by components that have special
200 knowledge about potential errors. If the component does not know
201 about the error, it can chain up to this implementation, which
202 will make a generic message.
203
204 @param gerror: The GError from the error message posted on the
205 GStreamer message bus.
206 @type gerror: L{gst.GError}
207 @param debug: A string with debugging information.
208 @type debug: str
209
210 @returns: A L{flumotion.common.messages.Message} to show to the
211 user.
212 """
213
214 mid = "%s-%s-%d" % (self.name, gerror.domain, gerror.code)
215 m = messages.Error(T_(N_(
216 "Internal GStreamer error.")),
217 debug="%s\n%s: %d\n%s" % (
218 gerror.message, gerror.domain, gerror.code, debug),
219 mid=mid, priority=40)
220 return m
221
227
228 def error():
229 gerror, debug = message.parse_error()
230 self.warning('element %s error %s %s',
231 src.get_path_string(), gerror, debug)
232 self.setMood(moods.sad)
233
234
235 try:
236 m = self.make_message_for_gstreamer_error(gerror, debug)
237 except Exception, e:
238 msg = log.getExceptionMessage(e)
239 m = messages.Error(T_(N_(
240 "Programming error in component.")),
241 debug="Bug in %r.make_message_for_gstreamer_error: %s" % (
242 self.__class__, msg))
243
244 self.state.append('messages', m)
245 self._change_monitor.have_error(self.pipeline.get_state(),
246 message)
247
248 def eos():
249 name = src.get_name()
250 if name in self._pad_monitors:
251 self.info('End of stream in element %s', name)
252 self._pad_monitors[name].setInactive()
253 else:
254 self.info("We got an eos from %s", name)
255
256 def default():
257 self.log('message received: %r', message)
258
259 handlers = {gst.MESSAGE_STATE_CHANGED: state_changed,
260 gst.MESSAGE_ERROR: error,
261 gst.MESSAGE_EOS: eos}
262 t = message.type
263 src = message.src
264 handlers.get(t, default)()
265 return True
266
268 """Watch a set of elements for discontinuity messages.
269
270 @param eaterWatchElements: the set of elements to watch for
271 discontinuities.
272 @type eaterWatchElements: Dict of elementName => Eater.
273 """
274 def on_element_message(bus, message):
275 src = message.src
276 name = src.get_name()
277 if name in eaterWatchElements:
278 eater = eaterWatchElements[name]
279 s = message.structure
280 def timestampDiscont():
281 prevTs = s["prev-timestamp"]
282 prevDuration = s["prev-duration"]
283 curTs = s["cur-timestamp"]
284 discont = curTs - (prevTs + prevDuration)
285 dSeconds = discont / float(gst.SECOND)
286 self.debug("we have a discont on eater %s of %f s "
287 "between %s and %s ", eater.eaterAlias,
288 dSeconds, gst.TIME_ARGS(prevTs),
289 gst.TIME_ARGS(curTs))
290 eater.timestampDiscont(dSeconds,
291 float(curTs) / float(gst.SECOND))
292
293 def offsetDiscont():
294 prevOffsetEnd = s["prev-offset-end"]
295 curOffset = s["cur-offset"]
296 discont = curOffset - prevOffsetEnd
297 self.debug("we have a discont on eater %s of %d "
298 "units between %d and %d ",
299 eater.eaterAlias, discont, prevOffsetEnd,
300 curOffset)
301 eater.offsetDiscont(discont, curOffset)
302
303 handlers = {'imperfect-timestamp': timestampDiscont,
304 'imperfect-offset': offsetDiscont}
305 if s.get_name() in handlers:
306 handlers[s.get_name()]()
307
308
309 bus = self.pipeline.get_bus()
310
311 bus.connect("message::element", on_element_message)
312
314 def fdsrc_event(pad, event):
315
316
317 if event.type == gst.EVENT_EOS:
318 self.info('End of stream for eater %s, disconnect will be '
319 'triggered', eater.eaterAlias)
320
321
322
323 return False
324 return True
325
326 def depay_event(pad, event):
327
328
329
330 if event.type == gst.EVENT_NEWSEGMENT:
331
332
333
334 if getattr(eater, '_gotFirstNewSegment', False):
335 self.info("Subsequent new segment event received on "
336 "depay on eater %s", eater.eaterAlias)
337
338 return False
339 else:
340 eater._gotFirstNewSegment = True
341 return True
342
343 self.debug('adding event probe for eater %s', eater.eaterAlias)
344 fdsrc = self.get_element(eater.elementName)
345 fdsrc.get_pad("src").add_event_probe(fdsrc_event)
346 if gstreamer.get_plugin_version('gdp') < (0, 10, 10, 1):
347 depay = self.get_element(eater.depayName)
348 depay.get_pad("src").add_event_probe(depay_event)
349
351 self.debug('setup_pipeline()')
352 assert self.bus_signal_id == None
353
354 self.pipeline.set_name('pipeline-' + self.getName())
355 bus = self.pipeline.get_bus()
356 bus.add_signal_watch()
357 self.bus_signal_id = bus.connect('message',
358 self.bus_message_received_cb)
359 sig_id = self.pipeline.connect('deep-notify',
360 gstreamer.verbose_deep_notify_cb, self)
361 self.pipeline_signals.append(sig_id)
362
363
364
365 self.pipeline.set_state(gst.STATE_READY)
366
367
368 if self._get_stats_supported:
369 self._feeder_probe_cl = reactor.callLater(
370 self.FEEDER_STATS_UPDATE_FREQUENCY, self._feeder_probe_calllater)
371 else:
372 self.warning("Feeder statistics unavailable, your "
373 "gst-plugins-base is too old")
374 m = messages.Warning(T_(N_(
375 "Your gst-plugins-base is too old, so "
376 "feeder statistics will be unavailable.")),
377 mid='multifdsink')
378 m.add(T_(N_(
379 "Please upgrade '%s' to version %s."), 'gst-plugins-base',
380 '0.10.11'))
381 self.addMessage(m)
382
383 for eater in self.eaters.values():
384 self.install_eater_event_probes(eater)
385 pad = self.get_element(eater.elementName).get_pad('src')
386 self._pad_monitors.attach(pad, eater.elementName,
387 padmonitor.EaterPadMonitor,
388 self.reconnectEater,
389 eater.eaterAlias)
390 eater.setPadMonitor(self._pad_monitors[eater.elementName])
391
393 if not self.pipeline:
394 return
395
396 if self.clock_provider:
397 self.clock_provider.set_property('active', False)
398 self.clock_provider = None
399 retval = self.pipeline.set_state(gst.STATE_NULL)
400 if retval != gst.STATE_CHANGE_SUCCESS:
401 self.warning('Setting pipeline to NULL failed')
402
404 self.debug("cleaning up")
405
406 assert self.pipeline != None
407
408 self.stop_pipeline()
409
410 map(self.pipeline.disconnect, self.pipeline_signals)
411 self.pipeline_signals = []
412 if self.bus_signal_id:
413 self.pipeline.get_bus().disconnect(self.bus_signal_id)
414 self.pipeline.get_bus().remove_signal_watch()
415 self.bus_signal_id = None
416 self.pipeline = None
417
418 if self._feeder_probe_cl:
419 self._feeder_probe_cl.cancel()
420 self._feeder_probe_cl = None
421
422
423 for eater in self.eaters.values():
424 self._pad_monitors.remove(eater.elementName)
425 eater.setPadMonitor(None)
426
433
435 self.debug("Master clock set to %s:%d with base_time %s", ip, port,
436 gst.TIME_ARGS(base_time))
437
438 assert self._clock_slaved
439 if self._master_clock_info == (ip, port, base_time):
440 self.debug("Same master clock info, returning directly")
441 return defer.succeed(None)
442 elif self._master_clock_info:
443 self.stop_pipeline()
444
445 self._master_clock_info = ip, port, base_time
446
447 clock = gst.NetClientClock(None, ip, port, base_time)
448
449
450 self.pipeline.set_new_stream_time(gst.CLOCK_TIME_NONE)
451 self.pipeline.set_base_time(base_time)
452 self.pipeline.use_clock(clock)
453
454 self.try_start_pipeline()
455
457 """
458 Return the connection details for the network clock provided by
459 this component, if any.
460 """
461 if self.clock_provider:
462 ip, port, base_time = self._master_clock_info
463 return ip, port, base_time
464 else:
465 return None
466
468 """
469 Tell the component to provide a master clock on the given port.
470
471 @returns: a deferred firing a (ip, port, base_time) triple.
472 """
473 def pipelinePaused(r):
474 clock = self.pipeline.get_clock()
475
476 self.pipeline.use_clock(clock)
477
478 self.clock_provider = gst.NetTimeProvider(clock, None, port)
479 realport = self.clock_provider.get_property('port')
480
481 base_time = self.pipeline.get_base_time()
482
483 self.debug('provided master clock from %r, base time %s',
484 clock, gst.TIME_ARGS(base_time))
485
486 if self.medium:
487
488
489
490 ip = self.medium.getIP()
491 else:
492 ip = "127.0.0.1"
493
494 self._master_clock_info = (ip, realport, base_time)
495 return self.get_master_clock()
496
497 assert self.pipeline
498 assert not self._clock_slaved
499 (ret, state, pending) = self.pipeline.get_state(0)
500 if state != gst.STATE_PAUSED and state != gst.STATE_PLAYING:
501 self.debug("pipeline still spinning up: %r", state)
502 d = self._change_monitor.add(gst.STATE_CHANGE_READY_TO_PAUSED)
503 d.addCallback(pipelinePaused)
504 return d
505 elif self.clock_provider:
506 self.debug("returning existing master clock info")
507 return defer.succeed(self.get_master_clock())
508 else:
509 return defer.maybeDeferred(pipelinePaused, None)
510
511
513 """
514 Tell the component to start.
515 Whatever is using the component is responsible for making sure all
516 eaters have received their file descriptor to eat from.
517 """
518 (ret, state, pending) = self.pipeline.get_state(0)
519 if state == gst.STATE_PLAYING:
520 self.log('already PLAYING')
521 return
522
523 if self._clock_slaved and not self._master_clock_info:
524 self.debug("Missing master clock info, deferring set to PLAYING")
525 return
526
527 for eater in self.eaters.values():
528 if not eater.fd:
529 self.debug('eater %s not yet connected, deferring set to '
530 'PLAYING', eater.eaterAlias)
531 return
532
533 self.debug("Setting pipeline %r to GST_STATE_PLAYING", self.pipeline)
534 self.pipeline.set_state(gst.STATE_PLAYING)
535
557
559 """
560 After this function returns, the stream lock for this eater must have
561 been released. If your component needs to do something here, override
562 this method.
563 """
564 pass
565
567 """Get an element out of the pipeline.
568
569 If it is possible that the component has not yet been set up,
570 the caller needs to check if self.pipeline is actually set.
571 """
572 assert self.pipeline
573 self.log('Looking up element %r in pipeline %r',
574 element_name, self.pipeline)
575 element = self.pipeline.get_by_name(element_name)
576 if not element:
577 self.warning("No element named %r in pipeline", element_name)
578 return element
579
581 'Gets a property of an element in the GStreamer pipeline.'
582 self.debug("%s: getting property %s of element %s" % (self.getName(), property, element_name))
583 element = self.get_element(element_name)
584 if not element:
585 msg = "Element '%s' does not exist" % element_name
586 self.warning(msg)
587 raise errors.PropertyError(msg)
588
589 self.debug('getting property %s on element %s' % (property, element_name))
590 try:
591 value = element.get_property(property)
592 except (ValueError, TypeError):
593 msg = "Property '%s' on element '%s' does not exist" % (property, element_name)
594 self.warning(msg)
595 raise errors.PropertyError(msg)
596
597
598 if isinstance(value, gobject.GEnum):
599 value = int(value)
600
601 return value
602
604 'Sets a property on an element in the GStreamer pipeline.'
605 self.debug("%s: setting property %s of element %s to %s" % (
606 self.getName(), property, element_name, value))
607 element = self.get_element(element_name)
608 if not element:
609 msg = "Element '%s' does not exist" % element_name
610 self.warning(msg)
611 raise errors.PropertyError(msg)
612
613 self.debug('setting property %s on element %r to %s' %
614 (property, element_name, value))
615 pygobject.gobject_set_property(element, property, value)
616
617
619 if not self.medium:
620 self.debug("Can't reconnect eater %s, running "
621 "without a medium", eaterAlias)
622 return
623
624 self.eaters[eaterAlias].disconnected()
625 self.medium.connectEater(eaterAlias)
626
627 - def feedToFD(self, feedName, fd, cleanup, eaterId=None):
628 """
629 @param feedName: name of the feed to feed to the given fd.
630 @type feedName: str
631 @param fd: the file descriptor to feed to
632 @type fd: int
633 @param cleanup: the function to call when the FD is no longer feeding
634 @type cleanup: callable
635 """
636 self.debug('FeedToFD(%s, %d)', feedName, fd)
637
638
639
640 if not self.pipeline or self.pipeline.get_state(0)[1] == gst.STATE_NULL:
641 self.warning('told to feed %s to fd %d, but pipeline not '
642 'running yet', feedName, fd)
643 cleanup(fd)
644
645
646 return
647
648 if feedName not in self.feeders:
649 msg = "Cannot find feeder named '%s'" % feedName
650 mid = "feedToFD-%s" % feedName
651 m = messages.Warning(T_(N_("Internal Flumotion error.")),
652 debug=msg, mid=mid, priority=40)
653 self.state.append('messages', m)
654 self.warning(msg)
655 cleanup(fd)
656 return False
657
658 feeder = self.feeders[feedName]
659 element = self.get_element(feeder.elementName)
660 assert element
661 clientId = eaterId or ('client-%d' % fd)
662 element.emit('add', fd)
663 feeder.clientConnected(clientId, fd, cleanup)
664
665 - def eatFromFD(self, eaterAlias, feedId, fd):
666 """
667 Tell the component to eat the given feedId from the given fd.
668 The component takes over the ownership of the fd, closing it when
669 no longer eating.
670
671 @param eaterAlias: the alias of the eater
672 @type eaterAlias: str
673 @param feedId: feed id (componentName:feedName) to eat from through
674 the given fd
675 @type feedId: str
676 @param fd: the file descriptor to eat from
677 @type fd: int
678 """
679 self.debug('EatFromFD(%s, %s, %d)', eaterAlias, feedId, fd)
680
681 if not self.pipeline:
682 self.warning('told to eat %s from fd %d, but pipeline not '
683 'running yet', feedId, fd)
684
685
686 os.close(fd)
687 return
688
689 if eaterAlias not in self.eaters:
690 self.warning('Unknown eater alias: %s', eaterAlias)
691 os.close(fd)
692 return
693
694 eater = self.eaters[eaterAlias]
695 element = self.get_element(eater.elementName)
696 if not element:
697 self.warning('Eater element %s not found', eater.elementName)
698 os.close(fd)
699 return
700
701
702 (result, current, pending) = element.get_state(0L)
703 pipeline_playing = current not in [gst.STATE_NULL, gst.STATE_READY]
704 if pipeline_playing:
705 self.debug('eater %s in state %r, kidnapping it',
706 eaterAlias, current)
707
708
709
710
711
712
713
714 srcpad = element.get_pad('src')
715
716 def _block_cb(pad, blocked):
717 pass
718 srcpad.set_blocked_async(True, _block_cb)
719 self.unblock_eater(eaterAlias)
720
721
722 sinkpad = srcpad.get_peer()
723 srcpad.unlink(sinkpad)
724 parent = element.get_parent()
725 parent.remove(element)
726 self.log("setting to ready")
727 element.set_state(gst.STATE_READY)
728 self.log("setting to ready complete!!!")
729 old = element.get_property('fd')
730 self.log("Closing old fd %d", old)
731 os.close(old)
732 element.set_property('fd', fd)
733 parent.add(element)
734 srcpad.link(sinkpad)
735 element.set_state(gst.STATE_PLAYING)
736
737 srcpad.set_blocked_async(False, _block_cb)
738 else:
739 element.set_property('fd', fd)
740
741
742
743 eater.connected(fd, feedId)
744
745 if not pipeline_playing:
746 self.try_start_pipeline()
747