浏览代码

manage DNS sync publish

Max Ma 1 年之前
父节点
当前提交
5b1552178b
共有 2 个文件被更改,包括 39 次插入0 次删除
  1. 3 0
      mq/mq.go
  2. 36 0
      mq/publishers.go

+ 3 - 0
mq/mq.go

@@ -131,6 +131,9 @@ func Keepalive(ctx context.Context) {
 		case <-ctx.Done():
 			return
 		case <-time.After(time.Second * KEEPALIVE_TIMEOUT):
+			if servercfg.GetManageDNS() {
+				sendDNSSync()
+			}
 			sendPeers()
 		}
 	}

+ 36 - 0
mq/publishers.go

@@ -16,6 +16,7 @@ import (
 
 var batchSize = servercfg.GetPeerUpdateBatchSize()
 var batchUpdate = servercfg.GetBatchPeerUpdate()
+var manageDNSCache = map[string]int{}
 
 // PublishPeerUpdate --- determines and publishes a peer update to all the hosts
 func PublishPeerUpdate(replacePeers bool) error {
@@ -249,3 +250,38 @@ func sendPeers() {
 		}
 	}
 }
+
+func sendDNSSync() error {
+
+	networks, err := logic.GetNetworks()
+	if err == nil && len(networks) > 0 {
+		for _, v := range networks {
+			k, err := logic.GetDNS(v.NetID)
+			if err == nil {
+				if manageDNSCache[v.NetID] != len(k) {
+					data, err := json.Marshal(k)
+					if err != nil {
+						slog.Warn("error marshalling dns entry data for network ", v.NetID, err.Error())
+					}
+
+					if mqclient == nil || !mqclient.IsConnectionOpen() {
+						return errors.New("cannot publish ... mqclient not connected")
+					}
+
+					if token := mqclient.Publish(fmt.Sprintf("host/dns/sync/%s", v.NetID), 0, true, data); !token.WaitTimeout(MQ_TIMEOUT*time.Second) || token.Error() != nil {
+						if token.Error() == nil {
+							slog.Warn("could not publish server status", "error", "connection timeout")
+						} else {
+							slog.Warn("could not publish server status", "error", token.Error().Error())
+						}
+					}
+					manageDNSCache[v.NetID] = len(k)
+				}
+				continue
+			}
+			slog.Warn("error getting DNS entries for network ", v.NetID, err.Error())
+		}
+		return err
+	}
+	return err
+}