XRootD
Loading...
Searching...
No Matches
XrdEc::Reader Class Reference

#include <XrdEcReader.hh>

+ Collaboration diagram for XrdEc::Reader:

Public Member Functions

 Reader (ObjCfg &objcfg)
 
virtual ~Reader ()
 
void Close (XrdCl::ResponseHandler *handler, uint16_t timeout=0)
 Close the data object.
 
uint64_t GetSize ()
 
void Open (XrdCl::ResponseHandler *handler, uint16_t timeout=0)
 
void Read (uint64_t offset, uint32_t length, void *buffer, XrdCl::ResponseHandler *handler, uint16_t timeout)
 
void VectorRead (const XrdCl::ChunkList &chunks, void *buffer, XrdCl::ResponseHandler *handler, uint16_t timeout)
 

Friends

class ::MicroTest
 
class ::XrdEcTests
 
struct block_t
 

Detailed Description

Definition at line 58 of file XrdEcReader.hh.

Constructor & Destructor Documentation

◆ Reader()

XrdEc::Reader::Reader ( ObjCfg & objcfg)
inline

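Constructor: stores the object configuration and initialises the last-block index (lstblk) and the file size (filesize) to zero.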

Parameters
objcfg: configuration for the data object (e.g. number of data and parity stripes)

Definition at line 71 of file XrdEcReader.hh.

71 : objcfg( objcfg ), lstblk( 0 ), filesize( 0 )
72 {
73 }

◆ ~Reader()

XrdEc::Reader::~Reader ( )
virtual

Definition at line 427 of file XrdEcReader.cc.

428 {
429 }

References ~Reader().

Referenced by ~Reader().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

Member Function Documentation

◆ Close()

void XrdEc::Reader::Close ( XrdCl::ResponseHandler * handler,
uint16_t timeout = 0 )

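Close the data object. All ZIP archives that are still open are closed in parallel (each with the "BundledClose" property set); if no archive is open, the handler is scheduled immediately.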

Definition at line 588 of file XrdEcReader.cc.

589 {
590 //---------------------------------------------------------------------
591 // prepare the pipelines ...
592 //---------------------------------------------------------------------
593 std::vector<XrdCl::Pipeline> closes;
594 closes.reserve( dataarchs.size() );
595 auto itr = dataarchs.begin();
596 for( ; itr != dataarchs.end() ; ++itr )
597 {
598 auto &zipptr = itr->second;
599 if( zipptr->IsOpen() )
600 {
601 zipptr->SetProperty( "BundledClose", "true");
602 closes.emplace_back( XrdCl::CloseArchive( *zipptr ) >>
603 [zipptr]( XrdCl::XRootDStatus& ){ } );
604 }
605 }
606
607 // if there is nothing to close just schedule the handler
608 if( closes.empty() ) ScheduleHandler( handler );
609 // otherwise close the archives
610 else XrdCl::Async( XrdCl::Parallel( closes ) >> handler, timeout );
611 }
CloseArchiveImpl< false > CloseArchive(Ctx< ZipArchive > zip, uint16_t timeout=0)
Factory for creating CloseArchiveImpl objects.
std::future< XRootDStatus > Async(Pipeline pipeline, uint16_t timeout=0)
ParallelOperation< false > Parallel(Container &&container)
Factory function for creating parallel operation from a vector.
void ScheduleHandler(uint64_t offset, uint32_t size, void *buffer, XrdCl::ResponseHandler *handler)

References XrdCl::Async(), Close(), XrdCl::CloseArchive(), XrdCl::Parallel(), and XrdEc::ScheduleHandler().

Referenced by Close().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ GetSize()

uint64_t XrdEc::Reader::GetSize ( )
inline
Returns
the file size (the cached filesize member).

Definition at line 121 of file XrdEcReader.hh.

122 {
123 return filesize;
124 }

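Referenced by VectorRead(). (Doxygen cross-reference: the VectorRead() listing actually calls info->GetSize() on an XrdCl::StatInfo, not Reader::GetSize().)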

+ Here is the caller graph for this function:

◆ Open()

void XrdEc::Reader::Open ( XrdCl::ResponseHandler * handler,
uint16_t timeout = 0 )

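Open the erasure coded / striped object. The data archives are opened in parallel, and at least objcfg.nbdata of them must open successfully.

Parameters
handler: user callback
timeout: timeout for the asynchronous open pipeline (default 0)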

Definition at line 434 of file XrdEcReader.cc.

435 {
436 const size_t size = objcfg.plgr.size();
437 std::vector<XrdCl::Pipeline> opens; opens.reserve( size );
438 for( size_t i = 0; i < size; ++i )
439 {
440 // generate the URL
441 std::string url = objcfg.GetDataUrl( i );
442 archiveIndices.emplace(url, i);
443 // create the file object
444 dataarchs.emplace( url, std::make_shared<XrdCl::ZipArchive>(
445 Config::Instance().enable_plugins ) );
446 // open the archive
447 if( objcfg.nomtfile )
448 {
449 opens.emplace_back( XrdCl::OpenArchive( *dataarchs[url], url, XrdCl::OpenFlags::Read ) );
450 }
451 else
452 opens.emplace_back( OpenOnly( *dataarchs[url], url, false ) );
453 }
454
455 auto pipehndl = [handler,this]( const XrdCl::XRootDStatus &st )
456 { // set the central directories in ZIP archives (if we use metadata files)
457 auto itr = dataarchs.begin();
458 for( ; itr != dataarchs.end() ; ++itr )
459 {
460 const std::string &url = itr->first;
461 auto &zipptr = itr->second;
462 if( zipptr->openstage == XrdCl::ZipArchive::NotParsed )
463 zipptr->SetCD( metadata[url] );
464 else if( zipptr->openstage != XrdCl::ZipArchive::Done && !metadata.empty() )
465 this->AddMissing( metadata[url] );
466 auto itr = zipptr->cdmap.begin();
467 for( ; itr != zipptr->cdmap.end() ; ++itr )
468 {
469 urlmap.emplace( itr->first, url );
470 size_t blknb = fntoblk( itr->first );
471 if( blknb > lstblk ) lstblk = blknb;
472 }
473 }
474 metadata.clear();
475 // call user handler
476 if( handler )
477 handler->HandleResponse( new XrdCl::XRootDStatus( st ), nullptr );
478 };
479 // in parallel open the data files and read the metadata
480 XrdCl::Pipeline p = objcfg.nomtfile
481 ? XrdCl::Parallel( opens ).AtLeast( objcfg.nbdata ) | ReadSize( 0 ) | XrdCl::Final( pipehndl )
482 : XrdCl::Parallel( ReadMetadata( 0 ),
483 XrdCl::Parallel( opens ).AtLeast( objcfg.nbdata ) ) >> pipehndl;
484 XrdCl::Async( std::move( p ), timeout );
485 }
virtual void HandleResponse(XRootDStatus *status, AnyObject *response)
static Config & Instance()
Singleton access.
FinalOperation Final
OpenArchiveImpl< false > OpenArchive(Ctx< ZipArchive > zip, Arg< std::string > fn, Arg< OpenFlags::Flags > flags, uint16_t timeout=0)
Factory for creating OpenArchiveImpl objects.
static size_t fntoblk(const std::string &fn)
OpenOnlyImpl< false > OpenOnly(XrdCl::Ctx< XrdCl::ZipArchive > zip, XrdCl::Arg< std::string > fn, XrdCl::Arg< bool > updt, uint16_t timeout=0)
@ Read
Open only for reading.

References XrdCl::Async(), XrdEc::fntoblk(), XrdCl::ResponseHandler::HandleResponse(), XrdEc::Config::Instance(), Open(), XrdCl::OpenArchive(), XrdEc::OpenOnly(), XrdCl::Parallel(), and XrdCl::OpenFlags::Read.

Referenced by Open().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ Read()

void XrdEc::Reader::Read ( uint64_t offset,
uint32_t length,
void * buffer,
XrdCl::ResponseHandler * handler,
uint16_t timeout )

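Read data from the data object. When objcfg.nomtfile is set, the requested length is first clamped to the file size; a zero-length request completes immediately.

Parameters
offset: offset of the data to be read
length: length of the data to be read
buffer: buffer for the data to be read
handler: user callback
timeout: timeout for the underlying stripe reads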

Definition at line 490 of file XrdEcReader.cc.

495 {
496 if( objcfg.nomtfile )
497 {
498 if( offset >= filesize )
499 length = 0;
500 else if( offset + length > filesize )
501 length = filesize - offset;
502 }
503
504 if( length == 0 )
505 {
506 ScheduleHandler( offset, 0, buffer, handler );
507 return;
508 }
509
510 char *usrbuff = reinterpret_cast<char*>( buffer );
511 typedef std::tuple<uint64_t, uint32_t,
512 void*, uint32_t,
513 XrdCl::ResponseHandler*,
514 XrdCl::XRootDStatus> rdctx_t;
515 auto rdctx = std::make_shared<rdctx_t>( offset, 0, buffer,
516 length, handler,
517 XrdCl::XRootDStatus() );
518 auto rdmtx = std::make_shared<std::mutex>();
519
520 while( length > 0 )
521 {
522 size_t blkid = offset / objcfg.datasize; //< ID of the block from which we will be reading
523 size_t strpid = ( offset % objcfg.datasize ) / objcfg.chunksize; //< ID of the stripe from which we will be reading
524 uint64_t rdoff = offset - blkid * objcfg.datasize - strpid * objcfg.chunksize; //< relative read offset within the stripe
525 uint32_t rdsize = objcfg.chunksize - rdoff; //< read size within the stripe
526 if( rdsize > length ) rdsize = length;
527 //-------------------------------------------------------------------
528 // Make sure we operate on a valid block
529 //-------------------------------------------------------------------
530 std::unique_lock<std::mutex> lck( blkmtx );
531 if( !block || block->blkid != blkid )
532 block = std::make_shared<block_t>( blkid, *this, objcfg );
533 //-------------------------------------------------------------------
534 // Prepare the callback for reading from single stripe
535 //-------------------------------------------------------------------
536 auto blk = block;
537 lck.unlock();
538 auto callback = [blk, rdctx, rdsize, rdmtx]( const XrdCl::XRootDStatus &st, uint32_t nbrd )
539 {
540 std::unique_lock<std::mutex> lck( *rdmtx );
541 //---------------------------------------------------------------------
542 // update number of bytes left to be read (bytes requested not actually
543 // read)
544 //---------------------------------------------------------------------
545 std::get<3>( *rdctx ) -= rdsize;
546 //---------------------------------------------------------------------
547 // Handle failure ...
548 //---------------------------------------------------------------------
549 if( !st.IsOK() )
550 std::get<5>( *rdctx ) = st; // the error
551 //---------------------------------------------------------------------
552 // Handle success ...
553 //---------------------------------------------------------------------
554 else
555 std::get<1>( *rdctx ) += nbrd; // number of bytes read
556 //---------------------------------------------------------------------
557 // Are we done?
558 //---------------------------------------------------------------------
559 if( std::get<3>( *rdctx ) == 0 )
560 {
561 //-------------------------------------------------------------------
562 // Check if the read operation was successful ...
563 //-------------------------------------------------------------------
564 XrdCl::XRootDStatus &status = std::get<5>( *rdctx );
565 if( !status.IsOK() )
566 ScheduleHandler( std::get<4>( *rdctx ), status );
567 else
568 ScheduleHandler( std::get<0>( *rdctx ), std::get<1>( *rdctx ),
569 std::get<2>( *rdctx ), std::get<4>( *rdctx ) );
570 }
571 };
572 //-------------------------------------------------------------------
573 // Read data from a stripe
574 //-------------------------------------------------------------------
575 block_t::read( blk, strpid, rdoff, rdsize, usrbuff, callback, timeout );
576 //-------------------------------------------------------------------
577 // Update absolute offset, read length, and user buffer
578 //-------------------------------------------------------------------
579 offset += rdsize;
580 length -= rdsize;
581 usrbuff += rdsize;
582 }
583 }
bool IsOK() const
Returns true when the status indicates success ("We're fine.").
static void read(std::shared_ptr< block_t > &self, size_t strpid, uint64_t offset, uint32_t size, char *usrbuff, callback_t usrcb, uint16_t timeout)

References XrdCl::Status::IsOK(), Read(), XrdEc::block_t::read(), and XrdEc::ScheduleHandler().

Referenced by Read().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ VectorRead()

void XrdEc::Reader::VectorRead ( const XrdCl::ChunkList & chunks,
void * buffer,
XrdCl::ResponseHandler * handler,
uint16_t timeout )

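Read a list of chunks from the data object. Requests with more than 1024 chunks fail immediately with errInvalidArgs. If buffer is non-null, the chunks are copied consecutively into it; otherwise each chunk is copied into its own ChunkInfo::buffer.

Definition at line 868 of file XrdEcReader.cc.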

868 {
869 if(chunks.size() > 1024) {
870 if(handler) handler->HandleResponse(new XrdCl::XRootDStatus(XrdCl::stError, XrdCl::errInvalidArgs, XrdCl::errInvalidArgs), nullptr);
871 return;
872 }
873
874 std::vector<XrdCl::ChunkList> hostLists;
875 for(size_t dataHosts = 0; dataHosts < objcfg.plgr.size(); dataHosts++){
876 hostLists.emplace_back(XrdCl::ChunkList());
877 }
878
879 auto log = XrdCl::DefaultEnv::GetLog();
880
881 //bool useGlobalBuffer = buffer != nullptr;
882 char* globalBuffer = (char*)buffer;
883
884 // host index, blkid, strpid
885 std::set<std::tuple<size_t, size_t, size_t>> requestedChunks;
886 // create block_ts for any requested block index
887 std::map<size_t, std::shared_ptr<block_t>> blockMap;
888
889 // go through the requested lists of chunks and assign them to fitting hosts
890 for(size_t index = 0; index < chunks.size(); index++){
891 uint32_t remainLength = chunks[index].length;
892 uint64_t currentOffset = chunks[index].offset;
893
894 while(remainLength > 0){
895 size_t blkid = currentOffset / objcfg.datasize; //< ID of the block from which we will be reading
896 size_t strpid = ( currentOffset % objcfg.datasize ) / objcfg.chunksize; //< ID of the stripe from which we will be reading
897 uint64_t rdoff = currentOffset - blkid * objcfg.datasize - strpid * objcfg.chunksize ; //< relative read offset within the stripe
898 //uint64_t offsetInFile = rdoff + blkid * objcfg.chunksize; // relative offset within the file
899 uint32_t rdsize = objcfg.chunksize - rdoff; //< read size within the stripe
900 if( rdsize > remainLength ) rdsize = remainLength;
901 if(currentOffset + rdsize >= filesize) {
902 rdsize = filesize - currentOffset;
903 remainLength = rdsize;
904 }
905
906
907 std::string fn = objcfg.GetFileName(blkid, strpid);
908
909 auto itr = urlmap.find( fn );
910 if( itr == urlmap.end() )
911 {
912 log->Dump(XrdCl::XRootDMsg, "EC Vector Read: No mapping of file to host found.");
913 break;
914 }
915 // get the URL of the ZIP archive with the respective data
916 const std::string &url = itr->second;
917 auto itr2 = archiveIndices.find(url);
918 if(itr2 == archiveIndices.end())
919 {
920 log->Dump(XrdCl::XRootDMsg, "EC Vector Read: Couldn't find host for file.");
921 break;
922 }
923 size_t indexOfArchive = archiveIndices[url];
924
925 if (blockMap.find(blkid) == blockMap.end())
926 {
927 blockMap.emplace(blkid,
928 std::make_shared<block_t>(blkid, *this, objcfg));
929 }
930
931 blockMap[blkid]->state[strpid] = block_t::Loading;
932 XrdCl::StatInfo* info = nullptr;
933 if(dataarchs[url]->Stat(objcfg.GetFileName(blkid, strpid), info).IsOK())
934 blockMap[blkid]->stripes[strpid].resize( info ->GetSize() );
935
936 auto requestChunk = std::make_tuple(indexOfArchive, blkid, strpid);
937 if(requestedChunks.find(requestChunk) == requestedChunks.end())
938 {
939 uint64_t off = 0;
940 dataarchs[url]->GetOffset(objcfg.GetFileName(blkid, strpid), off);
941 hostLists[indexOfArchive].emplace_back(XrdCl::ChunkInfo(
942 off,
943 info ->GetSize(),
944 blockMap[blkid]->stripes[strpid].data()));
945
946 // fill list of requested chunks by block and stripe id
947 requestedChunks.emplace(requestChunk);
948
949 }
950 remainLength -= rdsize;
951 currentOffset += rdsize;
952
953 }
954 }
955
956 std::vector<XrdCl::Pipeline> hostPipes;
957 hostPipes.reserve(hostLists.size());
958 for(size_t i = 0; i < hostLists.size(); i++){
959 while(hostLists[i].size() > 0){
960 uint32_t range = hostLists[i].size() > 1024 ? 1024 : hostLists[i].size();
961 XrdCl::ChunkList partList(hostLists[i].begin(), hostLists[i].begin() + range);
962 hostLists[i].erase(hostLists[i].begin(), hostLists[i].begin() + range);
963 hostPipes.emplace_back(
964 XrdCl::VectorRead(XrdCl::Ctx<XrdCl::File>(dataarchs[objcfg.GetDataUrl(i)]->archive),
965 partList, nullptr, timeout)
966 >> [i, log, timeout, blockMap, requestedChunks, this](const XrdCl::XRootDStatus &st, XrdCl::VectorReadInfo ch) mutable
967 {
968 auto it = requestedChunks.begin();
969 while(it!=requestedChunks.end())
970 {
971 auto &args = *it;
972 size_t host = std::get<0>(args);
973 size_t blkid = std::get<1>(args);
974 size_t strpid = std::get<2>(args);
975 it++;
976 if(host == i)
977 {
978 std::shared_ptr<block_t> currentBlock = blockMap[blkid];
979
980
981 if(!st.IsOK())
982 {
983 log->Dump(XrdCl::XRootDMsg, "EC Vector Read of host %zu failed entirely.", i);
984 this->MissingVectorRead(currentBlock, blkid, strpid, timeout);
985 }
986 else{
987 uint32_t orgcksum = 0;
988 auto s = dataarchs[objcfg.GetDataUrl(i)]->GetCRC32( objcfg.GetFileName(blkid, strpid), orgcksum );
989 //---------------------------------------------------
990 // If we cannot extract the checksum assume the data
991 // are corrupted
992 //---------------------------------------------------
993 if( !st.IsOK() )
994 {
995 log->Dump(XrdCl::XRootDMsg, "EC Vector Read: Couldn't read CRC32 from CD.");
996 this->MissingVectorRead(currentBlock, blkid, strpid, timeout);
997 continue;
998 }
999 //---------------------------------------------------
1000 // Verify data integrity
1001 //---------------------------------------------------
1002 uint32_t cksum = objcfg.digest( 0, currentBlock->stripes[strpid].data(), currentBlock->stripes[strpid].size() );
1003 if( orgcksum != cksum )
1004 {
1005 log->Dump(XrdCl::XRootDMsg, "EC Vector Read: Wrong checksum for block %zu stripe %zu.", blkid, strpid);
1006 this->MissingVectorRead(currentBlock, blkid, strpid, timeout);
1007 continue;
1008 }
1009 else{
1010 currentBlock->state[strpid] = block_t::Valid;
1011 bool recoverable = currentBlock->error_correction( currentBlock );
1012 if(!recoverable)
1013 log->Dump(XrdCl::XRootDMsg, "EC Vector Read: Couldn't recover block %zu.", blkid);
1014 }
1015 }
1016 }
1017 }
1018 }
1019 );
1020 }
1021 }
1022
1023 auto finalPipehndl = [handler, log, blockMap, chunks, globalBuffer, this] (const XrdCl::XRootDStatus &st) mutable {
1024 // wait until all missing chunks are corrected (uses single reads to get parity stripes)
1025 std::unique_lock<std::mutex> lk(missingChunksMutex);
1026 waitMissing.wait(lk, [this] { return this->missingChunksVectorRead.size() == 0;});
1027
1028 bool failed = false;
1029 for(size_t index = 0; index < chunks.size(); index++){
1030 uint32_t remainLength = chunks[index].length;
1031 uint64_t currentOffset = chunks[index].offset;
1032
1033 char *localBuffer;
1034 if (globalBuffer)
1035 localBuffer = globalBuffer;
1036 else
1037 localBuffer = (char*)(chunks[index].buffer);
1038
1039 while(remainLength > 0){
1040 size_t blkid = currentOffset / objcfg.datasize; //< ID of the block from which we will be reading
1041 size_t strpid = ( currentOffset % objcfg.datasize ) / objcfg.chunksize; //< ID of the stripe from which we will be reading
1042 uint64_t rdoff = currentOffset - blkid * objcfg.datasize - strpid * objcfg.chunksize ; //< relative read offset within the stripe
1043 uint32_t rdsize = objcfg.chunksize - rdoff; //< read size within the stripe
1044 if( rdsize > remainLength ) rdsize = remainLength;
1045
1046 // put received data into given buffers
1047 if(blockMap.find(blkid) == blockMap.end() || blockMap[blkid] == nullptr){
1048 log->Dump(XrdCl::XRootDMsg, "EC Vector Read: Missing block %zu.", blkid);
1049 failed = true;
1050 break;
1051 }
1052 if(blockMap[blkid]->state[strpid] != block_t::Valid){
1053 log->Dump(XrdCl::XRootDMsg, "EC Vector Read: Invalid stripe in block %zu stripe %zu.", blkid, strpid);
1054 failed = true;
1055 break;
1056 }
1057
1058 memcpy(localBuffer, blockMap[blkid]->stripes[strpid].data() + rdoff, rdsize);
1059
1060 remainLength -= rdsize;
1061 currentOffset += rdsize;
1062 localBuffer += rdsize;
1063 }
1064 if(globalBuffer) globalBuffer = localBuffer;
1065 }
1066 if(handler){
1067 if(failed) log->Dump(XrdCl::XRootDMsg, "EC Vector Read failed (at least in part).");
1068 if(failed) handler->HandleResponse(new XrdCl::XRootDStatus(XrdCl::stError, "Couldn't read all segments"), nullptr);
1069 else handler->HandleResponse(new XrdCl::XRootDStatus(), nullptr);
1070 }
1071 };
1072
1073 XrdCl::Pipeline p = XrdCl::Parallel(hostPipes) |
1074 XrdCl::Final(finalPipehndl);
1075
1076 XrdCl::Async(std::move(p), timeout);
1077
1078 }
struct stat Stat
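Definition XrdCks.cc:49 (Doxygen cross-reference: the Stat call in the VectorRead() listing is the ZipArchive member dataarchs[url]->Stat(...), not this variable.)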
static Log * GetLog()
Get default log.
uint64_t GetSize()
const uint16_t stError
An error occurred that could potentially be retried.
const uint64_t XRootDMsg
const uint16_t errInvalidArgs
std::vector< ChunkInfo > ChunkList
List of chunks.
VectorReadImpl< false > VectorRead(Ctx< File > file, Arg< ChunkList > chunks, Arg< void * > buffer, uint16_t timeout=0)
Factory for creating VectorReadImpl objects.

References XrdCl::errInvalidArgs, XrdCl::DefaultEnv::GetLog(), GetSize(), XrdCl::ResponseHandler::HandleResponse(), XrdEc::block_t::Loading, Stat, XrdCl::stError, XrdCl::VectorRead(), VectorRead(), and XrdCl::XRootDMsg.

Referenced by VectorRead().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

Friends And Related Symbol Documentation

◆ ::MicroTest

friend class ::MicroTest
friend

Definition at line 60 of file XrdEcReader.hh.

◆ ::XrdEcTests

friend class ::XrdEcTests
friend

Definition at line 61 of file XrdEcReader.hh.

◆ block_t

friend struct block_t
friend

Definition at line 62 of file XrdEcReader.hh.

References block_t.

Referenced by block_t.


The documentation for this class was generated from the following files: