
Source Code for Module flumotion.component.feedcomponent010

# -*- Mode: Python; test-case-name: flumotion.test.test_feedcomponent010 -*-
# vi:si:et:sw=4:sts=4:ts=4
#
# Flumotion - a streaming media server
# Copyright (C) 2004,2005,2006,2007 Fluendo, S.L. (www.fluendo.com).
# All rights reserved.

# This file may be distributed and/or modified under the terms of
# the GNU General Public License version 2 as published by
# the Free Software Foundation.
# This file is distributed without any warranty; without even the implied
# warranty of merchantability or fitness for a particular purpose.
# See "LICENSE.GPL" in the source distribution for more information.

# Licensees having purchased or holding a valid Flumotion Advanced
# Streaming Server license may use this file in accordance with the
# Flumotion Advanced Streaming Server Commercial License Agreement.
# See "LICENSE.Flumotion" in the source distribution for more information.

# Headers in this file shall remain intact.

import gst
import gobject

import os
import time

from twisted.internet import reactor, defer

from flumotion.common import common, errors, pygobject, messages, log
from flumotion.common import gstreamer
from flumotion.common.i18n import N_, gettexter
from flumotion.common.planet import moods
from flumotion.component import component as basecomponent
from flumotion.component import feed, padmonitor
from flumotion.component.feeder import Feeder
from flumotion.component.eater import Eater

__version__ = "$Rev: 6695 $"
T_ = gettexter()

class FeedComponent(basecomponent.BaseComponent):
    """
    I am a base class for all Flumotion feed components.
    """

    # how often to update the UIState feeder statistics
    FEEDER_STATS_UPDATE_FREQUENCY = 12.5

    logCategory = 'feedcomponent'

    ### BaseComponent interface implementations

    def init(self):
        # add keys for eaters and feeders uiState
        self.feeders = {}  # feeder feedName -> Feeder
        self.eaters = {}  # eater eaterAlias -> Eater
        self.uiState.addListKey('feeders')
        self.uiState.addListKey('eaters')

        self.pipeline = None
        self.pipeline_signals = []
        self.bus_signal_id = None
        self.effects = {}
        self._feeder_probe_cl = None

        self._pad_monitors = padmonitor.PadMonitorSet(
            lambda: self.setMood(moods.happy),
            lambda: self.setMood(moods.hungry))

        self._clock_slaved = False
        self.clock_provider = None
        self._master_clock_info = None  # (ip, port, basetime) if we're the
                                        # clock master

        self._change_monitor = gstreamer.StateChangeMonitor()

        # multifdsink's get-stats signal had critical bugs before this version
        self._get_stats_supported = (gstreamer.get_plugin_version('tcp')
                                     >= (0, 10, 11, 0))

    def do_setup(self):
        """
        Sets up the component.

        Invokes the L{create_pipeline} and L{set_pipeline} vmethods,
        which subclasses can provide.
        """
        config = self.config
        eater_config = config.get('eater', {})
        feeder_config = config.get('feed', [])
        source_config = config.get('source', [])

        self.debug("FeedComponent.do_setup(): eater_config %r", eater_config)
        self.debug("FeedComponent.do_setup(): feeder_config %r", feeder_config)
        self.debug("FeedComponent.do_setup(): source_config %r", source_config)
        # for upgrade of code without restarting managers
        # this will only be for components whose eater name in the registry
        # is 'default', so no need to import the registry and find the
        # eater name
        if eater_config == {} and source_config != []:
            eater_config = {'default': [(x, 'default') for x in source_config]}

        for eaterName in eater_config:
            for feedId, eaterAlias in eater_config[eaterName]:
                self.eaters[eaterAlias] = Eater(eaterAlias, eaterName)
                self.uiState.append('eaters', self.eaters[eaterAlias].uiState)

        for feederName in feeder_config:
            self.feeders[feederName] = Feeder(feederName)
            self.uiState.append('feeders',
                                self.feeders[feederName].uiState)

        clockMaster = config.get('clock-master', None)
        if clockMaster:
            self._clock_slaved = clockMaster != config['avatarId']
        else:
            self._clock_slaved = False

        pipeline = self.create_pipeline()
        self.connect_feeders(pipeline)
        self.set_pipeline(pipeline)

        self.debug("FeedComponent.do_setup(): setup finished")

        self.try_start_pipeline()

        # no race, messages marshalled asynchronously via the bus
        d = self._change_monitor.add(gst.STATE_CHANGE_PAUSED_TO_PLAYING)
        d.addCallback(lambda x: self.do_pipeline_playing())

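    # Illustrative sketch (not part of the original module): the part of
    # self.config consumed by do_setup() above has roughly this shape;
    # the component and feed names below are made up:
    #
    #     {
    #         'avatarId': '/default/encoder',
    #         'eater': {'default': [('/default/producer:video', 'default')]},
    #         'feed': ['default'],
    #         'clock-master': '/default/producer',
    #     }
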
    def setup_completed(self):
        # Just log; we override the superclass to not turn happy here.
        # Instead, we turn happy once the pipeline gets to PLAYING.
        self.debug("Setup completed")

    ### FeedComponent interface for subclasses

    def create_pipeline(self):
        """
        Subclasses have to implement this method.

        @rtype: L{gst.Pipeline}
        """
        raise NotImplementedError, "subclass must implement create_pipeline"

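    # Illustrative sketch (not part of the original module): a minimal
    # subclass could build its pipeline with gst.parse_launch(), which
    # returns a gst.Pipeline. A real component's pipeline also needs its
    # fdsrc/multifdsink elements named as its Eater/Feeder objects expect;
    # the description below is only an assumed example:
    #
    #     class ExampleComponent(FeedComponent):
    #
    #         def create_pipeline(self):
    #             return gst.parse_launch(
    #                 'videotestsrc is-live=true ! '
    #                 'video/x-raw-yuv,width=320,height=240 ! fakesink')
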
    def set_pipeline(self, pipeline):
        """
        Subclasses can override me.
        They should chain up first.
        """
        if self.pipeline:
            self.cleanup()
        self.pipeline = pipeline
        self._setup_pipeline()

    def attachPadMonitorToFeeder(self, feederName):
        elementName = self.feeders[feederName].payName
        element = self.pipeline.get_by_name(elementName)
        if not element:
            raise errors.ComponentError("No such feeder %s" % feederName)

        pad = element.get_pad('src')
        self._pad_monitors.attach(pad, elementName)

    ### FeedComponent methods

    def addEffect(self, effect):
        self.effects[effect.name] = effect
        effect.setComponent(self)

    def connect_feeders(self, pipeline):
        # Connect to the client-fd-removed signals on each feeder, so we
        # can clean up properly on removal.
        def client_fd_removed(sink, fd, feeder):
            # Called (as a signal callback) when the FD is no longer in
            # use by multifdsink.
            # This will call the registered callable on the fd.
            # Called from GStreamer threads.
            self.debug("cleaning up fd %d", fd)
            feeder.clientDisconnected(fd)

        for feeder in self.feeders.values():
            element = pipeline.get_by_name(feeder.elementName)
            element.connect('client-fd-removed', client_fd_removed,
                            feeder)
            self.debug("Connected to client-fd-removed on %r", feeder)

    def get_pipeline(self):
        return self.pipeline

    def do_pipeline_playing(self):
        """
        Invoked when the pipeline has changed its state to playing.
        The default implementation sets the component's mood to HAPPY.
        """
        self.setMood(moods.happy)

    def make_message_for_gstreamer_error(self, gerror, debug):
        """Make a flumotion error message to show to the user.

        This method may be overridden by components that have special
        knowledge about potential errors. If the component does not know
        about the error, it can chain up to this implementation, which
        will make a generic message.

        @param gerror: The GError from the error message posted on the
                       GStreamer message bus.
        @type gerror:  L{gst.GError}
        @param debug:  A string with debugging information.
        @type debug:   str

        @returns: A L{flumotion.common.messages.Message} to show to the
                  user.
        """
        # generate a unique id
        mid = "%s-%s-%d" % (self.name, gerror.domain, gerror.code)
        m = messages.Error(T_(N_(
            "Internal GStreamer error.")),
            debug="%s\n%s: %d\n%s" % (
                gerror.message, gerror.domain, gerror.code, debug),
            mid=mid, priority=40)
        return m

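    # Illustrative sketch (not part of the original module): a component
    # that recognizes a particular GStreamer error domain could override
    # the method above and chain up for everything else:
    #
    #     def make_message_for_gstreamer_error(self, gerror, debug):
    #         if gerror.domain == 'gst-resource-error-quark':
    #             return messages.Error(T_(N_(
    #                 "Could not access the resource.")),
    #                 debug=debug, mid='resource-error', priority=40)
    #         return FeedComponent.make_message_for_gstreamer_error(
    #             self, gerror, debug)
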
    def bus_message_received_cb(self, bus, message):
        def state_changed():
            if src == self.pipeline:
                old, new, pending = message.parse_state_changed()
                self._change_monitor.state_changed(old, new)

        def error():
            gerror, debug = message.parse_error()
            self.warning('element %s error %s %s',
                         src.get_path_string(), gerror, debug)
            self.setMood(moods.sad)

            # this method can fail if the component has a mistake
            try:
                m = self.make_message_for_gstreamer_error(gerror, debug)
            except Exception, e:
                msg = log.getExceptionMessage(e)
                m = messages.Error(T_(N_(
                    "Programming error in component.")),
                    debug="Bug in %r.make_message_for_gstreamer_error: %s" % (
                        self.__class__, msg))

            self.state.append('messages', m)
            self._change_monitor.have_error(self.pipeline.get_state(),
                                            message)

        def eos():
            name = src.get_name()
            if name in self._pad_monitors:
                self.info('End of stream in element %s', name)
                self._pad_monitors[name].setInactive()
            else:
                self.info("We got an eos from %s", name)

        def default():
            self.log('message received: %r', message)

        handlers = {gst.MESSAGE_STATE_CHANGED: state_changed,
                    gst.MESSAGE_ERROR: error,
                    gst.MESSAGE_EOS: eos}
        t = message.type
        src = message.src
        handlers.get(t, default)()
        return True

    def install_eater_continuity_watch(self, eaterWatchElements):
        """Watch a set of elements for discontinuity messages.

        @param eaterWatchElements: the set of elements to watch for
                                   discontinuities.
        @type eaterWatchElements:  dict of elementName -> Eater
        """
        def on_element_message(bus, message):
            src = message.src
            name = src.get_name()
            if name in eaterWatchElements:
                eater = eaterWatchElements[name]
                s = message.structure

                def timestampDiscont():
                    prevTs = s["prev-timestamp"]
                    prevDuration = s["prev-duration"]
                    curTs = s["cur-timestamp"]
                    discont = curTs - (prevTs + prevDuration)
                    dSeconds = discont / float(gst.SECOND)
                    self.debug("we have a discont on eater %s of %f s "
                               "between %s and %s ", eater.eaterAlias,
                               dSeconds, gst.TIME_ARGS(prevTs),
                               gst.TIME_ARGS(curTs))
                    eater.timestampDiscont(dSeconds,
                                           float(curTs) / float(gst.SECOND))

                def offsetDiscont():
                    prevOffsetEnd = s["prev-offset-end"]
                    curOffset = s["cur-offset"]
                    discont = curOffset - prevOffsetEnd
                    self.debug("we have a discont on eater %s of %d "
                               "units between %d and %d ",
                               eater.eaterAlias, discont, prevOffsetEnd,
                               curOffset)
                    eater.offsetDiscont(discont, curOffset)

                handlers = {'imperfect-timestamp': timestampDiscont,
                            'imperfect-offset': offsetDiscont}
                if s.get_name() in handlers:
                    handlers[s.get_name()]()

        # we know that there is a signal watch already installed
        bus = self.pipeline.get_bus()
        # never gets cleaned up; does that matter?
        bus.connect("message::element", on_element_message)

    def install_eater_event_probes(self, eater):
        def fdsrc_event(pad, event):
            # An event probe used to consume unwanted EOS events on eaters.
            # Called from GStreamer threads.
            if event.type == gst.EVENT_EOS:
                self.info('End of stream for eater %s, disconnect will be '
                          'triggered', eater.eaterAlias)
                # We swallow it because otherwise our component acts on the
                # EOS and we can't recover from that later. Instead, fdsrc
                # will be taken out and given a new fd on the next
                # eatFromFD call.
                return False
            return True

        def depay_event(pad, event):
            # An event probe used to consume unwanted duplicate
            # newsegment events.
            # Called from GStreamer threads.
            if event.type == gst.EVENT_NEWSEGMENT:
                # We do this because we know gdppay/gdpdepay screw up on 2nd
                # newsegments (unclear what the original reason for this
                # was, perhaps #349204)
                if getattr(eater, '_gotFirstNewSegment', False):
                    self.info("Subsequent new segment event received on "
                              "depay on eater %s", eater.eaterAlias)
                    # swallow (gulp)
                    return False
                else:
                    eater._gotFirstNewSegment = True
            return True

        self.debug('adding event probe for eater %s', eater.eaterAlias)
        fdsrc = self.get_element(eater.elementName)
        fdsrc.get_pad("src").add_event_probe(fdsrc_event)
        if gstreamer.get_plugin_version('gdp') < (0, 10, 10, 1):
            depay = self.get_element(eater.depayName)
            depay.get_pad("src").add_event_probe(depay_event)

    def _setup_pipeline(self):
        self.debug('setup_pipeline()')
        assert self.bus_signal_id == None

        self.pipeline.set_name('pipeline-' + self.getName())
        bus = self.pipeline.get_bus()
        bus.add_signal_watch()
        self.bus_signal_id = bus.connect('message',
                                         self.bus_message_received_cb)
        sig_id = self.pipeline.connect('deep-notify',
                                       gstreamer.verbose_deep_notify_cb, self)
        self.pipeline_signals.append(sig_id)

        # set to ready so that multifdsinks can always receive fds, even
        # if the pipeline has a delayed start due to clock slaving
        self.pipeline.set_state(gst.STATE_READY)

        # start checking feeders, if we have a sufficiently recent multifdsink
        if self._get_stats_supported:
            self._feeder_probe_cl = reactor.callLater(
                self.FEEDER_STATS_UPDATE_FREQUENCY,
                self._feeder_probe_calllater)
        else:
            self.warning("Feeder statistics unavailable, your "
                         "gst-plugins-base is too old")
            m = messages.Warning(T_(N_(
                "Your gst-plugins-base is too old, so "
                "feeder statistics will be unavailable.")),
                mid='multifdsink')
            m.add(T_(N_(
                "Please upgrade '%s' to version %s."), 'gst-plugins-base',
                '0.10.11'))
            self.addMessage(m)

        for eater in self.eaters.values():
            self.install_eater_event_probes(eater)
            pad = self.get_element(eater.elementName).get_pad('src')
            self._pad_monitors.attach(pad, eater.elementName,
                                      padmonitor.EaterPadMonitor,
                                      self.reconnectEater,
                                      eater.eaterAlias)
            eater.setPadMonitor(self._pad_monitors[eater.elementName])

    def stop_pipeline(self):
        if not self.pipeline:
            return

        if self.clock_provider:
            self.clock_provider.set_property('active', False)
            self.clock_provider = None
        retval = self.pipeline.set_state(gst.STATE_NULL)
        if retval != gst.STATE_CHANGE_SUCCESS:
            self.warning('Setting pipeline to NULL failed')

    def cleanup(self):
        self.debug("cleaning up")

        assert self.pipeline != None

        self.stop_pipeline()
        # Disconnect signals
        map(self.pipeline.disconnect, self.pipeline_signals)
        self.pipeline_signals = []
        if self.bus_signal_id:
            self.pipeline.get_bus().disconnect(self.bus_signal_id)
            self.pipeline.get_bus().remove_signal_watch()
            self.bus_signal_id = None
        self.pipeline = None

        if self._feeder_probe_cl:
            self._feeder_probe_cl.cancel()
            self._feeder_probe_cl = None

        # clean up checkEater callLaters
        for eater in self.eaters.values():
            self._pad_monitors.remove(eater.elementName)
            eater.setPadMonitor(None)

    def do_stop(self):
        self.debug('Stopping')
        if self.pipeline:
            self.cleanup()
        self.debug('Stopped')
        return defer.succeed(None)

    def set_master_clock(self, ip, port, base_time):
        self.debug("Master clock set to %s:%d with base_time %s", ip, port,
                   gst.TIME_ARGS(base_time))

        assert self._clock_slaved
        if self._master_clock_info == (ip, port, base_time):
            self.debug("Same master clock info, returning directly")
            return defer.succeed(None)
        elif self._master_clock_info:
            self.stop_pipeline()

        self._master_clock_info = ip, port, base_time

        clock = gst.NetClientClock(None, ip, port, base_time)
        # disable the pipeline's management of base_time -- we're going
        # to set it ourselves.
        self.pipeline.set_new_stream_time(gst.CLOCK_TIME_NONE)
        self.pipeline.set_base_time(base_time)
        self.pipeline.use_clock(clock)

        self.try_start_pipeline()

    def get_master_clock(self):
        """
        Return the connection details for the network clock provided by
        this component, if any.
        """
        if self.clock_provider:
            ip, port, base_time = self._master_clock_info
            return ip, port, base_time
        else:
            return None

    def provide_master_clock(self, port):
        """
        Tell the component to provide a master clock on the given port.

        @returns: a deferred firing an (ip, port, base_time) triple.
        """
        def pipelinePaused(r):
            clock = self.pipeline.get_clock()
            # make sure the pipeline sticks with this clock
            self.pipeline.use_clock(clock)

            self.clock_provider = gst.NetTimeProvider(clock, None, port)
            realport = self.clock_provider.get_property('port')

            base_time = self.pipeline.get_base_time()

            self.debug('provided master clock from %r, base time %s',
                       clock, gst.TIME_ARGS(base_time))

            if self.medium:
                # FIXME: This isn't always correct. We need a more flexible
                # API, and a proper network map, to do this. Even then, it's
                # not always going to be possible.
                ip = self.medium.getIP()
            else:
                ip = "127.0.0.1"

            self._master_clock_info = (ip, realport, base_time)
            return self.get_master_clock()

        assert self.pipeline
        assert not self._clock_slaved
        (ret, state, pending) = self.pipeline.get_state(0)
        if state != gst.STATE_PAUSED and state != gst.STATE_PLAYING:
            self.debug("pipeline still spinning up: %r", state)
            d = self._change_monitor.add(gst.STATE_CHANGE_READY_TO_PAUSED)
            d.addCallback(pipelinePaused)
            return d
        elif self.clock_provider:
            self.debug("returning existing master clock info")
            return defer.succeed(self.get_master_clock())
        else:
            return defer.maybeDeferred(pipelinePaused, None)

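    # Illustrative sketch (not part of the original module, names assumed):
    # the caller typically asks the clock-master component for its clock and
    # then hands the connection details to the slaved components, roughly:
    #
    #     def slaveOthers(result):
    #         ip, port, base_time = result
    #         return slaved_component.set_master_clock(ip, port, base_time)
    #
    #     d = master_component.provide_master_clock(7600)
    #     d.addCallback(slaveOthers)
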
    ### BaseComponent interface implementation

    def try_start_pipeline(self):
        """
        Tell the component to start.
        Whatever is using the component is responsible for making sure all
        eaters have received their file descriptor to eat from.
        """
        (ret, state, pending) = self.pipeline.get_state(0)
        if state == gst.STATE_PLAYING:
            self.log('already PLAYING')
            return

        if self._clock_slaved and not self._master_clock_info:
            self.debug("Missing master clock info, deferring set to PLAYING")
            return

        for eater in self.eaters.values():
            if not eater.fd:
                self.debug('eater %s not yet connected, deferring set to '
                           'PLAYING', eater.eaterAlias)
                return

        self.debug("Setting pipeline %r to GST_STATE_PLAYING", self.pipeline)
        self.pipeline.set_state(gst.STATE_PLAYING)

    def _feeder_probe_calllater(self):
        for feedId, feeder in self.feeders.items():
            feederElement = self.get_element(feeder.elementName)
            for client in feeder.getClients():
                # a currently disconnected client will have fd None
                if client.fd is not None:
                    array = feederElement.emit('get-stats', client.fd)
                    if len(array) == 0:
                        # There is an unavoidable race here: we can't know
                        # whether the fd has been removed from multifdsink.
                        # However, if we call get-stats on an fd that
                        # multifdsink doesn't know about, we just get a
                        # 0-length array. We ensure that we don't reuse the
                        # FD too soon so this can't result in calling this
                        # on a valid but WRONG fd.
                        self.debug('Feeder element for feed %s does not know '
                                   'client fd %d' % (feedId, client.fd))
                    else:
                        client.setStats(array)
        self._feeder_probe_cl = reactor.callLater(
            self.FEEDER_STATS_UPDATE_FREQUENCY, self._feeder_probe_calllater)

    def unblock_eater(self, eaterAlias):
        """
        After this function returns, the stream lock for this eater must have
        been released. If your component needs to do something here, override
        this method.
        """
        pass

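    # Illustrative sketch (not part of the original module): a component
    # whose eaters feed a multi-input element could override unblock_eater()
    # to release its own stream lock, e.g. with an assumed lock attribute:
    #
    #     def unblock_eater(self, eaterAlias):
    #         self._input_locks[eaterAlias].release()
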
    def get_element(self, element_name):
        """Get an element out of the pipeline.

        If it is possible that the component has not yet been set up,
        the caller needs to check if self.pipeline is actually set.
        """
        assert self.pipeline
        self.log('Looking up element %r in pipeline %r',
                 element_name, self.pipeline)
        element = self.pipeline.get_by_name(element_name)
        if not element:
            self.warning("No element named %r in pipeline", element_name)
        return element

    def get_element_property(self, element_name, property):
        'Gets a property of an element in the GStreamer pipeline.'
        self.debug("%s: getting property %s of element %s" % (
            self.getName(), property, element_name))
        element = self.get_element(element_name)
        if not element:
            msg = "Element '%s' does not exist" % element_name
            self.warning(msg)
            raise errors.PropertyError(msg)

        self.debug('getting property %s on element %s' % (
            property, element_name))
        try:
            value = element.get_property(property)
        except (ValueError, TypeError):
            msg = "Property '%s' on element '%s' does not exist" % (
                property, element_name)
            self.warning(msg)
            raise errors.PropertyError(msg)

        # param enums and enums need to be returned by integer value
        if isinstance(value, gobject.GEnum):
            value = int(value)

        return value

    def set_element_property(self, element_name, property, value):
        'Sets a property on an element in the GStreamer pipeline.'
        self.debug("%s: setting property %s of element %s to %s" % (
            self.getName(), property, element_name, value))
        element = self.get_element(element_name)
        if not element:
            msg = "Element '%s' does not exist" % element_name
            self.warning(msg)
            raise errors.PropertyError(msg)

        self.debug('setting property %s on element %r to %s' %
                   (property, element_name, value))
        pygobject.gobject_set_property(element, property, value)

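    # Illustrative sketch (not part of the original module, element and
    # property names are made up): these two accessors allow inspecting and
    # tweaking pipeline elements by name, e.g.:
    #
    #     level = component.get_element_property('volume-effect', 'volume')
    #     component.set_element_property('volume-effect', 'volume', 0.5)
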
    ### methods to connect component eaters and feeders

    def reconnectEater(self, eaterAlias):
        if not self.medium:
            self.debug("Can't reconnect eater %s, running "
                       "without a medium", eaterAlias)
            return

        self.eaters[eaterAlias].disconnected()
        self.medium.connectEater(eaterAlias)

    def feedToFD(self, feedName, fd, cleanup, eaterId=None):
        """
        @param feedName: name of the feed to feed to the given fd.
        @type feedName:  str
        @param fd:       the file descriptor to feed to
        @type fd:        int
        @param cleanup:  the function to call when the FD is no longer feeding
        @type cleanup:   callable
        """
        self.debug('FeedToFD(%s, %d)', feedName, fd)

        # We must have a pipeline in READY or above to do this. Do a
        # non-blocking (zero timeout) get_state.
        if not self.pipeline or self.pipeline.get_state(0)[1] == gst.STATE_NULL:
            self.warning('told to feed %s to fd %d, but pipeline not '
                         'running yet', feedName, fd)
            cleanup(fd)
            # can happen if we are restarting but the other component is
            # happy; assume other side will reconnect later
            return

        if feedName not in self.feeders:
            msg = "Cannot find feeder named '%s'" % feedName
            mid = "feedToFD-%s" % feedName
            m = messages.Warning(T_(N_("Internal Flumotion error.")),
                                 debug=msg, mid=mid, priority=40)
            self.state.append('messages', m)
            self.warning(msg)
            cleanup(fd)
            return False

        feeder = self.feeders[feedName]
        element = self.get_element(feeder.elementName)
        assert element
        clientId = eaterId or ('client-%d' % fd)
        element.emit('add', fd)
        feeder.clientConnected(clientId, fd, cleanup)

    def eatFromFD(self, eaterAlias, feedId, fd):
        """
        Tell the component to eat the given feedId from the given fd.
        The component takes over ownership of the fd, closing it when
        no longer eating.

        @param eaterAlias: the alias of the eater
        @type eaterAlias:  str
        @param feedId:     feed id (componentName:feedName) to eat from
                           through the given fd
        @type feedId:      str
        @param fd:         the file descriptor to eat from
        @type fd:          int
        """
        self.debug('EatFromFD(%s, %s, %d)', eaterAlias, feedId, fd)

        if not self.pipeline:
            self.warning('told to eat %s from fd %d, but pipeline not '
                         'running yet', feedId, fd)
            # can happen if we are restarting but the other component is
            # happy; assume other side will reconnect later
            os.close(fd)
            return

        if eaterAlias not in self.eaters:
            self.warning('Unknown eater alias: %s', eaterAlias)
            os.close(fd)
            return

        eater = self.eaters[eaterAlias]
        element = self.get_element(eater.elementName)
        if not element:
            self.warning('Eater element %s not found', eater.elementName)
            os.close(fd)
            return

        # fdsrc only switches to the new fd in READY or below
        (result, current, pending) = element.get_state(0L)
        pipeline_playing = current not in [gst.STATE_NULL, gst.STATE_READY]
        if pipeline_playing:
            self.debug('eater %s in state %r, kidnapping it',
                       eaterAlias, current)

            # we unlink fdsrc from its peer and take it out of the pipeline
            # so we can set it to READY without having it send EOS,
            # then switch fd and put it back in.
            # To do this safely, we first block fdsrc:src, then let the
            # component do any necessary unlocking (needed for multi-input
            # elements)
            srcpad = element.get_pad('src')

            def _block_cb(pad, blocked):
                pass
            srcpad.set_blocked_async(True, _block_cb)
            self.unblock_eater(eaterAlias)

            # Now, we can switch the fd with this mess
            sinkpad = srcpad.get_peer()
            srcpad.unlink(sinkpad)
            parent = element.get_parent()
            parent.remove(element)
            self.log("setting to ready")
            element.set_state(gst.STATE_READY)
            self.log("setting to ready complete!!!")
            old = element.get_property('fd')
            self.log("Closing old fd %d", old)
            os.close(old)
            element.set_property('fd', fd)
            parent.add(element)
            srcpad.link(sinkpad)
            element.set_state(gst.STATE_PLAYING)
            # We're done; unblock the pad
            srcpad.set_blocked_async(False, _block_cb)
        else:
            element.set_property('fd', fd)

        # update our eater uiState, saying that we are eating from a
        # possibly new feedId
        eater.connected(fd, feedId)

        if not pipeline_playing:
            self.try_start_pipeline()
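
# Illustrative sketch (not part of the original module, names assumed): the
# feed machinery ends up calling the two methods above with connected socket
# fds, along the lines of:
#
#     def cleanup(fd):
#         os.close(fd)
#
#     producer.feedToFD('default', feed_socket.fileno(), cleanup,
#                       eaterId='/default/consumer:default')
#     consumer.eatFromFD('default', '/default/producer:default',
#                        eat_socket.fileno())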