Package flumotion :: Package component :: Module component
[hide private]

Source Code for Module flumotion.component.component

  1  # -*- Mode: Python; test-case-name: flumotion.test.test_component -*- 
  2  # vi:si:et:sw=4:sts=4:ts=4 
  3  # 
  4  # Flumotion - a streaming media server 
  5  # Copyright (C) 2004,2005,2006,2007,2008 Fluendo, S.L. (www.fluendo.com). 
  6  # All rights reserved. 
  7   
  8  # This file may be distributed and/or modified under the terms of 
  9  # the GNU General Public License version 2 as published by 
 10  # the Free Software Foundation. 
 11  # This file is distributed without any warranty; without even the implied 
 12  # warranty of merchantability or fitness for a particular purpose. 
 13  # See "LICENSE.GPL" in the source distribution for more information. 
 14   
 15  # Licensees having purchased or holding a valid Flumotion Advanced 
 16  # Streaming Server license may use this file in accordance with the 
 17  # Flumotion Advanced Streaming Server Commercial License Agreement. 
 18  # See "LICENSE.Flumotion" in the source distribution for more information. 
 19   
 20  # Headers in this file shall remain intact. 
 21   
 22  """ 
 23  worker-side objects for components 
 24  """ 
 25   
 26  import os 
 27  import time 
 28  import socket 
 29   
 30  from twisted.internet import reactor, error, defer 
 31  from twisted.spread import pb 
 32  from twisted.python import reflect 
 33  from zope.interface import implements 
 34   
 35  from flumotion.configure import configure 
 36  from flumotion.common import interfaces, errors, log, planet, medium 
 37  from flumotion.common import componentui, common, messages 
 38  from flumotion.common import interfaces, reflectcall, debug 
 39  from flumotion.common.i18n import N_, gettexter 
 40  from flumotion.common.planet import moods 
 41  from flumotion.common.poller import Poller 
 42  from flumotion.twisted import credentials 
 43  from flumotion.twisted import pb as fpb 
 44   
 45   
 46  __version__ = "$Rev: 6982 $" 
 47  T_ = gettexter() 
 48   
class ComponentClientFactory(fpb.ReconnectingFPBClientFactory):
    """
    I am the client factory a component uses to log in to the manager.

    When created, I instantiate the component's medium class, hand the
    resulting medium to the component, and keep a reference to it so that
    the broker, the authenticator and the remote reference can be passed
    on to it later.
    """
    logCategory = 'component'
    perspectiveInterface = interfaces.IComponentMedium

    def __init__(self, component):
        """
        @param component: L{flumotion.component.component.BaseComponent}
        """
        # doing this as a class method triggers a doc error
        fpb.ReconnectingFPBClientFactory.__init__(self)

        self.component = component

        # make a medium to interface with the manager
        componentMedium = component.componentMediumClass(component)
        self.medium = componentMedium
        component.setMedium(componentMedium)

        self.maxDelay = 10
        # get the interfaces implemented by the component medium class
        #FIXME: interface
        #self.interfaces = self.medium.__class__.__implements__

        self.logName = component.name

    def clientConnectionMade(self, broker):
        """
        Remember the broker on the medium, then chain up to the parent.
        """
        self.medium.broker = broker
        fpb.ReconnectingFPBClientFactory.clientConnectionMade(self, broker)

    # vmethod implementation
    def gotDeferredLogin(self, d):
        """
        Attach the login callback and the chain of errbacks to the
        login deferred.

        On success the remote reference is given to the medium.  On
        failure we always disconnect first; the following errbacks each
        trap one specific error type and log a warning for it, with a
        generic warning as the last resort.

        @param d: the deferred for the login attempt
        """
        def onManagerDisconnect(remoteReference):
            if not reactor.killed:
                self.warning('Lost connection to manager, '
                             'will attempt to reconnect')
                return
            self.log('Connection to manager lost due to shutdown')

        def onLoggedIn(reference):
            self.info("Logged in to manager")
            self.debug("remote reference %r" % reference)

            self.medium.setRemoteReference(reference)
            reference.notifyOnDisconnect(onManagerDisconnect)

        def disconnectOnFailure(failure):
            # We _always_ disconnect. Then, we may log a more specific failure
            # message at a higher warning level.
            self.debug('Login failed, reason: %s, disconnecting', failure)
            self.disconnect()
            # hand the failure on to the more specific errbacks below
            return failure

        def onAccessDenied(failure):
            failure.trap(errors.NotAuthenticatedError)
            self.warning('Access denied.')

        def onConnectionRefused(failure):
            failure.trap(error.ConnectionRefusedError)
            self.warning('Connection to manager refused.')

        def onAlreadyLoggedIn(failure):
            failure.trap(errors.AlreadyConnectedError)
            self.warning('Component with id %s is already logged in.',
                self.medium.authenticator.avatarId)

        def onOtherFailure(failure):
            self.warning('Login failed, reason: %s' % failure)

        d.addCallback(onLoggedIn)
        # order matters: disconnect first, then the specific handlers,
        # then the catch-all
        errbacks = (
            disconnectOnFailure,
            onAccessDenied,
            onConnectionRefused,
            onAlreadyLoggedIn,
            onOtherFailure,
        )
        for errback in errbacks:
            d.addErrback(errback)

    # we want to save the authenticator
    def startLogin(self, authenticator):
        """
        Store the authenticator on the medium, then chain up to the parent.

        @returns: whatever the parent class's startLogin returns
        """
        self.medium.setAuthenticator(authenticator)
        return fpb.ReconnectingFPBClientFactory.startLogin(self, authenticator)

def _maybeDeferredChain(procs, *args, **kwargs):
    """
    Creates a deferred chain created by chaining calls to the given
    procedures, each of them made with the given args and kwargs.
    Only the result of the last procedure is returned; results for the
    other procedures are discarded.

    Failures triggered during any of the procedure short-circuit execution
    of the other procedures and should be handled by the errbacks attached
    to the deferred returned here.

    If procs is empty, a deferred that has already fired with None is
    returned, so callers always get a deferred and never a synchronous
    IndexError.

    @param procs: the procedures to call, in order
    @type  procs: sequence of callables

    @rtype: L{twisted.internet.defer.Deferred}
    """
    def call_proc(_, p):
        # the first argument is the (discarded) result of the previous proc
        log.debug('', 'calling %r', p)
        return p(*args, **kwargs)

    if not procs:
        return defer.succeed(None)

    # maybeDeferred turns an exception raised by the first procedure into
    # a failure on the returned deferred
    d = defer.maybeDeferred(call_proc, None, procs[0])
    for p in procs[1:]:
        d.addCallback(call_proc, p)
    return d

# needs to be before BaseComponent because BaseComponent references it
class BaseComponentMedium(medium.PingingMedium):
    """
    I am a medium interfacing with a manager-side avatar.
    I implement a Referenceable for the manager's avatar to call on me.
    I have a remote reference to the manager's avatar to call upon.
    I am created by the L{ComponentClientFactory}.

    @ivar authenticator: the authenticator used to log in to manager
    @type authenticator: L{flumotion.twisted.pb.Authenticator}
    @ivar comp:          the component I am the medium for
    @ivar broker:        the broker of the connection to the manager; only
                         kept until the remote reference is set
    """

    implements(interfaces.IComponentMedium)
    logCategory = 'basecompmed'

    def __init__(self, component):
        """
        @param component: L{flumotion.component.component.BaseComponent}
        """
        self.comp = component
        self.authenticator = None
        self.broker = None

    def setRemoteReference(self, reference):
        """
        Drop our broker reference and chain up to the parent class.
        """
        self.broker = None # We no longer need that reference
        medium.PingingMedium.setRemoteReference(self, reference)

    ### our methods
    def setup(self, config):
        """
        Does nothing in this base implementation.
        """
        pass

    def getManagerIP(self):
        """
        Return the manager IP as seen by us.

        Requires either a remote reference or a broker to be set.
        """
        assert self.remote or self.broker
        broker = self.broker or self.remote.broker
        peer = broker.transport.getPeer()
        try:
            host = peer.host
        except AttributeError:
            # peer has no host attribute; fall back to indexing it
            host = peer[1]

        res = socket.gethostbyname(host)
        self.debug("getManagerIP(): we think the manager's IP is %r" % res)
        return res

    def getIP(self):
        """
        Return the IP of this component based on connection to the manager.

        Note: this is insufficient in general, and should be replaced by
        network mapping stuff later.
        """
        assert self.remote
        host = self.remote.broker.transport.getHost()
        self.debug("getIP(): using %r as our IP", host.host)
        return host.host

    def setAuthenticator(self, authenticator):
        """
        Set the authenticator the client factory has used to log in to the
        manager.  Can be reused by the component's medium to make
        feed connections which also get authenticated by the manager's
        bouncer.

        @type  authenticator: L{flumotion.twisted.pb.Authenticator}
        """
        self.authenticator = authenticator

    ### pb.Referenceable remote methods
    ### called from manager by our avatar
    def remote_getState(self):
        """
        Return the state of the component, which will be serialized to a
        L{flumotion.common.planet.ManagerJobState} object.

        @rtype:   L{flumotion.common.planet.WorkerJobState}
        @returns: state of component
        """
        # we can only get the IP after we have a remote reference, so add it
        # here
        self.comp.state.set('manager-ip', self.getManagerIP())
        return self.comp.state

    def remote_getConfig(self):
        """
        Return the configuration of the component.

        @rtype:   dict
        @returns: component's current configuration
        """
        return self.comp.config

    def remote_stop(self):
        """
        Stop the component.

        @returns: the result of the component's stop() method
        """
        self.info('Stopping component')
        return self.comp.stop()

    def remote_reloadComponent(self):
        """Reload modules in the component."""
        from flumotion.common.reload import reloadFlumotion
        reloadFlumotion()

    def remote_getUIState(self):
        """Get a WorkerComponentUIState containing details needed to
        present an admin-side UI state
        """
        return self.comp.uiState

    def remote_getMasterClockInfo(self):
        """
        Base implementation of getMasterClockInfo, can be overridden by
        subclasses. By default, just returns None.
        """
        return None

    def remote_getVersions(self):
        """
        Return the result of L{flumotion.common.debug.getVersions}.
        """
        return debug.getVersions()

    def remote_setFluDebug(self, debug):
        """
        Sets the Flumotion debugging levels based on the passed debug string.

        Note that the parameter shadows the module-level C{debug} import
        inside this method; only C{self.debug} (the logging method) and
        the parameter itself are used here.

        @since: 0.6.0
        """
        self.debug('Setting Flumotion debug level to %s' % debug)
        log.setDebug(debug)


class BaseComponent(common.InitMixin, log.Loggable):
    """
    I am the base class for all Flumotion components.

    @ivar name:    the name of the component
    @type name:    string
    @ivar medium:  the component's medium
    @type medium:  L{BaseComponentMedium}
    @ivar uiState: state of the component to be shown in a UI.
                   Contains at least the following keys.
                    - cpu-percent:  percentage of CPU use in last interval
                    - start-time:   time when component was started, in epoch
                                    seconds
                    - current-time: current time in epoch seconds, as seen on
                                    component's machine, which might be out of
                                    sync
                    - virtual-size: virtual memory size in bytes
                   Subclasses can add additional keys for their respective UI.
    @type uiState: L{componentui.WorkerComponentUIState}

    @cvar componentMediumClass: the medium class to use for this component
    @type componentMediumClass: child class of L{BaseComponentMedium}
    """

    logCategory = 'basecomp'
    componentMediumClass = BaseComponentMedium

    def __init__(self, config, haveError=None):
        """
        Subclasses should not override __init__ at all.

        Instead, they should implement init(), which will be called
        by this implementation automatically.

        L{flumotion.common.common.InitMixin} for more details.

        @param config:    the component's configuration
        @param haveError: optional callable, invoked with the message
                          whenever an error message is added through
                          L{addMessage}
        """
        self.debug("initializing %r with config %r", type(self), config)
        self.config = config
        self._haveError = haveError

        # this will call self.init() for all implementors of init()
        common.InitMixin.__init__(self)

        self.setup()

    # BaseComponent interface for subclasses related to component protocol
    def init(self):
        """
        A subclass should do as little as possible in its init method.
        In particular, it should not try to access resources.

        Failures during init are marshalled back to the manager through
        the worker's remote_create method, since there is no component state
        proxied to the manager yet at the time of init.
        """
        self.state = planet.WorkerJobState()

        self.name = self.config['name']

        self.state.set('pid', os.getpid())
        self.setMood(moods.waking)

        self.medium = None # the medium connecting us to the manager's avatar

        self.uiState = componentui.WorkerComponentUIState()
        self.uiState.addKey('cpu-percent')
        self.uiState.addKey('start-time')
        self.uiState.addKey('current-time')
        self.uiState.addKey('virtual-size')

        self.plugs = {}

        self._happyWaits = []

        # Start the cpu-usage updating.
        self._lastTime = time.time()
        self._lastClock = time.clock()
        self._cpuPoller = Poller(self._pollCPU, 5)
        self._memoryPoller = Poller(self._pollMemory, 60)

        self._shutdownHook = None

    def do_check(self):
        """
        Subclasses can implement me to run any checks before the component
        performs setup.

        Messages can be added to the component state's 'messages' list key.
        Any error messages added will trigger the component going to sad,
        with L{flumotion.common.errors.ComponentSetupError} being raised
        before getting to setup stage; do_setup() will not be called.

        In the event of a fatal problem that can't be expressed through an
        error message, this method should raise an exception or return a
        failure.

        It is not necessary to chain up in this function. The return
        value may be a deferred.
        """
        return defer.maybeDeferred(self.check_properties,
                                   self.config['properties'],
                                   self.addMessage)

    def check_properties(self, properties, addMessage):
        """
        BaseComponent convenience vmethod for running checks.

        A component implementation can override this method to run any
        checks that it needs to. Typically, a check_properties
        implementation will call the provided addMessage() callback to
        note warnings or errors. For errors, addMessage() will set
        component's mood to sad, which will abort the init process
        before getting to do_setup().

        @param properties: The component's properties
        @type  properties: dict of string => object
        @param addMessage: Thunk to add a message to the component
                           state. Will raise an exception if the
                           message is of level ERROR.
        @type  addMessage: L{flumotion.common.messages.Message} -> None
        """
        pass

    def do_setup(self):
        """
        Subclasses can implement me to set up the component before it is
        started.  It should set up the component, possibly opening files
        and resources.
        Non-programming errors should not be raised, but returned as a
        failing deferred.

        This base implementation instantiates and starts the configured
        plugs, then runs all do_check() methods.

        The return value may be a deferred.
        """
        # named socketName so the module-level socket import is not shadowed
        for socketName, plugs in self.config['plugs'].items():
            self.plugs[socketName] = []
            for plug in plugs:
                entry = plug['entries']['default']
                instance = reflectcall.reflectCall(entry['module-name'],
                                                   entry['function-name'],
                                                   plug)
                self.plugs[socketName].append(instance)
                self.debug('Starting plug %r on socket %s',
                           instance, socketName)
                instance.start(self)

        # Call check methods, starting from the base class and working down to
        # subclasses.
        checks = common.get_all_methods(self, 'do_check', False)

        def checkErrorCallback(result):
            # if the mood is now sad, it means an error was encountered
            # during check, and we should return a failure here.
            # since the checks are responsible for adding a message,
            # this is a handled error.
            current = self.state.get('mood')
            if current == moods.sad.value:
                self.warning('Running checks made the component sad.')
                raise errors.ComponentSetupHandledError()

        checks.append(checkErrorCallback)
        return _maybeDeferredChain(checks, self)

    def do_stop(self):
        """
        BaseComponent vmethod for stopping.
        The component should do any cleanup it needs, but must not set the
        component's mood to sleeping.

        This base implementation stops the plugs, clears the state's
        messages, stops the CPU and memory pollers and fires the shutdown
        hook if one is set.

        @Returns: L{twisted.internet.defer.Deferred}
        """
        for socketName, plugs in self.plugs.items():
            for plug in plugs:
                self.debug('Stopping plug %r on socket %s', plug, socketName)
                plug.stop(self)

        # Iterate over a snapshot: if the state hands back its live list,
        # removing entries while iterating over it would skip messages.
        for message in list(self.state.get('messages')):
            # FIXME: not necessary
            self.state.remove('messages', message)

        if self._cpuPoller:
            self._cpuPoller.stop()
            self._cpuPoller = None
        if self._memoryPoller:
            self._memoryPoller.stop()
            self._memoryPoller = None

        if self._shutdownHook:
            self.debug('_stoppedCallback: firing shutdown hook')
            self._shutdownHook()

    ### BaseComponent implementation related to component protocol
    def setup(self):
        """
        Sets up the component.  Called during __init__, so be sure not
        to raise exceptions, instead adding messages to the component
        state.
        """
        def run_setups():
            setups = common.get_all_methods(self, 'do_setup', False)
            return _maybeDeferredChain(setups, self)

        def setup_complete(_):
            self.debug('setup completed')
            self.setup_completed()

        def got_error(failure):
            txt = log.getFailureMessage(failure)
            self.debug('got_error: %s', txt)
            # handled errors already carry their own message
            if not failure.check(errors.ComponentSetupHandledError):
                self.warning('Setup failed: %s', txt)
                m = messages.Error(T_(N_("Could not setup component.")),
                                   debug=txt,
                                   mid="component-setup-%s" % self.name)
                # will call setMood(moods.sad)
                self.addMessage(m)

            # swallow
            return None

        self.setMood(moods.waking)
        self.uiState.set('start-time', time.time())

        d = run_setups()
        d.addCallbacks(setup_complete, got_error)
        # all status info via messages and the mood

    def setup_completed(self):
        """
        Called when all do_setup() methods completed; turns the mood happy.
        """
        self.debug('turning happy')
        self.setMood(moods.happy)

    def setShutdownHook(self, shutdownHook):
        """
        Set the shutdown hook for this component (replacing any previous hook).
        When a component is stopped, then this hook will be fired.
        """
        self._shutdownHook = shutdownHook

    def stop(self):
        """
        Tell the component to stop.
        The connection to the manager will be closed.
        The job process will also finish.
        """
        self.debug('BaseComponent.stop')

        # Set ourselves to waking while we're shutting down.
        self.setMood(moods.waking)

        # Run stop methods, starting from the subclass, up to this base class.
        stops = common.get_all_methods(self, 'do_stop', True)
        return _maybeDeferredChain(stops, self)

    ### BaseComponent public methods
    def getName(self):
        """
        @returns: the name of the component
        """
        return self.name

    def setWorkerName(self, workerName):
        """
        Store the given worker name in the component state.
        """
        self.state.set('workerName', workerName)

    def getWorkerName(self):
        """
        @returns: the worker name stored in the component state
        """
        return self.state.get('workerName')

    def setMedium(self, medium):
        """
        Set the medium for this component; the medium's logName is set to
        the component's name.

        @type medium: L{BaseComponentMedium}
        """
        assert isinstance(medium, BaseComponentMedium)
        self.medium = medium
        self.medium.logName = self.getName()

    def setMood(self, mood):
        """
        Set the given mood on the component if it's different from the current
        one.

        Once the component is sad, the mood cannot be changed anymore.
        Turning happy or sad fires the deferreds handed out by
        L{waitForHappy}.
        """
        current = self.state.get('mood')

        if current == mood.value:
            self.log('already in mood %r' % mood)
            return
        elif current == moods.sad.value:
            self.info('tried to set mood to %r, but already sad :-(' % mood)
            return

        self.doLog(log.DEBUG, -2, 'MOOD changed to %r by caller', mood)
        self.state.set('mood', mood.value)

        if mood == moods.happy:
            while self._happyWaits:
                self._happyWaits.pop(0).callback(None)
        elif mood == moods.sad:
            while self._happyWaits:
                self._happyWaits.pop(0).errback(errors.ComponentStartError())

    def getMood(self):
        """
        Gets the mood on the component.

        @rtype: int
        """
        return self.state.get('mood')

    def waitForHappy(self):
        """
        Return a deferred that fires with None once the component is happy,
        or fails with L{flumotion.common.errors.ComponentStartError} once
        it is sad.

        @rtype: L{twisted.internet.defer.Deferred}
        """
        mood = self.getMood()
        if mood == moods.happy.value:
            return defer.succeed(None)
        elif mood == moods.sad.value:
            return defer.fail(errors.ComponentStartError())
        else:
            d = defer.Deferred()
            self._happyWaits.append(d)
            return d

    def addMessage(self, message):
        """
        Add a message to the component.
        If any of the messages is an error, the component will turn sad.

        @type  message: L{flumotion.common.messages.Message}
        """
        self.state.append('messages', message)
        if message.level == messages.ERROR:
            self.debug('error message, turning sad')
            self.setMood(moods.sad)
            if self._haveError:
                self._haveError(message)

    def fixRenamedProperties(self, properties, list):
        """
        Fix properties that have been renamed from a previous version,
        and add a warning for them.

        @param properties: properties; will be modified as a result.
        @type  properties: dict
        @param list:       list of (old, new) tuples of property names.
        @type  list:       list of tuple of (str, str)
        """
        # NOTE: the 'list' parameter shadows the builtin inside this method
        found = [(old, new) for old, new in list if old in properties]

        if found:
            m = messages.Warning(T_(N_(
                "Your configuration uses deprecated properties.  "
                "Please update your configuration and correct them.\n")),
                mid="deprecated")
            for old, new in found:
                m.add(T_(N_(
                "Please rename '%s' to '%s'.\n"),
                        old, new))
                self.debug("Setting new property '%s' to %r", new,
                    properties[old])
                properties[new] = properties[old]
                del properties[old]
            self.addMessage(m)

    def adminCallRemote(self, methodName, *args, **kwargs):
        """
        Call a remote method on all admin client views on this component.

        This gets serialized through the manager and multiplexed to all
        admin clients, and from there on to all views connected to each
        admin client model.

        Because there can be any number of admin clients that this call
        will go out to, it does not make sense to have one return value.
        This function will return None always.
        """
        if self.medium:
            self.medium.callRemote("adminCallRemote", methodName,
                                   *args, **kwargs)
        else:
            self.debug('asked to adminCallRemote(%s, *%r, **%r), but '
                       'no manager.'
                       % (methodName, args, kwargs))

    def _pollCPU(self):
        """
        Update the 'cpu-percent' and 'current-time' keys of the UI state.
        """
        # update CPU time stats
        nowTime = time.time()
        nowClock = time.clock()
        deltaTime = nowTime - self._lastTime
        deltaClock = nowClock - self._lastClock
        self._lastTime = nowTime
        self._lastClock = nowClock
        # deltaClock can be < 0 if time.clock() wrapped around;
        # deltaTime can be <= 0 if two polls land on the same timestamp or
        # the wall clock was set back, in which case no ratio can be computed
        if deltaClock >= 0 and deltaTime > 0:
            CPU = deltaClock/deltaTime
            self.log('latest CPU use: %r', CPU)
            self.uiState.set('cpu-percent', CPU)

        self.uiState.set('current-time', nowTime)

    def _pollMemory(self):
        """
        Update the 'virtual-size' key of the UI state from /proc.
        """
        # Figure out our virtual memory size and report that.
        # I don't know a nicer way to find vsize than groping /proc/
        handle = open('/proc/%d/stat' % os.getpid())
        try:
            line = handle.read()
        finally:
            # make sure the handle is closed even if reading fails
            handle.close()

        # field 2 (comm) is wrapped in parentheses and may itself contain
        # spaces or parentheses, so split after the last ')': what remains
        # starts at field 3, which puts vsize (field 23) at index 20
        parts = line.rsplit(')', 1)
        if len(parts) == 2:
            vsize = int(parts[1].split()[20])
        else:
            # no parenthesised comm found; fall back to plain splitting
            vsize = int(line.split()[22])
        self.log('vsize is %d', vsize)
        self.uiState.set('virtual-size', vsize)
