XrdCephOssBufferedFile.cc
//------------------------------------------------------------------------------
// Copyright (c) 2014-2015 by European Organization for Nuclear Research (CERN)
// Author: Sebastien Ponce <sebastien.ponce@cern.ch>
//------------------------------------------------------------------------------
// This file is part of the XRootD software suite.
//
// XRootD is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// XRootD is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with XRootD. If not, see <http://www.gnu.org/licenses/>.
//
// In applying this licence, CERN does not waive the privileges and immunities
// granted to it by virtue of its status as an Intergovernmental Organization
// or submit itself to any jurisdiction.
//------------------------------------------------------------------------------

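// XrdCephOssBufferedFile: a buffering wrapper around XrdCephOssFile. Reads are
// served through per-thread buffer algorithm instances and writes through a
// single shared instance; the write buffer is flushed when the file is closed
// and per-file I/O statistics are logged in a summary line.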
#include <sys/types.h>
#include <unistd.h>
#include <sstream>
#include <iostream>
#include <fcntl.h>
#include <iomanip>
#include <new>
#include <ctime>
#include <chrono>
#include <thread>

#include "XrdOuc/XrdOucEnv.hh"
#include "XrdSys/XrdSysError.hh"
#include "XrdOuc/XrdOucTrace.hh"
#include "XrdSfs/XrdSfsAio.hh"

// XrdCeph buffer framework headers
#include "XrdCephOssBufferedFile.hh"
#include "XrdCephBuffers/XrdCephBufferAlgSimple.hh"
#include "XrdCephBuffers/XrdCephBufferDataSimple.hh"
#include "XrdCephBuffers/CephIOAdapterRaw.hh"
#include "XrdCephBuffers/CephIOAdapterAIORaw.hh"

using namespace XrdCephBuffer;
using namespace std::chrono_literals;

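// Constructor: wrap an existing XrdCephOssFile and record the buffering
// parameters (buffer size, buffer IO mode and the per-file cap on simultaneous
// read buffers). Buffer instances themselves are created lazily on first use.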
XrdCephOssBufferedFile::XrdCephOssBufferedFile(XrdCephOss *cephoss, XrdCephOssFile *cephossDF,
                                               size_t buffersize, const std::string& bufferIOmode,
                                               size_t maxNumberSimulBuffers):
                  XrdCephOssFile(cephoss), m_cephoss(cephoss), m_xrdOssDF(cephossDF),
                  m_maxCountReadBuffers(maxNumberSimulBuffers),
                  m_bufsize(buffersize),
                  m_bufferIOmode(bufferIOmode)
{
}

XrdCephOssBufferedFile::~XrdCephOssBufferedFile() {
  // XrdCephEroute.Say("XrdCephOssBufferedFile::Destructor");

  // remember to delete the inner XrdCephOssFile object
  if (m_xrdOssDF) {
    delete m_xrdOssDF;
    m_xrdOssDF = nullptr;
  }
}

int XrdCephOssBufferedFile::Open(const char *path, int flags, mode_t mode, XrdOucEnv &env) {

  int rc = m_xrdOssDF->Open(path, flags, mode, env);
  if (rc < 0) {
    return rc;
  }
  m_fd = m_xrdOssDF->getFileDescriptor();
  BUFLOG("XrdCephOssBufferedFile::Open got fd: " << m_fd << " " << path);
  m_flags = flags; // remember the open flags, e.g. to tell reads from writes later
  m_path = path;   // keep the path for the final stats summary

  // start the timer
  //m_timestart = std::chrono::steady_clock::now();
  m_timestart = std::chrono::system_clock::now();
  // return the result of the underlying open
  return rc;
}

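// Close the file. For files opened for writing, any data still sitting in the
// write buffer is flushed first; a summary of byte counters and timing is then
// logged before the underlying file is closed.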
int XrdCephOssBufferedFile::Close(long long *retsz) {
  // if data is still in the buffer and we are writing, make sure to write it out
  if (m_bufferAlg && ((m_flags & O_ACCMODE) != O_RDONLY)) {
    ssize_t rc = m_bufferAlg->flushWriteCache();
    if (rc < 0) {
      LOGCEPH( "XrdCephOssBufferedFile::Close: flush Error fd: " << m_fd << " rc:" << rc );
      // still try to close the file
      ssize_t rc2 = m_xrdOssDF->Close(retsz);
      if (rc2 < 0) {
        LOGCEPH( "XrdCephOssBufferedFile::Close: Close error after flush Error fd: " << m_fd << " rc:" << rc2 );
      }
      return rc; // return the original flush error
    } else {
      LOGCEPH( "XrdCephOssBufferedFile::Close: Flushed data on close fd: " << m_fd << " rc:" << rc );
    }
  } // check for write

  const std::chrono::time_point<std::chrono::system_clock> now =
        std::chrono::system_clock::now();
  const std::time_t t_s = std::chrono::system_clock::to_time_t(m_timestart);
  const std::time_t t_c = std::chrono::system_clock::to_time_t(now);

  auto t_dur = std::chrono::duration_cast<std::chrono::milliseconds>(now - m_timestart).count();

  LOGCEPH("XrdCephOssBufferedFile::Summary: {\"fd\":" << m_fd << ", \"Elapsed_time_ms\":" << t_dur
          << ", \"path\":\"" << m_path
          << "\", \"read_B\":" << m_bytesRead.load()
          << ", \"readV_B\":" << m_bytesReadV.load()
          << ", \"readAIO_B\":" << m_bytesReadAIO.load()
          << ", \"write_B\":" << m_bytesWrite.load()
          << ", \"writeAIO_B\":" << m_bytesWriteAIO.load()
          << ", \"startTime\":\"" << std::put_time(std::localtime(&t_s), "%F %T") << "\", \"endTime\":\""
          << std::put_time(std::localtime(&t_c), "%F %T") << "\""
          << ", \"nBuffersRead\":" << m_bufferReadAlgs.size()
          << "}");

  return m_xrdOssDF->Close(retsz);
}

ssize_t XrdCephOssBufferedFile::ReadV(XrdOucIOVec *readV, int rnum) {
  // vector reads are not routed through the buffer
  ssize_t rc = m_xrdOssDF->ReadV(readV, rnum);
  if (rc > 0) m_bytesReadV.fetch_add(rc);
  return rc;
}

ssize_t XrdCephOssBufferedFile::Read(off_t offset, size_t blen) {
  return m_xrdOssDF->Read(offset, blen);
}

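// Buffered synchronous read. Each calling thread gets its own buffer algorithm
// instance, keyed by a hash of its thread id; if no buffer can be created the
// request falls through to the unbuffered file. EBUSY returns from the buffer
// are retried a limited number of times before giving up with -EIO.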
ssize_t XrdCephOssBufferedFile::Read(void *buff, off_t offset, size_t blen) {
  size_t thread_id = std::hash<std::thread::id>{}(std::this_thread::get_id());

  IXrdCephBufferAlg * buffer{nullptr};
  // check for, and create if needed, a buffer for this thread
  {
    // lock in case we need to create a new algorithm instance
    const std::lock_guard<std::mutex> lock(m_buf_mutex);
    auto buffer_itr = m_bufferReadAlgs.find(thread_id);
    if (buffer_itr == m_bufferReadAlgs.end()) {
      // createBuffer() returns an empty pointer if no buffer could be created
      auto buffer_ptr = createBuffer();
      if (buffer_ptr) {
        buffer = buffer_ptr.get();
        m_bufferReadAlgs[thread_id] = std::move(buffer_ptr);
      } else {
        // no buffer available; pass the read straight through to the unbuffered file
        ssize_t rc = m_xrdOssDF->Read(buff, offset, blen);
        if (rc < 0) {
          LOGCEPH( "XrdCephOssBufferedFile::Read: no buffer available and passthrough read failed with rc: " << rc );
        }
        return rc;
      }
    } else {
      buffer = buffer_itr->second.get();
    }
  } // scope of lock

  int retry_counter{m_maxBufferRetries};
  ssize_t rc {0};
  while (retry_counter > 0) {
    rc = buffer->read(buff, offset, blen);
    if (rc != -EBUSY) break; // either it worked, or it is a real (non-busy) error
    LOGCEPH( "XrdCephOssBufferedFile::Read Received EBUSY for fd: " << m_fd << " on try: " << (m_maxBufferRetries-retry_counter) << ". Sleeping ... "
             << " rc:" << rc << " off:" << offset << " len:" << blen);
    std::this_thread::sleep_for(m_maxBufferRetrySleepTime_ms * 1ms);
    --retry_counter;
  }
  if (retry_counter == 0) {
    // reached the maximum number of EBUSY retries; fail the request
    LOGCEPH( "XrdCephOssBufferedFile::Read Max attempts for fd: " << m_fd << " on try: " << (m_maxBufferRetries-retry_counter) << ". Terminating with -EIO: "
             << " rc:" << rc << " off:" << offset << " len:" << blen );
    // set a permanent error code
    rc = -EIO;
  }
  if (rc >= 0) {
    m_bytesRead.fetch_add(rc);
  } else {
    LOGCEPH( "XrdCephOssBufferedFile::Read: Read error fd: " << m_fd << " rc:" << rc << " off:" << offset << " len:" << blen);
  }
  // LOGCEPH( "XrdCephOssBufferedFile::Read: Read good fd: " << m_fd << " rc:" << rc << " off:" << offset << " len:" << blen);
  return rc;
}

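// Buffered AIO read: hand the request to the per-thread buffer algorithm's
// read_aio() entry point and accumulate the byte count for the Close summary.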
ssize_t XrdCephOssBufferedFile::Read(XrdSfsAio *aiop) {
  size_t thread_id = std::hash<std::thread::id>{}(std::this_thread::get_id());
  IXrdCephBufferAlg * buffer{nullptr};
  // check for, and create if needed, a buffer for this thread
  {
    // lock in case we need to create a new algorithm instance
    const std::lock_guard<std::mutex> lock(m_buf_mutex);
    auto buffer_itr = m_bufferReadAlgs.find(thread_id);
    if (buffer_itr == m_bufferReadAlgs.end()) {
      m_bufferReadAlgs[thread_id] = createBuffer();
      buffer = m_bufferReadAlgs.find(thread_id)->second.get();
    } else {
      buffer = buffer_itr->second.get();
    }
  }
  if (!buffer) {
    // createBuffer() returned an empty pointer; nothing to read through
    LOGCEPH( "XrdCephOssBufferedFile::Read: could not create a read buffer for fd: " << m_fd );
    return -EINVAL;
  }

  // LOGCEPH("XrdCephOssBufferedFile::AIOREAD: fd: " << m_xrdOssDF->getFileDescriptor() << " " << time(nullptr) << " : "
  //          << aiop->sfsAio.aio_offset << " "
  //          << aiop->sfsAio.aio_nbytes << " " << aiop->sfsAio.aio_reqprio << " "
  //          << aiop->sfsAio.aio_fildes );
  ssize_t rc = buffer->read_aio(aiop);
  if (rc > 0) {
    m_bytesReadAIO.fetch_add(rc);
  } else {
    LOGCEPH( "XrdCephOssBufferedFile::Read: ReadAIO error fd: " << m_fd << " rc:" << rc
             << " off:" << aiop->sfsAio.aio_offset << " len:" << aiop->sfsAio.aio_nbytes );
  }
  return rc;
}

ssize_t XrdCephOssBufferedFile::ReadRaw(void *buff, off_t offset, size_t blen) {
  // #TODO: should ReadRaw bypass the buffer?
  return m_xrdOssDF->ReadRaw(buff, offset, blen);
}

int XrdCephOssBufferedFile::Fstat(struct stat *buff) {
  return m_xrdOssDF->Fstat(buff);
}

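// Buffered synchronous write. All writes share the single m_bufferAlg instance,
// created lazily on the first write. As with reads, EBUSY responses from the
// buffer are retried a limited number of times before failing with -EIO.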
ssize_t XrdCephOssBufferedFile::Write(const void *buff, off_t offset, size_t blen) {

  if (!m_bufferAlg) {
    m_bufferAlg = createBuffer();
    if (!m_bufferAlg) {
      LOGCEPH( "XrdCephOssBufferedFile: Error in creating buffered object");
      return -EINVAL;
    }
  }

  int retry_counter{m_maxBufferRetries};
  ssize_t rc {0};
  while (retry_counter > 0) {
    rc = m_bufferAlg->write(buff, offset, blen);
    if (rc != -EBUSY) break; // either it worked, or it is a real (non-busy) error
    LOGCEPH( "XrdCephOssBufferedFile::Write Received EBUSY for fd: " << m_fd << " on try: " << (m_maxBufferRetries-retry_counter) << ". Sleeping ... "
             << " rc:" << rc << " off:" << offset << " len:" << blen);
    std::this_thread::sleep_for(m_maxBufferRetrySleepTime_ms * 1ms);
    --retry_counter;
  }
  if (retry_counter == 0) {
    // reached the maximum number of EBUSY retries; fail the request
    LOGCEPH( "XrdCephOssBufferedFile::Write Max attempts for fd: " << m_fd << " on try: " << (m_maxBufferRetries-retry_counter) << ". Terminating with -EIO: "
             << " rc:" << rc << " off:" << offset << " len:" << blen );
    // set a permanent error code
    rc = -EIO;
  }
  if (rc >= 0) {
    m_bytesWrite.fetch_add(rc);
  } else {
    LOGCEPH( "XrdCephOssBufferedFile::Write: Write error fd: " << m_fd << " rc:" << rc << " off:" << offset << " len:" << blen);
  }
  return rc;
}

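// Buffered AIO write, using the same lazily created m_bufferAlg as the
// synchronous Write path.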
ssize_t XrdCephOssBufferedFile::Write(XrdSfsAio *aiop) {
  if (!m_bufferAlg) {
    m_bufferAlg = createBuffer();
    if (!m_bufferAlg) {
      LOGCEPH( "XrdCephOssBufferedFile: Error in creating buffered object");
      return -EINVAL;
    }
  }

  // LOGCEPH("XrdCephOssBufferedFile::AIOWRITE: fd: " << m_xrdOssDF->getFileDescriptor() << " " << time(nullptr) << " : "
  //          << aiop->sfsAio.aio_offset << " "
  //          << aiop->sfsAio.aio_nbytes << " " << aiop->sfsAio.aio_reqprio << " "
  //          << aiop->sfsAio.aio_fildes << " " );
  ssize_t rc = m_bufferAlg->write_aio(aiop);
  if (rc > 0) {
    m_bytesWriteAIO.fetch_add(rc);
  } else {
    LOGCEPH( "XrdCephOssBufferedFile::Write: WriteAIO error fd: " << m_fd << " rc:" << rc
             << " off:" << aiop->sfsAio.aio_offset << " len:" << aiop->sfsAio.aio_nbytes );
  }
  return rc;
}

int XrdCephOssBufferedFile::Fsync() {
  return m_xrdOssDF->Fsync();
}

int XrdCephOssBufferedFile::Ftruncate(unsigned long long len) {
  return m_xrdOssDF->Ftruncate(len);
}

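// Create a new buffer-algorithm instance for this file. The buffer is backed by
// a simple in-memory data block and reaches ceph through either the AIO or the
// plain IO adapter, depending on the configured buffer IO mode. Once the
// per-file cap on simultaneous read buffers is reached, further buffers are
// limited to 1 MiB. An empty pointer is returned if allocation fails or the
// mode is invalid.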
std::unique_ptr<XrdCephBuffer::IXrdCephBufferAlg> XrdCephOssBufferedFile::createBuffer() {
  std::unique_ptr<IXrdCephBufferAlg> bufferAlg;

  size_t bufferSize {m_bufsize}; // create a buffer of the default size
  if (m_bufferReadAlgs.size() >= m_maxCountReadBuffers) {
    BUFLOG("XrdCephOssBufferedFile: buffer reached max number of simul-buffers for this file: creating only 1MiB buffer" );
    bufferSize = 1048576;
  } else {
    BUFLOG("XrdCephOssBufferedFile: buffer: got " << m_bufferReadAlgs.size() << " buffers already");
  }

  try {
    std::unique_ptr<IXrdCephBufferData> cephbuffer = std::unique_ptr<IXrdCephBufferData>(new XrdCephBufferDataSimple(bufferSize));
    std::unique_ptr<ICephIOAdapter> cephio;
    if (m_bufferIOmode == "aio") {
      cephio = std::unique_ptr<ICephIOAdapter>(new CephIOAdapterAIORaw(cephbuffer.get(), m_fd));
    } else if (m_bufferIOmode == "io") {
      cephio = std::unique_ptr<ICephIOAdapter>(new CephIOAdapterRaw(cephbuffer.get(), m_fd,
                                                                    !m_cephoss->m_useDefaultPreadAlg));
    } else {
      BUFLOG("XrdCephOssBufferedFile: buffer mode needs to be one of aio|io" );
      m_xrdOssDF->Close();
      return bufferAlg; // empty pointer: invalid mode
    }

    LOGCEPH( "XrdCephOssBufferedFile::Open: fd: " << m_fd << " Buffer created: " << cephbuffer->capacity() );
    bufferAlg = std::unique_ptr<IXrdCephBufferAlg>(new XrdCephBufferAlgSimple(std::move(cephbuffer), std::move(cephio), m_fd) );
  } catch (const std::bad_alloc &e) {
    BUFLOG("XrdCephOssBufferedFile: Bad memory allocation in buffer: " << e.what() );
  }

  return bufferAlg;
}