|
@@ -117,6 +117,28 @@ func GetID(topic string) (string, error) {
|
|
|
return parts[count-1], nil
|
|
|
}
|
|
|
|
|
|
+// UpdateNode -- publishes a node update
|
|
|
+func NodeUpdate(node *models.Node) error {
|
|
|
+ opts := mqtt.NewClientOptions()
|
|
|
+ broker := servercfg.GetMessageQueueEndpoint()
|
|
|
+ logger.Log(0, "broker: "+broker)
|
|
|
+ opts.AddBroker(broker)
|
|
|
+ client := mqtt.NewClient(opts)
|
|
|
+ if token := client.Connect(); token.Wait() && token.Error() != nil {
|
|
|
+ return token.Error()
|
|
|
+ }
|
|
|
+ data, err := json.Marshal(node)
|
|
|
+ if err != nil {
|
|
|
+ logger.Log(2, "error marshalling node update "+err.Error())
|
|
|
+ return err
|
|
|
+ }
|
|
|
+ if token := client.Publish("/update/"+node.ID, 0, false, data); token.Wait() && token.Error() != nil {
|
|
|
+ logger.Log(2, "error publishing peer update to peer "+node.ID+" "+token.Error().Error())
|
|
|
+ return err
|
|
|
+ }
|
|
|
+ return nil
|
|
|
+}
|
|
|
+
|
|
|
// NewPeer -- publishes a peer update to all the peers of a newNode
|
|
|
func NewPeer(node models.Node) error {
|
|
|
opts := mqtt.NewClientOptions()
|