Browse Source

add emqx boilerplate

Anish Mukherjee 2 years ago
parent
commit
d2adf88b21
6 changed files with 224 additions and 8 deletions
  1. 24 8
      compose/docker-compose.yml
  2. 2 0
      config/config.go
  3. 156 0
      mq/emqx.go
  4. 16 0
      mq/handlers.go
  5. 10 0
      mq/mq.go
  6. 16 0
      servercfg/serverconf.go

+ 24 - 8
compose/docker-compose.yml

@@ -10,6 +10,8 @@ services:
       - sqldata:/root/data
       - sqldata:/root/data
     environment:
     environment:
       BROKER_NAME: "broker.NETMAKER_BASE_DOMAIN"
       BROKER_NAME: "broker.NETMAKER_BASE_DOMAIN"
+      BROKER_TYPE: "emqx"
+      EMQX_REST_ENDPOINT: "http://mq:18083"
       SERVER_NAME: "NETMAKER_BASE_DOMAIN"
       SERVER_NAME: "NETMAKER_BASE_DOMAIN"
       STUN_DOMAIN: "stun.NETMAKER_BASE_DOMAIN"
       STUN_DOMAIN: "stun.NETMAKER_BASE_DOMAIN"
       SERVER_HOST: "SERVER_PUBLIC_IP"
       SERVER_HOST: "SERVER_PUBLIC_IP"
@@ -62,20 +64,34 @@ services:
     restart: always
     restart: always
     volumes:
     volumes:
       - dnsconfig:/root/dnsconfig
       - dnsconfig:/root/dnsconfig
+  # mq:
+  #   container_name: mq
+  #   image: eclipse-mosquitto:2.0.15-openssl
+  #   depends_on:
+  #     - netmaker
+  #   restart: unless-stopped
+  #   command: ["/mosquitto/config/wait.sh"]
+  #   environment:
+  #     MQ_PASSWORD: "REPLACE_MQ_PASSWORD"
+  #     MQ_USERNAME: "REPLACE_MQ_USERNAME"
+  #   volumes:
+  #     - /root/mosquitto.conf:/mosquitto/config/mosquitto.conf
+  #     - /root/wait.sh:/mosquitto/config/wait.sh
+  #     - mosquitto_logs:/mosquitto/log
   mq:
   mq:
     container_name: mq
     container_name: mq
-    image: eclipse-mosquitto:2.0.15-openssl
+    image: emqx/emqx:5.0.17
     depends_on:
     depends_on:
       - netmaker
       - netmaker
     restart: unless-stopped
     restart: unless-stopped
-    command: ["/mosquitto/config/wait.sh"]
     environment:
     environment:
-      MQ_PASSWORD: "REPLACE_MQ_PASSWORD"
-      MQ_USERNAME: "REPLACE_MQ_USERNAME"
-    volumes:
-      - /root/mosquitto.conf:/mosquitto/config/mosquitto.conf
-      - /root/wait.sh:/mosquitto/config/wait.sh
-      - mosquitto_logs:/mosquitto/log
+      EMQX_NAME: "emqx"
+      EMQX_DASHBOARD__DEFAULT_PASSWORD: "REPLACE_MQ_PASSWORD"
+      EMQX_DASHBOARD__DEFAULT_USERNAME: "REPLACE_MQ_USERNAME"
+    ports:
+      - "1883:1883"
+      - "8883:8883"
+      - "18083:18083"
 volumes:
 volumes:
   caddy_data: {}
   caddy_data: {}
   caddy_conf: {}
   caddy_conf: {}

+ 2 - 0
config/config.go

@@ -37,6 +37,8 @@ type ServerConfig struct {
 	APIHost               string `yaml:"apihost"`
 	APIHost               string `yaml:"apihost"`
 	APIPort               string `yaml:"apiport"`
 	APIPort               string `yaml:"apiport"`
 	MQHOST                string `yaml:"mqhost"`
 	MQHOST                string `yaml:"mqhost"`
+	BrokerType            string `yaml:"brokertype`
+	EmqxRestEndpoint      string `yaml:"emqxrestendpoint"`
 	MasterKey             string `yaml:"masterkey"`
 	MasterKey             string `yaml:"masterkey"`
 	DNSKey                string `yaml:"dnskey"`
 	DNSKey                string `yaml:"dnskey"`
 	AllowedOrigin         string `yaml:"allowedorigin"`
 	AllowedOrigin         string `yaml:"allowedorigin"`

+ 156 - 0
mq/emqx.go

@@ -0,0 +1,156 @@
+package mq
+
+import (
+	"bytes"
+	"encoding/json"
+	"fmt"
+	"io"
+	"net/http"
+
+	"github.com/gravitl/netmaker/servercfg"
+)
+
+const emqxBrokerType = "emqx"
+
+type (
+	emqxUser struct {
+		UserID   string `json:"user_id"`
+		Password string `json:"password"`
+		Admin    bool   `json:"is_superuser"`
+	}
+
+	emqxLogin struct {
+		Username string `json:"username"`
+		Password string `json:"password"`
+	}
+
+	emqxLoginResponse struct {
+		License struct {
+			Edition string `json:"edition"`
+		} `json:"license"`
+		Token   string `json:"token"`
+		Version string `json:"version"`
+	}
+)
+
+func getEmqxAuthToken() (string, error) {
+	payload, err := json.Marshal(&emqxLogin{
+		Username: servercfg.GetMqUserName(),
+		Password: servercfg.GetMqPassword(),
+	})
+	if err != nil {
+		return "", err
+	}
+	resp, err := http.Post(servercfg.GetEmqxRestEndpoint()+"/api/v5/login", "application/json", bytes.NewReader(payload))
+	if err != nil {
+		return "", err
+	}
+	msg, err := io.ReadAll(resp.Body)
+	if err != nil {
+		return "", err
+	}
+	if resp.StatusCode != http.StatusOK {
+		return "", fmt.Errorf("error during EMQX login %v", string(msg))
+	}
+	var loginResp emqxLoginResponse
+	if err := json.Unmarshal(msg, &loginResp); err != nil {
+		return "", err
+	}
+	return loginResp.Token, nil
+}
+
+// CreateEmqxUser - creates an EMQX user
+func CreateEmqxUser(username, password string, admin bool) error {
+	token, err := getEmqxAuthToken()
+	if err != nil {
+		return err
+	}
+	payload, err := json.Marshal(&emqxUser{
+		UserID:   username,
+		Password: password,
+		Admin:    admin,
+	})
+	if err != nil {
+		return err
+	}
+	req, err := http.NewRequest(http.MethodPost, servercfg.GetEmqxRestEndpoint()+"/api/v5/authentication/password_based:built_in_database/users", bytes.NewReader(payload))
+	if err != nil {
+		return err
+	}
+	req.Header.Add("content-type", "application/json")
+	req.Header.Add("authorization", "Bearer "+token)
+	resp, err := (&http.Client{}).Do(req)
+	if err != nil {
+		return err
+	}
+	defer resp.Body.Close()
+	if resp.StatusCode >= 300 {
+		msg, err := io.ReadAll(resp.Body)
+		if err != nil {
+			return err
+		}
+		return fmt.Errorf("error creating EMQX user %v", string(msg))
+	}
+	return nil
+}
+
+// DeleteEmqxUser - deletes an EMQX user
+func DeleteEmqxUser(username string) error {
+	token, err := getEmqxAuthToken()
+	if err != nil {
+		return err
+	}
+	req, err := http.NewRequest(http.MethodDelete, servercfg.GetEmqxRestEndpoint()+"/api/v5/authentication/password_based:built_in_database/users/"+username, nil)
+	if err != nil {
+		return err
+	}
+	req.Header.Add("authorization", "Bearer "+token)
+	resp, err := (&http.Client{}).Do(req)
+	if err != nil {
+		return err
+	}
+	defer resp.Body.Close()
+	if resp.StatusCode >= 300 {
+		msg, err := io.ReadAll(resp.Body)
+		if err != nil {
+			return err
+		}
+		return fmt.Errorf("error deleting EMQX user %v", string(msg))
+	}
+	return nil
+}
+
+// CreateEmqxDefaultAuthenticator - creates a default authenticator based on password and using EMQX's built in database as storage
+func CreateEmqxDefaultAuthenticator() error {
+	token, err := getEmqxAuthToken()
+	if err != nil {
+		return err
+	}
+	payload, err := json.Marshal(&struct {
+		Mechanism  string `json:"mechanism"`
+		Backend    string `json:"backend"`
+		UserIDType string `json:"user_id_type"`
+	}{Mechanism: "password_based", Backend: "built_in_database", UserIDType: "username"})
+	if err != nil {
+		return err
+	}
+	req, err := http.NewRequest(http.MethodPost, servercfg.GetEmqxRestEndpoint()+"/api/v5/authentication", bytes.NewReader(payload))
+	if err != nil {
+		return err
+	}
+	req.Header.Add("content-type", "application/json")
+	req.Header.Add("authorization", "Bearer "+token)
+	resp, err := (&http.Client{}).Do(req)
+	if err != nil {
+		return err
+	}
+	defer resp.Body.Close()
+	if resp.StatusCode != http.StatusOK {
+		msg, err := io.ReadAll(resp.Body)
+		if err != nil {
+			return err
+		}
+		return fmt.Errorf("error creating default EMQX authenticator %v", string(msg))
+	}
+	return nil
+}

+ 16 - 0
mq/handlers.go

@@ -145,6 +145,15 @@ func UpdateHost(client mqtt.Client, msg mqtt.Message) {
 		var sendPeerUpdate bool
 		var sendPeerUpdate bool
 		switch hostUpdate.Action {
 		switch hostUpdate.Action {
 		case models.UpdateHost:
 		case models.UpdateHost:
+			if servercfg.GetBrokerType() == emqxBrokerType {
+				// create EMQX credentials for host if it doesn't exists
+				if _, err := logic.GetHost(currentHost.ID.String()); err != nil {
+					if err := CreateEmqxUser(currentHost.ID.String(), currentHost.HostPass, false); err != nil {
+						logger.Log(0, "failed to add host credentials to EMQX: ", currentHost.ID.String(), err.Error())
+						return
+					}
+				}
+			}
 			sendPeerUpdate = logic.UpdateHostFromClient(&hostUpdate.Host, currentHost)
 			sendPeerUpdate = logic.UpdateHostFromClient(&hostUpdate.Host, currentHost)
 			err := logic.UpsertHost(currentHost)
 			err := logic.UpsertHost(currentHost)
 			if err != nil {
 			if err != nil {
@@ -152,6 +161,13 @@ func UpdateHost(client mqtt.Client, msg mqtt.Message) {
 				return
 				return
 			}
 			}
 		case models.DeleteHost:
 		case models.DeleteHost:
+			if servercfg.GetBrokerType() == emqxBrokerType {
+				// delete EMQX credentials for host
+				if err := DeleteEmqxUser(currentHost.ID.String()); err != nil {
+					logger.Log(0, "failed to remove host credentials from EMQX: ", currentHost.ID.String(), err.Error())
+					return
+				}
+			}
 			if err := logic.DisassociateAllNodesFromHost(currentHost.ID.String()); err != nil {
 			if err := logic.DisassociateAllNodesFromHost(currentHost.ID.String()); err != nil {
 				logger.Log(0, "failed to delete all nodes of host: ", currentHost.ID.String(), err.Error())
 				logger.Log(0, "failed to delete all nodes of host: ", currentHost.ID.String(), err.Error())
 				return
 				return

+ 10 - 0
mq/mq.go

@@ -2,6 +2,7 @@ package mq
 
 
 import (
 import (
 	"context"
 	"context"
+	"log"
 	"time"
 	"time"
 
 
 	mqtt "github.com/eclipse/paho.mqtt.golang"
 	mqtt "github.com/eclipse/paho.mqtt.golang"
@@ -38,6 +39,15 @@ func setMqOptions(user, password string, opts *mqtt.ClientOptions) {
 
 
 // SetupMQTT creates a connection to broker and return client
 // SetupMQTT creates a connection to broker and return client
 func SetupMQTT() {
 func SetupMQTT() {
+	if servercfg.GetBrokerType() == emqxBrokerType {
+		// setup authenticator and create admin user
+		if err := CreateEmqxDefaultAuthenticator(); err != nil {
+			logger.Log(0, err.Error())
+		}
+		if err := CreateEmqxUser(servercfg.GetMqUserName(), servercfg.GetMqPassword(), true); err != nil {
+			log.Fatal(err)
+		}
+	}
 	opts := mqtt.NewClientOptions()
 	opts := mqtt.NewClientOptions()
 	setMqOptions(servercfg.GetMqUserName(), servercfg.GetMqPassword(), opts)
 	setMqOptions(servercfg.GetMqUserName(), servercfg.GetMqPassword(), opts)
 	opts.SetOnConnectHandler(func(client mqtt.Client) {
 	opts.SetOnConnectHandler(func(client mqtt.Client) {

+ 16 - 0
servercfg/serverconf.go

@@ -43,6 +43,8 @@ func GetServerConfig() config.ServerConfig {
 	cfg.NodeID = GetNodeID()
 	cfg.NodeID = GetNodeID()
 	cfg.StunHost = GetStunAddr()
 	cfg.StunHost = GetStunAddr()
 	cfg.StunPort = GetStunPort()
 	cfg.StunPort = GetStunPort()
+	cfg.BrokerType = GetBrokerType()
+	cfg.EmqxRestEndpoint = GetEmqxRestEndpoint()
 	if IsRestBackend() {
 	if IsRestBackend() {
 		cfg.RestBackend = "on"
 		cfg.RestBackend = "on"
 	}
 	}
@@ -250,6 +252,15 @@ func GetMessageQueueEndpoint() (string, bool) {
 	return host + ":" + GetMQServerPort(), secure
 	return host + ":" + GetMQServerPort(), secure
 }
 }
 
 
+// GetBrokerType - returns the type of MQ broker
+func GetBrokerType() string {
+	if os.Getenv("BROKER_TYPE") != "" {
+		return os.Getenv("BROKER_TYPE")
+	} else {
+		return "mosquitto"
+	}
+}
+
 // GetMasterKey - gets the configured master key of server
 // GetMasterKey - gets the configured master key of server
 func GetMasterKey() string {
 func GetMasterKey() string {
 	key := ""
 	key := ""
@@ -613,6 +624,11 @@ func GetMqUserName() string {
 	return password
 	return password
 }
 }
 
 
+// GetEmqxRestEndpoint - returns the REST API Endpoint of EMQX
+func GetEmqxRestEndpoint() string {
+	return os.Getenv("EMQX_REST_ENDPOINT")
+}
+
 // IsBasicAuthEnabled - checks if basic auth has been configured to be turned off
 // IsBasicAuthEnabled - checks if basic auth has been configured to be turned off
 func IsBasicAuthEnabled() bool {
 func IsBasicAuthEnabled() bool {
 	var enabled = true //default
 	var enabled = true //default