Package flumotion :: Package worker :: Module medium
[hide private]

Source Code for Module flumotion.worker.medium

  1  # -*- Mode: Python; test-case-name:flumotion.test.test_worker_worker -*- 
  2  # vi:si:et:sw=4:sts=4:ts=4 
  3  # 
  4  # Flumotion - a streaming media server 
  5  # Copyright (C) 2004,2005,2006,2007 Fluendo, S.L. (www.fluendo.com). 
  6  # All rights reserved. 
  7   
  8  # This file may be distributed and/or modified under the terms of 
  9  # the GNU General Public License version 2 as published by 
 10  # the Free Software Foundation. 
 11  # This file is distributed without any warranty; without even the implied 
 12  # warranty of merchantability or fitness for a particular purpose. 
 13  # See "LICENSE.GPL" in the source distribution for more information. 
 14   
 15  # Licensees having purchased or holding a valid Flumotion Advanced 
 16  # Streaming Server license may use this file in accordance with the 
 17  # Flumotion Advanced Streaming Server Commercial License Agreement. 
 18  # See "LICENSE.Flumotion" in the source distribution for more information. 
 19   
 20  # Headers in this file shall remain intact. 
 21   
 22  """ 
 23  worker-side objects to handle worker clients 
 24  """ 
 25   
 26  import signal 
 27   
 28  from twisted.internet import reactor, error 
 29  from twisted.spread import flavors 
 30  from zope.interface import implements 
 31   
 32  from flumotion.common import errors, interfaces, debug 
 33  from flumotion.common import medium 
 34  from flumotion.common.vfs import listDirectory, registerVFSJelly 
 35  from flumotion.twisted.pb import ReconnectingFPBClientFactory 
 36   
 37  __version__ = "$Rev: 7038 $" 
 38  JOB_SHUTDOWN_TIMEOUT = 5 
 39   
 40   
41 -class WorkerClientFactory(ReconnectingFPBClientFactory):
42 """ 43 I am a client factory for the worker to log in to the manager. 44 """ 45 logCategory = 'worker' 46 perspectiveInterface = interfaces.IWorkerMedium 47
48 - def __init__(self, medium, host, port):
49 """ 50 @type medium: L{flumotion.worker.medium.WorkerMedium} 51 @type host: str 52 @type port: int 53 """ 54 self._managerHost = host 55 self._managerPort = port 56 self.medium = medium 57 # doing this as a class method triggers a doc error 58 ReconnectingFPBClientFactory.__init__(self) 59 # maximum 10 second delay for workers to attempt to log in again 60 self.maxDelay = 10
61
62 - def clientConnectionFailed(self, connector, reason):
63 """ 64 @param reason: L{twisted.spread.pb.failure.Failure} 65 """ 66 # this method exists so that we log the failure 67 ReconnectingFPBClientFactory.clientConnectionFailed(self, 68 connector, reason) 69 # delay is now updated 70 self.debug("failed to connect, will try to reconnect in %f seconds" % self.delay)
71 72 ### ReconnectingPBClientFactory methods
73 - def gotDeferredLogin(self, d):
74 # the deferred from the login is now available 75 # add some of our own to it 76 def remoteDisconnected(remoteReference): 77 if reactor.killed: 78 self.log('Connection to manager lost due to shutdown') 79 else: 80 self.warning('Lost connection to manager, ' 81 'will attempt to reconnect')
82 83 def loginCallback(reference): 84 self.info("Logged in to manager") 85 self.debug("remote reference %r" % reference) 86 87 self.medium.setRemoteReference(reference) 88 reference.notifyOnDisconnect(remoteDisconnected)
89 90 def alreadyConnectedErrback(failure): 91 failure.trap(errors.AlreadyConnectedError) 92 self.warning('A worker with the name "%s" is already connected.' % 93 failure.value) 94 95 def accessDeniedErrback(failure): 96 failure.trap(errors.NotAuthenticatedError) 97 self.warning('Access denied.') 98 99 def connectionRefusedErrback(failure): 100 failure.trap(error.ConnectionRefusedError) 101 self.warning('Connection to %s:%d refused.' % (self._managerHost, 102 self._managerPort)) 103 104 def NoSuchMethodErrback(failure): 105 failure.trap(flavors.NoSuchMethod) 106 # failure.value is a str 107 if failure.value.find('remote_getKeycardClasses') > -1: 108 self.warning( 109 "Manager %s:%d is older than version 0.3.0. " 110 "Please upgrade." % (self._managerHost, self._managerPort)) 111 return 112 113 return failure 114 115 def loginFailedErrback(failure): 116 self.warning('Login failed, reason: %s' % str(failure)) 117 118 d.addCallback(loginCallback) 119 d.addErrback(accessDeniedErrback) 120 d.addErrback(connectionRefusedErrback) 121 d.addErrback(alreadyConnectedErrback) 122 d.addErrback(NoSuchMethodErrback) 123 d.addErrback(loginFailedErrback) 124
125 -class WorkerMedium(medium.PingingMedium):
126 """ 127 I am a medium interfacing with the manager-side WorkerAvatar. 128 129 @ivar brain: the worker brain 130 @type brain: L{worker.WorkerBrain} 131 @ivar factory: the worker client factory 132 @type factory: L{WorkerClientFactory} 133 """ 134 135 logCategory = 'workermedium' 136 137 implements(interfaces.IWorkerMedium) 138
139 - def __init__(self, brain):
140 """ 141 @type brain: L{worker.WorkerBrain} 142 """ 143 self.brain = brain 144 self.factory = None 145 registerVFSJelly()
146
147 - def startConnecting(self, connectionInfo):
148 info = connectionInfo 149 150 self.factory = WorkerClientFactory(self, info.host, info.port) 151 self.factory.startLogin(info.authenticator) 152 153 if info.use_ssl: 154 from flumotion.common import common 155 common.assertSSLAvailable() 156 from twisted.internet import ssl 157 reactor.connectSSL(info.host, info.port, self.factory, 158 ssl.ClientContextFactory()) 159 else: 160 reactor.connectTCP(info.host, info.port, self.factory)
161
162 - def stopConnecting(self):
163 # only called by test suites 164 self.factory.disconnect() 165 self.factory.stopTrying()
166 167 ### pb.Referenceable method for the manager's WorkerAvatar
168 - def remote_getPorts(self):
169 """ 170 Gets the set of TCP ports that this worker is configured to use. 171 172 @rtype: 2-tuple: (list of int, bool) 173 @return: list of ports, and a boolean if we allocate ports 174 randomly 175 """ 176 return self.brain.getPorts()
177
178 - def remote_getFeedServerPort(self):
179 """ 180 Return the TCP port the Feed Server is listening on. 181 182 @rtype: int, or NoneType 183 @return: TCP port number, or None if there is no feed server 184 """ 185 return self.brain.getFeedServerPort()
186
187 - def remote_create(self, avatarId, type, moduleName, methodName, 188 nice, conf):
189 """ 190 Start a component of the given type with the given nice level. 191 Will spawn a new job process to run the component in. 192 193 @param avatarId: avatar identification string 194 @type avatarId: str 195 @param type: type of the component to create 196 @type type: str 197 @param moduleName: name of the module to create the component from 198 @type moduleName: str 199 @param methodName: the factory method to use to create the component 200 @type methodName: str 201 @param nice: nice level 202 @type nice: int 203 @param conf: component config 204 @type conf: dict 205 206 @returns: a deferred fired when the process has started and created 207 the component 208 """ 209 return self.brain.create(avatarId, type, moduleName, methodName, 210 nice, conf)
211
212 - def remote_checkElements(self, elementNames):
213 """ 214 Checks if one or more GStreamer elements are present and can be 215 instantiated. 216 217 @param elementNames: names of the Gstreamer elements 218 @type elementNames: list of str 219 220 @rtype: list of str 221 @returns: a list of instantiatable element names 222 """ 223 return self.brain.runCheck('flumotion.worker.checks.check', 224 'checkElements', elementNames)
225
226 - def remote_checkImport(self, moduleName):
227 """ 228 Checks if the given module can be imported. 229 230 @param moduleName: name of the module to check 231 @type moduleName: str 232 233 @returns: None or Failure 234 """ 235 return self.brain.runCheck('flumotion.worker.checks.check', 'checkImport', 236 moduleName)
237
238 - def remote_runCheck(self, module, function, *args, **kwargs):
239 """ 240 Runs the given function in the given module with the given arguments. 241 242 @param module: module the function lives in 243 @type module: str 244 @param function: function to run 245 @type function: str 246 247 @returns: the return value of the given function in the module. 248 """ 249 return self.brain.runCheck(module, function, *args, **kwargs)
250 remote_runFunction = remote_runCheck 251
252 - def remote_getComponents(self):
253 """ 254 I return a list of componentAvatarIds, I have. I am called by the 255 manager soon after I attach to it. This is needed on reconnects 256 so that the manager knows what components it needs to start on me. 257 258 @returns: a list of componentAvatarIds 259 """ 260 return self.brain.getComponents()
261
262 - def remote_killJob(self, avatarId, signum=signal.SIGKILL):
263 """Kill one of the worker's jobs. 264 265 This method is intended for exceptional purposes only; a normal 266 component shutdown is performed by the manager via calling 267 remote_stop() on the component avatar. 268 269 Raises L{flumotion.common.errors.UnknownComponentError} if the 270 job is unknown. 271 272 @param avatarId: the avatar Id of the component, e.g. 273 '/default/audio-encoder' 274 @type avatarId: string 275 @param signum: Signal to send, optional. Defaults to SIGKILL. 276 @type signum: int 277 """ 278 self.brain.killJob(avatarId, signum)
279
280 - def remote_getVersions(self):
281 return debug.getVersions()
282
283 - def remote_listDirectory(self, directoryName):
284 """List the directory called path 285 @returns: the directory 286 @rtype: deferred that will fire an object implementing L{IDirectory} 287 """ 288 return listDirectory(directoryName)
289