1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 """
23 Feed components, participating in the stream
24 """
25
26 import os
27
28 import gst
29 import gst.interfaces
30 import gobject
31
32 from twisted.internet import reactor, defer
33 from twisted.spread import pb
34 from zope.interface import implements
35
36 from flumotion.configure import configure
37 from flumotion.component import component as basecomponent
38 from flumotion.component import feed
39 from flumotion.common import common, interfaces, errors, log, pygobject, \
40 messages
41 from flumotion.common import gstreamer
42 from flumotion.common.i18n import N_, gettexter
43 from flumotion.common.planet import moods
44 from flumotion.common.pygobject import gsignal
45
46 __version__ = "$Rev: 6695 $"
47 T_ = gettexter()
48
49
51 """
52 I am a component-side medium for a FeedComponent to interface with
53 the manager-side ComponentAvatar.
54 """
55 implements(interfaces.IComponentMedium)
56 logCategory = 'feedcompmed'
57 remoteLogName = 'feedserver'
58
60 """
61 @param component: L{flumotion.component.feedcomponent.FeedComponent}
62 """
63 basecomponent.BaseComponentMedium.__init__(self, component)
64
65 self._feederFeedServer = {}
66
67 self._feederPendingConnections = {}
68 self._eaterFeedServer = {}
69
70 self._eaterPendingConnections = {}
71 self.logName = component.name
72
73
76
78 """
79 Sets the GStreamer debugging levels based on the passed debug string.
80
81 @since: 0.4.2
82 """
83 self.debug('Setting GStreamer debug level to %s' % debug)
84 if not debug:
85 return
86
87 for part in debug.split(','):
88 glob = None
89 value = None
90 pair = part.split(':')
91 if len(pair) == 1:
92
93 value = int(pair[0])
94 elif len(pair) == 2:
95 glob, value = pair
96 value = int(value)
97 else:
98 self.warning("Cannot parse GStreamer debug setting '%s'." %
99 part)
100 continue
101
102 if glob:
103 try:
104
105 gst.debug_set_threshold_for_name(glob, value)
106 except TypeError:
107 self.warning("Cannot set glob %s to value %s" % (
108 glob, value))
109 else:
110 gst.debug_set_default_threshold(value)
111
113 """
114 Tell the component the host and port for the FeedServer through which
115 it can connect a local eater to a remote feeder to eat the given
116 fullFeedId.
117
118 Called on by the manager-side ComponentAvatar.
119 """
120 self._feederFeedServer[eaterAlias] = (fullFeedId, host, port)
121 return self.connectEater(eaterAlias)
122
135
137 """
138 Connect one of the medium's component's eaters to a remote feed.
139 Called by the component, both on initial connection and for
140 reconnecting.
141
142 @returns: (deferred, cancel) pair, where cancel is a thunk that
143 you can call to cancel any pending connection attempt.
144 """
145 def gotFeed((feedId, fd)):
146 self._feederPendingConnections.pop(eaterAlias, None)
147 self.comp.eatFromFD(eaterAlias, feedId, fd)
148
149 if eaterAlias not in self._feederFeedServer:
150 self.debug("eatFrom() hasn't been called yet for eater %s",
151 eaterAlias)
152
153
154 return defer.succeed(None)
155
156 (fullFeedId, host, port) = self._feederFeedServer[eaterAlias]
157
158 cancel = self._feederPendingConnections.pop(eaterAlias, None)
159 if cancel:
160 self.debug('cancelling previous connection attempt on %s',
161 eaterAlias)
162 cancel()
163
164 client = feed.FeedMedium(logName=self.comp.name)
165
166 d = client.requestFeed(host, port,
167 self._getAuthenticatorForFeed(eaterAlias),
168 fullFeedId)
169 self._feederPendingConnections[eaterAlias] = client.stopConnecting
170 d.addCallback(gotFeed)
171 return d
172
174 """
175 Tell the component to feed the given feed to the receiving component
176 accessible through the FeedServer on the given host and port.
177
178 Called on by the manager-side ComponentAvatar.
179 """
180 self._eaterFeedServer[fullFeedId] = (host, port)
181 self.connectFeeder(feederName, fullFeedId)
182
184 """
185 Tell the component to feed the given feed to the receiving component
186 accessible through the FeedServer on the given host and port.
187
188 Called on by the manager-side ComponentAvatar.
189 """
190 def gotFeed((fullFeedId, fd)):
191 self._eaterPendingConnections.pop(feederName, None)
192 self.comp.feedToFD(feederName, fd, os.close, fullFeedId)
193
194 if fullFeedId not in self._eaterFeedServer:
195 self.debug("feedTo() hasn't been called yet for feeder %s",
196 feederName)
197
198
199 return defer.succeed(None)
200
201 host, port = self._eaterFeedServer[fullFeedId]
202
203
204 cancel = self._eaterPendingConnections.pop(fullFeedId, None)
205 if cancel:
206 self.debug('cancelling previous connection attempt on %s',
207 feederName)
208 cancel()
209
210 client = feed.FeedMedium(logName=self.comp.name)
211
212 d = client.sendFeed(host, port,
213 self._getAuthenticatorForFeed(feederName),
214 fullFeedId)
215 self._eaterPendingConnections[feederName] = client.stopConnecting
216 d.addCallback(gotFeed)
217 return d
218
220 """
221 Tells the component to start providing a master clock on the given
222 UDP port.
223 Can only be called if setup() has been called on the component.
224
225 The IP address returned is the local IP the clock is listening on.
226
227 @returns: (ip, port, base_time)
228 @rtype: tuple of (str, int, long)
229 """
230 self.debug('remote_provideMasterClock(port=%r)' % port)
231 return self.comp.provide_master_clock(port)
232
234 """
235 Return the clock master info created by a previous call to provideMasterClock.
236
237 @returns: (ip, port, base_time)
238 @rtype: tuple of (str, int, long)
239 """
240 return self.comp.get_master_clock()
241
244
245 - def remote_effect(self, effectName, methodName, *args, **kwargs):
246 """
247 Invoke the given methodName on the given effectName in this component.
248 The effect should implement effect_(methodName) to receive the call.
249 """
250 self.debug("calling %s on effect %s" % (methodName, effectName))
251 if not effectName in self.comp.effects:
252 raise errors.UnknownEffectError(effectName)
253 effect = self.comp.effects[effectName]
254 if not hasattr(effect, "effect_%s" % methodName):
255 raise errors.NoMethodError("%s on effect %s" % (methodName,
256 effectName))
257 method = getattr(effect, "effect_%s" % methodName)
258 try:
259 result = method(*args, **kwargs)
260 except TypeError:
261 msg = "effect method %s did not accept %s and %s" % (
262 methodName, args, kwargs)
263 self.debug(msg)
264 raise errors.RemoteRunError(msg)
265 self.debug("effect: result: %r" % result)
266 return result
267
268 from feedcomponent010 import FeedComponent
269
270 FeedComponent.componentMediumClass = FeedComponentMedium
271
273 """A component using gst-launch syntax
274
275 @cvar checkTimestamp: whether to check continuity of timestamps for eaters
276 @cvar checkOffset: whether to check continuity of offsets for
277 eaters
278 """
279
280 DELIMITER = '@'
281
282
283 checkTimestamp = False
284 checkOffset = False
285
286
287 FDSRC_TMPL = 'fdsrc name=%(name)s'
288 DEPAY_TMPL = 'gdpdepay name=%(name)s-depay'
289 FEEDER_TMPL = 'gdppay name=%(name)s-pay ! multifdsink sync=false '\
290 'name=%(name)s buffers-max=500 buffers-soft-max=450 '\
291 'recover-policy=1'
292 EATER_TMPL = None
293
313
314
340
348
349
351 """
352 Method that must be implemented by subclasses to produce the
353 gstparse string for the component's pipeline. Subclasses should
354 not chain up; this method raises a NotImplemented error.
355
356 Returns: a new pipeline string representation.
357 """
358 raise NotImplementedError('subclasses should implement '
359 'get_pipeline_string')
360
370
371
382
384 """
385 Expand the given pipeline string representation by substituting
386 blocks between '@' with a filled-in template.
387
388 @param pipeline: a pipeline string representation with variables
389 @param templatizers: A dict of prefix => procedure. Template
390 blocks in the pipeline will be replaced
391 with the result of calling the procedure
392 with what is left of the template after
393 taking off the prefix.
394 @returns: a new pipeline string representation.
395 """
396 assert pipeline != ''
397
398
399 if pipeline.count(self.DELIMITER) % 2 != 0:
400 raise TypeError("'%s' contains an odd number of '%s'"
401 % (pipeline, self.DELIMITER))
402
403 out = []
404 for i, block in enumerate(pipeline.split(self.DELIMITER)):
405
406
407 if i % 2 == 0:
408 out.append(block)
409 else:
410 block = block.strip()
411 try:
412 pos = block.index(':')
413 except ValueError:
414 raise TypeError("Template %r has no colon" % (block,))
415 prefix = block[:pos+1]
416 if prefix not in templatizers:
417 raise TypeError("Template %r has invalid prefix %r"
418 % (block, prefix))
419 out.append(templatizers[prefix](block[pos+1:]))
420 return ''.join(out)
421
443
445 queue = self.get_queue_string(eaterAlias)
446 elementName = self.eaters[eaterAlias].elementName
447
448 return self.EATER_TMPL % {'name': elementName, 'queue': queue}
449
451 elementName = self.feeders[feederName].elementName
452 return self.FEEDER_TMPL % {'name': elementName}
453
455 """
456 Return a parse-launch string to join the fdsrc eater element and
457 the depayer, for example '!' or '! queue !'. The string may have
458 no format strings.
459 """
460 return '!'
461
463 """
464 I am a part of a feed component for a specific group
465 of functionality.
466
467 @ivar name: name of the effect
468 @type name: string
469 @ivar component: component owning the effect
470 @type component: L{FeedComponent}
471 """
472 logCategory = "effect"
473
475 """
476 @param name: the name of the effect
477 """
478 self.name = name
479 self.setComponent(None)
480
482 """
483 Set the given component as the effect's owner.
484
485 @param component: the component to set as an owner of this effect
486 @type component: L{FeedComponent}
487 """
488 self.component = component
489 self.setUIState(component and component.uiState or None)
490
492 """
493 Set the given UI state on the effect. This method is ideal for
494 adding keys to the UI state.
495
496 @param state: the UI state for the component to use.
497 @type state: L{flumotion.common.componentui.WorkerComponentUIState}
498 """
499 self.uiState = state
500
502 """
503 Get the component owning this effect.
504
505 @rtype: L{FeedComponent}
506 """
507 return self.component
508
576
577 signalid = queue.connect("underrun", _underrun_cb)
578