Browse Source

fix relay updates for proxy

Abhishek Kondur 2 years ago
parent
commit
c54080e150

+ 0 - 1
controllers/node.go

@@ -1121,7 +1121,6 @@ func runUpdates(node *models.Node, ifaceDelta bool) {
 
 // updates local peers for a server on a given node's network
 func runServerUpdate(node *models.Node, ifaceDelta bool) error {
-
 	if servercfg.IsClientMode() != "on" || !isServer(node) {
 		return nil
 	}

+ 9 - 5
logic/peers.go

@@ -31,12 +31,16 @@ func GetPeersForProxy(node *models.Node, onlyPeers bool) (manager.ManagerPayload
 	if !onlyPeers {
 		if node.IsRelayed == "yes" {
 			relayNode := FindRelay(node)
-			relayEndpoint, err := net.ResolveUDPAddr("udp", fmt.Sprintf("%s:%d", relayNode.Endpoint, relayNode.LocalListenPort))
-			if err != nil {
-				logger.Log(1, "failed to resolve relay node endpoint: ", err.Error())
+			if relayNode != nil {
+				relayEndpoint, err := net.ResolveUDPAddr("udp", fmt.Sprintf("%s:%d", relayNode.Endpoint, relayNode.LocalListenPort))
+				if err != nil {
+					logger.Log(1, "failed to resolve relay node endpoint: ", err.Error())
+				}
+				proxyPayload.IsRelayed = true
+				proxyPayload.RelayedTo = relayEndpoint
+			} else {
+				logger.Log(0, "couldn't find relay node for:  ", node.ID, node.PublicKey)
 			}
-			proxyPayload.IsRelayed = true
-			proxyPayload.RelayedTo = relayEndpoint
 
 		}
 		if node.IsRelay == "yes" {

+ 12 - 0
mq/dynsec_helper.go

@@ -182,6 +182,12 @@ func FetchNetworkAcls(network string) []Acl {
 			Priority: -1,
 			Allow:    true,
 		},
+		{
+			AclType:  "publishClientReceive",
+			Topic:    fmt.Sprintf("proxy/%s/#", network),
+			Priority: -1,
+			Allow:    true,
+		},
 		{
 			AclType:  "subscribePattern",
 			Topic:    "#",
@@ -206,6 +212,12 @@ func fetchServerAcls() []Acl {
 			Priority: -1,
 			Allow:    true,
 		},
+		{
+			AclType:  "publishClientSend",
+			Topic:    "proxy/#",
+			Priority: -1,
+			Allow:    true,
+		},
 		{
 			AclType:  "publishClientSend",
 			Topic:    "update/#",

+ 6 - 1
mq/publishers.go

@@ -31,6 +31,10 @@ func PublishPeerUpdate(newNode *models.Node, publishToSelf bool) error {
 		// 	logger.Log(1, "failed to publish proxy update to node", node.Name, "on network", node.Network, ":", err.Error())
 		// }
 		if node.IsServer == "yes" {
+			err := PublishProxyUpdate(manager.AddInterface, &node)
+			if err != nil {
+				logger.Log(0, "failed to send proxy update for server: ", err.Error())
+			}
 			continue
 		}
 		if !publishToSelf && newNode.ID == node.ID {
@@ -114,6 +118,7 @@ func PublishExtPeerUpdate(node *models.Node) error {
 // NodeUpdate -- publishes a node update
 func NodeUpdate(node *models.Node) error {
 	if !servercfg.IsMessageQueueBackend() || node.IsServer == "yes" {
+
 		return nil
 	}
 	logger.Log(3, "publishing node update to "+node.Name)
@@ -154,7 +159,7 @@ func ProxyUpdate(proxyPayload *manager.ManagerAction, node *models.Node) error {
 		return err
 	}
 	if err = publish(node, fmt.Sprintf("proxy/%s/%s", node.Network, node.ID), data); err != nil {
-		logger.Log(2, "error publishing node update to peer ", node.ID, err.Error())
+		logger.Log(2, "error publishing proxy update to peer ", node.ID, err.Error())
 		return err
 	}
 	return nil

+ 7 - 0
netclient/functions/common.go

@@ -22,6 +22,7 @@ import (
 	"github.com/gravitl/netmaker/netclient/local"
 	"github.com/gravitl/netmaker/netclient/ncutils"
 	"github.com/gravitl/netmaker/netclient/wireguard"
+	"github.com/gravitl/netmaker/nm-proxy/manager"
 	"golang.zx2c4.com/wireguard/wgctrl"
 )
 
@@ -192,6 +193,12 @@ func LeaveNetwork(network string) error {
 	if err := removeHostDNS(cfg.Node.Interface, ncutils.IsWindows()); err != nil {
 		logger.Log(0, "failed to delete dns entries for", cfg.Node.Interface, err.Error())
 	}
+	ProxyMgmChan <- &manager.ManagerAction{
+		Action: manager.DeleteInterface,
+		Payload: manager.ManagerPayload{
+			InterfaceName: cfg.Node.Interface,
+		},
+	}
 	logger.Log(2, "restarting daemon")
 	return daemon.Restart()
 }

+ 2 - 2
netclient/functions/daemon.go

@@ -235,9 +235,9 @@ func setSubscriptions(client mqtt.Client, nodeCfg *config.ClientConfig) {
 	}
 	if token := client.Subscribe(fmt.Sprintf("proxy/%s/%s", nodeCfg.Node.Network, nodeCfg.Node.ID), 0, mqtt.MessageHandler(ProxyUpdate)); token.WaitTimeout(mq.MQ_TIMEOUT*time.Second) && token.Error() != nil {
 		if token.Error() == nil {
-			logger.Log(0, "network:", nodeCfg.Node.Network, "connection timeout")
+			logger.Log(0, "###### network:", nodeCfg.Node.Network, "connection timeout")
 		} else {
-			logger.Log(0, "network:", nodeCfg.Node.Network, token.Error().Error())
+			logger.Log(0, "###### network:", nodeCfg.Node.Network, token.Error().Error())
 		}
 		return
 	}

+ 4 - 3
netclient/functions/mqhandlers.go

@@ -36,22 +36,23 @@ func ProxyUpdate(client mqtt.Client, msg mqtt.Message) {
 	var network = parseNetworkFromTopic(msg.Topic())
 	nodeCfg.Network = network
 	nodeCfg.ReadConfig()
-
+	logger.Log(0, "---------> Recieved a proxy update")
 	data, dataErr := decryptMsg(&nodeCfg, msg.Payload())
 	if dataErr != nil {
 		return
 	}
 	err := json.Unmarshal([]byte(data), &proxyUpdate)
 	if err != nil {
-		logger.Log(0, "error unmarshalling node update data"+err.Error())
+		logger.Log(0, "error unmarshalling proxy update data"+err.Error())
 		return
 	}
-	logger.Log(0, "---------> recieved a proxy update")
+
 	ProxyMgmChan <- &proxyUpdate
 }
 
 // NodeUpdate -- mqtt message handler for /update/<NodeID> topic
 func NodeUpdate(client mqtt.Client, msg mqtt.Message) {
+	logger.Log(0, "----------> RECIEVED NODE UPDDATEEEEE")
 	return
 	var newNode models.Node
 	var nodeCfg config.ClientConfig

+ 3 - 0
nm-proxy/common/common.go

@@ -14,6 +14,7 @@ import (
 var IsHostNetwork bool
 var IsRelay bool
 var IsIngressGateway bool
+var IsRelayed bool
 
 const (
 	NmProxyPort = 51722
@@ -37,6 +38,8 @@ type ConnConfig struct {
 	RemoteWgPort        int
 	RemoteProxyPort     int
 	IsExtClient         bool
+	IsRelayed           bool
+	RelayedEndpoint     *net.UDPAddr
 	IsAttachedExtClient bool
 	IngressGateWay      *net.UDPAddr
 }

+ 57 - 26
nm-proxy/manager/manager.go

@@ -57,13 +57,8 @@ type PeerConf struct {
 }
 
 const (
-	AddInterface ProxyAction = "ADD_INTERFACE"
-	DeletePeer   ProxyAction = "DELETE_PEER"
-	UpdatePeer   ProxyAction = "UPDATE_PEER"
-	AddPeer      ProxyAction = "ADD_PEER"
-	RelayPeers   ProxyAction = "RELAY_PEERS"
-	RelayUpdate  ProxyAction = "RELAY_UPDATE"
-	RelayTo      ProxyAction = "RELAY_TO"
+	AddInterface    ProxyAction = "ADD_INTERFACE"
+	DeleteInterface ProxyAction = "DELETE_INTERFACE"
 )
 
 type ManagerAction struct {
@@ -80,29 +75,34 @@ func StartProxyManager(manageChan chan *ManagerAction) {
 			switch mI.Action {
 			case AddInterface:
 
-				common.IsRelay = mI.Payload.IsRelay
-				if mI.Payload.IsRelay {
-					mI.RelayPeers()
-				}
 				mI.ExtClients()
 				err := mI.AddInterfaceToProxy()
 				if err != nil {
 					log.Printf("failed to add interface: [%s] to proxy: %v\n  ", mI.Payload.InterfaceName, err)
 				}
-			case UpdatePeer:
-				//mI.UpdatePeerProxy()
-			case DeletePeer:
-
-			case RelayPeers:
-				mI.RelayPeers()
-			case RelayUpdate:
-				mI.RelayUpdate()
+			case DeleteInterface:
+				mI.DeleteInterface()
 			}
 
 		}
 	}
 }
 
+func (m *ManagerAction) DeleteInterface() {
+	var err error
+	if runtime.GOOS == "darwin" {
+		m.Payload.InterfaceName, err = wg.GetRealIface(m.Payload.InterfaceName)
+		if err != nil {
+			log.Println("failed to get real iface: ", err)
+			return
+		}
+	}
+	if wgProxyConf, ok := common.WgIFaceMap[m.Payload.InterfaceName]; ok {
+		cleanUpInterface(wgProxyConf)
+	}
+
+}
+
 func (m *ManagerAction) RelayUpdate() {
 	common.IsRelay = m.Payload.IsRelay
 }
@@ -137,6 +137,14 @@ func (m *ManagerAction) RelayPeers() {
 	}
 }
 
+func cleanUpInterface(ifaceConf common.WgIfaceConf) {
+	log.Println("########------------>  CLEANING UP: ", ifaceConf.Iface.Name)
+	for _, peerI := range ifaceConf.PeerMap {
+		peerI.Proxy.Cancel()
+	}
+	delete(common.WgIFaceMap, ifaceConf.Iface.Name)
+}
+
 func (m *ManagerAction) processPayload() (*wg.WGIface, error) {
 	var err error
 	var wgIface *wg.WGIface
@@ -163,19 +171,26 @@ func (m *ManagerAction) processPayload() (*wg.WGIface, error) {
 	if wgProxyConf, ok = common.WgIFaceMap[m.Payload.InterfaceName]; !ok {
 		return wgIface, nil
 	}
+	if m.Payload.IsRelay {
+		m.RelayPeers()
+	}
+	common.IsRelay = m.Payload.IsRelay
+	// check if node is getting relayed
+	if common.IsRelayed != m.Payload.IsRelayed {
+		common.IsRelayed = m.Payload.IsRelayed
+		cleanUpInterface(wgProxyConf)
+		return wgIface, nil
+	}
+
 	// sync map with wg device config
 	// check if listen port has changed
 	if wgIface.Device.ListenPort != wgProxyConf.Iface.ListenPort {
 		// reset proxy for this interface
-
-		log.Println("########------------>  CLEANING UP: ", m.Payload.InterfaceName)
-		for _, peerI := range wgProxyConf.PeerMap {
-			peerI.Proxy.Cancel()
-		}
-		delete(common.WgIFaceMap, m.Payload.InterfaceName)
+		cleanUpInterface(wgProxyConf)
 		return wgIface, nil
 	}
-	wgProxyConf.Iface = wgIface.Device
+	// check device conf different from proxy
+	//wgProxyConf.Iface = wgIface.Device
 	for i := len(m.Payload.Peers) - 1; i >= 0; i-- {
 		if currentPeer, ok := wgProxyConf.PeerMap[m.Payload.Peers[i].PublicKey.String()]; ok {
 			// check if peer is not connected to proxy
@@ -189,6 +204,22 @@ func (m *ManagerAction) processPayload() (*wg.WGIface, error) {
 					continue
 				}
 			}
+			//check if peer is being relayed
+			if currentPeer.Config.IsRelayed != m.Payload.PeerMap[m.Payload.Peers[i].PublicKey.String()].IsRelayed {
+				log.Println("---------> peer relay status has been changed: ", currentPeer.Config.Key)
+				currentPeer.Proxy.Cancel()
+				delete(wgProxyConf.PeerMap, currentPeer.Config.Key)
+				continue
+			}
+			// check if relay endpoint has been changed
+			if currentPeer.Config.RelayedEndpoint != nil &&
+				m.Payload.PeerMap[m.Payload.Peers[i].PublicKey.String()].RelayedTo != nil &&
+				currentPeer.Config.RelayedEndpoint.String() != m.Payload.PeerMap[m.Payload.Peers[i].PublicKey.String()].RelayedTo.String() {
+				log.Println("---------> peer relay endpoint has been changed: ", currentPeer.Config.Key)
+				currentPeer.Proxy.Cancel()
+				delete(wgProxyConf.PeerMap, currentPeer.Config.Key)
+				continue
+			}
 			if !reflect.DeepEqual(m.Payload.Peers[i], *currentPeer.Proxy.Config.PeerConf) {
 				if currentPeer.Proxy.RemoteConn.IP.String() != m.Payload.Peers[i].Endpoint.IP.String() {
 					log.Println("----------> Resetting proxy for Peer: ", currentPeer.Config.Key, m.Payload.InterfaceName)

+ 4 - 1
nm-proxy/peer/peer.go

@@ -41,7 +41,8 @@ func AddNewPeer(wgInterface *wg.WGIface, peer *wgtypes.PeerConfig, peerAddr stri
 		LocalKey:    wgInterface.Device.PublicKey.String(),
 		RemoteKey:   peer.PublicKey.String(),
 		WgInterface: wgInterface,
-		PeerConf:    peer,
+
+		PeerConf: peer,
 	}
 	p := proxy.NewProxy(c)
 	peerPort := common.NmProxyPort
@@ -81,6 +82,8 @@ func AddNewPeer(wgInterface *wg.WGIface, peer *wgtypes.PeerConfig, peerAddr stri
 		RemoteProxyIP:   net.ParseIP(peer.Endpoint.IP.String()),
 		RemoteWgPort:    peer.Endpoint.Port,
 		RemoteProxyPort: common.NmProxyPort,
+		IsRelayed:       isRelayed,
+		RelayedEndpoint: relayTo,
 	}
 
 	peerProxy := common.Proxy{