Package flumotion :: Package twisted :: Module defer
[hide private]

Source Code for Module flumotion.twisted.defer

  1  # -*- Mode: Python; test-case-name: flumotion.test.test_defer -*- 
  2  # vi:si:et:sw=4:sts=4:ts=4 
  3  # 
  4  # Flumotion - a streaming media server 
  5  # Copyright (C) 2004,2005,2006,2007 Fluendo, S.L. (www.fluendo.com). 
  6  # All rights reserved. 
  7   
  8  # This file may be distributed and/or modified under the terms of 
  9  # the GNU General Public License version 2 as published by 
 10  # the Free Software Foundation. 
 11  # This file is distributed without any warranty; without even the implied 
 12  # warranty of merchantability or fitness for a particular purpose. 
 13  # See "LICENSE.GPL" in the source distribution for more information. 
 14   
 15  # Licensees having purchased or holding a valid Flumotion Advanced 
 16  # Streaming Server license may use this file in accordance with the 
 17  # Flumotion Advanced Streaming Server Commercial License Agreement. 
 18  # See "LICENSE.Flumotion" in the source distribution for more information. 
 19   
 20  # Headers in this file shall remain intact. 
 21   
 22  import random 
 23   
 24  from twisted.internet import defer, reactor 
 25  from twisted.python import reflect 
 26   
 27  # FIXME: this is for HandledException - maybe it should move here instead ? 
 28  from flumotion.common import errors 
 29   
 30  __version__ = "$Rev: 6125 $" 
 31   
 32   
 33  # See flumotion.test.test_defer for examples 
34 -def defer_generator(proc):
35 def wrapper(*args, **kwargs): 36 gen = proc(*args, **kwargs) 37 result = defer.Deferred() 38 39 # To support having the errback of last resort, we need to have 40 # an errback which runs after all the other errbacks, *at the 41 # point at which the deferred is fired*. So users of this code 42 # have from between the time the deferred is created and the 43 # time that the deferred is fired to attach their errbacks. 44 # 45 # Unfortunately we only control the time that the deferred is 46 # created. So we attach a first errback that then adds an 47 # errback to the end of the list. Unfortunately we can't add to 48 # the list while the deferred is firing. In a decision between 49 # having decent error reporting and being nice to a small part 50 # of twisted I chose the former. This code takes a reference to 51 # the callback list, so that we can add an errback to the list 52 # while the deferred is being fired. It temporarily sets the 53 # state of the deferred to not having been fired, so that adding 54 # the errbacks doesn't automatically call the newly added 55 # methods. 56 result.__callbacks = result.callbacks 57 def with_saved_callbacks(proc, *_args, **_kwargs): 58 saved_callbacks, saved_called = result.callbacks, result.called 59 result.callbacks, result.called = result.__callbacks, False 60 proc(*_args, **_kwargs) 61 result.callbacks, result.called = saved_callbacks, saved_called
62 63 # Add errback-of-last-resort 64 def default_errback(failure, d): 65 # an already handled exception just gets propagated up without 66 # doing a traceback 67 if failure.check(errors.HandledException): 68 return failure 69 70 def print_traceback(f): 71 import traceback 72 print 'flumotion.twisted.defer: ' + \ 73 'Unhandled error calling', proc.__name__, ':', f.type 74 traceback.print_exc() 75 with_saved_callbacks (lambda: d.addErrback(print_traceback)) 76 raise 77 result.addErrback(default_errback, result) 78 79 def generator_next(): 80 try: 81 x = gen.next() 82 if isinstance(x, defer.Deferred): 83 x.addCallback(callback, x).addErrback(errback, x) 84 else: 85 result.callback(x) 86 except StopIteration: 87 result.callback(None) 88 except Exception, e: 89 result.errback(e) 90 91 def errback(failure, d): 92 def raise_error(): 93 # failure.parents[-1] will be the exception class for local 94 # failures and the string name of the exception class 95 # for remote failures (which might not exist in our 96 # namespace) 97 # 98 # failure.value will be the tuple of arguments to the 99 # exception in the local case, or a string 100 # representation of that in the remote case (see 101 # pb.CopyableFailure.getStateToCopy()). 102 # 103 # we can only reproduce a remote exception if the 104 # exception class is in our namespace, and it only takes 105 # one string argument. if either condition is not true, 106 # we wrap the strings in a default Exception. 107 k, v = failure.parents[-1], failure.value 108 try: 109 if isinstance(k, str): 110 k = reflect.namedClass(k) 111 if isinstance(v, tuple): 112 e = k(*v) 113 else: 114 e = k(v) 115 except Exception: 116 e = Exception('%s: %r' % (failure.type, v)) 117 raise e 118 d.value = raise_error 119 generator_next() 120 121 def callback(result, d): 122 d.value = lambda: result 123 generator_next() 124 125 generator_next() 126 127 return result 128 129 return wrapper 130
131 -def defer_generator_method(proc):
132 return lambda self, *args, **kwargs: \ 133 defer_generator(proc)(self, *args, **kwargs)
134
135 -def defer_call_later(deferred):
136 """ 137 Return a deferred which will fire from a callLater after d fires 138 """ 139 def fire(result, d): 140 reactor.callLater(0, d.callback, result)
141 res = defer.Deferred() 142 deferred.addCallback(fire, res) 143 return res 144
145 -class Resolution:
146 """ 147 I am a helper class to make sure that the deferred is fired only once 148 with either a result or exception. 149 150 @ivar d: the deferred that gets fired as part of the resolution 151 @type d: L{twisted.internet.defer.Deferred} 152 """
153 - def __init__(self):
154 self.d = defer.Deferred() 155 self.fired = False
156
157 - def cleanup(self):
158 """ 159 Clean up any resources related to the resolution. 160 Subclasses can implement me. 161 """ 162 pass
163
164 - def callback(self, result):
165 """ 166 Make the result succeed, triggering the callbacks with the given result. 167 If a result was already reached, do nothing. 168 """ 169 if not self.fired: 170 self.fired = True 171 self.cleanup() 172 self.d.callback(result)
173
174 - def errback(self, exception):
175 """ 176 Make the result fail, triggering the errbacks with the given exception. 177 If a result was already reached, do nothing. 178 """ 179 if not self.fired: 180 self.fired = True 181 self.cleanup() 182 self.d.errback(exception)
183
184 -class RetryingDeferred(object):
185 """ 186 Provides a mechanism to attempt to run some deferred operation until it 187 succeeds. On failure, the operation is tried again later, exponentially 188 backing off. 189 """ 190 maxDelay = 1800 # Default to 30 minutes 191 initialDelay = 5.0 192 # Arbitrarily take these constants from twisted's ReconnectingClientFactory 193 factor = 2.7182818284590451 194 jitter = 0.11962656492 195 delay = None 196
197 - def __init__(self, deferredCreate, *args, **kwargs):
198 """ 199 Create a new RetryingDeferred. Will call 200 deferredCreate(*args, **kwargs) each time a new deferred is needed. 201 """ 202 self._create = deferredCreate 203 self._args = args 204 self._kwargs = kwargs 205 206 self._masterD = None 207 self._running = False 208 self._callId = None
209
210 - def start(self):
211 """ 212 Start trying. Returns a deferred that will fire when this operation 213 eventually succeeds. That deferred will only errback if this 214 RetryingDeferred is cancelled (it will then errback with the result of 215 the next attempt if one is in progress, or a CancelledError. # TODO: yeah? 216 """ 217 self._masterD = defer.Deferred() 218 self._running = True 219 220 self._retry() 221 222 return self._masterD
223
224 - def cancel(self):
225 if self._callId: 226 self._callId.cancel() 227 self._masterD.errback(errors.CancelledError()) 228 self._masterD = None 229 230 self._callId = None 231 self._running = False
232
233 - def _retry(self):
234 self._callId = None 235 d = self._create(*self._args, **self._kwargs) 236 d.addCallbacks(self._success, self._failed)
237
238 - def _success(self, val):
239 # TODO: what if we were cancelled and then get here? 240 self._masterD.callback(val) 241 self._masterD = None
242
243 - def _failed(self, failure):
244 if self._running: 245 next = self._nextDelay() 246 self._callId = reactor.callLater(next, self._retry) 247 else: 248 self._masterD.errback(failure) 249 self._masterD = None
250
251 - def _nextDelay(self):
252 if self.delay is None: 253 self.delay = self.initialDelay 254 else: 255 self.delay = self.delay * self.factor 256 257 if self.jitter: 258 self.delay = random.normalvariate(self.delay, 259 self.delay * self.jitter) 260 self.delay = min(self.delay, self.maxDelay) 261 262 return self.delay
263