|
@@ -3,7 +3,6 @@ package functions
|
|
import (
|
|
import (
|
|
"context"
|
|
"context"
|
|
"encoding/json"
|
|
"encoding/json"
|
|
- "errors"
|
|
|
|
"fmt"
|
|
"fmt"
|
|
"os"
|
|
"os"
|
|
"os/signal"
|
|
"os/signal"
|
|
@@ -161,25 +160,24 @@ func MessageQueue(ctx context.Context, network string) {
|
|
if cfg.DebugOn {
|
|
if cfg.DebugOn {
|
|
ncutils.Log(fmt.Sprintf("subscribed to peer updates for node %s peers/%s/%s", cfg.Node.Name, cfg.Node.Network, cfg.Node.ID))
|
|
ncutils.Log(fmt.Sprintf("subscribed to peer updates for node %s peers/%s/%s", cfg.Node.Name, cfg.Node.Network, cfg.Node.ID))
|
|
}
|
|
}
|
|
- var id string
|
|
|
|
var found bool
|
|
var found bool
|
|
for _, server := range cfg.NetworkSettings.DefaultServerAddrs {
|
|
for _, server := range cfg.NetworkSettings.DefaultServerAddrs {
|
|
- if server.IsLeader {
|
|
|
|
- id = server.ID
|
|
|
|
|
|
+ if !server.IsLeader {
|
|
|
|
+ continue
|
|
}
|
|
}
|
|
if server.Address != "" {
|
|
if server.Address != "" {
|
|
- if token := client.Subscribe("serverkeepalive/"+id, 0, mqtt.MessageHandler(ServerKeepAlive)); token.Wait() && token.Error() != nil {
|
|
|
|
|
|
+ if token := client.Subscribe("serverkeepalive/"+cfg.Node.Network, 0, mqtt.MessageHandler(ServerKeepAlive)); token.Wait() && token.Error() != nil {
|
|
ncutils.Log(token.Error().Error())
|
|
ncutils.Log(token.Error().Error())
|
|
return
|
|
return
|
|
}
|
|
}
|
|
found = true
|
|
found = true
|
|
if cfg.DebugOn {
|
|
if cfg.DebugOn {
|
|
- ncutils.Log("subscribed to server keepalives for server " + id)
|
|
|
|
|
|
+ ncutils.Log("subscribed to server keepalives for server " + cfg.Node.Network)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
if !found {
|
|
if !found {
|
|
- ncutils.Log("leader not defined for network " + cfg.Network)
|
|
|
|
|
|
+ ncutils.Log("leader not defined for network " + cfg.Node.Network)
|
|
}
|
|
}
|
|
defer client.Disconnect(250)
|
|
defer client.Disconnect(250)
|
|
go MonitorKeepalive(ctx, client, &cfg)
|
|
go MonitorKeepalive(ctx, client, &cfg)
|
|
@@ -342,19 +340,13 @@ func UpdatePeers(client mqtt.Client, msg mqtt.Message) {
|
|
|
|
|
|
// MonitorKeepalive - checks time last server keepalive received. If more than 3+ minutes, notify and resubscribe
|
|
// MonitorKeepalive - checks time last server keepalive received. If more than 3+ minutes, notify and resubscribe
|
|
func MonitorKeepalive(ctx context.Context, client mqtt.Client, cfg *config.ClientConfig) {
|
|
func MonitorKeepalive(ctx context.Context, client mqtt.Client, cfg *config.ClientConfig) {
|
|
- var id string
|
|
|
|
- for _, servAddr := range cfg.NetworkSettings.DefaultServerAddrs {
|
|
|
|
- if servAddr.IsLeader {
|
|
|
|
- id = servAddr.ID
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
for {
|
|
for {
|
|
select {
|
|
select {
|
|
case <-ctx.Done():
|
|
case <-ctx.Done():
|
|
return
|
|
return
|
|
case <-time.After(time.Second * 150):
|
|
case <-time.After(time.Second * 150):
|
|
var keepalivetime time.Time
|
|
var keepalivetime time.Time
|
|
- keepaliveval, ok := keepalive.Load(id)
|
|
|
|
|
|
+ keepaliveval, ok := keepalive.Load(cfg.Node.Network)
|
|
if ok {
|
|
if ok {
|
|
keepalivetime = keepaliveval.(time.Time)
|
|
keepalivetime = keepaliveval.(time.Time)
|
|
} else {
|
|
} else {
|
|
@@ -374,15 +366,7 @@ func MonitorKeepalive(ctx context.Context, client mqtt.Client, cfg *config.Clien
|
|
|
|
|
|
// ServerKeepAlive -- handler to react to keepalive messages published by server
|
|
// ServerKeepAlive -- handler to react to keepalive messages published by server
|
|
func ServerKeepAlive(client mqtt.Client, msg mqtt.Message) {
|
|
func ServerKeepAlive(client mqtt.Client, msg mqtt.Message) {
|
|
- // var mu sync.Mutex
|
|
|
|
- // mu.Lock()
|
|
|
|
- // defer mu.Unlock()
|
|
|
|
- serverid, err := getID(msg.Topic())
|
|
|
|
- if err != nil {
|
|
|
|
- ncutils.Log("invalid ID in serverkeepalive topic")
|
|
|
|
- }
|
|
|
|
- keepalive.Store(serverid, time.Now())
|
|
|
|
- // ncutils.Log("keepalive from server")
|
|
|
|
|
|
+ keepalive.Store(parseNetworkFromTopic(msg.Topic()), time.Now())
|
|
}
|
|
}
|
|
|
|
|
|
// Resubscribe --- handles resubscribing if needed
|
|
// Resubscribe --- handles resubscribing if needed
|
|
@@ -402,25 +386,24 @@ func Resubscribe(client mqtt.Client, cfg *config.ClientConfig) error {
|
|
ncutils.Log("error resubscribing to peers for " + cfg.Node.Network)
|
|
ncutils.Log("error resubscribing to peers for " + cfg.Node.Network)
|
|
return token.Error()
|
|
return token.Error()
|
|
}
|
|
}
|
|
- var id string
|
|
|
|
var found bool
|
|
var found bool
|
|
for _, server := range cfg.NetworkSettings.DefaultServerAddrs {
|
|
for _, server := range cfg.NetworkSettings.DefaultServerAddrs {
|
|
- if server.IsLeader {
|
|
|
|
- id = server.ID
|
|
|
|
|
|
+ if !server.IsLeader {
|
|
|
|
+ continue
|
|
}
|
|
}
|
|
if server.Address != "" {
|
|
if server.Address != "" {
|
|
- if token := client.Subscribe("serverkeepalive/"+id, 0, mqtt.MessageHandler(ServerKeepAlive)); token.Wait() && token.Error() != nil {
|
|
|
|
|
|
+ if token := client.Subscribe("serverkeepalive/"+cfg.Node.Network, 0, mqtt.MessageHandler(ServerKeepAlive)); token.Wait() && token.Error() != nil {
|
|
ncutils.Log("error resubscribing to serverkeepalive for " + cfg.Node.Network)
|
|
ncutils.Log("error resubscribing to serverkeepalive for " + cfg.Node.Network)
|
|
return token.Error()
|
|
return token.Error()
|
|
}
|
|
}
|
|
found = true
|
|
found = true
|
|
if cfg.DebugOn {
|
|
if cfg.DebugOn {
|
|
- ncutils.Log("subscribed to server keepalives for server " + id)
|
|
|
|
|
|
+ ncutils.Log("subscribed to server keepalives for server " + cfg.Node.Network)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
if !found {
|
|
if !found {
|
|
- ncutils.Log("leader not defined for network " + cfg.Network)
|
|
|
|
|
|
+ ncutils.Log("leader not defined for network " + cfg.Node.Network)
|
|
}
|
|
}
|
|
ncutils.Log("finished re subbing")
|
|
ncutils.Log("finished re subbing")
|
|
return nil
|
|
return nil
|
|
@@ -583,13 +566,3 @@ func shouldResub(currentServers, newServers []models.ServerAddr) bool {
|
|
}
|
|
}
|
|
return false
|
|
return false
|
|
}
|
|
}
|
|
-
|
|
|
|
-func getID(topic string) (string, error) {
|
|
|
|
- parts := strings.Split(topic, "/")
|
|
|
|
- count := len(parts)
|
|
|
|
- if count == 1 {
|
|
|
|
- return "", errors.New("invalid topic")
|
|
|
|
- }
|
|
|
|
- //the last part of the topic will be the network.ID
|
|
|
|
- return parts[count-1], nil
|
|
|
|
-}
|
|
|