Преглед изворни кода

preliminary working peer updates for relays

Matthew R Kasun пре 2 година
родитељ
комит
f8077a4706
11 измењених фајлова са 567 додато и 181 уклоњено
  1. 1 1
      auth/host_session.go
  2. 4 4
      controllers/node.go
  3. 44 48
      controllers/relay.go
  4. 3 1
      go.mod
  5. 14 0
      logic/clients.go
  6. 17 0
      logic/nodes.go
  7. 34 42
      logic/relay.go
  8. 2 3
      models/host.go
  9. 2 2
      models/mqtt.go
  10. 437 74
      mq/publishers.go
  11. 9 6
      mq/util.go

+ 1 - 1
auth/host_session.go

@@ -237,7 +237,7 @@ func CheckNetRegAndHostUpdate(networks []string, h *models.Host) {
 				Host:   *h,
 				Node:   *newNode,
 			})
-			mq.PublishPeerAction(h)
+			//mq.PublishPeerAction(h)
 		}
 	}
 	if servercfg.IsMessageQueueBackend() {

+ 4 - 4
controllers/node.go

@@ -670,10 +670,10 @@ func updateNode(w http.ResponseWriter, r *http.Request) {
 		return
 	}
 	if relayupdate {
-		updatenodes := logic.UpdateRelayed(currentNode.ID.String(), currentNode.RelayedNodes, newNode.RelayedNodes)
-		if len(updatenodes) > 0 {
-			for _, relayedNode := range updatenodes {
-				runUpdates(&relayedNode, false)
+		updatedClients := logic.UpdateRelayed(currentNode.ID.String(), currentNode.RelayedNodes, newNode.RelayedNodes)
+		if len(updatedClients) > 0 {
+			for _, relayedClient := range updatedClients {
+				runUpdates(&relayedClient.Node, false)
 			}
 		}
 	}

+ 44 - 48
controllers/relay.go

@@ -24,50 +24,46 @@ import (
 //			Responses:
 //				200: nodeResponse
 func createRelay(w http.ResponseWriter, r *http.Request) {
-	var relay models.RelayRequest
+	var relayRequest models.RelayRequest
 	var params = mux.Vars(r)
 	w.Header().Set("Content-Type", "application/json")
-	err := json.NewDecoder(r.Body).Decode(&relay)
+	err := json.NewDecoder(r.Body).Decode(&relayRequest)
 	if err != nil {
 		logger.Log(0, r.Header.Get("user"), "error decoding request body: ", err.Error())
 		logic.ReturnErrorResponse(w, r, logic.FormatError(err, "badrequest"))
 		return
 	}
-	relay.NetID = params["network"]
-	relay.NodeID = params["nodeid"]
-	updatenodes, node, err := logic.CreateRelay(relay)
+
+	relayRequest.NetID = params["network"]
+	relayRequest.NodeID = params["nodeid"]
+	_, relayNode, err := logic.CreateRelay(relayRequest)
 	if err != nil {
 		logger.Log(0, r.Header.Get("user"),
-			fmt.Sprintf("failed to create relay on node [%s] on network [%s]: %v", relay.NodeID, relay.NetID, err))
+			fmt.Sprintf("failed to create relay on node [%s] on network [%s]: %v", relayRequest.NodeID, relayRequest.NetID, err))
 		logic.ReturnErrorResponse(w, r, logic.FormatError(err, "internal"))
 		return
 	}
-	go func() {
-		// update relay node
-		host := logic.GetHostByNodeID(node.ID.String())
-		if err := mq.NodeUpdate(&node); err != nil {
-			logger.Log(1, "relay node update", host.Name, "on network", relay.NetID, ": ", err.Error())
-		}
-		// update relayed nodes
-		for _, relayedNode := range updatenodes {
-			err = mq.NodeUpdate(&relayedNode)
-			if err != nil {
-				logger.Log(1, "relayed node update ", relayedNode.ID.String(), "on network", relay.NetID, ": ", err.Error())
-			}
-		}
-		// peer updates
-		relay := models.Client{
-			Host: *host,
-			Node: node,
-		}
-		clients := logic.GetNetworkClients(relay.Node.Network)
-		if err := mq.PublishRelayPeerUpdate(&relay, &clients); err != nil {
-			logger.Log(1, "peer update to relayed node ", host.Name, "on network", relay.Node.Network, ": ", err.Error())
-		}
-	}()
+	relay := models.Client{
+		Host: *logic.GetHostByNodeID(params["nodeid"]),
+		Node: relayNode,
+	}
+	peers, err := logic.GetNetworkClients(relay.Node.Network)
+	if err != nil {
+		logger.Log(0, "error getting network nodes: ", err.Error())
+		logic.ReturnErrorResponse(w, r, logic.FormatError(err, "internal"))
+		return
+	}
+	//mq.PubPeersforRelay(relay, peers)
+	//for _, relayed := range relayedClients {
+	//mq.PubPeersForRelayedNode(relayed, relay, peers)
+	//}
+	clients := peers
+	for _, client := range clients {
+		mq.PubPeerUpdate(&client, &relay, &peers)
+	}
 
-	logger.Log(1, r.Header.Get("user"), "created relay on node", relay.NodeID, "on network", relay.NetID)
-	apiNode := node.ConvertToAPINode()
+	logger.Log(1, r.Header.Get("user"), "created relay on node", relayRequest.NodeID, "on network", relayRequest.NetID)
+	apiNode := relayNode.ConvertToAPINode()
 	w.WriteHeader(http.StatusOK)
 	json.NewEncoder(w).Encode(apiNode)
 	//runUpdates(&node, true)
@@ -89,7 +85,7 @@ func deleteRelay(w http.ResponseWriter, r *http.Request) {
 	var params = mux.Vars(r)
 	nodeid := params["nodeid"]
 	netid := params["network"]
-	updatenodes, node, err := logic.DeleteRelay(netid, nodeid)
+	updateClients, node, err := logic.DeleteRelay(netid, nodeid)
 	if err != nil {
 		logger.Log(0, r.Header.Get("user"), "error decoding request body: ", err.Error())
 		logic.ReturnErrorResponse(w, r, logic.FormatError(err, "badrequest"))
@@ -97,32 +93,32 @@ func deleteRelay(w http.ResponseWriter, r *http.Request) {
 	}
 	logger.Log(1, r.Header.Get("user"), "deleted relay server", nodeid, "on network", netid)
 	go func() {
-		//update relay node
-		host := logic.GetHostByNodeID(node.ID.String())
+		//update relayHost node
+		relayHost := logic.GetHostByNodeID(node.ID.String())
 		if err := mq.NodeUpdate(&node); err != nil {
-			logger.Log(1, "relay node update", host.Name, "on network", node.Network, ": ", err.Error())
+			logger.Log(1, "relay node update", relayHost.Name, "on network", node.Network, ": ", err.Error())
 		}
-		//update relayed nodes
-		for _, relayedNode := range updatenodes {
-			err = mq.NodeUpdate(&relayedNode)
+		for _, relayedClient := range updateClients {
+			err = mq.NodeUpdate(&relayedClient.Node)
 			if err != nil {
-				logger.Log(1, "relayed node update ", relayedNode.ID.String(), "on network", relayedNode.Network, ": ", err.Error())
+				logger.Log(1, "relayed node update ", relayedClient.Node.ID.String(), "on network", relayedClient.Node.Network, ": ", err.Error())
+
 			}
 		}
-		// peer updates
-		relay := models.Client{
-			Host: *host,
-			Node: node,
+		peers, err := logic.GetNetworkClients(node.Network)
+		if err != nil {
+			logger.Log(0, "error getting network nodes: ", err.Error())
+			logic.ReturnErrorResponse(w, r, logic.FormatError(err, "internal"))
+			return
 		}
-		clients := logic.GetNetworkClients(relay.Node.Network)
-		if err := mq.PublishRelayPeerUpdate(&relay, &clients); err != nil {
-			logger.Log(1, "peer update to relayed node ", host.Name, "on network", relay.Node.Network, ": ", err.Error())
+		clients := peers
+		for _, client := range clients {
+			mq.PubPeerUpdate(&client, nil, &peers)
 		}
 	}()
-
 	logger.Log(1, r.Header.Get("user"), "deleted relay on node", node.ID.String(), "on network", node.Network)
 	apiNode := node.ConvertToAPINode()
 	w.WriteHeader(http.StatusOK)
 	json.NewEncoder(w).Encode(apiNode)
-	//runUpdates(&node, true)
+	runUpdates(&node, true)
 }

+ 3 - 1
go.mod

@@ -43,6 +43,7 @@ require (
 require (
 	github.com/devilcove/httpclient v0.6.0
 	github.com/guumaster/tablewriter v0.0.10
+	github.com/kr/pretty v0.3.1
 	github.com/matryer/is v1.4.1
 	github.com/olekukonko/tablewriter v0.0.5
 	github.com/spf13/cobra v1.7.0
@@ -52,8 +53,9 @@ require (
 	cloud.google.com/go/compute/metadata v0.2.1 // indirect
 	github.com/go-jose/go-jose/v3 v3.0.0 // indirect
 	github.com/inconshreveable/mousetrap v1.1.0 // indirect
-	github.com/kr/pretty v0.3.1 // indirect
+	github.com/kr/text v0.2.0 // indirect
 	github.com/rivo/uniseg v0.2.0 // indirect
+	github.com/rogpeppe/go-internal v1.9.0 // indirect
 	github.com/spf13/pflag v1.0.5 // indirect
 )
 

+ 14 - 0
logic/clients.go

@@ -62,3 +62,17 @@ func SortExtClient(unsortedExtClient []models.ExtClient) {
 		return unsortedExtClient[i].ClientID < unsortedExtClient[j].ClientID
 	})
 }
+
+func getExtClients(node models.Node) []models.ExtClient {
+	var extClients []models.ExtClient
+	netec, err := GetNetworkExtClients(node.Network)
+	if err != nil {
+		return extClients
+	}
+	for _, ec := range netec {
+		if ec.IngressGatewayID == node.ID.String() {
+			extClients = append(extClients, ec)
+		}
+	}
+	return extClients
+}

+ 17 - 0
logic/nodes.go

@@ -41,6 +41,23 @@ func GetNetworkNodes(network string) ([]models.Node, error) {
 	return GetNetworkNodesMemory(allnodes, network), nil
 }
 
+// GetNetworkClients - gets the clients of a network
+func GetNetworkClients(network string) ([]models.Client, error) {
+	clients := []models.Client{}
+	nodes, err := GetNetworkNodes(network)
+	if err != nil {
+		return []models.Client{}, err
+	}
+	for _, node := range nodes {
+		client := models.Client{
+			Node: node,
+			Host: *GetHostByNodeID(node.ID.String()),
+		}
+		clients = append(clients, client)
+	}
+	return clients, nil
+}
+
 // GetNetworkNodesMemory - gets all nodes belonging to a network from list in memory
 func GetNetworkNodesMemory(allNodes []models.Node, network string) []models.Node {
 	var nodes = []models.Node{}

+ 34 - 42
logic/relay.go

@@ -4,6 +4,7 @@ import (
 	"encoding/json"
 	"errors"
 	"fmt"
+	"log"
 	"net"
 
 	"github.com/gravitl/netmaker/database"
@@ -12,51 +13,52 @@ import (
 )
 
 // CreateRelay - creates a relay
-func CreateRelay(relay models.RelayRequest) ([]models.Node, models.Node, error) {
-	var returnnodes []models.Node
-
+func CreateRelay(relay models.RelayRequest) ([]models.Client, models.Node, error) {
+	var relayedClients []models.Client
 	node, err := GetNodeByID(relay.NodeID)
 	if err != nil {
-		return returnnodes, models.Node{}, err
+		return relayedClients, models.Node{}, err
 	}
 	host, err := GetHost(node.HostID.String())
 	if err != nil {
-		return returnnodes, models.Node{}, err
+		return relayedClients, models.Node{}, err
 	}
 	if host.OS != "linux" {
-		return returnnodes, models.Node{}, fmt.Errorf("only linux machines can be relay nodes")
+		return relayedClients, models.Node{}, fmt.Errorf("only linux machines can be relay nodes")
 	}
 	err = ValidateRelay(relay)
 	if err != nil {
-		return returnnodes, models.Node{}, err
+		return relayedClients, models.Node{}, err
 	}
 	node.IsRelay = true
+	node.RelayedNodes = relay.RelayedNodes
 	node.SetLastModified()
 	nodeData, err := json.Marshal(&node)
 	if err != nil {
-		return returnnodes, node, err
+		return relayedClients, node, err
 	}
 	if err = database.Insert(node.ID.String(), string(nodeData), database.NODES_TABLE_NAME); err != nil {
-		return returnnodes, models.Node{}, err
+		return relayedClients, models.Node{}, err
 	}
-	returnnodes = SetRelayedNodes(true, relay.NodeID, relay.RelayedNodes)
-	for _, relayedNode := range returnnodes {
-		data, err := json.Marshal(&relayedNode)
+	relayedClients = SetRelayedNodes(true, relay.NodeID, relay.RelayedNodes)
+	for _, relayed := range relayedClients {
+		data, err := json.Marshal(&relayed.Node)
 		if err != nil {
 			logger.Log(0, "marshalling relayed node", err.Error())
 			continue
 		}
-		if err := database.Insert(relayedNode.ID.String(), string(data), database.NODES_TABLE_NAME); err != nil {
+		if err := database.Insert(relayed.Node.ID.String(), string(data), database.NODES_TABLE_NAME); err != nil {
 			logger.Log(0, "inserting relayed node", err.Error())
 			continue
 		}
 	}
-	return returnnodes, node, nil
+	return relayedClients, node, nil
 }
 
 // SetRelayedNodes- sets and saves node as relayed
-func SetRelayedNodes(setRelayed bool, relay string, relayed []string) []models.Node {
-	var returnnodes []models.Node
+func SetRelayedNodes(setRelayed bool, relay string, relayed []string) []models.Client {
+	log.Println("setting relayed nodes", setRelayed, relay, relayed)
+	var returnnodes []models.Client
 	for _, id := range relayed {
 		node, err := GetNodeByID(id)
 		if err != nil {
@@ -70,6 +72,7 @@ func SetRelayedNodes(setRelayed bool, relay string, relayed []string) []models.N
 			node.RelayedBy = ""
 		}
 		node.SetLastModified()
+		log.Println("setting relayed nodes", node.ID.String(), node.IsRelayed, node.RelayedBy)
 		data, err := json.Marshal(&node)
 		if err != nil {
 			logger.Log(0, "setRelayedNodes.Marshal", err.Error())
@@ -79,7 +82,11 @@ func SetRelayedNodes(setRelayed bool, relay string, relayed []string) []models.N
 			logger.Log(0, "setRelayedNodes.Insert", err.Error())
 			continue
 		}
-		returnnodes = append(returnnodes, node)
+		host := GetHostByNodeID(node.ID.String())
+		returnnodes = append(returnnodes, models.Client{
+			Host: *host,
+			Node: node,
+		})
 	}
 	return returnnodes
 }
@@ -96,48 +103,32 @@ func ValidateRelay(relay models.RelayRequest) error {
 }
 
 // UpdateRelayed - updates relay nodes
-func UpdateRelayed(relay string, oldNodes []string, newNodes []string) []models.Node {
+func UpdateRelayed(relay string, oldNodes []string, newNodes []string) []models.Client {
 	_ = SetRelayedNodes(false, relay, oldNodes)
 	return SetRelayedNodes(true, relay, newNodes)
 }
 
 // DeleteRelay - deletes a relay
-func DeleteRelay(network, nodeid string) ([]models.Node, models.Node, error) {
-	var returnnodes []models.Node
+func DeleteRelay(network, nodeid string) ([]models.Client, models.Node, error) {
+	var returnClients []models.Client
 	node, err := GetNodeByID(nodeid)
 	if err != nil {
-		return returnnodes, models.Node{}, err
+		return returnClients, models.Node{}, err
 	}
-	returnnodes = SetRelayedNodes(false, nodeid, node.RelayedNodes)
+	returnClients = SetRelayedNodes(false, nodeid, node.RelayedNodes)
 	node.IsRelay = false
+	node.RelayedNodes = []string{}
 	node.SetLastModified()
 	data, err := json.Marshal(&node)
 	if err != nil {
-		return returnnodes, models.Node{}, err
+		return returnClients, models.Node{}, err
 	}
 	if err = database.Insert(nodeid, string(data), database.NODES_TABLE_NAME); err != nil {
-		return returnnodes, models.Node{}, err
+		return returnClients, models.Node{}, err
 	}
-	return returnnodes, node, nil
+	return returnClients, node, nil
 }
 
-// GetNetworkClients - gets all clients in a network
-func GetNetworkClients(network string) []models.Client {
-	var clients []models.Client
-	nodes, err := GetNetworkNodes(network)
-	if err != nil {
-		return clients
-	}
-	for _, node := range nodes {
-		host := GetHostByNodeID(node.ID.String())
-		client := models.Client{
-			Host: *host,
-			Node: node,
-		}
-		clients = append(clients, client)
-	}
-	return clients
-}
 func getRelayedAddresses(id string) []net.IPNet {
 	addrs := []net.IPNet{}
 	node, err := GetNodeByID(id)
@@ -151,5 +142,6 @@ func getRelayedAddresses(id string) []net.IPNet {
 	if node.Address6.IP != nil {
 		addrs = append(addrs, node.Address6)
 	}
+	log.Println("====================== relayed addresses", addrs)
 	return addrs
 }

+ 2 - 3
models/host.go

@@ -77,9 +77,8 @@ type Host struct {
 
 // Client - represents a client on the network
 type Client struct {
-	Host Host   `json:"host" yaml:"host"`
-	Node Node   `json:"node" yaml:"node"`
-	Kind string `json:"kind" yaml:"kind"`
+	Host Host `json:"host" yaml:"host"`
+	Node Node `json:"node" yaml:"node"`
 }
 
 // FormatBool converts a boolean to a [yes|no] string

+ 2 - 2
models/mqtt.go

@@ -70,6 +70,6 @@ const (
 )
 
 type PeerAction struct {
-	Action PeerActionType     `json:"action"`
-	Peer   wgtypes.PeerConfig `json:"peer"`
+	Action PeerActionType       `json:"action"`
+	Peers  []wgtypes.PeerConfig `json:"peers"`
 }

+ 437 - 74
mq/publishers.go

@@ -5,6 +5,7 @@ import (
 	"encoding/json"
 	"errors"
 	"fmt"
+	"log"
 	"net"
 	"time"
 
@@ -12,6 +13,7 @@ import (
 	"github.com/gravitl/netmaker/logic"
 	"github.com/gravitl/netmaker/models"
 	"github.com/gravitl/netmaker/servercfg"
+	"github.com/kr/pretty"
 	"golang.zx2c4.com/wireguard/wgctrl/wgtypes"
 )
 
@@ -118,9 +120,11 @@ func BroadCastDelPeer(host *models.Host, network string) error {
 	}
 	p := models.PeerAction{
 		Action: models.RemovePeer,
-		Peer: wgtypes.PeerConfig{
-			PublicKey: host.PublicKey,
-			Remove:    true,
+		Peers: []wgtypes.PeerConfig{
+			{
+				PublicKey: host.PublicKey,
+				Remove:    true,
+			},
 		},
 	}
 	data, err := json.Marshal(p)
@@ -144,14 +148,16 @@ func BroadCastAddPeer(host *models.Host, node *models.Node, network string, upda
 
 	p := models.PeerAction{
 		Action: models.AddPeer,
-		Peer: wgtypes.PeerConfig{
-			PublicKey: host.PublicKey,
-			Endpoint: &net.UDPAddr{
-				IP:   host.EndpointIP,
-				Port: logic.GetPeerListenPort(host),
+		Peers: []wgtypes.PeerConfig{
+			{
+				PublicKey: host.PublicKey,
+				Endpoint: &net.UDPAddr{
+					IP:   host.EndpointIP,
+					Port: logic.GetPeerListenPort(host),
+				},
+				PersistentKeepaliveInterval: &node.PersistentKeepalive,
+				ReplaceAllowedIPs:           true,
 			},
-			PersistentKeepaliveInterval: &node.PersistentKeepalive,
-			ReplaceAllowedIPs:           true,
 		},
 	}
 	if update {
@@ -159,7 +165,7 @@ func BroadCastAddPeer(host *models.Host, node *models.Node, network string, upda
 	}
 	for _, nodeI := range nodes {
 		// update allowed ips, according to the peer node
-		p.Peer.AllowedIPs = logic.GetAllowedIPs(&nodeI, node, nil)
+		p.Peers[0].AllowedIPs = logic.GetAllowedIPs(&nodeI, node, nil)
 		data, err := json.Marshal(p)
 		if err != nil {
 			continue
@@ -410,30 +416,6 @@ func PublishHostDNSUpdate(old, new *models.Host, networks []string) error {
 	return nil
 }
 
-// PublishRelayPeerUpdate publishes peer update on relay changes
-func PublishRelayPeerUpdate(relay *models.Client, clients *[]models.Client) error {
-	relayed := []models.Client{}
-	for _, client := range *clients {
-		var update []wgtypes.PeerConfig
-		if client.Host.ID == relay.Host.ID {
-			client.Kind = "relay"
-		}
-		if client.Node.RelayedBy == relay.Node.ID.String() {
-			client.Kind = "relayed"
-			update = setPeersForRelayedNode(*relay, client, clients)
-		} else {
-			client.Kind = "normal"
-			update = setPeersForUnrelayedNode(relayed, *relay, client, clients)
-		}
-		data, err := json.Marshal(update)
-		if err != nil {
-			return err
-		}
-		publish(&client.Host, fmt.Sprintf("relay/new/%s/%s", client.Host.ID.String(), servercfg.GetServer()), data)
-	}
-	return nil
-}
-
 func pushMetricsToExporter(metrics models.Metrics) error {
 	logger.Log(2, "----> Pushing metrics to exporter")
 	data, err := json.Marshal(metrics)
@@ -550,65 +532,446 @@ func sendPeers() {
 	}
 }
 
-func setPeersForRelayedNode(relay, relayed models.Client, clients *[]models.Client) []wgtypes.PeerConfig {
-	perrUpdate := []wgtypes.PeerConfig{}
-	relayUpdate := wgtypes.PeerConfig{
+func PubPeersforRelay(relay models.Client, peers []models.Client) {
+	for _, peer := range peers {
+		if peer.Host.ID == relay.Host.ID {
+			continue
+		}
+		update := wgtypes.PeerConfig{
+			PublicKey:         peer.Host.PublicKey,
+			ReplaceAllowedIPs: true,
+			Endpoint: &net.UDPAddr{
+				IP:   peer.Host.EndpointIP,
+				Port: peer.Host.ListenPort,
+			},
+		}
+		if peer.Node.Address.IP != nil {
+			update.AllowedIPs = append(update.AllowedIPs, relay.Node.Address)
+		}
+		if peer.Node.Address6.IP != nil {
+			update.AllowedIPs = append(update.AllowedIPs, relay.Node.Address6)
+		}
+		update.PersistentKeepaliveInterval = &relay.Node.PersistentKeepalive
+		data, err := json.Marshal(update)
+		if err != nil {
+			continue
+		}
+		pretty.Println("publishing peer update for relay", update)
+		publish(&relay.Host, fmt.Sprintf("peer/host/%s/%s", relay.Host.ID.String(), servercfg.GetServer()), data)
+	}
+}
+
+func PubPeersForRelayedNode(relayed, relay models.Client, peers []models.Client) {
+	update := wgtypes.PeerConfig{
 		PublicKey:         relay.Host.PublicKey,
 		ReplaceAllowedIPs: true,
+		Endpoint: &net.UDPAddr{
+			IP:   relay.Host.EndpointIP,
+			Port: relay.Host.ListenPort,
+		},
 	}
 	if relayed.Node.Address.IP != nil {
-		relayUpdate.AllowedIPs = append(relayUpdate.AllowedIPs, relayed.Node.Address)
+		update.AllowedIPs = append(update.AllowedIPs, relayed.Node.Address)
 	}
 	if relayed.Node.Address6.IP != nil {
-		relayUpdate.AllowedIPs = append(relayUpdate.AllowedIPs, relayed.Node.Address6)
+		update.AllowedIPs = append(update.AllowedIPs, relayed.Node.Address6)
+	}
+	update.PersistentKeepaliveInterval = &relayed.Node.PersistentKeepalive
+
+	for _, peer := range peers {
+		if peer.Node.Address.IP != nil {
+			update.AllowedIPs = append(update.AllowedIPs, peer.Node.Address)
+		}
+		if peer.Node.Address6.IP != nil {
+			update.AllowedIPs = append(update.AllowedIPs, peer.Node.Address6)
+		}
+		if peer.Node.IsEgressGateway {
+			for _, egressRange := range peer.Node.EgressGatewayRanges {
+				ip, cidr, err := net.ParseCIDR(egressRange)
+				if err != nil {
+					continue
+				}
+				update.AllowedIPs = append(update.AllowedIPs, net.IPNet{IP: ip, Mask: cidr.Mask})
+			}
+		}
+		if peer.Node.IsIngressGateway {
+			extclients, err := logic.GetNetworkExtClients(peer.Node.Network)
+			if err != nil {
+				continue
+			}
+			for _, ec := range extclients {
+				if ec.IngressGatewayID == peer.Node.ID.String() {
+					ip, cidr, err := net.ParseCIDR(ec.Address)
+					if err != nil {
+						continue
+					}
+					update.AllowedIPs = append(update.AllowedIPs, net.IPNet{IP: ip, Mask: cidr.Mask})
+				}
+			}
+		}
+		data, err := json.Marshal(update)
+		if err != nil {
+			continue
+		}
+		pretty.Println("publishing peer update for relayed node", update)
+		publish(&relayed.Host, fmt.Sprintf("peer/host/%s/%s", relayed.Host.ID.String(), servercfg.GetServer()), data)
 	}
-	for _, c := range *clients {
+
+}
+
+func PubPeersForUnrelayedNode(client, relay *models.Client, peers, relayedClients *[]models.Client) {
+	for _, peer := range *peers {
+		// remove nodes relayed by the relay from list of peers
+		if peer.Node.RelayedBy == relay.Host.ID.String() {
+			update := wgtypes.PeerConfig{
+				PublicKey: peer.Host.PublicKey,
+				Remove:    true,
+			}
+			data, err := json.Marshal(update)
+			if err != nil {
+				continue
+			}
+			pretty.Println("publishing peer update for relayed node", update)
+			publish(&client.Host, fmt.Sprintf("peer/host/%s/%s", client.Host.ID.String(), servercfg.GetServer()), data)
+			// add relay addresses and all rela1yed nodes
+		} else if peer.Host.ID == relay.Host.ID {
+			update := wgtypes.PeerConfig{
+				PublicKey:         relay.Host.PublicKey,
+				ReplaceAllowedIPs: true,
+				Endpoint: &net.UDPAddr{
+					IP:   relay.Host.EndpointIP,
+					Port: relay.Host.ListenPort,
+				},
+				PersistentKeepaliveInterval: &relay.Node.PersistentKeepalive,
+			}
+
+			if relay.Node.Address.IP != nil {
+				update.AllowedIPs = append(update.AllowedIPs, relay.Node.Address)
+			}
+			if relay.Node.Address6.IP != nil {
+				update.AllowedIPs = append(update.AllowedIPs, relay.Node.Address6)
+			}
+			for _, peer := range *relayedClients {
+				if peer.Node.Address.IP != nil {
+					update.AllowedIPs = append(update.AllowedIPs, peer.Node.Address)
+				}
+				if peer.Node.Address6.IP != nil {
+					update.AllowedIPs = append(update.AllowedIPs, peer.Node.Address6)
+				}
+			}
+			data, err := json.Marshal(update)
+			if err == nil {
+				pretty.Println("publishing peer update for relayed node", update)
+				publish(&relay.Host, fmt.Sprintf("peer/host/%s/%s", relay.Host.ID.String(), servercfg.GetServer()), data)
+			}
+			return
+		} else {
+			update := wgtypes.PeerConfig{
+				PublicKey:         peer.Host.PublicKey,
+				ReplaceAllowedIPs: true,
+				Endpoint: &net.UDPAddr{
+					IP:   peer.Host.EndpointIP,
+					Port: peer.Host.ListenPort,
+				},
+				PersistentKeepaliveInterval: &peer.Node.PersistentKeepalive,
+			}
+			if peer.Node.Address.IP != nil {
+				update.AllowedIPs = append(update.AllowedIPs, peer.Node.Address)
+			}
+			if peer.Node.Address6.IP != nil {
+				update.AllowedIPs = append(update.AllowedIPs, peer.Node.Address6)
+			}
+			data, err := json.Marshal(update)
+			if err == nil {
+				pretty.Println("publishing peer update for unrelayed node", update)
+				publish(&client.Host, fmt.Sprintf("peer/host/%s/%s", client.Host.ID.String(), servercfg.GetServer()), data)
+			}
+		}
+	}
+}
+
+func PubPeerUpdate(client, relay *models.Client, peers *[]models.Client) {
+	fmt.Println("calculating peer update for", client.Host.Name, " with relay ")
+	if relay != nil {
+		fmt.Println(relay.Host.Name, " with relayed nodes ", relay.Node.RelayedNodes)
+	} else {
+		fmt.Println("no relay")
+	}
+
+	p := models.PeerAction{
+		Action: models.UpdatePeer,
+	}
+	if client.Node.IsRelay {
+		pubRelayUpdate(client, peers)
+		return
+	}
+	if relay != nil {
+		if logic.StringSliceContains(relay.Node.RelayedNodes, client.Node.ID.String()) {
+			pubRelayedUpdate(client, relay, peers)
+			return
+		}
+	}
+	for _, peer := range *peers {
+		fmt.Println("peer: ", peer.Host.Name)
+		if client.Host.ID == peer.Host.ID {
+			continue
+		}
 		update := wgtypes.PeerConfig{
-			PublicKey: c.Host.PublicKey,
-			Remove:    true,
+			PublicKey:         peer.Host.PublicKey,
+			ReplaceAllowedIPs: true,
+			Endpoint: &net.UDPAddr{
+				IP:   peer.Host.EndpointIP,
+				Port: peer.Host.ListenPort,
+			},
+			PersistentKeepaliveInterval: &peer.Node.PersistentKeepalive,
 		}
-		perrUpdate = append(perrUpdate, update)
-		if c.Node.Address.IP != nil {
-			relayUpdate.AllowedIPs = append(relayUpdate.AllowedIPs, c.Node.Address)
+		if peer.Node.IsRelay {
+			fmt.Println("processing relay peer")
+			update.AllowedIPs = append(update.AllowedIPs, getRelayIPs(peer)...)
+			fmt.Println("adding relay ips", update.AllowedIPs)
+		}
+		if relay != nil {
+			if peer.Node.IsRelayed && peer.Node.RelayedBy == relay.Node.ID.String() {
+				fmt.Println("removing relayed peer", peer.Host.Name, " from ", client.Host.Name)
+				update.Remove = true
+			}
 		}
-		if c.Node.Address6.IP != nil {
-			relayUpdate.AllowedIPs = append(relayUpdate.AllowedIPs, c.Node.Address6)
+		if peer.Node.Address.IP != nil {
+			peer.Node.Address.Mask = net.CIDRMask(32, 32)
+			update.AllowedIPs = append(update.AllowedIPs, peer.Node.Address)
 		}
+		if peer.Node.Address6.IP != nil {
+			peer.Node.Address.Mask = net.CIDRMask(128, 128)
+			update.AllowedIPs = append(update.AllowedIPs, peer.Node.Address6)
+		}
+		if peer.Node.IsEgressGateway {
+			update.AllowedIPs = append(update.AllowedIPs, getEgressIPs(peer)...)
+		}
+		if peer.Node.IsIngressGateway {
+			update.AllowedIPs = append(update.AllowedIPs, getIngressIPs(peer)...)
+		}
+		p.Peers = append(p.Peers, update)
+		fmt.Println("update: ", update)
+
+	}
+	data, err := json.Marshal(p)
+	if err != nil {
+		logger.Log(0, "marshal peer update", err.Error())
+		return
 	}
-	perrUpdate = append(perrUpdate, relayUpdate)
-	return perrUpdate
+	fmt.Println("publishing peer update", client.Host.Name, p.Action, len(data))
+	publish(&client.Host, fmt.Sprintf("peer/host/%s/%s", client.Host.ID.String(), servercfg.GetServer()), data)
 }
 
-func setPeersForUnrelayedNode(relayedNodes []models.Client, relay, client models.Client, clients *[]models.Client) []wgtypes.PeerConfig {
-	perrUpdate := []wgtypes.PeerConfig{}
-	relayUpdate := wgtypes.PeerConfig{
-		PublicKey:         relay.Host.PublicKey,
-		ReplaceAllowedIPs: true,
-		AllowedIPs:        []net.IPNet{},
+func getRelayIPs(peer models.Client) []net.IPNet {
+	var relayIPs []net.IPNet
+	for _, relayed := range peer.Node.RelayedNodes {
+		node, err := logic.GetNodeByID(relayed)
+		if err != nil {
+			logger.Log(0, "retrieve relayed node", err.Error())
+			continue
+		}
+		if node.Address.IP != nil {
+			node.Address.Mask = net.CIDRMask(32, 32)
+			relayIPs = append(relayIPs, node.Address)
+		}
+		if node.Address6.IP != nil {
+			node.Address.Mask = net.CIDRMask(128, 128)
+			relayIPs = append(relayIPs, node.Address6)
+		}
+		if node.IsRelay {
+			relayIPs = append(relayIPs, getRelayIPs(peer)...)
+		}
+		if node.IsEgressGateway {
+			relayIPs = append(relayIPs, getEgressIPs(peer)...)
+		}
+		if node.IsIngressGateway {
+			relayIPs = append(relayIPs, getIngressIPs(peer)...)
+		}
+
 	}
-	for _, relayed := range relayedNodes {
-		//remove relayed nodes from peer list
-		relayedUpdate := wgtypes.PeerConfig{
-			PublicKey: relayed.Host.PublicKey,
-			Remove:    true,
+	return relayIPs
+}
+
+func getEgressIPs(peer models.Client) []net.IPNet {
+	var egressIPs []net.IPNet
+	for _, egressRange := range peer.Node.EgressGatewayRanges {
+		ip, cidr, err := net.ParseCIDR(egressRange)
+		if err != nil {
+			logger.Log(0, "parse egress range", err.Error())
+			continue
 		}
-		perrUpdate = append(perrUpdate, relayedUpdate)
-		// add relayed nodes to relay allowed ips
-		if relayed.Node.Address.IP != nil {
-			relayUpdate.AllowedIPs = append(relayUpdate.AllowedIPs, relayed.Node.Address)
+		cidr.IP = ip
+		egressIPs = append(egressIPs, *cidr)
+	}
+	return egressIPs
+}
+
+func getIngressIPs(peer models.Client) []net.IPNet {
+	var ingressIPs []net.IPNet
+	extclients, err := logic.GetNetworkExtClients(peer.Node.Network)
+	if err != nil {
+		return ingressIPs
+	}
+	for _, ec := range extclients {
+		if ec.IngressGatewayID == peer.Node.ID.String() {
+			if ec.Address != "" {
+				ip, cidr, err := net.ParseCIDR(ec.Address)
+				if err != nil {
+					continue
+				}
+				cidr.IP = ip
+				ingressIPs = append(ingressIPs, *cidr)
+			}
+			if ec.Address6 != "" {
+				ip, cidr, err := net.ParseCIDR(ec.Address6)
+				if err != nil {
+					continue
+				}
+				cidr.IP = ip
+				ingressIPs = append(ingressIPs, *cidr)
+			}
 		}
-		if relayed.Node.Address6.IP != nil {
-			relayUpdate.AllowedIPs = append(relayUpdate.AllowedIPs, relayed.Node.Address6)
+	}
+	return ingressIPs
+}
+
+// publish peer update to a node (client) that is relayed by the relay
+func pubRelayedUpdate(client, relay *models.Client, peers *[]models.Client) {
+	log.Println("pubRelayedUpdate", client.Host.Name, relay.Host.Name, len(*peers))
+	//verify
+	if !logic.StringSliceContains(relay.Node.RelayedNodes, client.Node.ID.String()) {
+		logger.Log(0, "invalid call to pubRelayed update", client.Host.Name, relay.Host.Name)
+		return
+	}
+	//remove all nodes except relay
+	p := models.PeerAction{
+		Action: models.RemovePeer,
+	}
+	log.Println("removing peers ")
+	for _, peer := range *peers {
+		if peer.Host.ID == relay.Host.ID || peer.Host.ID == client.Host.ID {
+			log.Println("skipping removal of ", peer.Host.Name)
+			continue
 		}
+		update := wgtypes.PeerConfig{
+			PublicKey: peer.Host.PublicKey,
+			Remove:    true,
+		}
+		p.Peers = append(p.Peers, update)
+	}
+	data, err := json.Marshal(p)
+	if err != nil {
+		logger.Log(0, "marshal peer update", err.Error())
+		return
 	}
-	// add client addresses to relay allowed ips
-	if client.Node.Address.IP != nil {
-		relayUpdate.AllowedIPs = append(relayUpdate.AllowedIPs, client.Node.Address)
+	fmt.Println("publishing peer update", client.Host.Name, p.Action, len(data))
+	publish(&client.Host, fmt.Sprintf("peer/host/%s/%s", client.Host.ID.String(), servercfg.GetServer()), data)
+
+	//update the relay peer
+	p = models.PeerAction{
+		Action: models.UpdatePeer,
 	}
-	if client.Node.Address6.IP != nil {
-		relayUpdate.AllowedIPs = append(relayUpdate.AllowedIPs, client.Node.Address6)
+	update := wgtypes.PeerConfig{
+		PublicKey:         relay.Host.PublicKey,
+		ReplaceAllowedIPs: true,
+		Endpoint: &net.UDPAddr{
+			IP:   relay.Host.EndpointIP,
+			Port: relay.Host.ListenPort,
+		},
+		PersistentKeepaliveInterval: &relay.Node.PersistentKeepalive,
+	}
+	if relay.Node.Address.IP != nil {
+		relay.Node.Address.Mask = net.CIDRMask(32, 32)
+		update.AllowedIPs = append(update.AllowedIPs, relay.Node.Address)
+	}
+	if relay.Node.Address6.IP != nil {
+		relay.Node.Address6.Mask = net.CIDRMask(128, 128)
+		update.AllowedIPs = append(update.AllowedIPs, relay.Node.Address6)
+	}
+	p.Peers = append(p.Peers, update)
+	// add all other peers to allowed ips
+	log.Println("adding peers to allowed ips")
+	for _, peer := range *peers {
+		if peer.Host.ID == relay.Host.ID || peer.Host.ID == client.Host.ID {
+			log.Println("skipping ", peer.Host.Name, "in allowedips")
+			continue
+		}
+		log.Println("adding ", peer.Host.Name, peer.Node.Address, "to allowedips")
+		if peer.Node.Address.IP != nil {
+			peer.Node.Address.Mask = net.CIDRMask(32, 32)
+			update.AllowedIPs = append(update.AllowedIPs, peer.Node.Address)
+		}
+		if peer.Node.Address6.IP != nil {
+			peer.Node.Address6.Mask = net.CIDRMask(128, 128)
+			update.AllowedIPs = append(update.AllowedIPs, peer.Node.Address6)
+		}
+		if peer.Node.IsRelay {
+			update.AllowedIPs = append(update.AllowedIPs, getRelayIPs(peer)...)
+		}
+		if peer.Node.IsEgressGateway {
+			update.AllowedIPs = append(update.AllowedIPs, getEgressIPs(peer)...)
+		}
+		if peer.Node.IsIngressGateway {
+			update.AllowedIPs = append(update.AllowedIPs, getIngressIPs(peer)...)
+		}
+	}
+	p.Peers = append(p.Peers, update)
+	data, err = json.Marshal(p)
+	if err != nil {
+		logger.Log(0, "marshal peer update", err.Error())
+		return
 	}
-	perrUpdate = append(perrUpdate, relayUpdate)
+	fmt.Println("publishing peer update", client.Host.Name, p.Action, len(data))
+	publish(&client.Host, fmt.Sprintf("peer/host/%s/%s", client.Host.ID.String(), servercfg.GetServer()), data)
+}
 
-	return perrUpdate
+func pubRelayUpdate(client *models.Client, peers *[]models.Client) {
+	if !client.Node.IsRelay {
+		return
+	}
+	// add all peers to allowedips
+	p := models.PeerAction{
+		Action: models.UpdatePeer,
+	}
+	for _, peer := range *peers {
+		if peer.Host.ID == client.Host.ID {
+			continue
+		}
+		update := wgtypes.PeerConfig{
+			PublicKey:         peer.Host.PublicKey,
+			ReplaceAllowedIPs: true,
+			Remove:            false,
+			Endpoint: &net.UDPAddr{
+				IP:   peer.Host.EndpointIP,
+				Port: peer.Host.ListenPort,
+			},
+			PersistentKeepaliveInterval: &peer.Node.PersistentKeepalive,
+		}
+		if peer.Node.Address.IP != nil {
+			peer.Node.Address.Mask = net.CIDRMask(32, 32)
+			update.AllowedIPs = append(update.AllowedIPs, peer.Node.Address)
+		}
+		if peer.Node.Address6.IP != nil {
+			peer.Node.Address6.Mask = net.CIDRMask(128, 128)
+			update.AllowedIPs = append(update.AllowedIPs, peer.Node.Address6)
+		}
+		if peer.Node.IsRelay {
+			update.AllowedIPs = append(update.AllowedIPs, getRelayIPs(peer)...)
+		}
+		if peer.Node.IsEgressGateway {
+			update.AllowedIPs = append(update.AllowedIPs, getEgressIPs(peer)...)
+		}
+		if peer.Node.IsIngressGateway {
+			update.AllowedIPs = append(update.AllowedIPs, getIngressIPs(peer)...)
+		}
+		p.Peers = append(p.Peers, update)
+	}
+	data, err := json.Marshal(p)
+	if err != nil {
+		logger.Log(0, "marshal peer update", err.Error())
+		return
+	}
+	fmt.Println("publishing peer update", client.Host.Name, p.Action, len(data))
+	publish(&client.Host, fmt.Sprintf("peer/host/%s/%s", client.Host.ID.String(), servercfg.GetServer()), data)
 }

+ 9 - 6
mq/util.go

@@ -3,6 +3,7 @@ package mq
 import (
 	"errors"
 	"fmt"
+	"log"
 	"strings"
 	"time"
 
@@ -12,9 +13,9 @@ import (
 )
 
 func decryptMsgWithHost(host *models.Host, msg []byte) ([]byte, error) {
-	if host.OS == models.OS_Types.IoT { // just pass along IoT messages
-		return msg, nil
-	}
+	//if host.OS == models.OS_Types.IoT { // just pass along IoT messages
+	return msg, nil
+	//}
 
 	trafficKey, trafficErr := logic.RetrievePrivateTrafficKey() // get server private key
 	if trafficErr != nil {
@@ -33,6 +34,7 @@ func decryptMsgWithHost(host *models.Host, msg []byte) ([]byte, error) {
 }
 
 func decryptMsg(node *models.Node, msg []byte) ([]byte, error) {
+	return msg, nil
 	if len(msg) <= 24 { // make sure message is of appropriate length
 		return nil, fmt.Errorf("recieved invalid message from broker %v", msg)
 	}
@@ -45,9 +47,9 @@ func decryptMsg(node *models.Node, msg []byte) ([]byte, error) {
 }
 
 func encryptMsg(host *models.Host, msg []byte) ([]byte, error) {
-	if host.OS == models.OS_Types.IoT {
-		return msg, nil
-	}
+	//if host.OS == models.OS_Types.IoT {
+	return msg, nil
+	//}
 
 	// fetch server public key to be certain hasn't changed in transit
 	trafficKey, trafficErr := logic.RetrievePrivateTrafficKey()
@@ -77,6 +79,7 @@ func publish(host *models.Host, dest string, msg []byte) error {
 	if encryptErr != nil {
 		return encryptErr
 	}
+	log.Println("publishing to ", dest, " with message lenght", len(encrypted))
 	if mqclient == nil {
 		return errors.New("cannot publish ... mqclient not connected")
 	}