fileio.cpp 25 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116
  1. //
  2. // Copyright (c) 2017-2025, Manticore Software LTD (https://manticoresearch.com)
  3. // Copyright (c) 2001-2016, Andrew Aksyonoff
  4. // Copyright (c) 2008-2016, Sphinx Technologies Inc
  5. // All rights reserved
  6. //
  7. // This program is free software; you can redistribute it and/or modify
  8. // it under the terms of the GNU General Public License. You should have
  9. // received a copy of the GPL license along with this program; if you
  10. // did not, you can find it at http://www.gnu.org/
  11. //
  12. #include "fileio.h"
  13. #include "sphinxint.h"
  14. #include <sys/stat.h>
  15. #define SPH_READ_NOPROGRESS_CHUNK (32768*1024)
  16. //////////////////////////////////////////////////////////////////////////
  17. CSphAutofile::CSphAutofile ( const CSphString & sName, int iMode, CSphString & sError, bool bTemp )
  18. {
  19. Open ( sName, iMode, sError, bTemp );
  20. }
  21. CSphAutofile::~CSphAutofile()
  22. {
  23. Close();
  24. }
  25. static int AutoFileOpen ( const CSphString & sName, int iMode )
  26. {
  27. int iFD = -1;
  28. #if _WIN32
  29. if ( iMode==SPH_O_READ )
  30. {
  31. intptr_t tFD = (intptr_t)CreateFile ( sName.cstr(), GENERIC_READ , FILE_SHARE_DELETE | FILE_SHARE_READ | FILE_SHARE_WRITE, NULL, OPEN_EXISTING, FILE_ATTRIBUTE_NORMAL, NULL );
  32. iFD = _open_osfhandle ( tFD, 0 );
  33. } else
  34. iFD = ::open ( sName.cstr(), iMode, 0644 );
  35. #else
  36. iFD = ::open ( sName.cstr(), iMode, 0644 );
  37. #endif
  38. return iFD;
  39. }
  40. int CSphAutofile::Open ( const CSphString & sName, int iMode, CSphString & sError, bool bTemp )
  41. {
  42. assert ( m_iFD==-1 && m_sFilename.IsEmpty() );
  43. assert ( !sName.IsEmpty() );
  44. m_iFD = AutoFileOpen ( sName, iMode );
  45. m_sFilename = sName; // not exactly sure why is this unconditional. for error reporting later, i suppose
  46. if ( m_iFD<0 )
  47. sError.SetSprintf ( "failed to open %s: %s", sName.cstr(), strerrorm(errno) );
  48. else
  49. m_bTemporary = bTemp; // only if we managed to actually open it
  50. return m_iFD;
  51. }
  52. void CSphAutofile::Close()
  53. {
  54. if ( m_iFD>=0 )
  55. {
  56. ::close ( m_iFD );
  57. if ( m_bTemporary )
  58. ::unlink ( m_sFilename.cstr() );
  59. }
  60. m_iFD = -1;
  61. m_sFilename = "";
  62. m_bTemporary = false;
  63. }
  64. int CSphAutofile::LeakID ()
  65. {
  66. m_sFilename = "";
  67. m_bTemporary = false;
  68. return std::exchange ( m_iFD, -1 );
  69. }
  70. void CSphAutofile::SetPersistent()
  71. {
  72. m_bTemporary = false;
  73. }
  74. const char * CSphAutofile::GetFilename() const
  75. {
  76. return m_sFilename.scstr();
  77. }
  78. SphOffset_t CSphAutofile::GetSize ( SphOffset_t iMinSize, bool bCheckSizeT, CSphString & sError )
  79. {
  80. struct_stat st;
  81. if ( stat ( GetFilename(), &st )<0 )
  82. {
  83. sError.SetSprintf ( "failed to stat %s: %s", GetFilename(), strerrorm(errno) );
  84. return -1;
  85. }
  86. if ( st.st_size<iMinSize )
  87. {
  88. sError.SetSprintf ( "failed to load %s: bad size " INT64_FMT " (at least " INT64_FMT " bytes expected)",
  89. GetFilename(), (int64_t)st.st_size, (int64_t)iMinSize );
  90. return -1;
  91. }
  92. if ( bCheckSizeT )
  93. {
  94. size_t uCheck = (size_t)st.st_size;
  95. if ( st.st_size!=SphOffset_t(uCheck) )
  96. {
  97. sError.SetSprintf ( "failed to load %s: bad size " INT64_FMT " (out of size_t; 4 GB limit on 32-bit machine hit?)",
  98. GetFilename(), (int64_t)st.st_size );
  99. return -1;
  100. }
  101. }
  102. return st.st_size;
  103. }
  104. SphOffset_t CSphAutofile::GetSize()
  105. {
  106. CSphString sTmp;
  107. return GetSize ( 0, false, sTmp );
  108. }
  109. bool CSphAutofile::Read ( void * pBuf, int64_t iCount, CSphString & sError )
  110. {
  111. assert ( iCount>=0 );
  112. int64_t iToRead = iCount;
  113. BYTE * pCur = (BYTE *)pBuf;
  114. while ( iToRead>0 )
  115. {
  116. int64_t iToReadOnce = Min ( iToRead, SPH_READ_NOPROGRESS_CHUNK );
  117. int64_t iGot = sphRead ( GetFD(), pCur, (size_t)iToReadOnce );
  118. if ( iGot==-1 )
  119. {
  120. // interrupted by a signal - try again
  121. if ( errno==EINTR )
  122. continue;
  123. sError.SetSprintf ( "read error in %s (%s); " INT64_FMT " of " INT64_FMT " bytes read",
  124. GetFilename(), strerrorm(errno), iCount-iToRead, iCount );
  125. return false;
  126. }
  127. // EOF
  128. if ( iGot==0 )
  129. {
  130. sError.SetSprintf ( "unexpected EOF in %s (%s); " INT64_FMT " of " INT64_FMT " bytes read",
  131. GetFilename(), strerrorm(errno), iCount-iToRead, iCount );
  132. return false;
  133. }
  134. iToRead -= iGot;
  135. pCur += iGot;
  136. }
  137. if ( iToRead!=0 )
  138. {
  139. sError.SetSprintf ( "read error in %s (%s); " INT64_FMT " of " INT64_FMT " bytes read",
  140. GetFilename(), strerrorm(errno), iCount-iToRead, iCount );
  141. return false;
  142. }
  143. return true;
  144. }
  145. //////////////////////////////////////////////////////////////////////////
  146. CSphReader::CSphReader ( BYTE * pBuf, int iSize )
  147. : m_pBuff ( pBuf )
  148. , m_iBufSize ( iSize )
  149. , m_iReadUnhinted ( DEFAULT_READ_UNHINTED )
  150. {
  151. assert ( pBuf==NULL || iSize>0 );
  152. }
  153. CSphReader::~CSphReader()
  154. {
  155. if ( m_bBufOwned )
  156. SafeDeleteArray ( m_pBuff );
  157. }
  158. void CSphReader::SetBuffers ( int iReadBuffer, int iReadUnhinted )
  159. {
  160. if ( !m_pBuff )
  161. m_iBufSize = iReadBuffer;
  162. m_iReadUnhinted = iReadUnhinted;
  163. }
  164. void CSphReader::SetFile ( int iFD, const char * sFilename )
  165. {
  166. m_iFD = iFD;
  167. m_iPos = 0;
  168. m_iBuffPos = 0;
  169. m_iBuffUsed = 0;
  170. m_sFilename = sFilename;
  171. }
  172. void CSphReader::SetFile ( const CSphAutofile & tFile )
  173. {
  174. SetFile ( tFile.GetFD(), tFile.GetFilename() );
  175. }
  176. void CSphReader::Reset()
  177. {
  178. SetFile ( -1, "" );
  179. }
  180. /// sizehint > 0 means we expect to read approx that much bytes
  181. /// sizehint == 0 means no hint, use default (happens later in UpdateCache())
  182. /// sizehint == -1 means reposition and adjust current hint
  183. void CSphReader::SeekTo ( SphOffset_t iPos, int iSizeHint )
  184. {
  185. assert ( iPos>=0 );
  186. assert ( iSizeHint>=-1 );
  187. #ifndef NDEBUG
  188. #if PARANOID
  189. struct_stat tStat;
  190. fstat ( m_iFD, &tStat );
  191. if ( iPos > tStat.st_size )
  192. sphDie ( "INTERNAL ERROR: seeking past the end of file" );
  193. #endif
  194. #endif
  195. if ( iPos>=m_iPos && iPos<m_iPos+m_iBuffUsed )
  196. {
  197. m_iBuffPos = (int)( iPos-m_iPos ); // reposition to proper byte
  198. m_iSizeHint = iSizeHint - ( m_iBuffUsed - m_iBuffPos ); // we already have some bytes cached, so let's adjust size hint
  199. assert ( m_iBuffPos<m_iBuffUsed );
  200. } else
  201. {
  202. m_iPos = iPos;
  203. m_iBuffPos = 0; // for GetPos() to work properly, aaaargh
  204. m_iBuffUsed = 0;
  205. if ( iSizeHint==-1 )
  206. {
  207. // the adjustment bureau
  208. // we need to seek but still keep the current hint
  209. // happens on a skiplist jump, for instance
  210. int64_t iHintLeft = m_iPos + m_iSizeHint - iPos;
  211. if ( iHintLeft>0 && iHintLeft<INT_MAX )
  212. iSizeHint = (int)iHintLeft;
  213. else
  214. iSizeHint = 0;
  215. }
  216. // get that hint
  217. assert ( iSizeHint>=0 );
  218. m_iSizeHint = iSizeHint;
  219. }
  220. }
  221. void CSphReader::SkipBytes ( int iCount )
  222. {
  223. // 0 means "no hint", so this clamp works alright
  224. SeekTo ( m_iPos+m_iBuffPos+iCount, Max ( m_iSizeHint-m_iBuffPos-iCount, 0 ) );
  225. }
  226. void CSphReader::UpdateCache()
  227. {
  228. CSphScopedProfile tProf ( m_pProfile, m_eProfileState );
  229. assert ( m_iFD>=0 );
  230. // alloc buf on first actual read
  231. if ( !m_pBuff )
  232. {
  233. if ( m_iBufSize<=0 )
  234. m_iBufSize = DEFAULT_READ_BUFFER;
  235. m_bBufOwned = true;
  236. m_pBuff = new BYTE [ m_iBufSize ];
  237. }
  238. // stream position could be changed externally
  239. // so let's just hope that the OS optimizes redundant seeks
  240. SphOffset_t iNewPos = m_iPos + Min ( m_iBuffPos, m_iBuffUsed );
  241. if ( m_iSizeHint<=0 )
  242. m_iSizeHint = ( m_iReadUnhinted>0 ) ? m_iReadUnhinted : DEFAULT_READ_UNHINTED;
  243. int iReadLen = Min ( m_iSizeHint, m_iBufSize );
  244. m_iBuffPos = 0;
  245. m_iBuffUsed = sphPread ( m_iFD, m_pBuff, iReadLen, iNewPos ); // FIXME! what about throttling?
  246. if ( m_iBuffUsed<0 )
  247. {
  248. m_iBuffUsed = m_iBuffPos = 0;
  249. m_bError = true;
  250. m_sError.SetSprintf ( "pread error in %s: pos=" INT64_FMT ", len=%d, code=%d, msg=%s", m_sFilename.cstr(), (int64_t)iNewPos, iReadLen, errno, strerror(errno) );
  251. return;
  252. }
  253. // all fine, adjust offset and hint
  254. m_iSizeHint -= m_iBuffUsed;
  255. m_iPos = iNewPos;
  256. }
  257. int CSphReader::GetByte()
  258. {
  259. if ( m_iBuffPos>=m_iBuffUsed )
  260. {
  261. UpdateCache();
  262. if ( m_iBuffPos>=m_iBuffUsed )
  263. {
  264. m_bError = true;
  265. m_sError.SetSprintf ( "pread error in %s: pos=" INT64_FMT ", len=%d", m_sFilename.cstr(), (int64_t)m_iPos, 1 );
  266. return 0; // unexpected io failure
  267. }
  268. }
  269. assert ( m_iBuffPos<m_iBuffUsed );
  270. return m_pBuff [ m_iBuffPos++ ];
  271. }
  272. void CSphReader::GetBytes ( void * pData, int64_t iSize )
  273. {
  274. BYTE * pOut = (BYTE*) pData;
  275. while ( iSize>m_iBufSize )
  276. {
  277. int iLen = m_iBuffUsed - m_iBuffPos;
  278. assert ( iLen<=m_iBufSize );
  279. memcpy ( pOut, m_pBuff+m_iBuffPos, iLen );
  280. m_iBuffPos += iLen;
  281. pOut += iLen;
  282. iSize -= iLen;
  283. m_iSizeHint = Max ( m_iReadUnhinted, iSize );
  284. if ( iSize>0 )
  285. {
  286. UpdateCache();
  287. if ( !m_iBuffUsed )
  288. {
  289. m_sError.SetSprintf ( "pread error in %s: pos=" INT64_FMT ", len=" INT64_FMT ", code=%d, msg=%s", m_sFilename.cstr(), (int64_t)m_iPos, iSize, errno, strerror(errno) );
  290. memset ( pData, 0, iSize );
  291. return; // unexpected io failure
  292. }
  293. }
  294. }
  295. if ( iSize>m_iBuffUsed-m_iBuffPos )
  296. {
  297. // move old buffer tail to buffer head to avoid losing the data
  298. const int iLen = m_iBuffUsed - m_iBuffPos;
  299. if ( iLen>0 )
  300. {
  301. memcpy ( pOut, m_pBuff+m_iBuffPos, iLen );
  302. m_iBuffPos += iLen;
  303. pOut += iLen;
  304. iSize -= iLen;
  305. }
  306. m_iSizeHint = Max ( m_iReadUnhinted, iSize );
  307. UpdateCache();
  308. if ( iSize>m_iBuffUsed-m_iBuffPos )
  309. {
  310. memset ( pData, 0, iSize ); // unexpected io failure
  311. m_bError = true;
  312. m_sError.SetSprintf ( "pread error in %s: pos=" INT64_FMT ", len=" INT64_FMT, m_sFilename.cstr(), (int64_t)m_iPos, iSize );
  313. return;
  314. }
  315. }
  316. assert ( (m_iBuffPos+iSize)<=m_iBuffUsed );
  317. memcpy ( pOut, m_pBuff+m_iBuffPos, iSize );
  318. m_iBuffPos += iSize;
  319. }
  320. int CSphReader::GetLine ( char * sBuffer, int iMaxLen )
  321. {
  322. int iOutPos = 0;
  323. iMaxLen--; // reserve space for trailing '\0'
  324. // grab as many chars as we can
  325. while ( iOutPos<iMaxLen )
  326. {
  327. // read next chunk if necessary
  328. if ( m_iBuffPos>=m_iBuffUsed )
  329. {
  330. UpdateCache();
  331. if ( m_iBuffPos>=m_iBuffUsed )
  332. {
  333. if ( iOutPos==0 ) return -1; // current line is empty; indicate eof
  334. break; // return current line; will return eof next time
  335. }
  336. }
  337. // break on CR or LF
  338. if ( m_pBuff[m_iBuffPos]=='\r' || m_pBuff[m_iBuffPos]=='\n' )
  339. break;
  340. // one more valid char
  341. sBuffer[iOutPos++] = m_pBuff[m_iBuffPos++];
  342. }
  343. // skip everything until the newline or eof
  344. while (true)
  345. {
  346. // read next chunk if necessary
  347. if ( m_iBuffPos>=m_iBuffUsed )
  348. UpdateCache();
  349. // eof?
  350. if ( m_iBuffPos>=m_iBuffUsed )
  351. break;
  352. // newline?
  353. if ( m_pBuff[m_iBuffPos++]=='\n' )
  354. break;
  355. }
  356. // finalize
  357. sBuffer[iOutPos] = '\0';
  358. return iOutPos;
  359. }
  360. void CSphReader::ResetError()
  361. {
  362. m_bError = false;
  363. m_sError = "";
  364. }
  365. SphOffset_t CSphReader::GetFilesize() const
  366. {
  367. assert ( m_iFD>=0 );
  368. return sphGetFileSize ( m_iFD, nullptr );
  369. }
  370. #if TRACE_UNZIP
  371. std::array<std::atomic<uint64_t>, 5> CSphReader::m_dZip32Stats = { 0 };
  372. std::array<std::atomic<uint64_t>, 10> CSphReader::m_dZip64Stats = { 0 };
  373. DWORD CSphReader::UnzipInt()
  374. {
  375. DWORD uRes = UnzipValueBE<DWORD> ( [this]() mutable { return GetByte(); } );
  376. m_dZip32Stats[sphCalcZippedLen ( uRes ) - 1].fetch_add ( 1, std::memory_order_relaxed );
  377. return uRes;
  378. }
  379. uint64_t CSphReader::UnzipOffset()
  380. {
  381. uint64_t uRes = UnzipValueBE<uint64_t> ( [this]() mutable { return GetByte(); } );
  382. m_dZip64Stats[sphCalcZippedLen ( uRes ) - 1].fetch_add ( 1, std::memory_order_relaxed );
  383. return uRes;
  384. }
  385. #else
  386. DWORD CSphReader::UnzipInt()
  387. {
  388. return UnzipValueBE<DWORD> ( [this]() mutable { return GetByte(); } );
  389. }
  390. uint64_t CSphReader::UnzipOffset()
  391. {
  392. return UnzipValueBE<uint64_t> ( [this]() mutable { return GetByte(); } );
  393. }
  394. #endif
  395. CSphReader & CSphReader::operator = ( const CSphReader & rhs )
  396. {
  397. SetFile ( rhs.m_iFD, rhs.m_sFilename.cstr() );
  398. SeekTo ( rhs.m_iPos + rhs.m_iBuffPos, rhs.m_iSizeHint );
  399. return *this;
  400. }
  401. DWORD CSphReader::GetDword()
  402. {
  403. DWORD uRes = 0;
  404. GetBytes ( &uRes, sizeof(DWORD) );
  405. return uRes;
  406. }
  407. SphOffset_t CSphReader::GetOffset()
  408. {
  409. SphOffset_t uRes = 0;
  410. GetBytes ( &uRes, sizeof(SphOffset_t) );
  411. return uRes;
  412. }
  413. CSphString CSphReader::GetString()
  414. {
  415. CSphString sRes;
  416. DWORD uLen = GetDword ();
  417. if ( uLen )
  418. {
  419. sRes.Reserve ( uLen );
  420. GetBytes ( (BYTE *) sRes.cstr (), uLen );
  421. }
  422. return sRes;
  423. }
  424. CSphString CSphReader::GetZString ()
  425. {
  426. CSphString sRes;
  427. auto uLen = UnzipOffset();
  428. if ( uLen )
  429. {
  430. sRes.Reserve ( uLen );
  431. GetBytes ( (BYTE *) sRes.cstr (), uLen );
  432. }
  433. return sRes;
  434. }
  435. bool CSphReader::Tag ( const char * sTag )
  436. {
  437. if ( m_bError )
  438. return false;
  439. assert ( sTag && *sTag ); // empty tags are nonsense
  440. assert ( strlen(sTag)<64 ); // huge tags are nonsense
  441. auto iLen = (int) strlen(sTag);
  442. char sBuf[64];
  443. GetBytes ( sBuf, iLen );
  444. if ( !memcmp ( sBuf, sTag, iLen ) )
  445. return true;
  446. m_bError = true;
  447. m_sError.SetSprintf ( "expected tag %s was not found", sTag );
  448. return false;
  449. }
  450. //////////////////////////////////////////////////////////////////////////
  451. bool CSphAutoreader::Open ( const CSphString & sFilename, CSphString & sError )
  452. {
  453. assert ( m_iFD<0 );
  454. assert ( !sFilename.IsEmpty() );
  455. m_iFD = AutoFileOpen ( sFilename, SPH_O_READ );
  456. m_iPos = 0;
  457. m_iBuffPos = 0;
  458. m_iBuffUsed = 0;
  459. m_sFilename = sFilename;
  460. if ( m_iFD<0 )
  461. sError.SetSprintf ( "failed to open %s: %s", sFilename.cstr(), strerror(errno) );
  462. return ( m_iFD>=0 );
  463. }
  464. void CSphAutoreader::Close()
  465. {
  466. if ( m_iFD>=0 )
  467. ::close ( m_iFD );
  468. m_iFD = -1;
  469. }
  470. SphOffset_t FileReader_c::GetFilesize() const
  471. {
  472. assert ( m_iFD>=0 );
  473. return sphGetFileSize ( m_iFD, nullptr );
  474. }
  475. //////////////////////////////////////////////////////////////////////////
  476. void CSphWriter::SetBufferSize ( int iBufferSize )
  477. {
  478. if ( iBufferSize!=m_iBufferSize )
  479. {
  480. m_iBufferSize = Max ( iBufferSize, 262144 );
  481. m_pBuffer = nullptr;
  482. }
  483. }
  484. bool CSphWriter::OpenFile ( const CSphString & sName, CSphString & sErrorBuffer )
  485. {
  486. return OpenFile ( sName, SPH_O_NEW, sErrorBuffer );
  487. }
  488. bool CSphWriter::OpenFile ( const CSphString & sName, int iOpenFlags, CSphString & sErrorBuffer )
  489. {
  490. assert ( !sName.IsEmpty() );
  491. assert ( m_iFD<0 && "already open" );
  492. m_bOwnFile = true;
  493. m_sName = sName;
  494. m_pError = &sErrorBuffer;
  495. if ( !m_pBuffer )
  496. m_pBuffer = std::make_unique<BYTE[]> ( m_iBufferSize );
  497. m_iFD = ::open ( m_sName.cstr(), iOpenFlags, 0644 );
  498. m_pPool = m_pBuffer.get();
  499. m_iPoolUsed = 0;
  500. m_iPos = 0;
  501. m_iDiskPos = 0;
  502. m_bError = ( m_iFD<0 );
  503. if ( m_bError )
  504. m_pError->SetSprintf ( "failed to create %s: %s" , sName.cstr(), strerror(errno) );
  505. return !m_bError;
  506. }
  507. void CSphWriter::SetFile ( CSphAutofile & tAuto, SphOffset_t * pSharedOffset, CSphString & sError )
  508. {
  509. assert ( m_iFD<0 && "already open" );
  510. m_bOwnFile = false;
  511. if ( !m_pBuffer )
  512. m_pBuffer = std::make_unique<BYTE[]> ( m_iBufferSize );
  513. m_iFD = tAuto.GetFD();
  514. m_sName = tAuto.GetFilename();
  515. m_pPool = m_pBuffer.get();
  516. m_iPoolUsed = 0;
  517. m_iPos = 0;
  518. m_iDiskPos = 0;
  519. m_pSharedOffset = pSharedOffset;
  520. m_pError = &sError;
  521. assert ( m_pError );
  522. }
  523. CSphWriter::~CSphWriter()
  524. {
  525. if ( m_bUnlinkNonClosed && m_bOwnFile )
  526. {
  527. if ( m_iFD >= 0 )
  528. ::close ( m_iFD );
  529. ::unlink ( m_sName.cstr() );
  530. } else
  531. CloseFile();
  532. }
  533. void CSphWriter::CloseFile ( bool bTruncate )
  534. {
  535. if ( m_iFD>=0 )
  536. {
  537. Flush();
  538. if ( bTruncate )
  539. sphTruncate ( m_iFD );
  540. if ( m_bOwnFile )
  541. ::close ( m_iFD );
  542. m_iFD = -1;
  543. m_bUnlinkNonClosed = m_bError;
  544. }
  545. }
  546. void CSphWriter::UpdatePoolUsed()
  547. {
  548. if ( m_pPool-m_pBuffer.get() > m_iPoolUsed )
  549. m_iPoolUsed = m_pPool- m_pBuffer.get();
  550. }
  551. void CSphWriter::PutByte ( BYTE uValue )
  552. {
  553. assert ( m_pPool );
  554. if ( m_iPoolUsed==m_iBufferSize )
  555. Flush();
  556. *m_pPool++ = uValue;
  557. UpdatePoolUsed();
  558. m_iPos++;
  559. }
  560. void CSphWriter::PutBytes ( const void * pData, int64_t iSize )
  561. {
  562. assert ( m_pPool );
  563. const BYTE * pBuf = (const BYTE *) pData;
  564. while ( iSize>0 )
  565. {
  566. int iPut = ( iSize<m_iBufferSize ? int(iSize) : m_iBufferSize ); // comparison int64 to int32
  567. if ( m_iPoolUsed+iPut>m_iBufferSize )
  568. Flush();
  569. assert ( m_iPoolUsed+iPut<=m_iBufferSize );
  570. memcpy ( m_pPool, pBuf, iPut );
  571. m_pPool += iPut;
  572. UpdatePoolUsed();
  573. m_iPos += iPut;
  574. pBuf += iPut;
  575. iSize -= iPut;
  576. }
  577. }
  578. void CSphWriter::ZipInt ( DWORD uValue )
  579. {
  580. ZipValueBE ( [this] ( BYTE b ) { PutByte ( b ); }, uValue );
  581. }
  582. void CSphWriter::ZipOffset ( uint64_t uValue )
  583. {
  584. ZipValueBE ( [this] ( BYTE b ) { PutByte ( b ); }, uValue );
  585. }
  586. void CSphWriter::Flush()
  587. {
  588. if ( m_pSharedOffset && *m_pSharedOffset!=m_iDiskPos )
  589. {
  590. auto uMoved = sphSeek ( m_iFD, m_iDiskPos, SEEK_SET );
  591. if ( uMoved!= m_iDiskPos )
  592. {
  593. m_bError = true;
  594. return;
  595. }
  596. }
  597. if ( !sphWriteThrottled ( m_iFD, m_pBuffer.get(), m_iPoolUsed, m_sName.cstr(), *m_pError ) )
  598. m_bError = true;
  599. m_iDiskPos += m_iPoolUsed;
  600. m_iPoolUsed = 0;
  601. m_pPool = m_pBuffer.get();
  602. if ( m_pSharedOffset )
  603. *m_pSharedOffset = m_iDiskPos;
  604. }
  605. void CSphWriterNonThrottled::Flush ()
  606. {
  607. if ( m_pSharedOffset && *m_pSharedOffset!=m_iDiskPos )
  608. {
  609. auto uMoved = sphSeek ( m_iFD, m_iDiskPos, SEEK_SET );
  610. if ( uMoved!=m_iDiskPos )
  611. {
  612. m_bError = true;
  613. return;
  614. }
  615. }
  616. if ( !WriteNonThrottled ( m_iFD, m_pBuffer.get (), m_iPoolUsed, m_sName.cstr (), *m_pError ) )
  617. m_bError = true;
  618. m_iDiskPos += m_iPoolUsed;
  619. m_iPoolUsed = 0;
  620. m_pPool = m_pBuffer.get ();
  621. if ( m_pSharedOffset )
  622. *m_pSharedOffset = m_iDiskPos;
  623. }
  624. void CSphWriter::PutString ( const char * szString )
  625. {
  626. int iLen = szString ? (int) strlen ( szString ) : 0;
  627. PutDword ( iLen );
  628. if ( iLen )
  629. PutBytes ( szString, iLen );
  630. }
  631. void CSphWriter::PutString ( const CSphString & sString )
  632. {
  633. int iLen = sString.Length();
  634. PutDword ( iLen );
  635. if ( iLen )
  636. PutBytes ( sString.cstr(), iLen );
  637. }
  638. void CSphWriter::PutZString ( const char * szString )
  639. {
  640. int iLen = szString ? (int) strlen ( szString ) : 0;
  641. ZipOffset ( iLen );
  642. if ( iLen )
  643. PutBytes ( szString, iLen );
  644. }
  645. void CSphWriter::PutZString ( const CSphString & sString )
  646. {
  647. int iLen = sString.Length ();
  648. ZipOffset ( iLen );
  649. if ( iLen )
  650. PutBytes ( sString.cstr (), iLen );
  651. }
  652. void CSphWriter::Tag ( const char * sTag )
  653. {
  654. assert ( sTag && *sTag ); // empty tags are nonsense
  655. assert ( strlen(sTag)<64 ); // huge tags are nonsense
  656. PutBytes ( sTag, strlen(sTag) );
  657. }
  658. bool SeekAndWarn ( int iFD, SphOffset_t iPos, const char * szWarnPrefix )
  659. {
  660. assert ( szWarnPrefix );
  661. auto iSeek = sphSeek ( iFD, iPos, SEEK_SET );
  662. if ( iSeek!=iPos )
  663. {
  664. if ( iSeek<0 )
  665. sphWarning ( "%s : seek error. Error: %d '%s'", szWarnPrefix, errno, strerrorm (errno) );
  666. else
  667. sphWarning ( "%s : seek error. Expected: " INT64_FMT ", got " INT64_FMT, szWarnPrefix, (int64_t) iPos, (int64_t) iSeek );
  668. return false;
  669. }
  670. assert ( iSeek==iPos );
  671. return true;
  672. }
  673. void CSphWriter::SeekTo ( SphOffset_t iPos, bool bTruncate )
  674. {
  675. assert ( iPos>=0 );
  676. if ( iPos>=m_iDiskPos && iPos<=( m_iDiskPos + m_iPoolUsed ) )
  677. {
  678. // seeking inside the buffer
  679. // m_iPoolUsed should be always in sync with m_iPos
  680. // or it breaks seek back at cidxHit
  681. m_iPoolUsed = (int)( iPos - m_iDiskPos );
  682. m_pPool = m_pBuffer.get() + m_iPoolUsed;
  683. } else
  684. {
  685. Flush();
  686. SeekAndWarn ( m_iFD, iPos, "CSphWriter::SeekTo" );
  687. if ( bTruncate )
  688. sphTruncate(m_iFD);
  689. m_pPool = m_pBuffer.get();
  690. m_iPoolUsed = 0;
  691. m_iDiskPos = iPos;
  692. }
  693. m_iPos = iPos;
  694. }
  695. //////////////////////////////////////////////////////////////////////////
  696. static int g_iIOpsDelay = 0;
  697. static const int g_iLimitIOSize = ( 1UL << 30 ); // same as write chunk limit 1GB:
  698. // on Linux, read()/write() will transfer at most 0x7ffff000 bytes (on both 32 and 64 bit systems).
  699. static int g_iMaxIOSize = g_iLimitIOSize;
  700. static std::atomic<int64_t> g_tmNextIOTime { 0 };
  701. void sphSetThrottling ( int iMaxIOps, int iMaxIOSize )
  702. {
  703. g_iIOpsDelay = iMaxIOps ? 1000000 / iMaxIOps : iMaxIOps;
  704. g_iMaxIOSize = iMaxIOSize ? Clamp ( 1, g_iLimitIOSize, iMaxIOSize ) : g_iLimitIOSize;
  705. }
  706. static inline void ThrottleSleep()
  707. {
  708. if ( !g_iIOpsDelay )
  709. return;
  710. auto tmTimer = sphMicroTimer();
  711. while ( tmTimer < g_tmNextIOTime.load ( std::memory_order_relaxed ) ) // m.b. >1 sleeps if another thread more lucky
  712. {
  713. sphSleepMsec ( (int)( g_tmNextIOTime.load ( std::memory_order_relaxed ) - tmTimer ) / 1000 );
  714. tmTimer = sphMicroTimer();
  715. }
  716. g_tmNextIOTime.store ( tmTimer + g_iIOpsDelay, std::memory_order_relaxed );
  717. }
  718. bool sphWriteThrottled ( int iFD, const void* pBuf, int64_t iCount, const char* sName, CSphString& sError )
  719. {
  720. if ( iCount <= 0 )
  721. return true;
  722. // by default, slice ios by at most 1 GB
  723. int iChunkSize = ( 1UL << 30 );
  724. // when there's a sane max_iosize (4K to 1GB), use it
  725. if ( g_iMaxIOSize >= 4096 )
  726. iChunkSize = Min ( iChunkSize, g_iMaxIOSize );
  727. CSphIOStats* pIOStats = GetIOStats();
  728. int64_t iTotalWritten = 0;
  729. const int64_t iTotalCount = iCount;
  730. // while there's data, write it chunk by chunk
  731. auto* p = (const BYTE*)pBuf;
  732. while ( iCount )
  733. {
  734. // wait for a timely occasion
  735. ThrottleSleep();
  736. // write (and maybe time)
  737. int64_t tmTimer = 0;
  738. if ( pIOStats )
  739. tmTimer = sphMicroTimer();
  740. auto iToWrite = (int)Min ( iCount, iChunkSize );
  741. auto iWritten = (int)::write ( iFD, &p[iTotalWritten], iToWrite );
  742. if ( pIOStats )
  743. {
  744. pIOStats->m_iWriteTime += sphMicroTimer() - tmTimer;
  745. pIOStats->m_iWriteOps++;
  746. pIOStats->m_iWriteBytes += iWritten;
  747. }
  748. if ( sphInterrupted() && iWritten != iToWrite )
  749. {
  750. sError.SetSprintf ( "%s: write interrupted: %d of %d bytes written", sName, iWritten, iToWrite );
  751. return false;
  752. }
  753. // failure? report, bailout
  754. if ( iWritten<0 )
  755. {
  756. if ( iTotalWritten!=iTotalCount )
  757. sError.SetSprintf ( "%s: write error: %s", sName, strerrorm ( errno ) );
  758. else
  759. sError.SetSprintf ( "%s: write error: %s; " INT64_FMT " of " INT64_FMT " bytes written", sName, strerrorm ( errno ), iTotalWritten, iTotalCount );
  760. return false;
  761. }
  762. // success? rinse, repeat
  763. iCount -= iWritten;
  764. iTotalWritten += iWritten;
  765. }
  766. return true;
  767. }
  768. bool WriteNonThrottled ( int iFD, const void * pBuf, int64_t iCount, const char * sName, CSphString & sError )
  769. {
  770. if ( iCount<=0 )
  771. return true;
  772. CSphIOStats * pIOStats = GetIOStats ();
  773. int64_t iTotalWritten = 0;
  774. const int64_t iTotalCount = iCount;
  775. // while there's data, write it chunk by chunk
  776. auto * p = (const BYTE *) pBuf;
  777. while ( iCount )
  778. {
  779. int64_t tmTimer = 0;
  780. if ( pIOStats )
  781. tmTimer = sphMicroTimer ();
  782. auto iToWrite = (int) Min ( iCount, 1UL << 30 );
  783. auto iWritten = (int) ::write ( iFD, &p[iTotalWritten], iToWrite );
  784. if ( pIOStats )
  785. {
  786. pIOStats->m_iWriteTime += sphMicroTimer ()-tmTimer;
  787. pIOStats->m_iWriteOps++;
  788. pIOStats->m_iWriteBytes += iWritten;
  789. }
  790. if ( sphInterrupted () && iWritten!=iToWrite )
  791. {
  792. sError.SetSprintf ( "%s: write interrupted: %d of %d bytes written", sName, iWritten, iToWrite );
  793. return false;
  794. }
  795. // failure? report, bailout
  796. if ( iWritten<0 )
  797. {
  798. if ( iTotalWritten!=iTotalCount )
  799. sError.SetSprintf ( "%s: write error: %s", sName, strerrorm ( errno ) );
  800. else
  801. sError.SetSprintf ( "%s: write error: %s; " INT64_FMT " of " INT64_FMT " bytes written", sName, strerrorm ( errno ), iTotalWritten, iTotalCount );
  802. return false;
  803. }
  804. // success? rinse, repeat
  805. iCount -= iWritten;
  806. iTotalWritten += iWritten;
  807. }
  808. return true;
  809. }
  810. size_t sphReadThrottled ( int iFD, void* pBuf, size_t iCount )
  811. {
  812. if ( iCount <= 0 )
  813. return iCount;
  814. auto iStep = Min ( iCount, (size_t)g_iMaxIOSize ); // Now always 0 < g_iMaxIOSize < 1 GB
  815. auto* p = (BYTE*)pBuf;
  816. size_t nBytesToRead = iCount;
  817. while ( iCount && !sphInterrupted() )
  818. {
  819. ThrottleSleep();
  820. auto iChunk = (long)Min ( iCount, iStep );
  821. auto iRead = sphRead ( iFD, p, iChunk );
  822. p += iRead;
  823. iCount -= iRead;
  824. if ( iRead != iChunk )
  825. break;
  826. }
  827. return nBytesToRead - iCount; // FIXME? we sure this is under 2gb?
  828. }
  829. //////////////////////////////////////////////////////////////////////////
  830. #if _WIN32
  831. // atomic seek+read for Windows
  832. int sphPread ( int iFD, void * pBuf, int iBytes, SphOffset_t iOffset )
  833. {
  834. if ( iBytes==0 )
  835. return 0;
  836. CSphIOStats * pIOStats = GetIOStats();
  837. int64_t tmStart = 0;
  838. if ( pIOStats )
  839. tmStart = sphMicroTimer();
  840. HANDLE hFile;
  841. hFile = (HANDLE) _get_osfhandle ( iFD );
  842. if ( hFile==INVALID_HANDLE_VALUE )
  843. return -1;
  844. STATIC_SIZE_ASSERT ( SphOffset_t, 8 );
  845. OVERLAPPED tOverlapped = { 0 };
  846. tOverlapped.Offset = (DWORD)( iOffset & I64C(0xffffffff) );
  847. tOverlapped.OffsetHigh = (DWORD)( iOffset>>32 );
  848. DWORD uRes;
  849. if ( !ReadFile ( hFile, pBuf, iBytes, &uRes, &tOverlapped ) )
  850. {
  851. DWORD uErr = GetLastError();
  852. if ( uErr==ERROR_HANDLE_EOF )
  853. return 0;
  854. errno = uErr; // FIXME! should remap from Win to POSIX
  855. return -1;
  856. }
  857. if ( pIOStats )
  858. {
  859. pIOStats->m_iReadTime += sphMicroTimer() - tmStart;
  860. pIOStats->m_iReadOps++;
  861. pIOStats->m_iReadBytes += iBytes;
  862. }
  863. return uRes;
  864. }
  865. #else
  866. #if HAVE_PREAD
  867. // atomic seek+read for non-Windows systems with pread() call
  868. int sphPread ( int iFD, void * pBuf, int iBytes, SphOffset_t iOffset )
  869. {
  870. CSphIOStats * pIOStats = GetIOStats();
  871. if ( !pIOStats )
  872. return ::pread ( iFD, pBuf, iBytes, iOffset );
  873. int64_t tmStart = sphMicroTimer();
  874. int iRes = (int) ::pread ( iFD, pBuf, iBytes, iOffset );
  875. if ( pIOStats )
  876. {
  877. pIOStats->m_iReadTime += sphMicroTimer() - tmStart;
  878. pIOStats->m_iReadOps++;
  879. pIOStats->m_iReadBytes += iBytes;
  880. }
  881. return iRes;
  882. }
  883. #else
  884. // generic fallback; prone to races between seek and read
  885. int sphPread ( int iFD, void * pBuf, int iBytes, SphOffset_t iOffset )
  886. {
  887. if ( sphSeek ( iFD, iOffset, SEEK_SET )==-1 )
  888. return -1;
  889. return sphReadThrottled ( iFD, pBuf, iBytes, &g_tThrottle );
  890. }
  891. #endif // HAVE_PREAD
  892. #endif // _WIN32