Explorar el Código

remove Broadcast peer updates

Matthew R Kasun hace 2 años
padre
commit
7f33f60ef3
Se han modificado 6 ficheros con 127 adiciones y 161 borrados
  1. 11 1
      auth/host_session.go
  2. 41 4
      controllers/ext_client.go
  3. 14 2
      controllers/hosts.go
  4. 51 7
      controllers/node.go
  5. 9 2
      mq/handlers.go
  6. 1 145
      mq/publishers.go

+ 11 - 1
auth/host_session.go

@@ -15,6 +15,7 @@ import (
 	"github.com/gravitl/netmaker/models"
 	"github.com/gravitl/netmaker/mq"
 	"github.com/gravitl/netmaker/servercfg"
+	"golang.org/x/exp/slog"
 )
 
 // SessionHandler - called by the HTTP router when user
@@ -237,7 +238,16 @@ func CheckNetRegAndHostUpdate(networks []string, h *models.Host) {
 				Host:   *h,
 				Node:   *newNode,
 			})
-			mq.BroadcastAddOrUpdatePeer(h, newNode, false)
+			peers, err := logic.GetNetworkClients(newNode.Network)
+			if err != nil {
+				slog.Warn("error getting network clients: ", "error", err)
+			}
+			for _, client := range peers {
+				update := models.PeerAction{
+					Peers: logic.GetPeerUpdate(&client.Host),
+				}
+				mq.PubPeerUpdateToHost(&client.Host, update)
+			}
 		}
 	}
 	if servercfg.IsMessageQueueBackend() {

+ 41 - 4
controllers/ext_client.go

@@ -18,6 +18,7 @@ import (
 	"github.com/gravitl/netmaker/models/promodels"
 	"github.com/gravitl/netmaker/mq"
 	"github.com/skip2/go-qrcode"
+	"golang.org/x/exp/slog"
 	"golang.zx2c4.com/wireguard/wgctrl/wgtypes"
 )
 
@@ -397,7 +398,16 @@ func createExtClient(w http.ResponseWriter, r *http.Request) {
 	logger.Log(0, r.Header.Get("user"), "created new ext client on network", networkName)
 	w.WriteHeader(http.StatusOK)
 	go func() {
-		mq.BroadcastExtClient(host, &node)
+		peers, err := logic.GetNetworkClients(node.Network)
+		if err != nil {
+			slog.Warn("error getting network clients: ", "error", err)
+		}
+		for _, client := range peers {
+			update := models.PeerAction{
+				Peers: logic.GetPeerUpdate(&client.Host),
+			}
+			mq.PubPeerUpdateToHost(&client.Host, update)
+		}
 		f, err := logic.GetFwUpdate(host)
 		if err == nil {
 			mq.PublishFwUpdate(host, &f)
@@ -502,11 +512,29 @@ func updateExtClient(w http.ResponseWriter, r *http.Request) {
 		if ingressNode, err := logic.GetNodeByID(newclient.IngressGatewayID); err == nil {
 			if ingressHost, err := logic.GetHost(ingressNode.HostID.String()); err == nil {
 				if replaceOldClient || !update.Enabled {
-					mq.BroadcastDelExtClient(ingressHost, &ingressNode, []models.ExtClient{currentClient})
+					peers, err := logic.GetNetworkClients(ingressNode.Network)
+					if err != nil {
+						slog.Warn("error getting network clients: ", "error", err)
+					}
+					for _, client := range peers {
+						update := models.PeerAction{
+							Peers: logic.GetPeerUpdate(&client.Host),
+						}
+						mq.PubPeerUpdateToHost(&client.Host, update)
+					}
 				}
 				if replaceOldClient || changedEnabled {
 					// broadcast update
-					mq.BroadcastExtClient(ingressHost, &ingressNode)
+					peers, err := logic.GetNetworkClients(ingressNode.Network)
+					if err != nil {
+						slog.Warn("error getting network clients: ", "error", err)
+					}
+					for _, client := range peers {
+						update := models.PeerAction{
+							Peers: logic.GetPeerUpdate(&client.Host),
+						}
+						mq.PubPeerUpdateToHost(&client.Host, update)
+					}
 				}
 				f, err := logic.GetFwUpdate(ingressHost)
 				if err == nil {
@@ -588,7 +616,16 @@ func deleteExtClient(w http.ResponseWriter, r *http.Request) {
 	go func() {
 		ingressHost, err := logic.GetHost(ingressnode.HostID.String())
 		if err == nil {
-			mq.BroadcastDelExtClient(ingressHost, &ingressnode, []models.ExtClient{extclient})
+			peers, err := logic.GetNetworkClients(ingressnode.Network)
+			if err != nil {
+				slog.Warn("error getting network clients: ", "error", err)
+			}
+			for _, client := range peers {
+				update := models.PeerAction{
+					Peers: logic.GetPeerUpdate(&client.Host),
+				}
+				mq.PubPeerUpdateToHost(&client.Host, update)
+			}
 			f, err := logic.GetFwUpdate(ingressHost)
 			if err == nil {
 				mq.PublishFwUpdate(ingressHost, &f)

+ 14 - 2
controllers/hosts.go

@@ -14,6 +14,7 @@ import (
 	"github.com/gravitl/netmaker/mq"
 	"github.com/gravitl/netmaker/servercfg"
 	"golang.org/x/crypto/bcrypt"
+	"golang.org/x/exp/slog"
 )
 
 func hostHandlers(r *mux.Router) {
@@ -295,7 +296,9 @@ func addHostToNetwork(w http.ResponseWriter, r *http.Request) {
 			Action: models.RequestAck,
 			Host:   *currHost,
 		})
-		go mq.BroadcastAddOrUpdatePeer(currHost, newNode, false)
+		go func() {
+
+		}()
 	}
 
 	logger.Log(2, r.Header.Get("user"), fmt.Sprintf("added host %s to network %s", currHost.Name, network))
@@ -347,7 +350,16 @@ func deleteHostFromNetwork(w http.ResponseWriter, r *http.Request) {
 
 	runUpdates(node, false)
 	go func() { // notify of peer change
-		mq.BroadcastDelPeer(currHost, network)
+		peers, err := logic.GetNetworkClients(node.Network)
+		if err != nil {
+			slog.Warn("error getting network clients: ", "error", err)
+		}
+		for _, client := range peers {
+			update := models.PeerAction{
+				Peers: logic.GetPeerUpdate(&client.Host),
+			}
+			mq.PubPeerUpdateToHost(&client.Host, update)
+		}
 		if err := mq.PublishDNSDelete(node, currHost); err != nil {
 			logger.Log(1, "error publishing dns update", err.Error())
 		}

+ 51 - 7
controllers/node.go

@@ -17,6 +17,7 @@ import (
 	"github.com/gravitl/netmaker/mq"
 	"github.com/gravitl/netmaker/servercfg"
 	"golang.org/x/crypto/bcrypt"
+	"golang.org/x/exp/slog"
 )
 
 var hostIDHeader = "host-id"
@@ -464,7 +465,16 @@ func createEgressGateway(w http.ResponseWriter, r *http.Request) {
 			logger.Log(0, "failed to get egress host: ", err.Error())
 			return
 		}
-		mq.BroadcastAddOrUpdatePeer(host, &node, true)
+		peers, err := logic.GetNetworkClients(node.Network)
+		if err != nil {
+			slog.Warn("error getting network clients: ", "error", err)
+		}
+		for _, client := range peers {
+			update := models.PeerAction{
+				Peers: logic.GetPeerUpdate(&client.Host),
+			}
+			mq.PubPeerUpdateToHost(&client.Host, update)
+		}
 		f, err := logic.GetFwUpdate(host)
 		if err != nil {
 			logger.Log(0, "failed to get egreess host: ", err.Error())
@@ -507,13 +517,21 @@ func deleteEgressGateway(w http.ResponseWriter, r *http.Request) {
 	w.WriteHeader(http.StatusOK)
 	json.NewEncoder(w).Encode(apiNode)
 	go func() {
-
 		host, err := logic.GetHost(node.HostID.String())
 		if err != nil {
 			logger.Log(0, "failed to get egress host: ", err.Error())
 			return
 		}
-		mq.BroadcastAddOrUpdatePeer(host, &node, true)
+		peers, err := logic.GetNetworkClients(node.Network)
+		if err != nil {
+			slog.Warn("error getting network clients: ", "error", err)
+		}
+		for _, client := range peers {
+			update := models.PeerAction{
+				Peers: logic.GetPeerUpdate(&client.Host),
+			}
+			mq.PubPeerUpdateToHost(&client.Host, update)
+		}
 		f, err := logic.GetFwUpdate(host)
 		if err != nil {
 			logger.Log(0, "failed to get egreess host: ", err.Error())
@@ -608,7 +626,16 @@ func deleteIngressGateway(w http.ResponseWriter, r *http.Request) {
 	if len(removedClients) > 0 {
 		host, err := logic.GetHost(node.HostID.String())
 		if err == nil {
-			mq.BroadcastDelExtClient(host, &node, removedClients)
+			peers, err := logic.GetNetworkClients(node.Network)
+			if err != nil {
+				slog.Warn("error getting network clients: ", "error", err)
+			}
+			for _, client := range peers {
+				update := models.PeerAction{
+					Peers: logic.GetPeerUpdate(&client.Host),
+				}
+				mq.PubPeerUpdateToHost(&client.Host, update)
+			}
 			f, err := logic.GetFwUpdate(host)
 			if err == nil {
 				mq.PublishFwUpdate(host, &f)
@@ -706,7 +733,16 @@ func updateNode(w http.ResponseWriter, r *http.Request) {
 	json.NewEncoder(w).Encode(apiNode)
 	runUpdates(newNode, ifaceDelta)
 	go func(aclUpdate bool, newNode *models.Node) {
-		mq.BroadcastAddOrUpdatePeer(host, newNode, true)
+		peers, err := logic.GetNetworkClients(newNode.Network)
+		if err != nil {
+			slog.Warn("error getting network clients: ", "error", err)
+		}
+		for _, client := range peers {
+			update := models.PeerAction{
+				Peers: logic.GetPeerUpdate(&client.Host),
+			}
+			mq.PubPeerUpdateToHost(&client.Host, update)
+		}
 		if err := mq.PublishReplaceDNS(&currentNode, newNode, host); err != nil {
 			logger.Log(1, "failed to publish dns update", err.Error())
 		}
@@ -766,8 +802,16 @@ func deleteNode(w http.ResponseWriter, r *http.Request) {
 			logger.Log(1, "failed to retrieve host for node", node.ID.String(), err.Error())
 			return
 		}
-
-		err = mq.BroadcastDelPeer(host, deletedNode.Network)
+		peers, err := logic.GetNetworkClients(node.Network)
+		if err != nil {
+			slog.Warn("error getting network clients: ", "error", err)
+		}
+		for _, client := range peers {
+			update := models.PeerAction{
+				Peers: logic.GetPeerUpdate(&client.Host),
+			}
+			mq.PubPeerUpdateToHost(&client.Host, update)
+		}
 		if err != nil {
 			logger.Log(1, "error publishing peer update ", err.Error())
 		}

+ 9 - 2
mq/handlers.go

@@ -64,8 +64,15 @@ func UpdateNode(client mqtt.Client, msg mqtt.Message) {
 		if err != nil {
 			return
 		}
-		if err = BroadcastAddOrUpdatePeer(h, &newNode, true); err != nil {
-			logger.Log(0, "error updating peers when node", currentNode.ID.String(), "informed the server of an interface change", err.Error())
+		peers, err := logic.GetNetworkClients(newNode.Network)
+		if err != nil {
+			slog.Warn("error getting network clients: ", "error", err)
+		}
+		for _, client := range peers {
+			update := models.PeerAction{
+				Peers: logic.GetPeerUpdate(&client.Host),
+			}
+			PubPeerUpdateToHost(&client.Host, update)
 		}
 		if nodes, err := logic.GetNetworkNodes(newNode.Network); err == nil {
 			FlushNetworkPeersToHost(h, &newNode, nodes)

+ 1 - 145
mq/publishers.go

@@ -187,46 +187,6 @@ func FlushNetworkPeersToHost(host *models.Host, hNode *models.Node, networkNodes
 	return nil
 }
 
-// BroadcastDelPeer - notifys all the hosts in the network to remove peer
-func BroadcastDelPeer(host *models.Host, network string) error {
-	nodes, err := logic.GetNetworkNodes(network)
-	if err != nil {
-		return err
-	}
-	p := models.PeerAction{
-		Action: models.RemovePeer,
-		Peers: []wgtypes.PeerConfig{
-			{
-				PublicKey: host.PublicKey,
-				Remove:    true,
-			},
-		},
-	}
-	data, err := json.Marshal(p)
-	if err != nil {
-		return err
-	}
-	for _, nodeI := range nodes {
-		if nodeI.HostID == host.ID {
-			// skip self...
-			continue
-		}
-		peerHost, err := logic.GetHost(nodeI.HostID.String())
-		if err == nil {
-			publish(peerHost, fmt.Sprintf("peer/host/%s/%s", peerHost.ID.String(), servercfg.GetServer()), data)
-			if nodeI.IsIngressGateway || nodeI.IsEgressGateway {
-				go func(peerHost models.Host) {
-					f, err := logic.GetFwUpdate(&peerHost)
-					if err == nil {
-						PublishFwUpdate(&peerHost, &f)
-					}
-				}(*peerHost)
-			}
-		}
-	}
-	return nil
-}
-
 // BroadcastAclUpdate - sends new acl updates to peers
 func BroadcastAclUpdate(network string) error {
 	nodes, err := logic.GetNetworkNodes(network)
@@ -243,111 +203,6 @@ func BroadcastAclUpdate(network string) error {
 	return err
 }
 
-// BroadcastAddOrUpdatePeer - notifys the hosts in the network to add or update peer.
-func BroadcastAddOrUpdatePeer(host *models.Host, node *models.Node, update bool) error {
-	nodes, err := logic.GetNetworkNodes(node.Network)
-	if err != nil {
-		return err
-	}
-
-	p := models.PeerAction{
-		Action: models.AddPeer,
-		Peers: []wgtypes.PeerConfig{
-			{
-				PublicKey: host.PublicKey,
-				Endpoint: &net.UDPAddr{
-					IP:   host.EndpointIP,
-					Port: logic.GetPeerListenPort(host),
-				},
-				PersistentKeepaliveInterval: &node.PersistentKeepalive,
-				ReplaceAllowedIPs:           true,
-			},
-		},
-	}
-	if update {
-		p.Action = models.UpdatePeer
-	}
-	for _, nodeI := range nodes {
-		if nodeI.ID.String() == node.ID.String() {
-			// skip self...
-			continue
-		}
-		// update allowed ips, according to the peer node
-		p.Peers[0].AllowedIPs = logic.GetAllowedIPs(&nodeI, node, nil)
-		if update && (!nodeacls.AreNodesAllowed(nodeacls.NetworkID(node.Network), nodeacls.NodeID(node.ID.String()), nodeacls.NodeID(nodeI.ID.String())) ||
-			node.Action == models.NODE_DELETE || node.PendingDelete || !node.Connected) {
-			// remove peer
-			p.Action = models.RemovePeer
-			p.Peers[0].Remove = true
-		}
-		data, err := json.Marshal(p)
-		if err != nil {
-			continue
-		}
-		peerHost, err := logic.GetHost(nodeI.HostID.String())
-		if err == nil {
-			publish(peerHost, fmt.Sprintf("peer/host/%s/%s", peerHost.ID.String(), servercfg.GetServer()), data)
-		}
-		if nodeI.IsIngressGateway || nodeI.IsEgressGateway {
-			go func(peerHost models.Host) {
-				f, err := logic.GetFwUpdate(&peerHost)
-				if err == nil {
-					PublishFwUpdate(&peerHost, &f)
-				}
-			}(*peerHost)
-
-		}
-
-	}
-	return nil
-}
-
-// BroadcastExtClient - publishes msg to add/updates ext client in the network
-func BroadcastExtClient(ingressHost *models.Host, ingressNode *models.Node) error {
-
-	nodes, err := logic.GetNetworkNodes(ingressNode.Network)
-	if err != nil {
-		return err
-	}
-	//flush peers to ingress host
-	go FlushNetworkPeersToHost(ingressHost, ingressNode, nodes)
-	// broadcast to update ingress peer to other hosts
-	go BroadcastAddOrUpdatePeer(ingressHost, ingressNode, true)
-	return nil
-}
-
-// BroadcastDelExtClient - published msg to remove ext client from network
-func BroadcastDelExtClient(ingressHost *models.Host, ingressNode *models.Node, extclients []models.ExtClient) error {
-	// TODO - send fw update
-	go BroadcastAddOrUpdatePeer(ingressHost, ingressNode, true)
-	peers := []wgtypes.PeerConfig{}
-	for _, extclient := range extclients {
-		extPubKey, err := wgtypes.ParseKey(extclient.PublicKey)
-		if err != nil {
-			continue
-		}
-		peers = append(peers, wgtypes.PeerConfig{
-			PublicKey: extPubKey,
-			Remove:    true,
-		})
-
-	}
-	p := models.PeerAction{
-		Action: models.RemovePeer,
-		Peers:  peers,
-	}
-
-	data, err := json.Marshal(p)
-	if err != nil {
-		return err
-	}
-	err = publish(ingressHost, fmt.Sprintf("peer/host/%s/%s", ingressHost.ID.String(), servercfg.GetServer()), data)
-	if err != nil {
-		return err
-	}
-	return nil
-}
-
 // NodeUpdate -- publishes a node update
 func NodeUpdate(node *models.Node) error {
 	host, err := logic.GetHost(node.HostID.String())
@@ -721,4 +576,5 @@ func PubPeerUpdateToHost(host *models.Host, update models.PeerAction) {
 		slog.Error("error publishing peer update to host", "host", host.Name, "err", err)
 		return
 	}
+	slog.Info("published peer update to host", "host", host.Name)
 }