Package flumotion :: Package common :: Module medium
[hide private]

Source Code for Module flumotion.common.medium

  1  # -*- Mode: Python -*- 
  2  # vi:si:et:sw=4:sts=4:ts=4 
  3  # 
  4  # Flumotion - a streaming media server 
  5  # Copyright (C) 2004,2005,2006,2007 Fluendo, S.L. (www.fluendo.com). 
  6  # All rights reserved. 
  7   
  8  # This file may be distributed and/or modified under the terms of 
  9  # the GNU General Public License version 2 as published by 
 10  # the Free Software Foundation. 
 11  # This file is distributed without any warranty; without even the implied 
 12  # warranty of merchantability or fitness for a particular purpose. 
 13  # See "LICENSE.GPL" in the source distribution for more information. 
 14   
 15  # Licensees having purchased or holding a valid Flumotion Advanced 
 16  # Streaming Server license may use this file in accordance with the 
 17  # Flumotion Advanced Streaming Server Commercial License Agreement. 
 18  # See "LICENSE.Flumotion" in the source distribution for more information. 
 19   
 20  # Headers in this file shall remain intact. 
 21   
 22  """ 
 23  Contains the base class for PB client-side mediums. 
 24  """ 
 25   
 26  import time 
 27   
 28  from twisted.spread import pb 
 29  from twisted.internet import defer, reactor 
 30   
 31  from flumotion.twisted.defer import defer_generator_method 
 32  from flumotion.common import log, interfaces, bundleclient, errors, common 
 33  from flumotion.common import messages 
 34  from flumotion.configure import configure 
 35  from flumotion.twisted.compat import implements 
 36  from flumotion.twisted import pb as fpb 
 37   
38 -class BaseMedium(fpb.Referenceable):
39 """ 40 I am a base interface for PB clients interfacing with PB server-side 41 avatars. 42 Used by admin/worker/component to talk to manager's vishnu, 43 and by job to talk to worker's brain. 44 45 @ivar remote: a remote reference to the server-side object on 46 which perspective_(methodName) methods can be called 47 @type remote: L{twisted.spread.pb.RemoteReference} 48 @type bundleLoader: L{flumotion.common.bundleclient.BundleLoader} 49 """ 50 51 # subclasses will need to set this to the specific medium type 52 # tho... 53 implements(interfaces.IMedium) 54 logCategory = "basemedium" 55 remoteLogName = "baseavatar" 56 57 remote = None 58 bundleLoader = None 59
60 - def setRemoteReference(self, remoteReference):
61 """ 62 Set the given remoteReference as the reference to the server-side 63 avatar. 64 65 @param remoteReference: L{twisted.spread.pb.RemoteReference} 66 """ 67 self.debug('%r.setRemoteReference: %r' % (self, remoteReference)) 68 self.remote = remoteReference 69 def nullRemote(x): 70 self.debug('%r: disconnected from %r' % (self, self.remote)) 71 self.remote = None
72 self.remote.notifyOnDisconnect(nullRemote) 73 74 self.bundleLoader = bundleclient.BundleLoader(self.remote) 75 76 # figure out connection addresses if it's an internet address 77 tarzan = None 78 jane = None 79 try: 80 transport = remoteReference.broker.transport 81 tarzan = transport.getHost() 82 jane = transport.getPeer() 83 except Exception, e: 84 self.debug("could not get connection info, reason %r" % e) 85 if tarzan and jane: 86 self.debug("connection is from me on %s to manager on %s" % ( 87 common.addressGetHost(tarzan), 88 common.addressGetHost(jane)))
89
90 - def hasRemoteReference(self):
91 """ 92 Does the medium have a remote reference to a server-side avatar ? 93 """ 94 return self.remote != None
95
96 - def callRemoteLogging(self, level, stackDepth, name, *args, **kwargs):
97 """ 98 Call the given method with the given arguments remotely on the 99 server-side avatar. 100 101 Gets serialized to server-side perspective_ methods. 102 103 @param level: the level we should log at (log.DEBUG, log.INFO, etc) 104 @type level: int 105 @param stackDepth: the number of stack frames to go back to get 106 file and line information, negative or zero. 107 @type stackDepth: non-positive int 108 @param name: name of the remote method 109 @type name: str 110 """ 111 if level is not None: 112 debugClass = str(self.__class__).split(".")[-1].upper() 113 startArgs = [self.remoteLogName, debugClass, name] 114 format, debugArgs = log.getFormatArgs( 115 '%s --> %s: callRemote(%s, ', startArgs, 116 ')', (), args, kwargs) 117 logKwArgs = self.doLog(level, stackDepth - 1, 118 format, *debugArgs) 119 120 if not self.remote: 121 self.warning('Tried to callRemote(%s), but we are disconnected' 122 % name) 123 return defer.fail(errors.NotConnectedError()) 124 125 def callback(result): 126 format, debugArgs = log.getFormatArgs( 127 '%s <-- %s: callRemote(%s, ', startArgs, 128 '): %s', (log.ellipsize(result), ), args, kwargs) 129 self.doLog(level, -1, format, *debugArgs, **logKwArgs) 130 return result
131 132 def errback(failure): 133 format, debugArgs = log.getFormatArgs( 134 '%s <-- %s: callRemote(%s, ', startArgs, 135 '): %r', (failure, ), args, kwargs) 136 self.doLog(level, -1, format, *debugArgs, **logKwArgs) 137 return failure 138 139 d = self.remote.callRemote(name, *args, **kwargs) 140 if level is not None: 141 d.addCallbacks(callback, errback) 142 return d 143
144 - def callRemote(self, name, *args, **kwargs):
145 """ 146 Call the given method with the given arguments remotely on the 147 server-side avatar. 148 149 Gets serialized to server-side perspective_ methods. 150 """ 151 return self.callRemoteLogging(log.DEBUG, -1, name, *args, 152 **kwargs)
153
154 - def runBundledFunction(self, module, function, *args, **kwargs):
155 """ 156 Runs the given function in the given module with the given arguments. 157 158 If we can't find the bundle for the given module, or if the 159 given module does not contain the requested function, we will 160 raise L{flumotion.common.errors.RemoteRunError} (perhaps a 161 poorly chosen error). If importing the module or running the 162 function raises an exception, that exception will be passed 163 through unmodified. 164 165 Callers that expect to return their result over a PB connection 166 should catch nonserializable exceptions so as to prevent nasty 167 backtraces in the logs. 168 169 @param module: module the function lives in 170 @type module: str 171 @param function: function to run 172 @type function: str 173 174 @returns: the return value of the given function in the module. 175 """ 176 self.debug('remote runFunction(%r, %r)' % (module, function)) 177 d = self.bundleLoader.loadModule(module) 178 yield d 179 180 try: 181 mod = d.value() 182 except errors.NoBundleError: 183 msg = 'Failed to find bundle for module %s' % module 184 self.warning(msg) 185 raise errors.RemoteRunError(msg) 186 except Exception, e: 187 self.warning('Exception raised while loading bundle for ' 188 'module %s: %s', module, e) 189 raise 190 191 try: 192 proc = getattr(mod, function) 193 except AttributeError: 194 msg = 'No procedure named %s in module %s' % (function, module) 195 self.warning(msg) 196 raise errors.RemoteRunError(msg) 197 198 try: 199 self.debug('calling %s.%s(%r, %r)' % ( 200 module, function, args, kwargs)) 201 d = proc(*args, **kwargs) 202 except Exception, e: 203 self.warning('Exception raised while calling ' 204 '%s.%s(*args=%r, **kwargs=%r): %s', 205 module, function, args, kwargs, 206 log.getExceptionMessage(e)) 207 raise 208 209 # at this point, we have our result. it could be a value or a 210 # deferred. in the latter case we will need to yield again. 
211 yield d 212 213 # only if d was actually a deferred will we get here 214 try: 215 yield d.value() 216 except Exception, e: 217 self.warning('Deferred failure from ' 218 '%s.%s(*args=%r, **kwargs=%r): %s', 219 module, function, args, kwargs, 220 log.getExceptionMessage(e)) 221 raise
222 runBundledFunction = defer_generator_method(runBundledFunction) 223
class PingingMedium(BaseMedium):
    """
    I am a medium that periodically calls the remote 'ping' method on the
    server-side avatar, and that calls a disconnect function when no ping
    reply has been received for too long.

    @cvar _pingInterval:      seconds between two pings
    @cvar _pingCheckInterval: seconds between two checks for a ping reply;
                              also the maximum age of the last reply
                              before we disconnect
    """
    _pingInterval = configure.heartbeatInterval
    _pingCheckInterval = configure.heartbeatInterval * 2.5

    # pending delayed calls for _ping and _pingCheck; None when not scheduled
    _pingDC = None
    # defined at class level so that stopPinging() is safe to call even
    # when startPinging() has never run
    _pingCheckDC = None
    # callable set by startPinging()
    _pingDisconnect = None

    def startPinging(self, disconnect):
        """
        Start pinging the remote side and checking for ping replies.

        Does nothing, apart from resetting the time of the last reply,
        if a ping is already scheduled.

        @param disconnect: a method to call when we do not get ping replies
        @type  disconnect: callable
        """
        self.debug('startPinging')
        self._lastPingback = time.time()
        if self._pingDC:
            self.debug("Cannot start pinging, already pinging")
            return
        self._pingDisconnect = disconnect
        self._ping()
        self._pingCheck()

    def _ping(self):
        """
        Ping the remote side if we are connected, and schedule the next
        ping after _pingInterval seconds.
        """
        def pingback(result):
            self._lastPingback = time.time()
            self.log('pinged, pingback at %r' % self._lastPingback)

        def pingFailed(failure):
            # a ping that was in flight when the connection dropped fails
            # with PBConnectionLost; consume it so it does not remain an
            # unhandled error in the deferred. _pingCheck notices the
            # missing pingback by itself. Any other failure is re-raised
            # by trap().
            failure.trap(pb.PBConnectionLost)
            self.log('ping failed: %s' % failure.getErrorMessage())

        if self.remote:
            self.log('pinging')
            d = self.callRemoteLogging(log.LOG, 0, 'ping')
            d.addCallbacks(pingback, pingFailed)
        else:
            self.info('tried to ping, but disconnected yo')

        # keep pinging, even while disconnected, until stopPinging()
        self._pingDC = reactor.callLater(self._pingInterval,
                                         self._ping)

    def _pingCheck(self):
        """
        Call the disconnect function if we are connected and the last
        ping reply is older than _pingCheckInterval seconds; otherwise
        schedule the next check.
        """
        self._pingCheckDC = None
        if (self.remote and
            (time.time() - self._lastPingback > self._pingCheckInterval)):
            self.info('no pingback in %f seconds, closing connection',
                      self._pingCheckInterval)
            self._pingDisconnect()
        else:
            self._pingCheckDC = reactor.callLater(self._pingCheckInterval,
                                                  self._pingCheck)

    def stopPinging(self):
        """
        Cancel the scheduled ping and ping check, if any.
        """
        if self._pingCheckDC:
            self._pingCheckDC.cancel()
        self._pingCheckDC = None

        if self._pingDC:
            self._pingDC.cancel()
        self._pingDC = None

    def _disconnect(self):
        """
        Close the transport of the remote reference, if we have one.
        """
        if self.remote:
            self.remote.broker.transport.loseConnection()

    def setRemoteReference(self, remote):
        """
        Set the remote reference as in L{BaseMedium.setRemoteReference},
        start pinging, and arrange for pinging to stop on disconnect.

        @param remote: L{twisted.spread.pb.RemoteReference}
        """
        BaseMedium.setRemoteReference(self, remote)

        def stopPingingCb(x):
            self.debug('stop pinging')
            self.stopPinging()
        self.remote.notifyOnDisconnect(stopPingingCb)

        self.startPinging(self._disconnect)
