publishers.go 8.4 KB


  1. package mq
  2. import (
  3. "encoding/json"
  4. "errors"
  5. "fmt"
  6. "sync"
  7. "time"
  8. "github.com/gravitl/netmaker/logger"
  9. "github.com/gravitl/netmaker/logic"
  10. "github.com/gravitl/netmaker/models"
  11. "github.com/gravitl/netmaker/servercfg"
  12. "golang.org/x/exp/slog"
  13. )
  14. var batchSize = servercfg.GetPeerUpdateBatchSize()
  15. var batchUpdate = servercfg.GetBatchPeerUpdate()
  16. // PublishPeerUpdate --- determines and publishes a peer update to all the hosts
  17. func PublishPeerUpdate(replacePeers bool) error {
  18. if !servercfg.IsMessageQueueBackend() {
  19. return nil
  20. }
  21. if servercfg.GetManageDNS() {
  22. sendDNSSync()
  23. }
  24. hosts, err := logic.GetAllHosts()
  25. if err != nil {
  26. logger.Log(1, "err getting all hosts", err.Error())
  27. return err
  28. }
  29. allNodes, err := logic.GetAllNodes()
  30. if err != nil {
  31. return err
  32. }
  33. //if batch peer update disabled
  34. if !batchUpdate {
  35. for _, host := range hosts {
  36. host := host
  37. go func(host models.Host) {
  38. if err = PublishSingleHostPeerUpdate(&host, allNodes, nil, nil, replacePeers, nil); err != nil {
  39. logger.Log(1, "failed to publish peer update to host", host.ID.String(), ": ", err.Error())
  40. }
  41. }(host)
  42. }
  43. return nil
  44. }
  45. //if batch peer update enabled
  46. batchHost := BatchItems(hosts, batchSize)
  47. var wg sync.WaitGroup
  48. for _, v := range batchHost {
  49. hostLen := len(v)
  50. wg.Add(hostLen)
  51. for i := 0; i < hostLen; i++ {
  52. host := hosts[i]
  53. go func(host models.Host) {
  54. if err = PublishSingleHostPeerUpdate(&host, allNodes, nil, nil, replacePeers, &wg); err != nil {
  55. logger.Log(1, "failed to publish peer update to host", host.ID.String(), ": ", err.Error())
  56. }
  57. }(host)
  58. }
  59. wg.Wait()
  60. }
  61. return nil
  62. }
  63. // PublishDeletedNodePeerUpdate --- determines and publishes a peer update
  64. // to all the hosts with a deleted node to account for
  65. func PublishDeletedNodePeerUpdate(delNode *models.Node) error {
  66. if !servercfg.IsMessageQueueBackend() {
  67. return nil
  68. }
  69. hosts, err := logic.GetAllHosts()
  70. if err != nil {
  71. logger.Log(1, "err getting all hosts", err.Error())
  72. return err
  73. }
  74. allNodes, err := logic.GetAllNodes()
  75. if err != nil {
  76. return err
  77. }
  78. for _, host := range hosts {
  79. host := host
  80. if err = PublishSingleHostPeerUpdate(&host, allNodes, delNode, nil, false, nil); err != nil {
  81. logger.Log(1, "failed to publish peer update to host", host.ID.String(), ": ", err.Error())
  82. }
  83. }
  84. return err
  85. }
  86. // PublishDeletedClientPeerUpdate --- determines and publishes a peer update
  87. // to all the hosts with a deleted ext client to account for
  88. func PublishDeletedClientPeerUpdate(delClient *models.ExtClient) error {
  89. if !servercfg.IsMessageQueueBackend() {
  90. return nil
  91. }
  92. hosts, err := logic.GetAllHosts()
  93. if err != nil {
  94. logger.Log(1, "err getting all hosts", err.Error())
  95. return err
  96. }
  97. nodes, err := logic.GetAllNodes()
  98. if err != nil {
  99. return err
  100. }
  101. for _, host := range hosts {
  102. host := host
  103. if host.OS != models.OS_Types.IoT {
  104. if err = PublishSingleHostPeerUpdate(&host, nodes, nil, []models.ExtClient{*delClient}, false, nil); err != nil {
  105. logger.Log(1, "failed to publish peer update to host", host.ID.String(), ": ", err.Error())
  106. }
  107. }
  108. }
  109. return err
  110. }
  111. // PublishSingleHostPeerUpdate --- determines and publishes a peer update to one host
  112. func PublishSingleHostPeerUpdate(host *models.Host, allNodes []models.Node, deletedNode *models.Node, deletedClients []models.ExtClient, replacePeers bool, wg *sync.WaitGroup) error {
  113. if wg != nil {
  114. defer wg.Done()
  115. }
  116. peerUpdate, err := logic.GetPeerUpdateForHost("", host, allNodes, deletedNode, deletedClients)
  117. if err != nil {
  118. return err
  119. }
  120. peerUpdate.ReplacePeers = replacePeers
  121. data, err := json.Marshal(&peerUpdate)
  122. if err != nil {
  123. return err
  124. }
  125. return publish(host, fmt.Sprintf("peers/host/%s/%s", host.ID.String(), servercfg.GetServer()), data)
  126. }
  127. // NodeUpdate -- publishes a node update
  128. func NodeUpdate(node *models.Node) error {
  129. host, err := logic.GetHost(node.HostID.String())
  130. if err != nil {
  131. return nil
  132. }
  133. if !servercfg.IsMessageQueueBackend() {
  134. return nil
  135. }
  136. logger.Log(3, "publishing node update to "+node.ID.String())
  137. //if len(node.NetworkSettings.AccessKeys) > 0 {
  138. //node.NetworkSettings.AccessKeys = []models.AccessKey{} // not to be sent (don't need to spread access keys around the network; we need to know how to reach other nodes, not become them)
  139. //}
  140. data, err := json.Marshal(node)
  141. if err != nil {
  142. logger.Log(2, "error marshalling node update ", err.Error())
  143. return err
  144. }
  145. if err = publish(host, fmt.Sprintf("node/update/%s/%s", node.Network, node.ID), data); err != nil {
  146. logger.Log(2, "error publishing node update to peer ", node.ID.String(), err.Error())
  147. return err
  148. }
  149. return nil
  150. }
  151. // HostUpdate -- publishes a host update to clients
  152. func HostUpdate(hostUpdate *models.HostUpdate) error {
  153. if !servercfg.IsMessageQueueBackend() {
  154. return nil
  155. }
  156. logger.Log(3, "publishing host update to "+hostUpdate.Host.ID.String())
  157. data, err := json.Marshal(hostUpdate)
  158. if err != nil {
  159. logger.Log(2, "error marshalling node update ", err.Error())
  160. return err
  161. }
  162. if err = publish(&hostUpdate.Host, fmt.Sprintf("host/update/%s/%s", hostUpdate.Host.ID.String(), servercfg.GetServer()), data); err != nil {
  163. logger.Log(2, "error publishing host update to", hostUpdate.Host.ID.String(), err.Error())
  164. return err
  165. }
  166. return nil
  167. }
  168. // ServerStartNotify - notifies all non server nodes to pull changes after a restart
  169. func ServerStartNotify() error {
  170. nodes, err := logic.GetAllNodes()
  171. if err != nil {
  172. return err
  173. }
  174. for i := range nodes {
  175. nodes[i].Action = models.NODE_FORCE_UPDATE
  176. if err = NodeUpdate(&nodes[i]); err != nil {
  177. logger.Log(1, "error when notifying node", nodes[i].ID.String(), "of a server startup")
  178. }
  179. }
  180. return nil
  181. }
  182. // PublishMqUpdatesForDeletedNode - published all the required updates for deleted node
  183. func PublishMqUpdatesForDeletedNode(node models.Node, sendNodeUpdate bool, gwClients []models.ExtClient) {
  184. // notify of peer change
  185. node.PendingDelete = true
  186. node.Action = models.NODE_DELETE
  187. if sendNodeUpdate {
  188. if err := NodeUpdate(&node); err != nil {
  189. slog.Error("error publishing node update to node", "node", node.ID, "error", err)
  190. }
  191. }
  192. if err := PublishDeletedNodePeerUpdate(&node); err != nil {
  193. logger.Log(1, "error publishing peer update ", err.Error())
  194. }
  195. if servercfg.IsDNSMode() {
  196. logic.SetDNS()
  197. }
  198. }
  199. func PushMetricsToExporter(metrics models.Metrics) error {
  200. logger.Log(2, "----> Pushing metrics to exporter")
  201. data, err := json.Marshal(metrics)
  202. if err != nil {
  203. return errors.New("failed to marshal metrics: " + err.Error())
  204. }
  205. if mqclient == nil || !mqclient.IsConnectionOpen() {
  206. return errors.New("cannot publish ... mqclient not connected")
  207. }
  208. if token := mqclient.Publish("metrics_exporter", 0, true, data); !token.WaitTimeout(MQ_TIMEOUT*time.Second) || token.Error() != nil {
  209. var err error
  210. if token.Error() == nil {
  211. err = errors.New("connection timeout")
  212. } else {
  213. err = token.Error()
  214. }
  215. return err
  216. }
  217. return nil
  218. }
  219. // sendPeers - retrieve networks, send peer ports to all peers
  220. func sendPeers() {
  221. peer_force_send++
  222. if peer_force_send == 5 {
  223. servercfg.SetHost()
  224. peer_force_send = 0
  225. err := logic.TimerCheckpoint() // run telemetry & log dumps if 24 hours has passed..
  226. if err != nil {
  227. logger.Log(3, "error occurred on timer,", err.Error())
  228. }
  229. }
  230. }
  231. func SendDNSSyncByNetwork(network string) error {
  232. k, err := logic.GetDNS(network)
  233. if err == nil && len(k) > 0 {
  234. err = PushSyncDNS(k)
  235. if err != nil {
  236. slog.Warn("error publishing dns entry data for network ", network, err.Error())
  237. }
  238. }
  239. return err
  240. }
  241. func sendDNSSync() error {
  242. networks, err := logic.GetNetworks()
  243. if err == nil && len(networks) > 0 {
  244. for _, v := range networks {
  245. k, err := logic.GetDNS(v.NetID)
  246. if err == nil && len(k) > 0 {
  247. err = PushSyncDNS(k)
  248. if err != nil {
  249. slog.Warn("error publishing dns entry data for network ", v.NetID, err.Error())
  250. }
  251. }
  252. }
  253. return nil
  254. }
  255. return err
  256. }
  257. func PushSyncDNS(dnsEntries []models.DNSEntry) error {
  258. logger.Log(2, "----> Pushing Sync DNS")
  259. data, err := json.Marshal(dnsEntries)
  260. if err != nil {
  261. return errors.New("failed to marshal DNS entries: " + err.Error())
  262. }
  263. if mqclient == nil || !mqclient.IsConnectionOpen() {
  264. return errors.New("cannot publish ... mqclient not connected")
  265. }
  266. if token := mqclient.Publish(fmt.Sprintf("host/dns/sync/%s", dnsEntries[0].Network), 0, true, data); !token.WaitTimeout(MQ_TIMEOUT*time.Second) || token.Error() != nil {
  267. var err error
  268. if token.Error() == nil {
  269. err = errors.New("connection timeout")
  270. } else {
  271. err = token.Error()
  272. }
  273. return err
  274. }
  275. return nil
  276. }