1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 """base classes for PB client-side mediums.
23 """
24
25 import time
26
27 from twisted.spread import pb
28 from twisted.internet import defer, reactor
29 from zope.interface import implements
30
31 from flumotion.common import log, interfaces, bundleclient, errors
32 from flumotion.common import messages
33 from flumotion.common.netutils import addressGetHost
34 from flumotion.configure import configure
35 from flumotion.twisted import pb as fpb
36
37 __version__ = "$Rev: 6964 $"
38
39
41 """
42 I am a base interface for PB clients interfacing with PB server-side
43 avatars.
44 Used by admin/worker/component to talk to manager's vishnu,
45 and by job to talk to worker's brain.
46
47 @ivar remote: a remote reference to the server-side object on
48 which perspective_(methodName) methods can be called
49 @type remote: L{twisted.spread.pb.RemoteReference}
50 @type bundleLoader: L{flumotion.common.bundleclient.BundleLoader}
51 """
52
53
54
55 implements(interfaces.IMedium)
56 logCategory = "basemedium"
57 remoteLogName = "baseavatar"
58
59 remote = None
60 bundleLoader = None
61
def setRemoteReference(self, remoteReference):
    """
    Set the given remoteReference as the reference to the server-side
    avatar.

    Besides storing the reference, this registers a disconnect
    notification that resets the stored reference to None, creates the
    bundle loader used to fetch code through this medium, and logs the
    local and remote addresses of the connection when they can be
    determined.

    @param remoteReference: the reference to the server-side avatar
    @type  remoteReference: L{twisted.spread.pb.RemoteReference}
    """
    self.debug('%r.setRemoteReference: %r' % (self, remoteReference))
    self.remote = remoteReference

    def nullRemote(x):
        # called on disconnect; forget the now-dead reference
        self.debug('%r: disconnected from %r' % (self, self.remote))
        self.remote = None
    self.remote.notifyOnDisconnect(nullRemote)

    self.bundleLoader = bundleclient.BundleLoader(self.callRemote)

    # Logging the connection endpoints is best effort only: any failure
    # while digging them out of the transport is logged and ignored.
    tarzan = None
    jane = None
    try:
        transport = remoteReference.broker.transport
        tarzan = transport.getHost()
        jane = transport.getPeer()
    except Exception as e:
        self.debug("could not get connection info, reason %r" % e)
    if tarzan and jane:
        self.debug("connection is from me on %s to remote on %s" % (
            addressGetHost(tarzan),
            addressGetHost(jane)))
91
def hasRemoteReference(self):
    """
    Does the medium have a remote reference to a server-side avatar ?

    @returns: True if a remote reference is currently set, False if not
    @rtype:   bool
    """
    # Identity test: unlike "!= None" this never dispatches to comparison
    # methods defined by the reference object itself.
    return self.remote is not None
97
def callRemoteLogging(self, level, stackDepth, name, *args, **kwargs):
    """
    Call the given method with the given arguments remotely on the
    server-side avatar.

    Gets serialized to server-side perspective_ methods.

    When a log level is given, the outgoing call and its eventual result
    or failure are logged at that level.

    @param level:      the level we should log at (log.DEBUG, log.INFO,
                       etc), or None to not log the call at all
    @type  level:      int
    @param stackDepth: the number of stack frames to go back to get
                       file and line information, negative or zero.
    @type  stackDepth: non-positive int
    @param name:       name of the remote method
    @type  name:       str

    @returns: a deferred for the result of the remote call; when there
              is no remote reference, a deferred that has already failed
              with L{flumotion.common.errors.NotConnectedError}
    """
    if level is not None:
        # __name__ is the bare class name for both old-style and
        # new-style classes; parsing str(cls) leaves a trailing "'>"
        # on new-style classes.
        debugClass = self.__class__.__name__.upper()
        startArgs = [self.remoteLogName, debugClass, name]
        logFormat, debugArgs = log.getFormatArgs(
            '%s --> %s: callRemote(%s, ', startArgs,
            ')', (), args, kwargs)
        # go one frame further back than requested, presumably to skip
        # this method's own frame
        logKwArgs = self.doLog(level, stackDepth - 1,
                               logFormat, *debugArgs)

    if not self.remote:
        self.warning('Tried to callRemote(%s), but we are disconnected'
                     % name)
        return defer.fail(errors.NotConnectedError())

    # callback and errback are only attached when level is not None, so
    # startArgs and logKwArgs are always bound by the time they run.
    def callback(result):
        resultFormat, debugArgs = log.getFormatArgs(
            '%s <-- %s: callRemote(%s, ', startArgs,
            '): %s', (log.ellipsize(result), ), args, kwargs)
        self.doLog(level, -1, resultFormat, *debugArgs, **logKwArgs)
        return result

    def errback(failure):
        failureFormat, debugArgs = log.getFormatArgs(
            '%s <-- %s: callRemote(%s, ', startArgs,
            '): %r', (failure, ), args, kwargs)
        self.doLog(level, -1, failureFormat, *debugArgs, **logKwArgs)
        # hand the failure on so callers still see the error
        return failure

    d = self.remote.callRemote(name, *args, **kwargs)
    if level is not None:
        d.addCallbacks(callback, errback)
    return d
145
def callRemote(self, name, *args, **kwargs):
    """
    Invoke the named method on the server-side avatar, passing along the
    given positional and keyword arguments.

    The call ends up in the matching server-side perspective_ method and
    is logged at debug level.

    @param name: name of the remote method
    @type  name: str

    @returns: whatever callRemoteLogging returns for this call
    """
    level = log.DEBUG
    stackDepth = -1
    return self.callRemoteLogging(level, stackDepth, name,
                                  *args, **kwargs)
155
def getBundledFunction(self, module, function):
    """
    Look up a function by name in a module that is loaded from a bundle.

    The lookup fails with L{flumotion.common.errors.RemoteRunError}
    (perhaps a poorly chosen error) when no bundle can be found for the
    module, or when the loaded module has no attribute with the
    requested name. Any other failure from loading the module is passed
    through unmodified.

    @param module:   module the function lives in
    @type  module:   str
    @param function: function to look up
    @type  function: str

    @returns: a deferred firing with the requested attribute of the
              loaded module
    """
    def lookupFunction(loaded):
        # guard clause: bail out when the module lacks the attribute
        if not hasattr(loaded, function):
            msg = 'No procedure named %s in module %s' % (function,
                                                          module)
            self.warning('%s', msg)
            raise errors.RemoteRunError(msg)
        return getattr(loaded, function)

    def bundleMissing(failure):
        # only a missing bundle is translated; anything else propagates
        failure.trap(errors.NoBundleError)
        msg = 'Failed to find bundle for module %s' % module
        self.warning('%s', msg)
        raise errors.RemoteRunError(msg)

    loading = self.bundleLoader.loadModule(module)
    loading.addCallbacks(lookupFunction, bundleMissing)
    return loading
192
def runBundledFunction(self, module, function, *args, **kwargs):
    """
    Fetch a function from a bundled module and call it with the given
    arguments.

    The function is obtained through getBundledFunction and then
    invoked. Errors from either step are passed through unmodified; a
    failing invocation is additionally logged as a warning.

    Callers that expect to return their result over a PB connection
    should catch nonserializable exceptions so as to prevent nasty
    backtraces in the logs.

    @param module:   module the function lives in
    @type  module:   str
    @param function: function to run
    @type  function: str

    @returns: a deferred firing with the return value of the given
              function in the module.
    """
    self.debug('runBundledFunction(%r, %r)', module, function)

    def logInvocationFailure(failure):
        self.warning('Exception raised while calling '
                     '%s.%s(*args=%r, **kwargs=%r): %s',
                     module, function, args, kwargs,
                     log.getFailureMessage(failure))
        # keep the failure travelling down the errback chain
        return failure

    def invoke(proc):
        self.debug('calling %s.%s(%r, %r)', module, function, args,
                   kwargs)
        # maybeDeferred copes with both plain and deferred results
        invocation = defer.maybeDeferred(proc, *args, **kwargs)
        invocation.addErrback(logInvocationFailure)
        return invocation

    lookup = self.getBundledFunction(module, function)
    lookup.addCallback(invoke)
    return lookup
231
232
262
263 if self.remote:
264 self.log('pinging')
265 d = self.callRemoteLogging(log.LOG, 0, 'ping')
266 d.addCallbacks(pingback, pingFailed)
267 else:
268 self.info('tried to ping, but disconnected yo')
269
270 self._pingDC = reactor.callLater(self._pingInterval,
271 self._ping)
272
291
def _disconnect(self):
    """
    Drop the connection to the server-side avatar, if there is one.

    Does nothing when no remote reference is set.
    """
    remote = self.remote
    if not remote:
        return
    remote.broker.transport.loseConnection()
295
301 self.remote.notifyOnDisconnect(stopPingingCb)
302
303 self.startPinging(self._disconnect)
304
306 """
307 Sets a marker that will be prefixed to the log strings. Setting this
308 marker to multiple elements at a time helps debugging.
309 @param marker: A string to prefix all the log strings.
310 @type marker: str
311 """
312 self.writeMarker(marker, level)
313