Browse Source

set retained true on server publish and client ping server on checkin

Matthew R. Kasun 3 years ago
parent
commit
6f0950792f
4 changed files with 73 additions and 34 deletions
  1. 5 1
      go.mod
  2. 7 0
      go.sum
  3. 1 1
      mq/util.go
  4. 60 32
      netclient/functions/daemon.go

+ 5 - 1
go.mod

@@ -31,7 +31,10 @@ require (
 	gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b
 )
 
-require github.com/posthog/posthog-go v0.0.0-20211028072449-93c17c49e2b0
+require (
+	github.com/go-ping/ping v0.0.0-20211130115550-779d1e919534
+	github.com/posthog/posthog-go v0.0.0-20211028072449-93c17c49e2b0
+)
 
 require (
 	cloud.google.com/go v0.34.0 // indirect
@@ -50,5 +53,6 @@ require (
 	github.com/russross/blackfriday/v2 v2.0.1 // indirect
 	github.com/shurcooL/sanitized_anchor_name v1.0.0 // indirect
 	github.com/xtgo/uuid v0.0.0-20140804021211-a0b114877d4c // indirect
+	golang.org/x/sync v0.0.0-20210220032951-036812b2e83c // indirect
 	google.golang.org/appengine v1.4.0 // indirect
 )

+ 7 - 0
go.sum

@@ -36,6 +36,8 @@ github.com/felixge/httpsnoop v1.0.1 h1:lvB5Jl89CsZtGIWuTcDM1E/vkVs49/Ml7JJe07l8S
 github.com/felixge/httpsnoop v1.0.1/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U=
 github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo=
 github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04=
+github.com/go-ping/ping v0.0.0-20211130115550-779d1e919534 h1:dhy9OQKGBh4zVXbjwbxxHjRxMJtLXj3zfgpBYQaR4Q4=
+github.com/go-ping/ping v0.0.0-20211130115550-779d1e919534/go.mod h1:xIFjORFzTxqIV/tDVGO4eDy/bLuSyawEeojSm3GfRGk=
 github.com/go-playground/assert/v2 v2.0.1 h1:MsBgLAaY856+nPRTKrp3/OZK38U/wa0CcBYNjji3q3A=
 github.com/go-playground/assert/v2 v2.0.1/go.mod h1:VDjEfimB/XKnb+ZQfWdccd7VUvScMdVu0Titje2rxJ4=
 github.com/go-playground/locales v0.14.0 h1:u50s323jtVGugKlcYeyzC0etD1HifMjqmJqb8WugfUU=
@@ -72,6 +74,7 @@ github.com/google/go-cmp v0.5.4/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/
 github.com/google/go-cmp v0.5.5 h1:Khx7svrCpmxxtHBq5j2mp/xVjsi8hQMfNLvJFAlrGgU=
 github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
 github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
+github.com/google/uuid v1.2.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
 github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I=
 github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
 github.com/gorilla/handlers v1.5.1 h1:9lRY6j8DEeeBT10CvO9hGW0gmky0BprnvDI5vfhUHH4=
@@ -198,6 +201,7 @@ golang.org/x/net v0.0.0-20201216054612-986b41b23924/go.mod h1:m0MpNAwzfU5UDzcl9v
 golang.org/x/net v0.0.0-20201224014010-6772e930b67b/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg=
 golang.org/x/net v0.0.0-20210119194325-5f4716e94777/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg=
 golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg=
+golang.org/x/net v0.0.0-20210316092652-d523dce5a7f4/go.mod h1:RBQZq4jEuRlivfhVLdyRGr576XBO4/greRjx4P4O3yc=
 golang.org/x/net v0.0.0-20210504132125-bbd867fde50d/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
 golang.org/x/net v0.0.0-20210726213435-c6fcb2dbf985 h1:4CSI6oo7cOjJKajidEljs9h+uP0rRZBPPPhcCbj5mw8=
 golang.org/x/net v0.0.0-20210726213435-c6fcb2dbf985/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
@@ -208,6 +212,8 @@ golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJ
 golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
 golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
 golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
+golang.org/x/sync v0.0.0-20210220032951-036812b2e83c h1:5KslGYwFpkhGh+Q16bwMP3cOontH8FOep7tGV86Y7SQ=
+golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
 golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
 golang.org/x/sys v0.0.0-20181205085412-a5c9d58dba9a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
 golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
@@ -229,6 +235,7 @@ golang.org/x/sys v0.0.0-20210123111255-9b0068b26619/go.mod h1:h1NjWce9XRLGQEsW7w
 golang.org/x/sys v0.0.0-20210124154548-22da62e12c0c/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
 golang.org/x/sys v0.0.0-20210216163648-f7da38b97c65/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
 golang.org/x/sys v0.0.0-20210309040221-94ec62e08169/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
+golang.org/x/sys v0.0.0-20210315160823-c6e025ad8005/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
 golang.org/x/sys v0.0.0-20210403161142-5e06dd20ab57/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
 golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
 golang.org/x/sys v0.0.0-20210503173754-0981d6026fa6/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=

+ 1 - 1
mq/util.go

@@ -50,7 +50,7 @@ func publish(node *models.Node, dest string, msg []byte) error {
 	if encryptErr != nil {
 		return encryptErr
 	}
-	if token := client.Publish(dest, 0, false, encrypted); token.Wait() && token.Error() != nil {
+	if token := client.Publish(dest, 0, true, encrypted); token.Wait() && token.Error() != nil {
 		return token.Error()
 	}
 	return nil

+ 60 - 32
netclient/functions/daemon.go

@@ -3,6 +3,7 @@ package functions
 import (
 	"context"
 	"encoding/json"
+	"errors"
 	"fmt"
 	"os"
 	"os/signal"
@@ -13,6 +14,7 @@ import (
 	"time"
 
 	mqtt "github.com/eclipse/paho.mqtt.golang"
+	"github.com/go-ping/ping"
 	"github.com/gravitl/netmaker/models"
 	"github.com/gravitl/netmaker/netclient/auth"
 	"github.com/gravitl/netmaker/netclient/config"
@@ -83,13 +85,9 @@ func Daemon() error {
 // SetupMQTT creates a connection to broker and return client
 func SetupMQTT(cfg *config.ClientConfig) mqtt.Client {
 	opts := mqtt.NewClientOptions()
-	for _, server := range cfg.Node.NetworkSettings.DefaultServerAddrs {
-		if server.Address != "" && server.IsLeader {
-			// ncutils.Log(fmt.Sprintf("adding server (%s) to listen on network %s", server.Address, cfg.Node.Network))
-			opts.AddBroker(server.Address + ":1883")
-			break
-		}
-	}
+	server := getServerAddress(cfg)
+	opts.AddBroker(server + ":1883")
+
 	opts.SetDefaultPublishHandler(All)
 	client := mqtt.NewClient(opts)
 	tperiod := time.Now().Add(12 * time.Second)
@@ -123,32 +121,32 @@ func MessageQueue(ctx context.Context, network string) {
 	ncutils.Log("netclient go routine started for " + network)
 	var cfg config.ClientConfig
 	cfg.Network = network
-	var configPath = fmt.Sprintf("%snetconfig-%s", ncutils.GetNetclientPathSpecific(), network)
-	fileInfo, err := os.Stat(configPath)
-	if err != nil {
-		ncutils.Log("could not stat config file: " + configPath)
-	}
+	//var configPath = fmt.Sprintf("%snetconfig-%s", ncutils.GetNetclientPathSpecific(), network)
+	//fileInfo, err := os.Stat(configPath)
+	//if err != nil {
+	//	ncutils.Log("could not stat config file: " + configPath)
+	//}
 	// speed up UDP rest
-	if time.Now().After(fileInfo.ModTime().Add(time.Minute)) {
-		sleepTime := 2
-		ncutils.Log("pulling latest config for " + cfg.Network)
-		for {
-			_, err := Pull(network, true)
-			if err == nil {
-				break
-			} else {
-				ncutils.PrintLog("error pulling config for "+network+": "+err.Error(), 1)
-			}
-			if sleepTime > 3600 {
-				sleepTime = 3600
-			}
-			ncutils.Log("failed to pull for network " + network)
-			ncutils.Log(fmt.Sprintf("waiting %d seconds to retry...", sleepTime))
-			time.Sleep(time.Second * time.Duration(sleepTime))
-			sleepTime = sleepTime * 2
-		}
-	}
-	time.Sleep(time.Second << 1)
+	//	if time.Now().After(fileInfo.ModTime().Add(time.Minute)) {
+	//		sleepTime := 2
+	//		ncutils.Log("pulling latest config for " + cfg.Network)
+	//		for {
+	//			_, err := Pull(network, true)
+	//			if err == nil {
+	//				break
+	//			} else {
+	//				ncutils.PrintLog("error pulling config for "+network+": "+err.Error(), 1)
+	//			}
+	//			if sleepTime > 3600 {
+	//				sleepTime = 3600
+	//			}
+	//			ncutils.Log("failed to pull for network " + network)
+	//			ncutils.Log(fmt.Sprintf("waiting %d seconds to retry...", sleepTime))
+	//			time.Sleep(time.Second * time.Duration(sleepTime))
+	//			sleepTime = sleepTime * 2
+	//		}
+	//	}
+	//time.Sleep(time.Second << 1)
 	cfg.ReadConfig()
 	ncutils.Log("daemon started for network: " + network)
 	client := SetupMQTT(&cfg)
@@ -503,6 +501,9 @@ func Checkin(ctx context.Context, cfg *config.ClientConfig, network string) {
 					PublishNodeUpdate(cfg)
 				}
 			}
+			if err := pingServer(cfg); err != nil {
+				ncutils.PrintLog("could not ping server"+err.Error(), 0)
+			}
 			Hello(cfg, network)
 			// ncutils.Log("Checkin complete")
 		}
@@ -592,3 +593,30 @@ func setDNS(iface, network, address string) {
 		ncutils.Log("error applying dns" + err.Error())
 	}
 }
+
+func pingServer(cfg *config.ClientConfig) error {
+	node := getServerAddress(cfg)
+	pinger, err := ping.NewPinger(node)
+	if err != nil {
+		ncutils.Log("error creating pinger " + err.Error())
+		return err
+	}
+	pinger.Timeout = 2 * time.Second
+	pinger.Run()
+	stats := pinger.Statistics()
+	if stats.PacketLoss != 100 {
+		ncutils.PrintLog(fmt.Sprintf("lost packets when pinging server: packets sent:%d packets recieved: %d", stats.PacketsSent, stats.PacketsRecv), 1)
+		return errors.New("ping error")
+	}
+	return nil
+}
+
+func getServerAddress(cfg *config.ClientConfig) string {
+	var server models.ServerAddr
+	for _, server = range cfg.Node.NetworkSettings.DefaultServerAddrs {
+		if server.Address != "" && server.IsLeader {
+			break
+		}
+	}
+	return server.Address
+}