Browse Source

* Allow for async apply updates

git-svn-id: trunk@47641 -
michael 4 years ago
parent
commit
a9e7d1ef0d
2 changed files with 140 additions and 55 deletions
  1. 139 55
      packages/fcl-db/src/base/bufdataset.pas
  2. 1 0
      packages/fcl-db/src/base/dbconst.pas

+ 139 - 55
packages/fcl-db/src/base/bufdataset.pas

@@ -45,6 +45,12 @@ type
     BlobBuffer     : PBlobBuffer;
   end;
 
+  TApplyRecUpdateResult = Record
+    HadError : Boolean;
+    Response : TResolverResponse;
+    Async : Boolean;
+  end;
+
   { TBufBlobStream }
 
   TBufBlobStream = class(TStream)
@@ -78,6 +84,7 @@ type
   end;
 
   TRecUpdateBuffer = record
+    Processing         : Boolean;
     UpdateKind         : TUpdateKind;
 {  BookMarkData:
      - Is -1 if the update has canceled out. For example: an appended record has been deleted again
@@ -558,6 +565,14 @@ type
     procedure BuildIndexes;
     procedure RemoveRecordFromIndexes(const ABookmark : TBufBookmark);
     procedure InternalCreateIndex(F: TBufDataSetIndex); virtual;
+    // Position record for update. Note that no check on state is done.
+    procedure PrepareForUpdate(aUpdate: TRecUpdateBuffer);
+    // Apply update for current record. Called in sequence by ApplyUpdates. The active buffer is positioned on the record to be updated.
+    function DoApplyUpdate(var aUpdate : TRecUpdateBuffer; AbortOnError : Boolean): TApplyRecUpdateResult;
+    // Call this when an update failed. This will return true if update must be retried.
+    function HandleUpdateError(aUpdate: TRecUpdateBuffer; var aResult: TApplyRecUpdateResult; E: Exception): Boolean;
+    // Call this when a record has been resolved. It will free temp buffers.
+    procedure ResolvedRecord(var aUpdate: TRecUpdateBuffer);
     Property CurrentIndexBuf : TBufIndex Read GetCurrentIndexBuf;
     Property CurrentIndexDef : TBufDatasetIndex Read FCurrentIndexDef;
     Property BufIndexDefs[Aindex : Integer] : TBufDatasetIndex Read GetBufIndexDef;
@@ -596,7 +611,8 @@ type
     function GetBookmarkFlag(Buffer: TRecordBuffer): TBookmarkFlag; override;
     function IsCursorOpen: Boolean; override;
     function  GetRecordCount: Longint; override;
-    procedure ApplyRecUpdate(UpdateKind : TUpdateKind); virtual;
+    procedure ApplyRecUpdate(UpdateKind : TUpdateKind); virtual; deprecated;
+    Function  ApplyRecUpdateEx(UpdateKind : TUpdateKind) : TApplyRecUpdateResult; virtual;
     procedure SetOnUpdateError(const AValue: TResolverErrorEvent);
     procedure SetFilterText(const Value: String); override; {virtual;}
     procedure SetFiltered(Value: Boolean); override; {virtual;}
@@ -1898,6 +1914,7 @@ begin
 end;
 
 procedure TDoubleLinkedBufIndex.BeginUpdate;
+
 begin
   if FCurrentRecBuf = FLastRecBuf then
     FCursOnFirstRec := True
@@ -2837,95 +2854,162 @@ begin
   FOnUpdateError := AValue;
 end;
 
-procedure TCustomBufDataset.ApplyUpdates; // For backward compatibility
+
+function TCustomBufDataset.ApplyRecUpdateEx(UpdateKind: TUpdateKind): TApplyRecUpdateResult;
 
 begin
-  ApplyUpdates(0);
+  Result:=Default(TApplyRecUpdateResult);
+  Result.Async:=False;
+  Result.Response:=rrApply;
+  ApplyRecUpdate(UpdateKind);
+end;
+
+Function TCustomBufDataset.HandleUpdateError(aUpdate : TRecUpdateBuffer; var aResult : TApplyRecUpdateResult; E : Exception) : Boolean;
+
+Var
+  AUpdateError : EUpdateError;
+
+begin
+  Result:=False;
+  AUpdateError := PSGetUpdateException(Exception(AcquireExceptionObject), nil);
+  if assigned(FOnUpdateError) then
+    begin
+    FOnUpdateError(Self, Self, AUpdateError, aUpdate.UpdateKind, aResult.Response);
+    AUpdateError.Free;
+    Result:=aResult.Response=rrApply;
+    end
+  else if (aResult.Response = rrAbort) then
+    begin
+    raise AUpdateError;
+    end
+  else
+    aUpdateError.Free;
+end;
+
+Procedure TCustomBufDataset.PrepareForUpdate(aUpdate : TRecUpdateBuffer);
+
+begin
+  // For async, this could be a different buffer than the buffer
+  CurrentIndexBuf.GotoBookmark(@aUpdate.BookmarkData);
+  // Synchronise the CurrentBuffer to the ActiveBuffer
+  CurrentRecordToBuffer(ActiveBuffer);
+end;
+
+function TCustomBufDataset.DoApplyUpdate(var aUpdate : TRecUpdateBuffer; AbortOnError : Boolean): TApplyRecUpdateResult;
+
+Const
+  ErrorResponse : Array[Boolean] of TResolverResponse = (rrSkip,rrAbort);
+
+begin
+  Result.Async:=False;
+  Result.Response:=rrApply;
+  // If the record is first inserted and afterwards deleted, do nothing
+  if ((aUpdate.UpdateKind=ukDelete) and not (assigned(aUpdate.OldValuesBuffer))) then
+    exit;
+  try
+     PrepareForUpdate(aUpdate);
+     Result:=ApplyRecUpdateEx(aUpdate.UpdateKind);
+  except
+    on E: EDatabaseError do
+      begin
+      Result.Response:=ErrorResponse[AbortOnError];
+      if HandleUpdateError(aUpdate,Result,E) then
+         DoApplyUpdate(aUpdate,AbortOnError);
+      Result.HadError:=True;
+      end
+    else
+      raise;
+  end;
+end;
+
+procedure TCustomBufDataset.ResolvedRecord(Var aUpdate : TRecUpdateBuffer);
+
+begin
+  FreeRecordBuffer(aUpdate.OldValuesBuffer);
+  if aUpdate.UpdateKind = ukDelete then
+    FreeRecordBuffer(TRecordBuffer(AUpdate.BookmarkData.BookmarkData));
+  AUpdate.BookmarkData.BookmarkData := nil;
+  aUpdate.Processing:=False;
 end;
 
 procedure TCustomBufDataset.ApplyUpdates(MaxErrors: Integer);
 
 var r            : Integer;
     FailedCount  : integer;
-    Response     : TResolverResponse;
+    Res : TApplyRecUpdateResult;
     StoreCurrRec : TBufBookmark;
-    AUpdateError : EUpdateError;
+    aSyncDetected : Boolean;
+    aOldState : TDataSetState;
+    UpdOK : Boolean;
 
 begin
+  Res:=Default(TApplyRecUpdateResult);
+  Res.Response:=rrApply;
   CheckBrowseMode;
-
   CurrentIndexBuf.StoreCurrentRecIntoBookmark(@StoreCurrRec);
-
-  r := 0;
-  FailedCount := 0;
-  Response := rrApply;
-  DisableControls;
+  aSyncDetected:=False;
+  aOldState:=SetTempState(dsBlockRead);
   try
-    while (r < Length(FUpdateBuffer)) and (Response <> rrAbort) do
-      begin
-      // If the record is first inserted and afterwards deleted, do nothing
-      if not ((FUpdateBuffer[r].UpdateKind=ukDelete) and not (assigned(FUpdateBuffer[r].OldValuesBuffer))) then
+    DisableControls;
+    r := 0;
+    FailedCount := 0;
+    while (r < Length(FUpdateBuffer)) and (Res.Response <> rrAbort) do
+      // S
+      if Not FUpdateBuffer[r].Processing then
         begin
-        CurrentIndexBuf.GotoBookmark(@FUpdateBuffer[r].BookmarkData);
-        // Synchronise the CurrentBuffer to the ActiveBuffer
-        CurrentRecordToBuffer(ActiveBuffer);
-        Response := rrApply;
+        UpdOK:=False;
+        FUpdateBuffer[r].Processing:=True;
         try
-          ApplyRecUpdate(FUpdateBuffer[r].UpdateKind);
-        except
-          on E: EDatabaseError do
-            begin
-            Inc(FailedCount);
-            if FailedCount > word(MaxErrors) then
-              Response := rrAbort
-            else
-              Response := rrSkip;
-            if assigned(FOnUpdateError) then
-              begin
-              AUpdateError := PSGetUpdateException(Exception(AcquireExceptionObject), nil);
-              FOnUpdateError(Self, Self, AUpdateError, FUpdateBuffer[r].UpdateKind, Response);
-              AUpdateError.Free;
-              if Response in [rrApply, rrIgnore] then dec(FailedCount);
-              if Response = rrApply then dec(r);
-              end
-            else if Response = rrAbort then
-              begin
-              AUpdateError := PSGetUpdateException(Exception(AcquireExceptionObject), nil);
-              raise AUpdateError;
-              end;
-            end
+          Res:=DoApplyUpdate(FUpdateBuffer[r],FailedCount>=MaxErrors);
+          UpdOK:=True;
+        finally
+          if Res.Async then
+            aSyncDetected:=True
           else
-            raise;
+            begin
+            FUpdateBuffer[r].Processing:=False;
+            if not UpdOK then // We have an exception, force HadError
+              Res.HadError:=True;
+            if Res.HadError then
+              Inc(FailedCount);
+            if Res.Response in [rrApply, rrIgnore] then
+              ResolvedRecord(FUpdateBuffer[r]);
+            end;
         end;
-        if Response in [rrApply, rrIgnore] then
-          begin
-          FreeRecordBuffer(FUpdateBuffer[r].OldValuesBuffer);
-          if FUpdateBuffer[r].UpdateKind = ukDelete then
-            FreeRecordBuffer( TRecordBuffer(FUpdateBuffer[r].BookmarkData.BookmarkData));
-          FUpdateBuffer[r].BookmarkData.BookmarkData := nil;
-          end
+        inc(r);
         end;
-      inc(r);
-      end;
   finally
-    if (FailedCount=0) and Not ManualMergeChangeLog then
+    if (FailedCount=0) and Not (AsyncDetected or ManualMergeChangeLog) then
       MergeChangeLog;
     InternalGotoBookmark(@StoreCurrRec);
     Resync([]);
+    RestoreState(aOldState);
     EnableControls;
   end;
 end;
 
+procedure TCustomBufDataset.ApplyUpdates; // For backward compatibility
+
+begin
+  ApplyUpdates(0);
+end;
+
 procedure TCustomBufDataset.MergeChangeLog;
 
-var r            : Integer;
+var
+  r,aCount : Integer;
 
 begin
+  aCount:=0;
+  for r:=0 to length(FUpdateBuffer)-1 do
+    if FUpdateBuffer[r].Processing then
+      Inc(aCount);
+  If aCount>0 then
+    Raise EDatabaseError.CreateFmt(SErrUpdatesInProgess,[ACount]);
   for r:=0 to length(FUpdateBuffer)-1 do
     if assigned(FUpdateBuffer[r].OldValuesBuffer) then
       FreeMem(FUpdateBuffer[r].OldValuesBuffer);
   SetLength(FUpdateBuffer,0);
-
   if assigned(FUpdateBlobBuffers) then for r:=0 to length(FUpdateBlobBuffers)-1 do
     if assigned(FUpdateBlobBuffers[r]) then
       begin

+ 1 - 0
packages/fcl-db/src/base/dbconst.pas

@@ -91,6 +91,7 @@ Resourcestring
   SFieldIsNull             = 'The field is null';
   SOnUpdateError           = 'An error occurred while applying the updates in a record: %s';
   SApplyRecNotSupported    = 'Applying updates is not supported by this TDataset descendent';
+  SErrUpdatesInProgess     = 'Apply updates in progress: %d records being processed.';
   SNoWhereFields           = 'No %s query specified and failed to generate one. (No fields for inclusion in where statement found)';
   SNoUpdateFields          = 'No %s query specified and failed to generate one. (No fields for insert- or update-statement found)';
   SNotSupported            = 'Operation is not supported by this type of database';