Browse Source

add /mqtt for EMQX websocket listen endpoint

Anish Mukherjee 2 years ago
parent
commit
5279900b7c
4 changed files with 12 additions and 6 deletions
  1. 0 2
      mq/emqx.go
  2. 2 2
      mq/handlers.go
  3. 1 1
      mq/mq.go
  4. 9 1
      servercfg/serverconf.go

+ 0 - 2
mq/emqx.go

@@ -10,8 +10,6 @@ import (
 	"github.com/gravitl/netmaker/servercfg"
 )
 
-const emqxBrokerType = "emqx"
-
 type (
 	emqxUser struct {
 		UserID   string `json:"user_id"`

+ 2 - 2
mq/handlers.go

@@ -145,7 +145,7 @@ func UpdateHost(client mqtt.Client, msg mqtt.Message) {
 		var sendPeerUpdate bool
 		switch hostUpdate.Action {
 		case models.UpdateHost:
-			if servercfg.GetBrokerType() == emqxBrokerType {
+			if servercfg.GetBrokerType() == servercfg.EmqxBrokerType {
 				// create EMQX credentials for host if it doesn't exists
 				if _, err := logic.GetHost(currentHost.ID.String()); err != nil {
 					if err := CreateEmqxUser(currentHost.ID.String(), currentHost.HostPass, false); err != nil {
@@ -161,7 +161,7 @@ func UpdateHost(client mqtt.Client, msg mqtt.Message) {
 				return
 			}
 		case models.DeleteHost:
-			if servercfg.GetBrokerType() == emqxBrokerType {
+			if servercfg.GetBrokerType() == servercfg.EmqxBrokerType {
 				// delete EMQX credentials for host
 				if err := DeleteEmqxUser(currentHost.ID.String()); err != nil {
 					logger.Log(0, "failed to remove host credentials from EMQX: ", currentHost.ID.String(), err.Error())

+ 1 - 1
mq/mq.go

@@ -39,7 +39,7 @@ func setMqOptions(user, password string, opts *mqtt.ClientOptions) {
 
 // SetupMQTT creates a connection to broker and return client
 func SetupMQTT() {
-	if servercfg.GetBrokerType() == emqxBrokerType {
+	if servercfg.GetBrokerType() == servercfg.EmqxBrokerType {
 		time.Sleep(10 * time.Second) // wait for the REST endpoint to be ready
 		// setup authenticator and create admin user
 		if err := CreateEmqxDefaultAuthenticator(); err != nil {

+ 9 - 1
servercfg/serverconf.go

@@ -13,6 +13,9 @@ import (
 	"github.com/gravitl/netmaker/models"
 )
 
+// EmqxBrokerType denotes the broker type for EMQX MQTT
+const EmqxBrokerType = "emqx"
+
 var (
 	Version = "dev"
 	Is_EE   = false
@@ -249,7 +252,12 @@ func GetMessageQueueEndpoint() (string, bool) {
 	} else {
 		host = "ws://" + host
 	}
-	return host + ":" + GetMQServerPort(), secure
+	host += ":" + GetMQServerPort()
+	// websocket listen endpoint for EMQX broker is ws://host:port/mqtt
+	if GetBrokerType() == EmqxBrokerType {
+		host += "/mqtt"
+	}
+	return host, secure
 }
 
 // GetBrokerType - returns the type of MQ broker