Browse Source

NET-10: Refactor Peer Updates Phase-1 (#2303)

* single peer broadcast actions

* peer singleton update action

* broadcast update peer

* add func comments

* flush peers to host once joined the network

* send network peer list on host joining network

* flush all peers to host on joining a network

* broadcast acl update

* check node acls,action,connection status

* async add/rm network updates

* add comments to exported items

* pr comments

* pr comments

* pr comments
Abhishek K 2 years ago
parent
commit
4d7e7a4d2d
8 changed files with 214 additions and 31 deletions
  1. 4 3
      auth/host_session.go
  2. 2 3
      controllers/hosts.go
  3. 2 3
      controllers/network.go
  4. 7 13
      controllers/node.go
  5. 5 6
      logic/peers.go
  6. 18 0
      models/mqtt.go
  7. 16 3
      mq/handlers.go
  8. 160 0
      mq/publishers.go

+ 4 - 3
auth/host_session.go

@@ -237,6 +237,9 @@ func CheckNetRegAndHostUpdate(networks []string, h *models.Host) {
 				Host:   *h,
 				Host:   *h,
 				Node:   *newNode,
 				Node:   *newNode,
 			})
 			})
+			if servercfg.IsMessageQueueBackend() {
+				mq.BroadcastAddOrUpdatePeer(h, newNode, false)
+			}
 		}
 		}
 	}
 	}
 	if servercfg.IsMessageQueueBackend() {
 	if servercfg.IsMessageQueueBackend() {
@@ -244,9 +247,7 @@ func CheckNetRegAndHostUpdate(networks []string, h *models.Host) {
 			Action: models.RequestAck,
 			Action: models.RequestAck,
 			Host:   *h,
 			Host:   *h,
 		})
 		})
-		if err := mq.PublishPeerUpdate(); err != nil {
-			logger.Log(0, "failed to publish peer update during registration -", err.Error())
-		}
+
 	}
 	}
 }
 }
 
 

+ 2 - 3
controllers/hosts.go

@@ -303,6 +303,7 @@ func addHostToNetwork(w http.ResponseWriter, r *http.Request) {
 			Action: models.RequestAck,
 			Action: models.RequestAck,
 			Host:   *currHost,
 			Host:   *currHost,
 		})
 		})
+		go mq.BroadcastAddOrUpdatePeer(currHost, newNode, false)
 	}
 	}
 
 
 	logger.Log(2, r.Header.Get("user"), fmt.Sprintf("added host %s to network %s", currHost.Name, network))
 	logger.Log(2, r.Header.Get("user"), fmt.Sprintf("added host %s to network %s", currHost.Name, network))
@@ -354,9 +355,7 @@ func deleteHostFromNetwork(w http.ResponseWriter, r *http.Request) {
 
 
 	runUpdates(node, false)
 	runUpdates(node, false)
 	go func() { // notify of peer change
 	go func() { // notify of peer change
-		if err := mq.PublishPeerUpdate(); err != nil {
-			logger.Log(1, "error publishing peer update ", err.Error())
-		}
+		mq.BroadcastDelPeer(currHost, network)
 		if err := mq.PublishDNSDelete(node, currHost); err != nil {
 		if err := mq.PublishDNSDelete(node, currHost); err != nil {
 			logger.Log(1, "error publishing dns update", err.Error())
 			logger.Log(1, "error publishing dns update", err.Error())
 		}
 		}

+ 2 - 3
controllers/network.go

@@ -144,9 +144,8 @@ func updateNetworkACL(w http.ResponseWriter, r *http.Request) {
 
 
 	// send peer updates
 	// send peer updates
 	if servercfg.IsMessageQueueBackend() {
 	if servercfg.IsMessageQueueBackend() {
-		if err = mq.PublishPeerUpdate(); err != nil {
-			logger.Log(0, "failed to publish peer update after ACL update on", netname)
-		}
+		go mq.BroadcastAclUpdate(netname)
+
 	}
 	}
 	w.WriteHeader(http.StatusOK)
 	w.WriteHeader(http.StatusOK)
 	json.NewEncoder(w).Encode(newNetACL)
 	json.NewEncoder(w).Encode(newNetACL)

+ 7 - 13
controllers/node.go

@@ -691,11 +691,7 @@ func updateNode(w http.ResponseWriter, r *http.Request) {
 	json.NewEncoder(w).Encode(apiNode)
 	json.NewEncoder(w).Encode(apiNode)
 	runUpdates(newNode, ifaceDelta)
 	runUpdates(newNode, ifaceDelta)
 	go func(aclUpdate bool, newNode *models.Node) {
 	go func(aclUpdate bool, newNode *models.Node) {
-		if aclUpdate {
-			if err := mq.PublishPeerUpdate(); err != nil {
-				logger.Log(0, "error during node ACL update for node", newNode.ID.String())
-			}
-		}
+		mq.BroadcastAddOrUpdatePeer(host, newNode, true)
 		if err := mq.PublishReplaceDNS(&currentNode, newNode, host); err != nil {
 		if err := mq.PublishReplaceDNS(&currentNode, newNode, host); err != nil {
 			logger.Log(1, "failed to publish dns update", err.Error())
 			logger.Log(1, "failed to publish dns update", err.Error())
 		}
 		}
@@ -750,19 +746,17 @@ func deleteNode(w http.ResponseWriter, r *http.Request) {
 	}
 	}
 	go func(deletedNode *models.Node, fromNode bool) { // notify of peer change
 	go func(deletedNode *models.Node, fromNode bool) { // notify of peer change
 		var err error
 		var err error
-		if fromNode {
-			err = mq.PublishDeletedNodePeerUpdate(deletedNode)
-		} else {
-			err = mq.PublishPeerUpdate()
-		}
+		host, err := logic.GetHost(node.HostID.String())
 		if err != nil {
 		if err != nil {
-			logger.Log(1, "error publishing peer update ", err.Error())
+			logger.Log(1, "failed to retrieve host for node", node.ID.String(), err.Error())
+			return
 		}
 		}
 
 
-		host, err := logic.GetHost(node.HostID.String())
+		err = mq.BroadcastDelPeer(host, deletedNode.Network)
 		if err != nil {
 		if err != nil {
-			logger.Log(1, "failed to retrieve host for node", node.ID.String(), err.Error())
+			logger.Log(1, "error publishing peer update ", err.Error())
 		}
 		}
+
 		if err := mq.PublishDNSDelete(&node, host); err != nil {
 		if err := mq.PublishDNSDelete(&node, host); err != nil {
 			logger.Log(1, "error publishing dns update", err.Error())
 			logger.Log(1, "error publishing dns update", err.Error())
 		}
 		}

+ 5 - 6
logic/peers.go

@@ -428,12 +428,11 @@ func GetPeerUpdateForHost(ctx context.Context, network string, host *models.Host
 // GetPeerListenPort - given a host, retrieve it's appropriate listening port
 // GetPeerListenPort - given a host, retrieve it's appropriate listening port
 func GetPeerListenPort(host *models.Host) int {
 func GetPeerListenPort(host *models.Host) int {
 	peerPort := host.ListenPort
 	peerPort := host.ListenPort
-	if host.ProxyEnabled {
-		if host.PublicListenPort != 0 {
-			peerPort = host.PublicListenPort
-		} else if host.ProxyListenPort != 0 {
-			peerPort = host.ProxyListenPort
-		}
+	if host.ProxyEnabled && host.ProxyListenPort != 0 {
+		peerPort = host.ProxyListenPort
+	}
+	if host.PublicListenPort != 0 {
+		peerPort = host.PublicListenPort
 	}
 	}
 	return peerPort
 	return peerPort
 }
 }

+ 18 - 0
models/mqtt.go

@@ -60,3 +60,21 @@ type KeyUpdate struct {
 	Network   string `json:"network" bson:"network"`
 	Network   string `json:"network" bson:"network"`
 	Interface string `json:"interface" bson:"interface"`
 	Interface string `json:"interface" bson:"interface"`
 }
 }
+
+// PeerMqActionType - peer update action type
+type PeerMqActionType string
+
+const (
+	// AddPeer - peer mq action type for adding peers
+	AddPeer PeerMqActionType = "ADD_PEER"
+	// UpdatePeer - peer mq action type for updating peers
+	UpdatePeer PeerMqActionType = "UPDATE_PEER"
+	// RemovePeer - peer mq action type for removing peers
+	RemovePeer PeerMqActionType = "REMOVE_PEER"
+)
+
+// PeerAction - struct for mq peer actions
+type PeerAction struct {
+	Action PeerMqActionType     `json:"action"`
+	Peers  []wgtypes.PeerConfig `json:"peers"`
+}

+ 16 - 3
mq/handlers.go

@@ -59,9 +59,17 @@ func UpdateNode(client mqtt.Client, msg mqtt.Message) {
 		return
 		return
 	}
 	}
 	if ifaceDelta { // reduce number of unneeded updates, by only sending on iface changes
 	if ifaceDelta { // reduce number of unneeded updates, by only sending on iface changes
-		if err = PublishPeerUpdate(); err != nil {
+		h, err := logic.GetHost(newNode.HostID.String())
+		if err != nil {
+			return
+		}
+		if err = BroadcastAddOrUpdatePeer(h, &newNode, true); err != nil {
 			logger.Log(0, "error updating peers when node", currentNode.ID.String(), "informed the server of an interface change", err.Error())
 			logger.Log(0, "error updating peers when node", currentNode.ID.String(), "informed the server of an interface change", err.Error())
 		}
 		}
+		if nodes, err := logic.GetNetworkNodes(newNode.Network); err == nil {
+			FlushNetworkPeersToHost(h, &newNode, nodes)
+		}
+
 	}
 	}
 
 
 	logger.Log(1, "updated node", id, newNode.ID.String())
 	logger.Log(1, "updated node", id, newNode.ID.String())
@@ -107,10 +115,15 @@ func UpdateHost(client mqtt.Client, msg mqtt.Message) {
 						return
 						return
 					}
 					}
 				}
 				}
-				if err = PublishSingleHostPeerUpdate(context.Background(), currentHost, nil, nil); err != nil {
-					logger.Log(0, "failed peers publish after join acknowledged", hostUpdate.Host.Name, currentHost.ID.String(), err.Error())
+				// flush peers to host
+				nodes, err := logic.GetNetworkNodes(hu.Node.Network)
+				if err != nil {
 					return
 					return
 				}
 				}
+				err = FlushNetworkPeersToHost(&hu.Host, &hu.Node, nodes)
+				if err != nil {
+					logger.Log(0, "failed to flush peers to host: ", err.Error())
+				}
 				if err = handleNewNodeDNS(&hu.Host, &hu.Node); err != nil {
 				if err = handleNewNodeDNS(&hu.Host, &hu.Node); err != nil {
 					logger.Log(0, "failed to send dns update after node,", hu.Node.ID.String(), ", added to host", hu.Host.Name, err.Error())
 					logger.Log(0, "failed to send dns update after node,", hu.Node.ID.String(), ", added to host", hu.Host.Name, err.Error())
 					return
 					return

+ 160 - 0
mq/publishers.go

@@ -5,12 +5,15 @@ import (
 	"encoding/json"
 	"encoding/json"
 	"errors"
 	"errors"
 	"fmt"
 	"fmt"
+	"net"
 	"time"
 	"time"
 
 
 	"github.com/gravitl/netmaker/logger"
 	"github.com/gravitl/netmaker/logger"
 	"github.com/gravitl/netmaker/logic"
 	"github.com/gravitl/netmaker/logic"
+	"github.com/gravitl/netmaker/logic/acls/nodeacls"
 	"github.com/gravitl/netmaker/models"
 	"github.com/gravitl/netmaker/models"
 	"github.com/gravitl/netmaker/servercfg"
 	"github.com/gravitl/netmaker/servercfg"
+	"golang.zx2c4.com/wireguard/wgctrl/wgtypes"
 )
 )
 
 
 // PublishPeerUpdate --- determines and publishes a peer update to all the hosts
 // PublishPeerUpdate --- determines and publishes a peer update to all the hosts
@@ -108,6 +111,163 @@ func PublishSingleHostPeerUpdate(ctx context.Context, host *models.Host, deleted
 	return publish(host, fmt.Sprintf("peers/host/%s/%s", host.ID.String(), servercfg.GetServer()), data)
 	return publish(host, fmt.Sprintf("peers/host/%s/%s", host.ID.String(), servercfg.GetServer()), data)
 }
 }
 
 
+// FlushNetworkPeersToHost - sends all the peers in the network to the host.
+func FlushNetworkPeersToHost(host *models.Host, hNode *models.Node, networkNodes []models.Node) error {
+	logger.Log(0, "flushing network peers to host: ", host.ID.String(), hNode.Network)
+	addPeerAction := models.PeerAction{
+		Action: models.AddPeer,
+		Peers:  []wgtypes.PeerConfig{},
+	}
+	rmPeerAction := models.PeerAction{
+		Action: models.RemovePeer,
+		Peers:  []wgtypes.PeerConfig{},
+	}
+	for _, node := range networkNodes {
+		if node.ID == hNode.ID {
+			// skip self
+			continue
+		}
+		peerHost, err := logic.GetHost(node.HostID.String())
+		if err != nil {
+			continue
+		}
+
+		if !nodeacls.AreNodesAllowed(nodeacls.NetworkID(node.Network), nodeacls.NodeID(hNode.ID.String()), nodeacls.NodeID(node.ID.String())) ||
+			hNode.Action == models.NODE_DELETE || hNode.PendingDelete || !hNode.Connected {
+			// remove peer if not allowed
+			rmPeerAction.Peers = append(rmPeerAction.Peers, wgtypes.PeerConfig{
+				PublicKey: peerHost.PublicKey,
+				Remove:    true,
+			})
+			continue
+
+		}
+		peerCfg := wgtypes.PeerConfig{
+			PublicKey: peerHost.PublicKey,
+			Endpoint: &net.UDPAddr{
+				IP:   peerHost.EndpointIP,
+				Port: logic.GetPeerListenPort(peerHost),
+			},
+			PersistentKeepaliveInterval: &node.PersistentKeepalive,
+			ReplaceAllowedIPs:           true,
+			AllowedIPs:                  logic.GetAllowedIPs(hNode, &node, nil),
+		}
+		addPeerAction.Peers = append(addPeerAction.Peers, peerCfg)
+	}
+	if len(rmPeerAction.Peers) > 0 {
+		data, err := json.Marshal(rmPeerAction)
+		if err != nil {
+			return err
+		}
+		publish(host, fmt.Sprintf("peer/host/%s/%s", host.ID.String(), servercfg.GetServer()), data)
+	}
+	if len(addPeerAction.Peers) > 0 {
+		data, err := json.Marshal(addPeerAction)
+		if err != nil {
+			return err
+		}
+		publish(host, fmt.Sprintf("peer/host/%s/%s", host.ID.String(), servercfg.GetServer()), data)
+	}
+	return nil
+}
+
+// BroadcastDelPeer - notifys all the hosts in the network to remove peer
+func BroadcastDelPeer(host *models.Host, network string) error {
+	nodes, err := logic.GetNetworkNodes(network)
+	if err != nil {
+		return err
+	}
+	p := models.PeerAction{
+		Action: models.RemovePeer,
+		Peers: []wgtypes.PeerConfig{
+			{
+				PublicKey: host.PublicKey,
+				Remove:    true,
+			},
+		},
+	}
+	data, err := json.Marshal(p)
+	if err != nil {
+		return err
+	}
+	for _, nodeI := range nodes {
+		if nodeI.HostID == host.ID {
+			// skip self...
+			continue
+		}
+		peerHost, err := logic.GetHost(nodeI.HostID.String())
+		if err == nil {
+			publish(peerHost, fmt.Sprintf("peer/host/%s/%s", peerHost.ID.String(), servercfg.GetServer()), data)
+		}
+	}
+	return nil
+}
+
+// BroadcastAclUpdate - sends new acl updates to peers
+func BroadcastAclUpdate(network string) error {
+	nodes, err := logic.GetNetworkNodes(network)
+	if err != nil {
+		return err
+	}
+	for _, nodeI := range nodes {
+		nodeI := nodeI
+		h, err := logic.GetHost(nodeI.HostID.String())
+		if err == nil {
+			go FlushNetworkPeersToHost(h, &nodeI, nodes)
+		}
+	}
+	return err
+}
+
+// BroadcastAddOrUpdatePeer - notifys the hosts in the network to add or update peer.
+func BroadcastAddOrUpdatePeer(host *models.Host, node *models.Node, update bool) error {
+	nodes, err := logic.GetNetworkNodes(node.Network)
+	if err != nil {
+		return err
+	}
+
+	p := models.PeerAction{
+		Action: models.AddPeer,
+		Peers: []wgtypes.PeerConfig{
+			{
+				PublicKey: host.PublicKey,
+				Endpoint: &net.UDPAddr{
+					IP:   host.EndpointIP,
+					Port: logic.GetPeerListenPort(host),
+				},
+				PersistentKeepaliveInterval: &node.PersistentKeepalive,
+				ReplaceAllowedIPs:           true,
+			},
+		},
+	}
+	if update {
+		p.Action = models.UpdatePeer
+	}
+	for _, nodeI := range nodes {
+		if nodeI.ID.String() == node.ID.String() {
+			// skip self...
+			continue
+		}
+		// update allowed ips, according to the peer node
+		p.Peers[0].AllowedIPs = logic.GetAllowedIPs(&nodeI, node, nil)
+		if update && (!nodeacls.AreNodesAllowed(nodeacls.NetworkID(node.Network), nodeacls.NodeID(node.ID.String()), nodeacls.NodeID(nodeI.ID.String())) ||
+			node.Action == models.NODE_DELETE || node.PendingDelete || !node.Connected) {
+			// remove peer
+			p.Action = models.RemovePeer
+			p.Peers[0].Remove = true
+		}
+		data, err := json.Marshal(p)
+		if err != nil {
+			continue
+		}
+		peerHost, err := logic.GetHost(nodeI.HostID.String())
+		if err == nil {
+			publish(peerHost, fmt.Sprintf("peer/host/%s/%s", peerHost.ID.String(), servercfg.GetServer()), data)
+		}
+	}
+	return nil
+}
+
 // NodeUpdate -- publishes a node update
 // NodeUpdate -- publishes a node update
 func NodeUpdate(node *models.Node) error {
 func NodeUpdate(node *models.Node) error {
 	host, err := logic.GetHost(node.HostID.String())
 	host, err := logic.GetHost(node.HostID.String())