소스 검색

Merge branch 'refactor_peer_updates' into relays

Matthew R Kasun 2 년 전
부모
커밋
4d754ed7d0
7개의 변경된 파일230개의 추가작업 그리고 98개의 파일을 삭제
  1. 13 2
      controllers/ext_client.go
  2. 2 1
      controllers/hosts.go
  3. 31 2
      controllers/node.go
  4. 132 85
      logic/peers.go
  5. 12 6
      models/mqtt.go
  6. 1 0
      models/structs.go
  7. 39 2
      mq/publishers.go

+ 13 - 2
controllers/ext_client.go

@@ -398,6 +398,10 @@ func createExtClient(w http.ResponseWriter, r *http.Request) {
 	w.WriteHeader(http.StatusOK)
 	go func() {
 		mq.BroadcastExtClient(host, &node)
+		f, err := logic.GetFwUpdate(host)
+		if err == nil {
+			mq.PublishFwUpdate(host, &f)
+		}
 		if err := mq.PublishExtCLientDNS(&extclient); err != nil {
 			logger.Log(1, "error publishing extclient dns", err.Error())
 		}
@@ -491,7 +495,6 @@ func updateExtClient(w http.ResponseWriter, r *http.Request) {
 		return
 	}
 	logger.Log(0, r.Header.Get("user"), "updated ext client", update.ClientID)
-
 	w.WriteHeader(http.StatusOK)
 	json.NewEncoder(w).Encode(newclient)
 
@@ -505,6 +508,10 @@ func updateExtClient(w http.ResponseWriter, r *http.Request) {
 					// broadcast update
 					mq.BroadcastExtClient(ingressHost, &ingressNode)
 				}
+				f, err := logic.GetFwUpdate(ingressHost)
+				if err == nil {
+					mq.PublishFwUpdate(ingressHost, &f)
+				}
 			}
 		}
 		if changedID {
@@ -581,7 +588,11 @@ func deleteExtClient(w http.ResponseWriter, r *http.Request) {
 	go func() {
 		ingressHost, err := logic.GetHost(ingressnode.HostID.String())
 		if err == nil {
-			go mq.BroadcastDelExtClient(ingressHost, &ingressnode, []models.ExtClient{extclient})
+			mq.BroadcastDelExtClient(ingressHost, &ingressnode, []models.ExtClient{extclient})
+			f, err := logic.GetFwUpdate(ingressHost)
+			if err == nil {
+				mq.PublishFwUpdate(ingressHost, &f)
+			}
 		}
 
 		if err = mq.PublishDeleteExtClientDNS(&extclient); err != nil {

+ 2 - 1
controllers/hosts.go

@@ -99,6 +99,7 @@ func pull(w http.ResponseWriter, r *http.Request) {
 		logic.ReturnErrorResponse(w, r, logic.FormatError(err, "internal"))
 		return
 	}
+	fw, _ := logic.GetFwUpdate(host)
 	serverConf.TrafficKey = key
 	response := models.HostPull{
 		Host:         *host,
@@ -369,7 +370,7 @@ func authenticateHost(response http.ResponseWriter, request *http.Request) {
 	if err != nil {
 		errorResponse.Code = http.StatusBadRequest
 		errorResponse.Message = err.Error()
-		logger.Log(0, request.Header.Get("user"),
+		logger.Log(0, request.Header.Get("user"),var peerUpdate models.HostPeerUpdate
 			"error retrieving host: ", err.Error())
 		logic.ReturnErrorResponse(response, request, errorResponse)
 		return

+ 31 - 2
controllers/node.go

@@ -459,7 +459,19 @@ func createEgressGateway(w http.ResponseWriter, r *http.Request) {
 	w.WriteHeader(http.StatusOK)
 	json.NewEncoder(w).Encode(apiNode)
 	go func() {
-		mq.PublishPeerUpdate()
+		host, err := logic.GetHost(node.HostID.String())
+		if err != nil {
+			logger.Log(0, "failed to get egress host: ", err.Error())
+			return
+		}
+		mq.BroadcastAddOrUpdatePeer(host, &node, true)
+		f, err := logic.GetFwUpdate(host)
+		if err != nil {
+			logger.Log(0, "failed to get egreess host: ", err.Error())
+			return
+		}
+		mq.PublishFwUpdate(host, &f)
+
 	}()
 	runUpdates(&node, true)
 }
@@ -495,7 +507,20 @@ func deleteEgressGateway(w http.ResponseWriter, r *http.Request) {
 	w.WriteHeader(http.StatusOK)
 	json.NewEncoder(w).Encode(apiNode)
 	go func() {
-		mq.PublishPeerUpdate()
+
+		host, err := logic.GetHost(node.HostID.String())
+		if err != nil {
+			logger.Log(0, "failed to get egress host: ", err.Error())
+			return
+		}
+		mq.BroadcastAddOrUpdatePeer(host, &node, true)
+		f, err := logic.GetFwUpdate(host)
+		if err != nil {
+			logger.Log(0, "failed to get egreess host: ", err.Error())
+			return
+		}
+		mq.PublishFwUpdate(host, &f)
+
 	}()
 	runUpdates(&node, true)
 }
@@ -584,6 +609,10 @@ func deleteIngressGateway(w http.ResponseWriter, r *http.Request) {
 		host, err := logic.GetHost(node.HostID.String())
 		if err == nil {
 			mq.BroadcastDelExtClient(host, &node, removedClients)
+			f, err := logic.GetFwUpdate(host)
+			if err == nil {
+				mq.PublishFwUpdate(host, &f)
+			}
 		}
 	}
 

+ 132 - 85
logic/peers.go

@@ -105,15 +105,11 @@ func GetPeerUpdateForHost(ctx context.Context, network string, host *models.Host
 	// track which nodes are deleted
 	// after peer calculation, if peer not in list, add delete config of peer
 	hostPeerUpdate := models.HostPeerUpdate{
-		Host:          *host,
-		Server:        servercfg.GetServer(),
-		HostPeerIDs:   make(models.HostPeerMap, 0),
-		ServerVersion: servercfg.GetVersion(),
-		ServerAddrs:   []models.ServerAddr{},
-		IngressInfo: models.IngressInfo{
-			ExtPeers: make(map[string]models.ExtClientInfo),
-		},
-		EgressInfo:      make(map[string]models.EgressInfo),
+		Host:            *host,
+		Server:          servercfg.GetServer(),
+		HostPeerIDs:     make(models.HostPeerMap, 0),
+		ServerVersion:   servercfg.GetVersion(),
+		ServerAddrs:     []models.ServerAddr{},
 		PeerIDs:         make(models.PeerMap, 0),
 		Peers:           []wgtypes.PeerConfig{},
 		NodePeers:       []wgtypes.PeerConfig{},
@@ -132,10 +128,6 @@ func GetPeerUpdateForHost(ctx context.Context, network string, host *models.Host
 			continue
 		}
 		currentPeers := GetNetworkNodesMemory(allNodes, node.Network)
-		var nodePeerMap map[string]models.PeerRouteInfo
-		if node.IsIngressGateway || node.IsEgressGateway {
-			nodePeerMap = make(map[string]models.PeerRouteInfo)
-		}
 		for _, peer := range currentPeers {
 			select {
 			case <-ctx.Done():
@@ -210,39 +202,6 @@ func GetPeerUpdateForHost(ctx context.Context, network string, host *models.Host
 					peerConfig.AllowedIPs = allowedips // only append allowed IPs if valid connection
 				}
 
-				if node.IsIngressGateway || node.IsEgressGateway {
-					if peer.IsIngressGateway {
-						_, extPeerIDAndAddrs, err := GetExtPeers(&peer)
-						if err == nil {
-							for _, extPeerIdAndAddr := range extPeerIDAndAddrs {
-								extPeerIdAndAddr := extPeerIdAndAddr
-								nodePeerMap[extPeerIdAndAddr.ID] = models.PeerRouteInfo{
-									PeerAddr: net.IPNet{
-										IP:   net.ParseIP(extPeerIdAndAddr.Address),
-										Mask: getCIDRMaskFromAddr(extPeerIdAndAddr.Address),
-									},
-									PeerKey: extPeerIdAndAddr.ID,
-									Allow:   true,
-									ID:      extPeerIdAndAddr.ID,
-								}
-							}
-						}
-					}
-					if node.IsIngressGateway && peer.IsEgressGateway {
-						hostPeerUpdate.IngressInfo.EgressRanges = append(hostPeerUpdate.IngressInfo.EgressRanges,
-							peer.EgressGatewayRanges...)
-					}
-					nodePeerMap[peerHost.PublicKey.String()] = models.PeerRouteInfo{
-						PeerAddr: net.IPNet{
-							IP:   net.ParseIP(peer.PrimaryAddress()),
-							Mask: getCIDRMaskFromAddr(peer.PrimaryAddress()),
-						},
-						PeerKey: peerHost.PublicKey.String(),
-						Allow:   true,
-						ID:      peer.ID.String(),
-					}
-				}
-
 				peerProxyPort := GetProxyListenPort(peerHost)
 				var nodePeer wgtypes.PeerConfig
 				if _, ok := hostPeerUpdate.HostPeerIDs[peerHost.PublicKey.String()]; !ok {
@@ -296,18 +255,6 @@ func GetPeerUpdateForHost(ctx context.Context, network string, host *models.Host
 		if node.IsIngressGateway {
 			extPeers, extPeerIDAndAddrs, err = GetExtPeers(&node)
 			if err == nil {
-				for _, extPeerIdAndAddr := range extPeerIDAndAddrs {
-					extPeerIdAndAddr := extPeerIdAndAddr
-					nodePeerMap[extPeerIdAndAddr.ID] = models.PeerRouteInfo{
-						PeerAddr: net.IPNet{
-							IP:   net.ParseIP(extPeerIdAndAddr.Address),
-							Mask: getCIDRMaskFromAddr(extPeerIdAndAddr.Address),
-						},
-						PeerKey: extPeerIdAndAddr.ID,
-						Allow:   true,
-						ID:      extPeerIdAndAddr.ID,
-					}
-				}
 				hostPeerUpdate.Peers = append(hostPeerUpdate.Peers, extPeers...)
 				for _, extPeerIdAndAddr := range extPeerIDAndAddrs {
 					extPeerIdAndAddr := extPeerIdAndAddr
@@ -318,21 +265,6 @@ func GetPeerUpdateForHost(ctx context.Context, network string, host *models.Host
 						Name:    extPeerIdAndAddr.Name,
 						Network: node.Network,
 					}
-
-					hostPeerUpdate.IngressInfo.ExtPeers[extPeerIdAndAddr.ID] = models.ExtClientInfo{
-						Masquerade: true,
-						IngGwAddr: net.IPNet{
-							IP:   net.ParseIP(node.PrimaryAddress()),
-							Mask: getCIDRMaskFromAddr(node.PrimaryAddress()),
-						},
-						Network: node.PrimaryNetworkRange(),
-						ExtPeerAddr: net.IPNet{
-							IP:   net.ParseIP(extPeerIdAndAddr.Address),
-							Mask: getCIDRMaskFromAddr(extPeerIdAndAddr.Address),
-						},
-						ExtPeerKey: extPeerIdAndAddr.ID,
-						Peers:      filterNodeMapForClientACLs(extPeerIdAndAddr.ID, node.Network, nodePeerMap),
-					}
 					if node.Network == network {
 						hostPeerUpdate.PeerIDs[extPeerIdAndAddr.ID] = extPeerIdAndAddr
 						hostPeerUpdate.NodePeers = append(hostPeerUpdate.NodePeers, extPeers...)
@@ -342,18 +274,6 @@ func GetPeerUpdateForHost(ctx context.Context, network string, host *models.Host
 				logger.Log(1, "error retrieving external clients:", err.Error())
 			}
 		}
-		if node.IsEgressGateway {
-			hostPeerUpdate.EgressInfo[node.ID.String()] = models.EgressInfo{
-				EgressID: node.ID.String(),
-				Network:  node.PrimaryNetworkRange(),
-				EgressGwAddr: net.IPNet{
-					IP:   net.ParseIP(node.PrimaryAddress()),
-					Mask: getCIDRMaskFromAddr(node.PrimaryAddress()),
-				},
-				GwPeers:     nodePeerMap,
-				EgressGWCfg: node.EgressGatewayRequest,
-			}
-		}
 	}
 	// == post peer calculations ==
 	// indicate removal if no allowed IPs were calculated
@@ -389,6 +309,133 @@ func GetPeerUpdateForHost(ctx context.Context, network string, host *models.Host
 	return hostPeerUpdate, nil
 }
 
+// GetFwUpdate - fetches the firewall update for the gateway nodes on the host
+func GetFwUpdate(host *models.Host) (models.FwUpdate, error) {
+	fwUpdate := models.FwUpdate{
+		IngressInfo: models.IngressInfo{
+			ExtPeers: make(map[string]models.ExtClientInfo),
+		},
+		EgressInfo: make(map[string]models.EgressInfo),
+	}
+	allNodes, err := GetAllNodes()
+	if err != nil {
+		return fwUpdate, err
+	}
+	for _, nodeID := range host.Nodes {
+		nodeID := nodeID
+		node, err := GetNodeByID(nodeID)
+		if err != nil {
+			continue
+		}
+		if !node.Connected || node.PendingDelete || node.Action == models.NODE_DELETE {
+			continue
+		}
+		currentPeers := GetNetworkNodesMemory(allNodes, node.Network)
+		var nodePeerMap map[string]models.PeerRouteInfo
+		if node.IsIngressGateway || node.IsEgressGateway {
+			nodePeerMap = make(map[string]models.PeerRouteInfo)
+		}
+		for _, peer := range currentPeers {
+			peer := peer
+			if peer.ID.String() == node.ID.String() {
+				logger.Log(2, "fw update, skipping self")
+				//skip yourself
+				continue
+			}
+			peerHost, err := GetHost(peer.HostID.String())
+			if err != nil {
+				logger.Log(1, "no peer host", peer.HostID.String(), err.Error())
+				continue
+			}
+			if node.IsIngressGateway || node.IsEgressGateway {
+				if peer.IsIngressGateway {
+					_, extPeerIDAndAddrs, err := GetExtPeers(&peer)
+					if err == nil {
+						for _, extPeerIdAndAddr := range extPeerIDAndAddrs {
+							extPeerIdAndAddr := extPeerIdAndAddr
+							nodePeerMap[extPeerIdAndAddr.ID] = models.PeerRouteInfo{
+								PeerAddr: net.IPNet{
+									IP:   net.ParseIP(extPeerIdAndAddr.Address),
+									Mask: getCIDRMaskFromAddr(extPeerIdAndAddr.Address),
+								},
+								PeerKey: extPeerIdAndAddr.ID,
+								Allow:   true,
+								ID:      extPeerIdAndAddr.ID,
+							}
+						}
+					}
+				}
+				if node.IsIngressGateway && peer.IsEgressGateway {
+					fwUpdate.IngressInfo.EgressRanges = append(fwUpdate.IngressInfo.EgressRanges,
+						peer.EgressGatewayRanges...)
+				}
+				nodePeerMap[peerHost.PublicKey.String()] = models.PeerRouteInfo{
+					PeerAddr: net.IPNet{
+						IP:   net.ParseIP(peer.PrimaryAddress()),
+						Mask: getCIDRMaskFromAddr(peer.PrimaryAddress()),
+					},
+					PeerKey: peerHost.PublicKey.String(),
+					Allow:   true,
+					ID:      peer.ID.String(),
+				}
+			}
+		}
+		var extPeerIDAndAddrs []models.IDandAddr
+		if node.IsIngressGateway {
+			fwUpdate.IsIngressGw = true
+			_, extPeerIDAndAddrs, err = GetExtPeers(&node)
+			if err == nil {
+				for _, extPeerIdAndAddr := range extPeerIDAndAddrs {
+					extPeerIdAndAddr := extPeerIdAndAddr
+					nodePeerMap[extPeerIdAndAddr.ID] = models.PeerRouteInfo{
+						PeerAddr: net.IPNet{
+							IP:   net.ParseIP(extPeerIdAndAddr.Address),
+							Mask: getCIDRMaskFromAddr(extPeerIdAndAddr.Address),
+						},
+						PeerKey: extPeerIdAndAddr.ID,
+						Allow:   true,
+						ID:      extPeerIdAndAddr.ID,
+					}
+				}
+				for _, extPeerIdAndAddr := range extPeerIDAndAddrs {
+					extPeerIdAndAddr := extPeerIdAndAddr
+
+					fwUpdate.IngressInfo.ExtPeers[extPeerIdAndAddr.ID] = models.ExtClientInfo{
+						Masquerade: true,
+						IngGwAddr: net.IPNet{
+							IP:   net.ParseIP(node.PrimaryAddress()),
+							Mask: getCIDRMaskFromAddr(node.PrimaryAddress()),
+						},
+						Network: node.PrimaryNetworkRange(),
+						ExtPeerAddr: net.IPNet{
+							IP:   net.ParseIP(extPeerIdAndAddr.Address),
+							Mask: getCIDRMaskFromAddr(extPeerIdAndAddr.Address),
+						},
+						ExtPeerKey: extPeerIdAndAddr.ID,
+						Peers:      filterNodeMapForClientACLs(extPeerIdAndAddr.ID, node.Network, nodePeerMap),
+					}
+				}
+			} else if !database.IsEmptyRecord(err) {
+				logger.Log(1, "error retrieving external clients:", err.Error())
+			}
+		}
+		if node.IsEgressGateway {
+			fwUpdate.IsEgressGw = true
+			fwUpdate.EgressInfo[node.ID.String()] = models.EgressInfo{
+				EgressID: node.ID.String(),
+				Network:  node.PrimaryNetworkRange(),
+				EgressGwAddr: net.IPNet{
+					IP:   net.ParseIP(node.PrimaryAddress()),
+					Mask: getCIDRMaskFromAddr(node.PrimaryAddress()),
+				},
+				GwPeers:     nodePeerMap,
+				EgressGWCfg: node.EgressGatewayRequest,
+			}
+		}
+	}
+	return fwUpdate, nil
+}
+
 // GetPeerListenPort - given a host, retrieve it's appropriate listening port
 func GetPeerListenPort(host *models.Host) int {
 	peerPort := host.ListenPort

+ 12 - 6
models/mqtt.go

@@ -14,12 +14,10 @@ type HostPeerUpdate struct {
 	ServerAddrs     []ServerAddr         `json:"serveraddrs" bson:"serveraddrs" yaml:"serveraddrs"`
 	NodePeers       []wgtypes.PeerConfig `json:"peers" bson:"peers" yaml:"peers"`
 	Peers           []wgtypes.PeerConfig
-	HostPeerIDs     HostPeerMap           `json:"hostpeerids" bson:"hostpeerids" yaml:"hostpeerids"`
-	ProxyUpdate     ProxyManagerPayload   `json:"proxy_update" bson:"proxy_update" yaml:"proxy_update"`
-	EgressInfo      map[string]EgressInfo `json:"egress_info" bson:"egress_info" yaml:"egress_info"` // map key is node ID
-	IngressInfo     IngressInfo           `json:"ingress_info" bson:"ext_peers" yaml:"ext_peers"`
-	PeerIDs         PeerMap               `json:"peerids" bson:"peerids" yaml:"peerids"`
-	HostNetworkInfo HostInfoMap           `json:"host_network_info,omitempty" bson:"host_network_info,omitempty" yaml:"host_network_info,omitempty"`
+	HostPeerIDs     HostPeerMap         `json:"hostpeerids" bson:"hostpeerids" yaml:"hostpeerids"`
+	ProxyUpdate     ProxyManagerPayload `json:"proxy_update" bson:"proxy_update" yaml:"proxy_update"`
+	PeerIDs         PeerMap             `json:"peerids" bson:"peerids" yaml:"peerids"`
+	HostNetworkInfo HostInfoMap         `json:"host_network_info,omitempty" bson:"host_network_info,omitempty" yaml:"host_network_info,omitempty"`
 }
 
 // IngressInfo - struct for ingress info
@@ -78,3 +76,11 @@ type PeerAction struct {
 	Action PeerMqActionType     `json:"action"`
 	Peers  []wgtypes.PeerConfig `json:"peers"`
 }
+
+// FwUpdate - struct for firewall updates
+type FwUpdate struct {
+	IsIngressGw bool                  `json:"is_ingress_gw"`
+	IsEgressGw  bool                  `json:"is_egress_gw"`
+	IngressInfo IngressInfo           `json:"ingress_info"`
+	EgressInfo  map[string]EgressInfo `json:"egress_info"`
+}

+ 1 - 0
models/structs.go

@@ -204,6 +204,7 @@ type HostPull struct {
 	Peers        []wgtypes.PeerConfig `json:"peers" yaml:"peers"`
 	ServerConfig ServerConfig         `json:"server_config" yaml:"server_config"`
 	PeerIDs      PeerMap              `json:"peer_ids,omitempty" yaml:"peer_ids,omitempty"`
+	FwUpdate     FwUpdate             `json:"fw_update" yaml:"fw_update"`
 }
 
 // NodeGet - struct for a single node get response

+ 39 - 2
mq/publishers.go

@@ -175,6 +175,14 @@ func FlushNetworkPeersToHost(host *models.Host, hNode *models.Node, networkNodes
 		}
 		publish(host, fmt.Sprintf("peer/host/%s/%s", host.ID.String(), servercfg.GetServer()), data)
 	}
+	// send fw update if gw host
+	if hNode.IsIngressGateway || hNode.IsEgressGateway {
+		f, err := logic.GetFwUpdate(host)
+		if err == nil {
+			PublishFwUpdate(host, &f)
+		}
+
+	}
 	return nil
 }
 
@@ -205,6 +213,14 @@ func BroadcastDelPeer(host *models.Host, network string) error {
 		peerHost, err := logic.GetHost(nodeI.HostID.String())
 		if err == nil {
 			publish(peerHost, fmt.Sprintf("peer/host/%s/%s", peerHost.ID.String(), servercfg.GetServer()), data)
+			if nodeI.IsIngressGateway || nodeI.IsEgressGateway {
+				go func(peerHost models.Host) {
+					f, err := logic.GetFwUpdate(&peerHost)
+					if err == nil {
+						PublishFwUpdate(&peerHost, &f)
+					}
+				}(*peerHost)
+			}
 		}
 	}
 	return nil
@@ -271,6 +287,16 @@ func BroadcastAddOrUpdatePeer(host *models.Host, node *models.Node, update bool)
 		if err == nil {
 			publish(peerHost, fmt.Sprintf("peer/host/%s/%s", peerHost.ID.String(), servercfg.GetServer()), data)
 		}
+		if nodeI.IsIngressGateway || nodeI.IsEgressGateway {
+			go func(peerHost models.Host) {
+				f, err := logic.GetFwUpdate(&peerHost)
+				if err == nil {
+					PublishFwUpdate(&peerHost, &f)
+				}
+			}(*peerHost)
+
+		}
+
 	}
 	return nil
 }
@@ -286,7 +312,6 @@ func BroadcastExtClient(ingressHost *models.Host, ingressNode *models.Node) erro
 	go FlushNetworkPeersToHost(ingressHost, ingressNode, nodes)
 	// broadcast to update ingress peer to other hosts
 	go BroadcastAddOrUpdatePeer(ingressHost, ingressNode, true)
-	// TODO - send fw update
 	return nil
 }
 
@@ -315,7 +340,10 @@ func BroadcastDelExtClient(ingressHost *models.Host, ingressNode *models.Node, e
 	if err != nil {
 		return err
 	}
-	publish(ingressHost, fmt.Sprintf("peer/host/%s/%s", ingressHost.ID.String(), servercfg.GetServer()), data)
+	err = publish(ingressHost, fmt.Sprintf("peer/host/%s/%s", ingressHost.ID.String(), servercfg.GetServer()), data)
+	if err != nil {
+		return err
+	}
 	return nil
 }
 
@@ -557,6 +585,15 @@ func PublishHostDNSUpdate(old, new *models.Host, networks []string) error {
 	return nil
 }
 
+// PublishFwUpdate - publishes fw update to host
+func PublishFwUpdate(gwHost *models.Host, f *models.FwUpdate) error {
+	data, err := json.Marshal(f)
+	if err != nil {
+		return err
+	}
+	return publish(gwHost, fmt.Sprintf("fw/host/%s/%s", gwHost.ID.String(), servercfg.GetServer()), data)
+}
+
 func pushMetricsToExporter(metrics models.Metrics) error {
 	logger.Log(2, "----> Pushing metrics to exporter")
 	data, err := json.Marshal(metrics)