2
0

publishers.go 8.1 KB


  1. package mq
  2. import (
  3. "encoding/json"
  4. "errors"
  5. "fmt"
  6. "sync"
  7. "time"
  8. "github.com/google/uuid"
  9. "github.com/gravitl/netmaker/logger"
  10. "github.com/gravitl/netmaker/logic"
  11. "github.com/gravitl/netmaker/models"
  12. "github.com/gravitl/netmaker/servercfg"
  13. "golang.org/x/exp/slog"
  14. )
  15. // PublishPeerUpdate --- determines and publishes a peer update to all the hosts
  16. func PublishPeerUpdate(replacePeers bool) error {
  17. if !servercfg.IsMessageQueueBackend() {
  18. return nil
  19. }
  20. if logic.GetManageDNS() {
  21. sendDNSSync()
  22. }
  23. hosts, err := logic.GetAllHosts()
  24. if err != nil {
  25. logger.Log(1, "err getting all hosts", err.Error())
  26. return err
  27. }
  28. allNodes, err := logic.GetAllNodes()
  29. if err != nil {
  30. return err
  31. }
  32. for _, host := range hosts {
  33. host := host
  34. time.Sleep(5 * time.Millisecond)
  35. go func(host models.Host) {
  36. if err = PublishSingleHostPeerUpdate(&host, allNodes, nil, nil, replacePeers, nil); err != nil {
  37. id := host.Name
  38. if host.ID != uuid.Nil {
  39. id = host.ID.String()
  40. }
  41. slog.Error("failed to publish peer update to host", id, ": ", err)
  42. }
  43. }(host)
  44. }
  45. return nil
  46. }
  47. // PublishDeletedNodePeerUpdate --- determines and publishes a peer update
  48. // to all the hosts with a deleted node to account for
  49. func PublishDeletedNodePeerUpdate(delNode *models.Node) error {
  50. if !servercfg.IsMessageQueueBackend() {
  51. return nil
  52. }
  53. hosts, err := logic.GetAllHosts()
  54. if err != nil {
  55. logger.Log(1, "err getting all hosts", err.Error())
  56. return err
  57. }
  58. allNodes, err := logic.GetAllNodes()
  59. if err != nil {
  60. return err
  61. }
  62. for _, host := range hosts {
  63. host := host
  64. if err = PublishSingleHostPeerUpdate(&host, allNodes, delNode, nil, false, nil); err != nil {
  65. logger.Log(1, "failed to publish peer update to host", host.ID.String(), ": ", err.Error())
  66. }
  67. }
  68. return err
  69. }
  70. // PublishDeletedClientPeerUpdate --- determines and publishes a peer update
  71. // to all the hosts with a deleted ext client to account for
  72. func PublishDeletedClientPeerUpdate(delClient *models.ExtClient) error {
  73. if !servercfg.IsMessageQueueBackend() {
  74. return nil
  75. }
  76. hosts, err := logic.GetAllHosts()
  77. if err != nil {
  78. logger.Log(1, "err getting all hosts", err.Error())
  79. return err
  80. }
  81. nodes, err := logic.GetAllNodes()
  82. if err != nil {
  83. return err
  84. }
  85. for _, host := range hosts {
  86. host := host
  87. if host.OS != models.OS_Types.IoT {
  88. if err = PublishSingleHostPeerUpdate(&host, nodes, nil, []models.ExtClient{*delClient}, false, nil); err != nil {
  89. logger.Log(1, "failed to publish peer update to host", host.ID.String(), ": ", err.Error())
  90. }
  91. }
  92. }
  93. return err
  94. }
  95. // PublishSingleHostPeerUpdate --- determines and publishes a peer update to one host
  96. func PublishSingleHostPeerUpdate(host *models.Host, allNodes []models.Node, deletedNode *models.Node, deletedClients []models.ExtClient, replacePeers bool, wg *sync.WaitGroup) error {
  97. if wg != nil {
  98. defer wg.Done()
  99. }
  100. peerUpdate, err := logic.GetPeerUpdateForHost("", host, allNodes, deletedNode, deletedClients)
  101. if err != nil {
  102. return err
  103. }
  104. peerUpdate.OldPeerUpdateFields = models.OldPeerUpdateFields{
  105. NodePeers: peerUpdate.NodePeers,
  106. OldPeers: peerUpdate.Peers,
  107. EndpointDetection: peerUpdate.ServerConfig.EndpointDetection,
  108. }
  109. peerUpdate.ReplacePeers = replacePeers
  110. data, err := json.Marshal(&peerUpdate)
  111. if err != nil {
  112. return err
  113. }
  114. return publish(host, fmt.Sprintf("peers/host/%s/%s", host.ID.String(), servercfg.GetServer()), data)
  115. }
  116. // NodeUpdate -- publishes a node update
  117. func NodeUpdate(node *models.Node) error {
  118. host, err := logic.GetHost(node.HostID.String())
  119. if err != nil {
  120. return nil
  121. }
  122. if !servercfg.IsMessageQueueBackend() {
  123. return nil
  124. }
  125. logger.Log(3, "publishing node update to "+node.ID.String())
  126. //if len(node.NetworkSettings.AccessKeys) > 0 {
  127. //node.NetworkSettings.AccessKeys = []models.AccessKey{} // not to be sent (don't need to spread access keys around the network; we need to know how to reach other nodes, not become them)
  128. //}
  129. data, err := json.Marshal(node)
  130. if err != nil {
  131. logger.Log(2, "error marshalling node update ", err.Error())
  132. return err
  133. }
  134. if err = publish(host, fmt.Sprintf("node/update/%s/%s", node.Network, node.ID), data); err != nil {
  135. logger.Log(2, "error publishing node update to peer ", node.ID.String(), err.Error())
  136. return err
  137. }
  138. return nil
  139. }
  140. // HostUpdate -- publishes a host update to clients
  141. func HostUpdate(hostUpdate *models.HostUpdate) error {
  142. if !servercfg.IsMessageQueueBackend() {
  143. return nil
  144. }
  145. logger.Log(3, "publishing host update to "+hostUpdate.Host.ID.String())
  146. data, err := json.Marshal(hostUpdate)
  147. if err != nil {
  148. logger.Log(2, "error marshalling node update ", err.Error())
  149. return err
  150. }
  151. if err = publish(&hostUpdate.Host, fmt.Sprintf("host/update/%s/%s", hostUpdate.Host.ID.String(), servercfg.GetServer()), data); err != nil {
  152. logger.Log(2, "error publishing host update to", hostUpdate.Host.ID.String(), err.Error())
  153. return err
  154. }
  155. return nil
  156. }
  157. // ServerStartNotify - notifies all non server nodes to pull changes after a restart
  158. func ServerStartNotify() error {
  159. nodes, err := logic.GetAllNodes()
  160. if err != nil {
  161. return err
  162. }
  163. for i := range nodes {
  164. nodes[i].Action = models.NODE_FORCE_UPDATE
  165. if err = NodeUpdate(&nodes[i]); err != nil {
  166. logger.Log(1, "error when notifying node", nodes[i].ID.String(), "of a server startup")
  167. }
  168. }
  169. return nil
  170. }
  171. // PublishMqUpdatesForDeletedNode - published all the required updates for deleted node
  172. func PublishMqUpdatesForDeletedNode(node models.Node, sendNodeUpdate bool, gwClients []models.ExtClient) {
  173. // notify of peer change
  174. node.PendingDelete = true
  175. node.Action = models.NODE_DELETE
  176. if sendNodeUpdate {
  177. if err := NodeUpdate(&node); err != nil {
  178. slog.Error("error publishing node update to node", "node", node.ID, "error", err)
  179. }
  180. }
  181. if err := PublishDeletedNodePeerUpdate(&node); err != nil {
  182. logger.Log(1, "error publishing peer update ", err.Error())
  183. }
  184. if servercfg.IsDNSMode() {
  185. logic.SetDNS()
  186. }
  187. }
  188. func PushMetricsToExporter(metrics models.Metrics) error {
  189. logger.Log(2, "----> Pushing metrics to exporter")
  190. data, err := json.Marshal(metrics)
  191. if err != nil {
  192. return errors.New("failed to marshal metrics: " + err.Error())
  193. }
  194. if mqclient == nil || !mqclient.IsConnectionOpen() {
  195. return errors.New("cannot publish ... mqclient not connected")
  196. }
  197. if token := mqclient.Publish("metrics_exporter", 0, true, data); !token.WaitTimeout(MQ_TIMEOUT*time.Second) || token.Error() != nil {
  198. var err error
  199. if token.Error() == nil {
  200. err = errors.New("connection timeout")
  201. } else {
  202. err = token.Error()
  203. }
  204. return err
  205. }
  206. return nil
  207. }
  208. // sendPeers - retrieve networks, send peer ports to all peers
  209. func sendPeers() {
  210. peer_force_send++
  211. if peer_force_send == 5 {
  212. servercfg.SetHost()
  213. peer_force_send = 0
  214. err := logic.TimerCheckpoint() // run telemetry & log dumps if 24 hours has passed..
  215. if err != nil {
  216. logger.Log(3, "error occurred on timer,", err.Error())
  217. }
  218. }
  219. }
  220. func SendDNSSyncByNetwork(network string) error {
  221. k, err := logic.GetDNS(network)
  222. if err == nil && len(k) > 0 {
  223. err = PushSyncDNS(k)
  224. if err != nil {
  225. slog.Warn("error publishing dns entry data for network ", network, err.Error())
  226. }
  227. }
  228. return err
  229. }
  230. func sendDNSSync() error {
  231. networks, err := logic.GetNetworks()
  232. if err == nil && len(networks) > 0 {
  233. for _, v := range networks {
  234. k, err := logic.GetDNS(v.NetID)
  235. if err == nil && len(k) > 0 {
  236. err = PushSyncDNS(k)
  237. if err != nil {
  238. slog.Warn("error publishing dns entry data for network ", v.NetID, err.Error())
  239. }
  240. }
  241. }
  242. return nil
  243. }
  244. return err
  245. }
  246. func PushSyncDNS(dnsEntries []models.DNSEntry) error {
  247. logger.Log(2, "----> Pushing Sync DNS")
  248. data, err := json.Marshal(dnsEntries)
  249. if err != nil {
  250. return errors.New("failed to marshal DNS entries: " + err.Error())
  251. }
  252. if mqclient == nil || !mqclient.IsConnectionOpen() {
  253. return errors.New("cannot publish ... mqclient not connected")
  254. }
  255. if token := mqclient.Publish(fmt.Sprintf("host/dns/sync/%s", dnsEntries[0].Network), 0, true, data); !token.WaitTimeout(MQ_TIMEOUT*time.Second) || token.Error() != nil {
  256. var err error
  257. if token.Error() == nil {
  258. err = errors.New("connection timeout")
  259. } else {
  260. err = token.Error()
  261. }
  262. return err
  263. }
  264. return nil
  265. }