Procházet zdrojové kódy

More Mac tap improvements and threading efficiency improvements.

Adam Ierymenko před 7 roky
rodič
revize
6684559cd9

+ 13 - 13
osdep/BlockingQueue.hpp

@@ -54,20 +54,16 @@ public:
 		c.notify_one();
 	}
 
-	inline void postWait(T t,unsigned long maxQueueSize)
+	inline void postLimit(T t,const unsigned long limit)
 	{
+		std::unique_lock<std::mutex> lock(m);
 		for(;;) {
-			{
-				std::lock_guard<std::mutex> lock(m);
-				if (q.size() < maxQueueSize) {
-					q.push(t);
-					c.notify_one();
-					return;
-				}
-			}
-			if (!r)
+			if (q.size() < limit) {
+				q.push(t);
+				c.notify_one();
 				break;
-			Thread::sleep(1);
+			}
+			gc.wait(lock);
 		}
 	}
 
@@ -84,10 +80,14 @@ public:
 		if (!r) return false;
 		while (q.empty()) {
 			c.wait(lock);
-			if (!r) return false;
+			if (!r) {
+				gc.notify_all();
+				return false;
+			}
 		}
 		value = q.front();
 		q.pop();
+		gc.notify_all();
 		return true;
 	}
 
@@ -118,7 +118,7 @@ private:
 	volatile bool r;
 	std::queue<T> q;
 	mutable std::mutex m;
-	mutable std::condition_variable c;
+	mutable std::condition_variable c,gc;
 };
 
 } // namespace ZeroTier

+ 2 - 2
osdep/MacEthernetTap.cpp

@@ -287,7 +287,9 @@ void MacEthernetTap::put(const MAC &from,const MAC &to,unsigned int etherType,co
 		iov[1].iov_len = 15;
 		iov[2].iov_base = const_cast<void *>(data);
 		iov[2].iov_len = len;
+		_putLock.lock();
 		writev(_agentStdin,iov,3);
+		_putLock.unlock();
 	}
 }
 
@@ -396,8 +398,6 @@ void MacEthernetTap::threadMain()
 						break;
 					}
 				}
-			} else {
-				break;
 			}
 		}
 		if (FD_ISSET(_agentStderr,&readfds)) {

+ 2 - 0
osdep/MacEthernetTap.hpp

@@ -38,6 +38,7 @@
 #include "../node/MAC.hpp"
 #include "../node/InetAddress.hpp"
 #include "../node/MulticastGroup.hpp"
+#include "../node/Mutex.hpp"
 
 #include "Thread.hpp"
 
@@ -80,6 +81,7 @@ private:
 	std::string _homePath;
 	std::string _dev;
 	std::vector<MulticastGroup> _multicastGroups;
+	Mutex _putLock;
 	unsigned int _mtu;
 	unsigned int _metric;
 	int _shutdownSignalPipe[2];

+ 2 - 9
osdep/MacEthernetTapAgent.c

@@ -104,8 +104,8 @@
 
 #define P_IFCONFIG "/sbin/ifconfig"
 
-static unsigned char s_pktReadBuf[524288] __attribute__ ((__aligned__(16)));
-static unsigned char s_stdinReadBuf[524288] __attribute__ ((__aligned__(16)));
+static unsigned char s_pktReadBuf[262144] __attribute__ ((__aligned__(16)));
+static unsigned char s_stdinReadBuf[262144] __attribute__ ((__aligned__(16)));
 static char s_deviceName[IFNAMSIZ];
 static char s_peerDeviceName[IFNAMSIZ];
 static int s_bpffd = -1;
@@ -322,9 +322,6 @@ int main(int argc,char **argv)
 		return ZT_MACETHERNETTAPAGENT_EXIT_CODE_UNABLE_TO_CREATE;
 	}
 
-	fcntl(s_ndrvfd,F_SETFL,fcntl(s_ndrvfd,F_GETFL)|O_NONBLOCK);
-	fcntl(s_bpffd,F_SETFL,fcntl(s_bpffd,F_GETFL)|O_NONBLOCK);
-
 	fprintf(stderr,"I %s %s %d.%d.%d.%d\n",s_deviceName,s_peerDeviceName,ZEROTIER_ONE_VERSION_MAJOR,ZEROTIER_ONE_VERSION_MINOR,ZEROTIER_ONE_VERSION_REVISION,ZEROTIER_ONE_VERSION_BUILD);
 
 	FD_ZERO(&rfds);
@@ -357,8 +354,6 @@ int main(int argc,char **argv)
 					}
 					p += BPF_WORDALIGN(h->bh_hdrlen + h->bh_caplen);
 				}
-			} else {
-				return ZT_MACETHERNETTAPAGENT_EXIT_CODE_READ_ERROR;
 			}
 		}
 
@@ -431,8 +426,6 @@ int main(int argc,char **argv)
 						break;
 					}
 				}
-			} else {
-				return ZT_MACETHERNETTAPAGENT_EXIT_CODE_READ_ERROR;
 			}
 		}
 	}

+ 4 - 2
service/OneService.cpp

@@ -472,6 +472,7 @@ public:
 	unsigned int _tertiaryPort;
 	volatile unsigned int _udpPortPickerCounter;
 
+	unsigned long _incomingPacketConcurrency;
 	std::vector<OneServiceIncomingPacket *> _incomingPacketMemoryPool;
 	BlockingQueue<OneServiceIncomingPacket *> _incomingPacketQueue;
 	std::vector<std::thread> _incomingPacketThreads;
@@ -606,7 +607,8 @@ public:
 		_ports[1] = 0;
 		_ports[2] = 0;
 
-		for(long t=0;t<std::max((long)1,(long)std::thread::hardware_concurrency());++t) {
+		_incomingPacketConcurrency = std::max((unsigned long)1,std::min((unsigned long)16,(unsigned long)std::thread::hardware_concurrency()));
+		for(long t=0;t<_incomingPacketConcurrency;++t) {
 			_incomingPacketThreads.push_back(std::thread([this]() {
 				OneServiceIncomingPacket *pkt = nullptr;
 				for(;;) {
@@ -1918,7 +1920,7 @@ public:
 		pkt->size = (unsigned int)len;
 		ZT_FAST_MEMCPY(pkt->data,data,len);
 
-		_incomingPacketQueue.postWait(pkt,64);
+		_incomingPacketQueue.postLimit(pkt,16 * _incomingPacketConcurrency);
 	}
 
 	inline void phyOnTcpConnect(PhySocket *sock,void **uptr,bool success)