networking_daemon.cpp 28 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018
  1. //
  2. // Copyright (c) 2017-2020, Manticore Software LTD (http://manticoresearch.com)
  3. // Copyright (c) 2001-2016, Andrew Aksyonoff
  4. // Copyright (c) 2008-2016, Sphinx Technologies Inc
  5. // All rights reserved
  6. //
  7. // This program is free software; you can redistribute it and/or modify
  8. // it under the terms of the GNU General Public License. You should have
  9. // received a copy of the GPL license along with this program; if you
  10. // did not, you can find it at http://www.gnu.org/
  11. //
  12. #include "networking_daemon.h"
  13. #include "loop_profiler.h"
  14. #include "net_action_accept.h"
  15. #include "coroutine.h"
  16. #if USE_WINDOWS
  17. // Win-specific headers and calls
  18. #include <io.h>
  19. #else
  20. // UNIX-specific headers and calls
  21. #include <sys/wait.h>
  22. #include <netdb.h>
  23. #include <netinet/in.h>
  24. #include <netinet/tcp.h>
  25. #endif
// net-loop spin-wait tuning: -1 lets Poll() always use a blocking wait; >0 means
// spin (1ms polls) for roughly this many ms after the last event before blocking again
// (see Impl_c::Poll() below) — assumed to mirror a daemon config knob, TODO confirm
int g_tmWait = -1;
// cap on events handled per poller iteration in Impl_c::ProcessReady(); 0 = unlimited
int g_iThrottleAction = 0;
  28. /////////////////////////////////////////////////////////////////////////////
  29. /// CSphWakeupEvent - used to kick poller from outside
  30. /////////////////////////////////////////////////////////////////////////////
  31. // event that wakes-up poll net loop from finished thread pool job
// event that wakes-up poll net loop from finished thread pool job
CSphWakeupEvent::CSphWakeupEvent ()
	: PollableEvent_t()
	, ISphNetAction ( m_iPollablefd )
{
	// we only ever need the poller to watch the read end of the pollable pair
	m_uNetEvents = NetPollEvent_t::READ;
}

CSphWakeupEvent::~CSphWakeupEvent ()
{
	Close ();
	m_iSock = -1;
}

/// kick the net loop; on failure just log (best-effort, the loop has timeout fallback)
void CSphWakeupEvent::Wakeup ()
{
	if ( FireEvent () )
		return;
	int iErrno = PollableErrno ();
	sphLogDebugv ( "failed to wakeup net thread ( error %d,'%s')", iErrno, strerrorm ( iErrno ) );
}

/// poller callback: drain the pending wakeup signal(s)
void CSphWakeupEvent::Process ( DWORD uGotEvents, CSphNetLoop * )
{
	if ( uGotEvents & NetPollEvent_t::READ )
		DisposeEvent();
}
#if 0
// retired thread-pool job that batch-deleted finished net actions;
// disabled (dead code), kept for reference only
struct ThdJobCleanup_t final : public ISphJob
{
	CSphVector<ISphNetAction *> m_dCleanup;

	explicit ThdJobCleanup_t ( CSphVector<ISphNetAction *>&& dCleanup )
		: m_dCleanup ( std::move ( dCleanup ))
	{}
	~ThdJobCleanup_t () final { Clear(); }
	void Call () final { Clear(); };
	void Clear()
	{
		if ( g_eLogLevel>=SPH_LOG_VERBOSE_DEBUG && m_dCleanup.GetLength() )
		{
			StringBuilder_c sTmp;
			ARRAY_FOREACH ( i, m_dCleanup )
				sTmp.Sprintf ( "%p(%d), ", m_dCleanup[i], m_dCleanup[i]->m_iSock );
			sphLogDebugv ( "cleaned jobs(sock)=%d, %s", m_dCleanup.GetLength (), sTmp.cstr ());
		}
		ARRAY_FOREACH ( i, m_dCleanup )
			SafeDelete ( m_dCleanup[i] );
		m_dCleanup.Reset();
	}
};
#endif
  79. /////////////////////////////////////////////////////////////////////////////
  80. /// CSphNetLoop - main poller. Used for serving accepts and all socket operations
  81. /////////////////////////////////////////////////////////////////////////////
/////////////////////////////////////////////////////////////////////////////
/// CSphNetLoop - main poller. Used for serving accepts and all socket operations
/////////////////////////////////////////////////////////////////////////////
class CSphNetLoop::Impl_c
{
	// since it is impl, everything is private and accessible by friendship
	friend class CSphNetLoop;

	DWORD & m_uTick;						// tick counter living in the public facade
	CSphNetLoop * m_pParent;				// that is weak ref
	CSphVector<ISphNetAction *> m_dWorkInternal;	// actions to (re)subscribe; poll thread only
	CSphVector<ISphNetAction *> m_dWorkExternal GUARDED_BY ( m_tExtLock );	// actions posted from other threads
	volatile bool m_bGotExternal = false;	// cheap flag: m_dWorkExternal has pending items
	CSphWakeupEvent * m_pWakeup = nullptr;	// pollable kicker; stays null if setup failed (timeout fallback used)
	CSphMutex m_tExtLock;
	LoopProfiler_t m_tPrf;
	CSphScopedPtr<NetPooller_c> m_pPoll;
	CSphAutoEvent m_tWorkerFinished;		// set when LoopNetPoll() exits; awaited by StopNetLoop()

	explicit Impl_c ( const VecTraits_T<Listener_t> & dListeners, DWORD& uTick, CSphNetLoop* pParent )
		: m_uTick ( uTick )
		, m_pParent ( pParent )
		, m_pPoll { new NetPooller_c ( 1000 )}
	{
		// try a pollable wakeup event; if the platform can't provide one,
		// the loop falls back to short (1ms) poll timeouts instead
		CSphScopedPtr<CSphWakeupEvent> pWakeup ( new CSphWakeupEvent );
		if ( pWakeup->IsPollable() )
		{
			m_pWakeup = pWakeup.LeakPtr ();
			sphLogDebugvv ( "Setup wakeup as %d, %d", m_pWakeup->m_iSock, (int) m_pWakeup->m_iTimeoutTimeUS );
			m_pPoll->SetupEvent ( m_pWakeup );
		} else
			sphWarning ( "net-loop use timeout due to %s", pWakeup->m_sError.cstr () );

		// subscribe each listening socket for accepts
		for ( const auto & dListener : dListeners )
		{
			auto * pCur = new NetActionAccept_c ( dListener );
			sphLogDebugvv ( "setup listener as %d, %d", pCur->m_iSock, (int) pCur->m_iTimeoutTimeUS );
			m_pPoll->SetupEvent ( pCur );
		}
		m_dWorkExternal.Reserve ( 1000 );
		m_dWorkInternal.Reserve ( 1000 );
	}

	/// notify every known action (queued and polled) that the loop is going away;
	/// called only after the poll thread has stopped (see StopNetLoop)
	void TerminateSessions() REQUIRES ( NetPoollingThread )
	{
		sphLogDebug ( "TerminateSessions() (%p) invoked", this );
		assert ( m_dWorkInternal.IsEmpty () );
		{
			ScopedMutex_t tExtLock ( m_tExtLock );
			for ( auto * pWork : m_dWorkExternal )
				pWork->NetLoopDestroying ();
		}
		m_pPoll->ProcessAll( [] ( NetPollEvent_t * pWork ) { ((ISphNetAction *) pWork)->NetLoopDestroying (); } );
	}

	// add actions planned by jobs: swap the externally-filled queue into the
	// internal one under the lock, so subscription happens lock-free afterwards
	void PickNewActions () REQUIRES ( NetPoollingThread )
	{
		m_tPrf.StartExt ();
		ScopedMutex_t tExtLock ( m_tExtLock );
		m_tPrf.m_iPerfExt = m_dWorkExternal.GetLength ();
		assert ( m_dWorkInternal.IsEmpty ());
		m_dWorkInternal.SwapData ( m_dWorkExternal );
		m_bGotExternal = false;
		m_tPrf.EndTask ();
	}

	/// dispatch ready events; returns how many were processed
	/// (capped by g_iThrottleAction when that is non-zero)
	int ProcessReady () REQUIRES ( NetPoollingThread )
	{
		int iMaxIters = 0;
		for ( NetPollEvent_t & dReady : *m_pPoll.Ptr() )
		{
			if ( g_iThrottleAction && iMaxIters>=g_iThrottleAction )
				break;
			m_tPrf.StartAt ();
			assert ( dReady.m_uNetEvents );
			auto pWork = (ISphNetAction *) &dReady;
			m_pPoll->RemoveTimeout ( pWork ); // ensure that timer (if any) will no more fire
			pWork->Process ( dReady.m_uNetEvents, m_pParent );
			++m_tPrf.m_iPerfEv;
			++iMaxIters;
			m_tPrf.EndTask ();
		}
		return iMaxIters;
	}

	void Poll ( int64_t tmLastWait ) REQUIRES ( NetPoollingThread )
	{
		// lets spin net-loop thread without syscall\sleep\wait up to net_wait period
		// in case we got events recently or call job that might finish early
		// otherwise poll ( 1 ) \ epoll_wait ( 1 ) put off this thread and introduce some latency, ie
		// sysbench test with 1 thd and 3 empty indexes reports:
		// 3k qps for net-loop without spin-wait
		// 5k qps for net-loop with spin-wait
		int iWaitMs = 0;
		if ( g_tmWait==-1 || ( g_tmWait>0 && sphMicroTimer ()-tmLastWait>I64C( 10000 ) * g_tmWait ))
			iWaitMs = m_pWakeup ? WAIT_UNTIL_TIMEOUT : 1; // without a wakeup event we must time out to notice external work
		m_tPrf.StartPoll ();
		// need positive timeout for communicate threads back and shutdown
		Threads::IdleTimer_t _;
		m_pPoll->Wait ( iWaitMs );
		m_tPrf.EndTask ();
	}

	/// the poller main loop: poll -> pick externally added actions -> dispatch ready
	/// events -> (re)subscribe -> drop timed-out ones; runs until global shutdown
	void LoopNetPoll () REQUIRES ( NetPoollingThread )
	{
		int64_t tmLastWait = sphMicroTimer();
		while ( !sphInterrupted() )
		{
			m_tPrf.Start();
			Poll ( tmLastWait );
			++m_uTick;

			// add actions planned by jobs
			if ( m_bGotExternal )
				PickNewActions ();

			// handle events and collect stats
			m_tPrf.StartTick();
			sphLogDebugv ( "got events=%d, tick=%u, interrupted=%d", m_pPoll->GetNumOfReady (), m_uTick, !!sphInterrupted () );
			auto iProcessed = ProcessReady();
			m_tPrf.EndTask();

			// setup or refresh handlers
			m_tPrf.StartNext();
			m_dWorkInternal.Apply ( [&] ( ISphNetAction * pWork ) REQUIRES ( NetPoollingThread )
			{
				assert ( pWork && pWork->m_iSock>=0 );
				m_pPoll->SetupEvent ( pWork );
			});
			m_dWorkInternal.Resize ( 0 );
			m_tPrf.EndTask();

			// will remove outdated even if they're just added (to avoid polling them)
			iProcessed += RemoveOutdated ();
			if ( iProcessed )
				tmLastWait = sphMicroTimer(); // any activity re-arms the spin-wait window
			m_tPrf.End();
		}
		m_tWorkerFinished.SetEvent ();
	}

	/// fire TIMEOUT on every action whose deadline has passed; returns the count
	int RemoveOutdated () REQUIRES ( NetPoollingThread )
	{
		int64_t tmNow = sphMicroTimer();
		m_tPrf.StartRemove();
		int iRemoved = 0;
		// remove outdated items on no signals
		m_pPoll->ProcessAll([&] ( NetPollEvent_t * pEvent ) REQUIRES ( NetPoollingThread )
		{
			auto * pWork = (ISphNetAction *) pEvent;
			// skip eternal (non-timeouted)
			if ( pWork->m_iTimeoutIdx<0 || pWork->m_iTimeoutTimeUS<=0 || !TimeoutReached (pWork->m_iTimeoutTimeUS, tmNow))
				return;
			sphLogDebugv ( "%p bailing on timeout no signal, sock=%d", pWork, pWork->m_iSock );
			pWork->Process ( NetPollEvent_t::TIMEOUT, m_pParent );
			++iRemoved;
		});
		m_tPrf.EndTask();
		return iRemoved;
	}

	/// wake the (possibly blocked) poll thread, if we have a wakeup event
	void Kick ()
	{
		sphLogDebugvv ( "Kick" );
		if ( m_pWakeup )
			m_pWakeup->Wakeup ();
	}

	/// wake the loop, wait for it to exit, then terminate all sessions
	void StopNetLoop ()
	{
		sphLogDebug ( "StopNetLoop()" );
		Kick ();
		m_tWorkerFinished.WaitEvent ();

		// it is safe to call terminations here, since netpool is stopped. So, declare that we're 'netpool' now.
		ScopedRole_c thPoll ( NetPoollingThread );
		TerminateSessions();
		sphLogDebug ( "StopNetLoop() succeeded" );
	}

	/// thread-safe: queue an action to be subscribed on the next loop tick
	void AddAction ( ISphNetAction * pElem )
	{
		sphLogDebugvv ( "AddAction action as %d, events %d, timeout %d",
				pElem->m_iSock, pElem->m_uNetEvents, (int) pElem->m_iTimeoutTimeUS );
		ScopedMutex_t tExtLock ( m_tExtLock );
		m_dWorkExternal.Add ( pElem );
		m_bGotExternal = true;
		Kick();
	}

	void RemoveEvent ( NetPollEvent_t * pEvent ) REQUIRES ( NetPoollingThread )
	{
		sphLogDebug ( "RemoveEvent()" );
		m_pPoll->RemoveEvent ( pEvent );
	}
};
  258. /////////////////////////////////////////////////////////////////////////////
/////////////////////////////////////////////////////////////////////////////
// CSphNetLoop facade: every method just delegates to the pimpl
CSphNetLoop::CSphNetLoop ( const VecTraits_T<Listener_t> & dListeners )
{
	m_pImpl = new Impl_c ( dListeners, m_uTick, this );
}

CSphNetLoop::~CSphNetLoop ()
{
	SafeDelete ( m_pImpl );
	sphLogDebugv ( "~CSphNetLoop() (%p) completed", this );
}

/// thread entry point; marks the current thread as the net-polling thread
void CSphNetLoop::LoopNetPoll ()
{
	ScopedRole_c thPoll ( NetPoollingThread );
	assert ( m_pImpl );
	m_pImpl->LoopNetPoll();
}

void CSphNetLoop::StopNetLoop()
{
	assert ( m_pImpl );
	m_pImpl->StopNetLoop ();
};

/// thread-safe: may be called from worker threads
void CSphNetLoop::AddAction ( ISphNetAction * pElem )
{
	assert ( m_pImpl );
	m_pImpl->AddAction ( pElem );
}

void CSphNetLoop::RemoveEvent ( NetPollEvent_t * pEvent ) REQUIRES ( NetPoollingThread )
{
	assert ( m_pImpl );
	m_pImpl->RemoveEvent ( pEvent );
}
  289. /////////////////////////////////////////////////////////////////////////////
  290. /// SockWrapper_c::Impl_c internal async socket implementation
  291. /////////////////////////////////////////////////////////////////////////////
/////////////////////////////////////////////////////////////////////////////
/// SockWrapper_c::Impl_c internal async socket implementation
/////////////////////////////////////////////////////////////////////////////
class SockWrapper_c::Impl_c final : public ISphNetAction
{
	friend class SockWrapper_c;

	CSphRefcountedPtr<CSphNetLoop> m_pNetLoop;	// netloop we cooperate with; null => classic blocking poll path
	Handler m_fnWakeFromPoll = nullptr;			// coroutine restarter, captured lazily in EngageWaiterAndYield()
	std::atomic<bool> m_bEngaged { false };		// whether resume or not m_fnRestarter
	int64_t m_iWriteTimeoutUS;					// write timeout, microseconds
	int64_t m_iReadTimeoutUS;					// read timeout, microseconds

	Impl_c ( int iSocket, CSphNetLoop * pNetLoop );
	~Impl_c () final;

	int64_t SockRecv ( char * pBuf, int64_t iLeftBytes );
	int64_t SockSend ( const char * pBuf, int64_t iLeftBytes );

	// read/write timeout accessors (microseconds)
	int64_t GetTimeoutUS () const;
	void SetTimeoutUS ( int64_t iTimeoutUS );
	int64_t GetWTimeoutUS () const;
	void SetWTimeoutUS ( int64_t iTimeoutUS );

	// park the current coroutine in the netloop until an event or the deadline
	void EngageWaiterAndYield( int64_t tmTimeUntilUs );

	// sphPoll-style result: 1 ready, 0 timeout, -1 error
	int SockPoll ( int64_t tmTimeUntilUs, bool bWrite );
	int SockPollClassic ( int64_t tmTimeUntilUs, bool bWrite );
	int SockPollNetloop ( int64_t tmTimeUntilUs, bool bWrite );

public:
	void Process ( DWORD uGotEvents, CSphNetLoop * ) REQUIRES ( NetPoollingThread ) final;
	void NetLoopDestroying () final;
};
/// wrap an already-open socket; pNetLoop may be null (then classic blocking poll is used)
SockWrapper_c::Impl_c::Impl_c ( int iSocket, CSphNetLoop * pNetLoop )
	: ISphNetAction ( iSocket )
	, m_pNetLoop ( pNetLoop )
	, m_iWriteTimeoutUS ( g_iWriteTimeoutS * S2US )	// defaults come from global daemon timeouts (seconds -> us)
	, m_iReadTimeoutUS ( g_iReadTimeoutS * S2US )
{
	SafeAddRef ( pNetLoop );
}

SockWrapper_c::Impl_c::~Impl_c ()
{
	// unlink _before_ closing the sock - since closing may cause event which we don't want to deal with.
	NetPooller_c::Unlink ( this );
	if ( m_iSock>=0 )
	{
		sphLogDebugv ( "closing sock=%d", m_iSock );
		sphSockClose ( m_iSock );
	}
}
  334. // Netpool is already stopped, so it is th-safe here.
  335. void SockWrapper_c::Impl_c::NetLoopDestroying ()
  336. {
  337. sphLogDebug ( "SockWrapper_c::Impl_c::NetLoopDestroying ()" );
  338. // unlink here ensures we're not refer anyway to netpool (if any), and so, will not check it again in d-tr
  339. NetPooller_c::Unlink ( this );
  340. // if we're not finished - setting m_pNetLoop to null will just switch us to classic blocking polling.
  341. m_pNetLoop = nullptr;
  342. // socket is just unactive (oneshoted and not removed). Nothing to do.
  343. if ( !m_bEngaged.load ( std::memory_order_acquire ) )
  344. return;
  345. sphLogDebug ( "SockWrapper_c::Impl_c::NetLoopDestroying () will resume sleeping job" );
  346. // if we're in state of waiting - forcibly set awake reason to 'timeout', then wake up.
  347. m_uNetEvents = TIMEOUT;
  348. m_fnWakeFromPoll ();
  349. }
  350. // this is blocking function. Aware, that current thread may change when it finished.
  351. void SockWrapper_c::Impl_c::EngageWaiterAndYield ( int64_t tmTimeUntilUs )
  352. {
  353. assert ( m_pNetLoop );
  354. sphLogDebugv ( "CoYieldWith (m_iEvent=%d), timeout %d", m_uNetEvents, int(tmTimeUntilUs-sphMicroTimer ()) );
  355. m_iTimeoutTimeUS = tmTimeUntilUs;
  356. if ( !m_fnWakeFromPoll ) // must be set here, NOT in ctr (since m.b. constructed in different ctx)
  357. m_fnWakeFromPoll = Threads::CurrentRestarter ();
  358. // switch context (go to poll)
  359. m_bEngaged.store ( true, std::memory_order_release );
  360. Threads::CoYieldWith ( [this] { m_pNetLoop->AddAction ( this ); } );
  361. m_bEngaged.store ( false, std::memory_order_release );
  362. // here we switched back by call m_fnWakeFromPoll.
  363. sphLogDebugv ( "EngageWaiterAndYield awake (m_iSock=%d, events=%d)", m_iSock, m_uNetEvents );
  364. }
  365. // Called in strict order after EngageWaiterAndYield.
  366. // timer is removed and will NOT tick anyway in future.
  367. // event itself is deactivated (for socket it is one-shot), or timed-out (need to be removed)
  368. // If it was called >once - search for the problem in caller place.
  369. void SockWrapper_c::Impl_c::Process ( DWORD uGotEvents, CSphNetLoop * ) REQUIRES ( NetPoollingThread )
  370. {
  371. assert ( m_bEngaged.load ( std::memory_order_acquire ) );
  372. if ( CheckSocketError ( uGotEvents ) || uGotEvents==TIMEOUT ) // real socket error
  373. m_pNetLoop->RemoveEvent ( this );
  374. m_uNetEvents = uGotEvents;
  375. m_fnWakeFromPoll ();
  376. }
  377. // classic version - blocking via sphPoll
  378. int SockWrapper_c::Impl_c::SockPollClassic ( int64_t tmTimeUntilUs, bool bWrite )
  379. {
  380. int64_t tmMicroLeft = ( tmTimeUntilUs-sphMicroTimer () );
  381. if ( tmMicroLeft<0 )
  382. tmMicroLeft = 0;
  383. Threads::IdleTimer_t _;
  384. int iRes = sphPoll ( m_iSock, tmMicroLeft, bWrite );
  385. sphLogDebugv ( "sphPoll for alone returned %d in " INT64_FMT " Us", iRes, tmMicroLeft-tmTimeUntilUs+sphMicroTimer() );
  386. return iRes;
  387. }
  388. // netloop version - yield rescheduling and yield
  389. int SockWrapper_c::Impl_c::SockPollNetloop ( int64_t tmTimeUntilUs, bool bWrite )
  390. {
  391. m_uNetEvents = NetPollEvent_t::ONCE | ( bWrite ? NetPollEvent_t::WRITE : NetPollEvent_t::READ );
  392. EngageWaiterAndYield ( tmTimeUntilUs );
  393. if ( m_uNetEvents == NetPollEvent_t::TIMEOUT )
  394. {
  395. sphSockSetErrno ( ETIMEDOUT );
  396. return 0;
  397. }
  398. if ( CheckSocketError ( m_uNetEvents ) )
  399. return -1;
  400. return 1;
  401. }
  402. // as usual sphPoll - returns 1 on success, 0 on timeout, -1 on error.
  403. int SockWrapper_c::Impl_c::SockPoll ( int64_t tmTimeUntilUs, bool bWrite )
  404. {
  405. return m_pNetLoop ? SockPollNetloop ( tmTimeUntilUs, bWrite ) : SockPollClassic ( tmTimeUntilUs, bWrite );
  406. }
/// raw non-blocking send; returns bytes sent or <0 on error (see sphSockSend)
int64_t SockWrapper_c::Impl_c::SockSend ( const char * pBuf, int64_t iLeftBytes )
{
	return sphSockSend ( m_iSock, pBuf, iLeftBytes );
}

/// raw non-blocking recv; returns bytes received, 0 on EOF, <0 on error
int64_t SockWrapper_c::Impl_c::SockRecv ( char * pBuf, int64_t iLeftBytes )
{
	sphLogDebugvv ( "SockRecv %d, for " INT64_FMT " bytes", m_iSock, iLeftBytes );
	return sphSockRecv ( m_iSock, pBuf, iLeftBytes );
}

// read timeout, microseconds
int64_t SockWrapper_c::Impl_c::GetTimeoutUS () const
{
	return m_iReadTimeoutUS;
}

void SockWrapper_c::Impl_c::SetTimeoutUS ( int64_t iTimeoutUS )
{
	m_iReadTimeoutUS = iTimeoutUS;
}

// write timeout, microseconds
int64_t SockWrapper_c::Impl_c::GetWTimeoutUS () const
{
	return m_iWriteTimeoutUS;
}

void SockWrapper_c::Impl_c::SetWTimeoutUS ( int64_t iTimeoutUS )
{
	m_iWriteTimeoutUS = iTimeoutUS;
}
  432. /////////////////////////////////////////////////////////////////////////////
  433. /// SockWrapper_c frontend implementation
  434. /////////////////////////////////////////////////////////////////////////////
/////////////////////////////////////////////////////////////////////////////
/// SockWrapper_c frontend implementation: thin pimpl delegations
/////////////////////////////////////////////////////////////////////////////
SockWrapper_c::SockWrapper_c ( int iSocket, CSphNetLoop * pNetLoop )
	: m_pImpl ( new Impl_c ( iSocket, pNetLoop ) )
{}

SockWrapper_c::~SockWrapper_c ()
{
	SafeDelete ( m_pImpl );
}

int64_t SockWrapper_c::SockSend ( const char * pData, int64_t iLen )
{
	assert ( m_pImpl );
	return m_pImpl->SockSend ( pData, iLen );
}

int64_t SockWrapper_c::SockRecv ( char * pData, int64_t iLen )
{
	assert ( m_pImpl );
	return m_pImpl->SockRecv ( pData, iLen );
}

int SockWrapper_c::SockPoll ( int64_t tmTimeUntilUs, bool bWrite )
{
	assert ( m_pImpl );
	return m_pImpl->SockPoll ( tmTimeUntilUs, bWrite );
}

int64_t SockWrapper_c::GetTimeoutUS () const
{
	assert ( m_pImpl );
	return m_pImpl->GetTimeoutUS();
}

void SockWrapper_c::SetTimeoutUS ( int64_t iTimeoutUS )
{
	assert ( m_pImpl );
	m_pImpl->SetTimeoutUS (iTimeoutUS);
}

int64_t SockWrapper_c::GetWTimeoutUS () const
{
	assert ( m_pImpl );
	return m_pImpl->GetWTimeoutUS ();
}

void SockWrapper_c::SetWTimeoutUS ( int64_t iTimeoutUS )
{
	assert ( m_pImpl );
	m_pImpl->SetWTimeoutUS ( iTimeoutUS );
}
  477. /////////////////////////////////////////////////////////////////////////////
  478. /// Helpers
  479. /////////////////////////////////////////////////////////////////////////////
  480. // Send a blob into socket.
  481. // Alone worker will use waiting in poll.
  482. // Cooperative worker will yield and resume instead of waiting.
  483. // timeout is ruled by g_iWriteTimeoutS.
  484. static bool SyncSend ( SockWrapper_c* pSock, const char * pBuffer, int64_t iLen)
  485. {
  486. if ( sphInterrupted () )
  487. sphLogDebug ( "SIGTERM in SockWrapper_c::Send" );
  488. if ( iLen<=0 )
  489. return false;
  490. sphLogDebugv ( "AsyncSend " INT64_FMT " bytes", iLen );
  491. auto iTimeoutUntilUs = sphMicroTimer () + pSock->GetWTimeoutUS();
  492. do
  493. {
  494. auto iRes = pSock->SockSend ( pBuffer, iLen );
  495. if ( iRes<0 )
  496. {
  497. int iErrno = sphSockGetErrno ();
  498. if ( iErrno==EINTR ) // interrupted before any data was sent; just loop
  499. continue;
  500. if ( iErrno!=EAGAIN && iErrno!=EWOULDBLOCK ) {
  501. sphWarning ( "send() failed: %d: %s", iErrno, sphSockError ( iErrno ));
  502. return false;
  503. }
  504. } else {
  505. if ( iLen==iRes )
  506. return true; // we're finished
  507. iLen -= iRes;
  508. pBuffer += iRes;
  509. }
  510. sphLogDebugv ("Still need to send " INT64_FMT " bytes...", iLen );
  511. } while (pSock->SockPoll ( iTimeoutUntilUs, true ) );
  512. sphWarning ( "timed out while performing SyncSend to flush network buffers" );
  513. return false;
  514. }
  515. // fetch a chunk of bytes from socket and adjust position/rest of bytes
// fetch a chunk of bytes from socket and adjust position/rest of bytes
// returns recv's result: >0 bytes read (pBuf/iLeftBytes advanced), 0 on EOF, <0 on error
static int AsyncRecvNBChunk ( SockWrapper_c * pSock, BYTE *& pBuf, int & iLeftBytes )
{
	// try to receive next chunk
	auto iRes = pSock->SockRecv ( (char*) pBuf, iLeftBytes );
	sphLogDebugv ( "=========== AsyncRecvNBChunk " INT64_FMT " when read %d bytes", iRes, iLeftBytes );
	if ( iRes>0 )
	{
		pBuf += iRes;
		iLeftBytes -= iRes;
	}
	return (int) iRes;
}
  528. #if USE_WINDOWS
  529. #define EMULATE_EINTR 1
  530. #endif
  531. //#define EMULATE_EINTR 1
  532. // flexible receive data from socket. iLen indicates, how many bytes to read. iSpace - how many is _safe_ to read.
  533. // (i.e. if you want 1 byte and space for 100 - you can read up to 100 bytes, but not 101).
// flexible receive data from socket. iLen indicates, how many bytes to read. iSpace - how many is _safe_ to read.
// (i.e. if you want 1 byte and space for 100 - you can read up to 100 bytes, but not 101).
// bIntr allows SIGTERM to interrupt the very first wait (reset after first successful chunk).
// Returns total bytes received (>=iLen on success) or -1 with errno set
// (EINTR / ETIMEDOUT / ECONNRESET / socket error).
static int SyncSockRead ( SockWrapper_c * pSock, BYTE* pBuf, int iLen, int iSpace, bool bIntr )
{
	assert ( iSpace>=iLen );

	// try to receive available chunk
	int iReceived = AsyncRecvNBChunk ( pSock, pBuf, iSpace );
	sphLogDebugv ( "AsyncRecvNBChunk %d bytes (%d requested)", iReceived, iLen );

	if ( iReceived>=iLen ) // all, and m.b. more read in one-shot
		return iReceived;

	// immediate error (most probably it is E_AGAIN; check!)
	if ( iReceived<0 )
		iReceived = 0;

	iLen -= iReceived;
	if ( !iLen )
		return iReceived;

	// poll/recv loop until iLen satisfied or deadline hit; at least 1s timeout
	int64_t tmMaxTimer = sphMicroTimer ()+Max ( S2US, pSock->GetTimeoutUS () ); // in microseconds

	int iErr = 0;
	int iRes = -1;
	while ( iLen>0 )
	{
		int64_t tmNextStopUs = tmMaxTimer;

#if EMULATE_EINTR
		// Windows EINTR emulation
		// Ctrl-C will not interrupt select on Windows, so let's handle that manually
		// forcibly limit select() to 100 ms, and check flag afterwards
		if ( bIntr )
			tmNextStopUs = Min ( tmMaxTimer, sphMicroTimer () + 100000 );
#endif

		if ( ( tmNextStopUs - sphMicroTimer() )<=0 )
			break; // timed out

		// wait until there is data
		sphLogDebugv ( "Still need to receive %d bytes...", iLen );
		iRes = pSock->SockPoll ( tmNextStopUs, false );

		// if there was EINTR, retry
		// if any other error, bail
		if ( iRes==-1 )
		{
			// only let SIGTERM (of all them) to interrupt, and only if explicitly allowed
			iErr = sphSockGetErrno();
			if ( iErr==EINTR )
			{
				if ( !( sphInterrupted () && bIntr ))
					continue;
				sphLogDebug( "SyncSockRead: select got SIGTERM, exit -1" );
			}
			return -1;
		}

		// if there was a timeout, report it as an error
		if ( iRes==0 )
		{
#if EMULATE_EINTR
			// EINTR emulation
			if ( bIntr )
			{
				// got that SIGTERM
				if ( sphInterrupted() )
				{
					sphLogDebug ( "SyncSockRead: got SIGTERM emulation on Windows, exit -1" );
					sphSockSetErrno ( EINTR );
					return -1;
				}
				// timeout might not be fully over just yet, so re-loop
				continue;
			}
#endif
			sphLogDebugv ( "return TIMEOUT..." );
			sphSockSetErrno( ETIMEDOUT );
			return -1;
		}

		// try to receive next chunk
		iRes = AsyncRecvNBChunk( pSock, pBuf, iSpace );
		sphLogDebugv ( "SyncSockRead: AsyncRecvNBChunk returned %d", iRes );

		// if there was eof, we're done
		if ( !iRes )
		{
			sphLogDebugv ( "SyncSockRead: connection reset" );
			sphSockSetErrno( ECONNRESET );
			return -1;
		}

		// if there was EINTR, retry
		// if any other error, bail
		if ( iRes==-1 )
		{
			// only let SIGTERM (of all them) to interrupt, and only if explicitly allowed
			iErr = sphSockGetErrno();
			if ( iErr==EINTR )
			{
				if ( !( sphInterrupted () && bIntr ))
					continue;
				sphLogDebug( "SyncSockRead: select got SIGTERM, exit -1" );
			}
			return -1;
		}

		iReceived += iRes;
		iLen-= iRes;
		// avoid partial buffer loss in case of signal during the 2nd (!) read
		bIntr = false;
	}

	// if there was a timeout, report it as an error
	if ( iLen>0 )
	{
		sphSockSetErrno( ETIMEDOUT );
		return -1;
	}
	return iReceived;
}
  639. /////////////////////////////////////////////////////////////////////////////
  640. /// AsyncNetInputBuffer_c
  641. /////////////////////////////////////////////////////////////////////////////
/// start with a small (NET_MINIBUFFER_SIZE) empty buffer; it grows on demand in AppendData()
AsyncNetInputBuffer_c::AsyncNetInputBuffer_c ()
	: STORE ( NET_MINIBUFFER_SIZE )
	, InputBuffer_c ( m_pData, NET_MINIBUFFER_SIZE )
{
	Resize ( 0 );
	m_iLen = 0; // logical length tracks the store length (kept in sync by AppendData/DiscardProcessed)
}
/// sniff the wire protocol from the first bytes of the connection without consuming them.
/// bLight=true limits to a non-blocking peek; otherwise a blocking read of >=1 byte is tried.
/// Returns UNKNOWN when nothing (or <4 bytes) could be read.
Proto_e AsyncNetInputBuffer_c::Probe ( int iHardLimit, bool bLight )
{
	Proto_e eResult = Proto_e::UNKNOWN;
	m_bIntr = false;
	int iRest = 0;
	if ( !HasBytes() )
	{
		iRest = GetRoomForTail ( iHardLimit );
		if ( !iRest )
			return eResult; // hard limit reached

		// non-blocking try: need 0 bytes, accept up to 4K
		AppendData ( 0, Min ( iRest, 4096 ), true );
	}

	auto iHas = HasBytes();
	if (!iHas)
	{
		if ( bLight )
		{
			sphLogDebugv ( "+++++ Light probing revealed nothing, bail" );
			return eResult;
		}
		sphLogDebugv ( "+++++ Light probing revealed nothing, try blocking" );
		// blocking try: wait for at least 1 byte
		AppendData ( 1, Min ( iRest, 4096 ), true );
		iHas = HasBytes ();
	}

	StringBuilder_c sBytes; // human-readable verdict for the debug log below
	auto tBlob = Tail ();
	if ( tBlob.second >=4 )
	{
		// SphinxAPI handshake is a 4-byte version marker; HTTP starts with a method verb
		if ( !memcmp (tBlob.first,"\0\0\0\1",4) )
		{
			sBytes << "SphinxAPI, usual byte order";
			eResult = Proto_e::SPHINX;
		}
		else if ( !memcmp ( tBlob.first, "GET", 3)
				|| !memcmp ( tBlob.first, "POST", 4 )
				|| !memcmp ( tBlob.first, "PUT", 3 )
				|| !memcmp ( tBlob.first, "DELE", 4 ) )
		{
			eResult = Proto_e::HTTP;
			sBytes << "HTTP";
		}
		else if ( !memcmp ( tBlob.first, "\1\0\0\0", 4 ) )
		{
			sBytes << "SphinxAPI, inversed byte order";
			eResult = Proto_e::SPHINX;
		}
		else
		{
			eResult = Proto_e::HTTPS; // m.b. more accurate probe on ssl header, but not important right now
			sBytes << "Unknown, assume HTTPS";
		}
	} else
	{
		// too few bytes to classify; dump them for debugging
		sBytes.StartBlock ( " ", "Short [", "]" );
		for ( int i = 0; i<tBlob.second; ++i )
			sBytes << tBlob.first[i];
		sBytes.FinishBlocks();
	}
	sphLogDebugv ( "+++++ Probing revealed %d bytes: %s", iHas, sBytes.cstr() );
	return eResult;
}
/// ensure at least iLen unprocessed bytes are available in the buffer,
/// reading missing ones from the socket. Returns false on bad size or read error.
bool AsyncNetInputBuffer_c::ReadFrom( int iLen, bool bIntr )
{
	m_bIntr = false;
	if ( iLen<=0 || iLen>g_iMaxPacketSize )
		return false;

	auto iRest = iLen - HasBytes();
	if ( iRest<=0 ) // lazy case: prev ReadFrom already achieved requested bytes
		return true;

	m_bError = AppendData ( iRest, iRest, bIntr )<iRest;
	return !m_bError;
}
  721. // ensure iSpace bytes in buffer, then read at least iNeed, up to vector's GetLimit().
  722. // returns -1 on error, or N of appended bytes.
// ensure iSpace bytes in buffer, then read at least iNeed, up to vector's GetLimit().
// returns -1 on error, or N of appended bytes.
// Side effects: sets m_bError/m_bIntr, advances the store length and m_iLen.
int AsyncNetInputBuffer_c::AppendData ( int iNeed, int iSpace, bool bIntr )
{
	assert ( iNeed<=iSpace );
	// grow the store; remember cursor offset since the buffer may be reallocated
	auto iPos = DiscardAndReserve ( int ( m_pCur-m_pBuf ), GetLength()+iSpace );
	iSpace = GetLimit ()-GetLength (); // actual safe room may exceed what was asked
	m_pBuf = ByteBlob_t ( *this ).first; // realign after possible reserve, byteblob ensures it is not nullptr
	m_pCur = m_pBuf+iPos;

	int iGot = ReadFromBackend ( iNeed, iSpace, bIntr );
	if ( sphInterrupted () && bIntr )
	{
		sphLogDebug ( "AsyncNetInputBuffer_c::AppendData: got SIGTERM, return -1" );
		m_bError = true;
		m_bIntr = true;
		return -1;
	}

	if ( iGot==-1 )
	{
		m_bError = true;
		auto iErr = sphSockPeekErrno ();
		m_bIntr = iErr==EINTR; // remember whether failure was a signal, for callers that retry
		sphLogDebug ( "AsyncNetInputBuffer_c::AppendData: error %d (%s) return -1", iErr, strerrorm ( iErr ) );
		return -1;
	}

	AddN ( iGot ); // commit received bytes into the store
	m_iLen = GetLength();
	m_bIntr = false;
	return iGot;
}
/// opportunistic read: fetch whatever arrives (at least 1 byte, up to 4K),
/// bounded by iHardLimit. Returns N appended, 0 on hard limit, -1 on error.
int AsyncNetInputBuffer_c::ReadAny ( int iHardLimit )
{
	m_bIntr = false;
	auto iRest = GetRoomForTail ( iHardLimit );
	if ( !iRest )
		return 0;

	return AppendData ( 1, Min ( iRest, 4096 ), true );
}

/// view (no copy, no consume) of the not-yet-processed bytes
ByteBlob_t AsyncNetInputBuffer_c::Tail ()
{
	return { m_pCur, HasBytes ()};
}

/// consume and return up to iSize unprocessed bytes (-1 = all of them); zero-copy
ByteBlob_t AsyncNetInputBuffer_c::PopTail ( int iSize )
{
	const BYTE * pBuf = nullptr;
	auto iBufLen = HasBytes ();
	if ( iSize>=0 )
	{
		assert ( iSize <= iBufLen );
		iBufLen = iSize;
	}
	if ( iBufLen>0 && GetBytesZerocopy ( &pBuf, iBufLen ) )
		return { pBuf, iBufLen };
	else
		return { nullptr, 0 };
}

/// room left before iHardLimit; when full, first discards already-processed bytes
int AsyncNetInputBuffer_c::GetRoomForTail ( int iHardLimit )
{
	if ( iHardLimit-m_iLen<=0 )
		DiscardProcessed ( -1 );

	return iHardLimit-m_iLen;
}
/// drop iHowMany bytes that precede the cursor from the buffer.
/// iHowMany==-1: drop everything before the cursor;
/// iHowMany==0: a no-op unless the whole buffer is processed (then reset to empty);
/// iHowMany>0: remove that many bytes ending at the cursor.
void AsyncNetInputBuffer_c::DiscardProcessed ( int iHowMany )
{
	auto iPos = int ( m_pCur-m_pBuf );
	assert ( m_iLen == GetLength() );
	assert ( iHowMany >=-1 ); // we don't even expect values less then -1.
	assert ( iPos <= m_iLen );
	auto iOldLen = m_iLen;

	switch ( iHowMany ) {
	case 0:
		if ( iPos==m_iLen ) {
			Resize ( 0 );
			iHowMany = iPos;
		}
		break;
	case -1: iHowMany = iPos;
		// intentional fallthrough: -1 means "remove everything before the cursor"
	default: Remove ( iPos-iHowMany, iHowMany );
	}

	m_pCur -= iHowMany; // cursor keeps pointing at the same logical byte
	m_iLen = STORE::GetLength();
	sphLogDebugv ( "DiscardProcessed(%d) iPos=%d->0, iLen=%d->%d",
			iHowMany, (int)iPos, iOldLen, m_iLen );
}
/// overwrite the byte at cursor+iPos with uNewVal (growing the buffer by one
/// if the slot lies beyond current capacity) and return the previous value.
/// Used to place a temporary terminator (e.g. '\0') into the stream.
BYTE AsyncNetInputBuffer_c::Terminate ( int iPos, BYTE uNewVal )
{
	auto pPos = m_pCur + iPos;
	auto pLimit = m_pData + GetLimit();
	if ( pPos >= pLimit ) // no place for terminator
	{
		// reserve one extra byte; the store may reallocate, so recompute pointers
		auto iIdx = m_pCur-m_pBuf;
		ReserveGap(1);
		m_pBuf = m_pData;
		m_pCur = m_pBuf+iIdx;
		pPos = m_pCur+iPos;
	}

	auto uOld = *pPos;
	*const_cast<BYTE*>(pPos) = uNewVal;
	return uOld;
}
  821. /////////////////////////////////////////////////////////////////////////////
  822. /// AsyncBufferedSocket_c - provides wrapper for sending and receiving
  823. /////////////////////////////////////////////////////////////////////////////
/////////////////////////////////////////////////////////////////////////////
/// AsyncBufferedSocket_c - provides wrapper for sending and receiving
/// Binds the buffered net-buffer interface to a concrete SockWrapper_c.
/////////////////////////////////////////////////////////////////////////////
class AsyncBufferedSocket_c final : public AsyncNetBuffer_c
{
	SockWrapperPtr_c m_pSocket;

	/// input backend: blocking-with-timeout read into the buffer tail
	int ReadFromBackend ( int iNeed, int iHaveSpace, bool bIntr ) final
	{
		assert ( iNeed<=iHaveSpace );
		return SyncSockRead ( m_pSocket, AddN ( 0 ), iNeed, iHaveSpace, bIntr );
	}

	/// output backend: flush the accumulated output buffer
	bool SendBuffer ( const VecTraits_T<BYTE> & dData ) final
	{
		assert ( m_pSocket );
		if ( dData.IsEmpty () )
			return true; // nothing to send
		CSphScopedProfile tProf ( m_pProfile, SPH_QSTATE_NET_WRITE );
		// NOTE(review): emptiness is checked on dData but the bytes sent come from
		// m_dBuf — presumably dData always aliases m_dBuf here; verify against callers
		if ( SyncSend ( m_pSocket, (const char *) m_dBuf.begin (), m_dBuf.GetLength64 () ) )
			return true;

		NetGenericOutputBuffer_c::m_bError = true;
		return false;
	}

public:
	explicit AsyncBufferedSocket_c ( SockWrapperPtr_c pSock )
		: m_pSocket ( std::move ( pSock ) )
	{
	}

	// timeout accessors simply forward to the underlying socket (microseconds)
	void SetWTimeoutUS ( int64_t iTimeoutUS ) final { m_pSocket->SetWTimeoutUS ( iTimeoutUS ); };
	int64_t GetWTimeoutUS () const final { return m_pSocket->GetWTimeoutUS (); }
	void SetTimeoutUS ( int64_t iTimeoutUS ) final { m_pSocket->SetTimeoutUS ( iTimeoutUS ); };
	int64_t GetTimeoutUS () const final { return m_pSocket->GetTimeoutUS (); }
};
  853. // main fabric
  854. AsyncNetBufferPtr_c MakeAsyncNetBuffer ( SockWrapperPtr_c pSock )
  855. {
  856. return AsyncNetBufferPtr_c ( new AsyncBufferedSocket_c ( std::move ( pSock )));
  857. }