Package flumotion :: Package launch :: Module main
[hide private]

Source Code for Module flumotion.launch.main

  1  # -*- Mode: Python -*- 
  2  # vi:si:et:sw=4:sts=4:ts=4 
  3  # 
  4  # Flumotion - a streaming media server 
  5  # Copyright (C) 2004,2005,2006,2007 Fluendo, S.L. (www.fluendo.com). 
  6  # All rights reserved. 
  7   
  8  # This file may be distributed and/or modified under the terms of 
  9  # the GNU General Public License version 2 as published by 
 10  # the Free Software Foundation. 
 11  # This file is distributed without any warranty; without even the implied 
 12  # warranty of merchantability or fitness for a particular purpose. 
 13  # See "LICENSE.GPL" in the source distribution for more information. 
 14   
 15  # Licensees having purchased or holding a valid Flumotion Advanced 
 16  # Streaming Server license may use this file in accordance with the 
 17  # Flumotion Advanced Streaming Server Commercial License Agreement. 
 18  # See "LICENSE.Flumotion" in the source distribution for more information. 
 19   
 20  # Headers in this file shall remain intact. 
 21   
 22   
 23  """ 
 24  Flumotion-launch: A gst-launch analog for Flumotion. 
 25   
 26  The goal of flumotion-launch is to provide an easy way for testing 
 27  flumotion components, without involving much of Flumotion's core code. 
 28   
 29  Flumotion-launch takes a terse gst-launch-like syntax, translates that 
 30  into a component graph, and starts the components. An example would be:: 
 31   
 32    flumotion-launch videotest ! theora-encoder ! ogg-muxer ! http-streamer 
 33   
 34  You can also set properties:: 
 35   
 36    flumotion-launch videotest framerate=15/2 
 37   
 38  You can link specific feeders as well:: 
 39   
 40    flumotion-launch firewire .audio ! vorbis-encoder 
 41    flumotion-launch firewire firewire0.audio ! vorbis-encoder 
 42   
 43  Components can be backreferenced using their names:: 
 44   
 45    flumotion-launch videotest audiotest videotest0. ! ogg-muxer \ 
 46                     audiotest0. ! ogg-muxer0. 
 47   
 48  In addition, components can have plugs:: 
 49   
 50    flumotion-launch http-streamer /apachelogger,logfile=/dev/stdout 
 51   
 52  Flumotion-launch explicitly avoids much of Flumotion's core logic. It 
 53  does not import flumotion.manager, flumotion.admin, or flumotion.worker. 
 54  There is no depgraph, no feed server, no job process. Although it might 
 55  be useful in the future to add a way to use the standard interfaces to 
 56  start components via admin, manager, worker, and job instances, this 
 57  low-level interface is useful in debugging problems and should be kept. 
 58  """ 
 59   
 60   
 61  import optparse 
 62  import os 
 63  import sys 
 64   
 65  from twisted.python import reflect 
 66  from twisted.internet import reactor, defer 
 67   
 68  from flumotion.common import log, common, registry, errors, messages 
 69  from flumotion.twisted import flavors 
 70   
 71  from flumotion.launch import parse 
 72   
 73  from gettext import gettext as _ 
 74   
 75  _headings = { 
 76      messages.ERROR:   _('Error'), 
 77      messages.WARNING: _('Warning'), 
 78      messages.INFO:    _('Note') 
 79  } 
 80   
 81   
def err(x):
    """Report a fatal error and terminate the program.

    Writes the message `x` followed by a newline to standard error, then
    raises SystemExit with exit status 1. This function never returns.
    """
    line = x + '\n'
    sys.stderr.write(line)
    sys.exit(1)
85 86
class ComponentWrapper(object):
    """Load one component from the registry and drive it in-process.

    The wrapper resolves the component's entry point from its registry
    entry at construction time, creates and sets up the component on
    instantiate(), and afterwards forwards clocking, start/stop and feed
    file-descriptor calls to the component object.

    Attributes:
      name:      the component name, taken from config['name'].
      config:    the component configuration dict this wrapper was built from.
      procedure: callable that creates the component (looked up in the
                 registry from config['type']).
      component: the component object; None until instantiate() is called.
    """

    def __init__(self, config):
        """Store `config` and resolve the component's creation procedure.

        Exits the program through err() if the entry point cannot be loaded.
        """
        self.name = config['name']
        self.config = config
        self.procedure = self._getProcedure(config['type'])
        self.component = None

    def _getProcedure(self, type):
        """Return the callable that creates a component of the given type.

        The registry entry of kind 'component' names a module and a
        function inside it; the module is imported and the function is
        returned. A missing entry or an import failure is fatal: err()
        prints a message and raises SystemExit.
        """
        r = registry.getRegistry()
        c = r.getComponent(type)
        try:
            entry = c.getEntryByType('component')
        except KeyError:
            err('Component %s has no component entry' % self.name)
        importname = entry.getModuleName(c.getBase())
        try:
            module = reflect.namedAny(importname)
        except Exception as e:
            # Importing arbitrary component code can fail in any way, so
            # everything is caught here and reported as a fatal error.
            err('Could not load module %s for component %s: %s'
                % (importname, self.name, e))
        return getattr(module, entry.getFunction())

    def instantiate(self):
        """Create the component and run its setup with this wrapper's config.

        Returns the deferred produced by the component's setup(). If setup
        fails with errors.ComponentSetupHandledError the process exits
        immediately with status 1; any other failure stays on the deferred.
        """
        # types.MethodType replaces new.instancemethod: the `new` module is
        # deprecated since Python 2.6 and does not exist in Python 3.
        import types

        self.component = self.procedure()

        # since we cannot listen to a StateCacheable, we monkeypatch the
        # append method so we can intercept messages
        def append(instance, key, value):
            if key == 'messages':
                translator = messages.Translator()
                print("%s: %s" % (_headings[value.level],
                                  translator.translate(value)))
                if value.debug:
                    print("Debug information: %s" % (value.debug,))
            # always fall through to the real implementation
            flavors.StateCacheable.append(instance, key, value)

        self.component.state.append = types.MethodType(
            append, self.component.state)
        d = self.component.setup(self.config)

        def handledEb(failure):
            # trap() re-raises anything that is not a handled setup error
            failure.trap(errors.ComponentSetupHandledError)
            os._exit(1)
        d.addErrback(handledEb)
        return d

    def provideMasterClock(self, port):
        """Ask the component to provide the master clock on `port`.

        Returns whatever the component's provide_master_clock() returns;
        start_components() treats it as a deferred.
        """
        d = self.component.provide_master_clock(port)
        return d

    def start(self, clocking):
        """Start the component, passing `clocking` through (may be None)."""
        return self.component.start(clocking)

    def stop(self):
        """Stop the component and return the component's result."""
        return self.component.stop()

    def feedToFD(self, feedName, fd):
        """Make the component send feed `feedName` to file descriptor `fd`.

        os.close is handed to the component as the third argument,
        presumably as the cleanup callable for `fd` -- confirm against the
        component API.
        """
        return self.component.feedToFD(feedName, fd, os.close)

    def eatFromFD(self, feedId, fd):
        """Make the component read the feed `feedId` from descriptor `fd`."""
        return self.component.eatFromFD(feedId, fd)
147
def make_pipes(wrappers):
    """Create one OS pipe for every source link declared by the wrappers.

    Each entry of a wrapper's config['source'] list has the form
    'componentName:feedName'. For every such entry a pipe is opened.

    Returns a dict mapping the source string to a (read_fd, start) pair,
    where calling start() tells the feeding wrapper to send the named feed
    to the write end of the pipe.
    """
    by_name = {}
    for known in wrappers:
        by_name[known.name] = known

    def bind_feeder(feeder, feed, write_fd):
        # Freeze the arguments now; the feed itself only starts when the
        # returned callable is invoked.
        def begin():
            return feeder.feedToFD(feed, write_fd)
        return begin

    pipes = {}  # feedcompname:feeder => (fd, start())
    for consumer in wrappers:
        for link in consumer.config.get('source', []):
            producer_name, feed = link.split(':')
            read_fd, write_fd = os.pipe()
            pipes[link] = (read_fd,
                           bind_feeder(by_name[producer_name], feed, write_fd))
    return pipes
def DeferredDelay(time, val):
    """Return a deferred that is called back with `val` after a delay.

    The callback is scheduled on the reactor with callLater(), using
    `time` as the delay.
    """
    delayed = defer.Deferred()
    fire = delayed.callback
    reactor.callLater(time, fire, val)
    return delayed
166
def start_components(wrappers, fds, delay):
    """Instantiate, clock and start all components, in order.

    Arguments:
      wrappers: list of ComponentWrapper objects, in start order.
      fds:      dict as returned by make_pipes(): source string mapped to
                a (read_fd, start) pair.
      delay:    number of seconds to wait before starting each component;
                a false value disables the wait.

    Returns a deferred. If any step fails, stop() is called on every
    wrapper and the failure is passed on to the caller's errbacks.
    """
    # figure out the links and start the components

    # first phase: instantiation and setup
    def got_results(results):
        # DeferredList fires with one (success, value) pair per wrapper
        success = True
        for result, wrapper in zip(results, wrappers):
            if not result[0]:
                # report the component by name; formatting the wrapper
                # object itself only prints a default object repr
                print("Component %s failed to start, reason: %r"
                      % (wrapper.name, result[1]))
                success = False
        if not success:
            raise errors.ComponentStartError()

    def choose_clocking(unused):
        # second phase: clocking
        # Order the wrappers that have a clock-master setting by that
        # value. Sorting on the key alone never compares wrapper objects
        # and keeps equal keys in their original order.
        need_sync = [x for x in wrappers
                     if x.config['clock-master'] is not None]
        need_sync.sort(key=lambda x: x.config['clock-master'])

        if need_sync:
            def addNeedSync(clocking):
                return need_sync, clocking
            # the first one becomes the master and leaves the list, so it
            # is later started without clocking data
            master = need_sync.pop(0)
            print("Telling %s to provide the master clock."
                  % (master.name,))
            d = master.provideMasterClock(7600 - 1)  # hack!
            d.addCallback(addNeedSync)
            return d
        else:
            return None, None

    def add_delay(val):
        # passes `val` through unchanged, optionally after `delay` seconds
        if delay:
            print('Delaying component startup by %f seconds...' % delay)
            return DeferredDelay(delay, val)
        else:
            return defer.succeed(val)

    def do_start(synchronization, wrapper):
        need_sync, clocking = synchronization

        # start it up, with clocking data only if it needs it
        for source in wrapper.config.get('source', []):
            read, start = fds[source]
            wrapper.eatFromFD(source, read)
            start()
        if (not need_sync) or (wrapper not in need_sync) or (not clocking):
            clocking = None
        d = wrapper.start(clocking)
        # hand the untouched synchronization pair on to the next component
        d.addCallback(lambda val: synchronization)
        return d

    def do_stop(failure):
        for wrapper in wrappers:
            wrapper.stop()
        return failure

    d = defer.DeferredList([wrapper.instantiate() for wrapper in wrappers])
    d.addCallback(got_results)
    d.addCallback(choose_clocking)
    for wrapper in wrappers:
        d.addCallback(add_delay)
        d.addCallback(do_start, wrapper)
    d.addErrback(do_stop)
    return d
def main(args):
    """Entry point of flumotion-launch.

    Parses the command line in `args` (args[0] is the program name),
    builds a ComponentWrapper for every component described by the
    remaining arguments, connects them with pipes, starts them and runs
    the reactor until it is stopped.

    Returns the process exit status: 0 on success or after --version,
    1 if starting the components failed.
    """
    from flumotion.common import setup
    setup.setupPackagePath()
    from flumotion.configure import configure
    log.debug('launch', 'Running Flumotion version %s' %
              configure.version)
    import twisted.copyright
    log.debug('launch', 'Running against Twisted version %s' %
              twisted.copyright.version)
    from flumotion.project import project
    for p in project.list():
        log.debug('launch', 'Registered project %s version %s' % (
            p, project.get(p, 'version')))

    parser = optparse.OptionParser()
    parser.add_option('-d', '--debug',
                      action="store", type="string", dest="debug",
                      help="set debug levels")
    parser.add_option('', '--delay',
                      action="store", type="float", dest="delay",
                      help="seconds to wait before starting each component")
    parser.add_option('-v', '--verbose',
                      action="store_true", dest="verbose",
                      help="be verbose")
    parser.add_option('', '--version',
                      action="store_true", dest="version",
                      default=False,
                      help="show version information")

    log.debug('launch', 'Parsing arguments (%r)' % ', '.join(args))
    options, args = parser.parse_args(args)

    # verbose overrides --debug
    if options.verbose:
        options.debug = "*:3"

    # handle all options
    if options.version:
        print(common.version("flumotion-launch"))
        return 0

    if options.debug:
        log.setFluDebug(options.debug)

    if options.delay:
        delay = options.delay
    else:
        delay = 0.

    # note parser versus parse
    configs = parse.parse_args(args[1:])

    # load the modules, make the component
    wrappers = [ComponentWrapper(config) for config in configs]

    # make socket pairs
    fds = make_pipes(wrappers)

    # Bookkeeping flags stored on the reactor object: `running` flips to
    # True once the reactor processes its first scheduled call, `failure`
    # records that component startup failed.
    reactor.running = False
    reactor.failure = False
    reactor.callLater(0, lambda: setattr(reactor, 'running', True))

    d = start_components(wrappers, fds, delay)

    def errback(failure):
        log.debug('launch', log.getFailureMessage(failure))
        print("Error occurred: %s" % failure.getErrorMessage())
        failure.printDetailedTraceback()
        reactor.failure = True
        if reactor.running:
            print("Stopping reactor.")
            reactor.stop()
    d.addErrback(errback)

    # If startup already failed before the reactor was started, the
    # errback above could not stop it, so it must not be run at all.
    if not reactor.failure:
        print('Running the reactor. Press Ctrl-C to exit.')

        log.debug('launch', 'Starting reactor')
        reactor.run()

        log.debug('launch', 'Reactor stopped')

    if reactor.failure:
        return 1
    else:
        return 0