浏览代码

extclients track watch with context and exiting durig cleanup

Abhishek Kondur 2 年之前
父节点
当前提交
a6e01c4963
共有 3 个文件被更改,包括 29 次插入17 次删除
  1. 2 0
      nm-proxy/common/common.go
  2. 26 16
      nm-proxy/manager/manager.go
  3. 1 1
      nm-proxy/wg/wg.go

+ 2 - 0
nm-proxy/common/common.go

@@ -78,6 +78,8 @@ var WgIfaceKeyMap = make(map[string]struct{})
 
 
 var RelayPeerMap = make(map[string]map[string]RemotePeer)
 var RelayPeerMap = make(map[string]map[string]RemotePeer)
 
 
+var ExtClientsWaitTh = make(map[string][]context.CancelFunc)
+
 // RunCmd - runs a local command
 // RunCmd - runs a local command
 func RunCmd(command string, printerr bool) (string, error) {
 func RunCmd(command string, printerr bool) (string, error) {
 	args := strings.Fields(command)
 	args := strings.Fields(command)

+ 26 - 16
nm-proxy/manager/manager.go

@@ -1,6 +1,7 @@
 package manager
 package manager
 
 
 import (
 import (
+	"context"
 	"crypto/md5"
 	"crypto/md5"
 	"errors"
 	"errors"
 	"fmt"
 	"fmt"
@@ -24,7 +25,6 @@ TODO:-
 			-> start remote conn after endpoint is updated
 			-> start remote conn after endpoint is updated
 		-->
 		-->
 */
 */
-var sent bool
 
 
 type ProxyAction string
 type ProxyAction string
 
 
@@ -73,13 +73,9 @@ func StartProxyManager(manageChan chan *ManagerAction) {
 
 
 		select {
 		select {
 		case mI := <-manageChan:
 		case mI := <-manageChan:
-			if sent {
-				continue
-			}
 			log.Printf("-------> PROXY-MANAGER: %+v\n", mI)
 			log.Printf("-------> PROXY-MANAGER: %+v\n", mI)
 			switch mI.Action {
 			switch mI.Action {
 			case AddInterface:
 			case AddInterface:
-				sent = true
 				common.IsRelay = mI.Payload.IsRelay
 				common.IsRelay = mI.Payload.IsRelay
 				if mI.Payload.IsRelay {
 				if mI.Payload.IsRelay {
 					mI.RelayPeers()
 					mI.RelayPeers()
@@ -194,7 +190,13 @@ func cleanUp(iface string) {
 		}
 		}
 	}
 	}
 	delete(common.WgIFaceMap, iface)
 	delete(common.WgIFaceMap, iface)
-	time.Sleep(time.Second * 5)
+	if waitThs, ok := common.ExtClientsWaitTh[iface]; ok {
+		for _, cancelF := range waitThs {
+			cancelF()
+		}
+		delete(common.ExtClientsWaitTh, iface)
+	}
+
 	log.Println("CLEANED UP..........")
 	log.Println("CLEANED UP..........")
 }
 }
 
 
@@ -277,6 +279,8 @@ func (m *ManagerAction) AddInterfaceToProxy() error {
 			go func(wgInterface *wg.WGIface, peer *wgtypes.PeerConfig,
 			go func(wgInterface *wg.WGIface, peer *wgtypes.PeerConfig,
 				isRelayed, isExtClient, isAttachedExtClient bool, relayTo *net.UDPAddr, peerConf PeerConf) {
 				isRelayed, isExtClient, isAttachedExtClient bool, relayTo *net.UDPAddr, peerConf PeerConf) {
 				addExtClient := false
 				addExtClient := false
+				ctx, cancel := context.WithCancel(context.Background())
+				common.ExtClientsWaitTh[wgInterface.Name] = append(common.ExtClientsWaitTh[wgInterface.Name], cancel)
 				defer func() {
 				defer func() {
 					if addExtClient {
 					if addExtClient {
 						log.Println("GOT ENDPOINT for Extclient adding peer...")
 						log.Println("GOT ENDPOINT for Extclient adding peer...")
@@ -293,19 +297,25 @@ func (m *ManagerAction) AddInterfaceToProxy() error {
 					}
 					}
 				}()
 				}()
 				for {
 				for {
-					wgInterface, err := wg.NewWGIFace(ifaceName, "127.0.0.1/32", wg.DefaultMTU)
-					if err != nil {
-						log.Println("Failed init new interface: ", err)
-						continue
-					}
-					for _, devpeerI := range wgInterface.Device.Peers {
-						if devpeerI.PublicKey.String() == peer.PublicKey.String() && devpeerI.Endpoint != nil {
-							peer.Endpoint = devpeerI.Endpoint
-							addExtClient = true
+					select {
+					case <-ctx.Done():
+						log.Println("Exiting extclient watch Thread for: ", wgInterface.Device.PublicKey.String())
+						return
+					default:
+						wgInterface, err := wg.NewWGIFace(ifaceName, "127.0.0.1/32", wg.DefaultMTU)
+						if err != nil {
+							log.Println("Failed init new interface: ", err)
 							return
 							return
 						}
 						}
+						for _, devpeerI := range wgInterface.Device.Peers {
+							if devpeerI.PublicKey.String() == peer.PublicKey.String() && devpeerI.Endpoint != nil {
+								peer.Endpoint = devpeerI.Endpoint
+								addExtClient = true
+								return
+							}
+						}
+						time.Sleep(time.Second * 5)
 					}
 					}
-					time.Sleep(time.Second * 5)
 
 
 				}
 				}
 
 

+ 1 - 1
nm-proxy/wg/wg.go

@@ -74,7 +74,7 @@ func (w *WGIface) GetWgIface(iface string) error {
 		return err
 		return err
 	}
 	}
 
 
-	log.Printf("----> DEVICE: %+v\n", dev)
+	//log.Printf("----> DEVICE: %+v\n", dev)
 	w.Device = dev
 	w.Device = dev
 	w.Port = dev.ListenPort
 	w.Port = dev.ListenPort
 	return nil
 	return nil