Browse Source

Merge pull request #652 from gravitl/feature_v0.10.0_traffic_encrypt

Feature v0.10.0 traffic encrypt
dcarns 3 years ago
parent
commit
50254af029

+ 6 - 6
controllers/node.go

@@ -433,7 +433,7 @@ func uncordonNode(w http.ResponseWriter, r *http.Request) {
 		if err := mq.NodeUpdate(&node); err != nil {
 			logger.Log(1, "error publishing node update", err.Error())
 		}
-		if err := mq.UpdatePeers(&node); err != nil {
+		if err := mq.PublishPeerUpdate(&node); err != nil {
 			logger.Log(1, "error publishing peer update ", err.Error())
 		}
 	}()
@@ -465,7 +465,7 @@ func createEgressGateway(w http.ResponseWriter, r *http.Request) {
 		if err := mq.NodeUpdate(&node); err != nil {
 			logger.Log(1, "error publishing node update", err.Error())
 		}
-		if err := mq.UpdatePeers(&node); err != nil {
+		if err := mq.PublishPeerUpdate(&node); err != nil {
 			logger.Log(1, "error publishing peer update "+err.Error())
 		}
 	}()
@@ -491,7 +491,7 @@ func deleteEgressGateway(w http.ResponseWriter, r *http.Request) {
 		if err := mq.NodeUpdate(&node); err != nil {
 			logger.Log(1, "error publishing node update", err.Error())
 		}
-		if err := mq.UpdatePeers(&node); err != nil {
+		if err := mq.PublishPeerUpdate(&node); err != nil {
 			logger.Log(1, "error publishing peer update ", err.Error())
 		}
 	}()
@@ -516,7 +516,7 @@ func createIngressGateway(w http.ResponseWriter, r *http.Request) {
 		if err := mq.NodeUpdate(&node); err != nil {
 			logger.Log(1, "error publishing node update", err.Error())
 		}
-		if err := mq.UpdatePeers(&node); err != nil {
+		if err := mq.PublishPeerUpdate(&node); err != nil {
 			logger.Log(1, "error publishing peer update ", err.Error())
 		}
 	}()
@@ -538,7 +538,7 @@ func deleteIngressGateway(w http.ResponseWriter, r *http.Request) {
 		if err := mq.NodeUpdate(&node); err != nil {
 			logger.Log(1, "error publishing node update", err.Error())
 		}
-		if err := mq.UpdatePeers(&node); err != nil {
+		if err := mq.PublishPeerUpdate(&node); err != nil {
 			logger.Log(1, "error publishing peer update ", err.Error())
 		}
 	}()
@@ -617,7 +617,7 @@ func updateNode(w http.ResponseWriter, r *http.Request) {
 			logger.Log(1, "error publishing node update", err.Error())
 		}
 		if logic.ShouldPeersUpdate(&node, &newNode) {
-			if err := mq.UpdatePeers(&newNode); err != nil {
+			if err := mq.PublishPeerUpdate(&newNode); err != nil {
 				logger.Log(1, "error publishing peer update after node update", err.Error())
 			}
 		}

+ 13 - 2
controllers/node_grpc.go

@@ -75,7 +75,18 @@ func (s *NodeServiceServer) CreateNode(ctx context.Context, req *nodepb.Object)
 			Address:  server.Address,
 		}
 	}
+	// TODO consolidate functionality around files
 	node.NetworkSettings.DefaultServerAddrs = serverAddrs
+	key, keyErr := logic.RetrievePublicTrafficKey()
+	if keyErr != nil {
+		logger.Log(0, "error retrieving key: ", keyErr.Error())
+		return nil, keyErr
+	}
+
+	node.TrafficKeys = models.TrafficKeys{
+		Mine:   node.TrafficKeys.Mine,
+		Server: key,
+	}
 
 	err = logic.CreateNode(&node)
 	if err != nil {
@@ -103,7 +114,7 @@ func (s *NodeServiceServer) CreateNode(ctx context.Context, req *nodepb.Object)
 	logger.Log(0, "new node,", node.Name, ", added on network,"+node.Network)
 	// notify other nodes on network of new peer
 	go func() {
-		if err := mq.UpdatePeers(&node); err != nil {
+		if err := mq.PublishPeerUpdate(&node); err != nil {
 			logger.Log(0, "failed to inform peers of new node ", err.Error())
 		}
 	}()
@@ -170,7 +181,7 @@ func (s *NodeServiceServer) DeleteNode(ctx context.Context, req *nodepb.Object)
 	}
 	// notify other nodes on network of deleted peer
 	go func() {
-		if err := mq.UpdatePeers(&node); err != nil {
+		if err := mq.PublishPeerUpdate(&node); err != nil {
 			logger.Log(0, "failed to inform peers of deleted node ", err.Error())
 		}
 	}()

+ 2 - 2
controllers/relay.go

@@ -34,7 +34,7 @@ func createRelay(w http.ResponseWriter, r *http.Request) {
 		if err := mq.NodeUpdate(&node); err != nil {
 			logger.Log(1, "error publishing node update", err.Error())
 		}
-		if err := mq.UpdatePeers(&node); err != nil {
+		if err := mq.PublishPeerUpdate(&node); err != nil {
 			logger.Log(1, "error publishing peer update ", err.Error())
 		}
 	}()
@@ -60,7 +60,7 @@ func deleteRelay(w http.ResponseWriter, r *http.Request) {
 		if err := mq.NodeUpdate(&node); err != nil {
 			logger.Log(1, "error publishing node update", err.Error())
 		}
-		if err := mq.UpdatePeers(&node); err != nil {
+		if err := mq.PublishPeerUpdate(&node); err != nil {
 			logger.Log(1, "error publishing peer update ", err.Error())
 		}
 	}()

+ 30 - 9
database/database.go

@@ -1,15 +1,17 @@
 package database
 
 import (
+	"crypto/rand"
 	"encoding/json"
 	"errors"
-	"strings"
 	"time"
 
 	"github.com/google/uuid"
 	"github.com/gravitl/netmaker/logger"
 	"github.com/gravitl/netmaker/models"
+	"github.com/gravitl/netmaker/netclient/ncutils"
 	"github.com/gravitl/netmaker/servercfg"
+	"golang.org/x/crypto/nacl/box"
 )
 
 // NETWORKS_TABLE_NAME - networks table
@@ -36,13 +38,13 @@ const INT_CLIENTS_TABLE_NAME = "intclients"
 // PEERS_TABLE_NAME - peers table
 const PEERS_TABLE_NAME = "peers"
 
-// SERVERCONF_TABLE_NAME
+// SERVERCONF_TABLE_NAME - stores server conf
 const SERVERCONF_TABLE_NAME = "serverconf"
 
-// SERVER_UUID_TABLE_NAME
+// SERVER_UUID_TABLE_NAME - stores unique netmaker server data
 const SERVER_UUID_TABLE_NAME = "serveruuid"
 
-// SERVER_UUID_RECORD_KEY
+// SERVER_UUID_RECORD_KEY - telemetry thing
 const SERVER_UUID_RECORD_KEY = "serveruuid"
 
 // DATABASE_FILENAME - database file name
@@ -114,8 +116,7 @@ func InitializeDatabase() error {
 		time.Sleep(2 * time.Second)
 	}
 	createTables()
-	err := initializeUUID()
-	return err
+	return initializeUUID()
 }
 
 func createTables() {
@@ -139,7 +140,8 @@ func createTable(tableName string) error {
 // IsJSONString - checks if valid json
 func IsJSONString(value string) bool {
 	var jsonInt interface{}
-	return json.Unmarshal([]byte(value), &jsonInt) == nil
+	var nodeInt models.Node
+	return json.Unmarshal([]byte(value), &jsonInt) == nil || json.Unmarshal([]byte(value), &nodeInt) == nil
 }
 
 // Insert - inserts object into db
@@ -199,13 +201,32 @@ func FetchRecords(tableName string) (map[string]string, error) {
 func initializeUUID() error {
 	records, err := FetchRecords(SERVER_UUID_TABLE_NAME)
 	if err != nil {
-		if !strings.Contains("could not find any records", err.Error()) {
+		if !IsEmptyRecord(err) {
 			return err
 		}
 	} else if len(records) > 0 {
 		return nil
 	}
-	telemetry := models.Telemetry{UUID: uuid.NewString()}
+	// setup encryption keys
+	var trafficPubKey, trafficPrivKey, errT = box.GenerateKey(rand.Reader) // generate traffic keys
+	if errT != nil {
+		return errT
+	}
+	tPriv, err := ncutils.ConvertKeyToBytes(trafficPrivKey)
+	if err != nil {
+		return err
+	}
+
+	tPub, err := ncutils.ConvertKeyToBytes(trafficPubKey)
+	if err != nil {
+		return err
+	}
+
+	telemetry := models.Telemetry{
+		UUID:           uuid.NewString(),
+		TrafficKeyPriv: tPriv,
+		TrafficKeyPub:  tPub,
+	}
 	telJSON, err := json.Marshal(&telemetry)
 	if err != nil {
 		return err

+ 1 - 1
database/sqlite.go

@@ -30,7 +30,7 @@ var SQLITE_FUNCTIONS = map[string]interface{}{
 func initSqliteDB() error {
 	// == create db file if not present ==
 	if _, err := os.Stat("data"); os.IsNotExist(err) {
-		os.Mkdir("data", 0744)
+		os.Mkdir("data", 0700)
 	}
 	dbFilePath := filepath.Join("data", dbFilename)
 	if _, err := os.Stat(dbFilePath); os.IsNotExist(err) {

+ 3 - 3
logger/logger.go

@@ -37,9 +37,6 @@ func ResetLogs() {
 
 // Log - handles adding logs
 func Log(verbosity int, message ...string) {
-	var mu sync.Mutex
-	mu.Lock()
-	defer mu.Unlock()
 	var currentTime = time.Now()
 	var currentMessage = makeString(message...)
 	if int32(verbosity) <= getVerbose() && getVerbose() >= 0 {
@@ -55,6 +52,9 @@ func Dump() string {
 		Key   string
 		Value time.Time
 	}
+	var mu sync.Mutex
+	mu.Lock()
+	defer mu.Unlock()
 	var dumpLogs = make([]keyVal, 0, len(currentLogs))
 	for key, value := range currentLogs {
 		parsedTime, err := time.Parse(TimeFormat, value)

+ 1 - 1
logic/nodes.go

@@ -60,8 +60,8 @@ func GetSortedNetworkServerNodes(network string) ([]models.Node, error) {
 
 // GetServerNodes - gets the server nodes of a network
 func GetServerNodes(network string) []models.Node {
-	var nodes, err = GetNetworkNodes(network)
 	var serverNodes = make([]models.Node, 0)
+	var nodes, err = GetNetworkNodes(network)
 	if err != nil {
 		return serverNodes
 	}

+ 5 - 3
logic/telemetry.go

@@ -74,11 +74,13 @@ func fetchTelemetryData() (telemetryData, error) {
 }
 
 // setTelemetryTimestamp - Give the entry in the DB a new timestamp
-func setTelemetryTimestamp(uuid string) error {
+func setTelemetryTimestamp(telRecord *models.Telemetry) error {
 	lastsend := time.Now().Unix()
 	var serverTelData = models.Telemetry{
-		UUID:     uuid,
-		LastSend: lastsend,
+		UUID:           telRecord.UUID,
+		LastSend:       lastsend,
+		TrafficKeyPriv: telRecord.TrafficKeyPriv,
+		TrafficKeyPub:  telRecord.TrafficKeyPub,
 	}
 	jsonObj, err := json.Marshal(&serverTelData)
 	if err != nil {

+ 1 - 1
logic/timer.go

@@ -31,7 +31,7 @@ func TimerCheckpoint() error {
 		runHooks()
 	}
 	// set telemetry timestamp for server, restarts 24 hour cycle
-	return setTelemetryTimestamp(telRecord.UUID)
+	return setTelemetryTimestamp(&telRecord)
 }
 
 // AddHook - adds a hook function to run every 24hrs

+ 21 - 0
logic/traffic.go

@@ -0,0 +1,21 @@
+package logic
+
+// RetrievePrivateTrafficKey - retrieves private key of server
+func RetrievePrivateTrafficKey() ([]byte, error) {
+	var telRecord, err = fetchTelemetryRecord()
+	if err != nil {
+		return nil, err
+	}
+
+	return telRecord.TrafficKeyPriv, nil
+}
+
+// RetrievePublicTrafficKey - retrieves public key of server
+func RetrievePublicTrafficKey() ([]byte, error) {
+	var telRecord, err = fetchTelemetryRecord()
+	if err != nil {
+		return nil, err
+	}
+
+	return telRecord.TrafficKeyPub, nil
+}

+ 29 - 28
models/node.go

@@ -48,34 +48,35 @@ type Node struct {
 	LastCheckIn         int64    `json:"lastcheckin" bson:"lastcheckin" yaml:"lastcheckin"`
 	MacAddress          string   `json:"macaddress" bson:"macaddress" yaml:"macaddress"`
 	// checkin interval is depreciated at the network level. Set on server with CHECKIN_INTERVAL
-	CheckInInterval     int32    `json:"checkininterval" bson:"checkininterval" yaml:"checkininterval"`
-	Password            string   `json:"password" bson:"password" yaml:"password" validate:"required,min=6"`
-	Network             string   `json:"network" bson:"network" yaml:"network" validate:"network_exists"`
-	IsRelayed           string   `json:"isrelayed" bson:"isrelayed" yaml:"isrelayed"`
-	IsPending           string   `json:"ispending" bson:"ispending" yaml:"ispending"`
-	IsRelay             string   `json:"isrelay" bson:"isrelay" yaml:"isrelay" validate:"checkyesorno"`
-	IsDocker            string   `json:"isdocker" bson:"isdocker" yaml:"isdocker" validate:"checkyesorno"`
-	IsK8S               string   `json:"isk8s" bson:"isk8s" yaml:"isk8s" validate:"checkyesorno"`
-	IsEgressGateway     string   `json:"isegressgateway" bson:"isegressgateway" yaml:"isegressgateway"`
-	IsIngressGateway    string   `json:"isingressgateway" bson:"isingressgateway" yaml:"isingressgateway"`
-	EgressGatewayRanges []string `json:"egressgatewayranges" bson:"egressgatewayranges" yaml:"egressgatewayranges"`
-	RelayAddrs          []string `json:"relayaddrs" bson:"relayaddrs" yaml:"relayaddrs"`
-	IngressGatewayRange string   `json:"ingressgatewayrange" bson:"ingressgatewayrange" yaml:"ingressgatewayrange"`
-	IsStatic            string   `json:"isstatic" bson:"isstatic" yaml:"isstatic" validate:"checkyesorno"`
-	UDPHolePunch        string   `json:"udpholepunch" bson:"udpholepunch" yaml:"udpholepunch" validate:"checkyesorno"`
-	PullChanges         string   `json:"pullchanges" bson:"pullchanges" yaml:"pullchanges" validate:"checkyesorno"`
-	DNSOn               string   `json:"dnson" bson:"dnson" yaml:"dnson" validate:"checkyesorno"`
-	IsDualStack         string   `json:"isdualstack" bson:"isdualstack" yaml:"isdualstack" validate:"checkyesorno"`
-	IsServer            string   `json:"isserver" bson:"isserver" yaml:"isserver" validate:"checkyesorno"`
-	Action              string   `json:"action" bson:"action" yaml:"action"`
-	IsLocal             string   `json:"islocal" bson:"islocal" yaml:"islocal" validate:"checkyesorno"`
-	LocalRange          string   `json:"localrange" bson:"localrange" yaml:"localrange"`
-	Roaming             string   `json:"roaming" bson:"roaming" yaml:"roaming" validate:"checkyesorno"`
-	IPForwarding        string   `json:"ipforwarding" bson:"ipforwarding" yaml:"ipforwarding" validate:"checkyesorno"`
-	OS                  string   `json:"os" bson:"os" yaml:"os"`
-	MTU                 int32    `json:"mtu" bson:"mtu" yaml:"mtu"`
-	Version             string   `json:"version" bson:"version" yaml:"version"`
-	ExcludedAddrs       []string `json:"excludedaddrs" bson:"excludedaddrs" yaml:"excludedaddrs"`
+	CheckInInterval     int32       `json:"checkininterval" bson:"checkininterval" yaml:"checkininterval"`
+	Password            string      `json:"password" bson:"password" yaml:"password" validate:"required,min=6"`
+	Network             string      `json:"network" bson:"network" yaml:"network" validate:"network_exists"`
+	IsRelayed           string      `json:"isrelayed" bson:"isrelayed" yaml:"isrelayed"`
+	IsPending           string      `json:"ispending" bson:"ispending" yaml:"ispending"`
+	IsRelay             string      `json:"isrelay" bson:"isrelay" yaml:"isrelay" validate:"checkyesorno"`
+	IsDocker            string      `json:"isdocker" bson:"isdocker" yaml:"isdocker" validate:"checkyesorno"`
+	IsK8S               string      `json:"isk8s" bson:"isk8s" yaml:"isk8s" validate:"checkyesorno"`
+	IsEgressGateway     string      `json:"isegressgateway" bson:"isegressgateway" yaml:"isegressgateway"`
+	IsIngressGateway    string      `json:"isingressgateway" bson:"isingressgateway" yaml:"isingressgateway"`
+	EgressGatewayRanges []string    `json:"egressgatewayranges" bson:"egressgatewayranges" yaml:"egressgatewayranges"`
+	RelayAddrs          []string    `json:"relayaddrs" bson:"relayaddrs" yaml:"relayaddrs"`
+	IngressGatewayRange string      `json:"ingressgatewayrange" bson:"ingressgatewayrange" yaml:"ingressgatewayrange"`
+	IsStatic            string      `json:"isstatic" bson:"isstatic" yaml:"isstatic" validate:"checkyesorno"`
+	UDPHolePunch        string      `json:"udpholepunch" bson:"udpholepunch" yaml:"udpholepunch" validate:"checkyesorno"`
+	PullChanges         string      `json:"pullchanges" bson:"pullchanges" yaml:"pullchanges" validate:"checkyesorno"`
+	DNSOn               string      `json:"dnson" bson:"dnson" yaml:"dnson" validate:"checkyesorno"`
+	IsDualStack         string      `json:"isdualstack" bson:"isdualstack" yaml:"isdualstack" validate:"checkyesorno"`
+	IsServer            string      `json:"isserver" bson:"isserver" yaml:"isserver" validate:"checkyesorno"`
+	Action              string      `json:"action" bson:"action" yaml:"action"`
+	IsLocal             string      `json:"islocal" bson:"islocal" yaml:"islocal" validate:"checkyesorno"`
+	LocalRange          string      `json:"localrange" bson:"localrange" yaml:"localrange"`
+	Roaming             string      `json:"roaming" bson:"roaming" yaml:"roaming" validate:"checkyesorno"`
+	IPForwarding        string      `json:"ipforwarding" bson:"ipforwarding" yaml:"ipforwarding" validate:"checkyesorno"`
+	OS                  string      `json:"os" bson:"os" yaml:"os"`
+	MTU                 int32       `json:"mtu" bson:"mtu" yaml:"mtu"`
+	Version             string      `json:"version" bson:"version" yaml:"version"`
+	ExcludedAddrs       []string    `json:"excludedaddrs" bson:"excludedaddrs" yaml:"excludedaddrs"`
+	TrafficKeys         TrafficKeys `json:"traffickeys" bson:"traffickeys" yaml:"traffickeys"`
 }
 
 // NodesArray - used for node sorting

+ 14 - 3
models/structs.go

@@ -1,6 +1,8 @@
 package models
 
-import jwt "github.com/golang-jwt/jwt/v4"
+import (
+	jwt "github.com/golang-jwt/jwt/v4"
+)
 
 const PLACEHOLDER_KEY_TEXT = "ACCESS_KEY"
 const PLACEHOLDER_TOKEN_TEXT = "ACCESS_TOKEN"
@@ -165,9 +167,12 @@ type ServerUpdateData struct {
 }
 
 // Telemetry - contains UUID of the server and timestamp of last send to posthog
+// also contains assymetrical encryption pub/priv keys for any server traffic
 type Telemetry struct {
-	UUID     string `json:"uuid" bson:"uuid"`
-	LastSend int64  `json:"lastsend" bson:"lastsend"`
+	UUID           string `json:"uuid" bson:"uuid"`
+	LastSend       int64  `json:"lastsend" bson:"lastsend"`
+	TrafficKeyPriv []byte `json:"traffickeypriv" bson:"traffickeypriv"`
+	TrafficKeyPub  []byte `json:"traffickeypub" bson:"traffickeypub"`
 }
 
 // ServerAddr - to pass to clients to tell server addresses and if it's the leader or not
@@ -175,3 +180,9 @@ type ServerAddr struct {
 	IsLeader bool   `json:"isleader" bson:"isleader" yaml:"isleader"`
 	Address  string `json:"address" bson:"address" yaml:"address"`
 }
+
+// TrafficKeys - struct to hold public keys
+type TrafficKeys struct {
+	Mine   []byte `json:"mine" bson:"mine" yaml:"mine"`
+	Server []byte `json:"server" bson:"server" yaml:"server"`
+}

+ 37 - 29
mq/mq.go

@@ -3,6 +3,7 @@ package mq
 import (
 	"encoding/json"
 	"errors"
+	"fmt"
 	"log"
 	"strings"
 
@@ -40,11 +41,16 @@ var Ping mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Message) {
 			logger.Log(0, record)
 			return
 		}
+		_, decryptErr := decryptMsg(&node, msg.Payload())
+		if decryptErr != nil {
+			logger.Log(0, "error updating node ", node.ID, err.Error())
+			return
+		}
 		node.SetLastCheckIn()
 		if err := logic.UpdateNode(&node, &node); err != nil {
 			logger.Log(0, "error updating node ", err.Error())
 		}
-		logger.Log(0, "ping processed")
+		logger.Log(3, "ping processed for node", node.ID)
 		// --TODO --set client version once feature is implemented.
 		//node.SetClientVersion(msg.Payload())
 	}()
@@ -58,22 +64,28 @@ var UpdateNode mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Message)
 			logger.Log(1, "error getting node.ID sent on ", msg.Topic(), err.Error())
 			return
 		}
+		currentNode, err := logic.GetNodeByID(id)
+		if err != nil {
+			logger.Log(1, "error getting node ", id, err.Error())
+			return
+		}
+		decrypted, decryptErr := decryptMsg(&currentNode, msg.Payload())
+		if decryptErr != nil {
+			logger.Log(1, "failed to decrypt message for node ", id, decryptErr.Error())
+			return
+		}
 		logger.Log(1, "Update Node Handler", id)
 		var newNode models.Node
-		if err := json.Unmarshal(msg.Payload(), &newNode); err != nil {
+		if err := json.Unmarshal(decrypted, &newNode); err != nil {
 			logger.Log(1, "error unmarshaling payload ", err.Error())
 			return
 		}
-		currentNode, err := logic.GetNodeByID(newNode.ID)
-		if err != nil {
-			logger.Log(1, "error getting node ", newNode.ID, err.Error())
-			return
-		}
+
 		if err := logic.UpdateNode(&currentNode, &newNode); err != nil {
 			logger.Log(1, "error saving node", err.Error())
 		}
 		if logic.ShouldPeersUpdate(&currentNode, &newNode) {
-			if err := PublishPeerUpdate(client, &newNode); err != nil {
+			if err := PublishPeerUpdate(&newNode); err != nil {
 				logger.Log(1, "error publishing peer update ", err.Error())
 				return
 			}
@@ -82,13 +94,19 @@ var UpdateNode mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Message)
 }
 
 // PublishPeerUpdate --- deterines and publishes a peer update to all the peers of a node
-func PublishPeerUpdate(client mqtt.Client, newNode *models.Node) error {
+func PublishPeerUpdate(newNode *models.Node) error {
+	if !servercfg.IsMessageQueueBackend() {
+		return nil
+	}
 	networkNodes, err := logic.GetNetworkNodes(newNode.Network)
 	if err != nil {
 		logger.Log(1, "err getting Network Nodes", err.Error())
 		return err
 	}
 	for _, node := range networkNodes {
+		if node.IsServer == "yes" {
+			continue
+		}
 		peerUpdate, err := logic.GetPeerUpdate(&node)
 		if err != nil {
 			logger.Log(1, "error getting peer update for node ", node.ID, err.Error())
@@ -96,12 +114,11 @@ func PublishPeerUpdate(client mqtt.Client, newNode *models.Node) error {
 		}
 		data, err := json.Marshal(&peerUpdate)
 		if err != nil {
-			logger.Log(2, "error marshaling peer update ", err.Error())
-			return err
+			logger.Log(2, "error marshaling peer update for node", node.ID, err.Error())
+			continue
 		}
-		if token := client.Publish("update/peers/"+node.ID, 0, false, data); token.Wait() && token.Error() != nil {
-			logger.Log(2, "error publishing peer update to peer ", node.ID, token.Error().Error())
-			return err
+		if err = publish(&node, fmt.Sprintf("peers/%s/%s", node.Network, node.ID), data); err != nil {
+			logger.Log(1, "failed to publish peer update for node", node.ID)
 		}
 	}
 	return nil
@@ -118,28 +135,19 @@ func GetID(topic string) (string, error) {
 	return parts[count-1], nil
 }
 
-// UpdateNode -- publishes a node update
+// NodeUpdate -- publishes a node update
 func NodeUpdate(node *models.Node) error {
+	if !servercfg.IsMessageQueueBackend() {
+		return nil
+	}
 	logger.Log(3, "publishing node update to "+node.Name)
-	client := SetupMQTT()
-	defer client.Disconnect(250)
 	data, err := json.Marshal(node)
 	if err != nil {
 		logger.Log(2, "error marshalling node update ", err.Error())
 		return err
 	}
-	if token := client.Publish("update/"+node.ID, 0, false, data); token.Wait() && token.Error() != nil {
-		logger.Log(2, "error publishing peer update to peer ", node.ID, token.Error().Error())
-		return err
-	}
-	return nil
-}
-
-// UpdatePeers -- publishes a peer update to all the peers of a node
-func UpdatePeers(node *models.Node) error {
-	client := SetupMQTT()
-	defer client.Disconnect(250)
-	if err := PublishPeerUpdate(client, node); err != nil {
+	if err = publish(node, fmt.Sprintf("update/%s/%s", node.Network, node.ID), data); err != nil {
+		logger.Log(2, "error publishing node update to peer ", node.ID, err.Error())
 		return err
 	}
 	return nil

+ 57 - 0
mq/util.go

@@ -0,0 +1,57 @@
+package mq
+
+import (
+	"github.com/gravitl/netmaker/logic"
+	"github.com/gravitl/netmaker/models"
+	"github.com/gravitl/netmaker/netclient/ncutils"
+)
+
+func decryptMsg(node *models.Node, msg []byte) ([]byte, error) {
+	trafficKey, trafficErr := logic.RetrievePrivateTrafficKey() // get server private key
+	if trafficErr != nil {
+		return nil, trafficErr
+	}
+	serverPrivTKey, err := ncutils.ConvertBytesToKey(trafficKey)
+	if err != nil {
+		return nil, err
+	}
+	nodePubTKey, err := ncutils.ConvertBytesToKey(node.TrafficKeys.Mine)
+	if err != nil {
+		return nil, err
+	}
+
+	return ncutils.BoxDecrypt(msg, nodePubTKey, serverPrivTKey)
+}
+
+func encryptMsg(node *models.Node, msg []byte) ([]byte, error) {
+	// fetch server public key to be certain hasn't changed in transit
+	trafficKey, trafficErr := logic.RetrievePrivateTrafficKey()
+	if trafficErr != nil {
+		return nil, trafficErr
+	}
+
+	serverPrivKey, err := ncutils.ConvertBytesToKey(trafficKey)
+	if err != nil {
+		return nil, err
+	}
+
+	nodePubKey, err := ncutils.ConvertBytesToKey(node.TrafficKeys.Mine)
+	if err != nil {
+		return nil, err
+	}
+
+	return ncutils.BoxEncrypt(msg, nodePubKey, serverPrivKey)
+}
+
+func publish(node *models.Node, dest string, msg []byte) error {
+	client := SetupMQTT()
+	defer client.Disconnect(250)
+	encrypted, encryptErr := encryptMsg(node, msg)
+	if encryptErr != nil {
+		return encryptErr
+	}
+	if token := client.Publish(dest, 0, false, encrypted); token.Wait() && token.Error() != nil {
+		return token.Error()
+	}
+	return nil
+}

+ 20 - 3
netclient/auth/auth.go

@@ -73,7 +73,7 @@ func AutoLogin(client nodepb.NodeServiceClient, network string) error {
 		return err
 	}
 	tokenstring := []byte(res.Data)
-	err = os.WriteFile(home+"nettoken-"+network, tokenstring, 0644) // TODO: Proper permissions?
+	err = os.WriteFile(home+"nettoken-"+network, tokenstring, 0600) // TODO: Proper permissions?
 	if err != nil {
 		return err
 	}
@@ -83,8 +83,7 @@ func AutoLogin(client nodepb.NodeServiceClient, network string) error {
 // StoreSecret - stores auth secret locally
 func StoreSecret(key string, network string) error {
 	d1 := []byte(key)
-	err := os.WriteFile(ncutils.GetNetclientPathSpecific()+"secret-"+network, d1, 0644)
-	return err
+	return os.WriteFile(ncutils.GetNetclientPathSpecific()+"secret-"+network, d1, 0600)
 }
 
 // RetrieveSecret - fetches secret locally
@@ -93,6 +92,24 @@ func RetrieveSecret(network string) (string, error) {
 	return string(dat), err
 }
 
+// StoreTrafficKey - stores traffic key
+func StoreTrafficKey(key *[32]byte, network string) error {
+	var data, err = ncutils.ConvertKeyToBytes(key)
+	if err != nil {
+		return err
+	}
+	return os.WriteFile(ncutils.GetNetclientPathSpecific()+"traffic-"+network, data, 0600)
+}
+
+// RetrieveTrafficKey - reads traffic file locally
+func RetrieveTrafficKey(network string) (*[32]byte, error) {
+	data, err := os.ReadFile(ncutils.GetNetclientPathSpecific() + "traffic-" + network)
+	if err != nil {
+		return nil, err
+	}
+	return ncutils.ConvertBytesToKey(data)
+}
+
 // Configuraion - struct for mac and pass
 type Configuration struct {
 	MacAddress string

+ 3 - 0
netclient/functions/common.go

@@ -253,6 +253,9 @@ func WipeLocal(network string) error {
 	if ncutils.FileExists(home + "secret-" + network) {
 		_ = os.Remove(home + "secret-" + network)
 	}
+	if ncutils.FileExists(home + "traffic-" + network) {
+		_ = os.Remove(home + "traffic-" + network)
+	}
 	if ncutils.FileExists(home + "wgkey-" + network) {
 		_ = os.Remove(home + "wgkey-" + network)
 	}

+ 78 - 22
netclient/functions/daemon.go

@@ -8,12 +8,14 @@ import (
 	"os"
 	"os/signal"
 	"runtime"
+	"strings"
 	"sync"
 	"syscall"
 	"time"
 
 	mqtt "github.com/eclipse/paho.mqtt.golang"
 	"github.com/gravitl/netmaker/models"
+	"github.com/gravitl/netmaker/netclient/auth"
 	"github.com/gravitl/netmaker/netclient/config"
 	"github.com/gravitl/netmaker/netclient/local"
 	"github.com/gravitl/netmaker/netclient/ncutils"
@@ -88,17 +90,17 @@ func MessageQueue(ctx context.Context, network string) {
 		}
 		ncutils.Log("subscribed to all topics for debugging purposes")
 	}
-	if token := client.Subscribe("update/"+cfg.Node.ID, 0, mqtt.MessageHandler(NodeUpdate)); token.Wait() && token.Error() != nil {
+	if token := client.Subscribe(fmt.Sprintf("update/%s/%s", cfg.Node.Network, cfg.Node.ID), 0, mqtt.MessageHandler(NodeUpdate)); token.Wait() && token.Error() != nil {
 		log.Fatal(token.Error())
 	}
 	if cfg.DebugOn {
-		ncutils.Log("subscribed to node updates for node " + cfg.Node.Name + " update/" + cfg.Node.ID)
+		ncutils.Log(fmt.Sprintf("subscribed to node updates for node %s update/%s/%s \n", cfg.Node.Name, cfg.Node.Network, cfg.Node.ID))
 	}
-	if token := client.Subscribe("update/peers/"+cfg.Node.ID, 0, mqtt.MessageHandler(UpdatePeers)); token.Wait() && token.Error() != nil {
+	if token := client.Subscribe(fmt.Sprintf("peers/%s/%s", cfg.Node.Network, cfg.Node.ID), 0, mqtt.MessageHandler(UpdatePeers)); token.Wait() && token.Error() != nil {
 		log.Fatal(token.Error())
 	}
 	if cfg.DebugOn {
-		ncutils.Log("subscribed to node updates for node " + cfg.Node.Name + " update/peers/" + cfg.Node.ID)
+		ncutils.Log(fmt.Sprintf("subscribed to peer updates for node %s peers/%s/%s \n", cfg.Node.Name, cfg.Node.Network, cfg.Node.ID))
 	}
 	defer client.Disconnect(250)
 	go Checkin(ctx, &cfg, network)
@@ -119,20 +121,27 @@ func NodeUpdate(client mqtt.Client, msg mqtt.Message) {
 	go func() {
 		var newNode models.Node
 		var cfg config.ClientConfig
-		err := json.Unmarshal(msg.Payload(), &newNode)
+		var network = parseNetworkFromTopic(msg.Topic())
+		cfg.Network = network
+		cfg.ReadConfig()
+
+		data, dataErr := decryptMsg(&cfg, msg.Payload())
+		if dataErr != nil {
+			return
+		}
+		err := json.Unmarshal([]byte(data), &newNode)
 		if err != nil {
 			ncutils.Log("error unmarshalling node update data" + err.Error())
 			return
 		}
+
 		ncutils.Log("received message to update node " + newNode.Name)
 		// see if cache hit, if so skip
 		var currentMessage = read(newNode.Network, lastNodeUpdate)
-		if currentMessage == string(msg.Payload()) {
+		if currentMessage == string(data) {
 			return
 		}
-		insert(newNode.Network, lastNodeUpdate, string(msg.Payload()))
-		cfg.Network = newNode.Network
-		cfg.ReadConfig()
+		insert(newNode.Network, lastNodeUpdate, string(data))
 		//check if interface name has changed if so delete.
 		if cfg.Node.Interface != newNode.Interface {
 			if err = wireguard.RemoveConf(cfg.Node.Interface, true); err != nil {
@@ -201,21 +210,28 @@ func NodeUpdate(client mqtt.Client, msg mqtt.Message) {
 func UpdatePeers(client mqtt.Client, msg mqtt.Message) {
 	go func() {
 		var peerUpdate models.PeerUpdate
-		err := json.Unmarshal(msg.Payload(), &peerUpdate)
+		var network = parseNetworkFromTopic(msg.Topic())
+		var cfg = config.ClientConfig{}
+		cfg.Network = network
+		cfg.ReadConfig()
+
+		data, dataErr := decryptMsg(&cfg, msg.Payload())
+		if dataErr != nil {
+			return
+		}
+		err := json.Unmarshal([]byte(data), &peerUpdate)
 		if err != nil {
 			ncutils.Log("error unmarshalling peer data")
 			return
 		}
 		// see if cache hit, if so skip
 		var currentMessage = read(peerUpdate.Network, lastPeerUpdate)
-		if currentMessage == string(msg.Payload()) {
+		if currentMessage == string(data) {
 			return
 		}
-		insert(peerUpdate.Network, lastPeerUpdate, string(msg.Payload()))
+		insert(peerUpdate.Network, lastPeerUpdate, string(data))
 		ncutils.Log("update peer handler")
-		var cfg config.ClientConfig
-		cfg.Network = peerUpdate.Network
-		cfg.ReadConfig()
+
 		var shouldReSub = shouldResub(cfg.Node.NetworkSettings.DefaultServerAddrs, peerUpdate.ServerAddrs)
 		if shouldReSub {
 			Resubscribe(client, &cfg)
@@ -335,24 +351,64 @@ func PublishNodeUpdate(cfg *config.ClientConfig) {
 	if err := config.Write(cfg, cfg.Network); err != nil {
 		ncutils.Log("error saving configuration" + err.Error())
 	}
-	client := SetupMQTT(cfg)
 	data, err := json.Marshal(cfg.Node)
 	if err != nil {
 		ncutils.Log("error marshling node update " + err.Error())
 	}
-	if token := client.Publish("update/"+cfg.Node.ID, 0, false, data); token.Wait() && token.Error() != nil {
-		ncutils.Log("error publishing endpoint update " + token.Error().Error())
+	if err = publish(cfg, fmt.Sprintf("update/%s", cfg.Node.ID), data); err != nil {
+		ncutils.Log(fmt.Sprintf("error publishing endpoint update, %v \n", err))
 	}
-	client.Disconnect(250)
 }
 
 // Hello -- ping the broker to let server know node is alive and doing fine
 func Hello(cfg *config.ClientConfig, network string) {
+	if err := publish(cfg, fmt.Sprintf("ping/%s", cfg.Node.ID), []byte("hello world!")); err != nil {
+		ncutils.Log(fmt.Sprintf("error publishing ping, %v \n", err))
+	}
+}
+
+func publish(cfg *config.ClientConfig, dest string, msg []byte) error {
+	// setup the keys
+	trafficPrivKey, err := auth.RetrieveTrafficKey(cfg.Node.Network)
+	if err != nil {
+		return err
+	}
+
+	serverPubKey, err := ncutils.ConvertBytesToKey(cfg.Node.TrafficKeys.Server)
+	if err != nil {
+		return err
+	}
+
 	client := SetupMQTT(cfg)
-	if token := client.Publish("ping/"+cfg.Node.ID, 2, false, "hello world!"); token.Wait() && token.Error() != nil {
-		ncutils.Log("error publishing ping " + token.Error().Error())
+	defer client.Disconnect(250)
+	encrypted, err := ncutils.BoxEncrypt(msg, serverPubKey, trafficPrivKey)
+	if err != nil {
+		return err
+	}
+
+	if token := client.Publish(dest, 0, false, encrypted); token.Wait() && token.Error() != nil {
+		return token.Error()
 	}
-	client.Disconnect(250)
+	return nil
+}
+
+func parseNetworkFromTopic(topic string) string {
+	return strings.Split(topic, "/")[1]
+}
+
+func decryptMsg(cfg *config.ClientConfig, msg []byte) ([]byte, error) {
+	// setup the keys
+	diskKey, keyErr := auth.RetrieveTrafficKey(cfg.Node.Network)
+	if keyErr != nil {
+		return nil, keyErr
+	}
+
+	serverPubKey, err := ncutils.ConvertBytesToKey(cfg.Node.TrafficKeys.Server)
+	if err != nil {
+		return nil, err
+	}
+
+	return ncutils.BoxDecrypt(msg, serverPubKey, diskKey)
 }
 
 func shouldResub(currentServers, newServers []models.ServerAddr) bool {

+ 35 - 13
netclient/functions/join.go

@@ -2,6 +2,7 @@ package functions
 
 import (
 	"context"
+	"crypto/rand"
 	"encoding/json"
 	"errors"
 	"fmt"
@@ -19,6 +20,7 @@ import (
 	"github.com/gravitl/netmaker/netclient/ncutils"
 	"github.com/gravitl/netmaker/netclient/server"
 	"github.com/gravitl/netmaker/netclient/wireguard"
+	"golang.org/x/crypto/nacl/box"
 	"golang.zx2c4.com/wireguard/wgctrl/wgtypes"
 	"google.golang.org/grpc"
 )
@@ -30,22 +32,41 @@ func JoinNetwork(cfg config.ClientConfig, privateKey string) error {
 	}
 
 	var err error
-	if cfg.Node.IsServer != "yes" {
-		if local.HasNetwork(cfg.Network) {
-			err := errors.New("ALREADY_INSTALLED. Netclient appears to already be installed for " + cfg.Network + ". To re-install, please remove by executing 'sudo netclient leave -n " + cfg.Network + "'. Then re-run the install command.")
-			return err
-		}
+	if local.HasNetwork(cfg.Network) {
+		err := errors.New("ALREADY_INSTALLED. Netclient appears to already be installed for " + cfg.Network + ". To re-install, please remove by executing 'sudo netclient leave -n " + cfg.Network + "'. Then re-run the install command.")
+		return err
+	}
 
-		err = config.Write(&cfg, cfg.Network)
-		if err != nil {
-			return err
-		}
-		if cfg.Node.Password == "" {
-			cfg.Node.Password = ncutils.GenPass()
-		}
-		auth.StoreSecret(cfg.Node.Password, cfg.Node.Network)
+	err = config.Write(&cfg, cfg.Network)
+	if err != nil {
+		return err
+	}
+	if cfg.Node.Password == "" {
+		cfg.Node.Password = ncutils.GenPass()
+	}
+	var trafficPubKey, trafficPrivKey, errT = box.GenerateKey(rand.Reader) // generate traffic keys
+	if errT != nil {
+		return errT
 	}
 
+	// == handle keys ==
+	if err = auth.StoreSecret(cfg.Node.Password, cfg.Node.Network); err != nil {
+		return err
+	}
+
+	if err = auth.StoreTrafficKey(trafficPrivKey, cfg.Node.Network); err != nil {
+		return err
+	}
+
+	trafficPubKeyBytes, err := ncutils.ConvertKeyToBytes(trafficPubKey)
+	if err != nil {
+		return err
+	}
+
+	cfg.Node.TrafficKeys.Mine = trafficPubKeyBytes
+	cfg.Node.TrafficKeys.Server = nil
+	// == end handle keys ==
+
 	if cfg.Node.LocalRange != "" && cfg.Node.LocalAddress == "" {
 		log.Println("local vpn, getting local address from range: " + cfg.Node.LocalRange)
 		cfg.Node.LocalAddress = getLocalIP(cfg.Node)
@@ -122,6 +143,7 @@ func JoinNetwork(cfg config.ClientConfig, privateKey string) error {
 		Endpoint:            cfg.Node.Endpoint,
 		SaveConfig:          cfg.Node.SaveConfig,
 		UDPHolePunch:        cfg.Node.UDPHolePunch,
+		TrafficKeys:         cfg.Node.TrafficKeys,
 	}
 
 	ncutils.Log("joining " + cfg.Network + " at " + cfg.Server.GRPCAddress)

+ 51 - 0
netclient/ncutils/netclientutils.go

@@ -1,7 +1,10 @@
 package ncutils
 
 import (
+	"bytes"
+	crand "crypto/rand"
 	"crypto/tls"
+	"encoding/gob"
 	"errors"
 	"fmt"
 	"io"
@@ -18,6 +21,7 @@ import (
 	"time"
 
 	"github.com/gravitl/netmaker/models"
+	"golang.org/x/crypto/nacl/box"
 	"golang.zx2c4.com/wireguard/wgctrl"
 	"golang.zx2c4.com/wireguard/wgctrl/wgtypes"
 	"google.golang.org/grpc"
@@ -51,6 +55,9 @@ const NETCLIENT_DEFAULT_PORT = 51821
 // DEFAULT_GC_PERCENT - garbage collection percent
 const DEFAULT_GC_PERCENT = 10
 
+// KEY_SIZE = ideal length for keys
+const KEY_SIZE = 2048
+
 // Log - logs a message
 func Log(message string) {
 	log.SetFlags(log.Flags() &^ (log.Llongfile | log.Lshortfile))
@@ -534,6 +541,28 @@ func CheckWG() {
 	}
 }
 
+// ConvertKeyToBytes - util to convert a key to bytes to use elsewhere
+func ConvertKeyToBytes(key *[32]byte) ([]byte, error) {
+	var buffer bytes.Buffer
+	var enc = gob.NewEncoder(&buffer)
+	if err := enc.Encode(key); err != nil {
+		return nil, err
+	}
+	return buffer.Bytes(), nil
+}
+
+// ConvertBytesToKey - util to convert bytes to a key to use elsewhere
+func ConvertBytesToKey(data []byte) (*[32]byte, error) {
+	var buffer = bytes.NewBuffer(data)
+	var dec = gob.NewDecoder(buffer)
+	var result = new([32]byte)
+	var err = dec.Decode(result)
+	if err != nil {
+		return nil, err
+	}
+	return result, err
+}
+
 // ServerAddrSliceContains - sees if a string slice contains a string element
 func ServerAddrSliceContains(slice []models.ServerAddr, item models.ServerAddr) bool {
 	for _, s := range slice {
@@ -543,3 +572,25 @@ func ServerAddrSliceContains(slice []models.ServerAddr, item models.ServerAddr)
 	}
 	return false
 }
+
+// BoxEncrypt - encrypts traffic box
+func BoxEncrypt(message []byte, recipientPubKey *[32]byte, senderPrivateKey *[32]byte) ([]byte, error) {
+	var nonce [24]byte // 192 bits of randomization
+	if _, err := io.ReadFull(crand.Reader, nonce[:]); err != nil {
+		return nil, err
+	}
+
+	encrypted := box.Seal(nonce[:], message, &nonce, recipientPubKey, senderPrivateKey)
+	return encrypted, nil
+}
+
+// BoxDecrypt - decrypts traffic box
+func BoxDecrypt(encrypted []byte, senderPublicKey *[32]byte, recipientPrivateKey *[32]byte) ([]byte, error) {
+	var decryptNonce [24]byte
+	copy(decryptNonce[:], encrypted[:24])
+	decrypted, ok := box.Open(nil, encrypted[24:], &decryptNonce, senderPublicKey, recipientPrivateKey)
+	if !ok {
+		return nil, fmt.Errorf("could not decrypt message")
+	}
+	return decrypted, nil
+}

+ 1 - 1
serverctl/iptables.go

@@ -15,7 +15,7 @@ import (
 
 const netmakerProcessName = "netmaker"
 
-// InitServerNetclient - intializes the server netclient
+// InitIPTables - intializes the server iptables
 func InitIPTables() error {
 	_, err := exec.LookPath("iptables")
 	if err != nil {

+ 2 - 0
serverctl/serverctl.go

@@ -12,6 +12,8 @@ import (
 	"github.com/gravitl/netmaker/netclient/ncutils"
 )
 
+const NETMAKER_BINARY_NAME = "netmaker"
+
 // InitServerNetclient - intializes the server netclient
 func InitServerNetclient() error {
 	netclientDir := ncutils.GetNetclientPath()