Prechádzať zdrojové kódy

Improve multithreading support for OneService (faster, dynamic adjustment of thread count based on HW concurrency).

Adam Ierymenko 6 rokov pred
rodič
commit
90631adb9b
3 zmenil súbory, kde vykonal 87 pridanie a 71 odobranie
  1. 2 0
      one.cpp
  2. 21 2
      osdep/BlockingQueue.hpp
  3. 64 69
      service/OneService.cpp

+ 2 - 0
one.cpp

@@ -1354,12 +1354,14 @@ int main(int argc,char **argv)
 #ifdef __UNIX_LIKE__
 	signal(SIGHUP,&_sighandlerHup);
 	signal(SIGPIPE,SIG_IGN);
+	signal(SIGIO,SIG_IGN);
 	signal(SIGUSR1,SIG_IGN);
 	signal(SIGUSR2,SIG_IGN);
 	signal(SIGALRM,SIG_IGN);
 	signal(SIGINT,&_sighandlerQuit);
 	signal(SIGTERM,&_sighandlerQuit);
 	signal(SIGQUIT,&_sighandlerQuit);
+	signal(SIGINT,&_sighandlerQuit);
 
 	/* Ensure that there are no inherited file descriptors open from a previous
 	 * incarnation. This is a hack to ensure that GitHub issue #61 or variants

+ 21 - 2
osdep/BlockingQueue.hpp

@@ -32,6 +32,8 @@
 #include <condition_variable>
 #include <chrono>
 
+#include "Thread.hpp"
+
 namespace ZeroTier {
 
 /**
@@ -52,6 +54,23 @@ public:
 		c.notify_one();
 	}
 
+	inline void postWait(T t,unsigned long maxQueueSize)
+	{
+		for(;;) {
+			{
+				std::lock_guard<std::mutex> lock(m);
+				if (q.size() < maxQueueSize) {
+					q.push(t);
+					c.notify_one();
+					return;
+				}
+			}
+			if (!r)
+				break;
+			Thread::sleep(1);
+		}
+	}
+
 	inline void stop(void)
 	{
 		std::lock_guard<std::mutex> lock(m);
@@ -98,8 +117,8 @@ public:
 private:
 	volatile bool r;
 	std::queue<T> q;
-	std::mutex m;
-	std::condition_variable c;
+	mutable std::mutex m;
+	mutable std::condition_variable c;
 };
 
 } // namespace ZeroTier

+ 64 - 69
service/OneService.cpp

@@ -60,6 +60,7 @@
 #include "../osdep/PortMapper.hpp"
 #include "../osdep/Binder.hpp"
 #include "../osdep/ManagedRoute.hpp"
+#include "../osdep/BlockingQueue.hpp"
 
 #include "OneService.hpp"
 #include "SoftwareUpdater.hpp"
@@ -174,9 +175,6 @@ namespace ZeroTier { typedef BSDEthernetTap EthernetTap; }
 // TCP activity timeout
 #define ZT_TCP_ACTIVITY_TIMEOUT 60000
 
-// Max number of packet handler threads to start
-#define ZT_INCOMING_PACKET_MAX_THREAD_POOL_SIZE 16
-
 #if ZT_VAULT_SUPPORT
 size_t curlResponseWrite(void *ptr, size_t size, size_t nmemb, std::string *data)
 {
@@ -440,6 +438,15 @@ struct TcpConnection
 	Mutex writeq_m;
 };
 
+struct OneServiceIncomingPacket
+{
+	uint64_t now;
+	int64_t sock;
+	struct sockaddr_storage from;
+	unsigned int size;
+	uint8_t data[ZT_MAX_MTU];
+};
+
 class OneServiceImpl : public OneService
 {
 public:
@@ -465,17 +472,10 @@ public:
 	unsigned int _tertiaryPort;
 	volatile unsigned int _udpPortPickerCounter;
 
-	unsigned int _incomingPacketThreadPoolSize;
-	struct {
-		uint8_t data[2048];
-		uint64_t now;
-		int64_t sock;
-		struct sockaddr_storage from;
-		int size;
-		std::condition_variable cond;
-		std::mutex lock;
-		std::thread thr;
-	} _incomingPacketWorker[ZT_INCOMING_PACKET_MAX_THREAD_POOL_SIZE];
+	std::vector<OneServiceIncomingPacket *> _incomingPacketMemoryPool;
+	BlockingQueue<OneServiceIncomingPacket *> _incomingPacketQueue;
+	std::vector<std::thread> _incomingPacketThreads;
+	Mutex _incomingPacketMemoryPoolLock,_incomingPacketThreadsLock;
 
 	// Local configuration and memo-ized information from it
 	json _localConfig;
@@ -606,30 +606,31 @@ public:
 		_ports[1] = 0;
 		_ports[2] = 0;
 
-		_incomingPacketThreadPoolSize = std::max(std::min((unsigned int)std::thread::hardware_concurrency(),(unsigned int)ZT_INCOMING_PACKET_MAX_THREAD_POOL_SIZE),(unsigned int)1);
-		for(unsigned int tn=0;tn<_incomingPacketThreadPoolSize;++tn) {
-			const unsigned int tno = tn;
-			_incomingPacketWorker[tn].thr = std::thread([this,tno]() {
-				std::unique_lock<std::mutex> l(_incomingPacketWorker[tno].lock);
+		for(long t=0;t<std::max((long)1,(long)std::thread::hardware_concurrency());++t) {
+			_incomingPacketThreads.push_back(std::thread([this]() {
+				OneServiceIncomingPacket *pkt = nullptr;
 				for(;;) {
-					_incomingPacketWorker[tno].cond.wait(l);
-					const int s = _incomingPacketWorker[tno].size;
-					if (s < 0) {
+					if (!_incomingPacketQueue.get(pkt))
+						break;
+					if (!pkt)
+						break;
+
+					const ZT_ResultCode rc = _node->processWirePacket(nullptr,pkt->now,pkt->sock,&(pkt->from),pkt->data,pkt->size,&_nextBackgroundTaskDeadline);
+					{
+						Mutex::Lock l(_incomingPacketMemoryPoolLock);
+						_incomingPacketMemoryPool.push_back(pkt);
+					}
+					if (ZT_ResultCode_isFatal(rc)) {
+						char tmp[256];
+						OSUtils::ztsnprintf(tmp,sizeof(tmp),"fatal error code from processWirePacket: %d",(int)rc);
+						Mutex::Lock _l(_termReason_m);
+						_termReason = ONE_UNRECOVERABLE_ERROR;
+						_fatalErrorMessage = tmp;
+						this->terminate();
 						break;
-					} else if (s > 0) {
-						const ZT_ResultCode rc = _node->processWirePacket(nullptr,_incomingPacketWorker[tno].now,_incomingPacketWorker[tno].sock,&(_incomingPacketWorker[tno].from),_incomingPacketWorker[tno].data,(unsigned int)s,&_nextBackgroundTaskDeadline);
-						if (ZT_ResultCode_isFatal(rc)) {
-							char tmp[256];
-							OSUtils::ztsnprintf(tmp,sizeof(tmp),"fatal error code from processWirePacket: %d",(int)rc);
-							Mutex::Lock _l(_termReason_m);
-							_termReason = ONE_UNRECOVERABLE_ERROR;
-							_fatalErrorMessage = tmp;
-							this->terminate();
-							break;
-						}
 					}
 				}
-			});
+			}));
 		}
 
 #if ZT_VAULT_SUPPORT
@@ -639,22 +640,27 @@ public:
 
 	virtual ~OneServiceImpl()
 	{
-		for(unsigned int tn=0;tn<_incomingPacketThreadPoolSize;++tn) {
-			_incomingPacketWorker[tn].lock.lock();
-			_incomingPacketWorker[tn].size = -1;
-			_incomingPacketWorker[tn].lock.unlock();
-			_incomingPacketWorker[tn].cond.notify_all();
-		}
-		for(unsigned int tn=0;tn<_incomingPacketThreadPoolSize;++tn) {
-			_incomingPacketWorker[tn].thr.join();
-		}
+		_incomingPacketQueue.stop();
+		_incomingPacketThreadsLock.lock();
+		for(auto t=_incomingPacketThreads.begin();t!=_incomingPacketThreads.end();++t)
+			t->join();
+		_incomingPacketThreadsLock.unlock();
+
 		_binder.closeAll(_phy);
 		_phy.close(_localControlSocket4);
 		_phy.close(_localControlSocket6);
+
 #if ZT_VAULT_SUPPORT
 		curl_global_cleanup();
 #endif
 
+		_incomingPacketMemoryPoolLock.lock();
+		while (!_incomingPacketMemoryPool.empty()) {
+			delete _incomingPacketMemoryPool.back();
+			_incomingPacketMemoryPool.pop_back();
+		}
+		_incomingPacketMemoryPoolLock.unlock();
+
 #ifdef ZT_USE_MINIUPNPC
 		delete _portMapper;
 #endif
@@ -1896,34 +1902,23 @@ public:
 		if ((len >= 16)&&(reinterpret_cast<const InetAddress *>(from)->ipScope() == InetAddress::IP_SCOPE_GLOBAL))
 			_lastDirectReceiveFromGlobal = now;
 
-		/* Pick worker thread by checksumming the from address. This avoids thread
-		 * scheduling caused packet re-ordering by binding each individual remote
-		 * peer to a specific thread. It will block globally if that thread is blocked,
-		 * so this is not an optimal implementation from the perspective of perfect
-		 * thread utilization. Nevertheless using threads this way does greatly
-		 * improve performance in heavy load multi-peer scenarios and does so with
-		 * little impact on simpler scenarios due to its extreme simplicity. */
-		uint8_t cksum = 0;
-		switch(from->sa_family) {
-			case AF_INET:
-				for(unsigned int i=0;i<4;++i)
-					cksum += ((const uint8_t *)(&(((const struct sockaddr_in *)from)->sin_addr.s_addr)))[i];
-				break;
-			case AF_INET6:
-				for(unsigned int i=0;i<16;++i)
-					cksum += ((const struct sockaddr_in6 *)from)->sin6_addr.s6_addr[i];
-				break;
+		OneServiceIncomingPacket *pkt;
+		_incomingPacketMemoryPoolLock.lock();
+		if (_incomingPacketMemoryPool.empty()) {
+			pkt = new OneServiceIncomingPacket;
+		} else {
+			pkt = _incomingPacketMemoryPool.back();
+			_incomingPacketMemoryPool.pop_back();
 		}
-		const unsigned int tn = cksum % _incomingPacketThreadPoolSize;
+		_incomingPacketMemoryPoolLock.unlock();
+
+		pkt->now = now;
+		pkt->sock = reinterpret_cast<int64_t>(sock);
+		ZT_FAST_MEMCPY(&(pkt->from),from,sizeof(struct sockaddr_storage));
+		pkt->size = (unsigned int)len;
+		ZT_FAST_MEMCPY(pkt->data,data,len);
 
-		_incomingPacketWorker[tn].lock.lock();
-		ZT_FAST_MEMCPY(_incomingPacketWorker[tn].data,data,len);
-		_incomingPacketWorker[tn].now = now;
-		_incomingPacketWorker[tn].sock = reinterpret_cast<int64_t>(sock);
-		ZT_FAST_MEMCPY(&_incomingPacketWorker[tn].from,from,sizeof(struct sockaddr_storage));
-		_incomingPacketWorker[tn].size = (int)len;
-		_incomingPacketWorker[tn].cond.notify_one();
-		_incomingPacketWorker[tn].lock.unlock();
+		_incomingPacketQueue.postWait(pkt,64);
 	}
 
 	inline void phyOnTcpConnect(PhySocket *sock,void **uptr,bool success)