Browse Source

Merge pull request #2428 from gravitl/NET-390-scale-latest

NET-390: scaling fixes, send peer update when listen port changes
Alex Feiszli 2 years ago
parent
commit
70a8d0e857
7 changed files with 206 additions and 182 deletions
  1. 7 2
      controllers/hosts.go
  2. 13 3
      controllers/node.go
  3. 5 2
      ee/ee_controllers/relay.go
  4. 15 7
      logic/acls/common.go
  5. 125 151
      logic/peers.go
  6. 18 4
      mq/handlers.go
  7. 23 13
      mq/publishers.go

+ 7 - 2
controllers/hosts.go

@@ -1,7 +1,6 @@
 package controller
 package controller
 
 
 import (
 import (
-	"context"
 	"encoding/json"
 	"encoding/json"
 	"errors"
 	"errors"
 	"fmt"
 	"fmt"
@@ -81,7 +80,13 @@ func pull(w http.ResponseWriter, r *http.Request) {
 		logic.ReturnErrorResponse(w, r, logic.FormatError(err, "internal"))
 		logic.ReturnErrorResponse(w, r, logic.FormatError(err, "internal"))
 		return
 		return
 	}
 	}
-	hPU, err := logic.GetPeerUpdateForHost(context.Background(), "", host, nil, nil)
+	allNodes, err := logic.GetAllNodes()
+	if err != nil {
+		logger.Log(0, "could not pull peers for host", hostID)
+		logic.ReturnErrorResponse(w, r, logic.FormatError(err, "internal"))
+		return
+	}
+	hPU, err := logic.GetPeerUpdateForHost("", host, allNodes, nil, nil)
 	if err != nil {
 	if err != nil {
 		logger.Log(0, "could not pull peers for host", hostID)
 		logger.Log(0, "could not pull peers for host", hostID)
 		logic.ReturnErrorResponse(w, r, logic.FormatError(err, "internal"))
 		logic.ReturnErrorResponse(w, r, logic.FormatError(err, "internal"))

+ 13 - 3
controllers/node.go

@@ -1,7 +1,6 @@
 package controller
 package controller
 
 
 import (
 import (
-	"context"
 	"encoding/json"
 	"encoding/json"
 	"fmt"
 	"fmt"
 	"net/http"
 	"net/http"
@@ -388,7 +387,14 @@ func getNode(w http.ResponseWriter, r *http.Request) {
 		logic.ReturnErrorResponse(w, r, logic.FormatError(err, "internal"))
 		logic.ReturnErrorResponse(w, r, logic.FormatError(err, "internal"))
 		return
 		return
 	}
 	}
-	hostPeerUpdate, err := logic.GetPeerUpdateForHost(context.Background(), node.Network, host, nil, nil)
+	allNodes, err := logic.GetAllNodes()
+	if err != nil {
+		logger.Log(0, r.Header.Get("user"),
+			fmt.Sprintf("error fetching wg peers config for host [ %s ]: %v", host.ID.String(), err))
+		logic.ReturnErrorResponse(w, r, logic.FormatError(err, "internal"))
+		return
+	}
+	hostPeerUpdate, err := logic.GetPeerUpdateForHost(node.Network, host, allNodes, nil, nil)
 	if err != nil && !database.IsEmptyRecord(err) {
 	if err != nil && !database.IsEmptyRecord(err) {
 		logger.Log(0, r.Header.Get("user"),
 		logger.Log(0, r.Header.Get("user"),
 			fmt.Sprintf("error fetching wg peers config for host [ %s ]: %v", host.ID.String(), err))
 			fmt.Sprintf("error fetching wg peers config for host [ %s ]: %v", host.ID.String(), err))
@@ -583,9 +589,13 @@ func deleteIngressGateway(w http.ResponseWriter, r *http.Request) {
 	if len(removedClients) > 0 {
 	if len(removedClients) > 0 {
 		host, err := logic.GetHost(node.HostID.String())
 		host, err := logic.GetHost(node.HostID.String())
 		if err == nil {
 		if err == nil {
+			allNodes, err := logic.GetAllNodes()
+			if err != nil {
+				return
+			}
 			go mq.PublishSingleHostPeerUpdate(
 			go mq.PublishSingleHostPeerUpdate(
-				context.Background(),
 				host,
 				host,
+				allNodes,
 				nil,
 				nil,
 				removedClients[:],
 				removedClients[:],
 			)
 			)

+ 5 - 2
ee/ee_controllers/relay.go

@@ -1,7 +1,6 @@
 package ee_controllers
 package ee_controllers
 
 
 import (
 import (
-	"context"
 	"encoding/json"
 	"encoding/json"
 	"fmt"
 	"fmt"
 	"net/http"
 	"net/http"
@@ -91,8 +90,12 @@ func deleteRelay(w http.ResponseWriter, r *http.Request) {
 			h, err := logic.GetHost(relayedNode.HostID.String())
 			h, err := logic.GetHost(relayedNode.HostID.String())
 			if err == nil {
 			if err == nil {
 				if h.OS == models.OS_Types.IoT {
 				if h.OS == models.OS_Types.IoT {
+					nodes, err := logic.GetAllNodes()
+					if err != nil {
+						return
+					}
 					node.IsRelay = true // for iot update to recognise that it has to delete relay peer
 					node.IsRelay = true // for iot update to recognise that it has to delete relay peer
-					if err = mq.PublishSingleHostPeerUpdate(context.Background(), h, &node, nil); err != nil {
+					if err = mq.PublishSingleHostPeerUpdate(h, nodes, &node, nil); err != nil {
 						logger.Log(1, "failed to publish peer update to host", h.ID.String(), ": ", err.Error())
 						logger.Log(1, "failed to publish peer update to host", h.ID.String(), ": ", err.Error())
 					}
 					}
 				}
 				}

+ 15 - 7
logic/acls/common.go

@@ -37,16 +37,22 @@ func DeleteAclFromCache(containerID ContainerID) {
 
 
 // ACL.Allow - allows access by ID in memory
 // ACL.Allow - allows access by ID in memory
 func (acl ACL) Allow(ID AclID) {
 func (acl ACL) Allow(ID AclID) {
+	aclMutex.Lock()
+	defer aclMutex.Unlock()
 	acl[ID] = Allowed
 	acl[ID] = Allowed
 }
 }
 
 
 // ACL.DisallowNode - disallows access by ID in memory
 // ACL.DisallowNode - disallows access by ID in memory
 func (acl ACL) Disallow(ID AclID) {
 func (acl ACL) Disallow(ID AclID) {
+	aclMutex.Lock()
+	defer aclMutex.Unlock()
 	acl[ID] = NotAllowed
 	acl[ID] = NotAllowed
 }
 }
 
 
 // ACL.Remove - removes a node from a ACL in memory
 // ACL.Remove - removes a node from a ACL in memory
 func (acl ACL) Remove(ID AclID) {
 func (acl ACL) Remove(ID AclID) {
+	aclMutex.Lock()
+	defer aclMutex.Unlock()
 	delete(acl, ID)
 	delete(acl, ID)
 }
 }
 
 
@@ -56,23 +62,25 @@ func (acl ACL) Save(containerID ContainerID, ID AclID) (ACL, error) {
 }
 }
 
 
 // ACL.IsAllowed - sees if ID is allowed in referring ACL
 // ACL.IsAllowed - sees if ID is allowed in referring ACL
-func (acl ACL) IsAllowed(ID AclID) bool {
-	return acl[ID] == Allowed
-}
-
-// ACLContainer.IsAllowed - returns if the current ACL container contains allowed ACLs between two IDs
-func (aclContainer ACLContainer) IsAllowed(ID1, ID2 AclID) bool {
-	return aclContainer[ID1].IsAllowed(ID2) && aclContainer[ID2].IsAllowed(ID1)
+func (acl ACL) IsAllowed(ID AclID) (allowed bool) {
+	aclMutex.RLock()
+	allowed = acl[ID] == Allowed
+	aclMutex.RUnlock()
+	return
 }
 }
 
 
 // ACLContainer.UpdateACL - saves the state of a ACL in the ACLContainer in memory
 // ACLContainer.UpdateACL - saves the state of a ACL in the ACLContainer in memory
 func (aclContainer ACLContainer) UpdateACL(ID AclID, acl ACL) ACLContainer {
 func (aclContainer ACLContainer) UpdateACL(ID AclID, acl ACL) ACLContainer {
+	aclMutex.Lock()
+	defer aclMutex.Unlock()
 	aclContainer[ID] = acl
 	aclContainer[ID] = acl
 	return aclContainer
 	return aclContainer
 }
 }
 
 
 // ACLContainer.RemoveACL - removes the state of a ACL in the ACLContainer in memory
 // ACLContainer.RemoveACL - removes the state of a ACL in the ACLContainer in memory
 func (aclContainer ACLContainer) RemoveACL(ID AclID) ACLContainer {
 func (aclContainer ACLContainer) RemoveACL(ID AclID) ACLContainer {
+	aclMutex.Lock()
+	defer aclMutex.Unlock()
 	delete(aclContainer, ID)
 	delete(aclContainer, ID)
 	return aclContainer
 	return aclContainer
 }
 }

+ 125 - 151
logic/peers.go

@@ -1,9 +1,7 @@
 package logic
 package logic
 
 
 import (
 import (
-	"context"
 	"errors"
 	"errors"
-	"fmt"
 	"net"
 	"net"
 	"net/netip"
 	"net/netip"
 
 
@@ -17,15 +15,8 @@ import (
 	"golang.zx2c4.com/wireguard/wgctrl/wgtypes"
 	"golang.zx2c4.com/wireguard/wgctrl/wgtypes"
 )
 )
 
 
-var (
-	// PeerUpdateCtx context to send to host peer updates
-	PeerUpdateCtx context.Context
-	// PeerUpdateStop - the cancel for PeerUpdateCtx
-	PeerUpdateStop context.CancelFunc
-)
-
 // GetProxyUpdateForHost - gets the proxy update for host
 // GetProxyUpdateForHost - gets the proxy update for host
-func GetProxyUpdateForHost(ctx context.Context, host *models.Host) (models.ProxyManagerPayload, error) {
+func GetProxyUpdateForHost(host *models.Host) (models.ProxyManagerPayload, error) {
 	proxyPayload := models.ProxyManagerPayload{
 	proxyPayload := models.ProxyManagerPayload{
 		Action: models.ProxyUpdate,
 		Action: models.ProxyUpdate,
 	}
 	}
@@ -85,24 +76,12 @@ func GetProxyUpdateForHost(ctx context.Context, host *models.Host) (models.Proxy
 	return proxyPayload, nil
 	return proxyPayload, nil
 }
 }
 
 
-// ResetPeerUpdateContext - kills any current peer updates and resets the context
-func ResetPeerUpdateContext() {
-	if PeerUpdateCtx != nil && PeerUpdateStop != nil {
-		PeerUpdateStop() // tell any current peer updates to stop
-	}
-
-	PeerUpdateCtx, PeerUpdateStop = context.WithCancel(context.Background())
-}
-
 // GetPeerUpdateForHost - gets the consolidated peer update for the host from all networks
 // GetPeerUpdateForHost - gets the consolidated peer update for the host from all networks
-func GetPeerUpdateForHost(ctx context.Context, network string, host *models.Host, deletedNode *models.Node, deletedClients []models.ExtClient) (models.HostPeerUpdate, error) {
+func GetPeerUpdateForHost(network string, host *models.Host, allNodes []models.Node, deletedNode *models.Node, deletedClients []models.ExtClient) (models.HostPeerUpdate, error) {
 	if host == nil {
 	if host == nil {
 		return models.HostPeerUpdate{}, errors.New("host is nil")
 		return models.HostPeerUpdate{}, errors.New("host is nil")
 	}
 	}
-	allNodes, err := GetAllNodes()
-	if err != nil {
-		return models.HostPeerUpdate{}, err
-	}
+
 	// track which nodes are deleted
 	// track which nodes are deleted
 	// after peer calculation, if peer not in list, add delete config of peer
 	// after peer calculation, if peer not in list, add delete config of peer
 	hostPeerUpdate := models.HostPeerUpdate{
 	hostPeerUpdate := models.HostPeerUpdate{
@@ -195,150 +174,145 @@ func GetPeerUpdateForHost(ctx context.Context, network string, host *models.Host
 			nodePeerMap = make(map[string]models.PeerRouteInfo)
 			nodePeerMap = make(map[string]models.PeerRouteInfo)
 		}
 		}
 		for _, peer := range currentPeers {
 		for _, peer := range currentPeers {
-			select {
-			case <-ctx.Done():
-				logger.Log(2, "cancelled peer update for host", host.Name, host.ID.String())
-				return models.HostPeerUpdate{}, fmt.Errorf("peer update cancelled")
-			default:
-				peer := peer
-				if peer.ID.String() == node.ID.String() {
-					logger.Log(2, "peer update, skipping self")
-					//skip yourself
-					continue
-				}
+			peer := peer
+			if peer.ID.String() == node.ID.String() {
+				logger.Log(2, "peer update, skipping self")
+				//skip yourself
+				continue
+			}
 
 
-				peerHost, err := GetHost(peer.HostID.String())
-				if err != nil {
-					logger.Log(1, "no peer host", peer.HostID.String(), err.Error())
-					return models.HostPeerUpdate{}, err
-				}
-				peerConfig := wgtypes.PeerConfig{
-					PublicKey:                   peerHost.PublicKey,
-					PersistentKeepaliveInterval: &peer.PersistentKeepalive,
-					ReplaceAllowedIPs:           true,
-				}
-				if node.IsIngressGateway || node.IsEgressGateway {
-					if peer.IsIngressGateway {
-						_, extPeerIDAndAddrs, err := getExtPeers(&peer)
-						if err == nil {
-							for _, extPeerIdAndAddr := range extPeerIDAndAddrs {
-								extPeerIdAndAddr := extPeerIdAndAddr
-								nodePeerMap[extPeerIdAndAddr.ID] = models.PeerRouteInfo{
-									PeerAddr: net.IPNet{
-										IP:   net.ParseIP(extPeerIdAndAddr.Address),
-										Mask: getCIDRMaskFromAddr(extPeerIdAndAddr.Address),
-									},
-									PeerKey: extPeerIdAndAddr.ID,
-									Allow:   true,
-									ID:      extPeerIdAndAddr.ID,
-								}
+			peerHost, err := GetHost(peer.HostID.String())
+			if err != nil {
+				logger.Log(1, "no peer host", peer.HostID.String(), err.Error())
+				return models.HostPeerUpdate{}, err
+			}
+			peerConfig := wgtypes.PeerConfig{
+				PublicKey:                   peerHost.PublicKey,
+				PersistentKeepaliveInterval: &peer.PersistentKeepalive,
+				ReplaceAllowedIPs:           true,
+			}
+			if node.IsIngressGateway || node.IsEgressGateway {
+				if peer.IsIngressGateway {
+					_, extPeerIDAndAddrs, err := getExtPeers(&peer)
+					if err == nil {
+						for _, extPeerIdAndAddr := range extPeerIDAndAddrs {
+							extPeerIdAndAddr := extPeerIdAndAddr
+							nodePeerMap[extPeerIdAndAddr.ID] = models.PeerRouteInfo{
+								PeerAddr: net.IPNet{
+									IP:   net.ParseIP(extPeerIdAndAddr.Address),
+									Mask: getCIDRMaskFromAddr(extPeerIdAndAddr.Address),
+								},
+								PeerKey: extPeerIdAndAddr.ID,
+								Allow:   true,
+								ID:      extPeerIdAndAddr.ID,
 							}
 							}
 						}
 						}
 					}
 					}
-					if node.IsIngressGateway && peer.IsEgressGateway {
-						hostPeerUpdate.IngressInfo.EgressRanges = append(hostPeerUpdate.IngressInfo.EgressRanges,
-							peer.EgressGatewayRanges...)
-					}
-					nodePeerMap[peerHost.PublicKey.String()] = models.PeerRouteInfo{
-						PeerAddr: net.IPNet{
-							IP:   net.ParseIP(peer.PrimaryAddress()),
-							Mask: getCIDRMaskFromAddr(peer.PrimaryAddress()),
-						},
-						PeerKey: peerHost.PublicKey.String(),
-						Allow:   true,
-						ID:      peer.ID.String(),
-					}
 				}
 				}
-				if (node.IsRelayed && node.RelayedBy != peer.ID.String()) || (peer.IsRelayed && peer.RelayedBy != node.ID.String()) {
-					// if node is relayed and peer is not the relay, set remove to true
-					if _, ok := hostPeerUpdate.HostPeerIDs[peerHost.PublicKey.String()]; ok {
-						continue
-					}
-					peerConfig.Remove = true
-					hostPeerUpdate.Peers = append(hostPeerUpdate.Peers, peerConfig)
-					peerIndexMap[peerHost.PublicKey.String()] = len(hostPeerUpdate.Peers) - 1
+				if node.IsIngressGateway && peer.IsEgressGateway {
+					hostPeerUpdate.IngressInfo.EgressRanges = append(hostPeerUpdate.IngressInfo.EgressRanges,
+						peer.EgressGatewayRanges...)
+				}
+				nodePeerMap[peerHost.PublicKey.String()] = models.PeerRouteInfo{
+					PeerAddr: net.IPNet{
+						IP:   net.ParseIP(peer.PrimaryAddress()),
+						Mask: getCIDRMaskFromAddr(peer.PrimaryAddress()),
+					},
+					PeerKey: peerHost.PublicKey.String(),
+					Allow:   true,
+					ID:      peer.ID.String(),
+				}
+			}
+			if (node.IsRelayed && node.RelayedBy != peer.ID.String()) || (peer.IsRelayed && peer.RelayedBy != node.ID.String()) {
+				// if node is relayed and peer is not the relay, set remove to true
+				if _, ok := hostPeerUpdate.HostPeerIDs[peerHost.PublicKey.String()]; ok {
 					continue
 					continue
 				}
 				}
+				peerConfig.Remove = true
+				hostPeerUpdate.Peers = append(hostPeerUpdate.Peers, peerConfig)
+				peerIndexMap[peerHost.PublicKey.String()] = len(hostPeerUpdate.Peers) - 1
+				continue
+			}
 
 
-				uselocal := false
-				if host.EndpointIP.String() == peerHost.EndpointIP.String() {
-					// peer is on same network
-					// set to localaddress
-					uselocal = true
-					if node.LocalAddress.IP == nil {
-						// use public endpint
-						uselocal = false
-					}
-					if node.LocalAddress.String() == peer.LocalAddress.String() {
-						uselocal = false
-					}
+			uselocal := false
+			if host.EndpointIP.String() == peerHost.EndpointIP.String() {
+				// peer is on same network
+				// set to localaddress
+				uselocal = true
+				if node.LocalAddress.IP == nil {
+					// use public endpint
+					uselocal = false
 				}
 				}
-				peerConfig.Endpoint = &net.UDPAddr{
-					IP:   peerHost.EndpointIP,
-					Port: getPeerWgListenPort(peerHost),
+				if node.LocalAddress.String() == peer.LocalAddress.String() {
+					uselocal = false
 				}
 				}
+			}
+			peerConfig.Endpoint = &net.UDPAddr{
+				IP:   peerHost.EndpointIP,
+				Port: getPeerWgListenPort(peerHost),
+			}
 
 
-				if uselocal {
-					peerConfig.Endpoint.IP = peer.LocalAddress.IP
-					peerConfig.Endpoint.Port = peerHost.ListenPort
+			if uselocal {
+				peerConfig.Endpoint.IP = peer.LocalAddress.IP
+				peerConfig.Endpoint.Port = peerHost.ListenPort
+			}
+			allowedips := GetAllowedIPs(&node, &peer, nil)
+			if peer.Action != models.NODE_DELETE &&
+				!peer.PendingDelete &&
+				peer.Connected &&
+				nodeacls.AreNodesAllowed(nodeacls.NetworkID(node.Network), nodeacls.NodeID(node.ID.String()), nodeacls.NodeID(peer.ID.String())) &&
+				(deletedNode == nil || (deletedNode != nil && peer.ID.String() != deletedNode.ID.String())) {
+				peerConfig.AllowedIPs = allowedips // only append allowed IPs if valid connection
+			}
+
+			peerProxyPort := GetProxyListenPort(peerHost)
+			var nodePeer wgtypes.PeerConfig
+			if _, ok := hostPeerUpdate.HostPeerIDs[peerHost.PublicKey.String()]; !ok {
+				hostPeerUpdate.HostPeerIDs[peerHost.PublicKey.String()] = make(map[string]models.IDandAddr)
+				hostPeerUpdate.Peers = append(hostPeerUpdate.Peers, peerConfig)
+				peerIndexMap[peerHost.PublicKey.String()] = len(hostPeerUpdate.Peers) - 1
+				hostPeerUpdate.HostPeerIDs[peerHost.PublicKey.String()][peer.ID.String()] = models.IDandAddr{
+					ID:              peer.ID.String(),
+					Address:         peer.PrimaryAddress(),
+					Name:            peerHost.Name,
+					Network:         peer.Network,
+					ProxyListenPort: peerProxyPort,
 				}
 				}
-				allowedips := GetAllowedIPs(&node, &peer, nil)
-				if peer.Action != models.NODE_DELETE &&
-					!peer.PendingDelete &&
-					peer.Connected &&
-					nodeacls.AreNodesAllowed(nodeacls.NetworkID(node.Network), nodeacls.NodeID(node.ID.String()), nodeacls.NodeID(peer.ID.String())) &&
-					(deletedNode == nil || (deletedNode != nil && peer.ID.String() != deletedNode.ID.String())) {
-					peerConfig.AllowedIPs = allowedips // only append allowed IPs if valid connection
+				hostPeerUpdate.HostNetworkInfo[peerHost.PublicKey.String()] = models.HostNetworkInfo{
+					Interfaces:      peerHost.Interfaces,
+					ProxyListenPort: peerProxyPort,
 				}
 				}
-
-				peerProxyPort := GetProxyListenPort(peerHost)
-				var nodePeer wgtypes.PeerConfig
-				if _, ok := hostPeerUpdate.HostPeerIDs[peerHost.PublicKey.String()]; !ok {
-					hostPeerUpdate.HostPeerIDs[peerHost.PublicKey.String()] = make(map[string]models.IDandAddr)
-					hostPeerUpdate.Peers = append(hostPeerUpdate.Peers, peerConfig)
-					peerIndexMap[peerHost.PublicKey.String()] = len(hostPeerUpdate.Peers) - 1
-					hostPeerUpdate.HostPeerIDs[peerHost.PublicKey.String()][peer.ID.String()] = models.IDandAddr{
-						ID:              peer.ID.String(),
-						Address:         peer.PrimaryAddress(),
-						Name:            peerHost.Name,
-						Network:         peer.Network,
-						ProxyListenPort: peerProxyPort,
-					}
-					hostPeerUpdate.HostNetworkInfo[peerHost.PublicKey.String()] = models.HostNetworkInfo{
-						Interfaces:      peerHost.Interfaces,
-						ProxyListenPort: peerProxyPort,
-					}
-					nodePeer = peerConfig
-				} else {
-					peerAllowedIPs := hostPeerUpdate.Peers[peerIndexMap[peerHost.PublicKey.String()]].AllowedIPs
-					peerAllowedIPs = append(peerAllowedIPs, peerConfig.AllowedIPs...)
-					hostPeerUpdate.Peers[peerIndexMap[peerHost.PublicKey.String()]].AllowedIPs = peerAllowedIPs
-					hostPeerUpdate.Peers[peerIndexMap[peerHost.PublicKey.String()]].Remove = false
-					hostPeerUpdate.HostPeerIDs[peerHost.PublicKey.String()][peer.ID.String()] = models.IDandAddr{
-						ID:              peer.ID.String(),
-						Address:         peer.PrimaryAddress(),
-						Name:            peerHost.Name,
-						Network:         peer.Network,
-						ProxyListenPort: GetProxyListenPort(peerHost),
-					}
-					hostPeerUpdate.HostNetworkInfo[peerHost.PublicKey.String()] = models.HostNetworkInfo{
-						Interfaces:      peerHost.Interfaces,
-						ProxyListenPort: peerProxyPort,
-					}
-					nodePeer = hostPeerUpdate.Peers[peerIndexMap[peerHost.PublicKey.String()]]
+				nodePeer = peerConfig
+			} else {
+				peerAllowedIPs := hostPeerUpdate.Peers[peerIndexMap[peerHost.PublicKey.String()]].AllowedIPs
+				peerAllowedIPs = append(peerAllowedIPs, peerConfig.AllowedIPs...)
+				hostPeerUpdate.Peers[peerIndexMap[peerHost.PublicKey.String()]].AllowedIPs = peerAllowedIPs
+				hostPeerUpdate.Peers[peerIndexMap[peerHost.PublicKey.String()]].Remove = false
+				hostPeerUpdate.HostPeerIDs[peerHost.PublicKey.String()][peer.ID.String()] = models.IDandAddr{
+					ID:              peer.ID.String(),
+					Address:         peer.PrimaryAddress(),
+					Name:            peerHost.Name,
+					Network:         peer.Network,
+					ProxyListenPort: GetProxyListenPort(peerHost),
 				}
 				}
+				hostPeerUpdate.HostNetworkInfo[peerHost.PublicKey.String()] = models.HostNetworkInfo{
+					Interfaces:      peerHost.Interfaces,
+					ProxyListenPort: peerProxyPort,
+				}
+				nodePeer = hostPeerUpdate.Peers[peerIndexMap[peerHost.PublicKey.String()]]
+			}
 
 
-				if node.Network == network { // add to peers map for metrics
-					hostPeerUpdate.PeerIDs[peerHost.PublicKey.String()] = models.IDandAddr{
-						ID:              peer.ID.String(),
-						Address:         peer.PrimaryAddress(),
-						Name:            peerHost.Name,
-						Network:         peer.Network,
-						ProxyListenPort: peerHost.ProxyListenPort,
-					}
-					hostPeerUpdate.NodePeers = append(hostPeerUpdate.NodePeers, nodePeer)
+			if node.Network == network { // add to peers map for metrics
+				hostPeerUpdate.PeerIDs[peerHost.PublicKey.String()] = models.IDandAddr{
+					ID:              peer.ID.String(),
+					Address:         peer.PrimaryAddress(),
+					Name:            peerHost.Name,
+					Network:         peer.Network,
+					ProxyListenPort: peerHost.ProxyListenPort,
 				}
 				}
+				hostPeerUpdate.NodePeers = append(hostPeerUpdate.NodePeers, nodePeer)
 			}
 			}
+			//}
 		}
 		}
 		var extPeers []wgtypes.PeerConfig
 		var extPeers []wgtypes.PeerConfig
 		var extPeerIDAndAddrs []models.IDandAddr
 		var extPeerIDAndAddrs []models.IDandAddr

+ 18 - 4
mq/handlers.go

@@ -1,7 +1,6 @@
 package mq
 package mq
 
 
 import (
 import (
-	"context"
 	"encoding/json"
 	"encoding/json"
 	"fmt"
 	"fmt"
 	"math"
 	"math"
@@ -107,7 +106,11 @@ func UpdateHost(client mqtt.Client, msg mqtt.Message) {
 						return
 						return
 					}
 					}
 				}
 				}
-				if err = PublishSingleHostPeerUpdate(context.Background(), currentHost, nil, nil); err != nil {
+				nodes, err := logic.GetAllNodes()
+				if err != nil {
+					return
+				}
+				if err = PublishSingleHostPeerUpdate(currentHost, nodes, nil, nil); err != nil {
 					slog.Error("failed peers publish after join acknowledged", "name", hostUpdate.Host.Name, "id", currentHost.ID, "error", err)
 					slog.Error("failed peers publish after join acknowledged", "name", hostUpdate.Host.Name, "id", currentHost.ID, "error", err)
 					return
 					return
 				}
 				}
@@ -235,7 +238,11 @@ func UpdateMetrics(client mqtt.Client, msg mqtt.Message) {
 			slog.Info("updating peers after node detected connectivity issues", "id", currentNode.ID, "network", currentNode.Network)
 			slog.Info("updating peers after node detected connectivity issues", "id", currentNode.ID, "network", currentNode.Network)
 			host, err := logic.GetHost(currentNode.HostID.String())
 			host, err := logic.GetHost(currentNode.HostID.String())
 			if err == nil {
 			if err == nil {
-				if err = PublishSingleHostPeerUpdate(context.Background(), host, nil, nil); err != nil {
+				nodes, err := logic.GetAllNodes()
+				if err != nil {
+					return
+				}
+				if err = PublishSingleHostPeerUpdate(host, nodes, nil, nil); err != nil {
 					slog.Warn("failed to publish update after failover peer change for node", "id", currentNode.ID, "network", currentNode.Network, "error", err)
 					slog.Warn("failed to publish update after failover peer change for node", "id", currentNode.ID, "network", currentNode.Network, "error", err)
 				}
 				}
 			}
 			}
@@ -438,12 +445,19 @@ func handleHostCheckin(h, currentHost *models.Host) bool {
 	ifaceDelta := len(h.Interfaces) != len(currentHost.Interfaces) ||
 	ifaceDelta := len(h.Interfaces) != len(currentHost.Interfaces) ||
 		!h.EndpointIP.Equal(currentHost.EndpointIP) ||
 		!h.EndpointIP.Equal(currentHost.EndpointIP) ||
 		(len(h.NatType) > 0 && h.NatType != currentHost.NatType) ||
 		(len(h.NatType) > 0 && h.NatType != currentHost.NatType) ||
-		h.DefaultInterface != currentHost.DefaultInterface
+		h.DefaultInterface != currentHost.DefaultInterface ||
+		(h.ListenPort != 0 && h.ListenPort != currentHost.ListenPort) || (h.WgPublicListenPort != 0 && h.WgPublicListenPort != currentHost.WgPublicListenPort)
 	if ifaceDelta { // only save if something changes
 	if ifaceDelta { // only save if something changes
 		currentHost.EndpointIP = h.EndpointIP
 		currentHost.EndpointIP = h.EndpointIP
 		currentHost.Interfaces = h.Interfaces
 		currentHost.Interfaces = h.Interfaces
 		currentHost.DefaultInterface = h.DefaultInterface
 		currentHost.DefaultInterface = h.DefaultInterface
 		currentHost.NatType = h.NatType
 		currentHost.NatType = h.NatType
+		if h.ListenPort != 0 {
+			currentHost.ListenPort = h.ListenPort
+		}
+		if h.WgPublicListenPort != 0 {
+			currentHost.WgPublicListenPort = h.WgPublicListenPort
+		}
 		if err := logic.UpsertHost(currentHost); err != nil {
 		if err := logic.UpsertHost(currentHost); err != nil {
 			slog.Error("failed to update host after check-in", "name", h.Name, "id", h.ID, "error", err)
 			slog.Error("failed to update host after check-in", "name", h.Name, "id", h.ID, "error", err)
 			return false
 			return false

+ 23 - 13
mq/publishers.go

@@ -1,7 +1,6 @@
 package mq
 package mq
 
 
 import (
 import (
-	"context"
 	"encoding/json"
 	"encoding/json"
 	"errors"
 	"errors"
 	"fmt"
 	"fmt"
@@ -24,10 +23,13 @@ func PublishPeerUpdate() error {
 		logger.Log(1, "err getting all hosts", err.Error())
 		logger.Log(1, "err getting all hosts", err.Error())
 		return err
 		return err
 	}
 	}
-	logic.ResetPeerUpdateContext()
+	allNodes, err := logic.GetAllNodes()
+	if err != nil {
+		return err
+	}
 	for _, host := range hosts {
 	for _, host := range hosts {
 		host := host
 		host := host
-		if err = PublishSingleHostPeerUpdate(logic.PeerUpdateCtx, &host, nil, nil); err != nil {
+		if err = PublishSingleHostPeerUpdate(&host, allNodes, nil, nil); err != nil {
 			logger.Log(1, "failed to publish peer update to host", host.ID.String(), ": ", err.Error())
 			logger.Log(1, "failed to publish peer update to host", host.ID.String(), ": ", err.Error())
 		}
 		}
 	}
 	}
@@ -46,10 +48,13 @@ func PublishDeletedNodePeerUpdate(delNode *models.Node) error {
 		logger.Log(1, "err getting all hosts", err.Error())
 		logger.Log(1, "err getting all hosts", err.Error())
 		return err
 		return err
 	}
 	}
-	logic.ResetPeerUpdateContext()
+	allNodes, err := logic.GetAllNodes()
+	if err != nil {
+		return err
+	}
 	for _, host := range hosts {
 	for _, host := range hosts {
 		host := host
 		host := host
-		if err = PublishSingleHostPeerUpdate(logic.PeerUpdateCtx, &host, delNode, nil); err != nil {
+		if err = PublishSingleHostPeerUpdate(&host, allNodes, delNode, nil); err != nil {
 			logger.Log(1, "failed to publish peer update to host", host.ID.String(), ": ", err.Error())
 			logger.Log(1, "failed to publish peer update to host", host.ID.String(), ": ", err.Error())
 		}
 		}
 	}
 	}
@@ -68,10 +73,13 @@ func PublishDeletedClientPeerUpdate(delClient *models.ExtClient) error {
 		logger.Log(1, "err getting all hosts", err.Error())
 		logger.Log(1, "err getting all hosts", err.Error())
 		return err
 		return err
 	}
 	}
-	logic.ResetPeerUpdateContext()
+	nodes, err := logic.GetAllNodes()
+	if err != nil {
+		return err
+	}
 	for _, host := range hosts {
 	for _, host := range hosts {
 		host := host
 		host := host
-		if err = PublishSingleHostPeerUpdate(logic.PeerUpdateCtx, &host, nil, []models.ExtClient{*delClient}); err != nil {
+		if err = PublishSingleHostPeerUpdate(&host, nodes, nil, []models.ExtClient{*delClient}); err != nil {
 			logger.Log(1, "failed to publish peer update to host", host.ID.String(), ": ", err.Error())
 			logger.Log(1, "failed to publish peer update to host", host.ID.String(), ": ", err.Error())
 		}
 		}
 	}
 	}
@@ -79,9 +87,9 @@ func PublishDeletedClientPeerUpdate(delClient *models.ExtClient) error {
 }
 }
 
 
 // PublishSingleHostPeerUpdate --- determines and publishes a peer update to one host
 // PublishSingleHostPeerUpdate --- determines and publishes a peer update to one host
-func PublishSingleHostPeerUpdate(ctx context.Context, host *models.Host, deletedNode *models.Node, deletedClients []models.ExtClient) error {
+func PublishSingleHostPeerUpdate(host *models.Host, allNodes []models.Node, deletedNode *models.Node, deletedClients []models.ExtClient) error {
 
 
-	peerUpdate, err := logic.GetPeerUpdateForHost(ctx, "", host, deletedNode, deletedClients)
+	peerUpdate, err := logic.GetPeerUpdateForHost("", host, allNodes, deletedNode, deletedClients)
 	if err != nil {
 	if err != nil {
 		return err
 		return err
 	}
 	}
@@ -89,7 +97,7 @@ func PublishSingleHostPeerUpdate(ctx context.Context, host *models.Host, deleted
 		return nil
 		return nil
 	}
 	}
 	if host.OS != models.OS_Types.IoT {
 	if host.OS != models.OS_Types.IoT {
-		proxyUpdate, err := logic.GetProxyUpdateForHost(ctx, host)
+		proxyUpdate, err := logic.GetProxyUpdateForHost(host)
 		if err != nil {
 		if err != nil {
 			return err
 			return err
 		}
 		}
@@ -438,7 +446,10 @@ func sendPeers() {
 	if err != nil && len(hosts) > 0 {
 	if err != nil && len(hosts) > 0 {
 		logger.Log(1, "error retrieving networks for keepalive", err.Error())
 		logger.Log(1, "error retrieving networks for keepalive", err.Error())
 	}
 	}
-
+	nodes, err := logic.GetAllNodes()
+	if err != nil {
+		return
+	}
 	var force bool
 	var force bool
 	peer_force_send++
 	peer_force_send++
 	if peer_force_send == 5 {
 	if peer_force_send == 5 {
@@ -453,11 +464,10 @@ func sendPeers() {
 		//collectServerMetrics(networks[:])
 		//collectServerMetrics(networks[:])
 	}
 	}
 	if force {
 	if force {
-		logic.ResetPeerUpdateContext()
 		for _, host := range hosts {
 		for _, host := range hosts {
 			host := host
 			host := host
 			logger.Log(2, "sending scheduled peer update (5 min)")
 			logger.Log(2, "sending scheduled peer update (5 min)")
-			if err = PublishSingleHostPeerUpdate(logic.PeerUpdateCtx, &host, nil, nil); err != nil {
+			if err = PublishSingleHostPeerUpdate(&host, nodes, nil, nil); err != nil {
 				logger.Log(1, "error publishing peer updates for host: ", host.ID.String(), " Err: ", err.Error())
 				logger.Log(1, "error publishing peer updates for host: ", host.ID.String(), " Err: ", err.Error())
 			}
 			}
 		}
 		}