Package flumotion :: Package component :: Package misc :: Package porter :: Module porterclient
[hide private]

Source Code for Module flumotion.component.misc.porter.porterclient

  1  # -*- Mode: Python -*- 
  2  # vi:si:et:sw=4:sts=4:ts=4 
  3  # 
  4  # Flumotion - a streaming media server 
  5  # Copyright (C) 2004,2005,2006,2007 Fluendo, S.L. (www.fluendo.com). 
  6  # All rights reserved. 
  7   
  8  # This file may be distributed and/or modified under the terms of 
  9  # the GNU General Public License version 2 as published by 
 10  # the Free Software Foundation. 
 11  # This file is distributed without any warranty; without even the implied 
 12  # warranty of merchantability or fitness for a particular purpose. 
 13  # See "LICENSE.GPL" in the source distribution for more information. 
 14   
 15  # Licensees having purchased or holding a valid Flumotion Advanced 
 16  # Streaming Server license may use this file in accordance with the 
 17  # Flumotion Advanced Streaming Server Commercial License Agreement. 
 18  # See "LICENSE.Flumotion" in the source distribution for more information. 
 19   
 20  # Headers in this file shall remain intact. 
 21   
 22  from twisted.internet.protocol import Protocol, Factory 
 23  from twisted.internet.tcp import Port, Connection 
 24  from twisted.internet import reactor, address 
 25  from twisted.cred import credentials 
 26   
 27  from flumotion.common import medium, log 
 28  from flumotion.twisted import defer, fdserver 
 29  from flumotion.twisted import pb as fpb 
 30   
 31  import socket 
 32   
 33  __version__ = "$Rev: 6125 $" 
 34   
 35   
 36  # Very similar to tcp.Server, but we need to call things in a different order 
class FDPorterServer(Connection):
    """
    Connection wrapper for a socket whose file descriptor was handed to us.

    Comparable to tcp.Server, except that the socket is supplied by the
    caller, and a chunk of data that accompanied the original connection is
    delivered to the protocol as soon as the connection is set up.
    """

    def __init__(self, sock, protocol, addr, additionalData):
        """
        @param sock:           the already-connected socket to wrap
        @param protocol:       the protocol instance that will handle this
                               connection
        @param addr:           the peer address; stored as self.client and
                               used by getPeer()
        @param additionalData: data passed along with the connection; fed to
                               protocol.dataReceived() once setup is done
        """
        Connection.__init__(self, sock, protocol)
        self.client = addr

        # Tell the protocol that it is now attached to a transport.
        protocol.makeConnection(self)

        # The ordering below is deliberate. The extra data has to reach the
        # protocol before the reactor reads anything further from the socket.
        # If dataReceived() were called first and the protocol reacted by
        # closing the socket (or handing it to non-twisted code), we would
        # then call startReading() on something that had already stopped
        # reading. Because the reactor runs in this same thread, registering
        # for reads here cannot trigger an immediate read, so it is safe to
        # start reading first and deliver the extra data afterwards.
        self.startReading()
        self.connected = 1

        protocol.dataReceived(additionalData)

    def getHost(self):
        """
        Return an IPv4Address built from the local socket name.
        """
        hostArgs = self.socket.getsockname() + ('INET',)
        return address.IPv4Address('TCP', *hostArgs)

    def getPeer(self):
        """
        Return an IPv4Address built from the address given at construction.
        """
        peerArgs = self.client + ('INET',)
        return address.IPv4Address('TCP', *peerArgs)
68
class PorterMedium(medium.BaseMedium):
    """
    The medium through which we talk to the porter.

    Its main job is to announce which mountpoints (or possibly, at some later
    point, (hostname, mountpoint) pairs?) we expect to receive requests for.
    Every method forwards to the remote method of the same name and returns
    whatever callRemote() returns.
    """

    def registerPath(self, path):
        """
        Invoke the remote "registerPath" method with the given path.
        """
        return self.callRemote("registerPath", path)

    def deregisterPath(self, path):
        """
        Invoke the remote "deregisterPath" method with the given path.
        """
        return self.callRemote("deregisterPath", path)

    def registerPrefix(self, prefix):
        """
        Invoke the remote "registerPrefix" method with the given prefix.
        """
        return self.callRemote("registerPrefix", prefix)

    def deregisterPrefix(self, prefix):
        """
        Invoke the remote "deregisterPrefix" method with the given prefix.
        """
        return self.callRemote("deregisterPrefix", prefix)
86
class PorterClientFactory(fpb.ReconnectingPBClientFactory):
    """
    PB client factory that knows how to log in to a Porter.

    It lives inside streaming components and accepts the FDs that are passed
    to it over the porter connection.
    """

    def __init__(self, childFactory):
        """
        Set up a factory whose received FDs are served by childFactory.

        @param childFactory: factory used to create protocol instances for
                             the clients attached to the FDs received over
                             this connection
        """
        fpb.ReconnectingPBClientFactory.__init__(self)

        self._childFactory = childFactory
        self.protocol = fdserver.FDPassingBroker
        self.medium = PorterMedium()

    def buildProtocol(self, addr):
        """
        Create the broker protocol for a new connection to the porter.

        The broker is constructed with the child factory and the
        FDPorterServer connection class, and gets this factory assigned as
        its factory attribute. addr is not used.
        """
        broker = self.protocol(self._childFactory, FDPorterServer)
        broker.factory = self
        return broker

    def registerPath(self, path):
        """
        Register path with the porter through our medium.
        """
        return self.medium.registerPath(path)

    def deregisterPath(self, path):
        """
        Deregister path from the porter through our medium.
        """
        return self.medium.deregisterPath(path)

    def registerPrefix(self, prefix):
        """
        Register prefix with the porter through our medium.
        """
        return self.medium.registerPrefix(prefix)

    def deregisterPrefix(self, prefix):
        """
        Deregister prefix from the porter through our medium.
        """
        return self.medium.deregisterPrefix(prefix)

    def registerDefault(self):
        """
        Register the "/" prefix with the porter through our medium.
        """
        return self.medium.registerPrefix("/")

    def deregisterDefault(self):
        """
        Deregister the "/" prefix from the porter through our medium.
        """
        return self.medium.deregisterPrefix("/")
128
class HTTPPorterClientFactory(PorterClientFactory):
    """
    Porter client factory that registers a fixed set of mount points and
    prefixes with the porter each time a login completes.
    """

    def __init__(self, childFactory, mountPoints, do_start_deferred,
                 prefixes=None):
        """
        @param childFactory:      passed on to PorterClientFactory.__init__
        @param mountPoints:       a list of mountPoint strings that should be
                                  registered to the porter
        @param do_start_deferred: deferred that is fired (with None) once the
                                  first login and its registrations are done;
                                  may be a false value, in which case nothing
                                  is fired
        @param prefixes:          optional list of prefix strings that should
                                  be registered to the porter; a false value
                                  is treated as an empty list
        """
        PorterClientFactory.__init__(self, childFactory)
        self._mountPoints = mountPoints
        self._prefixes = prefixes or []
        self._do_start_deferred = do_start_deferred

    def _fireDeferred(self, r):
        """
        Fire the start deferred if we still hold it, then drop it.

        The deferred is only held until the _first_ login has completed, so
        on later logins this does nothing. r is ignored.
        """
        if not self._do_start_deferred:
            return

        self.debug("Firing initial deferred: should indicate that login is "
                   "complete")
        self._do_start_deferred.callback(None)
        self._do_start_deferred = None

    def gotDeferredLogin(self, deferred):
        """
        Attach the post-login work to the login deferred.

        Called when a login starts. Once the login succeeds the callback
        chain sets the remote reference on our medium, registers every mount
        point and every prefix with the porter, and finally (possibly) fires
        the start deferred.
        """

        def _registerMountPoint(result, path):
            # The previous callback's result is not needed here.
            return self.registerPath(path)

        def _registerMountPrefix(result, prefix):
            # The previous callback's result is not needed here.
            return self.registerPrefix(prefix)

        self.debug("Got deferred login, adding callbacks")
        deferred.addCallback(self.medium.setRemoteReference)

        for mountPoint in self._mountPoints:
            self.debug("Registering mount point %s with porter", mountPoint)
            deferred.addCallback(_registerMountPoint, mountPoint)

        for mountPrefix in self._prefixes:
            self.debug("Registering mount prefix %s with porter", mountPrefix)
            deferred.addCallback(_registerMountPrefix, mountPrefix)

        deferred.addCallback(self._fireDeferred)
166