浏览代码

publish singleton updates for relay updates

Abhishek Kondur 2 年之前
父节点
当前提交
bd7a31f1a7
共有 5 个文件被更改,包括 64 次插入24 次删除
  1. 5 1
      controllers/hosts.go
  2. 5 2
      controllers/node.go
  3. 2 2
      ee/ee_controllers/relay.go
  4. 13 18
      mq/publishers.go
  5. 39 1
      mq/relay.go

+ 5 - 1
controllers/hosts.go

@@ -346,7 +346,11 @@ func deleteHostFromNetwork(w http.ResponseWriter, r *http.Request) {
 
 
 	runUpdates(node, false)
 	runUpdates(node, false)
 	go func() { // notify of peer change
 	go func() { // notify of peer change
-		mq.BroadcastDelPeer(currHost, network)
+		clients, err := logic.GetNetworkClients(network)
+		if err != nil {
+			return
+		}
+		mq.BroadcastDelPeer(currHost, clients)
 		if err := mq.PublishDNSDelete(node, currHost); err != nil {
 		if err := mq.PublishDNSDelete(node, currHost); err != nil {
 			logger.Log(1, "error publishing dns update", err.Error())
 			logger.Log(1, "error publishing dns update", err.Error())
 		}
 		}

+ 5 - 2
controllers/node.go

@@ -767,8 +767,11 @@ func deleteNode(w http.ResponseWriter, r *http.Request) {
 			logger.Log(1, "failed to retrieve host for node", node.ID.String(), err.Error())
 			logger.Log(1, "failed to retrieve host for node", node.ID.String(), err.Error())
 			return
 			return
 		}
 		}
-
-		err = mq.BroadcastDelPeer(host, deletedNode.Network)
+		clients, err := logic.GetNetworkClients(deletedNode.Network)
+		if err != nil {
+			return
+		}
+		err = mq.BroadcastDelPeer(host, clients)
 		if err != nil {
 		if err != nil {
 			logger.Log(1, "error publishing peer update ", err.Error())
 			logger.Log(1, "error publishing peer update ", err.Error())
 		}
 		}

+ 2 - 2
ee/ee_controllers/relay.go

@@ -50,7 +50,7 @@ func createRelay(w http.ResponseWriter, r *http.Request) {
 		logic.ReturnErrorResponse(w, r, logic.FormatError(err, "internal"))
 		logic.ReturnErrorResponse(w, r, logic.FormatError(err, "internal"))
 		return
 		return
 	}
 	}
-	go mq.BroadCastRelayUpdate(relayNode.Network)
+	go mq.BroadCastRelayUpdate(relayRequest)
 	logger.Log(1, r.Header.Get("user"), "created relay on node", relayRequest.NodeID, "on network", relayRequest.NetID)
 	logger.Log(1, r.Header.Get("user"), "created relay on node", relayRequest.NodeID, "on network", relayRequest.NetID)
 	apiNode := relayNode.ConvertToAPINode()
 	apiNode := relayNode.ConvertToAPINode()
 	w.WriteHeader(http.StatusOK)
 	w.WriteHeader(http.StatusOK)
@@ -105,7 +105,7 @@ func deleteRelay(w http.ResponseWriter, r *http.Request) {
 		// 		mq.PubPeerUpdate(&client, nil, peers)
 		// 		mq.PubPeerUpdate(&client, nil, peers)
 		// 	}
 		// 	}
 		// }
 		// }
-		go mq.BroadCastRelayUpdate(netid)
+		go mq.BroadCastRelayRemoval(netid)
 	}()
 	}()
 	logger.Log(1, r.Header.Get("user"), "deleted relay on node", node.ID.String(), "on network", node.Network)
 	logger.Log(1, r.Header.Get("user"), "deleted relay on node", node.ID.String(), "on network", node.Network)
 	apiNode := node.ConvertToAPINode()
 	apiNode := node.ConvertToAPINode()

+ 13 - 18
mq/publishers.go

@@ -149,11 +149,8 @@ func FlushNetworkPeersToHost(client *models.Client, networkClients []models.Clie
 }
 }
 
 
 // BroadcastDelPeer - notifys all the hosts in the network to remove peer
 // BroadcastDelPeer - notifys all the hosts in the network to remove peer
-func BroadcastDelPeer(host *models.Host, network string) error {
-	nodes, err := logic.GetNetworkNodes(network)
-	if err != nil {
-		return err
-	}
+func BroadcastDelPeer(host *models.Host, networkClients []models.Client) error {
+
 	p := models.PeerAction{
 	p := models.PeerAction{
 		Action: models.RemovePeer,
 		Action: models.RemovePeer,
 		Peers: []wgtypes.PeerConfig{
 		Peers: []wgtypes.PeerConfig{
@@ -167,23 +164,21 @@ func BroadcastDelPeer(host *models.Host, network string) error {
 	if err != nil {
 	if err != nil {
 		return err
 		return err
 	}
 	}
-	for _, nodeI := range nodes {
-		if nodeI.HostID == host.ID {
+	for _, clientI := range networkClients {
+		if clientI.Host.ID == host.ID {
 			// skip self...
 			// skip self...
 			continue
 			continue
 		}
 		}
-		peerHost, err := logic.GetHost(nodeI.HostID.String())
-		if err == nil {
-			publish(peerHost, fmt.Sprintf("peer/host/%s/%s", peerHost.ID.String(), servercfg.GetServer()), data)
-			if nodeI.IsIngressGateway || nodeI.IsEgressGateway {
-				go func(peerHost models.Host) {
-					f, err := logic.GetFwUpdate(&peerHost)
-					if err == nil {
-						PublishFwUpdate(&peerHost, &f)
-					}
-				}(*peerHost)
-			}
+		publish(&clientI.Host, fmt.Sprintf("peer/host/%s/%s", clientI.Host.ID.String(), servercfg.GetServer()), data)
+		if clientI.Node.IsIngressGateway || clientI.Node.IsEgressGateway {
+			go func(peerHost models.Host) {
+				f, err := logic.GetFwUpdate(&peerHost)
+				if err == nil {
+					PublishFwUpdate(&peerHost, &f)
+				}
+			}(clientI.Host)
 		}
 		}
+
 	}
 	}
 	return nil
 	return nil
 }
 }

+ 39 - 1
mq/relay.go

@@ -247,12 +247,50 @@ func pubRelayUpdate(client *models.Client, peers []models.Client) {
 	publish(&client.Host, fmt.Sprintf("peer/host/%s/%s", client.Host.ID.String(), servercfg.GetServer()), data)
 	publish(&client.Host, fmt.Sprintf("peer/host/%s/%s", client.Host.ID.String(), servercfg.GetServer()), data)
 }
 }
 
 
-func BroadCastRelayUpdate(network string) error {
+func BroadCastRelayUpdate(relayReq models.RelayRequest) error {
 	/* TODO:
 	/* TODO:
 	1. FlushPeersTo Relayed Node
 	1. FlushPeersTo Relayed Node
 	2. BroadCast Remove relayed Peer on network peer
 	2. BroadCast Remove relayed Peer on network peer
 	3. BroadCast Update Relay peer on netmaker peer
 	3. BroadCast Update Relay peer on netmaker peer
 	*/
 	*/
+	clients, err := logic.GetNetworkClients(relayReq.NetID)
+	if err != nil {
+		return err
+	}
+	// filter relay Node
+	filteredClients := clients
+	for i := len(filteredClients) - 1; i >= 0; i-- {
+		if filteredClients[i].Node.ID.String() == relayReq.NodeID {
+			filteredClients = append(filteredClients[:i], filteredClients[i+1:]...)
+			break
+		}
+	}
+	for _, relayedNodeID := range relayReq.RelayedNodes {
+		relayedNode, err := logic.GetNodeByID(relayedNodeID)
+		if err != nil {
+			continue
+		}
+
+		h, err := logic.GetHost(relayedNode.HostID.String())
+		if err != nil {
+			continue
+		}
+		BroadcastDelPeer(h, filteredClients)
+		FlushNetworkPeersToHost(&models.Client{Host: *h, Node: relayedNode}, clients)
+	}
+	relayNode, err := logic.GetNodeByID(relayReq.NodeID)
+	if err != nil {
+		return err
+	}
+	relayHost, err := logic.GetHost(relayNode.HostID.String())
+	if err != nil {
+		return err
+	}
+
+	return BroadcastAddOrUpdateNetworkPeer(&models.Client{Host: *relayHost, Node: relayNode}, true)
+}
+
+func BroadCastRelayRemoval(network string) error {
 	clients, err := logic.GetNetworkClients(network)
 	clients, err := logic.GetNetworkClients(network)
 	if err != nil {
 	if err != nil {
 		return err
 		return err