Browse Source

Cleanup and a minor performance improvement.

Adam Ierymenko 6 years ago
parent
commit
f6450cd7e1
1 changed files with 14 additions and 20 deletions
  1. 14 20
      service/OneService.cpp

+ 14 - 20
service/OneService.cpp

@@ -468,12 +468,13 @@ public:
 	unsigned int _incomingPacketThreadPoolSize;
 	struct {
 		uint8_t data[2048];
-		std::thread thr;
+		uint64_t now;
 		int64_t sock;
 		struct sockaddr_storage from;
 		int size;
 		std::condition_variable cond;
 		std::mutex lock;
+		std::thread thr;
 	} _incomingPacketWorker[ZT_INCOMING_PACKET_MAX_THREAD_POOL_SIZE];
 
 	// Local configuration and memo-ized information from it
@@ -607,21 +608,16 @@ public:
 
 		_incomingPacketThreadPoolSize = std::max(std::min((unsigned int)std::thread::hardware_concurrency(),(unsigned int)ZT_INCOMING_PACKET_MAX_THREAD_POOL_SIZE),(unsigned int)1);
 		for(unsigned int tn=0;tn<_incomingPacketThreadPoolSize;++tn) {
-			_incomingPacketWorker[tn].thr = std::thread([this,tn]() {
-				std::unique_lock<std::mutex> l(_incomingPacketWorker[tn].lock);
+			const unsigned int tno = tn;
+			_incomingPacketWorker[tn].thr = std::thread([this,tno]() {
+				std::unique_lock<std::mutex> l(_incomingPacketWorker[tno].lock);
 				for(;;) {
-					_incomingPacketWorker[tn].cond.wait(l);
-					if (_incomingPacketWorker[tn].size < 0) {
+					_incomingPacketWorker[tno].cond.wait(l);
+					const int s = _incomingPacketWorker[tno].size;
+					if (s < 0) {
 						break;
-					} else if (_incomingPacketWorker[tn].size > 0) {
-						const ZT_ResultCode rc = _node->processWirePacket(
-							(void *)0,
-							OSUtils::now(),
-							_incomingPacketWorker[tn].sock,
-							&(_incomingPacketWorker[tn].from),
-							_incomingPacketWorker[tn].data,
-							(unsigned int)_incomingPacketWorker[tn].size,
-							&_nextBackgroundTaskDeadline);
+					} else if (s > 0) {
+						const ZT_ResultCode rc = _node->processWirePacket(nullptr,_incomingPacketWorker[tno].now,_incomingPacketWorker[tno].sock,&(_incomingPacketWorker[tno].from),_incomingPacketWorker[tno].data,(unsigned int)s,&_nextBackgroundTaskDeadline);
 						if (ZT_ResultCode_isFatal(rc)) {
 							char tmp[256];
 							OSUtils::ztsnprintf(tmp,sizeof(tmp),"fatal error code from processWirePacket: %d",(int)rc);
@@ -1896,8 +1892,9 @@ public:
 
 	inline void phyOnDatagram(PhySocket *sock,void **uptr,const struct sockaddr *localAddr,const struct sockaddr *from,void *data,unsigned long len)
 	{
+		const uint64_t now = OSUtils::now();
 		if ((len >= 16)&&(reinterpret_cast<const InetAddress *>(from)->ipScope() == InetAddress::IP_SCOPE_GLOBAL))
-			_lastDirectReceiveFromGlobal = OSUtils::now();
+			_lastDirectReceiveFromGlobal = now;
 
 		/* Pick worker thread by checksumming the from address. This avoids thread
 		 * scheduling caused packet re-ordering by binding each individual remote
@@ -1916,20 +1913,17 @@ public:
 				for(unsigned int i=0;i<16;++i)
 					cksum += ((const struct sockaddr_in6 *)from)->sin6_addr.s6_addr[i];
 				break;
-			default:
-				for(unsigned int i=0;i<sizeof(struct sockaddr_storage);++i)
-					cksum += ((uint8_t *)from)[i];
-				break;
 		}
 		const unsigned int tn = cksum % _incomingPacketThreadPoolSize;
 
 		_incomingPacketWorker[tn].lock.lock();
 		ZT_FAST_MEMCPY(_incomingPacketWorker[tn].data,data,len);
+		_incomingPacketWorker[tn].now = now;
 		_incomingPacketWorker[tn].sock = reinterpret_cast<int64_t>(sock);
 		ZT_FAST_MEMCPY(&_incomingPacketWorker[tn].from,from,sizeof(struct sockaddr_storage));
 		_incomingPacketWorker[tn].size = (int)len;
+		_incomingPacketWorker[tn].cond.notify_one();
 		_incomingPacketWorker[tn].lock.unlock();
-		_incomingPacketWorker[tn].cond.notify_all();
 	}
 
 	inline void phyOnTcpConnect(PhySocket *sock,void **uptr,bool success)