searchdaemon.cpp 40 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348134913501351135213531354135513561357135813591360136113621363136413651366136713681369137013711372137313741375137613771378137913801381138213831384138513861387138813891390139113921393139413951396139713981399140014011402140314041405140614071408140914101411141214131414141514161417141814191420142114221423142414251426142714281429143014311432143314341435143614371438143914401441144214431444144514461447144814491450145114521453145414551456145714581459146014611462146314641465146614671468146914701471147214731474147514761477147814791480148114821483148414851486148714881489149014911492149314941495149614971498149915001501150215031504150515061507150815091510151115121513151415151516151715181519152015211522152315241525152615271528152915301531153215331534153515361537153815391540154115421543154415451546154715481549155015511552155315541555155615571558155915601561156215631564156515661567156815691570157115721573157415751576157715781579158015811582158315841585158615871588158915901591159215931594159515961597159815991600160116021603160416051606160716081609161016111612161316141615161616171618161916201621162216231624162516261627162816291630163116321633163416351636163716381639164016411642164316441645164616471648164916501651165216531654165516561657165816591660166116621663166416651666166716681669167016711672167316741675167616771678167916801681168216831684168516861687168816891690169116921693169416951696
  1. //
  2. // Copyright (c) 2017-2020, Manticore Software LTD (http://manticoresearch.com)
  3. // Copyright (c) 2001-2016, Andrew Aksyonoff
  4. // Copyright (c) 2008-2016, Sphinx Technologies Inc
  5. // All rights reserved
  6. //
  7. // This program is free software; you can redistribute it and/or modify
  8. // it under the terms of the GNU General Public License. You should have
  9. // received a copy of the GPL license along with this program; if you
  10. // did not, you can find it at http://www.gnu.org/
  11. //
  12. /// @file searchdaemon.cpp
  13. /// Definitions for the stuff need by searchd to work and serve the indexes.
  14. #include "sphinxstd.h"
  15. #include "searchdaemon.h"
  16. #include "optional.h"
  17. #if USE_WINDOWS
  18. #define USE_PSI_INTERFACE 1
  19. // for MAC address
  20. #include <iphlpapi.h>
  21. #pragma comment(lib, "IPHLPAPI.lib")
  22. #else
  23. #include <netdb.h>
  24. // for MAC address
  25. #include <net/if.h>
  26. #include <sys/ioctl.h>
  27. #include <net/ethernet.h>
  28. #endif
  29. // for FreeBSD
  30. #if defined(__FreeBSD__)
  31. #include <sys/sysctl.h>
  32. #include <net/route.h>
  33. #include <net/if_dl.h>
  34. #include <netinet/in.h>
  35. #endif
  36. #include <cmath>
// Reference to the flag returned by sphGetGotSigterm(); the socket helpers in this
// file (sphSockRead, NetOutputBuffer_c::Flush, NetInputBuffer_c::ReadFrom) check it
// to abort waits once SIGTERM has been received.
static auto& g_bGotSigterm = sphGetGotSigterm(); // we just received SIGTERM; need to shutdown
  38. /////////////////////////////////////////////////////////////////////////////
  39. // MISC GLOBALS
  40. /////////////////////////////////////////////////////////////////////////////
  41. // 'like' matcher
  42. CheckLike::CheckLike( const char* sPattern )
  43. {
  44. if ( !sPattern )
  45. return;
  46. m_sPattern.Reserve( 2 * (int) strlen( sPattern ));
  47. char* d = const_cast<char*> ( m_sPattern.cstr());
  48. // remap from SQL LIKE syntax to Sphinx wildcards syntax
  49. // '_' maps to '?', match any single char
  50. // '%' maps to '*', match zero or mor chars
  51. for ( const char* s = sPattern; *s; s++ )
  52. {
  53. switch ( *s )
  54. {
  55. case '_': *d++ = '?';
  56. break;
  57. case '%': *d++ = '*';
  58. break;
  59. case '?': *d++ = '\\';
  60. *d++ = '?';
  61. break;
  62. case '*': *d++ = '\\';
  63. *d++ = '*';
  64. break;
  65. default: *d++ = *s;
  66. break;
  67. }
  68. }
  69. *d = '\0';
  70. }
  71. bool CheckLike::Match( const char* sValue )
  72. {
  73. return sValue && ( m_sPattern.IsEmpty() || sphWildcardMatch( sValue, m_sPattern.cstr()));
  74. }
  75. // string vector with 'like' matcher
  76. /////////////////////////////////////////////////////////////////////////////
  77. VectorLike::VectorLike()
  78. : CheckLike( nullptr )
  79. {}
  80. VectorLike::VectorLike( const CSphString& sPattern )
  81. : CheckLike( sPattern.cstr()), m_sColKey( "Variable_name" ), m_sColValue( "Value" )
  82. {}
  83. const char* VectorLike::szColKey() const
  84. {
  85. return m_sColKey.cstr();
  86. }
  87. const char* VectorLike::szColValue() const
  88. {
  89. return m_sColValue.cstr();
  90. }
  91. bool VectorLike::MatchAdd( const char* sValue )
  92. {
  93. if ( Match( sValue ))
  94. {
  95. Add( sValue );
  96. return true;
  97. }
  98. return false;
  99. }
  100. bool VectorLike::MatchAddVa( const char* sTemplate, ... )
  101. {
  102. va_list ap;
  103. CSphString sValue;
  104. va_start ( ap, sTemplate );
  105. sValue.SetSprintfVa( sTemplate, ap );
  106. va_end ( ap );
  107. return MatchAdd( sValue.cstr());
  108. }
  109. const char* g_dIndexTypeName[1 + ( int ) IndexType_e::ERROR_] = {
  110. "plain",
  111. "template",
  112. "rt",
  113. "percolate",
  114. "distributed",
  115. "invalid"
  116. };
  117. CSphString GetTypeName( IndexType_e eType )
  118. {
  119. return g_dIndexTypeName[( int ) eType];
  120. }
  121. IndexType_e TypeOfIndexConfig( const CSphString& sType )
  122. {
  123. if ( sType=="distributed" )
  124. return IndexType_e::DISTR;
  125. if ( sType=="rt" )
  126. return IndexType_e::RT;
  127. if ( sType=="percolate" )
  128. return IndexType_e::PERCOLATE;
  129. if ( sType=="template" )
  130. return IndexType_e::TEMPLATE;
  131. if (( sType.IsEmpty() || sType=="plain" ))
  132. return IndexType_e::PLAIN;
  133. return IndexType_e::ERROR_;
  134. }
  135. void CheckPort( int iPort )
  136. {
  137. if ( !IsPortInRange( iPort ))
  138. sphFatal( "port %d is out of range", iPort );
  139. }
  140. // check only proto name in lowcase, no '_vip'
  141. static Proto_e SimpleProtoByName ( const CSphString& sProto )
  142. {
  143. if ( sProto=="sphinx" )
  144. return Proto_e::SPHINX;
  145. if ( sProto=="mysql41" || sProto=="mysql" )
  146. return Proto_e::MYSQL41;
  147. if ( sProto=="http" )
  148. return Proto_e::HTTP;
  149. if ( sProto=="https" )
  150. return Proto_e::HTTPS;
  151. if ( sProto=="replication" )
  152. return Proto_e::REPLICATION;
  153. sphFatal( "unknown listen protocol type '%s'", sProto.scstr());
  154. return Proto_e::SPHINX;
  155. }
  156. static void ProtoByName( CSphString sFullProto, ListenerDesc_t& tDesc )
  157. {
  158. sFullProto.ToLower();
  159. StrVec_t dParts;
  160. sphSplit( dParts, sFullProto.cstr(), "_" );
  161. if ( !dParts.IsEmpty() )
  162. tDesc.m_eProto = SimpleProtoByName( dParts[0] );
  163. if ( dParts.GetLength()==1 )
  164. return;
  165. if ( dParts.GetLength()==2 && dParts[1]=="vip" )
  166. {
  167. tDesc.m_bVIP = true;
  168. return;
  169. }
  170. sphFatal( "unknown listen protocol type '%s'", sFullProto.scstr() );
  171. }
  172. /// listen = ( address ":" port | port | path | address ":" port start - port end ) [ ":" protocol ] [ "_vip" ]
  173. ListenerDesc_t ParseListener( const char* sSpec )
  174. {
  175. ListenerDesc_t tRes;
  176. tRes.m_eProto = Proto_e::SPHINX;
  177. tRes.m_uIP = htonl(INADDR_ANY);
  178. tRes.m_iPort = SPHINXAPI_PORT;
  179. tRes.m_iPortsCount = 0;
  180. tRes.m_bVIP = false;
  181. // split by colon
  182. auto dParts = sphSplit( sSpec, ":" ); // diff. parts are :-separated
  183. int iParts = dParts.GetLength();
  184. if ( iParts>3 )
  185. sphFatal( "invalid listen format (too many fields)" );
  186. assert ( iParts>=1 && iParts<=3 );
  187. // handle UNIX socket case
  188. // might be either name on itself (1 part), or name+protocol (2 parts)
  189. if ( *dParts[0].scstr()=='/' )
  190. {
  191. if ( iParts>2 )
  192. sphFatal( "invalid listen format (too many fields)" );
  193. if ( iParts==2 )
  194. ProtoByName( dParts[1], tRes );
  195. tRes.m_sUnix = dParts[0];
  196. return tRes;
  197. }
  198. // check if it all starts with a valid port number
  199. auto sPart = dParts[0].cstr();
  200. auto iLen = (int) strlen( sPart );
  201. bool bAllDigits = true;
  202. for ( int i = 0; i<iLen && bAllDigits; ++i )
  203. if ( !isdigit( sPart[i] ))
  204. bAllDigits = false;
  205. int iPort = 0;
  206. if ( bAllDigits && iLen<=5 ) // if we have num from only digits, it may be only port, nothing else!
  207. {
  208. iPort = atol( sPart );
  209. CheckPort( iPort ); // lets forbid ambiguous magic like 0:sphinx or 99999:mysql41
  210. }
  211. // handle TCP port case
  212. // one part. might be either port name, or host name (unix socked case is already parsed)
  213. if ( iParts==1 )
  214. {
  215. if ( iPort )
  216. // port name on itself
  217. tRes.m_iPort = iPort;
  218. else
  219. // host name on itself
  220. tRes.m_uIP = sphGetAddress( sSpec, GETADDR_STRICT );
  221. return tRes;
  222. }
  223. // two or three parts
  224. if ( iPort )
  225. {
  226. // 1st part is a valid port number; must be port:proto
  227. if ( iParts!=2 )
  228. sphFatal( "invalid listen format (expected port:proto, got extra trailing part in listen=%s)", sSpec );
  229. tRes.m_iPort = iPort;
  230. ProtoByName( dParts[1], tRes );
  231. return tRes;
  232. }
  233. // 1st part must be a host name; must be host:port[:proto]
  234. if ( iParts==3 )
  235. ProtoByName( dParts[2], tRes );
  236. tRes.m_uIP = dParts[0].IsEmpty()
  237. ? htonl(INADDR_ANY)
  238. : sphGetAddress( dParts[0].cstr(), GETADDR_STRICT );
  239. auto dPorts = sphSplit( dParts[1].scstr(), "-" );
  240. tRes.m_iPort = atoi( dPorts[0].cstr());
  241. CheckPort( tRes.m_iPort );
  242. if ( dPorts.GetLength()==2 )
  243. {
  244. int iPortsEnd = atoi( dPorts[1].scstr() );
  245. CheckPort( iPortsEnd );
  246. if ( iPortsEnd<=tRes.m_iPort )
  247. sphFatal( "ports range invalid %d-%d", iPort, iPortsEnd );
  248. if (( iPortsEnd - tRes.m_iPort )<2 )
  249. sphFatal( "ports range %d-%d count should be at least 2, got %d", iPort, iPortsEnd,
  250. iPortsEnd - iPort );
  251. tRes.m_iPortsCount = iPortsEnd - tRes.m_iPort;
  252. }
  253. return tRes;
  254. }
  255. /////////////////////////////////////////////////////////////////////////////
  256. // NETWORK SOCKET WRAPPERS
  257. /////////////////////////////////////////////////////////////////////////////
  258. #if USE_WINDOWS
  259. const char * sphSockError ( int iErr )
  260. {
  261. if ( iErr==0 )
  262. iErr = WSAGetLastError ();
  263. static char sBuf [ 256 ];
  264. _snprintf ( sBuf, sizeof(sBuf), "WSA error %d", iErr );
  265. return sBuf;
  266. }
  267. #else
  268. const char* sphSockError( int )
  269. {
  270. return strerrorm(errno);
  271. }
  272. #endif
  273. int sphSockGetErrno()
  274. {
  275. #if USE_WINDOWS
  276. return WSAGetLastError();
  277. #else
  278. return errno;
  279. #endif
  280. }
  281. void sphSockSetErrno( int iErr )
  282. {
  283. #if USE_WINDOWS
  284. WSASetLastError ( iErr );
  285. #else
  286. errno = iErr;
  287. #endif
  288. }
  289. int sphSockPeekErrno()
  290. {
  291. int iRes = sphSockGetErrno();
  292. sphSockSetErrno( iRes );
  293. return iRes;
  294. }
  295. int sphSetSockNB( int iSock )
  296. {
  297. #if USE_WINDOWS
  298. u_long uMode = 1;
  299. return ioctlsocket ( iSock, FIONBIO, &uMode );
  300. #else
  301. return fcntl( iSock, F_SETFL, O_NONBLOCK );
  302. #endif
  303. }
  304. int RecvNBChunk( int iSock, char*& pBuf, int& iLeftBytes )
  305. {
  306. // try to receive next chunk
  307. auto iRes = sphSockRecv ( iSock, pBuf, iLeftBytes );
  308. if ( iRes>0 )
  309. {
  310. pBuf += iRes;
  311. iLeftBytes -= iRes;
  312. }
  313. return ( int ) iRes;
  314. }
  315. /// wait until socket is readable or writable
  316. int sphPoll( int iSock, int64_t tmTimeout, bool bWrite )
  317. {
  318. // don't need any epoll/kqueue here, since we check only 1 socket
  319. #if HAVE_POLL
  320. struct pollfd pfd;
  321. pfd.fd = iSock;
  322. pfd.events = bWrite ? POLLOUT : POLLIN;
  323. return ::poll( &pfd, 1, int( tmTimeout / 1000 ));
  324. #else
  325. fd_set fdSet;
  326. FD_ZERO ( &fdSet );
  327. sphFDSet ( iSock, &fdSet );
  328. struct timeval tv;
  329. tv.tv_sec = (int)( tmTimeout / 1000000 );
  330. tv.tv_usec = (int)( tmTimeout % 1000000 );
  331. return ::select ( iSock+1, bWrite ? NULL : &fdSet, bWrite ? &fdSet : NULL, NULL, &tv );
  332. #endif
  333. }
  334. #if USE_WINDOWS
  335. /// on Windows, the wrapper just prevents the warnings
  336. #pragma warning(push) // store current warning values
  337. #pragma warning(disable:4127) // conditional expr is const
  338. #pragma warning(disable:4389) // signed/unsigned mismatch
  339. void sphFDSet ( int fd, fd_set * fdset )
  340. {
  341. FD_SET ( fd, fdset );
  342. }
  343. void sphFDClr ( int fd, fd_set * fdset )
  344. {
  345. FD_SET ( fd, fdset );
  346. }
  347. #pragma warning(pop) // restore warnings
  348. #else // !USE_WINDOWS
  349. #define SPH_FDSET_OVERFLOW( _fd ) ( (_fd)<0 || (_fd)>=(int)FD_SETSIZE )
  350. /// on UNIX, we also check that the descript won't corrupt the stack
  351. void sphFDSet( int fd, fd_set* set )
  352. {
  353. if ( SPH_FDSET_OVERFLOW( fd ))
  354. sphFatal( "sphFDSet() failed fd=%d, FD_SETSIZE=%d", fd, FD_SETSIZE );
  355. else
  356. FD_SET ( fd, set );
  357. }
  358. void sphFDClr( int fd, fd_set* set )
  359. {
  360. if ( SPH_FDSET_OVERFLOW( fd ))
  361. sphFatal( "sphFDClr() failed fd=%d, FD_SETSIZE=%d", fd, FD_SETSIZE );
  362. else
  363. FD_CLR ( fd, set );
  364. }
  365. #endif // USE_WINDOWS
  366. DWORD sphGetAddress( const char* sHost, bool bFatal, bool bIP )
  367. {
  368. struct addrinfo tHints, * pResult = nullptr;
  369. memset( &tHints, 0, sizeof( tHints ));
  370. tHints.ai_family = AF_INET;
  371. tHints.ai_socktype = SOCK_STREAM;
  372. if ( bIP )
  373. tHints.ai_flags = AI_NUMERICHOST;
  374. int iResult = getaddrinfo( sHost, nullptr, &tHints, &pResult );
  375. auto pOrigResult = pResult;
  376. if ( iResult!=0 || !pResult )
  377. {
  378. if ( bFatal )
  379. sphFatal( "no AF_INET address found for: %s", sHost );
  380. else
  381. sphLogDebugv( "no AF_INET address found for: %s", sHost );
  382. return 0;
  383. }
  384. assert ( pResult );
  385. auto* pSockaddr_ipv4 = ( struct sockaddr_in* ) pResult->ai_addr;
  386. DWORD uAddr = pSockaddr_ipv4->sin_addr.s_addr;
  387. if ( pResult->ai_next )
  388. {
  389. StringBuilder_c sBuf( "; ip=", "ip=" );
  390. for ( ; pResult->ai_next; pResult = pResult->ai_next )
  391. {
  392. char sAddrBuf[SPH_ADDRESS_SIZE];
  393. auto* pAddr = ( struct sockaddr_in* ) pResult->ai_addr;
  394. DWORD uNextAddr = pAddr->sin_addr.s_addr;
  395. sphFormatIP( sAddrBuf, sizeof( sAddrBuf ), uNextAddr );
  396. sBuf << sAddrBuf;
  397. }
  398. sphWarning( "multiple addresses found for '%s', using the first one (%s)", sHost, sBuf.cstr());
  399. }
  400. freeaddrinfo( pOrigResult );
  401. return uAddr;
  402. }
  403. /// formats IP address given in network byte order into sBuffer
  404. /// returns the buffer
  405. char* sphFormatIP( char* sBuffer, int iBufferSize, DWORD uAddress )
  406. {
  407. const BYTE* a = ( const BYTE* ) &uAddress;
  408. snprintf( sBuffer, iBufferSize, "%u.%u.%u.%u", a[0], a[1], a[2], a[3] );
  409. return sBuffer;
  410. }
  411. bool IsPortInRange( int iPort )
  412. {
  413. return ( iPort>0 ) && ( iPort<=0xFFFF );
  414. }
  415. int sphSockRead( int iSock, void* buf, int iLen, int iReadTimeout, bool bIntr )
  416. {
  417. assert ( iLen>0 );
  418. int64_t tmMaxTimer = sphMicroTimer() + I64C( 1000000 ) * Max( 1, iReadTimeout ); // in microseconds
  419. int iLeftBytes = iLen; // bytes to read left
  420. auto pBuf = ( char* ) buf;
  421. int iErr = 0;
  422. int iRes = -1;
  423. while ( iLeftBytes>0 )
  424. {
  425. int64_t tmMicroLeft = tmMaxTimer - sphMicroTimer();
  426. if ( tmMicroLeft<=0 )
  427. break; // timed out
  428. #if USE_WINDOWS
  429. // Windows EINTR emulation
  430. // Ctrl-C will not interrupt select on Windows, so let's handle that manually
  431. // forcibly limit select() to 100 ms, and check flag afterwards
  432. if ( bIntr )
  433. tmMicroLeft = Min ( tmMicroLeft, 100000 );
  434. #endif
  435. // wait until there is data
  436. iRes = sphPoll( iSock, tmMicroLeft );
  437. // if there was EINTR, retry
  438. // if any other error, bail
  439. if ( iRes==-1 )
  440. {
  441. // only let SIGTERM (of all them) to interrupt, and only if explicitly allowed
  442. iErr = sphSockGetErrno();
  443. if ( iErr==EINTR )
  444. {
  445. if ( !( g_bGotSigterm && bIntr ))
  446. continue;
  447. sphLogDebug( "sphSockRead: select got SIGTERM, exit -1" );
  448. }
  449. return -1;
  450. }
  451. // if there was a timeout, report it as an error
  452. if ( iRes==0 )
  453. {
  454. #if USE_WINDOWS
  455. // Windows EINTR emulation
  456. if ( bIntr )
  457. {
  458. // got that SIGTERM
  459. if ( g_bGotSigterm )
  460. {
  461. sphLogDebug ( "sphSockRead: got SIGTERM emulation on Windows, exit -1" );
  462. sphSockSetErrno ( EINTR );
  463. return -1;
  464. }
  465. // timeout might not be fully over just yet, so re-loop
  466. continue;
  467. }
  468. #endif
  469. sphSockSetErrno( ETIMEDOUT );
  470. return -1;
  471. }
  472. // try to receive next chunk
  473. iRes = RecvNBChunk( iSock, pBuf, iLeftBytes );
  474. // if there was eof, we're done
  475. if ( !iRes )
  476. {
  477. sphSockSetErrno( ECONNRESET );
  478. return -1;
  479. }
  480. // if there was EINTR, retry
  481. // if any other error, bail
  482. if ( iRes==-1 )
  483. {
  484. // only let SIGTERM (of all them) to interrupt, and only if explicitly allowed
  485. iErr = sphSockGetErrno();
  486. if ( iErr==EINTR )
  487. {
  488. if ( !( g_bGotSigterm && bIntr ))
  489. continue;
  490. sphLogDebug( "sphSockRead: select got SIGTERM, exit -1" );
  491. }
  492. return -1;
  493. }
  494. // avoid partial buffer loss in case of signal during the 2nd (!) read
  495. bIntr = false;
  496. }
  497. // if there was a timeout, report it as an error
  498. if ( iLeftBytes!=0 )
  499. {
  500. sphSockSetErrno( ETIMEDOUT );
  501. return -1;
  502. }
  503. return iLen;
  504. }
  505. int SockReadFast( int iSock, void* buf, int iLen, int iReadTimeout )
  506. {
  507. auto pBuf = ( char* ) buf;
  508. int iFullLen = iLen;
  509. // try to receive available chunk
  510. int iChunk = RecvNBChunk( iSock, pBuf, iLen );
  511. if ( !iLen ) // all read in one-shot
  512. {
  513. assert ( iChunk==iFullLen );
  514. return iFullLen;
  515. }
  516. auto iRes = sphSockRead( iSock, pBuf, iLen, iReadTimeout, false );
  517. if ( iRes>=0 )
  518. iRes += iChunk;
  519. return iRes;
  520. }
  521. /////////////////////////////////////////////////////////////////////////////
  522. // NETWORK BUFFERS
  523. /////////////////////////////////////////////////////////////////////////////
  524. ISphOutputBuffer::ISphOutputBuffer()
  525. {
  526. m_dBuf.Reserve( NETOUTBUF );
  527. }
// Construct via adopting an external buffer: SwapData() moves dChunk's storage into
// m_dBuf, leaving dChunk with whatever m_dBuf held before (presumably empty, since this
// constructor does not reserve anything - confirm against the class declaration).
ISphOutputBuffer::ISphOutputBuffer( CSphVector<BYTE>& dChunk )
{
	m_dBuf.SwapData( dChunk );
}
  533. void ISphOutputBuffer::SendString( const char* sStr )
  534. {
  535. int iLen = sStr ? (int) strlen( sStr ) : 0;
  536. SendInt( iLen );
  537. SendBytes( sStr, iLen );
  538. }
  539. /////////////////////////////////////////////////////////////////////////////
  540. void CachedOutputBuffer_c::Flush()
  541. {
  542. CommitAllMeasuredLengths();
  543. ISphOutputBuffer::Flush();
  544. }
  545. intptr_t CachedOutputBuffer_c::StartMeasureLength()
  546. {
  547. auto iPos = ( intptr_t ) m_dBuf.GetLength();
  548. m_dBlobs.Add( iPos );
  549. SendInt( 0 );
  550. return iPos;
  551. }
  552. void CachedOutputBuffer_c::CommitMeasuredLength( intptr_t iStoredPos )
  553. {
  554. if ( m_dBlobs.IsEmpty()) // possible if flush happens before APIheader destroyed.
  555. return;
  556. auto iPos = m_dBlobs.Pop();
  557. assert ( iStoredPos==-1 || iStoredPos==iPos );
  558. auto iBlobLen = m_dBuf.GetLength() - iPos - sizeof( int );
  559. WriteInt( iPos, (int) iBlobLen );
  560. }
  561. void CachedOutputBuffer_c::CommitAllMeasuredLengths()
  562. {
  563. while ( !m_dBlobs.IsEmpty())
  564. {
  565. auto uPos = m_dBlobs.Pop();
  566. auto iBlobLen = m_dBuf.GetLength() - uPos - sizeof( int );
  567. WriteInt( uPos, (int) iBlobLen );
  568. }
  569. }
  570. /// SmartOutputBuffer_t : chain of blobs could be used in scattered sending
  571. /////////////////////////////////////////////////////////////////////////////
  572. SmartOutputBuffer_t::~SmartOutputBuffer_t()
  573. {
  574. m_dChunks.Apply( []( ISphOutputBuffer*& pChunk ) {
  575. SafeRelease ( pChunk );
  576. } );
  577. }
  578. int SmartOutputBuffer_t::GetSentCount() const
  579. {
  580. int iSize = 0;
  581. m_dChunks.Apply( [ &iSize ]( ISphOutputBuffer*& pChunk ) {
  582. iSize += pChunk->GetSentCount();
  583. } );
  584. return iSize + m_dBuf.GetLength();
  585. }
  586. void SmartOutputBuffer_t::StartNewChunk()
  587. {
  588. CommitAllMeasuredLengths();
  589. assert ( BlobsEmpty());
  590. m_dChunks.Add( new ISphOutputBuffer( m_dBuf ));
  591. m_dBuf.Reserve( NETOUTBUF );
  592. }
  593. /*
  594. void SmartOutputBuffer_t::AppendBuf ( SmartOutputBuffer_t &dBuf )
  595. {
  596. if ( !dBuf.m_dBuf.IsEmpty () )
  597. dBuf.StartNewChunk ();
  598. for ( auto * pChunk : dBuf.m_dChunks )
  599. {
  600. pChunk->AddRef ();
  601. m_dChunks.Add ( pChunk );
  602. }
  603. }
  604. void SmartOutputBuffer_t::PrependBuf ( SmartOutputBuffer_t &dBuf )
  605. {
  606. CSphVector<ISphOutputBuffer *> dChunks;
  607. if ( !dBuf.m_dBuf.IsEmpty () )
  608. dBuf.StartNewChunk ();
  609. for ( auto * pChunk : dBuf.m_dChunks )
  610. {
  611. pChunk->AddRef ();
  612. dChunks.Add ( pChunk );
  613. }
  614. dChunks.Append ( m_dChunks );
  615. m_dChunks.SwapData ( dChunks );
  616. }
  617. */
  618. #ifndef UIO_MAXIOV
  619. #define UIO_MAXIOV (1024)
  620. #endif
  621. // makes vector of chunks suitable to direct using in Send() or WSASend()
  622. // returns federated size of the chunks
  623. size_t SmartOutputBuffer_t::GetIOVec( CSphVector<sphIovec>& dOut ) const
  624. {
  625. size_t iOutSize = 0;
  626. dOut.Reset();
  627. m_dChunks.Apply( [ &dOut, &iOutSize ]( const ISphOutputBuffer* pChunk ) {
  628. auto& dIovec = dOut.Add();
  629. IOPTR( dIovec ) = IOBUFTYPE ( pChunk->GetBufPtr());
  630. IOLEN ( dIovec ) = pChunk->GetSentCount();
  631. iOutSize += IOLEN ( dIovec );
  632. } );
  633. if ( !m_dBuf.IsEmpty())
  634. {
  635. auto& dIovec = dOut.Add();
  636. IOPTR ( dIovec ) = IOBUFTYPE ( GetBufPtr());
  637. IOLEN ( dIovec ) = (int) m_dBuf.GetLengthBytes();
  638. iOutSize += IOLEN ( dIovec );
  639. }
  640. assert ( dOut.GetLength()<UIO_MAXIOV );
  641. return iOutSize;
  642. };
  643. void SmartOutputBuffer_t::Reset()
  644. {
  645. m_dChunks.Apply( []( ISphOutputBuffer*& pChunk ) {
  646. SafeRelease ( pChunk );
  647. } );
  648. m_dChunks.Reset();
  649. m_dBuf.Reset();
  650. m_dBuf.Reserve( NETOUTBUF );
  651. };
#if USE_WINDOWS
/// Hands over ownership of all completed chunks, plus the current tail buffer wrapped in
/// a new ISphOutputBuffer, by adding them to dOut. Afterwards this object's chunk list
/// is empty and m_dBuf gets a fresh NETOUTBUF reservation.
/// NOTE(review): dOut is taken BY VALUE, so the pointers are added to a local copy that
/// is destroyed on return - the caller never receives them, and the chunks (and the new
/// tail buffer) appear to leak. Fixing this needs the header declaration to take the
/// vector by reference.
void SmartOutputBuffer_t::LeakTo ( CSphVector<ISphOutputBuffer *> dOut )
{
	for ( auto & pChunk : m_dChunks )
		dOut.Add ( pChunk );
	m_dChunks.Reset ();
	dOut.Add ( new ISphOutputBuffer ( m_dBuf ) );
	m_dBuf.Reserve ( NETOUTBUF );
}
#endif
  662. /////////////////////////////////////////////////////////////////////////////
  663. NetOutputBuffer_c::NetOutputBuffer_c( int iSock )
  664. : m_iSock( iSock )
  665. {
  666. assert ( m_iSock>0 );
  667. }
/// Sends the pending buffer to m_iSock, then empties it.
/// All open length placeholders are committed first. Returns early when an error
/// was already recorded or there is nothing to send.
/// Stops, logging a warning and setting m_bError, on a send() error other than
/// EINTR/EAGAIN/EWOULDBLOCK, on a sphPoll() failure, or when the g_iWriteTimeout
/// budget runs out while waiting for the socket to become writable.
/// NOTE(review): when sphPoll() fails with EINTR the loop breaks WITHOUT setting
/// m_bError, and the unsent remainder is then discarded by Resize(0) - confirm that
/// silently dropping data here is intended (e.g. only during SIGTERM shutdown).
void NetOutputBuffer_c::Flush()
{
	CommitAllMeasuredLengths();
	if ( m_bError )
		return;
	int64_t iLen = m_dBuf.GetLength64();
	if ( !iLen )
		return;
	if ( g_bGotSigterm )
		sphLogDebug( "SIGTERM in NetOutputBuffer::Flush" );
	StringBuilder_c sError;
	auto* pBuffer = ( const char* ) m_dBuf.Begin();
	CSphScopedProfile tProf( m_pProfile, SPH_QSTATE_NET_WRITE );
	const int64_t tmMaxTimer = sphMicroTimer() + MS2SEC * g_iWriteTimeout; // in microseconds
	while ( !m_bError )
	{
		auto iRes = sphSockSend ( m_iSock, pBuffer, iLen );
		if ( iRes<0 )
		{
			int iErrno = sphSockGetErrno();
			if ( iErrno==EINTR ) // interrupted before any data was sent; just loop
				continue;
			if ( iErrno!=EAGAIN && iErrno!=EWOULDBLOCK )
			{
				sError.Sprintf( "send() failed: %d: %s", iErrno, sphSockError( iErrno ));
				sphWarning( "%s", sError.cstr());
				m_bError = true;
				break;
			}
			// EAGAIN/EWOULDBLOCK: fall through and wait for the socket to become writable
		} else
		{
			// (partial) write: advance past what was accepted
			m_iSent += iRes;
			pBuffer += iRes;
			iLen -= iRes;
			if ( iLen==0 )
				break;
		}
		// wait until we can write
		int64_t tmMicroLeft = tmMaxTimer - sphMicroTimer();
		iRes = 0; // an exhausted budget is treated exactly like a poll timeout
		if ( tmMicroLeft>0 )
			iRes = sphPoll( m_iSock, tmMicroLeft, true );
		if ( !iRes ) // timeout
		{
			sError << "timed out while trying to flush network buffers";
			sphWarning( "%s", sError.cstr());
			m_bError = true;
			break;
		}
		if ( iRes<0 )
		{
			int iErrno = sphSockGetErrno();
			if ( iErrno==EINTR )
				break;
			sError.Sprintf( "sphPoll() failed: %d: %s", iErrno, sphSockError( iErrno ));
			sphWarning( "%s", sError.cstr());
			m_bError = true;
			break;
		}
		assert ( iRes>0 );
	}
	// the buffer is emptied whether or not everything was sent
	m_dBuf.Resize( 0 );
}
  731. /////////////////////////////////////////////////////////////////////////////
  732. InputBuffer_c::InputBuffer_c( const BYTE* pBuf, int iLen )
  733. : m_pBuf( pBuf ), m_pCur( pBuf ), m_bError( !pBuf || iLen<0 ), m_iLen( iLen )
  734. {}
  735. InputBuffer_c::InputBuffer_c ( const VecTraits_T<BYTE> & dBuf )
  736. : m_pBuf ( dBuf.begin() ), m_pCur ( dBuf.begin () ), m_bError ( dBuf.IsEmpty() ), m_iLen ( dBuf.GetLength() ) {}
  737. CSphString InputBuffer_c::GetString()
  738. {
  739. CSphString sRes;
  740. int iLen = GetInt();
  741. if ( m_bError || iLen<0 || iLen>g_iMaxPacketSize || ( m_pCur + iLen>m_pBuf + m_iLen ))
  742. {
  743. SetError( true );
  744. return sRes;
  745. }
  746. if ( iLen )
  747. sRes.SetBinary(( char* ) m_pCur, iLen );
  748. m_pCur += iLen;
  749. return sRes;
  750. }
  751. CSphString InputBuffer_c::GetRawString( int iLen )
  752. {
  753. CSphString sRes;
  754. if ( m_bError || iLen<0 || iLen>g_iMaxPacketSize || ( m_pCur + iLen>m_pBuf + m_iLen ))
  755. {
  756. SetError( true );
  757. return sRes;
  758. }
  759. if ( iLen )
  760. sRes.SetBinary(( char* ) m_pCur, iLen );
  761. m_pCur += iLen;
  762. return sRes;
  763. }
  764. bool InputBuffer_c::GetString( CSphVector<BYTE>& dBuffer )
  765. {
  766. int iLen = GetInt();
  767. if ( m_bError || iLen<0 || iLen>g_iMaxPacketSize || ( m_pCur + iLen>m_pBuf + m_iLen ))
  768. {
  769. SetError( true );
  770. return false;
  771. }
  772. if ( !iLen )
  773. return true;
  774. return GetBytes( dBuffer.AddN( iLen ), iLen );
  775. }
  776. bool InputBuffer_c::GetBytes( void* pBuf, int iLen )
  777. {
  778. assert ( pBuf );
  779. assert ( iLen>0 && iLen<=g_iMaxPacketSize );
  780. if ( m_bError || ( m_pCur + iLen>m_pBuf + m_iLen ))
  781. {
  782. SetError( true );
  783. return false;
  784. }
  785. memcpy( pBuf, m_pCur, iLen );
  786. m_pCur += iLen;
  787. return true;
  788. }
  789. bool InputBuffer_c::GetBytesZerocopy( const BYTE** ppData, int iLen )
  790. {
  791. assert ( ppData );
  792. assert ( iLen>0 && iLen<=g_iMaxPacketSize );
  793. if ( m_bError || ( m_pCur + iLen>m_pBuf + m_iLen ))
  794. {
  795. SetError( true );
  796. return false;
  797. }
  798. *ppData = m_pCur;
  799. m_pCur += iLen;
  800. return true;
  801. }
  802. bool InputBuffer_c::GetDwords( CSphVector<DWORD>& dBuffer, int& iGot, int iMax )
  803. {
  804. iGot = GetInt();
  805. if ( iGot<0 || iGot>iMax )
  806. {
  807. SetError( true );
  808. return false;
  809. }
  810. dBuffer.Resize( iGot );
  811. ARRAY_FOREACH ( i, dBuffer )
  812. dBuffer[i] = GetDword();
  813. if ( m_bError )
  814. dBuffer.Reset();
  815. return !m_bError;
  816. }
  817. bool InputBuffer_c::GetQwords( CSphVector<SphAttr_t>& dBuffer, int& iGot, int iMax )
  818. {
  819. iGot = GetInt();
  820. if ( iGot<0 || iGot>iMax )
  821. {
  822. SetError( true );
  823. return false;
  824. }
  825. dBuffer.Resize( iGot );
  826. ARRAY_FOREACH ( i, dBuffer )
  827. dBuffer[i] = GetUint64();
  828. if ( m_bError )
  829. dBuffer.Reset();
  830. return !m_bError;
  831. }
  832. /////////////////////////////////////////////////////////////////////////////
  833. NetInputBuffer_c::NetInputBuffer_c( int iSock )
  834. : STORE( NET_MINIBUFFER_SIZE ), InputBuffer_c( m_pData, NET_MINIBUFFER_SIZE ), m_iSock( iSock )
  835. {
  836. Resize( 0 );
  837. }
  838. bool NetInputBuffer_c::ReadFrom( int iLen, int iTimeout, bool bIntr, bool bAppend )
  839. {
  840. int iTail = bAppend ? m_iLen : 0;
  841. m_bIntr = false;
  842. if ( iLen<=0 || iLen>g_iMaxPacketSize || m_iSock<0 )
  843. return false;
  844. int iOff = m_pCur - m_pBuf;
  845. Resize( m_iLen );
  846. Reserve( iTail + iLen );
  847. BYTE* pBuf = m_pData + iTail;
  848. m_pBuf = m_pData;
  849. m_pCur = bAppend ? m_pData + iOff : m_pData;
  850. int iGot = sphSockRead( m_iSock, pBuf, iLen, iTimeout, bIntr );
  851. if ( g_bGotSigterm )
  852. {
  853. sphLogDebug( "NetInputBuffer_c::ReadFrom: got SIGTERM, return false" );
  854. m_bError = true;
  855. m_bIntr = true;
  856. return false;
  857. }
  858. m_bError = ( iGot!=iLen );
  859. m_bIntr = m_bError && ( sphSockPeekErrno()==EINTR );
  860. m_iLen = m_bError ? 0 : iTail + iLen;
  861. return !m_bError;
  862. }
  863. /////////////////////////////////////////////////////////////////////////////
  864. // SERVED INDEX DESCRIPTORS STUFF
  865. /////////////////////////////////////////////////////////////////////////////
  866. class QueryStatContainer_c: public QueryStatContainer_i
  867. {
  868. public:
  869. void Add( uint64_t uFoundRows, uint64_t uQueryTime, uint64_t uTimestamp ) final;
  870. void GetRecord( int iRecord, QueryStatRecord_t& tRecord ) const final;
  871. int GetNumRecords() const final;
  872. QueryStatContainer_c();
  873. QueryStatContainer_c( QueryStatContainer_c&& tOther ) noexcept;
  874. void Swap( QueryStatContainer_c& rhs ) noexcept;
  875. QueryStatContainer_c& operator=( QueryStatContainer_c tOther ) noexcept;
  876. private:
  877. CircularBuffer_T<QueryStatRecord_t> m_dRecords;
  878. };
  879. void QueryStatContainer_c::Add( uint64_t uFoundRows, uint64_t uQueryTime, uint64_t uTimestamp )
  880. {
  881. if ( !m_dRecords.IsEmpty())
  882. {
  883. QueryStatRecord_t& tLast = m_dRecords.Last();
  884. const uint64_t BUCKET_TIME_DELTA = 100000;
  885. if ( uTimestamp - tLast.m_uTimestamp<=BUCKET_TIME_DELTA )
  886. {
  887. tLast.m_uFoundRowsMin = Min( uFoundRows, tLast.m_uFoundRowsMin );
  888. tLast.m_uFoundRowsMax = Max( uFoundRows, tLast.m_uFoundRowsMax );
  889. tLast.m_uFoundRowsSum += uFoundRows;
  890. tLast.m_uQueryTimeMin = Min( uQueryTime, tLast.m_uQueryTimeMin );
  891. tLast.m_uQueryTimeMax = Max( uQueryTime, tLast.m_uQueryTimeMax );
  892. tLast.m_uQueryTimeSum += uQueryTime;
  893. tLast.m_iCount++;
  894. return;
  895. }
  896. }
  897. const uint64_t MAX_TIME_DELTA = 15 * 60 * 1000000;
  898. while ( !m_dRecords.IsEmpty() && ( uTimestamp - m_dRecords[0].m_uTimestamp )>MAX_TIME_DELTA )
  899. m_dRecords.Pop();
  900. QueryStatRecord_t& tRecord = m_dRecords.Push();
  901. tRecord.m_uFoundRowsMin = uFoundRows;
  902. tRecord.m_uFoundRowsMax = uFoundRows;
  903. tRecord.m_uFoundRowsSum = uFoundRows;
  904. tRecord.m_uQueryTimeMin = uQueryTime;
  905. tRecord.m_uQueryTimeMax = uQueryTime;
  906. tRecord.m_uQueryTimeSum = uQueryTime;
  907. tRecord.m_uTimestamp = uTimestamp;
  908. tRecord.m_iCount = 1;
  909. }
  910. void QueryStatContainer_c::GetRecord( int iRecord, QueryStatRecord_t& tRecord ) const
  911. {
  912. tRecord = m_dRecords[iRecord];
  913. }
  914. int QueryStatContainer_c::GetNumRecords() const
  915. {
  916. return m_dRecords.GetLength();
  917. }
  918. QueryStatContainer_c::QueryStatContainer_c() = default;
  919. QueryStatContainer_c::QueryStatContainer_c( QueryStatContainer_c&& tOther ) noexcept
  920. : QueryStatContainer_c()
  921. { Swap( tOther ); }
  922. void QueryStatContainer_c::Swap( QueryStatContainer_c& rhs ) noexcept
  923. {
  924. rhs.m_dRecords.Swap( m_dRecords );
  925. }
  926. QueryStatContainer_c& QueryStatContainer_c::operator=( QueryStatContainer_c tOther ) noexcept
  927. {
  928. Swap( tOther );
  929. return *this;
  930. }
  931. //////////////////////////////////////////////////////////////////////////
  932. #ifndef NDEBUG
  933. class QueryStatContainerExact_c: public QueryStatContainer_i
  934. {
  935. public:
  936. void Add( uint64_t uFoundRows, uint64_t uQueryTime, uint64_t uTimestamp ) final;
  937. void GetRecord( int iRecord, QueryStatRecord_t& tRecord ) const final;
  938. int GetNumRecords() const final;
  939. QueryStatContainerExact_c();
  940. QueryStatContainerExact_c( QueryStatContainerExact_c&& tOther ) noexcept;
  941. void Swap( QueryStatContainerExact_c& rhs ) noexcept;
  942. QueryStatContainerExact_c& operator=( QueryStatContainerExact_c tOther ) noexcept;
  943. private:
  944. struct QueryStatRecordExact_t
  945. {
  946. uint64_t m_uQueryTime;
  947. uint64_t m_uFoundRows;
  948. uint64_t m_uTimestamp;
  949. };
  950. CircularBuffer_T<QueryStatRecordExact_t> m_dRecords;
  951. };
  952. void QueryStatContainerExact_c::Add( uint64_t uFoundRows, uint64_t uQueryTime, uint64_t uTimestamp )
  953. {
  954. const uint64_t MAX_TIME_DELTA = 15 * 60 * 1000000;
  955. while ( !m_dRecords.IsEmpty() && ( uTimestamp - m_dRecords[0].m_uTimestamp )>MAX_TIME_DELTA )
  956. m_dRecords.Pop();
  957. QueryStatRecordExact_t& tRecord = m_dRecords.Push();
  958. tRecord.m_uFoundRows = uFoundRows;
  959. tRecord.m_uQueryTime = uQueryTime;
  960. tRecord.m_uTimestamp = uTimestamp;
  961. }
  962. int QueryStatContainerExact_c::GetNumRecords() const
  963. {
  964. return m_dRecords.GetLength();
  965. }
  966. void QueryStatContainerExact_c::GetRecord( int iRecord, QueryStatRecord_t& tRecord ) const
  967. {
  968. const QueryStatRecordExact_t& tExact = m_dRecords[iRecord];
  969. tRecord.m_uQueryTimeMin = tExact.m_uQueryTime;
  970. tRecord.m_uQueryTimeMax = tExact.m_uQueryTime;
  971. tRecord.m_uQueryTimeSum = tExact.m_uQueryTime;
  972. tRecord.m_uFoundRowsMin = tExact.m_uFoundRows;
  973. tRecord.m_uFoundRowsMax = tExact.m_uFoundRows;
  974. tRecord.m_uFoundRowsSum = tExact.m_uFoundRows;
  975. tRecord.m_uTimestamp = tExact.m_uTimestamp;
  976. tRecord.m_iCount = 1;
  977. }
  978. QueryStatContainerExact_c::QueryStatContainerExact_c() = default;
  979. QueryStatContainerExact_c::QueryStatContainerExact_c( QueryStatContainerExact_c&& tOther ) noexcept
  980. : QueryStatContainerExact_c()
  981. { Swap( tOther ); }
  982. void QueryStatContainerExact_c::Swap( QueryStatContainerExact_c& rhs ) noexcept
  983. {
  984. rhs.m_dRecords.Swap( m_dRecords );
  985. }
  986. QueryStatContainerExact_c& QueryStatContainerExact_c::operator=( QueryStatContainerExact_c tOther ) noexcept
  987. {
  988. Swap( tOther );
  989. return *this;
  990. }
  991. #endif
  992. //////////////////////////////////////////////////////////////////////////
  993. ServedStats_c::ServedStats_c()
  994. : m_pQueryStatRecords { new QueryStatContainer_c }
  995. #ifndef NDEBUG
  996. , m_pQueryStatRecordsExact { new QueryStatContainerExact_c }
  997. #endif
  998. {
  999. Verify ( m_tStatsLock.Init( true ));
  1000. m_pQueryTimeDigest = sphCreateTDigest();
  1001. m_pRowsFoundDigest = sphCreateTDigest();
  1002. assert ( m_pQueryTimeDigest && m_pRowsFoundDigest );
  1003. }
  1004. ServedStats_c::~ServedStats_c()
  1005. {
  1006. SafeDelete ( m_pRowsFoundDigest );
  1007. SafeDelete ( m_pQueryTimeDigest );
  1008. m_tStatsLock.Done();
  1009. }
  1010. void ServedStats_c::AddQueryStat( uint64_t uFoundRows, uint64_t uQueryTime )
  1011. {
  1012. ScWL_t wLock( m_tStatsLock );
  1013. m_pRowsFoundDigest->Add(( double ) uFoundRows );
  1014. m_pQueryTimeDigest->Add(( double ) uQueryTime );
  1015. uint64_t uTimeStamp = sphMicroTimer();
  1016. m_pQueryStatRecords->Add( uFoundRows, uQueryTime, uTimeStamp );
  1017. #ifndef NDEBUG
  1018. m_pQueryStatRecordsExact->Add( uFoundRows, uQueryTime, uTimeStamp );
  1019. #endif
  1020. m_uTotalFoundRowsMin = Min( uFoundRows, m_uTotalFoundRowsMin );
  1021. m_uTotalFoundRowsMax = Max( uFoundRows, m_uTotalFoundRowsMax );
  1022. m_uTotalFoundRowsSum += uFoundRows;
  1023. m_uTotalQueryTimeMin = Min( uQueryTime, m_uTotalQueryTimeMin );
  1024. m_uTotalQueryTimeMax = Max( uQueryTime, m_uTotalQueryTimeMax );
  1025. m_uTotalQueryTimeSum += uQueryTime;
  1026. ++m_uTotalQueries;
  1027. }
  // Window lengths for the sliding query statistics, in sphMicroTimer() units, indexed by
  // QueryStats::INTERVAL_1MIN .. INTERVAL_15MIN (1, 5 and 15 minutes at 1'000'000 units per second).
  static constexpr uint64_t g_dStatsIntervals[] =
  {
      60ULL * 1000000,    // 1 minute
      300ULL * 1000000,   // 5 minutes
      900ULL * 1000000    // 15 minutes
  };
  1034. void ServedStats_c::CalculateQueryStats( QueryStats_t& tRowsFoundStats, QueryStats_t& tQueryTimeStats ) const
  1035. {
  1036. DoStatCalcStats( m_pQueryStatRecords.Ptr(), tRowsFoundStats, tQueryTimeStats );
  1037. }
  1038. #ifndef NDEBUG
  1039. void ServedStats_c::CalculateQueryStatsExact( QueryStats_t& tRowsFoundStats, QueryStats_t& tQueryTimeStats ) const
  1040. {
  1041. DoStatCalcStats( m_pQueryStatRecordsExact.Ptr(), tRowsFoundStats, tQueryTimeStats );
  1042. }
  1043. #endif // !NDEBUG
  1044. void ServedStats_c::CalcStatsForInterval( const QueryStatContainer_i* pContainer, QueryStatElement_t& tRowResult,
  1045. QueryStatElement_t& tTimeResult, uint64_t uTimestamp, uint64_t uInterval, int iRecords )
  1046. {
  1047. assert ( pContainer );
  1048. using namespace QueryStats;
  1049. tRowResult.m_dData[TYPE_AVG] = 0;
  1050. tRowResult.m_dData[TYPE_MIN] = UINT64_MAX;
  1051. tRowResult.m_dData[TYPE_MAX] = 0;
  1052. tTimeResult.m_dData[TYPE_AVG] = 0;
  1053. tTimeResult.m_dData[TYPE_MIN] = UINT64_MAX;
  1054. tTimeResult.m_dData[TYPE_MAX] = 0;
  1055. CSphTightVector<uint64_t> dFound, dTime;
  1056. dFound.Reserve( iRecords );
  1057. dTime.Reserve( iRecords );
  1058. DWORD uTotalQueries = 0;
  1059. QueryStatRecord_t tRecord;
  1060. for ( int i = 0; i<pContainer->GetNumRecords(); ++i )
  1061. {
  1062. pContainer->GetRecord( i, tRecord );
  1063. if ( uTimestamp - tRecord.m_uTimestamp<=uInterval )
  1064. {
  1065. tRowResult.m_dData[TYPE_MIN] = Min( tRecord.m_uFoundRowsMin, tRowResult.m_dData[TYPE_MIN] );
  1066. tRowResult.m_dData[TYPE_MAX] = Max( tRecord.m_uFoundRowsMax, tRowResult.m_dData[TYPE_MAX] );
  1067. tTimeResult.m_dData[TYPE_MIN] = Min( tRecord.m_uQueryTimeMin, tTimeResult.m_dData[TYPE_MIN] );
  1068. tTimeResult.m_dData[TYPE_MAX] = Max( tRecord.m_uQueryTimeMax, tTimeResult.m_dData[TYPE_MAX] );
  1069. dFound.Add( tRecord.m_uFoundRowsSum / tRecord.m_iCount );
  1070. dTime.Add( tRecord.m_uQueryTimeSum / tRecord.m_iCount );
  1071. tRowResult.m_dData[TYPE_AVG] += tRecord.m_uFoundRowsSum;
  1072. tTimeResult.m_dData[TYPE_AVG] += tRecord.m_uQueryTimeSum;
  1073. uTotalQueries += tRecord.m_iCount;
  1074. }
  1075. }
  1076. dFound.Sort();
  1077. dTime.Sort();
  1078. tRowResult.m_uTotalQueries = uTotalQueries;
  1079. tTimeResult.m_uTotalQueries = uTotalQueries;
  1080. if ( !dFound.GetLength())
  1081. return;
  1082. tRowResult.m_dData[TYPE_AVG] /= uTotalQueries;
  1083. tTimeResult.m_dData[TYPE_AVG] /= uTotalQueries;
  1084. int u95 = Max( 0, Min( int( ceilf( dFound.GetLength() * 0.95f ) + 0.5f ) - 1, dFound.GetLength() - 1 ));
  1085. int u99 = Max( 0, Min( int( ceilf( dFound.GetLength() * 0.99f ) + 0.5f ) - 1, dFound.GetLength() - 1 ));
  1086. tRowResult.m_dData[TYPE_95] = dFound[u95];
  1087. tRowResult.m_dData[TYPE_99] = dFound[u99];
  1088. tTimeResult.m_dData[TYPE_95] = dTime[u95];
  1089. tTimeResult.m_dData[TYPE_99] = dTime[u99];
  1090. }
  1091. void ServedStats_c::DoStatCalcStats( const QueryStatContainer_i* pContainer,
  1092. QueryStats_t& tRowsFoundStats, QueryStats_t& tQueryTimeStats ) const
  1093. {
  1094. assert ( pContainer );
  1095. using namespace QueryStats;
  1096. auto uTimestamp = sphMicroTimer();
  1097. ScRL_t rLock( m_tStatsLock );
  1098. int iRecords = m_pQueryStatRecords->GetNumRecords();
  1099. for ( int i = INTERVAL_1MIN; i<=INTERVAL_15MIN; ++i )
  1100. CalcStatsForInterval( pContainer, tRowsFoundStats.m_dStats[i], tQueryTimeStats.m_dStats[i], uTimestamp,
  1101. g_dStatsIntervals[i], iRecords );
  1102. auto& tRowsAllStats = tRowsFoundStats.m_dStats[INTERVAL_ALLTIME];
  1103. tRowsAllStats.m_dData[TYPE_AVG] = m_uTotalQueries ? m_uTotalFoundRowsSum / m_uTotalQueries : 0;
  1104. tRowsAllStats.m_dData[TYPE_MIN] = m_uTotalFoundRowsMin;
  1105. tRowsAllStats.m_dData[TYPE_MAX] = m_uTotalFoundRowsMax;
  1106. tRowsAllStats.m_dData[TYPE_95] = ( uint64_t ) m_pRowsFoundDigest->Percentile( 95 );
  1107. tRowsAllStats.m_dData[TYPE_99] = ( uint64_t ) m_pRowsFoundDigest->Percentile( 99 );
  1108. tRowsAllStats.m_uTotalQueries = m_uTotalQueries;
  1109. auto& tQueryAllStats = tQueryTimeStats.m_dStats[INTERVAL_ALLTIME];
  1110. tQueryAllStats.m_dData[TYPE_AVG] = m_uTotalQueries ? m_uTotalQueryTimeSum / m_uTotalQueries : 0;
  1111. tQueryAllStats.m_dData[TYPE_MIN] = m_uTotalQueryTimeMin;
  1112. tQueryAllStats.m_dData[TYPE_MAX] = m_uTotalQueryTimeMax;
  1113. tQueryAllStats.m_dData[TYPE_95] = ( uint64_t ) m_pQueryTimeDigest->Percentile( 95 );
  1114. tQueryAllStats.m_dData[TYPE_99] = ( uint64_t ) m_pQueryTimeDigest->Percentile( 99 );
  1115. tQueryAllStats.m_uTotalQueries = m_uTotalQueries;
  1116. }
  1117. //////////////////////////////////////////////////////////////////////////
  1118. ServedDesc_t::~ServedDesc_t()
  1119. {
  1120. if ( m_pIndex )
  1121. m_pIndex->Dealloc();
  1122. if ( !m_sUnlink.IsEmpty())
  1123. {
  1124. sphLogDebug( "unlink %s", m_sUnlink.cstr());
  1125. sphUnlinkIndex( m_sUnlink.cstr(), false );
  1126. }
  1127. SafeDelete ( m_pIndex );
  1128. }
  1129. //////////////////////////////////////////////////////////////////////////
  1130. const ServedDesc_t* ServedIndex_c::ReadLock() const
  1131. {
  1132. AddRef();
  1133. if ( m_tLock.ReadLock())
  1134. sphLogDebugvv( "ReadLock %p", this );
  1135. else
  1136. {
  1137. sphLogDebug( "ReadLock %p failed", this );
  1138. assert ( false );
  1139. }
  1140. return ( const ServedDesc_t* ) this;
  1141. }
  1142. // want write lock to wipe out reader and not wait readers
  1143. // but only for RT and PQ indexes as these operations are rare there
  1144. ServedDesc_t* ServedIndex_c::WriteLock() const
  1145. {
  1146. AddRef();
  1147. sphLogDebugvv( "WriteLock %p wait", this );
  1148. if ( m_tLock.WriteLock())
  1149. sphLogDebugvv( "WriteLock %p", this );
  1150. else
  1151. {
  1152. sphLogDebug( "WriteLock %p failed", this );
  1153. assert ( false );
  1154. }
  1155. return ( ServedDesc_t* ) this;
  1156. }
  1157. void ServedIndex_c::Unlock() const
  1158. {
  1159. if ( m_tLock.Unlock())
  1160. sphLogDebugvv( "Unlock %p", this );
  1161. else
  1162. {
  1163. sphLogDebug( "Unlock %p failed", this );
  1164. assert ( false );
  1165. }
  1166. Release();
  1167. }
  1168. ServedIndex_c::ServedIndex_c( const ServedDesc_t& tDesc )
  1169. : m_tLock( ServedDesc_t::IsMutable( &tDesc ))
  1170. {
  1171. *( ServedDesc_t* ) ( this ) = tDesc;
  1172. }
  1173. //////////////////////////////////////////////////////////////////////////
  1174. GuardedHash_c::GuardedHash_c()
  1175. {
  1176. if ( !m_tIndexesRWLock.Init())
  1177. sphDie( "failed to init hash indexes rwlock" );
  1178. }
  1179. GuardedHash_c::~GuardedHash_c()
  1180. {
  1181. ReleaseAndClear();
  1182. Verify ( m_tIndexesRWLock.Done());
  1183. }
  1184. // atomically try add an entry and adopt it
  1185. bool GuardedHash_c::AddUniq( ISphRefcountedMT* pValue, const CSphString& tKey )
  1186. {
  1187. ScWL_t hHashWLock { m_tIndexesRWLock };
  1188. int iPrevSize = GetLengthUnl();
  1189. ISphRefcountedMT*& pVal = m_hIndexes.AddUnique( tKey );
  1190. if ( iPrevSize==GetLengthUnl())
  1191. return false;
  1192. pVal = pValue;
  1193. SafeAddRef ( pVal );
  1194. return true;
  1195. }
  1196. // atomically set new entry, then release previous, if not the same and is non-zero
  1197. void GuardedHash_c::AddOrReplace( ISphRefcountedMT* pValue, const CSphString& tKey )
  1198. {
  1199. ScWL_t hHashWLock { m_tIndexesRWLock };
  1200. // can not use AddUnique as new inserted item has no values
  1201. ISphRefcountedMT** ppEntry = m_hIndexes( tKey );
  1202. if ( ppEntry )
  1203. {
  1204. SafeRelease ( *ppEntry );
  1205. ( *ppEntry ) = pValue;
  1206. } else
  1207. {
  1208. Verify ( m_hIndexes.Add( pValue, tKey ));
  1209. }
  1210. SafeAddRef ( pValue );
  1211. if ( m_pHook )
  1212. m_pHook( pValue, tKey );
  1213. }
  1214. bool GuardedHash_c::Delete( const CSphString& tKey )
  1215. {
  1216. ScWL_t hHashWLock { m_tIndexesRWLock };
  1217. ISphRefcountedMT** ppEntry = m_hIndexes( tKey );
  1218. // release entry - last owner will free it
  1219. if ( ppEntry ) SafeRelease( *ppEntry );
  1220. // remove from hash
  1221. return m_hIndexes.Delete( tKey );
  1222. }
  1223. bool GuardedHash_c::DeleteIfNull( const CSphString& tKey )
  1224. {
  1225. ScWL_t hHashWLock { m_tIndexesRWLock };
  1226. ISphRefcountedMT** ppEntry = m_hIndexes( tKey );
  1227. if ( ppEntry && *ppEntry )
  1228. return false;
  1229. return m_hIndexes.Delete( tKey );
  1230. }
  1231. int GuardedHash_c::GetLength() const
  1232. {
  1233. CSphScopedRLock dRL { m_tIndexesRWLock };
  1234. return GetLengthUnl();
  1235. }
  1236. // check if hash contains an entry
  1237. bool GuardedHash_c::Contains( const CSphString& tKey ) const
  1238. {
  1239. ScRL_t hHashRLock { m_tIndexesRWLock };
  1240. ISphRefcountedMT** ppEntry = m_hIndexes( tKey );
  1241. return ppEntry!=nullptr;
  1242. }
  1243. void GuardedHash_c::ReleaseAndClear()
  1244. {
  1245. ScWL_t hHashWLock { m_tIndexesRWLock };
  1246. for ( m_hIndexes.IterateStart(); m_hIndexes.IterateNext(); ) SafeRelease ( m_hIndexes.IterateGet());
  1247. m_hIndexes.Reset();
  1248. }
  1249. ISphRefcountedMT* GuardedHash_c::Get( const CSphString& tKey ) const
  1250. {
  1251. ScRL_t hHashRLock { m_tIndexesRWLock };
  1252. ISphRefcountedMT** ppEntry = m_hIndexes( tKey );
  1253. if ( !ppEntry )
  1254. return nullptr;
  1255. if ( !*ppEntry )
  1256. return nullptr;
  1257. ( *ppEntry )->AddRef();
  1258. return *ppEntry;
  1259. }
  1260. ISphRefcountedMT* GuardedHash_c::TryAddThenGet( ISphRefcountedMT* pValue, const CSphString& tKey )
  1261. {
  1262. ScWL_t hHashWLock { m_tIndexesRWLock };
  1263. int iPrevSize = GetLengthUnl();
  1264. ISphRefcountedMT*& pVal = m_hIndexes.AddUnique( tKey );
  1265. if ( iPrevSize<GetLengthUnl()) // value just inserted
  1266. {
  1267. pVal = pValue;
  1268. SafeAddRef ( pVal );
  1269. }
  1270. SafeAddRef ( pVal );
  1271. return pVal;
  1272. }
  1273. int GuardedHash_c::GetLengthUnl() const
  1274. {
  1275. return m_hIndexes.GetLength();
  1276. }
  1277. void GuardedHash_c::Rlock() const
  1278. {
  1279. Verify ( m_tIndexesRWLock.ReadLock());
  1280. }
  1281. void GuardedHash_c::Wlock() const
  1282. {
  1283. Verify ( m_tIndexesRWLock.WriteLock());
  1284. }
  1285. void GuardedHash_c::Unlock() const
  1286. {
  1287. Verify ( m_tIndexesRWLock.Unlock());
  1288. }
  // Returns the hardware address of the first suitable network interface as hex byte pairs
  // ("%02x") appended to a StringBuilder_c created with ":" as its delimiter (presumably giving
  // "xx:xx:xx:xx:xx:xx" — confirm StringBuilder_c semantics). If no interface qualifies,
  // nothing is appended (always the case on macOS).
  1289. CSphString GetMacAddress()
  1290. {
  1291. StringBuilder_c sMAC( ":" );
  1292. #if USE_WINDOWS
  // Windows: enumerate adapters into a fixed 128-entry buffer. Any result other than NO_ERROR
  // (e.g. the buffer being too small) leaves the result empty.
  1293. CSphFixedVector<IP_ADAPTER_ADDRESSES> dAdapters ( 128 );
  1294. PIP_ADAPTER_ADDRESSES pAdapter = dAdapters.Begin();
  1295. auto uSize = (DWORD) dAdapters.GetLengthBytes();
  1296. if ( GetAdaptersAddresses ( 0, 0, nullptr, pAdapter, &uSize )==NO_ERROR )
  1297. {
  1298. while ( pAdapter )
  1299. {
  // first Ethernet adapter with at least 6 address bytes wins; all of its
  // PhysicalAddressLength bytes are emitted
  1300. if ( pAdapter->IfType == IF_TYPE_ETHERNET_CSMACD && pAdapter->PhysicalAddressLength>=6 )
  1301. {
  1302. const BYTE * pMAC = pAdapter->PhysicalAddress;
  1303. for ( DWORD i=0; i<pAdapter->PhysicalAddressLength; i++ )
  1304. {
  1305. sMAC.Appendf ( "%02x", *pMAC );
  1306. pMAC++;
  1307. }
  1308. break;
  1309. }
  1310. pAdapter = pAdapter->Next;
  1311. }
  1312. }
  1313. #elif defined(__FreeBSD__)
  // FreeBSD: fetch the interface list via sysctl (NET_RT_IFLIST): first ask for the size,
  // then read it into a buffer of that size.
  1314. size_t iLen = 0;
  1315. const int iMibLen = 6;
  1316. int dMib[iMibLen] = { CTL_NET, AF_ROUTE, 0, AF_LINK, NET_RT_IFLIST, 0 };
  1317. if ( sysctl ( dMib, iMibLen, NULL, &iLen, NULL, 0 )!=-1 )
  1318. {
  1319. CSphFixedVector<char> dBuf ( iLen );
  1320. if ( sysctl ( dMib, iMibLen, dBuf.Begin(), &iLen, NULL, 0 )>=0 )
  1321. {
  // walk the variable-length messages; each one advances by its own ifm_msglen
  // NOTE(review): a zero ifm_msglen would never advance — presumably the kernel never reports one
  1322. if_msghdr * pIf = nullptr;
  1323. for ( const char * pNext = dBuf.Begin(); pNext<dBuf.Begin() + iLen; pNext+=pIf->ifm_msglen )
  1324. {
  1325. pIf = (if_msghdr *)pNext;
  1326. if ( pIf->ifm_type==RTM_IFINFO )
  1327. {
  // the link-level sockaddr follows the message header; ETHER_ADDR_LEN bytes are read
  // NOTE(review): pSdl->sdl_alen is not checked — confirm this is safe for interfaces
  // whose link address is shorter (e.g. loopback)
  1328. bool bAllZero = true;
  1329. const sockaddr_dl * pSdl= (const sockaddr_dl *)(pIf + 1);
  1330. const BYTE * pMAC = (const BYTE *)LLADDR(pSdl);
  1331. for ( int i=0; i<ETHER_ADDR_LEN; i++ )
  1332. {
  1333. BYTE uPart = *pMAC;
  1334. pMAC++;
  1335. bAllZero &= ( uPart==0 );
  1336. sMAC.Appendf ( "%02x", uPart );
  1337. }
  // first non-zero address wins; an all-zero one is discarded and the builder re-armed
  1338. if ( !bAllZero )
  1339. break;
  1340. sMAC.Clear();
  1341. sMAC.StartBlock ( ":" );
  1342. }
  1343. }
  1344. }
  1345. }
  1346. #elif defined ( __APPLE__ )
  1347. // no MAC address for OSX
  1348. #else
  // Other POSIX systems (Linux): list up to 64 interfaces with SIOCGIFCONF on a UDP socket.
  1349. int iFD = socket( AF_INET, SOCK_DGRAM, 0 );
  1350. if ( iFD>=0 )
  1351. {
  1352. ifreq dIf[64];
  1353. ifconf tIfConf;
  1354. tIfConf.ifc_len = sizeof( dIf );
  1355. tIfConf.ifc_req = dIf;
  1356. if ( ioctl( iFD, SIOCGIFCONF, &tIfConf )>=0 )
  1357. {
  1358. const ifreq* pIfEnd = dIf + ( tIfConf.ifc_len / sizeof( dIf[0] ));
  1359. for ( const ifreq* pIfCur = tIfConf.ifc_req; pIfCur<pIfEnd; pIfCur++ )
  1360. {
  // only entries reported with an AF_INET (IPv4) address are considered
  1361. if ( pIfCur->ifr_addr.sa_family==AF_INET )
  1362. {
  // query the hardware address by interface name
  1363. ifreq tIfCur;
  1364. memset( &tIfCur, 0, sizeof( tIfCur ));
  1365. memcpy( tIfCur.ifr_name, pIfCur->ifr_name, sizeof( tIfCur.ifr_name ));
  1366. if ( ioctl( iFD, SIOCGIFHWADDR, &tIfCur )>=0 )
  1367. {
  1368. bool bAllZero = true;
  1369. const BYTE* pMAC = ( const BYTE* ) tIfCur.ifr_hwaddr.sa_data;
  1370. for ( int i = 0; i<ETHER_ADDR_LEN; i++ )
  1371. {
  1372. BYTE uPart = *pMAC;
  1373. pMAC++;
  1374. bAllZero &= ( uPart==0 );
  1375. sMAC.Appendf( "%02x", uPart );
  1376. }
  // first non-zero address wins (an all-zero one, typically loopback, is discarded
  // and the builder re-armed with the ":" delimiter)
  1377. if ( !bAllZero )
  1378. break;
  1379. sMAC.Clear();
  1380. sMAC.StartBlock( ":" );
  1381. }
  1382. }
  1383. }
  1384. }
  1385. }
  // NOTE(review): also reached when socket() failed (iFD<0); presumably SafeClose ignores
  // negative descriptors — confirm
  1386. SafeClose( iFD );
  1387. #endif
  1388. return sMAC.cstr();
  1389. }