1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 """a data structure to manage asynchronous avatar starts and shutdowns
23 """
24
25 from twisted.internet import defer
26
27 from flumotion.common import log
28
29 __version__ = "$Rev: 6964 $"
30
31
32
33
34
35
36
def __init__(self, avatarLoggedIn, alreadyStartingError,
             alreadyRunningError):
    """
    Create a StartSet, a data structure for managing starts and
    stops of remote processes, for example jobs in a jobheaven.

    @param avatarLoggedIn: a procedure of type avatarId->boolean;
    should return True if the avatarId is logged in and "ready", and
    False otherwise. An avatarId is ready if avatarStarted() could
    have been called on it. The StartSet asks its creator instead of
    tracking "ready" processes itself, which avoids duplicating that
    data.
    @param alreadyStartingError: an exception class to raise if
    createStart() is called, but there is already a create deferred
    registered for that avatarId.
    @param alreadyRunningError: an exception class to raise if
    createStart() is called, but there is already a "ready" process
    with that avatarId.
    """
    # Pending operations, keyed by avatarId; each value is the deferred
    # registered for that avatar.
    self._createDeferreds = {}
    self._shutdownDeferreds = {}

    # Collaborators handed in by whoever owns this StartSet.
    self._alreadyRunningError = alreadyRunningError
    self._alreadyStartingError = alreadyStartingError
    self._avatarLoggedIn = avatarLoggedIn
63
def createStart(self, avatarId):
    """
    Create and register a deferred for starting a given process.
    The deferred will be fired when the process is ready, as
    triggered by a call to createSuccess().

    If a shutdown deferred is still registered for avatarId, a callback
    is added to the new deferred that returns that shutdown deferred,
    so the start result is only passed on after the shutdown deferred
    has fired.

    @param avatarId: the id of the remote process, for example the
                     avatarId of the job

    @rtype: L{twisted.internet.defer.Deferred}

    @raise alreadyStartingError: (the class given to the constructor)
           if a create deferred is already registered for avatarId.
    @raise alreadyRunningError: (the class given to the constructor)
           if no shutdown is pending for avatarId and
           avatarLoggedIn(avatarId) is true.
    """
    self.debug('making create deferred for %s', avatarId)

    if avatarId in self._createDeferreds:
        self.info('already have a create deferred for %s', avatarId)
        raise self._alreadyStartingError(avatarId)

    # A pending shutdown is checked before the logged-in test: while a
    # shutdown is registered, avatarLoggedIn() is not consulted at all.
    shutdownPending = avatarId in self._shutdownDeferreds
    if not shutdownPending and self._avatarLoggedIn(avatarId):
        self.info('avatar named %s already running', avatarId)
        raise self._alreadyRunningError(avatarId)

    # Only allocate the deferred once we know we will register it.
    d = defer.Deferred()

    if shutdownPending:
        self.debug('waiting for previous %s to shut down like it '
                   'said it would', avatarId)

        # The shutdown deferred is bound as a default argument, i.e. at
        # definition time, not when the callback eventually runs.
        def ensureShutdown(res,
                           shutdown=self._shutdownDeferreds[avatarId]):
            # hand the create result through once shutdown has fired
            shutdown.addCallback(lambda _: res)
            return shutdown
        d.addCallback(ensureShutdown)

    self.debug('registering deferredCreate for %s', avatarId)
    self._createDeferreds[avatarId] = d
    return d
110
def createSuccess(self, avatarId):
    """
    Trigger a deferred start previously registered via createStart().
    For example, a JobHeaven might call this method when a job has
    logged in and been told to start a component.

    The create deferred is removed from the set of pending creates and
    then fired with avatarId. If no create deferred is registered for
    avatarId, a warning is logged and nothing else happens.

    @param avatarId: the id of the remote process, for example the
                     avatarId of the job
    """
    self.debug('triggering create deferred for %s', avatarId)
    if avatarId not in self._createDeferreds:
        self.warning('No create deferred registered for %s', avatarId)
        return

    # unregister before firing the deferred
    d = self._createDeferreds.pop(avatarId)
    d.callback(avatarId)
129
def createFailed(self, avatarId, exception):
    """
    Notify the caller that a create has failed, and remove the create
    from the list of pending creates.

    The create deferred registered for avatarId is errbacked with
    the given exception. If no create deferred is registered for
    avatarId, a warning is logged and nothing else happens.

    @param avatarId: the id of the remote process, for example the
                     avatarId of the job
    @param exception: either an exception or a failure describing
                      why the create failed.
    """
    self.debug('create deferred failed for %s', avatarId)
    if avatarId not in self._createDeferreds:
        self.warning('No create deferred registered for %s', avatarId)
        return

    # unregister before firing the errback
    d = self._createDeferreds.pop(avatarId)
    d.errback(exception)
148
150 """
151 Check if a deferred create has been registered for the given avatarId.
152
153 @param avatarId: the id of the remote process, for example the
154 avatarId of the job
155
156 @returns: The deferred create, if one has been registered.
157 Otherwise None.
158 """
159 return self._createDeferreds.get(avatarId, None)
160
def shutdownStart(self, avatarId):
    """
    Create and register a deferred that will be fired when a process
    has shut down cleanly.

    If a shutdown deferred is already registered for avatarId, a
    warning is logged and that existing deferred is returned instead
    of a new one.

    @param avatarId: the id of the remote process, for example the
                     avatarId of the job

    @rtype: L{twisted.internet.defer.Deferred}
    """
    self.debug('making shutdown deferred for %s', avatarId)

    if avatarId in self._shutdownDeferreds:
        self.warning('already have a shutdown deferred for %s',
                     avatarId)
        return self._shutdownDeferreds[avatarId]

    self.debug('registering shutdown for %s', avatarId)
    d = defer.Deferred()
    self._shutdownDeferreds[avatarId] = d
    return d
182
def shutdownSuccess(self, avatarId):
    """
    Trigger a callback on a deferred previously registered via
    shutdownStart(). For example, a JobHeaven would call this when a
    job for which shutdownStart() was called is reaped.

    The shutdown deferred is removed from the set of pending shutdowns
    and then fired with avatarId. If no shutdown deferred is registered
    for avatarId, a warning is logged and nothing else happens.

    @param avatarId: the id of the remote process, for example the
                     avatarId of the job
    """
    self.debug('triggering shutdown deferred for %s', avatarId)
    if avatarId not in self._shutdownDeferreds:
        self.warning('No shutdown deferred registered for %s', avatarId)
        return

    # unregister before firing the deferred
    self._shutdownDeferreds.pop(avatarId).callback(avatarId)
199
201 """
202 Check if a deferred shutdown has been registered for the given
203 avatarId.
204
205 @param avatarId: the id of the remote process, for example the
206 avatarId of the job
207
208 @returns: True if a deferred shutdown has been registered for
209 this object, False otherwise
210 """
211 return avatarId in self._shutdownDeferreds
212
def avatarStarted(self, avatarId):
    """
    Notify the startset that an avatar has started. If there was a
    create deferred registered for this avatar, this will cause
    createSuccess() to be called; otherwise the event is only logged.

    @param avatarId: the id of the remote process, for example the
                     avatarId of the job
    """
    if avatarId not in self._createDeferreds:
        self.log('avatar %s started, but we were not waiting for'
                 ' it', avatarId)
        return

    self.createSuccess(avatarId)
227
229 """
230 Notify the startset that an avatar has stopped. If there was a
231 shutdown deferred registered for this avatar, this will cause
232 shutdownSuccess() to be called.
233
234 On the other hand, if there was a create deferred still pending,
235 we will call createFailed with the result of calling getFailure.
236
237 If neither a create nor a shutdown was registered, we only log a debug message.
238
239 @param avatarId: the id of the remote process, for example the
240 avatarId of the job
241 @param getFailure: procedure of type avatarId -> Failure. The
242 returned failure should describe the reason that the job failed.
243 """
244 if avatarId in self._createDeferreds:
245 self.createFailed(avatarId, getFailure(avatarId))
246 elif avatarId in self._shutdownDeferreds:
247 self.shutdownSuccess(avatarId)
248 else:
249 self.debug('unknown avatar %s logged out', avatarId)
250