Package flumotion :: Package component :: Module feeder
[hide private]

Source Code for Module flumotion.component.feeder

  1  # -*- Mode: Python -*- 
  2  # vi:si:et:sw=4:sts=4:ts=4 
  3  # 
  4  # Flumotion - a streaming media server 
  5  # Copyright (C) 2004,2005,2006,2007 Fluendo, S.L. (www.fluendo.com). 
  6  # All rights reserved. 
  7   
  8  # This file may be distributed and/or modified under the terms of 
  9  # the GNU General Public License version 2 as published by 
 10  # the Free Software Foundation. 
 11  # This file is distributed without any warranty; without even the implied 
 12  # warranty of merchantability or fitness for a particular purpose. 
 13  # See "LICENSE.GPL" in the source distribution for more information. 
 14   
 15  # Licensees having purchased or holding a valid Flumotion Advanced 
 16  # Streaming Server license may use this file in accordance with the 
 17  # Flumotion Advanced Streaming Server Commercial License Agreement. 
 18  # See "LICENSE.Flumotion" in the source distribution for more information. 
 19   
 20  # Headers in this file shall remain intact. 
 21   
 22  import time 
 23   
 24  import gst 
 25   
 26  from twisted.internet import reactor 
 27   
 28  from flumotion.common import componentui 
 29   
 30  __version__ = "$Rev: 7089 $" 
 31   
 32   
class Feeder:
    """
    This class groups feeder-related information as used by a Feed Component.

    @ivar feederName:  name of the feeder
    @ivar elementName: the feeder name prefixed with 'feeder:'
    @ivar payName:     the element name suffixed with '-pay'
    @ivar uiState:     the serializable UI State for this feeder
    """

    def __init__(self, feederName):
        """
        @param feederName: name of the feeder
        """
        self.feederName = feederName
        self.elementName = 'feeder:' + feederName
        self.payName = self.elementName + '-pay'
        self.uiState = componentui.WorkerComponentUIState()
        self.uiState.addKey('feederName')
        self.uiState.set('feederName', feederName)
        self.uiState.addListKey('clients')
        self._fdToClient = {} # fd -> (FeederClient, cleanupfunc)
        self._clients = {} # id -> FeederClient

    def __repr__(self):
        return ('<Feeder %s (%d client(s))>'
                % (self.feederName, len(self._clients)))

    def clientConnected(self, clientId, fd, cleanup):
        """
        The given client has connected on the given file descriptor, and is
        being added to multifdsink. This is called solely from the reactor
        thread.

        @param clientId: id of the client of the feeder
        @param fd:       file descriptor representing the client
        @param cleanup:  callable to be called when the given fd is removed

        @returns: the L{FeederClient} tracking this client id
        """
        client = self._clients.get(clientId)
        if client is None:
            # first time we see this client, create an object
            client = FeederClient(clientId)
            self._clients[clientId] = client
            self.uiState.append('clients', client.uiState)

        # remember the cleanup callable so clientDisconnected can hand the
        # fd back to it later
        self._fdToClient[fd] = (client, cleanup)

        client.connected(fd)

        return client

    def clientDisconnected(self, fd):
        """
        The client has been entirely removed from multifdsink, and we may
        now close its file descriptor.
        The client object stays around so we can track over multiple
        connections.

        Called from GStreamer threads.

        @type fd: file descriptor

        @raises KeyError: if the fd was never registered through
                          L{clientConnected}, or was already removed
        """
        (client, cleanup) = self._fdToClient.pop(fd)
        client.disconnected(fd=fd)

        # To avoid races between this thread (a GStreamer thread) closing the
        # FD, and the reactor thread reusing this FD, we only actually perform
        # the close in the reactor thread.
        reactor.callFromThread(cleanup, fd)

    def getClients(self):
        """
        @rtype: list of all L{FeederClient}s ever seen, including currently
                disconnected clients
        """
        # Return a real list (a snapshot), as documented, rather than a
        # live dictionary view.
        return list(self._clients.values())

class FeederClient:
    """
    This class groups information related to the client of a feeder.
    The client is identified by an id.
    The information remains valid for the lifetime of the feeder, so it
    can track reconnects of the client.

    @ivar clientId: id of the client of the feeder
    @ivar fd:       file descriptor the client is currently using, or None.
    @ivar uiState:  the serializable UI State for this client
    """

    def __init__(self, clientId):
        """
        @param clientId: id of the client of the feeder
        """
        # keep the id as an attribute too, as the class documentation
        # promises
        self.clientId = clientId
        self.uiState = componentui.WorkerComponentUIState()
        self.uiState.addKey('client-id', clientId)
        self.fd = None
        self.uiState.addKey('fd', None)

        # these values can be set to None, which would mean
        # Unknown, not supported
        # these are supported
        for key in (
            'bytes-read-current', # bytes read over current connection
            'bytes-read-total',   # bytes read over all connections
            'reconnects',         # number of connections made by this client
            'last-connect',       # last client connection, in epoch seconds
            'last-disconnect',    # last client disconnect, in epoch seconds
            'last-activity',      # last time client read or connected
            ):
            self.uiState.addKey(key, 0)
        # these are possibly unsupported
        for key in (
            'buffers-dropped-current', # buffers dropped over current connection
            'buffers-dropped-total',   # buffers dropped over all connections
            ):
            self.uiState.addKey(key, None)

        # internal state allowing us to track global numbers
        self._buffersDroppedBefore = 0
        self._bytesReadBefore = 0

    def setStats(self, stats):
        """
        Update the UI state from a list of per-client statistics.

        Only these entries are used: stats[0] (bytes sent), stats[4]
        (time of last activity, divided by gst.SECOND before being stored)
        and, when present, stats[5] (buffers dropped).

        @type stats: list
        """
        bytesSent = stats[0]
        timeLastActivity = float(stats[4]) / gst.SECOND
        if len(stats) > 5:
            # added in gst-plugins-base 0.10.11
            buffersDropped = stats[5]
        else:
            # We don't know, but we cannot use None
            # since that would break integer addition below
            buffersDropped = 0

        self.uiState.set('bytes-read-current', bytesSent)
        self.uiState.set('buffers-dropped-current', buffersDropped)
        self.uiState.set('bytes-read-total', self._bytesReadBefore + bytesSent)
        self.uiState.set('last-activity', timeLastActivity)
        if buffersDropped is not None:
            self.uiState.set('buffers-dropped-total',
                self._buffersDroppedBefore + buffersDropped)

    def connected(self, fd, when=None):
        """
        The client has connected on this fd.
        Update related stats.

        Called only from the reactor thread.

        @param fd:   file descriptor the client connected on
        @param when: time of the connection, in epoch seconds; defaults to
                     the current time
        """
        if when is None:
            when = time.time()

        # Compare against None explicitly: 0 is a valid file descriptor.
        if self.fd is not None:
            # It's normal to receive a reconnection before we notice
            # that an old connection has been closed. Perform the
            # disconnection logic for the old FD if necessary. See #591.
            self._updateUIStateForDisconnect(self.fd, when)

        self.fd = fd
        self.uiState.set('fd', fd)
        self.uiState.set('last-connect', when)
        self.uiState.set('reconnects', self.uiState.get('reconnects', 0) + 1)

    def _updateUIStateForDisconnect(self, fd, when):
        """
        Record a disconnect: clear the fd if it is still the current one,
        store the disconnect time, and fold the per-connection counters
        into the running totals.

        @param fd:   file descriptor that was disconnected
        @param when: time of the disconnect, in epoch seconds
        """
        if self.fd == fd:
            self.fd = None
            self.uiState.set('fd', None)
        self.uiState.set('last-disconnect', when)

        # update our internal counters and reset current counters to 0
        self._bytesReadBefore += self.uiState.get('bytes-read-current')
        self.uiState.set('bytes-read-current', 0)
        droppedCurrent = self.uiState.get('buffers-dropped-current')
        if droppedCurrent is not None:
            self._buffersDroppedBefore += droppedCurrent
            self.uiState.set('buffers-dropped-current', 0)

    def disconnected(self, when=None, fd=None):
        """
        The client has disconnected.
        Update related stats.

        Called from GStreamer threads.

        @param when: time of the disconnect, in epoch seconds; defaults to
                     the current time
        @param fd:   file descriptor that was disconnected; nothing happens
                     unless it is the fd this client is currently using
        """
        if self.fd != fd:
            # assume that connected() already called
            # _updateUIStateForDisconnect for us
            return

        if when is None:
            when = time.time()

        # the UI state is only updated from the reactor thread
        reactor.callFromThread(self._updateUIStateForDisconnect, fd,
                               when)
