Ver código fonte

add failover ack cleanup hook

abhishek9686 7 meses atrás
pai
commit
cdf367b270
3 arquivos alterados com 46 adições e 2 exclusões
  1. 5 1
      pro/controllers/failover.go
  2. 1 0
      pro/initialize.go
  3. 40 1
      pro/logic/failover.go

+ 5 - 1
pro/controllers/failover.go

@@ -441,7 +441,11 @@ func registerFailoverAck(w http.ResponseWriter, r *http.Request) {
 	sendPeerUpdate = true
 
 	if sendPeerUpdate {
-		go mq.PublishPeerUpdate(false)
+		go func() {
+			mq.PublishPeerUpdate(false)
+			// delete ACK
+			proLogic.DeRegisterFailOverAck(nodeid, peerid)
+		}()
 	}
 
 	w.Header().Set("Content-Type", "application/json")

+ 1 - 0
pro/initialize.go

@@ -83,6 +83,7 @@ func InitPro() {
 		if servercfg.GetServerConfig().RacAutoDisable {
 			AddRacHooks()
 		}
+		proLogic.AddFailOverHook()
 
 		var authProvider = auth.InitializeAuthProvider()
 		if authProvider != "" {

+ 40 - 1
pro/logic/failover.go

@@ -5,6 +5,7 @@ import (
 	"fmt"
 	"net"
 	"sync"
+	"time"
 
 	"github.com/google/uuid"
 	"github.com/gravitl/netmaker/database"
@@ -16,6 +17,13 @@ import (
 
 var failOverCtxMutex = &sync.RWMutex{}
 
+func AddFailOverHook() {
+	logic.HookManagerCh <- models.HookDetails{
+		Hook:     FailOverCleanUpHook,
+		Interval: time.Minute * 5,
+	}
+}
+
 func SetFailOverCtx(failOverNode, victimNode, peerNode models.Node) error {
 	failOverCtxMutex.Lock()
 	defer failOverCtxMutex.Unlock()
@@ -192,11 +200,42 @@ func CreateFailOver(node models.Node) error {
 
 // DoesFailoverAckExists - checks if ack with this id exists
 func DoesFailoverAckExists(id string) bool {
+	failOverCtxMutex.Lock()
+	defer failOverCtxMutex.Unlock()
 	_, err := database.FetchRecord(database.PEER_ACK_TABLE, id)
 	return err == nil
 }
 
 // RegisterFailOverAck - registers failover ack signal
 func RegisterFailOverAck(nodeid, peerid string) error {
-	return database.Insert(fmt.Sprintf("%s-%s", nodeid, peerid), "true", database.PEER_ACK_TABLE)
+	failOverCtxMutex.Lock()
+	defer failOverCtxMutex.Unlock()
+	return database.Insert(fmt.Sprintf("%s-%s", nodeid, peerid), time.Now().String(), database.PEER_ACK_TABLE)
+}
+
+// DeRegisterFailOverAck - removes the peer and node acks from DB
+func DeRegisterFailOverAck(nodeid, peerid string) {
+	failOverCtxMutex.Lock()
+	defer failOverCtxMutex.Unlock()
+	database.DeleteRecord(fmt.Sprintf("%s-%s", nodeid, peerid), database.PEER_ACK_TABLE)
+	database.DeleteRecord(fmt.Sprintf("%s-%s", peerid, nodeid), database.PEER_ACK_TABLE)
+}
+
+func FailOverCleanUpHook() error {
+	failOverCtxMutex.Lock()
+	defer failOverCtxMutex.Unlock()
+	data, err := database.FetchRecords(database.PEER_ACK_TABLE)
+	if err != nil {
+		return err
+	}
+	for key, value := range data {
+		parsedTime, err := time.Parse("2006-01-02 15:04:05", value)
+		if err != nil {
+			continue
+		}
+		if time.Since(parsedTime) > time.Minute*5 {
+			database.DeleteRecord(key, database.PEER_ACK_TABLE)
+		}
+	}
+	return nil
 }