Răsfoiți Sursa

reduced peer updates - first draft

0xdcarns 2 ani în urmă
părinte
comite
b4da4a74ac
8 a modificat fișierele cu 67 adăugiri și 135 ștergeri
  1. 0 3
      controllers/enrollmentkeys.go
  2. 1 1
      controllers/hosts.go
  3. 2 2
      controllers/relay.go
  4. 0 3
      logic/hosts.go
  5. 9 84
      logic/peers.go
  6. 1 1
      models/mqtt.go
  7. 6 4
      mq/handlers.go
  8. 48 37
      mq/publishers.go

+ 0 - 3
controllers/enrollmentkeys.go

@@ -240,8 +240,5 @@ func checkNetRegAndHostUpdate(networks []string, h *models.Host) {
 			Action: models.RequestAck,
 			Host:   *h,
 		})
-		if err := mq.PublishPeerUpdateForHost("", h, nil, nil); err != nil {
-			logger.Log(0, "failed to publish peer update during registration -", err.Error())
-		}
 	}
 }

+ 1 - 1
controllers/hosts.go

@@ -107,7 +107,7 @@ func updateHost(w http.ResponseWriter, r *http.Request) {
 		logger.Log(0, r.Header.Get("user"), "failed to send host update: ", currHost.ID.String(), err.Error())
 	}
 	go func() {
-		if err := mq.PublishPeerUpdateForHost("", newHost, nil, nil); err != nil {
+		if err := mq.PublishPeerUpdateForHost("", newHost, nil, nil, false); err != nil {
 			logger.Log(0, "fail to publish peer update: ", err.Error())
 		}
 		if newHost.Name != currHost.Name {

+ 2 - 2
controllers/relay.go

@@ -142,7 +142,7 @@ func createHostRelay(w http.ResponseWriter, r *http.Request) {
 				logger.Log(0, "failed to send host update: ", relatedHost.ID.String(), err.Error())
 			}
 		}
-		if err := mq.PublishPeerUpdateForHost("", relayHost, nil, nil); err != nil {
+		if err := mq.PublishPeerUpdateForHost("", relayHost, nil, nil, false); err != nil {
 			logger.Log(0, "fail to publish peer update: ", err.Error())
 		}
 	}(relayHost)
@@ -175,7 +175,7 @@ func deleteHostRelay(w http.ResponseWriter, r *http.Request) {
 	}
 	logger.Log(1, r.Header.Get("user"), "deleted relay host", hostid)
 	go func() {
-		if err := mq.PublishPeerUpdateForHost("", relayHost, nil, nil); err != nil {
+		if err := mq.PublishPeerUpdateForHost("", relayHost, nil, nil, false); err != nil {
 			logger.Log(0, "failed to update peers after relay delete:", err.Error())
 		}
 	}()

+ 0 - 3
logic/hosts.go

@@ -4,7 +4,6 @@ import (
 	"encoding/json"
 	"errors"
 	"fmt"
-	"log"
 
 	"github.com/google/uuid"
 	"github.com/gravitl/netmaker/database"
@@ -99,11 +98,9 @@ func CreateHost(h *models.Host) error {
 	h.HostPass = string(hash)
 	// if another server has already updated proxyenabled, leave it alone
 	if !h.ProxyEnabledSet {
-		log.Println("checking default proxy", servercfg.GetServerConfig().DefaultProxyMode)
 		if servercfg.GetServerConfig().DefaultProxyMode.Set {
 			h.ProxyEnabledSet = true
 			h.ProxyEnabled = servercfg.GetServerConfig().DefaultProxyMode.Value
-			log.Println("set proxy enabled to ", h.ProxyEnabled)
 		}
 	}
 	checkForZombieHosts(h)

+ 9 - 84
logic/peers.go

@@ -57,7 +57,6 @@ func GetProxyUpdateForHost(ctx context.Context, host *models.Host) (models.Proxy
 						Peers:               payload.Peers,
 					}
 				}
-
 			}
 		}
 		proxyPayload.IsRelay = true
@@ -130,15 +129,6 @@ func GetProxyUpdateForHost(ctx context.Context, host *models.Host) (models.Proxy
 	return proxyPayload, nil
 }
 
-// ResetPeerUpdateContext - kills any current peer updates and resets the context
-func ResetPeerUpdateContext() {
-	if PeerUpdateCtx != nil && PeerUpdateStop != nil {
-		PeerUpdateStop() // tell any current peer updates to stop
-	}
-
-	PeerUpdateCtx, PeerUpdateStop = context.WithCancel(context.Background())
-}
-
 // GetPeerUpdateForHost - gets the consolidated peer update for the host from all networks
 func GetPeerUpdateForHost(ctx context.Context, network string, hostToSend *models.Host, deletedNode *models.Node, deletedClient *models.ExtClient) (models.HostPeerUpdate, error) {
 	if hostToSend == nil {
@@ -151,7 +141,6 @@ func GetPeerUpdateForHost(ctx context.Context, network string, hostToSend *model
 	// track which nodes are deleted
 	// after peer calculation, if peer not in list, add delete config of peer
 	hostPeerUpdate := initHostPeerUpdate(hostToSend)
-
 	logger.Log(1, "peer update for host", hostToSend.ID.String())
 	peerIndexMap := make(map[string]int)
 	for _, nodeID := range hostToSend.Nodes {
@@ -190,27 +179,10 @@ func GetPeerUpdateForHost(ctx context.Context, network string, hostToSend *model
 				peerConfig.PublicKey = peerHost.PublicKey
 				peerConfig.PersistentKeepaliveInterval = &peer.PersistentKeepalive
 				peerConfig.ReplaceAllowedIPs = true
-				uselocal := false
-				if hostToSend.EndpointIP.String() == peerHost.EndpointIP.String() {
-					// peer is on same network
-					// set to localaddress
-					uselocal = true
-					if node.LocalAddress.IP == nil {
-						// use public endpint
-						uselocal = false
-					}
-					if node.LocalAddress.String() == peer.LocalAddress.String() {
-						uselocal = false
-					}
-				}
 				peerConfig.Endpoint = &net.UDPAddr{
 					IP:   peerHost.EndpointIP,
 					Port: GetPeerListenPort(peerHost),
 				}
-
-				if uselocal {
-					peerConfig.Endpoint.IP = peer.LocalAddress.IP
-				}
 				allowedips := GetAllowedIPs(&node, &peer, nil)
 				if peer.IsIngressGateway {
 					for _, entry := range peer.IngressGatewayRange {
@@ -311,56 +283,8 @@ func GetPeerUpdateForHost(ctx context.Context, network string, hostToSend *model
 					hostPeerUpdate.NodePeers = append(hostPeerUpdate.NodePeers, nodePeer)
 				}
 			}
-			var extPeers []wgtypes.PeerConfig
-			var extPeerIDAndAddrs []models.IDandAddr
 			if node.IsIngressGateway {
-				extPeers, extPeerIDAndAddrs, err = getExtPeers(&node)
-				if err == nil {
-					for _, extPeerIdAndAddr := range extPeerIDAndAddrs {
-						extPeerIdAndAddr := extPeerIdAndAddr
-						nodePeerMap[extPeerIdAndAddr.ID] = models.PeerRouteInfo{
-							PeerAddr: net.IPNet{
-								IP:   net.ParseIP(extPeerIdAndAddr.Address),
-								Mask: getCIDRMaskFromAddr(extPeerIdAndAddr.Address),
-							},
-							PeerKey: extPeerIdAndAddr.ID,
-							Allow:   true,
-							ID:      extPeerIdAndAddr.ID,
-						}
-					}
-					hostPeerUpdate.Peers = append(hostPeerUpdate.Peers, extPeers...)
-					for _, extPeerIdAndAddr := range extPeerIDAndAddrs {
-						extPeerIdAndAddr := extPeerIdAndAddr
-						hostPeerUpdate.HostPeerIDs[extPeerIdAndAddr.ID] = make(map[string]models.IDandAddr)
-						hostPeerUpdate.HostPeerIDs[extPeerIdAndAddr.ID][extPeerIdAndAddr.ID] = models.IDandAddr{
-							ID:      extPeerIdAndAddr.ID,
-							Address: extPeerIdAndAddr.Address,
-							Name:    extPeerIdAndAddr.Name,
-							Network: node.Network,
-						}
-
-						hostPeerUpdate.IngressInfo.ExtPeers[extPeerIdAndAddr.ID] = models.ExtClientInfo{
-							Masquerade: true,
-							IngGwAddr: net.IPNet{
-								IP:   net.ParseIP(node.PrimaryAddress()),
-								Mask: getCIDRMaskFromAddr(node.PrimaryAddress()),
-							},
-							Network: node.PrimaryNetworkRange(),
-							ExtPeerAddr: net.IPNet{
-								IP:   net.ParseIP(extPeerIdAndAddr.Address),
-								Mask: getCIDRMaskFromAddr(extPeerIdAndAddr.Address),
-							},
-							ExtPeerKey: extPeerIdAndAddr.ID,
-							Peers:      filterNodeMapForClientACLs(extPeerIdAndAddr.ID, node.Network, nodePeerMap),
-						}
-						if node.Network == network {
-							hostPeerUpdate.PeerIDs[extPeerIdAndAddr.ID] = extPeerIdAndAddr
-							hostPeerUpdate.NodePeers = append(hostPeerUpdate.NodePeers, extPeers...)
-						}
-					}
-				} else if !database.IsEmptyRecord(err) {
-					logger.Log(1, "error retrieving external clients:", err.Error())
-				}
+				getIngressNodeAllowedIPs(network, &node, &hostPeerUpdate, nodePeerMap)
 			}
 			if node.IsEgressGateway {
 				hostPeerUpdate.EgressInfo[node.ID.String()] = models.EgressInfo{
@@ -376,6 +300,7 @@ func GetPeerUpdateForHost(ctx context.Context, network string, hostToSend *model
 			}
 		}
 	}
+
 	// == post peer calculations ==
 	// indicate removal if no allowed IPs were calculated
 	for i := range hostPeerUpdate.Peers {
@@ -407,14 +332,15 @@ func GetPeerUpdateForHost(ctx context.Context, network string, hostToSend *model
 	return hostPeerUpdate, nil
 }
 
-// GetPeerUpdateForSingleHost - gets the consolidated peer update a single a host <-> host
+// GetPeerUpdateOfSingleHost - gets the consolidated peer update a single a host <-> host
 // from all networks
-func GetPeerUpdateForSingleHost(
+func GetPeerUpdateOfSingleHost(
 	network string,
 	hostToSend, updatedHost *models.Host,
 	updatedHostNodes []models.Node,
 	deletedNode *models.Node,
-	deletedClient *models.ExtClient) (models.HostPeerUpdate, error) {
+	deletedClient *models.ExtClient,
+	deleteHost bool) (models.HostPeerUpdate, error) {
 
 	if hostToSend == nil || updatedHost == nil || len(updatedHostNodes) == 0 {
 		return models.HostPeerUpdate{}, errors.New("host is nil")
@@ -437,9 +363,8 @@ func GetPeerUpdateForSingleHost(
 		}
 		for _, peer := range updatedHostNodes {
 			peer := peer
-			if peer.ID.String() == node.ID.String() {
+			if peer.ID.String() == node.ID.String() { // skip yourself - should never occur
 				logger.Log(2, "peer update, skipping self")
-				//skip yourself
 				continue
 			}
 			var peerConfig wgtypes.PeerConfig
@@ -572,7 +497,8 @@ func GetPeerUpdateForSingleHost(
 	// indicate removal if no allowed IPs were calculated
 	for i := range hostPeerUpdate.Peers {
 		peer := hostPeerUpdate.Peers[i]
-		if len(peer.AllowedIPs) == 0 {
+		if len(peer.AllowedIPs) == 0 ||
+			(deleteHost && peer.PublicKey.String() == updatedHost.PublicKey.String()) {
 			peer.Remove = true
 		}
 		hostPeerUpdate.Peers[i] = peer
@@ -894,7 +820,6 @@ func filterNodeMapForClientACLs(publicKey, network string, nodePeerMap map[strin
 
 func initHostPeerUpdate(h *models.Host) models.HostPeerUpdate {
 	return models.HostPeerUpdate{
-		Host:          *h,
 		Server:        servercfg.GetServer(),
 		HostPeerIDs:   make(models.HostPeerMap, 0),
 		ServerVersion: servercfg.GetVersion(),

+ 1 - 1
models/mqtt.go

@@ -8,7 +8,7 @@ import (
 
 // HostPeerUpdate - struct for host peer updates
 type HostPeerUpdate struct {
-	Host            Host                 `json:"host" bson:"host" yaml:"host"`
+	Host            *Host                `json:"host,omitempty" bson:"host,omitempty" yaml:"host,omitempty"`
 	Server          string               `json:"server" bson:"server" yaml:"server"`
 	ServerVersion   string               `json:"serverversion" bson:"serverversion" yaml:"serverversion"`
 	ServerAddrs     []ServerAddr         `json:"serveraddrs" bson:"serveraddrs" yaml:"serveraddrs"`

+ 6 - 4
mq/handlers.go

@@ -185,7 +185,7 @@ func UpdateHost(client mqtt.Client, msg mqtt.Message) {
 				logger.Log(0, "failed to send new node to host", hostUpdate.Host.Name, currentHost.ID.String(), err.Error())
 				return
 			} else {
-				if err = PublishPeerUpdateForHost("", currentHost, nil, nil); err != nil {
+				if err = PublishPeerUpdateForHost("", currentHost, nil, nil, false); err != nil {
 					logger.Log(0, "failed peers publish after join acknowledged", hostUpdate.Host.Name, currentHost.ID.String(), err.Error())
 					return
 				}
@@ -210,6 +210,9 @@ func UpdateHost(client mqtt.Client, msg mqtt.Message) {
 				return
 			}
 		}
+		if err := PublishPeerUpdateForHost("", currentHost, nil, nil, true); err != nil {
+			logger.Log(0, "failed to publish peer update: ", err.Error())
+		}
 		if err := logic.DisassociateAllNodesFromHost(currentHost.ID.String()); err != nil {
 			logger.Log(0, "failed to delete all nodes of host: ", currentHost.ID.String(), err.Error())
 			return
@@ -218,13 +221,12 @@ func UpdateHost(client mqtt.Client, msg mqtt.Message) {
 			logger.Log(0, "failed to delete host: ", currentHost.ID.String(), err.Error())
 			return
 		}
-		sendPeerUpdate = true
 	}
 
 	if sendPeerUpdate {
-		err := PublishPeerUpdateForHost("", &hostUpdate.Host, nil, nil)
+		err := PublishPeerUpdateForHost("", &hostUpdate.Host, nil, nil, false)
 		if err != nil {
-			logger.Log(0, "failed to pulish peer update: ", err.Error())
+			logger.Log(0, "failed to publish peer update: ", err.Error())
 		}
 	}
 	// if servercfg.Is_EE && ifaceDelta {

+ 48 - 37
mq/publishers.go

@@ -1,6 +1,7 @@
 package mq
 
 import (
+	"context"
 	"encoding/json"
 	"errors"
 	"fmt"
@@ -27,6 +28,7 @@ func PublishPeerUpdateForClient(network string, c *models.ExtClient, deleted boo
 		h,
 		nil,
 		deletedClient,
+		false,
 	)
 }
 
@@ -45,31 +47,34 @@ func PublishPeerUpdateForNode(network string, n *models.Node, deleted bool) erro
 		h,
 		deletedNode,
 		nil,
+		false,
 	)
 }
 
 // PublishPeerUpdateForHost - publishes a peer update to affected hosts on behalf of an updated host
-func PublishPeerUpdateForHost(network string, updatedHost *models.Host, deletedNode *models.Node, deletedClient *models.ExtClient) error {
+func PublishPeerUpdateForHost(network string, updatedHost *models.Host, deletedNode *models.Node, deletedClient *models.ExtClient, deleteHost bool) error {
 
 	hostsToSend := logic.GetRelatedHosts(updatedHost.ID.String())
 	currentHostNodes := logic.GetNodesByHost(updatedHost)
 	serverConf := servercfg.GetServerConfig()
-	for i := range hostsToSend {
+	for i := range hostsToSend { // calculate peers for other peers
 		hostToSend := hostsToSend[i]
-		peerUpdate, err := logic.GetPeerUpdateForSingleHost(
+		peerUpdate, err := logic.GetPeerUpdateOfSingleHost(
 			network,
 			&hostToSend,
 			updatedHost,
-			currentHostNodes,
+			currentHostNodes[:],
 			deletedNode,
 			deletedClient,
+			deleteHost,
 		)
 		if err != nil {
-			logger.Log(0, "failed to send peer update to host", hostToSend.Name, hostToSend.ID.String(), err.Error())
+			logger.Log(0, "failed to gather peer update for host", hostToSend.Name, hostToSend.ID.String(), err.Error())
+			continue
 		}
 		data, err := json.Marshal(&peerUpdate)
 		if err != nil {
-			logger.Log(0, "failed to send peer update to host", hostToSend.Name, hostToSend.ID.String(), err.Error())
+			logger.Log(0, "failed to JSON peer update to host", hostToSend.Name, hostToSend.ID.String(), err.Error())
 			continue
 		}
 		if err = publish(&hostToSend,
@@ -80,34 +85,17 @@ func PublishPeerUpdateForHost(network string, updatedHost *models.Host, deletedN
 		}
 	}
 
-	return nil
-}
+	if deleteHost {
+		return nil
+	}
 
-// // PublishSingleHostPeerUpdate --- determines and publishes a peer update to one host
-// func PublishSingleHostPeerUpdate(ctx context.Context, host *models.Host, deletedNode *models.Node, deletedClient *models.ExtClient) error {
-
-// 	peerUpdate, err := logic.GetPeerUpdateForHost(ctx, "", host, deletedNode, deletedClient)
-// 	if err != nil {
-// 		return err
-// 	}
-// 	if len(peerUpdate.Peers) == 0 { // no peers to send
-// 		return nil
-// 	}
-// 	if host.ProxyEnabled {
-// 		proxyUpdate, err := logic.GetProxyUpdateForHost(ctx, host)
-// 		if err != nil {
-// 			return err
-// 		}
-// 		proxyUpdate.Action = models.ProxyUpdate
-// 		peerUpdate.ProxyUpdate = proxyUpdate
-// 	}
-
-// 	data, err := json.Marshal(&peerUpdate)
-// 	if err != nil {
-// 		return err
-// 	}
-// 	return publish(host, fmt.Sprintf("peers/host/%s/%s", host.ID.String(), servercfg.GetServer()), data)
-// }
+	return publishSingleHostPeerUpdate( // calculate for updated host
+		context.Background(),
+		updatedHost,
+		deletedNode,
+		deletedClient,
+	)
+}
 
 // NodeUpdate -- publishes a node update
 func NodeUpdate(node *models.Node) error {
@@ -448,17 +436,40 @@ func sendPeers() {
 		if err != nil {
 			logger.Log(3, "error occurred on timer,", err.Error())
 		}
-
-		//collectServerMetrics(networks[:])
 	}
 	if force {
-		logic.ResetPeerUpdateContext()
 		for _, host := range hosts {
 			host := host
 			logger.Log(2, "sending scheduled peer update (5 min)")
-			if err = PublishPeerUpdateForHost("", &host, nil, nil); err != nil {
+			if err = PublishPeerUpdateForHost("", &host, nil, nil, false); err != nil {
 				logger.Log(1, "error publishing peer updates for host: ", host.ID.String(), " Err: ", err.Error())
 			}
 		}
 	}
 }
+
+// publishSingleHostPeerUpdate --- determines and publishes a peer update to one host
+func publishSingleHostPeerUpdate(ctx context.Context, host *models.Host, deletedNode *models.Node, deletedClient *models.ExtClient) error {
+
+	peerUpdate, err := logic.GetPeerUpdateForHost(ctx, "", host, deletedNode, deletedClient)
+	if err != nil {
+		return err
+	}
+	if len(peerUpdate.Peers) == 0 { // no peers to send
+		return nil
+	}
+	if host.ProxyEnabled {
+		proxyUpdate, err := logic.GetProxyUpdateForHost(ctx, host)
+		if err != nil {
+			return err
+		}
+		proxyUpdate.Action = models.ProxyUpdate
+		peerUpdate.ProxyUpdate = proxyUpdate
+	}
+
+	data, err := json.Marshal(&peerUpdate)
+	if err != nil {
+		return err
+	}
+	return publish(host, fmt.Sprintf("peers/host/%s/%s", host.ID.String(), servercfg.GetServer()), data)
+}