Browse Source

Merge pull request #794 from gravitl/feature_v0.11.0_comms_network_client

client logic for comms network
dcarns 3 years ago
parent
commit
e9f229dd1d

+ 35 - 1
netclient/command/commands.go

@@ -8,12 +8,46 @@ import (
 	"github.com/gravitl/netmaker/netclient/daemon"
 	"github.com/gravitl/netmaker/netclient/functions"
 	"github.com/gravitl/netmaker/netclient/ncutils"
+	"golang.zx2c4.com/wireguard/wgctrl/wgtypes"
 )
 
+// JoinCommsNetwork -- Join the message queue comms network
+func JoinCommsNetwork(cfg config.ClientConfig) error {
+	key, err := wgtypes.GeneratePrivateKey()
+	if err != nil {
+		return err
+	}
+	if err := functions.JoinNetwork(cfg, key.PublicKey().String()); err != nil {
+		return err
+	}
+	return nil
+}
+
 // Join - join command to run from cli
 func Join(cfg config.ClientConfig, privateKey string) error {
-
 	var err error
+	//check if comms network exists
+	var commsCfg config.ClientConfig
+	commsCfg.Network = ncutils.COMMS_NETWORK_NAME
+	commsCfg.ReadConfig()
+	if commsCfg.Node.Name == "" {
+		if err := JoinCommsNetwork(commsCfg); err != nil {
+			ncutils.Log("could not join comms network " + err.Error())
+			return err
+		}
+	}
+	//ensure comms network is reachable
+	if err := functions.PingServer(&commsCfg); err != nil {
+		if err := functions.LeaveNetwork(commsCfg.Network); err != nil {
+			ncutils.Log("could not leave comms network " + err.Error())
+			return err
+		}
+		if err := JoinCommsNetwork(commsCfg); err != nil {
+			ncutils.Log("could not join comms network " + err.Error())
+			return err
+		}
+	}
+	//join network
 	err = functions.JoinNetwork(cfg, privateKey)
 	if err != nil && !cfg.DebugOn {
 		if !strings.Contains(err.Error(), "ALREADY_INSTALLED") {

+ 93 - 384
netclient/functions/daemon.go

@@ -2,31 +2,26 @@ package functions
 
 import (
 	"context"
-	"encoding/json"
 	"errors"
 	"fmt"
 	"os"
 	"os/signal"
-	"runtime"
 	"strings"
 	"sync"
 	"syscall"
 	"time"
 
-	"github.com/davecgh/go-spew/spew"
 	mqtt "github.com/eclipse/paho.mqtt.golang"
 	"github.com/go-ping/ping"
 	"github.com/gravitl/netmaker/models"
 	"github.com/gravitl/netmaker/netclient/auth"
 	"github.com/gravitl/netmaker/netclient/config"
 	"github.com/gravitl/netmaker/netclient/daemon"
-	"github.com/gravitl/netmaker/netclient/local"
 	"github.com/gravitl/netmaker/netclient/ncutils"
 	"github.com/gravitl/netmaker/netclient/wireguard"
 	"golang.zx2c4.com/wireguard/wgctrl/wgtypes"
 )
 
-// == Message Caches ==
 var messageCache = new(sync.Map)
 var networkcontext = new(sync.Map)
 
@@ -38,251 +33,29 @@ type cachedMessage struct {
 	LastSeen time.Time
 }
 
-func insert(network, which, cache string) {
-	var newMessage = cachedMessage{
-		Message:  cache,
-		LastSeen: time.Now(),
-	}
-	messageCache.Store(fmt.Sprintf("%s%s", network, which), newMessage)
-}
-
-func read(network, which string) string {
-	val, isok := messageCache.Load(fmt.Sprintf("%s%s", network, which))
-	if isok {
-		var readMessage = val.(cachedMessage) // fetch current cached message
-		if readMessage.LastSeen.IsZero() {
-			return ""
-		}
-		if time.Now().After(readMessage.LastSeen.Add(time.Minute * 10)) { // check if message has been there over a minute
-			messageCache.Delete(fmt.Sprintf("%s%s", network, which)) // remove old message if expired
-			return ""
-		}
-		return readMessage.Message // return current message if not expired
-	}
-	return ""
-}
-
-// == End Message Caches ==
-
 // Daemon runs netclient daemon from command line
 func Daemon() error {
-	networks, err := ncutils.GetSystemNetworks()
-	if err != nil {
-		return err
-	}
+	client := setupMQTT(false)
+	defer client.Disconnect(250)
+	wg := sync.WaitGroup{}
+	ctx, cancel := context.WithCancel(context.Background())
+	networks, _ := ncutils.GetSystemNetworks()
 	for _, network := range networks {
-		ctx, cancel := context.WithCancel(context.Background())
-		networkcontext.Store(network, cancel)
-		go MessageQueue(ctx, network)
+		var cfg config.ClientConfig
+		cfg.Network = network
+		cfg.ReadConfig()
+		initialPull(cfg.Network)
 	}
+	wg.Add(1)
+	go Checkin(ctx, wg)
 	quit := make(chan os.Signal, 1)
 	signal.Notify(quit, syscall.SIGTERM, os.Interrupt)
 	<-quit
-	for _, network := range networks {
-		if cancel, ok := networkcontext.Load(network); ok {
-			cancel.(context.CancelFunc)()
-		}
-	}
-	ncutils.Log("all done")
-	return nil
-
-}
-
-// MessageQueue sets up Message Queue and subsribes/publishes updates to/from server
-func MessageQueue(ctx context.Context, network string) {
-	ncutils.Log("netclient go routine started for " + network)
-	var cfg config.ClientConfig
-	cfg.Network = network
-	initialPull(cfg.Network)
-
-	cfg.ReadConfig()
-	ncutils.Log("daemon started for network: " + network)
-	client := setupMQTT(&cfg, false)
-
-	defer client.Disconnect(250)
-	wg := &sync.WaitGroup{}
-	wg.Add(2)
-	checkinctx, checkincancel := context.WithCancel(context.Background())
-	go Checkin(checkinctx, wg, &cfg, network)
-	<-ctx.Done()
-	checkincancel()
-	ncutils.Log("shutting down message queue for network " + network)
+	cancel()
+	ncutils.Log("shutting down message queue ")
 	wg.Wait()
 	ncutils.Log("shutdown complete")
-}
-
-// All -- mqtt message hander for all ('#') topics
-var All mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Message) {
-	ncutils.Log("default message handler -- received message but not handling")
-	ncutils.Log("Topic: " + string(msg.Topic()))
-	//ncutils.Log("Message: " + string(msg.Payload()))
-}
-
-// NodeUpdate -- mqtt message handler for /update/<NodeID> topic
-func NodeUpdate(client mqtt.Client, msg mqtt.Message) {
-	var newNode models.Node
-	var cfg config.ClientConfig
-	var network = parseNetworkFromTopic(msg.Topic())
-	cfg.Network = network
-	cfg.ReadConfig()
-
-	data, dataErr := decryptMsg(&cfg, msg.Payload())
-	if dataErr != nil {
-		return
-	}
-	err := json.Unmarshal([]byte(data), &newNode)
-	if err != nil {
-		ncutils.Log("error unmarshalling node update data" + err.Error())
-		return
-	}
-
-	ncutils.Log("received message to update node " + newNode.Name)
-	// see if cache hit, if so skip
-	var currentMessage = read(newNode.Network, lastNodeUpdate)
-	if currentMessage == string(data) {
-		return
-	}
-	insert(newNode.Network, lastNodeUpdate, string(data)) // store new message in cache
-
-	//check if interface name has changed if so delete.
-	if cfg.Node.Interface != newNode.Interface {
-		if err = wireguard.RemoveConf(cfg.Node.Interface, true); err != nil {
-			ncutils.PrintLog("could not delete old interface "+cfg.Node.Interface+": "+err.Error(), 1)
-		}
-	}
-	//ensure that OS never changes
-	newNode.OS = runtime.GOOS
-	// check if interface needs to delta
-	ifaceDelta := ncutils.IfaceDelta(&cfg.Node, &newNode)
-	shouldDNSChange := cfg.Node.DNSOn != newNode.DNSOn
-
-	cfg.Node = newNode
-	switch newNode.Action {
-	case models.NODE_DELETE:
-		if cancel, ok := networkcontext.Load(newNode.Network); ok {
-			ncutils.Log("cancelling message queue context for " + newNode.Network)
-			cancel.(context.CancelFunc)()
-		} else {
-			ncutils.Log("failed to kill go routines for network " + newNode.Network)
-		}
-		ncutils.PrintLog(fmt.Sprintf("received delete request for %s", cfg.Node.Name), 1)
-		if err = LeaveNetwork(cfg.Node.Network); err != nil {
-			if !strings.Contains("rpc error", err.Error()) {
-				ncutils.PrintLog(fmt.Sprintf("failed to leave, please check that local files for network %s were removed", cfg.Node.Network), 1)
-			}
-			ncutils.PrintLog(fmt.Sprintf("%s was removed", cfg.Node.Name), 1)
-			return
-		}
-		ncutils.PrintLog(fmt.Sprintf("%s was removed", cfg.Node.Name), 1)
-		return
-	case models.NODE_UPDATE_KEY:
-		if err := UpdateKeys(&cfg, client); err != nil {
-			ncutils.PrintLog("err updating wireguard keys: "+err.Error(), 1)
-		}
-	case models.NODE_NOOP:
-	default:
-	}
-	// Save new config
-	cfg.Node.Action = models.NODE_NOOP
-	if err := config.Write(&cfg, cfg.Network); err != nil {
-		ncutils.PrintLog("error updating node configuration: "+err.Error(), 1)
-	}
-	nameserver := cfg.Server.CoreDNSAddr
-	privateKey, err := wireguard.RetrievePrivKey(newNode.Network)
-	if err != nil {
-		ncutils.Log("error reading PrivateKey " + err.Error())
-		return
-	}
-	file := ncutils.GetNetclientPathSpecific() + cfg.Node.Interface + ".conf"
-	if err := wireguard.UpdateWgInterface(file, privateKey, nameserver, newNode); err != nil {
-		ncutils.Log("error updating wireguard config " + err.Error())
-		return
-	}
-	if ifaceDelta { // if a change caused an ifacedelta we need to notify the server to update the peers
-		ackErr := publishSignal(&cfg, ncutils.ACK)
-		if ackErr != nil {
-			ncutils.Log("could not notify server that it received an interface update")
-		} else {
-			ncutils.Log("signalled acknowledgement of change to server")
-		}
-		ncutils.Log("applying WG conf to " + file)
-		err = wireguard.ApplyConf(&cfg.Node, cfg.Node.Interface, file)
-		if err != nil {
-			ncutils.Log("error restarting wg after node update " + err.Error())
-			return
-		}
-
-		time.Sleep(time.Second >> 1)
-		if newNode.DNSOn == "yes" {
-			for _, server := range newNode.NetworkSettings.DefaultServerAddrs {
-				if server.IsLeader {
-					go local.SetDNSWithRetry(newNode, server.Address)
-					break
-				}
-			}
-		}
-		doneErr := publishSignal(&cfg, ncutils.DONE)
-		if doneErr != nil {
-			ncutils.Log("could not notify server to update peers after interface change")
-		} else {
-			ncutils.Log("signalled finshed interface update to server")
-		}
-	}
-	//deal with DNS
-	if newNode.DNSOn != "yes" && shouldDNSChange && cfg.Node.Interface != "" {
-		ncutils.Log("settng DNS off")
-		_, err := ncutils.RunCmd("/usr/bin/resolvectl revert "+cfg.Node.Interface, true)
-		if err != nil {
-			ncutils.Log("error applying dns" + err.Error())
-		}
-	}
-}
-
-// UpdatePeers -- mqtt message handler for peers/<Network>/<NodeID> topic
-func UpdatePeers(client mqtt.Client, msg mqtt.Message) {
-	var peerUpdate models.PeerUpdate
-	var network = parseNetworkFromTopic(msg.Topic())
-	var cfg = config.ClientConfig{}
-	cfg.Network = network
-	cfg.ReadConfig()
-
-	data, dataErr := decryptMsg(&cfg, msg.Payload())
-	if dataErr != nil {
-		return
-	}
-	err := json.Unmarshal([]byte(data), &peerUpdate)
-	if err != nil {
-		ncutils.Log("error unmarshalling peer data")
-		return
-	}
-	// see if cached hit, if so skip
-	var currentMessage = read(peerUpdate.Network, lastPeerUpdate)
-	if currentMessage == string(data) {
-		return
-	}
-	insert(peerUpdate.Network, lastPeerUpdate, string(data))
-
-	file := ncutils.GetNetclientPathSpecific() + cfg.Node.Interface + ".conf"
-	spew.Dump(peerUpdate.Peers)
-	err = wireguard.UpdateWgPeers(file, peerUpdate.Peers)
-	if err != nil {
-		ncutils.Log("error updating wireguard peers" + err.Error())
-		return
-	}
-	//err = wireguard.SyncWGQuickConf(cfg.Node.Interface, file)
-	var iface = cfg.Node.Interface
-	if ncutils.IsMac() {
-		iface, err = local.GetMacIface(cfg.Node.Address)
-		if err != nil {
-			ncutils.Log("error retrieving mac iface: " + err.Error())
-			return
-		}
-	}
-	err = wireguard.SetPeers(iface, cfg.Node.Address, cfg.Node.PersistentKeepalive, peerUpdate.Peers)
-	if err != nil {
-		ncutils.Log("error syncing wg after peer update: " + err.Error())
-		return
-	}
+	return nil
 }
 
 // UpdateKeys -- updates private key and returns new publickey
@@ -310,96 +83,29 @@ func UpdateKeys(cfg *config.ClientConfig, client mqtt.Client) error {
 	return nil
 }
 
-// Checkin  -- go routine that checks for public or local ip changes, publishes changes
-//   if there are no updates, simply "pings" the server as a checkin
-func Checkin(ctx context.Context, wg *sync.WaitGroup, cfg *config.ClientConfig, network string) {
-	defer wg.Done()
-	for {
-		select {
-		case <-ctx.Done():
-			ncutils.Log("Checkin cancelled")
-			return
-			//delay should be configuraable -> use cfg.Node.NetworkSettings.DefaultCheckInInterval ??
-		case <-time.After(time.Second * 60):
-			// ncutils.Log("Checkin running")
-			//read latest config
-			cfg.ReadConfig()
-			if cfg.Node.IsStatic != "yes" {
-				extIP, err := ncutils.GetPublicIP()
-				if err != nil {
-					ncutils.PrintLog("error encountered checking public ip addresses: "+err.Error(), 1)
-				}
-				if cfg.Node.Endpoint != extIP && extIP != "" {
-					ncutils.PrintLog("endpoint has changed from "+cfg.Node.Endpoint+" to "+extIP, 1)
-					cfg.Node.Endpoint = extIP
-					if err := PublishNodeUpdate(cfg); err != nil {
-						ncutils.Log("could not publish endpoint change")
-					}
-				}
-				intIP, err := getPrivateAddr()
-				if err != nil {
-					ncutils.PrintLog("error encountered checking private ip addresses: "+err.Error(), 1)
-				}
-				if cfg.Node.LocalAddress != intIP && intIP != "" {
-					ncutils.PrintLog("local Address has changed from "+cfg.Node.LocalAddress+" to "+intIP, 1)
-					cfg.Node.LocalAddress = intIP
-					if err := PublishNodeUpdate(cfg); err != nil {
-						ncutils.Log("could not publish local address change")
-					}
-				}
-			} else if cfg.Node.IsLocal == "yes" && cfg.Node.LocalRange != "" {
-				localIP, err := ncutils.GetLocalIP(cfg.Node.LocalRange)
-				if err != nil {
-					ncutils.PrintLog("error encountered checking local ip addresses: "+err.Error(), 1)
-				}
-				if cfg.Node.Endpoint != localIP && localIP != "" {
-					ncutils.PrintLog("endpoint has changed from "+cfg.Node.Endpoint+" to "+localIP, 1)
-					cfg.Node.Endpoint = localIP
-					if err := PublishNodeUpdate(cfg); err != nil {
-						ncutils.Log("could not publish localip change")
-					}
-				}
-			}
-			if err := pingServer(cfg); err != nil {
-				ncutils.PrintLog("could not ping server "+err.Error(), 0)
-			}
-			Hello(cfg, network)
-			// ncutils.Log("Checkin complete")
-		}
-	}
-}
-
-// PublishNodeUpdates -- saves node and pushes changes to broker
-func PublishNodeUpdate(cfg *config.ClientConfig) error {
-	if err := config.Write(cfg, cfg.Network); err != nil {
-		return err
-	}
-	data, err := json.Marshal(cfg.Node)
+// PingServer -- checks if server is reachable
+func PingServer(cfg *config.ClientConfig) error {
+	node := getServerAddress(cfg)
+	pinger, err := ping.NewPinger(node)
 	if err != nil {
 		return err
 	}
-	if err = publish(cfg, fmt.Sprintf("update/%s", cfg.Node.ID), data, 1); err != nil {
-		return err
+	pinger.Timeout = 2 * time.Second
+	pinger.Run()
+	stats := pinger.Statistics()
+	if stats.PacketLoss == 100 {
+		return errors.New("ping error")
 	}
 	return nil
 }
 
-// Hello -- ping the broker to let server know node is alive and doing fine
-func Hello(cfg *config.ClientConfig, network string) {
-	if err := publish(cfg, fmt.Sprintf("ping/%s", cfg.Node.ID), []byte(ncutils.Version), 0); err != nil {
-		ncutils.Log(fmt.Sprintf("error publishing ping, %v", err))
-		ncutils.Log("running pull on " + cfg.Node.Network + " to reconnect")
-		_, err := Pull(cfg.Node.Network, true)
-		if err != nil {
-			ncutils.Log("could not run pull on " + cfg.Node.Network + ", error: " + err.Error())
-		}
-	}
-}
-
 // == Private ==
 
 // setupMQTT creates a connection to broker and return client
-func setupMQTT(cfg *config.ClientConfig, publish bool) mqtt.Client {
+func setupMQTT(publish bool) mqtt.Client {
+	var cfg *config.ClientConfig
+	cfg.Network = ncutils.COMMS_NETWORK_NAME
+	cfg.ReadConfig()
 	opts := mqtt.NewClientOptions()
 	server := getServerAddress(cfg)
 	opts.AddBroker(server + ":1883")
@@ -413,31 +119,11 @@ func setupMQTT(cfg *config.ClientConfig, publish bool) mqtt.Client {
 	opts.SetWriteTimeout(time.Minute)
 	opts.SetOnConnectHandler(func(client mqtt.Client) {
 		if !publish {
-			if cfg.DebugOn {
-				if token := client.Subscribe("#", 0, nil); token.Wait() && token.Error() != nil {
-					ncutils.Log(token.Error().Error())
-					return
-				}
-				ncutils.Log("subscribed to all topics for debugging purposes")
-			}
-			if token := client.Subscribe(fmt.Sprintf("update/%s/%s", cfg.Node.Network, cfg.Node.ID), 0, mqtt.MessageHandler(NodeUpdate)); token.Wait() && token.Error() != nil {
-				ncutils.Log(token.Error().Error())
-				return
-			}
-			if cfg.DebugOn {
-				ncutils.Log(fmt.Sprintf("subscribed to node updates for node %s update/%s/%s", cfg.Node.Name, cfg.Node.Network, cfg.Node.ID))
-			}
-			if token := client.Subscribe(fmt.Sprintf("peers/%s/%s", cfg.Node.Network, cfg.Node.ID), 0, mqtt.MessageHandler(UpdatePeers)); token.Wait() && token.Error() != nil {
-				ncutils.Log(token.Error().Error())
-				return
-			}
-			if cfg.DebugOn {
-				ncutils.Log(fmt.Sprintf("subscribed to peer updates for node %s peers/%s/%s", cfg.Node.Name, cfg.Node.Network, cfg.Node.ID))
-			}
-			opts.SetOrderMatters(true)
-			opts.SetResumeSubs(true)
+			SetSubscriptions(client, cfg)
 		}
 	})
+	opts.SetOrderMatters(true)
+	opts.SetResumeSubs(true)
 	opts.SetConnectionLostHandler(func(c mqtt.Client, e error) {
 		ncutils.Log("detected broker connection lost, running pull for " + cfg.Node.Network)
 		_, err := Pull(cfg.Node.Network, true)
@@ -486,6 +172,41 @@ func setupMQTT(cfg *config.ClientConfig, publish bool) mqtt.Client {
 	return client
 }
 
+// SetSubscriptions - sets MQ subscriptions
+func SetSubscriptions(client mqtt.Client, cfg *config.ClientConfig) {
+	if cfg.DebugOn {
+		if token := client.Subscribe("#", 0, nil); token.Wait() && token.Error() != nil {
+			ncutils.Log(token.Error().Error())
+			return
+		}
+		ncutils.Log("subscribed to all topics for debugging purposes")
+	}
+	networks, err := ncutils.GetSystemNetworks()
+	if err != nil {
+		ncutils.Log("error retriving networks " + err.Error())
+	}
+	for _, network := range networks {
+		var cfg config.ClientConfig
+		cfg.Network = network
+		cfg.ReadConfig()
+
+		if token := client.Subscribe(fmt.Sprintf("update/%s/%s", cfg.Node.Network, cfg.Node.ID), 0, mqtt.MessageHandler(NodeUpdate)); token.Wait() && token.Error() != nil {
+			ncutils.Log(token.Error().Error())
+			return
+		}
+		if cfg.DebugOn {
+			ncutils.Log(fmt.Sprintf("subscribed to node updates for node %s update/%s/%s", cfg.Node.Name, cfg.Node.Network, cfg.Node.ID))
+		}
+		if token := client.Subscribe(fmt.Sprintf("peers/%s/%s", cfg.Node.Network, cfg.Node.ID), 0, mqtt.MessageHandler(UpdatePeers)); token.Wait() && token.Error() != nil {
+			ncutils.Log(token.Error().Error())
+			return
+		}
+		if cfg.DebugOn {
+			ncutils.Log(fmt.Sprintf("subscribed to peer updates for node %s peers/%s/%s", cfg.Node.Name, cfg.Node.Network, cfg.Node.ID))
+		}
+	}
+}
+
 // publishes a message to server to update peers on this peer's behalf
 func publishSignal(cfg *config.ClientConfig, signal byte) error {
 	if err := publish(cfg, fmt.Sprintf("signal/%s", cfg.Node.ID), []byte{signal}, 1); err != nil {
@@ -522,31 +243,6 @@ func initialPull(network string) {
 	}
 }
 
-func publish(cfg *config.ClientConfig, dest string, msg []byte, qos byte) error {
-	// setup the keys
-	trafficPrivKey, err := auth.RetrieveTrafficKey(cfg.Node.Network)
-	if err != nil {
-		return err
-	}
-
-	serverPubKey, err := ncutils.ConvertBytesToKey(cfg.Node.TrafficKeys.Server)
-	if err != nil {
-		return err
-	}
-
-	client := setupMQTT(cfg, true)
-	defer client.Disconnect(250)
-	encrypted, err := ncutils.Chunk(msg, serverPubKey, trafficPrivKey)
-	if err != nil {
-		return err
-	}
-
-	if token := client.Publish(dest, qos, false, encrypted); token.Wait() && token.Error() != nil {
-		return token.Error()
-	}
-	return nil
-}
-
 func parseNetworkFromTopic(topic string) string {
 	return strings.Split(topic, "/")[1]
 }
@@ -570,21 +266,6 @@ func decryptMsg(cfg *config.ClientConfig, msg []byte) ([]byte, error) {
 	return ncutils.DeChunk(msg, serverPubKey, diskKey)
 }
 
-func pingServer(cfg *config.ClientConfig) error {
-	node := getServerAddress(cfg)
-	pinger, err := ping.NewPinger(node)
-	if err != nil {
-		return err
-	}
-	pinger.Timeout = 2 * time.Second
-	pinger.Run()
-	stats := pinger.Statistics()
-	if stats.PacketLoss == 100 {
-		return errors.New("ping error")
-	}
-	return nil
-}
-
 func getServerAddress(cfg *config.ClientConfig) string {
 	var server models.ServerAddr
 	for _, server = range cfg.Node.NetworkSettings.DefaultServerAddrs {
@@ -594,3 +275,31 @@ func getServerAddress(cfg *config.ClientConfig) string {
 	}
 	return server.Address
 }
+
+// == Message Caches ==
+
+func insert(network, which, cache string) {
+	var newMessage = cachedMessage{
+		Message:  cache,
+		LastSeen: time.Now(),
+	}
+	messageCache.Store(fmt.Sprintf("%s%s", network, which), newMessage)
+}
+
+func read(network, which string) string {
+	val, isok := messageCache.Load(fmt.Sprintf("%s%s", network, which))
+	if isok {
+		var readMessage = val.(cachedMessage) // fetch current cached message
+		if readMessage.LastSeen.IsZero() {
+			return ""
+		}
+		if time.Now().After(readMessage.LastSeen.Add(time.Minute * 10)) { // check if message has been there over a minute
+			messageCache.Delete(fmt.Sprintf("%s%s", network, which)) // remove old message if expired
+			return ""
+		}
+		return readMessage.Message // return current message if not expired
+	}
+	return ""
+}
+
+// == End Message Caches ==

+ 193 - 0
netclient/functions/mqhandlers.go

@@ -0,0 +1,193 @@
+package functions
+
+import (
+	"context"
+	"encoding/json"
+	"fmt"
+	"runtime"
+	"strings"
+	"time"
+
+	"github.com/davecgh/go-spew/spew"
+	mqtt "github.com/eclipse/paho.mqtt.golang"
+	"github.com/gravitl/netmaker/models"
+	"github.com/gravitl/netmaker/netclient/config"
+	"github.com/gravitl/netmaker/netclient/local"
+	"github.com/gravitl/netmaker/netclient/ncutils"
+	"github.com/gravitl/netmaker/netclient/wireguard"
+)
+
+// All -- mqtt message hander for all ('#') topics
+var All mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Message) {
+	ncutils.Log("default message handler -- received message but not handling")
+	ncutils.Log("Topic: " + string(msg.Topic()))
+	//ncutils.Log("Message: " + string(msg.Payload()))
+}
+
+// NodeUpdate -- mqtt message handler for /update/<NodeID> topic
+func NodeUpdate(client mqtt.Client, msg mqtt.Message) {
+	var newNode models.Node
+	var cfg config.ClientConfig
+	var network = parseNetworkFromTopic(msg.Topic())
+	cfg.Network = network
+	cfg.ReadConfig()
+
+	data, dataErr := decryptMsg(&cfg, msg.Payload())
+	if dataErr != nil {
+		return
+	}
+	err := json.Unmarshal([]byte(data), &newNode)
+	if err != nil {
+		ncutils.Log("error unmarshalling node update data" + err.Error())
+		return
+	}
+
+	ncutils.Log("received message to update node " + newNode.Name)
+	// see if cache hit, if so skip
+	var currentMessage = read(newNode.Network, lastNodeUpdate)
+	if currentMessage == string(data) {
+		return
+	}
+	insert(newNode.Network, lastNodeUpdate, string(data)) // store new message in cache
+
+	//check if interface name has changed if so delete.
+	if cfg.Node.Interface != newNode.Interface {
+		if err = wireguard.RemoveConf(cfg.Node.Interface, true); err != nil {
+			ncutils.PrintLog("could not delete old interface "+cfg.Node.Interface+": "+err.Error(), 0)
+		}
+	}
+	//ensure that OS never changes
+	newNode.OS = runtime.GOOS
+	// check if interface needs to delta
+	ifaceDelta := ncutils.IfaceDelta(&cfg.Node, &newNode)
+	shouldDNSChange := cfg.Node.DNSOn != newNode.DNSOn
+
+	cfg.Node = newNode
+	switch newNode.Action {
+	case models.NODE_DELETE:
+		if cancel, ok := networkcontext.Load(newNode.Network); ok {
+			ncutils.Log("cancelling message queue context for " + newNode.Network)
+			cancel.(context.CancelFunc)()
+		} else {
+			ncutils.Log("failed to kill go routines for network " + newNode.Network)
+		}
+		ncutils.PrintLog(fmt.Sprintf("received delete request for %s", cfg.Node.Name), 0)
+		if err = LeaveNetwork(cfg.Node.Network); err != nil {
+			if !strings.Contains("rpc error", err.Error()) {
+				ncutils.PrintLog(fmt.Sprintf("failed to leave, please check that local files for network %s were removed", cfg.Node.Network), 0)
+			}
+			ncutils.PrintLog(fmt.Sprintf("%s was removed", cfg.Node.Name), 0)
+			return
+		}
+		ncutils.PrintLog(fmt.Sprintf("%s was removed", cfg.Node.Name), 0)
+		return
+	case models.NODE_UPDATE_KEY:
+		if err := UpdateKeys(&cfg, client); err != nil {
+			ncutils.PrintLog("err updating wireguard keys: "+err.Error(), 0)
+		}
+	case models.NODE_NOOP:
+	default:
+	}
+	// Save new config
+	cfg.Node.Action = models.NODE_NOOP
+	if err := config.Write(&cfg, cfg.Network); err != nil {
+		ncutils.PrintLog("error updating node configuration: "+err.Error(), 0)
+	}
+	nameserver := cfg.Server.CoreDNSAddr
+	privateKey, err := wireguard.RetrievePrivKey(newNode.Network)
+	if err != nil {
+		ncutils.Log("error reading PrivateKey " + err.Error())
+		return
+	}
+	file := ncutils.GetNetclientPathSpecific() + cfg.Node.Interface + ".conf"
+
+	if err := wireguard.UpdateWgInterface(file, privateKey, nameserver, newNode); err != nil {
+		ncutils.Log("error updating wireguard config " + err.Error())
+		return
+	}
+	if ifaceDelta { // if a change caused an ifacedelta we need to notify the server to update the peers
+		ackErr := publishSignal(&cfg, ncutils.ACK)
+		if ackErr != nil {
+			ncutils.Log("could not notify server that it received an interface update")
+		} else {
+			ncutils.Log("signalled acknowledgement of change to server")
+		}
+		ncutils.Log("applying WG conf to " + file)
+		err = wireguard.ApplyConf(&cfg.Node, cfg.Node.Interface, file)
+		if err != nil {
+			ncutils.Log("error restarting wg after node update " + err.Error())
+			return
+		}
+
+		time.Sleep(time.Second >> 0)
+		if newNode.DNSOn == "yes" {
+			for _, server := range newNode.NetworkSettings.DefaultServerAddrs {
+				if server.IsLeader {
+					go local.SetDNSWithRetry(newNode, server.Address)
+					break
+				}
+			}
+		}
+		doneErr := publishSignal(&cfg, ncutils.DONE)
+		if doneErr != nil {
+			ncutils.Log("could not notify server to update peers after interface change")
+		} else {
+			ncutils.Log("signalled finshed interface update to server")
+		}
+	}
+	//deal with DNS
+	if newNode.DNSOn != "yes" && shouldDNSChange && cfg.Node.Interface != "" {
+		ncutils.Log("settng DNS off")
+		_, err := ncutils.RunCmd("/usr/bin/resolvectl revert "+cfg.Node.Interface, true)
+		if err != nil {
+			ncutils.Log("error applying dns" + err.Error())
+		}
+	}
+}
+
+// UpdatePeers -- mqtt message handler for peers/<Network>/<NodeID> topic
+func UpdatePeers(client mqtt.Client, msg mqtt.Message) {
+	var peerUpdate models.PeerUpdate
+	var network = parseNetworkFromTopic(msg.Topic())
+	var cfg = config.ClientConfig{}
+	cfg.Network = network
+	cfg.ReadConfig()
+
+	data, dataErr := decryptMsg(&cfg, msg.Payload())
+	if dataErr != nil {
+		return
+	}
+	err := json.Unmarshal([]byte(data), &peerUpdate)
+	if err != nil {
+		ncutils.Log("error unmarshalling peer data")
+		return
+	}
+	// see if cached hit, if so skip
+	var currentMessage = read(peerUpdate.Network, lastPeerUpdate)
+	if currentMessage == string(data) {
+		return
+	}
+	insert(peerUpdate.Network, lastPeerUpdate, string(data))
+
+	file := ncutils.GetNetclientPathSpecific() + cfg.Node.Interface + ".conf"
+	spew.Dump(peerUpdate.Peers)
+	err = wireguard.UpdateWgPeers(file, peerUpdate.Peers)
+	if err != nil {
+		ncutils.Log("error updating wireguard peers" + err.Error())
+		return
+	}
+	//err = wireguard.SyncWGQuickConf(cfg.Node.Interface, file)
+	var iface = cfg.Node.Interface
+	if ncutils.IsMac() {
+		iface, err = local.GetMacIface(cfg.Node.Address)
+		if err != nil {
+			ncutils.Log("error retrieving mac iface: " + err.Error())
+			return
+		}
+	}
+	err = wireguard.SetPeers(iface, cfg.Node.Address, cfg.Node.PersistentKeepalive, peerUpdate.Peers)
+	if err != nil {
+		ncutils.Log("error syncing wg after peer update: " + err.Error())
+		return
+	}
+}

+ 135 - 0
netclient/functions/mqpublish.go

@@ -0,0 +1,135 @@
+package functions
+
+import (
+	"context"
+	"encoding/json"
+	"fmt"
+	"sync"
+	"time"
+
+	"github.com/gravitl/netmaker/netclient/auth"
+	"github.com/gravitl/netmaker/netclient/config"
+	"github.com/gravitl/netmaker/netclient/ncutils"
+)
+
+// Checkin  -- go routine that checks for public or local ip changes, publishes changes
+//   if there are no updates, simply "pings" the server as a checkin
+func Checkin(ctx context.Context, wg sync.WaitGroup) {
+	defer wg.Done()
+	for {
+		select {
+		case <-ctx.Done():
+			ncutils.Log("Checkin cancelled")
+			return
+			//delay should be configuraable -> use cfg.Node.NetworkSettings.DefaultCheckInInterval ??
+		case <-time.After(time.Second * 60):
+			// ncutils.Log("Checkin running")
+			//read latest config
+			networks, err := ncutils.GetSystemNetworks()
+			if err != nil {
+				return
+			}
+			for _, network := range networks {
+				if network == ncutils.COMMS_NETWORK_NAME {
+					continue
+				}
+				var cfg *config.ClientConfig
+				cfg.Network = network
+				cfg.ReadConfig()
+				if cfg.Node.IsStatic != "yes" {
+					extIP, err := ncutils.GetPublicIP()
+					if err != nil {
+						ncutils.PrintLog("error encountered checking public ip addresses: "+err.Error(), 1)
+					}
+					if cfg.Node.Endpoint != extIP && extIP != "" {
+						ncutils.PrintLog("endpoint has changed from "+cfg.Node.Endpoint+" to "+extIP, 1)
+						cfg.Node.Endpoint = extIP
+						if err := PublishNodeUpdate(cfg); err != nil {
+							ncutils.Log("could not publish endpoint change")
+						}
+					}
+					intIP, err := getPrivateAddr()
+					if err != nil {
+						ncutils.PrintLog("error encountered checking private ip addresses: "+err.Error(), 1)
+					}
+					if cfg.Node.LocalAddress != intIP && intIP != "" {
+						ncutils.PrintLog("local Address has changed from "+cfg.Node.LocalAddress+" to "+intIP, 1)
+						cfg.Node.LocalAddress = intIP
+						if err := PublishNodeUpdate(cfg); err != nil {
+							ncutils.Log("could not publish local address change")
+						}
+					}
+				} else if cfg.Node.IsLocal == "yes" && cfg.Node.LocalRange != "" {
+					localIP, err := ncutils.GetLocalIP(cfg.Node.LocalRange)
+					if err != nil {
+						ncutils.PrintLog("error encountered checking local ip addresses: "+err.Error(), 1)
+					}
+					if cfg.Node.Endpoint != localIP && localIP != "" {
+						ncutils.PrintLog("endpoint has changed from "+cfg.Node.Endpoint+" to "+localIP, 1)
+						cfg.Node.Endpoint = localIP
+						if err := PublishNodeUpdate(cfg); err != nil {
+							ncutils.Log("could not publish localip change")
+						}
+					}
+				}
+				if err := PingServer(cfg); err != nil {
+					ncutils.PrintLog("could not ping server "+err.Error(), 0)
+				}
+				Hello(cfg, network)
+				// ncutils.Log("Checkin complete")
+			}
+		}
+	}
+}
+
+// PublishNodeUpdates -- saves node and pushes changes to broker
+func PublishNodeUpdate(cfg *config.ClientConfig) error {
+	if err := config.Write(cfg, cfg.Network); err != nil {
+		return err
+	}
+	data, err := json.Marshal(cfg.Node)
+	if err != nil {
+		return err
+	}
+	if err = publish(cfg, fmt.Sprintf("update/%s", cfg.Node.ID), data, 1); err != nil {
+		return err
+	}
+	return nil
+}
+
+// Hello -- ping the broker to let server know node is alive and doing fine
+func Hello(cfg *config.ClientConfig, network string) {
+	if err := publish(cfg, fmt.Sprintf("ping/%s", cfg.Node.ID), []byte(ncutils.Version), 0); err != nil {
+		ncutils.Log(fmt.Sprintf("error publishing ping, %v", err))
+		ncutils.Log("running pull on " + cfg.Node.Network + " to reconnect")
+		_, err := Pull(cfg.Node.Network, true)
+		if err != nil {
+			ncutils.Log("could not run pull on " + cfg.Node.Network + ", error: " + err.Error())
+		}
+	}
+}
+
+func publish(cfg *config.ClientConfig, dest string, msg []byte, qos byte) error {
+	// setup the keys
+	trafficPrivKey, err := auth.RetrieveTrafficKey(cfg.Node.Network)
+	if err != nil {
+		return err
+	}
+
+	serverPubKey, err := ncutils.ConvertBytesToKey(cfg.Node.TrafficKeys.Server)
+	if err != nil {
+		return err
+	}
+
+	client := setupMQTT(true)
+	defer client.Disconnect(250)
+	encrypted, err := ncutils.Chunk(msg, serverPubKey, trafficPrivKey)
+	if err != nil {
+		return err
+	}
+
+	if token := client.Publish(dest, qos, false, encrypted); token.Wait() && token.Error() != nil {
+		return token.Error()
+	}
+	return nil
+}

+ 2 - 0
netclient/ncutils/constants.go

@@ -7,4 +7,6 @@ const (
 	DONE = 2
 	// KEY - key update completed signal for MQ
 	KEY = 3
+	// COMMS_NETWORK_NAME - name of signalling network
+	COMMS_NETWORK_NAME = "n37m8k3r"
 )