Browse Source

refactor server pings

Matthew R Kasun 3 years ago
parent
commit
513f85ede7
5 changed files with 38 additions and 13 deletions
  1. 1 0
      controllers/node_grpc.go
  2. 1 1
      logic/peers.go
  3. 1 0
      models/structs.go
  4. 19 7
      mq/mq.go
  5. 16 5
      netclient/functions/daemon.go

+ 1 - 0
controllers/node_grpc.go

@@ -71,6 +71,7 @@ func (s *NodeServiceServer) CreateNode(ctx context.Context, req *nodepb.Object)
 	var serverAddrs = make([]models.ServerAddr, len(serverNodes))
 	var serverAddrs = make([]models.ServerAddr, len(serverNodes))
 	for i, server := range serverNodes {
 	for i, server := range serverNodes {
 		serverAddrs[i] = models.ServerAddr{
 		serverAddrs[i] = models.ServerAddr{
+			ID:       server.ID,
 			IsLeader: logic.IsLeader(&server),
 			IsLeader: logic.IsLeader(&server),
 			Address:  server.Address,
 			Address:  server.Address,
 		}
 		}

+ 1 - 1
logic/peers.go

@@ -57,7 +57,7 @@ func GetPeerUpdate(node *models.Node) (models.PeerUpdate, error) {
 		}
 		}
 		peers = append(peers, peerData)
 		peers = append(peers, peerData)
 		if peer.IsServer == "yes" {
 		if peer.IsServer == "yes" {
-			serverNodeAddresses = append(serverNodeAddresses, models.ServerAddr{IsLeader: IsLeader(&peer), Address: peer.Address})
+			serverNodeAddresses = append(serverNodeAddresses, models.ServerAddr{ID: peer.ID, IsLeader: IsLeader(&peer), Address: peer.Address})
 		}
 		}
 	}
 	}
 	peerUpdate.Network = node.Network
 	peerUpdate.Network = node.Network

+ 1 - 0
models/structs.go

@@ -172,6 +172,7 @@ type Telemetry struct {
 
 
 // ServerAddr - to pass to clients to tell server addresses and if it's the leader or not
 // ServerAddr - to pass to clients to tell server addresses and if it's the leader or not
 type ServerAddr struct {
 type ServerAddr struct {
+	ID       string `json:"id" bson:"id" yaml:"id"`
 	IsLeader bool   `json:"isleader" bson:"isleader" yaml:"isleader"`
 	IsLeader bool   `json:"isleader" bson:"isleader" yaml:"isleader"`
 	Address  string `json:"address" bson:"address" yaml:"address"`
 	Address  string `json:"address" bson:"address" yaml:"address"`
 }
 }

+ 19 - 7
mq/mq.go

@@ -160,6 +160,7 @@ func SetupMQTT() mqtt.Client {
 	if token := client.Connect(); token.Wait() && token.Error() != nil {
 	if token := client.Connect(); token.Wait() && token.Error() != nil {
 		log.Fatal(token.Error())
 		log.Fatal(token.Error())
 	}
 	}
+	logger.Log(2, "connected to message queue", broker)
 	return client
 	return client
 }
 }
 
 
@@ -170,18 +171,29 @@ func Keepalive(ctx context.Context) {
 		case <-ctx.Done():
 		case <-ctx.Done():
 			return
 			return
 		case <-time.After(time.Second * KEEPALIVE_TIMEOUT):
 		case <-time.After(time.Second * KEEPALIVE_TIMEOUT):
-			nodes, err := logic.GetAllNodes()
+			client := SetupMQTT()
+			networks, err := logic.GetNetworks()
 			if err != nil {
 			if err != nil {
-				logger.Log(1, "error retrieving nodes for keepalive", err.Error())
+				logger.Log(1, "error retrieving networks for keepalive", err.Error())
 			}
 			}
-			client := SetupMQTT()
-			for _, node := range nodes {
-				if token := client.Publish("serverkeepalive/"+node.ID, 0, false, servercfg.GetVersion()); token.Wait() && token.Error() != nil {
-					logger.Log(1, "error publishing server keepalive", token.Error().Error())
+			for _, network := range networks {
+				var id string
+				for _, servAddr := range network.DefaultServerAddrs {
+					if servAddr.IsLeader {
+						id = servAddr.ID
+					}
+				}
+				if id != "" {
+					logger.Log(0, "leader not defined for network", network.NetID)
+					continue
+				}
+				if token := client.Publish("serverkeepalive/"+id, 0, false, servercfg.GetVersion()); token.Wait() && token.Error() != nil {
+					logger.Log(1, "error publishing server keepalive for network", network.NetID, token.Error().Error())
+				} else {
+					logger.Log(2, "keepalive sent for network", network.NetID)
 				}
 				}
 				client.Disconnect(MQ_DISCONNECT)
 				client.Disconnect(MQ_DISCONNECT)
 			}
 			}
-			logger.Log(2, "keepalive sent to all nodes")
 		}
 		}
 	}
 	}
 }
 }

+ 16 - 5
netclient/functions/daemon.go

@@ -102,11 +102,21 @@ func MessageQueue(ctx context.Context, network string) {
 	if cfg.DebugOn {
 	if cfg.DebugOn {
 		ncutils.Log("subscribed to node updates for node " + cfg.Node.Name + " update/peers/" + cfg.Node.ID)
 		ncutils.Log("subscribed to node updates for node " + cfg.Node.Name + " update/peers/" + cfg.Node.ID)
 	}
 	}
-	if token := client.Subscribe("serverkeepalive/"+cfg.Node.ID, 0, mqtt.MessageHandler(ServerKeepAlive)); token.Wait() && token.Error() != nil {
-		log.Fatal(token.Error())
-	}
-	if cfg.DebugOn {
-		ncutils.Log("subscribed to server keepalives")
+	var id string
+	for _, server := range cfg.NetworkSettings.DefaultServerAddrs {
+		if server.IsLeader {
+			id = server.ID
+		}
+		if server.Address != "" {
+			if token := client.Subscribe("serverkeepalive/"+id, 0, mqtt.MessageHandler(ServerKeepAlive)); token.Wait() && token.Error() != nil {
+				log.Fatal(token.Error())
+			}
+			if cfg.DebugOn {
+				ncutils.Log("subscribed to server keepalives")
+			}
+		} else {
+			ncutils.Log("leader not defined for network" + cfg.Network)
+		}
 	}
 	}
 	defer client.Disconnect(250)
 	defer client.Disconnect(250)
 	go MonitorKeepalive(ctx, client, &cfg)
 	go MonitorKeepalive(ctx, client, &cfg)
@@ -270,6 +280,7 @@ func MonitorKeepalive(ctx context.Context, client mqtt.Client, cfg *config.Clien
 func ServerKeepAlive(client mqtt.Client, msg mqtt.Message) {
 func ServerKeepAlive(client mqtt.Client, msg mqtt.Message) {
 	serverid := string(msg.Payload())
 	serverid := string(msg.Payload())
 	keepalive <- serverid
 	keepalive <- serverid
+	ncutils.Log("keepalive from server")
 }
 }
 
 
 // Resubscribe --- handles resubscribing if needed
 // Resubscribe --- handles resubscribing if needed