Browse Source

Remove current multithreaded receive path, which is not that efficient. We will do something better in the future.

Adam Ierymenko 6 years ago
parent
commit
bb0808c99c
1 changed files with 12 additions and 73 deletions
  1. 12 73
      service/OneService.cpp

+ 12 - 73
service/OneService.cpp

@@ -147,6 +147,10 @@ namespace ZeroTier { typedef BSDEthernetTap EthernetTap; }
 
 
 #endif // ZT_USE_TEST_TAP
 #endif // ZT_USE_TEST_TAP
 
 
+#ifndef ZT_SOFTWARE_UPDATE_DEFAULT
+#define ZT_SOFTWARE_UPDATE_DEFAULT "disable"
+#endif
+
 // Sanity limits for HTTP
 // Sanity limits for HTTP
 #define ZT_MAX_HTTP_MESSAGE_SIZE (1024 * 1024 * 64)
 #define ZT_MAX_HTTP_MESSAGE_SIZE (1024 * 1024 * 64)
 #define ZT_MAX_HTTP_CONNECTIONS 65536
 #define ZT_MAX_HTTP_CONNECTIONS 65536
@@ -477,12 +481,6 @@ public:
 	unsigned int _tertiaryPort;
 	unsigned int _tertiaryPort;
 	volatile unsigned int _udpPortPickerCounter;
 	volatile unsigned int _udpPortPickerCounter;
 
 
-	unsigned long _incomingPacketConcurrency;
-	std::vector<OneServiceIncomingPacket *> _incomingPacketMemoryPool;
-	BlockingQueue<OneServiceIncomingPacket *> _incomingPacketQueue;
-	std::vector<std::thread> _incomingPacketThreads;
-	Mutex _incomingPacketMemoryPoolLock,_incomingPacketThreadsLock;
-
 	// Local configuration and memo-ized information from it
 	// Local configuration and memo-ized information from it
 	json _localConfig;
 	json _localConfig;
 	Hashtable< uint64_t,std::vector<InetAddress> > _v4Hints;
 	Hashtable< uint64_t,std::vector<InetAddress> > _v4Hints;
@@ -615,43 +613,6 @@ public:
 		_ports[1] = 0;
 		_ports[1] = 0;
 		_ports[2] = 0;
 		_ports[2] = 0;
 
 
-		_incomingPacketConcurrency = std::max((unsigned long)1,std::min((unsigned long)16,(unsigned long)std::thread::hardware_concurrency()));
-		char *envPool = std::getenv("INCOMING_PACKET_CONCURRENCY");
-		if (envPool != NULL) {
-			int tmp = atoi(envPool);
-			if (tmp > 0) {
-				_incomingPacketConcurrency = tmp;
-			}
-		}
-		for(unsigned long t=0;t<_incomingPacketConcurrency;++t) {
-			_incomingPacketThreads.push_back(std::thread([this]() {
-				OneServiceIncomingPacket *pkt = nullptr;
-				for(;;) {
-					if (!_incomingPacketQueue.get(pkt))
-						break;
-					if (!pkt)
-						break;
-					if (!_run)
-						break;
-
-					const ZT_ResultCode rc = _node->processWirePacket(nullptr,pkt->now,pkt->sock,&(pkt->from),pkt->data,pkt->size,&_nextBackgroundTaskDeadline);
-					{
-						Mutex::Lock l(_incomingPacketMemoryPoolLock);
-						_incomingPacketMemoryPool.push_back(pkt);
-					}
-					if (ZT_ResultCode_isFatal(rc)) {
-						char tmp[256];
-						OSUtils::ztsnprintf(tmp,sizeof(tmp),"fatal error code from processWirePacket: %d",(int)rc);
-						Mutex::Lock _l(_termReason_m);
-						_termReason = ONE_UNRECOVERABLE_ERROR;
-						_fatalErrorMessage = tmp;
-						this->terminate();
-						break;
-					}
-				}
-			}));
-		}
-
 #if ZT_VAULT_SUPPORT
 #if ZT_VAULT_SUPPORT
 		curl_global_init(CURL_GLOBAL_DEFAULT);
 		curl_global_init(CURL_GLOBAL_DEFAULT);
 #endif
 #endif
@@ -659,12 +620,6 @@ public:
 
 
 	virtual ~OneServiceImpl()
 	virtual ~OneServiceImpl()
 	{
 	{
-		_incomingPacketQueue.stop();
-		_incomingPacketThreadsLock.lock();
-		for(auto t=_incomingPacketThreads.begin();t!=_incomingPacketThreads.end();++t)
-			t->join();
-		_incomingPacketThreadsLock.unlock();
-
 		_binder.closeAll(_phy);
 		_binder.closeAll(_phy);
 		_phy.close(_localControlSocket4);
 		_phy.close(_localControlSocket4);
 		_phy.close(_localControlSocket6);
 		_phy.close(_localControlSocket6);
@@ -673,13 +628,6 @@ public:
 		curl_global_cleanup();
 		curl_global_cleanup();
 #endif
 #endif
 
 
-		_incomingPacketMemoryPoolLock.lock();
-		while (!_incomingPacketMemoryPool.empty()) {
-			delete _incomingPacketMemoryPool.back();
-			_incomingPacketMemoryPool.pop_back();
-		}
-		_incomingPacketMemoryPoolLock.unlock();
-
 #ifdef ZT_USE_MINIUPNPC
 #ifdef ZT_USE_MINIUPNPC
 		delete _portMapper;
 		delete _portMapper;
 #endif
 #endif
@@ -1917,24 +1865,15 @@ public:
 		const uint64_t now = OSUtils::now();
 		const uint64_t now = OSUtils::now();
 		if ((len >= 16)&&(reinterpret_cast<const InetAddress *>(from)->ipScope() == InetAddress::IP_SCOPE_GLOBAL))
 		if ((len >= 16)&&(reinterpret_cast<const InetAddress *>(from)->ipScope() == InetAddress::IP_SCOPE_GLOBAL))
 			_lastDirectReceiveFromGlobal = now;
 			_lastDirectReceiveFromGlobal = now;
-
-		OneServiceIncomingPacket *pkt;
-		_incomingPacketMemoryPoolLock.lock();
-		if (_incomingPacketMemoryPool.empty()) {
-			pkt = new OneServiceIncomingPacket;
-		} else {
-			pkt = _incomingPacketMemoryPool.back();
-			_incomingPacketMemoryPool.pop_back();
+		const ZT_ResultCode rc = _node->processWirePacket(nullptr,now,reinterpret_cast<int64_t>(sock),reinterpret_cast<const struct sockaddr_storage *>(from),data,len,&_nextBackgroundTaskDeadline);
+		if (ZT_ResultCode_isFatal(rc)) {
+			char tmp[256];
+			OSUtils::ztsnprintf(tmp,sizeof(tmp),"fatal error code from processWirePacket: %d",(int)rc);
+			Mutex::Lock _l(_termReason_m);
+			_termReason = ONE_UNRECOVERABLE_ERROR;
+			_fatalErrorMessage = tmp;
+			this->terminate();
 		}
 		}
-		_incomingPacketMemoryPoolLock.unlock();
-
-		pkt->now = now;
-		pkt->sock = reinterpret_cast<int64_t>(sock);
-		memcpy(&(pkt->from),from,sizeof(struct sockaddr_storage));
-		pkt->size = (unsigned int)len;
-		memcpy(pkt->data,data,len);
-
-		_incomingPacketQueue.postLimit(pkt,16 * _incomingPacketConcurrency);
 	}
 	}
 
 
 	inline void phyOnTcpConnect(PhySocket *sock,void **uptr,bool success)
 	inline void phyOnTcpConnect(PhySocket *sock,void **uptr,bool success)