1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 import random
23
24 from twisted.internet import defer, reactor
25 from twisted.python import reflect
26
27
28 from flumotion.common import errors
29
# Module revision marker.  The "$Rev: ... $" form looks like a
# version-control keyword expansion -- TODO confirm it is still maintained.
__version__ = "$Rev: 6125 $"
31
32
33
def defer_generator(proc):
    """
    Decorator that lets a generator drive a sequence of deferreds.

    C{proc} must return a generator when called.  Each time the generator
    yields a L{twisted.internet.defer.Deferred}, the generator is resumed
    once that deferred has fired.  Before resuming, the yielded deferred
    gets a C{value} attribute: a callable that returns the deferred's
    result, or raises an exception rebuilt from its failure.

    The deferred handed back to the caller fires

      - with the yielded object, as soon as the generator yields something
        that is not a deferred (the generator is not resumed after that);
      - with C{None} when the generator finishes;
      - with a failure when the generator raises any other exception.

    A failure that is not an L{errors.HandledException} and that no errback
    added by the caller deals with ends up in a last-resort errback, which
    prints a message and a traceback.

    @param proc: callable returning a generator
    @returns: a callable taking the same arguments as C{proc} and returning
              a L{twisted.internet.defer.Deferred}
    """

    def wrapper(*args, **kwargs):
        gen = proc(*args, **kwargs)
        result = defer.Deferred()

        # Stash the callback list the deferred was created with, so that
        # the errback of last resort can be appended to it later on, while
        # the deferred is already firing.
        result.__callbacks = result.callbacks

        def with_saved_callbacks(func, *_args, **_kwargs):
            # Run func while the deferred temporarily looks unfired and
            # points at the stashed callback list, then put back whatever
            # state it had before.
            saved_callbacks, saved_called = result.callbacks, result.called
            result.callbacks, result.called = result.__callbacks, False
            func(*_args, **_kwargs)
            result.callbacks, result.called = saved_callbacks, saved_called

        def default_errback(failure, d):
            # An already handled exception is just propagated, without
            # registering the traceback printer.
            if failure.check(errors.HandledException):
                return failure

            def print_traceback(f):
                import traceback
                # Single pre-formatted string: same text as the old print
                # statement, but valid syntax on Python 2 and Python 3.
                print('flumotion.twisted.defer: Unhandled error calling'
                      ' %s : %s' % (proc.__name__, f.type))
                traceback.print_exc()

            with_saved_callbacks(lambda: d.addErrback(print_traceback))
            # Re-raise the exception being handled: result.errback() is only
            # ever invoked from the except clause in generator_next().
            raise

        result.addErrback(default_errback, result)

        def generator_next():
            try:
                x = next(gen)
                if isinstance(x, defer.Deferred):
                    # Resume the generator whichever way x fires.
                    x.addCallback(callback, x).addErrback(errback, x)
                else:
                    result.callback(x)
            except StopIteration:
                result.callback(None)
            except Exception as e:
                result.errback(e)

        def errback(failure, d):

            def raise_error():
                # Rebuild an exception from the failure: failure.parents[-1]
                # names (or is) the class to instantiate, failure.value
                # supplies the constructor argument(s).  If that cannot be
                # done, fall back to a plain Exception describing it.
                k, v = failure.parents[-1], failure.value
                try:
                    if isinstance(k, str):
                        k = reflect.namedClass(k)
                    if isinstance(v, tuple):
                        e = k(*v)
                    else:
                        e = k(v)
                except Exception:
                    e = Exception('%s: %r' % (failure.type, v))
                raise e

            d.value = raise_error
            generator_next()

        def callback(result, d):
            d.value = lambda: result
            generator_next()

        # Run the generator up to its first yield.
        generator_next()

        return result

    return wrapper
130
def defer_generator_method(proc):
    """
    Wrap C{proc} with defer_generator for use as a method.

    The returned callable takes C{self} as an explicit first argument and
    passes it, together with all other arguments, on to the wrapped
    generator function.

    @param proc: callable returning a generator, taking C{self} first
    @returns: a callable with the same arguments that returns whatever the
              defer_generator wrapper returns
    """
    # Wrap once, when the decorator is applied, instead of rebuilding the
    # wrapper on every single call.
    wrapped = defer_generator(proc)
    return lambda self, *args, **kwargs: wrapped(self, *args, **kwargs)
134
def defer_call_later(deferred):
    """
    Return a deferred which will fire from a callLater after C{deferred}
    fires.

    A result is forwarded through C{reactor.callLater(0, ...)} to the
    returned deferred's callback; a failure is forwarded the same way to
    its errback, so the returned deferred fires in either case.

    @param deferred: the deferred to wait for
    @type  deferred: L{twisted.internet.defer.Deferred}
    @returns: a new deferred, fired from the reactor after C{deferred}
    @rtype:   L{twisted.internet.defer.Deferred}
    """

    def fire(result, d):
        reactor.callLater(0, d.callback, result)

    def fail(failure, d):
        reactor.callLater(0, d.errback, failure)
        # Hand the failure back so that errbacks added to the original
        # deferred still see it, exactly as before.
        return failure

    res = defer.Deferred()
    deferred.addCallbacks(fire, fail,
                          callbackArgs=(res, ), errbackArgs=(res, ))
    return res
144
class Resolution:
    """
    Helper that guarantees its deferred is fired at most once, either with
    a result or with an exception.

    @ivar d:     the deferred that gets fired as part of the resolution
    @type d:     L{twisted.internet.defer.Deferred}
    @ivar fired: whether a result or an exception was already delivered
    @type fired: bool
    """

    def __init__(self):
        self.fired = False
        self.d = defer.Deferred()

    def cleanup(self):
        """
        Release whatever resources belong to the resolution.
        Does nothing here; subclasses can implement me.
        """

    def resolve(self, result):
        """
        Succeed: run cleanup, then trigger the callbacks with the given
        result.  Does nothing if a result was already reached.
        """
        if self.fired:
            return

        self.fired = True
        self.cleanup()
        self.d.callback(result)

    def reject(self, exception):
        """
        Fail: run cleanup, then trigger the errbacks with the given
        exception.  Does nothing if a result was already reached.
        """
        if self.fired:
            return

        self.fired = True
        self.cleanup()
        self.d.errback(exception)
183
class RetryingDeferred(object):
    """
    Runs a deferred-returning operation again and again until it succeeds.
    After a failed attempt the next one is scheduled for later, with
    exponential back-off.
    """
    # Back-off tuning knobs.  None of them is read by the methods visible
    # here; presumably _nextDelay() consumes them -- TODO confirm.
    maxDelay = 1800
    initialDelay = 5.0

    factor = 2.7182818284590451     # numerically, Euler's number e
    jitter = 0.11962656492
    delay = None

    # NOTE(review): _failed() calls self._nextDelay(); its definition is not
    # visible in this section of the file.

    def __init__(self, deferredCreate, *args, **kwargs):
        """
        Create a new RetryingDeferred.

        @param deferredCreate: called as deferredCreate(*args, **kwargs)
                               every time a new deferred is needed
        """
        self._create = deferredCreate
        self._args = args
        self._kwargs = kwargs

        self._masterD = None    # deferred handed out by start()
        self._running = False   # True from start() until stop()
        self._callId = None     # pending delayed call for the next attempt

    def start(self):
        """
        Start trying.

        @returns: a deferred that fires once an attempt succeeds.  It only
                  errbacks after this RetryingDeferred was stopped: with the
                  failure of the attempt in progress, or with a
                  CancelledError when a retry was merely scheduled.
        """
        master = defer.Deferred()
        self._masterD = master
        self._running = True

        self._retry()

        return self._masterD

    def stop(self):
        """
        Stop retrying.  A scheduled retry is cancelled and the master
        deferred errbacks with a CancelledError; an attempt that is already
        in progress is left to finish.
        """
        scheduled = self._callId
        if scheduled:
            scheduled.cancel()
            self._masterD.errback(errors.CancelledError())
            self._masterD = None

        self._callId = None
        self._running = False

    def _retry(self):
        # A new attempt starts now, so no delayed call is pending any more.
        self._callId = None
        attempt = self._create(*self._args, **self._kwargs)
        attempt.addCallbacks(self._success, self._failed)

    def _success(self, val):
        # Pass the attempt's result on to whoever called start().
        self._masterD.callback(val)
        self._masterD = None

    def _failed(self, failure):
        if not self._running:
            # Stopped while the attempt was in flight: report its failure.
            self._masterD.errback(failure)
            self._masterD = None
            return

        wait = self._nextDelay()
        self._callId = reactor.callLater(wait, self._retry)
250
263