Browse Source

Merge branch 'dev' of http://git.int.zerotier.com/ZeroTier/ZeroTierOne into dev

Grant Limberg 6 years ago
parent
commit
01e6df4d46
6 changed files with 113 additions and 102 deletions
  1. 2 0
      one.cpp
  2. 25 3
      osdep/BlockingQueue.hpp
  3. 9 6
      osdep/MacEthernetTap.cpp
  4. 2 0
      osdep/MacEthernetTap.hpp
  5. 5 11
      osdep/MacEthernetTapAgent.c
  6. 70 82
      service/OneService.cpp

+ 2 - 0
one.cpp

@@ -1354,12 +1354,14 @@ int main(int argc,char **argv)
 #ifdef __UNIX_LIKE__
 	signal(SIGHUP,&_sighandlerHup);
 	signal(SIGPIPE,SIG_IGN);
+	signal(SIGIO,SIG_IGN);
 	signal(SIGUSR1,SIG_IGN);
 	signal(SIGUSR2,SIG_IGN);
 	signal(SIGALRM,SIG_IGN);
 	signal(SIGINT,&_sighandlerQuit);
 	signal(SIGTERM,&_sighandlerQuit);
 	signal(SIGQUIT,&_sighandlerQuit);
+	signal(SIGINT,&_sighandlerQuit);
 
 	/* Ensure that there are no inherited file descriptors open from a previous
 	 * incarnation. This is a hack to ensure that GitHub issue #61 or variants

+ 25 - 3
osdep/BlockingQueue.hpp

@@ -32,6 +32,8 @@
 #include <condition_variable>
 #include <chrono>
 
+#include "Thread.hpp"
+
 namespace ZeroTier {
 
 /**
@@ -52,11 +54,27 @@ public:
 		c.notify_one();
 	}
 
+	inline void postLimit(T t,const unsigned long limit)
+	{
+		std::unique_lock<std::mutex> lock(m);
+		for(;;) {
+			if (q.size() < limit) {
+				q.push(t);
+				c.notify_one();
+				break;
+			}
+			if (!r)
+				break;
+			gc.wait(lock);
+		}
+	}
+
 	inline void stop(void)
 	{
 		std::lock_guard<std::mutex> lock(m);
 		r = false;
 		c.notify_all();
+		gc.notify_all();
 	}
 
 	inline bool get(T &value)
@@ -65,10 +83,14 @@ public:
 		if (!r) return false;
 		while (q.empty()) {
 			c.wait(lock);
-			if (!r) return false;
+			if (!r) {
+				gc.notify_all();
+				return false;
+			}
 		}
 		value = q.front();
 		q.pop();
+		gc.notify_all();
 		return true;
 	}
 
@@ -98,8 +120,8 @@ public:
 private:
 	volatile bool r;
 	std::queue<T> q;
-	std::mutex m;
-	std::condition_variable c;
+	mutable std::mutex m;
+	mutable std::condition_variable c,gc;
 };
 
 } // namespace ZeroTier

+ 9 - 6
osdep/MacEthernetTap.cpp

@@ -147,7 +147,7 @@ MacEthernetTap::MacEthernetTap(
 	_agentStdin2 = agentStdin[0];
 	_agentStdout2 = agentStdout[1];
 	_agentStderr2 = agentStderr[1];
-	long apid = (long)vfork();
+	long apid = (long)fork();
 	if (apid < 0) {
 		throw std::runtime_error("fork failed");
 	} else if (apid == 0) {
@@ -155,10 +155,13 @@ MacEthernetTap::MacEthernetTap(
 		::dup2(agentStdout[1],STDOUT_FILENO);
 		::dup2(agentStderr[1],STDERR_FILENO);
 		::close(agentStdin[0]);
+		::close(agentStdin[1]);
+		::close(agentStdout[0]);
 		::close(agentStdout[1]);
+		::close(agentStderr[0]);
 		::close(agentStderr[1]);
 		::execl(agentPath.c_str(),agentPath.c_str(),devnostr,ethaddr,mtustr,metricstr,(char *)0);
-		::exit(-1);
+		::_exit(-1);
 	} else {
 		_agentPid = apid;
 	}
@@ -284,7 +287,9 @@ void MacEthernetTap::put(const MAC &from,const MAC &to,unsigned int etherType,co
 		iov[1].iov_len = 15;
 		iov[2].iov_base = const_cast<void *>(data);
 		iov[2].iov_len = len;
+		_putLock.lock();
 		writev(_agentStdin,iov,3);
+		_putLock.unlock();
 	}
 }
 
@@ -356,8 +361,8 @@ void MacEthernetTap::threadMain()
 
 	const int nfds = std::max(std::max(_shutdownSignalPipe[0],_agentStdout),_agentStderr) + 1;
 	long agentReadPtr = 0;
-	fcntl(_agentStdout,F_SETFL,O_NONBLOCK);
-	fcntl(_agentStderr,F_SETFL,O_NONBLOCK);
+	fcntl(_agentStdout,F_SETFL,fcntl(_agentStdout,F_GETFL)|O_NONBLOCK);
+	fcntl(_agentStderr,F_SETFL,fcntl(_agentStderr,F_GETFL)|O_NONBLOCK);
 
 	FD_ZERO(&readfds);
 	FD_ZERO(&nullfds);
@@ -393,8 +398,6 @@ void MacEthernetTap::threadMain()
 						break;
 					}
 				}
-			} else {
-				break;
 			}
 		}
 		if (FD_ISSET(_agentStderr,&readfds)) {

+ 2 - 0
osdep/MacEthernetTap.hpp

@@ -38,6 +38,7 @@
 #include "../node/MAC.hpp"
 #include "../node/InetAddress.hpp"
 #include "../node/MulticastGroup.hpp"
+#include "../node/Mutex.hpp"
 
 #include "Thread.hpp"
 
@@ -80,6 +81,7 @@ private:
 	std::string _homePath;
 	std::string _dev;
 	std::vector<MulticastGroup> _multicastGroups;
+	Mutex _putLock;
 	unsigned int _mtu;
 	unsigned int _metric;
 	int _shutdownSignalPipe[2];

+ 5 - 11
osdep/MacEthernetTapAgent.c

@@ -175,7 +175,7 @@ static int run(const char *path,...)
 	} else if (pid == 0) {
 		dup2(STDERR_FILENO,STDOUT_FILENO);
 		execv(args[0],args);
-		exit(-1);
+		_exit(-1);
 	}
 	int rv = 0;
 	waitpid(pid,&rv,0);
@@ -322,10 +322,6 @@ int main(int argc,char **argv)
 		return ZT_MACETHERNETTAPAGENT_EXIT_CODE_UNABLE_TO_CREATE;
 	}
 
-	fcntl(STDIN_FILENO,F_SETFL,fcntl(STDIN_FILENO,F_GETFL)|O_NONBLOCK);
-	fcntl(s_ndrvfd,F_SETFL,fcntl(s_ndrvfd,F_GETFL)|O_NONBLOCK);
-	fcntl(s_bpffd,F_SETFL,fcntl(s_bpffd,F_GETFL)|O_NONBLOCK);
-
 	fprintf(stderr,"I %s %s %d.%d.%d.%d\n",s_deviceName,s_peerDeviceName,ZEROTIER_ONE_VERSION_MAJOR,ZEROTIER_ONE_VERSION_MINOR,ZEROTIER_ONE_VERSION_REVISION,ZEROTIER_ONE_VERSION_BUILD);
 
 	FD_ZERO(&rfds);
@@ -358,8 +354,6 @@ int main(int argc,char **argv)
 					}
 					p += BPF_WORDALIGN(h->bh_hdrlen + h->bh_caplen);
 				}
-			} else {
-				return ZT_MACETHERNETTAPAGENT_EXIT_CODE_READ_ERROR;
 			}
 		}
 
@@ -381,6 +375,7 @@ int main(int argc,char **argv)
 										}
 									}
 									break;
+
 								case ZT_MACETHERNETTAPAGENT_STDIN_CMD_IFCONFIG: {
 									char *args[16];
 									args[0] = P_IFCONFIG;
@@ -404,18 +399,19 @@ int main(int argc,char **argv)
 									}
 									args[argNo] = (char *)0;
 									if (argNo > 2) {
-										pid_t pid = vfork();
+										pid_t pid = fork();
 										if (pid < 0) {
 											return -1;
 										} else if (pid == 0) {
 											dup2(STDERR_FILENO,STDOUT_FILENO);
 											execv(args[0],args);
-											exit(-1);
+											_exit(-1);
 										}
 										int rv = 0;
 										waitpid(pid,&rv,0);
 									}
 								}	break;
+
 								case ZT_MACETHERNETTAPAGENT_STDIN_CMD_EXIT:
 									return ZT_MACETHERNETTAPAGENT_EXIT_CODE_SUCCESS;
 							}
@@ -430,8 +426,6 @@ int main(int argc,char **argv)
 						break;
 					}
 				}
-			} else {
-				return ZT_MACETHERNETTAPAGENT_EXIT_CODE_READ_ERROR;
 			}
 		}
 	}

+ 70 - 82
service/OneService.cpp

@@ -60,6 +60,7 @@
 #include "../osdep/PortMapper.hpp"
 #include "../osdep/Binder.hpp"
 #include "../osdep/ManagedRoute.hpp"
+#include "../osdep/BlockingQueue.hpp"
 
 #include "OneService.hpp"
 #include "SoftwareUpdater.hpp"
@@ -174,9 +175,6 @@ namespace ZeroTier { typedef BSDEthernetTap EthernetTap; }
 // TCP activity timeout
 #define ZT_TCP_ACTIVITY_TIMEOUT 60000
 
-// Number of receive path threads to start
-#define ZT_INCOMING_PACKET_THREAD_POOL_SIZE 8
-
 #if ZT_VAULT_SUPPORT
 size_t curlResponseWrite(void *ptr, size_t size, size_t nmemb, std::string *data)
 {
@@ -440,6 +438,15 @@ struct TcpConnection
 	Mutex writeq_m;
 };
 
+struct OneServiceIncomingPacket
+{
+	uint64_t now;
+	int64_t sock;
+	struct sockaddr_storage from;
+	unsigned int size;
+	uint8_t data[ZT_MAX_MTU];
+};
+
 class OneServiceImpl : public OneService
 {
 public:
@@ -465,17 +472,11 @@ public:
 	unsigned int _tertiaryPort;
 	volatile unsigned int _udpPortPickerCounter;
 
-#ifdef ZT_INCOMING_PACKET_THREAD_POOL_SIZE
-	struct {
-		uint8_t data[2048];
-		std::thread thr;
-		int64_t sock;
-		struct sockaddr_storage from;
-		int size;
-		std::condition_variable cond;
-		std::mutex lock;
-	} _incomingPacketWorker[ZT_INCOMING_PACKET_THREAD_POOL_SIZE];
-#endif
+	unsigned long _incomingPacketConcurrency;
+	std::vector<OneServiceIncomingPacket *> _incomingPacketMemoryPool;
+	BlockingQueue<OneServiceIncomingPacket *> _incomingPacketQueue;
+	std::vector<std::thread> _incomingPacketThreads;
+	Mutex _incomingPacketMemoryPoolLock,_incomingPacketThreadsLock;
 
 	// Local configuration and memo-ized information from it
 	json _localConfig;
@@ -606,37 +607,33 @@ public:
 		_ports[1] = 0;
 		_ports[2] = 0;
 
-#ifdef ZT_INCOMING_PACKET_THREAD_POOL_SIZE
-		for(unsigned int tn=0;tn<ZT_INCOMING_PACKET_THREAD_POOL_SIZE;++tn) {
-			_incomingPacketWorker[tn].thr = std::thread([this,tn]() {
-				std::unique_lock<std::mutex> l(_incomingPacketWorker[tn].lock);
+		_incomingPacketConcurrency = std::max((unsigned long)1,std::min((unsigned long)16,(unsigned long)std::thread::hardware_concurrency()));
+		for(long t=0;t<_incomingPacketConcurrency;++t) {
+			_incomingPacketThreads.push_back(std::thread([this]() {
+				OneServiceIncomingPacket *pkt = nullptr;
 				for(;;) {
-					_incomingPacketWorker[tn].cond.wait(l);
-					if (_incomingPacketWorker[tn].size < 0) {
+					if (!_incomingPacketQueue.get(pkt))
+						break;
+					if (!pkt)
+						break;
+
+					const ZT_ResultCode rc = _node->processWirePacket(nullptr,pkt->now,pkt->sock,&(pkt->from),pkt->data,pkt->size,&_nextBackgroundTaskDeadline);
+					{
+						Mutex::Lock l(_incomingPacketMemoryPoolLock);
+						_incomingPacketMemoryPool.push_back(pkt);
+					}
+					if (ZT_ResultCode_isFatal(rc)) {
+						char tmp[256];
+						OSUtils::ztsnprintf(tmp,sizeof(tmp),"fatal error code from processWirePacket: %d",(int)rc);
+						Mutex::Lock _l(_termReason_m);
+						_termReason = ONE_UNRECOVERABLE_ERROR;
+						_fatalErrorMessage = tmp;
+						this->terminate();
 						break;
-					} else if (_incomingPacketWorker[tn].size > 0) {
-						const ZT_ResultCode rc = _node->processWirePacket(
-							(void *)0,
-							OSUtils::now(),
-							_incomingPacketWorker[tn].sock,
-							&(_incomingPacketWorker[tn].from),
-							_incomingPacketWorker[tn].data,
-							(unsigned int)_incomingPacketWorker[tn].size,
-							&_nextBackgroundTaskDeadline);
-						if (ZT_ResultCode_isFatal(rc)) {
-							char tmp[256];
-							OSUtils::ztsnprintf(tmp,sizeof(tmp),"fatal error code from processWirePacket: %d",(int)rc);
-							Mutex::Lock _l(_termReason_m);
-							_termReason = ONE_UNRECOVERABLE_ERROR;
-							_fatalErrorMessage = tmp;
-							this->terminate();
-							break;
-						}
 					}
 				}
-			});
+			}));
 		}
-#endif
 
 #if ZT_VAULT_SUPPORT
 		curl_global_init(CURL_GLOBAL_DEFAULT);
@@ -645,24 +642,27 @@ public:
 
 	virtual ~OneServiceImpl()
 	{
-#ifdef ZT_INCOMING_PACKET_THREAD_POOL_SIZE
-		for(unsigned int tn=0;tn<ZT_INCOMING_PACKET_THREAD_POOL_SIZE;++tn) {
-			_incomingPacketWorker[tn].lock.lock();
-			_incomingPacketWorker[tn].size = -1;
-			_incomingPacketWorker[tn].lock.unlock();
-			_incomingPacketWorker[tn].cond.notify_all();
-		}
-		for(unsigned int tn=0;tn<ZT_INCOMING_PACKET_THREAD_POOL_SIZE;++tn) {
-			_incomingPacketWorker[tn].thr.join();
-		}
-#endif
+		_incomingPacketQueue.stop();
+		_incomingPacketThreadsLock.lock();
+		for(auto t=_incomingPacketThreads.begin();t!=_incomingPacketThreads.end();++t)
+			t->join();
+		_incomingPacketThreadsLock.unlock();
+
 		_binder.closeAll(_phy);
 		_phy.close(_localControlSocket4);
 		_phy.close(_localControlSocket6);
+
 #if ZT_VAULT_SUPPORT
 		curl_global_cleanup();
 #endif
 
+		_incomingPacketMemoryPoolLock.lock();
+		while (!_incomingPacketMemoryPool.empty()) {
+			delete _incomingPacketMemoryPool.back();
+			_incomingPacketMemoryPool.pop_back();
+		}
+		_incomingPacketMemoryPoolLock.unlock();
+
 #ifdef ZT_USE_MINIUPNPC
 		delete _portMapper;
 #endif
@@ -1900,39 +1900,27 @@ public:
 
 	inline void phyOnDatagram(PhySocket *sock,void **uptr,const struct sockaddr *localAddr,const struct sockaddr *from,void *data,unsigned long len)
 	{
+		const uint64_t now = OSUtils::now();
 		if ((len >= 16)&&(reinterpret_cast<const InetAddress *>(from)->ipScope() == InetAddress::IP_SCOPE_GLOBAL))
-			_lastDirectReceiveFromGlobal = OSUtils::now();
-#ifdef ZT_INCOMING_PACKET_THREAD_POOL_SIZE
-		unsigned long cksum = 0;
-		for(unsigned int i=0;i<sizeof(struct sockaddr_storage);++i) {
-			cksum += ((uint8_t *)from)[i];
-		}
-		const unsigned long tn = cksum % ZT_INCOMING_PACKET_THREAD_POOL_SIZE;
-		_incomingPacketWorker[tn].lock.lock();
-		memcpy(_incomingPacketWorker[tn].data,data,len);
-		_incomingPacketWorker[tn].sock = reinterpret_cast<int64_t>(sock);
-		memcpy(&_incomingPacketWorker[tn].from,from,sizeof(struct sockaddr_storage));
-		_incomingPacketWorker[tn].size = (int)len;
-		_incomingPacketWorker[tn].lock.unlock();
-		_incomingPacketWorker[tn].cond.notify_all();
-#else
-		const ZT_ResultCode rc = _node->processWirePacket(
-			(void *)0,
-			OSUtils::now(),
-			reinterpret_cast<int64_t>(sock),
-			reinterpret_cast<const struct sockaddr_storage *>(from), // Phy<> uses sockaddr_storage, so it'll always be that big
-			data,
-			len,
-			&_nextBackgroundTaskDeadline);
-		if (ZT_ResultCode_isFatal(rc)) {
-			char tmp[256];
-			OSUtils::ztsnprintf(tmp,sizeof(tmp),"fatal error code from processWirePacket: %d",(int)rc);
-			Mutex::Lock _l(_termReason_m);
-			_termReason = ONE_UNRECOVERABLE_ERROR;
-			_fatalErrorMessage = tmp;
-			this->terminate();
+			_lastDirectReceiveFromGlobal = now;
+
+		OneServiceIncomingPacket *pkt;
+		_incomingPacketMemoryPoolLock.lock();
+		if (_incomingPacketMemoryPool.empty()) {
+			pkt = new OneServiceIncomingPacket;
+		} else {
+			pkt = _incomingPacketMemoryPool.back();
+			_incomingPacketMemoryPool.pop_back();
 		}
-#endif
+		_incomingPacketMemoryPoolLock.unlock();
+
+		pkt->now = now;
+		pkt->sock = reinterpret_cast<int64_t>(sock);
+		ZT_FAST_MEMCPY(&(pkt->from),from,sizeof(struct sockaddr_storage));
+		pkt->size = (unsigned int)len;
+		ZT_FAST_MEMCPY(pkt->data,data,len);
+
+		_incomingPacketQueue.postLimit(pkt,16 * _incomingPacketConcurrency);
 	}
 
 	inline void phyOnTcpConnect(PhySocket *sock,void **uptr,bool success)