|
@@ -18,13 +18,14 @@ import (
|
|
|
"github.com/gravitl/netmaker/models"
|
|
|
"github.com/gravitl/netmaker/netclient/auth"
|
|
|
"github.com/gravitl/netmaker/netclient/config"
|
|
|
+ "github.com/gravitl/netmaker/netclient/daemon"
|
|
|
"github.com/gravitl/netmaker/netclient/local"
|
|
|
"github.com/gravitl/netmaker/netclient/ncutils"
|
|
|
"github.com/gravitl/netmaker/netclient/wireguard"
|
|
|
"golang.zx2c4.com/wireguard/wgctrl/wgtypes"
|
|
|
)
|
|
|
|
|
|
-// ServerKeepalive - stores time of last server keepalive message
|
|
|
+// == Message Caches ==
|
|
|
var keepalive = new(sync.Map)
|
|
|
var messageCache = new(sync.Map)
|
|
|
|
|
@@ -41,7 +42,6 @@ func insert(network, which, cache string) {
|
|
|
Message: cache,
|
|
|
LastSeen: time.Now(),
|
|
|
}
|
|
|
- ncutils.Log("storing new message: " + cache)
|
|
|
messageCache.Store(fmt.Sprintf("%s%s", network, which), newMessage)
|
|
|
}
|
|
|
|
|
@@ -54,15 +54,15 @@ func read(network, which string) string {
|
|
|
}
|
|
|
if time.Now().After(readMessage.LastSeen.Add(time.Minute)) { // check if message has been there over a minute
|
|
|
messageCache.Delete(fmt.Sprintf("%s%s", network, which)) // remove old message if expired
|
|
|
- ncutils.Log("cached message expired")
|
|
|
return ""
|
|
|
}
|
|
|
- ncutils.Log("cache hit, skipping probably " + readMessage.Message)
|
|
|
return readMessage.Message // return current message if not expired
|
|
|
}
|
|
|
return ""
|
|
|
}
|
|
|
|
|
|
+// == End Message Caches ==
|
|
|
+
|
|
|
// Daemon runs netclient daemon from command line
|
|
|
func Daemon() error {
|
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
@@ -106,6 +106,12 @@ func SetupMQTT(cfg *config.ClientConfig) mqtt.Client {
|
|
|
ncutils.Log("unable to connect to broker, retrying ...")
|
|
|
if time.Now().After(tperiod) {
|
|
|
ncutils.Log("could not connect to broker, exiting " + cfg.Node.Network + " setup: " + token.Error().Error())
|
|
|
+ if strings.Contains(token.Error().Error(), "connectex") {
|
|
|
+ ncutils.PrintLog("connection issue detected.. restarting daemon", 0)
|
|
|
+ Pull(cfg.Node.Network, true)
|
|
|
+ daemon.Restart()
|
|
|
+ os.Exit(2)
|
|
|
+ }
|
|
|
return client
|
|
|
}
|
|
|
} else {
|
|
@@ -230,7 +236,8 @@ func NodeUpdate(client mqtt.Client, msg mqtt.Message) {
|
|
|
if currentMessage == string(data) {
|
|
|
return
|
|
|
}
|
|
|
- insert(newNode.Network, lastNodeUpdate, string(data))
|
|
|
+ insert(newNode.Network, lastNodeUpdate, string(data)) // store new message in cache
|
|
|
+
|
|
|
//check if interface name has changed if so delete.
|
|
|
if cfg.Node.Interface != newNode.Interface {
|
|
|
if err = wireguard.RemoveConf(cfg.Node.Interface, true); err != nil {
|
|
@@ -336,10 +343,9 @@ func UpdatePeers(client mqtt.Client, msg mqtt.Message) {
|
|
|
ncutils.Log("error unmarshalling peer data")
|
|
|
return
|
|
|
}
|
|
|
- // see if cache hit, if so skip
|
|
|
+ // see if cached hit, if so skip
|
|
|
var currentMessage = read(peerUpdate.Network, lastPeerUpdate)
|
|
|
if currentMessage == string(data) {
|
|
|
- ncutils.Log("cache hit")
|
|
|
return
|
|
|
}
|
|
|
insert(peerUpdate.Network, lastPeerUpdate, string(data))
|
|
@@ -567,6 +573,10 @@ func parseNetworkFromTopic(topic string) string {
|
|
|
}
|
|
|
|
|
|
func decryptMsg(cfg *config.ClientConfig, msg []byte) ([]byte, error) {
|
|
|
+ if len(msg) <= 24 { // make sure message is of appropriate length
|
|
|
+ return nil, fmt.Errorf("recieved invalid message from broker %s", string(msg))
|
|
|
+ }
|
|
|
+
|
|
|
// setup the keys
|
|
|
diskKey, keyErr := auth.RetrieveTrafficKey(cfg.Node.Network)
|
|
|
if keyErr != nil {
|