Selaa lähdekoodia

on mq re-connecting donot exit if failed

abhishek9686 1 vuosi sitten
vanhempi
commit
3ff9a8a5f4
2 muutettua tiedostoa jossa 11 lisäystä ja 4 poistoa
  1. 1 1
      main.go
  2. 10 3
      mq/mq.go

+ 1 - 1
main.go

@@ -155,7 +155,7 @@ func runMessageQueue(wg *sync.WaitGroup, ctx context.Context) {
 	defer wg.Done()
 	brokerHost, _ := servercfg.GetMessageQueueEndpoint()
 	logger.Log(0, "connecting to mq broker at", brokerHost)
-	mq.SetupMQTT()
+	mq.SetupMQTT(true)
 	if mq.IsConnected() {
 		logger.Log(0, "connected to MQ Broker")
 	} else {

+ 10 - 3
mq/mq.go

@@ -38,7 +38,7 @@ func setMqOptions(user, password string, opts *mqtt.ClientOptions) {
 }
 
 // SetupMQTT creates a connection to broker and return client
-func SetupMQTT() {
+func SetupMQTT(fatal bool) {
 	if servercfg.GetBrokerType() == servercfg.EmqxBrokerType {
 		if emqx.GetType() == servercfg.EmqxOnPremDeploy {
 			time.Sleep(10 * time.Second) // wait for the REST endpoint to be ready
@@ -95,7 +95,7 @@ func SetupMQTT() {
 		slog.Warn("detected broker connection lost", "err", e.Error())
 		c.Disconnect(250)
 		slog.Info("re-initiating MQ connection")
-		SetupMQTT()
+		SetupMQTT(false)
 
 	})
 	mqclient = mqtt.NewClient(opts)
@@ -105,8 +105,15 @@ func SetupMQTT() {
 			logger.Log(2, "unable to connect to broker, retrying ...")
 			if time.Now().After(tperiod) {
 				if token.Error() == nil {
-					logger.FatalLog("could not connect to broker, token timeout, exiting ...")
+					if fatal {
+						logger.FatalLog("could not connect to broker, token timeout, exiting ...")
+					}
+					logger.Log(0, "could not connect to broker, token timeout, exiting ...")
+
 				} else {
+					if fatal {
+						logger.FatalLog("could not connect to broker, exiting ...", token.Error().Error())
+					}
 					logger.FatalLog("could not connect to broker, exiting ...", token.Error().Error())
 				}
 			}