publishers.go 7.9 KB


  1. package mq
  2. import (
  3. "encoding/json"
  4. "errors"
  5. "fmt"
  6. "sync"
  7. "time"
  8. "github.com/google/uuid"
  9. "github.com/gravitl/netmaker/logger"
  10. "github.com/gravitl/netmaker/logic"
  11. "github.com/gravitl/netmaker/models"
  12. "github.com/gravitl/netmaker/servercfg"
  13. "golang.org/x/exp/slog"
  14. )
  15. // PublishPeerUpdate --- determines and publishes a peer update to all the hosts
  16. func PublishPeerUpdate(replacePeers bool) error {
  17. if !servercfg.IsMessageQueueBackend() {
  18. return nil
  19. }
  20. if servercfg.GetManageDNS() {
  21. sendDNSSync()
  22. }
  23. hosts, err := logic.GetAllHosts()
  24. if err != nil {
  25. logger.Log(1, "err getting all hosts", err.Error())
  26. return err
  27. }
  28. allNodes, err := logic.GetAllNodes()
  29. if err != nil {
  30. return err
  31. }
  32. for _, host := range hosts {
  33. host := host
  34. time.Sleep(5 * time.Millisecond)
  35. go func(host models.Host) {
  36. if err = PublishSingleHostPeerUpdate(&host, allNodes, nil, nil, replacePeers, nil); err != nil {
  37. id := host.Name
  38. if host.ID != uuid.Nil {
  39. id = host.ID.String()
  40. }
  41. slog.Error("failed to publish peer update to host", id, ": ", err)
  42. }
  43. }(host)
  44. }
  45. return nil
  46. }
  47. // PublishDeletedNodePeerUpdate --- determines and publishes a peer update
  48. // to all the hosts with a deleted node to account for
  49. func PublishDeletedNodePeerUpdate(delNode *models.Node) error {
  50. if !servercfg.IsMessageQueueBackend() {
  51. return nil
  52. }
  53. hosts, err := logic.GetAllHosts()
  54. if err != nil {
  55. logger.Log(1, "err getting all hosts", err.Error())
  56. return err
  57. }
  58. allNodes, err := logic.GetAllNodes()
  59. if err != nil {
  60. return err
  61. }
  62. for _, host := range hosts {
  63. host := host
  64. if err = PublishSingleHostPeerUpdate(&host, allNodes, delNode, nil, false, nil); err != nil {
  65. logger.Log(1, "failed to publish peer update to host", host.ID.String(), ": ", err.Error())
  66. }
  67. }
  68. return err
  69. }
  70. // PublishDeletedClientPeerUpdate --- determines and publishes a peer update
  71. // to all the hosts with a deleted ext client to account for
  72. func PublishDeletedClientPeerUpdate(delClient *models.ExtClient) error {
  73. if !servercfg.IsMessageQueueBackend() {
  74. return nil
  75. }
  76. hosts, err := logic.GetAllHosts()
  77. if err != nil {
  78. logger.Log(1, "err getting all hosts", err.Error())
  79. return err
  80. }
  81. nodes, err := logic.GetAllNodes()
  82. if err != nil {
  83. return err
  84. }
  85. for _, host := range hosts {
  86. host := host
  87. if host.OS != models.OS_Types.IoT {
  88. if err = PublishSingleHostPeerUpdate(&host, nodes, nil, []models.ExtClient{*delClient}, false, nil); err != nil {
  89. logger.Log(1, "failed to publish peer update to host", host.ID.String(), ": ", err.Error())
  90. }
  91. }
  92. }
  93. return err
  94. }
  95. // PublishSingleHostPeerUpdate --- determines and publishes a peer update to one host
  96. func PublishSingleHostPeerUpdate(host *models.Host, allNodes []models.Node, deletedNode *models.Node, deletedClients []models.ExtClient, replacePeers bool, wg *sync.WaitGroup) error {
  97. if wg != nil {
  98. defer wg.Done()
  99. }
  100. peerUpdate, err := logic.GetPeerUpdateForHost("", host, allNodes, deletedNode, deletedClients)
  101. if err != nil {
  102. return err
  103. }
  104. peerUpdate.ReplacePeers = replacePeers
  105. data, err := json.Marshal(&peerUpdate)
  106. if err != nil {
  107. return err
  108. }
  109. return publish(host, fmt.Sprintf("peers/host/%s/%s", host.ID.String(), servercfg.GetServer()), data)
  110. }
  111. // NodeUpdate -- publishes a node update
  112. func NodeUpdate(node *models.Node) error {
  113. host, err := logic.GetHost(node.HostID.String())
  114. if err != nil {
  115. return nil
  116. }
  117. if !servercfg.IsMessageQueueBackend() {
  118. return nil
  119. }
  120. logger.Log(3, "publishing node update to "+node.ID.String())
  121. //if len(node.NetworkSettings.AccessKeys) > 0 {
  122. //node.NetworkSettings.AccessKeys = []models.AccessKey{} // not to be sent (don't need to spread access keys around the network; we need to know how to reach other nodes, not become them)
  123. //}
  124. data, err := json.Marshal(node)
  125. if err != nil {
  126. logger.Log(2, "error marshalling node update ", err.Error())
  127. return err
  128. }
  129. if err = publish(host, fmt.Sprintf("node/update/%s/%s", node.Network, node.ID), data); err != nil {
  130. logger.Log(2, "error publishing node update to peer ", node.ID.String(), err.Error())
  131. return err
  132. }
  133. return nil
  134. }
  135. // HostUpdate -- publishes a host update to clients
  136. func HostUpdate(hostUpdate *models.HostUpdate) error {
  137. if !servercfg.IsMessageQueueBackend() {
  138. return nil
  139. }
  140. logger.Log(3, "publishing host update to "+hostUpdate.Host.ID.String())
  141. data, err := json.Marshal(hostUpdate)
  142. if err != nil {
  143. logger.Log(2, "error marshalling node update ", err.Error())
  144. return err
  145. }
  146. if err = publish(&hostUpdate.Host, fmt.Sprintf("host/update/%s/%s", hostUpdate.Host.ID.String(), servercfg.GetServer()), data); err != nil {
  147. logger.Log(2, "error publishing host update to", hostUpdate.Host.ID.String(), err.Error())
  148. return err
  149. }
  150. return nil
  151. }
  152. // ServerStartNotify - notifies all non server nodes to pull changes after a restart
  153. func ServerStartNotify() error {
  154. nodes, err := logic.GetAllNodes()
  155. if err != nil {
  156. return err
  157. }
  158. for i := range nodes {
  159. nodes[i].Action = models.NODE_FORCE_UPDATE
  160. if err = NodeUpdate(&nodes[i]); err != nil {
  161. logger.Log(1, "error when notifying node", nodes[i].ID.String(), "of a server startup")
  162. }
  163. }
  164. return nil
  165. }
  166. // PublishMqUpdatesForDeletedNode - published all the required updates for deleted node
  167. func PublishMqUpdatesForDeletedNode(node models.Node, sendNodeUpdate bool, gwClients []models.ExtClient) {
  168. // notify of peer change
  169. node.PendingDelete = true
  170. node.Action = models.NODE_DELETE
  171. if sendNodeUpdate {
  172. if err := NodeUpdate(&node); err != nil {
  173. slog.Error("error publishing node update to node", "node", node.ID, "error", err)
  174. }
  175. }
  176. if err := PublishDeletedNodePeerUpdate(&node); err != nil {
  177. logger.Log(1, "error publishing peer update ", err.Error())
  178. }
  179. if servercfg.IsDNSMode() {
  180. logic.SetDNS()
  181. }
  182. }
  183. func PushMetricsToExporter(metrics models.Metrics) error {
  184. logger.Log(2, "----> Pushing metrics to exporter")
  185. data, err := json.Marshal(metrics)
  186. if err != nil {
  187. return errors.New("failed to marshal metrics: " + err.Error())
  188. }
  189. if mqclient == nil || !mqclient.IsConnectionOpen() {
  190. return errors.New("cannot publish ... mqclient not connected")
  191. }
  192. if token := mqclient.Publish("metrics_exporter", 0, true, data); !token.WaitTimeout(MQ_TIMEOUT*time.Second) || token.Error() != nil {
  193. var err error
  194. if token.Error() == nil {
  195. err = errors.New("connection timeout")
  196. } else {
  197. err = token.Error()
  198. }
  199. return err
  200. }
  201. return nil
  202. }
  203. // sendPeers - retrieve networks, send peer ports to all peers
  204. func sendPeers() {
  205. peer_force_send++
  206. if peer_force_send == 5 {
  207. servercfg.SetHost()
  208. peer_force_send = 0
  209. err := logic.TimerCheckpoint() // run telemetry & log dumps if 24 hours has passed..
  210. if err != nil {
  211. logger.Log(3, "error occurred on timer,", err.Error())
  212. }
  213. }
  214. }
  215. func SendDNSSyncByNetwork(network string) error {
  216. k, err := logic.GetDNS(network)
  217. if err == nil && len(k) > 0 {
  218. err = PushSyncDNS(k)
  219. if err != nil {
  220. slog.Warn("error publishing dns entry data for network ", network, err.Error())
  221. }
  222. }
  223. return err
  224. }
  225. func sendDNSSync() error {
  226. networks, err := logic.GetNetworks()
  227. if err == nil && len(networks) > 0 {
  228. for _, v := range networks {
  229. k, err := logic.GetDNS(v.NetID)
  230. if err == nil && len(k) > 0 {
  231. err = PushSyncDNS(k)
  232. if err != nil {
  233. slog.Warn("error publishing dns entry data for network ", v.NetID, err.Error())
  234. }
  235. }
  236. }
  237. return nil
  238. }
  239. return err
  240. }
  241. func PushSyncDNS(dnsEntries []models.DNSEntry) error {
  242. logger.Log(2, "----> Pushing Sync DNS")
  243. data, err := json.Marshal(dnsEntries)
  244. if err != nil {
  245. return errors.New("failed to marshal DNS entries: " + err.Error())
  246. }
  247. if mqclient == nil || !mqclient.IsConnectionOpen() {
  248. return errors.New("cannot publish ... mqclient not connected")
  249. }
  250. if token := mqclient.Publish(fmt.Sprintf("host/dns/sync/%s", dnsEntries[0].Network), 0, true, data); !token.WaitTimeout(MQ_TIMEOUT*time.Second) || token.Error() != nil {
  251. var err error
  252. if token.Error() == nil {
  253. err = errors.New("connection timeout")
  254. } else {
  255. err = token.Error()
  256. }
  257. return err
  258. }
  259. return nil
  260. }