1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 """
23 worker-side objects to handle worker clients
24 """
25
26 import signal
27
28 from twisted.internet import reactor, error
29 from twisted.spread import flavors
30 from zope.interface import implements
31
32 from flumotion.common import errors, interfaces, debug
33 from flumotion.common import medium
34 from flumotion.common.vfs import listDirectory, registerVFSJelly
35 from flumotion.twisted.pb import ReconnectingFPBClientFactory
36
37 __version__ = "$Rev: 7038 $"
38 JOB_SHUTDOWN_TIMEOUT = 5
39
40
42 """
43 I am a client factory for the worker to log in to the manager.
44 """
45 logCategory = 'worker'
46 perspectiveInterface = interfaces.IWorkerMedium
47
61
71
72
74
75
def remoteDisconnected(remoteReference):
    """
    Log that the connection to the manager has been lost.

    @param remoteReference: the reference that got disconnected; not used
    """
    # A disconnect seen while the reactor is flagged as killed is merely
    # logged; any other disconnect is reported as a warning.
    if not reactor.killed:
        self.warning('Lost connection to manager, '
                     'will attempt to reconnect')
        return
    self.log('Connection to manager lost due to shutdown')
82
def loginCallback(reference):
    """
    Handle a successful login to the manager.

    Logs the login, hands the reference to the medium and asks the
    reference to call remoteDisconnected when it disconnects.

    @param reference: the remote reference obtained by logging in
    """
    self.info("Logged in to manager")
    referenceText = "remote reference %r" % reference
    self.debug(referenceText)

    medium = self.medium
    medium.setRemoteReference(reference)
    reference.notifyOnDisconnect(remoteDisconnected)
89
def alreadyConnectedErrback(failure):
    """
    Warn that a worker with our name is already connected.

    Only handles errors.AlreadyConnectedError; the failure value is used
    as the worker name in the warning.

    @param failure: the failure to handle
    """
    failure.trap(errors.AlreadyConnectedError)
    template = 'A worker with the name "%s" is already connected.'
    self.warning(template % failure.value)
94
def accessDeniedErrback(failure):
    """
    Warn that the manager denied access.

    Only handles errors.NotAuthenticatedError.

    @param failure: the failure to handle
    """
    failure.trap(errors.NotAuthenticatedError)
    message = 'Access denied.'
    self.warning(message)
98
def connectionRefusedErrback(failure):
    """
    Warn that the connection to the manager was refused.

    Only handles error.ConnectionRefusedError; the warning names the
    manager host and port stored on self.

    @param failure: the failure to handle
    """
    failure.trap(error.ConnectionRefusedError)
    endpoint = (self._managerHost, self._managerPort)
    self.warning('Connection to %s:%d refused.' % endpoint)
103
def NoSuchMethodErrback(failure):
    """
    Handle a flavors.NoSuchMethod failure raised during login.

    When the error message mentions 'remote_getKeycardClasses', a warning
    asking to upgrade the manager is logged and the failure is consumed.
    Any other NoSuchMethod failure is returned so that the next errback
    gets to handle it.

    @param failure: the failure to inspect

    @returns: None when the failure was handled here, the failure itself
              otherwise
    """
    failure.trap(flavors.NoSuchMethod)

    # failure.value can be a plain string or an exception instance.
    # Calling .find() on an exception raises AttributeError inside this
    # errback; getErrorMessage() yields a string in both cases.
    message = failure.getErrorMessage()
    if 'remote_getKeycardClasses' not in message:
        return failure

    self.warning(
        "Manager %s:%d is older than version 0.3.0. "
        "Please upgrade." % (self._managerHost, self._managerPort))
    return None
114
def loginFailedErrback(failure):
    """
    Log the given failure as a login failure warning.

    No failure type is trapped here, so every failure reaching this
    errback is logged and consumed.

    @param failure: the failure to report
    """
    reason = str(failure)
    self.warning('Login failed, reason: %s' % reason)
117
# Success path first, then the errbacks in the order in which they get a
# chance to handle a failure; loginFailedErrback traps nothing and so
# catches whatever the earlier errbacks passed on.
d.addCallback(loginCallback)
for handleFailure in (accessDeniedErrback,
                      connectionRefusedErrback,
                      alreadyConnectedErrback,
                      NoSuchMethodErrback,
                      loginFailedErrback):
    d.addErrback(handleFailure)
124
126 """
127 I am a medium interfacing with the manager-side WorkerAvatar.
128
129 @ivar brain: the worker brain
130 @type brain: L{worker.WorkerBrain}
131 @ivar factory: the worker client factory
132 @type factory: L{WorkerClientFactory}
133 """
134
135 logCategory = 'workermedium'
136
137 implements(interfaces.IWorkerMedium)
138
140 """
141 @type brain: L{worker.WorkerBrain}
142 """
143 self.brain = brain
144 self.factory = None
145 registerVFSJelly()
146
161
166
167
169 """
170 Gets the set of TCP ports that this worker is configured to use.
171
172 @rtype: 2-tuple: (list of int, bool)
173 @return: list of ports, and a boolean indicating whether we allocate
174 ports randomly
175 """
176 return self.brain.getPorts()
177
179 """
180 Return the TCP port the Feed Server is listening on.
181
182 @rtype: int, or NoneType
183 @return: TCP port number, or None if there is no feed server
184 """
185 return self.brain.getFeedServerPort()
186
def remote_create(self, avatarId, type, moduleName, methodName,
                  nice, conf):
    """
    Create a component of the given type, running at the given nice level.
    The work is delegated to the worker brain, which will spawn a new job
    process to run the component in.

    @param avatarId:   avatar identification string
    @type  avatarId:   str
    @param type:       type of the component to create
    @type  type:       str
    @param moduleName: name of the module to create the component from
    @type  moduleName: str
    @param methodName: the factory method to use to create the component
    @type  methodName: str
    @param nice:       nice level
    @type  nice:       int
    @param conf:       component config
    @type  conf:       dict

    @returns: a deferred fired when the process has started and created
              the component
    """
    # All arguments are handed to the brain unchanged and in order.
    jobArgs = (avatarId, type, moduleName, methodName, nice, conf)
    return self.brain.create(*jobArgs)
211
213 """
214 Checks if one or more GStreamer elements are present and can be
215 instantiated.
216
217 @param elementNames: names of the GStreamer elements
218 @type elementNames: list of str
219
220 @rtype: list of str
221 @returns: a list of instantiatable element names
222 """
223 return self.brain.runCheck('flumotion.worker.checks.check',
224 'checkElements', elementNames)
225
227 """
228 Checks if the given module can be imported.
229
230 @param moduleName: name of the module to check
231 @type moduleName: str
232
233 @returns: None or Failure
234 """
235 return self.brain.runCheck('flumotion.worker.checks.check', 'checkImport',
236 moduleName)
237
239 """
240 Runs the given function in the given module with the given arguments.
241
242 @param module: module the function lives in
243 @type module: str
244 @param function: function to run
245 @type function: str
246
247 @returns: the return value of the given function in the module.
248 """
249 return self.brain.runCheck(module, function, *args, **kwargs)
250 remote_runFunction = remote_runCheck
251
253 """
254 I return a list of the componentAvatarIds I have. I am called by the
255 manager soon after I attach to it. This is needed on reconnects
256 so that the manager knows what components it needs to start on me.
257
258 @returns: a list of componentAvatarIds
259 """
260 return self.brain.getComponents()
261
263 """Kill one of the worker's jobs.
264
265 This method is intended for exceptional purposes only; a normal
266 component shutdown is performed by the manager via calling
267 remote_stop() on the component avatar.
268
269 Raises L{flumotion.common.errors.UnknownComponentError} if the
270 job is unknown.
271
272 @param avatarId: the avatar Id of the component, e.g.
273 '/default/audio-encoder'
274 @type avatarId: string
275 @param signum: Signal to send, optional. Defaults to SIGKILL.
276 @type signum: int
277 """
278 self.brain.killJob(avatarId, signum)
279
282
284 """List the directory called directoryName
285 @returns: the directory
286 @rtype: deferred that will fire an object implementing L{IDirectory}
287 """
288 return listDirectory(directoryName)
289