Browse Source

collect golden signals

Abhishek Kondur 2 years ago
parent
commit
fbd404d868

+ 7 - 7
nm-proxy/manager/manager.go

@@ -242,7 +242,7 @@ func (m *ManagerAction) processPayload() (*wg.WGIface, error) {
 
 				// cleanup proxy connections for the peer
 				currentPeer.StopConn()
-				delete(wgProxyConf.PeerMap, currentPeer.Key)
+				delete(wgProxyConf.PeerMap, currentPeer.Key.String())
 				// update the peer with actual endpoint
 				if err := wgIface.Update(m.Payload.Peers[i], false); err != nil {
 					log.Println("falied to update peer: ", err)
@@ -252,13 +252,13 @@ func (m *ManagerAction) processPayload() (*wg.WGIface, error) {
 
 			}
 			// check if peer is not connected to proxy
-			devPeer, err := wg.GetPeer(m.Payload.InterfaceName, currentPeer.Key)
+			devPeer, err := wg.GetPeer(m.Payload.InterfaceName, currentPeer.Key.String())
 			if err == nil {
 				log.Printf("---------> COMAPRING ENDPOINT: DEV: %s, Proxy: %s", devPeer.Endpoint.String(), currentPeer.LocalConn.LocalAddr().String())
 				if devPeer.Endpoint.String() != currentPeer.LocalConn.LocalAddr().String() {
 					log.Println("---------> endpoint is not set to proxy: ", currentPeer.Key)
 					currentPeer.StopConn()
-					delete(wgProxyConf.PeerMap, currentPeer.Key)
+					delete(wgProxyConf.PeerMap, currentPeer.Key.String())
 					continue
 				}
 			}
@@ -266,7 +266,7 @@ func (m *ManagerAction) processPayload() (*wg.WGIface, error) {
 			if currentPeer.IsRelayed != m.Payload.PeerMap[m.Payload.Peers[i].PublicKey.String()].IsRelayed {
 				log.Println("---------> peer relay status has been changed: ", currentPeer.Key)
 				currentPeer.StopConn()
-				delete(wgProxyConf.PeerMap, currentPeer.Key)
+				delete(wgProxyConf.PeerMap, currentPeer.Key.String())
 				continue
 			}
 			// check if relay endpoint has been changed
@@ -275,14 +275,14 @@ func (m *ManagerAction) processPayload() (*wg.WGIface, error) {
 				currentPeer.RelayedEndpoint.String() != m.Payload.PeerMap[m.Payload.Peers[i].PublicKey.String()].RelayedTo.String() {
 				log.Println("---------> peer relay endpoint has been changed: ", currentPeer.Key)
 				currentPeer.StopConn()
-				delete(wgProxyConf.PeerMap, currentPeer.Key)
+				delete(wgProxyConf.PeerMap, currentPeer.Key.String())
 				continue
 			}
 			if !reflect.DeepEqual(m.Payload.Peers[i], *currentPeer.PeerConf) {
 				if currentPeer.RemoteConn.IP.String() != m.Payload.Peers[i].Endpoint.IP.String() {
 					log.Println("----------> Resetting proxy for Peer: ", currentPeer.Key, m.Payload.InterfaceName)
 					currentPeer.StopConn()
-					delete(wgProxyConf.PeerMap, currentPeer.Key)
+					delete(wgProxyConf.PeerMap, currentPeer.Key.String())
 
 				} else {
 
@@ -296,7 +296,7 @@ func (m *ManagerAction) processPayload() (*wg.WGIface, error) {
 						log.Println("failed to update peer: ", currentPeer.Key, err)
 					}
 					currentPeer.PeerConf = &m.Payload.Peers[i]
-					wgProxyConf.PeerMap[currentPeer.Key] = currentPeer
+					wgProxyConf.PeerMap[currentPeer.Key.String()] = currentPeer
 					// delete the peer from the list
 					log.Println("-----------> deleting peer from list: ", m.Payload.Peers[i].PublicKey)
 					m.Payload.Peers = append(m.Payload.Peers[:i], m.Payload.Peers[i+1:]...)

+ 15 - 0
nm-proxy/metrics/metrics.go

@@ -1,6 +1,21 @@
 package metrics
 
+import (
+	"sync"
+)
+
 /*
 1. Create metrics packet--> packet with identifier to track latency, errors.
 
 */
+
+type Metric struct {
+	LastRecordedLatency int64
+	ConnectionStatus    bool
+	TrafficSent         uint64
+	TrafficRecieved     uint64
+}
+
+var MetricsMapLock *sync.RWMutex
+
+var MetricsMap = make(map[string]Metric)

+ 1 - 1
nm-proxy/models/peer.go

@@ -16,7 +16,7 @@ const (
 type ConnConfig struct {
 
 	// Key is a public key of a remote peer
-	Key                 string
+	Key                 wgtypes.Key
 	IsExtClient         bool
 	IsRelayed           bool
 	RelayedEndpoint     *net.UDPAddr

+ 0 - 6
nm-proxy/nm-proxy.go

@@ -12,12 +12,6 @@ import (
 	"github.com/gravitl/netmaker/nm-proxy/stun"
 )
 
-// Comm Channel to configure proxy
-/* Actions -
-   1. Add - new interface and its peers
-   2. Delete - remove close all conns for the interface,cleanup
-
-*/
 func Start(ctx context.Context, mgmChan chan *manager.ManagerAction, apiServerAddr string) {
 	log.Println("Starting Proxy...")
 	common.IsHostNetwork = (os.Getenv("HOST_NETWORK") == "" || os.Getenv("HOST_NETWORK") == "on")

+ 19 - 3
nm-proxy/packet/packet.go

@@ -14,7 +14,7 @@ import (
 	"github.com/gravitl/netmaker/nm-proxy/common"
 	"golang.org/x/crypto/blake2s"
 	"golang.org/x/crypto/chacha20poly1305"
-	"golang.zx2c4.com/wireguard/tai64n"
+	"golang.zx2c4.com/wireguard/wgctrl/wgtypes"
 )
 
 func ConsumeHandshakeInitiationMsg(initiator bool, buf []byte, src *net.UDPAddr, devicePubKey NoisePublicKey, devicePrivKey NoisePrivateKey) error {
@@ -64,12 +64,12 @@ func ConsumeHandshakeInitiationMsg(initiator bool, buf []byte, src *net.UDPAddr,
 	return nil
 }
 
-func CreateMetricPacket(id uint64, sender, reciever NoisePublicKey) ([]byte, error) {
+func CreateMetricPacket(id uint32, sender, reciever wgtypes.Key) ([]byte, error) {
 	msg := MetricMessage{
 		ID:        id,
 		Sender:    sender,
 		Reciever:  reciever,
-		TimeStamp: tai64n.Now(),
+		TimeStamp: time.Now().UnixMilli(),
 	}
 	var buff [MessageMetricSize]byte
 	writer := bytes.NewBuffer(buff[:0])
@@ -81,6 +81,22 @@ func CreateMetricPacket(id uint64, sender, reciever NoisePublicKey) ([]byte, err
 	return packet, nil
 }
 
+func ConsumeMetricPacket(buf []byte) (*MetricMessage, error) {
+	var msg MetricMessage
+	var err error
+	reader := bytes.NewReader(buf[:])
+	err = binary.Read(reader, binary.LittleEndian, &msg)
+	if err != nil {
+		log.Println("Failed to decode metric message")
+		return nil, err
+	}
+
+	if msg.Type != MessageMetricsType {
+		return nil, errors.New("not  metric message")
+	}
+	return &msg, nil
+}
+
 func ProcessPacketBeforeSending(buf []byte, n int, srckey, dstKey string) ([]byte, int, string, string) {
 
 	srcKeymd5 := md5.Sum([]byte(srckey))

+ 5 - 4
nm-proxy/packet/packet_helper.go

@@ -5,6 +5,7 @@ import (
 	"golang.org/x/crypto/chacha20poly1305"
 	"golang.org/x/crypto/poly1305"
 	"golang.zx2c4.com/wireguard/tai64n"
+	"golang.zx2c4.com/wireguard/wgctrl/wgtypes"
 )
 
 var (
@@ -30,8 +31,8 @@ type MessageInitiation struct {
 
 type MetricMessage struct {
 	Type      uint32
-	ID        uint64
-	Sender    NoisePublicKey
-	Reciever  NoisePublicKey
-	TimeStamp tai64n.Timestamp
+	ID        uint32
+	Sender    wgtypes.Key
+	Reciever  wgtypes.Key
+	TimeStamp int64
 }

+ 2 - 2
nm-proxy/peer/peer.go

@@ -18,7 +18,7 @@ func AddNewPeer(wgInterface *wg.WGIface, peer *wgtypes.PeerConfig, peerAddr stri
 
 	c := proxy.Config{
 		Port:        peer.Endpoint.Port,
-		LocalKey:    wgInterface.Device.PublicKey.String(),
+		LocalKey:    wgInterface.Device.PublicKey,
 		RemoteKey:   peer.PublicKey.String(),
 		WgInterface: wgInterface,
 		IsExtClient: isExtClient,
@@ -56,7 +56,7 @@ func AddNewPeer(wgInterface *wg.WGIface, peer *wgtypes.PeerConfig, peerAddr stri
 	// }
 
 	connConf := models.ConnConfig{
-		Key:                 peer.PublicKey.String(),
+		Key:                 peer.PublicKey,
 		IsRelayed:           isRelayed,
 		RelayedEndpoint:     relayTo,
 		IsAttachedExtClient: isAttachedExtClient,

+ 1 - 1
nm-proxy/proxy/proxy.go

@@ -21,7 +21,7 @@ type Config struct {
 	BodySize    int
 	Addr        string
 	RemoteKey   string
-	LocalKey    string
+	LocalKey    wgtypes.Key
 	WgInterface *wg.WGIface
 	IsExtClient bool
 	PeerConf    *wgtypes.PeerConfig

+ 25 - 32
nm-proxy/proxy/wireguard.go

@@ -8,9 +8,12 @@ import (
 	"net"
 	"runtime"
 	"strings"
+	"time"
 
 	"github.com/c-robinson/iplib"
+	"github.com/google/uuid"
 	"github.com/gravitl/netmaker/nm-proxy/common"
+	"github.com/gravitl/netmaker/nm-proxy/metrics"
 	"github.com/gravitl/netmaker/nm-proxy/models"
 	"github.com/gravitl/netmaker/nm-proxy/packet"
 	"github.com/gravitl/netmaker/nm-proxy/server"
@@ -53,6 +56,20 @@ func (p *Proxy) ProxyToRemote() {
 			}
 
 			return
+		case <-time.After(time.Minute):
+			metrics.MetricsMapLock.Lock()
+			metric := metrics.MetricsMap[p.Config.PeerConf.PublicKey.String()]
+			metric.ConnectionStatus = false
+			metrics.MetricsMap[p.Config.PeerConf.PublicKey.String()] = metric
+			metrics.MetricsMapLock.Unlock()
+			pkt, err := packet.CreateMetricPacket(uuid.New().ID(), p.Config.LocalKey, p.Config.PeerConf.PublicKey)
+			if err == nil {
+				_, err = server.NmProxyServer.Server.WriteToUDP(pkt, p.RemoteConn)
+				if err != nil {
+					log.Println("Failed to send to metric pkt: ", err)
+				}
+
+			}
 		default:
 
 			n, err := p.LocalConn.Read(buf)
@@ -61,43 +78,21 @@ func (p *Proxy) ProxyToRemote() {
 				continue
 			}
 
-			//go func(buf []byte, n int) {
 			ifaceConf := common.WgIFaceMap[p.Config.WgInterface.Name]
 			if peerI, ok := ifaceConf.PeerMap[p.Config.RemoteKey]; ok {
+
+				metrics.MetricsMapLock.Lock()
+				metric := metrics.MetricsMap[peerI.Key.String()]
+				metric.TrafficSent += uint64(n)
+				metrics.MetricsMap[peerI.Key.String()] = metric
+				metrics.MetricsMapLock.Unlock()
+
 				var srcPeerKeyHash, dstPeerKeyHash string
 				if !p.Config.IsExtClient {
-					buf, n, srcPeerKeyHash, dstPeerKeyHash = packet.ProcessPacketBeforeSending(buf, n, ifaceConf.Iface.PublicKey.String(), peerI.Key)
+					buf, n, srcPeerKeyHash, dstPeerKeyHash = packet.ProcessPacketBeforeSending(buf, n, ifaceConf.Iface.PublicKey.String(), peerI.Key.String())
 					if err != nil {
 						log.Println("failed to process pkt before sending: ", err)
 					}
-				} else {
-					// unknown peer to proxy -> check if extclient and handle it
-					// consume handshake message for ext clients
-					// msgType := binary.LittleEndian.Uint32(buf[:n])
-					// switch msgType {
-					// case models.MessageInitiationType:
-
-					// 	devPriv, devPubkey, err := packet.GetDeviceKeys(common.InterfaceName)
-					// 	if err == nil {
-					// 		err := packet.ConsumeHandshakeInitiationMsg(true, buf[:n], p.RemoteConn, devPubkey, devPriv)
-					// 		if err != nil {
-					// 			log.Println("---------> @@@ failed to decode HS: ", err)
-					// 		}
-					// 	} else {
-					// 		log.Println("failed to get device keys: ", err)
-					// 	}
-					// case models.MessageResponseType:
-					// 	devPriv, devPubkey, err := packet.GetDeviceKeys(common.InterfaceName)
-					// 	if err == nil {
-					// 		err := packet.ConsumeMessageResponse(true, buf[:n], p.RemoteConn, devPubkey, devPriv)
-					// 		if err != nil {
-					// 			log.Println("---------> @@@ failed to decode HS: ", err)
-					// 		}
-					// 	} else {
-					// 		log.Println("failed to get device keys: ", err)
-					// 	}
-
-					// }
 				}
 
 				log.Printf("PROXING TO REMOTE!!!---> %s >>>>> %s >>>>> %s [[ SrcPeerHash: %s, DstPeerHash: %s ]]\n",
@@ -107,13 +102,11 @@ func (p *Proxy) ProxyToRemote() {
 				p.Cancel()
 				return
 			}
-			//test(n, buf)
 
 			_, err = server.NmProxyServer.Server.WriteToUDP(buf[:n], p.RemoteConn)
 			if err != nil {
 				log.Println("Failed to send to remote: ", err)
 			}
-			//}(buf, n)
 
 		}
 	}

+ 27 - 0
nm-proxy/server/server.go

@@ -9,6 +9,7 @@ import (
 	"time"
 
 	"github.com/gravitl/netmaker/nm-proxy/common"
+	"github.com/gravitl/netmaker/nm-proxy/metrics"
 	"github.com/gravitl/netmaker/nm-proxy/models"
 	"github.com/gravitl/netmaker/nm-proxy/packet"
 )
@@ -120,6 +121,12 @@ func (p *ProxyServer) Listen(ctx context.Context) {
 			if peerInfo, ok := common.PeerKeyHashMap[srcPeerKeyHash]; ok {
 				if ifaceConf, ok := common.WgIFaceMap[peerInfo.Interface]; ok {
 					if peerI, ok := ifaceConf.PeerMap[peerInfo.PeerKey]; ok {
+						metrics.MetricsMapLock.Lock()
+						metric := metrics.MetricsMap[peerInfo.PeerKey]
+						metric.TrafficRecieved += uint64(n)
+						metric.ConnectionStatus = true
+						metrics.MetricsMap[peerInfo.PeerKey] = metric
+						metrics.MetricsMapLock.Unlock()
 						log.Printf("PROXING TO LOCAL!!!---> %s <<<< %s <<<<<<<< %s   [[ RECV PKT [SRCKEYHASH: %s], [DSTKEYHASH: %s], SourceIP: [%s] ]]\n",
 							peerI.LocalConn.RemoteAddr(), peerI.LocalConn.LocalAddr(),
 							fmt.Sprintf("%s:%d", source.IP.String(), source.Port), srcPeerKeyHash, dstPeerKeyHash, source.IP.String())
@@ -137,6 +144,12 @@ func (p *ProxyServer) Listen(ctx context.Context) {
 			if peerInfo, ok := common.ExtSourceIpMap[source.String()]; ok {
 				if ifaceConf, ok := common.WgIFaceMap[peerInfo.Interface]; ok {
 					if peerI, ok := ifaceConf.PeerMap[peerInfo.PeerKey]; ok {
+						metrics.MetricsMapLock.Lock()
+						metric := metrics.MetricsMap[peerInfo.PeerKey]
+						metric.TrafficRecieved += uint64(n)
+						metric.ConnectionStatus = true
+						metrics.MetricsMap[peerInfo.PeerKey] = metric
+						metrics.MetricsMapLock.Unlock()
 						log.Printf("PROXING TO LOCAL!!!---> %s <<<< %s <<<<<<<< %s   [[ RECV PKT [SRCKEYHASH: %s], [DSTKEYHASH: %s], SourceIP: [%s] ]]\n",
 							peerI.LocalConn.RemoteAddr(), peerI.LocalConn.LocalAddr(),
 							fmt.Sprintf("%s:%d", source.IP.String(), source.Port), srcPeerKeyHash, dstPeerKeyHash, source.IP.String())
@@ -154,6 +167,20 @@ func (p *ProxyServer) Listen(ctx context.Context) {
 			// consume handshake message for ext clients
 			msgType := binary.LittleEndian.Uint32(buffer[:4])
 			switch msgType {
+			case packet.MessageMetricsType:
+				metricMsg, err := packet.ConsumeMetricPacket(buffer[:origBufferLen])
+				// calc latency
+				if err == nil {
+					latency := time.Now().UnixMilli() - metricMsg.TimeStamp
+					metrics.MetricsMapLock.Lock()
+					metric := metrics.MetricsMap[metricMsg.Reciever.PublicKey().String()]
+					metric.LastRecordedLatency = latency
+					metric.ConnectionStatus = true
+					metric.TrafficRecieved += uint64(origBufferLen)
+					metrics.MetricsMap[metricMsg.Reciever.PublicKey().String()] = metric
+					metrics.MetricsMapLock.Unlock()
+				}
+
 			case packet.MessageInitiationType:
 
 				devPriv, devPubkey, err := packet.GetDeviceKeys(common.InterfaceName)