Browse Source

rm channel

Abhishek Kondur 2 years ago
parent
commit
3d5eac6905
3 changed files with 28 additions and 24 deletions
  1. 16 3
      controllers/node.go
  2. 0 1
      main.go
  3. 12 20
      mq/dynsec.go

+ 16 - 3
controllers/node.go

@@ -621,7 +621,7 @@ func createNode(w http.ResponseWriter, r *http.Request) {
 		return
 		return
 	}
 	}
 	// Delete Any Existing Client with this ID.
 	// Delete Any Existing Client with this ID.
-	mq.DynSecChan <- mq.DynSecAction{
+	event := mq.DynSecAction{
 		ActionType: mq.DeleteClient,
 		ActionType: mq.DeleteClient,
 		Payload: mq.MqDynsecPayload{
 		Payload: mq.MqDynsecPayload{
 			Commands: []mq.MqDynSecCmd{
 			Commands: []mq.MqDynSecCmd{
@@ -632,8 +632,12 @@ func createNode(w http.ResponseWriter, r *http.Request) {
 			},
 			},
 		},
 		},
 	}
 	}
+	if err := mq.PublishEventToDynSecTopic(event); err != nil {
+		logger.Log(0, fmt.Sprintf("failed to send DynSec command [%s]: %v",
+			event.ActionType, err.Error()))
+	}
 	// Create client for this node in Mq
 	// Create client for this node in Mq
-	mq.DynSecChan <- mq.DynSecAction{
+	event = mq.DynSecAction{
 		ActionType: mq.CreateClient,
 		ActionType: mq.CreateClient,
 		Payload: mq.MqDynsecPayload{
 		Payload: mq.MqDynsecPayload{
 			Commands: []mq.MqDynSecCmd{
 			Commands: []mq.MqDynSecCmd{
@@ -648,6 +652,10 @@ func createNode(w http.ResponseWriter, r *http.Request) {
 			},
 			},
 		},
 		},
 	}
 	}
+	if err := mq.PublishEventToDynSecTopic(event); err != nil {
+		logger.Log(0, fmt.Sprintf("failed to send DynSec command [%s]: %v",
+			event.ActionType, err.Error()))
+	}
 
 
 	response := models.NodeGet{
 	response := models.NodeGet{
 		Node:         node,
 		Node:         node,
@@ -984,7 +992,8 @@ func deleteNode(w http.ResponseWriter, r *http.Request) {
 		logic.ReturnErrorResponse(w, r, logic.FormatError(err, "internal"))
 		logic.ReturnErrorResponse(w, r, logic.FormatError(err, "internal"))
 		return
 		return
 	}
 	}
-	mq.DynSecChan <- mq.DynSecAction{
+
+	event := mq.DynSecAction{
 		ActionType: mq.DeleteClient,
 		ActionType: mq.DeleteClient,
 		Payload: mq.MqDynsecPayload{
 		Payload: mq.MqDynsecPayload{
 			Commands: []mq.MqDynSecCmd{
 			Commands: []mq.MqDynSecCmd{
@@ -995,6 +1004,10 @@ func deleteNode(w http.ResponseWriter, r *http.Request) {
 			},
 			},
 		},
 		},
 	}
 	}
+	if err := mq.PublishEventToDynSecTopic(event); err != nil {
+		logger.Log(0, fmt.Sprintf("failed to send DynSec command [%s]: %v",
+			event.ActionType, err.Error()))
+	}
 	logic.ReturnSuccessResponse(w, r, nodeid+" deleted.")
 	logic.ReturnSuccessResponse(w, r, nodeid+" deleted.")
 	logger.Log(1, r.Header.Get("user"), "Deleted node", nodeid, "from network", params["network"])
 	logger.Log(1, r.Header.Get("user"), "Deleted node", nodeid, "from network", params["network"])
 	runUpdates(&node, false)
 	runUpdates(&node, false)

+ 0 - 1
main.go

@@ -188,7 +188,6 @@ func runMessageQueue(wg *sync.WaitGroup) {
 	mq.SetUpAdminClient()
 	mq.SetUpAdminClient()
 	mq.SetupMQTT()
 	mq.SetupMQTT()
 	ctx, cancel := context.WithCancel(context.Background())
 	ctx, cancel := context.WithCancel(context.Background())
-	go mq.DynamicSecManager(ctx)
 	go mq.Keepalive(ctx)
 	go mq.Keepalive(ctx)
 	go logic.ManageZombies(ctx)
 	go logic.ManageZombies(ctx)
 	quit := make(chan os.Signal, 1)
 	quit := make(chan os.Signal, 1)

+ 12 - 20
mq/dynsec.go

@@ -1,7 +1,6 @@
 package mq
 package mq
 
 
 import (
 import (
-	"context"
 	"crypto/sha512"
 	"crypto/sha512"
 	"encoding/base64"
 	"encoding/base64"
 	"encoding/json"
 	"encoding/json"
@@ -115,8 +114,6 @@ type MqDynsecPayload struct {
 	Commands []MqDynSecCmd `json:"commands"`
 	Commands []MqDynSecCmd `json:"commands"`
 }
 }
 
 
-var DynSecChan = make(chan DynSecAction, 100)
-
 func encodePasswordToPBKDF2(password string, salt string, iterations int, keyLength int) string {
 func encodePasswordToPBKDF2(password string, salt string, iterations int, keyLength int) string {
 	binaryEncoded := pbkdf2.Key([]byte(password), []byte(salt), iterations, keyLength, sha512.New)
 	binaryEncoded := pbkdf2.Key([]byte(password), []byte(salt), iterations, keyLength, sha512.New)
 	return base64.StdEncoding.EncodeToString(binaryEncoded)
 	return base64.StdEncoding.EncodeToString(binaryEncoded)
@@ -129,7 +126,10 @@ func Configure() error {
 		return err
 		return err
 	}
 	}
 	c := dynCnf{}
 	c := dynCnf{}
-	json.Unmarshal(b, &c)
+	err = json.Unmarshal(b, &c)
+	if err != nil {
+		return err
+	}
 	password := servercfg.GetMqAdminPassword()
 	password := servercfg.GetMqAdminPassword()
 	if password == "" {
 	if password == "" {
 		return errors.New("MQ admin password not provided")
 		return errors.New("MQ admin password not provided")
@@ -150,24 +150,16 @@ func Configure() error {
 	return os.WriteFile(file, data, 0755)
 	return os.WriteFile(file, data, 0755)
 }
 }
 
 
-func DynamicSecManager(ctx context.Context) {
-	defer close(DynSecChan)
-	for {
-		select {
-		case <-ctx.Done():
-			return
-		case dynSecAction := <-DynSecChan:
-			d, err := json.Marshal(dynSecAction.Payload)
-			if err != nil {
-				continue
-			}
-			if token := mqclient.Publish(DynamicSecPubTopic, 2, false, d); token.Error() != nil {
-				logger.Log(0, fmt.Sprintf("failed to perform action [%s]: %v",
-					dynSecAction.ActionType, token.Error()))
-			}
-		}
+func PublishEventToDynSecTopic(event DynSecAction) error {
 
 
+	d, err := json.Marshal(event.Payload)
+	if err != nil {
+		return err
+	}
+	if token := mqAdminClient.Publish(DynamicSecPubTopic, 2, false, d); token.Error() != nil {
+		return err
 	}
 	}
+	return nil
 }
 }
 
 
 func watchDynSecTopic(client mqtt.Client, msg mqtt.Message) {
 func watchDynSecTopic(client mqtt.Client, msg mqtt.Message) {