Browse Source

changed mq client options

0xdcarns 3 years ago
parent
commit
06d3e847c3
1 changed files with 72 additions and 117 deletions
  1. 72 117
      netclient/functions/daemon.go

+ 72 - 117
netclient/functions/daemon.go

@@ -89,13 +89,53 @@ func Daemon() error {
 }
 
 // SetupMQTT creates a connection to broker and return client
-func SetupMQTT(cfg *config.ClientConfig) mqtt.Client {
+func SetupMQTT(cfg *config.ClientConfig, publish bool) mqtt.Client {
 	opts := mqtt.NewClientOptions()
 	server := getServerAddress(cfg)
 	opts.AddBroker(server + ":1883")
 	id := ncutils.MakeRandomString(23)
 	opts.ClientID = id
 	opts.SetDefaultPublishHandler(All)
+	opts.SetAutoReconnect(true)
+	opts.SetConnectRetry(true)
+	opts.SetConnectRetryInterval(time.Second << 2)
+	opts.SetKeepAlive(time.Minute >> 1)
+	opts.SetWriteTimeout(time.Minute)
+	opts.SetOnConnectHandler(func(client mqtt.Client) {
+		if !publish {
+			if cfg.DebugOn {
+				if token := client.Subscribe("#", 0, nil); token.Wait() && token.Error() != nil {
+					ncutils.Log(token.Error().Error())
+					return
+				}
+				ncutils.Log("subscribed to all topics for debugging purposes")
+			}
+			if token := client.Subscribe(fmt.Sprintf("update/%s/%s", cfg.Node.Network, cfg.Node.ID), 0, mqtt.MessageHandler(NodeUpdate)); token.Wait() && token.Error() != nil {
+				ncutils.Log(token.Error().Error())
+				return
+			}
+			if cfg.DebugOn {
+				ncutils.Log(fmt.Sprintf("subscribed to node updates for node %s update/%s/%s", cfg.Node.Name, cfg.Node.Network, cfg.Node.ID))
+			}
+			if token := client.Subscribe(fmt.Sprintf("peers/%s/%s", cfg.Node.Network, cfg.Node.ID), 0, mqtt.MessageHandler(UpdatePeers)); token.Wait() && token.Error() != nil {
+				ncutils.Log(token.Error().Error())
+				return
+			}
+			if cfg.DebugOn {
+				ncutils.Log(fmt.Sprintf("subscribed to peer updates for node %s peers/%s/%s", cfg.Node.Name, cfg.Node.Network, cfg.Node.ID))
+			}
+			opts.SetOrderMatters(true)
+			opts.SetResumeSubs(true)
+		}
+	})
+	opts.SetConnectionLostHandler(func(c mqtt.Client, e error) {
+		ncutils.Log("detected broker connection lost, running pull for " + cfg.Node.Network)
+		_, err := Pull(cfg.Node.Network, true)
+		if err != nil {
+			ncutils.Log("could not run pull, please restart daemon or examine network connectivity --- " + err.Error())
+		}
+	})
+
 	client := mqtt.NewClient(opts)
 	tperiod := time.Now().Add(12 * time.Second)
 	for {
@@ -161,56 +201,14 @@ func MessageQueue(ctx context.Context, network string) {
 	time.Sleep(time.Second << 1)
 	cfg.ReadConfig()
 	ncutils.Log("daemon started for network: " + network)
-	client := SetupMQTT(&cfg)
-	if cfg.DebugOn {
-		if token := client.Subscribe("#", 0, nil); token.Wait() && token.Error() != nil {
-			ncutils.Log(token.Error().Error())
-			return
-		}
-		ncutils.Log("subscribed to all topics for debugging purposes")
-	}
-	if token := client.Subscribe(fmt.Sprintf("update/%s/%s", cfg.Node.Network, cfg.Node.ID), 0, mqtt.MessageHandler(NodeUpdate)); token.Wait() && token.Error() != nil {
-		ncutils.Log(token.Error().Error())
-		return
-	}
-	if cfg.DebugOn {
-		ncutils.Log(fmt.Sprintf("subscribed to node updates for node %s update/%s/%s", cfg.Node.Name, cfg.Node.Network, cfg.Node.ID))
-	}
-	if token := client.Subscribe(fmt.Sprintf("peers/%s/%s", cfg.Node.Network, cfg.Node.ID), 0, mqtt.MessageHandler(UpdatePeers)); token.Wait() && token.Error() != nil {
-		ncutils.Log(token.Error().Error())
-		return
-	}
-	if cfg.DebugOn {
-		ncutils.Log(fmt.Sprintf("subscribed to peer updates for node %s peers/%s/%s", cfg.Node.Name, cfg.Node.Network, cfg.Node.ID))
-	}
-	var found bool
-	for _, server := range cfg.NetworkSettings.DefaultServerAddrs {
-		if !server.IsLeader {
-			continue
-		}
-		if server.Address != "" {
-			if token := client.Subscribe(fmt.Sprintf("serverkeepalive/%s/%s", cfg.Node.Network, cfg.Node.ID), 0, mqtt.MessageHandler(ServerKeepAlive)); token.Wait() && token.Error() != nil {
-				ncutils.Log(token.Error().Error())
-				return
-			}
-			found = true
-			if cfg.DebugOn {
-				ncutils.Log("subscribed to server keepalives for server " + cfg.Node.Network)
-			}
-		}
-	}
-	if !found {
-		ncutils.Log("leader not defined for network " + cfg.Node.Network)
-	}
+	client := SetupMQTT(&cfg, false)
+
 	defer client.Disconnect(250)
 	wg := &sync.WaitGroup{}
 	wg.Add(2)
-	keepalivectx, keepalivecancel := context.WithCancel(context.Background())
-	go MonitorKeepalive(keepalivectx, wg, client, &cfg)
 	checkinctx, checkincancel := context.WithCancel(context.Background())
 	go Checkin(checkinctx, wg, &cfg, network)
 	<-ctx.Done()
-	keepalivecancel()
 	checkincancel()
 	ncutils.Log("shutting down message queue for network " + network)
 	wg.Wait()
@@ -313,10 +311,10 @@ func NodeUpdate(client mqtt.Client, msg mqtt.Message) {
 			return
 		}
 		time.Sleep(time.Second >> 1)
-		if err = Resubscribe(client, &cfg); err != nil {
-			ncutils.Log("error resubscribing after interface change " + err.Error())
-			return
-		}
+		// if err = Resubscribe(client, &cfg); err != nil {
+		// 	ncutils.Log("error resubscribing after interface change " + err.Error())
+		// 	return
+		// }
 		if newNode.DNSOn == "yes" {
 			for _, server := range newNode.NetworkSettings.DefaultServerAddrs {
 				if server.IsLeader {
@@ -376,29 +374,30 @@ func UpdatePeers(client mqtt.Client, msg mqtt.Message) {
 }
 
 // MonitorKeepalive - checks time last server keepalive received.  If more than 3+ minutes, notify and resubscribe
-func MonitorKeepalive(ctx context.Context, wg *sync.WaitGroup, client mqtt.Client, cfg *config.ClientConfig) {
-	defer wg.Done()
-	for {
-		select {
-		case <-ctx.Done():
-			ncutils.Log("cancel recieved, monitor keepalive exiting")
-			return
-		case <-time.After(time.Second * 150):
-			var keepalivetime time.Time
-			keepaliveval, ok := keepalive.Load(cfg.Node.Network)
-			if ok {
-				keepalivetime = keepaliveval.(time.Time)
-				if !keepalivetime.IsZero() && time.Since(keepalivetime) > time.Second*120 { // more than 2+ minutes
-					ncutils.Log("server keepalive not recieved recently, resubscribe to message queue")
-					err := Resubscribe(client, cfg)
-					if err != nil {
-						ncutils.Log("closing " + err.Error())
-					}
-				}
-			}
-		}
-	}
-}
+// func MonitorKeepalive(ctx context.Context, wg *sync.WaitGroup, client mqtt.Client, cfg *config.ClientConfig) {
+// 	defer wg.Done()
+// 	for {
+// 		select {
+// 		case <-ctx.Done():
+// 			ncutils.Log("cancel recieved, monitor keepalive exiting")
+// 			return
+// 		case <-time.After(time.Second * 150):
+// 			var keepalivetime time.Time
+// 			keepaliveval, ok := keepalive.Load(cfg.Node.Network)
+// 			if ok {
+// 				keepalivetime = keepaliveval.(time.Time)
+// 				if !keepalivetime.IsZero() && time.Since(keepalivetime) > time.Second*120 { // more than 2+ minutes
+// 					// ncutils.Log("server keepalive not recieved recently, resubscribe to message queue")
+// 					// err := Resubscribe(client, cfg)
+// 					// if err != nil {
+// 					// 	ncutils.Log("closing " + err.Error())
+// 					// }
+// 					ncutils.Log("maybe wanna call something")
+// 				}
+// 			}
+// 		}
+// 	}
+// }
 
 // ServerKeepAlive -- handler to react to keepalive messages published by server
 func ServerKeepAlive(client mqtt.Client, msg mqtt.Message) {
@@ -407,50 +406,6 @@ func ServerKeepAlive(client mqtt.Client, msg mqtt.Message) {
 	ncutils.PrintLog("received server keepalive at "+currentTime.String(), 2)
 }
 
-// Resubscribe --- handles resubscribing if needed
-func Resubscribe(client mqtt.Client, cfg *config.ClientConfig) error {
-	if err := config.ModConfig(&cfg.Node); err == nil {
-		ncutils.Log("resubbing on network " + cfg.Node.Network)
-		client.Disconnect(250)
-		client = SetupMQTT(cfg)
-		if token := client.Subscribe(fmt.Sprintf("update/%s/%s", cfg.Node.Network, cfg.Node.ID), 0, NodeUpdate); token.Wait() && token.Error() != nil {
-			ncutils.Log("error resubscribing to updates for " + cfg.Node.Network)
-			return token.Error()
-		}
-		if cfg.DebugOn {
-			ncutils.Log("subscribed to node updates for node " + cfg.Node.Name + " update/" + cfg.Node.ID)
-		}
-		if token := client.Subscribe(fmt.Sprintf("peers/%s/%s", cfg.Node.Network, cfg.Node.ID), 0, UpdatePeers); token.Wait() && token.Error() != nil {
-			ncutils.Log("error resubscribing to peers for " + cfg.Node.Network)
-			return token.Error()
-		}
-		var found bool
-		for _, server := range cfg.NetworkSettings.DefaultServerAddrs {
-			if !server.IsLeader {
-				continue
-			}
-			if server.Address != "" {
-				if token := client.Subscribe(fmt.Sprintf("serverkeepalive/%s/%s", cfg.Node.Network, cfg.Node.ID), 0, mqtt.MessageHandler(ServerKeepAlive)); token.Wait() && token.Error() != nil {
-					ncutils.Log("error resubscribing to serverkeepalive for " + cfg.Node.Network)
-					return token.Error()
-				}
-				found = true
-				if cfg.DebugOn {
-					ncutils.Log("subscribed to server keepalives for server " + cfg.Node.Network)
-				}
-			}
-		}
-		if !found {
-			ncutils.Log("leader not defined for network " + cfg.Node.Network)
-		}
-		ncutils.Log("finished re subbing")
-		return nil
-	} else {
-		ncutils.Log("could not mod config when re-subbing")
-		return err
-	}
-}
-
 // UpdateKeys -- updates private key and returns new publickey
 func UpdateKeys(cfg *config.ClientConfig, client mqtt.Client) error {
 	ncutils.Log("received message to update keys")
@@ -577,7 +532,7 @@ func publish(cfg *config.ClientConfig, dest string, msg []byte) error {
 		return err
 	}
 
-	client := SetupMQTT(cfg)
+	client := SetupMQTT(cfg, true)
 	defer client.Disconnect(250)
 	encrypted, err := ncutils.BoxEncrypt(msg, serverPubKey, trafficPrivKey)
 	if err != nil {