emqx_cloud.go 4.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205
  1. package mq
  2. import (
  3. "encoding/json"
  4. "errors"
  5. "fmt"
  6. "io"
  7. "net/http"
  8. "strings"
  9. "github.com/gravitl/netmaker/servercfg"
  10. )
  11. type EmqxCloud struct {
  12. URL string
  13. AppID string
  14. AppSecret string
  15. }
  16. type userCreateReq struct {
  17. UserName string `json:"username"`
  18. Password string `json:"password"`
  19. }
  20. type cloudAcl struct {
  21. UserName string `json:"username"`
  22. Topic string `json:"topic"`
  23. Action string `json:"action"`
  24. Access string `json:"access"`
  25. }
  26. func (e *EmqxCloud) GetType() servercfg.Emqxdeploy { return servercfg.EmqxCloudDeploy }
  27. func (e *EmqxCloud) CreateEmqxUser(username, pass string, admin bool) error {
  28. payload := userCreateReq{
  29. UserName: username,
  30. Password: pass,
  31. }
  32. data, _ := json.Marshal(payload)
  33. client := &http.Client{}
  34. req, err := http.NewRequest(http.MethodPost, e.URL, strings.NewReader(string(data)))
  35. if err != nil {
  36. return err
  37. }
  38. req.SetBasicAuth(e.AppID, e.AppSecret)
  39. req.Header.Add("Content-Type", "application/json")
  40. res, err := client.Do(req)
  41. if err != nil {
  42. return err
  43. }
  44. defer res.Body.Close()
  45. body, err := io.ReadAll(res.Body)
  46. if err != nil {
  47. return err
  48. }
  49. if res.StatusCode != http.StatusOK {
  50. return errors.New("request failed " + string(body))
  51. }
  52. return nil
  53. }
  54. func (e *EmqxCloud) CreateEmqxDefaultAuthenticator() error { return nil } // ignore
  55. func (e *EmqxCloud) CreateEmqxDefaultAuthorizer() error { return nil } // ignore
  56. func (e *EmqxCloud) CreateDefaultDenyRule() error {
  57. return nil
  58. }
  59. func (e *EmqxCloud) CreateHostACL(hostID, serverName string) error {
  60. acls := []cloudAcl{
  61. {
  62. UserName: hostID,
  63. Topic: fmt.Sprintf("peers/host/%s/%s", hostID, serverName),
  64. Access: "allow",
  65. Action: "pubsub",
  66. },
  67. {
  68. UserName: hostID,
  69. Topic: fmt.Sprintf("host/update/%s/%s", hostID, serverName),
  70. Access: "allow",
  71. Action: "pubsub",
  72. },
  73. {
  74. UserName: hostID,
  75. Topic: fmt.Sprintf("host/serverupdate/%s/%s", serverName, hostID),
  76. Access: "allow",
  77. Action: "pubsub",
  78. },
  79. }
  80. payload, err := json.Marshal(acls)
  81. if err != nil {
  82. return err
  83. }
  84. client := &http.Client{}
  85. req, err := http.NewRequest(http.MethodPost, e.URL, strings.NewReader(string(payload)))
  86. if err != nil {
  87. return err
  88. }
  89. req.Header.Add("Content-Type", "application/json")
  90. req.SetBasicAuth(e.AppID, e.AppSecret)
  91. res, err := client.Do(req)
  92. if err != nil {
  93. return err
  94. }
  95. defer res.Body.Close()
  96. body, err := io.ReadAll(res.Body)
  97. if err != nil {
  98. return err
  99. }
  100. if res.StatusCode != http.StatusOK {
  101. return errors.New("request failed " + string(body))
  102. }
  103. return nil
  104. }
  105. func (e *EmqxCloud) AppendNodeUpdateACL(hostID, nodeNetwork, nodeID, serverName string) error {
  106. acls := []cloudAcl{
  107. {
  108. UserName: hostID,
  109. Topic: fmt.Sprintf("node/update/%s/%s", nodeNetwork, nodeID),
  110. Access: "allow",
  111. Action: "sub",
  112. },
  113. {
  114. UserName: hostID,
  115. Topic: fmt.Sprintf("ping/%s/%s", serverName, nodeID),
  116. Access: "allow",
  117. Action: "pubsub",
  118. },
  119. {
  120. UserName: hostID,
  121. Topic: fmt.Sprintf("update/%s/%s", serverName, nodeID),
  122. Access: "allow",
  123. Action: "pubsub",
  124. },
  125. {
  126. UserName: hostID,
  127. Topic: fmt.Sprintf("signal/%s/%s", serverName, nodeID),
  128. Access: "allow",
  129. Action: "pubsub",
  130. },
  131. {
  132. UserName: hostID,
  133. Topic: fmt.Sprintf("metrics/%s/%s", serverName, nodeID),
  134. Access: "allow",
  135. Action: "pubsub",
  136. },
  137. }
  138. payload, err := json.Marshal(acls)
  139. if err != nil {
  140. return err
  141. }
  142. client := &http.Client{}
  143. req, err := http.NewRequest(http.MethodPost, e.URL, strings.NewReader(string(payload)))
  144. if err != nil {
  145. return err
  146. }
  147. req.Header.Add("Content-Type", "application/json")
  148. req.SetBasicAuth(e.AppID, e.AppSecret)
  149. res, err := client.Do(req)
  150. if err != nil {
  151. return err
  152. }
  153. defer res.Body.Close()
  154. body, err := io.ReadAll(res.Body)
  155. if err != nil {
  156. return err
  157. }
  158. if res.StatusCode != http.StatusOK {
  159. return errors.New("request failed " + string(body))
  160. }
  161. return nil
  162. }
  163. func (e *EmqxCloud) GetUserACL(username string) (*aclObject, error) { return nil, nil } // ununsed on cloud since it doesn't overwrite acls list
  164. func (e *EmqxCloud) DeleteEmqxUser(username string) error {
  165. client := &http.Client{}
  166. req, err := http.NewRequest(http.MethodDelete, e.URL, nil)
  167. if err != nil {
  168. return err
  169. }
  170. req.SetBasicAuth(e.AppID, e.AppSecret)
  171. res, err := client.Do(req)
  172. if err != nil {
  173. return err
  174. }
  175. defer res.Body.Close()
  176. body, err := io.ReadAll(res.Body)
  177. if err != nil {
  178. return err
  179. }
  180. if res.StatusCode != http.StatusOK {
  181. return errors.New("request failed " + string(body))
  182. }
  183. return nil
  184. }