daemon.go 9.3 KB


  1. package functions
  2. import (
  3. "context"
  4. "errors"
  5. "fmt"
  6. "os"
  7. "os/signal"
  8. "strings"
  9. "sync"
  10. "syscall"
  11. "time"
  12. mqtt "github.com/eclipse/paho.mqtt.golang"
  13. "github.com/go-ping/ping"
  14. "github.com/gravitl/netmaker/models"
  15. "github.com/gravitl/netmaker/netclient/auth"
  16. "github.com/gravitl/netmaker/netclient/config"
  17. "github.com/gravitl/netmaker/netclient/daemon"
  18. "github.com/gravitl/netmaker/netclient/ncutils"
  19. "github.com/gravitl/netmaker/netclient/wireguard"
  20. "golang.zx2c4.com/wireguard/wgctrl/wgtypes"
  21. )
  22. var messageCache = new(sync.Map)
  23. var networkcontext = new(sync.Map)
  24. const lastNodeUpdate = "lnu"
  25. const lastPeerUpdate = "lpu"
  26. type cachedMessage struct {
  27. Message string
  28. LastSeen time.Time
  29. }
  30. // Daemon runs netclient daemon from command line
  31. func Daemon() error {
  32. commsNetworks, err := getCommsNetworks()
  33. if err != nil {
  34. return errors.New("no comm networks exist")
  35. }
  36. for net := range commsNetworks {
  37. ncutils.PrintLog("started comms network daemon, "+net, 1)
  38. client := setupMQTT(false, net)
  39. defer client.Disconnect(250)
  40. }
  41. wg := sync.WaitGroup{}
  42. ctx, cancel := context.WithCancel(context.Background())
  43. networks, _ := ncutils.GetSystemNetworks()
  44. for _, network := range networks {
  45. var cfg config.ClientConfig
  46. cfg.Network = network
  47. cfg.ReadConfig()
  48. initialPull(cfg.Network)
  49. }
  50. wg.Add(1)
  51. go Checkin(ctx, wg)
  52. quit := make(chan os.Signal, 1)
  53. signal.Notify(quit, syscall.SIGTERM, os.Interrupt)
  54. <-quit
  55. cancel()
  56. ncutils.Log("shutting down message queue ")
  57. wg.Wait()
  58. ncutils.Log("shutdown complete")
  59. return nil
  60. }
  61. // UpdateKeys -- updates private key and returns new publickey
  62. func UpdateKeys(cfg *config.ClientConfig, client mqtt.Client) error {
  63. ncutils.Log("received message to update keys")
  64. key, err := wgtypes.GeneratePrivateKey()
  65. if err != nil {
  66. ncutils.Log("error generating privatekey " + err.Error())
  67. return err
  68. }
  69. file := ncutils.GetNetclientPathSpecific() + cfg.Node.Interface + ".conf"
  70. if err := wireguard.UpdatePrivateKey(file, key.String()); err != nil {
  71. ncutils.Log("error updating wireguard key " + err.Error())
  72. return err
  73. }
  74. cfg.Node.PublicKey = key.PublicKey().String()
  75. if err := config.ModConfig(&cfg.Node); err != nil {
  76. ncutils.Log("error updating local config " + err.Error())
  77. }
  78. PublishNodeUpdate(cfg)
  79. if err = wireguard.ApplyConf(&cfg.Node, cfg.Node.Interface, file); err != nil {
  80. ncutils.Log("error applying new config " + err.Error())
  81. return err
  82. }
  83. return nil
  84. }
  85. // PingServer -- checks if server is reachable
  86. func PingServer(cfg *config.ClientConfig) error {
  87. node := getServerAddress(cfg)
  88. pinger, err := ping.NewPinger(node)
  89. if err != nil {
  90. return err
  91. }
  92. pinger.Timeout = 2 * time.Second
  93. pinger.Run()
  94. stats := pinger.Statistics()
  95. if stats.PacketLoss == 100 {
  96. return errors.New("ping error")
  97. }
  98. return nil
  99. }
  100. // == Private ==
  101. // setupMQTT creates a connection to broker and return client
  102. func setupMQTT(publish bool, networkName string) mqtt.Client {
  103. var cfg *config.ClientConfig
  104. cfg.Network = networkName
  105. cfg.ReadConfig()
  106. opts := mqtt.NewClientOptions()
  107. server := getServerAddress(cfg)
  108. opts.AddBroker(server + ":1883") // TODO get the appropriate port of the comms mq server
  109. id := ncutils.MakeRandomString(23)
  110. opts.ClientID = id
  111. opts.SetDefaultPublishHandler(All)
  112. opts.SetAutoReconnect(true)
  113. opts.SetConnectRetry(true)
  114. opts.SetConnectRetryInterval(time.Second << 2)
  115. opts.SetKeepAlive(time.Minute >> 1)
  116. opts.SetWriteTimeout(time.Minute)
  117. opts.SetOnConnectHandler(func(client mqtt.Client) {
  118. if !publish {
  119. networks, err := ncutils.GetSystemNetworks()
  120. if err != nil {
  121. ncutils.Log("error retriving networks " + err.Error())
  122. }
  123. for _, network := range networks {
  124. var currConf config.ClientConfig
  125. currConf.Network = network
  126. currConf.ReadConfig()
  127. SetSubscriptions(client, &currConf)
  128. }
  129. }
  130. })
  131. opts.SetOrderMatters(true)
  132. opts.SetResumeSubs(true)
  133. opts.SetConnectionLostHandler(func(c mqtt.Client, e error) {
  134. ncutils.Log("detected broker connection lost, running pull for " + cfg.Node.Network)
  135. _, err := Pull(cfg.Node.Network, true)
  136. if err != nil {
  137. ncutils.Log("could not run pull, server unreachable: " + err.Error())
  138. ncutils.Log("waiting to retry...")
  139. }
  140. ncutils.Log("connection re-established with mqtt server")
  141. })
  142. client := mqtt.NewClient(opts)
  143. tperiod := time.Now().Add(12 * time.Second)
  144. for {
  145. //if after 12 seconds, try a gRPC pull on the last try
  146. if time.Now().After(tperiod) {
  147. ncutils.Log("running pull for " + cfg.Node.Network)
  148. _, err := Pull(cfg.Node.Network, true)
  149. if err != nil {
  150. ncutils.Log("could not run pull, exiting " + cfg.Node.Network + " setup: " + err.Error())
  151. return client
  152. }
  153. time.Sleep(time.Second)
  154. }
  155. if token := client.Connect(); token.Wait() && token.Error() != nil {
  156. ncutils.Log("unable to connect to broker, retrying ...")
  157. if time.Now().After(tperiod) {
  158. ncutils.Log("could not connect to broker, exiting " + cfg.Node.Network + " setup: " + token.Error().Error())
  159. if strings.Contains(token.Error().Error(), "connectex") || strings.Contains(token.Error().Error(), "i/o timeout") {
  160. ncutils.PrintLog("connection issue detected.. pulling and restarting daemon", 0)
  161. Pull(cfg.Node.Network, true)
  162. daemon.Restart()
  163. }
  164. return client
  165. }
  166. } else {
  167. break
  168. }
  169. time.Sleep(2 * time.Second)
  170. }
  171. return client
  172. }
  173. // SetSubscriptions - sets MQ subscriptions
  174. func SetSubscriptions(client mqtt.Client, cfg *config.ClientConfig) {
  175. if cfg.DebugOn {
  176. if token := client.Subscribe("#", 0, nil); token.Wait() && token.Error() != nil {
  177. ncutils.Log(token.Error().Error())
  178. return
  179. }
  180. ncutils.Log("subscribed to all topics for debugging purposes")
  181. }
  182. if token := client.Subscribe(fmt.Sprintf("update/%s/%s", cfg.Node.Network, cfg.Node.ID), 0, mqtt.MessageHandler(NodeUpdate)); token.Wait() && token.Error() != nil {
  183. ncutils.Log(token.Error().Error())
  184. return
  185. }
  186. if cfg.DebugOn {
  187. ncutils.Log(fmt.Sprintf("subscribed to node updates for node %s update/%s/%s", cfg.Node.Name, cfg.Node.Network, cfg.Node.ID))
  188. }
  189. if token := client.Subscribe(fmt.Sprintf("peers/%s/%s", cfg.Node.Network, cfg.Node.ID), 0, mqtt.MessageHandler(UpdatePeers)); token.Wait() && token.Error() != nil {
  190. ncutils.Log(token.Error().Error())
  191. return
  192. }
  193. if cfg.DebugOn {
  194. ncutils.Log(fmt.Sprintf("subscribed to peer updates for node %s peers/%s/%s", cfg.Node.Name, cfg.Node.Network, cfg.Node.ID))
  195. }
  196. }
  197. // publishes a message to server to update peers on this peer's behalf
  198. func publishSignal(cfg *config.ClientConfig, signal byte) error {
  199. if err := publish(cfg, fmt.Sprintf("signal/%s", cfg.Node.ID), []byte{signal}, 1); err != nil {
  200. return err
  201. }
  202. return nil
  203. }
  204. func initialPull(network string) {
  205. ncutils.Log("pulling latest config for " + network)
  206. var configPath = fmt.Sprintf("%snetconfig-%s", ncutils.GetNetclientPathSpecific(), network)
  207. fileInfo, err := os.Stat(configPath)
  208. if err != nil {
  209. ncutils.Log("could not stat config file: " + configPath)
  210. return
  211. }
  212. // speed up UDP rest
  213. if !fileInfo.ModTime().IsZero() && time.Now().After(fileInfo.ModTime().Add(time.Minute)) {
  214. sleepTime := 2
  215. for {
  216. _, err := Pull(network, true)
  217. if err == nil {
  218. break
  219. }
  220. if sleepTime > 3600 {
  221. sleepTime = 3600
  222. }
  223. ncutils.Log("failed to pull for network " + network)
  224. ncutils.Log(fmt.Sprintf("waiting %d seconds to retry...", sleepTime))
  225. time.Sleep(time.Second * time.Duration(sleepTime))
  226. sleepTime = sleepTime * 2
  227. }
  228. time.Sleep(time.Second << 1)
  229. }
  230. }
  231. func parseNetworkFromTopic(topic string) string {
  232. return strings.Split(topic, "/")[1]
  233. }
  234. func decryptMsg(cfg *config.ClientConfig, msg []byte) ([]byte, error) {
  235. if len(msg) <= 24 { // make sure message is of appropriate length
  236. return nil, fmt.Errorf("recieved invalid message from broker %v", msg)
  237. }
  238. // setup the keys
  239. diskKey, keyErr := auth.RetrieveTrafficKey(cfg.Node.Network)
  240. if keyErr != nil {
  241. return nil, keyErr
  242. }
  243. serverPubKey, err := ncutils.ConvertBytesToKey(cfg.Node.TrafficKeys.Server)
  244. if err != nil {
  245. return nil, err
  246. }
  247. return ncutils.DeChunk(msg, serverPubKey, diskKey)
  248. }
  249. func getServerAddress(cfg *config.ClientConfig) string {
  250. var server models.ServerAddr
  251. for _, server = range cfg.Node.NetworkSettings.DefaultServerAddrs {
  252. if server.Address != "" && server.IsLeader {
  253. break
  254. }
  255. }
  256. return server.Address
  257. }
  258. func getCommsNetworks() (map[string]bool, error) {
  259. var cfg config.ClientConfig
  260. networks, err := ncutils.GetSystemNetworks()
  261. if err != nil {
  262. return nil, err
  263. }
  264. var response = make(map[string]bool, 1)
  265. for _, network := range networks {
  266. cfg.Network = network
  267. cfg.ReadConfig()
  268. response[cfg.Node.CommID] = true
  269. }
  270. return response, nil
  271. }
  272. // == Message Caches ==
  273. func insert(network, which, cache string) {
  274. var newMessage = cachedMessage{
  275. Message: cache,
  276. LastSeen: time.Now(),
  277. }
  278. messageCache.Store(fmt.Sprintf("%s%s", network, which), newMessage)
  279. }
  280. func read(network, which string) string {
  281. val, isok := messageCache.Load(fmt.Sprintf("%s%s", network, which))
  282. if isok {
  283. var readMessage = val.(cachedMessage) // fetch current cached message
  284. if readMessage.LastSeen.IsZero() {
  285. return ""
  286. }
  287. if time.Now().After(readMessage.LastSeen.Add(time.Minute * 10)) { // check if message has been there over a minute
  288. messageCache.Delete(fmt.Sprintf("%s%s", network, which)) // remove old message if expired
  289. return ""
  290. }
  291. return readMessage.Message // return current message if not expired
  292. }
  293. return ""
  294. }
  295. // == End Message Caches ==