Browse Source

work in progress for message queue

Matthew R Kasun 3 years ago
parent
commit
cb51a6be78
4 changed files with 153 additions and 254 deletions
  1. 3 0
      go.mod
  2. 6 0
      go.sum
  3. 5 0
      netclient/command/commands.go
  4. 139 254
      netclient/functions/daemon.go

+ 3 - 0
go.mod

@@ -34,7 +34,9 @@ require (
 	cloud.google.com/go v0.34.0 // indirect
 	github.com/cpuguy83/go-md2man/v2 v2.0.0-20190314233015-f79a8a8ca69d // indirect
 	github.com/davecgh/go-spew v1.1.1 // indirect
+	github.com/eclipse/paho.mqtt.golang v1.3.5 // indirect
 	github.com/felixge/httpsnoop v1.0.1 // indirect
+	github.com/go-ping/ping v0.0.0-20211130115550-779d1e919534 // indirect
 	github.com/go-playground/locales v0.14.0 // indirect
 	github.com/go-playground/universal-translator v0.18.0 // indirect
 	github.com/google/go-cmp v0.5.5 // indirect
@@ -46,5 +48,6 @@ require (
 	github.com/pmezard/go-difflib v1.0.0 // indirect
 	github.com/russross/blackfriday/v2 v2.0.1 // indirect
 	github.com/shurcooL/sanitized_anchor_name v1.0.0 // indirect
+	golang.org/x/sync v0.0.0-20210220032951-036812b2e83c // indirect
 	google.golang.org/appengine v1.4.0 // indirect
 )

+ 6 - 0
go.sum

@@ -36,6 +36,8 @@ github.com/felixge/httpsnoop v1.0.1 h1:lvB5Jl89CsZtGIWuTcDM1E/vkVs49/Ml7JJe07l8S
 github.com/felixge/httpsnoop v1.0.1/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U=
 github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo=
 github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04=
+github.com/go-ping/ping v0.0.0-20211130115550-779d1e919534 h1:dhy9OQKGBh4zVXbjwbxxHjRxMJtLXj3zfgpBYQaR4Q4=
+github.com/go-ping/ping v0.0.0-20211130115550-779d1e919534/go.mod h1:xIFjORFzTxqIV/tDVGO4eDy/bLuSyawEeojSm3GfRGk=
 github.com/go-playground/assert/v2 v2.0.1 h1:MsBgLAaY856+nPRTKrp3/OZK38U/wa0CcBYNjji3q3A=
 github.com/go-playground/assert/v2 v2.0.1/go.mod h1:VDjEfimB/XKnb+ZQfWdccd7VUvScMdVu0Titje2rxJ4=
 github.com/go-playground/locales v0.14.0 h1:u50s323jtVGugKlcYeyzC0etD1HifMjqmJqb8WugfUU=
@@ -192,6 +194,7 @@ golang.org/x/net v0.0.0-20201216054612-986b41b23924/go.mod h1:m0MpNAwzfU5UDzcl9v
 golang.org/x/net v0.0.0-20201224014010-6772e930b67b/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg=
 golang.org/x/net v0.0.0-20210119194325-5f4716e94777/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg=
 golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg=
+golang.org/x/net v0.0.0-20210316092652-d523dce5a7f4/go.mod h1:RBQZq4jEuRlivfhVLdyRGr576XBO4/greRjx4P4O3yc=
 golang.org/x/net v0.0.0-20210504132125-bbd867fde50d/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
 golang.org/x/net v0.0.0-20210726213435-c6fcb2dbf985 h1:4CSI6oo7cOjJKajidEljs9h+uP0rRZBPPPhcCbj5mw8=
 golang.org/x/net v0.0.0-20210726213435-c6fcb2dbf985/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
@@ -202,6 +205,8 @@ golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJ
 golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
 golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
 golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
+golang.org/x/sync v0.0.0-20210220032951-036812b2e83c h1:5KslGYwFpkhGh+Q16bwMP3cOontH8FOep7tGV86Y7SQ=
+golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
 golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
 golang.org/x/sys v0.0.0-20181205085412-a5c9d58dba9a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
 golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
@@ -223,6 +228,7 @@ golang.org/x/sys v0.0.0-20210123111255-9b0068b26619/go.mod h1:h1NjWce9XRLGQEsW7w
 golang.org/x/sys v0.0.0-20210124154548-22da62e12c0c/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
 golang.org/x/sys v0.0.0-20210216163648-f7da38b97c65/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
 golang.org/x/sys v0.0.0-20210309040221-94ec62e08169/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
+golang.org/x/sys v0.0.0-20210315160823-c6e025ad8005/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
 golang.org/x/sys v0.0.0-20210403161142-5e06dd20ab57/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
 golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
 golang.org/x/sys v0.0.0-20210503173754-0981d6026fa6/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=

+ 5 - 0
netclient/command/commands.go

@@ -215,3 +215,8 @@ func Daemon() error {
 	err := functions.Daemon()
 	return err
 }
+
+func Daemon() error {
+	err := functions.Daemon()
+	return err
+}

+ 139 - 254
netclient/functions/daemon.go

@@ -1,329 +1,214 @@
 package functions
 
 import (
-	"context"
 	"encoding/json"
+	"fmt"
 	"log"
-	"os"
-	"os/signal"
-	"runtime"
-	"strings"
-	"syscall"
 	"time"
 
 	mqtt "github.com/eclipse/paho.mqtt.golang"
-	"github.com/gravitl/netmaker/models"
+	"github.com/go-ping/ping"
 	"github.com/gravitl/netmaker/netclient/config"
 	"github.com/gravitl/netmaker/netclient/ncutils"
-	"github.com/gravitl/netmaker/netclient/wireguard"
 	"golang.zx2c4.com/wireguard/wgctrl"
-	"golang.zx2c4.com/wireguard/wgctrl/wgtypes"
 )
 
-// Daemon runs netclient daemon from command line
 func Daemon() error {
-	ctx, cancel := context.WithCancel(context.Background())
 	networks, err := ncutils.GetSystemNetworks()
 	if err != nil {
-		cancel()
 		return err
 	}
 	for _, network := range networks {
-		go Netclient(ctx, network)
+		go Netclient(network)
+	}
+	for {
 	}
-	quit := make(chan os.Signal, 1)
-	signal.Notify(quit, syscall.SIGTERM, os.Interrupt)
-	<-quit
-	cancel()
-	ncutils.Log("all done")
 	return nil
 }
 
-// SetupMQTT creates a connection to broker and return client
-func SetupMQTT(cfg config.ClientConfig) mqtt.Client {
+func Netclient(network string) {
+	var cfg config.ClientConfig
+	cfg.Network = network
+	cfg.ReadConfig()
+	ncutils.Log("daemon started for network:" + network)
+	//setup MQTT
 	opts := mqtt.NewClientOptions()
 	ncutils.Log("setting broker to " + cfg.Server.CoreDNSAddr + ":1883")
 	opts.AddBroker(cfg.Server.CoreDNSAddr + ":1883")
 	opts.SetDefaultPublishHandler(All)
+	opts.SetClientID("netclient-mqtt")
 	client := mqtt.NewClient(opts)
 	if token := client.Connect(); token.Wait() && token.Error() != nil {
 		log.Fatal(token.Error())
 	}
-	return client
-}
-
-// Netclient sets up Message Queue and subsribes/publishes updates to/from server
-func Netclient(ctx context.Context, network string) {
-	ncutils.Log("netclient go routine started for " + network)
-	var cfg config.ClientConfig
-	cfg.Network = network
-	cfg.ReadConfig()
-	//fix NodeID to remove ### so NodeID can be used as message topic
-	//remove with GRA-73
-	cfg.Node.ID = strings.Replace(cfg.Node.ID, "###", "-", 1)
-	ncutils.Log("daemon started for network:" + network)
-	client := SetupMQTT(cfg)
 	if token := client.Subscribe("#", 0, nil); token.Wait() && token.Error() != nil {
 		log.Fatal(token.Error())
 	}
-	client.AddRoute("update/"+cfg.Node.ID, NodeUpdate)
-	client.AddRoute("update/peers/"+cfg.Node.ID, UpdatePeers)
-	//handle key updates in node update
-	//client.AddRoute("update/keys/"+cfg.Node.ID, UpdateKeys)
+	client.AddRoute("update/"+network+"/"+cfg.Node.MacAddress, NodeUpdate)
+	client.AddRoute("update/"+network+"/peers", UpdatePeers)
+	client.AddRoute("update/"+network+"/keys", UpdateKeys)
+	client.AddRoute("update/"+network+"/keys/"+cfg.Node.MacAddress, UpdateKeys)
 	defer client.Disconnect(250)
-	go Checkin(ctx, cfg, network)
-	go Metrics(ctx, cfg, network)
-	<-ctx.Done()
-	ncutils.Log("shutting down daemon")
+	go Checkin(client, network)
+	//go Metrics(client, network)
+	//go Connectivity(client, network)
+	for {
+	}
 }
 
-// All -- mqtt message hander for all ('#') topics
 var All mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Message) {
 	ncutils.Log("Topic: " + string(msg.Topic()))
 	ncutils.Log("Message: " + string(msg.Payload()))
 }
 
-// NodeUpdate -- mqtt message handler for /update/<NodeID> topic
 var NodeUpdate mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Message) {
 	ncutils.Log("received message to update node " + string(msg.Payload()))
-	//potentiall blocking i/o so do this in a go routine
-	go func() {
-		var newNode models.Node
-		var cfg config.ClientConfig
-		cfg.Network = newNode.Network
-		cfg.ReadConfig()
-		err := json.Unmarshal(msg.Payload(), &newNode)
-		if err != nil {
-			ncutils.Log("error unmarshalling node update data" + err.Error())
-			return
-		}
-		//check if interface name has changed if so delete.
-		if cfg.Node.Interface != newNode.Interface {
-			if err = wireguard.RemoveConf(cfg.Node.Interface, true); err != nil {
-				ncutils.PrintLog("could not delete old interface "+cfg.Node.Interface+": "+err.Error(), 1)
-			}
-		}
-		newNode.PullChanges = "no"
-		//ensure that OS never changes
-		newNode.OS = runtime.GOOS
-		cfg.Node = newNode
-		switch newNode.Action {
-		case models.NODE_DELETE:
-			if err := RemoveLocalInstance(&cfg, cfg.Network); err != nil {
-				ncutils.PrintLog("error deleting local instance: "+err.Error(), 1)
-				return
-			}
-		case models.NODE_UPDATE_KEY:
-			UpdateKeys(&cfg, client)
-		case models.NODE_NOOP:
-		default:
-		}
-		//Save new config
-		if err := config.Write(&cfg, cfg.Network); err != nil {
-			ncutils.PrintLog("error updating node configuration: "+err.Error(), 1)
-		}
-		nameserver := cfg.Server.CoreDNSAddr
-		privateKey, err := wireguard.RetrievePrivKey(newNode.Network)
-		if err != nil {
-			ncutils.Log("error reading PrivateKey " + err.Error())
-			return
-		}
-		if err := wireguard.UpdateWgInterface(cfg.Node.Interface, privateKey, nameserver, newNode); err != nil {
-			ncutils.Log("error updating wireguard config " + err.Error())
-			return
-		}
-		// path hardcoded for now... should be updated
-		err = wireguard.ApplyWGQuickConf("/etc/netclient/config/" + cfg.Node.Interface + ".conf")
-		if err != nil {
-			ncutils.Log("error restarting wg after node update " + err.Error())
-			return
-		}
-	}()
 }
 
-// UpdatePeers -- mqtt message handler for /update/peers/<NodeID> topic
 var UpdatePeers mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Message) {
 	ncutils.Log("received message to update peers " + string(msg.Payload()))
-	go func() {
-		var peerUpdate models.PeerUpdate
-		err := json.Unmarshal(msg.Payload(), &peerUpdate)
-		if err != nil {
-			ncutils.Log("error unmarshalling peer data")
-			return
-		}
-		var cfg config.ClientConfig
-		cfg.Network = peerUpdate.Network
-		cfg.ReadConfig()
-		peers, err := CalculatePeers(cfg.Node, peerUpdate.Nodes, cfg.Node.IsDualStack, cfg.Node.IsEgressGateway, cfg.Node.IsServer)
-		if err != nil {
-			ncutils.Log("error calculating Peers " + err.Error())
-			return
-		}
-		extpeers, err := CalculateExtPeers(cfg.Node, peerUpdate.ExtPeers)
-		if err != nil {
-			ncutils.Log("error updated external wireguard peers " + err.Error())
-		}
-		peers = append(peers, extpeers...)
-		err = wireguard.UpdateWgPeers(cfg.Node.Interface, peers)
-		if err != nil {
-			ncutils.Log("error updating wireguard peers" + err.Error())
-			return
-		}
-		// path hardcoded for now... should be updated
-		err = wireguard.ApplyWGQuickConf("/etc/netclient/config/" + cfg.Node.Interface + ".conf")
-		if err != nil {
-			ncutils.Log("error restarting wg after peer update " + err.Error())
-			return
-		}
-	}()
 }
 
-// UpdateKeys -- updates private key and returns new publickey
-func UpdateKeys(cfg *config.ClientConfig, client mqtt.Client) (*config.ClientConfig, error) {
-	ncutils.Log("received message to update keys")
-	//potentiall blocking i/o so do this in a go routine
-	key, err := wgtypes.GeneratePrivateKey()
-	if err != nil {
-		ncutils.Log("error generating privatekey " + err.Error())
-		return cfg, err
-	}
-	if err := wireguard.UpdatePrivateKey(cfg.Node.Interface, key.String()); err != nil {
-		ncutils.Log("error updating wireguard key " + err.Error())
-		return cfg, err
-	}
-	publicKey := key.PublicKey()
-	if token := client.Publish("update/publickey/"+cfg.Node.ID, 0, false, publicKey.String()); token.Wait() && token.Error() != nil {
-		ncutils.Log("error publishing publickey update " + token.Error().Error())
-		client.Disconnect(250)
-		return cfg, err
-	}
-	if err := config.ModConfig(&cfg.Node); err != nil {
-		ncutils.Log("error updating local config " + err.Error())
-	}
-	return cfg, nil
+var UpdateKeys mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Message) {
+	ncutils.Log("received message to update keys " + string(msg.Payload()))
 }
 
-// Checkin  -- go routine that checks for public or local ip changes, publishes changes
-//   if there are no updates, simply "pings" the server as a checkin
-func Checkin(ctx context.Context, cfg config.ClientConfig, network string) {
+func Checkin(client mqtt.Client, network string) {
+	var cfg config.ClientConfig
+	cfg.Network = network
+	cfg.ReadConfig()
 	for {
-		select {
-		case <-ctx.Done():
-			ncutils.Log("Checkin cancelled")
-			return
-			//delay should be configuraable -> use cfg.Node.NetworkSettings.DefaultCheckInInterval ??
-		case <-time.After(time.Second * 10):
-			ncutils.Log("Checkin running")
-			//read latest config
-			cfg.ReadConfig()
-			//fix NodeID to remove ### so NodeID can be used as message topic
-			//remove with GRA-73
-			cfg.Node.ID = strings.Replace(cfg.Node.ID, "###", "-", 1)
-			if cfg.Node.Roaming == "yes" && cfg.Node.IsStatic != "yes" {
-				extIP, err := ncutils.GetPublicIP()
-				if err != nil {
-					ncutils.PrintLog("error encountered checking ip addresses: "+err.Error(), 1)
-				}
-				if cfg.Node.Endpoint != extIP && extIP != "" {
-					ncutils.PrintLog("endpoint has changed from "+cfg.Node.Endpoint+" to "+extIP, 1)
-					UpdateEndpoint(cfg, network, extIP)
-				}
-				intIP, err := getPrivateAddr()
-				if err != nil {
-					ncutils.PrintLog("error encountered checking ip addresses: "+err.Error(), 1)
-				}
-				if cfg.Node.LocalAddress != intIP && intIP != "" {
-					ncutils.PrintLog("local Address has changed from "+cfg.Node.LocalAddress+" to "+intIP, 1)
-					UpdateLocalAddress(cfg, network, intIP)
-				}
-			} else {
-				localIP, err := ncutils.GetLocalIP(cfg.Node.LocalRange)
-				if err != nil {
-					ncutils.PrintLog("error encountered checking ip addresses: "+err.Error(), 1)
-				}
-				if cfg.Node.Endpoint != localIP && localIP != "" {
-					ncutils.PrintLog("endpoint has changed from "+cfg.Node.Endpoint+" to "+localIP, 1)
-					UpdateEndpoint(cfg, network, localIP)
-				}
+		time.Sleep(time.Duration(cfg.Node.NetworkSettings.DefaultCheckInInterval) * time.Second)
+		ncutils.Log("Checkin running")
+		if cfg.Node.Roaming == "yes" && cfg.Node.IsStatic != "yes" {
+			extIP, err := ncutils.GetPublicIP()
+			if err != nil {
+				ncutils.PrintLog("error encountered checking ip addresses: "+err.Error(), 1)
+			}
+			if cfg.Node.Endpoint != extIP && extIP != "" {
+				ncutils.PrintLog("endpoint has changed from "+cfg.Node.Endpoint+" to "+extIP, 1)
+				UpdateEndpoint(client, network, extIP)
+			}
+			intIP, err := getPrivateAddr()
+			if err != nil {
+				ncutils.PrintLog("error encountered checking ip addresses: "+err.Error(), 1)
+			}
+			if cfg.Node.LocalAddress != intIP && intIP != "" {
+				ncutils.PrintLog("local Address has changed from "+cfg.Node.LocalAddress+" to "+intIP, 1)
+				UpdateLocalAddress(client, network, intIP)
+			}
+		} else {
+			localIP, err := ncutils.GetLocalIP(cfg.Node.LocalRange)
+			if err != nil {
+				ncutils.PrintLog("error encountered checking ip addresses: "+err.Error(), 1)
+			}
+			if cfg.Node.Endpoint != localIP && localIP != "" {
+				ncutils.PrintLog("endpoint has changed from "+cfg.Node.Endpoint+" to "+localIP, 1)
+				UpdateEndpoint(client, network, localIP)
 			}
-			Hello(cfg, network)
-			ncutils.Log("Checkin complete")
 		}
+		Ping(client, network)
 	}
 }
 
-// UpdateEndpoint -- publishes an endpoint update to broker
-func UpdateEndpoint(cfg config.ClientConfig, network, ip string) {
-	ncutils.Log("Updating endpoint")
-	client := SetupMQTT(cfg)
-	if token := client.Publish("update/ip/"+cfg.Node.ID, 0, false, ip); token.Wait() && token.Error() != nil {
-		ncutils.Log("error publishing endpoint update " + token.Error().Error())
-	}
-	cfg.Node.Endpoint = ip
-	if err := config.Write(&cfg, cfg.Network); err != nil {
-		ncutils.Log("error updating local config " + err.Error())
+func Ping(client mqtt.Client, network string) {
+	var cfg config.ClientConfig
+	cfg.Network = network
+	cfg.ReadConfig()
+	if token := client.Publish("ping/"+network+"/"+cfg.Node.ID, 0, false, []byte("ping")); token.Wait() && token.Error() != nil {
+		ncutils.Log("error publishing ping " + token.Error().Error())
 	}
-	client.Disconnect(250)
 }
 
-// UpdateLocalAddress -- publishes a local address update to broker
-func UpdateLocalAddress(cfg config.ClientConfig, network, ip string) {
-	ncutils.Log("Updating local address")
-	client := SetupMQTT(cfg)
-	if token := client.Publish("update/localaddress/"+cfg.Node.ID, 0, false, ip); token.Wait() && token.Error() != nil {
-		ncutils.Log("error publishing local address update " + token.Error().Error())
+func Metrics(client mqtt.Client, network string) {
+	if token := client.Connect(); token.Wait() && token.Error() != nil {
+		log.Fatal(token.Error())
 	}
-	cfg.Node.LocalAddress = ip
-	ncutils.Log("updating local address in local config to: " + cfg.Node.LocalAddress)
-	if err := config.Write(&cfg, cfg.Network); err != nil {
-		ncutils.Log("error updating local config " + err.Error())
+	var cfg config.ClientConfig
+	cfg.Network = network
+	cfg.ReadConfig()
+	for {
+		time.Sleep(time.Second * 60)
+		ncutils.Log("Metrics running")
+		wg, err := wgctrl.New()
+		if err != nil {
+			ncutils.Log("error getting devices " + err.Error())
+			break
+		}
+		device, err := wg.Device(cfg.Node.Interface)
+		if err != nil {
+			ncutils.Log("error readind wg device " + err.Error())
+			break
+		}
+		bytes, err := json.Marshal(device.Peers)
+		if err != nil {
+			ncutils.Log("error marshaling peers " + err.Error())
+			break
+		}
+		if token := client.Publish("metrics/"+network+"/"+cfg.Node.ID, 1, false, bytes); token.Wait() && token.Error() != nil {
+			ncutils.Log("error publishing metrics " + token.Error().Error())
+			break
+		}
+		wg.Close()
 	}
-	client.Disconnect(250)
 }
 
-// Hello -- ping the broker to let server know node is alive and doing fine
-func Hello(cfg config.ClientConfig, network string) {
-	client := SetupMQTT(cfg)
-	ncutils.Log("sending ping " + cfg.Node.ID)
-	if token := client.Publish("ping/"+cfg.Node.ID, 2, false, "hello world!"); token.Wait() && token.Error() != nil {
-		ncutils.Log("error publishing ping " + token.Error().Error())
-	}
-	client.Disconnect(250)
+type PingStat struct {
+	Name      string
+	Reachable bool
 }
 
-// Metics --  go routine that collects wireguard metrics and publishes to broker
-func Metrics(ctx context.Context, cfg config.ClientConfig, network string) {
+func Connectivity(client mqtt.Client, network string) {
+	if token := client.Connect(); token.Wait() && token.Error() != nil {
+		log.Fatal(token.Error())
+	}
+	var cfg config.ClientConfig
+	cfg.Network = network
+	cfg.ReadConfig()
 	for {
-		select {
-		case <-ctx.Done():
-			ncutils.Log("Metrics collection cancelled")
-			return
-			//delay should be configuraable -> use cfg.Node.NetworkSettings.DefaultCheckInInterval ??
-		case <-time.After(time.Second * 60):
-			ncutils.Log("Metrics collection running")
-			ncutils.Log("Metrics running")
-			wg, err := wgctrl.New()
-			if err != nil {
-				ncutils.Log("error getting devices " + err.Error())
-				break
-			}
-			device, err := wg.Device(cfg.Node.Interface)
-			if err != nil {
-				ncutils.Log("error readind wg device " + err.Error())
-				break
-			}
-			bytes, err := json.Marshal(device.Peers)
+		time.Sleep(time.Duration(cfg.NetworkSettings.DefaultCheckInInterval) * time.Second)
+		ncutils.Log("Connectivity running")
+		var pingStats []PingStat
+		peers, err := ncutils.GetPeers(cfg.Node.Interface)
+		if err != nil {
+			ncutils.Log("error retriving peers " + err.Error())
+			break
+		}
+		for _, peer := range peers {
+			var pingStat PingStat
+			pingStat.Name = peer.PublicKey.String()
+			pingStat.Reachable = true
+			ip := peer.Endpoint.IP.String()
+			fmt.Println("----------", peer.Endpoint.IP, ip)
+			pinger, err := ping.NewPinger(ip)
 			if err != nil {
-				ncutils.Log("error marshaling peers " + err.Error())
+				ncutils.Log("error creating pinger " + err.Error())
 				break
 			}
-			client := SetupMQTT(cfg)
-			if token := client.Publish("metrics/"+cfg.Node.ID, 1, false, bytes); token.Wait() && token.Error() != nil {
-				ncutils.Log("error publishing metrics " + token.Error().Error())
+			pinger.Timeout = 2 * time.Second
+			pinger.Run()
+			stats := pinger.Statistics()
+			if stats.PacketLoss == 100 {
+				pingStat.Reachable = false
 			}
-			wg.Close()
-			client.Disconnect(250)
-			ncutils.Log("metrics collection complete")
+			pingStats = append(pingStats, pingStat)
+		}
+		bytes, err := json.Marshal(pingStats)
+		if err != nil {
+			ncutils.Log("error marshaling stats" + err.Error())
+			break
+		}
+		if token := client.Publish("connectivity/"+network+"/"+cfg.Node.ID, 1, false, bytes); token.Wait() && token.Error() != nil {
+			ncutils.Log("error publishing ping stats " + token.Error().Error())
+			break
 		}
 	}
 }
+
+func UpdateEndpoint(client mqtt.Client, network, ip string) {
+	ncutils.Log("Updating endpoint")
+}
+
+func UpdateLocalAddress(client mqtt.Client, network, ip string) {
+	ncutils.Log("Updating local address")
+}