|
@@ -24,6 +24,7 @@ import (
|
|
"github.com/gravitl/netmaker/netclient/daemon"
|
|
"github.com/gravitl/netmaker/netclient/daemon"
|
|
"github.com/gravitl/netmaker/netclient/ncutils"
|
|
"github.com/gravitl/netmaker/netclient/ncutils"
|
|
"github.com/gravitl/netmaker/netclient/wireguard"
|
|
"github.com/gravitl/netmaker/netclient/wireguard"
|
|
|
|
+ "github.com/gravitl/netmaker/servercfg"
|
|
"golang.zx2c4.com/wireguard/wgctrl/wgtypes"
|
|
"golang.zx2c4.com/wireguard/wgctrl/wgtypes"
|
|
)
|
|
)
|
|
|
|
|
|
@@ -43,30 +44,24 @@ func Daemon() error {
|
|
logger.Log(0, "starting daemon")
|
|
logger.Log(0, "starting daemon")
|
|
// == start mq for each server ==
|
|
// == start mq for each server ==
|
|
// == add waitgroup and cancel for checkin routine ==
|
|
// == add waitgroup and cancel for checkin routine ==
|
|
- //ctx, cancel := context.WithCancel(context.Background())
|
|
|
|
|
|
+ ctx, cancel := context.WithCancel(context.Background())
|
|
wg := sync.WaitGroup{}
|
|
wg := sync.WaitGroup{}
|
|
- //networks, _ := ncutils.GetSystemNetworks()
|
|
|
|
- // for _, network := range networks {
|
|
|
|
- // //temporary code --- remove in version v0.13.0
|
|
|
|
- // removeHostDNS(network, ncutils.IsWindows())
|
|
|
|
- // // end of code to be removed in version v0.13.0
|
|
|
|
- // var cfg config.ClientConfig
|
|
|
|
- // cfg.Network = network
|
|
|
|
- // cfg.ReadConfig()
|
|
|
|
- // // == initial pull of all networks ==
|
|
|
|
- // initialPull(cfg.Network)
|
|
|
|
- // logger.Log(1, "started mq for server ", cfg.NetworkSettings.NetID)
|
|
|
|
- // wg.Add(1)
|
|
|
|
- // go messageQueue(ctx, wg, &cfg)
|
|
|
|
- // }
|
|
|
|
- //wg.Add(1)
|
|
|
|
- //go Checkin(ctx, &wg)
|
|
|
|
- var cfg config.ClientConfig
|
|
|
|
- cfg.Network = "netmaker"
|
|
|
|
- cfg.ReadConfig()
|
|
|
|
- //Hello(&cfg)
|
|
|
|
- client := SetupMQTT(&cfg, false)
|
|
|
|
- defer client.Disconnect(500)
|
|
|
|
|
|
+ networks, _ := ncutils.GetSystemNetworks()
|
|
|
|
+ for _, network := range networks {
|
|
|
|
+ //temporary code --- remove in version v0.13.0
|
|
|
|
+ removeHostDNS(network, ncutils.IsWindows())
|
|
|
|
+ // end of code to be removed in version v0.13.0
|
|
|
|
+ var cfg config.ClientConfig
|
|
|
|
+ cfg.Network = network
|
|
|
|
+ cfg.ReadConfig()
|
|
|
|
+ // == initial pull of all networks ==
|
|
|
|
+ initialPull(cfg.Network)
|
|
|
|
+ logger.Log(1, "started mq for server ", cfg.NetworkSettings.NetID)
|
|
|
|
+ wg.Add(1)
|
|
|
|
+ go messageQueue(ctx, wg, &cfg)
|
|
|
|
+ }
|
|
|
|
+ wg.Add(1)
|
|
|
|
+ go Checkin(ctx, &wg)
|
|
quit := make(chan os.Signal, 1)
|
|
quit := make(chan os.Signal, 1)
|
|
signal.Notify(quit, syscall.SIGTERM, os.Interrupt, os.Kill)
|
|
signal.Notify(quit, syscall.SIGTERM, os.Interrupt, os.Kill)
|
|
<-quit
|
|
<-quit
|
|
@@ -75,7 +70,7 @@ func Daemon() error {
|
|
// cancel.(context.CancelFunc)()
|
|
// cancel.(context.CancelFunc)()
|
|
// }
|
|
// }
|
|
//}
|
|
//}
|
|
- //cancel()
|
|
|
|
|
|
+ cancel()
|
|
logger.Log(0, "shutting down netclient daemon")
|
|
logger.Log(0, "shutting down netclient daemon")
|
|
wg.Wait()
|
|
wg.Wait()
|
|
logger.Log(0, "shutdown complete")
|
|
logger.Log(0, "shutdown complete")
|
|
@@ -122,13 +117,6 @@ func PingServer(nodeCfg *config.ClientConfig) error {
|
|
}
|
|
}
|
|
|
|
|
|
// == Private ==
|
|
// == Private ==
|
|
-func setTempSubs(client mqtt.Client) {
|
|
|
|
- if token := client.Subscribe("#", 0, nil); token.Wait() && token.Error() != nil {
|
|
|
|
- logger.Log(0, token.Error().Error())
|
|
|
|
- return
|
|
|
|
- }
|
|
|
|
- logger.Log(0, "subscribed to all topics for debugging purposes")
|
|
|
|
-}
|
|
|
|
|
|
|
|
// sets MQ client subscriptions for a specific node config
|
|
// sets MQ client subscriptions for a specific node config
|
|
// should be called for each node belonging to a given comms network
|
|
// should be called for each node belonging to a given comms network
|
|
@@ -177,22 +165,20 @@ func messageQueue(ctx context.Context, wg sync.WaitGroup, cfg *config.ClientConf
|
|
//var commsCfg config.ClientConfig
|
|
//var commsCfg config.ClientConfig
|
|
//commsCfg.Network = commsNet
|
|
//commsCfg.Network = commsNet
|
|
//commsCfg.ReadConfig()
|
|
//commsCfg.ReadConfig()
|
|
- logger.Log(0, "netclient daemon started for server: ", cfg.Server.MQEndPoint)
|
|
|
|
- client := SetupMQTT(cfg, false)
|
|
|
|
|
|
+ logger.Log(0, "netclient daemon started for server: ", cfg.Server.ServerName)
|
|
|
|
+ client := setupMQTT(cfg, false)
|
|
defer client.Disconnect(250)
|
|
defer client.Disconnect(250)
|
|
<-ctx.Done()
|
|
<-ctx.Done()
|
|
- logger.Log(0, "shutting down daemon for server ", cfg.Server.MQEndPoint)
|
|
|
|
|
|
+ logger.Log(0, "shutting down daemon for server ", cfg.Server.ServerName)
|
|
wg.Done()
|
|
wg.Done()
|
|
}
|
|
}
|
|
|
|
|
|
-// SetupMQTT creates a connection to broker and return client
|
|
|
|
|
|
+// setupMQTT creates a connection to broker and return client
|
|
// utilizes comms client configs to setup connections
|
|
// utilizes comms client configs to setup connections
|
|
-func SetupMQTT(cfg *config.ClientConfig, publish bool) mqtt.Client {
|
|
|
|
|
|
+func setupMQTT(cfg *config.ClientConfig, publish bool) mqtt.Client {
|
|
opts := mqtt.NewClientOptions()
|
|
opts := mqtt.NewClientOptions()
|
|
- //server := getServerAddress(commsCfg)
|
|
|
|
- opts.AddBroker(cfg.Server.MQEndPoint) // TODO get the appropriate port of the comms mq server
|
|
|
|
- //opts.ClientID = ncutils.MakeRandomString(23) // helps avoid id duplication on broker
|
|
|
|
- logger.Log(0, "added broker ", cfg.Server.MQEndPoint)
|
|
|
|
|
|
+ opts.AddBroker(servercfg.GetMessageQueueEndpoint(true))
|
|
|
|
+ logger.Log(2, "added broker ", cfg.Server.ServerName)
|
|
opts.SetDefaultPublishHandler(All)
|
|
opts.SetDefaultPublishHandler(All)
|
|
opts.SetAutoReconnect(true)
|
|
opts.SetAutoReconnect(true)
|
|
opts.SetConnectRetry(true)
|
|
opts.SetConnectRetry(true)
|
|
@@ -202,17 +188,16 @@ func SetupMQTT(cfg *config.ClientConfig, publish bool) mqtt.Client {
|
|
opts.SetOnConnectHandler(func(client mqtt.Client) {
|
|
opts.SetOnConnectHandler(func(client mqtt.Client) {
|
|
logger.Log(0, "Connection Handler")
|
|
logger.Log(0, "Connection Handler")
|
|
if !publish {
|
|
if !publish {
|
|
- //networks, err := ncutils.GetSystemNetworks()
|
|
|
|
- //if err != nil {
|
|
|
|
- // logger.Log(0, "error retriving networks ", err.Error())
|
|
|
|
- //}
|
|
|
|
- //for _, network := range networks {
|
|
|
|
- // var currNodeCfg config.ClientConfig
|
|
|
|
- // currNodeCfg.Network = network
|
|
|
|
- // currNodeCfg.ReadConfig()
|
|
|
|
- // setSubscriptions(client, &currNodeCfg)
|
|
|
|
- //}
|
|
|
|
- setTempSubs(client)
|
|
|
|
|
|
+ networks, err := ncutils.GetSystemNetworks()
|
|
|
|
+ if err != nil {
|
|
|
|
+ logger.Log(0, "error retriving networks ", err.Error())
|
|
|
|
+ }
|
|
|
|
+ for _, network := range networks {
|
|
|
|
+ var currNodeCfg config.ClientConfig
|
|
|
|
+ currNodeCfg.Network = network
|
|
|
|
+ currNodeCfg.ReadConfig()
|
|
|
|
+ setSubscriptions(client, &currNodeCfg)
|
|
|
|
+ }
|
|
}
|
|
}
|
|
})
|
|
})
|
|
opts.SetOrderMatters(true)
|
|
opts.SetOrderMatters(true)
|
|
@@ -241,13 +226,7 @@ func SetupMQTT(cfg *config.ClientConfig, publish bool) mqtt.Client {
|
|
}
|
|
}
|
|
time.Sleep(time.Second)
|
|
time.Sleep(time.Second)
|
|
}
|
|
}
|
|
- if token := client.Connect(); !token.WaitTimeout(time.Second*2) && token.Error() != nil {
|
|
|
|
- //token := client.Connect()
|
|
|
|
- //if done := token.WaitTimeout(time.Second * 2); !done {
|
|
|
|
- // logger.Log(0, "mq client connect timeout")
|
|
|
|
- //}
|
|
|
|
- //err := token.Error()
|
|
|
|
- //if err != nil {
|
|
|
|
|
|
+ if token := client.Connect(); token.Wait() && token.Error() != nil {
|
|
logger.Log(0, token.Error().Error())
|
|
logger.Log(0, token.Error().Error())
|
|
logger.Log(0, "unable to connect to broker, retrying ...")
|
|
logger.Log(0, "unable to connect to broker, retrying ...")
|
|
if time.Now().After(tperiod) {
|
|
if time.Now().After(tperiod) {
|
|
@@ -366,7 +345,7 @@ func getServers(networks []string) (map[string]bool, error) {
|
|
for _, network := range networks {
|
|
for _, network := range networks {
|
|
cfg.Network = network
|
|
cfg.Network = network
|
|
cfg.ReadConfig()
|
|
cfg.ReadConfig()
|
|
- response[cfg.Node.MQEndPoint] = true
|
|
|
|
|
|
+ response[cfg.Server.ServerName] = true
|
|
}
|
|
}
|
|
return response, nil
|
|
return response, nil
|
|
}
|
|
}
|