Browse Source

publish individual server keepalive messages

Matthew R. Kasun 3 years ago
parent
commit
7e2928ba84
2 changed files with 31 additions and 16 deletions
  1. 15 5
      mq/mq.go
  2. 16 11
      netclient/functions/daemon.go

+ 15 - 5
mq/mq.go

@@ -243,11 +243,7 @@ func Keepalive(ctx context.Context) {
 					logger.Log(1, "leader not defined for network ", network.NetID)
 					continue
 				}
-				if token := client.Publish("serverkeepalive/"+network.NetID, 0, false, servercfg.GetVersion()); token.Wait() && token.Error() != nil {
-					logger.Log(1, "error publishing server keepalive for network", network.NetID, token.Error().Error())
-				} else {
-					logger.Log(2, "keepalive sent for network", network.NetID)
-				}
+				publishServerKeepalive(client, &network)
 				err = serverctl.SyncServerNetwork(network.NetID)
 				if err != nil {
 					logger.Log(1, "error syncing server network", err.Error())
@@ -257,3 +253,17 @@ func Keepalive(ctx context.Context) {
 		}
 	}
 }
+
+func publishServerKeepalive(client mqtt.Client, network *models.Network) {
+	nodes, err := logic.GetNetworkNodes(network.NetID)
+	if err != nil {
+		return
+	}
+	for _, node := range nodes {
+		if token := client.Publish(fmt.Sprintf("serverkeepalive/%s/%s", network.NetID, node.ID), 0, false, servercfg.GetVersion()); token.Wait() && token.Error() != nil {
+			logger.Log(1, "error publishing server keepalive for network", network.NetID, token.Error().Error())
+		} else {
+			logger.Log(2, "keepalive sent for network/node", network.NetID, node.ID)
+		}
+	}
+}

+ 16 - 11
netclient/functions/daemon.go

@@ -183,7 +183,7 @@ func MessageQueue(ctx context.Context, network string) {
 			continue
 		}
 		if server.Address != "" {
-			if token := client.Subscribe("serverkeepalive/"+cfg.Node.Network, 0, mqtt.MessageHandler(ServerKeepAlive)); token.Wait() && token.Error() != nil {
+			if token := client.Subscribe(fmt.Sprintf("serverkeepalive/%s/%s", cfg.Node.Network, cfg.Node.ID), 0, mqtt.MessageHandler(ServerKeepAlive)); token.Wait() && token.Error() != nil {
 				ncutils.Log(token.Error().Error())
 				return
 			}
@@ -419,7 +419,7 @@ func Resubscribe(client mqtt.Client, cfg *config.ClientConfig) error {
 				continue
 			}
 			if server.Address != "" {
-				if token := client.Subscribe("serverkeepalive/"+cfg.Node.Network, 0, mqtt.MessageHandler(ServerKeepAlive)); token.Wait() && token.Error() != nil {
+				if token := client.Subscribe(fmt.Sprintf("serverkeepalive/%s/%s", cfg.Node.Network, cfg.Node.ID), 0, mqtt.MessageHandler(ServerKeepAlive)); token.Wait() && token.Error() != nil {
 					ncutils.Log("error resubscribing to serverkeepalive for " + cfg.Node.Network)
 					return token.Error()
 				}
@@ -483,7 +483,9 @@ func Checkin(ctx context.Context, cfg *config.ClientConfig, network string) {
 				if cfg.Node.Endpoint != extIP && extIP != "" {
 					ncutils.PrintLog("endpoint has changed from "+cfg.Node.Endpoint+" to "+extIP, 1)
 					cfg.Node.Endpoint = extIP
-					PublishNodeUpdate(cfg)
+					if err := PublishNodeUpdate(cfg); err != nil {
+						ncutils.Log("could not publish endpoint change")
+					}
 				}
 				intIP, err := getPrivateAddr()
 				if err != nil {
@@ -492,7 +494,9 @@ func Checkin(ctx context.Context, cfg *config.ClientConfig, network string) {
 				if cfg.Node.LocalAddress != intIP && intIP != "" {
 					ncutils.PrintLog("local Address has changed from "+cfg.Node.LocalAddress+" to "+intIP, 1)
 					cfg.Node.LocalAddress = intIP
-					PublishNodeUpdate(cfg)
+					if err := PublishNodeUpdate(cfg); err != nil {
+						ncutils.Log("could not publish local address change")
+					}
 				}
 			} else {
 				localIP, err := ncutils.GetLocalIP(cfg.Node.LocalRange)
@@ -502,7 +506,9 @@ func Checkin(ctx context.Context, cfg *config.ClientConfig, network string) {
 				if cfg.Node.Endpoint != localIP && localIP != "" {
 					ncutils.PrintLog("endpoint has changed from "+cfg.Node.Endpoint+" to "+localIP, 1)
 					cfg.Node.Endpoint = localIP
-					PublishNodeUpdate(cfg)
+					if err := PublishNodeUpdate(cfg); err != nil {
+						ncutils.Log("could not publish localip change")
+					}
 				}
 			}
 			if err := pingServer(cfg); err != nil {
@@ -515,17 +521,18 @@ func Checkin(ctx context.Context, cfg *config.ClientConfig, network string) {
 }
 
 // PublishNodeUpdates -- saves node and pushes changes to broker
-func PublishNodeUpdate(cfg *config.ClientConfig) {
+func PublishNodeUpdate(cfg *config.ClientConfig) error {
 	if err := config.Write(cfg, cfg.Network); err != nil {
-		ncutils.Log("error saving configuration: " + err.Error())
+		return err
 	}
 	data, err := json.Marshal(cfg.Node)
 	if err != nil {
-		ncutils.Log("error marshling node update: " + err.Error())
+		return err
 	}
 	if err = publish(cfg, fmt.Sprintf("update/%s", cfg.Node.ID), data); err != nil {
-		ncutils.Log(fmt.Sprintf("error publishing endpoint update, %v", err))
+		return err
 	}
+	return nil
 }
 
 // Hello -- ping the broker to let server know node is alive and doing fine
@@ -595,14 +602,12 @@ func pingServer(cfg *config.ClientConfig) error {
 	node := getServerAddress(cfg)
 	pinger, err := ping.NewPinger(node)
 	if err != nil {
-		ncutils.Log("error creating pinger " + err.Error())
 		return err
 	}
 	pinger.Timeout = 2 * time.Second
 	pinger.Run()
 	stats := pinger.Statistics()
 	if stats.PacketLoss == 100 {
-		ncutils.PrintLog(fmt.Sprintf("lost packets when pinging server: packets sent:%d packets recieved: %d", stats.PacketsSent, stats.PacketsRecv), 1)
 		return errors.New("ping error")
 	}
 	return nil