Browse Source

refactored and cleaned up code, added peer update from clients

0xdcarns 3 years ago
parent
commit
867e253c3b
5 changed files with 347 additions and 348 deletions
  1. 105 0
      mq/handlers.go
  2. 4 223
      mq/mq.go
  3. 125 0
      mq/publishers.go
  4. 11 0
      mq/util.go
  5. 102 125
      netclient/functions/daemon.go

+ 105 - 0
mq/handlers.go

@@ -0,0 +1,105 @@
+package mq
+
+import (
+	"encoding/json"
+
+	mqtt "github.com/eclipse/paho.mqtt.golang"
+	"github.com/gravitl/netmaker/database"
+	"github.com/gravitl/netmaker/logger"
+	"github.com/gravitl/netmaker/logic"
+	"github.com/gravitl/netmaker/models"
+)
+
+// DefaultHandler default message queue handler - only called when GetDebug == true
+func DefaultHandler(client mqtt.Client, msg mqtt.Message) {
+	logger.Log(0, "MQTT Message: Topic: ", string(msg.Topic()), " Message: ", string(msg.Payload()))
+}
+
+// Ping message Handler -- handles ping topic from client nodes
+func Ping(client mqtt.Client, msg mqtt.Message) {
+	logger.Log(0, "Ping Handler: ", msg.Topic())
+	go func() {
+		id, err := getID(msg.Topic())
+		if err != nil {
+			logger.Log(0, "error getting node.ID sent on ping topic ")
+			return
+		}
+		node, err := logic.GetNodeByID(id)
+		if err != nil {
+			logger.Log(0, "mq-ping error getting node: ", err.Error())
+			record, err := database.FetchRecord(database.NODES_TABLE_NAME, id)
+			if err != nil {
+				logger.Log(0, "error reading database ", err.Error())
+				return
+			}
+			logger.Log(0, "record from database")
+			logger.Log(0, record)
+			return
+		}
+		_, decryptErr := decryptMsg(&node, msg.Payload())
+		if decryptErr != nil {
+			logger.Log(0, "error decrypting when updating node ", node.ID, decryptErr.Error())
+			return
+		}
+		node.SetLastCheckIn()
+		if err := logic.UpdateNode(&node, &node); err != nil {
+			logger.Log(0, "error updating node", node.Name, node.ID, " on checkin", err.Error())
+			return
+		}
+		logger.Log(3, "ping processed for node", node.ID)
+		// --TODO --set client version once feature is implemented.
+		//node.SetClientVersion(msg.Payload())
+	}()
+}
+
+// UpdateNode  message Handler -- handles updates from client nodes
+func UpdateNode(client mqtt.Client, msg mqtt.Message) {
+	go func() {
+		id, err := getID(msg.Topic())
+		if err != nil {
+			logger.Log(1, "error getting node.ID sent on ", msg.Topic(), err.Error())
+			return
+		}
+		currentNode, err := logic.GetNodeByID(id)
+		if err != nil {
+			logger.Log(1, "error getting node ", id, err.Error())
+			return
+		}
+		decrypted, decryptErr := decryptMsg(&currentNode, msg.Payload())
+		if decryptErr != nil {
+			logger.Log(1, "failed to decrypt message for node ", id, decryptErr.Error())
+			return
+		}
+		var newNode models.Node
+		if err := json.Unmarshal(decrypted, &newNode); err != nil {
+			logger.Log(1, "error unmarshaling payload ", err.Error())
+			return
+		}
+		if err := logic.UpdateNode(&currentNode, &newNode); err != nil {
+			logger.Log(1, "error saving node", err.Error())
+			return
+		}
+		logger.Log(1, "updated node", id, newNode.Name)
+	}()
+}
+
+// ClientPeerUpdate  message handler -- handles updating peers after signal from client nodes
+func ClientPeerUpdate(client mqtt.Client, msg mqtt.Message) {
+	go func() {
+		id, err := getID(msg.Topic())
+		if err != nil {
+			logger.Log(1, "error getting node.ID sent on ", msg.Topic(), err.Error())
+			return
+		}
+		currentNode, err := logic.GetNodeByID(id)
+		if err != nil {
+			logger.Log(1, "error getting node ", id, err.Error())
+			return
+		}
+		if err := PublishPeerUpdate(&currentNode); err != nil {
+			logger.Log(1, "error publishing peer update ", err.Error())
+			return
+		}
+		logger.Log(1, "sent peer updates after signal received from", id, currentNode.Name)
+	}()
+}

+ 4 - 223
mq/mq.go

@@ -2,18 +2,11 @@ package mq
 
 import (
 	"context"
-	"encoding/json"
-	"errors"
-	"fmt"
 	"log"
-	"strings"
 	"time"
 
 	mqtt "github.com/eclipse/paho.mqtt.golang"
-	"github.com/gravitl/netmaker/database"
 	"github.com/gravitl/netmaker/logger"
-	"github.com/gravitl/netmaker/logic"
-	"github.com/gravitl/netmaker/models"
 	"github.com/gravitl/netmaker/netclient/ncutils"
 	"github.com/gravitl/netmaker/servercfg"
 )
@@ -25,171 +18,6 @@ const MQ_DISCONNECT = 250
 
 var peer_force_send = 0
 
-// DefaultHandler default message queue handler - only called when GetDebug == true
-func DefaultHandler(client mqtt.Client, msg mqtt.Message) {
-	logger.Log(0, "MQTT Message: Topic: ", string(msg.Topic()), " Message: ", string(msg.Payload()))
-}
-
-// Ping message Handler -- handles ping topic from client nodes
-func Ping(client mqtt.Client, msg mqtt.Message) {
-	logger.Log(0, "Ping Handler: ", msg.Topic())
-	go func() {
-		id, err := GetID(msg.Topic())
-		if err != nil {
-			logger.Log(0, "error getting node.ID sent on ping topic ")
-			return
-		}
-		node, err := logic.GetNodeByID(id)
-		if err != nil {
-			logger.Log(0, "mq-ping error getting node: ", err.Error())
-			record, err := database.FetchRecord(database.NODES_TABLE_NAME, id)
-			if err != nil {
-				logger.Log(0, "error reading database ", err.Error())
-				return
-			}
-			logger.Log(0, "record from database")
-			logger.Log(0, record)
-			return
-		}
-		_, decryptErr := decryptMsg(&node, msg.Payload())
-		if decryptErr != nil {
-			logger.Log(0, "error decrypting when updating node ", node.ID, decryptErr.Error())
-			return
-		}
-		node.SetLastCheckIn()
-		if err := logic.UpdateNode(&node, &node); err != nil {
-			logger.Log(0, "error updating node", node.Name, node.ID, " on checkin", err.Error())
-			return
-		}
-		logger.Log(3, "ping processed for node", node.ID)
-		// --TODO --set client version once feature is implemented.
-		//node.SetClientVersion(msg.Payload())
-	}()
-}
-
-// UpdateNode  message Handler -- handles updates from client nodes
-func UpdateNode(client mqtt.Client, msg mqtt.Message) {
-	go func() {
-		id, err := GetID(msg.Topic())
-		if err != nil {
-			logger.Log(1, "error getting node.ID sent on ", msg.Topic(), err.Error())
-			return
-		}
-		currentNode, err := logic.GetNodeByID(id)
-		if err != nil {
-			logger.Log(1, "error getting node ", id, err.Error())
-			return
-		}
-		decrypted, decryptErr := decryptMsg(&currentNode, msg.Payload())
-		if decryptErr != nil {
-			logger.Log(1, "failed to decrypt message for node ", id, decryptErr.Error())
-			return
-		}
-		var newNode models.Node
-		if err := json.Unmarshal(decrypted, &newNode); err != nil {
-			logger.Log(1, "error unmarshaling payload ", err.Error())
-			return
-		}
-		if err := logic.UpdateNode(&currentNode, &newNode); err != nil {
-			logger.Log(1, "error saving node", err.Error())
-			return
-		}
-		if err := PublishPeerUpdate(&newNode); err != nil {
-			logger.Log(1, "error publishing peer update ", err.Error())
-			return
-		}
-		logger.Log(1, "Updated node", id, newNode.Name)
-	}()
-}
-
-// PublishPeerUpdate --- deterines and publishes a peer update to all the peers of a node
-func PublishPeerUpdate(newNode *models.Node) error {
-	if !servercfg.IsMessageQueueBackend() {
-		return nil
-	}
-	networkNodes, err := logic.GetNetworkNodes(newNode.Network)
-	if err != nil {
-		logger.Log(1, "err getting Network Nodes", err.Error())
-		return err
-	}
-	for _, node := range networkNodes {
-
-		if node.IsServer == "yes" || node.ID == newNode.ID {
-			continue
-		}
-		peerUpdate, err := logic.GetPeerUpdate(&node)
-		if err != nil {
-			logger.Log(1, "error getting peer update for node", node.ID, err.Error())
-			continue
-		}
-		data, err := json.Marshal(&peerUpdate)
-		if err != nil {
-			logger.Log(2, "error marshaling peer update for node", node.ID, err.Error())
-			continue
-		}
-		if err = publish(&node, fmt.Sprintf("peers/%s/%s", node.Network, node.ID), data); err != nil {
-			logger.Log(1, "failed to publish peer update for node", node.ID)
-		} else {
-			logger.Log(1, "sent peer update for node", node.Name, "on network:", node.Network)
-		}
-	}
-	return nil
-}
-
-// PublishPeerUpdate --- deterines and publishes a peer update to all the peers of a node
-func PublishExtPeerUpdate(node *models.Node) error {
-	var err error
-	if logic.IsLocalServer(node) {
-		if err = logic.ServerUpdate(node, false); err != nil {
-			logger.Log(1, "server node:", node.ID, "failed to update peers with ext clients")
-			return err
-		} else {
-			return nil
-		}
-	}
-	if !servercfg.IsMessageQueueBackend() {
-		return nil
-	}
-	peerUpdate, err := logic.GetPeerUpdate(node)
-	if err != nil {
-		return err
-	}
-	data, err := json.Marshal(&peerUpdate)
-	if err != nil {
-		return err
-	}
-	return publish(node, fmt.Sprintf("peers/%s/%s", node.Network, node.ID), data)
-}
-
-// GetID -- decodes a message queue topic and returns the embedded node.ID
-func GetID(topic string) (string, error) {
-	parts := strings.Split(topic, "/")
-	count := len(parts)
-	if count == 1 {
-		return "", errors.New("invalid topic")
-	}
-	//the last part of the topic will be the node.ID
-	return parts[count-1], nil
-}
-
-// NodeUpdate -- publishes a node update
-func NodeUpdate(node *models.Node) error {
-	if !servercfg.IsMessageQueueBackend() {
-		return nil
-	}
-	logger.Log(3, "publishing node update to "+node.Name)
-	data, err := json.Marshal(node)
-	if err != nil {
-		logger.Log(2, "error marshalling node update ", err.Error())
-		return err
-	}
-	if err = publish(node, fmt.Sprintf("update/%s/%s", node.Network, node.ID), data); err != nil {
-		logger.Log(2, "error publishing node update to peer ", node.ID, err.Error())
-		return err
-	}
-	return nil
-}
-
 // SetupMQTT creates a connection to broker and return client
 func SetupMQTT(publish bool) mqtt.Client {
 	opts := mqtt.NewClientOptions()
@@ -217,6 +45,10 @@ func SetupMQTT(publish bool) mqtt.Client {
 				client.Disconnect(240)
 				logger.Log(0, "node update subscription failed")
 			}
+			if token := client.Subscribe("clients/#", 0, mqtt.MessageHandler(ClientPeerUpdate)); token.Wait() && token.Error() != nil {
+				client.Disconnect(240)
+				logger.Log(0, "node client subscription failed")
+			}
 
 			opts.SetOrderMatters(true)
 			opts.SetResumeSubs(true)
@@ -249,54 +81,3 @@ func Keepalive(ctx context.Context) {
 		}
 	}
 }
-
-// sendPeers - retrieve networks, send peer ports to all peers
-func sendPeers() {
-	var force bool
-	peer_force_send++
-	if peer_force_send == 5 {
-		force = true
-		peer_force_send = 0
-	}
-	networks, err := logic.GetNetworks()
-	if err != nil {
-		logger.Log(1, "error retrieving networks for keepalive", err.Error())
-	}
-	for _, network := range networks {
-		serverNode, errN := logic.GetNetworkServerLeader(network.NetID)
-		if errN == nil {
-			serverNode.SetLastCheckIn()
-			logic.UpdateNode(&serverNode, &serverNode)
-			if network.DefaultUDPHolePunch == "yes" {
-				if logic.ShouldPublishPeerPorts(&serverNode) || force {
-					if force {
-						logger.Log(2, "sending scheduled peer update (5 min)")
-					}
-					err = PublishPeerUpdate(&serverNode)
-					if err != nil {
-						logger.Log(1, "error publishing udp port updates for network", network.NetID)
-						logger.Log(1, errN.Error())
-					}
-				}
-			}
-		} else {
-			logger.Log(1, "unable to retrieve leader for network ", network.NetID)
-			logger.Log(1, errN.Error())
-			continue
-		}
-	}
-}
-
-// func publishServerKeepalive(client mqtt.Client, network *models.Network) {
-// 	nodes, err := logic.GetNetworkNodes(network.NetID)
-// 	if err != nil {
-// 		return
-// 	}
-// 	for _, node := range nodes {
-// 		if token := client.Publish(fmt.Sprintf("serverkeepalive/%s/%s", network.NetID, node.ID), 0, false, servercfg.GetVersion()); token.Wait() && token.Error() != nil {
-// 			logger.Log(1, "error publishing server keepalive for network", network.NetID, token.Error().Error())
-// 		} else {
-// 			logger.Log(2, "keepalive sent for network/node", network.NetID, node.ID)
-// 		}
-// 	}
-// }

+ 125 - 0
mq/publishers.go

@@ -0,0 +1,125 @@
+package mq
+
+import (
+	"encoding/json"
+	"fmt"
+
+	"github.com/gravitl/netmaker/logger"
+	"github.com/gravitl/netmaker/logic"
+	"github.com/gravitl/netmaker/models"
+	"github.com/gravitl/netmaker/servercfg"
+)
+
+// PublishPeerUpdate --- deterines and publishes a peer update to all the peers of a node
+func PublishPeerUpdate(newNode *models.Node) error {
+	if !servercfg.IsMessageQueueBackend() {
+		return nil
+	}
+	networkNodes, err := logic.GetNetworkNodes(newNode.Network)
+	if err != nil {
+		logger.Log(1, "err getting Network Nodes", err.Error())
+		return err
+	}
+	for _, node := range networkNodes {
+
+		if node.IsServer == "yes" || node.ID == newNode.ID {
+			continue
+		}
+		peerUpdate, err := logic.GetPeerUpdate(&node)
+		if err != nil {
+			logger.Log(1, "error getting peer update for node", node.ID, err.Error())
+			continue
+		}
+		data, err := json.Marshal(&peerUpdate)
+		if err != nil {
+			logger.Log(2, "error marshaling peer update for node", node.ID, err.Error())
+			continue
+		}
+		if err = publish(&node, fmt.Sprintf("peers/%s/%s", node.Network, node.ID), data); err != nil {
+			logger.Log(1, "failed to publish peer update for node", node.ID)
+		} else {
+			logger.Log(1, "sent peer update for node", node.Name, "on network:", node.Network)
+		}
+	}
+	return nil
+}
+
+// PublishPeerUpdate --- publishes a peer update to all the peers of a node
+func PublishExtPeerUpdate(node *models.Node) error {
+	var err error
+	if logic.IsLocalServer(node) {
+		if err = logic.ServerUpdate(node, false); err != nil {
+			logger.Log(1, "server node:", node.ID, "failed to update peers with ext clients")
+			return err
+		} else {
+			return nil
+		}
+	}
+	if !servercfg.IsMessageQueueBackend() {
+		return nil
+	}
+	peerUpdate, err := logic.GetPeerUpdate(node)
+	if err != nil {
+		return err
+	}
+	data, err := json.Marshal(&peerUpdate)
+	if err != nil {
+		return err
+	}
+	return publish(node, fmt.Sprintf("peers/%s/%s", node.Network, node.ID), data)
+}
+
+// NodeUpdate -- publishes a node update
+func NodeUpdate(node *models.Node) error {
+	if !servercfg.IsMessageQueueBackend() {
+		return nil
+	}
+	logger.Log(3, "publishing node update to "+node.Name)
+	data, err := json.Marshal(node)
+	if err != nil {
+		logger.Log(2, "error marshalling node update ", err.Error())
+		return err
+	}
+	if err = publish(node, fmt.Sprintf("update/%s/%s", node.Network, node.ID), data); err != nil {
+		logger.Log(2, "error publishing node update to peer ", node.ID, err.Error())
+		return err
+	}
+	return nil
+}
+
+// sendPeers - retrieve networks, send peer ports to all peers
+func sendPeers() {
+	var force bool
+	peer_force_send++
+	if peer_force_send == 5 {
+		force = true
+		peer_force_send = 0
+	}
+	networks, err := logic.GetNetworks()
+	if err != nil {
+		logger.Log(1, "error retrieving networks for keepalive", err.Error())
+	}
+	for _, network := range networks {
+		serverNode, errN := logic.GetNetworkServerLeader(network.NetID)
+		if errN == nil {
+			serverNode.SetLastCheckIn()
+			logic.UpdateNode(&serverNode, &serverNode)
+			if network.DefaultUDPHolePunch == "yes" {
+				if logic.ShouldPublishPeerPorts(&serverNode) || force {
+					if force {
+						logger.Log(2, "sending scheduled peer update (5 min)")
+					}
+					err = PublishPeerUpdate(&serverNode)
+					if err != nil {
+						logger.Log(1, "error publishing udp port updates for network", network.NetID)
+						logger.Log(1, errN.Error())
+					}
+				}
+			}
+		} else {
+			logger.Log(1, "unable to retrieve leader for network ", network.NetID)
+			logger.Log(1, errN.Error())
+			continue
+		}
+	}
+}

+ 11 - 0
mq/util.go

@@ -70,3 +70,14 @@ func publish(node *models.Node, dest string, msg []byte) error {
 	}
 	return nil
 }
+
+//  decodes a message queue topic and returns the embedded node.ID
+func getID(topic string) (string, error) {
+	parts := strings.Split(topic, "/")
+	count := len(parts)
+	if count == 1 {
+		return "", fmt.Errorf("invalid topic")
+	}
+	//the last part of the topic will be the node.ID
+	return parts[count-1], nil
+}

+ 102 - 125
netclient/functions/daemon.go

@@ -88,94 +88,6 @@ func Daemon() error {
 
 }
 
-// SetupMQTT creates a connection to broker and return client
-func SetupMQTT(cfg *config.ClientConfig, publish bool) mqtt.Client {
-	opts := mqtt.NewClientOptions()
-	server := getServerAddress(cfg)
-	opts.AddBroker(server + ":1883")
-	id := ncutils.MakeRandomString(23)
-	opts.ClientID = id
-	opts.SetDefaultPublishHandler(All)
-	opts.SetAutoReconnect(true)
-	opts.SetConnectRetry(true)
-	opts.SetConnectRetryInterval(time.Second << 2)
-	opts.SetKeepAlive(time.Minute >> 1)
-	opts.SetWriteTimeout(time.Minute)
-	opts.SetOnConnectHandler(func(client mqtt.Client) {
-		if !publish {
-			if cfg.DebugOn {
-				if token := client.Subscribe("#", 0, nil); token.Wait() && token.Error() != nil {
-					ncutils.Log(token.Error().Error())
-					return
-				}
-				ncutils.Log("subscribed to all topics for debugging purposes")
-			}
-			if token := client.Subscribe(fmt.Sprintf("update/%s/%s", cfg.Node.Network, cfg.Node.ID), 0, mqtt.MessageHandler(NodeUpdate)); token.Wait() && token.Error() != nil {
-				ncutils.Log(token.Error().Error())
-				return
-			}
-			if cfg.DebugOn {
-				ncutils.Log(fmt.Sprintf("subscribed to node updates for node %s update/%s/%s", cfg.Node.Name, cfg.Node.Network, cfg.Node.ID))
-			}
-			if token := client.Subscribe(fmt.Sprintf("peers/%s/%s", cfg.Node.Network, cfg.Node.ID), 0, mqtt.MessageHandler(UpdatePeers)); token.Wait() && token.Error() != nil {
-				ncutils.Log(token.Error().Error())
-				return
-			}
-			if cfg.DebugOn {
-				ncutils.Log(fmt.Sprintf("subscribed to peer updates for node %s peers/%s/%s", cfg.Node.Name, cfg.Node.Network, cfg.Node.ID))
-			}
-			opts.SetOrderMatters(true)
-			opts.SetResumeSubs(true)
-		}
-	})
-	opts.SetConnectionLostHandler(func(c mqtt.Client, e error) {
-		ncutils.Log("detected broker connection lost, running pull for " + cfg.Node.Network)
-		_, err := Pull(cfg.Node.Network, true)
-		if err != nil {
-			ncutils.Log("could not run pull, server unreachable: " + err.Error())
-			ncutils.Log("waiting to retry...")
-			/*
-				//Consider putting in logic to restart - daemon may take long time to refresh
-				time.Sleep(time.Minute * 5)
-					ncutils.Log("restarting netclient")
-					daemon.Restart()
-			*/
-		}
-		ncutils.Log("connection re-established with mqtt server")
-	})
-
-	client := mqtt.NewClient(opts)
-	tperiod := time.Now().Add(12 * time.Second)
-	for {
-		//if after 12 seconds, try a gRPC pull on the last try
-		if time.Now().After(tperiod) {
-			ncutils.Log("running pull for " + cfg.Node.Network)
-			_, err := Pull(cfg.Node.Network, true)
-			if err != nil {
-				ncutils.Log("could not run pull, exiting " + cfg.Node.Network + " setup: " + err.Error())
-				return client
-			}
-			time.Sleep(time.Second)
-		}
-		if token := client.Connect(); token.Wait() && token.Error() != nil {
-			ncutils.Log("unable to connect to broker, retrying ...")
-			if time.Now().After(tperiod) {
-				ncutils.Log("could not connect to broker, exiting " + cfg.Node.Network + " setup: " + token.Error().Error())
-				if strings.Contains(token.Error().Error(), "connectex") || strings.Contains(token.Error().Error(), "i/o timeout") {
-					ncutils.PrintLog("connection issue detected.. pulling and restarting daemon", 0)
-					Pull(cfg.Node.Network, true)
-					daemon.Restart()
-				}
-				return client
-			}
-		} else {
-			break
-		}
-		time.Sleep(2 * time.Second)
-	}
-	return client
-}
-
 // MessageQueue sets up Message Queue and subsribes/publishes updates to/from server
 func MessageQueue(ctx context.Context, network string) {
 	ncutils.Log("netclient go routine started for " + network)
@@ -185,7 +97,7 @@ func MessageQueue(ctx context.Context, network string) {
 
 	cfg.ReadConfig()
 	ncutils.Log("daemon started for network: " + network)
-	client := SetupMQTT(&cfg, false)
+	client := setupMQTT(&cfg, false)
 
 	defer client.Disconnect(250)
 	wg := &sync.WaitGroup{}
@@ -287,15 +199,15 @@ func NodeUpdate(client mqtt.Client, msg mqtt.Message) {
 		ncutils.Log("error updating wireguard config " + err.Error())
 		return
 	}
-	if ifaceDelta {
+	if ifaceDelta { // if a change caused an ifacedelta we need to notify the server to update the peers
 		ncutils.Log("applying WG conf to " + file)
 		err = wireguard.ApplyConf(&cfg.Node, cfg.Node.Interface, file)
 		if err != nil {
 			ncutils.Log("error restarting wg after node update " + err.Error())
 			return
 		}
-		time.Sleep(time.Second >> 1)
 
+		time.Sleep(time.Second >> 1)
 		if newNode.DNSOn == "yes" {
 			for _, server := range newNode.NetworkSettings.DefaultServerAddrs {
 				if server.IsLeader {
@@ -304,6 +216,7 @@ func NodeUpdate(client mqtt.Client, msg mqtt.Message) {
 				}
 			}
 		}
+		publishClientPeers(&cfg)
 	}
 	//deal with DNS
 	if newNode.DNSOn != "yes" && shouldDNSChange && cfg.Node.Interface != "" {
@@ -361,39 +274,6 @@ func UpdatePeers(client mqtt.Client, msg mqtt.Message) {
 	}
 }
 
-// MonitorKeepalive - checks time last server keepalive received.  If more than 3+ minutes, notify and resubscribe
-// func MonitorKeepalive(ctx context.Context, wg *sync.WaitGroup, client mqtt.Client, cfg *config.ClientConfig) {
-// 	defer wg.Done()
-// 	for {
-// 		select {
-// 		case <-ctx.Done():
-// 			ncutils.Log("cancel recieved, monitor keepalive exiting")
-// 			return
-// 		case <-time.After(time.Second * 150):
-// 			var keepalivetime time.Time
-// 			keepaliveval, ok := keepalive.Load(cfg.Node.Network)
-// 			if ok {
-// 				keepalivetime = keepaliveval.(time.Time)
-// 				if !keepalivetime.IsZero() && time.Since(keepalivetime) > time.Second*120 { // more than 2+ minutes
-// 					// ncutils.Log("server keepalive not recieved recently, resubscribe to message queue")
-// 					// err := Resubscribe(client, cfg)
-// 					// if err != nil {
-// 					// 	ncutils.Log("closing " + err.Error())
-// 					// }
-// 					ncutils.Log("maybe wanna call something")
-// 				}
-// 			}
-// 		}
-// 	}
-// }
-
-// ServerKeepAlive -- handler to react to keepalive messages published by server
-// func ServerKeepAlive(client mqtt.Client, msg mqtt.Message) {
-// 	var currentTime = time.Now()
-// 	keepalive.Store(parseNetworkFromTopic(msg.Topic()), currentTime)
-// 	ncutils.PrintLog("received server keepalive at "+currentTime.String(), 2)
-// }
-
 // UpdateKeys -- updates private key and returns new publickey
 func UpdateKeys(cfg *config.ClientConfig, client mqtt.Client) error {
 	ncutils.Log("received message to update keys")
@@ -508,6 +388,103 @@ func Hello(cfg *config.ClientConfig, network string) {
 
 // == Private ==
 
+// setupMQTT creates a connection to broker and return client
+func setupMQTT(cfg *config.ClientConfig, publish bool) mqtt.Client {
+	opts := mqtt.NewClientOptions()
+	server := getServerAddress(cfg)
+	opts.AddBroker(server + ":1883")
+	id := ncutils.MakeRandomString(23)
+	opts.ClientID = id
+	opts.SetDefaultPublishHandler(All)
+	opts.SetAutoReconnect(true)
+	opts.SetConnectRetry(true)
+	opts.SetConnectRetryInterval(time.Second << 2)
+	opts.SetKeepAlive(time.Minute >> 1)
+	opts.SetWriteTimeout(time.Minute)
+	opts.SetOnConnectHandler(func(client mqtt.Client) {
+		if !publish {
+			if cfg.DebugOn {
+				if token := client.Subscribe("#", 0, nil); token.Wait() && token.Error() != nil {
+					ncutils.Log(token.Error().Error())
+					return
+				}
+				ncutils.Log("subscribed to all topics for debugging purposes")
+			}
+			if token := client.Subscribe(fmt.Sprintf("update/%s/%s", cfg.Node.Network, cfg.Node.ID), 0, mqtt.MessageHandler(NodeUpdate)); token.Wait() && token.Error() != nil {
+				ncutils.Log(token.Error().Error())
+				return
+			}
+			if cfg.DebugOn {
+				ncutils.Log(fmt.Sprintf("subscribed to node updates for node %s update/%s/%s", cfg.Node.Name, cfg.Node.Network, cfg.Node.ID))
+			}
+			if token := client.Subscribe(fmt.Sprintf("peers/%s/%s", cfg.Node.Network, cfg.Node.ID), 0, mqtt.MessageHandler(UpdatePeers)); token.Wait() && token.Error() != nil {
+				ncutils.Log(token.Error().Error())
+				return
+			}
+			if cfg.DebugOn {
+				ncutils.Log(fmt.Sprintf("subscribed to peer updates for node %s peers/%s/%s", cfg.Node.Name, cfg.Node.Network, cfg.Node.ID))
+			}
+			opts.SetOrderMatters(true)
+			opts.SetResumeSubs(true)
+		}
+	})
+	opts.SetConnectionLostHandler(func(c mqtt.Client, e error) {
+		ncutils.Log("detected broker connection lost, running pull for " + cfg.Node.Network)
+		_, err := Pull(cfg.Node.Network, true)
+		if err != nil {
+			ncutils.Log("could not run pull, server unreachable: " + err.Error())
+			ncutils.Log("waiting to retry...")
+			/*
+				//Consider putting in logic to restart - daemon may take long time to refresh
+				time.Sleep(time.Minute * 5)
+					ncutils.Log("restarting netclient")
+					daemon.Restart()
+			*/
+		}
+		ncutils.Log("connection re-established with mqtt server")
+	})
+
+	client := mqtt.NewClient(opts)
+	tperiod := time.Now().Add(12 * time.Second)
+	for {
+		//if after 12 seconds, try a gRPC pull on the last try
+		if time.Now().After(tperiod) {
+			ncutils.Log("running pull for " + cfg.Node.Network)
+			_, err := Pull(cfg.Node.Network, true)
+			if err != nil {
+				ncutils.Log("could not run pull, exiting " + cfg.Node.Network + " setup: " + err.Error())
+				return client
+			}
+			time.Sleep(time.Second)
+		}
+		if token := client.Connect(); token.Wait() && token.Error() != nil {
+			ncutils.Log("unable to connect to broker, retrying ...")
+			if time.Now().After(tperiod) {
+				ncutils.Log("could not connect to broker, exiting " + cfg.Node.Network + " setup: " + token.Error().Error())
+				if strings.Contains(token.Error().Error(), "connectex") || strings.Contains(token.Error().Error(), "i/o timeout") {
+					ncutils.PrintLog("connection issue detected.. pulling and restarting daemon", 0)
+					Pull(cfg.Node.Network, true)
+					daemon.Restart()
+				}
+				return client
+			}
+		} else {
+			break
+		}
+		time.Sleep(2 * time.Second)
+	}
+	return client
+}
+
+// publishes a message to server to update peers on this peer's behalf
+func publishClientPeers(cfg *config.ClientConfig) error {
+	payload := []byte(ncutils.MakeRandomString(16)) // just random string for now to keep the bytes different
+	if err := publish(cfg, fmt.Sprintf("update/%s", cfg.Node.ID), payload); err != nil {
+		return err
+	}
+	return nil
+}
+
 func initialPull(network string) {
 	ncutils.Log("pulling latest config for " + network)
 	var configPath = fmt.Sprintf("%snetconfig-%s", ncutils.GetNetclientPathSpecific(), network)
@@ -548,7 +525,7 @@ func publish(cfg *config.ClientConfig, dest string, msg []byte) error {
 		return err
 	}
 
-	client := SetupMQTT(cfg, true)
+	client := setupMQTT(cfg, true)
 	defer client.Disconnect(250)
 	encrypted, err := ncutils.Chunk(msg, serverPubKey, trafficPrivKey)
 	if err != nil {