Browse Source

Improving TNode.AddOperations speed

Allow race in incoming operations thread, high speed
PascalCoin 6 years ago
parent
commit
6543a6423e
1 changed files with 143 additions and 97 deletions
  1. 143 97
      src/core/UNode.pas

+ 143 - 97
src/core/UNode.pas

@@ -56,6 +56,7 @@ Type
     FNotifyList : TList<TNodeNotifyEvents>;
     FBank : TPCBank;
     FMemPoolOperationsComp : TPCOperationsComp;
+    FMemPoolAddingOperationsList : TOrderedRawList;
     FNetServer : TNetServer;
     FBCBankNotify : TPCBankNotify;
     FPeerCache : String;
@@ -93,7 +94,7 @@ Type
     procedure UnlockMempoolWrite;
     //
     Function AddNewBlockChain(SenderConnection : TNetConnection; NewBlockOperations: TPCOperationsComp; var newBlockAccount: TBlockAccount; var errors: String): Boolean;
-    Function AddOperations(SenderConnection : TNetConnection; OperationsHashTree : TOperationsHashTree; OperationsResult : TOperationsResumeList; var errors: String): Integer;
+    Function AddOperations(SenderConnection : TNetConnection; AOperationsHashTreeToAdd : TOperationsHashTree; OperationsResult : TOperationsResumeList; var errors: String): Integer;
     Function AddOperation(SenderConnection : TNetConnection; Operation : TPCOperation; var errors: String): Boolean;
     Function SendNodeMessage(Target : TNetConnection; const TheMessage : String; var errors : String) : Boolean;
     //
@@ -370,7 +371,7 @@ begin
   End;
 end;
 
-function TNode.AddOperations(SenderConnection : TNetConnection; OperationsHashTree : TOperationsHashTree; OperationsResult : TOperationsResumeList; var errors: String): Integer;
+function TNode.AddOperations(SenderConnection : TNetConnection; AOperationsHashTreeToAdd : TOperationsHashTree; OperationsResult : TOperationsResumeList; var errors: String): Integer;
   {$IFDEF BufferOfFutureOperations}
   Procedure Process_BufferOfFutureOperations(ALockedMempool : TPCOperationsComp; valids_operations : TOperationsHashTree);
   Var i,j, nAdded, nDeleted : Integer;
@@ -406,7 +407,7 @@ function TNode.AddOperations(SenderConnection : TNetConnection; OperationsHashTr
   {$ENDIF}
 Var
   i,j,nSpam,nError,nRepeated : Integer;
-  valids_operations : TOperationsHashTree;
+  LValids_operations : TOperationsHashTree;
   nc : TNetConnection;
   e : String;
   s : String;
@@ -414,11 +415,14 @@ Var
   ActOp : TPCOperation;
   {$IFDEF BufferOfFutureOperations}sAcc : TAccount;{$ENDIF}
   LLockedMempool : TPCOperationsComp;
+  LOpsToAdd : TList<TPCOperation>;
+  LTempSafeboxTransaction : TPCSafeBoxTransaction;
+  LTickCount : TTickCount;
 begin
   Result := -1; // -1 Means Node is blocked or disabled
   if Assigned(OperationsResult) then OperationsResult.Clear;
   if FDisabledsNewBlocksCount>0 then begin
-    errors := Format('Cannot Add Operations due is adding disabled - OpCount:%d',[OperationsHashTree.OperationsCount]);
+    errors := Format('Cannot Add Operations due is adding disabled - OpCount:%d',[AOperationsHashTreeToAdd.OperationsCount]);
     TLog.NewLog(ltinfo,Classname,errors);
     exit;
   end;
@@ -426,66 +430,69 @@ begin
   nRepeated := 0;
   nError := 0;
   errors := '';
-  valids_operations := TOperationsHashTree.Create;
+  Result := 0;
+  LTickCount := TPlatform.GetTickCount;
+  LValids_operations := TOperationsHashTree.Create;
   try
-    {$IFDEF HIGHLOG}TLog.NewLog(ltdebug,Classname,Format('AddOperations Connection:%s Operations:%d',[
-      Inttohex(PtrInt(SenderConnection),8),OperationsHashTree.OperationsCount]));{$ENDIF}
-    if Not TPCThread.TryProtectEnterCriticalSection(Self,4000,FLockMempool) then begin
-      s := 'Cannot AddOperations due blocking lock operations node';
-      TLog.NewLog(lterror,Classname,s);
-      if TThread.CurrentThread.ThreadID=MainThreadID then raise Exception.Create(s) else exit;
-    end;
+    LOpsToAdd := TList<TPCOperation>.Create;
     try
-      Result := 0;
+      // In order to allow income operations from multiple threads will divide the
+      // process in LOCKING steps: (instead of a single global locking)
+      // 1 - Add received AOperationsHashTreeToAdd in global FMemPoolAddingOperationsList
+      //     without duplicates. This allows receive same operation twice and execute
+      //     only first received
+      // 2 - Verify signatures in a multithread (if CPU's available)
+      // 3 - For each not repeated operation, try to add to mempool
+
+      // Step 1: Add operations to FMemPoolAddingOperationsList
+      LLockedMempool := LockMempoolWrite;
+      try
+        for i := 0 to AOperationsHashTreeToAdd.OperationsCount-1 do begin
+          ActOp := AOperationsHashTreeToAdd.GetOperation(i);
+          j := FMemPoolAddingOperationsList.IndexOf( ActOp.Sha256 );
+          if (j<0) then begin
+            LOpsToAdd.Add(ActOp);
+            FMemPoolAddingOperationsList.Add(ActOp.Sha256);
+          end;
+        end;
+      finally
+        UnlockMempoolWrite;
+      end;
+
+      // Step 2:
+      LTempSafeboxTransaction := TPCSafeBoxTransaction.Create(Bank.SafeBox);
+      try
+        TPCOperationsSignatureValidator.MultiThreadPreValidateSignatures(LTempSafeboxTransaction,LOpsToAdd,Nil);
+      finally
+        LTempSafeboxTransaction.Free;
+      end;
+
       {$IFDEF BufferOfFutureOperations}
-      Process_BufferOfFutureOperations(valids_operations);
+      LLockedMempool := LockMempoolWrite;
+      try
+        Process_BufferOfFutureOperations(LLockedMempool,LValids_operations);
+      finally
+        UnlockMempoolWrite;
+      end;
       {$ENDIF}
 
-      for j := 0 to OperationsHashTree.OperationsCount-1 do begin
-        ActOp := OperationsHashTree.GetOperation(j);
+      // Step 3:
+      for j := 0 to LOpsToAdd.Count-1 do begin
+        ActOp := LOpsToAdd[j];
         LLockedMempool := LockMempoolWrite;
         try
 
-        If (LLockedMempool.OperationsHashTree.IndexOfOperation(ActOp)<0) then begin
-          // Protocol 2 limitation: In order to prevent spam of operations without Fee, will protect it
-          If (ActOp.OperationFee=0) And (Bank.SafeBox.CurrentProtocol>=CT_PROTOCOL_2) And
-             (LLockedMempool.OperationsHashTree.CountOperationsBySameSignerWithoutFee(ActOp.SignerAccount)>=CT_MaxAccountOperationsPerBlockWithoutFee) then begin
-            inc(nSpam);
-            e := Format('Account %s zero fee operations per block limit:%d',[TAccountComp.AccountNumberToAccountTxtNumber(ActOp.SignerAccount),CT_MaxAccountOperationsPerBlockWithoutFee]);
-            if (nSpam<=5) then begin  // To Limit errors in a String... speed up
-              if (errors<>'') then errors := errors+' ';
-              errors := errors+'Op '+IntToStr(j+1)+'/'+IntToStr(OperationsHashTree.OperationsCount)+':'+e;
-            end;
-            TLog.NewLog(ltdebug,Classname,Format('AddOperation spam %d/%d: %s  - Error:%s',
-              [(j+1),OperationsHashTree.OperationsCount,ActOp.ToString,e]));
-            if Assigned(OperationsResult) then begin
-              TPCOperation.OperationToOperationResume(0,ActOp,True,ActOp.SignerAccount,OPR);
-              OPR.valid := false;
-              OPR.NOpInsideBlock:=-1;
-              OPR.OperationHash := Nil;
-              OPR.errors := e;
-              OperationsResult.Add(OPR);
-            end;
-          end else begin
-            if (LLockedMempool.AddOperation(true,ActOp,e)) then begin
-              inc(Result);
-              FSentOperations.Add(ActOp.Sha256,LLockedMempool.OperationBlock.block);
-              valids_operations.AddOperationToHashTree(ActOp);
-              TLog.NewLog(ltdebug,Classname,Format('AddOperation %d/%d: %s',[(j+1),OperationsHashTree.OperationsCount,ActOp.ToString]));
-              if Assigned(OperationsResult) then begin
-                TPCOperation.OperationToOperationResume(0,ActOp,True,ActOp.SignerAccount,OPR);
-                OPR.NOpInsideBlock:=LLockedMempool.Count-1;
-                OPR.Balance := LLockedMempool.SafeBoxTransaction.Account(ActOp.SignerAccount).balance;
-                OperationsResult.Add(OPR);
-              end;
-            end else begin
-              inc(nError);
-              if (nError<=5) then begin  // To Limit errors in a String... speed up
+          If (LLockedMempool.OperationsHashTree.IndexOfOperation(ActOp)<0) then begin
+            // Protocol 2 limitation: In order to prevent spam of operations without Fee, will protect it
+            If (ActOp.OperationFee=0) And (Bank.SafeBox.CurrentProtocol>=CT_PROTOCOL_2) And
+               (LLockedMempool.OperationsHashTree.CountOperationsBySameSignerWithoutFee(ActOp.SignerAccount)>=CT_MaxAccountOperationsPerBlockWithoutFee) then begin
+              inc(nSpam);
+              e := Format('Account %s zero fee operations per block limit:%d',[TAccountComp.AccountNumberToAccountTxtNumber(ActOp.SignerAccount),CT_MaxAccountOperationsPerBlockWithoutFee]);
+              if (nSpam<=5) then begin  // To Limit errors in a String... speed up
                 if (errors<>'') then errors := errors+' ';
-                errors := errors+'Op '+IntToStr(j+1)+'/'+IntToStr(OperationsHashTree.OperationsCount)+':'+e;
+                errors := errors+'Op '+IntToStr(j+1)+'/'+IntToStr(LOpsToAdd.Count)+':'+e;
               end;
-              TLog.NewLog(ltdebug,Classname,Format('AddOperation invalid/duplicated %d/%d: %s  - Error:%s',
-                [(j+1),OperationsHashTree.OperationsCount,ActOp.ToString,e]));
+              {$IFDEF HIGHLOG}TLog.NewLog(ltdebug,Classname,Format('AddOperation spam %d/%d: %s  - Error:%s',[(j+1),LOpsToAdd.Count,ActOp.ToString,e]));{$ENDIF}
               if Assigned(OperationsResult) then begin
                 TPCOperation.OperationToOperationResume(0,ActOp,True,ActOp.SignerAccount,OPR);
                 OPR.valid := false;
@@ -494,74 +501,111 @@ begin
                 OPR.errors := e;
                 OperationsResult.Add(OPR);
               end;
-              {$IFDEF BufferOfFutureOperations}
-              // Used to solve 2.0.0 "invalid order of operations" bug
-              If (Assigned(SenderConnection)) Then begin
-                if ActOp.SignerAccount<FOperations.SafeBoxTransaction.FreezedSafeBox.AccountsCount then begin
-                  sAcc := FOperations.SafeBoxTransaction.Account(ActOp.SignerAccount);
-                  If (sAcc.n_operation<ActOp.N_Operation) Or
-                     ((sAcc.n_operation=ActOp.N_Operation) AND (sAcc.balance=0) And (ActOp.OperationFee>0) And (ActOp.OpType = CT_Op_Changekey)) then begin
-                    If FBufferAuxWaitingOperations.IndexOfOperation(ActOp)<0 then begin
-                      FBufferAuxWaitingOperations.AddOperationToHashTree(ActOp);
-                      TLog.NewLog(ltInfo,Classname,Format('New FromBufferWaitingOperations %d/%d (new buffer size:%d): %s',[j+1,OperationsHashTree.OperationsCount,FBufferAuxWaitingOperations.OperationsCount,ActOp.ToString]));
+            end else begin
+              if (LLockedMempool.AddOperation(true,ActOp,e)) then begin
+                inc(Result);
+                FSentOperations.Add(ActOp.Sha256,LLockedMempool.OperationBlock.block);
+                LValids_operations.AddOperationToHashTree(ActOp);
+                {$IFDEF HIGHLOG}TLog.NewLog(ltdebug,Classname,Format('AddOperation %d/%d: %s',[(j+1),LOpsToAdd.Count,ActOp.ToString]));{$ENDIF}
+                if Assigned(OperationsResult) then begin
+                  TPCOperation.OperationToOperationResume(0,ActOp,True,ActOp.SignerAccount,OPR);
+                  OPR.NOpInsideBlock:=LLockedMempool.Count-1;
+                  OPR.Balance := LLockedMempool.SafeBoxTransaction.Account(ActOp.SignerAccount).balance;
+                  OperationsResult.Add(OPR);
+                end;
+              end else begin
+                inc(nError);
+                if (nError<=5) then begin  // To Limit errors in a String... speed up
+                  if (errors<>'') then errors := errors+' ';
+                  errors := errors+'Op '+IntToStr(j+1)+'/'+IntToStr(LOpsToAdd.Count)+':'+e;
+                end;
+                {$IFDEF HIGHLOG}TLog.NewLog(ltdebug,Classname,Format('AddOperation invalid/duplicated %d/%d: %s  - Error:%s',[(j+1),LOpsToAdd.Count,ActOp.ToString,e]));{$ENDIF}
+                if Assigned(OperationsResult) then begin
+                  TPCOperation.OperationToOperationResume(0,ActOp,True,ActOp.SignerAccount,OPR);
+                  OPR.valid := false;
+                  OPR.NOpInsideBlock:=-1;
+                  OPR.OperationHash := Nil;
+                  OPR.errors := e;
+                  OperationsResult.Add(OPR);
+                end;
+                {$IFDEF BufferOfFutureOperations}
+                // Used to solve 2.0.0 "invalid order of operations" bug
+                If (Assigned(SenderConnection)) Then begin
+                  if ActOp.SignerAccount<LLockedMempool.SafeBoxTransaction.FreezedSafeBox.AccountsCount then begin
+                    sAcc := LLockedMempool.SafeBoxTransaction.Account(ActOp.SignerAccount);
+                    If (sAcc.n_operation<ActOp.N_Operation) Or
+                       ((sAcc.n_operation=ActOp.N_Operation) AND (sAcc.balance=0) And (ActOp.OperationFee>0) And (ActOp.OpType = CT_Op_Changekey)) then begin
+                      If FBufferAuxWaitingOperations.IndexOfOperation(ActOp)<0 then begin
+                        FBufferAuxWaitingOperations.AddOperationToHashTree(ActOp);
+                        TLog.NewLog(ltInfo,Classname,Format('New FromBufferWaitingOperations %d/%d (new buffer size:%d): %s',[j+1,LOpsToAdd.Count,FBufferAuxWaitingOperations.OperationsCount,ActOp.ToString]));
+                      end;
                     end;
                   end;
                 end;
+                {$ENDIF}
               end;
-              {$ENDIF}
             end;
+          end else begin
+            inc(nRepeated);
+            e := Format('AddOperation made before %d/%d: %s',[(j+1),LOpsToAdd.Count,ActOp.ToString]);
+            if (nRepeated<=5) then begin  // To Limit errors in a String... speed up
+              if (errors<>'') then errors := errors+' ';
+              errors := errors + e;
+            end;
+            if Assigned(OperationsResult) then begin
+              TPCOperation.OperationToOperationResume(0,ActOp,True,ActOp.SignerAccount,OPR);
+              OPR.valid := false;
+              OPR.NOpInsideBlock:=-1;
+              OPR.OperationHash := Nil;
+              OPR.errors := e;
+              OperationsResult.Add(OPR);
+            end;
+            {$IFDEF HIGHLOG}TLog.NewLog(ltdebug,Classname,Format('AddOperation made before %d/%d: %s',[(j+1),LOpsToAdd.Count,ActOp.ToString]));{$ENDIF}
           end;
-        end else begin
-          inc(nRepeated);
-          e := Format('AddOperation made before %d/%d: %s',[(j+1),OperationsHashTree.OperationsCount,ActOp.ToString]);
-          if (nRepeated<=5) then begin  // To Limit errors in a String... speed up
-            if (errors<>'') then errors := errors+' ';
-            errors := errors + e;
-          end;
-          if Assigned(OperationsResult) then begin
-            TPCOperation.OperationToOperationResume(0,ActOp,True,ActOp.SignerAccount,OPR);
-            OPR.valid := false;
-            OPR.NOpInsideBlock:=-1;
-            OPR.OperationHash := Nil;
-            OPR.errors := e;
-            OperationsResult.Add(OPR);
-          end;
-          {$IFDEF HIGHLOG}TLog.NewLog(ltdebug,Classname,Format('AddOperation made before %d/%d: %s',[(j+1),OperationsHashTree.OperationsCount,ActOp.ToString]));{$ENDIF}
-        end;
         finally
           UnlockMempoolWrite;
         end;
       end; // for i
-      // Save operations buffer
       If Result<>0 then begin
         LLockedMempool := LockMempoolRead;
         try
+          // Save operations buffer
           Bank.Storage.SavePendingBufferOperations(LLockedMempool.OperationsHashTree);
         finally
           UnlockMempoolRead;
         end;
-      end;
-    finally
-      FLockMempool.Release;
-      if Result<>0 then begin
+        LTickCount := TPlatform.GetElapsedMilliseconds(LTickCount);
+        if LTickCount=0 then LTickCount:=1;
         if Assigned(SenderConnection) then begin
           s := SenderConnection.ClientRemoteAddr;
         end else s := '(SELF)';
-        TLog.NewLog(ltdebug,Classname,Format('Finalizing AddOperations from %s Operations:%d valids:%d spam:%d invalids:%d repeated:%d',[s,OperationsHashTree.OperationsCount,Result,nSpam,nError,nRepeated]));
+        TLog.NewLog(ltdebug,Classname,Format('Finalizing AddOperations from %s Operations:%d of %d valids:%d spam:%d invalids:%d repeated:%d Miliseconds:%d %.1f ops/sec',
+          [s,LOpsToAdd.Count,AOperationsHashTreeToAdd.OperationsCount,Result,nSpam,nError,nRepeated,LTickCount,LOpsToAdd.Count * 1000 / LTickCount]));
+        if FBroadcastData then begin
+          // Send to other nodes
+          j := TNetData.NetData.ConnectionsCountAll;
+          for i:=0 to j-1 do begin
+            If TNetData.NetData.GetConnection(i,nc) then begin
+              if (nc<>SenderConnection) And (nc.Connected) And (nc.RemoteOperationBlock.block>0) then TThreadNodeNotifyOperations.Create(nc,LValids_operations);
+            end;
+          end;
+        end;
       end;
-    end;
-    if Result=0 then exit;
-    if FBroadcastData then begin
-      // Send to other nodes
-      j := TNetData.NetData.ConnectionsCountAll;
-      for i:=0 to j-1 do begin
-        If TNetData.NetData.GetConnection(i,nc) then begin
-          if (nc<>SenderConnection) And (nc.Connected) And (nc.RemoteOperationBlock.block>0) then TThreadNodeNotifyOperations.Create(nc,valids_operations);
+    finally
+      // Remove LOpsToAdd from FMemPoolAddingOperationsList
+      LLockedMempool := LockMempoolWrite;
+      try
+        for i := 0 to LOpsToAdd.Count-1 do begin
+          ActOp := LOpsToAdd[i];
+          FMemPoolAddingOperationsList.Remove(ActOp.Sha256);
         end;
+      finally
+        UnlockMempoolWrite;
       end;
+      LOpsToAdd.Free;
     end;
   finally
-    valids_operations.Free;
+    LValids_operations.Free;
   end;
   // Notify it!
   for i := 0 to FNotifyList.Count-1 do begin
@@ -602,6 +646,7 @@ begin
   FMemPoolOperationsComp := TPCOperationsComp.Create(Nil);
   FMemPoolOperationsComp.bank := FBank;
   FNotifyList := TList<TNodeNotifyEvents>.Create;
+  FMemPoolAddingOperationsList := TOrderedRawList.Create;
   {$IFDEF BufferOfFutureOperations}
   FBufferAuxWaitingOperations := TOperationsHashTree.Create;
   {$ENDIF}
@@ -680,6 +725,7 @@ begin
     FreeAndNil(FNotifyList);
     step := 'Destroying Operations';
     FreeAndNil(FMemPoolOperationsComp);
+    FreeAndNil(FMemPoolAddingOperationsList);
     step := 'Assigning NIL to node var';
     if _Node=Self then _Node := Nil;
     Step := 'Destroying SentOperations list';