Kaynağa Gözat

remove mq check,remove duplicate allowedips func

Abhishek Kondur 2 yıl önce
ebeveyn
işleme
09c951898e
12 değiştirilmiş dosya ile 33 ekleme ve 163 silme
  1. 5 12
      auth/host_session.go
  2. 7 11
      controllers/dns.go
  3. 7 10
      controllers/hosts.go
  4. 1 5
      controllers/network.go
  5. 1 20
      logic/peers.go
  6. 2 2
      logic/relay.go
  7. 5 14
      main.go
  8. 1 41
      mq/handlers.go
  9. 1 4
      mq/mq.go
  10. 0 26
      mq/publishers.go
  11. 3 3
      mq/relay.go
  12. 0 15
      servercfg/serverconf.go

+ 5 - 12
auth/host_session.go

@@ -237,21 +237,14 @@ func CheckNetRegAndHostUpdate(networks []string, h *models.Host) {
 				Host:   *h,
 				Node:   *newNode,
 			})
-			if servercfg.IsMessageQueueBackend() {
-				mq.BroadcastAddOrUpdateNetworkPeer(&models.Client{Host: *h, Node: *newNode}, false)
-			}
+			mq.BroadcastAddOrUpdateNetworkPeer(&models.Client{Host: *h, Node: *newNode}, false)
 		}
 	}
-	if servercfg.IsMessageQueueBackend() {
-		mq.HostUpdate(&models.HostUpdate{
-			Action: models.RequestAck,
-			Host:   *h,
-		})
-		if err := mq.PublishPeerUpdate(); err != nil {
-			logger.Log(0, "failed to publish peer update during registration -", err.Error())
-		}
+	mq.HostUpdate(&models.HostUpdate{
+		Action: models.RequestAck,
+		Host:   *h,
+	})
 
-	}
 }
 
 func handleHostRegErr(conn *websocket.Conn, err error) {

+ 7 - 11
controllers/dns.go

@@ -11,7 +11,6 @@ import (
 	"github.com/gravitl/netmaker/logic"
 	"github.com/gravitl/netmaker/models"
 	"github.com/gravitl/netmaker/mq"
-	"github.com/gravitl/netmaker/servercfg"
 )
 
 func dnsHandlers(r *mux.Router) {
@@ -176,16 +175,13 @@ func createDNS(w http.ResponseWriter, r *http.Request) {
 		return
 	}
 	logger.Log(1, "new DNS record added:", entry.Name)
-	if servercfg.IsMessageQueueBackend() {
-		go func() {
-			if err = mq.PublishPeerUpdate(); err != nil {
-				logger.Log(0, "failed to publish peer update after ACL update on", entry.Network)
-			}
-			if err := mq.PublishCustomDNS(&entry); err != nil {
-				logger.Log(0, "error publishing custom dns", err.Error())
-			}
-		}()
-	}
+
+	go func() {
+		if err := mq.PublishCustomDNS(&entry); err != nil {
+			logger.Log(0, "error publishing custom dns", err.Error())
+		}
+	}()
+
 	logger.Log(2, r.Header.Get("user"),
 		fmt.Sprintf("DNS entry is set: %+v", entry))
 	w.WriteHeader(http.StatusOK)

+ 7 - 10
controllers/hosts.go

@@ -186,9 +186,7 @@ func updateHost(w http.ResponseWriter, r *http.Request) {
 		logger.Log(0, r.Header.Get("user"), "failed to send host update: ", currHost.ID.String(), err.Error())
 	}
 	go func() {
-		if err := mq.PublishPeerUpdate(); err != nil {
-			logger.Log(0, "fail to publish peer update: ", err.Error())
-		}
+		mq.BroadcastHostUpdate(newHost, false)
 		if newHost.Name != currHost.Name {
 			networks := logic.GetHostNetworks(currHost.ID.String())
 			if err := mq.PublishHostDNSUpdate(currHost, newHost, networks); err != nil {
@@ -289,13 +287,12 @@ func addHostToNetwork(w http.ResponseWriter, r *http.Request) {
 		Host:   *currHost,
 		Node:   *newNode,
 	})
-	if servercfg.IsMessageQueueBackend() {
-		mq.HostUpdate(&models.HostUpdate{
-			Action: models.RequestAck,
-			Host:   *currHost,
-		})
-		go mq.BroadcastAddOrUpdateNetworkPeer(&models.Client{Host: *currHost, Node: *newNode}, false)
-	}
+
+	mq.HostUpdate(&models.HostUpdate{
+		Action: models.RequestAck,
+		Host:   *currHost,
+	})
+	go mq.BroadcastAddOrUpdateNetworkPeer(&models.Client{Host: *currHost, Node: *newNode}, false)
 
 	logger.Log(2, r.Header.Get("user"), fmt.Sprintf("added host %s to network %s", currHost.Name, network))
 	w.WriteHeader(http.StatusOK)

+ 1 - 5
controllers/network.go

@@ -15,7 +15,6 @@ import (
 	"github.com/gravitl/netmaker/logic/acls"
 	"github.com/gravitl/netmaker/models"
 	"github.com/gravitl/netmaker/mq"
-	"github.com/gravitl/netmaker/servercfg"
 )
 
 func networkHandlers(r *mux.Router) {
@@ -140,10 +139,7 @@ func updateNetworkACL(w http.ResponseWriter, r *http.Request) {
 	logger.Log(1, r.Header.Get("user"), "updated ACLs for network", netname)
 
 	// send peer updates
-	if servercfg.IsMessageQueueBackend() {
-		go mq.BroadcastAclUpdate(netname)
-
-	}
+	go mq.BroadcastAclUpdate(netname)
 	w.WriteHeader(http.StatusOK)
 	json.NewEncoder(w).Encode(newNetACL)
 }

+ 1 - 20
logic/peers.go

@@ -608,7 +608,7 @@ func GetPeerUpdate(host *models.Host) []wgtypes.PeerConfig {
 			}
 			//normal peer
 			if nodeacls.AreNodesAllowed(nodeacls.NetworkID(node.Network), nodeacls.NodeID(node.ID.String()), nodeacls.NodeID(peer.Node.ID.String())) {
-				update.AllowedIPs = append(update.AllowedIPs, AddAllowedIPs(&peer)...)
+				update.AllowedIPs = append(update.AllowedIPs, GetAllowedIPs(&peer)...)
 				peerUpdate = append(peerUpdate, update)
 			} else {
 				update.Remove = true
@@ -648,25 +648,6 @@ func AddHostAllowedIPs(h *models.Host) []net.IPNet {
 
 }
 
-func AddAllowedIPs(peer *models.Client) []net.IPNet {
-	allowedIPs := []net.IPNet{}
-	if peer.Node.Address.IP != nil {
-		peer.Node.Address.Mask = net.CIDRMask(32, 32)
-		allowedIPs = append(allowedIPs, peer.Node.Address)
-	}
-	if peer.Node.Address6.IP != nil {
-		peer.Node.Address6.Mask = net.CIDRMask(128, 128)
-		allowedIPs = append(allowedIPs, peer.Node.Address6)
-	}
-	if peer.Node.IsEgressGateway {
-		allowedIPs = append(allowedIPs, getEgressIPs(peer)...)
-	}
-	if peer.Node.IsIngressGateway {
-		allowedIPs = append(allowedIPs, getIngressIPs(peer)...)
-	}
-	return allowedIPs
-}
-
 // getRelayAllowedIPs returns the list of allowedips for a peer that is a relay
 func getRelayAllowedIPs(peer *models.Client) []net.IPNet {
 	var relayIPs []net.IPNet

+ 2 - 2
logic/relay.go

@@ -235,7 +235,7 @@ func PeerUpdateForRelayedByRelay(relayed, relay *models.Client) wgtypes.PeerConf
 			continue
 		}
 		if nodeacls.AreNodesAllowed(nodeacls.NetworkID(relayed.Node.Network), nodeacls.NodeID(relayed.Node.ID.String()), nodeacls.NodeID(peer.Node.ID.String())) {
-			update.AllowedIPs = append(update.AllowedIPs, AddAllowedIPs(&peer)...)
+			update.AllowedIPs = append(update.AllowedIPs, GetAllowedIPs(&peer)...)
 		}
 	}
 	return update
@@ -263,7 +263,7 @@ func peerUpdateForRelay(relay *models.Client, peers []models.Client) []wgtypes.P
 			PersistentKeepaliveInterval: &peer.Node.PersistentKeepalive,
 		}
 		if nodeacls.AreNodesAllowed(nodeacls.NetworkID(relay.Node.Network), nodeacls.NodeID(relay.Node.ID.String()), nodeacls.NodeID(peer.Node.ID.String())) {
-			update.AllowedIPs = append(update.AllowedIPs, AddAllowedIPs(&peer)...)
+			update.AllowedIPs = append(update.AllowedIPs, GetAllowedIPs(&peer)...)
 			peerConfig = append(peerConfig, update)
 		}
 	}

+ 5 - 14
main.go

@@ -110,12 +110,10 @@ func initialize() { // Client Mode Prereq Check
 			logger.FatalLog(err.Error())
 		}
 	}
-
-	if servercfg.IsMessageQueueBackend() {
-		if err = mq.ServerStartNotify(); err != nil {
-			logger.Log(0, "error occurred when notifying nodes of startup", err.Error())
-		}
+	if err = mq.ServerStartNotify(); err != nil {
+		logger.Log(0, "error occurred when notifying nodes of startup", err.Error())
 	}
+
 }
 
 func startControllers(wg *sync.WaitGroup, ctx context.Context) {
@@ -138,15 +136,8 @@ func startControllers(wg *sync.WaitGroup, ctx context.Context) {
 		go controller.HandleRESTRequests(wg, ctx)
 	}
 	//Run MessageQueue
-	if servercfg.IsMessageQueueBackend() {
-		wg.Add(1)
-		go runMessageQueue(wg, ctx)
-	}
-
-	if !servercfg.IsRestBackend() && !servercfg.IsMessageQueueBackend() {
-		logger.Log(0, "No Server Mode selected, so nothing is being served! Set Rest mode (REST_BACKEND) or MessageQueue (MESSAGEQUEUE_BACKEND) to 'true'.")
-	}
-
+	wg.Add(1)
+	go runMessageQueue(wg, ctx)
 	// starts the stun server
 	wg.Add(1)
 	go stunserver.Start(wg, ctx)

+ 1 - 41
mq/handlers.go

@@ -225,7 +225,7 @@ func UpdateMetrics(client mqtt.Client, msg mqtt.Message) {
 			return
 		}
 
-		shouldUpdate := updateNodeMetrics(&currentNode, &newMetrics)
+		_ = updateNodeMetrics(&currentNode, &newMetrics)
 
 		if err = logic.UpdateMetrics(id, &newMetrics); err != nil {
 			slog.Error("failed to update node metrics", "id", id, "error", err)
@@ -243,50 +243,10 @@ func UpdateMetrics(client mqtt.Client, msg mqtt.Message) {
 				slog.Error("failed to failover for node", "id", currentNode.ID, "network", currentNode.Network, "error", err)
 			}
 		}
-
-		if shouldUpdate {
-			slog.Info("updating peers after node detected connectivity issues", "id", currentNode.ID, "network", currentNode.Network)
-			host, err := logic.GetHost(currentNode.HostID.String())
-			if err == nil {
-				if err = PublishSingleHostPeerUpdate(host); err != nil {
-					slog.Warn("failed to publish update after failover peer change for node", "id", currentNode.ID, "network", currentNode.Network, "error", err)
-				}
-			}
-		}
 		slog.Info("updated node metrics", "id", id)
 	}
 }
 
-// ClientPeerUpdate  message handler -- handles updating peers after signal from client nodes
-func ClientPeerUpdate(client mqtt.Client, msg mqtt.Message) {
-	id, err := getID(msg.Topic())
-	if err != nil {
-		slog.Error("error getting node.ID sent on ", "topic", msg.Topic(), "error", err)
-		return
-	}
-	currentNode, err := logic.GetNodeByID(id)
-	if err != nil {
-		slog.Error("error getting node", "id", id, "error", err)
-		return
-	}
-	decrypted, decryptErr := decryptMsg(&currentNode, msg.Payload())
-	if decryptErr != nil {
-		slog.Error("failed to decrypt message for node", "id", id, "error", decryptErr)
-		return
-	}
-	switch decrypted[0] {
-	case ncutils.ACK:
-		// do we still need this
-	case ncutils.DONE:
-		if err = PublishPeerUpdate(); err != nil {
-			slog.Error("error publishing peer update for node", "id", currentNode.ID, "error", err)
-			return
-		}
-	}
-
-	slog.Info("sent peer updates after signal received from", "id", id)
-}
-
 func updateNodeMetrics(currentNode *models.Node, newMetrics *models.Metrics) bool {
 	if newMetrics.FailoverPeers == nil {
 		newMetrics.FailoverPeers = make(map[string]string)

+ 1 - 4
mq/mq.go

@@ -71,10 +71,7 @@ func SetupMQTT() {
 			client.Disconnect(240)
 			logger.Log(0, "host update subscription failed")
 		}
-		if token := client.Subscribe(fmt.Sprintf("signal/%s/#", serverName), 0, mqtt.MessageHandler(ClientPeerUpdate)); token.WaitTimeout(MQ_TIMEOUT*time.Second) && token.Error() != nil {
-			client.Disconnect(240)
-			logger.Log(0, "node client subscription failed")
-		}
+
 		if token := client.Subscribe(fmt.Sprintf("metrics/%s/#", serverName), 0, mqtt.MessageHandler(UpdateMetrics)); token.WaitTimeout(MQ_TIMEOUT*time.Second) && token.Error() != nil {
 			client.Disconnect(240)
 			logger.Log(0, "node metrics subscription failed")

+ 0 - 26
mq/publishers.go

@@ -15,26 +15,6 @@ import (
 	"golang.zx2c4.com/wireguard/wgctrl/wgtypes"
 )
 
-// PublishPeerUpdate --- determines and publishes a peer update to all the hosts
-func PublishPeerUpdate() error {
-	if !servercfg.IsMessageQueueBackend() {
-		return nil
-	}
-
-	hosts, err := logic.GetAllHosts()
-	if err != nil {
-		logger.Log(1, "err getting all hosts", err.Error())
-		return err
-	}
-	for _, host := range hosts {
-		host := host
-		if err = PublishSingleHostPeerUpdate(&host); err != nil {
-			logger.Log(1, "failed to publish peer update to host", host.ID.String(), ": ", err.Error())
-		}
-	}
-	return err
-}
-
 // PublishSingleHostPeerUpdate --- determines and publishes a peer update to one host
 func PublishSingleHostPeerUpdate(host *models.Host) error {
 
@@ -376,9 +356,6 @@ func NodeUpdate(node *models.Node) error {
 	if err != nil {
 		return nil
 	}
-	if !servercfg.IsMessageQueueBackend() {
-		return nil
-	}
 	logger.Log(3, "publishing node update to "+node.ID.String())
 
 	//if len(node.NetworkSettings.AccessKeys) > 0 {
@@ -400,9 +377,6 @@ func NodeUpdate(node *models.Node) error {
 
 // HostUpdate -- publishes a host update to clients
 func HostUpdate(hostUpdate *models.HostUpdate) error {
-	if !servercfg.IsMessageQueueBackend() {
-		return nil
-	}
 	logger.Log(3, "publishing host update to "+hostUpdate.Host.ID.String())
 
 	data, err := json.Marshal(hostUpdate)

+ 3 - 3
mq/relay.go

@@ -43,7 +43,7 @@ func PubPeerUpdate(client, relay *models.Client, peers []models.Client) {
 			PersistentKeepaliveInterval: &peer.Node.PersistentKeepalive,
 		}
 		if nodeacls.AreNodesAllowed(nodeacls.NetworkID(client.Node.Network), nodeacls.NodeID(client.Node.ID.String()), nodeacls.NodeID(peer.Node.ID.String())) {
-			update.AllowedIPs = append(update.AllowedIPs, logic.AddAllowedIPs(&peer)...)
+			update.AllowedIPs = append(update.AllowedIPs, logic.GetAllowedIPs(&peer)...)
 		} else {
 			update.Remove = true
 		}
@@ -201,7 +201,7 @@ func pubRelayedUpdate(client, relay *models.Client, peers []models.Client) {
 			continue
 		}
 		if nodeacls.AreNodesAllowed(nodeacls.NetworkID(client.Node.Network), nodeacls.NodeID(client.Node.ID.String()), nodeacls.NodeID(peer.Node.ID.String())) {
-			update.AllowedIPs = append(update.AllowedIPs, logic.AddAllowedIPs(&peer)...)
+			update.AllowedIPs = append(update.AllowedIPs, logic.GetAllowedIPs(&peer)...)
 		}
 	}
 	p.Peers = append(p.Peers, update)
@@ -236,7 +236,7 @@ func pubRelayUpdate(client *models.Client, peers []models.Client) {
 			},
 			PersistentKeepaliveInterval: &peer.Node.PersistentKeepalive,
 		}
-		update.AllowedIPs = append(update.AllowedIPs, logic.AddAllowedIPs(&peer)...)
+		update.AllowedIPs = append(update.AllowedIPs, logic.GetAllowedIPs(&peer)...)
 		p.Peers = append(p.Peers, update)
 	}
 	data, err := json.Marshal(p)

+ 0 - 15
servercfg/serverconf.go

@@ -364,21 +364,6 @@ func IsMetricsExporter() bool {
 	return export
 }
 
-// IsMessageQueueBackend - checks if message queue is on or off
-func IsMessageQueueBackend() bool {
-	ismessagequeue := true
-	if os.Getenv("MESSAGEQUEUE_BACKEND") != "" {
-		if os.Getenv("MESSAGEQUEUE_BACKEND") == "off" {
-			ismessagequeue = false
-		}
-	} else if config.Config.Server.MessageQueueBackend != "" {
-		if config.Config.Server.MessageQueueBackend == "off" {
-			ismessagequeue = false
-		}
-	}
-	return ismessagequeue
-}
-
 // Telemetry - checks if telemetry data should be sent
 func Telemetry() string {
 	telemetry := "on"