|
@@ -34,6 +34,8 @@ var messageCache = new(sync.Map)
|
|
|
|
|
|
var serverSet map[string]bool
|
|
|
|
|
|
+var mqclient mqtt.Client
|
|
|
+
|
|
|
const lastNodeUpdate = "lnu"
|
|
|
const lastPeerUpdate = "lpu"
|
|
|
|
|
@@ -188,12 +190,12 @@ func unsubscribeNode(client mqtt.Client, nodeCfg *config.ClientConfig) {
|
|
|
func messageQueue(ctx context.Context, wg *sync.WaitGroup, cfg *config.ClientConfig) {
|
|
|
defer wg.Done()
|
|
|
logger.Log(0, "network:", cfg.Node.Network, "netclient message queue started for server:", cfg.Server.Server)
|
|
|
- client, err := setupMQTT(cfg, false)
|
|
|
+ err := setupMQTT(cfg, false)
|
|
|
if err != nil {
|
|
|
logger.Log(0, "unable to connect to broker", cfg.Server.Server, err.Error())
|
|
|
return
|
|
|
}
|
|
|
- defer client.Disconnect(250)
|
|
|
+ defer mqclient.Disconnect(250)
|
|
|
<-ctx.Done()
|
|
|
logger.Log(0, "shutting down message queue for server", cfg.Server.Server)
|
|
|
}
|
|
@@ -228,7 +230,7 @@ func NewTLSConfig(server string) (*tls.Config, error) {
|
|
|
|
|
|
// setupMQTT creates a connection to broker and returns client
|
|
|
// this function is primarily used to create a connection to publish to the broker
|
|
|
-func setupMQTT(cfg *config.ClientConfig, publish bool) (mqtt.Client, error) {
|
|
|
+func setupMQTT(cfg *config.ClientConfig, publish bool) error {
|
|
|
opts := mqtt.NewClientOptions()
|
|
|
server := cfg.Server.Server
|
|
|
port := cfg.Server.MQPort
|
|
@@ -236,7 +238,7 @@ func setupMQTT(cfg *config.ClientConfig, publish bool) (mqtt.Client, error) {
|
|
|
tlsConfig, err := NewTLSConfig(server)
|
|
|
if err != nil {
|
|
|
logger.Log(0, "failed to get TLS config for", server, err.Error())
|
|
|
- return nil, err
|
|
|
+ return err
|
|
|
}
|
|
|
opts.SetTLSConfig(tlsConfig)
|
|
|
opts.SetClientID(ncutils.MakeRandomString(23))
|
|
@@ -286,11 +288,11 @@ func setupMQTT(cfg *config.ClientConfig, publish bool) (mqtt.Client, error) {
|
|
|
reRegisterWithServer(cfg)
|
|
|
//try after re-registering
|
|
|
if token := client.Connect(); !token.WaitTimeout(30*time.Second) || token.Error() != nil {
|
|
|
- return client, errors.New("unable to connect to broker")
|
|
|
+ return errors.New("unable to connect to broker")
|
|
|
}
|
|
|
}
|
|
|
-
|
|
|
- return client, nil
|
|
|
+ mqclient = client
|
|
|
+ return nil
|
|
|
}
|
|
|
|
|
|
func reRegisterWithServer(cfg *config.ClientConfig) {
|