Browse Source

Split Linux tap receive into two threads: one reader, one processor.

Adam Ierymenko 4 năm trước cách đây
mục cha
commit
2da162bed7
3 tập tin đã thay đổi với 171 bổ sung114 xóa
  1. 17 5
      osdep/BlockingQueue.hpp
  2. 145 103
      osdep/LinuxEthernetTap.cpp
  3. 9 6
      osdep/LinuxEthernetTap.hpp

+ 17 - 5
osdep/BlockingQueue.hpp

@@ -18,8 +18,8 @@
 #include <mutex>
 #include <condition_variable>
 #include <chrono>
-
-#include "Thread.hpp"
+#include <atomic>
+#include <vector>
 
 namespace ZeroTier {
 
@@ -67,7 +67,8 @@ public:
 	inline bool get(T &value)
 	{
 		std::unique_lock<std::mutex> lock(m);
-		if (!r) return false;
+		if (!r)
+			return false;
 		while (q.empty()) {
 			c.wait(lock);
 			if (!r) {
@@ -81,6 +82,16 @@ public:
 		return true;
 	}
 
+	inline std::vector<T> drain()
+	{
+		std::vector<T> v;
+		while (!q.empty()) {
+			v.push_back(q.front());
+			q.pop();
+		}
+		return v;
+	}
+
 	enum TimedWaitResult
 	{
 		OK,
@@ -92,7 +103,8 @@ public:
 	{
 		const std::chrono::milliseconds ms2{ms};
 		std::unique_lock<std::mutex> lock(m);
-		if (!r) return STOP;
+		if (!r)
+			return STOP;
 		while (q.empty()) {
 			if (c.wait_for(lock,ms2) == std::cv_status::timeout)
 				return ((r) ? TIMED_OUT : STOP);
@@ -105,10 +117,10 @@ public:
 	}
 
 private:
-	volatile bool r;
 	std::queue<T> q;
 	mutable std::mutex m;
 	mutable std::condition_variable c,gc;
+	std::atomic_bool r;
 };
 
 } // namespace ZeroTier

+ 145 - 103
osdep/LinuxEthernetTap.cpp

@@ -11,6 +11,10 @@
  */
 /****/
 
+#ifdef __GNUC__
+#pragma GCC diagnostic ignored "-Wrestrict"
+#endif
+
 #include "../node/Constants.hpp"
 
 #ifdef __LINUX__
@@ -52,6 +56,8 @@
 #define IFNAMSIZ 16
 #endif
 
+#define ZT_TAP_BUF_SIZE 16384
+
 // ff:ff:ff:ff:ff:ff with no ADI
 static const ZeroTier::MulticastGroup _blindWildcardMulticastGroup(ZeroTier::MAC(0xff),0);
 
@@ -177,16 +183,153 @@ LinuxEthernetTap::LinuxEthernetTap(
 
 	(void)::pipe(_shutdownSignalPipe);
 
-	_thread = Thread::start(this);
+	_tapReaderThread = std::thread([this]{
+		fd_set readfds,nullfds;
+		int n,nfds,r;
+		void *buf = nullptr;
+
+		usleep(100000);
+		{
+			struct ifreq ifr;
+			memset(&ifr,0,sizeof(ifr));
+
+			strcpy(ifr.ifr_name,_dev.c_str());
+
+			const int sock = socket(AF_INET,SOCK_DGRAM,0);
+			if (sock <= 0)
+				return;
+
+			if (ioctl(sock,SIOCGIFFLAGS,(void *)&ifr) < 0) {
+				::close(sock);
+				printf("WARNING: ioctl() failed setting up Linux tap device (bring interface up)\n");
+				return;
+			}
+			ifr.ifr_flags |= IFF_UP;
+			if (ioctl(sock,SIOCSIFFLAGS,(void *)&ifr) < 0) {
+				::close(sock);
+				printf("WARNING: ioctl() failed setting up Linux tap device (bring interface up)\n");
+				return;
+			}
+
+			usleep(500000);
+
+			ifr.ifr_ifru.ifru_hwaddr.sa_family = ARPHRD_ETHER;
+			_mac.copyTo(ifr.ifr_ifru.ifru_hwaddr.sa_data,6);
+			if (ioctl(sock,SIOCSIFHWADDR,(void *)&ifr) < 0) {
+				::close(sock);
+				printf("WARNING: ioctl() failed setting up Linux tap device (set MAC)\n");
+				return;
+			}
+
+			ifr.ifr_ifru.ifru_mtu = (int)_mtu;
+			if (ioctl(sock,SIOCSIFMTU,(void *)&ifr) < 0) {
+				::close(sock);
+				printf("WARNING: ioctl() failed setting up Linux tap device (set MTU)\n");
+				return;
+			}
+
+			fcntl(_fd,F_SETFL,O_NONBLOCK);
+
+			::close(sock);
+		}
+
+		FD_ZERO(&readfds);
+		FD_ZERO(&nullfds);
+		nfds = (int)std::max(_shutdownSignalPipe[0],_fd) + 1;
+
+		r = 0;
+		for(;;) {
+			FD_SET(_shutdownSignalPipe[0],&readfds);
+			FD_SET(_fd,&readfds);
+			select(nfds,&readfds,&nullfds,&nullfds,(struct timeval *)0);
+
+			if (FD_ISSET(_shutdownSignalPipe[0],&readfds)) // writes to shutdown pipe terminate thread
+				break;
+
+			if (FD_ISSET(_fd,&readfds)) {
+				for(int x=0;x<64;++x) {
+					if (!buf) {
+						std::lock_guard<std::mutex> l(_buffers_l);
+						if (_buffers.empty()) {
+							buf = malloc(ZT_TAP_BUF_SIZE);
+							if (!buf)
+								break;
+						} else {
+							buf = _buffers.back();
+							_buffers.pop_back();
+						}
+					}
+
+					n = (int)::read(_fd,reinterpret_cast<uint8_t *>(buf) + r,ZT_TAP_BUF_SIZE - r);
+					if (n < 0) {
+						break;
+					} else {
+						// Some tap drivers like to send the ethernet frame and the
+						// payload in two chunks, so handle that by accumulating
+						// data until we have at least a frame.
+						r += n;
+						if (r > 14) {
+							if (r > ((int)_mtu + 14)) // sanity check for weird TAP behavior on some platforms
+								r = _mtu + 14;
+
+							if (_enabled) {
+								_tapq.post(std::pair<void *,int>(buf,r));
+								buf = nullptr;
+								/*
+								to.setTo(getBuf,6);
+								from.setTo(getBuf + 6,6);
+								unsigned int etherType = ntohs(((const uint16_t *)getBuf)[6]);
+								// TODO: VLAN support
+								_handler(_arg,(void *)0,_nwid,from,to,etherType,0,(const void *)(getBuf + 14),r - 14);
+								*/
+							}
+
+							r = 0;
+						}
+					}
+				}
+			}
+		}
+	});
+
+	_tapProcessorThread = std::thread([this] {
+		MAC to,from;
+		std::pair<void *,int> qi;
+		while (_tapq.get(qi)) {
+			uint8_t *const b = reinterpret_cast<uint8_t *>(qi.first);
+			if (b) {
+				to.setTo(b, 6);
+				from.setTo(b + 6, 6);
+				unsigned int etherType = Utils::ntoh(((const uint16_t *)b)[6]);
+				_handler(_arg, nullptr, _nwid, from, to, etherType, 0, (const void *)(b + 14),(unsigned int)(qi.second - 14));
+				{
+					std::lock_guard<std::mutex> l(_buffers_l);
+					_buffers.push_back(qi.first);
+				}
+			} else break;
+		}
+	});
 }
 
 LinuxEthernetTap::~LinuxEthernetTap()
 {
 	(void)::write(_shutdownSignalPipe[1],"\0",1); // causes thread to exit
-	Thread::join(_thread);
+	_tapq.post(std::pair<void *,int>(nullptr,0));
+
 	::close(_fd);
 	::close(_shutdownSignalPipe[0]);
 	::close(_shutdownSignalPipe[1]);
+
+	_tapReaderThread.join();
+	_tapProcessorThread.join();
+
+	for(std::vector<void *>::iterator i(_buffers.begin());i!=_buffers.end();++i)
+		free(*i);
+	std::vector< std::pair<void *,int> > dv(_tapq.drain());
+	for(std::vector< std::pair<void *,int> >::iterator i(dv.begin());i!=dv.end();++i) {
+		if (i->first)
+			free(i->first);
+	}
 }
 
 void LinuxEthernetTap::setEnabled(bool en)
@@ -400,107 +543,6 @@ void LinuxEthernetTap::setMtu(unsigned int mtu)
 	}
 }
 
-void LinuxEthernetTap::threadMain()
-	throw()
-{
-	fd_set readfds,nullfds;
-	MAC to,from;
-	int n,nfds,r;
-	char getBuf[ZT_MAX_MTU + 64];
-
-	Thread::sleep(100);
-
-	{
-		struct ifreq ifr;
-		memset(&ifr,0,sizeof(ifr));
-
-		strcpy(ifr.ifr_name,_dev.c_str());
-
-		const int sock = socket(AF_INET,SOCK_DGRAM,0);
-		if (sock <= 0)
-			return;
-
-		if (ioctl(sock,SIOCGIFFLAGS,(void *)&ifr) < 0) {
-			::close(sock);
-			printf("WARNING: ioctl() failed setting up Linux tap device (bring interface up)\n");
-			return;
-		}
-		ifr.ifr_flags |= IFF_UP;
-		if (ioctl(sock,SIOCSIFFLAGS,(void *)&ifr) < 0) {
-			::close(sock);
-			printf("WARNING: ioctl() failed setting up Linux tap device (bring interface up)\n");
-			return;
-		}
-
-		Thread::sleep(500);
-
-		ifr.ifr_ifru.ifru_hwaddr.sa_family = ARPHRD_ETHER;
-		_mac.copyTo(ifr.ifr_ifru.ifru_hwaddr.sa_data,6);
-		if (ioctl(sock,SIOCSIFHWADDR,(void *)&ifr) < 0) {
-			::close(sock);
-			printf("WARNING: ioctl() failed setting up Linux tap device (set MAC)\n");
-			return;
-		}
-
-		ifr.ifr_ifru.ifru_mtu = (int)_mtu;
-		if (ioctl(sock,SIOCSIFMTU,(void *)&ifr) < 0) {
-			::close(sock);
-			printf("WARNING: ioctl() failed setting up Linux tap device (set MTU)\n");
-			return;
-		}
-
-		if (fcntl(_fd,F_SETFL,O_NONBLOCK) == -1) {
-			::close(sock);
-			printf("WARNING: ioctl() failed setting up Linux tap device (set non-blocking)\n");
-			return;
-		}
-
-		::close(sock);
-	}
-
-	FD_ZERO(&readfds);
-	FD_ZERO(&nullfds);
-	nfds = (int)std::max(_shutdownSignalPipe[0],_fd) + 1;
-
-	r = 0;
-	for(;;) {
-		FD_SET(_shutdownSignalPipe[0],&readfds);
-		FD_SET(_fd,&readfds);
-		select(nfds,&readfds,&nullfds,&nullfds,(struct timeval *)0);
-
-		if (FD_ISSET(_shutdownSignalPipe[0],&readfds)) // writes to shutdown pipe terminate thread
-			break;
-
-		if (FD_ISSET(_fd,&readfds)) {
-			for(int x=0;x<64;++x) {
-				n = (int)::read(_fd,getBuf + r,sizeof(getBuf) - r);
-				if (n < 0) {
-					break;
-				} else {
-					// Some tap drivers like to send the ethernet frame and the
-					// payload in two chunks, so handle that by accumulating
-					// data until we have at least a frame.
-					r += n;
-					if (r > 14) {
-						if (r > ((int)_mtu + 14)) // sanity check for weird TAP behavior on some platforms
-							r = _mtu + 14;
-
-						if (_enabled) {
-							to.setTo(getBuf,6);
-							from.setTo(getBuf + 6,6);
-							unsigned int etherType = ntohs(((const uint16_t *)getBuf)[6]);
-							// TODO: VLAN support
-							_handler(_arg,(void *)0,_nwid,from,to,etherType,0,(const void *)(getBuf + 14),r - 14);
-						}
-
-						r = 0;
-					}
-				}
-			}
-		}
-	}
-}
-
 } // namespace ZeroTier
 
 #endif // __LINUX__

+ 9 - 6
osdep/LinuxEthernetTap.hpp

@@ -21,10 +21,12 @@
 #include <vector>
 #include <stdexcept>
 #include <atomic>
-
+#include <array>
+#include <thread>
+#include <mutex>
 #include "../node/MulticastGroup.hpp"
-#include "Thread.hpp"
 #include "EthernetTap.hpp"
+#include "BlockingQueue.hpp"
 
 namespace ZeroTier {
 
@@ -56,15 +58,11 @@ public:
 	virtual void setMtu(unsigned int mtu);
 	virtual void setDns(const char *domain, const std::vector<InetAddress> &servers) {}
 
-	void threadMain()
-		throw();
-
 private:
 	void (*_handler)(void *,void *,uint64_t,const MAC &,const MAC &,unsigned int,unsigned int,const void *,unsigned int);
 	void *_arg;
 	uint64_t _nwid;
 	MAC _mac;
-	Thread _thread;
 	std::string _homePath;
 	std::string _dev;
 	std::vector<MulticastGroup> _multicastGroups;
@@ -72,6 +70,11 @@ private:
 	int _fd;
 	int _shutdownSignalPipe[2];
 	std::atomic_bool _enabled;
+	std::thread _tapReaderThread;
+	std::thread _tapProcessorThread;
+	std::mutex _buffers_l;
+	std::vector<void *> _buffers;
+	BlockingQueue< std::pair<void *,int> > _tapq;
 };
 
 } // namespace ZeroTier