Package flumotion :: Package admin :: Module admin
[hide private]

Source Code for Module flumotion.admin.admin

  1  # -*- Mode: Python -*- 
  2  # vi:si:et:sw=4:sts=4:ts=4 
  3  # 
  4  # Flumotion - a streaming media server 
  5  # Copyright (C) 2004,2005,2006,2007 Fluendo, S.L. (www.fluendo.com). 
  6  # All rights reserved. 
  7   
  8  # This file may be distributed and/or modified under the terms of 
  9  # the GNU General Public License version 2 as published by 
 10  # the Free Software Foundation. 
 11  # This file is distributed without any warranty; without even the implied 
 12  # warranty of merchantability or fitness for a particular purpose. 
 13  # See "LICENSE.GPL" in the source distribution for more information. 
 14   
 15  # Licensees having purchased or holding a valid Flumotion Advanced 
 16  # Streaming Server license may use this file in accordance with the 
 17  # Flumotion Advanced Streaming Server Commercial License Agreement. 
 18  # See "LICENSE.Flumotion" in the source distribution for more information. 
 19   
 20  # Headers in this file shall remain intact. 
 21   
 22  """ 
 23  model abstraction for administration clients supporting different views 
 24  """ 
 25   
 26  import sys 
 27  import gobject 
 28   
 29  from twisted.spread import pb 
 30  from twisted.internet import error, defer, reactor 
 31  from twisted.cred import error as crederror 
 32  from twisted.python import rebuild, reflect, failure 
 33   
 34  from flumotion.common import common, errors, interfaces, log, pygobject 
 35  from flumotion.common import keycards, worker, planet, medium, package, messages 
 36  # serializable worker and component state 
 37  from flumotion.twisted import flavors 
 38  from flumotion.twisted.defer import defer_generator_method 
 39   
 40  from flumotion.configure import configure 
 41  from flumotion.common import reload, connection 
 42  from flumotion.twisted import credentials 
 43  from flumotion.twisted import pb as fpb 
 44  from flumotion.twisted.compat import implements 
 45   
 46  from flumotion.common.pygobject import gsignal, gproperty 
 47   
 48  from flumotion.common.messages import N_ 
 49  T_ = messages.gettexter('flumotion') 
 50   
51 -class AdminClientFactory(fpb.ReconnectingFPBClientFactory):
52 perspectiveInterface = interfaces.IAdminMedium 53
54 - def __init__(self, medium, extraTenacious=False):
55 """ 56 @type medium: AdminModel 57 """ 58 fpb.ReconnectingFPBClientFactory.__init__(self) 59 self.medium = medium 60 self.maxDelay = 20 61 62 self.extraTenacious = extraTenacious 63 self.hasBeenConnected = 0
64
65 - def clientConnectionMade(self, broker):
66 self.hasBeenConnected = 1 67 68 fpb.ReconnectingFPBClientFactory.clientConnectionMade(self, broker)
69
70 - def clientConnectionFailed(self, connector, reason):
71 """ 72 @param reason: L{twisted.spread.pb.failure.Failure} 73 """ 74 if reason.check(error.DNSLookupError): 75 self.debug('DNS lookup error') 76 if not self.extraTenacious: 77 self.medium.connectionFailed(reason) 78 return 79 elif (reason.check(error.ConnectionRefusedError) 80 or reason.check(error.ConnectError)): 81 # If we're logging in for the first time, we want to make this a 82 # real error; we present a dialog, etc. 83 # However, if we fail later on (e.g. manager shut down, and 84 # hasn't yet been restarted), we want to keep trying to reconnect, 85 # so we just log a message. 86 self.debug("Error connecting: %s", log.getFailureMessage(reason)) 87 if self.hasBeenConnected: 88 self.log("we've been connected before though, so going " 89 "to retry") 90 # fall through 91 elif self.extraTenacious: 92 self.log("trying again due to +100 tenacity") 93 # fall through 94 else: 95 self.log("telling medium about connection failure") 96 self.medium.connectionFailed(reason) 97 # return 98 return 99 100 fpb.ReconnectingFPBClientFactory.clientConnectionFailed(self, 101 connector, reason) 102 # delay is now updated 103 self.debug("will try reconnect in %f seconds" % self.delay)
104 105 # vmethod implementation
106 - def gotDeferredLogin(self, d):
107 yield d 108 109 try: 110 try: 111 result = d.value() 112 assert result 113 except Exception, e: 114 if self.extraTenacious: 115 self.debug('connection problem: %s', 116 log.getExceptionMessage(e)) 117 self.debug('we are tenacious, so trying again later') 118 self.disconnect() 119 yield None 120 else: 121 raise 122 # if it's not a reference, we need to respond to a 123 # challenge... 124 if not isinstance(result, pb.RemoteReference): 125 keycard = result 126 keycard.setPassword(self.passwd) 127 self.log("_loginCallback: responding to challenge") 128 d = self.login(keycard, self.medium, interfaces.IAdminMedium) 129 yield d 130 result = d.value() 131 132 self.medium.setRemoteReference(result) 133 134 except errors.ConnectionFailedError: 135 self.debug("emitting connection-failed") 136 self.medium.emit('connection-failed', "I failed my master") 137 self.debug("emitted connection-failed") 138 139 except errors.ConnectionRefusedError: 140 self.debug("emitting connection-refused") 141 self.medium.emit('connection-refused') 142 self.debug("emitted connection-refused") 143 144 except crederror.UnauthorizedLogin: 145 # FIXME: unauthorized login emit ! 146 self.debug("emitting connection-refused") 147 self.medium.emit('connection-refused') 148 self.debug("emitted connection-refused") 149 150 except Exception, e: 151 self.medium.emit('connection-error', e) 152 self.medium._defaultErrback(failure.Failure(e))
153 154 gotDeferredLogin = defer_generator_method(gotDeferredLogin)
155 156 # FIXME: stop using signals, we can provide a richer interface with actual 157 # objects and real interfaces for the views a model communicates with
158 -class AdminModel(medium.PingingMedium, gobject.GObject):
159 """ 160 I live in the admin client. 161 I am a data model for any admin view implementing a UI to 162 communicate with one manager. 163 I send signals when things happen. 164 165 Manager calls on us through L{flumotion.manager.admin.AdminAvatar} 166 """ 167 gsignal('connected') 168 gsignal('disconnected') 169 gsignal('connection-refused') 170 gsignal('connection-failed', str) 171 gsignal('connection-error', object) 172 gsignal('component-property-changed', str, str, object) 173 gsignal('reloading', str) 174 gsignal('message', str) 175 gsignal('update') 176 177 logCategory = 'adminmodel' 178 179 implements(interfaces.IAdminMedium, flavors.IStateListener) 180 181 # Public instance variables (read-only) 182 planet = None 183
184 - def __init__(self, authenticator):
185 self.__gobject_init__() 186 187 # All of these instance variables are private. Cuidado cabrones! 188 self.authenticator = authenticator 189 self.host = self.port = self.use_insecure = None 190 191 self.managerId = '<uninitialized>' 192 193 self.state = 'disconnected' 194 self.clientFactory = self._makeFactory(authenticator) 195 # 20 secs max for an admin to reconnect 196 self.clientFactory.maxDelay = 20 197 198 self._components = {} # dict of components 199 self.planet = None 200 self._workerHeavenState = None
201 202 # a method so mock testing frameworks can override it
203 - def _makeFactory(self, authenticator):
204 # FIXME: this needs further refactoring, so we only ever pass 205 # an authenticator. For that we need to fix all users of this 206 # class too 207 factory = AdminClientFactory(self) 208 factory.startLogin(authenticator) 209 return factory
210
211 - def connectToHost(self, host, port, use_insecure=False, 212 keep_trying=False):
213 'Connect to a host.' 214 self.host = host 215 self.port = port 216 self.use_insecure = use_insecure 217 218 # the intention here is to give an id unique to the manager -- 219 # if a program is adminning multiple managers, this id should 220 # tell them apart (and identify duplicates) 221 info = connection.PBConnectionInfo(host, port, not use_insecure, 222 self.authenticator) 223 self.managerId = str(info) 224 225 self.info('Connecting to manager %s with %s', 226 self.managerId, use_insecure and 'TCP' or 'SSL') 227 if keep_trying: 228 self.info('AdminClientFactory, now with extra tenacity') 229 self.clientFactory.extraTenacious = True 230 231 if use_insecure: 232 reactor.connectTCP(host, port, self.clientFactory) 233 else: 234 from twisted.internet import ssl 235 reactor.connectSSL(host, port, self.clientFactory, 236 ssl.ClientContextFactory()) 237 238 def connected(model, d, ids): 239 map(model.disconnect, ids) 240 d.callback(model)
241 242 def connection_refused(model, d, ids): 243 map(model.disconnect, ids) 244 d.errback(errors.ConnectionRefusedError())
245 246 def connection_failed(model, reason, d, ids): 247 map(model.disconnect, ids) 248 d.errback(errors.ConnectionFailedError(reason)) 249 250 def connection_error(model, exception, d, ids): 251 map(model.disconnect, ids) 252 d.errback(exception) 253 254 d = defer.Deferred() 255 ids = [] 256 ids.append(self.connect('connected', connected, d, ids)) 257 ids.append(self.connect('connection-refused', 258 connection_refused, d, ids)) 259 ids.append(self.connect('connection-failed', 260 connection_failed, d, ids)) 261 ids.append(self.connect('connection-error', 262 connection_error, d, ids)) 263 return d 264 265 # default Errback 266 # FIXME: we can set it up with a list of types not to warn for ?
267 - def _defaultErrback(self, failure):
268 self.debug('Possibly unhandled deferred failure: %r (%s)' % ( 269 failure, failure.getErrorMessage())) 270 return failure
271
272 - def reconnect(self):
273 self.debug('asked to log in again') 274 self.clientFactory.stopTrying() 275 # this also makes it try to connect again 276 self.clientFactory.resetDelay() 277 self.connectToHost(self.host, self.port, self.use_insecure)
278 279 # FIXME: give these three sensible names
280 - def adminInfoStr(self):
281 return self.managerId
282
283 - def connectionInfoStr(self):
284 return '%s:%s (%s)' % (self.host, self.port, 285 self.use_insecure and 'http' or 'https')
286 287 # used in fgc
288 - def managerInfoStr(self):
289 assert self.planet 290 return '%s (%s:%s)' % (self.planet.get('name'), self.host, self.port)
291
292 - def connectionFailed(self, failure):
293 # called by client factory 294 if failure.check(error.DNSLookupError): 295 message = "Could not look up host '%s'." % self.host 296 elif (failure.check(error.ConnectionRefusedError) 297 or failure.check(error.ConnectionRefusedError)): 298 message = ("Could not connect to host '%s' on port %d." 299 % (self.host, self.port)) 300 else: 301 message = ("Unexpected failure.\nDebug information: %s" 302 % log.getFailureMessage (failure)) 303 self.debug('emitting connection-failed') 304 self.emit('connection-failed', message) 305 self.debug('emitted connection-failed')
306
307 - def setRemoteReference(self, remoteReference):
308 self.debug("setRemoteReference %r" % remoteReference) 309 def writeConnection(): 310 if not (self.authenticator.username 311 and self.authenticator.password): 312 self.log('not caching connection information') 313 return 314 s = ''.join(['<connection>', 315 '<host>%s</host>' % self.host, 316 '<manager>%s</manager>' % self.planet.get('name'), 317 '<port>%d</port>' % self.port, 318 '<use_insecure>%d</use_insecure>' 319 % (self.use_insecure and 1 or 0), 320 '<user>%s</user>' % self.authenticator.username, 321 '<passwd>%s</passwd>' % self.authenticator.password, 322 '</connection>']) 323 324 import os 325 import md5 326 sum = md5.new(s).hexdigest() 327 f = os.path.join(configure.registrydir, '%s.connection' % sum) 328 try: 329 h = open(f, 'w') 330 h.write(s) 331 h.close() 332 except Exception, e: 333 self.info('failed to write connection cache file %s: %s', 334 f, log.getExceptionMessage(e))
335 336 # chain up 337 medium.PingingMedium.setRemoteReference(self, remoteReference) 338 339 # fixme: push the disconnect notification upstream 340 def remoteDisconnected(remoteReference): 341 self.debug("emitting disconnected") 342 self.state = 'disconnected' 343 self.emit('disconnected') 344 self.debug("emitted disconnected") 345 self.remote.notifyOnDisconnect(remoteDisconnected) 346 347 d = self.callRemote('getPlanetState') 348 yield d 349 self.planet = d.value() 350 # monkey, Monkey, MONKEYPATCH!!!!! 351 self.planet.admin = self 352 self.debug('got planet state') 353 354 d = self.callRemote('getWorkerHeavenState') 355 yield d 356 self._workerHeavenState = d.value() 357 self.debug('got worker state') 358 359 writeConnection() 360 361 self.debug('Connected to manager and retrieved all state') 362 self.state = 'connected' 363 self.emit('connected') 364 setRemoteReference = defer_generator_method(setRemoteReference) 365 366 ### pb.Referenceable methods
367 - def remote_log(self, category, type, message):
368 self.log('remote: %s: %s: %s' % (type, category, message))
369 370 # IStateListener interface
371 - def stateSet(self, state, key, value):
372 self.debug("state set on %r: key %s" % (state, key))
373
374 - def stateAppend(self, state, key, value):
375 self.debug("state append on %r: key %s" % (state, key))
376 377 # if a flow gets added to a planet, add ourselves as a listener 378
379 - def stateRemove(self, state, key, value):
380 self.debug("state remove on %r: key %s" % (state, key))
381 382 ### model functions; called by UI's to send requests to manager or comp 383 384 ## view management functions 385 # FIXME: what is this crap ? strings as enums ?
386 - def isConnected(self):
387 return self.state == 'connected'
388
389 - def shutdown(self):
390 self.debug('shutting down') 391 if self.state != 'disconnected': 392 self.clientFactory.disconnect() 393 self.clientFactory.stopTrying()
394 395 ## generic remote call methods
396 - def componentCallRemote(self, componentState, methodName, *args, **kwargs):
397 """ 398 Call the given method on the given component with the given args. 399 400 @param componentState: component to call the method on 401 @type componentState: L{flumotion.common.planet.AdminComponentState} 402 @param methodName: name of method to call; serialized to a 403 remote_methodName on the worker's medium 404 405 @rtype: L{twisted.internet.defer.Deferred} 406 """ 407 assert isinstance(componentState, planet.AdminComponentState), \ 408 "componentState %r is of the wrong type calling %s" % ( 409 componentState, methodName) 410 componentName = componentState.get('name') 411 412 self.debug('Calling remote method %s on component %s' % ( 413 methodName, componentName)) 414 d = self.callRemote('componentCallRemote', 415 componentState, methodName, 416 *args, **kwargs) 417 d.addCallback(self._callRemoteCallback, methodName, componentName) 418 def errback(failure): 419 msg = None 420 if failure.check(errors.NoMethodError): 421 msg = "Remote method '%s' does not exist." % methodName 422 msg += "\n" + failure.value 423 else: 424 msg = log.getFailureMessage(failure) 425 426 # FIXME: we probably need a nicer way of getting component 427 # messages shown from the admin model, but this allows us to 428 # make sure every type of admin has these messages 429 self.warning(msg) 430 m = messages.Warning(T_(N_("Internal error in component.")), 431 debug=msg) 432 componentState.observe_append('messages', m) 433 return failure
434 435 d.addErrback(errback) 436 # FIXME: dialog for other errors ? 437 return d 438
439 - def _callRemoteCallback(self, result, methodName, componentName):
440 self.debug('Called remote method %s on component %s successfully' % ( 441 methodName, componentName)) 442 return result
443
444 - def workerCallRemote(self, workerName, methodName, *args, **kwargs):
445 """ 446 Call the the given method on the given worker with the given args. 447 448 @param workerName: name of the worker to call the method on 449 @param methodName: name of method to call; serialized to a 450 remote_methodName on the worker's medium 451 452 @rtype: L{twisted.internet.defer.Deferred} 453 """ 454 r = common.argRepr(args, kwargs, max=20) 455 self.debug('calling remote method %s(%s) on worker %s' % (methodName, r, 456 workerName)) 457 d = self.callRemote('workerCallRemote', workerName, 458 methodName, *args, **kwargs) 459 d.addErrback(self._callRemoteErrback, "worker", 460 workerName, methodName) 461 return d
462
463 - def _callRemoteErrback(self, failure, type, name, methodName):
464 print "THOMAS: errback: failure %r" % failure 465 if failure.check(errors.NoMethodError): 466 self.warning("method '%s' on component '%s' does not exist, " 467 "component bug" % (methodName, name)) 468 else: 469 self.debug("passing through failure on remote call to %s(%s): %r" % 470 (name, methodName, failure)) 471 472 # FIXME: throw up some sort of dialog with debug info 473 return failure
474 475 ## component remote methods
476 - def setProperty(self, componentState, element, property, value):
477 """ 478 @type componentState: L{flumotion.common.planet.AdminComponentState} 479 """ 480 return self.componentCallRemote(componentState, 'setElementProperty', 481 element, property, value)
482
483 - def getProperty(self, componentState, element, property):
484 """ 485 @type componentState: L{flumotion.common.planet.AdminComponentState} 486 """ 487 return self.componentCallRemote(componentState, 'getElementProperty', 488 element, property)
489 490 ## reload methods for everything
491 - def reloadAdmin(self):
492 name = reflect.filenameToModuleName(__file__) 493 494 self.info('Reloading admin code') 495 self.debug("rebuilding '%s'" % name) 496 rebuild.rebuild(sys.modules[name]) 497 self.debug("reloading '%s'" % name) 498 reload.reload() 499 self.info('Reloaded admin code')
500
501 - def reload(self):
502 # XXX: reload admin.py too 503 d = defer.execute(self.reloadAdmin) 504 505 d = d.addCallback(lambda result, self: self.reloadManager(), self) 506 d.addErrback(self._defaultErrback) 507 # stack callbacks so that a new one only gets sent after the previous 508 # one has completed 509 for name in self._components.keys(): 510 d = d.addCallback(lambda result, name: self.reloadComponent(name), name) 511 d.addErrback(self._defaultErrback) 512 return d
513 514 # used by other admin clients 515 # FIXME: isn't it great how hard it is to guess what duckport is ?
516 - def reload_async(self, duckport):
517 name = reflect.filenameToModuleName(__file__) 518 519 self.info("rebuilding '%s'" % name) 520 rebuild.rebuild(sys.modules[name]) 521 522 d = self.reloadManager() 523 yield d 524 try: 525 d.value() 526 duckport.write('Reloaded manager') 527 except Exception, e: 528 duckport.write('Failed to reload manager: %s' % e) 529 530 for name in self._components.keys(): 531 d = self.reloadComponent(name) 532 yield d 533 try: 534 d.value() 535 duckport.write('Reloaded component %s' % name) 536 except Exception, e: 537 duckport.write('Failed to reload component %s: %s' % (name, e)) 538 duckport.close()
539 reload_async = defer_generator_method(reload_async) 540
541 - def reloadManager(self):
542 """ 543 Tell the manager to reload its code. 544 545 @rtype: deferred 546 """ 547 def _reloaded(result, self): 548 self.info("reloaded manager code")
549 550 self.emit('reloading', 'manager') 551 self.info("reloading manager code") 552 d = self.callRemote('reloadManager') 553 d.addCallback(_reloaded, self) 554 d.addErrback(self._defaultErrback) 555 return d 556
557 - def reloadComponent(self, componentState):
558 """ 559 Tell the manager to reload code for a component. 560 561 @type componentState: L{flumotion.common.planet.AdminComponentState} 562 563 @rtype: L{twisted.internet.defer.Deferred} 564 """ 565 def _reloaded(result, self, state): 566 self.info("reloaded component %s code" % state.get('name'))
567 568 name = componentState.get('name') 569 self.info("reloading component %s code" % name) 570 self.emit('reloading', name) 571 d = self.callRemote('reloadComponent', componentState) 572 d.addCallback(_reloaded, self, componentState) 573 d.addErrback(self._defaultErrback) 574 return d 575 576 ## manager remote methods
577 - def loadConfiguration(self, xml_string):
578 return self.callRemote('loadConfiguration', xml_string)
579
580 - def getConfiguration(self):
581 return self.callRemote('getConfiguration')
582
583 - def cleanComponents(self):
584 return self.callRemote('cleanComponents')
585 586 # function to get remote code for admin parts 587 # FIXME: rename slightly ? 588 # FIXME: still have hard-coded os.path.join stuff in here for md5sum, 589 # move to bundleloader ?
590 - def getEntry(self, componentState, type):
591 """ 592 Do everything needed to set up the entry point for the given 593 component and type, including transferring and setting up bundles. 594 595 Caller is responsible for adding errbacks to the deferred. 596 597 Returns: a deferred returning (entryPath, filename, methodName) with 598 entryPath: the full local path to the bundle's base 599 fileName: the relative location of the bundled file 600 methodName: the method to instantiate with 601 """ 602 d = self.callRemote('getEntryByType', componentState, type) 603 yield d 604 605 fileName, methodName = d.value() 606 607 self.debug("entry for %r of type %s is in file %s and method %s" % ( 608 componentState, type, fileName, methodName)) 609 d = self.bundleLoader.getBundles(fileName=fileName) 610 yield d 611 612 name, bundlePath = d.value()[-1] 613 yield (bundlePath, fileName, methodName)
614 getEntry = defer_generator_method(getEntry) 615 616 ## worker remote methods
617 - def checkElements(self, workerName, elements):
618 d = self.workerCallRemote(workerName, 'checkElements', elements) 619 d.addErrback(self._defaultErrback) 620 return d
621
622 - def checkImport(self, workerName, moduleName):
623 d = self.workerCallRemote(workerName, 'checkImport', moduleName) 624 d.addErrback(self._defaultErrback) 625 return d
626
627 - def workerRun(self, workerName, moduleName, functionName, *args, **kwargs):
628 """ 629 Run the given function and args on the given worker. If the 630 worker does not already have the module, or it is out of date, 631 it will be retrieved from the manager. 632 633 @rtype: L{twisted.internet.defer.Deferred} firing an 634 L{flumotion.common.messages.Result} 635 """ 636 return self.workerCallRemote(workerName, 'runFunction', moduleName, 637 functionName, *args, **kwargs)
638 639 # FIXME: this should not be allowed to be called, move away 640 # by abstracting callers further
641 - def get_components(self):
642 # returns a dict of name -> component 643 return self._components
644 getComponents = get_components 645
646 - def _setWorkerHeavenState(self, state):
647 self._workerHeavenState = state
648
649 - def getWorkerHeavenState(self):
650 return self._workerHeavenState

# Hand the finished class to flumotion's pygobject helper.
# NOTE(review): presumably this registers AdminModel as a GObject type so
# that the gsignal() declarations in its body take effect -- confirm in
# flumotion.common.pygobject.
pygobject.type_register(AdminModel)