mini_timer.cpp 9.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336
  1. //
  2. // Copyright (c) 2021-2026, Manticore Software LTD (https://manticoresearch.com)
  3. // All rights reserved
  4. //
  5. // This program is free software; you can redistribute it and/or modify
  6. // it under the terms of the GNU General Public License. You should have
  7. // received a copy of the GPL license along with this program; if you
  8. // did not, you can find it at http://www.gnu.org/
  9. //
  10. #include "mini_timer.h"
  11. #include "threadutils.h"
  12. #include "timeout_queue.h"
  13. #include <atomic>
  14. #ifndef VERBOSE_TIMER
  15. #define VERBOSE_TIMER 0
  16. #endif
  17. #if VERBOSE_TIMER
  18. #define LOG_LEVEL_TIMER true
  19. #else
  20. #define LOG_LEVEL_TIMER false
  21. #endif
  22. namespace Time
  23. {
  24. CSphString Stamp()
  25. {
  26. return StringBuilder_c().Sprintf( "[%t] ", TimePrefixed::TimeStamp() ).cstr();
  27. }
  28. }
  29. #define LOG_COMPONENT_TSKX "X " << Time::Stamp()
  30. #define LOG_COMPONENT_TSKT "T " << Time::Stamp()
  31. #define INFOX LOGMSG ( INFO, TIMER, TSKX )
  32. #define DEBUGT LOGMSG ( DEBUG, TIMER, TSKT )
  33. #define DEBUGX LOGMSG ( DEBUG, TIMER, TSKX )
  34. using namespace Threads;
  35. namespace {
  36. std::atomic g_tmLastTimestamp { sphMicroTimer() };
  37. int64_t MicroTimerImpl() noexcept
  38. {
  39. int64_t tmTimestamp = sphMicroTimer();
  40. g_tmLastTimestamp.store ( tmTimestamp, std::memory_order_relaxed );
  41. return tmTimestamp;
  42. }
  43. [[nodiscard]] int64_t LastTimestampImpl() noexcept
  44. {
  45. return g_tmLastTimestamp.load ( std::memory_order_relaxed );
  46. }
  47. /// timer thread context
  48. ThreadRole TimerThread;
  49. std::atomic g_bTimerCreated { false };
  50. std::atomic g_bTimerActive { false };
  51. } // namespace
  52. [[nodiscard]] int64_t sph::MicroTimer() noexcept
  53. {
  54. return MicroTimerImpl();
  55. }
  56. [[nodiscard]] int64_t sph::LastTimestamp() noexcept
  57. {
  58. return LastTimestampImpl();
  59. }
  60. class TinyTimer_c
  61. {
  62. // the queue
  63. mutable CSphMutex m_tTimeoutsGuard; // guard is need as we can add/remove elements from any thread. That is short-live.
  64. TimeoutQueue_c m_dTimeouts GUARDED_BY ( m_tTimeoutsGuard );
  65. // management
  66. OneshotEvent_c m_tSignal;
  67. std::atomic<bool> m_bInterrupted { true };
  68. // thread
  69. SphThread_t m_tCounterThread;
  70. std::atomic<LowThreadDesc_t*> m_pCounterThread { nullptr };
  71. private:
  72. [[nodiscard]] bool IsInterrupted() const noexcept
  73. {
  74. return m_bInterrupted.load(std::memory_order_relaxed) || sphInterrupted(); // aliased, as we can override it in tests while mocking
  75. }
  76. void Enqueue ( MiniTimer_c& tTask ) noexcept EXCLUDES ( m_tTimeoutsGuard )
  77. {
  78. DEBUGT << "enqueue " << &tTask;
  79. {
  80. ScopedMutex_t tTimeoutsLock { m_tTimeoutsGuard };
  81. m_dTimeouts.Change ( &tTask );
  82. }
  83. Kick();
  84. }
  85. [[nodiscard]] int GetNextWaitPeriodMs() const noexcept REQUIRES ( TimerThread ) EXCLUDES ( m_tTimeoutsGuard )
  86. {
  87. ScopedMutex_t tTimeoutsLock { m_tTimeoutsGuard };
  88. if ( m_dTimeouts.IsEmpty() )
  89. return -1;
  90. auto* pTask = (MiniTimer_c*)m_dTimeouts.Root();
  91. return (int)( ( pTask->m_iTimeoutTimeUS - MicroTimerImpl() ) / sph::TICKS_GRANULARITY );
  92. }
  93. [[nodiscard]] std::pair<MiniTimer_c*,Handler> PopNextDeadlinedAction() noexcept REQUIRES ( TimerThread ) EXCLUDES ( m_tTimeoutsGuard )
  94. {
  95. ScopedMutex_t tTimeoutsLock { m_tTimeoutsGuard };
  96. if ( m_dTimeouts.IsEmpty() )
  97. return { nullptr, nullptr };
  98. auto pRoot = (MiniTimer_c*)m_dTimeouts.Root();
  99. assert ( pRoot->m_iTimeoutTimeUS > 0 );
  100. if ( !sph::TimeExceeded ( pRoot->m_iTimeoutTimeUS, MicroTimerImpl() ) )
  101. return { nullptr, nullptr };
  102. // timeout reached; have to do an action
  103. DEBUGT << "timeout happens for " << pRoot << " deadline " << timestamp_t ( pRoot->m_iTimeoutTimeUS );
  104. DEBUGT << m_dTimeouts.DebugDump ( "heap:" );
  105. m_dTimeouts.Pop();
  106. DEBUGT << "Oneshot task removed: " << pRoot;
  107. return { pRoot, pRoot->m_fnOnTimer };
  108. }
  109. void ProcessTimerActions() noexcept REQUIRES ( TimerThread ) EXCLUDES ( m_tTimeoutsGuard )
  110. {
  111. for ( auto [pRoot, fnTimer] = PopNextDeadlinedAction(); pRoot; std::tie(pRoot, fnTimer) = PopNextDeadlinedAction() )
  112. if ( fnTimer )
  113. fnTimer();
  114. }
  115. void Loop() noexcept
  116. {
  117. ScopedRole_c thSched ( TimerThread );
  118. m_pCounterThread.store ( &Threads::MyThd(), std::memory_order_relaxed );
  119. g_bTimerActive = true;
  120. while ( !IsInterrupted () )
  121. {
  122. DEBUGT << "---------------------------- Loop() tick";
  123. ProcessTimerActions();
  124. int iWait = GetNextWaitPeriodMs();
  125. if ( !iWait )
  126. {
  127. DEBUGT << "no sleep since timeout is 0; (" << timestamp_t ( iWait ) << ")";
  128. continue;
  129. }
  130. DEBUGT << "calculated timeout is " << iWait << " ms (" << timestamp_t ( iWait ) << ")";
  131. bool VARIABLE_IS_NOT_USED bWasKicked = m_tSignal.WaitEvent ( iWait );
  132. DEBUGT << "awakened, reason=" << ( bWasKicked ? "kicked" : "timeout or error" );
  133. }
  134. g_bTimerActive = false;
  135. AbortScheduled();
  136. m_pCounterThread.store ( nullptr, std::memory_order_relaxed );
  137. }
  138. [[nodiscard]] std::pair<MiniTimer_c*,Handler> PopNextAction() noexcept REQUIRES ( TimerThread ) EXCLUDES ( m_tTimeoutsGuard )
  139. {
  140. ScopedMutex_t tTimeoutsLock { m_tTimeoutsGuard };
  141. if ( m_dTimeouts.IsEmpty() )
  142. return { nullptr, nullptr };
  143. auto pRoot = (MiniTimer_c*)m_dTimeouts.Root();
  144. m_dTimeouts.Pop();
  145. return { pRoot, pRoot->m_fnOnTimer };
  146. }
  147. /// abandon and release all events (on shutdown)
  148. void AbortScheduled() noexcept REQUIRES ( TimerThread ) EXCLUDES ( m_tTimeoutsGuard )
  149. {
  150. DEBUGT << "AbortScheduled()";
  151. assert ( IsInterrupted() );
  152. for ( auto [pRoot, fnTimer] = PopNextAction(); pRoot; std::tie(pRoot, fnTimer) = PopNextAction() )
  153. if ( fnTimer )
  154. fnTimer();
  155. }
  156. public:
  157. TinyTimer_c()
  158. {
  159. MicroTimerImpl();
  160. m_bInterrupted.store ( false, std::memory_order_release );
  161. g_bTimerCreated = true;
  162. RegisterIterator ( [this] ( const ThreadFN& fnHandler ) {
  163. fnHandler ( m_pCounterThread.load ( std::memory_order_relaxed ) );
  164. } );
  165. Create ( &m_tCounterThread, [this] { Loop (); }, false, "Timer" );
  166. }
  167. ~TinyTimer_c()
  168. {
  169. DEBUGX << "~TinyTimer_c. Shutdown=" << IsInterrupted();
  170. Stop();
  171. }
  172. void Stop() noexcept
  173. {
  174. m_bInterrupted.store ( true, std::memory_order_release );
  175. if ( !g_bTimerActive )
  176. return;
  177. Kick();
  178. Join ( &m_tCounterThread );
  179. }
  180. /// Kick the tasker
  181. void Kick() noexcept
  182. {
  183. DEBUGX << "Timer kicked";
  184. m_tSignal.SetEvent();
  185. }
  186. void EngageAt ( int64_t iTimeStampUS, MiniTimer_c& tTimer ) noexcept EXCLUDES ( m_tTimeoutsGuard )
  187. {
  188. if ( tTimer.m_iTimeoutIdx >= 0 && tTimer.m_iTimeoutTimeUS == iTimeStampUS )
  189. return; // no need to re-engage if no changes
  190. tTimer.m_iTimeoutTimeUS = iTimeStampUS;
  191. DEBUGT << "Engage task: " << &tTimer << " after " << timestamp_t ( iTimeStampUS );
  192. Enqueue ( tTimer );
  193. }
  194. int64_t Engage ( int64_t iTimePeriodUS, MiniTimer_c& tTimer ) noexcept EXCLUDES ( m_tTimeoutsGuard )
  195. {
  196. if ( iTimePeriodUS < 0 || IsInterrupted() )
  197. return -1;
  198. EngageAt ( MicroTimerImpl() + iTimePeriodUS, tTimer );
  199. return tTimer.m_iTimeoutTimeUS;
  200. }
  201. void Remove ( MiniTimer_c& tTimer ) noexcept EXCLUDES ( m_tTimeoutsGuard )
  202. {
  203. ScopedMutex_t tTimeoutsLock { m_tTimeoutsGuard };
  204. DEBUGT << ((tTimer.m_iTimeoutIdx >= 0) ? "Removed from queue: " : "Not in queue: ") << &tTimer << " deadline " << timestamp_t ( tTimer.m_iTimeoutTimeUS );
  205. m_dTimeouts.Remove ( &tTimer );
  206. }
  207. // statistics
  208. void FillSchedInfo( CSphVector<sph::ScheduleInfo_t>& dRes) const noexcept EXCLUDES ( m_tTimeoutsGuard )
  209. {
  210. ScopedMutex_t tTimeoutsLock { m_tTimeoutsGuard };
  211. m_dTimeouts.DebugDump ( [&dRes] ( EnqueuedTimeout_t* pMember ) {
  212. auto& dInfo = dRes.Add();
  213. auto* pScheduled = (MiniTimer_c*)pMember;
  214. dInfo.m_iTimeoutStamp = pScheduled->m_iTimeoutTimeUS;
  215. dInfo.m_sTask = pScheduled->m_szName;
  216. } );
  217. }
  218. };
  219. TinyTimer_c& g_TinyTimer()
  220. {
  221. static TinyTimer_c tTimer;
  222. return tTimer;
  223. }
  224. void MiniTimer_c::EngageAt ( int64_t iTimeStampUS ) noexcept
  225. {
  226. DEBUGT << "MiniTimer_c::EngageAt " << timestamp_t ( iTimeStampUS );
  227. g_TinyTimer().EngageAt ( iTimeStampUS, *this );
  228. }
  229. void MiniTimer_c::EngageAt ( int64_t iTimeStampUS, Handler&& fnOnTimer ) noexcept
  230. {
  231. DEBUGT << "MiniTimer_c::EngageAt " << timestamp_t ( iTimeStampUS );
  232. SetHandler ( std::move ( fnOnTimer ) );
  233. g_TinyTimer().EngageAt ( iTimeStampUS, *this );
  234. }
  235. int64_t MiniTimer_c::Engage ( int64_t iTimePeriodMS ) noexcept
  236. {
  237. auto iTimePeriodUS = iTimePeriodMS * 1000;
  238. if ( iTimePeriodUS <= 0 )
  239. return 0;
  240. DEBUGT << "MiniTimer_c::Engage " << timespan_t ( iTimePeriodUS );
  241. return g_TinyTimer().Engage ( iTimePeriodUS, *this );
  242. }
  243. int64_t MiniTimer_c::Engage ( int64_t iTimePeriodMS, Handler&& fnOnTimer ) noexcept
  244. {
  245. auto iTimePeriodUS = iTimePeriodMS * 1000;
  246. if ( iTimePeriodUS <= 0 )
  247. return 0;
  248. DEBUGT << "MiniTimer_c::Engage " << timespan_t ( iTimePeriodUS );
  249. SetHandler ( std::move ( fnOnTimer ) );
  250. return g_TinyTimer().Engage ( iTimePeriodUS, *this );
  251. }
  252. void MiniTimer_c::SetHandler ( Handler&& fnOnTimer ) noexcept
  253. {
  254. assert ( !m_fnOnTimer );
  255. m_fnOnTimer = std::move ( fnOnTimer );
  256. }
  257. void MiniTimer_c::UnEngage() noexcept
  258. {
  259. if ( g_bTimerCreated.load(std::memory_order_relaxed) )
  260. g_TinyTimer().Remove ( *this );
  261. }
  262. MiniTimer_c::~MiniTimer_c()
  263. {
  264. UnEngage();
  265. }
  266. /// returns true if provided timestamp is already reached or not
  267. [[nodiscard]] bool sph::TimeExceeded ( int64_t tmMicroTimestamp ) noexcept
  268. {
  269. if ( tmMicroTimestamp <= 0 )
  270. return false;
  271. return sph::TimeExceeded ( tmMicroTimestamp, LastTimestampImpl() );
  272. }
  273. void sph::ShutdownMiniTimer()
  274. {
  275. if ( g_bTimerActive )
  276. g_TinyTimer().Stop();
  277. }
  278. // statistics
  279. CSphVector<sph::ScheduleInfo_t> sph::GetSchedInfo()
  280. {
  281. CSphVector<sph::ScheduleInfo_t> dRes;
  282. if ( g_bTimerCreated )
  283. g_TinyTimer().FillSchedInfo ( dRes );
  284. return dRes;
  285. }