1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 """
23 manager-side objects to handle worker clients
24 """
25
26 from twisted.internet import defer
27
28 from flumotion.manager import base
29 from flumotion.common import errors, interfaces, log, registry
30 from flumotion.common import worker, common
31 from flumotion.common.vfs import registerVFSJelly
32
33 __version__ = "$Rev: 7038 $"
34
35
37 """
38 I am an avatar created for a worker.
39 A reference to me is given when logging in and requesting a worker avatar.
40 I live in the manager.
41
42 @ivar feedServerPort: TCP port the feed server is listening on
43 @type feedServerPort: int
44 """
45 logCategory = 'worker-avatar'
46
47 _portSet = None
48 feedServerPort = None
49
50 - def __init__(self, heaven, avatarId, remoteIdentity, mind,
51 feedServerPort, ports, randomPorts):
62
65
68 def havePorts(res):
# Callback attached to the DeferredList built just below: it unpacks the
# results of the two remote calls and returns the argument tuple in the
# same order as the __init__ parameters (heaven, avatarId, remoteIdentity,
# mind, feedServerPort, ports, randomPorts).
# NOTE(review): heaven, avatarId, remoteIdentity and mind are not defined
# in this function; presumably they come from the enclosing scope, whose
# header is not visible here -- confirm.
69 log.debug('worker-avatar', 'got port information')
# A DeferredList fires with one (success, result) pair per wrapped deferred,
# in the order they were listed: getFeedServerPort first, then getPorts.
# The success flags (_s1, _s2) are ignored; the getPorts result is itself
# unpacked as a (ports, random) pair.
70 (_s1, feedServerPort), (_s2, (ports, random)) = res
71 return (heaven, avatarId, remoteIdentity, mind,
72 feedServerPort, ports, random)
73 log.debug('worker-avatar', 'calling mind for port information')
74 d = defer.DeferredList([mind.callRemote('getFeedServerPort'),
75 mind.callRemote('getPorts')],
76 fireOnOneErrback=True)
77 d.addCallback(havePorts)
78 return d
79 makeAvatarInitArgs = classmethod(makeAvatarInitArgs)
80
85
87 """
88 Reserve the given number of ports on the worker.
89
90 @param numPorts: how many ports to reserve
91 @type numPorts: int
92 """
93 return self._portSet.reservePorts(numPorts)
94
96 """
97 Release the given list of ports on the worker.
98
99 @param ports: list of ports to release
100 @type ports: list of int
101 """
102 self._portSet.releasePorts(ports)
103
105 """
106 Create a component of the given type with the given nice level.
107
108 @param avatarId: avatarId the component should use to log in
109 @type avatarId: str
110 @param type: type of the component to create
111 @type type: str
112 @param nice: the nice level to create the component at
113 @type nice: int
114 @param conf: the component's config dict
115 @type conf: dict
116
117 @returns: a deferred that will give the avatarId the component
118 will use to log in to the manager
119 """
120 self.debug('creating %s (%s) on worker %s with nice level %d',
121 avatarId, type, self.avatarId, nice)
122 defs = registry.getRegistry().getComponent(type)
123 try:
124 entry = defs.getEntryByType('component')
125
126 moduleName = defs.getSource()
127 methodName = entry.getFunction()
128 except KeyError:
129 self.warning('no "component" entry in registry of type %s, %s',
130 type, 'falling back to createComponent')
131 moduleName = defs.getSource()
132 methodName = "createComponent"
133
134 self.debug('call remote create')
135 return self.mindCallRemote('create', avatarId, type, moduleName,
136 methodName, nice, conf)
137
139 """
140 Get a list of components that the worker is running.
141
142 @returns: a deferred that will give the avatarIds running on the
143 worker
144 """
145 self.debug('getting component list from worker %s' %
146 self.avatarId)
147 return self.mindCallRemote('getComponents')
148
149
151 """
152 Called by the worker to tell the manager to add a given message to
153 the given component.
154
155 Useful in cases where the component can't report messages itself,
156 for example because it crashed.
157
158 @param avatarId: avatarId of the component the message is about
@param message: the message to add to that component
159 @type message: L{flumotion.common.messages.Message}
160 """
161 self.debug('received message from component %s' % avatarId)
162 self.vishnu.componentAddMessage(avatarId, message)
163
165 """
166 I interface between the Manager and worker clients.
167 For each worker client I create a L{WorkerAvatar} to handle requests.
168 I live in the manager.
169 """
170
171 logCategory = "workerheaven"
172 avatarClass = WorkerAvatar
173
177
178
196
198 """
199 Notify the heaven that the given worker has logged out.
200
201 @type workerAvatar: L{WorkerAvatar}
202 """
203 workerName = workerAvatar.getName()
204 try:
205 self.state.remove('names', workerName)
206 for state in list(self.state.get('workers')):
207 if state.get('name') == workerName:
208 self.state.remove('workers', state)
209 except ValueError:
210 self.warning('worker %s was never registered in the heaven',
211 workerName)
212