afeiszli преди 3 години
родител
ревизия
bafb71cd9b
променени са 3 файла, в които са добавени 207 реда и са изтрити 2 реда
  1. 3 2
      config/config.go
  2. 194 0
      database/etcd.go
  3. 10 0
      servercfg/serverconf.go

+ 3 - 2
config/config.go

@@ -57,8 +57,8 @@ type ServerConfig struct {
 	Version               string `yaml:"version"`
 	SQLConn               string `yaml:"sqlconn"`
 	Platform              string `yaml:"platform"`
-	Database              string `yaml:database`
-	CheckinInterval       string `yaml:checkininterval`
+	Database              string `yaml:"database"`
+	CheckinInterval       string `yaml:"checkininterval"`
 	DefaultNodeLimit      int32  `yaml:"defaultnodelimit"`
 	Verbosity             int32  `yaml:"verbosity"`
 	ServerCheckinInterval int64  `yaml:"servercheckininterval"`
@@ -66,6 +66,7 @@ type ServerConfig struct {
 	ClientID              string `yaml:"clientid"`
 	ClientSecret          string `yaml:"clientsecret"`
 	FrontendURL           string `yaml:"frontendurl"`
+	EtcdAddresses         string `yaml:"etcdaddresses"`
 }
 
 // Generic SQL Config

+ 194 - 0
database/etcd.go

@@ -0,0 +1,194 @@
+package database
+
+import (
+	"encoding/json"
+	"errors"
+	"log"
+	"net"
+	"strings"
+	"time"
+
+	"github.com/coreos/etcd/clientv3"
+	"github.com/gravitl/netmaker/servercfg"
+	"google.golang.org/appengine/memcache"
+)
+
+var EtcdDatabase *clientv3.Client
+var KV *clientv3.KV
+
+var ETCD_FUNCTIONS = map[string]interface{}{
+	INIT_DB:      initEtcdDatabase,
+	CREATE_TABLE: etcdCreateTable,
+	INSERT:       etcdInsert,
+	INSERT_PEER:  etcdInsertPeer,
+	DELETE:       etcdDeleteRecord,
+	DELETE_ALL:   etcdDeleteAllRecords,
+	FETCH_ALL:    etcdFetchRecords,
+	CLOSE_DB:     etcdCloseDB,
+}
+
+// utility function to make setting etcd servers easier
+func parseEtcdAddresses(addresses string) string {
+	addressesArr := strings.Split(addresses, ",")
+	numAddresses := len(addressesArr)
+	if numAddresses == 0 {
+		return "127.0.0.1:2379"
+	}
+	newAddresses := ""
+	log.SetFlags(log.Flags() &^ (log.Llongfile | log.Lshortfile))
+	for _, address := range addressesArr {
+		if isValidIp(address) {
+			newAddresses += address
+			if servercfg.GetVerbose() >= 2 {
+				log.Println("adding " + address + " to etcd servers")
+			}
+			if address != addressesArr[numAddresses-1] {
+				newAddresses += ","
+			}
+		}
+	}
+	return newAddresses
+}
+
+func initEtcdDatabase() error {
+	addresses := parseEtcdAddresses(servercfg.GetEtcdAddresses())
+	var err error
+	EtcdDatabase, err = clientv3.New(clientv3.Config{
+		Endpoints:   []string{addresses},
+		DialTimeout: 5 * time.Second,
+	})
+	if err != nil {
+		return err
+	} else if EtcdDatabase == nil {
+		return errors.New("could not initialize etcd")
+	}
+	EtcdDatabase.Timeout = time.Minute
+	clientv3.NewKV(EtcdDatabase)
+	return nil
+}
+
+func etcdCreateTable(tableName string) error {
+
+	if currentTable, err := etcdFetchRecords(tableName); (currentTable != nil && len(currentTable) >= 0) || err != nil {
+		// return if it already exists
+		return err
+	} else {
+		log.Println(currentTable)
+	}
+	table := make(map[string]string)
+	newTable, err := json.Marshal(table)
+	if err != nil {
+		return err
+	}
+	kv := clientv3.NewKV(EtcdDatabase)
+	err = EtcdDatabase.Set(&memcache.Item{Key: tableName, Value: newTable, Expiration: 0})
+	if err != nil {
+		return err
+	}
+	return nil
+}
+
+func etcdInsert(key string, value string, tableName string) error {
+	if key != "" && value != "" && IsJSONString(value) {
+		preData, err := EtcdDatabase.Get(tableName)
+		if err != nil {
+			return err
+		}
+		var preDataMap map[string]string
+		if err := json.Unmarshal(preData.Value, &preDataMap); err != nil {
+			return err
+		}
+		preDataMap[key] = value
+		postData, err := json.Marshal(&preDataMap)
+		if err != nil {
+			return err
+		}
+		err = EtcdDatabase.Replace(&memcache.Item{Key: tableName, Value: postData, Expiration: 0})
+		if err != nil {
+			return err
+		}
+		return nil
+	} else {
+		return errors.New("invalid insert " + key + " : " + value)
+	}
+}
+
+func etcdInsertPeer(key string, value string) error {
+	if key != "" && value != "" && IsJSONString(value) {
+		if err := etcdInsert(key, value, PEERS_TABLE_NAME); err != nil {
+			return err
+		}
+		return nil
+	} else {
+		return errors.New("invalid peer insert " + key + " : " + value)
+	}
+}
+
+func etcdDeleteRecord(tableName string, key string) error {
+	if key != "" {
+		preData, err := EtcdDatabase.Get(tableName)
+		if err != nil {
+			return err
+		}
+		var preDataMap map[string]string
+		if err := json.Unmarshal(preData.Value, &preDataMap); err != nil {
+			return err
+		}
+		delete(preDataMap, key)
+		postData, err := json.Marshal(&preDataMap)
+		if err != nil {
+			return err
+		}
+		err = EtcdDatabase.Set(&memcache.Item{Key: tableName, Value: postData, Expiration: 0})
+		if err != nil {
+			return err
+		}
+		return nil
+	} else {
+		return errors.New("invalid delete, key is required")
+	}
+}
+
+func etcdDeleteAllRecords(tableName string) error {
+	err := EtcdDatabase.Delete(tableName)
+	if err != nil {
+		return err
+	}
+	err = etcdCreateTable(tableName)
+	if err != nil {
+		return err
+	}
+	return nil
+}
+
+// func etcdFetchRecord(tableName string, key string) (string, error) {
+// 	results, err := etcdFetchRecords(tableName)
+// 	if err != nil {
+// 		return "", err
+// 	}
+// 	if results[key] == "" {
+// 		return "", errors.New(NO_RECORD)
+// 	}
+// 	return results[key], nil
+// }
+
+func etcdFetchRecords(tableName string) (map[string]string, error) {
+	var records map[string]string
+	item, err := EtcdDatabase.Get(tableName)
+	if err != nil {
+		return records, err
+	}
+	if err = json.Unmarshal(item.Value, &records); err != nil {
+		return nil, err
+	}
+
+	return records, nil
+}
+
+func etcdCloseDB() {
+	// no op for this library..
+}
+
+func isValidIp(ipAddr string) bool {
+	return net.ParseIP(ipAddr) == nil
+}

+ 10 - 0
servercfg/serverconf.go

@@ -484,6 +484,16 @@ func GetAuthProviderInfo() []string {
 	return []string{"", "", ""}
 }
 
+func GetEtcdAddresses() string {
+	addresses := "127.0.0.1:2379"
+	if os.Getenv("ETCD_ADDRESSES") != "" {
+		addresses = os.Getenv("ETCD_ADDRESSES")
+	} else if config.Config.Server.EtcdAddresses != "" {
+		addresses = config.Config.Server.EtcdAddresses
+	}
+	return addresses
+}
+
 // GetMacAddr - get's mac address
 func getMacAddr() string {
 	ifas, err := net.Interfaces()