소스 검색

add/delete host from network

Matthew R Kasun 2 년 전
부모
커밋
fe7b352079
10개의 변경된 파일52개의 추가작업 그리고 100개의 파일을 삭제
  1. 1 4
      auth/host_session.go
  2. 4 16
      controllers/ext_client.go
  3. 10 9
      controllers/hosts.go
  4. 5 20
      controllers/node.go
  5. 2 8
      ee/ee_controllers/relay.go
  6. 7 0
      logic/peers.go
  7. 6 4
      models/mqtt.go
  8. 1 4
      mq/handlers.go
  9. 9 1
      mq/publishers.go
  10. 7 34
      mq/relay.go

+ 1 - 4
auth/host_session.go

@@ -243,10 +243,7 @@ func CheckNetRegAndHostUpdate(networks []string, h *models.Host) {
 				slog.Warn("error getting network clients: ", "error", err)
 			}
 			for _, client := range peers {
-				update := models.PeerAction{
-					Peers: logic.GetPeerUpdate(&client.Host),
-				}
-				mq.PubPeerUpdateToHost(&client.Host, update)
+				mq.PubPeerUpdateToHost(&client.Host)
 			}
 		}
 	}

+ 4 - 16
controllers/ext_client.go

@@ -403,10 +403,7 @@ func createExtClient(w http.ResponseWriter, r *http.Request) {
 			slog.Warn("error getting network clients: ", "error", err)
 		}
 		for _, client := range peers {
-			update := models.PeerAction{
-				Peers: logic.GetPeerUpdate(&client.Host),
-			}
-			mq.PubPeerUpdateToHost(&client.Host, update)
+			mq.PubPeerUpdateToHost(&client.Host)
 		}
 		f, err := logic.GetFwUpdate(host)
 		if err == nil {
@@ -517,10 +514,7 @@ func updateExtClient(w http.ResponseWriter, r *http.Request) {
 						slog.Warn("error getting network clients: ", "error", err)
 					}
 					for _, client := range peers {
-						update := models.PeerAction{
-							Peers: logic.GetPeerUpdate(&client.Host),
-						}
-						mq.PubPeerUpdateToHost(&client.Host, update)
+						mq.PubPeerUpdateToHost(&client.Host)
 					}
 				}
 				if replaceOldClient || changedEnabled {
@@ -530,10 +524,7 @@ func updateExtClient(w http.ResponseWriter, r *http.Request) {
 						slog.Warn("error getting network clients: ", "error", err)
 					}
 					for _, client := range peers {
-						update := models.PeerAction{
-							Peers: logic.GetPeerUpdate(&client.Host),
-						}
-						mq.PubPeerUpdateToHost(&client.Host, update)
+						mq.PubPeerUpdateToHost(&client.Host)
 					}
 				}
 				f, err := logic.GetFwUpdate(ingressHost)
@@ -621,10 +612,7 @@ func deleteExtClient(w http.ResponseWriter, r *http.Request) {
 				slog.Warn("error getting network clients: ", "error", err)
 			}
 			for _, client := range peers {
-				update := models.PeerAction{
-					Peers: logic.GetPeerUpdate(&client.Host),
-				}
-				mq.PubPeerUpdateToHost(&client.Host, update)
+				mq.PubPeerUpdateToHost(&client.Host)
 			}
 			f, err := logic.GetFwUpdate(ingressHost)
 			if err == nil {

+ 10 - 9
controllers/hosts.go

@@ -291,15 +291,19 @@ func addHostToNetwork(w http.ResponseWriter, r *http.Request) {
 		Host:   *currHost,
 		Node:   *newNode,
 	})
-	if servercfg.IsMessageQueueBackend() {
+	go func() {
 		mq.HostUpdate(&models.HostUpdate{
 			Action: models.RequestAck,
 			Host:   *currHost,
 		})
-		go func() {
-
-		}()
-	}
+		peers, err := logic.GetNetworkClients(newNode.Network)
+		if err != nil {
+			slog.Warn("error getting network clients: ", "error", err)
+		}
+		for _, client := range peers {
+			mq.PubPeerUpdateToHost(&client.Host)
+		}
+	}()
 
 	logger.Log(2, r.Header.Get("user"), fmt.Sprintf("added host %s to network %s", currHost.Name, network))
 	w.WriteHeader(http.StatusOK)
@@ -355,10 +359,7 @@ func deleteHostFromNetwork(w http.ResponseWriter, r *http.Request) {
 			slog.Warn("error getting network clients: ", "error", err)
 		}
 		for _, client := range peers {
-			update := models.PeerAction{
-				Peers: logic.GetPeerUpdate(&client.Host),
-			}
-			mq.PubPeerUpdateToHost(&client.Host, update)
+			mq.PubPeerUpdateToHost(&client.Host)
 		}
 		if err := mq.PublishDNSDelete(node, currHost); err != nil {
 			logger.Log(1, "error publishing dns update", err.Error())

+ 5 - 20
controllers/node.go

@@ -470,10 +470,7 @@ func createEgressGateway(w http.ResponseWriter, r *http.Request) {
 			slog.Warn("error getting network clients: ", "error", err)
 		}
 		for _, client := range peers {
-			update := models.PeerAction{
-				Peers: logic.GetPeerUpdate(&client.Host),
-			}
-			mq.PubPeerUpdateToHost(&client.Host, update)
+			mq.PubPeerUpdateToHost(&client.Host)
 		}
 		f, err := logic.GetFwUpdate(host)
 		if err != nil {
@@ -527,10 +524,7 @@ func deleteEgressGateway(w http.ResponseWriter, r *http.Request) {
 			slog.Warn("error getting network clients: ", "error", err)
 		}
 		for _, client := range peers {
-			update := models.PeerAction{
-				Peers: logic.GetPeerUpdate(&client.Host),
-			}
-			mq.PubPeerUpdateToHost(&client.Host, update)
+			mq.PubPeerUpdateToHost(&client.Host)
 		}
 		f, err := logic.GetFwUpdate(host)
 		if err != nil {
@@ -631,10 +625,7 @@ func deleteIngressGateway(w http.ResponseWriter, r *http.Request) {
 				slog.Warn("error getting network clients: ", "error", err)
 			}
 			for _, client := range peers {
-				update := models.PeerAction{
-					Peers: logic.GetPeerUpdate(&client.Host),
-				}
-				mq.PubPeerUpdateToHost(&client.Host, update)
+				mq.PubPeerUpdateToHost(&client.Host)
 			}
 			f, err := logic.GetFwUpdate(host)
 			if err == nil {
@@ -738,10 +729,7 @@ func updateNode(w http.ResponseWriter, r *http.Request) {
 			slog.Warn("error getting network clients: ", "error", err)
 		}
 		for _, client := range peers {
-			update := models.PeerAction{
-				Peers: logic.GetPeerUpdate(&client.Host),
-			}
-			mq.PubPeerUpdateToHost(&client.Host, update)
+			mq.PubPeerUpdateToHost(&client.Host)
 		}
 		if err := mq.PublishReplaceDNS(&currentNode, newNode, host); err != nil {
 			logger.Log(1, "failed to publish dns update", err.Error())
@@ -807,10 +795,7 @@ func deleteNode(w http.ResponseWriter, r *http.Request) {
 			slog.Warn("error getting network clients: ", "error", err)
 		}
 		for _, client := range peers {
-			update := models.PeerAction{
-				Peers: logic.GetPeerUpdate(&client.Host),
-			}
-			mq.PubPeerUpdateToHost(&client.Host, update)
+			mq.PubPeerUpdateToHost(&client.Host)
 		}
 		if err != nil {
 			logger.Log(1, "error publishing peer update ", err.Error())

+ 2 - 8
ee/ee_controllers/relay.go

@@ -75,10 +75,7 @@ func createRelay(w http.ResponseWriter, r *http.Request) {
 	//clients := peers
 	go func() {
 		for _, client := range peers {
-			update := models.PeerAction{
-				Peers: logic.GetPeerUpdate(&client.Host),
-			}
-			mq.PubPeerUpdateToHost(&client.Host, update)
+			mq.PubPeerUpdateToHost(&client.Host)
 		}
 	}()
 	slog.Info("created relay on node", "user", r.Header.Get("user"), "node", relayRequest.NodeID, "network", relayRequest.NetID)
@@ -116,10 +113,7 @@ func deleteRelay(w http.ResponseWriter, r *http.Request) {
 			slog.Warn("error getting network clients: ", "error", err)
 		}
 		for _, client := range peers {
-			update := models.PeerAction{
-				Peers: logic.GetPeerUpdate(&client.Host),
-			}
-			mq.PubPeerUpdateToHost(&client.Host, update)
+			mq.PubPeerUpdateToHost(&client.Host)
 		}
 	}()
 	logger.Log(1, r.Header.Get("user"), "deleted relay on node", node.ID.String(), "on network", node.Network)

+ 7 - 0
logic/peers.go

@@ -13,6 +13,7 @@ import (
 	"github.com/gravitl/netmaker/models"
 	"github.com/gravitl/netmaker/servercfg"
 	"golang.org/x/exp/slices"
+	"golang.org/x/exp/slog"
 	"golang.zx2c4.com/wireguard/wgctrl/wgtypes"
 )
 
@@ -757,11 +758,13 @@ func GetPeerUpdate(host *models.Host) []wgtypes.PeerConfig {
 	for i, nodeStr := range host.Nodes {
 		node, err := GetNodeByID(nodeStr)
 		if err != nil {
+			slog.Warn("error getting node by id", nodeStr, err)
 			continue
 		}
 		client := models.Client{Host: *host, Node: node}
 		peers, err := GetNetworkClients(node.Network)
 		if err != nil {
+			slog.Warn("error getting network clients", node.Network, err)
 			continue
 		}
 		if node.IsRelayed {
@@ -814,6 +817,10 @@ func GetPeerUpdate(host *models.Host) []wgtypes.PeerConfig {
 			peerUpdate = append(peerUpdate, update)
 		}
 	}
+	fmt.Println("peer update for relayed node")
+	for _, peer := range peerUpdate {
+		fmt.Println(peer.PublicKey, peer.AllowedIPs, peer.Endpoint)
+	}
 	return peerUpdate
 }
 

+ 6 - 4
models/mqtt.go

@@ -14,10 +14,12 @@ type HostPeerUpdate struct {
 	ServerAddrs     []ServerAddr         `json:"serveraddrs" bson:"serveraddrs" yaml:"serveraddrs"`
 	NodePeers       []wgtypes.PeerConfig `json:"peers" bson:"peers" yaml:"peers"`
 	Peers           []wgtypes.PeerConfig
-	HostPeerIDs     HostPeerMap         `json:"hostpeerids" bson:"hostpeerids" yaml:"hostpeerids"`
-	ProxyUpdate     ProxyManagerPayload `json:"proxy_update" bson:"proxy_update" yaml:"proxy_update"`
-	PeerIDs         PeerMap             `json:"peerids" bson:"peerids" yaml:"peerids"`
-	HostNetworkInfo HostInfoMap         `json:"host_network_info,omitempty" bson:"host_network_info,omitempty" yaml:"host_network_info,omitempty"`
+	HostPeerIDs     HostPeerMap           `json:"hostpeerids" bson:"hostpeerids" yaml:"hostpeerids"`
+	ProxyUpdate     ProxyManagerPayload   `json:"proxy_update" bson:"proxy_update" yaml:"proxy_update"`
+	EgressInfo      map[string]EgressInfo `json:"egress_info" bson:"egress_info" yaml:"egress_info"` // map key is node ID
+	IngressInfo     IngressInfo           `json:"ingress_info" bson:"ext_peers" yaml:"ext_peers"`
+	PeerIDs         PeerMap               `json:"peerids" bson:"peerids" yaml:"peerids"`
+	HostNetworkInfo HostInfoMap           `json:"host_network_info,omitempty" bson:"host_network_info,omitempty" yaml:"host_network_info,omitempty"`
 }
 
 // IngressInfo - struct for ingress info

+ 1 - 4
mq/handlers.go

@@ -69,10 +69,7 @@ func UpdateNode(client mqtt.Client, msg mqtt.Message) {
 			slog.Warn("error getting network clients: ", "error", err)
 		}
 		for _, client := range peers {
-			update := models.PeerAction{
-				Peers: logic.GetPeerUpdate(&client.Host),
-			}
-			PubPeerUpdateToHost(&client.Host, update)
+			PubPeerUpdateToHost(&client.Host)
 		}
 		if nodes, err := logic.GetNetworkNodes(newNode.Network); err == nil {
 			FlushNetworkPeersToHost(h, &newNode, nodes)

+ 9 - 1
mq/publishers.go

@@ -566,7 +566,15 @@ func sendPeers() {
 	}
 }
 
-func PubPeerUpdateToHost(host *models.Host, update models.PeerAction) {
+// PubPeerUpdateToHost - publishes a full peer update to a host
+func PubPeerUpdateToHost(host *models.Host) {
+	update := models.PeerAction{
+		Peers: logic.GetPeerUpdate(host),
+	}
+	if len(update.Peers) == 0 {
+		slog.Info("no peer update for host", "host", host.Name)
+		return
+	}
 	data, err := json.Marshal(update)
 	if err != nil {
 		slog.Error("error mashalling peer update for", "host", host.Name, "err", err)

+ 7 - 34
mq/relay.go

@@ -9,52 +9,25 @@ import (
 	"github.com/gravitl/netmaker/logic"
 	"github.com/gravitl/netmaker/models"
 	"github.com/gravitl/netmaker/servercfg"
+	"golang.org/x/exp/slog"
 	"golang.zx2c4.com/wireguard/wgctrl/wgtypes"
 )
 
 // PubPeerUpdate publishes a peer update to the client
 // relay is set to a newly created relay node or nil for other peer updates
-func PubPeerUpdate(client, relay *models.Client, peers []models.Client) {
+func PubPeerUpdate(client *models.Client) {
+	peers := logic.GetPeerUpdate(&client.Host)
 	p := models.PeerAction{
 		Action: models.UpdatePeer,
+		Peers:  peers,
 	}
-	if client.Node.IsRelay {
-		pubRelayUpdate(client, peers)
+	if len(p.Peers) == 0 {
+		slog.Info("no peer update for host", "host", client.Host.Name)
 		return
 	}
-	if relay != nil {
-		if client.Node.RelayedBy == relay.Node.ID.String() {
-			pubRelayedUpdate(client, relay, peers)
-			return
-		}
-	}
-	for _, peer := range peers {
-		if client.Host.ID == peer.Host.ID {
-			continue
-		}
-		update := wgtypes.PeerConfig{
-			PublicKey:         peer.Host.PublicKey,
-			ReplaceAllowedIPs: true,
-			Endpoint: &net.UDPAddr{
-				IP:   peer.Host.EndpointIP,
-				Port: peer.Host.ListenPort,
-			},
-			PersistentKeepaliveInterval: &peer.Node.PersistentKeepalive,
-		}
-		update.AllowedIPs = append(update.AllowedIPs, logic.AddAllowedIPs(&peer)...)
-		if relay != nil {
-			if peer.Node.IsRelayed && peer.Node.RelayedBy == relay.Node.ID.String() {
-				update.Remove = true
-			}
-		}
-		if peer.Node.IsRelay {
-			update.AllowedIPs = append(update.AllowedIPs, getRelayAllowedIPs(peer)...)
-		}
-		p.Peers = append(p.Peers, update)
-	}
 	data, err := json.Marshal(p)
 	if err != nil {
-		logger.Log(0, "marshal peer update", err.Error())
+		slog.Error("marshal peer update", "error", err)
 		return
 	}
 	publish(&client.Host, fmt.Sprintf("peer/host/%s/%s", client.Host.ID.String(), servercfg.GetServer()), data)