|
|
@@ -243,8 +243,9 @@ SphAttr_t InsertDocData_t::GetID() const
|
|
|
//////////////////////////////////////////////////////////////////////////
|
|
|
|
|
|
|
|
|
-RtSegment_t::RtSegment_t ( DWORD uDocs )
|
|
|
+RtSegment_t::RtSegment_t ( DWORD uDocs, const ISphSchema& tSchema )
|
|
|
: m_tDeadRowMap ( uDocs )
|
|
|
+ , m_tSchema { tSchema }
|
|
|
{
|
|
|
}
|
|
|
|
|
|
@@ -316,6 +317,14 @@ const CSphRowitem * RtSegment_t::GetDocinfoByRowID ( RowID_t tRowID ) const NO_T
|
|
|
return m_dRows.GetLength() ? &m_dRows[tRowID*GetStride()] : nullptr;
|
|
|
}
|
|
|
|
|
|
+RowID_t RtSegment_t::GetAliveRowidByDocid ( DocID_t tDocID ) const
|
|
|
+{
|
|
|
+ RowID_t* pRowID = m_tDocIDtoRowID.Find ( tDocID );
|
|
|
+ if ( !pRowID || m_tDeadRowMap.IsSet ( *pRowID ) )
|
|
|
+ return INVALID_ROWID;
|
|
|
+ return *pRowID;
|
|
|
+}
|
|
|
+
|
|
|
|
|
|
RowID_t RtSegment_t::GetRowidByDocid ( DocID_t tDocID ) const
|
|
|
{
|
|
|
@@ -1056,11 +1065,47 @@ CSphVector<int> GetChunkIds ( const VecTraits_T<DiskChunkRefPtr_t> & dChunks )
|
|
|
return dIds;
|
|
|
}
|
|
|
|
|
|
-enum class WriteState_e : int
|
|
|
+class SaveState_c
|
|
|
{
|
|
|
- ENABLED, // normal
|
|
|
- DISCARD, // disabled, current result will not be necessary (can escape to don't waste resources)
|
|
|
- DISABLED, // disabled, current stage must be completed first
|
|
|
+public:
|
|
|
+
|
|
|
+ enum States_e : BYTE {
|
|
|
+ ENABLED, // normal
|
|
|
+		DISCARD,	// disabled; the current result will not be needed (may bail out early to avoid wasting resources)
|
|
|
+ DISABLED, // disabled, current stage must be completed first
|
|
|
+ };
|
|
|
+
|
|
|
+ explicit SaveState_c ( States_e eValue )
|
|
|
+ : m_tValue { eValue, false } {}
|
|
|
+
|
|
|
+ void SetState ( States_e eState )
|
|
|
+ {
|
|
|
+ m_tValue.ModifyValueAndNotifyAll ( [eState] ( Value_t& t ) { t.m_eValue = eState; } );
|
|
|
+ }
|
|
|
+ void SetShutdownFlag ()
|
|
|
+ {
|
|
|
+ m_tValue.ModifyValueAndNotifyAll ( [] ( Value_t& t ) { t.m_bShutdown = true; } );
|
|
|
+ }
|
|
|
+
|
|
|
+ bool Is (States_e eValue) const { return m_tValue.GetValue().m_eValue==eValue; }
|
|
|
+
|
|
|
+	// Sleep until the expected state is reached, then return true;
|
|
|
+	// return false instead if shutdown was requested while waiting.
|
|
|
+ bool WaitStateOrShutdown ( States_e uState ) const
|
|
|
+ {
|
|
|
+ return uState == m_tValue.Wait ( [uState] ( const Value_t& tVal ) { return tVal.m_bShutdown || ( tVal.m_eValue == uState ); } ).m_eValue;
|
|
|
+ }
|
|
|
+private:
|
|
|
+ struct Value_t
|
|
|
+ {
|
|
|
+ States_e m_eValue;
|
|
|
+ bool m_bShutdown;
|
|
|
+ Value_t ( States_e eValue, bool bShutdown )
|
|
|
+ : m_eValue { eValue }
|
|
|
+ , m_bShutdown { bShutdown }
|
|
|
+ {}
|
|
|
+ };
|
|
|
+ Coro::Waitable_T<Value_t> m_tValue;
|
|
|
};
|
|
|
|
|
|
enum class MergeSeg_e : BYTE
|
|
|
@@ -1071,7 +1116,7 @@ enum class MergeSeg_e : BYTE
|
|
|
EXIT = 4, // shutdown and exit
|
|
|
};
|
|
|
|
|
|
-class RtIndex_c final : public RtIndex_i, public ISphNoncopyable, public ISphWordlist, public ISphWordlistSuggest, public IndexUpdateHelper_c, public IndexAlterHelper_c, public DebugCheckHelper_c
|
|
|
+class RtIndex_c final : public RtIndex_i, public ISphNoncopyable, public ISphWordlist, public ISphWordlistSuggest, public IndexAlterHelper_c, public DebugCheckHelper_c
|
|
|
{
|
|
|
public:
|
|
|
RtIndex_c ( const CSphSchema & tSchema, const char * sIndexName, int64_t iRamSize, const char * sPath, bool bKeywordDict );
|
|
|
@@ -1082,7 +1127,7 @@ public:
|
|
|
bool DeleteDocument ( const VecTraits_T<DocID_t> & dDocs, CSphString & sError, RtAccum_t * pAccExt ) final;
|
|
|
bool Commit ( int * pDeleted, RtAccum_t * pAccExt, CSphString* pError = nullptr ) final;
|
|
|
void RollBack ( RtAccum_t * pAccExt ) final;
|
|
|
- int CommitReplayable ( RtSegment_t * pNewSeg, const CSphVector<DocID_t> & dAccKlist ); // returns total killed documents
|
|
|
+ int CommitReplayable ( RtSegment_t * pNewSeg, const VecTraits_T<DocID_t> & dAccKlist ); // returns total killed documents
|
|
|
void ForceRamFlush ( const char * szReason ) final;
|
|
|
bool IsFlushNeed() const final;
|
|
|
bool ForceDiskChunk() final;
|
|
|
@@ -1137,7 +1182,7 @@ public:
|
|
|
void PostSetup() final;
|
|
|
bool IsRT() const final { return true; }
|
|
|
|
|
|
- int UpdateAttributes ( AttrUpdateInc_t & tUpd, bool & bCritical, CSphString & sError, CSphString & sWarning ) final;
|
|
|
+ int CheckThenUpdateAttributes ( AttrUpdateInc_t & tUpd, bool & bCritical, CSphString & sError, CSphString & sWarning, BlockerFn&& ) final;
|
|
|
bool SaveAttributes ( CSphString & sError ) const final;
|
|
|
DWORD GetAttributeStatus () const final { return m_uDiskAttrStatus; }
|
|
|
|
|
|
@@ -1200,7 +1245,7 @@ private:
|
|
|
std::atomic<int64_t> m_iRamChunksAllocatedRAM { 0 };
|
|
|
|
|
|
std::atomic<bool> m_bOptimizeStop { false };
|
|
|
- Threads::Coro::Waitable_T<int> m_tOptimizeRuns {0};
|
|
|
+ Coro::Waitable_T<int> m_tOptimizeRuns {0};
|
|
|
friend class OptimizeGuard_c;
|
|
|
|
|
|
int64_t m_iRtMemLimit;
|
|
|
@@ -1231,7 +1276,7 @@ private:
|
|
|
int m_iMaxCodepointLength = 0;
|
|
|
TokenizerRefPtr_c m_pTokenizerIndexing;
|
|
|
bool m_bPreallocPassedOk = true;
|
|
|
- std::atomic<WriteState_e> m_eSaving { WriteState_e::ENABLED };
|
|
|
+ SaveState_c m_tSaving { SaveState_c::ENABLED };
|
|
|
bool m_bHasFiles = false;
|
|
|
|
|
|
// fixme! make this *Lens atomic together with disk/ram data, to avoid any kind of race among them
|
|
|
@@ -1253,9 +1298,6 @@ private:
|
|
|
void MergeKeywords ( RtSegment_t & tSeg, const RtSegment_t & tSeg1, const RtSegment_t & tSeg2, const VecTraits_T<RowID_t> & dRowMap1, const VecTraits_T<RowID_t> & dRowMap2 ) const;
|
|
|
RtSegment_t * MergeTwoSegments ( const RtSegment_t * pA, const RtSegment_t * pB ) const;
|
|
|
static void CopyWord ( RtSegment_t& tDstSeg, RtWord_t& tDstWord, RtDocWriter_c& tDstDoc, const RtSegment_t& tSrcSeg, const RtWord_t* pSrcWord, const VecTraits_T<RowID_t>& dRowMap );
|
|
|
- void UpdateAttributesOffline ( VecTraits_T<PostponedUpdate_t>& dUpdates, IndexSegment_c * pSeg ) override;
|
|
|
-
|
|
|
- bool UpdateAttributesInRamSegment ( const RowsToUpdate_t& dRows, UpdateContext_t& tCtx, bool& bCritical, CSphString& sError );
|
|
|
|
|
|
void DeleteFieldFromDict ( RtSegment_t * pSeg, int iKillField );
|
|
|
void AddFieldToRamchunk ( const CSphString & sFieldName, DWORD uFieldFlags, const CSphSchema & tOldSchema, const CSphSchema & tNewSchema );
|
|
|
@@ -1289,13 +1331,12 @@ private:
|
|
|
bool ReadNextWord ( SuggestResult_t & tRes, DictWord_t & tWord ) const final;
|
|
|
|
|
|
ConstRtSegmentRefPtf_t AdoptSegment ( RtSegment_t * pNewSeg );
|
|
|
- int ApplyKillList ( const CSphVector<DocID_t> & dAccKlist ) REQUIRES ( m_tWorkers.SerialChunkAccess() );
|
|
|
+ int ApplyKillList ( const VecTraits_T<DocID_t> & dAccKlist ) REQUIRES ( m_tWorkers.SerialChunkAccess() );
|
|
|
|
|
|
bool AddRemoveColumnarAttr ( RtGuard_t & tGuard, bool bAdd, const CSphString & sAttrName, ESphAttr eAttrType, const CSphSchema & tOldSchema, const CSphSchema & tNewSchema, CSphString & sError );
|
|
|
void AddRemoveRowwiseAttr ( RtGuard_t & tGuard, bool bAdd, const CSphString & sAttrName, ESphAttr eAttrType, const CSphSchema & tOldSchema, const CSphSchema & tNewSchema, CSphString & sError );
|
|
|
|
|
|
- bool Update_WriteBlobRow ( UpdateContext_t& tCtx, CSphRowitem* pDocinfo, const BYTE* pBlob, int iLength, int nBlobAttrs, const CSphAttrLocator& tBlobRowLoc, bool& bCritical, CSphString& sError ) override;
|
|
|
- bool Update_DiskChunks ( AttrUpdateInc_t& tUpd, const DiskChunkSlice_t& dDiskChunks, CSphString& sError );
|
|
|
+ bool Update_DiskChunks ( AttrUpdateInc_t& tUpd, const DiskChunkSlice_t& dDiskChunks, CSphString& sError ) REQUIRES ( m_tWorkers.SerialChunkAccess() );
|
|
|
|
|
|
void GetIndexFiles ( StrVec_t& dFiles, StrVec_t& dExt, const FilenameBuilder_i* = nullptr ) const override;
|
|
|
DocstoreBuilder_i::Doc_t * FetchDocFields ( DocstoreBuilder_i::Doc_t & tStoredDoc, const InsertDocData_t & tDoc, CSphSource_StringVector & tSrc, CSphVector<CSphVector<BYTE>> & dTmpAttrStorage ) const;
|
|
|
@@ -1321,7 +1362,7 @@ private:
|
|
|
void SetMemLimit ( int64_t iMemLimit );
|
|
|
void RecalculateRateLimit ( int64_t iSaved, int64_t iInserted, bool bEmergent );
|
|
|
void AlterSave ( bool bSaveRam );
|
|
|
- void BinlogCommit ( RtSegment_t * pSeg, const CSphVector<DocID_t> & dKlist );
|
|
|
+ void BinlogCommit ( RtSegment_t * pSeg, const VecTraits_T<DocID_t> & dKlist );
|
|
|
bool StopOptimize();
|
|
|
void UpdateUnlockedCount();
|
|
|
bool CheckSegmentConsistency ( const RtSegment_t* pNewSeg, bool bSilent=true ) const;
|
|
|
@@ -1350,6 +1391,7 @@ private:
|
|
|
// Manage alter state
|
|
|
void RaiseAlterGeneration();
|
|
|
int GetAlterGeneration() const override;
|
|
|
+ bool AlterSI ( CSphString & sError ) override;
|
|
|
};
|
|
|
|
|
|
|
|
|
@@ -1382,6 +1424,8 @@ RtIndex_c::~RtIndex_c ()
|
|
|
// From serial worker resuming on Wait() will happen after whole merger coroutine finished.
|
|
|
ScopedScheduler_c tSerialFiber { m_tWorkers.SerialChunkAccess() };
|
|
|
TRACE_SCHED ( "rt", "~RtIndex_c" );
|
|
|
+ m_tSaving.SetShutdownFlag ();
|
|
|
+ Threads::Coro::Reschedule();
|
|
|
StopMergeSegmentsWorker();
|
|
|
m_tNSavesNow.Wait ( [] ( int iVal ) { return iVal==0; } );
|
|
|
}
|
|
|
@@ -1390,10 +1434,10 @@ RtIndex_c::~RtIndex_c ()
|
|
|
bool bValid = m_pTokenizer && m_pDict && m_bPreallocPassedOk;
|
|
|
|
|
|
if ( bValid )
|
|
|
- {
|
|
|
- SaveRamChunk();
|
|
|
+ bValid &= SaveRamChunk();
|
|
|
+
|
|
|
+ if ( bValid )
|
|
|
SaveMeta();
|
|
|
- }
|
|
|
|
|
|
if ( m_iLockFD>=0 )
|
|
|
::close ( m_iLockFD );
|
|
|
@@ -1411,15 +1455,14 @@ RtIndex_c::~RtIndex_c ()
|
|
|
sFile.SetSprintf ( "%s%s", m_sPath.cstr(), sphGetExt ( SPH_EXT_SETTINGS ) );
|
|
|
::unlink ( sFile.cstr() );
|
|
|
}
|
|
|
+ if ( !bValid )
|
|
|
+ return;
|
|
|
|
|
|
tmSave = sphMicroTimer() - tmSave;
|
|
|
- if ( tmSave>=1000 && bValid )
|
|
|
- {
|
|
|
- sphInfo ( "rt: index %s: ramchunk saved in %d.%03d sec",
|
|
|
- m_sIndexName.cstr(), (int)(tmSave/1000000), (int)((tmSave/1000)%1000) );
|
|
|
- }
|
|
|
+ if ( tmSave>=1000 )
|
|
|
+ sphInfo ( "rt: index %s: ramchunk saved in %d.%03d sec", m_sIndexName.cstr(), (int)(tmSave/1000000), (int)((tmSave/1000)%1000) );
|
|
|
|
|
|
- if ( !sphInterrupted() && bValid )
|
|
|
+ if ( !sphInterrupted() )
|
|
|
sphLogDebug ( "closed index %s, valid %d, deleted %d, time %d.%03d sec", m_sIndexName.cstr(), (int)bValid, (int)m_bIndexDeleted, (int)(tmSave/1000000), (int)((tmSave/1000)%1000) );
|
|
|
}
|
|
|
|
|
|
@@ -1435,8 +1478,10 @@ int RtIndex_c::GetAlterGeneration() const
|
|
|
|
|
|
void RtIndex_c::UpdateUnlockedCount()
|
|
|
{
|
|
|
- if ( !m_bDebugCheck )
|
|
|
- m_tUnLockedSegments.UpdateValueAndNotifyAll ( (int)m_tRtChunks.RamSegs()->count_of ( [] ( auto& dSeg ) { return !dSeg->m_iLocked; } ) );
|
|
|
+ if ( m_bDebugCheck )
|
|
|
+ return;
|
|
|
+
|
|
|
+ m_tUnLockedSegments.UpdateValueAndNotifyAll ( (int)m_tRtChunks.RamSegs()->count_of ( [] ( auto& dSeg ) { return !dSeg->m_iLocked; } ) );
|
|
|
}
|
|
|
|
|
|
void RtIndex_c::ProcessDiskChunk ( int iChunk, VisitChunk_fn&& fnVisitor ) const
|
|
|
@@ -1479,7 +1524,7 @@ bool RtIndex_c::IsFlushNeed() const
|
|
|
if ( Binlog::IsActive() && m_iTID<=m_iSavedTID )
|
|
|
return false;
|
|
|
|
|
|
- return m_eSaving.load(std::memory_order_relaxed)==WriteState_e::ENABLED;
|
|
|
+ return m_tSaving.Is ( SaveState_c::ENABLED );
|
|
|
}
|
|
|
|
|
|
static int64_t SegmentsGetUsedRam ( const ConstRtSegmentSlice_t& dSegments )
|
|
|
@@ -2009,7 +2054,7 @@ RtSegment_t * CreateSegment ( RtAccum_t* pAcc, int iWordsCheckpoint, ESphHitless
|
|
|
return nullptr;
|
|
|
|
|
|
MEMORY ( MEM_RT_ACCUM );
|
|
|
- auto * pSeg = new RtSegment_t ( pAcc->m_uAccumDocs );
|
|
|
+ auto * pSeg = new RtSegment_t ( pAcc->m_uAccumDocs, pAcc->m_pIndex->GetInternalSchema() );
|
|
|
FakeWL_t tFakeLock {pSeg->m_tLock};
|
|
|
CreateSegmentHits ( *pAcc, pSeg, iWordsCheckpoint, eHitless, dHitlessWords );
|
|
|
|
|
|
@@ -2457,7 +2502,7 @@ RtSegment_t* RtIndex_c::MergeTwoSegments ( const RtSegment_t* pA, const RtSegmen
|
|
|
auto pColumnarBuilder = CreateColumnarBuilderRT(m_tSchema);
|
|
|
RtAttrMergeContext_t tCtx ( nBlobAttrs, tNextRowID, pColumnarBuilder.get() );
|
|
|
|
|
|
- auto * pSeg = new RtSegment_t (0);
|
|
|
+ auto * pSeg = new RtSegment_t (0, m_tSchema);
|
|
|
FakeWL_t _ { pSeg->m_tLock }; // as pSeg is just created - we don't need real guarding and use fake lock to mute thread safety warnings
|
|
|
|
|
|
assert ( !!pA->m_pDocstore==!!pB->m_pDocstore );
|
|
|
@@ -2538,12 +2583,12 @@ namespace GatherUpdates {
|
|
|
const VecTraits_T<PostponedUpdate_t>& AccessPostponedUpdates ( const ConstDiskChunkRefPtr_t& pChunk ) { return pChunk->Cidx ().m_dPostponedUpdates; }
|
|
|
|
|
|
template<typename CHUNK_OR_SEG>
|
|
|
- CSphVector<PostponedUpdate_t> FromChunksOrSegments ( VecTraits_T<CHUNK_OR_SEG> dChunksOrSegmengs )
|
|
|
+ CSphVector<PostponedUpdate_t> FromChunksOrSegments ( VecTraits_T<CHUNK_OR_SEG> dChunksOrSegments )
|
|
|
{
|
|
|
CSphVector<PostponedUpdate_t> dResult;
|
|
|
CSphVector<HashedUpd_t> dUpdates;
|
|
|
|
|
|
- for ( const auto& dSeg : dChunksOrSegmengs )
|
|
|
+ for ( const auto& dSeg : dChunksOrSegments )
|
|
|
{
|
|
|
const VecTraits_T<PostponedUpdate_t>& dPostponedUpdates = AccessPostponedUpdates (dSeg);
|
|
|
if ( dPostponedUpdates.IsEmpty () )
|
|
|
@@ -2574,59 +2619,37 @@ namespace GatherUpdates {
|
|
|
}; // namespace
|
|
|
|
|
|
|
|
|
-bool RtIndex_c::UpdateAttributesInRamSegment ( const RowsToUpdate_t& dRows, UpdateContext_t& tCtx, bool& bCritical, CSphString& sError ) REQUIRES ( m_tWorkers.SerialChunkAccess() )
|
|
|
-{
|
|
|
- TRACE_SCHED ( "rt", "UpdateAttributesInRamSegment" );
|
|
|
- if ( tCtx.m_tUpd.m_pUpdate->m_bStrict )
|
|
|
- if ( !Update_InplaceJson ( dRows, tCtx, sError, true ) )
|
|
|
- return false;
|
|
|
-
|
|
|
- // second pass
|
|
|
- int iSaveWarnings = tCtx.m_iJsonWarnings;
|
|
|
- tCtx.m_iJsonWarnings = 0;
|
|
|
- Update_InplaceJson ( dRows, tCtx, sError, false );
|
|
|
- tCtx.m_iJsonWarnings += iSaveWarnings;
|
|
|
-
|
|
|
- if ( !Update_Blobs ( dRows, tCtx, bCritical, sError ) )
|
|
|
- return false;
|
|
|
-
|
|
|
- Update_Plain ( dRows, tCtx );
|
|
|
- return true;
|
|
|
-}
|
|
|
-
|
|
|
// that is 2-nd part of postponed updates. We may have one or several update set, stored from old segments.
|
|
|
-void RtIndex_c::UpdateAttributesOffline ( VecTraits_T<PostponedUpdate_t> & dUpdates, IndexSegment_c * pSeg ) NO_THREAD_SAFETY_ANALYSIS
|
|
|
+void RtSegment_t::UpdateAttributesOffline ( VecTraits_T<PostponedUpdate_t>& dPostUpdates ) NO_THREAD_SAFETY_ANALYSIS
|
|
|
{
|
|
|
- if ( dUpdates.IsEmpty() )
|
|
|
+ if ( dPostUpdates.IsEmpty() )
|
|
|
return;
|
|
|
|
|
|
- assert ( pSeg && "for RT index UpdateAttributesOffline should be called only with non-null pSeg!" );
|
|
|
- auto * pSegment = (RtSegment_t *) pSeg;
|
|
|
-
|
|
|
CSphString sError;
|
|
|
bool bCritical;
|
|
|
- for ( auto & tUpdate : dUpdates )
|
|
|
+
|
|
|
+ assert ( GetStride() == m_tSchema.GetRowSize() );
|
|
|
+ for ( auto & tPostUpdate : dPostUpdates )
|
|
|
{
|
|
|
- AttrUpdateInc_t tUpdInc { std::move ( tUpdate.m_pUpdate ) };
|
|
|
+ AttrUpdateInc_t tUpdInc { std::move ( tPostUpdate.m_pUpdate ) };
|
|
|
UpdateContext_t tCtx ( tUpdInc, m_tSchema );
|
|
|
- Update_PrepareListOfUpdatedAttributes ( tCtx, sError );
|
|
|
+ tCtx.PrepareListOfUpdatedAttributes ( sError );
|
|
|
|
|
|
// actualize list of updates in context of new segment
|
|
|
const auto & dDocids = tUpdInc.m_pUpdate->m_dDocids;
|
|
|
- ARRAY_FOREACH ( i, tUpdate.m_dRowsToUpdate )
|
|
|
+ ARRAY_FOREACH ( i, tPostUpdate.m_dRowsToUpdate )
|
|
|
{
|
|
|
- auto& tRow = tUpdate.m_dRowsToUpdate[i];
|
|
|
- auto pRow = pSegment->FindAliveRow ( dDocids[tRow.m_iIdx] );
|
|
|
- if ( pRow )
|
|
|
- tRow.m_pRow = pRow;
|
|
|
+ auto& tRow = tPostUpdate.m_dRowsToUpdate[i];
|
|
|
+ auto tRowID = GetAliveRowidByDocid ( dDocids[tRow.m_iIdx] );
|
|
|
+ if ( tRowID==INVALID_ROWID )
|
|
|
+ tPostUpdate.m_dRowsToUpdate.RemoveFast ( i-- );
|
|
|
else
|
|
|
- tUpdate.m_dRowsToUpdate.RemoveFast ( i-- );
|
|
|
+ tRow.m_tRow = tRowID;
|
|
|
}
|
|
|
|
|
|
- tCtx.m_pAttrPool = pSegment->m_dRows.begin();
|
|
|
- tCtx.m_pBlobPool = pSegment->m_dBlobs.begin();
|
|
|
- tCtx.m_pSegment = pSeg;
|
|
|
- UpdateAttributesInRamSegment ( tUpdate.m_dRowsToUpdate, tCtx, bCritical, sError );
|
|
|
+ tCtx.m_pAttrPool = m_dRows.begin();
|
|
|
+ tCtx.m_pBlobPool = m_dBlobs.begin();
|
|
|
+ Update_UpdateAttributes ( tPostUpdate.m_dRowsToUpdate, tCtx, bCritical, sError );
|
|
|
}
|
|
|
}
|
|
|
|
|
|
@@ -2744,7 +2767,7 @@ ConstRtSegmentRefPtf_t RtIndex_c::AdoptSegment ( RtSegment_t * pNewSeg )
|
|
|
|
|
|
// CommitReplayable -> ApplyKillList
|
|
|
// AttachDiskIndex -> ApplyKillList
|
|
|
-int RtIndex_c::ApplyKillList ( const CSphVector<DocID_t> & dAccKlist )
|
|
|
+int RtIndex_c::ApplyKillList ( const VecTraits_T<DocID_t> & dAccKlist )
|
|
|
{
|
|
|
if ( dAccKlist.IsEmpty() )
|
|
|
return 0;
|
|
|
@@ -2754,8 +2777,27 @@ int RtIndex_c::ApplyKillList ( const CSphVector<DocID_t> & dAccKlist )
|
|
|
|
|
|
int iKilled = 0;
|
|
|
auto pChunks = m_tRtChunks.DiskChunks();
|
|
|
- for ( auto& pChunk : *pChunks )
|
|
|
- iKilled += pChunk->CastIdx().KillMulti ( dAccKlist );
|
|
|
+
|
|
|
+ if ( m_tSaving.Is ( SaveState_c::ENABLED ) )
|
|
|
+ for ( auto& pChunk : *pChunks )
|
|
|
+ iKilled += pChunk->CastIdx().KillMulti ( dAccKlist );
|
|
|
+ else
|
|
|
+ {
|
|
|
+	// if saving is disabled and we NEED to mark a doc in a disk chunk as deleted,
|
|
|
+	// pause that action and wait until the index is unlocked.
+ bool bNeedWait = true;
|
|
|
+ bool bEnabled = false;
|
|
|
+ for ( auto& pChunk : *pChunks )
|
|
|
+ iKilled += pChunk->CastIdx().CheckThenKillMulti ( dAccKlist, [this,&bNeedWait, &bEnabled]()
|
|
|
+ {
|
|
|
+ if ( bNeedWait )
|
|
|
+ {
|
|
|
+ bNeedWait = false;
|
|
|
+ bEnabled = m_tSaving.WaitStateOrShutdown ( SaveState_c::ENABLED );
|
|
|
+ }
|
|
|
+ return bEnabled;
|
|
|
+ });
|
|
|
+ }
|
|
|
|
|
|
auto pSegs = m_tRtChunks.RamSegs();
|
|
|
for ( auto& pSeg : *pSegs )
|
|
|
@@ -2969,7 +3011,7 @@ bool RtIndex_c::MergeSegmentsStep ( MergeSeg_e eVal ) REQUIRES ( m_tWorkers.Seri
|
|
|
dOld.Add ( pA );
|
|
|
dOld.Add ( pB );
|
|
|
auto dUpdates = GatherUpdates::FromChunksOrSegments ( dOld );
|
|
|
- UpdateAttributesOffline ( dUpdates, pMerged );
|
|
|
+ pMerged->UpdateAttributesOffline ( dUpdates );
|
|
|
}
|
|
|
}
|
|
|
|
|
|
@@ -3041,7 +3083,7 @@ int CommitID() {
|
|
|
}
|
|
|
} // namespace
|
|
|
|
|
|
-int RtIndex_c::CommitReplayable ( RtSegment_t * pNewSeg, const CSphVector<DocID_t> & dAccKlist ) REQUIRES_SHARED ( pNewSeg->m_tLock )
|
|
|
+int RtIndex_c::CommitReplayable ( RtSegment_t * pNewSeg, const VecTraits_T<DocID_t> & dAccKlist ) REQUIRES_SHARED ( pNewSeg->m_tLock )
|
|
|
{
|
|
|
// store statistics, because pNewSeg just might get merged
|
|
|
const int iId = CommitID();
|
|
|
@@ -3181,18 +3223,6 @@ struct SaveDiskDataContext_t : public BuildHeader_t
|
|
|
};
|
|
|
|
|
|
|
|
|
-struct CmpDocidLookup_fn
|
|
|
-{
|
|
|
- static inline bool IsLess ( const DocidRowidPair_t & a, const DocidRowidPair_t & b )
|
|
|
- {
|
|
|
- if ( a.m_tDocID==b.m_tDocID )
|
|
|
- return a.m_tRowID < b.m_tRowID;
|
|
|
-
|
|
|
- return (uint64_t)a.m_tDocID < (uint64_t)b.m_tDocID;
|
|
|
- }
|
|
|
-};
|
|
|
-
|
|
|
-
|
|
|
bool RtIndex_c::WriteAttributes ( SaveDiskDataContext_t & tCtx, CSphString & sError ) const
|
|
|
{
|
|
|
CSphString sSPA, sSPB, sSPT, sSPHI, sSPDS, sSPC, sSIdx;
|
|
|
@@ -3767,7 +3797,7 @@ bool RtIndex_c::SaveDiskHeader ( SaveDiskDataContext_t & tCtx, const ChunkStats_
|
|
|
|
|
|
void RtIndex_c::SaveMeta ( int64_t iTID, VecTraits_T<int> dChunkNames )
|
|
|
{
|
|
|
- if ( m_eSaving.load ( std::memory_order_relaxed ) != WriteState_e::ENABLED )
|
|
|
+ if ( !m_tSaving.Is ( SaveState_c::ENABLED ) )
|
|
|
return;
|
|
|
|
|
|
// sanity check
|
|
|
@@ -3886,7 +3916,7 @@ int64_t RtIndex_c::GetMemCount ( PRED&& fnPred ) const
|
|
|
// i.e. create new disk chunk from ram segments
|
|
|
bool RtIndex_c::SaveDiskChunk ( bool bForced, bool bEmergent, bool bBootstrap ) REQUIRES ( m_tWorkers.SerialChunkAccess() )
|
|
|
{
|
|
|
- if ( m_eSaving.load(std::memory_order_relaxed) != WriteState_e::ENABLED ) // fixme! review, m.b. refactor
|
|
|
+ if ( !m_tSaving.Is ( SaveState_c::ENABLED ) ) // fixme! review, m.b. refactor
|
|
|
return !bBootstrap;
|
|
|
|
|
|
assert ( Coro::CurrentScheduler() == m_tWorkers.SerialChunkAccess() );
|
|
|
@@ -4011,7 +4041,7 @@ bool RtIndex_c::SaveDiskChunk ( bool bForced, bool bEmergent, bool bBootstrap )
|
|
|
if ( !dUpdates.IsEmpty () )
|
|
|
{
|
|
|
RTLOGV << "SaveDiskChunk: apply postponed updates";
|
|
|
- pNewChunk->UpdateAttributesOffline ( dUpdates, pNewChunk.get() );
|
|
|
+ pNewChunk->UpdateAttributesOffline ( dUpdates );
|
|
|
dUpdates.Reset();
|
|
|
}
|
|
|
|
|
|
@@ -4635,8 +4665,8 @@ void RtIndex_c::SaveRamFieldLengths ( CSphWriter& wrChunk ) const
|
|
|
|
|
|
bool RtIndex_c::SaveRamChunk ()
|
|
|
{
|
|
|
- if ( m_eSaving.load ( std::memory_order_relaxed ) != WriteState_e::ENABLED )
|
|
|
- return true;
|
|
|
+ if ( !m_tSaving.Is ( SaveState_c::ENABLED ) )
|
|
|
+ return false;
|
|
|
|
|
|
MEMORY ( MEM_INDEX_RT );
|
|
|
|
|
|
@@ -4714,7 +4744,7 @@ bool RtIndex_c::LoadRamChunk ( DWORD uVersion, bool bRebuildInfixes, bool bFixup
|
|
|
{
|
|
|
DWORD uRows = rdChunk.GetDword();
|
|
|
|
|
|
- RtSegmentRefPtf_t pSeg {new RtSegment_t ( uRows )};
|
|
|
+ RtSegmentRefPtf_t pSeg { new RtSegment_t ( uRows, m_tSchema ) };
|
|
|
pSeg->m_uRows = uRows;
|
|
|
pSeg->m_tAliveRows.store ( rdChunk.GetDword (), std::memory_order_relaxed );
|
|
|
|
|
|
@@ -7690,12 +7720,12 @@ CSphFixedVector<RowsToUpdateData_t> Update_CollectRowPtrs ( UpdateContext_t & tC
|
|
|
if ( !tCtx.m_tUpd.m_dUpdated.BitGet ( i ) )
|
|
|
ARRAY_CONSTFOREACH ( j, tSegments )
|
|
|
{
|
|
|
- auto pRow = tSegments[j]->FindAliveRow ( dDocids[i] );
|
|
|
- if ( !pRow )
|
|
|
+ auto tRowID = tSegments[j]->GetAliveRowidByDocid( dDocids[i] );
|
|
|
+ if ( tRowID==INVALID_ROWID )
|
|
|
continue;
|
|
|
|
|
|
auto& dUpd = dUpdateSets[j].Add();
|
|
|
- dUpd.m_pRow = pRow;
|
|
|
+ dUpd.m_tRow = tRowID;
|
|
|
dUpd.m_iIdx = i;
|
|
|
}
|
|
|
return dUpdateSets;
|
|
|
@@ -7708,52 +7738,59 @@ CSphFixedVector<RowsToUpdateData_t> Update_CollectRowPtrs ( UpdateContext_t & tC
|
|
|
// to final resulting chunk/segment. That bit set before merging attributes and exists till the end of segment's lifetime.
|
|
|
// Here is first part of postponed merge - after update we collect docs updated in segment and store them into vec of
|
|
|
// updates (as it might happen be more than one update during the operation)
|
|
|
-static void AddDerivedUpdate ( const RowsToUpdate_t & dRows, const UpdateContext_t & tCtx )
|
|
|
+void RtSegment_t::MaybeAddPostponedUpdate ( const RowsToUpdate_t& dRows, const UpdateContext_t& tCtx )
|
|
|
{
|
|
|
- auto & tUpd = tCtx.m_tUpd;
|
|
|
+ if ( !m_bAttrsBusy.load ( std::memory_order_acquire ) )
|
|
|
+ return;
|
|
|
+
|
|
|
+ // segment is now saving/merging - add postponed update.
|
|
|
+ auto& tUpd = tCtx.m_tUpd;
|
|
|
// count exact N of affected rows (no need to waste space for reserve in this route at all)
|
|
|
- auto iRows = dRows.count_of ( [&tUpd] ( auto & i ) { return tUpd.m_dUpdated.BitGet ( i.m_iIdx ); } );
|
|
|
+ auto iRows = dRows.count_of ( [&tUpd] ( auto& i ) { return tUpd.m_dUpdated.BitGet ( i.m_iIdx ); } );
|
|
|
|
|
|
if ( !iRows )
|
|
|
return;
|
|
|
|
|
|
- auto * pSegment = (RtSegment_t *) tCtx.m_pSegment;
|
|
|
- assert (pSegment);
|
|
|
-
|
|
|
- auto& tNewDerivedUpdate = pSegment->m_dPostponedUpdates.Add();
|
|
|
- tNewDerivedUpdate.m_pUpdate = MakeReusableUpdate ( tUpd.m_pUpdate );
|
|
|
- tNewDerivedUpdate.m_dRowsToUpdate.Reserve (iRows);
|
|
|
+ auto& tNewPostponedUpdate = m_dPostponedUpdates.Add();
|
|
|
+ tNewPostponedUpdate.m_pUpdate = MakeReusableUpdate ( tUpd.m_pUpdate );
|
|
|
+ tNewPostponedUpdate.m_dRowsToUpdate.Reserve ( iRows );
|
|
|
|
|
|
// collect indexes of actually updated rows and docids
|
|
|
- dRows.for_each ( [&tUpd, &tNewDerivedUpdate] ( auto & i ) {
|
|
|
+ dRows.for_each ( [&tUpd, &tNewPostponedUpdate] ( const auto& i ) {
|
|
|
if ( tUpd.m_dUpdated.BitGet ( i.m_iIdx ) )
|
|
|
- tNewDerivedUpdate.m_dRowsToUpdate.Add ().m_iIdx = i.m_iIdx;
|
|
|
+ tNewPostponedUpdate.m_dRowsToUpdate.Add().m_iIdx = i.m_iIdx;
|
|
|
});
|
|
|
}
|
|
|
|
|
|
-bool RtIndex_c::Update_DiskChunks ( AttrUpdateInc_t& tUpd, const DiskChunkSlice_t& dDiskChunks, CSphString & sError )
|
|
|
+bool RtIndex_c::Update_DiskChunks ( AttrUpdateInc_t& tUpd, const DiskChunkSlice_t& dDiskChunks, CSphString & sError ) REQUIRES ( m_tWorkers.SerialChunkAccess() )
|
|
|
{
|
|
|
+ assert ( Coro::CurrentScheduler() == m_tWorkers.SerialChunkAccess() );
|
|
|
bool bCritical = false;
|
|
|
CSphString sWarning;
|
|
|
|
|
|
- // That seems to be only place where order of disk chunks is important.
|
|
|
- // About deduplication: order of new-born disk chunks is important, as they may contain replaces for older docs.
|
|
|
- // Since we don't consider killed documents during update, it is important to update from freshest to oldest,
|
|
|
- // this order ensures that actual documents updated. However, for optimized (merged/splitted) chunks that is not
|
|
|
- // important as kill-lists applied during merge, and so resulting chunks have only 'freshest' version, because
|
|
|
- // all previous are effectively excluded during merge pass.
|
|
|
- // (If we refactor updates to scan rows taking in account kill-lists, order will not be important at all).
|
|
|
- for ( int iChunk = dDiskChunks.GetLength()-1; iChunk>=0; --iChunk )
|
|
|
+ bool bEnabled = m_tSaving.Is ( SaveState_c::ENABLED );
|
|
|
+ bool bNeedWait = !bEnabled;
|
|
|
+
|
|
|
+ // if saving is disabled, and we NEED to actually update a disk chunk,
|
|
|
+ // we'll pause that action, waiting until index is unlocked.
|
|
|
+ BlockerFn fnBlock = [this, &bNeedWait, &bEnabled]() {
|
|
|
+ if ( bNeedWait )
|
|
|
+ {
|
|
|
+ bNeedWait = false;
|
|
|
+ bEnabled = m_tSaving.WaitStateOrShutdown ( SaveState_c::ENABLED );
|
|
|
+ }
|
|
|
+ return bEnabled;
|
|
|
+ };
|
|
|
+
|
|
|
+ for ( auto& pDiskChunk : dDiskChunks )
|
|
|
{
|
|
|
if ( tUpd.AllApplied () )
|
|
|
break;
|
|
|
|
|
|
- auto& pDiskChunk = dDiskChunks[iChunk];
|
|
|
-
|
|
|
// acquire fine-grain lock
|
|
|
SccWL_t wLock ( pDiskChunk->m_tLock );
|
|
|
|
|
|
- int iRes = pDiskChunk->CastIdx().UpdateAttributes ( tUpd, bCritical, sError, sWarning );
|
|
|
+ int iRes = pDiskChunk->CastIdx().CheckThenUpdateAttributes ( tUpd, bCritical, sError, sWarning, bNeedWait ? fnBlock : nullptr );
|
|
|
|
|
|
// FIXME! need to handle critical failures here (chunk is unusable at this point)
|
|
|
assert ( !bCritical );
|
|
|
@@ -7769,41 +7806,37 @@ bool RtIndex_c::Update_DiskChunks ( AttrUpdateInc_t& tUpd, const DiskChunkSlice_
|
|
|
return true;
|
|
|
}
|
|
|
|
|
|
-// thread-safe, as segment is locked up level before calling UpdateAttributesInRamSegment
|
|
|
-bool RtIndex_c::Update_WriteBlobRow ( UpdateContext_t & tCtx, CSphRowitem * pDocinfo, const BYTE * pBlob, int iLength,
|
|
|
+// thread-safe, as segment is locked up level before calling RAM segment update
|
|
|
+bool RtSegment_t::Update_WriteBlobRow ( UpdateContext_t & tCtx, RowID_t tRowID, ByteBlob_t tBlob,
|
|
|
int nBlobAttrs, const CSphAttrLocator & tBlobRowLoc, bool & bCritical, CSphString & sError ) NO_THREAD_SAFETY_ANALYSIS
|
|
|
{
|
|
|
// fixme! Ensure pSegment->m_tLock acquired exclusively...
|
|
|
- auto* pSegment = (RtSegment_t*)tCtx.m_pSegment;
|
|
|
- assert ( pSegment );
|
|
|
+ auto pDocinfo = tCtx.GetDocinfo ( tRowID );
|
|
|
+ BYTE* pExistingBlob = m_dBlobs.begin() + sphGetRowAttr ( pDocinfo, tBlobRowLoc );
|
|
|
+ DWORD uExistingBlobLen = sphGetBlobTotalLen ( pExistingBlob, nBlobAttrs );
|
|
|
|
|
|
bCritical = false;
|
|
|
|
|
|
- CSphTightVector<BYTE> & dBlobPool = pSegment->m_dBlobs;
|
|
|
-
|
|
|
- BYTE * pExistingBlob = dBlobPool.Begin() + sphGetRowAttr ( pDocinfo, tBlobRowLoc );
|
|
|
- DWORD uExistingBlobLen = sphGetBlobTotalLen ( pExistingBlob, nBlobAttrs );
|
|
|
-
|
|
|
// overwrite old record
|
|
|
- if ( (DWORD)iLength<=uExistingBlobLen )
|
|
|
+ if ( (DWORD)tBlob.second<=uExistingBlobLen )
|
|
|
{
|
|
|
- memcpy ( pExistingBlob, pBlob, iLength );
|
|
|
+ memcpy ( pExistingBlob, tBlob.first, tBlob.second );
|
|
|
return true;
|
|
|
}
|
|
|
|
|
|
- int iPoolSize = dBlobPool.GetLength();
|
|
|
- dBlobPool.Append ( pBlob, iLength );
|
|
|
+ int iPoolSize = m_dBlobs.GetLength();
|
|
|
+ m_dBlobs.Append ( tBlob );
|
|
|
|
|
|
sphSetRowAttr ( pDocinfo, tBlobRowLoc, iPoolSize );
|
|
|
|
|
|
// update blob pool ptrs since they could have changed after the resize
|
|
|
- tCtx.m_pBlobPool = dBlobPool.begin();
|
|
|
+ tCtx.m_pBlobPool = m_dBlobs.begin();
|
|
|
return true;
|
|
|
}
|
|
|
|
|
|
|
|
|
// FIXME! might be inconsistent in case disk chunk update fails
|
|
|
-int RtIndex_c::UpdateAttributes ( AttrUpdateInc_t & tUpd, bool & bCritical, CSphString & sError, CSphString & sWarning )
|
|
|
+int RtIndex_c::CheckThenUpdateAttributes ( AttrUpdateInc_t& tUpd, bool& bCritical, CSphString& sError, CSphString& sWarning, BlockerFn&& fnWatcher )
|
|
|
{
|
|
|
const auto& tUpdc = *tUpd.m_pUpdate;
|
|
|
assert ( tUpdc.m_dRowOffset.IsEmpty() || tUpdc.m_dDocids.GetLength()==tUpdc.m_dRowOffset.GetLength() );
|
|
|
@@ -7815,19 +7848,12 @@ int RtIndex_c::UpdateAttributes ( AttrUpdateInc_t & tUpd, bool & bCritical, CSph
|
|
|
if ( m_tRtChunks.IsEmpty() )
|
|
|
return 0;
|
|
|
|
|
|
- if ( m_eSaving.load ( std::memory_order_relaxed ) == WriteState_e::DISABLED )
|
|
|
- {
|
|
|
- sError = "index is locked now, try again later";
|
|
|
- return -1;
|
|
|
- }
|
|
|
-
|
|
|
int iUpdated = tUpd.m_iAffected;
|
|
|
-
|
|
|
- UpdateContext_t tCtx ( tUpd, m_tSchema );
|
|
|
- if ( !Update_CheckAttributes ( *tCtx.m_tUpd.m_pUpdate, tCtx.m_tSchema, sError ) )
|
|
|
+ if ( !Update_CheckAttributes ( *tUpd.m_pUpdate, m_tSchema, sError ) )
|
|
|
return -1;
|
|
|
|
|
|
- Update_PrepareListOfUpdatedAttributes ( tCtx, sError );
|
|
|
+ UpdateContext_t tCtx ( tUpd, m_tSchema );
|
|
|
+ tCtx.PrepareListOfUpdatedAttributes ( sError );
|
|
|
|
|
|
// do update in serial fiber. That ensures no concurrency with set of chunks changing, however need to dispatch
|
|
|
// with changers themselves (merge segments, merge chunks, save disk chunks).
|
|
|
@@ -7842,18 +7868,21 @@ int RtIndex_c::UpdateAttributes ( AttrUpdateInc_t & tUpd, bool & bCritical, CSph
|
|
|
if ( dRamUpdateSets[i].IsEmpty() )
|
|
|
continue;
|
|
|
|
|
|
+ if ( fnWatcher && !fnWatcher() )
|
|
|
+ return -1;
|
|
|
+
|
|
|
auto* pSeg = const_cast<RtSegment_t*> ( (const RtSegment_t*)tGuard.m_dRamSegs[i] );
|
|
|
SccWL_t wLock ( pSeg->m_tLock );
|
|
|
|
|
|
+ assert ( pSeg->GetStride() == m_tSchema.GetRowSize() );
|
|
|
+
|
|
|
// point context to target segment
|
|
|
tCtx.m_pAttrPool = pSeg->m_dRows.begin();
|
|
|
tCtx.m_pBlobPool = pSeg->m_dBlobs.begin();
|
|
|
- tCtx.m_pSegment = pSeg;
|
|
|
- if ( !UpdateAttributesInRamSegment ( dRamUpdateSets[i], tCtx, bCritical, sError ) )
|
|
|
+ if ( !pSeg->Update_UpdateAttributes ( dRamUpdateSets[i], tCtx, bCritical, sError ) )
|
|
|
return -1;
|
|
|
|
|
|
- if ( pSeg->m_bAttrsBusy.load ( std::memory_order_acquire ) )
|
|
|
- AddDerivedUpdate ( dRamUpdateSets[i], tCtx ); // segment is now saving/merging - add postponed update.
|
|
|
+ pSeg->MaybeAddPostponedUpdate( dRamUpdateSets[i], tCtx );
|
|
|
|
|
|
if ( tUpd.AllApplied () )
|
|
|
break;
|
|
|
@@ -7866,7 +7895,7 @@ int RtIndex_c::UpdateAttributes ( AttrUpdateInc_t & tUpd, bool & bCritical, CSph
|
|
|
Binlog::CommitUpdateAttributes ( &m_iTID, m_sIndexName.cstr(), tUpdc );
|
|
|
|
|
|
iUpdated = tUpd.m_iAffected - iUpdated;
|
|
|
- if ( !Update_HandleJsonWarnings ( tCtx, iUpdated, sWarning, sError ) )
|
|
|
+ if ( !tCtx.HandleJsonWarnings ( iUpdated, sWarning, sError ) )
|
|
|
return -1;
|
|
|
|
|
|
// all done
|
|
|
@@ -7881,7 +7910,7 @@ bool RtIndex_c::SaveAttributes ( CSphString & sError ) const
|
|
|
|
|
|
const auto& pDiskChunks = m_tRtChunks.DiskChunks();
|
|
|
|
|
|
- if ( pDiskChunks->IsEmpty() || ( m_eSaving.load ( std::memory_order_relaxed ) == WriteState_e::DISCARD ) )
|
|
|
+ if ( pDiskChunks->IsEmpty() || m_tSaving.Is ( SaveState_c::DISCARD ) )
|
|
|
return true;
|
|
|
|
|
|
for ( auto& pChunk : *pDiskChunks )
|
|
|
@@ -8187,7 +8216,7 @@ bool RtIndex_c::AttachDiskIndex ( CSphIndex* pIndex, bool bTruncate, bool & bFat
|
|
|
return false;
|
|
|
|
|
|
// safeguards
|
|
|
- // we do not support some of the disk index features in RT just yet
|
|
|
+ // we do not support some disk index features in RT just yet
|
|
|
#define LOC_ERROR(_arg) { sError = _arg; return false; }
|
|
|
const CSphIndexSettings & tSettings = pIndex->GetSettings();
|
|
|
if ( tSettings.m_iStopwordStep!=1 )
|
|
|
@@ -8627,7 +8656,7 @@ static int64_t NumAliveDocs ( const CSphIndex& dChunk )
|
|
|
return dChunk.GetStats().m_iTotalDocuments - tStatus.m_iDead;
|
|
|
}
|
|
|
|
|
|
-void RtIndex_c::BinlogCommit ( RtSegment_t * pSeg, const CSphVector<DocID_t> & dKlist ) REQUIRES ( pSeg->m_tLock )
|
|
|
+void RtIndex_c::BinlogCommit ( RtSegment_t * pSeg, const VecTraits_T<DocID_t> & dKlist ) REQUIRES ( pSeg->m_tLock )
|
|
|
{
|
|
|
// Tracer::AsyncOp tTracer ( "rt", "RtIndex_c::BinlogCommit" );
|
|
|
Binlog::Commit ( Binlog::COMMIT, &m_iTID, m_sIndexName.cstr(), false, [pSeg,&dKlist,this] (CSphWriter& tWriter) REQUIRES ( pSeg->m_tLock )
|
|
|
@@ -8691,7 +8720,7 @@ Binlog::CheckTnxResult_t RtIndex_c::ReplayCommit ( CSphReader & tReader, CSphStr
|
|
|
DWORD uRows = tReader.UnzipOffset();
|
|
|
if ( uRows )
|
|
|
{
|
|
|
- pSeg = new RtSegment_t(uRows);
|
|
|
+ pSeg = new RtSegment_t(uRows, m_tSchema);
|
|
|
FakeWL_t _ ( pSeg->m_tLock );
|
|
|
pSeg->m_uRows = uRows;
|
|
|
pSeg->m_tAliveRows.store ( uRows, std::memory_order_relaxed );
|
|
|
@@ -8883,7 +8912,7 @@ bool RtIndex_c::CompressOneChunk ( int iChunkID, int* pAffected )
|
|
|
auto& dUpdates = pVictim->CastIdx().m_dPostponedUpdates;
|
|
|
if ( !dUpdates.IsEmpty() )
|
|
|
{
|
|
|
- tCompressed.UpdateAttributesOffline ( dUpdates, &tCompressed );
|
|
|
+ tCompressed.UpdateAttributesOffline ( dUpdates );
|
|
|
dUpdates.Reset();
|
|
|
}
|
|
|
|
|
|
@@ -9052,8 +9081,8 @@ bool RtIndex_c::SplitOneChunk ( int iChunkID, const char* szUvarFilter, int* pAf
|
|
|
auto& dUpdates = pVictim->CastIdx().m_dPostponedUpdates;
|
|
|
if ( !dUpdates.IsEmpty() )
|
|
|
{
|
|
|
- tIndexI.UpdateAttributesOffline ( dUpdates, &tIndexI );
|
|
|
- tIndexE.UpdateAttributesOffline ( dUpdates, &tIndexE );
|
|
|
+ tIndexI.UpdateAttributesOffline ( dUpdates );
|
|
|
+ tIndexE.UpdateAttributesOffline ( dUpdates );
|
|
|
dUpdates.Reset();
|
|
|
}
|
|
|
|
|
|
@@ -9150,7 +9179,7 @@ bool RtIndex_c::MergeTwoChunks ( int iAID, int iBID, int* pAffected )
|
|
|
auto dUpdates = GatherUpdates::FromChunksOrSegments ( tUpdated );
|
|
|
if ( !dUpdates.IsEmpty() )
|
|
|
{
|
|
|
- tMerged.UpdateAttributesOffline ( dUpdates, &tMerged );
|
|
|
+ tMerged.UpdateAttributesOffline ( dUpdates );
|
|
|
dUpdates.Reset();
|
|
|
}
|
|
|
|
|
|
@@ -9617,7 +9646,7 @@ int RtIndex_c::Kill ( DocID_t tDocID )
|
|
|
}
|
|
|
|
|
|
|
|
|
-int RtIndex_c::KillMulti ( const VecTraits_T<DocID_t> & dKlist )
|
|
|
+int RtIndex_c::KillMulti ( const VecTraits_T<DocID_t> & /*dKlist*/ )
|
|
|
{
|
|
|
assert ( 0 && "No external kills for RT");
|
|
|
return 0;
|
|
|
@@ -9768,13 +9797,13 @@ bool RtIndex_c::CopyExternalFiles ( int /*iPostfix*/, StrVec_t & dCopied )
|
|
|
void RtIndex_c::ProhibitSave()
|
|
|
{
|
|
|
StopOptimize();
|
|
|
- m_eSaving.store ( WriteState_e::DISCARD, std::memory_order_relaxed );
|
|
|
+ m_tSaving.SetState ( SaveState_c::DISCARD );
|
|
|
std::atomic_thread_fence ( std::memory_order_release );
|
|
|
}
|
|
|
|
|
|
void RtIndex_c::EnableSave()
|
|
|
{
|
|
|
- m_eSaving.store ( WriteState_e::ENABLED, std::memory_order_relaxed );
|
|
|
+ m_tSaving.SetState ( SaveState_c::ENABLED );
|
|
|
m_bOptimizeStop.store ( false, std::memory_order_relaxed );
|
|
|
std::atomic_thread_fence ( std::memory_order_release );
|
|
|
}
|
|
|
@@ -9786,7 +9815,9 @@ void RtIndex_c::LockFileState ( CSphVector<CSphString>& dFiles )
|
|
|
ForceRamFlush ( "forced" );
|
|
|
CSphString sError;
|
|
|
SaveAttributes ( sError ); // fixme! report error, better discard whole locking
|
|
|
- m_eSaving.store ( WriteState_e::DISABLED, std::memory_order_relaxed );
|
|
|
+ // This ensures that, if a txn is currently being applied, it finishes (especially the kill pass) before we continue.
|
|
|
+ ScopedScheduler_c tSerialFiber ( m_tWorkers.SerialChunkAccess() );
|
|
|
+ m_tSaving.SetState ( SaveState_c::DISABLED );
|
|
|
std::atomic_thread_fence ( std::memory_order_release );
|
|
|
GetIndexFiles ( dFiles, dFiles );
|
|
|
}
|
|
|
@@ -10076,4 +10107,21 @@ void RtIndex_c::RecalculateRateLimit ( int64_t iSaved, int64_t iInserted, bool b
|
|
|
m_iSoftRamLimit = m_iRtMemLimit * m_fSaveRateLimit;
|
|
|
|
|
|
TRACE_COUNTER ( "mem", perfetto::CounterTrack ( "Ratio", "%" ), m_fSaveRateLimit );
|
|
|
-}
|
|
|
+}
|
|
|
+
|
|
|
+bool RtIndex_c::AlterSI ( CSphString & sError )
|
|
|
+{
|
|
|
+ // strengthen single-fiber access (don't rely on the upstream w-lock)
|
|
|
+ ScopedScheduler_c tSerialFiber ( m_tWorkers.SerialChunkAccess() );
|
|
|
+ TRACE_SCHED ( "rt", "alter-si" );
|
|
|
+
|
|
|
+ auto pChunks = m_tRtChunks.DiskChunks();
|
|
|
+ for ( auto & tChunk : *pChunks )
|
|
|
+ {
|
|
|
+ if ( !tChunk->CastIdx().AlterSI ( sError ) )
|
|
|
+ return false;
|
|
|
+ }
|
|
|
+
|
|
|
+ RaiseAlterGeneration();
|
|
|
+ return true;
|
|
|
+}
|