receiver_ctx.cpp 7.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244
  1. //
  2. // Copyright (c) 2017-2026, Manticore Software LTD (https://manticoresearch.com)
  3. // Copyright (c) 2001-2016, Andrew Aksyonoff
  4. // Copyright (c) 2008-2016, Sphinx Technologies Inc
  5. // All rights reserved
  6. //
  7. // This program is free software; you can redistribute it and/or modify
  8. // it under the terms of the GNU General Public License. You should have
  9. // received a copy of the GPL license along with this program; if you
  10. // did not, you can find it at http://www.gnu.org
  11. //
  12. #include "receiver_ctx.h"
  13. #include "tracer.h"
  14. #include "sphinxpq.h"
  15. #include "accumulator.h"
  16. #include "memio.h"
  17. #include "serialize.h"
  18. #include "sphinxrt.h"
  19. #include "searchdaemon.h"
  20. #include "searchdreplication.h"
  21. // verbose logging of replcating transactions, ruled by this env variable
  22. static bool LOG_LEVEL_RPL_TNX = val_from_env ( "MANTICORE_LOG_RPL_TNX", false );
  23. #define LOG_COMPONENT_RPL_TNX ""
  24. #define RPL_TNX LOGMSG ( RPL_DEBUG, RPL_TNX, RPL_TNX )
  25. // data passed to Galera and used at callbacks
  26. class ReceiverCtx_c final: public Wsrep::Receiver_i
  27. {
  28. // share of remote commands received between apply and commit callbacks
  29. RtAccum_t m_tAcc; // apply fn, commit fn
  30. CSphQuery m_tQuery;
  31. CSphString m_sName; // name of serving cluster
  32. Wsrep::Provider_i* m_pProvider = nullptr;
  33. std::function<void()> m_fnOnClean;
  34. private:
  35. void Cleanup();
  36. ~ReceiverCtx_c() final;
  37. public:
  38. ReceiverCtx_c ( CSphString sName, Wsrep::Provider_i* pProvider, std::function<void()> fnOnClean );
  39. ReceiverCtx_c ( const ReceiverCtx_c& ) = delete;
  40. ReceiverCtx_c ( ReceiverCtx_c&& ) = delete;
  41. ReceiverCtx_c& operator= ( const ReceiverCtx_c& ) = delete;
  42. ReceiverCtx_c& operator= ( ReceiverCtx_c&& ) = delete;
  43. // implementation of Wsrep::Receiver_i
  44. bool ApplyWriteset ( ByteBlob_t tData, bool bIsolated ) final;
  45. void ApplyUnordered ( ByteBlob_t tData ) final;
  46. bool Commit ( const void* pHndTrx, uint32_t uFlags, const Wsrep::TrxMeta_t* pMeta, bool bCommit ) final;
  47. private:
  48. static bool PQAdd ( ReplicationCommand_t* pCmd, ByteBlob_t tReq );
  49. };
  50. Wsrep::Receiver_i* MakeReceiverCtx ( CSphString sName, Wsrep::Provider_i* pProvider, std::function<void()> fnOnClean )
  51. {
  52. return new ReceiverCtx_c ( std::move ( sName ), pProvider, std::move(fnOnClean) );
  53. }
  54. ReceiverCtx_c::ReceiverCtx_c ( CSphString sName, Wsrep::Provider_i * pProvider, std::function<void()> fnOnClean )
  55. : m_sName {std::move ( sName )}
  56. , m_pProvider ( pProvider )
  57. , m_fnOnClean { std::move ( fnOnClean ) }
  58. {}
  59. ReceiverCtx_c::~ReceiverCtx_c ()
  60. {
  61. Cleanup ();
  62. }
  63. void ReceiverCtx_c::Cleanup ()
  64. {
  65. m_tAcc.Cleanup();
  66. m_tQuery.m_dFilters.Reset();
  67. m_tQuery.m_dFilterTree.Reset();
  68. if ( m_fnOnClean )
  69. m_fnOnClean();
  70. }
  71. bool ReceiverCtx_c::PQAdd ( ReplicationCommand_t* pCmd, ByteBlob_t tReq )
  72. {
  73. assert ( pCmd && pCmd->m_eCommand == ReplCmd_e::PQUERY_ADD );
  74. cServedIndexRefPtr_c pServed = GetServed ( pCmd->m_sIndex );
  75. if ( !pServed )
  76. {
  77. sphWarning ( "unknown table '%s' for replication, command %d", pCmd->m_sIndex.cstr(), (int)pCmd->m_eCommand );
  78. return false;
  79. }
  80. if ( pServed->m_eType != IndexType_e::PERCOLATE )
  81. {
  82. sphWarning ( "wrong type of table '%s' for replication, command %d", pCmd->m_sIndex.cstr(), (int)pCmd->m_eCommand );
  83. return false;
  84. }
  85. StoredQueryDesc_t tPQ;
  86. LoadStoredQuery ( tReq, tPQ );
  87. RPL_TNX << "pq-add, table '" << pCmd->m_sIndex.cstr() << "', uid " << tPQ.m_iQUID << " query " << tPQ.m_sQuery.cstr();
  88. CSphString sError;
  89. PercolateQueryArgs_t tArgs ( tPQ );
  90. tArgs.m_bReplace = true;
  91. pCmd->m_pStored = RIdx_T<PercolateIndex_i*> ( pServed )->CreateQuery ( tArgs, sError );
  92. if ( !pCmd->m_pStored )
  93. {
  94. sphWarning ( "pq-add replication error '%s', table '%s'", sError.cstr(), pCmd->m_sIndex.cstr() );
  95. return false;
  96. }
  97. return true;
  98. }
  99. // callback for Galera to parse replicated commands
  100. bool ReceiverCtx_c::ApplyWriteset ( ByteBlob_t tData, bool bIsolated )
  101. {
  102. MemoryReader_c tReader ( tData );
  103. while ( tReader.GetPos() < tData.second )
  104. {
  105. auto pCmd = std::make_unique<ReplicationCommand_t>();
  106. if ( !LoadCmdHeader ( tReader, pCmd.get() ))
  107. {
  108. sphWarning ( "%s", TlsMsg::szError() );
  109. return false;
  110. }
  111. auto iRequestLen = (int)tReader.GetDword();
  112. if ( iRequestLen + tReader.GetPos() > tData.second )
  113. {
  114. sphWarning ( "replication parse apply - out of buffer read %d+%d of %d", tReader.GetPos(), iRequestLen, tData.second );
  115. return false;
  116. }
  117. const BYTE * pRequest = tData.first + tReader.GetPos();
  118. tReader.SetPos ( tReader.GetPos() + iRequestLen );
  119. pCmd->m_sCluster = m_sName;
  120. pCmd->m_bIsolated = bIsolated;
  121. ByteBlob_t tReq { pRequest, iRequestLen };
  122. switch ( pCmd->m_eCommand )
  123. {
  124. case ReplCmd_e::PQUERY_ADD:
  125. if ( !PQAdd ( pCmd.get(), tReq ) )
  126. return false;
  127. break;
  128. case ReplCmd_e::PQUERY_DELETE:
  129. LoadDeleteQuery ( tReq, pCmd->m_dDeleteQueries, pCmd->m_sDeleteTags );
  130. RPL_TNX << "pq-delete, table '" << pCmd->m_sIndex.cstr() << "', queries " << pCmd->m_dDeleteQueries.GetLength() << ", tags " << pCmd->m_sDeleteTags.scstr();
  131. break;
  132. case ReplCmd_e::TRUNCATE:
  133. RPL_TNX << "pq-truncate, table '" << pCmd->m_sIndex.cstr() << "'";
  134. break;
  135. case ReplCmd_e::CLUSTER_ALTER_ADD:
  136. pCmd->m_bCheckIndex = false;
  137. RPL_TNX << "pq-cluster-alter-add, table '" << pCmd->m_sIndex.cstr() << "'";
  138. break;
  139. case ReplCmd_e::CLUSTER_ALTER_DROP:
  140. RPL_TNX << "pq-cluster-alter-drop, table '" << pCmd->m_sIndex.cstr() << "'";
  141. break;
  142. case ReplCmd_e::RT_TRX:
  143. m_tAcc.LoadRtTrx ( tReq, pCmd->m_uVersion );
  144. RPL_TNX << "rt trx, table '" << pCmd->m_sIndex.cstr() << "'";
  145. break;
  146. case ReplCmd_e::UPDATE_API:
  147. pCmd->m_pUpdateAPI = new CSphAttrUpdate;
  148. LoadAttrUpdate ( pRequest, iRequestLen, *pCmd->m_pUpdateAPI, pCmd->m_bBlobUpdate );
  149. RPL_TNX << "update, table '" << pCmd->m_sIndex.cstr() << "'";
  150. break;
  151. case ReplCmd_e::UPDATE_QL:
  152. case ReplCmd_e::UPDATE_JSON:
  153. {
  154. // can not handle multiple updates - only one update at time
  155. assert ( !m_tQuery.m_dFilters.GetLength() );
  156. pCmd->m_pUpdateAPI = new CSphAttrUpdate;
  157. int iGot = LoadAttrUpdate ( pRequest, iRequestLen, *pCmd->m_pUpdateAPI, pCmd->m_bBlobUpdate );
  158. assert ( iGot<iRequestLen );
  159. LoadUpdate ( pRequest + iGot, iRequestLen - iGot, m_tQuery );
  160. pCmd->m_pUpdateCond = &m_tQuery;
  161. RPL_TNX << "update " << ( pCmd->m_eCommand == ReplCmd_e::UPDATE_QL ? "ql" : "json" ) << ", table '" << pCmd->m_sIndex.cstr() << "'";
  162. break;
  163. }
  164. default:
  165. sphWarning ( "unsupported replication command %d", (int) pCmd->m_eCommand );
  166. return false;
  167. }
  168. m_tAcc.m_dCmd.Add ( std::move ( pCmd ) );
  169. }
  170. if ( tReader.GetPos() == tData.second )
  171. return true;
  172. sphWarning ( "replication parse apply - out of buffer read %d of %d", tReader.GetPos(), tData.second );
  173. return false;
  174. }
  175. void ReceiverCtx_c::ApplyUnordered ( ByteBlob_t foo )
  176. {
  177. sphLogDebugRpl ( "unordered byteblob size %d", foo.second );
  178. }
  179. bool ReceiverCtx_c::Commit ( const void* pHndTrx, uint32_t uFlags, const Wsrep::TrxMeta_t* pMeta, bool bCommit )
  180. {
  181. AT_SCOPE_EXIT ( [this] { Cleanup(); } );
  182. if ( !bCommit || m_tAcc.m_dCmd.IsEmpty() )
  183. return true;
  184. bool bOk = true;
  185. bool bIsolated = ( m_tAcc.m_dCmd[0]->m_bIsolated );
  186. m_tAcc.CleanReplicated();
  187. if ( bIsolated || !m_pProvider->GetApplier() ) {
  188. bOk = HandleCmdReplicated ( m_tAcc );
  189. } else
  190. {
  191. m_pProvider->GetApplier()->ApplierPreCommit ( pHndTrx );
  192. bOk = HandleCmdReplicated ( m_tAcc );
  193. m_pProvider->GetApplier()->ApplierInterimPostCommit ( pHndTrx );
  194. }
  195. if ( TlsMsg::HasErr () )
  196. sphWarning ( "%s", TlsMsg::szError () );
  197. if ( bOk )
  198. m_pProvider->GetCluster()->OnSeqnoCommited ( m_tAcc.m_tCmdReplicated, pMeta->m_tGtid.m_iSeqNo );
  199. sphLogDebugRpl ( "seq " INT64_FMT ", committed %d, isolated %d", (int64_t) pMeta->m_tGtid.m_iSeqNo, (int) bOk, (int) bIsolated );
  200. return bOk;
  201. }