Source Code for Module flumotion.component.misc.httpserver.ratecontrol

# -*- Mode: Python; test-case-name: -*-
# vi:si:et:sw=4:sts=4:ts=4
#
# Flumotion - a streaming media server
# Copyright (C) 2004,2005,2006,2007,2008 Fluendo, S.L. (www.fluendo.com).
# All rights reserved.

# This file may be distributed and/or modified under the terms of
# the GNU General Public License version 2 as published by
# the Free Software Foundation.
# This file is distributed without any warranty; without even the implied
# warranty of merchantability or fitness for a particular purpose.
# See "LICENSE.GPL" in the source distribution for more information.

# Licensees having purchased or holding a valid Flumotion Advanced
# Streaming Server license may use this file in accordance with the
# Flumotion Advanced Streaming Server Commercial License Agreement.
# See "LICENSE.Flumotion" in the source distribution for more information.

# Headers in this file shall remain intact.

__version__ = "$Rev: 6699 $"

import time

from flumotion.common import log

from twisted.internet import reactor

from flumotion.component.plugs import base as plugbase


class RateController(plugbase.ComponentPlug):

    # Create a producer-consumer proxy that sits between a FileTransfer
    # object and a request object.
    # You may return a Deferred here.
    def createProducerConsumerProxy(self, consumer, request):
        pass
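
# Illustration only, not part of this module: since
# createProducerConsumerProxy() may return a Deferred, a subclass could look
# the rate up asynchronously and fire the Deferred with the proxy once the
# rate is known. lookupRateFor() below is a hypothetical helper, not a
# Flumotion API.
#
#     class DeferredRatePlug(RateController):
#
#         def createProducerConsumerProxy(self, consumer, request):
#             d = lookupRateFor(request)  # hypothetical; fires with bytes/sec
#             d.addCallback(lambda bytesPerSec: TokenBucketConsumer(
#                 consumer, bytesPerSec * 10, bytesPerSec))
#             return d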


class FixedRatePlug(RateController):

    def __init__(self, args):
        props = args['properties']
        self._rateBytesPerSec = int(props.get('rate', 128000) / 8)
        # Peak level is 10 seconds of data; this is chosen entirely
        # arbitrarily.
        self._maxLevel = int(props.get('max-level',
            self._rateBytesPerSec * 8 * 10) / 8)
        self._initialLevel = int(props.get('initial-level', 0) / 8)

    def createProducerConsumerProxy(self, consumer, request):
        return TokenBucketConsumer(consumer, self._maxLevel,
            self._rateBytesPerSec, self._initialLevel)
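
# Worked example (illustrative values, not taken from any real config): the
# 'rate', 'max-level' and 'initial-level' properties above are expressed in
# bits ('rate' in bits per second) and converted to bytes. A plug built as
#
#     FixedRatePlug({'properties': {'rate': 256000,
#                                   'initial-level': 512000}})
#
# would therefore throttle to 32000 bytes/s, allow an initial burst of 64000
# bytes, and default 'max-level' to 10 seconds of data, i.e. 320000 bytes.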


class TokenBucketConsumer(log.Loggable):
    """
    Use a token bucket to proxy between a producer (e.g. FileTransfer) and a
    consumer (TCP protocol, etc.), doing rate control.

    The bucket has a rate and a maximum level, so a small burst can be
    permitted. The initial level can be set to a non-zero value; this is
    useful to implement burst-on-connect behaviour.

    TODO: This almost certainly only works with producers that work like
    FileTransfer - i.e. they produce data directly in resumeProducing, and
    ignore pauseProducing. This is sufficient for our needs right now.
    """

    logCategory = 'token-bucket'

    # NOTE: Performance is strongly correlated with this value.
    # Low values (e.g. 0.2) give a 'smooth' transfer, but very high CPU usage
    # if you have several hundred clients.
    # Higher values (e.g. 1.0 or more) give bursty transfer, but nicely lower
    # CPU usage.
    _dripInterval = 1.0 # If we need to wait for more bits in our bucket, wait
                        # at least this long, to avoid overly frequent small
                        # writes.
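
    # Worked example with illustrative numbers: at fillRate = 16000 bytes/s
    # (FixedRatePlug's 128 kbit/s default) and the default _dripInterval of
    # 1.0, each drip refills up to roughly 16000 bytes, so data goes out in
    # chunks of up to 16000 bytes about once per second; with a _dripInterval
    # of 0.2 the same rate is delivered as ~3200 byte writes five times per
    # second - smoother, but more CPU per client.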

    def __init__(self, consumer, maxLevel, fillRate, fillLevel=0):
        self.maxLevel = maxLevel  # in bytes
        self.fillRate = fillRate  # in bytes per second
        self.fillLevel = fillLevel  # in bytes

        self._buffers = []  # list of (offset, buffer) tuples
        self._buffersSize = 0

        self._finishing = False  # If true, we'll stop once the current
                                 # buffer has been sent.

        self._unregister = False  # If true, we'll unregister from the
                                  # consumer once the data has been sent.

        self._lastDrip = time.time()
        self._dripDC = None

        self.producer = None  # we get this in registerProducer.
        self.consumer = consumer

        self.consumer.registerProducer(self, 0)

        self.info("Created TokenBucketConsumer with rate %d, initial level "
                  "%d, maximum level %d", fillRate, fillLevel, maxLevel)

    def _dripAndTryWrite(self):
        """
        Re-fill our token bucket based on how long it has been since we last
        refilled it.
        Then attempt to write some data.
        """
        self._dripDC = None

        now = time.time()
        elapsed = now - self._lastDrip
        self._lastDrip = now

        bytes = self.fillRate * elapsed
        # Note that this does introduce rounding errors - not particularly
        # important if the drip interval is reasonably high, though. These
        # will cause the actual rate to be lower than the nominal rate.
        self.fillLevel = int(min(self.fillLevel + bytes, self.maxLevel))

        self._tryWrite()

    def _tryWrite(self):
        if not self.consumer:
            return

        while self.fillLevel > 0 and self._buffersSize > 0:
            # If we're permitted to write at the moment, do so.
            offset, buf = self._buffers[0]
            sendbuf = buf[offset:offset + self.fillLevel]
            bytes = len(sendbuf)

            if bytes + offset == len(buf):
                self._buffers.pop(0)
            else:
                self._buffers[0] = (offset + bytes, buf)
            self._buffersSize -= bytes

            self.consumer.write(sendbuf)
            self.fillLevel -= bytes

        if self._buffersSize > 0:
            # If we have data (and we're not already waiting for our next
            # drip interval), wait... this is what actually performs the data
            # throttling.
            if not self._dripDC:
                self._dripDC = reactor.callLater(self._dripInterval,
                    self._dripAndTryWrite)
        else:
            # No buffer remaining; ask for more data or finish
            if self._finishing:
                if self._unregister:
                    self._doUnregister()
                self._doFinish()
            elif self.producer:
                self.producer.resumeProducing()
            elif self._unregister:
                self._doUnregister()

    def _doUnregister(self):
        self.consumer.unregisterProducer()
        self._unregister = False

    def _doFinish(self):
        self.debug('consumer <- finish()')
        self.consumer.finish()
        self._finishing = False

    def stopProducing(self):
        self.debug('stopProducing; buffered data: %d', self._buffersSize)
        if self.producer is not None:
            self.producer.stopProducing()

        if self._dripDC:
            # don't produce after stopProducing()!
            self._dripDC.cancel()
            self._dripDC = None

        # ...and then, we still may have pending things to do
        if self._unregister:
            self._doUnregister()

        if self._finishing:
            self._finishing = False
            self.consumer.finish()

        if self._buffersSize > 0:
            # make sure we release all the buffers, just in case
            self._buffers = []
            self._buffersSize = 0

        self.consumer = None

    def pauseProducing(self):
        pass

    def resumeProducing(self):
        self._tryWrite()

        if not self._buffers and self.producer:
            self.producer.resumeProducing()

    def write(self, data):
        self._buffers.append((0, data))
        self._buffersSize += len(data)

        self._tryWrite()

        if self._buffers and not self.fillLevel and self.producer:
            self.producer.pauseProducing()

    def finish(self):
        if self._dripDC:
            self._finishing = True
        elif self.consumer:
            self._doFinish()

    def registerProducer(self, producer, streaming):
        self.debug("Producer registered: %r", producer)
        self.producer = producer

        self.resumeProducing()

    def unregisterProducer(self):
        self.debug('unregisterProducer; buffered data: %d', self._buffersSize)
        if self.producer is not None:
            self.producer = None

            if not self._dripDC:
                self._doUnregister()
            else:
                # we need to wait until we've written the data
                self._unregister = True
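

The sketch below is a minimal, self-contained illustration of how the classes
above fit together outside of the plug machinery: a TokenBucketConsumer is
wrapped around a request-like consumer, and a producer in the FileTransfer
style (one that pushes its data from resumeProducing) is registered on it.
FakeRequest and OneShotProducer are hypothetical stand-ins written for this
example, and the rate and data sizes are arbitrary; the only methods they
implement are the ones this module itself calls on its consumer and producer.

    from twisted.internet import reactor

    from flumotion.component.misc.httpserver.ratecontrol import \
        TokenBucketConsumer


    class FakeRequest:
        # Hypothetical consumer: counts bytes written, like a request would.

        def __init__(self):
            self.received = 0

        def write(self, data):
            self.received += len(data)

        def registerProducer(self, producer, streaming):
            self.producer = producer

        def unregisterProducer(self):
            self.producer = None

        def finish(self):
            print('finished, received %d bytes' % self.received)
            reactor.stop()


    class OneShotProducer:
        # Hypothetical producer in the FileTransfer style: pushes all its
        # data from resumeProducing, then unregisters and finishes.

        def __init__(self, consumer, data):
            self.consumer = consumer
            self.data = data

        def resumeProducing(self):
            if self.data:
                data, self.data = self.data, None
                self.consumer.write(data)
                self.consumer.unregisterProducer()
                self.consumer.finish()

        def pauseProducing(self):
            pass

        def stopProducing(self):
            self.data = None


    request = FakeRequest()
    # 16000 bytes/s, 32000 byte burst ceiling, bucket initially empty
    bucket = TokenBucketConsumer(request, 32000, 16000)
    bucket.registerProducer(OneShotProducer(bucket, 'x' * 64000), 0)
    reactor.run()

With these numbers the 64000 bytes are dripped out in roughly 16000 byte
writes about one second apart, after which the bucket unregisters itself from
the request and finishes it.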