Просмотр исходного кода

added server memcached support

worker-9 4 лет назад
Родитель
Сommit
e3fe39ebe7
8 измененных файлов с 216 добавлено и 15 удалено
  1. 2 1
      config/config.go
  2. 5 2
      database/database.go
  3. 184 0
      database/memcached.go
  4. 10 10
      database/rqlite.go
  5. 1 0
      go.mod
  6. 2 0
      go.sum
  7. 11 1
      servercfg/serverconf.go
  8. 1 1
      test/nodecreate.sh

+ 2 - 1
config/config.go

@@ -53,10 +53,11 @@ type ServerConfig struct {
 	GRPCSSL              string `yaml:"grpcssl"`
 	Version              string `yaml:"version"`
 	SQLConn              string `yaml:"sqlconn"`
-	Platform              string `yaml:"platform"`
+	Platform             string `yaml:"platform"`
 	Database             string `yaml:database`
 	DefaultNodeLimit     int32  `yaml:"defaultnodelimit"`
 	Verbosity            int32  `yaml:"verbosity"`
+	MemcachedAddresses   string `yaml:"memcachedaddresses"`
 }
 
 //reading in the env file

+ 5 - 2
database/database.go

@@ -2,9 +2,10 @@ package database
 
 import (
 	"encoding/json"
-	"time"
 	"errors"
 	"log"
+	"time"
+
 	"github.com/gravitl/netmaker/servercfg"
 )
 
@@ -38,13 +39,15 @@ func getCurrentDB() map[string]interface{} {
 		return RQLITE_FUNCTIONS
 	case "sqlite":
 		return SQLITE_FUNCTIONS
+	case "memcached":
+		return MEMCACHED_FUNCTIONS
 	default:
 		return SQLITE_FUNCTIONS
 	}
 }
 
 func InitializeDatabase() error {
-	log.Println("connecting to",servercfg.GetDB())
+	log.Println("connecting to", servercfg.GetDB())
 	tperiod := time.Now().Add(10 * time.Second)
 	for {
 		if err := getCurrentDB()[INIT_DB].(func() error)(); err != nil {

+ 184 - 0
database/memcached.go

@@ -0,0 +1,184 @@
+package database
+
+import (
+	"encoding/json"
+	"errors"
+	"log"
+	"net"
+	"strings"
+	"time"
+
+	"github.com/bradfitz/gomemcache/memcache"
+	"github.com/gravitl/netmaker/servercfg"
+)
+
+var MemCachedDatabase *memcache.Client
+
+var MEMCACHED_FUNCTIONS = map[string]interface{}{
+	INIT_DB:      initMemcachedDatabase,
+	CREATE_TABLE: memcachedCreateTable,
+	INSERT:       memcachedInsert,
+	INSERT_PEER:  memcachedInsertPeer,
+	DELETE:       memcachedDeleteRecord,
+	DELETE_ALL:   memcachedDeleteAllRecords,
+	FETCH_ALL:    memcachedFetchRecords,
+	CLOSE_DB:     memcachedCloseDB,
+}
+
+// utility function to make setting memcached servers easier
+func parseMemcachedAddresses(addresses string) string {
+	addressesArr := strings.Split(addresses, ",")
+	numAddresses := len(addressesArr)
+	if numAddresses == 0 {
+		return "127.0.0.1:11211"
+	}
+	newAddresses := ""
+	log.SetFlags(log.Flags() &^ (log.Llongfile | log.Lshortfile))
+	for _, address := range addressesArr {
+		if isValidIp(address) {
+			newAddresses += address
+			if servercfg.GetVerbose() >= 2 {
+				log.Println("adding " + address + " to memcached servers")
+			}
+			if address != addressesArr[numAddresses-1] {
+				newAddresses += ","
+			}
+		}
+	}
+	return newAddresses
+}
+
+func initMemcachedDatabase() error {
+	addresses := parseMemcachedAddresses(servercfg.GetMemcachedAddress())
+	MemCachedDatabase = memcache.New(addresses)
+	if MemCachedDatabase == nil {
+		return errors.New("could not initialize memcached")
+	}
+	MemCachedDatabase.Timeout = time.Minute
+	return nil
+}
+
+func memcachedCreateTable(tableName string) error {
+
+	if currentTable, err := memcachedFetchRecords(tableName); (currentTable != nil && len(currentTable) >= 0) || err != nil {
+		// return if it already exists
+		return err
+	} else {
+		log.Println(currentTable)
+	}
+	table := make(map[string]string)
+	newTable, err := json.Marshal(table)
+	if err != nil {
+		return err
+	}
+	err = MemCachedDatabase.Set(&memcache.Item{Key: tableName, Value: newTable, Expiration: 0})
+	if err != nil {
+		return err
+	}
+	return nil
+}
+
+func memcachedInsert(key string, value string, tableName string) error {
+	if key != "" && value != "" && IsJSONString(value) {
+		preData, err := MemCachedDatabase.Get(tableName)
+		if err != nil {
+			return err
+		}
+		var preDataMap map[string]string
+		if err := json.Unmarshal(preData.Value, &preDataMap); err != nil {
+			return err
+		}
+		preDataMap[key] = value
+		postData, err := json.Marshal(&preDataMap)
+		if err != nil {
+			return err
+		}
+		err = MemCachedDatabase.Replace(&memcache.Item{Key: tableName, Value: postData, Expiration: 0})
+		if err != nil {
+			return err
+		}
+		return nil
+	} else {
+		return errors.New("invalid insert " + key + " : " + value)
+	}
+}
+
+func memcachedInsertPeer(key string, value string) error {
+	if key != "" && value != "" && IsJSONString(value) {
+		if err := memcachedInsert(key, value, PEERS_TABLE_NAME); err != nil {
+			return err
+		}
+		return nil
+	} else {
+		return errors.New("invalid peer insert " + key + " : " + value)
+	}
+}
+
+func memcachedDeleteRecord(tableName string, key string) error {
+	if key != "" {
+		preData, err := MemCachedDatabase.Get(tableName)
+		if err != nil {
+			return err
+		}
+		var preDataMap map[string]string
+		if err := json.Unmarshal(preData.Value, &preDataMap); err != nil {
+			return err
+		}
+		delete(preDataMap, key)
+		postData, err := json.Marshal(&preDataMap)
+		if err != nil {
+			return err
+		}
+		err = MemCachedDatabase.Set(&memcache.Item{Key: tableName, Value: postData, Expiration: 0})
+		if err != nil {
+			return err
+		}
+		return nil
+	} else {
+		return errors.New("invalid delete, key is required")
+	}
+}
+
+func memcachedDeleteAllRecords(tableName string) error {
+	err := MemCachedDatabase.Delete(tableName)
+	if err != nil {
+		return err
+	}
+	err = memcachedCreateTable(tableName)
+	if err != nil {
+		return err
+	}
+	return nil
+}
+
+// func memcachedFetchRecord(tableName string, key string) (string, error) {
+// 	results, err := memcachedFetchRecords(tableName)
+// 	if err != nil {
+// 		return "", err
+// 	}
+// 	if results[key] == "" {
+// 		return "", errors.New(NO_RECORD)
+// 	}
+// 	return results[key], nil
+// }
+
+func memcachedFetchRecords(tableName string) (map[string]string, error) {
+	var records map[string]string
+	item, err := MemCachedDatabase.Get(tableName)
+	if err != nil {
+		return records, err
+	}
+	if err = json.Unmarshal(item.Value, &records); err != nil {
+		return nil, err
+	}
+
+	return records, nil
+}
+
+func memcachedCloseDB() {
+	// no op for this library..
+}
+
+func isValidIp(ipAddr string) bool {
+	return net.ParseIP(ipAddr) == nil
+}

+ 10 - 10
database/rqlite.go

@@ -83,16 +83,16 @@ func rqliteDeleteAllRecords(tableName string) error {
 	return nil
 }
 
-func rqliteFetchRecord(tableName string, key string) (string, error) {
-	results, err := FetchRecords(tableName)
-	if err != nil {
-		return "", err
-	}
-	if results[key] == "" {
-		return "", errors.New(NO_RECORD)
-	}
-	return results[key], nil
-}
+// func rqliteFetchRecord(tableName string, key string) (string, error) {
+// 	results, err := FetchRecords(tableName)
+// 	if err != nil {
+// 		return "", err
+// 	}
+// 	if results[key] == "" {
+// 		return "", errors.New(NO_RECORD)
+// 	}
+// 	return results[key], nil
+// }
 
 func rqliteFetchRecords(tableName string) (map[string]string, error) {
 	row, err := RQliteDatabase.QueryOne("SELECT * FROM " + tableName + " ORDER BY key")

+ 1 - 0
go.mod

@@ -4,6 +4,7 @@ go 1.15
 
 require (
 	github.com/aws/aws-sdk-go v1.34.28
+	github.com/bradfitz/gomemcache v0.0.0-20190913173617-a41fca850d0b
 	github.com/davecgh/go-spew v1.1.1
 	github.com/go-playground/validator/v10 v10.5.0
 	github.com/go-sql-driver/mysql v1.6.0 // indirect

+ 2 - 0
go.sum

@@ -5,6 +5,8 @@ github.com/akavel/rsrc v0.10.2/go.mod h1:uLoCtb9J+EyAqh+26kdrTgmzRBFPGOolLWKpdxk
 github.com/armon/consul-api v0.0.0-20180202201655-eb2c6b5be1b6/go.mod h1:grANhF5doyWs3UAsr3K4I6qtAmlQcZDesFNEHPZAzj8=
 github.com/aws/aws-sdk-go v1.34.28 h1:sscPpn/Ns3i0F4HPEWAVcwdIRaZZCuL7llJ2/60yPIk=
 github.com/aws/aws-sdk-go v1.34.28/go.mod h1:H7NKnBqNVzoTJpGfLrQkkD+ytBA93eiDYi/+8rV9s48=
+github.com/bradfitz/gomemcache v0.0.0-20190913173617-a41fca850d0b h1:L/QXpzIa3pOvUGt1D1lA5KjYhPBAN/3iWdP7xeFS9F0=
+github.com/bradfitz/gomemcache v0.0.0-20190913173617-a41fca850d0b/go.mod h1:H0wQNHz2YrLsuXOZozoeDmnHXkNCRmMW0gwFWDfEZDA=
 github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
 github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw=
 github.com/cncf/udpa/go v0.0.0-20201120205902-5459f2c99403/go.mod h1:WmhPx2Nbnhtbo57+VJT5O0JRkEi1Wbu0z5j0R8u5Hbk=

+ 11 - 1
servercfg/serverconf.go

@@ -81,7 +81,8 @@ func GetVersion() string {
 }
 func GetDB() string {
 	database := "sqlite"
-	if os.Getenv("DATABASE") == "rqlite" {
+	if os.Getenv("DATABASE") == "rqlite" ||
+		os.Getenv("DATABASE") == "memcached" {
 		database = os.Getenv("DATABASE")
 	} else if config.Config.Server.Database == "rqlite" {
 		database = config.Config.Server.Database
@@ -263,6 +264,15 @@ func IsGRPCSSL() bool {
 	return isssl
 }
 
+func GetMemcachedAddress() string { // completely optional
+	if os.Getenv("MEMCACHED_ADDRESSES") != "" {
+		return os.Getenv("MEMCACHED_ADDRESSES")
+	} else if config.Config.Server.MemcachedAddresses != "" {
+		return config.Config.Server.MemcachedAddresses
+	}
+	return "127.0.0.1:11211"
+}
+
 func DisableRemoteIPCheck() bool {
 	disabled := false
 	if os.Getenv("DISABLE_REMOTE_IP_CHECK") != "" {

+ 1 - 1
test/nodecreate.sh

@@ -3,7 +3,7 @@
 PUBKEY="DM5qhLAE20EG9BbfBEger+Ac9D2NDOwCtY1rbYDLf34="
 IPADDR="70.173.21.212"
 MACADDRESS="59:23:9c:f2:e4:49"
-ACCESSKEY="DYrXDkNuXC3XQ27J"
+ACCESSKEY="k4eybLwjDJreKM7N"
 PASSWORD="ppppppp"
 
 generate_post_json ()