Package flumotion :: Package component :: Package misc :: Package httpserver :: Module cachemanager
[hide private]

Source Code for Module flumotion.component.misc.httpserver.cachemanager

  1  # -*- Mode: Python; test-case-name: flumotion.test.test_component_providers -*- 
  2  # vi:si:et:sw=4:sts=4:ts=4 
  3  # 
  4  # Flumotion - a streaming media server 
  5  # Copyright (C) 2004,2005,2006,2007,2008 Fluendo, S.L. (www.fluendo.com). 
  6  # All rights reserved. 
  7   
  8  # This file may be distributed and/or modified under the terms of 
  9  # the GNU General Public License version 2 as published by 
 10  # the Free Software Foundation. 
 11  # This file is distributed without any warranty; without even the implied 
 12  # warranty of merchantability or fitness for a particular purpose. 
 13  # See "LICENSE.GPL" in the source distribution for more information. 
 14   
 15  # Licensees having purchased or holding a valid Flumotion Advanced 
 16  # Streaming Server license may use this file in accordance with the 
 17  # Flumotion Advanced Streaming Server Commercial License Agreement. 
 18  # See "LICENSE.Flumotion" in the source distribution for more information. 
 19   
 20  # Headers in this file shall remain intact. 
 21   
 22  import errno 
 23  import os 
 24  import tempfile 
 25  import time 
 26  import stat 
 27   
 28  from twisted.internet import defer, threads, protocol, reactor, utils 
 29   
 30  from flumotion.common import log, common, python, format, errors 
 31   
 32  from flumotion.component.misc.httpserver import fileprovider 
 33   
 34  LOG_CATEGORY = "cache-manager" 
 35   
 36  DEFAULT_CACHE_SIZE = 1000 * 1024 * 1024 
 37  DEFAULT_CACHE_DIR = "/tmp/httpserver" 
 38  DEFAULT_CLEANUP_ENABLED = True 
 39  DEFAULT_CLEANUP_HIGH_WATERMARK = 1.0 
 40  DEFAULT_CLEANUP_LOW_WATERMARK = 0.6 
 41  ID_CACHE_MAX_SIZE = 1024 
 42  TEMP_FILE_POSTFIX = ".tmp" 
 43   
 44   
45 -class CacheManager(object, log.Loggable):
46 47 logCategory = LOG_CATEGORY 48
49 - def __init__(self, stats, 50 cacheDir = None, 51 cacheSize = None, 52 cleanupEnabled = None, 53 cleanupHighWatermark = None, 54 cleanupLowWatermark = None, 55 cacheRealm = None):
56 57 if cacheDir is None: 58 cacheDir = DEFAULT_CACHE_DIR 59 if cacheSize is None: 60 cacheSize = DEFAULT_CACHE_SIZE 61 if cleanupEnabled is None: 62 cleanupEnabled = DEFAULT_CLEANUP_ENABLED 63 if cleanupHighWatermark is None: 64 cleanupHighWatermark = DEFAULT_CLEANUP_HIGH_WATERMARK 65 if cleanupLowWatermark is None: 66 cleanupLowWatermark = DEFAULT_CLEANUP_LOW_WATERMARK 67 68 self.stats = stats 69 self._cacheDir = cacheDir 70 self._cacheSize = cacheSize # in bytes 71 self._cleanupEnabled = cleanupEnabled 72 highWatermark = max(0.0, min(1.0, float(cleanupHighWatermark))) 73 lowWatermark = max(0.0, min(1.0, float(cleanupLowWatermark))) 74 75 self._cachePrefix = (cacheRealm and (cacheRealm + ":")) or "" 76 77 self._identifiers = {} # {path: identifier} 78 79 self.info("Cache Manager initialized") 80 self.debug("Cache directory: '%s'", self._cacheDir) 81 self.debug("Cache size: %d bytes", self._cacheSize) 82 self.debug("Cache cleanup enabled: %s", self._cleanupEnabled) 83 84 common.ensureDir(self._cacheDir, "cache") 85 86 self._cacheUsage = None 87 self._cacheUsageLastUpdate = None 88 self._lastCacheTime = None 89 90 self._cacheMaxUsage = self._cacheSize * highWatermark # in bytes 91 self._cacheMinUsage = self._cacheSize * lowWatermark # in bytes
92
93 - def setUp(self):
94 """ 95 Initialize the cache manager 96 97 @return a defer 98 @raise: OSError or FlumotionError 99 """ 100 # Initialize cache usage 101 return self.updateCacheUsage()
102
103 - def getIdentifier(self, path):
104 """ 105 The returned identifier is a digest of the path encoded in hex string. 106 The hash function used is SHA1. 107 It caches the identifiers in a dictionary indexed by path and with 108 a maximum number of entry specified by the constant ID_CACHE_MAX_SIZE. 109 110 @return: an identifier for path. 111 """ 112 ident = self._identifiers.get(path, None) 113 if ident is None: 114 hash = python.sha1() 115 hash.update(self._cachePrefix + path) 116 ident = hash.digest().encode("hex").strip('\n') 117 # Prevent the cache from growing endlessly 118 if len(self._identifiers) >= ID_CACHE_MAX_SIZE: 119 self._identifiers.clear() 120 self._identifiers[path] = ident 121 return ident
122
123 - def getCachePath(self, path):
124 """ 125 @return: the cached file path for a path. 126 """ 127 ident = self.getIdentifier(path) 128 return os.path.join(self._cacheDir, ident)
129
130 - def getTempPath(self, path):
131 """ 132 @return: a temporary file path for a path. 133 134 Don't use this function, it's provided for compatibility. 135 Use newTempFile() instead. 136 """ 137 ident = self.getIdentifier(path) 138 return os.path.join(self._cacheDir, ident + TEMP_FILE_POSTFIX)
139
141 self.stats.onEstimateCacheUsage(self._cacheUsage, self._cacheSize)
142
143 - def _updateCacheUsage(self, usage):
144 self.log('Disk usage for path %r is %d bytes', self._cacheDir, usage) 145 self._cacheUsageLastUpdate = time.time() 146 self._cacheUsage = usage 147 self.updateCacheUsageStatistics() 148 return usage
149
150 - def updateCacheUsage(self):
151 """ 152 @return: a defered with the cache usage in bytes. 153 @raise: OSError or FlumotionError 154 """ 155 156 # Only calculate cache usage if the cache directory 157 # modification time changed since the last time we looked at it. 158 try: 159 cacheTime = os.path.getmtime(self._cacheDir) 160 except OSError, e: 161 return defer.fail(e) 162 163 if ((self._cacheUsage is None) or (self._lastCacheTime < cacheTime)): 164 self._lastCacheTime = cacheTime 165 self.log('Getting disk usage for path %r', self._cacheDir) 166 d = utils.getProcessOutput('du', ['-bs', self._cacheDir]) 167 d.addCallback(lambda o: int(o.split('\t', 1)[0])) 168 d.addCallback(self._updateCacheUsage) 169 return d 170 else: 171 return defer.succeed(self._cacheUsage)
172
173 - def _rmfiles(self, files):
174 try: 175 for path in files: 176 os.remove(path) 177 except OSError, e: 178 if e.errno != errno.ENOENT: 179 # TODO: is warning() thread safe? 180 self.warning("Error cleaning cached file: %s", str(e))
181
182 - def _setCacheUsage(self, _, usage):
183 # Update the cache usage 184 self._cacheUsage = usage 185 self._cacheUsageLastUpdate = time.time() 186 return usage
187
188 - def _cleanUp(self):
189 # Update cleanup statistics 190 self.stats.onCleanup() 191 # List the cached files with file state 192 try: 193 listdir = os.listdir(self._cacheDir) 194 except OSError, e: 195 return defer.fail(e) 196 197 files = [] 198 for f in listdir: 199 f = os.path.join(self._cacheDir, f) 200 # There's a possibility of getting an error on os.stat here. 201 try: 202 files.append((f, os.stat(f))) 203 except OSError, e: 204 if e.errno == errno.ENOENT: 205 pass 206 else: 207 return defer.fail(e) 208 209 # Calculate the cached file total size 210 usage = sum([d[1].st_size for d in files]) 211 # Delete the cached file starting by the oldest accessed ones 212 files.sort(key=lambda d: d[1].st_atime) 213 rmlist = [] 214 for path, info in files: 215 usage -= info.st_size 216 rmlist.append(path) 217 if usage <= self._cacheMinUsage: 218 # We reach the cleanup limit 219 self.debug('cleaned up, cache use is now %sbytes', 220 format.formatStorage(usage)) 221 break 222 d = threads.deferToThread(self._rmfiles, rmlist) 223 d.addBoth(self._setCacheUsage, usage) 224 return d
225
226 - def _allocateCacheSpaceAfterCleanUp(self, usage, size):
227 if (self._cacheUsage + size) >= self._cacheSize: 228 # There is not enough space, allocation failed 229 self.updateCacheUsageStatistics() 230 self.debug('not enough space in cache, ' 231 'cannot cache %d > %d' % 232 (self._cacheUsage + size, self._cacheSize)) 233 return None 234 235 # There is enough space to allocate, allocation succeed 236 self._cacheUsage += size 237 self.updateCacheUsageStatistics() 238 return (self._cacheUsageLastUpdate, size)
239
240 - def _allocateCacheSpace(self, usage, size):
241 if usage + size < self._cacheMaxUsage: 242 self._cacheUsage += size 243 self.updateCacheUsageStatistics() 244 return defer.succeed((self._cacheUsageLastUpdate, size)) 245 246 self.debug('cache usage will be %sbytes, need more cache', 247 format.formatStorage(usage + size)) 248 249 if not self._cleanupEnabled: 250 # No space available and cleanup disabled: allocation failed. 251 self.debug('not allowed to clean up cache, ' 252 'so cannot cache %d' % size) 253 return defer.succeed(None) 254 255 d = self._cleanUp() 256 d.addCallback(self._allocateCacheSpaceAfterCleanUp, size) 257 return d
258
259 - def allocateCacheSpace(self, size):
260 """ 261 Try to reserve cache space. 262 263 If there is not enough space and the cache cleanup is enabled, 264 it will delete files from the cache starting with the ones 265 with oldest access time until the cache usage drops below 266 the fraction specified by the property cleanup-low-threshold. 267 268 Returns a 'tag' that should be used to 'free' the cache space 269 using releaseCacheSpace. 270 This tag is needed to better estimate the cache usage, 271 if the cache usage has been updated since cache space 272 has been allocated, freeing up the space should not change 273 the cache usage estimation. 274 275 @param size: size to reserve, in bytes 276 @type size: int 277 278 @return: an allocation tag or None if the allocation failed. 279 @rtype: defer to tuple 280 """ 281 d = self.updateCacheUsage() 282 d.addCallback(self._allocateCacheSpace, size) 283 return d
284
285 - def releaseCacheSpace(self, tag):
286 """ 287 Low-level function to release reserved cache space. 288 """ 289 lastUpdate, size = tag 290 if lastUpdate == self._cacheUsageLastUpdate: 291 self._cacheUsage -= size 292 self.updateCacheUsageStatistics()
293
294 - def openCacheFile(self, path):
295 """ 296 @return: a defer to a CacheFile instance or None 297 """ 298 try: 299 return defer.succeed(CachedFile(self, path)) 300 except: 301 return defer.succeed(None)
302
303 - def _newTempFile(self, tag, path, size, mtime=None):
304 # if allocation fails 305 if tag is None: 306 return None 307 308 try: 309 return TempFile(self, path, tag, size, mtime) 310 except OSError, e: 311 return None
312
313 - def newTempFile(self, path, size, mtime=None):
314 """ 315 @return: a defer to a TempFile instance or None 316 """ 317 d = self.allocateCacheSpace(size) 318 d.addCallback(self._newTempFile, path, size, mtime) 319 return d
320 321
class CachedFile:
    """
    Read only.

    Wrapper around a cached file opened for binary reading; attributes
    that are not defined here are looked up on the underlying file
    object.

    See cachedprovider.py
    @raise: IOError or OSError if the file cannot be opened or stat'ed
    """

    def __init__(self, cachemgr, resPath):
        """
        @param cachemgr: the cache manager used to locate the file and
                         to log
        @param resPath:  the resource path, as given to the cache manager
        """
        cachedPath = cachemgr.getCachePath(resPath)
        handle = open(cachedPath, 'rb')
        info = os.fstat(handle.fileno())

        cachemgr.log("Opened cached file %s [fd %d]",
                     cachedPath, handle.fileno())

        self.name = cachedPath
        self.file = handle
        self.stat = info

    # NOTE(review): the extracted listing jumps from original line 341 to
    # 354 at this point; confirm against the upstream file that no member
    # was lost between these two methods.

    def __getattr__(self, name):
        """
        Delegate unknown attributes to the underlying file object.

        Non-numeric results (the methods) are cached on the instance.
        Integers and booleans such as 'closed' are looked up every time,
        because their value changes during the life of the file.
        """
        file = self.__dict__['file']
        a = getattr(file, name)
        if not isinstance(a, int):
            setattr(self, name, a)
        return a
361 362
363 -class TempFile:
364 """ 365 See cachedprovider.py 366 """ 367
368 - def __init__(self, cachemgr, resPath, tag, size, mtime=None):
369 """ 370 @raise: OSError 371 """ 372 self.tag = tag 373 self.cachemgr = cachemgr 374 self._completed = False 375 self._finishPath = cachemgr.getCachePath(resPath) 376 self.mtime = mtime 377 self.file = None 378 self.size = size 379 380 fd, tempPath = tempfile.mkstemp(TEMP_FILE_POSTFIX, 381 LOG_CATEGORY, cachemgr._cacheDir) 382 cachemgr.log("Created temporary file '%s' [fd %d]", 383 tempPath, fd) 384 self.file = os.fdopen(fd, "w+b") 385 cachemgr.log("Truncating temporary file to size %d", size) 386 self.file.truncate(size) 387 self.stat = os.fstat(self.file.fileno()) 388 self.name = tempPath
389
390 - def __getattr__(self, name):
391 file = self.__dict__['file'] 392 a = getattr(file, name) 393 if type(a) != type(0): 394 setattr(self, name, a) 395 return a
396
397 - def setModificationTime(self, mtime=None):
398 """ 399 Set file modification time. 400 """ 401 if (mtime): 402 self.mtime = mtime 403 try: 404 if self.mtime: 405 mtime = self.mtime 406 atime = int(time.time()) 407 self.cachemgr.log("Setting cache file " 408 "modification time to %d", mtime) 409 # FIXME: Should use futimes, but it's not wrapped by python 410 os.utime(self.name, (atime, mtime)) 411 except OSError, e: 412 if e.errno == errno.ENOENT: 413 self.cachemgr.releaseCacheSpace(self.tag) 414 else: 415 self.cachemgr.warning( 416 "Failed to update modification time of temporary " 417 "file: %s", log.getExceptionMessage(e))
418
419 - def close(self):
420 """ 421 @raise: OSError 422 """ 423 if self.cachemgr is None: 424 return 425 426 try: 427 if not self._completed: 428 self.cachemgr.log("Temporary file canceled '%s' [fd %d]", 429 self.name, self.fileno()) 430 self.cachemgr.releaseCacheSpace(self.tag) 431 os.unlink(self.name) 432 except OSError, e: 433 pass 434 435 self.file.close() 436 self.setModificationTime() 437 self.file = None 438 self.cachemgr = None
439
440 - def write(self, str):
441 """ 442 @raise: OSError 443 @raise: IOError 444 allocated size 445 """ 446 if (self.file.tell() + len(str) > self.size): 447 raise IOError("Cache size overrun (%d > %d)" % 448 (self.file.tell() + len(str), self.size)) 449 return self.file.write(str)
450
451 - def complete(self, checkSize=False):
452 """ 453 Make the temporary file available as a cached file. 454 Do NOT close the file, afterward the file can be used 455 as a normal CachedFile instance. 456 Do not raise exceptions on rename error. 457 458 @raise: IOError if checkSize and tell() != size 459 """ 460 if self.cachemgr is None: 461 return 462 if self._completed: 463 return 464 self._completed = True 465 466 _, size = self.tag 467 if (self.tell() != size and checkSize): 468 raise IOError("Did not reach end of file") 469 470 self.cachemgr.log("Temporary file completed '%s' [fd %d]", 471 self.name, self.fileno()) 472 try: 473 if self.mtime is not None: 474 mtime = os.path.getmtime(self._finishPath) 475 if mtime > self.mtime: 476 self.cachemgr.log("Did not complete(), " 477 "a more recent version exists already") 478 os.unlink(self.name) 479 self.name = self._finishPath 480 return 481 except OSError, e: 482 pass 483 484 try: 485 os.rename(self.name, self._finishPath) 486 except OSError, e: 487 if e.errno == errno.ENOENT: 488 self.cachemgr.releaseCacheSpace(self.tag) 489 self.cachemgr.warning( 490 "Failed to rename file '%s': %s" % 491 (self.name, str(e))) 492 return 493 494 self.setModificationTime() 495 496 self.name = self._finishPath 497 self.cachemgr.log("Temporary file renamed to '%s' [fd %d]", 498 self._finishPath, self.fileno())
499 500
501 -def main(argv=None):
502 # Functional tests 503 import random 504 505 CACHE_SIZE = 1 * 1024 * 1024 506 MAX_CLEANUPS = 512 507 508 class DummyStats: 509 510 def __init__(self): 511 self.oncleanup = 0
512 513 def info(): 514 pass 515 516 def onEstimateCacheUsage(self, usage, size): 517 #print "Stat: " + str(usage / (1024))\ 518 # + "k / " + str(size / (1024)) + "k" 519 pass 520 521 def onCleanup(self): 522 self.oncleanup += 1 523 print "OnCleanup" 524 525 def makeTemp(tag, size, m, name): 526 t = TempFile(m, name, tag, size) 527 return t 528 529 def completeAndClose(t): 530 try: 531 t.complete() 532 t.close() 533 except: 534 print "Got a complete exception" 535 536 def fillTestCache(manager): 537 i = 0 538 while (manager.stats.oncleanup < MAX_CLEANUPS): 539 i += 1 540 filesize = 4096 * random.randint(1, 30) 541 d = manager.newTempFile(str(i), filesize) 542 d.addCallback(completeAndClose) 543 544 def releaseCacheSpace(tag, m): 545 print "gotCacheSpace: ", tag 546 m.releaseCacheSpace(tag) 547 548 def checkUsage(usage, m, check): 549 if (not check(m._cacheUsage)): 550 print "Cache overrun!!! %d/%d" % (m._cacheUsage, m._cacheSize) 551 552 def openCacheAndClose(_, m, name): 553 d = m.openCacheFile(name) 554 d.addCallback(lambda f: f.close()) 555 return d 556 557 def checkMiss(_): 558 if (_ == "cacheMiss"): 559 return 560 raise errors.FlumotionError("an error") 561 562 def runTests(): 563 # low-level cache requests 564 d = m.allocateCacheSpace(1024) 565 d.addCallback(releaseCacheSpace, m) 566 d.addCallback(checkUsage, m, lambda u: u == 0) 567 568 d = m.allocateCacheSpace(CACHE_SIZE / 2) 569 d.addCallback(makeTemp, CACHE_SIZE / 2, m, "test") 570 d.addCallback(lambda t: t.close()) 571 d.addCallback(checkUsage, m, lambda u: u == 0) 572 573 d = m.allocateCacheSpace(CACHE_SIZE / 2) 574 d.addCallback(makeTemp, CACHE_SIZE / 2, m, "test2") 575 d.addCallback(completeAndClose) 576 d.addCallback(checkUsage, m, lambda u: u > 0) 577 578 # check hit and miss 579 m2 = CacheManager(DummyStats(), cachedir, CACHE_SIZE, True, 0.5, 0.3) 580 d = m2.newTempFile("test3", 12000) 581 d.addCallback(completeAndClose) 582 d.addCallback(openCacheAndClose, m, "test3") 583 584 d = 
openCacheAndClose(None, m, "test4_do_not_exists") 585 d.addErrback(lambda _: "cacheMiss") 586 d.addCallback(checkMiss) 587 588 # multi-thread test, full of races :) 589 threads.deferToThread(fillTestCache, m) 590 threads.deferToThread(fillTestCache, m) 591 threads.deferToThread(fillTestCache, m) 592 593 # check usage 594 m.updateCacheUsage().addCallback(checkUsage, m, 595 lambda u: u < CACHE_SIZE * 1.10) 596 597 598 cachedir = os.environ['HOME'] + "/tmp/cache" 599 m = CacheManager(DummyStats(), cachedir, CACHE_SIZE, True, 0.0, 0.0) 600 d = m.setUp() 601 602 m.addCallback(lambda x: runTests()) 603 604 reactor.callLater(3, reactor.stop) 605 reactor.run() 606 return 0 607 608 if __name__ == '__main__': 609 import sys 610 status = main() 611 sys.exit(status) 612