1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 """
23 worker-side objects for components
24 """
25
26 import os
27 import time
28 import socket
29
30 from twisted.internet import reactor, error, defer
31 from twisted.spread import pb
32 from twisted.python import reflect
33 from zope.interface import implements
34
35 from flumotion.configure import configure
36 from flumotion.common import interfaces, errors, log, planet, medium
37 from flumotion.common import componentui, common, messages
38 from flumotion.common import interfaces, reflectcall, debug
39 from flumotion.common.i18n import N_, gettexter
40 from flumotion.common.planet import moods
41 from flumotion.common.poller import Poller
42 from flumotion.twisted import credentials
43 from flumotion.twisted import pb as fpb
44
45
46 __version__ = "$Rev: 6982 $"
47 T_ = gettexter()
48
50 """
51 I am a client factory for a component logging in to the manager.
52 """
53 logCategory = 'component'
54 perspectiveInterface = interfaces.IComponentMedium
73
77
78
def remoteDisconnected(remoteReference):
    # Registered on the remote reference by loginCallback() below; it only
    # reports why the link to the manager went away.
    # NOTE(review): reactor.killed is not a stock Twisted attribute --
    # presumably set by the project during shutdown; confirm.
    if reactor.killed:
        self.log('Connection to manager lost due to shutdown')
    else:
        self.warning('Lost connection to manager, '
                     'will attempt to reconnect')

def loginCallback(reference):
    # Success path: hand the remote reference to the medium and ask to be
    # told when the connection drops.
    self.info("Logged in to manager")
    # Pass the value as a log argument, like the other calls in this
    # chain do, instead of interpolating it eagerly with '%'.
    self.debug("remote reference %r", reference)

    self.medium.setRemoteReference(reference)
    reference.notifyOnDisconnect(remoteDisconnected)

def loginFailedDisconnect(failure):
    # First errback: disconnect, then return the failure so that the
    # more specific errbacks that follow still get to see it.
    self.debug('Login failed, reason: %s, disconnecting', failure)
    self.disconnect()
    return failure

def accessDeniedErrback(failure):
    # trap() re-raises anything that is not a NotAuthenticatedError, which
    # passes the failure on to the next errback.
    failure.trap(errors.NotAuthenticatedError)
    self.warning('Access denied.')

def connectionRefusedErrback(failure):
    failure.trap(error.ConnectionRefusedError)
    self.warning('Connection to manager refused.')

def alreadyLoggedInErrback(failure):
    failure.trap(errors.AlreadyConnectedError)
    self.warning('Component with id %s is already logged in.',
                 self.medium.authenticator.avatarId)

def loginFailedErrback(failure):
    # Catch-all for every failure the errbacks above did not trap.
    self.warning('Login failed, reason: %s', failure)

# Order matters: disconnect first, then the specific handlers, and the
# catch-all last.
d.addCallback(loginCallback)
d.addErrback(loginFailedDisconnect)
d.addErrback(accessDeniedErrback)
d.addErrback(connectionRefusedErrback)
d.addErrback(alreadyLoggedInErrback)
d.addErrback(loginFailedErrback)
123
124
128
130 """
131 Creates a deferred chain by chaining calls to the given
132 procedures, each of them made with the given args and kwargs.
133 Only the result of the last procedure is returned; results for the
134 other procedures are discarded.
135
136 Failures triggered during any of the procedure short-circuit execution
137 of the other procedures and should be handled by the errbacks attached
138 to the deferred returned here.
139
140 @rtype: L{twisted.internet.defer.Deferred}
141 """
142 def call_proc(_, p):
143 log.debug('', 'calling %r', p)
144 return p(*args, **kwargs)
145 p, procs = procs[0], procs[1:]
146 d = defer.maybeDeferred(call_proc, None, p)
147 for p in procs:
148 d.addCallback(call_proc, p)
149 return d
150
151
153 """
154 I am a medium interfacing with a manager-side avatar.
155 I implement a Referenceable for the manager's avatar to call on me.
156 I have a remote reference to the manager's avatar to call upon.
157 I am created by the L{ComponentClientFactory}.
158
159 @cvar authenticator: the authenticator used to log in to manager
160 @type authenticator: L{flumotion.twisted.pb.Authenticator}
161 """
162
163 implements(interfaces.IComponentMedium)
164 logCategory = 'basecompmed'
165
167 """
168 @param component: L{flumotion.component.component.BaseComponent}
169 """
170 self.comp = component
171 self.authenticator = None
172 self.broker = None
173
177
178
179 - def setup(self, config):
181
183 """
184 Return the manager IP as seen by us.
185 """
186 assert self.remote or self.broker
187 broker = self.broker or self.remote.broker
188 peer = broker.transport.getPeer()
189 try:
190 host = peer.host
191 except AttributeError:
192 host = peer[1]
193
194 res = socket.gethostbyname(host)
195 self.debug("getManagerIP(): we think the manager's IP is %r" % res)
196 return res
197
199 """
200 Return the IP of this component based on connection to the manager.
201
202 Note: this is insufficient in general, and should be replaced by
203 network mapping stuff later.
204 """
205 assert self.remote
206 host = self.remote.broker.transport.getHost()
207 self.debug("getIP(): using %r as our IP", host.host)
208 return host.host
209
211 """
212 Set the authenticator the client factory has used to log in to the
213 manager. Can be reused by the component's medium to make
214 feed connections which also get authenticated by the manager's
215 bouncer.
216
217 @type authenticator: L{flumotion.twisted.pb.Authenticator}
218 """
219 self.authenticator = authenticator
220
221
222
224 """
225 Return the state of the component, which will be serialized to a
226 L{flumotion.common.planet.ManagerJobState} object.
227
228 @rtype: L{flumotion.common.planet.WorkerJobState}
229 @returns: state of component
230 """
231
232
233 self.comp.state.set('manager-ip', self.getManagerIP())
234 return self.comp.state
235
237 """
238 Return the configuration of the component.
239
240 @rtype: dict
241 @returns: component's current configuration
242 """
243 return self.comp.config
244
246 self.info('Stopping component')
247 return self.comp.stop()
248
253
255 """Get a WorkerComponentUIState containing details needed to
256 present an admin-side UI state
257 """
258 return self.comp.uiState
259
261 """
262 Base implementation of getMasterClockInfo, can be overridden by
263 subclasses. By default, just returns None.
264 """
265 return None
266
269
271 """
272 Sets the Flumotion debugging levels based on the passed debug string.
273
274 @since: 0.6.0
275 """
276 self.debug('Setting Flumotion debug level to %s' % debug)
277 log.setDebug(debug)
278
279
281 """
282 I am the base class for all Flumotion components.
283
284 @ivar name: the name of the component
285 @type name: string
286 @ivar medium: the component's medium
287 @type medium: L{BaseComponentMedium}
288 @ivar uiState: state of the component to be shown in a UI.
289 Contains at least the following keys.
290 - cpu-percent: percentage of CPU use in last interval
291 - start-time: time when component was started, in epoch
292 seconds
293 - current-time: current time in epoch seconds, as seen on
294 component's machine, which might be out of
295 sync
296 - virtual-size: virtual memory size in bytes
297 Subclasses can add additional keys for their respective UI.
298 @type uiState: L{componentui.WorkerComponentUIState}
299
300 @cvar componentMediumClass: the medium class to use for this component
301 @type componentMediumClass: child class of L{BaseComponentMedium}
302 """
303
304 logCategory = 'basecomp'
305 componentMediumClass = BaseComponentMedium
306
def __init__(self, config, haveError=None):
    """
    Subclasses should not override __init__ at all.

    Instead, they should implement init(), which will be called
    by this implementation automatically.

    See L{flumotion.common.common.InitMixin} for more details.

    @param config:    the component's configuration; kept as
                      self.config
    @param haveError: optional callable, kept as self._haveError;
                      when set, it is called with the message each time
                      an error-level message is added to the component
    """
    self.debug("initializing %r with config %r", type(self), config)
    self.config = config
    self._haveError = haveError

    # NOTE(review): presumably this is the call that runs the init()
    # methods mentioned in the docstring -- confirm against InitMixin.
    common.InitMixin.__init__(self)

    self.setup()
324
325
327 """
328 A subclass should do as little as possible in its init method.
329 In particular, it should not try to access resources.
330
331 Failures during init are marshalled back to the manager through
332 the worker's remote_create method, since there is no component state
333 proxied to the manager yet at the time of init.
334 """
335 self.state = planet.WorkerJobState()
336
337 self.name = self.config['name']
338
339 self.state.set('pid', os.getpid())
340 self.setMood(moods.waking)
341
342 self.medium = None
343
344 self.uiState = componentui.WorkerComponentUIState()
345 self.uiState.addKey('cpu-percent')
346 self.uiState.addKey('start-time')
347 self.uiState.addKey('current-time')
348 self.uiState.addKey('virtual-size')
349
350 self.plugs = {}
351
352 self._happyWaits = []
353
354
355 self._lastTime = time.time()
356 self._lastClock = time.clock()
357 self._cpuPoller = Poller(self._pollCPU, 5)
358 self._memoryPoller = Poller(self._pollMemory, 60)
359
360 self._shutdownHook = None
361
363 """
364 Subclasses can implement me to run any checks before the component
365 performs setup.
366
367 Messages can be added to the component state's 'messages' list key.
368 Any error messages added will trigger the component going to sad,
369 with L{flumotion.common.errors.ComponentSetupError} being raised
370 before getting to setup stage; do_setup() will not be called.
371
372 In the event of a fatal problem that can't be expressed through an
373 error message, this method should raise an exception or return a
374 failure.
375
376 It is not necessary to chain up in this function. The return
377 value may be a deferred.
378 """
379 return defer.maybeDeferred(self.check_properties,
380 self.config['properties'],
381 self.addMessage)
382
384 """
385 BaseComponent convenience vmethod for running checks.
386
387 A component implementation can override this method to run any
388 checks that it needs to. Typically, a check_properties
389 implementation will call the provided addMessage() callback to
390 note warnings or errors. For errors, addMessage() will set
391 component's mood to sad, which will abort the init process
392 before getting to do_setup().
393
394 @param properties: The component's properties
395 @type properties: dict of string => object
396 @param addMessage: Thunk to add a message to the component
397 state. Will raise an exception if the
398 message is of level ERROR.
399 @type addMessage: L{flumotion.common.messages.Message} -> None
400 """
401 pass
402
404 """
405 Subclasses can implement me to set up the component before it is
406 started. It should set up the component, possibly opening files
407 and resources.
408 Non-programming errors should not be raised, but returned as a
409 failing deferred.
410
411 The return value may be a deferred.
412 """
413 for socket, plugs in self.config['plugs'].items():
414 self.plugs[socket] = []
415 for plug in plugs:
416 entry = plug['entries']['default']
417 instance = reflectcall.reflectCall(entry['module-name'],
418 entry['function-name'],
419 plug)
420 self.plugs[socket].append(instance)
421 self.debug('Starting plug %r on socket %s',
422 instance, socket)
423 instance.start(self)
424
425
426
427 checks = common.get_all_methods(self, 'do_check', False)
428
429 def checkErrorCallback(result):
430
431
432
433
434 current = self.state.get('mood')
435 if current == moods.sad.value:
436 self.warning('Running checks made the component sad.')
437 raise errors.ComponentSetupHandledError()
438
439 checks.append(checkErrorCallback)
440 return _maybeDeferredChain(checks, self)
441
443 """
444 BaseComponent vmethod for stopping.
445 The component should do any cleanup it needs, but must not set the
446 component's mood to sleeping.
447
448 @Returns: L{twisted.internet.defer.Deferred}
449 """
450 for socket, plugs in self.plugs.items():
451 for plug in plugs:
452 self.debug('Stopping plug %r on socket %s', plug, socket)
453 plug.stop(self)
454
# Walk a snapshot of the messages: removing entries from the very sequence
# being iterated makes the iterator skip elements, which would leave
# messages behind if state.get() hands back the live list.
for message in list(self.state.get('messages')):
    self.state.remove('messages', message)
458
459 if self._cpuPoller:
460 self._cpuPoller.stop()
461 self._cpuPoller = None
462 if self._memoryPoller:
463 self._memoryPoller.stop()
464 self._memoryPoller = None
465
466 if self._shutdownHook:
467 self.debug('_stoppedCallback: firing shutdown hook')
468 self._shutdownHook()
469
470
472 """
473 Sets up the component. Called during __init__, so be sure not
474 to raise exceptions, instead adding messages to the component
475 state.
476 """
477 def run_setups():
478 setups = common.get_all_methods(self, 'do_setup', False)
479 return _maybeDeferredChain(setups, self)
480
481 def setup_complete(_):
482 self.debug('setup completed')
483 self.setup_completed()
484
485 def got_error(failure):
486 txt = log.getFailureMessage(failure)
487 self.debug('got_error: %s', txt)
488 if not failure.check(errors.ComponentSetupHandledError):
489 self.warning('Setup failed: %s', txt)
490 m = messages.Error(T_(N_("Could not setup component.")),
491 debug=txt,
492 mid="component-setup-%s" % self.name)
493
494 self.addMessage(m)
495
496
497 return None
498
499 self.setMood(moods.waking)
500 self.uiState.set('start-time', time.time())
501
502 d = run_setups()
503 d.addCallbacks(setup_complete, got_error)
504
505
507 self.debug('turning happy')
508 self.setMood(moods.happy)
509
511 """
512 Set the shutdown hook for this component (replacing any previous hook).
513 When a component is stopped, then this hook will be fired.
514 """
515 self._shutdownHook = shutdownHook
516
518 """
519 Tell the component to stop.
520 The connection to the manager will be closed.
521 The job process will also finish.
522 """
523 self.debug('BaseComponent.stop')
524
525
526 self.setMood(moods.waking)
527
528
529 stops = common.get_all_methods(self, 'do_stop', True)
530 return _maybeDeferredChain(stops, self)
531
532
535
537 self.state.set('workerName', workerName)
538
541
546
548 """
549 Set the given mood on the component if it's different from the current
550 one.
551 """
552 current = self.state.get('mood')
553
554 if current == mood.value:
555 self.log('already in mood %r' % mood)
556 return
557 elif current == moods.sad.value:
558 self.info('tried to set mood to %r, but already sad :-(' % mood)
559 return
560
561 self.doLog(log.DEBUG, -2, 'MOOD changed to %r by caller', mood)
562 self.state.set('mood', mood.value)
563
564 if mood == moods.happy:
565 while self._happyWaits:
566 self._happyWaits.pop(0).callback(None)
567 elif mood == moods.sad:
568 while self._happyWaits:
569 self._happyWaits.pop(0).errback(errors.ComponentStartError())
570
572 """
573 Gets the mood on the component.
574
575 @rtype: int
576 """
577 return self.state.get('mood')
578
589
591 """
592 Add a message to the component.
593 If any of the messages is an error, the component will turn sad.
594
595 @type message: L{flumotion.common.messages.Message}
596 """
597 self.state.append('messages', message)
598 if message.level == messages.ERROR:
599 self.debug('error message, turning sad')
600 self.setMood(moods.sad)
601 if self._haveError:
602 self._haveError(message)
603
605 """
606 Fix properties that have been renamed from a previous version,
607 and add a warning for them.
608
609 @param properties: properties; will be modified as a result.
610 @type properties: dict
611 @param list: list of (old, new) tuples of property names.
612 @type list: list of tuple of (str, str)
613 """
# Keep only the (old, new) pairs whose old name is actually in use.
# 'list' here is the argument documented in the docstring, not the
# builtin.  The 'in' operator replaces dict.has_key().
found = [(old, new) for old, new in list if old in properties]

if found:
    m = messages.Warning(T_(N_(
        "Your configuration uses deprecated properties. "
        "Please update your configuration and correct them.\n")),
        mid="deprecated")
    for old, new in found:
        m.add(T_(N_(
            "Please rename '%s' to '%s'.\n"),
            old, new))
        self.debug("Setting new property '%s' to %r", new,
                   properties[old])
        # Move the value over to the new name, in place.
        properties[new] = properties[old]
        del properties[old]
    # One warning covers all renamed properties.
    self.addMessage(m)
633
635 """
636 Call a remote method on all admin client views on this component.
637
638 This gets serialized through the manager and multiplexed to all
639 admin clients, and from there on to all views connected to each
640 admin client model.
641
642 Because there can be any number of admin clients that this call
643 will go out to, it does not make sense to have one return value.
644 This function will return None always.
645 """
646 if self.medium:
647 self.medium.callRemote("adminCallRemote", methodName,
648 *args, **kwargs)
649 else:
650 self.debug('asked to adminCallRemote(%s, *%r, **%r), but '
651 'no manager.'
652 % (methodName, args, kwargs))
653
655
nowTime = time.time()
nowClock = time.clock()
deltaTime = nowTime - self._lastTime
deltaClock = nowClock - self._lastClock
self._lastTime = nowTime
self._lastClock = nowClock

# Only publish a sample when both deltas are usable.  A wall-clock delta
# of zero would raise ZeroDivisionError, and a negative one (system time
# stepped backwards) would give a meaningless negative ratio.
if deltaClock >= 0 and deltaTime > 0:
    CPU = deltaClock / deltaTime
    self.log('latest CPU use: %r', CPU)
    self.uiState.set('cpu-percent', CPU)

# NOTE(review): placed outside the condition based on the blank line that
# precedes it in the listing; the original indentation is not visible.
self.uiState.set('current-time', nowTime)
669
671
672
handle = open('/proc/%d/stat' % os.getpid())
try:
    line = handle.read()
finally:
    # Release the descriptor even when the read fails.
    handle.close()

# The second field (comm) is wrapped in parentheses and may itself contain
# spaces, so split only what follows the last ')'.  That remainder starts
# at field 3, which puts vsize (field 23 in proc(5)) at index 20.
fields = line[line.rindex(')') + 1:].split()

vsize = int(fields[20])
self.log('vsize is %d', vsize)
self.uiState.set('virtual-size', vsize)
682