Bläddra i källkod

Controller optimizations -- make locking more fine-grained, use true hardware concurrency, etc.

Adam Ierymenko 8 år sedan
förälder
incheckning
cafbe44dde

+ 71 - 122
controller/EmbeddedNetworkController.cpp

@@ -32,6 +32,7 @@
 #include <stdexcept>
 #include <set>
 #include <map>
+#include <thread>
 
 #include "../include/ZeroTierOne.h"
 #include "../node/Constants.hpp"
@@ -430,7 +431,6 @@ static bool _parseRule(json &r,ZT_VirtualNetworkRule &rule)
 
 EmbeddedNetworkController::EmbeddedNetworkController(Node *node,const char *dbPath) :
 	_startTime(OSUtils::now()),
-	_threadsStarted(false),
 	_db(dbPath),
 	_node(node)
 {
@@ -439,11 +439,11 @@ EmbeddedNetworkController::EmbeddedNetworkController(Node *node,const char *dbPa
 EmbeddedNetworkController::~EmbeddedNetworkController()
 {
 	Mutex::Lock _l(_threads_m);
-	if (_threadsStarted) {
-		for(int i=0;i<(ZT_EMBEDDEDNETWORKCONTROLLER_BACKGROUND_THREAD_COUNT*2);++i)
+	if (_threads.size() > 0) {
+		for(unsigned long i=0;i<(((unsigned long)_threads.size())*2);++i)
 			_queue.post((_RQEntry *)0);
-		for(int i=0;i<ZT_EMBEDDEDNETWORKCONTROLLER_BACKGROUND_THREAD_COUNT;++i)
-			Thread::join(_threads[i]);
+		for(std::vector<Thread>::iterator i(_threads.begin());i!=_threads.end();++i)
+			Thread::join(*i);
 	}
 }
 
@@ -465,11 +465,13 @@ void EmbeddedNetworkController::request(
 
 	{
 		Mutex::Lock _l(_threads_m);
-		if (!_threadsStarted) {
-			for(int i=0;i<ZT_EMBEDDEDNETWORKCONTROLLER_BACKGROUND_THREAD_COUNT;++i)
-				_threads[i] = Thread::start(this);
+		if (_threads.size() == 0) {
+			long hwc = (long)std::thread::hardware_concurrency();
+			if (hwc <= 0)
+				hwc = 1;
+			for(long i=0;i<hwc;++i)
+				_threads.push_back(Thread::start(this));
 		}
-		_threadsStarted = true;
 	}
 
 	_RQEntry *qe = new _RQEntry;
@@ -496,11 +498,7 @@ unsigned int EmbeddedNetworkController::handleControlPlaneHttpGET(
 			char nwids[24];
 			Utils::snprintf(nwids,sizeof(nwids),"%.16llx",(unsigned long long)nwid);
 
-			json network;
-			{
-				Mutex::Lock _l(_db_m);
-				network = _db.get("network",nwids);
-			}
+			json network(_db.get("network",nwids));
 			if (!network.size())
 				return 404;
 
@@ -510,24 +508,13 @@ unsigned int EmbeddedNetworkController::handleControlPlaneHttpGET(
 
 					if (path.size() >= 4) {
 						const uint64_t address = Utils::hexStrToU64(path[3].c_str());
-
-						json member;
-						{
-							Mutex::Lock _l(_db_m);
-							member = _db.get("network",nwids,"member",Address(address).toString());
-						}
+						json member(_db.get("network",nwids,"member",Address(address).toString()));
 						if (!member.size())
 							return 404;
-
 						_addMemberNonPersistedFields(member,OSUtils::now());
 						responseBody = OSUtils::jsonDump(member);
 						responseContentType = "application/json";
-
-						return 200;
 					} else {
-
-						Mutex::Lock _l(_db_m);
-
 						responseBody = "{";
 						_db.filter((std::string("network/") + nwids + "/member/"),[&responseBody](const std::string &n,const json &member) {
 							if ((member.is_object())&&(member.size() > 0)) {
@@ -540,9 +527,8 @@ unsigned int EmbeddedNetworkController::handleControlPlaneHttpGET(
 						});
 						responseBody.push_back('}');
 						responseContentType = "application/json";
-
-						return 200;
 					}
+					return 200;
 
 				} // else 404
 
@@ -560,14 +546,11 @@ unsigned int EmbeddedNetworkController::handleControlPlaneHttpGET(
 		} else if (path.size() == 1) {
 
 			std::set<std::string> networkIds;
-			{
-				Mutex::Lock _l(_db_m);
-				_db.filter("network/",[&networkIds](const std::string &n,const json &obj) {
-					if (n.length() == (16 + 8))
-						networkIds.insert(n.substr(8));
-					return true; // do not delete
-				});
-			}
+			_db.filter("network/",[&networkIds](const std::string &n,const json &obj) {
+				if (n.length() == (16 + 8))
+					networkIds.insert(n.substr(8));
+				return true; // do not delete
+			});
 
 			responseBody.push_back('[');
 			for(std::set<std::string>::iterator i(networkIds.begin());i!=networkIds.end();++i) {
@@ -634,11 +617,7 @@ unsigned int EmbeddedNetworkController::handleControlPlaneHttpPOST(
 					char addrs[24];
 					Utils::snprintf(addrs,sizeof(addrs),"%.10llx",(unsigned long long)address);
 
-					json member;
-					{
-						Mutex::Lock _l(_db_m);
-						member = _db.get("network",nwids,"member",Address(address).toString());
-					}
+					json member(_db.get("network",nwids,"member",Address(address).toString()));
 					json origMember(member); // for detecting changes
 					_initMember(member);
 
@@ -735,10 +714,7 @@ unsigned int EmbeddedNetworkController::handleControlPlaneHttpPOST(
 						member["lastModified"] = now;
 						json &revj = member["revision"];
 						member["revision"] = (revj.is_number() ? ((uint64_t)revj + 1ULL) : 1ULL);
-						{
-							Mutex::Lock _l(_db_m);
-							_db.put("network",nwids,"member",Address(address).toString(),member);
-						}
+						_db.put("network",nwids,"member",Address(address).toString(),member);
 						_pushMemberUpdate(now,nwid,member);
 					}
 
@@ -806,31 +782,26 @@ unsigned int EmbeddedNetworkController::handleControlPlaneHttpPOST(
 			} else {
 				// POST to network ID
 
-				json network;
-				{
-					Mutex::Lock _l(_db_m);
-
-					// Magic ID ending with ______ picks a random unused network ID
-					if (path[1].substr(10) == "______") {
-						nwid = 0;
-						uint64_t nwidPrefix = (Utils::hexStrToU64(path[1].substr(0,10).c_str()) << 24) & 0xffffffffff000000ULL;
-						uint64_t nwidPostfix = 0;
-						for(unsigned long k=0;k<100000;++k) { // sanity limit on trials
-							Utils::getSecureRandom(&nwidPostfix,sizeof(nwidPostfix));
-							uint64_t tryNwid = nwidPrefix | (nwidPostfix & 0xffffffULL);
-							if ((tryNwid & 0xffffffULL) == 0ULL) tryNwid |= 1ULL;
-							Utils::snprintf(nwids,sizeof(nwids),"%.16llx",(unsigned long long)tryNwid);
-							if (_db.get("network",nwids).size() <= 0) {
-								nwid = tryNwid;
-								break;
-							}
+				// Magic ID ending with ______ picks a random unused network ID
+				if (path[1].substr(10) == "______") {
+					nwid = 0;
+					uint64_t nwidPrefix = (Utils::hexStrToU64(path[1].substr(0,10).c_str()) << 24) & 0xffffffffff000000ULL;
+					uint64_t nwidPostfix = 0;
+					for(unsigned long k=0;k<100000;++k) { // sanity limit on trials
+						Utils::getSecureRandom(&nwidPostfix,sizeof(nwidPostfix));
+						uint64_t tryNwid = nwidPrefix | (nwidPostfix & 0xffffffULL);
+						if ((tryNwid & 0xffffffULL) == 0ULL) tryNwid |= 1ULL;
+						Utils::snprintf(nwids,sizeof(nwids),"%.16llx",(unsigned long long)tryNwid);
+						if (_db.get("network",nwids).size() <= 0) {
+							nwid = tryNwid;
+							break;
 						}
-						if (!nwid)
-							return 503;
 					}
-
-					network = _db.get("network",nwids);
+					if (!nwid)
+						return 503;
 				}
+				json network(_db.get("network",nwids));
+
 				json origNetwork(network); // for detecting changes
 				_initNetwork(network);
 
@@ -1044,10 +1015,7 @@ unsigned int EmbeddedNetworkController::handleControlPlaneHttpPOST(
 					json &revj = network["revision"];
 					network["revision"] = (revj.is_number() ? ((uint64_t)revj + 1ULL) : 1ULL);
 					network["lastModified"] = now;
-					{
-						Mutex::Lock _l(_db_m);
-						_db.put("network",nwids,network);
-					}
+					_db.put("network",nwids,network);
 
 					// Send an update to all members of the network
 					_db.filter((std::string("network/") + nwids + "/member/"),[this,&now,&nwid](const std::string &n,const json &obj) {
@@ -1101,11 +1069,7 @@ unsigned int EmbeddedNetworkController::handleControlPlaneHttpDELETE(
 
 			char nwids[24];
 			Utils::snprintf(nwids,sizeof(nwids),"%.16llx",nwid);
-			json network;
-			{
-				Mutex::Lock _l(_db_m);
-				network = _db.get("network",nwids);
-			}
+			json network(_db.get("network",nwids));
 			if (!network.size())
 				return 404;
 
@@ -1113,8 +1077,6 @@ unsigned int EmbeddedNetworkController::handleControlPlaneHttpDELETE(
 				if ((path.size() == 4)&&(path[2] == "member")&&(path[3].length() == 10)) {
 					const uint64_t address = Utils::hexStrToU64(path[3].c_str());
 
-					Mutex::Lock _l(_db_m);
-
 					json member = _db.get("network",nwids,"member",Address(address).toString());
 					_db.erase("network",nwids,"member",Address(address).toString());
 
@@ -1125,8 +1087,6 @@ unsigned int EmbeddedNetworkController::handleControlPlaneHttpDELETE(
 					return 200;
 				}
 			} else {
-				Mutex::Lock _l(_db_m);
-
 				std::string pfx("network/");
 				pfx.append(nwids);
 				_db.filter(pfx,[](const std::string &n,const json &obj) {
@@ -1226,7 +1186,6 @@ void EmbeddedNetworkController::_circuitTestCallback(ZT_Node *node,ZT_CircuitTes
 		reinterpret_cast<const InetAddress *>(&(report->receivedFromRemoteAddress))->toString().c_str(),
 		((double)report->receivedFromLinkQuality / (double)ZT_PATH_LINK_QUALITY_MAX));
 
-	Mutex::Lock _l(self->_db_m);
 	self->_db.writeRaw(id,std::string(tmp));
 }
 
@@ -1252,13 +1211,8 @@ void EmbeddedNetworkController::_request(
 
 	char nwids[24];
 	Utils::snprintf(nwids,sizeof(nwids),"%.16llx",nwid);
-	json network;
-	json member;
-	{
-		Mutex::Lock _l(_db_m);
-		network = _db.get("network",nwids);
-		member = _db.get("network",nwids,"member",identity.address().toString());
-	}
+	json network(_db.get("network",nwids));
+	json member(_db.get("network",nwids,"member",identity.address().toString()));
 
 	if (!network.size()) {
 		_sender->ncSendError(nwid,requestPacketId,identity.address(),NetworkController::NC_ERROR_OBJECT_NOT_FOUND);
@@ -1403,7 +1357,6 @@ void EmbeddedNetworkController::_request(
 	if (!authorizedBy) {
 		if (origMember != member) {
 			member["lastModified"] = now;
-			Mutex::Lock _l(_db_m);
 			_db.put("network",nwids,"member",identity.address().toString(),member);
 		}
 		_sender->ncSendError(nwid,requestPacketId,identity.address(),NetworkController::NC_ERROR_ACCESS_DENIED);
@@ -1759,7 +1712,6 @@ void EmbeddedNetworkController::_request(
 
 	if (member != origMember) {
 		member["lastModified"] = now;
-		Mutex::Lock _l(_db_m);
 		_db.put("network",nwids,"member",identity.address().toString(),member);
 	}
 
@@ -1780,45 +1732,42 @@ void EmbeddedNetworkController::_getNetworkMemberInfo(uint64_t now,uint64_t nwid
 		}
 	}
 
-	{
-		Mutex::Lock _l(_db_m);
-		_db.filter(pfx,[&nmi,&now](const std::string &n,const json &member) {
-			try {
-				if (OSUtils::jsonBool(member["authorized"],false)) {
-					++nmi.authorizedMemberCount;
-
-					if (member.count("recentLog")) {
-						const json &mlog = member["recentLog"];
-						if ((mlog.is_array())&&(mlog.size() > 0)) {
-							const json &mlog1 = mlog[0];
-							if (mlog1.is_object()) {
-								if ((now - OSUtils::jsonInt(mlog1["ts"],0ULL)) < ZT_NETCONF_NODE_ACTIVE_THRESHOLD)
-									++nmi.activeMemberCount;
-							}
+	_db.filter(pfx,[&nmi,&now](const std::string &n,const json &member) {
+		try {
+			if (OSUtils::jsonBool(member["authorized"],false)) {
+				++nmi.authorizedMemberCount;
+
+				if (member.count("recentLog")) {
+					const json &mlog = member["recentLog"];
+					if ((mlog.is_array())&&(mlog.size() > 0)) {
+						const json &mlog1 = mlog[0];
+						if (mlog1.is_object()) {
+							if ((now - OSUtils::jsonInt(mlog1["ts"],0ULL)) < ZT_NETCONF_NODE_ACTIVE_THRESHOLD)
+								++nmi.activeMemberCount;
 						}
 					}
+				}
 
-					if (OSUtils::jsonBool(member["activeBridge"],false)) {
-						nmi.activeBridges.insert(Address(Utils::hexStrToU64(OSUtils::jsonString(member["id"],"0000000000").c_str())));
-					}
+				if (OSUtils::jsonBool(member["activeBridge"],false)) {
+					nmi.activeBridges.insert(Address(Utils::hexStrToU64(OSUtils::jsonString(member["id"],"0000000000").c_str())));
+				}
 
-					if (member.count("ipAssignments")) {
-						const json &mips = member["ipAssignments"];
-						if (mips.is_array()) {
-							for(unsigned long i=0;i<mips.size();++i) {
-								InetAddress mip(OSUtils::jsonString(mips[i],""));
-								if ((mip.ss_family == AF_INET)||(mip.ss_family == AF_INET6))
-									nmi.allocatedIps.insert(mip);
-							}
+				if (member.count("ipAssignments")) {
+					const json &mips = member["ipAssignments"];
+					if (mips.is_array()) {
+						for(unsigned long i=0;i<mips.size();++i) {
+							InetAddress mip(OSUtils::jsonString(mips[i],""));
+							if ((mip.ss_family == AF_INET)||(mip.ss_family == AF_INET6))
+								nmi.allocatedIps.insert(mip);
 						}
 					}
-				} else {
-					nmi.mostRecentDeauthTime = std::max(nmi.mostRecentDeauthTime,OSUtils::jsonInt(member["lastDeauthorizedTime"],0ULL));
 				}
-			} catch ( ... ) {}
-			return true;
-		});
-	}
+			} else {
+				nmi.mostRecentDeauthTime = std::max(nmi.mostRecentDeauthTime,OSUtils::jsonInt(member["lastDeauthorizedTime"],0ULL));
+			}
+		} catch ( ... ) {}
+		return true;
+	});
 	nmi.nmiTimestamp = now;
 
 	{

+ 1 - 6
controller/EmbeddedNetworkController.hpp

@@ -43,9 +43,6 @@
 
 #include "JSONDB.hpp"
 
-// Number of background threads to start -- not actually started until needed
-#define ZT_EMBEDDEDNETWORKCONTROLLER_BACKGROUND_THREAD_COUNT 4
-
 // TTL for circuit tests
 #define ZT_EMBEDDEDNETWORKCONTROLLER_CIRCUIT_TEST_EXPIRATION 120000
 
@@ -182,15 +179,13 @@ private:
 	const uint64_t _startTime;
 
 	BlockingQueue<_RQEntry *> _queue;
-	Thread _threads[ZT_EMBEDDEDNETWORKCONTROLLER_BACKGROUND_THREAD_COUNT];
-	bool _threadsStarted;
+	std::vector<Thread> _threads;
 	Mutex _threads_m;
 
 	std::map<uint64_t,_NetworkMemberInfo> _nmiCache;
 	Mutex _nmiCache_m;
 
 	JSONDB _db;
-	Mutex _db_m;
 
 	Node *const _node;
 	std::string _path;

+ 24 - 12
controller/JSONDB.cpp

@@ -78,22 +78,29 @@ bool JSONDB::writeRaw(const std::string &n,const std::string &obj)
 bool JSONDB::put(const std::string &n,const nlohmann::json &obj)
 {
 	const bool r = writeRaw(n,OSUtils::jsonDump(obj));
-	_db[n].obj = obj;
+	{
+		Mutex::Lock _l(_db_m);
+		_db[n].obj = obj;
+	}
 	return r;
 }
 
-const nlohmann::json &JSONDB::get(const std::string &n)
+nlohmann::json JSONDB::get(const std::string &n)
 {
-	while (!_ready) {
-		Thread::sleep(250);
-		_ready = _reload(_basePath,std::string());
-	}
+	{
+		Mutex::Lock _l(_db_m);
 
-	if (!_isValidObjectName(n))
-		return _EMPTY_JSON;
-	std::map<std::string,_E>::iterator e(_db.find(n));
-	if (e != _db.end())
-		return e->second.obj;
+		while (!_ready) {
+			Thread::sleep(250);
+			_ready = _reload(_basePath,std::string());
+		}
+
+		if (!_isValidObjectName(n))
+			return _EMPTY_JSON;
+		std::map<std::string,_E>::iterator e(_db.find(n));
+		if (e != _db.end())
+			return e->second.obj;
+	}
 
 	std::string buf;
 	if (_httpAddr) {
@@ -110,6 +117,7 @@ const nlohmann::json &JSONDB::get(const std::string &n)
 	}
 
 	try {
+		Mutex::Lock _l(_db_m);
 		_E &e2 = _db[n];
 		e2.obj = OSUtils::jsonParse(buf);
 		return e2.obj;
@@ -135,11 +143,15 @@ void JSONDB::erase(const std::string &n)
 		OSUtils::rm(path.c_str());
 	}
 
-	_db.erase(n);
+	{
+		Mutex::Lock _l(_db_m);
+		_db.erase(n);
+	}
 }
 
 bool JSONDB::_reload(const std::string &p,const std::string &b)
 {
+	// Assumes _db_m is locked
 	if (_httpAddr) {
 		std::string body;
 		std::map<std::string,std::string> headers;

+ 8 - 7
controller/JSONDB.hpp

@@ -51,18 +51,16 @@ public:
 	bool writeRaw(const std::string &n,const std::string &obj);
 
 	bool put(const std::string &n,const nlohmann::json &obj);
-
 	inline bool put(const std::string &n1,const std::string &n2,const nlohmann::json &obj) { return this->put((n1 + "/" + n2),obj); }
 	inline bool put(const std::string &n1,const std::string &n2,const std::string &n3,const nlohmann::json &obj) { return this->put((n1 + "/" + n2 + "/" + n3),obj); }
 	inline bool put(const std::string &n1,const std::string &n2,const std::string &n3,const std::string &n4,const nlohmann::json &obj) { return this->put((n1 + "/" + n2 + "/" + n3 + "/" + n4),obj); }
 	inline bool put(const std::string &n1,const std::string &n2,const std::string &n3,const std::string &n4,const std::string &n5,const nlohmann::json &obj) { return this->put((n1 + "/" + n2 + "/" + n3 + "/" + n4 + "/" + n5),obj); }
 
-	const nlohmann::json &get(const std::string &n);
-
-	inline const nlohmann::json &get(const std::string &n1,const std::string &n2) { return this->get((n1 + "/" + n2)); }
-	inline const nlohmann::json &get(const std::string &n1,const std::string &n2,const std::string &n3) { return this->get((n1 + "/" + n2 + "/" + n3)); }
-	inline const nlohmann::json &get(const std::string &n1,const std::string &n2,const std::string &n3,const std::string &n4) { return this->get((n1 + "/" + n2 + "/" + n3 + "/" + n4)); }
-	inline const nlohmann::json &get(const std::string &n1,const std::string &n2,const std::string &n3,const std::string &n4,const std::string &n5) { return this->get((n1 + "/" + n2 + "/" + n3 + "/" + n4 + "/" + n5)); }
+	nlohmann::json get(const std::string &n);
+	inline nlohmann::json get(const std::string &n1,const std::string &n2) { return this->get((n1 + "/" + n2)); }
+	inline nlohmann::json get(const std::string &n1,const std::string &n2,const std::string &n3) { return this->get((n1 + "/" + n2 + "/" + n3)); }
+	inline nlohmann::json get(const std::string &n1,const std::string &n2,const std::string &n3,const std::string &n4) { return this->get((n1 + "/" + n2 + "/" + n3 + "/" + n4)); }
+	inline nlohmann::json get(const std::string &n1,const std::string &n2,const std::string &n3,const std::string &n4,const std::string &n5) { return this->get((n1 + "/" + n2 + "/" + n3 + "/" + n4 + "/" + n5)); }
 
 	void erase(const std::string &n);
 
@@ -74,6 +72,8 @@ public:
 	template<typename F>
 	inline void filter(const std::string &prefix,F func)
 	{
+		Mutex::Lock _l(_db_m);
+
 		while (!_ready) {
 			Thread::sleep(250);
 			_ready = _reload(_basePath,std::string());
@@ -108,6 +108,7 @@ private:
 	InetAddress _httpAddr;
 	std::string _basePath;
 	std::map<std::string,_E> _db;
+	Mutex _db_m;
 	volatile bool _ready;
 };
 

+ 2 - 0
selftest.cpp

@@ -25,6 +25,7 @@
 #include <iostream>
 #include <string>
 #include <vector>
+#include <thread>
 
 #include "node/Constants.hpp"
 #include "node/Hashtable.hpp"
@@ -1114,6 +1115,7 @@ int main(int argc,char **argv)
 	*/
 
 	std::cout << "[info] sizeof(void *) == " << sizeof(void *) << std::endl;
+	std::cout << "[info] hardware concurrency == " << std::thread::hardware_concurrency() << std::endl;
 	std::cout << "[info] sizeof(NetworkConfig) == " << sizeof(ZeroTier::NetworkConfig) << std::endl;
 
 	srand((unsigned int)time(0));