Browse Source

send pull syn over old mq for emqx migration

abhishek9686 1 year ago
parent
commit
4630925182
2 changed files with 79 additions and 17 deletions
  1. 9 11
      migrate/migrate.go
  2. 70 6
      mq/migrate.go

+ 9 - 11
migrate/migrate.go

@@ -299,17 +299,15 @@ func updateAcls() {
 }
 
 func migrateEmqx() {
-	hosts, err := logic.GetAllHosts()
-	if err != nil {
-		slog.Error("failed to migrate emqx: ", "error", err)
-		return
-	}
-	clientIDs := []string{}
-	for _, host := range hosts {
-		clientIDs = append(clientIDs, host.ID.String())
-	}
-	err = mq.KickOutClients(clientIDs)
+
+	err := mq.SendPullSYN()
 	if err != nil {
-		slog.Error("failed to migrate emqx: ", "kickout-error", err)
+		slog.Error("failed to send pull syn to clients", "error", err)
+		slog.Info("proceeding to kicking out clients from emqx")
+		err := mq.KickOutClients()
+		if err != nil {
+			slog.Error("failed to migrate emqx: ", "kickout-error", err)
+		}
 	}
+
 }

+ 70 - 6
mq/migrate.go

@@ -3,14 +3,47 @@ package mq
 import (
 	"bytes"
 	"encoding/json"
+	"errors"
 	"fmt"
 	"io"
 	"net/http"
 	"os"
+	"time"
 
+	mqtt "github.com/eclipse/paho.mqtt.golang"
+	"github.com/gravitl/netmaker/logic"
+	"github.com/gravitl/netmaker/models"
+	"github.com/gravitl/netmaker/servercfg"
 	"golang.org/x/exp/slog"
 )
 
+func setupmqtt_old() (mqtt.Client, error) {
+
+	opts := mqtt.NewClientOptions()
+	opts.AddBroker(os.Getenv("OLD_BROKER_ENDPOINT"))
+	id := logic.RandomString(23)
+	opts.ClientID = id
+	opts.SetUsername(os.Getenv("OLD_MQ_USERNAME"))
+	opts.SetPassword(os.Getenv("OLD_MQ_PASSWORD"))
+	opts.SetAutoReconnect(true)
+	opts.SetConnectRetry(true)
+	opts.SetConnectRetryInterval(time.Second << 2)
+	opts.SetKeepAlive(time.Minute)
+	opts.SetWriteTimeout(time.Minute)
+	mqclient := mqtt.NewClient(opts)
+
+	var connecterr error
+	if token := mqclient.Connect(); !token.WaitTimeout(30*time.Second) || token.Error() != nil {
+		if token.Error() == nil {
+			connecterr = errors.New("connect timeout")
+		} else {
+			connecterr = token.Error()
+		}
+		slog.Error("unable to connect to broker", "server", os.Getenv("OLD_BROKER_ENDPOINT"), "error", connecterr)
+	}
+	return mqclient, nil
+}
+
 func getEmqxAuthTokenOld() (string, error) {
 	payload, err := json.Marshal(&emqxLogin{
 		Username: os.Getenv("OLD_MQ_USERNAME"),
@@ -37,27 +70,58 @@ func getEmqxAuthTokenOld() (string, error) {
 	return loginResp.Token, nil
 }
 
-func KickOutClients(clientIDs []string) error {
+func SendPullSYN() error {
+	mqclient, err := setupmqtt_old()
+	if err != nil {
+		return err
+	}
+	hosts, err := logic.GetAllHosts()
+	if err != nil {
+		return err
+	}
+	for _, host := range hosts {
+		host := host
+		hostUpdate := models.HostUpdate{
+			Action: models.RequestPull,
+			Host:   host,
+		}
+		msg, _ := json.Marshal(hostUpdate)
+		encrypted, encryptErr := encryptMsg(&host, msg)
+		if encryptErr != nil {
+			continue
+		}
+		mqclient.Publish(fmt.Sprintf("host/update/%s/%s", hostUpdate.Host.ID.String(), servercfg.GetServer()), 0, true, encrypted)
+	}
+	return nil
+}
+
+func KickOutClients() error {
 	authToken, err := getEmqxAuthTokenOld()
 	if err != nil {
 		return err
 	}
-	for _, clientID := range clientIDs {
-		url := fmt.Sprintf("%s/api/v5/clients/%s", os.Getenv("OLD_EMQX_REST_ENDPOINT"), clientID)
+	hosts, err := logic.GetAllHosts()
+	if err != nil {
+		slog.Error("failed to migrate emqx: ", "error", err)
+		return err
+	}
+
+	for _, host := range hosts {
+		url := fmt.Sprintf("%s/api/v5/clients/%s", os.Getenv("OLD_EMQX_REST_ENDPOINT"), host.ID.String())
 		client := &http.Client{}
 		req, err := http.NewRequest(http.MethodDelete, url, nil)
 		if err != nil {
-			slog.Error("failed to kick out client:", "client", clientID, "error", err)
+			slog.Error("failed to kick out client:", "client", host.ID.String(), "error", err)
 			continue
 		}
 		req.Header.Add("Authorization", "Bearer "+authToken)
 		res, err := client.Do(req)
 		if err != nil {
-			slog.Error("failed to kick out client:", "client", clientID, "req-error", err)
+			slog.Error("failed to kick out client:", "client", host.ID.String(), "req-error", err)
 			continue
 		}
 		if res.StatusCode != http.StatusNoContent {
-			slog.Error("failed to kick out client:", "client", clientID, "status-code", res.StatusCode)
+			slog.Error("failed to kick out client:", "client", host.ID.String(), "status-code", res.StatusCode)
 		}
 		res.Body.Close()
 	}