Browse Source

NET-896: Scale test bug fixes (#2764)

* send peer update in async

* update metrics on fallback

* return http json response
Abhishek K 1 year ago
parent
commit
465f2bd5be
7 changed files with 51 additions and 50 deletions
  1. 5 1
      controllers/hosts.go
  2. 7 4
      models/host.go
  3. 4 5
      models/metrics.go
  4. 2 0
      mq/handlers.go
  5. 6 3
      mq/publishers.go
  6. 1 0
      pro/initialize.go
  7. 26 37
      pro/logic/metrics.go

+ 5 - 1
controllers/hosts.go

@@ -228,6 +228,7 @@ func hostUpdateFallback(w http.ResponseWriter, r *http.Request) {
 	currentHost, err := logic.GetHost(hostid)
 	if err != nil {
 		slog.Error("error getting host", "id", hostid, "error", err)
+		logic.ReturnErrorResponse(w, r, logic.FormatError(err, "badrequest"))
 		return
 	}
 
@@ -249,10 +250,13 @@ func hostUpdateFallback(w http.ResponseWriter, r *http.Request) {
 		err := logic.UpsertHost(currentHost)
 		if err != nil {
 			slog.Error("failed to update host", "id", currentHost.ID, "error", err)
+			logic.ReturnErrorResponse(w, r, logic.FormatError(err, "internal"))
 			return
 		}
-
+	case models.UpdateMetrics:
+		mq.UpdateMetricsFallBack(hostUpdate.Node.ID.String(), hostUpdate.NewMetrics)
 	}
+	logic.ReturnSuccessResponse(w, r, "updated host data")
 
 }
 

+ 7 - 4
models/host.go

@@ -114,6 +114,8 @@ const (
 	UpdateKeys HostMqAction = "UPDATE_KEYS"
 	// RequestPull - request a pull from a host
 	RequestPull HostMqAction = "REQ_PULL"
+	// UpdateMetrics - updates metrics data
+	UpdateMetrics HostMqAction = "UPDATE_METRICS"
 )
 
 // SignalAction - turn peer signal action
@@ -128,10 +130,11 @@ const (
 
 // HostUpdate - struct for host update
 type HostUpdate struct {
-	Action HostMqAction
-	Host   Host
-	Node   Node
-	Signal Signal
+	Action     HostMqAction
+	Host       Host
+	Node       Node
+	Signal     Signal
+	NewMetrics Metrics
 }
 
 // HostTurnRegister - struct for host turn registration

+ 4 - 5
models/metrics.go

@@ -6,11 +6,10 @@ import (
 
 // Metrics - metrics struct
 type Metrics struct {
-	Network       string            `json:"network" bson:"network" yaml:"network"`
-	NodeID        string            `json:"node_id" bson:"node_id" yaml:"node_id"`
-	NodeName      string            `json:"node_name" bson:"node_name" yaml:"node_name"`
-	Connectivity  map[string]Metric `json:"connectivity" bson:"connectivity" yaml:"connectivity"`
-	FailoverPeers map[string]string `json:"needsfailover" bson:"needsfailover" yaml:"needsfailover"`
+	Network      string            `json:"network" bson:"network" yaml:"network"`
+	NodeID       string            `json:"node_id" bson:"node_id" yaml:"node_id"`
+	NodeName     string            `json:"node_name" bson:"node_name" yaml:"node_name"`
+	Connectivity map[string]Metric `json:"connectivity" bson:"connectivity" yaml:"connectivity"`
 }
 
 // Metric - holds a metric for data between nodes

+ 2 - 0
mq/handlers.go

@@ -19,6 +19,8 @@ import (
 var UpdateMetrics = func(client mqtt.Client, msg mqtt.Message) {
 }
 
+var UpdateMetricsFallBack = func(nodeid string, newMetrics models.Metrics) {}
+
 // DefaultHandler default message queue handler  -- NOT USED
 func DefaultHandler(client mqtt.Client, msg mqtt.Message) {
 	slog.Info("mqtt default handler", "topic", msg.Topic(), "message", msg.Payload())

+ 6 - 3
mq/publishers.go

@@ -30,9 +30,12 @@ func PublishPeerUpdate(replacePeers bool) error {
 	}
 	for _, host := range hosts {
 		host := host
-		if err = PublishSingleHostPeerUpdate(&host, allNodes, nil, nil, replacePeers); err != nil {
-			logger.Log(1, "failed to publish peer update to host", host.ID.String(), ": ", err.Error())
-		}
+		go func(host models.Host) {
+			if err = PublishSingleHostPeerUpdate(&host, allNodes, nil, nil, replacePeers); err != nil {
+				logger.Log(1, "failed to publish peer update to host", host.ID.String(), ": ", err.Error())
+			}
+		}(host)
+
 	}
 	return err
 }

+ 1 - 0
pro/initialize.go

@@ -64,6 +64,7 @@ func InitPro() {
 	logic.IsInternetGw = proLogic.IsInternetGw
 	logic.SetInternetGw = proLogic.SetInternetGw
 	mq.UpdateMetrics = proLogic.MQUpdateMetrics
+	mq.UpdateMetricsFallBack = proLogic.MQUpdateMetricsFallBack
 }
 
 func retrieveProLogo() string {

+ 26 - 37
pro/logic/metrics.go

@@ -46,6 +46,28 @@ func DeleteMetrics(nodeid string) error {
 	return database.DeleteRecord(database.METRICS_TABLE_NAME, nodeid)
 }
 
+// MQUpdateMetricsFallBack - called when mq fallback thread is triggered on client
+func MQUpdateMetricsFallBack(nodeid string, newMetrics models.Metrics) {
+
+	currentNode, err := logic.GetNodeByID(nodeid)
+	if err != nil {
+		slog.Error("error getting node", "id", nodeid, "error", err)
+		return
+	}
+
+	updateNodeMetrics(&currentNode, &newMetrics)
+	if err = logic.UpdateMetrics(nodeid, &newMetrics); err != nil {
+		slog.Error("failed to update node metrics", "id", nodeid, "error", err)
+		return
+	}
+	if servercfg.IsMetricsExporter() {
+		if err := mq.PushMetricsToExporter(newMetrics); err != nil {
+			slog.Error("failed to push node metrics to exporter", "id", currentNode.ID, "error", err)
+		}
+	}
+	slog.Debug("updated node metrics", "id", nodeid)
+}
+
 func MQUpdateMetrics(client mqtt.Client, msg mqtt.Message) {
 	id, err := mq.GetID(msg.Topic())
 	if err != nil {
@@ -68,9 +90,7 @@ func MQUpdateMetrics(client mqtt.Client, msg mqtt.Message) {
 		slog.Error("error unmarshaling payload", "error", err)
 		return
 	}
-
-	shouldUpdate := updateNodeMetrics(&currentNode, &newMetrics)
-
+	updateNodeMetrics(&currentNode, &newMetrics)
 	if err = logic.UpdateMetrics(id, &newMetrics); err != nil {
 		slog.Error("failed to update node metrics", "id", id, "error", err)
 		return
@@ -80,34 +100,15 @@ func MQUpdateMetrics(client mqtt.Client, msg mqtt.Message) {
 			slog.Error("failed to push node metrics to exporter", "id", currentNode.ID, "error", err)
 		}
 	}
-
-	if shouldUpdate {
-		slog.Info("updating peers after node detected connectivity issues", "id", currentNode.ID, "network", currentNode.Network)
-		host, err := logic.GetHost(currentNode.HostID.String())
-		if err == nil {
-			nodes, err := logic.GetAllNodes()
-			if err != nil {
-				return
-			}
-			if err = mq.PublishSingleHostPeerUpdate(host, nodes, nil, nil, false); err != nil {
-				slog.Warn("failed to publish update after failover peer change for node", "id", currentNode.ID, "network", currentNode.Network, "error", err)
-			}
-		}
-	}
 	slog.Debug("updated node metrics", "id", id)
 }
 
-func updateNodeMetrics(currentNode *models.Node, newMetrics *models.Metrics) bool {
-	if newMetrics.FailoverPeers == nil {
-		newMetrics.FailoverPeers = make(map[string]string)
-	}
+func updateNodeMetrics(currentNode *models.Node, newMetrics *models.Metrics) {
+
 	oldMetrics, err := logic.GetMetrics(currentNode.ID.String())
 	if err != nil {
 		slog.Error("error finding old metrics for node", "id", currentNode.ID, "error", err)
-		return false
-	}
-	if oldMetrics.FailoverPeers == nil {
-		oldMetrics.FailoverPeers = make(map[string]string)
+		return
 	}
 
 	var attachedClients []models.ExtClient
@@ -164,19 +165,7 @@ func updateNodeMetrics(currentNode *models.Node, newMetrics *models.Metrics) boo
 
 	}
 
-	shouldUpdate := len(oldMetrics.FailoverPeers) == 0 && len(newMetrics.FailoverPeers) > 0
-	for k, v := range oldMetrics.FailoverPeers {
-		if len(newMetrics.FailoverPeers[k]) > 0 && len(v) == 0 {
-			shouldUpdate = true
-		}
-
-		if len(v) > 0 && len(newMetrics.FailoverPeers[k]) == 0 {
-			newMetrics.FailoverPeers[k] = v
-		}
-	}
-
 	for k := range oldMetrics.Connectivity { // cleanup any left over data, self healing
 		delete(newMetrics.Connectivity, k)
 	}
-	return shouldUpdate
 }