|
@@ -2,6 +2,7 @@ package mq
|
|
|
|
|
|
import (
|
|
|
"context"
|
|
|
+ "fmt"
|
|
|
"log"
|
|
|
"time"
|
|
|
|
|
@@ -53,23 +54,24 @@ func SetupMQTT() {
|
|
|
opts := mqtt.NewClientOptions()
|
|
|
setMqOptions(servercfg.GetMqUserName(), servercfg.GetMqPassword(), opts)
|
|
|
opts.SetOnConnectHandler(func(client mqtt.Client) {
|
|
|
- if token := client.Subscribe("ping/#", 2, mqtt.MessageHandler(Ping)); token.WaitTimeout(MQ_TIMEOUT*time.Second) && token.Error() != nil {
|
|
|
+ serverName := servercfg.GetServer()
|
|
|
+ if token := client.Subscribe(fmt.Sprintf("ping/%s/#", serverName), 2, mqtt.MessageHandler(Ping)); token.WaitTimeout(MQ_TIMEOUT*time.Second) && token.Error() != nil {
|
|
|
client.Disconnect(240)
|
|
|
logger.Log(0, "ping subscription failed")
|
|
|
}
|
|
|
- if token := client.Subscribe("update/#", 0, mqtt.MessageHandler(UpdateNode)); token.WaitTimeout(MQ_TIMEOUT*time.Second) && token.Error() != nil {
|
|
|
+ if token := client.Subscribe(fmt.Sprintf("update/%s/#", serverName), 0, mqtt.MessageHandler(UpdateNode)); token.WaitTimeout(MQ_TIMEOUT*time.Second) && token.Error() != nil {
|
|
|
client.Disconnect(240)
|
|
|
logger.Log(0, "node update subscription failed")
|
|
|
}
|
|
|
- if token := client.Subscribe("host/serverupdate/#", 0, mqtt.MessageHandler(UpdateHost)); token.WaitTimeout(MQ_TIMEOUT*time.Second) && token.Error() != nil {
|
|
|
+ if token := client.Subscribe(fmt.Sprintf("host/serverupdate/%s/#", serverName), 0, mqtt.MessageHandler(UpdateHost)); token.WaitTimeout(MQ_TIMEOUT*time.Second) && token.Error() != nil {
|
|
|
client.Disconnect(240)
|
|
|
logger.Log(0, "host update subscription failed")
|
|
|
}
|
|
|
- if token := client.Subscribe("signal/#", 0, mqtt.MessageHandler(ClientPeerUpdate)); token.WaitTimeout(MQ_TIMEOUT*time.Second) && token.Error() != nil {
|
|
|
+ if token := client.Subscribe(fmt.Sprintf("signal/%s/#", serverName), 0, mqtt.MessageHandler(ClientPeerUpdate)); token.WaitTimeout(MQ_TIMEOUT*time.Second) && token.Error() != nil {
|
|
|
client.Disconnect(240)
|
|
|
logger.Log(0, "node client subscription failed")
|
|
|
}
|
|
|
- if token := client.Subscribe("metrics/#", 0, mqtt.MessageHandler(UpdateMetrics)); token.WaitTimeout(MQ_TIMEOUT*time.Second) && token.Error() != nil {
|
|
|
+ if token := client.Subscribe(fmt.Sprintf("metrics/%s/#", serverName), 0, mqtt.MessageHandler(UpdateMetrics)); token.WaitTimeout(MQ_TIMEOUT*time.Second) && token.Error() != nil {
|
|
|
client.Disconnect(240)
|
|
|
logger.Log(0, "node metrics subscription failed")
|
|
|
}
|