123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285 |
- package server
- import (
- "context"
- "encoding/binary"
- "fmt"
- "log"
- "net"
- "time"
- "github.com/gravitl/netmaker/nm-proxy/common"
- "github.com/gravitl/netmaker/nm-proxy/metrics"
- "github.com/gravitl/netmaker/nm-proxy/models"
- "github.com/gravitl/netmaker/nm-proxy/packet"
- )
- var (
- NmProxyServer = &ProxyServer{}
- )
- const (
- defaultBodySize = 10000
- defaultPort = models.NmProxyPort
- )
// Config holds the settings of a ProxyServer.
type Config struct {
	// Port is the UDP port the server listens on. A value of 0 is replaced by
	// defaultPort when the server is created.
	Port int

	// BodySize is the configured body size. A value of 0 is replaced by
	// defaultBodySize when the server is created.
	// NOTE(review): Listen allocates a fixed-size read buffer and does not
	// consult this field in the code visible here — confirm whether it is
	// still needed.
	BodySize int

	// IsRelay is not read by the code visible here.
	// NOTE(review): presumably marks this node as a relay — confirm against callers.
	IsRelay bool

	// Addr is not read by the code visible here.
	// NOTE(review): presumably the listen address — confirm against callers.
	Addr net.Addr
}
- type ProxyServer struct {
- Config Config
- Server *net.UDPConn
- }
- func (p *ProxyServer) Close() {
- log.Println("--------->### Shutting down Proxy.....")
- // clean up proxy connections
- for _, peerI := range common.WgIfaceMap.PeerMap {
- peerI.Mutex.Lock()
- peerI.StopConn()
- peerI.Mutex.Unlock()
- }
- // close server connection
- NmProxyServer.Server.Close()
- }
- // Proxy.Listen - begins listening for packets
- func (p *ProxyServer) Listen(ctx context.Context) {
- // Buffer with indicated body size
- buffer := make([]byte, 65036)
- for {
- select {
- case <-ctx.Done():
- p.Close()
- return
- default:
- // Read Packet
- n, source, err := p.Server.ReadFromUDP(buffer)
- if err != nil || source == nil { // in future log errors?
- log.Println("RECV ERROR: ", err)
- continue
- }
- //go func(buffer []byte, source *net.UDPAddr, n int) {
- proxyTransportMsg := true
- var srcPeerKeyHash, dstPeerKeyHash string
- n, srcPeerKeyHash, dstPeerKeyHash, err = packet.ExtractInfo(buffer, n)
- if err != nil {
- log.Println("proxy transport message not found: ", err)
- proxyTransportMsg = false
- }
- if proxyTransportMsg {
- p.proxyIncomingPacket(buffer[:], source, n, srcPeerKeyHash, dstPeerKeyHash)
- continue
- } else {
- // unknown peer to proxy -> check if extclient and handle it
- if handleExtClients(buffer[:], n, source) {
- continue
- }
- }
- handleMsgs(buffer, n, source)
- }
- }
- }
- func handleMsgs(buffer []byte, n int, source *net.UDPAddr) {
- msgType := binary.LittleEndian.Uint32(buffer[:4])
- switch packet.MessageType(msgType) {
- case packet.MessageMetricsType:
- metricMsg, err := packet.ConsumeMetricPacket(buffer[:n])
- // calc latency
- if err == nil {
- log.Printf("------->$$$$$ Recieved Metric Pkt: %+v, FROM:%s\n", metricMsg, source.String())
- if metricMsg.Sender == common.WgIfaceMap.Iface.PublicKey {
- latency := time.Now().UnixMilli() - metricMsg.TimeStamp
- metrics.MetricsMapLock.Lock()
- metric := metrics.MetricsMap[metricMsg.Reciever.String()]
- metric.LastRecordedLatency = uint64(latency)
- metric.ConnectionStatus = true
- metric.TrafficRecieved += float64(n) / (1 << 20)
- metrics.MetricsMap[metricMsg.Reciever.String()] = metric
- metrics.MetricsMapLock.Unlock()
- } else if metricMsg.Reciever == common.WgIfaceMap.Iface.PublicKey {
- // proxy it back to the sender
- log.Println("------------> $$$ SENDING back the metric pkt to the source: ", source.String())
- _, err = NmProxyServer.Server.WriteToUDP(buffer[:n], source)
- if err != nil {
- log.Println("Failed to send metric packet to remote: ", err)
- }
- metrics.MetricsMapLock.Lock()
- metric := metrics.MetricsMap[metricMsg.Sender.String()]
- metric.ConnectionStatus = true
- metric.TrafficRecieved += float64(n) / (1 << 20)
- metrics.MetricsMap[metricMsg.Sender.String()] = metric
- metrics.MetricsMapLock.Unlock()
- }
- }
- case packet.MessageProxyUpdateType:
- msg, err := packet.ConsumeProxyUpdateMsg(buffer[:n])
- if err == nil {
- switch msg.Action {
- case packet.UpdateListenPort:
- if peer, ok := common.WgIfaceMap.PeerMap[msg.Sender.String()]; ok {
- peer.Mutex.Lock()
- if peer.Config.PeerEndpoint.Port != int(msg.ListenPort) {
- // update peer conn
- peer.Config.PeerEndpoint.Port = int(msg.ListenPort)
- common.WgIfaceMap.PeerMap[msg.Sender.String()] = peer
- log.Println("--------> Resetting Proxy Conn For Peer ", msg.Sender.String())
- peer.Mutex.Unlock()
- peer.ResetConn()
- return
- }
- peer.Mutex.Unlock()
- }
- }
- }
- // consume handshake message for ext clients
- case packet.MessageInitiationType:
- err := packet.ConsumeHandshakeInitiationMsg(false, buffer[:n], source,
- packet.NoisePublicKey(common.WgIfaceMap.Iface.PublicKey), packet.NoisePrivateKey(common.WgIfaceMap.Iface.PrivateKey))
- if err != nil {
- log.Println("---------> @@@ failed to decode HS: ", err)
- }
- }
- }
- func handleExtClients(buffer []byte, n int, source *net.UDPAddr) bool {
- isExtClient := false
- if peerInfo, ok := common.ExtSourceIpMap[source.String()]; ok {
- if peerI, ok := common.WgIfaceMap.PeerMap[peerInfo.PeerKey]; ok {
- peerI.Mutex.RLock()
- peerI.Config.RecieverChan <- buffer[:n]
- metrics.MetricsMapLock.Lock()
- metric := metrics.MetricsMap[peerInfo.PeerKey]
- metric.TrafficRecieved += float64(n) / (1 << 20)
- metric.ConnectionStatus = true
- metrics.MetricsMap[peerInfo.PeerKey] = metric
- metrics.MetricsMapLock.Unlock()
- peerI.Mutex.RUnlock()
- isExtClient = true
- }
- }
- return isExtClient
- }
- func (p *ProxyServer) proxyIncomingPacket(buffer []byte, source *net.UDPAddr, n int, srcPeerKeyHash, dstPeerKeyHash string) {
- var err error
- //log.Printf("--------> RECV PKT , [SRCKEYHASH: %s], SourceIP: [%s] \n", srcPeerKeyHash, source.IP.String())
- if common.WgIfaceMap.IfaceKeyHash != dstPeerKeyHash && common.IsRelay {
- log.Println("----------> Relaying######")
- // check for routing map and forward to right proxy
- if remoteMap, ok := common.RelayPeerMap[srcPeerKeyHash]; ok {
- if conf, ok := remoteMap[dstPeerKeyHash]; ok {
- log.Printf("--------> Relaying PKT [ SourceIP: %s:%d ], [ SourceKeyHash: %s ], [ DstIP: %s:%d ], [ DstHashKey: %s ] \n",
- source.IP.String(), source.Port, srcPeerKeyHash, conf.Endpoint.String(), conf.Endpoint.Port, dstPeerKeyHash)
- _, err = p.Server.WriteToUDP(buffer[:n+packet.MessageProxySize], conf.Endpoint)
- if err != nil {
- log.Println("Failed to send to remote: ", err)
- }
- return
- }
- } else {
- if remoteMap, ok := common.RelayPeerMap[dstPeerKeyHash]; ok {
- if conf, ok := remoteMap[dstPeerKeyHash]; ok {
- log.Printf("--------> Relaying BACK TO RELAYED NODE PKT [ SourceIP: %s ], [ SourceKeyHash: %s ], [ DstIP: %s ], [ DstHashKey: %s ] \n",
- source.String(), srcPeerKeyHash, conf.Endpoint.String(), dstPeerKeyHash)
- _, err = p.Server.WriteToUDP(buffer[:n+packet.MessageProxySize], conf.Endpoint)
- if err != nil {
- log.Println("Failed to send to remote: ", err)
- }
- return
- }
- }
- }
- }
- if peerInfo, ok := common.PeerKeyHashMap[srcPeerKeyHash]; ok {
- log.Printf("PROXING TO LOCAL!!!---> %s <<<< %s <<<<<<<< %s [[ RECV PKT [SRCKEYHASH: %s], [DSTKEYHASH: %s], SourceIP: [%s] ]]\n",
- peerInfo.LocalConn.RemoteAddr(), peerInfo.LocalConn.LocalAddr(),
- fmt.Sprintf("%s:%d", source.IP.String(), source.Port), srcPeerKeyHash, dstPeerKeyHash, source.IP.String())
- _, err = peerInfo.LocalConn.Write(buffer[:n])
- if err != nil {
- log.Println("Failed to proxy to Wg local interface: ", err)
- //continue
- }
- go func(n int, peerKey string) {
- metrics.MetricsMapLock.Lock()
- metric := metrics.MetricsMap[peerKey]
- metric.TrafficRecieved += float64(n) / (1 << 20)
- metric.ConnectionStatus = true
- metrics.MetricsMap[peerKey] = metric
- metrics.MetricsMapLock.Unlock()
- }(n, peerInfo.PeerKey)
- return
- }
- }
- // Create - creats a proxy listener
- // port - port for proxy to listen on localhost
- // bodySize - default 10000, leave 0 to use default
- // addr - the address for proxy to listen on
- // forwards - indicate address to forward to, {"<address:port>",...} format
- func (p *ProxyServer) CreateProxyServer(port, bodySize int, addr string) (err error) {
- if p == nil {
- p = &ProxyServer{}
- }
- p.Config.Port = port
- p.Config.BodySize = bodySize
- p.setDefaults()
- p.Server, err = net.ListenUDP("udp", &net.UDPAddr{
- Port: p.Config.Port,
- IP: net.ParseIP(addr),
- })
- return
- }
- func (p *ProxyServer) KeepAlive(ip string, port int) {
- for {
- _, _ = p.Server.WriteToUDP([]byte("hello-proxy"), &net.UDPAddr{
- IP: net.ParseIP(ip),
- Port: port,
- })
- //log.Println("Sending MSg: ", ip, port, err)
- time.Sleep(time.Second * 5)
- }
- }
- // Proxy.setDefaults - sets all defaults of proxy listener
- func (p *ProxyServer) setDefaults() {
- p.setDefaultBodySize()
- p.setDefaultPort()
- }
- // Proxy.setDefaultPort - sets default port of Proxy listener if 0
- func (p *ProxyServer) setDefaultPort() {
- if p.Config.Port == 0 {
- p.Config.Port = defaultPort
- }
- }
- // Proxy.setDefaultBodySize - sets default body size of Proxy listener if 0
- func (p *ProxyServer) setDefaultBodySize() {
- if p.Config.BodySize == 0 {
- p.Config.BodySize = defaultBodySize
- }
- }
|