瀏覽代碼

only send mq messsage when there is server status change

Max Ma 1 年之前
父節點
當前提交
82bf2e424c
共有 2 個文件被更改,包括 54 次插入10 次删除
  1. 3 0
      mq/mq.go
  2. 51 10
      mq/publishers.go

+ 3 - 0
mq/mq.go

@@ -131,6 +131,9 @@ func Keepalive(ctx context.Context) {
 		case <-ctx.Done():
 			return
 		case <-time.After(time.Second * KEEPALIVE_TIMEOUT):
+			if mqclient == nil {
+				SetupMQTT(false)
+			}
 			ServerStatusUpdate()
 			sendPeers()
 		}

+ 51 - 10
mq/publishers.go

@@ -25,6 +25,8 @@ type ServerStatus struct {
 	Failover         map[string]bool `json:"is_failover_existed"`
 }
 
+var serverStatusCache = ServerStatus{}
+
 // PublishPeerUpdate --- determines and publishes a peer update to all the hosts
 func PublishPeerUpdate(replacePeers bool) error {
 	if !servercfg.IsMessageQueueBackend() {
@@ -184,22 +186,61 @@ func ServerStatusUpdate() error {
 		Failover:         failoverExisted,
 	}
 
-	data, err := json.Marshal(currentServerStatus)
-	if err != nil {
-		slog.Error("error marshalling server status update ", err.Error())
-		return err
+	if isServerStatusChanged(serverStatusCache, currentServerStatus) {
+
+		data, err := json.Marshal(currentServerStatus)
+		if err != nil {
+			slog.Error("error marshalling server status update ", err.Error())
+			return err
+		}
+
+		if mqclient == nil || !mqclient.IsConnected() {
+			return errors.New("cannot publish ... mqclient not connected")
+		}
+
+		if token := mqclient.Publish("server/status", 0, true, data); token.Wait() && token.Error() != nil {
+			slog.Error("could not publish server status", "error", token.Error().Error())
+			return token.Error()
+		}
+		serverStatusCache = currentServerStatus
 	}
 
-	if mqclient == nil || !mqclient.IsConnected() {
-		return errors.New("cannot publish ... mqclient not connected")
+	return nil
+}
+
+func isServerStatusChanged(serverStatusCache, currentServerStatus ServerStatus) bool {
+	if serverStatusCache.DB != currentServerStatus.DB {
+		return true
+	}
+	if serverStatusCache.Broker != currentServerStatus.Broker {
+		return true
+	}
+	if serverStatusCache.IsBrokerConnOpen != currentServerStatus.IsBrokerConnOpen {
+		return true
+	}
+	if serverStatusCache.LicenseError != currentServerStatus.LicenseError {
+		return true
+	}
+	if serverStatusCache.IsPro != currentServerStatus.IsPro {
+		return true
+	}
+	if !serverStatusCache.TrialEndDate.Equal(currentServerStatus.TrialEndDate) {
+		return true
+	}
+	if serverStatusCache.IsOnTrialLicense != currentServerStatus.IsOnTrialLicense {
+		return true
+	}
+	if len(serverStatusCache.Failover) != len(currentServerStatus.Failover) {
+		return true
 	}
 
-	if token := mqclient.Publish("server/status", 0, true, data); token.Wait() && token.Error() != nil {
-		slog.Error("could not publish server status", "error", token.Error().Error())
-		return token.Error()
+	for k, v := range serverStatusCache.Failover {
+		if v1, ok := currentServerStatus.Failover[k]; !ok || (ok && v != v1) {
+			return true
+		}
 	}
 
-	return nil
+	return false
 }
 
 // HostUpdate -- publishes a host update to clients