Browse Source

updated CBO; enabled multithreaded execution of queries with SI (with thread limit); fixed RT index thread distribution when pseudo_sharding is disabled

Ilya Kuznetsov 2 years ago
parent
commit
d0730272bb

+ 2 - 2
src/CMakeLists.txt

@@ -60,7 +60,7 @@ add_library ( lmanticore STATIC sphinx.cpp sphinxexcerpt.cpp sphinxquery.cpp sph
 		timeout_queue.cpp dynamic_idx.cpp columnarrt.cpp columnarmisc.cpp exprtraits.cpp columnarexpr.cpp
 		sphinx_alter.cpp columnarsort.cpp binlog.cpp chunksearchctx.cpp client_task_info.cpp
 		indexfiles.cpp indexfilebase.cpp attrindex_builder.cpp queryfilter.cpp aggregate.cpp secondarylib.cpp costestimate.cpp
-		docidlookup.cpp tracer.cpp attrindex_merge.cpp distinct.cpp hyperloglog.cpp detail/indexlink.cpp )
+		docidlookup.cpp tracer.cpp attrindex_merge.cpp distinct.cpp hyperloglog.cpp pseudosharding.cpp detail/indexlink.cpp )
 
 add_library ( lstem STATIC sphinxsoundex.cpp sphinxmetaphone.cpp sphinxstemen.cpp sphinxstemru.cpp sphinxstemru.inl
 		sphinxstemcz.cpp sphinxstemar.cpp )
@@ -150,7 +150,7 @@ set ( HEADERS sphinxexcerpt.h sphinxfilter.h sphinxint.h sphinxjsonquery.h sphin
 		indexsettings.h columnarlib.h fileio.h memio.h memio_impl.h queryprofile.h columnarfilter.h columnargrouper.h fileutils.h
 		libutils.h conversion.h columnarsort.h sortcomp.h binlog_defs.h binlog.h ${MANTICORE_BINARY_DIR}/config/config.h
 		chunksearchctx.h indexfilebase.h indexfiles.h attrindex_builder.h queryfilter.h aggregate.h secondarylib.h
-		costestimate.h docidlookup.h tracer.h attrindex_merge.h columnarmisc.h distinct.h hyperloglog.h detail/indexlink.h)
+		costestimate.h docidlookup.h tracer.h attrindex_merge.h columnarmisc.h distinct.h hyperloglog.h pseudosharding.h detail/indexlink.h )
 
 set ( SEARCHD_H searchdaemon.h searchdconfig.h searchdddl.h searchdexpr.h searchdha.h searchdreplication.h searchdsql.h
 		searchdtask.h client_task_info.h taskflushattrs.h taskflushbinlog.h taskflushmutable.h taskglobalidf.h

+ 0 - 1
src/columnarlib.cpp

@@ -15,7 +15,6 @@
 #include "fileutils.h"
 #include "schema/columninfo.h"
 #include "schema/schema.h"
-#include "std/cpuid.h"
 
 using CreateStorageReader_fn =	columnar::Columnar_i * (*) ( const std::string & sFilename, uint32_t uTotalDocs, std::string & sError );
 using CreateBuilder_fn =		columnar::Builder_i * (*) ( const columnar::Settings_t & tSettings, const common::Schema_t & tSchema, const std::string & sFile, std::string & sError );

+ 59 - 33
src/costestimate.cpp

@@ -16,8 +16,55 @@
 #include "columnarfilter.h"
 #include "secondarylib.h"
 #include <math.h>
+#include "std/sys.h"
 
 
+static float EstimateMTCost ( float fCost, int iThreads, float fKPerf, float fBPerf )
+{
+	if ( iThreads==1 )
+		return fCost;
+
+	int iMaxThreads = GetNumLogicalCPUs();
+	float fMaxPerfCoeff = fKPerf*iMaxThreads + fBPerf;
+	float fMinCost = fCost/fMaxPerfCoeff;
+
+	if ( iThreads==iMaxThreads )
+		return fMinCost;
+
+	const float fX1 = 1.0f;
+	float fX2 = iMaxThreads;
+	float fY1 = fCost;
+	float fY2 = fMinCost;
+
+	// cost = A/sqrt(num_threads) + B
+	float fA = ( fY2-fY1 ) / ( 1.0f/float(sqrt(fX2)) - 1.0f/float(sqrt(fX1)) );
+	float fB = fY1 - fA / float(sqrt(fX1));
+	float fX = iThreads;
+	float fY = fA/float(sqrt(fX)) + fB;
+
+	return fY;
+}
+
+
+float EstimateMTCost ( float fCost, int iThreads )
+{
+	const float fKPerf = 0.16f;
+	const float fBPerf = 1.38f;
+
+	return EstimateMTCost ( fCost, iThreads, fKPerf, fBPerf );
+}
+
+
+float EstimateMTCostSI ( float fCost, int iThreads )
+{
+	const float fKPerf = 0.045f;
+	const float fBPerf = 1.0f;
+
+	return EstimateMTCost ( fCost, iThreads, fKPerf, fBPerf );
+}
+
+/////////////////////////////////////////////////////////////////////
+
 class CostEstimate_c : public CostEstimate_i
 {
 	friend float CalcIntersectCost ( int64_t iDocs );
@@ -206,14 +253,12 @@ float CostEstimate_c::CalcFilterCost ( bool bFromIterator, float fDocsAfterIndex
 		}
 		else
 		{
-			int64_t iDocs = ApplyCutoff ( tSIInfo.m_iRsetEstimate );
-
 			if ( tFilter.m_eType==SPH_FILTER_STRING || tFilter.m_eType==SPH_FILTER_STRING_LIST )
 				fCost += Cost_Filter ( m_tCtx.m_iTotalDocs, fFilterComplexity );
 			else
 			{
-				// the idea is that block filter rejects most docs and 50% of the remaining docs are filtered out
-				fCost += Cost_Filter ( Min ( iDocs*2, m_tCtx.m_iTotalDocs ), fFilterComplexity );
+				int64_t iDocsToFilter = int64_t ( (float)ApplyCutoff ( tSIInfo.m_iRsetEstimate ) * m_tCtx.m_iTotalDocs / ( tSIInfo.m_iRsetEstimate + 1 ) );
+				fCost += Cost_Filter ( iDocsToFilter, fFilterComplexity );
 				fCost += Cost_BlockFilter ( m_tCtx.m_iTotalDocs, fFilterComplexity );
 			}
 		}
@@ -239,16 +284,16 @@ float CostEstimate_c::CalcAnalyzerCost() const
 		m_tCtx.m_pColumnar->GetAttrInfo ( tFilter.m_sAttrName.cstr(), tAttrInfo );
 
 		float fFilterComplexity = CalcGetFilterComplexity ( tSIInfo, tFilter );
+		int64_t iDocsBeforeFilter = tSIInfo.m_iPartialColumnarMinMax==-1 ? m_tCtx.m_iTotalDocs : std::min ( tSIInfo.m_iPartialColumnarMinMax, m_tCtx.m_iTotalDocs );
 
 		// filters that process but reject values are 2x faster
-		int64_t iDocsBeforeFilter = tSIInfo.m_iPartialColumnarMinMax==-1 ? m_tCtx.m_iTotalDocs : std::min ( tSIInfo.m_iPartialColumnarMinMax, m_tCtx.m_iTotalDocs );
 		float fAcceptCoeff = std::min ( float(tSIInfo.m_iRsetEstimate)/iDocsBeforeFilter, 1.0f ) / 2.0f + 0.5f;
 		float fTotalCoeff = fFilterComplexity*tAttrInfo.m_fComplexity*fAcceptCoeff;
 
-		int64_t iDocsAfterCutoff = ApplyCutoff ( m_tCtx.m_iTotalDocs );
+		int64_t iDocsToFilter = int64_t ( (float)ApplyCutoff ( tSIInfo.m_iRsetEstimate ) * m_tCtx.m_iTotalDocs / ( tSIInfo.m_iRsetEstimate + 1 ) );
 
 		if ( tSIInfo.m_iPartialColumnarMinMax==-1 ) // no minmax? scan whole index
-			fCost += Cost_ColumnarFilter ( iDocsAfterCutoff, fTotalCoeff );
+			fCost += Cost_ColumnarFilter ( iDocsToFilter, fTotalCoeff );
 		else
 		{
 			// minmax tree eval
@@ -256,7 +301,12 @@ float CostEstimate_c::CalcAnalyzerCost() const
 			int iMatchingNodes = ( tSIInfo.m_iRsetEstimate + MINMAX_NODE_SIZE - 1 ) / MINMAX_NODE_SIZE;
 			int iTreeLevels = sphLog2 ( m_tCtx.m_iTotalDocs );
 			fCost += Cost_Filter ( iMatchingNodes*iTreeLevels, fFilterComplexity );
-			fCost += Cost_ColumnarFilter ( std::min ( tSIInfo.m_iPartialColumnarMinMax, iDocsAfterCutoff ), fTotalCoeff );
+
+			const float MINMAX_RATIO = 0.9f;
+			if ( (float)tSIInfo.m_iPartialColumnarMinMax / m_tCtx.m_iTotalDocs >= MINMAX_RATIO )
+				fCost += Cost_ColumnarFilter ( iDocsToFilter, fTotalCoeff );
+			else
+				fCost += Cost_ColumnarFilter ( std::min ( iDocsBeforeFilter, iDocsToFilter ), fTotalCoeff );
 		}
 	}
 
@@ -289,31 +339,7 @@ float CostEstimate_c::CalcPushCost ( float fDocsAfterFilters ) const
 
 float CostEstimate_c::CalcMTCost ( float fCost ) const
 {
-	if ( m_tCtx.m_iThreads==1 )
-		return fCost;
-
-	int iMaxThreads = sphCpuThreadsCount();
-
-	const float fKPerf = 0.16f;
-	const float fBPerf = 1.38f;
-
-	float fMaxPerfCoeff = fKPerf*iMaxThreads + fBPerf;
-	float fMinCost = fCost/fMaxPerfCoeff;
-
-	if ( m_tCtx.m_iThreads==iMaxThreads )
-		return fMinCost;
-
-	const float fX1 = 1.0f;
-	float fX2 = iMaxThreads;
-	float fY1 = fCost;
-	float fY2 = fMinCost;
-
-	float fA = ( fY2-fY1 ) / ( 1.0f/float(sqrt(fX2)) - 1.0f/float(sqrt(fX1)) );
-	float fB = fY1 - fA / float(sqrt(fX1));
-	float fX = m_tCtx.m_iThreads;
-	float fY = fA/float(sqrt(fX)) + fB;
-
-	return fY;
+	return EstimateMTCost ( fCost, m_tCtx.m_iThreads );
 }
 
 

+ 3 - 0
src/costestimate.h

@@ -20,6 +20,9 @@ public:
 	virtual float	CalcQueryCost() = 0;
 };
 
+float EstimateMTCost ( float fCost, int iThreads );
+float EstimateMTCostSI ( float fCost, int iThreads );
+
 struct SecondaryIndexInfo_t
 {
 	CSphVector<SecondaryIndexType_e> m_dCapabilities;

+ 1 - 1
src/distinct.cpp

@@ -62,7 +62,7 @@ int UniqHLLTraits_c::Container_t::Estimate() const
 	case ContainterType_e::HLL_DENSE_NONPACKED:	return int( m_pHLLDenseNonPacked->Estimate() );
 	default:
 		assert ( 0 && "Unknown container type" );
-		break;
+		return 0;
 	}
 }
 

+ 1 - 1
src/gtests/gtests_globalstate.cpp

@@ -63,7 +63,7 @@ public:
 		Tracer::Init();
 		CreateSynonymsFile ();
 		CreateSynonymsFile ( g_sMagic );
-		auto iThreads = sphCpuThreadsCount();
+		auto iThreads = GetNumLogicalCPUs();
 		//		iThreads = 1; // uncomment if want to run all coro tests in single thread
 		SetMaxChildrenThreads ( iThreads );
 		WipeGlobalSchedulerOnShutdownAndFork();

+ 0 - 1
src/indexer.cpp

@@ -31,7 +31,6 @@
 #include <signal.h>
 
 #if _WIN32
-	#define snprintf	_snprintf
 	#define popen		_popen
 	#define RMODE "rb"
 

+ 64 - 0
src/pseudosharding.cpp

@@ -0,0 +1,64 @@
+//
+// Copyright (c) 2017-2023, Manticore Software LTD (https://manticoresearch.com)
+// Copyright (c) 2011-2016, Andrew Aksyonoff
+// Copyright (c) 2011-2016, Sphinx Technologies Inc
+// All rights reserved
+//
+// This program is free software; you can redistribute it and/or modify
+// it under the terms of the GNU General Public License. You should have
+// received a copy of the GPL license along with this program; if you
+// did not, you can find it at http://www.gnu.org/
+//
+
+#include "pseudosharding.h"
+#include <math.h>
+
+
+void DistributeThreadsOverIndexes ( IntVec_t & dThreads, const CSphVector<SplitData_t> & dSplitData, int iConcurrency )
+{
+	dThreads.Resize ( dSplitData.GetLength() );
+	dThreads.Fill(1);
+
+	int64_t iTotalMetric = 0;
+	for ( auto & i : dSplitData )
+		iTotalMetric += i.m_iMetric;
+
+	// ignore indexes with thread cap==1; they won't get more than 1 thread
+	int iThreadsUsed = dSplitData.count_of ( []( auto & i ){ return i.m_iThreadCap==1; } );
+
+	// split remaining threads between left indexes (and apply the thread cap)
+	int iThreadsLeft = iConcurrency-iThreadsUsed;
+	ARRAY_FOREACH ( i, dSplitData )
+	{
+		const SplitData_t & tSD = dSplitData[i];
+		int & iThreads = dThreads[i];
+
+		if ( tSD.m_iThreadCap==1 )
+			continue;
+
+		assert ( tSD.m_iMetric>=0 );
+		iThreads = Max ( (int)round ( float(tSD.m_iMetric) / iTotalMetric * iThreadsLeft ), 1 );
+		if ( tSD.m_iThreadCap > 1 )
+			iThreads = Min ( iThreads, tSD.m_iThreadCap );
+	}
+
+	int iCappedThreads = 0;
+	int iNonCappedThreads = 0;
+	ARRAY_FOREACH ( i, dSplitData )
+		if ( dSplitData[i].m_iThreadCap >= 1 )
+			iCappedThreads += dThreads[i];
+		else
+			iNonCappedThreads += dThreads[i];
+
+	iThreadsLeft = iConcurrency-iCappedThreads;
+	assert ( iThreadsLeft>=0 );
+	ARRAY_FOREACH ( i, dSplitData )
+		if ( dSplitData[i].m_iThreadCap < 1 )
+			dThreads[i] = Max ( (int)round ( iThreadsLeft * dThreads[i]/iNonCappedThreads ), 1 );
+}
+
+
+int CalcMaxThreadsPerIndex ( int iConcurrency, int iNumIndexes )
+{
+	return iNumIndexes<iConcurrency ? ( iConcurrency-iNumIndexes ) + 1 : 1;
+}

+ 24 - 0
src/pseudosharding.h

@@ -0,0 +1,24 @@
+//
+// Copyright (c) 2017-2023, Manticore Software LTD (https://manticoresearch.com)
+// Copyright (c) 2001-2016, Andrew Aksyonoff
+// Copyright (c) 2008-2016, Sphinx Technologies Inc
+// All rights reserved
+//
+// This program is free software; you can redistribute it and/or modify
+// it under the terms of the GNU General Public License. You should have
+// received a copy of the GPL license along with this program; if you
+// did not, you can find it at http://www.gnu.org/
+//
+
+#pragma once
+
+#include "sphinxstd.h"
+
+struct SplitData_t
+{
+	int64_t m_iMetric = 0;
+	int		m_iThreadCap = 0;
+};
+
+void	DistributeThreadsOverIndexes ( IntVec_t & dThreads, const CSphVector<SplitData_t> & dSplitData, int iConcurrency );
+int		CalcMaxThreadsPerIndex ( int iConcurrency, int iNumIndexes );

+ 27 - 44
src/searchd.cpp

@@ -52,6 +52,7 @@
 #include "tracer.h"
 #include "netfetch.h"
 #include "queryfilter.h"
+#include "pseudosharding.h"
 
 // services
 #include "taskping.h"
@@ -164,8 +165,6 @@ static auto&			g_iAutoOptimizeCutoffMultiplier = AutoOptimizeCutoffMultiplier();
 static constexpr bool	AUTOOPTIMIZE_NEEDS_VIP = false; // whether non-VIP can issue 'SET GLOBAL auto_optimize = X'
 static constexpr bool	THREAD_EX_NEEDS_VIP = false; // whether non-VIP can issue 'SET GLOBAL auto_optimize = X'
 
-static bool				g_bSplit = true;
-
 static CSphVector<Listener_t>	g_dListeners;
 
 static int				g_iQueryLogFile	= -1;
@@ -5334,7 +5333,7 @@ private:
 	bool							CreateValidSorters ( VecTraits_T<ISphMatchSorter *> & dSrt, SphQueueRes_t * pQueueRes, VecTraits_T<SearchFailuresLog_c> & dFlr, StrVec_t * pExtra, const CSphIndex* pIndex, const CSphString & sLocal, const char * szParent, ISphExprHook * pHook );
 
 	void							PopulateCountDistinct ( CSphVector<CSphVector<int64_t>> & dCountDistinct ) const;
-	void							CalcMaxThreadsPerIndex ( CSphVector<std::pair<int64_t,int>> & dSplitData, int iConcurrency ) const;
+	int								CalcMaxThreadsPerIndex ( int iConcurrency ) const;
 	void							CalcThreadsPerIndex ( int iConcurrency );
 };
 
@@ -5896,11 +5895,8 @@ void SearchHandler_c::PopulateCountDistinct ( CSphVector<CSphVector<int64_t>> &
 }
 
 
-void SearchHandler_c::CalcMaxThreadsPerIndex ( CSphVector<std::pair<int64_t,int>> & dSplitData, int iConcurrency ) const
+int SearchHandler_c::CalcMaxThreadsPerIndex ( int iConcurrency ) const
 {
-	dSplitData.Resize ( m_dLocal.GetLength() );
-	dSplitData.Fill ( { -1, 0 } );
-
 	int iNumValid = 0;
 	ARRAY_FOREACH ( i, m_dLocal )
 	{
@@ -5911,9 +5907,7 @@ void SearchHandler_c::CalcMaxThreadsPerIndex ( CSphVector<std::pair<int64_t,int>
 		iNumValid++;
 	}
 
-	int iMaxThreadsPerIndex = iNumValid<iConcurrency ? ( iConcurrency-iNumValid ) + 1 : 1;
-	for ( auto & i : dSplitData )
-		i.second = iMaxThreadsPerIndex;
+	return ::CalcMaxThreadsPerIndex ( iConcurrency, iNumValid );
 }
 
 
@@ -5925,14 +5919,12 @@ void SearchHandler_c::CalcThreadsPerIndex ( int iConcurrency )
 	CSphVector<CSphVector<int64_t>> dCountDistinct;
 	PopulateCountDistinct ( dCountDistinct );
 
-	CSphVector<std::pair<int64_t,int>> dSplitData;
-	CalcMaxThreadsPerIndex ( dSplitData, iConcurrency );
+	int iMaxThreadsPerIndex = CalcMaxThreadsPerIndex ( iConcurrency );
 
-	// FIXME! what about PQ?
-	int64_t iTotalMetric = 0;
-	int iSingleSplits = 0;
-	int iMTSplits = 0;
+	CSphVector<SplitData_t> dSplitData ( m_dLocal.GetLength() );
 
+	// FIXME! what about PQ?
+	int iEnabledIndexes = 0;
 	ARRAY_FOREACH ( iLocal, m_dLocal )
 	{
 		const LocalIndex_t & tLocal = m_dLocal[iLocal];
@@ -5940,43 +5932,34 @@ void SearchHandler_c::CalcThreadsPerIndex ( int iConcurrency )
 		if ( !pIndex )
 			continue;
 
+		iEnabledIndexes++;
 		auto & tPSInfo = m_dPSInfo[iLocal];
-
-		if ( g_bSplit || RIdx_c(pIndex)->IsRT() )
+		auto & tSplitData = dSplitData[iLocal];
+		if ( GetPseudoSharding() || RIdx_c(pIndex)->IsRT() )
 		{
 			// do metric calcs
-			auto & tSplitData = dSplitData[iLocal];
-			tPSInfo.m_iMaxThreads = tSplitData.second;
-			int64_t iMetric = RIdx_c ( pIndex )->GetPseudoShardingMetric ( m_dNQueries, dCountDistinct[iLocal], tPSInfo.m_iMaxThreads, tPSInfo.m_bForceSingleThread );
-			if ( iMetric==-1 )
-				iSingleSplits++;
-			else
-			{
-				tSplitData.first = iMetric;
-				iTotalMetric += iMetric;
-				iMTSplits++;
-			}
+			tPSInfo.m_iMaxThreads = iMaxThreadsPerIndex;
+			auto tMetric = RIdx_c ( pIndex )->GetPseudoShardingMetric ( m_dNQueries, dCountDistinct[iLocal], tPSInfo.m_iMaxThreads, tPSInfo.m_bForceSingleThread );
+			assert ( tMetric.first>=0 );
+
+			tSplitData.m_iMetric = tMetric.first;
+			tSplitData.m_iThreadCap = tMetric.second;
 		}
 		else
 		{
 			// don't do metric calcs; we are guaranteed to have one thread
 			// set the 'force single thread' flag to make sure max_matches won't be increased when it is not necessary
 			tPSInfo = { 1, 1, true };
-			iSingleSplits++;
+			tSplitData.m_iThreadCap = 1;
 		}
 	}
 
-	if ( iConcurrency>iSingleSplits+iMTSplits )
+	if ( iConcurrency>iEnabledIndexes )
 	{
-		int iLeft = iConcurrency-iSingleSplits;
-		ARRAY_FOREACH ( i, dSplitData )
-		{
-			const auto & tSplitData = dSplitData[i];
-			if ( tSplitData.first==-1 )
-				continue;
-
-			m_dPSInfo[i].m_iThreads = Max ( (int)round ( double(tSplitData.first) / iTotalMetric * iLeft ), 1 );
-		}
+		IntVec_t dThreads;
+		DistributeThreadsOverIndexes ( dThreads, dSplitData, iConcurrency );
+		ARRAY_FOREACH ( i, dThreads )
+			m_dPSInfo[i].m_iThreads = dThreads[i];
 	}
 }
 
@@ -14140,7 +14123,7 @@ static bool HandleSetGlobal ( CSphString& sError, const CSphString& sName, int64
 
 	if ( sName == "pseudo_sharding" )
 	{
-		g_bSplit = !!iSetValue;
+		SetPseudoSharding ( !!iSetValue );
 		return true;
 	}
 
@@ -15291,7 +15274,7 @@ void HandleMysqlShowVariables ( RowBuffer_i & dRows, const SqlStmt_t & tStmt )
 			return tBuf;
 		});
 	}
-	dTable.MatchTuplet ( "pseudo_sharding", g_bSplit ? "1" : "0" );
+	dTable.MatchTuplet ( "pseudo_sharding", GetPseudoSharding() ? "1" : "0" );
 
 	switch ( GetSecondaryIndexDefault() )
 	{
@@ -19488,7 +19471,7 @@ void ConfigureSearchd ( const CSphConfig & hConf, bool bOptPIDFile, bool bTestMo
 	g_iClientTimeoutS = hSearchd.GetSTimeS ( "client_timeout", 300 );
 
 	g_iMaxConnection = hSearchd.GetInt ( "max_connections", g_iMaxConnection );
-	g_iThreads = hSearchd.GetInt ( "threads", sphCpuThreadsCount() );
+	g_iThreads = hSearchd.GetInt ( "threads", GetNumLogicalCPUs() );
 	SetMaxChildrenThreads ( g_iThreads );
 	g_iThdQueueMax = hSearchd.GetInt ( "jobs_queue_size", g_iThdQueueMax );
 
@@ -19682,7 +19665,7 @@ void ConfigureSearchd ( const CSphConfig & hConf, bool bOptPIDFile, bool bTestMo
 	g_iAutoOptimizeCutoffMultiplier = hSearchd.GetInt ( "auto_optimize", 1 );
 	MutableIndexSettings_c::GetDefaults().m_iOptimizeCutoff = hSearchd.GetInt ( "optimize_cutoff", AutoOptimizeCutoff() );
 
-	g_bSplit = hSearchd.GetInt ( "pseudo_sharding", 1 )!=0;
+	SetPseudoSharding ( hSearchd.GetInt ( "pseudo_sharding", 1 )!=0 );
 	SetOptionSI ( hSearchd, bTestMode );
 
 	CSphString sWarning;

+ 11 - 19
src/secondaryindex.cpp

@@ -516,7 +516,7 @@ public:
 	bool	HintRowID ( RowID_t tRowID ) override { return m_pIterator->HintRowID(tRowID); }
 	bool	GetNextRowIdBlock ( RowIdBlock_t & dRowIdBlock ) override;
 	int64_t	GetNumProcessed() const override { return m_pIterator->GetNumProcessed(); }
-	void	SetCutoff ( int iCutoff ) override {}
+	void	SetCutoff ( int iCutoff ) override { m_pIterator->SetCutoff(iCutoff); }
 	bool	WasCutoffHit() const override { return false; }
 	void	AddDesc ( CSphVector<IteratorDesc_t> & dDesc ) const override;
 
@@ -703,14 +703,14 @@ static void MarkAvailableLookup ( CSphVector<SecondaryIndexInfo_t> & dSIInfo, co
 }
 
 
-static void MarkAvailableSI ( CSphVector<SecondaryIndexInfo_t> & dSIInfo, const SelectIteratorCtx_t & tCtx )
+static void MarkAvailableSI ( CSphVector<SecondaryIndexInfo_t> & dSIInfo, const SelectIteratorCtx_t & tCtx, bool bCheckUsable = true )
 {
 	if ( !tCtx.m_pHistograms )
 		return;
 
 	ARRAY_FOREACH ( i, tCtx.m_tQuery.m_dFilters )
 	{
-		if ( !dSIInfo[i].m_bUsable )
+		if ( bCheckUsable && !dSIInfo[i].m_bUsable )
 			continue;
 
 		bool bForce = false;
@@ -806,22 +806,6 @@ static void FetchPartialColumnarMinMax ( CSphVector<SecondaryIndexInfo_t> & dSII
 }
 
 
-static bool IsWideRange ( const CSphFilterSettings & tFilter )
-{
-	if ( tFilter.m_eType==SPH_FILTER_FLOATRANGE )
-		return true;
-
-	if ( tFilter.m_eType!=SPH_FILTER_RANGE )
-		return false;
-
-	if ( tFilter.m_bOpenLeft || tFilter.m_bOpenRight )
-		return true;
-
-	const int WIDE_RANGE_THRESH=10000;
-	return ( tFilter.m_iMaxValue-tFilter.m_iMinValue ) >= WIDE_RANGE_THRESH;
-}
-
-
 static uint32_t CalcNumSIIterators ( const CSphFilterSettings & tFilter, int64_t iDocs, const SelectIteratorCtx_t & tCtx )
 {
 	uint32_t uNumIterators = 1;
@@ -1049,6 +1033,14 @@ CSphVector<SecondaryIndexInfo_t> SelectIterators ( const SelectIteratorCtx_t & t
 }
 
 
+bool HaveAvailableSI ( const SelectIteratorCtx_t & tCtx )
+{
+	CSphVector<SecondaryIndexInfo_t> dSIInfo ( tCtx.m_tQuery.m_dFilters.GetLength() );
+	MarkAvailableSI ( dSIInfo, tCtx, false );
+	return dSIInfo.any_of ( []( auto & tInfo ){ return tInfo.m_dCapabilities.any_of ( []( auto eType ){ return eType==SecondaryIndexType_e::INDEX; } ); } );
+}
+
+
 const CSphFilterSettings * GetRowIdFilter ( const CSphVector<CSphFilterSettings> & dFilters, RowID_t uTotalDocs, RowIdBoundaries_t & tRowidBounds )
 {
 	const CSphFilterSettings * pRowIdFilter = nullptr;

+ 5 - 5
src/secondaryindex.h

@@ -38,15 +38,15 @@ public:
 
 struct RowIdBoundaries_t;
 
-RowidIterator_i * CreateIteratorIntersect ( CSphVector<RowidIterator_i*> & dIterators, const RowIdBoundaries_t * pBoundaries );
-RowidIterator_i * CreateIteratorWrapper ( common::BlockIterator_i * pIterator, const RowIdBoundaries_t * pBoundaries );
-RowidIterator_i * CreateIteratorIntersect ( std::vector<common::BlockIterator_i *> & dIterators, const RowIdBoundaries_t * pBoundaries );
+RowidIterator_i *	CreateIteratorIntersect ( CSphVector<RowidIterator_i*> & dIterators, const RowIdBoundaries_t * pBoundaries );
+RowidIterator_i *	CreateIteratorWrapper ( common::BlockIterator_i * pIterator, const RowIdBoundaries_t * pBoundaries );
+RowidIterator_i *	CreateIteratorIntersect ( std::vector<common::BlockIterator_i *> & dIterators, const RowIdBoundaries_t * pBoundaries );
 
 const CSphFilterSettings * GetRowIdFilter ( const CSphVector<CSphFilterSettings> & dFilters, RowID_t uTotalDocs, RowIdBoundaries_t & tRowidBounds );
-
-bool	ReturnIteratorResult ( RowID_t * pRowID, RowID_t * pRowIdStart, RowIdBlock_t & dRowIdBlock );
+bool				ReturnIteratorResult ( RowID_t * pRowID, RowID_t * pRowIdStart, RowIdBlock_t & dRowIdBlock );
 
 CSphVector<SecondaryIndexInfo_t> SelectIterators ( const SelectIteratorCtx_t & tCtx, float & fBestCost, StrVec_t & dWarnings );
+bool				HaveAvailableSI ( const SelectIteratorCtx_t & tCtx );
 
 namespace SI
 {

+ 0 - 1
src/secondarylib.cpp

@@ -16,7 +16,6 @@
 #include "schema/schema.h"
 #include "columnarmisc.h"
 #include "secondarylib.h"
-#include "std/cpuid.h"
 
 using CheckStorage_fn =			void (*) ( const std::string & sFilename, uint32_t uNumRows, std::function<void (const char*)> & fnError, std::function<void (const char*)> & fnProgress );
 using VersionStr_fn =			const char * (*)();

+ 64 - 54
src/sphinx.cpp

@@ -46,6 +46,7 @@
 #include "client_task_info.h"
 #include "chunksearchctx.h"
 #include "std/lrucache.h"
+#include "std/sys.h"
 #include "indexfiles.h"
 #include "task_dispatcher.h"
 #include "secondarylib.h"
@@ -67,9 +68,7 @@
 #include <re2/re2.h>
 #endif
 
-#if _WIN32
-	#define snprintf	_snprintf
-#else
+#if !_WIN32
 	#include <unistd.h>
 	#include <sys/time.h>
 #endif
@@ -126,7 +125,8 @@ static const int	MIN_READ_UNHINTED		= 1024;
 
 static int 			g_iReadUnhinted 		= DEFAULT_READ_UNHINTED;
 
-static int			g_iSplitThresh			= 8192;
+static bool			g_bPseudoSharding		= true;
+static int			g_iPseudoShardingThresh	= 8192;
 
 static bool LOG_LEVEL_SPLIT_QUERY = val_from_env ( "MANTICORE_LOG_SPLIT_QUERY", false ); // verbose logging split query events, ruled by this env variable
 #define LOG_COMPONENT_QUERYINFO __LINE__ << " "
@@ -1274,7 +1274,7 @@ public:
 	SI::Index_i *		Debug_GetSI() const override { return m_pSIdx.get(); }
 
 	bool				CheckEarlyReject ( const CSphVector<CSphFilterSettings> & dFilters, const ISphFilter * pFilter, ESphCollation eCollation, const ISphSchema & tSchema ) const;
-	int64_t				GetPseudoShardingMetric ( const VecTraits_T<const CSphQuery> & dQueries, const VecTraits_T<int64_t> & dMaxCountDistinct, int iThreads, bool & bForceSingleThread ) const override;
+	std::pair<int64_t,int> GetPseudoShardingMetric ( const VecTraits_T<const CSphQuery> & dQueries, const VecTraits_T<int64_t> & dMaxCountDistinct, int iThreads, bool & bForceSingleThread ) const override;
 	int64_t				GetCountDistinct ( const CSphString & sAttr ) const override;
 	int64_t				GetCountFilter ( const CSphFilterSettings & tFilter ) const override;
 	int64_t				GetCount() const override;
@@ -1390,8 +1390,8 @@ private:
 	RowidIterator_i *			SpawnIterators ( const CSphQuery & tQuery, CSphQueryContext & tCtx, CreateFilterContext_t & tFlx, const ISphSchema & tMaxSorterSchema, CSphQueryResultMeta & tMeta, int iCutoff, int iThreads, CSphVector<CSphFilterSettings> & dModifiedFilters, ISphRanker * pRanker ) const;
 	bool						SelectIteratorsFT ( const CSphQuery & tQuery, ISphRanker * pRanker, CSphVector<SecondaryIndexInfo_t> & dSIInfo, int iCutoff, int iThreads, StrVec_t & dWarnings ) const;
 
-	bool						IsQueryFast ( const CSphQuery & tQuery ) const;
-	bool						CheckEnabledIndexes ( const CSphQuery & tQuery, int iThreads, bool & bFastQuery ) const;
+	bool						IsQueryFast ( const CSphQuery & tQuery, const CSphVector<SecondaryIndexInfo_t> & dEnabledIndexes, float fCost ) const;
+	CSphVector<SecondaryIndexInfo_t> GetEnabledIndexes ( const CSphQuery & tQuery, float & fCost, int iThreads ) const;
 
 	Docstore_i *				GetDocstore() const override { return m_pDocstore.get(); }
 	columnar::Columnar_i *		GetColumnar() const override { return m_pColumnar.get(); }
@@ -2145,17 +2145,7 @@ bool CSphIndex::MustRunInSingleThread ( const VecTraits_T<const CSphQuery> & dQu
 		}
 	}
 
-	return GetStats().m_iTotalDocuments<=g_iSplitThresh;
-}
-
-
-int64_t CSphIndex::GetPseudoShardingMetric ( const VecTraits_T<const CSphQuery> & dQueries, const VecTraits_T<int64_t> & dMaxCountDistinct, int iThreads, bool & bForceSingleThread ) const
-{
-	if ( MustRunInSingleThread ( dQueries, false, dMaxCountDistinct, bForceSingleThread ) )
-		return -1;
-
-	int64_t iTotalDocs = GetStats().m_iTotalDocuments;
-	return iTotalDocs > g_iSplitThresh ? iTotalDocs : -1;
+	return GetStats().m_iTotalDocuments<=g_iPseudoShardingThresh;
 }
 
 //////////////////////////////////////////////////////////////////////////
@@ -3007,8 +2997,12 @@ int CSphIndex_VLN::CheckThenKillMulti ( const VecTraits_T<DocID_t>& dKlist, Bloc
 };
 
 
-bool CSphIndex_VLN::IsQueryFast ( const CSphQuery & tQuery ) const
+bool CSphIndex_VLN::IsQueryFast ( const CSphQuery & tQuery, const CSphVector<SecondaryIndexInfo_t> & dEnabledIndexes, float fCost ) const
 {
+	const float COST_THRESH = 0.5f;
+	if ( tQuery.m_sQuery.IsEmpty() )
+		return dEnabledIndexes.GetLength() && fCost<=COST_THRESH;
+
 	if ( m_pFieldFilter )
 		return false;
 
@@ -3059,66 +3053,65 @@ static bool CheckQueryFilters ( const CSphQuery & tQuery, const CSphSchema & tSc
 }
 
 
-bool CSphIndex_VLN::CheckEnabledIndexes ( const CSphQuery & tQuery, int iThreads, bool & bFastQuery ) const
+CSphVector<SecondaryIndexInfo_t> CSphIndex_VLN::GetEnabledIndexes ( const CSphQuery & tQuery, float & fCost, int iThreads ) const
 {
 	// if there's a filter tree, we don't have any indexes and there's no point in wasting time to eval them
 	if ( tQuery.m_dFilterTree.GetLength() )
-		return true;
-
-	const float COST_THRESH = 0.5f;
+		return {};
 
-	float fCost = FLT_MAX;
 	int iCutoff = ApplyImplicitCutoff ( tQuery, {} );
 
 	StrVec_t dWarnings;
 	SelectIteratorCtx_t tCtx ( tQuery, m_tSchema, m_pHistograms, m_pColumnar.get(), m_pSIdx.get(), iCutoff, m_iDocinfo, iThreads );
-	CSphVector<SecondaryIndexInfo_t> dEnabledIndexes = SelectIterators ( tCtx, fCost, dWarnings );
-
-	// disable pseudo sharding if any of the queries use secondary indexes/docid lookups
-	if ( dEnabledIndexes.any_of ( []( const SecondaryIndexInfo_t & tSI ){ return tSI.m_eType==SecondaryIndexType_e::INDEX || tSI.m_eType==SecondaryIndexType_e::LOOKUP; } ) )
-		return false;
-
-	if ( tQuery.m_sQuery.IsEmpty() )
-		bFastQuery = dEnabledIndexes.GetLength() && fCost<=COST_THRESH;
-	else
-		bFastQuery = IsQueryFast(tQuery);
-
-	return true;
+	return SelectIterators ( tCtx, fCost, dWarnings );
 }
 
 
-int64_t CSphIndex_VLN::GetPseudoShardingMetric ( const VecTraits_T<const CSphQuery> & dQueries, const VecTraits_T<int64_t> & dMaxCountDistinct, int iThreads, bool & bForceSingleThread ) const
+std::pair<int64_t,int> CSphIndex_VLN::GetPseudoShardingMetric ( const VecTraits_T<const CSphQuery> & dQueries, const VecTraits_T<int64_t> & dMaxCountDistinct, int iThreads, bool & bForceSingleThread ) const
 {
 	if ( MustRunInSingleThread ( dQueries, !!m_pSIdx, dMaxCountDistinct, bForceSingleThread ) )
-		return -1;
+		return { 0, 1 };
 
 	bool bAllFast = true;
+	int iThreadCap = 0;
+	int iNumProc = GetNumPhysicalCPUs();
+	if ( iNumProc==-1 )
+		iNumProc = GetNumLogicalCPUs()/2;
 
 	ARRAY_FOREACH ( i, dQueries )
 	{
 		auto & tQuery = dQueries[i];
-
-		// only process fullscan queries
 		if ( !tQuery.m_sQuery.IsEmpty() )
 		{
+			// check for fulltext+SI case; limit the number of threads
+			SelectIteratorCtx_t tCtx ( tQuery, m_tSchema, m_pHistograms, m_pColumnar.get(), m_pSIdx.get(), -1, m_iDocinfo, iThreads );
+			if ( !iThreadCap && tQuery.m_dFilters.GetLength() && HaveAvailableSI(tCtx) )
+				iThreadCap = iThreadCap ? Min ( iThreadCap, iNumProc ) : iNumProc;
+
 			bAllFast = false;
 			continue;
 		}
 
 		if ( !CheckQueryFilters ( tQuery, m_tSchema ) )
 			continue;
-		
-		bool bFastQuery = false;
-		if ( !CheckEnabledIndexes ( tQuery, iThreads, bFastQuery ) )
-			return -1; 
 
-		bAllFast &= bFastQuery;
+		float fCost = FLT_MAX;
+		CSphVector<SecondaryIndexInfo_t> dEnabledIndexes = GetEnabledIndexes ( tQuery, fCost, iThreads );
+		bAllFast &= IsQueryFast ( tQuery, dEnabledIndexes, fCost );
+
+		// disable pseudo sharding if any of the queries use docid lookups
+		if ( dEnabledIndexes.any_of ( []( const SecondaryIndexInfo_t & tSI ){ return tSI.m_eType==SecondaryIndexType_e::LOOKUP; } ) )
+			return { 0, 1 };
+
+		// enable pseudo sharding but limit number of threads when we use SI in fullscan
+		if ( dEnabledIndexes.any_of ( []( const SecondaryIndexInfo_t & tSI ){ return tSI.m_eType==SecondaryIndexType_e::INDEX; } ) )
+			iThreadCap = iThreadCap ? Min ( iThreadCap, iNumProc ) : iNumProc;
 	}
 
 	if ( bAllFast )
-		return -1;
+		return { 0, 1 };
 
-	return CSphIndex::GetPseudoShardingMetric ( dQueries, dMaxCountDistinct, iThreads, bForceSingleThread );
+	return  { GetStats().m_iTotalDocuments, iThreadCap };
 }
 
 
@@ -8016,7 +8009,8 @@ bool CSphIndex_VLN::SelectIteratorsFT ( const CSphQuery & tQuery, ISphRanker * p
 	// 4. estimate the cost of intersecting FT and iterator results
 	NodeEstimate_t tEstimate = pRanker->Estimate ( m_iDocinfo );
 
-	SelectIteratorCtx_t tSelectIteratorCtx ( tQuery, m_tSchema, m_pHistograms, m_pColumnar.get(), m_pSIdx.get(), iCutoff, m_iDocinfo, iThreads );
+	// always do single-thread estimates here
+	SelectIteratorCtx_t tSelectIteratorCtx ( tQuery, m_tSchema, m_pHistograms, m_pColumnar.get(), m_pSIdx.get(), iCutoff, m_iDocinfo, 1 );
 	tSelectIteratorCtx.IgnorePushCost();
 	float fBestCost = FLT_MAX;
 	dSIInfo = SelectIterators ( tSelectIteratorCtx, fBestCost, dWarnings );
@@ -8033,11 +8027,12 @@ bool CSphIndex_VLN::SelectIteratorsFT ( const CSphQuery & tQuery, ISphRanker * p
 	CSphVector<SecondaryIndexInfo_t> dSIInfoFilters { dSIInfo.GetLength() };
 	float fValuesAfterFilters = 1.0f;
 	ARRAY_FOREACH ( i, dSIInfo )
-	{
-		dSIInfoFilters[i] = dSIInfo[i];
-		fValuesAfterFilters *= float(dSIInfo[i].m_iRsetEstimate) / m_iDocinfo;
-		dSIInfoFilters[i].m_eType = SecondaryIndexType_e::FILTER;
-	}
+		if ( tQuery.m_dFilters[i].m_sAttrName != "@rowid" )
+		{
+			dSIInfoFilters[i] = dSIInfo[i];
+			fValuesAfterFilters *= float(dSIInfo[i].m_iRsetEstimate) / m_iDocinfo;
+			dSIInfoFilters[i].m_eType = SecondaryIndexType_e::FILTER;
+		}
 
 	// correct rset estimates (we are estimating filters after FT)
 	float fCostOfFilters = 0.0f;
@@ -8060,6 +8055,9 @@ bool CSphIndex_VLN::SelectIteratorsFT ( const CSphQuery & tQuery, ISphRanker * p
 	float fIteratorWithFT = CalcFTIntersectCost ( tIteratorEst, tEstimate, m_iDocinfo, ITERATOR_BLOCK_SIZE, MAX_BLOCK_DOCS );
 	float fFTWithFilters = tEstimate.m_fCost + fCostOfFilters;
 
+	fIteratorWithFT = EstimateMTCostSI ( fIteratorWithFT, iThreads );
+	fFTWithFilters = EstimateMTCost ( fFTWithFilters, iThreads );
+
 	return fIteratorWithFT<fFTWithFilters;
 }
 
@@ -12938,9 +12936,21 @@ void sphSetJsonOptions ( bool bStrict, bool bAutoconvNumbers, bool bKeynamesToLo
 }
 
 
+void SetPseudoSharding ( bool bSet )
+{
+	g_bPseudoSharding = bSet;
+}
+
+
+bool GetPseudoSharding()
+{
+	return g_bPseudoSharding;
+}
+
+
 void SetPseudoShardingThresh ( int iThresh )
 {
-	g_iSplitThresh = iThresh;
+	g_iPseudoShardingThresh = iThresh;
 }
 
 //////////////////////////////////////////////////////////////////////////

+ 3 - 1
src/sphinx.h

@@ -1119,7 +1119,7 @@ public:
 	void						SetMutableSettings ( const MutableIndexSettings_c & tSettings );
 	const MutableIndexSettings_c & GetMutableSettings () const { return m_tMutableSettings; }
 
-	virtual int64_t				GetPseudoShardingMetric ( const VecTraits_T<const CSphQuery> & dQueries, const VecTraits_T<int64_t> & dMaxCountDistinct, int iThreads, bool & bForceSingleThread ) const;
+	virtual std::pair<int64_t,int> GetPseudoShardingMetric ( const VecTraits_T<const CSphQuery> & dQueries, const VecTraits_T<int64_t> & dMaxCountDistinct, int iThreads, bool & bForceSingleThread ) const { return { 0, 0 }; }
 	virtual bool				MustRunInSingleThread ( const VecTraits_T<const CSphQuery> & dQueries, bool bHasSI, const VecTraits_T<int64_t> & dMaxCountDistinct, bool & bForceSingleThread ) const;
 	virtual int64_t				GetCountDistinct ( const CSphString & sAttr ) const { return -1; }	// returns values if index has some meta on its attributes
 	virtual int64_t				GetCountFilter ( const CSphFilterSettings & tFilter ) const { return -1; }	// returns values if index has some meta on its attributes
@@ -1417,6 +1417,8 @@ int					GetUnhintedBuffer();
 /// check query for expressions
 bool				sphHasExpressions ( const CSphQuery & tQuery, const CSphSchema & tSchema );
 
+void				SetPseudoSharding ( bool bSet );
+bool				GetPseudoSharding();
 void				SetPseudoShardingThresh ( int iThresh );
 
 void				InitSkipCache ( int64_t iCacheSize );

+ 25 - 20
src/sphinxrt.cpp

@@ -39,6 +39,8 @@
 #include "indexfiles.h"
 #include "task_dispatcher.h"
 #include "tracer.h"
+#include "pseudosharding.h"
+#include "std/sys.h"
 
 #include <sys/stat.h>
 #include <fcntl.h>
@@ -134,7 +136,7 @@ volatile int &AutoOptimizeCutoffMultiplier() noexcept
 
 volatile int AutoOptimizeCutoff() noexcept
 {
-	static int iAutoOptimizeCutoff = sphCpuThreadsCount() * 2;
+	static int iAutoOptimizeCutoff = GetNumLogicalCPUs() * 2;
 	return iAutoOptimizeCutoff;
 }
 
@@ -1159,6 +1161,7 @@ public:
 	int64_t				GetCountDistinct ( const CSphString & sAttr ) const override;
 	int64_t				GetCountFilter ( const CSphFilterSettings & tFilter ) const override;
 	int64_t				GetCount() const override;
+	std::pair<int64_t,int> GetPseudoShardingMetric ( const VecTraits_T<const CSphQuery> & dQueries, const VecTraits_T<int64_t> & dMaxCountDistinct, int iThreads, bool & bForceSingleThread ) const override;
 
 	// helpers
 	ConstDiskChunkRefPtr_t	MergeDiskChunks (  const char* szParentAction, const ConstDiskChunkRefPtr_t& pChunkA, const ConstDiskChunkRefPtr_t& pChunkB, CSphIndexProgress& tProgress, VecTraits_T<CSphFilterSettings> dFilters );
@@ -6812,37 +6815,28 @@ static int64_t CalcMaxCountDistinct ( const CSphQuery & tQuery, const RtGuard_t
 }
 
 
-static bool CalcDiskChunkSplits ( IntVec_t & dSplits, int iJobs, const CSphQuery & tQuery, const CSphMultiQueryArgs & tArgs, const RtGuard_t & tGuard )
+static bool CalcDiskChunkSplits ( IntVec_t & dThreads, int iJobs, const CSphQuery & tQuery, const CSphMultiQueryArgs & tArgs, const RtGuard_t & tGuard )
 {
-	assert ( dSplits.GetLength()==iJobs );
+	CSphVector<SplitData_t> dSplitData ( iJobs );
 
+	int iMaxThreadsPerIndex = CalcMaxThreadsPerIndex ( tArgs.m_iThreads, iJobs );
 	int64_t iMaxCountDistinct = CalcMaxCountDistinct ( tQuery, tGuard );
-	int iNumSingleThreads = 0;
-	int64_t iTotalMetric = 0;
-	CSphVector<int64_t> dMetrics { iJobs };
-	ARRAY_FOREACH ( i, dMetrics )
+	ARRAY_FOREACH ( i, dSplitData )
 	{
 		bool bForceSingleThread = false;
-		dMetrics[i] = tGuard.m_dDiskChunks[i]->Cidx().GetPseudoShardingMetric ( { &tQuery, 1 }, { &iMaxCountDistinct, 1 }, tArgs.m_iThreads, bForceSingleThread );
+		auto tMetric = tGuard.m_dDiskChunks[i]->Cidx().GetPseudoShardingMetric ( { &tQuery, 1 }, { &iMaxCountDistinct, 1 }, iMaxThreadsPerIndex, bForceSingleThread );
 		if ( bForceSingleThread )
 			return false;
 
-		if ( dMetrics[i]>0 )
-			iTotalMetric += dMetrics[i];
-		else
-			iNumSingleThreads++;
+		assert ( tMetric.first>=0 );
+		auto & tSplitData = dSplitData[i];
+		tSplitData.m_iMetric = tMetric.first;
+		tSplitData.m_iThreadCap = tMetric.second;
 	}
 
 	// we have more free threads than disk chunks; makes sense to apply pseudo_sharding
 	if ( tArgs.m_iThreads>iJobs )
-	{
-		int iLeft = tArgs.m_iThreads - iNumSingleThreads;
-		assert(iLeft>=0);
-
-		ARRAY_FOREACH ( i, dSplits )
-			if ( dMetrics[i]>0 )
-				dSplits[i] = Max ( (int)round ( double ( dMetrics[i] ) / iTotalMetric * iLeft ), 1 );
-	}
+		DistributeThreadsOverIndexes ( dThreads, dSplitData, tArgs.m_iThreads );
 
 	return true;
 }
@@ -8688,6 +8682,17 @@ int64_t RtIndex_c::GetCount() const
 }
 
 
+std::pair<int64_t,int> RtIndex_c::GetPseudoShardingMetric ( const VecTraits_T<const CSphQuery> & dQueries, const VecTraits_T<int64_t> & dMaxCountDistinct, int iThreads, bool & bForceSingleThread ) const
+{
+	if ( MustRunInSingleThread ( dQueries, false, dMaxCountDistinct, bForceSingleThread ) )
+		return { 0, 1 };
+
+	auto tGuard = RtGuard();
+	int iThreadCap = GetPseudoSharding() ? 0 : tGuard.m_dDiskChunks.GetLength();
+	return { GetStats().m_iTotalDocuments, iThreadCap };
+}
+
+
 void RtIndex_c::DropDiskChunk ( int iChunkID, int* pAffected )
 {
 	TRACE_SCHED ( "rt", "RtIndex_c::DropDiskChunk" );

+ 1 - 0
src/stackmock.h

@@ -14,6 +14,7 @@
 
 #include "client_task_info.h"
 #include "coroutine.h"
+#include "std/sys.h"
 
 using StackSizeTuplet_t = std::pair<int,int>; // create, eval
 

+ 0 - 26
src/std/cpuid.h

@@ -1,26 +0,0 @@
-//
-// Copyright (c) 2023, Manticore Software LTD (https://manticoresearch.com)
-// All rights reserved
-//
-// This program is free software; you can redistribute it and/or modify
-// it under the terms of the GNU General Public License. You should have
-// received a copy of the GPL license along with this program; if you
-// did not, you can find it at http://www.gnu.org
-//
-
-#pragma once
-
-#if defined(__x86_64__) || defined(__i386__)
-	#include <cpuid.h>
-#endif
-
-inline bool IsSSE42Supported()
-{
-#if defined(__x86_64__) || defined(__i386__)
- 	uint32_t dInfo[4];
-	__cpuid ( 1, dInfo[0], dInfo[1], dInfo[2], dInfo[3] );
-	return (dInfo[2] & (1 << 20)) != 0;
-#else
-	return true;	// assumes that it's ARM and simde is used
-#endif
-}

+ 6 - 0
src/std/generics.h

@@ -47,6 +47,12 @@ typename std::common_type<T, U>::type Max ( T a, U b )
 	using common_type = typename std::common_type<T, U>::type;
 	return static_cast<common_type>(a) < static_cast<common_type>(b) ? b : a;
 }
+
+inline int sphRoundUp ( int iValue, int iLimit )
+{
+	return ( iValue + iLimit - 1 ) & ~( iLimit - 1 ); // NB: correct only when iLimit is a power of two
+}
+
 #define SafeDelete( _x )		do { if ( _x ) { delete ( _x ); ( _x ) = nullptr; } } while ( 0 )
 #define SafeDeleteArray( _x )	do { if ( _x ) { delete[] ( _x ); ( _x ) = nullptr; } } while ( 0 )
 #define SafeReleaseAndZero( _x)	do { if ( _x ) { ( _x )->Release(); ( _x ) = nullptr; } } while ( 0 )

+ 3 - 1
src/std/ints.h

@@ -51,7 +51,9 @@
 
 #define strcasecmp strcmpi
 #define strncasecmp _strnicmp
-#define snprintf _snprintf
+#if _MSC_VER < 1900
+	#define snprintf _snprintf
+#endif
 #define strtoll _strtoi64
 #define strtoull _strtoui64
 

+ 89 - 14
src/std/sys.cpp

@@ -7,48 +7,123 @@
 // This program is free software; you can redistribute it and/or modify
 // it under the terms of the GNU General Public License. You should have
 // received a copy of the GPL license along with this program; if you
-// did not, you can find it at http://www.gnu.org
+// did not, you can find it at http://www.gnu.org/
 //
 
+
 #include "sys.h"
+#include "fixedvector.h"
+#include "string.h"
+#include <stdlib.h>
+#include <set>
 
-#if _WIN32
+#if defined(__x86_64__) || defined(__i386__)
+	#include <cpuid.h>
+#endif
+
+#if !_WIN32
+	#include <unistd.h>
+#endif
 
-#include "ints.h"
 
-int sphCpuThreadsCount()
+int GetNumLogicalCPUs()
 {
+#if _WIN32
 	SYSTEM_INFO tInfo;
 	GetSystemInfo ( &tInfo );
 	return tInfo.dwNumberOfProcessors;
+#else
+	return sysconf ( _SC_NPROCESSORS_ONLN );
+#endif
 }
 
-static int GetMemPageSize()
+
+int GetNumPhysicalCPUs()
 {
-	SYSTEM_INFO tInfo;
-	GetSystemInfo ( &tInfo );
-	return tInfo.dwPageSize;
-}
+#if _WIN32
+	DWORD uResponseSize = 0;
+	if ( GetLogicalProcessorInformationEx ( RelationProcessorCore, nullptr, &uResponseSize ) )
+		return -1;
+
+	if ( GetLastError()!=ERROR_INSUFFICIENT_BUFFER )
+		return -1;
+
+	CSphFixedVector<uint8_t> dBuffer ( uResponseSize );
+	if ( !GetLogicalProcessorInformationEx ( RelationProcessorCore, (SYSTEM_LOGICAL_PROCESSOR_INFORMATION_EX *)dBuffer.Begin(), &uResponseSize ) )
+		return -1;
 
+	int iNumCPUs = 0;
+	uint8_t * pPtr = dBuffer.Begin();
+	uint8_t * pMax = pPtr + uResponseSize;
+	while ( pPtr < pMax )
+	{
+		auto pInfo = (SYSTEM_LOGICAL_PROCESSOR_INFORMATION_EX *)pPtr;
+		pPtr += pInfo->Size;
+		iNumCPUs++;
+	}
+
+	return iNumCPUs;
 #else
+	FILE * pFile = fopen ( "/proc/cpuinfo", "r" );
+	if ( !pFile )
+		return -1;
+
+	using ProcCore_t = std::pair<size_t, size_t>;
+	std::set<ProcCore_t> tPCs;
+
+	ProcCore_t tPC;
+	char szLine[1024];
+	while ( fgets ( szLine, sizeof(szLine), pFile ) )
+	{
+		const char * pCol = strchr ( szLine, ':' );
+		if ( !pCol )
+			continue;
 
-#include <unistd.h>
+		CSphString sKey ( szLine, pCol-szLine );
+		CSphString sValue ( pCol+1 );
+		sKey.Trim();
+		sValue.Trim();
+		if ( sKey.Begins ( "physical id" ) )
+			tPC.first = atoi ( sValue.cstr() );
+		else if ( sKey.Begins ( "core id" ) )
+		{
+			tPC.second = atoi ( sValue.cstr() );
+			tPCs.insert(tPC);
+		}
+	}
 
-int sphCpuThreadsCount()
+	fclose(pFile);
+	return tPCs.empty() ? -1 : (int)tPCs.size();
+#endif
+}
+
+
+bool IsSSE42Supported()
 {
-	return sysconf ( _SC_NPROCESSORS_ONLN );
+#if defined(__x86_64__) || defined(__i386__)
+	uint32_t dInfo[4];
+	__cpuid ( 1, dInfo[0], dInfo[1], dInfo[2], dInfo[3] );
+	return (dInfo[2] & (1 << 20)) != 0;
+#else
+	return true;	// assumes that it's ARM and simde is used
+#endif
 }
 
 
 static int GetMemPageSize()
 {
+#if _WIN32
+	SYSTEM_INFO tInfo;
+	GetSystemInfo ( &tInfo );
+	return tInfo.dwPageSize;
+#else
 	return getpagesize();
+#endif
 }
 
-#endif
 
 int sphGetMemPageSize()
 {
 	static int iMemPageSize = GetMemPageSize();
 	return iMemPageSize;
-}
+}

+ 4 - 8
src/std/sys.h

@@ -12,12 +12,8 @@
 
 #pragma once
 
+int GetNumLogicalCPUs();
+int GetNumPhysicalCPUs();	// may return -1 if fails
+bool IsSSE42Supported();
 
-int sphCpuThreadsCount();
-
-int sphGetMemPageSize();
-
-inline int sphRoundUp ( int iValue, int iLimit )
-{
-	return ( iValue + iLimit - 1 ) & ~( iLimit - 1 );
-}
+int sphGetMemPageSize();

File diff suppressed because it is too large
+ 0 - 0
test/test_261/model.bin


Some files were not shown because too many files changed in this diff