Przeglądaj źródła

refine peer update for relay creation/deletion

Matthew R Kasun 2 lat temu
rodzic
commit
75d56c19f5
4 zmienionych plików z 47 dodań i 32 usunięć
  1. 4 2
      controllers/ext_client.go
  2. 20 28
      ee/ee_controllers/relay.go
  3. 10 2
      logic/peers.go
  4. 13 0
      mq/publishers.go

+ 4 - 2
controllers/ext_client.go

@@ -397,8 +397,10 @@ func createExtClient(w http.ResponseWriter, r *http.Request) {
 	logger.Log(0, r.Header.Get("user"), "created new ext client on network", networkName)
 	w.WriteHeader(http.StatusOK)
 	go func() {
-		if err := mq.PublishPeerUpdate(); err != nil {
-			logger.Log(1, "error setting ext peers on "+nodeid+": "+err.Error())
+		mq.BroadcastExtClient(host, &node)
+		f, err := logic.GetFwUpdate(host)
+		if err == nil {
+			mq.PublishFwUpdate(host, &f)
 		}
 		if err := mq.PublishExtCLientDNS(&extclient); err != nil {
 			logger.Log(1, "error publishing extclient dns", err.Error())

+ 20 - 28
ee/ee_controllers/relay.go

@@ -11,6 +11,7 @@ import (
 	"github.com/gravitl/netmaker/logic"
 	"github.com/gravitl/netmaker/models"
 	"github.com/gravitl/netmaker/mq"
+	"golang.org/x/exp/slog"
 )
 
 // RelayHandlers - handle EE Relays
@@ -71,11 +72,16 @@ func createRelay(w http.ResponseWriter, r *http.Request) {
 	//for _, relayed := range relayedClients {
 	//mq.PubPeersForRelayedNode(relayed, relay, peers)
 	//}
-	clients := peers
-	for _, client := range clients {
-		mq.PubPeerUpdate(&client, &relay, peers)
-	}
-	logger.Log(1, r.Header.Get("user"), "created relay on node", relayRequest.NodeID, "on network", relayRequest.NetID)
+	//clients := peers
+	go func() {
+		for _, client := range peers {
+			update := models.PeerAction{
+				Peers: logic.GetPeerUpdate(&client.Host),
+			}
+			mq.PubPeerUpdateToHost(&client.Host, update)
+		}
+	}()
+	slog.Info("created relay on node", "user", r.Header.Get("user"), "node", relayRequest.NodeID, "network", relayRequest.NetID)
 	apiNode := relayNode.ConvertToAPINode()
 	w.WriteHeader(http.StatusOK)
 	json.NewEncoder(w).Encode(apiNode)
@@ -97,7 +103,7 @@ func deleteRelay(w http.ResponseWriter, r *http.Request) {
 	var params = mux.Vars(r)
 	nodeid := params["nodeid"]
 	netid := params["network"]
-	updateClients, node, err := logic.DeleteRelay(netid, nodeid)
+	_, node, err := logic.DeleteRelay(netid, nodeid)
 	if err != nil {
 		logger.Log(0, r.Header.Get("user"), "error decoding request body: ", err.Error())
 		logic.ReturnErrorResponse(w, r, logic.FormatError(err, "badrequest"))
@@ -105,29 +111,15 @@ func deleteRelay(w http.ResponseWriter, r *http.Request) {
 	}
 	logger.Log(1, r.Header.Get("user"), "deleted relay server", nodeid, "on network", netid)
 	go func() {
-		//update relayHost node
-		relayHost, err := logic.GetHost(node.HostID.String())
-		if err == nil {
-			if err := mq.NodeUpdate(&node); err != nil {
-				logger.Log(1, "relay node update", relayHost.Name, "on network", node.Network, ": ", err.Error())
-			}
-			for _, relayedClient := range updateClients {
-				err = mq.NodeUpdate(&relayedClient.Node)
-				if err != nil {
-					logger.Log(1, "relayed node update ", relayedClient.Node.ID.String(), "on network", relayedClient.Node.Network, ": ", err.Error())
-
-				}
-			}
-			peers, err := logic.GetNetworkClients(node.Network)
-			if err != nil {
-				logger.Log(0, "error getting network nodes: ", err.Error())
-				logic.ReturnErrorResponse(w, r, logic.FormatError(err, "internal"))
-				return
-			}
-			clients := peers
-			for _, client := range clients {
-				mq.PubPeerUpdate(&client, nil, peers)
+		peers, err := logic.GetNetworkClients(node.Network)
+		if err != nil {
+			slog.Warn("error getting network clients: ", "error", err)
+		}
+		for _, client := range peers {
+			update := models.PeerAction{
+				Peers: logic.GetPeerUpdate(&client.Host),
 			}
+			mq.PubPeerUpdateToHost(&client.Host, update)
 		}
 	}()
 	logger.Log(1, r.Header.Get("user"), "deleted relay on node", node.ID.String(), "on network", node.Network)

+ 10 - 2
logic/peers.go

@@ -754,7 +754,7 @@ func filterNodeMapForClientACLs(publicKey, network string, nodePeerMap map[strin
 
 func GetPeerUpdate(host *models.Host) []wgtypes.PeerConfig {
 	peerUpdate := []wgtypes.PeerConfig{}
-	for _, nodeStr := range host.Nodes {
+	for i, nodeStr := range host.Nodes {
 		node, err := GetNodeByID(nodeStr)
 		if err != nil {
 			continue
@@ -778,7 +778,7 @@ func GetPeerUpdate(host *models.Host) []wgtypes.PeerConfig {
 			}
 			// if peer is relayed by some other node, remove it from the peer list,  it
 			// will be added to allowedips of relay peer
-			if peer.Node.IsRelayed {
+			if peer.Node.IsRelayed && i == 0 {
 				update := wgtypes.PeerConfig{
 					PublicKey: peer.Host.PublicKey,
 					Remove:    true,
@@ -786,6 +786,10 @@ func GetPeerUpdate(host *models.Host) []wgtypes.PeerConfig {
 				peerUpdate = append(peerUpdate, update)
 				continue
 			}
+			// on multiple networks, do not remove, just skip
+			if peer.Node.IsRelayed && i > 0 {
+				continue
+			}
 			update := wgtypes.PeerConfig{
 				PublicKey:         peer.Host.PublicKey,
 				ReplaceAllowedIPs: true,
@@ -795,6 +799,10 @@ func GetPeerUpdate(host *models.Host) []wgtypes.PeerConfig {
 				},
 				PersistentKeepaliveInterval: &peer.Node.PersistentKeepalive,
 			}
+			// if multiple networks, append to allowedips
+			if i > 0 {
+				update.ReplaceAllowedIPs = false
+			}
 			// if peer is a relay that relays us, don't do anything
 			if peer.Node.IsRelay && client.Node.RelayedBy == peer.Node.ID.String() {
 				continue

+ 13 - 0
mq/publishers.go

@@ -13,6 +13,7 @@ import (
 	"github.com/gravitl/netmaker/logic/acls/nodeacls"
 	"github.com/gravitl/netmaker/models"
 	"github.com/gravitl/netmaker/servercfg"
+	"golang.org/x/exp/slog"
 	"golang.zx2c4.com/wireguard/wgctrl/wgtypes"
 )
 
@@ -709,3 +710,15 @@ func sendPeers() {
 		}
 	}
 }
+
+func PubPeerUpdateToHost(host *models.Host, update models.PeerAction) {
+	data, err := json.Marshal(update)
+	if err != nil {
+		slog.Error("error mashalling peer update for", "host", host.Name, "err", err)
+		return
+	}
+	if err = publish(host, fmt.Sprintf("peer/host/%s/%s", host.ID.String(), servercfg.GetServer()), data); err != nil {
+		slog.Error("error publishing peer update to host", "host", host.Name, "err", err)
+		return
+	}
+}