Package flumotion :: Package component :: Module feedcomponent
[hide private]

Source Code for Module flumotion.component.feedcomponent

  1  # -*- Mode: Python -*- 
  2  # vi:si:et:sw=4:sts=4:ts=4 
  3  # 
  4  # Flumotion - a streaming media server 
  5  # Copyright (C) 2004,2005,2006,2007 Fluendo, S.L. (www.fluendo.com). 
  6  # All rights reserved. 
  7   
  8  # This file may be distributed and/or modified under the terms of 
  9  # the GNU General Public License version 2 as published by 
 10  # the Free Software Foundation. 
 11  # This file is distributed without any warranty; without even the implied 
 12  # warranty of merchantability or fitness for a particular purpose. 
 13  # See "LICENSE.GPL" in the source distribution for more information. 
 14   
 15  # Licensees having purchased or holding a valid Flumotion Advanced 
 16  # Streaming Server license may use this file in accordance with the 
 17  # Flumotion Advanced Streaming Server Commercial License Agreement. 
 18  # See "LICENSE.Flumotion" in the source distribution for more information. 
 19   
 20  # Headers in this file shall remain intact. 
 21   
 22  """ 
 23  Feed components, participating in the stream 
 24  """ 
 25   
 26  import gst 
 27  import gst.interfaces 
 28  import gobject 
 29   
 30  from twisted.internet import reactor, defer 
 31  from twisted.spread import pb 
 32   
 33  from flumotion.configure import configure 
 34  from flumotion.component import component as basecomponent 
 35  from flumotion.common import common, interfaces, errors, log, pygobject, messages 
 36  from flumotion.common import gstreamer 
 37   
 38  from flumotion.common.planet import moods 
 39  from flumotion.common.pygobject import gsignal 
 40  from flumotion.twisted.compat import implements 
 41   
 42  # FIXME: maybe move feed to component ? 
 43  from flumotion.worker import feed 
 44  from flumotion.common.messages import N_ 
 45  T_ = messages.gettexter('flumotion') 
 46   
47 -class FeedComponentMedium(basecomponent.BaseComponentMedium):
48 """ 49 I am a component-side medium for a FeedComponent to interface with 50 the manager-side ComponentAvatar. 51 """ 52 implements(interfaces.IComponentMedium) 53 logCategory = 'feedcompmed' 54 remoteLogName = 'feedserver' 55
56 - def __init__(self, component):
57 """ 58 @param component: L{flumotion.component.feedcomponent.FeedComponent} 59 """ 60 basecomponent.BaseComponentMedium.__init__(self, component) 61 62 self._feederFeedServer = {} # FeedId -> (fullFeedId, host, port) tuple 63 # for remote feeders 64 self._feederClientFactory = {} # fullFeedId -> client factory 65 self._eaterFeedServer = {} # fullFeedId -> (host, port) tuple 66 # for remote eaters 67 self._eaterClientFactory = {} # (componentId, feedId) -> client factory 68 self._eaterTransport = {} # (componentId, feedId) -> transport 69 self.logName = component.name 70 71 def on_feed_ready(component, feedName, isReady): 72 self.callRemote('feedReady', feedName, isReady)
73 74 def on_component_error(component, element_path, message): 75 self.callRemote('error', element_path, message)
76 77 self.comp.connect('feed-ready', on_feed_ready) 78 self.comp.connect('error', on_component_error) 79 80 # override base Errback for callRemote to stop the pipeline 81 #def callRemoteErrback(reason): 82 # self.warning('stopping pipeline because of %s' % reason) 83 # self.comp.pipeline_stop() 84 85 ### Referenceable remote methods which can be called from manager
86 - def remote_getElementProperty(self, elementName, property):
87 return self.comp.get_element_property(elementName, property)
88
89 - def remote_setElementProperty(self, elementName, property, value):
90 self.comp.set_element_property(elementName, property, value)
91
92 - def remote_setGstDebug(self, debug):
93 """ 94 Sets the GStreamer debugging levels based on the passed debug string. 95 """ 96 self.debug('Setting GStreamer debug level to %s' % debug) 97 if not debug: 98 return 99 100 for part in debug.split(','): 101 glob = None 102 value = None 103 pair = part.split(':') 104 if len(pair) == 1: 105 # assume only the value 106 value = int(pair[0]) 107 elif len(pair) == 2: 108 glob, value = pair 109 else: 110 self.warning("Cannot parse GStreamer debug setting '%s'." % 111 part) 112 continue 113 114 if glob: 115 gst.debug_set_threshold_for_name(glob, value) 116 else: 117 gst.debug_set_default_threshold(value)
118
119 - def remote_eatFrom(self, fullFeedId, host, port):
120 """ 121 Tell the component the host and port for the FeedServer through which 122 it can connect a local eater to a remote feeder to eat the given 123 fullFeedId. 124 125 Called on by the manager-side ComponentAvatar. 126 """ 127 # we key on the feedId because a component is part of only one flow, 128 # and doesn't even know the flow name it is part of. 129 flowName, componentName, feedName = common.parseFullFeedId(fullFeedId) 130 feedId = common.feedId(componentName, feedName) 131 self._feederFeedServer[feedId] = (fullFeedId, host, port) 132 # FIXME: drop connection if we already had one 133 return self.connectEater(feedId)
134
135 - def connectEater(self, feedId):
136 """ 137 Actually eat the given feed. 138 Used on initial connection, and for reconnecting. 139 """ 140 (fullFeedId, host, port) = self._feederFeedServer[feedId] 141 client = feed.FeedMedium(self.comp) 142 factory = feed.FeedClientFactory(client) 143 # FIXME: maybe copy keycard instead, so we can change requester ? 144 self.debug('connecting to FeedServer on %s:%d' % (host, port)) 145 reactor.connectTCP(host, port, factory) 146 d = factory.login(self.authenticator) 147 self._feederClientFactory[fullFeedId] = factory 148 def loginCb(remoteRef): 149 self.debug('logged in to feedserver, remoteRef %r' % remoteRef) 150 client.setRemoteReference(remoteRef) 151 # now call on the remoteRef to eat 152 self.debug( 153 'COMPONENT --> feedserver: sendFeed(%s)' % fullFeedId) 154 d = remoteRef.callRemote('sendFeed', fullFeedId) 155 156 def sendFeedCb(result): 157 self.debug('COMPONENT <-- feedserver: sendFeed(%s): %r' % ( 158 fullFeedId, result)) 159 # FIXME: why does this not return result ? 160 return None
161 162 d.addCallback(sendFeedCb) 163 return d 164 165 d.addCallback(loginCb) 166 return d 167
168 - def remote_feedTo(self, componentId, feedId, host, port):
169 """ 170 Tell the component to feed the given feed to the receiving component 171 accessible through the FeedServer on the given host and port. 172 173 Called on by the manager-side ComponentAvatar. 174 """ 175 # FIXME: check if this overwrites current config, and adapt if it 176 # does 177 self._eaterFeedServer[(componentId, feedId)] = (host, port) 178 client = feed.FeedMedium(self.comp) 179 factory = feed.FeedClientFactory(client) 180 # FIXME: maybe copy keycard instead, so we can change requester ? 181 self.debug('connecting to FeedServer on %s:%d' % (host, port)) 182 reactor.connectTCP(host, port, factory) 183 d = factory.login(self.authenticator) 184 self._eaterClientFactory[(componentId, feedId)] = factory 185 def loginCb(remoteRef): 186 self.debug('logged in to feedserver, remoteRef %r' % remoteRef) 187 client.setRemoteReference(remoteRef) 188 # now call on the remoteRef to eat 189 self.debug( 190 'COMPONENT --> feedserver: receiveFeed(%s, %s)' % ( 191 componentId, feedId)) 192 d = remoteRef.callRemote('receiveFeed', componentId, feedId) 193 194 def receiveFeedCb(result): 195 self.debug( 196 'COMPONENT <-- feedserver: receiveFeed(%s, %s): %r' % ( 197 componentId, feedId, result)) 198 componentName, feedName = common.parseFeedId(feedId) 199 t = remoteRef.broker.transport 200 t.stopReading() 201 t.stopWriting() 202 203 key = (componentId, feedId) 204 self._eaterTransport[key] = t 205 remoteRef.broker.transport = None 206 fd = t.fileno() 207 self.debug('Telling component to feed feedName %s to fd %d'% ( 208 feedName, fd)) 209 self.comp.feedToFD(feedName, fd)
210 211 d.addCallback(receiveFeedCb) 212 return d 213 214 d.addCallback(loginCb) 215 return d 216
217 - def remote_provideMasterClock(self, port):
218 """ 219 Tells the component to start providing a master clock on the given 220 UDP port. 221 Can only be called if setup() has been called on the component. 222 223 The IP address returned is the local IP the clock is listening on. 224 225 @returns: (ip, port, base_time) 226 @rtype: tuple of (str, int, long) 227 """ 228 self.debug('remote_provideMasterClock(port=%r)' % port) 229 return self.comp.provide_master_clock(port)
230
231 - def remote_effect(self, effectName, methodName, *args, **kwargs):
232 """ 233 Invoke the given methodName on the given effectName in this component. 234 The effect should implement effect_(methodName) to receive the call. 235 """ 236 self.debug("calling %s on effect %s" % (methodName, effectName)) 237 if not effectName in self.comp.effects: 238 raise errors.UnknownEffectError(effectName) 239 effect = self.comp.effects[effectName] 240 if not hasattr(effect, "effect_%s" % methodName): 241 raise errors.NoMethodError("%s on effect %s" % (methodName, 242 effectName)) 243 method = getattr(effect, "effect_%s" % methodName) 244 try: 245 result = method(*args, **kwargs) 246 except TypeError: 247 msg = "effect method %s did not accept %s and %s" % ( 248 methodName, args, kwargs) 249 self.debug(msg) 250 raise errors.RemoteRunError(msg) 251 self.debug("effect: result: %r" % result) 252 return result
253 254 from feedcomponent010 import FeedComponent 255 256 FeedComponent.componentMediumClass = FeedComponentMedium 257
258 -class ParseLaunchComponent(FeedComponent):
259 'A component using gst-launch syntax' 260 261 DELIMITER = '@' 262 263 ### FeedComponent interface implementations
264 - def create_pipeline(self):
265 try: 266 unparsed = self.get_pipeline_string(self.config['properties']) 267 except errors.MissingElementError, e: 268 m = messages.Error(T_(N_( 269 "The worker does not have the '%s' element installed.\n" 270 "Please install the necessary plug-in and restart " 271 "the component.\n"), e.args[0])) 272 self.state.append('messages', m) 273 raise errors.ComponentSetupHandledError(e) 274 275 self.pipeline_string = self.parse_pipeline(unparsed) 276 277 try: 278 pipeline = gst.parse_launch(self.pipeline_string) 279 280 # Connect to the client-fd-removed signals on each feeder, so we 281 # can clean up properly on removal. 282 feeder_element_names = map(lambda n: "feeder:" + n, 283 self.feeder_names) 284 for feeder in feeder_element_names: 285 element = pipeline.get_by_name(feeder) 286 element.connect('client-fd-removed', self.removeClientCallback) 287 self.debug("Connected %s to removeClientCallback", feeder) 288 289 return pipeline 290 except gobject.GError, e: 291 self.warning('Could not parse pipeline: %s' % e.message) 292 m = messages.Error(T_(N_( 293 "GStreamer error: could not parse component pipeline.")), 294 debug=e.message) 295 self.state.append('messages', m) 296 raise errors.PipelineParseError(e.message)
297
298 - def set_pipeline(self, pipeline):
299 FeedComponent.set_pipeline(self, pipeline) 300 self.configure_pipeline(self.pipeline, self.config['properties'])
301 302 ### ParseLaunchComponent interface for subclasses
303 - def get_pipeline_string(self, properties):
304 """ 305 Method that must be implemented by subclasses to produce the 306 gstparse string for the component's pipeline. Subclasses should 307 not chain up; this method raises a NotImplemented error. 308 309 Returns: a new pipeline string representation. 310 """ 311 raise NotImplementedError('subclasses should implement ' 312 'get_pipeline_string')
313
314 - def configure_pipeline(self, pipeline, properties):
315 """ 316 Method that can be implemented by subclasses if they wish to 317 interact with the pipeline after it has been created and set 318 on the component. 319 320 This could include attaching signals and bus handlers. 321 """ 322 pass
323 324 ### private methods
325 - def _expandElementName(self, block):
326 """ 327 Expand the given string to a full element name for an eater or feeder. 328 The full name is of the form eater:(sourceComponentName):(feedName) 329 or feeder:(componentName):feedName 330 """ 331 if ' ' in block: 332 raise TypeError, "spaces not allowed in '%s'" % block 333 if not ':' in block: 334 raise TypeError, "no colons in'%s'" % block 335 if block.count(':') > 2: 336 raise TypeError, "too many colons in '%s'" % block 337 338 parts = block.split(':') 339 340 if parts[0] != 'eater' and parts[0] != 'feeder': 341 raise TypeError, "'%s' does not start with eater or feeder" % block 342 343 # we can only fill in component names for feeders 344 if not parts[1]: 345 if parts[0] == 'eater': 346 raise TypeError, "'%s' should specify feeder component" % block 347 parts[1] = self.name 348 if len(parts) == 2: 349 parts.append('') 350 if not parts[2]: 351 parts[2] = 'default' 352 353 return ":".join(parts)
354
355 - def _expandElementNames(self, block):
356 """ 357 Expand each @..@ block to use the full element name for eater or feeder. 358 The full name is of the form eater:(sourceComponentName):(feedName) 359 or feeder:(componentName):feedName 360 This also does some basic checking of the block. 361 """ 362 assert block != '' 363 364 # verify the template has an even number of delimiters 365 if block.count(self.DELIMITER) % 2 != 0: 366 raise TypeError, "'%s' contains an odd number of '%s'" % (block, self.DELIMITER) 367 368 # when splitting, the even-indexed members will remain, 369 # and the odd-indexed members are the blocks to be substituted 370 blocks = block.split(self.DELIMITER) 371 372 for i in range(1, len(blocks) - 1, 2): 373 blocks[i] = self._expandElementName(blocks[i].strip()) 374 return "@".join(blocks)
375
376 - def parse_tmpl(self, pipeline, names, template_func, format):
377 """ 378 Expand the given pipeline string representation by substituting 379 blocks between '@' with a filled-in template. 380 381 @param pipeline: a pipeline string representation with variables 382 @param names: the element names to substitute for @...@ segments 383 @param template_func: function to call to get the template to use for 384 element factory info 385 @param format: the format to use when substituting 386 387 Returns: a new pipeline string representation. 388 """ 389 assert pipeline != '' 390 391 deli = self.DELIMITER 392 393 if len(names) == 1: 394 part_name = names[0] 395 template = template_func(part_name) 396 named = template % {'name': part_name} 397 if pipeline.find(part_name) != -1: 398 pipeline = pipeline.replace(deli + part_name + deli, named) 399 else: 400 pipeline = format % {'tmpl': named, 'pipeline': pipeline} 401 else: 402 for part in names: 403 part_name = deli + part + deli # mmm, deli sandwich 404 if pipeline.find(part_name) == -1: 405 raise TypeError, "%s needs to be specified in the pipeline '%s'" % (part_name, pipeline) 406 407 template = template_func(part) 408 pipeline = pipeline.replace(part_name, 409 template % {'name': part}) 410 return pipeline
411
412 - def parse_pipeline(self, pipeline):
413 pipeline = " ".join(pipeline.split()) 414 self.debug('Creating pipeline, template is %s' % pipeline) 415 416 eater_names = self.get_eater_names() 417 if pipeline == '' and not eater_names: 418 raise TypeError, "Need a pipeline or a eater" 419 420 if pipeline == '': 421 assert eater_names 422 pipeline = 'fakesink signal-handoffs=1 silent=1 name=sink' 423 424 # we expand the pipeline based on the templates and eater/feeder names 425 # elements are named eater:(source_component_name):(feed_name) 426 # or feeder:(component_name):(feed_name) 427 eater_element_names = map(lambda n: "eater:" + n, eater_names) 428 feeder_element_names = map(lambda n: "feeder:" + n, self.feeder_names) 429 self.debug('we eat with eater elements %s' % eater_element_names) 430 self.debug('we feed with feeder elements %s' % feeder_element_names) 431 pipeline = self._expandElementNames(pipeline) 432 433 pipeline = self.parse_tmpl(pipeline, eater_element_names, 434 self.get_eater_template, 435 '%(tmpl)s ! %(pipeline)s') 436 pipeline = self.parse_tmpl(pipeline, feeder_element_names, 437 self.get_feeder_template, 438 '%(pipeline)s ! %(tmpl)s') 439 pipeline = " ".join(pipeline.split()) 440 441 self.debug('pipeline for %s is %s' % (self.getName(), pipeline)) 442 assert self.DELIMITER not in pipeline 443 444 return pipeline
445
446 - def get_eater_template(self, eaterName):
447 queue = self.get_queue_string(eaterName) 448 check = "" 449 if self.checkTimestamp: 450 check += " check-imperfect-timestamp=1" 451 if self.checkOffset: 452 check += " check-imperfect-offset=1" 453 if check != "": 454 check = " ! identity name=%s-identity silent=TRUE %s" % ( 455 eaterName, check) 456 depay = self.DEPAY_TMPL + check 457 if not queue: 458 return self.FDSRC_TMPL + ' ! ' + depay 459 else: 460 return self.FDSRC_TMPL + ' ! ' + queue + ' ! ' + depay
461
462 - def get_feeder_template(self, eaterName):
463 return self.FEEDER_TMPL
464
465 - def get_queue_string(self, eaterName):
466 """ 467 Return a parse-launch description of a queue, if this component 468 wants an input queue on this eater, or None if not 469 """ 470 return None
471 472 ### BaseComponent interface implementation
473 - def do_start(self, clocking):
474 """ 475 Tell the component to start. 476 Whatever is using the component is responsible for making sure all 477 eaters have received their file descriptor to eat from. 478 479 @param clocking: tuple of (ip, port, base_time) of a master clock, 480 or None not to slave the clock 481 @type clocking: tuple(str, int, long) or None. 482 """ 483 self.debug('ParseLaunchComponent.start') 484 if clocking: 485 self.info('slaving to master clock on %s:%d with base time %d' % 486 clocking) 487 488 if clocking: 489 self.set_master_clock(*clocking) 490 491 self.link() 492 493 return defer.succeed(None)
494
495 -class Effect(log.Loggable):
496 """ 497 I am a part of a feed component for a specific group 498 of functionality. 499 500 @ivar name: name of the effect 501 @type name: string 502 @ivar component: component owning the effect 503 @type component: L{FeedComponent} 504 """ 505 logCategory = "effect" 506
507 - def __init__(self, name):
508 """ 509 @param name: the name of the effect 510 """ 511 self.name = name 512 self.setComponent(None)
513
514 - def setComponent(self, component):
515 """ 516 Set the given component as the effect's owner. 517 518 @param component: the component to set as an owner of this effect 519 @type component: L{FeedComponent} 520 """ 521 self.component = component 522 self.setUIState(component and component.uiState or None)
523
524 - def setUIState(self, state):
525 """ 526 Set the given UI state on the effect. This method is ideal for 527 adding keys to the UI state. 528 529 @param state: the UI state for the component to use. 530 @type state: L{flumotion.common.componentui.WorkerComponentUIState} 531 """ 532 self.uiState = state
533
534 - def getComponent(self):
535 """ 536 Get the component owning this effect. 537 538 @rtype: L{FeedComponent} 539 """ 540 return self.component
541
542 -class MultiInputParseLaunchComponent(ParseLaunchComponent):
543 """ 544 This class provides for multi-input ParseLaunchComponents, such as muxers, 545 with a queue attached to each input. 546 """ 547 QUEUE_SIZE_BUFFERS = 16 548
549 - def get_muxer_string(self, properties):
550 """ 551 Return a gst-parse description of the muxer, which must be named 'muxer' 552 """ 553 raise errors.NotImplementedError("Implement in a subclass")
554
555 - def get_queue_string(self, eaterName):
556 return "queue name=%s-queue max-size-buffers=%d" % (eaterName, 557 self.QUEUE_SIZE_BUFFERS)
558
559 - def get_pipeline_string(self, properties):
560 sources = self.config['source'] 561 562 pipeline = self.get_muxer_string(properties) + ' ' 563 for eater in sources: 564 tmpl = '@ eater:%s @ ! muxer. ' 565 pipeline += tmpl % eater 566 567 pipeline += 'muxer.' 568 569 return pipeline
570
571 - def unblock_eater(self, feedId):
572 # Firstly, ensure that any push in progress is guaranteed to return, 573 # by temporarily enlarging the queue 574 queuename = "eater:%s-queue" % feedId 575 queue = self.pipeline.get_by_name(queuename) 576 577 size = queue.get_property("max-size-buffers") 578 queue.set_property("max-size-buffers", size + 1) 579 580 # So, now it's guaranteed to return. However, we want to return the 581 # queue size to its original value. Doing this in a thread-safe manner 582 # is rather tricky... 583 def _block_cb(pad, blocked): 584 # This is called from streaming threads, but we don't do anything 585 # here so it's safe. 586 pass
587 def _underrun_cb(element): 588 # Called from a streaming thread. The queue element does not hold 589 # the queue lock when this is called, so we block our sinkpad, 590 # then re-check the current level. 591 pad = element.get_pad("sink") 592 pad.set_blocked_async(True, _block_cb) 593 level = element.get_property("current-level-buffers") 594 if level < self.QUEUE_SIZE_BUFFERS: 595 element.set_property('max-size-buffers', 596 self.QUEUE_SIZE_BUFFERS) 597 element.disconnect(signalid) 598 pad.set_blocked_async(False, _block_cb)
599 600 signalid = queue.connect("underrun", _underrun_cb) 601