Browse Source

Merge pull request #653 from gravitl/feature_v0.10.0_serverPings

Feature v0.10.0 server pings
dcarns 3 years ago
parent
commit
a7015e7260
7 changed files with 182 additions and 21 deletions
  1. 8 1
      controllers/node_grpc.go
  2. 12 0
      logic/networks.go
  3. 1 1
      logic/peers.go
  4. 4 0
      main.go
  5. 1 0
      models/structs.go
  6. 52 2
      mq/mq.go
  7. 104 17
      netclient/functions/daemon.go

+ 8 - 1
controllers/node_grpc.go

@@ -5,6 +5,7 @@ import (
 	"encoding/json"
 	"errors"
 	"strings"
+	"time"
 
 	nodepb "github.com/gravitl/netmaker/grpc"
 	"github.com/gravitl/netmaker/logger"
@@ -71,6 +72,7 @@ func (s *NodeServiceServer) CreateNode(ctx context.Context, req *nodepb.Object)
 	var serverAddrs = make([]models.ServerAddr, len(serverNodes))
 	for i, server := range serverNodes {
 		serverAddrs[i] = models.ServerAddr{
+			ID:       server.ID,
 			IsLeader: logic.IsLeader(&server),
 			Address:  server.Address,
 		}
@@ -103,10 +105,15 @@ func (s *NodeServiceServer) CreateNode(ctx context.Context, req *nodepb.Object)
 		Type: nodepb.NODE_TYPE,
 	}
 
-	err = logic.SetNetworkNodesLastModified(node.Network)
+	network, err := logic.GetParentNetwork(node.Network)
 	if err != nil {
 		return nil, err
 	}
+	network.NodesLastModified = time.Now().Unix()
+	network.DefaultServerAddrs = serverAddrs
+	if err := logic.SaveNetwork(&network); err != nil {
+		return nil, err
+	}
 	err = runServerPeerUpdate(node.Network, true)
 	if err != nil {
 		logger.Log(1, "internal error when setting peers after node,", node.ID, "was created (gRPC)")

+ 12 - 0
logic/networks.go

@@ -598,6 +598,18 @@ func KeyUpdate(netname string) (models.Network, error) {
 	return models.Network{}, nil
 }
 
+// SaveNetwork - save network struct to database.
+// Marshals the network to JSON and writes it to the networks table keyed by
+// NetID. NOTE(review): this relies on database.Insert overwriting any existing
+// record (upsert semantics) — confirm, since CreateNode uses it to update
+// NodesLastModified / DefaultServerAddrs on an already-existing network.
+func SaveNetwork(network *models.Network) error {
+	data, err := json.Marshal(network)
+	if err != nil {
+		return err
+	}
+	if err := database.Insert(network.NetID, string(data), database.NETWORKS_TABLE_NAME); err != nil {
+		return err
+	}
+	return nil
+}
+
 // == Private ==
 
 func networkNodesUpdateAction(networkName string, action string) error {

+ 1 - 1
logic/peers.go

@@ -57,7 +57,7 @@ func GetPeerUpdate(node *models.Node) (models.PeerUpdate, error) {
 		}
 		peers = append(peers, peerData)
 		if peer.IsServer == "yes" {
-			serverNodeAddresses = append(serverNodeAddresses, models.ServerAddr{IsLeader: IsLeader(&peer), Address: peer.Address})
+			serverNodeAddresses = append(serverNodeAddresses, models.ServerAddr{ID: peer.ID, IsLeader: IsLeader(&peer), Address: peer.Address})
 		}
 	}
 	peerUpdate.Network = node.Network

+ 4 - 0
main.go

@@ -206,9 +206,13 @@ func runMessageQueue(wg *sync.WaitGroup) {
 		client.Disconnect(240)
 		logger.Log(0, "node update subscription failed")
 	}
+	//Set Up Keepalive message
+	ctx, cancel := context.WithCancel(context.Background())
+	go mq.Keepalive(ctx)
 	quit := make(chan os.Signal, 1)
 	signal.Notify(quit, syscall.SIGTERM, os.Interrupt)
 	<-quit
+	cancel()
 	logger.Log(0, "Message Queue shutting down")
 	client.Disconnect(250)
 }

+ 1 - 0
models/structs.go

@@ -177,6 +177,7 @@ type Telemetry struct {
 
 // ServerAddr - to pass to clients to tell server addresses and if it's the leader or not
 type ServerAddr struct {
+	ID       string `json:"id" bson:"id" yaml:"id"`
 	IsLeader bool   `json:"isleader" bson:"isleader" yaml:"isleader"`
 	Address  string `json:"address" bson:"address" yaml:"address"`
 }

+ 52 - 2
mq/mq.go

@@ -1,11 +1,13 @@
 package mq
 
 import (
+	"context"
 	"encoding/json"
 	"errors"
 	"fmt"
 	"log"
 	"strings"
+	"time"
 
 	mqtt "github.com/eclipse/paho.mqtt.golang"
 	"github.com/gravitl/netmaker/database"
@@ -15,6 +17,9 @@ import (
 	"github.com/gravitl/netmaker/servercfg"
 )
 
+const KEEPALIVE_TIMEOUT = 60 //timeout in seconds
+const MQ_DISCONNECT = 250
+
 // DefaultHandler default message queue handler - only called when GetDebug == true
 var DefaultHandler mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Message) {
 	logger.Log(0, "MQTT Message: Topic: ", string(msg.Topic()), " Message: ", string(msg.Payload()))
@@ -90,6 +95,7 @@ var UpdateNode mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Message)
 				return
 			}
 		}
+		logger.Log(1, "no need to update peers")
 	}()
 }
 
@@ -159,8 +165,52 @@ func SetupMQTT() mqtt.Client {
 	broker := servercfg.GetMessageQueueEndpoint()
 	opts.AddBroker(broker)
 	client := mqtt.NewClient(opts)
-	if token := client.Connect(); token.Wait() && token.Error() != nil {
-		log.Fatal(token.Error())
+	tperiod := time.Now().Add(10 * time.Second)
+	for {
+		if token := client.Connect(); token.Wait() && token.Error() != nil {
+			logger.Log(2, "unable to connect to broker, retrying ...")
+			if time.Now().After(tperiod) {
+				log.Fatal(0, "could not connect to broker, exiting ...", token.Error())
+			}
+		} else {
+			break
+		}
+		time.Sleep(2 * time.Second)
 	}
+	logger.Log(2, "connected to message queue", broker)
 	return client
 }
+
+// Keepalive -- periodically pings all nodes to let them know server is still alive and doing well.
+// Runs until ctx is cancelled; every KEEPALIVE_TIMEOUT seconds it publishes the
+// server version to "serverkeepalive/<leader node ID>" for each network.
+func Keepalive(ctx context.Context) {
+	for {
+		select {
+		case <-ctx.Done():
+			return
+		case <-time.After(time.Second * KEEPALIVE_TIMEOUT):
+			// NOTE(review): a fresh broker connection is dialed on every tick
+			// (and SetupMQTT log.Fatals the whole server if the broker is down
+			// for ~10s) — consider reusing one client for the loop's lifetime.
+			client := SetupMQTT()
+			networks, err := logic.GetNetworks()
+			if err != nil {
+				// error is logged only; networks is then nil and the loop below is a no-op
+				logger.Log(1, "error retrieving networks for keepalive", err.Error())
+			}
+			for _, network := range networks {
+				// resolve the leader's node ID (last flagged leader wins if several are set)
+				var id string
+				for _, servAddr := range network.DefaultServerAddrs {
+					if servAddr.IsLeader {
+						id = servAddr.ID
+					}
+				}
+				if id == "" {
+					logger.Log(0, "leader not defined for network", network.NetID)
+					continue
+				}
+				if token := client.Publish("serverkeepalive/"+id, 0, false, servercfg.GetVersion()); token.Wait() && token.Error() != nil {
+					logger.Log(1, "error publishing server keepalive for network", network.NetID, token.Error().Error())
+				} else {
+					logger.Log(2, "keepalive sent for network", network.NetID)
+				}
+				// NOTE(review): Disconnect sits inside the per-network loop, so the
+				// client is torn down after the first network and publishes for all
+				// remaining networks go out on a disconnected client — this almost
+				// certainly belongs after the loop.
+				client.Disconnect(MQ_DISCONNECT)
+			}
+		}
+	}
+}

+ 104 - 17
netclient/functions/daemon.go

@@ -3,6 +3,7 @@ package functions
 import (
 	"context"
 	"encoding/json"
+	"errors"
 	"fmt"
 	"log"
 	"os"
@@ -14,6 +15,7 @@ import (
 	"time"
 
 	mqtt "github.com/eclipse/paho.mqtt.golang"
+	"github.com/gravitl/netmaker/logger"
 	"github.com/gravitl/netmaker/models"
 	"github.com/gravitl/netmaker/netclient/auth"
 	"github.com/gravitl/netmaker/netclient/config"
@@ -23,6 +25,8 @@ import (
 	"golang.zx2c4.com/wireguard/wgctrl/wgtypes"
 )
 
+// ServerKeepalive  - stores time of last server keepalive message
+var keepalive = make(map[string]time.Time, 3)
 var messageCache = make(map[string]string, 20)
 
 const lastNodeUpdate = "lnu"
@@ -63,15 +67,24 @@ func SetupMQTT(cfg *config.ClientConfig) mqtt.Client {
 	opts := mqtt.NewClientOptions()
 	for _, server := range cfg.Node.NetworkSettings.DefaultServerAddrs {
 		if server.Address != "" && server.IsLeader {
-			ncutils.Log(fmt.Sprintf("adding server (%s) to listen on network %s \n", server.Address, cfg.Node.Network))
+			ncutils.Log(fmt.Sprintf("adding server (%s) to listen on network %s", server.Address, cfg.Node.Network))
 			opts.AddBroker(server.Address + ":1883")
 			break
 		}
 	}
 	opts.SetDefaultPublishHandler(All)
 	client := mqtt.NewClient(opts)
-	if token := client.Connect(); token.Wait() && token.Error() != nil {
-		log.Fatal(token.Error())
+	tperiod := time.Now().Add(10 * time.Second)
+	for {
+		if token := client.Connect(); token.Wait() && token.Error() != nil {
+			logger.Log(2, "unable to connect to broker, retrying ...")
+			if time.Now().After(tperiod) {
+				log.Fatal(0, "could not connect to broker, exiting ...", token.Error())
+			}
+		} else {
+			break
+		}
+		time.Sleep(2 * time.Second)
 	}
 	return client
 }
@@ -94,15 +107,32 @@ func MessageQueue(ctx context.Context, network string) {
 		log.Fatal(token.Error())
 	}
 	if cfg.DebugOn {
-		ncutils.Log(fmt.Sprintf("subscribed to node updates for node %s update/%s/%s \n", cfg.Node.Name, cfg.Node.Network, cfg.Node.ID))
+		ncutils.Log(fmt.Sprintf("subscribed to node updates for node %s update/%s/%s", cfg.Node.Name, cfg.Node.Network, cfg.Node.ID))
 	}
 	if token := client.Subscribe(fmt.Sprintf("peers/%s/%s", cfg.Node.Network, cfg.Node.ID), 0, mqtt.MessageHandler(UpdatePeers)); token.Wait() && token.Error() != nil {
 		log.Fatal(token.Error())
 	}
 	if cfg.DebugOn {
-		ncutils.Log(fmt.Sprintf("subscribed to peer updates for node %s peers/%s/%s \n", cfg.Node.Name, cfg.Node.Network, cfg.Node.ID))
+		ncutils.Log(fmt.Sprintf("subscribed to peer updates for node %s peers/%s/%s", cfg.Node.Name, cfg.Node.Network, cfg.Node.ID))
+	}
+	var id string
+	for _, server := range cfg.NetworkSettings.DefaultServerAddrs {
+		if server.IsLeader {
+			id = server.ID
+		}
+		if server.Address != "" {
+			if token := client.Subscribe("serverkeepalive/"+id, 0, mqtt.MessageHandler(ServerKeepAlive)); token.Wait() && token.Error() != nil {
+				log.Fatal(token.Error())
+			}
+			if cfg.DebugOn {
+				ncutils.Log("subscribed to server keepalives for server " + id)
+			}
+		} else {
+			ncutils.Log("leader not defined for network" + cfg.Network)
+		}
 	}
 	defer client.Disconnect(250)
+	go MonitorKeepalive(ctx, client, &cfg)
 	go Checkin(ctx, &cfg, network)
 	<-ctx.Done()
 	ncutils.Log("shutting down daemon")
@@ -236,20 +266,51 @@ func UpdatePeers(client mqtt.Client, msg mqtt.Message) {
 		if shouldReSub {
 			Resubscribe(client, &cfg)
 			cfg.Node.NetworkSettings.DefaultServerAddrs = peerUpdate.ServerAddrs
+			file := ncutils.GetNetclientPathSpecific() + cfg.Node.Interface + ".conf"
+			err = wireguard.UpdateWgPeers(file, peerUpdate.Peers)
+			if err != nil {
+				ncutils.Log("error updating wireguard peers" + err.Error())
+				return
+			}
+			ncutils.Log("applyWGQuickConf to " + file)
+			err = wireguard.ApplyWGQuickConf(file)
+			if err != nil {
+				ncutils.Log("error restarting wg after peer update " + err.Error())
+				return
+			}
 		}
-		file := ncutils.GetNetclientPathSpecific() + cfg.Node.Interface + ".conf"
-		err = wireguard.UpdateWgPeers(file, peerUpdate.Peers)
-		if err != nil {
-			ncutils.Log("error updating wireguard peers" + err.Error())
-			return
+	}()
+}
+
+// MonitorKeepalive - checks time last server keepalive received; if more than 200s, notify and resubscribe.
+// Runs until ctx is cancelled. The leader server ID is resolved once at startup,
+// so if network leadership changes later this goroutine keeps watching the old ID.
+func MonitorKeepalive(ctx context.Context, client mqtt.Client, cfg *config.ClientConfig) {
+	var id string
+	for _, servAddr := range cfg.NetworkSettings.DefaultServerAddrs {
+		if servAddr.IsLeader {
+			id = servAddr.ID
 		}
-		ncutils.Log("applyWGQuickConf to " + file)
-		err = wireguard.ApplyWGQuickConf(file)
-		if err != nil {
-			ncutils.Log("error restarting wg after peer update " + err.Error())
+	}
+	for {
+		select {
+		case <-ctx.Done():
 			return
+		case <-time.After(time.Second * 150):
+			// NOTE(review): this read of the package-level keepalive map can race
+			// with the write in the ServerKeepAlive handler goroutine — guard both
+			// with a mutex and verify with `go test -race`.
+			// NOTE(review): the log line below has a typo ("recieved") and drops
+			// the duration ("more than minutes"); threshold is 200s (~3.3 min),
+			// not the "3+ minutes" the original doc comment claimed.
+			if time.Since(keepalive[id]) > time.Second*200 { // threshold: 200 seconds
+				ncutils.Log("server keepalive not recieved in more than minutes, resubscribe to message queue")
+				Resubscribe(client, cfg)
+			}
+		}
-	}()
+	}
+}
+
+// ServerKeepAlive -- handler to react to keepalive messages published by server.
+// Records the receive time in the package-level keepalive map, keyed by the
+// server node ID parsed from the topic ("serverkeepalive/<id>").
+// NOTE(review): on a getID error this falls through and stamps keepalive[""] —
+// an early return after logging looks intended.
+// NOTE(review): MQTT handlers run on their own goroutines, so this map write
+// can race with the read in MonitorKeepalive; needs a mutex (or sync.Map).
+func ServerKeepAlive(client mqtt.Client, msg mqtt.Message) {
+	serverid, err := getID(msg.Topic())
+	if err != nil {
+		ncutils.Log("invalid ID in serverkeepalive topic")
+	}
+	keepalive[serverid] = time.Now()
+	ncutils.Log("keepalive from server")
 }
 
 // Resubscribe --- handles resubscribing if needed
@@ -267,6 +328,22 @@ func Resubscribe(client mqtt.Client, cfg *config.ClientConfig) error {
 		if token := client.Subscribe("update/peers/"+cfg.Node.ID, 0, UpdatePeers); token.Wait() && token.Error() != nil {
 			log.Fatal(token.Error())
 		}
+		var id string
+		for _, server := range cfg.NetworkSettings.DefaultServerAddrs {
+			if server.IsLeader {
+				id = server.ID
+			}
+			if server.Address != "" {
+				if token := client.Subscribe("serverkeepalive/"+id, 0, mqtt.MessageHandler(ServerKeepAlive)); token.Wait() && token.Error() != nil {
+					log.Fatal(token.Error())
+				}
+				if cfg.DebugOn {
+					ncutils.Log("subscribed to server keepalives for server " + id)
+				}
+			} else {
+				ncutils.Log("leader not defined for network" + cfg.Network)
+			}
+		}
 		ncutils.Log("finished re subbing")
 		return nil
 	} else {
@@ -356,14 +433,14 @@ func PublishNodeUpdate(cfg *config.ClientConfig) {
 		ncutils.Log("error marshling node update " + err.Error())
 	}
 	if err = publish(cfg, fmt.Sprintf("update/%s", cfg.Node.ID), data); err != nil {
-		ncutils.Log(fmt.Sprintf("error publishing endpoint update, %v \n", err))
+		ncutils.Log(fmt.Sprintf("error publishing endpoint update, %v", err))
 	}
 }
 
 // Hello -- ping the broker to let server know node is alive and doing fine
 func Hello(cfg *config.ClientConfig, network string) {
 	if err := publish(cfg, fmt.Sprintf("ping/%s", cfg.Node.ID), []byte("hello world!")); err != nil {
-		ncutils.Log(fmt.Sprintf("error publishing ping, %v \n", err))
+		ncutils.Log(fmt.Sprintf("error publishing ping, %v", err))
 	}
 }
 
@@ -422,3 +499,13 @@ func shouldResub(currentServers, newServers []models.ServerAddr) bool {
 	}
 	return false
 }
+
+// getID extracts the trailing path segment of an MQTT topic, e.g.
+// "serverkeepalive/<id>" -> "<id>". Returns an error only when the topic
+// contains no "/" separator at all.
+func getID(topic string) (string, error) {
+	parts := strings.Split(topic, "/")
+	count := len(parts)
+	if count == 1 {
+		return "", errors.New("invalid topic")
+	}
+	// the last part of the topic is the ID — for the serverkeepalive topic this
+	// is the server node ID published by mq.Keepalive, not a network ID as the
+	// original comment implied
+	return parts[count-1], nil
+}