Source Code for Module flumotion.component.feedcomponent010

# -*- Mode: Python; test-case-name: flumotion.test.test_feedcomponent010 -*-
# vi:si:et:sw=4:sts=4:ts=4
#
# Flumotion - a streaming media server
# Copyright (C) 2004,2005,2006,2007 Fluendo, S.L. (www.fluendo.com).
# All rights reserved.

# This file may be distributed and/or modified under the terms of
# the GNU General Public License version 2 as published by
# the Free Software Foundation.
# This file is distributed without any warranty; without even the implied
# warranty of merchantability or fitness for a particular purpose.
# See "LICENSE.GPL" in the source distribution for more information.

# Licensees having purchased or holding a valid Flumotion Advanced
# Streaming Server license may use this file in accordance with the
# Flumotion Advanced Streaming Server Commercial License Agreement.
# See "LICENSE.Flumotion" in the source distribution for more information.

# Headers in this file shall remain intact.

import gst
import gobject

import os
import time

from twisted.internet import reactor, defer

from flumotion.component import component as basecomponent
from flumotion.common import common, errors, pygobject, messages
from flumotion.common import gstreamer, componentui
from flumotion.worker import feed

from flumotion.common.planet import moods
from flumotion.common.pygobject import gsignal

from flumotion.common.messages import N_
T_ = messages.gettexter('flumotion')

class Feeder:
    """
    This class groups feeder-related information as used by a Feed Component.

    @ivar feedId: id of the feed this is a feeder for
    @ivar uiState: the serializable UI State for this feeder
    """
    def __init__(self, feedId):
        self.feedId = feedId
        self.uiState = componentui.WorkerComponentUIState()
        self.uiState.addKey('feedId')
        self.uiState.set('feedId', feedId)
        self.uiState.addListKey('clients')
        self._fdToClient = {} # fd -> (FeederClient, cleanupfunc)
        self._clients = {} # id -> FeederClient

    def clientConnected(self, clientId, fd, cleanup):
        """
        The given client has connected on the given file descriptor, and is
        being added to multifdsink. This is called solely from the reactor
        thread.

        @param clientId: id of the client of the feeder
        @param fd: file descriptor representing the client
        @param cleanup: callable to be called when the given fd is removed
        """
        if clientId not in self._clients:
            # first time we see this client, create an object
            client = FeederClient(clientId)
            self._clients[clientId] = client
            self.uiState.append('clients', client.uiState)

        client = self._clients[clientId]
        self._fdToClient[fd] = (client, cleanup)

        client.connected(fd)

        return client

    def clientDisconnected(self, fd):
        """
        The client has been entirely removed from multifdsink, and we may
        now close its file descriptor.
        The client object stays around so we can track over multiple
        connections.

        Called from GStreamer threads.

        @type fd: file descriptor
        """
        (client, cleanup) = self._fdToClient.pop(fd)
        client.disconnected()

        # To avoid races between this thread (a GStreamer thread) closing the
        # FD, and the reactor thread reusing this FD, we only actually perform
        # the close in the reactor thread.
        reactor.callFromThread(cleanup, fd)

    def getClients(self):
        """
        @rtype: list of all L{FeederClient}s ever seen, including currently
        disconnected clients
        """
        return self._clients.values()

class FeederClient:
    """
    This class groups information related to the client of a feeder.
    The client is identified by an id.
    The information remains valid for the lifetime of the feeder, so it
    can track reconnects of the client.

    @ivar clientId: id of the client of the feeder
    @ivar fd: file descriptor the client is currently using, or None.
    """
    def __init__(self, clientId):
        self.uiState = componentui.WorkerComponentUIState()
        self.uiState.addKey('clientId', clientId)
        self.fd = None
        self.uiState.addKey('fd', None)

        # these values can be set to None, which would mean
        # Unknown, not supported
        # these are supported
        for key in (
            'bytesReadCurrent',  # bytes read over current connection
            'bytesReadTotal',    # bytes read over all connections
            'reconnects',        # number of connections made by this client
            'lastConnect',       # last client connection, in epoch seconds
            'lastDisconnect',    # last client disconnect, in epoch seconds
            'lastActivity',      # last time client read or connected
            ):
            self.uiState.addKey(key, 0)
        # these are possibly unsupported
        for key in (
            'buffersDroppedCurrent', # buffers dropped over current connection
            'buffersDroppedTotal',   # buffers dropped over all connections
            ):
            self.uiState.addKey(key, None)

        # internal state allowing us to track global numbers
        self._buffersDroppedBefore = 0
        self._bytesReadBefore = 0

    def setStats(self, stats):
        """
        @type stats: list
        """
        bytesSent = stats[0]
        #timeAdded = stats[1]
        #timeRemoved = stats[2]
        #timeActive = stats[3]
        timeLastActivity = float(stats[4]) / gst.SECOND
        if len(stats) > 5:
            # added in gst-plugins-base 0.10.11
            buffersDropped = stats[5]
        else:
            # We don't know, but we cannot use None
            # since that would break integer addition below
            buffersDropped = 0

        self.uiState.set('bytesReadCurrent', bytesSent)
        self.uiState.set('buffersDroppedCurrent', buffersDropped)
        self.uiState.set('bytesReadTotal', self._bytesReadBefore + bytesSent)
        self.uiState.set('lastActivity', timeLastActivity)
        if buffersDropped is not None:
            self.uiState.set('buffersDroppedTotal',
                self._buffersDroppedBefore + buffersDropped)

    def connected(self, fd, when=None):
        """
        The client has connected on this fd.
        Update related stats.

        Called only from the reactor thread.
        """
        if not when:
            when = time.time()
        self.fd = fd
        self.uiState.set('fd', fd)
        self.uiState.set('lastConnect', when)
        self.uiState.set('reconnects', self.uiState.get('reconnects', 0) + 1)

    def disconnected(self, when=None):
        """
        The client has disconnected.
        Update related stats.

        Called from GStreamer threads.
        """
        if not when:
            when = time.time()
        self.fd = None

        def updateUIState():
            self.uiState.set('fd', None)
            self.uiState.set('lastDisconnect', when)

            # update our internal counters and reset current counters to 0
            self._bytesReadBefore += self.uiState.get('bytesReadCurrent')
            self.uiState.set('bytesReadCurrent', 0)
            if self.uiState.get('buffersDroppedCurrent') is not None:
                self._buffersDroppedBefore += self.uiState.get(
                    'buffersDroppedCurrent')
                self.uiState.set('buffersDroppedCurrent', 0)
        reactor.callFromThread(updateUIState)

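
# Editor's note: a minimal usage sketch (not part of the original module)
# showing how Feeder and FeederClient account for one client across a
# reconnect. The feedId, fds and the no-op cleanup callable are hypothetical;
# the stats layout follows the comments in setStats() above:
# [bytes-sent, time-added, time-removed, time-active, last-activity
#  (, buffers-dropped)].
def _example_feeder_accounting():
    feeder = Feeder('producer:default')
    # first connection on fd 5: creates the FeederClient, bumps 'reconnects'
    client = feeder.clientConnected('client-5', 5, lambda fd: None)
    client.setStats([1024, 0, 0, 0, 0, 0])
    # disconnect folds the current counters into the per-client totals
    feeder.clientDisconnected(5)
    # the same clientId reconnecting on a new fd reuses the FeederClient
    # object, so byte totals can keep accumulating across connections
    feeder.clientConnected('client-5', 7, lambda fd: None)
    return feeder.getClients()
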
class Eater:
    """
    This class groups eater-related information as used by a Feed Component.

    @ivar eaterId: id of the feed this is eating from
    @ivar uiState: the serializable UI State for this eater
    """
    def __init__(self, eaterId):
        self.eaterId = eaterId
        self.uiState = componentui.WorkerComponentUIState()
        self.uiState.addKey('eaterId')
        self.uiState.set('eaterId', eaterId)
        # dict for the current connection
        connectionDict = {
            "timeTimestampDiscont": None,
            "timestampTimestampDiscont": 0.0, # ts of buffer after discont,
                                              # in float seconds
            "lastTimestampDiscont": 0.0,
            "totalTimestampDiscont": 0.0,
            "countTimestampDiscont": 0,
            "timeOffsetDiscont": None,
            "offsetOffsetDiscont": 0, # offset of buffer after discont
            "lastOffsetDiscont": 0,
            "totalOffsetDiscont": 0,
            "countOffsetDiscont": 0,
        }
        self.uiState.addDictKey('connection', connectionDict)

        for key in (
            'lastConnect',           # last client connection, in epoch seconds
            'lastDisconnect',        # last client disconnect, in epoch seconds
            'totalConnections',      # number of connections made by this client
            'countTimestampDiscont', # number of timestamp disconts seen
            'countOffsetDiscont',    # number of offset disconts seen
            ):
            self.uiState.addKey(key, 0)
        for key in (
            'totalTimestampDiscont', # total timestamp discontinuity
            'totalOffsetDiscont',    # total offset discontinuity
            ):
            self.uiState.addKey(key, 0.0)
        self.uiState.addKey('fd', None)

    def connected(self, fd, when=None):
        """
        The eater has been connected.
        Update related stats.
        """
        if not when:
            when = time.time()

        def updateUIState():
            self.uiState.set('lastConnect', when)
            self.uiState.set('fd', fd)
            self.uiState.set('totalConnections',
                self.uiState.get('totalConnections', 0) + 1)

            self.uiState.setitem("connection", "countTimestampDiscont", 0)
            self.uiState.setitem("connection", "timeTimestampDiscont", None)
            self.uiState.setitem("connection", "lastTimestampDiscont", 0.0)
            self.uiState.setitem("connection", "totalTimestampDiscont", 0.0)
            self.uiState.setitem("connection", "countOffsetDiscont", 0)
            self.uiState.setitem("connection", "timeOffsetDiscont", None)
            self.uiState.setitem("connection", "lastOffsetDiscont", 0)
            self.uiState.setitem("connection", "totalOffsetDiscont", 0)

        reactor.callFromThread(updateUIState)

    def disconnected(self, when=None):
        """
        The eater has been disconnected.
        Update related stats.
        """
        if not when:
            when = time.time()

        def updateUIState():
            self.uiState.set('lastDisconnect', when)
            self.uiState.set('fd', None)

        reactor.callFromThread(updateUIState)

    def timestampDiscont(self, seconds, timestamp):
        """
        @param seconds: discont duration in seconds
        @param timestamp: GStreamer timestamp of new buffer, in seconds.

        Inform the eater of a timestamp discontinuity.
        This is called from a bus message handler, so in the main thread.
        """
        uiState = self.uiState

        c = uiState.get('connection') # dict
        uiState.setitem('connection', 'countTimestampDiscont',
            c.get('countTimestampDiscont', 0) + 1)
        uiState.set('countTimestampDiscont',
            uiState.get('countTimestampDiscont', 0) + 1)

        uiState.setitem('connection', 'timeTimestampDiscont', time.time())
        uiState.setitem('connection', 'timestampTimestampDiscont', timestamp)
        uiState.setitem('connection', 'lastTimestampDiscont', seconds)
        uiState.setitem('connection', 'totalTimestampDiscont',
            c.get('totalTimestampDiscont', 0) + seconds)
        uiState.set('totalTimestampDiscont',
            uiState.get('totalTimestampDiscont', 0) + seconds)

    def offsetDiscont(self, units, offset):
        """
        Inform the eater of an offset discontinuity.
        This is called from a bus message handler, so in the main thread.
        """
        uiState = self.uiState

        c = uiState.get('connection') # dict
        uiState.setitem('connection', 'countOffsetDiscont',
            c.get('countOffsetDiscont', 0) + 1)
        uiState.set('countOffsetDiscont',
            uiState.get('countOffsetDiscont', 0) + 1)

        uiState.setitem('connection', 'timeOffsetDiscont', time.time())
        uiState.setitem('connection', 'offsetOffsetDiscont', offset)
        uiState.setitem('connection', 'lastOffsetDiscont', units)
        uiState.setitem('connection', 'totalOffsetDiscont',
            c.get('totalOffsetDiscont', 0) + units)
        uiState.set('totalOffsetDiscont',
            uiState.get('totalOffsetDiscont', 0) + units)

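
# Editor's note: a minimal sketch (not part of the original module) of how an
# Eater accumulates discontinuity statistics; the values are hypothetical.
def _example_eater_discont_accounting():
    eater = Eater('producer:default')
    # a 0.5 second timestamp gap, with the post-gap buffer at t = 10.0 s
    eater.timestampDiscont(0.5, 10.0)
    # a 3-unit offset jump, with the post-gap buffer at offset 103
    eater.offsetDiscont(3, 103)
    # both the per-connection dict ('connection') and the global counters
    # ('countTimestampDiscont', 'totalTimestampDiscont', ...) are updated
    return eater.uiState
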
class FeedComponent(basecomponent.BaseComponent):
    """
    I am a base class for all Flumotion feed components.

    @cvar checkTimestamp: whether to check continuity of timestamps for eaters
    @cvar checkOffset: whether to check continuity of offsets for eaters
    """
    # keep these as class variables for the tests
    FDSRC_TMPL = 'fdsrc name=%(name)s'
    DEPAY_TMPL = 'gdpdepay name=%(name)s-depay'
    FEEDER_TMPL = 'gdppay ! multifdsink sync=false name=%(name)s buffers-max=500 buffers-soft-max=450 recover-policy=1'
    # EATER_TMPL is no longer used due to it being dynamic
    # how often to add the buffer probe
    BUFFER_PROBE_ADD_FREQUENCY = 5

    # how often to check that a buffer has arrived recently
    BUFFER_CHECK_FREQUENCY = BUFFER_PROBE_ADD_FREQUENCY * 2.5

    BUFFER_TIME_THRESHOLD = BUFFER_CHECK_FREQUENCY

    logCategory = 'feedcomponent'

    gsignal('feed-ready', str, bool)
    gsignal('error', str, str)

    _reconnectInterval = 3

    checkTimestamp = False
    checkOffset = False

    ### BaseComponent interface implementations
    def init(self):
        # add extra keys to state
        self.state.addKey('eaterNames')  # feedId of eaters
        self.state.addKey('feederNames') # feedId of feeders

        # add keys for eaters and feeders uiState
        self._feeders = {} # feeder feedId -> Feeder
        self._eaters = {} # eater feedId -> Eater
        self.uiState.addListKey('feeders')
        self.uiState.addListKey('eaters')

        self.pipeline = None
        self.pipeline_signals = []
        self.bus_watch_id = None
        self.files = []
        self.effects = {}
        self._probe_ids = {} # eater name -> probe handler id
        self._feeder_probe_cl = None

        self.clock_provider = None

        self.eater_names = [] # componentName:feedName list
        self._eaterReconnectDC = {}

        self.feedersFeeding = 0
        self.feed_names = [] # list of feedName
        self.feeder_names = [] # list of feedId

        self._inactiveEaters = [] # list of feedId's
        # feedId -> dict of lastTime, lastConnectTime, lastConnectD,
        # checkEaterDC,
        self._eaterStatus = {}

        # statechange -> [ deferred ]
        self._stateChangeDeferreds = {}

        self._gotFirstNewSegment = {}

        # multifdsink's get-stats signal had critical bugs before this version
        tcppluginversion = gstreamer.get_plugin_version('tcp')
        self._get_stats_supported = tcppluginversion >= (0, 10, 11, 0)

        # check for identity version and set checkTimestamp and checkOffset
        # to false if too old
        vt = gstreamer.get_plugin_version('coreelements')
        if not vt:
            raise errors.MissingElementError('identity')
        if not gstreamer.element_factory_has_property('identity',
            'check-imperfect-timestamp'):
            self.checkTimestamp = False
            self.checkOffset = False
            self.addMessage(
                messages.Info(T_(N_(
                    "You will get more debugging information "
                    "if you upgrade to GStreamer 0.10.13 or later "
                    "as and when available."))))

    def do_setup(self):
        """
        Sets up component.
        """
        eater_config = self.config.get('source', [])
        feeder_config = self.config.get('feed', [])

        self.debug("feedcomponent.setup(): eater_config %r" % eater_config)
        self.debug("feedcomponent.setup(): feeder_config %r" % feeder_config)

        # this sets self.eater_names
        self.parseEaterConfig(eater_config)

        # all eaters start out inactive
        self._inactiveEaters = self.eater_names[:]

        for name in self.eater_names:
            d = {
                'lastTime': 0,
                'lastConnectTime': 0,
                'lastConnectD': None,
                'checkEaterDC': None
            }
            self._eaterStatus[name] = d
            self._eaters[name] = Eater(name)
            self.uiState.append('eaters', self._eaters[name].uiState)
            self._eaterReconnectDC['eater:' + name] = None

        # this sets self.feeder_names
        self.parseFeederConfig(feeder_config)
        self.feedersWaiting = len(self.feeder_names)
        for feederName in self.feeder_names:
            self._feeders[feederName] = Feeder(feederName)
            self.uiState.append('feeders',
                self._feeders[feederName].uiState)

        self.debug('setup() with %d eaters and %d feeders waiting' % (
            len(self._inactiveEaters), self.feedersWaiting))

        pipeline = self.create_pipeline()
        self.set_pipeline(pipeline)

        self.debug('setup() finished')

        return defer.succeed(None)

    ### FeedComponent interface for subclasses
    def create_pipeline(self):
        """
        Subclasses have to implement this method.

        @rtype: L{gst.Pipeline}
        """
        raise NotImplementedError, "subclass must implement create_pipeline"

    def set_pipeline(self, pipeline):
        """
        Subclasses can override me.
        They should chain up first.
        """
        if self.pipeline:
            self.cleanup()
        self.pipeline = pipeline
        self.setup_pipeline()

    def eaterSetInactive(self, feedId):
        """
        The eater for the given feedId is no longer active.
        By default, the component will go hungry.
        """
        self.info('Eater of %s is inactive' % feedId)
        if feedId in self._inactiveEaters:
            self.warning('Eater of %s was already inactive' % feedId)
        else:
            self._inactiveEaters.append(feedId)
        self.setMood(moods.hungry)

    def eaterSetActive(self, feedId):
        """
        The eater for the given feedId is now active and producing data.
        By default, the component will go happy if all eaters are active.
        """
        self.info('Eater of %s is active' % feedId)
        if feedId not in self._inactiveEaters:
            self.warning('Eater of %s was already active' % feedId)
        else:
            self._inactiveEaters.remove(feedId)
        if not self._inactiveEaters:
            self.setMood(moods.happy)
    # FIXME: it may make sense to have an updateMood method, that can be used
    # by the two previous methods, but also in other places, and then
    # overridden. That would make us have to publicize inactiveEaters

    def eaterTimestampDiscont(self, feedId, prevTs, prevDuration, curTs):
        """
        Inform of a timestamp discontinuity for the given eater.
        """
        discont = curTs - (prevTs + prevDuration)
        dSeconds = discont / float(gst.SECOND)
        self.debug("we have a discont on feedId %s of %f s between %s and %s ",
            feedId, dSeconds,
            gst.TIME_ARGS(prevTs),
            gst.TIME_ARGS(curTs))
        self._eaters[feedId].timestampDiscont(dSeconds,
            float(curTs) / float(gst.SECOND))

    def eaterOffsetDiscont(self, feedId, prevOffsetEnd, curOffset):
        """
        Inform of an offset discontinuity for the given eater.
        """
        discont = curOffset - prevOffsetEnd
        self.debug(
            "we have a discont on feedId %s of %d units between %d and %d ",
            feedId, discont, prevOffsetEnd, curOffset)
        self._eaters[feedId].offsetDiscont(discont, curOffset)

    ### FeedComponent methods
    def addEffect(self, effect):
        self.effects[effect.name] = effect
        effect.setComponent(self)

    def effectPropertyChanged(self, effectName, propertyName, value):
        """
        Notify the manager that an effect property has changed to a new value.

        Admin clients will receive it as a propertyChanged message for
        effectName:propertyName.
        """
        self.medium.callRemote("propertyChanged", self.name,
            "%s:%s" % (effectName, propertyName), value)

    def parseEaterConfig(self, eater_config):
        # the source feeder names come from the config
        # they are specified under <component> as <source> elements in XML
        # so if they don't specify a feed name, use "default" as the feed name
        eater_names = []
        for block in eater_config:
            eater_name = block
            if block.find(':') == -1:
                eater_name = block + ':default'
            eater_names.append(eater_name)
        self.debug('parsed eater config, eater feedIds %r' % eater_names)
        self.eater_names = eater_names
        self.state.set('eaterNames', self.eater_names)

    def parseFeederConfig(self, feeder_config):
        # for pipeline components, in the case there is only one
        # feeder, <feed></feed> still needs to be listed explicitly

        # the feed names come from the config
        # they are specified under <component> as <feed> elements in XML
        self.feed_names = feeder_config
        #self.debug("parseFeederConfig: feed_names: %r" % self.feed_names)

        # we create feeder names this component contains based on feed names
        self.feeder_names = map(lambda n: self.name + ':' + n, self.feed_names)
        self.debug('parsed feeder config, feeders %r' % self.feeder_names)
        self.state.set('feederNames', self.feeder_names)

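    # Editor's note: an illustrative mapping (not part of the original module)
    # of how the two parse methods above turn config values into feed ids,
    # assuming a component named 'muxer-video':
    #
    #   eater_config  = ['producer', 'overlay:watermark']
    #   feeder_config = ['default']
    #
    #   parseEaterConfig(eater_config)   -> self.eater_names ==
    #       ['producer:default', 'overlay:watermark']
    #   parseFeederConfig(feeder_config) -> self.feeder_names ==
    #       ['muxer-video:default']
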
    def get_eater_names(self):
        """
        Return the list of feeder names this component eats from.

        @returns: a list of "componentName:feedName" strings
        """
        return self.eater_names

    def get_feeder_names(self):
        """
        Return the list of feedId's of feeders this component has.

        @returns: a list of "componentName:feedName" strings
        """
        return self.feeder_names

    def get_feed_names(self):
        """
        Return the list of feedNames for feeds this component has.

        @returns: a list of "feedName" strings
        """
        return self.feed_names

    def get_pipeline(self):
        return self.pipeline

    def _addStateChangeDeferred(self, statechange):
        if statechange not in self._stateChangeDeferreds:
            self._stateChangeDeferreds[statechange] = []

        d = defer.Deferred()
        self._stateChangeDeferreds[statechange].append(d)

        return d

    # GstPython should have something for this, but doesn't.
    def _getStateChange(self, old, new):
        if old == gst.STATE_NULL and new == gst.STATE_READY:
            return gst.STATE_CHANGE_NULL_TO_READY
        elif old == gst.STATE_READY and new == gst.STATE_PAUSED:
            return gst.STATE_CHANGE_READY_TO_PAUSED
        elif old == gst.STATE_PAUSED and new == gst.STATE_PLAYING:
            return gst.STATE_CHANGE_PAUSED_TO_PLAYING
        elif old == gst.STATE_PLAYING and new == gst.STATE_PAUSED:
            return gst.STATE_CHANGE_PLAYING_TO_PAUSED
        elif old == gst.STATE_PAUSED and new == gst.STATE_READY:
            return gst.STATE_CHANGE_PAUSED_TO_READY
        elif old == gst.STATE_READY and new == gst.STATE_NULL:
            return gst.STATE_CHANGE_READY_TO_NULL
        else:
            return 0

    def bus_watch_func(self, bus, message):
        t = message.type
        src = message.src

        # print 'message:', t, src and src.get_name() or '(no source)'
        if t == gst.MESSAGE_STATE_CHANGED:
            old, new, pending = message.parse_state_changed()
            # print src.get_name(), old.value_nick, new.value_nick, pending.value_nick
            if src == self.pipeline:
                self.log('state change: %r %s->%s'
                    % (src, old.value_nick, new.value_nick))
                if old == gst.STATE_PAUSED and new == gst.STATE_PLAYING:
                    self.setMood(moods.happy)

                change = self._getStateChange(old,new)
                if change in self._stateChangeDeferreds:
                    dlist = self._stateChangeDeferreds[change]
                    for d in dlist:
                        d.callback(None)
                    del self._stateChangeDeferreds[change]

            elif src.get_name() in ['feeder:'+n for n in self.feeder_names]:
                if old == gst.STATE_PAUSED and new == gst.STATE_PLAYING:
                    self.debug('feeder %s is now feeding' % src.get_name())
                    self.feedersWaiting -= 1
                    self.debug('%d feeders waiting' % self.feedersWaiting)
                    # somewhat hacky... feeder:foo:default => default
                    feed_name = src.get_name().split(':')[2]
                    self.emit('feed-ready', feed_name, True)
        elif t == gst.MESSAGE_ERROR:
            gerror, debug = message.parse_error()
            self.warning('element %s error %s %s' %
                (src.get_path_string(), gerror, debug))
            self.setMood(moods.sad)
            # generate a unique id
            id = "%s-%s-%d" % (self.name, gerror.domain, gerror.code)
            m = messages.Error(T_(N_(
                "Internal GStreamer error.")),
                debug="%s\n%s: %d\n%s" % (
                    gerror.message, gerror.domain, gerror.code, debug),
                id=id, priority=40)
            self.state.append('messages', m)
            # if we have a state change defer that has not yet
            # fired, we should errback it
            changes = [gst.STATE_CHANGE_NULL_TO_READY,
                gst.STATE_CHANGE_READY_TO_PAUSED,
                gst.STATE_CHANGE_PAUSED_TO_PLAYING]
            # get current state and add downward state changes from states
            # higher than current element state
            curstate = self.pipeline.get_state()
            if curstate == gst.STATE_NULL:
                changes.append(gst.STATE_CHANGE_READY_TO_NULL)
            if curstate <= gst.STATE_PAUSED:
                changes.append(gst.STATE_CHANGE_PLAYING_TO_PAUSED)
            if curstate <= gst.STATE_READY:
                changes.append(gst.STATE_CHANGE_PAUSED_TO_READY)
            for change in changes:
                if change in self._stateChangeDeferreds:
                    self.log("We have an error, going to errback pending "
                        "state change defers")
                    dlist = self._stateChangeDeferreds[change]
                    for d in dlist:
                        d.errback(errors.ComponentStartHandledError(
                            gerror.message))
                    del self._stateChangeDeferreds[change]

        elif t == gst.MESSAGE_EOS:
            name = src.get_name()
            if name in ['eater:' + n for n in self.eater_names]:
                self.info('End of stream in eater %s' % src.get_name())
                feedId = name[len('eater:'):]
                self.eaterSetInactive(feedId)
                # start reconnection
                self._reconnectEater(feedId)
            else:
                self.warning("We got an eos from %s", name)
        elif t == gst.MESSAGE_ELEMENT:
            if message.structure.get_name() == 'imperfect-timestamp':
                identityName = src.get_name()
                eaterName = identityName.split("-identity")[0]
                feedId = eaterName[len('eater:'):]

                self.log("we have an imperfect stream from %s" % src.get_name())
                # figure out the discontinuity
                s = message.structure
                self.eaterTimestampDiscont(feedId, s["prev-timestamp"],
                    s["prev-duration"], s["cur-timestamp"])
            elif message.structure.get_name() == 'imperfect-offset':
                identityName = src.get_name()
                eaterName = identityName.split("-identity")[0]
                feedId = eaterName[len('eater:'):]

                self.log("we have an imperfect stream from %s" % src.get_name())
                # figure out the discontinuity
                s = message.structure
                self.eaterOffsetDiscont(feedId, s["prev-offset-end"],
                    s["cur-offset"])

        else:
            self.log('message received: %r' % message)

        return True

    # FIXME: privatize
    def setup_pipeline(self):
        self.debug('setup_pipeline()')
        assert self.bus_watch_id == None

        # disable the pipeline's management of base_time -- we're going
        # to set it ourselves.
        self.pipeline.set_new_stream_time(gst.CLOCK_TIME_NONE)

        self.pipeline.set_name('pipeline-' + self.getName())
        bus = self.pipeline.get_bus()
        bus.add_signal_watch()
        self.bus_watch_id = bus.connect('message', self.bus_watch_func)
        sig_id = self.pipeline.connect('deep-notify',
            gstreamer.verbose_deep_notify_cb, self)
        self.pipeline_signals.append(sig_id)

        # start checking eaters
        for feedId in self.eater_names:
            status = self._eaterStatus[feedId]
            status['checkEaterDC'] = reactor.callLater(
                self.BUFFER_CHECK_FREQUENCY, self._checkEater, feedId)

        # start checking feeders, if we have a sufficiently recent multifdsink
        if self._get_stats_supported:
            self._feeder_probe_cl = reactor.callLater(
                self.BUFFER_CHECK_FREQUENCY, self._feeder_probe_calllater)
        else:
            self.warning("Feeder statistics unavailable, your "
                "gst-plugins-base is too old")
            self.addMessage(
                messages.Warning(T_(N_(
                    "Your gst-plugins-base is too old (older than 0.10.11), so "
                    "feeder statistics will be unavailable. Please upgrade to "
                    "the most recent gst-plugins-base release.")),
                    id='multifdsink'))

    def pipeline_stop(self):
        if not self.pipeline:
            return

        if self.clock_provider:
            self.clock_provider.set_property('active', False)
            self.clock_provider = None
        retval = self.pipeline.set_state(gst.STATE_NULL)
        if retval != gst.STATE_CHANGE_SUCCESS:
            self.warning('Setting pipeline to NULL failed')

    def cleanup(self):
        self.debug("cleaning up")

        assert self.pipeline != None

        self.pipeline_stop()
        # Disconnect signals
        map(self.pipeline.disconnect, self.pipeline_signals)
        self.pipeline.get_bus().disconnect(self.bus_watch_id)
        self.pipeline.get_bus().remove_signal_watch()
        self.pipeline = None
        self.pipeline_signals = []
        self.bus_watch_id = None

        if self._feeder_probe_cl:
            self._feeder_probe_cl.cancel()
            self._feeder_probe_cl = None

        # clean up checkEater callLaters
        for feedId in self.eater_names:
            status = self._eaterStatus[feedId]
            if status['checkEaterDC']:
                status['checkEaterDC'].cancel()
                status['checkEaterDC'] = None

    def do_stop(self):
        self.debug('Stopping')
        if self.pipeline:
            self.cleanup()
        self.debug('Stopped')
        return defer.succeed(None)

    def set_master_clock(self, ip, port, base_time):
        self.debug("Master clock set to %s:%d with base_time %s", ip, port,
            gst.TIME_ARGS(base_time))

        clock = gst.NetClientClock(None, ip, port, base_time)
        self.pipeline.set_base_time(base_time)
        self.pipeline.use_clock(clock)

    def provide_master_clock(self, port):
        """
        Tell the component to provide a master clock on the given port.

        @returns: (ip, port, base_time) triple.
        """
        def pipelinePaused(r):
            clock = self.pipeline.get_clock()
            # make sure the pipeline sticks with this clock
            self.pipeline.use_clock(clock)

            self.clock_provider = gst.NetTimeProvider(clock, None, port)
            # small window here but that's ok
            self.clock_provider.set_property('active', False)

            base_time = clock.get_time()
            self.pipeline.set_base_time(base_time)

            self.debug('provided master clock from %r, base time %s'
                % (clock, gst.TIME_ARGS(base_time)))

            if self.medium:
                # FIXME: This isn't always correct. We need a more flexible API,
                # and a proper network map, to do this. Even then, it's not
                # always going to be possible.
                ip = self.medium.getIP()
            else:
                ip = "127.0.0.1"

            return (ip, port, base_time)

        if not self.pipeline:
            self.warning('No self.pipeline, cannot provide master clock')
            # FIXME: should we have a NoSetupError() for cases where setup
            # was not called ? For now we fall through and get an exception

        if self.clock_provider:
            self.warning('already had a clock provider, removing it')
            self.clock_provider = None

        # We need to be >= PAUSED to get the correct clock, in general
        (ret, state, pending) = self.pipeline.get_state(0)
        if state != gst.STATE_PAUSED and state != gst.STATE_PLAYING:
            self.info("Setting pipeline to PAUSED")

            d = self._addStateChangeDeferred(gst.STATE_CHANGE_READY_TO_PAUSED)
            d.addCallback(pipelinePaused)

            self.pipeline.set_state(gst.STATE_PAUSED)
            return d
        else:
            self.info("Pipeline already started, retrieving clocking")
            # Just return the already set up info, as a fired deferred
            ip = self.state.get('manager-ip')
            base_time = self.pipeline.get_base_time()
            d = defer.Deferred()
            d.callback((ip, port, base_time))
            return d
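
    # Editor's note: an illustrative sketch (not part of the original module)
    # of how a caller might share one component's clock with others using the
    # two methods above; the component names and plumbing are hypothetical.
    #
    #   d = producer.provide_master_clock(9998)
    #   def distribute((ip, port, base_time)):
    #       for slave in [encoder, muxer]:
    #           slave.set_master_clock(ip, port, base_time)
    #   d.addCallback(distribute)
    #
    # provide_master_clock() returns a deferred in both branches: already
    # fired if the pipeline is >= PAUSED, otherwise fired once the
    # READY->PAUSED state change completes.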

    # FIXME: rename, since this just starts the pipeline,
    # and linking is done by the manager
    def _feeder_probe_calllater(self):
        for feedId, feeder in self._feeders.items():
            feederElement = self.get_element("feeder:%s" % feedId)
            for client in feeder.getClients():
                # a currently disconnected client will have fd None
                if client.fd is not None:
                    array = feederElement.emit('get-stats', client.fd)
                    if len(array) == 0:
                        # There is an unavoidable race here: we can't know
                        # whether the fd has been removed from multifdsink.
                        # However, if we call get-stats on an fd that
                        # multifdsink doesn't know about, we just get a 0-length
                        # array. We ensure that we don't reuse the FD too soon
                        # so this can't result in calling this on a valid but
                        # WRONG fd
                        self.debug('Feeder element for feed %s does not know '
                            'client fd %d' % (feedId, client.fd))
                    else:
                        client.setStats(array)
        self._feeder_probe_cl = reactor.callLater(self.BUFFER_CHECK_FREQUENCY,
            self._feeder_probe_calllater)

    def _add_buffer_probe(self, pad, feedId, firstTime=False):
        # attached from above, and called again every
        # BUFFER_PROBE_ADD_FREQUENCY seconds
        method = self.log
        if firstTime: method = self.debug
        method("Adding new scheduled buffer probe for %s" % feedId)
        self._probe_ids[feedId] = pad.add_buffer_probe(self._buffer_probe_cb,
            feedId, firstTime)

    def _buffer_probe_cb(self, pad, buffer, feedId, firstTime=False):
        """
        Periodically scheduled buffer probe, that ensures that we're currently
        actually having dataflow through our eater elements.

        Called from GStreamer threads.

        @param pad: The gst.Pad srcpad for one eater in this component.
        @param buffer: A gst.Buffer that has arrived on this pad
        @param feedId: The feedId for the feed we're eating on this pad
        @param firstTime: Boolean, true if this is the first time this buffer
                          probe has been added for this eater.
        """

        # log info about first incoming buffer for this check interval,
        # then remove ourselves
        method = self.log
        if firstTime: method = self.debug
        method('buffer probe on eater %s has timestamp %s' % (
            feedId, gst.TIME_ARGS(buffer.timestamp)))
        # We carefully only use atomic (w.r.t. the GIL) operations on the dicts
        # here: we pop things from _probe_ids, and only set things in
        # self._eaterStatus[feedId].

        # now store the last buffer received time
        self._eaterStatus[feedId]['lastTime'] = time.time()
        probeid = self._probe_ids.pop(feedId, None)
        if probeid:
            pad.remove_buffer_probe(probeid)

            # add buffer probe every BUFFER_PROBE_ADD_FREQUENCY seconds
            reactor.callFromThread(reactor.callLater,
                self.BUFFER_PROBE_ADD_FREQUENCY,
                self._add_buffer_probe, pad, feedId)

        # since we've received a buffer, it makes sense to call _checkEater,
        # allowing us to go back to happy as soon as possible
        reactor.callFromThread(self._checkEater, feedId)

        return True

    def _checkEater(self, feedId):
        """
        Check that buffers are being received by the eater.
        If no buffer was received for more than BUFFER_TIME_THRESHOLD on
        a connected feed, I call eaterSetInactive.
        Likewise, if a buffer was received on an inactive feed, I call
        eaterSetActive.

        I am used both as a callLater and as a direct method.
        """
        status = self._eaterStatus[feedId]
        # a callLater is not active anymore while it's being executed,
        # cancel deferred call if there's one pending (i.e. if we were called
        # by something other than the deferred call)
        if status['checkEaterDC'] and status['checkEaterDC'].active():
            status['checkEaterDC'].cancel()

        self.log('_checkEater: last buffer at %r' % status['lastTime'])
        currentTime = time.time()

        # we do not run any checks if the last buffer time is 0 or lower
        # this allows us to make sure no check is run when this is needed
        # (for example, on eos)
        if status['lastTime'] > 0:
            delta = currentTime - status['lastTime']

            if feedId not in self._inactiveEaters \
               and delta > self.BUFFER_TIME_THRESHOLD:
                self.info(
                    'No data received for %r seconds, feed %s inactive' % (
                        self.BUFFER_TIME_THRESHOLD, feedId))
                self.eaterSetInactive(feedId)
                # TODO: we never actually disconnect the eater explicitly, but
                # a successful reconnect will cause the old fd to be closed.
                # Maybe we should change this?
                # start reconnection
                self._reconnectEater(feedId)

            # mark as connected if recent data received
            elif feedId in self._inactiveEaters \
                 and delta < self.BUFFER_TIME_THRESHOLD:
                self.debug('Received data, feed %s active' % feedId)
                self.eaterSetActive(feedId)

        # retry a connect call if it has been too long since the
        # last and we still don't have data
        if feedId in self._inactiveEaters \
           and status['lastConnectTime'] > 0:
            connectDelta = currentTime - status['lastConnectTime']
            if connectDelta > self.BUFFER_TIME_THRESHOLD:
                self.debug('Too long since last reconnect, retrying')
                self._reconnectEater(feedId)

        # we run forever
        status['checkEaterDC'] = reactor.callLater(self.BUFFER_CHECK_FREQUENCY,
            self._checkEater, feedId)

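    # Editor's note: a summary of the eater watchdog timing implied above,
    # using the class defaults (not part of the original module):
    #
    #   BUFFER_PROBE_ADD_FREQUENCY = 5    -> a buffer probe is (re)attached
    #                                        at most once every 5 seconds
    #   BUFFER_CHECK_FREQUENCY = 12.5     -> _checkEater reschedules itself
    #                                        every 12.5 seconds (5 * 2.5)
    #   BUFFER_TIME_THRESHOLD = 12.5      -> an eater that has seen no buffer
    #                                        for more than 12.5 seconds is
    #                                        marked inactive and reconnected
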
    def _reconnectEater(self, feedId):
        eater = self._eaters[feedId]
        eater.disconnected()
        # reconnect the eater for the given feedId, updating the internal
        # status for that eater
        status = self._eaterStatus[feedId]

        # If an eater received a buffer before being marked as disconnected,
        # and still within the buffer check interval, the next eaterCheck
        # call could accidentally think the eater was reconnected properly.
        # Setting lastTime to 0 here avoids that happening in eaterCheck.
        self._eaterStatus[feedId]['lastTime'] = 0

        status['lastConnectTime'] = time.time()
        if status['lastConnectD']:
            self.debug('Cancel previous connection attempt ?')
            # FIXME: it seems fine to not errback explicitly, but we may
            # want to investigate further later
        d = self.medium.connectEater(feedId)
        def connectEaterCb(result, status, eater):
            status['lastConnectD'] = None
        d.addCallback(connectEaterCb, status, eater)
        status['lastConnectD'] = d

    def get_element(self, element_name):
        """Get an element out of the pipeline.

        If it is possible that the component has not yet been set up,
        the caller needs to check if self.pipeline is actually set.
        """
        assert self.pipeline
        element = self.pipeline.get_by_name(element_name)
        return element

    def get_element_property(self, element_name, property):
        'Gets a property of an element in the GStreamer pipeline.'
        self.debug("%s: getting property %s of element %s" % (
            self.getName(), property, element_name))
        element = self.get_element(element_name)
        if not element:
            msg = "Element '%s' does not exist" % element_name
            self.warning(msg)
            raise errors.PropertyError(msg)

        self.debug('getting property %s on element %s' % (
            property, element_name))
        try:
            value = element.get_property(property)
        except (ValueError, TypeError):
            msg = "Property '%s' on element '%s' does not exist" % (
                property, element_name)
            self.warning(msg)
            raise errors.PropertyError(msg)

        # param enums and enums need to be returned by integer value
        if isinstance(value, gobject.GEnum):
            value = int(value)

        return value

    def set_element_property(self, element_name, property, value):
        'Sets a property on an element in the GStreamer pipeline.'
        self.debug("%s: setting property %s of element %s to %s" % (
            self.getName(), property, element_name, value))
        element = self.get_element(element_name)
        if not element:
            msg = "Element '%s' does not exist" % element_name
            self.warning(msg)
            raise errors.PropertyError(msg)

        self.debug('setting property %s on element %r to %s' %
            (property, element_name, value))
        pygobject.gobject_set_property(element, property, value)

    ### methods to connect component eaters and feeders
    def feedToFD(self, feedName, fd, cleanup, eaterId=None):
        """
        @param feedName: name of the feed to feed to the given fd.
        @type feedName: str
        @param fd: the file descriptor to feed to
        @type fd: int
        @param cleanup: the function to call when the FD is no longer feeding
        @type cleanup: callable
        """
        self.debug('FeedToFD(%s, %d)' % (feedName, fd))
        feedId = common.feedId(self.name, feedName)

        if not self.pipeline:
            self.warning('told to feed %s to fd %d, but pipeline not '
                'running yet', feedId, fd)
            cleanup(fd)
            # can happen if we are restarting but the other component is
            # happy; assume other side will reconnect later
            return

        elementName = "feeder:%s" % feedId
        element = self.get_element(elementName)
        if not element:
            msg = "Cannot find feeder element named '%s'" % elementName
            id = "feedToFD-%s" % feedName
            m = messages.Warning(T_(N_("Internal Flumotion error.")),
                debug=msg, id=id, priority=40)
            self.state.append('messages', m)
            self.warning(msg)
            return False

        clientId = eaterId or ('client-%d' % fd)

        element.emit('add', fd)
        self._feeders[feedId].clientConnected(clientId, fd, cleanup)

    def removeClientCallback(self, sink, fd):
        """
        Called (as a signal callback) when the FD is no longer in use by
        multifdsink.
        This will call the registered callable on the fd.

        Called from GStreamer threads.
        """
        self.debug("cleaning up fd %d", fd)
        feedId = ':'.join(sink.get_name().split(':')[1:])
        self._feeders[feedId].clientDisconnected(fd)

    def eatFromFD(self, feedId, fd):
        """
        Tell the component to eat the given feedId from the given fd.
        The component takes over the ownership of the fd, closing it when
        no longer eating.

        @param feedId: feed id (componentName:feedName) to eat from through
                       the given fd
        @type feedId: str
        @param fd: the file descriptor to eat from
        @type fd: int
        """
        self.debug('EatFromFD(%s, %d)' % (feedId, fd))

        if not self.pipeline:
            self.warning('told to eat %s from fd %d, but pipeline not '
                'running yet', feedId, fd)
            # can happen if we are restarting but the other component is
            # happy; assume other side will reconnect later
            os.close(fd)
            return

        eaterName = "eater:%s" % feedId
        self.debug('looking up element %s' % eaterName)
        element = self.get_element(eaterName)

        # fdsrc only switches to the new fd in ready or below
        (result, current, pending) = element.get_state(0L)
        if current not in [gst.STATE_NULL, gst.STATE_READY]:
            self.debug('eater %s in state %r, kidnapping it' % (
                eaterName, current))

            # we unlink fdsrc from its peer, take it out of the pipeline
            # so we can set it to READY without having it send EOS,
            # then switch fd and put it back in.
            # To do this safely, we first block fdsrc:src, then let the
            # component do any necessary unlocking (needed for multi-input
            # elements)
            srcpad = element.get_pad('src')

            def _block_cb(pad, blocked):
                pass
            srcpad.set_blocked_async(True, _block_cb)
            self.unblock_eater(feedId)

            # Now, we can switch FD with this mess
            sinkpad = srcpad.get_peer()
            srcpad.unlink(sinkpad)
            self.pipeline.remove(element)
            self.log("setting to ready")
            element.set_state(gst.STATE_READY)
            self.log("setting to ready complete!!!")
            old = element.get_property('fd')
            os.close(old)
            element.set_property('fd', fd)
            self.pipeline.add(element)
            srcpad.link(sinkpad)
            element.set_state(gst.STATE_PLAYING)
            # We're done; unblock the pad
            srcpad.set_blocked_async(False, _block_cb)
        else:
            element.set_property('fd', fd)

        # update our eater uiState
        self._eaters[feedId].connected(fd)

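    # Editor's note: an illustrative sketch (not part of the original module)
    # of the fd handoff the two methods above implement; it is assumed that
    # the worker-side feed machinery hands each end an already-connected
    # socket fd, and the component names are hypothetical.
    #
    #   # on the feeding component's side:
    #   upstream.feedToFD('default', fd, os.close)
    #       # -> multifdsink 'feeder:<componentName>:default' gets 'add' fd
    #   # on the eating component's side:
    #   downstream.eatFromFD('producer:default', fd)
    #       # -> fdsrc 'eater:producer:default' takes ownership of fd
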
    def unblock_eater(self, feedId):
        """
        After this function returns, the stream lock for this eater must have
        been released. If your component needs to do something here, override
        this method.
        """
        pass

    def _eater_event_probe_cb(self, pad, event, feedId):
        """
        An event probe used to consume unwanted EOS events on eaters.

        Called from GStreamer threads.
        """
        if event.type == gst.EVENT_EOS:
            self.info(
                'End of stream on feed %s, disconnect will be triggered' %
                feedId)
            # We swallow it because otherwise our component acts on the EOS
            # and we can't recover from that later. Instead, fdsrc will be
            # taken out and given a new fd on the next eatFromFD call.
            return False
        return True

    def _depay_eater_event_probe_cb(self, pad, event, feedId):
        """
        An event probe used to consume unwanted duplicate newsegment events.

        Called from GStreamer threads.
        """
        if event.type == gst.EVENT_NEWSEGMENT:
            # We do this because we know gdppay/gdpdepay screw up on 2nd
            # newsegments
            if feedId in self._gotFirstNewSegment:
                self.info(
                    "Subsequent new segment event received on depay on "
                    "feed %s" % feedId)
                # swallow
                return False
            else:
                self._gotFirstNewSegment[feedId] = True
        return True

pygobject.type_register(FeedComponent)
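
# Editor's note: a minimal subclass sketch (not part of the original module)
# showing the one method concrete feed components must provide,
# create_pipeline(). The element names follow the 'eater:<feedId>' and
# 'feeder:<componentName>:<feedName>' conventions used above; the pipeline
# string itself is only an illustration.
class _ExampleTestFeedComponent(FeedComponent):
    """
    A hypothetical component with no eaters and one 'default' feed.
    """
    def create_pipeline(self):
        feederName = 'feeder:%s:default' % self.name
        return gst.parse_launch(
            'videotestsrc is-live=true ! ' +
            self.FEEDER_TMPL % {'name': feederName})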