publishers.go 16 KB


  1. package mq
  2. import (
  3. "encoding/json"
  4. "errors"
  5. "fmt"
  6. "time"
  7. "github.com/gravitl/netmaker/logger"
  8. "github.com/gravitl/netmaker/logic"
  9. "github.com/gravitl/netmaker/models"
  10. "github.com/gravitl/netmaker/servercfg"
  11. "golang.org/x/exp/slog"
  12. )
  13. // PublishPeerUpdate --- determines and publishes a peer update to all the hosts
  14. func PublishPeerUpdate() error {
  15. if !servercfg.IsMessageQueueBackend() {
  16. return nil
  17. }
  18. hosts, err := logic.GetAllHosts()
  19. if err != nil {
  20. logger.Log(1, "err getting all hosts", err.Error())
  21. return err
  22. }
  23. allNodes, err := logic.GetAllNodes()
  24. if err != nil {
  25. return err
  26. }
  27. for _, host := range hosts {
  28. host := host
  29. if err = PublishSingleHostPeerUpdate(&host, allNodes, nil, nil); err != nil {
  30. logger.Log(1, "failed to publish peer update to host", host.ID.String(), ": ", err.Error())
  31. }
  32. }
  33. return err
  34. }
  35. // PublishDeletedNodePeerUpdate --- determines and publishes a peer update
  36. // to all the hosts with a deleted node to account for
  37. func PublishDeletedNodePeerUpdate(delNode *models.Node) error {
  38. if !servercfg.IsMessageQueueBackend() {
  39. return nil
  40. }
  41. hosts, err := logic.GetAllHosts()
  42. if err != nil {
  43. logger.Log(1, "err getting all hosts", err.Error())
  44. return err
  45. }
  46. allNodes, err := logic.GetAllNodes()
  47. if err != nil {
  48. return err
  49. }
  50. for _, host := range hosts {
  51. host := host
  52. if err = PublishSingleHostPeerUpdate(&host, allNodes, delNode, nil); err != nil {
  53. logger.Log(1, "failed to publish peer update to host", host.ID.String(), ": ", err.Error())
  54. }
  55. }
  56. return err
  57. }
  58. // PublishDeletedClientPeerUpdate --- determines and publishes a peer update
  59. // to all the hosts with a deleted ext client to account for
  60. func PublishDeletedClientPeerUpdate(delClient *models.ExtClient) error {
  61. if !servercfg.IsMessageQueueBackend() {
  62. return nil
  63. }
  64. hosts, err := logic.GetAllHosts()
  65. if err != nil {
  66. logger.Log(1, "err getting all hosts", err.Error())
  67. return err
  68. }
  69. nodes, err := logic.GetAllNodes()
  70. if err != nil {
  71. return err
  72. }
  73. for _, host := range hosts {
  74. host := host
  75. if host.OS != models.OS_Types.IoT {
  76. if err = PublishSingleHostPeerUpdate(&host, nodes, nil, []models.ExtClient{*delClient}); err != nil {
  77. logger.Log(1, "failed to publish peer update to host", host.ID.String(), ": ", err.Error())
  78. }
  79. }
  80. }
  81. return err
  82. }
  83. // PublishSingleHostPeerUpdate --- determines and publishes a peer update to one host
  84. func PublishSingleHostPeerUpdate(host *models.Host, allNodes []models.Node, deletedNode *models.Node, deletedClients []models.ExtClient) error {
  85. peerUpdate, err := logic.GetPeerUpdateForHost("", host, allNodes, deletedNode, deletedClients)
  86. if err != nil {
  87. return err
  88. }
  89. if len(peerUpdate.Peers) == 0 { // no peers to send
  90. return nil
  91. }
  92. data, err := json.Marshal(&peerUpdate)
  93. if err != nil {
  94. return err
  95. }
  96. return publish(host, fmt.Sprintf("peers/host/%s/%s", host.ID.String(), servercfg.GetServer()), data)
  97. }
  98. // NodeUpdate -- publishes a node update
  99. func NodeUpdate(node *models.Node) error {
  100. host, err := logic.GetHost(node.HostID.String())
  101. if err != nil {
  102. return nil
  103. }
  104. if !servercfg.IsMessageQueueBackend() {
  105. return nil
  106. }
  107. logger.Log(3, "publishing node update to "+node.ID.String())
  108. //if len(node.NetworkSettings.AccessKeys) > 0 {
  109. //node.NetworkSettings.AccessKeys = []models.AccessKey{} // not to be sent (don't need to spread access keys around the network; we need to know how to reach other nodes, not become them)
  110. //}
  111. data, err := json.Marshal(node)
  112. if err != nil {
  113. logger.Log(2, "error marshalling node update ", err.Error())
  114. return err
  115. }
  116. if err = publish(host, fmt.Sprintf("node/update/%s/%s", node.Network, node.ID), data); err != nil {
  117. logger.Log(2, "error publishing node update to peer ", node.ID.String(), err.Error())
  118. return err
  119. }
  120. return nil
  121. }
  122. // HostUpdate -- publishes a host update to clients
  123. func HostUpdate(hostUpdate *models.HostUpdate) error {
  124. if !servercfg.IsMessageQueueBackend() {
  125. return nil
  126. }
  127. logger.Log(3, "publishing host update to "+hostUpdate.Host.ID.String())
  128. data, err := json.Marshal(hostUpdate)
  129. if err != nil {
  130. logger.Log(2, "error marshalling node update ", err.Error())
  131. return err
  132. }
  133. if err = publish(&hostUpdate.Host, fmt.Sprintf("host/update/%s/%s", hostUpdate.Host.ID.String(), servercfg.GetServer()), data); err != nil {
  134. logger.Log(2, "error publishing host update to", hostUpdate.Host.ID.String(), err.Error())
  135. return err
  136. }
  137. return nil
  138. }
  139. // ServerStartNotify - notifies all non server nodes to pull changes after a restart
  140. func ServerStartNotify() error {
  141. nodes, err := logic.GetAllNodes()
  142. if err != nil {
  143. return err
  144. }
  145. for i := range nodes {
  146. nodes[i].Action = models.NODE_FORCE_UPDATE
  147. if err = NodeUpdate(&nodes[i]); err != nil {
  148. logger.Log(1, "error when notifying node", nodes[i].ID.String(), "of a server startup")
  149. }
  150. }
  151. return nil
  152. }
  153. // PublishDNSUpdatev1 - published dns updates to all nodes passed
  154. func PublishDNSUpdatev1(network string, dns models.DNSUpdate, nodes []models.Node) error {
  155. for _, node := range nodes {
  156. host, err := logic.GetHost(node.HostID.String())
  157. if err != nil {
  158. logger.Log(0, "error retrieving host for dns update", node.HostID.String(), err.Error())
  159. continue
  160. }
  161. data, err := json.Marshal(dns)
  162. if err != nil {
  163. logger.Log(0, "failed to encode dns data for node", node.ID.String(), err.Error())
  164. }
  165. if err := publish(host, "dns/update/"+host.ID.String()+"/"+servercfg.GetServer(), data); err != nil {
  166. logger.Log(0, "error publishing dns update to host", host.ID.String(), err.Error())
  167. continue
  168. }
  169. logger.Log(3, "published dns update to host", host.ID.String())
  170. }
  171. return nil
  172. }
  173. // PublishDNSUpdate publishes a dns update to all nodes on a network
  174. func PublishDNSUpdate(network string, dns models.DNSUpdate) error {
  175. nodes, err := logic.GetNetworkNodes(network)
  176. if err != nil {
  177. return err
  178. }
  179. for _, node := range nodes {
  180. host, err := logic.GetHost(node.HostID.String())
  181. if err != nil {
  182. logger.Log(0, "error retrieving host for dns update", node.HostID.String(), err.Error())
  183. continue
  184. }
  185. data, err := json.Marshal(dns)
  186. if err != nil {
  187. logger.Log(0, "failed to encode dns data for node", node.ID.String(), err.Error())
  188. }
  189. if err := publish(host, "dns/update/"+host.ID.String()+"/"+servercfg.GetServer(), data); err != nil {
  190. logger.Log(0, "error publishing dns update to host", host.ID.String(), err.Error())
  191. continue
  192. }
  193. logger.Log(3, "published dns update to host", host.ID.String())
  194. }
  195. return nil
  196. }
  197. // PublishAllDNS publishes an array of dns updates (ip / host.network) for each peer to a node joining a network
  198. func PublishAllDNS(newnode *models.Node) error {
  199. alldns := []models.DNSUpdate{}
  200. newnodeHost, err := logic.GetHost(newnode.HostID.String())
  201. if err != nil {
  202. return fmt.Errorf("error retrieving host for dns update %w", err)
  203. }
  204. alldns = append(alldns, getNodeDNS(newnode.Network)...)
  205. alldns = append(alldns, getExtClientDNS(newnode.Network)...)
  206. alldns = append(alldns, getCustomDNS(newnode.Network)...)
  207. data, err := json.Marshal(alldns)
  208. if err != nil {
  209. return fmt.Errorf("error encoding dns data %w", err)
  210. }
  211. if err := publish(newnodeHost, "dns/all/"+newnodeHost.ID.String()+"/"+servercfg.GetServer(), data); err != nil {
  212. return fmt.Errorf("error publishing full dns update to %s, %w", newnodeHost.ID.String(), err)
  213. }
  214. logger.Log(3, "published full dns update to %s", newnodeHost.ID.String())
  215. return nil
  216. }
  217. // PublishMqUpdatesForDeletedNode - published all the required updates for deleted node
  218. func PublishMqUpdatesForDeletedNode(node models.Node, sendNodeUpdate bool, gwClients []models.ExtClient) {
  219. // notify of peer change
  220. node.PendingDelete = true
  221. node.Action = models.NODE_DELETE
  222. if sendNodeUpdate {
  223. if err := NodeUpdate(&node); err != nil {
  224. slog.Error("error publishing node update to node", "node", node.ID, "error", err)
  225. }
  226. }
  227. if err := PublishDeletedNodePeerUpdate(&node); err != nil {
  228. logger.Log(1, "error publishing peer update ", err.Error())
  229. }
  230. host, err := logic.GetHost(node.HostID.String())
  231. if err != nil {
  232. logger.Log(1, "failed to retrieve host for node", node.ID.String(), err.Error())
  233. }
  234. if err := PublishDNSDelete(&node, host); err != nil {
  235. logger.Log(1, "error publishing dns update", err.Error())
  236. }
  237. if err := PublishDeleteAllExtclientsDNS(node.Network, gwClients); err != nil {
  238. logger.Log(1, "error publishing ext dns update", err.Error())
  239. }
  240. }
  241. // PublishDNSDelete publish a dns update deleting a node to all hosts on a network
  242. func PublishDNSDelete(node *models.Node, host *models.Host) error {
  243. dns := models.DNSUpdate{
  244. Action: models.DNSDeleteByIP,
  245. Name: host.Name + "." + node.Network,
  246. }
  247. if node.Address.IP != nil {
  248. dns.Address = node.Address.IP.String()
  249. if err := PublishDNSUpdate(node.Network, dns); err != nil {
  250. return fmt.Errorf("dns update node deletion %w", err)
  251. }
  252. }
  253. if node.Address6.IP != nil {
  254. dns.Address = node.Address6.IP.String()
  255. if err := PublishDNSUpdate(node.Network, dns); err != nil {
  256. return fmt.Errorf("dns update node deletion %w", err)
  257. }
  258. }
  259. return nil
  260. }
  261. // PublishReplaceDNS publish a dns update to replace a dns entry on all hosts in network
  262. func PublishReplaceDNS(oldNode, newNode *models.Node, host *models.Host) error {
  263. dns := models.DNSUpdate{
  264. Action: models.DNSReplaceIP,
  265. Name: host.Name + "." + oldNode.Network,
  266. }
  267. if !oldNode.Address.IP.Equal(newNode.Address.IP) {
  268. dns.Address = oldNode.Address.IP.String()
  269. dns.NewAddress = newNode.Address.IP.String()
  270. if err := PublishDNSUpdate(oldNode.Network, dns); err != nil {
  271. return err
  272. }
  273. }
  274. if !oldNode.Address6.IP.Equal(newNode.Address6.IP) {
  275. dns.Address = oldNode.Address6.IP.String()
  276. dns.NewAddress = newNode.Address6.IP.String()
  277. if err := PublishDNSUpdate(oldNode.Network, dns); err != nil {
  278. return err
  279. }
  280. }
  281. return nil
  282. }
  283. // PublishExtClientDNS publish dns update for new extclient
  284. func PublishExtClientDNS(client *models.ExtClient) error {
  285. errMsgs := models.DNSError{}
  286. dns := models.DNSUpdate{
  287. Action: models.DNSInsert,
  288. Name: client.ClientID + "." + client.Network,
  289. Address: client.Address,
  290. }
  291. if client.Address != "" {
  292. dns.Address = client.Address
  293. if err := PublishDNSUpdate(client.Network, dns); err != nil {
  294. errMsgs.ErrorStrings = append(errMsgs.ErrorStrings, err.Error())
  295. }
  296. }
  297. if client.Address6 != "" {
  298. dns.Address = client.Address6
  299. if err := PublishDNSUpdate(client.Network, dns); err != nil {
  300. errMsgs.ErrorStrings = append(errMsgs.ErrorStrings, err.Error())
  301. }
  302. }
  303. if len(errMsgs.ErrorStrings) > 0 {
  304. return errMsgs
  305. }
  306. return nil
  307. }
  308. // PublishExtClientDNSUpdate update for extclient name change
  309. func PublishExtClientDNSUpdate(old, new models.ExtClient, network string) error {
  310. dns := models.DNSUpdate{
  311. Action: models.DNSReplaceName,
  312. Name: old.ClientID + "." + network,
  313. NewName: new.ClientID + "." + network,
  314. }
  315. if err := PublishDNSUpdate(network, dns); err != nil {
  316. return err
  317. }
  318. return nil
  319. }
  320. // PublishDeleteAllExtclientsDNS - publish to delete all passed ext clients dns entries
  321. func PublishDeleteAllExtclientsDNS(network string, clients []models.ExtClient) error {
  322. nodes, err := logic.GetNetworkNodes(network)
  323. if err != nil {
  324. return err
  325. }
  326. for _, client := range clients {
  327. dns := models.DNSUpdate{
  328. Action: models.DNSDeleteByName,
  329. Name: client.ClientID + "." + client.Network,
  330. }
  331. go PublishDNSUpdatev1(client.Network, dns, nodes)
  332. }
  333. return nil
  334. }
  335. // PublishDeleteExtClientDNS publish dns update to delete extclient entry
  336. func PublishDeleteExtClientDNS(client *models.ExtClient) error {
  337. dns := models.DNSUpdate{
  338. Action: models.DNSDeleteByName,
  339. Name: client.ClientID + "." + client.Network,
  340. }
  341. if err := PublishDNSUpdate(client.Network, dns); err != nil {
  342. return err
  343. }
  344. return nil
  345. }
  346. // PublishCustomDNS publish dns update for new custom dns entry
  347. func PublishCustomDNS(entry *models.DNSEntry) error {
  348. dns := models.DNSUpdate{
  349. Action: models.DNSInsert,
  350. Name: entry.Name,
  351. //entry.Address6 is never used
  352. Address: entry.Address,
  353. }
  354. if err := PublishDNSUpdate(entry.Network, dns); err != nil {
  355. return err
  356. }
  357. return nil
  358. }
  359. // PublishHostDNSUpdate publishes dns update on host name change
  360. func PublishHostDNSUpdate(old, new *models.Host, networks []string) error {
  361. errMsgs := models.DNSError{}
  362. for _, network := range networks {
  363. dns := models.DNSUpdate{
  364. Action: models.DNSReplaceName,
  365. Name: old.Name + "." + network,
  366. NewName: new.Name + "." + network,
  367. }
  368. if err := PublishDNSUpdate(network, dns); err != nil {
  369. errMsgs.ErrorStrings = append(errMsgs.ErrorStrings, err.Error())
  370. }
  371. }
  372. if len(errMsgs.ErrorStrings) > 0 {
  373. return errMsgs
  374. }
  375. return nil
  376. }
  377. func PushMetricsToExporter(metrics models.Metrics) error {
  378. logger.Log(2, "----> Pushing metrics to exporter")
  379. data, err := json.Marshal(metrics)
  380. if err != nil {
  381. return errors.New("failed to marshal metrics: " + err.Error())
  382. }
  383. if token := mqclient.Publish("metrics_exporter", 2, true, data); !token.WaitTimeout(MQ_TIMEOUT*time.Second) || token.Error() != nil {
  384. var err error
  385. if token.Error() == nil {
  386. err = errors.New("connection timeout")
  387. } else {
  388. err = token.Error()
  389. }
  390. return err
  391. }
  392. return nil
  393. }
  394. func getNodeDNS(network string) []models.DNSUpdate {
  395. alldns := []models.DNSUpdate{}
  396. dns := models.DNSUpdate{}
  397. nodes, err := logic.GetNetworkNodes(network)
  398. if err != nil {
  399. logger.Log(0, "error retreiving network nodes for network", network, err.Error())
  400. }
  401. for _, node := range nodes {
  402. host, err := logic.GetHost(node.HostID.String())
  403. if err != nil {
  404. logger.Log(0, "error retrieving host for dns update", node.HostID.String(), err.Error())
  405. continue
  406. }
  407. dns.Action = models.DNSInsert
  408. dns.Name = host.Name + "." + node.Network
  409. if node.Address.IP != nil {
  410. dns.Address = node.Address.IP.String()
  411. alldns = append(alldns, dns)
  412. }
  413. if node.Address6.IP != nil {
  414. dns.Address = node.Address6.IP.String()
  415. alldns = append(alldns, dns)
  416. }
  417. }
  418. return alldns
  419. }
  420. func getExtClientDNS(network string) []models.DNSUpdate {
  421. alldns := []models.DNSUpdate{}
  422. dns := models.DNSUpdate{}
  423. clients, err := logic.GetNetworkExtClients(network)
  424. if err != nil {
  425. logger.Log(0, "error retrieving extclients", err.Error())
  426. }
  427. for _, client := range clients {
  428. dns.Action = models.DNSInsert
  429. dns.Name = client.ClientID + "." + client.Network
  430. if client.Address != "" {
  431. dns.Address = client.Address
  432. alldns = append(alldns, dns)
  433. }
  434. if client.Address6 != "" {
  435. dns.Address = client.Address
  436. alldns = append(alldns, dns)
  437. }
  438. }
  439. return alldns
  440. }
  441. func getCustomDNS(network string) []models.DNSUpdate {
  442. alldns := []models.DNSUpdate{}
  443. dns := models.DNSUpdate{}
  444. customdns, err := logic.GetCustomDNS(network)
  445. if err != nil {
  446. logger.Log(0, "error retrieving custom dns entries", err.Error())
  447. }
  448. for _, custom := range customdns {
  449. dns.Action = models.DNSInsert
  450. dns.Address = custom.Address
  451. dns.Name = custom.Name + "." + custom.Network
  452. alldns = append(alldns, dns)
  453. }
  454. return alldns
  455. }
  456. // sendPeers - retrieve networks, send peer ports to all peers
  457. func sendPeers() {
  458. hosts, err := logic.GetAllHosts()
  459. if err != nil && len(hosts) > 0 {
  460. logger.Log(1, "error retrieving networks for keepalive", err.Error())
  461. }
  462. nodes, err := logic.GetAllNodes()
  463. if err != nil {
  464. return
  465. }
  466. var force bool
  467. peer_force_send++
  468. if peer_force_send == 5 {
  469. servercfg.SetHost()
  470. force = true
  471. peer_force_send = 0
  472. err := logic.TimerCheckpoint() // run telemetry & log dumps if 24 hours has passed..
  473. if err != nil {
  474. logger.Log(3, "error occurred on timer,", err.Error())
  475. }
  476. //collectServerMetrics(networks[:])
  477. }
  478. if force {
  479. for _, host := range hosts {
  480. host := host
  481. logger.Log(2, "sending scheduled peer update (5 min)")
  482. if err = PublishSingleHostPeerUpdate(&host, nodes, nil, nil); err != nil {
  483. logger.Log(1, "error publishing peer updates for host: ", host.ID.String(), " Err: ", err.Error())
  484. }
  485. }
  486. }
  487. }