Trees | Indices | Help |
---|
|
1 # -*- Mode: Python; test-case-name: -*- 2 # vi:si:et:sw=4:sts=4:ts=4 3 # 4 # Flumotion - a streaming media server 5 # Copyright (C) 2004,2005,2006,2007,2008 Fluendo, S.L. (www.fluendo.com). 6 # All rights reserved. 7 8 # This file may be distributed and/or modified under the terms of 9 # the GNU General Public License version 2 as published by 10 # the Free Software Foundation. 11 # This file is distributed without any warranty; without even the implied 12 # warranty of merchantability or fitness for a particular purpose. 13 # See "LICENSE.GPL" in the source distribution for more information. 14 15 # Licensees having purchased or holding a valid Flumotion Advanced 16 # Streaming Server license may use this file in accordance with the 17 # Flumotion Advanced Streaming Server Commercial License Agreement. 18 # See "LICENSE.Flumotion" in the source distribution for more information. 19 20 # Headers in this file shall remain intact. 21 22 __version__ = "$Rev: 7792 $" 23 24 import time 25 26 from flumotion.common import log 27 28 from twisted.internet import reactor 29 30 from flumotion.component.plugs import base as plugbase 31 3234 35 # Create a producer-consumer proxy that sits between a FileTransfer object 36 # and a request object. 37 # You may return a Deferred here. 3841 424457 5846 props = args['properties'] 47 self._rateBytesPerSec = int(props.get('rate', 128000) / 8) 48 # Peak level is 10 seconds of data; this is chosen 49 # entirely arbitrarily. 50 self._maxLevel = int(props.get('max-level', 51 self._rateBytesPerSec * 8 * 10) / 8) 52 self._initialLevel = int(props.get('initial-level', 0) / 8)5355 return TokenBucketConsumer(consumer, self._maxLevel, 56 self._rateBytesPerSec, self._initialLevel)60 """ 61 Use a token bucket to proxy between a producer (e.g. FileTransfer) and a 62 consumer (TCP protocol, etc.), doing rate control. 63 64 The bucket has a rate and a maximum level, so a small burst can be 65 permitted. 
The initial level can be set to a non-zero value, this is 66 useful to implement burst-on-connect behaviour. 67 68 TODO: This almost certainly only works with producers that work like 69 FileTransfer - i.e. they produce data directly in resumeProducing, and 70 ignore pauseProducing. This is sufficient for our needs right now. 71 """ 72 73 logCategory = 'token-bucket' 74 75 # NOTE: Performance is strongly correlated with this value. 76 # Low values (e.g. 0.2) give a 'smooth' transfer, but very high cpu usage 77 # if you have several hundred clients. 78 # Higher values (e.g. 1.0 or more) give bursty transfer, but nicely lower 79 # cpu usage. 80 _dripInterval = 1.0 # If we need to wait for more bits in our bucket, wait 81 # at least this long, to avoid overly frequent small 82 # writes 8328385 self.maxLevel = maxLevel # in bytes 86 self.fillRate = fillRate # in bytes per second 87 self.fillLevel = fillLevel # in bytes 88 89 self._buffers = [] # List of (offset, buffer) tuples 90 self._buffersSize = 0 91 92 self._finishing = False # If true, we'll stop once the current buffer 93 # has been sent. 94 95 self._unregister = False # If true, we'll unregister from the consumer 96 # once the data has been sent. 97 98 self._lastDrip = time.time() 99 self._dripDC = None 100 self._paused = True 101 102 self.producer = None # we get this in registerProducer. 103 self.consumer = consumer 104 105 # We are implemented as a push producer. We forcibly push some 106 # data every couple of seconds to maintain the requested 107 # rate. If the consumer cannot keep up with that rate we want 108 # to get a pauseProducing() call, so we will stop 109 # writing. Otherwise the data would have been buffered on the 110 # server side, leading to excessive memory consumption. 
111 self.consumer.registerProducer(self, 1) 112 113 self.info("Created TokenBucketConsumer with rate %d, " 114 "initial level %d, maximum level %d", 115 fillRate, fillLevel, maxLevel)116118 """ 119 Re-fill our token bucket based on how long it has been since we last 120 refilled it. 121 Then attempt to write some data. 122 """ 123 self._dripDC = None 124 125 now = time.time() 126 elapsed = now - self._lastDrip 127 self._lastDrip = now 128 129 bytes = self.fillRate * elapsed 130 # Note that this does introduce rounding errors - not particularly 131 # important if the drip interval is reasonably high, though. These will 132 # cause the actual rate to be lower than the nominal rate. 133 self.fillLevel = int(min(self.fillLevel + bytes, self.maxLevel)) 134 135 self._tryWrite()136138 if not self.consumer: 139 return 140 141 while self.fillLevel > 0 and self._buffersSize > 0: 142 # If we're permitted to write at the moment, do so. 143 offset, buf = self._buffers[0] 144 sendbuf = buf[offset:offset+self.fillLevel] 145 bytes = len(sendbuf) 146 147 if bytes + offset == len(buf): 148 self._buffers.pop(0) 149 else: 150 self._buffers[0] = (offset+bytes, buf) 151 self._buffersSize -= bytes 152 153 self.consumer.write(sendbuf) 154 self.fillLevel -= bytes 155 156 if self._buffersSize > 0: 157 # If we have data (and we're not already waiting for our next drip 158 # interval), wait... this is what actually performs the data 159 # throttling. 
160 if not (self._dripDC or self._paused): 161 self._dripDC = reactor.callLater(self._dripInterval, 162 self._dripAndTryWrite) 163 else: 164 # No buffer remaining; ask for more data or finish 165 if self._finishing: 166 if self._unregister: 167 self._doUnregister() 168 self._doFinish() 169 elif self.producer: 170 self.producer.resumeProducing() 171 elif self._unregister: 172 self._doUnregister()173 177 182184 self.debug('stopProducing; buffered data: %d', self._buffersSize) 185 if self.producer is not None: 186 self.producer.stopProducing() 187 188 if self._dripDC: 189 # don't produce after stopProducing()! 190 self._dripDC.cancel() 191 self._dripDC = None 192 193 # ...and then, we still may have pending things to do 194 if self._unregister: 195 self._doUnregister() 196 197 if self._finishing: 198 self._finishing = False 199 self.consumer.finish() 200 201 if self._buffersSize > 0: 202 # make sure we release all the buffers, just in case 203 self._buffers = [] 204 self._buffersSize = 0 205 206 self.consumer = None207209 self._paused = True 210 211 # In case our producer is also 'push', we want it to stop. 212 # FIXME: Pull producers don't even need to implement that 213 # method, so we probably should remember what kind of producer 214 # are we dealing with and not call pauseProducing when it's 215 # 'pull'. 216 # However, all our producers (e.g. FileProducer) just 217 # ignore pauseProducing, so for now it works. 218 # 219 # FIXME: convert the following scenario into a unit test and remove it 220 # from here. It's rather lengthy for a comment. 
221 # 222 # The producer might be None at this point if the following happened: 223 # 1) we resumeProducing() 224 # 2) we find out we're not permitted to write more, so we set up the 225 # callLater to write after self._dripInterval 226 # 3) the producer goes away, unregisterProducer() gets called 227 # 4) the callLater fires and we _dripAndTryWrite() 228 # 5) we try to push some data to the consumer 229 # 6) but the consumer is not reading fast enough, Twisted calls 230 # pauseProducing() on us 231 # 7) at this point if self.producer is None we simply don't proxy the 232 # pauseProducing() call to it 233 if self.producer: 234 self.producer.pauseProducing() 235 236 # We have to stop dripping, otherwise we will keep on filling 237 # the buffers and eventually run out of memory. 238 if self._dripDC: 239 self._dripDC.cancel() 240 self._dripDC = None241243 self._paused = False 244 self._tryWrite() 245 246 if not self._buffers and self.producer: 247 self.producer.resumeProducing()248250 self._buffers.append((0, data)) 251 self._buffersSize += len(data) 252 253 self._tryWrite() 254 255 if self._buffers and not self.fillLevel and self.producer: 256 # FIXME: That's not completely correct. See the comment in 257 # self.pauseProducing() about not calling pauseProducing 258 # on 'pull' producers. 259 self.producer.pauseProducing()260 266268 self.debug("Producer registered: %r", producer) 269 self.producer = producer 270 271 self.resumeProducing()272274 self.debug('unregisterProducer; buffered data: %d', self._buffersSize) 275 if self.producer is not None: 276 self.producer = None 277 278 if not self._dripDC: 279 self._doUnregister() 280 else: 281 # we need to wait until we've written the data 282 self._unregister = True
Trees | Indices | Help |
---|
Generated by Epydoc 3.0.1 on Thu Mar 10 06:49:45 2011 | http://epydoc.sourceforge.net |