Browse Source

Merge pull request #2844 from gravitl/release-v0.23.0

Release v0.23.0
Abhishek K 1 year ago
parent
commit
37e877c553

+ 7 - 0
controllers/controller.go

@@ -4,6 +4,7 @@ import (
 	"context"
 	"fmt"
 	"net/http"
+	"os"
 	"strings"
 	"sync"
 	"time"
@@ -11,6 +12,7 @@ import (
 	"github.com/gorilla/handlers"
 	"github.com/gorilla/mux"
 	"github.com/gravitl/netmaker/logger"
+	m "github.com/gravitl/netmaker/migrate"
 	"github.com/gravitl/netmaker/servercfg"
 )
 
@@ -62,6 +64,11 @@ func HandleRESTRequests(wg *sync.WaitGroup, ctx context.Context) {
 			logger.Log(0, err.Error())
 		}
 	}()
+	if os.Getenv("MIGRATE_EMQX") == "true" {
+		logger.Log(0, "migrating emqx...")
+		time.Sleep(time.Second * 2)
+		m.MigrateEmqx()
+	}
 	logger.Log(0, "REST Server successfully started on port ", port, " (REST)")
 
 	// Block main routine until a signal is received

+ 11 - 0
controllers/node.go

@@ -645,6 +645,17 @@ func updateNode(w http.ResponseWriter, r *http.Request) {
 
 	}
 	relayUpdate := logic.RelayUpdates(&currentNode, newNode)
+	if relayUpdate && newNode.IsRelay {
+		err = logic.ValidateRelay(models.RelayRequest{
+			NodeID:       newNode.ID.String(),
+			NetID:        newNode.Network,
+			RelayedNodes: newNode.RelayedNodes,
+		}, true)
+		if err != nil {
+			logic.ReturnErrorResponse(w, r, logic.FormatError(err, "badrequest"))
+			return
+		}
+	}
 	_, err = logic.GetHost(newNode.HostID.String())
 	if err != nil {
 		logger.Log(0, r.Header.Get("user"),

+ 0 - 2
logic/acls/nodeacls/modify.go

@@ -77,14 +77,12 @@ func RemoveNodeACL(networkID NetworkID, nodeID NodeID) (acls.ACLContainer, error
 	if err != nil {
 		return nil, err
 	}
-	acls.AclMutex.Lock()
 	for currentNodeID := range currentNetworkACL {
 		if NodeID(currentNodeID) != nodeID {
 			currentNetworkACL[currentNodeID].Remove(acls.AclID(nodeID))
 		}
 	}
 	delete(currentNetworkACL, acls.AclID(nodeID))
-	acls.AclMutex.Unlock()
 	return currentNetworkACL.Save(acls.ContainerID(networkID))
 }
 

+ 3 - 1
logic/acls/nodeacls/retrieve.go

@@ -15,8 +15,10 @@ func AreNodesAllowed(networkID NetworkID, node1, node2 NodeID) bool {
 	}
 	var allowed bool
 	acls.AclMutex.RLock()
-	allowed = currentNetworkACL[acls.AclID(node1)].IsAllowed(acls.AclID(node2)) && currentNetworkACL[acls.AclID(node2)].IsAllowed(acls.AclID(node1))
+	currNetworkACLNode1 := currentNetworkACL[acls.AclID(node1)]
+	currNetworkACLNode2 := currentNetworkACL[acls.AclID(node2)]
 	acls.AclMutex.RUnlock()
+	allowed = currNetworkACLNode1.IsAllowed(acls.AclID(node2)) && currNetworkACLNode2.IsAllowed(acls.AclID(node1))
 	return allowed
 }
 

+ 0 - 1
logic/hosts.go

@@ -418,7 +418,6 @@ func DissasociateNodeFromHost(n *models.Node, h *models.Host) error {
 	if err := DeleteNodeByID(n); err != nil {
 		return err
 	}
-
 	return UpsertHost(h)
 }
 

+ 0 - 3
logic/nodes.go

@@ -189,7 +189,6 @@ func UpdateNode(currentNode *models.Node, newNode *models.Node) error {
 func DeleteNode(node *models.Node, purge bool) error {
 	alreadyDeleted := node.PendingDelete || node.Action == models.NODE_DELETE
 	node.Action = models.NODE_DELETE
-
 	//delete ext clients if node is ingress gw
 	if node.IsIngressGateway {
 		if err := DeleteGatewayExtClients(node.ID.String(), node.Network); err != nil {
@@ -235,7 +234,6 @@ func DeleteNode(node *models.Node, purge bool) error {
 	if node.IsInternetGateway {
 		UnsetInternetGw(node)
 	}
-
 	if !purge && !alreadyDeleted {
 		newnode := *node
 		newnode.PendingDelete = true
@@ -281,7 +279,6 @@ func GetNodeByHostRef(hostid, network string) (node models.Node, err error) {
 func DeleteNodeByID(node *models.Node) error {
 	var err error
 	var key = node.ID.String()
-
 	if err = database.DeleteRecord(database.NODES_TABLE_NAME, key); err != nil {
 		if !database.IsEmptyRecord(err) {
 			return err

+ 4 - 0
logic/relay.go

@@ -28,3 +28,7 @@ var SetRelayedNodes = func(setRelayed bool, relay string, relayed []string) []mo
 var RelayUpdates = func(currentNode, newNode *models.Node) bool {
 	return false
 }
+
+var ValidateRelay = func(relay models.RelayRequest, update bool) error {
+	return nil
+}

+ 19 - 0
migrate/migrate.go

@@ -4,6 +4,7 @@ import (
 	"encoding/json"
 	"fmt"
 	"log"
+	"time"
 
 	"golang.org/x/exp/slog"
 
@@ -12,6 +13,7 @@ import (
 	"github.com/gravitl/netmaker/logic"
 	"github.com/gravitl/netmaker/logic/acls"
 	"github.com/gravitl/netmaker/models"
+	"github.com/gravitl/netmaker/mq"
 	"github.com/gravitl/netmaker/servercfg"
 )
 
@@ -22,6 +24,7 @@ func Run() {
 	updateHosts()
 	updateNodes()
 	updateAcls()
+
 }
 
 func assignSuperAdmin() {
@@ -292,3 +295,19 @@ func updateAcls() {
 		slog.Info(fmt.Sprintf("(migration) successfully saved new acls for network: %s", network.NetID))
 	}
 }
+
+func MigrateEmqx() {
+
+	err := mq.SendPullSYN()
+	if err != nil {
+		logger.Log(0, "failed to send pull syn to clients", "error", err.Error())
+
+	}
+	time.Sleep(time.Second * 3)
+	slog.Info("proceeding to kicking out clients from emqx")
+	err = mq.KickOutClients()
+	if err != nil {
+		logger.Log(0, "failed to migrate emqx: ", "kickout-error", err.Error())
+	}
+
+}

+ 131 - 0
mq/migrate.go

@@ -0,0 +1,131 @@
+package mq
+
+import (
+	"bytes"
+	"encoding/json"
+	"errors"
+	"fmt"
+	"io"
+	"net/http"
+	"os"
+	"time"
+
+	mqtt "github.com/eclipse/paho.mqtt.golang"
+	"github.com/gravitl/netmaker/logger"
+	"github.com/gravitl/netmaker/logic"
+	"github.com/gravitl/netmaker/models"
+	"github.com/gravitl/netmaker/servercfg"
+	"golang.org/x/exp/slog"
+)
+
+func setupmqtt_old() (mqtt.Client, error) {
+
+	opts := mqtt.NewClientOptions()
+	opts.AddBroker(os.Getenv("OLD_BROKER_ENDPOINT"))
+	id := logic.RandomString(23)
+	opts.ClientID = id
+	opts.SetUsername(os.Getenv("OLD_MQ_USERNAME"))
+	opts.SetPassword(os.Getenv("OLD_MQ_PASSWORD"))
+	opts.SetAutoReconnect(true)
+	opts.SetConnectRetry(true)
+	opts.SetConnectRetryInterval(time.Second << 2)
+	opts.SetKeepAlive(time.Minute)
+	opts.SetWriteTimeout(time.Minute)
+	mqclient := mqtt.NewClient(opts)
+
+	var connecterr error
+	if token := mqclient.Connect(); !token.WaitTimeout(30*time.Second) || token.Error() != nil {
+		if token.Error() == nil {
+			connecterr = errors.New("connect timeout")
+		} else {
+			connecterr = token.Error()
+		}
+		slog.Error("unable to connect to broker", "server", os.Getenv("OLD_BROKER_ENDPOINT"), "error", connecterr)
+	}
+	return mqclient, nil
+}
+
+func getEmqxAuthTokenOld() (string, error) {
+	payload, err := json.Marshal(&emqxLogin{
+		Username: os.Getenv("OLD_MQ_USERNAME"),
+		Password: os.Getenv("OLD_MQ_PASSWORD"),
+	})
+	if err != nil {
+		return "", err
+	}
+	resp, err := http.Post(os.Getenv("OLD_EMQX_REST_ENDPOINT")+"/api/v5/login", "application/json", bytes.NewReader(payload))
+	if err != nil {
+		return "", err
+	}
+	msg, err := io.ReadAll(resp.Body)
+	if err != nil {
+		return "", err
+	}
+	if resp.StatusCode != http.StatusOK {
+		return "", fmt.Errorf("error during EMQX login %v", string(msg))
+	}
+	var loginResp emqxLoginResponse
+	if err := json.Unmarshal(msg, &loginResp); err != nil {
+		return "", err
+	}
+	return loginResp.Token, nil
+}
+
+func SendPullSYN() error {
+	mqclient, err := setupmqtt_old()
+	if err != nil {
+		return err
+	}
+	hosts, err := logic.GetAllHosts()
+	if err != nil {
+		return err
+	}
+	for _, host := range hosts {
+		host := host
+		hostUpdate := models.HostUpdate{
+			Action: models.RequestPull,
+			Host:   host,
+		}
+		msg, _ := json.Marshal(hostUpdate)
+		encrypted, encryptErr := encryptMsg(&host, msg)
+		if encryptErr != nil {
+			continue
+		}
+		logger.Log(0, "sending pull syn to", host.Name)
+		mqclient.Publish(fmt.Sprintf("host/update/%s/%s", hostUpdate.Host.ID.String(), servercfg.GetServer()), 0, true, encrypted)
+	}
+	return nil
+}
+
+func KickOutClients() error {
+	authToken, err := getEmqxAuthTokenOld()
+	if err != nil {
+		return err
+	}
+	hosts, err := logic.GetAllHosts()
+	if err != nil {
+		slog.Error("failed to migrate emqx: ", "error", err)
+		return err
+	}
+
+	for _, host := range hosts {
+		url := fmt.Sprintf("%s/api/v5/clients/%s", os.Getenv("OLD_EMQX_REST_ENDPOINT"), host.ID.String())
+		client := &http.Client{}
+		req, err := http.NewRequest(http.MethodDelete, url, nil)
+		if err != nil {
+			slog.Error("failed to kick out client:", "client", host.ID.String(), "error", err)
+			continue
+		}
+		req.Header.Add("Authorization", "Bearer "+authToken)
+		res, err := client.Do(req)
+		if err != nil {
+			slog.Error("failed to kick out client:", "client", host.ID.String(), "req-error", err)
+			continue
+		}
+		if res.StatusCode != http.StatusNoContent {
+			slog.Error("failed to kick out client:", "client", host.ID.String(), "status-code", res.StatusCode)
+		}
+		res.Body.Close()
+	}
+	return nil
+}

+ 1 - 0
pro/initialize.go

@@ -101,6 +101,7 @@ func InitPro() {
 	logic.UpdateRelayed = proLogic.UpdateRelayed
 	logic.SetRelayedNodes = proLogic.SetRelayedNodes
 	logic.RelayUpdates = proLogic.RelayUpdates
+	logic.ValidateRelay = proLogic.ValidateRelay
 	logic.GetTrialEndDate = getTrialEndDate
 	logic.SetDefaultGw = proLogic.SetDefaultGw
 	logic.SetDefaultGwForRelayedUpdate = proLogic.SetDefaultGwForRelayedUpdate

+ 6 - 3
pro/logic/relays.go

@@ -44,7 +44,7 @@ func CreateRelay(relay models.RelayRequest) ([]models.Node, models.Node, error)
 	if host.OS != "linux" {
 		return returnnodes, models.Node{}, fmt.Errorf("only linux machines can be relay nodes")
 	}
-	err = ValidateRelay(relay)
+	err = ValidateRelay(relay, false)
 	if err != nil {
 		return returnnodes, models.Node{}, err
 	}
@@ -101,14 +101,14 @@ func SetRelayedNodes(setRelayed bool, relay string, relayed []string) []models.N
 // }
 
 // ValidateRelay - checks if relay is valid
-func ValidateRelay(relay models.RelayRequest) error {
+func ValidateRelay(relay models.RelayRequest, update bool) error {
 	var err error
 
 	node, err := logic.GetNodeByID(relay.NodeID)
 	if err != nil {
 		return err
 	}
-	if node.IsRelay {
+	if !update && node.IsRelay {
 		return errors.New("node is already acting as a relay")
 	}
 	for _, relayedNodeID := range relay.RelayedNodes {
@@ -119,6 +119,9 @@ func ValidateRelay(relay models.RelayRequest) error {
 		if relayedNode.IsIngressGateway {
 			return errors.New("cannot relay an ingress gateway (" + relayedNodeID + ")")
 		}
+		if relayedNode.IsInternetGateway {
+			return errors.New("cannot relay an internet gateway (" + relayedNodeID + ")")
+		}
 	}
 	return err
 }