Browse Source

check broker connectivity on mqtt setup

Matthew R. Kasun 3 years ago
parent
commit
a1fd52d7d5
2 changed files with 19 additions and 10 deletions
  1. 10 4
      netclient/functions/daemon.go
  2. 9 6
      netclient/functions/mqpublish.go

+ 10 - 4
netclient/functions/daemon.go

@@ -167,7 +167,11 @@ func unsubscribeNode(client mqtt.Client, nodeCfg *config.ClientConfig) {
 // the client should subscribe to ALL nodes that exist on server locally
 func messageQueue(ctx context.Context, cfg *config.ClientConfig) {
 	logger.Log(0, "netclient daemon started for server: ", cfg.Server.Server)
-	client := setupMQTT(cfg, false)
+	client, err := setupMQTT(cfg, false)
+	if err != nil {
+		logger.Log(0, "unable to connect to broker", err.Error())
+		return
+	}
 	defer client.Disconnect(250)
 	<-ctx.Done()
 	logger.Log(0, "shutting down daemon for server ", cfg.Server.Server)
@@ -201,7 +205,7 @@ func NewTLSConfig(server string) *tls.Config {
 
 // setupMQTT creates a connection to broker and returns client
 // this function is primarily used to create a connection to publish to the broker
-func setupMQTT(cfg *config.ClientConfig, publish bool) mqtt.Client {
+func setupMQTT(cfg *config.ClientConfig, publish bool) (mqtt.Client, error) {
 	opts := mqtt.NewClientOptions()
 	server := cfg.Server.Server
 	opts.AddBroker("ssl://" + server + ":8883") // TODO get the appropriate port of the comms mq server
@@ -242,7 +246,9 @@ func setupMQTT(cfg *config.ClientConfig, publish bool) mqtt.Client {
 		} else {
 			err = token.Error()
 		}
-		checkBroker(cfg.Server.Server)
+		if err := checkBroker(cfg.Server.Server); err != nil {
+			return nil, err
+		}
 		logger.Log(0, "could not connect to broker", cfg.Server.Server, err.Error())
 		if strings.Contains(err.Error(), "connectex") || strings.Contains(err.Error(), "connect timeout") {
 			logger.Log(0, "connection issue detected.. attempt connection with new certs")
@@ -257,7 +263,7 @@ func setupMQTT(cfg *config.ClientConfig, publish bool) mqtt.Client {
 			daemon.Restart()
 		}
 	}
-	return client
+	return client, nil
 }
 
 // publishes a message to server to update peers on this peer's behalf

+ 9 - 6
netclient/functions/mqpublish.go

@@ -111,7 +111,7 @@ func Hello(nodeCfg *config.ClientConfig) {
 			logger.Log(0, "could not run pull on "+nodeCfg.Node.Network+", error: "+err.Error())
 		}
 	}
-	logger.Log(3, "server checkin complete")
+	logger.Log(3, "checkin for", nodeCfg.Network, "complete")
 }
 
 // node cfg is required  in order to fetch the traffic keys of that node for encryption
@@ -126,7 +126,10 @@ func publish(nodeCfg *config.ClientConfig, dest string, msg []byte, qos byte) er
 		return err
 	}
 
-	client := setupMQTT(nodeCfg, true)
+	client, err := setupMQTT(nodeCfg, true)
+	if err != nil {
+		return fmt.Errorf("mq setup error %w", err)
+	}
 	defer client.Disconnect(250)
 	encrypted, err := ncutils.Chunk(msg, serverPubKey, trafficPrivKey)
 	if err != nil {
@@ -142,7 +145,6 @@ func publish(nodeCfg *config.ClientConfig, dest string, msg []byte, qos byte) er
 			err = token.Error()
 		}
 		if err != nil {
-			checkBroker(nodeCfg.Server.Server)
 			return err
 		}
 	}
@@ -165,10 +167,10 @@ func checkCertExpiry(cfg *config.ClientConfig) error {
 	return nil
 }
 
-func checkBroker(broker string) {
+func checkBroker(broker string) error {
 	_, err := net.LookupIP(broker)
 	if err != nil {
-		logger.FatalLog("nslookup failed for broker ... check dns records")
+		return errors.New("nslookup failed for broker ... check dns records")
 	}
 	pinger := ping.NewTCPing()
 	pinger.SetTarget(&ping.Target{
@@ -182,6 +184,7 @@ func checkBroker(broker string) {
 	pingerDone := pinger.Start()
 	<-pingerDone
 	if pinger.Result().SuccessCounter == 0 {
-		logger.FatalLog("unable to connect to broker port ... check firewalls")
+		return errors.New("unable to connect to broker port ... check netmaker server and firewalls")
 	}
+	return nil
 }