| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424 | package managerimport (	"context"	"crypto/md5"	"errors"	"fmt"	"log"	"net"	"reflect"	"runtime"	"github.com/gravitl/netmaker/nm-proxy/common"	"github.com/gravitl/netmaker/nm-proxy/models"	peerpkg "github.com/gravitl/netmaker/nm-proxy/peer"	"github.com/gravitl/netmaker/nm-proxy/wg"	"golang.zx2c4.com/wireguard/wgctrl/wgtypes")/*TODO:-	1. ON Ingress node		--> for attached ext clients			-> start sniffer (will recieve pkts from ext clients (add ebf filter to listen on only ext traffic) if not intended to the interface forward it.)			-> start remote conn after endpoint is updated		-->*/var sent booltype ProxyAction stringtype ManagerPayload struct {	InterfaceName   string                 `json:"interface_name"`	WgAddr          string                 `json:"wg_addr"`	Peers           []wgtypes.PeerConfig   `json:"peers"`	PeerMap         map[string]PeerConf    `json:"peer_map"`	IsRelayed       bool                   `json:"is_relayed"`	IsIngress       bool                   `json:"is_ingress"`	RelayedTo       *net.UDPAddr           `json:"relayed_to"`	IsRelay         bool                   `json:"is_relay"`	RelayedPeerConf map[string]RelayedConf `json:"relayed_conf"`}type RelayedConf struct {	RelayedPeerEndpoint *net.UDPAddr         `json:"relayed_peer_endpoint"`	RelayedPeerPubKey   string               `json:"relayed_peer_pub_key"`	Peers               []wgtypes.PeerConfig `json:"relayed_peers"`}type PeerConf struct {	IsExtClient            bool         `json:"is_ext_client"`	Address                string       `json:"address"`	IsAttachedExtClient    bool         `json:"is_attached_ext_client"`	IngressGatewayEndPoint *net.UDPAddr `json:"ingress_gateway_endpoint"`	IsRelayed              bool         `json:"is_relayed"`	RelayedTo              *net.UDPAddr `json:"relayed_to"`	Proxy                  bool         `json:"proxy"`}const (	AddInterface    ProxyAction = "ADD_INTERFACE"	DeleteInterface ProxyAction = "DELETE_INTERFACE")type ManagerAction struct {	Action  ProxyAction	Payload ManagerPayload}func StartProxyManager(manageChan chan *ManagerAction) {	for {		select {		case mI := <-manageChan:			log.Printf("-------> PROXY-MANAGER: %+v\n", mI)			switch mI.Action {			case AddInterface:				mI.SetIngressGateway()				err := mI.AddInterfaceToProxy()				if err != nil {					log.Printf("failed to add interface: [%s] to proxy: %v\n  ", mI.Payload.InterfaceName, err)				}			case DeleteInterface:				mI.DeleteInterface()			}		}	}}func (m *ManagerAction) DeleteInterface() {	var err error	if runtime.GOOS == "darwin" {		m.Payload.InterfaceName, err = wg.GetRealIface(m.Payload.InterfaceName)		if err != nil {			log.Println("failed to get real iface: ", err)			return		}	}	if common.WgIfaceMap.Iface.Name == m.Payload.InterfaceName {		cleanUpInterface()	}}func (m *ManagerAction) RelayUpdate() {	common.IsRelay = m.Payload.IsRelay}func (m *ManagerAction) SetIngressGateway() {	common.IsIngressGateway = m.Payload.IsIngress}func (m *ManagerAction) RelayPeers() {	common.IsRelay = true	for relayedNodePubKey, relayedNodeConf := range m.Payload.RelayedPeerConf {		relayedNodePubKeyHash := fmt.Sprintf("%x", md5.Sum([]byte(relayedNodePubKey)))		if _, ok := common.RelayPeerMap[relayedNodePubKeyHash]; !ok {			common.RelayPeerMap[relayedNodePubKeyHash] = make(map[string]models.RemotePeer)		}		for _, peer := range relayedNodeConf.Peers {			if peer.Endpoint != nil {				peer.Endpoint.Port = models.NmProxyPort				remotePeerKeyHash := fmt.Sprintf("%x", md5.Sum([]byte(peer.PublicKey.String())))				common.RelayPeerMap[relayedNodePubKeyHash][remotePeerKeyHash] = models.RemotePeer{					Endpoint: peer.Endpoint,				}			}		}		relayedNodeConf.RelayedPeerEndpoint.Port = models.NmProxyPort		common.RelayPeerMap[relayedNodePubKeyHash][relayedNodePubKeyHash] = models.RemotePeer{			Endpoint: relayedNodeConf.RelayedPeerEndpoint,		}	}}func cleanUpInterface() {	log.Println("########------------>  CLEANING UP: ", common.WgIfaceMap.Iface.Name)	for _, peerI := range common.WgIfaceMap.PeerMap {		peerI.Mutex.Lock()		peerI.StopConn()		peerI.Mutex.Unlock()		delete(common.WgIfaceMap.PeerMap, peerI.Key.String())	}	common.WgIfaceMap.PeerMap = make(map[string]*models.Conn)}func (m *ManagerAction) processPayload() (*wg.WGIface, error) {	var err error	var wgIface *wg.WGIface	if m.Payload.InterfaceName == "" {		return nil, errors.New("interface cannot be empty")	}	if len(m.Payload.Peers) == 0 {		return nil, errors.New("no peers to add")	}	if runtime.GOOS == "darwin" {		m.Payload.InterfaceName, err = wg.GetRealIface(m.Payload.InterfaceName)		if err != nil {			log.Println("failed to get real iface: ", err)		}	}	common.InterfaceName = m.Payload.InterfaceName	wgIface, err = wg.NewWGIFace(m.Payload.InterfaceName, "127.0.0.1/32", wg.DefaultMTU)	if err != nil {		log.Println("Failed init new interface: ", err)		return nil, err	}	if common.WgIfaceMap.Iface == nil {		for i := len(m.Payload.Peers) - 1; i >= 0; i-- {			if !m.Payload.PeerMap[m.Payload.Peers[i].PublicKey.String()].Proxy {				log.Println("-----------> skipping peer, proxy is off: ", m.Payload.Peers[i].PublicKey)				if err := wgIface.Update(m.Payload.Peers[i], false); err != nil {					log.Println("falied to update peer: ", err)				}				m.Payload.Peers = append(m.Payload.Peers[:i], m.Payload.Peers[i+1:]...)				continue			}		}		common.WgIfaceMap.Iface = wgIface.Device		common.WgIfaceMap.IfaceKeyHash = fmt.Sprintf("%x", md5.Sum([]byte(wgIface.Device.PublicKey.String())))		return wgIface, nil	}	wgProxyConf := common.WgIfaceMap	if m.Payload.IsRelay {		m.RelayPeers()	}	common.IsRelay = m.Payload.IsRelay	// check if node is getting relayed	if common.IsRelayed != m.Payload.IsRelayed {		common.IsRelayed = m.Payload.IsRelayed		cleanUpInterface()		return wgIface, nil	}	// sync map with wg device config	// check if listen port has changed	if wgIface.Device.ListenPort != wgProxyConf.Iface.ListenPort {		// reset proxy for this interface		cleanUpInterface()		return wgIface, nil	}	// check device conf different from proxy	wgProxyConf.Iface = wgIface.Device	// sync peer map with new update	for _, currPeerI := range wgProxyConf.Iface.Peers {		if _, ok := m.Payload.PeerMap[currPeerI.PublicKey.String()]; !ok {			if val, ok := wgProxyConf.PeerMap[currPeerI.PublicKey.String()]; ok {				val.Mutex.Lock()				if val.IsAttachedExtClient {					log.Println("------> Deleting ExtClient Watch Thread: ", currPeerI.PublicKey.String())					if val, ok := common.ExtClientsWaitTh[currPeerI.PublicKey.String()]; ok {						val.CancelFunc()						delete(common.ExtClientsWaitTh, currPeerI.PublicKey.String())					}					log.Println("-----> Deleting Ext Client from Src Ip Map: ", currPeerI.PublicKey.String())					delete(common.ExtSourceIpMap, val.Config.PeerConf.Endpoint.String())				}				val.StopConn()				val.Mutex.Unlock()				delete(wgProxyConf.PeerMap, currPeerI.PublicKey.String())			}			// delete peer from interface			log.Println("CurrPeer Not Found, Deleting Peer from Interface: ", currPeerI.PublicKey.String())			if err := wgIface.RemovePeer(currPeerI.PublicKey.String()); err != nil {				log.Println("failed to remove peer: ", currPeerI.PublicKey.String(), err)			}			delete(common.PeerKeyHashMap, fmt.Sprintf("%x", md5.Sum([]byte(currPeerI.PublicKey.String()))))		}	}	for i := len(m.Payload.Peers) - 1; i >= 0; i-- {		if currentPeer, ok := wgProxyConf.PeerMap[m.Payload.Peers[i].PublicKey.String()]; ok {			currentPeer.Mutex.Lock()			if currentPeer.IsAttachedExtClient {				m.Payload.Peers = append(m.Payload.Peers[:i], m.Payload.Peers[i+1:]...)				continue			}			// check if proxy is off for the peer			if !m.Payload.PeerMap[m.Payload.Peers[i].PublicKey.String()].Proxy {				// cleanup proxy connections for the peer				currentPeer.StopConn()				delete(wgProxyConf.PeerMap, currentPeer.Key.String())				// update the peer with actual endpoint				if err := wgIface.Update(m.Payload.Peers[i], false); err != nil {					log.Println("falied to update peer: ", err)				}				m.Payload.Peers = append(m.Payload.Peers[:i], m.Payload.Peers[i+1:]...)				continue			}			// check if peer is not connected to proxy			devPeer, err := wg.GetPeer(m.Payload.InterfaceName, currentPeer.Key.String())			if err == nil {				log.Printf("---------> COMAPRING ENDPOINT: DEV: %s, Proxy: %s", devPeer.Endpoint.String(), currentPeer.Config.LocalConnAddr.String())				if devPeer.Endpoint.String() != currentPeer.Config.LocalConnAddr.String() {					log.Println("---------> endpoint is not set to proxy: ", currentPeer.Key)					currentPeer.StopConn()					delete(wgProxyConf.PeerMap, currentPeer.Key.String())					continue				}			}			//check if peer is being relayed			if currentPeer.IsRelayed != m.Payload.PeerMap[m.Payload.Peers[i].PublicKey.String()].IsRelayed {				log.Println("---------> peer relay status has been changed: ", currentPeer.Key)				currentPeer.StopConn()				delete(wgProxyConf.PeerMap, currentPeer.Key.String())				continue			}			// check if relay endpoint has been changed			if currentPeer.RelayedEndpoint != nil &&				m.Payload.PeerMap[m.Payload.Peers[i].PublicKey.String()].RelayedTo != nil &&				currentPeer.RelayedEndpoint.String() != m.Payload.PeerMap[m.Payload.Peers[i].PublicKey.String()].RelayedTo.String() {				log.Println("---------> peer relay endpoint has been changed: ", currentPeer.Key)				currentPeer.StopConn()				delete(wgProxyConf.PeerMap, currentPeer.Key.String())				continue			}			if !reflect.DeepEqual(m.Payload.Peers[i], *currentPeer.Config.PeerConf) {				if currentPeer.Config.RemoteConnAddr.IP.String() != m.Payload.Peers[i].Endpoint.IP.String() {					log.Println("----------> Resetting proxy for Peer: ", currentPeer.Key, m.Payload.InterfaceName)					currentPeer.StopConn()					currentPeer.Mutex.Unlock()					delete(wgProxyConf.PeerMap, currentPeer.Key.String())					continue				} else {					log.Println("----->##### Updating Peer on Interface: ", m.Payload.InterfaceName, currentPeer.Key)					updatePeerConf := m.Payload.Peers[i]					localUdpAddr, err := net.ResolveUDPAddr("udp", currentPeer.Config.LocalConnAddr.String())					if err == nil {						updatePeerConf.Endpoint = localUdpAddr					}					if err := wgIface.Update(updatePeerConf, true); err != nil {						log.Println("failed to update peer: ", currentPeer.Key, err)					}					currentPeer.Config.PeerConf = &m.Payload.Peers[i]					wgProxyConf.PeerMap[currentPeer.Key.String()] = currentPeer					// delete the peer from the list					log.Println("-----------> deleting peer from list: ", m.Payload.Peers[i].PublicKey)					m.Payload.Peers = append(m.Payload.Peers[:i], m.Payload.Peers[i+1:]...)				}			} else {				// delete the peer from the list				log.Println("-----------> No updates observed so deleting peer: ", m.Payload.Peers[i].PublicKey)				m.Payload.Peers = append(m.Payload.Peers[:i], m.Payload.Peers[i+1:]...)			}			currentPeer.Mutex.Unlock()		} else if !m.Payload.PeerMap[m.Payload.Peers[i].PublicKey.String()].Proxy && !m.Payload.PeerMap[m.Payload.Peers[i].PublicKey.String()].IsAttachedExtClient {			log.Println("-----------> skipping peer, proxy is off: ", m.Payload.Peers[i].PublicKey)			if err := wgIface.Update(m.Payload.Peers[i], false); err != nil {				log.Println("falied to update peer: ", err)			}			m.Payload.Peers = append(m.Payload.Peers[:i], m.Payload.Peers[i+1:]...)		}	}	// sync dev peers with new update	common.WgIfaceMap = wgProxyConf	log.Println("CLEANED UP..........")	return wgIface, nil}func (m *ManagerAction) AddInterfaceToProxy() error {	var err error	wgInterface, err := m.processPayload()	if err != nil {		return err	}	log.Printf("wg: %+v\n", wgInterface)	for _, peerI := range m.Payload.Peers {		peerConf := m.Payload.PeerMap[peerI.PublicKey.String()]		if peerI.Endpoint == nil && !(peerConf.IsAttachedExtClient || peerConf.IsExtClient) {			log.Println("Endpoint nil for peer: ", peerI.PublicKey.String())			continue		}		if peerConf.IsExtClient && !peerConf.IsAttachedExtClient {			peerI.Endpoint = peerConf.IngressGatewayEndPoint		}		var isRelayed bool		var relayedTo *net.UDPAddr		if m.Payload.IsRelayed {			isRelayed = true			relayedTo = m.Payload.RelayedTo		} else {			isRelayed = peerConf.IsRelayed			relayedTo = peerConf.RelayedTo		}		if peerConf.IsAttachedExtClient {			log.Println("Extclient Thread...")			go func(wgInterface *wg.WGIface, peer *wgtypes.PeerConfig,				isRelayed bool, relayTo *net.UDPAddr, peerConf PeerConf, ingGwAddr string) {				addExtClient := false				commChan := make(chan *net.UDPAddr, 100)				ctx, cancel := context.WithCancel(context.Background())				common.ExtClientsWaitTh[peerI.PublicKey.String()] = models.ExtClientPeer{					CancelFunc: cancel,					CommChan:   commChan,				}				defer func() {					if addExtClient {						log.Println("GOT ENDPOINT for Extclient adding peer...")						common.ExtSourceIpMap[peer.Endpoint.String()] = models.RemotePeer{							Interface:           wgInterface.Name,							PeerKey:             peer.PublicKey.String(),							IsExtClient:         peerConf.IsExtClient,							IsAttachedExtClient: peerConf.IsAttachedExtClient,							Endpoint:            peer.Endpoint,						}						peerpkg.AddNewPeer(wgInterface, peer, peerConf.Address, isRelayed,							peerConf.IsExtClient, peerConf.IsAttachedExtClient, relayedTo)					}					log.Println("Exiting extclient watch Thread for: ", peer.PublicKey.String())				}()				for {					select {					case <-ctx.Done():						return					case endpoint := <-commChan:						if endpoint != nil {							addExtClient = true							peer.Endpoint = endpoint							delete(common.ExtClientsWaitTh, peer.PublicKey.String())							return						}					}				}			}(wgInterface, &peerI, isRelayed, relayedTo, peerConf, m.Payload.WgAddr)			continue		}		peerpkg.AddNewPeer(wgInterface, &peerI, peerConf.Address, isRelayed,			peerConf.IsExtClient, peerConf.IsAttachedExtClient, relayedTo)	}	return nil}
 |