handlers.go 14 KB


  1. package mq
  2. import (
  3. "context"
  4. "encoding/json"
  5. "fmt"
  6. "time"
  7. mqtt "github.com/eclipse/paho.mqtt.golang"
  8. "github.com/google/uuid"
  9. "github.com/gravitl/netmaker/database"
  10. "github.com/gravitl/netmaker/logger"
  11. "github.com/gravitl/netmaker/logic"
  12. "github.com/gravitl/netmaker/logic/hostactions"
  13. "github.com/gravitl/netmaker/models"
  14. "github.com/gravitl/netmaker/netclient/ncutils"
  15. "github.com/gravitl/netmaker/servercfg"
  16. )
  17. // DefaultHandler default message queue handler -- NOT USED
  18. func DefaultHandler(client mqtt.Client, msg mqtt.Message) {
  19. logger.Log(0, "MQTT Message: Topic: ", string(msg.Topic()), " Message: ", string(msg.Payload()))
  20. }
  21. // Ping message Handler -- handles ping topic from client nodes
  22. func Ping(client mqtt.Client, msg mqtt.Message) {
  23. id, err := getID(msg.Topic())
  24. if err != nil {
  25. logger.Log(0, "error getting node.ID sent on ping topic ")
  26. return
  27. }
  28. node, err := logic.GetNodeByID(id)
  29. if err != nil {
  30. logger.Log(3, "mq-ping error getting node: ", err.Error())
  31. node, err := logic.GetNodeByID(id)
  32. if err != nil {
  33. logger.Log(3, "mq-ping error getting node: ", err.Error())
  34. if database.IsEmptyRecord(err) {
  35. h := logic.GetHostByNodeID(id) // check if a host is still associated
  36. if h != nil { // inform host that node should be removed
  37. fakeNode := models.Node{}
  38. fakeNode.ID, _ = uuid.Parse(id)
  39. fakeNode.Action = models.NODE_DELETE
  40. fakeNode.PendingDelete = true
  41. if err := NodeUpdate(&fakeNode); err != nil {
  42. logger.Log(0, "failed to inform host", h.Name, h.ID.String(), "to remove node", id, err.Error())
  43. }
  44. }
  45. }
  46. return
  47. }
  48. decrypted, decryptErr := decryptMsg(&node, msg.Payload())
  49. if decryptErr != nil {
  50. logger.Log(0, "error decrypting when updating node ", node.ID.String(), decryptErr.Error())
  51. return
  52. }
  53. var checkin models.NodeCheckin
  54. if err := json.Unmarshal(decrypted, &checkin); err != nil {
  55. logger.Log(1, "error unmarshaling payload ", err.Error())
  56. return
  57. }
  58. host, err := logic.GetHost(node.HostID.String())
  59. if err != nil {
  60. logger.Log(0, "error retrieving host for node ", node.ID.String(), err.Error())
  61. return
  62. }
  63. node.SetLastCheckIn()
  64. host.Version = checkin.Version
  65. node.Connected = checkin.Connected
  66. host.Interfaces = checkin.Ifaces
  67. for i := range host.Interfaces {
  68. host.Interfaces[i].AddressString = host.Interfaces[i].Address.String()
  69. }
  70. if err := logic.UpdateNode(&node, &node); err != nil {
  71. logger.Log(0, "error updating node", node.ID.String(), " on checkin", err.Error())
  72. return
  73. }
  74. return
  75. }
  76. decrypted, decryptErr := decryptMsg(&node, msg.Payload())
  77. if decryptErr != nil {
  78. logger.Log(0, "error decrypting when updating node ", node.ID.String(), decryptErr.Error())
  79. return
  80. }
  81. var checkin models.NodeCheckin
  82. if err := json.Unmarshal(decrypted, &checkin); err != nil {
  83. logger.Log(1, "error unmarshaling payload ", err.Error())
  84. return
  85. }
  86. host, err := logic.GetHost(node.HostID.String())
  87. if err != nil {
  88. logger.Log(0, "error retrieving host for node ", node.ID.String(), err.Error())
  89. return
  90. }
  91. node.SetLastCheckIn()
  92. host.Version = checkin.Version
  93. node.Connected = checkin.Connected
  94. host.Interfaces = checkin.Ifaces
  95. for i := range host.Interfaces {
  96. host.Interfaces[i].AddressString = host.Interfaces[i].Address.String()
  97. }
  98. if err := logic.UpdateNode(&node, &node); err != nil {
  99. logger.Log(0, "error updating node", node.ID.String(), " on checkin", err.Error())
  100. return
  101. }
  102. logger.Log(3, "ping processed for node", node.ID.String())
  103. // --TODO --set client version once feature is implemented.
  104. //node.SetClientVersion(msg.Payload())
  105. }
  106. // UpdateNode message Handler -- handles updates from client nodes
  107. func UpdateNode(client mqtt.Client, msg mqtt.Message) {
  108. id, err := getID(msg.Topic())
  109. if err != nil {
  110. logger.Log(1, "error getting node.ID sent on ", msg.Topic(), err.Error())
  111. return
  112. }
  113. currentNode, err := logic.GetNodeByID(id)
  114. if err != nil {
  115. logger.Log(1, "error getting node ", id, err.Error())
  116. return
  117. }
  118. decrypted, decryptErr := decryptMsg(&currentNode, msg.Payload())
  119. if decryptErr != nil {
  120. logger.Log(1, "failed to decrypt message for node ", id, decryptErr.Error())
  121. return
  122. }
  123. var newNode models.Node
  124. if err := json.Unmarshal(decrypted, &newNode); err != nil {
  125. logger.Log(1, "error unmarshaling payload ", err.Error())
  126. return
  127. }
  128. ifaceDelta := logic.IfaceDelta(&currentNode, &newNode)
  129. if servercfg.Is_EE && ifaceDelta {
  130. if err = logic.EnterpriseResetAllPeersFailovers(currentNode.ID, currentNode.Network); err != nil {
  131. logger.Log(1, "failed to reset failover list during node update", currentNode.ID.String(), currentNode.Network)
  132. }
  133. }
  134. newNode.SetLastCheckIn()
  135. if err := logic.UpdateNode(&currentNode, &newNode); err != nil {
  136. logger.Log(1, "error saving node", err.Error())
  137. return
  138. }
  139. if ifaceDelta { // reduce number of unneeded updates, by only sending on iface changes
  140. if err = PublishPeerUpdate(); err != nil {
  141. logger.Log(0, "error updating peers when node", currentNode.ID.String(), "informed the server of an interface change", err.Error())
  142. }
  143. }
  144. logger.Log(1, "updated node", id, newNode.ID.String())
  145. }
  146. // UpdateHost message Handler -- handles host updates from clients
  147. func UpdateHost(client mqtt.Client, msg mqtt.Message) {
  148. id, err := getID(msg.Topic())
  149. if err != nil {
  150. logger.Log(1, "error getting host.ID sent on ", msg.Topic(), err.Error())
  151. return
  152. }
  153. currentHost, err := logic.GetHost(id)
  154. if err != nil {
  155. logger.Log(1, "error getting host ", id, err.Error())
  156. return
  157. }
  158. decrypted, decryptErr := decryptMsgWithHost(currentHost, msg.Payload())
  159. if decryptErr != nil {
  160. logger.Log(1, "failed to decrypt message for host ", id, decryptErr.Error())
  161. return
  162. }
  163. var hostUpdate models.HostUpdate
  164. if err := json.Unmarshal(decrypted, &hostUpdate); err != nil {
  165. logger.Log(1, "error unmarshaling payload ", err.Error())
  166. return
  167. }
  168. logger.Log(3, fmt.Sprintf("recieved host update: %s\n", hostUpdate.Host.ID.String()))
  169. var sendPeerUpdate bool
  170. switch hostUpdate.Action {
  171. case models.Acknowledgement:
  172. hu := hostactions.GetAction(currentHost.ID.String())
  173. if hu != nil {
  174. if err = HostUpdate(hu); err != nil {
  175. logger.Log(0, "failed to send new node to host", hostUpdate.Host.Name, currentHost.ID.String(), err.Error())
  176. return
  177. } else {
  178. if err = PublishSingleHostPeerUpdate(context.Background(), currentHost, nil); err != nil {
  179. logger.Log(0, "failed peers publish after join acknowledged", hostUpdate.Host.Name, currentHost.ID.String(), err.Error())
  180. return
  181. }
  182. }
  183. }
  184. case models.UpdateHost:
  185. sendPeerUpdate = logic.UpdateHostFromClient(&hostUpdate.Host, currentHost)
  186. err := logic.UpsertHost(currentHost)
  187. if err != nil {
  188. logger.Log(0, "failed to update host: ", currentHost.ID.String(), err.Error())
  189. return
  190. }
  191. case models.DeleteHost:
  192. if servercfg.GetBrokerType() == servercfg.EmqxBrokerType {
  193. // delete EMQX credentials for host
  194. if err := DeleteEmqxUser(currentHost.ID.String()); err != nil {
  195. logger.Log(0, "failed to remove host credentials from EMQX: ", currentHost.ID.String(), err.Error())
  196. return
  197. }
  198. }
  199. if err := logic.DisassociateAllNodesFromHost(currentHost.ID.String()); err != nil {
  200. logger.Log(0, "failed to delete all nodes of host: ", currentHost.ID.String(), err.Error())
  201. return
  202. }
  203. if err := logic.RemoveHostByID(currentHost.ID.String()); err != nil {
  204. logger.Log(0, "failed to delete host: ", currentHost.ID.String(), err.Error())
  205. return
  206. }
  207. sendPeerUpdate = true
  208. }
  209. if sendPeerUpdate {
  210. err := PublishPeerUpdate()
  211. if err != nil {
  212. logger.Log(0, "failed to pulish peer update: ", err.Error())
  213. }
  214. }
  215. // if servercfg.Is_EE && ifaceDelta {
  216. // if err = logic.EnterpriseResetAllPeersFailovers(currentHost.ID.String(), currentHost.Network); err != nil {
  217. // logger.Log(1, "failed to reset failover list during node update", currentHost.ID.String(), currentHost.Network)
  218. // }
  219. // }
  220. }
  221. // UpdateMetrics message Handler -- handles updates from client nodes for metrics
  222. func UpdateMetrics(client mqtt.Client, msg mqtt.Message) {
  223. if servercfg.Is_EE {
  224. id, err := getID(msg.Topic())
  225. if err != nil {
  226. logger.Log(1, "error getting node.ID sent on ", msg.Topic(), err.Error())
  227. return
  228. }
  229. currentNode, err := logic.GetNodeByID(id)
  230. if err != nil {
  231. logger.Log(1, "error getting node ", id, err.Error())
  232. return
  233. }
  234. decrypted, decryptErr := decryptMsg(&currentNode, msg.Payload())
  235. if decryptErr != nil {
  236. logger.Log(1, "failed to decrypt message for node ", id, decryptErr.Error())
  237. return
  238. }
  239. var newMetrics models.Metrics
  240. if err := json.Unmarshal(decrypted, &newMetrics); err != nil {
  241. logger.Log(1, "error unmarshaling payload ", err.Error())
  242. return
  243. }
  244. shouldUpdate := updateNodeMetrics(&currentNode, &newMetrics)
  245. if err = logic.UpdateMetrics(id, &newMetrics); err != nil {
  246. logger.Log(1, "faield to update node metrics", id, err.Error())
  247. return
  248. }
  249. if servercfg.IsMetricsExporter() {
  250. if err := pushMetricsToExporter(newMetrics); err != nil {
  251. logger.Log(2, fmt.Sprintf("failed to push node: [%s] metrics to exporter, err: %v",
  252. currentNode.ID, err))
  253. }
  254. }
  255. if newMetrics.Connectivity != nil {
  256. err := logic.EnterpriseFailoverFunc(&currentNode)
  257. if err != nil {
  258. logger.Log(0, "failed to failover for node", currentNode.ID.String(), "on network", currentNode.Network, "-", err.Error())
  259. }
  260. }
  261. if shouldUpdate {
  262. logger.Log(2, "updating peers after node", currentNode.ID.String(), currentNode.Network, "detected connectivity issues")
  263. host, err := logic.GetHost(currentNode.HostID.String())
  264. if err == nil {
  265. if err = PublishSingleHostPeerUpdate(context.Background(), host, nil); err != nil {
  266. logger.Log(0, "failed to publish update after failover peer change for node", currentNode.ID.String(), currentNode.Network)
  267. }
  268. }
  269. }
  270. logger.Log(1, "updated node metrics", id)
  271. }
  272. }
  273. // ClientPeerUpdate message handler -- handles updating peers after signal from client nodes
  274. func ClientPeerUpdate(client mqtt.Client, msg mqtt.Message) {
  275. id, err := getID(msg.Topic())
  276. if err != nil {
  277. logger.Log(1, "error getting node.ID sent on ", msg.Topic(), err.Error())
  278. return
  279. }
  280. currentNode, err := logic.GetNodeByID(id)
  281. if err != nil {
  282. logger.Log(1, "error getting node ", id, err.Error())
  283. return
  284. }
  285. decrypted, decryptErr := decryptMsg(&currentNode, msg.Payload())
  286. if decryptErr != nil {
  287. logger.Log(1, "failed to decrypt message during client peer update for node ", id, decryptErr.Error())
  288. return
  289. }
  290. switch decrypted[0] {
  291. case ncutils.ACK:
  292. // do we still need this
  293. case ncutils.DONE:
  294. if err = PublishPeerUpdate(); err != nil {
  295. logger.Log(1, "error publishing peer update for node", currentNode.ID.String(), err.Error())
  296. return
  297. }
  298. }
  299. logger.Log(1, "sent peer updates after signal received from", id)
  300. }
  301. func updateNodeMetrics(currentNode *models.Node, newMetrics *models.Metrics) bool {
  302. if newMetrics.FailoverPeers == nil {
  303. newMetrics.FailoverPeers = make(map[string]string)
  304. }
  305. oldMetrics, err := logic.GetMetrics(currentNode.ID.String())
  306. if err != nil {
  307. logger.Log(1, "error finding old metrics for node", currentNode.ID.String())
  308. return false
  309. }
  310. if oldMetrics.FailoverPeers == nil {
  311. oldMetrics.FailoverPeers = make(map[string]string)
  312. }
  313. var attachedClients []models.ExtClient
  314. if currentNode.IsIngressGateway {
  315. clients, err := logic.GetExtClientsByID(currentNode.ID.String(), currentNode.Network)
  316. if err == nil {
  317. attachedClients = clients
  318. }
  319. }
  320. if len(attachedClients) > 0 {
  321. // associate ext clients with IDs
  322. for i := range attachedClients {
  323. extMetric := newMetrics.Connectivity[attachedClients[i].PublicKey]
  324. if len(extMetric.NodeName) == 0 &&
  325. len(newMetrics.Connectivity[attachedClients[i].ClientID].NodeName) > 0 { // cover server clients
  326. extMetric = newMetrics.Connectivity[attachedClients[i].ClientID]
  327. if extMetric.TotalReceived > 0 && extMetric.TotalSent > 0 {
  328. extMetric.Connected = true
  329. }
  330. }
  331. extMetric.NodeName = attachedClients[i].ClientID
  332. delete(newMetrics.Connectivity, attachedClients[i].PublicKey)
  333. newMetrics.Connectivity[attachedClients[i].ClientID] = extMetric
  334. }
  335. }
  336. // run through metrics for each peer
  337. for k := range newMetrics.Connectivity {
  338. currMetric := newMetrics.Connectivity[k]
  339. oldMetric := oldMetrics.Connectivity[k]
  340. currMetric.TotalTime += oldMetric.TotalTime
  341. currMetric.Uptime += oldMetric.Uptime // get the total uptime for this connection
  342. if currMetric.Uptime == 0 || currMetric.TotalTime == 0 {
  343. currMetric.PercentUp = 0
  344. } else {
  345. currMetric.PercentUp = 100.0 * (float64(currMetric.Uptime) / float64(currMetric.TotalTime))
  346. }
  347. totalUpMinutes := currMetric.Uptime * ncutils.CheckInInterval
  348. currMetric.ActualUptime = time.Duration(totalUpMinutes) * time.Minute
  349. delete(oldMetrics.Connectivity, k) // remove from old data
  350. newMetrics.Connectivity[k] = currMetric
  351. }
  352. // add nodes that need failover
  353. nodes, err := logic.GetNetworkNodes(currentNode.Network)
  354. if err != nil {
  355. logger.Log(0, "failed to retrieve nodes while updating metrics")
  356. return false
  357. }
  358. for _, node := range nodes {
  359. if !newMetrics.Connectivity[node.ID.String()].Connected &&
  360. len(newMetrics.Connectivity[node.ID.String()].NodeName) > 0 &&
  361. node.Connected &&
  362. len(node.FailoverNode) > 0 &&
  363. !node.Failover {
  364. newMetrics.FailoverPeers[node.ID.String()] = node.FailoverNode.String()
  365. }
  366. }
  367. shouldUpdate := len(oldMetrics.FailoverPeers) == 0 && len(newMetrics.FailoverPeers) > 0
  368. for k, v := range oldMetrics.FailoverPeers {
  369. if len(newMetrics.FailoverPeers[k]) > 0 && len(v) == 0 {
  370. shouldUpdate = true
  371. }
  372. if len(v) > 0 && len(newMetrics.FailoverPeers[k]) == 0 {
  373. newMetrics.FailoverPeers[k] = v
  374. }
  375. }
  376. for k := range oldMetrics.Connectivity { // cleanup any left over data, self healing
  377. delete(newMetrics.Connectivity, k)
  378. }
  379. return shouldUpdate
  380. }