mqpublish.go 4.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143
  1. package functions
  2. import (
  3. "context"
  4. "encoding/json"
  5. "fmt"
  6. "sync"
  7. "time"
  8. "github.com/gravitl/netmaker/netclient/auth"
  9. "github.com/gravitl/netmaker/netclient/config"
  10. "github.com/gravitl/netmaker/netclient/ncutils"
  11. )
  12. // Checkin -- go routine that checks for public or local ip changes, publishes changes
  13. // if there are no updates, simply "pings" the server as a checkin
  14. func Checkin(ctx context.Context, wg *sync.WaitGroup, currentComms map[string]bool) {
  15. defer wg.Done()
  16. for {
  17. select {
  18. case <-ctx.Done():
  19. ncutils.Log("checkin routine closed")
  20. return
  21. //delay should be configuraable -> use cfg.Node.NetworkSettings.DefaultCheckInInterval ??
  22. case <-time.After(time.Second * 60):
  23. // ncutils.Log("Checkin running")
  24. //read latest config
  25. networks, err := ncutils.GetSystemNetworks()
  26. if err != nil {
  27. return
  28. }
  29. for commsNet := range currentComms {
  30. var currCommsCfg config.ClientConfig
  31. currCommsCfg.Network = commsNet
  32. currCommsCfg.ReadConfig()
  33. for _, network := range networks {
  34. var nodeCfg config.ClientConfig
  35. nodeCfg.Network = network
  36. nodeCfg.ReadConfig()
  37. if nodeCfg.Node.CommID != commsNet {
  38. continue // skip if not on current comms network
  39. }
  40. if nodeCfg.Node.IsStatic != "yes" {
  41. extIP, err := ncutils.GetPublicIP()
  42. if err != nil {
  43. ncutils.PrintLog("error encountered checking public ip addresses: "+err.Error(), 1)
  44. }
  45. if nodeCfg.Node.Endpoint != extIP && extIP != "" {
  46. ncutils.PrintLog("endpoint has changed from "+nodeCfg.Node.Endpoint+" to "+extIP, 1)
  47. nodeCfg.Node.Endpoint = extIP
  48. if err := PublishNodeUpdate(&currCommsCfg, &nodeCfg); err != nil {
  49. ncutils.Log("could not publish endpoint change")
  50. }
  51. }
  52. intIP, err := getPrivateAddr()
  53. if err != nil {
  54. ncutils.PrintLog("error encountered checking private ip addresses: "+err.Error(), 1)
  55. }
  56. if nodeCfg.Node.LocalAddress != intIP && intIP != "" {
  57. ncutils.PrintLog("local Address has changed from "+nodeCfg.Node.LocalAddress+" to "+intIP, 1)
  58. nodeCfg.Node.LocalAddress = intIP
  59. if err := PublishNodeUpdate(&currCommsCfg, &nodeCfg); err != nil {
  60. ncutils.Log("could not publish local address change")
  61. }
  62. }
  63. } else if nodeCfg.Node.IsLocal == "yes" && nodeCfg.Node.LocalRange != "" {
  64. localIP, err := ncutils.GetLocalIP(nodeCfg.Node.LocalRange)
  65. if err != nil {
  66. ncutils.PrintLog("error encountered checking local ip addresses: "+err.Error(), 1)
  67. }
  68. if nodeCfg.Node.Endpoint != localIP && localIP != "" {
  69. ncutils.PrintLog("endpoint has changed from "+nodeCfg.Node.Endpoint+" to "+localIP, 1)
  70. nodeCfg.Node.Endpoint = localIP
  71. if err := PublishNodeUpdate(&currCommsCfg, &nodeCfg); err != nil {
  72. ncutils.Log("could not publish localip change")
  73. }
  74. }
  75. }
  76. if err := PingServer(&currCommsCfg); err != nil {
  77. ncutils.PrintLog("could not ping server on comms net, "+currCommsCfg.Network+"\n"+err.Error(), 0)
  78. } else {
  79. Hello(&currCommsCfg, &nodeCfg)
  80. }
  81. }
  82. }
  83. }
  84. }
  85. }
  86. // PublishNodeUpdates -- saves node and pushes changes to broker
  87. func PublishNodeUpdate(commsCfg, nodeCfg *config.ClientConfig) error {
  88. if err := config.Write(nodeCfg, nodeCfg.Network); err != nil {
  89. return err
  90. }
  91. data, err := json.Marshal(nodeCfg.Node)
  92. if err != nil {
  93. return err
  94. }
  95. if err = publish(commsCfg, nodeCfg, fmt.Sprintf("update/%s", nodeCfg.Node.ID), data, 1); err != nil {
  96. return err
  97. }
  98. ncutils.PrintLog("sent a node update to server for node"+nodeCfg.Node.ID+", "+nodeCfg.Node.ID, 1)
  99. return nil
  100. }
  101. // Hello -- ping the broker to let server know node it's alive and well
  102. func Hello(commsCfg, nodeCfg *config.ClientConfig) {
  103. if err := publish(commsCfg, nodeCfg, fmt.Sprintf("ping/%s", nodeCfg.Node.ID), []byte(ncutils.Version), 0); err != nil {
  104. ncutils.Log(fmt.Sprintf("error publishing ping, %v", err))
  105. ncutils.Log("running pull on " + commsCfg.Node.Network + " to reconnect")
  106. _, err := Pull(commsCfg.Node.Network, true)
  107. if err != nil {
  108. ncutils.Log("could not run pull on " + commsCfg.Node.Network + ", error: " + err.Error())
  109. }
  110. }
  111. }
  112. // requires the commscfg in which to send traffic over and nodecfg of node that is publish the message
  113. // node cfg is so that the traffic keys of that node may be fetched for encryption
  114. func publish(commsCfg, nodeCfg *config.ClientConfig, dest string, msg []byte, qos byte) error {
  115. // setup the keys
  116. trafficPrivKey, err := auth.RetrieveTrafficKey(nodeCfg.Node.Network)
  117. if err != nil {
  118. return err
  119. }
  120. serverPubKey, err := ncutils.ConvertBytesToKey(nodeCfg.Node.TrafficKeys.Server)
  121. if err != nil {
  122. return err
  123. }
  124. client := setupMQTT(commsCfg, true)
  125. defer client.Disconnect(250)
  126. encrypted, err := ncutils.Chunk(msg, serverPubKey, trafficPrivKey)
  127. if err != nil {
  128. return err
  129. }
  130. if token := client.Publish(dest, qos, false, encrypted); token.Wait() && token.Error() != nil {
  131. return token.Error()
  132. }
  133. return nil
  134. }