|
@@ -2,18 +2,21 @@ package functions
|
|
|
|
|
|
import (
|
|
|
"context"
|
|
|
+ "crypto/rsa"
|
|
|
"encoding/json"
|
|
|
"fmt"
|
|
|
"log"
|
|
|
"os"
|
|
|
"os/signal"
|
|
|
"runtime"
|
|
|
+ "strings"
|
|
|
"sync"
|
|
|
"syscall"
|
|
|
"time"
|
|
|
|
|
|
mqtt "github.com/eclipse/paho.mqtt.golang"
|
|
|
"github.com/gravitl/netmaker/models"
|
|
|
+ "github.com/gravitl/netmaker/netclient/auth"
|
|
|
"github.com/gravitl/netmaker/netclient/config"
|
|
|
"github.com/gravitl/netmaker/netclient/local"
|
|
|
"github.com/gravitl/netmaker/netclient/ncutils"
|
|
@@ -88,17 +91,17 @@ func MessageQueue(ctx context.Context, network string) {
|
|
|
}
|
|
|
ncutils.Log("subscribed to all topics for debugging purposes")
|
|
|
}
|
|
|
- if token := client.Subscribe("update/"+cfg.Node.ID, 0, mqtt.MessageHandler(NodeUpdate)); token.Wait() && token.Error() != nil {
|
|
|
+ if token := client.Subscribe(fmt.Sprintf("update/%s/%s", cfg.Node.Network, cfg.Node.ID), 0, mqtt.MessageHandler(NodeUpdate)); token.Wait() && token.Error() != nil {
|
|
|
log.Fatal(token.Error())
|
|
|
}
|
|
|
if cfg.DebugOn {
|
|
|
- ncutils.Log("subscribed to node updates for node " + cfg.Node.Name + " update/" + cfg.Node.ID)
|
|
|
+ ncutils.Log(fmt.Sprintf("subscribed to node updates for node %s update/%s/%s \n", cfg.Node.Name, cfg.Node.Network, cfg.Node.ID))
|
|
|
}
|
|
|
- if token := client.Subscribe("update/peers/"+cfg.Node.ID, 0, mqtt.MessageHandler(UpdatePeers)); token.Wait() && token.Error() != nil {
|
|
|
+ if token := client.Subscribe(fmt.Sprintf("peers/%s/%s", cfg.Node.Network, cfg.Node.ID), 0, mqtt.MessageHandler(UpdatePeers)); token.Wait() && token.Error() != nil {
|
|
|
log.Fatal(token.Error())
|
|
|
}
|
|
|
if cfg.DebugOn {
|
|
|
- ncutils.Log("subscribed to node updates for node " + cfg.Node.Name + " update/peers/" + cfg.Node.ID)
|
|
|
+ ncutils.Log(fmt.Sprintf("subscribed to peer updates for node %s peers/%s/%s \n", cfg.Node.Name, cfg.Node.Network, cfg.Node.ID))
|
|
|
}
|
|
|
defer client.Disconnect(250)
|
|
|
go Checkin(ctx, &cfg, network)
|
|
@@ -119,20 +122,27 @@ func NodeUpdate(client mqtt.Client, msg mqtt.Message) {
|
|
|
go func() {
|
|
|
var newNode models.Node
|
|
|
var cfg config.ClientConfig
|
|
|
- err := json.Unmarshal(msg.Payload(), &newNode)
|
|
|
+ var network = parseNetworkFromTopic(msg.Topic())
|
|
|
+ cfg.Network = network
|
|
|
+ cfg.ReadConfig()
|
|
|
+
|
|
|
+ data, dataErr := decryptMsg(&cfg, msg.Payload())
|
|
|
+ if dataErr != nil {
|
|
|
+ return
|
|
|
+ }
|
|
|
+ err := json.Unmarshal(data, &newNode)
|
|
|
if err != nil {
|
|
|
ncutils.Log("error unmarshalling node update data" + err.Error())
|
|
|
return
|
|
|
}
|
|
|
+
|
|
|
ncutils.Log("received message to update node " + newNode.Name)
|
|
|
// see if cache hit, if so skip
|
|
|
var currentMessage = read(newNode.Network, lastNodeUpdate)
|
|
|
- if currentMessage == string(msg.Payload()) {
|
|
|
+ if currentMessage == string(data) {
|
|
|
return
|
|
|
}
|
|
|
- insert(newNode.Network, lastNodeUpdate, string(msg.Payload()))
|
|
|
- cfg.Network = newNode.Network
|
|
|
- cfg.ReadConfig()
|
|
|
+ insert(newNode.Network, lastNodeUpdate, string(data))
|
|
|
//check if interface name has changed if so delete.
|
|
|
if cfg.Node.Interface != newNode.Interface {
|
|
|
if err = wireguard.RemoveConf(cfg.Node.Interface, true); err != nil {
|
|
@@ -201,21 +211,28 @@ func NodeUpdate(client mqtt.Client, msg mqtt.Message) {
|
|
|
func UpdatePeers(client mqtt.Client, msg mqtt.Message) {
|
|
|
go func() {
|
|
|
var peerUpdate models.PeerUpdate
|
|
|
- err := json.Unmarshal(msg.Payload(), &peerUpdate)
|
|
|
+ var network = parseNetworkFromTopic(msg.Topic())
|
|
|
+ var cfg = config.ClientConfig{}
|
|
|
+ cfg.Network = network
|
|
|
+ cfg.ReadConfig()
|
|
|
+
|
|
|
+ data, dataErr := decryptMsg(&cfg, msg.Payload())
|
|
|
+ if dataErr != nil {
|
|
|
+ return
|
|
|
+ }
|
|
|
+ err := json.Unmarshal(data, &peerUpdate)
|
|
|
if err != nil {
|
|
|
ncutils.Log("error unmarshalling peer data")
|
|
|
return
|
|
|
}
|
|
|
// see if cache hit, if so skip
|
|
|
var currentMessage = read(peerUpdate.Network, lastPeerUpdate)
|
|
|
- if currentMessage == string(msg.Payload()) {
|
|
|
+ if currentMessage == string(data) {
|
|
|
return
|
|
|
}
|
|
|
- insert(peerUpdate.Network, lastPeerUpdate, string(msg.Payload()))
|
|
|
+ insert(peerUpdate.Network, lastPeerUpdate, string(data))
|
|
|
ncutils.Log("update peer handler")
|
|
|
- var cfg config.ClientConfig
|
|
|
- cfg.Network = peerUpdate.Network
|
|
|
- cfg.ReadConfig()
|
|
|
+
|
|
|
var shouldReSub = shouldResub(cfg.Node.NetworkSettings.DefaultServerAddrs, peerUpdate.ServerAddrs)
|
|
|
if shouldReSub {
|
|
|
Resubscribe(client, &cfg)
|
|
@@ -335,24 +352,49 @@ func PublishNodeUpdate(cfg *config.ClientConfig) {
|
|
|
if err := config.Write(cfg, cfg.Network); err != nil {
|
|
|
ncutils.Log("error saving configuration" + err.Error())
|
|
|
}
|
|
|
- client := SetupMQTT(cfg)
|
|
|
data, err := json.Marshal(cfg.Node)
|
|
|
if err != nil {
|
|
|
ncutils.Log("error marshling node update " + err.Error())
|
|
|
}
|
|
|
- if token := client.Publish("update/"+cfg.Node.ID, 0, false, data); token.Wait() && token.Error() != nil {
|
|
|
- ncutils.Log("error publishing endpoint update " + token.Error().Error())
|
|
|
+ if err = publish(cfg, fmt.Sprintf("update/%s", cfg.Node.ID), data); err != nil {
|
|
|
+ ncutils.Log(fmt.Sprintf("error publishing endpoint update, %v \n", err))
|
|
|
}
|
|
|
- client.Disconnect(250)
|
|
|
}
|
|
|
|
|
|
// Hello -- ping the broker to let server know node is alive and doing fine
|
|
|
func Hello(cfg *config.ClientConfig, network string) {
|
|
|
+ if err := publish(cfg, fmt.Sprintf("ping/%s", cfg.Node.ID), []byte("hello world!")); err != nil {
|
|
|
+ ncutils.Log(fmt.Sprintf("error publishing ping, %v \n", err))
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
+func publish(cfg *config.ClientConfig, dest string, msg []byte) error {
|
|
|
client := SetupMQTT(cfg)
|
|
|
- if token := client.Publish("ping/"+cfg.Node.ID, 2, false, "hello world!"); token.Wait() && token.Error() != nil {
|
|
|
- ncutils.Log("error publishing ping " + token.Error().Error())
|
|
|
+ defer client.Disconnect(250)
|
|
|
+ encrypted, encryptErr := ncutils.EncryptWithPublicKey(msg, &cfg.Node.TrafficKeys.Server)
|
|
|
+ if encryptErr != nil {
|
|
|
+ return encryptErr
|
|
|
+ }
|
|
|
+ if token := client.Publish(dest, 0, false, encrypted); token.Wait() && token.Error() != nil {
|
|
|
+ return token.Error()
|
|
|
+ }
|
|
|
+ return nil
|
|
|
+}
|
|
|
+
|
|
|
+func parseNetworkFromTopic(topic string) string {
|
|
|
+ return strings.Split(topic, "/")[1]
|
|
|
+}
|
|
|
+
|
|
|
+func decryptMsg(cfg *config.ClientConfig, msg []byte) ([]byte, error) {
|
|
|
+ diskKey, trafficErr := auth.RetrieveTrafficKey(cfg.Node.Network)
|
|
|
+ if trafficErr != nil {
|
|
|
+ return nil, trafficErr
|
|
|
+ }
|
|
|
+ var trafficKey rsa.PrivateKey
|
|
|
+ if err := json.Unmarshal([]byte(diskKey), &trafficKey); err != nil {
|
|
|
+ return nil, err
|
|
|
}
|
|
|
- client.Disconnect(250)
|
|
|
+ return ncutils.DecryptWithPrivateKey(msg, &trafficKey), nil
|
|
|
}
|
|
|
|
|
|
func shouldResub(currentServers, newServers []models.ServerAddr) bool {
|