Package flumotion :: Package job :: Module job
[hide private]

Source Code for Module flumotion.job.job

  1  # -*- Mode: Python -*- 
  2  # vi:si:et:sw=4:sts=4:ts=4 
  3  # 
  4  # Flumotion - a streaming media server 
  5  # Copyright (C) 2004,2005,2006,2007 Fluendo, S.L. (www.fluendo.com). 
  6  # All rights reserved. 
  7   
  8  # This file may be distributed and/or modified under the terms of 
  9  # the GNU General Public License version 2 as published by 
 10  # the Free Software Foundation. 
 11  # This file is distributed without any warranty; without even the implied 
 12  # warranty of merchantability or fitness for a particular purpose. 
 13  # See "LICENSE.GPL" in the source distribution for more information. 
 14   
 15  # Licensees having purchased or holding a valid Flumotion Advanced 
 16  # Streaming Server license may use this file in accordance with the 
 17  # Flumotion Advanced Streaming Server Commercial License Agreement. 
 18  # See "LICENSE.Flumotion" in the source distribution for more information. 
 19   
 20  # Headers in this file shall remain intact. 
 21   
 22  """ 
 23  the job-side half of the worker-job connection 
 24  """ 
 25   
 26  import os 
 27  import resource 
 28  import sys 
 29   
 30  # I've read somewhere that importing the traceback module messes up the 
 31  # exception state, so it's better to import it globally instead of in the 
 32  # exception handler 
 33  # import traceback 
 34   
 35  from twisted.cred import credentials 
 36  from twisted.internet import reactor, defer 
 37  from twisted.python import failure 
 38  from twisted.spread import pb 
 39  from zope.interface import implements 
 40   
 41  from flumotion.common import errors, interfaces, log, keycards 
 42  from flumotion.common import medium, package 
 43  from flumotion.common.reflectcall import createComponent, reflectCallCatching 
 44  from flumotion.component import component 
 45   
 46  from flumotion.twisted import fdserver 
 47  from flumotion.twisted import pb as fpb 
 48  from flumotion.twisted import defer as fdefer 
 49   
 50  __version__ = "$Rev: 6125 $" 
 51   
 52   
class JobMedium(medium.BaseMedium):
    """
    I am a medium between the job and the worker's job avatar.
    I live in the job process.

    @cvar component: the component this is a medium for; created as part of
                     L{remote_create}
    @type component: L{flumotion.component.component.BaseComponent}
    """
    logCategory = 'jobmedium'
    remoteLogName = 'jobavatar'

    implements(interfaces.IJobMedium)

    def __init__(self):
        self.avatarId = None
        self.logName = None
        self.component = None

        self._workerName = None
        self._managerHost = None
        self._managerPort = None
        self._managerTransport = None
        self._managerKeycard = None
        # assigned by remote_bootstrap; initialised here so the attribute
        # exists even if remote_create is called before bootstrapping
        self._authenticator = None
        self._componentClientFactory = None # from component to manager

        # guards shutdown() against stopping the reactor twice
        self._hasStoppedReactor = False

    ### pb.Referenceable remote methods called on by the WorkerBrain

    def remote_bootstrap(self, workerName, host, port, transport,
                         authenticator, packagePaths):
        """
        I receive the information on how to connect to the manager. I also set
        up package paths to be able to run the component.

        Called by the worker's JobAvatar.

        @param workerName:    the name of the worker running this job
        @type  workerName:    str
        @param host:          the host that is running the manager
        @type  host:          str
        @param port:          port on which the manager is listening
        @type  port:          int
        @param transport:     'tcp' or 'ssl'
        @type  transport:     str
        @param authenticator: remote reference to the worker-side authenticator
        @type  authenticator: L{twisted.spread.pb.RemoteReference} to a
                              L{flumotion.twisted.pb.Authenticator}
        @param packagePaths:  ordered list of
                              (package name, package path) tuples
        @type  packagePaths:  list of (str, str)
        """
        self._workerName = workerName
        self._managerHost = host
        self._managerPort = port
        self._managerTransport = transport
        if authenticator:
            self._authenticator = fpb.RemoteAuthenticator(authenticator)
        else:
            self.debug('no authenticator, will not be able to log '
                       'into manager')
            self._authenticator = None

        packager = package.getPackager()
        for name, path in packagePaths:
            self.debug('registering package path for %s' % name)
            self.log('... from path %s' % path)
            packager.registerPackagePath(path, name)

    def remote_getPid(self):
        """
        @returns: the process id of the job process
        """
        return os.getpid()

    def remote_runFunction(self, moduleName, methodName, *args, **kwargs):
        """
        I am called on by the worker's JobAvatar to run a function,
        normally on behalf of the flumotion wizard.

        @param moduleName: name of the module containing the function
        @type  moduleName: str
        @param methodName: the method to run
        @type  methodName: str
        @param args:       args to pass to the method
        @type  args:       tuple
        @param kwargs:     kwargs to pass to the method
        @type  kwargs:     dict

        @returns: the result of invoking the method
        """
        self.info('Running %s.%s(*%r, **%r)' % (moduleName, methodName,
                                                args, kwargs))
        # FIXME: do we want to do this?
        self._enableCoreDumps()

        return reflectCallCatching(errors.RemoteRunError, moduleName,
                                   methodName, *args, **kwargs)

    def remote_create(self, avatarId, type, moduleName, methodName,
                      nice, conf):
        """
        I am called on by the worker's JobAvatar to create a component.

        @param avatarId:   avatarId for component to log in to manager
        @type  avatarId:   str
        @param type:       type of component to start
        @type  type:       str
        @param moduleName: name of the module to create the component from
        @type  moduleName: str
        @param methodName: the factory method to use to create the component
        @type  methodName: str
        @param nice:       the nice level
        @type  nice:       int
        @param conf:       the component configuration
        @type  conf:       dict
        """
        self.avatarId = avatarId
        self.logName = avatarId

        self.component = self._createComponent(avatarId, type, moduleName,
                                               methodName, nice, conf)
        self.component.setShutdownHook(self._componentStopped)

    def _componentStopped(self):
        # stop reactor from a callLater so remote methods finish nicely
        reactor.callLater(0, self.shutdown)

    def remote_stop(self):
        """
        Stop the component if there is one; otherwise schedule a shutdown
        of the job directly.
        """
        if self.component:
            self.debug('stopping component and shutting down')
            self.component.stop()
        else:
            reactor.callLater(0, self.shutdown)

    def shutdownHandler(self):
        """
        Tell every peer we still hold a remote reference to (the worker, and
        the remote end of the component's medium) that we are shutting down
        cleanly.

        @returns: a deferred for the combined "cleanShutdown" calls, wrapped
                  by L{flumotion.twisted.defer.defer_call_later}
        """
        dlist = []
        if self.hasRemoteReference():
            # tell the worker we are shutting down
            dlist.append(self.callRemote("cleanShutdown"))
        if self.component:
            # named componentMedium so the imported medium module is not
            # shadowed inside this method
            componentMedium = self.component.medium
            if componentMedium.hasRemoteReference():
                dlist.append(componentMedium.callRemote("cleanShutdown"))

        # We mustn't fire the deferred returned from here except from a
        # callLater.
        dl = defer.DeferredList(dlist, fireOnOneErrback=False)
        return fdefer.defer_call_later(dl)

    ### our methods

    def shutdown(self):
        """
        Shut down the job process completely, cleaning up the component
        so the reactor can be left from.
        """
        if self._hasStoppedReactor:
            self.debug("Not stopping reactor again, already shutting down")
        else:
            self._hasStoppedReactor = True
            self.info("Stopping reactor in job process")
            reactor.stop()

    def _setNice(self, nice):
        """
        Apply the given nice increment to this process; a false value
        (None or 0) leaves the process untouched.  Failure is only logged.

        @param nice: the nice level
        @type  nice: int
        """
        if not nice:
            return

        try:
            os.nice(nice)
        except OSError:
            # fetched through sys.exc_info() so the handler parses on both
            # old and new Python versions
            e = sys.exc_info()[1]
            self.warning('Failed to set nice level: %s' % str(e))
        else:
            self.debug('Nice level set to %d' % nice)

    def _enableCoreDumps(self):
        """
        Raise the soft core dump size limit of this process to the hard limit.
        """
        hard = resource.getrlimit(resource.RLIMIT_CORE)[1]
        if hard != resource.RLIM_INFINITY:
            self.warning('Could not set unlimited core dump sizes, '
                         'setting to %d instead' % hard)
        else:
            self.debug('Enabling core dumps of unlimited size')

        resource.setrlimit(resource.RLIMIT_CORE, (hard, hard))

    def _createComponent(self, avatarId, type, moduleName, methodName,
                         nice, conf):
        """
        Create a component of the given type.
        Log in to the manager with the given avatarId.

        @param avatarId:   avatarId component will use to log in to manager
        @type  avatarId:   str
        @param type:       type of component to start
        @type  type:       str
        @param moduleName: name of the module that contains the entry point
        @type  moduleName: str
        @param methodName: name of the factory method to create the component
        @type  methodName: str
        @param nice:       the nice level to run with
        @type  nice:       int
        @param conf:       the component configuration
        @type  conf:       dict

        @returns: the created component
        @raises errors.ComponentCreateError: when creating the component
                                             raised any exception; a shutdown
                                             of the job is scheduled as well
        """
        self.info('Creating component "%s" of type "%s"', avatarId, type)

        self._setNice(nice)
        self._enableCoreDumps()

        try:
            comp = createComponent(moduleName, methodName, conf)
        except Exception:
            # fetched through sys.exc_info() so the handler parses on both
            # old and new Python versions
            e = sys.exc_info()[1]
            if isinstance(e, errors.ComponentCreateError) and e.args:
                # a ComponentCreateError is already formatted
                msg = e.args[0]
            else:
                # exception args are not necessarily strings (an OSError
                # carries an int errno), so format each one before joining
                # instead of letting str.join raise a TypeError from here
                details = " ".join(["%s" % (arg, ) for arg in e.args])
                msg = "Exception %s during createComponent: %s" % (
                    e.__class__.__name__, details)
            self.warning(
                "raising ComponentCreateError(%s) and stopping job" % msg)
            # This is a Nasty Hack. We raise ComponentCreateError, which can
            # be caught on the other side and marshalled as a reasonably
            # comprehensible error message. However, if we shutdown
            # immediately, the PB connection won't be available, so the
            # worker will just get an error about that! So, instead, we shut
            # down in a tenth of a second, usually allowing the worker to get
            # scheduled and read the exception over PB. Ick!
            reactor.callLater(0.1, self.shutdown)
            raise errors.ComponentCreateError(msg)

        comp.setWorkerName(self._workerName)

        # make component log in to manager
        self.debug('creating ComponentClientFactory')
        managerClientFactory = component.ComponentClientFactory(comp)
        self._componentClientFactory = managerClientFactory
        self.debug('created ComponentClientFactory %r' % managerClientFactory)
        # NOTE(review): remote_bootstrap stores None when it was given no
        # authenticator, in which case the next line raises AttributeError;
        # confirm whether a job is ever asked to create a component then.
        self._authenticator.avatarId = avatarId
        managerClientFactory.startLogin(self._authenticator)

        host = self._managerHost
        port = self._managerPort
        transport = self._managerTransport
        self.debug('logging in with authenticator %r' % self._authenticator)
        if transport == "ssl":
            from flumotion.common import common
            common.assertSSLAvailable()
            from twisted.internet import ssl
            self.info('Connecting to manager %s:%d with SSL' % (host, port))
            reactor.connectSSL(host, port, managerClientFactory,
                ssl.ClientContextFactory())
        elif transport == "tcp":
            self.info('Connecting to manager %s:%d with TCP' % (host, port))
            reactor.connectTCP(host, port, managerClientFactory)
        else:
            # no connection is attempted; the component is still returned
            self.warning('Unknown transport protocol %s' %
                         self._managerTransport)

        return comp
307
class JobClientBroker(pb.Broker, log.Loggable):
    """
    I am a pb.Broker that additionally receives file descriptors, each
    accompanied by a message, through L{fileDescriptorsReceived}.
    Depending on the message, the first received FD is handed to the
    component to feed to or eat from, or it replaces the stdout or stderr
    of the job process.
    """

    def __init__(self, connectionClass, **kwargs):
        """
        @param connectionClass: a subclass of L{twisted.internet.tcp.Connection}
        """
        pb.Broker.__init__(self, **kwargs)

        self._connectionClass = connectionClass

    def fileDescriptorsReceived(self, fds, message):
        """
        Dispatch received file descriptors according to the message that
        came with them.  Only the first file descriptor is used.

        @param fds:     the received file descriptors
        @param message: 'sendFeed <feedName>[ <eaterId>]',
                        'receiveFeed <eaterAlias>[ <feedId>]',
                        'redirectStdout' or 'redirectStderr'
        @type  message: str
        """
        # file descriptors get delivered to the component
        self.debug('received fds %r, message %r' % (fds, message))

        def splitMessage(_command, name, peerId=None):
            # called with the space-separated words of the message; the
            # third word is optional
            return name, peerId

        if message.startswith('sendFeed '):
            feedName, eaterId = splitMessage(*message.split(' '))
            comp = self.factory.medium.component
            comp.feedToFD(feedName, fds[0], os.close, eaterId)
            return

        if message.startswith('receiveFeed '):
            eaterAlias, feedId = splitMessage(*message.split(' '))
            comp = self.factory.medium.component
            comp.eatFromFD(eaterAlias, feedId, fds[0])
            return

        if message == 'redirectStdout':
            self.debug('told to rotate stdout to fd %d', fds[0])
            # duplicate the new fd over stdout, then drop the received copy
            os.dup2(fds[0], sys.stdout.fileno())
            os.close(fds[0])
            self.debug('rotated stdout')
            return

        if message == 'redirectStderr':
            self.debug('told to rotate stderr to fd %d', fds[0])
            # duplicate the new fd over stderr, then drop the received copy
            os.dup2(fds[0], sys.stderr.fileno())
            os.close(fds[0])
            self.info('rotated stderr')
            return

        self.warning('Unknown message received: %r' % message)
class JobClientFactory(pb.PBClientFactory, log.Loggable):
    """
    I am the PB client factory with which the job logs in to the WorkerBrain.
    I live in the flumotion-job process spawned by the worker.

    @cvar medium: the medium for the JobHeaven to access us through
    @type medium: L{JobMedium}
    """
    logCategory = "job"
    perspectiveInterface = interfaces.IJobMedium

    def __init__(self, id):
        """
        @param id: the avatar id used for logging into the workerbrain
        @type  id: str
        """
        pb.PBClientFactory.__init__(self)

        # connections are built with an FD-passing broker
        self.protocol = JobClientBroker

        self.medium = JobMedium()
        self.logName = id
        self.login(id)

    ### pb.PBClientFactory methods

    def buildProtocol(self, addr):
        """
        Create the broker for a new connection and attach it to this factory.
        """
        broker = self.protocol(fdserver.FDServer)
        broker.factory = self
        return broker

    # FIXME: might be nice if jobs got a password to use to log in to brain

    def login(self, username):
        """
        Log in to the worker with the given username and an empty password,
        offering our medium as the client reference.

        @param username: the avatar id to log in with
        @type  username: str

        @returns: the login deferred, with the handling of the obtained
                  perspective already chained on
        """
        self.info('Logging in to worker')
        creds = credentials.UsernamePassword(username, '')
        d = pb.PBClientFactory.login(self, creds, self.medium)
        d.addCallback(self._loggedIn)
        return d

    def _loggedIn(self, remoteReference):
        # login callback: hand the worker-side perspective to our medium
        self.info('Logged in to worker')
        self.debug('perspective %r connected', remoteReference)
        self.medium.setRemoteReference(remoteReference)

    # the only way stopFactory can be called is if the WorkerBrain closes
    # the pb server.  Ideally though we would have gotten a notice before.
    # This ensures we shut down the component/job in ALL cases where the
    # worker goes away.

    def stopFactory(self):
        self.debug('shutting down medium')
        self.medium.shutdown()
        self.debug('shut down medium')
404