1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 """
23 implementation of a PB Client to interface with feedserver.py
24 """
25
26 import socket
27 import os
28
29 from twisted.internet import reactor, main, defer, tcp
30 from twisted.python import failure
31 from zope.interface import implements
32
33 from flumotion.common import log, common, interfaces
34 from flumotion.twisted import pb as fpb
35
36 __version__ = "$Rev: 6561 $"
37
38
39
55
58
67
79
80
82 """
83 I am a client for a Feed Server.
84
85 I am used as the remote interface between a component and another
86 component.
87
88 @ivar component: the component this is a feed client for
89 @type component: L{flumotion.component.feedcomponent.FeedComponent}
90 @ivar remote: a reference to a
91 L{flumotion.worker.feedserver.FeedAvatar}
92 @type remote: L{twisted.spread.pb.RemoteReference}
93 """
94 logCategory = 'feedmedium'
95 remoteLogName = 'feedserver'
96 implements(interfaces.IFeedMedium)
97
98 remote = None
99
106
107 - def startConnecting(self, host, port, authenticator, timeout=30,
108 bindAddress=None):
109 """Optional helper method to connect to a remote feed server.
110
111 This method starts a client factory connecting via a
112 L{PassableClientConnector}. It offers the possibility of
113 cancelling an in-progress connection via the stopConnecting()
114 method.
115
116 @param host: the remote host name
117 @type host: str
118 @param port: the tcp port on which to connect
119 @param port int
120 @param authenticator: the authenticator, normally provided by
121 the worker
122 @type authenticator: L{flumotion.twisted.pb.Authenticator}
123
124 @returns: a deferred that will fire with the remote reference,
125 once we have authenticated.
126 """
127 assert self._factory is None
128 self._factory = FeedClientFactory(self)
129 reactor.connectWith(PassableClientConnector, host, port,
130 self._factory, timeout, bindAddress)
131 return self._factory.login(authenticator)
132
133 - def requestFeed(self, host, port, authenticator, fullFeedId):
134 """Request a feed from a remote feed server.
135
136 This helper method calls startConnecting() to make the
137 connection and authenticate, and will return the feed file
138 descriptor or an error. A pending connection attempt can be
139 cancelled via stopConnecting().
140
141 @param host: the remote host name
142 @type host: str
143 @param port: the tcp port on which to connect
144 @type port: int
145 @param authenticator: the authenticator, normally provided by
146 the worker
147 @type authenticator: L{flumotion.twisted.pb.Authenticator}
148 @param fullFeedId: the full feed id (/flow/component:feed)
149 offered by the remote side
150 @type fullFeedId: str
151
152 @returns: a deferred that, if successful, will fire with a pair
153 (feedId, fd). In an error case it will errback and close the
154 remote connection.
155 """
156 def connected(remote):
157 self.setRemoteReference(remote)
158 return remote.callRemote('sendFeed', fullFeedId)
159
160 def feedSent(res):
161
162
163
164
165 return self._feedToDeferred
166
167 def error(failure):
168 self.warning('failed to retrieve %s from %s:%d', fullFeedId,
169 host, port)
170 self.debug('failure: %s', log.getFailureMessage(failure))
171 self.debug('closing connection')
172 self.stopConnecting()
173 return failure
174
175 d = self.startConnecting(host, port, authenticator)
176 d.addCallback(connected)
177 d.addCallback(feedSent)
178 d.addErrback(error)
179 return d
180
181 - def sendFeed(self, host, port, authenticator, fullFeedId):
182 """Send a feed to a remote feed server.
183
184 This helper method calls startConnecting() to make the
185 connection and authenticate, and will return the feed file
186 descriptor or an error. A pending connection attempt can be
187 cancelled via stopConnecting().
188
189 @param host: the remote host name
190 @type host: str
191 @param port: the tcp port on which to connect
192 @type port: int
193 @param authenticator: the authenticator, normally provided by
194 the worker
195 @type authenticator: L{flumotion.twisted.pb.Authenticator}
196 @param fullFeedId: the full feed id (/flow/component:eaterAlias)
197 to feed to on the remote size
198 @type fullFeedId: str
199
200 @returns: a deferred that, if successful, will fire with a pair
201 (feedId, fd). In an error case it will errback and close the
202 remote connection.
203 """
204 def connected(remote):
205 assert isinstance(remote.broker.transport, _SocketMaybeCloser)
206 self.setRemoteReference(remote)
207 return remote.callRemote('receiveFeed', fullFeedId)
208
209 def feedSent(res):
210 t = self.remote.broker.transport
211 self.debug('stop reading from transport')
212 t.stopReading()
213
214 self.debug('flushing PB write queue')
215 t.doWrite()
216 self.debug('stop writing to transport')
217 t.stopWriting()
218
219 t.keepSocketAlive = True
220 fd = os.dup(t.fileno())
221
222
223 self.setRemoteReference(None)
224
225 d = defer.Deferred()
226 def loseConnection():
227 t.connectionLost(failure.Failure(main.CONNECTION_DONE))
228 d.callback((fullFeedId, fd))
229
230 reactor.callLater(0, loseConnection)
231 return d
232
233 def error(failure):
234 self.warning('failed to retrieve %s from %s:%d', fullFeedId,
235 host, port)
236 self.debug('failure: %s', log.getFailureMessage(failure))
237 self.debug('closing connection')
238 self.stopConnecting()
239 return failure
240
241 d = self.startConnecting(host, port, authenticator)
242 d.addCallback(connected)
243 d.addCallback(feedSent)
244 d.addErrback(error)
245 return d
246
248 """Stop a pending or established connection made via
249 startConnecting().
250
251 Stops any established or pending connection to a remote feed
252 server started via the startConnecting() method. Safe to call
253 even if connection has not been started.
254 """
255 if self._factory:
256 self._factory.disconnect()
257 self._factory = None
258
259
260 self.setRemoteReference(None)
261
262
264 self.remote = remoteReference
265
267 return self.remote is not None
268
271
273 t = self.remote.broker.transport
274
275 self.debug('stop reading from transport')
276 t.stopReading()
277 reactor.callLater(0, self._doFeedTo, fullFeedId, t)
278
304