Package flumotion :: Package twisted :: Module defer
[hide private]

Source Code for Module flumotion.twisted.defer

  1  # -*- Mode: Python; test-case-name: flumotion.test.test_defer -*- 
  2  # vi:si:et:sw=4:sts=4:ts=4 
  3  # 
  4  # Flumotion - a streaming media server 
  5  # Copyright (C) 2004,2005,2006,2007 Fluendo, S.L. (www.fluendo.com). 
  6  # All rights reserved. 
  7   
  8  # This file may be distributed and/or modified under the terms of 
  9  # the GNU General Public License version 2 as published by 
 10  # the Free Software Foundation. 
 11  # This file is distributed without any warranty; without even the implied 
 12  # warranty of merchantability or fitness for a particular purpose. 
 13  # See "LICENSE.GPL" in the source distribution for more information. 
 14   
 15  # Licensees having purchased or holding a valid Flumotion Advanced 
 16  # Streaming Server license may use this file in accordance with the 
 17  # Flumotion Advanced Streaming Server Commercial License Agreement. 
 18  # See "LICENSE.Flumotion" in the source distribution for more information. 
 19   
 20  # Headers in this file shall remain intact. 
 21   
 22  import random 
 23   
 24  from twisted.internet import defer, reactor 
 25  from twisted.python import reflect 
 26   
 27  # FIXME: this is for HandledException - maybe it should move here instead ? 
 28  from flumotion.common import errors 
 29   
 30  __version__ = "$Rev$" 
 31   
 32   
 33  # See flumotion.test.test_defer for examples 
 34   
 35   
36 -def defer_generator(proc):
37 38 def wrapper(*args, **kwargs): 39 gen = proc(*args, **kwargs) 40 result = defer.Deferred() 41 42 # To support having the errback of last resort, we need to have 43 # an errback which runs after all the other errbacks, *at the 44 # point at which the deferred is fired*. So users of this code 45 # have from between the time the deferred is created and the 46 # time that the deferred is fired to attach their errbacks. 47 # 48 # Unfortunately we only control the time that the deferred is 49 # created. So we attach a first errback that then adds an 50 # errback to the end of the list. Unfortunately we can't add to 51 # the list while the deferred is firing. In a decision between 52 # having decent error reporting and being nice to a small part 53 # of twisted I chose the former. This code takes a reference to 54 # the callback list, so that we can add an errback to the list 55 # while the deferred is being fired. It temporarily sets the 56 # state of the deferred to not having been fired, so that adding 57 # the errbacks doesn't automatically call the newly added 58 # methods. 59 result.__callbacks = result.callbacks 60 61 def with_saved_callbacks(proc, *_args, **_kwargs): 62 saved_callbacks, saved_called = result.callbacks, result.called 63 result.callbacks, result.called = result.__callbacks, False 64 proc(*_args, **_kwargs) 65 result.callbacks, result.called = saved_callbacks, saved_called
66 67 # Add errback-of-last-resort 68 69 def default_errback(failure, d): 70 # an already handled exception just gets propagated up without 71 # doing a traceback 72 if failure.check(errors.HandledException): 73 return failure 74 75 def print_traceback(f): 76 import traceback 77 print 'flumotion.twisted.defer: ' + \ 78 'Unhandled error calling', proc.__name__, ':', f.type 79 traceback.print_exc() 80 with_saved_callbacks(lambda: d.addErrback(print_traceback)) 81 raise 82 result.addErrback(default_errback, result) 83 84 def generator_next(): 85 try: 86 x = gen.next() 87 if isinstance(x, defer.Deferred): 88 x.addCallback(callback, x).addErrback(errback, x) 89 else: 90 result.callback(x) 91 except StopIteration: 92 result.callback(None) 93 except Exception, e: 94 result.errback(e) 95 96 def errback(failure, d): 97 98 def raise_error(): 99 # failure.parents[-1] will be the exception class for local 100 # failures and the string name of the exception class 101 # for remote failures (which might not exist in our 102 # namespace) 103 # 104 # failure.value will be the tuple of arguments to the 105 # exception in the local case, or a string 106 # representation of that in the remote case (see 107 # pb.CopyableFailure.getStateToCopy()). 108 # 109 # we can only reproduce a remote exception if the 110 # exception class is in our namespace, and it only takes 111 # one string argument. if either condition is not true, 112 # we wrap the strings in a default Exception. 113 k, v = failure.parents[-1], failure.value 114 try: 115 if isinstance(k, str): 116 k = reflect.namedClass(k) 117 if isinstance(v, tuple): 118 e = k(*v) 119 else: 120 e = k(v) 121 except Exception: 122 e = Exception('%s: %r' % (failure.type, v)) 123 raise e 124 d.value = raise_error 125 generator_next() 126 127 def callback(result, d): 128 d.value = lambda: result 129 generator_next() 130 131 generator_next() 132 133 return result 134 135 return wrapper 136 137
138 -def defer_generator_method(proc):
139 return lambda self, *args, **kwargs: \ 140 defer_generator(proc)(self, *args, **kwargs)
141 142
143 -def defer_call_later(deferred):
144 """ 145 Return a deferred which will fire from a callLater after d fires 146 """ 147 148 def fire(result, d): 149 reactor.callLater(0, d.callback, result)
150 res = defer.Deferred() 151 deferred.addCallback(fire, res) 152 return res 153 154
155 -class Resolution:
156 """ 157 I am a helper class to make sure that the deferred is fired only once 158 with either a result or exception. 159 160 @ivar d: the deferred that gets fired as part of the resolution 161 @type d: L{twisted.internet.defer.Deferred} 162 """ 163
164 - def __init__(self):
165 self.d = defer.Deferred() 166 self.fired = False
167
168 - def cleanup(self):
169 """ 170 Clean up any resources related to the resolution. 171 Subclasses can implement me. 172 """ 173 pass
174
175 - def callback(self, result):
176 """ 177 Make the result succeed, triggering the callbacks with 178 the given result. If a result was already reached, do nothing. 179 """ 180 if not self.fired: 181 self.fired = True 182 self.cleanup() 183 self.d.callback(result)
184
185 - def errback(self, exception):
186 """ 187 Make the result fail, triggering the errbacks with the given exception. 188 If a result was already reached, do nothing. 189 """ 190 if not self.fired: 191 self.fired = True 192 self.cleanup() 193 self.d.errback(exception)
194 195
196 -class RetryingDeferred(object):
197 """ 198 Provides a mechanism to attempt to run some deferred operation until it 199 succeeds. On failure, the operation is tried again later, exponentially 200 backing off. 201 """ 202 maxDelay = 1800 # Default to 30 minutes 203 initialDelay = 5.0 204 # Arbitrarily take these constants from twisted's ReconnectingClientFactory 205 factor = 2.7182818284590451 206 jitter = 0.11962656492 207 delay = None 208
209 - def __init__(self, deferredCreate, *args, **kwargs):
210 """ 211 Create a new RetryingDeferred. Will call 212 deferredCreate(*args, **kwargs) each time a new deferred is needed. 213 """ 214 self._create = deferredCreate 215 self._args = args 216 self._kwargs = kwargs 217 218 self._masterD = None 219 self._running = False 220 self._callId = None
221
222 - def start(self):
223 """ 224 Start trying. Returns a deferred that will fire when this operation 225 eventually succeeds. That deferred will only errback if this 226 RetryingDeferred is cancelled (it will then errback with the result of 227 the next attempt if one is in progress, or a CancelledError. 228 # TODO: yeah? 229 """ 230 self._masterD = defer.Deferred() 231 self._running = True 232 self.delay = None 233 234 self._retry() 235 236 return self._masterD
237
238 - def cancel(self):
239 if self._callId: 240 self._callId.cancel() 241 self._masterD.errback(errors.CancelledError()) 242 self._masterD = None 243 244 self._callId = None 245 self._running = False
246
247 - def _retry(self):
248 self._callId = None 249 d = self._create(*self._args, **self._kwargs) 250 d.addCallbacks(self._success, self._failed)
251
252 - def _success(self, val):
253 # TODO: what if we were cancelled and then get here? 254 self._masterD.callback(val) 255 self._masterD = None
256
257 - def _failed(self, failure):
258 if self._running: 259 next = self._nextDelay() 260 self._callId = reactor.callLater(next, self._retry) 261 else: 262 self._masterD.errback(failure) 263 self._masterD = None
264
265 - def _nextDelay(self):
266 if self.delay is None: 267 self.delay = self.initialDelay 268 else: 269 self.delay = self.delay * self.factor 270 271 if self.jitter: 272 self.delay = random.normalvariate(self.delay, 273 self.delay * self.jitter) 274 self.delay = min(self.delay, self.maxDelay) 275 276 return self.delay
277