migrate.go 4.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173
  1. package mq
  2. import (
  3. "bytes"
  4. "encoding/json"
  5. "errors"
  6. "fmt"
  7. "io"
  8. "net/http"
  9. "os"
  10. "time"
  11. mqtt "github.com/eclipse/paho.mqtt.golang"
  12. "github.com/gravitl/netmaker/logger"
  13. "github.com/gravitl/netmaker/logic"
  14. "github.com/gravitl/netmaker/models"
  15. "github.com/gravitl/netmaker/netclient/ncutils"
  16. "github.com/gravitl/netmaker/servercfg"
  17. "golang.org/x/exp/slog"
  18. )
  19. func setupmqtt_old() (mqtt.Client, error) {
  20. opts := mqtt.NewClientOptions()
  21. opts.AddBroker(os.Getenv("OLD_BROKER_ENDPOINT"))
  22. id := logic.RandomString(23)
  23. opts.ClientID = id
  24. opts.SetUsername(os.Getenv("OLD_MQ_USERNAME"))
  25. opts.SetPassword(os.Getenv("OLD_MQ_PASSWORD"))
  26. opts.SetAutoReconnect(true)
  27. opts.SetConnectRetry(true)
  28. opts.SetConnectRetryInterval(time.Second << 2)
  29. opts.SetKeepAlive(time.Minute)
  30. opts.SetWriteTimeout(time.Minute)
  31. mqclient := mqtt.NewClient(opts)
  32. var connecterr error
  33. if token := mqclient.Connect(); !token.WaitTimeout(30*time.Second) || token.Error() != nil {
  34. if token.Error() == nil {
  35. connecterr = errors.New("connect timeout")
  36. } else {
  37. connecterr = token.Error()
  38. }
  39. slog.Error("unable to connect to broker", "server", os.Getenv("OLD_BROKER_ENDPOINT"), "error", connecterr)
  40. }
  41. return mqclient, nil
  42. }
  43. func getEmqxAuthTokenOld() (string, error) {
  44. payload, err := json.Marshal(&emqxLogin{
  45. Username: os.Getenv("OLD_MQ_USERNAME"),
  46. Password: os.Getenv("OLD_MQ_PASSWORD"),
  47. })
  48. if err != nil {
  49. return "", err
  50. }
  51. resp, err := http.Post(os.Getenv("OLD_EMQX_REST_ENDPOINT")+"/api/v5/login", "application/json", bytes.NewReader(payload))
  52. if err != nil {
  53. return "", err
  54. }
  55. defer resp.Body.Close()
  56. msg, err := io.ReadAll(resp.Body)
  57. if err != nil {
  58. return "", err
  59. }
  60. if resp.StatusCode != http.StatusOK {
  61. return "", fmt.Errorf("error during EMQX login %v", string(msg))
  62. }
  63. var loginResp emqxLoginResponse
  64. if err := json.Unmarshal(msg, &loginResp); err != nil {
  65. return "", err
  66. }
  67. return loginResp.Token, nil
  68. }
  69. func SendPullSYN() error {
  70. mqclient, err := setupmqtt_old()
  71. if err != nil {
  72. return err
  73. }
  74. hosts, err := logic.GetAllHosts()
  75. if err != nil {
  76. return err
  77. }
  78. for _, host := range hosts {
  79. host := host
  80. hostUpdate := models.HostUpdate{
  81. Action: models.RequestPull,
  82. Host: host,
  83. }
  84. msg, _ := json.Marshal(hostUpdate)
  85. var encrypted []byte
  86. var encryptErr error
  87. vlt, err := logic.VersionLessThan(host.Version, "v0.30.0")
  88. if err != nil {
  89. slog.Warn("error checking version less than", "warn", err)
  90. continue
  91. }
  92. if vlt {
  93. encrypted, encryptErr = encryptMsg(&host, msg)
  94. if encryptErr != nil {
  95. slog.Warn("error encrypt with encryptMsg", "warn", encryptErr)
  96. continue
  97. }
  98. } else {
  99. zipped, err := compressPayload(msg)
  100. if err != nil {
  101. slog.Warn("error compressing message", "warn", err)
  102. continue
  103. }
  104. // Get server private key and client public key for AES-GCM encryption
  105. trafficKey, trafficErr := logic.RetrievePrivateTrafficKey()
  106. if trafficErr != nil {
  107. slog.Warn("error retrieving traffic key", "warn", trafficErr)
  108. continue
  109. }
  110. serverPrivKey, err := ncutils.ConvertBytesToKey(trafficKey)
  111. if err != nil {
  112. slog.Warn("error converting server private key", "warn", err)
  113. continue
  114. }
  115. clientPubKey, err := ncutils.ConvertBytesToKey(host.TrafficKeyPublic)
  116. if err != nil {
  117. slog.Warn("error converting client public key", "warn", err)
  118. continue
  119. }
  120. encrypted, encryptErr = encryptAESGCM(serverPrivKey, clientPubKey, zipped)
  121. if encryptErr != nil {
  122. slog.Warn("error encrypt with encryptAESGCM", "warn", encryptErr)
  123. continue
  124. }
  125. }
  126. logger.Log(0, "sending pull syn to", host.Name)
  127. mqclient.Publish(fmt.Sprintf("host/update/%s/%s", hostUpdate.Host.ID.String(), servercfg.GetServer()), 0, true, encrypted)
  128. }
  129. return nil
  130. }
  131. func KickOutClients() error {
  132. authToken, err := getEmqxAuthTokenOld()
  133. if err != nil {
  134. return err
  135. }
  136. hosts, err := logic.GetAllHosts()
  137. if err != nil {
  138. slog.Error("failed to migrate emqx: ", "error", err)
  139. return err
  140. }
  141. for _, host := range hosts {
  142. url := fmt.Sprintf("%s/api/v5/clients/%s", os.Getenv("OLD_EMQX_REST_ENDPOINT"), host.ID.String())
  143. client := &http.Client{}
  144. req, err := http.NewRequest(http.MethodDelete, url, nil)
  145. if err != nil {
  146. slog.Error("failed to kick out client:", "client", host.ID.String(), "error", err)
  147. continue
  148. }
  149. req.Header.Add("Authorization", "Bearer "+authToken)
  150. res, err := client.Do(req)
  151. if err != nil {
  152. slog.Error("failed to kick out client:", "client", host.ID.String(), "req-error", err)
  153. continue
  154. }
  155. if res.StatusCode != http.StatusNoContent {
  156. slog.Error("failed to kick out client:", "client", host.ID.String(), "status-code", res.StatusCode)
  157. }
  158. res.Body.Close()
  159. }
  160. return nil
  161. }