Adam Ierymenko 6 anni fa
parent
commit
ac2688de58
2 ha cambiato i file con 100 aggiunte e 231 eliminazioni
  1. 99 225
      controller/LFDB.cpp
  2. 1 6
      controller/LFDB.hpp

+ 99 - 225
controller/LFDB.cpp

@@ -41,30 +41,77 @@ LFDB::LFDB(EmbeddedNetworkController *const nc,const Identity &myId,const char *
 	DB(nc,myId,path),
 	_nc(nc),
 	_myId(myId),
-	_lfOwnerPrivate(lfOwnerPrivate),
-	_lfOwnerPublic(lfOwnerPublic),
-	_lfNodeHost(lfNodeHost),
-	_lfNodePort(lfNodePort),
+	_lfOwnerPrivate((lfOwnerPrivate) ? lfOwnerPrivate : ""),
+	_lfOwnerPublic((lfOwnerPublic) ? lfOwnerPublic : ""),
+	_lfNodeHost((lfNodeHost) ? lfNodeHost : "127.0.0.1"),
+	_lfNodePort(((lfNodePort > 0)&&(lfNodePort < 65536)) ? lfNodePort : 9980),
 	_running(true),
 	_ready(false),
 	_storeOnlineState(storeOnlineState)
 {
 	_syncThread = std::thread([this]() {
 		char controllerAddress[24];
+		const uint64_t controllerAddressInt = _myId.address().toInt();
 		_myId.address().toString(controllerAddress);
+		std::string networksSelectorName("com.zerotier.controller.lfdb:"); networksSelectorName.append(controllerAddress); networksSelectorName.append("/network");
+		std::string membersSelectorName("com.zerotier.controller.lfdb:"); membersSelectorName.append(controllerAddress); membersSelectorName.append("/network/member");
 
 		httplib::Client htcli(_lfNodeHost.c_str(),_lfNodePort,600);
 		std::ostringstream query;
 		int64_t timeRangeStart = 0;
 		while (_running) {
+			{
+				std::lock_guard<std::mutex> sl(_state_l);
+				for(auto ns=_state.begin();ns!=_state.end();++ns) {
+					if (ns->second.dirty) {
+						nlohmann::json network;
+						if (get(ns->first,network)) {
+							nlohmann::json newrec;
+							newrec["Selectors"] = {{ { { "Name",networksSelectorName },{ "Ordinal",ns->first } } }};
+							newrec["Value"] = network.dump();
+							newrec["OwnerPrivate"] = _lfOwnerPrivate;
+							newrec["MaskingKey"] = controllerAddress;
+							auto resp = htcli.Post("/make",newrec.dump(),"application/json");
+							if (resp->status == 200) {
+								ns->second.dirty = false;
+							} else {
+								fprintf(stderr,"ERROR: LFDB: %d from node (create/update network): %s" ZT_EOL_S,resp->status,resp->body.c_str());
+							}
+						}
+					}
+
+					for(auto ms=ns->second.members.begin();ms!=ns->second.members.end();++ms) {
+						if ((_storeOnlineState)&&(ms->second.lastOnlineDirty)) {
+						}
+
+						if (ms->second.dirty) {
+							nlohmann::json network,member;
+							if (get(ns->first,network,ms->first,member)) {
+								nlohmann::json newrec;
+								newrec["Selectors"] = {{ { { "Name",networksSelectorName },{ "Ordinal",ns->first } },{ { "Name",membersSelectorName },{ "Ordinal",ms->first } } }};
+								newrec["Value"] = member.dump();
+								newrec["OwnerPrivate"] = _lfOwnerPrivate;
+								newrec["MaskingKey"] = controllerAddress;
+								auto resp = htcli.Post("/make",newrec.dump(),"application/json");
+								if (resp->status == 200) {
+									ms->second.dirty = false;
+								} else {
+									fprintf(stderr,"ERROR: LFDB: %d from node (create/update member): %s" ZT_EOL_S,resp->status,resp->body.c_str());
+								}
+							}
+						}
+					}
+				}
+			}
+
 			query.clear();
 			query
 				<< '{'
 					<< "\"Ranges\":[{"
-						<< "\"Name\": \"com.zerotier.controller.lfdb:" << controllerAddress << "/network\","
-						<< "\"Range\": [ 0,18446744073709551615 ]"
+						<< "\"Name\":\"" << networksSelectorName << "\","
+						<< "\"Range\":[0,18446744073709551615]"
 					<< "}],"
-					<< "\"TimeRange\": [ " << timeRangeStart << ",18446744073709551615 ],"
+					<< "\"TimeRange\":[" << timeRangeStart << ",18446744073709551615],"
 					<< "\"MaskingKey\":\"" << controllerAddress << "\","
 					<< "\"Owners\":[\"" << _lfOwnerPublic << "\"]"
 				<< '}';
@@ -79,11 +126,19 @@ LFDB::LFDB(EmbeddedNetworkController *const nc,const Identity &myId,const char *
 							if (result.is_object()) {
 								nlohmann::json &record = result["Record"];
 								if (record.is_object()) {
-									int64_t ts = record["Timestamp"];
-									std::string value = result["Value"];
-									nlohmann::json network(OSUtils::jsonParse(value));
+									const std::string recordValue = result["Value"];
+									nlohmann::json network(OSUtils::jsonParse(recordValue));
 									if (network.is_object()) {
-										std::string idstr = network["id"];
+										const std::string idstr = network["id"];
+										const uint64_t id = Utils::hexStrToU64(idstr.c_str());
+										if ((id >> 24) == controllerAddressInt) {
+											std::lock_guard<std::mutex> sl(_state_l);
+											_NetworkState &ns = _state[id];
+											if (!ns.dirty) {
+												nlohmann::json nullJson;
+												_networkChanged(nullJson,network,false);
+											}
+										}
 									}
 								}
 							}
@@ -98,13 +153,13 @@ LFDB::LFDB(EmbeddedNetworkController *const nc,const Identity &myId,const char *
 			query
 				<< '{'
 					<< "\"Ranges\":[{"
-						<< "\"Name\": \"com.zerotier.controller.lfdb:" << controllerAddress << "/network\","
-						<< "\"Range\": [ 0,18446744073709551615 ]"
+						<< "\"Name\":\"" << networksSelectorName << "\","
+						<< "\"Range\":[0,18446744073709551615]"
 					<< "},{"
-						<< "\"Name\": \"com.zerotier.controller.lfdb:" << controllerAddress << "/network/member\","
-						<< "\"Range\": [ 0,18446744073709551615 ]"
+						<< "\"Name\":\"" << membersSelectorName << "\","
+						<< "\"Range\":[0,18446744073709551615]"
 					<< "}],"
-					<< "\"TimeRange\": [ " << timeRangeStart << ",18446744073709551615 ],"
+					<< "\"TimeRange\":[" << timeRangeStart << ",18446744073709551615],"
 					<< "\"MaskingKey\":\"" << controllerAddress << "\","
 					<< "\"Owners\":[\"" << _lfOwnerPublic << "\"]"
 				<< '}';
@@ -119,12 +174,24 @@ LFDB::LFDB(EmbeddedNetworkController *const nc,const Identity &myId,const char *
 							if (result.is_object()) {
 								nlohmann::json &record = result["Record"];
 								if (record.is_object()) {
-									int64_t ts = record["Timestamp"];
-									std::string value = result["Value"];
-									nlohmann::json member(OSUtils::jsonParse(value));
+									const std::string recordValue = result["Value"];
+									nlohmann::json member(OSUtils::jsonParse(recordValue));
 									if (member.is_object()) {
-										std::string nwidstr = member["nwid"];
-										std::string idstr = member["id"];
+										const std::string nwidstr = member["nwid"];
+										const std::string idstr = member["id"];
+										const uint64_t nwid = Utils::hexStrToU64(nwidstr.c_str());
+										const uint64_t id = Utils::hexStrToU64(idstr.c_str());
+										if ((id)&&((nwid >> 24) == controllerAddressInt)) {
+											std::lock_guard<std::mutex> sl(_state_l);
+											auto ns = _state.find(nwid);
+											if (ns != _state.end()) {
+												_MemberState &ms = ns->second.members[id];
+												if (!ms.dirty) {
+													nlohmann::json nullJson;
+													_memberChanged(nullJson,member,false);
+												}
+											}
+										}
 									}
 								}
 							}
@@ -138,8 +205,7 @@ LFDB::LFDB(EmbeddedNetworkController *const nc,const Identity &myId,const char *
 			timeRangeStart = time(nullptr) - 120; // start next query 2m before now to avoid losing updates
 			_ready = true;
 
-			// Delay 2s between queries, checking running flag every 100ms
-			for(int k=0;k<20;++k) {
+			for(int k=0;k<20;++k) { // 2s delay between queries for remotely modified networks or members
 				if (!_running)
 					return;
 				std::this_thread::sleep_for(std::chrono::milliseconds(100));
@@ -184,6 +250,11 @@ void LFDB::save(nlohmann::json *orig,nlohmann::json &record)
 			nlohmann::json old;
 			get(nwid,old);
 			if ((!old.is_object())||(old != record)) {
+				_networkChanged(old,record,true);
+				{
+					std::lock_guard<std::mutex> l(_state_l);
+					_state[nwid].dirty = true;
+				}
 			}
 		}
 	} else if (objtype == "member") {
@@ -193,6 +264,11 @@ void LFDB::save(nlohmann::json *orig,nlohmann::json &record)
 			nlohmann::json network,old;
 			get(nwid,network,id,old);
 			if ((!old.is_object())||(old != record)) {
+				_memberChanged(old,record,true);
+				{
+					std::lock_guard<std::mutex> l(_state_l);
+					_state[nwid].members[id].dirty = true;
+				}
 			}
 		}
 	}
@@ -223,206 +299,4 @@ void LFDB::nodeIsOnline(const uint64_t networkId,const uint64_t memberId,const I
 	}
 }
 
-#if 0
-FileDB::FileDB(EmbeddedNetworkController *const nc,const Identity &myId,const char *path) :
-	DB(nc,myId,path),
-	_networksPath(_path + ZT_PATH_SEPARATOR_S + "network"),
-	_tracePath(_path + ZT_PATH_SEPARATOR_S + "trace"),
-	_onlineChanged(false),
-	_running(true)
-{
-	OSUtils::mkdir(_path.c_str());
-	OSUtils::lockDownFile(_path.c_str(),true);
-	OSUtils::mkdir(_networksPath.c_str());
-	OSUtils::mkdir(_tracePath.c_str());
-
-	std::vector<std::string> networks(OSUtils::listDirectory(_networksPath.c_str(),false));
-	std::string buf;
-	for(auto n=networks.begin();n!=networks.end();++n) {
-		buf.clear();
-		if ((n->length() == 21)&&(OSUtils::readFile((_networksPath + ZT_PATH_SEPARATOR_S + *n).c_str(),buf))) {
-			try {
-				nlohmann::json network(OSUtils::jsonParse(buf));
-				const std::string nwids = network["id"];
-				if (nwids.length() == 16) {
-					nlohmann::json nullJson;
-					_networkChanged(nullJson,network,false);
-					std::string membersPath(_networksPath + ZT_PATH_SEPARATOR_S + nwids + ZT_PATH_SEPARATOR_S "member");
-					std::vector<std::string> members(OSUtils::listDirectory(membersPath.c_str(),false));
-					for(auto m=members.begin();m!=members.end();++m) {
-						buf.clear();
-						if ((m->length() == 15)&&(OSUtils::readFile((membersPath + ZT_PATH_SEPARATOR_S + *m).c_str(),buf))) {
-							try {
-								nlohmann::json member(OSUtils::jsonParse(buf));
-								const std::string addrs = member["id"];
-								if (addrs.length() == 10) {
-									nlohmann::json nullJson2;
-									_memberChanged(nullJson2,member,false);
-								}
-							} catch ( ... ) {}
-						}
-					}
-				}
-			} catch ( ... ) {}
-		}
-	}
-
-	_onlineUpdateThread = std::thread([this]() {
-		unsigned int cnt = 0;
-		while (this->_running) {
-			std::this_thread::sleep_for(std::chrono::microseconds(100));
-			if ((++cnt % 20) == 0) { // 5 seconds
-				std::lock_guard<std::mutex> l(this->_online_l);
-				if (!this->_running) return;
-				if (this->_onlineChanged) {
-					char p[4096],atmp[64];
-					for(auto nw=this->_online.begin();nw!=this->_online.end();++nw) {
-						OSUtils::ztsnprintf(p,sizeof(p),"%s" ZT_PATH_SEPARATOR_S "%.16llx-online.json",_networksPath.c_str(),(unsigned long long)nw->first);
-						FILE *f = fopen(p,"wb");
-						if (f) {
-							fprintf(f,"{");
-							const char *memberPrefix = "";
-							for(auto m=nw->second.begin();m!=nw->second.end();++m) {
-								fprintf(f,"%s\"%.10llx\":{" ZT_EOL_S,memberPrefix,(unsigned long long)m->first);
-								memberPrefix = ",";
-								InetAddress lastAddr;
-								const char *timestampPrefix = " ";
-								int cnt = 0;
-								for(auto ts=m->second.rbegin();ts!=m->second.rend();) {
-									if (cnt < 25) {
-										if (lastAddr != ts->second) {
-											lastAddr = ts->second;
-											fprintf(f,"%s\"%lld\":\"%s\"" ZT_EOL_S,timestampPrefix,(long long)ts->first,ts->second.toString(atmp));
-											timestampPrefix = ",";
-											++cnt;
-											++ts;
-										} else {
-											ts = std::map<int64_t,InetAddress>::reverse_iterator(m->second.erase(std::next(ts).base()));
-										}
-									} else {
-										ts = std::map<int64_t,InetAddress>::reverse_iterator(m->second.erase(std::next(ts).base()));
-									}
-								}
-								fprintf(f,"}");
-							}
-							fprintf(f,"}" ZT_EOL_S);
-							fclose(f);
-						}
-					}
-					this->_onlineChanged = false;
-				}
-			}
-		}
-	});
-}
-
-FileDB::~FileDB()
-{
-	try {
-		_online_l.lock();
-		_running = false;
-		_online_l.unlock();
-		_onlineUpdateThread.join();
-	} catch ( ... ) {}
-}
-
-bool FileDB::waitForReady() { return true; }
-bool FileDB::isReady() { return true; }
-
-void FileDB::save(nlohmann::json *orig,nlohmann::json &record)
-{
-	char p1[4096],p2[4096],pb[4096];
-	try {
-		if (orig) {
-			if (*orig != record) {
-				record["revision"] = OSUtils::jsonInt(record["revision"],0ULL) + 1;
-			}
-		} else {
-			record["revision"] = 1;
-		}
-
-		const std::string objtype = record["objtype"];
-		if (objtype == "network") {
-			const uint64_t nwid = OSUtils::jsonIntHex(record["id"],0ULL);
-			if (nwid) {
-				nlohmann::json old;
-				get(nwid,old);
-				if ((!old.is_object())||(old != record)) {
-					OSUtils::ztsnprintf(p1,sizeof(p1),"%s" ZT_PATH_SEPARATOR_S "%.16llx.json",_networksPath.c_str(),nwid);
-					if (!OSUtils::writeFile(p1,OSUtils::jsonDump(record,-1)))
-						fprintf(stderr,"WARNING: controller unable to write to path: %s" ZT_EOL_S,p1);
-					_networkChanged(old,record,true);
-				}
-			}
-		} else if (objtype == "member") {
-			const uint64_t id = OSUtils::jsonIntHex(record["id"],0ULL);
-			const uint64_t nwid = OSUtils::jsonIntHex(record["nwid"],0ULL);
-			if ((id)&&(nwid)) {
-				nlohmann::json network,old;
-				get(nwid,network,id,old);
-				if ((!old.is_object())||(old != record)) {
-					OSUtils::ztsnprintf(pb,sizeof(pb),"%s" ZT_PATH_SEPARATOR_S "%.16llx" ZT_PATH_SEPARATOR_S "member",_networksPath.c_str(),(unsigned long long)nwid);
-					OSUtils::ztsnprintf(p1,sizeof(p1),"%s" ZT_PATH_SEPARATOR_S "%.10llx.json",pb,(unsigned long long)id);
-					if (!OSUtils::writeFile(p1,OSUtils::jsonDump(record,-1))) {
-						OSUtils::ztsnprintf(p2,sizeof(p2),"%s" ZT_PATH_SEPARATOR_S "%.16llx",_networksPath.c_str(),(unsigned long long)nwid);
-						OSUtils::mkdir(p2);
-						OSUtils::mkdir(pb);
-						if (!OSUtils::writeFile(p1,OSUtils::jsonDump(record,-1)))
-							fprintf(stderr,"WARNING: controller unable to write to path: %s" ZT_EOL_S,p1);
-					}
-					_memberChanged(old,record,true);
-				}
-			}
-		} else if (objtype == "trace") {
-			const std::string id = record["id"];
-			if (id.length() > 0) {
-				OSUtils::ztsnprintf(p1,sizeof(p1),"%s" ZT_PATH_SEPARATOR_S "%s.json",_tracePath.c_str(),id.c_str());
-				OSUtils::writeFile(p1,OSUtils::jsonDump(record,-1));
-			}
-		}
-	} catch ( ... ) {} // drop invalid records missing fields
-}
-
-void FileDB::eraseNetwork(const uint64_t networkId)
-{
-	nlohmann::json network,nullJson;
-	get(networkId,network);
-	char p[16384];
-	OSUtils::ztsnprintf(p,sizeof(p),"%s" ZT_PATH_SEPARATOR_S "%.16llx.json",_networksPath.c_str(),networkId);
-	OSUtils::rm(p);
-	OSUtils::ztsnprintf(p,sizeof(p),"%s" ZT_PATH_SEPARATOR_S "%.16llx-online.json",_networksPath.c_str(),networkId);
-	OSUtils::rm(p);
-	OSUtils::ztsnprintf(p,sizeof(p),"%s" ZT_PATH_SEPARATOR_S "%.16llx" ZT_PATH_SEPARATOR_S "member",_networksPath.c_str(),(unsigned long long)networkId);
-	OSUtils::rmDashRf(p);
-	_networkChanged(network,nullJson,true);
-	std::lock_guard<std::mutex> l(this->_online_l);
-	this->_online.erase(networkId);
-	this->_onlineChanged = true;
-}
-
-void FileDB::eraseMember(const uint64_t networkId,const uint64_t memberId)
-{
-	nlohmann::json network,member,nullJson;
-	get(networkId,network);
-	get(memberId,member);
-	char p[4096];
-	OSUtils::ztsnprintf(p,sizeof(p),"%s" ZT_PATH_SEPARATOR_S "%.16llx" ZT_PATH_SEPARATOR_S "member" ZT_PATH_SEPARATOR_S "%.10llx.json",_networksPath.c_str(),networkId,memberId);
-	OSUtils::rm(p);
-	_memberChanged(member,nullJson,true);
-	std::lock_guard<std::mutex> l(this->_online_l);
-	this->_online[networkId].erase(memberId);
-	this->_onlineChanged = true;
-}
-
-void FileDB::nodeIsOnline(const uint64_t networkId,const uint64_t memberId,const InetAddress &physicalAddress)
-{
-	char mid[32],atmp[64];
-	OSUtils::ztsnprintf(mid,sizeof(mid),"%.10llx",(unsigned long long)memberId);
-	physicalAddress.toString(atmp);
-	std::lock_guard<std::mutex> l(this->_online_l);
-	this->_online[networkId][memberId][OSUtils::now()] = physicalAddress;
-	this->_onlineChanged = true;
-}
-#endif
-
 } // namespace ZeroTier

+ 1 - 6
controller/LFDB.hpp

@@ -66,8 +66,7 @@ protected:
 	EmbeddedNetworkController *const _nc;
 	const Identity _myId;
 
-	std::string _lfOwnerPrivate;
-	std::string _lfOwnerPublic;
+	std::string _lfOwnerPrivate,_lfOwnerPublic;
 	std::string _lfNodeHost;
 	int _lfNodePort;
 
@@ -76,12 +75,10 @@ protected:
 		_MemberState() :
 			lastOnlineAddress(),
 			lastOnlineTime(0),
-			recordTimestamp(0),
 			dirty(false),
 			lastOnlineDirty(false) {}
 		InetAddress lastOnlineAddress;
 		int64_t lastOnlineTime;
-		int64_t recordTimestamp;
 		bool dirty;
 		bool lastOnlineDirty;
 	};
@@ -89,10 +86,8 @@ protected:
 	{
 		_NetworkState() :
 			members(),
-			recordTimestamp(0),
 			dirty(false) {}
 		std::unordered_map<uint64_t,_MemberState> members;
-		int64_t recordTimestamp;
 		bool dirty;
 	};
 	std::unordered_map<uint64_t,_NetworkState> _state;