| 12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761277127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301130213031304130513061307130813091310131113121313131413151316131713181319132013211322132313241325132613271328132913301331133213331334133513361337133813391340134113421343134413451346134713481349135013511352135313541355135613571358135913601361136213631364136513661367136813691370137113721373137413751376137713781379138013811382138313841385138613871388138913901391139213931394139513961397139813991400140114021403140414051406140714081409141014111412141314141415141614171418141914201421142214231424142514261427142814291430143114321433143414351436143714381439144014411442144314441445144614471448144914501451145214531454145514561457145814591460146114621463146414651466146714681469147014711472147314741475147614771478147914801481148214831484148514861487148814891490149114921493149414951496149714981499150015011502150315041505150615071508150915101511151215131514151515161517151815191520152115221523152415251526152715281529153015311532153315341535153615371538153915401541154215431544154515461547154815491550155115521553155415551556155715581559156015611562156315641565156615671568156915701571157215731574157515761577157815791580158115821583158415851586158715881589159015911592159315941595159615971598159916001601160216031604160516061607160816091610161116121613161416151616161716181619162016211622162316241625162616271628162916301631163216331634163516361637163816391640164116421643164416451646164716481649165016511652165316541655165616571658165916601661166216631664166516661667166816691670167116721673167416751676167716781679168016811682168316841685168616871688168916901691169216931694169516961697169816991700170117021703170417051706170717081709171017111712171317141715171617171718171917201721172217231724172517261727172817291730173117321733173417351736173717381739174017411742174317441745174617471748174917501751175217531754175517561757175817591760176117621763176417651766176717681769177017711772177317741775177617771778177917801781178217831784178517861787178817891790179117921793179417951796179717981799180018011802180318041805180618071808180918101811181218131814181518161817181818191820182118221823182418251826182718281829183018311832183318341835183618371838183918401841184218431844184518461847184818491850185118521853 |
- //
- // Copyright (c) 2017-2026, Manticore Software LTD (https://manticoresearch.com)
- // Copyright (c) 2001-2016, Andrew Aksyonoff
- // Copyright (c) 2008-2016, Sphinx Technologies Inc
- // All rights reserved
- //
- // This program is free software; you can redistribute it and/or modify
- // it under the terms of the GNU General Public License. You should have
- // received a copy of the GPL license along with this program; if you
- // did not, you can find it at http://www.gnu.org/
- //
- #include "threadutils.h"
- #include <boost/context/detail/prefetch.hpp>
- #include <optional>
- #include "datetime.h"
- #if !_WIN32
- // UNIX-specific headers and calls
- #include <sys/syscall.h>
- #include <signal.h>
- #endif
- // for thr_self()
- #ifdef __FreeBSD__
- #include <sys/thr.h>
- #endif
- using namespace Threads;
- const char* TaskStateName ( TaskState_e eState )
- {
- switch (eState)
- {
- case TaskState_e::UNKNOWN: return "-";
- case TaskState_e::HANDSHAKE: return "handshake";
- case TaskState_e::NET_READ: return "net_read";
- case TaskState_e::NET_WRITE: return "net_write";
- case TaskState_e::QUERY: return "query";
- case TaskState_e::NET_IDLE: return "net_idle";
- case TaskState_e::RETIRED: return "- retired";
- }
- return "unknown";
- }
- const char* ProtoName ( Proto_e eProto )
- {
- switch ( eProto )
- {
- case Proto_e::UNKNOWN: return "-";
- case Proto_e::SPHINX:
- case Proto_e::SPHINXSE: return "sphinx";
- case Proto_e::MYSQL41: return "mysql";
- case Proto_e::HTTP: return "http";
- case Proto_e::HTTPS: return "https";
- case Proto_e::REPLICATION: return "replication";
- default: break;
- }
- return "unknown";
- }
- const char * RelaxedProtoName ( Proto_e eProto )
- {
- switch ( eProto )
- {
- case Proto_e::UNKNOWN: return "-";
- case Proto_e::MYSQL41: return "mysql";
- case Proto_e::REPLICATION: return "replication";
- case Proto_e::SPHINX:
- case Proto_e::HTTP: return "sphinx and http(s)";
- case Proto_e::HTTPS: return "https";
- case Proto_e::SPHINXSE: return "sphinx (to connect from SphinxSE)";
- default: break;
- }
- return "unknown";
- }
- int GetOsThreadId ()
- {
- #if _WIN32
- return GetCurrentThreadId();
- #elif defined ( __APPLE__ )
- uint64_t tid;
- pthread_threadid_np(NULL, &tid);
- return tid;
- #elif defined(SYS_gettid)
- return syscall ( SYS_gettid );
- #elif defined(__FreeBSD__)
- long tid;
- thr_self(&tid);
- return (int)tid;
- #else
- return 0;
- #endif
- }
- int GetOsProcessId()
- {
- #if _WIN32
- return GetCurrentProcessId();
- #else
- return getpid();
- #endif
- }
- #include "event.h"
- #include <atomic>
- //////////////////////////////////////////////////////////////////////////
- /// functional threadpool with minimum footprint
- #define LOG_LEVEL_DEBUG false
- #define LOG_LEVEL_DETAIL false
- #define LOG_LEVEL_ALONE LOG_LEVEL_DETAIL
- //#define LOG_LEVEL_ALONE true
- namespace Threads {
- namespace logdetail {
- const char* name() { return MyThd ().m_sThreadName.cstr(); }
- }
- #define LOG_COMPONENT_MT "(" << GetOsThreadId() << ") " << logdetail::name() << ": "
- using Operation_t = Threads::details::SchedulerOperation_t;
- using OpSchedule_t = Threads::details::OpQueue_T<Operation_t>;
- struct TaskServiceThreadInfo_t
- {
- OpSchedule_t m_dPrivateQueue;
- long m_iPrivateOutstandingWork = 0;
- };
- class TaskService_t
- {
- public:
- using operation = Operation_t;
- };
- // Helper class to determine whether or not the current thread is inside an
- // invocation of Service_t::run() for a specified service object.
- // That may be used to optimize codeflow (like place continuation without locks)
- template<typename Key, typename Value = BYTE>
- class CallStack_c
- {
- public:
- // Context class automatically pushes the key/value pair on to the stack.
- class Context_c : public ISphNoncopyable
- {
- Key * m_pService; // The key associated with the context.
- Value * m_pThisContext; // The value associated with the context.
- Context_c * m_pNext; // The next element in the stack.
- friend class CallStack_c<Key, Value>;
- public:
- // Push the key on to the stack.
- explicit Context_c ( Key * pService )
- : m_pService ( pService ),
- m_pNext ( CallStack_c<Key, Value>::m_pTop )
- {
- m_pThisContext = (Value *)this;
- CallStack_c<Key, Value>::m_pTop = this;
- }
- // Push the key/value pair on to the stack.
- Context_c ( Key * pKey, Value & v )
- : m_pService ( pKey ),
- m_pThisContext ( &v ),
- m_pNext ( CallStack_c<Key, Value>::m_pTop )
- {
- CallStack_c<Key, Value>::m_pTop = this;
- }
- // Pop the key/value pair from the stack.
- ~Context_c ()
- {
- CallStack_c<Key, Value>::m_pTop = m_pNext;
- }
- // Find the next context with the same key.
- Value * NextByKey () const noexcept
- {
- for ( auto* pElem = m_pNext; pElem!=nullptr; pElem = pElem->m_pNext )
- if ( pElem->m_pService==m_pService )
- return pElem->m_pThisContext;
- return nullptr;
- }
- };
- friend class Context_c;
- // Determine whether the specified owner is on the stack.
- // Returns address of key, if present, nullptr otherwise.
- static Value * Contains ( Key * pKey ) noexcept
- {
- for ( auto* pElem = m_pTop; pElem!=nullptr; pElem = pElem->m_pNext )
- if ( pElem->m_pService==pKey )
- return pElem->m_pThisContext;
- return nullptr;
- }
- // Obtain the value at the top of the stack.
- static Value * Top () noexcept
- {
- Context_c * pElem = m_pTop;
- return pElem ? pElem->m_pThisContext : nullptr;
- }
- private:
- // The top of the stack of calls for the current thread.
- static thread_local Context_c* m_pTop;
- };
- template<typename Key, typename Value>
- thread_local typename CallStack_c<Key, Value>::Context_c* CallStack_c<Key,Value>::m_pTop = nullptr;
- //struct Service_i
- //{
- // virtual ~Service_i() {}
- // virtual void run() = 0;
- // virtual void reset() = 0;
- //};
- #define LOG_LEVEL_WORKS false
- #define LOG_LEVEL_ST false
- #define LOG_LEVEL_SERVICE LOG_LEVEL_DETAIL
- //#define LOG_LEVEL_SERVICE true
- #define LOG_COMPONENT_SVC LOG_COMPONENT_MT << " [" << &m_iOutstandingWork << "]=" << m_iOutstandingWork
- /// performs tasks pushed with post() in one or many threads until they done.
- /// Naming convention of members is inherited from boost::asio as drop-in replacement.
- struct Service_t : public TaskService_t//, public Service_i
- {
- std::atomic<long> m_iOutstandingWork {0}; /// count of unfinished works
- mutable CSphMutex m_dMutex; /// protect access to internal data
- bool m_bStopped = false; /// dispatcher has been stopped.
- bool m_bOneThread; /// optimize for single-threaded use case
- sph::Event_c m_tWakeupEvent; /// event to wake up blocked threads
- OpSchedule_t m_OpQueue GUARDED_BY ( m_dMutex ); /// The queue of handlers that are ready to be delivered
- OpSchedule_t m_OpVipQueue GUARDED_BY ( m_dMutex ); /// The queue of handlers that have to be delivered BEFORE OpQueue
- // Per-thread call stack to track the state of each thread in the service.
- using ThreadCallStack_c = CallStack_c<Service_t, TaskServiceThreadInfo_t>;
- class Work_c; /// Scoped RAII work to keep service running. calls work_started/work_finished
- friend class Work_c;
- public:
- explicit Service_t (bool bOneThread)
- : m_bOneThread ( bOneThread ) {}
- inline void post_op ( Service_t::operation* pOp) // post into secondary queue
- {
- post_immediate_completion ( pOp, false );
- }
- inline void defer_op ( Service_t::operation* pOp ) // post into primary queue
- {
- post_immediate_completion ( pOp, true );
- }
- void post_continuation ( Service_t::operation * pOp )
- {
- auto * pThisThread = ThreadCallStack_c::Contains ( this );
- if ( pThisThread )
- {
- ++pThisThread->m_iPrivateOutstandingWork;
- pThisThread->m_dPrivateQueue.Push ( pOp );
- LOG ( SERVICE, SVC ) << "post this";
- return;
- }
- work_started ();
- ScopedMutex_t dLock ( m_dMutex );
- LOG ( SERVICE, SVC ) << "post";
- m_OpVipQueue.Push ( pOp );
- wake_one_thread_and_unlock ( dLock );
- }
- void post_immediate_completion ( Service_t::operation * pOp, bool bVip )
- {
- if ( m_bOneThread )
- {
- auto * pThisThread = ThreadCallStack_c::Contains ( this );
- if ( pThisThread )
- {
- ++pThisThread->m_iPrivateOutstandingWork;
- pThisThread->m_dPrivateQueue.Push ( pOp );
- LOG ( SERVICE, SVC ) << "post this";
- return;
- }
- }
- work_started ();
- ScopedMutex_t dLock ( m_dMutex );
- LOG ( SERVICE, MT ) << "post";
- if ( bVip )
- m_OpVipQueue.Push ( pOp );
- else
- m_OpQueue.Push ( pOp );
- wake_one_thread_and_unlock ( dLock );
- }
- void run ( std::atomic<bool>& bBusy ) NO_THREAD_SAFETY_ANALYSIS //override
- {
- LOG ( SERVICE, SVC ) << "run " << m_iOutstandingWork << " st:" << !!m_bStopped;
- if ( m_iOutstandingWork==0 )
- {
- LOG ( WORKS, MT ) << "run m_iOutstandingWork " << m_iOutstandingWork << " " << &m_iOutstandingWork<< " stop!";
- stop();
- return;
- }
- TaskServiceThreadInfo_t dThisThread;
- dThisThread.m_iPrivateOutstandingWork = 0;
- ThreadCallStack_c::Context_c dCtx ( this, dThisThread );
- ScopedMutex_t dLock ( m_dMutex );
- while ( do_run_one ( dLock, dThisThread, bBusy ) )
- dLock.Lock ();
- }
- bool queue_empty() const REQUIRES ( m_dMutex )
- {
- return m_OpQueue.Empty () && m_OpVipQueue.Empty ();
- }
- inline bool do_run_one ( ScopedMutex_t& dLock, TaskServiceThreadInfo_t& this_thread, std::atomic<bool>& bBusy ) noexcept
- REQUIRES ( dLock ) RELEASE ( dLock ) TRY_ACQUIRE ( false, dLock )
- {
- while ( !m_bStopped )
- {
- LOG ( SERVICE, MT ) << "locked " << dLock.Locked();
- assert ( dLock.Locked ());
- if ( queue_empty() )
- {
- m_tWakeupEvent.Clear ( dLock );
- m_tWakeupEvent.Wait ( dLock );
- continue;
- }
- auto & dOpQueue = m_OpVipQueue.Empty () ? m_OpQueue : m_OpVipQueue;
- auto * pOp = dOpQueue.Front ();
- dOpQueue.Pop ();
- if ( !queue_empty () && !m_bOneThread )
- wake_one_thread_and_unlock ( dLock );
- else
- dLock.Unlock ();
- bBusy.store ( true, std::memory_order_relaxed );
- boost::context::detail::prefetch_range ( pOp, sizeof ( Operation_t ) );
- pOp->Complete (this);
- bBusy.store ( false, std::memory_order_relaxed );
- LOG ( SERVICE, MT ) << "completed & unlocked";
- if ( this_thread.m_iPrivateOutstandingWork>1 )
- {
- m_iOutstandingWork += this_thread.m_iPrivateOutstandingWork-1;
- LOG ( WORKS, MT ) << "do_run_one m_iOutstandingWork " << m_iOutstandingWork << " " << &m_iOutstandingWork;
- }
- else if ( this_thread.m_iPrivateOutstandingWork<1 )
- work_finished ();
- this_thread.m_iPrivateOutstandingWork = 0;
- if ( !this_thread.m_dPrivateQueue.Empty ())
- {
- dLock.Lock ();
- m_OpVipQueue.Push ( this_thread.m_dPrivateQueue );
- }
- return true;
- }
- return false;
- }
- void stop()
- {
- LOG ( SERVICE, SVC ) << "stop";
- ScopedMutex_t dLock ( m_dMutex );
- stop_all_threads ( dLock );
- }
- bool stopped () const
- {
- ScopedMutex_t dLock ( m_dMutex );
- return m_bStopped;
- }
- void reset () //override
- {
- LOG ( DETAIL, MT ) << "reset stopped ";
- ScopedMutex_t dLock ( m_dMutex );
- m_bStopped = false;
- }
- // Notify that some work has started.
- void work_started ()
- {
- LOG ( SERVICE, SVC ) << "work_started from " << m_iOutstandingWork;
- ++m_iOutstandingWork;
- LOG ( WORKS, MT ) << "work_started m_iOutstandingWork " << m_iOutstandingWork << " " << &m_iOutstandingWork;
- }
- // Notify that some work has finished.
- void work_finished ()
- {
- LOG ( SERVICE, SVC ) << "work_finished to " << m_iOutstandingWork-1;
- if ( --m_iOutstandingWork==0 )
- stop ();
- LOG ( WORKS, MT ) << "work_finished m_iOutstandingWork " << m_iOutstandingWork << " " << &m_iOutstandingWork;
- }
- void stop_all_threads ( ScopedMutex_t & dLock ) REQUIRES ( dLock )
- {
- m_bStopped = true;
- m_tWakeupEvent.SignalAll ( dLock );
- }
- void wake_one_thread_and_unlock ( ScopedMutex_t & dLock ) REQUIRES ( dLock ) RELEASE ( dLock )
- {
- if ( !m_tWakeupEvent.MaybeUnlockAndSignalOne ( dLock ))
- dLock.Unlock ();
- }
- long works() const noexcept
- {
- return m_iOutstandingWork;
- }
- NTasks_t tasks() const
- {
- ScopedMutex_t dLock ( m_dMutex );
- return { (int)m_OpVipQueue.GetLength(), (int)m_OpQueue.GetLength() };
- }
- };
- /// helper to hold the service running
- class Service_t::Work_c
- {
- Service_t& m_tServiceRef;
- public:
- explicit Work_c( Service_t& tService )
- : m_tServiceRef (tService)
- {
- m_tServiceRef.work_started ();
- }
- Work_c ( const Work_c& tOther)
- : m_tServiceRef ( tOther.m_tServiceRef )
- {
- m_tServiceRef.work_started ();
- }
- Work_c & operator= ( const Work_c & ) = delete;
- ~Work_c()
- {
- m_tServiceRef.work_finished();
- }
- };
- #define LOG_COMPONENT_TP LOG_COMPONENT_MT << ": "
- #define LOG_COMPONENT_ST LOG_COMPONENT_MT << " strand: "
- // strand - sequental scheduler. Operations executed strictly sequentally and in FIFO order.
- // It looks like 'single thread', but actual thread is provided from backend and may change.
- class Strand_c final : public SchedulerWithBackend_i
- {
- struct StrandWorker_t final : public ISphRefcountedMT
- {
- CSphMutex m_dMutex;
- bool m_bLocked GUARDED_BY ( m_dMutex ) = false;
- OpSchedule_t m_OpWaitQueue GUARDED_BY ( m_dMutex ); /// The queue for the next run
- OpSchedule_t m_OpReadyQueue; /// The queue for current run
- // strand has no backend thread/threadpool and works over another scheduler.
- Scheduler_i* m_pBackend = nullptr;
- inline bool Enqueue ( Threads::details::SchedulerOperation_t* pOp )
- {
- ScopedMutex_t tLock ( m_dMutex );
- if ( m_bLocked )
- {
- m_OpWaitQueue.Push ( pOp );
- LOG ( ST, ST ) << " enqueued to wait queue, was locked " << pOp;
- return false;
- }
- m_bLocked = true;
- tLock.Unlock();
- m_OpReadyQueue.Push ( pOp );
- LOG ( ST, ST ) << " enqueued to ready queue, locked " << pOp;
- return true;
- }
- // try to execute immediately, or then post to primary queue
- void PostContinuationToBackend ( Threads::details::SchedulerOperation_t* pOp ) const
- {
- // Add the function to the strand and schedule the strand if required.
- if ( m_pBackend )
- m_pBackend->ScheduleContinuationOp ( pOp );
- }
- inline Keeper_t KeepWorking() const
- {
- assert ( m_pBackend );
- return m_pBackend->KeepWorking();
- }
- protected:
- ~StrandWorker_t() final = default;
- };
- using StrandWorkerPtr_t = CSphRefcountedPtr<StrandWorker_t>;
- StrandWorkerPtr_t m_pWorker;
- const char* m_szName = nullptr;
- // Per-thread call stack to track the state of each thread in the service.
- using StrandCallStack_c = CallStack_c<StrandWorker_t>;
- class Invoker_c
- {
- StrandWorkerPtr_t m_pOwner;
- Keeper_t m_tParentKeeper;
- public:
- explicit Invoker_c ( StrandWorkerPtr_t pRand );
- Invoker_c ( const Invoker_c& rhs ) = default;
- Invoker_c ( Invoker_c && rhs ) noexcept;
- Invoker_c & operator= ( Invoker_c && rhs ) noexcept;
- void run ();
- };
- friend class Invoker_c;
- inline bool Enqueue ( Threads::details::SchedulerOperation_t* pOp )
- {
- assert ( m_pWorker );
- return m_pWorker->Enqueue ( pOp );
- }
- void PostContinuationImpl ( Threads::details::SchedulerOperation_t* pOp ) // try to execute immediately, or then post to primary queue
- {
- auto bThisThread = !!StrandCallStack_c::Contains ( m_pWorker );
- if ( bThisThread )
- {
- LOG ( ST, ST ) << "PostContinuation fast in this thread";
- Threads::JobTracker_t dTrack;
- // barrier ensures that no operations till here would be reordered below.
- std::atomic_thread_fence ( std::memory_order_acquire );
- pOp->Complete(pOp);
- std::atomic_thread_fence ( std::memory_order_release );
- LOG ( ST, ST ) << "strand PostContinuation performed without queuing";
- return;
- }
- bool bFirst = Enqueue ( pOp );
- // Add the function to the strand and schedule the strand if required.
- if ( bFirst )
- {
- Invoker_c tInvoker { m_pWorker };
- m_pWorker->PostContinuationToBackend ( Threads::details::Handler2Op ( [t = std::move ( tInvoker )]() mutable { t.run(); } ) );
- }
- }
- public:
- explicit Strand_c ( Scheduler_i* pBackend, const char* szName=nullptr )
- : m_pWorker { new StrandWorker_t }
- , m_szName { szName }
- {
- m_pWorker->m_pBackend = pBackend;
- LOGINFO ( TPLIFE, TP ) << "Strand_c created";
- }
- void ScheduleOp ( Threads::details::SchedulerOperation_t* pOp, bool bVip ) noexcept
- {
- LOG ( ST, ST ) << "Post";
- bool bFirst = Enqueue ( pOp );
- if ( bFirst && m_pWorker->m_pBackend )
- {
- LOG ( ST, ST ) << "Post scheduled invoker to backend";
- Invoker_c tInvoker { m_pWorker };
- m_pWorker->m_pBackend->Schedule ( [t=std::move(tInvoker)] () mutable { t.run (); }, bVip );
- }
- LOG ( ST, ST ) << "Post finished";
- }
- void ScheduleContinuationOp ( Threads::details::SchedulerOperation_t* pOp ) noexcept
- {
- LOG ( ST, ST ) << "ScheduleContinuation";
- PostContinuationImpl ( pOp );
- LOG ( ST, ST ) << "Post finished";
- }
- Keeper_t KeepWorking () noexcept
- {
- assert ( m_pWorker );
- return m_pWorker->KeepWorking();
- }
- bool SetBackend ( Scheduler_i* pBackend ) noexcept
- {
- assert ( m_pWorker );
- ScopedMutex_t tLock ( m_pWorker->m_dMutex );
- if ( m_pWorker->m_bLocked )
- {
- if ( m_pWorker->m_pBackend ) // everything healthy and work, can't change right now
- return false;
- assert ( !m_pWorker->m_pBackend );
- m_pWorker->m_pBackend = pBackend;
- tLock.Unlock();
- Invoker_c tInvoker { m_pWorker };
- m_pWorker->PostContinuationToBackend ( Threads::details::Handler2Op ( [t = std::move ( tInvoker )]() mutable { t.run(); } ) );
- }
- m_pWorker->m_pBackend = pBackend;
- return true;
- }
- const char * Name () const noexcept { return m_szName; }
- };
- Strand_c::Invoker_c::Invoker_c ( StrandWorkerPtr_t pRand )
- : m_pOwner { std::move(pRand) }
- , m_tParentKeeper { m_pOwner->KeepWorking() }
- {}
- Strand_c::Invoker_c::Invoker_c ( Strand_c::Invoker_c && rhs ) noexcept
- : m_pOwner ( rhs.m_pOwner )
- {
- m_tParentKeeper.Swap ( rhs.m_tParentKeeper );
- }
- Strand_c::Invoker_c & Strand_c::Invoker_c::operator= ( Strand_c::Invoker_c && rhs ) noexcept
- {
- m_tParentKeeper.Swap ( rhs.m_tParentKeeper );
- m_pOwner = rhs.m_pOwner;
- return *this;
- }
- void Strand_c::Invoker_c::run ()
- {
- struct OnInvokerFinished_t
- {
- Strand_c::Invoker_c* m_pThis;
- ~OnInvokerFinished_t()
- {
- bool bMoreHandlers;
- auto& pOwner = m_pThis->m_pOwner;
- {
- ScopedMutex_t tLock ( pOwner->m_dMutex );
- pOwner->m_OpReadyQueue.Push ( pOwner->m_OpWaitQueue );
- bMoreHandlers = pOwner->m_bLocked = !pOwner->m_OpReadyQueue.Empty ();
- }
- LOG ( ST, ST ) << "OnInvokerFinished_t: " << bMoreHandlers;
- if ( !bMoreHandlers )
- {
- LOG ( ST, ST ) << "OnInvokerFinished_t, abandoned, unlocked";
- return;
- }
- LOG ( ST, ST ) << "OnInvokerFinished_t, have more, locked";
- Strand_c::Invoker_c tInvoker { *m_pThis };
- // pOwner->Schedule ( [t=std::move(tInvoker)] () mutable { t.run (); }, true );
- pOwner->PostContinuationToBackend ( Threads::details::Handler2Op ( [t = std::move ( tInvoker )]() mutable { t.run(); } ) );
- }
- };
- StrandCallStack_c::Context_c dCtx ( m_pOwner );
- // that will ensure the next handler, if any, will be scheduled on block exit
- OnInvokerFinished_t VARIABLE_IS_NOT_USED dOnFinished = { this };
- // Run all ready handlers. No lock is required since the ready queue is
- // accessed only within the strand.
- while ( !m_pOwner->m_OpReadyQueue.Empty () )
- {
- auto * pOp = m_pOwner->m_OpReadyQueue.Front ();
- m_pOwner->m_OpReadyQueue.Pop ();
- LOG ( ST, ST ) << "run op: " << pOp;
- boost::context::detail::prefetch_range ( pOp, sizeof ( Operation_t ) );
- pOp->Complete ( pOp );
- }
- }
- class ThreadPool_c final : public Worker_i
- {
- using Work = Service_t::Work_c;
- const char * m_szName = nullptr;
- Service_t m_tService;
- std::optional<Work> m_dWork;
- CSphMutex m_dMutex;
- std::atomic<bool> m_bStop {false};
- struct alignas ( 64 ) Thd_t // alignas cacheline, to freely access m_bBusy without cache poisoning
- {
- std::atomic<bool> m_bBusy { false };
- SphThread_t m_tThread;
- LowThreadDesc_t* m_pChild = nullptr;
- };
- // support iteration over children for show threads and hazards
- mutable RwLock_t m_dChildGuard;
- CSphFixedVector<Thd_t> m_dThreads { 0 };
- void Post ( Threads::details::SchedulerOperation_t* pOp, bool bVip = false ) // post to primary (vip) or secondary queue
- {
- LOG ( DETAIL, TP ) << "Post " << bVip;
- if ( bVip )
- m_tService.defer_op ( pOp );
- else
- m_tService.post_op ( pOp );
- LOG ( DETAIL, TP ) << "Post finished";
- }
- void PostContinuation ( Threads::details::SchedulerOperation_t* pOp ) // 'very vip' - try to execute immediately, or post to the primary queue
- {
- LOG ( DETAIL, TP ) << "PostContinuation";
- m_tService.post_continuation ( pOp );
- LOG ( DETAIL, TP ) << "Post finished";
- }
- // Service_i & Service ()
- // {
- // return m_tService;
- // }
- void createWork ()
- {
- m_dWork.emplace ( m_tService );
- }
- void loop (int iChild) NO_THREAD_SAFETY_ANALYSIS
- {
- {
- ScWL_t _ ( m_dChildGuard );
- m_dThreads[iChild].m_pChild = &MyThd ();
- }
- while (true)
- {
- m_tService.run ( m_dThreads[iChild].m_bBusy );
- ScopedMutex_t dLock {m_dMutex};
- if ( m_bStop )
- break;
- if ( !m_dWork )
- {
- createWork ();
- m_tService.reset ();
- }
- }
- ScWL_t _ ( m_dChildGuard );
- m_dThreads[iChild].m_pChild = nullptr;
- }
- public:
- ThreadPool_c ( size_t iThreadCount, const char * szName )
- : m_szName {szName}
- , m_tService ( iThreadCount==1 )
- {
- createWork ();
- m_dThreads.Reset ( (int) iThreadCount );
- ARRAY_FOREACH ( i, m_dThreads )
- Threads::CreateQ ( &m_dThreads[i].m_tThread, [this,i] { loop (i); }, false, m_szName, i );
- LOG ( DEBUG, TP ) << "thread pool created with threads: " << iThreadCount;
- LOGINFO ( TPLIFE, TP ) << "thread pool created with threads: " << iThreadCount;
- }
- ~ThreadPool_c ()
- {
- LOGINFO ( TPLIFE, TP ) << "thread pool destroying";
- StopAll();
- ScWL_t _ ( m_dChildGuard ); // that will keep children list if smbody still iterates over it
- }
- void DiscardOnFork () final
- {
- ScWL_t _ ( m_dChildGuard );
- m_dThreads.Reset ( 0 );
- }
- void ScheduleOp ( Threads::details::SchedulerOperation_t* pOp, bool bVip ) noexcept
- {
- Post ( pOp, bVip );
- }
- void ScheduleContinuationOp ( Threads::details::SchedulerOperation_t* pOp ) noexcept
- {
- PostContinuation ( pOp );
- }
- #define LOG_LEVEL_SERVICE_KEEP_MT false
- #if LOG_LEVEL_SERVICE_KEEP_MT
- static intptr_t KeepWorkingID()
- {
- static std::atomic<intptr_t> uWorker { 0ULL };
- return uWorker.fetch_add ( 1, std::memory_order_relaxed );
- }
- Keeper_t KeepWorking() final
- {
- m_tService.work_started();
- auto kwid = KeepWorkingID();
- LOGINFO ( SERVICE_KEEP_MT, MT ) << "KeepWorking " << kwid;
- return { (void*)kwid, [this] ( void* kwid ) {
- m_tService.work_finished (); // divided to lines for breakpoints
- LOGINFO ( SERVICE_KEEP_MT, MT ) << "KeepWorking finished " << (intptr_t)kwid; } };
- }
- #else
- Keeper_t KeepWorking() noexcept
- {
- m_tService.work_started();
- return { nullptr, [this] ( void* ) { m_tService.work_finished(); } };
- }
- #endif
- int WorkingThreads () const noexcept NO_THREAD_SAFETY_ANALYSIS
- {
- return m_dThreads.GetLength ();
- }
- int Works () const noexcept
- {
- return (int)m_tService.works ();
- }
- NTasks_t Tasks() const noexcept
- {
- return m_tService.tasks();
- }
- int CurTasks() const noexcept NO_THREAD_SAFETY_ANALYSIS
- {
- return (int)m_dThreads.count_of ( [] ( auto& i ) { return i.m_bBusy.load ( std::memory_order_relaxed ); } );
- }
- void IterateChildren ( ThreadFN& fnHandler ) noexcept
- {
- ScRL_t _ ( m_dChildGuard );
- for ( const auto& tThd : m_dThreads )
- fnHandler ( tThd.m_pChild );
- }
- void StopAll () NO_THREAD_SAFETY_ANALYSIS
- {
- ScopedMutex_t dLock { m_dMutex };
- m_bStop = true;
- m_dWork.reset ();
- if ( sphIsDied() )
- m_tService.stop();
- dLock.Unlock ();
- LOG ( DEBUG, TP ) << "stopping thread pool";
- LOGINFO ( TPLIFE, TP ) << "stopping thread pool";
- for ( auto & dThread : m_dThreads )
- Threads::Join ( &dThread.m_tThread );
- LOG ( DEBUG, TP ) << "thread pool stopped";
- LOGINFO ( TPLIFE, TP ) << "thread pool stopped";
- m_dThreads.Reset ( 0 );
- }
- };
- class AloneThread_c final : public Worker_i
- {
- CSphString m_sName;
- int m_iThreadNum;
- Service_t m_tService;
- std::atomic<bool> m_bStarted {false};
- std::atomic<bool> m_bBusy {false};
- static int m_iRunningAlones;
- void Post ( Service_t::operation* pOp, bool bVip=false ) // post to primary (vip) or secondary queue
- {
- LOG ( DETAIL, TP ) << "Post " << bVip;
- if ( bVip )
- m_tService.defer_op ( pOp );
- else
- m_tService.post_op ( pOp );
- LOG ( DETAIL, TP ) << "Post finished";
- if ( !m_bStarted )
- {
- m_bStarted = true;
- SphThread_t tThd; // dummy, since we're starting detached
- Threads::CreateQ ( &tThd, [this] { loop (); }, true, m_sName.cstr (), m_iThreadNum );
- LOG ( DEBUG, TP ) << "alone thread created";
- }
- }
- void loop ()
- {
- Detached::AddThread ( &MyThd () );
- m_tService.run ( m_bBusy );
- Detached::RemoveThread ( &MyThd () );
- delete this;
- }
- public:
- explicit AloneThread_c ( int iNum, const char * szName )
- : m_sName {szName}
- , m_iThreadNum ( iNum )
- , m_tService ( true ) // true means 'single-thread'
- {
- ++m_iRunningAlones;
- LOG ( DEBUG, TP ) << "alone worker created " << szName;
- }
- ~AloneThread_c ()
- {
- LOG ( DEBUG, TP ) << "stopping thread";
- --m_iRunningAlones;
- LOG ( DEBUG, TP ) << "thread stopped";
- LOGINFO ( TPLIFE, TP ) << "AloneThread_c destroyed";
- }
- void ScheduleOp ( Service_t::operation* pOp , bool bVip ) noexcept
- {
- Post ( pOp, bVip );
- }
- #define LOG_LEVEL_SERVICE_KEEP_ALONE false
- #if LOG_LEVEL_SERVICE_KEEP_ALONE
- static intptr_t KeepWorkingID()
- {
- static std::atomic<intptr_t> uWorker { 0ULL };
- return uWorker.fetch_add ( 1, std::memory_order_relaxed );
- }
- Keeper_t KeepWorking() noexcept
- {
- m_tService.work_started();
- auto kwid = KeepWorkingID();
- LOGINFO ( SERVICE_KEEP_ALONE, MT ) << "KeepWorking alone " << this << " " << kwid;
- return { (void*)kwid, [this] ( void* kwid ) {
- m_tService.work_finished (); // divided to lines for breakpoints
- LOGINFO ( SERVICE_KEEP_ALONE, MT ) << "KeepWorking alone inished " << this << " " << (intptr_t)kwid; } };
- }
- #else
- Keeper_t KeepWorking() noexcept
- {
- m_tService.work_started();
- return { nullptr, [this] ( void* ) { m_tService.work_finished(); } };
- }
- #endif
- void StopAll () {}
- static int GetRunners () { return m_iRunningAlones; }
- int Works () const
- {
- return GetRunners ();
- }
- NTasks_t Tasks() const noexcept
- {
- return m_tService.tasks();
- }
- int CurTasks() const noexcept
- {
- return !!m_bBusy.load(std::memory_order_relaxed);
- }
- const char* Name() const noexcept
- {
- return m_sName.cstr();
- }
- };
- int AloneThread_c::m_iRunningAlones = 0;
- class ShedulerWrapper_c final : public Scheduler_i
- {
- Scheduler_i* m_pScheduler; // not owned
- const char* m_szName;
- public:
- ShedulerWrapper_c ( Scheduler_i* pScheduler, const char* szName ) noexcept
- : m_pScheduler { pScheduler }
- , m_szName { szName }
- {}
- void ScheduleOp ( details::SchedulerOperation_t* pOp, bool bVip ) noexcept
- {
- m_pScheduler->ScheduleOp ( pOp, bVip );
- }
- void ScheduleContinuationOp ( details::SchedulerOperation_t* pOp ) noexcept
- {
- m_pScheduler->ScheduleContinuationOp ( pOp );
- }
- Keeper_t KeepWorking() noexcept
- {
- return m_pScheduler->KeepWorking();
- };
- int WorkingThreads() const noexcept
- {
- return m_pScheduler->WorkingThreads();
- };
- const char* Name() const noexcept
- {
- return m_szName ? m_szName : m_pScheduler->Name();
- }
- };
- WorkerSharedPtr_t MakeThreadPool ( size_t iThreadCount, const char* szName )
- {
- return WorkerSharedPtr_t { new ThreadPool_c ( iThreadCount, szName ) };
- }
- WorkerSharedPtr_t MakeAloneThread ( size_t iOrderNum, const char* szName )
- {
- return WorkerSharedPtr_t { new AloneThread_c ( (int)iOrderNum, szName ) };
- }
- // Alone scheduler works on top of another scheduler and provides sequental execution of the tasks (each time only one
- // task may be performed, no concurrent execution). It also gives FIFO ordering of the tasks.
- SchedulerSharedPtr_t MakeAloneScheduler ( Scheduler_i* pBase, const char* szName )
- {
- return SchedulerSharedPtr_t { new Strand_c ( pBase, szName ) };
- }
- // wraps raw scheduler into shared-ptr (it will NOT delete the scheduler when destroyed!)
- SchedulerSharedPtr_t WrapRawScheduler ( Scheduler_i* pBase, const char* szName )
- {
- return SchedulerSharedPtr_t { new ShedulerWrapper_c ( pBase, szName ) };
- }
- } // namespace Threads
- namespace {
- RwLock_t & g_lShutdownGuard ()
- {
- static RwLock_t lShutdownGuard;
- return lShutdownGuard;
- }
- OpSchedule_t & g_dShutdownList ()
- {
- static OpSchedule_t dShutdownList GUARDED_BY ( g_lShutdownGuard () );
- return dShutdownList;
- }
- OpSchedule_t & g_dOnForkList ()
- {
- static OpSchedule_t dOnForkList GUARDED_BY ( g_lShutdownGuard () );
- return dOnForkList;
- }
- }
- void searchd::AddShutdownCb ( Handler fnCb )
- {
- auto pCb = Threads::details::Handler2Op ( std::move ( fnCb ) );
- ScWL_t tGuard ( g_lShutdownGuard() );
- g_dShutdownList().Push_front( pCb );
- }
- void searchd::AddOnForkCleanupCb ( Threads::Handler fnCb )
- {
- auto pCb = Threads::details::Handler2Op ( std::move ( fnCb ) );
- ScWL_t tGuard ( g_lShutdownGuard () );
- g_dOnForkList ().Push_front ( pCb );
- }
- // invoke shutdown handlers
- void searchd::FireShutdownCbs ()
- {
- ScRL_t tGuard ( g_lShutdownGuard() );
- while ( !g_dShutdownList().Empty () )
- {
- auto * pOp = g_dShutdownList().Front ();
- g_dShutdownList().Pop ();
- pOp->Complete ( pOp );
- }
- }
- void searchd::CleanAfterFork () NO_THREAD_SAFETY_ANALYSIS
- {
- while ( !g_dOnForkList ().Empty () )
- {
- auto * pOp = g_dOnForkList ().Front ();
- g_dOnForkList ().Pop ();
- pOp->Complete ( pOp );
- }
- while ( !g_dShutdownList().Empty () )
- {
- auto * pOp = g_dShutdownList().Front ();
- g_dShutdownList().Pop ();
- pOp->Destroy();
- }
- }
- static int g_iMaxChildrenThreads = 1;
- namespace {
- static WorkerSharedPtr_t pGlobalPool;
- WorkerSharedPtr_t& GlobalPoolSingletone ()
- {
- return pGlobalPool;
- }
- }
- void StartGlobalWorkPool ()
- {
- sphLogDebug ( "StartGlobalWorkpool" );
- WorkerSharedPtr_t& pPool = GlobalPoolSingletone ();
- #if !_WIN32
- if ( !pPool )
- #endif
- pPool = new ThreadPool_c ( g_iMaxChildrenThreads, "work" );
- }
- void StopGlobalWorkPool()
- {
- sphLogDebug ( "StopGlobalWorkPool" );
- WorkerSharedPtr_t& pPool = GlobalPoolSingletone();
- if ( pPool )
- pPool->StopAll();
- }
- void SetMaxChildrenThreads ( int iThreads )
- {
- sphLogDebug ( "SetMaxChildrenThreads to %d", iThreads );
- g_iMaxChildrenThreads = Max ( 1, iThreads );
- }
- int MaxChildrenThreads() noexcept
- {
- return g_iMaxChildrenThreads;
- }
- Worker_i * GlobalWorkPool ()
- {
- WorkerSharedPtr_t& pPool = GlobalPoolSingletone ();
- assert ( pPool && "invoke StartGlobalWorkPool first");
- return pPool;
- }
- void WipeGlobalSchedulerOnShutdownAndFork ()
- {
- #ifndef NDEBUG
- static bool bAlreadyInvoked = false;
- assert (!bAlreadyInvoked);
- bAlreadyInvoked = true;
- #endif
- Threads::RegisterIterator ( [] ( ThreadFN & fnHandler ) {
- WorkerSharedPtr_t& pPool = GlobalPoolSingletone ();
- if ( pPool )
- pPool->IterateChildren ( fnHandler );
- } );
- searchd::AddOnForkCleanupCb ( [] {
- WorkerSharedPtr_t& pPool = GlobalPoolSingletone ();
- if ( pPool )
- pPool->DiscardOnFork ();
- } );
- // searchd::AddShutdownCb ( [] {
- // sphWarning ( "stop all pool threads" );
- // WorkerSharedPtr_t& pPool = GlobalPoolSingletone ();
- // if ( pPool )
- // pPool->StopAll ();
- // } );
- }
- void WipeSchedulerOnFork ( Threads::Worker_i* pWorker )
- {
- Threads::RegisterIterator ( [pWorker] ( ThreadFN& fnHandler ) {
- if ( pWorker )
- pWorker->IterateChildren ( fnHandler );
- } );
- searchd::AddOnForkCleanupCb ( [pWorker] {
- if ( pWorker )
- pWorker->DiscardOnFork();
- } );
- }
- namespace {
- static std::atomic<int> g_iRunningThreads {0};
- }
- int Threads::GetNumOfRunning()
- {
- return g_iRunningThreads.load ( std::memory_order_relaxed );
- }
- //////////////////////////////////////////////////////////////////////////
- /// helpers to iterate over all registered threads
- class OperationsQueue_c::Impl_c
- {
- CSphMutex m_tQueueGuard;
- OpSchedule_t m_tQueue GUARDED_BY ( m_tQueueGuard );
- public:
- void AddOp ( Handler fnCb )
- {
- auto pCb = Threads::details::Handler2Op ( std::move ( fnCb ) );
- ScopedMutex_t tGuard ( m_tQueueGuard );
- m_tQueue.Push_front ( pCb );
- }
- void RunAll ()
- {
- OpSchedule_t tQueue;
- {
- ScopedMutex_t tGuard ( m_tQueueGuard );
- if ( m_tQueue.Empty() )
- return;
- tQueue.Push ( m_tQueue );
- }
- while ( !tQueue.Empty() )
- {
- auto* pOp = tQueue.Front();
- tQueue.Pop();
- pOp->Complete ( pOp );
- }
- }
- bool IsEmpty() const NO_THREAD_SAFETY_ANALYSIS
- {
- return m_tQueue.Empty();
- }
- ~Impl_c()
- {
- while ( !m_tQueue.Empty () )
- {
- auto * pOp = m_tQueue.Front ();
- m_tQueue.Pop ();
- pOp->Destroy ();
- }
- }
- };
- OperationsQueue_c::OperationsQueue_c()
- : m_pImpl ( new Impl_c )
- {}
- OperationsQueue_c::~OperationsQueue_c()
- {
- SafeDelete ( m_pImpl );
- }
- void OperationsQueue_c::AddOp (Handler fnOp)
- {
- assert ( m_pImpl );
- m_pImpl->AddOp(std::move(fnOp));
- }
- void OperationsQueue_c::RunAll()
- {
- assert ( m_pImpl );
- m_pImpl->RunAll();
- }
- bool OperationsQueue_c::IsEmpty() const
- {
- assert ( m_pImpl );
- return m_pImpl->IsEmpty();
- }
- namespace { // static
- class IterationHandler_c : public Threads::details::SchedulerOperation_t
- {
- ThreadIteratorFN m_Handler;
- public:
- explicit IterationHandler_c ( ThreadIteratorFN h )
- : SchedulerOperation_t ( &IterationHandler_c::DoComplete )
- , m_Handler ( std::move ( h ) )
- {}
- static void DoComplete ( void * pOwner, SchedulerOperation_t * pBase )
- {
- auto * pHandler = (IterationHandler_c *) pBase;
- if ( pOwner )
- pHandler->m_Handler ( *(ThreadFN *) pOwner );
- else
- delete pHandler;
- }
- };
- struct IteratorsQueue_t
- {
- RwLock_t m_tQueueGuard;
- OpSchedule_t m_tQueue GUARDED_BY ( m_tQueueGuard );
- void RegisterIterator ( ThreadIteratorFN fnIterator )
- {
- auto pCb = ( new IterationHandler_c ( std::move ( fnIterator ) ) );
- ScWL_t tGuard ( m_tQueueGuard );
- m_tQueue.Push_front ( pCb );
- }
- // iterate over all (pooled and alone) threads.
- // over pooled we're not using locks, since pool is living 'as whole', so no lock accessing individual elem need.
- // iteration func, however, must check if param is nullptr.
- // note, non-iteratable threads can't use hazard pointers (just nobody knows they're hold something).
- void IterateActive ( ThreadFN fnHandler )
- {
- ScRL_t tGuard ( m_tQueueGuard );
- for ( auto & dOp : m_tQueue )
- dOp.Complete ( &fnHandler );
- }
- ~IteratorsQueue_t()
- {
- ScWL_t tGuard ( m_tQueueGuard );
- while ( !m_tQueue.Empty () )
- {
- auto * pOp = m_tQueue.Front ();
- m_tQueue.Pop ();
- pOp->Destroy();
- }
- }
- };
- IteratorsQueue_t g_dIteratorsList;
- }
- void Threads::RegisterIterator ( ThreadIteratorFN fnIterator )
- {
- g_dIteratorsList.RegisterIterator ( std::move ( fnIterator ) );
- }
- void Threads::IterateActive ( ThreadFN fnHandler )
- {
- g_dIteratorsList.IterateActive ( std::move ( fnHandler ) );
- }
- Threads::Scheduler_i * MakeSingleThreadExecutor ( int iMaxThreads, const char * szName )
- {
- if ( iMaxThreads>0 && Threads::AloneThread_c::GetRunners ()>=iMaxThreads )
- return nullptr;
- static int iOrder = 0;
- return new Threads::AloneThread_c ( iOrder++, szName? szName: "alone" );
- }
- #if !_WIN32
- void * Threads::Init ( bool bDetached )
- #else
- void * Threads::Init ( bool )
- #endif
- {
- static bool bInit = false;
- #if !_WIN32
- static pthread_attr_t tJoinableAttr;
- static pthread_attr_t tDetachedAttr;
- #endif
- if ( !bInit )
- {
- #if SPH_DEBUG_LEAKS || SPH_ALLOCS_PROFILER
- sphMemStatInit();
- #endif
- #if !_WIN32
- if ( pthread_attr_init ( &tJoinableAttr ) )
- sphDie ( "FATAL: pthread_attr_init( joinable ) failed" );
- if ( pthread_attr_init ( &tDetachedAttr ) )
- sphDie ( "FATAL: pthread_attr_init( detached ) failed" );
- if ( pthread_attr_setdetachstate ( &tDetachedAttr, PTHREAD_CREATE_DETACHED ) )
- sphDie ( "FATAL: pthread_attr_setdetachstate( detached ) failed" );
- #endif
- bInit = true;
- }
- #if !_WIN32
- if ( pthread_attr_setstacksize ( &tJoinableAttr, STACK_SIZE ) )
- sphDie ( "FATAL: pthread_attr_setstacksize( joinable ) failed" );
- if ( pthread_attr_setstacksize ( &tDetachedAttr, STACK_SIZE ) )
- sphDie ( "FATAL: pthread_attr_setstacksize( detached ) failed" );
- return bDetached ? &tDetachedAttr : &tJoinableAttr;
- #else
- return NULL;
- #endif
- }
- #if SPH_DEBUG_LEAKS || SPH_ALLOCS_PROFILER
- void Threads::Done ( int iFD )
- {
- sphMemStatDump ( iFD );
- sphMemStatDone();
- }
- #else
- void Threads::Done ( int )
- {
- }
- #endif
- /// get name of a thread
- CSphString Threads::GetName ( const SphThread_t * pThread )
- {
- if ( !pThread || !*pThread )
- return "";
- #if HAVE_PTHREAD_GETNAME_NP
- char sClippedName[16];
- pthread_getname_np ( *pThread, sClippedName, 16 );
- return sClippedName;
- #else
- return "";
- #endif
- }
- /// my join thread wrapper
- bool Threads::Join ( SphThread_t * pThread )
- {
- #if _WIN32
- DWORD uWait = WaitForSingleObject ( *pThread, INFINITE );
- CloseHandle ( *pThread );
- *pThread = NULL;
- return ( uWait==WAIT_OBJECT_0 || uWait==WAIT_ABANDONED );
- #else
- return pthread_join ( *pThread, nullptr )==0;
- #endif
- }
- /// my own thread
- SphThread_t Threads::Self ()
- {
- #if _WIN32
- return GetCurrentThread();
- #else
- return pthread_self ();
- #endif
- }
- /// compares two thread ids
- bool Threads::Same ( const LowThreadDesc_t * pFirst, const LowThreadDesc_t * pSecond )
- {
- if ( !pFirst && !pSecond )
- return true;
- if ( !pFirst || !pSecond )
- return false;
- #if _WIN32
- // can not use m_tThread on Windows as GetCurrentThread returns -2 and that handle valid only inside thread itself
- return ( pFirst->m_iThreadID==pSecond->m_iThreadID );
- #else
- return pthread_equal ( pFirst->m_tThread, pSecond->m_tThread )!=0;
- #endif
- }
- struct RuntimeThreadContext_t : ISphNoncopyable
- {
- LowThreadDesc_t m_tDesc;
- const void * m_pMyThreadStack = nullptr;
- Handler m_fnRun;
- #if USE_GPROF
- pthread_mutex_t m_dlock;
- pthread_cond_t m_dwait;
- itimerval m_ditimer;
- #endif
- #if SPH_ALLOCS_PROFILER
- void * m_pTLS = nullptr;
- #endif
- // main thread execution func
- void Run ( const void * pStack );
- // prepare everything to make *this most robust
- void Prepare ( const void * pStack );
- // save name stored in desc as OS thread name
- void PropagateName ();
- };
- namespace {
- RuntimeThreadContext_t tStubForMain;
- thread_local RuntimeThreadContext_t* g_pLocalThread = &tStubForMain;
- }
- // to be used globally from thread env
- RuntimeThreadContext_t& MyThreadContext()
- {
- return *g_pLocalThread;
- }
- LowThreadDesc_t& Threads::MyThd () noexcept
- {
- return g_pLocalThread->m_tDesc;
- }
- void Threads::SetSysThreadName ()
- {
- g_pLocalThread->PropagateName ();
- }
- void Threads::JobStarted ()
- {
- auto& tDesc = Threads::MyThd ();
- tDesc.m_tmLastJobDoneTimeUS = -1;
- tDesc.m_tmLastJobStartTimeUS = sphMicroTimer ();
- tDesc.m_tmLastJobStartCPUTimeUS = sphThreadCpuTimer ();
- }
- void Threads::JobFinished ( bool bIsDone )
- {
- auto & tDesc = Threads::MyThd ();
- tDesc.m_tmLastJobDoneTimeUS = sphMicroTimer ();
- if ( bIsDone )
- ++tDesc.m_iTotalJobsDone;
- tDesc.m_tmTotalWorkedTimeUS += tDesc.m_tmLastJobDoneTimeUS-tDesc.m_tmLastJobStartTimeUS;
- tDesc.m_tmTotalWorkedCPUTimeUS += sphThreadCpuTimer()-tDesc.m_tmLastJobStartCPUTimeUS;
- }
- const void * Threads::TopOfStack ()
- {
- return MyThreadContext().m_pMyThreadStack;
- }
- void Threads::SetTopStack ( const void * pNewStack )
- {
- MyThreadContext ().m_pMyThreadStack = pNewStack;
- }
- namespace {
- int& MaxCoroStackSize()
- {
- static int iMaxCoroStackSize = 1024 * 1024;
- return iMaxCoroStackSize;
- }
- }
- void Threads::SetMaxCoroStackSize ( int iStackSize )
- {
- MaxCoroStackSize() = iStackSize;
- }
- int Threads::GetMaxCoroStackSize()
- {
- return MaxCoroStackSize();
- }
- void Threads::PrepareMainThread ( const void * PStack )
- {
- MyThreadContext ().Prepare ( PStack );
- }
- void RuntimeThreadContext_t::PropagateName ()
- {
- // set name of self
- #if HAVE_PTHREAD_SETNAME_NP
- if ( !m_tDesc.m_sThreadName.IsEmpty() )
- {
- auto sSafeName = m_tDesc.m_sThreadName.SubString ( 0, 15 );
- assert ( sSafeName.cstr ()!=nullptr );
- #if HAVE_PTHREAD_SETNAME_NP_1ARG
- pthread_setname_np ( sSafeName.cstr() );
- #else
- pthread_setname_np ( m_tDesc.m_tThread, sSafeName.cstr() );
- #endif
- }
- #endif
- }
- void RuntimeThreadContext_t::Prepare ( const void * pStack )
- {
- m_pMyThreadStack = pStack;
- m_tDesc.m_iThreadID = GetOsThreadId ();
- m_tDesc.m_tmStart = sphMicroTimer();
- m_tDesc.m_pTaskInfo.store ( nullptr, std::memory_order_release );
- m_tDesc.m_pHazards.store ( nullptr, std::memory_order_release );
- m_tDesc.m_tThread = Threads::Self ();
- #if USE_GPROF
- // Set the profile timer value
- setitimer ( ITIMER_PROF, &m_ditimer, NULL );
- // Tell the calling thread that we don't need its data anymore
- pthread_mutex_lock ( &m_dlock );
- pthread_cond_signal ( &m_dwait );
- pthread_mutex_unlock ( &m_dlock );
- #endif
- PropagateName ();
- }
- void RuntimeThreadContext_t::Run ( const void * pStack )
- {
- g_pLocalThread = this;
- Prepare ( pStack );
- #if SPH_ALLOCS_PROFILER
- m_pTLS = sphMemStatThdInit();
- #endif
- g_iRunningThreads.fetch_add ( 1, std::memory_order_acq_rel );
- LOG( DEBUG, MT ) << "thread created";
- m_fnRun();
- LOG( DEBUG, MT ) << "thread ended";
- g_iRunningThreads.fetch_sub ( 1, std::memory_order_acq_rel );
- #if SPH_ALLOCS_PROFILER
- sphMemStatThdCleanup ( m_pTLS );
- #endif
- }
- #if _WIN32
- DWORD __stdcall ThreadProcWrapper_fn ( void * pArg )
- #else
- void * ThreadProcWrapper_fn ( void * pArg )
- #endif
- {
- // This is the first local variable in the new thread. So, its address is the top of the stack.
- // We need to know thread stack size for both expression and query evaluating engines.
- // We store expressions as a linked tree of structs and execution is a calls of mutually
- // recursive methods. Before executing we compute tree height and multiply it by a constant
- // with experimentally measured value to check whether we have enough stack to execute current query.
- // The check is not ideal and do not work for all compilers and compiler settings.
- char cTopOfMyStack;
- std::unique_ptr<RuntimeThreadContext_t> pCtx { (RuntimeThreadContext_t *) pArg };
- pCtx->Run ( &cTopOfMyStack );
- return 0;
- }
- bool Threads::Create ( SphThread_t * pThread, Handler fnRun, bool bDetached, const char * sName, int iNum )
- {
- // we can not put this on current stack because wrapper need to see
- // it all the time and it will destroy this data from heap by itself
- auto pCtx = std::make_unique<RuntimeThreadContext_t>();
- pCtx->m_fnRun = std::move ( fnRun );
- if ( sName )
- {
- if ( iNum<0 )
- pCtx->m_tDesc.m_sThreadName = sName;
- else
- pCtx->m_tDesc.m_sThreadName.SetSprintf ( "%s_%d", sName, iNum );
- }
- // create thread
- #if _WIN32
- Threads::Init ( bDetached );
- *pThread = CreateThread ( NULL, STACK_SIZE, ThreadProcWrapper_fn, pCtx.get(), 0, NULL );
- if ( *pThread )
- {
- pCtx.release();
- return true;
- }
- #else
- #if USE_GPROF
- getitimer ( ITIMER_PROF, &pCtx->m_ditimer );
- pthread_cond_init ( &pCtx->m_dwait, NULL );
- pthread_mutex_init ( &pCtx->m_dlock, NULL );
- pthread_mutex_lock ( &pCtx->m_dlock );
- #endif
- void * pAttr = Threads::Init ( bDetached );
- errno = pthread_create ( pThread, (pthread_attr_t*) pAttr, ThreadProcWrapper_fn, pCtx.get() );
- #if USE_GPROF
- if ( !errno )
- pthread_cond_wait ( &pCtx->m_dwait, &pCtx->m_dlock );
- pthread_mutex_unlock ( &pCtx->m_dlock );
- pthread_mutex_destroy ( &pCtx->m_dlock );
- pthread_cond_destroy ( &pCtx->m_dwait );
- #endif
- if ( !errno )
- {
- pCtx.release();
- return true;
- }
- #endif // _WIN32
- // thread creation failed so we need to cleanup ourselves
- return false;
- }
- // Thread with crash query
- namespace { // static func
- thread_local CrashQuery_t* pTlsCrashQuery = nullptr;
- CrashQuery_t** g_ppTlsCrashQuery ()
- {
- return &pTlsCrashQuery;
- }
- void GlobalSetTopQueryTLS ( CrashQuery_t * pQuery )
- {
- *g_ppTlsCrashQuery() = pQuery;
- }
- void GlobalCrashQuerySet ( const CrashQuery_t & tQuery )
- {
- CrashQuery_t * pQuery = *g_ppTlsCrashQuery();
- assert ( pQuery );
- *pQuery = tQuery;
- }
- }
- static CrashQuery_t g_tUnhandled;
- CrashQuery_t & GlobalCrashQueryGetRef ()
- {
- CrashQuery_t * pQuery = *g_ppTlsCrashQuery ();
- // in case TLS not set \ found handler still should process crash
- if ( pQuery )
- return *pQuery;
- sphWarning ("GlobalCrashQueryGetRef: thread-local info is not set! Use ad-hoc");
- return g_tUnhandled;
- }
- CrashQueryKeeper_c::CrashQueryKeeper_c ()
- : m_tReference ( GlobalCrashQueryGetRef() )
- {}
- CrashQueryKeeper_c::~CrashQueryKeeper_c ()
- {
- RestoreCrashQuery();
- }
- void CrashQueryKeeper_c::RestoreCrashQuery () const
- {
- GlobalCrashQuerySet ( m_tReference );
- }
- namespace
- {
- constexpr char dWeekdays[7][4] = { "Sun", "Mon", "Tue", "Wed", "Thu", "Fri", "Sat" };
- constexpr char dMonths[12][4] = { "Jan", "Feb", "Mar", "Apr", "May", "Jun", "Jul", "Aug", "Sep", "Oct", "Nov", "Dec" };
- }
- int sphFormatTime ( int64_t iNow, char * sTimeBuf, int iBufLen )
- {
- time_t ts = (time_t)( iNow / 1000000 ); // on some systems (eg. FreeBSD 6.2), tv.tv_sec has another type and we can't just pass it
- cctz::civil_second tCS = ConvertTimeLocal(ts);
- return snprintf ( sTimeBuf, iBufLen, "%.3s %.3s%3d %.2d:%.2d:%.2d.%.3d %d", dWeekdays[GetWeekDay ( tCS, true )-1], dMonths[tCS.month()-1], tCS.day(), tCS.hour(), tCS.minute(), tCS.second(), (int)( ( iNow % 1000000 ) / 1000 ), (int)tCS.year() );
- }
- void sphFormatTime ( int64_t iNow, StringBuilder_c & sOut )
- {
- time_t ts = (time_t)( iNow / 1000000 ); // on some systems (eg. FreeBSD 6.2), tv.tv_sec has another type, and we can't just pass it
- cctz::civil_second tCS = ConvertTimeLocal(ts);
- sOut << dWeekdays[GetWeekDay ( tCS, true )-1]
- << ' ' << dMonths[tCS.month()-1]
- << ' ' << Digits<2>(tCS.day())
- << ' ' << Digits<2>(tCS.hour()) << ':' << Digits<2>(tCS.minute()) << ':' << Digits<2>(tCS.second()) << '.' << FixedNum<10,3,0,'0'>( ( iNow % 1000000 ) / 1000 )
- << ' ' << tCS.year();
- }
- /// format current timestamp (for logging, or whatever)
- int sphFormatCurrentTime ( char* sTimeBuf, int iBufLen )
- {
- return sphFormatTime ( sphMicroTimer (), sTimeBuf, iBufLen );
- }
- void sphFormatCurrentTime ( StringBuilder_c& sOut )
- {
- return sphFormatTime ( sphMicroTimer (), sOut );
- }
- CSphString sphCurrentUtcTime()
- {
- int64_t iNow = sphMicroTimer();
- time_t ts = (time_t)( iNow / 1000000 ); // on some systems (eg. FreeBSD 6.2), tv.tv_sec has another type and we can't just pass it
- cctz::civil_second tCS = ConvertTimeUTC(ts);
- StringBuilder_c tOut;
- tOut << tCS.year()
- << '-' << Digits<2>(tCS.month())
- << '-' << Digits<2>(tCS.day())
- << 'T' << Digits<2>(tCS.hour()) << ':' << Digits<2>(tCS.minute()) << ':' << Digits<2>(tCS.second())
- << '.' << FixedNum<10, 3, 0, '0'> ( ( iNow % 1000000 ) / 1000 );
- // tOut.Sprintf ( "%.4d-%.2d-%.2dT%.2d:%.2d:%.2d.%.3d", // YYYY-MM-DDThh:mm:ss[.SSS]
- // 1900 + tmp.tm_year,
- // tmp.tm_mon + 1,
- // tmp.tm_mday,
- // tmp.tm_hour,
- // tmp.tm_min,
- // tmp.tm_sec,
- // (int)( ( iNow % 1000000 ) / 1000 ) );
- CSphString sRes;
- tOut.MoveTo ( sRes );
- return sRes;
- }
- // create thread for query - it will have set CrashQuery to valid obj inside, alive during whole thread's live time.
- bool Threads::CreateQ ( SphThread_t * pThread, Handler fnRun, bool bDetached, const char * sName, int iNum )
- {
- return Create ( pThread, [fnCrashRun = std::move ( fnRun )]
- {
- CrashQuery_t tQueryTLS;
- GlobalSetTopQueryTLS ( &tQueryTLS );
- LOG( DEBUG, MT ) << "thread created";
- fnCrashRun();
- LOG( DEBUG, MT ) << "thread ended";
- }, bDetached, sName, iNum );
- }
- // capture crash query and set it before running fnHandler.
- Threads::Handler Threads::WithCopiedCrashQuery ( Threads::Handler fnHandler )
- {
- CrashQuery_t tParentCrashQuery = GlobalCrashQueryGetRef ();
- return [tCrashQuery = tParentCrashQuery, fnHandler = std::move ( fnHandler )] {
- // CrashQueryKeeper_c _; // restore previous crash query on exit. Seems, that is not necessary
- GlobalCrashQuerySet ( tCrashQuery );
- fnHandler ();
- };
- }
|