Переглянути джерело

fix consolidated host peer update to work with relays

Abhishek Kondur 2 роки тому
батько
коміт
966a5a8428
4 змінених файлів з 42 додано та 201 видалено
  1. 7 8
      controllers/hosts.go
  2. 21 94
      logic/peers.go
  3. 2 87
      logic/relay.go
  4. 12 12
      mq/publishers.go

+ 7 - 8
controllers/hosts.go

@@ -110,13 +110,12 @@ func pull(w http.ResponseWriter, r *http.Request) {
 		logic.ReturnErrorResponse(w, r, logic.FormatError(err, "internal"))
 		return
 	}
-	peers := logic.GetPeerUpdate(host)
-	// hPU, err := logic.GetPeerUpdateForHost(host)
-	// if err != nil {
-	// 	logger.Log(0, "could not pull peers for host", hostID)
-	// 	logic.ReturnErrorResponse(w, r, logic.FormatError(err, "internal"))
-	// 	return
-	// }
+	hPU, err := logic.GetPeerUpdateForHost(host)
+	if err != nil {
+		logger.Log(0, "could not pull peers for host", hostID)
+		logic.ReturnErrorResponse(w, r, logic.FormatError(err, "internal"))
+		return
+	}
 	serverConf := servercfg.GetServerInfo()
 	if servercfg.GetBrokerType() == servercfg.EmqxBrokerType {
 		serverConf.MQUserName = hostID
@@ -133,7 +132,7 @@ func pull(w http.ResponseWriter, r *http.Request) {
 		Host:         *host,
 		Nodes:        logic.GetHostNodes(host),
 		ServerConfig: serverConf,
-		Peers:        peers,
+		Peers:        hPU.Peers,
 		FwUpdate:     fw,
 	}
 

+ 21 - 94
logic/peers.go

@@ -148,16 +148,23 @@ func GetPeerUpdateForHost(host *models.Host) (models.HostPeerUpdate, error) {
 				// skip relayed peers; will be included in relay peer
 				continue
 			}
-			var peerConfig wgtypes.PeerConfig
 			peerHost, err := GetHost(peer.HostID.String())
 			if err != nil {
 				logger.Log(1, "no peer host", peer.HostID.String(), err.Error())
 				return models.HostPeerUpdate{}, err
 			}
-
-			peerConfig.PublicKey = peerHost.PublicKey
-			peerConfig.PersistentKeepaliveInterval = &peer.PersistentKeepalive
-			peerConfig.ReplaceAllowedIPs = true
+			peerConfig := wgtypes.PeerConfig{
+				PublicKey:                   peerHost.PublicKey,
+				PersistentKeepaliveInterval: &peer.PersistentKeepalive,
+				ReplaceAllowedIPs:           true,
+			}
+			if (node.IsRelayed && node.RelayedBy != peer.ID.String()) || shouldRemovePeer(node, peer) {
+				// if node is relayed and peer is not the relay, set remove to true
+				peerConfig.Remove = true
+				hostPeerUpdate.Peers = append(hostPeerUpdate.Peers, peerConfig)
+				peerIndexMap[peerHost.PublicKey.String()] = len(hostPeerUpdate.Peers) - 1
+				continue
+			}
 			uselocal := false
 			if host.EndpointIP.String() == peerHost.EndpointIP.String() {
 				// peer is on same network
@@ -181,30 +188,6 @@ func GetPeerUpdateForHost(host *models.Host) (models.HostPeerUpdate, error) {
 				peerConfig.Endpoint.Port = peerHost.ListenPort
 			}
 			allowedips := GetAllowedIPs(&models.Client{Host: *peerHost, Node: peer})
-			if peer.IsIngressGateway {
-				for _, entry := range peer.IngressGatewayRange {
-					_, cidr, err := net.ParseCIDR(string(entry))
-					if err == nil {
-						allowedips = append(allowedips, *cidr)
-					}
-				}
-			}
-			if peer.IsEgressGateway {
-				host, err := GetHost(peer.HostID.String())
-				if err == nil {
-					allowedips = append(allowedips, getEgressIPs(
-						&models.Client{
-							Host: *host,
-							Node: peer,
-						})...)
-				}
-			}
-			if peer.Action != models.NODE_DELETE &&
-				!peer.PendingDelete &&
-				peer.Connected &&
-				nodeacls.AreNodesAllowed(nodeacls.NetworkID(node.Network), nodeacls.NodeID(node.ID.String()), nodeacls.NodeID(peer.ID.String())) {
-				peerConfig.AllowedIPs = allowedips // only append allowed IPs if valid connection
-			}
 			if _, ok := peerIndexMap[peerHost.PublicKey.String()]; !ok {
 				hostPeerUpdate.Peers = append(hostPeerUpdate.Peers, peerConfig)
 				peerIndexMap[peerHost.PublicKey.String()] = len(hostPeerUpdate.Peers) - 1
@@ -214,10 +197,8 @@ func GetPeerUpdateForHost(host *models.Host) (models.HostPeerUpdate, error) {
 			} else {
 				peerAllowedIPs := hostPeerUpdate.Peers[peerIndexMap[peerHost.PublicKey.String()]].AllowedIPs
 				peerAllowedIPs = append(peerAllowedIPs, allowedips...)
+				hostPeerUpdate.Peers[peerIndexMap[peerHost.PublicKey.String()]].Remove = false
 				hostPeerUpdate.Peers[peerIndexMap[peerHost.PublicKey.String()]].AllowedIPs = peerAllowedIPs
-				hostPeerUpdate.HostNetworkInfo[peerHost.PublicKey.String()] = models.HostNetworkInfo{
-					Interfaces: peerHost.Interfaces,
-				}
 			}
 
 		}
@@ -235,6 +216,14 @@ func GetPeerUpdateForHost(host *models.Host) (models.HostPeerUpdate, error) {
 	return hostPeerUpdate, nil
 }
 
+func shouldRemovePeer(node, peer models.Node) (remove bool) {
+	if peer.Action == models.NODE_DELETE || peer.PendingDelete || !peer.Connected ||
+		!nodeacls.AreNodesAllowed(nodeacls.NetworkID(node.Network), nodeacls.NodeID(node.ID.String()), nodeacls.NodeID(peer.ID.String())) {
+		remove = true
+	}
+	return
+}
+
 // GetFwUpdate - fetches the firewall update for the gateway nodes on the host
 func GetFwUpdate(host *models.Host) (models.FwUpdate, error) {
 	fwUpdate := models.FwUpdate{
@@ -557,68 +546,6 @@ func filterNodeMapForClientACLs(publicKey, network string, nodePeerMap map[strin
 	return nodePeerMap
 }
 
-func GetPeerUpdate(host *models.Host) []wgtypes.PeerConfig {
-	peerUpdate := []wgtypes.PeerConfig{}
-	for _, nodeStr := range host.Nodes {
-		node, err := GetNodeByID(nodeStr)
-		if err != nil {
-			continue
-		}
-		client := models.Client{Host: *host, Node: node}
-		peers, err := GetNetworkClients(node.Network)
-		if err != nil {
-			continue
-		}
-		if node.IsRelayed {
-			peerUpdate = append(peerUpdate, peerUpdateForRelayed(&client, peers)...)
-			continue
-		}
-		if node.IsRelay {
-			peerUpdate = append(peerUpdate, peerUpdateForRelay(&client, peers)...)
-			continue
-		}
-		for _, peer := range peers {
-			if peer.Host.ID == client.Host.ID {
-				continue
-			}
-			// if peer is relayed by some other node, remove it from the peer list,  it
-			// will be added to allowedips of relay peer
-			if peer.Node.IsRelayed {
-				update := wgtypes.PeerConfig{
-					PublicKey: peer.Host.PublicKey,
-					Remove:    true,
-				}
-				peerUpdate = append(peerUpdate, update)
-				continue
-			}
-			update := wgtypes.PeerConfig{
-				PublicKey:         peer.Host.PublicKey,
-				ReplaceAllowedIPs: true,
-				Endpoint: &net.UDPAddr{
-					IP:   peer.Host.EndpointIP,
-					Port: peer.Host.ListenPort,
-				},
-				PersistentKeepaliveInterval: &peer.Node.PersistentKeepalive,
-			}
-			// if peer is a relay that relays us, don't do anything
-			if peer.Node.IsRelay && client.Node.RelayedBy == peer.Node.ID.String() {
-				continue
-			} else {
-				update.AllowedIPs = append(update.AllowedIPs, getRelayAllowedIPs(&peer)...)
-			}
-			//normal peer
-			if nodeacls.AreNodesAllowed(nodeacls.NetworkID(node.Network), nodeacls.NodeID(node.ID.String()), nodeacls.NodeID(peer.Node.ID.String())) {
-				update.AllowedIPs = append(update.AllowedIPs, GetAllowedIPs(&peer)...)
-				peerUpdate = append(peerUpdate, update)
-			} else {
-				update.Remove = true
-				peerUpdate = append(peerUpdate, update)
-			}
-		}
-	}
-	return peerUpdate
-}
-
 func AddHostAllowedIPs(h *models.Host) []net.IPNet {
 	allowedIPs := []net.IPNet{}
 	for _, hNodeID := range h.Nodes {

+ 2 - 87
logic/relay.go

@@ -140,64 +140,8 @@ func DeleteRelay(network, nodeid string) ([]models.Client, models.Node, error) {
 	return returnClients, node, nil
 }
 
-func getRelayedAddresses(id string) []net.IPNet {
-	addrs := []net.IPNet{}
-	node, err := GetNodeByID(id)
-	if err != nil {
-		logger.Log(0, "getRelayedAddresses: "+err.Error())
-		return addrs
-	}
-	if node.Address.IP != nil {
-		node.Address.Mask = net.CIDRMask(32, 32)
-		addrs = append(addrs, node.Address)
-	}
-	if node.Address6.IP != nil {
-		node.Address6.Mask = net.CIDRMask(128, 128)
-		addrs = append(addrs, node.Address6)
-	}
-	return addrs
-}
-
-// peerUpdateForRelayed - returns the peerConfig for a relayed node
-func peerUpdateForRelayed(client *models.Client, peers []models.Client) []wgtypes.PeerConfig {
-	peerConfig := []wgtypes.PeerConfig{}
-	if !client.Node.IsRelayed {
-		logger.Log(0, "GetPeerUpdateForRelayed called for non-relayed node ", client.Host.Name)
-		return []wgtypes.PeerConfig{}
-	}
-	relayNode, err := GetNodeByID(client.Node.RelayedBy)
-	if err != nil {
-		logger.Log(0, "error retrieving relay node", err.Error())
-		return []wgtypes.PeerConfig{}
-	}
-	host, err := GetHost(relayNode.HostID.String())
-	if err != nil {
-		return []wgtypes.PeerConfig{}
-	}
-	relay := models.Client{
-		Host: *host,
-		Node: relayNode,
-	}
-	for _, peer := range peers {
-		if peer.Host.ID == client.Host.ID {
-			continue
-		}
-		if peer.Host.ID == relay.Host.ID { // add relay as a peer
-			update := PeerUpdateForRelayedByRelay(client, &relay)
-			peerConfig = append(peerConfig, update)
-			continue
-		}
-		update := wgtypes.PeerConfig{
-			PublicKey: peer.Host.PublicKey,
-			Remove:    true,
-		}
-		peerConfig = append(peerConfig, update)
-	}
-	return peerConfig
-}
-
-// PeerUpdateForRelayedByRelay - returns the peerConfig for a node relayed by relay
-func PeerUpdateForRelayedByRelay(relayed, relay *models.Client) wgtypes.PeerConfig {
+// GetPeerConfForRelayed - returns the peerConfig for a node relayed by relay
+func GetPeerConfForRelayed(relayed, relay *models.Client) wgtypes.PeerConfig {
 	if relayed.Node.RelayedBy != relay.Node.ID.String() {
 		logger.Log(0, "peerUpdateForRelayedByRelay called with invalid parameters")
 		return wgtypes.PeerConfig{}
@@ -240,32 +184,3 @@ func PeerUpdateForRelayedByRelay(relayed, relay *models.Client) wgtypes.PeerConf
 	}
 	return update
 }
-
-// peerUpdateForRelay - returns the peerConfig for a relay
-func peerUpdateForRelay(relay *models.Client, peers []models.Client) []wgtypes.PeerConfig {
-	peerConfig := []wgtypes.PeerConfig{}
-	if !relay.Node.IsRelay {
-		logger.Log(0, "GetPeerUpdateForRelay called for non-relay node ", relay.Host.Name)
-		return []wgtypes.PeerConfig{}
-	}
-	for _, peer := range peers {
-		if peer.Host.ID == relay.Host.ID {
-			continue
-		}
-		update := wgtypes.PeerConfig{
-			PublicKey:         peer.Host.PublicKey,
-			ReplaceAllowedIPs: true,
-			Remove:            false,
-			Endpoint: &net.UDPAddr{
-				IP:   peer.Host.EndpointIP,
-				Port: peer.Host.ListenPort,
-			},
-			PersistentKeepaliveInterval: &peer.Node.PersistentKeepalive,
-		}
-		if nodeacls.AreNodesAllowed(nodeacls.NetworkID(relay.Node.Network), nodeacls.NodeID(relay.Node.ID.String()), nodeacls.NodeID(peer.Node.ID.String())) {
-			update.AllowedIPs = append(update.AllowedIPs, GetAllowedIPs(&peer)...)
-			peerConfig = append(peerConfig, update)
-		}
-	}
-	return peerConfig
-}

+ 12 - 12
mq/publishers.go

@@ -94,7 +94,7 @@ func FlushNetworkPeersToHost(client *models.Client, networkClients []models.Clie
 			Host: *relayHost,
 			Node: relayNode,
 		}
-		relayPeerCfg := logic.PeerUpdateForRelayedByRelay(relayedClient, relayClient)
+		relayPeerCfg := logic.GetPeerConfForRelayed(relayedClient, relayClient)
 		addPeerAction.Peers = append(addPeerAction.Peers, relayPeerCfg)
 	}
 	if client.Node.IsIngressGateway {
@@ -213,7 +213,7 @@ func BroadcastHostUpdate(host *models.Host, remove bool) error {
 
 // BroadcastAddOrUpdateNetworkPeer - notifys the hosts in the network to add or update peer.
 func BroadcastAddOrUpdateNetworkPeer(client *models.Client, update bool) error {
-	nodes, err := logic.GetNetworkNodes(client.Node.Network)
+	clients, err := logic.GetNetworkClients(client.Node.Network)
 	if err != nil {
 		return err
 	}
@@ -235,30 +235,30 @@ func BroadcastAddOrUpdateNetworkPeer(client *models.Client, update bool) error {
 	if update {
 		p.Action = models.UpdatePeer
 	}
-	for _, nodeI := range nodes {
-		nodeI := nodeI
-		if nodeI.ID.String() == client.Node.ID.String() {
+	for _, clientI := range clients {
+		clientI := clientI
+		if clientI.Node.ID.String() == client.Node.ID.String() {
 			// skip self...
 			continue
 		}
 		// update allowed ips, according to the peer node
 		p.Peers[0].AllowedIPs = logic.GetAllowedIPs(&models.Client{Host: client.Host, Node: client.Node})
-		if update && (!nodeacls.AreNodesAllowed(nodeacls.NetworkID(client.Node.Network), nodeacls.NodeID(client.Node.ID.String()), nodeacls.NodeID(nodeI.ID.String())) ||
+		if update && (!nodeacls.AreNodesAllowed(nodeacls.NetworkID(client.Node.Network), nodeacls.NodeID(client.Node.ID.String()), nodeacls.NodeID(clientI.Node.ID.String())) ||
 			client.Node.Action == models.NODE_DELETE || client.Node.PendingDelete || !client.Node.Connected) {
 			// remove peer
 			p.Action = models.RemovePeer
 			p.Peers[0].Remove = true
 		}
-		peerHost, err := logic.GetHost(nodeI.HostID.String())
+		peerHost, err := logic.GetHost(clientI.Host.ID.String())
 		if err != nil {
 			continue
 		}
-		if nodeI.IsRelayed {
+		if clientI.Node.IsRelayed {
 			r := models.PeerAction{
 				Action: models.AddPeer,
 			}
 			// update the relay peer on this node
-			relayNode, err := logic.GetNodeByID(nodeI.RelayedBy)
+			relayNode, err := logic.GetNodeByID(clientI.Node.RelayedBy)
 			if err != nil {
 				continue
 			}
@@ -268,13 +268,13 @@ func BroadcastAddOrUpdateNetworkPeer(client *models.Client, update bool) error {
 			}
 			relayedClient := &models.Client{
 				Host: *peerHost,
-				Node: nodeI,
+				Node: clientI.Node,
 			}
 			relayClient := &models.Client{
 				Host: *relayHost,
 				Node: relayNode,
 			}
-			rPeerCfg := logic.PeerUpdateForRelayedByRelay(relayedClient, relayClient)
+			rPeerCfg := logic.GetPeerConfForRelayed(relayedClient, relayClient)
 			if update {
 				r.Action = models.UpdatePeer
 			}
@@ -292,7 +292,7 @@ func BroadcastAddOrUpdateNetworkPeer(client *models.Client, update bool) error {
 			}
 			publish(peerHost, fmt.Sprintf("peer/host/%s/%s", peerHost.ID.String(), servercfg.GetServer()), data)
 		}
-		if nodeI.IsIngressGateway || nodeI.IsEgressGateway {
+		if clientI.Node.IsIngressGateway || clientI.Node.IsEgressGateway {
 			go func(peerHost models.Host) {
 				f, err := logic.GetFwUpdate(&peerHost)
 				if err == nil {