|
@@ -126,7 +126,16 @@ func setupMQTT(publish bool, networkName string) mqtt.Client {
|
|
|
opts.SetWriteTimeout(time.Minute)
|
|
|
opts.SetOnConnectHandler(func(client mqtt.Client) {
|
|
|
if !publish {
|
|
|
- SetSubscriptions(client, cfg)
|
|
|
+ networks, err := ncutils.GetSystemNetworks()
|
|
|
+ if err != nil {
|
|
|
+ ncutils.Log("error retriving networks " + err.Error())
|
|
|
+ }
|
|
|
+ for _, network := range networks {
|
|
|
+ var currConf config.ClientConfig
|
|
|
+ currConf.Network = network
|
|
|
+ currConf.ReadConfig()
|
|
|
+ SetSubscriptions(client, &currConf)
|
|
|
+ }
|
|
|
}
|
|
|
})
|
|
|
opts.SetOrderMatters(true)
|
|
@@ -182,29 +191,20 @@ func SetSubscriptions(client mqtt.Client, cfg *config.ClientConfig) {
|
|
|
}
|
|
|
ncutils.Log("subscribed to all topics for debugging purposes")
|
|
|
}
|
|
|
- networks, err := ncutils.GetSystemNetworks()
|
|
|
- if err != nil {
|
|
|
- ncutils.Log("error retriving networks " + err.Error())
|
|
|
- }
|
|
|
- for _, network := range networks {
|
|
|
- var cfg config.ClientConfig
|
|
|
- cfg.Network = network
|
|
|
- cfg.ReadConfig()
|
|
|
|
|
|
- if token := client.Subscribe(fmt.Sprintf("update/%s/%s", cfg.Node.Network, cfg.Node.ID), 0, mqtt.MessageHandler(NodeUpdate)); token.Wait() && token.Error() != nil {
|
|
|
- ncutils.Log(token.Error().Error())
|
|
|
- return
|
|
|
- }
|
|
|
- if cfg.DebugOn {
|
|
|
- ncutils.Log(fmt.Sprintf("subscribed to node updates for node %s update/%s/%s", cfg.Node.Name, cfg.Node.Network, cfg.Node.ID))
|
|
|
- }
|
|
|
- if token := client.Subscribe(fmt.Sprintf("peers/%s/%s", cfg.Node.Network, cfg.Node.ID), 0, mqtt.MessageHandler(UpdatePeers)); token.Wait() && token.Error() != nil {
|
|
|
- ncutils.Log(token.Error().Error())
|
|
|
- return
|
|
|
- }
|
|
|
- if cfg.DebugOn {
|
|
|
- ncutils.Log(fmt.Sprintf("subscribed to peer updates for node %s peers/%s/%s", cfg.Node.Name, cfg.Node.Network, cfg.Node.ID))
|
|
|
- }
|
|
|
+ if token := client.Subscribe(fmt.Sprintf("update/%s/%s", cfg.Node.Network, cfg.Node.ID), 0, mqtt.MessageHandler(NodeUpdate)); token.Wait() && token.Error() != nil {
|
|
|
+ ncutils.Log(token.Error().Error())
|
|
|
+ return
|
|
|
+ }
|
|
|
+ if cfg.DebugOn {
|
|
|
+ ncutils.Log(fmt.Sprintf("subscribed to node updates for node %s update/%s/%s", cfg.Node.Name, cfg.Node.Network, cfg.Node.ID))
|
|
|
+ }
|
|
|
+ if token := client.Subscribe(fmt.Sprintf("peers/%s/%s", cfg.Node.Network, cfg.Node.ID), 0, mqtt.MessageHandler(UpdatePeers)); token.Wait() && token.Error() != nil {
|
|
|
+ ncutils.Log(token.Error().Error())
|
|
|
+ return
|
|
|
+ }
|
|
|
+ if cfg.DebugOn {
|
|
|
+ ncutils.Log(fmt.Sprintf("subscribed to peer updates for node %s peers/%s/%s", cfg.Node.Name, cfg.Node.Network, cfg.Node.ID))
|
|
|
}
|
|
|
}
|
|
|
|