XRootD
Loading...
Searching...
No Matches
XrdClFileStateHandler.cc
Go to the documentation of this file.
1//------------------------------------------------------------------------------
2// Copyright (c) 2011-2014 by European Organization for Nuclear Research (CERN)
3// Author: Lukasz Janyst <ljanyst@cern.ch>
4//------------------------------------------------------------------------------
5// This file is part of the XRootD software suite.
6//
7// XRootD is free software: you can redistribute it and/or modify
8// it under the terms of the GNU Lesser General Public License as published by
9// the Free Software Foundation, either version 3 of the License, or
10// (at your option) any later version.
11//
12// XRootD is distributed in the hope that it will be useful,
13// but WITHOUT ANY WARRANTY; without even the implied warranty of
14// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15// GNU General Public License for more details.
16//
17// You should have received a copy of the GNU Lesser General Public License
18// along with XRootD. If not, see <http://www.gnu.org/licenses/>.
19//
20// In applying this licence, CERN does not waive the privileges and immunities
21// granted to it by virtue of its status as an Intergovernmental Organization
22// or submit itself to any jurisdiction.
23//------------------------------------------------------------------------------
24
26#include "XrdCl/XrdClURL.hh"
27#include "XrdCl/XrdClLog.hh"
28#include "XrdCl/XrdClStatus.hh"
35#include "XrdCl/XrdClMonitor.hh"
41#include "XrdCl/XrdClUtils.hh"
42
43#ifdef WITH_XRDEC
45#endif
46
47#include "XrdOuc/XrdOucCRC.hh"
49#include "XrdOuc/XrdOucUtils.hh"
50
54
55#include <sstream>
56#include <memory>
57#include <numeric>
58#include <sys/time.h>
59#include <uuid/uuid.h>
60#include <mutex>
61
62namespace
63{
64 //----------------------------------------------------------------------------
65 // Helper callback for handling PgRead responses
66 //----------------------------------------------------------------------------
67 class PgReadHandler : public XrdCl::ResponseHandler
68 {
69 friend class PgReadRetryHandler;
70
71 public:
72
73 //------------------------------------------------------------------------
74 // Constructor
75 //------------------------------------------------------------------------
76 PgReadHandler( std::shared_ptr<XrdCl::FileStateHandler> &stateHandler,
77 XrdCl::ResponseHandler *userHandler,
78 uint64_t orgOffset ) :
79 stateHandler( stateHandler ),
80 userHandler( userHandler ),
81 orgOffset( orgOffset ),
82 maincall( true ),
83 retrycnt( 0 ),
84 nbrepair( 0 )
85 {
86 }
87
88 //------------------------------------------------------------------------
89 // Handle the response
90 //------------------------------------------------------------------------
91 void HandleResponseWithHosts( XrdCl::XRootDStatus *status,
92 XrdCl::AnyObject *response,
93 XrdCl::HostList *hostList )
94 {
95 using namespace XrdCl;
96
97 std::unique_lock<std::mutex> lck( mtx );
98
99 if( !maincall )
100 {
101 //--------------------------------------------------------------------
102 // We are serving PgRead retry request
103 //--------------------------------------------------------------------
104 --retrycnt;
105 if( !status->IsOK() )
106 st.reset( status );
107 else
108 {
109 delete status; // by convention other args are null (see PgReadRetryHandler)
110 ++nbrepair; // update number of repaired pages
111 }
112
113 if( retrycnt == 0 )
114 {
115 //------------------------------------------------------------------
116 // All retries came back
117 //------------------------------------------------------------------
118 if( st->IsOK() )
119 {
120 PageInfo &pginf = XrdCl::To<PageInfo>( *resp );
121 pginf.SetNbRepair( nbrepair );
122 userHandler->HandleResponseWithHosts( st.release(), resp.release(), hosts.release() );
123 }
124 else
125 userHandler->HandleResponseWithHosts( st.release(), 0, 0 );
126 lck.unlock();
127 delete this;
128 }
129
130 return;
131 }
132
133 //----------------------------------------------------------------------
134 // We are serving main PgRead request
135 //----------------------------------------------------------------------
136 if( !status->IsOK() )
137 {
138 //--------------------------------------------------------------------
139 // The main PgRead request has failed
140 //--------------------------------------------------------------------
141 userHandler->HandleResponseWithHosts( status, response, hostList );
142 lck.unlock();
143 delete this;
144 return;
145 }
146
147 maincall = false;
148
149 //----------------------------------------------------------------------
150 // Do the integrity check
151 //----------------------------------------------------------------------
152 PageInfo *pginf = 0;
153 response->Get( pginf );
154
155 uint64_t pgoff = pginf->GetOffset();
156 uint32_t bytesRead = pginf->GetLength();
157 std::vector<uint32_t> &cksums = pginf->GetCksums();
158 char *buffer = reinterpret_cast<char*>( pginf->GetBuffer() );
159 size_t nbpages = XrdOucPgrwUtils::csNum( pgoff, bytesRead );
160 uint32_t pgsize = XrdSys::PageSize - pgoff % XrdSys::PageSize;
161 if( pgsize > bytesRead ) pgsize = bytesRead;
162
163 for( size_t pgnb = 0; pgnb < nbpages; ++pgnb )
164 {
165 uint32_t crcval = XrdOucCRC::Calc32C( buffer, pgsize );
166 if( crcval != cksums[pgnb] )
167 {
168 Log *log = DefaultEnv::GetLog();
169 log->Info( FileMsg, "[%p@%s] Received corrupted page, will retry page #%zu.",
170 (void*)this, stateHandler->pFileUrl->GetObfuscatedURL().c_str(), pgnb );
171
172 XRootDStatus st = XrdCl::FileStateHandler::PgReadRetry( stateHandler, pgoff, pgsize, pgnb, buffer, this, 0 );
173 if( !st.IsOK())
174 {
175 *status = st; // the reason for this failure
176 break;
177 }
178 ++retrycnt; // update the retry counter
179 }
180
181 bytesRead -= pgsize;
182 buffer += pgsize;
183 pgoff += pgsize;
184 pgsize = XrdSys::PageSize;
185 if( pgsize > bytesRead ) pgsize = bytesRead;
186 }
187
188
189 if( retrycnt == 0 )
190 {
191 //--------------------------------------------------------------------
192 // All went well!
193 //--------------------------------------------------------------------
194 userHandler->HandleResponseWithHosts( status, response, hostList );
195 lck.unlock();
196 delete this;
197 return;
198 }
199
200 //----------------------------------------------------------------------
201 // We have to wait for retries!
202 //----------------------------------------------------------------------
203 resp.reset( response );
204 hosts.reset( hostList );
205 st.reset( status );
206 }
207
208 void UpdateCksum( size_t pgnb, uint32_t crcval )
209 {
210 if( resp )
211 {
212 XrdCl::PageInfo *pginf = 0;
213 resp->Get( pginf );
214 pginf->GetCksums()[pgnb] = crcval;
215 }
216 }
217
218 private:
219
220 std::shared_ptr<XrdCl::FileStateHandler> stateHandler;
221 XrdCl::ResponseHandler *userHandler;
222 uint64_t orgOffset;
223
224 std::unique_ptr<XrdCl::AnyObject> resp;
225 std::unique_ptr<XrdCl::HostList> hosts;
226 std::unique_ptr<XrdCl::XRootDStatus> st;
227
228 std::mutex mtx;
229 bool maincall;
230 size_t retrycnt;
231 size_t nbrepair;
232
233 };
234
235 //----------------------------------------------------------------------------
236 // Helper callback for handling PgRead retries
237 //----------------------------------------------------------------------------
238 class PgReadRetryHandler : public XrdCl::ResponseHandler
239 {
240 public:
241
242 PgReadRetryHandler( PgReadHandler *pgReadHandler, size_t pgnb ) : pgReadHandler( pgReadHandler ),
243 pgnb( pgnb )
244 {
245
246 }
247
248 //------------------------------------------------------------------------
249 // Handle the response
250 //------------------------------------------------------------------------
251 void HandleResponseWithHosts( XrdCl::XRootDStatus *status,
252 XrdCl::AnyObject *response,
253 XrdCl::HostList *hostList )
254 {
255 using namespace XrdCl;
256
257 if( !status->IsOK() )
258 {
259 Log *log = DefaultEnv::GetLog();
260 log->Info( FileMsg, "[%p@%s] Failed to recover page #%zu.",
261 (void*)this, pgReadHandler->stateHandler->pFileUrl->GetObfuscatedURL().c_str(), pgnb );
262 pgReadHandler->HandleResponseWithHosts( status, response, hostList );
263 delete this;
264 return;
265 }
266
267 XrdCl::PageInfo *pginf = 0;
268 response->Get( pginf );
269 if( pginf->GetLength() > (uint32_t)XrdSys::PageSize || pginf->GetCksums().size() != 1 )
270 {
271 Log *log = DefaultEnv::GetLog();
272 log->Info( FileMsg, "[%p@%s] Failed to recover page #%zu.",
273 (void*)this, pgReadHandler->stateHandler->pFileUrl->GetObfuscatedURL().c_str(), pgnb );
274 // we retry a page at a time so the length cannot exceed 4KB
275 DeleteArgs( status, response, hostList );
276 pgReadHandler->HandleResponseWithHosts( new XRootDStatus( stError, errDataError ), 0, 0 );
277 delete this;
278 return;
279 }
280
281 uint32_t crcval = XrdOucCRC::Calc32C( pginf->GetBuffer(), pginf->GetLength() );
282 if( crcval != pginf->GetCksums().front() )
283 {
284 Log *log = DefaultEnv::GetLog();
285 log->Info( FileMsg, "[%p@%s] Failed to recover page #%zu.",
286 (void*)this, pgReadHandler->stateHandler->pFileUrl->GetObfuscatedURL().c_str(), pgnb );
287 DeleteArgs( status, response, hostList );
288 pgReadHandler->HandleResponseWithHosts( new XRootDStatus( stError, errDataError ), 0, 0 );
289 delete this;
290 return;
291 }
292
293 Log *log = DefaultEnv::GetLog();
294 log->Info( FileMsg, "[%p@%s] Successfully recovered page #%zu.",
295 (void*)this, pgReadHandler->stateHandler->pFileUrl->GetObfuscatedURL().c_str(), pgnb );
296
297 DeleteArgs( 0, response, hostList );
298 pgReadHandler->UpdateCksum( pgnb, crcval );
299 pgReadHandler->HandleResponseWithHosts( status, 0, 0 );
300 delete this;
301 }
302
303 private:
304
305 inline void DeleteArgs( XrdCl::XRootDStatus *status,
306 XrdCl::AnyObject *response,
307 XrdCl::HostList *hostList )
308 {
309 delete status;
310 delete response;
311 delete hostList;
312 }
313
314 PgReadHandler *pgReadHandler;
315 size_t pgnb;
316 };
317
318 //----------------------------------------------------------------------------
319 // Handle PgRead substitution with ordinary Read
320 //----------------------------------------------------------------------------
321 class PgReadSubstitutionHandler : public XrdCl::ResponseHandler
322 {
323 public:
324
325 //------------------------------------------------------------------------
326 // Constructor
327 //------------------------------------------------------------------------
328 PgReadSubstitutionHandler( std::shared_ptr<XrdCl::FileStateHandler> &stateHandler,
329 XrdCl::ResponseHandler *userHandler ) :
330 stateHandler( stateHandler ),
331 userHandler( userHandler )
332 {
333 }
334
335 //------------------------------------------------------------------------
336 // Handle the response
337 //------------------------------------------------------------------------
338 void HandleResponseWithHosts( XrdCl::XRootDStatus *status,
339 XrdCl::AnyObject *rdresp,
340 XrdCl::HostList *hostList )
341 {
342 if( !status->IsOK() )
343 {
344 userHandler->HandleResponseWithHosts( status, rdresp, hostList );
345 delete this;
346 return;
347 }
348
349 using namespace XrdCl;
350
351 ChunkInfo *chunk = 0;
352 rdresp->Get( chunk );
353
354 std::vector<uint32_t> cksums;
355 if( stateHandler->pIsChannelEncrypted )
356 {
357 size_t nbpages = chunk->length / XrdSys::PageSize;
358 if( chunk->length % XrdSys::PageSize )
359 ++nbpages;
360 cksums.reserve( nbpages );
361
362 size_t size = chunk->length;
363 char *buffer = reinterpret_cast<char*>( chunk->buffer );
364
365 for( size_t pg = 0; pg < nbpages; ++pg )
366 {
367 size_t pgsize = XrdSys::PageSize;
368 if( pgsize > size ) pgsize = size;
369 uint32_t crcval = XrdOucCRC::Calc32C( buffer, pgsize );
370 cksums.push_back( crcval );
371 buffer += pgsize;
372 size -= pgsize;
373 }
374 }
375
376 PageInfo *pages = new PageInfo( chunk->offset, chunk->length,
377 chunk->buffer, std::move( cksums ) );
378 delete rdresp;
379 AnyObject *response = new AnyObject();
380 response->Set( pages );
381 userHandler->HandleResponseWithHosts( status, response, hostList );
382
383 delete this;
384 }
385
386 private:
387
388 std::shared_ptr<XrdCl::FileStateHandler> stateHandler;
389 XrdCl::ResponseHandler *userHandler;
390 };
391
392 //----------------------------------------------------------------------------
393 // Object that does things to the FileStateHandler when kXR_open returns
394 // and then calls the user handler
395 //----------------------------------------------------------------------------
396 class OpenHandler: public XrdCl::ResponseHandler
397 {
398 public:
399 //------------------------------------------------------------------------
400 // Constructor
401 //------------------------------------------------------------------------
402 OpenHandler( std::shared_ptr<XrdCl::FileStateHandler> &stateHandler,
403 XrdCl::ResponseHandler *userHandler ):
404 pStateHandler( stateHandler ),
405 pUserHandler( userHandler )
406 {
407 }
408
409 //------------------------------------------------------------------------
410 // Handle the response
411 //------------------------------------------------------------------------
412 virtual void HandleResponseWithHosts( XrdCl::XRootDStatus *status,
413 XrdCl::AnyObject *response,
414 XrdCl::HostList *hostList )
415 {
416 using namespace XrdCl;
417
418 //----------------------------------------------------------------------
419 // Extract the statistics info
420 //----------------------------------------------------------------------
421 OpenInfo *openInfo = 0;
422 if( status->IsOK() )
423 response->Get( openInfo );
424#ifdef WITH_XRDEC
425 else
426 //--------------------------------------------------------------------
427 // Handle EC redirect
428 //--------------------------------------------------------------------
429 if( status->code == errRedirect )
430 {
431 std::string ecurl = status->GetErrorMessage();
432 EcHandler *ecHandler = GetEcHandler( hostList->front().url, ecurl );
433 if( ecHandler )
434 {
435 pStateHandler->pPlugin = ecHandler; // set the plugin for the File object
436 ecHandler->Open( pStateHandler->pOpenFlags, pUserHandler, 0/*TODO figure out right value for the timeout*/ );
437 return;
438 }
439 }
440#endif
441 //----------------------------------------------------------------------
442 // Notify the state handler and the client and say bye bye
443 //----------------------------------------------------------------------
444 pStateHandler->OnOpen( status, openInfo, hostList );
445 delete response;
446 if( pUserHandler )
447 pUserHandler->HandleResponseWithHosts( status, 0, hostList );
448 else
449 {
450 delete status;
451 delete hostList;
452 }
453 delete this;
454 }
455
456 private:
457 std::shared_ptr<XrdCl::FileStateHandler> pStateHandler;
458 XrdCl::ResponseHandler *pUserHandler;
459 };
460
461 //----------------------------------------------------------------------------
462 // Object that does things to the FileStateHandler when kXR_close returns
463 // and then calls the user handler
464 //----------------------------------------------------------------------------
465 class CloseHandler: public XrdCl::ResponseHandler
466 {
467 public:
468 //------------------------------------------------------------------------
469 // Constructor
470 //------------------------------------------------------------------------
471 CloseHandler( std::shared_ptr<XrdCl::FileStateHandler> &stateHandler,
472 XrdCl::ResponseHandler *userHandler,
473 XrdCl::Message *message ):
474 pStateHandler( stateHandler ),
475 pUserHandler( userHandler ),
476 pMessage( message )
477 {
478 }
479
480 //------------------------------------------------------------------------
482 //------------------------------------------------------------------------
483 virtual ~CloseHandler()
484 {
485 delete pMessage;
486 }
487
488 //------------------------------------------------------------------------
489 // Handle the response
490 //------------------------------------------------------------------------
491 virtual void HandleResponseWithHosts( XrdCl::XRootDStatus *status,
492 XrdCl::AnyObject *response,
493 XrdCl::HostList *hostList )
494 {
495 pStateHandler->OnClose( status );
496 if( pUserHandler )
497 pUserHandler->HandleResponseWithHosts( status, response, hostList );
498 else
499 {
500 delete response;
501 delete status;
502 delete hostList;
503 }
504
505 delete this;
506 }
507
508 private:
509 std::shared_ptr<XrdCl::FileStateHandler> pStateHandler;
510 XrdCl::ResponseHandler *pUserHandler;
511 XrdCl::Message *pMessage;
512 };
513
514 //----------------------------------------------------------------------------
515 // Stateful message handler
516 //----------------------------------------------------------------------------
517 class StatefulHandler: public XrdCl::ResponseHandler
518 {
519 public:
520 //------------------------------------------------------------------------
521 // Constructor
522 //------------------------------------------------------------------------
523 StatefulHandler( std::shared_ptr<XrdCl::FileStateHandler> &stateHandler,
524 XrdCl::ResponseHandler *userHandler,
525 XrdCl::Message *message,
526 const XrdCl::MessageSendParams &sendParams ):
527 pStateHandler( stateHandler ),
528 pUserHandler( userHandler ),
529 pMessage( message ),
530 pSendParams( sendParams )
531 {
532 }
533
534 //------------------------------------------------------------------------
535 // Destructor
536 //------------------------------------------------------------------------
537 virtual ~StatefulHandler()
538 {
539 delete pMessage;
540 delete pSendParams.chunkList;
541 delete pSendParams.kbuff;
542 }
543
544 //------------------------------------------------------------------------
545 // Handle the response
546 //------------------------------------------------------------------------
547 virtual void HandleResponseWithHosts( XrdCl::XRootDStatus *status,
548 XrdCl::AnyObject *response,
549 XrdCl::HostList *hostList )
550 {
551 using namespace XrdCl;
552 std::unique_ptr<AnyObject> responsePtr( response );
553 pSendParams.hostList = hostList;
554
555 //----------------------------------------------------------------------
556 // Houston we have a problem...
557 //----------------------------------------------------------------------
558 if( !status->IsOK() )
559 {
560 XrdCl::FileStateHandler::OnStateError( pStateHandler, status, pMessage, this, pSendParams );
561 return;
562 }
563
564 //----------------------------------------------------------------------
565 // We're clear
566 //----------------------------------------------------------------------
567 responsePtr.release();
568 XrdCl::FileStateHandler::OnStateResponse( pStateHandler, status, pMessage, response, hostList );
569 if( pUserHandler )
570 pUserHandler->HandleResponseWithHosts( status, response, hostList );
571 else
572 {
573 delete status,
574 delete response;
575 delete hostList;
576 }
577 delete this;
578 }
579
580 //------------------------------------------------------------------------
582 //------------------------------------------------------------------------
583 XrdCl::ResponseHandler *GetUserHandler()
584 {
585 return pUserHandler;
586 }
587
588 private:
589 std::shared_ptr<XrdCl::FileStateHandler> pStateHandler;
590 XrdCl::ResponseHandler *pUserHandler;
591 XrdCl::Message *pMessage;
592 XrdCl::MessageSendParams pSendParams;
593 };
594
595 //----------------------------------------------------------------------------
596 // Release-buffer Handler
597 //----------------------------------------------------------------------------
598 class ReleaseBufferHandler: public XrdCl::ResponseHandler
599 {
600 public:
601
602 //------------------------------------------------------------------------
603 // Constructor
604 //------------------------------------------------------------------------
605 ReleaseBufferHandler( XrdCl::Buffer &&buffer, XrdCl::ResponseHandler *handler ) :
606 buffer( std::move( buffer ) ),
607 handler( handler )
608 {
609 }
610
611 //------------------------------------------------------------------------
612 // Handle the response
613 //------------------------------------------------------------------------
614 virtual void HandleResponseWithHosts( XrdCl::XRootDStatus *status,
615 XrdCl::AnyObject *response,
616 XrdCl::HostList *hostList )
617 {
618 if (handler)
619 handler->HandleResponseWithHosts( status, response, hostList );
620 }
621
622 //------------------------------------------------------------------------
623 // Get the underlying buffer
624 //------------------------------------------------------------------------
625 XrdCl::Buffer& GetBuffer()
626 {
627 return buffer;
628 }
629
630 private:
631 XrdCl::Buffer buffer;
632 XrdCl::ResponseHandler *handler;
633 };
634}
635
636namespace XrdCl
637{
638 //----------------------------------------------------------------------------
639 // Constructor
640 //----------------------------------------------------------------------------
642 pFileState( Closed ),
643 pStatInfo( 0 ),
644 pFileUrl( 0 ),
645 pDataServer( 0 ),
646 pLoadBalancer( 0 ),
647 pStateRedirect( 0 ),
648 pWrtRecoveryRedir( 0 ),
649 pFileHandle( 0 ),
650 pOpenMode( 0 ),
651 pOpenFlags( 0 ),
652 pSessionId( 0 ),
653 pDoRecoverRead( true ),
654 pDoRecoverWrite( true ),
655 pFollowRedirects( true ),
656 pUseVirtRedirector( true ),
657 pIsChannelEncrypted( false ),
658 pAllowBundledClose( false ),
659 pPlugin( plugin )
660 {
661 pFileHandle = new uint8_t[4];
662 ResetMonitoringVars();
665 pLFileHandler = new LocalFileHandler();
666 }
667
668 //------------------------------------------------------------------------
673 //------------------------------------------------------------------------
674 FileStateHandler::FileStateHandler( bool useVirtRedirector, FilePlugIn *& plugin ):
675 pFileState( Closed ),
676 pStatInfo( 0 ),
677 pFileUrl( 0 ),
678 pDataServer( 0 ),
679 pLoadBalancer( 0 ),
680 pStateRedirect( 0 ),
681 pWrtRecoveryRedir( 0 ),
682 pFileHandle( 0 ),
683 pOpenMode( 0 ),
684 pOpenFlags( 0 ),
685 pSessionId( 0 ),
686 pDoRecoverRead( true ),
687 pDoRecoverWrite( true ),
688 pFollowRedirects( true ),
689 pUseVirtRedirector( useVirtRedirector ),
690 pAllowBundledClose( false ),
691 pPlugin( plugin )
692 {
693 pFileHandle = new uint8_t[4];
694 ResetMonitoringVars();
697 pLFileHandler = new LocalFileHandler();
698 }
699
700 //----------------------------------------------------------------------------
701 // Destructor
702 //----------------------------------------------------------------------------
704 {
705 //--------------------------------------------------------------------------
706 // This, in principle, should never ever happen. Except for the case
707 // when we're interfaced with ROOT that may call this desctructor from
708 // its garbage collector, from its __cxa_finalize, ie. after the XrdCl lib
709 // has been finalized by the linker. So, if we don't have the log object
710 // at this point we just give up the hope.
711 //--------------------------------------------------------------------------
712 if( DefaultEnv::GetLog() && pSessionId && !pDataServer->IsLocalFile() ) // if the file object was bound to a physical connection
713 DefaultEnv::GetPostMaster()->DecFileInstCnt( *pDataServer );
714
717
720
721 if( pFileState != Closed && DefaultEnv::GetLog() )
722 {
723 XRootDStatus st;
724 MonitorClose( &st );
725 ResetMonitoringVars();
726 }
727
728 // check if the logger is still there, this is only for root, as root might
729 // have unload us already so in this case we don't want to do anything
730 if( DefaultEnv::GetLog() && pUseVirtRedirector && pFileUrl && pFileUrl->IsMetalink() )
731 {
733 registry.Release( *pFileUrl );
734 }
735
736 delete pStatInfo;
737 delete pFileUrl;
738 delete pDataServer;
739 delete pLoadBalancer;
740 delete [] pFileHandle;
741 delete pLFileHandler;
742 }
743
744 //----------------------------------------------------------------------------
745 // Open the file pointed to by the given URL
746 //----------------------------------------------------------------------------
747 XRootDStatus FileStateHandler::Open( std::shared_ptr<FileStateHandler> &self,
748 const std::string &url,
749 uint16_t flags,
750 uint16_t mode,
751 ResponseHandler *handler,
752 uint16_t timeout )
753 {
754 XrdSysMutexHelper scopedLock( self->pMutex );
755
756 //--------------------------------------------------------------------------
757 // Check if we can proceed
758 //--------------------------------------------------------------------------
759 if( self->pFileState == Error )
760 return self->pStatus;
761
762 if( self->pFileState == OpenInProgress )
764
765 if( self->pFileState == CloseInProgress || self->pFileState == Opened ||
766 self->pFileState == Recovering )
768
769 self->pFileState = OpenInProgress;
770
771 //--------------------------------------------------------------------------
772 // Check if the parameters are valid
773 //--------------------------------------------------------------------------
774 Log *log = DefaultEnv::GetLog();
775
776 if( self->pFileUrl )
777 {
778 if( self->pUseVirtRedirector && self->pFileUrl->IsMetalink() )
779 {
781 registry.Release( *self->pFileUrl );
782 }
783 delete self->pFileUrl;
784 self->pFileUrl = 0;
785 }
786
787 self->pFileUrl = new URL( url );
788
789 //--------------------------------------------------------------------------
790 // Add unique uuid to each open request so replays due to error/timeout
791 // recovery can be correctly handled.
792 //--------------------------------------------------------------------------
793 URL::ParamsMap cgi = self->pFileUrl->GetParams();
794 uuid_t uuid;
795 char requuid[37]= {0};
796 uuid_generate( uuid );
797 uuid_unparse( uuid, requuid );
798 cgi["xrdcl.requuid"] = requuid;
799 self->pFileUrl->SetParams( cgi );
800
801 if( !self->pFileUrl->IsValid() )
802 {
803 log->Error( FileMsg, "[%p@%s] Trying to open invalid url: %s",
804 (void*)self.get(), self->pFileUrl->GetPath().c_str(), url.c_str() );
805 self->pStatus = XRootDStatus( stError, errInvalidArgs );
806 self->pFileState = Closed;
807 return self->pStatus;
808 }
809
810 //--------------------------------------------------------------------------
811 // Check if the recovery procedures should be enabled
812 //--------------------------------------------------------------------------
813 const URL::ParamsMap &urlParams = self->pFileUrl->GetParams();
814 URL::ParamsMap::const_iterator it;
815 it = urlParams.find( "xrdcl.recover-reads" );
816 if( (it != urlParams.end() && it->second == "false") ||
817 !self->pDoRecoverRead )
818 {
819 self->pDoRecoverRead = false;
820 log->Debug( FileMsg, "[%p@%s] Read recovery procedures are disabled",
821 (void*)self.get(), self->pFileUrl->GetObfuscatedURL().c_str() );
822 }
823
824 it = urlParams.find( "xrdcl.recover-writes" );
825 if( (it != urlParams.end() && it->second == "false") ||
826 !self->pDoRecoverWrite )
827 {
828 self->pDoRecoverWrite = false;
829 log->Debug( FileMsg, "[%p@%s] Write recovery procedures are disabled",
830 (void*)self.get(), self->pFileUrl->GetObfuscatedURL().c_str() );
831 }
832
833 //--------------------------------------------------------------------------
834 // Open the file
835 //--------------------------------------------------------------------------
836 log->Debug( FileMsg, "[%p@%s] Sending an open command", (void*)self.get(),
837 self->pFileUrl->GetObfuscatedURL().c_str() );
838
839 self->pOpenMode = mode;
840 self->pOpenFlags = flags;
841 OpenHandler *openHandler = new OpenHandler( self, handler );
842
843 Message *msg;
845 std::string path = self->pFileUrl->GetPathWithFilteredParams();
846 MessageUtils::CreateRequest( msg, req, path.length() );
847
848 req->requestid = kXR_open;
849 req->mode = mode;
850 req->options = flags | kXR_async | kXR_retstat;
851 req->dlen = path.length();
852 msg->Append( path.c_str(), path.length(), 24 );
853
855 MessageSendParams params; params.timeout = timeout;
856 params.followRedirects = self->pFollowRedirects;
858
859 XRootDStatus st = self->IssueRequest( *self->pFileUrl, msg, openHandler, params );
860
861 if( !st.IsOK() )
862 {
863 delete openHandler;
864 self->pStatus = st;
865 self->pFileState = Closed;
866 return st;
867 }
868 return st;
869 }
870
871 //----------------------------------------------------------------------------
872 // Close the file object
873 //----------------------------------------------------------------------------
874 XRootDStatus FileStateHandler::Close( std::shared_ptr<FileStateHandler> &self,
875 ResponseHandler *handler,
876 uint16_t timeout )
877 {
878 XrdSysMutexHelper scopedLock( self->pMutex );
879
880 //--------------------------------------------------------------------------
881 // Check if we can proceed
882 //--------------------------------------------------------------------------
883 if( self->pFileState == Error )
884 return self->pStatus;
885
886 if( self->pFileState == CloseInProgress )
888
889 if( self->pFileState == Closed )
891
892 if( self->pFileState == OpenInProgress || self->pFileState == Recovering )
894
895 if( !self->pAllowBundledClose && !self->pInTheFly.empty() )
897
898 self->pFileState = CloseInProgress;
899
900 Log *log = DefaultEnv::GetLog();
901 log->Debug( FileMsg, "[%p@%s] Sending a close command for handle %#x to %s",
902 (void*)self.get(), self->pFileUrl->GetObfuscatedURL().c_str(),
903 *((uint32_t*)self->pFileHandle), self->pDataServer->GetHostId().c_str() );
904
905 //--------------------------------------------------------------------------
906 // Close the file
907 //--------------------------------------------------------------------------
908 Message *msg;
910 MessageUtils::CreateRequest( msg, req );
911
912 req->requestid = kXR_close;
913 memcpy( req->fhandle, self->pFileHandle, 4 );
914
916 msg->SetSessionId( self->pSessionId );
917 CloseHandler *closeHandler = new CloseHandler( self, handler, msg );
918 MessageSendParams params;
919 params.timeout = timeout;
920 params.followRedirects = false;
921 params.stateful = true;
923
924 XRootDStatus st = self->IssueRequest( *self->pDataServer, msg, closeHandler, params );
925
926 if( !st.IsOK() )
927 {
928 // an invalid-session error means the connection to the server has been
929 // closed, which in turn means that the server closed the file already
932 st.code == errPollerError || st.code == errSocketError )
933 {
934 self->pFileState = Closed;
935 ResponseJob *job = new ResponseJob( closeHandler, new XRootDStatus(),
936 nullptr, nullptr );
938 return XRootDStatus();
939 }
940
941 delete closeHandler;
942 self->pStatus = st;
943 self->pFileState = Error;
944 return st;
945 }
946 return st;
947 }
948
949 //----------------------------------------------------------------------------
950 // Stat the file
951 //----------------------------------------------------------------------------
952 XRootDStatus FileStateHandler::Stat( std::shared_ptr<FileStateHandler> &self,
953 bool force,
954 ResponseHandler *handler,
955 uint16_t timeout )
956 {
957 XrdSysMutexHelper scopedLock( self->pMutex );
958
959 if( self->pFileState == Error ) return self->pStatus;
960
961 if( self->pFileState != Opened && self->pFileState != Recovering )
963
964 //--------------------------------------------------------------------------
965 // Return the cached info
966 //--------------------------------------------------------------------------
967 if( !force )
968 {
969 AnyObject *obj = new AnyObject();
970 obj->Set( new StatInfo( *self->pStatInfo ) );
971 if (handler)
972 handler->HandleResponseWithHosts( new XRootDStatus(), obj, new HostList() );
973 return XRootDStatus();
974 }
975
976 Log *log = DefaultEnv::GetLog();
977 log->Debug( FileMsg, "[%p@%s] Sending a stat command for handle %#x to %s",
978 (void*)self.get(), self->pFileUrl->GetObfuscatedURL().c_str(),
979 *((uint32_t*)self->pFileHandle), self->pDataServer->GetHostId().c_str() );
980
981 //--------------------------------------------------------------------------
982 // Issue a new stat request
983 // stating a file handle doesn't work (fixed in 3.2.0) so we need to
984 // stat the pat
985 //--------------------------------------------------------------------------
986 Message *msg;
988 std::string path = self->pFileUrl->GetPath();
989 MessageUtils::CreateRequest( msg, req );
990
991 req->requestid = kXR_stat;
992 memcpy( req->fhandle, self->pFileHandle, 4 );
993
994 MessageSendParams params;
995 params.timeout = timeout;
996 params.followRedirects = false;
997 params.stateful = true;
999
1001 StatefulHandler *stHandler = new StatefulHandler( self, handler, msg, params );
1002
1003 return SendOrQueue( self, *self->pDataServer, msg, stHandler, params );
1004 }
1005
1006 //----------------------------------------------------------------------------
1007 // Read a data chunk at a given offset - sync
1008 //----------------------------------------------------------------------------
1009 XRootDStatus FileStateHandler::Read( std::shared_ptr<FileStateHandler> &self,
1010 uint64_t offset,
1011 uint32_t size,
1012 void *buffer,
1013 ResponseHandler *handler,
1014 uint16_t timeout )
1015 {
1016 XrdSysMutexHelper scopedLock( self->pMutex );
1017
1018 if( self->pFileState == Error ) return self->pStatus;
1019
1020 if( self->pFileState != Opened && self->pFileState != Recovering )
1022
1023 Log *log = DefaultEnv::GetLog();
1024 log->Debug( FileMsg, "[%p@%s] Sending a read command for handle %#x to %s",
1025 (void*)self.get(), self->pFileUrl->GetObfuscatedURL().c_str(),
1026 *((uint32_t*)self->pFileHandle), self->pDataServer->GetHostId().c_str() );
1027
1028 Message *msg;
1029 ClientReadRequest *req;
1030 MessageUtils::CreateRequest( msg, req );
1031
1032 req->requestid = kXR_read;
1033 req->offset = offset;
1034 req->rlen = size;
1035 memcpy( req->fhandle, self->pFileHandle, 4 );
1036
1037 ChunkList *list = new ChunkList();
1038 list->push_back( ChunkInfo( offset, size, buffer ) );
1039
1041 MessageSendParams params;
1042 params.timeout = timeout;
1043 params.followRedirects = false;
1044 params.stateful = true;
1045 params.chunkList = list;
1047 StatefulHandler *stHandler = new StatefulHandler( self, handler, msg, params );
1048
1049 return SendOrQueue( self, *self->pDataServer, msg, stHandler, params );
1050 }
1051
1052 //------------------------------------------------------------------------
1053 // Read data pages at a given offset
1054 //------------------------------------------------------------------------
1055 XRootDStatus FileStateHandler::PgRead( std::shared_ptr<FileStateHandler> &self,
1056 uint64_t offset,
1057 uint32_t size,
1058 void *buffer,
1059 ResponseHandler *handler,
1060 uint16_t timeout )
1061 {
1062 int issupported = true;
1063 AnyObject obj;
1065 int protver = 0;
1066 XRootDStatus st2 = Utils::GetProtocolVersion( *self->pDataServer, protver );
1067 if( st1.IsOK() && st2.IsOK() )
1068 {
1069 int *ptr = 0;
1070 obj.Get( ptr );
1071 issupported = ( ptr && (*ptr & kXR_suppgrw) ) && ( protver >= kXR_PROTPGRWVERSION );
1072 delete ptr;
1073 }
1074 else
1075 issupported = false;
1076
1077 if( !issupported )
1078 {
1079 DefaultEnv::GetLog()->Debug( FileMsg, "[%p@%s] PgRead not supported; substituting with Read.",
1080 (void*)self.get(), self->pFileUrl->GetObfuscatedURL().c_str() );
1081 ResponseHandler *substitHandler = new PgReadSubstitutionHandler( self, handler );
1082 auto st = Read( self, offset, size, buffer, substitHandler, timeout );
1083 if( !st.IsOK() ) delete substitHandler;
1084 return st;
1085 }
1086
1087 ResponseHandler* pgHandler = new PgReadHandler( self, handler, offset );
1088 auto st = PgReadImpl( self, offset, size, buffer, PgReadFlags::None, pgHandler, timeout );
1089 if( !st.IsOK() ) delete pgHandler;
1090 return st;
1091 }
1092
1093 XRootDStatus FileStateHandler::PgReadRetry( std::shared_ptr<FileStateHandler> &self,
1094 uint64_t offset,
1095 uint32_t size,
1096 size_t pgnb,
1097 void *buffer,
1098 PgReadHandler *handler,
1099 uint16_t timeout )
1100 {
1101 if( size > (uint32_t)XrdSys::PageSize )
1102 return XRootDStatus( stError, errInvalidArgs, EINVAL,
1103 "PgRead retry size exceeded 4KB." );
1104
1105 ResponseHandler *retryHandler = new PgReadRetryHandler( handler, pgnb );
1106 XRootDStatus st = PgReadImpl( self, offset, size, buffer, PgReadFlags::Retry, retryHandler, timeout );
1107 if( !st.IsOK() ) delete retryHandler;
1108 return st;
1109 }
1110
1111 XRootDStatus FileStateHandler::PgReadImpl( std::shared_ptr<FileStateHandler> &self,
1112 uint64_t offset,
1113 uint32_t size,
1114 void *buffer,
1115 uint16_t flags,
1116 ResponseHandler *handler,
1117 uint16_t timeout )
1118 {
1119 XrdSysMutexHelper scopedLock( self->pMutex );
1120
1121 if( self->pFileState == Error ) return self->pStatus;
1122
1123 if( self->pFileState != Opened && self->pFileState != Recovering )
1125
1126 Log *log = DefaultEnv::GetLog();
1127 log->Debug( FileMsg, "[%p@%s] Sending a pgread command for handle %#x to %s",
1128 (void*)self.get(), self->pFileUrl->GetObfuscatedURL().c_str(),
1129 *((uint32_t*)self->pFileHandle), self->pDataServer->GetHostId().c_str() );
1130
1131 Message *msg;
1133 MessageUtils::CreateRequest( msg, req, sizeof( ClientPgReadReqArgs ) );
1134
1135 req->requestid = kXR_pgread;
1136 req->offset = offset;
1137 req->rlen = size;
1138 memcpy( req->fhandle, self->pFileHandle, 4 );
1139
1140 //--------------------------------------------------------------------------
1141 // Now adjust the message size so it can hold PgRead arguments
1142 //--------------------------------------------------------------------------
1143 req->dlen = sizeof( ClientPgReadReqArgs );
1144 void *newBuf = msg->GetBuffer( sizeof( ClientPgReadRequest ) );
1145 memset( newBuf, 0, sizeof( ClientPgReadReqArgs ) );
1146 ClientPgReadReqArgs *args = reinterpret_cast<ClientPgReadReqArgs*>(
1147 msg->GetBuffer( sizeof( ClientPgReadRequest ) ) );
1148 args->reqflags = flags;
1149
1150 ChunkList *list = new ChunkList();
1151 list->push_back( ChunkInfo( offset, size, buffer ) );
1152
1154 MessageSendParams params;
1155 params.timeout = timeout;
1156 params.followRedirects = false;
1157 params.stateful = true;
1158 params.chunkList = list;
1160 StatefulHandler *stHandler = new StatefulHandler( self, handler, msg, params );
1161
1162 return SendOrQueue( self, *self->pDataServer, msg, stHandler, params );
1163 }
1164
1165 //----------------------------------------------------------------------------
1166 // Write a data chunk at a given offset - async
1167 //----------------------------------------------------------------------------
1168 XRootDStatus FileStateHandler::Write( std::shared_ptr<FileStateHandler> &self,
1169 uint64_t offset,
1170 uint32_t size,
1171 const void *buffer,
1172 ResponseHandler *handler,
1173 uint16_t timeout )
1174 {
1175 XrdSysMutexHelper scopedLock( self->pMutex );
1176
1177 if( self->pFileState == Error ) return self->pStatus;
1178
1179 if( self->pFileState != Opened && self->pFileState != Recovering )
1181
1182 Log *log = DefaultEnv::GetLog();
1183 log->Debug( FileMsg, "[%p@%s] Sending a write command for handle %#x to %s",
1184 (void*)self.get(), self->pFileUrl->GetObfuscatedURL().c_str(),
1185 *((uint32_t*)self->pFileHandle), self->pDataServer->GetHostId().c_str() );
1186
1187 Message *msg;
1188 ClientWriteRequest *req;
1189 MessageUtils::CreateRequest( msg, req );
1190
1191 req->requestid = kXR_write;
1192 req->offset = offset;
1193 req->dlen = size;
1194 memcpy( req->fhandle, self->pFileHandle, 4 );
1195
1196 ChunkList *list = new ChunkList();
1197 list->push_back( ChunkInfo( 0, size, (char*)buffer ) );
1198
1199 MessageSendParams params;
1200 params.timeout = timeout;
1201 params.followRedirects = false;
1202 params.stateful = true;
1203 params.chunkList = list;
1204
1206
1208 StatefulHandler *stHandler = new StatefulHandler( self, handler, msg, params );
1209
1210 return SendOrQueue( self, *self->pDataServer, msg, stHandler, params );
1211 }
1212
1213 //----------------------------------------------------------------------------
1214 // Write a data chunk at a given offset
1215 //----------------------------------------------------------------------------
1216 XRootDStatus FileStateHandler::Write( std::shared_ptr<FileStateHandler> &self,
1217 uint64_t offset,
1218 Buffer &&buffer,
1219 ResponseHandler *handler,
1220 uint16_t timeout )
1221 {
1222 //--------------------------------------------------------------------------
1223 // If the memory is not page (4KB) aligned we cannot use the kernel buffer
1224 // so fall back to normal write
1225 //--------------------------------------------------------------------------
1226 if( !XrdSys::KernelBuffer::IsPageAligned( buffer.GetBuffer() ) || self->pIsChannelEncrypted )
1227 {
1228 Log *log = DefaultEnv::GetLog();
1229 log->Info( FileMsg, "[%p@%s] Buffer for handle %#x is not page aligned (4KB), "
1230 "cannot convert it to kernel space buffer.", (void*)self.get(),
1231 self->pFileUrl->GetObfuscatedURL().c_str(), *((uint32_t*)self->pFileHandle) );
1232
1233 void *buff = buffer.GetBuffer();
1234 uint32_t size = buffer.GetSize();
1235 ReleaseBufferHandler *wrtHandler =
1236 new ReleaseBufferHandler( std::move( buffer ), handler );
1237 XRootDStatus st = self->Write( self, offset, size, buff, wrtHandler, timeout );
1238 if( !st.IsOK() )
1239 {
1240 buffer = std::move( wrtHandler->GetBuffer() );
1241 delete wrtHandler;
1242 }
1243 return st;
1244 }
1245
1246 //--------------------------------------------------------------------------
1247 // Transfer the data from user space to kernel space
1248 //--------------------------------------------------------------------------
1249 uint32_t length = buffer.GetSize();
1250 char *ubuff = buffer.Release();
1251
1252 std::unique_ptr<XrdSys::KernelBuffer> kbuff( new XrdSys::KernelBuffer() );
1253 ssize_t ret = XrdSys::Move( ubuff, *kbuff, length );
1254 if( ret < 0 )
1256
1257 //--------------------------------------------------------------------------
1258 // Now create a write request and enqueue it
1259 //--------------------------------------------------------------------------
1260 return WriteKernelBuffer( self, offset, ret, std::move( kbuff ), handler, timeout );
1261 }
1262
1263 //----------------------------------------------------------------------------
1264 // Write a data from a given file descriptor at a given offset - async
1265 //----------------------------------------------------------------------------
1266 XRootDStatus FileStateHandler::Write( std::shared_ptr<FileStateHandler> &self,
1267 uint64_t offset,
1268 uint32_t size,
1269 Optional<uint64_t> fdoff,
1270 int fd,
1271 ResponseHandler *handler,
1272 uint16_t timeout )
1273 {
1274 //--------------------------------------------------------------------------
1275 // Read the data from the file descriptor into a kernel buffer
1276 //--------------------------------------------------------------------------
1277 std::unique_ptr<XrdSys::KernelBuffer> kbuff( new XrdSys::KernelBuffer() );
1278 ssize_t ret = fdoff ? XrdSys::Read( fd, *kbuff, size, *fdoff ) :
1279 XrdSys::Read( fd, *kbuff, size );
1280 if( ret < 0 )
1282
1283 //--------------------------------------------------------------------------
1284 // Now create a write request and enqueue it
1285 //--------------------------------------------------------------------------
1286 return WriteKernelBuffer( self, offset, ret, std::move( kbuff ), handler, timeout );
1287 }
1288
1289 //----------------------------------------------------------------------------
1290 // Write number of pages at a given offset - async
1291 //----------------------------------------------------------------------------
1292 XRootDStatus FileStateHandler::PgWrite( std::shared_ptr<FileStateHandler> &self,
1293 uint64_t offset,
1294 uint32_t size,
1295 const void *buffer,
1296 std::vector<uint32_t> &cksums,
1297 ResponseHandler *handler,
1298 uint16_t timeout )
1299 {
1300 //--------------------------------------------------------------------------
1301 // Resolve timeout value
1302 //--------------------------------------------------------------------------
1303 if( timeout == 0 )
1304 {
1305 int val = DefaultRequestTimeout;
1306 XrdCl::DefaultEnv::GetEnv()->GetInt( "RequestTimeout", val );
1307 timeout = val;
1308 }
1309
1310 //--------------------------------------------------------------------------
1311 // Validate the digest vector size
1312 //--------------------------------------------------------------------------
1313 if( cksums.empty() )
1314 {
1315 const char *data = static_cast<const char*>( buffer );
1316 XrdOucPgrwUtils::csCalc( data, offset, size, cksums );
1317 }
1318 else
1319 {
1320 size_t crc32cCnt = XrdOucPgrwUtils::csNum( offset, size );
1321 if( crc32cCnt != cksums.size() )
1322 return XRootDStatus( stError, errInvalidArgs, 0, "Wrong number of crc32c digests." );
1323 }
1324
1325 //--------------------------------------------------------------------------
1326 // Create a context for PgWrite operation
1327 //--------------------------------------------------------------------------
1328 struct pgwrt_t
1329 {
1330 pgwrt_t( ResponseHandler *h ) : handler( h ), status( nullptr )
1331 {
1332 }
1333
1334 ~pgwrt_t()
1335 {
1336 if( handler )
1337 {
1338 // if all retries were successful no error status was set
1339 if( !status ) status = new XRootDStatus();
1340 handler->HandleResponse( status, nullptr );
1341 }
1342 }
1343
1344 static size_t GetPgNb( uint64_t pgoff, uint64_t offset, uint32_t fstpglen )
1345 {
1346 if( pgoff == offset ) return 0; // we need this if statement because we operate on unsigned integers
1347 return ( pgoff - ( offset + fstpglen ) ) / XrdSys::PageSize + 1;
1348 }
1349
1350 inline void SetStatus( XRootDStatus* s )
1351 {
1352 if( !status ) status = s;
1353 else delete s;
1354 }
1355
1356 ResponseHandler *handler;
1357 XRootDStatus *status;
1358 };
1359 auto pgwrt = std::make_shared<pgwrt_t>( handler );
1360
1361 int fLen, lLen;
1362 XrdOucPgrwUtils::csNum( offset, size, fLen, lLen );
1363 uint32_t fstpglen = fLen;
1364
1365 time_t start = ::time( nullptr );
1366 auto h = ResponseHandler::Wrap( [=]( XrdCl::XRootDStatus *s, XrdCl::AnyObject *r ) mutable
1367 {
1368 std::unique_ptr<AnyObject> scoped( r );
1369 // if the request failed simply pass the status to the
1370 // user handler
1371 if( !s->IsOK() )
1372 {
1373 pgwrt->SetStatus( s );
1374 return; // pgwrt destructor will call the handler
1375 }
1376 // also if the request was sucessful and there were no
1377 // corrupted pages pass the status to the user handler
1378 RetryInfo *inf = nullptr;
1379 r->Get( inf );
1380 if( !inf->NeedRetry() )
1381 {
1382 pgwrt->SetStatus( s );
1383 return; // pgwrt destructor will call the handler
1384 }
1385 delete s;
1386 // first adjust the timeout value
1387 uint16_t elapsed = ::time( nullptr ) - start;
1388 if( elapsed >= timeout )
1389 {
1390 pgwrt->SetStatus( new XRootDStatus( stError, errOperationExpired ) );
1391 return; // pgwrt destructor will call the handler
1392 }
1393 else timeout -= elapsed;
1394 // retransmit the corrupted pages
1395 for( size_t i = 0; i < inf->Size(); ++i )
1396 {
1397 auto tpl = inf->At( i );
1398 uint64_t pgoff = std::get<0>( tpl );
1399 uint32_t pglen = std::get<1>( tpl );
1400 const void *pgbuf = static_cast<const char*>( buffer ) + ( pgoff - offset );
1401 uint32_t pgdigest = cksums[pgwrt_t::GetPgNb( pgoff, offset, fstpglen )];
1402 auto h = ResponseHandler::Wrap( [=]( XrdCl::XRootDStatus *s, XrdCl::AnyObject *r ) mutable
1403 {
1404 std::unique_ptr<AnyObject> scoped( r );
1405 // if we failed simply set the status
1406 if( !s->IsOK() )
1407 {
1408 pgwrt->SetStatus( s );
1409 return; // the destructor will call the handler
1410 }
1411 delete s;
1412 // otherwise check if the data were not corrupted again
1413 RetryInfo *inf = nullptr;
1414 r->Get( inf );
1415 if( inf->NeedRetry() ) // so we failed in the end
1416 {
1417 DefaultEnv::GetLog()->Warning( FileMsg, "[%p@%s] Failed retransmitting corrupted "
1418 "page: pgoff=%llu, pglen=%u, pgdigest=%u", (void*)self.get(),
1419 self->pFileUrl->GetObfuscatedURL().c_str(), (unsigned long long) pgoff, pglen, pgdigest );
1420 pgwrt->SetStatus( new XRootDStatus( stError, errDataError, 0,
1421 "Failed to retransmit corrupted page" ) );
1422 }
1423 else
1424 DefaultEnv::GetLog()->Info( FileMsg, "[%p@%s] Succesfuly retransmitted corrupted "
1425 "page: pgoff=%llu, pglen=%u, pgdigest=%u", (void*)self.get(),
1426 self->pFileUrl->GetObfuscatedURL().c_str(), (unsigned long long) pgoff, pglen, pgdigest );
1427 } );
1428 auto st = PgWriteRetry( self, pgoff, pglen, pgbuf, pgdigest, h, timeout );
1429 if( !st.IsOK() ) pgwrt->SetStatus( new XRootDStatus( st ) );
1430 DefaultEnv::GetLog()->Info( FileMsg, "[%p@%s] Retransmitting corrupted page: "
1431 "pgoff=%llu, pglen=%u, pgdigest=%u", (void*)self.get(),
1432 self->pFileUrl->GetObfuscatedURL().c_str(), (unsigned long long) pgoff, pglen, pgdigest );
1433 }
1434 } );
1435
1436 auto st = PgWriteImpl( self, offset, size, buffer, cksums, 0, h, timeout );
1437 if( !st.IsOK() )
1438 {
1439 pgwrt->handler = nullptr;
1440 delete h;
1441 }
1442 return st;
1443 }
1444
1445 //------------------------------------------------------------------------
1446 // Write number of pages at a given offset - async
1447 //------------------------------------------------------------------------
1448 XRootDStatus FileStateHandler::PgWriteRetry( std::shared_ptr<FileStateHandler> &self,
1449 uint64_t offset,
1450 uint32_t size,
1451 const void *buffer,
1452 uint32_t digest,
1453 ResponseHandler *handler,
1454 uint16_t timeout )
1455 {
1456 std::vector<uint32_t> cksums{ digest };
1457 return PgWriteImpl( self, offset, size, buffer, cksums, PgReadFlags::Retry, handler, timeout );
1458 }
1459
1460 //------------------------------------------------------------------------
1461 // Write number of pages at a given offset - async
1462 //------------------------------------------------------------------------
1463 XRootDStatus FileStateHandler::PgWriteImpl( std::shared_ptr<FileStateHandler> &self,
1464 uint64_t offset,
1465 uint32_t size,
1466 const void *buffer,
1467 std::vector<uint32_t> &cksums,
1468 kXR_char flags,
1469 ResponseHandler *handler,
1470 uint16_t timeout )
1471 {
1472 XrdSysMutexHelper scopedLock( self->pMutex );
1473
1474 if( self->pFileState == Error ) return self->pStatus;
1475
1476 if( self->pFileState != Opened && self->pFileState != Recovering )
1478
1479 Log *log = DefaultEnv::GetLog();
1480 log->Debug( FileMsg, "[%p@%s] Sending a pgwrite command for handle %#x to %s",
1481 (void*)self.get(), self->pFileUrl->GetObfuscatedURL().c_str(),
1482 *((uint32_t*)self->pFileHandle), self->pDataServer->GetHostId().c_str() );
1483
1484 //--------------------------------------------------------------------------
1485 // Create the message
1486 //--------------------------------------------------------------------------
1487 Message *msg;
1489 MessageUtils::CreateRequest( msg, req );
1490
1491 req->requestid = kXR_pgwrite;
1492 req->offset = offset;
1493 req->dlen = size + cksums.size() * sizeof( uint32_t );
1494 req->reqflags = flags;
1495 memcpy( req->fhandle, self->pFileHandle, 4 );
1496
1497 ChunkList *list = new ChunkList();
1498 list->push_back( ChunkInfo( offset, size, (char*)buffer ) );
1499
1500 MessageSendParams params;
1501 params.timeout = timeout;
1502 params.followRedirects = false;
1503 params.stateful = true;
1504 params.chunkList = list;
1505 params.crc32cDigests.swap( cksums );
1506
1508
1510 StatefulHandler *stHandler = new StatefulHandler( self, handler, msg, params );
1511
1512 return SendOrQueue( self, *self->pDataServer, msg, stHandler, params );
1513 }
1514
1515 //----------------------------------------------------------------------------
1516 // Commit all pending disk writes - async
1517 //----------------------------------------------------------------------------
1518 XRootDStatus FileStateHandler::Sync( std::shared_ptr<FileStateHandler> &self,
1519 ResponseHandler *handler,
1520 uint16_t timeout )
1521 {
1522 XrdSysMutexHelper scopedLock( self->pMutex );
1523
1524 if( self->pFileState == Error ) return self->pStatus;
1525
1526 if( self->pFileState != Opened && self->pFileState != Recovering )
1528
1529 Log *log = DefaultEnv::GetLog();
1530 log->Debug( FileMsg, "[%p@%s] Sending a sync command for handle %#x to %s",
1531 (void*)self.get(), self->pFileUrl->GetObfuscatedURL().c_str(),
1532 *((uint32_t*)self->pFileHandle), self->pDataServer->GetHostId().c_str() );
1533
1534 Message *msg;
1535 ClientSyncRequest *req;
1536 MessageUtils::CreateRequest( msg, req );
1537
1538 req->requestid = kXR_sync;
1539 memcpy( req->fhandle, self->pFileHandle, 4 );
1540
1541 MessageSendParams params;
1542 params.timeout = timeout;
1543 params.followRedirects = false;
1544 params.stateful = true;
1546
1548 StatefulHandler *stHandler = new StatefulHandler( self, handler, msg, params );
1549
1550 return SendOrQueue( self, *self->pDataServer, msg, stHandler, params );
1551 }
1552
1553 //----------------------------------------------------------------------------
1554 // Truncate the file to a particular size - async
1555 //----------------------------------------------------------------------------
1556 XRootDStatus FileStateHandler::Truncate( std::shared_ptr<FileStateHandler> &self,
1557 uint64_t size,
1558 ResponseHandler *handler,
1559 uint16_t timeout )
1560 {
1561 XrdSysMutexHelper scopedLock( self->pMutex );
1562
1563 if( self->pFileState == Error ) return self->pStatus;
1564
1565 if( self->pFileState != Opened && self->pFileState != Recovering )
1567
1568 Log *log = DefaultEnv::GetLog();
1569 log->Debug( FileMsg, "[%p@%s] Sending a truncate command for handle %#x to %s",
1570 (void*)self.get(), self->pFileUrl->GetObfuscatedURL().c_str(),
1571 *((uint32_t*)self->pFileHandle), self->pDataServer->GetHostId().c_str() );
1572
1573 Message *msg;
1575 MessageUtils::CreateRequest( msg, req );
1576
1577 req->requestid = kXR_truncate;
1578 memcpy( req->fhandle, self->pFileHandle, 4 );
1579 req->offset = size;
1580
1581 MessageSendParams params;
1582 params.timeout = timeout;
1583 params.followRedirects = false;
1584 params.stateful = true;
1586
1588 StatefulHandler *stHandler = new StatefulHandler( self, handler, msg, params );
1589
1590 return SendOrQueue( self, *self->pDataServer, msg, stHandler, params );
1591 }
1592
1593 //----------------------------------------------------------------------------
1594 // Read scattered data chunks in one operation - async
1595 //----------------------------------------------------------------------------
1596 XRootDStatus FileStateHandler::VectorRead( std::shared_ptr<FileStateHandler> &self,
1597 const ChunkList &chunks,
1598 void *buffer,
1599 ResponseHandler *handler,
1600 uint16_t timeout )
1601 {
1602 //--------------------------------------------------------------------------
1603 // Sanity check
1604 //--------------------------------------------------------------------------
1605 XrdSysMutexHelper scopedLock( self->pMutex );
1606
1607 if( self->pFileState == Error ) return self->pStatus;
1608
1609 if( self->pFileState != Opened && self->pFileState != Recovering )
1611
1612 Log *log = DefaultEnv::GetLog();
1613 log->Debug( FileMsg, "[%p@%s] Sending a vector read command for handle %#x to %s",
1614 (void*)self.get(), self->pFileUrl->GetObfuscatedURL().c_str(),
1615 *((uint32_t*)self->pFileHandle), self->pDataServer->GetHostId().c_str() );
1616
1617 //--------------------------------------------------------------------------
1618 // Build the message
1619 //--------------------------------------------------------------------------
1620 Message *msg;
1621 ClientReadVRequest *req;
1622 MessageUtils::CreateRequest( msg, req, sizeof(readahead_list)*chunks.size() );
1623
1624 req->requestid = kXR_readv;
1625 req->dlen = sizeof(readahead_list)*chunks.size();
1626
1627 ChunkList *list = new ChunkList();
1628 char *cursor = (char*)buffer;
1629
1630 //--------------------------------------------------------------------------
1631 // Copy the chunk info
1632 //--------------------------------------------------------------------------
1633 readahead_list *dataChunk = (readahead_list*)msg->GetBuffer( 24 );
1634 for( size_t i = 0; i < chunks.size(); ++i )
1635 {
1636 dataChunk[i].rlen = chunks[i].length;
1637 dataChunk[i].offset = chunks[i].offset;
1638 memcpy( dataChunk[i].fhandle, self->pFileHandle, 4 );
1639
1640 void *chunkBuffer;
1641 if( cursor )
1642 {
1643 chunkBuffer = cursor;
1644 cursor += chunks[i].length;
1645 }
1646 else
1647 chunkBuffer = chunks[i].buffer;
1648
1649 list->push_back( ChunkInfo( chunks[i].offset,
1650 chunks[i].length,
1651 chunkBuffer ) );
1652 }
1653
1654 //--------------------------------------------------------------------------
1655 // Send the message
1656 //--------------------------------------------------------------------------
1657 MessageSendParams params;
1658 params.timeout = timeout;
1659 params.followRedirects = false;
1660 params.stateful = true;
1661 params.chunkList = list;
1663
1665 StatefulHandler *stHandler = new StatefulHandler( self, handler, msg, params );
1666
1667 return SendOrQueue( self, *self->pDataServer, msg, stHandler, params );
1668 }
1669
1670 //------------------------------------------------------------------------
1671 // Write scattered data chunks in one operation - async
1672 //------------------------------------------------------------------------
1673 XRootDStatus FileStateHandler::VectorWrite( std::shared_ptr<FileStateHandler> &self,
1674 const ChunkList &chunks,
1675 ResponseHandler *handler,
1676 uint16_t timeout )
1677 {
1678 //--------------------------------------------------------------------------
1679 // Sanity check
1680 //--------------------------------------------------------------------------
1681 XrdSysMutexHelper scopedLock( self->pMutex );
1682
1683 if( self->pFileState == Error ) return self->pStatus;
1684
1685 if( self->pFileState != Opened && self->pFileState != Recovering )
1687
1688 Log *log = DefaultEnv::GetLog();
1689 log->Debug( FileMsg, "[%p@%s] Sending a vector write command for handle %#x to %s",
1690 (void*)self.get(), self->pFileUrl->GetObfuscatedURL().c_str(),
1691 *((uint32_t*)self->pFileHandle), self->pDataServer->GetHostId().c_str() );
1692
1693 //--------------------------------------------------------------------------
1694 // Determine the size of the payload
1695 //--------------------------------------------------------------------------
1696
1697 // the size of write vector
1698 uint32_t payloadSize = sizeof(XrdProto::write_list) * chunks.size();
1699
1700 //--------------------------------------------------------------------------
1701 // Build the message
1702 //--------------------------------------------------------------------------
1703 Message *msg;
1705 MessageUtils::CreateRequest( msg, req, payloadSize );
1706
1707 req->requestid = kXR_writev;
1708 req->dlen = sizeof(XrdProto::write_list) * chunks.size();
1709
1710 ChunkList *list = new ChunkList();
1711
1712 //--------------------------------------------------------------------------
1713 // Copy the chunk info
1714 //--------------------------------------------------------------------------
1715 XrdProto::write_list *writeList =
1716 reinterpret_cast<XrdProto::write_list*>( msg->GetBuffer( 24 ) );
1717
1718
1719
1720 for( size_t i = 0; i < chunks.size(); ++i )
1721 {
1722 writeList[i].wlen = chunks[i].length;
1723 writeList[i].offset = chunks[i].offset;
1724 memcpy( writeList[i].fhandle, self->pFileHandle, 4 );
1725
1726 list->push_back( ChunkInfo( chunks[i].offset,
1727 chunks[i].length,
1728 chunks[i].buffer ) );
1729 }
1730
1731 //--------------------------------------------------------------------------
1732 // Send the message
1733 //--------------------------------------------------------------------------
1734 MessageSendParams params;
1735 params.timeout = timeout;
1736 params.followRedirects = false;
1737 params.stateful = true;
1738 params.chunkList = list;
1740
1742 StatefulHandler *stHandler = new StatefulHandler( self, handler, msg, params );
1743
1744 return SendOrQueue( self, *self->pDataServer, msg, stHandler, params );
1745 }
1746
1747 //------------------------------------------------------------------------
1748 // Write scattered buffers in one operation - async
1749 //------------------------------------------------------------------------
1750 XRootDStatus FileStateHandler::WriteV( std::shared_ptr<FileStateHandler> &self,
1751 uint64_t offset,
1752 const struct iovec *iov,
1753 int iovcnt,
1754 ResponseHandler *handler,
1755 uint16_t timeout )
1756 {
1757 XrdSysMutexHelper scopedLock( self->pMutex );
1758
1759 if( self->pFileState == Error ) return self->pStatus;
1760
1761 if( self->pFileState != Opened && self->pFileState != Recovering )
1763
1764 Log *log = DefaultEnv::GetLog();
1765 log->Debug( FileMsg, "[%p@%s] Sending a write command for handle %#x to %s",
1766 (void*)self.get(), self->pFileUrl->GetObfuscatedURL().c_str(),
1767 *((uint32_t*)self->pFileHandle), self->pDataServer->GetHostId().c_str() );
1768
1769 Message *msg;
1770 ClientWriteRequest *req;
1771 MessageUtils::CreateRequest( msg, req );
1772
1773 ChunkList *list = new ChunkList();
1774
1775 uint32_t size = 0;
1776 for( int i = 0; i < iovcnt; ++i )
1777 {
1778 if( iov[i].iov_len == 0 ) continue;
1779 size += iov[i].iov_len;
1780 list->push_back( ChunkInfo( 0, iov[i].iov_len,
1781 (char*)iov[i].iov_base ) );
1782 }
1783
1784 req->requestid = kXR_write;
1785 req->offset = offset;
1786 req->dlen = size;
1787 memcpy( req->fhandle, self->pFileHandle, 4 );
1788
1789 MessageSendParams params;
1790 params.timeout = timeout;
1791 params.followRedirects = false;
1792 params.stateful = true;
1793 params.chunkList = list;
1794
1796
1798 StatefulHandler *stHandler = new StatefulHandler( self, handler, msg, params );
1799
1800 return SendOrQueue( self, *self->pDataServer, msg, stHandler, params );
1801 }
1802
1803 //------------------------------------------------------------------------
1804 // Read data into scattered buffers in one operation - async
1805 //------------------------------------------------------------------------
1806 XRootDStatus FileStateHandler::ReadV( std::shared_ptr<FileStateHandler> &self,
1807 uint64_t offset,
1808 struct iovec *iov,
1809 int iovcnt,
1810 ResponseHandler *handler,
1811 uint16_t timeout )
1812 {
1813 XrdSysMutexHelper scopedLock( self->pMutex );
1814
1815 if( self->pFileState == Error ) return self->pStatus;
1816
1817 if( self->pFileState != Opened && self->pFileState != Recovering )
1819
1820 Log *log = DefaultEnv::GetLog();
1821 log->Debug( FileMsg, "[%p@%s] Sending a read command for handle %#x to %s",
1822 (void*)self.get(), self->pFileUrl->GetObfuscatedURL().c_str(),
1823 *((uint32_t*)self->pFileHandle), self->pDataServer->GetHostId().c_str() );
1824
1825 Message *msg;
1826 ClientReadRequest *req;
1827 MessageUtils::CreateRequest( msg, req );
1828
1829 // calculate the total read size
1830 size_t size = std::accumulate( iov, iov + iovcnt, 0, []( size_t acc, iovec &rhs )
1831 {
1832 return acc + rhs.iov_len;
1833 } );
1834 req->requestid = kXR_read;
1835 req->offset = offset;
1836 req->rlen = size;
1838 memcpy( req->fhandle, self->pFileHandle, 4 );
1839
1840 ChunkList *list = new ChunkList();
1841 list->reserve( iovcnt );
1842 uint64_t choff = offset;
1843 for( int i = 0; i < iovcnt; ++i )
1844 {
1845 list->emplace_back( choff, iov[i].iov_len, iov[i].iov_base );
1846 choff += iov[i].iov_len;
1847 }
1848
1850 MessageSendParams params;
1851 params.timeout = timeout;
1852 params.followRedirects = false;
1853 params.stateful = true;
1854 params.chunkList = list;
1856 StatefulHandler *stHandler = new StatefulHandler( self, handler, msg, params );
1857
1858 return SendOrQueue( self, *self->pDataServer, msg, stHandler, params );
1859 }
1860
1861 //----------------------------------------------------------------------------
1862 // Performs a custom operation on an open file, server implementation
1863 // dependent - async
1864 //----------------------------------------------------------------------------
1865 XRootDStatus FileStateHandler::Fcntl( std::shared_ptr<FileStateHandler> &self,
1866 const Buffer &arg,
1867 ResponseHandler *handler,
1868 uint16_t timeout )
1869 {
1870 XrdSysMutexHelper scopedLock( self->pMutex );
1871
1872 if( self->pFileState == Error ) return self->pStatus;
1873
1874 if( self->pFileState != Opened && self->pFileState != Recovering )
1876
1877 Log *log = DefaultEnv::GetLog();
1878 log->Debug( FileMsg, "[%p@%s] Sending a fcntl command for handle %#x to %s",
1879 (void*)self.get(), self->pFileUrl->GetObfuscatedURL().c_str(),
1880 *((uint32_t*)self->pFileHandle), self->pDataServer->GetHostId().c_str() );
1881
1882 Message *msg;
1883 ClientQueryRequest *req;
1884 MessageUtils::CreateRequest( msg, req, arg.GetSize() );
1885
1886 req->requestid = kXR_query;
1887 req->infotype = kXR_Qopaqug;
1888 req->dlen = arg.GetSize();
1889 memcpy( req->fhandle, self->pFileHandle, 4 );
1890 msg->Append( arg.GetBuffer(), arg.GetSize(), 24 );
1891
1892 MessageSendParams params;
1893 params.timeout = timeout;
1894 params.followRedirects = false;
1895 params.stateful = true;
1897
1899 StatefulHandler *stHandler = new StatefulHandler( self, handler, msg, params );
1900
1901 return SendOrQueue( self, *self->pDataServer, msg, stHandler, params );
1902 }
1903
1904 //----------------------------------------------------------------------------
1905 // Get access token to a file - async
1906 //----------------------------------------------------------------------------
1907 XRootDStatus FileStateHandler::Visa( std::shared_ptr<FileStateHandler> &self,
1908 ResponseHandler *handler,
1909 uint16_t timeout )
1910 {
1911 XrdSysMutexHelper scopedLock( self->pMutex );
1912
1913 if( self->pFileState == Error ) return self->pStatus;
1914
1915 if( self->pFileState != Opened && self->pFileState != Recovering )
1917
1918 Log *log = DefaultEnv::GetLog();
1919 log->Debug( FileMsg, "[%p@%s] Sending a visa command for handle %#x to %s",
1920 (void*)self.get(), self->pFileUrl->GetObfuscatedURL().c_str(),
1921 *((uint32_t*)self->pFileHandle), self->pDataServer->GetHostId().c_str() );
1922
1923 Message *msg;
1924 ClientQueryRequest *req;
1925 MessageUtils::CreateRequest( msg, req );
1926
1927 req->requestid = kXR_query;
1928 req->infotype = kXR_Qvisa;
1929 memcpy( req->fhandle, self->pFileHandle, 4 );
1930
1931 MessageSendParams params;
1932 params.timeout = timeout;
1933 params.followRedirects = false;
1934 params.stateful = true;
1936
1938 StatefulHandler *stHandler = new StatefulHandler( self, handler, msg, params );
1939
1940 return SendOrQueue( self, *self->pDataServer, msg, stHandler, params );
1941 }
1942
1943 //------------------------------------------------------------------------
1944 // Set extended attributes - async
1945 //------------------------------------------------------------------------
1946 XRootDStatus FileStateHandler::SetXAttr( std::shared_ptr<FileStateHandler> &self,
1947 const std::vector<xattr_t> &attrs,
1948 ResponseHandler *handler,
1949 uint16_t timeout )
1950 {
1951 XrdSysMutexHelper scopedLock( self->pMutex );
1952
1953 if( self->pFileState == Error ) return self->pStatus;
1954
1955 if( self->pFileState != Opened && self->pFileState != Recovering )
1957
1958 Log *log = DefaultEnv::GetLog();
1959 log->Debug( FileMsg, "[%p@%s] Sending a fattr set command for handle %#x to %s",
1960 (void*)self.get(), self->pFileUrl->GetObfuscatedURL().c_str(),
1961 *((uint32_t*)self->pFileHandle), self->pDataServer->GetHostId().c_str() );
1962
1963 //--------------------------------------------------------------------------
1964 // Issue a new fattr get request
1965 //--------------------------------------------------------------------------
1966 return XAttrOperationImpl( self, kXR_fattrSet, 0, attrs, handler, timeout );
1967 }
1968
1969 //------------------------------------------------------------------------
1970 // Get extended attributes - async
1971 //------------------------------------------------------------------------
1972 XRootDStatus FileStateHandler::GetXAttr( std::shared_ptr<FileStateHandler> &self,
1973 const std::vector<std::string> &attrs,
1974 ResponseHandler *handler,
1975 uint16_t timeout )
1976 {
1977 XrdSysMutexHelper scopedLock( self->pMutex );
1978
1979 if( self->pFileState == Error ) return self->pStatus;
1980
1981 if( self->pFileState != Opened && self->pFileState != Recovering )
1983
1984 Log *log = DefaultEnv::GetLog();
1985 log->Debug( FileMsg, "[%p@%s] Sending a fattr get command for handle %#x to %s",
1986 (void*)self.get(), self->pFileUrl->GetObfuscatedURL().c_str(),
1987 *((uint32_t*)self->pFileHandle), self->pDataServer->GetHostId().c_str() );
1988
1989 //--------------------------------------------------------------------------
1990 // Issue a new fattr get request
1991 //--------------------------------------------------------------------------
1992 return XAttrOperationImpl( self, kXR_fattrGet, 0, attrs, handler, timeout );
1993 }
1994
1995 //------------------------------------------------------------------------
1996 // Delete extended attributes - async
1997 //------------------------------------------------------------------------
1998 XRootDStatus FileStateHandler::DelXAttr( std::shared_ptr<FileStateHandler> &self,
1999 const std::vector<std::string> &attrs,
2000 ResponseHandler *handler,
2001 uint16_t timeout )
2002 {
2003 XrdSysMutexHelper scopedLock( self->pMutex );
2004
2005 if( self->pFileState == Error ) return self->pStatus;
2006
2007 if( self->pFileState != Opened && self->pFileState != Recovering )
2009
2010 Log *log = DefaultEnv::GetLog();
2011 log->Debug( FileMsg, "[%p@%s] Sending a fattr del command for handle %#x to %s",
2012 (void*)self.get(), self->pFileUrl->GetObfuscatedURL().c_str(),
2013 *((uint32_t*)self->pFileHandle), self->pDataServer->GetHostId().c_str() );
2014
2015 //--------------------------------------------------------------------------
2016 // Issue a new fattr del request
2017 //--------------------------------------------------------------------------
2018 return XAttrOperationImpl( self, kXR_fattrDel, 0, attrs, handler, timeout );
2019 }
2020
2021 //------------------------------------------------------------------------
2022 // List extended attributes - async
2023 //------------------------------------------------------------------------
2024 XRootDStatus FileStateHandler::ListXAttr( std::shared_ptr<FileStateHandler> &self,
2025 ResponseHandler *handler,
2026 uint16_t timeout )
2027 {
2028 XrdSysMutexHelper scopedLock( self->pMutex );
2029
2030 if( self->pFileState == Error ) return self->pStatus;
2031
2032 if( self->pFileState != Opened && self->pFileState != Recovering )
2034
2035 Log *log = DefaultEnv::GetLog();
2036 log->Debug( FileMsg, "[%p@%s] Sending a fattr list command for handle %#x to %s",
2037 (void*)self.get(), self->pFileUrl->GetObfuscatedURL().c_str(),
2038 *((uint32_t*)self->pFileHandle), self->pDataServer->GetHostId().c_str() );
2039
2040 //--------------------------------------------------------------------------
2041 // Issue a new fattr get request
2042 //--------------------------------------------------------------------------
2043 static const std::vector<std::string> nothing;
2044 return XAttrOperationImpl( self, kXR_fattrList, ClientFattrRequest::aData,
2045 nothing, handler, timeout );
2046 }
2047
2048 //------------------------------------------------------------------------
2058 //------------------------------------------------------------------------
2059 XRootDStatus FileStateHandler::Checkpoint( std::shared_ptr<FileStateHandler> &self,
2060 kXR_char code,
2061 ResponseHandler *handler,
2062 uint16_t timeout )
2063 {
2064 XrdSysMutexHelper scopedLock( self->pMutex );
2065
2066 if( self->pFileState == Error ) return self->pStatus;
2067
2068 if( self->pFileState != Opened && self->pFileState != Recovering )
2070
2071 Log *log = DefaultEnv::GetLog();
2072 log->Debug( FileMsg, "[%p@%s] Sending a checkpoint command for handle %#x to %s",
2073 (void*)self.get(), self->pFileUrl->GetObfuscatedURL().c_str(),
2074 *((uint32_t*)self->pFileHandle), self->pDataServer->GetHostId().c_str() );
2075
2076 Message *msg;
2078 MessageUtils::CreateRequest( msg, req );
2079
2080 req->requestid = kXR_chkpoint;
2081 req->opcode = code;
2082 memcpy( req->fhandle, self->pFileHandle, 4 );
2083
2084 MessageSendParams params;
2085 params.timeout = timeout;
2086 params.followRedirects = false;
2087 params.stateful = true;
2088
2090
2092 StatefulHandler *stHandler = new StatefulHandler( self, handler, msg, params );
2093
2094 return SendOrQueue( self, *self->pDataServer, msg, stHandler, params );
2095 }
2096
2097 //------------------------------------------------------------------------
2107 //------------------------------------------------------------------------
2108 XRootDStatus FileStateHandler::ChkptWrt( std::shared_ptr<FileStateHandler> &self,
2109 uint64_t offset,
2110 uint32_t size,
2111 const void *buffer,
2112 ResponseHandler *handler,
2113 uint16_t timeout )
2114 {
2115 XrdSysMutexHelper scopedLock( self->pMutex );
2116
2117 if( self->pFileState == Error ) return self->pStatus;
2118
2119 if( self->pFileState != Opened && self->pFileState != Recovering )
2121
2122 Log *log = DefaultEnv::GetLog();
2123 log->Debug( FileMsg, "[%p@%s] Sending a write command for handle %#x to %s",
2124 (void*)self.get(), self->pFileUrl->GetObfuscatedURL().c_str(),
2125 *((uint32_t*)self->pFileHandle), self->pDataServer->GetHostId().c_str() );
2126
2127 Message *msg;
2129 MessageUtils::CreateRequest( msg, req, sizeof( ClientWriteRequest ) );
2130
2131 req->requestid = kXR_chkpoint;
2132 req->opcode = kXR_ckpXeq;
2133 req->dlen = 24; // as specified in the protocol specification
2134 memcpy( req->fhandle, self->pFileHandle, 4 );
2135
2137 wrtreq->requestid = kXR_write;
2138 wrtreq->offset = offset;
2139 wrtreq->dlen = size;
2140 memcpy( wrtreq->fhandle, self->pFileHandle, 4 );
2141
2142 ChunkList *list = new ChunkList();
2143 list->push_back( ChunkInfo( 0, size, (char*)buffer ) );
2144
2145 MessageSendParams params;
2146 params.timeout = timeout;
2147 params.followRedirects = false;
2148 params.stateful = true;
2149 params.chunkList = list;
2150
2152
2154 StatefulHandler *stHandler = new StatefulHandler( self, handler, msg, params );
2155
2156 return SendOrQueue( self, *self->pDataServer, msg, stHandler, params );
2157 }
2158
2159 //------------------------------------------------------------------------
2169 //------------------------------------------------------------------------
2170 XRootDStatus FileStateHandler::ChkptWrtV( std::shared_ptr<FileStateHandler> &self,
2171 uint64_t offset,
2172 const struct iovec *iov,
2173 int iovcnt,
2174 ResponseHandler *handler,
2175 uint16_t timeout )
2176 {
2177 XrdSysMutexHelper scopedLock( self->pMutex );
2178
2179 if( self->pFileState == Error ) return self->pStatus;
2180
2181 if( self->pFileState != Opened && self->pFileState != Recovering )
2183
2184 Log *log = DefaultEnv::GetLog();
2185 log->Debug( FileMsg, "[%p@%s] Sending a write command for handle %#x to %s",
2186 (void*)self.get(), self->pFileUrl->GetObfuscatedURL().c_str(),
2187 *((uint32_t*)self->pFileHandle), self->pDataServer->GetHostId().c_str() );
2188
2189 Message *msg;
2191 MessageUtils::CreateRequest( msg, req, sizeof( ClientWriteRequest ) );
2192
2193 req->requestid = kXR_chkpoint;
2194 req->opcode = kXR_ckpXeq;
2195 req->dlen = 24; // as specified in the protocol specification
2196 memcpy( req->fhandle, self->pFileHandle, 4 );
2197
2198 ChunkList *list = new ChunkList();
2199 uint32_t size = 0;
2200 for( int i = 0; i < iovcnt; ++i )
2201 {
2202 if( iov[i].iov_len == 0 ) continue;
2203 size += iov[i].iov_len;
2204 list->push_back( ChunkInfo( 0, iov[i].iov_len,
2205 (char*)iov[i].iov_base ) );
2206 }
2207
2209 wrtreq->requestid = kXR_write;
2210 wrtreq->offset = offset;
2211 wrtreq->dlen = size;
2212 memcpy( wrtreq->fhandle, self->pFileHandle, 4 );
2213
2214 MessageSendParams params;
2215 params.timeout = timeout;
2216 params.followRedirects = false;
2217 params.stateful = true;
2218 params.chunkList = list;
2219
2221
2223 StatefulHandler *stHandler = new StatefulHandler( self, handler, msg, params );
2224
2225 return SendOrQueue( self, *self->pDataServer, msg, stHandler, params );
2226 }
2227
2228 //----------------------------------------------------------------------------
2229 // Check if the file is open
2230 //----------------------------------------------------------------------------
2232 {
2233 XrdSysMutexHelper scopedLock( pMutex );
2234
2235 if( pFileState == Opened || pFileState == Recovering )
2236 return true;
2237 return false;
2238 }
2239
2240 //----------------------------------------------------------------------------
2241 // Set file property
2242 //----------------------------------------------------------------------------
2243 bool FileStateHandler::SetProperty( const std::string &name,
2244 const std::string &value )
2245 {
2246 XrdSysMutexHelper scopedLock( pMutex );
2247 if( name == "ReadRecovery" )
2248 {
2249 if( value == "true" ) pDoRecoverRead = true;
2250 else pDoRecoverRead = false;
2251 return true;
2252 }
2253 else if( name == "WriteRecovery" )
2254 {
2255 if( value == "true" ) pDoRecoverWrite = true;
2256 else pDoRecoverWrite = false;
2257 return true;
2258 }
2259 else if( name == "FollowRedirects" )
2260 {
2261 if( value == "true" ) pFollowRedirects = true;
2262 else pFollowRedirects = false;
2263 return true;
2264 }
2265 else if( name == "BundledClose" )
2266 {
2267 if( value == "true" ) pAllowBundledClose = true;
2268 else pAllowBundledClose = false;
2269 return true;
2270 }
2271 return false;
2272 }
2273
2274 //----------------------------------------------------------------------------
2275 // Get file property
2276 //----------------------------------------------------------------------------
2277 bool FileStateHandler::GetProperty( const std::string &name,
2278 std::string &value ) const
2279 {
2280 XrdSysMutexHelper scopedLock( pMutex );
2281 if( name == "ReadRecovery" )
2282 {
2283 if( pDoRecoverRead ) value = "true";
2284 else value = "false";
2285 return true;
2286 }
2287 else if( name == "WriteRecovery" )
2288 {
2289 if( pDoRecoverWrite ) value = "true";
2290 else value = "false";
2291 return true;
2292 }
2293 else if( name == "FollowRedirects" )
2294 {
2295 if( pFollowRedirects ) value = "true";
2296 else value = "false";
2297 return true;
2298 }
2299 else if( name == "DataServer" && pDataServer )
2300 { value = pDataServer->GetHostId(); return true; }
2301 else if( name == "LastURL" && pDataServer )
2302 { value = pDataServer->GetURL(); return true; }
2303 else if( name == "WrtRecoveryRedir" && pWrtRecoveryRedir )
2304 { value = pWrtRecoveryRedir->GetHostId(); return true; }
2305 value = "";
2306 return false;
2307 }
2308
2309 //----------------------------------------------------------------------------
2310 // Process the results of the opening operation
2311 //----------------------------------------------------------------------------
2313 const OpenInfo *openInfo,
2314 const HostList *hostList )
2315 {
2316 Log *log = DefaultEnv::GetLog();
2317 XrdSysMutexHelper scopedLock( pMutex );
2318
2319 //--------------------------------------------------------------------------
2320 // Assign the data server and the load balancer
2321 //--------------------------------------------------------------------------
2322 std::string lastServer = pFileUrl->GetHostId();
2323 if( hostList )
2324 {
2325 delete pDataServer;
2326 delete pLoadBalancer;
2327 pLoadBalancer = 0;
2328 delete pWrtRecoveryRedir;
2329 pWrtRecoveryRedir = 0;
2330
2331 pDataServer = new URL( hostList->back().url );
2332 pDataServer->SetParams( pFileUrl->GetParams() );
2333 if( !( pUseVirtRedirector && pFileUrl->IsMetalink() ) ) pDataServer->SetPath( pFileUrl->GetPath() );
2334 lastServer = pDataServer->GetHostId();
2335 HostList::const_iterator itC;
2336 URL::ParamsMap params = pDataServer->GetParams();
2337 for( itC = hostList->begin(); itC != hostList->end(); ++itC )
2338 {
2339 MessageUtils::MergeCGI( params,
2340 itC->url.GetParams(),
2341 true );
2342 }
2343 pDataServer->SetParams( params );
2344
2345 HostList::const_reverse_iterator it;
2346 for( it = hostList->rbegin(); it != hostList->rend(); ++it )
2347 if( it->loadBalancer )
2348 {
2349 pLoadBalancer = new URL( it->url );
2350 break;
2351 }
2352
2353 for( it = hostList->rbegin(); it != hostList->rend(); ++it )
2354 if( it->flags & kXR_recoverWrts )
2355 {
2356 pWrtRecoveryRedir = new URL( it->url );
2357 break;
2358 }
2359 }
2360
2361 log->Debug(FileMsg, "[%p@%s] Open has returned with status %s",
2362 (void*)this, pFileUrl->GetObfuscatedURL().c_str(), status->ToStr().c_str() );
2363
2364 if( pDataServer && !pDataServer->IsLocalFile() )
2365 {
2366 //------------------------------------------------------------------------
2367 // Check if we are using a secure connection
2368 //------------------------------------------------------------------------
2369 XrdCl::AnyObject isencobj;
2371 QueryTransport( *pDataServer, XRootDQuery::IsEncrypted, isencobj );
2372 if( st.IsOK() )
2373 {
2374 bool *isenc;
2375 isencobj.Get( isenc );
2376 pIsChannelEncrypted = isenc ? *isenc : false;
2377 delete isenc;
2378 }
2379 }
2380
2381 //--------------------------------------------------------------------------
2382 // We have failed
2383 //--------------------------------------------------------------------------
2384 pStatus = *status;
2385 if( !pStatus.IsOK() || !openInfo )
2386 {
2387 log->Debug(FileMsg, "[%p@%s] Error while opening at %s: %s",
2388 (void*)this, pFileUrl->GetObfuscatedURL().c_str(), lastServer.c_str(),
2389 pStatus.ToStr().c_str() );
2390 FailQueuedMessages( pStatus );
2391 pFileState = Error;
2392
2393 //------------------------------------------------------------------------
2394 // Report to monitoring
2395 //------------------------------------------------------------------------
2397 if( mon )
2398 {
2400 i.file = pFileUrl;
2401 i.status = status;
2403 mon->Event( Monitor::EvErrIO, &i );
2404 }
2405 }
2406 //--------------------------------------------------------------------------
2407 // We have succeeded
2408 //--------------------------------------------------------------------------
2409 else
2410 {
2411 //------------------------------------------------------------------------
2412 // Store the response info
2413 //------------------------------------------------------------------------
2414 openInfo->GetFileHandle( pFileHandle );
2415 pSessionId = openInfo->GetSessionId();
2416 if( openInfo->GetStatInfo() )
2417 {
2418 delete pStatInfo;
2419 pStatInfo = new StatInfo( *openInfo->GetStatInfo() );
2420 }
2421
2422 log->Debug( FileMsg, "[%p@%s] successfully opened at %s, handle: %#x, "
2423 "session id: %llu", (void*)this, pFileUrl->GetObfuscatedURL().c_str(),
2424 pDataServer->GetHostId().c_str(), *((uint32_t*)pFileHandle),
2425 (unsigned long long) pSessionId );
2426
2427 //------------------------------------------------------------------------
2428 // Inform the monitoring about opening success
2429 //------------------------------------------------------------------------
2430 gettimeofday( &pOpenTime, 0 );
2432 if( mon )
2433 {
2435 i.file = pFileUrl;
2436 i.dataServer = pDataServer->GetHostId();
2437 i.oFlags = pOpenFlags;
2438 i.fSize = pStatInfo ? pStatInfo->GetSize() : 0;
2439 mon->Event( Monitor::EvOpen, &i );
2440 }
2441
2442 //------------------------------------------------------------------------
2443 // Resend the queued messages if any
2444 //------------------------------------------------------------------------
2445 ReSendQueuedMessages();
2446 pFileState = Opened;
2447 }
2448 }
2449
2450 //----------------------------------------------------------------------------
2451 // Process the results of the closing operation
2452 //----------------------------------------------------------------------------
2454 {
2455 Log *log = DefaultEnv::GetLog();
2456 XrdSysMutexHelper scopedLock( pMutex );
2457
2458 log->Debug(FileMsg, "[%p@%s] Close returned from %s with: %s", (void*)this,
2459 pFileUrl->GetObfuscatedURL().c_str(), pDataServer->GetHostId().c_str(),
2460 status->ToStr().c_str() );
2461
2462 log->Dump(FileMsg, "[%p@%s] Items in the fly %zu, queued for recovery %zu",
2463 (void*)this, pFileUrl->GetObfuscatedURL().c_str(), pInTheFly.size(), pToBeRecovered.size() );
2464
2465 MonitorClose( status );
2466 ResetMonitoringVars();
2467
2468 pStatus = *status;
2469 pFileState = Closed;
2470 }
2471
2472 //----------------------------------------------------------------------------
2473 // Handle an error while sending a stateful message
2474 //----------------------------------------------------------------------------
2475 void FileStateHandler::OnStateError( std::shared_ptr<FileStateHandler> &self,
2476 XRootDStatus *status,
2477 Message *message,
2478 ResponseHandler *userHandler,
2479 MessageSendParams &sendParams )
2480 {
2481 //--------------------------------------------------------------------------
2482 // It may be a redirection
2483 //--------------------------------------------------------------------------
2484 if( !status->IsOK() && status->code == errRedirect && self->pFollowRedirects )
2485 {
2486 static const std::string root = "root", xroot = "xroot", file = "file",
2487 roots = "roots", xroots = "xroots";
2488 std::string msg = status->GetErrorMessage();
2489 if( !msg.compare( 0, root.size(), root ) ||
2490 !msg.compare( 0, xroot.size(), xroot ) ||
2491 !msg.compare( 0, file.size(), file ) ||
2492 !msg.compare( 0, roots.size(), roots ) ||
2493 !msg.compare( 0, xroots.size(), xroots ) )
2494 {
2495 FileStateHandler::OnStateRedirection( self, msg, message, userHandler, sendParams );
2496 return;
2497 }
2498 }
2499
2500 //--------------------------------------------------------------------------
2501 // Handle error
2502 //--------------------------------------------------------------------------
2503 Log *log = DefaultEnv::GetLog();
2504 XrdSysMutexHelper scopedLock( self->pMutex );
2505 self->pInTheFly.erase( message );
2506
2507 log->Dump( FileMsg, "[%p@%s] File state error encountered. Message %s "
2508 "returned with %s", (void*)self.get(), self->pFileUrl->GetObfuscatedURL().c_str(),
2509 message->GetObfuscatedDescription().c_str(), status->ToStr().c_str() );
2510
2511 //--------------------------------------------------------------------------
2512 // Report to monitoring
2513 //--------------------------------------------------------------------------
2515 if( mon )
2516 {
2518 i.file = self->pFileUrl;
2519 i.status = status;
2520
2521 ClientRequest *req = (ClientRequest*)message->GetBuffer();
2522 switch( req->header.requestid )
2523 {
2531 }
2532
2533 mon->Event( Monitor::EvErrIO, &i );
2534 }
2535
2536 //--------------------------------------------------------------------------
2537 // The message is not recoverable
2538 // (message using a kernel buffer is not recoverable by definition)
2539 //--------------------------------------------------------------------------
2540 if( !self->IsRecoverable( *status ) || sendParams.kbuff )
2541 {
2542 log->Error( FileMsg, "[%p@%s] Fatal file state error. Message %s "
2543 "returned with %s", (void*)self.get(), self->pFileUrl->GetObfuscatedURL().c_str(),
2544 message->GetObfuscatedDescription().c_str(), status->ToStr().c_str() );
2545
2546 self->FailMessage( RequestData( message, userHandler, sendParams ), *status );
2547 delete status;
2548 return;
2549 }
2550
2551 //--------------------------------------------------------------------------
2552 // Insert the message to the recovery queue and start the recovery
2553 // procedure if we don't have any more message in the fly
2554 //--------------------------------------------------------------------------
2555 self->pCloseReason = *status;
2556 RecoverMessage( self, RequestData( message, userHandler, sendParams ) );
2557 delete status;
2558 }
2559
2560 //----------------------------------------------------------------------------
2561 // Handle stateful redirect
2562 //----------------------------------------------------------------------------
2563 void FileStateHandler::OnStateRedirection( std::shared_ptr<FileStateHandler> &self,
2564 const std::string &redirectUrl,
2565 Message *message,
2566 ResponseHandler *userHandler,
2567 MessageSendParams &sendParams )
2568 {
2569 XrdSysMutexHelper scopedLock( self->pMutex );
2570 self->pInTheFly.erase( message );
2571
2572 //--------------------------------------------------------------------------
2573 // Register the state redirect url and append the new cgi information to
2574 // the file URL
2575 //--------------------------------------------------------------------------
2576 if( !self->pStateRedirect )
2577 {
2578 std::ostringstream o;
2579 self->pStateRedirect = new URL( redirectUrl );
2580 URL::ParamsMap params = self->pFileUrl->GetParams();
2581 MessageUtils::MergeCGI( params,
2582 self->pStateRedirect->GetParams(),
2583 false );
2584 self->pFileUrl->SetParams( params );
2585 }
2586
2587 RecoverMessage( self, RequestData( message, userHandler, sendParams ) );
2588 }
2589
2590 //----------------------------------------------------------------------------
2591 // Handle stateful response
2592 //----------------------------------------------------------------------------
2593 void FileStateHandler::OnStateResponse( std::shared_ptr<FileStateHandler> &self,
2594 XRootDStatus *status,
2595 Message *message,
2596 AnyObject *response,
2597 HostList */*urlList*/ )
2598 {
2599 Log *log = DefaultEnv::GetLog();
2600 XrdSysMutexHelper scopedLock( self->pMutex );
2601
2602 log->Dump( FileMsg, "[%p@%s] Got state response for message %s",
2603 (void*)self.get(), self->pFileUrl->GetObfuscatedURL().c_str(),
2604 message->GetObfuscatedDescription().c_str() );
2605
2606 //--------------------------------------------------------------------------
2607 // Since this message may be the last "in-the-fly" and no recovery
2608 // is done if messages are in the fly, we may need to trigger recovery
2609 //--------------------------------------------------------------------------
2610 self->pInTheFly.erase( message );
2611 RunRecovery( self );
2612
2613 //--------------------------------------------------------------------------
2614 // Play with the actual response before returning it. This is a good
2615 // place to do caching in the future.
2616 //--------------------------------------------------------------------------
2617 ClientRequest *req = (ClientRequest*)message->GetBuffer();
2618 switch( req->header.requestid )
2619 {
2620 //------------------------------------------------------------------------
2621 // Cache the stat response
2622 //------------------------------------------------------------------------
2623 case kXR_stat:
2624 {
2625 StatInfo *info = 0;
2626 response->Get( info );
2627 delete self->pStatInfo;
2628 self->pStatInfo = new StatInfo( *info );
2629 break;
2630 }
2631
2632 //------------------------------------------------------------------------
2633 // Handle read response
2634 //------------------------------------------------------------------------
2635 case kXR_read:
2636 {
2637 ++self->pRCount;
2638 self->pRBytes += req->read.rlen;
2639 break;
2640 }
2641
2642 //------------------------------------------------------------------------
2643 // Handle read response
2644 //------------------------------------------------------------------------
2645 case kXR_pgread:
2646 {
2647 ++self->pRCount;
2648 self->pRBytes += req->pgread.rlen;
2649 break;
2650 }
2651
2652 //------------------------------------------------------------------------
2653 // Handle readv response
2654 //------------------------------------------------------------------------
2655 case kXR_readv:
2656 {
2657 ++self->pVRCount;
2658 size_t segs = req->header.dlen/sizeof(readahead_list);
2659 readahead_list *dataChunk = (readahead_list*)message->GetBuffer( 24 );
2660 for( size_t i = 0; i < segs; ++i )
2661 self->pVRBytes += dataChunk[i].rlen;
2662 self->pVSegs += segs;
2663 break;
2664 }
2665
2666 //------------------------------------------------------------------------
2667 // Handle write response
2668 //------------------------------------------------------------------------
2669 case kXR_write:
2670 {
2671 ++self->pWCount;
2672 self->pWBytes += req->write.dlen;
2673 break;
2674 }
2675
2676 //------------------------------------------------------------------------
2677 // Handle write response
2678 //------------------------------------------------------------------------
2679 case kXR_pgwrite:
2680 {
2681 ++self->pWCount;
2682 self->pWBytes += req->pgwrite.dlen;
2683 break;
2684 }
2685
2686 //------------------------------------------------------------------------
2687 // Handle writev response
2688 //------------------------------------------------------------------------
2689 case kXR_writev:
2690 {
2691 ++self->pVWCount;
2692 size_t size = req->header.dlen/sizeof(readahead_list);
2693 XrdProto::write_list *wrtList =
2694 reinterpret_cast<XrdProto::write_list*>( message->GetBuffer( 24 ) );
2695 for( size_t i = 0; i < size; ++i )
2696 self->pVWBytes += wrtList[i].wlen;
2697 break;
2698 }
2699 };
2700 }
2701
2702 //------------------------------------------------------------------------
2704 //------------------------------------------------------------------------
2705 void FileStateHandler::Tick( time_t now )
2706 {
2707 if (pMutex.CondLock())
2708 {TimeOutRequests( now );
2709 pMutex.UnLock();
2710 }
2711 }
2712
2713 //----------------------------------------------------------------------------
2714 // Declare timeout on requests being recovered
2715 //----------------------------------------------------------------------------
2717 {
2718 if( !pToBeRecovered.empty() )
2719 {
2720 Log *log = DefaultEnv::GetLog();
2721 log->Dump( FileMsg, "[%p@%s] Got a timer event", (void*)this,
2722 pFileUrl->GetObfuscatedURL().c_str() );
2723 RequestList::iterator it;
2725 for( it = pToBeRecovered.begin(); it != pToBeRecovered.end(); )
2726 {
2727 if( it->params.expires <= now )
2728 {
2729 jobMan->QueueJob( new ResponseJob(
2730 it->handler,
2732 0, it->params.hostList ) );
2733 it = pToBeRecovered.erase( it );
2734 }
2735 else
2736 ++it;
2737 }
2738 }
2739 }
2740
2741 //----------------------------------------------------------------------------
2742 // Called in the child process after the fork
2743 //----------------------------------------------------------------------------
2745 {
2746 Log *log = DefaultEnv::GetLog();
2747
2748 if( pFileState == Closed || pFileState == Error )
2749 return;
2750
2751 if( (IsReadOnly() && pDoRecoverRead) ||
2752 (!IsReadOnly() && pDoRecoverWrite) )
2753 {
2754 log->Debug( FileMsg, "[%p@%s] Putting the file in recovery state in "
2755 "process %d", (void*)this, pFileUrl->GetObfuscatedURL().c_str(), getpid() );
2756 pFileState = Recovering;
2757 pInTheFly.clear();
2758 pToBeRecovered.clear();
2759 }
2760 else
2761 pFileState = Error;
2762 }
2763
2764 //------------------------------------------------------------------------
2765 // Try other data server
2766 //------------------------------------------------------------------------
2767 XRootDStatus FileStateHandler::TryOtherServer( std::shared_ptr<FileStateHandler> &self, uint16_t timeout )
2768 {
2769 XrdSysMutexHelper scopedLock( self->pMutex );
2770
2771 if( self->pFileState != Opened || !self->pLoadBalancer )
2773
2774 self->pFileState = Recovering;
2775
2776 Log *log = DefaultEnv::GetLog();
2777 log->Debug( FileMsg, "[%p@%s] Reopen file at next data server.",
2778 (void*)self.get(), self->pFileUrl->GetObfuscatedURL().c_str() );
2779
2780 // merge CGI
2781 auto lbcgi = self->pLoadBalancer->GetParams();
2782 auto dtcgi = self->pDataServer->GetParams();
2783 MessageUtils::MergeCGI( lbcgi, dtcgi, false );
2784 // update tried CGI
2785 auto itr = lbcgi.find( "tried" );
2786 if( itr == lbcgi.end() )
2787 lbcgi["tried"] = self->pDataServer->GetHostName();
2788 else
2789 {
2790 std::string tried = itr->second;
2791 tried += "," + self->pDataServer->GetHostName();
2792 lbcgi["tried"] = tried;
2793 }
2794 self->pLoadBalancer->SetParams( lbcgi );
2795
2796 return ReOpenFileAtServer( self, *self->pLoadBalancer, timeout );
2797 }
2798
2799 //------------------------------------------------------------------------
2800 // Generic implementation of xattr operation
2801 //------------------------------------------------------------------------
2802 template<typename T>
2803 Status FileStateHandler::XAttrOperationImpl( std::shared_ptr<FileStateHandler> &self,
2804 kXR_char subcode,
2805 kXR_char options,
2806 const std::vector<T> &attrs,
2807 ResponseHandler *handler,
2808 uint16_t timeout )
2809 {
2810 //--------------------------------------------------------------------------
2811 // Issue a new fattr request
2812 //--------------------------------------------------------------------------
2813 Message *msg;
2814 ClientFattrRequest *req;
2815 MessageUtils::CreateRequest( msg, req );
2816
2817 req->requestid = kXR_fattr;
2818 req->subcode = subcode;
2819 req->numattr = attrs.size();
2820 req->options = options;
2821 memcpy( req->fhandle, self->pFileHandle, 4 );
2823 if( !st.IsOK() ) return st;
2824
2825 MessageSendParams params;
2826 params.timeout = timeout;
2827 params.followRedirects = false;
2828 params.stateful = true;
2830
2832 StatefulHandler *stHandler = new StatefulHandler( self, handler, msg, params );
2833
2834 return SendOrQueue( self, *self->pDataServer, msg, stHandler, params );
2835 }
2836
2837 //----------------------------------------------------------------------------
2838 // Send a message to a host or put it in the recovery queue
2839 //----------------------------------------------------------------------------
2840 Status FileStateHandler::SendOrQueue( std::shared_ptr<FileStateHandler> &self,
2841 const URL &url,
2842 Message *msg,
2843 ResponseHandler *handler,
2844 MessageSendParams &sendParams )
2845 {
2846 //--------------------------------------------------------------------------
2847 // Recovering
2848 //--------------------------------------------------------------------------
2849 if( self->pFileState == Recovering )
2850 {
2851 return RecoverMessage( self, RequestData( msg, handler, sendParams ), false );
2852 }
2853
2854 //--------------------------------------------------------------------------
2855 // Trying to send
2856 //--------------------------------------------------------------------------
2857 if( self->pFileState == Opened )
2858 {
2859 msg->SetSessionId( self->pSessionId );
2860 XRootDStatus st = self->IssueRequest( *self->pDataServer, msg, handler, sendParams );
2861
2862 //------------------------------------------------------------------------
2863 // Invalid session id means that the connection has been broken while we
2864 // were idle so we haven't been informed about this fact earlier.
2865 //------------------------------------------------------------------------
2866 if( !st.IsOK() && st.code == errInvalidSession && self->IsRecoverable( st ) )
2867 return RecoverMessage( self, RequestData( msg, handler, sendParams ), false );
2868
2869 if( st.IsOK() )
2870 self->pInTheFly.insert(msg);
2871 else
2872 delete handler;
2873 return st;
2874 }
2875 return Status( stError, errInvalidOp );
2876 }
2877
2878 //----------------------------------------------------------------------------
2879 // Check if the stateful error is recoverable
2880 //----------------------------------------------------------------------------
2881 bool FileStateHandler::IsRecoverable( const XRootDStatus &status ) const
2882 {
2883 const auto recoverable_errors = {
2890 };
2891
2892 if (pDoRecoverRead || pDoRecoverWrite)
2893 for (const auto error : recoverable_errors)
2894 if (status.code == error)
2895 return IsReadOnly() ? pDoRecoverRead : pDoRecoverWrite;
2896
2897 return false;
2898 }
2899
2900 //----------------------------------------------------------------------------
2901 // Check if the file is open for read only
2902 //----------------------------------------------------------------------------
2903 bool FileStateHandler::IsReadOnly() const
2904 {
2905 if( (pOpenFlags & kXR_open_read) && !(pOpenFlags & kXR_open_updt) &&
2906 !(pOpenFlags & kXR_open_apnd ) )
2907 return true;
2908 return false;
2909 }
2910
2911 //----------------------------------------------------------------------------
2912 // Recover a message
2913 //----------------------------------------------------------------------------
2914 Status FileStateHandler::RecoverMessage( std::shared_ptr<FileStateHandler> &self,
2915 RequestData rd,
2916 bool callbackOnFailure )
2917 {
2918 self->pFileState = Recovering;
2919
2920 Log *log = DefaultEnv::GetLog();
2921 log->Dump( FileMsg, "[%p@%s] Putting message %s in the recovery list",
2922 (void*)self.get(), self->pFileUrl->GetObfuscatedURL().c_str(),
2923 rd.request->GetObfuscatedDescription().c_str() );
2924
2925 Status st = RunRecovery( self );
2926 if( st.IsOK() )
2927 {
2928 self->pToBeRecovered.push_back( rd );
2929 return st;
2930 }
2931
2932 if( callbackOnFailure )
2933 self->FailMessage( rd, st );
2934
2935 return st;
2936 }
2937
2938 //----------------------------------------------------------------------------
2939 // Run the recovery procedure if appropriate
2940 //----------------------------------------------------------------------------
2941 Status FileStateHandler::RunRecovery( std::shared_ptr<FileStateHandler> &self )
2942 {
2943 if( self->pFileState != Recovering )
2944 return Status();
2945
2946 if( !self->pInTheFly.empty() )
2947 return Status();
2948
2949 Log *log = DefaultEnv::GetLog();
2950 log->Debug( FileMsg, "[%p@%s] Running the recovery procedure", (void*)self.get(),
2951 self->pFileUrl->GetObfuscatedURL().c_str() );
2952
2953 Status st;
2954 if( self->pStateRedirect )
2955 {
2956 SendClose( self, 0 );
2957 st = ReOpenFileAtServer( self, *self->pStateRedirect, 0 );
2958 delete self->pStateRedirect; self->pStateRedirect = 0;
2959 }
2960 else if( self->IsReadOnly() && self->pLoadBalancer )
2961 st = ReOpenFileAtServer( self, *self->pLoadBalancer, 0 );
2962 else
2963 st = ReOpenFileAtServer( self, *self->pDataServer, 0 );
2964
2965 if( !st.IsOK() )
2966 {
2967 self->pFileState = Error;
2968 self->pStatus = st;
2969 self->FailQueuedMessages( st );
2970 }
2971
2972 return st;
2973 }
2974
2975 //----------------------------------------------------------------------------
2976 // Send a close and ignore the response
2977 //----------------------------------------------------------------------------
2978 XRootDStatus FileStateHandler::SendClose( std::shared_ptr<FileStateHandler> &self,
2979 uint16_t timeout )
2980 {
2981 Message *msg;
2982 ClientCloseRequest *req;
2983 MessageUtils::CreateRequest( msg, req );
2984
2985 req->requestid = kXR_close;
2986 memcpy( req->fhandle, self->pFileHandle, 4 );
2987
2989 msg->SetSessionId( self->pSessionId );
2990 ResponseHandler *handler = ResponseHandler::Wrap(
2991 [self]( XRootDStatus&, AnyObject& ) mutable { self.reset(); } );
2992 MessageSendParams params;
2993 params.timeout = timeout;
2994 params.followRedirects = false;
2995 params.stateful = true;
2996
2998
2999 return self->IssueRequest( *self->pDataServer, msg, handler, params );
3000 }
3001
3002 //----------------------------------------------------------------------------
3003 // Re-open the current file at a given server
3004 //----------------------------------------------------------------------------
3005 XRootDStatus FileStateHandler::ReOpenFileAtServer( std::shared_ptr<FileStateHandler> &self,
3006 const URL &url,
3007 uint16_t timeout )
3008 {
3009 Log *log = DefaultEnv::GetLog();
3010 log->Dump( FileMsg, "[%p@%s] Sending a recovery open command to %s",
3011 (void*)self.get(), self->pFileUrl->GetObfuscatedURL().c_str(), url.GetObfuscatedURL().c_str() );
3012
3013 //--------------------------------------------------------------------------
3014 // Remove the kXR_delete and kXR_new flags, as we don't want the recovery
3015 // procedure to delete a file that has been partially updated or fail it
3016 // because a partially uploaded file already exists.
3017 //--------------------------------------------------------------------------
3018 if( self->pOpenFlags & kXR_delete)
3019 {
3020 self->pOpenFlags &= ~kXR_delete;
3021 self->pOpenFlags |= kXR_open_updt;
3022 }
3023
3024 self->pOpenFlags &= ~kXR_new;
3025
3026 Message *msg;
3027 ClientOpenRequest *req;
3028 URL u = url;
3029
3030 if( url.GetPath().empty() )
3031 u.SetPath( self->pFileUrl->GetPath() );
3032
3033 std::string path = u.GetPathWithFilteredParams();
3034 MessageUtils::CreateRequest( msg, req, path.length() );
3035
3036 req->requestid = kXR_open;
3037 req->mode = self->pOpenMode;
3038 req->options = self->pOpenFlags;
3039 req->dlen = path.length();
3040 msg->Append( path.c_str(), path.length(), 24 );
3041
3042 // create a new reopen handler
3043 // (it is not assigned to 'pReOpenHandler' in order not to bump the reference counter
3044 // until we know that 'SendMessage' was successful)
3045 OpenHandler *openHandler = new OpenHandler( self, 0 );
3046 MessageSendParams params; params.timeout = timeout;
3049
3050 //--------------------------------------------------------------------------
3051 // Issue the open request
3052 //--------------------------------------------------------------------------
3053 XRootDStatus st = self->IssueRequest( url, msg, openHandler, params );
3054
3055 // if there was a problem destroy the open handler
3056 if( !st.IsOK() )
3057 {
3058 delete openHandler;
3059 self->pStatus = st;
3060 self->pFileState = Closed;
3061 }
3062 return st;
3063 }
3064
3065 //------------------------------------------------------------------------
3066 // Fail a message
3067 //------------------------------------------------------------------------
3068 void FileStateHandler::FailMessage( RequestData rd, XRootDStatus status )
3069 {
3070 Log *log = DefaultEnv::GetLog();
3071 log->Dump( FileMsg, "[%p@%s] Failing message %s with %s",
3072 (void*)this, pFileUrl->GetObfuscatedURL().c_str(),
3073 rd.request->GetObfuscatedDescription().c_str(),
3074 status.ToStr().c_str() );
3075
3076 StatefulHandler *sh = dynamic_cast<StatefulHandler*>(rd.handler);
3077 if( !sh )
3078 {
3079 Log *log = DefaultEnv::GetLog();
3080 log->Error( FileMsg, "[%p@%s] Internal error while recovering %s",
3081 (void*)this, pFileUrl->GetObfuscatedURL().c_str(),
3082 rd.request->GetObfuscatedDescription().c_str() );
3083 return;
3084 }
3085
3086 JobManager *jobMan = DefaultEnv::GetPostMaster()->GetJobManager();
3087 ResponseHandler *userHandler = sh->GetUserHandler();
3088 jobMan->QueueJob( new ResponseJob(
3089 userHandler,
3090 new XRootDStatus( status ),
3091 0, rd.params.hostList ) );
3092
3093 delete sh;
3094 }
3095
3096 //----------------------------------------------------------------------------
3097 // Fail queued messages
3098 //----------------------------------------------------------------------------
3099 void FileStateHandler::FailQueuedMessages( XRootDStatus status )
3100 {
3101 RequestList::iterator it;
3102 for( it = pToBeRecovered.begin(); it != pToBeRecovered.end(); ++it )
3103 FailMessage( *it, status );
3104 pToBeRecovered.clear();
3105 }
3106
3107 //------------------------------------------------------------------------
3108 // Re-send queued messages
3109 //------------------------------------------------------------------------
3110 void FileStateHandler::ReSendQueuedMessages()
3111 {
3112 RequestList::iterator it;
3113 for( it = pToBeRecovered.begin(); it != pToBeRecovered.end(); ++it )
3114 {
3115 it->request->SetSessionId( pSessionId );
3116 ReWriteFileHandle( it->request );
3117 XRootDStatus st = IssueRequest( *pDataServer, it->request,
3118 it->handler, it->params );
3119 if( !st.IsOK() )
3120 FailMessage( *it, st );
3121 }
3122 pToBeRecovered.clear();
3123 }
3124
3125 //------------------------------------------------------------------------
3126 // Re-write file handle
3127 //------------------------------------------------------------------------
3128 void FileStateHandler::ReWriteFileHandle( Message *msg )
3129 {
3130 ClientRequestHdr *hdr = (ClientRequestHdr*)msg->GetBuffer();
3131 switch( hdr->requestid )
3132 {
3133 case kXR_read:
3134 {
3135 ClientReadRequest *req = (ClientReadRequest*)msg->GetBuffer();
3136 memcpy( req->fhandle, pFileHandle, 4 );
3137 break;
3138 }
3139 case kXR_write:
3140 {
3141 ClientWriteRequest *req = (ClientWriteRequest*)msg->GetBuffer();
3142 memcpy( req->fhandle, pFileHandle, 4 );
3143 break;
3144 }
3145 case kXR_sync:
3146 {
3147 ClientSyncRequest *req = (ClientSyncRequest*)msg->GetBuffer();
3148 memcpy( req->fhandle, pFileHandle, 4 );
3149 break;
3150 }
3151 case kXR_truncate:
3152 {
3153 ClientTruncateRequest *req = (ClientTruncateRequest*)msg->GetBuffer();
3154 memcpy( req->fhandle, pFileHandle, 4 );
3155 break;
3156 }
3157 case kXR_readv:
3158 {
3159 ClientReadVRequest *req = (ClientReadVRequest*)msg->GetBuffer();
3160 readahead_list *dataChunk = (readahead_list*)msg->GetBuffer( 24 );
3161 for( size_t i = 0; i < req->dlen/sizeof(readahead_list); ++i )
3162 memcpy( dataChunk[i].fhandle, pFileHandle, 4 );
3163 break;
3164 }
3165 case kXR_writev:
3166 {
3167 ClientWriteVRequest *req =
3168 reinterpret_cast<ClientWriteVRequest*>( msg->GetBuffer() );
3169 XrdProto::write_list *wrtList =
3170 reinterpret_cast<XrdProto::write_list*>( msg->GetBuffer( 24 ) );
3171 size_t size = req->dlen / sizeof(XrdProto::write_list);
3172 for( size_t i = 0; i < size; ++i )
3173 memcpy( wrtList[i].fhandle, pFileHandle, 4 );
3174 break;
3175 }
3176 case kXR_pgread:
3177 {
3178 ClientPgReadRequest *req = (ClientPgReadRequest*) msg->GetBuffer();
3179 memcpy( req->fhandle, pFileHandle, 4 );
3180 break;
3181 }
3182 case kXR_pgwrite:
3183 {
3184 ClientPgWriteRequest *req = (ClientPgWriteRequest*) msg->GetBuffer();
3185 memcpy( req->fhandle, pFileHandle, 4 );
3186 break;
3187 }
3188 }
3189
3190 Log *log = DefaultEnv::GetLog();
3191 log->Dump( FileMsg, "[%p@%s] Rewritten file handle for %s to %#x",
3192 (void*)this, pFileUrl->GetObfuscatedURL().c_str(), msg->GetObfuscatedDescription().c_str(),
3193 *((uint32_t*)pFileHandle) );
3195 }
3196
3197 //----------------------------------------------------------------------------
3198 // Dispatch monitoring information on close
3199 //----------------------------------------------------------------------------
3200 void FileStateHandler::MonitorClose( const XRootDStatus *status )
3201 {
3202 Monitor *mon = DefaultEnv::GetMonitor();
3203 if( mon )
3204 {
3205 Monitor::CloseInfo i;
3206 i.file = pFileUrl;
3207 i.oTOD = pOpenTime;
3208 gettimeofday( &i.cTOD, 0 );
3209 i.rBytes = pRBytes;
3210 i.vrBytes = pVRBytes;
3211 i.wBytes = pWBytes;
3212 i.vwBytes = pVWBytes;
3213 i.vSegs = pVSegs;
3214 i.rCount = pRCount;
3215 i.vCount = pVRCount;
3216 i.wCount = pWCount;
3217 i.status = status;
3218 mon->Event( Monitor::EvClose, &i );
3219 }
3220 }
3221
3222 XRootDStatus FileStateHandler::IssueRequest( const URL &url,
3223 Message *msg,
3224 ResponseHandler *handler,
3225 MessageSendParams &sendParams )
3226 {
3227 // first handle Metalinks
3228 if( pUseVirtRedirector && url.IsMetalink() )
3229 return MessageUtils::RedirectMessage( url, msg, handler,
3230 sendParams, pLFileHandler );
3231
3232 // than local file access
3233 if( url.IsLocalFile() )
3234 return pLFileHandler->ExecRequest( url, msg, handler, sendParams );
3235
3236 // and finally ordinary XRootD requests
3237 return MessageUtils::SendMessage( url, msg, handler,
3238 sendParams, pLFileHandler );
3239 }
3240
3241 //------------------------------------------------------------------------
3242 // Send a write request with payload being stored in a kernel buffer
3243 //------------------------------------------------------------------------
3244 XRootDStatus FileStateHandler::WriteKernelBuffer( std::shared_ptr<FileStateHandler> &self,
3245 uint64_t offset,
3246 uint32_t length,
3247 std::unique_ptr<XrdSys::KernelBuffer> kbuff,
3248 ResponseHandler *handler,
3249 uint16_t timeout )
3250 {
3251 //--------------------------------------------------------------------------
3252 // Create the write request
3253 //--------------------------------------------------------------------------
3254 XrdSysMutexHelper scopedLock( self->pMutex );
3255
3256 if( self->pFileState != Opened && self->pFileState != Recovering )
3257 return XRootDStatus( stError, errInvalidOp );
3258
3259 Log *log = DefaultEnv::GetLog();
3260 log->Debug( FileMsg, "[%p@%s] Sending a write command for handle %#x to %s",
3261 (void*)self.get(), self->pFileUrl->GetObfuscatedURL().c_str(),
3262 *((uint32_t*)self->pFileHandle), self->pDataServer->GetHostId().c_str() );
3263
3264 Message *msg;
3265 ClientWriteRequest *req;
3266 MessageUtils::CreateRequest( msg, req );
3267
3268 req->requestid = kXR_write;
3269 req->offset = offset;
3270 req->dlen = length;
3271 memcpy( req->fhandle, self->pFileHandle, 4 );
3272
3273 MessageSendParams params;
3274 params.timeout = timeout;
3275 params.followRedirects = false;
3276 params.stateful = true;
3277 params.kbuff = kbuff.release();
3278 params.chunkList = new ChunkList();
3279
3281
3283 StatefulHandler *stHandler = new StatefulHandler( self, handler, msg, params );
3284
3285 return SendOrQueue( self, *self->pDataServer, msg, stHandler, params );
3286 }
3287}
kXR_unt16 requestid
Definition XProtocol.hh:479
kXR_unt16 requestid
Definition XProtocol.hh:630
kXR_unt16 requestid
Definition XProtocol.hh:806
@ kXR_fattrDel
Definition XProtocol.hh:270
@ kXR_fattrSet
Definition XProtocol.hh:273
@ kXR_fattrList
Definition XProtocol.hh:272
@ kXR_fattrGet
Definition XProtocol.hh:271
#define kXR_suppgrw
kXR_char fhandle[4]
Definition XProtocol.hh:531
kXR_char fhandle[4]
Definition XProtocol.hh:782
struct ClientPgReadRequest pgread
Definition XProtocol.hh:861
kXR_char fhandle[4]
Definition XProtocol.hh:807
kXR_char fhandle[4]
Definition XProtocol.hh:771
kXR_unt16 requestid
Definition XProtocol.hh:644
@ kXR_virtReadv
Definition XProtocol.hh:150
kXR_unt16 options
Definition XProtocol.hh:481
static const int kXR_ckpXeq
Definition XProtocol.hh:216
struct ClientPgWriteRequest pgwrite
Definition XProtocol.hh:862
kXR_unt16 requestid
Definition XProtocol.hh:228
@ kXR_async
Definition XProtocol.hh:458
@ kXR_delete
Definition XProtocol.hh:453
@ kXR_open_read
Definition XProtocol.hh:456
@ kXR_open_updt
Definition XProtocol.hh:457
@ kXR_open_apnd
Definition XProtocol.hh:462
@ kXR_retstat
Definition XProtocol.hh:463
struct ClientRequestHdr header
Definition XProtocol.hh:846
kXR_char fhandle[4]
Definition XProtocol.hh:509
#define kXR_recoverWrts
kXR_char fhandle[4]
Definition XProtocol.hh:645
kXR_char fhandle[4]
Definition XProtocol.hh:229
kXR_unt16 requestid
Definition XProtocol.hh:157
kXR_char fhandle[4]
Definition XProtocol.hh:633
@ kXR_read
Definition XProtocol.hh:125
@ kXR_open
Definition XProtocol.hh:122
@ kXR_writev
Definition XProtocol.hh:143
@ kXR_readv
Definition XProtocol.hh:137
@ kXR_sync
Definition XProtocol.hh:128
@ kXR_fattr
Definition XProtocol.hh:132
@ kXR_query
Definition XProtocol.hh:113
@ kXR_write
Definition XProtocol.hh:131
@ kXR_truncate
Definition XProtocol.hh:140
@ kXR_stat
Definition XProtocol.hh:129
@ kXR_pgread
Definition XProtocol.hh:142
@ kXR_chkpoint
Definition XProtocol.hh:124
@ kXR_close
Definition XProtocol.hh:115
@ kXR_pgwrite
Definition XProtocol.hh:138
struct ClientReadRequest read
Definition XProtocol.hh:867
kXR_int32 rlen
Definition XProtocol.hh:660
kXR_unt16 requestid
Definition XProtocol.hh:768
kXR_unt16 requestid
Definition XProtocol.hh:781
kXR_int64 offset
Definition XProtocol.hh:661
#define kXR_PROTPGRWVERSION
Definition XProtocol.hh:73
struct ClientWriteRequest write
Definition XProtocol.hh:876
kXR_unt16 requestid
Definition XProtocol.hh:670
@ kXR_Qopaqug
Definition XProtocol.hh:625
@ kXR_Qvisa
Definition XProtocol.hh:622
unsigned char kXR_char
Definition XPtypes.hh:65
static int mapError(int rc)
void Set(Type object, bool own=true)
void Get(Type &object)
Retrieve the object being held.
Binary blob representation.
void Append(const char *buffer, uint32_t size)
Append data at the position pointed to by the append cursor.
const char * GetBuffer(uint32_t offset=0) const
Get the message buffer.
uint32_t GetSize() const
Get the size of the message.
static Monitor * GetMonitor()
Get the monitor object.
static Log * GetLog()
Get default log.
static PostMaster * GetPostMaster()
Get default post master.
static FileTimer * GetFileTimer()
Get file timer task.
static ForkHandler * GetForkHandler()
Get the fork handler.
static Env * GetEnv()
Get default client environment.
XRootDStatus Open(uint16_t flags, ResponseHandler *handler, uint16_t timeout)
bool GetInt(const std::string &key, int &value)
Definition XrdClEnv.cc:89
An interface for file plug-ins.
static XRootDStatus PgReadRetry(std::shared_ptr< FileStateHandler > &self, uint64_t offset, uint32_t size, size_t pgnb, void *buffer, PgReadHandler *handler, uint16_t timeout=0)
static XRootDStatus PgWriteImpl(std::shared_ptr< FileStateHandler > &self, uint64_t offset, uint32_t size, const void *buffer, std::vector< uint32_t > &cksums, kXR_char flags, ResponseHandler *handler, uint16_t timeout=0)
void AfterForkChild()
Called in the child process after the fork.
static XRootDStatus Stat(std::shared_ptr< FileStateHandler > &self, bool force, ResponseHandler *handler, uint16_t timeout=0)
static void OnStateRedirection(std::shared_ptr< FileStateHandler > &self, const std::string &redirectUrl, Message *message, ResponseHandler *userHandler, MessageSendParams &sendParams)
Handle stateful redirect.
static XRootDStatus Sync(std::shared_ptr< FileStateHandler > &self, ResponseHandler *handler, uint16_t timeout=0)
void TimeOutRequests(time_t now)
Declare timeout on requests being recovered.
static XRootDStatus DelXAttr(std::shared_ptr< FileStateHandler > &self, const std::vector< std::string > &attrs, ResponseHandler *handler, uint16_t timeout=0)
static XRootDStatus GetXAttr(std::shared_ptr< FileStateHandler > &self, const std::vector< std::string > &attrs, ResponseHandler *handler, uint16_t timeout=0)
static XRootDStatus ListXAttr(std::shared_ptr< FileStateHandler > &self, ResponseHandler *handler, uint16_t timeout=0)
static XRootDStatus SetXAttr(std::shared_ptr< FileStateHandler > &self, const std::vector< xattr_t > &attrs, ResponseHandler *handler, uint16_t timeout=0)
static void OnStateError(std::shared_ptr< FileStateHandler > &self, XRootDStatus *status, Message *message, ResponseHandler *userHandler, MessageSendParams &sendParams)
Handle an error while sending a stateful message.
FileStateHandler(FilePlugIn *&plugin)
Constructor.
static XRootDStatus ReadV(std::shared_ptr< FileStateHandler > &self, uint64_t offset, struct iovec *iov, int iovcnt, ResponseHandler *handler, uint16_t timeout=0)
static XRootDStatus PgReadImpl(std::shared_ptr< FileStateHandler > &self, uint64_t offset, uint32_t size, void *buffer, uint16_t flags, ResponseHandler *handler, uint16_t timeout=0)
@ OpenInProgress
Opening is in progress.
@ CloseInProgress
Closing operation is in progress.
@ Opened
Opening has succeeded.
@ Recovering
Recovering from an error.
static XRootDStatus ChkptWrt(std::shared_ptr< FileStateHandler > &self, uint64_t offset, uint32_t size, const void *buffer, ResponseHandler *handler, uint16_t timeout=0)
bool SetProperty(const std::string &name, const std::string &value)
static void OnStateResponse(std::shared_ptr< FileStateHandler > &self, XRootDStatus *status, Message *message, AnyObject *response, HostList *hostList)
Handle stateful response.
static XRootDStatus Read(std::shared_ptr< FileStateHandler > &self, uint64_t offset, uint32_t size, void *buffer, ResponseHandler *handler, uint16_t timeout=0)
void OnClose(const XRootDStatus *status)
Process the results of the closing operation.
static XRootDStatus Fcntl(std::shared_ptr< FileStateHandler > &self, const Buffer &arg, ResponseHandler *handler, uint16_t timeout=0)
static XRootDStatus Truncate(std::shared_ptr< FileStateHandler > &self, uint64_t size, ResponseHandler *handler, uint16_t timeout=0)
static XRootDStatus Close(std::shared_ptr< FileStateHandler > &self, ResponseHandler *handler, uint16_t timeout=0)
static XRootDStatus ChkptWrtV(std::shared_ptr< FileStateHandler > &self, uint64_t offset, const struct iovec *iov, int iovcnt, ResponseHandler *handler, uint16_t timeout=0)
static XRootDStatus PgWrite(std::shared_ptr< FileStateHandler > &self, uint64_t offset, uint32_t size, const void *buffer, std::vector< uint32_t > &cksums, ResponseHandler *handler, uint16_t timeout=0)
void OnOpen(const XRootDStatus *status, const OpenInfo *openInfo, const HostList *hostList)
Process the results of the opening operation.
static XRootDStatus PgRead(std::shared_ptr< FileStateHandler > &self, uint64_t offset, uint32_t size, void *buffer, ResponseHandler *handler, uint16_t timeout=0)
static XRootDStatus PgWriteRetry(std::shared_ptr< FileStateHandler > &self, uint64_t offset, uint32_t size, const void *buffer, uint32_t digest, ResponseHandler *handler, uint16_t timeout=0)
static XRootDStatus VectorWrite(std::shared_ptr< FileStateHandler > &self, const ChunkList &chunks, ResponseHandler *handler, uint16_t timeout=0)
static XRootDStatus WriteV(std::shared_ptr< FileStateHandler > &self, uint64_t offset, const struct iovec *iov, int iovcnt, ResponseHandler *handler, uint16_t timeout=0)
static XRootDStatus Visa(std::shared_ptr< FileStateHandler > &self, ResponseHandler *handler, uint16_t timeout=0)
bool GetProperty(const std::string &name, std::string &value) const
static XRootDStatus Open(std::shared_ptr< FileStateHandler > &self, const std::string &url, uint16_t flags, uint16_t mode, ResponseHandler *handler, uint16_t timeout=0)
static XRootDStatus VectorRead(std::shared_ptr< FileStateHandler > &self, const ChunkList &chunks, void *buffer, ResponseHandler *handler, uint16_t timeout=0)
bool IsOpen() const
Check if the file is open.
static XRootDStatus Write(std::shared_ptr< FileStateHandler > &self, uint64_t offset, uint32_t size, const void *buffer, ResponseHandler *handler, uint16_t timeout=0)
static XRootDStatus Checkpoint(std::shared_ptr< FileStateHandler > &self, kXR_char code, ResponseHandler *handler, uint16_t timeout=0)
static XRootDStatus TryOtherServer(std::shared_ptr< FileStateHandler > &self, uint16_t timeout)
Try other data server.
void UnRegisterFileObject(FileStateHandler *file)
Un-register a file state handler.
void RegisterFileObject(FileStateHandler *file)
Register a file state handler.
void RegisterFileObject(FileStateHandler *file)
Register a file object.
void UnRegisterFileObject(FileStateHandler *file)
A synchronized queue.
void QueueJob(Job *job, void *arg=0)
Add a job to be run.
Handle diagnostics.
Definition XrdClLog.hh:101
void Error(uint64_t topic, const char *format,...)
Report an error.
Definition XrdClLog.cc:231
void Warning(uint64_t topic, const char *format,...)
Report a warning.
Definition XrdClLog.cc:248
void Dump(uint64_t topic, const char *format,...)
Print a dump message.
Definition XrdClLog.cc:299
void Info(uint64_t topic, const char *format,...)
Print an info.
Definition XrdClLog.cc:265
void Debug(uint64_t topic, const char *format,...)
Print a debug message.
Definition XrdClLog.cc:282
static void MergeCGI(URL::ParamsMap &cgi1, const URL::ParamsMap &cgi2, bool replace)
Merge cgi2 into cgi1.
static void ProcessSendParams(MessageSendParams &sendParams)
Process sending params.
static Status CreateXAttrBody(Message *msg, const std::vector< T > &vec, const std::string &path="")
static Status RedirectMessage(const URL &url, Message *msg, ResponseHandler *handler, MessageSendParams &sendParams, LocalFileHandler *lFileHandler)
Redirect message.
static XRootDStatus SendMessage(const URL &url, Message *msg, ResponseHandler *handler, MessageSendParams &sendParams, LocalFileHandler *lFileHandler)
Send message.
static void CreateRequest(Message *&msg, Request *&req, uint32_t payloadSize=0)
Create a message.
The message representation used throughout the system.
void SetSessionId(uint64_t sessionId)
Set the session ID which this message is meant for.
void SetVirtReqID(uint16_t virtReqID)
Set virtual request ID for the message.
const std::string & GetObfuscatedDescription() const
Get the description of the message with authz parameter obfuscated.
An abstract class to describe the client-side monitoring plugin interface.
@ EvClose
CloseInfo: File closed.
@ EvErrIO
ErrorInfo: An I/O error occurred.
@ EvOpen
OpenInfo: File opened.
virtual void Event(EventCode evCode, void *evData)=0
Information returned by file open operation.
void GetFileHandle(uint8_t *fileHandle) const
Get the file handle (4bytes)
const StatInfo * GetStatInfo() const
Get the stat info.
uint64_t GetSessionId() const
PgReadSubstitutionHandler(XrdCl::ResponseHandler *a, bool isHttps)
Status QueryTransport(const URL &url, uint16_t query, AnyObject &result)
JobManager * GetJobManager()
Get the job manager object user by the post master.
void DecFileInstCnt(const URL &url)
Decrement file object instance count bound to this channel.
Singleton access to URL to virtual redirector mapping.
static RedirectorRegistry & Instance()
Returns reference to the single instance.
void Release(const URL &url)
Release the virtual redirector associated with the given URL.
Handle an async response.
virtual void HandleResponseWithHosts(XRootDStatus *status, AnyObject *response, HostList *hostList)
static ResponseHandler * Wrap(std::function< void(XRootDStatus &, AnyObject &)> func)
virtual void HandleResponse(XRootDStatus *status, AnyObject *response)
Call the user callback.
Object stat info.
URL representation.
Definition XrdClURL.hh:31
const std::string & GetPath() const
Get the path.
Definition XrdClURL.hh:217
bool IsMetalink() const
Is it a URL to a metalink.
Definition XrdClURL.cc:465
std::map< std::string, std::string > ParamsMap
Definition XrdClURL.hh:33
std::string GetPathWithFilteredParams() const
Get the path with params, filteres out 'xrdcl.'.
Definition XrdClURL.cc:331
std::string GetObfuscatedURL() const
Get the URL with authz information obfuscated.
Definition XrdClURL.cc:498
void SetPath(const std::string &path)
Set the path.
Definition XrdClURL.hh:225
bool IsLocalFile() const
Definition XrdClURL.cc:474
static XrdCl::XRootDStatus GetProtocolVersion(const XrdCl::URL url, int &protver)
const std::string & GetErrorMessage() const
Get error message.
std::string ToStr() const
Convert to string.
static void SetDescription(Message *msg)
Get the description of a message.
static uint32_t Calc32C(const void *data, size_t count, uint32_t prevcs=0)
Definition XrdOucCRC.cc:190
static void csCalc(const char *data, off_t offs, size_t count, uint32_t *csval)
static int csNum(off_t offs, int count)
Compute the required size of a checksum vector based on offset & length.
static bool IsPageAligned(const void *ptr)
const uint16_t errSocketOptError
const uint16_t errTlsError
const uint16_t errOperationExpired
const uint16_t errPollerError
const uint16_t stError
An error occurred that could potentially be retried.
const uint16_t errInProgress
const uint16_t errSocketTimeout
std::vector< HostInfo > HostList
const uint16_t errDataError
data is corrupted
const uint16_t errInternal
Internal error.
const uint16_t stOK
Everything went OK.
const uint16_t errInvalidOp
const uint64_t FileMsg
const uint16_t suAlreadyDone
EcHandler * GetEcHandler(const URL &headnode, const URL &redirurl)
const uint16_t errInvalidArgs
const int DefaultRequestTimeout
std::vector< ChunkInfo > ChunkList
List of chunks.
const uint16_t errConnectionError
const uint16_t errSocketError
const uint16_t errOperationInterrupted
const uint16_t errInvalidSession
T & To(AnyObject &any)
const uint16_t errRedirect
const uint16_t errSocketDisconnected
Response NullRef< Response >::value
XrdSysError Log
Definition XrdConfig.cc:113
static const int PageSize
ssize_t Read(int fd, KernelBuffer &buffer, uint32_t length, int64_t offset)
ssize_t Move(KernelBuffer &kbuff, char *&ubuff)
static const int aData
Definition XProtocol.hh:298
kXR_char fhandle[4]
Definition XProtocol.hh:288
kXR_unt16 requestid
Definition XProtocol.hh:287
Describe a data chunk for vector read.
void * buffer
length of the chunk
uint32_t length
offset in the file
std::vector< uint32_t > crc32cDigests
XrdSys::KernelBuffer * kbuff
uint64_t vwBytes
Total number of bytes written vie writev.
const XRootDStatus * status
Close status.
uint32_t wCount
Total count of writes.
uint64_t vSegs
Total count of readv segments.
uint64_t vrBytes
Total number of bytes read via readv.
timeval cTOD
gettimeofday() when file was closed
uint32_t vCount
Total count of readv.
const URL * file
The file in question.
uint64_t rBytes
Total number of bytes read via read.
timeval oTOD
gettimeofday() when file was opened
uint64_t wBytes
Total number of bytes written.
uint32_t rCount
Total count of reads.
Describe an encountered file-based error.
@ ErrUnc
Unclassified operation.
const XRootDStatus * status
Status code.
const URL * file
The file in question.
Operation opCode
The associated operation.
Describe a file open event to the monitor.
uint64_t fSize
File size in bytes.
const URL * file
File in question.
std::string dataServer
Actual fata server.
uint16_t oFlags
OpenFlags.
void SetNbRepair(size_t nbrepair)
Set number of repaired pages.
std::vector< uint32_t > & GetCksums()
Get the checksums.
uint32_t GetLength() const
Get the data length.
uint64_t GetOffset() const
Get the offset.
void * GetBuffer()
Get the buffer.
std::tuple< uint64_t, uint32_t > At(size_t i)
Procedure execution status.
uint16_t code
Error type, or additional hints on what to do.
bool IsOK() const
We're fine.
static const uint16_t ServerFlags
returns server flags
static const uint16_t IsEncrypted
returns true if the channel is encrypted