migrate.go 3.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132
  1. package mq
  2. import (
  3. "bytes"
  4. "encoding/json"
  5. "errors"
  6. "fmt"
  7. "io"
  8. "net/http"
  9. "os"
  10. "time"
  11. mqtt "github.com/eclipse/paho.mqtt.golang"
  12. "github.com/gravitl/netmaker/logger"
  13. "github.com/gravitl/netmaker/logic"
  14. "github.com/gravitl/netmaker/models"
  15. "github.com/gravitl/netmaker/servercfg"
  16. "golang.org/x/exp/slog"
  17. )
  18. func setupmqtt_old() (mqtt.Client, error) {
  19. opts := mqtt.NewClientOptions()
  20. opts.AddBroker(os.Getenv("OLD_BROKER_ENDPOINT"))
  21. id := logic.RandomString(23)
  22. opts.ClientID = id
  23. opts.SetUsername(os.Getenv("OLD_MQ_USERNAME"))
  24. opts.SetPassword(os.Getenv("OLD_MQ_PASSWORD"))
  25. opts.SetAutoReconnect(true)
  26. opts.SetConnectRetry(true)
  27. opts.SetConnectRetryInterval(time.Second << 2)
  28. opts.SetKeepAlive(time.Minute)
  29. opts.SetWriteTimeout(time.Minute)
  30. mqclient := mqtt.NewClient(opts)
  31. var connecterr error
  32. if token := mqclient.Connect(); !token.WaitTimeout(30*time.Second) || token.Error() != nil {
  33. if token.Error() == nil {
  34. connecterr = errors.New("connect timeout")
  35. } else {
  36. connecterr = token.Error()
  37. }
  38. slog.Error("unable to connect to broker", "server", os.Getenv("OLD_BROKER_ENDPOINT"), "error", connecterr)
  39. }
  40. return mqclient, nil
  41. }
  42. func getEmqxAuthTokenOld() (string, error) {
  43. payload, err := json.Marshal(&emqxLogin{
  44. Username: os.Getenv("OLD_MQ_USERNAME"),
  45. Password: os.Getenv("OLD_MQ_PASSWORD"),
  46. })
  47. if err != nil {
  48. return "", err
  49. }
  50. resp, err := http.Post(os.Getenv("OLD_EMQX_REST_ENDPOINT")+"/api/v5/login", "application/json", bytes.NewReader(payload))
  51. if err != nil {
  52. return "", err
  53. }
  54. defer resp.Body.Close()
  55. msg, err := io.ReadAll(resp.Body)
  56. if err != nil {
  57. return "", err
  58. }
  59. if resp.StatusCode != http.StatusOK {
  60. return "", fmt.Errorf("error during EMQX login %v", string(msg))
  61. }
  62. var loginResp emqxLoginResponse
  63. if err := json.Unmarshal(msg, &loginResp); err != nil {
  64. return "", err
  65. }
  66. return loginResp.Token, nil
  67. }
  68. func SendPullSYN() error {
  69. mqclient, err := setupmqtt_old()
  70. if err != nil {
  71. return err
  72. }
  73. hosts, err := logic.GetAllHosts()
  74. if err != nil {
  75. return err
  76. }
  77. for _, host := range hosts {
  78. host := host
  79. hostUpdate := models.HostUpdate{
  80. Action: models.RequestPull,
  81. Host: host,
  82. }
  83. msg, _ := json.Marshal(hostUpdate)
  84. encrypted, encryptErr := encryptMsg(&host, msg)
  85. if encryptErr != nil {
  86. continue
  87. }
  88. logger.Log(0, "sending pull syn to", host.Name)
  89. mqclient.Publish(fmt.Sprintf("host/update/%s/%s", hostUpdate.Host.ID.String(), servercfg.GetServer()), 0, true, encrypted)
  90. }
  91. return nil
  92. }
  93. func KickOutClients() error {
  94. authToken, err := getEmqxAuthTokenOld()
  95. if err != nil {
  96. return err
  97. }
  98. hosts, err := logic.GetAllHosts()
  99. if err != nil {
  100. slog.Error("failed to migrate emqx: ", "error", err)
  101. return err
  102. }
  103. for _, host := range hosts {
  104. url := fmt.Sprintf("%s/api/v5/clients/%s", os.Getenv("OLD_EMQX_REST_ENDPOINT"), host.ID.String())
  105. client := &http.Client{}
  106. req, err := http.NewRequest(http.MethodDelete, url, nil)
  107. if err != nil {
  108. slog.Error("failed to kick out client:", "client", host.ID.String(), "error", err)
  109. continue
  110. }
  111. req.Header.Add("Authorization", "Bearer "+authToken)
  112. res, err := client.Do(req)
  113. if err != nil {
  114. slog.Error("failed to kick out client:", "client", host.ID.String(), "req-error", err)
  115. continue
  116. }
  117. if res.StatusCode != http.StatusNoContent {
  118. slog.Error("failed to kick out client:", "client", host.ID.String(), "status-code", res.StatusCode)
  119. }
  120. res.Body.Close()
  121. }
  122. return nil
  123. }