Package flumotion :: Package worker :: Module main
[hide private]

Source Code for Module flumotion.worker.main

  1  # -*- Mode: Python -*- 
  2  # vi:si:et:sw=4:sts=4:ts=4 
  3  # 
  4  # Flumotion - a streaming media server 
  5  # Copyright (C) 2004,2005,2006,2007 Fluendo, S.L. (www.fluendo.com). 
  6  # All rights reserved. 
  7   
  8  # This file may be distributed and/or modified under the terms of 
  9  # the GNU General Public License version 2 as published by 
 10  # the Free Software Foundation. 
 11  # This file is distributed without any warranty; without even the implied 
 12  # warranty of merchantability or fitness for a particular purpose. 
 13  # See "LICENSE.GPL" in the source distribution for more information. 
 14   
 15  # Licensees having purchased or holding a valid Flumotion Advanced 
 16  # Streaming Server license may use this file in accordance with the 
 17  # Flumotion Advanced Streaming Server Commercial License Agreement. 
 18  # See "LICENSE.Flumotion" in the source distribution for more information. 
 19   
 20  # Headers in this file shall remain intact. 
 21   
 22  import optparse 
 23  import os 
 24  import sys 
 25   
 26  from twisted.internet import reactor 
 27   
 28  from flumotion.configure import configure 
 29  from flumotion.common import log, keycards, common, errors 
 30  from flumotion.worker import worker, config 
 31  from flumotion.twisted import pb 
 32   
def _createParser():
    """
    Build the option parser for the worker command line.

    The debug, verbose, version, logdir and rundir options are registered
    directly on the parser; all remaining options are collected in a
    "worker options" group that is attached to the parser at the end.

    @returns: the fully populated parser
    @rtype:   L{optparse.OptionParser}
    """
    # help texts that interpolate defaults taken from flumotion.configure
    portHelp = "manager port to connect to [default %d (ssl) or %d (tcp)]" % (
        configure.defaultSSLManagerPort, configure.defaultTCPManagerPort)
    logdirHelp = "flumotion log directory (default: %s)" % configure.logdir
    rundirHelp = "flumotion run directory (default: %s)" % configure.rundir

    parser = optparse.OptionParser()
    addGeneral = parser.add_option

    # options living directly on the parser
    addGeneral('-d', '--debug', dest="debug",
               action="store", type="string",
               help="set debug levels")
    addGeneral('-v', '--verbose', dest="verbose",
               action="store_true",
               help="be verbose")
    addGeneral('', '--version', dest="version",
               action="store_true",
               help="show version information")
    addGeneral('-L', '--logdir', dest="logdir",
               action="store",
               help=logdirHelp)
    addGeneral('-R', '--rundir', dest="rundir",
               action="store",
               help=rundirHelp)

    # worker-specific options, shown under their own heading
    workerGroup = optparse.OptionGroup(parser, "worker options")
    addWorker = workerGroup.add_option

    # where and how to reach the manager
    addWorker('-H', '--host', dest="host",
              action="store", type="string",
              help="manager host to connect to [default localhost]")
    addWorker('-P', '--port', dest="port",
              action="store", type="int",
              help=portHelp)
    addWorker('-T', '--transport', dest="transport",
              action="store", type="string",
              help="transport protocol to use (tcp/ssl) [default ssl]")

    # naming and daemonizing
    addWorker('-n', '--name', dest="name",
              action="store", type="string",
              help="worker name to use in the manager")
    addWorker('-s', '--service-name', dest="serviceName",
              action="store", type="string",
              help="name to use for log and pid files when run as a daemon")
    addWorker('-D', '--daemonize', dest="daemonize",
              action="store_true",
              help="run in background as a daemon")
    addWorker('', '--daemonize-to', dest="daemonizeTo",
              action="store",
              help="what directory to run from when daemonizing")

    # credentials; these default to the empty string rather than None
    addWorker('-u', '--username', dest="username",
              action="store", type="string", default="",
              help="username to use")
    addWorker('-p', '--password', dest="password",
              action="store", type="string", default="",
              help="password to use")

    # feeder ports
    addWorker('-F', '--feederports', dest="feederports",
              action="store", type="string",
              help="range of feeder ports to use")
    addWorker('', '--random-feederports', dest="randomFeederports",
              action="store_true",
              help="Use randomly available feeder ports")

    parser.add_option_group(workerGroup)
    return parser
101
def _readConfig(workerFile, options):
    """
    Load the worker XML configuration and fill in options that are not set.

    A value is copied from the configuration file only when the matching
    attribute on C{options} is still unset (falsy), so anything already
    present on C{options} takes precedence.  C{options} is modified
    in place; nothing is returned.

    @param workerFile: path of the worker configuration file
    @param options:    option values object whose unset attributes are
                       filled in from the configuration

    @raises errors.SystemError: if the configuration file cannot be read
                                or is rejected by the configuration parser
    """
    log.info('worker', 'Reading configuration from %s' % workerFile)
    try:
        cfg = config.WorkerConfigXML(workerFile)
    except config.ConfigError as value:
        raise errors.SystemError(
            "Could not load configuration from %s: %s" % (
                workerFile, value))
    except IOError as e:
        # strerror is None when the IOError was raised without errno
        # details; fall back to the exception text in that case
        raise errors.SystemError(
            "Could not load configuration from %s: %s" % (
                workerFile, e.strerror or e))

    # now copy over stuff from config that is not set yet
    if not options.name and cfg.name:
        log.debug('worker', 'Setting worker name %s' % cfg.name)
        options.name = cfg.name

    # manager
    if not options.host and cfg.manager.host:
        options.host = cfg.manager.host
        log.debug('worker', 'Setting manager host to %s' % options.host)
    if not options.port and cfg.manager.port:
        options.port = cfg.manager.port
        log.debug('worker', 'Setting manager port to %s' % options.port)
    if not options.transport and cfg.manager.transport:
        options.transport = cfg.manager.transport
        log.debug('worker', 'Setting manager transport to %s' %
                  options.transport)

    # authentication
    if not options.username and cfg.authentication.username:
        options.username = cfg.authentication.username
        log.debug('worker', 'Setting username %s' % options.username)
    if not options.password and cfg.authentication.password:
        options.password = cfg.authentication.password
        # never log the password itself, only a mask of the same length
        log.debug('worker',
                  'Setting password [%s]' % ("*" * len(options.password)))

    # feederports: list of allowed ports
    # XML could specify it as empty, meaning "don't use any"
    if not options.feederports and cfg.feederports is not None:
        options.feederports = cfg.feederports
    # asking for random feeder ports discards any explicit list
    if options.randomFeederports:
        options.feederports = None
        log.debug('worker', 'Using random feederports')
    if options.feederports is not None:
        log.debug('worker', 'Using feederports %r' % options.feederports)

    # general
    # command-line debug > environment debug > config file debug
    if (not options.debug and cfg.fludebug
            and 'FLU_DEBUG' not in os.environ):
        options.debug = cfg.fludebug
157
def _parseFeederports(spec):
    """
    Translate a 'LOWER-UPPER' feeder port specification into a port list.

    @param spec: the string given to -F/--feederports

    @returns: every port from LOWER up to and including UPPER
    @rtype:   list of int

    @raises errors.OptionError: if spec is not two integers joined by '-'
    """
    if '-' not in spec:
        raise errors.OptionError("feederports '%s' does not contain '-'" %
                                 spec)
    try:
        # a wrong number of parts and a non-numeric part both end up
        # as ValueError here
        lower, upper = [int(bound) for bound in spec.split('-')]
    except ValueError:
        raise errors.OptionError(
            "feederports '%s' is not of the form LOWER-UPPER" % spec)
    return list(range(lower, upper + 1))


def main(args):
    """
    Entry point of the worker: parse options, connect to the manager and
    run the reactor until it stops.

    @param args: command-line arguments, handed unchanged to the option
                 parser.  If more than one argument is left after option
                 parsing, element 1 is used as the worker configuration
                 file (element 0 is presumably the program name).

    @returns: 0 after the reactor has stopped or after printing version
              information; 1 if the options are inconsistent or the
              worker could not listen on its ports
    @rtype:   int

    @raises errors.OptionError: if the --feederports value is malformed
    @raises errors.SystemError: if the configuration file cannot be loaded
    """
    parser = _createParser()
    log.debug('worker', 'Parsing arguments (%r)' % ', '.join(args))
    options, args = parser.parse_args(args)

    # Force options down configure's throat
    for d in ['logdir', 'rundir']:
        o = getattr(options, d, None)
        if o:
            log.debug('worker', 'Setting configure.%s to %s' % (d, o))
            setattr(configure, d, o)

    # verbose overrides --debug; is only a command-line option
    if options.verbose:
        options.debug = "*:3"

    # apply the command-line debug level if is given through --verbose or -d
    if options.debug:
        log.setFluDebug(options.debug)

    # translate feederports string to range
    if options.feederports:
        options.feederports = _parseFeederports(options.feederports)

    # check if a config file was specified; if so, parse config and copy over
    if len(args) > 1:
        workerFile = args[1]
        _readConfig(workerFile, options)

    # set default values for all unset options
    if not options.host:
        options.host = 'localhost'
    if not options.transport:
        options.transport = 'ssl'
    if not options.port:
        # an unknown transport leaves the port unset; it is rejected below
        if options.transport == "tcp":
            options.port = configure.defaultTCPManagerPort
        elif options.transport == "ssl":
            options.port = configure.defaultSSLManagerPort

    # set a default name if none is given
    if not options.name:
        if options.host == 'localhost':
            options.name = 'localhost'
            log.debug('worker', 'Setting worker name localhost')
        else:
            import socket
            options.name = socket.gethostname()
            log.debug('worker', 'Setting worker name %s (from hostname)' %
                      options.name)

    if options.feederports is None and not options.randomFeederports:
        options.feederports = configure.defaultGstPortRange
        log.debug('worker', 'Using default feederports %r' %
                  options.feederports)

    # check for wrong options/arguments
    if options.transport not in ['ssl', 'tcp']:
        sys.stderr.write('ERROR: wrong transport %s, must be ssl or tcp\n' %
                         options.transport)
        return 1

    # handle all options
    if options.version:
        # call form with a single argument prints the same text whether
        # print is a statement or a function
        print(common.version("flumotion-worker"))
        return 0

    # reset FLU_DEBUG which could be different after parsing XML file
    if options.debug:
        log.setFluDebug(options.debug)

    if options.daemonizeTo and not options.daemonize:
        sys.stderr.write(
            'ERROR: --daemonize-to can only be used with -D/--daemonize.\n')
        return 1

    if options.serviceName and not options.daemonize:
        sys.stderr.write(
            'ERROR: --service-name can only be used with -D/--daemonize.\n')
        return 1

    brain = worker.WorkerBrain(options)

    # Now bind and listen to our unix and tcp sockets
    if not brain.listen():
        sys.stderr.write('ERROR: Failed to listen on worker ports.\n')
        return 1

    # the service name, when given, replaces the worker name for startup
    name = options.name
    if options.daemonize:
        if options.serviceName:
            name = options.serviceName

    common.startup("worker", name, options.daemonize, options.daemonizeTo)

    log.debug('worker', 'Running Flumotion version %s' %
              configure.version)
    import twisted.copyright
    log.debug('worker', 'Running against Twisted version %s' %
              twisted.copyright.version)
    log.debug('worker', 'Running against GStreamer version %s' %
              configure.gst_version)

    # register all package paths (FIXME: this should go away when
    # components come from manager)
    from flumotion.common import setup
    setup.setupPackagePath()

    # create a brain and have it remember the manager to direct jobs to
    # connect the brain to the manager
    if options.transport == "tcp":
        reactor.connectTCP(options.host, options.port,
                           brain.workerClientFactory)
    elif options.transport == "ssl":
        from twisted.internet import ssl
        reactor.connectSSL(options.host, options.port,
                           brain.workerClientFactory,
                           ssl.ClientContextFactory())

    log.info('worker',
             'Connecting to manager %s:%d using %s' % (options.host,
                                                       options.port,
                                                       options.transport.upper()))

    authenticator = pb.Authenticator(
        username=options.username,
        password=options.password,
        address='localhost', # FIXME: why localhost ?
        avatarId=options.name
    )
    brain.login(authenticator)

    reactor.addSystemEventTrigger('before', 'shutdown', brain.shutdownHandler)

    # go into the reactor main loop
    reactor.run()

    return 0
300