| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132 | package mqimport (	"bytes"	"encoding/json"	"errors"	"fmt"	"io"	"net/http"	"os"	"time"	mqtt "github.com/eclipse/paho.mqtt.golang"	"github.com/gravitl/netmaker/logger"	"github.com/gravitl/netmaker/logic"	"github.com/gravitl/netmaker/models"	"github.com/gravitl/netmaker/servercfg"	"golang.org/x/exp/slog")func setupmqtt_old() (mqtt.Client, error) {	opts := mqtt.NewClientOptions()	opts.AddBroker(os.Getenv("OLD_BROKER_ENDPOINT"))	id := logic.RandomString(23)	opts.ClientID = id	opts.SetUsername(os.Getenv("OLD_MQ_USERNAME"))	opts.SetPassword(os.Getenv("OLD_MQ_PASSWORD"))	opts.SetAutoReconnect(true)	opts.SetConnectRetry(true)	opts.SetConnectRetryInterval(time.Second << 2)	opts.SetKeepAlive(time.Minute)	opts.SetWriteTimeout(time.Minute)	mqclient := mqtt.NewClient(opts)	var connecterr error	if token := mqclient.Connect(); !token.WaitTimeout(30*time.Second) || token.Error() != nil {		if token.Error() == nil {			connecterr = errors.New("connect timeout")		} else {			connecterr = token.Error()		}		slog.Error("unable to connect to broker", "server", os.Getenv("OLD_BROKER_ENDPOINT"), "error", connecterr)	}	return mqclient, nil}func getEmqxAuthTokenOld() (string, error) {	payload, err := json.Marshal(&emqxLogin{		Username: os.Getenv("OLD_MQ_USERNAME"),		Password: os.Getenv("OLD_MQ_PASSWORD"),	})	if err != nil {		return "", err	}	resp, err := http.Post(os.Getenv("OLD_EMQX_REST_ENDPOINT")+"/api/v5/login", "application/json", bytes.NewReader(payload))	if err != nil {		return "", err	}	defer resp.Body.Close()	msg, err := io.ReadAll(resp.Body)	if err != nil {		return "", err	}	if resp.StatusCode != http.StatusOK {		return "", fmt.Errorf("error during EMQX login %v", string(msg))	}	var loginResp emqxLoginResponse	if err := json.Unmarshal(msg, &loginResp); err != nil {		return "", err	}	return loginResp.Token, nil}func SendPullSYN() error {	mqclient, err := setupmqtt_old()	if err != nil {		return err	}	hosts, err := logic.GetAllHosts()	if err != nil {		return err	}	for _, host := range hosts {		host := host		hostUpdate := models.HostUpdate{			Action: models.RequestPull,			Host:   host,		}		msg, _ := json.Marshal(hostUpdate)		encrypted, encryptErr := encryptMsg(&host, msg)		if encryptErr != nil {			continue		}		logger.Log(0, "sending pull syn to", host.Name)		mqclient.Publish(fmt.Sprintf("host/update/%s/%s", hostUpdate.Host.ID.String(), servercfg.GetServer()), 0, true, encrypted)	}	return nil}func KickOutClients() error {	authToken, err := getEmqxAuthTokenOld()	if err != nil {		return err	}	hosts, err := logic.GetAllHosts()	if err != nil {		slog.Error("failed to migrate emqx: ", "error", err)		return err	}	for _, host := range hosts {		url := fmt.Sprintf("%s/api/v5/clients/%s", os.Getenv("OLD_EMQX_REST_ENDPOINT"), host.ID.String())		client := &http.Client{}		req, err := http.NewRequest(http.MethodDelete, url, nil)		if err != nil {			slog.Error("failed to kick out client:", "client", host.ID.String(), "error", err)			continue		}		req.Header.Add("Authorization", "Bearer "+authToken)		res, err := client.Do(req)		if err != nil {			slog.Error("failed to kick out client:", "client", host.ID.String(), "req-error", err)			continue		}		if res.StatusCode != http.StatusNoContent {			slog.Error("failed to kick out client:", "client", host.ID.String(), "status-code", res.StatusCode)		}		res.Body.Close()	}	return nil}
 |