Browse Source

change workers_active/tasks value

'workers_active' is field in 'show status', the same 'Tasks' is value
in oneline 'status'.

It was overall counter of all created job descriptors, however that is
not so useful, as several jobs may be chained together (for example,
encrypted compressed connect via mysql has 3 descriptors - one for
'mysql', one for 'compressed', one for 'encrypted'. But that is 1 job,
not 3).

Now it summarises only connections and tasks. Tasks are global (like
periodic flush), and also rise on parallel execution (query parallelized
 into 4 threads will add 3 tasks to currently running connection).
alexey 3 years ago
parent
commit
ac9def5ead
5 changed files with 44 additions and 16 deletions
  1. 6 0
      src/client_task_info.h
  2. 2 2
      src/coroutine.cpp
  3. 2 2
      src/searchd.cpp
  4. 12 2
      src/task_info.cpp
  5. 22 10
      src/task_info.h

+ 6 - 0
src/client_task_info.h

@@ -175,6 +175,12 @@ namespace myinfo {
 		return Count ( ClientTaskInfo_t::m_eTask ) - session::GetVips();
 	}
 
+	// num of real tasks (that is mini-info + client-info)
+	inline int CountTasks()
+	{
+		return Count ( MiniTaskInfo_t::m_eTask ) + Count ( ClientTaskInfo_t::m_eTask );
+	}
+
 } // namespace myinfo
 
 

+ 2 - 2
src/coroutine.cpp

@@ -773,7 +773,7 @@ void ExecuteN ( int iConcurrency, Threads::Handler&& fnWorker )
 {
 	if ( iConcurrency==1 )
 	{
-		myinfo::OwnMini ( fnWorker ) ();
+		myinfo::OwnMiniNoCount ( fnWorker ) ();
 		return;
 	}
 
@@ -784,7 +784,7 @@ void ExecuteN ( int iConcurrency, Threads::Handler&& fnWorker )
 	auto dWaiter = DefferedRestarter ();
 	for ( int i = 1; i<iConcurrency; ++i )
 		Coro::Co ( Threads::WithCopiedCrashQuery ( fnWorker ), dWaiter );
-	myinfo::OwnMini ( fnWorker ) ();
+	myinfo::OwnMiniNoCount ( fnWorker ) ();
 	WaitForDeffered ( std::move ( dWaiter ));
 }
 

+ 2 - 2
src/searchd.cpp

@@ -8718,7 +8718,7 @@ void BuildStatus ( VectorLike & dStatus )
 
 	// status of thread pool
 	dStatus.MatchTupletf ( "workers_total", "%d", GlobalWorkPool ()->WorkingThreads () );
-	dStatus.MatchTupletf ( "workers_active", "%d", myinfo::CountAll () );
+	dStatus.MatchTupletf ( "workers_active", "%d", myinfo::CountTasks () );
 	dStatus.MatchTupletf ( "workers_clients", "%d", myinfo::CountClients () );
 	dStatus.MatchTupletf ( "workers_clients_vip", "%u", session::GetVips() );
 	dStatus.MatchTupletf ( "work_queue_length", "%d", GlobalWorkPool ()->Works () );
@@ -8825,7 +8825,7 @@ void BuildStatusOneline ( StringBuilder_c & sOut )
 {
 	auto iThreads = GlobalWorkPool ()->WorkingThreads ();
 	auto iQueue = GlobalWorkPool ()->Works ();
-	auto iTasks = myinfo::CountAll ();
+	auto iTasks = myinfo::CountTasks ();
 	auto & g_tStats = gStats ();
 	sOut.StartBlock ( " " );
 	sOut

+ 12 - 2
src/task_info.cpp

@@ -26,7 +26,7 @@ BYTE RegisterRenderer ( RenderFnPtr pFunc ) noexcept
 	return uRender;
 }
 
-void internal_myinfo::RefCountInc ( BYTE eType )
+void RefCount_t::Inc ( BYTE eType )
 {
 	if ( eType >= uFreeInfoSlot )
 		sphWarning ( "Wrong RefCountInc slot! type=%d, free slot = %d", eType, uFreeInfoSlot.load() );
@@ -36,7 +36,7 @@ void internal_myinfo::RefCountInc ( BYTE eType )
 		dCounters[eType].fetch_add ( 1, std::memory_order_relaxed );
 }
 
-void internal_myinfo::RefCountDec ( BYTE eType )
+void RefCount_t::Dec ( BYTE eType )
 {
 	if ( eType>=uFreeInfoSlot )
 		sphWarning ( "Wrong RefCountDec slot! type=%d, free slot = %d", eType, uFreeInfoSlot.load () );
@@ -152,6 +152,16 @@ Threads::Handler myinfo::OwnMini ( Threads::Handler fnHandler )
 	};
 }
 
+Threads::Handler myinfo::OwnMiniNoCount ( Threads::Handler fnHandler )
+{
+	auto pParent = myinfo::HazardTaskInfo();
+	return [pParent, fnHandler = std::move ( fnHandler )] {
+		Threads::MyThd().m_pTaskInfo.store ( pParent, std::memory_order_release );
+		ScopedMiniInfoNoCount_t _ ( new MiniTaskInfo_t );
+		fnHandler();
+	};
+}
+
 // generic is empty
 DEFINE_RENDER ( TaskInfo_t ) {};
 

+ 22 - 10
src/task_info.h

@@ -106,15 +106,22 @@ struct TaskInfo_t
 	BYTE m_eType;
 };
 
-namespace internal_myinfo {
-	void RefCountInc ( BYTE eType );
-	void RefCountDec ( BYTE eType );
-}
+struct RefCount_t {
+	static void Inc ( BYTE eType );
+	static void Dec ( BYTE eType );
+};
+
+struct NoRefCount_t
+{
+	static void Inc ( BYTE ) {}
+	static void Dec ( BYTE ) {}
+};
+
 // RAII task info
 // Store info to TLS root, stores previous root to the chain
 // On dtr restores stored chain as root and retire info (uses hazard pointers)
 // if explicit root provided - uses it instead of TLS root.
-template<typename TASKINFO>
+template<typename TASKINFO, typename REFCOUNT=RefCount_t>
 class ScopedInfo_T : public hazard::ScopedPtr_t<TASKINFO*>
 {
 	using BASE = hazard::ScopedPtr_t<TASKINFO*>;
@@ -124,20 +131,20 @@ public:
 		: BASE ( pInfo )
 	{
 		pInfo->m_pPrev = Threads::MyThd ().m_pTaskInfo.load ( std::memory_order_acquire );
-		internal_myinfo::RefCountInc ( TASKINFO::m_eTask );
+		REFCOUNT::Inc ( TASKINFO::m_eTask );
 		Threads::MyThd ().m_pTaskInfo.store ( pInfo, std::memory_order_release );
 	}
 
 	ScopedInfo_T ( ScopedInfo_T && rhs ) noexcept
 	{
 		BASE::Swap ( rhs );
-		internal_myinfo::RefCountInc ( TASKINFO::m_eTask );
+		REFCOUNT::Inc ( TASKINFO::m_eTask );
 	}
 
-	~ScopedInfo_T<TASKINFO> ()
+	~ScopedInfo_T ()
 	{
 		Threads::MyThd ().m_pTaskInfo.store ( ( BASE::operator TASKINFO * () )->m_pPrev, std::memory_order_release );
-		internal_myinfo::RefCountDec ( TASKINFO::m_eTask );
+		REFCOUNT::Dec ( TASKINFO::m_eTask );
 	}
 };
 
@@ -165,6 +172,7 @@ struct MiniTaskInfo_t : public TaskInfo_t
 };
 
 using ScopedMiniInfo_t = ScopedInfo_T<MiniTaskInfo_t>;
+using ScopedMiniInfoNoCount_t = ScopedInfo_T<MiniTaskInfo_t,NoRefCount_t>;
 
 // create and publish info about system task (what before did ThreadSystem_t)
 // command = 'SYSTEM', description = 'SYSTEM sDescription'
@@ -182,7 +190,7 @@ namespace myinfo {
 	// num of tasks with given type
 	int Count ( BYTE eType );
 
-	// num of all tasks
+	// sum of all counters. Note, that might be quite useless, as one task may be 'decorated' by another
 	int CountAll();
 
 	// first ptr to node with type eType
@@ -194,6 +202,10 @@ namespace myinfo {
 	// bind current taskinfo and add new scoped mini info for coro handler
 	Threads::Handler OwnMini ( Threads::Handler fnHandler );
 
+	// bind current taskinfo and add new scoped mini info for coro handler, without tracing N of mini
+	// (for example, if you call it as subroutine in the same context and don't want to count it as separate task)
+	Threads::Handler OwnMiniNoCount ( Threads::Handler fnHandler );
+
 	// first ptr to node of given type
 	template <typename TASKINFO>
 	TASKINFO* ref()