Package flumotion :: Package component :: Module component
[hide private]

Source Code for Module flumotion.component.component

  1  # -*- Mode: Python; test-case-name: flumotion.test.test_component -*- 
  2  # vi:si:et:sw=4:sts=4:ts=4 
  3  # 
  4  # Flumotion - a streaming media server 
  5  # Copyright (C) 2004,2005,2006,2007 Fluendo, S.L. (www.fluendo.com). 
  6  # All rights reserved. 
  7   
  8  # This file may be distributed and/or modified under the terms of 
  9  # the GNU General Public License version 2 as published by 
 10  # the Free Software Foundation. 
 11  # This file is distributed without any warranty; without even the implied 
 12  # warranty of merchantability or fitness for a particular purpose. 
 13  # See "LICENSE.GPL" in the source distribution for more information. 
 14   
 15  # Licensees having purchased or holding a valid Flumotion Advanced 
 16  # Streaming Server license may use this file in accordance with the 
 17  # Flumotion Advanced Streaming Server Commercial License Agreement. 
 18  # See "LICENSE.Flumotion" in the source distribution for more information. 
 19   
 20  # Headers in this file shall remain intact. 
 21   
 22  """ 
 23  worker-side objects for components 
 24  """ 
 25   
 26  import os 
 27  import sys 
 28  import time 
 29  import socket 
 30   
 31  import gobject 
 32   
 33  from twisted.internet import reactor, error, defer 
 34  from twisted.cred import error as crederror 
 35  from twisted.spread import pb 
 36  from twisted.python import reflect 
 37   
 38  from flumotion.common import interfaces, errors, log, planet, medium, pygobject 
 39  from flumotion.common import componentui, common, registry, messages, interfaces 
 40  from flumotion.common.planet import moods 
 41  from flumotion.configure import configure 
 42  from flumotion.twisted import credentials 
 43  from flumotion.twisted import pb as fpb 
 44  from flumotion.twisted.compat import implements 
 45  from flumotion.common.pygobject import gsignal 
 46   
 47  from flumotion.common.messages import N_ 
 48  T_ = messages.gettexter('flumotion') 
 49   
class ComponentClientFactory(fpb.ReconnectingFPBClientFactory):
    """
    I am the client factory a component uses to log in to the manager.

    On creation I instantiate the component's medium class and hand the
    resulting medium to the component.
    """
    logCategory = 'component'
    perspectiveInterface = interfaces.IComponentMedium

    def __init__(self, component):
        """
        @param component: the component that logs in through me
        @type  component: L{flumotion.component.component.BaseComponent}
        """
        # doing this as a class method triggers a doc error
        fpb.ReconnectingFPBClientFactory.__init__(self)

        self.component = component

        # the medium is what the manager talks to; the component decides
        # which medium class is used
        componentMedium = component.componentMediumClass(component)
        self.medium = componentMedium
        component.setMedium(componentMedium)

        self.maxDelay = 10

        # FIXME: interface -- the interfaces implemented by the component
        # medium class are not collected here yet

        self.logName = component.name

    # vmethod implementation
    def gotDeferredLogin(self, d):
        """
        Attach the login callback and the login errbacks to the deferred.

        @param d: deferred for the login attempt; the callbacks below
                  treat its result as the remote reference to the manager
        @type  d: L{twisted.internet.defer.Deferred}
        """
        def onDisconnect(remoteReference):
            if reactor.killed:
                self.log('Connection to manager lost due to shutdown')
                return
            self.warning('Lost connection to manager, '
                         'will attempt to reconnect')

        def onLogin(reference):
            self.info("Logged in to manager")
            self.debug("remote reference %r" % reference)
            self._previously_connected = True

            self.medium.setRemoteReference(reference)
            reference.notifyOnDisconnect(onDisconnect)

        # Each specific errback traps exactly one failure type; any other
        # failure falls through to the next errback in the chain.
        def onAccessDenied(failure):
            failure.trap(crederror.UnauthorizedLogin)
            self.warning('Access denied.')

        def onConnectionRefused(failure):
            failure.trap(error.ConnectionRefusedError)
            self.warning('Connection to manager refused.')

        def onAlreadyLoggedIn(failure):
            failure.trap(errors.AlreadyConnectedError)
            self.warning('Component with id %s is already logged in.',
                         self.medium.authenticator.avatarId)

        # catch-all, has to be the last one in the chain
        def onLoginFailed(failure):
            self.warning('Login failed, reason: %s' % failure)

        d.addCallback(onLogin)
        for errback in (onAccessDenied, onConnectionRefused,
                        onAlreadyLoggedIn, onLoginFailed):
            d.addErrback(errback)

    # we want to save the authenticator
    def startLogin(self, authenticator):
        """
        Store the authenticator on the medium, then start the login.

        @param authenticator: the authenticator to log in with
        @returns: whatever the parent class' startLogin returns
        """
        self.medium.setAuthenticator(authenticator)
        parent = fpb.ReconnectingFPBClientFactory
        return parent.startLogin(self, authenticator)
118 119 # needs to be before BaseComponent because BaseComponent references it
class BaseComponentMedium(medium.PingingMedium):
    """
    I am a medium interfacing with a manager-side avatar.
    I implement a Referenceable for the manager's avatar to call on me.
    I have a remote reference to the manager's avatar to call upon.
    I am created by the L{ComponentClientFactory}.

    @cvar authenticator: the authenticator used to log in to manager
    @type authenticator: L{flumotion.twisted.pb.Authenticator}
    """

    implements(interfaces.IComponentMedium)
    logCategory = 'basecompmed'

    def __init__(self, component):
        """
        @param component: the component this medium speaks for
        @type  component: L{flumotion.component.component.BaseComponent}
        """
        self.comp = component
        # filled in by setAuthenticator() once a login is started
        self.authenticator = None

    ### our methods
    def setup(self, config):
        """
        Set up the medium with the given config.  Does nothing here;
        remote_setup() calls it after the component itself was set up.

        @param config: the component configuration
        """
        pass

    def getManagerIP(self):
        """
        Return the manager IP as seen by us.

        Requires a remote reference to the manager (asserted).

        @returns: the result of resolving the peer host of our connection
        """
        assert self.remote
        peer = self.remote.broker.transport.getPeer()
        try:
            host = peer.host
        except AttributeError:
            # presumably a tuple-style address without a host attribute
            host = peer[1]

        res = socket.gethostbyname(host)
        self.debug("getManagerIP(): we think the manager's IP is %r" % res)
        return res

    def getIP(self):
        """
        Return the IP of this component based on connection to the manager.

        Note: this is insufficient in general, and should be replaced by
        network mapping stuff later.

        Requires a remote reference to the manager (asserted).
        """
        assert self.remote
        host = self.remote.broker.transport.getHost()
        self.debug("getIP(): using %r as our IP", host.host)
        return host.host

    def setAuthenticator(self, authenticator):
        """
        Set the authenticator the client factory has used to log in to the
        manager.  Can be reused by the component's medium to make
        feed connections which also get authenticated by the manager's
        bouncer.

        @type authenticator: L{flumotion.twisted.pb.Authenticator}
        """
        self.authenticator = authenticator

    ### pb.Referenceable remote methods
    ### called from manager by our avatar
    def remote_getState(self):
        """
        Return the state of the component, which will be serialized to a
        L{flumotion.common.planet.ManagerJobState} object.

        @rtype:   L{flumotion.common.planet.WorkerJobState}
        @returns: state of component
        """
        # we can only get the IP after we have a remote reference, so add it
        # here
        self.comp.state.set('manager-ip', self.getManagerIP())
        return self.comp.state

    def remote_getConfig(self):
        """
        Return the configuration of the component.

        @rtype:   dict
        @returns: component's current configuration, or None when the
                  component has no config attribute yet
        """
        try:
            return self.comp.config
        except AttributeError:
            self.debug('getConfig(), but component is not set up yet')
            return None

    def remote_setup(self, config):
        """
        Set up the component and the component's medium with the given
        config, in that order.

        @param config: the component configuration
        @returns: the deferred returned by the component's setup(), with
                  the medium's own setup() chained onto it
        """
        d = self.comp.setup(config)
        # the medium is only set up once the component's setup succeeded
        d.addCallback(lambda r, c: self.setup(c), config)
        return d

    def remote_start(self, *args, **kwargs):
        """
        Start the component; all arguments are passed on to its start().
        """
        return self.comp.start(*args, **kwargs)

    def remote_stop(self):
        """
        Stop the component.

        @returns: whatever the component's stop() returns
        """
        self.info('Stopping component')
        return self.comp.stop()

    def remote_reloadComponent(self):
        """
        Reload modules in the component.

        @raise flumotion.common.errors.ReloadSyntaxError:
            when reloading hits a SyntaxError
        """
        import sys
        from twisted.python.rebuild import rebuild
        from twisted.python.reflect import filenameToModuleName
        name = filenameToModuleName(__file__)

        ## fixme: re-fetch bundles

        # reload ourselves first
        rebuild(sys.modules[name])

        # now rebuild relevant modules
        import flumotion.common.reload
        rebuild(sys.modules['flumotion.common'])
        try:
            flumotion.common.reload.reload()
        except SyntaxError:
            # sys.exc_info() instead of "except X, e" / "except X as e",
            # so this parses on every Python version
            raise errors.ReloadSyntaxError(sys.exc_info()[1])
        self._reloaded()

    def remote_getUIState(self):
        """
        Get a WorkerComponentUIState containing details needed to
        present an admin-side UI state.
        """
        return self.comp.uiState

    # separate method so it runs the newly reloaded one :)
    def _reloaded(self):
        self.info('reloaded module code for %s' % __name__)

    def remote_callMethod(self, methodName, *args, **kwargs):
        """
        Call the component's remote_<methodName> method with the given
        arguments and return its result.

        @param methodName: name of the method, without the remote_ prefix
        @type  methodName: str
        @raise flumotion.common.errors.NoMethodError:
            when the component has no such method
        """
        method = getattr(self.comp, 'remote_' + methodName, None)
        if method:
            return method(*args, **kwargs)
        msg = "%r doesn't have method remote_%s" % (self.comp, methodName)
        self.warning(msg)
        # NOTE(review): this used to raise errors.MoMethodError, which looks
        # like a typo for NoMethodError -- confirm against
        # flumotion.common.errors
        raise errors.NoMethodError(msg)
class BaseComponent(common.InitMixin, log.Loggable, gobject.GObject):
    """
    I am the base class for all Flumotion components.

    @ivar name:   the name of the component
    @type name:   string
    @ivar medium: the component's medium
    @type medium: L{BaseComponentMedium}

    @cvar componentMediumClass: the medium class to use for this component
    @type componentMediumClass: child class of L{BaseComponentMedium}
    """

    logCategory = 'basecomp'
    componentMediumClass = BaseComponentMedium

    def __init__(self):
        """
        Subclasses should not override __init__ at all.

        Instead, they should implement init(), which will be called
        by this implementation automatically.

        See L{flumotion.common.common.InitMixin} for more details.
        """
        # FIXME: name is unique where ? only in flow, so not in worker
        # need to use full path maybe ?
        gobject.GObject.__init__(self)

        # this will call self.init() for all implementors of init()
        common.InitMixin.__init__(self)

    # BaseComponent interface for subclasses related to component protocol
    def init(self):
        """
        A subclass should do as little as possible in its init method.
        In particular, it should not try to access resources.

        Failures during init are marshalled back to the manager through
        the worker's remote_create method, since there is no component state
        proxied to the manager yet at the time of init.
        """
        self.state = planet.WorkerJobState()

        # unknown until _setConfig() is called from setup()
        self.name = None

        self.state.set('pid', os.getpid())
        self.state.set('mood', moods.waking.value)

        self.medium = None # the medium connecting us to the manager's avatar

        self.uiState = componentui.WorkerComponentUIState()

        # FIXME: when we need this somewhere else, put this in a class and
        # use it that way
        self.baseTime = time.time()
        self.lastTime = time.time()
        self.lastClock = time.clock()

        # socket name -> list of plug instances, filled in by setup()
        self.plugs = {}

        # Start the cpu-usage updating.
        self._cpuCallLater = reactor.callLater(5, self._updateCPUUsage)

        self._shutdownHook = None

    def do_check(self):
        """
        Subclasses can implement me to run any checks before the component
        performs setup.

        Messages can be added to the component state's 'messages' list key.
        Any error messages added will trigger the component going to sad,
        and an L{flumotion.common.errors.ComponentSetupError} being raised;
        do_setup() will not be called.

        In the event of a fatal problem that can't be expressed through an
        error message, this method should set the mood to sad and raise the
        error on its own.

        self.config will be set before this is called.

        @Returns: L{twisted.internet.defer.Deferred}
        """
        return defer.succeed(None)

    def do_setup(self):
        """
        Subclasses can implement me to set up the component before it is
        started.  It should set up the component, possibly opening files
        and resources.
        Non-programming errors should not be raised, but returned as a
        failing deferred.

        self.config will be set before this is called.

        @Returns: L{twisted.internet.defer.Deferred}
        """
        return defer.succeed(None)

    def do_start(self, *args, **kwargs):
        """
        BaseComponent vmethod for starting up.  If you override this
        method, you are responsible for arranging that the component
        becomes happy.

        @Returns: L{twisted.internet.defer.Deferred}
        """
        # default behavior
        self.setMood(moods.happy)
        return defer.succeed(None)

    def do_stop(self):
        """
        BaseComponent vmethod for stopping.
        The component should do any cleanup it needs, but must not set the
        component's mood to sleeping.

        @Returns: L{twisted.internet.defer.Deferred}
        """
        return defer.succeed(None)

    ### BaseComponent implementation related to component protocol
    ### called by manager through medium
    def setup(self, config, *args, **kwargs):
        """
        Sets up the component with the given config.  Called by the manager
        through the medium.

        In order: stores the config, loads the bundles for the configured
        plugs and instantiates the plugs, runs do_check(), then do_setup().

        @param config: the component configuration; read here are the
                       'name' and 'plugs' keys
        @type  config: dict

        @Returns: L{twisted.internet.defer.Deferred}
        @raise flumotion.common.errors.ComponentSetupError:
            when an error happened during setup of the component
        """
        def setup_plugs():
            # by this time we have a medium, so we can load bundles
            reg = registry.getRegistry()

            def load_bundles():
                # collect every plug type once
                plugTypes = {}
                for plugs in config['plugs'].values():
                    for plug in plugs:
                        plugTypes[plug['type']] = True
                if not plugTypes:
                    return defer.succeed(True) # shortcut

                # we got this far, it should work
                moduleNames = [reg.getPlug(t).getEntry().getModuleName()
                               for t in plugTypes]
                if not self.medium:
                    self.warning('Not connected to a medium, cannot '
                                 'load bundles -- assuming all modules '
                                 'are available')
                    return defer.succeed(True)

                loader = self.medium.bundleLoader
                return loader.getBundles(moduleName=moduleNames)

            def make_plugs():
                # named socketName so the socket module is not shadowed
                for socketName, plugs in config['plugs'].items():
                    self.plugs[socketName] = []
                    for plug in plugs:
                        entry = reg.getPlug(plug['type']).getEntry()
                        module = reflect.namedAny(entry.getModuleName())
                        proc = getattr(module, entry.getFunction())
                        instance = proc(plug)
                        self.plugs[socketName].append(instance)

            try:
                d = load_bundles()
                d.addCallback(lambda x: make_plugs())
                return d
            except Exception:
                # sys.exc_info() instead of "except X, e" / "except X as e",
                # so this parses on every Python version
                e = sys.exc_info()[1]
                self.debug("Exception while loading bundles: %s" %
                           log.getExceptionMessage(e))
                return defer.fail(e)

        def checkErrorCallback(result):
            # if the mood is now sad, it means an error was encountered
            # during check, and we should return a failure here.
            # since the checks are responsible for adding a message,
            # this is a handled error.
            current = self.state.get('mood')
            if current == moods.sad.value:
                self.warning('Running checks made the component sad.')
                raise errors.ComponentSetupHandledError()

        def setupErrback(failure):
            # pass through handled errors
            if failure.check(errors.ComponentSetupHandledError):
                return failure

            # anything else is reported as a message, makes the component
            # sad and is converted into a handled error
            self.warning('Could not set up component: %s',
                         log.getFailureMessage(failure))
            m = messages.Error(T_(N_("Could not setup component.")),
                               debug=log.getFailureMessage(failure),
                               id="component-setup-%s" % self.name)
            self.state.append('messages', m)
            self.setMood(moods.sad)
            raise errors.ComponentSetupHandledError(
                'Could not set up component')

        self.debug("setup() called with config %r", config)
        self.setMood(moods.waking)
        self._setConfig(config)
        # now we have a name, set it on the medium too
        if self.medium:
            self.medium.logName = self.getName()

        d = setup_plugs()
        d.addCallback(lambda r: self.do_check())
        d.addCallback(checkErrorCallback)
        d.addCallback(lambda r: self.do_setup())
        d.addErrback(setupErrback)
        return d

    def start(self, *args, **kwargs):
        """
        Tell the component to start.  This is called when all its
        dependencies are already started.

        To hook onto this method, implement your own do_start method.
        See BaseComponent.do_start() for what your do_start method is
        responsible for doing.

        Again, don't override this method.  Thanks.

        @Returns: the deferred returned by do_start(), or a failed
                  deferred when starting the plugs or do_start() raised
        """
        self.debug('BaseComponent.start')

        def start_plugs():
            for socketName, plugs in self.plugs.items():
                for plug in plugs:
                    self.debug('Starting plug %r on socket %s',
                               plug, socketName)
                    plug.start(self)

        try:
            start_plugs()
            ret = self.do_start(*args, **kwargs)
            assert isinstance(ret, defer.Deferred), \
                   "do_start %r must return a deferred" % self.do_start
            self.debug('start: returning value %s' % ret)
            return ret
        except Exception:
            e = sys.exc_info()[1]
            self.debug("Exception during component do_start: %s" %
                       log.getExceptionMessage(e))
            return defer.fail(e)

    def setShutdownHook(self, shutdownHook):
        """
        Set the shutdown hook for this component (replacing any previous
        hook).  When a component is stopped, then this hook will be fired.

        @param shutdownHook: callable taking no arguments
        """
        self._shutdownHook = shutdownHook

    def stop(self):
        """
        Tell the component to stop.
        The connection to the manager will be closed.
        The job process will also finish.

        NOTE(review): nothing here closes the connection or ends the
        process; presumably the shutdown hook does that -- confirm.

        @Returns: L{twisted.internet.defer.Deferred}
        """
        self.debug('BaseComponent.stop')

        def stop_plugs(ret):
            for socketName, plugs in self.plugs.items():
                for plug in plugs:
                    self.debug('Stopping plug %r on socket %s',
                               plug, socketName)
                    plug.stop(self)
            return ret

        def fireShutdownHook(ret):
            # Added with addBoth and returning nothing: the hook fires
            # whether or not stopping failed, and the result of the
            # returned deferred is None from here on.
            if self._shutdownHook:
                self.debug('_stoppedCallback: firing shutdown hook')
                self._shutdownHook()

        self.setMood(moods.waking)
        # Iterate over a copy: removing entries from the list that is being
        # iterated over skips every other message.
        for message in list(self.state.get('messages')):
            self.state.remove('messages', message)

        if self._cpuCallLater:
            self._cpuCallLater.cancel()
            self._cpuCallLater = None

        d = self.do_stop()
        d.addCallback(stop_plugs)
        d.addBoth(fireShutdownHook)
        return d

    ### GObject methods
    def emit(self, name, *args):
        """
        Emit the given signal, unless the string representation of this
        object contains 'uninitialized'; in that case only warn.
        """
        if 'uninitialized' in str(self):
            self.warning('Uninitialized object!')
        else:
            gobject.GObject.emit(self, name, *args)

    ### BaseComponent public methods
    def getName(self):
        """
        @returns: the name of the component, or None when it was not
                  set up yet
        """
        return self.name

    def setWorkerName(self, workerName):
        """
        Store the given worker name in the component state.
        """
        self.state.set('workerName', workerName)

    def getWorkerName(self):
        """
        @returns: the worker name stored in the component state
        """
        return self.state.get('workerName')

    def setMedium(self, medium):
        """
        @param medium: the medium connecting us to the manager's avatar
        @type  medium: L{BaseComponentMedium}
        """
        assert isinstance(medium, BaseComponentMedium)
        self.medium = medium

    def setMood(self, mood):
        """
        Set the given mood on the component if it's different from the
        current one.  A sad component stays sad: the request is logged and
        ignored.

        @param mood: the mood to set; its value attribute gets stored
        """
        current = self.state.get('mood')

        if current == mood.value:
            self.log('already in mood %r' % mood)
            return
        elif current == moods.sad.value:
            self.info('tried to set mood to %r, but already sad :-(' % mood)
            return

        self.debug('MOOD changed to %r' % mood)
        self.state.set('mood', mood.value)

    def getMood(self):
        """
        Gets the mood on the component.

        @rtype: int
        """
        return self.state.get('mood')

    def addMessage(self, message):
        """
        Add a message to the component.
        If any of the messages is an error, the component will turn sad.

        @type  message: L{flumotion.common.messages.Message}
        """
        self.state.append('messages', message)
        if message.level == messages.ERROR:
            self.debug('error message, turning sad')
            self.setMood(moods.sad)

    def fixRenamedProperties(self, properties, list):
        """
        Fix properties that have been renamed from a previous version,
        and add a warning for them.

        @param properties: properties; will be modified as a result.
        @type  properties: dict
        @param list:       list of (old, new) tuples of property names.
        @type  list:       list of tuple of (str, str)
        """
        # the parameter is called list for backwards compatibility, so the
        # builtin of that name is not available in here
        found = [(old, new) for old, new in list if old in properties]
        if not found:
            return

        m = messages.Warning(T_(N_(
            "Your configuration uses deprecated properties. "
            "Please update your configuration and correct them.\n")),
            id="deprecated")
        for old, new in found:
            m.add(T_(N_(
                "Please rename '%s' to '%s'.\n"),
                    old, new))
            self.debug("Setting new property '%s' to %r", new,
                       properties[old])
            properties[new] = properties[old]
            del properties[old]
        self.addMessage(m)

    def adminCallRemote(self, methodName, *args, **kwargs):
        """
        Call a remote method on all admin client views on this component.

        This gets serialized through the manager and multiplexed to all
        admin clients, and from there on to all views connected to each
        admin client model.

        Because there can be any number of admin clients that this call
        will go out to, it does not make sense to have one return value.
        This function will return None always.
        """
        if self.medium:
            self.medium.callRemote("adminCallRemote", methodName,
                                   *args, **kwargs)
        else:
            self.debug('asked to adminCallRemote(%s, *%r, **%r), but '
                       'no manager.'
                       % (methodName, args, kwargs))

    # private methods
    def _setConfig(self, config):
        """
        Store the config and take the component name from it.  Once a name
        is set, a config carrying a different name is refused (asserted).
        """
        if self.name:
            assert config['name'] == self.name, \
                   "Can't change name while running"
        self.config = config
        self.name = config['name']

    def _updateCPUUsage(self):
        """
        Store the CPU use since the previous update in the 'cpu' state key
        and schedule the next update in 5 seconds.
        """
        # update CPU time stats
        nowTime = time.time()
        nowClock = time.clock()
        deltaTime = nowTime - self.lastTime
        deltaClock = nowClock - self.lastClock

        # Skip this round if no wall clock time passed (for example after a
        # clock adjustment): dividing would raise, and the next update
        # would then never get scheduled.
        if deltaTime > 0:
            CPU = deltaClock / deltaTime
            self.log('latest CPU use: %r' % CPU)
            self.state.set('cpu', CPU)

        self.lastTime = nowTime
        self.lastClock = nowClock

        self._cpuCallLater = reactor.callLater(5, self._updateCPUUsage)

pygobject.type_register(BaseComponent)