소스 검색

Add periodic sync check when syncing LF<>another DB

Adam Ierymenko 6 년 전
부모
커밋
55087521f6
4개의 변경된 파일64개의 추가작업 그리고 4개의 파일을 삭제
  1. 14 1
      controller/DB.hpp
  2. 46 2
      controller/DBMirrorSet.cpp
  3. 3 0
      controller/DBMirrorSet.hpp
  4. 1 1
      controller/LFDB.cpp

+ 14 - 1
controller/DB.hpp

@@ -100,6 +100,19 @@ public:
 
 
 	void networks(std::set<uint64_t> &networks);
 	void networks(std::set<uint64_t> &networks);
 
 
+	template<typename F>
+	inline void each(F f)
+	{
+		nlohmann::json nullJson;
+		std::lock_guard<std::mutex> lck(_networks_l);
+		for(auto nw=_networks.begin();nw!=_networks.end();++nw) {
+			f(nw->first,nw->second->config,0,nullJson); // first provide network with 0 for member ID
+			for(auto m=nw->second->members.begin();m!=nw->second->members.end();++m) {
+				f(nw->first,nw->second->config,m->first,m->second);
+			}
+		}
+	}
+
 	virtual bool save(nlohmann::json &record,bool notifyListeners) = 0;
 	virtual bool save(nlohmann::json &record,bool notifyListeners) = 0;
 
 
 	virtual void eraseNetwork(const uint64_t networkId) = 0;
 	virtual void eraseNetwork(const uint64_t networkId) = 0;
@@ -114,7 +127,7 @@ public:
 	}
 	}
 
 
 protected:
 protected:
-	inline bool _compareRecords(const nlohmann::json &a,const nlohmann::json &b)
+	static inline bool _compareRecords(const nlohmann::json &a,const nlohmann::json &b)
 	{
 	{
 		if (a.is_object() == b.is_object()) {
 		if (a.is_object() == b.is_object()) {
 			if (a.is_object()) {
 			if (a.is_object()) {

+ 46 - 2
controller/DBMirrorSet.cpp

@@ -29,8 +29,52 @@
 namespace ZeroTier {
 namespace ZeroTier {
 
 
 DBMirrorSet::DBMirrorSet(DB::ChangeListener *listener) :
 DBMirrorSet::DBMirrorSet(DB::ChangeListener *listener) :
-	_listener(listener)
-{
+	_listener(listener),
+	_running(true)
+{
+	_syncCheckerThread = std::thread([this]() {
+		for(;;) {
+			for(int i=0;i<120;++i) { // 1 minute delay between checks
+				if (!_running)
+					return;
+				std::this_thread::sleep_for(std::chrono::milliseconds(500));
+			}
+
+			std::vector< std::shared_ptr<DB> > dbs;
+			{
+				std::lock_guard<std::mutex> l(_dbs_l);
+				if (_dbs.size() <= 1)
+					continue; // no need to do this if there's only one DB, so skip the iteration
+				dbs = _dbs;
+			}
+
+			for(auto db=dbs.begin();db!=dbs.end();++db) {
+				(*db)->each([this,&dbs,&db](uint64_t networkId,const nlohmann::json &network,uint64_t memberId,const nlohmann::json &member) {
+					if (memberId == 0) {
+						for(auto db2=dbs.begin();db2!=dbs.end();++db2) {
+							if (db->get() != db2->get()) {
+								nlohmann::json nw2;
+								if ((!(*db2)->get(networkId,nw2))||(OSUtils::jsonInt(nw2["revision"],0) < OSUtils::jsonInt(network["revision"],0))) {
+									nw2 = network;
+									(*db2)->save(nw2,false);
+								}
+							}
+						}
+					} else {
+						for(auto db2=dbs.begin();db2!=dbs.end();++db2) {
+							if (db->get() != db2->get()) {
+								nlohmann::json nw2,m2;
+								if ((!(*db2)->get(networkId,nw2,memberId,m2))||(OSUtils::jsonInt(nw2["revision"],0) < OSUtils::jsonInt(network["revision"],0))) {
+									m2 = member;
+									(*db2)->save(m2,false);
+								}
+							}
+						}
+					}
+				});
+			}
+		}
+	});
 }
 }
 
 
 DBMirrorSet::~DBMirrorSet()
 DBMirrorSet::~DBMirrorSet()

+ 3 - 0
controller/DBMirrorSet.hpp

@@ -33,6 +33,7 @@
 #include <memory>
 #include <memory>
 #include <mutex>
 #include <mutex>
 #include <set>
 #include <set>
+#include <thread>
 
 
 namespace ZeroTier {
 namespace ZeroTier {
 
 
@@ -72,6 +73,8 @@ public:
 
 
 private:
 private:
 	DB::ChangeListener *const _listener;
 	DB::ChangeListener *const _listener;
+	std::atomic_bool _running;
+	std::thread _syncCheckerThread;
 	std::vector< std::shared_ptr< DB > > _dbs;
 	std::vector< std::shared_ptr< DB > > _dbs;
 	mutable std::mutex _dbs_l;
 	mutable std::mutex _dbs_l;
 };
 };

+ 1 - 1
controller/LFDB.cpp

@@ -296,7 +296,7 @@ LFDB::LFDB(const Identity &myId,const char *path,const char *lfOwnerPrivate,cons
 														const uint64_t prevRevision = oldMember["revision"];
 														const uint64_t prevRevision = oldMember["revision"];
 														if (prevRevision < revision)
 														if (prevRevision < revision)
 															_memberChanged(oldMember,member,timeRangeStart > 0);
 															_memberChanged(oldMember,member,timeRangeStart > 0);
-													} else if (network.is_object()) {
+													} else if (hasNetwork(nwid)) {
 														nlohmann::json nullJson;
 														nlohmann::json nullJson;
 														_memberChanged(nullJson,member,timeRangeStart > 0);
 														_memberChanged(nullJson,member,timeRangeStart > 0);
 													}
 													}