Browse Source

handled node additions in more elegant manner

0xdcarns 2 years ago
parent
commit
6b30cef968
7 changed files with 66 additions and 36 deletions
  1. 6 26
      controllers/enrollmentkeys.go
  2. 3 9
      controllers/hosts.go
  3. 43 0
      logic/hostactions/hostactions.go
  4. 1 1
      logic/hosts.go
  5. 2 0
      models/host.go
  6. 8 0
      mq/handlers.go
  7. 3 0
      mq/publishers.go

+ 6 - 26
controllers/enrollmentkeys.go

@@ -9,8 +9,8 @@ import (
 	"github.com/gorilla/mux"
 	"github.com/gravitl/netmaker/logger"
 	"github.com/gravitl/netmaker/logic"
+	"github.com/gravitl/netmaker/logic/hostactions"
 	"github.com/gravitl/netmaker/models"
-	"github.com/gravitl/netmaker/mq"
 	"github.com/gravitl/netmaker/servercfg"
 )
 
@@ -182,7 +182,6 @@ func handleHostRegister(w http.ResponseWriter, r *http.Request) {
 		logic.ReturnErrorResponse(w, r, logic.FormatError(err, "internal"))
 		return
 	}
-
 	// ready the response
 	server := servercfg.GetServerInfo()
 	server.TrafficKey = key
@@ -196,15 +195,6 @@ func handleHostRegister(w http.ResponseWriter, r *http.Request) {
 // run through networks and send a host update
 func checkNetRegAndHostUpdate(networks []string, h *models.Host) {
 	// publish host update through MQ
-	if servercfg.IsMessageQueueBackend() {
-		if err := mq.HostUpdate(&models.HostUpdate{
-			Action: models.UpdateHost,
-			Host:   *h,
-		}); err != nil {
-			logger.Log(0, "failed to send host update after registration:", h.ID.String(), err.Error())
-		}
-	}
-
 	for i := range networks {
 		if ok, _ := logic.NetworkExists(networks[i]); ok {
 			newNode, err := logic.UpdateHostNetwork(h, networks[i], true)
@@ -213,21 +203,11 @@ func checkNetRegAndHostUpdate(networks []string, h *models.Host) {
 				continue
 			}
 			logger.Log(1, "added new node", newNode.ID.String(), "to host", h.Name)
-			if servercfg.IsMessageQueueBackend() {
-				if err = mq.HostUpdate(&models.HostUpdate{
-					Action: models.JoinHostToNetwork,
-					Host:   *h,
-					Node:   *newNode,
-				}); err != nil {
-					logger.Log(0, "failed to send host update to", h.ID.String(), networks[i], err.Error())
-				}
-			}
-		}
-	}
-
-	if servercfg.IsMessageQueueBackend() {
-		if err := mq.PublishPeerUpdate(); err != nil {
-			logger.Log(0, "failed to publish peer update after host registration -", err.Error())
+			hostactions.AddAction(models.HostUpdate{
+				Action: models.JoinHostToNetwork,
+				Host:   *h,
+				Node:   *newNode,
+			})
 		}
 	}
 }

+ 3 - 9
controllers/hosts.go

@@ -10,6 +10,7 @@ import (
 	"github.com/gorilla/mux"
 	"github.com/gravitl/netmaker/logger"
 	"github.com/gravitl/netmaker/logic"
+	"github.com/gravitl/netmaker/logic/hostactions"
 	"github.com/gravitl/netmaker/models"
 	"github.com/gravitl/netmaker/mq"
 	"golang.org/x/crypto/bcrypt"
@@ -230,18 +231,11 @@ func addHostToNetwork(w http.ResponseWriter, r *http.Request) {
 		return
 	}
 	logger.Log(1, "added new node", newNode.ID.String(), "to host", currHost.Name)
-	if err = mq.HostUpdate(&models.HostUpdate{
+	hostactions.AddAction(models.HostUpdate{
 		Action: models.JoinHostToNetwork,
 		Host:   *currHost,
 		Node:   *newNode,
-	}); err != nil {
-		logger.Log(0, r.Header.Get("user"), "failed to update host to join network:", hostid, network, err.Error())
-	}
-	go func() { // notify of peer change
-		if err := mq.PublishPeerUpdate(); err != nil {
-			logger.Log(1, "error publishing peer update ", err.Error())
-		}
-	}()
+	})
 
 	logger.Log(2, r.Header.Get("user"), fmt.Sprintf("added host %s to network %s", currHost.Name, network))
 	w.WriteHeader(http.StatusOK)

+ 43 - 0
logic/hostactions/hostactions.go

@@ -0,0 +1,43 @@
+package hostactions
+
+import (
+	"sync"
+
+	"github.com/gravitl/netmaker/models"
+)
+
+// nodeActionHandler - handles the storage of host action updates
+var nodeActionHandler sync.Map
+
+// AddAction - adds a host action to a host's list to be retrieved from broker update
+func AddAction(hu models.HostUpdate) {
+	currentRecords, ok := nodeActionHandler.Load(hu.Host.ID.String())
+	if !ok { // no list exists yet
+		nodeActionHandler.Store(hu.Host.ID.String(), []models.HostUpdate{hu})
+	} else { // list exists, append to it
+		currentList := currentRecords.([]models.HostUpdate)
+		currentList = append(currentList, hu)
+		nodeActionHandler.Store(hu.Host.ID.String(), currentList)
+	}
+}
+
+// GetAction - gets an action if exists
+func GetAction(id string) *models.HostUpdate {
+	currentRecords, ok := nodeActionHandler.Load(id)
+	if !ok {
+		return nil
+	}
+	currentList := currentRecords.([]models.HostUpdate)
+	if len(currentList) > 0 {
+		hu := currentList[0]
+		nodeActionHandler.Store(hu.Host.ID.String(), currentList[1:])
+		return &hu
+	}
+	return nil
+}
+
+// [hostID][NodeAction1, NodeAction2]
+// host receives nodeaction1
+// host responds with ACK or something
+// mq then sends next action in list, NodeAction2
+// host responds, list is empty, finished

+ 1 - 1
logic/hosts.go

@@ -90,7 +90,7 @@ func CreateHost(h *models.Host) error {
 	if (err != nil && !database.IsEmptyRecord(err)) || (err == nil) {
 		return ErrHostExists
 	}
-	//encrypt that password so we never see it
+	// encrypt that password so we never see it
 	hash, err := bcrypt.GenerateFromPassword([]byte(h.HostPass), 5)
 	if err != nil {
 		return err

+ 2 - 0
models/host.go

@@ -74,6 +74,8 @@ const (
 	DeleteHost = "DELETE_HOST"
 	// JoinHostToNetwork - constant for host network join action
 	JoinHostToNetwork = "JOIN_HOST_TO_NETWORK"
+	// Acknowledgement - ACK response for hosts
+	Acknowledgement = "ACK"
 )
 
 // HostUpdate - struct for host update

+ 8 - 0
mq/handlers.go

@@ -9,6 +9,7 @@ import (
 	"github.com/gravitl/netmaker/database"
 	"github.com/gravitl/netmaker/logger"
 	"github.com/gravitl/netmaker/logic"
+	"github.com/gravitl/netmaker/logic/hostactions"
 	"github.com/gravitl/netmaker/models"
 	"github.com/gravitl/netmaker/netclient/ncutils"
 	"github.com/gravitl/netmaker/servercfg"
@@ -144,6 +145,13 @@ func UpdateHost(client mqtt.Client, msg mqtt.Message) {
 		logger.Log(3, fmt.Sprintf("recieved host update: %s\n", hostUpdate.Host.ID.String()))
 		var sendPeerUpdate bool
 		switch hostUpdate.Action {
+		case models.Acknowledgement:
+			hu := hostactions.GetAction(currentHost.ID.String())
+			if err = HostUpdate(hu); err != nil {
+				logger.Log(0, "failed to send new node to host", hostUpdate.Host.Name, currentHost.ID.String(), err.Error())
+				return
+			}
+			sendPeerUpdate = true
 		case models.UpdateHost:
 			sendPeerUpdate = logic.UpdateHostFromClient(&hostUpdate.Host, currentHost)
 			err := logic.UpsertHost(currentHost)

+ 3 - 0
mq/publishers.go

@@ -40,6 +40,9 @@ func PublishSingleHostUpdate(host *models.Host) error {
 	if err != nil {
 		return err
 	}
+	if len(peerUpdate.Peers) == 0 { // no peers to send
+		return nil
+	}
 	if host.ProxyEnabled {
 		proxyUpdate, err := logic.GetProxyUpdateForHost(host)
 		if err != nil {