Browse Source

sync peers updates with proxy

Abhishek Kondur 2 years ago
parent
commit
17e05d430b
6 changed files with 75 additions and 22 deletions
  1. 35 1
      controllers/node.go
  2. 3 0
      logic/peers.go
  3. 1 2
      logic/server.go
  4. 22 3
      logic/wireguard.go
  5. 13 16
      mq/handlers.go
  6. 1 0
      serverctl/serverctl.go

+ 35 - 1
controllers/node.go

@@ -3,6 +3,7 @@ package controller
 import (
 import (
 	"encoding/json"
 	"encoding/json"
 	"fmt"
 	"fmt"
+	"net"
 	"net/http"
 	"net/http"
 	"strings"
 	"strings"
 
 
@@ -14,8 +15,10 @@ import (
 	"github.com/gravitl/netmaker/models"
 	"github.com/gravitl/netmaker/models"
 	"github.com/gravitl/netmaker/models/promodels"
 	"github.com/gravitl/netmaker/models/promodels"
 	"github.com/gravitl/netmaker/mq"
 	"github.com/gravitl/netmaker/mq"
+	"github.com/gravitl/netmaker/nm-proxy/manager"
 	"github.com/gravitl/netmaker/servercfg"
 	"github.com/gravitl/netmaker/servercfg"
 	"golang.org/x/crypto/bcrypt"
 	"golang.org/x/crypto/bcrypt"
+	"golang.zx2c4.com/wireguard/wgctrl/wgtypes"
 )
 )
 
 
 func nodeHandlers(r *mux.Router) {
 func nodeHandlers(r *mux.Router) {
@@ -1012,6 +1015,24 @@ func updateNode(w http.ResponseWriter, r *http.Request) {
 	if servercfg.IsDNSMode() {
 	if servercfg.IsDNSMode() {
 		logic.SetDNS()
 		logic.SetDNS()
 	}
 	}
+	wgPubKey, wgErr := wgtypes.ParseKey(newNode.PublicKey)
+	nodeEndpoint, udpErr := net.ResolveUDPAddr("udp", fmt.Sprintf("%s:%d", newNode.Endpoint, newNode.LocalListenPort))
+	if wgErr == nil && udpErr == nil {
+		logic.ProxyMgmChan <- &manager.ManagerAction{
+			Action: manager.UpdatePeer,
+			Payload: manager.ManagerPayload{
+				InterfaceName: newNode.Interface,
+				Peers: []wgtypes.PeerConfig{
+					{
+						PublicKey: wgPubKey,
+						Endpoint:  nodeEndpoint,
+					},
+				},
+			},
+		}
+	} else {
+		logger.Log(1, fmt.Sprintf("failed to send node update to proxy, wgErr: %v, udpErr: %v", wgErr, udpErr))
+	}
 
 
 	logger.Log(1, r.Header.Get("user"), "updated node", node.ID, "on network", node.Network)
 	logger.Log(1, r.Header.Get("user"), "updated node", node.ID, "on network", node.Network)
 	w.WriteHeader(http.StatusOK)
 	w.WriteHeader(http.StatusOK)
@@ -1100,7 +1121,20 @@ func deleteNode(w http.ResponseWriter, r *http.Request) {
 			logger.Log(0, "failed to reset failover lists during node delete for node", node.Name, node.Network)
 			logger.Log(0, "failed to reset failover lists during node delete for node", node.Name, node.Network)
 		}
 		}
 	}
 	}
-
+	wgKey, _ := wgtypes.ParseKey(node.PublicKey)
+	endpoint, _ := net.ResolveUDPAddr("udp", fmt.Sprintf("%s:%d", node.Endpoint, node.LocalListenPort))
+	logic.ProxyMgmChan <- &manager.ManagerAction{
+		Action: manager.DeletePeer,
+		Payload: manager.ManagerPayload{
+			InterfaceName: node.Interface,
+			Peers: []wgtypes.PeerConfig{
+				{
+					PublicKey: wgKey,
+					Endpoint:  endpoint,
+				},
+			},
+		},
+	}
 	logic.ReturnSuccessResponse(w, r, nodeid+" deleted.")
 	logic.ReturnSuccessResponse(w, r, nodeid+" deleted.")
 	logger.Log(1, r.Header.Get("user"), "Deleted node", nodeid, "from network", params["network"])
 	logger.Log(1, r.Header.Get("user"), "Deleted node", nodeid, "from network", params["network"])
 	runUpdates(&node, false)
 	runUpdates(&node, false)

+ 3 - 0
logic/peers.go

@@ -3,6 +3,7 @@ package logic
 import (
 import (
 	"errors"
 	"errors"
 	"fmt"
 	"fmt"
+	"log"
 	"net"
 	"net"
 	"strconv"
 	"strconv"
 	"strings"
 	"strings"
@@ -31,6 +32,7 @@ func GetPeersForProxy(node *models.Node) ([]wgtypes.PeerConfig, error) {
 			//skip yourself
 			//skip yourself
 			continue
 			continue
 		}
 		}
+		log.Printf("----------> PEER: %s, Endpoint: %s, LocalPort: %d", peer.ID, peer.Endpoint, peer.LocalListenPort)
 		pubkey, err := wgtypes.ParseKey(peer.PublicKey)
 		pubkey, err := wgtypes.ParseKey(peer.PublicKey)
 		if err != nil {
 		if err != nil {
 			logger.Log(1, "failed to parse node pub key: ", peer.ID)
 			logger.Log(1, "failed to parse node pub key: ", peer.ID)
@@ -47,6 +49,7 @@ func GetPeersForProxy(node *models.Node) ([]wgtypes.PeerConfig, error) {
 			// set_keepalive
 			// set_keepalive
 			keepalive, _ = time.ParseDuration(strconv.FormatInt(int64(node.PersistentKeepalive), 10) + "s")
 			keepalive, _ = time.ParseDuration(strconv.FormatInt(int64(node.PersistentKeepalive), 10) + "s")
 		}
 		}
+		log.Printf("---------->##### PEER: %s, Endpoint: %s, LocalPort: %d", peer.ID, endpoint, peer.LocalListenPort)
 		peers = append(peers, wgtypes.PeerConfig{
 		peers = append(peers, wgtypes.PeerConfig{
 			PublicKey:                   pubkey,
 			PublicKey:                   pubkey,
 			Endpoint:                    endpoint,
 			Endpoint:                    endpoint,

+ 1 - 2
logic/server.go

@@ -19,7 +19,6 @@ import (
 	"golang.zx2c4.com/wireguard/wgctrl/wgtypes"
 	"golang.zx2c4.com/wireguard/wgctrl/wgtypes"
 )
 )
 
 
-var ProxyStatus = "OFF"
 var ProxyMgmChan = make(chan *manager.ManagerAction, 100)
 var ProxyMgmChan = make(chan *manager.ManagerAction, 100)
 
 
 // EnterpriseCheckFuncs - can be set to run functions for EE
 // EnterpriseCheckFuncs - can be set to run functions for EE
@@ -176,7 +175,7 @@ func ServerJoin(networkSettings *models.Network) (models.Node, error) {
 	if err != nil {
 	if err != nil {
 		return returnNode, err
 		return returnNode, err
 	}
 	}
-
+	logger.Log(0, "--------> Hereeeeeee23333")
 	ProxyMgmChan <- &manager.ManagerAction{
 	ProxyMgmChan <- &manager.ManagerAction{
 		Action: manager.AddInterface,
 		Action: manager.AddInterface,
 		Payload: manager.ManagerPayload{
 		Payload: manager.ManagerPayload{

+ 22 - 3
logic/wireguard.go

@@ -139,22 +139,41 @@ func setWGConfig(node *models.Node, peerupdate bool) error {
 	if peerupdate {
 	if peerupdate {
 		if err := wireguard.SetPeers(node.Interface, node, peers.Peers); err != nil {
 		if err := wireguard.SetPeers(node.Interface, node, peers.Peers); err != nil {
 			logger.Log(0, "error updating peers", err.Error())
 			logger.Log(0, "error updating peers", err.Error())
+			return err
 		}
 		}
+		// logger.Log(0, "--------> UPDATE PEERS IN PROXY.....")
+		// ProxyMgmChan <- &manager.ManagerAction{
+		// 	Action: manager.UpdatePeer,
+		// 	Payload: manager.ManagerPayload{
+		// 		InterfaceName: node.Interface,
+		// 		Peers:         peers.Peers,
+		// 	},
+		// }
+
 		logger.Log(2, "updated peers on server", node.Name)
 		logger.Log(2, "updated peers on server", node.Name)
 	} else {
 	} else {
 		err = wireguard.InitWireguard(node, privkey, peers.Peers)
 		err = wireguard.InitWireguard(node, privkey, peers.Peers)
+		if err != nil {
+			logger.Log(0, "failed to set wg config on server: ", node.Name, err.Error())
+			return err
+		}
 		logger.Log(3, "finished setting wg config on server", node.Name)
 		logger.Log(3, "finished setting wg config on server", node.Name)
+
 	}
 	}
-	if ProxyStatus == "ON" {
+	logger.Log(0, "--------> ADD INTERFACE TO PROXY.....")
+	peersP, err := GetPeersForProxy(node)
+	if err != nil {
+		logger.Log(0, "failed to get peers for proxy: ", err.Error())
+	} else {
 		ProxyMgmChan <- &manager.ManagerAction{
 		ProxyMgmChan <- &manager.ManagerAction{
 			Action: manager.AddInterface,
 			Action: manager.AddInterface,
 			Payload: manager.ManagerPayload{
 			Payload: manager.ManagerPayload{
 				InterfaceName: node.Interface,
 				InterfaceName: node.Interface,
-				Peers:         peers.Peers,
+				Peers:         peersP,
 			},
 			},
 		}
 		}
 	}
 	}
-	return err
+	return nil
 }
 }
 
 
 func setWGKeyConfig(node *models.Node) error {
 func setWGKeyConfig(node *models.Node) error {

+ 13 - 16
mq/handlers.go

@@ -104,27 +104,24 @@ func UpdateNode(client mqtt.Client, msg mqtt.Message) {
 			if err = PublishPeerUpdate(&currentNode, true); err != nil {
 			if err = PublishPeerUpdate(&currentNode, true); err != nil {
 				logger.Log(0, "error updating peers when node", currentNode.Name, currentNode.ID, "informed the server of an interface change", err.Error())
 				logger.Log(0, "error updating peers when node", currentNode.Name, currentNode.ID, "informed the server of an interface change", err.Error())
 			}
 			}
-			pubKey, err := wgtypes.ParseKey(newNode.PublicKey)
-			if err == nil {
-				endpoint, err := net.ResolveUDPAddr("udp", fmt.Sprintf("%s:%d", newNode.Endpoint, newNode.LocalListenPort))
-				if err == nil {
-					logic.ProxyMgmChan <- &manager.ManagerAction{
-						Action: manager.UpdatePeer,
-						Payload: manager.ManagerPayload{
-							InterfaceName: newNode.Interface,
-							Peers: []wgtypes.PeerConfig{
-								{
-									PublicKey: pubKey,
-									Endpoint:  endpoint,
-								},
+			pubKey, wgErr := wgtypes.ParseKey(newNode.PublicKey)
+			endpoint, udpErr := net.ResolveUDPAddr("udp", fmt.Sprintf("%s:%d", newNode.Endpoint, newNode.LocalListenPort))
+			if wgErr == nil && udpErr == nil {
+				logic.ProxyMgmChan <- &manager.ManagerAction{
+					Action: manager.UpdatePeer,
+					Payload: manager.ManagerPayload{
+						InterfaceName: newNode.Interface,
+						Peers: []wgtypes.PeerConfig{
+							{
+								PublicKey: pubKey,
+								Endpoint:  endpoint,
 							},
 							},
 						},
 						},
-					}
+					},
 				}
 				}
-
 			}
 			}
-
 		}
 		}
+
 		logger.Log(1, "updated node", id, newNode.Name)
 		logger.Log(1, "updated node", id, newNode.Name)
 	}()
 	}()
 }
 }

+ 1 - 0
serverctl/serverctl.go

@@ -87,6 +87,7 @@ func SyncServerNetworkWithProxy() error {
 				logger.Log(1, "failed to retrieve peers for server node: ", serverNode.ID)
 				logger.Log(1, "failed to retrieve peers for server node: ", serverNode.ID)
 				continue
 				continue
 			}
 			}
+			logger.Log(0, "----> HEREEEEEEEE1")
 			logic.ProxyMgmChan <- &manager.ManagerAction{
 			logic.ProxyMgmChan <- &manager.ManagerAction{
 				Action: manager.AddInterface,
 				Action: manager.AddInterface,
 				Payload: manager.ManagerPayload{
 				Payload: manager.ManagerPayload{