mq.go 4.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163
  1. package mq
  2. import (
  3. "encoding/json"
  4. "errors"
  5. "fmt"
  6. "log"
  7. "strings"
  8. mqtt "github.com/eclipse/paho.mqtt.golang"
  9. "github.com/gravitl/netmaker/database"
  10. "github.com/gravitl/netmaker/logger"
  11. "github.com/gravitl/netmaker/logic"
  12. "github.com/gravitl/netmaker/models"
  13. "github.com/gravitl/netmaker/servercfg"
  14. )
  15. // DefaultHandler default message queue handler - only called when GetDebug == true
  16. var DefaultHandler mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Message) {
  17. logger.Log(0, "MQTT Message: Topic: ", string(msg.Topic()), " Message: ", string(msg.Payload()))
  18. }
  19. // Ping message Handler -- handles ping topic from client nodes
  20. var Ping mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Message) {
  21. logger.Log(0, "Ping Handler: ", msg.Topic())
  22. go func() {
  23. id, err := GetID(msg.Topic())
  24. if err != nil {
  25. logger.Log(0, "error getting node.ID sent on ping topic ")
  26. return
  27. }
  28. node, err := logic.GetNodeByID(id)
  29. if err != nil {
  30. logger.Log(0, "mq-ping error getting node: ", err.Error())
  31. record, err := database.FetchRecord(database.NODES_TABLE_NAME, id)
  32. if err != nil {
  33. logger.Log(0, "error reading database ", err.Error())
  34. return
  35. }
  36. logger.Log(0, "record from database")
  37. logger.Log(0, record)
  38. return
  39. }
  40. _, decryptErr := decryptMsg(node.ID, msg.Payload())
  41. if decryptErr != nil {
  42. logger.Log(0, "error updating node ", node.ID, err.Error())
  43. return
  44. }
  45. node.SetLastCheckIn()
  46. if err := logic.UpdateNode(&node, &node); err != nil {
  47. logger.Log(0, "error updating node ", err.Error())
  48. }
  49. logger.Log(0, "ping processed")
  50. // --TODO --set client version once feature is implemented.
  51. //node.SetClientVersion(msg.Payload())
  52. }()
  53. }
  54. // UpdateNode message Handler -- handles updates from client nodes
  55. var UpdateNode mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Message) {
  56. go func() {
  57. id, err := GetID(msg.Topic())
  58. if err != nil {
  59. logger.Log(1, "error getting node.ID sent on ", msg.Topic(), err.Error())
  60. return
  61. }
  62. currentNode, err := logic.GetNodeByID(id)
  63. if err != nil {
  64. logger.Log(1, "error getting node ", id, err.Error())
  65. return
  66. }
  67. decrypted, decryptErr := decryptMsg(id, msg.Payload())
  68. if decryptErr != nil {
  69. logger.Log(1, "failed to decrypt message for node ", id, decryptErr.Error())
  70. return
  71. }
  72. logger.Log(1, "Update Node Handler", id)
  73. var newNode models.Node
  74. if err := json.Unmarshal(decrypted, &newNode); err != nil {
  75. logger.Log(1, "error unmarshaling payload ", err.Error())
  76. return
  77. }
  78. if err := logic.UpdateNode(&currentNode, &newNode); err != nil {
  79. logger.Log(1, "error saving node", err.Error())
  80. }
  81. if logic.ShouldPeersUpdate(&currentNode, &newNode) {
  82. if err := PublishPeerUpdate(&newNode); err != nil {
  83. logger.Log(1, "error publishing peer update ", err.Error())
  84. return
  85. }
  86. }
  87. }()
  88. }
  89. // PublishPeerUpdate --- deterines and publishes a peer update to all the peers of a node
  90. func PublishPeerUpdate(newNode *models.Node) error {
  91. if !servercfg.IsMessageQueueBackend() {
  92. return nil
  93. }
  94. networkNodes, err := logic.GetNetworkNodes(newNode.Network)
  95. if err != nil {
  96. logger.Log(1, "err getting Network Nodes", err.Error())
  97. return err
  98. }
  99. for _, node := range networkNodes {
  100. peerUpdate, err := logic.GetPeerUpdate(&node)
  101. if err != nil {
  102. logger.Log(1, "error getting peer update for node ", node.ID, err.Error())
  103. continue
  104. }
  105. data, err := json.Marshal(&peerUpdate)
  106. if err != nil {
  107. logger.Log(2, "error marshaling peer update for node", node.ID, err.Error())
  108. continue
  109. }
  110. if err = publish(node.ID, fmt.Sprintf("peers/%s/%s", node.Network, node.ID), data); err != nil {
  111. logger.Log(1, "failed to publish peer update for node", node.ID)
  112. }
  113. }
  114. return nil
  115. }
  // GetID -- decodes a message queue topic and returns the embedded node.ID.
  //
  // The ID is the text after the final "/" in topic. An "invalid topic" error
  // is returned when topic contains no "/" or when the final segment is empty
  // (for example "ping/"), so callers never receive an empty ID with a nil
  // error.
  func GetID(topic string) (string, error) {
  	sep := strings.LastIndex(topic, "/")
  	// sep < 0: no separator at all; sep == len-1: nothing after the last "/".
  	if sep < 0 || sep == len(topic)-1 {
  		return "", errors.New("invalid topic")
  	}
  	//the last part of the topic will be the node.ID
  	return topic[sep+1:], nil
  }
  126. // NodeUpdate -- publishes a node update
  127. func NodeUpdate(node *models.Node) error {
  128. if !servercfg.IsMessageQueueBackend() {
  129. return nil
  130. }
  131. logger.Log(3, "publishing node update to "+node.Name)
  132. data, err := json.Marshal(node)
  133. if err != nil {
  134. logger.Log(2, "error marshalling node update ", err.Error())
  135. return err
  136. }
  137. if err = publish(node.ID, fmt.Sprintf("update/%s/%s", node.Network, node.ID), data); err != nil {
  138. logger.Log(2, "error publishing node update to peer ", node.ID, err.Error())
  139. return err
  140. }
  141. return nil
  142. }
  143. // SetupMQTT creates a connection to broker and return client
  144. func SetupMQTT() mqtt.Client {
  145. opts := mqtt.NewClientOptions()
  146. broker := servercfg.GetMessageQueueEndpoint()
  147. opts.AddBroker(broker)
  148. client := mqtt.NewClient(opts)
  149. if token := client.Connect(); token.Wait() && token.Error() != nil {
  150. log.Fatal(token.Error())
  151. }
  152. return client
  153. }