|
@@ -164,11 +164,7 @@ func startControllers() {
|
|
|
// Should we be using a context vice a waitgroup????????????
|
|
|
func runMessageQueue(wg *sync.WaitGroup) {
|
|
|
defer wg.Done()
|
|
|
- brokerHost, secure := servercfg.GetMessageQueueEndpoint()
|
|
|
- logger.Log(0, "connecting to mq broker at", brokerHost, "with TLS?", fmt.Sprintf("%v", secure))
|
|
|
- mq.SetupMQTT()
|
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
|
-
|
|
|
if servercfg.IsMessageQueueBackend() { // connect to external broker
|
|
|
brokerHost, secure := servercfg.GetMessageQueueEndpoint()
|
|
|
logger.Log(0, "connecting to mq broker at", brokerHost, "with TLS?", fmt.Sprintf("%v", secure))
|
|
@@ -180,9 +176,11 @@ func runMessageQueue(wg *sync.WaitGroup) {
|
|
|
go func() {
|
|
|
peerUpdate := make(chan *models.Node)
|
|
|
go logic.ManageZombies(ctx, peerUpdate)
|
|
|
- for nodeUpdate := range peerUpdate {
|
|
|
- if err := mq.NodeUpdate(nodeUpdate); err != nil {
|
|
|
- logger.Log(0, "failed to send peer update for deleted node: ", nodeUpdate.ID.String(), err.Error())
|
|
|
+ if servercfg.IsMessageQueueBackend() {
|
|
|
+ for nodeUpdate := range peerUpdate {
|
|
|
+ if err := mq.NodeUpdate(nodeUpdate); err != nil {
|
|
|
+ logger.Log(0, "failed to send peer update for deleted node: ", nodeUpdate.ID.String(), err.Error())
|
|
|
+ }
|
|
|
}
|
|
|
}
|
|
|
}()
|