Package flumotion :: Package worker :: Module worker
[hide private]

Source Code for Module flumotion.worker.worker

  1  # -*- Mode: Python; test-case-name:flumotion.test.test_worker_worker -*- 
  2  # vi:si:et:sw=4:sts=4:ts=4 
  3  # 
  4  # Flumotion - a streaming media server 
  5  # Copyright (C) 2004,2005,2006,2007,2008 Fluendo, S.L. (www.fluendo.com). 
  6  # All rights reserved. 
  7   
  8  # This file may be distributed and/or modified under the terms of 
  9  # the GNU General Public License version 2 as published by 
 10  # the Free Software Foundation. 
 11  # This file is distributed without any warranty; without even the implied 
 12  # warranty of merchantability or fitness for a particular purpose. 
 13  # See "LICENSE.GPL" in the source distribution for more information. 
 14   
 15  # Licensees having purchased or holding a valid Flumotion Advanced 
 16  # Streaming Server license may use this file in accordance with the 
 17  # Flumotion Advanced Streaming Server Commercial License Agreement. 
 18  # See "LICENSE.Flumotion" in the source distribution for more information. 
 19   
 20  # Headers in this file shall remain intact. 
 21   
 22  """worker-side objects to handle worker clients 
 23  """ 
 24   
 25  import signal 
 26   
 27  from twisted.internet import defer, error, reactor 
 28  from zope.interface import implements 
 29   
 30  from flumotion.common import errors, interfaces, log 
 31  from flumotion.worker import medium, job, feedserver 
 32  from flumotion.twisted.defer import defer_call_later 
 33   
 34  __version__ = "$Rev: 6561 $" 
 35   
 36   
37 -class ProxyBouncer(log.Loggable):
38 logCategory = "proxybouncer" 39 40 """ 41 I am a bouncer that proxies authenticate calls to a remote FPB root 42 object. 43 """
44 - def __init__(self, remote):
45 """ 46 @param remote: an object that has .callRemote() 47 """ 48 self._remote = remote
49
50 - def getKeycardClasses(self):
51 """ 52 Call me before asking me to authenticate, so I know what I can 53 authenticate. 54 """ 55 return self._remote.callRemote('getKeycardClasses')
56
57 - def authenticate(self, keycard):
58 self.debug("Authenticating keycard %r against remote bouncer", 59 keycard) 60 return self._remote.callRemote('authenticate', None, keycard)
61 62 # Similar to Vishnu, but for worker related classes
63 -class WorkerBrain(log.Loggable):
64 """ 65 I am the main object in the worker process, managing jobs and everything 66 related. 67 I live in the main worker process. 68 69 @ivar authenticator: authenticator worker used to log in to manager 70 @type authenticator L{flumotion.twisted.pb.Authenticator} 71 @ivar medium: 72 @type medium: L{medium.WorkerMedium} 73 @ivar jobHeaven: 74 @type jobHeaven: L{job.ComponentJobHeaven} 75 @ivar checkHeaven: 76 @type checkHeaven: L{job.CheckJobHeaven} 77 @ivar workerClientFactory: 78 @type workerClientFactory: L{medium.WorkerClientFactory} 79 @ivar feedServerPort: TCP port the Feed Server is listening on 80 @type feedServerPort: int 81 """ 82 83 implements(interfaces.IFeedServerParent) 84 85 logCategory = 'workerbrain' 86
87 - def __init__(self, options):
88 """ 89 @param options: the optparsed dictionary of command-line options 90 @type options: an object with attributes 91 """ 92 self.options = options 93 self.workerName = options.name 94 95 # the last port is reserved for our FeedServer 96 if not self.options.randomFeederports: 97 self.ports = self.options.feederports[:-1] 98 else: 99 self.ports = [] 100 101 self.medium = medium.WorkerMedium(self) 102 103 # really should be componentJobHeaven, but this is shorter :) 104 self.jobHeaven = job.ComponentJobHeaven(self) 105 # for ephemeral checks 106 self.checkHeaven = job.CheckJobHeaven(self) 107 108 self.managerConnectionInfo = None 109 110 # it's possible we don't have a feed server, if we are 111 # configured to have 0 tcp ports; setup this in listen() 112 self.feedServer = None 113 114 self.stopping = False 115 reactor.addSystemEventTrigger('before', 'shutdown', 116 self.shutdownHandler) 117 self._installHUPHandler()
118
119 - def _installHUPHandler(self):
120 def sighup(signum, frame): 121 if self._oldHUPHandler: 122 self.log('got SIGHUP, calling previous handler %r', 123 self._oldHUPHandler) 124 self._oldHUPHandler(signum, frame) 125 self.debug('telling kids about new log file descriptors') 126 self.jobHeaven.rotateChildLogFDs()
127 128 handler = signal.signal(signal.SIGHUP, sighup) 129 if handler == signal.SIG_DFL or handler == signal.SIG_IGN: 130 self._oldHUPHandler = None 131 else: 132 self._oldHUPHandler = handler
133
134 - def listen(self):
135 """ 136 Start listening on FeedServer (incoming eater requests) and 137 JobServer (through which we communicate with our children) ports 138 139 @returns: True if we successfully listened on both ports 140 """ 141 # set up feed server if we have the feederports for it 142 try: 143 self.feedServer = self._makeFeedServer() 144 except error.CannotListenError, e: 145 self.warning("Failed to listen on feed server port: %r", e) 146 return False 147 148 try: 149 self.jobHeaven.listen() 150 except error.CannotListenError, e: 151 self.warning("Failed to listen on job server port: %r", e) 152 return False 153 154 try: 155 self.checkHeaven.listen() 156 except error.CannotListenError, e: 157 self.warning("Failed to listen on check server port: %r", e) 158 return False 159 160 return True
161
162 - def _makeFeedServer(self):
163 """ 164 @returns: L{flumotion.worker.feedserver.FeedServer} 165 """ 166 port = None 167 if self.options.randomFeederports: 168 port = 0 169 elif not self.options.feederports: 170 self.info('Not starting feed server because no port is ' 171 'configured') 172 return None 173 else: 174 port = self.options.feederports[-1] 175 176 return feedserver.FeedServer(self, ProxyBouncer(self), port)
177
178 - def login(self, managerConnectionInfo):
179 self.managerConnectionInfo = managerConnectionInfo 180 self.medium.startConnecting(managerConnectionInfo)
181
182 - def callRemote(self, methodName, *args, **kwargs):
183 return self.medium.callRemote(methodName, *args, **kwargs)
184
185 - def shutdownHandler(self):
186 if self.stopping: 187 self.warning("Already shutting down, ignoring shutdown request") 188 return 189 190 self.info("Reactor shutting down, stopping jobHeaven") 191 self.stopping = True 192 193 l = [self.jobHeaven.shutdown(), self.checkHeaven.shutdown()] 194 if self.feedServer: 195 l.append(self.feedServer.shutdown()) 196 # Don't fire this other than from a callLater 197 return defer_call_later(defer.DeferredList(l))
198 199 ### These methods called by feed server
200 - def feedToFD(self, componentId, feedName, fd, eaterId):
201 """ 202 Called from the FeedAvatar to pass a file descriptor on to 203 the job running the component for this feeder. 204 205 @returns: whether the fd was successfully handed off to the component. 206 """ 207 if componentId not in self.jobHeaven.avatars: 208 self.warning("No such component %s running", componentId) 209 return False 210 211 avatar = self.jobHeaven.avatars[componentId] 212 return avatar.sendFeed(feedName, fd, eaterId)
213
214 - def eatFromFD(self, componentId, eaterAlias, fd, feedId):
215 """ 216 Called from the FeedAvatar to pass a file descriptor on to 217 the job running the given component. 218 219 @returns: whether the fd was successfully handed off to the component. 220 """ 221 if componentId not in self.jobHeaven.avatars: 222 self.warning("No such component %s running", componentId) 223 return False 224 225 avatar = self.jobHeaven.avatars[componentId] 226 return avatar.receiveFeed(eaterAlias, fd, feedId)
227 228 ### these methods called by WorkerMedium
229 - def getPorts(self):
230 return self.ports, self.options.randomFeederports
231
232 - def getFeedServerPort(self):
233 if self.feedServer: 234 return self.feedServer.getPortNum() 235 else: 236 return None
237
238 - def create(self, avatarId, type, moduleName, methodName, nice, 239 conf):
240 def getBundles(): 241 # set up bundles as we need to have a pb connection to 242 # download the modules -- can't do that in the kid yet. 243 moduleNames = [moduleName] 244 for plugs in conf.get('plugs', {}).values(): 245 for plug in plugs: 246 for entry in plug.get('entries', {}).values(): 247 moduleNames.append(entry['module-name']) 248 self.debug('setting up bundles for %r', moduleNames) 249 return self.medium.bundleLoader.getBundles(moduleName=moduleNames)
250 251 def spawnJob(bundles): 252 return self.jobHeaven.spawn(avatarId, type, moduleName, 253 methodName, nice, bundles, conf) 254 255 def createError(failure): 256 failure.trap(errors.ComponentCreateError) 257 self.debug('create deferred for %s failed, forwarding error', 258 avatarId) 259 return failure 260 261 def success(res): 262 self.debug('create deferred for %s succeeded (%r)', 263 avatarId, res) 264 return res 265 266 self.info('Starting component "%s" of type "%s"', avatarId, 267 type) 268 d = getBundles() 269 d.addCallback(spawnJob) 270 d.addCallback(success) 271 d.addErrback(createError) 272 return d 273
274 - def runCheck(self, module, function, *args, **kwargs):
275 def getBundles(): 276 self.debug('setting up bundles for %s', module) 277 return self.medium.bundleLoader.getBundles(moduleName=module)
278 279 def runCheck(bundles): 280 return self.checkHeaven.runCheck(bundles, module, function, 281 *args, **kwargs) 282 283 d = getBundles() 284 d.addCallback(runCheck) 285 return d 286
287 - def getComponents(self):
288 return [job.avatarId for job in self.jobHeaven.getJobInfos()]
289
290 - def killJob(self, avatarId, signum):
291 self.jobHeaven.killJob(avatarId, signum)
292