Browse Source

added attribute updates alpha

git-svn-id: svn://svn.sphinxsearch.com/sphinx/trunk@557 406a0c4d-033a-0410-8de8-e80135713968
shodan 19 years ago
parent
commit
ee50c4b33e
4 changed files with 250 additions and 21 deletions
  1. 68 0
      api/sphinxapi.php
  2. 63 1
      src/searchd.cpp
  3. 96 18
      src/sphinx.cpp
  4. 23 2
      src/sphinx.h

+ 68 - 0
api/sphinxapi.php

@@ -20,10 +20,12 @@
 /// known searchd commands
 define ( "SEARCHD_COMMAND_SEARCH",	0 );
 define ( "SEARCHD_COMMAND_EXCERPT",	1 );
+define ( "SEARCHD_COMMAND_UPDATE",	2 );
 
 /// current client-side command implementation versions
 define ( "VER_COMMAND_SEARCH",		0x106 );
 define ( "VER_COMMAND_EXCERPT",		0x100 );
+define ( "VER_COMMAND_UPDATE",		0x100 );
 
 /// known searchd status codes
 define ( "SEARCHD_OK",				0 );
@@ -584,6 +586,72 @@ class SphinxClient
 
 		return $res;
 	}
+
+	/////////////////////////////////////////////////////////////////////////////
+	// attribute updates
+	/////////////////////////////////////////////////////////////////////////////
+
+	/// update specified attributes on specified documents
+	///
+	/// $index is a name of the index to be updated
+	/// $attrs is an array of attribute name strings
+	/// $values is a hash where key is document id, and value is an array of
+	///		new attribute values
+	///
+	/// returns number of actually updated documents (0 or more) on success
+	/// returns -1 on failure
+	///
+	/// usage example:
+	///		$cl->UpdateAttributes ( array("group"), array(123=>array(456)) );
+	function UpdateAttributes ( $index, $attrs, $values )
+	{
+		// verify everything
+		assert ( is_string($index) );
+
+		assert ( is_array($attrs) );
+		foreach ( $attrs as $attr )
+			assert ( is_string($attr) );
+
+		assert ( is_array($values) );
+		foreach ( $values as $id=>$entry )
+		{
+			assert ( is_int($id) );
+			assert ( is_array($entry) );
+			assert ( count($entry)==count($attrs) );
+			foreach ( $entry as $v )
+				assert ( is_int($v) );
+		}
+
+		// build request
+		$req = pack ( "N", strlen($index) ) . $index;
+
+		$req .= pack ( "N", count($attrs) );
+		foreach ( $attrs as $attr )
+			$req .= pack ( "N", strlen($attr) ) . $attr;
+
+		$req .= pack ( "N", count($values) );
+		foreach ( $values as $id=>$entry )
+		{
+			$req .= pack ( "N", $id );
+			foreach ( $entry as $v )
+				$req .= pack ( "N", $v );
+		}
+
+		// connect, send query, get response
+		if (!( $fp = $this->_Connect() ))
+			return -1;
+
+		$len = strlen($req);
+		$req = pack ( "nnN", SEARCHD_COMMAND_UPDATE, VER_COMMAND_UPDATE, $len ) . $req; // add header
+		fwrite ( $fp, $req, $len+8 );
+
+		if (!( $response = $this->_GetResponse ( $fp, VER_COMMAND_UPDATE ) ))
+			return -1;
+
+		// parse response
+		list(,$updated) = unpack ( "N*", substr ( $response, $p, 4 ) );
+		return $updated;
+	}
 }
 
 //

+ 63 - 1
src/searchd.cpp

@@ -161,6 +161,7 @@ enum SearchdCommand_e
 {
 	SEARCHD_COMMAND_SEARCH		= 0,
 	SEARCHD_COMMAND_EXCERPT		= 1,
+	SEARCHD_COMMAND_UPDATE		= 2,
 
 	SEARCHD_COMMAND_TOTAL
 };
@@ -170,7 +171,8 @@ enum SearchdCommand_e
 enum
 {
 	VER_COMMAND_SEARCH		= 0x106,
-	VER_COMMAND_EXCERPT		= 0x100
+	VER_COMMAND_EXCERPT		= 0x100,
+	VER_COMMAND_UPDATE		= 0x100
 };
 
 
@@ -2799,6 +2801,65 @@ void HandleCommandExcerpt ( int iSock, int iVer, InputBuffer_c & tReq )
 	assert ( tOut.GetError()==true || tOut.GetSentCount()==iRespLen+8 );
 }
 
+/////////////////////////////////////////////////////////////////////////////
+
+void HandleCommandUpdate ( int iSock, int iVer, InputBuffer_c & tReq )
+{
+	// check major command version
+	if ( (iVer>>8)!=(VER_COMMAND_UPDATE>>8) )
+	{
+		tReq.SendErrorReply ( "major command version mismatch (expected v.%d.x, got v.%d.%d)",
+			VER_COMMAND_UPDATE>>8, iVer>>8, iVer&0xff );
+		return;
+	}
+
+	// obtain and check index name
+	CSphString sIndex = tReq.GetString ();
+	if ( !g_hIndexes ( sIndex ) )
+	{
+		tReq.SendErrorReply ( "invalid local index '%s' specified in request", sIndex.cstr() );
+		return;
+	}
+	assert ( g_hIndexes[sIndex].m_pIndex );
+
+	// obtain update data
+	CSphAttrUpdate_t tUpd;
+	tUpd.m_dAttrs.Resize ( tReq.GetDword() ); // FIXME! check this
+	ARRAY_FOREACH ( i, tUpd.m_dAttrs )
+		tUpd.m_dAttrs[i].m_sName = tReq.GetString ();
+
+	int iStride = 1+tUpd.m_dAttrs.GetLength();
+	tUpd.m_iUpdates = tReq.GetInt (); // FIXME! check this
+	tUpd.m_pUpdates = new DWORD [ tUpd.m_iUpdates*iStride ];
+	for ( int i=0; i<tUpd.m_iUpdates*iStride; i++ )
+		tUpd.m_pUpdates[i] = tReq.GetDword ();
+
+	// check buffer
+	if ( tReq.GetError() )
+	{
+		tReq.SendErrorReply ( "invalid or truncated request" );
+		return;
+	}
+
+	// do update
+	CSphString sError;
+	int iUpdated = g_hIndexes[sIndex].m_pIndex->UpdateAttributes ( tUpd, sError );
+	if ( iUpdated<0 )
+	{
+		tReq.SendErrorReply ( "index '%s': %s", sIndex.cstr(), sError.cstr() );
+		return;
+	}
+
+	NetOutputBuffer_c tOut ( iSock );
+	tOut.SendWord ( SEARCHD_OK );
+	tOut.SendWord ( VER_COMMAND_UPDATE);
+	tOut.SendInt ( sizeof(DWORD) );
+	tOut.SendInt ( iUpdated );
+	tOut.Flush ();
+	assert ( tOut.GetError()==true || tOut.GetSentCount()==8+sizeof(DWORD) );
+}
+
+/////////////////////////////////////////////////////////////////////////////
 
 void HandleClient ( int iSock, const char * sClientIP )
 {
@@ -2853,6 +2914,7 @@ void HandleClient ( int iSock, const char * sClientIP )
 	{
 		case SEARCHD_COMMAND_SEARCH:	HandleCommandSearch ( iSock, iCommandVer, tBuf ); break;
 		case SEARCHD_COMMAND_EXCERPT:	HandleCommandExcerpt ( iSock, iCommandVer, tBuf ); break;
+		case SEARCHD_COMMAND_UPDATE:	HandleCommandUpdate ( iSock, iCommandVer, tBuf ); break;
 		default:						assert ( 0 && "INTERNAL ERROR: unhandled command" ); break;
 	}
 }

+ 96 - 18
src/sphinx.cpp

@@ -485,7 +485,7 @@ public:
 	}
 
 public:
-	/// read-only accessor
+	/// accessor
 	inline const T & operator [] ( int iIndex )
 	{
 		return m_pData[iIndex];
@@ -1505,11 +1505,14 @@ struct CSphIndex_VLN : CSphIndex
 								~CSphIndex_VLN ();
 
 	virtual int					Build ( CSphDict * pDict, const CSphVector < CSphSource * > & dSources, int iMemoryLimit, ESphDocinfo eDocinfo );
+
+	virtual const CSphSchema *	Preload ();
 	virtual CSphQueryResult *	Query ( CSphDict * pDict, CSphQuery * pQuery);
 	virtual bool				QueryEx ( CSphDict * pDict, CSphQuery * pQuery, CSphQueryResult * pResult, ISphMatchQueue * pTop );
-	virtual const CSphSchema *	Preload ();
-	bool						Merge ( CSphIndex * pSource );	
-	void						MergeWordData ( CSphWordRecord & tDstWord, CSphWordRecord & tSrcWord );
+
+	virtual bool				Merge ( CSphIndex * pSource );	
+
+	virtual int					UpdateAttributes ( const CSphAttrUpdate_t & tUpd, CSphString & sError );
 
 private:
 	static const int			WORDLIST_CHECKPOINT		= 1024;		///< wordlist checkpoints frequency
@@ -1579,8 +1582,11 @@ private:
 	bool						MatchBoolean ( const CSphQuery * pQuery, CSphDict * pDict, ISphMatchQueue * pTop, CSphQueryResult * pResult, int iDoclistFD );
 	bool						MatchExtended ( const CSphQuery * pQuery, CSphDict * pDict, ISphMatchQueue * pTop, CSphQueryResult * pResult, int iDoclistFD );
 
+	const DWORD *				FindDocinfo ( DWORD uDocID );
 	void						LookupDocinfo ( CSphDocInfo & tMatch );
 
+	void						MergeWordData ( CSphWordRecord & tDstWord, CSphWordRecord & tSrcWord );
+
 public:
 	// FIXME! this needs to be protected, and refactored as well
 	bool						SetupQueryWord ( CSphQueryWord & tWord, int iFD );
@@ -3263,6 +3269,21 @@ CSphQueryResult::~CSphQueryResult ()
 {
 }
 
+/////////////////////////////////////////////////////////////////////////////
+// ATTR UPDATE
+/////////////////////////////////////////////////////////////////////////////
+
+CSphAttrUpdate_t::CSphAttrUpdate_t ()
+	: m_iUpdates ( 0 )
+	, m_pUpdates ( NULL )
+{}
+
+
+CSphAttrUpdate_t::~CSphAttrUpdate_t ()
+{
+	SafeDeleteArray ( m_pUpdates );
+}
+
 /////////////////////////////////////////////////////////////////////////////
 // CHUNK READER
 /////////////////////////////////////////////////////////////////////////////
@@ -3527,6 +3548,52 @@ CSphIndex_VLN::~CSphIndex_VLN ()
 {
 }
 
+/////////////////////////////////////////////////////////////////////////////
+
+int CSphIndex_VLN::UpdateAttributes ( const CSphAttrUpdate_t & tUpd, CSphString & sError )
+{
+	assert ( tUpd.m_pUpdates || !tUpd.m_iUpdates );
+
+	// error message buffer
+	char sBuf [ 1024 ];
+
+	// remap update schema to index schema
+	CSphVector<int,8> dAttrIndex;
+	ARRAY_FOREACH ( i, tUpd.m_dAttrs )
+	{
+		int iIndex = m_tSchema.GetAttrIndex ( tUpd.m_dAttrs[i].m_sName.cstr() );
+		if ( iIndex<0 )
+		{
+			snprintf ( sBuf, sizeof(sBuf), "attribute '%s' not found", tUpd.m_dAttrs[i].m_sName.cstr() );
+			sError = sBuf;
+			return -1;
+		}
+		dAttrIndex.Add ( 1+iIndex );
+	}
+	assert ( dAttrIndex.GetLength()==tUpd.m_dAttrs.GetLength() );
+
+	// do update
+	int iUpdated = 0;
+	int iStride = 1+dAttrIndex.GetLength();
+	const DWORD * pUpdate = tUpd.m_pUpdates;
+
+	for ( int iUpd=0; iUpd<tUpd.m_iUpdates; iUpd++, pUpdate+=iStride )
+	{
+		DWORD * pEntry = const_cast < DWORD * > ( FindDocinfo ( pUpdate[0] ) );
+		if ( !pEntry )
+			continue;
+
+		assert ( pEntry[0]==pUpdate[0] );
+		ARRAY_FOREACH ( i, dAttrIndex )
+			pEntry [ dAttrIndex[i] ] = pUpdate[1+i];
+
+		iUpdated++;
+	}
+
+	return iUpdated;
+}
+
+/////////////////////////////////////////////////////////////////////////////
 
 #define SPH_CMPHIT_LESS(a,b) \
 	(a.m_iWordID < b.m_iWordID || \
@@ -5638,29 +5705,27 @@ static inline bool sphMatchEarlyReject ( const CSphMatch & tMatch, const CSphQue
 }
 
 
-void CSphIndex_VLN::LookupDocinfo ( CSphDocInfo & tMatch )
+const DWORD * CSphIndex_VLN::FindDocinfo ( DWORD uDocID )
 {
 	if ( m_iDocinfo<=0 )
-		return;
+		return NULL;
 
 	assert ( m_eDocinfo==SPH_DOCINFO_EXTERN );
 	assert ( !m_pDocinfo.IsEmpty() );
 	assert ( m_tSchema.m_dAttrs.GetLength() );
-	assert ( tMatch.m_pAttrs );
-	assert ( tMatch.m_iAttrs==m_tSchema.m_dAttrs.GetLength() );
 
-	DWORD uHash = ( tMatch.m_iDocID - m_pDocinfo[0] ) >> m_iDocinfoIdShift;
+	DWORD uHash = ( uDocID - m_pDocinfo[0] ) >> m_iDocinfoIdShift;
 	int iStart = m_pDocinfoHash [ uHash ];
 	int iEnd = m_pDocinfoHash [ uHash+1 ] - 1;
 
 	int iStride = 1 + m_tSchema.m_dAttrs.GetLength();
 	const DWORD * pFound = NULL;
 
-	if ( tMatch.m_iDocID==m_pDocinfo [ iStart*iStride ] )
+	if ( uDocID==m_pDocinfo [ iStart*iStride ] )
 	{
 		pFound = &m_pDocinfo [ iStart*iStride ];
 
-	} else if ( tMatch.m_iDocID==m_pDocinfo [ iEnd*iStride ] )
+	} else if ( uDocID==m_pDocinfo [ iEnd*iStride ] )
 	{
 		pFound = &m_pDocinfo [ iEnd*iStride ];
 
@@ -5670,19 +5735,19 @@ void CSphIndex_VLN::LookupDocinfo ( CSphDocInfo & tMatch )
 		{
 			// check if nothing found
 			if (
-				tMatch.m_iDocID < m_pDocinfo [ iStart*iStride ] ||
-				tMatch.m_iDocID > m_pDocinfo [ iEnd*iStride ] )
+				uDocID < m_pDocinfo [ iStart*iStride ] ||
+				uDocID > m_pDocinfo [ iEnd*iStride ] )
 					break;
-			assert ( tMatch.m_iDocID > m_pDocinfo [ iStart*iStride ] );
-			assert ( tMatch.m_iDocID < m_pDocinfo [ iEnd*iStride ] );
+			assert ( uDocID > m_pDocinfo [ iStart*iStride ] );
+			assert ( uDocID < m_pDocinfo [ iEnd*iStride ] );
 
 			int iMid = iStart + (iEnd-iStart)/2;
-			if ( tMatch.m_iDocID==m_pDocinfo [ iMid*iStride ] )
+			if ( uDocID==m_pDocinfo [ iMid*iStride ] )
 			{
 				pFound = &m_pDocinfo [ iMid*iStride ];
 				break;
 			}
-			if ( tMatch.m_iDocID<m_pDocinfo [ iMid*iStride ] )
+			if ( uDocID<m_pDocinfo [ iMid*iStride ] )
 				iEnd = iMid;
 			else
 				iStart = iMid;
@@ -5690,7 +5755,20 @@ void CSphIndex_VLN::LookupDocinfo ( CSphDocInfo & tMatch )
 	}
 
 	assert ( pFound );
-	memcpy ( tMatch.m_pAttrs, pFound+1, tMatch.m_iAttrs*sizeof(DWORD) );
+	return pFound;
+}
+
+
+void CSphIndex_VLN::LookupDocinfo ( CSphDocInfo & tMatch )
+{
+	const DWORD * pFound = FindDocinfo ( tMatch.m_iDocID );
+	if ( pFound )
+	{
+		assert ( tMatch.m_pAttrs );
+		assert ( tMatch.m_iAttrs==m_tSchema.m_dAttrs.GetLength() );
+		assert ( pFound[0]==tMatch.m_iDocID );
+		memcpy ( tMatch.m_pAttrs, pFound+1, tMatch.m_iAttrs*sizeof(DWORD) );
+	}
 }
 
 

+ 23 - 2
src/sphinx.h

@@ -950,6 +950,21 @@ public:
 	virtual					~CSphQueryResult ();	///< dtor, which releases all owned stuff
 };
 
+/////////////////////////////////////////////////////////////////////////////
+// ATTRIBUTE UPDATE QUERY
+/////////////////////////////////////////////////////////////////////////////
+
+struct CSphAttrUpdate_t
+{
+	CSphVector<CSphColumnInfo,8>	m_dAttrs;		///< update schema (ie. what attrs to update)
+	int								m_iUpdates;		///< updates count
+	DWORD *							m_pUpdates;		///< updates data
+
+public:
+	CSphAttrUpdate_t ();		///< builds new clean structure
+	~CSphAttrUpdate_t ();		
+};
+
 /////////////////////////////////////////////////////////////////////////////
 // FULLTEXT INDICES
 /////////////////////////////////////////////////////////////////////////////
@@ -1001,11 +1016,17 @@ public:
 
 public:
 	virtual int					Build ( CSphDict * dict, const CSphVector < CSphSource * > & dSources, int iMemoryLimit, ESphDocinfo eDocinfo ) = 0;
+
+	virtual const CSphSchema *	Preload () = 0;
 	virtual CSphQueryResult *	Query ( CSphDict * dict, CSphQuery * pQuery ) = 0;
 	virtual bool				QueryEx ( CSphDict * dict, CSphQuery * pQuery, CSphQueryResult * pResult, ISphMatchQueue * pTop ) = 0;
-	virtual bool				Merge( CSphIndex * pSource ) = 0;
 
-	virtual const CSphSchema *	Preload () = 0;
+	virtual bool				Merge ( CSphIndex * pSource ) = 0;
+
+	/// updates memory-cached attributes in real time
+	/// returns non-negative amount of actually found and updated records on success
+	/// returns -1 and fills sError on failure
+	virtual int					UpdateAttributes ( const CSphAttrUpdate_t & tUpd, CSphString & sError ) = 0;
 
 protected:
 	ProgressCallback_t *		m_pProgress;