task_info.cpp 8.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296
  1. //
  2. // Copyright (c) 2021-2022, Manticore Software LTD (https://manticoresearch.com)
  3. // All rights reserved
  4. //
  5. // This program is free software; you can redistribute it and/or modify
  6. // it under the terms of the GNU General Public License. You should have
  7. // received a copy of the GPL license along with this program; if you
  8. // did not, you can find it at http://www.gnu.org/
  9. //
  10. #include "task_info.h"
  11. #include "threadutils.h"
  12. namespace { // static
  13. const size_t NINFOS = 256;
  14. RenderFnPtr pInfos[NINFOS] = { nullptr };
  15. std::atomic<int> dCounters[NINFOS];
  16. std::atomic<BYTE> uFreeInfoSlot {1}; // 0-th slot is a mark of 'invalid'
  17. }
  18. BYTE RegisterRenderer ( RenderFnPtr pFunc ) noexcept
  19. {
  20. BYTE uRender = uFreeInfoSlot.fetch_add ( 1, std::memory_order_relaxed );
  21. pInfos[uRender] = pFunc;
  22. dCounters[uRender].store ( 0 );
  23. return uRender;
  24. }
  25. void RefCount_t::Inc ( BYTE eType )
  26. {
  27. if ( eType >= uFreeInfoSlot )
  28. sphWarning ( "Wrong RefCountInc slot! type=%d, free slot = %d", eType, uFreeInfoSlot.load() );
  29. assert ( eType<uFreeInfoSlot );
  30. if ( eType )
  31. dCounters[eType].fetch_add ( 1, std::memory_order_relaxed );
  32. }
  33. void RefCount_t::Dec ( BYTE eType )
  34. {
  35. if ( eType>=uFreeInfoSlot )
  36. sphWarning ( "Wrong RefCountDec slot! type=%d, free slot = %d", eType, uFreeInfoSlot.load () );
  37. assert ( eType<uFreeInfoSlot );
  38. if ( eType )
  39. dCounters[eType].fetch_sub ( 1, std::memory_order_relaxed );
  40. }
  41. int myinfo::Count ( BYTE eType )
  42. {
  43. assert ( eType<uFreeInfoSlot );
  44. return dCounters[eType].load ( std::memory_order_relaxed );
  45. }
  46. int myinfo::CountAll ()
  47. {
  48. int iRes = 0;
  49. for ( int i = 1, iLast = uFreeInfoSlot.load ( std::memory_order_relaxed ); i<iLast; ++i )
  50. iRes += dCounters[i].load ( std::memory_order_relaxed );
  51. return iRes;
  52. }
  53. void PublicThreadDesc_t::Swap ( PublicThreadDesc_t & rhs )
  54. {
  55. ::Swap ( m_iThreadID, rhs.m_iThreadID );
  56. ::Swap ( m_tmStart, rhs.m_tmStart );
  57. ::Swap ( m_tmLastJobStartTimeUS, rhs.m_tmLastJobStartTimeUS );
  58. ::Swap ( m_tmLastJobDoneTimeUS, rhs.m_tmLastJobDoneTimeUS );
  59. ::Swap ( m_tmTotalWorkedTimeUS, rhs.m_tmTotalWorkedTimeUS );
  60. ::Swap ( m_tmTotalWorkedCPUTimeUS, rhs.m_tmTotalWorkedCPUTimeUS );
  61. ::Swap ( m_iTotalJobsDone, rhs.m_iTotalJobsDone );
  62. ::Swap ( m_sThreadName, rhs.m_sThreadName );
  63. ::Swap ( m_sClientName, rhs.m_sClientName );
  64. ::Swap ( m_sDescription, rhs.m_sDescription );
  65. ::Swap ( m_sProto, rhs.m_sProto );
  66. ::Swap ( m_tmConnect, rhs.m_tmConnect );
  67. ::Swap ( m_pQuery, rhs.m_pQuery );
  68. ::Swap ( m_sCommand, rhs.m_sCommand );
  69. ::Swap ( m_iConnID, rhs.m_iConnID );
  70. ::Swap ( m_eProto, rhs.m_eProto );
  71. ::Swap ( m_eTaskState, rhs.m_eTaskState );
  72. ::Swap ( m_sChain, rhs.m_sChain );
  73. }
  74. void CopyBasicThreadInfo ( const Threads::LowThreadDesc_t * pSrc, PublicThreadDesc_t & dDst )
  75. {
  76. dDst.m_iThreadID = pSrc->m_iThreadID;
  77. dDst.m_tmStart.emplace_once ( pSrc->m_tmStart );
  78. dDst.m_tmLastJobStartTimeUS = pSrc->m_tmLastJobStartTimeUS;
  79. dDst.m_tmLastJobDoneTimeUS = pSrc->m_tmLastJobDoneTimeUS;
  80. dDst.m_tmTotalWorkedTimeUS = pSrc->m_tmTotalWorkedTimeUS;
  81. dDst.m_tmTotalWorkedCPUTimeUS = pSrc->m_tmTotalWorkedCPUTimeUS;
  82. dDst.m_iTotalJobsDone = pSrc->m_iTotalJobsDone;
  83. dDst.m_sThreadName = pSrc->m_sThreadName;
  84. }
  85. void RenderPublicTaskInfo ( const void * pSrc, PublicThreadDesc_t & dDst, BYTE eType )
  86. {
  87. if ( pInfos[eType] )
  88. pInfos[eType] ( pSrc, dDst );
  89. }
  90. PublicThreadDesc_t GatherPublicTaskInfo ( const Threads::LowThreadDesc_t * pSrc, int iCols )
  91. {
  92. PublicThreadDesc_t dDst;
  93. if (!pSrc)
  94. return dDst;
  95. dDst.m_iDescriptionLimit = iCols; // works as call-back
  96. hazard::Guard_c tGuard;
  97. auto pSrcInfo = (TaskInfo_t *) tGuard.Protect ( pSrc->m_pTaskInfo );
  98. while ( pSrcInfo )
  99. {
  100. RenderPublicTaskInfo ( pSrcInfo, dDst, pSrcInfo->m_eType );
  101. pSrcInfo = (TaskInfo_t *) tGuard.Protect ( pSrcInfo->m_pPrev );
  102. }
  103. tGuard.Release();
  104. CopyBasicThreadInfo ( pSrc, dDst );
  105. return dDst;
  106. }
  107. TaskInfo_t* myinfo::HazardTaskInfo()
  108. {
  109. return (TaskInfo_t*)Threads::MyThd().m_pTaskInfo.load ( std::memory_order_acquire );
  110. }
  111. TaskInfo_t* myinfo::GetHazardTypedNode ( BYTE eType )
  112. {
  113. return HazardGetNode ( [eType] ( TaskInfo_t* pNode ) { return pNode->m_eType == eType; } );
  114. }
  115. // bind current taskinfo content to handler
  116. Threads::Handler myinfo::StickParent ( Threads::Handler fnHandler )
  117. {
  118. auto pParent = myinfo::HazardTaskInfo();
  119. return [pParent, fnHandler = std::move ( fnHandler )] {
  120. Threads::MyThd().m_pTaskInfo.store ( pParent, std::memory_order_release );
  121. fnHandler();
  122. };
  123. }
  124. // bind current taskinfo and add new scoped mini info for coro handler
  125. Threads::Handler myinfo::OwnMini ( Threads::Handler fnHandler )
  126. {
  127. auto pParent = myinfo::HazardTaskInfo();
  128. return [pParent, fnHandler = std::move ( fnHandler )] {
  129. Threads::MyThd().m_pTaskInfo.store ( pParent, std::memory_order_release );
  130. ScopedMiniInfo_t _ ( new MiniTaskInfo_t );
  131. fnHandler();
  132. };
  133. }
  134. Threads::Handler myinfo::OwnMiniNoCount ( Threads::Handler fnHandler )
  135. {
  136. auto pParent = myinfo::HazardTaskInfo();
  137. return [pParent, fnHandler = std::move ( fnHandler )] {
  138. Threads::MyThd().m_pTaskInfo.store ( pParent, std::memory_order_release );
  139. ScopedMiniInfoNoCount_t _ ( new MiniTaskInfo_t );
  140. fnHandler();
  141. };
  142. }
  143. // generic is empty
  144. DEFINE_RENDER ( TaskInfo_t ) {};
  145. void MiniTaskInfo_t::RenderWithoutChain ( PublicThreadDesc_t& dDst )
  146. {
  147. dDst.m_tmStart.emplace_once ( m_tmStart );
  148. dDst.m_sCommand = m_sCommand;
  149. hazard::Guard_c tGuard;
  150. auto pDescription = tGuard.Protect ( m_pHazardDescription );
  151. if ( pDescription )
  152. {
  153. if ( dDst.m_iDescriptionLimit < 0 ) // no limit
  154. dDst.m_sDescription << *pDescription;
  155. else
  156. dDst.m_sDescription.AppendChunk ( { pDescription->scstr(), Min ( m_iDescriptionLen, dDst.m_iDescriptionLimit ) } );
  157. }
  158. }
  159. DEFINE_RENDER ( MiniTaskInfo_t )
  160. {
  161. dDst.m_sChain << "Mini ";
  162. auto& tInfo = *(MiniTaskInfo_t*)pSrc;
  163. tInfo.RenderWithoutChain ( dDst );
  164. }
  165. void SetMiniDescription ( MiniTaskInfo_t * pNode, CSphString * pString, int iLen )
  166. {
  167. assert ( pNode );
  168. assert ( pString );
  169. if ( pNode->m_iDescriptionLen>myinfo::HazardDescriptionSizeLimit )
  170. pNode->m_pHazardDescription.RetireNow ( pString );
  171. else
  172. pNode->m_pHazardDescription = pString;
  173. pNode->m_iDescriptionLen = iLen;
  174. pNode->m_tmStart = sphMicroTimer();
  175. }
  176. void SetMiniDescription ( MiniTaskInfo_t * pNode, const char * sTemplate, ... )
  177. {
  178. assert ( pNode );
  179. StringBuilder_c sBuf;
  180. va_list ap;
  181. va_start ( ap, sTemplate );
  182. sBuf.vSprintf ( sTemplate, ap );
  183. va_end ( ap );
  184. auto pString = new CSphString;
  185. auto iLen = sBuf.GetLength();
  186. sBuf.MoveTo ( *pString );
  187. SetMiniDescription ( pNode, pString, iLen );
  188. }
  189. void myinfo::SetCommand ( const char * sCommand )
  190. {
  191. auto pNode = HazardGetMini ();
  192. if ( pNode )
  193. pNode->m_sCommand = sCommand;
  194. else
  195. sphWarning ( "internal error: myinfo::SetCommand () invoked with empty tls!" );
  196. }
  197. Str_t myinfo::UnsafeDescription ()
  198. {
  199. auto pNode = HazardGetMini ();
  200. assert (pNode);
  201. if ( pNode )
  202. {
  203. if ( pNode->m_pHazardDescription )
  204. return { pNode->m_pHazardDescription->cstr (), pNode->m_iDescriptionLen };
  205. else
  206. return dEmptyStr;
  207. }
  208. sphWarning ( "internal error: myinfo::Description () invoked with empty tls!" );
  209. return dEmptyStr;
  210. }
  211. void myinfo::SetDescription ( CSphString sString, int iLen )
  212. {
  213. auto pNode = HazardGetMini ();
  214. assert ( pNode );
  215. if ( !pNode )
  216. {
  217. sphWarning ( "internal error: myinfo::SetDescription () invoked with empty tls!" );
  218. return;
  219. }
  220. SetMiniDescription ( pNode, new CSphString ( std::move ( sString ) ), iLen );
  221. }
  222. void myinfo::SetThreadInfo ( const char * sTemplate, ... )
  223. {
  224. auto pNode = HazardGetMini ();
  225. assert ( pNode );
  226. if ( !pNode )
  227. {
  228. sphWarning ( "internal error: myinfo::SetThreadInfo () invoked with empty tls!" );
  229. return;
  230. }
  231. StringBuilder_c sBuf;
  232. va_list ap;
  233. va_start ( ap, sTemplate );
  234. sBuf.vSprintf ( sTemplate, ap );
  235. va_end ( ap );
  236. auto pString = new CSphString;
  237. auto iLen = sBuf.GetLength();
  238. sBuf.MoveTo ( *pString );
  239. SetMiniDescription ( pNode, pString, iLen );
  240. }
  241. MiniTaskInfo_t * MakeSystemInfo ( const char * sDescription )
  242. {
  243. auto pInfo = new MiniTaskInfo_t;
  244. pInfo->m_sCommand = "SYSTEM";
  245. SetMiniDescription( pInfo, "SYSTEM %s", sDescription );
  246. return pInfo;
  247. }
  248. ScopedMiniInfo_t PublishSystemInfo ( const char * sDescription )
  249. {
  250. return ScopedMiniInfo_t ( MakeSystemInfo ( sDescription ) );
  251. }