Kaynağa Gözat

publish peer update for new relay

Matthew R Kasun 2 yıl önce
ebeveyn
işleme
f3a02c8073
4 değiştirilmiş dosya ile 155 ekleme ve 14 silme
  1. 40 14
      controllers/relay.go
  2. 19 0
      logic/relay.go
  3. 7 0
      models/host.go
  4. 89 0
      mq/publishers.go

+ 40 - 14
controllers/relay.go

@@ -42,20 +42,35 @@ func createRelay(w http.ResponseWriter, r *http.Request) {
 		logic.ReturnErrorResponse(w, r, logic.FormatError(err, "internal"))
 		return
 	}
-
-	logger.Log(1, r.Header.Get("user"), "created relay on node", relay.NodeID, "on network", relay.NetID)
-	for _, relayedNode := range updatenodes {
-
-		err = mq.NodeUpdate(&relayedNode)
-		if err != nil {
-			logger.Log(1, "error sending update to relayed node ", relayedNode.ID.String(), "on network", relay.NetID, ": ", err.Error())
+	go func() {
+		// update relay node
+		host := logic.GetHostByNodeID(node.ID.String())
+		if err := mq.NodeUpdate(&node); err != nil {
+			logger.Log(1, "relay node update", host.Name, "on network", relay.NetID, ": ", err.Error())
 		}
-	}
+		// update relayed nodes
+		for _, relayedNode := range updatenodes {
+			err = mq.NodeUpdate(&relayedNode)
+			if err != nil {
+				logger.Log(1, "relayed node update ", relayedNode.ID.String(), "on network", relay.NetID, ": ", err.Error())
+			}
+		}
+		// peer updates
+		relay := models.Client{
+			Host: *host,
+			Node: node,
+		}
+		clients := logic.GetNetworkClients(relay.Node.Network)
+		if err := mq.PublishRelayNew(&relay, &clients); err != nil {
+			logger.Log(1, "peer update to relayed node ", host.Name, "on network", relay.Node.Network, ": ", err.Error())
+		}
+	}()
 
+	logger.Log(1, r.Header.Get("user"), "created relay on node", relay.NodeID, "on network", relay.NetID)
 	apiNode := node.ConvertToAPINode()
 	w.WriteHeader(http.StatusOK)
 	json.NewEncoder(w).Encode(apiNode)
-	runUpdates(&node, true)
+	//runUpdates(&node, true)
 }
 
 // swagger:route DELETE /api/nodes/{network}/{nodeid}/deleterelay nodes deleteRelay
@@ -81,12 +96,23 @@ func deleteRelay(w http.ResponseWriter, r *http.Request) {
 		return
 	}
 	logger.Log(1, r.Header.Get("user"), "deleted relay server", nodeid, "on network", netid)
-	for _, relayedNode := range updatenodes {
-		err = mq.NodeUpdate(&relayedNode)
-		if err != nil {
-			logger.Log(1, "error sending update to relayed node ", relayedNode.ID.String(), "on network", netid, ": ", err.Error())
+	go func() {
+		host := logic.GetHostByNodeID(node.ID.String())
+		if err := mq.PublishSingleHostPeerUpdate(logic.PeerUpdateCtx, host, nil, []models.ExtClient{}); err != nil {
+			logger.Log(1, "peer update to relayed node ", host.Name, "on network", netid, ": ", err.Error())
 		}
-	}
+		for _, relayedNode := range updatenodes {
+			host := logic.GetHostByNodeID(relayedNode.ID.String())
+
+			err = mq.NodeUpdate(&relayedNode)
+			if err != nil {
+				logger.Log(1, "error sending update to relayed node ", relayedNode.ID.String(), "on network", netid, ": ", err.Error())
+			}
+			if err := mq.PublishSingleHostPeerUpdate(logic.PeerUpdateCtx, host, nil, []models.ExtClient{}); err != nil {
+				logger.Log(1, "peer update to relayed node ", host.Name, "on network", netid, ": ", err.Error())
+			}
+		}
+	}()
 	apiNode := node.ConvertToAPINode()
 	w.WriteHeader(http.StatusOK)
 	json.NewEncoder(w).Encode(apiNode)

+ 19 - 0
logic/relay.go

@@ -66,6 +66,8 @@ func SetRelayedNodes(setRelayed bool, relay string, relayed []string) []models.N
 		node.IsRelayed = setRelayed
 		if node.IsRelayed {
 			node.RelayedBy = relay
+		} else {
+			node.RelayedBy = ""
 		}
 		node.SetLastModified()
 		data, err := json.Marshal(&node)
@@ -119,6 +121,23 @@ func DeleteRelay(network, nodeid string) ([]models.Node, models.Node, error) {
 	return returnnodes, node, nil
 }
 
+// GetNetworkClients - gets all clients in a network
+func GetNetworkClients(network string) []models.Client {
+	var clients []models.Client
+	nodes, err := GetNetworkNodes(network)
+	if err != nil {
+		return clients
+	}
+	for _, node := range nodes {
+		host := GetHostByNodeID(node.ID.String())
+		client := models.Client{
+			Host: *host,
+			Node: node,
+		}
+		clients = append(clients, client)
+	}
+	return clients
+}
 func getRelayedAddresses(id string) []net.IPNet {
 	addrs := []net.IPNet{}
 	node, err := GetNodeByID(id)

+ 7 - 0
models/host.go

@@ -75,6 +75,13 @@ type Host struct {
 	TurnEndpoint     *netip.AddrPort  `json:"turn_endpoint,omitempty" yaml:"turn_endpoint,omitempty"`
 }
 
+// Client - represents a client on the network
+type Client struct {
+	Host Host   `json:"host" yaml:"host"`
+	Node Node   `json:"node" yaml:"node"`
+	Kind string `json:"kind" yaml:"kind"`
+}
+
 // FormatBool converts a boolean to a [yes|no] string
 func FormatBool(b bool) string {
 	s := "no"

+ 89 - 0
mq/publishers.go

@@ -5,12 +5,14 @@ import (
 	"encoding/json"
 	"errors"
 	"fmt"
+	"net"
 	"time"
 
 	"github.com/gravitl/netmaker/logger"
 	"github.com/gravitl/netmaker/logic"
 	"github.com/gravitl/netmaker/models"
 	"github.com/gravitl/netmaker/servercfg"
+	"golang.zx2c4.com/wireguard/wgctrl/wgtypes"
 )
 
 // PublishPeerUpdate --- determines and publishes a peer update to all the hosts
@@ -346,6 +348,30 @@ func PublishHostDNSUpdate(old, new *models.Host, networks []string) error {
 	return nil
 }
 
+// PublishRelayUpdate publishes peer update on relay changes
+func PublishRelayNew(relay *models.Client, clients *[]models.Client) error {
+	relayed := []models.Client{}
+	for _, client := range *clients {
+		var update []wgtypes.PeerConfig
+		if client.Host.ID == relay.Host.ID {
+			client.Kind = "relay"
+		}
+		if client.Node.RelayedBy == relay.Node.ID.String() {
+			client.Kind = "relayed"
+			update = setPeersForRelayedNode(*relay, client, clients)
+		} else {
+			client.Kind = "normal"
+			update = setPeersForUnrelayedNode(relayed, *relay, client, clients)
+		}
+		data, err := json.Marshal(update)
+		if err != nil {
+			return err
+		}
+		publish(&client.Host, fmt.Sprintf("relay/new/%s/%s", client.Host.ID.String(), servercfg.GetServer()), data)
+	}
+	return nil
+}
+
 func pushMetricsToExporter(metrics models.Metrics) error {
 	logger.Log(2, "----> Pushing metrics to exporter")
 	data, err := json.Marshal(metrics)
@@ -461,3 +487,66 @@ func sendPeers() {
 		}
 	}
 }
+
+func setPeersForRelayedNode(relay, relayed models.Client, clients *[]models.Client) []wgtypes.PeerConfig {
+	perrUpdate := []wgtypes.PeerConfig{}
+	relayUpdate := wgtypes.PeerConfig{
+		PublicKey:         relay.Host.PublicKey,
+		ReplaceAllowedIPs: true,
+	}
+	if relayed.Node.Address.IP != nil {
+		relayUpdate.AllowedIPs = append(relayUpdate.AllowedIPs, relayed.Node.Address)
+	}
+	if relayed.Node.Address6.IP != nil {
+		relayUpdate.AllowedIPs = append(relayUpdate.AllowedIPs, relayed.Node.Address6)
+	}
+	for _, c := range *clients {
+		update := wgtypes.PeerConfig{
+			PublicKey: c.Host.PublicKey,
+			Remove:    true,
+		}
+		perrUpdate = append(perrUpdate, update)
+		if c.Node.Address.IP != nil {
+			relayUpdate.AllowedIPs = append(relayUpdate.AllowedIPs, c.Node.Address)
+		}
+		if c.Node.Address6.IP != nil {
+			relayUpdate.AllowedIPs = append(relayUpdate.AllowedIPs, c.Node.Address6)
+		}
+	}
+	perrUpdate = append(perrUpdate, relayUpdate)
+	return perrUpdate
+}
+
+func setPeersForUnrelayedNode(relayedNodes []models.Client, relay, client models.Client, clients *[]models.Client) []wgtypes.PeerConfig {
+	perrUpdate := []wgtypes.PeerConfig{}
+	relayUpdate := wgtypes.PeerConfig{
+		PublicKey:         relay.Host.PublicKey,
+		ReplaceAllowedIPs: true,
+		AllowedIPs:        []net.IPNet{},
+	}
+	for _, relayed := range relayedNodes {
+		//remove relayed nodes from peer list
+		relayedUpdate := wgtypes.PeerConfig{
+			PublicKey: relayed.Host.PublicKey,
+			Remove:    true,
+		}
+		perrUpdate = append(perrUpdate, relayedUpdate)
+		// add relayed nodes to relay allowed ips
+		if relayed.Node.Address.IP != nil {
+			relayUpdate.AllowedIPs = append(relayUpdate.AllowedIPs, relayed.Node.Address)
+		}
+		if relayed.Node.Address6.IP != nil {
+			relayUpdate.AllowedIPs = append(relayUpdate.AllowedIPs, relayed.Node.Address6)
+		}
+	}
+	// add client addresses to relay allowed ips
+	if client.Node.Address.IP != nil {
+		relayUpdate.AllowedIPs = append(relayUpdate.AllowedIPs, client.Node.Address)
+	}
+	if client.Node.Address6.IP != nil {
+		relayUpdate.AllowedIPs = append(relayUpdate.AllowedIPs, client.Node.Address6)
+	}
+	perrUpdate = append(perrUpdate, relayUpdate)
+
+	return perrUpdate
+}