|
@@ -3,6 +3,7 @@ package logic
|
|
import (
|
|
import (
|
|
"encoding/json"
|
|
"encoding/json"
|
|
"math"
|
|
"math"
|
|
|
|
+ "sync"
|
|
"time"
|
|
"time"
|
|
|
|
|
|
mqtt "github.com/eclipse/paho.mqtt.golang"
|
|
mqtt "github.com/eclipse/paho.mqtt.golang"
|
|
@@ -15,9 +16,65 @@ import (
|
|
"golang.org/x/exp/slog"
|
|
"golang.org/x/exp/slog"
|
|
)
|
|
)
|
|
|
|
|
|
|
|
+var (
|
|
|
|
+ metricsCacheMutex = &sync.RWMutex{}
|
|
|
|
+ metricsCacheMap map[string]models.Metrics
|
|
|
|
+)
|
|
|
|
+
|
|
|
|
+func getMetricsFromCache(key string) (metrics models.Metrics, ok bool) {
|
|
|
|
+ metricsCacheMutex.RLock()
|
|
|
|
+ metrics, ok = metricsCacheMap[key]
|
|
|
|
+ metricsCacheMutex.RUnlock()
|
|
|
|
+ return
|
|
|
|
+}
|
|
|
|
+
|
|
|
|
+func storeMetricsInCache(key string, metrics models.Metrics) {
|
|
|
|
+ metricsCacheMutex.Lock()
|
|
|
|
+ metricsCacheMap[key] = metrics
|
|
|
|
+ metricsCacheMutex.Unlock()
|
|
|
|
+}
|
|
|
|
+
|
|
|
|
+func deleteNetworkFromCache(key string) {
|
|
|
|
+ metricsCacheMutex.Lock()
|
|
|
|
+ delete(metricsCacheMap, key)
|
|
|
|
+ metricsCacheMutex.Unlock()
|
|
|
|
+}
|
|
|
|
+
|
|
|
|
+func LoadNodeMetricsToCache() error {
|
|
|
|
+ point1 := time.Now()
|
|
|
|
+ if metricsCacheMap == nil {
|
|
|
|
+ metricsCacheMap = map[string]models.Metrics{}
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ collection, err := database.FetchRecords(database.METRICS_TABLE_NAME)
|
|
|
|
+ if err != nil {
|
|
|
|
+ return err
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ for key, value := range collection {
|
|
|
|
+ var metrics models.Metrics
|
|
|
|
+ if err := json.Unmarshal([]byte(value), &metrics); err != nil {
|
|
|
|
+ slog.Error("parse metric record error", "error", err.Error())
|
|
|
|
+ continue
|
|
|
|
+ }
|
|
|
|
+ if servercfg.CacheEnabled() {
|
|
|
|
+ storeMetricsInCache(key, metrics)
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ point3 := time.Now()
|
|
|
|
+ slog.Error("load node metrics done", "Debug", point3.Unix()-point1.Unix(), len(metricsCacheMap))
|
|
|
|
+ return nil
|
|
|
|
+}
|
|
|
|
+
|
|
// GetMetrics - gets the metrics
|
|
// GetMetrics - gets the metrics
|
|
func GetMetrics(nodeid string) (*models.Metrics, error) {
|
|
func GetMetrics(nodeid string) (*models.Metrics, error) {
|
|
var metrics models.Metrics
|
|
var metrics models.Metrics
|
|
|
|
+ if servercfg.CacheEnabled() {
|
|
|
|
+ if metrics, ok := getMetricsFromCache(nodeid); ok {
|
|
|
|
+ return &metrics, nil
|
|
|
|
+ }
|
|
|
|
+ }
|
|
record, err := database.FetchRecord(database.METRICS_TABLE_NAME, nodeid)
|
|
record, err := database.FetchRecord(database.METRICS_TABLE_NAME, nodeid)
|
|
if err != nil {
|
|
if err != nil {
|
|
if database.IsEmptyRecord(err) {
|
|
if database.IsEmptyRecord(err) {
|
|
@@ -29,6 +86,9 @@ func GetMetrics(nodeid string) (*models.Metrics, error) {
|
|
if err != nil {
|
|
if err != nil {
|
|
return &metrics, err
|
|
return &metrics, err
|
|
}
|
|
}
|
|
|
|
+ if servercfg.CacheEnabled() {
|
|
|
|
+ storeMetricsInCache(nodeid, metrics)
|
|
|
|
+ }
|
|
return &metrics, nil
|
|
return &metrics, nil
|
|
}
|
|
}
|
|
|
|
|
|
@@ -38,12 +98,26 @@ func UpdateMetrics(nodeid string, metrics *models.Metrics) error {
|
|
if err != nil {
|
|
if err != nil {
|
|
return err
|
|
return err
|
|
}
|
|
}
|
|
- return database.Insert(nodeid, string(data), database.METRICS_TABLE_NAME)
|
|
|
|
|
|
+ err = database.Insert(nodeid, string(data), database.METRICS_TABLE_NAME)
|
|
|
|
+ if err != nil {
|
|
|
|
+ return err
|
|
|
|
+ }
|
|
|
|
+ if servercfg.CacheEnabled() {
|
|
|
|
+ storeMetricsInCache(nodeid, *metrics)
|
|
|
|
+ }
|
|
|
|
+ return nil
|
|
}
|
|
}
|
|
|
|
|
|
// DeleteMetrics - deletes metrics of a given node
|
|
// DeleteMetrics - deletes metrics of a given node
|
|
func DeleteMetrics(nodeid string) error {
|
|
func DeleteMetrics(nodeid string) error {
|
|
- return database.DeleteRecord(database.METRICS_TABLE_NAME, nodeid)
|
|
|
|
|
|
+ err := database.DeleteRecord(database.METRICS_TABLE_NAME, nodeid)
|
|
|
|
+ if err != nil {
|
|
|
|
+ return err
|
|
|
|
+ }
|
|
|
|
+ if servercfg.CacheEnabled() {
|
|
|
|
+ deleteNetworkFromCache(nodeid)
|
|
|
|
+ }
|
|
|
|
+ return nil
|
|
}
|
|
}
|
|
|
|
|
|
// MQUpdateMetricsFallBack - called when mq fallback thread is triggered on client
|
|
// MQUpdateMetricsFallBack - called when mq fallback thread is triggered on client
|