Package flumotion :: Package twisted :: Module flavors
[hide private]

Source Code for Module flumotion.twisted.flavors

  1  # -*- Mode: Python; test-case-name: flumotion.test.test_flavors -*- 
  2  # vi:si:et:sw=4:sts=4:ts=4 
  3  # 
  4  # Flumotion - a streaming media server 
  5  # Copyright (C) 2004,2005,2006,2007 Fluendo, S.L. (www.fluendo.com). 
  6  # All rights reserved. 
  7   
  8  # This file may be distributed and/or modified under the terms of 
  9  # the GNU General Public License version 2 as published by 
 10  # the Free Software Foundation. 
 11  # This file is distributed without any warranty; without even the implied 
 12  # warranty of merchantability or fitness for a particular purpose. 
 13  # See "LICENSE.GPL" in the source distribution for more information. 
 14   
 15  # Licensees having purchased or holding a valid Flumotion Advanced 
 16  # Streaming Server license may use this file in accordance with the 
 17  # Flumotion Advanced Streaming Server Commercial License Agreement. 
 18  # See "LICENSE.Flumotion" in the source distribution for more information. 
 19   
 20  # Headers in this file shall remain intact. 
 21   
 22  """ 
 23  Flumotion Twisted-like flavors 
 24   
 25  Inspired by L{twisted.spread.flavors} 
 26  """ 
 27   
 28  from twisted.internet import defer 
 29  from twisted.spread import pb 
 30  from zope.interface import Interface 
 31  from flumotion.common import log 
 32   
 33  __version__ = "$Rev: 6990 $" 
 34   
 35   
 36  ### Generic Cacheable/RemoteCache for state objects
class IStateListener(Interface):
    """
    I describe the callbacks implemented by objects that want to be told
    about changes made to a cached state.
    """
    # NOTE(review): zope.interface method declarations are conventionally
    # written without a 'self' argument; it is kept here so the declared
    # signatures stay exactly as they were -- confirm before changing.

    def stateSet(self, object, key, value):
        """
        Called when the given key on the given object has been set to
        the given value.

        @param object: the state object having changed
        @type  object: L{StateRemoteCache}
        @param key:    the key being set
        @type  key:    string
        @param value:  the value the key is being set to
        """

    def stateAppend(self, object, key, value):
        """
        Called when the given value has been added to the list stored
        under the given key.

        @param object: the state object having changed
        @type  object: L{StateRemoteCache}
        @param key:    the key being appended to
        @type  key:    string
        @param value:  the value being appended to the list given by key
        """

    def stateRemove(self, object, key, value):
        """
        Called when the given value has been removed from the list stored
        under the given key.

        @param object: the state object having changed
        @type  object: L{StateRemoteCache}
        @param key:    the key being removed from
        @type  key:    string
        @param value:  the value being removed from the list given by key
        """
74
class StateCacheable(pb.Cacheable):
    """
    I am a cacheable state object.

    I cache key-value pairs, where values can be either single objects,
    lists of objects, or dicts of objects.

    Keys must first be registered with L{addKey}, L{addListKey} or
    L{addDictKey}; the accessors and mutators raise KeyError for keys
    that were never registered.  Every mutator forwards the change to all
    registered observers with callRemote and returns a DeferredList of
    the resulting deferreds.
    """

    def __init__(self):
        # remote observers handed to us by getStateToCacheAndObserveFor
        self._observers = []
        # the actual key -> value state
        self._dict = {}

    # our methods

    def _checkKey(self, key):
        """
        Make sure the given key was registered on this state.

        @raises KeyError: if the key was never added.
        """
        # test membership on the dict itself; going through keys() builds
        # a list and scans it linearly on Python 2
        if key not in self._dict:
            raise KeyError('%s in %r' % (key, self))

    def addKey(self, key, value=None):
        """
        Add a key to the state cache so it can be used with set.

        @param key:   the key to register
        @param value: the initial value for the key
        """
        self._dict[key] = value

    # don't use [] as the default value, it creates only one reference and
    # reuses it

    def addListKey(self, key, value=None):
        """
        Add a key for a list of objects to the state cache.

        @param key:   the key to register
        @param value: the initial list; a new empty list when None
        """
        if value is None:
            value = []
        self._dict[key] = value

    # don't use {} as the default value, it creates only one reference and
    # reuses it

    def addDictKey(self, key, value=None):
        """
        Add a key for a dict value to the state cache.

        @param key:   the key to register
        @param value: the initial dict; a new empty dict when None
        """
        if value is None:
            value = {}
        self._dict[key] = value

    def hasKey(self, key):
        """
        Tell whether the given key was registered on this state.

        @rtype: bool
        """
        return key in self._dict

    def keys(self):
        """
        Return the keys registered on this state.
        """
        return self._dict.keys()

    def get(self, key, otherwise=None):
        """
        Get the state cache value for the given key.

        Return otherwise in case where key is present but value None.

        @raises KeyError: if the key was never added.
        """
        self._checkKey(key)

        v = self._dict[key]
        # identity test against None: 'not v' would also trigger on empty
        # lists, and '== None' would invoke a custom __eq__ on the value
        if v is None:
            return otherwise

        return v

    def set(self, key, value):
        """
        Set a given state key to the given value.
        Notifies observers of this Cacheable through observe_set.

        @rtype:   L{twisted.internet.defer.DeferredList}
        @raises KeyError: if the key was never added.
        """
        self._checkKey(key)

        self._dict[key] = value
        dList = [o.callRemote('set', key, value) for o in self._observers]
        return defer.DeferredList(dList)

    def append(self, key, value):
        """
        Append the given object to the given list.
        Notifies observers of this Cacheable through observe_append.

        @rtype:   L{twisted.internet.defer.DeferredList}
        @raises KeyError: if the key was never added.
        """
        self._checkKey(key)

        self._dict[key].append(value)
        dList = [o.callRemote('append', key, value) for o in self._observers]
        return defer.DeferredList(dList)

    def remove(self, key, value):
        """
        Remove the given object from the given list.
        Notifies observers of this Cacheable through observe_remove.

        @rtype:   L{twisted.internet.defer.DeferredList}
        @raises KeyError:   if the key was never added.
        @raises ValueError: if the value is not in the list; observers
                            are not notified in that case.
        """
        self._checkKey(key)

        try:
            self._dict[key].remove(value)
        except ValueError:
            # re-raise with enough context to find the offending key
            raise ValueError('value %r not in list %r for key %r' % (
                value, self._dict[key], key))
        dList = [o.callRemote('remove', key, value) for o in self._observers]
        return defer.DeferredList(dList)

    def setitem(self, key, subkey, value):
        """
        Set a value in the given dict.
        Notifies observers of this Cacheable through observe_setitem.

        @rtype:   L{twisted.internet.defer.DeferredList}
        @raises KeyError: if the key was never added.
        """
        self._checkKey(key)

        self._dict[key][subkey] = value
        dList = [o.callRemote('setitem', key, subkey, value)
                 for o in self._observers]
        return defer.DeferredList(dList)

    def delitem(self, key, subkey):
        """
        Removes an element from the given dict. Note that the key refers
        to the dict; it is the subkey (and its value) that will be removed.
        Notifies observers of this Cacheable through observe_delitem.

        @rtype:   L{twisted.internet.defer.DeferredList}
        @raises KeyError: if the key was never added, or if the subkey is
                          not in the dict; observers are not notified in
                          either case.
        """
        self._checkKey(key)

        try:
            # keep the removed value, it is sent along to the observers
            value = self._dict[key].pop(subkey)
        except KeyError:
            raise KeyError('key %r not in dict %r for key %r' % (
                subkey, self._dict[key], key))
        dList = [o.callRemote('delitem', key, subkey, value)
                 for o in self._observers]
        return defer.DeferredList(dList)

    # pb.Cacheable methods

    def getStateToCacheAndObserveFor(self, perspective, observer):
        # remember the observer so later mutations get forwarded to it,
        # and hand back the current state
        self._observers.append(observer)
        return self._dict

    def stoppedObserving(self, perspective, observer):
        self._observers.remove(observer)

# At some point, a StateRemoteCache will become invalid. The normal way
# would be losing the connection to the RemoteCacheable, although
# particular kinds of RemoteCache objects might have other ways
# (e.g. component removed from flow).
#
# We support listening for invalidation events. However, in order to
# ensure predictable program behavior, we can't do a notifyOnDisconnect
# directly on the broker. If we did that, program semantics would be
# dependent on the call order of the notifyOnDisconnect methods, which
# would likely lead to heisenbugs.
#
# Instead, invalidation will only be performed by the application, if at
# all, via an explicit call to invalidate().

class StateRemoteCache(pb.RemoteCache):
    """
    I am a remote cache of a state object.

    My state dict is handed to me through L{setCopyableState}.  Changes
    arrive through the observe_* methods, which update the dict and then
    notify the local listeners registered with L{addListener}.

    Each listener is stored with a list of handlers in the order
    [set, append, remove, setitem, delitem, invalidate]; the integer
    passed to L{_notifyListeners} is an index into that list.
    """

    def __init__(self):
        self._listeners = {}
        # no constructor
        # pb.RemoteCache.__init__(self)

    # our methods

    def hasKey(self, key):
        """
        Tell whether the given key is present in the cached state.

        @rtype: bool
        """
        # membership on the dict itself; keys() builds a list on Python 2
        return key in self._dict

    def keys(self):
        """
        Return the keys of the cached state.
        """
        return self._dict.keys()

    def get(self, key, otherwise=None):
        """
        Get the state cache value for the given key.

        Return otherwise in case where key is present but value None.

        @raises KeyError: if the key is not in the cached state.
        """
        if key not in self._dict:
            raise KeyError('%s in %r' % (key, self))

        v = self._dict[key]
        # compare to actual None by identity, otherwise we also get
        # zero-like values, or trigger a custom __eq__ on the value
        if v is None:
            return otherwise

        return v

    def _ensureListeners(self):
        # when this is created through serialization from a JobCS,
        # __init__ does not seem to get called, so create self._listeners
        if not hasattr(self, '_listeners'):
            # fixme: this means that callbacks will be fired in
            # arbitrary order; should be fired in order of connecting.
            self._listeners = {}

    #F0.8: remove set=None and move set_=None there

    def addListener(self, listener, set=None, append=None, remove=None,
                    setitem=None, delitem=None, invalidate=None, set_=None):
        """
        Adds a listener to the remote cache.

        The caller will be notified of state events via the functions
        given as the 'set_', 'append', 'remove', 'setitem', 'delitem' and
        'invalidate' keyword arguments.

        Always call this method using keyword arguments for the functions;
        calling them with positional arguments is not supported.

        Setting one of the event handlers to None will ignore that
        event.  Passing no event handlers at all is deprecated: a warning
        is printed to stderr and the listener's stateSet, stateAppend and
        stateRemove methods are used instead.

        If this cache has already been invalidated, a given invalidate
        handler is called immediately.

        @param listener:   new listener object that wants to receive
                           cache state change notifications.
        @type  listener:   object implementing
                           L{flumotion.twisted.flavors.IStateListener}
        @param set:        deprecated alias for set_; when given it
                           overrides set_ and triggers a
                           DeprecationWarning.
        @type  set:        procedure(object, key, value) -> None
        @param set_:       procedure to call when a value is set
        @type  set_:       procedure(object, key, value) -> None
        @param append:     procedure to call when a value is appended to
                           a list
        @type  append:     procedure(object, key, value) -> None
        @param remove:     procedure to call when a value is removed from
                           a list
        @type  remove:     procedure(object, key, value) -> None
        @param setitem:    procedure to call when a value is set in a dict
        @type  setitem:    procedure(object, key, subkey, value) -> None
        @param delitem:    procedure to call when a value is removed
                           from a dict.
        @type  delitem:    procedure(object, key, subkey, value) -> None
        @param invalidate: procedure to call when this cache has been
                           invalidated.
        @type  invalidate: procedure(object) -> None

        @raises KeyError: if the listener is already registered.
        """
        # F0.8: remove set
        if set:
            import warnings
            warnings.warn('Please use the set_ kwarg instead',
                          DeprecationWarning, stacklevel=2)
            set_ = set

        if not (set_ or append or remove or setitem or delitem or invalidate):
            # FIXME: remove this behavior in F0.6
            import sys
            log.safeprintf(sys.stderr,
                           "Warning: Use of deprecated %r.addListener(%r)"
                           " without explicit event handlers\n", self,
                           listener)
            set_ = listener.stateSet
            append = listener.stateAppend
            remove = listener.stateRemove

        self._ensureListeners()
        if listener in self._listeners:
            raise KeyError(
                "%r is already a listener of %r" % (listener, self))
        # the order of this list is what _notifyListeners indexes into
        self._listeners[listener] = [set_, append, remove, setitem,
                                     delitem, invalidate]
        if invalidate and hasattr(self, '_cache_invalid'):
            invalidate(self)

    def removeListener(self, listener):
        """
        Remove a listener previously added with L{addListener}.

        @raises KeyError: if the listener is not registered.
        """
        self._ensureListeners()
        if listener not in self._listeners:
            raise KeyError(listener)
        del self._listeners[listener]

    # pb.RemoteCache methods

    def setCopyableState(self, dict):
        self._dict = dict

    def _notifyListeners(self, index, *args):
        """
        Call the handler at the given index for every listener that
        registered one, passing this cache followed by args.

        Exceptions raised by a handler are logged as warnings and do not
        stop the remaining handlers from being called.
        """
        import sys

        # notify our local listeners; compute set of procs first, so as
        # to allow the listeners set to change during the calls
        self._ensureListeners()
        for proc in [tup[index] for tup in self._listeners.values()]:
            if proc:
                try:
                    proc(self, *args)
                except Exception:
                    # sys.exc_info() instead of binding the exception in
                    # the except clause, so this parses on every Python
                    # version
                    e = sys.exc_info()[1]
                    # These are all programming errors
                    log.warning("stateremotecache",
                                'Exception in StateCache handler: %s',
                                log.getExceptionMessage(e))

    def observe_set(self, key, value):
        self._dict[key] = value
        # if we also subclass from Cacheable, then we're a proxy, so proxy
        if hasattr(self, 'set'):
            StateCacheable.set(self, key, value)

        self._notifyListeners(0, key, value)

    def observe_append(self, key, value):
        # if we also subclass from Cacheable, then we're a proxy, so proxy
        if hasattr(self, 'append'):
            StateCacheable.append(self, key, value)
        else:
            self._dict[key].append(value)

        self._notifyListeners(1, key, value)

    def observe_remove(self, key, value):
        # if we also subclass from Cacheable, then we're a proxy, so proxy
        if hasattr(self, 'remove'):
            StateCacheable.remove(self, key, value)
        else:
            try:
                self._dict[key].remove(value)
            except ValueError:
                raise ValueError("value %r not under key %r with values %r" %
                                 (value, key, self._dict[key]))

        self._notifyListeners(2, key, value)

    def observe_setitem(self, key, subkey, value):
        # if we also subclass from Cacheable, then we're a proxy, so proxy
        if hasattr(self, 'setitem'):
            StateCacheable.setitem(self, key, subkey, value)
        else:
            self._dict[key][subkey] = value

        self._notifyListeners(3, key, subkey, value)

    def observe_delitem(self, key, subkey, value):
        # if we also subclass from Cacheable, then we're a proxy, so proxy
        if hasattr(self, 'delitem'):
            StateCacheable.delitem(self, key, subkey)
        else:
            try:
                del self._dict[key][subkey]
            except KeyError:
                raise KeyError("key %r not in dict %r for state dict %r" %
                               (subkey, self._dict[key], self._dict))

        self._notifyListeners(4, key, subkey, value)

    def invalidate(self):
        """Invalidate this StateRemoteCache.

        Calling this method will result in the invalidate callback being
        called for all listeners that passed an invalidate handler to
        addListener. This method is not called automatically; it is
        provided as a convenience to applications.
        """
        assert not hasattr(self, '_cache_invalid'), \
            'object has already been invalidated'
        # if we also subclass from Cacheable, there is currently no way
        # to remotely invalidate the cache. that's ok though, because
        # double-caches are currently only used by the manager, which
        # does not call invalidate() on its caches.
        self._cache_invalid = True

        self._notifyListeners(5)
426