2
0

emqx_cloud.go 5.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264
  1. package mq
  2. import (
  3. "encoding/json"
  4. "errors"
  5. "fmt"
  6. "io"
  7. "net/http"
  8. "strings"
  9. "github.com/gravitl/netmaker/servercfg"
  10. )
  // EmqxCloud carries the connection settings shared by every request this
  // file sends to an EMQX Cloud deployment.
  type EmqxCloud struct {
  	// URL is the base address; API paths such as "/api/acl" are appended to it.
  	URL string
  	// AppID is sent as the HTTP basic-auth user name on each request.
  	AppID string
  	// AppSecret is sent as the HTTP basic-auth password on each request.
  	AppSecret string
  }
  // userCreateReq is the JSON body posted to the auth_username endpoint when
  // a new MQTT user is registered.
  type userCreateReq struct {
  	// UserName is encoded under the "username" key.
  	UserName string `json:"username"`
  	// Password is encoded under the "password" key.
  	Password string `json:"password"`
  }
  // cloudAcl is one access-control rule in the JSON array posted to the acl
  // endpoint.
  type cloudAcl struct {
  	// UserName is the MQTT user the rule applies to.
  	UserName string `json:"username"`
  	// Topic is the topic (or topic filter) the rule covers.
  	Topic string `json:"topic"`
  	// Action is set to "sub", "pub" or "pubsub" by the callers in this file.
  	Action string `json:"action"`
  	// Access is set to "allow" by every caller in this file.
  	Access string `json:"access"`
  }
  26. func (e *EmqxCloud) GetType() servercfg.Emqxdeploy { return servercfg.EmqxCloudDeploy }
  27. func (e *EmqxCloud) CreateEmqxUser(username, pass string) error {
  28. payload := userCreateReq{
  29. UserName: username,
  30. Password: pass,
  31. }
  32. data, _ := json.Marshal(payload)
  33. client := &http.Client{}
  34. req, err := http.NewRequest(http.MethodPost, fmt.Sprintf("%s/api/auth_username", e.URL), strings.NewReader(string(data)))
  35. if err != nil {
  36. return err
  37. }
  38. req.SetBasicAuth(e.AppID, e.AppSecret)
  39. req.Header.Add("Content-Type", "application/json")
  40. res, err := client.Do(req)
  41. if err != nil {
  42. return err
  43. }
  44. defer res.Body.Close()
  45. body, err := io.ReadAll(res.Body)
  46. if err != nil {
  47. return err
  48. }
  49. if res.StatusCode != http.StatusOK {
  50. return errors.New("request failed " + string(body))
  51. }
  52. return nil
  53. }
  54. func (e *EmqxCloud) CreateEmqxUserforServer() error {
  55. payload := userCreateReq{
  56. UserName: servercfg.GetMqUserName(),
  57. Password: servercfg.GetMqPassword(),
  58. }
  59. data, _ := json.Marshal(payload)
  60. client := &http.Client{}
  61. req, err := http.NewRequest(http.MethodPost, fmt.Sprintf("%s/api/auth_username", e.URL), strings.NewReader(string(data)))
  62. if err != nil {
  63. return err
  64. }
  65. req.SetBasicAuth(e.AppID, e.AppSecret)
  66. req.Header.Add("Content-Type", "application/json")
  67. res, err := client.Do(req)
  68. if err != nil {
  69. return err
  70. }
  71. defer res.Body.Close()
  72. body, err := io.ReadAll(res.Body)
  73. if err != nil {
  74. return err
  75. }
  76. if res.StatusCode != http.StatusOK {
  77. return errors.New("request failed " + string(body))
  78. }
  79. // add acls
  80. acls := []cloudAcl{
  81. {
  82. UserName: servercfg.GetMqUserName(),
  83. Topic: fmt.Sprintf("update/%s/#", servercfg.GetServer()),
  84. Access: "allow",
  85. Action: "sub",
  86. },
  87. {
  88. UserName: servercfg.GetMqUserName(),
  89. Topic: fmt.Sprintf("host/serverupdate/%s/#", servercfg.GetServer()),
  90. Access: "allow",
  91. Action: "sub",
  92. },
  93. {
  94. UserName: servercfg.GetMqUserName(),
  95. Topic: fmt.Sprintf("signal/%s/#", servercfg.GetServer()),
  96. Access: "allow",
  97. Action: "sub",
  98. },
  99. {
  100. UserName: servercfg.GetMqUserName(),
  101. Topic: fmt.Sprintf("metrics/%s/#", servercfg.GetServer()),
  102. Access: "allow",
  103. Action: "sub",
  104. },
  105. {
  106. UserName: servercfg.GetMqUserName(),
  107. Topic: "peers/host/#",
  108. Access: "allow",
  109. Action: "pub",
  110. },
  111. {
  112. UserName: servercfg.GetMqUserName(),
  113. Topic: "node/update/#",
  114. Access: "allow",
  115. Action: "pub",
  116. },
  117. {
  118. UserName: servercfg.GetMqUserName(),
  119. Topic: "host/update/#",
  120. Access: "allow",
  121. Action: "pub",
  122. },
  123. }
  124. return e.createacls(acls)
  125. }
  126. func (e *EmqxCloud) CreateEmqxDefaultAuthenticator() error { return nil } // ignore
  127. func (e *EmqxCloud) CreateEmqxDefaultAuthorizer() error { return nil } // ignore
  128. func (e *EmqxCloud) CreateDefaultDenyRule() error {
  129. return nil
  130. }
  131. func (e *EmqxCloud) createacls(acls []cloudAcl) error {
  132. payload, err := json.Marshal(acls)
  133. if err != nil {
  134. return err
  135. }
  136. client := &http.Client{}
  137. req, err := http.NewRequest(http.MethodPost, fmt.Sprintf("%s/api/acl", e.URL), strings.NewReader(string(payload)))
  138. if err != nil {
  139. return err
  140. }
  141. req.Header.Add("Content-Type", "application/json")
  142. req.SetBasicAuth(e.AppID, e.AppSecret)
  143. res, err := client.Do(req)
  144. if err != nil {
  145. return err
  146. }
  147. defer res.Body.Close()
  148. body, err := io.ReadAll(res.Body)
  149. if err != nil {
  150. return err
  151. }
  152. if res.StatusCode != http.StatusOK {
  153. return errors.New("request failed " + string(body))
  154. }
  155. return nil
  156. }
  157. func (e *EmqxCloud) CreateHostACL(hostID, serverName string) error {
  158. acls := []cloudAcl{
  159. {
  160. UserName: hostID,
  161. Topic: fmt.Sprintf("peers/host/%s/%s", hostID, serverName),
  162. Access: "allow",
  163. Action: "sub",
  164. },
  165. {
  166. UserName: hostID,
  167. Topic: fmt.Sprintf("host/update/%s/%s", hostID, serverName),
  168. Access: "allow",
  169. Action: "sub",
  170. },
  171. {
  172. UserName: hostID,
  173. Topic: fmt.Sprintf("host/serverupdate/%s/%s", serverName, hostID),
  174. Access: "allow",
  175. Action: "pub",
  176. },
  177. }
  178. return e.createacls(acls)
  179. }
  180. func (e *EmqxCloud) AppendNodeUpdateACL(hostID, nodeNetwork, nodeID, serverName string) error {
  181. acls := []cloudAcl{
  182. {
  183. UserName: hostID,
  184. Topic: fmt.Sprintf("node/update/%s/%s", nodeNetwork, nodeID),
  185. Access: "allow",
  186. Action: "sub",
  187. },
  188. {
  189. UserName: hostID,
  190. Topic: fmt.Sprintf("ping/%s/%s", serverName, nodeID),
  191. Access: "allow",
  192. Action: "pubsub",
  193. },
  194. {
  195. UserName: hostID,
  196. Topic: fmt.Sprintf("update/%s/%s", serverName, nodeID),
  197. Access: "allow",
  198. Action: "pubsub",
  199. },
  200. {
  201. UserName: hostID,
  202. Topic: fmt.Sprintf("signal/%s/%s", serverName, nodeID),
  203. Access: "allow",
  204. Action: "pubsub",
  205. },
  206. {
  207. UserName: hostID,
  208. Topic: fmt.Sprintf("metrics/%s/%s", serverName, nodeID),
  209. Access: "allow",
  210. Action: "pubsub",
  211. },
  212. }
  213. return e.createacls(acls)
  214. }
  215. func (e *EmqxCloud) GetUserACL(username string) (*aclObject, error) { return nil, nil } // ununsed on cloud since it doesn't overwrite acls list
  216. func (e *EmqxCloud) DeleteEmqxUser(username string) error {
  217. client := &http.Client{}
  218. req, err := http.NewRequest(http.MethodDelete, fmt.Sprintf("%s/api/auth_username/%s", e.URL, username), nil)
  219. if err != nil {
  220. return err
  221. }
  222. req.SetBasicAuth(e.AppID, e.AppSecret)
  223. res, err := client.Do(req)
  224. if err != nil {
  225. return err
  226. }
  227. defer res.Body.Close()
  228. body, err := io.ReadAll(res.Body)
  229. if err != nil {
  230. return err
  231. }
  232. if res.StatusCode != http.StatusOK {
  233. return errors.New("request failed " + string(body))
  234. }
  235. return nil
  236. }