Package flumotion :: Package manager :: Module admin
[hide private]

Source Code for Module flumotion.manager.admin

  1  # -*- Mode: Python; test-case-name: flumotion.test.test_manager_admin -*- 
  2  # vi:si:et:sw=4:sts=4:ts=4 
  3  # 
  4  # Flumotion - a streaming media server 
  5  # Copyright (C) 2004,2005,2006,2007 Fluendo, S.L. (www.fluendo.com). 
  6  # All rights reserved. 
  7   
  8  # This file may be distributed and/or modified under the terms of 
  9  # the GNU General Public License version 2 as published by 
 10  # the Free Software Foundation. 
 11  # This file is distributed without any warranty; without even the implied 
 12  # warranty of merchantability or fitness for a particular purpose. 
 13  # See "LICENSE.GPL" in the source distribution for more information. 
 14   
 15  # Licensees having purchased or holding a valid Flumotion Advanced 
 16  # Streaming Server license may use this file in accordance with the 
 17  # Flumotion Advanced Streaming Server Commercial License Agreement. 
 18  # See "LICENSE.Flumotion" in the source distribution for more information. 
 19   
 20  # Headers in this file shall remain intact. 
 21   
 22  """ 
 23  manager-side objects to handle administrative clients 
 24  """ 
 25   
 26  import re 
 27  import os 
 28  import errno 
 29  from StringIO import StringIO 
 30   
 31  from twisted.internet import reactor 
 32  from twisted.python import failure 
 33  from twisted.spread import pb 
 34  from zope.interface import implements 
 35   
 36  from flumotion.manager import base 
 37  from flumotion.common import errors, interfaces, log, planet, registry, debug 
 38  from flumotion.common.python import makedirs 
 39   
 40  # make Result and Message proxyable 
 41  from flumotion.common import messages 
 42   
 43  # make ComponentState proxyable 
 44  from flumotion.twisted import flavors 
 45  from flumotion.common import componentui 
 46   
 47  __version__ = "$Rev: 6981 $" 
 48   
 49   
 50  # FIXME: rename to Avatar since we are in the admin. namespace ? 
51 -class AdminAvatar(base.ManagerAvatar):
52 """ 53 I am an avatar created for an administrative client interface. 54 A reference to me is given (for example, to gui.AdminInterface) 55 when logging in and requesting an "admin" avatar. 56 I live in the manager. 57 """ 58 logCategory = 'admin-avatar' 59 60 # override pb.Avatar implementation so we can run admin actions
def perspectiveMessageReceived(self, broker, message, args, kwargs):
    """
    Handle a message sent to this avatar by the admin client.

    This overrides the pb.Avatar implementation so that admin actions
    can be run: the arguments are unserialized through the broker, every
    message except the benign ones ('ping') is passed to
    self.vishnu.adminAction(), and the call is then handed to
    base.ManagerAvatar.perspectiveMessageReceivedUnserialised().

    @param broker:  broker used to unserialize args and kwargs
    @param message: name of the message (remote method) received
    @type  message: str
    @param args:    serialized positional arguments
    @param kwargs:  serialized keyword arguments

    @returns: the result of perspectiveMessageReceivedUnserialised()
    """
    callArgs = broker.unserialize(args)
    callKwargs = broker.unserialize(kwargs)

    # benign messages are not passed to adminAction()
    isBenign = message in ('ping',)
    if not isBenign:
        self.vishnu.adminAction(self.remoteIdentity, message,
                                callArgs, callKwargs)

    return base.ManagerAvatar.perspectiveMessageReceivedUnserialised(
        self, broker, message, callArgs, callKwargs)
72 73 ### pb.Avatar IPerspective methods
75 """ 76 Get the planet state. 77 78 @rtype: L{flumotion.common.planet.ManagerPlanetState} 79 """ 80 self.debug("returning planet state %r" % self.vishnu.state) 81 return self.vishnu.state
82
84 """ 85 Get the worker heaven state. 86 87 @rtype: L{flumotion.common.worker.ManagerWorkerHeavenState} 88 """ 89 self.debug("returning worker heaven state %r" % self.vishnu.state) 90 return self.vishnu.workerHeaven.state
91
def perspective_componentStart(self, componentState):
    """
    Start the given component, which should be sleeping beforehand.

    The request is delegated to self.vishnu.componentCreate().

    @param componentState: state of the component to start
    @type  componentState: L{planet.ManagerComponentState}

    @returns: whatever vishnu.componentCreate() returns
    """
    logLine = 'perspective_componentStart(%r)' % componentState
    self.debug(logLine)
    started = self.vishnu.componentCreate(componentState)
    return started
101
def perspective_componentStop(self, componentState):
    """
    Stop the given component.

    If the component was sad, its sad state is cleared as well, because
    the stop was explicitly requested by the admin.  The request is
    delegated to self.vishnu.componentStop().

    @param componentState: state of the component to stop
    @type  componentState: L{planet.ManagerComponentState}

    @returns: whatever vishnu.componentStop() returns
    """
    logLine = 'perspective_componentStop(%r)' % componentState
    self.debug(logLine)
    stopped = self.vishnu.componentStop(componentState)
    return stopped
112
def perspective_componentRestart(self, componentState):
    """
    Restart the given component: stop it, then start it again once the
    stop has completed.

    @param componentState: state of the component to restart
    @type  componentState: L{planet.ManagerComponentState}

    @returns: the deferred returned by perspective_componentStop(), with
              the start chained onto it as a callback
    """
    self.debug('perspective_componentRestart(%r)' % componentState)

    def startAgain(*ignored):
        # the result of the stop is not needed to start the component
        return self.perspective_componentStart(componentState)

    stopping = self.perspective_componentStop(componentState)
    stopping.addCallback(startAgain)
    return stopping
123 124 # Generic interface to call into a component
125 - def perspective_componentCallRemote(self, componentState, methodName, 126 *args, **kwargs):
127 """ 128 Call a method on the given component on behalf of an admin client. 129 130 @param componentState: state of the component to call the method on 131 @type componentState: L{planet.ManagerComponentState} 132 @param methodName: name of the method to call. Gets proxied to 133 L{flumotion.component.component.""" \ 134 """BaseComponentMedium}'s remote_(methodName) 135 @type methodName: str 136 137 @rtype: L{twisted.internet.defer.Deferred} 138 """ 139 assert isinstance(componentState, planet.ManagerComponentState), \ 140 "%r is not a componentState" % componentState 141 142 if methodName == "start": 143 self.warning('forwarding "start" to perspective_componentStart') 144 return self.perspective_componentStart(componentState) 145 146 m = self.vishnu.getComponentMapper(componentState) 147 avatar = m.avatar 148 149 if not avatar: 150 self.warning('No avatar for %s, cannot call remote' % 151 componentState.get('name')) 152 raise errors.SleepingComponentError(componentState) 153 154 # XXX: Maybe we need to have a prefix, so we can limit what an 155 # admin interface can call on a component 156 try: 157 return avatar.mindCallRemote(methodName, *args, **kwargs) 158 except Exception, e: 159 msg = "exception on remote call %s: %s" % (methodName, 160 log.getExceptionMessage(e)) 161 self.warning(msg) 162 raise errors.RemoteMethodError(methodName, 163 log.getExceptionMessage(e))
164
165 - def perspective_workerCallRemote(self, workerName, methodName, 166 *args, **kwargs):
167 """ 168 Call a remote method on the worker. 169 This is used so that admin clients can call methods from the interface 170 to the worker. 171 172 @param workerName: the worker to call 173 @type workerName: str 174 @param methodName: Name of the method to call. Gets proxied to 175 L{flumotion.worker.medium.WorkerMedium} 's 176 remote_(methodName) 177 @type methodName: str 178 """ 179 180 self.debug('AdminAvatar.workerCallRemote(%r, %r)' % ( 181 workerName, methodName)) 182 workerAvatar = self.vishnu.workerHeaven.getAvatar(workerName) 183 184 # XXX: Maybe we need to a prefix, so we can limit what an admin 185 # interface can call on a worker 186 try: 187 return workerAvatar.mindCallRemote(methodName, *args, **kwargs) 188 except Exception, e: 189 self.warning("exception on remote call: %s" % 190 log.getExceptionMessage(e)) 191 return failure.Failure(errors.RemoteMethodError(methodName, 192 log.getExceptionMessage(e)))
193
def perspective_getEntryByType(self, componentType, entryType):
    """
    Get the entry point for a piece of bundled code in a component,
    looked up by entry type.

    @param componentType: the component type
    @type  componentType: str
    @param entryType:     the type of entry point to look up
    @type  entryType:     str

    @returns: a (filename, methodName) tuple, where filename is the
              entry location joined onto the component's base path
    @raises errors.NoBundleError: if the registry lookup raises KeyError
    """
    assert componentType is not None

    self.debug('getting entry of type %s for component type %s',
               entryType, componentType)

    try:
        reg = registry.getRegistry()
        component = reg.getComponent(componentType)
        # FIXME: add logic here for default entry points and functions
        entry = component.getEntryByType(entryType)
    except KeyError:
        self.warning("Could not find bundle for %s(%s)" % (
            componentType, entryType))
        raise errors.NoBundleError("entry type %s in component type %s" %
                                   (entryType, componentType))

    path = os.path.join(component.base, entry.location)
    function = entry.function
    self.debug('entry point is in file path %s and function %s' % (
        path, function))
    return (path, function)
224
def perspective_getPlugEntry(self, plugType, entryType):
    """
    Get the entry point for a piece of bundled code in a plug, looked up
    by entry type.

    @param plugType:  the plug type
    @type  plugType:  str
    @param entryType: the type of entry point to look up
    @type  entryType: str

    @returns: a (filename, methodName) tuple; the filename is the entry
              location as stored in the registry
    @raises errors.NoBundleError: if the registry lookup raises KeyError
    """
    assert plugType is not None

    self.debug('getting entry of type %s for plug type %s',
               entryType, plugType)

    try:
        reg = registry.getRegistry()
        plug = reg.getPlug(plugType)
        entry = plug.getEntryByType(entryType)
    except KeyError:
        self.warning("Could not find bundle for %s(%s)" % (
            plugType, entryType))
        raise errors.NoBundleError("entry type %s in plug type %s" %
                                   (entryType, plugType))

    location = entry.location
    function = entry.function
    self.debug('entry point is in file path %s and function %s' % (
        location, function))
    return (location, function)
252
254 """ 255 Get the configuration of the manager as an XML string. 256 257 @rtype: str 258 """ 259 return self.vishnu.getConfiguration()
260
261 - def _saveFlowFile(self, filename):
262 """Opens a file that the flow should be written to. 263 264 Note that the returned file object might be an existing file, 265 opened in append mode; if the loadConfiguration operation 266 succeeds, the file should first be truncated before writing. 267 """ 268 self.vishnu.adminAction(self.remoteIdentity, 269 '_saveFlowFile', (), {}) 270 def ensure_sane(name, extra=''): 271 if not re.match('^[a-zA-Z0-9_' + extra + '-]+$', name): 272 raise errors.ConfigError, \ 273 'Invalid planet or saveAs name: %s' % name
274 275 ensure_sane(self.vishnu.configDir, '/') 276 ensure_sane(filename) 277 directory = os.path.join(self.vishnu.configDir, "flows") 278 self.debug('told to save flow as %s/%s.xml', directory, filename) 279 try: 280 makedirs(directory, 0770) 281 except OSError, e: 282 if e.errno != errno.EEXIST: 283 raise e 284 prev = os.umask(0007) 285 output = open(os.path.join(directory, filename + '.xml'), 'a') 286 os.umask(prev) 287 return output
288
def perspective_loadConfiguration(self, xml, saveAs=None):
    """
    Load the given XML configuration into the manager. If the
    optional saveAs parameter is passed, the XML snippet will be
    saved to disk in the manager's flows directory.

    The flow file is opened before loading, but only overwritten once
    loading has succeeded; if loading fails the file is left as it was.
    The file is closed on every path, including when loading raises
    synchronously.

    @param xml:    the XML configuration snippet.
    @type  xml:    str
    @param saveAs: The name of a file to save the XML as.
    @type  saveAs: str

    @returns: the deferred returned by
              vishnu.loadComponentConfigurationXML()
    """
    output = None
    if saveAs:
        output = self._saveFlowFile(saveAs)

    # becomes True once the callbacks below own closing the flow file
    handedOver = False
    try:
        # Update the registry if needed, so that new/changed component
        # types can be parsed.
        registry.getRegistry().verify()

        f = StringIO(xml)
        try:
            res = self.vishnu.loadComponentConfigurationXML(
                f, self.remoteIdentity)
        finally:
            f.close()

        if output is not None:
            def saveFlow(result):
                self.debug('loadConfiguration succeeded, writing flow '
                           'to %r', output)
                try:
                    output.truncate(0)
                    output.write(xml)
                finally:
                    output.close()
                return result

            # not called "failure": that would shadow the
            # twisted.python.failure module imported by this file
            def keepFlow(reason):
                self.debug('loadConfiguration failed, leaving %r as it '
                           'was', output)
                output.close()
                return reason

            res.addCallbacks(saveFlow, keepFlow)
        handedOver = True
    finally:
        # a synchronous error means no callback will ever close the
        # flow file, so close it here
        if output is not None and not handedOver:
            output.close()

    return res
def perspective_loadComponent(self, componentType, componentId,
                              componentLabel, properties, workerName,
                              plugs=None, eaters=None,
                              isClockMaster=None, virtualFeeds=None):
    """
    Load a component into the manager configuration.
    Returns a deferred that will be called with the component state.

    The call is delegated to self.vishnu.loadComponent(), together with
    the identity of the admin client; plugs, eaters and virtualFeeds
    that are left out (or empty) are passed on as empty lists.

    @param componentType:  The registered type of the component to be
                           added
    @type  componentType:  str
    @param componentId:    The identifier of the component to add,
                           should be created by the function
                           L{flumotion.common.common.componentId}
    @type  componentId:    str
    @param componentLabel: The human-readable label of the component.
                           If None, no label will be set.
    @type  componentLabel: str or None
    @param properties:     List of property name-value pairs.
                           See L{flumotion.common.config.buildPropertyDict}
    @type  properties:     list of (str, object)
    @param workerName:     the name of the worker where the added
                           component should run.
    @type  workerName:     str
    @param plugs:          List of plugs, as type-propertyList pairs.
                           See L{flumotion.manager.config.buildPlugsSet}.
    @type  plugs:          [(str, [(str, object)])]
    @param eaters:         List of (eater name, feed ID) pairs.
                           See L{flumotion.manager.config.buildEatersDict}
    @type  eaters:         [(str, str)]
    @param isClockMaster:  True if the component to be added must be
                           a clock master. Passing False here means
                           that the manager will choose what
                           component, if any, will be clock master
                           for this flow.
    @type  isClockMaster:  bool
    @param virtualFeeds:   List of (virtual feed, feeder name) pairs.
                           See L{flumotion.manager.config.buildVirtualFeeds}
    @type  virtualFeeds:   [(str, str)]
    """
    plugList = plugs or []
    eaterList = eaters or []
    virtualFeedList = virtualFeeds or []

    return self.vishnu.loadComponent(
        self.remoteIdentity, componentType, componentId, componentLabel,
        properties, workerName, plugList, eaterList, isClockMaster,
        virtualFeedList)
373
def perspective_deleteFlow(self, flowName):
    """
    Delete the flow with the given name from the manager.

    The request is delegated to self.vishnu.deleteFlow().

    @param flowName: name of the flow to delete
    @type  flowName: str

    @returns: whatever vishnu.deleteFlow() returns
    """
    deleted = self.vishnu.deleteFlow(flowName)
    return deleted
376
def perspective_deleteComponent(self, componentState):
    """
    Delete a component from the manager.

    A component can only be deleted when it is sleeping or sad. It
    is the caller's job to ensure this is the case; calling this
    function on a running component will raise a ComponentBusyError.

    @param componentState: state of the component to delete

    @returns: a deferred that will fire when all listeners have been
              notified of the component removal
    """
    removal = self.vishnu.deleteComponent(componentState)
    return removal
388
def perspective_getVersions(self):
    """
    Get version information, as reported by debug.getVersions().

    @returns: whatever debug.getVersions() returns
    """
    versions = debug.getVersions()
    return versions
391
def perspective_cleanComponents(self):
    """
    Empty the planet.

    The request is delegated to self.vishnu.emptyPlanet().

    @returns: whatever vishnu.emptyPlanet() returns
    """
    emptied = self.vishnu.emptyPlanet()
    return emptied
394 395
def perspective_getWizardEntries(self, types=None, provides=None,
                                 accepts=None):
    """
    Fetch the wizard entries of all registered components and plugs that
    match the given criteria.  A criterion that is None is not applied.

    @param types:    list of component types to fetch, is usually
                     something like ['video-producer'] or
                     ['audio-encoder']
    @type  types:    list of strings
    @param provides: formats provided, eg ['jpeg', 'speex']
    @type  provides: list of strings
    @param accepts:  formats accepted, eg ['theora']
    @type  accepts:  list of strings

    @returns: the matching wizard entries, components first, then plugs
    @rtype:   list of L{componentui.WizardEntryState}
    """
    def hasMediaType(formats, wanted):
        # True if at least one format has a media type that is wanted
        for fmt in formats:
            if fmt.media_type in wanted:
                return True
        return False

    def extract(wizards):
        for wizard in wizards:
            if types is not None and wizard.type not in types:
                continue
            if (provides is not None
                and not hasMediaType(wizard.provides, provides)):
                continue
            if (accepts is not None
                and not hasMediaType(wizard.accepts, accepts)):
                continue
            yield wizard

    reg = registry.getRegistry()
    entries = []
    for component in reg.getComponents():
        entries.extend(extract(component.wizards))
    for plug in reg.getPlugs():
        entries.extend(extract(plug.wizards))

    return entries
class AdminHeaven(base.ManagerHeaven):
    """
    I am the manager-side link between the Manager and its
    administrative clients.
    Every admin client gets an L{AdminAvatar}, created by me, to handle
    its requests.
    I live in the manager.
    """

    implements(interfaces.IHeaven)

    # class of the avatars handed out to admin clients
    avatarClass = AdminAvatar
    logCategory = "admin-heaven"
450