Package flumotion :: Package admin :: Module multi
[hide private]

Source Code for Module flumotion.admin.multi

  1  # -*- Mode: Python -*- 
  2  # vi:si:et:sw=4:sts=4:ts=4 
  3  # 
  4  # Flumotion - a streaming media server 
  5  # Copyright (C) 2004,2005,2006,2007 Fluendo, S.L. (www.fluendo.com). 
  6  # All rights reserved. 
  7   
  8  # This file may be distributed and/or modified under the terms of 
  9  # the GNU General Public License version 2 as published by 
 10  # the Free Software Foundation. 
 11  # This file is distributed without any warranty; without even the implied 
 12  # warranty of merchantability or fitness for a particular purpose. 
 13  # See "LICENSE.GPL" in the source distribution for more information. 
 14   
 15  # Licensees having purchased or holding a valid Flumotion Advanced 
 16  # Streaming Server license may use this file in accordance with the 
 17  # Flumotion Advanced Streaming Server Commercial License Agreement. 
 18  # See "LICENSE.Flumotion" in the source distribution for more information. 
 19   
 20  # Headers in this file shall remain intact. 
 21   
 22  """admin model used to connect to multiple managers""" 
 23   
 24  from twisted.internet import defer 
 25   
 26  from flumotion.common import log, planet, errors, startset, watched 
 27  from flumotion.admin import admin 
 28   
 29  __version__ = "$Rev: 6961 $" 
 30   
 31   
32 -def get_admin_for_object(object):
33 import warnings 34 warnings.warn('Use getAdminForObject', DeprecationWarning, stacklevel=2) 35 return getAdminForObject(object)
36
37 -def getAdminForObject(object):
38 if object.get('parent'): 39 return get_admin_for_object(object.get('parent')) 40 else: 41 return object.admin
42
43 -class MultiAdminModel(log.Loggable):
44 logCategory = 'multiadmin' 45
46 - def __init__(self):
47 self.admins = watched.WatchedDict() # {managerId: AdminModel} 48 49 self._listeners = [] 50 self._reconnectHandlerIds = {} # managerId => [disconnect, id..] 51 self._startSet = startset.StartSet(self.admins.has_key, 52 errors.AlreadyConnectingError, 53 errors.AlreadyConnectedError)
54 55 # Listener implementation
56 - def emit(self, signal_name, *args, **kwargs):
57 self.debug('emit %r %r %r' % (signal_name, args, kwargs)) 58 assert signal_name != 'handler' 59 for c in self._listeners: 60 if getattr(c, 'model_handler', None): 61 c.model_handler(c, signal_name, *args, **kwargs) 62 elif getattr(c, 'model_%s' % signal_name): 63 getattr(c, 'model_%s' % signal_name)(*args, **kwargs) 64 else: 65 s = 'No model_%s in %r and no model_handler' % (signal_name, c) 66 raise NotImplementedError(s)
67
68 - def addListener(self, obj):
69 assert not obj in self._listeners 70 self._listeners.append(obj)
71
72 - def removeListener(self, obj):
73 self._listeners.remove(obj)
74
75 - def _managerConnected(self, admin):
76 if admin.managerId not in self._reconnectHandlerIds: 77 # the first time a manager is connected to, start listening 78 # for reconnections; intertwingled with removeManager() 79 ids = [] 80 ids.append(admin.connect('connected', 81 self._managerConnected)) 82 ids.append(admin.connect('disconnected', 83 self._managerDisconnected)) 84 self._reconnectHandlerIds[admin.managerId] = admin, ids 85 86 adminplanet = admin.planet 87 self.info('Connected to manager %s (planet %s)', 88 admin.managerId, adminplanet.get('name')) 89 assert admin.managerId not in self.admins 90 self.admins[admin.managerId] = admin 91 self.emit('addPlanet', admin, adminplanet)
92
93 - def _managerDisconnected(self, admin):
94 if admin.managerId in self.admins: 95 self.emit('removePlanet', admin, admin.planet) 96 del self.admins[admin.managerId] 97 else: 98 self.warning('Could not find admin model %r', admin)
99
100 - def addManager(self, connectionInfo, tenacious=False, 101 writeConnection=True):
102 i = connectionInfo 103 managerId = str(i) 104 105 # This dance of deferreds is here so as to make sure that 106 # removeManager can cancel a pending connection. 107 108 # can raise errors.AlreadyConnectingError or 109 # errors.AlreadyConnectedError 110 try: 111 startD = self._startSet.createStart(managerId) 112 except Exception, e: 113 return defer.fail(e) 114 115 a = admin.AdminModel() 116 connectD = a.connectToManager(i, tenacious, 117 writeConnection=writeConnection) 118 assert a.managerId == managerId 119 120 def connect_callback(_): 121 self._startSet.avatarStarted(managerId)
122 123 def connect_errback(failure): 124 self._startSet.avatarStopped(managerId, lambda _: failure)
125 126 connectD.addCallbacks(connect_callback, connect_errback) 127 128 def start_callback(_): 129 self._managerConnected(a) 130 131 def start_errback(failure): 132 a.shutdown() 133 return failure 134 135 startD.addCallbacks(start_callback, start_errback) 136 137 return startD 138
139 - def removeManager(self, managerId):
140 self.info('disconnecting from %s', managerId) 141 142 # Four cases: 143 # (1) We have no idea about this managerId, the caller is 144 # confused -- do nothing 145 # (2) We started connecting to this managerId, but never 146 # succeeded -- cancel pending connections 147 # (3) We connected at least once, and are connected now -- we 148 # have entries in the _reconnectHandlerIds and in self.admins -- 149 # disconnect from the signals, disconnect from the remote 150 # manager, and don't try to reconnect 151 # (4) We connected at least once, but are disconnected now -- we 152 # have an entry in _reconnectHandlerIds but not self.admins -- 153 # disconnect from the signals, and stop trying to reconnect 154 155 # stop listening to admin's signals, if the manager had actually 156 # connected at some point 157 if managerId in self._reconnectHandlerIds: 158 admin, handlerIds = self._reconnectHandlerIds.pop(managerId) 159 map(admin.disconnect, handlerIds) # (3) and (4) 160 if managerId not in self.admins: 161 admin.shutdown() # (4) 162 163 if managerId in self.admins: # (3) 164 admin = self.admins[managerId] 165 admin.shutdown() 166 self._managerDisconnected(admin) 167 168 # Firing this has the side effect of errbacking on any pending 169 # start, calling start_errback above if appropriate. (2) 170 self._startSet.avatarStopped( 171 managerId, lambda _: errors.ConnectionCancelledError()) 172 173 # always succeed, see (1) 174 return defer.succeed(managerId)
175
176 - def for_each_component(self, object, proc):
177 '''Call a procedure on each component that is a child of OBJECT''' 178 # ah, for multimethods... 179 if isinstance(object, planet.AdminPlanetState): 180 self.for_each_component(object.get('atmosphere'), proc) 181 for f in object.get('flows'): 182 self.for_each_component(f, proc) 183 elif (isinstance(object, planet.AdminAtmosphereState) or 184 isinstance(object, planet.AdminFlowState)): 185 for c in object.get('components'): 186 self.for_each_component(c, proc) 187 elif isinstance(object, planet.AdminComponentState): 188 proc(object)
189
190 - def do_component_op(self, object, op):
191 '''Call a method on the remote component object associated with 192 a component state''' 193 admin = get_admin_for_object(object) 194 def do_op(object): 195 admin.callRemote('component'+op, object)
196 self.for_each_component(object, do_op) 197