Sfoglia il codice sorgente

Change rescheduling of flush rt/pq indexes

Approach when scheduler relies on the timestamp provided by flush itself
doesn't work if flushing is disabled by any reason (not an error, but
any action which disables saving, like sst replication). In this case
flush task falled into 'hot loop' eternally rescheduling itself into the
past. Let's explicitly reschedule to +10s from current timestamp instead

That fixes #983
klirichek 6 anni fa
parent
commit
d6c00a6f75
2 ha cambiato i file con 14 aggiunte e 2 eliminazioni
  1. 12 2
      src/taskflushmutable.cpp
  2. 2 0
      src/taskflushmutable.h

+ 12 - 2
src/taskflushmutable.cpp

@@ -106,15 +106,25 @@ static void ScheduleFlushTask ( void* pName, int64_t iNextTimestamp=-1 )
 				}
 
 				// check timeout, schedule or run immediately.
-				if (( pRT->GetLastFlushTimestamp () + g_iRtFlushPeriod - 1000 )<=sphMicroTimer ())
+				auto iLastTimestamp = pRT->GetLastFlushTimestamp ();
+				auto iPlannedTimestamp = iLastTimestamp+g_iRtFlushPeriod;
+				if (( iPlannedTimestamp-1000 )<=sphMicroTimer ())
+				{
 					pRT->ForceRamFlush ( true );
+					// if flush not happened (by any reason) - stamp not updated
+					if ( iLastTimestamp==pRT->GetLastFlushTimestamp ()) {
+						sphInfo ("Scheduled flush of index %s didn't happened; fallback", sName.cstr());
+						iPlannedTimestamp = sphMicroTimer() + FALLBACK_FLUSH_PERIOD;
+					} else
+						iPlannedTimestamp = pRT->GetLastFlushTimestamp ()+g_iRtFlushPeriod;
+				}
 
 				// once more check for disabled - since ForceRamFlush may be long
 				if ( g_Flushable.IsDisabled ())
 					return;
 
 				// reschedule or post-schedule
-				ScheduleFlushTask ( sName.Leak(), pRT->GetLastFlushTimestamp () + g_iRtFlushPeriod );
+				ScheduleFlushTask ( sName.Leak(), iPlannedTimestamp );
 			},
 			[] ( void* pName ) // deleter
 			{

+ 2 - 0
src/taskflushmutable.h

@@ -17,6 +17,8 @@
 
 #include "sphinxstd.h"
 
+static const int64_t FALLBACK_FLUSH_PERIOD = 10*1000*1000; // reschedule in 10s in case planned flush failed
+
 /* this cb attached to local indexes hash table 'add-or-replace' function. It is called for all new arrived indexes,
  * and if it suitable for flushing (i.e. if it exists and is mutable), engages flushing task by timer for it.*/
 void HookSubscribeMutableFlush ( ISphRefcountedMT* pCounter, const CSphString& sName );