Package flumotion :: Package launch :: Module main
[hide private]

Source Code for Module flumotion.launch.main

  1  # -*- Mode: Python -*- 
  2  # vi:si:et:sw=4:sts=4:ts=4 
  3  # 
  4  # Flumotion - a streaming media server 
  5  # Copyright (C) 2004,2005,2006,2007 Fluendo, S.L. (www.fluendo.com). 
  6  # All rights reserved. 
  7   
  8  # This file may be distributed and/or modified under the terms of 
  9  # the GNU General Public License version 2 as published by 
 10  # the Free Software Foundation. 
 11  # This file is distributed without any warranty; without even the implied 
 12  # warranty of merchantability or fitness for a particular purpose. 
 13  # See "LICENSE.GPL" in the source distribution for more information. 
 14   
 15  # Licensees having purchased or holding a valid Flumotion Advanced 
 16  # Streaming Server license may use this file in accordance with the 
 17  # Flumotion Advanced Streaming Server Commercial License Agreement. 
 18  # See "LICENSE.Flumotion" in the source distribution for more information. 
 19   
 20  # Headers in this file shall remain intact. 
 21   
 22  """ 
 23  Flumotion-launch: A gst-launch analog for Flumotion. 
 24   
 25  The goal of flumotion-launch is to provide an easy way for testing 
 26  flumotion components, without involving much of Flumotion's core code. 
 27   
 28  Flumotion-launch takes a terse gst-launch-like syntax, translates that 
 29  into a component graph, and starts the components. An example would be:: 
 30   
 31    flumotion-launch videotest ! theora-encoder ! ogg-muxer ! http-streamer 
 32   
 33  You can also set properties:: 
 34   
 35    flumotion-launch videotest framerate=15/2 
 36   
 37  You can link specific feeders as well:: 
 38   
 39    flumotion-launch firewire .audio ! vorbis-encoder 
 40    flumotion-launch firewire firewire0.audio ! vorbis-encoder 
 41   
 42  Components can be backreferenced using their names:: 
 43   
 44    flumotion-launch videotest audiotest videotest0. ! ogg-muxer \ 
 45                     audiotest0. ! ogg-muxer0. 
 46   
 47  In addition, components can have plugs:: 
 48   
 49    flumotion-launch http-streamer /apachelogger,logfile=/dev/stdout 
 50   
 51  Flumotion-launch explicitly avoids much of Flumotion's core logic. It 
 52  does not import flumotion.manager, flumotion.admin, or flumotion.worker. 
 53  There is no depgraph, no feed server, no job process. Although it might 
 54  be useful in the future to add a way to use the standard interfaces to 
 55  start components via admin, manager, worker, and job instances, this 
 56  low-level interface is useful in debugging problems and should be kept. 
 57  """ 
 58   
 59   
 60  import os 
 61  import sys 
 62   
 63  from twisted.python import reflect 
 64  from twisted.internet import reactor, defer 
 65   
 66  from flumotion.common import log, common, registry, errors, messages 
 67  from flumotion.common import i18n 
 68  from flumotion.common.options import OptionParser 
 69  from flumotion.configure import configure 
 70  from flumotion.twisted import flavors 
 71   
 72  from flumotion.launch import parse 
 73   
 74  from gettext import gettext as _ 
 75   
 76  __version__ = "$Rev: 6907 $" 
 77  _headings = { 
 78      messages.ERROR:   _('Error'), 
 79      messages.WARNING: _('Warning'), 
 80      messages.INFO:    _('Note') 
 81  } 
 82   
 83   
84 -def err(x):
85 sys.stderr.write(x + '\n') 86 raise SystemExit(1)
87 88
89 -class ComponentWrapper(object, log.Loggable):
90 logCategory = "compwrapper" 91
92 - def __init__(self, config):
93 self.name = config['name'] 94 self.config = config 95 self.procedure = self._getProcedure(config['type']) 96 self.component = None
97
98 - def _getProcedure(self, type):
99 r = registry.getRegistry() 100 c = r.getComponent(type) 101 try: 102 entry = c.getEntryByType('component') 103 except KeyError: 104 err('Component %s has no component entry' % self.name) 105 importname = entry.getModuleName(c.getBase()) 106 try: 107 module = reflect.namedAny(importname) 108 except Exception, e: 109 err('Could not load module %s for component %s: %s' 110 % (importname, self.name, e)) 111 return getattr(module, entry.getFunction())
112
113 - def instantiate(self):
114 errors = [] 115 def haveError(value): 116 translator = i18n.Translator() 117 localedir = os.path.join(configure.localedatadir, 'locale') 118 # FIXME: add locales as messages from domains come in 119 translator.addLocaleDir(configure.PACKAGE, localedir) 120 print "%s: %s" % (_headings[value.level], 121 translator.translate(value)) 122 if value.debug: 123 print "Debug information:", value.debug 124 errors.append(value)
125 126 self.component = self.procedure(self.config, 127 haveError=haveError) 128 return not bool(errors)
129
130 - def provideMasterClock(self, port):
131 # rtype: defer.Deferred 132 d = self.component.provide_master_clock(port) 133 return d
134
135 - def set_master_clock(self, ip, port, base_time):
136 return self.component.set_master_clock(ip, port, base_time)
137
138 - def stop(self):
139 return self.component.stop()
140
141 - def feedToFD(self, feedName, fd):
142 self.debug('feedToFD(feedName=%s, %d)' % (feedName, fd)) 143 return self.component.feedToFD(feedName, fd, os.close)
144
145 - def eatFromFD(self, eaterAlias, feedId, fd):
146 self.debug('eatFromFD(eaterAlias=%s, feedId=%s, %d)', 147 eaterAlias, feedId, fd) 148 return self.component.eatFromFD(eaterAlias, feedId, fd)
149
150 -def make_pipes(wrappers):
151 fds = {} # feedcompname:feeder => (fd, start()) 152 wrappersByName = dict([(wrapper.name, wrapper) 153 for wrapper in wrappers]) 154 def starter(wrapper, feedName, write): 155 return lambda: wrapper.feedToFD(feedName, write)
156 for wrapper in wrappers: 157 eaters = wrapper.config.get('eater', {}) 158 for eaterName in eaters: 159 for feedId, eaterAlias in eaters[eaterName]: 160 compName, feederName = common.parseFeedId(feedId) 161 read, write = os.pipe() 162 log.debug('launch', '%s: read from fd %d, write to fd %d', 163 feedId, read, write) 164 start = starter(wrappersByName[compName], feederName, write) 165 fds[feedId] = (read, start) 166 return fds 167
168 -def start_components(wrappers, fds):
169 # figure out the links and start the components 170 171 def provide_clock(): 172 # second phase: clocking 173 need_sync = [x for x in wrappers if x.config['clock-master']] 174 175 if need_sync: 176 master = None 177 for x in need_sync: 178 if x.config['clock-master'] == x.config['avatarId']: 179 master = x 180 break 181 assert master 182 need_sync.remove(master) 183 d = master.provideMasterClock(7600 - 1) # hack! 184 def addNeedSync(clocking): 185 return need_sync, clocking
186 d.addCallback(addNeedSync) 187 return d 188 else: 189 return defer.succeed((None, None)) 190 191 def do_start(synchronization, wrapper): 192 need_sync, clocking = synchronization 193 194 # start it up, with clocking data only if it needs it 195 eaters = wrapper.config.get('eater', {}) 196 for eaterName in eaters: 197 for feedId, eaterAlias in eaters[eaterName]: 198 read, start = fds[feedId] 199 wrapper.eatFromFD(eaterAlias, feedId, read) 200 start() 201 if (not need_sync) or (wrapper not in need_sync) or (not clocking): 202 clocking = None 203 if clocking: 204 wrapper.set_master_clock(*clocking) 205 return synchronization 206 207 def do_stop(failure): 208 for wrapper in wrappers: 209 wrapper.stop() 210 return failure 211 212 for wrapper in wrappers: 213 if not wrapper.instantiate(): 214 # we don't have a ComponentState, so we cheat and give the 215 # exception a wrapper 216 return defer.fail(errors.ComponentStartError(wrapper)) 217 d = provide_clock() 218 for wrapper in wrappers: 219 d.addCallback(do_start, wrapper) 220 d.addErrback(do_stop) 221 return d 222
223 -def main(args):
224 from flumotion.common import setup 225 setup.setupPackagePath() 226 from flumotion.configure import configure 227 log.debug('launch', 'Running Flumotion version %s' % 228 configure.version) 229 import twisted.copyright 230 log.debug('launch', 'Running against Twisted version %s' % 231 twisted.copyright.version) 232 from flumotion.project import project 233 for p in project.list(): 234 log.debug('launch', 'Registered project %s version %s' % ( 235 p, project.get(p, 'version'))) 236 237 parser = OptionParser(domain="flumotion-launch") 238 239 log.debug('launch', 'Parsing arguments (%r)' % ', '.join(args)) 240 options, args = parser.parse_args(args) 241 242 i18n.installGettext() 243 244 # verbose overrides --debug 245 if options.verbose: 246 log.setFluDebug("*:3") 247 248 # handle all options 249 if options.version: 250 print common.version("flumotion-launch") 251 return 0 252 253 if options.debug: 254 log.setFluDebug(options.debug) 255 256 # note parser versus parse 257 configs = parse.parse_args(args[1:]) 258 259 # load the modules, make the component 260 wrappers = [ComponentWrapper(config) for config in configs] 261 262 # make socket pairs 263 fds = make_pipes(wrappers) 264 265 reactor.running = False 266 reactor.failure = False 267 reactor.callLater(0, lambda: setattr(reactor, 'running', True)) 268 269 d = start_components(wrappers, fds) 270 271 def errback(failure): 272 log.debug('launch', log.getFailureMessage(failure)) 273 print "Error occurred: %s" % failure.getErrorMessage() 274 failure.printDetailedTraceback() 275 reactor.failure = True 276 if reactor.running: 277 print "Stopping reactor." 278 reactor.stop()
279 d.addErrback(errback) 280 281 if not reactor.failure: 282 print 'Running the reactor. Press Ctrl-C to exit.' 283 284 log.debug('launch', 'Starting reactor') 285 reactor.run() 286 287 log.debug('launch', 'Reactor stopped') 288 289 if reactor.failure: 290 return 1 291 else: 292 return 0 293