| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285 | package serverimport (	"context"	"encoding/binary"	"fmt"	"log"	"net"	"time"	"github.com/gravitl/netmaker/nm-proxy/common"	"github.com/gravitl/netmaker/nm-proxy/metrics"	"github.com/gravitl/netmaker/nm-proxy/models"	"github.com/gravitl/netmaker/nm-proxy/packet")var (	NmProxyServer = &ProxyServer{})const (	defaultBodySize = 10000	defaultPort     = models.NmProxyPort)type Config struct {	Port     int	BodySize int	IsRelay  bool	Addr     net.Addr}type ProxyServer struct {	Config Config	Server *net.UDPConn}func (p *ProxyServer) Close() {	log.Println("--------->### Shutting down Proxy.....")	// clean up proxy connections	for _, peerI := range common.WgIfaceMap.PeerMap {		peerI.Mutex.Lock()		peerI.StopConn()		peerI.Mutex.Unlock()	}	// close server connection	NmProxyServer.Server.Close()}// Proxy.Listen - begins listening for packetsfunc (p *ProxyServer) Listen(ctx context.Context) {	// Buffer with indicated body size	buffer := make([]byte, 65036)	for {		select {		case <-ctx.Done():			p.Close()			return		default:			// Read Packet			n, source, err := p.Server.ReadFromUDP(buffer)			if err != nil || source == nil { // in future log errors?				log.Println("RECV ERROR: ", err)				continue			}			//go func(buffer []byte, source *net.UDPAddr, n int) {			proxyTransportMsg := true			var srcPeerKeyHash, dstPeerKeyHash string			n, srcPeerKeyHash, dstPeerKeyHash, err = packet.ExtractInfo(buffer, n)			if err != nil {				log.Println("proxy transport message not found: ", err)				proxyTransportMsg = false			}			if proxyTransportMsg {				p.proxyIncomingPacket(buffer[:], source, n, srcPeerKeyHash, dstPeerKeyHash)				continue			} else {				// unknown peer to proxy -> check if extclient and handle it				if handleExtClients(buffer[:], n, source) {					continue				}			}			handleMsgs(buffer, n, source)		}	}}func handleMsgs(buffer []byte, n int, source *net.UDPAddr) {	msgType := binary.LittleEndian.Uint32(buffer[:4])	switch packet.MessageType(msgType) {	case packet.MessageMetricsType:		metricMsg, err := packet.ConsumeMetricPacket(buffer[:n])		// calc latency		if err == nil {			log.Printf("------->$$$$$ Recieved Metric Pkt: %+v, FROM:%s\n", metricMsg, source.String())			if metricMsg.Sender == common.WgIfaceMap.Iface.PublicKey {				latency := time.Now().UnixMilli() - metricMsg.TimeStamp				metrics.MetricsMapLock.Lock()				metric := metrics.MetricsMap[metricMsg.Reciever.String()]				metric.LastRecordedLatency = uint64(latency)				metric.ConnectionStatus = true				metric.TrafficRecieved += float64(n) / (1 << 20)				metrics.MetricsMap[metricMsg.Reciever.String()] = metric				metrics.MetricsMapLock.Unlock()			} else if metricMsg.Reciever == common.WgIfaceMap.Iface.PublicKey {				// proxy it back to the sender				log.Println("------------> $$$ SENDING back the metric pkt to the source: ", source.String())				_, err = NmProxyServer.Server.WriteToUDP(buffer[:n], source)				if err != nil {					log.Println("Failed to send metric packet to remote: ", err)				}				metrics.MetricsMapLock.Lock()				metric := metrics.MetricsMap[metricMsg.Sender.String()]				metric.ConnectionStatus = true				metric.TrafficRecieved += float64(n) / (1 << 20)				metrics.MetricsMap[metricMsg.Sender.String()] = metric				metrics.MetricsMapLock.Unlock()			}		}	case packet.MessageProxyUpdateType:		msg, err := packet.ConsumeProxyUpdateMsg(buffer[:n])		if err == nil {			switch msg.Action {			case packet.UpdateListenPort:				if peer, ok := common.WgIfaceMap.PeerMap[msg.Sender.String()]; ok {					peer.Mutex.Lock()					if peer.Config.PeerEndpoint.Port != int(msg.ListenPort) {						// update peer conn						peer.Config.PeerEndpoint.Port = int(msg.ListenPort)						common.WgIfaceMap.PeerMap[msg.Sender.String()] = peer						log.Println("--------> Resetting Proxy Conn For Peer ", msg.Sender.String())						peer.Mutex.Unlock()						peer.ResetConn()						return					}					peer.Mutex.Unlock()				}			}		}	// consume handshake message for ext clients	case packet.MessageInitiationType:		err := packet.ConsumeHandshakeInitiationMsg(false, buffer[:n], source,			packet.NoisePublicKey(common.WgIfaceMap.Iface.PublicKey), packet.NoisePrivateKey(common.WgIfaceMap.Iface.PrivateKey))		if err != nil {			log.Println("---------> @@@ failed to decode HS: ", err)		}	}}func handleExtClients(buffer []byte, n int, source *net.UDPAddr) bool {	isExtClient := false	if peerInfo, ok := common.ExtSourceIpMap[source.String()]; ok {		if peerI, ok := common.WgIfaceMap.PeerMap[peerInfo.PeerKey]; ok {			peerI.Mutex.RLock()			peerI.Config.RecieverChan <- buffer[:n]			metrics.MetricsMapLock.Lock()			metric := metrics.MetricsMap[peerInfo.PeerKey]			metric.TrafficRecieved += float64(n) / (1 << 20)			metric.ConnectionStatus = true			metrics.MetricsMap[peerInfo.PeerKey] = metric			metrics.MetricsMapLock.Unlock()			peerI.Mutex.RUnlock()			isExtClient = true		}	}	return isExtClient}func (p *ProxyServer) proxyIncomingPacket(buffer []byte, source *net.UDPAddr, n int, srcPeerKeyHash, dstPeerKeyHash string) {	var err error	//log.Printf("--------> RECV PKT , [SRCKEYHASH: %s], SourceIP: [%s] \n", srcPeerKeyHash, source.IP.String())	if common.WgIfaceMap.IfaceKeyHash != dstPeerKeyHash && common.IsRelay {		log.Println("----------> Relaying######")		// check for routing map and forward to right proxy		if remoteMap, ok := common.RelayPeerMap[srcPeerKeyHash]; ok {			if conf, ok := remoteMap[dstPeerKeyHash]; ok {				log.Printf("--------> Relaying PKT [ SourceIP: %s:%d ], [ SourceKeyHash: %s ], [ DstIP: %s:%d ], [ DstHashKey: %s ] \n",					source.IP.String(), source.Port, srcPeerKeyHash, conf.Endpoint.String(), conf.Endpoint.Port, dstPeerKeyHash)				_, err = p.Server.WriteToUDP(buffer[:n+packet.MessageProxySize], conf.Endpoint)				if err != nil {					log.Println("Failed to send to remote: ", err)				}				return			}		} else {			if remoteMap, ok := common.RelayPeerMap[dstPeerKeyHash]; ok {				if conf, ok := remoteMap[dstPeerKeyHash]; ok {					log.Printf("--------> Relaying BACK TO RELAYED NODE PKT [ SourceIP: %s ], [ SourceKeyHash: %s ], [ DstIP: %s ], [ DstHashKey: %s ] \n",						source.String(), srcPeerKeyHash, conf.Endpoint.String(), dstPeerKeyHash)					_, err = p.Server.WriteToUDP(buffer[:n+packet.MessageProxySize], conf.Endpoint)					if err != nil {						log.Println("Failed to send to remote: ", err)					}					return				}			}		}	}	if peerInfo, ok := common.PeerKeyHashMap[srcPeerKeyHash]; ok {		log.Printf("PROXING TO LOCAL!!!---> %s <<<< %s <<<<<<<< %s   [[ RECV PKT [SRCKEYHASH: %s], [DSTKEYHASH: %s], SourceIP: [%s] ]]\n",			peerInfo.LocalConn.RemoteAddr(), peerInfo.LocalConn.LocalAddr(),			fmt.Sprintf("%s:%d", source.IP.String(), source.Port), srcPeerKeyHash, dstPeerKeyHash, source.IP.String())		_, err = peerInfo.LocalConn.Write(buffer[:n])		if err != nil {			log.Println("Failed to proxy to Wg local interface: ", err)			//continue		}		go func(n int, peerKey string) {			metrics.MetricsMapLock.Lock()			metric := metrics.MetricsMap[peerKey]			metric.TrafficRecieved += float64(n) / (1 << 20)			metric.ConnectionStatus = true			metrics.MetricsMap[peerKey] = metric			metrics.MetricsMapLock.Unlock()		}(n, peerInfo.PeerKey)		return	}}// Create - creats a proxy listener// port - port for proxy to listen on localhost// bodySize - default 10000, leave 0 to use default// addr - the address for proxy to listen on// forwards - indicate address to forward to, {"<address:port>",...} formatfunc (p *ProxyServer) CreateProxyServer(port, bodySize int, addr string) (err error) {	if p == nil {		p = &ProxyServer{}	}	p.Config.Port = port	p.Config.BodySize = bodySize	p.setDefaults()	p.Server, err = net.ListenUDP("udp", &net.UDPAddr{		Port: p.Config.Port,		IP:   net.ParseIP(addr),	})	return}func (p *ProxyServer) KeepAlive(ip string, port int) {	for {		_, _ = p.Server.WriteToUDP([]byte("hello-proxy"), &net.UDPAddr{			IP:   net.ParseIP(ip),			Port: port,		})		//log.Println("Sending MSg: ", ip, port, err)		time.Sleep(time.Second * 5)	}}// Proxy.setDefaults - sets all defaults of proxy listenerfunc (p *ProxyServer) setDefaults() {	p.setDefaultBodySize()	p.setDefaultPort()}// Proxy.setDefaultPort - sets default port of Proxy listener if 0func (p *ProxyServer) setDefaultPort() {	if p.Config.Port == 0 {		p.Config.Port = defaultPort	}}// Proxy.setDefaultBodySize - sets default body size of Proxy listener if 0func (p *ProxyServer) setDefaultBodySize() {	if p.Config.BodySize == 0 {		p.Config.BodySize = defaultBodySize	}}
 |