Explorar o código

broadcast acl update

Abhishek Kondur %!s(int64=2) %!d(string=hai) anos
pai
achega
00aae60333
Modificáronse 4 ficheiros con 65 adicións e 22 borrados
  1. 5 3
      controllers/network.go
  2. 1 4
      controllers/node.go
  3. 5 1
      mq/handlers.go
  4. 54 14
      mq/publishers.go

+ 5 - 3
controllers/network.go

@@ -144,9 +144,11 @@ func updateNetworkACL(w http.ResponseWriter, r *http.Request) {
 
 	// send peer updates
 	if servercfg.IsMessageQueueBackend() {
-		if err = mq.PublishPeerUpdate(); err != nil {
-			logger.Log(0, "failed to publish peer update after ACL update on", netname)
-		}
+		// if err = mq.PublishPeerUpdate(); err != nil {
+		// 	logger.Log(0, "failed to publish peer update after ACL update on", netname)
+		// }
+		mq.BroadCastAclUpdate(netname)
+
 	}
 	w.WriteHeader(http.StatusOK)
 	json.NewEncoder(w).Encode(newNetACL)

+ 1 - 4
controllers/node.go

@@ -694,10 +694,7 @@ func updateNode(w http.ResponseWriter, r *http.Request) {
 	json.NewEncoder(w).Encode(apiNode)
 	runUpdates(newNode, ifaceDelta)
 	go func(aclUpdate bool, newNode *models.Node) {
-		if aclUpdate {
-			mq.BroadCastAddOrUpdatePeer(host, newNode, true)
-		}
-
+		mq.BroadCastAddOrUpdatePeer(host, newNode, true)
 		if err := mq.PublishReplaceDNS(&currentNode, newNode, host); err != nil {
 			logger.Log(1, "failed to publish dns update", err.Error())
 		}

+ 5 - 1
mq/handlers.go

@@ -108,7 +108,11 @@ func UpdateHost(client mqtt.Client, msg mqtt.Message) {
 					}
 				}
 				// flush peers to host
-				err = FlushNetworkPeersToHost(&hu.Host, &hu.Node)
+				nodes, err := logic.GetNetworkNodes(hu.Node.Network)
+				if err != nil {
+					return
+				}
+				err = FlushNetworkPeersToHost(&hu.Host, &hu.Node, nodes)
 				if err != nil {
 					logger.Log(0, "failed to flush peers to host: ", err.Error())
 				}

+ 54 - 14
mq/publishers.go

@@ -10,6 +10,7 @@ import (
 
 	"github.com/gravitl/netmaker/logger"
 	"github.com/gravitl/netmaker/logic"
+	"github.com/gravitl/netmaker/logic/acls/nodeacls"
 	"github.com/gravitl/netmaker/models"
 	"github.com/gravitl/netmaker/servercfg"
 	"golang.zx2c4.com/wireguard/wgctrl/wgtypes"
@@ -110,17 +111,18 @@ func PublishSingleHostPeerUpdate(ctx context.Context, host *models.Host, deleted
 	return publish(host, fmt.Sprintf("peers/host/%s/%s", host.ID.String(), servercfg.GetServer()), data)
 }
 
-func FlushNetworkPeersToHost(host *models.Host, hNode *models.Node) error {
+// FlushNetworkPeersToHost - sends all the peers in the network to the host.
+func FlushNetworkPeersToHost(host *models.Host, hNode *models.Node, networkNodes []models.Node) error {
 	logger.Log(0, "flushing network peers to host: ", host.ID.String(), hNode.Network)
-	nodes, err := logic.GetNetworkNodes(hNode.Network)
-	if err != nil {
-		return err
-	}
-	p := models.PeerAction{
+	addPeerAction := models.PeerAction{
 		Action: models.AddPeer,
 		Peers:  []wgtypes.PeerConfig{},
 	}
-	for _, nodeI := range nodes {
+	rmPeerAction := models.PeerAction{
+		Action: models.RemovePeer,
+		Peers:  []wgtypes.PeerConfig{},
+	}
+	for _, nodeI := range networkNodes {
 		if nodeI.ID == hNode.ID {
 			// skip self
 			continue
@@ -129,7 +131,17 @@ func FlushNetworkPeersToHost(host *models.Host, hNode *models.Node) error {
 		if err != nil {
 			continue
 		}
-		p.Peers = append(p.Peers, wgtypes.PeerConfig{
+
+		if !nodeacls.AreNodesAllowed(nodeacls.NetworkID(nodeI.Network), nodeacls.NodeID(nodeI.ID.String()), nodeacls.NodeID(hNode.ID.String())) {
+			// remove peer if not allowed
+			rmPeerAction.Peers = append(rmPeerAction.Peers, wgtypes.PeerConfig{
+				PublicKey: peerHost.PublicKey,
+				Remove:    true,
+			})
+			continue
+
+		}
+		peerCfg := wgtypes.PeerConfig{
 			PublicKey: peerHost.PublicKey,
 			Endpoint: &net.UDPAddr{
 				IP:   peerHost.EndpointIP,
@@ -138,14 +150,23 @@ func FlushNetworkPeersToHost(host *models.Host, hNode *models.Node) error {
 			PersistentKeepaliveInterval: &nodeI.PersistentKeepalive,
 			ReplaceAllowedIPs:           true,
 			AllowedIPs:                  logic.GetAllowedIPs(hNode, &nodeI, nil),
-		})
-
+		}
+		addPeerAction.Peers = append(addPeerAction.Peers, peerCfg)
 	}
-	data, err := json.Marshal(p)
-	if err != nil {
-		return err
+	if len(rmPeerAction.Peers) > 0 {
+		data, err := json.Marshal(rmPeerAction)
+		if err != nil {
+			return err
+		}
+		publish(host, fmt.Sprintf("peer/host/%s/%s", host.ID.String(), servercfg.GetServer()), data)
+	}
+	if len(addPeerAction.Peers) > 0 {
+		data, err := json.Marshal(addPeerAction)
+		if err != nil {
+			return err
+		}
+		publish(host, fmt.Sprintf("peer/host/%s/%s", host.ID.String(), servercfg.GetServer()), data)
 	}
-	publish(host, fmt.Sprintf("peer/host/%s/%s", host.ID.String(), servercfg.GetServer()), data)
 	return nil
 }
 
@@ -181,6 +202,21 @@ func BroadCastDelPeer(host *models.Host, network string) error {
 	return nil
 }
 
+func BroadCastAclUpdate(network string) error {
+	nodes, err := logic.GetNetworkNodes(network)
+	if err != nil {
+		return err
+	}
+	for _, nodeI := range nodes {
+		nodeI := nodeI
+		h, err := logic.GetHost(nodeI.HostID.String())
+		if err == nil {
+			go FlushNetworkPeersToHost(h, &nodeI, nodes)
+		}
+	}
+	return err
+}
+
 // BroadCastAddOrUpdatePeer - notifys the hosts in the network to add or update peer.
 func BroadCastAddOrUpdatePeer(host *models.Host, node *models.Node, update bool) error {
 	// TODO: ACLs
@@ -213,6 +249,10 @@ func BroadCastAddOrUpdatePeer(host *models.Host, node *models.Node, update bool)
 		}
 		// update allowed ips, according to the peer node
 		p.Peers[0].AllowedIPs = logic.GetAllowedIPs(&nodeI, node, nil)
+		if update && !nodeacls.AreNodesAllowed(nodeacls.NetworkID(node.Network), nodeacls.NodeID(node.ID.String()), nodeacls.NodeID(nodeI.ID.String())) {
+			// remove peer if not allowed
+			p.Action = models.RemovePeer
+		}
 		data, err := json.Marshal(p)
 		if err != nil {
 			continue