Browse Source

add emqx migration func

abhishek9686 1 year ago
parent
commit
eb28faf669
2 changed files with 86 additions and 0 deletions
  1. 21 0
      migrate/migrate.go
  2. 65 0
      mq/migrate.go

+ 21 - 0
migrate/migrate.go

@@ -4,6 +4,7 @@ import (
 	"encoding/json"
 	"encoding/json"
 	"fmt"
 	"fmt"
 	"log"
 	"log"
+	"os"
 
 
 	"golang.org/x/exp/slog"
 	"golang.org/x/exp/slog"
 
 
@@ -12,6 +13,7 @@ import (
 	"github.com/gravitl/netmaker/logic"
 	"github.com/gravitl/netmaker/logic"
 	"github.com/gravitl/netmaker/logic/acls"
 	"github.com/gravitl/netmaker/logic/acls"
 	"github.com/gravitl/netmaker/models"
 	"github.com/gravitl/netmaker/models"
+	"github.com/gravitl/netmaker/mq"
 	"github.com/gravitl/netmaker/servercfg"
 	"github.com/gravitl/netmaker/servercfg"
 )
 )
 
 
@@ -22,6 +24,9 @@ func Run() {
 	updateHosts()
 	updateHosts()
 	updateNodes()
 	updateNodes()
 	updateAcls()
 	updateAcls()
+	if os.Getenv("MIGRATE_EMQX") == "true" {
+		migrateEmqx()
+	}
 }
 }
 
 
 func assignSuperAdmin() {
 func assignSuperAdmin() {
@@ -292,3 +297,19 @@ func updateAcls() {
 		slog.Info(fmt.Sprintf("(migration) successfully saved new acls for network: %s", network.NetID))
 		slog.Info(fmt.Sprintf("(migration) successfully saved new acls for network: %s", network.NetID))
 	}
 	}
 }
 }
+
+func migrateEmqx() {
+	hosts, err := logic.GetAllHosts()
+	if err != nil {
+		slog.Error("failed to migrate emqx: ", "error", err)
+		return
+	}
+	clientIDs := []string{}
+	for _, host := range hosts {
+		clientIDs = append(clientIDs, host.ID.String())
+	}
+	err = mq.KickOutClients(clientIDs)
+	if err != nil {
+		slog.Error("failed to migrate emqx: ", "kickout-error", err)
+	}
+}

+ 65 - 0
mq/migrate.go

@@ -0,0 +1,65 @@
+package mq
+
+import (
+	"bytes"
+	"encoding/json"
+	"fmt"
+	"io"
+	"net/http"
+	"os"
+
+	"golang.org/x/exp/slog"
+)
+
+func getEmqxAuthTokenOld() (string, error) {
+	payload, err := json.Marshal(&emqxLogin{
+		Username: os.Getenv("OLD_MQ_USERNAME"),
+		Password: os.Getenv("OLD_MQ_PASSWORD"),
+	})
+	if err != nil {
+		return "", err
+	}
+	resp, err := http.Post(os.Getenv("OLD_EMQX_REST_ENDPOINT")+"/api/v5/login", "application/json", bytes.NewReader(payload))
+	if err != nil {
+		return "", err
+	}
+	msg, err := io.ReadAll(resp.Body)
+	if err != nil {
+		return "", err
+	}
+	if resp.StatusCode != http.StatusOK {
+		return "", fmt.Errorf("error during EMQX login %v", string(msg))
+	}
+	var loginResp emqxLoginResponse
+	if err := json.Unmarshal(msg, &loginResp); err != nil {
+		return "", err
+	}
+	return loginResp.Token, nil
+}
+
+func KickOutClients(clientIDs []string) error {
+	authToken, err := getEmqxAuthTokenOld()
+	if err != nil {
+		return err
+	}
+	for _, clientID := range clientIDs {
+		url := fmt.Sprintf("%s/api/v5/clients/%s", os.Getenv("OLD_EMQX_REST_ENDPOINT"), clientID)
+		client := &http.Client{}
+		req, err := http.NewRequest(http.MethodDelete, url, nil)
+		if err != nil {
+			slog.Error("failed to kick out client:", "client", clientID, "error", err)
+			continue
+		}
+		req.Header.Add("Authorization", "Bearer "+authToken)
+		res, err := client.Do(req)
+		if err != nil {
+			slog.Error("failed to kick out client:", "client", clientID, "req-error", err)
+			continue
+		}
+		if res.StatusCode != http.StatusNoContent {
+			slog.Error("failed to kick out client:", "client", clientID, "status-code", res.StatusCode)
+		}
+		res.Body.Close()
+	}
+	return nil
+}