Browse Source

allow to work with `cluster:table`

That is not just cluster:table, but `cluster:table`.
For mysqldump-specific commands user should match cluster_user (yes,
this is not normal-flavour, this is specific hack). For insert/replace
user is not checked (so, replaying dumps doesn't require specific user).

That is related to #2249
Aleksey N. Vinogradov 1 year ago
parent
commit
5cc85bf0fa
5 changed files with 66 additions and 10 deletions
  1. 63 10
      src/searchd.cpp
  2. 1 0
      src/sphinxql.l
  3. 1 0
      src/sphinxql_extra.l
  4. 0 0
      test/test_383/model.bin
  5. 1 0
      test/test_383/test.xml

+ 63 - 10
src/searchd.cpp

@@ -7182,6 +7182,12 @@ void SearchHandler_c::RunSubset ( int iStart, int iEnd )
 		return;
 	}
 
+	if ( m_dNQueries[0].m_iLimit==-1 && ( !dRemotes.IsEmpty () || m_dLocal.GetLength ()>1 ) )
+	{
+		m_sError << "only one local table allowed in streaming select";
+		return;
+	}
+
 	// select lists must have no expressions
 	if ( m_bMultiQueue )
 		m_bMultiQueue = AllowsMulti ();
@@ -11222,6 +11228,20 @@ void sphHandleMysqlInsert ( StmtErrorReporter_i & tOut, const SqlStmt_t & tStmt
 	StatCountCommandDetails ( SearchdStats_t::eReplace, tStmt.m_iRowsAffected, tmStart );
 }
 
+// when index name came as `cluster:name`, it came to the name, and should be splitted to cluster and index name
+void MaybeFixupIndexNameFromMysqldump ( SqlStmt_t & tStmt )
+{
+	if ( g_pLocalIndexes->Contains ( tStmt.m_sIndex ) )
+		return;
+
+	auto dParts = sphSplit ( tStmt.m_sIndex.cstr (), ":" );
+	if ( dParts.GetLength ()!=2 )
+		return;
+
+	tStmt.m_sCluster = dParts[0];
+	tStmt.m_sIndex = dParts[1];
+}
+
 static bool AddDocument ( const SqlStmt_t & tStmt, cServedIndexRefPtr_c & pServed, StmtErrorReporter_i & tOut )
 {
 	auto * pSession = session::GetClientSession();
@@ -12472,13 +12492,26 @@ void DescribeDistributedSchema ( VectorLike& dOut, const cDistributedIndexRefPtr
 	}
 }
 
+inline static bool ClusterFlavour () noexcept
+{
+	return !g_sClusterUser.IsEmpty () && session::GetClientSession ()->m_sUser==g_sClusterUser;
+}
+
 
 void HandleMysqlDescribe ( RowBuffer_i & tOut, const SqlStmt_t * pStmt )
 {
 	auto & tStmt = *pStmt;
 	VectorLike dOut ( tStmt.m_sStringParam, 0 );
+	auto sName = tStmt.m_sIndex;
 
-	auto pServed = GetServed ( tStmt.m_sIndex );
+	if ( ClusterFlavour() )
+	{
+		auto dParts = sphSplit( tStmt.m_sIndex.cstr(), ":");
+		if ( dParts.GetLength()>1 )
+			sName = dParts[1];
+	}
+
+	auto pServed = GetServed ( sName );
 	if ( pServed )
 	{
 		// data
@@ -12504,7 +12537,7 @@ void HandleMysqlDescribe ( RowBuffer_i & tOut, const SqlStmt_t * pStmt )
 		DescribeLocalSchema ( dOut, tSchema, pServed->m_eType==IndexType_e::TEMPLATE, bShowFields );
 	} else
 	{
-		auto pDistr = GetDistr ( tStmt.m_sIndex );
+		auto pDistr = GetDistr ( sName );
 		if ( !pDistr )
 		{
 			tOut.ErrorAbsent ( "no such table '%s'", tStmt.m_sIndex.cstr () );
@@ -12580,9 +12613,7 @@ CSphVector<NamedIndexType_t> GetAllServedIndexes()
 void HandleMysqlShowTables ( RowBuffer_i & tOut, const SqlStmt_t * pStmt )
 {
 	auto dIndexes = GetAllServedIndexes();
-	bool bWithClusters = false;
-	if ( !g_sClusterUser.IsEmpty() && session::GetClientSession ()->m_sUser==g_sClusterUser )
-		bWithClusters = true;
+	bool bWithClusters = ClusterFlavour();
 
 	// output the results
 	VectorLike dTable ( pStmt->m_sStringParam, { "Index", "Type" } );
@@ -15805,24 +15836,31 @@ void HandleMysqlShowFederatedIndexStatus ( RowBuffer_i & tOut, const SqlStmt_t &
 	CheckLike tSelector { tStmt.m_sStringParam.cstr() };
 	auto dIndexes = GetAllServedIndexes();
 
+	bool bWithClusters = ClusterFlavour();
+
 	// fake stat for distrs
 	CSphSourceStats tFakeStats;
 	tFakeStats.m_iTotalDocuments = 1000; // TODO: check is it worth to query that number from agents
 
 	for ( const NamedIndexType_t& tIndex : dIndexes )
 	{
-		if ( !tSelector.Match ( tIndex.m_sName.cstr() ) )
+		CSphString sFullName;
+		if ( bWithClusters && !tIndex.m_sCluster.IsEmpty () )
+			sFullName.SetSprintf ("%s:%s", tIndex.m_sCluster.cstr(), tIndex.m_sName.cstr());
+		const CSphString& sName = ( bWithClusters && !tIndex.m_sCluster.IsEmpty () ) ? sFullName : tIndex.m_sName;
+
+		if ( !tSelector.Match ( sName.cstr() ) )
 			continue;
 
 		if ( tIndex.m_eType == IndexType_e::DISTR )
-			AddFederatedIndexStatusLine ( tFakeStats, tIndex.m_sName, tOut );
+			AddFederatedIndexStatusLine ( tFakeStats, sName, tOut );
 		else {
 			auto pServed = GetServed ( tIndex.m_sName );
 			if ( !pServed )
 				continue; // really rare case when between GetAllServedIndexes and that moment table was removed.
 			RIdx_c pIndex { pServed };
 			assert ( pIndex );
-			AddFederatedIndexStatusLine ( pIndex->GetStats(), tIndex.m_sName, tOut );
+			AddFederatedIndexStatusLine ( pIndex->GetStats(), sName, tOut );
 		}
 	}
 
@@ -16996,14 +17034,28 @@ bool ClientSession_c::Execute ( Str_t sQuery, RowBuffer_i & tOut )
 	switch ( eStmt )
 	{
 	case STMT_PARSE_ERROR:
-		FreezeLastMeta();
-		tOut.Error ( m_sError.cstr() );
+		if ( m_sError.IsEmpty() )
+			tOut.Ok ( 0 );
+		else {
+			FreezeLastMeta();
+			tOut.Error ( m_sError.cstr() );
+		}
 		return true;
 
 	case STMT_SELECT:
 		{
 			MEMORY ( MEM_SQL_SELECT );
 
+			if ( ClusterFlavour () )
+			{
+				auto dParts = sphSplit ( pStmt->m_sIndex.cstr (), ":" );
+				if ( dParts.GetLength ()>1 )
+				{
+					pStmt->m_sCluster = dParts[0];
+					pStmt->m_tQuery.m_sIndexes = pStmt->m_sIndex = dParts[1];
+				}
+			}
+
 			StatCountCommand ( SEARCHD_COMMAND_SEARCH );
 			auto tmStart = sphMicroTimer();
 			SearchHandler_c tHandler ( 1, sphCreatePlainQueryParser(), QUERY_SQL, true );
@@ -17056,6 +17108,7 @@ bool ClientSession_c::Execute ( Str_t sQuery, RowBuffer_i & tOut )
 	case STMT_REPLACE:
 		{
 			StmtErrorReporter_c tErrorReporter ( tOut );
+			MaybeFixupIndexNameFromMysqldump ( *pStmt );
 			sphHandleMysqlInsert ( tErrorReporter, *pStmt );
 			return true;
 		}

+ 1 - 0
src/sphinxql.l

@@ -243,6 +243,7 @@ FLOAT_CONSTANT      {INT}\.{INT}?{EXP}?|{INT}?\.{INT}{EXP}|{INT}{EXP}
 {ALNUM}+    		{ YYSTOREBOUNDS; return TOK_BAD_NUMERIC; }
 @@{ID_SYS}			{ YYSTOREBOUNDS; return TOK_SYSVAR; }
 `{ID}`				{ YYSTOREBOUNDS; ++lvalp->m_iStart; --lvalp->m_iEnd; return TOK_IDENT; }
+`{ID}:{ID}`			{ YYSTOREBOUNDS; ++lvalp->m_iStart; --lvalp->m_iEnd; return TOK_IDENT; }
 `{INT}{ID}`		    { YYSTOREBOUNDS; return TOK_BACKIDENT; } // allow `123xyz` column names
 
 {SPACE}+

+ 1 - 0
src/sphinxql_extra.l

@@ -105,6 +105,7 @@ FLOAT_CONSTANT      ({INT}\.{INT}?|{INT}?\.{INT}){EXPONENT}?
 
 {ID}				{ STORE_BOUNDS; return TOK_IDENT; }
 `{ID}`				{ STORE_BOUNDS; ++lvalp->m_iStart; --lvalp->m_iEnd; return TOK_IDENT; }
+`{ID}:{ID}`			{ STORE_BOUNDS; ++lvalp->m_iStart; --lvalp->m_iEnd; return TOK_IDENT; }
 `{DIGIT}+{ID}`		{ STORE_BOUNDS; ++lvalp->m_iStart; --lvalp->m_iEnd; return TOK_IDENT; } // allow `123xyz` column names
 
 {SPACE}+

File diff suppressed because it is too large
+ 0 - 0
test/test_383/model.bin


+ 1 - 0
test/test_383/test.xml

@@ -154,6 +154,7 @@ insert into test:rtc (title,mva1,gid,j1) values ('test 501', (521,522,523), 521,
 <sphinxql d="2">
 UPDATE test:rt1 SET mva10=(31,33) where id=3;
 insert into test:rt1 (id,title,mva1) values (601, 'test 201', (621,622));
+insert into `test:rt1` (id,title,mva1) values (602, 'test 383', (622,623));
 </sphinxql>
 <!-- request to node 1 -->
 <sphinxql d="1">

Some files were not shown because too many files changed in this diff