manager.go 15 KB


  1. package manager
  2. import (
  3. "context"
  4. "crypto/md5"
  5. "errors"
  6. "fmt"
  7. "log"
  8. "net"
  9. "reflect"
  10. "runtime"
  11. "github.com/gravitl/netmaker/nm-proxy/common"
  12. "github.com/gravitl/netmaker/nm-proxy/models"
  13. peerpkg "github.com/gravitl/netmaker/nm-proxy/peer"
  14. "github.com/gravitl/netmaker/nm-proxy/proxy"
  15. "github.com/gravitl/netmaker/nm-proxy/wg"
  16. "golang.zx2c4.com/wireguard/wgctrl/wgtypes"
  17. )
  18. /*
  19. TODO:-
  20. 1. ON Ingress node
  21. --> for attached ext clients
  22. -> start sniffer (will receive pkts from ext clients (add eBPF filter to listen only on ext traffic); if a pkt is not intended for the interface, forward it.)
  23. -> start remote conn after endpoint is updated
  24. -->
  25. */
  // sent is a package-level flag. Nothing in the visible part of this file
  // reads or writes it.
  // NOTE(review): confirm whether it is still needed.
  var sent bool

  // ProxyAction names an operation that the proxy manager is asked to perform.
  type ProxyAction string
  28. type ManagerPayload struct {
  29. InterfaceName string `json:"interface_name"`
  30. WgAddr string `json:"wg_addr"`
  31. Peers []wgtypes.PeerConfig `json:"peers"`
  32. PeerMap map[string]PeerConf `json:"peer_map"`
  33. IsRelayed bool `json:"is_relayed"`
  34. IsIngress bool `json:"is_ingress"`
  35. RelayedTo *net.UDPAddr `json:"relayed_to"`
  36. IsRelay bool `json:"is_relay"`
  37. RelayedPeerConf map[string]RelayedConf `json:"relayed_conf"`
  38. }
  39. type RelayedConf struct {
  40. RelayedPeerEndpoint *net.UDPAddr `json:"relayed_peer_endpoint"`
  41. RelayedPeerPubKey string `json:"relayed_peer_pub_key"`
  42. Peers []wgtypes.PeerConfig `json:"relayed_peers"`
  43. }
  // PeerConf holds the proxy-related settings of a single peer.
  type PeerConf struct {
  	// IsExtClient marks the peer as an ext client.
  	IsExtClient bool `json:"is_ext_client"`
  	// Address is handed to peerpkg.AddNewPeer when the peer is added.
  	Address string `json:"address"`
  	// IsAttachedExtClient marks an ext client whose endpoint is not known
  	// up front: it is added only once an endpoint arrives on its wait
  	// channel.
  	IsAttachedExtClient bool `json:"is_attached_ext_client"`
  	// IngressGatewayEndPoint replaces the peer endpoint for ext clients that
  	// are not attached ext clients.
  	IngressGatewayEndPoint *net.UDPAddr `json:"ingress_gateway_endpoint"`
  	// IsRelayed reports whether the peer is relayed; used when the node
  	// itself is not relayed.
  	IsRelayed bool `json:"is_relayed"`
  	// RelayedTo is the relay endpoint for this peer; used when the node
  	// itself is not relayed.
  	RelayedTo *net.UDPAddr `json:"relayed_to"`
  	// Proxy reports whether traffic for this peer goes through the proxy.
  	// When false the peer is updated on the interface directly.
  	Proxy bool `json:"proxy"`
  }
  53. const (
  54. AddInterface ProxyAction = "ADD_INTERFACE"
  55. DeleteInterface ProxyAction = "DELETE_INTERFACE"
  56. )
  57. type ManagerAction struct {
  58. Action ProxyAction
  59. Payload ManagerPayload
  60. }
  61. func StartProxyManager(manageChan chan *ManagerAction) {
  62. for {
  63. select {
  64. case mI := <-manageChan:
  65. log.Printf("-------> PROXY-MANAGER: %+v\n", mI)
  66. switch mI.Action {
  67. case AddInterface:
  68. mI.SetIngressGateway()
  69. err := mI.AddInterfaceToProxy()
  70. if err != nil {
  71. log.Printf("failed to add interface: [%s] to proxy: %v\n ", mI.Payload.InterfaceName, err)
  72. }
  73. case DeleteInterface:
  74. mI.DeleteInterface()
  75. }
  76. }
  77. }
  78. }
  79. func (m *ManagerAction) DeleteInterface() {
  80. var err error
  81. if runtime.GOOS == "darwin" {
  82. m.Payload.InterfaceName, err = wg.GetRealIface(m.Payload.InterfaceName)
  83. if err != nil {
  84. log.Println("failed to get real iface: ", err)
  85. return
  86. }
  87. }
  88. if wgProxyConf, ok := common.WgIFaceMap[m.Payload.InterfaceName]; ok {
  89. cleanUpInterface(wgProxyConf)
  90. }
  91. }
  92. func (m *ManagerAction) RelayUpdate() {
  93. common.IsRelay = m.Payload.IsRelay
  94. }
  95. func (m *ManagerAction) SetIngressGateway() {
  96. common.IsIngressGateway = m.Payload.IsIngress
  97. }
  98. func (m *ManagerAction) RelayPeers() {
  99. common.IsRelay = true
  100. for relayedNodePubKey, relayedNodeConf := range m.Payload.RelayedPeerConf {
  101. relayedNodePubKeyHash := fmt.Sprintf("%x", md5.Sum([]byte(relayedNodePubKey)))
  102. if _, ok := common.RelayPeerMap[relayedNodePubKeyHash]; !ok {
  103. common.RelayPeerMap[relayedNodePubKeyHash] = make(map[string]models.RemotePeer)
  104. }
  105. for _, peer := range relayedNodeConf.Peers {
  106. if peer.Endpoint != nil {
  107. peer.Endpoint.Port = models.NmProxyPort
  108. remotePeerKeyHash := fmt.Sprintf("%x", md5.Sum([]byte(peer.PublicKey.String())))
  109. common.RelayPeerMap[relayedNodePubKeyHash][remotePeerKeyHash] = models.RemotePeer{
  110. Endpoint: peer.Endpoint,
  111. }
  112. }
  113. }
  114. relayedNodeConf.RelayedPeerEndpoint.Port = models.NmProxyPort
  115. common.RelayPeerMap[relayedNodePubKeyHash][relayedNodePubKeyHash] = models.RemotePeer{
  116. Endpoint: relayedNodeConf.RelayedPeerEndpoint,
  117. }
  118. }
  119. }
  120. func cleanUpInterface(ifaceConf models.WgIfaceConf) {
  121. log.Println("########------------> CLEANING UP: ", ifaceConf.Iface.Name)
  122. for _, peerI := range ifaceConf.PeerMap {
  123. peerI.StopConn()
  124. }
  125. delete(common.WgIFaceMap, ifaceConf.Iface.Name)
  126. }
  127. func (m *ManagerAction) processPayload() (*wg.WGIface, error) {
  128. var err error
  129. var wgIface *wg.WGIface
  130. if m.Payload.InterfaceName == "" {
  131. return nil, errors.New("interface cannot be empty")
  132. }
  133. if len(m.Payload.Peers) == 0 {
  134. return nil, errors.New("no peers to add")
  135. }
  136. if runtime.GOOS == "darwin" {
  137. m.Payload.InterfaceName, err = wg.GetRealIface(m.Payload.InterfaceName)
  138. if err != nil {
  139. log.Println("failed to get real iface: ", err)
  140. }
  141. }
  142. common.InterfaceName = m.Payload.InterfaceName
  143. wgIface, err = wg.NewWGIFace(m.Payload.InterfaceName, "127.0.0.1/32", wg.DefaultMTU)
  144. if err != nil {
  145. log.Println("Failed init new interface: ", err)
  146. return nil, err
  147. }
  148. var wgProxyConf models.WgIfaceConf
  149. var ok bool
  150. if wgProxyConf, ok = common.WgIFaceMap[m.Payload.InterfaceName]; !ok {
  151. for i := len(m.Payload.Peers) - 1; i >= 0; i-- {
  152. if !m.Payload.PeerMap[m.Payload.Peers[i].PublicKey.String()].Proxy {
  153. log.Println("-----------> skipping peer, proxy is off: ", m.Payload.Peers[i].PublicKey)
  154. if err := wgIface.Update(m.Payload.Peers[i], false); err != nil {
  155. log.Println("falied to update peer: ", err)
  156. }
  157. m.Payload.Peers = append(m.Payload.Peers[:i], m.Payload.Peers[i+1:]...)
  158. continue
  159. }
  160. }
  161. return wgIface, nil
  162. }
  163. if m.Payload.IsRelay {
  164. m.RelayPeers()
  165. }
  166. common.IsRelay = m.Payload.IsRelay
  167. // check if node is getting relayed
  168. if common.IsRelayed != m.Payload.IsRelayed {
  169. common.IsRelayed = m.Payload.IsRelayed
  170. cleanUpInterface(wgProxyConf)
  171. return wgIface, nil
  172. }
  173. // sync map with wg device config
  174. // check if listen port has changed
  175. if wgIface.Device.ListenPort != wgProxyConf.Iface.ListenPort {
  176. // reset proxy for this interface
  177. cleanUpInterface(wgProxyConf)
  178. return wgIface, nil
  179. }
  180. // check device conf different from proxy
  181. wgProxyConf.Iface = wgIface.Device
  182. // sync peer map with new update
  183. for _, currPeerI := range wgProxyConf.Iface.Peers {
  184. if _, ok := m.Payload.PeerMap[currPeerI.PublicKey.String()]; !ok {
  185. if val, ok := wgProxyConf.PeerMap[currPeerI.PublicKey.String()]; ok {
  186. if val.IsAttachedExtClient {
  187. log.Println("------> Deleting ExtClient Watch Thread: ", currPeerI.PublicKey.String())
  188. if val, ok := common.ExtClientsWaitTh[currPeerI.PublicKey.String()]; ok {
  189. val.CancelFunc()
  190. delete(common.ExtClientsWaitTh, currPeerI.PublicKey.String())
  191. }
  192. log.Println("-----> Deleting Ext Client from Src Ip Map: ", currPeerI.PublicKey.String())
  193. delete(common.ExtSourceIpMap, val.PeerConf.Endpoint.String())
  194. }
  195. val.StopConn()
  196. }
  197. // delete peer from interface
  198. log.Println("CurrPeer Not Found, Deleting Peer from Interface: ", currPeerI.PublicKey.String())
  199. if err := wgIface.RemovePeer(currPeerI.PublicKey.String()); err != nil {
  200. log.Println("failed to remove peer: ", currPeerI.PublicKey.String(), err)
  201. }
  202. delete(common.PeerKeyHashMap, fmt.Sprintf("%x", md5.Sum([]byte(currPeerI.PublicKey.String()))))
  203. delete(wgProxyConf.PeerMap, currPeerI.PublicKey.String())
  204. }
  205. }
  206. for i := len(m.Payload.Peers) - 1; i >= 0; i-- {
  207. if currentPeer, ok := wgProxyConf.PeerMap[m.Payload.Peers[i].PublicKey.String()]; ok {
  208. if currentPeer.IsAttachedExtClient {
  209. m.Payload.Peers = append(m.Payload.Peers[:i], m.Payload.Peers[i+1:]...)
  210. continue
  211. }
  212. // check if proxy is off for the peer
  213. if !m.Payload.PeerMap[m.Payload.Peers[i].PublicKey.String()].Proxy {
  214. // cleanup proxy connections for the peer
  215. currentPeer.StopConn()
  216. delete(wgProxyConf.PeerMap, currentPeer.Key)
  217. // update the peer with actual endpoint
  218. if err := wgIface.Update(m.Payload.Peers[i], false); err != nil {
  219. log.Println("falied to update peer: ", err)
  220. }
  221. m.Payload.Peers = append(m.Payload.Peers[:i], m.Payload.Peers[i+1:]...)
  222. continue
  223. }
  224. // check if peer is not connected to proxy
  225. devPeer, err := wg.GetPeer(m.Payload.InterfaceName, currentPeer.Key)
  226. if err == nil {
  227. log.Printf("---------> COMAPRING ENDPOINT: DEV: %s, Proxy: %s", devPeer.Endpoint.String(), currentPeer.LocalConn.LocalAddr().String())
  228. if devPeer.Endpoint.String() != currentPeer.LocalConn.LocalAddr().String() {
  229. log.Println("---------> endpoint is not set to proxy: ", currentPeer.Key)
  230. currentPeer.StopConn()
  231. delete(wgProxyConf.PeerMap, currentPeer.Key)
  232. continue
  233. }
  234. }
  235. //check if peer is being relayed
  236. if currentPeer.IsRelayed != m.Payload.PeerMap[m.Payload.Peers[i].PublicKey.String()].IsRelayed {
  237. log.Println("---------> peer relay status has been changed: ", currentPeer.Key)
  238. currentPeer.StopConn()
  239. delete(wgProxyConf.PeerMap, currentPeer.Key)
  240. continue
  241. }
  242. // check if relay endpoint has been changed
  243. if currentPeer.RelayedEndpoint != nil &&
  244. m.Payload.PeerMap[m.Payload.Peers[i].PublicKey.String()].RelayedTo != nil &&
  245. currentPeer.RelayedEndpoint.String() != m.Payload.PeerMap[m.Payload.Peers[i].PublicKey.String()].RelayedTo.String() {
  246. log.Println("---------> peer relay endpoint has been changed: ", currentPeer.Key)
  247. currentPeer.StopConn()
  248. delete(wgProxyConf.PeerMap, currentPeer.Key)
  249. continue
  250. }
  251. if !reflect.DeepEqual(m.Payload.Peers[i], *currentPeer.PeerConf) {
  252. if currentPeer.RemoteConn.IP.String() != m.Payload.Peers[i].Endpoint.IP.String() {
  253. log.Println("----------> Resetting proxy for Peer: ", currentPeer.Key, m.Payload.InterfaceName)
  254. currentPeer.StopConn()
  255. delete(wgProxyConf.PeerMap, currentPeer.Key)
  256. } else {
  257. log.Println("----->##### Updating Peer on Interface: ", m.Payload.InterfaceName, currentPeer.Key)
  258. updatePeerConf := m.Payload.Peers[i]
  259. localUdpAddr, err := net.ResolveUDPAddr("udp", currentPeer.LocalConn.LocalAddr().String())
  260. if err == nil {
  261. updatePeerConf.Endpoint = localUdpAddr
  262. }
  263. if err := wgIface.Update(updatePeerConf, true); err != nil {
  264. log.Println("failed to update peer: ", currentPeer.Key, err)
  265. }
  266. currentPeer.PeerConf = &m.Payload.Peers[i]
  267. wgProxyConf.PeerMap[currentPeer.Key] = currentPeer
  268. // delete the peer from the list
  269. log.Println("-----------> deleting peer from list: ", m.Payload.Peers[i].PublicKey)
  270. m.Payload.Peers = append(m.Payload.Peers[:i], m.Payload.Peers[i+1:]...)
  271. }
  272. } else {
  273. // delete the peer from the list
  274. log.Println("-----------> No updates observed so deleting peer: ", m.Payload.Peers[i].PublicKey)
  275. m.Payload.Peers = append(m.Payload.Peers[:i], m.Payload.Peers[i+1:]...)
  276. }
  277. } else if !m.Payload.PeerMap[m.Payload.Peers[i].PublicKey.String()].Proxy && !m.Payload.PeerMap[m.Payload.Peers[i].PublicKey.String()].IsAttachedExtClient {
  278. log.Println("-----------> skipping peer, proxy is off: ", m.Payload.Peers[i].PublicKey)
  279. if err := wgIface.Update(m.Payload.Peers[i], false); err != nil {
  280. log.Println("falied to update peer: ", err)
  281. }
  282. m.Payload.Peers = append(m.Payload.Peers[:i], m.Payload.Peers[i+1:]...)
  283. }
  284. }
  285. // sync dev peers with new update
  286. common.WgIFaceMap[m.Payload.InterfaceName] = wgProxyConf
  287. log.Println("CLEANED UP..........")
  288. return wgIface, nil
  289. }
  290. func (m *ManagerAction) AddInterfaceToProxy() error {
  291. var err error
  292. wgInterface, err := m.processPayload()
  293. if err != nil {
  294. return err
  295. }
  296. log.Printf("wg: %+v\n", wgInterface)
  297. wgListenAddr, err := proxy.GetInterfaceListenAddr(wgInterface.Port)
  298. if err != nil {
  299. log.Println("failed to get wg listen addr: ", err)
  300. return err
  301. }
  302. common.WgIfaceKeyMap[fmt.Sprintf("%x", md5.Sum([]byte(wgInterface.Device.PublicKey.String())))] = models.RemotePeer{
  303. PeerKey: wgInterface.Device.PublicKey.String(),
  304. Interface: wgInterface.Name,
  305. Endpoint: wgListenAddr,
  306. }
  307. for _, peerI := range m.Payload.Peers {
  308. peerConf := m.Payload.PeerMap[peerI.PublicKey.String()]
  309. if peerI.Endpoint == nil && !(peerConf.IsAttachedExtClient || peerConf.IsExtClient) {
  310. log.Println("Endpoint nil for peer: ", peerI.PublicKey.String())
  311. continue
  312. }
  313. if peerConf.IsExtClient && !peerConf.IsAttachedExtClient {
  314. peerI.Endpoint = peerConf.IngressGatewayEndPoint
  315. }
  316. var isRelayed bool
  317. var relayedTo *net.UDPAddr
  318. if m.Payload.IsRelayed {
  319. isRelayed = true
  320. relayedTo = m.Payload.RelayedTo
  321. } else {
  322. isRelayed = peerConf.IsRelayed
  323. relayedTo = peerConf.RelayedTo
  324. }
  325. if peerConf.IsAttachedExtClient {
  326. log.Println("Extclient Thread...")
  327. go func(wgInterface *wg.WGIface, peer *wgtypes.PeerConfig,
  328. isRelayed bool, relayTo *net.UDPAddr, peerConf PeerConf, ingGwAddr string) {
  329. addExtClient := false
  330. commChan := make(chan *net.UDPAddr, 100)
  331. ctx, cancel := context.WithCancel(context.Background())
  332. common.ExtClientsWaitTh[peerI.PublicKey.String()] = models.ExtClientPeer{
  333. CancelFunc: cancel,
  334. CommChan: commChan,
  335. }
  336. defer func() {
  337. if addExtClient {
  338. log.Println("GOT ENDPOINT for Extclient adding peer...")
  339. common.PeerKeyHashMap[fmt.Sprintf("%x", md5.Sum([]byte(peer.PublicKey.String())))] = models.RemotePeer{
  340. Interface: wgInterface.Name,
  341. PeerKey: peer.PublicKey.String(),
  342. IsExtClient: peerConf.IsExtClient,
  343. IsAttachedExtClient: peerConf.IsAttachedExtClient,
  344. Endpoint: peer.Endpoint,
  345. }
  346. common.ExtSourceIpMap[peer.Endpoint.String()] = models.RemotePeer{
  347. Interface: wgInterface.Name,
  348. PeerKey: peer.PublicKey.String(),
  349. IsExtClient: peerConf.IsExtClient,
  350. IsAttachedExtClient: peerConf.IsAttachedExtClient,
  351. Endpoint: peer.Endpoint,
  352. }
  353. peerpkg.AddNewPeer(wgInterface, peer, peerConf.Address, isRelayed,
  354. peerConf.IsExtClient, peerConf.IsAttachedExtClient, relayedTo)
  355. }
  356. log.Println("Exiting extclient watch Thread for: ", peer.PublicKey.String())
  357. }()
  358. for {
  359. select {
  360. case <-ctx.Done():
  361. return
  362. case endpoint := <-commChan:
  363. if endpoint != nil {
  364. addExtClient = true
  365. peer.Endpoint = endpoint
  366. delete(common.ExtClientsWaitTh, peer.PublicKey.String())
  367. return
  368. }
  369. }
  370. }
  371. }(wgInterface, &peerI, isRelayed, relayedTo, peerConf, m.Payload.WgAddr)
  372. continue
  373. }
  374. common.PeerKeyHashMap[fmt.Sprintf("%x", md5.Sum([]byte(peerI.PublicKey.String())))] = models.RemotePeer{
  375. Interface: m.Payload.InterfaceName,
  376. PeerKey: peerI.PublicKey.String(),
  377. IsExtClient: peerConf.IsExtClient,
  378. Endpoint: peerI.Endpoint,
  379. IsAttachedExtClient: peerConf.IsAttachedExtClient,
  380. }
  381. peerpkg.AddNewPeer(wgInterface, &peerI, peerConf.Address, isRelayed,
  382. peerConf.IsExtClient, peerConf.IsAttachedExtClient, relayedTo)
  383. }
  384. log.Printf("------> PEERHASHMAP: %+v\n", common.PeerKeyHashMap)
  385. log.Printf("-------> WgKeyHashMap: %+v\n", common.WgIfaceKeyMap)
  386. log.Printf("-------> WgIFaceMap: %+v\n", common.WgIFaceMap)
  387. return nil
  388. }