publishers.go 6.9 KB


  1. package mq
  2. import (
  3. "encoding/json"
  4. "errors"
  5. "fmt"
  6. "time"
  7. "github.com/gravitl/netmaker/logger"
  8. "github.com/gravitl/netmaker/logic"
  9. "github.com/gravitl/netmaker/models"
  10. "github.com/gravitl/netmaker/servercfg"
  11. "golang.org/x/exp/slog"
  12. )
  13. // PublishPeerUpdate --- determines and publishes a peer update to all the hosts
  14. func PublishPeerUpdate(replacePeers bool) error {
  15. if !servercfg.IsMessageQueueBackend() {
  16. return nil
  17. }
  18. hosts, err := logic.GetAllHosts()
  19. if err != nil {
  20. logger.Log(1, "err getting all hosts", err.Error())
  21. return err
  22. }
  23. allNodes, err := logic.GetAllNodes()
  24. if err != nil {
  25. return err
  26. }
  27. for _, host := range hosts {
  28. host := host
  29. go func(host models.Host) {
  30. if err = PublishSingleHostPeerUpdate(&host, allNodes, nil, nil, replacePeers); err != nil {
  31. logger.Log(1, "failed to publish peer update to host", host.ID.String(), ": ", err.Error())
  32. }
  33. }(host)
  34. }
  35. return err
  36. }
  37. // PublishDeletedNodePeerUpdate --- determines and publishes a peer update
  38. // to all the hosts with a deleted node to account for
  39. func PublishDeletedNodePeerUpdate(delNode *models.Node) error {
  40. if !servercfg.IsMessageQueueBackend() {
  41. return nil
  42. }
  43. hosts, err := logic.GetAllHosts()
  44. if err != nil {
  45. logger.Log(1, "err getting all hosts", err.Error())
  46. return err
  47. }
  48. allNodes, err := logic.GetAllNodes()
  49. if err != nil {
  50. return err
  51. }
  52. for _, host := range hosts {
  53. host := host
  54. if err = PublishSingleHostPeerUpdate(&host, allNodes, delNode, nil, false); err != nil {
  55. logger.Log(1, "failed to publish peer update to host", host.ID.String(), ": ", err.Error())
  56. }
  57. }
  58. return err
  59. }
  60. // PublishDeletedClientPeerUpdate --- determines and publishes a peer update
  61. // to all the hosts with a deleted ext client to account for
  62. func PublishDeletedClientPeerUpdate(delClient *models.ExtClient) error {
  63. if !servercfg.IsMessageQueueBackend() {
  64. return nil
  65. }
  66. hosts, err := logic.GetAllHosts()
  67. if err != nil {
  68. logger.Log(1, "err getting all hosts", err.Error())
  69. return err
  70. }
  71. nodes, err := logic.GetAllNodes()
  72. if err != nil {
  73. return err
  74. }
  75. for _, host := range hosts {
  76. host := host
  77. if host.OS != models.OS_Types.IoT {
  78. if err = PublishSingleHostPeerUpdate(&host, nodes, nil, []models.ExtClient{*delClient}, false); err != nil {
  79. logger.Log(1, "failed to publish peer update to host", host.ID.String(), ": ", err.Error())
  80. }
  81. }
  82. }
  83. return err
  84. }
  85. // PublishSingleHostPeerUpdate --- determines and publishes a peer update to one host
  86. func PublishSingleHostPeerUpdate(host *models.Host, allNodes []models.Node, deletedNode *models.Node, deletedClients []models.ExtClient, replacePeers bool) error {
  87. peerUpdate, err := logic.GetPeerUpdateForHost("", host, allNodes, deletedNode, deletedClients)
  88. if err != nil {
  89. return err
  90. }
  91. peerUpdate.ReplacePeers = replacePeers
  92. data, err := json.Marshal(&peerUpdate)
  93. if err != nil {
  94. return err
  95. }
  96. return publish(host, fmt.Sprintf("peers/host/%s/%s", host.ID.String(), servercfg.GetServer()), data)
  97. }
  98. // NodeUpdate -- publishes a node update
  99. func NodeUpdate(node *models.Node) error {
  100. host, err := logic.GetHost(node.HostID.String())
  101. if err != nil {
  102. return nil
  103. }
  104. if !servercfg.IsMessageQueueBackend() {
  105. return nil
  106. }
  107. logger.Log(3, "publishing node update to "+node.ID.String())
  108. //if len(node.NetworkSettings.AccessKeys) > 0 {
  109. //node.NetworkSettings.AccessKeys = []models.AccessKey{} // not to be sent (don't need to spread access keys around the network; we need to know how to reach other nodes, not become them)
  110. //}
  111. data, err := json.Marshal(node)
  112. if err != nil {
  113. logger.Log(2, "error marshalling node update ", err.Error())
  114. return err
  115. }
  116. if err = publish(host, fmt.Sprintf("node/update/%s/%s", node.Network, node.ID), data); err != nil {
  117. logger.Log(2, "error publishing node update to peer ", node.ID.String(), err.Error())
  118. return err
  119. }
  120. return nil
  121. }
  122. // HostUpdate -- publishes a host update to clients
  123. func HostUpdate(hostUpdate *models.HostUpdate) error {
  124. if !servercfg.IsMessageQueueBackend() {
  125. return nil
  126. }
  127. logger.Log(3, "publishing host update to "+hostUpdate.Host.ID.String())
  128. data, err := json.Marshal(hostUpdate)
  129. if err != nil {
  130. logger.Log(2, "error marshalling node update ", err.Error())
  131. return err
  132. }
  133. if err = publish(&hostUpdate.Host, fmt.Sprintf("host/update/%s/%s", hostUpdate.Host.ID.String(), servercfg.GetServer()), data); err != nil {
  134. logger.Log(2, "error publishing host update to", hostUpdate.Host.ID.String(), err.Error())
  135. return err
  136. }
  137. return nil
  138. }
  139. // ServerStartNotify - notifies all non server nodes to pull changes after a restart
  140. func ServerStartNotify() error {
  141. nodes, err := logic.GetAllNodes()
  142. if err != nil {
  143. return err
  144. }
  145. for i := range nodes {
  146. nodes[i].Action = models.NODE_FORCE_UPDATE
  147. if err = NodeUpdate(&nodes[i]); err != nil {
  148. logger.Log(1, "error when notifying node", nodes[i].ID.String(), "of a server startup")
  149. }
  150. }
  151. return nil
  152. }
  153. // PublishMqUpdatesForDeletedNode - published all the required updates for deleted node
  154. func PublishMqUpdatesForDeletedNode(node models.Node, sendNodeUpdate bool, gwClients []models.ExtClient) {
  155. // notify of peer change
  156. node.PendingDelete = true
  157. node.Action = models.NODE_DELETE
  158. if sendNodeUpdate {
  159. if err := NodeUpdate(&node); err != nil {
  160. slog.Error("error publishing node update to node", "node", node.ID, "error", err)
  161. }
  162. }
  163. if err := PublishDeletedNodePeerUpdate(&node); err != nil {
  164. logger.Log(1, "error publishing peer update ", err.Error())
  165. }
  166. if servercfg.IsDNSMode() {
  167. logic.SetDNS()
  168. }
  169. }
  170. func PushMetricsToExporter(metrics models.Metrics) error {
  171. logger.Log(2, "----> Pushing metrics to exporter")
  172. data, err := json.Marshal(metrics)
  173. if err != nil {
  174. return errors.New("failed to marshal metrics: " + err.Error())
  175. }
  176. if token := mqclient.Publish("metrics_exporter", 2, true, data); !token.WaitTimeout(MQ_TIMEOUT*time.Second) || token.Error() != nil {
  177. var err error
  178. if token.Error() == nil {
  179. err = errors.New("connection timeout")
  180. } else {
  181. err = token.Error()
  182. }
  183. return err
  184. }
  185. return nil
  186. }
  187. // sendPeers - retrieve networks, send peer ports to all peers
  188. func sendPeers() {
  189. hosts, err := logic.GetAllHosts()
  190. if err != nil && len(hosts) > 0 {
  191. logger.Log(1, "error retrieving networks for keepalive", err.Error())
  192. }
  193. nodes, err := logic.GetAllNodes()
  194. if err != nil {
  195. return
  196. }
  197. var force bool
  198. peer_force_send++
  199. if peer_force_send == 5 {
  200. servercfg.SetHost()
  201. force = true
  202. peer_force_send = 0
  203. err := logic.TimerCheckpoint() // run telemetry & log dumps if 24 hours has passed..
  204. if err != nil {
  205. logger.Log(3, "error occurred on timer,", err.Error())
  206. }
  207. //collectServerMetrics(networks[:])
  208. }
  209. if force {
  210. for _, host := range hosts {
  211. host := host
  212. logger.Log(2, "sending scheduled peer update (5 min)")
  213. if err = PublishSingleHostPeerUpdate(&host, nodes, nil, nil, false); err != nil {
  214. logger.Log(1, "error publishing peer updates for host: ", host.ID.String(), " Err: ", err.Error())
  215. }
  216. }
  217. }
  218. }