Browse Source

Merge pull request #1291 from gravitl/feature_v0.14.5_mq_connect_server

Feature v0.14.5 mq connect server
dcarns 3 years ago
parent
commit
afc0fa01b5
5 changed files with 114 additions and 11 deletions
  1. 52 3
      main.go
  2. 7 3
      mq/mq.go
  3. 11 4
      mq/publishers.go
  4. 35 0
      serverctl/tls.go
  5. 9 1
      tls/tls.go

+ 52 - 3
main.go

@@ -120,7 +120,9 @@ func initialize() { // Client Mode Prereq Check
 		}
 	}
 
-	genCerts()
+	if err = genCerts(); err != nil {
+		logger.Log(0, "something went wrong when generating broker certs", err.Error())
+	}
 
 	if servercfg.IsMessageQueueBackend() {
 		if err = mq.ServerStartNotify(); err != nil {
@@ -207,7 +209,7 @@ func genCerts() error {
 	ca, err := serverctl.ReadCertFromDB(tls.ROOT_PEM_NAME)
 	//if cert doesn't exist or will expire within 10 days --- but can't do this as clients won't be able to connect
 	//if errors.Is(err, os.ErrNotExist) || cert.NotAfter.Before(time.Now().Add(time.Hour*24*10)) {
-	if errors.Is(err, os.ErrNotExist) || database.IsEmptyRecord(err) {
+	if errors.Is(err, os.ErrNotExist) || database.IsEmptyRecord(err) || ca.NotAfter.Before(time.Now().Add(time.Hour*24*10)) {
 		logger.Log(0, "generating new root CA")
 		caName := tls.NewName("CA Root", "US", "Gravitl")
 		csr, err := tls.NewCSR(*private, caName)
@@ -251,5 +253,52 @@ func genCerts() error {
 	} else if err != nil {
 		return err
 	}
-	return nil
+
+	logger.Log(2, "ensure the root.pem, root.key, server.pem, and server.key files are updated on your broker")
+
+	serverClientCert, err := serverctl.ReadCertFromDB(tls.SERVER_CLIENT_PEM)
+	if errors.Is(err, os.ErrNotExist) || database.IsEmptyRecord(err) || serverClientCert.NotAfter.Before(time.Now().Add(time.Hour*24*10)) {
+		//gen new key
+		logger.Log(0, "generating new server client key/certificate")
+		_, key, err := ed25519.GenerateKey(rand.Reader)
+		if err != nil {
+			return err
+		}
+		serverName := tls.NewCName(servercfg.GetServer())
+		csr, err := tls.NewCSR(key, serverName)
+		if err != nil {
+			return err
+		}
+		serverClientCert, err := tls.NewEndEntityCert(*private, csr, ca, tls.CERTIFICATE_VALIDITY)
+		if err != nil {
+			return err
+		}
+
+		if err := serverctl.SaveKey(functions.GetNetmakerPath()+ncutils.GetSeparator(), tls.SERVER_CLIENT_KEY, key); err != nil {
+			return err
+		}
+		if err := serverctl.SaveCert(functions.GetNetmakerPath()+ncutils.GetSeparator(), tls.SERVER_CLIENT_PEM, serverClientCert); err != nil {
+			return err
+		}
+	} else if err != nil {
+		return err
+	} else if err == nil {
+		logger.Log(0, "detected valid server client cert, re-saving for future consumption")
+		key, err := serverctl.ReadKeyFromDB(tls.SERVER_CLIENT_KEY)
+		if err != nil {
+			return err
+		}
+		if err := serverctl.SaveKey(functions.GetNetmakerPath()+ncutils.GetSeparator(), tls.SERVER_CLIENT_KEY, *key); err != nil {
+			return err
+		}
+		if err := serverctl.SaveCert(functions.GetNetmakerPath()+ncutils.GetSeparator(), tls.SERVER_CLIENT_PEM, serverClientCert); err != nil {
+			return err
+		}
+	}
+
+	return serverctl.SetClientTLSConf(
+		functions.GetNetmakerPath()+ncutils.GetSeparator()+tls.SERVER_CLIENT_PEM,
+		functions.GetNetmakerPath()+ncutils.GetSeparator()+tls.SERVER_CLIENT_KEY,
+		ca,
+	)
 }

+ 7 - 3
mq/mq.go

@@ -2,13 +2,13 @@ package mq
 
 import (
 	"context"
-	"log"
 	"time"
 
 	mqtt "github.com/eclipse/paho.mqtt.golang"
 	"github.com/gravitl/netmaker/logger"
 	"github.com/gravitl/netmaker/netclient/ncutils"
 	"github.com/gravitl/netmaker/servercfg"
+	"github.com/gravitl/netmaker/serverctl"
 )
 
 // KEEPALIVE_TIMEOUT - time in seconds for timeout
@@ -27,6 +27,7 @@ func SetupMQTT(publish bool) mqtt.Client {
 	opts.AddBroker(servercfg.GetMessageQueueEndpoint())
 	id := ncutils.MakeRandomString(23)
 	opts.ClientID = id
+	opts.SetTLSConfig(&serverctl.TlsConfig)
 	opts.SetAutoReconnect(true)
 	opts.SetConnectRetry(true)
 	opts.SetConnectRetryInterval(time.Second << 2)
@@ -58,9 +59,9 @@ func SetupMQTT(publish bool) mqtt.Client {
 			logger.Log(2, "unable to connect to broker, retrying ...")
 			if time.Now().After(tperiod) {
 				if token.Error() == nil {
-					log.Fatal(0, "could not connect to broker, token timeout, exiting ...")
+					logger.FatalLog("could not connect to broker, token timeout, exiting ...")
 				} else {
-					log.Fatal(0, "could not connect to broker, exiting ...", token.Error())
+					logger.FatalLog("could not connect to broker, exiting ...", token.Error().Error())
 				}
 			}
 		} else {
@@ -68,6 +69,9 @@ func SetupMQTT(publish bool) mqtt.Client {
 		}
 		time.Sleep(2 * time.Second)
 	}
+	if !publish {
+		logger.Log(0, "successfully connected to mq broker")
+	}
 	return client
 }
 

+ 11 - 4
mq/publishers.go

@@ -119,9 +119,14 @@ func sendPeers() {
 
 	for _, network := range networks {
 		serverNode, errN := logic.GetNetworkServerLeader(network.NetID)
-		if errN == nil && logic.IsLocalServer(&serverNode) {
+		if errN == nil {
 			serverNode.SetLastCheckIn()
-			logic.UpdateNode(&serverNode, &serverNode)
+			if err := logic.UpdateNode(&serverNode, &serverNode); err != nil {
+				logger.Log(0, "failed checkin for server node", serverNode.Name, "on network", network.NetID, err.Error())
+			}
+		}
+		isLeader := logic.IsLocalServer(&serverNode)
+		if errN == nil && isLeader {
 			if network.DefaultUDPHolePunch == "yes" {
 				if logic.ShouldPublishPeerPorts(&serverNode) || force {
 					if force {
@@ -135,12 +140,14 @@ func sendPeers() {
 				}
 			}
 		} else {
-			logger.Log(1, "unable to retrieve leader for network ", network.NetID)
+			if isLeader {
+				logger.Log(1, "unable to retrieve leader for network ", network.NetID)
+			}
+			logger.Log(2, "server checkin complete for server", serverNode.Name, "on network", network.NetID)
 			serverctl.SyncServerNetwork(network.NetID)
 			if errN != nil {
 				logger.Log(1, errN.Error())
 			}
-			continue
 		}
 	}
 }

+ 35 - 0
serverctl/tls.go

@@ -2,6 +2,7 @@ package serverctl
 
 import (
 	"crypto/ed25519"
+	ssl "crypto/tls"
 	"crypto/x509"
 	"encoding/json"
 	"encoding/pem"
@@ -12,6 +13,9 @@ import (
 	"github.com/gravitl/netmaker/tls"
 )
 
+// TlsConfig - holds this servers TLS conf in memory
+var TlsConfig ssl.Config
+
 // SaveCert - save a certificate to file and DB
 func SaveCert(path, name string, cert *x509.Certificate) error {
 	if err := SaveCertToDB(name, cert); err != nil {
@@ -103,3 +107,34 @@ func ReadKeyFromDB(name string) (*ed25519.PrivateKey, error) {
 	private := key.(ed25519.PrivateKey)
 	return &private, nil
 }
+
+// SetClientTLSConf - saves client cert for servers to connect to MQ broker with
+func SetClientTLSConf(serverClientPemPath, serverClientKeyPath string, ca *x509.Certificate) error {
+	certpool := x509.NewCertPool()
+	if caData := pem.EncodeToMemory(&pem.Block{
+		Type:  "CERTIFICATE",
+		Bytes: ca.Raw,
+	}); len(caData) <= 0 {
+		return fmt.Errorf("could not encode CA cert to memory for server client")
+	} else {
+		ok := certpool.AppendCertsFromPEM(caData)
+		if !ok {
+			return fmt.Errorf("failed to append root cert to server client cert")
+		}
+	}
+	clientKeyPair, err := ssl.LoadX509KeyPair(serverClientPemPath, serverClientKeyPath)
+	if err != nil {
+		return err
+	}
+	certs := []ssl.Certificate{clientKeyPair}
+
+	TlsConfig = ssl.Config{
+		RootCAs:            certpool,
+		ClientAuth:         ssl.NoClientCert,
+		ClientCAs:          nil,
+		Certificates:       certs,
+		InsecureSkipVerify: false,
+	}
+
+	return nil
+}

+ 9 - 1
tls/tls.go

@@ -18,7 +18,6 @@ import (
 )
 
 const (
-
 	// CERTTIFICATE_VALIDITY duration of certificate validity in days
 	CERTIFICATE_VALIDITY = 365
 
@@ -33,6 +32,15 @@ const (
 
 	// ROOT_PEM_NAME - name of root pem
 	ROOT_PEM_NAME = "root.pem"
+
+	// SERVER_CLIENT_PEM - the name of server client cert
+	SERVER_CLIENT_PEM = "serverclient.pem"
+
+	// SERVER_CLIENT_KEY - the name of server client key
+	SERVER_CLIENT_KEY = "serverclient.key"
+
+	// SERVER_CLIENT_ENTRY - the server client cert key for DB
+	SERVER_CLIENT_ENTRY = "servercliententry"
 )
 
 type (