Package flumotion :: Package common :: Module startset
[hide private]

Source Code for Module flumotion.common.startset

  1  # -*- Mode: Python -*- 
  2  # vi:si:et:sw=4:sts=4:ts=4 
  3  # 
  4  # Flumotion - a streaming media server 
  5  # Copyright (C) 2004,2005,2006,2007 Fluendo, S.L. (www.fluendo.com). 
  6  # All rights reserved. 
  7   
  8  # This file may be distributed and/or modified under the terms of 
  9  # the GNU General Public License version 2 as published by 
 10  # the Free Software Foundation. 
 11  # This file is distributed without any warranty; without even the implied 
 12  # warranty of merchantability or fitness for a particular purpose. 
 13  # See "LICENSE.GPL" in the source distribution for more information. 
 14   
 15  # Licensees having purchased or holding a valid Flumotion Advanced 
 16  # Streaming Server license may use this file in accordance with the 
 17  # Flumotion Advanced Streaming Server Commercial License Agreement. 
 18  # See "LICENSE.Flumotion" in the source distribution for more information. 
 19   
 20  # Headers in this file shall remain intact. 
 21   
 22  """a data structure to manage asynchronous avatar starts and shutdowns 
 23  """ 
 24   
 25  from twisted.internet import defer 
 26   
 27  from flumotion.common import log 
 28   
 29  __version__ = "$Rev: 6964 $" 
 30   
 31   
 32  # This class was factored out of the worker's jobheaven, so sometimes 
 33  # the comments talk about jobs, but they refer to any asynchronous 
 34  # process. For example the multiadmin uses this to manage its 
 35  # connections to remote managers. 
 36   
class StartSet(log.Loggable):
    """
    Bookkeeping for pending starts and pending shutdowns of remote
    processes, keyed by avatarId.

    Two maps are kept, both of type avatarId => deferred: one for
    creates that have been requested but not yet confirmed, and one for
    shutdowns that have been requested but not yet completed.
    """

    def __init__(self, avatarLoggedIn, alreadyStartingError,
                 alreadyRunningError):
        """
        Set up an empty StartSet.

        @param avatarLoggedIn: a procedure of type avatarId->boolean
        which returns True when the given avatarId is logged in and
        "ready" (that is, avatarStarted() could have been called on it)
        and False otherwise. The StartSet asks the caller instead of
        tracking ready processes itself, so that this information is
        not duplicated.
        @param alreadyStartingError: exception class raised by
        createStart() when a create deferred is already registered for
        the avatarId.
        @param alreadyRunningError: exception class raised by
        createStart() when avatarLoggedIn() reports the avatarId as
        already "ready".
        """
        self._createDeferreds = {} # avatarId => deferred
        self._shutdownDeferreds = {} # avatarId => deferred

        self._avatarLoggedIn = avatarLoggedIn
        self._alreadyStartingError = alreadyStartingError
        self._alreadyRunningError = alreadyRunningError

    def createStart(self, avatarId):
        """
        Register a new create deferred for the given process and return
        it. The deferred is fired through createSuccess() once the
        process is ready, or through createFailed() if the start fails.

        @param avatarId: the id of the remote process, for example the
        avatarId of the job

        @raise alreadyStartingError: (the class given to the
        constructor) if a create is already pending for avatarId
        @raise alreadyRunningError: (the class given to the constructor)
        if avatarId is neither starting nor shutting down, but is
        reported as logged in

        @rtype: L{twisted.internet.defer.Deferred}
        """
        self.debug('making create deferred for %s', avatarId)

        # What we know about an avatarId lives in three places: the
        # create map, the shutdown map, and the caller's own set of
        # logged-in avatars. They are consulted in exactly that order.

        # (1) a start is already pending for this id
        if avatarId in self._createDeferreds:
            self.info('already have a create deferred for %s', avatarId)
            raise self._alreadyStartingError(avatarId)

        created = defer.Deferred()

        if avatarId in self._shutdownDeferreds:
            # (2) the previous process with this id is still shutting
            # down; the new create may only complete after it is gone
            self.debug('waiting for previous %s to shut down like it '
                       'said it would', avatarId)
            # Grab the shutdown deferred now: by the time the create
            # deferred fires, shutdownSuccess() may already have removed
            # it from the map.
            pendingShutdown = self._shutdownDeferreds[avatarId]

            def waitForShutdown(result):
                # Runs when the create deferred fires. Returning the
                # shutdown deferred makes the create chain wait for it;
                # the extra callback puts the create result back so the
                # rest of the create chain still receives it.
                pendingShutdown.addCallback(lambda _ignored: result)
                return pendingShutdown

            created.addCallback(waitForShutdown)
        elif self._avatarLoggedIn(avatarId):
            # (3) a process with this id is up and running
            self.info('avatar named %s already running', avatarId)
            raise self._alreadyRunningError(avatarId)
        # (4) otherwise nothing is known under this id: a fresh start

        self.debug('registering deferredCreate for %s', avatarId)
        self._createDeferreds[avatarId] = created
        return created

    def createSuccess(self, avatarId):
        """
        Fire the create deferred that createStart() registered for the
        given process, passing avatarId as the result, and forget it.
        A JobHeaven, for example, might call this once a job has logged
        in and been told to start a component.

        Logs a warning and does nothing else if no create is pending.

        @param avatarId: the id of the remote process, for example the
        avatarId of the job
        """
        self.debug('triggering create deferred for %s', avatarId)
        if avatarId not in self._createDeferreds:
            self.warning('No create deferred registered for %s', avatarId)
            return

        pending = self._createDeferreds.pop(avatarId)
        # the original caller gets back the avatarId the component uses
        pending.callback(avatarId)

    def createFailed(self, avatarId, exception):
        """
        Errback the create deferred registered for the given process and
        drop it from the set of pending creates.

        Logs a warning and does nothing else if no create is pending.

        @param avatarId: the id of the remote process, for example the
        avatarId of the job
        @param exception: either an exception or a failure describing
        why the create failed.
        """
        self.debug('create deferred failed for %s', avatarId)
        if avatarId not in self._createDeferreds:
            self.warning('No create deferred registered for %s', avatarId)
            return

        pending = self._createDeferreds.pop(avatarId)
        pending.errback(exception)

    def createRegistered(self, avatarId):
        """
        Look up the pending create for the given avatarId.

        @param avatarId: the id of the remote process, for example the
        avatarId of the job

        @returns: The create deferred, if one has been registered.
        Otherwise None.
        """
        return self._createDeferreds.get(avatarId)

    def shutdownStart(self, avatarId):
        """
        Return a deferred that will be fired when the given process has
        shut down cleanly. If a shutdown deferred is already registered
        for avatarId, a warning is logged and that same deferred is
        returned; otherwise a new one is created and registered.

        @param avatarId: the id of the remote process, for example the
        avatarId of the job

        @rtype: L{twisted.internet.defer.Deferred}
        """
        self.debug('making shutdown deferred for %s', avatarId)

        if avatarId in self._shutdownDeferreds:
            self.warning('already have a shutdown deferred for %s',
                         avatarId)
            return self._shutdownDeferreds[avatarId]

        self.debug('registering shutdown for %s', avatarId)
        stopping = defer.Deferred()
        self._shutdownDeferreds[avatarId] = stopping
        return stopping

    def shutdownSuccess(self, avatarId):
        """
        Fire the shutdown deferred that shutdownStart() registered for
        the given process, passing avatarId as the result, and forget
        it. A JobHeaven, for example, would call this when a job for
        which shutdownStart() was called is reaped.

        Logs a warning and does nothing else if no shutdown is pending.

        @param avatarId: the id of the remote process, for example the
        avatarId of the job
        """
        self.debug('triggering shutdown deferred for %s', avatarId)
        if avatarId not in self._shutdownDeferreds:
            self.warning('No shutdown deferred registered for %s', avatarId)
            return

        stopping = self._shutdownDeferreds.pop(avatarId)
        stopping.callback(avatarId)

    def shutdownRegistered(self, avatarId):
        """
        Tell whether a shutdown is pending for the given avatarId.

        @param avatarId: the id of the remote process, for example the
        avatarId of the job

        @returns: True if a shutdown deferred has been registered for
        this avatarId, False otherwise
        """
        return avatarId in self._shutdownDeferreds

    def avatarStarted(self, avatarId):
        """
        Tell the startset that an avatar has started. When a create
        deferred is pending for it, createSuccess() is called; otherwise
        the event is only logged.

        @param avatarId: the id of the remote process, for example the
        avatarId of the job
        """
        if avatarId not in self._createDeferreds:
            self.log('avatar %s started, but we were not waiting for'
                     ' it', avatarId)
            return

        self.createSuccess(avatarId)

    def avatarStopped(self, avatarId, getFailure):
        """
        Tell the startset that an avatar has stopped.

        If a create is still pending for the avatar, createFailed() is
        called with the result of getFailure(avatarId). Otherwise, if a
        shutdown is pending, shutdownSuccess() is called. If neither a
        create nor a shutdown was registered, the event is only logged.

        @param avatarId: the id of the remote process, for example the
        avatarId of the job
        @param getFailure: procedure of type avatarId -> Failure. The
        returned failure should describe the reason that the job failed.
        It is only called when a create is pending.
        """
        # NOTE(review): the pending create is checked first, so when an
        # id sits in both maps (createStart() allows that for an avatar
        # that is shutting down) this call does not fire the shutdown
        # deferred -- confirm that this is intended.
        if avatarId in self._createDeferreds:
            self.createFailed(avatarId, getFailure(avatarId))
            return

        if avatarId in self._shutdownDeferreds:
            self.shutdownSuccess(avatarId)
            return

        self.debug('unknown avatar %s logged out', avatarId)
250