searchdtask.cpp 4.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115
  1. //
  2. // Copyright (c) 2017-2026, Manticore Software LTD (https://manticoresearch.com)
  3. // Copyright (c) 2001-2016, Andrew Aksyonoff
  4. // Copyright (c) 2008-2016, Sphinx Technologies Inc
  5. // All rights reserved
  6. //
  7. // This program is free software; you can redistribute it and/or modify
  8. // it under the terms of the GNU General Public License. You should have
  9. // received a copy of the GPL license along with this program; if you
  10. // did not, you can find it at http://www.gnu.org/
  11. //
  12. #include "searchdtask.h"
  13. #include "coroutine.h"
  14. #include "mini_timer.h"
  // Compile-time switch for task manager logging: off (0) unless the build
  // defines VERBOSE_TASKMANAGER itself.
  #ifndef VERBOSE_TASKMANAGER
  #define VERBOSE_TASKMANAGER 0
  #endif
  // LOG_LEVEL_TSK mirrors VERBOSE_TASKMANAGER as a boolean literal.
  #if VERBOSE_TASKMANAGER
  #define LOG_LEVEL_TSK true
  #else
  #define LOG_LEVEL_TSK false
  #endif
  // Component tag used by the logging shortcuts below.
  #define LOG_COMPONENT_TSKX "X "
  // Logging shortcuts, one per severity. LOGMSG is defined elsewhere;
  // NOTE(review): presumably it resolves the TSK / TSKX arguments to the
  // LOG_LEVEL_TSK / LOG_COMPONENT_TSKX macros above - confirm.
  #define INFOX LOGMSG ( INFO, TSK, TSKX )
  #define DEBUGX LOGMSG ( DEBUG, TSK, TSKX )
  #define WARNX LOGMSG ( WARNING, TSK, TSKX )
  // Maximum number of task flavours; g_Tasks below is a fixed array of this size.
  // Only 7 different tasks exist for now, so a pool of 32 slots seems to be enough.
  constexpr int NUM_TASKS = 32;
  //////////////////////////////////////////////////////////////////////////
  // Tasks (job classes)
  //////////////////////////////////////////////////////////////////////////
  // Per-task descriptors and counters, indexed by TaskID.
  static TaskManager::TaskInfo_t g_Tasks [ NUM_TASKS ];
  // Number of slots handed out so far; TaskManager::RegisterGlobal() uses the
  // pre-increment value of this counter as the id of the new task.
  static std::atomic<int> g_iTasks {0};
  35. // wrap naked executor into statistic collector
  36. Threads::Handler AttachClass ( TaskID iTask, Threads::Handler&& fnWorker )
  37. {
  38. return [iTask, fnWorker=std::move(fnWorker)] () {
  39. Threads::JobTracker_t dTrack;
  40. auto& tInfo = g_Tasks[iTask];
  41. INFOX << "Task " << tInfo.m_sName << " started";
  42. tInfo.m_iCurrentRunners.fetch_add ( 1, std::memory_order_relaxed );
  43. auto itmStart = sphMicroTimer();
  44. std::atomic_thread_fence ( std::memory_order_acquire );
  45. fnWorker();
  46. std::atomic_thread_fence ( std::memory_order_release );
  47. auto itmEnd = sphMicroTimer();
  48. tInfo.m_iCurrentRunners.fetch_sub ( 1, std::memory_order_relaxed );
  49. tInfo.m_iAllRunners.fetch_sub ( 1, std::memory_order_relaxed );
  50. tInfo.m_iTotalRun.fetch_add ( 1, std::memory_order_relaxed );
  51. tInfo.m_iLastFinished.store ( itmEnd, std::memory_order_relaxed );
  52. tInfo.m_iTotalSpent.fetch_add ( itmEnd - itmStart, std::memory_order_relaxed );
  53. INFOX << "Task " << tInfo.m_sName << " finished";
  54. };
  55. }
  56. void TaskManager::StartJob ( TaskID iTask, Threads::Handler fnJob )
  57. {
  58. assert ( iTask <= g_iTasks.load ( std::memory_order_relaxed ) && iTask >= 0 );
  59. auto& tInfo = g_Tasks[iTask];
  60. auto iAllRunners = tInfo.m_iAllRunners.load ( std::memory_order_relaxed );
  61. if ( sphInterrupted() )
  62. {
  63. INFOX << "Drop job (id=" << iTask << " \"" << tInfo.m_sName << "\"), since interrupted";
  64. tInfo.m_iTotalDropped.fetch_add ( 1, std::memory_order_relaxed );
  65. return;
  66. }
  67. if ( tInfo.m_iMaxRunners > 0 && iAllRunners >= tInfo.m_iMaxRunners )
  68. {
  69. INFOX << "Drop job (id=" << iTask << " \"" << tInfo.m_sName << "\"), since " << iAllRunners << " is running/enqueued";
  70. tInfo.m_iTotalDropped.fetch_add ( 1, std::memory_order_relaxed );
  71. return;
  72. }
  73. INFOX << "StartJob (id=" << iTask << " \"" << tInfo.m_sName << "\")";
  74. tInfo.m_iAllRunners.fetch_add ( 1, std::memory_order_relaxed );
  75. Threads::StartJob ( AttachClass ( iTask, std::move ( fnJob ) ) );
  76. }
  77. TaskID TaskManager::RegisterGlobal ( CSphString sName, int iThreads )
  78. {
  79. auto iTaskID = TaskID ( g_iTasks.fetch_add ( 1, std::memory_order_relaxed ) );
  80. if ( !iTaskID ) // this is first class; start log timering
  81. TimePrefixed::TimeStart();
  82. INFOX << "Task \"" << sName << "\" registered with id=" << iTaskID << ", running max " << iThreads << " jobs a time" << (iThreads?"":" (0=unlimited)");
  83. auto& dInfo = g_Tasks[iTaskID];
  84. dInfo.m_sName = std::move ( sName );
  85. dInfo.m_iMaxRunners = iThreads;
  86. return iTaskID;
  87. }
  88. void TaskManager::ScheduleJob ( TaskID iTask, int64_t iTimeStampUS, Threads::Handler fnJob )
  89. {
  90. INFOX << "ScheduleJob (id=" << iTask << ", \"" << g_Tasks[iTask].m_sName << "\", start " << timestamp_t ( iTimeStampUS ) << ")";
  91. assert ( iTimeStampUS > 0 );
  92. auto pTimer = new MiniTimer_c ( g_Tasks[iTask].m_sName.cstr() );
  93. pTimer->EngageAt ( iTimeStampUS, [pTimer, iTask, fnJob = std::move ( fnJob )]() mutable { TaskManager::StartJob ( iTask, std::move ( fnJob ) ); delete pTimer; } );
  94. }
  95. VecTraits_T<TaskManager::TaskInfo_t> TaskManager::GetTaskInfo ()
  96. {
  97. return { g_Tasks, g_iTasks.load ( std::memory_order_relaxed ) };
  98. }