Преглед на файлове

server single mq connection

Matthew R. Kasun преди 3 години
родител
ревизия
260153130b
променени са 3 файла, в които са добавени 20 реда и са изтрити 29 реда
  1. 0 2
      main.go
  2. 18 23
      mq/mq.go
  3. 2 4
      mq/util.go

+ 0 - 2
main.go

@@ -171,7 +171,6 @@ func runMessageQueue(wg *sync.WaitGroup) {
 	defer wg.Done()
 	brokerHost, secure := servercfg.GetMessageQueueEndpoint()
 	logger.Log(0, "connecting to mq broker at", brokerHost, "with TLS?", fmt.Sprintf("%v", secure))
-	var client = mq.SetupMQTT(false) // Set up the subscription listener
 	ctx, cancel := context.WithCancel(context.Background())
 	go mq.Keepalive(ctx)
 	go logic.ManageZombies(ctx)
@@ -180,7 +179,6 @@ func runMessageQueue(wg *sync.WaitGroup) {
 	<-quit
 	cancel()
 	logger.Log(0, "Message Queue shutting down")
-	client.Disconnect(250)
 }
 
 func setVerbosity() {

+ 18 - 23
mq/mq.go

@@ -20,9 +20,10 @@ const MQ_DISCONNECT = 250
 const MQ_TIMEOUT = 30
 
 var peer_force_send = 0
+var mqclient mqtt.Client
 
 // SetupMQTT creates a connection to broker and return client
-func SetupMQTT(publish bool) mqtt.Client {
+func SetupMQTT() {
 	opts := mqtt.NewClientOptions()
 	broker, secure := servercfg.GetMessageQueueEndpoint()
 	opts.AddBroker(broker)
@@ -37,28 +38,25 @@ func SetupMQTT(publish bool) mqtt.Client {
 	opts.SetKeepAlive(time.Minute)
 	opts.SetWriteTimeout(time.Minute)
 	opts.SetOnConnectHandler(func(client mqtt.Client) {
-		if !publish {
-			if token := client.Subscribe("ping/#", 2, mqtt.MessageHandler(Ping)); token.WaitTimeout(MQ_TIMEOUT*time.Second) && token.Error() != nil {
-				client.Disconnect(240)
-				logger.Log(0, "ping subscription failed")
-			}
-			if token := client.Subscribe("update/#", 0, mqtt.MessageHandler(UpdateNode)); token.WaitTimeout(MQ_TIMEOUT*time.Second) && token.Error() != nil {
-				client.Disconnect(240)
-				logger.Log(0, "node update subscription failed")
-			}
-			if token := client.Subscribe("signal/#", 0, mqtt.MessageHandler(ClientPeerUpdate)); token.WaitTimeout(MQ_TIMEOUT*time.Second) && token.Error() != nil {
-				client.Disconnect(240)
-				logger.Log(0, "node client subscription failed")
-			}
-
-			opts.SetOrderMatters(true)
-			opts.SetResumeSubs(true)
+		if token := client.Subscribe("ping/#", 2, mqtt.MessageHandler(Ping)); token.WaitTimeout(MQ_TIMEOUT*time.Second) && token.Error() != nil {
+			client.Disconnect(240)
+			logger.Log(0, "ping subscription failed")
 		}
+		if token := client.Subscribe("update/#", 0, mqtt.MessageHandler(UpdateNode)); token.WaitTimeout(MQ_TIMEOUT*time.Second) && token.Error() != nil {
+			client.Disconnect(240)
+			logger.Log(0, "node update subscription failed")
+		}
+		if token := client.Subscribe("signal/#", 0, mqtt.MessageHandler(ClientPeerUpdate)); token.WaitTimeout(MQ_TIMEOUT*time.Second) && token.Error() != nil {
+			client.Disconnect(240)
+			logger.Log(0, "node client subscription failed")
+		}
+
+		opts.SetOrderMatters(true)
+		opts.SetResumeSubs(true)
 	})
-	client := mqtt.NewClient(opts)
 	tperiod := time.Now().Add(10 * time.Second)
 	for {
-		if token := client.Connect(); !token.WaitTimeout(MQ_TIMEOUT*time.Second) || token.Error() != nil {
+		if token := mqclient.Connect(); !token.WaitTimeout(MQ_TIMEOUT*time.Second) || token.Error() != nil {
 			logger.Log(2, "unable to connect to broker, retrying ...")
 			if time.Now().After(tperiod) {
 				if token.Error() == nil {
@@ -72,10 +70,7 @@ func SetupMQTT(publish bool) mqtt.Client {
 		}
 		time.Sleep(2 * time.Second)
 	}
-	if !publish {
-		logger.Log(0, "successfully connected to mq broker")
-	}
-	return client
+	logger.Log(0, "successfully connected to mq broker")
 }
 
 // Keepalive -- periodically pings all nodes to let them know server is still alive and doing well

+ 2 - 4
mq/util.go

@@ -61,13 +61,11 @@ func encryptMsg(node *models.Node, msg []byte) ([]byte, error) {
 }
 
 func publish(node *models.Node, dest string, msg []byte) error {
-	client := SetupMQTT(true)
-	defer client.Disconnect(250)
 	encrypted, encryptErr := encryptMsg(node, msg)
 	if encryptErr != nil {
 		return encryptErr
 	}
-	if token := client.Publish(dest, 0, true, encrypted); !token.WaitTimeout(MQ_TIMEOUT*time.Second) || token.Error() != nil {
+	if token := mqclient.Publish(dest, 0, true, encrypted); !token.WaitTimeout(MQ_TIMEOUT*time.Second) || token.Error() != nil {
 		var err error
 		if token.Error() == nil {
 			err = errors.New("connection timeout")
@@ -79,7 +77,7 @@ func publish(node *models.Node, dest string, msg []byte) error {
 	return nil
 }
 
-//  decodes a message queue topic and returns the embedded node.ID
+// decodes a message queue topic and returns the embedded node.ID
 func getID(topic string) (string, error) {
 	parts := strings.Split(topic, "/")
 	count := len(parts)