|
@@ -24,21 +24,25 @@ import (
|
|
|
)
|
|
|
|
|
|
// ServerKeepalive - stores time of last server keepalive message
|
|
|
-var keepalive = make(map[string]time.Time, 3)
|
|
|
-var messageCache = make(map[string]string, 20)
|
|
|
+var keepalive = new(sync.Map)
|
|
|
+var messageCache = new(sync.Map)
|
|
|
|
|
|
const lastNodeUpdate = "lnu"
|
|
|
const lastPeerUpdate = "lpu"
|
|
|
|
|
|
func insert(network, which, cache string) {
|
|
|
- var mu sync.Mutex
|
|
|
- mu.Lock()
|
|
|
- defer mu.Unlock()
|
|
|
- messageCache[fmt.Sprintf("%s%s", network, which)] = cache
|
|
|
+ // var mu sync.Mutex
|
|
|
+ // mu.Lock()
|
|
|
+ // defer mu.Unlock()
|
|
|
+ messageCache.Store(fmt.Sprintf("%s%s", network, which), cache)
|
|
|
}
|
|
|
|
|
|
func read(network, which string) string {
|
|
|
- return messageCache[fmt.Sprintf("%s%s", network, which)]
|
|
|
+ val, isok := messageCache.Load(fmt.Sprintf("%s%s", network, which))
|
|
|
+ if isok {
|
|
|
+ return fmt.Sprintf("%v", val)
|
|
|
+ }
|
|
|
+ return ""
|
|
|
}
|
|
|
|
|
|
// Daemon runs netclient daemon from command line
|
|
@@ -334,7 +338,15 @@ func MonitorKeepalive(ctx context.Context, client mqtt.Client, cfg *config.Clien
|
|
|
case <-ctx.Done():
|
|
|
return
|
|
|
case <-time.After(time.Second * 150):
|
|
|
- if time.Since(keepalive[id]) > time.Second*200 { // more than 3+ minutes
|
|
|
+ var keepalivetime time.Time
|
|
|
+ keepaliveval, ok := keepalive.Load(id)
|
|
|
+ if ok {
|
|
|
+ keepalivetime = keepaliveval.(time.Time)
|
|
|
+ } else {
|
|
|
+ ncutils.Log("unable to parse timestamp " + keepalivetime.String())
|
|
|
+ continue
|
|
|
+ }
|
|
|
+ if time.Since(keepalivetime) > time.Second*200 { // more than 3+ minutes
|
|
|
ncutils.Log("server keepalive not recieved recently, resubscribe to message queue")
|
|
|
err := Resubscribe(client, cfg)
|
|
|
if err != nil {
|
|
@@ -347,14 +359,14 @@ func MonitorKeepalive(ctx context.Context, client mqtt.Client, cfg *config.Clien
|
|
|
|
|
|
// ServerKeepAlive -- handler to react to keepalive messages published by server
|
|
|
func ServerKeepAlive(client mqtt.Client, msg mqtt.Message) {
|
|
|
- var mu sync.Mutex
|
|
|
- mu.Lock()
|
|
|
- defer mu.Unlock()
|
|
|
+ // var mu sync.Mutex
|
|
|
+ // mu.Lock()
|
|
|
+ // defer mu.Unlock()
|
|
|
serverid, err := getID(msg.Topic())
|
|
|
if err != nil {
|
|
|
ncutils.Log("invalid ID in serverkeepalive topic")
|
|
|
}
|
|
|
- keepalive[serverid] = time.Now()
|
|
|
+ keepalive.Store(serverid, time.Now())
|
|
|
// ncutils.Log("keepalive from server")
|
|
|
}
|
|
|
|