1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 """
23 Flumotion-launch: A gst-launch analog for Flumotion.
24
25 The goal of flumotion-launch is to provide an easy way for testing
26 flumotion components, without involving much of Flumotion's core code.
27
28 Flumotion-launch takes a terse gst-launch-like syntax, translates that
29 into a component graph, and starts the components. An example would be::
30
31 flumotion-launch videotest ! theora-encoder ! ogg-muxer ! http-streamer
32
33 You can also set properties::
34
35 flumotion-launch videotest framerate=15/2
36
37 You can link specific feeders as well::
38
39 flumotion-launch firewire .audio ! vorbis-encoder
40 flumotion-launch firewire firewire0.audio ! vorbis-encoder
41
42 Components can be backreferenced using their names::
43
44 flumotion-launch videotest audiotest videotest0. ! ogg-muxer \
45 audiotest0. ! ogg-muxer0.
46
47 In addition, components can have plugs::
48
49 flumotion-launch http-streamer /apachelogger,logfile=/dev/stdout
50
51 Flumotion-launch explicitly avoids much of Flumotion's core logic. It
52 does not import flumotion.manager, flumotion.admin, or flumotion.worker.
53 There is no depgraph, no feed server, no job process. Although it might
54 be useful in the future to add a way to use the standard interfaces to
55 start components via admin, manager, worker, and job instances, this
56 low-level interface is useful in debugging problems and should be kept.
57 """
58
59
60 import os
61 import sys
62
63 from twisted.python import reflect
64 from twisted.internet import reactor, defer
65
66 from flumotion.common import log, common, registry, errors, messages
67 from flumotion.common import i18n
68 from flumotion.common.options import OptionParser
69 from flumotion.configure import configure
70 from flumotion.twisted import flavors
71
72 from flumotion.launch import parse
73
74 from gettext import gettext as _
75
76 __version__ = "$Rev: 6907 $"
77 _headings = {
78 messages.ERROR: _('Error'),
79 messages.WARNING: _('Warning'),
80 messages.INFO: _('Note')
81 }
82
83
85 sys.stderr.write(x + '\n')
86 raise SystemExit(1)
87
88
129
134
137
140
142 self.debug('feedToFD(feedName=%s, %d)' % (feedName, fd))
143 return self.component.feedToFD(feedName, fd, os.close)
144
145 - def eatFromFD(self, eaterAlias, feedId, fd):
149
151 fds = {}
152 wrappersByName = dict([(wrapper.name, wrapper)
153 for wrapper in wrappers])
154 def starter(wrapper, feedName, write):
155 return lambda: wrapper.feedToFD(feedName, write)
156 for wrapper in wrappers:
157 eaters = wrapper.config.get('eater', {})
158 for eaterName in eaters:
159 for feedId, eaterAlias in eaters[eaterName]:
160 compName, feederName = common.parseFeedId(feedId)
161 read, write = os.pipe()
162 log.debug('launch', '%s: read from fd %d, write to fd %d',
163 feedId, read, write)
164 start = starter(wrappersByName[compName], feederName, write)
165 fds[feedId] = (read, start)
166 return fds
167
169
170
171 def provide_clock():
172
173 need_sync = [x for x in wrappers if x.config['clock-master']]
174
175 if need_sync:
176 master = None
177 for x in need_sync:
178 if x.config['clock-master'] == x.config['avatarId']:
179 master = x
180 break
181 assert master
182 need_sync.remove(master)
183 d = master.provideMasterClock(7600 - 1)
184 def addNeedSync(clocking):
185 return need_sync, clocking
186 d.addCallback(addNeedSync)
187 return d
188 else:
189 return defer.succeed((None, None))
190
191 def do_start(synchronization, wrapper):
192 need_sync, clocking = synchronization
193
194
195 eaters = wrapper.config.get('eater', {})
196 for eaterName in eaters:
197 for feedId, eaterAlias in eaters[eaterName]:
198 read, start = fds[feedId]
199 wrapper.eatFromFD(eaterAlias, feedId, read)
200 start()
201 if (not need_sync) or (wrapper not in need_sync) or (not clocking):
202 clocking = None
203 if clocking:
204 wrapper.set_master_clock(*clocking)
205 return synchronization
206
207 def do_stop(failure):
208 for wrapper in wrappers:
209 wrapper.stop()
210 return failure
211
212 for wrapper in wrappers:
213 if not wrapper.instantiate():
214
215
216 return defer.fail(errors.ComponentStartError(wrapper))
217 d = provide_clock()
218 for wrapper in wrappers:
219 d.addCallback(do_start, wrapper)
220 d.addErrback(do_stop)
221 return d
222
224 from flumotion.common import setup
225 setup.setupPackagePath()
226 from flumotion.configure import configure
227 log.debug('launch', 'Running Flumotion version %s' %
228 configure.version)
229 import twisted.copyright
230 log.debug('launch', 'Running against Twisted version %s' %
231 twisted.copyright.version)
232 from flumotion.project import project
233 for p in project.list():
234 log.debug('launch', 'Registered project %s version %s' % (
235 p, project.get(p, 'version')))
236
237 parser = OptionParser(domain="flumotion-launch")
238
239 log.debug('launch', 'Parsing arguments (%r)' % ', '.join(args))
240 options, args = parser.parse_args(args)
241
242 i18n.installGettext()
243
244
245 if options.verbose:
246 log.setFluDebug("*:3")
247
248
249 if options.version:
250 print common.version("flumotion-launch")
251 return 0
252
253 if options.debug:
254 log.setFluDebug(options.debug)
255
256
257 configs = parse.parse_args(args[1:])
258
259
260 wrappers = [ComponentWrapper(config) for config in configs]
261
262
263 fds = make_pipes(wrappers)
264
265 reactor.running = False
266 reactor.failure = False
267 reactor.callLater(0, lambda: setattr(reactor, 'running', True))
268
269 d = start_components(wrappers, fds)
270
271 def errback(failure):
272 log.debug('launch', log.getFailureMessage(failure))
273 print "Error occurred: %s" % failure.getErrorMessage()
274 failure.printDetailedTraceback()
275 reactor.failure = True
276 if reactor.running:
277 print "Stopping reactor."
278 reactor.stop()
279 d.addErrback(errback)
280
281 if not reactor.failure:
282 print 'Running the reactor. Press Ctrl-C to exit.'
283
284 log.debug('launch', 'Starting reactor')
285 reactor.run()
286
287 log.debug('launch', 'Reactor stopped')
288
289 if reactor.failure:
290 return 1
291 else:
292 return 0
293