Kaynağa Gözat

fix relay,ingress peer updates

Abhishek Kondur 2 yıl önce
ebeveyn
işleme
5af8a8004f

+ 5 - 3
controllers/ext_client.go

@@ -397,7 +397,8 @@ func createExtClient(w http.ResponseWriter, r *http.Request) {
 	logger.Log(0, r.Header.Get("user"), "created new ext client on network", networkName)
 	w.WriteHeader(http.StatusOK)
 	go func() {
-		mq.BroadcastExtClient(host, &node)
+		ingressClient := models.Client{Host: *host, Node: node}
+		mq.BroadcastExtClient(ingressClient)
 		f, err := logic.GetFwUpdate(host)
 		if err == nil {
 			mq.PublishFwUpdate(host, &f)
@@ -501,12 +502,13 @@ func updateExtClient(w http.ResponseWriter, r *http.Request) {
 	go func() {
 		if ingressNode, err := logic.GetNodeByID(newclient.IngressGatewayID); err == nil {
 			if ingressHost, err := logic.GetHost(ingressNode.HostID.String()); err == nil {
+				ingressClient := models.Client{Host: *ingressHost, Node: ingressNode}
 				if replaceOldClient || !update.Enabled {
-					mq.BroadcastDelExtClient(models.Client{Host: *ingressHost, Node: ingressNode}, []models.ExtClient{currentClient})
+					mq.BroadcastDelExtClient(ingressClient, []models.ExtClient{currentClient})
 				}
 				if replaceOldClient || changedEnabled {
 					// broadcast update
-					mq.BroadcastExtClient(ingressHost, &ingressNode)
+					mq.BroadcastExtClient(ingressClient)
 				}
 				f, err := logic.GetFwUpdate(ingressHost)
 				if err == nil {

+ 9 - 3
controllers/node.go

@@ -561,7 +561,6 @@ func createIngressGateway(w http.ResponseWriter, r *http.Request) {
 			logger.Log(1, "failed to reset failover list during failover create", node.ID.String(), node.Network)
 		}
 	}
-
 	apiNode := node.ConvertToAPINode()
 	logger.Log(1, r.Header.Get("user"), "created ingress gateway on node", nodeid, "on network", netid)
 	w.WriteHeader(http.StatusOK)
@@ -707,8 +706,15 @@ func updateNode(w http.ResponseWriter, r *http.Request) {
 	json.NewEncoder(w).Encode(apiNode)
 	runUpdates(newNode, ifaceDelta)
 	go func(aclUpdate, relayupdate bool, newNode *models.Node) {
-		if aclUpdate || relayupdate {
-			mq.BroadcastAddOrUpdateNetworkPeer(models.Client{Host: *host, Node: *newNode}, true)
+		if aclUpdate {
+			mq.BroadcastAclUpdate(newNode.Network)
+		}
+		if relayupdate {
+			mq.BroadCastRelayUpdate(models.RelayRequest{
+				NodeID:       newNode.ID.String(),
+				NetID:        newNode.Network,
+				RelayedNodes: newNode.RelayedNodes,
+			})
 		}
 		if err := mq.PublishReplaceDNS(&currentNode, newNode, host); err != nil {
 			logger.Log(1, "failed to publish dns update", err.Error())

+ 0 - 96
controllers/relay.go

@@ -1,96 +0,0 @@
-package controller
-
-import (
-	"encoding/json"
-	"fmt"
-	"net/http"
-
-	"github.com/gorilla/mux"
-	"github.com/gravitl/netmaker/logger"
-	"github.com/gravitl/netmaker/logic"
-	"github.com/gravitl/netmaker/models"
-	"github.com/gravitl/netmaker/mq"
-)
-
-// swagger:route POST /api/nodes/{network}/{nodeid}/createrelay nodes createRelay
-//
-// Create a relay.
-//
-//			Schemes: https
-//
-//			Security:
-//	  		oauth
-//
-//			Responses:
-//				200: nodeResponse
-func createRelay(w http.ResponseWriter, r *http.Request) {
-	var relay models.RelayRequest
-	var params = mux.Vars(r)
-	w.Header().Set("Content-Type", "application/json")
-	err := json.NewDecoder(r.Body).Decode(&relay)
-	if err != nil {
-		logger.Log(0, r.Header.Get("user"), "error decoding request body: ", err.Error())
-		logic.ReturnErrorResponse(w, r, logic.FormatError(err, "badrequest"))
-		return
-	}
-	relay.NetID = params["network"]
-	relay.NodeID = params["nodeid"]
-	updatenodes, node, err := logic.CreateRelay(relay)
-	if err != nil {
-		logger.Log(0, r.Header.Get("user"),
-			fmt.Sprintf("failed to create relay on node [%s] on network [%s]: %v", relay.NodeID, relay.NetID, err))
-		logic.ReturnErrorResponse(w, r, logic.FormatError(err, "internal"))
-		return
-	}
-
-	logger.Log(1, r.Header.Get("user"), "created relay on node", relay.NodeID, "on network", relay.NetID)
-	for _, relayedNode := range updatenodes {
-
-		err = mq.NodeUpdate(&relayedNode)
-		if err != nil {
-			logger.Log(1, "error sending update to relayed node ", relayedNode.ID.String(), "on network", relay.NetID, ": ", err.Error())
-		}
-	}
-	go mq.PublishPeerUpdate()
-
-	apiNode := node.ConvertToAPINode()
-	w.WriteHeader(http.StatusOK)
-	json.NewEncoder(w).Encode(apiNode)
-	runUpdates(&node, true)
-}
-
-// swagger:route DELETE /api/nodes/{network}/{nodeid}/deleterelay nodes deleteRelay
-//
-// Remove a relay.
-//
-//			Schemes: https
-//
-//			Security:
-//	  		oauth
-//
-//			Responses:
-//				200: nodeResponse
-func deleteRelay(w http.ResponseWriter, r *http.Request) {
-	w.Header().Set("Content-Type", "application/json")
-	var params = mux.Vars(r)
-	nodeid := params["nodeid"]
-	netid := params["network"]
-	updatenodes, node, err := logic.DeleteRelay(netid, nodeid)
-	if err != nil {
-		logger.Log(0, r.Header.Get("user"), "error decoding request body: ", err.Error())
-		logic.ReturnErrorResponse(w, r, logic.FormatError(err, "badrequest"))
-		return
-	}
-	logger.Log(1, r.Header.Get("user"), "deleted relay server", nodeid, "on network", netid)
-	for _, relayedNode := range updatenodes {
-		err = mq.NodeUpdate(&relayedNode)
-		if err != nil {
-			logger.Log(1, "error sending update to relayed node ", relayedNode.ID.String(), "on network", netid, ": ", err.Error())
-		}
-	}
-	go mq.PublishPeerUpdate()
-	apiNode := node.ConvertToAPINode()
-	w.WriteHeader(http.StatusOK)
-	json.NewEncoder(w).Encode(apiNode)
-	runUpdates(&node, true)
-}

+ 3 - 28
ee/ee_controllers/relay.go

@@ -50,6 +50,7 @@ func createRelay(w http.ResponseWriter, r *http.Request) {
 		logic.ReturnErrorResponse(w, r, logic.FormatError(err, "internal"))
 		return
 	}
+
 	go mq.BroadCastRelayUpdate(relayRequest)
 	logger.Log(1, r.Header.Get("user"), "created relay on node", relayRequest.NodeID, "on network", relayRequest.NetID)
 	apiNode := relayNode.ConvertToAPINode()
@@ -73,40 +74,14 @@ func deleteRelay(w http.ResponseWriter, r *http.Request) {
 	var params = mux.Vars(r)
 	nodeid := params["nodeid"]
 	netid := params["network"]
-	updateClients, node, err := logic.DeleteRelay(netid, nodeid)
+	_, node, err := logic.DeleteRelay(netid, nodeid)
 	if err != nil {
 		logger.Log(0, r.Header.Get("user"), "error decoding request body: ", err.Error())
 		logic.ReturnErrorResponse(w, r, logic.FormatError(err, "badrequest"))
 		return
 	}
 	logger.Log(1, r.Header.Get("user"), "deleted relay server", nodeid, "on network", netid)
-	go func() {
-		//update relayHost node
-		// relayHost, err := logic.GetHost(node.HostID.String())
-		// if err == nil {
-		// 	if err := mq.NodeUpdate(&node); err != nil {
-		// 		logger.Log(1, "relay node update", relayHost.Name, "on network", node.Network, ": ", err.Error())
-		// 	}
-		for _, relayedClient := range updateClients {
-			err = mq.NodeUpdate(&relayedClient.Node)
-			if err != nil {
-				logger.Log(1, "relayed node update ", relayedClient.Node.ID.String(), "on network", relayedClient.Node.Network, ": ", err.Error())
-
-			}
-		}
-		// 	peers, err := logic.GetNetworkClients(node.Network)
-		// 	if err != nil {
-		// 		logger.Log(0, "error getting network nodes: ", err.Error())
-		// 		logic.ReturnErrorResponse(w, r, logic.FormatError(err, "internal"))
-		// 		return
-		// 	}
-		// 	clients := peers
-		// 	for _, client := range clients {
-		// 		mq.PubPeerUpdate(&client, nil, peers)
-		// 	}
-		// }
-		go mq.BroadCastRelayRemoval(netid)
-	}()
+	go mq.BroadCastRelayRemoval(netid)
 	logger.Log(1, r.Header.Get("user"), "deleted relay on node", node.ID.String(), "on network", node.Network)
 	apiNode := node.ConvertToAPINode()
 	w.WriteHeader(http.StatusOK)

+ 3 - 3
logic/gateway.go

@@ -115,9 +115,9 @@ func CreateIngressGateway(netid string, nodeid string, ingress models.IngressReq
 	if host.FirewallInUse == models.FIREWALL_NONE {
 		return models.Node{}, errors.New("firewall is not supported for ingress gateways")
 	}
-	if host.NatType != models.NAT_Types.Public {
-		return models.Node{}, errors.New("ingress cannot be created on nodes behind NAT")
-	}
+	// if host.NatType != models.NAT_Types.Public {
+	// 	return models.Node{}, errors.New("ingress cannot be created on nodes behind NAT")
+	// }
 
 	network, err := GetParentNetwork(netid)
 	if err != nil {

+ 14 - 11
logic/peers.go

@@ -439,7 +439,7 @@ func GetAllowedIPs(client, peer models.Client) []net.IPNet {
 		if clientNode == nil {
 			continue
 		}
-
+		client.Node = *clientNode
 		peer.Node = node
 		if ShouldRemovePeer(*clientNode, peer.Node) {
 			continue
@@ -654,20 +654,23 @@ func getIngressIPs(peer models.Client) []net.IPNet {
 	for _, ec := range extclients {
 		if ec.IngressGatewayID == peer.Node.ID.String() {
 			if ec.Address != "" {
-				ip, cidr, err := net.ParseCIDR(ec.Address)
-				if err != nil {
-					continue
+				var peeraddr = net.IPNet{
+					IP:   net.ParseIP(ec.Address),
+					Mask: net.CIDRMask(32, 32),
+				}
+				if peeraddr.IP != nil && peeraddr.Mask != nil {
+					ingressIPs = append(ingressIPs, peeraddr)
 				}
-				cidr.IP = ip
-				ingressIPs = append(ingressIPs, *cidr)
 			}
+
 			if ec.Address6 != "" {
-				ip, cidr, err := net.ParseCIDR(ec.Address6)
-				if err != nil {
-					continue
+				var addr6 = net.IPNet{
+					IP:   net.ParseIP(ec.Address6),
+					Mask: net.CIDRMask(128, 128),
+				}
+				if addr6.IP != nil && addr6.Mask != nil {
+					ingressIPs = append(ingressIPs, addr6)
 				}
-				cidr.IP = ip
-				ingressIPs = append(ingressIPs, *cidr)
 			}
 		}
 	}

+ 6 - 0
logic/relay.go

@@ -192,5 +192,11 @@ func GetPeerConfForRelayed(relayed, relay models.Client) wgtypes.PeerConfig {
 		}
 		update.AllowedIPs = append(update.AllowedIPs, allowed)
 	}
+	if relay.Node.IsIngressGateway {
+		update.AllowedIPs = append(update.AllowedIPs, getIngressIPs(relay)...)
+	}
+	if relay.Node.IsEgressGateway {
+		update.AllowedIPs = append(update.AllowedIPs, getEgressIPs(relay)...)
+	}
 	return update
 }

+ 0 - 4
models/host.go

@@ -60,10 +60,6 @@ type Host struct {
 	TrafficKeyPublic   []byte           `json:"traffickeypublic" yaml:"traffickeypublic"`
 	InternetGateway    net.UDPAddr      `json:"internetgateway" yaml:"internetgateway"`
 	Nodes              []string         `json:"nodes" yaml:"nodes"`
-	IsRelayed          bool             `json:"isrelayed" yaml:"isrelayed"`
-	RelayedBy          string           `json:"relayed_by" yaml:"relayed_by"`
-	IsRelay            bool             `json:"isrelay" yaml:"isrelay"`
-	RelayedHosts       []string         `json:"relay_hosts" yaml:"relay_hosts"`
 	Interfaces         []Iface          `json:"interfaces" yaml:"interfaces"`
 	DefaultInterface   string           `json:"defaultinterface" yaml:"defaultinterface"`
 	EndpointIP         net.IP           `json:"endpointip" yaml:"endpointip"`

+ 4 - 4
models/node.go

@@ -70,10 +70,6 @@ type CommonNode struct {
 	EgressGatewayRanges []string      `json:"egressgatewayranges" bson:"egressgatewayranges" yaml:"egressgatewayranges"`
 	IsIngressGateway    bool          `json:"isingressgateway" yaml:"isingressgateway"`
 	IngressDNS          string        `json:"ingressdns" yaml:"ingressdns"`
-	IsRelayed           bool          `json:"isrelayed" yaml:"isrelayed"`
-	RelayedBy           string        `json:"relayedby" yaml:"relayedby"`
-	IsRelay             bool          `json:"isrelay" yaml:"isrelay"`
-	RelayedNodes        []string      `json:"relaynodes" yaml:"relayedNodes"`
 	DNSOn               bool          `json:"dnson" yaml:"dnson"`
 	PersistentKeepalive time.Duration `json:"persistentkeepalive" yaml:"persistentkeepalive"`
 }
@@ -95,6 +91,10 @@ type Node struct {
 	OwnerID      string    `json:"ownerid,omitempty" bson:"ownerid,omitempty" yaml:"ownerid,omitempty"`
 	FailoverNode uuid.UUID `json:"failovernode" bson:"failovernode" yaml:"failovernode"`
 	Failover     bool      `json:"failover" bson:"failover" yaml:"failover"`
+	IsRelayed    bool      `json:"isrelayed" yaml:"isrelayed"`
+	RelayedBy    string    `json:"relayedby" yaml:"relayedby"`
+	IsRelay      bool      `json:"isrelay" yaml:"isrelay"`
+	RelayedNodes []string  `json:"relaynodes" yaml:"relayedNodes"`
 }
 
 // LegacyNode - legacy struct for node model

+ 62 - 9
mq/publishers.go

@@ -71,6 +71,12 @@ func FlushNetworkPeersToHost(client models.Client, networkClients []models.Clien
 
 		addPeerAction.Peers = append(addPeerAction.Peers, peerCfg)
 	}
+	if client.Node.IsIngressGateway {
+		extPeers, _, err := logic.GetExtPeers(&client.Node)
+		if err == nil {
+			addPeerAction.Peers = append(addPeerAction.Peers, extPeers...)
+		}
+	}
 	if len(rmPeerAction.Peers) > 0 {
 		data, err := json.Marshal(rmPeerAction)
 		if err != nil {
@@ -182,8 +188,10 @@ func BroadcastHostUpdate(host *models.Host, remove bool) error {
 		return err
 	}
 	for _, peerHost := range peerHosts {
+		if !remove {
+			p.Peers[0].AllowedIPs = logic.GetAllowedIPs(models.Client{Host: peerHost}, models.Client{Host: *host})
+		}
 		publish(&peerHost, fmt.Sprintf("peer/host/%s/%s", peerHost.ID.String(), servercfg.GetServer()), data)
-
 	}
 	return nil
 }
@@ -209,8 +217,40 @@ func BroadcastAddOrUpdateNetworkPeer(client models.Client, update bool) error {
 			},
 		},
 	}
+	var relayPeerCfg models.PeerAction
+	var relayClient models.Client
+	if client.Node.IsRelayed {
+		relayNode, err := logic.GetNodeByID(client.Node.RelayedBy)
+		if err != nil {
+			return err
+		}
+		relayHost, err := logic.GetHost(relayNode.HostID.String())
+		if err != nil {
+			return err
+		}
+		relayClient = models.Client{
+			Host: *relayHost,
+			Node: relayNode,
+		}
+		relayPeerCfg = models.PeerAction{
+			Action: models.AddPeer,
+			Peers: []wgtypes.PeerConfig{
+				{
+					PublicKey: relayHost.PublicKey,
+					Endpoint: &net.UDPAddr{
+						IP:   relayHost.EndpointIP,
+						Port: logic.GetPeerListenPort(relayHost),
+					},
+					PersistentKeepaliveInterval: &relayNode.PersistentKeepalive,
+					ReplaceAllowedIPs:           true,
+				},
+			},
+		}
+
+	}
 	if update {
 		p.Action = models.UpdatePeer
+		relayPeerCfg.Action = models.UpdatePeer
 	}
 	for _, clientI := range clients {
 		clientI := clientI
@@ -219,7 +259,10 @@ func BroadcastAddOrUpdateNetworkPeer(client models.Client, update bool) error {
 			continue
 		}
 		// update allowed ips, according to the peer node
-		p.Peers[0].AllowedIPs = logic.GetAllowedIPs(clientI, models.Client{Host: client.Host, Node: client.Node})
+		p.Peers[0].AllowedIPs = logic.GetAllowedIPs(clientI, client)
+		if client.Node.IsRelayed {
+			relayPeerCfg.Peers[0].AllowedIPs = logic.GetAllowedIPs(clientI, relayClient)
+		}
 		if update && len(p.Peers[0].AllowedIPs) == 0 {
 			// remove peer
 			p.Action = models.RemovePeer
@@ -230,6 +273,7 @@ func BroadcastAddOrUpdateNetworkPeer(client models.Client, update bool) error {
 		if err != nil {
 			continue
 		}
+
 		if clientI.Node.IsRelayed {
 			r := models.PeerAction{
 				Action: models.AddPeer,
@@ -263,10 +307,19 @@ func BroadcastAddOrUpdateNetworkPeer(client models.Client, update bool) error {
 			publish(peerHost, fmt.Sprintf("peer/host/%s/%s", peerHost.ID.String(), servercfg.GetServer()), data)
 
 		} else {
-			data, err := json.Marshal(p)
-			if err != nil {
-				continue
+			var data []byte
+			if client.Node.IsRelayed && client.Node.RelayedBy != clientI.Node.ID.String() {
+				data, err = json.Marshal(relayPeerCfg)
+				if err != nil {
+					continue
+				}
+			} else {
+				data, err = json.Marshal(p)
+				if err != nil {
+					continue
+				}
 			}
+
 			publish(peerHost, fmt.Sprintf("peer/host/%s/%s", peerHost.ID.String(), servercfg.GetServer()), data)
 		}
 		if clientI.Node.IsIngressGateway || clientI.Node.IsEgressGateway {
@@ -283,16 +336,16 @@ func BroadcastAddOrUpdateNetworkPeer(client models.Client, update bool) error {
 }
 
 // BroadcastExtClient - publishes msg to add/updates ext client in the network
-func BroadcastExtClient(ingressHost *models.Host, ingressNode *models.Node) error {
+func BroadcastExtClient(ingressClient models.Client) error {
 
-	clients, err := logic.GetNetworkClients(ingressNode.Network)
+	clients, err := logic.GetNetworkClients(ingressClient.Node.Network)
 	if err != nil {
 		return err
 	}
 	//flush peers to ingress host
-	go FlushNetworkPeersToHost(models.Client{Host: *ingressHost, Node: *ingressNode}, clients)
+	go FlushNetworkPeersToHost(ingressClient, clients)
 	// broadcast to update ingress peer to other hosts
-	go BroadcastAddOrUpdateNetworkPeer(models.Client{Host: *ingressHost, Node: *ingressNode}, true)
+	go BroadcastAddOrUpdateNetworkPeer(ingressClient, true)
 	return nil
 }