Package flumotion :: Package component :: Module padmonitor
[hide private]

Source Code for Module flumotion.component.padmonitor

  1  # -*- Mode: Python; test-case-name: flumotion.test.test_feedcomponent010 -*- 
  2  # vi:si:et:sw=4:sts=4:ts=4 
  3  # 
  4  # Flumotion - a streaming media server 
  5  # Copyright (C) 2004,2005,2006,2007 Fluendo, S.L. (www.fluendo.com). 
  6  # All rights reserved. 
  7   
  8  # This file may be distributed and/or modified under the terms of 
  9  # the GNU General Public License version 2 as published by 
 10  # the Free Software Foundation. 
 11  # This file is distributed without any warranty; without even the implied 
 12  # warranty of merchantability or fitness for a particular purpose. 
 13  # See "LICENSE.GPL" in the source distribution for more information. 
 14   
 15  # Licensees having purchased or holding a valid Flumotion Advanced 
 16  # Streaming Server license may use this file in accordance with the 
 17  # Flumotion Advanced Streaming Server Commercial License Agreement. 
 18  # See "LICENSE.Flumotion" in the source distribution for more information. 
 19   
 20  # Headers in this file shall remain intact. 
 21   
 22  import time 
 23   
 24  import gst 
 25  from twisted.internet import reactor, defer 
 26   
 27  from flumotion.common import log 
 28  from flumotion.common.poller import Poller 
 29   
 30  __version__ = "$Rev: 6967 $" 
 31   
 32   
class PadMonitor(log.Loggable):
    """
    Monitor one pad for data flow and report changes in activity.

    Two pollers are created: one calls L{_check_timeout}, which installs
    a one-shot buffer probe recording when data was last seen, and one
    calls L{_watch_timeout}, which compares that timestamp against the
    current time and fires the registered active/inactive callbacks.

    NOTE(review): Poller is defined elsewhere; it is assumed to invoke
    its callable repeatedly at the given interval in seconds.

    @cvar PAD_MONITOR_PROBE_FREQUENCY: interval handed to the poller
                                       that installs the buffer probe.
    @cvar PAD_MONITOR_TIMEOUT:         interval handed to the watch
                                       poller, and the threshold used
                                       to decide whether data stopped.
    """

    PAD_MONITOR_PROBE_FREQUENCY = 5.0
    PAD_MONITOR_TIMEOUT = PAD_MONITOR_PROBE_FREQUENCY * 2.5

    def __init__(self, pad, name, setActive, setInactive):
        """
        @param pad:         the pad to install buffer probes on.
        @param name:        name passed to every callback.
        @param setActive:   callable(name), called on becoming active.
        @param setInactive: callable(name), called on becoming inactive.
        """
        self._pad = pad
        self.name = name

        # -1: no watch timeout has run yet; 0: no data seen (or the
        # monitor was reset); otherwise the time.time() of the last
        # buffer seen by the probe.
        self._last_data_time = -1
        self._active = False
        # True until the first buffer went through the probe; selects
        # the log level used by logMessage().
        self._first = True

        self._doSetActive = []
        self._doSetInactive = []
        self.addWatch(setActive, setInactive)

        # Holds at most one entry, 'id' -> (deferred, probe id).  A dict
        # is used because python's dict operations are atomic with
        # respect to the GIL, and the entry is popped both from the
        # reactor thread and from GStreamer threads.
        self._probe_id = {}

        # The pollers are created last: everything they touch has to
        # exist before they get a chance to run.
        self.check_poller = Poller(self._check_timeout,
                                   self.PAD_MONITOR_PROBE_FREQUENCY,
                                   immediately=True)

        self.watch_poller = Poller(self._watch_timeout,
                                   self.PAD_MONITOR_TIMEOUT)

    def logMessage(self, message, *args):
        """
        Log at debug level until the first buffer was seen, and at log
        level afterwards.
        """
        if self._first:
            emit = self.debug
        else:
            emit = self.log
        emit(message, *args)

    def isActive(self):
        """
        @returns: whether the monitor currently considers the pad active.
        """
        return self._active

    def detach(self):
        """
        Stop both pollers and remove a still-pending buffer probe,
        firing its deferred so the pending check completes.
        """
        for poller in (self.check_poller, self.watch_poller):
            poller.stop()

        # Mirrors the pop in _check_timeout's probe callback: whichever
        # side pops the entry first owns the probe removal and the
        # deferred.
        pending, handler = self._probe_id.pop("id", (None, None))
        if not handler:
            return

        self._pad.remove_buffer_probe(handler)
        pending.callback(None)

    def _check_timeout(self):
        """
        Install a one-shot buffer probe on the pad.

        @returns: a deferred fired with None once a buffer passed the
                  probe, or once detach() removed the probe.
        """

        def probe_cb(pad, buf):
            """
            Buffer probe that records the arrival of data on the pad.

            Called from GStreamer threads.

            @param pad: the gst.Pad the buffer arrived on.
            @param buf: the gst.Buffer that arrived on this pad.
            """
            self._last_data_time = time.time()

            self.logMessage('buffer probe on %s has timestamp %s', self.name,
                            gst.TIME_ARGS(buf.timestamp))

            pending, handler = self._probe_id.pop("id", (None, None))
            if handler:
                # The entry is missing only if detach() already took it.
                self._pad.remove_buffer_probe(handler)

                # We are not in the reactor thread here, so hand both
                # calls over to it.
                reactor.callFromThread(pending.callback, None)
                # Data received! Re-evaluate the state right away.
                reactor.callFromThread(self.watch_poller.run)

            self._first = False

            # Returning True lets the buffer through.
            return True

        d = defer.Deferred()
        # FIXME: this is racy: evaluate RHS, drop GIL, buffer probe
        # fires before __setitem__ in LHS; need a mutex
        self._probe_id['id'] = (d, self._pad.add_buffer_probe(probe_cb))
        return d

    def _watch_timeout(self):
        """
        Compare the time of the last buffer with the current time and
        switch between active and inactive when needed.
        """
        self.log('last buffer for %s at %r', self.name, self._last_data_time)

        now = time.time()

        if self._last_data_time < 0:
            # No data at all during the first timeout period.
            self._last_data_time = 0
            self.setInactive()
            return

        if self._last_data_time == 0:
            # Still no data; nothing changed.
            return

        # Data was received at some point in the past.
        delta = now - self._last_data_time
        timeout = self.PAD_MONITOR_TIMEOUT

        if self._active:
            if delta > timeout:
                self.info("No data received on pad %s for > %r seconds, setting "
                          "to hungry", self.name, timeout)
                self.setInactive()
        elif delta < timeout:
            self.info("Receiving data again on pad %s, flow active",
                      self.name)
            self.setActive()

    def addWatch(self, setActive, setInactive):
        """
        Register an additional pair of callbacks.

        @param setActive:   callable(name), called on becoming active.
        @param setInactive: callable(name), called on becoming inactive.
        """
        self._doSetActive.append(setActive)
        self._doSetInactive.append(setInactive)

    def setInactive(self):
        """
        Mark the pad inactive and call every registered inactive
        callback with the monitor name.
        """
        self._active = False
        for notify in self._doSetInactive:
            notify(self.name)

    def setActive(self):
        """
        Mark the pad active and call every registered active callback
        with the monitor name.
        """
        self._active = True
        for notify in self._doSetActive:
            notify(self.name)
156
class EaterPadMonitor(PadMonitor):
    """
    Pad monitor that also drives a reconnect poller: the poller is
    started when the pad turns inactive and stopped when it turns active
    again or when the monitor is detached.
    """

    def __init__(self, pad, name, setActive, setInactive,
                 reconnectEater, *args):
        """
        @param pad:            the pad to install buffer probes on.
        @param name:           name passed to every callback.
        @param setActive:      callable(name), called on becoming active.
        @param setInactive:    callable(name), called on becoming
                               inactive.
        @param reconnectEater: callable invoked with *args by the
                               reconnect poller.
        """
        PadMonitor.__init__(self, pad, name, setActive, setInactive)

        def reconnect():
            return reconnectEater(*args)

        # Created stopped; setInactive() is what starts it.
        self._reconnectPoller = Poller(reconnect,
                                       self.PAD_MONITOR_TIMEOUT,
                                       start=False)

    def setInactive(self):
        """
        Mark the pad inactive, forget the last data time and start the
        reconnect poller.
        """
        PadMonitor.setInactive(self)

        # A buffer may have been seen shortly before the eater was
        # marked as disconnected.  Left in place, that timestamp could
        # make a later check believe the eater had already been
        # reconnected properly, so reset it to 0 ("no data").
        self._last_data_time = 0

        self._reconnectPoller.start(immediately=True)

    def setActive(self):
        """
        Mark the pad active and stop the reconnect poller.
        """
        PadMonitor.setActive(self)
        self._reconnectPoller.stop()

    def detach(self):
        """
        Detach the monitor from the pad and stop the reconnect poller.
        """
        PadMonitor.detach(self)
        self._reconnectPoller.stop()
184 185
class PadMonitorSet(dict, log.Loggable):
    """
    Dictionary of pad monitors, keyed by monitor name, that reports
    when the set as a whole turns active or inactive.
    """

    def __init__(self, setActive, setInactive):
        """
        @param setActive:   callable without arguments, called when the
                            set as a whole becomes active again.
        @param setInactive: callable without arguments, called when the
                            set as a whole becomes inactive.
        """
        # These callbacks will be called when the set as a whole is
        # active or inactive.
        self._doSetActive = setActive
        self._doSetInactive = setInactive
        # Starts out True so that the very first monitor turning active
        # does not trigger _doSetActive; see attach().
        self._wasActive = True

    def attach(self, pad, name, klass=PadMonitor, *args):
        """
        Watch for data flow through this pad periodically.
        If data flow ceases for too long, we turn hungry. If data flow resumes,
        we return to happy.

        @param pad:   the pad to monitor.
        @param name:  name for the monitor; must not be in use yet.
        @param klass: monitor class to instantiate.
        @param args:  extra positional arguments passed to klass after
                      the two callbacks.

        @raise AssertionError: if a monitor with this name is already
                               attached.
        """
        # An explicit check instead of an assert statement: asserts are
        # stripped under "python -O", and silently overwriting the entry
        # would leave the previous monitor attached to its pad without
        # any way to reach it through this set.
        if name in self:
            raise AssertionError(
                "pad monitor %s is already attached" % (name,))

        def monitorActive(name):
            self.info('Pad data flow at %s is active', name)
            if self.isActive() and not self._wasActive:
                # The wasActive check is to prevent _doSetActive from being
                # called happy initially because of this; only if we
                # previously went inactive because of an inactive monitor. A
                # curious interface.
                self._wasActive = True
                self._doSetActive()

        def monitorInactive(name):
            self.info('Pad data flow at %s is inactive', name)
            # Only the first monitor going inactive reports the set as
            # inactive; later ones find _wasActive already cleared.
            if self._wasActive:
                self._doSetInactive()
                self._wasActive = False

        monitor = klass(pad, name, monitorActive, monitorInactive, *args)
        self[monitor.name] = monitor
        self.info("Added pad monitor %s", monitor.name)

    def remove(self, name):
        """
        Remove the monitor with the given name from the set and detach
        it.  Logs a warning and does nothing else if the name is
        unknown.

        @param name: name of the monitor to remove.
        """
        if name not in self:
            self.warning("No pad monitor with name %s", name)
            return

        monitor = self.pop(name)
        monitor.detach()

    def isActive(self):
        """
        @returns: True if every monitor in the set is active (also when
                  the set is empty), False otherwise.
        """
        for monitor in self.values():
            if not monitor.isActive():
                return False
        return True
234