Package flumotion :: Package common :: Module medium
[hide private]

Source Code for Module flumotion.common.medium

  1  # -*- Mode: Python -*- 
  2  # vi:si:et:sw=4:sts=4:ts=4 
  3  # 
  4  # Flumotion - a streaming media server 
  5  # Copyright (C) 2004,2005,2006,2007 Fluendo, S.L. (www.fluendo.com). 
  6  # All rights reserved. 
  7   
  8  # This file may be distributed and/or modified under the terms of 
  9  # the GNU General Public License version 2 as published by 
 10  # the Free Software Foundation. 
 11  # This file is distributed without any warranty; without even the implied 
 12  # warranty of merchantability or fitness for a particular purpose. 
 13  # See "LICENSE.GPL" in the source distribution for more information. 
 14   
 15  # Licensees having purchased or holding a valid Flumotion Advanced 
 16  # Streaming Server license may use this file in accordance with the 
 17  # Flumotion Advanced Streaming Server Commercial License Agreement. 
 18  # See "LICENSE.Flumotion" in the source distribution for more information. 
 19   
 20  # Headers in this file shall remain intact. 
 21   
 22  """base classes for PB client-side mediums. 
 23  """ 
 24   
 25  import time 
 26   
 27  from twisted.spread import pb 
 28  from twisted.internet import defer, reactor 
 29  from zope.interface import implements 
 30   
 31  from flumotion.common import log, interfaces, bundleclient, errors 
 32  from flumotion.common import messages 
 33  from flumotion.common.netutils import addressGetHost 
 34  from flumotion.configure import configure 
 35  from flumotion.twisted import pb as fpb 
 36   
 37  __version__ = "$Rev: 6964 $" 
 38   
 39   
class BaseMedium(fpb.Referenceable):
    """
    I am a base interface for PB clients interfacing with PB server-side
    avatars.
    Used by admin/worker/component to talk to manager's vishnu,
    and by job to talk to worker's brain.

    @ivar remote:       a remote reference to the server-side object on
                        which perspective_(methodName) methods can be called
    @type remote:       L{twisted.spread.pb.RemoteReference}
    @ivar bundleLoader: loader created in L{setRemoteReference}; used by
                        L{getBundledFunction} to load modules
    @type bundleLoader: L{flumotion.common.bundleclient.BundleLoader}
    """

    # subclasses will need to set this to the specific medium type
    # tho...
    implements(interfaces.IMedium)
    logCategory = "basemedium"
    remoteLogName = "baseavatar"

    # both stay None until setRemoteReference() is called; remote is reset
    # to None again when the reference reports a disconnect
    remote = None
    bundleLoader = None

    def setRemoteReference(self, remoteReference):
        """
        Set the given remoteReference as the reference to the server-side
        avatar.

        Also registers a disconnect notification that resets L{remote} to
        None, creates the bundle loader, and logs the local and peer
        addresses of the connection when they can be determined.

        @param remoteReference: L{twisted.spread.pb.RemoteReference}
        """
        self.debug('%r.setRemoteReference: %r', self, remoteReference)
        self.remote = remoteReference

        def nullRemote(x):
            self.debug('%r: disconnected from %r', self, self.remote)
            self.remote = None
        self.remote.notifyOnDisconnect(nullRemote)

        self.bundleLoader = bundleclient.BundleLoader(self.callRemote)

        # figure out connection addresses if it's an internet address
        tarzan = None
        jane = None
        try:
            transport = remoteReference.broker.transport
            tarzan = transport.getHost()
            jane = transport.getPeer()
        except Exception as e:
            # deliberately best-effort: the addresses are only used for
            # the debug message below.  The "as" spelling parses on
            # Python 2.6+ and 3.x alike.
            self.debug("could not get connection info, reason %r", e)
        if tarzan and jane:
            self.debug("connection is from me on %s to remote on %s",
                       addressGetHost(tarzan),
                       addressGetHost(jane))

    def hasRemoteReference(self):
        """
        Does the medium have a remote reference to a server-side avatar ?

        @rtype: bool
        """
        # identity test on purpose: "!= None" would go through whatever
        # comparison the remote reference object implements
        return self.remote is not None

    def callRemoteLogging(self, level, stackDepth, name, *args, **kwargs):
        """
        Call the given method with the given arguments remotely on the
        server-side avatar.

        Gets serialized to server-side perspective_ methods.

        When level is None nothing is logged for the call or its result.
        When there is no remote reference, a warning is logged and an
        already-failed deferred is returned instead of making the call.

        @param level:      the level we should log at (log.DEBUG, log.INFO,
                           etc), or None to not log the call
        @type  level:      int
        @param stackDepth: the number of stack frames to go back to get
                           file and line information, negative or zero.
        @type  stackDepth: non-positive int
        @param name:       name of the remote method
        @type  name:       str

        @returns: a deferred for the result of the remote call; it fails
                  with L{flumotion.common.errors.NotConnectedError} when
                  we are disconnected
        @rtype:   L{twisted.internet.defer.Deferred}
        """
        if level is not None:
            debugClass = self.__class__.__name__.upper()
            startArgs = [self.remoteLogName, debugClass, name]
            fmt, debugArgs = log.getFormatArgs(
                '%s --> %s: callRemote(%s, ', startArgs,
                ')', (), args, kwargs)
            # one more frame is subtracted for the doLog call made from
            # here; the value doLog returns is fed back as keyword
            # arguments when the result is logged below
            logKwArgs = self.doLog(level, stackDepth - 1,
                                   fmt, *debugArgs)

        if not self.remote:
            self.warning('Tried to callRemote(%s), but we are disconnected',
                         name)
            return defer.fail(errors.NotConnectedError())

        # callback/errback read startArgs and logKwArgs, which only exist
        # when level is not None; they are only attached in that case
        def callback(result):
            fmt, debugArgs = log.getFormatArgs(
                '%s <-- %s: callRemote(%s, ', startArgs,
                '): %s', (log.ellipsize(result), ), args, kwargs)
            self.doLog(level, -1, fmt, *debugArgs, **logKwArgs)
            return result

        def errback(failure):
            fmt, debugArgs = log.getFormatArgs(
                '%s <-- %s: callRemote(%s, ', startArgs,
                '): %r', (failure, ), args, kwargs)
            self.doLog(level, -1, fmt, *debugArgs, **logKwArgs)
            return failure

        d = self.remote.callRemote(name, *args, **kwargs)
        if level is not None:
            d.addCallbacks(callback, errback)
        return d

    def callRemote(self, name, *args, **kwargs):
        """
        Call the given method with the given arguments remotely on the
        server-side avatar, logging the call at log.DEBUG level.

        Gets serialized to server-side perspective_ methods.

        @param name: name of the remote method
        @type  name: str

        @returns: whatever L{callRemoteLogging} returns for this call
        """
        return self.callRemoteLogging(log.DEBUG, -1, name, *args,
                                      **kwargs)

    def getBundledFunction(self, module, function):
        """
        Returns the given function in the given module, loading the
        module from a bundle.

        If we can't find the bundle for the given module, or if the
        given module does not contain the requested function, we will
        raise L{flumotion.common.errors.RemoteRunError} (perhaps a
        poorly chosen error). If importing the module raises an
        exception, that exception will be passed through unmodified.

        @param module:   module the function lives in
        @type  module:   str
        @param function: function to run
        @type  function: str

        @returns: the result of the bundle loader's loadModule() with
                  callbacks added; it fires with a callable, the given
                  function in the given module.
        """
        def gotModule(mod):
            if not hasattr(mod, function):
                msg = 'No procedure named %s in module %s' % (function,
                                                              module)
                self.warning('%s', msg)
                raise errors.RemoteRunError(msg)
            return getattr(mod, function)

        def gotModuleError(failure):
            # only a missing bundle is translated; trap() re-raises any
            # other failure so it passes through unmodified
            failure.trap(errors.NoBundleError)
            msg = 'Failed to find bundle for module %s' % module
            self.warning('%s', msg)
            raise errors.RemoteRunError(msg)

        d = self.bundleLoader.loadModule(module)
        d.addCallbacks(gotModule, gotModuleError)
        return d

    def runBundledFunction(self, module, function, *args, **kwargs):
        """
        Runs the given function in the given module with the given
        arguments.

        This method calls getBundledFunction and then invokes the
        function. Any error raised by getBundledFunction or by invoking
        the function will be passed through unmodified.

        Callers that expect to return their result over a PB connection
        should catch nonserializable exceptions so as to prevent nasty
        backtraces in the logs.

        @param module:   module the function lives in
        @type  module:   str
        @param function: function to run
        @type  function: str

        @returns: the deferred chain started by getBundledFunction; it
                  fires with the return value of the given function in
                  the module.
        """
        self.debug('runBundledFunction(%r, %r)', module, function)

        def gotFunction(proc):

            def invocationError(failure):
                # log and hand the failure on unchanged
                self.warning('Exception raised while calling '
                             '%s.%s(*args=%r, **kwargs=%r): %s',
                             module, function, args, kwargs,
                             log.getFailureMessage(failure))
                return failure

            self.debug('calling %s.%s(%r, %r)', module, function, args,
                       kwargs)
            # maybeDeferred so that proc may either return a plain value
            # or a deferred, and so that a raised exception becomes a
            # failure
            d = defer.maybeDeferred(proc, *args, **kwargs)
            d.addErrback(invocationError)
            return d

        d = self.getBundledFunction(module, function)
        d.addCallback(gotFunction)
        return d
class PingingMedium(BaseMedium):
    """
    I am a medium that periodically calls 'ping' on the remote reference
    and invokes a disconnect callable when no ping reply has been seen
    for longer than the check interval.

    Pinging starts as soon as a remote reference is set and stops when
    that reference reports a disconnect.
    """
    # seconds between two pings
    _pingInterval = configure.heartbeatInterval
    # seconds between two checks for a recent ping reply; also the maximum
    # age of the last reply before the disconnect callable is invoked
    _pingCheckInterval = configure.heartbeatInterval * 2.5
    # pending delayed calls for _ping and _pingCheck, None when not scheduled.
    # _pingCheckDC needs a class-level default just like _pingDC, otherwise
    # stopPinging() raises AttributeError when called before startPinging()
    _pingDC = None
    _pingCheckDC = None

    def startPinging(self, disconnect):
        """
        Start pinging the remote side and checking for ping replies.

        Does nothing apart from resetting the time of the last ping reply
        when pinging is already in progress.

        @param disconnect: a method to call when we do not get ping replies
        @type  disconnect: callable
        """
        self.debug('startPinging')
        self._lastPingback = time.time()
        if self._pingDC:
            self.debug("Cannot start pinging, already pinging")
            return
        self._pingDisconnect = disconnect
        self._ping()
        self._pingCheck()

    def _ping(self):
        """
        Send one ping if we have a remote reference, then schedule the
        next call of this method after the ping interval.
        """
        def pingback(result):
            self._lastPingback = time.time()
            self.log('pinged, pingback at %r', self._lastPingback)

        def pingFailed(failure):
            # ignoring the connection failures so they don't end up in
            # the logs - we'll notice the lack of pingback eventually
            failure.trap(pb.PBConnectionLost)
            self.log('ping failed: %s', log.getFailureMessage(failure))

        if self.remote:
            self.log('pinging')
            d = self.callRemoteLogging(log.LOG, 0, 'ping')
            d.addCallbacks(pingback, pingFailed)
        else:
            self.info('tried to ping, but disconnected yo')

        # rescheduled unconditionally; only stopPinging() ends the cycle
        self._pingDC = reactor.callLater(self._pingInterval,
                                         self._ping)

    def _pingCheck(self):
        """
        Invoke the disconnect callable when we have a remote reference
        but the last ping reply is older than the check interval;
        otherwise schedule the next check.
        """
        self._pingCheckDC = None
        if (self.remote and
            (time.time() - self._lastPingback > self._pingCheckInterval)):
            self.info('no pingback in %f seconds, closing connection',
                      self._pingCheckInterval)
            # note: no further check is scheduled on this path
            self._pingDisconnect()
        else:
            self._pingCheckDC = reactor.callLater(self._pingCheckInterval,
                                                  self._pingCheck)

    def stopPinging(self):
        """
        Cancel the pending ping and ping check calls, if any.
        """
        if self._pingCheckDC:
            self._pingCheckDC.cancel()
        self._pingCheckDC = None

        if self._pingDC:
            self._pingDC.cancel()
        self._pingDC = None

    def _disconnect(self):
        """
        Close the transport of the remote reference, if we have one.
        This is the disconnect callable handed to L{startPinging}.
        """
        if self.remote:
            self.remote.broker.transport.loseConnection()

    def setRemoteReference(self, remote):
        """
        Set the remote reference as in L{BaseMedium.setRemoteReference},
        then start pinging it; pinging is stopped again when the
        reference reports a disconnect.

        @param remote: L{twisted.spread.pb.RemoteReference}
        """
        BaseMedium.setRemoteReference(self, remote)

        def stopPingingCb(x):
            self.debug('stop pinging')
            self.stopPinging()
        self.remote.notifyOnDisconnect(stopPingingCb)

        self.startPinging(self._disconnect)

    def remote_writeFluDebugMarker(self, level, marker):
        """
        Sets a marker that will be prefixed to the log strings. Setting this
        marker to multiple elements at a time helps debugging.

        @param level:  passed on to writeMarker as its second argument;
                       presumably the log level to write the marker at
        @param marker: A string to prefix all the log strings.
        @type  marker: str
        """
        self.writeMarker(marker, level)
313