Package flumotion :: Package admin :: Module multi
[hide private]

Source Code for Module flumotion.admin.multi

  1  # -*- Mode: Python -*- 
  2  # vi:si:et:sw=4:sts=4:ts=4 
  3  # 
  4  # Flumotion - a streaming media server 
  5  # Copyright (C) 2004,2005,2006,2007 Fluendo, S.L. (www.fluendo.com). 
  6  # All rights reserved. 
  7   
  8  # This file may be distributed and/or modified under the terms of 
  9  # the GNU General Public License version 2 as published by 
 10  # the Free Software Foundation. 
 11  # This file is distributed without any warranty; without even the implied 
 12  # warranty of merchantability or fitness for a particular purpose. 
 13  # See "LICENSE.GPL" in the source distribution for more information. 
 14   
 15  # Licensees having purchased or holding a valid Flumotion Advanced 
 16  # Streaming Server license may use this file in accordance with the 
 17  # Flumotion Advanced Streaming Server Commercial License Agreement. 
 18  # See "LICENSE.Flumotion" in the source distribution for more information. 
 19   
 20  # Headers in this file shall remain intact. 
 21   
 22   
 23  from flumotion.twisted import pb as fpb 
 24  from flumotion.common import log, planet 
 25  from flumotion.admin import admin 
 26   
 27   
def get_admin_for_object(object):
    """Return the ``admin`` attribute of the topmost ancestor of ``object``.

    Starting at ``object``, follow ``get('parent')`` links for as long as
    they yield a truthy value; the ``admin`` attribute of the first object
    without a (truthy) parent is returned.
    """
    node = object
    while True:
        parent = node.get('parent')
        if not parent:
            # Reached the root of the parent chain.
            return node.admin
        node = parent
33 34 # this is looking for a home.
def _make_watched(type, *mutators):
    """Create a subclass of ``type`` that notifies watchers when mutated.

    @param type:     the container class to derive from (e.g. list, dict).
    @param mutators: names of methods of ``type`` that modify the container;
                     each one is wrapped so that every registered watch
                     procedure is called after the method has run.
    @returns: the new ``Watched`` class.
    """
    base = type

    class Watched(base):
        """Container that calls registered procedures after each mutation."""

        def __init__(self, *args, **kwargs):
            # Forward optional constructor arguments so the container can be
            # created pre-populated; calling with no arguments still works.
            base.__init__(self, *args, **kwargs)
            self.watch_id = 0
            self.watch_procs = {}  # id -> proc

        def watch(self, proc):
            """Register ``proc`` and return an id usable with unwatch().

            ``proc`` is called with the container as its only argument.
            """
            self.watch_id += 1
            self.watch_procs[self.watch_id] = proc
            return self.watch_id

        def unwatch(self, id):
            """Remove the procedure registered under ``id``.

            Raises KeyError if ``id`` is not a registered watch id.
            """
            del self.watch_procs[id]

        def notify_changed(self):
            """Call every registered watch procedure with this container."""
            # Iterate over a snapshot: a procedure may call watch() or
            # unwatch() while being notified, and mutating a dict while
            # iterating over its live view raises RuntimeError.
            for proc in list(self.watch_procs.values()):
                proc(self)

    def mutate(method):
        def do_mutate(self, *args, **kwargs):
            # Keep the wrapped method's result: pop() and popitem() are
            # useless to callers if their return value is dropped.
            result = method(self, *args, **kwargs)
            self.notify_changed()
            return result
        do_mutate.__name__ = method.__name__
        do_mutate.__doc__ = method.__doc__
        setattr(Watched, method.__name__, do_mutate)

    for i in mutators:
        mutate(getattr(base, i))

    return Watched


WatchedList = _make_watched(list, 'append', 'insert', 'remove', 'pop',
                            'sort', 'reverse')
WatchedDict = _make_watched(dict, '__setitem__', '__delitem__', 'pop',
                            'popitem', 'update')
class MultiAdminModel(log.Loggable):
    """Keep track of several admin models and relay events to listeners.

    Connected admin models are stored in ``admins``, keyed by their
    managerId.  Objects registered with addListener() are notified through
    emit() when a planet is added or removed.
    """
    logCategory = 'multiadmin'

    def __init__(self):
        # public
        self.admins = WatchedDict()  # {managerId: AdminModel}
        # private
        self.listeners = []

    # Listener implementation

    def emit(self, signal_name, *args, **kwargs):
        """Deliver a signal to every registered listener.

        For each listener, a truthy ``model_handler`` attribute is called
        with (listener, signal_name, *args, **kwargs); otherwise the
        listener's ``model_<signal_name>`` attribute is called with
        (*args, **kwargs).

        @raises NotImplementedError: if a listener provides neither.
        """
        self.debug('emit %r %r %r' % (signal_name, args, kwargs))
        assert signal_name != 'handler'
        for c in self.listeners:
            if getattr(c, 'model_handler', None):
                c.model_handler(c, signal_name, *args, **kwargs)
                continue
            # Look the specific handler up with a default: without one, a
            # missing attribute raises AttributeError and the explicit
            # NotImplementedError below can never be reached.
            handler = getattr(c, 'model_%s' % signal_name, None)
            if handler:
                handler(*args, **kwargs)
            else:
                s = 'No model_%s in %r and no model_handler' % (signal_name, c)
                raise NotImplementedError(s)

    def addListener(self, obj):
        """Register ``obj`` to receive the signals sent by emit().

        ``obj`` must not already be registered.
        """
        assert obj not in self.listeners
        self.listeners.append(obj)

    # Public

    def addManager(self, host, port, use_insecure, authenticator,
                   tenacious=False):
        """Create an admin model and start connecting it to a manager.

        @param host:          passed to the admin model's connectToHost().
        @param port:          passed to the admin model's connectToHost().
        @param use_insecure:  passed to the admin model's connectToHost().
        @param authenticator: passed to the AdminModel constructor.
        @param tenacious:     passed to connectToHost() as ``keep_trying``.
        @returns: the newly created admin model.

        Once the model signals 'connected' it is stored in ``admins`` and
        'addPlanet' is emitted; on 'disconnected' 'removePlanet' is emitted
        and the model is removed from ``admins`` again.
        """
        def connected_cb(admin):
            planet = admin.planet
            self.info('Connected to manager %s (planet %s)'
                      % (admin.managerId, planet.get('name')))
            self.admins[admin.managerId] = admin
            self.emit('addPlanet', admin, planet)

        def disconnected_cb(admin):
            self.info('Disconnected from manager')
            if admin.managerId in self.admins:
                # Listeners are told before the model is dropped.
                self.emit('removePlanet', admin, admin.planet)
                del self.admins[admin.managerId]
            else:
                self.warning('Could not find admin model %r' % admin)

        def connection_refused_cb(admin):
            self.info('Connection to %s:%d refused.' % (host, port))

        def connection_failed_cb(admin, string):
            self.info('Connection to %s:%d failed: %s' % (host, port, string))

        a = admin.AdminModel(authenticator)

        a.connectToHost(host, port, use_insecure, keep_trying=tenacious)
        a.connect('connected', connected_cb)
        a.connect('disconnected', disconnected_cb)
        a.connect('connection-refused', connection_refused_cb)
        a.connect('connection-failed', connection_failed_cb)
        return a

    def close_admin(self, admin):
        """Call shutdown() on the given admin model."""
        admin.shutdown()

    def for_each_component(self, object, proc):
        '''Call a procedure on each component that is a child of OBJECT'''
        # ah, for multimethods...
        if isinstance(object, planet.AdminPlanetState):
            self.for_each_component(object.get('atmosphere'), proc)
            for f in object.get('flows'):
                self.for_each_component(f, proc)
        elif isinstance(object, (planet.AdminAtmosphereState,
                                 planet.AdminFlowState)):
            for c in object.get('components'):
                self.for_each_component(c, proc)
        elif isinstance(object, planet.AdminComponentState):
            proc(object)

    def do_component_op(self, object, op):
        '''Call a method on the remote component object associated with
        a component state.

        The remote method name is 'component' + op; it is invoked through
        the admin found for OBJECT, once for every component under OBJECT.
        '''
        admin = get_admin_for_object(object)

        def do_op(object):
            admin.callRemote('component'+op, object)
        self.for_each_component(object, do_op)