daemon.go 6.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197
  1. package functions
  2. import (
  3. "context"
  4. "encoding/json"
  5. "log"
  6. "os"
  7. "os/signal"
  8. "strings"
  9. "syscall"
  10. "time"
  11. mqtt "github.com/eclipse/paho.mqtt.golang"
  12. "github.com/gravitl/netmaker/netclient/config"
  13. "github.com/gravitl/netmaker/netclient/ncutils"
  14. "golang.zx2c4.com/wireguard/wgctrl"
  15. )
  16. //Daemon runs netclient daemon from command line
  17. func Daemon() error {
  18. networks, err := ncutils.GetSystemNetworks()
  19. if err != nil {
  20. return err
  21. }
  22. for _, network := range networks {
  23. go Netclient(network)
  24. }
  25. for {
  26. }
  27. return nil
  28. }
  29. //SetupMQTT creates a connection to broker and return client
  30. func SetupMQTT(cfg config.ClientConfig) mqtt.Client {
  31. opts := mqtt.NewClientOptions()
  32. ncutils.Log("setting broker to " + cfg.Server.CoreDNSAddr + ":1883")
  33. opts.AddBroker(cfg.Server.CoreDNSAddr + ":1883")
  34. opts.SetDefaultPublishHandler(All)
  35. client := mqtt.NewClient(opts)
  36. if token := client.Connect(); token.Wait() && token.Error() != nil {
  37. log.Fatal(token.Error())
  38. }
  39. return client
  40. }
  41. //Netclient sets up Message Queue and subsribes/publishes updates to/from server
  42. func Netclient(network string) {
  43. ctx, cancel := context.WithCancel(context.Background())
  44. var cfg config.ClientConfig
  45. cfg.Network = network
  46. cfg.ReadConfig()
  47. //fix NodeID to remove ### so NodeID can be used as message topic
  48. //remove with GRA-73
  49. cfg.Node.ID = strings.ReplaceAll(cfg.Node.ID, "###", "-")
  50. ncutils.Log("daemon started for network:" + network)
  51. client := SetupMQTT(cfg)
  52. if token := client.Subscribe("#", 0, nil); token.Wait() && token.Error() != nil {
  53. log.Fatal(token.Error())
  54. }
  55. client.AddRoute("update/"+cfg.Node.ID, NodeUpdate)
  56. client.AddRoute("update/peers/"+cfg.Node.ID, UpdatePeers)
  57. client.AddRoute("update/keys/"+cfg.Node.ID, UpdateKeys)
  58. defer client.Disconnect(250)
  59. go Checkin(ctx, cfg, network)
  60. go Metrics(ctx, cfg, network)
  61. quit := make(chan os.Signal, 1)
  62. signal.Notify(quit, syscall.SIGTERM, os.Interrupt)
  63. <-quit
  64. cancel()
  65. }
  66. //All -- mqtt message hander for all ('#') topics
  67. var All mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Message) {
  68. ncutils.Log("Topic: " + string(msg.Topic()))
  69. ncutils.Log("Message: " + string(msg.Payload()))
  70. }
  71. //NodeUpdate -- mqtt message handler for /update/<NodeID> topic
  72. var NodeUpdate mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Message) {
  73. ncutils.Log("received message to update node " + string(msg.Payload()))
  74. }
  75. //NodeUpdate -- mqtt message handler for /update/peers/<NodeID> topic
  76. var UpdatePeers mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Message) {
  77. ncutils.Log("received message to update peers " + string(msg.Payload()))
  78. }
  79. //NodeUpdate -- mqtt message handler for /update/keys/<NodeID> topic
  80. var UpdateKeys mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Message) {
  81. ncutils.Log("received message to update keys " + string(msg.Payload()))
  82. }
  83. //Checkin -- go routine that checks for public or local ip changes, publishes changes
  84. // if there are no updates, simply "pings" the server as a checkin
  85. func Checkin(ctx context.Context, cfg config.ClientConfig, network string) {
  86. select {
  87. case <-ctx.Done():
  88. ncutils.Log("Checkin cancelled")
  89. return
  90. //delay should be configuraable -> use cfg.Node.NetworkSettings.DefaultCheckInInterval ??
  91. case <-time.After(time.Second * 10):
  92. ncutils.Log("Checkin running")
  93. if cfg.Node.Roaming == "yes" && cfg.Node.IsStatic != "yes" {
  94. extIP, err := ncutils.GetPublicIP()
  95. if err != nil {
  96. ncutils.PrintLog("error encountered checking ip addresses: "+err.Error(), 1)
  97. }
  98. if cfg.Node.Endpoint != extIP && extIP != "" {
  99. ncutils.PrintLog("endpoint has changed from "+cfg.Node.Endpoint+" to "+extIP, 1)
  100. UpdateEndpoint(cfg, network, extIP)
  101. }
  102. intIP, err := getPrivateAddr()
  103. if err != nil {
  104. ncutils.PrintLog("error encountered checking ip addresses: "+err.Error(), 1)
  105. }
  106. if cfg.Node.LocalAddress != intIP && intIP != "" {
  107. ncutils.PrintLog("local Address has changed from "+cfg.Node.LocalAddress+" to "+intIP, 1)
  108. UpdateLocalAddress(cfg, network, intIP)
  109. }
  110. } else {
  111. localIP, err := ncutils.GetLocalIP(cfg.Node.LocalRange)
  112. if err != nil {
  113. ncutils.PrintLog("error encountered checking ip addresses: "+err.Error(), 1)
  114. }
  115. if cfg.Node.Endpoint != localIP && localIP != "" {
  116. ncutils.PrintLog("endpoint has changed from "+cfg.Node.Endpoint+" to "+localIP, 1)
  117. UpdateEndpoint(cfg, network, localIP)
  118. }
  119. }
  120. Hello(cfg, network)
  121. ncutils.Log("Checkin complete")
  122. }
  123. }
  124. //UpdateEndpoint -- publishes an endpoint update to broker
  125. func UpdateEndpoint(cfg config.ClientConfig, network, ip string) {
  126. ncutils.Log("Updating endpoint")
  127. client := SetupMQTT(cfg)
  128. if token := client.Publish("update/ip/"+cfg.Node.ID, 0, false, ip); token.Wait() && token.Error() != nil {
  129. ncutils.Log("error publishing endpoint update " + token.Error().Error())
  130. }
  131. client.Disconnect(250)
  132. }
  133. //UpdateLocalAddress -- publishes a local address update to broker
  134. func UpdateLocalAddress(cfg config.ClientConfig, network, ip string) {
  135. ncutils.Log("Updating local address")
  136. client := SetupMQTT(cfg)
  137. if token := client.Publish("update/localaddress/"+cfg.Node.ID, 0, false, ip); token.Wait() && token.Error() != nil {
  138. ncutils.Log("error publishing local address update " + token.Error().Error())
  139. }
  140. client.Disconnect(250)
  141. }
  142. //Hello -- ping the broker to let server know node is alive and doing fine
  143. func Hello(cfg config.ClientConfig, network string) {
  144. client := SetupMQTT(cfg)
  145. if token := client.Publish("ping/"+network+"/"+cfg.Node.ID, 0, false, "hello world!"); token.Wait() && token.Error() != nil {
  146. ncutils.Log("error publishing ping " + token.Error().Error())
  147. }
  148. client.Disconnect(250)
  149. }
  150. //Metics -- go routine that collects wireguard metrics and publishes to broker
  151. func Metrics(ctx context.Context, cfg config.ClientConfig, network string) {
  152. select {
  153. case <-ctx.Done():
  154. ncutils.Log("Metrics collection cancelled")
  155. return
  156. //delay should be configuraable -> use cfg.Node.NetworkSettings.DefaultCheckInInterval ??
  157. case <-time.After(time.Second * 60):
  158. ncutils.Log("Metrics collection running")
  159. ncutils.Log("Metrics running")
  160. wg, err := wgctrl.New()
  161. if err != nil {
  162. ncutils.Log("error getting devices " + err.Error())
  163. break
  164. }
  165. device, err := wg.Device(cfg.Node.Interface)
  166. if err != nil {
  167. ncutils.Log("error readind wg device " + err.Error())
  168. break
  169. }
  170. bytes, err := json.Marshal(device.Peers)
  171. if err != nil {
  172. ncutils.Log("error marshaling peers " + err.Error())
  173. break
  174. }
  175. client := SetupMQTT(cfg)
  176. if token := client.Publish("metrics/"+cfg.Node.ID, 1, false, bytes); token.Wait() && token.Error() != nil {
  177. ncutils.Log("error publishing metrics " + token.Error().Error())
  178. }
  179. wg.Close()
  180. client.Disconnect(250)
  181. ncutils.Log("metrics collection complete")
  182. }
  183. }