XRootD
Loading...
Searching...
No Matches
XrdXrootdAioTask.cc
Go to the documentation of this file.
1/******************************************************************************/
2/* */
3/* X r d X r o o t d A i o T a s k . c c */
4/* */
5/* (c) 2021 by the Board of Trustees of the Leland Stanford, Jr., University */
6/* All Rights Reserved */
7/* Produced by Andrew Hanushevsky for Stanford University under contract */
8/* DE-AC02-76-SFO0515 with the Department of Energy */
9/* */
10/* This file is part of the XRootD software suite. */
11/* */
12/* XRootD is free software: you can redistribute it and/or modify it under */
13/* the terms of the GNU Lesser General Public License as published by the */
14/* Free Software Foundation, either version 3 of the License, or (at your */
15/* option) any later version. */
16/* */
17/* XRootD is distributed in the hope that it will be useful, but WITHOUT */
18/* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or */
19/* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public */
20/* License for more details. */
21/* */
22/* You should have received a copy of the GNU Lesser General Public License */
23/* along with XRootD in a file called COPYING.LESSER (LGPL license) and file */
24/* COPYING (GPL license). If not, see <http://www.gnu.org/licenses/>. */
25/* */
26/* The copyright holder's institutional names and contributor's names may not */
27/* be used to endorse or promote products derived from this software without */
28/* specific prior written permission of the institution or contributor. */
29/******************************************************************************/
30
31#include <cerrno>
32#include <cstdio>
33#include <ctime>
34#include <limits.h>
35#include <sys/uio.h>
36#include "Xrd/XrdLink.hh"
37#include "Xrd/XrdScheduler.hh"
39#include "XrdSys/XrdSysError.hh"
40#include "XrdSys/XrdSysE2T.hh"
41#include "XrdSys/XrdSysTimer.hh"
47
48#define TRACELINK dataLink
49
50/******************************************************************************/
51/* G l o b a l S t a t i c s */
52/******************************************************************************/
53
55
56namespace XrdXrootd
57{
// NOTE(review): embedded lines 58-59 (the declarations inside this namespace)
// are absent from this listing. The code below uses eLog and Sched without
// qualification, so they are presumably declared here -- confirm.
60}
61using namespace XrdXrootd;
62
63/******************************************************************************/
64/* S t a t i c M e m b e r s */
65/******************************************************************************/
66
67const char *XrdXrootdAioTask::TraceID = "AioTask";
68
69/******************************************************************************/
70/* C o m p l e t e d */
71/******************************************************************************/
72
74{
// Completed: hand a finished aio buffer (aioP) back to this task.
//  - If the task is Offline and already done, the buffer is recycled at once,
//    the in-flight count is decremented, and the task recycles itself when
//    no requests remain in flight.
//  - Otherwise the buffer is appended to the tail of pendQ; a task that is
//    not Running is either signalled (Waiting) or rescheduled via Sched
//    (Offline), and its Status becomes Running.
// NOTE(review): the signature line (embedded line 73) is absent from this
// listing; presumably void Completed(XrdXrootdAioBuff *aioP) -- confirm.
75// Lock this code path
76//
77 aioMutex.Lock();
78
79// If this request is not running and completed then take a shortcut.
80//
81 if (Status == Offline && isDone)
82 {aioP->Recycle();
83 inFlight--;
// NOTE(review): embedded line 84 is absent from this listing. No visible
// statement on this early-return path releases aioMutex; presumably line 84
// is the unlock -- confirm against the real source.
85 if (inFlight <= 0) Recycle(true);
86 return;
87 }
88
89// Add this element to the end of the queue
90//
91 aioP->next = 0;
92 if (!pendQ) pendQEnd = pendQ = aioP;
93 else {pendQEnd->next = aioP;
94 pendQEnd = aioP;
95 }
96
97// If the request is waiting for a buffer, tell it that one is now available.
98// Otherwise the task is offline but not done (see above), so schedule it.
99//
100 if (Status != Running)
101 {if (Status == Waiting) aioReady.Signal();
102 else Sched->Schedule(this);
103 Status = Running;
104 }
105
// NOTE(review): embedded line 106 is absent from this listing; no visible
// statement releases aioMutex before returning -- presumably line 106 does.
107}
108
109/******************************************************************************/
110/* Protected: D r a i n */
111/******************************************************************************/
112
114{
// Drain: recycle every buffer already queued on pendQ and, while requests
// are still in flight, wait for more (Wait4Buff, up to maxWait seconds per
// wait). It stops when nothing is in flight or a wait returns without a
// buffer. Tardy requests are logged, and the task is then marked Offline
// and done.
// Returns true when nothing remains in flight. Callers pass the result to
// Recycle() (see gdDone/gdFail). When it returns false, late buffers are
// handled by the "Offline && isDone" shortcut in Completed().
// NOTE(review): the signature line (embedded line 113) is absent from this
// listing; the return statement implies a bool result.
115 XrdXrootdAioBuff *aioP;
116 int maxWait = 6; // Max seconds to wait for outstanding requests
117
118// Reap as many aio objects as you can
119//
120 aioMutex.Lock();
121 while(inFlight > 0)
122 {while((aioP = pendQ))
123 {if (!(pendQ = aioP->next)) pendQEnd = 0;
// The mutex is released while the buffer is recycled, presumably so that
// Completed() can queue further buffers in the meantime.
124 aioMutex.UnLock(); // Open a window of opportunity
125 inFlight--;
126 aioP->Recycle();
127 aioMutex.Lock();
128 }
129 if (inFlight <= 0 || !Wait4Buff(maxWait)) break;
130 }
131
132// If there are still in flight requests, issue message and we will run the
133// drain in the background.
134//
135 if (inFlight > 0)
136 {char buff[128];
137 snprintf(buff, sizeof(buff),
138 "aio%c overdue %d inflight request%s for",
139 (aioState & aioRead ? 'R' : 'W'), int(inFlight),
140 (inFlight > 1 ? "s" : ""));
141 eLog.Emsg("AioTask", buff, dataLink->ID, dataFile->FileKey);
142 }
143
144// Indicate we are going offline and tell the caller if we need to stay
145// alive to drain the tardy requests in the background.
146//
147 Status = Offline;
148 isDone = true;
// NOTE(review): embedded line 149 is absent from this listing; aioMutex is
// still held at this point in the visible code -- presumably line 149
// releases it. Confirm.
150 return inFlight <= 0;
151}
152
153/******************************************************************************/
154/* Private: g d D o n e */
155/******************************************************************************/
156
// gdDone: callback run once the pending aio buffer (pendWrite) holds all of
// its data from the link. It resumes the link-to-file copy via CopyL2F() and
// returns that result. When the copy is not pausing for more data (rc <= 0),
// the task is recycled: immediately if nothing is in flight, otherwise with
// the result of Drain(). If rc is 0 and bytes were left over, the value of
// getDump() for those bytes is returned instead.
157int XrdXrootdAioTask::gdDone() // Only called for link to file transfers!
158{
159 XrdXrootdAioBuff *bP = pendWrite;
160 int rc;
161
162// Do some debugging
163//
164 TRACEP(DEBUG,"gdDone: "<<(void *)this<<" pendWrite "
165 <<(pendWrite != 0 ? "set":"not set"));
166
167// This is a callback indicating the pending aio object has all of the data.
168// Resume sending data to the destination. If there was a pending buffer, it
// is written first; copying continues only if that succeeded and more work
// remains (requests in flight or the task not yet done), otherwise rc is 0.
169//
170 pendWrite = 0;
171 if (!bP) rc = CopyL2F();
172 else {if (CopyL2F(bP) && (inFlight || !isDone)) rc = CopyL2F();
173 else rc = 0;
174 }
175
176// Do some debugging
177//
178 TRACEP(DEBUG,"gdDone: "<<(void *)this<<" ending rc="<<rc);
179
180// If we are not pausing for data to be delivered, drain any outstanding aio
181// requests and discard left over bytes, if any. Note we must copy the left
182// over length as we may recycle before discarding as discard must be last.
183//
184 if (rc <= 0)
// NOTE(review): embedded line 185 is absent from this listing. It must open
// the brace closed below, and `prot` is not declared in any visible line;
// presumably line 185 captures Protocol into `prot` before recycling --
// confirm.
186 int dlen = dataLen;
187 if (!inFlight) Recycle(true);
188 else Recycle(Drain());
// NOTE(review): `Comment` is a member read after Recycle(); if recycling can
// release or reuse this object, that read may be unsafe -- confirm.
189 if (!rc && dlen) return prot->getDump(Comment, dlen);
190 }
191 return rc;
192}
193
194/******************************************************************************/
195/* Private: g d F a i l */
196/******************************************************************************/
197
199{
// gdFail: callback run when the link has failed. It logs the failure, marks
// the task done with the aioDead state, discards any remaining data length
// and the pending write buffer, then recycles the task (after Drain() when
// requests are still in flight).
// NOTE(review): the signature line (embedded line 198) is absent from this
// listing; presumably void gdFail() -- confirm.
200 char eBuff[512];
201
202// Do some tracing
203//
204 TRACEP(DEBUG,"gdFail: "<<(void *)this);
205
206// Format message for display
207//
208 snprintf(eBuff, sizeof(eBuff), "link error aborted %s for", Comment);
209 eLog.Emsg("AioTask", eBuff, dataLink->ID, dataFile->FileKey);
210
211// This is a callback indicating the link is dead. Terminate this operation.
212//
213 isDone = true;
214 aioState |= aioDead;
215 dataLen = 0;
216 if (pendWrite) {pendWrite->Recycle(); pendWrite = 0;}
217
218// If this is a read, cancel all queued read requests
219//
// NOTE(review): embedded line 220 -- the statement this comment describes --
// is absent from this listing.
221
222// If we still have any requests in flight drain them.
223//
224 if (!inFlight) Recycle(true);
225 else Recycle(Drain());
226}
227
228/******************************************************************************/
229/* Protected: g e t B u f f */
230/******************************************************************************/
231
233{
// getBuff: pop the next completed aio buffer from pendQ, decrementing the
// in-flight count. With wait set and requests in flight, it blocks in
// Wait4Buff() until a buffer arrives. Returns 0 when no buffer is available
// and the caller did not ask to wait, or when nothing is in flight. A wait
// that ends without a buffer is reported to the client as ETIMEDOUT and
// also returns 0.
// NOTE(review): the signature line (embedded line 232) is absent from this
// listing; presumably XrdXrootdAioBuff *getBuff(bool wait) -- confirm.
234 XrdXrootdAioBuff* aioP;
235
236// Try to get the next buffer
237//
238 aioMutex.Lock();
239do{if ((aioP = pendQ))
240 {if (!(pendQ = aioP->next)) pendQEnd = 0;
// NOTE(review): embedded line 241 is absent from this listing; unlike the
// no-wait path below, no unlock of aioMutex is visible before this return --
// presumably line 241 provides it. Confirm.
242 inFlight--;
243 return aioP;
244 }
245
246// If the caller does not want to wait or if there is nothing in flight, return
247//
248 if (!wait || !inFlight)
249 {aioMutex.UnLock();
250 return 0;
251 }
252
253// So, wait for a buffer to arrive
254//
255 } while(Wait4Buff());
256
257// We timed out and this is considered an error
258//
// NOTE(review): embedded line 259 is absent from this listing; Wait4Buff()
// returns with aioMutex held, so presumably line 259 releases it before
// SendError() -- confirm.
260 SendError(ETIMEDOUT, (aioState & aioRead ? "aio file read timed out"
261 : "aio file write timed out"));
262 return 0;
263}
264
265/******************************************************************************/
266/* I D */
267/******************************************************************************/
268
269const char *XrdXrootdAioTask::ID() {return dataLink->ID;}
270
271/******************************************************************************/
272/* I n i t */
273/******************************************************************************/
274
276 XrdXrootdResponse &resp,
277 XrdXrootdFile *fP)
278{
279
280// Reset the object
281//
282 pendQEnd = pendQ = 0;
283 finalRead = 0; // Also sets pendWrite
284 Protocol = protP;
285 dataLink = resp.theLink();
286 Response = resp;
287 dataFile = fP;
288 aioState = 0;
289 inFlight = 0;
290 isDone = false;
291 Status = Running;
292}
293
294/******************************************************************************/
295/* Protected: S e n d E r r o r */
296/******************************************************************************/
297
// SendError: log an aio failure and, if the request is still active, send
// the error to the client. A null eText is replaced by the text for rc (or
// "invalid error code" when rc is 0). When Response.Send() returns nonzero
// (presumably a send failure), the task is marked aioDead and dataLen is
// cleared; otherwise dataLen is cleared only for reads. The request is then
// marked done.
298void XrdXrootdAioTask::SendError(int rc, const char *eText)
299{
300 char eBuff[1024];
301
302// If there is no error text, use the rc
303//
304 if (!eText) eText = (rc ? XrdSysE2T(rc) : "invalid error code");
305
306// Format message for display
307//
308 snprintf(eBuff, sizeof(eBuff), "async %s failed for %s;",
309 (aioState & aioRead ? "read" : "write"), dataLink->ID);
310 eLog.Emsg("AioTask", eBuff, eText, dataFile->FileKey);
311
312// If this request is still active, send the error to the client
313//
314 if (!isDone)
// NOTE(review): embedded line 315 is absent from this listing. It must open
// the brace closed below, and `eCode` is not declared in any visible line;
// presumably line 315 declares it -- confirm.
316 if (Response.Send(eCode, eText))
317 {aioState |= aioDead;
318 dataLen = 0;
319 } else if (aioState & aioRead) dataLen = 0;
320 isDone = true;
321 }
322}
323
324/******************************************************************************/
325/* Protected: S e n d F S E r r o r */
326/******************************************************************************/
327
329{
330 XrdOucErrInfo &myError = dataFile->XrdSfsp->error;
331 int eCode;
332
333// We can only handle actual errors. Under some conditions a redirect (e.g.
334// Xcache) can return other error codes. We treat these as server errors.
335//
336 if (rc != SFS_ERROR)
337 {char eBuff[256];
338 snprintf(eBuff, sizeof(eBuff), "fs returned unexpected rc %d", rc);
339 SendError(EFAULT, eBuff);
340 if (myError.extData()) myError.Reset();
341 return;
342 }
343
344// Handle file system error but only if we are still alive
345//
346 if (!isDone)
347 {const char *eMsg = myError.getErrText(eCode);
348 eLog.Emsg("AioTask", dataLink->ID, eMsg, dataFile->FileKey);
349 int rc = XProtocol::mapError(eCode);
350 if (Response.Send((XErrorCode)rc, eMsg))
351 {aioState |= aioDead;
352 dataLen = 0;
353 } else if (aioState & aioRead) dataLen = 0;
354 isDone = true;
355 }
356
357// Clear error message and recycle aio object if need be
358//
359 if (myError.extData()) myError.Reset();
360}
361
362/******************************************************************************/
363/* Protected: V a l i d a t e */
364/******************************************************************************/
365
367{
368 ssize_t aioResult = aioP->Result;
369 off_t aioOffset = aioP->sfsAio.aio_offset;
370 int aioLength = aioP->sfsAio.aio_nbytes;
371
372// Step 1: Check if this request is already completed. This may be the case
373// if we had a previous error.
374//
375 if (isDone) return false;
376
377// Step 2: Check if an error occurred as this will terminate the request even
378// if we have not sent all of the data.
379//
380 if (aioP->Result < 0)
381 {SendError(-aioP->Result, 0);
382 return false;
383 }
384
385// Step 3: Check for a short read which signals that no more data past this
386// offset is forthcomming. Save it as we will send a final response
387// using this element. We discard zero length reads. It's an error if we
388// get more than one short read with data or if its offset is less than
389// the highest full read element.
390//
391 if (aioResult < aioLength)
392 {dataLen = 0;
393 if (!aioResult)
394 {if ((finalRead && aioOffset < finalRead->sfsAio.aio_offset)
395 || aioOffset < highOffset) SendError(EFAULT, "embedded null block");
396 return false;
397 } else {
398 if (aioOffset < highOffset)
399 {SendError(ENODEV, "embedded short block");
400 return false;
401 } else {
402 if (finalRead) SendError(ENODEV, "multiple short blocks");
403 else {finalRead = aioP;
404 highOffset = aioOffset;
405 }
406 }
407 }
408 return false;
409 }
410
411// Step 4: This is a full read and its offset must be lower than the offset of
412// any short read we have encountered.
413//
414 if (finalRead && aioOffset >= finalRead->sfsAio.aio_offset)
415 {SendError(ENODEV, "read offset past EOD");
416 return false;
417 }
418 if (aioOffset > highOffset) highOffset = aioOffset;
419 return true;
420}
421
422/******************************************************************************/
423/* Private: W a i t 4 B u f f */
424/******************************************************************************/
425
426// Called with aioMutex locked; returns with it still locked.
427
// Wait4Buff: wait for a completed aio buffer to appear on pendQ.
//   maxWait - maximum seconds to wait. A value <= 0 selects a default derived
//             from XrdXrootdProtocol::as_timeout (the rest of that
//             expression is on a line missing from this listing).
// Returns true if pendQ is non-empty on return, and false on timeout or when
// nothing is in flight. The wait proceeds in slices of at most msgInterval
// (30) seconds. Status is Waiting during each slice, so Completed() signals
// aioReady. A "tardy" message is logged each time the elapsed time reaches
// msgWait.
428bool XrdXrootdAioTask::Wait4Buff(int maxWait)
429{
430 static const int msgInterval = 30;
431 time_t begWait;
432 int aioWait, msgWait = msgInterval, totWait = 0;
433
434// Return success if somehow we got a buffer
435//
436 if (pendQ) return true;
437
438// Make sure that something will actually arrive but issue a warning
439// message and sleep a bit to avoid a loop as there is clearly a logic error.
440//
441 if (!inFlight)
442 {eLog.Emsg("Wait4Buff", dataLink->ID, "has nothing inflight for",
// NOTE(review): embedded lines 443-444 (the remaining Emsg arguments and,
// presumably, the sleep mentioned above) are absent from this listing.
445 return false;
446 }
447
448// Calculate wait time and when we should produce a message
449//
450 if (maxWait <= 0) maxWait = (XrdXrootdProtocol::as_timeout ?
// NOTE(review): embedded line 451 (the rest of the expression above) is
// absent from this listing.
452 aioWait = (maxWait > msgInterval ? msgInterval : maxWait);
453
454// Wait for a buffer to arrive.
455//
456 begWait = time(0);
457 while(totWait < maxWait)
458 {Status = Waiting;
459 aioReady.Wait(aioWait);
460 if (pendQ) break;
// No buffer yet (timed out or spurious wakeup): recompute elapsed time and
// shrink the next slice so the total never exceeds maxWait.
461 totWait = (time(0) - begWait); // Spurious wakeup
462 int tmpWait = maxWait - totWait;
463 if (tmpWait > 0 && tmpWait < aioWait) aioWait = tmpWait;
464 if (totWait >= msgWait)
465 {char buff[80];
466 int inF = inFlight;
467 msgWait += aioWait;
468 snprintf(buff, sizeof(buff), "%d tardy aio%c requests for",
469 inF, (aioState & aioRead ? 'R' : 'W'));
470 eLog.Emsg("Wait4Buff", dataLink->ID, buff, dataFile->FileKey);
471 }
472 }
473
474// If we are here either we actually have a buffer available or timed out.
475//
476 Status = Running;
477 return (pendQ != 0);
478}
XErrorCode
Definition XProtocol.hh:987
#define DEBUG(x)
#define eMsg(x)
off_t aio_offset
Definition XrdSfsAio.hh:49
size_t aio_nbytes
Definition XrdSfsAio.hh:48
#define SFS_ERROR
const char * XrdSysE2T(int errcode)
Definition XrdSysE2T.cc:99
XrdSysTrace XrdXrootdTrace
#define TRACEP(act, x)
static int mapError(int rc)
const char * Comment
Definition XrdJob.hh:47
const char * getErrText()
void Reset()
Reset object to no message state. Call this method to release appendages.
void Schedule(XrdJob *jp)
ssize_t Result
Definition XrdSfsAio.hh:65
struct aiocb sfsAio
Definition XrdSfsAio.hh:62
XrdOucErrInfo & error
int Emsg(const char *esfx, int ecode, const char *text1, const char *text2=0)
static void Snooze(int seconds)
XrdXrootdAioBuff * next
virtual void Recycle() override
XrdSysCondVar2 aioReady
static const int Offline
int gdDone() override
static const int Waiting
virtual void Recycle(bool release)=0
virtual int CopyL2F()=0
XrdXrootdFile * dataFile
bool Validate(XrdXrootdAioBuff *aioP)
XrdXrootdAioBuff * pendQ
XrdXrootdAioBuff * getBuff(bool wait)
void SendError(int rc, const char *eText)
void Completed(XrdXrootdAioBuff *aioP)
XrdXrootdResponse Response
static const int Running
void Init(XrdXrootdProtocol *protP, XrdXrootdResponse &resp, XrdXrootdFile *fP)
void SendFSError(int rc)
XrdXrootdAioBuff * pendQEnd
static const int aioRead
static const char * TraceID
void gdFail() override
RAtomic_uchar inFlight
static const int aioDead
XrdXrootdProtocol * Protocol
XrdSfsFile * XrdSfsp
XrdXrootdAioFob * aioFob
int getDump(const char *dtype, int dlen)
XrdScheduler * Sched
XrdSysError eLog