Преглед на файлове

flush peers to host once joined the network

Abhishek Kondur преди 2 години
родител
ревизия
f0aa151cc6
променени са 4 файла, в които са добавени 57 реда и са изтрити 17 реда
  1. 3 4
      auth/host_session.go
  2. 4 10
      controllers/node.go
  3. 8 3
      mq/handlers.go
  4. 42 0
      mq/publishers.go

+ 3 - 4
auth/host_session.go

@@ -237,7 +237,9 @@ func CheckNetRegAndHostUpdate(networks []string, h *models.Host) {
 				Host:   *h,
 				Node:   *newNode,
 			})
-			mq.BroadCastAddOrUpdatePeer(h, newNode, false)
+			if servercfg.IsMessageQueueBackend() {
+				mq.BroadCastAddOrUpdatePeer(h, newNode, false)
+			}
 		}
 	}
 	if servercfg.IsMessageQueueBackend() {
@@ -245,9 +247,6 @@ func CheckNetRegAndHostUpdate(networks []string, h *models.Host) {
 			Action: models.RequestAck,
 			Host:   *h,
 		})
-		if err := mq.PublishPeerUpdate(); err != nil {
-			logger.Log(0, "failed to publish peer update during registration -", err.Error())
-		}
 
 	}
 }

+ 4 - 10
controllers/node.go

@@ -695,11 +695,9 @@ func updateNode(w http.ResponseWriter, r *http.Request) {
 	runUpdates(newNode, ifaceDelta)
 	go func(aclUpdate bool, newNode *models.Node) {
 		if aclUpdate {
-			if err := mq.PublishPeerUpdate(); err != nil {
-				logger.Log(0, "error during node ACL update for node", newNode.ID.String())
-			}
+			mq.BroadCastAddOrUpdatePeer(host, newNode, true)
 		}
-		mq.BroadCastAddOrUpdatePeer(host, newNode, true)
+
 		if err := mq.PublishReplaceDNS(&currentNode, newNode, host); err != nil {
 			logger.Log(1, "failed to publish dns update", err.Error())
 		}
@@ -758,12 +756,8 @@ func deleteNode(w http.ResponseWriter, r *http.Request) {
 		if err != nil {
 			logger.Log(1, "failed to retrieve host for node", node.ID.String(), err.Error())
 		}
-		if fromNode {
-			err = mq.PublishDeletedNodePeerUpdate(deletedNode)
-			mq.BroadCastDelPeer(host, deletedNode.Network)
-		} else {
-			err = mq.PublishPeerUpdate()
-		}
+
+		err = mq.BroadCastDelPeer(host, deletedNode.Network)
 		if err != nil {
 			logger.Log(1, "error publishing peer update ", err.Error())
 		}

+ 8 - 3
mq/handlers.go

@@ -107,9 +107,14 @@ func UpdateHost(client mqtt.Client, msg mqtt.Message) {
 						return
 					}
 				}
-				if err = PublishSingleHostPeerUpdate(context.Background(), currentHost, nil, nil); err != nil {
-					logger.Log(0, "failed peers publish after join acknowledged", hostUpdate.Host.Name, currentHost.ID.String(), err.Error())
-					return
+				// if err = PublishSingleHostPeerUpdate(context.Background(), currentHost, nil, nil); err != nil {
+				// 	logger.Log(0, "failed peers publish after join acknowledged", hostUpdate.Host.Name, currentHost.ID.String(), err.Error())
+				// 	return
+				// }
+				// flush peers to host
+				err = FlushNetworkPeersToHost(&hu.Host, &hu.Node)
+				if err != nil {
+					logger.Log(0, "failed to flush peers to host: ", err.Error())
 				}
 				if err = handleNewNodeDNS(&hu.Host, &hu.Node); err != nil {
 					logger.Log(0, "failed to send dns update after node,", hu.Node.ID.String(), ", added to host", hu.Host.Name, err.Error())

+ 42 - 0
mq/publishers.go

@@ -110,6 +110,43 @@ func PublishSingleHostPeerUpdate(ctx context.Context, host *models.Host, deleted
 	return publish(host, fmt.Sprintf("peers/host/%s/%s", host.ID.String(), servercfg.GetServer()), data)
 }
 
+func FlushNetworkPeersToHost(host *models.Host, hNode *models.Node) error {
+	logger.Log(0, "flushing network peers to host: ", host.ID.String(), hNode.Network)
+	nodes, err := logic.GetNetworkNodes(hNode.Network)
+	if err != nil {
+		return err
+	}
+	for _, nodeI := range nodes {
+		if nodeI.ID == hNode.ID {
+			// skip self
+			continue
+		}
+		peerHost, err := logic.GetHost(nodeI.HostID.String())
+		if err != nil {
+			continue
+		}
+		p := models.PeerAction{
+			Action: models.AddPeer,
+			Peer: wgtypes.PeerConfig{
+				PublicKey: peerHost.PublicKey,
+				Endpoint: &net.UDPAddr{
+					IP:   peerHost.EndpointIP,
+					Port: logic.GetPeerListenPort(peerHost),
+				},
+				PersistentKeepaliveInterval: &nodeI.PersistentKeepalive,
+				ReplaceAllowedIPs:           true,
+				AllowedIPs:                  logic.GetAllowedIPs(hNode, &nodeI, nil),
+			},
+		}
+		data, err := json.Marshal(p)
+		if err != nil {
+			continue
+		}
+		publish(host, fmt.Sprintf("peer/host/%s/%s", host.ID.String(), servercfg.GetServer()), data)
+	}
+	return nil
+}
+
 // BroadCastDelPeer - notifys all the hosts in the network to remove peer
 func BroadCastDelPeer(host *models.Host, network string) error {
 	nodes, err := logic.GetNetworkNodes(network)
@@ -128,6 +165,10 @@ func BroadCastDelPeer(host *models.Host, network string) error {
 		return err
 	}
 	for _, nodeI := range nodes {
+		if nodeI.HostID == host.ID {
+			// skip self...
+			continue
+		}
 		peerHost, err := logic.GetHost(nodeI.HostID.String())
 		if err == nil {
 			publish(peerHost, fmt.Sprintf("peer/host/%s/%s", peerHost.ID.String(), servercfg.GetServer()), data)
@@ -138,6 +179,7 @@ func BroadCastDelPeer(host *models.Host, network string) error {
 
 // BroadCastAddOrUpdatePeer - notifys the hosts in the network to add or update peer.
 func BroadCastAddOrUpdatePeer(host *models.Host, node *models.Node, update bool) error {
+	// TODO: ACLs
 	nodes, err := logic.GetNetworkNodes(node.Network)
 	if err != nil {
 		return err