Forráskód Böngészése

Merge branch 'develop' of https://github.com/gravitl/netmaker into NET-1119

abhishek9686 1 éve
szülő
commit
fff2b0071c

+ 1 - 1
.github/ISSUE_TEMPLATE/bug-report.yml

@@ -31,7 +31,7 @@ body:
       label: Version
       description: What version are you running?
       options:
-        - v0.23.1
+        - v0.24.0
         - v0.23.0
         - v0.22.0
         - v0.21.2

+ 1 - 1
README.md

@@ -16,7 +16,7 @@
 
 <p align="center">
   <a href="https://github.com/gravitl/netmaker/releases">
-    <img src="https://img.shields.io/badge/Version-0.23.1-informational?style=flat-square" />
+    <img src="https://img.shields.io/badge/Version-0.24.0-informational?style=flat-square" />
   </a>
   <a href="https://hub.docker.com/r/gravitl/netmaker/tags">
     <img src="https://img.shields.io/docker/pulls/gravitl/netmaker?label=downloads" />

+ 9 - 1
auth/host_session.go

@@ -248,12 +248,20 @@ func CheckNetRegAndHostUpdate(networks []string, h *models.Host, relayNodeId uui
 				// check if relay node exists and acting as relay
 				relaynode, err := logic.GetNodeByID(relayNodeId.String())
 				if err == nil && relaynode.IsRelay {
+					slog.Info(fmt.Sprintf("adding relayed node %s to relay %s on network %s", newNode.ID.String(), relayNodeId.String(), network))
 					newNode.IsRelayed = true
 					newNode.RelayedBy = relayNodeId.String()
-					slog.Info(fmt.Sprintf("adding relayed node %s to relay %s on network %s", newNode.ID.String(), relayNodeId.String(), network))
+					updatedRelayNode := relaynode
+					updatedRelayNode.RelayedNodes = append(updatedRelayNode.RelayedNodes, newNode.ID.String())
+					logic.UpdateRelayed(&relaynode, &updatedRelayNode)
+					if err := logic.UpsertNode(&updatedRelayNode); err != nil {
+						slog.Error("failed to update node", "nodeid", relayNodeId.String())
+					}
 					if err := logic.UpsertNode(newNode); err != nil {
 						slog.Error("failed to update node", "nodeid", relayNodeId.String())
 					}
+				} else {
+					slog.Error("failed to relay node. maybe specified relay node is actually not a relay?", "err", err)
 				}
 			}
 			logger.Log(1, "added new node", newNode.ID.String(), "to host", h.Name)

+ 3 - 0
cli/cmd/host/update.go

@@ -14,6 +14,7 @@ import (
 var (
 	apiHostFilePath string
 	endpoint        string
+	endpoint6       string
 	name            string
 	listenPort      int
 	mtu             int
@@ -40,6 +41,7 @@ var hostUpdateCmd = &cobra.Command{
 		} else {
 			apiHost.ID = args[0]
 			apiHost.EndpointIP = endpoint
+			apiHost.EndpointIPv6 = endpoint6
 			apiHost.Name = name
 			apiHost.ListenPort = listenPort
 			apiHost.MTU = mtu
@@ -54,6 +56,7 @@ var hostUpdateCmd = &cobra.Command{
 func init() {
 	hostUpdateCmd.Flags().StringVar(&apiHostFilePath, "file", "", "Path to host_definition.json")
 	hostUpdateCmd.Flags().StringVar(&endpoint, "endpoint", "", "Endpoint of the Host")
+	hostUpdateCmd.Flags().StringVar(&endpoint6, "endpoint6", "", "IPv6 Endpoint of the Host")
 	hostUpdateCmd.Flags().StringVar(&name, "name", "", "Host name")
 	hostUpdateCmd.Flags().IntVar(&listenPort, "listen_port", 0, "Listen port of the host")
 	hostUpdateCmd.Flags().IntVar(&mtu, "mtu", 0, "Host MTU size")

+ 1 - 1
compose/docker-compose.netclient.yml

@@ -3,7 +3,7 @@ version: "3.4"
 services:
   netclient:
     container_name: netclient
-    image: 'gravitl/netclient:v0.23.1'
+    image: 'gravitl/netclient:v0.24.0'
     hostname: netmaker-1
     network_mode: host
     restart: on-failure

+ 1 - 1
controllers/docs.go

@@ -10,7 +10,7 @@
 //
 //	Schemes: https
 //	BasePath: /
-//	Version: 0.23.1
+//	Version: 0.24.0
 //	Host: api.demo.netmaker.io
 //
 //	Consumes:

+ 1 - 1
controllers/enrollmentkeys.go

@@ -308,7 +308,7 @@ func handleHostRegister(w http.ResponseWriter, r *http.Request) {
 	if !hostExists {
 		newHost.PersistentKeepalive = models.DefaultPersistentKeepAlive
 		// register host
-		logic.CheckHostPorts(&newHost)
+		//logic.CheckHostPorts(&newHost)
 		// create EMQX credentials and ACLs for host
 		if servercfg.GetBrokerType() == servercfg.EmqxBrokerType {
 			if err := mq.GetEmqxHandler().CreateEmqxUser(newHost.ID.String(), newHost.HostPass); err != nil {

+ 5 - 6
controllers/ext_client.go

@@ -436,15 +436,14 @@ func createExtClient(w http.ResponseWriter, r *http.Request) {
 		return
 	}
 
-	if err := logic.SetClientDefaultACLs(&extclient); err != nil {
-		slog.Error("failed to set default acls for extclient", "user", r.Header.Get("user"), "network", node.Network, "error", err)
-		logic.ReturnErrorResponse(w, r, logic.FormatError(err, "internal"))
-		return
-	}
-
 	slog.Info("created extclient", "user", r.Header.Get("user"), "network", node.Network, "clientid", extclient.ClientID)
 	w.WriteHeader(http.StatusOK)
 	go func() {
+		if err := logic.SetClientDefaultACLs(&extclient); err != nil {
+			slog.Error("failed to set default acls for extclient", "user", r.Header.Get("user"), "network", node.Network, "error", err)
+			logic.ReturnErrorResponse(w, r, logic.FormatError(err, "internal"))
+			return
+		}
 		if err := mq.PublishPeerUpdate(false); err != nil {
 			logger.Log(1, "error setting ext peers on "+nodeid+": "+err.Error())
 		}

+ 17 - 16
controllers/hosts.go

@@ -554,26 +554,27 @@ func authenticateHost(response http.ResponseWriter, request *http.Request) {
 		logic.ReturnErrorResponse(response, request, errorResponse)
 		return
 	}
-
-	// Create EMQX creds and ACLs if not found
-	if servercfg.GetBrokerType() == servercfg.EmqxBrokerType {
-		if err := mq.GetEmqxHandler().CreateEmqxUser(host.ID.String(), authRequest.Password); err != nil {
-			slog.Error("failed to create host credentials for EMQX: ", err.Error())
-		} else {
-			if err := mq.GetEmqxHandler().CreateHostACL(host.ID.String(), servercfg.GetServerInfo().Server); err != nil {
-				slog.Error("failed to add host ACL rules to EMQX: ", err.Error())
-			}
-			for _, nodeID := range host.Nodes {
-				if node, err := logic.GetNodeByID(nodeID); err == nil {
-					if err = mq.GetEmqxHandler().AppendNodeUpdateACL(host.ID.String(), node.Network, node.ID.String(), servercfg.GetServer()); err != nil {
-						slog.Error("failed to add ACLs for EMQX node", "error", err)
+	go func() {
+		// Create EMQX creds and ACLs if not found
+		if servercfg.GetBrokerType() == servercfg.EmqxBrokerType {
+			if err := mq.GetEmqxHandler().CreateEmqxUser(host.ID.String(), authRequest.Password); err != nil {
+				slog.Error("failed to create host credentials for EMQX: ", err.Error())
+			} else {
+				if err := mq.GetEmqxHandler().CreateHostACL(host.ID.String(), servercfg.GetServerInfo().Server); err != nil {
+					slog.Error("failed to add host ACL rules to EMQX: ", err.Error())
+				}
+				for _, nodeID := range host.Nodes {
+					if node, err := logic.GetNodeByID(nodeID); err == nil {
+						if err = mq.GetEmqxHandler().AppendNodeUpdateACL(host.ID.String(), node.Network, node.ID.String(), servercfg.GetServer()); err != nil {
+							slog.Error("failed to add ACLs for EMQX node", "error", err)
+						}
+					} else {
+						slog.Error("failed to get node", "nodeid", nodeID, "error", err)
 					}
-				} else {
-					slog.Error("failed to get node", "nodeid", nodeID, "error", err)
 				}
 			}
 		}
-	}
+	}()
 
 	response.WriteHeader(http.StatusOK)
 	response.Header().Set("Content-Type", "application/json")

+ 1 - 1
k8s/client/netclient-daemonset.yaml

@@ -16,7 +16,7 @@ spec:
       hostNetwork: true
       containers:
       - name: netclient
-        image: gravitl/netclient:v0.23.1
+        image: gravitl/netclient:v0.24.0
         env:
         - name: TOKEN
           value: "TOKEN_VALUE"

+ 1 - 1
k8s/client/netclient.yaml

@@ -28,7 +28,7 @@ spec:
       #           - "<node label value>"
       containers:
       - name: netclient
-        image: gravitl/netclient:v0.23.1
+        image: gravitl/netclient:v0.24.0
         env:
         - name: TOKEN
           value: "TOKEN_VALUE"

+ 1 - 1
k8s/server/netmaker-ui.yaml

@@ -15,7 +15,7 @@ spec:
     spec:
       containers:
       - name: netmaker-ui
-        image: gravitl/netmaker-ui:v0.23.1
+        image: gravitl/netmaker-ui:v0.24.0
         ports:
         - containerPort: 443
         env:

+ 4 - 2
logic/acls/common.go

@@ -64,9 +64,9 @@ func (acl ACL) Save(containerID ContainerID, ID AclID) (ACL, error) {
 
 // ACL.IsAllowed - sees if ID is allowed in referring ACL
 func (acl ACL) IsAllowed(ID AclID) (allowed bool) {
-	AclMutex.RLock()
+	AclMutex.Lock()
 	allowed = acl[ID] == Allowed
-	AclMutex.RUnlock()
+	AclMutex.Unlock()
 	return
 }
 
@@ -88,6 +88,8 @@ func (aclContainer ACLContainer) RemoveACL(ID AclID) ACLContainer {
 
 // ACLContainer.ChangeAccess - changes the relationship between two nodes in memory
 func (networkACL ACLContainer) ChangeAccess(ID1, ID2 AclID, value byte) {
+	AclMutex.Lock()
+	defer AclMutex.Unlock()
 	if _, ok := networkACL[ID1]; !ok {
 		slog.Error("ACL missing for ", "id", ID1)
 		return

+ 7 - 2
logic/acls/nodeacls/retrieve.go

@@ -3,21 +3,26 @@ package nodeacls
 import (
 	"encoding/json"
 	"fmt"
+	"sync"
 
 	"github.com/gravitl/netmaker/logic/acls"
 )
 
+var NodesAllowedACLMutex = &sync.Mutex{}
+
 // AreNodesAllowed - checks if nodes are allowed to communicate in their network ACL
 func AreNodesAllowed(networkID NetworkID, node1, node2 NodeID) bool {
+	NodesAllowedACLMutex.Lock()
+	defer NodesAllowedACLMutex.Unlock()
 	var currentNetworkACL, err = FetchAllACLs(networkID)
 	if err != nil {
 		return false
 	}
 	var allowed bool
-	acls.AclMutex.RLock()
+	acls.AclMutex.Lock()
 	currNetworkACLNode1 := currentNetworkACL[acls.AclID(node1)]
 	currNetworkACLNode2 := currentNetworkACL[acls.AclID(node2)]
-	acls.AclMutex.RUnlock()
+	acls.AclMutex.Unlock()
 	allowed = currNetworkACLNode1.IsAllowed(acls.AclID(node2)) && currNetworkACLNode2.IsAllowed(acls.AclID(node1))
 	return allowed
 }

+ 2 - 2
logic/errors.go

@@ -4,8 +4,8 @@ import (
 	"encoding/json"
 	"net/http"
 
-	"github.com/gravitl/netmaker/logger"
 	"github.com/gravitl/netmaker/models"
+	"golang.org/x/exp/slog"
 )
 
 // FormatError - takes ErrorResponse and uses correct code
@@ -62,7 +62,7 @@ func ReturnErrorResponse(response http.ResponseWriter, request *http.Request, er
 	if err != nil {
 		panic(err)
 	}
-	logger.Log(1, "processed request error:", errorMessage.Message)
+	slog.Debug("processed request error", "err", errorMessage.Message)
 	response.Header().Set("Content-Type", "application/json")
 	response.WriteHeader(errorMessage.Code)
 	response.Write(jsonResponse)

+ 5 - 0
logic/hosts.go

@@ -217,6 +217,7 @@ func UpdateHost(newHost, currentHost *models.Host) {
 	newHost.Nodes = currentHost.Nodes
 	newHost.PublicKey = currentHost.PublicKey
 	newHost.TrafficKeyPublic = currentHost.TrafficKeyPublic
+	newHost.EndpointIPv6 = currentHost.EndpointIPv6
 	// changeable fields
 	if len(newHost.Version) == 0 {
 		newHost.Version = currentHost.Version
@@ -258,6 +259,10 @@ func UpdateHostFromClient(newHost, currHost *models.Host) (sendPeerUpdate bool)
 		currHost.EndpointIP = newHost.EndpointIP
 		sendPeerUpdate = true
 	}
+	if currHost.EndpointIPv6.String() != newHost.EndpointIPv6.String() {
+		currHost.EndpointIPv6 = newHost.EndpointIPv6
+		sendPeerUpdate = true
+	}
 	currHost.DaemonInstalled = newHost.DaemonInstalled
 	currHost.Debug = newHost.Debug
 	currHost.Verbosity = newHost.Verbosity

+ 1 - 1
logic/networks.go

@@ -138,7 +138,7 @@ func GetParentNetwork(networkname string) (models.Network, error) {
 	return network, nil
 }
 
-// GetParentNetwork - get parent network
+// GetNetworkSettings - get parent network
 func GetNetworkSettings(networkname string) (models.Network, error) {
 
 	var network models.Network

+ 14 - 1
logic/peers.go

@@ -211,8 +211,21 @@ func GetPeerUpdateForHost(network string, host *models.Host, allNodes []models.N
 					uselocal = false
 				}
 			}
+
+			//if host is ipv4 only or ipv4+ipv6, set the peer endpoint to ipv4 address, if host is ipv6 only, set the peer endpoint to ipv6 address
+			peerEndpoint := peerHost.EndpointIP
+			if ipv4 := host.EndpointIP.To4(); ipv4 != nil {
+				peerEndpoint = peerHost.EndpointIP
+			} else {
+				//if peer host's ipv6 address is empty, it means that peer is an IPv4 only host
+				//IPv4 only host could not communicate with IPv6 only host
+				if peerHost.EndpointIPv6 != nil && peerHost.EndpointIPv6.String() != "" {
+					peerEndpoint = peerHost.EndpointIPv6
+				}
+			}
+
 			peerConfig.Endpoint = &net.UDPAddr{
-				IP:   peerHost.EndpointIP,
+				IP:   peerEndpoint,
 				Port: GetPeerListenPort(peerHost),
 			}
 

+ 1 - 1
logic/util.go

@@ -89,7 +89,7 @@ func StringSliceContains(slice []string, item string) bool {
 	return false
 }
 
-// NormalCIDR - returns the first address of CIDR
+// NormalizeCIDR - returns the first address of CIDR
 func NormalizeCIDR(address string) (string, error) {
 	ip, IPNet, err := net.ParseCIDR(address)
 	if err != nil {

+ 1 - 1
logic/zombie.go

@@ -76,7 +76,7 @@ func checkForZombieHosts(h *models.Host) {
 // ManageZombies - goroutine which adds/removes/deletes nodes from the zombie node quarantine list
 func ManageZombies(ctx context.Context, peerUpdate chan *models.Node) {
 	logger.Log(2, "Zombie management started")
-	InitializeZombies()
+	go InitializeZombies()
 
 	// Zombie Nodes Cleanup Four Times a Day
 	ticker := time.NewTicker(time.Hour * ZOMBIE_TIMEOUT)

+ 2 - 2
main.go

@@ -28,7 +28,7 @@ import (
 	"golang.org/x/exp/slog"
 )
 
-var version = "v0.23.1"
+var version = "v0.24.0"
 
 // Start DB Connection and start API Request Handler
 func main() {
@@ -155,7 +155,7 @@ func runMessageQueue(wg *sync.WaitGroup, ctx context.Context) {
 	defer wg.Done()
 	brokerHost, _ := servercfg.GetMessageQueueEndpoint()
 	logger.Log(0, "connecting to mq broker at", brokerHost)
-	mq.SetupMQTT()
+	mq.SetupMQTT(true)
 	if mq.IsConnected() {
 		logger.Log(0, "connected to MQ Broker")
 	} else {

+ 1 - 1
migrate/migrate.go

@@ -287,7 +287,7 @@ func updateAcls() {
 		}
 
 		// save new acls
-		slog.Info(fmt.Sprintf("(migration) saving new acls for network: %s", network.NetID), "networkAcl", networkAcl)
+		slog.Debug(fmt.Sprintf("(migration) saving new acls for network: %s", network.NetID), "networkAcl", networkAcl)
 		if _, err := networkAcl.Save(acls.ContainerID(network.NetID)); err != nil {
 			slog.Error(fmt.Sprintf("error during acls migration. error saving new acls for network: %s", network.NetID), "error", err)
 			continue

+ 7 - 0
models/api_host.go

@@ -22,6 +22,7 @@ type ApiHost struct {
 	Interfaces          []ApiIface `json:"interfaces"            yaml:"interfaces"`
 	DefaultInterface    string     `json:"defaultinterface"      yaml:"defautlinterface"`
 	EndpointIP          string     `json:"endpointip"            yaml:"endpointip"`
+	EndpointIPv6        string     `json:"endpointipv6"            yaml:"endpointipv6"`
 	PublicKey           string     `json:"publickey"`
 	MacAddress          string     `json:"macaddress"`
 	Nodes               []string   `json:"nodes"`
@@ -43,6 +44,7 @@ func (h *Host) ConvertNMHostToAPI() *ApiHost {
 	a := ApiHost{}
 	a.Debug = h.Debug
 	a.EndpointIP = h.EndpointIP.String()
+	a.EndpointIPv6 = h.EndpointIPv6.String()
 	a.FirewallInUse = h.FirewallInUse
 	a.ID = h.ID.String()
 	a.Interfaces = make([]ApiIface, len(h.Interfaces))
@@ -83,6 +85,11 @@ func (a *ApiHost) ConvertAPIHostToNMHost(currentHost *Host) *Host {
 	} else {
 		h.EndpointIP = net.ParseIP(a.EndpointIP)
 	}
+	if len(a.EndpointIPv6) == 0 || strings.Contains(a.EndpointIPv6, "nil") {
+		h.EndpointIPv6 = currentHost.EndpointIPv6
+	} else {
+		h.EndpointIPv6 = net.ParseIP(a.EndpointIPv6)
+	}
 	h.Debug = a.Debug
 	h.FirewallInUse = a.FirewallInUse
 	h.IPForwarding = currentHost.IPForwarding

+ 1 - 0
models/host.go

@@ -63,6 +63,7 @@ type Host struct {
 	Interfaces          []Iface          `json:"interfaces"              yaml:"interfaces"`
 	DefaultInterface    string           `json:"defaultinterface"        yaml:"defaultinterface"`
 	EndpointIP          net.IP           `json:"endpointip"              yaml:"endpointip"`
+	EndpointIPv6        net.IP           `json:"endpointipv6"            yaml:"endpointipv6"`
 	IsDocker            bool             `json:"isdocker"                yaml:"isdocker"`
 	IsK8S               bool             `json:"isk8s"                   yaml:"isk8s"`
 	IsStatic            bool             `json:"isstatic"                yaml:"isstatic"`

+ 3 - 138
mq/emqx_cloud.go

@@ -22,13 +22,6 @@ type userCreateReq struct {
 	Password string `json:"password"`
 }
 
-type cloudAcl struct {
-	UserName string `json:"username"`
-	Topic    string `json:"topic"`
-	Action   string `json:"action"`
-	Access   string `json:"access"`
-}
-
 func (e *EmqxCloud) GetType() servercfg.Emqxdeploy { return servercfg.EmqxCloudDeploy }
 
 func (e *EmqxCloud) CreateEmqxUser(username, pass string) error {
@@ -89,54 +82,7 @@ func (e *EmqxCloud) CreateEmqxUserforServer() error {
 	if res.StatusCode != http.StatusOK {
 		return errors.New("request failed " + string(body))
 	}
-	// add acls
-	acls := []cloudAcl{
-		{
-			UserName: servercfg.GetMqUserName(),
-			Topic:    fmt.Sprintf("update/%s/#", servercfg.GetServer()),
-			Access:   "allow",
-			Action:   "sub",
-		},
-		{
-			UserName: servercfg.GetMqUserName(),
-			Topic:    fmt.Sprintf("host/serverupdate/%s/#", servercfg.GetServer()),
-			Access:   "allow",
-			Action:   "sub",
-		},
-		{
-			UserName: servercfg.GetMqUserName(),
-			Topic:    fmt.Sprintf("signal/%s/#", servercfg.GetServer()),
-			Access:   "allow",
-			Action:   "sub",
-		},
-		{
-			UserName: servercfg.GetMqUserName(),
-			Topic:    fmt.Sprintf("metrics/%s/#", servercfg.GetServer()),
-			Access:   "allow",
-			Action:   "sub",
-		},
-		{
-			UserName: servercfg.GetMqUserName(),
-			Topic:    "peers/host/#",
-			Access:   "allow",
-			Action:   "pub",
-		},
-		{
-			UserName: servercfg.GetMqUserName(),
-			Topic:    "node/update/#",
-			Access:   "allow",
-			Action:   "pub",
-		},
-		{
-
-			UserName: servercfg.GetMqUserName(),
-			Topic:    "host/update/#",
-			Access:   "allow",
-			Action:   "pub",
-		},
-	}
-
-	return e.createacls(acls)
+	return nil
 }
 
 func (e *EmqxCloud) CreateEmqxDefaultAuthenticator() error { return nil } // ignore
@@ -147,94 +93,13 @@ func (e *EmqxCloud) CreateDefaultDenyRule() error {
 	return nil
 }
 
-func (e *EmqxCloud) createacls(acls []cloudAcl) error {
-	payload, err := json.Marshal(acls)
-	if err != nil {
-		return err
-	}
-	client := &http.Client{}
-	req, err := http.NewRequest(http.MethodPost, fmt.Sprintf("%s/api/acl", e.URL), strings.NewReader(string(payload)))
-	if err != nil {
-		return err
-	}
-	req.Header.Add("Content-Type", "application/json")
-	req.SetBasicAuth(e.AppID, e.AppSecret)
-	res, err := client.Do(req)
-	if err != nil {
-		return err
-	}
-	defer res.Body.Close()
-
-	body, err := io.ReadAll(res.Body)
-	if err != nil {
-		return err
-	}
-	if res.StatusCode != http.StatusOK {
-		return errors.New("request failed " + string(body))
-	}
-	return nil
-}
-
 func (e *EmqxCloud) CreateHostACL(hostID, serverName string) error {
-	acls := []cloudAcl{
-		{
-			UserName: hostID,
-			Topic:    fmt.Sprintf("peers/host/%s/%s", hostID, serverName),
-			Access:   "allow",
-			Action:   "sub",
-		},
-		{
-			UserName: hostID,
-			Topic:    fmt.Sprintf("host/update/%s/%s", hostID, serverName),
-			Access:   "allow",
-			Action:   "sub",
-		},
-		{
-			UserName: hostID,
-			Topic:    fmt.Sprintf("host/serverupdate/%s/%s", serverName, hostID),
-			Access:   "allow",
-			Action:   "pub",
-		},
-	}
-
-	return e.createacls(acls)
+	return nil
 }
 
 func (e *EmqxCloud) AppendNodeUpdateACL(hostID, nodeNetwork, nodeID, serverName string) error {
-	acls := []cloudAcl{
-		{
-			UserName: hostID,
-			Topic:    fmt.Sprintf("node/update/%s/%s", nodeNetwork, nodeID),
-			Access:   "allow",
-			Action:   "sub",
-		},
-		{
-			UserName: hostID,
-			Topic:    fmt.Sprintf("ping/%s/%s", serverName, nodeID),
-			Access:   "allow",
-			Action:   "pubsub",
-		},
-		{
-			UserName: hostID,
-			Topic:    fmt.Sprintf("update/%s/%s", serverName, nodeID),
-			Access:   "allow",
-			Action:   "pubsub",
-		},
-		{
-			UserName: hostID,
-			Topic:    fmt.Sprintf("signal/%s/%s", serverName, nodeID),
-			Access:   "allow",
-			Action:   "pubsub",
-		},
-		{
-			UserName: hostID,
-			Topic:    fmt.Sprintf("metrics/%s/%s", serverName, nodeID),
-			Access:   "allow",
-			Action:   "pubsub",
-		},
-	}
+	return nil
 
-	return e.createacls(acls)
 }
 
 func (e *EmqxCloud) GetUserACL(username string) (*aclObject, error) { return nil, nil } // ununsed on cloud since it doesn't overwrite acls list

+ 4 - 2
mq/handlers.go

@@ -92,7 +92,7 @@ func UpdateHost(client mqtt.Client, msg mqtt.Message) {
 	}
 	decrypted, decryptErr := decryptMsgWithHost(currentHost, msg.Payload())
 	if decryptErr != nil {
-		slog.Error("failed to decrypt message for host", "id", id, "error", decryptErr)
+		slog.Error("failed to decrypt message for host", "id", id, "name", currentHost.Name, "error", decryptErr)
 		return
 	}
 	var hostUpdate models.HostUpdate
@@ -296,9 +296,11 @@ func HandleHostCheckin(h, currentHost *models.Host) bool {
 		!h.EndpointIP.Equal(currentHost.EndpointIP) ||
 		(len(h.NatType) > 0 && h.NatType != currentHost.NatType) ||
 		h.DefaultInterface != currentHost.DefaultInterface ||
-		(h.ListenPort != 0 && h.ListenPort != currentHost.ListenPort) || (h.WgPublicListenPort != 0 && h.WgPublicListenPort != currentHost.WgPublicListenPort)
+		(h.ListenPort != 0 && h.ListenPort != currentHost.ListenPort) ||
+		(h.WgPublicListenPort != 0 && h.WgPublicListenPort != currentHost.WgPublicListenPort) || (!h.EndpointIPv6.Equal(currentHost.EndpointIPv6))
 	if ifaceDelta { // only save if something changes
 		currentHost.EndpointIP = h.EndpointIP
+		currentHost.EndpointIPv6 = h.EndpointIPv6
 		currentHost.Interfaces = h.Interfaces
 		currentHost.DefaultInterface = h.DefaultInterface
 		currentHost.NatType = h.NatType

+ 22 - 7
mq/mq.go

@@ -8,8 +8,8 @@ import (
 
 	mqtt "github.com/eclipse/paho.mqtt.golang"
 	"github.com/gravitl/netmaker/logger"
-	"github.com/gravitl/netmaker/logic"
 	"github.com/gravitl/netmaker/servercfg"
+	"golang.org/x/exp/slog"
 )
 
 // KEEPALIVE_TIMEOUT - time in seconds for timeout
@@ -27,12 +27,12 @@ var mqclient mqtt.Client
 func setMqOptions(user, password string, opts *mqtt.ClientOptions) {
 	broker, _ := servercfg.GetMessageQueueEndpoint()
 	opts.AddBroker(broker)
-	id := logic.RandomString(23)
-	opts.ClientID = id
+	opts.ClientID = user
 	opts.SetUsername(user)
 	opts.SetPassword(password)
 	opts.SetAutoReconnect(true)
 	opts.SetConnectRetry(true)
+	opts.SetCleanSession(true)
 	opts.SetConnectRetryInterval(time.Second * 4)
 	opts.SetKeepAlive(time.Minute)
 	opts.SetCleanSession(true)
@@ -40,7 +40,7 @@ func setMqOptions(user, password string, opts *mqtt.ClientOptions) {
 }
 
 // SetupMQTT creates a connection to broker and return client
-func SetupMQTT() {
+func SetupMQTT(fatal bool) {
 	if servercfg.GetBrokerType() == servercfg.EmqxBrokerType {
 		if emqx.GetType() == servercfg.EmqxOnPremDeploy {
 			time.Sleep(10 * time.Second) // wait for the REST endpoint to be ready
@@ -70,6 +70,7 @@ func SetupMQTT() {
 
 	opts := mqtt.NewClientOptions()
 	setMqOptions(servercfg.GetMqUserName(), servercfg.GetMqPassword(), opts)
+	logger.Log(0, "Mq Client Connecting with Random ID: ", opts.ClientID)
 	opts.SetOnConnectHandler(func(client mqtt.Client) {
 		serverName := servercfg.GetServer()
 		if token := client.Subscribe(fmt.Sprintf("update/%s/#", serverName), 0, mqtt.MessageHandler(UpdateNode)); token.WaitTimeout(MQ_TIMEOUT*time.Second) && token.Error() != nil {
@@ -92,6 +93,13 @@ func SetupMQTT() {
 		opts.SetOrderMatters(false)
 		opts.SetResumeSubs(true)
 	})
+	opts.SetConnectionLostHandler(func(c mqtt.Client, e error) {
+		slog.Warn("detected broker connection lost", "err", e.Error())
+		c.Disconnect(250)
+		slog.Info("re-initiating MQ connection")
+		SetupMQTT(false)
+
+	})
 	mqclient = mqtt.NewClient(opts)
 	tperiod := time.Now().Add(10 * time.Second)
 	for {
@@ -99,9 +107,16 @@ func SetupMQTT() {
 			logger.Log(2, "unable to connect to broker, retrying ...")
 			if time.Now().After(tperiod) {
 				if token.Error() == nil {
-					logger.FatalLog("could not connect to broker, token timeout, exiting ...")
+					if fatal {
+						logger.FatalLog("could not connect to broker, token timeout, exiting ...")
+					}
+					logger.Log(0, "could not connect to broker, token timeout, exiting ...")
+
 				} else {
-					logger.FatalLog("could not connect to broker, exiting ...", token.Error().Error())
+					if fatal {
+						logger.FatalLog("could not connect to broker, exiting ...", token.Error().Error())
+					}
+					logger.Log(0, "could not connect to broker, exiting ...", token.Error().Error())
 				}
 			}
 		} else {
@@ -125,7 +140,7 @@ func Keepalive(ctx context.Context) {
 
 // IsConnected - function for determining if the mqclient is connected or not
 func IsConnected() bool {
-	return mqclient != nil && mqclient.IsConnected()
+	return mqclient != nil && mqclient.IsConnectionOpen()
 }
 
 // CloseClient - function to close the mq connection from server

+ 6 - 1
pro/controllers/failover.go

@@ -134,10 +134,15 @@ func failOverME(w http.ResponseWriter, r *http.Request) {
 		logic.ReturnErrorResponse(w, r, logic.FormatError(err, "badrequest"))
 		return
 	}
+	host, err := logic.GetHost(node.HostID.String())
+	if err != nil {
+		logic.ReturnErrorResponse(w, r, logic.FormatError(err, "badrequest"))
+		return
+	}
 
 	failOverNode, exists := proLogic.FailOverExists(node.Network)
 	if !exists {
-		logic.ReturnErrorResponse(w, r, logic.FormatError(errors.New("failover node doesn't exist in the network"), "badrequest"))
+		logic.ReturnErrorResponse(w, r, logic.FormatError(fmt.Errorf("req-from: %s, failover node doesn't exist in the network", host.Name), "badrequest"))
 		return
 	}
 	var failOverReq models.FailOverMeReq

+ 1 - 1
pro/logic/relays.go

@@ -238,7 +238,7 @@ func getRelayedAddresses(id string) []net.IPNet {
 		addrs = append(addrs, node.Address)
 	}
 	if node.Address6.IP != nil {
-		node.Address.Mask = net.CIDRMask(128, 128)
+		node.Address6.Mask = net.CIDRMask(128, 128)
 		addrs = append(addrs, node.Address6)
 	}
 	return addrs

+ 1 - 1
release.md

@@ -1,4 +1,4 @@
-# Netmaker v0.23.1
+# Netmaker v0.24.0
 
 ## Whats New ✨
 

+ 1 - 1
swagger.yml

@@ -1464,7 +1464,7 @@ info:
 
         API calls must be authenticated via a header of the format -H “Authorization: Bearer <YOUR_SECRET_KEY>” There are two methods to obtain YOUR_SECRET_KEY: 1. Using the masterkey. By default, this value is “secret key,” but you should change this on your instance and keep it secure. This value can be set via env var at startup or in a config file (config/environments/< env >.yaml). See the [Netmaker](https://docs.netmaker.org/index.html) documentation for more details. 2. Using a JWT received for a node. This can be retrieved by calling the /api/nodes/<network>/authenticate endpoint, as documented below.
     title: Netmaker
-    version: 0.23.1
+    version: 0.24.0
 paths:
     /api/dns:
         get: