123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131 |
- package mq
- import (
- "bytes"
- "encoding/json"
- "errors"
- "fmt"
- "io"
- "net/http"
- "os"
- "time"
- mqtt "github.com/eclipse/paho.mqtt.golang"
- "github.com/gravitl/netmaker/logger"
- "github.com/gravitl/netmaker/logic"
- "github.com/gravitl/netmaker/models"
- "github.com/gravitl/netmaker/servercfg"
- "golang.org/x/exp/slog"
- )
- func setupmqtt_old() (mqtt.Client, error) {
- opts := mqtt.NewClientOptions()
- opts.AddBroker(os.Getenv("OLD_BROKER_ENDPOINT"))
- id := logic.RandomString(23)
- opts.ClientID = id
- opts.SetUsername(os.Getenv("OLD_MQ_USERNAME"))
- opts.SetPassword(os.Getenv("OLD_MQ_PASSWORD"))
- opts.SetAutoReconnect(true)
- opts.SetConnectRetry(true)
- opts.SetConnectRetryInterval(time.Second << 2)
- opts.SetKeepAlive(time.Minute)
- opts.SetWriteTimeout(time.Minute)
- mqclient := mqtt.NewClient(opts)
- var connecterr error
- if token := mqclient.Connect(); !token.WaitTimeout(30*time.Second) || token.Error() != nil {
- if token.Error() == nil {
- connecterr = errors.New("connect timeout")
- } else {
- connecterr = token.Error()
- }
- slog.Error("unable to connect to broker", "server", os.Getenv("OLD_BROKER_ENDPOINT"), "error", connecterr)
- }
- return mqclient, nil
- }
- func getEmqxAuthTokenOld() (string, error) {
- payload, err := json.Marshal(&emqxLogin{
- Username: os.Getenv("OLD_MQ_USERNAME"),
- Password: os.Getenv("OLD_MQ_PASSWORD"),
- })
- if err != nil {
- return "", err
- }
- resp, err := http.Post(os.Getenv("OLD_EMQX_REST_ENDPOINT")+"/api/v5/login", "application/json", bytes.NewReader(payload))
- if err != nil {
- return "", err
- }
- msg, err := io.ReadAll(resp.Body)
- if err != nil {
- return "", err
- }
- if resp.StatusCode != http.StatusOK {
- return "", fmt.Errorf("error during EMQX login %v", string(msg))
- }
- var loginResp emqxLoginResponse
- if err := json.Unmarshal(msg, &loginResp); err != nil {
- return "", err
- }
- return loginResp.Token, nil
- }
- func SendPullSYN() error {
- mqclient, err := setupmqtt_old()
- if err != nil {
- return err
- }
- hosts, err := logic.GetAllHosts()
- if err != nil {
- return err
- }
- for _, host := range hosts {
- host := host
- hostUpdate := models.HostUpdate{
- Action: models.RequestPull,
- Host: host,
- }
- msg, _ := json.Marshal(hostUpdate)
- encrypted, encryptErr := encryptMsg(&host, msg)
- if encryptErr != nil {
- continue
- }
- logger.Log(0, "sending pull syn to", host.Name)
- mqclient.Publish(fmt.Sprintf("host/update/%s/%s", hostUpdate.Host.ID.String(), servercfg.GetServer()), 0, true, encrypted)
- }
- return nil
- }
- func KickOutClients() error {
- authToken, err := getEmqxAuthTokenOld()
- if err != nil {
- return err
- }
- hosts, err := logic.GetAllHosts()
- if err != nil {
- slog.Error("failed to migrate emqx: ", "error", err)
- return err
- }
- for _, host := range hosts {
- url := fmt.Sprintf("%s/api/v5/clients/%s", os.Getenv("OLD_EMQX_REST_ENDPOINT"), host.ID.String())
- client := &http.Client{}
- req, err := http.NewRequest(http.MethodDelete, url, nil)
- if err != nil {
- slog.Error("failed to kick out client:", "client", host.ID.String(), "error", err)
- continue
- }
- req.Header.Add("Authorization", "Bearer "+authToken)
- res, err := client.Do(req)
- if err != nil {
- slog.Error("failed to kick out client:", "client", host.ID.String(), "req-error", err)
- continue
- }
- if res.StatusCode != http.StatusNoContent {
- slog.Error("failed to kick out client:", "client", host.ID.String(), "status-code", res.StatusCode)
- }
- res.Body.Close()
- }
- return nil
- }
|