Browse Source

code changes for scale for review

Max Ma 1 year ago
parent
commit
3d95e83f6e

+ 2 - 2
controllers/enrollmentkeys.go

@@ -54,12 +54,12 @@ func getEnrollmentKeys(w http.ResponseWriter, r *http.Request) {
 	ret := []*models.EnrollmentKey{}
 	for _, key := range keys {
 		key := key
-		if err = logic.Tokenize(key, servercfg.GetAPIHost()); err != nil {
+		if err = logic.Tokenize(&key, servercfg.GetAPIHost()); err != nil {
 			logger.Log(0, r.Header.Get("user"), "failed to get token values for keys:", err.Error())
 			logic.ReturnErrorResponse(w, r, logic.FormatError(err, "internal"))
 			return
 		}
-		ret = append(ret, key)
+		ret = append(ret, &key)
 	}
 	// return JSON/API formatted keys
 	logger.Log(2, r.Header.Get("user"), "fetched enrollment keys")

+ 1 - 1
database/sqlite.go

@@ -39,7 +39,7 @@ func initSqliteDB() error {
 	}
 	// == "connect" the database ==
 	var dbOpenErr error
-	SqliteDB, dbOpenErr = sql.Open("sqlite3", dbFilePath+"?_journal_mode=WAL&_synchronous=NORMAL&_cache_size=500000")
+	SqliteDB, dbOpenErr = sql.Open("sqlite3", dbFilePath)
 	if dbOpenErr != nil {
 		return dbOpenErr
 	}

+ 4 - 1
logic/auth.go

@@ -129,7 +129,7 @@ func CreateUser(user *models.User) error {
 
 	tokenString, _ := CreateUserJWT(user.UserName, user.IsSuperAdmin, user.IsAdmin)
 	if tokenString == "" {
-		logger.Log(0, "failed to generate token", err.Error())
+		logger.Log(0, "failed to generate token")
 		return err
 	}
 
@@ -217,6 +217,9 @@ func UpsertUser(user models.User) error {
 		slog.Error("error inserting user", "user", user.UserName, "error", err.Error())
 		return err
 	}
+	if user.IsSuperAdmin {
+		superUser = user
+	}
 	return nil
 }
 

+ 18 - 18
logic/enrollmentkey.go

@@ -33,7 +33,7 @@ var EnrollmentErrors = struct {
 }
 var (
 	enrollmentkeyCacheMutex = &sync.RWMutex{}
-	enrollmentkeyCacheMap   = make(map[string]*models.EnrollmentKey)
+	enrollmentkeyCacheMap   = make(map[string]models.EnrollmentKey)
 )
 
 // CreateEnrollmentKey - creates a new enrollment key in db
@@ -110,21 +110,21 @@ func UpdateEnrollmentKey(keyId string, relayId uuid.UUID) (*models.EnrollmentKey
 
 	key.Relay = relayId
 
-	if err = upsertEnrollmentKey(key); err != nil {
+	if err = upsertEnrollmentKey(&key); err != nil {
 		return nil, err
 	}
 
-	return key, nil
+	return &key, nil
 }
 
 // GetAllEnrollmentKeys - fetches all enrollment keys from DB
 // TODO drop double pointer
-func GetAllEnrollmentKeys() ([]*models.EnrollmentKey, error) {
+func GetAllEnrollmentKeys() ([]models.EnrollmentKey, error) {
 	currentKeys, err := getEnrollmentKeysMap()
 	if err != nil {
 		return nil, err
 	}
-	var currentKeysList = []*models.EnrollmentKey{}
+	var currentKeysList = []models.EnrollmentKey{}
 	for k := range currentKeys {
 		currentKeysList = append(currentKeysList, currentKeys[k])
 	}
@@ -133,15 +133,15 @@ func GetAllEnrollmentKeys() ([]*models.EnrollmentKey, error) {
 
 // GetEnrollmentKey - fetches a single enrollment key
 // returns nil and error if not found
-func GetEnrollmentKey(value string) (*models.EnrollmentKey, error) {
+func GetEnrollmentKey(value string) (key models.EnrollmentKey, err error) {
 	currentKeys, err := getEnrollmentKeysMap()
 	if err != nil {
-		return nil, err
+		return key, err
 	}
 	if key, ok := currentKeys[value]; ok {
 		return key, nil
 	}
-	return nil, EnrollmentErrors.NoKeyFound
+	return key, EnrollmentErrors.NoKeyFound
 }
 
 func deleteEnrollmentkeyFromCache(key string) {
@@ -218,7 +218,7 @@ func DeTokenize(b64Token string) (*models.EnrollmentKey, error) {
 	if err != nil {
 		return nil, err
 	}
-	return k, nil
+	return &k, nil
 }
 
 // == private ==
@@ -233,11 +233,11 @@ func decrementEnrollmentKey(value string) (*models.EnrollmentKey, error) {
 		return nil, EnrollmentErrors.NoUsesRemaining
 	}
 	k.UsesRemaining = k.UsesRemaining - 1
-	if err = upsertEnrollmentKey(k); err != nil {
+	if err = upsertEnrollmentKey(&k); err != nil {
 		return nil, err
 	}
 
-	return k, nil
+	return &k, nil
 }
 
 func upsertEnrollmentKey(k *models.EnrollmentKey) error {
@@ -251,7 +251,7 @@ func upsertEnrollmentKey(k *models.EnrollmentKey) error {
 	err = database.Insert(k.Value, string(data), database.ENROLLMENT_KEYS_TABLE_NAME)
 	if err == nil {
 		if servercfg.CacheEnabled() {
-			storeEnrollmentkeyInCache(k.Value, k)
+			storeEnrollmentkeyInCache(k.Value, *k)
 		}
 	}
 	return nil
@@ -269,17 +269,17 @@ func getUniqueEnrollmentID() (string, error) {
 	return newID, nil
 }
 
-func getEnrollmentkeysFromCache() map[string]*models.EnrollmentKey {
+func getEnrollmentkeysFromCache() map[string]models.EnrollmentKey {
 	return enrollmentkeyCacheMap
 }
 
-func storeEnrollmentkeyInCache(key string, enrollmentkey *models.EnrollmentKey) {
+func storeEnrollmentkeyInCache(key string, enrollmentkey models.EnrollmentKey) {
 	enrollmentkeyCacheMutex.Lock()
 	enrollmentkeyCacheMap[key] = enrollmentkey
 	enrollmentkeyCacheMutex.Unlock()
 }
 
-func getEnrollmentKeysMap() (map[string]*models.EnrollmentKey, error) {
+func getEnrollmentKeysMap() (map[string]models.EnrollmentKey, error) {
 	if servercfg.CacheEnabled() {
 		keys := getEnrollmentkeysFromCache()
 		if len(keys) != 0 {
@@ -295,16 +295,16 @@ func getEnrollmentKeysMap() (map[string]*models.EnrollmentKey, error) {
 	if records == nil {
 		records = make(map[string]string)
 	}
-	currentKeys := make(map[string]*models.EnrollmentKey, 0)
+	currentKeys := make(map[string]models.EnrollmentKey, 0)
 	if len(records) > 0 {
 		for k := range records {
 			var currentKey models.EnrollmentKey
 			if err = json.Unmarshal([]byte(records[k]), &currentKey); err != nil {
 				continue
 			}
-			currentKeys[k] = &currentKey
+			currentKeys[k] = currentKey
 			if servercfg.CacheEnabled() {
-				storeEnrollmentkeyInCache(currentKey.Value, &currentKey)
+				storeEnrollmentkeyInCache(currentKey.Value, currentKey)
 			}
 		}
 	}

+ 2 - 2
logic/extpeers.go

@@ -245,7 +245,7 @@ func CreateExtClient(extclient *models.ExtClient) error {
 	}
 	if extclient.Address == "" {
 		if parentNetwork.IsIPv4 == "yes" {
-			newAddress, err := GetUniqueAddress(extclient.Network)
+			newAddress, err := UniqueAddress(extclient.Network, true)
 			if err != nil {
 				return err
 			}
@@ -255,7 +255,7 @@ func CreateExtClient(extclient *models.ExtClient) error {
 
 	if extclient.Address6 == "" {
 		if parentNetwork.IsIPv6 == "yes" {
-			addr6, err := GetUniqueAddress6(extclient.Network)
+			addr6, err := UniqueAddress6(extclient.Network, true)
 			if err != nil {
 				return err
 			}

+ 1 - 1
logic/ippool.go

@@ -15,7 +15,7 @@ import (
 )
 
 var (
-	ipPool      map[string]PoolMap
+	ipPool      = make(map[string]PoolMap)
 	ipPoolMutex = &sync.RWMutex{}
 )
 

+ 0 - 4
logic/metrics.go

@@ -12,10 +12,6 @@ var UpdateMetrics = func(string, *models.Metrics) error {
 	return nil
 }
 
-var WriteMetricsCacheToDB = func() error {
-	return nil
-}
-
 var GetMetrics = func(string) (*models.Metrics, error) {
 	var metrics models.Metrics
 	return &metrics, nil

+ 0 - 1
logic/networks.go

@@ -385,7 +385,6 @@ func UpdateNetwork(currentNetwork *models.Network, newNetwork *models.Network) (
 		if err == nil {
 			if servercfg.CacheEnabled() {
 				storeNetworkInCache(newNetwork.NetID, *newNetwork)
-				deleteNetworkFromCache(currentNetwork.NetID)
 			}
 		}
 		return hasrangeupdate4, hasrangeupdate6, hasholepunchupdate, err

+ 3 - 5
logic/nodes.go

@@ -112,11 +112,6 @@ func GetNetworkNodesMemory(allNodes []models.Node, network string) []models.Node
 // UpdateNodeCheckin - updates the checkin time of a node
 func UpdateNodeCheckin(node *models.Node) error {
 	node.SetLastCheckIn()
-	//If cache enabled, only Update the Check-in TS in cache
-	if servercfg.CacheEnabled() {
-		storeNodeInCache(*node)
-		return nil
-	}
 	data, err := json.Marshal(node)
 	if err != nil {
 		return err
@@ -126,6 +121,9 @@ func UpdateNodeCheckin(node *models.Node) error {
 	if err != nil {
 		return err
 	}
+	if servercfg.CacheEnabled() {
+		storeNodeInCache(*node)
+	}
 	return nil
 }
 

+ 3 - 0
logic/telemetry.go

@@ -128,6 +128,9 @@ func setTelemetryTimestamp(telRecord *models.Telemetry) error {
 		return err
 	}
 	err = database.Insert(database.SERVER_UUID_RECORD_KEY, string(jsonObj), database.SERVER_UUID_TABLE_NAME)
+	if err == nil {
+		telServerRecord = serverTelData
+	}
 	return err
 }
 

+ 0 - 1
main.go

@@ -39,7 +39,6 @@ func main() {
 	initialize()                       // initial db and acls
 	logic.SetIpPool()
 	defer logic.ClearIpPool()
-	defer logic.WriteMetricsCacheToDB()
 	setGarbageCollection()
 	setVerbosity()
 	if servercfg.DeployedByOperator() && !servercfg.IsPro {

+ 1 - 2
mq/mq.go

@@ -93,7 +93,6 @@ func SetupMQTT(fatal bool) {
 	opts.SetConnectionLostHandler(func(c mqtt.Client, e error) {
 		slog.Warn("detected broker connection lost", "err", e.Error())
 		//c.Disconnect(250)
-		mqclient = nil
 		slog.Info("re-initiating MQ connection")
 		//SetupMQTT(false)
 
@@ -101,7 +100,7 @@ func SetupMQTT(fatal bool) {
 	mqclient = mqtt.NewClient(opts)
 	tperiod := time.Now().Add(10 * time.Second)
 	for {
-		if token := mqclient.Connect(); token.Wait() && token.Error() != nil {
+		if token := mqclient.Connect(); !token.WaitTimeout(MQ_TIMEOUT*time.Second) || token.Error() != nil {
 			logger.Log(2, "unable to connect to broker, retrying ...")
 			if time.Now().After(tperiod) {
 				if token.Error() == nil {

+ 9 - 3
mq/publishers.go

@@ -343,9 +343,15 @@ func serverStatusUpdate() error {
 			return errors.New("cannot publish ... mqclient not connected")
 		}
 
-		if token := mqclient.Publish("server/status", 0, true, data); token.Wait() && token.Error() != nil {
-			slog.Error("could not publish server status", "error", token.Error().Error())
-			return token.Error()
+		if token := mqclient.Publish("server/status", 0, true, data); !token.WaitTimeout(MQ_TIMEOUT*time.Second) || token.Error() != nil {
+			var err error
+			if token.Error() == nil {
+				err = errors.New("connection timeout")
+			} else {
+				slog.Error("could not publish server status", "error", token.Error().Error())
+				err = token.Error()
+			}
+			return err
 		}
 		serverStatusCache = currentServerStatus
 	}

+ 9 - 5
mq/util.go

@@ -4,6 +4,7 @@ import (
 	"errors"
 	"fmt"
 	"strings"
+	"time"
 
 	"github.com/gravitl/netmaker/logic"
 	"github.com/gravitl/netmaker/models"
@@ -82,12 +83,15 @@ func publish(host *models.Host, dest string, msg []byte) error {
 		return errors.New("cannot publish ... mqclient not connected")
 	}
 
-	if token := mqclient.Publish(dest, 0, true, encrypted); token.Wait() && token.Error() != nil {
-		slog.Error("publish to mq error", "error", token.Error().Error())
-		if strings.Contains(token.Error().Error(), "use of closed network connection") || strings.Contains(token.Error().Error(), "publish was broken by timeout") {
-			mqclient = nil
+	if token := mqclient.Publish(dest, 0, true, encrypted); !token.WaitTimeout(MQ_TIMEOUT*time.Second) || token.Error() != nil {
+		var err error
+		if token.Error() == nil {
+			err = errors.New("connection timeout")
+		} else {
+			slog.Error("publish to mq error", "error", token.Error().Error())
+			err = token.Error()
 		}
-		return token.Error()
+		return err
 	}
 	return nil
 }

+ 0 - 1
pro/initialize.go

@@ -103,7 +103,6 @@ func InitPro() {
 	logic.UpdateProNodeACLs = proLogic.UpdateProNodeACLs
 	logic.GetMetrics = proLogic.GetMetrics
 	logic.UpdateMetrics = proLogic.UpdateMetrics
-	logic.WriteMetricsCacheToDB = proLogic.WriteMetricsCacheToDB
 	logic.DeleteMetrics = proLogic.DeleteMetrics
 	logic.GetRelays = proLogic.GetRelays
 	logic.GetAllowedIpsForRelayed = proLogic.GetAllowedIpsForRelayed

+ 3 - 20
pro/logic/metrics.go

@@ -66,21 +66,6 @@ func LoadNodeMetricsToCache() error {
 	return nil
 }
 
-func WriteMetricsCacheToDB() error {
-
-	for k, v := range metricsCacheMap {
-		data, err := json.Marshal(v)
-		if err != nil {
-			continue
-		}
-		err = database.Insert(k, string(data), database.METRICS_TABLE_NAME)
-		if err != nil {
-			continue
-		}
-	}
-	return nil
-}
-
 // GetMetrics - gets the metrics
 func GetMetrics(nodeid string) (*models.Metrics, error) {
 	var metrics models.Metrics
@@ -108,11 +93,6 @@ func GetMetrics(nodeid string) (*models.Metrics, error) {
 
 // UpdateMetrics - updates the metrics of a given client
 func UpdateMetrics(nodeid string, metrics *models.Metrics) error {
-	//if cache is enabled, only save the metric data to cache
-	if servercfg.CacheEnabled() {
-		storeMetricsInCache(nodeid, *metrics)
-		return nil
-	}
 	data, err := json.Marshal(metrics)
 	if err != nil {
 		return err
@@ -121,6 +101,9 @@ func UpdateMetrics(nodeid string, metrics *models.Metrics) error {
 	if err != nil {
 		return err
 	}
+	if servercfg.CacheEnabled() {
+		storeMetricsInCache(nodeid, *metrics)
+	}
 	return nil
 }