1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 __version__ = "$Rev: 6699 $"
23
24 import time
25
26 from flumotion.common import log
27
28 from twisted.internet import reactor
29
30 from flumotion.component.plugs import base as plugbase
31
39
40
42
# Read the rate-control settings from args['properties'].  Every value is
# divided by 8 before being stored; given the attribute name
# _rateBytesPerSec this looks like a bits-to-bytes conversion
# (presumably the properties are expressed in bits -- TODO confirm).
44 props = args['properties']
45 self._rateBytesPerSec = int(props.get('rate', 128000) / 8)
46
# When 'max-level' is absent the default is the stored rate * 8 * 10,
# i.e. ten seconds' worth of data at the configured rate, expressed in
# the same unit as the property; it is then divided by 8 like the others.
47 self._maxLevel = int(props.get('max-level',
48 self._rateBytesPerSec * 8 * 10) / 8)
# Defaults to 0 when 'initial-level' is absent.
49 self._initialLevel = int(props.get('initial-level', 0) / 8)
50
54
56 """
57 Use a token bucket to proxy between a producer (e.g. FileTransfer) and a
58 consumer (TCP protocol, etc.), doing rate control.
59
60 The bucket has a rate and a maximum level, so a small burst can be
61 permitted. The initial level can be set to a non-zero value, this is
62 useful to implement burst-on-connect behaviour.
63
64 TODO: This almost certainly only works with producers that work like
65 FileTransfer - i.e. they produce data directly in resumeProducing, and
66 ignore pauseProducing. This is sufficient for our needs right now.
67 """
68
# Category string for this class's log output (presumably picked up by the
# logging base class -- TODO confirm).
69 logCategory = 'token-bucket'
70
71
72
73
74
75
# Delay passed to reactor.callLater() when a refill-and-write pass has to be
# scheduled because data is still buffered; callLater takes its delay in
# seconds.
76 _dripInterval = 1.0
77
78
79
def __init__(self, consumer, maxLevel, fillRate, fillLevel=0):
    """
    Initialise the bucket state and register ourselves as the producer of
    the downstream consumer.

    @param consumer:  object the rate-limited data is written to; its
                      registerProducer() is called before this returns.
    @param maxLevel:  upper bound the bucket level is capped at.
    @param fillRate:  amount credited to the bucket per second elapsed.
    @param fillLevel: level the bucket starts at (0, i.e. empty, by default).
    """
    # Downstream endpoint; the upstream producer is not known yet.
    self.consumer = consumer
    self.producer = None

    # Token bucket parameters.
    self.fillLevel = fillLevel
    self.fillRate = fillRate
    self.maxLevel = maxLevel

    # Data accepted but not yet forwarded, plus its total size.
    self._buffersSize = 0
    self._buffers = []

    # Requests that are deferred until the buffered data has been sent.
    self._unregister = False
    self._finishing = False

    # Refill bookkeeping: handle of the scheduled refill call (none yet) and
    # the timestamp the bucket was last credited.
    self._dripDC = None
    self._lastDrip = time.time()

    # All state must exist before registering: the consumer is handed a
    # reference to us and may call back.  The second argument (0) is passed
    # positionally; in Twisted's IConsumer it is the 'streaming' flag, so 0
    # presumably registers us as a pull producer -- TODO confirm.
    consumer.registerProducer(self, 0)

    self.info("Created TokenBucketConsumer with rate %d, initial level %d, "
              "maximum level %d", fillRate, fillLevel, maxLevel)
104
106 """
107 Re-fill our token bucket based on how long it has been since we last
108 refilled it.
109 Then attempt to write some data.
110 """
# Forget the delayed-call handle so that a new refill can be scheduled later
# (scheduling is skipped while _dripDC is set).
111 self._dripDC = None
112
# Measure the time since the previous refill and remember this one.
# NOTE(review): time.time() is wall-clock time; if the system clock steps
# backwards, 'elapsed' becomes negative and the bucket level is reduced
# instead of credited.  Consider a monotonic clock or clamping at zero.
113 now = time.time()
114 elapsed = now - self._lastDrip
115 self._lastDrip = now
116
# Credit earned since the last refill.  NOTE: the local name 'bytes'
# shadows the builtin of the same name within this body.
117 bytes = self.fillRate * elapsed
118
119
120
# Add the credit, cap the level at maxLevel and truncate it to an int.
121 self.fillLevel = int(min(self.fillLevel + bytes, self.maxLevel))
122
# Spend the refreshed credit on whatever is buffered.
123 self._tryWrite()
124
# Nothing can be written once the consumer reference has been cleared.
126 if not self.consumer:
127 return
128
# Forward buffered data for as long as the bucket has credit.  Each entry in
# _buffers is an (offset, buffer) pair; offset marks how much of that buffer
# has already been sent.
129 while self.fillLevel > 0 and self._buffersSize > 0:
130
131 offset, buf = self._buffers[0]
# Take at most fillLevel bytes from the unsent part of the head buffer.
132 sendbuf = buf[offset:offset+self.fillLevel]
133 bytes = len(sendbuf)
134
# Head buffer fully consumed: drop it; otherwise just advance its offset.
135 if bytes + offset == len(buf):
136 self._buffers.pop(0)
137 else:
138 self._buffers[0] = (offset+bytes, buf)
139 self._buffersSize -= bytes
140
# Hand the slice to the consumer and charge it against the bucket.
141 self.consumer.write(sendbuf)
142 self.fillLevel -= bytes
143
# Data is still waiting: schedule a refill-and-write pass after _dripInterval
# unless one is already scheduled.
144 if self._buffersSize > 0:
145
146
147
148 if not self._dripDC:
149 self._dripDC = reactor.callLater(self._dripInterval,
150 self._dripAndTryWrite)
# Nothing left buffered: act on a pending finish and/or unregister request
# (_doFinish / _doUnregister), or ask an attached producer for more data via
# resumeProducing().
# NOTE(review): the exact nesting of the branches below cannot be recovered
# from this listing -- confirm against the original file.
151 else:
152
153 if self._finishing:
154 if self._unregister:
155 self._doUnregister()
156 self._doFinish()
157 elif self.producer:
158 self.producer.resumeProducing()
159 elif self._unregister:
160 self._doUnregister()
161
165
170
# Log how much data is still buffered, then pass the stop request on to the
# upstream producer if one is attached.
172 self.debug('stopProducing; buffered data: %d', self._buffersSize)
173 if self.producer is not None:
174 self.producer.stopProducing()
175
# Cancel a scheduled refill so that it cannot fire after this point.
176 if self._dripDC:
177
178 self._dripDC.cancel()
179 self._dripDC = None
180
181
# Carry out an unregister request that was deferred until data was sent.
182 if self._unregister:
183 self._doUnregister()
184
# Likewise for a deferred finish: clear the flag and finish the consumer.
185 if self._finishing:
186 self._finishing = False
187 self.consumer.finish()
188
# Any data still buffered is dropped.
189 if self._buffersSize > 0:
190
191 self._buffers = []
192 self._buffersSize = 0
193
# Drop the consumer reference; the write path returns early once it is unset.
194 self.consumer = None
195
198
204
213
# NOTE(review): the header of this definition is missing from the listing;
# presumably this is the handler for a finish request -- TODO confirm.
# While a refill is scheduled, only record the request in _finishing;
# otherwise, if the consumer is still set, finish immediately via _doFinish().
215 if self._dripDC:
216 self._finishing = True
217 elif self.consumer:
218 self._doFinish()
219
225
# Log how much data is still buffered and forget the upstream producer.
227 self.debug('unregisterProducer; buffered data: %d', self._buffersSize)
228 if self.producer is not None:
229 self.producer = None
230
# With no refill scheduled, unregister right away via _doUnregister();
# otherwise only set _unregister so that it is carried out later.
231 if not self._dripDC:
232 self._doUnregister()
233 else:
234
235 self._unregister = True
236