// Conversion factor: number of bytes in one megabyte (2^20). The configured
// cache size is given in megabytes and multiplied by this value to get bytes.
static const unsigned long long BYTES_PER_MEG = 1048576ULL;

// Upper bound, in megabytes, applied to the configured cache size before it is
// converted to bytes. The converted value is held in an unsigned 64-bit
// integer, so the bound must be strictly less than 2^64 / 2^20 == 2^44:
// a value of exactly 2^44 megabytes multiplies out to 2^64 bytes, which wraps
// to 0 and would then be rejected as "size not specified". Using 2^44 - 1
// keeps the largest accepted size representable.
static const unsigned long long MAX_CACHE_SIZE_IN_MEGABYTES = (1ULL << 44) - 1;
81 d_cache_dir(cache_dir), d_prefix(prefix), d_max_cache_size_in_bytes(size)
85 m_initialize_cache_info();
89 d_cache_dir = cache_dir;
91 d_max_cache_size_in_bytes = size;
95 m_initialize_cache_info();
// Build a text description of the current errno value.
//
// The message is looked up with strerror(errno); the literal
// "Unknown error." is the other return value visible here.
// NOTE(review): the branch that chooses between the strerror() text and the
// fallback is not visible in this excerpt - presumably a null check on s_err;
// confirm against the full source.
100 static inline string get_errno() {
// Look up the system message for the current errno.
101 char *s_err = strerror(errno);
// Fallback text.
105 return "Unknown error.";
// Prepare a struct flock for use with fcntl() and hand back a pointer to it.
//
// @param type  Lock type requested by the caller; callers in this file pass
//              F_RDLCK, F_WRLCK or F_UNLCK.
// @return Pointer to a struct flock (per the declared return type).
//
// The struct is a function-local static, so every call works on the same
// object: a pointer obtained from one call sees the values written by the next.
// NOTE(review): the assignments to l_type / l_start / l_len and the return
// statement are not visible in this excerpt - presumably l_type = type and the
// whole file is covered; confirm against the full source.
109 static inline struct flock *lock(
int type) {
// Single shared instance, overwritten on each call.
110 static struct flock lock;
// Offsets in the lock request are relative to the start of the file.
112 lock.l_whence = SEEK_SET;
// Record the id of the calling process in the request.
115 lock.l_pid = getpid();
// Remember the file descriptor associated with a cache file.
//
// @param file  File name used as the key in d_locks.
// @param fd    Descriptor stored under that key.
//
// Logs the pair to the "cache" debug context, then inserts it into d_locks.
121 inline void BESFileLockingCache::m_record_descriptor(
const string &file,
int fd) {
122 BESDEBUG(
"cache",
"DAP Cache: recording descriptor: " << file <<
", " << fd << endl);
// Add the (name, descriptor) pair to the table of recorded descriptors.
123 d_locks.insert(std::pair<string, int>(file, fd));
// Look up the descriptor that was recorded for a file in d_locks.
//
// @param file  File name used as the lookup key.
// @return An int (per the declared return type).
// NOTE(review): the value returned when the name is not found, the statement
// that reads fd out of the iterator, and whether the entry is removed from
// d_locks are not visible in this excerpt - confirm against the full source.
126 inline int BESFileLockingCache::m_get_descriptor(
const string &file) {
// Search the table of recorded descriptors for this file name.
127 FilesAndLockDescriptors::iterator i = d_locks.find(file);
// Not-found case; its body is not visible in this excerpt.
128 if (i == d_locks.end())
132 BESDEBUG(
"cache",
"DAP Cache: getting descriptor: " << file <<
", " << fd << endl);
// Release the fcntl() lock held through a descriptor.
//
// @param fd  Descriptor whose lock is released.
// @throws BESInternalError if the F_UNLCK request fails; a second
//         BESInternalError reports a failure to close the file.
//
// NOTE(review): the close() call guarding the second throw is not visible in
// this excerpt; its presence is inferred from the error message only.
// NOTE(review): the first message is concatenated with get_errno() without a
// separator, so the errno text runs directly into the word "file".
142 static void unlock(
int fd)
// Non-blocking request (F_SETLK) to drop the lock.
144 if (fcntl(fd, F_SETLK, lock(F_UNLCK)) == -1) {
145 throw BESInternalError(
"An error occurred trying to unlock the file" + get_errno(), __FILE__, __LINE__);
149 throw BESInternalError(
"Could not close the (just) unlocked file.", __FILE__, __LINE__);
// Open a file read-only and request a shared (read) lock on it.
//
// @param file_name  Path of the file to open and lock.
// @param ref_fd     Reference parameter for a descriptor.
//                   NOTE(review): the assignment to ref_fd is not visible in
//                   this excerpt - presumably it receives the opened
//                   descriptor on success; confirm.
// @return bool. NOTE(review): the return statements are not visible here -
//         presumably false when the file cannot be opened and true once the
//         lock is held; confirm against the full source.
//
// The lock is requested with F_SETLKW, i.e. the call waits until the lock can
// be granted. On failure an error message naming the requesting process id and
// the errno text is composed; how it is reported is not visible here.
164 static bool getSharedLock(
const string &file_name,
int &ref_fd)
166 BESDEBUG(
"cache",
"getSharedLock: " << file_name <<endl);
// Open for reading only; a negative result means the open failed.
169 if ((fd = open(file_name.c_str(), O_RDONLY)) < 0) {
// Build a read-lock request and wait (F_SETLKW) for it to be granted.
179 struct flock *l = lock(F_RDLCK);
180 if (fcntl(fd, F_SETLKW, l) == -1) {
183 oss <<
"cache process: " << l->l_pid <<
" triggered a locking error: " << get_errno();
187 BESDEBUG(
"cache",
"getSharedLock exit: " << file_name <<endl);
// Open a file read/write and request an exclusive (write) lock on it.
//
// @param file_name  Path of the file to open and lock (taken by value).
// @param ref_fd     Reference parameter for a descriptor.
//                   NOTE(review): the assignment to ref_fd is not visible in
//                   this excerpt - presumably it receives the opened
//                   descriptor on success; confirm.
// @return bool. NOTE(review): the return statements are not visible here -
//         presumably false when the file cannot be opened and true once the
//         lock is held; confirm against the full source.
//
// The lock is requested with F_SETLKW, i.e. the call waits until the lock can
// be granted. On failure an error message naming the requesting process id and
// the errno text is composed; how it is reported is not visible here.
206 static bool getExclusiveLock(
string file_name,
int &ref_fd)
208 BESDEBUG(
"cache",
"getExclusiveLock: " << file_name <<endl);
// Open for reading and writing; a negative result means the open failed.
211 if ((fd = open(file_name.c_str(), O_RDWR)) < 0) {
// Build a write-lock request and wait (F_SETLKW) for it to be granted.
221 struct flock *l = lock(F_WRLCK);
222 if (fcntl(fd, F_SETLKW, l) == -1) {
225 oss <<
"cache process: " << l->l_pid <<
" triggered a locking error: " << get_errno();
229 BESDEBUG(
"cache",
"getExclusiveLock exit: " << file_name <<endl);
// Open a file read/write and try to take an exclusive (write) lock on it
// without waiting.
//
// @param file_name  Path of the file to open and lock (taken by value).
// @param ref_fd     Reference parameter for a descriptor.
//                   NOTE(review): the assignment to ref_fd is not visible in
//                   this excerpt - presumably it receives the opened
//                   descriptor on success; confirm.
// @return bool. The debug messages distinguish an "exit (false)" path taken
//         after the fcntl() call fails and an "exit (true)" path at the end.
//         NOTE(review): the errno test that separates "lock is held by someone
//         else" from a real error, and the return statements themselves, are
//         not visible in this excerpt; confirm against the full source.
//
// Unlike the blocking variants, the request uses F_SETLK, which fails
// immediately instead of waiting when the lock cannot be granted.
247 static bool getExclusiveLockNB(
string file_name,
int &ref_fd)
249 BESDEBUG(
"cache",
"getExclusiveLock_nonblocking: " << file_name <<endl);
// Open for reading and writing; a negative result means the open failed.
252 if ((fd = open(file_name.c_str(), O_RDWR)) < 0) {
// Build a write-lock request and submit it without blocking (F_SETLK).
262 struct flock *l = lock(F_WRLCK);
263 if (fcntl(fd, F_SETLK, l) == -1) {
266 BESDEBUG(
"cache",
"getExclusiveLock_nonblocking exit (false): " << file_name <<
" by: " << l->l_pid << endl);
273 oss <<
"cache process: " << l->l_pid <<
" triggered a locking error: " << get_errno();
279 BESDEBUG(
"cache",
"getExclusiveLock_nonblocking exit (true): " << file_name <<endl);
// Create a new file and request an exclusive (write) lock on it.
//
// @param file_name  Path of the file to create (taken by value).
// @param ref_fd     Reference parameter for a descriptor.
//                   NOTE(review): the assignment to ref_fd is not visible in
//                   this excerpt - presumably it receives the new descriptor
//                   on success; confirm.
// @return bool. NOTE(review): the return statements are not visible here -
//         presumably false when the file could not be created (for example
//         because it already exists) and true once the lock is held; confirm.
//
// The file is opened with O_CREAT | O_EXCL, so the open fails if the file
// already exists. The lock is then requested with F_SETLKW, which waits until
// it can be granted. On failure an error message naming the requesting process
// id and the errno text is composed; how it is reported is not visible here.
299 static bool createLockedFile(
string file_name,
int &ref_fd)
301 BESDEBUG(
"cache",
"createLockedFile: " << file_name <<endl);
// Create exclusively, read/write, requested mode 0666.
304 if ((fd = open(file_name.c_str(), O_CREAT | O_EXCL | O_RDWR, 0666)) < 0) {
// Build a write-lock request and wait (F_SETLKW) for it to be granted.
314 struct flock *l = lock(F_WRLCK);
315 if (fcntl(fd, F_SETLKW, l) == -1) {
318 oss <<
"cache process: " << l->l_pid <<
" triggered a locking error: " << get_errno();
322 BESDEBUG(
"cache",
"createLockedFile exit: " << file_name <<endl);
// Validate the cache configuration held in d_cache_dir, d_prefix and
// d_max_cache_size_in_bytes.
//
// Checks, in order: the directory name is not empty; the directory exists
// (otherwise mkdir() is attempted with mode 0775); the prefix is not empty;
// the size is not zero. For each failed check an error string is built.
// NOTE(review): the statements that raise those errors are not visible in this
// excerpt - presumably each string is thrown as an exception; confirm.
// NOTE(review): d_max_cache_size_in_bytes is passed to min() together with an
// unsigned long long constant elsewhere in this file, which suggests it is
// unsigned; if so the "<= 0" test can only ever match 0.
330 void BESFileLockingCache::m_check_ctor_params()
332 if (d_cache_dir.empty()) {
333 string err =
"The cache directory was not specified";
// Does the path exist, and is it a directory?
338 int statret = stat(d_cache_dir.c_str(), &buf);
339 if (statret != 0 || !S_ISDIR(buf.st_mode)) {
// Try to create the missing directory.
341 int status = mkdir(d_cache_dir.c_str(), 0775);
343 string err =
"The cache directory " + d_cache_dir +
" does not exist or could not be created.";
348 if (d_prefix.empty()) {
349 string err =
"The cache file prefix was not specified, must not be empty";
353 if (d_max_cache_size_in_bytes <= 0) {
354 string err =
"The cache size was not specified, must be greater than zero";
358 BESDEBUG(
"cache",
"DAP Cache: directory " << d_cache_dir <<
", prefix " << d_prefix
359 <<
", max size " << d_max_cache_size_in_bytes << endl );
// Finish setting up the cache: convert the configured size to bytes, derive
// the purge target, validate the parameters and set up the cache info file.
//
// Steps visible here:
//  - d_max_cache_size_in_bytes initially holds a value in megabytes; it is
//    clamped to MAX_CACHE_SIZE_IN_MEGABYTES and then multiplied by
//    BYTES_PER_MEG.
//  - d_target_size is set to 80% of the maximum size.
//  - m_check_ctor_params() validates directory, prefix and size.
//  - The info file is "<d_cache_dir>/dap.cache.info". If createLockedFile()
//    succeeds, a size of 0 is written to it as a raw unsigned long long.
//
// @throws BESInternalError if the initial size cannot be written.
//
// NOTE(review): the multiplication is done in unsigned 64-bit arithmetic, so
// it wraps if the clamped megabyte value is 2^44 or more (BYTES_PER_MEG is
// 2^20); the cap must stay below 2^44 for this conversion to be safe.
// NOTE(review): the branch structure around the open() call (presumably the
// "file already exists" path) and the handling of its failure are not visible
// in this excerpt; confirm against the full source.
363 void BESFileLockingCache::m_initialize_cache_info()
// Clamp the megabyte value, then convert megabytes to bytes.
367 d_max_cache_size_in_bytes = min(d_max_cache_size_in_bytes, MAX_CACHE_SIZE_IN_MEGABYTES);
368 d_max_cache_size_in_bytes *= BYTES_PER_MEG;
// Purging aims for 80% of the maximum size.
369 d_target_size = d_max_cache_size_in_bytes * 0.8;
371 m_check_ctor_params();
373 d_cache_info = d_cache_dir +
"/dap.cache.info";
// Try to create the info file; on success seed it with a size of zero.
377 if (createLockedFile(d_cache_info, d_cache_info_fd)) {
379 unsigned long long size = 0;
380 if (write(d_cache_info_fd, &size,
sizeof(
unsigned long long)) !=
sizeof(
unsigned long long))
381 throw BESInternalError(
"Could not write size info to the cache info file `"+d_cache_info+
"`", __FILE__, __LINE__);
// Open the info file read/write.
387 if ((d_cache_info_fd = open(d_cache_info.c_str(), O_RDWR)) == -1) {
394 BESDEBUG(
"cache",
"d_cache_info_fd: " << d_cache_info_fd << endl);
417 if (target.at(0) ==
'/') {
418 target = src.substr(1, target.length() - 1);
420 string::size_type slash = 0;
421 while ((slash = target.find(
'/')) != string::npos) {
422 target.replace(slash, 1, 1, BESFileLockingCache::DAP_CACHE_CHAR);
424 string::size_type last_dot = target.rfind(
'.');
425 if (last_dot != string::npos) {
426 target = target.substr(0, last_dot);
429 BESDEBUG(
"cache",
" d_cache_dir: '" << d_cache_dir <<
"'" << endl);
430 BESDEBUG(
"cache",
" d_prefix: '" << d_prefix <<
"'" << endl);
431 BESDEBUG(
"cache",
" target: '" << target <<
"'" << endl);
433 return d_cache_dir +
"/" + d_prefix + BESFileLockingCache::DAP_CACHE_CHAR + target;
457 bool status = getSharedLock(target, fd);
459 BESDEBUG(
"cache",
"DAP Cache: read_lock: " << target <<
"(" << status <<
")" << endl);
462 m_record_descriptor(target, fd);
485 bool status = createLockedFile(target, fd);
487 BESDEBUG(
"cache",
"DAP Cache: create_and_lock: " << target <<
"(" << status <<
")" << endl);
490 m_record_descriptor(target, fd);
514 lock.l_type = F_RDLCK;
515 lock.l_whence = SEEK_SET;
518 lock.l_pid = getpid();
520 if (fcntl(fd, F_SETLKW, &lock) == -1) {
535 BESDEBUG(
"cache",
"lock_cache - d_cache_info_fd: " << d_cache_info_fd << endl);
537 if (fcntl(d_cache_info_fd, F_SETLKW, lock(F_WRLCK)) == -1) {
538 throw BESInternalError(
"An error occurred trying to lock the cache-control file" + get_errno(), __FILE__, __LINE__);
547 BESDEBUG(
"cache",
"lock_cache - d_cache_info_fd: " << d_cache_info_fd << endl);
549 if (fcntl(d_cache_info_fd, F_SETLKW, lock(F_RDLCK)) == -1) {
550 throw BESInternalError(
"An error occurred trying to lock the cache-control file" + get_errno(), __FILE__, __LINE__);
561 BESDEBUG(
"cache",
"DAP Cache: unlock: cache_info (fd: " << d_cache_info_fd <<
")" << endl);
563 if (fcntl(d_cache_info_fd, F_SETLK, lock(F_UNLCK)) == -1) {
564 throw BESInternalError(
"An error occurred trying to unlock the cache-control file" + get_errno(), __FILE__, __LINE__);
585 BESDEBUG(
"cache",
"DAP Cache: unlock file: " << file_name << endl);
587 int fd = m_get_descriptor(file_name);
590 fd = m_get_descriptor(file_name);
601 BESDEBUG(
"cache",
"DAP Cache: unlock fd: " << fd << endl);
605 BESDEBUG(
"cache",
"DAP Cache: unlock " << fd <<
" Success" << endl);
620 unsigned long long current_size;
624 if (lseek(d_cache_info_fd, 0, SEEK_SET) == -1)
625 throw BESInternalError(
"Could not rewind to front of cache info file.", __FILE__, __LINE__);
628 if (read(d_cache_info_fd, &current_size,
sizeof(
unsigned long long)) !=
sizeof(
unsigned long long))
629 throw BESInternalError(
"Could not get read size info from the cache info file!", __FILE__, __LINE__);
632 int statret = stat(target.c_str(), &buf);
634 current_size += buf.st_size;
636 throw BESInternalError(
"Could not read the size of the new file: " + target +
" : " + get_errno(), __FILE__, __LINE__);
638 BESDEBUG(
"cache",
"DAP Cache: cache size updated to: " << current_size << endl);
640 if (lseek(d_cache_info_fd, 0, SEEK_SET) == -1)
641 throw BESInternalError(
"Could not rewind to front of cache info file.", __FILE__, __LINE__);
643 if(write(d_cache_info_fd, &current_size,
sizeof(
unsigned long long)) !=
sizeof(
unsigned long long))
644 throw BESInternalError(
"Could not write size info from the cache info file!", __FILE__, __LINE__);
662 return current_size > d_max_cache_size_in_bytes;
674 unsigned long long current_size;
678 if (lseek(d_cache_info_fd, 0, SEEK_SET) == -1)
679 throw BESInternalError(
"Could not rewind to front of cache info file.", __FILE__, __LINE__);
681 if(read(d_cache_info_fd, &current_size,
sizeof(
unsigned long long)) !=
sizeof(
unsigned long long))
682 throw BESInternalError(
"Could not get read size info from the cache info file!", __FILE__, __LINE__);
// Scan the cache directory and collect information about the cached files.
//
// @param contents  List that receives one entry per cache file; it is sorted
//                  with entry_op before the function ends.
// @return unsigned long long. NOTE(review): the return statement is not
//         visible in this excerpt - presumably the accumulated current_size;
//         confirm against the full source.
// @throws BESInternalError if the cache directory cannot be opened, or with
//         the message "Zero-byte file found in cache." for an offending file.
//
// Only directory entries whose names begin with d_prefix are considered. Each
// such file is stat()'ed; its size is added to the running total and its size
// and access time (st_atime) are stored in the entry.
// NOTE(review): no closedir() call is visible in this excerpt; verify that the
// DIR handle is released, including on the exception path.
// NOTE(review): the condition guarding the zero-byte throw and the assignment
// of the entry's name are not visible here.
701 unsigned long long BESFileLockingCache::m_collect_cache_dir_info(
CacheFiles &contents)
703 DIR *dip = opendir(d_cache_dir.c_str());
705 throw BESInternalError(
"Unable to open cache directory " + d_cache_dir, __FILE__, __LINE__);
// Full paths of the directory entries that carry the cache prefix.
708 vector<string> files;
711 while ((dit = readdir(dip)) != NULL) {
712 string dirEntry = dit->d_name;
// Keep only names that start with d_prefix.
713 if (dirEntry.compare(0, d_prefix.length(), d_prefix) == 0) {
714 files.push_back(d_cache_dir +
"/" + dirEntry);
// Running total of the sizes of all matching files.
720 unsigned long long current_size = 0;
722 for (vector<string>::iterator file = files.begin(); file != files.end(); ++file) {
723 if (stat(file->c_str(), &buf) == 0) {
724 current_size += buf.st_size;
727 entry.
size = buf.st_size;
728 entry.
time = buf.st_atime;
732 throw BESInternalError(
"Zero-byte file found in cache. " + *file, __FILE__, __LINE__);
734 contents.push_back(entry);
// Order the entries using entry_op.
739 contents.sort(entry_op);
757 BESDEBUG(
"cache",
"purge - starting the purge" << endl);
763 unsigned long long computed_size = m_collect_cache_dir_info(contents);
766 BESDEBUG(
"cache",
"BEFORE Purge " << computed_size/BYTES_PER_MEG << endl );
767 CacheFiles::iterator ti = contents.begin();
768 CacheFiles::iterator te = contents.end();
769 for (; ti != te; ti++) {
770 BESDEBUG(
"cache", (*ti).time <<
": " << (*ti).name <<
": size " << (*ti).size/BYTES_PER_MEG << endl );
774 BESDEBUG(
"cache",
"purge - current and target size (in MB) " << computed_size/BYTES_PER_MEG <<
", " << d_target_size/BYTES_PER_MEG << endl );
781 CacheFiles::iterator i = contents.begin();
782 while (i != contents.end() && computed_size > d_target_size) {
787 if (i->name != new_file && getExclusiveLockNB(i->name, cfile_fd)) {
788 BESDEBUG(
"cache",
"purge: " << i->name <<
" removed." << endl );
790 if (unlink(i->name.c_str()) != 0)
791 throw BESInternalError(
"Unable to purge the file " + i->name +
" from the cache: " + get_errno(), __FILE__, __LINE__);
794 computed_size -= i->size;
798 BESDEBUG(
"cache",
"purge - current and target size (in MB) " << computed_size/BYTES_PER_MEG <<
", " << d_target_size/BYTES_PER_MEG << endl );
803 if (lseek(d_cache_info_fd, 0, SEEK_SET) == -1)
804 throw BESInternalError(
"Could not rewind to front of cache info file.", __FILE__, __LINE__);
806 if(write(d_cache_info_fd, &computed_size,
sizeof(
unsigned long long)) !=
sizeof(
unsigned long long))
807 throw BESInternalError(
"Could not write size info to the cache info file!", __FILE__, __LINE__);
811 computed_size = m_collect_cache_dir_info(contents);
812 BESDEBUG(
"cache",
"AFTER Purge " << computed_size/BYTES_PER_MEG << endl );
813 CacheFiles::iterator ti = contents.begin();
814 CacheFiles::iterator te = contents.end();
815 for (; ti != te; ti++) {
816 BESDEBUG(
"cache", (*ti).time <<
": " << (*ti).name <<
": size " << (*ti).size/BYTES_PER_MEG << endl );
841 BESDEBUG(
"cache",
"purge_file - starting the purge" << endl);
848 if (getExclusiveLock(file, cfile_fd)) {
850 unsigned long long size = 0;
852 if (stat(file.c_str(), &buf) == 0) {
856 BESDEBUG(
"cache",
"purge_file: " << file <<
" removed." << endl );
858 if (unlink(file.c_str()) != 0)
859 throw BESInternalError(
"Unable to purge the file " + file +
" from the cache: " + get_errno(), __FILE__, __LINE__);
865 if (lseek(d_cache_info_fd, 0, SEEK_SET) == -1)
866 throw BESInternalError(
"Could not rewind to front of cache info file.", __FILE__, __LINE__);
868 if (write(d_cache_info_fd, &cache_size,
sizeof(
unsigned long long)) !=
sizeof(
unsigned long long))
869 throw BESInternalError(
"Could not write size info to the cache info file!", __FILE__, __LINE__);
900 strm <<
BESIndent::LMarg <<
"BESFileLockingCache::dump - (" << (
void *)
this <<
")" << endl;
904 strm <<
BESIndent::LMarg <<
"size (bytes): " << d_max_cache_size_in_bytes << endl;
virtual void unlock_cache()
Unlock the cache info file.
#define BESISDEBUG(x)
macro used to determine if the specified debug context is set
std::list< cache_entry > CacheFiles
exception thrown if internal error encountered
virtual bool create_and_lock(const string &target, int &fd)
Create a file in the cache and lock it for write access.
const string getCacheDirectory()
virtual unsigned long long get_cache_size()
Get the cache size.
void initialize(const string &cache_dir, const string &prefix, unsigned long long size)
virtual void dump(ostream &strm) const
dumps information about this object
static ostream & LMarg(ostream &strm)
virtual void purge_file(const string &file)
Purge a single file from the cache.
virtual bool cache_too_big(unsigned long long current_size) const
look at the cache size; is it too large? Look at the cache size and see if it is too big...
virtual void lock_cache_write()
Get an exclusive lock on the 'cache info' file.
virtual string get_cache_file_name(const string &src, bool mangle=true)
Build the name of the file that will hold the uncompressed data from 'src' in the cache.
const string getCacheFilePrefix()
virtual bool get_read_lock(const string &target, int &fd)
Get a read-only lock on the file if it exists.
virtual void update_and_purge(const string &new_file)
Purge files from the cache.
virtual unsigned long long update_cache_info(const string &target)
Update the cache info file to include 'target'.
virtual void lock_cache_read()
Get a shared lock on the 'cache info' file.
virtual void exclusive_to_shared_lock(int fd)
Transfer from an exclusive lock to a shared lock.
#define BESDEBUG(x, y)
macro used to send debug information to the debug stream
virtual void unlock_and_close(const string &target)
Unlock the named file.