Package flumotion :: Package manager :: Module depgraph
[hide private]

Source Code for Module flumotion.manager.depgraph

  1  # -*- Mode: Python; test-case-name: flumotion.test.test_manager_depgraph -*- 
  2  # vi:si:et:sw=4:sts=4:ts=4 
  3  # 
  4  # Flumotion - a streaming media server 
  5  # Copyright (C) 2004,2005,2006,2007 Fluendo, S.L. (www.fluendo.com). 
  6  # All rights reserved. 
  7   
  8  # This file may be distributed and/or modified under the terms of 
  9  # the GNU General Public License version 2 as published by 
 10  # the Free Software Foundation. 
 11  # This file is distributed without any warranty; without even the implied 
 12  # warranty of merchantability or fitness for a particular purpose. 
 13  # See "LICENSE.GPL" in the source distribution for more information. 
 14   
 15  # Licensees having purchased or holding a valid Flumotion Advanced 
 16  # Streaming Server license may use this file in accordance with the 
 17  # Flumotion Advanced Streaming Server Commercial License Agreement. 
 18  # See "LICENSE.Flumotion" in the source distribution for more information. 
 19   
 20  # Headers in this file shall remain intact. 
 21   
 22  from flumotion.common import dag, log, registry, errors, common 
 23  from flumotion.common.planet import moods 
 24   
25 -class Feeder:
26 """ 27 I am an object representing a feeder in the DepGraph 28 """
29 - def __init__(self, feederName, component):
30 self.feederName = feederName 31 self.component = component 32 self.feederData = None
33
34 -class Eater:
35 """ 36 I am an object representing an eater in the DepGraph 37 """
38 - def __init__(self, eaterName, component):
39 # feeder attribute is a reference to the Feeder object 40 # that this eater eats from 41 self.eaterName = eaterName 42 self.component = component 43 self.feeder = None
44
45 -class DepGraph(log.Loggable):
46 """ 47 I am a dependency graph for components. I also maintain boolean state 48 for each of the nodes. 49 50 I contain a DAG to help with resolving dependencies. 51 """ 52 logCategory = "depgraph" 53 54 typeNames = ("WORKER", "JOB", "COMPONENTSETUP", "CLOCKMASTER", 55 "COMPONENTSTART") 56
57 - def __init__(self):
58 self._dag = dag.DAG() 59 self._state = {}
60
61 - def _addNode(self, component, type):
62 # type: str 63 self.debug("Adding node %r of type %s" % (component, type)) 64 self._dag.addNode(component, type) 65 self._state[(component, type)] = False
66
67 - def _removeNode(self, component, type):
68 self.debug("Removing node %r of type %s" % (component, type)) 69 self._dag.removeNode(component, type)
70
71 - def _addEdge(self, parent, child, parentType, childType):
72 self.debug("Adding edge %r of type %s to %r of type %s" % ( 73 parent, parentType, child, childType)) 74 self._dag.addEdge(parent, child, parentType, childType)
75
76 - def _removeEdge(self, parent, child, parentType, childType):
77 self.debug("Removing edge %r of type %s to %r of type %s" % ( 78 parent, parentType, child, childType)) 79 self._dag.removeEdge(parent, child, parentType, childType)
80
81 - def addClockMaster(self, component):
82 """ 83 I set a component to be the clock master in the dependency 84 graph. This component must have already been added to the 85 dependency graph. 86 87 @param component: the component to set as the clock master 88 @type component: L{flumotion.manager.component.ComponentAvatar} 89 """ 90 if self._dag.hasNode(component, "JOB"): 91 self._addNode(component, "CLOCKMASTER") 92 self._addEdge(component, component, "COMPONENTSETUP", 93 "CLOCKMASTER") 94 95 # now go through all the component starts and make them dep on the 96 # clock master 97 startnodes = self._dag.getAllNodesByType("COMPONENTSTART") 98 for start in startnodes: 99 # only add if they share the same parent flow 100 if start.get('parent') == component.get('parent'): 101 self._addEdge(component, start, "CLOCKMASTER", 102 "COMPONENTSTART") 103 else: 104 raise KeyError("Component %r has not been added" % component)
105
106 - def addComponent(self, component):
107 """ 108 I add a component to the dependency graph. 109 This includes adding the worker (if not already added), the job, 110 the feeders and the eaters. 111 112 Requirement: worker must already be assigned to component 113 114 @param component: component object to add 115 @type component: L{flumotion.common.planet.ManagerComponentState} 116 """ 117 if self._dag.hasNode(component, "JOB"): 118 self.debug('component %r already in depgraph, ignoring', 119 component) 120 return 121 122 self.debug('adding component %r to depgraph' % component) 123 self._addNode(component, "JOB") 124 self._addNode(component, "COMPONENTSTART") 125 self._addNode(component, "COMPONENTSETUP") 126 self._addEdge(component, component, "JOB", "COMPONENTSETUP") 127 workername = component.get('workerRequested') 128 if workername: 129 self.addWorker(workername) 130 self.setComponentWorker(component, workername) 131 self._addEdge(component, component, "COMPONENTSETUP", 132 "COMPONENTSTART")
133
134 - def addWorker(self, worker):
135 """ 136 I add a worker to the dependency graph. 137 138 @param worker: the worker to add 139 @type worker: str 140 """ 141 self.debug('adding worker %s' % worker) 142 if not self._dag.hasNode(worker, "WORKER"): 143 self._addNode(worker, "WORKER")
144
145 - def removeComponent(self, component):
146 """ 147 I remove a component in the dependency graph, this includes removing 148 the JOB, COMPONENTSETUP, COMPONENTSTART, CLOCKMASTER. 149 150 @param component: the component to remove 151 @type component: L{flumotion.manager.component.ComponentAvatar} 152 """ 153 self.debug('removing component %r from depgraph' % component) 154 for type in self.typeNames: 155 if self._dag.hasNode(component, type): 156 self._removeNode(component, type) 157 del self._state[(component, type)]
158
159 - def removeWorker(self, worker):
160 """ 161 I remove a worker from the dependency graph. 162 163 @param worker: the worker to remove 164 @type worker: str 165 """ 166 self.debug('removing worker %s' % worker) 167 if self._dag.hasNode(worker, "WORKER"): 168 self._dag.removeNode(worker, "WORKER") 169 del self._state[(worker, "WORKER")]
170
171 - def setComponentWorker(self, component, worker):
172 """ 173 I assign a component to a specific worker. 174 175 @param component: the component 176 @type component: L{flumotion.common.planet.ManagerComponentState} 177 @param worker: the worker to set it to 178 @type worker: str 179 """ 180 if self._dag.hasNode(worker, "WORKER") and ( 181 self._dag.hasNode(component, "JOB")): 182 self._addEdge(worker, component, "WORKER", "JOB") 183 else: 184 raise KeyError("Worker %s or Component %r not in dependency graph" % 185 (worker, component))
186
187 - def mapEatersToFeeders(self):
188 """ 189 I am called once a piece of configuration has been added, 190 so I can add edges to the DAG for each feed from the 191 feeding component to the eating component. 192 193 @raise errors.ComponentConfigError: if a component is 194 misconfigured and eats from 195 a non-existant component 196 """ 197 toSetup = self._dag.getAllNodesByType("COMPONENTSETUP") 198 199 for eatingComponent in toSetup: 200 # for this component setup, go through all the feeders in it 201 config = eatingComponent.get('config') 202 203 if not config.has_key('source'): 204 # no eaters 205 self.debug("Component %r has no eaters" % eatingComponent) 206 else: 207 # source is a list of componentName[:feedName] 208 # with feedName defaulting to default 209 # FIXME: maybe source should really be eaters and contain 210 # a list of feedId 211 list = config['source'] 212 213 # FIXME: there's a bug in config parsing - sometimes this gives 214 # us one string, and sometimes a list of one string, and 215 # sometimes a list 216 if isinstance(list, str): 217 list = [list, ] 218 219 for source in list: 220 feederFound = False 221 feederComponentName = source.split(':')[0] 222 # find the feeder 223 for feedingComponent in toSetup: 224 if feedingComponent.get("name") == feederComponentName: 225 feederFound = True 226 try: 227 self._addEdge(feedingComponent, eatingComponent, 228 "COMPONENTSETUP", "COMPONENTSETUP") 229 except KeyError: 230 # it is possible for a component to have 231 # two eaters, each eating from feeders on 232 # one other component 233 pass 234 try: 235 self._addEdge(feedingComponent, eatingComponent, 236 "COMPONENTSTART", "COMPONENTSTART") 237 except KeyError: 238 pass 239 240 if not feederFound: 241 raise errors.ComponentConfigError(eatingComponent, 242 "No feeder exists for eater %s" % source)
243
244 - def whatShouldBeStarted(self):
245 """ 246 I return a list of things that can and should be started now. 247 248 @return: a list of nodes that should be started, in order 249 @rtype: list of (object, str) 250 """ 251 # A bit tricky because workers can't be started by manager, 252 # and jobs are started automatically when worker is attached 253 # So we get all the stuff sorted by depgraph, 254 # then remove ones that are already have state of True, 255 # then remove ones that are workers who are False, and their offspring, 256 # then remove ones that are jobs who are False, and their offspring, 257 # and also remove nodes that are offspring of nodes with state of False 258 toBeStarted = self._dag.sort() 259 # we want to loop over all objects, so we loop over a copy 260 for obj in toBeStarted[:]: 261 if obj in toBeStarted: 262 self.log("toBeStarted: checking if (%r, %r) needs starting", 263 obj[0], obj[1]) 264 if self._state[obj]: 265 toBeStarted.remove(obj) 266 elif obj[1] == "WORKER": 267 # This is a worker not started. 268 # Let's remove it and its offspring 269 worker_offspring = self._dag.getOffspringTyped( 270 obj[0], obj[1]) 271 for offspring in worker_offspring: 272 if offspring in toBeStarted: 273 toBeStarted.remove(offspring) 274 toBeStarted.remove(obj) 275 elif obj[1] == "JOB": 276 job_offspring = self._dag.getOffspringTyped(obj[0], obj[1]) 277 for offspring in job_offspring: 278 if offspring in toBeStarted: 279 toBeStarted.remove(offspring) 280 toBeStarted.remove(obj) 281 else: 282 offspring = self._dag.getOffspringTyped(obj[0], obj[1]) 283 for child in offspring: 284 if child in toBeStarted: 285 toBeStarted.remove(child) 286 287 return toBeStarted
288
289 - def _setState(self, object, type, value):
290 self.doLog(log.DEBUG, -2, "Setting state of (%r, %s) to %d" % ( 291 object, type, value)) 292 self._state[(object,type)] = value 293 # if making state False, should make its offspring False 294 # if the object is the same 295 if not value: 296 self.debug("Setting state of all (%r, %s)'s offspring to %d" % 297 (object, type, value)) 298 offspring = self._dag.getOffspringTyped(object, type) 299 for kid in offspring: 300 self.debug("Setting state of offspring (%r) to %d", kid, value) 301 if kid[0] == object: 302 self._state[kid] = False
303
304 - def setComponentStarted(self, component):
305 """ 306 Set a COMPONENTSTART node to have state of True 307 308 @param component: the component to set COMPONENTSTART to True for 309 @type component: L{flumotion.common.planet.ManagerComponentState} 310 """ 311 self._setState(component, "COMPONENTSTART", True)
312
313 - def setComponentNotStarted(self, component):
314 """ 315 Set a COMPONENTSTART node to have state of False 316 317 @param component: the component to set COMPONENTSTART to False for 318 @type component: L{flumotion.common.planet.ManagerComponentState} 319 """ 320 321 self._setState(component, "COMPONENTSTART", False)
322
323 - def setComponentSetup(self, component):
324 """ 325 Set a COMPONENTSETUP node to have state of True 326 327 @param component: the component to set COMPONENTSETUP to True for 328 @type component: L{flumotion.common.planet.ManagerComponentState} 329 """ 330 331 self._setState(component, "COMPONENTSETUP", True)
332
333 - def setComponentNotSetup(self, component):
334 """ 335 Set a COMPONENTSETUP node to have state of False 336 337 @param component: the component to set COMPONENTSETUP to True for 338 @type component: L{flumotion.common.planet.ManagerComponentState} 339 """ 340 341 self._setState(component, "COMPONENTSETUP", False)
342 343
344 - def setJobStarted(self, component):
345 """ 346 Set a JOB node to have state of True 347 348 @param component: the component to set JOB to True for 349 @type component: L{flumotion.common.planet.ManagerComponentState} 350 """ 351 self._setState(component, "JOB", True)
352
353 - def setJobStopped(self, component):
354 """ 355 Set a JOB node to have state of False 356 357 @param component: the component to set JOB to False for 358 @type component: L{flumotion.common.planet.ManagerComponentState} 359 """ 360 self.doLog(log.DEBUG, -2, "Setting component's job %r to FALSE" % 361 component) 362 self._setState(component, "JOB", False)
363
364 - def setWorkerStarted(self, worker):
365 """ 366 Set a WORKER node to have state of True 367 368 @param worker: the component to set WORKER to True for 369 @type worker: str 370 """ 371 self._setState(worker, "WORKER", True)
372
373 - def setWorkerStopped(self, worker):
374 """ 375 Set a WORKER node to have state of False 376 377 @param worker: the component to set WORKER to False for 378 @type worker: str 379 """ 380 self._setState(worker, "WORKER", False)
381
382 - def setClockMasterStarted(self, component):
383 """ 384 Set a CLOCKMASTER node to have state of True 385 386 @param component: the component to set CLOCKMASTER to True for 387 @type component: {flumotion.common.planet.ManagerComponentState} 388 """ 389 self._setState(component, "CLOCKMASTER", True)
390
391 - def setClockMasterStopped(self, component):
392 """ 393 Set a CLOCKMASTER node to have state of False 394 395 @param component: the component to set CLOCKMASTER to True for 396 @type component: {flumotion.common.planet.ManagerComponentState} 397 """ 398 self._setState(component, "CLOCKMASTER", False)
399
400 - def isAClockMaster(self, component):
401 """ 402 Checks if component has a CLOCKMASTER node 403 404 @param component: the component to check if CLOCKMASTER node exists 405 @type component: {flumotion.common.planet.ManagerComponentState} 406 """ 407 return self._dag.hasNode(component, "CLOCKMASTER")
408