Browse Source

Merge pull request #1869 from gravitl/GRA-824-getPeerUpdate

refactor of peerUpdate
Matthew R Kasun 2 years ago
parent
commit
645c2b8e8f
9 changed files with 142 additions and 39 deletions
  1. 10 8
      controllers/node.go
  2. 1 1
      logic/gateway.go
  3. 5 2
      logic/metrics/metrics.go
  4. 1 1
      logic/nodes.go
  5. 116 16
      logic/peers.go
  6. 1 1
      logic/relay.go
  7. 1 1
      models/api_node.go
  8. 1 3
      models/node.go
  9. 6 6
      mq/publishers.go

+ 10 - 8
controllers/node.go

@@ -17,6 +17,7 @@ import (
 	"github.com/gravitl/netmaker/models/promodels"
 	"github.com/gravitl/netmaker/mq"
 	"github.com/gravitl/netmaker/servercfg"
+	"github.com/kr/pretty"
 	"golang.org/x/crypto/bcrypt"
 )
 
@@ -466,18 +467,17 @@ func getNode(w http.ResponseWriter, r *http.Request) {
 		logic.ReturnErrorResponse(w, r, logic.FormatError(err, "internal"))
 		return
 	}
-
-	peerUpdate, err := logic.GetPeerUpdate(&node)
-	if err != nil && !database.IsEmptyRecord(err) {
+	host, err := logic.GetHost(node.HostID.String())
+	if err != nil {
 		logger.Log(0, r.Header.Get("user"),
-			fmt.Sprintf("error fetching wg peers config for node [ %s ]: %v", nodeid, err))
+			fmt.Sprintf("error fetching host for node [ %s ] info: %v", nodeid, err))
 		logic.ReturnErrorResponse(w, r, logic.FormatError(err, "internal"))
 		return
 	}
-	host, err := logic.GetHost(node.HostID.String())
-	if err != nil {
+	peerUpdate, err := logic.GetPeerUpdate(&node, host)
+	if err != nil && !database.IsEmptyRecord(err) {
 		logger.Log(0, r.Header.Get("user"),
-			fmt.Sprintf("error fetching host for node [ %s ] info: %v", nodeid, err))
+			fmt.Sprintf("error fetching wg peers config for node [ %s ]: %v", nodeid, err))
 		logic.ReturnErrorResponse(w, r, logic.FormatError(err, "internal"))
 		return
 	}
@@ -653,7 +653,7 @@ func createNode(w http.ResponseWriter, r *http.Request) {
 			return
 		}
 	}
-	peerUpdate, err := logic.GetPeerUpdate(&data.Node)
+	peerUpdate, err := logic.GetPeerUpdate(&data.Node, &data.Host)
 	if err != nil && !database.IsEmptyRecord(err) {
 		logger.Log(0, r.Header.Get("user"),
 			fmt.Sprintf("error fetching wg peers config for node [ %s ]: %v", data.Node.ID.String(), err))
@@ -661,6 +661,8 @@ func createNode(w http.ResponseWriter, r *http.Request) {
 		return
 	}
 	data.Node.Peers = peerUpdate.Peers
+	pretty.Println(data.Node.Peers)
+
 	// Create client for this host in Mq
 	event := mq.MqDynsecPayload{
 		Commands: []mq.MqDynSecCmd{

+ 1 - 1
logic/gateway.go

@@ -268,7 +268,7 @@ func DeleteIngressGateway(networkName string, nodeid string) (models.Node, bool,
 		return models.Node{}, false, err
 	}
 	logger.Log(3, "deleting ingress gateway")
-	wasFailover := node.Failover == true
+	wasFailover := node.Failover
 	node.LastModified = time.Now()
 	node.IsIngressGateway = false
 	node.IngressGatewayRange = ""

+ 5 - 2
logic/metrics/metrics.go

@@ -91,8 +91,11 @@ func Collect(iface, network string, proxy bool, peerMap models.PeerMap) (*models
 
 // GetExchangedBytesForNode - get exchanged bytes for current node peers
 func GetExchangedBytesForNode(node *models.Node, metrics *models.Metrics) error {
-
-	peers, err := logic.GetPeerUpdate(node)
+	host, err := logic.GetHost(node.HostID.String())
+	if err != nil {
+		return err
+	}
+	peers, err := logic.GetPeerUpdate(node, host)
 	if err != nil {
 		logger.Log(0, "Failed to get peers: ", err.Error())
 		return err

+ 1 - 1
logic/nodes.go

@@ -243,7 +243,7 @@ func SetNodeDefaults(node *models.Node) {
 	}
 
 	if node.PersistentKeepalive == 0 {
-		node.PersistentKeepalive = int(parentNetwork.DefaultKeepalive)
+		node.PersistentKeepalive = time.Duration(parentNetwork.DefaultKeepalive)
 	}
 	if node.PostUp == "" {
 		postup := parentNetwork.DefaultPostUp

+ 116 - 16
logic/peers.go

@@ -3,6 +3,7 @@ package logic
 import (
 	"errors"
 	"fmt"
+	"log"
 	"net"
 	"strconv"
 	"strings"
@@ -40,7 +41,7 @@ func GetPeersForProxy(node *models.Node, onlyPeers bool) (manager.ProxyManagerPa
 	if !onlyPeers {
 		if node.IsRelayed {
 			relayNode := FindRelay(node)
-			relayHost, err := GetHost(relayNode.ID.String())
+			relayHost, err := GetHost(relayNode.HostID.String())
 			if err != nil {
 				return proxyPayload, err
 			}
@@ -126,7 +127,7 @@ func GetPeersForProxy(node *models.Node, onlyPeers bool) (manager.ProxyManagerPa
 		var keepalive time.Duration
 		if node.PersistentKeepalive != 0 {
 			// set_keepalive
-			keepalive, _ = time.ParseDuration(strconv.FormatInt(int64(node.PersistentKeepalive), 10) + "s")
+			keepalive = node.PersistentKeepalive
 		}
 		peers = append(peers, wgtypes.PeerConfig{
 			PublicKey:                   host.PublicKey,
@@ -193,7 +194,103 @@ func GetPeersForProxy(node *models.Node, onlyPeers bool) (manager.ProxyManagerPa
 }
 
 // GetPeerUpdate - gets a wireguard peer config for each peer of a node
-func GetPeerUpdate(node *models.Node) (models.PeerUpdate, error) {
+func GetPeerUpdate(node *models.Node, host *models.Host) (models.PeerUpdate, error) {
+	log.Println("peer update for node ", node.ID)
+	peerUpdate := models.PeerUpdate{
+		Network:       node.Network,
+		ServerVersion: ncutils.Version,
+		DNS:           getPeerDNS(node.Network),
+	}
+	currentPeers, err := GetNetworkNodes(node.Network)
+	if err != nil {
+		log.Println("no network nodes")
+		return models.PeerUpdate{}, err
+	}
+	for _, peer := range currentPeers {
+		var peerConfig wgtypes.PeerConfig
+		peerHost, err := GetHost(peer.HostID.String())
+		if err != nil {
+			log.Println("no peer host", err)
+			return models.PeerUpdate{}, err
+		}
+		if peer.ID == node.ID {
+			log.Println("peer update, skipping self")
+			//skip yourself
+
+			continue
+		}
+		if !peer.Connected {
+			log.Println("peer update, skipping unconnected node")
+			//skip unconnected nodes
+			continue
+		}
+		if !nodeacls.AreNodesAllowed(nodeacls.NetworkID(node.Network), nodeacls.NodeID(node.ID.String()), nodeacls.NodeID(peer.ID.String())) {
+			log.Println("peer update, skipping node for acl")
+			//skip if not permitted by acl
+			continue
+		}
+		peerConfig.PublicKey = peerHost.PublicKey
+		peerConfig.PersistentKeepaliveInterval = &peer.PersistentKeepalive
+		peerConfig.ReplaceAllowedIPs = true
+		uselocal := false
+		if host.EndpointIP.String() == peerHost.EndpointIP.String() {
+			//peer is on same network
+			// set to localaddress
+			uselocal = true
+			if node.LocalAddress.IP == nil {
+				// use public endpint
+				uselocal = false
+			}
+			if node.LocalAddress.String() == peer.LocalAddress.String() {
+				uselocal = false
+			}
+		}
+		peerConfig.Endpoint = &net.UDPAddr{
+			IP:   peerHost.EndpointIP,
+			Port: peerHost.ListenPort,
+		}
+		if uselocal {
+			peerConfig.Endpoint.IP = peer.LocalAddress.IP
+		}
+		allowedips := getNodeAllowedIPs(&peer, node)
+		if peer.IsIngressGateway {
+			for _, entry := range peer.IngressGatewayRange {
+				_, cidr, err := net.ParseCIDR(string(entry))
+				if err == nil {
+					allowedips = append(allowedips, *cidr)
+				}
+			}
+		}
+		if peer.IsRelay {
+			allowedips = append(allowedips, getRelayAllowedIPs(node, &peer)...)
+		}
+		if peer.IsEgressGateway {
+			allowedips = append(allowedips, getEgressIPs(node, &peer)...)
+		}
+		peerConfig.AllowedIPs = allowedips
+		peerUpdate.Peers = append(peerUpdate.Peers, peerConfig)
+	}
+	return peerUpdate, nil
+}
+
+func getRelayAllowedIPs(node, peer *models.Node) []net.IPNet {
+	var allowedips []net.IPNet
+	var allowedip net.IPNet
+	for _, addr := range peer.RelayAddrs {
+		if node.Address.IP.String() == addr {
+			continue
+		}
+		if node.Address6.IP.String() == addr {
+			continue
+		}
+		allowedip.IP = net.ParseIP(addr)
+		allowedips = append(allowedips, allowedip)
+	}
+	return allowedips
+}
+
+// GetPeerUpdateLegacy - gets a wireguard peer config for each peer of a node
+func GetPeerUpdateLegacy(node *models.Node) (models.PeerUpdate, error) {
 	var peerUpdate models.PeerUpdate
 	var peers []wgtypes.PeerConfig
 	var serverNodeAddresses = []models.ServerAddr{}
@@ -219,7 +316,7 @@ func GetPeerUpdate(node *models.Node) (models.PeerUpdate, error) {
 	if err != nil {
 		return models.PeerUpdate{}, err
 	}
-	host, err := GetHost(node.ID.String())
+	host, err := GetHost(node.HostID.String())
 	if err != nil {
 		return peerUpdate, err
 	}
@@ -231,7 +328,7 @@ func GetPeerUpdate(node *models.Node) (models.PeerUpdate, error) {
 	// #2 Set local address: set_local - could be a LOT BETTER and fix some bugs with additional logic
 	// #3 Set allowedips: set_allowedips
 	for _, peer := range currentPeers {
-		peerHost, err := GetHost(peer.ID.String())
+		peerHost, err := GetHost(peer.HostID.String())
 		if err != nil {
 			logger.Log(0, "error retrieving host for peer", node.ID.String(), err.Error())
 			return models.PeerUpdate{}, err
@@ -328,8 +425,9 @@ func GetPeerUpdate(node *models.Node) (models.PeerUpdate, error) {
 		allowedips := GetAllowedIPs(node, &peer, metrics, fetchRelayedIps)
 		var keepalive time.Duration
 		if node.PersistentKeepalive != 0 {
+
 			// set_keepalive
-			keepalive, _ = time.ParseDuration(strconv.FormatInt(int64(node.PersistentKeepalive), 10) + "s")
+			keepalive = node.PersistentKeepalive
 		}
 		var peerData = wgtypes.PeerConfig{
 			PublicKey:                   peerHost.PublicKey,
@@ -664,7 +762,11 @@ func GetPeerUpdateForRelayedNode(node *models.Node, udppeers map[string]string)
 		allowedips = append(allowedips, relayIP6)
 	}
 	//get PeerUpdate for relayed node
-	relayPeerUpdate, err := GetPeerUpdate(relay)
+	relayHost, err := GetHost(relay.HostID.String())
+	if err != nil {
+		return models.PeerUpdate{}, err
+	}
+	relayPeerUpdate, err := GetPeerUpdate(relay, relayHost)
 	if err != nil {
 		return models.PeerUpdate{}, err
 	}
@@ -721,13 +823,6 @@ func GetPeerUpdateForRelayedNode(node *models.Node, udppeers map[string]string)
 		}
 		allowedips = append(allowedips, *ip)
 	}
-	relayHost, err := GetHost(relay.HostID.String())
-	if err != nil {
-		logger.Log(0, "error retrieving host for relay node", node.ID.String(), err.Error())
-	}
-	if err != nil {
-		return models.PeerUpdate{}, err
-	}
 	var setUDPPort = false
 	var listenPort int
 	if CheckEndpoint(udppeers[relayHost.PublicKey.String()]) {
@@ -756,7 +851,7 @@ func GetPeerUpdateForRelayedNode(node *models.Node, udppeers map[string]string)
 	var keepalive time.Duration
 	if node.PersistentKeepalive != 0 {
 		// set_keepalive
-		keepalive, _ = time.ParseDuration(strconv.FormatInt(int64(node.PersistentKeepalive), 10) + "s")
+		keepalive = node.PersistentKeepalive
 	}
 	var peerData = wgtypes.PeerConfig{
 		PublicKey:                   relayHost.PublicKey,
@@ -788,7 +883,7 @@ func getEgressIPs(node, peer *models.Node) []net.IPNet {
 	if err != nil {
 		logger.Log(0, "error retrieving host for node", node.ID.String(), err.Error())
 	}
-	peerHost, err := GetHost(peer.ID.String())
+	peerHost, err := GetHost(peer.HostID.String())
 	if err != nil {
 		logger.Log(0, "error retrieving host for peer", peer.ID.String(), err.Error())
 	}
@@ -826,6 +921,11 @@ func getEgressIPs(node, peer *models.Node) []net.IPNet {
 
 func getNodeAllowedIPs(peer, node *models.Node) []net.IPNet {
 	var allowedips = []net.IPNet{}
+	host, err := GetHost(node.HostID.String())
+	if err != nil {
+		logger.Log(0, "error retrieving host for node", node.ID.String(), err.Error())
+	}
+
 	if peer.Address.IP != nil {
 		allowedips = append(allowedips, peer.Address)
 	}

+ 1 - 1
logic/relay.go

@@ -19,7 +19,7 @@ func CreateRelay(relay models.RelayRequest) ([]models.Node, models.Node, error)
 	if err != nil {
 		return returnnodes, models.Node{}, err
 	}
-	host, err := GetHost(node.ID.String())
+	host, err := GetHost(node.HostID.String())
 	if err != nil {
 		return returnnodes, models.Node{}, err
 	}

+ 1 - 1
models/api_node.go

@@ -69,7 +69,7 @@ func (a *ApiNode) ConvertToServerNode(currentNode *Node) *Node {
 	convertedNode.DNSOn = a.DNSOn
 	convertedNode.EgressGatewayRequest = currentNode.EgressGatewayRequest
 	convertedNode.EgressGatewayNatEnabled = currentNode.EgressGatewayNatEnabled
-	convertedNode.PersistentKeepalive = int(a.PersistentKeepalive)
+	convertedNode.PersistentKeepalive = time.Duration(a.PersistentKeepalive)
 	convertedNode.RelayAddrs = a.RelayAddrs
 	convertedNode.DefaultACL = a.DefaultACL
 	convertedNode.OwnerID = currentNode.OwnerID

+ 1 - 3
models/node.go

@@ -74,7 +74,7 @@ type CommonNode struct {
 	IsEgressGateway     bool                 `json:"isegressgateway" yaml:"isegressgateway"`
 	IsIngressGateway    bool                 `json:"isingressgateway" yaml:"isingressgateway"`
 	DNSOn               bool                 `json:"dnson" yaml:"dnson"`
-	PersistentKeepalive int                  `json:"persistentkeepalive" yaml:"persistentkeepalive"`
+	PersistentKeepalive time.Duration        `json:"persistentkeepalive" yaml:"persistentkeepalive"`
 	Peers               []wgtypes.PeerConfig `json:"peers" yaml:"peers"`
 }
 
@@ -536,7 +536,6 @@ func (ln *LegacyNode) ConvertToNewNode() (*Host, *Node) {
 	node.IsEgressGateway = parseBool(ln.IsEgressGateway)
 	node.IsIngressGateway = parseBool(ln.IsIngressGateway)
 	node.DNSOn = parseBool(ln.DNSOn)
-	node.PersistentKeepalive = int(ln.PersistentKeepalive)
 
 	return &host, &node
 }
@@ -559,7 +558,6 @@ func (n *Node) Legacy(h *Host, s *ServerConfig, net *Network) *LegacyNode {
 	l.PostUp = n.PostUp
 	l.PostDown = n.PostDown
 	//l.AllowedIPs =
-	l.PersistentKeepalive = int32(n.PersistentKeepalive)
 	l.AccessKey = ""
 	l.Interface = WIREGUARD_INTERFACE
 	//l.LastModified =

+ 6 - 6
mq/publishers.go

@@ -49,12 +49,12 @@ func PublishProxyPeerUpdate(node *models.Node) error {
 
 // PublishSinglePeerUpdate --- determines and publishes a peer update to one node
 func PublishSinglePeerUpdate(node *models.Node) error {
-	host, err := logic.GetHost(node.ID.String())
+	host, err := logic.GetHost(node.HostID.String())
 	if err != nil {
 		return nil
 	}
 
-	peerUpdate, err := logic.GetPeerUpdate(node)
+	peerUpdate, err := logic.GetPeerUpdate(node, host)
 	if err != nil {
 		return err
 	}
@@ -77,14 +77,14 @@ func PublishSinglePeerUpdate(node *models.Node) error {
 
 // PublishPeerUpdate --- publishes a peer update to all the peers of a node
 func PublishExtPeerUpdate(node *models.Node) error {
-	host, err := logic.GetHost(node.ID.String())
+	host, err := logic.GetHost(node.HostID.String())
 	if err != nil {
 		return nil
 	}
 	if !servercfg.IsMessageQueueBackend() {
 		return nil
 	}
-	peerUpdate, err := logic.GetPeerUpdate(node)
+	peerUpdate, err := logic.GetPeerUpdate(node, host)
 	if err != nil {
 		return err
 	}
@@ -108,7 +108,7 @@ func PublishExtPeerUpdate(node *models.Node) error {
 
 // NodeUpdate -- publishes a node update
 func NodeUpdate(node *models.Node) error {
-	host, err := logic.GetHost(node.ID.String())
+	host, err := logic.GetHost(node.HostID.String())
 	if err != nil {
 		return nil
 	}
@@ -142,7 +142,7 @@ func NodeUpdate(node *models.Node) error {
 
 // ProxyUpdate -- publishes updates to peers related to proxy
 func ProxyUpdate(proxyPayload *manager.ProxyManagerPayload, node *models.Node) error {
-	host, err := logic.GetHost(node.ID.String())
+	host, err := logic.GetHost(node.HostID.String())
 	if err != nil {
 		return nil
 	}