XRootD
Loading...
Searching...
No Matches
XrdPfc.cc
Go to the documentation of this file.
1//----------------------------------------------------------------------------------
2// Copyright (c) 2014 by Board of Trustees of the Leland Stanford, Jr., University
3// Author: Alja Mrak-Tadel, Matevz Tadel, Brian Bockelman
4//----------------------------------------------------------------------------------
5// XRootD is free software: you can redistribute it and/or modify
6// it under the terms of the GNU Lesser General Public License as published by
7// the Free Software Foundation, either version 3 of the License, or
8// (at your option) any later version.
9//
10// XRootD is distributed in the hope that it will be useful,
11// but WITHOUT ANY WARRANTY; without even the implied warranty of
12// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13// GNU General Public License for more details.
14//
15// You should have received a copy of the GNU Lesser General Public License
16// along with XRootD. If not, see <http://www.gnu.org/licenses/>.
17//----------------------------------------------------------------------------------
18
19#include <fcntl.h>
20#include <sstream>
21#include <algorithm>
22#include <sys/statvfs.h>
23
25#include "XrdCl/XrdClURL.hh"
26
27#include "XrdOuc/XrdOucEnv.hh"
28#include "XrdOuc/XrdOucUtils.hh"
29
31#include "XrdSys/XrdSysTimer.hh"
32#include "XrdSys/XrdSysTrace.hh"
33
35
36#include "XrdOss/XrdOss.hh"
37
38#include "XrdPfc.hh"
39#include "XrdPfcTrace.hh"
40#include "XrdPfcFSctl.hh"
41#include "XrdPfcInfo.hh"
42#include "XrdPfcIOFile.hh"
43#include "XrdPfcIOFileBlock.hh"
44
45using namespace XrdPfc;
46
47Cache * Cache::m_instance = 0;
48
50
51
57
58void *PurgeThread(void*)
59{
61 return 0;
62}
63
65{
67 return 0;
68}
69
70void *PrefetchThread(void*)
71{
73 return 0;
74}
75
76//==============================================================================
77
78extern "C"
79{
81 const char *config_filename,
82 const char *parameters,
83 XrdOucEnv *env)
84{
85 XrdSysError err(logger, "");
86 err.Say("++++++ Proxy file cache initialization started.");
87
88 if ( ! env ||
89 ! (XrdPfc::Cache::schedP = (XrdScheduler*) env->GetPtr("XrdScheduler*")))
90 {
93 }
94
95 Cache &instance = Cache::CreateInstance(logger, env);
96
97 if (! instance.Config(config_filename, parameters))
98 {
99 err.Say("Config Proxy file cache initialization failed.");
100 return 0;
101 }
102 err.Say("------ Proxy file cache initialization completed.");
103
104 {
105 pthread_t tid;
106
107 for (int wti = 0; wti < instance.RefConfiguration().m_wqueue_threads; ++wti)
108 {
109 XrdSysThread::Run(&tid, ProcessWriteTaskThread, 0, 0, "XrdPfc WriteTasks ");
110 }
111
112 if (instance.RefConfiguration().m_prefetch_max_blocks > 0)
113 {
114 XrdSysThread::Run(&tid, PrefetchThread, 0, 0, "XrdPfc Prefetch ");
115 }
116
117 XrdSysThread::Run(&tid, ResourceMonitorHeartBeatThread, 0, 0, "XrdPfc ResourceMonitorHeartBeat");
118
119 XrdSysThread::Run(&tid, PurgeThread, 0, 0, "XrdPfc Purge");
120 }
121
122 XrdPfcFSctl* pfcFSctl = new XrdPfcFSctl(instance, logger);
123 env->PutPtr("XrdFSCtl_PC*", pfcFSctl);
124
125 return &instance;
126}
127}
128
129//==============================================================================
130
131void Configuration::calculate_fractional_usages(long long du, long long fu,
132 double &frac_du, double &frac_fu)
133{
134 // Calculate fractional disk / file usage and clamp them to [0, 1].
135
136 // Fractional total usage above LWM:
137 // - can be > 1 if usage is above HWM;
138 // - can be < 0 if triggered via age-based-purging.
139 frac_du = (double) (du - m_diskUsageLWM) / (m_diskUsageHWM - m_diskUsageLWM);
140
141 // Fractional file usage above baseline.
142 // - can be > 1 if file usage is above max;
143 // - can be < 0 if file usage is below baseline.
144 frac_fu = (double) (fu - m_fileUsageBaseline) / (m_fileUsageMax - m_fileUsageBaseline);
145
146 frac_du = std::min( std::max( frac_du, 0.0), 1.0 );
147 frac_fu = std::min( std::max( frac_fu, 0.0), 1.0 );
148}
149
150//==============================================================================
151
153{
154 assert (m_instance == 0);
155 m_instance = new Cache(logger, env);
156 return *m_instance;
157}
158
159 Cache& Cache::GetInstance() { return *m_instance; }
160const Cache& Cache::TheOne() { return *m_instance; }
161const Configuration& Cache::Conf() { return m_instance->RefConfiguration(); }
162
164{
165 if (! m_decisionpoints.empty())
166 {
167 XrdCl::URL url(io->Path());
168 std::string filename = url.GetPath();
169 std::vector<Decision*>::const_iterator it;
170 for (it = m_decisionpoints.begin(); it != m_decisionpoints.end(); ++it)
171 {
172 XrdPfc::Decision *d = *it;
173 if (! d) continue;
174 if (! d->Decide(filename, *m_oss))
175 {
176 return false;
177 }
178 }
179 }
180
181 return true;
182}
183
185 XrdOucCache("pfc"),
186 m_env(env),
187 m_log(logger, "XrdPfc_"),
188 m_trace(new XrdSysTrace("XrdPfc", logger)),
189 m_traceID("Cache"),
190 m_oss(0),
191 m_gstream(0),
192 m_prefetch_condVar(0),
193 m_prefetch_enabled(false),
194 m_RAM_used(0),
195 m_RAM_write_queue(0),
196 m_RAM_std_size(0),
197 m_isClient(false),
198 m_in_purge(false),
199 m_active_cond(0),
200 m_stats_n_purge_cond(0),
201 m_fs_state(0),
202 m_last_scan_duration(0),
203 m_last_purge_duration(0),
204 m_spt_state(SPTS_Idle)
205{
206 // Default log level is Warning.
207 m_trace->What = 2;
208}
209
211{
212 const char* tpfx = "Attach() ";
213
214 if (Cache::GetInstance().Decide(io))
215 {
216 TRACE(Info, tpfx << io->Path());
217
218 IO *cio;
219
220 if (Cache::GetInstance().RefConfiguration().m_hdfsmode)
221 {
222 cio = new IOFileBlock(io, *this);
223 }
224 else
225 {
226 IOFile *iof = new IOFile(io, *this);
227
228 if ( ! iof->HasFile())
229 {
230 delete iof;
231 // TODO - redirect instead. But this is kind of an awkward place for it.
232 // errno is set during IOFile construction.
233 TRACE(Error, tpfx << "Failed opening local file, falling back to remote access " << io->Path());
234 return io;
235 }
236
237 cio = iof;
238 }
239
240 TRACE_PC(Debug, const char* loc = io->Location(), tpfx << io->Path() << " location: " <<
241 ((loc && loc[0] != 0) ? loc : "<deferred open>"));
242
243 return cio;
244 }
245 else
246 {
247 TRACE(Info, tpfx << "decision decline " << io->Path());
248 }
249 return io;
250}
251
252void Cache::AddWriteTask(Block* b, bool fromRead)
253{
254 TRACE(Dump, "AddWriteTask() offset=" << b->m_offset << ". file " << b->get_file()->GetLocalPath());
255
256 {
257 XrdSysMutexHelper lock(&m_RAM_mutex);
258 m_RAM_write_queue += b->get_size();
259 }
260
261 m_writeQ.condVar.Lock();
262 if (fromRead)
263 m_writeQ.queue.push_back(b);
264 else
265 m_writeQ.queue.push_front(b);
266 m_writeQ.size++;
267 m_writeQ.condVar.Signal();
268 m_writeQ.condVar.UnLock();
269}
270
272{
273 std::list<Block*> removed_blocks;
274 long long sum_size = 0;
275
276 m_writeQ.condVar.Lock();
277 std::list<Block*>::iterator i = m_writeQ.queue.begin();
278 while (i != m_writeQ.queue.end())
279 {
280 if ((*i)->m_file == file)
281 {
282 TRACE(Dump, "Remove entries for " << (void*)(*i) << " path " << file->lPath());
283 std::list<Block*>::iterator j = i++;
284 removed_blocks.push_back(*j);
285 sum_size += (*j)->get_size();
286 m_writeQ.queue.erase(j);
287 --m_writeQ.size;
288 }
289 else
290 {
291 ++i;
292 }
293 }
294 m_writeQ.condVar.UnLock();
295
296 {
297 XrdSysMutexHelper lock(&m_RAM_mutex);
298 m_RAM_write_queue -= sum_size;
299 }
300
301 file->BlocksRemovedFromWriteQ(removed_blocks);
302}
303
305{
306 std::vector<Block*> blks_to_write(m_configuration.m_wqueue_blocks);
307
308 while (true)
309 {
310 m_writeQ.condVar.Lock();
311 while (m_writeQ.size == 0)
312 {
313 m_writeQ.condVar.Wait();
314 }
315
316 // MT -- optimize to pop several blocks if they are available (or swap the list).
317 // This makes sense especially for smallish block sizes.
318
319 int n_pushed = std::min(m_writeQ.size, m_configuration.m_wqueue_blocks);
320 long long sum_size = 0;
321
322 for (int bi = 0; bi < n_pushed; ++bi)
323 {
324 Block* block = m_writeQ.queue.front();
325 m_writeQ.queue.pop_front();
326 m_writeQ.writes_between_purges += block->get_size();
327 sum_size += block->get_size();
328
329 blks_to_write[bi] = block;
330
331 TRACE(Dump, "ProcessWriteTasks for block " << (void*)(block) << " path " << block->m_file->lPath());
332 }
333 m_writeQ.size -= n_pushed;
334
335 m_writeQ.condVar.UnLock();
336
337 {
338 XrdSysMutexHelper lock(&m_RAM_mutex);
339 m_RAM_write_queue -= sum_size;
340 }
341
342 for (int bi = 0; bi < n_pushed; ++bi)
343 {
344 Block* block = blks_to_write[bi];
345
346 block->m_file->WriteBlockToDisk(block);
347 }
348 }
349}
350
351//==============================================================================
352
353char* Cache::RequestRAM(long long size)
354{
355 static const size_t s_block_align = sysconf(_SC_PAGESIZE);
356
357 bool std_size = (size == m_configuration.m_bufferSize);
358
359 m_RAM_mutex.Lock();
360
361 long long total = m_RAM_used + size;
362
363 if (total <= m_configuration.m_RamAbsAvailable)
364 {
365 m_RAM_used = total;
366 if (std_size && m_RAM_std_size > 0)
367 {
368 char *buf = m_RAM_std_blocks.back();
369 m_RAM_std_blocks.pop_back();
370 --m_RAM_std_size;
371
372 m_RAM_mutex.UnLock();
373
374 return buf;
375 }
376 else
377 {
378 m_RAM_mutex.UnLock();
379 char *buf;
380 if (posix_memalign((void**) &buf, s_block_align, (size_t) size))
381 {
382 // Report out of mem? Probably should report it at least the first time,
383 // then periodically.
384 return 0;
385 }
386 return buf;
387 }
388 }
389 m_RAM_mutex.UnLock();
390 return 0;
391}
392
393void Cache::ReleaseRAM(char* buf, long long size)
394{
395 bool std_size = (size == m_configuration.m_bufferSize);
396 {
397 XrdSysMutexHelper lock(&m_RAM_mutex);
398
399 m_RAM_used -= size;
400
401 if (std_size && m_RAM_std_size < m_configuration.m_RamKeepStdBlocks)
402 {
403 m_RAM_std_blocks.push_back(buf);
404 ++m_RAM_std_size;
405 return;
406 }
407 }
408 free(buf);
409}
410
411File* Cache::GetFile(const std::string& path, IO* io, long long off, long long filesize)
412{
413 // Called from virtual IO::Attach
414
415 TRACE(Debug, "GetFile " << path << ", io " << io);
416
417 ActiveMap_i it;
418
419 {
420 XrdSysCondVarHelper lock(&m_active_cond);
421
422 while (true)
423 {
424 it = m_active.find(path);
425
426 // File is not open or being opened. Mark it as being opened and
427 // proceed to opening it outside of while loop.
428 if (it == m_active.end())
429 {
430 it = m_active.insert(std::make_pair(path, (File*) 0)).first;
431 break;
432 }
433
434 if (it->second != 0)
435 {
436 it->second->AddIO(io);
437 inc_ref_cnt(it->second, false, true);
438
439 return it->second;
440 }
441 else
442 {
443 // Wait for some change in m_active, then recheck.
444 m_active_cond.Wait();
445 }
446 }
447 }
448
449 if (filesize == 0)
450 {
451 struct stat st;
452 int res = io->Fstat(st);
453 if (res < 0) {
454 errno = res;
455 TRACE(Error, "GetFile, could not get valid stat");
456 } else if (res > 0) {
457 errno = ENOTSUP;
458 TRACE(Error, "GetFile, stat returned positive value, this should NOT happen here");
459 } else {
460 filesize = st.st_size;
461 }
462 }
463
464 File *file = 0;
465
466 if (filesize >= 0)
467 {
468 file = File::FileOpen(path, off, filesize);
469 }
470
471 {
472 XrdSysCondVarHelper lock(&m_active_cond);
473
474 if (file)
475 {
476 inc_ref_cnt(file, false, true);
477 it->second = file;
478
479 file->AddIO(io);
480 }
481 else
482 {
483 m_active.erase(it);
484 }
485
486 m_active_cond.Broadcast();
487 }
488
489 return file;
490}
491
493{
494 // Called from virtual IO::DetachFinalize.
495
496 TRACE(Debug, "ReleaseFile " << f->GetLocalPath() << ", io " << io);
497
498 {
499 XrdSysCondVarHelper lock(&m_active_cond);
500
501 f->RemoveIO(io);
502 }
503 dec_ref_cnt(f, true);
504}
505
506
507namespace
508{
509
510class DiskSyncer : public XrdJob
511{
512private:
513 File *m_file;
514 bool m_high_debug;
515
516public:
517 DiskSyncer(File *f, bool high_debug, const char *desc = "") :
518 XrdJob(desc),
519 m_file(f),
520 m_high_debug(high_debug)
521 {}
522
523 void DoIt()
524 {
525 m_file->Sync();
526 Cache::GetInstance().FileSyncDone(m_file, m_high_debug);
527 delete this;
528 }
529};
530
531
532class CommandExecutor : public XrdJob
533{
534private:
535 std::string m_command_url;
536
537public:
538 CommandExecutor(const std::string& command, const char *desc = "") :
539 XrdJob(desc),
540 m_command_url(command)
541 {}
542
543 void DoIt()
544 {
545 Cache::GetInstance().ExecuteCommandUrl(m_command_url);
546 delete this;
547 }
548};
549
550}
551
552//==============================================================================
553
554void Cache::schedule_file_sync(File* f, bool ref_cnt_already_set, bool high_debug)
555{
556 DiskSyncer* ds = new DiskSyncer(f, high_debug);
557
558 if ( ! ref_cnt_already_set) inc_ref_cnt(f, true, high_debug);
559
560 schedP->Schedule(ds);
561}
562
563void Cache::FileSyncDone(File* f, bool high_debug)
564{
565 dec_ref_cnt(f, high_debug);
566}
567
568void Cache::inc_ref_cnt(File* f, bool lock, bool high_debug)
569{
570 // called from GetFile() or SheduleFileSync();
571
572 int tlvl = high_debug ? TRACE_Debug : TRACE_Dump;
573
574 if (lock) m_active_cond.Lock();
575 int rc = f->inc_ref_cnt();
576 if (lock) m_active_cond.UnLock();
577
578 TRACE_INT(tlvl, "inc_ref_cnt " << f->GetLocalPath() << ", cnt at exit = " << rc);
579}
580
581void Cache::dec_ref_cnt(File* f, bool high_debug)
582{
583 // Called from ReleaseFile() or DiskSync callback.
584
585 int tlvl = high_debug ? TRACE_Debug : TRACE_Dump;
586 int cnt;
587
588 {
589 XrdSysCondVarHelper lock(&m_active_cond);
590
591 cnt = f->get_ref_cnt();
592
594 {
595 // In this case file has been already removed from m_active map and
596 // does not need to be synced.
597
598 if (cnt == 1)
599 {
600 TRACE_INT(tlvl, "dec_ref_cnt " << f->GetLocalPath() << " is in shutdown, ref_cnt = " << cnt
601 << " -- deleting File object without further ado");
602 delete f;
603 }
604 else
605 {
606 TRACE_INT(tlvl, "dec_ref_cnt " << f->GetLocalPath() << " is in shutdown, ref_cnt = " << cnt
607 << " -- waiting");
608 }
609
610 return;
611 }
612 }
613
614 TRACE_INT(tlvl, "dec_ref_cnt " << f->GetLocalPath() << ", cnt at entry = " << cnt);
615
616 if (cnt == 1)
617 {
618 if (f->FinalizeSyncBeforeExit())
619 {
620 // Note, here we "reuse" the existing reference count for the
621 // final sync.
622
623 TRACE(Debug, "dec_ref_cnt " << f->GetLocalPath() << ", scheduling final sync");
624 schedule_file_sync(f, true, true);
625 return;
626 }
627 }
628
629 {
630 XrdSysCondVarHelper lock(&m_active_cond);
631
632 cnt = f->dec_ref_cnt();
633 TRACE_INT(tlvl, "dec_ref_cnt " << f->GetLocalPath() << ", cnt after sync_check and dec_ref_cnt = " << cnt);
634 if (cnt == 0)
635 {
636 ActiveMap_i it = m_active.find(f->GetLocalPath());
637 m_active.erase(it);
638
639 m_closed_files_stats.insert(std::make_pair(f->GetLocalPath(), f->DeltaStatsFromLastCall()));
640
641 if (m_gstream)
642 {
643 const Stats &st = f->RefStats();
644 const Info::AStat *as = f->GetLastAccessStats();
645
646 char buf[4096];
647 int len = snprintf(buf, 4096, "{\"event\":\"file_close\","
648 "\"lfn\":\"%s\",\"size\":%lld,\"blk_size\":%d,\"n_blks\":%d,\"n_blks_done\":%d,"
649 "\"access_cnt\":%lu,\"attach_t\":%lld,\"detach_t\":%lld,\"remotes\":%s,"
650 "\"b_hit\":%lld,\"b_miss\":%lld,\"b_bypass\":%lld,\"n_cks_errs\":%d}",
651 f->GetLocalPath().c_str(), f->GetFileSize(), f->GetBlockSize(),
653 (unsigned long) f->GetAccessCnt(), (long long) as->AttachTime, (long long) as->DetachTime,
654 f->GetRemoteLocations().c_str(),
656 );
657 bool suc = false;
658 if (len < 4096)
659 {
660 suc = m_gstream->Insert(buf, len + 1);
661 }
662 if ( ! suc)
663 {
664 TRACE(Error, "Failed g-stream insertion of file_close record, len=" << len);
665 }
666 }
667
668 delete f;
669 }
670 }
671}
672
673bool Cache::IsFileActiveOrPurgeProtected(const std::string& path)
674{
675 XrdSysCondVarHelper lock(&m_active_cond);
676
677 return m_active.find(path) != m_active.end() ||
678 m_purge_delay_set.find(path) != m_purge_delay_set.end();
679}
680
681
682//==============================================================================
683//=== PREFETCH
684//==============================================================================
685
687{
688 // Can be called with other locks held.
689
690 if ( ! m_prefetch_enabled)
691 {
692 return;
693 }
694
695 m_prefetch_condVar.Lock();
696 m_prefetchList.push_back(file);
697 m_prefetch_condVar.Signal();
698 m_prefetch_condVar.UnLock();
699}
700
701
703{
704 // Can be called with other locks held.
705
706 if ( ! m_prefetch_enabled)
707 {
708 return;
709 }
710
711 m_prefetch_condVar.Lock();
712 for (PrefetchList::iterator it = m_prefetchList.begin(); it != m_prefetchList.end(); ++it)
713 {
714 if (*it == file)
715 {
716 m_prefetchList.erase(it);
717 break;
718 }
719 }
720 m_prefetch_condVar.UnLock();
721}
722
723
725{
726 m_prefetch_condVar.Lock();
727 while (m_prefetchList.empty())
728 {
729 m_prefetch_condVar.Wait();
730 }
731
732 // std::sort(m_prefetchList.begin(), m_prefetchList.end(), myobject);
733
734 size_t l = m_prefetchList.size();
735 int idx = rand() % l;
736 File* f = m_prefetchList[idx];
737
738 m_prefetch_condVar.UnLock();
739 return f;
740}
741
742
744{
745 const long long limit_RAM = m_configuration.m_RamAbsAvailable * 7 / 10;
746
747 while (true)
748 {
749 m_RAM_mutex.Lock();
750 bool doPrefetch = (m_RAM_used < limit_RAM);
751 m_RAM_mutex.UnLock();
752
753 if (doPrefetch)
754 {
756 f->Prefetch();
757 }
758 else
759 {
761 }
762 }
763}
764
765
766//==============================================================================
767//=== Virtuals from XrdOucCache
768//==============================================================================
769
770//------------------------------------------------------------------------------
784
785int Cache::LocalFilePath(const char *curl, char *buff, int blen,
786 LFP_Reason why, bool forall)
787{
788 static const mode_t groupReadable = S_IRUSR | S_IWUSR | S_IRGRP;
789 static const mode_t worldReadable = S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH;
790 static const char *lfpReason[] = { "ForAccess", "ForInfo", "ForPath" };
791
792 TRACE(Debug, "LocalFilePath '" << curl << "', why=" << lfpReason[why]);
793
794 if (buff && blen > 0) buff[0] = 0;
795
796 XrdCl::URL url(curl);
797 std::string f_name = url.GetPath();
798 std::string i_name = f_name + Info::s_infoExtension;
799
800 if (why == ForPath)
801 {
802 int ret = m_oss->Lfn2Pfn(f_name.c_str(), buff, blen);
803 TRACE(Info, "LocalFilePath '" << curl << "', why=" << lfpReason[why] << " -> " << ret);
804 return ret;
805 }
806
807 {
808 XrdSysCondVarHelper lock(&m_active_cond);
809 m_purge_delay_set.insert(f_name);
810 }
811
812 struct stat sbuff, sbuff2;
813 if (m_oss->Stat(f_name.c_str(), &sbuff) == XrdOssOK &&
814 m_oss->Stat(i_name.c_str(), &sbuff2) == XrdOssOK)
815 {
816 if (S_ISDIR(sbuff.st_mode))
817 {
818 TRACE(Info, "LocalFilePath '" << curl << "', why=" << lfpReason[why] << " -> EISDIR");
819 return -EISDIR;
820 }
821 else
822 {
823 bool read_ok = false;
824 bool is_complete = false;
825
826 // Lock and check if the file is active. If NOT, keep the lock
827 // and add dummy access after successful reading of info file.
828 // If it IS active, just release the lock, this ongoing access will
829 // assure the file continues to exist.
830
831 // XXXX How can I just loop over the cinfo file when active?
832 // Can I not get is_complete from the existing file?
833 // Do I still want to inject access record?
834 // Oh, it writes only if not active .... still let's try to use existing File.
835
836 m_active_cond.Lock();
837
838 bool is_active = m_active.find(f_name) != m_active.end();
839
840 if (is_active) m_active_cond.UnLock();
841
842 XrdOssDF* infoFile = m_oss->newFile(m_configuration.m_username.c_str());
843 XrdOucEnv myEnv;
844 int res = infoFile->Open(i_name.c_str(), O_RDWR, 0600, myEnv);
845 if (res >= 0)
846 {
847 Info info(m_trace, 0);
848 if (info.Read(infoFile, i_name.c_str()))
849 {
850 read_ok = true;
851
852 is_complete = info.IsComplete();
853
854 // Add full-size access if reason is for access.
855 if ( ! is_active && is_complete && why == ForAccess)
856 {
857 info.WriteIOStatSingle(info.GetFileSize());
858 info.Write(infoFile, i_name.c_str());
859 }
860 }
861 infoFile->Close();
862 }
863 delete infoFile;
864
865 if ( ! is_active) m_active_cond.UnLock();
866
867 if (read_ok)
868 {
869 if ((is_complete || why == ForInfo) && buff != 0)
870 {
871 int res2 = m_oss->Lfn2Pfn(f_name.c_str(), buff, blen);
872 if (res2 < 0)
873 return res2;
874
875 // Normally, files are owned by us but when direct cache access
876 // is wanted and possible, make sure the file is world readable.
877 if (why == ForAccess)
878 {mode_t mode = (forall ? worldReadable : groupReadable);
879 if (((sbuff.st_mode & worldReadable) != mode)
880 && (m_oss->Chmod(f_name.c_str(), mode) != XrdOssOK))
881 {is_complete = false;
882 *buff = 0;
883 }
884 }
885 }
886
887 TRACE(Info, "LocalFilePath '" << curl << "', why=" << lfpReason[why] <<
888 (is_complete ? " -> FILE_COMPLETE_IN_CACHE" : " -> EREMOTE"));
889
890 return is_complete ? 0 : -EREMOTE;
891 }
892 }
893 }
894
895 TRACE(Info, "LocalFilePath '" << curl << "', why=" << lfpReason[why] << " -> ENOENT");
896 return -ENOENT;
897}
898
899//______________________________________________________________________________
900// Check if the file is cached including m_onlyIfCachedMinSize and m_onlyIfCachedMinFrac
901// pfc configuration parameters. The logic of accessing the Info file is the same
902// as in Cache::LocalFilePath.
912//------------------------------------------------------------------------------
913int Cache::ConsiderCached(const char *curl)
914{
915 TRACE(Debug, "ConsiderFileCached '" << curl << "'" );
916
917 XrdCl::URL url(curl);
918 std::string f_name = url.GetPath();
919 std::string i_name = f_name + Info::s_infoExtension;
920
921 {
922 XrdSysCondVarHelper lock(&m_active_cond);
923 m_purge_delay_set.insert(f_name);
924 }
925
926 struct stat sbuff, sbuff2;
927 if (m_oss->Stat(f_name.c_str(), &sbuff) == XrdOssOK &&
928 m_oss->Stat(i_name.c_str(), &sbuff2) == XrdOssOK)
929 {
930 if (S_ISDIR(sbuff.st_mode))
931 {
932 TRACE(Info, "ConsiderCached '" << curl << ", why=ForInfo" << " -> EISDIR");
933 return -EISDIR;
934 }
935 else
936 {
937 bool read_ok = false;
938 bool is_cached = false;
939
940 // Lock and check if the file is active. If NOT, keep the lock
941 // and add dummy access after successful reading of info file.
942 // If it IS active, just release the lock, this ongoing access will
943 // assure the file continues to exist.
944
945 // XXXX How can I just loop over the cinfo file when active?
946 // Can I not get is_complete from the existing file?
947 // Do I still want to inject access record?
948 // Oh, it writes only if not active .... still let's try to use existing File.
949
950 m_active_cond.Lock();
951
952 bool is_active = m_active.find(f_name) != m_active.end();
953
954 if (is_active)
955 m_active_cond.UnLock();
956
957 XrdOssDF *infoFile = m_oss->newFile(m_configuration.m_username.c_str());
958 XrdOucEnv myEnv;
959 int res = infoFile->Open(i_name.c_str(), O_RDWR, 0600, myEnv);
960 if (res >= 0)
961 {
962 Info info(m_trace, 0);
963 if (info.Read(infoFile, i_name.c_str()))
964 {
965 read_ok = true;
966
967 if (info.IsComplete())
968 {
969 is_cached = true;
970 }
971 else if (info.GetFileSize() == 0)
972 {
973 is_cached = true;
974 }
975 else
976 {
977 long long fileSize = info.GetFileSize();
978 long long bytesRead = info.GetNDownloadedBytes();
979
980 if (fileSize < m_configuration.m_onlyIfCachedMinSize)
981 {
982 if ((float)bytesRead / fileSize > m_configuration.m_onlyIfCachedMinFrac)
983 is_cached = true;
984 }
985 else
986 {
987 if (bytesRead > m_configuration.m_onlyIfCachedMinSize &&
988 (float)bytesRead / fileSize > m_configuration.m_onlyIfCachedMinFrac)
989 is_cached = true;
990 }
991 }
992 }
993 infoFile->Close();
994 }
995 delete infoFile;
996
997 if (!is_active) m_active_cond.UnLock();
998
999 if (read_ok)
1000 {
1001 TRACE(Info, "ConsiderCached '" << curl << "', why=ForInfo" << (is_cached ? " -> FILE_COMPLETE_IN_CACHE" : " -> EREMOTE"));
1002 return is_cached ? 0 : -EREMOTE;
1003 }
1004 }
1005 }
1006
1007 TRACE(Info, "ConsiderCached '" << curl << "', why=ForInfo" << " -> ENOENT");
1008 return -ENOENT;
1009}
1010
1011//______________________________________________________________________________
1019//------------------------------------------------------------------------------
1020
1021int Cache::Prepare(const char *curl, int oflags, mode_t mode)
1022{
1023 XrdCl::URL url(curl);
1024 std::string f_name = url.GetPath();
1025 std::string i_name = f_name + Info::s_infoExtension;
1026
1027 // Do not allow write access.
1028 if (oflags & (O_WRONLY | O_RDWR | O_APPEND | O_CREAT))
1029 {
1030 TRACE(Warning, "Prepare write access requested on file " << f_name << ". Denying access.");
1031 return -EROFS;
1032 }
1033
1034 // Intercept xrdpfc_command requests.
1035 if (m_configuration.m_allow_xrdpfc_command && strncmp("/xrdpfc_command/", f_name.c_str(), 16) == 0)
1036 {
1037 // Schedule a job to process command request.
1038 {
1039 CommandExecutor *ce = new CommandExecutor(f_name, "CommandExecutor");
1040
1041 schedP->Schedule(ce);
1042 }
1043
1044 return -EAGAIN;
1045 }
1046
1047 {
1048 XrdSysCondVarHelper lock(&m_active_cond);
1049 m_purge_delay_set.insert(f_name);
1050 }
1051
1052 struct stat sbuff;
1053 int res = m_oss->Stat(i_name.c_str(), &sbuff);
1054 if (res == 0)
1055 {
1056 TRACE(Dump, "Prepare defer open " << f_name);
1057 return 1;
1058 }
1059 else
1060 {
1061 return 0;
1062 }
1063}
1064
1065//______________________________________________________________________________
1066// virtual method of XrdOucCache.
1071//------------------------------------------------------------------------------
1072
1073int Cache::Stat(const char *curl, struct stat &sbuff)
1074{
1075 XrdCl::URL url(curl);
1076 std::string f_name = url.GetPath();
1077
1078 {
1079 XrdSysCondVarHelper lock(&m_active_cond);
1080 m_purge_delay_set.insert(f_name);
1081 }
1082
1083 if (m_oss->Stat(f_name.c_str(), &sbuff) == XrdOssOK)
1084 {
1085 if (S_ISDIR(sbuff.st_mode))
1086 {
1087 return 0;
1088 }
1089 else
1090 {
1091 bool success = false;
1092 XrdOssDF* infoFile = m_oss->newFile(m_configuration.m_username.c_str());
1093 XrdOucEnv myEnv;
1094
1095 f_name += Info::s_infoExtension;
1096 int res = infoFile->Open(f_name.c_str(), O_RDONLY, 0600, myEnv);
1097 if (res >= 0)
1098 {
1099 Info info(m_trace, 0);
1100 if (info.Read(infoFile, f_name.c_str()))
1101 {
1102 sbuff.st_size = info.GetFileSize();
1103 success = true;
1104 }
1105 }
1106 infoFile->Close();
1107 delete infoFile;
1108 return success ? 0 : 1;
1109 }
1110 }
1111
1112 return 1;
1113}
1114
1115//______________________________________________________________________________
1116// virtual method of XrdOucCache.
1120//------------------------------------------------------------------------------
1121
1122int Cache::Unlink(const char *curl)
1123{
1124 XrdCl::URL url(curl);
1125 std::string f_name = url.GetPath();
1126
1127 // printf("Unlink url=%s\n\t fname=%s\n", curl, f_name.c_str());
1128
1129 return UnlinkFile(f_name, false);
1130}
1131
1132int Cache::UnlinkFile(const std::string& f_name, bool fail_if_open)
1133{
1134 ActiveMap_i it;
1135 File *file = 0;
1136 {
1137 XrdSysCondVarHelper lock(&m_active_cond);
1138
1139 it = m_active.find(f_name);
1140
1141 if (it != m_active.end())
1142 {
1143 if (fail_if_open)
1144 {
1145 TRACE(Info, "UnlinkCommon " << f_name << ", file currently open and force not requested - denying request");
1146 return -EBUSY;
1147 }
1148
1149 // Null File* in m_active map means an operation is ongoing, probably
1150 // Attach() with possible File::Open(). Ask for retry.
1151 if (it->second == 0)
1152 {
1153 TRACE(Info, "UnlinkCommon " << f_name << ", an operation on this file is ongoing - denying request");
1154 return -EAGAIN;
1155 }
1156
1157 file = it->second;
1159 it->second = 0;
1160 }
1161 else
1162 {
1163 it = m_active.insert(std::make_pair(f_name, (File*) 0)).first;
1164 }
1165 }
1166
1167 if (file)
1168 {
1170 }
1171
1172 std::string i_name = f_name + Info::s_infoExtension;
1173
1174 // Unlink file & cinfo
1175 int f_ret = m_oss->Unlink(f_name.c_str());
1176 int i_ret = m_oss->Unlink(i_name.c_str());
1177
1178 TRACE(Debug, "UnlinkCommon " << f_name << ", f_ret=" << f_ret << ", i_ret=" << i_ret);
1179
1180 {
1181 XrdSysCondVarHelper lock(&m_active_cond);
1182
1183 m_active.erase(it);
1184 }
1185
1186 return std::min(f_ret, i_ret);
1187}
int DoIt(int argpnt, int argc, char **argv, bool singleshot)
#define TRACE_Debug
#define XrdOssOK
Definition XrdOss.hh:50
#define TRACE_Dump
#define TRACE_PC(act, pre_code, x)
#define TRACE_INT(act, x)
void * ProcessWriteTaskThread(void *)
Definition XrdPfc.cc:64
void * ResourceMonitorHeartBeatThread(void *)
Definition XrdPfc.cc:52
void * PrefetchThread(void *)
Definition XrdPfc.cc:70
XrdOucCache * XrdOucGetCache(XrdSysLogger *logger, const char *config_filename, const char *parameters, XrdOucEnv *env)
Definition XrdPfc.cc:80
void * PurgeThread(void *)
Definition XrdPfc.cc:58
#define stat(a, b)
Definition XrdPosix.hh:96
bool Debug
#define TRACE(act, x)
Definition XrdTrace.hh:63
URL representation.
Definition XrdClURL.hh:31
const std::string & GetPath() const
Get the path.
Definition XrdClURL.hh:212
virtual int Close(long long *retsz=0)=0
virtual int Open(const char *path, int Oflag, mode_t Mode, XrdOucEnv &env)
Definition XrdOss.hh:200
virtual XrdOssDF * newFile(const char *tident)=0
virtual int Chmod(const char *path, mode_t mode, XrdOucEnv *envP=0)=0
virtual int Lfn2Pfn(const char *Path, char *buff, int blen)
Definition XrdOss.hh:873
virtual int Stat(const char *path, struct stat *buff, int opts=0, XrdOucEnv *envP=0)=0
virtual int Unlink(const char *path, int Opts=0, XrdOucEnv *envP=0)=0
virtual int Fstat(struct stat &sbuff)
virtual const char * Path()=0
virtual const char * Location(bool refresh=false)
void * GetPtr(const char *varname)
Definition XrdOucEnv.cc:263
void PutPtr(const char *varname, void *value)
Definition XrdOucEnv.cc:298
int get_size() const
long long m_offset
File * get_file() const
Attaches/creates and detaches/deletes cache-io objects for disk based cache.
Definition XrdPfc.hh:267
void FileSyncDone(File *, bool high_debug)
Definition XrdPfc.cc:563
File * GetFile(const std::string &, IO *, long long off=0, long long filesize=0)
Definition XrdPfc.cc:411
static const Configuration & Conf()
Definition XrdPfc.cc:161
bool Config(const char *config_filename, const char *parameters)
Parse configuration file.
virtual int LocalFilePath(const char *url, char *buff=0, int blen=0, LFP_Reason why=ForAccess, bool forall=false)
Definition XrdPfc.cc:785
virtual int Stat(const char *url, struct stat &sbuff)
Definition XrdPfc.cc:1073
const Configuration & RefConfiguration() const
Reference XrdPfc configuration.
Definition XrdPfc.hh:315
void Purge()
Thread function invoked to scan and purge files from disk when needed.
void ReleaseRAM(char *buf, long long size)
Definition XrdPfc.cc:393
virtual int ConsiderCached(const char *url)
Definition XrdPfc.cc:913
static Cache & GetInstance()
Singleton access.
Definition XrdPfc.cc:159
void ResourceMonitorHeartBeat()
Thread function checking resource usage periodically.
void DeRegisterPrefetchFile(File *)
Definition XrdPfc.cc:702
void ExecuteCommandUrl(const std::string &command_url)
void RegisterPrefetchFile(File *)
Definition XrdPfc.cc:686
void Prefetch()
Definition XrdPfc.cc:743
void ReleaseFile(File *, IO *)
Definition XrdPfc.cc:492
void AddWriteTask(Block *b, bool from_read)
Add downloaded block in write queue.
Definition XrdPfc.cc:252
Cache(XrdSysLogger *logger, XrdOucEnv *env)
Constructor.
Definition XrdPfc.cc:184
bool Decide(XrdOucCacheIO *)
Makes decision if the original XrdOucCacheIO should be cached.
Definition XrdPfc.cc:163
int UnlinkFile(const std::string &f_name, bool fail_if_open)
Remove cinfo and data files from cache.
Definition XrdPfc.cc:1132
static XrdScheduler * schedP
Definition XrdPfc.hh:404
bool IsFileActiveOrPurgeProtected(const std::string &)
Definition XrdPfc.cc:673
File * GetNextFileToPrefetch()
Definition XrdPfc.cc:724
void ProcessWriteTasks()
Separate task which writes blocks from ram to disk.
Definition XrdPfc.cc:304
virtual int Unlink(const char *url)
Definition XrdPfc.cc:1122
void RemoveWriteQEntriesFor(File *f)
Remove blocks from write queue which belong to given prefetch. This method is used at the time of Fil...
Definition XrdPfc.cc:271
virtual XrdOucCacheIO * Attach(XrdOucCacheIO *, int Options=0)
Definition XrdPfc.cc:210
static const Cache & TheOne()
Definition XrdPfc.cc:160
char * RequestRAM(long long size)
Definition XrdPfc.cc:353
virtual int Prepare(const char *url, int oflags, mode_t mode)
Definition XrdPfc.cc:1021
static Cache & CreateInstance(XrdSysLogger *logger, XrdOucEnv *env)
Singleton creation.
Definition XrdPfc.cc:152
Base class for selecting which files should be cached.
virtual bool Decide(const std::string &, XrdOss &) const =0
bool FinalizeSyncBeforeExit()
Returns true if any of blocks need sync. Called from Cache::dec_ref_cnt on zero ref cnt.
const char * lPath() const
Log path.
void WriteBlockToDisk(Block *b)
std::string & GetLocalPath()
static File * FileOpen(const std::string &path, long long offset, long long fileSize)
Static constructor that also does Open. Returns null ptr if Open fails.
Definition XrdPfcFile.cc:99
int GetNBlocks() const
std::string GetRemoteLocations() const
size_t GetAccessCnt() const
void AddIO(IO *io)
int GetBlockSize() const
int GetNDownloadedBlocks() const
const Info::AStat * GetLastAccessStats() const
long long GetFileSize()
void BlocksRemovedFromWriteQ(std::list< Block * > &)
Handle removal of a set of blocks from Cache's write queue.
void initiate_emergency_shutdown()
int inc_ref_cnt()
const Stats & RefStats() const
void Sync()
Sync file cache inf o and output data with disk.
int dec_ref_cnt()
int get_ref_cnt()
void RemoveIO(IO *io)
Stats DeltaStatsFromLastCall()
bool is_in_emergency_shutdown()
Downloads original file into multiple files, chunked into blocks. Only blocks that are asked for are ...
Downloads original file into a single file on local disk. Handles read requests as they come along.
bool HasFile() const
Check if File was opened successfully.
Base cache-io class that implements some XrdOucCacheIO abstract methods.
Definition XrdPfcIO.hh:18
Status of cached file. Can be read from and written into a binary file.
Definition XrdPfcInfo.hh:45
static const char * s_infoExtension
void WriteIOStatSingle(long long bytes_disk)
Write single open/close time for given bytes read from disk.
bool Write(XrdOssDF *fp, const char *dname, const char *fname=0)
long long GetNDownloadedBytes() const
Get number of downloaded bytes.
bool IsComplete() const
Get complete status.
long long GetFileSize() const
Get file size.
bool Read(XrdOssDF *fp, const char *dname, const char *fname=0)
Read content of cinfo file into this object.
Statistics of cache utilisation by a File object.
int m_NCksumErrors
number of checksum errors while getting data from remote
void Schedule(XrdJob *jp)
void Say(const char *text1, const char *text2=0, const char *txt3=0, const char *text4=0, const char *text5=0, const char *txt6=0)
static int Run(pthread_t *, void *(*proc)(void *), void *arg, int opts=0, const char *desc=0)
static void Wait(int milliseconds)
bool Insert(const char *data, int dlen)
Contains parameters configurable from the xrootd config file.
Definition XrdPfc.hh:56
long long m_RamAbsAvailable
available from configuration
Definition XrdPfc.hh:102
long long m_fileUsageMax
cache purge - files usage maximum
Definition XrdPfc.hh:90
long long m_fileUsageBaseline
cache purge - files usage baseline
Definition XrdPfc.hh:88
bool m_allow_xrdpfc_command
flag for enabling access to /xrdpfc-command/ functionality.
Definition XrdPfc.hh:79
long long m_diskUsageHWM
cache purge - disk usage high water mark
Definition XrdPfc.hh:87
int m_prefetch_max_blocks
maximum number of blocks to prefetch per file
Definition XrdPfc.hh:106
void calculate_fractional_usages(long long du, long long fu, double &frac_du, double &frac_fu)
Definition XrdPfc.cc:131
long long m_diskUsageLWM
cache purge - disk usage low water mark
Definition XrdPfc.hh:86
int m_RamKeepStdBlocks
number of standard-sized blocks kept after release
Definition XrdPfc.hh:103
long long m_bufferSize
prefetch buffer size, default 1MB
Definition XrdPfc.hh:101
int m_wqueue_blocks
maximum number of blocks written per write-queue loop
Definition XrdPfc.hh:104
std::string m_username
username passed to oss plugin
Definition XrdPfc.hh:81
double m_onlyIfCachedMinFrac
minimum fraction of downloaded file, used by only-if-cached CGI option
Definition XrdPfc.hh:116
long long m_onlyIfCachedMinSize
minumum size of downloaded file, used by only-if-cached CGI option
Definition XrdPfc.hh:115
Access statistics.
Definition XrdPfcInfo.hh:61
long long BytesHit
read from cache
Definition XrdPfcInfo.hh:68
long long BytesBypassed
read from remote and dropped
Definition XrdPfcInfo.hh:70
time_t DetachTime
close time
Definition XrdPfcInfo.hh:63
long long BytesMissed
read from remote and cached
Definition XrdPfcInfo.hh:69
time_t AttachTime
open time
Definition XrdPfcInfo.hh:62