mqpublish.go 4.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135
  1. package functions
  2. import (
  3. "context"
  4. "encoding/json"
  5. "fmt"
  6. "sync"
  7. "time"
  8. "github.com/gravitl/netmaker/netclient/auth"
  9. "github.com/gravitl/netmaker/netclient/config"
  10. "github.com/gravitl/netmaker/netclient/ncutils"
  11. )
  12. // Checkin -- go routine that checks for public or local ip changes, publishes changes
  13. // if there are no updates, simply "pings" the server as a checkin
  14. func Checkin(ctx context.Context, wg sync.WaitGroup) {
  15. defer wg.Done()
  16. for {
  17. select {
  18. case <-ctx.Done():
  19. ncutils.Log("Checkin cancelled")
  20. return
  21. //delay should be configuraable -> use cfg.Node.NetworkSettings.DefaultCheckInInterval ??
  22. case <-time.After(time.Second * 60):
  23. // ncutils.Log("Checkin running")
  24. //read latest config
  25. networks, err := ncutils.GetSystemNetworks()
  26. if err != nil {
  27. return
  28. }
  29. for _, network := range networks {
  30. if network == ncutils.COMMS_NETWORK_NAME {
  31. continue
  32. }
  33. var cfg *config.ClientConfig
  34. cfg.Network = network
  35. cfg.ReadConfig()
  36. if cfg.Node.IsStatic != "yes" {
  37. extIP, err := ncutils.GetPublicIP()
  38. if err != nil {
  39. ncutils.PrintLog("error encountered checking public ip addresses: "+err.Error(), 1)
  40. }
  41. if cfg.Node.Endpoint != extIP && extIP != "" {
  42. ncutils.PrintLog("endpoint has changed from "+cfg.Node.Endpoint+" to "+extIP, 1)
  43. cfg.Node.Endpoint = extIP
  44. if err := PublishNodeUpdate(cfg); err != nil {
  45. ncutils.Log("could not publish endpoint change")
  46. }
  47. }
  48. intIP, err := getPrivateAddr()
  49. if err != nil {
  50. ncutils.PrintLog("error encountered checking private ip addresses: "+err.Error(), 1)
  51. }
  52. if cfg.Node.LocalAddress != intIP && intIP != "" {
  53. ncutils.PrintLog("local Address has changed from "+cfg.Node.LocalAddress+" to "+intIP, 1)
  54. cfg.Node.LocalAddress = intIP
  55. if err := PublishNodeUpdate(cfg); err != nil {
  56. ncutils.Log("could not publish local address change")
  57. }
  58. }
  59. } else if cfg.Node.IsLocal == "yes" && cfg.Node.LocalRange != "" {
  60. localIP, err := ncutils.GetLocalIP(cfg.Node.LocalRange)
  61. if err != nil {
  62. ncutils.PrintLog("error encountered checking local ip addresses: "+err.Error(), 1)
  63. }
  64. if cfg.Node.Endpoint != localIP && localIP != "" {
  65. ncutils.PrintLog("endpoint has changed from "+cfg.Node.Endpoint+" to "+localIP, 1)
  66. cfg.Node.Endpoint = localIP
  67. if err := PublishNodeUpdate(cfg); err != nil {
  68. ncutils.Log("could not publish localip change")
  69. }
  70. }
  71. }
  72. if err := PingServer(cfg); err != nil {
  73. ncutils.PrintLog("could not ping server "+err.Error(), 0)
  74. }
  75. Hello(cfg, network)
  76. // ncutils.Log("Checkin complete")
  77. }
  78. }
  79. }
  80. }
  81. // PublishNodeUpdates -- saves node and pushes changes to broker
  82. func PublishNodeUpdate(cfg *config.ClientConfig) error {
  83. if err := config.Write(cfg, cfg.Network); err != nil {
  84. return err
  85. }
  86. data, err := json.Marshal(cfg.Node)
  87. if err != nil {
  88. return err
  89. }
  90. if err = publish(cfg, fmt.Sprintf("update/%s", cfg.Node.ID), data, 1); err != nil {
  91. return err
  92. }
  93. return nil
  94. }
  95. // Hello -- ping the broker to let server know node is alive and doing fine
  96. func Hello(cfg *config.ClientConfig, network string) {
  97. if err := publish(cfg, fmt.Sprintf("ping/%s", cfg.Node.ID), []byte(ncutils.Version), 0); err != nil {
  98. ncutils.Log(fmt.Sprintf("error publishing ping, %v", err))
  99. ncutils.Log("running pull on " + cfg.Node.Network + " to reconnect")
  100. _, err := Pull(cfg.Node.Network, true)
  101. if err != nil {
  102. ncutils.Log("could not run pull on " + cfg.Node.Network + ", error: " + err.Error())
  103. }
  104. }
  105. }
  106. func publish(cfg *config.ClientConfig, dest string, msg []byte, qos byte) error {
  107. // setup the keys
  108. trafficPrivKey, err := auth.RetrieveTrafficKey(cfg.Node.Network)
  109. if err != nil {
  110. return err
  111. }
  112. serverPubKey, err := ncutils.ConvertBytesToKey(cfg.Node.TrafficKeys.Server)
  113. if err != nil {
  114. return err
  115. }
  116. client := setupMQTT(true)
  117. defer client.Disconnect(250)
  118. encrypted, err := ncutils.Chunk(msg, serverPubKey, trafficPrivKey)
  119. if err != nil {
  120. return err
  121. }
  122. if token := client.Publish(dest, qos, false, encrypted); token.Wait() && token.Error() != nil {
  123. return token.Error()
  124. }
  125. return nil
  126. }