emqx_cloud.go 3.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177
  1. package mq
  2. import (
  3. "encoding/json"
  4. "errors"
  5. "fmt"
  6. "io"
  7. "net/http"
  8. "strings"
  9. "github.com/gravitl/netmaker/servercfg"
  10. )
  11. type EmqxCloud struct {
  12. URL string
  13. AppID string
  14. AppSecret string
  15. }
  16. type userCreateReq struct {
  17. UserName string `json:"username"`
  18. Password string `json:"password"`
  19. }
  20. type cloudAcl struct {
  21. UserName string `json:"username"`
  22. Topic string `json:"topic"`
  23. Action string `json:"action"`
  24. Access string `json:"access"`
  25. }
  26. func (e *EmqxCloud) GetType() servercfg.Emqxdeploy { return servercfg.EmqxCloudDeploy }
  27. func (e *EmqxCloud) CreateEmqxUser(username, pass string, admin bool) error {
  28. payload := userCreateReq{
  29. UserName: username,
  30. Password: pass,
  31. }
  32. data, _ := json.Marshal(payload)
  33. client := &http.Client{}
  34. req, err := http.NewRequest(http.MethodPost, e.URL, strings.NewReader(string(data)))
  35. if err != nil {
  36. return err
  37. }
  38. req.SetBasicAuth(e.AppID, e.AppSecret)
  39. req.Header.Add("Content-Type", "application/json")
  40. res, err := client.Do(req)
  41. if err != nil {
  42. return err
  43. }
  44. defer res.Body.Close()
  45. body, err := io.ReadAll(res.Body)
  46. if err != nil {
  47. return err
  48. }
  49. if res.StatusCode != http.StatusOK {
  50. return errors.New("request failed " + string(body))
  51. }
  52. return nil
  53. }
  54. func (e *EmqxCloud) CreateEmqxDefaultAuthenticator() error { return nil } // ignore
  55. func (e *EmqxCloud) CreateEmqxDefaultAuthorizer() error { return nil } // ignore
  56. func (e *EmqxCloud) CreateDefaultDenyRule() error {
  57. return nil
  58. }
  59. func (e *EmqxCloud) CreateHostACL(hostID, serverName string) error {
  60. acls := []cloudAcl{
  61. {
  62. UserName: hostID,
  63. Topic: fmt.Sprintf("peers/host/%s/%s", hostID, serverName),
  64. Access: "allow",
  65. Action: "pubsub",
  66. },
  67. {
  68. UserName: hostID,
  69. Topic: fmt.Sprintf("host/update/%s/%s", hostID, serverName),
  70. Access: "allow",
  71. Action: "pubsub",
  72. },
  73. {
  74. UserName: hostID,
  75. Topic: fmt.Sprintf("host/serverupdate/%s/%s", serverName, hostID),
  76. Access: "allow",
  77. Action: "pubsub",
  78. },
  79. }
  80. payload, err := json.Marshal(acls)
  81. if err != nil {
  82. return err
  83. }
  84. client := &http.Client{}
  85. req, err := http.NewRequest(http.MethodPost, e.URL, strings.NewReader(string(payload)))
  86. if err != nil {
  87. return err
  88. }
  89. req.Header.Add("Content-Type", "application/json")
  90. req.SetBasicAuth(e.AppID, e.AppSecret)
  91. res, err := client.Do(req)
  92. if err != nil {
  93. return err
  94. }
  95. defer res.Body.Close()
  96. body, err := io.ReadAll(res.Body)
  97. if err != nil {
  98. return err
  99. }
  100. if res.StatusCode != http.StatusOK {
  101. return errors.New("request failed " + string(body))
  102. }
  103. return nil
  104. }
  105. func (e *EmqxCloud) AppendNodeUpdateACL(hostID, nodeNetwork, nodeID, serverName string) error {
  106. acls := []cloudAcl{
  107. {
  108. Topic: fmt.Sprintf("node/update/%s/%s", nodeNetwork, nodeID),
  109. Permission: "allow",
  110. Action: "subscribe",
  111. },
  112. {
  113. Topic: fmt.Sprintf("ping/%s/%s", serverName, nodeID),
  114. Permission: "allow",
  115. Action: "all",
  116. },
  117. {
  118. Topic: fmt.Sprintf("update/%s/%s", serverName, nodeID),
  119. Permission: "allow",
  120. Action: "all",
  121. },
  122. {
  123. Topic: fmt.Sprintf("signal/%s/%s", serverName, nodeID),
  124. Permission: "allow",
  125. Action: "all",
  126. },
  127. {
  128. Topic: fmt.Sprintf("metrics/%s/%s", serverName, nodeID),
  129. Permission: "allow",
  130. Action: "all",
  131. },
  132. }
  133. return nil
  134. }
  135. func (e *EmqxCloud) GetUserACL(username string) ([]cloudAcl, error) { return nil, nil }
  136. func (e *EmqxCloud) DeleteEmqxUser(username string) error {
  137. client := &http.Client{}
  138. req, err := http.NewRequest(http.MethodDelete, e.URL, nil)
  139. if err != nil {
  140. return err
  141. }
  142. req.SetBasicAuth(e.AppID, e.AppSecret)
  143. res, err := client.Do(req)
  144. if err != nil {
  145. return err
  146. }
  147. defer res.Body.Close()
  148. body, err := io.ReadAll(res.Body)
  149. if err != nil {
  150. return err
  151. }
  152. if res.StatusCode != http.StatusOK {
  153. return errors.New("request failed " + string(body))
  154. }
  155. return nil
  156. }