Package flumotion :: Package manager :: Module worker
[hide private]

Source Code for Module flumotion.manager.worker

  1  # -*- Mode: Python; test-case-name: flumotion.test.test_manager_worker -*- 
  2  # vi:si:et:sw=4:sts=4:ts=4 
  3  # 
  4  # Flumotion - a streaming media server 
  5  # Copyright (C) 2004,2005,2006,2007 Fluendo, S.L. (www.fluendo.com). 
  6  # All rights reserved. 
  7   
  8  # This file may be distributed and/or modified under the terms of 
  9  # the GNU General Public License version 2 as published by 
 10  # the Free Software Foundation. 
 11  # This file is distributed without any warranty; without even the implied 
 12  # warranty of merchantability or fitness for a particular purpose. 
 13  # See "LICENSE.GPL" in the source distribution for more information. 
 14   
 15  # Licensees having purchased or holding a valid Flumotion Advanced 
 16  # Streaming Server license may use this file in accordance with the 
 17  # Flumotion Advanced Streaming Server Commercial License Agreement. 
 18  # See "LICENSE.Flumotion" in the source distribution for more information. 
 19   
 20  # Headers in this file shall remain intact. 
 21   
 22  """ 
 23  manager-side objects to handle worker clients 
 24  """ 
 25   
 26  from twisted.internet import defer 
 27   
 28  from flumotion.manager import base 
 29  from flumotion.common import errors, interfaces, log, registry 
 30  from flumotion.common import worker, common 
 31  from flumotion.common.vfs import registerVFSJelly 
 32   
 33  __version__ = "$Rev: 7038 $" 
 34   
 35   
class WorkerAvatar(base.ManagerAvatar):
    """
    I am an avatar created for a worker.
    A reference to me is given when logging in and requesting a worker avatar.
    I live in the manager.

    @ivar feedServerPort: TCP port the feed server is listening on
    @type feedServerPort: int
    """
    logCategory = 'worker-avatar'

    _portSet = None
    feedServerPort = None

    def __init__(self, heaven, avatarId, remoteIdentity, mind,
                 feedServerPort, ports, randomPorts):
        """
        Set up the avatar and announce the worker to the heaven and vishnu.

        The first four arguments are handed unchanged to
        L{base.ManagerAvatar}; the remaining ones are normally produced
        by L{makeAvatarInitArgs}.

        @param feedServerPort: TCP port the feed server is listening on
        @type  feedServerPort: int
        @param ports:          ports handed to L{worker.PortSet}
        @param randomPorts:    random-ports setting handed to
                               L{worker.PortSet}
        """
        base.ManagerAvatar.__init__(self, heaven, avatarId,
                                    remoteIdentity, mind)
        self.feedServerPort = feedServerPort

        self._portSet = worker.PortSet(self.avatarId, ports, randomPorts)

        # The heaven is notified first; its workerAttached() raises when a
        # worker with the same name is already registered, in which case
        # vishnu is never told about this avatar.
        self.heaven.workerAttached(self)
        self.vishnu.workerAttached(self)

        registerVFSJelly()

    def getName(self):
        """
        Return my avatarId, which is what the heaven uses as worker name.
        """
        return self.avatarId

    def makeAvatarInitArgs(klass, heaven, avatarId, remoteIdentity,
                           mind):
        """
        Collect the arguments needed to instantiate me.

        Asks the mind remotely for its feed server port ('getFeedServerPort')
        and its port information ('getPorts').

        @returns: a deferred firing a tuple (heaven, avatarId,
                  remoteIdentity, mind, feedServerPort, ports, randomPorts),
                  i.e. the arguments of L{__init__} in order.  It errbacks
                  as soon as one of the two remote calls fails.
        """

        def havePorts(res):
            log.debug('worker-avatar', 'got port information')
            # A DeferredList fires a list of (success, result) pairs, in
            # the order the deferreds were given.  Failures go to the
            # errback because of fireOnOneErrback, so the success flags
            # can be ignored here.
            (_s1, feedServerPort), (_s2, (ports, random)) = res
            return (heaven, avatarId, remoteIdentity, mind,
                    feedServerPort, ports, random)

        log.debug('worker-avatar', 'calling mind for port information')
        d = defer.DeferredList([mind.callRemote('getFeedServerPort'),
                                mind.callRemote('getPorts')],
                               fireOnOneErrback=True)
        d.addCallback(havePorts)
        return d
    makeAvatarInitArgs = classmethod(makeAvatarInitArgs)

    def onShutdown(self):
        """
        Tell the heaven and vishnu that the worker is gone, then let the
        base avatar do its own shutdown handling.
        """
        self.heaven.workerDetached(self)
        self.vishnu.workerDetached(self)
        base.ManagerAvatar.onShutdown(self)

    def reservePorts(self, numPorts):
        """
        Reserve the given number of ports on the worker.

        @param numPorts: how many ports to reserve
        @type  numPorts: int

        @returns: whatever the underlying L{worker.PortSet} hands back
                  for the reservation
        """
        return self._portSet.reservePorts(numPorts)

    def releasePorts(self, ports):
        """
        Release the given list of ports on the worker.

        @param ports: list of ports to release
        @type  ports: list of int
        """
        self._portSet.releasePorts(ports)

    def createComponent(self, avatarId, type, nice, conf):
        """
        Create a component of the given type with the given nice level.

        @param avatarId: avatarId the component should use to log in
        @type  avatarId: str
        @param type:     type of the component to create
        @type  type:     str
        @param nice:     the nice level to create the component at
        @type  nice:     int
        @param conf:     the component's config dict
        @type  conf:     dict

        @returns: a deferred that will give the avatarId the component
                  will use to log in to the manager
        """
        self.debug('creating %s (%s) on worker %s with nice level %d',
                   avatarId, type, self.avatarId, nice)
        defs = registry.getRegistry().getComponent(type)
        # The module always comes from the registry definition, whether or
        # not a "component" entry point exists.
        # FIXME: use entry.getModuleName() (doesn't work atm?)
        moduleName = defs.getSource()
        try:
            entry = defs.getEntryByType('component')
            methodName = entry.getFunction()
        except KeyError:
            self.warning('no "component" entry in registry of type %s, %s',
                         type, 'falling back to createComponent')
            methodName = "createComponent"

        self.debug('call remote create')
        return self.mindCallRemote('create', avatarId, type, moduleName,
                                   methodName, nice, conf)

    def getComponents(self):
        """
        Get a list of components that the worker is running.

        @returns: a deferred that will give the avatarIds running on the
                  worker
        """
        self.debug('getting component list from worker %s', self.avatarId)
        return self.mindCallRemote('getComponents')

    ### IPerspective methods, called by the worker's component

    def perspective_componentAddMessage(self, avatarId, message):
        """
        Called by the worker to tell the manager to add a given message to
        the given component.

        Useful in cases where the component can't report messages itself,
        for example because it crashed.

        @param avatarId: avatarId of the component the message is about
        @type  avatarId: str
        @param message:  the message to add to the component
        @type  message:  L{flumotion.common.messages.Message}
        """
        self.debug('received message from component %s', avatarId)
        self.vishnu.componentAddMessage(avatarId, message)
163
164 -class WorkerHeaven(base.ManagerHeaven):
165 """ 166 I interface between the Manager and worker clients. 167 For each worker client I create an L{WorkerAvatar} to handle requests. 168 I live in the manager. 169 """ 170 171 logCategory = "workerheaven" 172 avatarClass = WorkerAvatar 173
174 - def __init__(self, vishnu):
177 178 ### my methods
def workerAttached(self, workerAvatar):
    """
    Notify the heaven that the given worker has logged in.

    Records the worker's name under 'names' and a new
    L{worker.ManagerWorkerState}, holding the name and the peer host of
    the worker's connection, under 'workers' in my state.

    @type  workerAvatar: L{WorkerAvatar}

    @raises errors.AlreadyConnectedError: if a worker with the same name
                                          is already registered
    """
    workerName = workerAvatar.getName()
    if workerName in self.state.get('names'):
        self.warning('worker %s was already registered in the heaven',
                     workerName)
        raise errors.AlreadyConnectedError()

    # The host is taken from the remote end of the worker's connection.
    host = workerAvatar.mind.broker.transport.getPeer().host
    state = worker.ManagerWorkerState(name=workerName, host=host)
    self.state.append('names', workerName)
    self.state.append('workers', state)
196
def workerDetached(self, workerAvatar):
    """
    Notify the heaven that the given worker has logged out.

    Removes the worker's name from 'names' and every worker state carrying
    that name from 'workers' in my state.  A worker that was never
    registered only triggers a warning; nothing is raised.

    @type  workerAvatar: L{WorkerAvatar}
    """
    workerName = workerAvatar.getName()
    try:
        self.state.remove('names', workerName)
        # Iterate over a copy: entries are removed from the underlying
        # 'workers' list while looping.
        for state in list(self.state.get('workers')):
            if state.get('name') == workerName:
                self.state.remove('workers', state)
    except ValueError:
        self.warning('worker %s was never registered in the heaven',
                     workerName)
212