Package flumotion :: Package manager :: Module admin
[hide private]

Source Code for Module flumotion.manager.admin

  1  # -*- Mode: Python; test-case-name: flumotion.test.test_manager_admin -*- 
  2  # vi:si:et:sw=4:sts=4:ts=4 
  3  # 
  4  # Flumotion - a streaming media server 
  5  # Copyright (C) 2004,2005,2006,2007 Fluendo, S.L. (www.fluendo.com). 
  6  # All rights reserved. 
  7   
  8  # This file may be distributed and/or modified under the terms of 
  9  # the GNU General Public License version 2 as published by 
 10  # the Free Software Foundation. 
 11  # This file is distributed without any warranty; without even the implied 
 12  # warranty of merchantability or fitness for a particular purpose. 
 13  # See "LICENSE.GPL" in the source distribution for more information. 
 14   
 15  # Licensees having purchased or holding a valid Flumotion Advanced 
 16  # Streaming Server license may use this file in accordance with the 
 17  # Flumotion Advanced Streaming Server Commercial License Agreement. 
 18  # See "LICENSE.Flumotion" in the source distribution for more information. 
 19   
 20  # Headers in this file shall remain intact. 
 21   
 22  """ 
 23  manager-side objects to handle administrative clients 
 24  """ 
 25   
 26  import re 
 27  import os 
 28  from StringIO import StringIO 
 29   
 30  from twisted.internet import reactor, defer 
 31  from twisted.spread import pb 
 32  from twisted.python import failure 
 33   
 34  from flumotion.manager import base 
 35  from flumotion.common import errors, interfaces, log, planet, registry 
 36   
 37  # make Result and Message proxyable 
 38  from flumotion.common import messages 
 39   
 40  # make ComponentState proxyable 
 41  from flumotion.twisted import flavors 
 42  from flumotion.twisted.compat import implements 
 43  from flumotion.common import componentui 
 44   
 45  # FIXME: rename to Avatar since we are in the admin. namespace ? 
46 -class AdminAvatar(base.ManagerAvatar):
47 """ 48 I am an avatar created for an administrative client interface. 49 A reference to me is given (for example, to gui.AdminInterface) 50 when logging in and requesting an "admin" avatar. 51 I live in the manager. 52 """ 53 logCategory = 'admin-avatar' 54 55 # override base methods
56 - def attached(self, mind):
57 self.info('admin client "%s" logged in' % self.avatarId) 58 base.ManagerAvatar.attached(self, mind)
59
60 - def detached(self, mind):
61 self.info('admin client "%s" logged out' % self.avatarId) 62 base.ManagerAvatar.detached(self, mind)
63 64 # FIXME: instead of doing this, give a RemoteCache of the heaven state ?
65 - def getComponentStates(self):
66 """ 67 Return all component states logged in to the manager. 68 The list gets serialized to a list of 69 L{flumotion.common.planet.AdminComponentState} 70 71 @rtype: list of L{planet.ManagerComponentState} 72 """ 73 return self.vishnu.getComponentStates()
74
75 - def sendLog(self, category, type, message):
76 """ 77 Send the given log message to the peer. 78 """ 79 # don't send if we don't have a remote reference yet. 80 # this avoids recursion from the remote caller trying to warn 81 if self.hasRemoteReference(): 82 self.mindCallRemote('log', category, type, message)
83 84 # override pb.Avatar implementation so we can run admin actions
85 - def perspectiveMessageReceived(self, broker, message, args, kwargs):
86 args = broker.unserialize(args, self) 87 kwargs = broker.unserialize(kwargs, self) 88 method = getattr(self, "perspective_%s" % message) 89 90 level = log.DEBUG 91 if message == 'ping': level = log.LOG 92 debugClass = self.logCategory.upper() 93 startArgs = [self.remoteLogName, debugClass, message] 94 format, debugArgs = log.getFormatArgs( 95 '%s --> %s: perspective_%s(', startArgs, 96 ')', (), args, kwargs) 97 98 # log going into the method 99 logKwArgs = self.doLog(level, method, format, *debugArgs) 100 101 benignMethods = ('ping',) 102 if message not in benignMethods: 103 self.vishnu.adminAction(self.remoteIdentity, message, args, kwargs) 104 105 try: 106 state = method(*args, **kwargs) 107 except TypeError: 108 self.debug("%s didn't accept %s and %s" % (method, args, kwargs)) 109 raise 110 except pb.Error, e: 111 format, debugArgs = log.getFormatArgs( 112 '%s <-- %s: perspective_%s(', startArgs, 113 '): pb.Error %r', (e, ), args, kwargs) 114 self.doLog(level, -1, format, *debugArgs, **logKwArgs) 115 raise e 116 117 # log coming out of the method 118 if isinstance(state, defer.Deferred): 119 # for a deferred, we currently can't log a better location than 120 # the def line for the function/instance that we called above 121 def callback(result): 122 format, debugArgs = log.getFormatArgs( 123 '%s <-- %s: perspective_%s(', startArgs, 124 '): %r', (result, ), args, kwargs) 125 self.doLog(level, -1, format, *debugArgs, **logKwArgs) 126 return result
127 def errback(failure): 128 format, debugArgs = log.getFormatArgs( 129 '%s <-- %s: perspective_%s(', startArgs, 130 '): failure %r', (failure, ), args, kwargs) 131 self.doLog(level, -1, format, *debugArgs, **logKwArgs) 132 return failure
133 134 state.addCallback(callback) 135 state.addErrback(errback) 136 else: 137 format, debugArgs = log.getFormatArgs( 138 '%s <-- %s: perspective_%s(', startArgs, 139 '): %r', (state, ), args, kwargs) 140 self.doLog(level, -1, format, *debugArgs, **logKwArgs) 141 142 return broker.serialize(state, self, method, args, kwargs) 143 144 ### pb.Avatar IPerspective methods
145 - def perspective_getPlanetState(self):
146 """ 147 Get the planet state. 148 149 @rtype: L{flumotion.common.planet.ManagerPlanetState} 150 """ 151 self.debug("returning planet state %r" % self.vishnu.state) 152 return self.vishnu.state
153
154 - def perspective_getWorkerHeavenState(self):
155 """ 156 Get the worker heaven state. 157 158 @rtype: L{flumotion.common.worker.ManagerWorkerHeavenState} 159 """ 160 self.debug("returning worker heaven state %r" % self.vishnu.state) 161 return self.vishnu.workerHeaven.state
162
163 - def perspective_shutdown(self):
164 """ 165 Shut down the manager. 166 """ 167 self.warning("Shutdown requested, shutting down...") 168 reactor.stop()
169
170 - def perspective_componentStart(self, componentState):
171 """ 172 Start the given component. The component should be sleeping before 173 this. 174 175 @type componentState: L{planet.ManagerComponentState} 176 """ 177 self.debug('perspective_componentStart(%r)' % componentState) 178 return self.vishnu.componentCreate(componentState)
179
180 - def perspective_componentStop(self, componentState):
181 """ 182 Stop the given component. 183 If the component was sad, we clear its sad state as well, 184 since the stop was explicitly requested by the admin. 185 186 @type componentState: L{planet.ManagerComponentState} 187 """ 188 self.debug('perspective_componentStop(%r)' % componentState) 189 return self.vishnu.componentStop(componentState)
190
191 - def perspective_componentRestart(self, componentState):
192 """ 193 Restart the given component. 194 195 @type componentState: L{planet.ManagerComponentState} 196 """ 197 self.debug('perspective_componentRestart(%r)' % componentState) 198 d = self.perspective_componentStop(componentState) 199 d.addCallback(lambda *x: self.perspective_componentStart(componentState)) 200 return d
201 202 # Generic interface to call into a component
203 - def perspective_componentCallRemote(self, componentState, methodName, 204 *args, **kwargs):
205 """ 206 Call a method on the given component on behalf of an admin client. 207 208 @param componentState: state of the component to call the method on 209 @type componentState: L{planet.ManagerComponentState} 210 @param methodName: name of the method to call. Gets proxied to 211 L{flumotion.component.component.""" \ 212 """BaseComponentMedium}'s remote_(methodName) 213 @type methodName: str 214 215 @rtype: L{twisted.internet.defer.Deferred} 216 """ 217 assert isinstance(componentState, planet.ManagerComponentState) 218 219 if methodName == "start": 220 self.warning('forwarding "start" to perspective_componentStart') 221 return self.perspective_componentStart(componentState) 222 223 m = self.vishnu.getComponentMapper(componentState) 224 avatar = m.avatar 225 226 if not avatar: 227 self.warning('No avatar for %s, cannot call remote' % 228 componentState.get('name')) 229 raise errors.SleepingComponentError() 230 231 # XXX: Maybe we need to have a prefix, so we can limit what an 232 # admin interface can call on a component 233 try: 234 return avatar.mindCallRemote(methodName, *args, **kwargs) 235 except Exception, e: 236 msg = "exception on remote call %s: %s" % (methodName, 237 log.getExceptionMessage(e)) 238 self.warning(msg) 239 raise errors.RemoteMethodError(methodName, 240 log.getExceptionMessage(e))
241
242 - def perspective_workerCallRemote(self, workerName, methodName, 243 *args, **kwargs):
244 """ 245 Call a remote method on the worker. 246 This is used so that admin clients can call methods from the interface 247 to the worker. 248 249 @param workerName: the worker to call 250 @type workerName: str 251 @param methodName: Name of the method to call. Gets proxied to 252 L{flumotion.worker.worker.WorkerMedium} 's 253 remote_(methodName) 254 @type methodName: str 255 """ 256 257 self.debug('AdminAvatar.workerCallRemote(%r, %r)' % ( 258 workerName, methodName)) 259 workerAvatar = self.vishnu.workerHeaven.getAvatar(workerName) 260 261 # XXX: Maybe we need to a prefix, so we can limit what an admin 262 # interface can call on a worker 263 try: 264 return workerAvatar.mindCallRemote(methodName, *args, **kwargs) 265 except Exception, e: 266 self.warning("exception on remote call: %s" % 267 log.getExceptionMessage(e)) 268 return failure.Failure(errors.RemoteMethodError(methodName, 269 log.getExceptionMessage(e)))
270
271 - def perspective_getEntryByType(self, componentState, type):
272 """ 273 Get the entry point for a piece of bundled code by the type. 274 275 Returns: a (filename, methodName) tuple, or raises a Failure 276 """ 277 m = self.vishnu.getComponentMapper(componentState) 278 componentName = componentState.get('name') 279 280 if not m.avatar: 281 self.debug('component %s not logged in yet, no entry' % 282 componentName) 283 raise errors.SleepingComponentError(componentName) 284 285 componentType = m.avatar.getType() 286 self.debug('getting entry of type %s for component %s of type %s' % ( 287 type, componentName, componentType)) 288 try: 289 componentRegistryEntry = registry.getRegistry().getComponent( 290 componentType) 291 # FIXME: add logic here for default entry points and functions 292 entry = componentRegistryEntry.getEntryByType(type) 293 except KeyError: 294 self.warning("Could not find bundle for %s(%s)" % ( 295 componentType, type)) 296 raise errors.NoBundleError("entry type %s in component type %s" % 297 (type, componentType)) 298 299 filename = os.path.join(componentRegistryEntry.base, entry.location) 300 self.debug('entry point is in file path %s and function %s' % ( 301 filename, entry.function)) 302 return (filename, entry.function)
303
304 - def perspective_reloadComponent(self, componentState):
305 """ 306 Reload modules in the given component. 307 308 @param componentState: state of the component to reload 309 @type componentState: L{planet.ManagerComponentState} 310 """ 311 def _reloaded(result, self, name): 312 self.info("reloaded component %s code" % name)
313 314 name = componentState.get('name') 315 self.info("reloading component %s code" % name) 316 m = self.vishnu.getComponentMapper(componentState) 317 avatar = m.avatar 318 d = avatar.reloadComponent() 319 d.addCallback(_reloaded, self, name) 320 return d 321
322 - def perspective_reloadManager(self):
323 """ 324 Reload modules in the manager. 325 """ 326 import sys 327 from twisted.python.rebuild import rebuild 328 self.info('reloading manager code') 329 # reload ourselves first 330 rebuild(sys.modules[__name__]) 331 332 # now rebuild relevant modules 333 import flumotion.common.reload 334 rebuild(sys.modules['flumotion.common']) 335 flumotion.common.reload.reload() 336 self._reloaded()
337
338 - def perspective_getConfiguration(self):
339 """ 340 Get the configuration of the manager as an XML string. 341 342 @rtype: str 343 """ 344 return self.vishnu.getConfiguration()
345
346 - def _saveFlowFile(self, filename):
347 """Opens a file that the flow should be written to. 348 349 Note that the returned file object might be an existing file, 350 opened in append mode; if the loadConfiguration operation 351 succeeds, the file should first be truncated before writing. 352 """ 353 self.vishnu.adminAction(self.remoteIdentity, 354 '_saveFlowFile', (), {}) 355 def ensure_sane(name, extra=''): 356 if not re.match('^[a-zA-Z0-9_' + extra + '-]+$', name): 357 raise errors.ConfigError, \ 358 'Invalid planet or saveAs name: %s' % name
359 360 ensure_sane(self.vishnu.configDir, '/') 361 ensure_sane(filename) 362 dir = os.path.join(self.vishnu.configDir, "flows") 363 self.debug('told to save flow as %s/%s.xml', dir, filename) 364 try: 365 os.makedirs(dir, 0770) 366 except OSError, e: 367 if e.errno != 17: # 17 == EEXIST 368 raise e 369 prev = os.umask(0007) 370 output = open(os.path.join(dir, filename + '.xml'), 'a') 371 os.umask(prev) 372 return output 373
374 - def perspective_loadConfiguration(self, xml, saveAs=None):
375 """ 376 Load the given XML configuration into the manager. If the 377 optional saveAs parameter is passed, the XML snippet will be 378 saved to disk in the manager's flows directory. 379 380 @param xml: the XML configuration snippet. 381 @type xml: str 382 @param saveAs: The name of a file to save the XML as. 383 @type saveAs: str 384 """ 385 386 if saveAs: 387 output = self._saveFlowFile(saveAs) 388 389 f = StringIO(xml) 390 res = self.vishnu.loadConfigurationXML(f, self.remoteIdentity) 391 f.close() 392 393 if saveAs: 394 def success(res): 395 self.debug('loadConfiguration succeeded, writing flow to %r', 396 output) 397 output.truncate(0) 398 output.write(xml) 399 output.close() 400 return res
401 def failure(res): 402 self.debug('loadConfiguration failed, leaving %r as it was', 403 output) 404 output.close() 405 return res 406 res.addCallbacks(success, failure) 407 408 return res 409
410 - def perspective_deleteFlow(self, flowName):
411 return self.vishnu.deleteFlow(flowName)
412
413 - def perspective_deleteComponent(self, componentState):
414 """Delete a component from the manager. 415 416 A component can only be deleted when it is sleeping or sad. It 417 is the caller's job to ensure this is the case; calling this 418 function on a running component will raise a ComponentBusyError. 419 420 @returns: a deferred that will fire when all listeners have been 421 notified of the component removal 422 """ 423 return self.vishnu.deleteComponent(componentState)
424 425 # Deprecated -- remove me when no one uses me any more
426 - def perspective_cleanComponents(self):
427 return self.vishnu.emptyPlanet()
428 429 # separate method so it runs the newly reloaded one :)
430 - def _reloaded(self):
431 self.info('reloaded manager code')
432
433 -class AdminHeaven(base.ManagerHeaven):
434 """ 435 I interface between the Manager and administrative clients. 436 For each client I create an L{AdminAvatar} to handle requests. 437 I live in the manager. 438 """ 439 440 logCategory = "admin-heaven" 441 implements(interfaces.IHeaven) 442 avatarClass = AdminAvatar 443
444 - def __init__(self, vishnu):
445 # doc in base class 446 base.ManagerHeaven.__init__(self, vishnu) 447 #FIXME: don't add a log handler here until we have a good way 448 #of filtering client-side again 449 #log.addLogHandler(self.logHandler) 450 self._logcache = []
451 452 #def logHandler(self, category, type, message): 453 # self.logcache.append((category, type, message)) 454 # for avatar in self.getAvatars(): 455 # avatar.sendLog(category, type, message) 456 457 #def sendCache(self, avatar): 458 # if not avatar.hasRemoteReference(): 459 # reactor.callLater(0.25, self.sendCache, avatar) 460 # return 461 462 # FIXME: do this on request only 463 #self.debug('sending logcache to client (%d messages)' % len(self.logcache)) 464 #for category, type, message in self.logcache: 465 # avatar.sendLog(category, type, message) 466 467 ### my methods 468
469 - def avatarsCallRemote(self, methodName, *args, **kwargs):
470 """ 471 Call a remote method on all AdminAvatars in this heaven. 472 473 @param methodName: Name of the method to call. Gets proxied to 474 L{flumotion.admin.admin.AdminModel}'s 475 remote_(methodName) 476 @type methodName: str 477 """ 478 for avatar in self.getAvatars(): 479 avatar.mindCallRemote(methodName, *args, **kwargs)
480