| Trees | Indices | Help |
|---|
|
|
# -*- Mode: Python; test-case-name:flumotion.test.test_worker_worker -*-
# vi:si:et:sw=4:sts=4:ts=4
#
# Flumotion - a streaming media server
# Copyright (C) 2004,2005,2006,2007 Fluendo, S.L. (www.fluendo.com).
# All rights reserved.

# This file may be distributed and/or modified under the terms of
# the GNU General Public License version 2 as published by
# the Free Software Foundation.
# This file is distributed without any warranty; without even the implied
# warranty of merchantability or fitness for a particular purpose.
# See "LICENSE.GPL" in the source distribution for more information.

# Licensees having purchased or holding a valid Flumotion Advanced
# Streaming Server license may use this file in accordance with the
# Flumotion Advanced Streaming Server Commercial License Agreement.
# See "LICENSE.Flumotion" in the source distribution for more information.

# Headers in this file shall remain intact.

"""
worker-side objects to handle worker clients
"""

import os
import sys
import signal

from twisted.cred import portal
from twisted.internet import defer, reactor
from twisted.spread import pb
from zope.interface import implements

from flumotion.common import errors, log
from flumotion.common import worker, startset
from flumotion.common.process import signalPid
from flumotion.twisted import checkers, fdserver
from flumotion.twisted import pb as fpb

__version__ = "$Rev: 6691 $"

# Seconds to wait for jobs to stop on their own before BaseJobHeaven.shutdown
# falls back to kill().
JOB_SHUTDOWN_TIMEOUT = 5

# NOTE(review): this module was recovered from a rendered source listing in
# which every "def"/"class" header line (and a handful of very short methods)
# had been elided.  Headers were re-derived from the bodies, docstrings and
# call sites; anything that could not be derived that way carries its own
# NOTE(review) marker below and must be confirmed against the original module.


def _getSocketPath():
    """
    Pick a unique filesystem path to be used for the worker's job socket.

    A temporary file is created (and immediately closed) to reserve the
    name; the file itself is left on disk.

    @returns: the path of the created temporary file
    @rtype:   str
    """
    # FIXME: there is no mkstemp for sockets, so we have a small window
    # here in which the socket could be created by something else
    # I didn't succeed in preparing a socket file with that name either

    # caller needs to delete name before using
    import tempfile
    fd, name = tempfile.mkstemp('.%d' % os.getpid(), 'flumotion.worker.')
    os.close(fd)

    return name


class JobInfo(object):
    """
    I hold information about a job.

    @cvar  pid:        PID of the child process
    @type  pid:        int
    @cvar  avatarId:   avatar identification string
    @type  avatarId:   str
    @cvar  type:       type of the component to create
    @type  type:       str
    @cvar  moduleName: name of the module to create the component from
    @type  moduleName: str
    @cvar  methodName: the factory method to use to create the component
    @type  methodName: str
    @cvar  nice:       the nice level to run the job as
    @type  nice:       int
    @cvar  bundles:    ordered list of (bundleName, bundlePath) needed to
                       create the component
    @type  bundles:    list of (str, str)
    """
    __slots__ = ('pid', 'avatarId', 'type', 'moduleName', 'methodName',
                 'nice', 'bundles')

    # NOTE(review): parameter order is assumed to follow __slots__ -- confirm.
    def __init__(self, pid, avatarId, type, moduleName, methodName, nice,
                 bundles):
        """
        Store the given values on the attributes of the same name.
        """
        self.pid = pid
        self.avatarId = avatarId
        self.type = type
        self.moduleName = moduleName
        self.methodName = methodName
        self.nice = nice
        self.bundles = bundles


# NOTE(review): class name reconstructed; the base class is taken from the
# explicit worker.ProcessProtocol.__init__/processEnded calls below.
class JobProcessProtocol(worker.ProcessProtocol):
    """
    I am the process protocol for a job process spawned by the worker.

    When the process ends I settle the job's entries in the start set and
    tell the heaven that the job has stopped.
    """

    # NOTE(review): parameter order reconstructed from the body -- confirm.
    def __init__(self, heaven, avatarId, startSet):
        """
        @param heaven:   the job heaven; passed on to the base class and
                         used for its getWorkerName()
        @param avatarId: avatar identification string of the job
        @type  avatarId: str
        @param startSet: the start set tracking creation/shutdown of jobs
        """
        self._startSet = startSet
        # Remember the create deferred that is current at spawn time, so
        # processEnded can tell whether it is still the one registered.
        self._deferredStart = startSet.createRegistered(avatarId)
        worker.ProcessProtocol.__init__(self, heaven, avatarId,
                                        'component',
                                        heaven.getWorkerName())

    # NOTE(review): method name reconstructed -- confirm against the hook
    # expected by worker.ProcessProtocol.
    def sendMessage(self, message):
        """
        Forward a message about this job's component to the worker brain.

        @param message: the message, handed to the brain's
                        'componentAddMessage' remote call
        """
        heaven = self.loggable
        heaven.brain.callRemote('componentAddMessage', self.avatarId,
                                message)

    def processEnded(self, status):
        """
        Called when the job process has ended.

        @param status: the exit status; status.value.signal is read to
                       report a terminating signal, if any
        """
        heaven = self.loggable
        dstarts = self._startSet
        signum = status.value.signal

        # we need to trigger a failure on the create deferred
        # if the job failed before logging in to the worker;
        # otherwise the manager still thinks it's starting up when it's
        # dead.  If the job already attached to the worker however,
        # the create deferred will already have callbacked.
        deferred = dstarts.createRegistered(self.avatarId)
        if deferred is self._deferredStart:
            if signum:
                reason = "received signal %d" % signum
            else:
                reason = "unknown reason"
            text = ("Component '%s' has exited early (%s)." %
                    (self.avatarId, reason))
            dstarts.createFailed(self.avatarId,
                                 errors.ComponentCreateError(text))

        if dstarts.shutdownRegistered(self.avatarId):
            dstarts.shutdownSuccess(self.avatarId)

        heaven.jobStopped(self.pid)

        # chain up
        worker.ProcessProtocol.processEnded(self, status)


# NOTE(review): base classes reconstructed (self.debug/self.warning imply a
# Loggable; pb.Root is an assumption) -- confirm.
class BaseJobHeaven(pb.Root, log.Loggable):
    """
    I am similar to but not quite the same as a manager-side Heaven.
    I manage avatars inside the worker for job processes spawned by the worker.

    @ivar avatars: dict of avatarId -> avatar
    @type avatars: dict of str -> L{base.BaseJobAvatar}
    @ivar brain:   the worker brain
    @type brain:   L{worker.WorkerBrain}
    """

    logCategory = "job-heaven"
    implements(portal.IRealm)

    # Subclasses must set this to the avatar class to instantiate in
    # requestAvatar; listen() asserts that it is not None.
    avatarClass = None

    def __init__(self, brain):
        """
        @param brain: a reference to the worker brain
        @type  brain: L{worker.WorkerBrain}
        """
        self.avatars = {} # componentId -> avatar
        self.brain = brain
        self._socketPath = _getSocketPath()
        self._port = None
        self._onShutdown = None # If set, a deferred to fire when our last child
                                # process exits

        self._jobInfos = {} # processid -> JobInfo

        self._startSet = startset.StartSet(lambda x: x in self.avatars,
                                           errors.ComponentAlreadyStartingError,
                                           errors.ComponentAlreadyRunningError)

    # NOTE(review): method name reconstructed -- confirm.
    def listen(self):
        """
        Start listening for job logins on our socket path.

        Must only be called once, on a subclass that sets avatarClass.
        """
        assert self._port is None
        assert self.avatarClass is not None
        # FIXME: we should hand a username and password to log in with to
        # the job process instead of allowing anonymous
        checker = checkers.FlexibleCredentialsChecker()
        checker.allowPasswordless(True)
        p = portal.Portal(self, [checker])
        f = pb.PBServerFactory(p)
        # _getSocketPath left a placeholder file behind; remove it so the
        # path is free for the listening socket.
        try:
            os.unlink(self._socketPath)
        except OSError:
            pass

        # Rather than a listenUNIX(), we use listenWith so that we can specify
        # our particular Port, which creates Transports that we know how to
        # pass FDs over.
        port = reactor.listenWith(fdserver.FDPort, self._socketPath, f)
        self._port = port

    ### portal.IRealm method
    def requestAvatar(self, avatarId, mind, *interfaces):
        """
        Create and register an avatar for a job that is logging in.

        @returns: (pb.IPerspective, avatar, avatar.logout)
        @raises NotImplementedError: if pb.IPerspective was not requested
        """
        if pb.IPerspective in interfaces:
            avatar = self.avatarClass(self, avatarId, mind)
            assert avatarId not in self.avatars
            self.avatars[avatarId] = avatar
            return pb.IPerspective, avatar, avatar.logout
        else:
            raise NotImplementedError("no interface")

    def removeAvatar(self, avatarId):
        """
        Forget the avatar with the given id; warn if it is unknown.

        @type avatarId: str
        """
        if avatarId in self.avatars:
            del self.avatars[avatarId]
        else:
            self.warning("some programmer is telling me about an avatar "
                         "I have no idea about: %s", avatarId)

    def getWorkerName(self):
        """
        Gets the name of the worker that spawns the process.

        @rtype: str
        """
        return self.brain.workerName

    # NOTE(review): the next three accessors were entirely elided from the
    # listing (three two-line gaps).  They are reconstructed as the obvious
    # accessors on self._jobInfos, which nothing else visible here
    # populates -- confirm names and bodies against the original module.
    def addJobInfo(self, processPid, jobInfo):
        self._jobInfos[processPid] = jobInfo

    def getJobInfo(self, processPid):
        return self._jobInfos[processPid]

    def getJobInfos(self):
        return self._jobInfos.values()

    def getJobPids(self):
        """
        @returns: the pids of all jobs we hold a JobInfo for
        """
        return self._jobInfos.keys()

    # NOTE(review): method name reconstructed -- confirm.
    def rotateChildLogFDs(self):
        """
        Hand our current stdout and stderr descriptors to every job avatar.
        """
        self.debug('telling kids about new log file descriptors')
        for avatar in self.avatars.values():
            avatar.logTo(sys.stdout.fileno(), sys.stderr.fileno())

    def jobStopped(self, pid):
        """
        Drop the JobInfo for a job process that has ended.

        If a shutdown is pending and this was the last job, the shutdown
        deferred is fired.  Warns if the pid is unknown.

        @type pid: int
        """
        if pid in self._jobInfos:
            self.debug('Removing job info for %d', pid)
            del self._jobInfos[pid]

            if not self._jobInfos and self._onShutdown:
                self.debug("Last child exited")
                self._onShutdown.callback(None)
        else:
            self.warning("some programmer is telling me about a pid "
                         "I have no idea about: %d", pid)

    # NOTE(review): method name reconstructed -- confirm.
    def shutdown(self):
        """
        Stop all jobs, then stop listening.

        @returns: a deferred that fires after the last job has exited (or
                  right away if there are no avatars) and the listening
                  port, if any, has been stopped
        @rtype:   L{twisted.internet.defer.Deferred}
        """
        self.debug('Shutting down JobHeaven')
        self.debug('Stopping all jobs')
        for avatar in self.avatars.values():
            avatar.stop()

        if self.avatars:
            # If our jobs fail to shut down nicely within some period of
            # time, shut them down less nicely
            dc = reactor.callLater(JOB_SHUTDOWN_TIMEOUT, self.kill)
            def cancelDelayedCall(res, dc):
                # be nice to unit tests
                if dc.active():
                    dc.cancel()
                return res

            self._onShutdown = defer.Deferred()
            self._onShutdown.addCallback(cancelDelayedCall, dc)
            ret = self._onShutdown
        else:
            # everything's gone already, return success
            ret = defer.succeed(None)

        def stopListening(_):
            # possible for it to be None, if we haven't been told to
            # listen yet, as in some test cases
            if self._port:
                port = self._port
                self._port = None
                return port.stopListening()
        ret.addCallback(stopListening)
        return ret

    # NOTE(review): the default for signum is reconstructed; shutdown()
    # schedules self.kill without arguments, so a default must exist.
    # SIGKILL is assumed from the "immediately" wording -- confirm.
    def kill(self, signum=signal.SIGKILL):
        """
        Send a signal to every job process we know about.

        @param signum: the signal number to send
        @type  signum: int
        """
        self.warning("Killing all children immediately")
        for pid in self.getJobPids():
            self.killJobByPid(pid, signum)

    def killJobByPid(self, pid, signum):
        """
        Send a signal to the job process with the given pid.

        @type  pid:    int
        @type  signum: int
        @raises errors.UnknownComponentError: if we hold no JobInfo for pid
        """
        if pid not in self._jobInfos:
            raise errors.UnknownComponentError(pid)

        jobInfo = self._jobInfos[pid]
        self.debug("Sending signal %d to job %s at pid %d", signum,
                   jobInfo.avatarId, jobInfo.pid)
        signalPid(jobInfo.pid, signum)

    # NOTE(review): method name reconstructed -- confirm.
    def killJob(self, avatarId, signum):
        """
        Send a signal to every job process registered under avatarId.

        @type  avatarId: str
        @type  signum:   int
        """
        for job in self._jobInfos.values():
            if job.avatarId == avatarId:
                self.killJobByPid(job.pid, signum)


# NOTE(review): base classes reconstructed (fpb.Avatar from the explicit
# fpb.Avatar.__init__/setMind calls; Loggable assumed) -- confirm.
class BaseJobAvatar(fpb.Avatar, log.Loggable):
    """
    I am an avatar for the job living in the worker.
    """
    logCategory = 'job-avatar'

    def __init__(self, heaven, avatarId, mind):
        """
        @type  heaven:   L{flumotion.worker.base.BaseJobHeaven}
        @type  avatarId: str
        """
        fpb.Avatar.__init__(self, avatarId)
        self._heaven = heaven
        self.setMind(mind)
        self.pid = None

    def setMind(self, mind):
        """
        @param mind: reference to the job's JobMedium on which we can call
        @type  mind: L{twisted.spread.pb.RemoteReference}
        """
        fpb.Avatar.setMind(self, mind)
        self.haveMind()

    # NOTE(review): body elided in the listing; reconstructed as a no-op
    # hook because setMind() calls it unconditionally -- confirm.
    def haveMind(self):
        # implement me in subclasses
        pass

    def logout(self):
        """
        Called when the job disconnects; unregisters us from the heaven.
        """
        self.log('logout called, %s disconnected', self.avatarId)

        self._heaven.removeAvatar(self.avatarId)

    # NOTE(review): body elided in the listing; reconstructed as an abstract
    # method because BaseJobHeaven.shutdown() calls avatar.stop() -- confirm.
    def stop(self):
        """
        Stop the job.  To be implemented by subclasses.
        """
        raise NotImplementedError

    def _sendFileDescriptor(self, fd, message):
        """
        Pass a file descriptor to the job over the mind's transport.

        @param fd:      the file descriptor to send
        @param message: the message to send along with the descriptor
        @returns: True if sending raised no RuntimeError, False otherwise
        @rtype:   bool
        """
        try:
            # FIXME: pay attention to the return value of
            # sendFileDescriptor; is the same as the return value of
            # sendmsg(2)
            self.mind.broker.transport.sendFileDescriptor(fd, message)
            return True
        except RuntimeError as e:
            # RuntimeError is what is thrown by the C code doing this
            # when there are issues
            self.warning("RuntimeError %s sending file descriptors",
                         log.getExceptionMessage(e))
            return False

    def logTo(self, stdout, stderr):
        """
        Tell the job to log to the given file descriptors.

        @param stdout: file descriptor sent with "redirectStdout"
        @param stderr: file descriptor sent with "redirectStderr"
        """
        self.debug('Giving job new stdout and stderr')
        if self.mind:
            self._sendFileDescriptor(stdout, "redirectStdout")
            # Send the stderr descriptor here; sending stdout a second time
            # would leave the job's stderr pointing at the stdout target.
            self._sendFileDescriptor(stderr, "redirectStderr")
| Trees | Indices | Help |
|---|
| Generated by Epydoc 3.0.1 on Sat Jul 26 09:43:05 2008 | http://epydoc.sourceforge.net |