Browse Source

NM-137: Add addtional mq actions to host api (#3671)

* add host node update action

* add peer signal action to fallback api

* add replace peers to host pull

* add delete host action to fallback api

* update base go builder image

* update go builder tag

* check host port to avoid conflicts behind NAT

* fix connect/disconnect on api

* send pull signal on disconnect from UI

* fix panic on host join via user auth

* reset failover on disconnect
Abhishek K 5 days ago
parent
commit
49e28e3385
11 changed files with 99 additions and 67 deletions
  1. 1 1
      auth/host_session.go
  2. 26 19
      controllers/hosts.go
  3. 3 0
      controllers/node.go
  4. 1 1
      docker/Dockerfile-go-builder
  5. 23 0
      logic/hosts.go
  6. 2 0
      logic/settings.go
  7. 0 11
      logic/wireguard.go
  8. 2 0
      models/host.go
  9. 1 0
      models/node.go
  10. 1 0
      models/structs.go
  11. 39 35
      mq/handlers.go

+ 1 - 1
auth/host_session.go

@@ -226,7 +226,7 @@ func SessionHandler(conn *websocket.Conn) {
 		if err = conn.WriteMessage(messageType, reponseData); err != nil {
 			logger.Log(0, "error during message writing:", err.Error())
 		}
-		go CheckNetRegAndHostUpdate(models.EnrollmentKey{Networks: netsToAdd}, &result.Host, "")
+		go CheckNetRegAndHostUpdate(models.EnrollmentKey{Value: "user auth", Tags: []string{registerMessage.User}, Networks: netsToAdd}, &result.Host, "")
 	case <-timeout: // the read from req.answerCh has timed out
 		logger.Log(0, "timeout signal recv,exiting oauth socket conn")
 		break

+ 26 - 19
controllers/hosts.go

@@ -210,7 +210,7 @@ func pull(w http.ResponseWriter, r *http.Request) {
 			//slog.Error("failed to get node:", "id", node.ID, "error", err)
 			continue
 		}
-		if node.FailedOverBy != uuid.Nil && r.URL.Query().Get("reset_failovered") == "true" {
+		if r.URL.Query().Get("reset_failovered") == "true" {
 			logic.ResetFailedOverPeer(&node)
 			sendPeerUpdate = true
 		}
@@ -232,19 +232,11 @@ func pull(w http.ResponseWriter, r *http.Request) {
 		logic.ReturnErrorResponse(w, r, logic.FormatError(err, "internal"))
 		return
 	}
-	serverConf := logic.GetServerInfo()
-	key, keyErr := logic.RetrievePublicTrafficKey()
-	if keyErr != nil {
-		logger.Log(0, "error retrieving key:", keyErr.Error())
-		logic.ReturnErrorResponse(w, r, logic.FormatError(keyErr, "internal"))
-		return
-	}
 	_ = logic.CheckHostPorts(host)
-	serverConf.TrafficKey = key
 	response := models.HostPull{
 		Host:              *host,
 		Nodes:             logic.GetHostNodes(host),
-		ServerConfig:      serverConf,
+		ServerConfig:      hPU.ServerConfig,
 		Peers:             hPU.Peers,
 		PeerIDs:           hPU.PeerIDs,
 		HostNetworkInfo:   hPU.HostNetworkInfo,
@@ -257,6 +249,7 @@ func pull(w http.ResponseWriter, r *http.Request) {
 		EgressWithDomains: hPU.EgressWithDomains,
 		EndpointDetection: logic.IsEndpointDetectionEnabled(),
 		DnsNameservers:    hPU.DnsNameservers,
+		ReplacePeers:      hPU.ReplacePeers,
 	}
 
 	logger.Log(1, hostID, host.Name, "completed a pull")
@@ -363,8 +356,7 @@ func hostUpdateFallback(w http.ResponseWriter, r *http.Request) {
 		logic.ReturnErrorResponse(w, r, logic.FormatError(err, "badrequest"))
 		return
 	}
-	var sendPeerUpdate bool
-	var replacePeers bool
+	var sendPeerUpdate, sendDeletedNodeUpdate, replacePeers bool
 	var hostUpdate models.HostUpdate
 	err = json.NewDecoder(r.Body).Decode(&hostUpdate)
 	if err != nil {
@@ -376,6 +368,10 @@ func hostUpdateFallback(w http.ResponseWriter, r *http.Request) {
 	switch hostUpdate.Action {
 	case models.CheckIn:
 		sendPeerUpdate = mq.HandleHostCheckin(&hostUpdate.Host, currentHost)
+		changed := logic.CheckHostPorts(currentHost)
+		if changed {
+			mq.HostUpdate(&models.HostUpdate{Action: models.UpdateHost, Host: *currentHost})
+		}
 	case models.UpdateHost:
 		if hostUpdate.Host.PublicKey != currentHost.PublicKey {
 			//remove old peer entry
@@ -388,7 +384,8 @@ func hostUpdateFallback(w http.ResponseWriter, r *http.Request) {
 			logic.ReturnErrorResponse(w, r, logic.FormatError(err, logic.Internal))
 			return
 		}
-
+	case models.UpdateNode:
+		sendDeletedNodeUpdate, sendPeerUpdate = logic.UpdateHostNode(&hostUpdate.Host, &hostUpdate.Node)
 	case models.UpdateMetrics:
 		mq.UpdateMetricsFallBack(hostUpdate.Node.ID.String(), hostUpdate.NewMetrics)
 	case models.EgressUpdate:
@@ -403,14 +400,24 @@ func hostUpdateFallback(w http.ResponseWriter, r *http.Request) {
 			e.Update(db.WithContext(r.Context()))
 		}
 		sendPeerUpdate = true
+	case models.SignalHost:
+		mq.SignalPeer(hostUpdate.Signal)
+	case models.DeleteHost:
+		mq.DeleteAndCleanupHost(currentHost)
+		sendPeerUpdate = true
 	}
-
-	if sendPeerUpdate {
-		err := mq.PublishPeerUpdate(replacePeers)
-		if err != nil {
-			slog.Error("failed to publish peer update", "error", err)
+	go func() {
+		if sendDeletedNodeUpdate {
+			mq.PublishDeletedNodePeerUpdate(&hostUpdate.Node)
 		}
-	}
+		if sendPeerUpdate {
+			err := mq.PublishPeerUpdate(replacePeers)
+			if err != nil {
+				slog.Error("failed to publish peer update", "error", err)
+			}
+		}
+	}()
+
 	logic.ReturnSuccessResponse(w, r, "updated host data")
 }
 

+ 3 - 0
controllers/node.go

@@ -729,6 +729,9 @@ func updateNode(w http.ResponseWriter, r *http.Request) {
 		if err := mq.NodeUpdate(newNode); err != nil {
 			slog.Error("error publishing node update to node", "node", newNode.ID, "error", err)
 		}
+		if !newNode.Connected {
+			mq.HostUpdate(&models.HostUpdate{Host: *host, Action: models.RequestPull})
+		}
 		mq.PublishPeerUpdate(false)
 		if servercfg.IsDNSMode() {
 			logic.SetDNS()

+ 1 - 1
docker/Dockerfile-go-builder

@@ -1,4 +1,4 @@
-FROM golang:1.23.0-alpine3.20
+FROM golang:1.24.0-alpine3.20
 ARG version 
 RUN apk add --no-cache build-base
 WORKDIR /app

+ 23 - 0
logic/hosts.go

@@ -377,6 +377,29 @@ func UpsertHost(h *models.Host) error {
 	return nil
 }
 
+// UpdateHostNode -  handles updates from client nodes
+func UpdateHostNode(h *models.Host, newNode *models.Node) (publishDeletedNodeUpdate, publishPeerUpdate bool) {
+	currentNode, err := GetNodeByID(newNode.ID.String())
+	if err != nil {
+		return
+	}
+	ifaceDelta := IfaceDelta(&currentNode, newNode)
+	newNode.SetLastCheckIn()
+	if err := UpdateNode(&currentNode, newNode); err != nil {
+		slog.Error("error saving node", "name", h.Name, "network", newNode.Network, "error", err)
+		return
+	}
+	if ifaceDelta { // reduce number of unneeded updates, by only sending on iface changes
+		if !newNode.Connected {
+			publishDeletedNodeUpdate = true
+		}
+		publishPeerUpdate = true
+		// reset failover data for this node
+		ResetFailedOverPeer(newNode)
+	}
+	return
+}
+
 // RemoveHost - removes a given host from server
 func RemoveHost(h *models.Host, forceDelete bool) error {
 	if !forceDelete && len(h.Nodes) > 0 {

+ 2 - 0
logic/settings.go

@@ -245,6 +245,8 @@ func GetServerInfo() models.ServerConfig {
 	cfg.StunServers = serverSettings.StunServers
 	cfg.DefaultDomain = serverSettings.DefaultDomain
 	cfg.EndpointDetection = serverSettings.EndpointDetection
+	key, _ := RetrievePublicTrafficKey()
+	cfg.TrafficKey = key
 	return cfg
 }
 

+ 0 - 11
logic/wireguard.go

@@ -9,20 +9,9 @@ func IfaceDelta(currentNode *models.Node, newNode *models.Node) bool {
 	// single comparison statements
 	if newNode.Address.String() != currentNode.Address.String() ||
 		newNode.Address6.String() != currentNode.Address6.String() ||
-		newNode.IsRelay != currentNode.IsRelay ||
 		newNode.Connected != currentNode.Connected {
 		return true
 	}
-	if newNode.IsRelay {
-		if len(currentNode.RelayedNodes) != len(newNode.RelayedNodes) {
-			return true
-		}
-		for _, node := range newNode.RelayedNodes {
-			if !StringSliceContains(currentNode.RelayedNodes, node) {
-				return true
-			}
-		}
-	}
 	return false
 }
 

+ 2 - 0
models/host.go

@@ -106,6 +106,8 @@ const (
 	SignalHost HostMqAction = "SIGNAL_HOST"
 	// UpdateHost - constant for host update action
 	UpdateHost HostMqAction = "UPDATE_HOST"
+	// UpdateNode - constant for Node update action
+	UpdateNode HostMqAction = "UPDATE_NODE"
 	// DeleteHost - constant for host delete action
 	DeleteHost HostMqAction = "DELETE_HOST"
 	// JoinHostToNetwork - constant for host network join action

+ 1 - 0
models/node.go

@@ -481,6 +481,7 @@ func (newNode *Node) Fill(
 	if newNode.IsFailOver != currentNode.IsFailOver {
 		newNode.IsFailOver = currentNode.IsFailOver
 	}
+	newNode.FailOverPeers = currentNode.FailOverPeers
 	if newNode.Tags == nil {
 		if currentNode.Tags == nil {
 			currentNode.Tags = make(map[TagID]struct{})

+ 1 - 0
models/structs.go

@@ -266,6 +266,7 @@ type HostPull struct {
 	NameServers       []string              `json:"name_servers"`
 	EgressWithDomains []EgressDomain        `json:"egress_with_domains"`
 	DnsNameservers    []Nameserver          `json:"dns_nameservers"`
+	ReplacePeers      bool                  `json:"replace_peers"`
 }
 
 type DefaultGwInfo struct {

+ 39 - 35
mq/handlers.go

@@ -136,42 +136,10 @@ func UpdateHost(client mqtt.Client, msg mqtt.Message) {
 			return
 		}
 	case models.DeleteHost:
-		if servercfg.GetBrokerType() == servercfg.EmqxBrokerType {
-			// delete EMQX credentials for host
-			if err := emqx.DeleteEmqxUser(currentHost.ID.String()); err != nil {
-				slog.Error("failed to remove host credentials from EMQX", "id", currentHost.ID, "error", err)
-			}
-		}
-
-		// notify of deleted peer change
-		go func(host models.Host) {
-			for _, nodeID := range host.Nodes {
-				node, err := logic.GetNodeByID(nodeID)
-				if err == nil {
-					var gwClients []models.ExtClient
-					if node.IsIngressGateway {
-						gwClients = logic.GetGwExtclients(node.ID.String(), node.Network)
-					}
-					go PublishMqUpdatesForDeletedNode(node, false, gwClients)
-				}
-
-			}
-		}(*currentHost)
-
-		if err := logic.DisassociateAllNodesFromHost(currentHost.ID.String()); err != nil {
-			slog.Error("failed to delete all nodes of host", "id", currentHost.ID, "error", err)
-			return
-		}
-		if err := logic.RemoveHostByID(currentHost.ID.String()); err != nil {
-			slog.Error("failed to delete host", "id", currentHost.ID, "error", err)
-			return
-		}
-		if servercfg.IsDNSMode() {
-			logic.SetDNS()
-		}
+		DeleteAndCleanupHost(currentHost)
 		sendPeerUpdate = true
 	case models.SignalHost:
-		signalPeer(hostUpdate.Signal)
+		SignalPeer(hostUpdate.Signal)
 
 	}
 
@@ -183,7 +151,43 @@ func UpdateHost(client mqtt.Client, msg mqtt.Message) {
 	}
 }
 
-func signalPeer(signal models.Signal) {
+func DeleteAndCleanupHost(h *models.Host) {
+	if servercfg.GetBrokerType() == servercfg.EmqxBrokerType {
+		// delete EMQX credentials for host
+		if err := emqx.DeleteEmqxUser(h.ID.String()); err != nil {
+			slog.Error("failed to remove host credentials from EMQX", "id", h.ID, "error", err)
+		}
+	}
+
+	// notify of deleted peer change
+	go func(host models.Host) {
+		for _, nodeID := range host.Nodes {
+			node, err := logic.GetNodeByID(nodeID)
+			if err == nil {
+				var gwClients []models.ExtClient
+				if node.IsIngressGateway {
+					gwClients = logic.GetGwExtclients(node.ID.String(), node.Network)
+				}
+				go PublishMqUpdatesForDeletedNode(node, false, gwClients)
+			}
+
+		}
+	}(*h)
+
+	if err := logic.DisassociateAllNodesFromHost(h.ID.String()); err != nil {
+		slog.Error("failed to delete all nodes of host", "id", h.ID, "error", err)
+		return
+	}
+	if err := logic.RemoveHostByID(h.ID.String()); err != nil {
+		slog.Error("failed to delete host", "id", h.ID, "error", err)
+		return
+	}
+	if servercfg.IsDNSMode() {
+		logic.SetDNS()
+	}
+}
+
+func SignalPeer(signal models.Signal) {
 
 	if signal.ToHostPubKey == "" {
 		msg := "insufficient data to signal peer"