manager.go 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423
  1. package manager
  2. import (
  3. "context"
  4. "crypto/md5"
  5. "errors"
  6. "fmt"
  7. "log"
  8. "net"
  9. "reflect"
  10. "runtime"
  11. "github.com/gravitl/netmaker/nm-proxy/common"
  12. "github.com/gravitl/netmaker/nm-proxy/models"
  13. peerpkg "github.com/gravitl/netmaker/nm-proxy/peer"
  14. "github.com/gravitl/netmaker/nm-proxy/wg"
  15. "golang.zx2c4.com/wireguard/wgctrl/wgtypes"
  16. )
  17. /*
  18. TODO:-
  19. 1. ON Ingress node
  20. --> for attached ext clients
  21. -> start sniffer (will recieve pkts from ext clients (add ebf filter to listen on only ext traffic) if not intended to the interface forward it.)
  22. -> start remote conn after endpoint is updated
  23. -->
  24. */
  25. type ProxyAction string
  26. type ManagerPayload struct {
  27. InterfaceName string `json:"interface_name"`
  28. WgAddr string `json:"wg_addr"`
  29. Peers []wgtypes.PeerConfig `json:"peers"`
  30. PeerMap map[string]PeerConf `json:"peer_map"`
  31. IsRelayed bool `json:"is_relayed"`
  32. IsIngress bool `json:"is_ingress"`
  33. RelayedTo *net.UDPAddr `json:"relayed_to"`
  34. IsRelay bool `json:"is_relay"`
  35. RelayedPeerConf map[string]RelayedConf `json:"relayed_conf"`
  36. }
  37. type RelayedConf struct {
  38. RelayedPeerEndpoint *net.UDPAddr `json:"relayed_peer_endpoint"`
  39. RelayedPeerPubKey string `json:"relayed_peer_pub_key"`
  40. Peers []wgtypes.PeerConfig `json:"relayed_peers"`
  41. }
  42. type PeerConf struct {
  43. IsExtClient bool `json:"is_ext_client"`
  44. Address string `json:"address"`
  45. IsAttachedExtClient bool `json:"is_attached_ext_client"`
  46. IngressGatewayEndPoint *net.UDPAddr `json:"ingress_gateway_endpoint"`
  47. IsRelayed bool `json:"is_relayed"`
  48. RelayedTo *net.UDPAddr `json:"relayed_to"`
  49. Proxy bool `json:"proxy"`
  50. }
  51. const (
  52. AddInterface ProxyAction = "ADD_INTERFACE"
  53. DeleteInterface ProxyAction = "DELETE_INTERFACE"
  54. )
  55. type ManagerAction struct {
  56. Action ProxyAction
  57. Payload ManagerPayload
  58. }
  59. func StartProxyManager(manageChan chan *ManagerAction) {
  60. for {
  61. select {
  62. case mI := <-manageChan:
  63. log.Printf("-------> PROXY-MANAGER: %+v\n", mI)
  64. switch mI.Action {
  65. case AddInterface:
  66. mI.SetIngressGateway()
  67. err := mI.AddInterfaceToProxy()
  68. if err != nil {
  69. log.Printf("failed to add interface: [%s] to proxy: %v\n ", mI.Payload.InterfaceName, err)
  70. }
  71. case DeleteInterface:
  72. mI.DeleteInterface()
  73. }
  74. }
  75. }
  76. }
  77. func (m *ManagerAction) DeleteInterface() {
  78. var err error
  79. if runtime.GOOS == "darwin" {
  80. m.Payload.InterfaceName, err = wg.GetRealIface(m.Payload.InterfaceName)
  81. if err != nil {
  82. log.Println("failed to get real iface: ", err)
  83. return
  84. }
  85. }
  86. if common.WgIfaceMap.Iface.Name == m.Payload.InterfaceName {
  87. cleanUpInterface()
  88. }
  89. }
  90. func (m *ManagerAction) RelayUpdate() {
  91. common.IsRelay = m.Payload.IsRelay
  92. }
  93. func (m *ManagerAction) SetIngressGateway() {
  94. common.IsIngressGateway = m.Payload.IsIngress
  95. }
  96. func (m *ManagerAction) RelayPeers() {
  97. common.IsRelay = true
  98. for relayedNodePubKey, relayedNodeConf := range m.Payload.RelayedPeerConf {
  99. relayedNodePubKeyHash := fmt.Sprintf("%x", md5.Sum([]byte(relayedNodePubKey)))
  100. if _, ok := common.RelayPeerMap[relayedNodePubKeyHash]; !ok {
  101. common.RelayPeerMap[relayedNodePubKeyHash] = make(map[string]models.RemotePeer)
  102. }
  103. for _, peer := range relayedNodeConf.Peers {
  104. if peer.Endpoint != nil {
  105. peer.Endpoint.Port = models.NmProxyPort
  106. remotePeerKeyHash := fmt.Sprintf("%x", md5.Sum([]byte(peer.PublicKey.String())))
  107. common.RelayPeerMap[relayedNodePubKeyHash][remotePeerKeyHash] = models.RemotePeer{
  108. Endpoint: peer.Endpoint,
  109. }
  110. }
  111. }
  112. relayedNodeConf.RelayedPeerEndpoint.Port = models.NmProxyPort
  113. common.RelayPeerMap[relayedNodePubKeyHash][relayedNodePubKeyHash] = models.RemotePeer{
  114. Endpoint: relayedNodeConf.RelayedPeerEndpoint,
  115. }
  116. }
  117. }
  118. func cleanUpInterface() {
  119. log.Println("########------------> CLEANING UP: ", common.WgIfaceMap.Iface.Name)
  120. for _, peerI := range common.WgIfaceMap.PeerMap {
  121. peerI.Mutex.Lock()
  122. peerI.StopConn()
  123. peerI.Mutex.Unlock()
  124. delete(common.WgIfaceMap.PeerMap, peerI.Key.String())
  125. }
  126. common.WgIfaceMap.PeerMap = make(map[string]*models.Conn)
  127. }
  128. func (m *ManagerAction) processPayload() (*wg.WGIface, error) {
  129. var err error
  130. var wgIface *wg.WGIface
  131. if m.Payload.InterfaceName == "" {
  132. return nil, errors.New("interface cannot be empty")
  133. }
  134. if len(m.Payload.Peers) == 0 {
  135. return nil, errors.New("no peers to add")
  136. }
  137. if runtime.GOOS == "darwin" {
  138. m.Payload.InterfaceName, err = wg.GetRealIface(m.Payload.InterfaceName)
  139. if err != nil {
  140. log.Println("failed to get real iface: ", err)
  141. }
  142. }
  143. common.InterfaceName = m.Payload.InterfaceName
  144. wgIface, err = wg.NewWGIFace(m.Payload.InterfaceName, "127.0.0.1/32", wg.DefaultMTU)
  145. if err != nil {
  146. log.Println("Failed init new interface: ", err)
  147. return nil, err
  148. }
  149. if common.WgIfaceMap.Iface == nil {
  150. for i := len(m.Payload.Peers) - 1; i >= 0; i-- {
  151. if !m.Payload.PeerMap[m.Payload.Peers[i].PublicKey.String()].Proxy {
  152. log.Println("-----------> skipping peer, proxy is off: ", m.Payload.Peers[i].PublicKey)
  153. if err := wgIface.Update(m.Payload.Peers[i], false); err != nil {
  154. log.Println("falied to update peer: ", err)
  155. }
  156. m.Payload.Peers = append(m.Payload.Peers[:i], m.Payload.Peers[i+1:]...)
  157. continue
  158. }
  159. }
  160. common.WgIfaceMap.Iface = wgIface.Device
  161. common.WgIfaceMap.IfaceKeyHash = fmt.Sprintf("%x", md5.Sum([]byte(wgIface.Device.PublicKey.String())))
  162. return wgIface, nil
  163. }
  164. wgProxyConf := common.WgIfaceMap
  165. if m.Payload.IsRelay {
  166. m.RelayPeers()
  167. }
  168. common.IsRelay = m.Payload.IsRelay
  169. // check if node is getting relayed
  170. if common.IsRelayed != m.Payload.IsRelayed {
  171. common.IsRelayed = m.Payload.IsRelayed
  172. cleanUpInterface()
  173. return wgIface, nil
  174. }
  175. // sync map with wg device config
  176. // check if listen port has changed
  177. if wgIface.Device.ListenPort != wgProxyConf.Iface.ListenPort {
  178. // reset proxy for this interface
  179. cleanUpInterface()
  180. return wgIface, nil
  181. }
  182. // check device conf different from proxy
  183. wgProxyConf.Iface = wgIface.Device
  184. // sync peer map with new update
  185. for _, currPeerI := range wgProxyConf.Iface.Peers {
  186. if _, ok := m.Payload.PeerMap[currPeerI.PublicKey.String()]; !ok {
  187. if val, ok := wgProxyConf.PeerMap[currPeerI.PublicKey.String()]; ok {
  188. val.Mutex.Lock()
  189. if val.IsAttachedExtClient {
  190. log.Println("------> Deleting ExtClient Watch Thread: ", currPeerI.PublicKey.String())
  191. if val, ok := common.ExtClientsWaitTh[currPeerI.PublicKey.String()]; ok {
  192. val.CancelFunc()
  193. delete(common.ExtClientsWaitTh, currPeerI.PublicKey.String())
  194. }
  195. log.Println("-----> Deleting Ext Client from Src Ip Map: ", currPeerI.PublicKey.String())
  196. delete(common.ExtSourceIpMap, val.Config.PeerConf.Endpoint.String())
  197. }
  198. val.StopConn()
  199. val.Mutex.Unlock()
  200. delete(wgProxyConf.PeerMap, currPeerI.PublicKey.String())
  201. }
  202. // delete peer from interface
  203. log.Println("CurrPeer Not Found, Deleting Peer from Interface: ", currPeerI.PublicKey.String())
  204. if err := wgIface.RemovePeer(currPeerI.PublicKey.String()); err != nil {
  205. log.Println("failed to remove peer: ", currPeerI.PublicKey.String(), err)
  206. }
  207. delete(common.PeerKeyHashMap, fmt.Sprintf("%x", md5.Sum([]byte(currPeerI.PublicKey.String()))))
  208. }
  209. }
  210. for i := len(m.Payload.Peers) - 1; i >= 0; i-- {
  211. if currentPeer, ok := wgProxyConf.PeerMap[m.Payload.Peers[i].PublicKey.String()]; ok {
  212. currentPeer.Mutex.Lock()
  213. if currentPeer.IsAttachedExtClient {
  214. m.Payload.Peers = append(m.Payload.Peers[:i], m.Payload.Peers[i+1:]...)
  215. continue
  216. }
  217. // check if proxy is off for the peer
  218. if !m.Payload.PeerMap[m.Payload.Peers[i].PublicKey.String()].Proxy {
  219. // cleanup proxy connections for the peer
  220. currentPeer.StopConn()
  221. delete(wgProxyConf.PeerMap, currentPeer.Key.String())
  222. // update the peer with actual endpoint
  223. if err := wgIface.Update(m.Payload.Peers[i], false); err != nil {
  224. log.Println("falied to update peer: ", err)
  225. }
  226. m.Payload.Peers = append(m.Payload.Peers[:i], m.Payload.Peers[i+1:]...)
  227. continue
  228. }
  229. // check if peer is not connected to proxy
  230. devPeer, err := wg.GetPeer(m.Payload.InterfaceName, currentPeer.Key.String())
  231. if err == nil {
  232. log.Printf("---------> COMAPRING ENDPOINT: DEV: %s, Proxy: %s", devPeer.Endpoint.String(), currentPeer.Config.LocalConnAddr.String())
  233. if devPeer.Endpoint.String() != currentPeer.Config.LocalConnAddr.String() {
  234. log.Println("---------> endpoint is not set to proxy: ", currentPeer.Key)
  235. currentPeer.StopConn()
  236. delete(wgProxyConf.PeerMap, currentPeer.Key.String())
  237. continue
  238. }
  239. }
  240. //check if peer is being relayed
  241. if currentPeer.IsRelayed != m.Payload.PeerMap[m.Payload.Peers[i].PublicKey.String()].IsRelayed {
  242. log.Println("---------> peer relay status has been changed: ", currentPeer.Key)
  243. currentPeer.StopConn()
  244. delete(wgProxyConf.PeerMap, currentPeer.Key.String())
  245. continue
  246. }
  247. // check if relay endpoint has been changed
  248. if currentPeer.RelayedEndpoint != nil &&
  249. m.Payload.PeerMap[m.Payload.Peers[i].PublicKey.String()].RelayedTo != nil &&
  250. currentPeer.RelayedEndpoint.String() != m.Payload.PeerMap[m.Payload.Peers[i].PublicKey.String()].RelayedTo.String() {
  251. log.Println("---------> peer relay endpoint has been changed: ", currentPeer.Key)
  252. currentPeer.StopConn()
  253. delete(wgProxyConf.PeerMap, currentPeer.Key.String())
  254. continue
  255. }
  256. if !reflect.DeepEqual(m.Payload.Peers[i], *currentPeer.Config.PeerConf) {
  257. if currentPeer.Config.RemoteConnAddr.IP.String() != m.Payload.Peers[i].Endpoint.IP.String() {
  258. log.Println("----------> Resetting proxy for Peer: ", currentPeer.Key, m.Payload.InterfaceName)
  259. currentPeer.StopConn()
  260. currentPeer.Mutex.Unlock()
  261. delete(wgProxyConf.PeerMap, currentPeer.Key.String())
  262. continue
  263. } else {
  264. log.Println("----->##### Updating Peer on Interface: ", m.Payload.InterfaceName, currentPeer.Key)
  265. updatePeerConf := m.Payload.Peers[i]
  266. localUdpAddr, err := net.ResolveUDPAddr("udp", currentPeer.Config.LocalConnAddr.String())
  267. if err == nil {
  268. updatePeerConf.Endpoint = localUdpAddr
  269. }
  270. if err := wgIface.Update(updatePeerConf, true); err != nil {
  271. log.Println("failed to update peer: ", currentPeer.Key, err)
  272. }
  273. currentPeer.Config.PeerConf = &m.Payload.Peers[i]
  274. wgProxyConf.PeerMap[currentPeer.Key.String()] = currentPeer
  275. // delete the peer from the list
  276. log.Println("-----------> deleting peer from list: ", m.Payload.Peers[i].PublicKey)
  277. m.Payload.Peers = append(m.Payload.Peers[:i], m.Payload.Peers[i+1:]...)
  278. }
  279. } else {
  280. // delete the peer from the list
  281. log.Println("-----------> No updates observed so deleting peer: ", m.Payload.Peers[i].PublicKey)
  282. m.Payload.Peers = append(m.Payload.Peers[:i], m.Payload.Peers[i+1:]...)
  283. }
  284. currentPeer.Mutex.Unlock()
  285. } else if !m.Payload.PeerMap[m.Payload.Peers[i].PublicKey.String()].Proxy && !m.Payload.PeerMap[m.Payload.Peers[i].PublicKey.String()].IsAttachedExtClient {
  286. log.Println("-----------> skipping peer, proxy is off: ", m.Payload.Peers[i].PublicKey)
  287. if err := wgIface.Update(m.Payload.Peers[i], false); err != nil {
  288. log.Println("falied to update peer: ", err)
  289. }
  290. m.Payload.Peers = append(m.Payload.Peers[:i], m.Payload.Peers[i+1:]...)
  291. }
  292. }
  293. // sync dev peers with new update
  294. common.WgIfaceMap = wgProxyConf
  295. log.Println("CLEANED UP..........")
  296. return wgIface, nil
  297. }
  298. func (m *ManagerAction) AddInterfaceToProxy() error {
  299. var err error
  300. wgInterface, err := m.processPayload()
  301. if err != nil {
  302. return err
  303. }
  304. log.Printf("wg: %+v\n", wgInterface)
  305. for _, peerI := range m.Payload.Peers {
  306. peerConf := m.Payload.PeerMap[peerI.PublicKey.String()]
  307. if peerI.Endpoint == nil && !(peerConf.IsAttachedExtClient || peerConf.IsExtClient) {
  308. log.Println("Endpoint nil for peer: ", peerI.PublicKey.String())
  309. continue
  310. }
  311. if peerConf.IsExtClient && !peerConf.IsAttachedExtClient {
  312. peerI.Endpoint = peerConf.IngressGatewayEndPoint
  313. }
  314. var isRelayed bool
  315. var relayedTo *net.UDPAddr
  316. if m.Payload.IsRelayed {
  317. isRelayed = true
  318. relayedTo = m.Payload.RelayedTo
  319. } else {
  320. isRelayed = peerConf.IsRelayed
  321. relayedTo = peerConf.RelayedTo
  322. }
  323. if peerConf.IsAttachedExtClient {
  324. log.Println("Extclient Thread...")
  325. go func(wgInterface *wg.WGIface, peer *wgtypes.PeerConfig,
  326. isRelayed bool, relayTo *net.UDPAddr, peerConf PeerConf, ingGwAddr string) {
  327. addExtClient := false
  328. commChan := make(chan *net.UDPAddr, 100)
  329. ctx, cancel := context.WithCancel(context.Background())
  330. common.ExtClientsWaitTh[peerI.PublicKey.String()] = models.ExtClientPeer{
  331. CancelFunc: cancel,
  332. CommChan: commChan,
  333. }
  334. defer func() {
  335. if addExtClient {
  336. log.Println("GOT ENDPOINT for Extclient adding peer...")
  337. common.ExtSourceIpMap[peer.Endpoint.String()] = models.RemotePeer{
  338. Interface: wgInterface.Name,
  339. PeerKey: peer.PublicKey.String(),
  340. IsExtClient: peerConf.IsExtClient,
  341. IsAttachedExtClient: peerConf.IsAttachedExtClient,
  342. Endpoint: peer.Endpoint,
  343. }
  344. peerpkg.AddNewPeer(wgInterface, peer, peerConf.Address, isRelayed,
  345. peerConf.IsExtClient, peerConf.IsAttachedExtClient, relayedTo)
  346. }
  347. log.Println("Exiting extclient watch Thread for: ", peer.PublicKey.String())
  348. }()
  349. for {
  350. select {
  351. case <-ctx.Done():
  352. return
  353. case endpoint := <-commChan:
  354. if endpoint != nil {
  355. addExtClient = true
  356. peer.Endpoint = endpoint
  357. delete(common.ExtClientsWaitTh, peer.PublicKey.String())
  358. return
  359. }
  360. }
  361. }
  362. }(wgInterface, &peerI, isRelayed, relayedTo, peerConf, m.Payload.WgAddr)
  363. continue
  364. }
  365. peerpkg.AddNewPeer(wgInterface, &peerI, peerConf.Address, isRelayed,
  366. peerConf.IsExtClient, peerConf.IsAttachedExtClient, relayedTo)
  367. }
  368. return nil
  369. }