Browse Source

Merge pull request #703 from gravitl/bugfix_v0.10.0_blocks_on_subscriptions

remove go routines for handling reciept of subscribed messages
dcarns 3 years ago
parent
commit
4ffbbf128a
1 changed files with 125 additions and 131 deletions
  1. 125 131
      netclient/functions/daemon.go

+ 125 - 131
netclient/functions/daemon.go

@@ -224,157 +224,152 @@ var All mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Message) {
 
 // NodeUpdate -- mqtt message handler for /update/<NodeID> topic
 func NodeUpdate(client mqtt.Client, msg mqtt.Message) {
-	//potentiall blocking i/o so do this in a go routine
-	go func() {
-		var newNode models.Node
-		var cfg config.ClientConfig
-		var network = parseNetworkFromTopic(msg.Topic())
-		cfg.Network = network
-		cfg.ReadConfig()
-
-		data, dataErr := decryptMsg(&cfg, msg.Payload())
-		if dataErr != nil {
-			return
-		}
-		err := json.Unmarshal([]byte(data), &newNode)
-		if err != nil {
-			ncutils.Log("error unmarshalling node update data" + err.Error())
-			return
-		}
+	var newNode models.Node
+	var cfg config.ClientConfig
+	var network = parseNetworkFromTopic(msg.Topic())
+	cfg.Network = network
+	cfg.ReadConfig()
 
-		ncutils.Log("received message to update node " + newNode.Name)
-		// see if cache hit, if so skip
-		var currentMessage = read(newNode.Network, lastNodeUpdate)
-		if currentMessage == string(data) {
-			return
+	data, dataErr := decryptMsg(&cfg, msg.Payload())
+	if dataErr != nil {
+		return
+	}
+	err := json.Unmarshal([]byte(data), &newNode)
+	if err != nil {
+		ncutils.Log("error unmarshalling node update data" + err.Error())
+		return
+	}
+
+	ncutils.Log("received message to update node " + newNode.Name)
+	// see if cache hit, if so skip
+	var currentMessage = read(newNode.Network, lastNodeUpdate)
+	if currentMessage == string(data) {
+		return
+	}
+	insert(newNode.Network, lastNodeUpdate, string(data)) // store new message in cache
+
+	//check if interface name has changed if so delete.
+	if cfg.Node.Interface != newNode.Interface {
+		if err = wireguard.RemoveConf(cfg.Node.Interface, true); err != nil {
+			ncutils.PrintLog("could not delete old interface "+cfg.Node.Interface+": "+err.Error(), 1)
 		}
-		insert(newNode.Network, lastNodeUpdate, string(data)) // store new message in cache
+	}
+	newNode.PullChanges = "no"
+	//ensure that OS never changes
+	newNode.OS = runtime.GOOS
+	// check if interface needs to delta
+	ifaceDelta := ncutils.IfaceDelta(&cfg.Node, &newNode)
+	shouldDNSChange := cfg.Node.DNSOn != newNode.DNSOn
 
-		//check if interface name has changed if so delete.
-		if cfg.Node.Interface != newNode.Interface {
-			if err = wireguard.RemoveConf(cfg.Node.Interface, true); err != nil {
-				ncutils.PrintLog("could not delete old interface "+cfg.Node.Interface+": "+err.Error(), 1)
-			}
+	cfg.Node = newNode
+	switch newNode.Action {
+	case models.NODE_DELETE:
+		if cancel, ok := networkcontext.Load(newNode.Network); ok {
+			ncutils.Log("cancelling message queue context for " + newNode.Network)
+			cancel.(context.CancelFunc)()
+		} else {
+			ncutils.Log("failed to kill go routines for network " + newNode.Network)
 		}
-		newNode.PullChanges = "no"
-		//ensure that OS never changes
-		newNode.OS = runtime.GOOS
-		// check if interface needs to delta
-		ifaceDelta := ncutils.IfaceDelta(&cfg.Node, &newNode)
-		shouldDNSChange := cfg.Node.DNSOn != newNode.DNSOn
-
-		cfg.Node = newNode
-		switch newNode.Action {
-		case models.NODE_DELETE:
-			if cancel, ok := networkcontext.Load(newNode.Network); ok {
-				ncutils.Log("cancelling message queue context for " + newNode.Network)
-				cancel.(context.CancelFunc)()
-			} else {
-				ncutils.Log("failed to kill go routines for network " + newNode.Network)
-			}
-			ncutils.PrintLog(fmt.Sprintf("received delete request for %s", cfg.Node.Name), 1)
-			if err = LeaveNetwork(cfg.Node.Network); err != nil {
-				if !strings.Contains("rpc error", err.Error()) {
-					ncutils.PrintLog(fmt.Sprintf("failed to leave, please check that local files for network %s were removed", cfg.Node.Network), 1)
-				}
+		ncutils.PrintLog(fmt.Sprintf("received delete request for %s", cfg.Node.Name), 1)
+		if err = LeaveNetwork(cfg.Node.Network); err != nil {
+			if !strings.Contains("rpc error", err.Error()) {
+				ncutils.PrintLog(fmt.Sprintf("failed to leave, please check that local files for network %s were removed", cfg.Node.Network), 1)
 			}
-			ncutils.PrintLog(fmt.Sprintf("%s was removed", cfg.Node.Name), 1)
-			return
-		case models.NODE_UPDATE_KEY:
-			if err := UpdateKeys(&cfg, client); err != nil {
-				ncutils.PrintLog("err updating wireguard keys: "+err.Error(), 1)
-			}
-			ifaceDelta = true
-		case models.NODE_NOOP:
-		default:
 		}
-		// Save new config
-		cfg.Node.Action = models.NODE_NOOP
-		if err := config.Write(&cfg, cfg.Network); err != nil {
-			ncutils.PrintLog("error updating node configuration: "+err.Error(), 1)
+		ncutils.PrintLog(fmt.Sprintf("%s was removed", cfg.Node.Name), 1)
+		return
+	case models.NODE_UPDATE_KEY:
+		if err := UpdateKeys(&cfg, client); err != nil {
+			ncutils.PrintLog("err updating wireguard keys: "+err.Error(), 1)
 		}
-		nameserver := cfg.Server.CoreDNSAddr
-		privateKey, err := wireguard.RetrievePrivKey(newNode.Network)
+		ifaceDelta = true
+	case models.NODE_NOOP:
+	default:
+	}
+	// Save new config
+	cfg.Node.Action = models.NODE_NOOP
+	if err := config.Write(&cfg, cfg.Network); err != nil {
+		ncutils.PrintLog("error updating node configuration: "+err.Error(), 1)
+	}
+	nameserver := cfg.Server.CoreDNSAddr
+	privateKey, err := wireguard.RetrievePrivKey(newNode.Network)
+	if err != nil {
+		ncutils.Log("error reading PrivateKey " + err.Error())
+		return
+	}
+	file := ncutils.GetNetclientPathSpecific() + cfg.Node.Interface + ".conf"
+	if err := wireguard.UpdateWgInterface(file, privateKey, nameserver, newNode); err != nil {
+		ncutils.Log("error updating wireguard config " + err.Error())
+		return
+	}
+	if ifaceDelta {
+		ncutils.Log("applying WG conf to " + file)
+		err = wireguard.ApplyConf(&cfg.Node, cfg.Node.Interface, file)
 		if err != nil {
-			ncutils.Log("error reading PrivateKey " + err.Error())
+			ncutils.Log("error restarting wg after node update " + err.Error())
 			return
 		}
-		file := ncutils.GetNetclientPathSpecific() + cfg.Node.Interface + ".conf"
-		if err := wireguard.UpdateWgInterface(file, privateKey, nameserver, newNode); err != nil {
-			ncutils.Log("error updating wireguard config " + err.Error())
+		time.Sleep(time.Second >> 1)
+		if err = Resubscribe(client, &cfg); err != nil {
+			ncutils.Log("error resubscribing after interface change " + err.Error())
 			return
 		}
-		if ifaceDelta {
-			ncutils.Log("applying WG conf to " + file)
-			err = wireguard.ApplyConf(&cfg.Node, cfg.Node.Interface, file)
-			if err != nil {
-				ncutils.Log("error restarting wg after node update " + err.Error())
-				return
-			}
-			time.Sleep(time.Second >> 1)
-			if err = Resubscribe(client, &cfg); err != nil {
-				ncutils.Log("error resubscribing after interface change " + err.Error())
-				return
-			}
-			if newNode.DNSOn == "yes" {
-				for _, server := range newNode.NetworkSettings.DefaultServerAddrs {
-					if server.IsLeader {
-						go local.SetDNSWithRetry(newNode, server.Address)
-						break
-					}
+		if newNode.DNSOn == "yes" {
+			for _, server := range newNode.NetworkSettings.DefaultServerAddrs {
+				if server.IsLeader {
+					go local.SetDNSWithRetry(newNode, server.Address)
+					break
 				}
 			}
 		}
-		//deal with DNS
-		if newNode.DNSOn != "yes" && shouldDNSChange && cfg.Node.Interface != "" {
-			ncutils.Log("settng DNS off")
-			_, err := ncutils.RunCmd("/usr/bin/resolvectl revert "+cfg.Node.Interface, true)
-			if err != nil {
-				ncutils.Log("error applying dns" + err.Error())
-			}
+	}
+	//deal with DNS
+	if newNode.DNSOn != "yes" && shouldDNSChange && cfg.Node.Interface != "" {
+		ncutils.Log("settng DNS off")
+		_, err := ncutils.RunCmd("/usr/bin/resolvectl revert "+cfg.Node.Interface, true)
+		if err != nil {
+			ncutils.Log("error applying dns" + err.Error())
 		}
-	}()
+	}
 }
 
 // UpdatePeers -- mqtt message handler for peers/<Network>/<NodeID> topic
 func UpdatePeers(client mqtt.Client, msg mqtt.Message) {
-	go func() {
-		var peerUpdate models.PeerUpdate
-		var network = parseNetworkFromTopic(msg.Topic())
-		var cfg = config.ClientConfig{}
-		cfg.Network = network
-		cfg.ReadConfig()
-
-		data, dataErr := decryptMsg(&cfg, msg.Payload())
-		if dataErr != nil {
-			return
-		}
-		err := json.Unmarshal([]byte(data), &peerUpdate)
-		if err != nil {
-			ncutils.Log("error unmarshalling peer data")
-			return
-		}
-		// see if cached hit, if so skip
-		var currentMessage = read(peerUpdate.Network, lastPeerUpdate)
-		if currentMessage == string(data) {
-			return
-		}
-		insert(peerUpdate.Network, lastPeerUpdate, string(data))
+	var peerUpdate models.PeerUpdate
+	var network = parseNetworkFromTopic(msg.Topic())
+	var cfg = config.ClientConfig{}
+	cfg.Network = network
+	cfg.ReadConfig()
 
-		file := ncutils.GetNetclientPathSpecific() + cfg.Node.Interface + ".conf"
-		err = wireguard.UpdateWgPeers(file, peerUpdate.Peers)
-		if err != nil {
-			ncutils.Log("error updating wireguard peers" + err.Error())
-			return
-		}
-		//err = wireguard.SyncWGQuickConf(cfg.Node.Interface, file)
-		err = wireguard.SetPeers(cfg.Node.Interface, cfg.Node.Address, cfg.Node.PersistentKeepalive, peerUpdate.Peers)
-		if err != nil {
-			ncutils.Log("error syncing wg after peer update " + err.Error())
-			return
-		}
-		ncutils.Log(fmt.Sprintf("received peer update on network, %s", cfg.Network))
-	}()
+	data, dataErr := decryptMsg(&cfg, msg.Payload())
+	if dataErr != nil {
+		return
+	}
+	err := json.Unmarshal([]byte(data), &peerUpdate)
+	if err != nil {
+		ncutils.Log("error unmarshalling peer data")
+		return
+	}
+	// see if cached hit, if so skip
+	var currentMessage = read(peerUpdate.Network, lastPeerUpdate)
+	if currentMessage == string(data) {
+		return
+	}
+	insert(peerUpdate.Network, lastPeerUpdate, string(data))
+
+	file := ncutils.GetNetclientPathSpecific() + cfg.Node.Interface + ".conf"
+	err = wireguard.UpdateWgPeers(file, peerUpdate.Peers)
+	if err != nil {
+		ncutils.Log("error updating wireguard peers" + err.Error())
+		return
+	}
+	//err = wireguard.SyncWGQuickConf(cfg.Node.Interface, file)
+	err = wireguard.SetPeers(cfg.Node.Interface, cfg.Node.Address, cfg.Node.PersistentKeepalive, peerUpdate.Peers)
+	if err != nil {
+		ncutils.Log("error syncing wg after peer update " + err.Error())
+		return
+	}
+	ncutils.Log(fmt.Sprintf("received peer update on network, %s", cfg.Network))
 }
 
 // MonitorKeepalive - checks time last server keepalive received.  If more than 3+ minutes, notify and resubscribe
@@ -456,7 +451,6 @@ func Resubscribe(client mqtt.Client, cfg *config.ClientConfig) error {
 // UpdateKeys -- updates private key and returns new publickey
 func UpdateKeys(cfg *config.ClientConfig, client mqtt.Client) error {
 	ncutils.Log("received message to update keys")
-	//potentiall blocking i/o so do this in a go routine
 	key, err := wgtypes.GeneratePrivateKey()
 	if err != nil {
 		ncutils.Log("error generating privatekey " + err.Error())