Răsfoiți Sursa

initial commit to sync relay update with singleton peerupdates

Abhishek Kondur 2 ani în urmă
părinte
comite
45ff5a2634
3 a modificat fișierele cu 94 adăugiri și 36 ștergeri
  1. 0 26
      logic/peers.go
  2. 62 10
      mq/publishers.go
  3. 32 0
      mq/relay.go

+ 0 - 26
logic/peers.go

@@ -762,29 +762,3 @@ func getIngressIPs(peer *models.Client) []net.IPNet {
 	}
 	return ingressIPs
 }
-
-// GetPeerUpdateForRelay - returns the peer update for a relay node
-func GetPeerUpdateForRelay(client *models.Client, peers []models.Client) []wgtypes.PeerConfig {
-	peerConfig := []wgtypes.PeerConfig{}
-	if !client.Node.IsRelay {
-		return []wgtypes.PeerConfig{}
-	}
-	for _, peer := range peers {
-		if peer.Host.ID == client.Host.ID {
-			continue
-		}
-		update := wgtypes.PeerConfig{
-			PublicKey:         peer.Host.PublicKey,
-			ReplaceAllowedIPs: true,
-			Remove:            false,
-			Endpoint: &net.UDPAddr{
-				IP:   peer.Host.EndpointIP,
-				Port: peer.Host.ListenPort,
-			},
-			PersistentKeepaliveInterval: &peer.Node.PersistentKeepalive,
-		}
-		update.AllowedIPs = append(update.AllowedIPs, AddAllowedIPs(&peer)...)
-		peerConfig = append(peerConfig, update)
-	}
-	return peerConfig
-}

+ 62 - 10
mq/publishers.go

@@ -64,8 +64,8 @@ func FlushNetworkPeersToHost(host *models.Host, hNode *models.Node, networkNodes
 		Peers:  []wgtypes.PeerConfig{},
 	}
 	for _, node := range networkNodes {
-		if node.ID == hNode.ID {
-			// skip self
+		if node.ID == hNode.ID || node.IsRelayed {
+			// skip self or if relayed
 			continue
 		}
 		peerHost, err := logic.GetHost(node.HostID.String())
@@ -74,14 +74,13 @@ func FlushNetworkPeersToHost(host *models.Host, hNode *models.Node, networkNodes
 		}
 
 		if !nodeacls.AreNodesAllowed(nodeacls.NetworkID(node.Network), nodeacls.NodeID(hNode.ID.String()), nodeacls.NodeID(node.ID.String())) ||
-			hNode.Action == models.NODE_DELETE || hNode.PendingDelete || !hNode.Connected {
+			hNode.Action == models.NODE_DELETE || hNode.PendingDelete || !hNode.Connected || (hNode.IsRelayed && hNode.RelayedBy != node.ID.String()) {
 			// remove peer if not allowed
 			rmPeerAction.Peers = append(rmPeerAction.Peers, wgtypes.PeerConfig{
 				PublicKey: peerHost.PublicKey,
 				Remove:    true,
 			})
 			continue
-
 		}
 		peerCfg := wgtypes.PeerConfig{
 			PublicKey: peerHost.PublicKey,
@@ -95,12 +94,32 @@ func FlushNetworkPeersToHost(host *models.Host, hNode *models.Node, networkNodes
 		}
 		addPeerAction.Peers = append(addPeerAction.Peers, peerCfg)
 	}
+	if hNode.IsRelayed {
+		// update the relay peer on this node
+		relayNode, err := logic.GetNodeByID(hNode.RelayedBy)
+		if err != nil {
+			return err
+		}
+		relayHost, err := logic.GetHost(relayNode.HostID.String())
+		if err != nil {
+			return err
+		}
+		relayedClient := &models.Client{
+			Host: *host,
+			Node: *hNode,
+		}
+		relayClient := &models.Client{
+			Host: *relayHost,
+			Node: relayNode,
+		}
+		relayPeerCfg := getRelayPeerCfgForRelayedNode(relayedClient, relayClient)
+		addPeerAction.Peers = append(addPeerAction.Peers, relayPeerCfg)
+	}
 	if hNode.IsIngressGateway {
 		extPeers, _, err := logic.GetExtPeers(hNode)
 		if err == nil {
 			addPeerAction.Peers = append(addPeerAction.Peers, extPeers...)
 		}
-
 	}
 	if len(rmPeerAction.Peers) > 0 {
 		data, err := json.Marshal(rmPeerAction)
@@ -220,12 +239,47 @@ func BroadcastAddOrUpdatePeer(host *models.Host, node *models.Node, update bool)
 			p.Action = models.RemovePeer
 			p.Peers[0].Remove = true
 		}
-		data, err := json.Marshal(p)
+		peerHost, err := logic.GetHost(nodeI.HostID.String())
 		if err != nil {
 			continue
 		}
-		peerHost, err := logic.GetHost(nodeI.HostID.String())
-		if err == nil {
+		if nodeI.IsRelayed {
+			r := models.PeerAction{
+				Action: models.AddPeer,
+			}
+			// update the relay peer on this node
+			relayNode, err := logic.GetNodeByID(nodeI.RelayedBy)
+			if err != nil {
+				continue
+			}
+			relayHost, err := logic.GetHost(relayNode.HostID.String())
+			if err != nil {
+				continue
+			}
+			relayedClient := &models.Client{
+				Host: *peerHost,
+				Node: nodeI,
+			}
+			relayClient := &models.Client{
+				Host: *relayHost,
+				Node: relayNode,
+			}
+			rPeerCfg := getRelayPeerCfgForRelayedNode(relayedClient, relayClient)
+			if update {
+				r.Action = models.UpdatePeer
+			}
+			r.Peers = append(r.Peers, rPeerCfg)
+			data, err := json.Marshal(r)
+			if err != nil {
+				continue
+			}
+			publish(peerHost, fmt.Sprintf("peer/host/%s/%s", peerHost.ID.String(), servercfg.GetServer()), data)
+
+		} else {
+			data, err := json.Marshal(p)
+			if err != nil {
+				continue
+			}
 			publish(peerHost, fmt.Sprintf("peer/host/%s/%s", peerHost.ID.String(), servercfg.GetServer()), data)
 		}
 		if nodeI.IsIngressGateway || nodeI.IsEgressGateway {
@@ -235,7 +289,6 @@ func BroadcastAddOrUpdatePeer(host *models.Host, node *models.Node, update bool)
 					PublishFwUpdate(&peerHost, &f)
 				}
 			}(*peerHost)
-
 		}
 
 	}
@@ -258,7 +311,6 @@ func BroadcastExtClient(ingressHost *models.Host, ingressNode *models.Node) erro
 
 // BroadcastDelExtClient - published msg to remove ext client from network
 func BroadcastDelExtClient(ingressHost *models.Host, ingressNode *models.Node, extclients []models.ExtClient) error {
-	// TODO - send fw update
 	go BroadcastAddOrUpdatePeer(ingressHost, ingressNode, true)
 	peers := []wgtypes.PeerConfig{}
 	for _, extclient := range extclients {

+ 32 - 0
mq/relay.go

@@ -146,6 +146,38 @@ func getIngressIPs(peer models.Client) []net.IPNet {
 	return ingressIPs
 }
 
+func getRelayPeerCfgForRelayedNode(relayed, relay *models.Client) (relayPeer wgtypes.PeerConfig) {
+	relayPeer = wgtypes.PeerConfig{
+		PublicKey: relay.Host.PublicKey,
+		Endpoint: &net.UDPAddr{
+			IP:   relay.Host.EndpointIP,
+			Port: logic.GetPeerListenPort(&relay.Host),
+		},
+		PersistentKeepaliveInterval: &relay.Node.PersistentKeepalive,
+	}
+	peers, err := logic.GetNetworkClients(relay.Node.Network)
+	if err == nil {
+		if relay.Node.Address.IP != nil {
+			relay.Node.Address.Mask = net.CIDRMask(32, 32)
+			relayPeer.AllowedIPs = append(relayPeer.AllowedIPs, relay.Node.Address)
+		}
+		if relay.Node.Address6.IP != nil {
+			relay.Node.Address6.Mask = net.CIDRMask(128, 128)
+			relayPeer.AllowedIPs = append(relayPeer.AllowedIPs, relay.Node.Address6)
+		}
+		// add all other peers to allowed ips
+		for _, peer := range peers {
+			if peer.Host.ID == relay.Host.ID || peer.Host.ID == relayed.Host.ID {
+				continue
+			}
+			if nodeacls.AreNodesAllowed(nodeacls.NetworkID(relayed.Node.Network), nodeacls.NodeID(relayed.Node.ID.String()), nodeacls.NodeID(peer.Node.ID.String())) {
+				relayPeer.AllowedIPs = append(relayPeer.AllowedIPs, logic.AddAllowedIPs(&peer)...)
+			}
+		}
+	}
+	return
+}
+
 // pubRelayedUpdate - publish peer update to a node (client) that is relayed by the relay
 func pubRelayedUpdate(client, relay *models.Client, peers []models.Client) {
 	//verify