1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 """
23 the job-side half of the worker-job connection
24 """
25
26 import os
27 import resource
28 import sys
29
30
31
32
33
34
35 from twisted.cred import credentials
36 from twisted.internet import reactor, defer
37 from twisted.python import failure
38 from twisted.spread import pb
39 from zope.interface import implements
40
41 from flumotion.common import errors, interfaces, log, keycards
42 from flumotion.common import medium, package
43 from flumotion.common.reflectcall import createComponent, reflectCallCatching
44 from flumotion.component import component
45
46 from flumotion.twisted import fdserver
47 from flumotion.twisted import pb as fpb
48 from flumotion.twisted import defer as fdefer
49
50 __version__ = "$Rev: 6125 $"
51
52
54 """
55 I am a medium between the job and the worker's job avatar.
56 I live in the job process.
57
58 @cvar component: the component this is a medium for; created as part of
59 L{remote_create}
60 @type component: L{flumotion.component.component.BaseComponent}
61 """
62 logCategory = 'jobmedium'
63 remoteLogName = 'jobavatar'
64
65 implements(interfaces.IJobMedium)
66
68 self.avatarId = None
69 self.logName = None
70 self.component = None
71
72 self._workerName = None
73 self._managerHost = None
74 self._managerPort = None
75 self._managerTransport = None
76 self._managerKeycard = None
77 self._componentClientFactory = None
78
79 self._hasStoppedReactor = False
80
81
82 - def remote_bootstrap(self, workerName, host, port, transport, authenticator,
83 packagePaths):
84 """
85 I receive the information on how to connect to the manager. I also set
86 up package paths to be able to run the component.
87
88 Called by the worker's JobAvatar.
89
90 @param workerName: the name of the worker running this job
91 @type workerName: str
92 @param host: the host that is running the manager
93 @type host: str
94 @param port: port on which the manager is listening
95 @type port: int
96 @param transport: 'tcp' or 'ssl'
97 @type transport: str
98 @param authenticator: remote reference to the worker-side authenticator
99 @type authenticator: L{twisted.spread.pb.RemoteReference} to a
100 L{flumotion.twisted.pb.Authenticator}
101 @param packagePaths: ordered list of
102 (package name, package path) tuples
103 @type packagePaths: list of (str, str)
104 """
105 self._workerName = workerName
106 self._managerHost = host
107 self._managerPort = port
108 self._managerTransport = transport
109 if authenticator:
110 self._authenticator = fpb.RemoteAuthenticator(authenticator)
111 else:
112 self.debug('no authenticator, will not be able to log '
113 'into manager')
114 self._authenticator = None
115
116 packager = package.getPackager()
117 for name, path in packagePaths:
118 self.debug('registering package path for %s' % name)
119 self.log('... from path %s' % path)
120 packager.registerPackagePath(path, name)
121
124
126 """
127 I am called on by the worker's JobAvatar to run a function,
128 normally on behalf of the flumotion wizard.
129
130 @param moduleName: name of the module containing the function
131 @type moduleName: str
132 @param methodName: the method to run
133 @type methodName: str
134 @param args: args to pass to the method
135 @type args: tuple
136 @param kwargs: kwargs to pass to the method
137 @type kwargs: dict
138
139 @returns: the result of invoking the method
140 """
141 self.info('Running %s.%s(*%r, **%r)' % (moduleName, methodName,
142 args, kwargs))
143
144 self._enableCoreDumps()
145
146 return reflectCallCatching(errors.RemoteRunError, moduleName,
147 methodName, *args, **kwargs)
148
149 - def remote_create(self, avatarId, type, moduleName, methodName,
150 nice, conf):
151 """
152 I am called on by the worker's JobAvatar to create a component.
153
154 @param avatarId: avatarId for component to log in to manager
155 @type avatarId: str
156 @param type: type of component to start
157 @type type: str
158 @param moduleName: name of the module to create the component from
159 @type moduleName: str
160 @param methodName: the factory method to use to create the component
161 @type methodName: str
162 @param nice: the nice level
163 @type nice: int
164 @param conf: the component configuration
165 @type conf: dict
166 """
167 self.avatarId = avatarId
168 self.logName = avatarId
169
170 self.component = self._createComponent(avatarId, type, moduleName,
171 methodName, nice, conf)
172 self.component.setShutdownHook(self._componentStopped)
173
175
176 reactor.callLater(0, self.shutdown)
177
184
199
200
202 """
203 Shut down the job process completely, cleaning up the component
204 so the reactor can be left from.
205 """
206 if self._hasStoppedReactor:
207 self.debug("Not stopping reactor again, already shutting down")
208 else:
209 self._hasStoppedReactor = True
210 self.info("Stopping reactor in job process")
211 reactor.stop()
212
214 if not nice:
215 return
216
217 try:
218 os.nice(nice)
219 except OSError, e:
220 self.warning('Failed to set nice level: %s' % str(e))
221 else:
222 self.debug('Nice level set to %d' % nice)
223
225 soft, hard = resource.getrlimit(resource.RLIMIT_CORE)
226 if hard != resource.RLIM_INFINITY:
227 self.warning('Could not set unlimited core dump sizes, '
228 'setting to %d instead' % hard)
229 else:
230 self.debug('Enabling core dumps of unlimited size')
231
232 resource.setrlimit(resource.RLIMIT_CORE, (hard, hard))
233
234 - def _createComponent(self, avatarId, type, moduleName, methodName,
235 nice, conf):
236 """
237 Create a component of the given type.
238 Log in to the manager with the given avatarId.
239
240 @param avatarId: avatarId component will use to log in to manager
241 @type avatarId: str
242 @param type: type of component to start
243 @type type: str
244 @param moduleName: name of the module that contains the entry point
245 @type moduleName: str
246 @param methodName: name of the factory method to create the component
247 @type methodName: str
248 @param nice: the nice level to run with
249 @type nice: int
250 @param conf: the component configuration
251 @type conf: dict
252 """
253 self.info('Creating component "%s" of type "%s"', avatarId, type)
254
255 self._setNice(nice)
256 self._enableCoreDumps()
257
258 try:
259 comp = createComponent(moduleName, methodName, conf)
260 except Exception, e:
261 msg = "Exception %s during createComponent: %s" % (
262 e.__class__.__name__, " ".join(e.args))
263
264
265 if isinstance(e, errors.ComponentCreateError):
266 msg = e.args[0]
267 self.warning(
268 "raising ComponentCreateError(%s) and stopping job" % msg)
269
270
271
272
273
274
275
276 reactor.callLater(0.1, self.shutdown)
277 raise errors.ComponentCreateError(msg)
278
279 comp.setWorkerName(self._workerName)
280
281
282 self.debug('creating ComponentClientFactory')
283 managerClientFactory = component.ComponentClientFactory(comp)
284 self._componentClientFactory = managerClientFactory
285 self.debug('created ComponentClientFactory %r' % managerClientFactory)
286 self._authenticator.avatarId = avatarId
287 managerClientFactory.startLogin(self._authenticator)
288
289 host = self._managerHost
290 port = self._managerPort
291 transport = self._managerTransport
292 self.debug('logging in with authenticator %r' % self._authenticator)
293 if transport == "ssl":
294 from flumotion.common import common
295 common.assertSSLAvailable()
296 from twisted.internet import ssl
297 self.info('Connecting to manager %s:%d with SSL' % (host, port))
298 reactor.connectSSL(host, port, managerClientFactory,
299 ssl.ClientContextFactory())
300 elif transport == "tcp":
301 self.info('Connecting to manager %s:%d with TCP' % (host, port))
302 reactor.connectTCP(host, port, managerClientFactory)
303 else:
304 self.warning('Unknown transport protocol %s' % self._managerTransport)
305
306 return comp
307
309 """
310 A pb.Broker subclass that handles FDs being passed (with associated data)
311 over the same connection as the normal PB data stream.
312 When an FD is seen, the FD should be added to a given eater or feeder
313 element.
314 """
315 - def __init__(self, connectionClass, **kwargs):
316 """
317 @param connectionClass: a subclass of L{twisted.internet.tcp.Connection}
318 """
319 pb.Broker.__init__(self, **kwargs)
320
321 self._connectionClass = connectionClass
322
324
325 self.debug('received fds %r, message %r' % (fds, message))
326 if message.startswith('sendFeed '):
327 def parseargs(_, feedName, eaterId=None):
328 return feedName, eaterId
329 feedName, eaterId = parseargs(*message.split(' '))
330 self.factory.medium.component.feedToFD(feedName, fds[0],
331 os.close, eaterId)
332 elif message.startswith('receiveFeed '):
333 def parseargs2(_, eaterAlias, feedId=None):
334 return eaterAlias, feedId
335 eaterAlias, feedId = parseargs2(*message.split(' '))
336 self.factory.medium.component.eatFromFD(eaterAlias, feedId,
337 fds[0])
338 elif message == 'redirectStdout':
339 self.debug('told to rotate stdout to fd %d', fds[0])
340 os.dup2(fds[0], sys.stdout.fileno())
341 os.close(fds[0])
342 self.debug('rotated stdout')
343 elif message == 'redirectStderr':
344 self.debug('told to rotate stderr to fd %d', fds[0])
345 os.dup2(fds[0], sys.stderr.fileno())
346 os.close(fds[0])
347 self.info('rotated stderr')
348 else:
349 self.warning('Unknown message received: %r' % message)
350
352 """
353 I am a client factory that logs in to the WorkerBrain.
354 I live in the flumotion-job process spawned by the worker.
355
356 @cvar medium: the medium for the JobHeaven to access us through
357 @type medium: L{JobMedium}
358 """
359 logCategory = "job"
360 perspectiveInterface = interfaces.IJobMedium
361
375
376
381
382
383 - def login(self, username):
384 def haveReference(remoteReference):
385 self.info('Logged in to worker')
386 self.debug('perspective %r connected', remoteReference)
387 self.medium.setRemoteReference(remoteReference)
388
389 self.info('Logging in to worker')
390 d = pb.PBClientFactory.login(self,
391 credentials.UsernamePassword(username, ''),
392 self.medium)
393 d.addCallback(haveReference)
394 return d
395
396
397
398
399
404