1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 import time
23
24 import gst
25
26 from twisted.internet import reactor
27
28 from flumotion.common import componentui
29
30 __version__ = "$Rev: 7089 $"
31
32
class Feeder:
    """
    This class groups feeder-related information as used by a Feed Component.

    @ivar feederName:  name of the feeder
    @ivar elementName: 'feeder:' followed by the feeder name
    @ivar payName:     the element name followed by '-pay'
    @ivar uiState:     the serializable UI State for this feeder
    """

    def __init__(self, feederName):
        """
        @param feederName: name of the feeder
        """
        self.feederName = feederName
        self.elementName = 'feeder:' + feederName
        self.payName = self.elementName + '-pay'

        self.uiState = componentui.WorkerComponentUIState()
        self.uiState.addKey('feederName')
        self.uiState.set('feederName', feederName)
        self.uiState.addListKey('clients')

        # fd -> (FeederClient, cleanup callable)
        self._fdToClient = {}
        # client id -> FeederClient
        self._clients = {}

    def __repr__(self):
        return ('<Feeder %s (%d client(s))>'
                % (self.feederName, len(self._clients)))

    def clientConnected(self, clientId, fd, cleanup):
        """
        The given client has connected on the given file descriptor, and is
        being added to multifdsink. This is called solely from the reactor
        thread.

        @param clientId: id of the client of the feeder
        @param fd:       file descriptor representing the client
        @param cleanup:  callable to be called when the given fd is removed

        @returns: the L{FeederClient} tracking this client id
        """
        if clientId not in self._clients:
            # First time this id is seen: create its tracking object and
            # publish that object's UI state in our 'clients' list.
            client = FeederClient(clientId)
            self._clients[clientId] = client
            self.uiState.append('clients', client.uiState)

        client = self._clients[clientId]
        # Remember the cleanup callable so clientDisconnected() can find it
        # again from the fd alone.
        self._fdToClient[fd] = (client, cleanup)

        client.connected(fd)

        return client

    def clientDisconnected(self, fd):
        """
        The client has been entirely removed from multifdsink, and we may
        now close its file descriptor.
        The client object stays around so we can track over multiple
        connections.

        Called from GStreamer threads.

        @type fd: file descriptor

        @raise KeyError: if fd was never registered through
                         L{clientConnected}
        """
        (client, cleanup) = self._fdToClient.pop(fd)
        client.disconnected(fd=fd)

        # The cleanup callable is not run here; it is handed to the reactor
        # thread together with the fd.
        reactor.callFromThread(cleanup, fd)

    def getClients(self):
        """
        @rtype: list of all L{FeederClient}s ever seen, including currently
                disconnected clients
        """
        # Return a real list, as documented: on Python 3 dict.values() is a
        # live view, which is not a list and cannot safely be iterated while
        # clients are being added.
        return list(self._clients.values())
103
class FeederClient:
    """
    This class groups information related to the client of a feeder.
    The client is identified by an id.
    The information remains valid for the lifetime of the feeder, so it
    can track reconnects of the client.

    @ivar clientId: id of the client of the feeder
    @ivar fd:       file descriptor the client is currently using, or None.
    @ivar uiState:  the UI state object holding this client's statistics
    """

    def __init__(self, clientId):
        """
        @param clientId: id of the client of the feeder
        """
        # Documented as an instance variable, so keep it on the object as
        # well as in the UI state.
        self.clientId = clientId

        self.uiState = componentui.WorkerComponentUIState()
        self.uiState.addKey('client-id', clientId)
        self.fd = None
        self.uiState.addKey('fd', None)

        # These keys start out at 0.
        for key in (
            'bytes-read-current',
            'bytes-read-total',
            'reconnects',
            'last-connect',
            'last-disconnect',
            'last-activity',
            ):
            self.uiState.addKey(key, 0)

        # These keys start out as None rather than 0.
        for key in (
            'buffers-dropped-current',
            'buffers-dropped-total',
            ):
            self.uiState.addKey(key, None)

        # Amounts accumulated over earlier connections; they are added to
        # the per-connection figures to produce the '-total' keys.
        self._buffersDroppedBefore = 0
        self._bytesReadBefore = 0

    def setStats(self, stats):
        """
        Update the UI state from a statistics list for the current
        connection.

        Index 0 is used as the number of bytes sent, index 4 as the time of
        last activity (divided by gst.SECOND before being stored), and
        index 5, when present, as the number of buffers dropped.
        NOTE(review): the list is presumably the sink's per-client stats
        array; confirm the layout against the caller.

        @type stats: list
        """
        bytesSent = stats[0]
        timeLastActivity = float(stats[4]) / gst.SECOND
        if len(stats) > 5:
            buffersDropped = stats[5]
        else:
            # Not reported: use 0 so the addition below keeps working.
            buffersDropped = 0

        self.uiState.set('bytes-read-current', bytesSent)
        self.uiState.set('buffers-dropped-current', buffersDropped)
        self.uiState.set('bytes-read-total', self._bytesReadBefore + bytesSent)
        self.uiState.set('last-activity', timeLastActivity)
        # Guard kept in case the reported value itself is None.
        if buffersDropped is not None:
            self.uiState.set('buffers-dropped-total',
                self._buffersDroppedBefore + buffersDropped)

    def connected(self, fd, when=None):
        """
        The client has connected on this fd.
        Update related stats.

        Called only from the reactor thread.

        @param fd:   file descriptor the client connected on
        @param when: time of the connection; a false value means "now"
        """
        if not when:
            when = time.time()

        # Compare against None: 0 is a valid file descriptor and must not
        # be mistaken for "no previous connection".
        if self.fd is not None:
            # A previous connection is still recorded; close its
            # bookkeeping before switching to the new fd.
            self._updateUIStateForDisconnect(self.fd, when)

        self.fd = fd
        self.uiState.set('fd', fd)
        self.uiState.set('last-connect', when)
        # Incremented on every connect, including the first one.
        self.uiState.set('reconnects', self.uiState.get('reconnects', 0) + 1)

    def _updateUIStateForDisconnect(self, fd, when):
        """
        Record a disconnect at the given time and fold the per-connection
        counters into the running totals.

        @param fd:   file descriptor that went away
        @param when: time of the disconnect
        """
        # Only forget the fd if it is still the one being disconnected.
        if self.fd == fd:
            self.fd = None
            self.uiState.set('fd', None)
        self.uiState.set('last-disconnect', when)

        # Move the current counters into the "before" accumulators and
        # reset them for the next connection.
        self._bytesReadBefore += self.uiState.get('bytes-read-current')
        self.uiState.set('bytes-read-current', 0)
        if self.uiState.get('buffers-dropped-current') is not None:
            self._buffersDroppedBefore += self.uiState.get(
                'buffers-dropped-current')
            self.uiState.set('buffers-dropped-current', 0)

    def disconnected(self, when=None, fd=None):
        """
        The client has disconnected.
        Update related stats.

        Called from GStreamer threads.

        @param when: time of the disconnect; a false value means "now"
        @param fd:   file descriptor that was disconnected
        """
        if self.fd != fd:
            # The recorded fd is a different one (for instance connected()
            # already switched to a newer fd), so there is nothing to do.
            return

        if not when:
            when = time.time()

        # The UI state update itself is handed over to the reactor thread.
        reactor.callFromThread(self._updateUIStateForDisconnect, fd,
                               when)
220