Package flumotion :: Package admin :: Module admin
[hide private]

Source Code for Module flumotion.admin.admin

  1  # -*- Mode: Python -*- 
  2  # vi:si:et:sw=4:sts=4:ts=4 
  3  # 
  4  # Flumotion - a streaming media server 
  5  # Copyright (C) 2004,2005,2006,2007 Fluendo, S.L. (www.fluendo.com). 
  6  # All rights reserved. 
  7   
  8  # This file may be distributed and/or modified under the terms of 
  9  # the GNU General Public License version 2 as published by 
 10  # the Free Software Foundation. 
 11  # This file is distributed without any warranty; without even the implied 
 12  # warranty of merchantability or fitness for a particular purpose. 
 13  # See "LICENSE.GPL" in the source distribution for more information. 
 14   
 15  # Licensees having purchased or holding a valid Flumotion Advanced 
 16  # Streaming Server license may use this file in accordance with the 
 17  # Flumotion Advanced Streaming Server Commercial License Agreement. 
 18  # See "LICENSE.Flumotion" in the source distribution for more information. 
 19   
 20  # Headers in this file shall remain intact. 
 21   
 22  """ 
 23  model abstraction for administration clients supporting different views 
 24  """ 
 25   
 26  from twisted.internet import error, defer, reactor 
 27  from zope.interface import implements 
 28   
 29  from flumotion.common import common, errors, interfaces, log 
 30  from flumotion.common import medium 
 31  from flumotion.common import messages, signals 
 32  from flumotion.common import planet, worker # register jelly 
 33  from flumotion.common.i18n import N_, gettexter 
 34  from flumotion.configure import configure 
 35  from flumotion.twisted import pb as fpb 
 36   
 37  __version__ = "$Rev: 6979 $" 
 38  T_ = gettexter() 
 39   
 40   
41 -class AdminClientFactory(fpb.ReconnectingFPBClientFactory):
42 perspectiveInterface = interfaces.IAdminMedium 43
44 - def __init__(self, medium, extraTenacious=False, maxDelay=20):
45 """ 46 @type medium: AdminModel 47 """ 48 fpb.ReconnectingFPBClientFactory.__init__(self) 49 self.medium = medium 50 self.maxDelay = maxDelay 51 52 self.extraTenacious = extraTenacious 53 self.hasBeenConnected = 0 54 55 self._connector = None
56
57 - def startedConnecting(self, connector):
58 self._connector = connector 59 return fpb.ReconnectingFPBClientFactory.startedConnecting( 60 self, connector)
61
62 - def clientConnectionMade(self, broker):
63 self.hasBeenConnected = 1 64 65 fpb.ReconnectingFPBClientFactory.clientConnectionMade(self, broker)
66
67 - def clientConnectionFailed(self, connector, reason):
68 """ 69 @type connector: implementation of 70 L{twisted.internet.interfaces.IConnector} 71 @param reason: L{twisted.spread.pb.failure.Failure} 72 """ 73 if reason.check(error.DNSLookupError): 74 self.debug('DNS lookup error') 75 if not self.extraTenacious: 76 self.medium.connectionFailed(reason) 77 return 78 elif (reason.check(error.ConnectionRefusedError) 79 or reason.check(error.ConnectError)): 80 # If we're logging in for the first time, we want to make this a 81 # real error; we present a dialog, etc. 82 # However, if we fail later on (e.g. manager shut down, and 83 # hasn't yet been restarted), we want to keep trying to reconnect, 84 # so we just log a message. 85 self.debug("Error connecting to %s: %s", connector.getDestination(), 86 log.getFailureMessage(reason)) 87 if self.hasBeenConnected: 88 self.log("we've been connected before though, so going " 89 "to retry") 90 # fall through 91 elif self.extraTenacious: 92 self.log("trying again due to +100 tenacity") 93 # fall through 94 else: 95 self.log("telling medium about connection failure") 96 self.medium.connectionFailed(reason) 97 # return 98 return 99 100 fpb.ReconnectingFPBClientFactory.clientConnectionFailed(self, 101 connector, reason)
102 103 # vmethod implementation
104 - def gotDeferredLogin(self, d):
105 def success(remote): 106 self.medium.setRemoteReference(remote)
107 108 def error(failure): 109 if self.extraTenacious: 110 self.debug('connection problem to %s: %s', 111 self._connector.getDestination(), 112 log.getFailureMessage(failure)) 113 self.debug('we are tenacious, so trying again later') 114 self.disconnect() 115 elif failure.check(errors.ConnectionFailedError): 116 self.debug("emitting connection-failed") 117 self.medium.emit('connection-failed', "I failed my master") 118 self.debug("emitted connection-failed") 119 elif failure.check(errors.ConnectionRefusedError): 120 self.debug("emitting connection-refused") 121 self.medium.emit('connection-refused') 122 self.debug("emitted connection-refused") 123 elif failure.check(errors.NotAuthenticatedError): 124 # FIXME: unauthorized login emit ! 125 self.debug("emitting connection-refused") 126 self.medium.emit('connection-refused') 127 self.debug("emitted connection-refused") 128 else: 129 self.medium.emit('connection-error', failure) 130 self.warning('connection error to %s:: %s', 131 self._connector.getDestination(), 132 log.getFailureMessage(failure))
133 # swallow error 134 135 d.addCallbacks(success, error) 136 return d 137 138 # FIXME: stop using signals, we can provide a richer interface with actual 139 # objects and real interfaces for the views a model communicates with
140 -class AdminModel(medium.PingingMedium, signals.SignalMixin):
141 """ 142 I live in the admin client. 143 I am a data model for any admin view implementing a UI to 144 communicate with one manager. 145 I send signals when things happen. 146 147 Manager calls on us through L{flumotion.manager.admin.AdminAvatar} 148 """ 149 __signals__ = ('connected', 'disconnected', 'connection-refused', 150 'connection-failed', 'connection-error', 'reloading', 151 'message', 'update') 152 153 logCategory = 'adminmodel' 154 155 implements(interfaces.IAdminMedium) 156 157 # Public instance variables (read-only) 158 planet = None 159
160 - def __init__(self):
161 # All of these instance variables are private. Cuidado cabrones! 162 self.connectionInfo = None 163 self.keepTrying = None 164 self._writeConnection = True 165 166 self.managerId = '<uninitialized>' 167 168 self.connected = False 169 self.clientFactory = None 170 171 self._deferredConnect = None 172 173 self._components = {} # dict of components 174 self.planet = None 175 self._workerHeavenState = None
176
177 - def connectToManager(self, connectionInfo, keepTrying=False, 178 writeConnection=True):
179 'Connect to a host.' 180 assert self.clientFactory is None 181 182 self.connectionInfo = connectionInfo 183 self._writeConnection = writeConnection 184 185 # give the admin an id unique to the manager -- if a program is 186 # adminning multiple managers, this id should tell them apart 187 # (and identify duplicates) 188 self.managerId = str(connectionInfo) 189 self.logName = self.managerId 190 191 self.info('Connecting to manager %s with %s', 192 self.managerId, connectionInfo.use_ssl and 'SSL' or 'TCP') 193 194 self.clientFactory = AdminClientFactory(self, 195 extraTenacious=keepTrying, 196 maxDelay=20) 197 self.clientFactory.startLogin(connectionInfo.authenticator) 198 199 if connectionInfo.use_ssl: 200 common.assertSSLAvailable() 201 from twisted.internet import ssl 202 reactor.connectSSL(connectionInfo.host, connectionInfo.port, 203 self.clientFactory, ssl.ClientContextFactory()) 204 else: 205 reactor.connectTCP(connectionInfo.host, connectionInfo.port, 206 self.clientFactory) 207 208 def connected(model, d): 209 # model is really "self". yay gobject? 210 d.callback(model)
211 212 def disconnected(model, d): 213 # can happen after setRemoteReference but before 214 # getPlanetState or getWorkerHeavenState returns 215 if not keepTrying: 216 d.errback(errors.ConnectionFailedError('Lost connection'))
217 218 def connection_refused(model, d): 219 if not keepTrying: 220 d.errback(errors.ConnectionRefusedError()) 221 222 def connection_failed(model, reason, d): 223 if not keepTrying: 224 d.errback(errors.ConnectionFailedError(reason)) 225 226 def connection_error(model, failure, d): 227 if not keepTrying: 228 d.errback(failure) 229 230 d = defer.Deferred() 231 ids = [] 232 ids.append(self.connect('connected', connected, d)) 233 ids.append(self.connect('disconnected', disconnected, d)) 234 ids.append(self.connect('connection-refused', connection_refused, d)) 235 ids.append(self.connect('connection-failed', connection_failed, d)) 236 ids.append(self.connect('connection-error', connection_error, d)) 237 238 def success(model): 239 map(self.disconnect, ids) 240 self._deferredConnect = None 241 return model 242 243 def failure(f): 244 map(self.disconnect, ids) 245 self._deferredConnect = None 246 return f 247 248 d.addCallbacks(success, failure) 249 self._deferredConnect = d 250 return d 251
252 - def shutdown(self):
253 self.debug('shutting down') 254 if self.clientFactory is not None: 255 # order not semantically important, but this way we avoid a 256 # "reconnecting in X seconds" in the log 257 self.clientFactory.stopTrying() 258 self.clientFactory.disconnect() 259 self.clientFactory = None 260 261 if self._deferredConnect is not None: 262 # this can happen with keepTrying=True 263 self.debug('cancelling connection attempt') 264 self._deferredConnect.errback(errors.ConnectionCancelledError())
265
266 - def reconnect(self, keepTrying=False):
267 """Close any existing connection to the manager and 268 reconnect.""" 269 self.debug('asked to log in again') 270 self.shutdown() 271 return self.connectToManager(self.connectionInfo, keepTrying)
272 273 # FIXME: give these three sensible names
274 - def adminInfoStr(self):
275 return self.managerId
276
277 - def connectionInfoStr(self):
278 return '%s:%s (%s)' % (self.connectionInfo.host, 279 self.connectionInfo.port, 280 self.connectionInfo.use_ssl 281 and 'https' or 'http')
282 283 # used in fgc
284 - def managerInfoStr(self):
285 assert self.planet 286 return '%s (%s)' % (self.planet.get('name'), self.managerId)
287
288 - def connectionFailed(self, failure):
289 # called by client factory 290 if failure.check(error.DNSLookupError): 291 message = ("Could not look up host '%s'." 292 % self.connectionInfo.host) 293 elif failure.check(error.ConnectionRefusedError): 294 message = ("Could not connect to host '%s' on port %d." 295 % (self.connectionInfo.host, 296 self.connectionInfo.port)) 297 else: 298 message = ("Unexpected failure.\nDebug information: %s" 299 % log.getFailureMessage (failure)) 300 self.debug('emitting connection-failed') 301 self.emit('connection-failed', message) 302 self.debug('emitted connection-failed')
303
304 - def setRemoteReference(self, remoteReference):
305 self.debug("setRemoteReference %r", remoteReference) 306 def gotPlanetState(planet): 307 self.planet = planet 308 # monkey, Monkey, MONKEYPATCH!!!!! 309 self.planet.admin = self 310 self.debug('got planet state') 311 return self.callRemote('getWorkerHeavenState')
312 313 def gotWorkerHeavenState(whs): 314 self._workerHeavenState = whs 315 self.debug('got worker state') 316 317 self.debug('Connected to manager and retrieved all state') 318 self.connected = True 319 if self._writeConnection: 320 writeConnection() 321 self.emit('connected') 322 323 def writeConnection(): 324 i = self.connectionInfo 325 if not (i.authenticator.username 326 and i.authenticator.password): 327 self.log('not caching connection information') 328 return 329 s = ''.join(['<connection>', 330 '<host>%s</host>' % i.host, 331 '<manager>%s</manager>' % self.planet.get('name'), 332 '<port>%d</port>' % i.port, 333 '<use_insecure>%d</use_insecure>' 334 % ((not i.use_ssl) and 1 or 0), 335 '<user>%s</user>' % i.authenticator.username, 336 '<passwd>%s</passwd>' % i.authenticator.password, 337 '</connection>']) 338 339 import os 340 import md5 341 md5sum = md5.new(s).hexdigest() 342 f = os.path.join(configure.registrydir, '%s.connection' % md5sum) 343 try: 344 h = open(f, 'w') 345 h.write(s) 346 h.close() 347 except Exception, e: 348 self.info('failed to write connection cache file %s: %s', 349 f, log.getExceptionMessage(e)) 350 351 # chain up 352 medium.PingingMedium.setRemoteReference(self, remoteReference) 353 354 # fixme: push the disconnect notification upstream 355 def remoteDisconnected(remoteReference): 356 self.debug("emitting disconnected") 357 self.connected = False 358 self.emit('disconnected') 359 self.debug("emitted disconnected") 360 self.remote.notifyOnDisconnect(remoteDisconnected) 361 362 d = self.callRemote('getPlanetState') 363 d.addCallback(gotPlanetState) 364 d.addCallback(gotWorkerHeavenState) 365 return d 366 367 ### model functions; called by UI's to send requests to manager or comp 368 369 ## view management functions
370 - def isConnected(self):
371 return self.connected
372 373 ## generic remote call methods
374 - def componentCallRemote(self, componentState, methodName, *args, **kwargs):
375 """ 376 Call the given method on the given component with the given args. 377 378 @param componentState: component to call the method on 379 @type componentState: L{flumotion.common.planet.AdminComponentState} 380 @param methodName: name of method to call; serialized to a 381 remote_methodName on the worker's medium 382 383 @rtype: L{twisted.internet.defer.Deferred} 384 """ 385 d = self.callRemote('componentCallRemote', 386 componentState, methodName, 387 *args, **kwargs) 388 def errback(failure): 389 msg = None 390 if failure.check(errors.NoMethodError): 391 msg = "Remote method '%s' does not exist." % methodName 392 msg += "\n" + failure.value 393 else: 394 msg = log.getFailureMessage(failure) 395 396 # FIXME: we probably need a nicer way of getting component 397 # messages shown from the admin model, but this allows us to 398 # make sure every type of admin has these messages 399 self.warning(msg) 400 m = messages.Warning(T_(N_("Internal error in component.")), 401 debug=msg) 402 componentState.observe_append('messages', m) 403 return failure
404 405 d.addErrback(errback) 406 # FIXME: dialog for other errors ? 407 return d 408
409 - def workerCallRemote(self, workerName, methodName, *args, **kwargs):
410 """ 411 Call the the given method on the given worker with the given args. 412 413 @param workerName: name of the worker to call the method on 414 @param methodName: name of method to call; serialized to a 415 remote_methodName on the worker's medium 416 417 @rtype: L{twisted.internet.defer.Deferred} 418 """ 419 return self.callRemote('workerCallRemote', workerName, 420 methodName, *args, **kwargs)
421 422 ## manager remote methods
423 - def loadConfiguration(self, xml_string):
424 return self.callRemote('loadConfiguration', xml_string)
425
426 - def getConfiguration(self):
427 return self.callRemote('getConfiguration')
428
429 - def cleanComponents(self):
430 return self.callRemote('cleanComponents')
431 432 ## worker remote methods
433 - def checkElements(self, workerName, elements):
434 return self.workerCallRemote(workerName, 'checkElements', elements)
435
436 - def checkImport(self, workerName, moduleName):
437 return self.workerCallRemote(workerName, 'checkImport', moduleName)
438
439 - def workerRun(self, workerName, moduleName, functionName, *args, **kwargs):
440 """ 441 Run the given function and args on the given worker. If the 442 worker does not already have the module, or it is out of date, 443 it will be retrieved from the manager. 444 445 @rtype: L{twisted.internet.defer.Deferred} firing an 446 L{flumotion.common.messages.Result} 447 """ 448 return self.workerCallRemote(workerName, 'runFunction', moduleName, 449 functionName, *args, **kwargs)
450
451 - def getWizardEntries(self, wizardTypes=None, provides=None, accepts=None):
452 return self.callRemote('getWizardEntries', 453 wizardTypes, provides, accepts)
454
455 - def getWorkerHeavenState(self):
456 return self._workerHeavenState
457