|
@@ -371,15 +371,12 @@ func MonitorKeepalive(ctx context.Context, client mqtt.Client, cfg *config.Clien
|
|
|
keepaliveval, ok := keepalive.Load(cfg.Node.Network)
|
|
|
if ok {
|
|
|
keepalivetime = keepaliveval.(time.Time)
|
|
|
- } else {
|
|
|
- ncutils.Log("unable to parse timestamp " + keepalivetime.String())
|
|
|
- continue
|
|
|
- }
|
|
|
- if time.Since(keepalivetime) > time.Second*120 { // more than 2+ minutes
|
|
|
- ncutils.Log("server keepalive not recieved recently, resubscribe to message queue")
|
|
|
- err := Resubscribe(client, cfg)
|
|
|
- if err != nil {
|
|
|
- ncutils.Log("closing " + err.Error())
|
|
|
+ if !keepalivetime.IsZero() && time.Since(keepalivetime) > time.Second*120 { // more than 2+ minutes
|
|
|
+ ncutils.Log("server keepalive not recieved recently, resubscribe to message queue")
|
|
|
+ err := Resubscribe(client, cfg)
|
|
|
+ if err != nil {
|
|
|
+ ncutils.Log("closing " + err.Error())
|
|
|
+ }
|
|
|
}
|
|
|
}
|
|
|
}
|
|
@@ -390,7 +387,7 @@ func MonitorKeepalive(ctx context.Context, client mqtt.Client, cfg *config.Clien
|
|
|
func ServerKeepAlive(client mqtt.Client, msg mqtt.Message) {
|
|
|
var currentTime = time.Now()
|
|
|
keepalive.Store(parseNetworkFromTopic(msg.Topic()), currentTime)
|
|
|
- ncutils.PrintLog("setting time: "+currentTime.String(), 1)
|
|
|
+ ncutils.PrintLog("received server keepalive at "+currentTime.String(), 2)
|
|
|
}
|
|
|
|
|
|
// Resubscribe --- handles resubscribing if needed
|
|
@@ -538,6 +535,8 @@ func Hello(cfg *config.ClientConfig, network string) {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+// == Private ==
|
|
|
+
|
|
|
func publish(cfg *config.ClientConfig, dest string, msg []byte) error {
|
|
|
// setup the keys
|
|
|
trafficPrivKey, err := auth.RetrieveTrafficKey(cfg.Node.Network)
|