2
0
Эх сурвалжийг харах

JSONDB performance improvements, threading fix.

Adam Ierymenko 8 жил өмнө
parent
commit
4f2a779769

+ 31 - 28
controller/EmbeddedNetworkController.cpp

@@ -431,6 +431,7 @@ static bool _parseRule(json &r,ZT_VirtualNetworkRule &rule)
 
 EmbeddedNetworkController::EmbeddedNetworkController(Node *node,const char *dbPath) :
 	_startTime(OSUtils::now()),
+	_running(true),
 	_db(dbPath),
 	_node(node)
 {
@@ -438,12 +439,19 @@ EmbeddedNetworkController::EmbeddedNetworkController(Node *node,const char *dbPa
 
 EmbeddedNetworkController::~EmbeddedNetworkController()
 {
-	Mutex::Lock _l(_threads_m);
-	if (_threads.size() > 0) {
-		for(unsigned long i=0;i<(((unsigned long)_threads.size())*2);++i)
+	_running = false;
+	std::vector<Thread> t;
+	{
+		Mutex::Lock _l(_threads_m);
+		t = _threads;
+	}
+	if (t.size() > 0) {
+		for(unsigned long i=0,j=(unsigned long)(t.size() * 4);i<j;++i)
 			_queue.post((_RQEntry *)0);
-		for(std::vector<Thread>::iterator i(_threads.begin());i!=_threads.end();++i)
+		/*
+		for(std::vector<Thread>::iterator i(t.begin());i!=t.end();++i)
 			Thread::join(*i);
+		*/
 	}
 }
 
@@ -1111,23 +1119,23 @@ void EmbeddedNetworkController::threadMain()
 	throw()
 {
 	uint64_t lastCircuitTestCheck = 0;
-	for(;;) {
-		_RQEntry *const qe = _queue.get(); // waits on next request
-		if (!qe) break; // enqueue a NULL to terminate threads
+	_RQEntry *qe = (_RQEntry *)0;
+	while ((_running)&&((qe = _queue.get()))) {
 		try {
 			_request(qe->nwid,qe->fromAddr,qe->requestPacketId,qe->identity,qe->metaData);
 		} catch ( ... ) {}
 		delete qe;
-
-		uint64_t now = OSUtils::now();
-		if ((now - lastCircuitTestCheck) > ZT_EMBEDDEDNETWORKCONTROLLER_CIRCUIT_TEST_EXPIRATION) {
-			lastCircuitTestCheck = now;
-			Mutex::Lock _l(_tests_m);
-			for(std::list< ZT_CircuitTest >::iterator i(_tests.begin());i!=_tests.end();) {
-				if ((now - i->timestamp) > ZT_EMBEDDEDNETWORKCONTROLLER_CIRCUIT_TEST_EXPIRATION) {
-					_node->circuitTestEnd(&(*i));
-					_tests.erase(i++);
-				} else ++i;
+		if (_running) {
+			uint64_t now = OSUtils::now();
+			if ((now - lastCircuitTestCheck) > ZT_EMBEDDEDNETWORKCONTROLLER_CIRCUIT_TEST_EXPIRATION) {
+				lastCircuitTestCheck = now;
+				Mutex::Lock _l(_tests_m);
+				for(std::list< ZT_CircuitTest >::iterator i(_tests.begin());i!=_tests.end();) {
+					if ((now - i->timestamp) > ZT_EMBEDDEDNETWORKCONTROLLER_CIRCUIT_TEST_EXPIRATION) {
+						_node->circuitTestEnd(&(*i));
+						_tests.erase(i++);
+					} else ++i;
+				}
 			}
 		}
 	}
@@ -1723,13 +1731,11 @@ void EmbeddedNetworkController::_getNetworkMemberInfo(uint64_t now,uint64_t nwid
 	char pfx[256];
 	Utils::snprintf(pfx,sizeof(pfx),"network/%.16llx/member",nwid);
 
-	{
-		Mutex::Lock _l(_nmiCache_m);
-		std::map<uint64_t,_NetworkMemberInfo>::iterator c(_nmiCache.find(nwid));
-		if ((c != _nmiCache.end())&&((now - c->second.nmiTimestamp) < 1000)) { // a short duration cache but limits CPU use on big networks
-			nmi = c->second;
-			return;
-		}
+	Mutex::Lock _l(_nmiCache_m);
+	std::map<uint64_t,_NetworkMemberInfo>::iterator c(_nmiCache.find(nwid));
+	if ((c != _nmiCache.end())&&((now - c->second.nmiTimestamp) < 1000)) { // a short duration cache but limits CPU use on big networks
+		nmi = c->second;
+		return;
 	}
 
 	_db.filter(pfx,[&nmi,&now](const std::string &n,const json &member) {
@@ -1770,10 +1776,7 @@ void EmbeddedNetworkController::_getNetworkMemberInfo(uint64_t now,uint64_t nwid
 	});
 	nmi.nmiTimestamp = now;
 
-	{
-		Mutex::Lock _l(_nmiCache_m);
-		_nmiCache[nwid] = nmi;
-	}
+	_nmiCache[nwid] = nmi;
 }
 
 void EmbeddedNetworkController::_pushMemberUpdate(uint64_t now,uint64_t nwid,const nlohmann::json &member)

+ 1 - 0
controller/EmbeddedNetworkController.hpp

@@ -178,6 +178,7 @@ private:
 
 	const uint64_t _startTime;
 
+	volatile bool _running;
 	BlockingQueue<_RQEntry *> _queue;
 	std::vector<Thread> _threads;
 	Mutex _threads_m;

+ 31 - 26
controller/JSONDB.cpp

@@ -50,7 +50,7 @@ JSONDB::JSONDB(const std::string &basePath) :
 		OSUtils::mkdir(_basePath.c_str());
 		OSUtils::lockDownFile(_basePath.c_str(),true); // networks might contain auth tokens, etc., so restrict directory permissions
 	}
-	_ready = _reload(_basePath,std::string());
+	_reload(_basePath,std::string());
 }
 
 bool JSONDB::writeRaw(const std::string &n,const std::string &obj)
@@ -87,16 +87,16 @@ bool JSONDB::put(const std::string &n,const nlohmann::json &obj)
 
 nlohmann::json JSONDB::get(const std::string &n)
 {
-	{
-		Mutex::Lock _l(_db_m);
+	while (!_ready) {
+		Thread::sleep(250);
+		_reload(_basePath,std::string());
+	}
 
-		while (!_ready) {
-			Thread::sleep(250);
-			_ready = _reload(_basePath,std::string());
-		}
+	if (!_isValidObjectName(n))
+		return _EMPTY_JSON;
 
-		if (!_isValidObjectName(n))
-			return _EMPTY_JSON;
+	{
+		Mutex::Lock _l(_db_m);
 		std::map<std::string,_E>::iterator e(_db.find(n));
 		if (e != _db.end())
 			return e->second.obj;
@@ -116,14 +116,16 @@ nlohmann::json JSONDB::get(const std::string &n)
 			return _EMPTY_JSON;
 	}
 
-	try {
+	{
 		Mutex::Lock _l(_db_m);
-		_E &e2 = _db[n];
-		e2.obj = OSUtils::jsonParse(buf);
-		return e2.obj;
-	} catch ( ... ) {
-		_db.erase(n);
-		return _EMPTY_JSON;
+		try {
+			_E &e2 = _db[n];
+			e2.obj = OSUtils::jsonParse(buf);
+			return e2.obj;
+		} catch ( ... ) {
+			_db.erase(n);
+			return _EMPTY_JSON;
+		}
 	}
 }
 
@@ -131,7 +133,15 @@ void JSONDB::erase(const std::string &n)
 {
 	if (!_isValidObjectName(n))
 		return;
+	_erase(n);
+	{
+		Mutex::Lock _l(_db_m);
+		_db.erase(n);
+	}
+}
 
+void JSONDB::_erase(const std::string &n)
+{
 	if (_httpAddr) {
 		std::string body;
 		std::map<std::string,std::string> headers;
@@ -142,17 +152,12 @@ void JSONDB::erase(const std::string &n)
 			return;
 		OSUtils::rm(path.c_str());
 	}
-
-	{
-		Mutex::Lock _l(_db_m);
-		_db.erase(n);
-	}
 }
 
-bool JSONDB::_reload(const std::string &p,const std::string &b)
+void JSONDB::_reload(const std::string &p,const std::string &b)
 {
-	// Assumes _db_m is locked
 	if (_httpAddr) {
+		Mutex::Lock _l(_db_m);
 		std::string body;
 		std::map<std::string,std::string> headers;
 		const unsigned int sc = Http::GET(2147483647,ZT_JSONDB_HTTP_TIMEOUT,reinterpret_cast<const struct sockaddr *>(&_httpAddr),_basePath.c_str(),_ZT_JSONDB_GET_HEADERS,headers,body);
@@ -161,18 +166,19 @@ bool JSONDB::_reload(const std::string &p,const std::string &b)
 				nlohmann::json dbImg(OSUtils::jsonParse(body));
 				std::string tmp;
 				if (dbImg.is_object()) {
+					_db.clear();
 					for(nlohmann::json::iterator i(dbImg.begin());i!=dbImg.end();++i) {
 						if (i.value().is_object()) {
 							tmp = i.key();
 							_db[tmp].obj = i.value();
 						}
 					}
-					return true;
+					_ready = true;
 				}
 			} catch ( ... ) {} // invalid JSON, so maybe incomplete request
 		}
-		return false;
 	} else {
+		_ready = true;
 		std::vector<std::string> dl(OSUtils::listDirectory(p.c_str(),true));
 		for(std::vector<std::string>::const_iterator di(dl.begin());di!=dl.end();++di) {
 			if ((di->length() > 5)&&(di->substr(di->length() - 5) == ".json")) {
@@ -181,7 +187,6 @@ bool JSONDB::_reload(const std::string &p,const std::string &b)
 				this->_reload((p + ZT_PATH_SEPARATOR + *di),(b + *di + ZT_PATH_SEPARATOR));
 			}
 		}
-		return true;
 	}
 }
 

+ 15 - 16
controller/JSONDB.hpp

@@ -72,29 +72,28 @@ public:
 	template<typename F>
 	inline void filter(const std::string &prefix,F func)
 	{
-		Mutex::Lock _l(_db_m);
-
 		while (!_ready) {
 			Thread::sleep(250);
-			_ready = _reload(_basePath,std::string());
+			_reload(_basePath,std::string());
 		}
-
-		for(std::map<std::string,_E>::iterator i(_db.lower_bound(prefix));i!=_db.end();) {
-			if ((i->first.length() >= prefix.length())&&(!memcmp(i->first.data(),prefix.data(),prefix.length()))) {
-				if (!func(i->first,get(i->first))) {
-					std::map<std::string,_E>::iterator i2(i); ++i2;
-					this->erase(i->first);
-					i = i2;
-				} else ++i;
-			} else break;
+		{
+			Mutex::Lock _l(_db_m);
+			for(std::map<std::string,_E>::iterator i(_db.lower_bound(prefix));i!=_db.end();) {
+				if ((i->first.length() >= prefix.length())&&(!memcmp(i->first.data(),prefix.data(),prefix.length()))) {
+					if (!func(i->first,i->second.obj)) {
+						this->_erase(i->first);
+						_db.erase(i++);
+					} else {
+						++i;
+					}
+				} else break;
+			}
 		}
 	}
 
-	inline bool operator==(const JSONDB &db) const { return ((_basePath == db._basePath)&&(_db == db._db)); }
-	inline bool operator!=(const JSONDB &db) const { return (!(*this == db)); }
-
 private:
-	bool _reload(const std::string &p,const std::string &b);
+	void _erase(const std::string &n);
+	void _reload(const std::string &p,const std::string &b);
 	bool _isValidObjectName(const std::string &n);
 	std::string _genPath(const std::string &n,bool create);
 

+ 2 - 1
node/Node.cpp

@@ -490,7 +490,8 @@ int Node::sendUserMessage(void *tptr,uint64_t dest,uint64_t typeId,const void *d
 void Node::setNetconfMaster(void *networkControllerInstance)
 {
 	RR->localNetworkController = reinterpret_cast<NetworkController *>(networkControllerInstance);
-	RR->localNetworkController->init(RR->identity,this);
+	if (networkControllerInstance)
+		RR->localNetworkController->init(RR->identity,this);
 }
 
 ZT_ResultCode Node::circuitTestBegin(void *tptr,ZT_CircuitTest *test,void (*reportCallback)(ZT_Node *,ZT_CircuitTest *,const ZT_CircuitTestReport *))

+ 16 - 26
osdep/Thread.hpp

@@ -46,7 +46,6 @@ class Thread
 {
 public:
 	Thread()
-		throw()
 	{
 		_th = NULL;
 		_tid = 0;
@@ -54,7 +53,6 @@ public:
 
 	template<typename C>
 	static inline Thread start(C *instance)
-		throw(std::runtime_error)
 	{
 		Thread t;
 		t._th = CreateThread(NULL,0,&___zt_threadMain<C>,(LPVOID)instance,0,&t._tid);
@@ -88,7 +86,7 @@ public:
 			CancelSynchronousIo(t._th);
 	}
 
-	inline operator bool() const throw() { return (_th != NULL); }
+	inline operator bool() const { return (_th != NULL); }
 
 private:
 	HANDLE _th;
@@ -123,33 +121,18 @@ class Thread
 {
 public:
 	Thread()
-		throw()
 	{
-		memset(&_tid,0,sizeof(_tid));
-		pthread_attr_init(&_tattr);
-		// This corrects for systems with abnormally small defaults (musl) and also
-		// shrinks the stack on systems with large defaults to save a bit of memory.
-		pthread_attr_setstacksize(&_tattr,ZT_THREAD_MIN_STACK_SIZE);
-		_started = false;
-	}
-
-	~Thread()
-	{
-		pthread_attr_destroy(&_tattr);
+		memset(this,0,sizeof(Thread));
 	}
 
 	Thread(const Thread &t)
-		throw()
 	{
-		memcpy(&_tid,&(t._tid),sizeof(_tid));
-		_started = t._started;
+		memcpy(this,&t,sizeof(Thread));
 	}
 
 	inline Thread &operator=(const Thread &t)
-		throw()
 	{
-		memcpy(&_tid,&(t._tid),sizeof(_tid));
-		_started = t._started;
+		memcpy(this,&t,sizeof(Thread));
 		return *this;
 	}
 
@@ -163,12 +146,20 @@ public:
 	 */
 	template<typename C>
 	static inline Thread start(C *instance)
-		throw(std::runtime_error)
 	{
 		Thread t;
-		t._started = true;
-		if (pthread_create(&t._tid,&t._tattr,&___zt_threadMain<C>,instance))
+		pthread_attr_t tattr;
+		pthread_attr_init(&tattr);
+		// This corrects for systems with abnormally small defaults (musl) and also
+		// shrinks the stack on systems with large defaults to save a bit of memory.
+		pthread_attr_setstacksize(&tattr,ZT_THREAD_MIN_STACK_SIZE);
+		if (pthread_create(&t._tid,&tattr,&___zt_threadMain<C>,instance)) {
+			pthread_attr_destroy(&tattr);
 			throw std::runtime_error("pthread_create() failed, unable to create thread");
+		} else {
+			t._started = true;
+			pthread_attr_destroy(&tattr);
+		}
 		return t;
 	}
 
@@ -190,11 +181,10 @@ public:
 	 */
 	static inline void sleep(unsigned long ms) { usleep(ms * 1000); }
 
-	inline operator bool() const throw() { return (_started); }
+	inline operator bool() const { return (_started); }
 
 private:
 	pthread_t _tid;
-	pthread_attr_t _tattr;
 	volatile bool _started;
 };