Browse Source

handled forced deleted peer

0xdcarns 2 years ago
parent
commit
adf3967e0d
4 changed files with 43 additions and 13 deletions
  1. 12 5
      controllers/node.go
  2. 5 2
      logic/peers.go
  3. 1 2
      mq/handlers.go
  4. 25 4
      mq/publishers.go

+ 12 - 5
controllers/node.go

@@ -433,7 +433,7 @@ func getNode(w http.ResponseWriter, r *http.Request) {
 		logic.ReturnErrorResponse(w, r, logic.FormatError(err, "internal"))
 		return
 	}
-	hostPeerUpdate, err := logic.GetPeerUpdateForHost(node.Network, host)
+	hostPeerUpdate, err := logic.GetPeerUpdateForHost(node.Network, host, nil)
 	if err != nil && !database.IsEmptyRecord(err) {
 		logger.Log(0, r.Header.Get("user"),
 			fmt.Sprintf("error fetching wg peers config for host [ %s ]: %v", host.ID.String(), err))
@@ -616,7 +616,7 @@ func createNode(w http.ResponseWriter, r *http.Request) {
 			return
 		}
 	}
-	hostPeerUpdate, err := logic.GetPeerUpdateForHost(networkName, &data.Host)
+	hostPeerUpdate, err := logic.GetPeerUpdateForHost(networkName, &data.Host, nil)
 	if err != nil && !database.IsEmptyRecord(err) {
 		logger.Log(0, r.Header.Get("user"),
 			fmt.Sprintf("error fetching wg peers config for host [ %s ]: %v", data.Host.ID.String(), err))
@@ -985,10 +985,17 @@ func deleteNode(w http.ResponseWriter, r *http.Request) {
 	if !fromNode { // notify node change
 		runUpdates(&node, false)
 	}
-	go func() { // notify of peer change
-		if err := mq.PublishPeerUpdate(); err != nil {
+	go func(deletedNode *models.Node, fromNode bool) { // notify of peer change
+		var err error
+		if fromNode {
+			err = mq.PublishDeletedNodePeerUpdate(deletedNode)
+		} else {
+			err = mq.PublishPeerUpdate()
+		}
+		if err != nil {
 			logger.Log(1, "error publishing peer update ", err.Error())
 		}
+
 		host, err := logic.GetHost(node.HostID.String())
 		if err != nil {
 			logger.Log(1, "failed to retrieve host for node", node.ID.String(), err.Error())
@@ -996,7 +1003,7 @@ func deleteNode(w http.ResponseWriter, r *http.Request) {
 		if err := mq.PublishDNSDelete(&node, host); err != nil {
 			logger.Log(1, "error publishing dns update", err.Error())
 		}
-	}()
+	}(&node, fromNode)
 }
 
 func runUpdates(node *models.Node, ifaceDelta bool) {

+ 5 - 2
logic/peers.go

@@ -41,7 +41,7 @@ func GetProxyUpdateForHost(host *models.Host) (models.ProxyManagerPayload, error
 		relayPeersMap := make(map[string]models.RelayedConf)
 		for _, relayedHost := range relayedHosts {
 			relayedHost := relayedHost
-			payload, err := GetPeerUpdateForHost("", &relayedHost)
+			payload, err := GetPeerUpdateForHost("", &relayedHost, nil)
 			if err == nil {
 				relayedEndpoint, udpErr := net.ResolveUDPAddr("udp", fmt.Sprintf("%s:%d", relayedHost.EndpointIP, GetPeerListenPort(&relayedHost)))
 				if udpErr == nil {
@@ -118,7 +118,7 @@ func GetProxyUpdateForHost(host *models.Host) (models.ProxyManagerPayload, error
 }
 
 // GetPeerUpdateForHost - gets the consolidated peer update for the host from all networks
-func GetPeerUpdateForHost(network string, host *models.Host) (models.HostPeerUpdate, error) {
+func GetPeerUpdateForHost(network string, host *models.Host, deletedNode *models.Node) (models.HostPeerUpdate, error) {
 	if host == nil {
 		return models.HostPeerUpdate{}, errors.New("host is nil")
 	}
@@ -139,6 +139,9 @@ func GetPeerUpdateForHost(network string, host *models.Host) (models.HostPeerUpd
 		NodePeers:  []wgtypes.PeerConfig{},
 	}
 	var deletedNodes = []models.Node{} // used to track deleted nodes
+	if deletedNode != nil {
+		deletedNodes = append(deletedNodes, *deletedNode)
+	}
 	logger.Log(1, "peer update for host ", host.ID.String())
 	peerIndexMap := make(map[string]int)
 	for _, nodeID := range host.Nodes {

+ 1 - 2
mq/handlers.go

@@ -227,11 +227,10 @@ func UpdateMetrics(client mqtt.Client, msg mqtt.Message) {
 				logger.Log(2, "updating peers after node", currentNode.ID.String(), currentNode.Network, "detected connectivity issues")
 				host, err := logic.GetHost(currentNode.HostID.String())
 				if err == nil {
-					if err = PublishSingleHostPeerUpdate(host); err != nil {
+					if err = PublishSingleHostPeerUpdate(host, nil); err != nil {
 						logger.Log(0, "failed to publish update after failover peer change for node", currentNode.ID.String(), currentNode.Network)
 					}
 				}
-
 			}
 
 			logger.Log(1, "updated node metrics", id)

+ 25 - 4
mq/publishers.go

@@ -25,7 +25,7 @@ func PublishPeerUpdate() error {
 	}
 	for _, host := range hosts {
 		host := host
-		err = PublishSingleHostPeerUpdate(&host)
+		err = PublishSingleHostPeerUpdate(&host, nil)
 		if err != nil {
 			logger.Log(1, "failed to publish peer update to host", host.ID.String(), ": ", err.Error())
 		}
@@ -33,10 +33,31 @@ func PublishPeerUpdate() error {
 	return err
 }
 
+// PublishDeletedNodePeerUpdate --- determines and publishes a peer update
+// to all the hosts with a deleted node to account for
+func PublishDeletedNodePeerUpdate(delNode *models.Node) error {
+	if !servercfg.IsMessageQueueBackend() {
+		return nil
+	}
+
+	hosts, err := logic.GetAllHosts()
+	if err != nil {
+		logger.Log(1, "err getting all hosts", err.Error())
+		return err
+	}
+	for _, host := range hosts {
+		host := host
+		if err = PublishSingleHostPeerUpdate(&host, delNode); err != nil {
+			logger.Log(1, "failed to publish peer update to host", host.ID.String(), ": ", err.Error())
+		}
+	}
+	return err
+}
+
 // PublishSingleHostPeerUpdate --- determines and publishes a peer update to one host
-func PublishSingleHostPeerUpdate(host *models.Host) error {
+func PublishSingleHostPeerUpdate(host *models.Host, deletedNode *models.Node) error {
 
-	peerUpdate, err := logic.GetPeerUpdateForHost("", host)
+	peerUpdate, err := logic.GetPeerUpdateForHost("", host, deletedNode)
 	if err != nil {
 		return err
 	}
@@ -403,7 +424,7 @@ func sendPeers() {
 		if force {
 			host := host
 			logger.Log(2, "sending scheduled peer update (5 min)")
-			err = PublishSingleHostPeerUpdate(&host)
+			err = PublishSingleHostPeerUpdate(&host, nil)
 			if err != nil {
 				logger.Log(1, "error publishing peer updates for host: ", host.ID.String(), " Err: ", err.Error())
 			}