|
@@ -139,11 +139,15 @@ func startControllers(wg *sync.WaitGroup, ctx context.Context) {
|
|
}
|
|
}
|
|
//Run MessageQueue
|
|
//Run MessageQueue
|
|
wg.Add(1)
|
|
wg.Add(1)
|
|
- go runMessageQueue(wg, ctx)
|
|
|
|
|
|
+ go runMessageQueue(ctx, wg)
|
|
|
|
+ peerUpdate := make(chan *models.Node)
|
|
|
|
+ wg.Add(1)
|
|
|
|
+ go logic.ManageZombies(ctx, wg, peerUpdate)
|
|
|
|
+ wg.Add(1)
|
|
|
|
+ go logic.DeleteExpiredNodes(ctx, wg, peerUpdate)
|
|
|
|
+ wg.Add(1)
|
|
go func() {
|
|
go func() {
|
|
- peerUpdate := make(chan *models.Node)
|
|
|
|
- go logic.ManageZombies(ctx, peerUpdate)
|
|
|
|
- go logic.DeleteExpiredNodes(ctx, peerUpdate)
|
|
|
|
|
|
+ defer wg.Done()
|
|
for {
|
|
for {
|
|
select {
|
|
select {
|
|
case nodeUpdate := <-peerUpdate:
|
|
case nodeUpdate := <-peerUpdate:
|
|
@@ -164,7 +168,7 @@ func startControllers(wg *sync.WaitGroup, ctx context.Context) {
|
|
}
|
|
}
|
|
|
|
|
|
// Should we be using a context vice a waitgroup????????????
|
|
// Should we be using a context vice a waitgroup????????????
|
|
-func runMessageQueue(wg *sync.WaitGroup, ctx context.Context) {
|
|
|
|
|
|
+func runMessageQueue(ctx context.Context, wg *sync.WaitGroup) {
|
|
defer wg.Done()
|
|
defer wg.Done()
|
|
go mq.Keepalive(ctx)
|
|
go mq.Keepalive(ctx)
|
|
defer mq.CloseClient()
|
|
defer mq.CloseClient()
|