ソースを参照

thread time tracking

Alexey N. Vinogradov 5 年 前
コミット
f0e8f0e985
5 ファイル変更116 行追加72 行削除
  1. 2 0
      src/networking_daemon.cpp
  2. 73 66
      src/searchdtask.cpp
  3. 1 6
      src/searchdtask.h
  4. 18 0
      src/threadutils.cpp
  5. 22 0
      src/threadutils.h

+ 2 - 0
src/networking_daemon.cpp

@@ -197,6 +197,7 @@ class CSphNetLoop::Impl_c
 
 		m_tPrf.StartPoll ();
 		// need positive timeout for communicate threads back and shutdown
+		Threads::IdleTimer_t _;
 		m_pPoll->Wait ( iWaitMs );
 		m_tPrf.EndTask ();
 	}
@@ -456,6 +457,7 @@ int SockWrapper_c::Impl_c::SockPollClassic ( int64_t tmTimeUntilUs, bool bWrite
 	if ( tmMicroLeft<0 )
 		tmMicroLeft = 0;
 
+	Threads::IdleTimer_t _;
 	int iRes = sphPoll ( m_iSock, tmMicroLeft, bWrite );
 	sphLogDebugv ( "sphPoll for alone returned %d in " INT64_FMT " Us", iRes, tmMicroLeft-tmTimeUntilUs+sphMicroTimer() );
 	return iRes;

+ 73 - 66
src/searchdtask.cpp

@@ -238,6 +238,7 @@ public:
 
 	void Action ()
 	{
+		Threads::JobTimer_t dTrack;
 		DebugT ( "Task_t %s Action (%p)", GetName (), m_pPayload );
 		++Prop ().m_iCurrentRunners;
 		auto itmStart = sphMicroTimer ();
@@ -280,15 +281,10 @@ ThreadRole TaskThread;
 class LazyJobs_c;
 struct TaskWorker_t: public ListNode_t
 {
-	int64_t m_iMyStartTimestamp = 0;	// time where I've started
-	int64_t m_iLastJobStartTime = -1;    // time where I've done something useful
-	int64_t m_iLastJobDoneTime = -1;	// time where I've done something useful
-	int64_t m_iTotalWorkedTime = -1;	// total time I've worked on useful tasks
-	int64_t m_iTotalJobsDone = 0;		// total jobs I've completed
+	Threads::LowThreadDesc_t& m_tDesc = Threads::MyThd (); // not owned, not need to me, need for outside iterations.
 	int64_t m_iTotalTicked = 0;			// total num of times I've waked up
 	int m_iMyTaskFlavour = 0;			// round-robin of tasks' flavours
 	int m_iMyThreadID = 0;				// my id in workers queue
-	int m_iMyOSThreadID = 0;            // my id in OS
 
 	// dig all the queues and extract the task we can work with
 	Task_t* ExtractJobToWork () REQUIRES ( MtJobThread )
@@ -334,21 +330,16 @@ struct TaskWorker_t: public ListNode_t
 	void PerformTask ( Task_t* pTask ) REQUIRES (MtJobThread)
 	{
 		assert ( pTask );
-		m_iLastJobDoneTime = -1;
-		m_iLastJobStartTime = sphMicroTimer ();
 		// we've extracted the task and going to work on it.
 		pTask->Action ();
-		m_iLastJobDoneTime = sphMicroTimer ();
-		++m_iTotalJobsDone;
-		m_iTotalWorkedTime += m_iLastJobDoneTime - m_iLastJobStartTime;
 		DebugL (M, "%d Done %d jobs, spend " INT64_FMT "us",
-				   m_iMyThreadID, ( int ) m_iTotalJobsDone, m_iTotalWorkedTime );
+				   m_iMyThreadID, ( int ) m_tDesc.m_iTotalJobsDone, m_tDesc.m_tmTotalWorkedTimeUS; );
 	}
 
 	// returns timeout after which we'll regarded idle too much
 	int TimeToDeadMS ( int64_t iIdlePeriod )
 	{
-		auto iLastJob = (m_iLastJobDoneTime>0) ? m_iLastJobDoneTime : sphMicroTimer();
+		auto iLastJob = (m_tDesc.m_tmLastJobDoneTimeUS>0) ? m_tDesc.m_tmLastJobDoneTimeUS : sphMicroTimer();
 		auto mSecs = (( iLastJob + iIdlePeriod ) - sphMicroTimer ()) / 1000;
 		return Max ( mSecs, 0 );
 	}
@@ -366,24 +357,27 @@ struct ScheduledJob_t: public EnqueuedTimeout_t
 	}
 };
 
+void IterateLazyThreads ( Threads::ThreadFN & fnHandler );
+
 LazyJobs_c& LazyTasker ();
 class LazyJobs_c: ISphNoncopyable
 {
 	// stuff to transfer (enqueue) tasks
 	VectorTask_c  m_dInternalTasks; // internal queue where we add our tasks without mutex
 	VectorTask_c* m_pEnqueuedTasks GUARDED_BY ( m_dActiveLock ) = nullptr; // ext. mt queue where we add tasks
-	CSphMutex m_dActiveLock;
+	RwLock_t m_dActiveLock;
 	TimeoutQueue_c m_dTimeouts;
 	int64_t m_iNextTimeoutUS = 0;
 	OneshotEvent_c m_tSignal;
 	TaskID m_iScheduler = -1;
+	Threads::LowThreadDesc_t* m_pSchedulerThread = nullptr;
 
 	// thread pool
 	List_t m_dWorkers GUARDED_BY ( m_dWorkersLock );
 	int m_iNextThreadId = 0;
-	CSphMutex m_dWorkersLock;
-	CSphAtomic m_iIdleWorkers;
-	int64_t m_iIdlePeriod = IDLE_TIME_TO_FINISH; // workers will be finished after idle of this time, in uS
+	RwLock_t m_dWorkersLock;
+	std::atomic<long> m_iIdleWorkers;
+	int64_t m_tmIdlePeriodUS = IDLE_TIME_TO_FINISH; // workers will be finished after idle of this time, in uS
 	CSphAutoEvent m_tJobSignal;
 	volatile bool m_bShutdown = false;
 	int m_iMaxWorkers = 32;
@@ -395,7 +389,7 @@ private:
 		// atomically get current vec; put zero instead.
 		VectorTask_c* pReadyQueue = nullptr;
 		{
-			ScopedMutex_t tLock ( m_dActiveLock );
+			ScWL_t tLock ( m_dActiveLock );
 			pReadyQueue = m_pEnqueuedTasks;
 			m_pEnqueuedTasks = nullptr;
 		}
@@ -449,7 +443,7 @@ private:
 				break;
 	}
 
-	/// abandon and release all tiemouted events.
+	/// abandon and release all timeouted events.
 	/// \return next active timeout (in uS), or -1 for infinite.
 	bool HasTimeoutActions () REQUIRES ( TaskThread )
 	{
@@ -540,7 +534,7 @@ private:
 			return;
 
 		pTask->AddRef();
-		ScopedMutex_t tLock ( m_dActiveLock );
+		ScWL_t tLock ( m_dActiveLock );
 		if ( !m_pEnqueuedTasks )
 			m_pEnqueuedTasks = new VectorTask_c;
 		m_pEnqueuedTasks->Add ( pTask );
@@ -571,16 +565,11 @@ private:
 
 	void WorkerFunc () REQUIRES ( !TaskThread )
 	{
+		m_pSchedulerThread = &Threads::MyThd ();
 		ScopedRole_c thLazy ( TaskThread );
 		DebugT ( "LazyJobs_c::WorkerFunc started" );
 		EventLoop ();
-	}
-
-	static void SchedulerFunc ( void* pScheduledJob ) REQUIRES ( TaskThread )
-	{
-		auto& tThis = LazyTasker ();
-		DebugT ( "LazyJobs_c::SchedulerFunc" );
-		tThis.ProcessSchedulingEnqueue (( ScheduledJob_t* ) pScheduledJob );
+		m_pSchedulerThread = nullptr;
 	}
 
 private:
@@ -596,15 +585,15 @@ private:
 		}
 		++tWorker.m_iTotalTicked;
 
-		bool bSignaled = m_tJobSignal.WaitEvent ( tWorker.TimeToDeadMS ( m_iIdlePeriod ));
+		bool bSignaled = m_tJobSignal.WaitEvent ( tWorker.TimeToDeadMS ( m_tmIdlePeriodUS ));
 		if ( !bSignaled ) // idle timeout happened. Fixme! m.b. better way to determine idles need.
 		{
 			DebugM ( "%d finishes because idle period exceeded", tWorker.m_iMyThreadID );
 			return false;
 		}
 
-		--m_iIdleWorkers;
-		auto dFreeIdle = AtScopeExit ( [this] { ++m_iIdleWorkers; } );
+		m_iIdleWorkers.fetch_sub ( 1, std::memory_order_relaxed );
+		auto dFreeIdle = AtScopeExit ( [this] { m_iIdleWorkers.fetch_add ( 1, std::memory_order_relaxed ); } );
 
 		while (true)
 		{
@@ -617,45 +606,43 @@ private:
 	}
 
 	/// main event loop run in separate thread.
-	void JobLoop ( TaskWorker_t* pWorker ) REQUIRES ( MtJobThread )
+	void JobLoop () REQUIRES ( MtJobThread )
 	{
-		pWorker->m_iMyOSThreadID = GetOsThreadId ();
-		pWorker->m_iMyStartTimestamp = sphMicroTimer ();
+		auto pWorker = new TaskWorker_t;
+		CSphScopedPtr<TaskWorker_t> pThreadWorkerContext { pWorker };
+
+		pWorker->m_iMyThreadID = ++m_iNextThreadId;
+		Threads::MyThd ().m_sThreadName.SetSprintf( "TaskW_%d", pWorker->m_iMyThreadID );
+		Threads::SetSysThreadName();
+
+		{
+			ScWL_t _ { m_dWorkersLock };
+			m_dWorkers.Add ( pWorker );
+		}
+
 
 		DebugM ( "JobLoop started for %d", pWorker->m_iMyThreadID );
 		while ( true )
 			if ( !JobTick ( *pWorker ) )
 				break;
 
-		RemoveAndDeleteWorker ( pWorker );
+		m_iIdleWorkers.fetch_sub ( 1, std::memory_order_relaxed );
+		ScWL_t _ { m_dWorkersLock };
+		m_dWorkers.Remove ( pWorker );
 	}
 
 	// adds a worker, until limit exceeded.
 	void AddWorker ()
 	{
-		ScopedMutex_t tWorkersLock ( m_dWorkersLock );
-		if ( m_dWorkers.GetLength ()>=m_iMaxWorkers )
-			return;
-
-		CSphScopedPtr<TaskWorker_t> pThreadWorkerContext { new TaskWorker_t };
-		pThreadWorkerContext->m_iMyThreadID = ++m_iNextThreadId;
-		StringBuilder_c thName;
-		thName.Sprintf ("TaskW_%d", pThreadWorkerContext->m_iMyThreadID);
-		SphThread_t tThd;
-		if ( Threads::Create ( &tThd, [pArg=pThreadWorkerContext.Ptr()] { TheadPoolWorker(pArg); }, true, thName.cstr() ))
 		{
-			m_dWorkers.Add ( pThreadWorkerContext.LeakPtr ());
-			++m_iIdleWorkers;
+			ScRL_t _ ( m_dWorkersLock );
+			if ( m_dWorkers.GetLength ()>=m_iMaxWorkers )
+				return;
 		}
-	}
 
-	void RemoveAndDeleteWorker ( TaskWorker_t* pWorker )
-	{
-		assert ( pWorker );
-		CSphScopedPtr<TaskWorker_t> pThreadWorkerContext { pWorker };
-		--m_iIdleWorkers;
-		ScopedMutex_t tWorkersLock ( m_dWorkersLock );
-		m_dWorkers.Remove ( pWorker );
+		SphThread_t tThd;
+		if ( Threads::Create ( &tThd, TheadPoolWorker, true ))
+			m_iIdleWorkers.fetch_add ( 1, std::memory_order_relaxed );
 	}
 
 	void FinishAllWorkers () NO_THREAD_SAFETY_ANALYSIS
@@ -667,11 +654,11 @@ private:
 		}
 	}
 
-	static void TheadPoolWorker ( TaskWorker_t* pArg ) REQUIRES (!MtJobThread)
+	static void TheadPoolWorker () REQUIRES (!MtJobThread)
 	{
 		DebugM ( "LazyJobs_c::TheadPoolWorker started" );
 		ScopedRole_c thMtThread ( MtJobThread );
-		LazyTasker ().JobLoop (( TaskWorker_t* ) pArg );
+		LazyTasker ().JobLoop ( );
 	}
 
 	static void RemoveAllJobs ()
@@ -726,7 +713,7 @@ private:
 			DebugX ( "AddNewMTJob %s(%p) success (%d in queue)", pJob->GetName (), pJob, iQueueLen );
 		}
 
-		if ( !m_iIdleWorkers && dProp.m_iCurrentRunners<pJob->Descr ().m_iMaxRunners )
+		if ( !m_iIdleWorkers.load(std::memory_order_relaxed) && dProp.m_iCurrentRunners<pJob->Descr ().m_iMaxRunners )
 			AddWorker ();
 		KickJobPool ();
 	}
@@ -741,6 +728,8 @@ private:
 public:
 	LazyJobs_c ()
 	{
+		Threads::RegisterIterator ( IterateLazyThreads );
+
 		SphThread_t tThd;
 		Threads::Create ( &tThd, [this] { WorkerFunc(); }, true, "TaskSched" );
 		m_iScheduler = TaskManager::RegisterGlobal ( "Scheduler",
@@ -825,19 +814,20 @@ public:
 	CSphVector<TaskManager::ThreadInfo_t> GetThreadsInfo ()
 	{
 		CSphVector<TaskManager::ThreadInfo_t> dRes;
-		ScopedMutex_t tWorkersLock ( m_dWorkersLock );
-		for ( const ListNode_t* pIt = m_dWorkers.Begin (); pIt!=m_dWorkers.End (); pIt = pIt->m_pNext )
+		ScRL_t _ ( m_dWorkersLock );
+		for ( const auto& tNode : m_dWorkers )
 		{
-			auto* pThd = ( TaskWorker_t* ) pIt;
+			auto* pThd = (TaskWorker_t *) &tNode;
+			auto& dThdDesc = pThd->m_tDesc;
 			auto& dInfo = dRes.Add ();
-			dInfo.m_iMyStartTimestamp = pThd->m_iMyStartTimestamp;
-			dInfo.m_iLastJobStartTime = pThd->m_iLastJobStartTime;
-			dInfo.m_iLastJobDoneTime = pThd->m_iLastJobDoneTime;
-			dInfo.m_iTotalWorkedTime = pThd->m_iTotalWorkedTime;
-			dInfo.m_iTotalJobsDone = pThd->m_iTotalJobsDone;
+			dInfo.m_iMyStartTimestamp = dThdDesc.m_tmStart;
+			dInfo.m_iLastJobStartTime = dThdDesc.m_tmLastJobStartTimeUS;
+			dInfo.m_iLastJobDoneTime = dThdDesc.m_tmLastJobDoneTimeUS;
+			dInfo.m_iTotalWorkedTime = dThdDesc.m_tmTotalWorkedTimeUS;
+			dInfo.m_iTotalJobsDone = dThdDesc.m_iTotalJobsDone;
 			dInfo.m_iTotalTicked = pThd->m_iTotalTicked;
 			dInfo.m_iMyThreadID = pThd->m_iMyThreadID;
-			dInfo.m_iMyOSThreadID = pThd->m_iMyOSThreadID;
+			dInfo.m_iMyOSThreadID = dThdDesc.m_iThreadID;
 
 		}
 		return dRes;
@@ -855,6 +845,18 @@ public:
 			});
 		return dRes;
 	}
+
+	void IterateThreads ( Threads::ThreadFN& fnHandler )
+	{
+		{
+			ScRL_t _ ( m_dWorkersLock );
+			for ( const auto & tNode : m_dWorkers )
+				fnHandler ( &( (TaskWorker_t *) &tNode )->m_tDesc );
+		}
+
+		ScRL_t _ { m_dActiveLock };
+		fnHandler ( m_pSchedulerThread );
+	}
 };
 
 //! Get static (singletone) instance of lazy poller
@@ -865,6 +867,11 @@ LazyJobs_c& LazyTasker ()
 	return dEvents;
 }
 
+void IterateLazyThreads ( Threads::ThreadFN & fnHandler )
+{
+	LazyTasker ().IterateThreads ( fnHandler );
+}
+
 TaskID TaskManager::RegisterGlobal( CSphString sName, fnThread_t fnThread, fnThread_t fnFree, int iThreads, int iJobs )
 {
 	auto iTaskID = TaskID( g_iTasks++ );

+ 1 - 6
src/searchdtask.h

@@ -13,9 +13,7 @@
 /// @file searchdtask.h
 /// Task manager
 
-#ifndef MANTICORE_SEARCHDTASK_H
-#define MANTICORE_SEARCHDTASK_H
-
+#pragma once
 #include "sphinxutils.h"
 
 /// member type for priority queue used for timeout task managing
@@ -140,7 +138,4 @@ namespace TaskManager {
 	CSphVector<TaskInfo_t> GetTaskInfo ();
 	CSphVector<ThreadInfo_t> GetThreadsInfo ();
 	CSphVector<ScheduleInfo_t> GetSchedInfo ();
-
 }
-
-#endif //MANTICORE_SEARCHDTASK_H

+ 18 - 0
src/threadutils.cpp

@@ -460,6 +460,7 @@ public:
 		// Make the upcall if required.
 		if ( pOwner )
 		{
+			Threads::JobTimer_t dTrack;
 			dLocalHandler();
 
 			// barrier ensures that no operations till here would be reordered below.
@@ -1415,6 +1416,22 @@ void Threads::SetSysThreadName ()
 	RuntimeThreadContext ()->PropagateName ();
 }
 
+void Threads::JobStarted ()
+{
+	auto& tDesc = Threads::MyThd ();
+	tDesc.m_tmLastJobDoneTimeUS = -1;
+	tDesc.m_tmLastJobStartTimeUS = sphMicroTimer ();
+}
+
+void Threads::JobFinished ( bool bIsDone )
+{
+	auto & tDesc = Threads::MyThd ();
+	tDesc.m_tmLastJobDoneTimeUS = sphMicroTimer ();
+	if ( bIsDone )
+		++tDesc.m_iTotalJobsDone;
+	tDesc.m_tmTotalWorkedTimeUS += tDesc.m_tmLastJobDoneTimeUS-tDesc.m_tmLastJobStartTimeUS;
+}
+
 // Adds a function call (a new task for a wrapper) to a linked list
 // of thread contexts. They will be executed one by one right after
 // the main thread ends its execution. This is a way for a wrapper
@@ -1456,6 +1473,7 @@ void RuntimeThreadContext_t::Prepare ( const void * pStack )
 {
 	m_pMyThreadStack = pStack;
 	m_tDesc.m_iThreadID = GetOsThreadId ();
+	m_tDesc.m_tmStart = sphMicroTimer();
 	m_tDesc.m_pHazards.store ( nullptr, std::memory_order_release );
 	m_tDesc.m_tThread = Threads::Self ();
 

+ 22 - 0
src/threadutils.h

@@ -54,6 +54,11 @@ struct LowThreadDesc_t
 {
 	SphThread_t			m_tThread;		///< m.b. used to send signals to the thread
 	int					m_iThreadID;	///< OS thread id
+	int64_t				m_tmStart;		///< when did the current thread started? Initialized from thread itself
+	int64_t 			m_tmLastJobStartTimeUS = -1; ///< last time where I've started something useful
+	int64_t				m_tmLastJobDoneTimeUS = -1;	///< last time where I've done something useful
+	int64_t				m_tmTotalWorkedTimeUS = 0;	///< total time I've worked on useful tasks
+	int64_t				m_iTotalJobsDone = 0;		///< total jobs I've completed
 	CSphString			m_sThreadName;
 	std::atomic<void *> m_pHazards;		///< my hazard pointers
 };
@@ -75,6 +80,23 @@ void RegisterIterator ( ThreadIteratorFN fnIterator );
 
 int GetNumOfRunning ();
 
+// track the time
+void JobStarted();
+void JobFinished( bool bIsDone=true );
+
+// RAII time tracker
+struct JobTimer_t
+{
+	JobTimer_t () { JobStarted (); }
+	~JobTimer_t () { JobFinished (); }
+};
+
+struct IdleTimer_t
+{
+	IdleTimer_t () { JobFinished (false); }
+	~IdleTimer_t () { JobStarted (); }
+};
+
 struct ThdInfo_t
 {
 	Proto_e	   m_eProto		 = Proto_e::MYSQL41;