|
@@ -0,0 +1,131 @@
|
|
|
|
+package mq
|
|
|
|
+
|
|
|
|
+import (
|
|
|
|
+ "bytes"
|
|
|
|
+ "encoding/json"
|
|
|
|
+ "errors"
|
|
|
|
+ "fmt"
|
|
|
|
+ "io"
|
|
|
|
+ "net/http"
|
|
|
|
+ "os"
|
|
|
|
+ "time"
|
|
|
|
+
|
|
|
|
+ mqtt "github.com/eclipse/paho.mqtt.golang"
|
|
|
|
+ "github.com/gravitl/netmaker/logger"
|
|
|
|
+ "github.com/gravitl/netmaker/logic"
|
|
|
|
+ "github.com/gravitl/netmaker/models"
|
|
|
|
+ "github.com/gravitl/netmaker/servercfg"
|
|
|
|
+ "golang.org/x/exp/slog"
|
|
|
|
+)
|
|
|
|
+
|
|
|
|
+func setupmqtt_old() (mqtt.Client, error) {
|
|
|
|
+
|
|
|
|
+ opts := mqtt.NewClientOptions()
|
|
|
|
+ opts.AddBroker(os.Getenv("OLD_BROKER_ENDPOINT"))
|
|
|
|
+ id := logic.RandomString(23)
|
|
|
|
+ opts.ClientID = id
|
|
|
|
+ opts.SetUsername(os.Getenv("OLD_MQ_USERNAME"))
|
|
|
|
+ opts.SetPassword(os.Getenv("OLD_MQ_PASSWORD"))
|
|
|
|
+ opts.SetAutoReconnect(true)
|
|
|
|
+ opts.SetConnectRetry(true)
|
|
|
|
+ opts.SetConnectRetryInterval(time.Second << 2)
|
|
|
|
+ opts.SetKeepAlive(time.Minute)
|
|
|
|
+ opts.SetWriteTimeout(time.Minute)
|
|
|
|
+ mqclient := mqtt.NewClient(opts)
|
|
|
|
+
|
|
|
|
+ var connecterr error
|
|
|
|
+ if token := mqclient.Connect(); !token.WaitTimeout(30*time.Second) || token.Error() != nil {
|
|
|
|
+ if token.Error() == nil {
|
|
|
|
+ connecterr = errors.New("connect timeout")
|
|
|
|
+ } else {
|
|
|
|
+ connecterr = token.Error()
|
|
|
|
+ }
|
|
|
|
+ slog.Error("unable to connect to broker", "server", os.Getenv("OLD_BROKER_ENDPOINT"), "error", connecterr)
|
|
|
|
+ }
|
|
|
|
+ return mqclient, nil
|
|
|
|
+}
|
|
|
|
+
|
|
|
|
+func getEmqxAuthTokenOld() (string, error) {
|
|
|
|
+ payload, err := json.Marshal(&emqxLogin{
|
|
|
|
+ Username: os.Getenv("OLD_MQ_USERNAME"),
|
|
|
|
+ Password: os.Getenv("OLD_MQ_PASSWORD"),
|
|
|
|
+ })
|
|
|
|
+ if err != nil {
|
|
|
|
+ return "", err
|
|
|
|
+ }
|
|
|
|
+ resp, err := http.Post(os.Getenv("OLD_EMQX_REST_ENDPOINT")+"/api/v5/login", "application/json", bytes.NewReader(payload))
|
|
|
|
+ if err != nil {
|
|
|
|
+ return "", err
|
|
|
|
+ }
|
|
|
|
+ msg, err := io.ReadAll(resp.Body)
|
|
|
|
+ if err != nil {
|
|
|
|
+ return "", err
|
|
|
|
+ }
|
|
|
|
+ if resp.StatusCode != http.StatusOK {
|
|
|
|
+ return "", fmt.Errorf("error during EMQX login %v", string(msg))
|
|
|
|
+ }
|
|
|
|
+ var loginResp emqxLoginResponse
|
|
|
|
+ if err := json.Unmarshal(msg, &loginResp); err != nil {
|
|
|
|
+ return "", err
|
|
|
|
+ }
|
|
|
|
+ return loginResp.Token, nil
|
|
|
|
+}
|
|
|
|
+
|
|
|
|
+func SendPullSYN() error {
|
|
|
|
+ mqclient, err := setupmqtt_old()
|
|
|
|
+ if err != nil {
|
|
|
|
+ return err
|
|
|
|
+ }
|
|
|
|
+ hosts, err := logic.GetAllHosts()
|
|
|
|
+ if err != nil {
|
|
|
|
+ return err
|
|
|
|
+ }
|
|
|
|
+ for _, host := range hosts {
|
|
|
|
+ host := host
|
|
|
|
+ hostUpdate := models.HostUpdate{
|
|
|
|
+ Action: models.RequestPull,
|
|
|
|
+ Host: host,
|
|
|
|
+ }
|
|
|
|
+ msg, _ := json.Marshal(hostUpdate)
|
|
|
|
+ encrypted, encryptErr := encryptMsg(&host, msg)
|
|
|
|
+ if encryptErr != nil {
|
|
|
|
+ continue
|
|
|
|
+ }
|
|
|
|
+ logger.Log(0, "sending pull syn to", host.Name)
|
|
|
|
+ mqclient.Publish(fmt.Sprintf("host/update/%s/%s", hostUpdate.Host.ID.String(), servercfg.GetServer()), 0, true, encrypted)
|
|
|
|
+ }
|
|
|
|
+ return nil
|
|
|
|
+}
|
|
|
|
+
|
|
|
|
+func KickOutClients() error {
|
|
|
|
+ authToken, err := getEmqxAuthTokenOld()
|
|
|
|
+ if err != nil {
|
|
|
|
+ return err
|
|
|
|
+ }
|
|
|
|
+ hosts, err := logic.GetAllHosts()
|
|
|
|
+ if err != nil {
|
|
|
|
+ slog.Error("failed to migrate emqx: ", "error", err)
|
|
|
|
+ return err
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ for _, host := range hosts {
|
|
|
|
+ url := fmt.Sprintf("%s/api/v5/clients/%s", os.Getenv("OLD_EMQX_REST_ENDPOINT"), host.ID.String())
|
|
|
|
+ client := &http.Client{}
|
|
|
|
+ req, err := http.NewRequest(http.MethodDelete, url, nil)
|
|
|
|
+ if err != nil {
|
|
|
|
+ slog.Error("failed to kick out client:", "client", host.ID.String(), "error", err)
|
|
|
|
+ continue
|
|
|
|
+ }
|
|
|
|
+ req.Header.Add("Authorization", "Bearer "+authToken)
|
|
|
|
+ res, err := client.Do(req)
|
|
|
|
+ if err != nil {
|
|
|
|
+ slog.Error("failed to kick out client:", "client", host.ID.String(), "req-error", err)
|
|
|
|
+ continue
|
|
|
|
+ }
|
|
|
|
+ if res.StatusCode != http.StatusNoContent {
|
|
|
|
+ slog.Error("failed to kick out client:", "client", host.ID.String(), "status-code", res.StatusCode)
|
|
|
|
+ }
|
|
|
|
+ res.Body.Close()
|
|
|
|
+ }
|
|
|
|
+ return nil
|
|
|
|
+}
|