Browse Source

NET-164: Refactor Peer updates - Ext. Clients (#2325)

* single peer broadcast actions

* peer singleton update action

* broadcast update peer

* add func comments

* flush peers to host once joined the network

* send network peer list on host joining network

* flush all peers to host on joining a network

* broadcast acl update

* check node acls,action,connection status

* async add/rm network updates

* add comments to exported items

* publish new ext client update

* pr comments

* broadcast del ext client to the network

* broadcast ext client updates

* pr comments

* pr comments

* ext client updates

* publish dns update only when changed ID of ext client

* support multiple ext client removal
Abhishek K 2 years ago
parent
commit
b54635a576
5 changed files with 84 additions and 29 deletions
  1. 23 16
      controllers/ext_client.go
  2. 1 6
      controllers/node.go
  3. 5 3
      logic/extpeers.go
  4. 4 4
      logic/peers.go
  5. 51 0
      mq/publishers.go

+ 23 - 16
controllers/ext_client.go

@@ -397,9 +397,7 @@ func createExtClient(w http.ResponseWriter, r *http.Request) {
 	logger.Log(0, r.Header.Get("user"), "created new ext client on network", networkName)
 	w.WriteHeader(http.StatusOK)
 	go func() {
-		if err := mq.PublishPeerUpdate(); err != nil {
-			logger.Log(1, "error setting ext peers on "+nodeid+": "+err.Error())
-		}
+		mq.BroadcastExtClient(host, &node)
 		if err := mq.PublishExtCLientDNS(&extclient); err != nil {
 			logger.Log(1, "error publishing extclient dns", err.Error())
 		}
@@ -484,7 +482,7 @@ func updateExtClient(w http.ResponseWriter, r *http.Request) {
 	var changedEnabled = (update.Enabled != oldExtClient.Enabled) // indicates there was a change in enablement
 	// extra var need as logic.Update changes oldExtClient
 	currentClient := oldExtClient
-	newclient, err := logic.UpdateExtClient(&oldExtClient, &update)
+	newclient, replaceOldClient, err := logic.UpdateExtClient(&oldExtClient, &update)
 	if err != nil {
 		logger.Log(0, r.Header.Get("user"),
 			fmt.Sprintf("failed to update ext client [%s], network [%s]: %v",
@@ -493,22 +491,29 @@ func updateExtClient(w http.ResponseWriter, r *http.Request) {
 		return
 	}
 	logger.Log(0, r.Header.Get("user"), "updated ext client", update.ClientID)
-	if changedEnabled { // need to send a peer update to the ingress node as enablement of one of it's clients has changed
+
+	w.WriteHeader(http.StatusOK)
+	json.NewEncoder(w).Encode(newclient)
+
+	go func() {
 		if ingressNode, err := logic.GetNodeByID(newclient.IngressGatewayID); err == nil {
-			if err = mq.PublishPeerUpdate(); err != nil {
-				logger.Log(1, "error setting ext peers on", ingressNode.ID.String(), ":", err.Error())
+			if ingressHost, err := logic.GetHost(ingressNode.HostID.String()); err == nil {
+				if replaceOldClient || !update.Enabled {
+					mq.BroadcastDelExtClient(ingressHost, &ingressNode, []models.ExtClient{currentClient})
+				}
+				if replaceOldClient || changedEnabled {
+					// broadcast update
+					mq.BroadcastExtClient(ingressHost, &ingressNode)
+				}
 			}
 		}
-	}
-	w.WriteHeader(http.StatusOK)
-	json.NewEncoder(w).Encode(newclient)
-	if changedID {
-		go func() {
+		if changedID {
 			if err := mq.PublishExtClientDNSUpdate(currentClient, *newclient, networkName); err != nil {
 				logger.Log(1, "error pubishing dns update for extcient update", err.Error())
 			}
-		}()
-	}
+		}
+	}()
+
 }
 
 // swagger:route DELETE /api/extclients/{network}/{clientid} ext_client deleteExtClient
@@ -574,9 +579,11 @@ func deleteExtClient(w http.ResponseWriter, r *http.Request) {
 	}
 
 	go func() {
-		if err := mq.PublishDeletedClientPeerUpdate(&extclient); err != nil {
-			logger.Log(1, "error setting ext peers on "+ingressnode.ID.String()+": "+err.Error())
+		ingressHost, err := logic.GetHost(ingressnode.HostID.String())
+		if err == nil {
+			go mq.BroadcastDelExtClient(ingressHost, &ingressnode, []models.ExtClient{extclient})
 		}
+
 		if err = mq.PublishDeleteExtClientDNS(&extclient); err != nil {
 			logger.Log(1, "error publishing dns update for extclient deletion", err.Error())
 		}

+ 1 - 6
controllers/node.go

@@ -585,12 +585,7 @@ func deleteIngressGateway(w http.ResponseWriter, r *http.Request) {
 	if len(removedClients) > 0 {
 		host, err := logic.GetHost(node.HostID.String())
 		if err == nil {
-			go mq.PublishSingleHostPeerUpdate(
-				context.Background(),
-				host,
-				nil,
-				removedClients[:],
-			)
+			mq.BroadcastDelExtClient(host, &node, removedClients)
 		}
 	}
 

+ 5 - 3
logic/extpeers.go

@@ -194,14 +194,16 @@ func SaveExtClient(extclient *models.ExtClient) error {
 }
 
 // UpdateExtClient - updates an ext client with new values
-func UpdateExtClient(old *models.ExtClient, update *models.CustomExtClient) (*models.ExtClient, error) {
+func UpdateExtClient(old *models.ExtClient, update *models.CustomExtClient) (*models.ExtClient, bool, error) {
 	new := old
+	replaceOldClient := false
 	err := DeleteExtClient(old.Network, old.ClientID)
 	if err != nil {
-		return new, err
+		return new, replaceOldClient, err
 	}
 	new.ClientID = update.ClientID
 	if update.PublicKey != "" && old.PublicKey != update.PublicKey {
+		replaceOldClient = true
 		new.PublicKey = update.PublicKey
 	}
 	if update.DNS != "" && update.DNS != old.DNS {
@@ -213,7 +215,7 @@ func UpdateExtClient(old *models.ExtClient, update *models.CustomExtClient) (*mo
 	if update.ExtraAllowedIPs != nil && StringDifference(old.ExtraAllowedIPs, update.ExtraAllowedIPs) != nil {
 		new.ExtraAllowedIPs = update.ExtraAllowedIPs
 	}
-	return new, CreateExtClient(new)
+	return new, replaceOldClient, CreateExtClient(new)
 }
 
 // GetExtClientsByID - gets the clients of attached gateway

+ 4 - 4
logic/peers.go

@@ -248,7 +248,7 @@ func GetPeerUpdateForHost(ctx context.Context, network string, host *models.Host
 
 				if node.IsIngressGateway || node.IsEgressGateway {
 					if peer.IsIngressGateway {
-						_, extPeerIDAndAddrs, err := getExtPeers(&peer)
+						_, extPeerIDAndAddrs, err := GetExtPeers(&peer)
 						if err == nil {
 							for _, extPeerIdAndAddr := range extPeerIDAndAddrs {
 								extPeerIdAndAddr := extPeerIdAndAddr
@@ -330,7 +330,7 @@ func GetPeerUpdateForHost(ctx context.Context, network string, host *models.Host
 		var extPeers []wgtypes.PeerConfig
 		var extPeerIDAndAddrs []models.IDandAddr
 		if node.IsIngressGateway {
-			extPeers, extPeerIDAndAddrs, err = getExtPeers(&node)
+			extPeers, extPeerIDAndAddrs, err = GetExtPeers(&node)
 			if err == nil {
 				for _, extPeerIdAndAddr := range extPeerIDAndAddrs {
 					extPeerIdAndAddr := extPeerIdAndAddr
@@ -446,7 +446,7 @@ func GetProxyListenPort(host *models.Host) int {
 	return proxyPort
 }
 
-func getExtPeers(node *models.Node) ([]wgtypes.PeerConfig, []models.IDandAddr, error) {
+func GetExtPeers(node *models.Node) ([]wgtypes.PeerConfig, []models.IDandAddr, error) {
 	var peers []wgtypes.PeerConfig
 	var idsAndAddr []models.IDandAddr
 	extPeers, err := GetNetworkExtClients(node.Network)
@@ -581,7 +581,7 @@ func GetAllowedIPs(node, peer *models.Node, metrics *models.Metrics) []net.IPNet
 
 	// handle ingress gateway peers
 	if peer.IsIngressGateway {
-		extPeers, _, err := getExtPeers(peer)
+		extPeers, _, err := GetExtPeers(peer)
 		if err != nil {
 			logger.Log(2, "could not retrieve ext peers for ", peer.ID.String(), err.Error())
 		}

+ 51 - 0
mq/publishers.go

@@ -154,6 +154,13 @@ func FlushNetworkPeersToHost(host *models.Host, hNode *models.Node, networkNodes
 		}
 		addPeerAction.Peers = append(addPeerAction.Peers, peerCfg)
 	}
+	if hNode.IsIngressGateway {
+		extPeers, _, err := logic.GetExtPeers(hNode)
+		if err == nil {
+			addPeerAction.Peers = append(addPeerAction.Peers, extPeers...)
+		}
+
+	}
 	if len(rmPeerAction.Peers) > 0 {
 		data, err := json.Marshal(rmPeerAction)
 		if err != nil {
@@ -268,6 +275,50 @@ func BroadcastAddOrUpdatePeer(host *models.Host, node *models.Node, update bool)
 	return nil
 }
 
+// BroadcastExtClient - publishes msg to add/updates ext client in the network
+func BroadcastExtClient(ingressHost *models.Host, ingressNode *models.Node) error {
+
+	nodes, err := logic.GetNetworkNodes(ingressNode.Network)
+	if err != nil {
+		return err
+	}
+	//flush peers to ingress host
+	go FlushNetworkPeersToHost(ingressHost, ingressNode, nodes)
+	// broadcast to update ingress peer to other hosts
+	go BroadcastAddOrUpdatePeer(ingressHost, ingressNode, true)
+	// TODO - send fw update
+	return nil
+}
+
+// BroadcastDelExtClient - published msg to remove ext client from network
+func BroadcastDelExtClient(ingressHost *models.Host, ingressNode *models.Node, extclients []models.ExtClient) error {
+	// TODO - send fw update
+	go BroadcastAddOrUpdatePeer(ingressHost, ingressNode, true)
+	peers := []wgtypes.PeerConfig{}
+	for _, extclient := range extclients {
+		extPubKey, err := wgtypes.ParseKey(extclient.PublicKey)
+		if err != nil {
+			continue
+		}
+		peers = append(peers, wgtypes.PeerConfig{
+			PublicKey: extPubKey,
+			Remove:    true,
+		})
+
+	}
+	p := models.PeerAction{
+		Action: models.RemovePeer,
+		Peers:  peers,
+	}
+
+	data, err := json.Marshal(p)
+	if err != nil {
+		return err
+	}
+	publish(ingressHost, fmt.Sprintf("peer/host/%s/%s", ingressHost.ID.String(), servercfg.GetServer()), data)
+	return nil
+}
+
 // NodeUpdate -- publishes a node update
 func NodeUpdate(node *models.Node) error {
 	host, err := logic.GetHost(node.HostID.String())