Selaa lähdekoodia

sync relay changes with broadcast updates

Abhishek Kondur 2 vuotta sitten
vanhempi
commit
d6f47586fa
11 muutettua tiedostoa jossa 222 lisäystä ja 222 poistoa
  1. 1 1
      auth/host_session.go
  2. 2 2
      controllers/ext_client.go
  3. 1 1
      controllers/hosts.go
  4. 4 4
      controllers/node.go
  5. 22 45
      ee/ee_controllers/relay.go
  6. 3 0
      logic/gateway.go
  7. 60 71
      logic/peers.go
  8. 12 3
      logic/relay.go
  9. 9 9
      mq/handlers.go
  10. 91 54
      mq/publishers.go
  11. 17 32
      mq/relay.go

+ 1 - 1
auth/host_session.go

@@ -238,7 +238,7 @@ func CheckNetRegAndHostUpdate(networks []string, h *models.Host) {
 				Node:   *newNode,
 			})
 			if servercfg.IsMessageQueueBackend() {
-				mq.BroadcastAddOrUpdatePeer(h, newNode, false)
+				mq.BroadcastAddOrUpdateNetworkPeer(&models.Client{Host: *h, Node: *newNode}, false)
 			}
 		}
 	}

+ 2 - 2
controllers/ext_client.go

@@ -502,7 +502,7 @@ func updateExtClient(w http.ResponseWriter, r *http.Request) {
 		if ingressNode, err := logic.GetNodeByID(newclient.IngressGatewayID); err == nil {
 			if ingressHost, err := logic.GetHost(ingressNode.HostID.String()); err == nil {
 				if replaceOldClient || !update.Enabled {
-					mq.BroadcastDelExtClient(ingressHost, &ingressNode, []models.ExtClient{currentClient})
+					mq.BroadcastDelExtClient(&models.Client{Host: *ingressHost, Node: ingressNode}, []models.ExtClient{currentClient})
 				}
 				if replaceOldClient || changedEnabled {
 					// broadcast update
@@ -588,7 +588,7 @@ func deleteExtClient(w http.ResponseWriter, r *http.Request) {
 	go func() {
 		ingressHost, err := logic.GetHost(ingressnode.HostID.String())
 		if err == nil {
-			mq.BroadcastDelExtClient(ingressHost, &ingressnode, []models.ExtClient{extclient})
+			mq.BroadcastDelExtClient(&models.Client{Host: *ingressHost, Node: ingressnode}, []models.ExtClient{extclient})
 			f, err := logic.GetFwUpdate(ingressHost)
 			if err == nil {
 				mq.PublishFwUpdate(ingressHost, &f)

+ 1 - 1
controllers/hosts.go

@@ -294,7 +294,7 @@ func addHostToNetwork(w http.ResponseWriter, r *http.Request) {
 			Action: models.RequestAck,
 			Host:   *currHost,
 		})
-		go mq.BroadcastAddOrUpdatePeer(currHost, newNode, false)
+		go mq.BroadcastAddOrUpdateNetworkPeer(&models.Client{Host: *currHost, Node: *newNode}, false)
 	}
 
 	logger.Log(2, r.Header.Get("user"), fmt.Sprintf("added host %s to network %s", currHost.Name, network))

+ 4 - 4
controllers/node.go

@@ -465,7 +465,7 @@ func createEgressGateway(w http.ResponseWriter, r *http.Request) {
 			logger.Log(0, "failed to get egress host: ", err.Error())
 			return
 		}
-		mq.BroadcastAddOrUpdatePeer(host, &node, true)
+		mq.BroadcastAddOrUpdateNetworkPeer(&models.Client{Host: *host, Node: node}, true)
 		f, err := logic.GetFwUpdate(host)
 		if err != nil {
 			logger.Log(0, "failed to get egreess host: ", err.Error())
@@ -514,7 +514,7 @@ func deleteEgressGateway(w http.ResponseWriter, r *http.Request) {
 			logger.Log(0, "failed to get egress host: ", err.Error())
 			return
 		}
-		mq.BroadcastAddOrUpdatePeer(host, &node, true)
+		mq.BroadcastAddOrUpdateNetworkPeer(&models.Client{Host: *host, Node: node}, true)
 		f, err := logic.GetFwUpdate(host)
 		if err != nil {
 			logger.Log(0, "failed to get egreess host: ", err.Error())
@@ -609,7 +609,7 @@ func deleteIngressGateway(w http.ResponseWriter, r *http.Request) {
 	if len(removedClients) > 0 {
 		host, err := logic.GetHost(node.HostID.String())
 		if err == nil {
-			mq.BroadcastDelExtClient(host, &node, removedClients)
+			mq.BroadcastDelExtClient(&models.Client{Host: *host, Node: node}, removedClients)
 			f, err := logic.GetFwUpdate(host)
 			if err == nil {
 				mq.PublishFwUpdate(host, &f)
@@ -707,7 +707,7 @@ func updateNode(w http.ResponseWriter, r *http.Request) {
 	json.NewEncoder(w).Encode(apiNode)
 	runUpdates(newNode, ifaceDelta)
 	go func(aclUpdate bool, newNode *models.Node) {
-		mq.BroadcastAddOrUpdatePeer(host, newNode, true)
+		mq.BroadcastAddOrUpdateNetworkPeer(&models.Client{Host: *host, Node: *newNode}, true)
 		if err := mq.PublishReplaceDNS(&currentNode, newNode, host); err != nil {
 			logger.Log(1, "failed to publish dns update", err.Error())
 		}

+ 22 - 45
ee/ee_controllers/relay.go

@@ -50,31 +50,7 @@ func createRelay(w http.ResponseWriter, r *http.Request) {
 		logic.ReturnErrorResponse(w, r, logic.FormatError(err, "internal"))
 		return
 	}
-	relayHost, err := logic.GetHost(relayNode.HostID.String())
-	if err != nil {
-		logger.Log(0, r.Header.Get("user"),
-			fmt.Sprintf("failed to retrieve host for node [%s] on network [%s]: %v", relayRequest.NodeID, relayRequest.NetID, err))
-		logic.ReturnErrorResponse(w, r, logic.FormatError(err, "internal"))
-		return
-	}
-	relay := models.Client{
-		Host: *relayHost,
-		Node: relayNode,
-	}
-	peers, err := logic.GetNetworkClients(relay.Node.Network)
-	if err != nil {
-		logger.Log(0, "error getting network nodes: ", err.Error())
-		logic.ReturnErrorResponse(w, r, logic.FormatError(err, "internal"))
-		return
-	}
-	//mq.PubPeersforRelay(relay, peers)
-	//for _, relayed := range relayedClients {
-	//mq.PubPeersForRelayedNode(relayed, relay, peers)
-	//}
-	clients := peers
-	for _, client := range clients {
-		mq.PubPeerUpdate(&client, &relay, peers)
-	}
+	go mq.BroadCastRelayUpdate(relayNode.Network)
 	logger.Log(1, r.Header.Get("user"), "created relay on node", relayRequest.NodeID, "on network", relayRequest.NetID)
 	apiNode := relayNode.ConvertToAPINode()
 	w.WriteHeader(http.StatusOK)
@@ -106,29 +82,30 @@ func deleteRelay(w http.ResponseWriter, r *http.Request) {
 	logger.Log(1, r.Header.Get("user"), "deleted relay server", nodeid, "on network", netid)
 	go func() {
 		//update relayHost node
-		relayHost, err := logic.GetHost(node.HostID.String())
-		if err == nil {
-			if err := mq.NodeUpdate(&node); err != nil {
-				logger.Log(1, "relay node update", relayHost.Name, "on network", node.Network, ": ", err.Error())
-			}
-			for _, relayedClient := range updateClients {
-				err = mq.NodeUpdate(&relayedClient.Node)
-				if err != nil {
-					logger.Log(1, "relayed node update ", relayedClient.Node.ID.String(), "on network", relayedClient.Node.Network, ": ", err.Error())
-
-				}
-			}
-			peers, err := logic.GetNetworkClients(node.Network)
+		// relayHost, err := logic.GetHost(node.HostID.String())
+		// if err == nil {
+		// 	if err := mq.NodeUpdate(&node); err != nil {
+		// 		logger.Log(1, "relay node update", relayHost.Name, "on network", node.Network, ": ", err.Error())
+		// 	}
+		for _, relayedClient := range updateClients {
+			err = mq.NodeUpdate(&relayedClient.Node)
 			if err != nil {
-				logger.Log(0, "error getting network nodes: ", err.Error())
-				logic.ReturnErrorResponse(w, r, logic.FormatError(err, "internal"))
-				return
-			}
-			clients := peers
-			for _, client := range clients {
-				mq.PubPeerUpdate(&client, nil, peers)
+				logger.Log(1, "relayed node update ", relayedClient.Node.ID.String(), "on network", relayedClient.Node.Network, ": ", err.Error())
+
 			}
 		}
+		// 	peers, err := logic.GetNetworkClients(node.Network)
+		// 	if err != nil {
+		// 		logger.Log(0, "error getting network nodes: ", err.Error())
+		// 		logic.ReturnErrorResponse(w, r, logic.FormatError(err, "internal"))
+		// 		return
+		// 	}
+		// 	clients := peers
+		// 	for _, client := range clients {
+		// 		mq.PubPeerUpdate(&client, nil, peers)
+		// 	}
+		// }
+		go mq.BroadCastRelayUpdate(netid)
 	}()
 	logger.Log(1, r.Header.Get("user"), "deleted relay on node", node.ID.String(), "on network", node.Network)
 	apiNode := node.ConvertToAPINode()

+ 3 - 0
logic/gateway.go

@@ -102,6 +102,9 @@ func CreateIngressGateway(netid string, nodeid string, ingress models.IngressReq
 	if err != nil {
 		return models.Node{}, err
 	}
+	if node.IsRelayed {
+		return models.Node{}, errors.New("ingress cannot be created on a relayed node")
+	}
 	host, err := GetHost(node.HostID.String())
 	if err != nil {
 		return models.Node{}, err

+ 60 - 71
logic/peers.go

@@ -63,7 +63,7 @@ func NodePeersInfo(client *models.Client) (models.NodePeersInfo, error) {
 			peerConfig.Endpoint.IP = peer.LocalAddress.IP
 			peerConfig.Endpoint.Port = peerHost.ListenPort
 		}
-		allowedips := GetAllowedIPs(&client.Node, &peer, nil)
+		allowedips := GetAllowedIPs(&models.Client{Host: *peerHost, Node: peer})
 		if peer.IsIngressGateway {
 			for _, entry := range peer.IngressGatewayRange {
 				_, cidr, err := net.ParseCIDR(string(entry))
@@ -180,7 +180,7 @@ func GetPeerUpdateForHost(host *models.Host) (models.HostPeerUpdate, error) {
 				peerConfig.Endpoint.IP = peer.LocalAddress.IP
 				peerConfig.Endpoint.Port = peerHost.ListenPort
 			}
-			allowedips := GetAllowedIPs(&node, &peer, nil)
+			allowedips := GetAllowedIPs(&models.Client{Host: *peerHost, Node: peer})
 			if peer.IsIngressGateway {
 				for _, entry := range peer.IngressGatewayRange {
 					_, cidr, err := net.ParseCIDR(string(entry))
@@ -448,40 +448,38 @@ func GetExtPeers(node *models.Node) ([]wgtypes.PeerConfig, []models.IDandAddr, e
 }
 
 // GetAllowedIPs - calculates the wireguard allowedip field for a peer of a node based on the peer and node settings
-func GetAllowedIPs(node, peer *models.Node, metrics *models.Metrics) []net.IPNet {
+func GetAllowedIPs(peer *models.Client) []net.IPNet {
 	var allowedips []net.IPNet
-	allowedips = getNodeAllowedIPs(peer, node)
+	if peer.Node.Address.IP != nil {
+		allowed := net.IPNet{
+			IP:   peer.Node.Address.IP,
+			Mask: net.CIDRMask(32, 32),
+		}
+		allowedips = append(allowedips, allowed)
+	}
+	if peer.Node.Address6.IP != nil {
+		allowed := net.IPNet{
+			IP:   peer.Node.Address6.IP,
+			Mask: net.CIDRMask(128, 128),
+		}
+		allowedips = append(allowedips, allowed)
+	}
+	// handle egress gateway peers
+	if peer.Node.IsEgressGateway {
+		egressIPs := getEgressIPs(
+			&models.Client{
+				Host: peer.Host,
+				Node: peer.Node,
+			})
+		allowedips = append(allowedips, egressIPs...)
 
+	}
+	if peer.Node.IsRelay {
+		allowedips = append(allowedips, getRelayAllowedIPs(&models.Client{Node: peer.Node, Host: peer.Host})...)
+	}
 	// handle ingress gateway peers
-	if peer.IsIngressGateway {
-		extPeers, _, err := GetExtPeers(peer)
-		if err != nil {
-			logger.Log(2, "could not retrieve ext peers for ", peer.ID.String(), err.Error())
-		}
-		for _, extPeer := range extPeers {
-			allowedips = append(allowedips, extPeer.AllowedIPs...)
-		}
-		// if node is a failover node, add allowed ips from nodes it is handling
-		if metrics != nil && peer.Failover && metrics.FailoverPeers != nil {
-			// traverse through nodes that need handling
-			logger.Log(3, "peer", peer.ID.String(), "was found to be failover for", node.ID.String(), "checking failover peers...")
-			for k := range metrics.FailoverPeers {
-				// if FailoverNode is me for this node, add allowedips
-				if metrics.FailoverPeers[k] == peer.ID.String() {
-					// get original node so we can traverse the allowed ips
-					nodeToFailover, err := GetNodeByID(k)
-					if err == nil {
-						failoverNodeMetrics, err := GetMetrics(nodeToFailover.ID.String())
-						if err == nil && failoverNodeMetrics != nil {
-							if len(failoverNodeMetrics.NodeName) > 0 {
-								allowedips = append(allowedips, getNodeAllowedIPs(&nodeToFailover, peer)...)
-								logger.Log(0, "failing over node", nodeToFailover.ID.String(), nodeToFailover.PrimaryAddress(), "to failover node", peer.ID.String())
-							}
-						}
-					}
-				}
-			}
-		}
+	if peer.Node.IsIngressGateway {
+		allowedips = append(allowedips, getIngressIPs(peer)...)
 	}
 	return allowedips
 }
@@ -521,44 +519,6 @@ func getEgressIPs(client *models.Client) []net.IPNet {
 	return allowedips
 }
 
-func getNodeAllowedIPs(peer, node *models.Node) []net.IPNet {
-	var allowedips = []net.IPNet{}
-	if peer.Address.IP != nil {
-		allowed := net.IPNet{
-			IP:   peer.Address.IP,
-			Mask: net.CIDRMask(32, 32),
-		}
-		allowedips = append(allowedips, allowed)
-	}
-	if peer.Address6.IP != nil {
-		allowed := net.IPNet{
-			IP:   peer.Address6.IP,
-			Mask: net.CIDRMask(128, 128),
-		}
-		allowedips = append(allowedips, allowed)
-	}
-	// handle egress gateway peers
-	if peer.IsEgressGateway {
-		//hasGateway = true
-		host, err := GetHost(peer.HostID.String())
-		if err == nil {
-			egressIPs := getEgressIPs(
-				&models.Client{
-					Host: *host,
-					Node: *peer,
-				})
-			allowedips = append(allowedips, egressIPs...)
-		}
-	}
-	if peer.IsRelay {
-		for _, relayed := range peer.RelayedNodes {
-			allowed := getRelayedAddresses(relayed)
-			allowedips = append(allowedips, allowed...)
-		}
-	}
-	return allowedips
-}
-
 func getCIDRMaskFromAddr(addr string) net.IPMask {
 	cidr := net.CIDRMask(32, 32)
 	ipAddr, err := netip.ParseAddr(addr)
@@ -659,6 +619,35 @@ func GetPeerUpdate(host *models.Host) []wgtypes.PeerConfig {
 	return peerUpdate
 }
 
+func AddHostAllowedIPs(h *models.Host) []net.IPNet {
+	allowedIPs := []net.IPNet{}
+	for _, hNodeID := range h.Nodes {
+		node, err := GetNodeByID(hNodeID)
+		if err != nil {
+			continue
+		}
+		if node.Address.IP != nil {
+			node.Address.Mask = net.CIDRMask(32, 32)
+			allowedIPs = append(allowedIPs, node.Address)
+		}
+		if node.Address6.IP != nil {
+			node.Address6.Mask = net.CIDRMask(128, 128)
+			allowedIPs = append(allowedIPs, node.Address6)
+		}
+		if node.IsEgressGateway {
+			allowedIPs = append(allowedIPs, getEgressIPs(&models.Client{Host: *h, Node: node})...)
+		}
+		if node.IsIngressGateway {
+			allowedIPs = append(allowedIPs, getIngressIPs(&models.Client{Host: *h, Node: node})...)
+		}
+		if node.IsRelay {
+			allowedIPs = append(allowedIPs, getRelayAllowedIPs(&models.Client{Host: *h, Node: node})...)
+		}
+	}
+	return allowedIPs
+
+}
+
 func AddAllowedIPs(peer *models.Client) []net.IPNet {
 	allowedIPs := []net.IPNet{}
 	if peer.Node.Address.IP != nil {

+ 12 - 3
logic/relay.go

@@ -100,6 +100,15 @@ func ValidateRelay(relay models.RelayRequest) error {
 	if empty {
 		err = errors.New("relayed nodes cannot be empty")
 	}
+	for _, relayedNodeID := range relay.RelayedNodes {
+		relayedNode, err := GetNodeByID(relayedNodeID)
+		if err != nil {
+			return err
+		}
+		if relayedNode.IsIngressGateway {
+			return errors.New("cannot relay an ingress gateway (" + relayedNodeID + ")")
+		}
+	}
 	return err
 }
 
@@ -174,7 +183,7 @@ func peerUpdateForRelayed(client *models.Client, peers []models.Client) []wgtype
 			continue
 		}
 		if peer.Host.ID == relay.Host.ID { // add relay as a peer
-			update := peerUpdateForRelayedByRelay(client, &relay)
+			update := PeerUpdateForRelayedByRelay(client, &relay)
 			peerConfig = append(peerConfig, update)
 			continue
 		}
@@ -187,8 +196,8 @@ func peerUpdateForRelayed(client *models.Client, peers []models.Client) []wgtype
 	return peerConfig
 }
 
-// peerUpdateForRelayedByRelay - returns the peerConfig for a node relayed by relay
-func peerUpdateForRelayedByRelay(relayed, relay *models.Client) wgtypes.PeerConfig {
+// PeerUpdateForRelayedByRelay - returns the peerConfig for a node relayed by relay
+func PeerUpdateForRelayedByRelay(relayed, relay *models.Client) wgtypes.PeerConfig {
 	if relayed.Node.RelayedBy != relay.Node.ID.String() {
 		logger.Log(0, "peerUpdateForRelayedByRelay called with invalid parameters")
 		return wgtypes.PeerConfig{}

+ 9 - 9
mq/handlers.go

@@ -63,11 +63,11 @@ func UpdateNode(client mqtt.Client, msg mqtt.Message) {
 		if err != nil {
 			return
 		}
-		if err = BroadcastAddOrUpdatePeer(h, &newNode, true); err != nil {
+		if err = BroadcastAddOrUpdateNetworkPeer(&models.Client{Host: *h, Node: newNode}, true); err != nil {
 			logger.Log(0, "error updating peers when node", currentNode.ID.String(), "informed the server of an interface change", err.Error())
 		}
-		if nodes, err := logic.GetNetworkNodes(newNode.Network); err == nil {
-			FlushNetworkPeersToHost(h, &newNode, nodes)
+		if clients, err := logic.GetNetworkClients(newNode.Network); err == nil {
+			FlushNetworkPeersToHost(&models.Client{Host: *h, Node: newNode}, clients)
 		}
 
 	}
@@ -99,6 +99,7 @@ func UpdateHost(client mqtt.Client, msg mqtt.Message) {
 	}
 	slog.Info("recieved host update", "name", hostUpdate.Host.Name, "id", hostUpdate.Host.ID)
 	var sendPeerUpdate bool
+	var removeHost bool
 	switch hostUpdate.Action {
 	case models.CheckIn:
 		sendPeerUpdate = handleHostCheckin(&hostUpdate.Host, currentHost)
@@ -116,11 +117,11 @@ func UpdateHost(client mqtt.Client, msg mqtt.Message) {
 					}
 				}
 				// flush peers to host
-				nodes, err := logic.GetNetworkNodes(hu.Node.Network)
+				clients, err := logic.GetNetworkClients(hu.Node.Network)
 				if err != nil {
 					return
 				}
-				err = FlushNetworkPeersToHost(&hu.Host, &hu.Node, nodes)
+				err = FlushNetworkPeersToHost(&models.Client{Host: hu.Host, Node: hu.Node}, clients)
 				if err != nil {
 					logger.Log(0, "failed to flush peers to host: ", err.Error())
 				}
@@ -175,7 +176,9 @@ func UpdateHost(client mqtt.Client, msg mqtt.Message) {
 			slog.Error("failed to delete host", "id", currentHost.ID, "error", err)
 			return
 		}
+		removeHost = true
 		sendPeerUpdate = true
+
 	case models.RegisterWithTurn:
 		if servercfg.IsUsingTurn() {
 			err = logic.RegisterHostWithTurn(hostUpdate.Host.ID.String(), hostUpdate.Host.HostPass)
@@ -188,10 +191,7 @@ func UpdateHost(client mqtt.Client, msg mqtt.Message) {
 	}
 
 	if sendPeerUpdate {
-		err := PublishPeerUpdate()
-		if err != nil {
-			slog.Error("failed to publish peer update", "error", err)
-		}
+		go BroadcastHostUpdate(currentHost, removeHost)
 	}
 	// if servercfg.Is_EE && ifaceDelta {
 	// 	if err = logic.EnterpriseResetAllPeersFailovers(currentHost.ID.String(), currentHost.Network); err != nil {

+ 91 - 54
mq/publishers.go

@@ -53,8 +53,8 @@ func PublishSingleHostPeerUpdate(host *models.Host) error {
 }
 
 // FlushNetworkPeersToHost - sends all the peers in the network to the host.
-func FlushNetworkPeersToHost(host *models.Host, hNode *models.Node, networkNodes []models.Node) error {
-	logger.Log(0, "flushing network peers to host: ", host.ID.String(), hNode.Network)
+func FlushNetworkPeersToHost(client *models.Client, networkClients []models.Client) error {
+	logger.Log(0, "flushing network peers to host: ", client.Host.ID.String(), client.Node.Network)
 	addPeerAction := models.PeerAction{
 		Action: models.AddPeer,
 		Peers:  []wgtypes.PeerConfig{},
@@ -63,40 +63,46 @@ func FlushNetworkPeersToHost(host *models.Host, hNode *models.Node, networkNodes
 		Action: models.RemovePeer,
 		Peers:  []wgtypes.PeerConfig{},
 	}
-	for _, node := range networkNodes {
-		if node.ID == hNode.ID || node.IsRelayed {
-			// skip self or if relayed
+	for _, clientI := range networkClients {
+		clientI := clientI
+		if clientI.Node.ID == client.Node.ID {
+			// skip self
 			continue
 		}
-		peerHost, err := logic.GetHost(node.HostID.String())
-		if err != nil {
+		if clientI.Node.IsRelayed && (clientI.Node.RelayedBy != client.Node.ID.String()) {
+			// remove this peer, will be added to relay node's allowed ips
+			rmPeerAction.Peers = append(rmPeerAction.Peers, wgtypes.PeerConfig{
+				PublicKey: clientI.Host.PublicKey,
+				Remove:    true,
+			})
 			continue
 		}
 
-		if !nodeacls.AreNodesAllowed(nodeacls.NetworkID(node.Network), nodeacls.NodeID(hNode.ID.String()), nodeacls.NodeID(node.ID.String())) ||
-			hNode.Action == models.NODE_DELETE || hNode.PendingDelete || !hNode.Connected || (hNode.IsRelayed && hNode.RelayedBy != node.ID.String()) {
+		if !nodeacls.AreNodesAllowed(nodeacls.NetworkID(clientI.Node.Network), nodeacls.NodeID(client.Node.ID.String()), nodeacls.NodeID(clientI.Node.ID.String())) ||
+			client.Node.Action == models.NODE_DELETE || client.Node.PendingDelete || !client.Node.Connected || (client.Node.IsRelayed && client.Node.RelayedBy != clientI.Node.ID.String()) {
 			// remove peer if not allowed
 			rmPeerAction.Peers = append(rmPeerAction.Peers, wgtypes.PeerConfig{
-				PublicKey: peerHost.PublicKey,
+				PublicKey: clientI.Host.PublicKey,
 				Remove:    true,
 			})
 			continue
 		}
 		peerCfg := wgtypes.PeerConfig{
-			PublicKey: peerHost.PublicKey,
+			PublicKey: clientI.Host.PublicKey,
 			Endpoint: &net.UDPAddr{
-				IP:   peerHost.EndpointIP,
-				Port: logic.GetPeerListenPort(peerHost),
+				IP:   clientI.Host.EndpointIP,
+				Port: logic.GetPeerListenPort(&clientI.Host),
 			},
-			PersistentKeepaliveInterval: &node.PersistentKeepalive,
+			PersistentKeepaliveInterval: &clientI.Node.PersistentKeepalive,
 			ReplaceAllowedIPs:           true,
-			AllowedIPs:                  logic.GetAllowedIPs(hNode, &node, nil),
+			AllowedIPs:                  logic.GetAllowedIPs(&clientI),
 		}
 		addPeerAction.Peers = append(addPeerAction.Peers, peerCfg)
 	}
-	if hNode.IsRelayed {
+	if client.Node.IsRelayed {
 		// update the relay peer on this node
-		relayNode, err := logic.GetNodeByID(hNode.RelayedBy)
+		logger.Log(0, "HEREEEEEEEEEEEEEEE 1")
+		relayNode, err := logic.GetNodeByID(client.Node.RelayedBy)
 		if err != nil {
 			return err
 		}
@@ -104,19 +110,17 @@ func FlushNetworkPeersToHost(host *models.Host, hNode *models.Node, networkNodes
 		if err != nil {
 			return err
 		}
-		relayedClient := &models.Client{
-			Host: *host,
-			Node: *hNode,
-		}
+		logger.Log(0, "HEREEEEEEEEEEEEEEE 2")
+		relayedClient := client
 		relayClient := &models.Client{
 			Host: *relayHost,
 			Node: relayNode,
 		}
-		relayPeerCfg := getRelayPeerCfgForRelayedNode(relayedClient, relayClient)
+		relayPeerCfg := logic.PeerUpdateForRelayedByRelay(relayedClient, relayClient)
 		addPeerAction.Peers = append(addPeerAction.Peers, relayPeerCfg)
 	}
-	if hNode.IsIngressGateway {
-		extPeers, _, err := logic.GetExtPeers(hNode)
+	if client.Node.IsIngressGateway {
+		extPeers, _, err := logic.GetExtPeers(&client.Node)
 		if err == nil {
 			addPeerAction.Peers = append(addPeerAction.Peers, extPeers...)
 		}
@@ -126,20 +130,20 @@ func FlushNetworkPeersToHost(host *models.Host, hNode *models.Node, networkNodes
 		if err != nil {
 			return err
 		}
-		publish(host, fmt.Sprintf("peer/host/%s/%s", host.ID.String(), servercfg.GetServer()), data)
+		publish(&client.Host, fmt.Sprintf("peer/host/%s/%s", client.Host.ID.String(), servercfg.GetServer()), data)
 	}
 	if len(addPeerAction.Peers) > 0 {
 		data, err := json.Marshal(addPeerAction)
 		if err != nil {
 			return err
 		}
-		publish(host, fmt.Sprintf("peer/host/%s/%s", host.ID.String(), servercfg.GetServer()), data)
+		publish(&client.Host, fmt.Sprintf("peer/host/%s/%s", client.Host.ID.String(), servercfg.GetServer()), data)
 	}
 	// send fw update if gw host
-	if hNode.IsIngressGateway || hNode.IsEgressGateway {
-		f, err := logic.GetFwUpdate(host)
+	if client.Node.IsIngressGateway || client.Node.IsEgressGateway {
+		f, err := logic.GetFwUpdate(&client.Host)
 		if err == nil {
-			PublishFwUpdate(host, &f)
+			PublishFwUpdate(&client.Host, &f)
 		}
 
 	}
@@ -188,23 +192,55 @@ func BroadcastDelPeer(host *models.Host, network string) error {
 
 // BroadcastAclUpdate - sends new acl updates to peers
 func BroadcastAclUpdate(network string) error {
-	nodes, err := logic.GetNetworkNodes(network)
+	clients, err := logic.GetNetworkClients(network)
 	if err != nil {
 		return err
 	}
-	for _, nodeI := range nodes {
-		nodeI := nodeI
-		h, err := logic.GetHost(nodeI.HostID.String())
-		if err == nil {
-			go FlushNetworkPeersToHost(h, &nodeI, nodes)
-		}
+	for _, client := range clients {
+		client := client
+		go FlushNetworkPeersToHost(&client, clients)
 	}
 	return err
 }
 
-// BroadcastAddOrUpdatePeer - notifys the hosts in the network to add or update peer.
-func BroadcastAddOrUpdatePeer(host *models.Host, node *models.Node, update bool) error {
-	nodes, err := logic.GetNetworkNodes(node.Network)
+// BroadcastHostUpdate - notifys the hosts in the network to update peer.
+func BroadcastHostUpdate(host *models.Host, remove bool) error {
+
+	p := models.PeerAction{
+		Action: models.UpdatePeer,
+		Peers: []wgtypes.PeerConfig{
+			{
+				PublicKey: host.PublicKey,
+				Endpoint: &net.UDPAddr{
+					IP:   host.EndpointIP,
+					Port: logic.GetPeerListenPort(host),
+				},
+				ReplaceAllowedIPs: true,
+				Remove:            remove,
+			},
+		},
+	}
+	if remove {
+		p.Action = models.RemovePeer
+	} else {
+		p.Peers[0].AllowedIPs = logic.AddHostAllowedIPs(host)
+		fmt.Println(0, "Allowed IPs: ", p.Peers[0].AllowedIPs)
+	}
+	peerHosts := logic.GetRelatedHosts(host.ID.String())
+	data, err := json.Marshal(p)
+	if err != nil {
+		return err
+	}
+	for _, peerHost := range peerHosts {
+		publish(&peerHost, fmt.Sprintf("peer/host/%s/%s", peerHost.ID.String(), servercfg.GetServer()), data)
+
+	}
+	return nil
+}
+
+// BroadcastAddOrUpdateNetworkPeer - notifys the hosts in the network to add or update peer.
+func BroadcastAddOrUpdateNetworkPeer(client *models.Client, update bool) error {
+	nodes, err := logic.GetNetworkNodes(client.Node.Network)
 	if err != nil {
 		return err
 	}
@@ -213,12 +249,12 @@ func BroadcastAddOrUpdatePeer(host *models.Host, node *models.Node, update bool)
 		Action: models.AddPeer,
 		Peers: []wgtypes.PeerConfig{
 			{
-				PublicKey: host.PublicKey,
+				PublicKey: client.Host.PublicKey,
 				Endpoint: &net.UDPAddr{
-					IP:   host.EndpointIP,
-					Port: logic.GetPeerListenPort(host),
+					IP:   client.Host.EndpointIP,
+					Port: logic.GetPeerListenPort(&client.Host),
 				},
-				PersistentKeepaliveInterval: &node.PersistentKeepalive,
+				PersistentKeepaliveInterval: &client.Node.PersistentKeepalive,
 				ReplaceAllowedIPs:           true,
 			},
 		},
@@ -227,14 +263,15 @@ func BroadcastAddOrUpdatePeer(host *models.Host, node *models.Node, update bool)
 		p.Action = models.UpdatePeer
 	}
 	for _, nodeI := range nodes {
-		if nodeI.ID.String() == node.ID.String() {
+		nodeI := nodeI
+		if nodeI.ID.String() == client.Node.ID.String() {
 			// skip self...
 			continue
 		}
 		// update allowed ips, according to the peer node
-		p.Peers[0].AllowedIPs = logic.GetAllowedIPs(&nodeI, node, nil)
-		if update && (!nodeacls.AreNodesAllowed(nodeacls.NetworkID(node.Network), nodeacls.NodeID(node.ID.String()), nodeacls.NodeID(nodeI.ID.String())) ||
-			node.Action == models.NODE_DELETE || node.PendingDelete || !node.Connected) {
+		p.Peers[0].AllowedIPs = logic.GetAllowedIPs(&models.Client{Host: client.Host, Node: client.Node})
+		if update && (!nodeacls.AreNodesAllowed(nodeacls.NetworkID(client.Node.Network), nodeacls.NodeID(client.Node.ID.String()), nodeacls.NodeID(nodeI.ID.String())) ||
+			client.Node.Action == models.NODE_DELETE || client.Node.PendingDelete || !client.Node.Connected) {
 			// remove peer
 			p.Action = models.RemovePeer
 			p.Peers[0].Remove = true
@@ -264,7 +301,7 @@ func BroadcastAddOrUpdatePeer(host *models.Host, node *models.Node, update bool)
 				Host: *relayHost,
 				Node: relayNode,
 			}
-			rPeerCfg := getRelayPeerCfgForRelayedNode(relayedClient, relayClient)
+			rPeerCfg := logic.PeerUpdateForRelayedByRelay(relayedClient, relayClient)
 			if update {
 				r.Action = models.UpdatePeer
 			}
@@ -298,20 +335,20 @@ func BroadcastAddOrUpdatePeer(host *models.Host, node *models.Node, update bool)
 // BroadcastExtClient - publishes msg to add/updates ext client in the network
 func BroadcastExtClient(ingressHost *models.Host, ingressNode *models.Node) error {
 
-	nodes, err := logic.GetNetworkNodes(ingressNode.Network)
+	clients, err := logic.GetNetworkClients(ingressNode.Network)
 	if err != nil {
 		return err
 	}
 	//flush peers to ingress host
-	go FlushNetworkPeersToHost(ingressHost, ingressNode, nodes)
+	go FlushNetworkPeersToHost(&models.Client{Host: *ingressHost, Node: *ingressNode}, clients)
 	// broadcast to update ingress peer to other hosts
-	go BroadcastAddOrUpdatePeer(ingressHost, ingressNode, true)
+	go BroadcastAddOrUpdateNetworkPeer(&models.Client{Host: *ingressHost, Node: *ingressNode}, true)
 	return nil
 }
 
 // BroadcastDelExtClient - published msg to remove ext client from network
-func BroadcastDelExtClient(ingressHost *models.Host, ingressNode *models.Node, extclients []models.ExtClient) error {
-	go BroadcastAddOrUpdatePeer(ingressHost, ingressNode, true)
+func BroadcastDelExtClient(ingressClient *models.Client, extclients []models.ExtClient) error {
+	go BroadcastAddOrUpdateNetworkPeer(ingressClient, true)
 	peers := []wgtypes.PeerConfig{}
 	for _, extclient := range extclients {
 		extPubKey, err := wgtypes.ParseKey(extclient.PublicKey)
@@ -333,7 +370,7 @@ func BroadcastDelExtClient(ingressHost *models.Host, ingressNode *models.Node, e
 	if err != nil {
 		return err
 	}
-	err = publish(ingressHost, fmt.Sprintf("peer/host/%s/%s", ingressHost.ID.String(), servercfg.GetServer()), data)
+	err = publish(&ingressClient.Host, fmt.Sprintf("peer/host/%s/%s", ingressClient.Host.ID.String(), servercfg.GetServer()), data)
 	if err != nil {
 		return err
 	}

+ 17 - 32
mq/relay.go

@@ -146,38 +146,6 @@ func getIngressIPs(peer models.Client) []net.IPNet {
 	return ingressIPs
 }
 
-func getRelayPeerCfgForRelayedNode(relayed, relay *models.Client) (relayPeer wgtypes.PeerConfig) {
-	relayPeer = wgtypes.PeerConfig{
-		PublicKey: relay.Host.PublicKey,
-		Endpoint: &net.UDPAddr{
-			IP:   relay.Host.EndpointIP,
-			Port: logic.GetPeerListenPort(&relay.Host),
-		},
-		PersistentKeepaliveInterval: &relay.Node.PersistentKeepalive,
-	}
-	peers, err := logic.GetNetworkClients(relay.Node.Network)
-	if err == nil {
-		if relay.Node.Address.IP != nil {
-			relay.Node.Address.Mask = net.CIDRMask(32, 32)
-			relayPeer.AllowedIPs = append(relayPeer.AllowedIPs, relay.Node.Address)
-		}
-		if relay.Node.Address6.IP != nil {
-			relay.Node.Address6.Mask = net.CIDRMask(128, 128)
-			relayPeer.AllowedIPs = append(relayPeer.AllowedIPs, relay.Node.Address6)
-		}
-		// add all other peers to allowed ips
-		for _, peer := range peers {
-			if peer.Host.ID == relay.Host.ID || peer.Host.ID == relayed.Host.ID {
-				continue
-			}
-			if nodeacls.AreNodesAllowed(nodeacls.NetworkID(relayed.Node.Network), nodeacls.NodeID(relayed.Node.ID.String()), nodeacls.NodeID(peer.Node.ID.String())) {
-				relayPeer.AllowedIPs = append(relayPeer.AllowedIPs, logic.AddAllowedIPs(&peer)...)
-			}
-		}
-	}
-	return
-}
-
 // pubRelayedUpdate - publish peer update to a node (client) that is relayed by the relay
 func pubRelayedUpdate(client, relay *models.Client, peers []models.Client) {
 	//verify
@@ -278,3 +246,20 @@ func pubRelayUpdate(client *models.Client, peers []models.Client) {
 	}
 	publish(&client.Host, fmt.Sprintf("peer/host/%s/%s", client.Host.ID.String(), servercfg.GetServer()), data)
 }
+
+func BroadCastRelayUpdate(network string) error {
+	/* TODO:
+	1. FlushPeersTo Relayed Node
+	2. BroadCast Remove relayed Peer on network peer
+	3. BroadCast Update Relay peer on netmaker peer
+	*/
+	clients, err := logic.GetNetworkClients(network)
+	if err != nil {
+		return err
+	}
+	for _, client := range clients {
+		client := client
+		go FlushNetworkPeersToHost(&client, clients)
+	}
+	return err
+}