publishers.go 7.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251
  1. package mq
  2. import (
  3. "encoding/json"
  4. "errors"
  5. "fmt"
  6. "sync"
  7. "time"
  8. "github.com/gravitl/netmaker/logger"
  9. "github.com/gravitl/netmaker/logic"
  10. "github.com/gravitl/netmaker/models"
  11. "github.com/gravitl/netmaker/servercfg"
  12. "golang.org/x/exp/slog"
  13. )
  14. var batchSize = servercfg.GetPeerUpdateBatchSize()
  15. var batchUpdate = servercfg.GetBatchPeerUpdate()
  16. // PublishPeerUpdate --- determines and publishes a peer update to all the hosts
  17. func PublishPeerUpdate(replacePeers bool) error {
  18. if !servercfg.IsMessageQueueBackend() {
  19. return nil
  20. }
  21. hosts, err := logic.GetAllHosts()
  22. if err != nil {
  23. logger.Log(1, "err getting all hosts", err.Error())
  24. return err
  25. }
  26. allNodes, err := logic.GetAllNodes()
  27. if err != nil {
  28. return err
  29. }
  30. //if batch peer update disabled
  31. if !batchUpdate {
  32. for _, host := range hosts {
  33. host := host
  34. go func(host models.Host) {
  35. if err = PublishSingleHostPeerUpdate(&host, allNodes, nil, nil, replacePeers, nil); err != nil {
  36. logger.Log(1, "failed to publish peer update to host", host.ID.String(), ": ", err.Error())
  37. }
  38. }(host)
  39. }
  40. return nil
  41. }
  42. //if batch peer update enabled
  43. batchHost := BatchItems(hosts, batchSize)
  44. var wg sync.WaitGroup
  45. for _, v := range batchHost {
  46. hostLen := len(v)
  47. wg.Add(hostLen)
  48. for i := 0; i < hostLen; i++ {
  49. host := hosts[i]
  50. go func(host models.Host) {
  51. if err = PublishSingleHostPeerUpdate(&host, allNodes, nil, nil, replacePeers, &wg); err != nil {
  52. logger.Log(1, "failed to publish peer update to host", host.ID.String(), ": ", err.Error())
  53. }
  54. }(host)
  55. }
  56. wg.Wait()
  57. }
  58. return nil
  59. }
  60. // PublishDeletedNodePeerUpdate --- determines and publishes a peer update
  61. // to all the hosts with a deleted node to account for
  62. func PublishDeletedNodePeerUpdate(delNode *models.Node) error {
  63. if !servercfg.IsMessageQueueBackend() {
  64. return nil
  65. }
  66. hosts, err := logic.GetAllHosts()
  67. if err != nil {
  68. logger.Log(1, "err getting all hosts", err.Error())
  69. return err
  70. }
  71. allNodes, err := logic.GetAllNodes()
  72. if err != nil {
  73. return err
  74. }
  75. for _, host := range hosts {
  76. host := host
  77. if err = PublishSingleHostPeerUpdate(&host, allNodes, delNode, nil, false, nil); err != nil {
  78. logger.Log(1, "failed to publish peer update to host", host.ID.String(), ": ", err.Error())
  79. }
  80. }
  81. return err
  82. }
  83. // PublishDeletedClientPeerUpdate --- determines and publishes a peer update
  84. // to all the hosts with a deleted ext client to account for
  85. func PublishDeletedClientPeerUpdate(delClient *models.ExtClient) error {
  86. if !servercfg.IsMessageQueueBackend() {
  87. return nil
  88. }
  89. hosts, err := logic.GetAllHosts()
  90. if err != nil {
  91. logger.Log(1, "err getting all hosts", err.Error())
  92. return err
  93. }
  94. nodes, err := logic.GetAllNodes()
  95. if err != nil {
  96. return err
  97. }
  98. for _, host := range hosts {
  99. host := host
  100. if host.OS != models.OS_Types.IoT {
  101. if err = PublishSingleHostPeerUpdate(&host, nodes, nil, []models.ExtClient{*delClient}, false, nil); err != nil {
  102. logger.Log(1, "failed to publish peer update to host", host.ID.String(), ": ", err.Error())
  103. }
  104. }
  105. }
  106. return err
  107. }
  108. // PublishSingleHostPeerUpdate --- determines and publishes a peer update to one host
  109. func PublishSingleHostPeerUpdate(host *models.Host, allNodes []models.Node, deletedNode *models.Node, deletedClients []models.ExtClient, replacePeers bool, wg *sync.WaitGroup) error {
  110. if wg != nil {
  111. defer wg.Done()
  112. }
  113. peerUpdate, err := logic.GetPeerUpdateForHost("", host, allNodes, deletedNode, deletedClients)
  114. if err != nil {
  115. return err
  116. }
  117. peerUpdate.ReplacePeers = replacePeers
  118. data, err := json.Marshal(&peerUpdate)
  119. if err != nil {
  120. return err
  121. }
  122. return publish(host, fmt.Sprintf("peers/host/%s/%s", host.ID.String(), servercfg.GetServer()), data)
  123. }
  124. // NodeUpdate -- publishes a node update
  125. func NodeUpdate(node *models.Node) error {
  126. host, err := logic.GetHost(node.HostID.String())
  127. if err != nil {
  128. return nil
  129. }
  130. if !servercfg.IsMessageQueueBackend() {
  131. return nil
  132. }
  133. logger.Log(3, "publishing node update to "+node.ID.String())
  134. //if len(node.NetworkSettings.AccessKeys) > 0 {
  135. //node.NetworkSettings.AccessKeys = []models.AccessKey{} // not to be sent (don't need to spread access keys around the network; we need to know how to reach other nodes, not become them)
  136. //}
  137. data, err := json.Marshal(node)
  138. if err != nil {
  139. logger.Log(2, "error marshalling node update ", err.Error())
  140. return err
  141. }
  142. if err = publish(host, fmt.Sprintf("node/update/%s/%s", node.Network, node.ID), data); err != nil {
  143. logger.Log(2, "error publishing node update to peer ", node.ID.String(), err.Error())
  144. return err
  145. }
  146. return nil
  147. }
  148. // HostUpdate -- publishes a host update to clients
  149. func HostUpdate(hostUpdate *models.HostUpdate) error {
  150. if !servercfg.IsMessageQueueBackend() {
  151. return nil
  152. }
  153. logger.Log(3, "publishing host update to "+hostUpdate.Host.ID.String())
  154. data, err := json.Marshal(hostUpdate)
  155. if err != nil {
  156. logger.Log(2, "error marshalling node update ", err.Error())
  157. return err
  158. }
  159. if err = publish(&hostUpdate.Host, fmt.Sprintf("host/update/%s/%s", hostUpdate.Host.ID.String(), servercfg.GetServer()), data); err != nil {
  160. logger.Log(2, "error publishing host update to", hostUpdate.Host.ID.String(), err.Error())
  161. return err
  162. }
  163. return nil
  164. }
  165. // ServerStartNotify - notifies all non server nodes to pull changes after a restart
  166. func ServerStartNotify() error {
  167. nodes, err := logic.GetAllNodes()
  168. if err != nil {
  169. return err
  170. }
  171. for i := range nodes {
  172. nodes[i].Action = models.NODE_FORCE_UPDATE
  173. if err = NodeUpdate(&nodes[i]); err != nil {
  174. logger.Log(1, "error when notifying node", nodes[i].ID.String(), "of a server startup")
  175. }
  176. }
  177. return nil
  178. }
  179. // PublishMqUpdatesForDeletedNode - published all the required updates for deleted node
  180. func PublishMqUpdatesForDeletedNode(node models.Node, sendNodeUpdate bool, gwClients []models.ExtClient) {
  181. // notify of peer change
  182. node.PendingDelete = true
  183. node.Action = models.NODE_DELETE
  184. if sendNodeUpdate {
  185. if err := NodeUpdate(&node); err != nil {
  186. slog.Error("error publishing node update to node", "node", node.ID, "error", err)
  187. }
  188. }
  189. if err := PublishDeletedNodePeerUpdate(&node); err != nil {
  190. logger.Log(1, "error publishing peer update ", err.Error())
  191. }
  192. if servercfg.IsDNSMode() {
  193. logic.SetDNS()
  194. }
  195. }
  196. func PushMetricsToExporter(metrics models.Metrics) error {
  197. logger.Log(2, "----> Pushing metrics to exporter")
  198. data, err := json.Marshal(metrics)
  199. if err != nil {
  200. return errors.New("failed to marshal metrics: " + err.Error())
  201. }
  202. if mqclient == nil || !mqclient.IsConnectionOpen() {
  203. return errors.New("cannot publish ... mqclient not connected")
  204. }
  205. if token := mqclient.Publish("metrics_exporter", 0, true, data); !token.WaitTimeout(MQ_TIMEOUT*time.Second) || token.Error() != nil {
  206. var err error
  207. if token.Error() == nil {
  208. err = errors.New("connection timeout")
  209. } else {
  210. err = token.Error()
  211. }
  212. return err
  213. }
  214. return nil
  215. }
  216. // sendPeers - retrieve networks, send peer ports to all peers
  217. func sendPeers() {
  218. peer_force_send++
  219. if peer_force_send == 5 {
  220. servercfg.SetHost()
  221. peer_force_send = 0
  222. err := logic.TimerCheckpoint() // run telemetry & log dumps if 24 hours has passed..
  223. if err != nil {
  224. logger.Log(3, "error occurred on timer,", err.Error())
  225. }
  226. }
  227. }