Package flumotion :: Package job :: Module job
[hide private]

Source Code for Module flumotion.job.job

  1  # -*- Mode: Python -*- 
  2  # vi:si:et:sw=4:sts=4:ts=4 
  3  # 
  4  # Flumotion - a streaming media server 
  5  # Copyright (C) 2004,2005,2006,2007 Fluendo, S.L. (www.fluendo.com). 
  6  # All rights reserved. 
  7   
  8  # This file may be distributed and/or modified under the terms of 
  9  # the GNU General Public License version 2 as published by 
 10  # the Free Software Foundation. 
 11  # This file is distributed without any warranty; without even the implied 
 12  # warranty of merchantability or fitness for a particular purpose. 
 13  # See "LICENSE.GPL" in the source distribution for more information. 
 14   
 15  # Licensees having purchased or holding a valid Flumotion Advanced 
 16  # Streaming Server license may use this file in accordance with the 
 17  # Flumotion Advanced Streaming Server Commercial License Agreement. 
 18  # See "LICENSE.Flumotion" in the source distribution for more information. 
 19   
 20  # Headers in this file shall remain intact. 
 21   
 22  """ 
 23  the job-side half of the worker-job connection 
 24  """ 
 25   
 26  import os 
 27  import resource 
 28  import sys 
 29   
 30  # I've read somewhere that importing the traceback module messes up the 
 31  # exception state, so it's better to import it globally instead of in the 
 32  # exception handler 
 33  # import traceback 
 34   
 35  from twisted.cred import credentials 
 36  from twisted.internet import reactor, defer 
 37  from twisted.python import failure 
 38  from twisted.spread import pb 
 39   
 40  from flumotion.common import config, errors, interfaces, log, registry, keycards 
 41  from flumotion.common import medium, package 
 42  from flumotion.common.reflectcall import createComponent 
 43  from flumotion.component import component 
 44   
 45  from flumotion.twisted import fdserver 
 46  from flumotion.twisted import pb as fpb 
 47  from flumotion.twisted import defer as fdefer 
 48   
 49  from flumotion.twisted.defer import defer_generator_method 
 50  from flumotion.twisted.compat import implements 
 51   
class JobMedium(medium.BaseMedium):
    """
    I am a medium between the job and the worker's job avatar.
    I live in the job process.

    @cvar component: the component this is a medium for; created as part of
                     L{remote_create}
    @type component: L{flumotion.component.component.BaseComponent}
    """
    logCategory = 'jobmedium'
    remoteLogName = 'jobavatar'

    implements(interfaces.IJobMedium)

    def __init__(self):
        """
        Start out with no component and no manager connection details;
        these get filled in by L{remote_bootstrap} and L{remote_create}.
        """
        self.avatarId = None
        self.logName = None
        self.component = None

        self._workerName = None
        self._managerHost = None
        self._managerPort = None
        self._managerTransport = None
        self._managerKeycard = None
        # set by remote_bootstrap; initialised here like its siblings
        self._authenticator = None
        self._componentClientFactory = None # from component to manager

        # guards against calling reactor.stop() more than once
        self._hasStoppedReactor = False

    ### pb.Referenceable remote methods called on by the WorkerBrain
    def remote_bootstrap(self, workerName, host, port, transport,
                         authenticator, packagePaths):
        """
        I receive the information on how to connect to the manager. I also set
        up package paths to be able to run the component.

        Called by the worker's JobAvatar.

        @param workerName:    the name of the worker running this job
        @type  workerName:    str
        @param host:          the host that is running the manager
        @type  host:          str
        @param port:          port on which the manager is listening
        @type  port:          int
        @param transport:     'tcp' or 'ssl'
        @type  transport:     str
        @param authenticator: remote reference to the worker-side authenticator
        @type  authenticator: L{twisted.spread.pb.RemoteReference} to a
                              L{flumotion.twisted.pb.Authenticator}
        @param packagePaths:  ordered list of
                              (package name, package path) tuples
        @type  packagePaths:  list of (str, str)
        """
        assert isinstance(workerName, str)
        assert isinstance(host, str)
        assert isinstance(port, int)
        assert transport in ('ssl', 'tcp')
        assert isinstance(authenticator, pb.RemoteReference)
        assert isinstance(packagePaths, list)

        self._workerName = workerName
        self._managerHost = host
        self._managerPort = port
        self._managerTransport = transport
        self._authenticator = fpb.RemoteAuthenticator(authenticator)

        # register the paths in the order the worker gave them to us
        packager = package.getPackager()
        for name, path in packagePaths:
            self.debug('registering package path for %s' % name)
            self.log('... from path %s' % path)
            packager.registerPackagePath(path, name)

    def remote_create(self, avatarId, type, moduleName, methodName, nice=0):
        """
        I am called on by the worker's JobAvatar to create a component.

        @param avatarId:   avatarId for component to log in to manager
        @type  avatarId:   str
        @param type:       type of component to start
        @type  type:       str
        @param moduleName: name of the module to create the component from
        @type  moduleName: str
        @param methodName: the factory method to use to create the component
        @type  methodName: str
        @param nice:       the nice level
        @type  nice:       int

        @raise errors.ComponentCreateError: if the component could not be
                                            created; see L{_createComponent}
        """
        self.avatarId = avatarId
        self.logName = avatarId

        self.component = self._createComponent(avatarId, type, moduleName,
            methodName, nice)
        self.component.setShutdownHook(self._componentStopped)

    def _componentStopped(self):
        """
        Shutdown hook installed on the component by L{remote_create}.
        """
        # stop reactor from a callLater so remote methods finish nicely
        reactor.callLater(0, self.shutdown)

    def remote_stop(self):
        """
        Stop the component if there is one; its shutdown hook then shuts
        the job down.  Without a component, schedule the shutdown directly.
        """
        if self.component:
            self.debug('stopping component and shutting down')
            self.component.stop()
        else:
            reactor.callLater(0, self.shutdown)

    def shutdownHandler(self):
        """
        Send "cleanShutdown" over every remote reference we still hold:
        our own, and the one of the component's medium.

        @returns: a deferred for the collected remote calls, wrapped with
                  L{flumotion.twisted.defer.defer_call_later}
        """
        dlist = []
        if self.hasRemoteReference():
            # tell the worker we are shutting down
            dlist.append(self.callRemote("cleanShutdown"))
        if self.component:
            # not named 'medium': that would shadow the imported module
            componentMedium = self.component.medium
            if componentMedium.hasRemoteReference():
                dlist.append(componentMedium.callRemote("cleanShutdown"))

        # We mustn't fire the deferred returned from here except from a
        # callLater.
        dl = defer.DeferredList(dlist, fireOnOneErrback=False)
        return fdefer.defer_call_later(dl)

    ### our methods
    def shutdown(self):
        """
        Shut down the job process completely, cleaning up the component
        so the reactor can be left from.

        Calling me more than once is harmless; the reactor is only stopped
        the first time.
        """
        if self._hasStoppedReactor:
            self.debug("Not stopping reactor again, already shutting down")
        else:
            self._hasStoppedReactor = True
            self.info("Stopping reactor in job process")
            reactor.stop()

    def _setNice(self, nice):
        """
        Apply the given nice increment to this process.
        A failure is logged as a warning and otherwise ignored.

        @param nice: the nice level; a false value (0, None) means
                     "leave untouched"
        @type  nice: int
        """
        if not nice:
            return

        try:
            os.nice(nice)
        except OSError:
            # sys.exc_info() keeps this parseable on every Python version
            e = sys.exc_info()[1]
            self.warning('Failed to set nice level: %s' % str(e))
        else:
            self.debug('Nice level set to %d' % nice)

    def _enableCoreDumps(self):
        """
        Raise the soft core dump size limit of this process to its hard
        limit, warning when the hard limit is not unlimited.
        """
        hard = resource.getrlimit(resource.RLIMIT_CORE)[1]
        if hard != resource.RLIM_INFINITY:
            self.warning('Could not set unlimited core dump sizes, '
                         'setting to %d instead' % hard)
        else:
            self.debug('Enabling core dumps of unlimited size')

        resource.setrlimit(resource.RLIMIT_CORE, (hard, hard))

    def _createComponent(self, avatarId, type, moduleName, methodName,
                         nice=0):
        """
        Create a component of the given type.
        Log in to the manager with the given avatarId.

        @param avatarId:   avatarId component will use to log in to manager
        @type  avatarId:   str
        @param type:       type of component to start
        @type  type:       str
        @param moduleName: name of the module that contains the entry point
        @type  moduleName: str
        @param methodName: name of the factory method to create the component
        @type  methodName: str
        @param nice:       the nice level to run with
        @type  nice:       int

        @returns: the component that was created

        @raise errors.ComponentCreateError: if creating the component raised
                                            any exception; a shutdown of the
                                            job is scheduled before raising
        """
        self.info('Creating component "%s" of type "%s"' % (avatarId, type))
        #self.info('setting up signals')
        #signal.signal(signal.SIGINT, signal.SIG_IGN)

        self._setNice(nice)
        self._enableCoreDumps()

        try:
            comp = createComponent(moduleName, methodName)
        except Exception:
            # sys.exc_info() keeps this parseable on every Python version
            e = sys.exc_info()[1]
            if isinstance(e, errors.ComponentCreateError) and e.args:
                # a ComponentCreateError is already formatted
                msg = e.args[0]
            else:
                # args may hold non-string items (an errno for example);
                # format each one so building the message cannot itself
                # fail and hide the real error
                details = " ".join(["%s" % (arg, ) for arg in e.args])
                msg = "Exception %s during createComponent: %s" % (
                    e.__class__.__name__, details)
            # traceback.print_exc()
            self.warning(
                "raising ComponentCreateError(%s) and stopping job" % msg)
            # This is a Nasty Hack. We raise ComponentCreateError, which can
            # be caught on the other side and marshalled as a reasonably
            # comprehensible error message. However, if we shutdown
            # immediately, the PB connection won't be available, so the
            # worker will just get an error about that! So, instead, we shut
            # down in a tenth of a second, usually allowing the worker to get
            # scheduled and read the exception over PB. Ick!
            reactor.callLater(0.1, self.shutdown)
            raise errors.ComponentCreateError(msg)

        comp.setWorkerName(self._workerName)

        # make component log in to manager
        self.debug('creating ComponentClientFactory')
        managerClientFactory = component.ComponentClientFactory(comp)
        self._componentClientFactory = managerClientFactory
        self.debug('created ComponentClientFactory %r' % managerClientFactory)
        self._authenticator.avatarId = avatarId
        managerClientFactory.startLogin(self._authenticator)

        host = self._managerHost
        port = self._managerPort
        transport = self._managerTransport
        self.debug('logging in with authenticator %r' % self._authenticator)
        if transport == "ssl":
            # imported here so ssl support is only needed when it is used
            from twisted.internet import ssl
            self.info('Connecting to manager %s:%d with SSL' % (host, port))
            reactor.connectSSL(host, port, managerClientFactory,
                ssl.ClientContextFactory())
        elif transport == "tcp":
            self.info('Connecting to manager %s:%d with TCP' % (host, port))
            reactor.connectTCP(host, port, managerClientFactory)
        else:
            # no connection is made in this case; the component is still
            # returned to the caller
            self.warning('Unknown transport protocol %s' %
                self._managerTransport)

        return comp
275
class JobClientBroker(pb.Broker, log.Loggable):
    """
    I am a pb.Broker that, next to the regular PB data stream, accepts file
    descriptors (each with an accompanying message) on the same connection.
    A received FD is handed to the component as a feed to send or eat, or
    is used to replace the job's stdout or stderr.
    """

    def __init__(self, connectionClass, **kwargs):
        """
        @param connectionClass: a subclass of
                                L{twisted.internet.tcp.Connection}
        @param kwargs:          passed on unchanged to L{pb.Broker}
        """
        pb.Broker.__init__(self, **kwargs)

        self._connectionClass = connectionClass

    def fileDescriptorsReceived(self, fds, message):
        """
        Act on file descriptors that arrived over the connection.
        Only the first descriptor in fds is used.

        Understood messages:
          - 'sendFeed <feedName> [<eaterId>]'
          - 'receiveFeed <feedId>'
          - 'redirectStdout'
          - 'redirectStderr'
        Anything else is logged as a warning.

        @param fds:     the received file descriptors
        @type  fds:     list of int
        @param message: the message that came with the descriptors
        @type  message: str
        """
        # file descriptors get delivered to the component
        self.debug('received fds %r, message %r' % (fds, message))

        if message.startswith('sendFeed '):
            # let the argument machinery check the number of words and
            # supply the default for a missing eater id
            def splitSendFeed(_command, name, eater=None):
                return name, eater

            words = message.split(' ')
            feedName, eaterId = splitSendFeed(*words)
            comp = self.factory.medium.component
            comp.feedToFD(feedName, fds[0], os.close, eaterId)
            return

        receivePrefix = 'receiveFeed '
        if message.startswith(receivePrefix):
            feedId = message[len(receivePrefix):]
            comp = self.factory.medium.component
            comp.eatFromFD(feedId, fds[0])
            return

        if message == 'redirectStdout':
            newFd = fds[0]
            self.debug('told to rotate stdout to fd %d', newFd)
            # the duplicate takes over stdout; the received fd itself is
            # no longer needed
            os.dup2(newFd, sys.stdout.fileno())
            os.close(newFd)
            self.debug('rotated stdout')
            return

        if message == 'redirectStderr':
            newFd = fds[0]
            self.debug('told to rotate stderr to fd %d', newFd)
            os.dup2(newFd, sys.stderr.fileno())
            os.close(newFd)
            self.info('rotated stderr')
            return

        self.warning('Unknown message received: %r' % message)
315
class JobClientFactory(pb.PBClientFactory, log.Loggable):
    """
    I am the client factory the job uses to log in to the WorkerBrain.
    I live in the flumotion-job process spawned by the worker.

    @cvar medium: the medium for the JobHeaven to access us through
    @type medium: L{JobMedium}
    """
    logCategory = "job"
    perspectiveInterface = interfaces.IJobMedium

    def __init__(self, id):
        """
        Create the job's medium and start logging in with the given id.

        @param id: the avatar id used for logging into the workerbrain
        @type  id: str
        """
        pb.PBClientFactory.__init__(self)

        self.medium = JobMedium()
        self.logName = id
        self.login(id)

        # use an FD-passing broker instead
        self.protocol = JobClientBroker

    ### pb.PBClientFactory methods
    def buildProtocol(self, addr):
        """
        Build a broker created with L{fdserver.FDServer} as its connection
        class, and point its factory attribute at me.

        @param addr: not used here
        """
        broker = self.protocol(fdserver.FDServer)
        broker.factory = self
        return broker

    # FIXME: might be nice if jobs got a password to use to log in to brain
    def login(self, username):
        """
        Log in to the worker with the given username and an empty password,
        and give the resulting remote reference to the medium.

        A failure to log in is printed together with a stack trace; it is
        not raised.

        I am a generator, wrapped by L{defer_generator_method} below.

        @param username: the name to log in with
        @type  username: str
        """
        self.info('Logging in to worker')
        creds = credentials.UsernamePassword(username, '')
        d = pb.PBClientFactory.login(self, creds, self.medium)
        yield d
        try:
            perspective = d.value()
            self.info('Logged in to worker')
            self.debug('perspective %r connected' % perspective)
            self.medium.setRemoteReference(perspective)
        except Exception:
            # sys.exc_info() keeps this parseable on every Python version
            e = sys.exc_info()[1]
            from flumotion.common import debug
            debug.print_stack()
            print ('ERROR connecting job to worker [%d]: %s'
                % (os.getpid(), log.getExceptionMessage(e)))
            # raise error
    login = defer_generator_method(login)

    # the only way stopFactory can be called is if the WorkerBrain closes
    # the pb server.  Ideally though we would have gotten a notice before.
    # This ensures we shut down the component/job in ALL cases where the
    # worker goes away.
    def stopFactory(self):
        """
        Shut down the medium, logging before and after doing so.
        """
        self.debug('shutting down medium')
        self.medium.shutdown()
        self.debug('shut down medium')
374