
Make incoming packet processor thread pool dynamic based on core count.

Adam Ierymenko committed 6 years ago
commit 3b6b1d1674
1 changed file with 34 additions and 37 deletions

+ 34 - 37
service/OneService.cpp
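
In short: the fixed pool of 8 receive threads becomes a pool sized from std::thread::hardware_concurrency(), clamped to at least 1 and at most 16 (the worker array is still statically sized at the maximum). A minimal sketch of just the sizing logic, with illustrative names (kMaxPoolSize, pickPoolSize) that do not appear in the commit:

#include <algorithm>
#include <thread>

// Upper bound mirrors ZT_INCOMING_PACKET_MAX_THREAD_POOL_SIZE in the diff below.
static const unsigned int kMaxPoolSize = 16;

static unsigned int pickPoolSize()
{
	// hardware_concurrency() can return 0 when the core count is unknown,
	// so clamp to a minimum of one thread.
	const unsigned int cores = std::thread::hardware_concurrency();
	return std::max(std::min(cores,kMaxPoolSize),1U);
}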

@@ -174,8 +174,8 @@ namespace ZeroTier { typedef BSDEthernetTap EthernetTap; }
 // TCP activity timeout
 #define ZT_TCP_ACTIVITY_TIMEOUT 60000
 
-// Number of receive path threads to start
-#define ZT_INCOMING_PACKET_THREAD_POOL_SIZE 8
+// Max number of packet handler threads to start
+#define ZT_INCOMING_PACKET_MAX_THREAD_POOL_SIZE 16
 
 #if ZT_VAULT_SUPPORT
 size_t curlResponseWrite(void *ptr, size_t size, size_t nmemb, std::string *data)
@@ -465,7 +465,7 @@ public:
 	unsigned int _tertiaryPort;
 	volatile unsigned int _udpPortPickerCounter;
 
-#ifdef ZT_INCOMING_PACKET_THREAD_POOL_SIZE
+	unsigned int _incomingPacketThreadPoolSize;
 	struct {
 		uint8_t data[2048];
 		std::thread thr;
@@ -474,8 +474,7 @@ public:
 		int size;
 		std::condition_variable cond;
 		std::mutex lock;
-	} _incomingPacketWorker[ZT_INCOMING_PACKET_THREAD_POOL_SIZE];
-#endif
+	} _incomingPacketWorker[ZT_INCOMING_PACKET_MAX_THREAD_POOL_SIZE];
 
 	// Local configuration and memo-ized information from it
 	json _localConfig;
@@ -606,8 +605,8 @@ public:
 		_ports[1] = 0;
 		_ports[2] = 0;
 
-#ifdef ZT_INCOMING_PACKET_THREAD_POOL_SIZE
-		for(unsigned int tn=0;tn<ZT_INCOMING_PACKET_THREAD_POOL_SIZE;++tn) {
+		_incomingPacketThreadPoolSize = std::max(std::min((unsigned int)std::thread::hardware_concurrency(),(unsigned int)ZT_INCOMING_PACKET_MAX_THREAD_POOL_SIZE),(unsigned int)1);
+		for(unsigned int tn=0;tn<_incomingPacketThreadPoolSize;++tn) {
 			_incomingPacketWorker[tn].thr = std::thread([this,tn]() {
 				std::unique_lock<std::mutex> l(_incomingPacketWorker[tn].lock);
 				for(;;) {
@@ -636,7 +635,6 @@ public:
 				}
 			});
 		}
-#endif
 
 #if ZT_VAULT_SUPPORT
 		curl_global_init(CURL_GLOBAL_DEFAULT);
@@ -645,17 +643,15 @@ public:
 
 	virtual ~OneServiceImpl()
 	{
-#ifdef ZT_INCOMING_PACKET_THREAD_POOL_SIZE
-		for(unsigned int tn=0;tn<ZT_INCOMING_PACKET_THREAD_POOL_SIZE;++tn) {
+		for(unsigned int tn=0;tn<_incomingPacketThreadPoolSize;++tn) {
 			_incomingPacketWorker[tn].lock.lock();
 			_incomingPacketWorker[tn].size = -1;
 			_incomingPacketWorker[tn].lock.unlock();
 			_incomingPacketWorker[tn].cond.notify_all();
 		}
-		for(unsigned int tn=0;tn<ZT_INCOMING_PACKET_THREAD_POOL_SIZE;++tn) {
+		for(unsigned int tn=0;tn<_incomingPacketThreadPoolSize;++tn) {
 			_incomingPacketWorker[tn].thr.join();
 		}
-#endif
 		_binder.closeAll(_phy);
 		_phy.close(_localControlSocket4);
 		_phy.close(_localControlSocket6);
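
The body of each worker's loop falls between the hunks above, but its contract is visible from what is shown: the destructor signals shutdown by setting size to -1 under the lock and notifying, and the dispatch code below hands off a packet by filling the slot and setting size to the packet length. The following is a plausible reconstruction under those assumptions, not the elided code itself; WorkerSlot, workerLoop, and handlePacket are hypothetical names, with handlePacket standing in for the elided _node->processWirePacket() call:

#include <condition_variable>
#include <cstdint>
#include <mutex>
#include <thread>
#include <sys/socket.h> // struct sockaddr_storage (POSIX)

// Stand-in for one _incomingPacketWorker slot. data/thr/size/cond/lock appear
// in the diff; from and sock are inferred from the producer side below.
struct WorkerSlot {
	uint8_t data[2048];
	std::thread thr;
	struct sockaddr_storage from;
	int64_t sock;
	int size; // 0 = empty, >0 = packet length, <0 = shut down
	std::condition_variable cond;
	std::mutex lock;
};

// Hypothetical handler standing in for _node->processWirePacket().
static void handlePacket(const uint8_t *data,int len,const struct sockaddr_storage *from,int64_t sock) {}

static void workerLoop(WorkerSlot &slot)
{
	std::unique_lock<std::mutex> l(slot.lock);
	for(;;) {
		while (slot.size == 0)
			slot.cond.wait(l); // sleep until a packet arrives or shutdown is signaled
		if (slot.size < 0)
			break; // the destructor's -1 sentinel: exit so join() can complete
		handlePacket(slot.data,slot.size,&slot.from,slot.sock);
		slot.size = 0; // mark the slot free for the next packet
	}
}

Because a worker holds its slot lock except while waiting, a producer that targets a busy worker blocks until that worker finishes its current packet; that is the stall the dispatch comment below acknowledges.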
@@ -1902,37 +1898,38 @@ public:
 	{
 		if ((len >= 16)&&(reinterpret_cast<const InetAddress *>(from)->ipScope() == InetAddress::IP_SCOPE_GLOBAL))
 			_lastDirectReceiveFromGlobal = OSUtils::now();
-#ifdef ZT_INCOMING_PACKET_THREAD_POOL_SIZE
-		unsigned long cksum = 0;
-		for(unsigned int i=0;i<sizeof(struct sockaddr_storage);++i) {
-			cksum += ((uint8_t *)from)[i];
+
+		/* Pick a worker thread by checksumming the source address. Binding each
+		 * remote peer to a specific thread avoids the packet re-ordering that
+		 * thread scheduling would otherwise cause. If a worker blocks, every peer
+		 * assigned to it stalls as well, so this is not optimal from the
+		 * standpoint of perfect thread utilization. Nevertheless, using threads
+		 * this way greatly improves performance under heavy multi-peer load, and
+		 * its extreme simplicity keeps the impact on simpler scenarios small. */
+		uint8_t cksum = 0;
+		switch(from->sa_family) {
+			case AF_INET:
+				for(unsigned int i=0;i<4;++i)
+					cksum += ((const uint8_t *)(&(((const struct sockaddr_in *)from)->sin_addr.s_addr)))[i];
+				break;
+			case AF_INET6:
+				for(unsigned int i=0;i<16;++i)
+					cksum += ((const struct sockaddr_in6 *)from)->sin6_addr.s6_addr[i];
+				break;
+			default:
+				for(unsigned int i=0;i<sizeof(struct sockaddr_storage);++i)
+					cksum += ((uint8_t *)from)[i];
+				break;
 		}
-		const unsigned long tn = cksum % ZT_INCOMING_PACKET_THREAD_POOL_SIZE;
+		const unsigned int tn = cksum % _incomingPacketThreadPoolSize;
+
 		_incomingPacketWorker[tn].lock.lock();
-		memcpy(_incomingPacketWorker[tn].data,data,len);
+		ZT_FAST_MEMCPY(_incomingPacketWorker[tn].data,data,len);
 		_incomingPacketWorker[tn].sock = reinterpret_cast<int64_t>(sock);
-		memcpy(&_incomingPacketWorker[tn].from,from,sizeof(struct sockaddr_storage));
+		ZT_FAST_MEMCPY(&_incomingPacketWorker[tn].from,from,sizeof(struct sockaddr_storage));
 		_incomingPacketWorker[tn].size = (int)len;
 		_incomingPacketWorker[tn].lock.unlock();
 		_incomingPacketWorker[tn].cond.notify_all();
-#else
-		const ZT_ResultCode rc = _node->processWirePacket(
-			(void *)0,
-			OSUtils::now(),
-			reinterpret_cast<int64_t>(sock),
-			reinterpret_cast<const struct sockaddr_storage *>(from), // Phy<> uses sockaddr_storage, so it'll always be that big
-			data,
-			len,
-			&_nextBackgroundTaskDeadline);
-		if (ZT_ResultCode_isFatal(rc)) {
-			char tmp[256];
-			OSUtils::ztsnprintf(tmp,sizeof(tmp),"fatal error code from processWirePacket: %d",(int)rc);
-			Mutex::Lock _l(_termReason_m);
-			_termReason = ONE_UNRECOVERABLE_ERROR;
-			_fatalErrorMessage = tmp;
-			this->terminate();
-		}
-#endif
 	}
 
 	inline void phyOnTcpConnect(PhySocket *sock,void **uptr,bool success)
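
For reference, the dispatch logic above can be read as a standalone function: sum the IP address bytes into a one-byte checksum, then reduce it modulo the pool size, so every packet from a given peer lands on the same worker. A self-contained sketch (workerIndexFor is an illustrative name; the commit inlines this logic in phyOnDatagram):

#include <cstdint>
#include <sys/socket.h> // struct sockaddr, struct sockaddr_storage (POSIX)
#include <netinet/in.h> // struct sockaddr_in, struct sockaddr_in6

// Map a source address to a worker index in [0, poolSize).
static unsigned int workerIndexFor(const struct sockaddr *from,unsigned int poolSize)
{
	uint8_t cksum = 0;
	switch(from->sa_family) {
		case AF_INET: {
			const uint8_t *a = (const uint8_t *)&(((const struct sockaddr_in *)from)->sin_addr.s_addr);
			for(unsigned int i=0;i<4;++i) cksum += a[i];
		}	break;
		case AF_INET6: {
			const uint8_t *a = ((const struct sockaddr_in6 *)from)->sin6_addr.s6_addr;
			for(unsigned int i=0;i<16;++i) cksum += a[i];
		}	break;
		default:
			// Unknown family: hash the whole structure. Phy<> always passes
			// sockaddr_storage-sized buffers, so this read stays in bounds.
			for(unsigned int i=0;i<sizeof(struct sockaddr_storage);++i)
				cksum += ((const uint8_t *)from)[i];
			break;
	}
	return (unsigned int)cksum % poolSize;
}

One consequence of the per-family cases: the old code summed the entire sockaddr_storage, ports and padding included, while the new code hashes only the address bytes, so multiple sockets on one host now map to the same worker regardless of source port.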