mq.go 2.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293
  1. package mq
  2. import (
  3. "context"
  4. "log"
  5. "time"
  6. mqtt "github.com/eclipse/paho.mqtt.golang"
  7. "github.com/gravitl/netmaker/logger"
  8. "github.com/gravitl/netmaker/netclient/ncutils"
  9. "github.com/gravitl/netmaker/servercfg"
  10. "github.com/gravitl/netmaker/serverctl"
  11. )
  12. // KEEPALIVE_TIMEOUT - time in seconds for timeout
  13. const KEEPALIVE_TIMEOUT = 60 //timeout in seconds
  14. // MQ_DISCONNECT - disconnects MQ
  15. const MQ_DISCONNECT = 250
  16. // MQ_TIMEOUT - timeout for MQ
  17. const MQ_TIMEOUT = 30
  18. var peer_force_send = 0
  19. // SetupMQTT creates a connection to broker and return client
  20. func SetupMQTT(publish bool) mqtt.Client {
  21. opts := mqtt.NewClientOptions()
  22. opts.AddBroker(servercfg.GetMessageQueueEndpoint())
  23. id := ncutils.MakeRandomString(23)
  24. opts.ClientID = id
  25. tlsConfig, err := serverctl.ReadClientCertFromDB()
  26. if err != nil {
  27. logger.Log(0, "failed to get TLS config for server to broker connection", err.Error())
  28. }
  29. opts.SetTLSConfig(tlsConfig)
  30. opts.SetAutoReconnect(true)
  31. opts.SetConnectRetry(true)
  32. opts.SetConnectRetryInterval(time.Second << 2)
  33. opts.SetKeepAlive(time.Minute)
  34. opts.SetWriteTimeout(time.Minute)
  35. opts.SetOnConnectHandler(func(client mqtt.Client) {
  36. if !publish {
  37. if token := client.Subscribe("ping/#", 2, mqtt.MessageHandler(Ping)); token.WaitTimeout(MQ_TIMEOUT*time.Second) && token.Error() != nil {
  38. client.Disconnect(240)
  39. logger.Log(0, "ping subscription failed")
  40. }
  41. if token := client.Subscribe("update/#", 0, mqtt.MessageHandler(UpdateNode)); token.WaitTimeout(MQ_TIMEOUT*time.Second) && token.Error() != nil {
  42. client.Disconnect(240)
  43. logger.Log(0, "node update subscription failed")
  44. }
  45. if token := client.Subscribe("signal/#", 0, mqtt.MessageHandler(ClientPeerUpdate)); token.WaitTimeout(MQ_TIMEOUT*time.Second) && token.Error() != nil {
  46. client.Disconnect(240)
  47. logger.Log(0, "node client subscription failed")
  48. }
  49. opts.SetOrderMatters(true)
  50. opts.SetResumeSubs(true)
  51. }
  52. })
  53. client := mqtt.NewClient(opts)
  54. tperiod := time.Now().Add(10 * time.Second)
  55. for {
  56. if token := client.Connect(); !token.WaitTimeout(MQ_TIMEOUT*time.Second) || token.Error() != nil {
  57. logger.Log(2, "unable to connect to broker, retrying ...")
  58. if time.Now().After(tperiod) {
  59. if token.Error() == nil {
  60. log.Fatal(0, "could not connect to broker, token timeout, exiting ...")
  61. } else {
  62. log.Fatal(0, "could not connect to broker, exiting ...", token.Error())
  63. }
  64. }
  65. } else {
  66. break
  67. }
  68. time.Sleep(2 * time.Second)
  69. }
  70. if !publish {
  71. logger.Log(0, "successfully connected to mq broker")
  72. }
  73. return client
  74. }
  75. // Keepalive -- periodically pings all nodes to let them know server is still alive and doing well
  76. func Keepalive(ctx context.Context) {
  77. for {
  78. select {
  79. case <-ctx.Done():
  80. return
  81. case <-time.After(time.Second * KEEPALIVE_TIMEOUT):
  82. sendPeers()
  83. }
  84. }
  85. }