publishers.go 9.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335
  1. package mq
  2. import (
  3. "encoding/json"
  4. "errors"
  5. "fmt"
  6. "runtime"
  7. "sync"
  8. "time"
  9. "github.com/google/uuid"
  10. "github.com/gravitl/netmaker/logger"
  11. "github.com/gravitl/netmaker/logic"
  12. "github.com/gravitl/netmaker/models"
  13. "github.com/gravitl/netmaker/servercfg"
  14. "golang.org/x/exp/slog"
  15. )
  16. var batchSize = servercfg.GetPeerUpdateBatchSize()
  17. var batchUpdate = servercfg.GetBatchPeerUpdate()
  18. //var peerUpdateTS = time.Now().Unix()
  // PublishPeerUpdate determines and publishes a peer update to all hosts.
  //
  // Nothing is published (and nil is returned) when the message-queue backend
  // is disabled. When DNS management is enabled, DNS entries are synced first.
  // Errors listing hosts or nodes are returned.
  //
  // With batching disabled, one goroutine per host is started and the function
  // returns without waiting for them. With batching enabled, hosts are split by
  // BatchItems into groups of batchSize, and each group is awaited before the
  // next one starts. In both modes per-host publish failures are logged, not
  // returned.
  func PublishPeerUpdate(replacePeers bool) error {
  	slog.Debug("entering PublishPeerUpdate")
  	start := time.Now()
  	// Record who triggered the update; useful when tracing update storms.
  	if pc, file, line, ok := runtime.Caller(1); ok {
  		caller := "unknown"
  		if fn := runtime.FuncForPC(pc); fn != nil {
  			caller = fn.Name()
  		}
  		slog.Debug("PublishPeerUpdate called", "file", file, "line", line, "func", caller)
  	}
  	if !servercfg.IsMessageQueueBackend() {
  		return nil
  	}
  	if servercfg.GetManageDNS() {
  		if err := sendDNSSync(); err != nil {
  			slog.Warn("failed to sync dns entries", "error", err)
  		}
  	}
  	hosts, err := logic.GetAllHosts()
  	if err != nil {
  		logger.Log(1, "err getting all hosts", err.Error())
  		return err
  	}
  	allNodes, err := logic.GetAllNodes()
  	if err != nil {
  		return err
  	}
  	if !batchUpdate {
  		for _, host := range hosts {
  			// host is copied into the goroutine's argument, so each goroutine
  			// owns its own value regardless of Go version.
  			go publishHostPeerUpdateLogged(host, allNodes, replacePeers, nil)
  		}
  		return nil
  	}
  	var wg sync.WaitGroup
  	for _, batch := range BatchItems(hosts, batchSize) {
  		wg.Add(len(batch))
  		// Iterate the current batch, not the full host list, so every host is
  		// covered exactly once across all batches.
  		for _, host := range batch {
  			go publishHostPeerUpdateLogged(host, allNodes, replacePeers, &wg)
  		}
  		wg.Wait()
  	}
  	slog.Debug("leaving PublishPeerUpdate", "duration", time.Since(start))
  	return nil
  }

  // publishHostPeerUpdateLogged publishes a peer update to one host and logs,
  // rather than returns, any failure. It is intended to run as a goroutine.
  // When wg is non-nil it is marked done by PublishSingleHostPeerUpdate.
  func publishHostPeerUpdateLogged(host models.Host, allNodes []models.Node, replacePeers bool, wg *sync.WaitGroup) {
  	// err is local to this goroutine; sharing one err across goroutines races.
  	if err := PublishSingleHostPeerUpdate(&host, allNodes, nil, nil, replacePeers, wg); err != nil {
  		id := host.Name
  		if host.ID != uuid.Nil {
  			id = host.ID.String()
  		}
  		slog.Error("failed to publish peer update to host", "host", id, "error", err)
  	}
  }
  89. // PublishDeletedNodePeerUpdate --- determines and publishes a peer update
  90. // to all the hosts with a deleted node to account for
  91. func PublishDeletedNodePeerUpdate(delNode *models.Node) error {
  92. if !servercfg.IsMessageQueueBackend() {
  93. return nil
  94. }
  95. hosts, err := logic.GetAllHosts()
  96. if err != nil {
  97. logger.Log(1, "err getting all hosts", err.Error())
  98. return err
  99. }
  100. allNodes, err := logic.GetAllNodes()
  101. if err != nil {
  102. return err
  103. }
  104. for _, host := range hosts {
  105. host := host
  106. if err = PublishSingleHostPeerUpdate(&host, allNodes, delNode, nil, false, nil); err != nil {
  107. logger.Log(1, "failed to publish peer update to host", host.ID.String(), ": ", err.Error())
  108. }
  109. }
  110. return err
  111. }
  112. // PublishDeletedClientPeerUpdate --- determines and publishes a peer update
  113. // to all the hosts with a deleted ext client to account for
  114. func PublishDeletedClientPeerUpdate(delClient *models.ExtClient) error {
  115. if !servercfg.IsMessageQueueBackend() {
  116. return nil
  117. }
  118. hosts, err := logic.GetAllHosts()
  119. if err != nil {
  120. logger.Log(1, "err getting all hosts", err.Error())
  121. return err
  122. }
  123. nodes, err := logic.GetAllNodes()
  124. if err != nil {
  125. return err
  126. }
  127. for _, host := range hosts {
  128. host := host
  129. if host.OS != models.OS_Types.IoT {
  130. if err = PublishSingleHostPeerUpdate(&host, nodes, nil, []models.ExtClient{*delClient}, false, nil); err != nil {
  131. logger.Log(1, "failed to publish peer update to host", host.ID.String(), ": ", err.Error())
  132. }
  133. }
  134. }
  135. return err
  136. }
  137. // PublishSingleHostPeerUpdate --- determines and publishes a peer update to one host
  138. func PublishSingleHostPeerUpdate(host *models.Host, allNodes []models.Node, deletedNode *models.Node, deletedClients []models.ExtClient, replacePeers bool, wg *sync.WaitGroup) error {
  139. if wg != nil {
  140. defer wg.Done()
  141. }
  142. peerUpdate, err := logic.GetPeerUpdateForHost("", host, allNodes, deletedNode, deletedClients)
  143. if err != nil {
  144. return err
  145. }
  146. peerUpdate.ReplacePeers = replacePeers
  147. data, err := json.Marshal(&peerUpdate)
  148. if err != nil {
  149. return err
  150. }
  151. return publish(host, fmt.Sprintf("peers/host/%s/%s", host.ID.String(), servercfg.GetServer()), data)
  152. }
  // NodeUpdate publishes node as JSON on the "node/update/<network>/<node id>"
  // topic of the node's host.
  //
  // Nothing is published (and nil is returned) when the message-queue backend
  // is disabled. If the node's host cannot be loaded, the update is skipped and
  // nil is returned. The lookup error is logged rather than propagated, which
  // preserves this function's existing contract. Marshal and publish errors are
  // logged and returned.
  func NodeUpdate(node *models.Node) error {
  	// Check the backend first so no host lookup is done when nothing will be
  	// published.
  	if !servercfg.IsMessageQueueBackend() {
  		return nil
  	}
  	host, err := logic.GetHost(node.HostID.String())
  	if err != nil {
  		logger.Log(1, "skipping node update, could not get host for node", node.ID.String(), err.Error())
  		return nil
  	}
  	logger.Log(3, "publishing node update to "+node.ID.String())
  	data, err := json.Marshal(node)
  	if err != nil {
  		logger.Log(2, "error marshalling node update ", err.Error())
  		return err
  	}
  	if err = publish(host, fmt.Sprintf("node/update/%s/%s", node.Network, node.ID), data); err != nil {
  		logger.Log(2, "error publishing node update to peer ", node.ID.String(), err.Error())
  		return err
  	}
  	return nil
  }
  // HostUpdate publishes hostUpdate as JSON on the
  // "host/update/<host id>/<server>" topic of the host it carries.
  //
  // Nothing is published (and nil is returned) when the message-queue backend
  // is disabled. A nil hostUpdate is rejected with an error. Marshal and
  // publish errors are logged and returned.
  func HostUpdate(hostUpdate *models.HostUpdate) error {
  	if !servercfg.IsMessageQueueBackend() {
  		return nil
  	}
  	if hostUpdate == nil {
  		return errors.New("cannot publish a nil host update")
  	}
  	hostID := hostUpdate.Host.ID.String()
  	logger.Log(3, "publishing host update to "+hostID)
  	data, err := json.Marshal(hostUpdate)
  	if err != nil {
  		logger.Log(2, "error marshalling host update ", err.Error())
  		return err
  	}
  	if err = publish(&hostUpdate.Host, fmt.Sprintf("host/update/%s/%s", hostID, servercfg.GetServer()), data); err != nil {
  		logger.Log(2, "error publishing host update to", hostID, err.Error())
  		return err
  	}
  	return nil
  }
  194. // ServerStartNotify - notifies all non server nodes to pull changes after a restart
  195. func ServerStartNotify() error {
  196. nodes, err := logic.GetAllNodes()
  197. if err != nil {
  198. return err
  199. }
  200. for i := range nodes {
  201. nodes[i].Action = models.NODE_FORCE_UPDATE
  202. if err = NodeUpdate(&nodes[i]); err != nil {
  203. logger.Log(1, "error when notifying node", nodes[i].ID.String(), "of a server startup")
  204. }
  205. }
  206. return nil
  207. }
  208. // PublishMqUpdatesForDeletedNode - published all the required updates for deleted node
  209. func PublishMqUpdatesForDeletedNode(node models.Node, sendNodeUpdate bool, gwClients []models.ExtClient) {
  210. // notify of peer change
  211. node.PendingDelete = true
  212. node.Action = models.NODE_DELETE
  213. if sendNodeUpdate {
  214. if err := NodeUpdate(&node); err != nil {
  215. slog.Error("error publishing node update to node", "node", node.ID, "error", err)
  216. }
  217. }
  218. if err := PublishDeletedNodePeerUpdate(&node); err != nil {
  219. logger.Log(1, "error publishing peer update ", err.Error())
  220. }
  221. if servercfg.IsDNSMode() {
  222. logic.SetDNS()
  223. }
  224. }
  225. func PushMetricsToExporter(metrics models.Metrics) error {
  226. logger.Log(2, "----> Pushing metrics to exporter")
  227. data, err := json.Marshal(metrics)
  228. if err != nil {
  229. return errors.New("failed to marshal metrics: " + err.Error())
  230. }
  231. if mqclient == nil || !mqclient.IsConnectionOpen() {
  232. return errors.New("cannot publish ... mqclient not connected")
  233. }
  234. if token := mqclient.Publish("metrics_exporter", 0, true, data); !token.WaitTimeout(MQ_TIMEOUT*time.Second) || token.Error() != nil {
  235. var err error
  236. if token.Error() == nil {
  237. err = errors.New("connection timeout")
  238. } else {
  239. err = token.Error()
  240. }
  241. return err
  242. }
  243. return nil
  244. }
  245. // sendPeers - retrieve networks, send peer ports to all peers
  246. func sendPeers() {
  247. peer_force_send++
  248. if peer_force_send == 5 {
  249. servercfg.SetHost()
  250. peer_force_send = 0
  251. err := logic.TimerCheckpoint() // run telemetry & log dumps if 24 hours has passed..
  252. if err != nil {
  253. logger.Log(3, "error occurred on timer,", err.Error())
  254. }
  255. }
  256. }
  257. func SendDNSSyncByNetwork(network string) error {
  258. k, err := logic.GetDNS(network)
  259. if err == nil && len(k) > 0 {
  260. err = PushSyncDNS(k)
  261. if err != nil {
  262. slog.Warn("error publishing dns entry data for network ", network, err.Error())
  263. }
  264. }
  265. return err
  266. }
  267. func sendDNSSync() error {
  268. networks, err := logic.GetNetworks()
  269. if err == nil && len(networks) > 0 {
  270. for _, v := range networks {
  271. k, err := logic.GetDNS(v.NetID)
  272. if err == nil && len(k) > 0 {
  273. err = PushSyncDNS(k)
  274. if err != nil {
  275. slog.Warn("error publishing dns entry data for network ", v.NetID, err.Error())
  276. }
  277. }
  278. }
  279. return nil
  280. }
  281. return err
  282. }
  283. func PushSyncDNS(dnsEntries []models.DNSEntry) error {
  284. logger.Log(2, "----> Pushing Sync DNS")
  285. data, err := json.Marshal(dnsEntries)
  286. if err != nil {
  287. return errors.New("failed to marshal DNS entries: " + err.Error())
  288. }
  289. if mqclient == nil || !mqclient.IsConnectionOpen() {
  290. return errors.New("cannot publish ... mqclient not connected")
  291. }
  292. if token := mqclient.Publish(fmt.Sprintf("host/dns/sync/%s", dnsEntries[0].Network), 0, true, data); !token.WaitTimeout(MQ_TIMEOUT*time.Second) || token.Error() != nil {
  293. var err error
  294. if token.Error() == nil {
  295. err = errors.New("connection timeout")
  296. } else {
  297. err = token.Error()
  298. }
  299. return err
  300. }
  301. return nil
  302. }