1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 """
23 model abstraction for administration clients supporting different views
24 """
25
26 from twisted.internet import error, defer, reactor
27 from zope.interface import implements
28
29 from flumotion.common import common, errors, interfaces, log
30 from flumotion.common import medium
31 from flumotion.common import messages, signals
32 from flumotion.common import planet, worker
33 from flumotion.common.i18n import N_, gettexter
34 from flumotion.configure import configure
35 from flumotion.twisted import pb as fpb
36
37 __version__ = "$Rev: 6979 $"
38 T_ = gettexter()
39
40
42 perspectiveInterface = interfaces.IAdminMedium
43
44 - def __init__(self, medium, extraTenacious=False, maxDelay=20):
56
61
66
102
103
107
108 def error(failure):
109 if self.extraTenacious:
110 self.debug('connection problem to %s: %s',
111 self._connector.getDestination(),
112 log.getFailureMessage(failure))
113 self.debug('we are tenacious, so trying again later')
114 self.disconnect()
115 elif failure.check(errors.ConnectionFailedError):
116 self.debug("emitting connection-failed")
117 self.medium.emit('connection-failed', "I failed my master")
118 self.debug("emitted connection-failed")
119 elif failure.check(errors.ConnectionRefusedError):
120 self.debug("emitting connection-refused")
121 self.medium.emit('connection-refused')
122 self.debug("emitted connection-refused")
123 elif failure.check(errors.NotAuthenticatedError):
124
125 self.debug("emitting connection-refused")
126 self.medium.emit('connection-refused')
127 self.debug("emitted connection-refused")
128 else:
129 self.medium.emit('connection-error', failure)
130 self.warning('connection error to %s:: %s',
131 self._connector.getDestination(),
132 log.getFailureMessage(failure))
133
134
135 d.addCallbacks(success, error)
136 return d
137
138
139
140 -class AdminModel(medium.PingingMedium, signals.SignalMixin):
141 """
142 I live in the admin client.
143 I am a data model for any admin view implementing a UI to
144 communicate with one manager.
145 I send signals when things happen.
146
147 Manager calls on us through L{flumotion.manager.admin.AdminAvatar}
148 """
149 __signals__ = ('connected', 'disconnected', 'connection-refused',
150 'connection-failed', 'connection-error', 'reloading',
151 'message', 'update')
152
153 logCategory = 'adminmodel'
154
155 implements(interfaces.IAdminMedium)
156
157
158 planet = None
159
161
162 self.connectionInfo = None
163 self.keepTrying = None
164 self._writeConnection = True
165
166 self.managerId = '<uninitialized>'
167
168 self.connected = False
169 self.clientFactory = None
170
171 self._deferredConnect = None
172
173 self._components = {}
174 self.planet = None
175 self._workerHeavenState = None
176
177 - def connectToManager(self, connectionInfo, keepTrying=False,
178 writeConnection=True):
179 'Connect to a host.'
180 assert self.clientFactory is None
181
182 self.connectionInfo = connectionInfo
183 self._writeConnection = writeConnection
184
185
186
187
188 self.managerId = str(connectionInfo)
189 self.logName = self.managerId
190
191 self.info('Connecting to manager %s with %s',
192 self.managerId, connectionInfo.use_ssl and 'SSL' or 'TCP')
193
194 self.clientFactory = AdminClientFactory(self,
195 extraTenacious=keepTrying,
196 maxDelay=20)
197 self.clientFactory.startLogin(connectionInfo.authenticator)
198
199 if connectionInfo.use_ssl:
200 common.assertSSLAvailable()
201 from twisted.internet import ssl
202 reactor.connectSSL(connectionInfo.host, connectionInfo.port,
203 self.clientFactory, ssl.ClientContextFactory())
204 else:
205 reactor.connectTCP(connectionInfo.host, connectionInfo.port,
206 self.clientFactory)
207
208 def connected(model, d):
209
210 d.callback(model)
211
212 def disconnected(model, d):
213
214
215 if not keepTrying:
216 d.errback(errors.ConnectionFailedError('Lost connection'))
217
218 def connection_refused(model, d):
219 if not keepTrying:
220 d.errback(errors.ConnectionRefusedError())
221
222 def connection_failed(model, reason, d):
223 if not keepTrying:
224 d.errback(errors.ConnectionFailedError(reason))
225
226 def connection_error(model, failure, d):
227 if not keepTrying:
228 d.errback(failure)
229
230 d = defer.Deferred()
231 ids = []
232 ids.append(self.connect('connected', connected, d))
233 ids.append(self.connect('disconnected', disconnected, d))
234 ids.append(self.connect('connection-refused', connection_refused, d))
235 ids.append(self.connect('connection-failed', connection_failed, d))
236 ids.append(self.connect('connection-error', connection_error, d))
237
238 def success(model):
239 map(self.disconnect, ids)
240 self._deferredConnect = None
241 return model
242
243 def failure(f):
244 map(self.disconnect, ids)
245 self._deferredConnect = None
246 return f
247
248 d.addCallbacks(success, failure)
249 self._deferredConnect = d
250 return d
251
253 self.debug('shutting down')
254 if self.clientFactory is not None:
255
256
257 self.clientFactory.stopTrying()
258 self.clientFactory.disconnect()
259 self.clientFactory = None
260
261 if self._deferredConnect is not None:
262
263 self.debug('cancelling connection attempt')
264 self._deferredConnect.errback(errors.ConnectionCancelledError())
265
267 """Close any existing connection to the manager and
268 reconnect."""
269 self.debug('asked to log in again')
270 self.shutdown()
271 return self.connectToManager(self.connectionInfo, keepTrying)
272
273
275 return self.managerId
276
278 return '%s:%s (%s)' % (self.connectionInfo.host,
279 self.connectionInfo.port,
280 self.connectionInfo.use_ssl
281 and 'https' or 'http')
282
283
285 assert self.planet
286 return '%s (%s)' % (self.planet.get('name'), self.managerId)
287
303
305 self.debug("setRemoteReference %r", remoteReference)
306 def gotPlanetState(planet):
307 self.planet = planet
308
309 self.planet.admin = self
310 self.debug('got planet state')
311 return self.callRemote('getWorkerHeavenState')
312
313 def gotWorkerHeavenState(whs):
314 self._workerHeavenState = whs
315 self.debug('got worker state')
316
317 self.debug('Connected to manager and retrieved all state')
318 self.connected = True
319 if self._writeConnection:
320 writeConnection()
321 self.emit('connected')
322
323 def writeConnection():
324 i = self.connectionInfo
325 if not (i.authenticator.username
326 and i.authenticator.password):
327 self.log('not caching connection information')
328 return
329 s = ''.join(['<connection>',
330 '<host>%s</host>' % i.host,
331 '<manager>%s</manager>' % self.planet.get('name'),
332 '<port>%d</port>' % i.port,
333 '<use_insecure>%d</use_insecure>'
334 % ((not i.use_ssl) and 1 or 0),
335 '<user>%s</user>' % i.authenticator.username,
336 '<passwd>%s</passwd>' % i.authenticator.password,
337 '</connection>'])
338
339 import os
340 import md5
341 md5sum = md5.new(s).hexdigest()
342 f = os.path.join(configure.registrydir, '%s.connection' % md5sum)
343 try:
344 h = open(f, 'w')
345 h.write(s)
346 h.close()
347 except Exception, e:
348 self.info('failed to write connection cache file %s: %s',
349 f, log.getExceptionMessage(e))
350
351
352 medium.PingingMedium.setRemoteReference(self, remoteReference)
353
354
355 def remoteDisconnected(remoteReference):
356 self.debug("emitting disconnected")
357 self.connected = False
358 self.emit('disconnected')
359 self.debug("emitted disconnected")
360 self.remote.notifyOnDisconnect(remoteDisconnected)
361
362 d = self.callRemote('getPlanetState')
363 d.addCallback(gotPlanetState)
364 d.addCallback(gotWorkerHeavenState)
365 return d
366
367
368
369
372
373
375 """
376 Call the given method on the given component with the given args.
377
378 @param componentState: component to call the method on
379 @type componentState: L{flumotion.common.planet.AdminComponentState}
380 @param methodName: name of method to call; serialized to a
381 remote_methodName on the worker's medium
382
383 @rtype: L{twisted.internet.defer.Deferred}
384 """
385 d = self.callRemote('componentCallRemote',
386 componentState, methodName,
387 *args, **kwargs)
388 def errback(failure):
389 msg = None
390 if failure.check(errors.NoMethodError):
391 msg = "Remote method '%s' does not exist." % methodName
392 msg += "\n" + failure.value
393 else:
394 msg = log.getFailureMessage(failure)
395
396
397
398
399 self.warning(msg)
400 m = messages.Warning(T_(N_("Internal error in component.")),
401 debug=msg)
402 componentState.observe_append('messages', m)
403 return failure
404
405 d.addErrback(errback)
406
407 return d
408
410 """
411 Call the the given method on the given worker with the given args.
412
413 @param workerName: name of the worker to call the method on
414 @param methodName: name of method to call; serialized to a
415 remote_methodName on the worker's medium
416
417 @rtype: L{twisted.internet.defer.Deferred}
418 """
419 return self.callRemote('workerCallRemote', workerName,
420 methodName, *args, **kwargs)
421
422
424 return self.callRemote('loadConfiguration', xml_string)
425
428
431
432
435
438
439 - def workerRun(self, workerName, moduleName, functionName, *args, **kwargs):
440 """
441 Run the given function and args on the given worker. If the
442 worker does not already have the module, or it is out of date,
443 it will be retrieved from the manager.
444
445 @rtype: L{twisted.internet.defer.Deferred} firing an
446 L{flumotion.common.messages.Result}
447 """
448 return self.workerCallRemote(workerName, 'runFunction', moduleName,
449 functionName, *args, **kwargs)
450
452 return self.callRemote('getWizardEntries',
453 wizardTypes, provides, accepts)
454
456 return self._workerHeavenState
457