// threadutils.cpp
  1. //
  2. // Copyright (c) 2017-2026, Manticore Software LTD (https://manticoresearch.com)
  3. // Copyright (c) 2001-2016, Andrew Aksyonoff
  4. // Copyright (c) 2008-2016, Sphinx Technologies Inc
  5. // All rights reserved
  6. //
  7. // This program is free software; you can redistribute it and/or modify
  8. // it under the terms of the GNU General Public License. You should have
  9. // received a copy of the GPL license along with this program; if you
  10. // did not, you can find it at http://www.gnu.org/
  11. //
  12. #include "threadutils.h"
  13. #include <boost/context/detail/prefetch.hpp>
  14. #include <optional>
  15. #include "datetime.h"
  16. #if !_WIN32
  17. // UNIX-specific headers and calls
  18. #include <sys/syscall.h>
  19. #include <signal.h>
  20. #endif
  21. // for thr_self()
  22. #ifdef __FreeBSD__
  23. #include <sys/thr.h>
  24. #endif
  25. using namespace Threads;
  26. const char* TaskStateName ( TaskState_e eState )
  27. {
  28. switch (eState)
  29. {
  30. case TaskState_e::UNKNOWN: return "-";
  31. case TaskState_e::HANDSHAKE: return "handshake";
  32. case TaskState_e::NET_READ: return "net_read";
  33. case TaskState_e::NET_WRITE: return "net_write";
  34. case TaskState_e::QUERY: return "query";
  35. case TaskState_e::NET_IDLE: return "net_idle";
  36. case TaskState_e::RETIRED: return "- retired";
  37. }
  38. return "unknown";
  39. }
  40. const char* ProtoName ( Proto_e eProto )
  41. {
  42. switch ( eProto )
  43. {
  44. case Proto_e::UNKNOWN: return "-";
  45. case Proto_e::SPHINX:
  46. case Proto_e::SPHINXSE: return "sphinx";
  47. case Proto_e::MYSQL41: return "mysql";
  48. case Proto_e::HTTP: return "http";
  49. case Proto_e::HTTPS: return "https";
  50. case Proto_e::REPLICATION: return "replication";
  51. default: break;
  52. }
  53. return "unknown";
  54. }
  55. const char * RelaxedProtoName ( Proto_e eProto )
  56. {
  57. switch ( eProto )
  58. {
  59. case Proto_e::UNKNOWN: return "-";
  60. case Proto_e::MYSQL41: return "mysql";
  61. case Proto_e::REPLICATION: return "replication";
  62. case Proto_e::SPHINX:
  63. case Proto_e::HTTP: return "sphinx and http(s)";
  64. case Proto_e::HTTPS: return "https";
  65. case Proto_e::SPHINXSE: return "sphinx (to connect from SphinxSE)";
  66. default: break;
  67. }
  68. return "unknown";
  69. }
  70. int GetOsThreadId ()
  71. {
  72. #if _WIN32
  73. return GetCurrentThreadId();
  74. #elif defined ( __APPLE__ )
  75. uint64_t tid;
  76. pthread_threadid_np(NULL, &tid);
  77. return tid;
  78. #elif defined(SYS_gettid)
  79. return syscall ( SYS_gettid );
  80. #elif defined(__FreeBSD__)
  81. long tid;
  82. thr_self(&tid);
  83. return (int)tid;
  84. #else
  85. return 0;
  86. #endif
  87. }
/// OS-level id of the current process (WinAPI on Windows, getpid() elsewhere)
int GetOsProcessId()
{
#if _WIN32
	return GetCurrentProcessId();
#else
	return getpid();
#endif
}
  96. #include "event.h"
  97. #include <atomic>
  98. //////////////////////////////////////////////////////////////////////////
  99. /// functional threadpool with minimum footprint
  100. #define LOG_LEVEL_DEBUG false
  101. #define LOG_LEVEL_DETAIL false
  102. #define LOG_LEVEL_ALONE LOG_LEVEL_DETAIL
  103. //#define LOG_LEVEL_ALONE true
  104. namespace Threads {
namespace logdetail {
	// Name of the calling thread, taken from its thread-local description;
	// consumed by the LOG_COMPONENT_MT macro below to prefix log lines.
	const char* name() { return MyThd ().m_sThreadName.cstr(); }
}
  108. #define LOG_COMPONENT_MT "(" << GetOsThreadId() << ") " << logdetail::name() << ": "
  109. using Operation_t = Threads::details::SchedulerOperation_t;
  110. using OpSchedule_t = Threads::details::OpQueue_T<Operation_t>;
// Per-thread bookkeeping kept on the stack of a thread while it is inside
// Service_t::run(). Operations posted from within that very thread go into the
// private queue first (no lock needed) and are merged back into the shared
// VIP queue between handler invocations.
struct TaskServiceThreadInfo_t
{
	OpSchedule_t m_dPrivateQueue;		// ops scheduled by this thread for itself, drained lock-free
	long m_iPrivateOutstandingWork = 0;	// locally accumulated work count, folded into the service counter later
};
// Thin base that only pins down the operation type used by the scheduler
// machinery (the layering mirrors boost::asio naming, per the note on Service_t).
class TaskService_t
{
public:
	using operation = Operation_t;
};
  121. // Helper class to determine whether or not the current thread is inside an
  122. // invocation of Service_t::run() for a specified service object.
  123. // That may be used to optimize codeflow (like place continuation without locks)
  124. template<typename Key, typename Value = BYTE>
  125. class CallStack_c
  126. {
  127. public:
  128. // Context class automatically pushes the key/value pair on to the stack.
  129. class Context_c : public ISphNoncopyable
  130. {
  131. Key * m_pService; // The key associated with the context.
  132. Value * m_pThisContext; // The value associated with the context.
  133. Context_c * m_pNext; // The next element in the stack.
  134. friend class CallStack_c<Key, Value>;
  135. public:
  136. // Push the key on to the stack.
  137. explicit Context_c ( Key * pService )
  138. : m_pService ( pService ),
  139. m_pNext ( CallStack_c<Key, Value>::m_pTop )
  140. {
  141. m_pThisContext = (Value *)this;
  142. CallStack_c<Key, Value>::m_pTop = this;
  143. }
  144. // Push the key/value pair on to the stack.
  145. Context_c ( Key * pKey, Value & v )
  146. : m_pService ( pKey ),
  147. m_pThisContext ( &v ),
  148. m_pNext ( CallStack_c<Key, Value>::m_pTop )
  149. {
  150. CallStack_c<Key, Value>::m_pTop = this;
  151. }
  152. // Pop the key/value pair from the stack.
  153. ~Context_c ()
  154. {
  155. CallStack_c<Key, Value>::m_pTop = m_pNext;
  156. }
  157. // Find the next context with the same key.
  158. Value * NextByKey () const noexcept
  159. {
  160. for ( auto* pElem = m_pNext; pElem!=nullptr; pElem = pElem->m_pNext )
  161. if ( pElem->m_pService==m_pService )
  162. return pElem->m_pThisContext;
  163. return nullptr;
  164. }
  165. };
  166. friend class Context_c;
  167. // Determine whether the specified owner is on the stack.
  168. // Returns address of key, if present, nullptr otherwise.
  169. static Value * Contains ( Key * pKey ) noexcept
  170. {
  171. for ( auto* pElem = m_pTop; pElem!=nullptr; pElem = pElem->m_pNext )
  172. if ( pElem->m_pService==pKey )
  173. return pElem->m_pThisContext;
  174. return nullptr;
  175. }
  176. // Obtain the value at the top of the stack.
  177. static Value * Top () noexcept
  178. {
  179. Context_c * pElem = m_pTop;
  180. return pElem ? pElem->m_pThisContext : nullptr;
  181. }
  182. private:
  183. // The top of the stack of calls for the current thread.
  184. static thread_local Context_c* m_pTop;
  185. };
  186. template<typename Key, typename Value>
  187. thread_local typename CallStack_c<Key, Value>::Context_c* CallStack_c<Key,Value>::m_pTop = nullptr;
  188. //struct Service_i
  189. //{
  190. // virtual ~Service_i() {}
  191. // virtual void run() = 0;
  192. // virtual void reset() = 0;
  193. //};
  194. #define LOG_LEVEL_WORKS false
  195. #define LOG_LEVEL_ST false
  196. #define LOG_LEVEL_SERVICE LOG_LEVEL_DETAIL
  197. //#define LOG_LEVEL_SERVICE true
  198. #define LOG_COMPONENT_SVC LOG_COMPONENT_MT << " [" << &m_iOutstandingWork << "]=" << m_iOutstandingWork
  199. /// performs tasks pushed with post() in one or many threads until they done.
  200. /// Naming convention of members is inherited from boost::asio as drop-in replacement.
/// Performs tasks pushed with post() in one or many threads until they are done.
/// Naming convention of members is inherited from boost::asio as drop-in replacement.
/// Two shared queues are kept: a VIP one (drained first) and a regular one;
/// threads sleeping in run() are woken via m_tWakeupEvent.
struct Service_t : public TaskService_t//, public Service_i
{
	std::atomic<long> m_iOutstandingWork {0};	/// count of unfinished works; service stops when it hits 0
	mutable CSphMutex m_dMutex;					/// protect access to internal data
	bool m_bStopped = false;					/// dispatcher has been stopped.
	bool m_bOneThread;							/// optimize for single-threaded use case
	sph::Event_c m_tWakeupEvent;				/// event to wake up blocked threads
	OpSchedule_t m_OpQueue GUARDED_BY ( m_dMutex );		/// The queue of handlers that are ready to be delivered
	OpSchedule_t m_OpVipQueue GUARDED_BY ( m_dMutex );	/// The queue of handlers that have to be delivered BEFORE OpQueue

	// Per-thread call stack to track the state of each thread in the service.
	using ThreadCallStack_c = CallStack_c<Service_t, TaskServiceThreadInfo_t>;

	class Work_c;	/// Scoped RAII work to keep service running. calls work_started/work_finished
	friend class Work_c;

public:
	explicit Service_t ( bool bOneThread )
		: m_bOneThread ( bOneThread ) {}

	// post into secondary (regular) queue
	inline void post_op ( Service_t::operation* pOp )
	{
		post_immediate_completion ( pOp, false );
	}

	// post into primary (VIP) queue
	inline void defer_op ( Service_t::operation* pOp )
	{
		post_immediate_completion ( pOp, true );
	}

	// Post a continuation. If the calling thread is already inside run() for
	// this service, the op goes to that thread's lock-free private queue;
	// otherwise it is pushed to the shared VIP queue under the mutex.
	void post_continuation ( Service_t::operation* pOp )
	{
		auto* pThisThread = ThreadCallStack_c::Contains ( this );
		if ( pThisThread )
		{
			++pThisThread->m_iPrivateOutstandingWork;
			pThisThread->m_dPrivateQueue.Push ( pOp );
			LOG ( SERVICE, SVC ) << "post this";
			return;
		}
		work_started ();
		ScopedMutex_t dLock ( m_dMutex );
		LOG ( SERVICE, SVC ) << "post";
		m_OpVipQueue.Push ( pOp );
		wake_one_thread_and_unlock ( dLock );
	}

	// Common post path; bVip selects the queue. In single-threaded mode a post
	// from inside run() short-circuits to the private queue (no lock, no wakeup).
	void post_immediate_completion ( Service_t::operation* pOp, bool bVip )
	{
		if ( m_bOneThread )
		{
			auto* pThisThread = ThreadCallStack_c::Contains ( this );
			if ( pThisThread )
			{
				++pThisThread->m_iPrivateOutstandingWork;
				pThisThread->m_dPrivateQueue.Push ( pOp );
				LOG ( SERVICE, SVC ) << "post this";
				return;
			}
		}
		work_started ();
		ScopedMutex_t dLock ( m_dMutex );
		LOG ( SERVICE, MT ) << "post";
		if ( bVip )
			m_OpVipQueue.Push ( pOp );
		else
			m_OpQueue.Push ( pOp );
		wake_one_thread_and_unlock ( dLock );
	}

	// Worker-thread entry: registers a per-thread context frame and executes
	// handlers until the service stops. bBusy mirrors whether a handler is
	// currently being run (used by the pool's CurTasks()).
	void run ( std::atomic<bool>& bBusy ) NO_THREAD_SAFETY_ANALYSIS //override
	{
		LOG ( SERVICE, SVC ) << "run " << m_iOutstandingWork << " st:" << !!m_bStopped;
		if ( m_iOutstandingWork==0 )
		{
			// nothing to do at all - make sure everybody is released and bail out
			LOG ( WORKS, MT ) << "run m_iOutstandingWork " << m_iOutstandingWork << " " << &m_iOutstandingWork<< " stop!";
			stop();
			return;
		}
		TaskServiceThreadInfo_t dThisThread;
		dThisThread.m_iPrivateOutstandingWork = 0;
		ThreadCallStack_c::Context_c dCtx ( this, dThisThread );
		ScopedMutex_t dLock ( m_dMutex );
		// do_run_one always returns with the lock released; re-acquire between iterations
		while ( do_run_one ( dLock, dThisThread, bBusy ) )
			dLock.Lock ();
	}

	bool queue_empty() const REQUIRES ( m_dMutex )
	{
		return m_OpQueue.Empty () && m_OpVipQueue.Empty ();
	}

	// Execute one handler. Enters with dLock held, ALWAYS leaves it released.
	// Returns true if a handler was run (caller should loop), false on stop.
	inline bool do_run_one ( ScopedMutex_t& dLock, TaskServiceThreadInfo_t& this_thread, std::atomic<bool>& bBusy ) noexcept
		REQUIRES ( dLock ) RELEASE ( dLock ) TRY_ACQUIRE ( false, dLock )
	{
		while ( !m_bStopped )
		{
			LOG ( SERVICE, MT ) << "locked " << dLock.Locked();
			assert ( dLock.Locked ());
			if ( queue_empty() )
			{
				// nothing queued - sleep until somebody posts work
				m_tWakeupEvent.Clear ( dLock );
				m_tWakeupEvent.Wait ( dLock );
				continue;
			}
			// VIP queue has strict priority over the regular one
			auto & dOpQueue = m_OpVipQueue.Empty () ? m_OpQueue : m_OpVipQueue;
			auto * pOp = dOpQueue.Front ();
			dOpQueue.Pop ();
			// if more work remains, pass the baton to another thread; either way we drop the lock
			if ( !queue_empty () && !m_bOneThread )
				wake_one_thread_and_unlock ( dLock );
			else
				dLock.Unlock ();
			bBusy.store ( true, std::memory_order_relaxed );
			boost::context::detail::prefetch_range ( pOp, sizeof ( Operation_t ) );
			pOp->Complete (this); // run the handler (may post continuations into our private queue)
			bBusy.store ( false, std::memory_order_relaxed );
			LOG ( SERVICE, MT ) << "completed & unlocked";
			// Balance the global work counter: the op we just completed consumed
			// one unit; privately-posted ops added m_iPrivateOutstandingWork units.
			if ( this_thread.m_iPrivateOutstandingWork>1 )
			{
				m_iOutstandingWork += this_thread.m_iPrivateOutstandingWork-1;
				LOG ( WORKS, MT ) << "do_run_one m_iOutstandingWork " << m_iOutstandingWork << " " << &m_iOutstandingWork;
			}
			else if ( this_thread.m_iPrivateOutstandingWork<1 )
				work_finished (); // nothing private posted - just account the completed op
			this_thread.m_iPrivateOutstandingWork = 0;
			// flush privately-queued continuations to the shared VIP queue (re-locks!)
			if ( !this_thread.m_dPrivateQueue.Empty ())
			{
				dLock.Lock ();
				m_OpVipQueue.Push ( this_thread.m_dPrivateQueue );
			}
			// NOTE(review): if the private queue was non-empty we return with the
			// lock HELD, matching run()'s expectation only because run() re-locks
			// unconditionally - presumably ScopedMutex_t tolerates double Lock();
			// verify against its implementation.
			return true;
		}
		return false;
	}

	void stop()
	{
		LOG ( SERVICE, SVC ) << "stop";
		ScopedMutex_t dLock ( m_dMutex );
		stop_all_threads ( dLock );
	}

	bool stopped () const
	{
		ScopedMutex_t dLock ( m_dMutex );
		return m_bStopped;
	}

	// Clear the stopped flag so run() can be entered again.
	void reset () //override
	{
		LOG ( DETAIL, MT ) << "reset stopped ";
		ScopedMutex_t dLock ( m_dMutex );
		m_bStopped = false;
	}

	// Notify that some work has started.
	void work_started ()
	{
		LOG ( SERVICE, SVC ) << "work_started from " << m_iOutstandingWork;
		++m_iOutstandingWork;
		LOG ( WORKS, MT ) << "work_started m_iOutstandingWork " << m_iOutstandingWork << " " << &m_iOutstandingWork;
	}

	// Notify that some work has finished; stops the service when the last unit completes.
	void work_finished ()
	{
		LOG ( SERVICE, SVC ) << "work_finished to " << m_iOutstandingWork-1;
		if ( --m_iOutstandingWork==0 )
			stop ();
		LOG ( WORKS, MT ) << "work_finished m_iOutstandingWork " << m_iOutstandingWork << " " << &m_iOutstandingWork;
	}

	void stop_all_threads ( ScopedMutex_t & dLock ) REQUIRES ( dLock )
	{
		m_bStopped = true;
		m_tWakeupEvent.SignalAll ( dLock );
	}

	// Signal one sleeper if any; in every case the lock is released on return.
	void wake_one_thread_and_unlock ( ScopedMutex_t & dLock ) REQUIRES ( dLock ) RELEASE ( dLock )
	{
		if ( !m_tWakeupEvent.MaybeUnlockAndSignalOne ( dLock ))
			dLock.Unlock ();
	}

	long works() const noexcept
	{
		return m_iOutstandingWork;
	}

	// Snapshot of queued (not yet running) ops: {VIP, regular}.
	NTasks_t tasks() const
	{
		ScopedMutex_t dLock ( m_dMutex );
		return { (int)m_OpVipQueue.GetLength(), (int)m_OpQueue.GetLength() };
	}
};
  377. /// helper to hold the service running
  378. class Service_t::Work_c
  379. {
  380. Service_t& m_tServiceRef;
  381. public:
  382. explicit Work_c( Service_t& tService )
  383. : m_tServiceRef (tService)
  384. {
  385. m_tServiceRef.work_started ();
  386. }
  387. Work_c ( const Work_c& tOther)
  388. : m_tServiceRef ( tOther.m_tServiceRef )
  389. {
  390. m_tServiceRef.work_started ();
  391. }
  392. Work_c & operator= ( const Work_c & ) = delete;
  393. ~Work_c()
  394. {
  395. m_tServiceRef.work_finished();
  396. }
  397. };
  398. #define LOG_COMPONENT_TP LOG_COMPONENT_MT << ": "
  399. #define LOG_COMPONENT_ST LOG_COMPONENT_MT << " strand: "
// strand - sequential scheduler. Operations are executed strictly sequentially, in FIFO order.
// It looks like a 'single thread', but the actual thread is provided by the backend and may change.
class Strand_c final : public SchedulerWithBackend_i
{
	// Refcounted core shared by the strand and every in-flight Invoker_c.
	// Invariant: m_bLocked is true exactly while one invoker "owns" the strand;
	// during that time new ops pile up in the wait queue.
	struct StrandWorker_t final : public ISphRefcountedMT
	{
		CSphMutex m_dMutex;
		bool m_bLocked GUARDED_BY ( m_dMutex ) = false;			// an invoker currently owns the strand
		OpSchedule_t m_OpWaitQueue GUARDED_BY ( m_dMutex );		/// The queue for the next run
		OpSchedule_t m_OpReadyQueue;							/// The queue for current run (touched only by the owning invoker)
		// strand has no backend thread/threadpool and works over another scheduler.
		Scheduler_i* m_pBackend = nullptr;

		// Queue an op. Returns true if the caller became the owner (and thus
		// must schedule an invoker), false if an owner already exists.
		inline bool Enqueue ( Threads::details::SchedulerOperation_t* pOp )
		{
			ScopedMutex_t tLock ( m_dMutex );
			if ( m_bLocked )
			{
				m_OpWaitQueue.Push ( pOp );
				LOG ( ST, ST ) << " enqueued to wait queue, was locked " << pOp;
				return false;
			}
			m_bLocked = true;
			tLock.Unlock();
			// safe without the lock: we just became the sole owner of the ready queue
			m_OpReadyQueue.Push ( pOp );
			LOG ( ST, ST ) << " enqueued to ready queue, locked " << pOp;
			return true;
		}

		// Forward an op to the backend's continuation path, if a backend is attached.
		void PostContinuationToBackend ( Threads::details::SchedulerOperation_t* pOp ) const
		{
			// Add the function to the strand and schedule the strand if required.
			if ( m_pBackend )
				m_pBackend->ScheduleContinuationOp ( pOp );
		}

		inline Keeper_t KeepWorking() const
		{
			assert ( m_pBackend );
			return m_pBackend->KeepWorking();
		}

	protected:
		~StrandWorker_t() final = default;
	};

	using StrandWorkerPtr_t = CSphRefcountedPtr<StrandWorker_t>;
	StrandWorkerPtr_t m_pWorker;
	const char* m_szName = nullptr;

	// Per-thread call stack to track whether the current thread is already executing this strand.
	using StrandCallStack_c = CallStack_c<StrandWorker_t>;

	// Drains the worker's ready queue on a backend thread; holds a refcount on
	// the worker and a work-keeper on the backend while in flight.
	class Invoker_c
	{
		StrandWorkerPtr_t m_pOwner;
		Keeper_t m_tParentKeeper;

	public:
		explicit Invoker_c ( StrandWorkerPtr_t pRand );
		Invoker_c ( const Invoker_c& rhs ) = default;
		Invoker_c ( Invoker_c && rhs ) noexcept;
		Invoker_c & operator= ( Invoker_c && rhs ) noexcept;
		void run ();
	};
	friend class Invoker_c;

	inline bool Enqueue ( Threads::details::SchedulerOperation_t* pOp )
	{
		assert ( m_pWorker );
		return m_pWorker->Enqueue ( pOp );
	}

	// Continuation path: execute inline when we are already on this strand's
	// thread, otherwise enqueue and (if we became owner) schedule an invoker.
	void PostContinuationImpl ( Threads::details::SchedulerOperation_t* pOp )
	{
		auto bThisThread = !!StrandCallStack_c::Contains ( m_pWorker );
		if ( bThisThread )
		{
			LOG ( ST, ST ) << "PostContinuation fast in this thread";
			Threads::JobTracker_t dTrack;
			// barrier ensures that no operations till here would be reordered below.
			std::atomic_thread_fence ( std::memory_order_acquire );
			pOp->Complete(pOp);
			std::atomic_thread_fence ( std::memory_order_release );
			LOG ( ST, ST ) << "strand PostContinuation performed without queuing";
			return;
		}
		bool bFirst = Enqueue ( pOp );
		// Add the function to the strand and schedule the strand if required.
		if ( bFirst )
		{
			Invoker_c tInvoker { m_pWorker };
			m_pWorker->PostContinuationToBackend ( Threads::details::Handler2Op ( [t = std::move ( tInvoker )]() mutable { t.run(); } ) );
		}
	}

public:
	explicit Strand_c ( Scheduler_i* pBackend, const char* szName=nullptr )
		: m_pWorker { new StrandWorker_t }
		, m_szName { szName }
	{
		m_pWorker->m_pBackend = pBackend;
		LOGINFO ( TPLIFE, TP ) << "Strand_c created";
	}

	// Queue an op; if we became the strand owner, schedule an invoker on the backend.
	void ScheduleOp ( Threads::details::SchedulerOperation_t* pOp, bool bVip ) noexcept
	{
		LOG ( ST, ST ) << "Post";
		bool bFirst = Enqueue ( pOp );
		if ( bFirst && m_pWorker->m_pBackend )
		{
			LOG ( ST, ST ) << "Post scheduled invoker to backend";
			Invoker_c tInvoker { m_pWorker };
			m_pWorker->m_pBackend->Schedule ( [t=std::move(tInvoker)] () mutable { t.run (); }, bVip );
		}
		LOG ( ST, ST ) << "Post finished";
	}

	void ScheduleContinuationOp ( Threads::details::SchedulerOperation_t* pOp ) noexcept
	{
		LOG ( ST, ST ) << "ScheduleContinuation";
		PostContinuationImpl ( pOp );
		LOG ( ST, ST ) << "Post finished";
	}

	Keeper_t KeepWorking () noexcept
	{
		assert ( m_pWorker );
		return m_pWorker->KeepWorking();
	}

	// Swap the backend scheduler. Fails (returns false) only when the strand is
	// busy AND still has a live backend; a locked strand with no backend gets the
	// new backend and an invoker is re-posted to resume the pending queue.
	bool SetBackend ( Scheduler_i* pBackend ) noexcept
	{
		assert ( m_pWorker );
		ScopedMutex_t tLock ( m_pWorker->m_dMutex );
		if ( m_pWorker->m_bLocked )
		{
			if ( m_pWorker->m_pBackend ) // everything healthy and works, can't change right now
				return false;
			assert ( !m_pWorker->m_pBackend );
			m_pWorker->m_pBackend = pBackend;
			tLock.Unlock();
			Invoker_c tInvoker { m_pWorker };
			m_pWorker->PostContinuationToBackend ( Threads::details::Handler2Op ( [t = std::move ( tInvoker )]() mutable { t.run(); } ) );
		}
		// NOTE(review): this assignment runs without the mutex when the branch
		// above already unlocked (and is then redundant) - looks racy; verify
		// intended locking discipline for m_pBackend.
		m_pWorker->m_pBackend = pBackend;
		return true;
	}

	const char * Name () const noexcept { return m_szName; }
};
// Takes a strong ref to the worker and a work-keeper from the backend, so the
// backend stays alive while this invoker is queued.
Strand_c::Invoker_c::Invoker_c ( StrandWorkerPtr_t pRand )
	: m_pOwner { std::move(pRand) }
	, m_tParentKeeper { m_pOwner->KeepWorking() }
{}

// "Move" constructor: copies the refcounted owner pointer (NOTE(review):
// presumably intentional since CSphRefcountedPtr copy just addrefs - confirm)
// but genuinely steals the keeper via Swap, so only one instance finishes the work.
Strand_c::Invoker_c::Invoker_c ( Strand_c::Invoker_c && rhs ) noexcept
	: m_pOwner ( rhs.m_pOwner )
{
	m_tParentKeeper.Swap ( rhs.m_tParentKeeper );
}

// Move assignment: same pattern - swap keepers, share the owner ref.
Strand_c::Invoker_c & Strand_c::Invoker_c::operator= ( Strand_c::Invoker_c && rhs ) noexcept
{
	m_tParentKeeper.Swap ( rhs.m_tParentKeeper );
	m_pOwner = rhs.m_pOwner;
	return *this;
}
// Drain the strand's ready queue on the current (backend) thread. On exit the
// destructor of OnInvokerFinished_t decides whether the strand is released or
// a fresh invoker must be scheduled for ops that arrived meanwhile.
void Strand_c::Invoker_c::run ()
{
	struct OnInvokerFinished_t
	{
		Strand_c::Invoker_c* m_pThis;
		~OnInvokerFinished_t()
		{
			bool bMoreHandlers;
			auto& pOwner = m_pThis->m_pOwner;
			{
				// pull everything that was queued while we were running, and
				// keep the strand "locked" only if something is left to do
				ScopedMutex_t tLock ( pOwner->m_dMutex );
				pOwner->m_OpReadyQueue.Push ( pOwner->m_OpWaitQueue );
				bMoreHandlers = pOwner->m_bLocked = !pOwner->m_OpReadyQueue.Empty ();
			}
			LOG ( ST, ST ) << "OnInvokerFinished_t: " << bMoreHandlers;
			if ( !bMoreHandlers )
			{
				LOG ( ST, ST ) << "OnInvokerFinished_t, abandoned, unlocked";
				return;
			}
			LOG ( ST, ST ) << "OnInvokerFinished_t, have more, locked";
			// more work arrived: re-post ourselves (copy keeps the owner ref alive)
			Strand_c::Invoker_c tInvoker { *m_pThis };
			// pOwner->Schedule ( [t=std::move(tInvoker)] () mutable { t.run (); }, true );
			pOwner->PostContinuationToBackend ( Threads::details::Handler2Op ( [t = std::move ( tInvoker )]() mutable { t.run(); } ) );
		}
	};
	// mark this thread as "inside the strand" so continuations can run inline
	StrandCallStack_c::Context_c dCtx ( m_pOwner );
	// that will ensure the next handler, if any, will be scheduled on block exit
	OnInvokerFinished_t VARIABLE_IS_NOT_USED dOnFinished = { this };
	// Run all ready handlers. No lock is required since the ready queue is
	// accessed only within the strand.
	while ( !m_pOwner->m_OpReadyQueue.Empty () )
	{
		auto * pOp = m_pOwner->m_OpReadyQueue.Front ();
		m_pOwner->m_OpReadyQueue.Pop ();
		LOG ( ST, ST ) << "run op: " << pOp;
		boost::context::detail::prefetch_range ( pOp, sizeof ( Operation_t ) );
		pOp->Complete ( pOp );
	}
}
  591. class ThreadPool_c final : public Worker_i
  592. {
  593. using Work = Service_t::Work_c;
  594. const char * m_szName = nullptr;
  595. Service_t m_tService;
  596. std::optional<Work> m_dWork;
  597. CSphMutex m_dMutex;
  598. std::atomic<bool> m_bStop {false};
  599. struct alignas ( 64 ) Thd_t // alignas cacheline, to freely access m_bBusy without cache poisoning
  600. {
  601. std::atomic<bool> m_bBusy { false };
  602. SphThread_t m_tThread;
  603. LowThreadDesc_t* m_pChild = nullptr;
  604. };
  605. // support iteration over children for show threads and hazards
  606. mutable RwLock_t m_dChildGuard;
  607. CSphFixedVector<Thd_t> m_dThreads { 0 };
  608. void Post ( Threads::details::SchedulerOperation_t* pOp, bool bVip = false ) // post to primary (vip) or secondary queue
  609. {
  610. LOG ( DETAIL, TP ) << "Post " << bVip;
  611. if ( bVip )
  612. m_tService.defer_op ( pOp );
  613. else
  614. m_tService.post_op ( pOp );
  615. LOG ( DETAIL, TP ) << "Post finished";
  616. }
  617. void PostContinuation ( Threads::details::SchedulerOperation_t* pOp ) // 'very vip' - try to execute immediately, or post to the primary queue
  618. {
  619. LOG ( DETAIL, TP ) << "PostContinuation";
  620. m_tService.post_continuation ( pOp );
  621. LOG ( DETAIL, TP ) << "Post finished";
  622. }
  623. // Service_i & Service ()
  624. // {
  625. // return m_tService;
  626. // }
  627. void createWork ()
  628. {
  629. m_dWork.emplace ( m_tService );
  630. }
  631. void loop (int iChild) NO_THREAD_SAFETY_ANALYSIS
  632. {
  633. {
  634. ScWL_t _ ( m_dChildGuard );
  635. m_dThreads[iChild].m_pChild = &MyThd ();
  636. }
  637. while (true)
  638. {
  639. m_tService.run ( m_dThreads[iChild].m_bBusy );
  640. ScopedMutex_t dLock {m_dMutex};
  641. if ( m_bStop )
  642. break;
  643. if ( !m_dWork )
  644. {
  645. createWork ();
  646. m_tService.reset ();
  647. }
  648. }
  649. ScWL_t _ ( m_dChildGuard );
  650. m_dThreads[iChild].m_pChild = nullptr;
  651. }
  652. public:
  653. ThreadPool_c ( size_t iThreadCount, const char * szName )
  654. : m_szName {szName}
  655. , m_tService ( iThreadCount==1 )
  656. {
  657. createWork ();
  658. m_dThreads.Reset ( (int) iThreadCount );
  659. ARRAY_FOREACH ( i, m_dThreads )
  660. Threads::CreateQ ( &m_dThreads[i].m_tThread, [this,i] { loop (i); }, false, m_szName, i );
  661. LOG ( DEBUG, TP ) << "thread pool created with threads: " << iThreadCount;
  662. LOGINFO ( TPLIFE, TP ) << "thread pool created with threads: " << iThreadCount;
  663. }
  664. ~ThreadPool_c ()
  665. {
  666. LOGINFO ( TPLIFE, TP ) << "thread pool destroying";
  667. StopAll();
  668. ScWL_t _ ( m_dChildGuard ); // that will keep children list if smbody still iterates over it
  669. }
  670. void DiscardOnFork () final
  671. {
  672. ScWL_t _ ( m_dChildGuard );
  673. m_dThreads.Reset ( 0 );
  674. }
  675. void ScheduleOp ( Threads::details::SchedulerOperation_t* pOp, bool bVip ) noexcept
  676. {
  677. Post ( pOp, bVip );
  678. }
  679. void ScheduleContinuationOp ( Threads::details::SchedulerOperation_t* pOp ) noexcept
  680. {
  681. PostContinuation ( pOp );
  682. }
  683. #define LOG_LEVEL_SERVICE_KEEP_MT false
  684. #if LOG_LEVEL_SERVICE_KEEP_MT
  685. static intptr_t KeepWorkingID()
  686. {
  687. static std::atomic<intptr_t> uWorker { 0ULL };
  688. return uWorker.fetch_add ( 1, std::memory_order_relaxed );
  689. }
  690. Keeper_t KeepWorking() final
  691. {
  692. m_tService.work_started();
  693. auto kwid = KeepWorkingID();
  694. LOGINFO ( SERVICE_KEEP_MT, MT ) << "KeepWorking " << kwid;
  695. return { (void*)kwid, [this] ( void* kwid ) {
  696. m_tService.work_finished (); // divided to lines for breakpoints
  697. LOGINFO ( SERVICE_KEEP_MT, MT ) << "KeepWorking finished " << (intptr_t)kwid; } };
  698. }
  699. #else
  700. Keeper_t KeepWorking() noexcept
  701. {
  702. m_tService.work_started();
  703. return { nullptr, [this] ( void* ) { m_tService.work_finished(); } };
  704. }
  705. #endif
	// Number of worker threads in the pool (0 after StopAll/DiscardOnFork).
	int WorkingThreads () const noexcept NO_THREAD_SAFETY_ANALYSIS
	{
		return m_dThreads.GetLength ();
	}
	// Outstanding 'work' tokens held on the service.
	int Works () const noexcept
	{
		return (int)m_tService.works ();
	}
	// Queued-task statistics, as reported by the underlying service.
	NTasks_t Tasks() const noexcept
	{
		return m_tService.tasks();
	}
	// Number of workers currently marked busy (relaxed reads - approximate).
	int CurTasks() const noexcept NO_THREAD_SAFETY_ANALYSIS
	{
		return (int)m_dThreads.count_of ( [] ( auto& i ) { return i.m_bBusy.load ( std::memory_order_relaxed ); } );
	}
	// Invoke fnHandler for every worker's thread descriptor.
	// NOTE: m_pChild may be nullptr (thread not yet started / already left
	// loop()), so the handler must tolerate null.
	void IterateChildren ( ThreadFN& fnHandler ) noexcept
	{
		ScRL_t _ ( m_dChildGuard );
		for ( const auto& tThd : m_dThreads )
			fnHandler ( tThd.m_pChild );
	}
	// Stop the pool: raise the stop flag, release the keep-alive work item
	// (presumably letting run() return once queues drain - see loop()),
	// then join every worker.
	void StopAll () NO_THREAD_SAFETY_ANALYSIS
	{
		ScopedMutex_t dLock { m_dMutex };
		m_bStop = true;
		m_dWork.reset ();
		if ( sphIsDied() )
			m_tService.stop(); // crash path: hard-stop, don't wait for queued tasks
		dLock.Unlock (); // must release before joining - workers re-acquire m_dMutex in loop()
		LOG ( DEBUG, TP ) << "stopping thread pool";
		LOGINFO ( TPLIFE, TP ) << "stopping thread pool";
		for ( auto & dThread : m_dThreads )
			Threads::Join ( &dThread.m_tThread );
		LOG ( DEBUG, TP ) << "thread pool stopped";
		LOGINFO ( TPLIFE, TP ) << "thread pool stopped";
		m_dThreads.Reset ( 0 );
	}
  744. };
  745. class AloneThread_c final : public Worker_i
  746. {
  747. CSphString m_sName;
  748. int m_iThreadNum;
  749. Service_t m_tService;
  750. std::atomic<bool> m_bStarted {false};
  751. std::atomic<bool> m_bBusy {false};
  752. static int m_iRunningAlones;
  753. void Post ( Service_t::operation* pOp, bool bVip=false ) // post to primary (vip) or secondary queue
  754. {
  755. LOG ( DETAIL, TP ) << "Post " << bVip;
  756. if ( bVip )
  757. m_tService.defer_op ( pOp );
  758. else
  759. m_tService.post_op ( pOp );
  760. LOG ( DETAIL, TP ) << "Post finished";
  761. if ( !m_bStarted )
  762. {
  763. m_bStarted = true;
  764. SphThread_t tThd; // dummy, since we're starting detached
  765. Threads::CreateQ ( &tThd, [this] { loop (); }, true, m_sName.cstr (), m_iThreadNum );
  766. LOG ( DEBUG, TP ) << "alone thread created";
  767. }
  768. }
  769. void loop ()
  770. {
  771. Detached::AddThread ( &MyThd () );
  772. m_tService.run ( m_bBusy );
  773. Detached::RemoveThread ( &MyThd () );
  774. delete this;
  775. }
  776. public:
  777. explicit AloneThread_c ( int iNum, const char * szName )
  778. : m_sName {szName}
  779. , m_iThreadNum ( iNum )
  780. , m_tService ( true ) // true means 'single-thread'
  781. {
  782. ++m_iRunningAlones;
  783. LOG ( DEBUG, TP ) << "alone worker created " << szName;
  784. }
  785. ~AloneThread_c ()
  786. {
  787. LOG ( DEBUG, TP ) << "stopping thread";
  788. --m_iRunningAlones;
  789. LOG ( DEBUG, TP ) << "thread stopped";
  790. LOGINFO ( TPLIFE, TP ) << "AloneThread_c destroyed";
  791. }
  792. void ScheduleOp ( Service_t::operation* pOp , bool bVip ) noexcept
  793. {
  794. Post ( pOp, bVip );
  795. }
  796. #define LOG_LEVEL_SERVICE_KEEP_ALONE false
  797. #if LOG_LEVEL_SERVICE_KEEP_ALONE
  798. static intptr_t KeepWorkingID()
  799. {
  800. static std::atomic<intptr_t> uWorker { 0ULL };
  801. return uWorker.fetch_add ( 1, std::memory_order_relaxed );
  802. }
  803. Keeper_t KeepWorking() noexcept
  804. {
  805. m_tService.work_started();
  806. auto kwid = KeepWorkingID();
  807. LOGINFO ( SERVICE_KEEP_ALONE, MT ) << "KeepWorking alone " << this << " " << kwid;
  808. return { (void*)kwid, [this] ( void* kwid ) {
  809. m_tService.work_finished (); // divided to lines for breakpoints
  810. LOGINFO ( SERVICE_KEEP_ALONE, MT ) << "KeepWorking alone inished " << this << " " << (intptr_t)kwid; } };
  811. }
  812. #else
  813. Keeper_t KeepWorking() noexcept
  814. {
  815. m_tService.work_started();
  816. return { nullptr, [this] ( void* ) { m_tService.work_finished(); } };
  817. }
  818. #endif
  819. void StopAll () {}
  820. static int GetRunners () { return m_iRunningAlones; }
  821. int Works () const
  822. {
  823. return GetRunners ();
  824. }
  825. NTasks_t Tasks() const noexcept
  826. {
  827. return m_tService.tasks();
  828. }
  829. int CurTasks() const noexcept
  830. {
  831. return !!m_bBusy.load(std::memory_order_relaxed);
  832. }
  833. const char* Name() const noexcept
  834. {
  835. return m_sName.cstr();
  836. }
  837. };
  838. int AloneThread_c::m_iRunningAlones = 0;
  839. class ShedulerWrapper_c final : public Scheduler_i
  840. {
  841. Scheduler_i* m_pScheduler; // not owned
  842. const char* m_szName;
  843. public:
  844. ShedulerWrapper_c ( Scheduler_i* pScheduler, const char* szName ) noexcept
  845. : m_pScheduler { pScheduler }
  846. , m_szName { szName }
  847. {}
  848. void ScheduleOp ( details::SchedulerOperation_t* pOp, bool bVip ) noexcept
  849. {
  850. m_pScheduler->ScheduleOp ( pOp, bVip );
  851. }
  852. void ScheduleContinuationOp ( details::SchedulerOperation_t* pOp ) noexcept
  853. {
  854. m_pScheduler->ScheduleContinuationOp ( pOp );
  855. }
  856. Keeper_t KeepWorking() noexcept
  857. {
  858. return m_pScheduler->KeepWorking();
  859. };
  860. int WorkingThreads() const noexcept
  861. {
  862. return m_pScheduler->WorkingThreads();
  863. };
  864. const char* Name() const noexcept
  865. {
  866. return m_szName ? m_szName : m_pScheduler->Name();
  867. }
  868. };
// Factory: create a shared, owning handle to a new fixed-size thread pool.
WorkerSharedPtr_t MakeThreadPool ( size_t iThreadCount, const char* szName )
{
	return WorkerSharedPtr_t { new ThreadPool_c ( iThreadCount, szName ) };
}
// Factory: create a shared handle to a new single-thread 'alone' worker.
WorkerSharedPtr_t MakeAloneThread ( size_t iOrderNum, const char* szName )
{
	return WorkerSharedPtr_t { new AloneThread_c ( (int)iOrderNum, szName ) };
}
// Alone scheduler works on top of another scheduler and provides sequential execution of the tasks (each time only one
// task may be performed, no concurrent execution). It also gives FIFO ordering of the tasks.
// Factory: wrap pBase into a serializing strand (see comment above).
SchedulerSharedPtr_t MakeAloneScheduler ( Scheduler_i* pBase, const char* szName )
{
	return SchedulerSharedPtr_t { new Strand_c ( pBase, szName ) };
}
// wraps raw scheduler into shared-ptr (it will NOT delete the scheduler when destroyed!)
SchedulerSharedPtr_t WrapRawScheduler ( Scheduler_i* pBase, const char* szName )
{
	return SchedulerSharedPtr_t { new ShedulerWrapper_c ( pBase, szName ) };
}
  888. } // namespace Threads
  889. namespace {
  890. RwLock_t & g_lShutdownGuard ()
  891. {
  892. static RwLock_t lShutdownGuard;
  893. return lShutdownGuard;
  894. }
  895. OpSchedule_t & g_dShutdownList ()
  896. {
  897. static OpSchedule_t dShutdownList GUARDED_BY ( g_lShutdownGuard () );
  898. return dShutdownList;
  899. }
  900. OpSchedule_t & g_dOnForkList ()
  901. {
  902. static OpSchedule_t dOnForkList GUARDED_BY ( g_lShutdownGuard () );
  903. return dOnForkList;
  904. }
  905. }
  906. void searchd::AddShutdownCb ( Handler fnCb )
  907. {
  908. auto pCb = Threads::details::Handler2Op ( std::move ( fnCb ) );
  909. ScWL_t tGuard ( g_lShutdownGuard() );
  910. g_dShutdownList().Push_front( pCb );
  911. }
  912. void searchd::AddOnForkCleanupCb ( Threads::Handler fnCb )
  913. {
  914. auto pCb = Threads::details::Handler2Op ( std::move ( fnCb ) );
  915. ScWL_t tGuard ( g_lShutdownGuard () );
  916. g_dOnForkList ().Push_front ( pCb );
  917. }
// invoke shutdown handlers
void searchd::FireShutdownCbs ()
{
	// NOTE(review): the list is popped under a *read* lock, which is only safe
	// if shutdown is fired from a single thread - confirm before reusing.
	ScRL_t tGuard ( g_lShutdownGuard() );
	while ( !g_dShutdownList().Empty () )
	{
		auto * pOp = g_dShutdownList().Front ();
		g_dShutdownList().Pop ();
		pOp->Complete ( pOp ); // presumably runs the handler and disposes the op - see Handler2Op
	}
}
// Post-fork cleanup in the child: run the on-fork handlers, then drop the
// inherited shutdown handlers without invoking them (Destroy vs Complete).
// No locking: the child is single-threaded right after fork (hence the
// thread-safety-analysis opt-out).
void searchd::CleanAfterFork () NO_THREAD_SAFETY_ANALYSIS
{
	while ( !g_dOnForkList ().Empty () )
	{
		auto * pOp = g_dOnForkList ().Front ();
		g_dOnForkList ().Pop ();
		pOp->Complete ( pOp );
	}
	while ( !g_dShutdownList().Empty () )
	{
		auto * pOp = g_dShutdownList().Front ();
		g_dShutdownList().Pop ();
		pOp->Destroy(); // dispose without running
	}
}
// Size of the global worker pool; adjusted via SetMaxChildrenThreads().
static int g_iMaxChildrenThreads = 1;
namespace {
// The process-wide worker pool, created by StartGlobalWorkPool().
static WorkerSharedPtr_t pGlobalPool;
WorkerSharedPtr_t& GlobalPoolSingletone ()
{
	return pGlobalPool;
}
}
// Create the global worker pool sized by g_iMaxChildrenThreads.
// NOTE(review): on POSIX the pool is created only if absent; on Windows it is
// (re)created unconditionally - the reason is not visible here, confirm.
void StartGlobalWorkPool ()
{
	sphLogDebug ( "StartGlobalWorkpool" );
	WorkerSharedPtr_t& pPool = GlobalPoolSingletone ();
#if !_WIN32
	if ( !pPool )
#endif
		pPool = new ThreadPool_c ( g_iMaxChildrenThreads, "work" );
}
// Stop (join) the global pool's threads; the pool object itself stays alive.
void StopGlobalWorkPool()
{
	sphLogDebug ( "StopGlobalWorkPool" );
	WorkerSharedPtr_t& pPool = GlobalPoolSingletone();
	if ( pPool )
		pPool->StopAll();
}
// Set the global pool size (takes effect on next StartGlobalWorkPool);
// clamped to at least one thread.
void SetMaxChildrenThreads ( int iThreads )
{
	sphLogDebug ( "SetMaxChildrenThreads to %d", iThreads );
	g_iMaxChildrenThreads = Max ( 1, iThreads );
}
// Current configured size of the global worker pool.
int MaxChildrenThreads() noexcept
{
	return g_iMaxChildrenThreads;
}
// Borrow a raw pointer to the global pool; StartGlobalWorkPool() must have
// been called first (asserted in debug builds).
Worker_i * GlobalWorkPool ()
{
	WorkerSharedPtr_t& pPool = GlobalPoolSingletone ();
	assert ( pPool && "invoke StartGlobalWorkPool first");
	return pPool;
}
// Hook the global pool into thread iteration and post-fork cleanup.
// Must be invoked at most once per process (asserted in debug builds).
void WipeGlobalSchedulerOnShutdownAndFork ()
{
#ifndef NDEBUG
	static bool bAlreadyInvoked = false;
	assert (!bAlreadyInvoked);
	bAlreadyInvoked = true;
#endif
	// expose the pool's worker threads to Threads::IterateActive()
	Threads::RegisterIterator ( [] ( ThreadFN & fnHandler ) {
		WorkerSharedPtr_t& pPool = GlobalPoolSingletone ();
		if ( pPool )
			pPool->IterateChildren ( fnHandler );
	} );
	// in a forked child, forget the (nonexistent) pool threads
	searchd::AddOnForkCleanupCb ( [] {
		WorkerSharedPtr_t& pPool = GlobalPoolSingletone ();
		if ( pPool )
			pPool->DiscardOnFork ();
	} );
//	searchd::AddShutdownCb ( [] {
//		sphWarning ( "stop all pool threads" );
//		WorkerSharedPtr_t& pPool = GlobalPoolSingletone ();
//		if ( pPool )
//			pPool->StopAll ();
//	} );
}
// Same hooks as above, for an arbitrary worker.
// NOTE(review): pWorker is captured as a raw pointer - the registered lambdas
// assume it outlives the process-wide iterator/fork lists; confirm callers.
void WipeSchedulerOnFork ( Threads::Worker_i* pWorker )
{
	Threads::RegisterIterator ( [pWorker] ( ThreadFN& fnHandler ) {
		if ( pWorker )
			pWorker->IterateChildren ( fnHandler );
	} );
	searchd::AddOnForkCleanupCb ( [pWorker] {
		if ( pWorker )
			pWorker->DiscardOnFork();
	} );
}
namespace {
// Count of threads started via Threads::Create and not yet finished.
static std::atomic<int> g_iRunningThreads {0};
}
// Approximate number of currently running threads (relaxed read).
int Threads::GetNumOfRunning()
{
	return g_iRunningThreads.load ( std::memory_order_relaxed );
}
  1025. //////////////////////////////////////////////////////////////////////////
  1026. /// helpers to iterate over all registered threads
// Pimpl of OperationsQueue_c: a mutex-guarded list of queued operations.
class OperationsQueue_c::Impl_c
{
	CSphMutex m_tQueueGuard;
	OpSchedule_t m_tQueue GUARDED_BY ( m_tQueueGuard );

public:
	// Wrap fnCb into an op and prepend it to the queue.
	void AddOp ( Handler fnCb )
	{
		auto pCb = Threads::details::Handler2Op ( std::move ( fnCb ) );
		ScopedMutex_t tGuard ( m_tQueueGuard );
		m_tQueue.Push_front ( pCb );
	}

	// Run all queued ops. The queue is first transferred out under the lock,
	// so the handlers execute unlocked and may safely enqueue new ops.
	void RunAll ()
	{
		OpSchedule_t tQueue;
		{
			ScopedMutex_t tGuard ( m_tQueueGuard );
			if ( m_tQueue.Empty() )
				return;
			tQueue.Push ( m_tQueue ); // presumably splices the whole list - see OpSchedule_t
		}
		while ( !tQueue.Empty() )
		{
			auto* pOp = tQueue.Front();
			tQueue.Pop();
			pOp->Complete ( pOp );
		}
	}

	// Unlocked peek; the answer may be stale by the time the caller uses it.
	bool IsEmpty() const NO_THREAD_SAFETY_ANALYSIS
	{
		return m_tQueue.Empty();
	}

	// Dispose (without running) anything still queued.
	~Impl_c()
	{
		while ( !m_tQueue.Empty () )
		{
			auto * pOp = m_tQueue.Front ();
			m_tQueue.Pop ();
			pOp->Destroy ();
		}
	}
};
// Owns its pimpl; released in the destructor via SafeDelete.
OperationsQueue_c::OperationsQueue_c()
	: m_pImpl ( new Impl_c )
{}
OperationsQueue_c::~OperationsQueue_c()
{
	SafeDelete ( m_pImpl );
}
// Enqueue a handler (forwarded to the pimpl).
void OperationsQueue_c::AddOp (Handler fnOp)
{
	assert ( m_pImpl );
	m_pImpl->AddOp(std::move(fnOp));
}
// Execute and dispose all queued handlers (forwarded to the pimpl).
void OperationsQueue_c::RunAll()
{
	assert ( m_pImpl );
	m_pImpl->RunAll();
}
// Unlocked emptiness check (may be stale under concurrency).
bool OperationsQueue_c::IsEmpty() const
{
	assert ( m_pImpl );
	return m_pImpl->IsEmpty();
}
namespace { // static

// Adapts a ThreadIteratorFN into a SchedulerOperation_t.
// Calling convention of DoComplete: non-null pOwner is a ThreadFN* to feed to
// the handler; a null pOwner means 'dispose yourself' (used by Destroy()).
class IterationHandler_c : public Threads::details::SchedulerOperation_t
{
	ThreadIteratorFN m_Handler;

public:
	explicit IterationHandler_c ( ThreadIteratorFN h )
		: SchedulerOperation_t ( &IterationHandler_c::DoComplete )
		, m_Handler ( std::move ( h ) )
	{}

	static void DoComplete ( void * pOwner, SchedulerOperation_t * pBase )
	{
		auto * pHandler = (IterationHandler_c *) pBase;
		if ( pOwner )
			pHandler->m_Handler ( *(ThreadFN *) pOwner );
		else
			delete pHandler;
	}
};

// Registry of per-subsystem thread iterators (e.g. one per pool).
struct IteratorsQueue_t
{
	RwLock_t m_tQueueGuard;
	OpSchedule_t m_tQueue GUARDED_BY ( m_tQueueGuard );

	void RegisterIterator ( ThreadIteratorFN fnIterator )
	{
		auto pCb = ( new IterationHandler_c ( std::move ( fnIterator ) ) );
		ScWL_t tGuard ( m_tQueueGuard );
		m_tQueue.Push_front ( pCb );
	}

	// iterate over all (pooled and alone) threads.
	// over pooled we're not using locks, since pool is living 'as whole', so no lock accessing individual elem need.
	// iteration func, however, must check if param is nullptr.
	// note, non-iteratable threads can't use hazard pointers (just nobody knows they're hold something).
	void IterateActive ( ThreadFN fnHandler )
	{
		ScRL_t tGuard ( m_tQueueGuard );
		for ( auto & dOp : m_tQueue )
			dOp.Complete ( &fnHandler ); // non-null owner: run the iterator, don't dispose
	}

	~IteratorsQueue_t()
	{
		ScWL_t tGuard ( m_tQueueGuard );
		while ( !m_tQueue.Empty () )
		{
			auto * pOp = m_tQueue.Front ();
			m_tQueue.Pop ();
			pOp->Destroy(); // null-owner path: deletes the handler
		}
	}
};

IteratorsQueue_t g_dIteratorsList;
}
// Register a subsystem iterator (invoked later by IterateActive).
void Threads::RegisterIterator ( ThreadIteratorFN fnIterator )
{
	g_dIteratorsList.RegisterIterator ( std::move ( fnIterator ) );
}
// Invoke fnHandler over every registered thread (may receive nullptr).
void Threads::IterateActive ( ThreadFN fnHandler )
{
	g_dIteratorsList.IterateActive ( std::move ( fnHandler ) );
}
  1149. Threads::Scheduler_i * MakeSingleThreadExecutor ( int iMaxThreads, const char * szName )
  1150. {
  1151. if ( iMaxThreads>0 && Threads::AloneThread_c::GetRunners ()>=iMaxThreads )
  1152. return nullptr;
  1153. static int iOrder = 0;
  1154. return new Threads::AloneThread_c ( iOrder++, szName? szName: "alone" );
  1155. }
#if !_WIN32
void * Threads::Init ( bool bDetached )
#else
void * Threads::Init ( bool )
#endif
{
	// Lazily prepare pthread attributes (joinable/detached) and return the one
	// requested; on Windows there are no attrs, so return NULL.
	static bool bInit = false;
#if !_WIN32
	static pthread_attr_t tJoinableAttr;
	static pthread_attr_t tDetachedAttr;
#endif
	// NOTE(review): 'bInit' is a plain bool - assumes the first call happens
	// before any concurrent thread creation; confirm.
	if ( !bInit )
	{
#if SPH_DEBUG_LEAKS || SPH_ALLOCS_PROFILER
		sphMemStatInit();
#endif
#if !_WIN32
		if ( pthread_attr_init ( &tJoinableAttr ) )
			sphDie ( "FATAL: pthread_attr_init( joinable ) failed" );
		if ( pthread_attr_init ( &tDetachedAttr ) )
			sphDie ( "FATAL: pthread_attr_init( detached ) failed" );
		if ( pthread_attr_setdetachstate ( &tDetachedAttr, PTHREAD_CREATE_DETACHED ) )
			sphDie ( "FATAL: pthread_attr_setdetachstate( detached ) failed" );
#endif
		bInit = true;
	}
#if !_WIN32
	// stack size is re-applied on every call (not only at first init)
	if ( pthread_attr_setstacksize ( &tJoinableAttr, STACK_SIZE ) )
		sphDie ( "FATAL: pthread_attr_setstacksize( joinable ) failed" );
	if ( pthread_attr_setstacksize ( &tDetachedAttr, STACK_SIZE ) )
		sphDie ( "FATAL: pthread_attr_setstacksize( detached ) failed" );
	return bDetached ? &tDetachedAttr : &tJoinableAttr;
#else
	return NULL;
#endif
}
#if SPH_DEBUG_LEAKS || SPH_ALLOCS_PROFILER
// Dump and tear down memory statistics (profiling builds only).
void Threads::Done ( int iFD )
{
	sphMemStatDump ( iFD );
	sphMemStatDone();
}
#else
// No-op in regular builds.
void Threads::Done ( int )
{
}
#endif
  1203. /// get name of a thread
  1204. CSphString Threads::GetName ( const SphThread_t * pThread )
  1205. {
  1206. if ( !pThread || !*pThread )
  1207. return "";
  1208. #if HAVE_PTHREAD_GETNAME_NP
  1209. char sClippedName[16];
  1210. pthread_getname_np ( *pThread, sClippedName, 16 );
  1211. return sClippedName;
  1212. #else
  1213. return "";
  1214. #endif
  1215. }
/// my join thread wrapper
/// returns true when the thread was joined successfully
bool Threads::Join ( SphThread_t * pThread )
{
#if _WIN32
	DWORD uWait = WaitForSingleObject ( *pThread, INFINITE );
	CloseHandle ( *pThread );
	*pThread = NULL; // invalidate the handle after closing
	return ( uWait==WAIT_OBJECT_0 || uWait==WAIT_ABANDONED );
#else
	return pthread_join ( *pThread, nullptr )==0;
#endif
}
/// my own thread handle
SphThread_t Threads::Self ()
{
#if _WIN32
	return GetCurrentThread();
#else
	return pthread_self ();
#endif
}
/// compares two thread ids (null descs compare equal to each other only)
bool Threads::Same ( const LowThreadDesc_t * pFirst, const LowThreadDesc_t * pSecond )
{
	if ( !pFirst && !pSecond )
		return true;
	if ( !pFirst || !pSecond )
		return false;

#if _WIN32
	// can not use m_tThread on Windows as GetCurrentThread returns -2 and that handle valid only inside thread itself
	return ( pFirst->m_iThreadID==pSecond->m_iThreadID );
#else
	return pthread_equal ( pFirst->m_tThread, pSecond->m_tThread )!=0;
#endif
}
// Per-thread bookkeeping allocated by Threads::Create and owned by the thread
// wrapper (see ThreadProcWrapper_fn).
struct RuntimeThreadContext_t : ISphNoncopyable
{
	LowThreadDesc_t m_tDesc;
	const void * m_pMyThreadStack = nullptr; // top-of-stack marker, set in Prepare()
	Handler m_fnRun; // the user-supplied thread body

#if USE_GPROF
	// gprof profiling: the spawner hands over its ITIMER_PROF via these
	pthread_mutex_t m_dlock;
	pthread_cond_t m_dwait;
	itimerval m_ditimer;
#endif

#if SPH_ALLOCS_PROFILER
	void * m_pTLS = nullptr; // per-thread memory-stat slot
#endif

	// main thread execution func
	void Run ( const void * pStack );
	// prepare everything to make *this most robust
	void Prepare ( const void * pStack );
	// save name stored in desc as OS thread name
	void PropagateName ();
};
namespace {
// Stub context for the main thread (which never goes through Threads::Create);
// spawned threads repoint g_pLocalThread to their own context in Run().
RuntimeThreadContext_t tStubForMain;
thread_local RuntimeThreadContext_t* g_pLocalThread = &tStubForMain;
}
// to be used globally from thread env: context of the calling thread
RuntimeThreadContext_t& MyThreadContext()
{
	return *g_pLocalThread;
}
// Descriptor of the calling thread.
LowThreadDesc_t& Threads::MyThd () noexcept
{
	return g_pLocalThread->m_tDesc;
}
// Push the descriptor's name down to the OS thread.
void Threads::SetSysThreadName ()
{
	g_pLocalThread->PropagateName ();
}
// Mark the start of a job on the calling thread.
void Threads::JobStarted ()
{
	auto& tDesc = Threads::MyThd ();
	tDesc.m_tmLastJobDoneTimeUS = -1; // -1 marks 'job in progress'
	tDesc.m_tmLastJobStartTimeUS = sphMicroTimer ();
	tDesc.m_tmLastJobStartCPUTimeUS = sphThreadCpuTimer ();
}
// Mark the end of a job and accumulate wall/CPU time into the thread totals.
// bIsDone=false accounts the time but doesn't bump the completed-jobs counter.
void Threads::JobFinished ( bool bIsDone )
{
	auto & tDesc = Threads::MyThd ();
	tDesc.m_tmLastJobDoneTimeUS = sphMicroTimer ();
	if ( bIsDone )
		++tDesc.m_iTotalJobsDone;
	tDesc.m_tmTotalWorkedTimeUS += tDesc.m_tmLastJobDoneTimeUS-tDesc.m_tmLastJobStartTimeUS;
	tDesc.m_tmTotalWorkedCPUTimeUS += sphThreadCpuTimer()-tDesc.m_tmLastJobStartCPUTimeUS;
}
// Top-of-stack marker of the calling thread (used for stack-depth checks).
const void * Threads::TopOfStack ()
{
	return MyThreadContext().m_pMyThreadStack;
}
// Override the calling thread's top-of-stack marker.
void Threads::SetTopStack ( const void * pNewStack )
{
	MyThreadContext ().m_pMyThreadStack = pNewStack;
}
namespace {
// Mutable singleton: coroutine stack size limit, default 1 MB.
int& MaxCoroStackSize()
{
	static int iMaxCoroStackSize = 1024 * 1024;
	return iMaxCoroStackSize;
}
}
// Configure the coroutine stack size limit.
void Threads::SetMaxCoroStackSize ( int iStackSize )
{
	MaxCoroStackSize() = iStackSize;
}
// Current coroutine stack size limit.
int Threads::GetMaxCoroStackSize()
{
	return MaxCoroStackSize();
}
// Initialize the main thread's context (it bypasses Threads::Create).
void Threads::PrepareMainThread ( const void * PStack )
{
	MyThreadContext ().Prepare ( PStack );
}
// Store the descriptor's name as the OS-level thread name (where supported).
void RuntimeThreadContext_t::PropagateName ()
{
	// set name of self
#if HAVE_PTHREAD_SETNAME_NP
	if ( !m_tDesc.m_sThreadName.IsEmpty() )
	{
		// pthread names are limited to 15 chars + NUL
		auto sSafeName = m_tDesc.m_sThreadName.SubString ( 0, 15 );
		assert ( sSafeName.cstr ()!=nullptr );
#if HAVE_PTHREAD_SETNAME_NP_1ARG
		pthread_setname_np ( sSafeName.cstr() ); // macOS flavour: names self only
#else
		pthread_setname_np ( m_tDesc.m_tThread, sSafeName.cstr() );
#endif
	}
#endif
}
// Fill in the descriptor for the calling thread: stack marker, OS ids,
// start time; then propagate the name to the OS.
void RuntimeThreadContext_t::Prepare ( const void * pStack )
{
	m_pMyThreadStack = pStack;
	m_tDesc.m_iThreadID = GetOsThreadId ();
	m_tDesc.m_tmStart = sphMicroTimer();
	m_tDesc.m_pTaskInfo.store ( nullptr, std::memory_order_release );
	m_tDesc.m_pHazards.store ( nullptr, std::memory_order_release );
	m_tDesc.m_tThread = Threads::Self ();

#if USE_GPROF
	// Set the profile timer value
	setitimer ( ITIMER_PROF, &m_ditimer, NULL );

	// Tell the calling thread that we don't need its data anymore
	pthread_mutex_lock ( &m_dlock );
	pthread_cond_signal ( &m_dwait );
	pthread_mutex_unlock ( &m_dlock );
#endif

	PropagateName ();
}
// Thread entry: install *this as the thread-local context, set up the
// descriptor, run the user handler and maintain the running-threads counter.
void RuntimeThreadContext_t::Run ( const void * pStack )
{
	g_pLocalThread = this;
	Prepare ( pStack );

#if SPH_ALLOCS_PROFILER
	m_pTLS = sphMemStatThdInit();
#endif

	g_iRunningThreads.fetch_add ( 1, std::memory_order_acq_rel );
	LOG( DEBUG, MT ) << "thread created";
	m_fnRun();
	LOG( DEBUG, MT ) << "thread ended";
	g_iRunningThreads.fetch_sub ( 1, std::memory_order_acq_rel );

#if SPH_ALLOCS_PROFILER
	sphMemStatThdCleanup ( m_pTLS );
#endif
}
#if _WIN32
DWORD __stdcall ThreadProcWrapper_fn ( void * pArg )
#else
void * ThreadProcWrapper_fn ( void * pArg )
#endif
{
	// This is the first local variable in the new thread. So, its address is the top of the stack.
	// We need to know thread stack size for both expression and query evaluating engines.
	// We store expressions as a linked tree of structs and execution is a calls of mutually
	// recursive methods. Before executing we compute tree height and multiply it by a constant
	// with experimentally measured value to check whether we have enough stack to execute current query.
	// The check is not ideal and do not work for all compilers and compiler settings.
	char cTopOfMyStack;
	// take ownership of the context allocated in Threads::Create
	std::unique_ptr<RuntimeThreadContext_t> pCtx { (RuntimeThreadContext_t *) pArg };
	pCtx->Run ( &cTopOfMyStack );

	return 0;
}
// Create a thread running fnRun; optionally detached and named "sName_iNum"
// (or just sName when iNum<0). Returns false on failure.
bool Threads::Create ( SphThread_t * pThread, Handler fnRun, bool bDetached, const char * sName, int iNum )
{
	// we can not put this on current stack because wrapper need to see
	// it all the time and it will destroy this data from heap by itself
	auto pCtx = std::make_unique<RuntimeThreadContext_t>();
	pCtx->m_fnRun = std::move ( fnRun );

	if ( sName )
	{
		if ( iNum<0 )
			pCtx->m_tDesc.m_sThreadName = sName;
		else
			pCtx->m_tDesc.m_sThreadName.SetSprintf ( "%s_%d", sName, iNum );
	}

	// create thread
#if _WIN32
	Threads::Init ( bDetached );
	*pThread = CreateThread ( NULL, STACK_SIZE, ThreadProcWrapper_fn, pCtx.get(), 0, NULL );
	if ( *pThread )
	{
		pCtx.release(); // ownership passed to the wrapper
		return true;
	}
#else

#if USE_GPROF
	// gprof: hand our profiling timer over to the child and wait until it
	// confirms (Prepare() signals m_dwait)
	getitimer ( ITIMER_PROF, &pCtx->m_ditimer );
	pthread_cond_init ( &pCtx->m_dwait, NULL );
	pthread_mutex_init ( &pCtx->m_dlock, NULL );
	pthread_mutex_lock ( &pCtx->m_dlock );
#endif

	void * pAttr = Threads::Init ( bDetached );
	errno = pthread_create ( pThread, (pthread_attr_t*) pAttr, ThreadProcWrapper_fn, pCtx.get() );

#if USE_GPROF
	if ( !errno )
		pthread_cond_wait ( &pCtx->m_dwait, &pCtx->m_dlock );

	pthread_mutex_unlock ( &pCtx->m_dlock );
	pthread_mutex_destroy ( &pCtx->m_dlock );
	pthread_cond_destroy ( &pCtx->m_dwait );
#endif

	if ( !errno )
	{
		pCtx.release(); // ownership passed to the wrapper
		return true;
	}
#endif // _WIN32

	// thread creation failed so we need to cleanup ourselves
	// (pCtx is freed automatically by unique_ptr)
	return false;
}
// Thread with crash query

namespace { // static func

// Per-thread pointer to the crash-query slot; points into the stack frame of
// the thread body set up by Threads::CreateQ.
thread_local CrashQuery_t* pTlsCrashQuery = nullptr;

CrashQuery_t** g_ppTlsCrashQuery ()
{
	return &pTlsCrashQuery;
}

// Install pQuery as the calling thread's crash-query slot.
void GlobalSetTopQueryTLS ( CrashQuery_t * pQuery )
{
	*g_ppTlsCrashQuery() = pQuery;
}

// Copy tQuery into the calling thread's slot (slot must exist).
void GlobalCrashQuerySet ( const CrashQuery_t & tQuery )
{
	CrashQuery_t * pQuery = *g_ppTlsCrashQuery();
	assert ( pQuery );
	*pQuery = tQuery;
}
}
// Shared fallback slot for threads that never installed their own (racy by
// nature, but better than crashing inside the crash handler).
static CrashQuery_t g_tUnhandled;

CrashQuery_t & GlobalCrashQueryGetRef ()
{
	CrashQuery_t * pQuery = *g_ppTlsCrashQuery ();

	// in case TLS not set \ found handler still should process crash
	if ( pQuery )
		return *pQuery;

	sphWarning ("GlobalCrashQueryGetRef: thread-local info is not set! Use ad-hoc");
	return g_tUnhandled;
}
// Snapshot the current crash query; restored on destruction.
CrashQueryKeeper_c::CrashQueryKeeper_c ()
	: m_tReference ( GlobalCrashQueryGetRef() )
{}
CrashQueryKeeper_c::~CrashQueryKeeper_c ()
{
	RestoreCrashQuery();
}
// Re-install the snapshot taken at construction time.
void CrashQueryKeeper_c::RestoreCrashQuery () const
{
	GlobalCrashQuerySet ( m_tReference );
}
namespace
{
	// lookup tables for log-style timestamp formatting (1-based month/weekday)
	constexpr char dWeekdays[7][4] = { "Sun", "Mon", "Tue", "Wed", "Thu", "Fri", "Sat" };
	constexpr char dMonths[12][4] = { "Jan", "Feb", "Mar", "Apr", "May", "Jun", "Jul", "Aug", "Sep", "Oct", "Nov", "Dec" };
}
// Format iNow (microseconds since epoch, local time) as e.g.
// "Mon Jan  1 12:34:56.789 2024"; returns snprintf's result.
int sphFormatTime ( int64_t iNow, char * sTimeBuf, int iBufLen )
{
	time_t ts = (time_t)( iNow / 1000000 ); // on some systems (eg. FreeBSD 6.2), tv.tv_sec has another type and we can't just pass it
	cctz::civil_second tCS = ConvertTimeLocal(ts);
	return snprintf ( sTimeBuf, iBufLen, "%.3s %.3s%3d %.2d:%.2d:%.2d.%.3d %d", dWeekdays[GetWeekDay ( tCS, true )-1], dMonths[tCS.month()-1], tCS.day(), tCS.hour(), tCS.minute(), tCS.second(), (int)( ( iNow % 1000000 ) / 1000 ), (int)tCS.year() );
}
// Same format as the snprintf overload above, written into a StringBuilder_c.
void sphFormatTime ( int64_t iNow, StringBuilder_c & sOut )
{
	time_t ts = (time_t)( iNow / 1000000 ); // on some systems (eg. FreeBSD 6.2), tv.tv_sec has another type, and we can't just pass it
	cctz::civil_second tCS = ConvertTimeLocal(ts);
	sOut << dWeekdays[GetWeekDay ( tCS, true )-1]
		<< ' ' << dMonths[tCS.month()-1]
		<< ' ' << Digits<2>(tCS.day())
		<< ' ' << Digits<2>(tCS.hour()) << ':' << Digits<2>(tCS.minute()) << ':' << Digits<2>(tCS.second()) << '.' << FixedNum<10,3,0,'0'>( ( iNow % 1000000 ) / 1000 )
		<< ' ' << tCS.year();
}
/// format current timestamp (for logging, or whatever)
int sphFormatCurrentTime ( char* sTimeBuf, int iBufLen )
{
	return sphFormatTime ( sphMicroTimer (), sTimeBuf, iBufLen );
}
/// format current timestamp into a StringBuilder_c
void sphFormatCurrentTime ( StringBuilder_c& sOut )
{
	return sphFormatTime ( sphMicroTimer (), sOut );
}
// Current UTC time as ISO-8601-like "YYYY-MM-DDThh:mm:ss.SSS".
CSphString sphCurrentUtcTime()
{
	int64_t iNow = sphMicroTimer();
	time_t ts = (time_t)( iNow / 1000000 ); // on some systems (eg. FreeBSD 6.2), tv.tv_sec has another type and we can't just pass it
	cctz::civil_second tCS = ConvertTimeUTC(ts);

	StringBuilder_c tOut;
	tOut << tCS.year()
		<< '-' << Digits<2>(tCS.month())
		<< '-' << Digits<2>(tCS.day())
		<< 'T' << Digits<2>(tCS.hour()) << ':' << Digits<2>(tCS.minute()) << ':' << Digits<2>(tCS.second())
		<< '.' << FixedNum<10, 3, 0, '0'> ( ( iNow % 1000000 ) / 1000 );

//	tOut.Sprintf ( "%.4d-%.2d-%.2dT%.2d:%.2d:%.2d.%.3d", // YYYY-MM-DDThh:mm:ss[.SSS]
//		1900 + tmp.tm_year,
//		tmp.tm_mon + 1,
//		tmp.tm_mday,
//		tmp.tm_hour,
//		tmp.tm_min,
//		tmp.tm_sec,
//		(int)( ( iNow % 1000000 ) / 1000 ) );

	CSphString sRes;
	tOut.MoveTo ( sRes );
	return sRes;
}
// create thread for query - it will have set CrashQuery to valid obj inside, alive during whole thread's live time.
bool Threads::CreateQ ( SphThread_t * pThread, Handler fnRun, bool bDetached, const char * sName, int iNum )
{
	return Create ( pThread, [fnCrashRun = std::move ( fnRun )]
	{
		// the slot lives on this thread's stack for its entire lifetime
		CrashQuery_t tQueryTLS;
		GlobalSetTopQueryTLS ( &tQueryTLS );
		LOG( DEBUG, MT ) << "thread created";
		fnCrashRun();
		LOG( DEBUG, MT ) << "thread ended";
	}, bDetached, sName, iNum );
}
// capture crash query and set it before running fnHandler.
// The capture happens NOW (on the calling thread); the returned handler
// installs that snapshot on whichever thread later runs it.
Threads::Handler Threads::WithCopiedCrashQuery ( Threads::Handler fnHandler )
{
	CrashQuery_t tParentCrashQuery = GlobalCrashQueryGetRef ();
	return [tCrashQuery = tParentCrashQuery, fnHandler = std::move ( fnHandler )] {
		// CrashQueryKeeper_c _; // restore previous crash query on exit. Seems, that is not necessary
		GlobalCrashQuerySet ( tCrashQuery );
		fnHandler ();
	};
}