1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 import time
23
24 import gst
25 from twisted.internet import reactor, defer
26
27 from flumotion.common import log
28 from flumotion.common.poller import Poller
29
30 __version__ = "$Rev: 6967 $"
31
32
34 PAD_MONITOR_PROBE_FREQUENCY = 5.0
35 PAD_MONITOR_TIMEOUT = PAD_MONITOR_PROBE_FREQUENCY * 2.5
36
37 - def __init__(self, pad, name, setActive, setInactive):
58
64
67
# Teardown fragment (the enclosing def line is not visible in this view).
# Both pollers are stopped before the probe bookkeeping is touched.
69 self.check_poller.stop()
70 self.watch_poller.stop()
71
72
73
74
# Take the stored (deferred, probe id) pair out of self._probe_id; when no
# "id" entry exists the default (None, None) is used instead.
75 d, probe_id = self._probe_id.pop("id", (None, None))
76 if probe_id:
# A probe id was stored: remove that probe from the pad and fire the
# deferred with None.
# NOTE(review): indentation is lost in this listing; both lines below
# presumably belong to the `if` (d is None otherwise) -- confirm.
77 self._pad.remove_buffer_probe(probe_id)
78 d.callback(None)
79
def probe_cb(pad, buffer):
    """
    Buffer probe callback that records that data is flowing on the pad.

    Stores the arrival time of the buffer and, if a probe registration
    is still pending in self._probe_id, removes that probe and hands
    the follow-up calls over to the reactor.

    Called from GStreamer threads.

    @param pad:    The gst.Pad srcpad for one eater in this
                   component.
    @param buffer: A gst.Buffer that has arrived on this pad
    @returns:      True, always.
    """
    self._last_data_time = time.time()

    log_args = (self.name, gst.TIME_ARGS(buffer.timestamp))
    self.logMessage('buffer probe on %s has timestamp %s', *log_args)

    # Popping the entry means the pending pair is handled at most once.
    pending, handler_id = self._probe_id.pop("id", (None, None))
    if handler_id:
        self._pad.remove_buffer_probe(handler_id)

        # We are not in the reactor thread here, so both calls are
        # scheduled through callFromThread instead of being made directly.
        schedule = reactor.callFromThread
        schedule(pending.callback, None)
        schedule(self.watch_poller.run)

    self._first = False

    return True
111
# Tail of the routine that defines probe_cb above (its def line is not
# visible in this view). A fresh Deferred is created for this probe round.
112 d = defer.Deferred()
113
114
# Install probe_cb as a buffer probe on the pad and remember the
# (deferred, probe id) pair under the "id" key of self._probe_id.
115 self._probe_id['id'] = (d, self._pad.add_buffer_probe(probe_cb))
# The deferred is handed back to the caller.
116 return d
117
# Timeout check fragment (the enclosing def line is not visible in this
# view). Compares self._last_data_time with the current time.time() and
# switches the monitor between active and inactive.
119 self.log('last buffer for %s at %r', self.name, self._last_data_time)
120
121 now = time.time()
122
# Negative value: replace it with 0 and report the pad inactive.
# NOTE(review): presumably a negative value means "no buffer seen since
# construction" -- confirm against the constructor, which is not visible.
123 if self._last_data_time < 0:
124
125 self._last_data_time = 0
126 self.setInactive()
# Exactly 0: nothing is done in this branch.
127 elif self._last_data_time == 0:
128
129 pass
# Any other value is treated as the time.time() stamp of the last buffer.
130 else:
131
# Seconds elapsed since that stamp.
132 delta = now - self._last_data_time
133
# Currently active but silent for longer than PAD_MONITOR_TIMEOUT
# (PAD_MONITOR_PROBE_FREQUENCY * 2.5): log it and go inactive.
134 if self._active and delta > self.PAD_MONITOR_TIMEOUT:
135 self.info("No data received on pad %s for > %r seconds, setting "
136 "to hungry", self.name, self.PAD_MONITOR_TIMEOUT)
137 self.setInactive()
# Currently inactive but data was seen within the timeout: log it and go
# active again. A delta exactly equal to the timeout changes nothing.
138 elif not self._active and delta < self.PAD_MONITOR_TIMEOUT:
139 self.info("Receiving data again on pad %s, flow active",
140 self.name)
141 self.setActive()
142
143 - def addWatch(self, setActive, setInactive):
146
151
156
158 - def __init__(self, pad, name, setActive, setInactive,
159 reconnectEater, *args):
165
# Fragment whose def line is not visible in this view.
# PadMonitor.setInactive is invoked explicitly on this instance.
# NOTE(review): presumably this is an override delegating to its base
# class -- the class headers are not visible here, confirm.
167 PadMonitor.setInactive(self)
168
169
170
171
172
# Reset the last-data stamp to 0.
173 self._last_data_time = 0
174
# Start the reconnect poller, asking it to run immediately.
175 self._reconnectPoller.start(immediately=True)
176
180
184
185
def __init__(self, setActive, setInactive):
    """
    Store the two callbacks and start in the "was active" state.

    @param setActive:   stored as self._doSetActive
    @param setInactive: stored as self._doSetInactive
    """
    # Starting out as "was active" means the first report that actually
    # reaches a callback can only be an inactive one.
    self._wasActive = True
    self._doSetActive, self._doSetInactive = setActive, setInactive
193
195 """
196 Watch for data flow through this pad periodically.
197 If data flow ceases for too long, we turn hungry. If data flow resumes,
198 we return to happy.
199 """
def monitorActive(name):
    """
    Callback for one monitor reporting active data flow.

    Calls self._doSetActive() only when the whole set is active and
    the set was previously recorded as not active.

    @param name: name used in the log message
    """
    self.info('Pad data flow at %s is active', name)
    if not self.isActive() or self._wasActive:
        # Either the set as a whole is not active, or it was already
        # recorded as active: nothing to announce.
        return
    self._wasActive = True
    self._doSetActive()
209
def monitorInactive(name):
    """
    Callback for one monitor reporting that data flow stopped.

    Calls self._doSetInactive() if the set was recorded as active,
    and leaves the set recorded as not active.

    @param name: name used in the log message
    """
    self.info('Pad data flow at %s is inactive', name)
    previously_active = self._wasActive
    if previously_active:
        self._doSetInactive()
    self._wasActive = False
215
# Registration fragment (the enclosing def line is not visible in this
# view). A name that is already present is rejected; note that an assert
# statement is skipped when Python runs with -O.
216 assert name not in self
# Build the monitor, passing the monitorActive / monitorInactive closures
# as its callbacks, plus any extra positional args.
217 monitor = klass(pad, name, monitorActive, monitorInactive, *args)
# Store it under the name the monitor itself reports, then log it.
218 self[monitor.name] = monitor
219 self.info("Added pad monitor %s", monitor.name)
220
# Removal fragment (the enclosing def line is not visible in this view).
# An unknown name only produces a warning; nothing else happens.
222 if name not in self:
223 self.warning("No pad monitor with name %s", name)
224 return
225
# Take the monitor out of the mapping, then detach it.
226 monitor = self.pop(name)
227 monitor.detach()
228
# Aggregate activity fragment (the enclosing def line is not visible in
# this view). Returns False as soon as one stored monitor reports
# not isActive(); returns True otherwise, including for an empty mapping.
230 for monitor in self.values():
231 if not monitor.isActive():
232 return False
233 return True
234