server.go 8.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285
  1. package server
  2. import (
  3. "context"
  4. "encoding/binary"
  5. "fmt"
  6. "log"
  7. "net"
  8. "time"
  9. "github.com/gravitl/netmaker/nm-proxy/common"
  10. "github.com/gravitl/netmaker/nm-proxy/metrics"
  11. "github.com/gravitl/netmaker/nm-proxy/models"
  12. "github.com/gravitl/netmaker/nm-proxy/packet"
  13. )
  14. var (
  15. NmProxyServer = &ProxyServer{}
  16. )
  17. const (
  18. defaultBodySize = 10000
  19. defaultPort = models.NmProxyPort
  20. )
  21. type Config struct {
  22. Port int
  23. BodySize int
  24. IsRelay bool
  25. Addr net.Addr
  26. }
  27. type ProxyServer struct {
  28. Config Config
  29. Server *net.UDPConn
  30. }
  31. func (p *ProxyServer) Close() {
  32. log.Println("--------->### Shutting down Proxy.....")
  33. // clean up proxy connections
  34. for _, peerI := range common.WgIfaceMap.PeerMap {
  35. peerI.Mutex.Lock()
  36. peerI.StopConn()
  37. peerI.Mutex.Unlock()
  38. }
  39. // close server connection
  40. NmProxyServer.Server.Close()
  41. }
  42. // Proxy.Listen - begins listening for packets
  43. func (p *ProxyServer) Listen(ctx context.Context) {
  44. // Buffer with indicated body size
  45. buffer := make([]byte, 65036)
  46. for {
  47. select {
  48. case <-ctx.Done():
  49. p.Close()
  50. return
  51. default:
  52. // Read Packet
  53. n, source, err := p.Server.ReadFromUDP(buffer)
  54. if err != nil || source == nil { // in future log errors?
  55. log.Println("RECV ERROR: ", err)
  56. continue
  57. }
  58. //go func(buffer []byte, source *net.UDPAddr, n int) {
  59. proxyTransportMsg := true
  60. var srcPeerKeyHash, dstPeerKeyHash string
  61. n, srcPeerKeyHash, dstPeerKeyHash, err = packet.ExtractInfo(buffer, n)
  62. if err != nil {
  63. log.Println("proxy transport message not found: ", err)
  64. proxyTransportMsg = false
  65. }
  66. if proxyTransportMsg {
  67. p.proxyIncomingPacket(buffer[:], source, n, srcPeerKeyHash, dstPeerKeyHash)
  68. continue
  69. } else {
  70. // unknown peer to proxy -> check if extclient and handle it
  71. if handleExtClients(buffer[:], n, source) {
  72. continue
  73. }
  74. }
  75. handleMsgs(buffer, n, source)
  76. }
  77. }
  78. }
  79. func handleMsgs(buffer []byte, n int, source *net.UDPAddr) {
  80. msgType := binary.LittleEndian.Uint32(buffer[:4])
  81. switch packet.MessageType(msgType) {
  82. case packet.MessageMetricsType:
  83. metricMsg, err := packet.ConsumeMetricPacket(buffer[:n])
  84. // calc latency
  85. if err == nil {
  86. log.Printf("------->$$$$$ Recieved Metric Pkt: %+v, FROM:%s\n", metricMsg, source.String())
  87. if metricMsg.Sender == common.WgIfaceMap.Iface.PublicKey {
  88. latency := time.Now().UnixMilli() - metricMsg.TimeStamp
  89. metrics.MetricsMapLock.Lock()
  90. metric := metrics.MetricsMap[metricMsg.Reciever.String()]
  91. metric.LastRecordedLatency = uint64(latency)
  92. metric.ConnectionStatus = true
  93. metric.TrafficRecieved += float64(n) / (1 << 20)
  94. metrics.MetricsMap[metricMsg.Reciever.String()] = metric
  95. metrics.MetricsMapLock.Unlock()
  96. } else if metricMsg.Reciever == common.WgIfaceMap.Iface.PublicKey {
  97. // proxy it back to the sender
  98. log.Println("------------> $$$ SENDING back the metric pkt to the source: ", source.String())
  99. _, err = NmProxyServer.Server.WriteToUDP(buffer[:n], source)
  100. if err != nil {
  101. log.Println("Failed to send metric packet to remote: ", err)
  102. }
  103. metrics.MetricsMapLock.Lock()
  104. metric := metrics.MetricsMap[metricMsg.Sender.String()]
  105. metric.ConnectionStatus = true
  106. metric.TrafficRecieved += float64(n) / (1 << 20)
  107. metrics.MetricsMap[metricMsg.Sender.String()] = metric
  108. metrics.MetricsMapLock.Unlock()
  109. }
  110. }
  111. case packet.MessageProxyUpdateType:
  112. msg, err := packet.ConsumeProxyUpdateMsg(buffer[:n])
  113. if err == nil {
  114. switch msg.Action {
  115. case packet.UpdateListenPort:
  116. if peer, ok := common.WgIfaceMap.PeerMap[msg.Sender.String()]; ok {
  117. peer.Mutex.Lock()
  118. if peer.Config.PeerEndpoint.Port != int(msg.ListenPort) {
  119. // update peer conn
  120. peer.Config.PeerEndpoint.Port = int(msg.ListenPort)
  121. common.WgIfaceMap.PeerMap[msg.Sender.String()] = peer
  122. log.Println("--------> Resetting Proxy Conn For Peer ", msg.Sender.String())
  123. peer.Mutex.Unlock()
  124. peer.ResetConn()
  125. return
  126. }
  127. peer.Mutex.Unlock()
  128. }
  129. }
  130. }
  131. // consume handshake message for ext clients
  132. case packet.MessageInitiationType:
  133. err := packet.ConsumeHandshakeInitiationMsg(false, buffer[:n], source,
  134. packet.NoisePublicKey(common.WgIfaceMap.Iface.PublicKey), packet.NoisePrivateKey(common.WgIfaceMap.Iface.PrivateKey))
  135. if err != nil {
  136. log.Println("---------> @@@ failed to decode HS: ", err)
  137. }
  138. }
  139. }
  140. func handleExtClients(buffer []byte, n int, source *net.UDPAddr) bool {
  141. isExtClient := false
  142. if peerInfo, ok := common.ExtSourceIpMap[source.String()]; ok {
  143. if peerI, ok := common.WgIfaceMap.PeerMap[peerInfo.PeerKey]; ok {
  144. peerI.Mutex.RLock()
  145. peerI.Config.RecieverChan <- buffer[:n]
  146. metrics.MetricsMapLock.Lock()
  147. metric := metrics.MetricsMap[peerInfo.PeerKey]
  148. metric.TrafficRecieved += float64(n) / (1 << 20)
  149. metric.ConnectionStatus = true
  150. metrics.MetricsMap[peerInfo.PeerKey] = metric
  151. metrics.MetricsMapLock.Unlock()
  152. peerI.Mutex.RUnlock()
  153. isExtClient = true
  154. }
  155. }
  156. return isExtClient
  157. }
  158. func (p *ProxyServer) proxyIncomingPacket(buffer []byte, source *net.UDPAddr, n int, srcPeerKeyHash, dstPeerKeyHash string) {
  159. var err error
  160. //log.Printf("--------> RECV PKT , [SRCKEYHASH: %s], SourceIP: [%s] \n", srcPeerKeyHash, source.IP.String())
  161. if common.WgIfaceMap.IfaceKeyHash != dstPeerKeyHash && common.IsRelay {
  162. log.Println("----------> Relaying######")
  163. // check for routing map and forward to right proxy
  164. if remoteMap, ok := common.RelayPeerMap[srcPeerKeyHash]; ok {
  165. if conf, ok := remoteMap[dstPeerKeyHash]; ok {
  166. log.Printf("--------> Relaying PKT [ SourceIP: %s:%d ], [ SourceKeyHash: %s ], [ DstIP: %s:%d ], [ DstHashKey: %s ] \n",
  167. source.IP.String(), source.Port, srcPeerKeyHash, conf.Endpoint.String(), conf.Endpoint.Port, dstPeerKeyHash)
  168. _, err = p.Server.WriteToUDP(buffer[:n+packet.MessageProxySize], conf.Endpoint)
  169. if err != nil {
  170. log.Println("Failed to send to remote: ", err)
  171. }
  172. return
  173. }
  174. } else {
  175. if remoteMap, ok := common.RelayPeerMap[dstPeerKeyHash]; ok {
  176. if conf, ok := remoteMap[dstPeerKeyHash]; ok {
  177. log.Printf("--------> Relaying BACK TO RELAYED NODE PKT [ SourceIP: %s ], [ SourceKeyHash: %s ], [ DstIP: %s ], [ DstHashKey: %s ] \n",
  178. source.String(), srcPeerKeyHash, conf.Endpoint.String(), dstPeerKeyHash)
  179. _, err = p.Server.WriteToUDP(buffer[:n+packet.MessageProxySize], conf.Endpoint)
  180. if err != nil {
  181. log.Println("Failed to send to remote: ", err)
  182. }
  183. return
  184. }
  185. }
  186. }
  187. }
  188. if peerInfo, ok := common.PeerKeyHashMap[srcPeerKeyHash]; ok {
  189. log.Printf("PROXING TO LOCAL!!!---> %s <<<< %s <<<<<<<< %s [[ RECV PKT [SRCKEYHASH: %s], [DSTKEYHASH: %s], SourceIP: [%s] ]]\n",
  190. peerInfo.LocalConn.RemoteAddr(), peerInfo.LocalConn.LocalAddr(),
  191. fmt.Sprintf("%s:%d", source.IP.String(), source.Port), srcPeerKeyHash, dstPeerKeyHash, source.IP.String())
  192. _, err = peerInfo.LocalConn.Write(buffer[:n])
  193. if err != nil {
  194. log.Println("Failed to proxy to Wg local interface: ", err)
  195. //continue
  196. }
  197. go func(n int, peerKey string) {
  198. metrics.MetricsMapLock.Lock()
  199. metric := metrics.MetricsMap[peerKey]
  200. metric.TrafficRecieved += float64(n) / (1 << 20)
  201. metric.ConnectionStatus = true
  202. metrics.MetricsMap[peerKey] = metric
  203. metrics.MetricsMapLock.Unlock()
  204. }(n, peerInfo.PeerKey)
  205. return
  206. }
  207. }
  208. // Create - creats a proxy listener
  209. // port - port for proxy to listen on localhost
  210. // bodySize - default 10000, leave 0 to use default
  211. // addr - the address for proxy to listen on
  212. // forwards - indicate address to forward to, {"<address:port>",...} format
  213. func (p *ProxyServer) CreateProxyServer(port, bodySize int, addr string) (err error) {
  214. if p == nil {
  215. p = &ProxyServer{}
  216. }
  217. p.Config.Port = port
  218. p.Config.BodySize = bodySize
  219. p.setDefaults()
  220. p.Server, err = net.ListenUDP("udp", &net.UDPAddr{
  221. Port: p.Config.Port,
  222. IP: net.ParseIP(addr),
  223. })
  224. return
  225. }
  226. func (p *ProxyServer) KeepAlive(ip string, port int) {
  227. for {
  228. _, _ = p.Server.WriteToUDP([]byte("hello-proxy"), &net.UDPAddr{
  229. IP: net.ParseIP(ip),
  230. Port: port,
  231. })
  232. //log.Println("Sending MSg: ", ip, port, err)
  233. time.Sleep(time.Second * 5)
  234. }
  235. }
  236. // Proxy.setDefaults - sets all defaults of proxy listener
  237. func (p *ProxyServer) setDefaults() {
  238. p.setDefaultBodySize()
  239. p.setDefaultPort()
  240. }
  241. // Proxy.setDefaultPort - sets default port of Proxy listener if 0
  242. func (p *ProxyServer) setDefaultPort() {
  243. if p.Config.Port == 0 {
  244. p.Config.Port = defaultPort
  245. }
  246. }
  247. // Proxy.setDefaultBodySize - sets default body size of Proxy listener if 0
  248. func (p *ProxyServer) setDefaultBodySize() {
  249. if p.Config.BodySize == 0 {
  250. p.Config.BodySize = defaultBodySize
  251. }
  252. }