daemon.go 6.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214
  1. package functions
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "log"
  6. "time"
  7. mqtt "github.com/eclipse/paho.mqtt.golang"
  8. "github.com/go-ping/ping"
  9. "github.com/gravitl/netmaker/netclient/config"
  10. "github.com/gravitl/netmaker/netclient/ncutils"
  11. "golang.zx2c4.com/wireguard/wgctrl"
  12. )
  13. func Daemon() error {
  14. networks, err := ncutils.GetSystemNetworks()
  15. if err != nil {
  16. return err
  17. }
  18. for _, network := range networks {
  19. go Netclient(network)
  20. }
  21. for {
  22. }
  23. return nil
  24. }
  25. func Netclient(network string) {
  26. var cfg config.ClientConfig
  27. cfg.Network = network
  28. cfg.ReadConfig()
  29. ncutils.Log("daemon started for network:" + network)
  30. //setup MQTT
  31. opts := mqtt.NewClientOptions()
  32. ncutils.Log("setting broker to " + cfg.Server.CoreDNSAddr + ":1883")
  33. opts.AddBroker(cfg.Server.CoreDNSAddr + ":1883")
  34. opts.SetDefaultPublishHandler(All)
  35. opts.SetClientID("netclient-mqttt")
  36. client := mqtt.NewClient(opts)
  37. if token := client.Connect(); token.Wait() && token.Error() != nil {
  38. log.Fatal(token.Error())
  39. }
  40. if token := client.Subscribe("#", 0, nil); token.Wait() && token.Error() != nil {
  41. log.Fatal(token.Error())
  42. }
  43. client.AddRoute("update/"+network+"/"+cfg.Node.MacAddress, NodeUpdate)
  44. client.AddRoute("update/"+network+"/peers", UpdatePeers)
  45. client.AddRoute("update/"+network+"/keys", UpdateKeys)
  46. client.AddRoute("update/"+network+"/keys/"+cfg.Node.MacAddress, UpdateKeys)
  47. defer client.Disconnect(250)
  48. go Checkin(client, network)
  49. //go Metrics(client, network)
  50. //go Connectivity(client, network)
  51. for {
  52. }
  53. }
  54. var All mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Message) {
  55. ncutils.Log("Topic: " + string(msg.Topic()))
  56. ncutils.Log("Message: " + string(msg.Payload()))
  57. }
  58. var NodeUpdate mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Message) {
  59. ncutils.Log("received message to update node " + string(msg.Payload()))
  60. }
  61. var UpdatePeers mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Message) {
  62. ncutils.Log("received message to update peers " + string(msg.Payload()))
  63. }
  64. var UpdateKeys mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Message) {
  65. ncutils.Log("received message to update keys " + string(msg.Payload()))
  66. }
  67. func Checkin(client mqtt.Client, network string) {
  68. var cfg config.ClientConfig
  69. cfg.Network = network
  70. cfg.ReadConfig()
  71. for {
  72. time.Sleep(time.Duration(cfg.Node.NetworkSettings.DefaultCheckInInterval) * time.Second)
  73. ncutils.Log("Checkin running")
  74. if cfg.Node.Roaming == "yes" && cfg.Node.IsStatic != "yes" {
  75. extIP, err := ncutils.GetPublicIP()
  76. if err != nil {
  77. ncutils.PrintLog("error encountered checking ip addresses: "+err.Error(), 1)
  78. }
  79. if cfg.Node.Endpoint != extIP && extIP != "" {
  80. ncutils.PrintLog("endpoint has changed from "+cfg.Node.Endpoint+" to "+extIP, 1)
  81. UpdateEndpoint(client, network, extIP)
  82. }
  83. intIP, err := getPrivateAddr()
  84. if err != nil {
  85. ncutils.PrintLog("error encountered checking ip addresses: "+err.Error(), 1)
  86. }
  87. if cfg.Node.LocalAddress != intIP && intIP != "" {
  88. ncutils.PrintLog("local Address has changed from "+cfg.Node.LocalAddress+" to "+intIP, 1)
  89. UpdateLocalAddress(client, network, intIP)
  90. }
  91. } else {
  92. localIP, err := ncutils.GetLocalIP(cfg.Node.LocalRange)
  93. if err != nil {
  94. ncutils.PrintLog("error encountered checking ip addresses: "+err.Error(), 1)
  95. }
  96. if cfg.Node.Endpoint != localIP && localIP != "" {
  97. ncutils.PrintLog("endpoint has changed from "+cfg.Node.Endpoint+" to "+localIP, 1)
  98. UpdateEndpoint(client, network, localIP)
  99. }
  100. }
  101. Ping(client, network)
  102. }
  103. }
  104. func Ping(client mqtt.Client, network string) {
  105. var cfg config.ClientConfig
  106. cfg.Network = network
  107. cfg.ReadConfig()
  108. if token := client.Publish("ping/"+network+"/"+cfg.Node.ID, 0, false, []byte("ping")); token.Wait() && token.Error() != nil {
  109. ncutils.Log("error publishing ping " + token.Error().Error())
  110. }
  111. }
  112. func Metrics(client mqtt.Client, network string) {
  113. if token := client.Connect(); token.Wait() && token.Error() != nil {
  114. log.Fatal(token.Error())
  115. }
  116. var cfg config.ClientConfig
  117. cfg.Network = network
  118. cfg.ReadConfig()
  119. for {
  120. time.Sleep(time.Second * 60)
  121. ncutils.Log("Metrics running")
  122. wg, err := wgctrl.New()
  123. if err != nil {
  124. ncutils.Log("error getting devices " + err.Error())
  125. break
  126. }
  127. device, err := wg.Device(cfg.Node.Interface)
  128. if err != nil {
  129. ncutils.Log("error readind wg device " + err.Error())
  130. break
  131. }
  132. bytes, err := json.Marshal(device.Peers)
  133. if err != nil {
  134. ncutils.Log("error marshaling peers " + err.Error())
  135. break
  136. }
  137. if token := client.Publish("metrics/"+network+"/"+cfg.Node.ID, 1, false, bytes); token.Wait() && token.Error() != nil {
  138. ncutils.Log("error publishing metrics " + token.Error().Error())
  139. break
  140. }
  141. wg.Close()
  142. }
  143. }
  144. type PingStat struct {
  145. Name string
  146. Reachable bool
  147. }
  148. func Connectivity(client mqtt.Client, network string) {
  149. if token := client.Connect(); token.Wait() && token.Error() != nil {
  150. log.Fatal(token.Error())
  151. }
  152. var cfg config.ClientConfig
  153. cfg.Network = network
  154. cfg.ReadConfig()
  155. for {
  156. time.Sleep(time.Duration(cfg.NetworkSettings.DefaultCheckInInterval) * time.Second)
  157. ncutils.Log("Connectivity running")
  158. var pingStats []PingStat
  159. peers, err := ncutils.GetPeers(cfg.Node.Interface)
  160. if err != nil {
  161. ncutils.Log("error retriving peers " + err.Error())
  162. break
  163. }
  164. for _, peer := range peers {
  165. var pingStat PingStat
  166. pingStat.Name = peer.PublicKey.String()
  167. pingStat.Reachable = true
  168. ip := peer.Endpoint.IP.String()
  169. fmt.Println("----------", peer.Endpoint.IP, ip)
  170. pinger, err := ping.NewPinger(ip)
  171. if err != nil {
  172. ncutils.Log("error creating pinger " + err.Error())
  173. break
  174. }
  175. pinger.Timeout = 2 * time.Second
  176. pinger.Run()
  177. stats := pinger.Statistics()
  178. if stats.PacketLoss == 100 {
  179. pingStat.Reachable = false
  180. }
  181. pingStats = append(pingStats, pingStat)
  182. }
  183. bytes, err := json.Marshal(pingStats)
  184. if err != nil {
  185. ncutils.Log("error marshaling stats" + err.Error())
  186. break
  187. }
  188. if token := client.Publish("connectivity/"+network+"/"+cfg.Node.ID, 1, false, bytes); token.Wait() && token.Error() != nil {
  189. ncutils.Log("error publishing ping stats " + token.Error().Error())
  190. break
  191. }
  192. }
  193. }
  194. func UpdateEndpoint(client mqtt.Client, network, ip string) {
  195. ncutils.Log("Updating endpoint")
  196. }
  197. func UpdateLocalAddress(client mqtt.Client, network, ip string) {
  198. ncutils.Log("Updating local address")
  199. }