瀏覽代碼

began servside refactor

0xdcarns 3 年之前
父節點
當前提交
407e46c117
共有 4 個文件被更改,包括 13 次插入25 次删除
  1. 4 11
      controllers/node.go
  2. 3 6
      controllers/server_util.go
  3. 1 1
      mq/mq.go
  4. 5 7
      netclient/functions/daemon.go

+ 4 - 11
controllers/node.go

@@ -5,7 +5,6 @@ import (
 	"fmt"
 	"net/http"
 	"strings"
-	"time"
 
 	"github.com/gorilla/mux"
 	"github.com/gravitl/netmaker/database"
@@ -418,8 +417,6 @@ func createNode(w http.ResponseWriter, r *http.Request) {
 	logger.Log(1, r.Header.Get("user"), "created new node", node.Name, "on network", node.Network)
 	w.WriteHeader(http.StatusOK)
 	json.NewEncoder(w).Encode(node)
-
-	runUpdates(&node, false, false)
 }
 
 //Takes node out of pending state
@@ -437,7 +434,7 @@ func uncordonNode(w http.ResponseWriter, r *http.Request) {
 	w.WriteHeader(http.StatusOK)
 	json.NewEncoder(w).Encode("SUCCESS")
 
-	runUpdates(&node, true, false)
+	mq.NodeUpdate(&node)
 }
 
 func createEgressGateway(w http.ResponseWriter, r *http.Request) {
@@ -623,10 +620,10 @@ func deleteNode(w http.ResponseWriter, r *http.Request) {
 	returnSuccessResponse(w, r, nodeid+" deleted.")
 
 	logger.Log(1, r.Header.Get("user"), "Deleted node", nodeid, "from network", params["network"])
-	runUpdates(&node, false, true)
+	runUpdates(&node, false)
 }
 
-func runUpdates(node *models.Node, nodeUpdate bool, requiresPause bool) error {
+func runUpdates(node *models.Node, nodeUpdate bool) error {
 	//don't publish to server node
 
 	if nodeUpdate && !isServer(node) {
@@ -636,11 +633,7 @@ func runUpdates(node *models.Node, nodeUpdate bool, requiresPause bool) error {
 		}
 	}
 
-	if requiresPause { // TODO in future, detect when a node has finished iface update
-		time.Sleep(time.Second * 10)
-	}
-
-	if err := runServerPeerUpdate(node, isServer(node)); err != nil {
+	if err := runServerUpdate(node, isServer(node)); err != nil {
 		logger.Log(1, "internal error when running peer node:", err.Error())
 		return err
 	}

+ 3 - 6
controllers/server_util.go

@@ -4,21 +4,17 @@ import (
 	"github.com/gravitl/netmaker/logger"
 	"github.com/gravitl/netmaker/logic"
 	"github.com/gravitl/netmaker/models"
-	"github.com/gravitl/netmaker/mq"
 	"github.com/gravitl/netmaker/servercfg"
 )
 
-func runServerPeerUpdate(node *models.Node, ifaceDelta bool) error {
+// updates local peers for a server on a given node's network
+func runServerUpdate(node *models.Node, ifaceDelta bool) error {
 
 	err := logic.TimerCheckpoint()
 	if err != nil {
 		logger.Log(3, "error occurred on timer,", err.Error())
 	}
 
-	if err := mq.PublishPeerUpdate(node); err != nil {
-		logger.Log(0, "failed to inform peers of new node ", err.Error())
-	}
-
 	if servercfg.IsClientMode() != "on" {
 		return nil
 	}
@@ -26,6 +22,7 @@ func runServerPeerUpdate(node *models.Node, ifaceDelta bool) error {
 	if err != nil {
 		return getErr
 	}
+
 	if err = logic.ServerUpdate(&currentServerNode, ifaceDelta); err != nil {
 		logger.Log(1, "server node:", currentServerNode.ID, "failed update")
 		return err

+ 1 - 1
mq/mq.go

@@ -45,7 +45,7 @@ func SetupMQTT(publish bool) mqtt.Client {
 				client.Disconnect(240)
 				logger.Log(0, "node update subscription failed")
 			}
-			if token := client.Subscribe("clients/#", 0, mqtt.MessageHandler(ClientPeerUpdate)); token.Wait() && token.Error() != nil {
+			if token := client.Subscribe("signal/#", 0, mqtt.MessageHandler(ClientPeerUpdate)); token.Wait() && token.Error() != nil {
 				client.Disconnect(240)
 				logger.Log(0, "node client subscription failed")
 			}

+ 5 - 7
netclient/functions/daemon.go

@@ -26,7 +26,6 @@ import (
 )
 
 // == Message Caches ==
-// var keepalive = new(sync.Map)
 var messageCache = new(sync.Map)
 var networkcontext = new(sync.Map)
 
@@ -367,7 +366,7 @@ func PublishNodeUpdate(cfg *config.ClientConfig) error {
 	if err != nil {
 		return err
 	}
-	if err = publish(cfg, fmt.Sprintf("update/%s", cfg.Node.ID), data); err != nil {
+	if err = publish(cfg, fmt.Sprintf("update/%s", cfg.Node.ID), data, 1); err != nil {
 		return err
 	}
 	return nil
@@ -375,14 +374,13 @@ func PublishNodeUpdate(cfg *config.ClientConfig) error {
 
 // Hello -- ping the broker to let server know node is alive and doing fine
 func Hello(cfg *config.ClientConfig, network string) {
-	if err := publish(cfg, fmt.Sprintf("ping/%s", cfg.Node.ID), []byte(ncutils.Version)); err != nil {
+	if err := publish(cfg, fmt.Sprintf("ping/%s", cfg.Node.ID), []byte(ncutils.Version), 0); err != nil {
 		ncutils.Log(fmt.Sprintf("error publishing ping, %v", err))
 		ncutils.Log("running pull on " + cfg.Node.Network + " to reconnect")
 		_, err := Pull(cfg.Node.Network, true)
 		if err != nil {
 			ncutils.Log("could not run pull on " + cfg.Node.Network + ", error: " + err.Error())
 		}
-
 	}
 }
 
@@ -479,7 +477,7 @@ func setupMQTT(cfg *config.ClientConfig, publish bool) mqtt.Client {
 // publishes a message to server to update peers on this peer's behalf
 func publishClientPeers(cfg *config.ClientConfig) error {
 	payload := []byte(ncutils.MakeRandomString(16)) // just random string for now to keep the bytes different
-	if err := publish(cfg, fmt.Sprintf("update/%s", cfg.Node.ID), payload); err != nil {
+	if err := publish(cfg, fmt.Sprintf("signal/%s", cfg.Node.ID), payload, 1); err != nil {
 		return err
 	}
 	return nil
@@ -513,7 +511,7 @@ func initialPull(network string) {
 	}
 }
 
-func publish(cfg *config.ClientConfig, dest string, msg []byte) error {
+func publish(cfg *config.ClientConfig, dest string, msg []byte, qos byte) error {
 	// setup the keys
 	trafficPrivKey, err := auth.RetrieveTrafficKey(cfg.Node.Network)
 	if err != nil {
@@ -532,7 +530,7 @@ func publish(cfg *config.ClientConfig, dest string, msg []byte) error {
 		return err
 	}
 
-	if token := client.Publish(dest, 0, false, encrypted); token.Wait() && token.Error() != nil {
+	if token := client.Publish(dest, qos, false, encrypted); token.Wait() && token.Error() != nil {
 		return token.Error()
 	}
 	return nil