Package flumotion :: Package manager :: Module worker
[hide private]

Source Code for Module flumotion.manager.worker

  1  # -*- Mode: Python; test-case-name: flumotion.test.test_manager_worker -*- 
  2  # vi:si:et:sw=4:sts=4:ts=4 
  3  # 
  4  # Flumotion - a streaming media server 
  5  # Copyright (C) 2004,2005,2006,2007 Fluendo, S.L. (www.fluendo.com). 
  6  # All rights reserved. 
  7   
  8  # This file may be distributed and/or modified under the terms of 
  9  # the GNU General Public License version 2 as published by 
 10  # the Free Software Foundation. 
 11  # This file is distributed without any warranty; without even the implied 
 12  # warranty of merchantability or fitness for a particular purpose. 
 13  # See "LICENSE.GPL" in the source distribution for more information. 
 14   
 15  # Licensees having purchased or holding a valid Flumotion Advanced 
 16  # Streaming Server license may use this file in accordance with the 
 17  # Flumotion Advanced Streaming Server Commercial License Agreement. 
 18  # See "LICENSE.Flumotion" in the source distribution for more information. 
 19   
 20  # Headers in this file shall remain intact. 
 21   
 22  """ 
 23  manager-side objects to handle worker clients 
 24  """ 
 25   
 26  from twisted.internet import defer 
 27   
 28  # FIXME: rename to base 
 29  from flumotion.manager import base 
 30  from flumotion.common import errors, interfaces, log, registry 
 31  from flumotion.common import config, worker, common 
 32  from flumotion.twisted.defer import defer_generator_method 
 33   
34 -class WorkerAvatar(base.ManagerAvatar):
35 """ 36 I am an avatar created for a worker. 37 A reference to me is given when logging in and requesting a worker avatar. 38 I live in the manager. 39 40 @ivar feedServerPort: TCP port the feed server is listening on 41 @type feedServerPort: int 42 """ 43 logCategory = 'worker-avatar' 44 45 _portSet = None 46 feedServerPort = None 47
48 - def getName(self):
49 return self.avatarId
50
51 - def attached(self, mind):
52 # doc in base class 53 self.info('worker "%s" logged in' % self.getName()) 54 base.ManagerAvatar.attached(self, mind) 55 56 self.debug('MANAGER -> WORKER: getFeedServerPort()') 57 d = self.mindCallRemote('getFeedServerPort') 58 yield d 59 # this can be None if no feed server is configured 60 self.feedServerPort = d.value() 61 self.debug('WORKER -> MANAGER: getFeedServerPort(): %r' % 62 self.feedServerPort) 63 64 self.debug('MANAGER -> WORKER: getPorts()') 65 d = self.mindCallRemote('getPorts') 66 yield d 67 ports = d.value() 68 self.debug('WORKER -> MANAGER: getPorts(): %r' % ports) 69 self._portSet = worker.PortSet(self.avatarId, ports) 70 71 self.heaven.workerAttached(self) 72 self.vishnu.workerAttached(self)
73 attached = defer_generator_method(attached) 74
75 - def detached(self, mind):
76 # doc in base class 77 self.info('worker "%s" logged out' % self.getName()) 78 base.ManagerAvatar.detached(self, mind) 79 self.heaven.workerDetached(self) 80 self.vishnu.workerDetached(self)
81
82 - def reservePorts(self, numPorts):
83 """ 84 Reserve the given number of ports on the worker. 85 86 @param numPorts: how many ports to reserve 87 @type numPorts: int 88 """ 89 return self._portSet.reservePorts(numPorts)
90
91 - def releasePorts(self, ports):
92 """ 93 Release the given list of ports on the worker. 94 95 @param ports: list of ports to release 96 @type ports: list of int 97 """ 98 self._portSet.releasePorts(ports)
99
100 - def createComponent(self, avatarId, type, nice):
101 """ 102 Create a component of the given type with the given nice level. 103 104 @param avatarId: avatarId the component should use to log in 105 @type avatarId: str 106 @param type: type of the component to create 107 @type type: str 108 @param nice: the nice level to create the component at 109 @type nice: int 110 111 @returns: a deferred that will give the avatarId the component 112 will use to log in to the manager 113 """ 114 self.debug('creating %s (%s) on worker %s with nice level %d' % ( 115 avatarId, type, self.avatarId, nice)) 116 defs = registry.getRegistry().getComponent(type) 117 try: 118 entry = defs.getEntryByType('component') 119 # FIXME: use entry.getModuleName() (doesn't work atm?) 120 moduleName = defs.getSource() 121 methodName = entry.getFunction() 122 except KeyError: 123 self.warning('no "component" entry in registry of type %s, %s', 124 type, 'falling back to createComponent') 125 moduleName = defs.getSource() 126 methodName = "createComponent" 127 128 self.debug('call remote create') 129 return self.mindCallRemote('create', avatarId, type, moduleName, 130 methodName, nice)
131
132 - def getComponents(self):
133 """ 134 Get a list of components that the worker is running. 135 136 @returns: a deferred that will give the avatarIds running on the 137 worker 138 """ 139 self.debug('getting component list from worker %s' % 140 self.avatarId) 141 return self.mindCallRemote('getComponents')
142 143 ### IPerspective methods, called by the worker's component
144 - def perspective_componentAddMessage(self, avatarId, message):
145 """ 146 Called by the worker to tell the manager to add a given message to 147 the given component. 148 149 Useful in cases where the component can't report messages itself, 150 for example because it crashed. 151 152 @param avatarId: avatarId of the component the message is about 153 @type message: L{flumotion.common.messages.Message} 154 """ 155 self.debug('received message from component %s' % avatarId) 156 self.vishnu.componentAddMessage(avatarId, message)
157
158 -class WorkerHeaven(base.ManagerHeaven):
159 """ 160 I interface between the Manager and worker clients. 161 For each worker client I create an L{WorkerAvatar} to handle requests. 162 I live in the manager. 163 """ 164 165 logCategory = "workerheaven" 166 avatarClass = WorkerAvatar 167
168 - def __init__(self, vishnu):
169 base.ManagerHeaven.__init__(self, vishnu) 170 self.conf = None 171 self.state = worker.ManagerWorkerHeavenState()
172 173 ### my methods
174 - def workerAttached(self, workerAvatar):
175 """ 176 Notify the heaven that the given worker has logged in. 177 178 @type workerAvatar: L{WorkerAvatar} 179 """ 180 workerName = workerAvatar.getName() 181 if not workerName in self.state.get('names'): 182 # wheee 183 host = workerAvatar.mind.broker.transport.getPeer().host 184 state = worker.ManagerWorkerState(name=workerName, host=host) 185 self.state.append('names', workerName) 186 self.state.append('workers', state) 187 else: 188 # FIXME: what if it was already there ? 189 self.warning('worker %s was already registered in the heaven' % 190 workerName)
191
192 - def workerDetached(self, workerAvatar):
193 """ 194 Notify the heaven that the given worker has logged out. 195 196 @type workerAvatar: L{WorkerAvatar} 197 """ 198 workerName = workerAvatar.getName() 199 try: 200 self.state.remove('names', workerName) 201 for state in list(self.state.get('workers')): 202 if state.get('name') == workerName: 203 self.state.remove('workers', state) 204 except ValueError: 205 self.warning('worker %s was never registered in the heaven' % 206 workerName)
207