2
0
Эх сурвалжийг харах

rm unused peer update funcs

Abhishek Kondur 2 жил өмнө
parent
commit
71f985939d

+ 6 - 7
controllers/hosts.go

@@ -111,13 +111,12 @@ func pull(w http.ResponseWriter, r *http.Request) {
 		return
 		return
 	}
 	}
 	peers := logic.GetPeerUpdate(host)
 	peers := logic.GetPeerUpdate(host)
-
-	//hPU, err := logic.GetPeerUpdateForHost(context.Background(), "", host, nil, nil)
-	//if err != nil {
-	//logger.Log(0, "could not pull peers for host", hostID)
-	//logic.ReturnErrorResponse(w, r, logic.FormatError(err, "internal"))
-	//return
-	//}
+	// hPU, err := logic.GetPeerUpdateForHost(host)
+	if err != nil {
+		logger.Log(0, "could not pull peers for host", hostID)
+		logic.ReturnErrorResponse(w, r, logic.FormatError(err, "internal"))
+		return
+	}
 	serverConf := servercfg.GetServerInfo()
 	serverConf := servercfg.GetServerInfo()
 	if servercfg.GetBrokerType() == servercfg.EmqxBrokerType {
 	if servercfg.GetBrokerType() == servercfg.EmqxBrokerType {
 		serverConf.MQUserName = hostID
 		serverConf.MQUserName = hostID

+ 73 - 97
logic/peers.go

@@ -1,9 +1,7 @@
 package logic
 package logic
 
 
 import (
 import (
-	"context"
 	"errors"
 	"errors"
-	"fmt"
 	"net"
 	"net"
 	"net/netip"
 	"net/netip"
 
 
@@ -16,22 +14,6 @@ import (
 	"golang.zx2c4.com/wireguard/wgctrl/wgtypes"
 	"golang.zx2c4.com/wireguard/wgctrl/wgtypes"
 )
 )
 
 
-var (
-	// PeerUpdateCtx context to send to host peer updates
-	PeerUpdateCtx context.Context
-	// PeerUpdateStop - the cancel for PeerUpdateCtx
-	PeerUpdateStop context.CancelFunc
-)
-
-// ResetPeerUpdateContext - kills any current peer updates and resets the context
-func ResetPeerUpdateContext() {
-	if PeerUpdateCtx != nil && PeerUpdateStop != nil {
-		PeerUpdateStop() // tell any current peer updates to stop
-	}
-
-	PeerUpdateCtx, PeerUpdateStop = context.WithCancel(context.Background())
-}
-
 func NodePeersInfo(client *models.Client) (models.NodePeersInfo, error) {
 func NodePeersInfo(client *models.Client) (models.NodePeersInfo, error) {
 	nodePeersInfo := models.NodePeersInfo{
 	nodePeersInfo := models.NodePeersInfo{
 		PeerIDs: make(models.PeerMap),
 		PeerIDs: make(models.PeerMap),
@@ -124,7 +106,7 @@ func NodePeersInfo(client *models.Client) (models.NodePeersInfo, error) {
 }
 }
 
 
 // GetPeerUpdateForHost - gets the consolidated peer update for the host from all networks
 // GetPeerUpdateForHost - gets the consolidated peer update for the host from all networks
-func GetPeerUpdateForHost(ctx context.Context, network string, host *models.Host, deletedNode *models.Node, deletedClients []models.ExtClient) (models.HostPeerUpdate, error) {
+func GetPeerUpdateForHost(host *models.Host) (models.HostPeerUpdate, error) {
 	if host == nil {
 	if host == nil {
 		return models.HostPeerUpdate{}, errors.New("host is nil")
 		return models.HostPeerUpdate{}, errors.New("host is nil")
 	}
 	}
@@ -155,94 +137,88 @@ func GetPeerUpdateForHost(ctx context.Context, network string, host *models.Host
 		}
 		}
 		currentPeers := GetNetworkNodesMemory(allNodes, node.Network)
 		currentPeers := GetNetworkNodesMemory(allNodes, node.Network)
 		for _, peer := range currentPeers {
 		for _, peer := range currentPeers {
-			select {
-			case <-ctx.Done():
-				logger.Log(2, "cancelled peer update for host", host.Name, host.ID.String())
-				return models.HostPeerUpdate{}, fmt.Errorf("peer update cancelled")
-			default:
-				peer := peer
-				if peer.ID.String() == node.ID.String() {
-					logger.Log(2, "peer update, skipping self")
-					//skip yourself
-					continue
-				}
-				if peer.IsRelayed {
-					// skip relayed peers; will be included in relay peer
-					continue
-				}
-				var peerConfig wgtypes.PeerConfig
-				peerHost, err := GetHost(peer.HostID.String())
-				if err != nil {
-					logger.Log(1, "no peer host", peer.HostID.String(), err.Error())
-					return models.HostPeerUpdate{}, err
-				}
+			peer := peer
+			if peer.ID.String() == node.ID.String() {
+				logger.Log(2, "peer update, skipping self")
+				//skip yourself
+				continue
+			}
+			if peer.IsRelayed {
+				// skip relayed peers; will be included in relay peer
+				continue
+			}
+			var peerConfig wgtypes.PeerConfig
+			peerHost, err := GetHost(peer.HostID.String())
+			if err != nil {
+				logger.Log(1, "no peer host", peer.HostID.String(), err.Error())
+				return models.HostPeerUpdate{}, err
+			}
 
 
-				peerConfig.PublicKey = peerHost.PublicKey
-				peerConfig.PersistentKeepaliveInterval = &peer.PersistentKeepalive
-				peerConfig.ReplaceAllowedIPs = true
-				uselocal := false
-				if host.EndpointIP.String() == peerHost.EndpointIP.String() {
-					// peer is on same network
-					// set to localaddress
-					uselocal = true
-					if node.LocalAddress.IP == nil {
-						// use public endpint
-						uselocal = false
-					}
-					if node.LocalAddress.String() == peer.LocalAddress.String() {
-						uselocal = false
-					}
+			peerConfig.PublicKey = peerHost.PublicKey
+			peerConfig.PersistentKeepaliveInterval = &peer.PersistentKeepalive
+			peerConfig.ReplaceAllowedIPs = true
+			uselocal := false
+			if host.EndpointIP.String() == peerHost.EndpointIP.String() {
+				// peer is on same network
+				// set to localaddress
+				uselocal = true
+				if node.LocalAddress.IP == nil {
+					// use public endpint
+					uselocal = false
 				}
 				}
-				peerConfig.Endpoint = &net.UDPAddr{
-					IP:   peerHost.EndpointIP,
-					Port: getPeerWgListenPort(peerHost),
+				if node.LocalAddress.String() == peer.LocalAddress.String() {
+					uselocal = false
 				}
 				}
+			}
+			peerConfig.Endpoint = &net.UDPAddr{
+				IP:   peerHost.EndpointIP,
+				Port: getPeerWgListenPort(peerHost),
+			}
 
 
-				if uselocal {
-					peerConfig.Endpoint.IP = peer.LocalAddress.IP
-					peerConfig.Endpoint.Port = peerHost.ListenPort
-				}
-				allowedips := GetAllowedIPs(&node, &peer, nil)
-				if peer.IsIngressGateway {
-					for _, entry := range peer.IngressGatewayRange {
-						_, cidr, err := net.ParseCIDR(string(entry))
-						if err == nil {
-							allowedips = append(allowedips, *cidr)
-						}
-					}
-				}
-				if peer.IsEgressGateway {
-					host, err := GetHost(peer.HostID.String())
+			if uselocal {
+				peerConfig.Endpoint.IP = peer.LocalAddress.IP
+				peerConfig.Endpoint.Port = peerHost.ListenPort
+			}
+			allowedips := GetAllowedIPs(&node, &peer, nil)
+			if peer.IsIngressGateway {
+				for _, entry := range peer.IngressGatewayRange {
+					_, cidr, err := net.ParseCIDR(string(entry))
 					if err == nil {
 					if err == nil {
-						allowedips = append(allowedips, getEgressIPs(
-							&models.Client{
-								Host: *host,
-								Node: peer,
-							})...)
+						allowedips = append(allowedips, *cidr)
 					}
 					}
 				}
 				}
-				if peer.Action != models.NODE_DELETE &&
-					!peer.PendingDelete &&
-					peer.Connected &&
-					nodeacls.AreNodesAllowed(nodeacls.NetworkID(node.Network), nodeacls.NodeID(node.ID.String()), nodeacls.NodeID(peer.ID.String())) &&
-					(deletedNode == nil || (deletedNode != nil && peer.ID.String() != deletedNode.ID.String())) {
-					peerConfig.AllowedIPs = allowedips // only append allowed IPs if valid connection
+			}
+			if peer.IsEgressGateway {
+				host, err := GetHost(peer.HostID.String())
+				if err == nil {
+					allowedips = append(allowedips, getEgressIPs(
+						&models.Client{
+							Host: *host,
+							Node: peer,
+						})...)
 				}
 				}
-				if _, ok := peerIndexMap[peerHost.PublicKey.String()]; !ok {
-					hostPeerUpdate.Peers = append(hostPeerUpdate.Peers, peerConfig)
-					peerIndexMap[peerHost.PublicKey.String()] = len(hostPeerUpdate.Peers) - 1
-					hostPeerUpdate.HostNetworkInfo[peerHost.PublicKey.String()] = models.HostNetworkInfo{
-						Interfaces: peerHost.Interfaces,
-					}
-				} else {
-					peerAllowedIPs := hostPeerUpdate.Peers[peerIndexMap[peerHost.PublicKey.String()]].AllowedIPs
-					peerAllowedIPs = append(peerAllowedIPs, allowedips...)
-					hostPeerUpdate.Peers[peerIndexMap[peerHost.PublicKey.String()]].AllowedIPs = peerAllowedIPs
-					hostPeerUpdate.HostNetworkInfo[peerHost.PublicKey.String()] = models.HostNetworkInfo{
-						Interfaces: peerHost.Interfaces,
-					}
+			}
+			if peer.Action != models.NODE_DELETE &&
+				!peer.PendingDelete &&
+				peer.Connected &&
+				nodeacls.AreNodesAllowed(nodeacls.NetworkID(node.Network), nodeacls.NodeID(node.ID.String()), nodeacls.NodeID(peer.ID.String())) {
+				peerConfig.AllowedIPs = allowedips // only append allowed IPs if valid connection
+			}
+			if _, ok := peerIndexMap[peerHost.PublicKey.String()]; !ok {
+				hostPeerUpdate.Peers = append(hostPeerUpdate.Peers, peerConfig)
+				peerIndexMap[peerHost.PublicKey.String()] = len(hostPeerUpdate.Peers) - 1
+				hostPeerUpdate.HostNetworkInfo[peerHost.PublicKey.String()] = models.HostNetworkInfo{
+					Interfaces: peerHost.Interfaces,
+				}
+			} else {
+				peerAllowedIPs := hostPeerUpdate.Peers[peerIndexMap[peerHost.PublicKey.String()]].AllowedIPs
+				peerAllowedIPs = append(peerAllowedIPs, allowedips...)
+				hostPeerUpdate.Peers[peerIndexMap[peerHost.PublicKey.String()]].AllowedIPs = peerAllowedIPs
+				hostPeerUpdate.HostNetworkInfo[peerHost.PublicKey.String()] = models.HostNetworkInfo{
+					Interfaces: peerHost.Interfaces,
 				}
 				}
 			}
 			}
+
 		}
 		}
 
 
 		if node.IsIngressGateway {
 		if node.IsIngressGateway {

+ 1 - 2
mq/handlers.go

@@ -1,7 +1,6 @@
 package mq
 package mq
 
 
 import (
 import (
-	"context"
 	"encoding/json"
 	"encoding/json"
 	"fmt"
 	"fmt"
 	"math"
 	"math"
@@ -249,7 +248,7 @@ func UpdateMetrics(client mqtt.Client, msg mqtt.Message) {
 			slog.Info("updating peers after node detected connectivity issues", "id", currentNode.ID, "network", currentNode.Network)
 			slog.Info("updating peers after node detected connectivity issues", "id", currentNode.ID, "network", currentNode.Network)
 			host, err := logic.GetHost(currentNode.HostID.String())
 			host, err := logic.GetHost(currentNode.HostID.String())
 			if err == nil {
 			if err == nil {
-				if err = PublishSingleHostPeerUpdate(context.Background(), host, nil, nil); err != nil {
+				if err = PublishSingleHostPeerUpdate(host); err != nil {
 					slog.Warn("failed to publish update after failover peer change for node", "id", currentNode.ID, "network", currentNode.Network, "error", err)
 					slog.Warn("failed to publish update after failover peer change for node", "id", currentNode.ID, "network", currentNode.Network, "error", err)
 				}
 				}
 			}
 			}

+ 4 - 51
mq/publishers.go

@@ -1,7 +1,6 @@
 package mq
 package mq
 
 
 import (
 import (
-	"context"
 	"encoding/json"
 	"encoding/json"
 	"errors"
 	"errors"
 	"fmt"
 	"fmt"
@@ -27,54 +26,9 @@ func PublishPeerUpdate() error {
 		logger.Log(1, "err getting all hosts", err.Error())
 		logger.Log(1, "err getting all hosts", err.Error())
 		return err
 		return err
 	}
 	}
-	logic.ResetPeerUpdateContext()
 	for _, host := range hosts {
 	for _, host := range hosts {
 		host := host
 		host := host
-		if err = PublishSingleHostPeerUpdate(logic.PeerUpdateCtx, &host, nil, nil); err != nil {
-			logger.Log(1, "failed to publish peer update to host", host.ID.String(), ": ", err.Error())
-		}
-	}
-	return err
-}
-
-// PublishDeletedNodePeerUpdate --- determines and publishes a peer update
-// to all the hosts with a deleted node to account for
-func PublishDeletedNodePeerUpdate(delNode *models.Node) error {
-	if !servercfg.IsMessageQueueBackend() {
-		return nil
-	}
-
-	hosts, err := logic.GetAllHosts()
-	if err != nil {
-		logger.Log(1, "err getting all hosts", err.Error())
-		return err
-	}
-	logic.ResetPeerUpdateContext()
-	for _, host := range hosts {
-		host := host
-		if err = PublishSingleHostPeerUpdate(logic.PeerUpdateCtx, &host, delNode, nil); err != nil {
-			logger.Log(1, "failed to publish peer update to host", host.ID.String(), ": ", err.Error())
-		}
-	}
-	return err
-}
-
-// PublishDeletedClientPeerUpdate --- determines and publishes a peer update
-// to all the hosts with a deleted ext client to account for
-func PublishDeletedClientPeerUpdate(delClient *models.ExtClient) error {
-	if !servercfg.IsMessageQueueBackend() {
-		return nil
-	}
-
-	hosts, err := logic.GetAllHosts()
-	if err != nil {
-		logger.Log(1, "err getting all hosts", err.Error())
-		return err
-	}
-	logic.ResetPeerUpdateContext()
-	for _, host := range hosts {
-		host := host
-		if err = PublishSingleHostPeerUpdate(logic.PeerUpdateCtx, &host, nil, []models.ExtClient{*delClient}); err != nil {
+		if err = PublishSingleHostPeerUpdate(&host); err != nil {
 			logger.Log(1, "failed to publish peer update to host", host.ID.String(), ": ", err.Error())
 			logger.Log(1, "failed to publish peer update to host", host.ID.String(), ": ", err.Error())
 		}
 		}
 	}
 	}
@@ -82,9 +36,9 @@ func PublishDeletedClientPeerUpdate(delClient *models.ExtClient) error {
 }
 }
 
 
 // PublishSingleHostPeerUpdate --- determines and publishes a peer update to one host
 // PublishSingleHostPeerUpdate --- determines and publishes a peer update to one host
-func PublishSingleHostPeerUpdate(ctx context.Context, host *models.Host, deletedNode *models.Node, deletedClients []models.ExtClient) error {
+func PublishSingleHostPeerUpdate(host *models.Host) error {
 
 
-	peerUpdate, err := logic.GetPeerUpdateForHost(ctx, "", host, deletedNode, deletedClients)
+	peerUpdate, err := logic.GetPeerUpdateForHost(host)
 	if err != nil {
 	if err != nil {
 		return err
 		return err
 	}
 	}
@@ -686,11 +640,10 @@ func sendPeers() {
 		//collectServerMetrics(networks[:])
 		//collectServerMetrics(networks[:])
 	}
 	}
 	if force {
 	if force {
-		logic.ResetPeerUpdateContext()
 		for _, host := range hosts {
 		for _, host := range hosts {
 			host := host
 			host := host
 			logger.Log(2, "sending scheduled peer update (5 min)")
 			logger.Log(2, "sending scheduled peer update (5 min)")
-			if err = PublishSingleHostPeerUpdate(logic.PeerUpdateCtx, &host, nil, nil); err != nil {
+			if err = PublishSingleHostPeerUpdate(&host); err != nil {
 				logger.Log(1, "error publishing peer updates for host: ", host.ID.String(), " Err: ", err.Error())
 				logger.Log(1, "error publishing peer updates for host: ", host.ID.String(), " Err: ", err.Error())
 			}
 			}
 		}
 		}