Package flumotion :: Package twisted :: Module flavors
[hide private]

Source Code for Module flumotion.twisted.flavors

  1  # -*- Mode: Python; test-case-name: flumotion.test.test_flavors -*- 
  2  # vi:si:et:sw=4:sts=4:ts=4 
  3  # 
  4  # Flumotion - a streaming media server 
  5  # Copyright (C) 2004,2005,2006,2007 Fluendo, S.L. (www.fluendo.com). 
  6  # All rights reserved. 
  7   
  8  # This file may be distributed and/or modified under the terms of 
  9  # the GNU General Public License version 2 as published by 
 10  # the Free Software Foundation. 
 11  # This file is distributed without any warranty; without even the implied 
 12  # warranty of merchantability or fitness for a particular purpose. 
 13  # See "LICENSE.GPL" in the source distribution for more information. 
 14   
 15  # Licensees having purchased or holding a valid Flumotion Advanced 
 16  # Streaming Server license may use this file in accordance with the 
 17  # Flumotion Advanced Streaming Server Commercial License Agreement. 
 18  # See "LICENSE.Flumotion" in the source distribution for more information. 
 19   
 20  # Headers in this file shall remain intact. 
 21   
 22  """ 
 23  Flumotion Twisted-like flavors 
 24   
 25  Inspired by L{twisted.spread.flavors} 
 26  """ 
 27   
 28  from twisted.internet import defer 
 29  from twisted.python import components 
 30  from twisted.spread import pb 
 31   
 32  # T1.3: suppress components warnings in Twisted 2.0 
 33  from flumotion.twisted import compat 
 34  compat.filterWarnings(components, 'ComponentsDeprecationWarning') 
 35   
 36  from flumotion.twisted.compat import Interface 
 37   
 38  ### Generic Cacheable/RemoteCache for state objects 
class IStateListener(compat.Interface):
    """
    I am the interface for objects that want to be told about changes
    made to cached states.
    """

    def stateSet(self, object, key, value):
        """
        Notification that the given key on the given object has been set
        to the given value.

        @param object: the state object having changed
        @type  object: L{StateRemoteCache}
        @param key:    the key being set
        @type  key:    string
        @param value:  the value the key is being set to
        """

    def stateAppend(self, object, key, value):
        """
        Notification that the given value has been added to the list
        given by the key.

        @param object: the state object having changed
        @type  object: L{StateRemoteCache}
        @param key:    the key being appended to
        @type  key:    string
        @param value:  the value being appended to the list given by key
        """

    def stateRemove(self, object, key, value):
        """
        Notification that the given value has been removed from the list
        given by the key.

        @param object: the state object having changed
        @type  object: L{StateRemoteCache}
        @param key:    the key being removed from
        @type  key:    string
        @param value:  the value being removed from the list given by key
        """
76
class StateCacheable(pb.Cacheable):
    """
    I am a cacheable state object.

    I cache key-value pairs, where values can be either single objects,
    lists of objects or dicts of objects.

    Keys must first be registered with L{addKey}, L{addListKey} or
    L{addDictKey}; all other accessors raise KeyError for unknown keys.
    Every change made through L{set}, L{append}, L{remove}, L{setitem}
    or L{delitem} is forwarded to each registered observer with
    callRemote.
    """

    def __init__(self):
        # remote observers handed to us by getStateToCacheAndObserveFor
        self._observers = []
        # the actual state: key -> value, list or dict
        self._dict = {}

    # private helpers

    def _checkKey(self, key):
        """
        Make sure the given key has been added to the state cache.

        @raise KeyError: if the key was never added.
        """
        # membership is tested on the dict itself; .keys() would build
        # and linearly scan a list on Python 2
        if key not in self._dict:
            raise KeyError('%s in %r' % (key, self))

    def _callObservers(self, name, *args):
        """
        Call the remote method with the given name and arguments on
        every observer.

        @rtype: L{twisted.internet.defer.DeferredList}
        @returns: a list of the deferreds returned by each callRemote.
        """
        deferreds = [o.callRemote(name, *args) for o in self._observers]
        return defer.DeferredList(deferreds)

    # our methods

    def addKey(self, key, value=None):
        """
        Add a key to the state cache so it can be used with set.
        """
        self._dict[key] = value

    # don't use [] as the default value, it creates only one reference and
    # reuses it
    def addListKey(self, key, value=None):
        """
        Add a key for a list of objects to the state cache.
        """
        if value is None:
            value = []
        self._dict[key] = value

    # don't use {} as the default value, it creates only one reference and
    # reuses it
    def addDictKey(self, key, value=None):
        """
        Add a key for a dict value to the state cache.
        """
        if value is None:
            value = {}
        self._dict[key] = value

    def hasKey(self, key):
        """
        Tell whether the given key has been added to the state cache.

        @rtype: bool
        """
        return key in self._dict

    def keys(self):
        """
        Return the keys currently present in the state cache.
        """
        return self._dict.keys()

    def get(self, key, otherwise=None):
        """
        Get the state cache value for the given key.

        Return otherwise in case where key is present but value None.

        @raise KeyError: if the key was never added.
        """
        self._checkKey(key)

        v = self._dict[key]
        # identity check against None: "not v" would also trigger on empty
        # lists, and "v == None" would invoke a custom __eq__ on the value
        if v is None:
            return otherwise

        return v

    def set(self, key, value):
        """
        Set a given state key to the given value.
        Notifies observers of this Cacheable through observe_set.

        @raise KeyError: if the key was never added.
        @rtype: L{twisted.internet.defer.DeferredList}
        """
        self._checkKey(key)

        self._dict[key] = value
        return self._callObservers('set', key, value)

    def append(self, key, value):
        """
        Append the given object to the given list.
        Notifies observers of this Cacheable through observe_append.

        @raise KeyError: if the key was never added.
        @rtype: L{twisted.internet.defer.DeferredList}
        """
        self._checkKey(key)

        self._dict[key].append(value)
        return self._callObservers('append', key, value)

    def remove(self, key, value):
        """
        Remove the given object from the given list.
        Notifies observers of this Cacheable through observe_remove.

        @raise KeyError:   if the key was never added.
        @raise ValueError: if the value is not in the list for the key.
        @rtype: L{twisted.internet.defer.DeferredList}
        """
        self._checkKey(key)

        try:
            self._dict[key].remove(value)
        except ValueError:
            # re-raise with enough context to find the offending key
            raise ValueError('value %r not in list %r for key %r' % (
                value, self._dict[key], key))
        return self._callObservers('remove', key, value)

    def setitem(self, key, subkey, value):
        """
        Set a value in the given dict.
        Notifies observers of this Cacheable through observe_setitem.

        @raise KeyError: if the key was never added.
        @rtype: L{twisted.internet.defer.DeferredList}
        """
        self._checkKey(key)

        self._dict[key][subkey] = value
        return self._callObservers('setitem', key, subkey, value)

    def delitem(self, key, subkey):
        """
        Removes an element from the given dict. Note that the key refers
        to the dict; it is the subkey (and its value) that will be removed.
        Notifies observers of this Cacheable through observe_delitem.

        @raise KeyError: if the key was never added, or if the subkey is
                         not in the dict for the key.
        @rtype: L{twisted.internet.defer.DeferredList}
        """
        self._checkKey(key)

        try:
            # the removed value is passed on to the observers as well
            value = self._dict[key].pop(subkey)
        except KeyError:
            raise KeyError('key %r not in dict %r for key %r' % (
                subkey, self._dict[key], key))
        return self._callObservers('delitem', key, subkey, value)

    # pb.Cacheable methods

    def getStateToCacheAndObserveFor(self, perspective, observer):
        # remember the observer so later changes get sent to it, and hand
        # out our state dict as the initial state to cache
        self._observers.append(observer)
        return self._dict

    def stoppedObserving(self, perspective, observer):
        self._observers.remove(observer)
217 218 # At some point, a StateRemoteCache will become invalid. The normal way 219 # would be losing the connection to the RemoteCacheable, although 220 # particular kinds of RemoteCache objects might have other ways 221 # (e.g. component removed from flow). 222 # 223 # We support listening for invalidation events. However, in order to 224 # ensure predictable program behavior, we can't do a notifyOnDisconnect 225 # directly on the broker. If we did that, program semantics would be 226 # dependent on the call order of the notifyOnDisconnect methods, which 227 # would likely lead to heisenbugs. 228 # 229 # Instead, invalidation will only be performed by the application, if at 230 # all, via an explicit call to invalidate(). 231
class StateRemoteCache(pb.RemoteCache):
    """
    I am a remote cache of a state object.

    I keep a copy of the state dict received through setCopyableState and
    update it from the observe_* methods. Local listeners registered with
    L{addListener} are told about every such update.
    """

    def __init__(self):
        # listener -> [set, append, remove, setitem, delitem, invalidate]
        self._listeners = {}
        # no constructor on the parent, so we do not call
        # pb.RemoteCache.__init__(self)

    # our methods

    def hasKey(self, key):
        """
        Tell whether the given key is present in the cached state.

        @rtype: bool
        """
        return key in self._dict

    def keys(self):
        """
        Return the keys currently present in the cached state.
        """
        return self._dict.keys()

    def get(self, key, otherwise=None):
        """
        Get the state cache value for the given key.

        Return otherwise in case where key is present but value None.

        @raise KeyError: if the key is not in the cached state.
        """
        if key not in self._dict:
            raise KeyError('%s in %r' % (key, self))

        v = self._dict[key]
        # compare to actual None, otherwise we also get zero-like values
        if v is None:
            return otherwise

        return v

    def _ensureListeners(self):
        # when this is created through serialization from a JobCS,
        # __init__ does not seem to get called, so create self._listeners
        if not hasattr(self, '_listeners'):
            # fixme: this means that callbacks will be fired in
            # arbitrary order; should be fired in order of connecting.
            self._listeners = {}

    def _notifyListeners(self, index, *args):
        """
        Call the handler stored at the given index for every listener.

        @param index: position of the handler in the list stored by
                      addListener: 0 set, 1 append, 2 remove, 3 setitem,
                      4 delitem, 5 invalidate
        @type  index: int
        @param args:  arguments passed to the handler after this object
        """
        self._ensureListeners()
        # iterate over a snapshot: a handler is allowed to add or remove
        # listeners, which would otherwise break iteration over the dict
        for listener in list(self._listeners):
            handlers = self._listeners.get(listener)
            if handlers is None:
                # removed by an earlier handler during this notification
                continue
            handler = handlers[index]
            if handler:
                handler(self, *args)

    def addListener(self, listener, set=None, append=None, remove=None,
                    setitem=None, delitem=None, invalidate=None):
        """
        Adds a listener to the remote cache.

        The caller will be notified of state events via the functions
        given as the 'set', 'append', and 'remove', 'setitem', and
        'delitem' keyword arguments.

        Setting one of the event handlers to None will ignore that
        event. Leaving all event handlers None is deprecated: a warning
        is printed and the listener's stateSet, stateAppend and
        stateRemove methods are used as handlers instead.

        If an invalidate handler is given and this cache has already been
        invalidated, the handler is called immediately.

        @param listener: A new listener object that wants to receive
                         cache state change notifications.
        @type listener: object implementing
                        L{flumotion.twisted.flavors.IStateListener}
        @param set: A procedure to call when a value is set
        @type set: procedure(object, key, value) -> None
        @param append: A procedure to call when a value is appended to a
                       list
        @type append: procedure(object, key, value) -> None
        @param remove: A procedure to call when a value is removed from
                       a list
        @type remove: procedure(object, key, value) -> None
        @param setitem: A procedure to call when a value is set in a
                        dict.
        @type setitem: procedure(object, key, subkey, value) -> None
        @param delitem: A procedure to call when a value is removed
                        from a dict.
        @type delitem: procedure(object, key, subkey, value) -> None
        @param invalidate: A procedure to call when this cache has been
                           invalidated.
        @type invalidate: procedure(object) -> None

        @raise KeyError: if the listener was already added.
        """
        if not (set or append or remove or setitem or delitem or invalidate):
            print("Warning: Use of deprecated %r.addListener(%r) without "
                  "explicit event handlers" % (self, listener))
            set = listener.stateSet
            append = listener.stateAppend
            remove = listener.stateRemove
        self._ensureListeners()
        if listener in self._listeners:
            raise KeyError(listener)
        # the order of this list is what _notifyListeners indexes into
        self._listeners[listener] = [set, append, remove, setitem,
                                     delitem, invalidate]
        if invalidate and hasattr(self, '_cache_invalid'):
            invalidate(self)

    def removeListener(self, listener):
        """
        Remove a listener previously added with addListener.

        @raise KeyError: if the listener was not added.
        """
        self._ensureListeners()
        if listener not in self._listeners:
            raise KeyError(listener)
        del self._listeners[listener]

    # pb.RemoteCache methods

    def setCopyableState(self, dict):
        self._dict = dict

    def observe_set(self, key, value):
        self._dict[key] = value
        # if we also subclass from Cacheable, then we're a proxy, so proxy
        if hasattr(self, 'set'):
            StateCacheable.set(self, key, value)

        # notify our local listeners
        self._notifyListeners(0, key, value)

    def observe_append(self, key, value):
        # if we also subclass from Cacheable, then we're a proxy, so proxy
        if hasattr(self, 'append'):
            StateCacheable.append(self, key, value)
        else:
            self._dict[key].append(value)

        # notify our local listeners
        self._notifyListeners(1, key, value)

    def observe_remove(self, key, value):
        # if we also subclass from Cacheable, then we're a proxy, so proxy
        if hasattr(self, 'remove'):
            StateCacheable.remove(self, key, value)
        else:
            try:
                self._dict[key].remove(value)
            except ValueError:
                raise ValueError("value %r not under key %r with values %r" %
                    (value, key, self._dict[key]))

        # notify our local listeners
        self._notifyListeners(2, key, value)

    def observe_setitem(self, key, subkey, value):
        # if we also subclass from Cacheable, then we're a proxy, so proxy
        if hasattr(self, 'setitem'):
            StateCacheable.setitem(self, key, subkey, value)
        else:
            self._dict[key][subkey] = value

        # notify our local listeners
        self._notifyListeners(3, key, subkey, value)

    def observe_delitem(self, key, subkey, value):
        # if we also subclass from Cacheable, then we're a proxy, so proxy
        if hasattr(self, 'delitem'):
            StateCacheable.delitem(self, key, subkey)
        else:
            try:
                del self._dict[key][subkey]
            except KeyError:
                raise KeyError("key %r not in dict %r for state dict %r" %
                    (subkey, self._dict[key], self._dict))

        # notify our local listeners
        self._notifyListeners(4, key, subkey, value)

    def invalidate(self):
        """Invalidate this StateRemoteCache.

        Calling this method will result in the invalidate callback being
        called for all listeners that passed an invalidate handler to
        addListener. This method is not called automatically; it is
        provided as a convenience to applications.

        It must be called at most once; this is checked with an assert.
        """
        assert not hasattr(self, '_cache_invalid'), \
            'object has already been invalidated'
        # if we also subclass from Cacheable, there is currently no way
        # to remotely invalidate the cache. that's ok though, because
        # double-caches are currently only used by the manager, which
        # does not call invalidate() on its caches.
        self._cache_invalid = True

        # the listeners are snapshotted by the helper, because the dict
        # could change during iteration
        self._notifyListeners(5)
429