|
@@ -190,7 +190,7 @@ func unsubscribeNode(client mqtt.Client, nodeCfg *config.ClientConfig) {
|
|
|
func messageQueue(ctx context.Context, wg *sync.WaitGroup, cfg *config.ClientConfig) {
|
|
|
defer wg.Done()
|
|
|
logger.Log(0, "network:", cfg.Node.Network, "netclient message queue started for server:", cfg.Server.Server)
|
|
|
- err := setupMQTT(cfg, false)
|
|
|
+ err := setupMQTT(cfg)
|
|
|
if err != nil {
|
|
|
logger.Log(0, "unable to connect to broker", cfg.Server.Server, err.Error())
|
|
|
return
|
|
@@ -230,7 +230,7 @@ func NewTLSConfig(server string) (*tls.Config, error) {
|
|
|
|
|
|
// setupMQTT creates a connection to broker and returns client
|
|
|
// this function is primarily used to create a connection to publish to the broker
|
|
|
-func setupMQTT(cfg *config.ClientConfig, publish bool) error {
|
|
|
+func setupMQTT(cfg *config.ClientConfig) error {
|
|
|
opts := mqtt.NewClientOptions()
|
|
|
server := cfg.Server.Server
|
|
|
port := cfg.Server.MQPort
|
|
@@ -250,17 +250,15 @@ func setupMQTT(cfg *config.ClientConfig, publish bool) error {
|
|
|
opts.SetWriteTimeout(time.Minute)
|
|
|
|
|
|
opts.SetOnConnectHandler(func(client mqtt.Client) {
|
|
|
- if !publish {
|
|
|
- networks, err := ncutils.GetSystemNetworks()
|
|
|
- if err != nil {
|
|
|
- logger.Log(0, "error retriving networks", err.Error())
|
|
|
- }
|
|
|
- for _, network := range networks {
|
|
|
- var currNodeCfg config.ClientConfig
|
|
|
- currNodeCfg.Network = network
|
|
|
- currNodeCfg.ReadConfig()
|
|
|
- setSubscriptions(client, &currNodeCfg)
|
|
|
- }
|
|
|
+ networks, err := ncutils.GetSystemNetworks()
|
|
|
+ if err != nil {
|
|
|
+ logger.Log(0, "error retriving networks", err.Error())
|
|
|
+ }
|
|
|
+ for _, network := range networks {
|
|
|
+ var currNodeCfg config.ClientConfig
|
|
|
+ currNodeCfg.Network = network
|
|
|
+ currNodeCfg.ReadConfig()
|
|
|
+ setSubscriptions(client, &currNodeCfg)
|
|
|
}
|
|
|
})
|
|
|
opts.SetOrderMatters(true)
|