manager.go 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417
  1. package manager
  2. import (
  3. "context"
  4. "crypto/md5"
  5. "errors"
  6. "fmt"
  7. "log"
  8. "net"
  9. "reflect"
  10. "runtime"
  11. "github.com/gravitl/netmaker/nm-proxy/common"
  12. "github.com/gravitl/netmaker/nm-proxy/models"
  13. peerpkg "github.com/gravitl/netmaker/nm-proxy/peer"
  14. "github.com/gravitl/netmaker/nm-proxy/wg"
  15. "golang.zx2c4.com/wireguard/wgctrl/wgtypes"
  16. )
/*
TODO:
 1. On ingress node
    --> for attached ext clients
    -> start sniffer (will receive packets from ext clients; add an eBPF filter to listen only for ext client traffic, and forward any packet that is not intended for the interface)
    -> start remote conn after endpoint is updated
    -->
*/
// ProxyAction names the operation a ManagerAction asks the proxy manager to
// perform (see the AddInterface and DeleteInterface constants).
type ProxyAction string
// ManagerPayload is the interface and peer configuration carried by a
// ManagerAction.
type ManagerPayload struct {
	// InterfaceName is the WireGuard interface the action applies to; it must
	// be non-empty for an add to be processed.
	InterfaceName string `json:"interface_name"`
	WgAddr        string `json:"wg_addr"`
	// Peers is the list of peer configurations for the interface.
	Peers []wgtypes.PeerConfig `json:"peers"`
	// PeerMap holds per-peer proxy settings, keyed by the peer's public key
	// string.
	PeerMap map[string]PeerConf `json:"peer_map"`
	// IsRelayed and RelayedTo apply to every peer of this payload: when
	// IsRelayed is set they override the per-peer relay settings.
	IsRelayed bool         `json:"is_relayed"`
	IsIngress bool         `json:"is_ingress"`
	RelayedTo *net.UDPAddr `json:"relayed_to"`
	// IsRelay triggers processing of RelayedPeerConf.
	IsRelay bool `json:"is_relay"`
	// RelayedPeerConf is keyed by the relayed node's public key.
	RelayedPeerConf map[string]RelayedConf `json:"relayed_conf"`
}
// RelayedConf describes one relayed node: its own endpoint and the peers that
// are reached on its behalf.
type RelayedConf struct {
	RelayedPeerEndpoint *net.UDPAddr `json:"relayed_peer_endpoint"`
	RelayedPeerPubKey   string       `json:"relayed_peer_pub_key"`
	// Peers are the relayed node's peers; entries without an endpoint are
	// ignored by RelayPeers.
	Peers []wgtypes.PeerConfig `json:"relayed_peers"`
}
// PeerConf holds the proxy-related settings of a single peer.
type PeerConf struct {
	IsExtClient bool   `json:"is_ext_client"`
	Address     string `json:"address"`
	// IsAttachedExtClient marks a peer whose endpoint is not known up front;
	// such a peer is only added once an endpoint has been received for it.
	IsAttachedExtClient bool `json:"is_attached_ext_client"`
	// IngressGatewayEndPoint is used as the endpoint of an ext client that is
	// not attached to this node.
	IngressGatewayEndPoint *net.UDPAddr `json:"ingress_gateway_endpoint"`
	IsRelayed              bool         `json:"is_relayed"`
	RelayedTo              *net.UDPAddr `json:"relayed_to"`
	// Proxy reports whether traffic for this peer goes through the proxy; when
	// false the peer is configured on the interface directly.
	Proxy bool `json:"proxy"`
}
// Actions understood by StartProxyManager.
const (
	// AddInterface adds or re-syncs the interface described by the payload.
	AddInterface ProxyAction = "ADD_INTERFACE"
	// DeleteInterface tears down the proxy state of the named interface.
	DeleteInterface ProxyAction = "DELETE_INTERFACE"
)
// ManagerAction is a single request sent to the proxy manager: the action to
// perform and the payload it operates on.
type ManagerAction struct {
	Action  ProxyAction
	Payload ManagerPayload
}
  59. func StartProxyManager(manageChan chan *ManagerAction) {
  60. for mI := range manageChan {
  61. log.Printf("-------> PROXY-MANAGER: %+v\n", mI)
  62. switch mI.Action {
  63. case AddInterface:
  64. mI.SetIngressGateway()
  65. err := mI.AddInterfaceToProxy()
  66. if err != nil {
  67. log.Printf("failed to add interface: [%s] to proxy: %v\n ", mI.Payload.InterfaceName, err)
  68. }
  69. case DeleteInterface:
  70. mI.DeleteInterface()
  71. }
  72. }
  73. }
  74. func (m *ManagerAction) DeleteInterface() {
  75. var err error
  76. if runtime.GOOS == "darwin" {
  77. m.Payload.InterfaceName, err = wg.GetRealIface(m.Payload.InterfaceName)
  78. if err != nil {
  79. log.Println("failed to get real iface: ", err)
  80. return
  81. }
  82. }
  83. if common.WgIfaceMap.Iface.Name == m.Payload.InterfaceName {
  84. cleanUpInterface()
  85. }
  86. }
  87. func (m *ManagerAction) RelayUpdate() {
  88. common.IsRelay = m.Payload.IsRelay
  89. }
  90. func (m *ManagerAction) SetIngressGateway() {
  91. common.IsIngressGateway = m.Payload.IsIngress
  92. }
  93. func (m *ManagerAction) RelayPeers() {
  94. common.IsRelay = true
  95. for relayedNodePubKey, relayedNodeConf := range m.Payload.RelayedPeerConf {
  96. relayedNodePubKeyHash := fmt.Sprintf("%x", md5.Sum([]byte(relayedNodePubKey)))
  97. if _, ok := common.RelayPeerMap[relayedNodePubKeyHash]; !ok {
  98. common.RelayPeerMap[relayedNodePubKeyHash] = make(map[string]models.RemotePeer)
  99. }
  100. for _, peer := range relayedNodeConf.Peers {
  101. if peer.Endpoint != nil {
  102. peer.Endpoint.Port = models.NmProxyPort
  103. remotePeerKeyHash := fmt.Sprintf("%x", md5.Sum([]byte(peer.PublicKey.String())))
  104. common.RelayPeerMap[relayedNodePubKeyHash][remotePeerKeyHash] = models.RemotePeer{
  105. Endpoint: peer.Endpoint,
  106. }
  107. }
  108. }
  109. relayedNodeConf.RelayedPeerEndpoint.Port = models.NmProxyPort
  110. common.RelayPeerMap[relayedNodePubKeyHash][relayedNodePubKeyHash] = models.RemotePeer{
  111. Endpoint: relayedNodeConf.RelayedPeerEndpoint,
  112. }
  113. }
  114. }
  115. func cleanUpInterface() {
  116. log.Println("########------------> CLEANING UP: ", common.WgIfaceMap.Iface.Name)
  117. for _, peerI := range common.WgIfaceMap.PeerMap {
  118. peerI.Mutex.Lock()
  119. peerI.StopConn()
  120. peerI.Mutex.Unlock()
  121. delete(common.WgIfaceMap.PeerMap, peerI.Key.String())
  122. }
  123. common.WgIfaceMap.PeerMap = make(map[string]*models.Conn)
  124. }
  125. func (m *ManagerAction) processPayload() (*wg.WGIface, error) {
  126. var err error
  127. var wgIface *wg.WGIface
  128. if m.Payload.InterfaceName == "" {
  129. return nil, errors.New("interface cannot be empty")
  130. }
  131. if len(m.Payload.Peers) == 0 {
  132. return nil, errors.New("no peers to add")
  133. }
  134. if runtime.GOOS == "darwin" {
  135. m.Payload.InterfaceName, err = wg.GetRealIface(m.Payload.InterfaceName)
  136. if err != nil {
  137. log.Println("failed to get real iface: ", err)
  138. }
  139. }
  140. common.InterfaceName = m.Payload.InterfaceName
  141. wgIface, err = wg.NewWGIFace(m.Payload.InterfaceName, "127.0.0.1/32", wg.DefaultMTU)
  142. if err != nil {
  143. log.Println("Failed init new interface: ", err)
  144. return nil, err
  145. }
  146. if common.WgIfaceMap.Iface == nil {
  147. for i := len(m.Payload.Peers) - 1; i >= 0; i-- {
  148. if !m.Payload.PeerMap[m.Payload.Peers[i].PublicKey.String()].Proxy {
  149. log.Println("-----------> skipping peer, proxy is off: ", m.Payload.Peers[i].PublicKey)
  150. if err := wgIface.Update(m.Payload.Peers[i], false); err != nil {
  151. log.Println("falied to update peer: ", err)
  152. }
  153. m.Payload.Peers = append(m.Payload.Peers[:i], m.Payload.Peers[i+1:]...)
  154. continue
  155. }
  156. }
  157. common.WgIfaceMap.Iface = wgIface.Device
  158. common.WgIfaceMap.IfaceKeyHash = fmt.Sprintf("%x", md5.Sum([]byte(wgIface.Device.PublicKey.String())))
  159. return wgIface, nil
  160. }
  161. wgProxyConf := common.WgIfaceMap
  162. if m.Payload.IsRelay {
  163. m.RelayPeers()
  164. }
  165. common.IsRelay = m.Payload.IsRelay
  166. // check if node is getting relayed
  167. if common.IsRelayed != m.Payload.IsRelayed {
  168. common.IsRelayed = m.Payload.IsRelayed
  169. cleanUpInterface()
  170. return wgIface, nil
  171. }
  172. // sync map with wg device config
  173. // check if listen port has changed
  174. if wgIface.Device.ListenPort != wgProxyConf.Iface.ListenPort {
  175. // reset proxy for this interface
  176. cleanUpInterface()
  177. return wgIface, nil
  178. }
  179. // check device conf different from proxy
  180. wgProxyConf.Iface = wgIface.Device
  181. // sync peer map with new update
  182. for _, currPeerI := range wgProxyConf.Iface.Peers {
  183. if _, ok := m.Payload.PeerMap[currPeerI.PublicKey.String()]; !ok {
  184. if val, ok := wgProxyConf.PeerMap[currPeerI.PublicKey.String()]; ok {
  185. val.Mutex.Lock()
  186. if val.IsAttachedExtClient {
  187. log.Println("------> Deleting ExtClient Watch Thread: ", currPeerI.PublicKey.String())
  188. if val, ok := common.ExtClientsWaitTh[currPeerI.PublicKey.String()]; ok {
  189. val.CancelFunc()
  190. delete(common.ExtClientsWaitTh, currPeerI.PublicKey.String())
  191. }
  192. log.Println("-----> Deleting Ext Client from Src Ip Map: ", currPeerI.PublicKey.String())
  193. delete(common.ExtSourceIpMap, val.Config.PeerConf.Endpoint.String())
  194. }
  195. val.StopConn()
  196. val.Mutex.Unlock()
  197. delete(wgProxyConf.PeerMap, currPeerI.PublicKey.String())
  198. }
  199. // delete peer from interface
  200. log.Println("CurrPeer Not Found, Deleting Peer from Interface: ", currPeerI.PublicKey.String())
  201. if err := wgIface.RemovePeer(currPeerI.PublicKey.String()); err != nil {
  202. log.Println("failed to remove peer: ", currPeerI.PublicKey.String(), err)
  203. }
  204. delete(common.PeerKeyHashMap, fmt.Sprintf("%x", md5.Sum([]byte(currPeerI.PublicKey.String()))))
  205. }
  206. }
  207. for i := len(m.Payload.Peers) - 1; i >= 0; i-- {
  208. if currentPeer, ok := wgProxyConf.PeerMap[m.Payload.Peers[i].PublicKey.String()]; ok {
  209. currentPeer.Mutex.Lock()
  210. if currentPeer.IsAttachedExtClient {
  211. m.Payload.Peers = append(m.Payload.Peers[:i], m.Payload.Peers[i+1:]...)
  212. continue
  213. }
  214. // check if proxy is off for the peer
  215. if !m.Payload.PeerMap[m.Payload.Peers[i].PublicKey.String()].Proxy {
  216. // cleanup proxy connections for the peer
  217. currentPeer.StopConn()
  218. delete(wgProxyConf.PeerMap, currentPeer.Key.String())
  219. // update the peer with actual endpoint
  220. if err := wgIface.Update(m.Payload.Peers[i], false); err != nil {
  221. log.Println("falied to update peer: ", err)
  222. }
  223. m.Payload.Peers = append(m.Payload.Peers[:i], m.Payload.Peers[i+1:]...)
  224. continue
  225. }
  226. // check if peer is not connected to proxy
  227. devPeer, err := wg.GetPeer(m.Payload.InterfaceName, currentPeer.Key.String())
  228. if err == nil {
  229. log.Printf("---------> COMAPRING ENDPOINT: DEV: %s, Proxy: %s", devPeer.Endpoint.String(), currentPeer.Config.LocalConnAddr.String())
  230. if devPeer.Endpoint.String() != currentPeer.Config.LocalConnAddr.String() {
  231. log.Println("---------> endpoint is not set to proxy: ", currentPeer.Key)
  232. currentPeer.StopConn()
  233. delete(wgProxyConf.PeerMap, currentPeer.Key.String())
  234. continue
  235. }
  236. }
  237. //check if peer is being relayed
  238. if currentPeer.IsRelayed != m.Payload.PeerMap[m.Payload.Peers[i].PublicKey.String()].IsRelayed {
  239. log.Println("---------> peer relay status has been changed: ", currentPeer.Key)
  240. currentPeer.StopConn()
  241. delete(wgProxyConf.PeerMap, currentPeer.Key.String())
  242. continue
  243. }
  244. // check if relay endpoint has been changed
  245. if currentPeer.RelayedEndpoint != nil &&
  246. m.Payload.PeerMap[m.Payload.Peers[i].PublicKey.String()].RelayedTo != nil &&
  247. currentPeer.RelayedEndpoint.String() != m.Payload.PeerMap[m.Payload.Peers[i].PublicKey.String()].RelayedTo.String() {
  248. log.Println("---------> peer relay endpoint has been changed: ", currentPeer.Key)
  249. currentPeer.StopConn()
  250. delete(wgProxyConf.PeerMap, currentPeer.Key.String())
  251. continue
  252. }
  253. if !reflect.DeepEqual(m.Payload.Peers[i], *currentPeer.Config.PeerConf) {
  254. if currentPeer.Config.RemoteConnAddr.IP.String() != m.Payload.Peers[i].Endpoint.IP.String() {
  255. log.Println("----------> Resetting proxy for Peer: ", currentPeer.Key, m.Payload.InterfaceName)
  256. currentPeer.StopConn()
  257. currentPeer.Mutex.Unlock()
  258. delete(wgProxyConf.PeerMap, currentPeer.Key.String())
  259. continue
  260. } else {
  261. log.Println("----->##### Updating Peer on Interface: ", m.Payload.InterfaceName, currentPeer.Key)
  262. updatePeerConf := m.Payload.Peers[i]
  263. localUdpAddr, err := net.ResolveUDPAddr("udp", currentPeer.Config.LocalConnAddr.String())
  264. if err == nil {
  265. updatePeerConf.Endpoint = localUdpAddr
  266. }
  267. if err := wgIface.Update(updatePeerConf, true); err != nil {
  268. log.Println("failed to update peer: ", currentPeer.Key, err)
  269. }
  270. currentPeer.Config.PeerConf = &m.Payload.Peers[i]
  271. wgProxyConf.PeerMap[currentPeer.Key.String()] = currentPeer
  272. // delete the peer from the list
  273. log.Println("-----------> deleting peer from list: ", m.Payload.Peers[i].PublicKey)
  274. m.Payload.Peers = append(m.Payload.Peers[:i], m.Payload.Peers[i+1:]...)
  275. }
  276. } else {
  277. // delete the peer from the list
  278. log.Println("-----------> No updates observed so deleting peer: ", m.Payload.Peers[i].PublicKey)
  279. m.Payload.Peers = append(m.Payload.Peers[:i], m.Payload.Peers[i+1:]...)
  280. }
  281. currentPeer.Mutex.Unlock()
  282. } else if !m.Payload.PeerMap[m.Payload.Peers[i].PublicKey.String()].Proxy && !m.Payload.PeerMap[m.Payload.Peers[i].PublicKey.String()].IsAttachedExtClient {
  283. log.Println("-----------> skipping peer, proxy is off: ", m.Payload.Peers[i].PublicKey)
  284. if err := wgIface.Update(m.Payload.Peers[i], false); err != nil {
  285. log.Println("falied to update peer: ", err)
  286. }
  287. m.Payload.Peers = append(m.Payload.Peers[:i], m.Payload.Peers[i+1:]...)
  288. }
  289. }
  290. // sync dev peers with new update
  291. common.WgIfaceMap = wgProxyConf
  292. log.Println("CLEANED UP..........")
  293. return wgIface, nil
  294. }
  295. func (m *ManagerAction) AddInterfaceToProxy() error {
  296. var err error
  297. wgInterface, err := m.processPayload()
  298. if err != nil {
  299. return err
  300. }
  301. log.Printf("wg: %+v\n", wgInterface)
  302. for _, peerI := range m.Payload.Peers {
  303. peerConf := m.Payload.PeerMap[peerI.PublicKey.String()]
  304. if peerI.Endpoint == nil && !(peerConf.IsAttachedExtClient || peerConf.IsExtClient) {
  305. log.Println("Endpoint nil for peer: ", peerI.PublicKey.String())
  306. continue
  307. }
  308. if peerConf.IsExtClient && !peerConf.IsAttachedExtClient {
  309. peerI.Endpoint = peerConf.IngressGatewayEndPoint
  310. }
  311. var isRelayed bool
  312. var relayedTo *net.UDPAddr
  313. if m.Payload.IsRelayed {
  314. isRelayed = true
  315. relayedTo = m.Payload.RelayedTo
  316. } else {
  317. isRelayed = peerConf.IsRelayed
  318. relayedTo = peerConf.RelayedTo
  319. }
  320. if peerConf.IsAttachedExtClient {
  321. log.Println("Extclient Thread...")
  322. go func(wgInterface *wg.WGIface, peer *wgtypes.PeerConfig,
  323. isRelayed bool, relayTo *net.UDPAddr, peerConf PeerConf, ingGwAddr string) {
  324. addExtClient := false
  325. commChan := make(chan *net.UDPAddr, 100)
  326. ctx, cancel := context.WithCancel(context.Background())
  327. common.ExtClientsWaitTh[peerI.PublicKey.String()] = models.ExtClientPeer{
  328. CancelFunc: cancel,
  329. CommChan: commChan,
  330. }
  331. defer func() {
  332. if addExtClient {
  333. log.Println("GOT ENDPOINT for Extclient adding peer...")
  334. common.ExtSourceIpMap[peer.Endpoint.String()] = models.RemotePeer{
  335. Interface: wgInterface.Name,
  336. PeerKey: peer.PublicKey.String(),
  337. IsExtClient: peerConf.IsExtClient,
  338. IsAttachedExtClient: peerConf.IsAttachedExtClient,
  339. Endpoint: peer.Endpoint,
  340. }
  341. peerpkg.AddNewPeer(wgInterface, peer, peerConf.Address, isRelayed,
  342. peerConf.IsExtClient, peerConf.IsAttachedExtClient, relayedTo)
  343. }
  344. log.Println("Exiting extclient watch Thread for: ", peer.PublicKey.String())
  345. }()
  346. for {
  347. select {
  348. case <-ctx.Done():
  349. return
  350. case endpoint := <-commChan:
  351. if endpoint != nil {
  352. addExtClient = true
  353. peer.Endpoint = endpoint
  354. delete(common.ExtClientsWaitTh, peer.PublicKey.String())
  355. return
  356. }
  357. }
  358. }
  359. }(wgInterface, &peerI, isRelayed, relayedTo, peerConf, m.Payload.WgAddr)
  360. continue
  361. }
  362. peerpkg.AddNewPeer(wgInterface, &peerI, peerConf.Address, isRelayed,
  363. peerConf.IsExtClient, peerConf.IsAttachedExtClient, relayedTo)
  364. }
  365. return nil
  366. }