|
@@ -3,7 +3,6 @@ package functions
|
|
import (
|
|
import (
|
|
"context"
|
|
"context"
|
|
"encoding/json"
|
|
"encoding/json"
|
|
- "errors"
|
|
|
|
"fmt"
|
|
"fmt"
|
|
"os"
|
|
"os"
|
|
"os/signal"
|
|
"os/signal"
|
|
@@ -30,17 +29,31 @@ var messageCache = new(sync.Map)
|
|
const lastNodeUpdate = "lnu"
|
|
const lastNodeUpdate = "lnu"
|
|
const lastPeerUpdate = "lpu"
|
|
const lastPeerUpdate = "lpu"
|
|
|
|
|
|
|
|
+type cachedMessage struct {
|
|
|
|
+ Message string
|
|
|
|
+ LastSeen time.Time
|
|
|
|
+}
|
|
|
|
+
|
|
func insert(network, which, cache string) {
|
|
func insert(network, which, cache string) {
|
|
- // var mu sync.Mutex
|
|
|
|
- // mu.Lock()
|
|
|
|
- // defer mu.Unlock()
|
|
|
|
- messageCache.Store(fmt.Sprintf("%s%s", network, which), cache)
|
|
|
|
|
|
+ var newMessage = cachedMessage{
|
|
|
|
+ Message: cache,
|
|
|
|
+ LastSeen: time.Now(),
|
|
|
|
+ }
|
|
|
|
+ ncutils.Log("storing new message: " + cache)
|
|
|
|
+ messageCache.Store(fmt.Sprintf("%s%s", network, which), newMessage)
|
|
}
|
|
}
|
|
|
|
|
|
func read(network, which string) string {
|
|
func read(network, which string) string {
|
|
val, isok := messageCache.Load(fmt.Sprintf("%s%s", network, which))
|
|
val, isok := messageCache.Load(fmt.Sprintf("%s%s", network, which))
|
|
if isok {
|
|
if isok {
|
|
- return fmt.Sprintf("%v", val)
|
|
|
|
|
|
+ var readMessage = val.(cachedMessage) // fetch current cached message
|
|
|
|
+ if time.Now().After(readMessage.LastSeen.Add(time.Minute)) { // check if message has been there over a minute
|
|
|
|
+ messageCache.Delete(fmt.Sprintf("%s%s", network, which)) // remove old message if expired
|
|
|
|
+ ncutils.Log("cached message expired")
|
|
|
|
+ return ""
|
|
|
|
+ }
|
|
|
|
+ ncutils.Log("cache hit, skipping probably " + readMessage.Message)
|
|
|
|
+ return readMessage.Message // return current message if not expired
|
|
}
|
|
}
|
|
return ""
|
|
return ""
|
|
}
|
|
}
|
|
@@ -108,19 +121,27 @@ func MessageQueue(ctx context.Context, network string) {
|
|
var cfg config.ClientConfig
|
|
var cfg config.ClientConfig
|
|
cfg.Network = network
|
|
cfg.Network = network
|
|
ncutils.Log("pulling latest config for " + cfg.Network)
|
|
ncutils.Log("pulling latest config for " + cfg.Network)
|
|
- sleepTime := 2
|
|
|
|
- for {
|
|
|
|
- _, err := Pull(network, true)
|
|
|
|
- if err == nil {
|
|
|
|
- break
|
|
|
|
- }
|
|
|
|
- if sleepTime > 3600 {
|
|
|
|
- sleepTime = 3600
|
|
|
|
|
|
+ var configPath = fmt.Sprintf("%sconfig/netconfig-%s", ncutils.GetNetclientPathSpecific(), network)
|
|
|
|
+ fileInfo, err := os.Stat(configPath)
|
|
|
|
+ if err != nil {
|
|
|
|
+ ncutils.Log("could not stat config file: " + configPath)
|
|
|
|
+ }
|
|
|
|
+ // speed up UDP rest
|
|
|
|
+ if time.Now().After(fileInfo.ModTime().Add(time.Minute)) {
|
|
|
|
+ sleepTime := 2
|
|
|
|
+ for {
|
|
|
|
+ _, err := Pull(network, true)
|
|
|
|
+ if err == nil {
|
|
|
|
+ break
|
|
|
|
+ }
|
|
|
|
+ if sleepTime > 3600 {
|
|
|
|
+ sleepTime = 3600
|
|
|
|
+ }
|
|
|
|
+ ncutils.Log("failed to pull for network " + network)
|
|
|
|
+ ncutils.Log(fmt.Sprintf("waiting %d seconds to retry...", sleepTime))
|
|
|
|
+ time.Sleep(time.Second * time.Duration(sleepTime))
|
|
|
|
+ sleepTime = sleepTime * 2
|
|
}
|
|
}
|
|
- ncutils.Log("failed to pull for network " + network)
|
|
|
|
- ncutils.Log(fmt.Sprintf("waiting %d seconds to retry...", sleepTime))
|
|
|
|
- time.Sleep(time.Second * time.Duration(sleepTime))
|
|
|
|
- sleepTime = sleepTime * 2
|
|
|
|
}
|
|
}
|
|
time.Sleep(time.Second << 1)
|
|
time.Sleep(time.Second << 1)
|
|
cfg.ReadConfig()
|
|
cfg.ReadConfig()
|
|
@@ -147,25 +168,24 @@ func MessageQueue(ctx context.Context, network string) {
|
|
if cfg.DebugOn {
|
|
if cfg.DebugOn {
|
|
ncutils.Log(fmt.Sprintf("subscribed to peer updates for node %s peers/%s/%s", cfg.Node.Name, cfg.Node.Network, cfg.Node.ID))
|
|
ncutils.Log(fmt.Sprintf("subscribed to peer updates for node %s peers/%s/%s", cfg.Node.Name, cfg.Node.Network, cfg.Node.ID))
|
|
}
|
|
}
|
|
- var id string
|
|
|
|
var found bool
|
|
var found bool
|
|
for _, server := range cfg.NetworkSettings.DefaultServerAddrs {
|
|
for _, server := range cfg.NetworkSettings.DefaultServerAddrs {
|
|
- if server.IsLeader {
|
|
|
|
- id = server.ID
|
|
|
|
|
|
+ if !server.IsLeader {
|
|
|
|
+ continue
|
|
}
|
|
}
|
|
if server.Address != "" {
|
|
if server.Address != "" {
|
|
- if token := client.Subscribe("serverkeepalive/"+id, 0, mqtt.MessageHandler(ServerKeepAlive)); token.Wait() && token.Error() != nil {
|
|
|
|
|
|
+ if token := client.Subscribe("serverkeepalive/"+cfg.Node.Network, 0, mqtt.MessageHandler(ServerKeepAlive)); token.Wait() && token.Error() != nil {
|
|
ncutils.Log(token.Error().Error())
|
|
ncutils.Log(token.Error().Error())
|
|
return
|
|
return
|
|
}
|
|
}
|
|
found = true
|
|
found = true
|
|
if cfg.DebugOn {
|
|
if cfg.DebugOn {
|
|
- ncutils.Log("subscribed to server keepalives for server " + id)
|
|
|
|
|
|
+ ncutils.Log("subscribed to server keepalives for server " + cfg.Node.Network)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
if !found {
|
|
if !found {
|
|
- ncutils.Log("leader not defined for network " + cfg.Network)
|
|
|
|
|
|
+ ncutils.Log("leader not defined for network " + cfg.Node.Network)
|
|
}
|
|
}
|
|
defer client.Disconnect(250)
|
|
defer client.Disconnect(250)
|
|
go MonitorKeepalive(ctx, client, &cfg)
|
|
go MonitorKeepalive(ctx, client, &cfg)
|
|
@@ -219,6 +239,7 @@ func NodeUpdate(client mqtt.Client, msg mqtt.Message) {
|
|
newNode.OS = runtime.GOOS
|
|
newNode.OS = runtime.GOOS
|
|
// check if interface needs to delta
|
|
// check if interface needs to delta
|
|
ifaceDelta := ncutils.IfaceDelta(&cfg.Node, &newNode)
|
|
ifaceDelta := ncutils.IfaceDelta(&cfg.Node, &newNode)
|
|
|
|
+ shouldDNSChange := cfg.Node.DNSOn != newNode.DNSOn
|
|
|
|
|
|
cfg.Node = newNode
|
|
cfg.Node = newNode
|
|
switch newNode.Action {
|
|
switch newNode.Action {
|
|
@@ -265,24 +286,18 @@ func NodeUpdate(client mqtt.Client, msg mqtt.Message) {
|
|
ncutils.Log("error resubscribing after interface change " + err.Error())
|
|
ncutils.Log("error resubscribing after interface change " + err.Error())
|
|
return
|
|
return
|
|
}
|
|
}
|
|
- }
|
|
|
|
- /*
|
|
|
|
- else {
|
|
|
|
- ncutils.Log("syncing conf to " + file)
|
|
|
|
- err = wireguard.SyncWGQuickConf(cfg.Node.Interface, file)
|
|
|
|
- if err != nil {
|
|
|
|
- ncutils.Log("error syncing wg after peer update " + err.Error())
|
|
|
|
- return
|
|
|
|
|
|
+ if newNode.DNSOn == "yes" {
|
|
|
|
+ ncutils.Log("setting up DNS")
|
|
|
|
+ for _, server := range cfg.Node.NetworkSettings.DefaultServerAddrs {
|
|
|
|
+ if server.IsLeader {
|
|
|
|
+ go setDNS(cfg.Node.Interface, cfg.Network, server.Address)
|
|
|
|
+ break
|
|
|
|
+ }
|
|
}
|
|
}
|
|
}
|
|
}
|
|
- */
|
|
|
|
|
|
+ }
|
|
//deal with DNS
|
|
//deal with DNS
|
|
- if newNode.DNSOn == "yes" {
|
|
|
|
- ncutils.Log("setting up DNS")
|
|
|
|
- if err = local.UpdateDNS(cfg.Node.Interface, cfg.Network, cfg.Server.CoreDNSAddr); err != nil {
|
|
|
|
- ncutils.Log("error applying dns" + err.Error())
|
|
|
|
- }
|
|
|
|
- } else {
|
|
|
|
|
|
+ if newNode.DNSOn != "yes" && shouldDNSChange && cfg.Node.Interface != "" {
|
|
ncutils.Log("settng DNS off")
|
|
ncutils.Log("settng DNS off")
|
|
_, err := ncutils.RunCmd("/usr/bin/resolvectl revert "+cfg.Node.Interface, true)
|
|
_, err := ncutils.RunCmd("/usr/bin/resolvectl revert "+cfg.Node.Interface, true)
|
|
if err != nil {
|
|
if err != nil {
|
|
@@ -311,14 +326,12 @@ func UpdatePeers(client mqtt.Client, msg mqtt.Message) {
|
|
return
|
|
return
|
|
}
|
|
}
|
|
// see if cache hit, if so skip
|
|
// see if cache hit, if so skip
|
|
- /*
|
|
|
|
- var currentMessage = read(peerUpdate.Network, lastPeerUpdate)
|
|
|
|
- if currentMessage == string(data) {
|
|
|
|
- return
|
|
|
|
- }
|
|
|
|
- */
|
|
|
|
|
|
+ var currentMessage = read(peerUpdate.Network, lastPeerUpdate)
|
|
|
|
+ if currentMessage == string(data) {
|
|
|
|
+ ncutils.Log("cache hit")
|
|
|
|
+ return
|
|
|
|
+ }
|
|
insert(peerUpdate.Network, lastPeerUpdate, string(data))
|
|
insert(peerUpdate.Network, lastPeerUpdate, string(data))
|
|
- ncutils.Log("update peer handler")
|
|
|
|
|
|
|
|
file := ncutils.GetNetclientPathSpecific() + cfg.Node.Interface + ".conf"
|
|
file := ncutils.GetNetclientPathSpecific() + cfg.Node.Interface + ".conf"
|
|
err = wireguard.UpdateWgPeers(file, peerUpdate.Peers)
|
|
err = wireguard.UpdateWgPeers(file, peerUpdate.Peers)
|
|
@@ -326,31 +339,25 @@ func UpdatePeers(client mqtt.Client, msg mqtt.Message) {
|
|
ncutils.Log("error updating wireguard peers" + err.Error())
|
|
ncutils.Log("error updating wireguard peers" + err.Error())
|
|
return
|
|
return
|
|
}
|
|
}
|
|
- ncutils.Log("syncing conf to " + file)
|
|
|
|
//err = wireguard.SyncWGQuickConf(cfg.Node.Interface, file)
|
|
//err = wireguard.SyncWGQuickConf(cfg.Node.Interface, file)
|
|
err = wireguard.SetPeers(cfg.Node.Interface, cfg.Node.PersistentKeepalive, peerUpdate.Peers)
|
|
err = wireguard.SetPeers(cfg.Node.Interface, cfg.Node.PersistentKeepalive, peerUpdate.Peers)
|
|
if err != nil {
|
|
if err != nil {
|
|
ncutils.Log("error syncing wg after peer update " + err.Error())
|
|
ncutils.Log("error syncing wg after peer update " + err.Error())
|
|
return
|
|
return
|
|
}
|
|
}
|
|
|
|
+ ncutils.Log(fmt.Sprintf("received peer update on network, %s", cfg.Network))
|
|
}()
|
|
}()
|
|
}
|
|
}
|
|
|
|
|
|
// MonitorKeepalive - checks time last server keepalive received. If more than 3+ minutes, notify and resubscribe
|
|
// MonitorKeepalive - checks time last server keepalive received. If more than 3+ minutes, notify and resubscribe
|
|
func MonitorKeepalive(ctx context.Context, client mqtt.Client, cfg *config.ClientConfig) {
|
|
func MonitorKeepalive(ctx context.Context, client mqtt.Client, cfg *config.ClientConfig) {
|
|
- var id string
|
|
|
|
- for _, servAddr := range cfg.NetworkSettings.DefaultServerAddrs {
|
|
|
|
- if servAddr.IsLeader {
|
|
|
|
- id = servAddr.ID
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
for {
|
|
for {
|
|
select {
|
|
select {
|
|
case <-ctx.Done():
|
|
case <-ctx.Done():
|
|
return
|
|
return
|
|
case <-time.After(time.Second * 150):
|
|
case <-time.After(time.Second * 150):
|
|
var keepalivetime time.Time
|
|
var keepalivetime time.Time
|
|
- keepaliveval, ok := keepalive.Load(id)
|
|
|
|
|
|
+ keepaliveval, ok := keepalive.Load(cfg.Node.Network)
|
|
if ok {
|
|
if ok {
|
|
keepalivetime = keepaliveval.(time.Time)
|
|
keepalivetime = keepaliveval.(time.Time)
|
|
} else {
|
|
} else {
|
|
@@ -370,15 +377,7 @@ func MonitorKeepalive(ctx context.Context, client mqtt.Client, cfg *config.Clien
|
|
|
|
|
|
// ServerKeepAlive -- handler to react to keepalive messages published by server
|
|
// ServerKeepAlive -- handler to react to keepalive messages published by server
|
|
func ServerKeepAlive(client mqtt.Client, msg mqtt.Message) {
|
|
func ServerKeepAlive(client mqtt.Client, msg mqtt.Message) {
|
|
- // var mu sync.Mutex
|
|
|
|
- // mu.Lock()
|
|
|
|
- // defer mu.Unlock()
|
|
|
|
- serverid, err := getID(msg.Topic())
|
|
|
|
- if err != nil {
|
|
|
|
- ncutils.Log("invalid ID in serverkeepalive topic")
|
|
|
|
- }
|
|
|
|
- keepalive.Store(serverid, time.Now())
|
|
|
|
- // ncutils.Log("keepalive from server")
|
|
|
|
|
|
+ keepalive.Store(parseNetworkFromTopic(msg.Topic()), time.Now())
|
|
}
|
|
}
|
|
|
|
|
|
// Resubscribe --- handles resubscribing if needed
|
|
// Resubscribe --- handles resubscribing if needed
|
|
@@ -398,25 +397,24 @@ func Resubscribe(client mqtt.Client, cfg *config.ClientConfig) error {
|
|
ncutils.Log("error resubscribing to peers for " + cfg.Node.Network)
|
|
ncutils.Log("error resubscribing to peers for " + cfg.Node.Network)
|
|
return token.Error()
|
|
return token.Error()
|
|
}
|
|
}
|
|
- var id string
|
|
|
|
var found bool
|
|
var found bool
|
|
for _, server := range cfg.NetworkSettings.DefaultServerAddrs {
|
|
for _, server := range cfg.NetworkSettings.DefaultServerAddrs {
|
|
- if server.IsLeader {
|
|
|
|
- id = server.ID
|
|
|
|
|
|
+ if !server.IsLeader {
|
|
|
|
+ continue
|
|
}
|
|
}
|
|
if server.Address != "" {
|
|
if server.Address != "" {
|
|
- if token := client.Subscribe("serverkeepalive/"+id, 0, mqtt.MessageHandler(ServerKeepAlive)); token.Wait() && token.Error() != nil {
|
|
|
|
|
|
+ if token := client.Subscribe("serverkeepalive/"+cfg.Node.Network, 0, mqtt.MessageHandler(ServerKeepAlive)); token.Wait() && token.Error() != nil {
|
|
ncutils.Log("error resubscribing to serverkeepalive for " + cfg.Node.Network)
|
|
ncutils.Log("error resubscribing to serverkeepalive for " + cfg.Node.Network)
|
|
return token.Error()
|
|
return token.Error()
|
|
}
|
|
}
|
|
found = true
|
|
found = true
|
|
if cfg.DebugOn {
|
|
if cfg.DebugOn {
|
|
- ncutils.Log("subscribed to server keepalives for server " + id)
|
|
|
|
|
|
+ ncutils.Log("subscribed to server keepalives for server " + cfg.Node.Network)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
if !found {
|
|
if !found {
|
|
- ncutils.Log("leader not defined for network " + cfg.Network)
|
|
|
|
|
|
+ ncutils.Log("leader not defined for network " + cfg.Node.Network)
|
|
}
|
|
}
|
|
ncutils.Log("finished re subbing")
|
|
ncutils.Log("finished re subbing")
|
|
return nil
|
|
return nil
|
|
@@ -568,24 +566,15 @@ func decryptMsg(cfg *config.ClientConfig, msg []byte) ([]byte, error) {
|
|
return ncutils.BoxDecrypt(msg, serverPubKey, diskKey)
|
|
return ncutils.BoxDecrypt(msg, serverPubKey, diskKey)
|
|
}
|
|
}
|
|
|
|
|
|
-func shouldResub(currentServers, newServers []models.ServerAddr) bool {
|
|
|
|
- if len(currentServers) != len(newServers) {
|
|
|
|
- return true
|
|
|
|
|
|
+func setDNS(iface, network, address string) {
|
|
|
|
+ var reachable bool
|
|
|
|
+ for counter := 0; !reachable && counter < 5; counter++ {
|
|
|
|
+ reachable = local.IsDNSReachable(address)
|
|
|
|
+ time.Sleep(time.Second << 1)
|
|
}
|
|
}
|
|
- for _, srv := range currentServers {
|
|
|
|
- if !ncutils.ServerAddrSliceContains(newServers, srv) {
|
|
|
|
- return true
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
- return false
|
|
|
|
-}
|
|
|
|
-
|
|
|
|
-func getID(topic string) (string, error) {
|
|
|
|
- parts := strings.Split(topic, "/")
|
|
|
|
- count := len(parts)
|
|
|
|
- if count == 1 {
|
|
|
|
- return "", errors.New("invalid topic")
|
|
|
|
|
|
+ if !reachable {
|
|
|
|
+ ncutils.Log("not setting dns, server unreachable: " + address)
|
|
|
|
+ } else if err := local.UpdateDNS(iface, network, address); err != nil {
|
|
|
|
+ ncutils.Log("error applying dns" + err.Error())
|
|
}
|
|
}
|
|
- //the last part of the topic will be the network.ID
|
|
|
|
- return parts[count-1], nil
|
|
|
|
}
|
|
}
|