Browse Source

adjusted peer updates to remove peers properly, slowed zombie check

0xdcarns 2 years ago
parent
commit
80fda83c0c
5 changed files with 17 additions and 194 deletions
  1. 4 5
      controllers/ext_client.go
  2. 5 174
      logic/peers.go
  3. 3 3
      logic/zombie.go
  4. 1 1
      mq/handlers.go
  5. 4 11
      mq/publishers.go

+ 4 - 5
controllers/ext_client.go

@@ -389,7 +389,7 @@ func createExtClient(w http.ResponseWriter, r *http.Request) {
 	logger.Log(0, r.Header.Get("user"), "created new ext client on network", networkName)
 	w.WriteHeader(http.StatusOK)
 	go func() {
-		err = mq.PublishExtPeerUpdate(&node)
+		err = mq.PublishPeerUpdate()
 		if err != nil {
 			logger.Log(1, "error setting ext peers on "+nodeid+": "+err.Error())
 		}
@@ -488,7 +488,7 @@ func updateExtClient(w http.ResponseWriter, r *http.Request) {
 	logger.Log(0, r.Header.Get("user"), "updated ext client", newExtClient.ClientID)
 	if changedEnabled { // need to send a peer update to the ingress node as enablement of one of it's clients has changed
 		if ingressNode, err := logic.GetNodeByID(newclient.IngressGatewayID); err == nil {
-			if err = mq.PublishExtPeerUpdate(&ingressNode); err != nil {
+			if err = mq.PublishPeerUpdate(); err != nil {
 				logger.Log(1, "error setting ext peers on", ingressNode.ID.String(), ":", err.Error())
 			}
 		}
@@ -567,11 +567,10 @@ func deleteExtClient(w http.ResponseWriter, r *http.Request) {
 	}
 
 	go func() {
-		err = mq.PublishExtPeerUpdate(&ingressnode)
-		if err != nil {
+		if err := mq.PublishPeerUpdate(); err != nil {
 			logger.Log(1, "error setting ext peers on "+ingressnode.ID.String()+": "+err.Error())
 		}
-		if err := mq.PublishDeleteExtClientDNS(&extclient); err != nil {
+		if err = mq.PublishDeleteExtClientDNS(&extclient); err != nil {
 			logger.Log(1, "error publishing dns update for extclient deletion", err.Error())
 		}
 	}()

+ 5 - 174
logic/peers.go

@@ -6,7 +6,6 @@ import (
 	"log"
 	"net"
 	"net/netip"
-	"time"
 
 	"github.com/gravitl/netmaker/database"
 	"github.com/gravitl/netmaker/logger"
@@ -17,177 +16,6 @@ import (
 	"golang.zx2c4.com/wireguard/wgctrl/wgtypes"
 )
 
-// GetPeersforProxy calculates the peers for a proxy
-// TODO ==========================
-// TODO ==========================
-// TODO ==========================
-// TODO ==========================
-// TODO ==========================
-// revisit this logic with new host/node models.
-func GetPeersForProxy(node *models.Node, onlyPeers bool) (models.ProxyManagerPayload, error) {
-	proxyPayload := models.ProxyManagerPayload{}
-	var peers []wgtypes.PeerConfig
-	peerConfMap := make(map[string]models.PeerConf)
-	var err error
-	currentPeers, err := GetNetworkNodes(node.Network)
-	if err != nil {
-		return proxyPayload, err
-	}
-	if !onlyPeers {
-		if node.IsRelayed {
-			relayNode := FindRelay(node)
-			relayHost, err := GetHost(relayNode.HostID.String())
-			if err != nil {
-				return proxyPayload, err
-			}
-			if relayNode != nil {
-				host, err := GetHost(relayNode.HostID.String())
-				if err != nil {
-					logger.Log(0, "error retrieving host for relay node", relayNode.HostID.String(), err.Error())
-				}
-				relayEndpoint, err := net.ResolveUDPAddr("udp", fmt.Sprintf("%s:%d", relayHost.EndpointIP, host.ListenPort))
-				if err != nil {
-					logger.Log(1, "failed to resolve relay node endpoint: ", err.Error())
-				}
-				proxyPayload.IsRelayed = true
-				proxyPayload.RelayedTo = relayEndpoint
-			} else {
-				logger.Log(0, "couldn't find relay node for:  ", node.ID.String())
-			}
-
-		}
-		if node.IsRelay {
-			host, err := GetHost(node.HostID.String())
-			if err != nil {
-				logger.Log(0, "error retrieving host for relay node", node.ID.String(), err.Error())
-			}
-			relayedNodes, err := GetRelayedNodes(node)
-			if err != nil {
-				logger.Log(1, "failed to relayed nodes: ", node.ID.String(), err.Error())
-				proxyPayload.IsRelay = false
-			} else {
-				relayPeersMap := make(map[string]models.RelayedConf)
-				for _, relayedNode := range relayedNodes {
-					relayedNode := relayedNode
-					payload, err := GetPeersForProxy(&relayedNode, true)
-					if err == nil {
-						relayedHost, err := GetHost(relayedNode.HostID.String())
-						if err != nil {
-							logger.Log(0, "error retrieving host for relayNode", relayedNode.ID.String(), err.Error())
-						}
-						relayedEndpoint, udpErr := net.ResolveUDPAddr("udp", fmt.Sprintf("%s:%d", relayedHost.EndpointIP, host.ListenPort))
-						if udpErr == nil {
-							relayPeersMap[host.PublicKey.String()] = models.RelayedConf{
-								RelayedPeerEndpoint: relayedEndpoint,
-								RelayedPeerPubKey:   relayedHost.PublicKey.String(),
-								Peers:               payload.Peers,
-							}
-						}
-
-					}
-				}
-				proxyPayload.IsRelay = true
-				proxyPayload.RelayedPeerConf = relayPeersMap
-			}
-		}
-
-	}
-
-	for _, peer := range currentPeers {
-		if peer.ID == node.ID {
-			//skip yourself
-			continue
-		}
-		host, err := GetHost(peer.HostID.String())
-		if err != nil {
-			continue
-		}
-		proxyStatus := host.ProxyEnabled
-		listenPort := host.ListenPort
-		if proxyStatus {
-			listenPort = host.ProxyListenPort
-			if listenPort == 0 {
-				listenPort = models.NmProxyPort
-			}
-		} else if listenPort == 0 {
-			listenPort = host.ListenPort
-
-		}
-
-		endpoint, err := net.ResolveUDPAddr("udp", fmt.Sprintf("%s:%d", host.EndpointIP, listenPort))
-		if err != nil {
-			logger.Log(1, "failed to resolve udp addr for node: ", peer.ID.String(), host.EndpointIP.String(), err.Error())
-			continue
-		}
-		allowedips := GetAllowedIPs(node, &peer, nil)
-		var keepalive time.Duration
-		if node.PersistentKeepalive != 0 {
-			// set_keepalive
-			keepalive = node.PersistentKeepalive
-		}
-		peers = append(peers, wgtypes.PeerConfig{
-			PublicKey:                   host.PublicKey,
-			Endpoint:                    endpoint,
-			AllowedIPs:                  allowedips,
-			PersistentKeepaliveInterval: &keepalive,
-			ReplaceAllowedIPs:           true,
-		})
-		peerConfMap[host.PublicKey.String()] = models.PeerConf{
-			Address:          net.ParseIP(peer.PrimaryAddress()),
-			Proxy:            proxyStatus,
-			PublicListenPort: int32(listenPort),
-		}
-
-		if !onlyPeers && peer.IsRelayed {
-			relayNode := FindRelay(&peer)
-			if relayNode != nil {
-				relayHost, err := GetHost(relayNode.HostID.String())
-				if err != nil {
-					logger.Log(0, "error retrieving host for relayNode", relayNode.ID.String(), err.Error())
-					continue
-				}
-				relayTo, err := net.ResolveUDPAddr("udp", fmt.Sprintf("%s:%d", relayHost.EndpointIP, relayHost.ListenPort))
-				if err == nil {
-					peerConfMap[host.PublicKey.String()] = models.PeerConf{
-
-						IsRelayed:        true,
-						RelayedTo:        relayTo,
-						Address:          net.ParseIP(peer.PrimaryAddress()),
-						Proxy:            proxyStatus,
-						PublicListenPort: int32(listenPort),
-					}
-				}
-
-			}
-
-		}
-	}
-	if node.IsIngressGateway {
-		var extPeers []wgtypes.PeerConfig
-		extPeers, peerConfMap, err = getExtPeersForProxy(node, peerConfMap)
-		if err == nil {
-			peers = append(peers, extPeers...)
-
-		} else if !database.IsEmptyRecord(err) {
-			logger.Log(1, "error retrieving external clients:", err.Error())
-		}
-	}
-
-	proxyPayload.IsIngress = node.IsIngressGateway
-	addr := node.Address
-	if addr.String() == "" {
-		addr = node.Address6
-	}
-	proxyPayload.Peers = peers
-	proxyPayload.PeerMap = peerConfMap
-	//proxyPayload.Network = node.Network
-	//proxyPayload.InterfaceName = node.Interface
-	//hardcode or read from host ??
-	proxyPayload.InterfaceName = models.WIREGUARD_INTERFACE
-
-	return proxyPayload, nil
-}
-
 // GetProxyUpdateForHost - gets the proxy update for host
 func GetProxyUpdateForHost(host *models.Host) (models.ProxyManagerPayload, error) {
 	proxyPayload := models.ProxyManagerPayload{
@@ -331,7 +159,6 @@ func GetPeerUpdateForHost(network string, host *models.Host) (models.HostPeerUpd
 			if peer.ID == node.ID {
 				logger.Log(2, "peer update, skipping self")
 				//skip yourself
-
 				continue
 			}
 			var peerConfig wgtypes.PeerConfig
@@ -341,7 +168,7 @@ func GetPeerUpdateForHost(network string, host *models.Host) (models.HostPeerUpd
 				return models.HostPeerUpdate{}, err
 			}
 
-			if !peer.Connected || peer.Action == models.NODE_DELETE || peer.PendingDelete {
+			if !peer.Connected {
 				logger.Log(2, "peer update, skipping unconnected node", peer.ID.String())
 				//skip unconnected nodes
 				continue
@@ -421,6 +248,9 @@ func GetPeerUpdateForHost(network string, host *models.Host) (models.HostPeerUpd
 			var nodePeer wgtypes.PeerConfig
 			if _, ok := hostPeerUpdate.HostPeerIDs[peerHost.PublicKey.String()]; !ok {
 				hostPeerUpdate.HostPeerIDs[peerHost.PublicKey.String()] = make(map[string]models.IDandAddr)
+				if peer.Action == models.NODE_DELETE || peer.PendingDelete {
+					peerConfig.Remove = true
+				}
 				hostPeerUpdate.Peers = append(hostPeerUpdate.Peers, peerConfig)
 				peerIndexMap[peerHost.PublicKey.String()] = len(hostPeerUpdate.Peers) - 1
 				hostPeerUpdate.HostPeerIDs[peerHost.PublicKey.String()][peer.ID.String()] = models.IDandAddr{
@@ -517,6 +347,7 @@ func GetPeerUpdateForHost(network string, host *models.Host) (models.HostPeerUpd
 	return hostPeerUpdate, nil
 }
 
+// GetPeerListenPort - given a host, retrieve it's appropriate listening port
 func GetPeerListenPort(host *models.Host) int {
 	peerPort := host.ListenPort
 	if host.ProxyEnabled {

+ 3 - 3
logic/zombie.go

@@ -10,8 +10,8 @@ import (
 )
 
 const (
-	// ZOMBIE_TIMEOUT - timeout in seconds for checking zombie status
-	ZOMBIE_TIMEOUT = 60
+	// ZOMBIE_TIMEOUT - timeout in hours for checking zombie status
+	ZOMBIE_TIMEOUT = 6
 	// ZOMBIE_DELETE_TIME - timeout in minutes for zombie node deletion
 	ZOMBIE_DELETE_TIME = 10
 )
@@ -86,7 +86,7 @@ func ManageZombies(ctx context.Context, peerUpdate chan *models.Node) {
 			zombies = append(zombies, id)
 		case id := <-newHostZombie:
 			hostZombies = append(hostZombies, id)
-		case <-time.After(time.Second * ZOMBIE_TIMEOUT):
+		case <-time.After(time.Hour * ZOMBIE_TIMEOUT): // run this check 4 times a day
 			logger.Log(3, "checking for zombie nodes")
 			if len(zombies) > 0 {
 				for i := len(zombies) - 1; i >= 0; i-- {

+ 1 - 1
mq/handlers.go

@@ -227,7 +227,7 @@ func UpdateMetrics(client mqtt.Client, msg mqtt.Message) {
 				logger.Log(2, "updating peers after node", currentNode.ID.String(), currentNode.Network, "detected connectivity issues")
 				host, err := logic.GetHost(currentNode.HostID.String())
 				if err == nil {
-					if err = PublishSingleHostUpdate(host); err != nil {
+					if err = PublishSingleHostPeerUpdate(host); err != nil {
 						logger.Log(0, "failed to publish update after failover peer change for node", currentNode.ID.String(), currentNode.Network)
 					}
 				}

+ 4 - 11
mq/publishers.go

@@ -25,7 +25,7 @@ func PublishPeerUpdate() error {
 	}
 	for _, host := range hosts {
 		host := host
-		err = PublishSingleHostUpdate(&host)
+		err = PublishSingleHostPeerUpdate(&host)
 		if err != nil {
 			logger.Log(1, "failed to publish peer update to host", host.ID.String(), ": ", err.Error())
 		}
@@ -33,8 +33,8 @@ func PublishPeerUpdate() error {
 	return err
 }
 
-// PublishSingleHostUpdate --- determines and publishes a peer update to one host
-func PublishSingleHostUpdate(host *models.Host) error {
+// PublishSingleHostPeerUpdate --- determines and publishes a peer update to one host
+func PublishSingleHostPeerUpdate(host *models.Host) error {
 
 	peerUpdate, err := logic.GetPeerUpdateForHost("", host)
 	if err != nil {
@@ -56,13 +56,6 @@ func PublishSingleHostUpdate(host *models.Host) error {
 	return publish(host, fmt.Sprintf("peers/host/%s/%s", host.ID.String(), servercfg.GetServer()), data)
 }
 
-// PublishExtPeerUpdate --- publishes a peer update to all the peers of a node
-func PublishExtPeerUpdate(node *models.Node) error {
-
-	go PublishPeerUpdate()
-	return nil
-}
-
 // NodeUpdate -- publishes a node update
 func NodeUpdate(node *models.Node) error {
 	host, err := logic.GetHost(node.HostID.String())
@@ -410,7 +403,7 @@ func sendPeers() {
 		if force {
 			host := host
 			logger.Log(2, "sending scheduled peer update (5 min)")
-			err = PublishSingleHostUpdate(&host)
+			err = PublishSingleHostPeerUpdate(&host)
 			if err != nil {
 				logger.Log(1, "error publishing peer updates for host: ", host.ID.String(), " Err: ", err.Error())
 			}