Package flumotion :: Package worker :: Module base
[hide private]

Source Code for Module flumotion.worker.base

  1  # -*- Mode: Python; test-case-name:flumotion.test.test_worker_worker -*- 
  2  # vi:si:et:sw=4:sts=4:ts=4 
  3  # 
  4  # Flumotion - a streaming media server 
  5  # Copyright (C) 2004,2005,2006,2007 Fluendo, S.L. (www.fluendo.com). 
  6  # All rights reserved. 
  7   
  8  # This file may be distributed and/or modified under the terms of 
  9  # the GNU General Public License version 2 as published by 
 10  # the Free Software Foundation. 
 11  # This file is distributed without any warranty; without even the implied 
 12  # warranty of merchantability or fitness for a particular purpose. 
 13  # See "LICENSE.GPL" in the source distribution for more information. 
 14   
 15  # Licensees having purchased or holding a valid Flumotion Advanced 
 16  # Streaming Server license may use this file in accordance with the 
 17  # Flumotion Advanced Streaming Server Commercial License Agreement. 
 18  # See "LICENSE.Flumotion" in the source distribution for more information. 
 19   
 20  # Headers in this file shall remain intact. 
 21   
 22  """ 
 23  worker-side objects to handle worker clients 
 24  """ 
 25   
 26  import os 
 27  import sys 
 28  import signal 
 29   
 30  from twisted.cred import portal 
 31  from twisted.internet import defer, reactor 
 32  from twisted.spread import pb 
 33  from zope.interface import implements 
 34   
 35  from flumotion.common import errors, log 
 36  from flumotion.common import worker, startset 
 37  from flumotion.common.process import signalPid 
 38  from flumotion.twisted import checkers, fdserver 
 39  from flumotion.twisted import pb as fpb 
 40   
 41  __version__ = "$Rev: 6691 $" 
 42   
 43  JOB_SHUTDOWN_TIMEOUT = 5 
 44   
 45   
def _getSocketPath():
    """
    Reserve a unique path in the temporary directory for the worker's
    socket.

    The path is obtained by letting mkstemp create a regular file whose
    name starts with 'flumotion.worker.' and ends with '.<pid>'; the file
    descriptor is closed again, but the file itself is left on disk.
    The caller needs to delete the file before using the name.

    FIXME: there is no mkstemp equivalent for sockets, so there is a small
    window in which something else could create a socket under this name.
    Preparing a socket file with that name up front did not work either.

    @returns: the path of the freshly created placeholder file
    @rtype:   str
    """
    import tempfile

    pidSuffix = '.%d' % os.getpid()
    handle, path = tempfile.mkstemp(suffix=pidSuffix,
                                    prefix='flumotion.worker.')
    # only the name is of interest, not the open descriptor
    os.close(handle)
    return path
57 58
class JobInfo(object):
    """
    I am a plain record describing one job.

    Only the attributes listed in __slots__ can be set on me.

    @ivar pid:        PID of the child process
    @type pid:        int
    @ivar avatarId:   avatar identification string
    @type avatarId:   str
    @ivar type:       type of the component to create
    @type type:       str
    @ivar moduleName: name of the module to create the component from
    @type moduleName: str
    @ivar methodName: the factory method to use to create the component
    @type methodName: str
    @ivar nice:       the nice level to run the job as
    @type nice:       int
    @ivar bundles:    ordered list of (bundleName, bundlePath) needed to
                      create the component
    @type bundles:    list of (str, str)
    """

    __slots__ = ('pid', 'avatarId', 'type', 'moduleName', 'methodName',
                 'nice', 'bundles')

    def __init__(self, pid, avatarId, type, moduleName, methodName, nice,
                 bundles):
        """
        Store every argument on the attribute of the same name.
        """
        (self.pid, self.avatarId, self.type, self.moduleName,
         self.methodName, self.nice, self.bundles) = (
            pid, avatarId, type, moduleName, methodName, nice, bundles)
90
class JobProcessProtocol(worker.ProcessProtocol):
    """
    I am the process protocol for a job process spawned by the worker.

    When the process ends I settle the start set entries for my avatarId
    and tell the heaven that the job has stopped.
    """

    def __init__(self, heaven, avatarId, startSet):
        """
        @param heaven:   the job heaven; must provide getWorkerName()
        @param avatarId: avatar identification string of the job
        @param startSet: start set on which the creation of this job is
                         registered
        """
        self._startSet = startSet
        # remembered so that processEnded can tell whether the create
        # deferred is still the one handed out here
        self._deferredStart = startSet.createRegistered(avatarId)
        worker.ProcessProtocol.__init__(
            self, heaven, avatarId, 'component', heaven.getWorkerName())

    def sendMessage(self, message):
        """
        Hand the message to the worker brain through its
        'componentAddMessage' remote method, together with my avatarId.
        """
        # self.loggable is used as the heaven here; presumably stored by
        # worker.ProcessProtocol.__init__ -- confirm against that class
        jobHeaven = self.loggable
        jobHeaven.brain.callRemote('componentAddMessage', self.avatarId,
                                   message)

    def processEnded(self, status):
        """
        Clean up after the job process has exited.

        @param status: exit status; status.value.signal is read to find
                       out whether a signal ended the process
        """
        jobHeaven = self.loggable
        startSet = self._startSet
        signum = status.value.signal

        # We need to trigger a failure on the create deferred if the job
        # failed before logging in to the worker; otherwise the manager
        # still thinks it's starting up when it's dead.  If the job
        # already attached to the worker however, the create deferred
        # will already have callbacked.
        if startSet.createRegistered(self.avatarId) is self._deferredStart:
            reason = "unknown reason"
            if signum:
                reason = "received signal %d" % signum
            text = ("Component '%s' has exited early (%s)." %
                    (self.avatarId, reason))
            startSet.createFailed(self.avatarId,
                                  errors.ComponentCreateError(text))

        if startSet.shutdownRegistered(self.avatarId):
            startSet.shutdownSuccess(self.avatarId)

        jobHeaven.jobStopped(self.pid)

        # chain up
        worker.ProcessProtocol.processEnded(self, status)
132 133
class BaseJobHeaven(pb.Root, log.Loggable):
    """
    I am similar to but not quite the same as a manager-side Heaven.
    I manage avatars inside the worker for job processes spawned by the
    worker.

    @cvar avatarClass: class instantiated for every job that logs in;
                       has to be set before L{listen} is called
    @ivar avatars:     dict of avatarId -> avatar
    @type avatars:     dict of str -> L{base.BaseJobAvatar}
    @ivar brain:       the worker brain
    @type brain:       L{worker.WorkerBrain}
    """

    logCategory = "job-heaven"
    implements(portal.IRealm)

    avatarClass = None

    def __init__(self, brain):
        """
        @param brain: a reference to the worker brain
        @type  brain: L{worker.WorkerBrain}
        """
        def hasAvatar(avatarId):
            # predicate handed to the start set
            return avatarId in self.avatars

        self.avatars = {} # componentId -> avatar
        self.brain = brain
        self._socketPath = _getSocketPath()
        self._port = None
        # If set, a deferred to fire when our last child process exits
        self._onShutdown = None

        self._jobInfos = {} # processid -> JobInfo

        self._startSet = startset.StartSet(
            hasAvatar,
            errors.ComponentAlreadyStartingError,
            errors.ComponentAlreadyRunningError)

    def listen(self):
        """
        Start accepting logins from job processes on my socket path.

        Any file already present at the socket path is removed first.
        """
        assert self._port is None
        assert self.avatarClass is not None
        # FIXME: we should hand a username and password to log in with to
        # the job process instead of allowing anonymous
        passwordless = checkers.FlexibleCredentialsChecker()
        passwordless.allowPasswordless(True)
        factory = pb.PBServerFactory(portal.Portal(self, [passwordless]))

        try:
            os.unlink(self._socketPath)
        except OSError:
            # nothing usable to remove; carry on and try to listen
            pass

        # Rather than a listenUNIX(), we use listenWith so that we can
        # specify our particular Port, which creates Transports that we
        # know how to pass FDs over.
        self._port = reactor.listenWith(fdserver.FDPort, self._socketPath,
                                        factory)

    ### portal.IRealm method

    def requestAvatar(self, avatarId, mind, *interfaces):
        """
        Create and register an avatar of avatarClass for a job logging in.

        @returns: (pb.IPerspective, avatar, logout callable)
        @raises NotImplementedError: if pb.IPerspective was not requested
        """
        if pb.IPerspective not in interfaces:
            raise NotImplementedError("no interface")

        newAvatar = self.avatarClass(self, avatarId, mind)
        assert avatarId not in self.avatars
        self.avatars[avatarId] = newAvatar
        return pb.IPerspective, newAvatar, newAvatar.logout

    def removeAvatar(self, avatarId):
        """
        Forget the avatar with the given id; warn if it is not known.
        """
        if avatarId not in self.avatars:
            self.warning("some programmer is telling me about an avatar "
                         "I have no idea about: %s", avatarId)
            return

        del self.avatars[avatarId]

    def getWorkerName(self):
        """
        Gets the name of the worker that spawns the process.

        @rtype: str
        """
        return self.brain.workerName

    def addJobInfo(self, processId, jobInfo):
        """
        Record the job info under the given process id, replacing any
        entry already stored for it.
        """
        self._jobInfos[processId] = jobInfo

    def getJobInfo(self, processId):
        """
        Return the job info stored for the given process id.

        @raises KeyError: if nothing is stored for processId
        """
        return self._jobInfos[processId]

    def getJobInfos(self):
        """
        Return the stored job infos.
        """
        return self._jobInfos.values()

    def getJobPids(self):
        """
        Return the process ids for which a job info is stored.
        """
        return self._jobInfos.keys()

    def rotateChildLogFDs(self):
        """
        Make every avatar log to the current sys.stdout and sys.stderr
        file descriptors.
        """
        self.debug('telling kids about new log file descriptors')
        for jobAvatar in self.avatars.values():
            jobAvatar.logTo(sys.stdout.fileno(), sys.stderr.fileno())

    def jobStopped(self, pid):
        """
        Drop the job info of a stopped process.

        If that was the last job info and a shutdown is pending, the
        shutdown deferred is fired.  An unknown pid only gets a warning.
        """
        if pid not in self._jobInfos:
            self.warning("some programmer is telling me about a pid "
                         "I have no idea about: %d", pid)
            return

        self.debug('Removing job info for %d', pid)
        del self._jobInfos[pid]

        if self._jobInfos or not self._onShutdown:
            # still jobs left, or nobody is waiting for the last one
            return

        self.debug("Last child exited")
        self._onShutdown.callback(None)

    def shutdown(self):
        """
        Stop all jobs and then stop listening.

        Every avatar is asked to stop.  If there are avatars, kill() is
        scheduled after JOB_SHUTDOWN_TIMEOUT and the returned deferred
        waits for the last job to be reported through jobStopped();
        otherwise the returned deferred has already succeeded.

        @returns: a deferred fired once the port has stopped listening
        @rtype:   L{twisted.internet.defer.Deferred}
        """
        self.debug('Shutting down JobHeaven')
        self.debug('Stopping all jobs')
        for jobAvatar in self.avatars.values():
            jobAvatar.stop()

        if not self.avatars:
            # everything's gone already, return success
            done = defer.succeed(None)
        else:
            # If our jobs fail to shut down nicely within some period of
            # time, shut them down less nicely
            killTimer = reactor.callLater(JOB_SHUTDOWN_TIMEOUT, self.kill)

            def cancelKillTimer(result):
                # be nice to unit tests
                if killTimer.active():
                    killTimer.cancel()
                return result

            done = self._onShutdown = defer.Deferred()
            done.addCallback(cancelKillTimer)

        def closePort(_):
            # possible for it to be None, if we haven't been told to
            # listen yet, as in some test cases
            if self._port:
                listeningPort, self._port = self._port, None
                return listeningPort.stopListening()

        done.addCallback(closePort)
        return done

    def kill(self, signum=signal.SIGKILL):
        """
        Send the given signal to every job I have a job info for.

        @param signum: the signal to send; SIGKILL by default
        @type  signum: int
        """
        self.warning("Killing all children immediately")
        for jobPid in self.getJobPids():
            self.killJobByPid(jobPid, signum)

    def killJobByPid(self, pid, signum):
        """
        Send the given signal to the job with the given process id.

        @raises errors.UnknownComponentError: if no job info is stored for
                                              pid
        """
        try:
            info = self._jobInfos[pid]
        except KeyError:
            raise errors.UnknownComponentError(pid)

        self.debug("Sending signal %d to job %s at pid %d", signum,
                   info.avatarId, info.pid)
        signalPid(info.pid, signum)

    def killJob(self, avatarId, signum):
        """
        Send the given signal to every job whose job info carries the
        given avatarId.
        """
        for info in self._jobInfos.values():
            if info.avatarId == avatarId:
                self.killJobByPid(info.pid, signum)
293 294
class BaseJobAvatar(fpb.Avatar, log.Loggable):
    """
    I am an avatar for the job living in the worker.

    @ivar pid: set to None on construction; nothing in this class assigns
               it afterwards
    """
    logCategory = 'job-avatar'

    def __init__(self, heaven, avatarId, mind):
        """
        @type  heaven:   L{flumotion.worker.base.BaseJobHeaven}
        @type  avatarId: str
        @param mind:     reference to the job's JobMedium, see L{setMind}
        """
        fpb.Avatar.__init__(self, avatarId)
        self._heaven = heaven
        # note: setMind() triggers haveMind() before self.pid is reset
        self.setMind(mind)
        self.pid = None

    def setMind(self, mind):
        """
        Store the mind and notify myself through haveMind().

        @param mind: reference to the job's JobMedium on which we can call
        @type  mind: L{twisted.spread.pb.RemoteReference}
        """
        fpb.Avatar.setMind(self, mind)
        self.haveMind()

    def haveMind(self):
        """
        Hook called at the end of setMind(); does nothing here.
        """
        # implement me in subclasses
        pass

    def logout(self):
        """
        Remove myself from the heaven's avatars.
        """
        self.log('logout called, %s disconnected', self.avatarId)

        self._heaven.removeAvatar(self.avatarId)

    def stop(self):
        """
        Stop the job; has to be implemented by subclasses.

        returns: a deferred marking completed stop.
        """
        raise NotImplementedError

    def _sendFileDescriptor(self, fd, message):
        """
        Pass a file descriptor to the job over the mind's transport.

        @param fd:      the file descriptor to send
        @param message: message sent along with the descriptor
        @type  message: str

        @returns: True if sending did not raise, False if a RuntimeError
                  was raised (a warning is logged in that case)
        @rtype:   bool
        """
        try:
            # FIXME: pay attention to the return value of
            # sendFileDescriptor; is the same as the return value of
            # sendmsg(2)
            self.mind.broker.transport.sendFileDescriptor(fd, message)
            return True
        except RuntimeError:
            # RuntimeError is what is thrown by the C code doing this
            # when there are issues.  The instance is fetched through
            # sys.exc_info() so that this handler does not depend on a
            # version-specific "except" binding syntax.
            e = sys.exc_info()[1]
            self.warning("RuntimeError %s sending file descriptors",
                         log.getExceptionMessage(e))
            return False

    def logTo(self, stdout, stderr):
        """
        Tell the job to log to the given file descriptors.

        Nothing is sent when I have no mind.

        @param stdout: file descriptor the job should use as its stdout
        @param stderr: file descriptor the job should use as its stderr
        """
        self.debug('Giving job new stdout and stderr')
        if self.mind:
            self._sendFileDescriptor(stdout, "redirectStdout")
            # send the stderr descriptor here; the stdout descriptor used
            # to be sent a second time, leaving the stderr argument unused
            self._sendFileDescriptor(stderr, "redirectStderr")
356