emqx_cloud.go 5.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268
  1. package mq
  import (
  	"encoding/json"
  	"errors"
  	"fmt"
  	"io"
  	"net/http"
  	"net/url"
  	"strings"

  	"github.com/gravitl/netmaker/servercfg"
  )
  // EmqxCloud holds the settings the methods on this type use to reach an EMQX
  // Cloud deployment over its HTTP API.
  type EmqxCloud struct {
  	// URL is the base address that API paths such as "/api/auth_username"
  	// are appended to.
  	URL string
  	// AppID and AppSecret are sent as the HTTP basic-auth user name and
  	// password on every API request.
  	AppID, AppSecret string
  }
  // userCreateReq is the JSON body posted to /api/auth_username when a user is
  // created.
  type userCreateReq struct {
  	UserName string `json:"username"` // login name of the user to create
  	Password string `json:"password"` // password for that user
  }
  // cloudAcl is a single ACL rule in the JSON shape posted to /api/acl.
  type cloudAcl struct {
  	UserName string `json:"username"` // user the rule applies to
  	Topic    string `json:"topic"`    // topic filter; this file uses literal topics and "#" wildcards
  	Action   string `json:"action"`   // "sub", "pub" or "pubsub" in this file
  	Access   string `json:"access"`   // always "allow" in this file
  }
  26. func (e *EmqxCloud) GetType() servercfg.Emqxdeploy { return servercfg.EmqxCloudDeploy }
  27. func (e *EmqxCloud) CreateEmqxUser(username, pass string) error {
  28. payload := userCreateReq{
  29. UserName: username,
  30. Password: pass,
  31. }
  32. data, _ := json.Marshal(payload)
  33. client := &http.Client{}
  34. req, err := http.NewRequest(http.MethodPost, fmt.Sprintf("%s/api/auth_username", e.URL), strings.NewReader(string(data)))
  35. if err != nil {
  36. return err
  37. }
  38. req.SetBasicAuth(e.AppID, e.AppSecret)
  39. req.Header.Add("Content-Type", "application/json")
  40. res, err := client.Do(req)
  41. if err != nil {
  42. return err
  43. }
  44. defer res.Body.Close()
  45. body, err := io.ReadAll(res.Body)
  46. if err != nil {
  47. return err
  48. }
  49. if res.StatusCode != http.StatusOK {
  50. return errors.New("request failed " + string(body))
  51. }
  52. return nil
  53. }
  54. func (e *EmqxCloud) CreateEmqxUserforServer() error {
  55. payload := userCreateReq{
  56. UserName: servercfg.GetMqUserName(),
  57. Password: servercfg.GetMqPassword(),
  58. }
  59. data, _ := json.Marshal(payload)
  60. client := &http.Client{}
  61. req, err := http.NewRequest(http.MethodPost, fmt.Sprintf("%s/api/auth_username", e.URL), strings.NewReader(string(data)))
  62. if err != nil {
  63. return err
  64. }
  65. req.SetBasicAuth(e.AppID, e.AppSecret)
  66. req.Header.Add("Content-Type", "application/json")
  67. res, err := client.Do(req)
  68. if err != nil {
  69. return err
  70. }
  71. defer res.Body.Close()
  72. body, err := io.ReadAll(res.Body)
  73. if err != nil {
  74. return err
  75. }
  76. if res.StatusCode != http.StatusOK {
  77. return errors.New("request failed " + string(body))
  78. }
  79. return nil
  80. // add acls
  81. acls := []cloudAcl{
  82. {
  83. UserName: servercfg.GetMqUserName(),
  84. Topic: fmt.Sprintf("update/%s/#", servercfg.GetServer()),
  85. Access: "allow",
  86. Action: "sub",
  87. },
  88. {
  89. UserName: servercfg.GetMqUserName(),
  90. Topic: fmt.Sprintf("host/serverupdate/%s/#", servercfg.GetServer()),
  91. Access: "allow",
  92. Action: "sub",
  93. },
  94. {
  95. UserName: servercfg.GetMqUserName(),
  96. Topic: fmt.Sprintf("signal/%s/#", servercfg.GetServer()),
  97. Access: "allow",
  98. Action: "sub",
  99. },
  100. {
  101. UserName: servercfg.GetMqUserName(),
  102. Topic: fmt.Sprintf("metrics/%s/#", servercfg.GetServer()),
  103. Access: "allow",
  104. Action: "sub",
  105. },
  106. {
  107. UserName: servercfg.GetMqUserName(),
  108. Topic: "peers/host/#",
  109. Access: "allow",
  110. Action: "pub",
  111. },
  112. {
  113. UserName: servercfg.GetMqUserName(),
  114. Topic: "node/update/#",
  115. Access: "allow",
  116. Action: "pub",
  117. },
  118. {
  119. UserName: servercfg.GetMqUserName(),
  120. Topic: "host/update/#",
  121. Access: "allow",
  122. Action: "pub",
  123. },
  124. }
  125. return e.createacls(acls)
  126. }
  127. func (e *EmqxCloud) CreateEmqxDefaultAuthenticator() error { return nil } // ignore
  128. func (e *EmqxCloud) CreateEmqxDefaultAuthorizer() error { return nil } // ignore
  129. func (e *EmqxCloud) CreateDefaultDenyRule() error {
  130. return nil
  131. }
  132. func (e *EmqxCloud) createacls(acls []cloudAcl) error {
  133. return nil
  134. payload, err := json.Marshal(acls)
  135. if err != nil {
  136. return err
  137. }
  138. client := &http.Client{}
  139. req, err := http.NewRequest(http.MethodPost, fmt.Sprintf("%s/api/acl", e.URL), strings.NewReader(string(payload)))
  140. if err != nil {
  141. return err
  142. }
  143. req.Header.Add("Content-Type", "application/json")
  144. req.SetBasicAuth(e.AppID, e.AppSecret)
  145. res, err := client.Do(req)
  146. if err != nil {
  147. return err
  148. }
  149. defer res.Body.Close()
  150. body, err := io.ReadAll(res.Body)
  151. if err != nil {
  152. return err
  153. }
  154. if res.StatusCode != http.StatusOK {
  155. return errors.New("request failed " + string(body))
  156. }
  157. return nil
  158. }
  159. func (e *EmqxCloud) CreateHostACL(hostID, serverName string) error {
  160. return nil
  161. acls := []cloudAcl{
  162. {
  163. UserName: hostID,
  164. Topic: fmt.Sprintf("peers/host/%s/%s", hostID, serverName),
  165. Access: "allow",
  166. Action: "sub",
  167. },
  168. {
  169. UserName: hostID,
  170. Topic: fmt.Sprintf("host/update/%s/%s", hostID, serverName),
  171. Access: "allow",
  172. Action: "sub",
  173. },
  174. {
  175. UserName: hostID,
  176. Topic: fmt.Sprintf("host/serverupdate/%s/%s", serverName, hostID),
  177. Access: "allow",
  178. Action: "pub",
  179. },
  180. }
  181. return e.createacls(acls)
  182. }
  183. func (e *EmqxCloud) AppendNodeUpdateACL(hostID, nodeNetwork, nodeID, serverName string) error {
  184. return nil
  185. acls := []cloudAcl{
  186. {
  187. UserName: hostID,
  188. Topic: fmt.Sprintf("node/update/%s/%s", nodeNetwork, nodeID),
  189. Access: "allow",
  190. Action: "sub",
  191. },
  192. {
  193. UserName: hostID,
  194. Topic: fmt.Sprintf("ping/%s/%s", serverName, nodeID),
  195. Access: "allow",
  196. Action: "pubsub",
  197. },
  198. {
  199. UserName: hostID,
  200. Topic: fmt.Sprintf("update/%s/%s", serverName, nodeID),
  201. Access: "allow",
  202. Action: "pubsub",
  203. },
  204. {
  205. UserName: hostID,
  206. Topic: fmt.Sprintf("signal/%s/%s", serverName, nodeID),
  207. Access: "allow",
  208. Action: "pubsub",
  209. },
  210. {
  211. UserName: hostID,
  212. Topic: fmt.Sprintf("metrics/%s/%s", serverName, nodeID),
  213. Access: "allow",
  214. Action: "pubsub",
  215. },
  216. }
  217. return e.createacls(acls)
  218. }
  219. func (e *EmqxCloud) GetUserACL(username string) (*aclObject, error) { return nil, nil } // ununsed on cloud since it doesn't overwrite acls list
  220. func (e *EmqxCloud) DeleteEmqxUser(username string) error {
  221. client := &http.Client{}
  222. req, err := http.NewRequest(http.MethodDelete, fmt.Sprintf("%s/api/auth_username/%s", e.URL, username), nil)
  223. if err != nil {
  224. return err
  225. }
  226. req.SetBasicAuth(e.AppID, e.AppSecret)
  227. res, err := client.Do(req)
  228. if err != nil {
  229. return err
  230. }
  231. defer res.Body.Close()
  232. body, err := io.ReadAll(res.Body)
  233. if err != nil {
  234. return err
  235. }
  236. if res.StatusCode != http.StatusOK {
  237. return errors.New("request failed " + string(body))
  238. }
  239. return nil
  240. }