1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 """worker-side objects to handle worker clients
23 """
24
25 import signal
26
27 from twisted.internet import defer, error, reactor
28 from zope.interface import implements
29
30 from flumotion.common import errors, interfaces, log
31 from flumotion.worker import medium, job, feedserver
32 from flumotion.twisted.defer import defer_call_later
33
34 __version__ = "$Rev: 6561 $"
35
36
38 logCategory = "proxybouncer"
39
40 """
41 I am a bouncer that proxies authenticate calls to a remote FPB root
42 object.
43 """
45 """
46 @param remote: an object that has .callRemote()
47 """
48 self._remote = remote
49
51 """
52 Call me before asking me to authenticate, so I know what I can
53 authenticate.
54 """
55 return self._remote.callRemote('getKeycardClasses')
56
61
62
64 """
65 I am the main object in the worker process, managing jobs and everything
66 related.
67 I live in the main worker process.
68
69 @ivar authenticator: authenticator worker used to log in to manager
70 @type authenticator: L{flumotion.twisted.pb.Authenticator}
71 @ivar medium:
72 @type medium: L{medium.WorkerMedium}
73 @ivar jobHeaven:
74 @type jobHeaven: L{job.ComponentJobHeaven}
75 @ivar checkHeaven:
76 @type checkHeaven: L{job.CheckJobHeaven}
77 @ivar workerClientFactory:
78 @type workerClientFactory: L{medium.WorkerClientFactory}
79 @ivar feedServerPort: TCP port the Feed Server is listening on
80 @type feedServerPort: int
81 """
82
83 implements(interfaces.IFeedServerParent)
84
85 logCategory = 'workerbrain'
86
88 """
89 @param options: the optparsed dictionary of command-line options
90 @type options: an object with attributes
91 """
92 self.options = options
93 self.workerName = options.name
94
95
96 if not self.options.randomFeederports:
97 self.ports = self.options.feederports[:-1]
98 else:
99 self.ports = []
100
101 self.medium = medium.WorkerMedium(self)
102
103
104 self.jobHeaven = job.ComponentJobHeaven(self)
105
106 self.checkHeaven = job.CheckJobHeaven(self)
107
108 self.managerConnectionInfo = None
109
110
111
112 self.feedServer = None
113
114 self.stopping = False
115 reactor.addSystemEventTrigger('before', 'shutdown',
116 self.shutdownHandler)
117 self._installHUPHandler()
118
120 def sighup(signum, frame):
121 if self._oldHUPHandler:
122 self.log('got SIGHUP, calling previous handler %r',
123 self._oldHUPHandler)
124 self._oldHUPHandler(signum, frame)
125 self.debug('telling kids about new log file descriptors')
126 self.jobHeaven.rotateChildLogFDs()
127
128 handler = signal.signal(signal.SIGHUP, sighup)
129 if handler == signal.SIG_DFL or handler == signal.SIG_IGN:
130 self._oldHUPHandler = None
131 else:
132 self._oldHUPHandler = handler
133
135 """
136 Start listening on the FeedServer (incoming eater requests), JobServer
137 (through which we communicate with our children) and check server ports
138
139 @returns: True unless listening on one of these ports failed
140 """
141
142 try:
143 self.feedServer = self._makeFeedServer()
144 except error.CannotListenError, e:
145 self.warning("Failed to listen on feed server port: %r", e)
146 return False
147
148 try:
149 self.jobHeaven.listen()
150 except error.CannotListenError, e:
151 self.warning("Failed to listen on job server port: %r", e)
152 return False
153
154 try:
155 self.checkHeaven.listen()
156 except error.CannotListenError, e:
157 self.warning("Failed to listen on check server port: %r", e)
158 return False
159
160 return True
161
163 """
164 @returns: L{flumotion.worker.feedserver.FeedServer}
165 """
166 port = None
167 if self.options.randomFeederports:
168 port = 0
169 elif not self.options.feederports:
170 self.info('Not starting feed server because no port is '
171 'configured')
172 return None
173 else:
174 port = self.options.feederports[-1]
175
176 return feedserver.FeedServer(self, ProxyBouncer(self), port)
177
def login(self, managerConnectionInfo):
    """
    Remember the given manager connection info and ask our medium to
    start connecting with it.

    @param managerConnectionInfo: stored on this object and handed on
                                  unchanged to the medium's
                                  startConnecting()
    """
    info = managerConnectionInfo
    # Store first: the attribute is set before the medium is told to connect.
    self.managerConnectionInfo = info
    self.medium.startConnecting(info)
181
182 - def callRemote(self, methodName, *args, **kwargs):
184
186 if self.stopping:
187 self.warning("Already shutting down, ignoring shutdown request")
188 return
189
190 self.info("Reactor shutting down, stopping jobHeaven")
191 self.stopping = True
192
193 l = [self.jobHeaven.shutdown(), self.checkHeaven.shutdown()]
194 if self.feedServer:
195 l.append(self.feedServer.shutdown())
196
197 return defer_call_later(defer.DeferredList(l))
198
199
def feedToFD(self, componentId, feedName, fd, eaterId):
    """
    Hand a file descriptor over to the job that runs the component
    owning the requested feeder.  Called from the FeedAvatar.

    @param componentId: key under which the job avatar is looked up in
                        self.jobHeaven.avatars
    @param feedName:    passed through to the avatar's sendFeed()
    @param fd:          the file descriptor, passed through to sendFeed()
    @param eaterId:     passed through to the avatar's sendFeed()

    @returns: False if no avatar is registered for componentId;
              otherwise whatever the avatar's sendFeed() returns.
    """
    avatars = self.jobHeaven.avatars
    if componentId in avatars:
        return avatars[componentId].sendFeed(feedName, fd, eaterId)

    # Unknown component: nothing to hand the descriptor to.
    self.warning("No such component %s running", componentId)
    return False
213
def eatFromFD(self, componentId, eaterAlias, fd, feedId):
    """
    Hand a file descriptor over to the job that runs the given
    component.  Called from the FeedAvatar.

    @param componentId: key under which the job avatar is looked up in
                        self.jobHeaven.avatars
    @param eaterAlias:  passed through to the avatar's receiveFeed()
    @param fd:          the file descriptor, passed through to
                        receiveFeed()
    @param feedId:      passed through to the avatar's receiveFeed()

    @returns: False if no avatar is registered for componentId;
              otherwise whatever the avatar's receiveFeed() returns.
    """
    avatars = self.jobHeaven.avatars
    if componentId in avatars:
        return avatars[componentId].receiveFeed(eaterAlias, fd, feedId)

    # Unknown component: nothing to hand the descriptor to.
    self.warning("No such component %s running", componentId)
    return False
227
228
230 return self.ports, self.options.randomFeederports
231
233 if self.feedServer:
234 return self.feedServer.getPortNum()
235 else:
236 return None
237
238 - def create(self, avatarId, type, moduleName, methodName, nice,
239 conf):
250
251 def spawnJob(bundles):
252 return self.jobHeaven.spawn(avatarId, type, moduleName,
253 methodName, nice, bundles, conf)
254
255 def createError(failure):
256 failure.trap(errors.ComponentCreateError)
257 self.debug('create deferred for %s failed, forwarding error',
258 avatarId)
259 return failure
260
261 def success(res):
262 self.debug('create deferred for %s succeeded (%r)',
263 avatarId, res)
264 return res
265
266 self.info('Starting component "%s" of type "%s"', avatarId,
267 type)
268 d = getBundles()
269 d.addCallback(spawnJob)
270 d.addCallback(success)
271 d.addErrback(createError)
272 return d
273
274 - def runCheck(self, module, function, *args, **kwargs):
278
279 def runCheck(bundles):
280 return self.checkHeaven.runCheck(bundles, module, function,
281 *args, **kwargs)
282
283 d = getBundles()
284 d.addCallback(runCheck)
285 return d
286
289
290 - def killJob(self, avatarId, signum):
292