Package flumotion :: Package component :: Module feedcomponent
[hide private]

Source Code for Module flumotion.component.feedcomponent

  1  # -*- Mode: Python -*- 
  2  # vi:si:et:sw=4:sts=4:ts=4 
  3  # 
  4  # Flumotion - a streaming media server 
  5  # Copyright (C) 2004,2005,2006,2007 Fluendo, S.L. (www.fluendo.com). 
  6  # All rights reserved. 
  7   
  8  # This file may be distributed and/or modified under the terms of 
  9  # the GNU General Public License version 2 as published by 
 10  # the Free Software Foundation. 
 11  # This file is distributed without any warranty; without even the implied 
 12  # warranty of merchantability or fitness for a particular purpose. 
 13  # See "LICENSE.GPL" in the source distribution for more information. 
 14   
 15  # Licensees having purchased or holding a valid Flumotion Advanced 
 16  # Streaming Server license may use this file in accordance with the 
 17  # Flumotion Advanced Streaming Server Commercial License Agreement. 
 18  # See "LICENSE.Flumotion" in the source distribution for more information. 
 19   
 20  # Headers in this file shall remain intact. 
 21   
 22  """ 
 23  Feed components, participating in the stream 
 24  """ 
 25   
 26  import os 
 27   
 28  import gst 
 29  import gst.interfaces 
 30  import gobject 
 31   
 32  from twisted.internet import reactor, defer 
 33  from twisted.spread import pb 
 34  from zope.interface import implements 
 35   
 36  from flumotion.configure import configure 
 37  from flumotion.component import component as basecomponent 
 38  from flumotion.component import feed 
 39  from flumotion.common import common, interfaces, errors, log, pygobject, \ 
 40       messages 
 41  from flumotion.common import gstreamer 
 42  from flumotion.common.i18n import N_, gettexter 
 43  from flumotion.common.planet import moods 
 44  from flumotion.common.pygobject import gsignal 
 45   
 46  __version__ = "$Rev: 6695 $" 
 47  T_ = gettexter() 
 48   
 49   
50 -class FeedComponentMedium(basecomponent.BaseComponentMedium):
51 """ 52 I am a component-side medium for a FeedComponent to interface with 53 the manager-side ComponentAvatar. 54 """ 55 implements(interfaces.IComponentMedium) 56 logCategory = 'feedcompmed' 57 remoteLogName = 'feedserver' 58
59 - def __init__(self, component):
60 """ 61 @param component: L{flumotion.component.feedcomponent.FeedComponent} 62 """ 63 basecomponent.BaseComponentMedium.__init__(self, component) 64 65 self._feederFeedServer = {} # eaterAlias -> (fullFeedId, host, port) tuple 66 # for remote feeders 67 self._feederPendingConnections = {} # eaterAlias -> cancel thunk 68 self._eaterFeedServer = {} # fullFeedId -> (host, port) tuple 69 # for remote eaters 70 self._eaterPendingConnections = {} # feederName -> cancel thunk 71 self.logName = component.name
72 73 ### Referenceable remote methods which can be called from manager
74 - def remote_attachPadMonitorToFeeder(self, feederName):
75 self.comp.attachPadMonitorToFeeder(feederName)
76
77 - def remote_setGstDebug(self, debug):
78 """ 79 Sets the GStreamer debugging levels based on the passed debug string. 80 81 @since: 0.4.2 82 """ 83 self.debug('Setting GStreamer debug level to %s' % debug) 84 if not debug: 85 return 86 87 for part in debug.split(','): 88 glob = None 89 value = None 90 pair = part.split(':') 91 if len(pair) == 1: 92 # assume only the value 93 value = int(pair[0]) 94 elif len(pair) == 2: 95 glob, value = pair 96 value = int(value) 97 else: 98 self.warning("Cannot parse GStreamer debug setting '%s'." % 99 part) 100 continue 101 102 if glob: 103 try: 104 # value has to be an integer 105 gst.debug_set_threshold_for_name(glob, value) 106 except TypeError: 107 self.warning("Cannot set glob %s to value %s" % ( 108 glob, value)) 109 else: 110 gst.debug_set_default_threshold(value)
111
112 - def remote_eatFrom(self, eaterAlias, fullFeedId, host, port):
113 """ 114 Tell the component the host and port for the FeedServer through which 115 it can connect a local eater to a remote feeder to eat the given 116 fullFeedId. 117 118 Called on by the manager-side ComponentAvatar. 119 """ 120 self._feederFeedServer[eaterAlias] = (fullFeedId, host, port) 121 return self.connectEater(eaterAlias)
122
123 - def _getAuthenticatorForFeed(self, eaterAliasOrFeedName):
124 # The avatarId on the keycards issued by the authenticator will 125 # identify us to the remote component. Attempt to use our 126 # fullFeedId, for debugging porpoises. 127 if hasattr(self.authenticator, 'copy'): 128 tup = common.parseComponentId(self.authenticator.avatarId) 129 flowName, componentName = tup 130 fullFeedId = common.fullFeedId(flowName, componentName, 131 eaterAliasOrFeedName) 132 return self.authenticator.copy(fullFeedId) 133 else: 134 return self.authenticator
135
136 - def connectEater(self, eaterAlias):
137 """ 138 Connect one of the medium's component's eaters to a remote feed. 139 Called by the component, both on initial connection and for 140 reconnecting. 141 142 @returns: (deferred, cancel) pair, where cancel is a thunk that 143 you can call to cancel any pending connection attempt. 144 """ 145 def gotFeed((feedId, fd)): 146 self._feederPendingConnections.pop(eaterAlias, None) 147 self.comp.eatFromFD(eaterAlias, feedId, fd)
148 149 if eaterAlias not in self._feederFeedServer: 150 self.debug("eatFrom() hasn't been called yet for eater %s", 151 eaterAlias) 152 # unclear if this function should have a return value at 153 # all... 154 return defer.succeed(None) 155 156 (fullFeedId, host, port) = self._feederFeedServer[eaterAlias] 157 158 cancel = self._feederPendingConnections.pop(eaterAlias, None) 159 if cancel: 160 self.debug('cancelling previous connection attempt on %s', 161 eaterAlias) 162 cancel() 163 164 client = feed.FeedMedium(logName=self.comp.name) 165 166 d = client.requestFeed(host, port, 167 self._getAuthenticatorForFeed(eaterAlias), 168 fullFeedId) 169 self._feederPendingConnections[eaterAlias] = client.stopConnecting 170 d.addCallback(gotFeed) 171 return d
172
173 - def remote_feedTo(self, feederName, fullFeedId, host, port):
174 """ 175 Tell the component to feed the given feed to the receiving component 176 accessible through the FeedServer on the given host and port. 177 178 Called on by the manager-side ComponentAvatar. 179 """ 180 self._eaterFeedServer[fullFeedId] = (host, port) 181 self.connectFeeder(feederName, fullFeedId)
182
183 - def connectFeeder(self, feederName, fullFeedId):
184 """ 185 Tell the component to feed the given feed to the receiving component 186 accessible through the FeedServer on the given host and port. 187 188 Called on by the manager-side ComponentAvatar. 189 """ 190 def gotFeed((fullFeedId, fd)): 191 self._eaterPendingConnections.pop(feederName, None) 192 self.comp.feedToFD(feederName, fd, os.close, fullFeedId)
193 194 if fullFeedId not in self._eaterFeedServer: 195 self.debug("feedTo() hasn't been called yet for feeder %s", 196 feederName) 197 # unclear if this function should have a return value at 198 # all... 199 return defer.succeed(None) 200 201 host, port = self._eaterFeedServer[fullFeedId] 202 203 # probably should key on feederName as well 204 cancel = self._eaterPendingConnections.pop(fullFeedId, None) 205 if cancel: 206 self.debug('cancelling previous connection attempt on %s', 207 feederName) 208 cancel() 209 210 client = feed.FeedMedium(logName=self.comp.name) 211 212 d = client.sendFeed(host, port, 213 self._getAuthenticatorForFeed(feederName), 214 fullFeedId) 215 self._eaterPendingConnections[feederName] = client.stopConnecting 216 d.addCallback(gotFeed) 217 return d 218
219 - def remote_provideMasterClock(self, port):
220 """ 221 Tells the component to start providing a master clock on the given 222 UDP port. 223 Can only be called if setup() has been called on the component. 224 225 The IP address returned is the local IP the clock is listening on. 226 227 @returns: (ip, port, base_time) 228 @rtype: tuple of (str, int, long) 229 """ 230 self.debug('remote_provideMasterClock(port=%r)' % port) 231 return self.comp.provide_master_clock(port)
232
233 - def remote_getMasterClockInfo(self):
234 """ 235 Return the clock master info created by a previous call to provideMasterClock. 236 237 @returns: (ip, port, base_time) 238 @rtype: tuple of (str, int, long) 239 """ 240 return self.comp.get_master_clock()
241
242 - def remote_setMasterClock(self, ip, port, base_time):
243 return self.comp.set_master_clock(ip, port, base_time)
244
245 - def remote_effect(self, effectName, methodName, *args, **kwargs):
246 """ 247 Invoke the given methodName on the given effectName in this component. 248 The effect should implement effect_(methodName) to receive the call. 249 """ 250 self.debug("calling %s on effect %s" % (methodName, effectName)) 251 if not effectName in self.comp.effects: 252 raise errors.UnknownEffectError(effectName) 253 effect = self.comp.effects[effectName] 254 if not hasattr(effect, "effect_%s" % methodName): 255 raise errors.NoMethodError("%s on effect %s" % (methodName, 256 effectName)) 257 method = getattr(effect, "effect_%s" % methodName) 258 try: 259 result = method(*args, **kwargs) 260 except TypeError: 261 msg = "effect method %s did not accept %s and %s" % ( 262 methodName, args, kwargs) 263 self.debug(msg) 264 raise errors.RemoteRunError(msg) 265 self.debug("effect: result: %r" % result) 266 return result
267 268 from feedcomponent010 import FeedComponent 269 270 FeedComponent.componentMediumClass = FeedComponentMedium 271
272 -class ParseLaunchComponent(FeedComponent):
273 """A component using gst-launch syntax 274 275 @cvar checkTimestamp: whether to check continuity of timestamps for eaters 276 @cvar checkOffset: whether to check continuity of offsets for 277 eaters 278 """ 279 280 DELIMITER = '@' 281 282 # can be set by subclasses 283 checkTimestamp = False 284 checkOffset = False 285 286 # keep these as class variables for the tests 287 FDSRC_TMPL = 'fdsrc name=%(name)s' 288 DEPAY_TMPL = 'gdpdepay name=%(name)s-depay' 289 FEEDER_TMPL = 'gdppay name=%(name)s-pay ! multifdsink sync=false '\ 290 'name=%(name)s buffers-max=500 buffers-soft-max=450 '\ 291 'recover-policy=1' 292 EATER_TMPL = None 293
294 - def init(self):
295 if not gstreamer.get_plugin_version('coreelements'): 296 raise errors.MissingElementError('identity') 297 if not gstreamer.element_factory_has_property('identity', 298 'check-imperfect-timestamp'): 299 self.checkTimestamp = False 300 self.checkOffset = False 301 self.addMessage( 302 messages.Info(T_(N_( 303 "You will get more debugging information " 304 "if you upgrade to GStreamer 0.10.13 or later.")))) 305 306 self.EATER_TMPL = self.FDSRC_TMPL + ' %(queue)s ' + self.DEPAY_TMPL 307 if self.checkTimestamp or self.checkOffset: 308 self.EATER_TMPL += " ! identity name=%(name)s-identity silent=TRUE" 309 if self.checkTimestamp: 310 self.EATER_TMPL += " check-imperfect-timestamp=1" 311 if self.checkOffset: 312 self.EATER_TMPL += " check-imperfect-offset=1"
313 314 ### FeedComponent interface implementations
315 - def create_pipeline(self):
316 try: 317 unparsed = self.get_pipeline_string(self.config['properties']) 318 except errors.MissingElementError, e: 319 self.warning('Missing %s element' % e.args[0]) 320 m = messages.Error(T_(N_( 321 "The worker does not have the '%s' element installed.\n" 322 "Please install the necessary plug-in and restart " 323 "the component.\n"), e.args[0])) 324 self.addMessage(m) 325 raise errors.ComponentSetupHandledError(e) 326 327 self.pipeline_string = self.parse_pipeline(unparsed) 328 329 try: 330 pipeline = gst.parse_launch(self.pipeline_string) 331 except gobject.GError, e: 332 self.warning('Could not parse pipeline: %s' % e.message) 333 m = messages.Error(T_(N_( 334 "GStreamer error: could not parse component pipeline.")), 335 debug=e.message) 336 self.addMessage(m) 337 raise errors.PipelineParseError(e.message) 338 339 return pipeline
340
341 - def set_pipeline(self, pipeline):
342 FeedComponent.set_pipeline(self, pipeline) 343 if self.checkTimestamp or self.checkOffset: 344 watchElements = dict([(e.elementName + '-identity' , e) 345 for e in self.eaters.values()]) 346 self.install_eater_continuity_watch(watchElements) 347 self.configure_pipeline(self.pipeline, self.config['properties'])
348 349 ### ParseLaunchComponent interface for subclasses
350 - def get_pipeline_string(self, properties):
351 """ 352 Method that must be implemented by subclasses to produce the 353 gstparse string for the component's pipeline. Subclasses should 354 not chain up; this method raises a NotImplemented error. 355 356 Returns: a new pipeline string representation. 357 """ 358 raise NotImplementedError('subclasses should implement ' 359 'get_pipeline_string')
360
361 - def configure_pipeline(self, pipeline, properties):
362 """ 363 Method that can be implemented by subclasses if they wish to 364 interact with the pipeline after it has been created and set 365 on the component. 366 367 This could include attaching signals and bus handlers. 368 """ 369 pass
370 371 ### private methods
372 - def add_default_eater_feeder(self, pipeline):
373 if len(self.eaters) == 1: 374 eater = 'eater:' + self.eaters.keys()[0] 375 if eater not in pipeline: 376 pipeline = '@' + eater + '@ ! ' + pipeline 377 if len(self.feeders) == 1: 378 feeder = 'feeder:' + self.feeders.keys()[0] 379 if feeder not in pipeline: 380 pipeline = pipeline + ' ! @' + feeder + '@' 381 return pipeline
382
383 - def parse_tmpl(self, pipeline, templatizers):
384 """ 385 Expand the given pipeline string representation by substituting 386 blocks between '@' with a filled-in template. 387 388 @param pipeline: a pipeline string representation with variables 389 @param templatizers: A dict of prefix => procedure. Template 390 blocks in the pipeline will be replaced 391 with the result of calling the procedure 392 with what is left of the template after 393 taking off the prefix. 394 @returns: a new pipeline string representation. 395 """ 396 assert pipeline != '' 397 398 # verify the template has an even number of delimiters 399 if pipeline.count(self.DELIMITER) % 2 != 0: 400 raise TypeError("'%s' contains an odd number of '%s'" 401 % (pipeline, self.DELIMITER)) 402 403 out = [] 404 for i, block in enumerate(pipeline.split(self.DELIMITER)): 405 # when splitting, the even-indexed members will remain, and 406 # the odd-indexed members are the blocks to be substituted 407 if i % 2 == 0: 408 out.append(block) 409 else: 410 block = block.strip() 411 try: 412 pos = block.index(':') 413 except ValueError: 414 raise TypeError("Template %r has no colon" % (block,)) 415 prefix = block[:pos+1] 416 if prefix not in templatizers: 417 raise TypeError("Template %r has invalid prefix %r" 418 % (block, prefix)) 419 out.append(templatizers[prefix](block[pos+1:])) 420 return ''.join(out)
421
422 - def parse_pipeline(self, pipeline):
423 pipeline = " ".join(pipeline.split()) 424 self.debug('Creating pipeline, template is %s', pipeline) 425 426 if pipeline == '' and not self.eaters: 427 raise TypeError, "Need a pipeline or a eater" 428 429 if pipeline == '': 430 # code of dubious value 431 assert self.eaters 432 pipeline = 'fakesink signal-handoffs=1 silent=1 name=sink' 433 434 pipeline = self.add_default_eater_feeder(pipeline) 435 pipeline = self.parse_tmpl(pipeline, 436 {'eater:': self.get_eater_template, 437 'feeder:': self.get_feeder_template}) 438 439 self.debug('pipeline is %s', pipeline) 440 assert self.DELIMITER not in pipeline 441 442 return pipeline
443
444 - def get_eater_template(self, eaterAlias):
445 queue = self.get_queue_string(eaterAlias) 446 elementName = self.eaters[eaterAlias].elementName 447 448 return self.EATER_TMPL % {'name': elementName, 'queue': queue}
449
450 - def get_feeder_template(self, feederName):
451 elementName = self.feeders[feederName].elementName 452 return self.FEEDER_TMPL % {'name': elementName}
453
454 - def get_queue_string(self, eaterAlias):
455 """ 456 Return a parse-launch string to join the fdsrc eater element and 457 the depayer, for example '!' or '! queue !'. The string may have 458 no format strings. 459 """ 460 return '!'
461
462 -class Effect(log.Loggable):
463 """ 464 I am a part of a feed component for a specific group 465 of functionality. 466 467 @ivar name: name of the effect 468 @type name: string 469 @ivar component: component owning the effect 470 @type component: L{FeedComponent} 471 """ 472 logCategory = "effect" 473
474 - def __init__(self, name):
475 """ 476 @param name: the name of the effect 477 """ 478 self.name = name 479 self.setComponent(None)
480
481 - def setComponent(self, component):
482 """ 483 Set the given component as the effect's owner. 484 485 @param component: the component to set as an owner of this effect 486 @type component: L{FeedComponent} 487 """ 488 self.component = component 489 self.setUIState(component and component.uiState or None)
490
491 - def setUIState(self, state):
492 """ 493 Set the given UI state on the effect. This method is ideal for 494 adding keys to the UI state. 495 496 @param state: the UI state for the component to use. 497 @type state: L{flumotion.common.componentui.WorkerComponentUIState} 498 """ 499 self.uiState = state
500
501 - def getComponent(self):
502 """ 503 Get the component owning this effect. 504 505 @rtype: L{FeedComponent} 506 """ 507 return self.component
508
509 -class MultiInputParseLaunchComponent(ParseLaunchComponent):
510 """ 511 This class provides for multi-input ParseLaunchComponents, such as muxers, 512 with a queue attached to each input. 513 """ 514 QUEUE_SIZE_BUFFERS = 16 515
516 - def get_muxer_string(self, properties):
517 """ 518 Return a gst-parse description of the muxer, which must be named 'muxer' 519 """ 520 raise errors.NotImplementedError("Implement in a subclass")
521
522 - def get_queue_string(self, eaterAlias):
523 name = self.eaters[eaterAlias].elementName 524 return ("! queue name=%s-queue max-size-buffers=%d !" 525 % (name, self.QUEUE_SIZE_BUFFERS))
526
527 - def get_pipeline_string(self, properties):
528 eaters = self.config.get('eater', {}) 529 sources = self.config.get('source', []) 530 if eaters == {} and sources != []: 531 # for upgrade without manager restart 532 feeds = [] 533 for feed in sources: 534 if not ':' in feed: 535 feed = '%s:default' % feed 536 feeds.append(feed) 537 eaters = { 'default': [(x, 'default') for x in feeds] } 538 539 pipeline = '' 540 for e in eaters: 541 for feed, alias in eaters[e]: 542 pipeline += '@ eater:%s @ ! muxer. ' % alias 543 544 pipeline += self.get_muxer_string(properties) + ' ' 545 546 return pipeline
547
548 - def unblock_eater(self, eaterAlias):
549 # Firstly, ensure that any push in progress is guaranteed to return, 550 # by temporarily enlarging the queue 551 queuename = self.eaters[eaterAlias].elementName + '-queue' 552 queue = self.pipeline.get_by_name(queuename) 553 554 size = queue.get_property("max-size-buffers") 555 queue.set_property("max-size-buffers", size + 1) 556 557 # So, now it's guaranteed to return. However, we want to return the 558 # queue size to its original value. Doing this in a thread-safe manner 559 # is rather tricky... 560 def _block_cb(pad, blocked): 561 # This is called from streaming threads, but we don't do anything 562 # here so it's safe. 563 pass
564 def _underrun_cb(element): 565 # Called from a streaming thread. The queue element does not hold 566 # the queue lock when this is called, so we block our sinkpad, 567 # then re-check the current level. 568 pad = element.get_pad("sink") 569 pad.set_blocked_async(True, _block_cb) 570 level = element.get_property("current-level-buffers") 571 if level < self.QUEUE_SIZE_BUFFERS: 572 element.set_property('max-size-buffers', 573 self.QUEUE_SIZE_BUFFERS) 574 element.disconnect(signalid) 575 pad.set_blocked_async(False, _block_cb)
576 577 signalid = queue.connect("underrun", _underrun_cb) 578