Package flumotion :: Package component :: Module eater
[hide private]

Source Code for Module flumotion.component.eater

  1  # -*- Mode: Python -*- 
  2  # vi:si:et:sw=4:sts=4:ts=4 
  3  # 
  4  # Flumotion - a streaming media server 
  5  # Copyright (C) 2004,2005,2006,2007 Fluendo, S.L. (www.fluendo.com). 
  6  # All rights reserved. 
  7   
  8  # This file may be distributed and/or modified under the terms of 
  9  # the GNU General Public License version 2 as published by 
 10  # the Free Software Foundation. 
 11  # This file is distributed without any warranty; without even the implied 
 12  # warranty of merchantability or fitness for a particular purpose. 
 13  # See "LICENSE.GPL" in the source distribution for more information. 
 14   
 15  # Licensees having purchased or holding a valid Flumotion Advanced 
 16  # Streaming Server license may use this file in accordance with the 
 17  # Flumotion Advanced Streaming Server Commercial License Agreement. 
 18  # See "LICENSE.Flumotion" in the source distribution for more information. 
 19   
 20  # Headers in this file shall remain intact. 
 21   
 22  import time 
 23   
 24  from twisted.internet import reactor 
 25   
 26  from flumotion.common import componentui 
 27   
 28  __version__ = "$Rev: 7089 $" 
 29   
 30   
31 -class Eater:
32 """ 33 This class groups eater-related information as used by a Feed Component. 34 35 @ivar eaterAlias: the alias of this eater (e.g. "default", "video", 36 ...) 37 @ivar feedId: id of the feed this is eating from 38 @ivar uiState: the serializable UI State for this eater 39 """
40 - def __init__(self, eaterAlias, eaterName):
41 self.eaterAlias = eaterAlias 42 self.eaterName = eaterName 43 self.feedId = None 44 self.fd = None 45 self.elementName = 'eater:' + eaterAlias 46 self.depayName = self.elementName + '-depay' 47 self.setPadMonitor(None) 48 self.uiState = componentui.WorkerComponentUIState() 49 self.uiState.addKey('eater-alias') 50 self.uiState.set('eater-alias', eaterAlias) 51 self.uiState.addKey('eater-name') 52 self.uiState.set('eater-name', eaterName) 53 # dict for the current connection 54 connectionDict = { 55 "feed-id": None, 56 "time-timestamp-discont": None, 57 "timestamp-timestamp-discont": 0.0, # ts of buffer after discont, 58 # in float seconds 59 "last-timestamp-discont": 0.0, 60 "total-timestamp-discont": 0.0, 61 "count-timestamp-discont": 0, 62 "time-offset-discont": None, 63 "offset-offset-discont": 0, # offset of buffer 64 # after discont 65 "last-offset-discont": 0, 66 "total-offset-discont": 0, 67 "count-offset-discont": 0, 68 69 } 70 self.uiState.addDictKey('connection', connectionDict) 71 72 for key in ( 73 'last-connect', # last client connection, in epoch seconds 74 'last-disconnect', # last client disconnect, in epoch seconds 75 'total-connections', # number of connections by this client 76 'count-timestamp-discont', # number of timestamp disconts seen 77 'count-offset-discont', # number of timestamp disconts seen 78 ): 79 self.uiState.addKey(key, 0) 80 for key in ( 81 'total-timestamp-discont', # total timestamp discontinuity 82 'total-offset-discont', # total offset discontinuity 83 ): 84 self.uiState.addKey(key, 0.0) 85 self.uiState.addKey('fd', None)
86
87 - def __repr__(self):
88 return '<Eater %s %s>' % (self.eaterAlias, 89 (self.feedId and '(disconnected)' 90 or ('eating from %s' % self.feedId)))
91
92 - def connected(self, fd, feedId, when=None):
93 """ 94 The eater has been connected. 95 Update related stats. 96 """ 97 if not when: 98 when = time.time() 99 100 self.feedId = feedId 101 self.fd = fd 102 103 self.uiState.set('last-connect', when) 104 self.uiState.set('fd', fd) 105 self.uiState.set('total-connections', 106 self.uiState.get('total-connections', 0) + 1) 107 108 self.uiState.setitem("connection", 'feed-id', feedId) 109 self.uiState.setitem("connection", "count-timestamp-discont", 0) 110 self.uiState.setitem("connection", "time-timestamp-discont", None) 111 self.uiState.setitem("connection", "last-timestamp-discont", 0.0) 112 self.uiState.setitem("connection", "total-timestamp-discont", 0.0) 113 self.uiState.setitem("connection", "count-offset-discont", 0) 114 self.uiState.setitem("connection", "time-offset-discont", None) 115 self.uiState.setitem("connection", "last-offset-discont", 0) 116 self.uiState.setitem("connection", "total-offset-discont", 0)
117
118 - def disconnected(self, when=None):
119 """ 120 The eater has been disconnected. 121 Update related stats. 122 """ 123 if not when: 124 when = time.time() 125 126 def updateUIState(): 127 self.uiState.set('last-disconnect', when) 128 self.fd = None 129 self.uiState.set('fd', None)
130 131 reactor.callFromThread(updateUIState)
132
133 - def setPadMonitor(self, monitor):
134 self._padMonitor = monitor
135
136 - def isActive(self):
137 return self._padMonitor and self._padMonitor.isActive()
138
139 - def addWatch(self, setActive, setInactive):
140 self._padMonitor.addWatch(lambda _: setActive(self.eaterAlias), 141 lambda _: setInactive(self.eaterAlias))
142
143 - def timestampDiscont(self, seconds, timestamp):
144 """ 145 @param seconds: discont duration in seconds 146 @param timestamp: GStreamer timestamp of new buffer, in seconds. 147 148 Inform the eater of a timestamp discontinuity. 149 This is called from a bus message handler, so in the main thread. 150 """ 151 uiState = self.uiState 152 153 c = uiState.get('connection') # dict 154 uiState.setitem('connection', 'count-timestamp-discont', 155 c.get('count-timestamp-discont', 0) + 1) 156 uiState.set('count-timestamp-discont', 157 uiState.get('count-timestamp-discont', 0) + 1) 158 159 uiState.setitem('connection', 'time-timestamp-discont', time.time()) 160 uiState.setitem('connection', 'timestamp-timestamp-discont', timestamp) 161 uiState.setitem('connection', 'last-timestamp-discont', seconds) 162 uiState.setitem('connection', 'total-timestamp-discont', 163 c.get('total-timestamp-discont', 0) + seconds) 164 uiState.set('total-timestamp-discont', 165 uiState.get('total-timestamp-discont', 0) + seconds)
166
167 - def offsetDiscont(self, units, offset):
168 """ 169 Inform the eater of an offset discontinuity. 170 This is called from a bus message handler, so in the main thread. 171 """ 172 uiState = self.uiState 173 174 c = uiState.get('connection') # dict 175 uiState.setitem('connection', 'count-offset-discont', 176 c.get('count-offset-discont', 0) + 1) 177 uiState.set('count-offset-discont', 178 uiState.get('count-offset-discont', 0) + 1) 179 180 uiState.setitem('connection', 'time-offset-discont', time.time()) 181 uiState.setitem('connection', 'offset-offset-discont', offset) 182 uiState.setitem('connection', 'last-offset-discont', units) 183 uiState.setitem('connection', 'total-offset-discont', 184 c.get('total-offset-discont', 0) + units) 185 uiState.set('total-offset-discont', 186 uiState.get('total-offset-discont', 0) + units)
187