1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 """
23 worker-side objects to handle worker clients
24 """
25
26 import os
27 import signal
28 import sys
29
30 from twisted.internet import defer, reactor
31
32 from flumotion.common import errors, log
33 from flumotion.common import messages
34 from flumotion.common.i18n import N_, gettexter
35 from flumotion.configure import configure
36 from flumotion.worker import base
37
38 __version__ = "$Rev: 6695 $"
39 T_ = gettexter()
40
41
44 def bootstrap(*args):
45 return self.mindCallRemote('bootstrap', *args)
46
47 def create(_, job):
48 self.debug("asking job to create component with avatarId %s,"
49 " type %s", job.avatarId, job.type)
50 return self.mindCallRemote('create', job.avatarId, job.type,
51 job.moduleName, job.methodName,
52 job.nice, job.conf)
53
54 def success(_, avatarId):
55 self.debug('job started component with avatarId %s',
56 avatarId)
57
58 self._heaven._startSet.createSuccess(avatarId)
59
60 def error(failure, job):
61 msg = log.getFailureMessage(failure)
62 if failure.check(errors.ComponentCreateError):
63 self.warning('could not create component %s of type %s:'
64 ' %s', job.avatarId, job.type, msg)
65 else:
66 self.warning('unhandled error creating component %s: %s',
67 job.avatarId, msg)
68
69 self._heaven._startSet.createFailed(job.avatarId, failure)
70
71 def gotPid(pid):
72 self.pid = pid
73 info = self._heaven.getManagerConnectionInfo()
74 if info.use_ssl:
75 transport = 'ssl'
76 else:
77 transport = 'tcp'
78 job = self._heaven.getJobInfo(pid)
79 workerName = self._heaven.getWorkerName()
80
81 d = bootstrap(workerName, info.host, info.port, transport,
82 info.authenticator, job.bundles)
83 d.addCallback(create, job)
84 d.addCallback(success, job.avatarId)
85 d.addErrback(error, job)
86 return d
87 d = self.mindCallRemote("getPid")
88 d.addCallback(gotPid)
89 return d
90
92 """
93 returns: a deferred marking completed stop.
94 """
95 if not self.mind:
96 self.debug('already logged out')
97 return defer.succeed(None)
98 else:
99 self.debug('stopping')
100 return self.mindCallRemote('stop')
101
def sendFeed(self, feedName, fd, eaterId):
    """
    Ask the component job to send the named feed over the given file
    descriptor.

    @param feedName: name of the feed to be sent
    @param fd:       file descriptor to hand to the component job
                     (logged with C{%d}, so expected to be an integer)
    @param eaterId:  identifier placed after the feed name in the
                     hand-off message

    @returns: the result of C{_sendFileDescriptor} when the job is still
              connected; False when the mind is gone.
    """
    self.debug('Sending FD %d to component job to feed %s to fd',
               fd, feedName)

    # Without a mind there is nobody to hand the descriptor to.
    if not self.mind:
        self.debug('my mind is gone, trigger disconnect')
        return False

    handoff = "sendFeed %s %s" % (feedName, eaterId)
    return self._sendFileDescriptor(fd, handoff)
121
123 """
124 Tell the feeder to receive the given feed from the given fd.
125
126 @returns: whether the fd was successfully handed off to the component.
127 """
128 self.debug('Sending FD %d to component job to eat %s from fd',
129 fd, eaterAlias)
130
131
132 if self.mind:
133 message = "receiveFeed %s %s" % (eaterAlias, feedId)
134 return self._sendFileDescriptor(fd, message)
135 else:
136 self.debug('my mind is gone, trigger disconnect')
137 return False
138
140 """
141 This notification from the job process will be fired when it is
142 shutting down, so that although the process might still be
143 around, we know it's OK to accept new start requests for this
144 avatar ID.
145 """
146 self.info("component %s shutting down cleanly", self.avatarId)
147
148 self._heaven._startSet.shutdownStart(self.avatarId)
149
150
152 __slots__ = ('conf',)
153
154 - def __init__(self, pid, avatarId, type, moduleName, methodName,
155 nice, bundles, conf):
159
160
162 avatarClass = ComponentJobAvatar
163
165 """
166 Gets the L{flumotion.common.connection.PBConnectionInfo}
167 describing how to connect to the manager.
168
169 @rtype: L{flumotion.common.connection.PBConnectionInfo}
170 """
171 return self.brain.managerConnectionInfo
172
def spawn(self, avatarId, type, moduleName, methodName, nice,
          bundles, conf):
    """
    Spawn a new job.

    This will spawn a new flumotion-job process, running under the
    requested nice level. When the job logs in, it will be told to
    load bundles and run a function, which is expected to return a
    component.

    If the environment variable FLU_VALGRIND_JOB is set to a
    comma-separated list of avatarIds and this avatarId is in it, the
    job is run under valgrind instead of being executed directly.

    @param avatarId:   avatarId the component should use to log in
    @type  avatarId:   str
    @param type:       type of component to start
    @type  type:       str
    @param moduleName: name of the module to create the component from
    @type  moduleName: str
    @param methodName: the factory method to use to create the component
    @type  methodName: str
    @param nice:       nice level
    @type  nice:       int
    @param bundles:    ordered list of (bundleName, bundlePath) for this
                       component
    @type  bundles:    list of (str, str)
    @param conf:       component configuration
    @type  conf:       dict

    @returns: the deferred returned by the start set's createStart()
              for this avatarId.
    """
    # NOTE: the 'type' parameter shadows the builtin; the name is kept
    # because it is part of the method's interface.
    d = self._startSet.createStart(avatarId)

    p = base.JobProcessProtocol(self, avatarId, self._startSet)
    executable = os.path.join(configure.bindir, 'flumotion-job')
    if not os.path.exists(executable):
        self.error("Trying to spawn job process, but '%s' does not "
                   "exist", executable)
    argv = [executable, avatarId, self._socketPath]

    # The program actually executed; differs from argv[0] only when the
    # job is wrapped in valgrind below.
    realexecutable = executable

    # 'in' replaces the deprecated dict.has_key(), which no longer
    # exists on Python 3; behaviour is the same on Python 2.
    if 'FLU_VALGRIND_JOB' in os.environ:
        jobnames = os.environ['FLU_VALGRIND_JOB'].split(',')
        if avatarId in jobnames:
            realexecutable = 'valgrind'
            # Run the job script through python under valgrind, keeping
            # the original arguments at the end of the command line.
            argv = ['valgrind', '--leak-check=full', '--num-callers=24',
                    '--leak-resolution=high', '--show-reachable=yes',
                    'python'] + argv

    # The child shares the worker's stdin, stdout and stderr.
    childFDs = {0: 0, 1: 1, 2: 2}
    # Copy the environment so the FLU_DEBUG override below does not
    # leak into the worker's own os.environ.
    env = dict(os.environ)
    env['FLU_DEBUG'] = log.getDebug()
    process = reactor.spawnProcess(p, realexecutable, env=env, args=argv,
                                   childFDs=childFDs)

    p.setPid(process.pid)

    self.addJobInfo(process.pid,
                    ComponentJobInfo(process.pid, avatarId, type,
                                     moduleName, methodName, nice,
                                     bundles, conf))
    return d
239
240
248
249 d = self.mindCallRemote("getPid")
250 d.addCallback(gotPid)
251 return d
252
259
261 self.debug("job is stopping")
262
263
265 avatarClass = CheckJobAvatar
266
267 _checkCount = 0
268 _timeout = 45
269
276
278 if self.jobPool:
279 job, expireDC = self.jobPool.pop(0)
280 expireDC.cancel()
281 self.debug('running check in already-running job %s',
282 job.avatarId)
283 return defer.succeed(job)
284
285 avatarId = 'check-%d' % (self._checkCount,)
286 self._checkCount += 1
287
288 self.debug('spawning new job %s to run a check', avatarId)
289 d = self._startSet.createStart(avatarId)
290
291 p = base.JobProcessProtocol(self, avatarId, self._startSet)
292 executable = os.path.join(configure.bindir, 'flumotion-job')
293 argv = [executable, avatarId, self._socketPath]
294
295 childFDs = {0: 0, 1: 1, 2: 2}
296 env = {}
297 env.update(os.environ)
298 env['FLU_DEBUG'] = log.getDebug()
299 process = reactor.spawnProcess(p, executable, env=env, args=argv,
300 childFDs=childFDs)
301
302 p.setPid(process.pid)
303 jobInfo = base.JobInfo(process.pid, avatarId, type, None, None,
304 None, [])
305 self._jobInfos[process.pid] = jobInfo
306
307 def haveMind(_):
308
309 return self.avatars[avatarId]
310
311 d.addCallback(haveMind)
312 return d
313
314 - def runCheck(self, bundles, moduleName, methodName, *args, **kwargs):
319
320 def timeout(sig):
321 self.killJobByPid(job.pid, sig)
322
323 def haveResult(res):
324 if not termtimeout.active():
325 self.info("Discarding error %s", res)
326 res = messages.Result()
327 res.add(messages.Error(T_(N_("Check timed out.")),
328 debug=("Timed out running %s."%methodName)))
329 else:
330 def expire():
331 if (job, expireDC) in self.jobPool:
332 self.debug('stopping idle check job process %s',
333 job.avatarId)
334 self.jobPool.remove((job, expireDC))
335 job.mindCallRemote('stop')
336 expireDC = reactor.callLater(self._timeout, expire)
337 self.jobPool.append((job, expireDC))
338
339 if termtimeout.active():
340 termtimeout.cancel()
341 if killtimeout.active():
342 killtimeout.cancel()
343 return res
344
345
346
347 termtimeout = reactor.callLater(self._timeout, timeout,
348 signal.SIGTERM)
349 killtimeout = reactor.callLater(self._timeout, timeout,
350 signal.SIGKILL)
351
352 d = job.mindCallRemote('bootstrap', self.getWorkerName(),
353 None, None, None, None, bundles)
354 d.addCallback(callProc)
355 d.addCallbacks(haveResult, haveResult)
356 return d
357
358 d = self.getCheckJobFromPool()
359 d.addCallback(haveJob)
360
361 return d
362