Package flumotion :: Package common :: Module planet
[hide private]

Source Code for Module flumotion.common.planet

  1  # -*- Mode: Python; -*- 
  2  # vi:si:et:sw=4:sts=4:ts=4 
  3  # 
  4  # Flumotion - a streaming media server 
  5  # Copyright (C) 2004,2005,2006,2007 Fluendo, S.L. (www.fluendo.com). 
  6  # All rights reserved. 
  7   
  8  # This file may be distributed and/or modified under the terms of 
  9  # the GNU General Public License version 2 as published by 
 10  # the Free Software Foundation. 
 11  # This file is distributed without any warranty; without even the implied 
 12  # warranty of merchantability or fitness for a particular purpose. 
 13  # See "LICENSE.GPL" in the source distribution for more information. 
 14   
 15  # Licensees having purchased or holding a valid Flumotion Advanced 
 16  # Streaming Server license may use this file in accordance with the 
 17  # Flumotion Advanced Streaming Server Commercial License Agreement. 
 18  # See "LICENSE.Flumotion" in the source distribution for more information. 
 19   
 20  # Headers in this file shall remain intact. 
 21   
 22  """serializable objects from worker through manager to admin. 
 23  Used by planet, flow, job and component. 
 24  """ 
 25   
 26  from twisted.spread import pb 
 27  from twisted.internet import defer 
 28  from zope.interface import implements 
 29   
 30  from flumotion.twisted import flavors 
 31  from flumotion.common import enum, log 
 32   
 33  __version__ = "$Rev: 6990 $" 
 34   
 35   
36 -class ManagerPlanetState(flavors.StateCacheable):
37 """ 38 I represent the state of a planet in the manager. 39 40 I have the following keys: 41 42 - name 43 - manager 44 - atmosphere: L{ManagerAtmosphereState} 45 - flows (list): list of L{ManagerFlowState} 46 """ 47 # FIXME: why is there a 'parent' key ?
48 - def __init__(self):
49 flavors.StateCacheable.__init__(self) 50 self.addKey('name') 51 self.addKey('parent') 52 self.addKey('manager') 53 self.addKey('atmosphere') 54 self.addListKey('flows') 55 self.addDictKey('messages') 56 57 # we always have at least one atmosphere 58 self.set('atmosphere', ManagerAtmosphereState()) 59 self.get('atmosphere').set('parent', self)
60
61 - def getComponents(self):
62 """ 63 Return a list of all component states in this planet 64 (from atmosphere and all flows). 65 66 @rtype: list of L{ManagerComponentState} 67 """ 68 ret = [] 69 70 a = self.get('atmosphere') 71 if a: 72 ret.extend(a.get('components')) 73 74 flows = self.get('flows') 75 if flows: 76 for flow in flows: 77 ret.extend(flow.get('components')) 78 79 return ret
80 81
82 -class AdminPlanetState(flavors.StateRemoteCache):
83 """ 84 I represent the state of a planet in an admin client. 85 See L{ManagerPlanetState}. 86 """
87 - def invalidate(self):
88 for flow in self.get('flows'): 89 flow.invalidate() 90 91 self.get('atmosphere').invalidate() 92 93 flavors.StateRemoteCache.invalidate(self)
94 95 pb.setUnjellyableForClass(ManagerPlanetState, AdminPlanetState) 96
97 -class ManagerAtmosphereState(flavors.StateCacheable):
98 """ 99 I represent the state of an atmosphere in the manager. 100 The atmosphere contains components that do not participate in a flow, 101 but provide services to flow components. 102 103 I have the following keys: 104 105 - name: string, "atmosphere" 106 - parent: L{ManagerPlanetState} 107 - components (list): list of L{ManagerComponentState} 108 """ 109
110 - def __init__(self):
111 flavors.StateCacheable.__init__(self) 112 self.addKey('parent') 113 self.addListKey('components') 114 self.addKey('name') 115 self.set('name', 'atmosphere')
116
117 - def empty(self):
118 """ 119 Clear out all component entries. 120 121 @returns: a DeferredList that will fire when all notifications 122 are done. 123 """ 124 # make a copy, so we can iterate safely while modifying 125 components = self.get('components')[:] 126 127 dList = [self.remove('components', c) for c in components] 128 return defer.DeferredList(dList)
129
130 -class AdminAtmosphereState(flavors.StateRemoteCache):
131 """ 132 I represent the state of an atmosphere in an admin client. 133 See L{ManagerAtmosphereState}. 134 """
135 - def invalidate(self):
136 for component in self.get('components'): 137 component.invalidate() 138 139 flavors.StateRemoteCache.invalidate(self)
140 141 pb.setUnjellyableForClass(ManagerAtmosphereState, AdminAtmosphereState) 142
143 -class ManagerFlowState(flavors.StateCacheable):
144 """ 145 I represent the state of a flow in the manager. 146 147 I have the following keys: 148 149 - name: string, name of the flow 150 - parent: L{ManagerPlanetState} 151 - components (list): list of L{ManagerComponentState} 152 """
153 - def __init__(self, **kwargs):
154 """ 155 ManagerFlowState constructor. Any keyword arguments are 156 intepreted as initial key-value pairs to set on the new 157 ManagerFlowState. 158 """ 159 flavors.StateCacheable.__init__(self) 160 self.addKey('name') 161 self.addKey('parent') 162 self.addListKey('components') 163 for k, v in kwargs.items(): 164 self.set(k, v)
165
166 - def empty(self):
167 """ 168 Clear out all component entries 169 """ 170 # take a copy of the list because we're modifying while running 171 components = self.get('components')[:] 172 173 dList = [self.remove('components', c) for c in components] 174 return defer.DeferredList(dList)
175
176 -class AdminFlowState(flavors.StateRemoteCache):
177 """ 178 I represent the state of a flow in an admin client. 179 See L{ManagerFlowState}. 180 """
181 - def invalidate(self):
182 for component in self.get('components'): 183 component.invalidate() 184 185 flavors.StateRemoteCache.invalidate(self)
186 187 pb.setUnjellyableForClass(ManagerFlowState, AdminFlowState) 188 189 # moods 190 # FIXME. make epydoc like this 191 """ 192 @cvar moods: an enum representing the mood a component can be in. 193 """ 194 moods = enum.EnumClass( 195 'Moods', 196 ('happy', 'hungry', 'waking', 'sleeping', 'lost', 'sad') 197 ) 198 moods.can_stop = staticmethod(lambda m: m != moods.sleeping) 199 moods.can_start = staticmethod(lambda m: m == moods.sleeping) 200 201 _jobStateKeys = ['mood', 'manager-ip', 'pid', 'workerName'] 202 _jobStateListKeys = ['messages', ] 203 204 # FIXME: maybe make Atmosphere and Flow subclass from a ComponentGroup class ?
205 -class ManagerComponentState(flavors.StateCacheable):
206 """ 207 I represent the state of a component in the manager. 208 I have my own state, and also proxy state from the L{ManagerJobState} 209 when the component is actually created in a worker. 210 211 I have the following keys of my own: 212 213 - name: str, name of the component, unique in the parent 214 - parent: L{ManagerFlowState} or L{ManagerAtmosphereState} 215 - type: str, type of the component 216 - moodPending: int, the mood value the component is being set to 217 - workerRequested: str, name of the worker this component is 218 requested to be started on. 219 - config: dict, the configuration dict for this component 220 221 It also has a special key, 'mood'. This acts as a proxy for the mood 222 in the L{WorkerJobState}, when there is a job attached (the job's copy 223 is authoritative when it connects), and is controlled independently at 224 other times. 225 226 I proxy the following keys from the serialized L{WorkerJobState}: 227 - mood, manager-ip, pid, workerName 228 - messages (list) 229 """ 230
231 - def __init__(self):
232 flavors.StateCacheable.__init__(self) 233 # our additional keys 234 self.addKey('name') 235 self.addKey('type') 236 self.addKey('parent') 237 self.addKey('moodPending') 238 self.addKey('workerRequested') 239 self.addKey('config') # dictionary 240 241 # proxied from job state or combined with our state (mood) 242 for k in _jobStateKeys: 243 self.addKey(k) 244 for k in _jobStateListKeys: 245 self.addListKey(k) 246 self._jobState = None
247
248 - def __repr__(self):
249 return "<%s.%s name=%r>" % (self.__module__, 250 self.__class__.__name__, 251 self._dict['name'])
252
253 - def setJobState(self, jobState):
254 """ 255 Set the job state I proxy from. 256 257 @type jobState: L{ManagerJobState} 258 """ 259 self._jobState = jobState 260 for key in _jobStateKeys: 261 # only set non-None values 262 if key == 'mood': 263 continue 264 v = jobState.get(key) 265 if v != None: 266 self.set(key, v) 267 for key in _jobStateListKeys: 268 valueList = jobState.get(key) 269 if valueList != None: 270 for v in valueList: 271 self.append(key, v) 272 # set mood last; see #552 273 self.set('mood', jobState.get('mood')) 274 275 # only proxy keys we want proxied; eaterNames and feederNames 276 # are ignored for example 277 proxiedKeys = _jobStateKeys + _jobStateListKeys 278 def proxy(attr): 279 def event(state, key, value): 280 if key in proxiedKeys: 281 getattr(self, attr)(key, value)
282 return event
283 284 jobState.addListener(self, set_=proxy('set'), append=proxy('append'), 285 remove=proxy('remove')) 286
287 - def set(self, key, value):
288 # extend set so we can log mood changes 289 if key == 'mood': 290 log.info('componentstate', 'mood of %s changed to %s', 291 self.get('name'), moods.get(value).name) 292 flavors.StateCacheable.set(self, key, value) 293 if key == 'mood' and value == self.get('moodPending'): 294 # we have reached our pending mood 295 self.set('moodPending', None)
296
297 - def setMood(self, moodValue):
298 if self._jobState and moodValue != moods.sad.value: 299 log.warning('componentstate', 'cannot set component mood to ' 300 'something other than sad when we have a ' 301 'jobState -- fix your code!') 302 elif moodValue == self.get('mood'): 303 log.log('componentstate', '%s already in mood %d', 304 self.get('name'), moodValue) 305 else: 306 log.debug('componentstate', 'manager sets mood of %s from %s to %d', 307 self.get('name'), self.get('mood'), moodValue) 308 self.set('mood', moodValue)
309
310 - def clearJobState(self, shutdownRequested):
311 """ 312 Remove the job state. 313 """ 314 # Clear messages proxied from job 315 for m in self._jobState.get('messages'): 316 self.remove('messages', m) 317 318 self._jobState.removeListener(self) 319 self._jobState = None 320 321 # Clearing a job state means that a component logged out. If the 322 # component logs out due to an explicit manager request, go to 323 # sleeping. Otherwise if the component is sad, leave the mood as 324 # it is, or otherwise go to lost, because it got disconnected 325 # for an unknown reason (probably network related). 326 if shutdownRequested: 327 log.debug('componentstate', "Shutdown was requested, %s" 328 " now sleeping", self.get('name')) 329 self.setMood(moods.sleeping.value) 330 elif self.get('mood') != moods.sad.value: 331 log.debug('componentstate', "Shutdown was NOT requested," 332 " %s now lost", self.get('name')) 333 self.setMood(moods.lost.value)
334
335 -class AdminComponentState(flavors.StateRemoteCache):
336 """ 337 I represent the state of a component in the admin client. 338 See L{ManagerComponentState}. 339 """
340 - def __repr__(self):
341 return "<%s.%s name=%r>" % (self.__module__, 342 self.__class__.__name__, 343 self._dict['name'])
344 345 pb.setUnjellyableForClass(ManagerComponentState, AdminComponentState) 346 347 # state of an existing component running in a job process 348 # exchanged between worker and manager
349 -class WorkerJobState(flavors.StateCacheable):
350 """ 351 I represent the state of a job in the worker, running a component. 352 353 I have the following keys: 354 355 - mood: int, value of the mood this component is in 356 - ip: string, IP address of the worker 357 - pid: int, PID of the job process 358 - workerName: string, name of the worker I'm running on 359 - messages: list of L{flumotion.common.messages.Message} 360 361 In addition, if I am the state of a FeedComponent, then I also 362 have the following keys: 363 364 - eaterNames: list of feedId being eaten by the eaters 365 - feederNames: list of feedId being fed by the feeders 366 367 @todo: change eaterNames and feederNames to eaterFeedIds and ... 368 """
369 - def __init__(self):
370 flavors.StateCacheable.__init__(self) 371 for k in _jobStateKeys: 372 self.addKey(k) 373 for k in _jobStateListKeys: 374 self.addListKey(k)
375
376 -class ManagerJobState(flavors.StateRemoteCache):
377 """ 378 I represent the state of a job in the manager. 379 See L{WorkerJobState}. 380 """ 381 pass
382 383 pb.setUnjellyableForClass(WorkerJobState, ManagerJobState) 384