Jelajahi Sumber

update server status in mq topic

Max Ma 1 tahun lalu
induk
melakukan
6223446b15
2 mengubah file dengan 62 tambahan dan 5 penghapusan
  1. 2 1
      mq/mq.go
  2. 60 4
      mq/publishers.go

+ 2 - 1
mq/mq.go

@@ -14,7 +14,7 @@ import (
 )
 
 // KEEPALIVE_TIMEOUT - time in seconds for timeout
-const KEEPALIVE_TIMEOUT = 60 //timeout in seconds
+const KEEPALIVE_TIMEOUT = 30 //timeout in seconds
 // MQ_DISCONNECT - disconnects MQ
 const MQ_DISCONNECT = 250
 
@@ -131,6 +131,7 @@ func Keepalive(ctx context.Context) {
 		case <-ctx.Done():
 			return
 		case <-time.After(time.Second * KEEPALIVE_TIMEOUT):
+			ServerStatusUpdate()
 			sendPeers()
 		}
 	}

+ 60 - 4
mq/publishers.go

@@ -6,6 +6,7 @@ import (
 	"fmt"
 	"time"
 
+	"github.com/gravitl/netmaker/database"
 	"github.com/gravitl/netmaker/logger"
 	"github.com/gravitl/netmaker/logic"
 	"github.com/gravitl/netmaker/models"
@@ -13,6 +14,16 @@ import (
 	"golang.org/x/exp/slog"
 )
 
+type ServerStatus struct {
+	DB               bool      `json:"db_connected"`
+	Broker           bool      `json:"broker_connected"`
+	IsBrokerConnOpen bool      `json:"is_broker_conn_open"`
+	LicenseError     string    `json:"license_error"`
+	IsPro            bool      `json:"is_pro"`
+	TrialEndDate     time.Time `json:"trial_end_date"`
+	IsOnTrialLicense bool      `json:"is_on_trial_license"`
+}
+
 // PublishPeerUpdate --- determines and publishes a peer update to all the hosts
 func PublishPeerUpdate(replacePeers bool) error {
 	if !servercfg.IsMessageQueueBackend() {
@@ -134,6 +145,51 @@ func NodeUpdate(node *models.Node) error {
 	return nil
 }
 
+func ServerStatusUpdate() error {
+
+	licenseErr := ""
+	if servercfg.ErrLicenseValidation != nil {
+		licenseErr = servercfg.ErrLicenseValidation.Error()
+	}
+	var trialEndDate time.Time
+	var err error
+	isOnTrial := false
+	if servercfg.IsPro && (servercfg.GetLicenseKey() == "" || servercfg.GetNetmakerTenantID() == "") {
+		trialEndDate, err = logic.GetTrialEndDate()
+		if err != nil {
+			slog.Error("failed to get trial end date", "error", err)
+		} else {
+			isOnTrial = true
+		}
+	}
+	currentServerStatus := ServerStatus{
+		DB:               database.IsConnected(),
+		Broker:           IsConnected(),
+		IsBrokerConnOpen: IsConnectionOpen(),
+		LicenseError:     licenseErr,
+		IsPro:            servercfg.IsPro,
+		TrialEndDate:     trialEndDate,
+		IsOnTrialLicense: isOnTrial,
+	}
+
+	data, err := json.Marshal(currentServerStatus)
+	if err != nil {
+		slog.Error("error marshalling server status update ", err.Error())
+		return err
+	}
+
+	if mqclient == nil || !mqclient.IsConnected() {
+		return errors.New("cannot publish ... mqclient not connected")
+	}
+
+	if token := mqclient.Publish("server/status", 0, true, data); token.Wait() && token.Error() != nil {
+		slog.Error("could not publish server status", "error", token.Error().Error())
+		return token.Error()
+	}
+
+	return nil
+}
+
 // HostUpdate -- publishes a host update to clients
 func HostUpdate(hostUpdate *models.HostUpdate) error {
 	if !servercfg.IsMessageQueueBackend() {
@@ -212,10 +268,10 @@ func PushMetricsToExporter(metrics models.Metrics) error {
 // sendPeers - retrieve networks, send peer ports to all peers
 func sendPeers() {
 
-	hosts, err := logic.GetAllHosts()
-	if err != nil && len(hosts) > 0 {
-		logger.Log(1, "error retrieving networks for keepalive", err.Error())
-	}
+	// hosts, err := logic.GetAllHosts()
+	// if err != nil && len(hosts) > 0 {
+	// 	logger.Log(1, "error retrieving networks for keepalive", err.Error())
+	// }
 
 	peer_force_send++
 	if peer_force_send == 5 {