|
@@ -30,7 +30,7 @@ import (
|
|
|
)
|
|
|
|
|
|
var messageCache = new(sync.Map)
|
|
|
-var networkcontext = new(sync.Map)
|
|
|
+var serverSet map[string]bool
|
|
|
|
|
|
const lastNodeUpdate = "lnu"
|
|
|
const lastPeerUpdate = "lpu"
|
|
@@ -43,19 +43,53 @@ type cachedMessage struct {
|
|
|
// Daemon runs netclient daemon from command line
|
|
|
func Daemon() error {
|
|
|
UpdateClientConfig()
|
|
|
- serverSet := make(map[string]bool)
|
|
|
+ if err := ncutils.SavePID(); err != nil {
|
|
|
+ return err
|
|
|
+ }
|
|
|
+ serverSet = make(map[string]bool)
|
|
|
// == initial pull of all networks ==
|
|
|
networks, _ := ncutils.GetSystemNetworks()
|
|
|
if len(networks) == 0 {
|
|
|
return errors.New("no networks")
|
|
|
}
|
|
|
- pubNetworks = append(pubNetworks, networks...)
|
|
|
// set ipforwarding on startup
|
|
|
err := local.SetIPForwarding()
|
|
|
if err != nil {
|
|
|
logger.Log(0, err.Error())
|
|
|
}
|
|
|
|
|
|
+ // == add waitgroup and cancel for checkin routine ==
|
|
|
+ wg := sync.WaitGroup{}
|
|
|
+ quit := make(chan os.Signal, 1)
|
|
|
+ reset := make(chan os.Signal, 1)
|
|
|
+ signal.Notify(quit, syscall.SIGTERM, os.Interrupt)
|
|
|
+ signal.Notify(reset, syscall.SIGHUP)
|
|
|
+ cancel := startGoRoutines(&wg)
|
|
|
+ for {
|
|
|
+ select {
|
|
|
+ case <-quit:
|
|
|
+ cancel()
|
|
|
+ logger.Log(0, "shutting down netclient daemon")
|
|
|
+ wg.Wait()
|
|
|
+ logger.Log(0, "shutdown complete")
|
|
|
+ return nil
|
|
|
+ case <-reset:
|
|
|
+ logger.Log(0, "received reset")
|
|
|
+ cancel()
|
|
|
+ wg.Wait()
|
|
|
+ logger.Log(0, "restarting daemon")
|
|
|
+ cancel = startGoRoutines(&wg)
|
|
|
+ }
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
+func startGoRoutines(wg *sync.WaitGroup) context.CancelFunc {
|
|
|
+ defer wg.Done()
|
|
|
+ ctx, cancel := context.WithCancel(context.Background())
|
|
|
+ wg.Add(1)
|
|
|
+ go Checkin(ctx, wg)
|
|
|
+ serverSet := make(map[string]bool)
|
|
|
+ networks, _ := ncutils.GetSystemNetworks()
|
|
|
for _, network := range networks {
|
|
|
logger.Log(3, "initializing network", network)
|
|
|
cfg := config.ClientConfig{}
|
|
@@ -69,30 +103,10 @@ func Daemon() error {
|
|
|
// == subscribe to all nodes for each on machine ==
|
|
|
serverSet[server] = true
|
|
|
logger.Log(1, "started daemon for server ", server)
|
|
|
- ctx, cancel := context.WithCancel(context.Background())
|
|
|
- networkcontext.Store(server, cancel)
|
|
|
- go messageQueue(ctx, &cfg)
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- // == add waitgroup and cancel for checkin routine ==
|
|
|
- wg := sync.WaitGroup{}
|
|
|
- ctx, cancel := context.WithCancel(context.Background())
|
|
|
- wg.Add(1)
|
|
|
- go Checkin(ctx, &wg)
|
|
|
- quit := make(chan os.Signal, 1)
|
|
|
- signal.Notify(quit, syscall.SIGTERM, os.Interrupt)
|
|
|
- <-quit
|
|
|
- for server := range serverSet {
|
|
|
- if cancel, ok := networkcontext.Load(server); ok {
|
|
|
- cancel.(context.CancelFunc)()
|
|
|
+ go messageQueue(ctx, wg, &cfg)
|
|
|
}
|
|
|
}
|
|
|
- cancel()
|
|
|
- logger.Log(0, "shutting down netclient daemon")
|
|
|
- wg.Wait()
|
|
|
- logger.Log(0, "shutdown complete")
|
|
|
- return nil
|
|
|
+ return cancel
|
|
|
}
|
|
|
|
|
|
// UpdateKeys -- updates private key and returns new publickey
|
|
@@ -167,7 +181,8 @@ func unsubscribeNode(client mqtt.Client, nodeCfg *config.ClientConfig) {
|
|
|
|
|
|
// sets up Message Queue and subsribes/publishes updates to/from server
|
|
|
// the client should subscribe to ALL nodes that exist on server locally
|
|
|
-func messageQueue(ctx context.Context, cfg *config.ClientConfig) {
|
|
|
+func messageQueue(ctx context.Context, wg *sync.WaitGroup, cfg *config.ClientConfig) {
|
|
|
+ defer wg.Done()
|
|
|
logger.Log(0, "netclient daemon started for server: ", cfg.Server.Server)
|
|
|
client, err := setupMQTT(cfg, false)
|
|
|
if err != nil {
|