Browse Source

fix publish peers

Matthew R Kasun 3 years ago
parent
commit
767abc7d5e
3 changed files with 75 additions and 24 deletions
  1. 5 0
      controllers/node_grpc.go
  2. 0 4
      main.go
  3. 70 20
      mq/mq.go

+ 5 - 0
controllers/node_grpc.go

@@ -10,6 +10,7 @@ import (
 	"github.com/gravitl/netmaker/logger"
 	"github.com/gravitl/netmaker/logger"
 	"github.com/gravitl/netmaker/logic"
 	"github.com/gravitl/netmaker/logic"
 	"github.com/gravitl/netmaker/models"
 	"github.com/gravitl/netmaker/models"
+	"github.com/gravitl/netmaker/mq"
 	"github.com/gravitl/netmaker/servercfg"
 	"github.com/gravitl/netmaker/servercfg"
 )
 )
 
 
@@ -85,6 +86,10 @@ func (s *NodeServiceServer) CreateNode(ctx context.Context, req *nodepb.Object)
 	if err != nil {
 	if err != nil {
 		return nil, err
 		return nil, err
 	}
 	}
+	// notify other nodes on network of new peer
+	if err := mq.NewPeer(node); err != nil {
+		logger.Log(0, "failed to inform peers of new node "+err.Error())
+	}
 
 
 	err = runServerPeerUpdate(node.Network, true)
 	err = runServerPeerUpdate(node.Network, true)
 	if err != nil {
 	if err != nil {

+ 0 - 4
main.go

@@ -208,10 +208,6 @@ func runMessageQueue(wg *sync.WaitGroup) {
 		client.Disconnect(240)
 		client.Disconnect(240)
 		logger.Log(0, "ping sub failed")
 		logger.Log(0, "ping sub failed")
 	}
 	}
-	if token := client.Subscribe("metrics/#", 0, mq.Metrics); token.Wait() && token.Error() != nil {
-		client.Disconnect(240)
-		logger.Log(0, "metrics sub failed")
-	}
 	if token := client.Subscribe("update/localaddress/#", 0, mq.LocalAddressUpdate); token.Wait() && token.Error() != nil {
 	if token := client.Subscribe("update/localaddress/#", 0, mq.LocalAddressUpdate); token.Wait() && token.Error() != nil {
 		client.Disconnect(240)
 		client.Disconnect(240)
 		logger.Log(0, "metrics sub failed")
 		logger.Log(0, "metrics sub failed")

+ 70 - 20
mq/mq.go

@@ -3,24 +3,24 @@ package mq
 import (
 import (
 	"encoding/json"
 	"encoding/json"
 	"errors"
 	"errors"
+	"net"
+	"strconv"
 	"strings"
 	"strings"
+	"time"
 
 
 	mqtt "github.com/eclipse/paho.mqtt.golang"
 	mqtt "github.com/eclipse/paho.mqtt.golang"
 	"github.com/gravitl/netmaker/database"
 	"github.com/gravitl/netmaker/database"
 	"github.com/gravitl/netmaker/logger"
 	"github.com/gravitl/netmaker/logger"
 	"github.com/gravitl/netmaker/logic"
 	"github.com/gravitl/netmaker/logic"
 	"github.com/gravitl/netmaker/models"
 	"github.com/gravitl/netmaker/models"
+	"github.com/gravitl/netmaker/servercfg"
+	"golang.zx2c4.com/wireguard/wgctrl/wgtypes"
 )
 )
 
 
 var DefaultHandler mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Message) {
 var DefaultHandler mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Message) {
 	logger.Log(0, "MQTT Message: Topic: "+string(msg.Topic())+" Message: "+string(msg.Payload()))
 	logger.Log(0, "MQTT Message: Topic: "+string(msg.Topic())+" Message: "+string(msg.Payload()))
 }
 }
 
 
-var Metrics mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Message) {
-	logger.Log(0, "Metrics Handler")
-	//TODOD -- handle metrics data ---- store to database?
-}
-
 var Ping mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Message) {
 var Ping mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Message) {
 	logger.Log(0, "Ping Handler: "+msg.Topic())
 	logger.Log(0, "Ping Handler: "+msg.Topic())
 	go func() {
 	go func() {
@@ -42,6 +42,9 @@ var Ping mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Message) {
 			return
 			return
 		}
 		}
 		node.SetLastCheckIn()
 		node.SetLastCheckIn()
+		if err := logic.UpdateNode(&node, &node) ; err != nil {
+			logger.Log(0, "error updating node "+ err.Error())
+		}
 		logger.Log(0, "ping processed")
 		logger.Log(0, "ping processed")
 		// --TODO --set client version once feature is implemented.
 		// --TODO --set client version once feature is implemented.
 		//node.SetClientVersion(msg.Payload())
 		//node.SetClientVersion(msg.Payload())
@@ -63,6 +66,9 @@ var PublicKeyUpdate mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Mess
 		}
 		}
 		node.PublicKey = key
 		node.PublicKey = key
 		node.SetLastCheckIn()
 		node.SetLastCheckIn()
+		if err := logic.UpdateNode(&node, &node) ; err != nil {
+			logger.Log(0, "error updating node "+ err.Error())
+		}
 		if err := UpdatePeers(client, node); err != nil {
 		if err := UpdatePeers(client, node); err != nil {
 			logger.Log(0, "error updating peers "+err.Error())
 			logger.Log(0, "error updating peers "+err.Error())
 		}
 		}
@@ -86,36 +92,64 @@ var IPUpdate mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Message) {
 		}
 		}
 		node.Endpoint = ip
 		node.Endpoint = ip
 		node.SetLastCheckIn()
 		node.SetLastCheckIn()
+		if err := logic.UpdateNode(&node, &node) ; err != nil {
+			logger.Log(0, "error updating node "+ err.Error())
+		}
 		if err != UpdatePeers(client, node) {
 		if err != UpdatePeers(client, node) {
 			logger.Log(0, "error updating peers "+err.Error())
 			logger.Log(0, "error updating peers "+err.Error())
 		}
 		}
 	}()
 	}()
 }
 }
 
 
-func UpdatePeers(client mqtt.Client, node models.Node) error {
-	peersToUpdate, err := logic.GetNetworkNodes(node.Network)
+func UpdatePeers(client mqtt.Client, newnode models.Node) error {
+	networkNodes, err := logic.GetNetworkNodes(newnode.Network)
 	if err != nil {
 	if err != nil {
-		logger.Log(0, "error retrieving peers to be updated "+err.Error())
 		return err
 		return err
 	}
 	}
-	for _, peerToUpdate := range peersToUpdate {
-		peers, _, _, err := logic.GetServerPeers(&peerToUpdate)
-		if err != nil {
-			logger.Log(0, "error retrieving peers "+err.Error())
-			return err
-		}
-		if peerToUpdate.ID == node.ID {
-			continue
-		}
+        keepalive, _ := time.ParseDuration(string(newnode.PersistentKeepalive)+"s")
+        for _, node := range  networkNodes {
+                var peers []wgtypes.PeerConfig
 		var peerUpdate models.PeerUpdate
 		var peerUpdate models.PeerUpdate
+                for _, peer := range  networkNodes{
+                        if peer.ID == node.ID {
+                                //skip
+                                continue
+                        }
+                        pubkey, err := wgtypes.ParseKey(peer.PublicKey)
+                        if err != nil {
+				return err
+                        }
+                        if node.Endpoint == peer.Endpoint {
+                                if node.LocalAddress != peer.LocalAddress && peer.LocalAddress != "" {
+                                        peer.Endpoint = peer.LocalAddress
+                                }else {
+                                        continue
+                                }
+                        }
+                        endpoint := peer.Endpoint + ":" + strconv.Itoa(int(peer.ListenPort))
+                        //fmt.Println("endpoint: ", endpoint, peer.Endpoint, peer.ListenPort)
+                        address, err := net.ResolveUDPAddr("udp", endpoint)
+                        if err != nil {
+				return err
+                        }
+                        //calculate Allowed IPs.
+                        var peerData wgtypes.PeerConfig
+                        peerData = wgtypes.PeerConfig{
+                                PublicKey: pubkey,
+                                Endpoint: address,
+                                PersistentKeepaliveInterval: &keepalive,
+                                //AllowedIPs: allowedIPs
+                        }
+                        peers = append (peers, peerData)
+                }
 		peerUpdate.Network = node.Network
 		peerUpdate.Network = node.Network
-		peerUpdate.Peers = peers
-		data, err := json.Marshal(peerUpdate)
+		peerUpdate.Peers = peers 
+		data, err := json.Marshal(&peerUpdate)
 		if err != nil {
 		if err != nil {
 			logger.Log(0, "error marshaling peer update "+err.Error())
 			logger.Log(0, "error marshaling peer update "+err.Error())
 			return err
 			return err
 		}
 		}
-		if token := client.Publish("/update/peers/"+peerToUpdate.ID, 0, false, data); token.Wait() && token.Error() != nil {
+			if token := client.Publish("/update/peers/"+node.ID, 0, false, data); token.Wait() && token.Error() != nil {
 			logger.Log(0, "error sending peer updatte to no")
 			logger.Log(0, "error sending peer updatte to no")
 			return err
 			return err
 		}
 		}
@@ -154,3 +188,19 @@ func GetID(topic string) (string, error) {
 	//the last part of the topic will be the node.ID
 	//the last part of the topic will be the node.ID
 	return parts[count-1], nil
 	return parts[count-1], nil
 }
 }
+
+func NewPeer(node models.Node) error {
+	opts := mqtt.NewClientOptions()
+	broker := servercfg.GetMessageQueueEndpoint()
+	logger.Log(0, "broker: "+broker)
+	opts.AddBroker(broker)
+	client := mqtt.NewClient(opts)
+	if token := client.Connect(); token.Wait() && token.Error() != nil {
+		return token.Error()
+	}
+	
+	if err := UpdatePeers(client, node); err != nil {
+		return err
+	}
+	return nil
+}