Package flumotion :: Package worker :: Module job
[hide private]

Source Code for Module flumotion.worker.job

  1  # -*- Mode: Python; test-case-name:flumotion.test.test_worker_worker -*- 
  2  # vi:si:et:sw=4:sts=4:ts=4 
  3  # 
  4  # Flumotion - a streaming media server 
  5  # Copyright (C) 2004,2005,2006,2007 Fluendo, S.L. (www.fluendo.com). 
  6  # All rights reserved. 
  7   
  8  # This file may be distributed and/or modified under the terms of 
  9  # the GNU General Public License version 2 as published by 
 10  # the Free Software Foundation. 
 11  # This file is distributed without any warranty; without even the implied 
 12  # warranty of merchantability or fitness for a particular purpose. 
 13  # See "LICENSE.GPL" in the source distribution for more information. 
 14   
 15  # Licensees having purchased or holding a valid Flumotion Advanced 
 16  # Streaming Server license may use this file in accordance with the 
 17  # Flumotion Advanced Streaming Server Commercial License Agreement. 
 18  # See "LICENSE.Flumotion" in the source distribution for more information. 
 19   
 20  # Headers in this file shall remain intact. 
 21   
 22  """ 
 23  worker-side objects to handle worker clients 
 24  """ 
 25   
 26  import os 
 27  import signal 
 28  import sys 
 29   
 30  from twisted.internet import defer, reactor 
 31   
 32  from flumotion.common import errors, log 
 33  from flumotion.common import messages 
 34  from flumotion.common.i18n import N_, gettexter 
 35  from flumotion.configure import configure 
 36  from flumotion.worker import base 
 37   
  38  __version__ = "$Rev: 6695 $"  # revision keyword string, presumably expanded by the VCS (Subversion $Rev$)
  39  T_ = gettexter()  # translator from flumotion.common.i18n; runCheck below builds user-facing text as T_(N_(...))
 40   
 41   
class ComponentJobAvatar(base.BaseJobAvatar):
    """
    Worker-side avatar for a flumotion-job process that hosts a component.
    """

    def haveMind(self):
        """
        Drive the job once its remote reference is available.

        The job is asked for its pid.  It is then bootstrapped with the
        manager connection details and its bundles, and finally asked to
        create its component.  The outcome is reported to the heaven's start
        set through createSuccess() or createFailed().

        @returns: a deferred for the whole sequence.  Failures in the
                  bootstrap/create steps are logged and reported to the start
                  set; they are not propagated to the caller.
        """

        def askCreate(_, jobInfo):
            self.debug("asking job to create component with avatarId %s,"
                       " type %s", jobInfo.avatarId, jobInfo.type)
            return self.mindCallRemote('create', jobInfo.avatarId,
                                       jobInfo.type, jobInfo.moduleName,
                                       jobInfo.methodName, jobInfo.nice,
                                       jobInfo.conf)

        def createSucceeded(_, avatarId):
            self.debug('job started component with avatarId %s', avatarId)
            # FIXME: reaching into the heaven's start set drills down too far
            self._heaven._startSet.createSuccess(avatarId)

        def createFailed(failure, jobInfo):
            reason = log.getFailureMessage(failure)
            if not failure.check(errors.ComponentCreateError):
                self.warning('unhandled error creating component %s: %s',
                             jobInfo.avatarId, reason)
            else:
                self.warning('could not create component %s of type %s:'
                             ' %s', jobInfo.avatarId, jobInfo.type, reason)
            # FIXME: reaching into the heaven's start set drills down too far
            self._heaven._startSet.createFailed(jobInfo.avatarId, failure)
            # returning None swallows the failure

        def bootstrapJob(pid):
            self.pid = pid
            connInfo = self._heaven.getManagerConnectionInfo()
            transport = 'tcp'
            if connInfo.use_ssl:
                transport = 'ssl'
            jobInfo = self._heaven.getJobInfo(pid)
            workerName = self._heaven.getWorkerName()

            chain = self.mindCallRemote('bootstrap', workerName,
                                        connInfo.host, connInfo.port,
                                        transport, connInfo.authenticator,
                                        jobInfo.bundles)
            chain.addCallback(askCreate, jobInfo)
            chain.addCallback(createSucceeded, jobInfo.avatarId)
            # added last so it also catches bootstrap and create failures
            chain.addErrback(createFailed, jobInfo)
            return chain

        pidDeferred = self.mindCallRemote("getPid")
        pidDeferred.addCallback(bootstrapJob)
        return pidDeferred

    def stop(self):
        """
        Ask the job to stop.

        @returns: a deferred marking completed stop.  If the job has already
                  logged out (no mind), it has already fired with None.
        """
        if self.mind:
            self.debug('stopping')
            return self.mindCallRemote('stop')
        self.debug('already logged out')
        return defer.succeed(None)

    def sendFeed(self, feedName, fd, eaterId):
        """
        Hand fd to the job so that its component feeds feedName into it.

        @returns: whether the fd was successfully handed off to the component.
        """
        self.debug('Sending FD %d to component job to feed %s to fd',
                   fd, feedName)

        # The component may already have logged out.  Checking earlier would
        # only open a race window, so a missing mind is handled here: return
        # False so the caller triggers a disconnect on the fd.
        if not self.mind:
            self.debug('my mind is gone, trigger disconnect')
            return False
        command = "sendFeed %s %s" % (feedName, eaterId)
        return self._sendFileDescriptor(fd, command)

    def receiveFeed(self, eaterAlias, fd, feedId):
        """
        Hand fd to the job so that its eater eaterAlias reads feedId from it.

        @returns: whether the fd was successfully handed off to the component.
        """
        self.debug('Sending FD %d to component job to eat %s from fd',
                   fd, eaterAlias)

        # A missing mind means the component logged out; see sendFeed.
        if not self.mind:
            self.debug('my mind is gone, trigger disconnect')
            return False
        command = "receiveFeed %s %s" % (eaterAlias, feedId)
        return self._sendFileDescriptor(fd, command)

    def perspective_cleanShutdown(self):
        """
        Remote notification sent by the job process when it starts shutting
        down.

        The process may still be running at this point.  It is nevertheless
        OK to accept new start requests for this avatarId, so the start set
        is told via shutdownStart().
        """
        self.info("component %s shutting down cleanly", self.avatarId)
        # FIXME: reaching into the heaven's start set drills down too far
        startSet = self._heaven._startSet
        startSet.shutdownStart(self.avatarId)
151 -class ComponentJobInfo(base.JobInfo):
152 __slots__ = ('conf',) 153
154 - def __init__(self, pid, avatarId, type, moduleName, methodName, 155 nice, bundles, conf):
159 160
class ComponentJobHeaven(base.BaseJobHeaven):
    """
    Job heaven that spawns flumotion-job processes hosting components.
    """
    avatarClass = ComponentJobAvatar

    def getManagerConnectionInfo(self):
        """
        Gets the L{flumotion.common.connection.PBConnectionInfo}
        describing how to connect to the manager.

        @rtype: L{flumotion.common.connection.PBConnectionInfo}
        """
        return self.brain.managerConnectionInfo

    def spawn(self, avatarId, type, moduleName, methodName, nice,
              bundles, conf):
        """
        Spawn a new job.

        This will spawn a new flumotion-job process, running under the
        requested nice level.  When the job logs in, it will be told to
        load bundles and run a function, which is expected to return a
        component.

        @param avatarId: avatarId the component should use to log in
        @type avatarId: str
        @param type: type of component to start
        @type type: str
        @param moduleName: name of the module to create the component from
        @type moduleName: str
        @param methodName: the factory method to use to create the component
        @type methodName: str
        @param nice: nice level
        @type nice: int
        @param bundles: ordered list of (bundleName, bundlePath) for this
                        component
        @type bundles: list of (str, str)
        @param conf: component configuration
        @type conf: dict

        @returns: the deferred returned by the start set's
                  createStart(avatarId).  It is presumably fired once the
                  job's avatar reports createSuccess/createFailed for
                  avatarId.
        """
        d = self._startSet.createStart(avatarId)

        p = base.JobProcessProtocol(self, avatarId, self._startSet)
        executable = os.path.join(configure.bindir, 'flumotion-job')
        if not os.path.exists(executable):
            self.error("Trying to spawn job process, but '%s' does not "
                       "exist", executable)
        argv = [executable, avatarId, self._socketPath]

        realexecutable = executable

        # Optionally run some jobs under valgrind.  It would be nice to make
        # the valgrind arguments configurable, but this will do for now.
        # FLU_VALGRIND_JOB takes a comma-separated list of full component
        # avatar IDs; surrounding whitespace around each entry is ignored.
        if 'FLU_VALGRIND_JOB' in os.environ:
            jobnames = [name.strip() for name in
                        os.environ['FLU_VALGRIND_JOB'].split(',')]
            if avatarId in jobnames:
                realexecutable = 'valgrind'
                # We can't just valgrind flumotion-job; we have to valgrind
                # python running flumotion-job.  Otherwise we'd need
                # --trace-children (not quite sure why), which we don't want.
                argv = ['valgrind', '--leak-check=full', '--num-callers=24',
                        '--leak-resolution=high', '--show-reachable=yes',
                        'python'] + argv

        # the child shares the worker's stdin/stdout/stderr
        childFDs = {0: 0, 1: 1, 2: 2}
        # copy the worker's environment and propagate its debug settings
        env = dict(os.environ)
        env['FLU_DEBUG'] = log.getDebug()
        process = reactor.spawnProcess(p, realexecutable, env=env, args=argv,
                                       childFDs=childFDs)

        p.setPid(process.pid)

        self.addJobInfo(process.pid,
                        ComponentJobInfo(process.pid, avatarId, type,
                                         moduleName, methodName, nice,
                                         bundles, conf))
        return d
class CheckJobAvatar(base.BaseJobAvatar):
    """
    Worker-side avatar for a flumotion-job process used to run checks.
    """

    def haveMind(self):
        """
        Record the job's pid once its remote reference is available.

        The job is asked for its pid.  The pid is stored on the avatar, and
        the start of the job's avatarId is reported as successful to the
        heaven's start set.

        @returns: a deferred that fires once the pid has been recorded.
        """

        def gotPid(pid):
            self.pid = pid
            job = self._heaven.getJobInfo(pid)
            # FIXME: reaching into the heaven's start set drills down too far
            self._heaven._startSet.createSuccess(job.avatarId)

        d = self.mindCallRemote("getPid")
        d.addCallback(gotPid)
        return d

    def stop(self):
        """
        Stop this check job by sending its process SIGTERM.

        The start set is told via shutdownStart() first.

        @returns: a deferred that fires with None once the signal has been
                  sent.  It does not wait for the process to exit.
        """
        self._heaven._startSet.shutdownStart(self.avatarId)
        self._heaven.killJob(self.avatarId, signal.SIGTERM)
        # Honour the documented contract and match ComponentJobAvatar.stop,
        # so callers can always chain on or collect the result.
        return defer.succeed(None)

    def perspective_cleanShutdown(self):
        """
        Remote notification that the job process is shutting down.

        For check jobs this is only logged.
        """
        self.debug("job is stopping")
class CheckJobHeaven(base.BaseJobHeaven):
    """
    Job heaven that runs checks in a pool of reusable flumotion-job
    processes.
    """
    avatarClass = CheckJobAvatar

    # counter used to give each spawned check job a unique 'check-N' avatarId
    _checkCount = 0
    # seconds a check may run before the job is killed; also the idle time
    # after which a pooled job is stopped
    _timeout = 45
    # seconds to wait after SIGTERM before escalating to SIGKILL when a
    # check times out
    _killGrace = 5

    def __init__(self, brain):
        base.BaseJobHeaven.__init__(self, brain)

        # job processes that are available to do work (i.e. not actively
        # running checks), as (jobAvatar, idleExpiryDelayedCall) tuples in
        # FIFO order
        self.jobPool = []

    def getCheckJobFromPool(self):
        """
        Get a check job avatar to run a check in.

        If the pool is non-empty, the oldest idle job is reused and its
        idle-expiry timer is cancelled.  Otherwise a new flumotion-job
        process is spawned with a fresh 'check-N' avatarId.

        @returns: a deferred firing with the job's avatar.  For a new job,
                  this presumably happens once the avatar reports
                  createSuccess to the start set.
        """
        if self.jobPool:
            job, expireDC = self.jobPool.pop(0)
            expireDC.cancel()
            self.debug('running check in already-running job %s',
                       job.avatarId)
            return defer.succeed(job)

        avatarId = 'check-%d' % (self._checkCount,)
        self._checkCount += 1

        self.debug('spawning new job %s to run a check', avatarId)
        d = self._startSet.createStart(avatarId)

        p = base.JobProcessProtocol(self, avatarId, self._startSet)
        executable = os.path.join(configure.bindir, 'flumotion-job')
        argv = [executable, avatarId, self._socketPath]

        # the child shares the worker's stdin/stdout/stderr
        childFDs = {0: 0, 1: 1, 2: 2}
        # copy the worker's environment and propagate its debug settings
        env = dict(os.environ)
        env['FLU_DEBUG'] = log.getDebug()
        process = reactor.spawnProcess(p, executable, env=env, args=argv,
                                       childFDs=childFDs)

        p.setPid(process.pid)
        # Check jobs have no component type, module, method or nice level.
        # Pass None for the type as well, rather than the builtin ``type``.
        jobInfo = base.JobInfo(process.pid, avatarId, None, None, None,
                               None, [])
        self.addJobInfo(process.pid, jobInfo)

        def haveMind(_):
            # we have a mind, in theory; return the job avatar
            return self.avatars[avatarId]

        d.addCallback(haveMind)
        return d

    def runCheck(self, bundles, moduleName, methodName, *args, **kwargs):
        """
        Run moduleName.methodName(*args, **kwargs) in a check job process.

        A job is taken from the pool, or spawned if the pool is empty.  It
        is bootstrapped with bundles and then asked to run the function.

        If no answer has arrived after _timeout seconds, the job is sent
        SIGTERM, and SIGKILL _killGrace seconds later if there is still no
        answer.  Whatever the call eventually yields is then replaced by a
        L{messages.Result} carrying a "Check timed out." error.

        Otherwise, the job goes back into the pool.  It is stopped if it
        stays idle for _timeout seconds.

        @returns: a deferred firing with the function's result, its failure,
                  or the timeout Result described above.
        """
        def haveJob(job):
            def callProc(_):
                return job.mindCallRemote('runFunction', moduleName,
                                          methodName, *args, **kwargs)

            def timeout(sig):
                self.killJobByPid(job.pid, sig)

            def haveResult(res):
                if not termtimeout.active():
                    # the check timed out: the job was signalled and is not
                    # returned to the pool
                    self.info("Discarding error %s", res)
                    res = messages.Result()
                    res.add(messages.Error(T_(N_("Check timed out.")),
                        debug=("Timed out running %s." % methodName)))
                else:
                    def expire():
                        if (job, expireDC) in self.jobPool:
                            self.debug('stopping idle check job process %s',
                                       job.avatarId)
                            self.jobPool.remove((job, expireDC))
                            job.mindCallRemote('stop')
                    expireDC = reactor.callLater(self._timeout, expire)
                    self.jobPool.append((job, expireDC))

                if termtimeout.active():
                    termtimeout.cancel()
                if killtimeout.active():
                    killtimeout.cancel()
                return res

            # Schedule the kill timers before starting the call chain, so
            # they exist even if haveResult runs synchronously.  SIGKILL is
            # delayed by _killGrace so the job gets a chance to honour
            # SIGTERM first.
            termtimeout = reactor.callLater(self._timeout, timeout,
                                            signal.SIGTERM)
            killtimeout = reactor.callLater(self._timeout + self._killGrace,
                                            timeout, signal.SIGKILL)

            d = job.mindCallRemote('bootstrap', self.getWorkerName(),
                                   None, None, None, None, bundles)
            d.addCallback(callProc)
            d.addCallbacks(haveResult, haveResult)
            return d

        d = self.getCheckJobFromPool()
        d.addCallback(haveJob)

        return d
319 320 def timeout(sig): 321 self.killJobByPid(job.pid, sig) 322 323 def haveResult(res): 324 if not termtimeout.active(): 325 self.info("Discarding error %s", res) 326 res = messages.Result() 327 res.add(messages.Error(T_(N_("Check timed out.")), 328 debug=("Timed out running %s."%methodName))) 329 else: 330 def expire(): 331 if (job, expireDC) in self.jobPool: 332 self.debug('stopping idle check job process %s', 333 job.avatarId) 334 self.jobPool.remove((job, expireDC)) 335 job.mindCallRemote('stop') 336 expireDC = reactor.callLater(self._timeout, expire) 337 self.jobPool.append((job, expireDC)) 338 339 if termtimeout.active(): 340 termtimeout.cancel() 341 if killtimeout.active(): 342 killtimeout.cancel() 343 return res 344 345 # add callbacks and errbacks that kill the job 346 347 termtimeout = reactor.callLater(self._timeout, timeout, 348 signal.SIGTERM) 349 killtimeout = reactor.callLater(self._timeout, timeout, 350 signal.SIGKILL) 351 352 d = job.mindCallRemote('bootstrap', self.getWorkerName(), 353 None, None, None, None, bundles) 354 d.addCallback(callProc) 355 d.addCallbacks(haveResult, haveResult) 356 return d 357 358 d = self.getCheckJobFromPool() 359 d.addCallback(haveJob) 360 361 return d 362