Browse Source

break into smaller funcs

Matthew R Kasun 2 years ago
parent
commit
c7c864b229
1 changed files with 106 additions and 129 deletions
  1. 106 129
      mq/publishers.go

+ 106 - 129
mq/publishers.go

@@ -56,7 +56,7 @@ func PublishSingleHostUpdate(host *models.Host) error {
 	return publish(host, fmt.Sprintf("peers/host/%s/%s", host.ID.String(), servercfg.GetServer()), data)
 }
 
-// PublishPeerUpdate --- publishes a peer update to all the peers of a node
+// PublishExtPeerUpdate --- publishes a peer update to all the peers of a node
 func PublishExtPeerUpdate(node *models.Node) error {
 
 	go PublishPeerUpdate()
@@ -111,40 +111,6 @@ func HostUpdate(hostUpdate *models.HostUpdate) error {
 	return nil
 }
 
-// sendPeers - retrieve networks, send peer ports to all peers
-func sendPeers() {
-
-	hosts, err := logic.GetAllHosts()
-	if err != nil {
-		logger.Log(1, "error retrieving networks for keepalive", err.Error())
-	}
-
-	var force bool
-	peer_force_send++
-	if peer_force_send == 5 {
-		servercfg.SetHost()
-		force = true
-		peer_force_send = 0
-		err := logic.TimerCheckpoint() // run telemetry & log dumps if 24 hours has passed..
-		if err != nil {
-			logger.Log(3, "error occurred on timer,", err.Error())
-		}
-
-		//collectServerMetrics(networks[:])
-	}
-
-	for _, host := range hosts {
-		if force {
-			host := host
-			logger.Log(2, "sending scheduled peer update (5 min)")
-			err = PublishSingleHostUpdate(&host)
-			if err != nil {
-				logger.Log(1, "error publishing peer updates for host: ", host.ID.String(), " Err: ", err.Error())
-			}
-		}
-	}
-}
-
 // ServerStartNotify - notifies all non server nodes to pull changes after a restart
 func ServerStartNotify() error {
 	nodes, err := logic.GetAllNodes()
@@ -188,58 +154,13 @@ func PublishDNSUpdate(network string, dns models.DNSUpdate) error {
 // PublishAllDNS publishes an array of dns updates (ip / host.network) for each peer to a node joining a network
 func PublishAllDNS(newnode *models.Node) error {
 	alldns := []models.DNSUpdate{}
-	dns := models.DNSUpdate{}
 	newnodeHost, err := logic.GetHost(newnode.HostID.String())
 	if err != nil {
 		return fmt.Errorf("error retrieving host for dns update %w", err)
 	}
-	nodes, err := logic.GetNetworkNodes(newnode.Network)
-	if err != nil {
-		return err
-	}
-	for _, node := range nodes {
-		host, err := logic.GetHost(node.HostID.String())
-		if err != nil {
-			logger.Log(0, "error retrieving host for dns update", host.ID.String(), err.Error())
-			continue
-		}
-		dns.Action = models.DNSInsert
-		dns.Name = host.Name + "." + node.Network
-		if node.Address.IP != nil {
-			dns.Address = node.Address.IP.String()
-			alldns = append(alldns, dns)
-		}
-		if node.Address6.IP != nil {
-			dns.Address = node.Address6.IP.String()
-			alldns = append(alldns, dns)
-		}
-	}
-	clients, err := logic.GetNetworkExtClients(newnode.Network)
-	if err != nil {
-		logger.Log(0, "error retrieving extclients", err.Error())
-	}
-	for _, client := range clients {
-		dns.Action = models.DNSInsert
-		dns.Name = client.ClientID + "." + client.Network
-		if client.Address != "" {
-			dns.Address = client.Address
-			alldns = append(alldns, dns)
-		}
-		if client.Address6 != "" {
-			dns.Address = client.Address
-			alldns = append(alldns, dns)
-		}
-	}
-	customdns, err := logic.GetCustomDNS(newnode.Network)
-	if err != nil {
-		logger.Log(0, "error retrieving custom dns entries", err.Error())
-	}
-	for _, custom := range customdns {
-		dns.Action = models.DNSInsert
-		dns.Address = custom.Address
-		dns.Name = custom.Name + "." + custom.Network
-		alldns = append(alldns, dns)
-	}
+	alldns = append(alldns, getNodeDNS(newnode.Network)...)
+	alldns = append(alldns, getExtClientDNS(newnode.Network)...)
+	alldns = append(alldns, getCustomDNS(newnode.Network)...)
 	data, err := json.Marshal(alldns)
 	if err != nil {
 		return fmt.Errorf("error encoding dns data %w", err)
@@ -272,7 +193,7 @@ func PublishDNSDelete(node *models.Node, host *models.Host) error {
 	return nil
 }
 
-// PublishReplaceNDS publish a dns update to replace a dns entry on all hosts in network
+// PublishReplaceDNS publish a dns update to replace a dns entry on all hosts in network
 func PublishReplaceDNS(oldNode, newNode *models.Node, host *models.Host) error {
 	dns := models.DNSUpdate{
 		Action: models.DNSReplaceIP,
@@ -322,7 +243,7 @@ func PublishExtCLientDNS(client *models.ExtClient) error {
 	return nil
 }
 
-// PublishExtClientUpdate publishes dns update for extclient name change
+// PublishExtClientDNSUpdate update for extclient name change
 func PublishExtClientDNSUpdate(old, new models.ExtClient, network string) error {
 	dns := models.DNSUpdate{
 		Action:  models.DNSReplaceName,
@@ -335,7 +256,7 @@ func PublishExtClientDNSUpdate(old, new models.ExtClient, network string) error
 	return nil
 }
 
-// PublishDeleteExtClient publish dns update to delete extclient entry
+// PublishDeleteExtClientDNS publish dns update to delete extclient entry
 func PublishDeleteExtClientDNS(client *models.ExtClient) error {
 	dns := models.DNSUpdate{
 		Action: models.DNSDeleteByName,
@@ -380,49 +301,6 @@ func PublishHostDNSUpdate(old, new *models.Host, networks []string) error {
 	return nil
 }
 
-// function to collect and store metrics for server nodes
-//func collectServerMetrics(networks []models.Network) {
-//	if !servercfg.Is_EE {
-//		return
-//	}
-//	if len(networks) > 0 {
-//		for i := range networks {
-//			currentNetworkNodes, err := logic.GetNetworkNodes(networks[i].NetID)
-//			if err != nil {
-//				continue
-//			}
-//			currentServerNodes := logic.GetServerNodes(networks[i].NetID)
-//			if len(currentServerNodes) > 0 {
-//				for i := range currentServerNodes {
-//					if logic.IsLocalServer(&currentServerNodes[i]) {
-//						serverMetrics := logic.CollectServerMetrics(currentServerNodes[i].ID, currentNetworkNodes)
-//						if serverMetrics != nil {
-//							serverMetrics.NodeName = currentServerNodes[i].Name
-//							serverMetrics.NodeID = currentServerNodes[i].ID
-//							serverMetrics.IsServer = "yes"
-//							serverMetrics.Network = currentServerNodes[i].Network
-//							if err = metrics.GetExchangedBytesForNode(&currentServerNodes[i], serverMetrics); err != nil {
-//								logger.Log(1, fmt.Sprintf("failed to update exchanged bytes info for server: %s, err: %v",
-//									currentServerNodes[i].Name, err))
-//							}
-//							updateNodeMetrics(&currentServerNodes[i], serverMetrics)
-//							if err = logic.UpdateMetrics(currentServerNodes[i].ID, serverMetrics); err != nil {
-//								logger.Log(1, "failed to update metrics for server node", currentServerNodes[i].ID)
-//							}
-//							if servercfg.IsMetricsExporter() {
-//								logger.Log(2, "-------------> SERVER METRICS: ", fmt.Sprintf("%+v", serverMetrics))
-//								if err := pushMetricsToExporter(*serverMetrics); err != nil {
-//									logger.Log(2, "failed to push server metrics to exporter: ", err.Error())
-//								}
-//							}
-//						}
-//					}
-//				}
-//			}
-//		}
-//	}
-//}
-
 func pushMetricsToExporter(metrics models.Metrics) error {
 	logger.Log(2, "----> Pushing metrics to exporter")
 	data, err := json.Marshal(metrics)
@@ -440,3 +318,102 @@ func pushMetricsToExporter(metrics models.Metrics) error {
 	}
 	return nil
 }
+
+func getNodeDNS(network string) []models.DNSUpdate {
+	alldns := []models.DNSUpdate{}
+	dns := models.DNSUpdate{}
+	nodes, err := logic.GetNetworkNodes(network)
+	if err != nil {
+		logger.Log(0, "error retreiving network nodes for network", network, err.Error())
+	}
+	for _, node := range nodes {
+		host, err := logic.GetHost(node.HostID.String())
+		if err != nil {
+			logger.Log(0, "error retrieving host for dns update", host.ID.String(), err.Error())
+			continue
+		}
+		dns.Action = models.DNSInsert
+		dns.Name = host.Name + "." + node.Network
+		if node.Address.IP != nil {
+			dns.Address = node.Address.IP.String()
+			alldns = append(alldns, dns)
+		}
+		if node.Address6.IP != nil {
+			dns.Address = node.Address6.IP.String()
+			alldns = append(alldns, dns)
+		}
+	}
+	return alldns
+}
+
+func getExtClientDNS(network string) []models.DNSUpdate {
+	alldns := []models.DNSUpdate{}
+	dns := models.DNSUpdate{}
+	clients, err := logic.GetNetworkExtClients(network)
+	if err != nil {
+		logger.Log(0, "error retrieving extclients", err.Error())
+	}
+	for _, client := range clients {
+		dns.Action = models.DNSInsert
+		dns.Name = client.ClientID + "." + client.Network
+		if client.Address != "" {
+			dns.Address = client.Address
+			alldns = append(alldns, dns)
+		}
+		if client.Address6 != "" {
+			dns.Address = client.Address
+			alldns = append(alldns, dns)
+		}
+	}
+	return alldns
+}
+
+func getCustomDNS(network string) []models.DNSUpdate {
+	alldns := []models.DNSUpdate{}
+	dns := models.DNSUpdate{}
+	customdns, err := logic.GetCustomDNS(network)
+	if err != nil {
+		logger.Log(0, "error retrieving custom dns entries", err.Error())
+	}
+	for _, custom := range customdns {
+		dns.Action = models.DNSInsert
+		dns.Address = custom.Address
+		dns.Name = custom.Name + "." + custom.Network
+		alldns = append(alldns, dns)
+	}
+	return alldns
+}
+
+// sendPeers - retrieve networks, send peer ports to all peers
+func sendPeers() {
+
+	hosts, err := logic.GetAllHosts()
+	if err != nil {
+		logger.Log(1, "error retrieving networks for keepalive", err.Error())
+	}
+
+	var force bool
+	peer_force_send++
+	if peer_force_send == 5 {
+		servercfg.SetHost()
+		force = true
+		peer_force_send = 0
+		err := logic.TimerCheckpoint() // run telemetry & log dumps if 24 hours has passed..
+		if err != nil {
+			logger.Log(3, "error occurred on timer,", err.Error())
+		}
+
+		//collectServerMetrics(networks[:])
+	}
+
+	for _, host := range hosts {
+		if force {
+			host := host
+			logger.Log(2, "sending scheduled peer update (5 min)")
+			err = PublishSingleHostUpdate(&host)
+			if err != nil {
+				logger.Log(1, "error publishing peer updates for host: ", host.ID.String(), " Err: ", err.Error())
+			}
+		}
+	}
+}