Răsfoiți Sursa

NET-259: metrics refactor, removed unused peer update components (#2393)

* remove proxy fields

* remove proxy fields

* remove tests related to proxy port

* rm unused func

* remove client metric funcs from server

* add peer listen port to idaddr model

* new func to get node peers info

* rm unused fields

* rm unused peer update funcs

* func comment

* handle ext client metrics

* comment unused error
Abhishek K 2 ani în urmă
părinte
comite
d01d5fbaf8
12 a modificat fișierele cu 213 adăugiri și 457 ștergeri
  1. 6 7
      controllers/hosts.go
  2. 6 5
      controllers/node.go
  3. 0 6
      go.mod
  4. 0 8
      go.sum
  5. 0 90
      logic/metrics/metrics.go
  6. 166 172
      logic/peers.go
  7. 0 83
      metrics/metrics.go
  8. 15 15
      models/metrics.go
  9. 7 6
      models/mqtt.go
  10. 0 1
      models/structs.go
  11. 9 13
      mq/handlers.go
  12. 4 51
      mq/publishers.go

+ 6 - 7
controllers/hosts.go

@@ -111,13 +111,12 @@ func pull(w http.ResponseWriter, r *http.Request) {
 		return
 	}
 	peers := logic.GetPeerUpdate(host)
-
-	//hPU, err := logic.GetPeerUpdateForHost(context.Background(), "", host, nil, nil)
-	//if err != nil {
-	//logger.Log(0, "could not pull peers for host", hostID)
-	//logic.ReturnErrorResponse(w, r, logic.FormatError(err, "internal"))
-	//return
-	//}
+	// hPU, err := logic.GetPeerUpdateForHost(host)
+	// if err != nil {
+	// 	logger.Log(0, "could not pull peers for host", hostID)
+	// 	logic.ReturnErrorResponse(w, r, logic.FormatError(err, "internal"))
+	// 	return
+	// }
 	serverConf := servercfg.GetServerInfo()
 	if servercfg.GetBrokerType() == servercfg.EmqxBrokerType {
 		serverConf.MQUserName = hostID

+ 6 - 5
controllers/node.go

@@ -1,7 +1,6 @@
 package controller
 
 import (
-	"context"
 	"encoding/json"
 	"fmt"
 	"net/http"
@@ -388,7 +387,10 @@ func getNode(w http.ResponseWriter, r *http.Request) {
 		logic.ReturnErrorResponse(w, r, logic.FormatError(err, "internal"))
 		return
 	}
-	hostPeerUpdate, err := logic.GetPeerUpdateForHost(context.Background(), node.Network, host, nil, nil)
+	nodePeerUpdate, err := logic.NodePeersInfo(&models.Client{
+		Host: *host,
+		Node: node,
+	})
 	if err != nil && !database.IsEmptyRecord(err) {
 		logger.Log(0, r.Header.Get("user"),
 			fmt.Sprintf("error fetching wg peers config for host [ %s ]: %v", host.ID.String(), err))
@@ -403,10 +405,9 @@ func getNode(w http.ResponseWriter, r *http.Request) {
 	response := models.NodeGet{
 		Node:         node,
 		Host:         *host,
-		HostPeers:    hostPeerUpdate.Peers,
-		Peers:        hostPeerUpdate.NodePeers,
+		Peers:        nodePeerUpdate.Peers,
 		ServerConfig: server,
-		PeerIDs:      hostPeerUpdate.PeerIDs,
+		PeerIDs:      nodePeerUpdate.PeerIDs,
 	}
 
 	if servercfg.Is_EE && nodeRequest {

+ 0 - 6
go.mod

@@ -20,7 +20,6 @@ require (
 	golang.org/x/oauth2 v0.8.0
 	golang.org/x/sys v0.8.0 // indirect
 	golang.org/x/text v0.9.0 // indirect
-	golang.zx2c4.com/wireguard v0.0.0-20220920152132-bb719d3a6e2c // indirect
 	golang.zx2c4.com/wireguard/wgctrl v0.0.0-20220324164955-056925b7df31
 	google.golang.org/protobuf v1.28.1 // indirect
 	gopkg.in/yaml.v3 v3.0.1
@@ -64,14 +63,9 @@ require (
 	github.com/go-playground/locales v0.14.1 // indirect
 	github.com/go-playground/universal-translator v0.18.1 // indirect
 	github.com/golang/protobuf v1.5.2 // indirect
-	github.com/google/go-cmp v0.5.9 // indirect
 	github.com/hashicorp/go-version v1.6.0
-	github.com/josharian/native v1.0.0 // indirect
 	github.com/leodido/go-urn v1.2.4 // indirect
 	github.com/mattn/go-runewidth v0.0.13 // indirect
-	github.com/mdlayher/genetlink v1.2.0 // indirect
-	github.com/mdlayher/netlink v1.6.0 // indirect
-	github.com/mdlayher/socket v0.1.1 // indirect
 	github.com/pmezard/go-difflib v1.0.0 // indirect
 	github.com/xtgo/uuid v0.0.0-20140804021211-a0b114877d4c // indirect
 	golang.org/x/sync v0.1.0 // indirect

+ 0 - 8
go.sum

@@ -43,7 +43,6 @@ github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/
 github.com/google/go-cmp v0.5.6/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
 github.com/google/go-cmp v0.5.7/go.mod h1:n+brtR0CgQNWTVd5ZUFpTBC8YFBDLK/h/bpaJ8/DtOE=
 github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38=
-github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
 github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I=
 github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
 github.com/gorilla/handlers v1.5.1 h1:9lRY6j8DEeeBT10CvO9hGW0gmky0BprnvDI5vfhUHH4=
@@ -59,7 +58,6 @@ github.com/hashicorp/go-version v1.6.0 h1:feTTfFNnjP967rlCxM/I9g701jU+RN74YKx2mO
 github.com/hashicorp/go-version v1.6.0/go.mod h1:fltr4n8CU8Ke44wwGCBoEymUuxUHl09ZGVZPK5anwXA=
 github.com/inconshreveable/mousetrap v1.1.0 h1:wN+x4NVGpMsO7ErUn/mUI3vEoE6Jt13X2s0bqwp9tc8=
 github.com/inconshreveable/mousetrap v1.1.0/go.mod h1:vpF70FUmC8bwa3OWnCshd2FqLfsEA9PFc4w1p2J65bw=
-github.com/josharian/native v1.0.0 h1:Ts/E8zCSEsG17dUqv7joXJFybuMLjQfWE04tsBODTxk=
 github.com/josharian/native v1.0.0/go.mod h1:7X/raswPFr05uY3HiLlYeyQntB6OO7E/d2Cu7qoaN2w=
 github.com/leodido/go-urn v1.2.4 h1:XlAE/cm/ms7TE/VMVoduSpNBoyc2dOxHs5MZSwAN63Q=
 github.com/leodido/go-urn v1.2.4/go.mod h1:7ZrI8mTSeBSHl/UaRyKQW1qZeMgak41ANeCNaVckg+4=
@@ -73,13 +71,9 @@ github.com/mattn/go-runewidth v0.0.13 h1:lTGmDsbAYt5DmK6OnoV7EuIF1wEIFAcxld6ypU4
 github.com/mattn/go-runewidth v0.0.13/go.mod h1:Jdepj2loyihRzMpdS35Xk/zdY8IAYHsh153qUoGf23w=
 github.com/mattn/go-sqlite3 v1.14.16 h1:yOQRA0RpS5PFz/oikGwBEqvAWhWg5ufRz4ETLjwpU1Y=
 github.com/mattn/go-sqlite3 v1.14.16/go.mod h1:2eHXhiwb8IkHr+BDWZGa96P6+rkvnG63S2DGjv9HUNg=
-github.com/mdlayher/genetlink v1.2.0 h1:4yrIkRV5Wfk1WfpWTcoOlGmsWgQj3OtQN9ZsbrE+XtU=
 github.com/mdlayher/genetlink v1.2.0/go.mod h1:ra5LDov2KrUCZJiAtEvXXZBxGMInICMXIwshlJ+qRxQ=
-github.com/mdlayher/netlink v1.6.0 h1:rOHX5yl7qnlpiVkFWoqccueppMtXzeziFjWAjLg6sz0=
 github.com/mdlayher/netlink v1.6.0/go.mod h1:0o3PlBmGst1xve7wQ7j/hwpNaFaH4qCRyWCdcZk8/vA=
-github.com/mdlayher/socket v0.1.1 h1:q3uOGirUPfAV2MUoaC7BavjQ154J7+JOkTWyiV+intI=
 github.com/mdlayher/socket v0.1.1/go.mod h1:mYV5YIZAfHh4dzDVzI8x8tWLWCliuX8Mon5Awbj+qDs=
-github.com/mikioh/ipaddr v0.0.0-20190404000644-d465c8ab6721 h1:RlZweED6sbSArvlE924+mUcZuXKLBHA35U7LN621Bws=
 github.com/mikioh/ipaddr v0.0.0-20190404000644-d465c8ab6721/go.mod h1:Ickgr2WtCLZ2MDGd4Gr0geeCH5HybhRJbonOgQpvSxc=
 github.com/olekukonko/tablewriter v0.0.5 h1:P2Ga83D34wi1o9J6Wh1mRuqd4mF/x/lgBS7N7AbDhec=
 github.com/olekukonko/tablewriter v0.0.5/go.mod h1:hPp6KlRPjbx+hW8ykQs1w3UBbZlj6HuIJcUGPhkA7kY=
@@ -169,8 +163,6 @@ golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8T
 golang.zx2c4.com/go118/netip v0.0.0-20211111135330-a4a02eeacf9d/go.mod h1:5yyfuiqVIJ7t+3MqrpTQ+QqRkMWiESiyDvPNvKYCecg=
 golang.zx2c4.com/wintun v0.0.0-20211104114900-415007cec224/go.mod h1:deeaetjYA+DHMHg+sMSMI58GrEteJUUzzw7en6TJQcI=
 golang.zx2c4.com/wireguard v0.0.0-20220202223031-3b95c81cc178/go.mod h1:TjUWrnD5ATh7bFvmm/ALEJZQ4ivKbETb6pmyj1vUoNI=
-golang.zx2c4.com/wireguard v0.0.0-20220920152132-bb719d3a6e2c h1:Okh6a1xpnJslG9Mn84pId1Mn+Q8cvpo4HCeeFWHo0cA=
-golang.zx2c4.com/wireguard v0.0.0-20220920152132-bb719d3a6e2c/go.mod h1:enML0deDxY1ux+B6ANGiwtg0yAJi1rctkTpcHNAVPyg=
 golang.zx2c4.com/wireguard/wgctrl v0.0.0-20220324164955-056925b7df31 h1:AgW3hljgTzuRbCB0j+q9tXT0uy6ij7vMjEzSCeMlQY0=
 golang.zx2c4.com/wireguard/wgctrl v0.0.0-20220324164955-056925b7df31/go.mod h1:8P32Ilp1kCpwB4ItaHyvSk4xAtnpQ+8gQVfg5WaO1TU=
 google.golang.org/appengine v1.6.7 h1:FZR1q0exgwxzPzp/aF+VccGrSfxfPpkBqjIIEq3ru6c=

+ 0 - 90
logic/metrics/metrics.go

@@ -1,90 +0,0 @@
-package metrics
-
-import (
-	"time"
-
-	"github.com/gravitl/netmaker/logger"
-	proxy_metrics "github.com/gravitl/netmaker/metrics"
-	"github.com/gravitl/netmaker/models"
-	"golang.zx2c4.com/wireguard/wgctrl"
-)
-
-// Collect - collects metrics
-func Collect(iface, server, network string, peerMap models.PeerMap, proxy bool) (*models.Metrics, error) {
-	var metrics models.Metrics
-	metrics.Connectivity = make(map[string]models.Metric)
-	var wgclient, err = wgctrl.New()
-	if err != nil {
-		fillUnconnectedData(&metrics, peerMap)
-		return &metrics, err
-	}
-	defer wgclient.Close()
-	device, err := wgclient.Device(iface)
-	if err != nil {
-		fillUnconnectedData(&metrics, peerMap)
-		return &metrics, err
-	}
-	// TODO handle freebsd??
-	for i := range device.Peers {
-		currPeer := device.Peers[i]
-		if _, ok := peerMap[currPeer.PublicKey.String()]; !ok {
-			continue
-		}
-		id := peerMap[currPeer.PublicKey.String()].ID
-		address := peerMap[currPeer.PublicKey.String()].Address
-		if id == "" || address == "" {
-			logger.Log(0, "attempted to parse metrics for invalid peer from server", id, address)
-			continue
-		}
-		proxyMetrics := proxy_metrics.GetMetric(server, currPeer.PublicKey.String())
-		var newMetric = models.Metric{
-			NodeName: peerMap[currPeer.PublicKey.String()].Name,
-		}
-		logger.Log(2, "collecting metrics for peer", address)
-		newMetric.TotalReceived = int64(proxyMetrics.TrafficRecieved)
-		newMetric.TotalSent = int64(proxyMetrics.TrafficSent)
-		newMetric.Latency = int64(proxyMetrics.LastRecordedLatency)
-		newMetric.Connected = proxyMetrics.NodeConnectionStatus[id]
-		newMetric.CollectedByProxy = proxy
-		if newMetric.Connected {
-			newMetric.Uptime = 1
-		}
-		// check device peer to see if WG is working if ping failed
-		if !newMetric.Connected {
-			if currPeer.ReceiveBytes > 0 &&
-				currPeer.TransmitBytes > 0 &&
-				time.Now().Before(currPeer.LastHandshakeTime.Add(time.Minute<<1)) {
-				newMetric.Connected = true
-				newMetric.Uptime = 1
-			}
-		}
-		newMetric.TotalTime = 1
-		metrics.Connectivity[id] = newMetric
-		if len(proxyMetrics.NodeConnectionStatus) == 1 {
-			proxy_metrics.ResetMetricsForPeer(server, currPeer.PublicKey.String())
-		} else {
-			proxy_metrics.ResetMetricForNode(server, currPeer.PublicKey.String(), id)
-		}
-	}
-
-	fillUnconnectedData(&metrics, peerMap)
-	return &metrics, nil
-}
-
-// == used to fill zero value data for non connected peers ==
-func fillUnconnectedData(metrics *models.Metrics, peerMap models.PeerMap) {
-	for r := range peerMap {
-		id := peerMap[r].ID
-		if !metrics.Connectivity[id].Connected {
-			newMetric := models.Metric{
-				NodeName:  peerMap[r].Name,
-				Uptime:    0,
-				TotalTime: 1,
-				Connected: false,
-				Latency:   999,
-				PercentUp: 0,
-			}
-			metrics.Connectivity[id] = newMetric
-		}
-	}
-}

+ 166 - 172
logic/peers.go

@@ -1,9 +1,7 @@
 package logic
 
 import (
-	"context"
 	"errors"
-	"fmt"
 	"net"
 	"net/netip"
 
@@ -16,24 +14,100 @@ import (
 	"golang.zx2c4.com/wireguard/wgctrl/wgtypes"
 )
 
-var (
-	// PeerUpdateCtx context to send to host peer updates
-	PeerUpdateCtx context.Context
-	// PeerUpdateStop - the cancel for PeerUpdateCtx
-	PeerUpdateStop context.CancelFunc
-)
-
-// ResetPeerUpdateContext - kills any current peer updates and resets the context
-func ResetPeerUpdateContext() {
-	if PeerUpdateCtx != nil && PeerUpdateStop != nil {
-		PeerUpdateStop() // tell any current peer updates to stop
+// NodePeersInfo - fetches node's peers with their ids and addrs.
+func NodePeersInfo(client *models.Client) (models.NodePeersInfo, error) {
+	nodePeersInfo := models.NodePeersInfo{
+		PeerIDs: make(models.PeerMap),
+		Peers:   []wgtypes.PeerConfig{},
+	}
+	nodes, err := GetNetworkNodes(client.Node.Network)
+	if err != nil {
+		return models.NodePeersInfo{}, err
 	}
 
-	PeerUpdateCtx, PeerUpdateStop = context.WithCancel(context.Background())
+	for _, peer := range nodes {
+		if peer.ID == client.Node.ID {
+			continue
+		}
+		if peer.Action == models.NODE_DELETE || peer.PendingDelete || !peer.Connected ||
+			!nodeacls.AreNodesAllowed(nodeacls.NetworkID(peer.Network), nodeacls.NodeID(client.Node.ID.String()), nodeacls.NodeID(peer.ID.String())) {
+			continue
+		}
+		peerHost, err := GetHost(peer.HostID.String())
+		if err != nil {
+			continue
+		}
+		var peerConfig wgtypes.PeerConfig
+		peerConfig.PublicKey = peerHost.PublicKey
+		peerConfig.PersistentKeepaliveInterval = &peer.PersistentKeepalive
+		peerConfig.ReplaceAllowedIPs = true
+		uselocal := false
+		if client.Host.EndpointIP.String() == peerHost.EndpointIP.String() {
+			// peer is on same network
+			// set to localaddress
+			uselocal = true
+			if client.Node.LocalAddress.IP == nil {
+				// use public endpint
+				uselocal = false
+			}
+			if client.Node.LocalAddress.String() == peer.LocalAddress.String() {
+				uselocal = false
+			}
+		}
+		peerConfig.Endpoint = &net.UDPAddr{
+			IP:   peerHost.EndpointIP,
+			Port: getPeerWgListenPort(peerHost),
+		}
+
+		if uselocal {
+			peerConfig.Endpoint.IP = peer.LocalAddress.IP
+			peerConfig.Endpoint.Port = peerHost.ListenPort
+		}
+		allowedips := GetAllowedIPs(&client.Node, &peer, nil)
+		if peer.IsIngressGateway {
+			for _, entry := range peer.IngressGatewayRange {
+				_, cidr, err := net.ParseCIDR(string(entry))
+				if err == nil {
+					allowedips = append(allowedips, *cidr)
+				}
+			}
+		}
+		if peer.IsEgressGateway {
+
+			allowedips = append(allowedips, getEgressIPs(
+				&models.Client{
+					Host: *peerHost,
+					Node: peer,
+				})...)
+
+		}
+		peerConfig.AllowedIPs = allowedips
+		nodePeersInfo.Peers = append(nodePeersInfo.Peers, peerConfig)
+		nodePeersInfo.PeerIDs[peerHost.PublicKey.String()] = models.IDandAddr{
+			ID:         peer.ID.String(),
+			Address:    peer.Address.IP.String(),
+			Name:       peerHost.Name,
+			Network:    peer.Network,
+			ListenPort: GetPeerListenPort(peerHost),
+		}
+	}
+	if client.Node.IsIngressGateway {
+		extPeers, extPeerIDAndAddrs, err := GetExtPeers(&client.Node)
+		if err == nil {
+			nodePeersInfo.Peers = append(nodePeersInfo.Peers, extPeers...)
+			for _, extPeerIdAndAddr := range extPeerIDAndAddrs {
+				extPeerIdAndAddr := extPeerIdAndAddr
+				nodePeersInfo.PeerIDs[extPeerIdAndAddr.ID] = extPeerIdAndAddr
+			}
+		} else if !database.IsEmptyRecord(err) {
+			logger.Log(1, "error retrieving external clients:", err.Error())
+		}
+	}
+	return nodePeersInfo, nil
 }
 
 // GetPeerUpdateForHost - gets the consolidated peer update for the host from all networks
-func GetPeerUpdateForHost(ctx context.Context, network string, host *models.Host, deletedNode *models.Node, deletedClients []models.ExtClient) (models.HostPeerUpdate, error) {
+func GetPeerUpdateForHost(host *models.Host) (models.HostPeerUpdate, error) {
 	if host == nil {
 		return models.HostPeerUpdate{}, errors.New("host is nil")
 	}
@@ -46,12 +120,8 @@ func GetPeerUpdateForHost(ctx context.Context, network string, host *models.Host
 	hostPeerUpdate := models.HostPeerUpdate{
 		Host:            *host,
 		Server:          servercfg.GetServer(),
-		HostPeerIDs:     make(models.HostPeerMap, 0),
 		ServerVersion:   servercfg.GetVersion(),
-		ServerAddrs:     []models.ServerAddr{},
-		PeerIDs:         make(models.PeerMap, 0),
 		Peers:           []wgtypes.PeerConfig{},
-		NodePeers:       []wgtypes.PeerConfig{},
 		HostNetworkInfo: models.HostInfoMap{},
 	}
 
@@ -68,176 +138,99 @@ func GetPeerUpdateForHost(ctx context.Context, network string, host *models.Host
 		}
 		currentPeers := GetNetworkNodesMemory(allNodes, node.Network)
 		for _, peer := range currentPeers {
-			select {
-			case <-ctx.Done():
-				logger.Log(2, "cancelled peer update for host", host.Name, host.ID.String())
-				return models.HostPeerUpdate{}, fmt.Errorf("peer update cancelled")
-			default:
-				peer := peer
-				if peer.ID.String() == node.ID.String() {
-					logger.Log(2, "peer update, skipping self")
-					//skip yourself
-					continue
-				}
-				if peer.IsRelayed {
-					// skip relayed peers; will be included in relay peer
-					continue
-				}
-				var peerConfig wgtypes.PeerConfig
-				peerHost, err := GetHost(peer.HostID.String())
-				if err != nil {
-					logger.Log(1, "no peer host", peer.HostID.String(), err.Error())
-					return models.HostPeerUpdate{}, err
-				}
+			peer := peer
+			if peer.ID.String() == node.ID.String() {
+				logger.Log(2, "peer update, skipping self")
+				//skip yourself
+				continue
+			}
+			if peer.IsRelayed {
+				// skip relayed peers; will be included in relay peer
+				continue
+			}
+			var peerConfig wgtypes.PeerConfig
+			peerHost, err := GetHost(peer.HostID.String())
+			if err != nil {
+				logger.Log(1, "no peer host", peer.HostID.String(), err.Error())
+				return models.HostPeerUpdate{}, err
+			}
 
-				peerConfig.PublicKey = peerHost.PublicKey
-				peerConfig.PersistentKeepaliveInterval = &peer.PersistentKeepalive
-				peerConfig.ReplaceAllowedIPs = true
-				uselocal := false
-				if host.EndpointIP.String() == peerHost.EndpointIP.String() {
-					// peer is on same network
-					// set to localaddress
-					uselocal = true
-					if node.LocalAddress.IP == nil {
-						// use public endpint
-						uselocal = false
-					}
-					if node.LocalAddress.String() == peer.LocalAddress.String() {
-						uselocal = false
-					}
+			peerConfig.PublicKey = peerHost.PublicKey
+			peerConfig.PersistentKeepaliveInterval = &peer.PersistentKeepalive
+			peerConfig.ReplaceAllowedIPs = true
+			uselocal := false
+			if host.EndpointIP.String() == peerHost.EndpointIP.String() {
+				// peer is on same network
+				// set to localaddress
+				uselocal = true
+				if node.LocalAddress.IP == nil {
+					// use public endpint
+					uselocal = false
 				}
-				peerConfig.Endpoint = &net.UDPAddr{
-					IP:   peerHost.EndpointIP,
-					Port: getPeerWgListenPort(peerHost),
+				if node.LocalAddress.String() == peer.LocalAddress.String() {
+					uselocal = false
 				}
+			}
+			peerConfig.Endpoint = &net.UDPAddr{
+				IP:   peerHost.EndpointIP,
+				Port: getPeerWgListenPort(peerHost),
+			}
 
-				if uselocal {
-					peerConfig.Endpoint.IP = peer.LocalAddress.IP
-					peerConfig.Endpoint.Port = peerHost.ListenPort
-				}
-				allowedips := GetAllowedIPs(&node, &peer, nil)
-				if peer.IsIngressGateway {
-					for _, entry := range peer.IngressGatewayRange {
-						_, cidr, err := net.ParseCIDR(string(entry))
-						if err == nil {
-							allowedips = append(allowedips, *cidr)
-						}
-					}
-				}
-				if peer.IsEgressGateway {
-					host, err := GetHost(peer.HostID.String())
+			if uselocal {
+				peerConfig.Endpoint.IP = peer.LocalAddress.IP
+				peerConfig.Endpoint.Port = peerHost.ListenPort
+			}
+			allowedips := GetAllowedIPs(&node, &peer, nil)
+			if peer.IsIngressGateway {
+				for _, entry := range peer.IngressGatewayRange {
+					_, cidr, err := net.ParseCIDR(string(entry))
 					if err == nil {
-						allowedips = append(allowedips, getEgressIPs(
-							&models.Client{
-								Host: *host,
-								Node: peer,
-							})...)
+						allowedips = append(allowedips, *cidr)
 					}
 				}
-				if peer.Action != models.NODE_DELETE &&
-					!peer.PendingDelete &&
-					peer.Connected &&
-					nodeacls.AreNodesAllowed(nodeacls.NetworkID(node.Network), nodeacls.NodeID(node.ID.String()), nodeacls.NodeID(peer.ID.String())) &&
-					(deletedNode == nil || (deletedNode != nil && peer.ID.String() != deletedNode.ID.String())) {
-					peerConfig.AllowedIPs = allowedips // only append allowed IPs if valid connection
+			}
+			if peer.IsEgressGateway {
+				host, err := GetHost(peer.HostID.String())
+				if err == nil {
+					allowedips = append(allowedips, getEgressIPs(
+						&models.Client{
+							Host: *host,
+							Node: peer,
+						})...)
 				}
-				var nodePeer wgtypes.PeerConfig
-				if _, ok := hostPeerUpdate.HostPeerIDs[peerHost.PublicKey.String()]; !ok {
-					hostPeerUpdate.HostPeerIDs[peerHost.PublicKey.String()] = make(map[string]models.IDandAddr)
-					hostPeerUpdate.Peers = append(hostPeerUpdate.Peers, peerConfig)
-					peerIndexMap[peerHost.PublicKey.String()] = len(hostPeerUpdate.Peers) - 1
-					hostPeerUpdate.HostPeerIDs[peerHost.PublicKey.String()][peer.ID.String()] = models.IDandAddr{
-						ID:      peer.ID.String(),
-						Address: peer.PrimaryAddress(),
-						Name:    peerHost.Name,
-						Network: peer.Network,
-					}
-					hostPeerUpdate.HostNetworkInfo[peerHost.PublicKey.String()] = models.HostNetworkInfo{
-						Interfaces: peerHost.Interfaces,
-					}
-					nodePeer = peerConfig
-				} else {
-					peerAllowedIPs := hostPeerUpdate.Peers[peerIndexMap[peerHost.PublicKey.String()]].AllowedIPs
-					peerAllowedIPs = append(peerAllowedIPs, allowedips...)
-					hostPeerUpdate.Peers[peerIndexMap[peerHost.PublicKey.String()]].AllowedIPs = peerAllowedIPs
-					hostPeerUpdate.HostPeerIDs[peerHost.PublicKey.String()][peer.ID.String()] = models.IDandAddr{
-						ID:      peer.ID.String(),
-						Address: peer.PrimaryAddress(),
-						Name:    peerHost.Name,
-						Network: peer.Network,
-					}
-					hostPeerUpdate.HostNetworkInfo[peerHost.PublicKey.String()] = models.HostNetworkInfo{
-						Interfaces: peerHost.Interfaces,
-					}
-					nodePeer = hostPeerUpdate.Peers[peerIndexMap[peerHost.PublicKey.String()]]
+			}
+			if peer.Action != models.NODE_DELETE &&
+				!peer.PendingDelete &&
+				peer.Connected &&
+				nodeacls.AreNodesAllowed(nodeacls.NetworkID(node.Network), nodeacls.NodeID(node.ID.String()), nodeacls.NodeID(peer.ID.String())) {
+				peerConfig.AllowedIPs = allowedips // only append allowed IPs if valid connection
+			}
+			if _, ok := peerIndexMap[peerHost.PublicKey.String()]; !ok {
+				hostPeerUpdate.Peers = append(hostPeerUpdate.Peers, peerConfig)
+				peerIndexMap[peerHost.PublicKey.String()] = len(hostPeerUpdate.Peers) - 1
+				hostPeerUpdate.HostNetworkInfo[peerHost.PublicKey.String()] = models.HostNetworkInfo{
+					Interfaces: peerHost.Interfaces,
 				}
-
-				if node.Network == network { // add to peers map for metrics
-					hostPeerUpdate.PeerIDs[peerHost.PublicKey.String()] = models.IDandAddr{
-						ID:      peer.ID.String(),
-						Address: peer.PrimaryAddress(),
-						Name:    peerHost.Name,
-						Network: peer.Network,
-					}
-					hostPeerUpdate.NodePeers = append(hostPeerUpdate.NodePeers, nodePeer)
+			} else {
+				peerAllowedIPs := hostPeerUpdate.Peers[peerIndexMap[peerHost.PublicKey.String()]].AllowedIPs
+				peerAllowedIPs = append(peerAllowedIPs, allowedips...)
+				hostPeerUpdate.Peers[peerIndexMap[peerHost.PublicKey.String()]].AllowedIPs = peerAllowedIPs
+				hostPeerUpdate.HostNetworkInfo[peerHost.PublicKey.String()] = models.HostNetworkInfo{
+					Interfaces: peerHost.Interfaces,
 				}
 			}
+
 		}
-		var extPeers []wgtypes.PeerConfig
-		var extPeerIDAndAddrs []models.IDandAddr
+
 		if node.IsIngressGateway {
-			extPeers, extPeerIDAndAddrs, err = GetExtPeers(&node)
+			extPeers, _, err := GetExtPeers(&node)
 			if err == nil {
 				hostPeerUpdate.Peers = append(hostPeerUpdate.Peers, extPeers...)
-				for _, extPeerIdAndAddr := range extPeerIDAndAddrs {
-					extPeerIdAndAddr := extPeerIdAndAddr
-					hostPeerUpdate.HostPeerIDs[extPeerIdAndAddr.ID] = make(map[string]models.IDandAddr)
-					hostPeerUpdate.HostPeerIDs[extPeerIdAndAddr.ID][extPeerIdAndAddr.ID] = models.IDandAddr{
-						ID:      extPeerIdAndAddr.ID,
-						Address: extPeerIdAndAddr.Address,
-						Name:    extPeerIdAndAddr.Name,
-						Network: node.Network,
-					}
-					if node.Network == network {
-						hostPeerUpdate.PeerIDs[extPeerIdAndAddr.ID] = extPeerIdAndAddr
-						hostPeerUpdate.NodePeers = append(hostPeerUpdate.NodePeers, extPeers...)
-					}
-				}
 			} else if !database.IsEmptyRecord(err) {
 				logger.Log(1, "error retrieving external clients:", err.Error())
 			}
 		}
 	}
-	// == post peer calculations ==
-	// indicate removal if no allowed IPs were calculated
-	for i := range hostPeerUpdate.Peers {
-		peer := hostPeerUpdate.Peers[i]
-		if len(peer.AllowedIPs) == 0 {
-			peer.Remove = true
-		}
-		hostPeerUpdate.Peers[i] = peer
-	}
-
-	for i := range hostPeerUpdate.NodePeers {
-		peer := hostPeerUpdate.NodePeers[i]
-		if len(peer.AllowedIPs) == 0 {
-			peer.Remove = true
-		}
-		hostPeerUpdate.NodePeers[i] = peer
-	}
-
-	if len(deletedClients) > 0 {
-		for i := range deletedClients {
-			deletedClient := deletedClients[i]
-			key, err := wgtypes.ParseKey(deletedClient.PublicKey)
-			if err == nil {
-				hostPeerUpdate.Peers = append(hostPeerUpdate.Peers, wgtypes.PeerConfig{
-					PublicKey: key,
-					Remove:    true,
-				})
-			}
-		}
-	}
 
 	return hostPeerUpdate, nil
 }
@@ -444,9 +437,10 @@ func GetExtPeers(node *models.Node) ([]wgtypes.PeerConfig, []models.IDandAddr, e
 		}
 		peers = append(peers, peer)
 		idsAndAddr = append(idsAndAddr, models.IDandAddr{
-			ID:      peer.PublicKey.String(),
-			Name:    extPeer.ClientID,
-			Address: primaryAddr,
+			ID:          peer.PublicKey.String(),
+			Name:        extPeer.ClientID,
+			Address:     primaryAddr,
+			IsExtclient: true,
 		})
 	}
 	return peers, idsAndAddr, nil

+ 0 - 83
metrics/metrics.go

@@ -1,83 +0,0 @@
-package metrics
-
-import (
-	"sync"
-	"time"
-
-	"github.com/gravitl/netmaker/models"
-)
-
-// lock for metrics map
-var metricsMapLock = &sync.RWMutex{}
-
-// metrics data map
-var metricsPeerMap = make(map[string]map[string]*models.ProxyMetric)
-
-// GetMetricByServer - get metric data of peers by server
-func GetMetricByServer(server string) map[string]*models.ProxyMetric {
-	metricsMapLock.RLock()
-	defer metricsMapLock.RUnlock()
-	if _, ok := metricsPeerMap[server]; !ok {
-		return nil
-	}
-	return metricsPeerMap[server]
-}
-
-// GetMetric - fetches the metric data for the peer
-func GetMetric(server, peerKey string) models.ProxyMetric {
-	metric := models.ProxyMetric{}
-	peerMetricMap := GetMetricByServer(server)
-	metricsMapLock.RLock()
-	defer metricsMapLock.RUnlock()
-	if peerMetricMap == nil {
-		return metric
-	}
-	if m, ok := peerMetricMap[peerKey]; ok && m != nil {
-		metric = *m
-	}
-	return metric
-}
-
-// UpdateMetric - updates metric data for the peer
-func UpdateMetric(server, peerKey string, metric *models.ProxyMetric) {
-	metricsMapLock.Lock()
-	defer metricsMapLock.Unlock()
-	if metricsPeerMap[server] == nil {
-		metricsPeerMap[server] = make(map[string]*models.ProxyMetric)
-	}
-	metricsPeerMap[server][peerKey] = metric
-}
-
-// UpdateMetricByPeer - updates metrics data by peer public key
-func UpdateMetricByPeer(peerKey string, metric *models.ProxyMetric, onlyTraffic bool) {
-	metricsMapLock.Lock()
-	defer metricsMapLock.Unlock()
-	for server, peerKeyMap := range metricsPeerMap {
-		if peerMetric, ok := peerKeyMap[peerKey]; ok {
-			peerMetric.TrafficRecieved += metric.TrafficRecieved
-			peerMetric.TrafficSent += metric.TrafficSent
-			if !onlyTraffic {
-				peerMetric.LastRecordedLatency = metric.LastRecordedLatency
-			}
-
-			metricsPeerMap[server][peerKey] = peerMetric
-		}
-	}
-}
-
-// ResetMetricsForPeer - reset metrics for peer
-func ResetMetricsForPeer(server, peerKey string) {
-	metricsMapLock.Lock()
-	defer metricsMapLock.Unlock()
-	delete(metricsPeerMap[server], peerKey)
-}
-
-// ResetMetricForNode - resets node level metrics
-func ResetMetricForNode(server, peerKey, peerID string) {
-	metric := GetMetric(server, peerKey)
-	delete(metric.NodeConnectionStatus, peerID)
-	UpdateMetric(server, peerKey, &metric)
-}
-
-// MetricCollectionInterval - collection interval for metrics
-const MetricCollectionInterval = time.Second * 25

+ 15 - 15
models/metrics.go

@@ -15,25 +15,25 @@ type Metrics struct {
 
 // Metric - holds a metric for data between nodes
 type Metric struct {
-	NodeName         string        `json:"node_name" bson:"node_name" yaml:"node_name"`
-	Uptime           int64         `json:"uptime" bson:"uptime" yaml:"uptime"`
-	TotalTime        int64         `json:"totaltime" bson:"totaltime" yaml:"totaltime"`
-	Latency          int64         `json:"latency" bson:"latency" yaml:"latency"`
-	TotalReceived    int64         `json:"totalreceived" bson:"totalreceived" yaml:"totalreceived"`
-	TotalSent        int64         `json:"totalsent" bson:"totalsent" yaml:"totalsent"`
-	ActualUptime     time.Duration `json:"actualuptime" bson:"actualuptime" yaml:"actualuptime"`
-	PercentUp        float64       `json:"percentup" bson:"percentup" yaml:"percentup"`
-	Connected        bool          `json:"connected" bson:"connected" yaml:"connected"`
-	CollectedByProxy bool          `json:"collected_by_proxy" bson:"collected_by_proxy" yaml:"collected_by_proxy"`
+	NodeName      string        `json:"node_name" bson:"node_name" yaml:"node_name"`
+	Uptime        int64         `json:"uptime" bson:"uptime" yaml:"uptime"`
+	TotalTime     int64         `json:"totaltime" bson:"totaltime" yaml:"totaltime"`
+	Latency       int64         `json:"latency" bson:"latency" yaml:"latency"`
+	TotalReceived int64         `json:"totalreceived" bson:"totalreceived" yaml:"totalreceived"`
+	TotalSent     int64         `json:"totalsent" bson:"totalsent" yaml:"totalsent"`
+	ActualUptime  time.Duration `json:"actualuptime" bson:"actualuptime" yaml:"actualuptime"`
+	PercentUp     float64       `json:"percentup" bson:"percentup" yaml:"percentup"`
+	Connected     bool          `json:"connected" bson:"connected" yaml:"connected"`
 }
 
 // IDandAddr - struct to hold ID and primary Address
 type IDandAddr struct {
-	ID       string `json:"id" bson:"id" yaml:"id"`
-	Address  string `json:"address" bson:"address" yaml:"address"`
-	Name     string `json:"name" bson:"name" yaml:"name"`
-	IsServer string `json:"isserver" bson:"isserver" yaml:"isserver" validate:"checkyesorno"`
-	Network  string `json:"network" bson:"network" yaml:"network" validate:"network"`
+	ID          string `json:"id" bson:"id" yaml:"id"`
+	Address     string `json:"address" bson:"address" yaml:"address"`
+	Name        string `json:"name" bson:"name" yaml:"name"`
+	Network     string `json:"network" bson:"network" yaml:"network" validate:"network"`
+	ListenPort  int    `json:"listen_port" yaml:"listen_port"`
+	IsExtclient bool   `json:"is_ext_client" yaml:"is_ext_client"`
 }
 
 // HostInfoMap - map of host public keys to host networking info

+ 7 - 6
models/mqtt.go

@@ -6,17 +6,18 @@ import (
 	"golang.zx2c4.com/wireguard/wgctrl/wgtypes"
 )
 
+type NodePeersInfo struct {
+	Peers   []wgtypes.PeerConfig `json:"peers" yaml:"peers"`
+	PeerIDs PeerMap              `json:"peerids" yaml:"peerids"`
+}
+
 // HostPeerUpdate - struct for host peer updates
 type HostPeerUpdate struct {
 	Host            Host                 `json:"host" bson:"host" yaml:"host"`
 	Server          string               `json:"server" bson:"server" yaml:"server"`
 	ServerVersion   string               `json:"serverversion" bson:"serverversion" yaml:"serverversion"`
-	ServerAddrs     []ServerAddr         `json:"serveraddrs" bson:"serveraddrs" yaml:"serveraddrs"`
-	NodePeers       []wgtypes.PeerConfig `json:"peers" bson:"peers" yaml:"peers"`
-	Peers           []wgtypes.PeerConfig
-	HostPeerIDs     HostPeerMap `json:"hostpeerids" bson:"hostpeerids" yaml:"hostpeerids"`
-	PeerIDs         PeerMap     `json:"peerids" bson:"peerids" yaml:"peerids"`
-	HostNetworkInfo HostInfoMap `json:"host_network_info,omitempty" bson:"host_network_info,omitempty" yaml:"host_network_info,omitempty"`
+	Peers           []wgtypes.PeerConfig `json:"peers" yaml:"peers"`
+	HostNetworkInfo HostInfoMap          `json:"host_network_info,omitempty" bson:"host_network_info,omitempty" yaml:"host_network_info,omitempty"`
 }
 
 // IngressInfo - struct for ingress info

+ 0 - 1
models/structs.go

@@ -213,7 +213,6 @@ type NodeGet struct {
 	Node         Node                 `json:"node" bson:"node" yaml:"node"`
 	Host         Host                 `json:"host" yaml:"host"`
 	Peers        []wgtypes.PeerConfig `json:"peers" bson:"peers" yaml:"peers"`
-	HostPeers    []wgtypes.PeerConfig `json:"host_peers" bson:"host_peers" yaml:"host_peers"`
 	ServerConfig ServerConfig         `json:"serverconfig" bson:"serverconfig" yaml:"serverconfig"`
 	PeerIDs      PeerMap              `json:"peerids,omitempty" bson:"peerids,omitempty" yaml:"peerids,omitempty"`
 }

+ 9 - 13
mq/handlers.go

@@ -1,7 +1,6 @@
 package mq
 
 import (
-	"context"
 	"encoding/json"
 	"fmt"
 	"math"
@@ -249,7 +248,7 @@ func UpdateMetrics(client mqtt.Client, msg mqtt.Message) {
 			slog.Info("updating peers after node detected connectivity issues", "id", currentNode.ID, "network", currentNode.Network)
 			host, err := logic.GetHost(currentNode.HostID.String())
 			if err == nil {
-				if err = PublishSingleHostPeerUpdate(context.Background(), host, nil, nil); err != nil {
+				if err = PublishSingleHostPeerUpdate(host); err != nil {
 					slog.Warn("failed to publish update after failover peer change for node", "id", currentNode.ID, "network", currentNode.Network, "error", err)
 				}
 			}
@@ -331,21 +330,18 @@ func updateNodeMetrics(currentNode *models.Node, newMetrics *models.Metrics) boo
 		oldMetric := oldMetrics.Connectivity[k]
 		currMetric.TotalTime += oldMetric.TotalTime
 		currMetric.Uptime += oldMetric.Uptime // get the total uptime for this connection
-		if currMetric.CollectedByProxy {
+
+		if currMetric.TotalReceived < oldMetric.TotalReceived {
 			currMetric.TotalReceived += oldMetric.TotalReceived
+		} else {
+			currMetric.TotalReceived += int64(math.Abs(float64(currMetric.TotalReceived) - float64(oldMetric.TotalReceived)))
+		}
+		if currMetric.TotalSent < oldMetric.TotalSent {
 			currMetric.TotalSent += oldMetric.TotalSent
 		} else {
-			if currMetric.TotalReceived < oldMetric.TotalReceived {
-				currMetric.TotalReceived += oldMetric.TotalReceived
-			} else {
-				currMetric.TotalReceived += int64(math.Abs(float64(currMetric.TotalReceived) - float64(oldMetric.TotalReceived)))
-			}
-			if currMetric.TotalSent < oldMetric.TotalSent {
-				currMetric.TotalSent += oldMetric.TotalSent
-			} else {
-				currMetric.TotalSent += int64(math.Abs(float64(currMetric.TotalSent) - float64(oldMetric.TotalSent)))
-			}
+			currMetric.TotalSent += int64(math.Abs(float64(currMetric.TotalSent) - float64(oldMetric.TotalSent)))
 		}
+
 		if currMetric.Uptime == 0 || currMetric.TotalTime == 0 {
 			currMetric.PercentUp = 0
 		} else {

+ 4 - 51
mq/publishers.go

@@ -1,7 +1,6 @@
 package mq
 
 import (
-	"context"
 	"encoding/json"
 	"errors"
 	"fmt"
@@ -27,54 +26,9 @@ func PublishPeerUpdate() error {
 		logger.Log(1, "err getting all hosts", err.Error())
 		return err
 	}
-	logic.ResetPeerUpdateContext()
 	for _, host := range hosts {
 		host := host
-		if err = PublishSingleHostPeerUpdate(logic.PeerUpdateCtx, &host, nil, nil); err != nil {
-			logger.Log(1, "failed to publish peer update to host", host.ID.String(), ": ", err.Error())
-		}
-	}
-	return err
-}
-
-// PublishDeletedNodePeerUpdate --- determines and publishes a peer update
-// to all the hosts with a deleted node to account for
-func PublishDeletedNodePeerUpdate(delNode *models.Node) error {
-	if !servercfg.IsMessageQueueBackend() {
-		return nil
-	}
-
-	hosts, err := logic.GetAllHosts()
-	if err != nil {
-		logger.Log(1, "err getting all hosts", err.Error())
-		return err
-	}
-	logic.ResetPeerUpdateContext()
-	for _, host := range hosts {
-		host := host
-		if err = PublishSingleHostPeerUpdate(logic.PeerUpdateCtx, &host, delNode, nil); err != nil {
-			logger.Log(1, "failed to publish peer update to host", host.ID.String(), ": ", err.Error())
-		}
-	}
-	return err
-}
-
-// PublishDeletedClientPeerUpdate --- determines and publishes a peer update
-// to all the hosts with a deleted ext client to account for
-func PublishDeletedClientPeerUpdate(delClient *models.ExtClient) error {
-	if !servercfg.IsMessageQueueBackend() {
-		return nil
-	}
-
-	hosts, err := logic.GetAllHosts()
-	if err != nil {
-		logger.Log(1, "err getting all hosts", err.Error())
-		return err
-	}
-	logic.ResetPeerUpdateContext()
-	for _, host := range hosts {
-		host := host
-		if err = PublishSingleHostPeerUpdate(logic.PeerUpdateCtx, &host, nil, []models.ExtClient{*delClient}); err != nil {
+		if err = PublishSingleHostPeerUpdate(&host); err != nil {
 			logger.Log(1, "failed to publish peer update to host", host.ID.String(), ": ", err.Error())
 		}
 	}
@@ -82,9 +36,9 @@ func PublishDeletedClientPeerUpdate(delClient *models.ExtClient) error {
 }
 
 // PublishSingleHostPeerUpdate --- determines and publishes a peer update to one host
-func PublishSingleHostPeerUpdate(ctx context.Context, host *models.Host, deletedNode *models.Node, deletedClients []models.ExtClient) error {
+func PublishSingleHostPeerUpdate(host *models.Host) error {
 
-	peerUpdate, err := logic.GetPeerUpdateForHost(ctx, "", host, deletedNode, deletedClients)
+	peerUpdate, err := logic.GetPeerUpdateForHost(host)
 	if err != nil {
 		return err
 	}
@@ -686,11 +640,10 @@ func sendPeers() {
 		//collectServerMetrics(networks[:])
 	}
 	if force {
-		logic.ResetPeerUpdateContext()
 		for _, host := range hosts {
 			host := host
 			logger.Log(2, "sending scheduled peer update (5 min)")
-			if err = PublishSingleHostPeerUpdate(logic.PeerUpdateCtx, &host, nil, nil); err != nil {
+			if err = PublishSingleHostPeerUpdate(&host); err != nil {
 				logger.Log(1, "error publishing peer updates for host: ", host.ID.String(), " Err: ", err.Error())
 			}
 		}