Package flumotion :: Package twisted :: Module pb
[hide private]

Source Code for Module flumotion.twisted.pb

  1  # -*- Mode: Python; test-case-name: flumotion.test.test_pb -*- 
  2  # vi:si:et:sw=4:sts=4:ts=4 
  3  # 
  4  # Flumotion - a streaming media server 
  5  # Copyright (C) 2004,2005,2006,2007 Fluendo, S.L. (www.fluendo.com). 
  6  # All rights reserved. 
  7   
  8  # This file may be distributed and/or modified under the terms of 
  9  # the GNU General Public License version 2 as published by 
 10  # the Free Software Foundation. 
 11  # This file is distributed without any warranty; without even the implied 
 12  # warranty of merchantability or fitness for a particular purpose. 
 13  # See "LICENSE.GPL" in the source distribution for more information. 
 14   
 15  # Licensees having purchased or holding a valid Flumotion Advanced 
 16  # Streaming Server license may use this file in accordance with the 
 17  # Flumotion Advanced Streaming Server Commercial License Agreement. 
 18  # See "LICENSE.Flumotion" in the source distribution for more information. 
 19   
 20  # Headers in this file shall remain intact. 
 21   
 22  """ 
 23  Flumotion Perspective Broker using keycards 
 24   
 25  Inspired by L{twisted.spread.pb} 
 26  """ 
 27   
 28  import time 
 29   
 30  from twisted.cred import checkers, credentials, error 
 31  from twisted.cred.portal import IRealm, Portal 
 32  from twisted.internet import protocol, defer, reactor 
 33  from twisted.python import log, reflect, failure 
 34  from twisted.spread import pb, flavors 
 35  from twisted.spread.pb import PBClientFactory 
 36   
 37  from flumotion.configure import configure 
 38  from flumotion.common import keycards, interfaces, common, errors 
 39  from flumotion.common import log as flog 
 40  from flumotion.twisted import reflect as freflect 
 41  from flumotion.twisted import credentials as fcredentials 
 42  from flumotion.twisted.compat import implements 
 43  # TODO: 
 44  #   merge FMCF back into twisted 
 45   
 46  ### Keycard-based FPB objects 
 47   
 48  # we made three changes to the standard PBClientFactory: 
 49  # 1) the root object has a getKeycardClasses() call that the server 
 50  #    uses to tell clients about the interfaces it supports 
 51  # 2) you can request a specific interface for the avatar to 
 52  #    implement, instead of only IPerspective 
 53  # 3) you send in a keycard, on which you can set a preference for an avatarId 
 54  # this way you can request a different avatarId than the user you authenticate 
 55  # with, or you can login without a username 
 56   
57 -class FPBClientFactory(pb.PBClientFactory, flog.Loggable):
58 """ 59 I am an extended Perspective Broker client factory using generic 60 keycards for login. 61 62 63 @ivar keycard: the keycard used last for logging in; set after 64 self.login has completed 65 @type keycard: L{keycards.Keycard} 66 @ivar medium: the client-side referenceable for the PB server 67 to call on, and for the client to call to the 68 PB server 69 @type medium: L{flumotion.common.medium.BaseMedium} 70 @ivar perspectiveInterface: the interface we want to request a perspective 71 for 72 @type perspectiveInterface: subclass of 73 L{flumotion.common.interfaces.IMedium} 74 """ 75 logCategory = "FPBClientFactory" 76 keycard = None 77 medium = None 78 perspectiveInterface = None # override in subclass 79
80 - def getKeycardClasses(self):
81 """ 82 Ask the remote PB server for all the keycard interfaces it supports. 83 84 @rtype: L{defer.Deferred} returning list of str 85 """ 86 def getRootObjectCb(root): 87 return root.callRemote('getKeycardClasses')
88 89 d = self.getRootObject() 90 d.addCallback(getRootObjectCb) 91 return d
92
93 - def login(self, authenticator):
94 """ 95 Login, respond to challenges, and eventually get perspective 96 from remote PB server. 97 98 Currently only credentials implementing IUsernamePassword are 99 supported. 100 101 @return: Deferred of RemoteReference to the perspective. 102 """ 103 assert authenticator, "I really do need an authenticator" 104 assert not isinstance(authenticator, keycards.Keycard) 105 interfaces = [] 106 if self.perspectiveInterface: 107 self.debug('perspectiveInterface is %r' % self.perspectiveInterface) 108 interfaces.append(self.perspectiveInterface) 109 else: 110 self.warning('No perspectiveInterface set on %r' % self) 111 if not pb.IPerspective in interfaces: 112 interfaces.append(pb.IPerspective) 113 interfaces = [reflect.qual(interface) 114 for interface in interfaces] 115 116 def getKeycardClassesCb(keycardClasses): 117 self.debug('supported keycard classes: %r' % keycardClasses) 118 d = authenticator.issue(keycardClasses) 119 return d
120 121 def issueCb(keycard): 122 self.keycard = keycard 123 self.debug('using keycard: %r' % self.keycard) 124 return self.keycard 125 126 d = self.getKeycardClasses() 127 d.addCallback(getKeycardClassesCb) 128 d.addCallback(issueCb) 129 d.addCallback(lambda r: self.getRootObject()) 130 d.addCallback(self._cbSendKeycard, authenticator, self.medium, 131 interfaces) 132 return d 133 134 # we are a different kind of PB client, so warn
135 - def _cbSendUsername(self, root, username, password, avatarId, client, interfaces):
136 self.warning("you really want to use cbSendKeycard")
137 138
139 - def _cbSendKeycard(self, root, authenticator, client, interfaces, count=0):
140 self.debug("_cbSendKeycard(root=%r, authenticator=%r, client=%r, " \ 141 "interfaces=%r, count=%d" % ( 142 root, authenticator, client, interfaces, count)) 143 count = count + 1 144 d = root.callRemote("login", self.keycard, client, *interfaces) 145 return d.addCallback(self._cbLoginCallback, root, authenticator, client, 146 interfaces, count)
147 148 # we can get either a keycard, None (?) or a remote reference
149 - def _cbLoginCallback(self, result, root, authenticator, client, interfaces, 150 count):
151 if count > 5: 152 # too many recursions, server is h0rked 153 self.warning('Too many recursions, internal error.') 154 self.debug("FPBClientFactory(): result %r" % result) 155 156 if not result: 157 self.warning('No result, raising.') 158 raise error.UnauthorizedLogin() 159 160 if isinstance(result, pb.RemoteReference): 161 # everything done, return reference 162 self.debug('Done, returning result %r' % result) 163 return result 164 165 # must be a keycard 166 keycard = result 167 if not keycard.state == keycards.AUTHENTICATED: 168 self.debug("FPBClientFactory(): requester needs to resend %r" % 169 keycard) 170 d = authenticator.respond(keycard) 171 def _loginAgainCb(keycard): 172 d = root.callRemote("login", keycard, client, *interfaces) 173 return d.addCallback(self._cbLoginCallback, root, authenticator, 174 client, interfaces, count)
175 d.addCallback(_loginAgainCb) 176 return d 177 178 self.debug("FPBClientFactory(): authenticated %r" % keycard) 179 return keycard 180
181 -class ReconnectingPBClientFactory(pb.PBClientFactory, flog.Loggable, 182 protocol.ReconnectingClientFactory):
183 """ 184 Reconnecting client factory for normal PB brokers. 185 186 Users of this factory call startLogin to start logging in, and should 187 override getLoginDeferred to get the deferred returned from the PB server 188 for each login attempt. 189 """ 190
191 - def __init__(self):
192 pb.PBClientFactory.__init__(self) 193 self._doingLogin = False
194
195 - def clientConnectionFailed(self, connector, reason):
196 log.msg("connection failed, reason %r" % reason) 197 pb.PBClientFactory.clientConnectionFailed(self, connector, reason) 198 RCF = protocol.ReconnectingClientFactory 199 RCF.clientConnectionFailed(self, connector, reason)
200
201 - def clientConnectionLost(self, connector, reason):
202 log.msg("connection lost, reason %r" % reason) 203 pb.PBClientFactory.clientConnectionLost(self, connector, reason, 204 reconnecting=True) 205 RCF = protocol.ReconnectingClientFactory 206 RCF.clientConnectionLost(self, connector, reason)
207
208 - def clientConnectionMade(self, broker):
209 log.msg("connection made") 210 self.resetDelay() 211 pb.PBClientFactory.clientConnectionMade(self, broker) 212 if self._doingLogin: 213 d = self.login(self._credentials, self._client) 214 self.gotDeferredLogin(d)
215
216 - def startLogin(self, credentials, client=None):
217 self._credentials = credentials 218 self._client = client 219 220 self._doingLogin = True
221 222 # methods to override
223 - def gotDeferredLogin(self, deferred):
224 """ 225 The deferred from login is now available. 226 """ 227 raise NotImplementedError
228
229 -class ReconnectingFPBClientFactory(FPBClientFactory, 230 protocol.ReconnectingClientFactory):
231 """ 232 Reconnecting client factory for FPB brokers (using keycards for login). 233 234 Users of this factory call startLogin to start logging in. 235 Override getLoginDeferred to get a handle to the deferred returned 236 from the PB server. 237 """ 238
239 - def __init__(self):
240 FPBClientFactory.__init__(self) 241 self._doingLogin = False 242 self._doingGetPerspective = False
243
244 - def clientConnectionFailed(self, connector, reason):
245 log.msg("connection failed, reason %r" % reason) 246 FPBClientFactory.clientConnectionFailed(self, connector, reason) 247 RCF = protocol.ReconnectingClientFactory 248 RCF.clientConnectionFailed(self, connector, reason)
249
250 - def clientConnectionLost(self, connector, reason):
251 log.msg("connection lost, reason %r" % reason) 252 FPBClientFactory.clientConnectionLost(self, connector, reason, 253 reconnecting=True) 254 RCF = protocol.ReconnectingClientFactory 255 RCF.clientConnectionLost(self, connector, reason)
256
257 - def clientConnectionMade(self, broker):
258 log.msg("connection made") 259 self.resetDelay() 260 FPBClientFactory.clientConnectionMade(self, broker) 261 if self._doingLogin: 262 d = self.login(self._authenticator) 263 self.gotDeferredLogin(d)
264 265 # TODO: This is a poorly named method; it just provides the appropriate 266 # authentication information, and doesn't actually _start_ login at all.
267 - def startLogin(self, authenticator):
268 assert not isinstance(authenticator, keycards.Keycard) 269 self._authenticator = authenticator 270 self._doingLogin = True
271 272 # methods to override
273 - def gotDeferredLogin(self, deferred):
274 """ 275 The deferred from login is now available. 276 """ 277 raise NotImplementedError
278 279 ### FIXME: this code is an adaptation of twisted/spread/pb.py 280 # it allows you to login to a FPB server requesting interfaces other than 281 # IPerspective. 282 # in other terms, you can request different "kinds" of avatars from the same 283 # PB server. 284 # this code needs to be sent upstream to Twisted
285 -class _FPortalRoot:
286 """ 287 Root object, used to login to bouncer. 288 """ 289 290 implements(flavors.IPBRoot) 291
292 - def __init__(self, bouncerPortal):
293 """ 294 @type bouncerPortal: L{flumotion.twisted.portal.BouncerPortal} 295 """ 296 self.bouncerPortal = bouncerPortal
297
298 - def rootObject(self, broker):
299 return _BouncerWrapper(self.bouncerPortal, broker)
300
301 -class _BouncerWrapper(pb.Referenceable, flog.Loggable):
302 303 logCategory = "_BouncerWrapper" 304
305 - def __init__(self, bouncerPortal, broker):
306 self.bouncerPortal = bouncerPortal 307 self.broker = broker
308
309 - def remote_getKeycardClasses(self):
310 """ 311 @returns: the fully-qualified class names of supported keycard 312 interfaces 313 @rtype: L{defer.Deferred} firing list of str 314 """ 315 return self.bouncerPortal.getKeycardClasses()
316
317 - def remote_login(self, keycard, mind, *interfaces):
318 """ 319 Start of keycard login. 320 321 @param interfaces: list of fully qualified names of interface objects 322 323 @returns: one of 324 - a L{flumotion.common.keycards.Keycard} when more steps 325 need to be performed 326 - a L{twisted.spread.pb.AsReferenceable} when authentication 327 has succeeded, which will turn into a 328 L{twisted.spread.pb.RemoteReference} on the client side 329 - a L{twisted.cred.error.UnauthorizedLogin} when authentication 330 is denied 331 """ 332 # corresponds with FPBClientFactory._cbSendKeycard 333 self.log("remote_login(keycard=%s, *interfaces=%r" % (keycard, interfaces)) 334 interfaces = [freflect.namedAny(interface) for interface in interfaces] 335 d = self.bouncerPortal.login(keycard, mind, *interfaces) 336 d.addCallback(self._authenticateCallback, mind, *interfaces) 337 return d
338
339 - def _authenticateCallback(self, result, mind, *interfaces):
340 self.log("_authenticateCallback(result=%r, mind=%r, interfaces=%r" % (result, mind, interfaces)) 341 # FIXME: coverage indicates that "not result" does not happen, 342 # presumably because a Failure is triggered before us 343 if not result: 344 return failure.Failure(error.UnauthorizedLogin()) 345 346 # if the result is a keycard, we're not yet ready 347 if isinstance(result, keycards.Keycard): 348 return result 349 350 # authenticated, so the result is the tuple 351 # FIXME: our keycard should be stored higher up since it was authd 352 # then cleaned up sometime in the future 353 # for that we probably need to pass it along 354 return self._loggedIn(result)
355
356 - def _loggedIn(self, (interface, perspective, logout)):
357 self.broker.notifyOnDisconnect(logout) 358 return pb.AsReferenceable(perspective, "perspective")
359
360 -class Authenticator(flog.Loggable, pb.Referenceable):
361 """ 362 I am an object used by FPB clients to create keycards for me 363 and respond to challenges. 364 365 I encapsulate keycard-related data, plus secrets which are used locally 366 and not put on the keycard. 367 368 I can be serialized over PB connections to a RemoteReference and then 369 adapted with RemoteAuthenticator to present the same interface. 370 371 @cvar username: a username to log in with 372 @type username: str 373 @cvar password: a password to log in with 374 @type password: str 375 @cvar address: an address to log in from 376 @type address: str 377 @cvar avatarId: the avatarId we want to request from the PB server 378 @type avatarId: str 379 """ 380 logCategory = "authenticator" 381 382 avatarId = None 383 384 username = None 385 password = None 386 address = None 387 # FIXME: we can add ssh keys and similar here later on 388
389 - def __init__(self, **kwargs):
390 for key in kwargs: 391 setattr(self, key, kwargs[key])
392
393 - def issue(self, keycardClasses):
394 """ 395 Issue a keycard that implements one of the given interfaces. 396 397 @param keycardClasses: list of fully qualified keycard classes 398 @type keycardClasses: list of str 399 400 @rtype: L{defer.Deferred} firing L{keycards.Keycard} 401 """ 402 # this method returns a deferred so we present the same interface 403 # as the RemoteAuthenticator adapter 404 405 # construct a list of keycard interfaces we can support right now 406 supported = [] 407 # address is allowed to be None 408 if self.username is not None and self.password is not None: 409 # We only want to support challenge-based keycards, for 410 # security. Maybe later we want this to be configurable 411 # supported.append(keycards.KeycardUACPP) 412 supported.append(keycards.KeycardUACPCC) 413 supported.append(keycards.KeycardUASPCC) 414 415 # expand to fully qualified names 416 supported = [reflect.qual(k) for k in supported] 417 418 for i in keycardClasses: 419 if i in supported: 420 self.debug('Keycard interface %s supported, looking up' % i) 421 name = i.split(".")[-1] 422 methodName = "issue_%s" % name 423 method = getattr(self, methodName) 424 keycard = method() 425 self.debug('Issuing keycard %r of class %s' % ( 426 keycard, name)) 427 keycard.avatarId = self.avatarId 428 return defer.succeed(keycard) 429 430 self.debug('Could not issue a keycard') 431 return defer.succeed(None)
432 433 # non-challenge types
434 - def issue_KeycardUACPP(self):
435 return keycards.KeycardUACPP(self.username, self.password, 436 self.address)
437 438 # challenge types
439 - def issue_KeycardUACPCC(self):
440 return keycards.KeycardUACPCC(self.username, self.address)
441
442 - def issue_KeycardUASPCC(self):
443 return keycards.KeycardUASPCC(self.username, self.address)
444
445 - def respond(self, keycard):
446 """ 447 Respond to a challenge on the given keycard, based on the secrets 448 we have. 449 450 @param keycard: the keycard with the challenge to respond to 451 @type keycard: L{keycards.Keycard} 452 453 @rtype: L{defer.Deferred} firing a {keycards.Keycard} 454 @returns: a deferred firing the keycard with a response set 455 """ 456 self.debug('responding to challenge on keycard %r' % keycard) 457 methodName = "respond_%s" % keycard.__class__.__name__ 458 method = getattr(self, methodName) 459 return defer.succeed(method(keycard))
460
461 - def respond_KeycardUACPCC(self, keycard):
462 self.debug('setting password') 463 keycard.setPassword(self.password) 464 return keycard
465
466 - def respond_KeycardUASPCC(self, keycard):
467 self.debug('setting password') 468 keycard.setPassword(self.password) 469 return keycard
470 471 ### pb.Referenceable methods
472 - def remote_issue(self, interfaces):
473 return self.issue(interfaces)
474
475 - def remote_respond(self, keycard):
476 return self.respond(keycard)
477
478 -class RemoteAuthenticator:
479 """ 480 I am an adapter for a pb.RemoteReference to present the same interface 481 as L{Authenticator} 482 """ 483 484 avatarId = None # not serialized 485 username = None # for convenience, will always be None 486 password = None # for convenience, will always be None 487
488 - def __init__(self, remoteReference):
489 self._remote = remoteReference
490
491 - def issue(self, interfaces):
492 def issueCb(keycard): 493 keycard.avatarId = self.avatarId 494 return keycard
495 496 d = self._remote.callRemote('issue', interfaces) 497 d.addCallback(issueCb) 498 return d
499
500 - def respond(self, keycard):
501 return self._remote.callRemote('respond', keycard)
502 503
504 -class Referenceable(pb.Referenceable, flog.Loggable):
505 """ 506 @cvar remoteLogName: name to use to log the other side of the connection 507 @type remoteLogName: str 508 """ 509 logCategory = 'referenceable' 510 remoteLogName = 'remote' 511 512 513 # a referenceable that logs receiving remote messages
514 - def remoteMessageReceived(self, broker, message, args, kwargs):
515 args = broker.unserialize(args) 516 kwargs = broker.unserialize(kwargs) 517 method = getattr(self, "remote_%s" % message, None) 518 if method is None: 519 raise pb.NoSuchMethod("No such method: remote_%s" % (message,)) 520 521 level = flog.DEBUG 522 if message == 'ping': level = flog.LOG 523 524 debugClass = self.logCategory.upper() 525 # all this malarkey is to avoid actually interpolating variables 526 # if it is not needed 527 startArgs = [self.remoteLogName, debugClass, message] 528 format, debugArgs = flog.getFormatArgs( 529 '%s --> %s: remote_%s(', startArgs, 530 ')', (), args, kwargs) 531 # log going into the method 532 logKwArgs = self.doLog(level, method, format, *debugArgs) 533 534 # invoke the remote_ method 535 try: 536 state = method(*args, **kwargs) 537 except TypeError: 538 self.warning("%s didn't accept %s and %s" % (method, args, kwargs)) 539 raise 540 541 # log coming out of the method 542 if isinstance(state, defer.Deferred): 543 # for a deferred, we currently can't log a better location than 544 # the def line for the function/instance that we called above 545 def callback(result): 546 format, debugArgs = flog.getFormatArgs( 547 '%s <-- %s: remote_%s(', startArgs, 548 '): %r', (flog.ellipsize(result), ), args, kwargs) 549 self.doLog(level, -1, format, *debugArgs, **logKwArgs) 550 return result
551 def errback(failure): 552 format, debugArgs = flog.getFormatArgs( 553 '%s <-- %s: remote_%s(', startArgs, 554 '): failure %r', (failure, ), args, kwargs) 555 self.doLog(level, -1, format, *debugArgs, **logKwArgs) 556 return failure
557 558 state.addCallback(callback) 559 state.addErrback(errback) 560 else: 561 format, debugArgs = flog.getFormatArgs( 562 '%s <-- %s: remote_%s(', startArgs, 563 '): %r', (flog.ellipsize(state), ), args, kwargs) 564 self.doLog(level, -1, format, *debugArgs, **logKwArgs) 565 566 return broker.serialize(state, self.perspective) 567
568 -class Avatar(pb.Avatar, flog.Loggable):
569 """ 570 @cvar remoteLogName: name to use to log the other side of the connection 571 @type remoteLogName: str 572 """ 573 logCategory = 'avatar' 574 remoteLogName = 'remote' 575
576 - def __init__(self, avatarId):
577 self.avatarId = avatarId 578 self.logName = avatarId 579 self.mind = None 580 self.debug("created new Avatar with id %s", avatarId)
581 582 # a referenceable that logs receiving remote messages
583 - def perspectiveMessageReceived(self, broker, message, args, kwargs):
584 args = broker.unserialize(args) 585 kwargs = broker.unserialize(kwargs) 586 method = getattr(self, "perspective_%s" % message, None) 587 if method is None: 588 raise pb.NoSuchMethod("No such method: perspective_%s" % (message,)) 589 590 level = flog.DEBUG 591 if message == 'ping': level = flog.LOG 592 debugClass = self.logCategory.upper() 593 startArgs = [self.remoteLogName, debugClass, message] 594 format, debugArgs = flog.getFormatArgs( 595 '%s --> %s: perspective_%s(', startArgs, 596 ')', (), args, kwargs) 597 # log going into the method 598 logKwArgs = self.doLog(level, method, format, *debugArgs) 599 600 # invoke the perspective_ method 601 try: 602 state = method(*args, **kwargs) 603 except TypeError: 604 self.debug("%s didn't accept %s and %s" % (method, args, kwargs)) 605 raise 606 except pb.Error, e: 607 format, debugArgs = flog.getFormatArgs( 608 '%s <-- %s: perspective_%s(', startArgs, 609 '): pb.Error %r', (e, ), args, kwargs) 610 self.doLog(level, -1, format, *debugArgs, **logKwArgs) 611 raise e 612 613 # log coming out of the method 614 if isinstance(state, defer.Deferred): 615 # for a deferred, we currently can't log a better location than 616 # the def line for the function/instance that we called above 617 def callback(result): 618 format, debugArgs = flog.getFormatArgs( 619 '%s <-- %s: perspective_%s(', startArgs, 620 '): %r', (flog.ellipsize(result), ), args, kwargs) 621 self.doLog(level, -1, format, *debugArgs, **logKwArgs) 622 return result
623 def errback(failure): 624 format, debugArgs = flog.getFormatArgs( 625 '%s <-- %s: perspective_%s(', startArgs, 626 '): failure %r', (failure, ), args, kwargs) 627 self.doLog(level, -1, format, *debugArgs, **logKwArgs) 628 return failure
629 630 state.addCallback(callback) 631 state.addErrback(errback) 632 else: 633 format, debugArgs = flog.getFormatArgs( 634 '%s <-- %s: perspective_%s(', startArgs, 635 '): %r', (flog.ellipsize(state), ), args, kwargs) 636 self.doLog(level, -1, format, *debugArgs, **logKwArgs) 637 638 return broker.serialize(state, self, method, args, kwargs) 639
640 - def setMind(self, mind):
641 """ 642 Tell the avatar that the given mind has been attached. 643 This gives the avatar a way to call remotely to the client that 644 requested this avatar. 645 This is scheduled by the portal after the client has logged in. 646 647 @type mind: L{twisted.spread.pb.RemoteReference} 648 """ 649 self.mind = mind 650 def nullMind(x): 651 self.debug('%r: disconnected from %r' % (self, self.mind)) 652 self.mind = None
653 self.mind.notifyOnDisconnect(nullMind) 654 655 transport = self.mind.broker.transport 656 tarzan = transport.getHost() 657 jane = transport.getPeer() 658 if tarzan and jane: 659 self.debug("PB client connection seen by me is from me %s to %s" % ( 660 common.addressGetHost(tarzan), 661 common.addressGetHost(jane))) 662 self.log('Client attached is mind %s', mind) 663
664 - def mindCallRemoteLogging(self, level, stackDepth, name, *args, 665 **kwargs):
666 """ 667 Call the given remote method, and log calling and returning nicely. 668 669 @param level: the level we should log at (log.DEBUG, log.INFO, etc) 670 @type level: int 671 @param stackDepth: the number of stack frames to go back to get 672 file and line information, negative or zero. 673 @type stackDepth: non-positive int 674 @param name: name of the remote method 675 @type name: str 676 """ 677 if level is not None: 678 debugClass = str(self.__class__).split(".")[-1].upper() 679 startArgs = [self.remoteLogName, debugClass, name] 680 format, debugArgs = flog.getFormatArgs( 681 '%s --> %s: callRemote(%s, ', startArgs, 682 ')', (), args, kwargs) 683 logKwArgs = self.doLog(level, stackDepth - 1, format, 684 *debugArgs) 685 686 if not self.mind: 687 self.warning('Tried to mindCallRemote(%s), but we are ' 688 'disconnected', name) 689 return defer.fail(errors.NotConnectedError()) 690 691 def callback(result): 692 format, debugArgs = flog.getFormatArgs( 693 '%s <-- %s: callRemote(%s, ', startArgs, 694 '): %r', (flog.ellipsize(result), ), args, kwargs) 695 self.doLog(level, -1, format, *debugArgs, **logKwArgs) 696 return result
697 698 def errback(failure): 699 format, debugArgs = flog.getFormatArgs( 700 '%s <-- %s: callRemote(%s', startArgs, 701 '): %r', (failure, ), args, kwargs) 702 self.doLog(level, -1, format, *debugArgs, **logKwArgs) 703 return failure 704 705 d = self.mind.callRemote(name, *args, **kwargs) 706 if level is not None: 707 d.addCallbacks(callback, errback) 708 return d 709
710 - def mindCallRemote(self, name, *args, **kwargs):
711 """ 712 Call the given remote method, and log calling and returning nicely. 713 714 @param name: name of the remote method 715 @type name: str 716 """ 717 return self.mindCallRemoteLogging(flog.DEBUG, -1, name, *args, 718 **kwargs)
719
720 - def disconnect(self):
721 """ 722 Disconnect the remote PB client. If we are already disconnected, 723 do nothing. 724 """ 725 if self.mind: 726 return self.mind.broker.transport.loseConnection()
727
728 -class PingableAvatar(Avatar):
729 _pingCheckInterval = configure.heartbeatInterval * 2.5 730
731 - def perspective_ping(self):
732 self._lastPing = time.time() 733 return defer.succeed(True)
734
735 - def startPingChecking(self, disconnect):
736 self._lastPing = time.time() 737 self._pingCheckDisconnect = disconnect 738 self._pingCheck()
739
740 - def _pingCheck(self):
741 self._pingCheckDC = None 742 if time.time() - self._lastPing > self._pingCheckInterval: 743 self.info('no ping in %f seconds, closing connection', 744 self._pingCheckInterval) 745 self._pingCheckDisconnect() 746 else: 747 self._pingCheckDC = reactor.callLater(self._pingCheckInterval, 748 self._pingCheck)
749
750 - def stopPingChecking(self):
751 if self._pingCheckDC: 752 self._pingCheckDC.cancel() 753 self._pingCheckDC = None
754
755 - def setMind(self, mind):
756 # chain up 757 Avatar.setMind(self, mind) 758 759 def stopPingCheckingCb(x): 760 self.debug('stop pinging') 761 self.stopPingChecking()
762 self.mind.notifyOnDisconnect(stopPingCheckingCb) 763 764 # Now we have a remote reference, so start checking pings. 765 def _disconnect(): 766 if self.mind: 767 self.mind.broker.transport.loseConnection()
768 self.startPingChecking(_disconnect) 769