Browse Source

publish node update and peer update on ingress gateway creation

Matthew R Kasun 3 years ago
parent
commit
c1813c52e9
4 changed files with 28 additions and 20 deletions
  1. 7 0
      controllers/node.go
  2. 2 2
      controllers/node_grpc.go
  3. 17 16
      mq/mq.go
  4. 2 2
      netclient/functions/daemon.go

+ 7 - 0
controllers/node.go

@@ -473,6 +473,13 @@ func createIngressGateway(w http.ResponseWriter, r *http.Request) {
 		returnErrorResponse(w, r, formatError(err, "internal"))
 		returnErrorResponse(w, r, formatError(err, "internal"))
 		return
 		return
 	}
 	}
+	if err := mq.NodeUpdate(&node); err != nil {
+		logger.Log(1, "error publishing node update"+err.Error())
+	}
+	if err := mq.UpdatePeers(&node); err != nil {
+		logger.Log(1, "error publishing peer update "+err.Error())
+		return
+	}
 	logger.Log(1, r.Header.Get("user"), "created ingress gateway on node", nodeid, "on network", netid)
 	logger.Log(1, r.Header.Get("user"), "created ingress gateway on node", nodeid, "on network", netid)
 	w.WriteHeader(http.StatusOK)
 	w.WriteHeader(http.StatusOK)
 	json.NewEncoder(w).Encode(node)
 	json.NewEncoder(w).Encode(node)

+ 2 - 2
controllers/node_grpc.go

@@ -87,7 +87,7 @@ func (s *NodeServiceServer) CreateNode(ctx context.Context, req *nodepb.Object)
 		return nil, err
 		return nil, err
 	}
 	}
 	// notify other nodes on network of new peer
 	// notify other nodes on network of new peer
-	if err := mq.NewPeer(node); err != nil {
+	if err := mq.UpdatePeers(&node); err != nil {
 		logger.Log(0, "failed to inform peers of new node "+err.Error())
 		logger.Log(0, "failed to inform peers of new node "+err.Error())
 	}
 	}
 
 
@@ -143,7 +143,7 @@ func (s *NodeServiceServer) DeleteNode(ctx context.Context, req *nodepb.Object)
 		return nil, err
 		return nil, err
 	}
 	}
 	// notify other nodes on network of deleted peer
 	// notify other nodes on network of deleted peer
-	if err := mq.NewPeer(node); err != nil {
+	if err := mq.UpdatePeers(&node); err != nil {
 		logger.Log(0, "failed to inform peers of deleted node "+err.Error())
 		logger.Log(0, "failed to inform peers of deleted node "+err.Error())
 	}
 	}
 
 

+ 17 - 16
mq/mq.go

@@ -3,6 +3,7 @@ package mq
 import (
 import (
 	"encoding/json"
 	"encoding/json"
 	"errors"
 	"errors"
+	"log"
 	"strings"
 	"strings"
 
 
 	mqtt "github.com/eclipse/paho.mqtt.golang"
 	mqtt "github.com/eclipse/paho.mqtt.golang"
@@ -120,14 +121,8 @@ func GetID(topic string) (string, error) {
 // UpdateNode -- publishes a node update
 // UpdateNode -- publishes a node update
 func NodeUpdate(node *models.Node) error {
 func NodeUpdate(node *models.Node) error {
 	logger.Log(3, "publishing node update to "+node.Name)
 	logger.Log(3, "publishing node update to "+node.Name)
-	opts := mqtt.NewClientOptions()
-	broker := servercfg.GetMessageQueueEndpoint()
-	logger.Log(0, "broker: "+broker)
-	opts.AddBroker(broker)
-	client := mqtt.NewClient(opts)
-	if token := client.Connect(); token.Wait() && token.Error() != nil {
-		return token.Error()
-	}
+	client := SetupMQTT()
+	defer client.Disconnect(250)
 	data, err := json.Marshal(node)
 	data, err := json.Marshal(node)
 	if err != nil {
 	if err != nil {
 		logger.Log(2, "error marshalling node update "+err.Error())
 		logger.Log(2, "error marshalling node update "+err.Error())
@@ -140,18 +135,24 @@ func NodeUpdate(node *models.Node) error {
 	return nil
 	return nil
 }
 }
 
 
-// NewPeer -- publishes a peer update to all the peers of a newNode
-func NewPeer(node models.Node) error {
+// UpdatePeers -- publishes a peer update to all the peers of a node
+func UpdatePeers(node *models.Node) error {
+	client := SetupMQTT()
+	defer client.Disconnect(250)
+	if err := PublishPeerUpdate(client, node); err != nil {
+		return err
+	}
+	return nil
+}
+
+// SetupMQTT creates a connection to broker and return client
+func SetupMQTT() mqtt.Client {
 	opts := mqtt.NewClientOptions()
 	opts := mqtt.NewClientOptions()
 	broker := servercfg.GetMessageQueueEndpoint()
 	broker := servercfg.GetMessageQueueEndpoint()
-	logger.Log(0, "broker: "+broker)
 	opts.AddBroker(broker)
 	opts.AddBroker(broker)
 	client := mqtt.NewClient(opts)
 	client := mqtt.NewClient(opts)
 	if token := client.Connect(); token.Wait() && token.Error() != nil {
 	if token := client.Connect(); token.Wait() && token.Error() != nil {
-		return token.Error()
+		log.Fatal(token.Error())
 	}
 	}
-	if err := PublishPeerUpdate(client, &node); err != nil {
-		return err
-	}
-	return nil
+	return client
 }
 }

+ 2 - 2
netclient/functions/daemon.go

@@ -70,11 +70,11 @@ func MessageQueue(ctx context.Context, network string) {
 	if cfg.DebugOn {
 	if cfg.DebugOn {
 		ncutils.Log("subscribed to node updates for node " + cfg.Node.Name + " update/" + cfg.Node.ID)
 		ncutils.Log("subscribed to node updates for node " + cfg.Node.Name + " update/" + cfg.Node.ID)
 	}
 	}
-	if token := client.Subscribe("/update/peers/"+cfg.Node.ID, 0, UpdatePeers); token.Wait() && token.Error() != nil {
+	if token := client.Subscribe("update/peers/"+cfg.Node.ID, 0, UpdatePeers); token.Wait() && token.Error() != nil {
 		log.Fatal(token.Error())
 		log.Fatal(token.Error())
 	}
 	}
 	if cfg.DebugOn {
 	if cfg.DebugOn {
-		ncutils.Log("subscribed to node updates for node " + cfg.Node.Name + " /update/peers/" + cfg.Node.ID)
+		ncutils.Log("subscribed to node updates for node " + cfg.Node.Name + " update/peers/" + cfg.Node.ID)
 	}
 	}
 	defer client.Disconnect(250)
 	defer client.Disconnect(250)
 	go Checkin(ctx, &cfg, network)
 	go Checkin(ctx, &cfg, network)