mqhandlers.go 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369
  1. package functions
  2. import (
  3. "encoding/json"
  4. "errors"
  5. "fmt"
  6. "net"
  7. "os"
  8. "runtime"
  9. "strings"
  10. "time"
  11. mqtt "github.com/eclipse/paho.mqtt.golang"
  12. "github.com/gravitl/netmaker/logger"
  13. "github.com/gravitl/netmaker/models"
  14. "github.com/gravitl/netmaker/netclient/config"
  15. "github.com/gravitl/netmaker/netclient/ncutils"
  16. "github.com/gravitl/netmaker/netclient/wireguard"
  17. "github.com/gravitl/netmaker/nm-proxy/manager"
  18. "github.com/guumaster/hostctl/pkg/file"
  19. "github.com/guumaster/hostctl/pkg/parser"
  20. "github.com/guumaster/hostctl/pkg/types"
  21. "golang.zx2c4.com/wireguard/wgctrl/wgtypes"
  22. )
  23. // All -- mqtt message hander for all ('#') topics
  24. var All mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Message) {
  25. logger.Log(0, "default message handler -- received message but not handling")
  26. logger.Log(0, "topic: "+string(msg.Topic()))
  27. //logger.Log(0, "Message: " + string(msg.Payload()))
  28. }
  29. func ProxyUpdate(client mqtt.Client, msg mqtt.Message) {
  30. var nodeCfg config.ClientConfig
  31. var proxyUpdate manager.ManagerAction
  32. var network = parseNetworkFromTopic(msg.Topic())
  33. nodeCfg.Network = network
  34. nodeCfg.ReadConfig()
  35. logger.Log(0, "---------> Recieved a proxy update")
  36. data, dataErr := decryptMsg(&nodeCfg, msg.Payload())
  37. if dataErr != nil {
  38. return
  39. }
  40. err := json.Unmarshal([]byte(data), &proxyUpdate)
  41. if err != nil {
  42. logger.Log(0, "error unmarshalling proxy update data"+err.Error())
  43. return
  44. }
  45. ProxyMgmChan <- &proxyUpdate
  46. }
  47. // NodeUpdate -- mqtt message handler for /update/<NodeID> topic
  48. func NodeUpdate(client mqtt.Client, msg mqtt.Message) {
  49. var newNode models.Node
  50. var nodeCfg config.ClientConfig
  51. var network = parseNetworkFromTopic(msg.Topic())
  52. nodeCfg.Network = network
  53. nodeCfg.ReadConfig()
  54. data, dataErr := decryptMsg(&nodeCfg, msg.Payload())
  55. if dataErr != nil {
  56. return
  57. }
  58. err := json.Unmarshal([]byte(data), &newNode)
  59. if err != nil {
  60. logger.Log(0, "error unmarshalling node update data"+err.Error())
  61. return
  62. }
  63. if newNode.Proxy {
  64. if newNode.Proxy != nodeCfg.Node.Proxy {
  65. if err := config.Write(&nodeCfg, nodeCfg.Network); err != nil {
  66. logger.Log(0, nodeCfg.Node.Network, "error updating node configuration: ", err.Error())
  67. }
  68. }
  69. logger.Log(0, "Node is attached with proxy,ignore this node update...")
  70. return
  71. }
  72. // see if cache hit, if so skip
  73. var currentMessage = read(newNode.Network, lastNodeUpdate)
  74. if currentMessage == string(data) {
  75. return
  76. }
  77. insert(newNode.Network, lastNodeUpdate, string(data)) // store new message in cache
  78. logger.Log(0, "network:", newNode.Network, "received message to update node "+newNode.Name)
  79. // ensure that OS never changes
  80. newNode.OS = runtime.GOOS
  81. // check if interface needs to delta
  82. ifaceDelta := ncutils.IfaceDelta(&nodeCfg.Node, &newNode)
  83. shouldDNSChange := nodeCfg.Node.DNSOn != newNode.DNSOn
  84. hubChange := nodeCfg.Node.IsHub != newNode.IsHub
  85. keepaliveChange := nodeCfg.Node.PersistentKeepalive != newNode.PersistentKeepalive
  86. nodeCfg.Node = newNode
  87. switch newNode.Action {
  88. case models.NODE_DELETE:
  89. logger.Log(0, "network:", nodeCfg.Node.Network, " received delete request for %s", nodeCfg.Node.Name)
  90. unsubscribeNode(client, &nodeCfg)
  91. if err = LeaveNetwork(nodeCfg.Node.Network); err != nil {
  92. if !strings.Contains("rpc error", err.Error()) {
  93. logger.Log(0, "failed to leave, please check that local files for network", nodeCfg.Node.Network, "were removed")
  94. return
  95. }
  96. }
  97. logger.Log(0, nodeCfg.Node.Name, "was removed from network", nodeCfg.Node.Network)
  98. return
  99. case models.NODE_UPDATE_KEY:
  100. // == get the current key for node ==
  101. oldPrivateKey, retErr := wireguard.RetrievePrivKey(nodeCfg.Network)
  102. if retErr != nil {
  103. break
  104. }
  105. if err := UpdateKeys(&nodeCfg, client); err != nil {
  106. logger.Log(0, "err updating wireguard keys, reusing last key\n", err.Error())
  107. if key, parseErr := wgtypes.ParseKey(oldPrivateKey); parseErr == nil {
  108. wireguard.StorePrivKey(key.String(), nodeCfg.Network)
  109. nodeCfg.Node.PublicKey = key.PublicKey().String()
  110. }
  111. }
  112. ifaceDelta = true
  113. case models.NODE_FORCE_UPDATE:
  114. ifaceDelta = true
  115. case models.NODE_NOOP:
  116. default:
  117. }
  118. // Save new config
  119. nodeCfg.Node.Action = models.NODE_NOOP
  120. if err := config.Write(&nodeCfg, nodeCfg.Network); err != nil {
  121. logger.Log(0, nodeCfg.Node.Network, "error updating node configuration: ", err.Error())
  122. }
  123. nameserver := nodeCfg.Server.CoreDNSAddr
  124. privateKey, err := wireguard.RetrievePrivKey(newNode.Network)
  125. if err != nil {
  126. logger.Log(0, "error reading PrivateKey "+err.Error())
  127. return
  128. }
  129. file := ncutils.GetNetclientPathSpecific() + nodeCfg.Node.Interface + ".conf"
  130. if newNode.ListenPort != nodeCfg.Node.LocalListenPort {
  131. if err := wireguard.RemoveConf(newNode.Interface, false); err != nil {
  132. logger.Log(0, "error remove interface", newNode.Interface, err.Error())
  133. }
  134. err = ncutils.ModPort(&newNode)
  135. if err != nil {
  136. logger.Log(0, "network:", nodeCfg.Node.Network, "error modifying node port on", newNode.Name, "-", err.Error())
  137. return
  138. }
  139. ifaceDelta = true
  140. informPortChange(&newNode)
  141. }
  142. if err := wireguard.UpdateWgInterface(file, privateKey, nameserver, newNode); err != nil {
  143. logger.Log(0, "error updating wireguard config "+err.Error())
  144. return
  145. }
  146. if keepaliveChange {
  147. wireguard.UpdateKeepAlive(file, newNode.PersistentKeepalive)
  148. }
  149. logger.Log(0, "applying WG conf to "+file)
  150. err = wireguard.ApplyConf(&nodeCfg.Node, nodeCfg.Node.Interface, file)
  151. if err != nil {
  152. logger.Log(0, "error restarting wg after node update -", err.Error())
  153. return
  154. }
  155. time.Sleep(time.Second)
  156. // if newNode.DNSOn == "yes" {
  157. // for _, server := range newNode.NetworkSettings.DefaultServerAddrs {
  158. // if server.IsLeader {
  159. // go local.SetDNSWithRetry(newNode, server.Address)
  160. // break
  161. // }
  162. // }
  163. // }
  164. if ifaceDelta { // if a change caused an ifacedelta we need to notify the server to update the peers
  165. doneErr := publishSignal(&nodeCfg, ncutils.DONE)
  166. if doneErr != nil {
  167. logger.Log(0, "network:", nodeCfg.Node.Network, "could not notify server to update peers after interface change")
  168. } else {
  169. logger.Log(0, "network:", nodeCfg.Node.Network, "signalled finished interface update to server")
  170. }
  171. } else if hubChange {
  172. doneErr := publishSignal(&nodeCfg, ncutils.DONE)
  173. if doneErr != nil {
  174. logger.Log(0, "network:", nodeCfg.Node.Network, "could not notify server to update peers after hub change")
  175. } else {
  176. logger.Log(0, "network:", nodeCfg.Node.Network, "signalled finished hub update to server")
  177. }
  178. }
  179. //deal with DNS
  180. if newNode.DNSOn != "yes" && shouldDNSChange && nodeCfg.Node.Interface != "" {
  181. logger.Log(0, "network:", nodeCfg.Node.Network, "settng DNS off")
  182. if err := removeHostDNS(nodeCfg.Node.Interface, ncutils.IsWindows()); err != nil {
  183. logger.Log(0, "network:", nodeCfg.Node.Network, "error removing netmaker profile from /etc/hosts "+err.Error())
  184. }
  185. // _, err := ncutils.RunCmd("/usr/bin/resolvectl revert "+nodeCfg.Node.Interface, true)
  186. // if err != nil {
  187. // logger.Log(0, "error applying dns" + err.Error())
  188. // }
  189. }
  190. _ = UpdateLocalListenPort(&nodeCfg)
  191. }
  192. // UpdatePeers -- mqtt message handler for peers/<Network>/<NodeID> topic
  193. func UpdatePeers(client mqtt.Client, msg mqtt.Message) {
  194. var peerUpdate models.PeerUpdate
  195. var network = parseNetworkFromTopic(msg.Topic())
  196. var cfg = config.ClientConfig{}
  197. cfg.Network = network
  198. cfg.ReadConfig()
  199. data, dataErr := decryptMsg(&cfg, msg.Payload())
  200. if dataErr != nil {
  201. return
  202. }
  203. err := json.Unmarshal([]byte(data), &peerUpdate)
  204. if err != nil {
  205. logger.Log(0, "error unmarshalling peer data")
  206. return
  207. }
  208. // see if cached hit, if so skip
  209. var currentMessage = read(peerUpdate.Network, lastPeerUpdate)
  210. if currentMessage == string(data) {
  211. return
  212. }
  213. insert(peerUpdate.Network, lastPeerUpdate, string(data))
  214. // check version
  215. if peerUpdate.ServerVersion != ncutils.Version {
  216. logger.Log(0, "server/client version mismatch server: ", peerUpdate.ServerVersion, " client: ", ncutils.Version)
  217. }
  218. if peerUpdate.ServerVersion != cfg.Server.Version {
  219. logger.Log(1, "updating server version")
  220. cfg.Server.Version = peerUpdate.ServerVersion
  221. config.Write(&cfg, cfg.Network)
  222. }
  223. if cfg.Node.Proxy {
  224. ProxyMgmChan <- &peerUpdate.ProxyUpdate
  225. return
  226. }
  227. file := ncutils.GetNetclientPathSpecific() + cfg.Node.Interface + ".conf"
  228. internetGateway, err := wireguard.UpdateWgPeers(file, peerUpdate.Peers)
  229. if err != nil {
  230. logger.Log(0, "error updating wireguard peers"+err.Error())
  231. return
  232. }
  233. //check if internet gateway has changed
  234. oldGateway, err := net.ResolveUDPAddr("udp", cfg.Node.InternetGateway)
  235. // note: may want to remove second part (oldGateway == &net.UDPAddr{})
  236. // since it's a pointer, will never be true
  237. if err != nil || (oldGateway == &net.UDPAddr{}) {
  238. oldGateway = nil
  239. }
  240. if (internetGateway == nil && oldGateway != nil) || (internetGateway != nil && internetGateway.String() != oldGateway.String()) {
  241. cfg.Node.InternetGateway = internetGateway.String()
  242. if err := config.ModNodeConfig(&cfg.Node); err != nil {
  243. logger.Log(0, "failed to save internet gateway", err.Error())
  244. }
  245. if err := wireguard.ApplyConf(&cfg.Node, cfg.Node.Interface, file); err != nil {
  246. logger.Log(0, "error applying internet gateway", err.Error())
  247. }
  248. UpdateLocalListenPort(&cfg)
  249. return
  250. }
  251. // queryAddr := cfg.Node.PrimaryAddress()
  252. //err = wireguard.SyncWGQuickConf(cfg.Node.Interface, file)
  253. // var iface = cfg.Node.Interface
  254. // if ncutils.IsMac() {
  255. // iface, err = local.GetMacIface(queryAddr)
  256. // if err != nil {
  257. // logger.Log(0, "error retrieving mac iface: "+err.Error())
  258. // return
  259. // }
  260. // }
  261. // err = wireguard.SetPeers(iface, &cfg.Node, peerUpdate.Peers)
  262. // if err != nil {
  263. // logger.Log(0, "error syncing wg after peer update: "+err.Error())
  264. // return
  265. // }
  266. logger.Log(0, "network:", cfg.Node.Network, "received peer update for node "+cfg.Node.Name+" "+cfg.Node.Network)
  267. if cfg.Node.DNSOn == "yes" {
  268. if err := setHostDNS(peerUpdate.DNS, cfg.Node.Interface, ncutils.IsWindows()); err != nil {
  269. logger.Log(0, "network:", cfg.Node.Network, "error updating /etc/hosts "+err.Error())
  270. return
  271. }
  272. } else {
  273. if err := removeHostDNS(cfg.Node.Interface, ncutils.IsWindows()); err != nil {
  274. logger.Log(0, "network:", cfg.Node.Network, "error removing profile from /etc/hosts "+err.Error())
  275. return
  276. }
  277. }
  278. _ = UpdateLocalListenPort(&cfg)
  279. }
  280. func setHostDNS(dns, iface string, windows bool) error {
  281. etchosts := "/etc/hosts"
  282. temp := os.TempDir()
  283. lockfile := temp + "/netclient-lock"
  284. if windows {
  285. etchosts = "c:\\windows\\system32\\drivers\\etc\\hosts"
  286. lockfile = temp + "\\netclient-lock"
  287. }
  288. if _, err := os.Stat(lockfile); !errors.Is(err, os.ErrNotExist) {
  289. return errors.New("/etc/hosts file is locked .... aborting")
  290. }
  291. lock, err := os.Create(lockfile)
  292. if err != nil {
  293. return fmt.Errorf("could not create lock file %w", err)
  294. }
  295. lock.Close()
  296. defer os.Remove(lockfile)
  297. dnsdata := strings.NewReader(dns)
  298. profile, err := parser.ParseProfile(dnsdata)
  299. if err != nil {
  300. return err
  301. }
  302. hosts, err := file.NewFile(etchosts)
  303. if err != nil {
  304. return err
  305. }
  306. profile.Name = strings.ToLower(iface)
  307. profile.Status = types.Enabled
  308. if err := hosts.ReplaceProfile(profile); err != nil {
  309. return err
  310. }
  311. if err := hosts.Flush(); err != nil {
  312. return err
  313. }
  314. return nil
  315. }
  316. func removeHostDNS(iface string, windows bool) error {
  317. etchosts := "/etc/hosts"
  318. temp := os.TempDir()
  319. lockfile := temp + "/netclient-lock"
  320. if windows {
  321. etchosts = "c:\\windows\\system32\\drivers\\etc\\hosts"
  322. lockfile = temp + "\\netclient-lock"
  323. }
  324. if _, err := os.Stat(lockfile); !errors.Is(err, os.ErrNotExist) {
  325. return errors.New("/etc/hosts file is locked .... aborting")
  326. }
  327. lock, err := os.Create(lockfile)
  328. if err != nil {
  329. return fmt.Errorf("could not create lock file %w", err)
  330. }
  331. lock.Close()
  332. defer os.Remove(lockfile)
  333. hosts, err := file.NewFile(etchosts)
  334. if err != nil {
  335. return err
  336. }
  337. if err := hosts.RemoveProfile(strings.ToLower(iface)); err != nil {
  338. if err == types.ErrUnknownProfile {
  339. return nil
  340. }
  341. return err
  342. }
  343. if err := hosts.Flush(); err != nil {
  344. return err
  345. }
  346. return nil
  347. }