emqx.go 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391
  1. package mq
  2. import (
  3. "bytes"
  4. "encoding/json"
  5. "fmt"
  6. "io"
  7. "net/http"
  8. "sync"
  9. "github.com/gravitl/netmaker/servercfg"
  10. )
  11. type (
  12. emqxUser struct {
  13. UserID string `json:"user_id"`
  14. Password string `json:"password"`
  15. Admin bool `json:"is_superuser"`
  16. }
  17. emqxLogin struct {
  18. Username string `json:"username"`
  19. Password string `json:"password"`
  20. }
  21. emqxLoginResponse struct {
  22. License struct {
  23. Edition string `json:"edition"`
  24. } `json:"license"`
  25. Token string `json:"token"`
  26. Version string `json:"version"`
  27. }
  28. aclRule struct {
  29. Topic string `json:"topic"`
  30. Permission string `json:"permission"`
  31. Action string `json:"action"`
  32. }
  33. aclObject struct {
  34. Rules []aclRule `json:"rules"`
  35. Username string `json:"username,omitempty"`
  36. }
  37. )
  38. func getEmqxAuthToken() (string, error) {
  39. payload, err := json.Marshal(&emqxLogin{
  40. Username: servercfg.GetMqUserName(),
  41. Password: servercfg.GetMqPassword(),
  42. })
  43. if err != nil {
  44. return "", err
  45. }
  46. resp, err := http.Post(servercfg.GetEmqxRestEndpoint()+"/api/v5/login", "application/json", bytes.NewReader(payload))
  47. if err != nil {
  48. return "", err
  49. }
  50. msg, err := io.ReadAll(resp.Body)
  51. if err != nil {
  52. return "", err
  53. }
  54. if resp.StatusCode != http.StatusOK {
  55. return "", fmt.Errorf("error during EMQX login %v", string(msg))
  56. }
  57. var loginResp emqxLoginResponse
  58. if err := json.Unmarshal(msg, &loginResp); err != nil {
  59. return "", err
  60. }
  61. return loginResp.Token, nil
  62. }
  63. // CreateEmqxUser - creates an EMQX user
  64. func CreateEmqxUser(username, password string, admin bool) error {
  65. token, err := getEmqxAuthToken()
  66. if err != nil {
  67. return err
  68. }
  69. payload, err := json.Marshal(&emqxUser{
  70. UserID: username,
  71. Password: password,
  72. Admin: admin,
  73. })
  74. if err != nil {
  75. return err
  76. }
  77. req, err := http.NewRequest(http.MethodPost, servercfg.GetEmqxRestEndpoint()+"/api/v5/authentication/password_based:built_in_database/users", bytes.NewReader(payload))
  78. if err != nil {
  79. return err
  80. }
  81. req.Header.Add("content-type", "application/json")
  82. req.Header.Add("authorization", "Bearer "+token)
  83. resp, err := (&http.Client{}).Do(req)
  84. if err != nil {
  85. return err
  86. }
  87. defer resp.Body.Close()
  88. if resp.StatusCode >= 300 {
  89. msg, err := io.ReadAll(resp.Body)
  90. if err != nil {
  91. return err
  92. }
  93. return fmt.Errorf("error creating EMQX user %v", string(msg))
  94. }
  95. return nil
  96. }
  97. // DeleteEmqxUser - deletes an EMQX user
  98. func DeleteEmqxUser(username string) error {
  99. token, err := getEmqxAuthToken()
  100. if err != nil {
  101. return err
  102. }
  103. req, err := http.NewRequest(http.MethodDelete, servercfg.GetEmqxRestEndpoint()+"/api/v5/authentication/password_based:built_in_database/users/"+username, nil)
  104. if err != nil {
  105. return err
  106. }
  107. req.Header.Add("authorization", "Bearer "+token)
  108. resp, err := (&http.Client{}).Do(req)
  109. if err != nil {
  110. return err
  111. }
  112. defer resp.Body.Close()
  113. if resp.StatusCode >= 300 {
  114. msg, err := io.ReadAll(resp.Body)
  115. if err != nil {
  116. return err
  117. }
  118. return fmt.Errorf("error deleting EMQX user %v", string(msg))
  119. }
  120. return nil
  121. }
  122. // CreateEmqxDefaultAuthenticator - creates a default authenticator based on password and using EMQX's built in database as storage
  123. func CreateEmqxDefaultAuthenticator() error {
  124. token, err := getEmqxAuthToken()
  125. if err != nil {
  126. return err
  127. }
  128. payload, err := json.Marshal(&struct {
  129. Mechanism string `json:"mechanism"`
  130. Backend string `json:"backend"`
  131. UserIDType string `json:"user_id_type"`
  132. }{Mechanism: "password_based", Backend: "built_in_database", UserIDType: "username"})
  133. if err != nil {
  134. return err
  135. }
  136. req, err := http.NewRequest(http.MethodPost, servercfg.GetEmqxRestEndpoint()+"/api/v5/authentication", bytes.NewReader(payload))
  137. if err != nil {
  138. return err
  139. }
  140. req.Header.Add("content-type", "application/json")
  141. req.Header.Add("authorization", "Bearer "+token)
  142. resp, err := (&http.Client{}).Do(req)
  143. if err != nil {
  144. return err
  145. }
  146. defer resp.Body.Close()
  147. if resp.StatusCode != http.StatusOK {
  148. msg, err := io.ReadAll(resp.Body)
  149. if err != nil {
  150. return err
  151. }
  152. return fmt.Errorf("error creating default EMQX authenticator %v", string(msg))
  153. }
  154. return nil
  155. }
  156. // CreateEmqxDefaultAuthorizer - creates a default ACL authorization mechanism based on the built in database
  157. func CreateEmqxDefaultAuthorizer() error {
  158. token, err := getEmqxAuthToken()
  159. if err != nil {
  160. return err
  161. }
  162. payload, err := json.Marshal(&struct {
  163. Enable bool `json:"enable"`
  164. Type string `json:"type"`
  165. }{Enable: true, Type: "built_in_database"})
  166. if err != nil {
  167. return err
  168. }
  169. req, err := http.NewRequest(http.MethodPost, servercfg.GetEmqxRestEndpoint()+"/api/v5/authorization/sources", bytes.NewReader(payload))
  170. if err != nil {
  171. return err
  172. }
  173. req.Header.Add("content-type", "application/json")
  174. req.Header.Add("authorization", "Bearer "+token)
  175. resp, err := (&http.Client{}).Do(req)
  176. if err != nil {
  177. return err
  178. }
  179. defer resp.Body.Close()
  180. if resp.StatusCode != http.StatusNoContent {
  181. msg, err := io.ReadAll(resp.Body)
  182. if err != nil {
  183. return err
  184. }
  185. return fmt.Errorf("error creating default EMQX ACL authorization mechanism %v", string(msg))
  186. }
  187. return nil
  188. }
  189. // GetUserACL - returns ACL rules by username
  190. func GetUserACL(username string) (*aclObject, error) {
  191. token, err := getEmqxAuthToken()
  192. if err != nil {
  193. return nil, err
  194. }
  195. req, err := http.NewRequest(http.MethodGet, servercfg.GetEmqxRestEndpoint()+"/api/v5/authorization/sources/built_in_database/username/"+username, nil)
  196. if err != nil {
  197. return nil, err
  198. }
  199. req.Header.Add("content-type", "application/json")
  200. req.Header.Add("authorization", "Bearer "+token)
  201. resp, err := (&http.Client{}).Do(req)
  202. if err != nil {
  203. return nil, err
  204. }
  205. defer resp.Body.Close()
  206. response, err := io.ReadAll(resp.Body)
  207. if err != nil {
  208. return nil, err
  209. }
  210. if resp.StatusCode != http.StatusOK {
  211. return nil, fmt.Errorf("error fetching ACL rules %v", string(response))
  212. }
  213. body := new(aclObject)
  214. if err := json.Unmarshal(response, body); err != nil {
  215. return nil, err
  216. }
  217. return body, nil
  218. }
  219. // CreateDefaultDenyRule - creates a rule to deny access to all topics for all users by default
  220. // to allow user access to topics use the `mq.CreateUserAccessRule` function
  221. func CreateDefaultDenyRule() error {
  222. token, err := getEmqxAuthToken()
  223. if err != nil {
  224. return err
  225. }
  226. payload, err := json.Marshal(&aclObject{Rules: []aclRule{{Topic: "#", Permission: "deny", Action: "all"}}})
  227. if err != nil {
  228. return err
  229. }
  230. req, err := http.NewRequest(http.MethodPost, servercfg.GetEmqxRestEndpoint()+"/api/v5/authorization/sources/built_in_database/all", bytes.NewReader(payload))
  231. if err != nil {
  232. return err
  233. }
  234. req.Header.Add("content-type", "application/json")
  235. req.Header.Add("authorization", "Bearer "+token)
  236. resp, err := (&http.Client{}).Do(req)
  237. if err != nil {
  238. return err
  239. }
  240. defer resp.Body.Close()
  241. if resp.StatusCode != http.StatusNoContent {
  242. msg, err := io.ReadAll(resp.Body)
  243. if err != nil {
  244. return err
  245. }
  246. return fmt.Errorf("error creating default ACL rules %v", string(msg))
  247. }
  248. return nil
  249. }
  250. // CreateHostACL - create host ACL rules
  251. func CreateHostACL(hostID, serverName string) error {
  252. token, err := getEmqxAuthToken()
  253. if err != nil {
  254. return err
  255. }
  256. payload, err := json.Marshal(&aclObject{
  257. Username: hostID,
  258. Rules: []aclRule{
  259. {
  260. Topic: fmt.Sprintf("peers/host/%s/%s", hostID, serverName),
  261. Permission: "allow",
  262. Action: "all",
  263. },
  264. {
  265. Topic: fmt.Sprintf("host/update/%s/%s", hostID, serverName),
  266. Permission: "allow",
  267. Action: "all",
  268. },
  269. {
  270. Topic: fmt.Sprintf("dns/all/%s/%s", hostID, serverName),
  271. Permission: "allow",
  272. Action: "all",
  273. },
  274. {
  275. Topic: fmt.Sprintf("dns/update/%s/%s", hostID, serverName),
  276. Permission: "allow",
  277. Action: "all",
  278. },
  279. {
  280. Topic: fmt.Sprintf("host/serverupdate/%s/%s", serverName, hostID),
  281. Permission: "allow",
  282. Action: "all",
  283. },
  284. },
  285. })
  286. if err != nil {
  287. return err
  288. }
  289. req, err := http.NewRequest(http.MethodPut, servercfg.GetEmqxRestEndpoint()+"/api/v5/authorization/sources/built_in_database/username/"+hostID, bytes.NewReader(payload))
  290. if err != nil {
  291. return err
  292. }
  293. req.Header.Add("content-type", "application/json")
  294. req.Header.Add("authorization", "Bearer "+token)
  295. resp, err := (&http.Client{}).Do(req)
  296. if err != nil {
  297. return err
  298. }
  299. defer resp.Body.Close()
  300. if resp.StatusCode != http.StatusNoContent {
  301. msg, err := io.ReadAll(resp.Body)
  302. if err != nil {
  303. return err
  304. }
  305. return fmt.Errorf("error adding ACL Rules for user %s Error: %v", hostID, string(msg))
  306. }
  307. return nil
  308. }
  309. // a lock required for preventing simultaneous updates to the same ACL object leading to overwriting each other
  310. // might occur when multiple nodes belonging to the same host are created at the same time
  311. var nodeAclMux sync.Mutex
  312. // AppendNodeUpdateACL - adds ACL rule for subscribing to node updates for a node ID
  313. func AppendNodeUpdateACL(hostID, nodeNetwork, nodeID, serverName string) error {
  314. nodeAclMux.Lock()
  315. defer nodeAclMux.Unlock()
  316. token, err := getEmqxAuthToken()
  317. if err != nil {
  318. return err
  319. }
  320. aclObject, err := GetUserACL(hostID)
  321. if err != nil {
  322. return err
  323. }
  324. aclObject.Rules = append(aclObject.Rules, []aclRule{
  325. {
  326. Topic: fmt.Sprintf("node/update/%s/%s", nodeNetwork, nodeID),
  327. Permission: "allow",
  328. Action: "subscribe",
  329. },
  330. {
  331. Topic: fmt.Sprintf("ping/%s/%s", serverName, nodeID),
  332. Permission: "allow",
  333. Action: "all",
  334. },
  335. {
  336. Topic: fmt.Sprintf("update/%s/%s", serverName, nodeID),
  337. Permission: "allow",
  338. Action: "all",
  339. },
  340. {
  341. Topic: fmt.Sprintf("signal/%s/%s", serverName, nodeID),
  342. Permission: "allow",
  343. Action: "all",
  344. },
  345. {
  346. Topic: fmt.Sprintf("metrics/%s/%s", serverName, nodeID),
  347. Permission: "allow",
  348. Action: "all",
  349. },
  350. }...)
  351. payload, err := json.Marshal(aclObject)
  352. if err != nil {
  353. return err
  354. }
  355. req, err := http.NewRequest(http.MethodPut, servercfg.GetEmqxRestEndpoint()+"/api/v5/authorization/sources/built_in_database/username/"+hostID, bytes.NewReader(payload))
  356. if err != nil {
  357. return err
  358. }
  359. req.Header.Add("content-type", "application/json")
  360. req.Header.Add("authorization", "Bearer "+token)
  361. resp, err := (&http.Client{}).Do(req)
  362. if err != nil {
  363. return err
  364. }
  365. defer resp.Body.Close()
  366. if resp.StatusCode != http.StatusNoContent {
  367. msg, err := io.ReadAll(resp.Body)
  368. if err != nil {
  369. return err
  370. }
  371. return fmt.Errorf("error adding ACL Rules for user %s Error: %v", hostID, string(msg))
  372. }
  373. return nil
  374. }