ソースを参照

Merge pull request #2234 from zerotier/jh-zerotier-multithreaded

Add multi-core concurrent packet processing
Adam Ierymenko 10 ヶ月 前
コミット
4a485df0c7

+ 60 - 60
node/IncomingPacket.cpp

@@ -38,6 +38,7 @@
 #include "Path.hpp"
 #include "Bond.hpp"
 #include "Metrics.hpp"
+#include "PacketMultiplexer.hpp"
 
 namespace ZeroTier {
 
@@ -792,66 +793,65 @@ bool IncomingPacket::_doFRAME(const RuntimeEnvironment *RR,void *tPtr,const Shar
 {
 	Metrics::pkt_frame_in++;
 	int32_t _flowId = ZT_QOS_NO_FLOW;
-	if (peer->flowHashingSupported()) {
-		if (size() > ZT_PROTO_VERB_EXT_FRAME_IDX_PAYLOAD) {
-			const unsigned int etherType = at<uint16_t>(ZT_PROTO_VERB_FRAME_IDX_ETHERTYPE);
-			const unsigned int frameLen = size() - ZT_PROTO_VERB_FRAME_IDX_PAYLOAD;
-			const uint8_t *const frameData = reinterpret_cast<const uint8_t *>(data()) + ZT_PROTO_VERB_FRAME_IDX_PAYLOAD;
-
-			if (etherType == ZT_ETHERTYPE_IPV4 && (frameLen >= 20)) {
-				uint16_t srcPort = 0;
-				uint16_t dstPort = 0;
-				uint8_t proto = (reinterpret_cast<const uint8_t *>(frameData)[9]);
-				const unsigned int headerLen = 4 * (reinterpret_cast<const uint8_t *>(frameData)[0] & 0xf);
-				switch(proto) {
-					case 0x01: // ICMP
-						//flowId = 0x01;
-						break;
-					// All these start with 16-bit source and destination port in that order
-					case 0x06: // TCP
-					case 0x11: // UDP
-					case 0x84: // SCTP
-					case 0x88: // UDPLite
-						if (frameLen > (headerLen + 4)) {
-							unsigned int pos = headerLen + 0;
-							srcPort = (reinterpret_cast<const uint8_t *>(frameData)[pos++]) << 8;
-							srcPort |= (reinterpret_cast<const uint8_t *>(frameData)[pos]);
-							pos++;
-							dstPort = (reinterpret_cast<const uint8_t *>(frameData)[pos++]) << 8;
-							dstPort |= (reinterpret_cast<const uint8_t *>(frameData)[pos]);
-							_flowId = dstPort ^ srcPort ^ proto;
-						}
-						break;
-				}
+
+	if (size() > ZT_PROTO_VERB_EXT_FRAME_IDX_PAYLOAD) {
+		const unsigned int etherType = at<uint16_t>(ZT_PROTO_VERB_FRAME_IDX_ETHERTYPE);
+		const unsigned int frameLen = size() - ZT_PROTO_VERB_FRAME_IDX_PAYLOAD;
+		const uint8_t *const frameData = reinterpret_cast<const uint8_t *>(data()) + ZT_PROTO_VERB_FRAME_IDX_PAYLOAD;
+
+		if (etherType == ZT_ETHERTYPE_IPV4 && (frameLen >= 20)) {
+			uint16_t srcPort = 0;
+			uint16_t dstPort = 0;
+			uint8_t proto = (reinterpret_cast<const uint8_t *>(frameData)[9]);
+			const unsigned int headerLen = 4 * (reinterpret_cast<const uint8_t *>(frameData)[0] & 0xf);
+			switch(proto) {
+				case 0x01: // ICMP
+					//flowId = 0x01;
+					break;
+				// All these start with 16-bit source and destination port in that order
+				case 0x06: // TCP
+				case 0x11: // UDP
+				case 0x84: // SCTP
+				case 0x88: // UDPLite
+					if (frameLen > (headerLen + 4)) {
+						unsigned int pos = headerLen + 0;
+						srcPort = (reinterpret_cast<const uint8_t *>(frameData)[pos++]) << 8;
+						srcPort |= (reinterpret_cast<const uint8_t *>(frameData)[pos]);
+						pos++;
+						dstPort = (reinterpret_cast<const uint8_t *>(frameData)[pos++]) << 8;
+						dstPort |= (reinterpret_cast<const uint8_t *>(frameData)[pos]);
+						_flowId = dstPort ^ srcPort ^ proto;
+					}
+					break;
 			}
+		}
 
-			if (etherType == ZT_ETHERTYPE_IPV6 && (frameLen >= 40)) {
-				uint16_t srcPort = 0;
-				uint16_t dstPort = 0;
-				unsigned int pos;
-				unsigned int proto;
-				_ipv6GetPayload((const uint8_t *)frameData, frameLen, pos, proto);
-				switch(proto) {
-					case 0x3A: // ICMPv6
-						//flowId = 0x3A;
-						break;
-					// All these start with 16-bit source and destination port in that order
-					case 0x06: // TCP
-					case 0x11: // UDP
-					case 0x84: // SCTP
-					case 0x88: // UDPLite
-						if (frameLen > (pos + 4)) {
-							srcPort = (reinterpret_cast<const uint8_t *>(frameData)[pos++]) << 8;
-							srcPort |= (reinterpret_cast<const uint8_t *>(frameData)[pos]);
-							pos++;
-							dstPort = (reinterpret_cast<const uint8_t *>(frameData)[pos++]) << 8;
-							dstPort |= (reinterpret_cast<const uint8_t *>(frameData)[pos]);
-							_flowId = dstPort ^ srcPort ^ proto;
-						}
-						break;
-					default:
-						break;
-				}
+		if (etherType == ZT_ETHERTYPE_IPV6 && (frameLen >= 40)) {
+			uint16_t srcPort = 0;
+			uint16_t dstPort = 0;
+			unsigned int pos;
+			unsigned int proto;
+			_ipv6GetPayload((const uint8_t *)frameData, frameLen, pos, proto);
+			switch(proto) {
+				case 0x3A: // ICMPv6
+					//flowId = 0x3A;
+					break;
+				// All these start with 16-bit source and destination port in that order
+				case 0x06: // TCP
+				case 0x11: // UDP
+				case 0x84: // SCTP
+				case 0x88: // UDPLite
+					if (frameLen > (pos + 4)) {
+						srcPort = (reinterpret_cast<const uint8_t *>(frameData)[pos++]) << 8;
+						srcPort |= (reinterpret_cast<const uint8_t *>(frameData)[pos]);
+						pos++;
+						dstPort = (reinterpret_cast<const uint8_t *>(frameData)[pos++]) << 8;
+						dstPort |= (reinterpret_cast<const uint8_t *>(frameData)[pos]);
+						_flowId = dstPort ^ srcPort ^ proto;
+					}
+					break;
+				default:
+					break;
 			}
 		}
 	}
@@ -868,7 +868,7 @@ bool IncomingPacket::_doFRAME(const RuntimeEnvironment *RR,void *tPtr,const Shar
 				const unsigned int frameLen = size() - ZT_PROTO_VERB_FRAME_IDX_PAYLOAD;
 				const uint8_t *const frameData = reinterpret_cast<const uint8_t *>(data()) + ZT_PROTO_VERB_FRAME_IDX_PAYLOAD;
 				if (network->filterIncomingPacket(tPtr,peer,RR->identity.address(),sourceMac,network->mac(),frameData,frameLen,etherType,0) > 0) {
-					RR->node->putFrame(tPtr,nwid,network->userPtr(),sourceMac,network->mac(),etherType,0,(const void *)frameData,frameLen);
+					RR->pm->putFrame(tPtr,nwid,network->userPtr(),sourceMac,network->mac(),etherType,0,(const void *)frameData,frameLen, _flowId);
 				}
 			}
 		} else {
@@ -941,7 +941,7 @@ bool IncomingPacket::_doEXT_FRAME(const RuntimeEnvironment *RR,void *tPtr,const
 					}
 					// fall through -- 2 means accept regardless of bridging checks or other restrictions
 				case 2:
-					RR->node->putFrame(tPtr,nwid,network->userPtr(),from,to,etherType,0,(const void *)frameData,frameLen);
+					RR->pm->putFrame(tPtr,nwid,network->userPtr(),from,to,etherType,0,(const void *)frameData,frameLen, flowId);
 					break;
 			}
 		}

+ 17 - 2
node/Node.cpp

@@ -35,6 +35,7 @@
 #include "Network.hpp"
 #include "Trace.hpp"
 #include "Metrics.hpp"
+#include "PacketMultiplexer.hpp"
 
 // FIXME: remove this suppression and actually fix warnings
 #ifdef __GNUC__
@@ -119,9 +120,10 @@ Node::Node(void *uptr,void *tptr,const struct ZT_Node_Callbacks *callbacks,int64
 		const unsigned long mcs = sizeof(Multicaster) + (((sizeof(Multicaster) & 0xf) != 0) ? (16 - (sizeof(Multicaster) & 0xf)) : 0);
 		const unsigned long topologys = sizeof(Topology) + (((sizeof(Topology) & 0xf) != 0) ? (16 - (sizeof(Topology) & 0xf)) : 0);
 		const unsigned long sas = sizeof(SelfAwareness) + (((sizeof(SelfAwareness) & 0xf) != 0) ? (16 - (sizeof(SelfAwareness) & 0xf)) : 0);
-		const unsigned long bc = sizeof(Bond) + (((sizeof(Bond) & 0xf) != 0) ? (16 - (sizeof(Bond) & 0xf)) : 0);
+		const unsigned long bcs = sizeof(Bond) + (((sizeof(Bond) & 0xf) != 0) ? (16 - (sizeof(Bond) & 0xf)) : 0);
+		const unsigned long pms = sizeof(PacketMultiplexer) + (((sizeof(PacketMultiplexer) & 0xf) != 0) ? (16 - (sizeof(PacketMultiplexer) & 0xf)) : 0);
 
-		m = reinterpret_cast<char *>(::malloc(16 + ts + sws + mcs + topologys + sas + bc));
+		m = reinterpret_cast<char *>(::malloc(16 + ts + sws + mcs + topologys + sas + bcs + pms));
 		if (!m) {
 			throw std::bad_alloc();
 		}
@@ -141,6 +143,8 @@ Node::Node(void *uptr,void *tptr,const struct ZT_Node_Callbacks *callbacks,int64
 		RR->sa = new (m) SelfAwareness(RR);
 		m += sas;
 		RR->bc = new (m) Bond(RR);
+		m += bcs;
+		RR->pm = new (m) PacketMultiplexer(RR);
 	} catch ( ... ) {
 		if (RR->sa) {
 			RR->sa->~SelfAwareness();
@@ -160,6 +164,9 @@ Node::Node(void *uptr,void *tptr,const struct ZT_Node_Callbacks *callbacks,int64
 		if (RR->bc) {
 			RR->bc->~Bond();
 		}
+		if (RR->pm) {
+			RR->pm->~PacketMultiplexer();
+		}
 		::free(m);
 		throw;
 	}
@@ -191,6 +198,9 @@ Node::~Node()
 	if (RR->bc) {
 		RR->bc->~Bond();
 	}
+	if (RR->pm) {
+		RR->pm->~PacketMultiplexer();
+	}
 	::free(RR->rtmem);
 }
 
@@ -230,6 +240,11 @@ ZT_ResultCode Node::processVirtualNetworkFrame(
 	}
 }
 
+void Node::initMultithreading(unsigned int concurrency, bool cpuPinningEnabled)
+{
+	RR->pm->setUpPostDecodeReceiveThreads(concurrency, cpuPinningEnabled);
+}
+
 // Closure used to ping upstream and active/online peers
 class _PingPeersThatNeedPing
 {

+ 4 - 1
node/Node.hpp

@@ -283,7 +283,10 @@ public:
 		return _lowBandwidthMode;
 	}
 
-private:
+	void initMultithreading(unsigned int concurrency, bool cpuPinningEnabled);
+
+
+public:
 	RuntimeEnvironment _RR;
 	RuntimeEnvironment *RR;
 	void *_uPtr; // _uptr (lower case) is reserved in Visual Studio :P

+ 122 - 0
node/PacketMultiplexer.cpp

@@ -0,0 +1,122 @@
+/*
+ * Copyright (c)2013-2021 ZeroTier, Inc.
+ *
+ * Use of this software is governed by the Business Source License included
+ * in the LICENSE.TXT file in the project's root directory.
+ *
+ * Change Date: 2026-01-01
+ *
+ * On the date above, in accordance with the Business Source License, use
+ * of this software will be governed by version 2.0 of the Apache License.
+ */
+/****/
+
+#include "PacketMultiplexer.hpp"
+
+#include "Node.hpp"
+#include "RuntimeEnvironment.hpp"
+#include "Constants.hpp"
+
+#include <stdio.h>
+#include <stdlib.h>
+
+namespace ZeroTier {
+
+PacketMultiplexer::PacketMultiplexer(const RuntimeEnvironment* renv)
+{
+	RR = renv;
+};
+
+void PacketMultiplexer::putFrame(void* tPtr, uint64_t nwid, void** nuptr, const MAC& source, const MAC& dest, unsigned int etherType, unsigned int vlanId, const void* data, unsigned int len, unsigned int flowId)
+{
+#if defined(__APPLE__) || defined(__OpenBSD__) || defined(__NetBSD__) || defined(__WINDOWS__)
+	RR->node->putFrame(tPtr,nwid,nuptr,source,dest,etherType,vlanId,(const void *)data,len);
+	return;
+#endif
+
+	if (!_enabled) {
+		RR->node->putFrame(tPtr,nwid,nuptr,source,dest,etherType,vlanId,(const void *)data,len);
+		return;
+	}
+
+	PacketRecord* packet;
+	_rxPacketVector_m.lock();
+	if (_rxPacketVector.empty()) {
+		packet = new PacketRecord;
+	}
+	else {
+		packet = _rxPacketVector.back();
+		_rxPacketVector.pop_back();
+	}
+	_rxPacketVector_m.unlock();
+
+	packet->tPtr = tPtr;
+	packet->nwid = nwid;
+	packet->nuptr = nuptr;
+	packet->source = source.toInt();
+	packet->dest = dest.toInt();
+	packet->etherType = etherType;
+	packet->vlanId = vlanId;
+	packet->len = len;
+	packet->flowId = flowId;
+	memcpy(packet->data, data, len);
+
+	int bucket = flowId % _concurrency;
+	_rxPacketQueues[bucket]->postLimit(packet, 2048);
+}
+
+void PacketMultiplexer::setUpPostDecodeReceiveThreads(unsigned int concurrency, bool cpuPinningEnabled)
+{
+#if defined(__APPLE__) || defined(__OpenBSD__) || defined(__NetBSD__) || defined(__WINDOWS__)
+	return;
+#endif
+	_enabled = true;
+	_concurrency = concurrency;
+	bool _enablePinning = cpuPinningEnabled;
+
+	for (unsigned int i = 0; i < _concurrency; ++i) {
+		fprintf(stderr, "Reserved queue for thread %d\n", i);
+		_rxPacketQueues.push_back(new BlockingQueue<PacketRecord*>());
+	}
+
+	// Each thread picks from its own queue to feed into the core
+	for (unsigned int i = 0; i < _concurrency; ++i) {
+		_rxThreads.push_back(std::thread([this, i, _enablePinning]() {
+			fprintf(stderr, "Created post-decode packet ingestion thread %d\n", i);
+
+			PacketRecord* packet = nullptr;
+			for (;;) {
+				if (! _rxPacketQueues[i]->get(packet)) {
+					break;
+				}
+				if (! packet) {
+					break;
+				}
+
+				// fprintf(stderr, "popped packet from queue %d\n", i);
+
+				MAC sourceMac = MAC(packet->source);
+				MAC destMac = MAC(packet->dest);
+
+				RR->node->putFrame(packet->tPtr, packet->nwid, packet->nuptr, sourceMac, destMac, packet->etherType, 0, (const void*)packet->data, packet->len);
+				{
+					Mutex::Lock l(_rxPacketVector_m);
+					_rxPacketVector.push_back(packet);
+				}
+				/*
+				if (ZT_ResultCode_isFatal(err)) {
+					char tmp[256];
+					OSUtils::ztsnprintf(tmp, sizeof(tmp), "error processing packet: %d", (int)err);
+					Mutex::Lock _l(_termReason_m);
+					_termReason = ONE_UNRECOVERABLE_ERROR;
+					_fatalErrorMessage = tmp;
+					this->terminate();
+					break;
+				}
+				*/
+			}
+		}));
+	}
+}
+
+}	// namespace ZeroTier

+ 65 - 0
node/PacketMultiplexer.hpp

@@ -0,0 +1,65 @@
+/*
+ * Copyright (c)2013-2021 ZeroTier, Inc.
+ *
+ * Use of this software is governed by the Business Source License included
+ * in the LICENSE.TXT file in the project's root directory.
+ *
+ * Change Date: 2026-01-01
+ *
+ * On the date above, in accordance with the Business Source License, use
+ * of this software will be governed by version 2.0 of the Apache License.
+ */
+/****/
+
+#ifndef ZT_PACKET_MULTIPLEXER_HPP
+#define ZT_PACKET_MULTIPLEXER_HPP
+
+#include "../osdep/BlockingQueue.hpp"
+#include "MAC.hpp"
+#include "Mutex.hpp"
+#include "RuntimeEnvironment.hpp"
+
+#include <thread>
+#include <vector>
+
+namespace ZeroTier {
+
+struct PacketRecord {
+	void* tPtr;
+	uint64_t nwid;
+	void** nuptr;
+	uint64_t source;
+	uint64_t dest;
+	unsigned int etherType;
+	unsigned int vlanId;
+	uint8_t data[ZT_MAX_MTU];
+	unsigned int len;
+	unsigned int flowId;
+};
+
+class PacketMultiplexer {
+  public:
+	const RuntimeEnvironment* RR;
+
+	PacketMultiplexer(const RuntimeEnvironment* renv);
+
+	void setUpPostDecodeReceiveThreads(unsigned int concurrency, bool cpuPinningEnabled);
+
+	void putFrame(void* tPtr, uint64_t nwid, void** nuptr, const MAC& source, const MAC& dest, unsigned int etherType, unsigned int vlanId, const void* data, unsigned int len, unsigned int flowId);
+
+	std::vector<BlockingQueue<PacketRecord*>*> _rxPacketQueues;
+
+	unsigned int _concurrency;
+	// pool
+	std::vector<PacketRecord*> _rxPacketVector;
+	std::vector<std::thread> _rxPacketThreads;
+	Mutex _rxPacketVector_m, _rxPacketThreads_m;
+
+	std::vector<std::thread> _rxThreads;
+	unsigned int _rxThreadCount;
+	bool _enabled;
+};
+
+}	// namespace ZeroTier
+
+#endif	 // ZT_PACKET_MULTIPLEXER_HPP

+ 2 - 0
node/RuntimeEnvironment.hpp

@@ -31,6 +31,7 @@ class NetworkController;
 class SelfAwareness;
 class Trace;
 class Bond;
+class PacketMultiplexer;
 
 /**
  * Holds global state for an instance of ZeroTier::Node
@@ -77,6 +78,7 @@ public:
 	Topology *topology;
 	SelfAwareness *sa;
 	Bond *bc;
+	PacketMultiplexer *pm;
 
 	// This node's identity and string representations thereof
 	Identity identity;

+ 1 - 1
node/Switch.cpp

@@ -519,7 +519,7 @@ void Switch::onLocalEthernet(void *tPtr,const SharedPtr<Network> &network,const
 						RR->node->putFrame(tPtr, network->id(), network->userPtr(), peerMac, from, ZT_ETHERTYPE_IPV6, 0, adv, 72);
 
 					}).detach();
-					
+
 					return; // NDP emulation done. We have forged a "fake" reply, so no need to send actual NDP query.
 				} // else no NDP emulation
 			} // else no NDP emulation

+ 2 - 1
objects.mk

@@ -29,7 +29,8 @@ CORE_OBJS=\
 	node/Topology.o \
 	node/Trace.o \
 	node/Utils.o \
-	node/Bond.o
+	node/Bond.o \
+	node/PacketMultiplexer.o
 
 ONE_OBJS=\
 	controller/EmbeddedNetworkController.o \

+ 69 - 38
osdep/BSDEthernetTap.cpp

@@ -39,7 +39,9 @@
 #include <net/if_dl.h>
 #include <net/if_media.h>
 #include <net/route.h>
+#include <pthread_np.h>
 
+#include <sched.h>
 #include <string>
 #include <map>
 #include <set>
@@ -53,6 +55,7 @@
 #include "BSDEthernetTap.hpp"
 
 #define ZT_BASE32_CHARS "0123456789abcdefghijklmnopqrstuv"
+#define ZT_TAP_BUF_SIZE (1024 * 16)
 
 // ff:ff:ff:ff:ff:ff with no ADI
 static const ZeroTier::MulticastGroup _blindWildcardMulticastGroup(ZeroTier::MAC(0xff),0);
@@ -61,6 +64,8 @@ namespace ZeroTier {
 
 BSDEthernetTap::BSDEthernetTap(
 	const char *homePath,
+	unsigned int concurrency,
+	bool pinning,
 	const MAC &mac,
 	unsigned int mtu,
 	unsigned int metric,
@@ -69,6 +74,8 @@ BSDEthernetTap::BSDEthernetTap(
 	void (*handler)(void *,void *,uint64_t,const MAC &,const MAC &,unsigned int,unsigned int,const void *,unsigned int),
 	void *arg) :
 	_handler(handler),
+	_concurrency(concurrency),
+	_pinning(pinning),
 	_arg(arg),
 	_nwid(nwid),
 	_mtu(mtu),
@@ -195,11 +202,9 @@ BSDEthernetTap::BSDEthernetTap(
 BSDEthernetTap::~BSDEthernetTap()
 {
 	::write(_shutdownSignalPipe[1],"\0",1); // causes thread to exit
-	Thread::join(_thread);
 	::close(_fd);
 	::close(_shutdownSignalPipe[0]);
 	::close(_shutdownSignalPipe[1]);
-
 	long cpid = (long)vfork();
 	if (cpid == 0) {
 #ifdef ZT_TRACE
@@ -211,6 +216,10 @@ BSDEthernetTap::~BSDEthernetTap()
 		int exitcode = -1;
 		::waitpid(cpid,&exitcode,0);
 	}
+	Thread::join(_thread);
+	for (std::thread &t : _rxThreads) {
+		t.join();
+	}
 }
 
 void BSDEthernetTap::setEnabled(bool en)
@@ -418,53 +427,75 @@ void BSDEthernetTap::setMtu(unsigned int mtu)
 void BSDEthernetTap::threadMain()
 	throw()
 {
-	fd_set readfds,nullfds;
-	MAC to,from;
-	int n,nfds,r;
-	char getBuf[ZT_MAX_MTU + 64];
-
 	// Wait for a moment after startup -- wait for Network to finish
 	// constructing itself.
 	Thread::sleep(500);
 
-	FD_ZERO(&readfds);
-	FD_ZERO(&nullfds);
-	nfds = (int)std::max(_shutdownSignalPipe[0],_fd) + 1;
+	for (unsigned int i = 0; i < _concurrency; ++i) {
+		_rxThreads.push_back(std::thread([this, i, _pinning] {
+
+			if (_pinning) {
+				int pinCore = i % _concurrency;
+				fprintf(stderr, "Pinning thread %d to core %d\n", i, pinCore);
+				pthread_t self = pthread_self();
+				cpu_set_t cpuset;
+				CPU_ZERO(&cpuset);
+				CPU_SET(pinCore, &cpuset);
+				//int rc = sched_setaffinity(self, sizeof(cpu_set_t), &cpuset);
+				int rc = pthread_setaffinity_np(self, sizeof(cpu_set_t), &cpuset);
+				if (rc != 0)
+				{
+					fprintf(stderr, "Failed to pin thread %d to core %d: %s\n", i, pinCore, strerror(errno));
+					exit(1);
+				}
+			}
+
+			uint8_t b[ZT_TAP_BUF_SIZE];
+			MAC to, from;
+			fd_set readfds, nullfds;
+			int n, nfds, r;
 
-	r = 0;
-	for(;;) {
-		FD_SET(_shutdownSignalPipe[0],&readfds);
-		FD_SET(_fd,&readfds);
-		select(nfds,&readfds,&nullfds,&nullfds,(struct timeval *)0);
+			FD_ZERO(&readfds);
+			FD_ZERO(&nullfds);
+			nfds = (int)std::max(_shutdownSignalPipe[0],_fd) + 1;
 
-		if (FD_ISSET(_shutdownSignalPipe[0],&readfds)) // writes to shutdown pipe terminate thread
-			break;
+			r = 0;
 
-		if (FD_ISSET(_fd,&readfds)) {
-			n = (int)::read(_fd,getBuf + r,sizeof(getBuf) - r);
-			if (n < 0) {
-				if ((errno != EINTR)&&(errno != ETIMEDOUT))
+			for(;;) {
+				FD_SET(_shutdownSignalPipe[0],&readfds);
+				FD_SET(_fd,&readfds);
+				select(nfds,&readfds,&nullfds,&nullfds,(struct timeval *)0);
+
+				if (FD_ISSET(_shutdownSignalPipe[0],&readfds)) // writes to shutdown pipe terminate thread
 					break;
-			} else {
-				// Some tap drivers like to send the ethernet frame and the
-				// payload in two chunks, so handle that by accumulating
-				// data until we have at least a frame.
-				r += n;
-				if (r > 14) {
-					if (r > ((int)_mtu + 14)) // sanity check for weird TAP behavior on some platforms
-						r = _mtu + 14;
-
-					if (_enabled) {
-						to.setTo(getBuf,6);
-						from.setTo(getBuf + 6,6);
-						unsigned int etherType = ntohs(((const uint16_t *)getBuf)[6]);
-						_handler(_arg,(void *)0,_nwid,from,to,etherType,0,(const void *)(getBuf + 14),r - 14);
-					}
 
-					r = 0;
+				if (FD_ISSET(_fd,&readfds)) {
+					n = (int)::read(_fd,b + r,sizeof(b) - r);
+					if (n < 0) {
+						if ((errno != EINTR)&&(errno != ETIMEDOUT))
+							break;
+					} else {
+						// Some tap drivers like to send the ethernet frame and the
+						// payload in two chunks, so handle that by accumulating
+						// data until we have at least a frame.
+						r += n;
+						if (r > 14) {
+							if (r > ((int)_mtu + 14)) // sanity check for weird TAP behavior on some platforms
+								r = _mtu + 14;
+
+							if (_enabled) {
+								to.setTo(b,6);
+								from.setTo(b + 6,6);
+								unsigned int etherType = ntohs(((const uint16_t *)b)[6]);
+								_handler(_arg,(void *)0,_nwid,from,to,etherType,0,(const void *)(b + 14),r - 14);
+							}
+
+							r = 0;
+						}
+					}
 				}
 			}
-		}
+		}));
 	}
 }
 

+ 6 - 0
osdep/BSDEthernetTap.hpp

@@ -20,6 +20,7 @@
 #include <string>
 #include <vector>
 #include <stdexcept>
+#include <thread>
 
 #include "../node/Constants.hpp"
 #include "../node/MulticastGroup.hpp"
@@ -34,6 +35,8 @@ class BSDEthernetTap : public EthernetTap
 public:
 	BSDEthernetTap(
 		const char *homePath,
+		unsigned int concurrency,
+		bool pinning,
 		const MAC &mac,
 		unsigned int mtu,
 		unsigned int metric,
@@ -62,6 +65,8 @@ public:
 private:
 	void (*_handler)(void *,void *,uint64_t,const MAC &,const MAC &,unsigned int,unsigned int,const void *,unsigned int);
 	void *_arg;
+	unsigned int _concurrency;
+	bool _pinning;
 	uint64_t _nwid;
 	Thread _thread;
 	std::string _dev;
@@ -73,6 +78,7 @@ private:
 	volatile bool _enabled;
 	mutable std::vector<InetAddress> _ifaddrs;
 	mutable uint64_t _lastIfAddrsUpdate;
+	std::vector<std::thread> _rxThreads;
 };
 
 } // namespace ZeroTier

+ 4 - 2
osdep/EthernetTap.cpp

@@ -57,6 +57,8 @@ namespace ZeroTier {
 
 std::shared_ptr<EthernetTap> EthernetTap::newInstance(
 	const char *tapDeviceType, // OS-specific, NULL for default
+	unsigned int concurrency,
+	bool pinning,
 	const char *homePath,
 	const MAC &mac,
 	unsigned int mtu,
@@ -92,7 +94,7 @@ std::shared_ptr<EthernetTap> EthernetTap::newInstance(
 #endif // __APPLE__
 
 #ifdef __LINUX__
-	return std::shared_ptr<EthernetTap>(new LinuxEthernetTap(homePath,mac,mtu,metric,nwid,friendlyName,handler,arg));
+	return std::shared_ptr<EthernetTap>(new LinuxEthernetTap(homePath,concurrency,pinning,mac,mtu,metric,nwid,friendlyName,handler,arg));
 #endif // __LINUX__
 
 #ifdef __WINDOWS__
@@ -130,7 +132,7 @@ std::shared_ptr<EthernetTap> EthernetTap::newInstance(
 #endif // __WINDOWS__
 
 #ifdef __FreeBSD__
-	return std::shared_ptr<EthernetTap>(new BSDEthernetTap(homePath,mac,mtu,metric,nwid,friendlyName,handler,arg));
+	return std::shared_ptr<EthernetTap>(new BSDEthernetTap(homePath,concurrency,pinning,mac,mtu,metric,nwid,friendlyName,handler,arg));
 #endif // __FreeBSD__
 
 #ifdef __NetBSD__

+ 2 - 0
osdep/EthernetTap.hpp

@@ -32,6 +32,8 @@ class EthernetTap
 public:
 	static std::shared_ptr<EthernetTap> newInstance(
 		const char *tapDeviceType, // OS-specific, NULL for default
+		unsigned int concurrency,
+		bool pinning,
 		const char *homePath,
 		const MAC &mac,
 		unsigned int mtu,

+ 123 - 101
osdep/LinuxEthernetTap.cpp

@@ -60,7 +60,7 @@
 #define IFNAMSIZ 16
 #endif
 
-#define ZT_TAP_BUF_SIZE 16384
+#define ZT_TAP_BUF_SIZE (1024 * 16)
 
 // ff:ff:ff:ff:ff:ff with no ADI
 static const ZeroTier::MulticastGroup _blindWildcardMulticastGroup(ZeroTier::MAC(0xff),0);
@@ -68,7 +68,7 @@ static const ZeroTier::MulticastGroup _blindWildcardMulticastGroup(ZeroTier::MAC
 namespace ZeroTier {
 
 // determine if we're running a really old linux kernel.
-// Kernels in the 2.6.x series don't behave the same when bringing up 
+// Kernels in the 2.6.x series don't behave the same when bringing up
 // the tap devices.
 //
 // Returns true if the kernel major version is < 3
@@ -111,6 +111,8 @@ static void _base32_5_to_8(const uint8_t *in,char *out)
 
 LinuxEthernetTap::LinuxEthernetTap(
 	const char *homePath,
+	unsigned int concurrency,
+	bool pinning,
 	const MAC &mac,
 	unsigned int mtu,
 	unsigned int metric,
@@ -220,135 +222,155 @@ LinuxEthernetTap::LinuxEthernetTap(
 
 	(void)::pipe(_shutdownSignalPipe);
 
-	_tapReaderThread = std::thread([this]{
-		uint8_t b[ZT_TAP_BUF_SIZE];
-		fd_set readfds,nullfds;
-		int n,nfds,r;
-		std::vector<void *> buffers;
-		struct ifreq ifr;
+	for (unsigned int i = 0; i < concurrency; ++i) {
+		_rxThreads.push_back(std::thread([this, i, concurrency, pinning] {
+
+			if (pinning) {
+				int pinCore = i % concurrency;
+				fprintf(stderr, "Pinning tap thread %d to core %d\n", i, pinCore);
+				pthread_t self = pthread_self();
+				cpu_set_t cpuset;
+				CPU_ZERO(&cpuset);
+				CPU_SET(pinCore, &cpuset);
+				int rc = pthread_setaffinity_np(self, sizeof(cpu_set_t), &cpuset);
+				if (rc != 0)
+				{
+					fprintf(stderr, "Failed to pin tap thread %d to core %d: %s\n", i, pinCore, strerror(errno));
+					exit(1);
+				}
+			}
+
+			uint8_t b[ZT_TAP_BUF_SIZE];
+			fd_set readfds, nullfds;
+			int n, nfds, r;
+			if (i == 0) {
+				struct ifreq ifr;
+				memset(&ifr, 0, sizeof(ifr));
+				strcpy(ifr.ifr_name, _dev.c_str());
+
+				const int sock = socket(AF_INET, SOCK_DGRAM, 0);
+				if (sock <= 0)
+					return;
+
+				if (ioctl(sock, SIOCGIFFLAGS, (void*)&ifr) < 0) {
+					::close(sock);
+					printf("WARNING: ioctl() failed setting up Linux tap device (bring interface up)\n");
+					return;
+				}
 
-		memset(&ifr,0,sizeof(ifr));
-		strcpy(ifr.ifr_name,_dev.c_str());
+				ifr.ifr_ifru.ifru_hwaddr.sa_family = ARPHRD_ETHER;
+				_mac.copyTo(ifr.ifr_ifru.ifru_hwaddr.sa_data, 6);
+				if (ioctl(sock, SIOCSIFHWADDR, (void*)&ifr) < 0) {
+					::close(sock);
+					printf("WARNING: ioctl() failed setting up Linux tap device (set MAC)\n");
+					return;
+				}
 
-		const int sock = socket(AF_INET,SOCK_DGRAM,0);
-		if (sock <= 0)
-			return;
+				usleep(100000);
 
-		if (ioctl(sock,SIOCGIFFLAGS,(void *)&ifr) < 0) {
-			::close(sock);
-			printf("WARNING: ioctl() failed setting up Linux tap device (bring interface up)\n");
-			return;
-		}
+				if (isOldLinuxKernel()) {
+					ifr.ifr_ifru.ifru_mtu = (int)_mtu;
+					if (ioctl(sock, SIOCSIFMTU, (void*)&ifr) < 0) {
+						::close(sock);
+						printf("WARNING: ioctl() failed setting up Linux tap device (set MTU)\n");
+						return;
+					}
 
-		ifr.ifr_ifru.ifru_hwaddr.sa_family = ARPHRD_ETHER;
-		_mac.copyTo(ifr.ifr_ifru.ifru_hwaddr.sa_data,6);
-		if (ioctl(sock,SIOCSIFHWADDR,(void *)&ifr) < 0) {
-			::close(sock);
-			printf("WARNING: ioctl() failed setting up Linux tap device (set MAC)\n");
-			return;
-		}
+					usleep(100000);
+				}
 
-		usleep(100000);
+				ifr.ifr_flags |= IFF_MULTICAST;
+				ifr.ifr_flags |= IFF_UP;
+				if (ioctl(sock, SIOCSIFFLAGS, (void*)&ifr) < 0) {
+					::close(sock);
+					printf("WARNING: ioctl() failed setting up Linux tap device (bring interface up)\n");
+					return;
+				}
 
-		if (isOldLinuxKernel()) {
-			ifr.ifr_ifru.ifru_mtu = (int)_mtu;
-			if (ioctl(sock,SIOCSIFMTU,(void *)&ifr) < 0) {
-				::close(sock);
-				printf("WARNING: ioctl() failed setting up Linux tap device (set MTU)\n");
-				return;
-			}
+				usleep(100000);
 
-			usleep(100000);
-		}
-	
-
-		ifr.ifr_flags |= IFF_MULTICAST;
-		ifr.ifr_flags |= IFF_UP;
-		if (ioctl(sock,SIOCSIFFLAGS,(void *)&ifr) < 0) {
-			::close(sock);
-			printf("WARNING: ioctl() failed setting up Linux tap device (bring interface up)\n");
-			return;
-		}
+				if (! isOldLinuxKernel()) {
+					ifr.ifr_ifru.ifru_hwaddr.sa_family = ARPHRD_ETHER;
+					_mac.copyTo(ifr.ifr_ifru.ifru_hwaddr.sa_data, 6);
+					if (ioctl(sock, SIOCSIFHWADDR, (void*)&ifr) < 0) {
+						::close(sock);
+						printf("WARNING: ioctl() failed setting up Linux tap device (set MAC)\n");
+						return;
+					}
+
+					ifr.ifr_ifru.ifru_mtu = (int)_mtu;
+					if (ioctl(sock, SIOCSIFMTU, (void*)&ifr) < 0) {
+						::close(sock);
+						printf("WARNING: ioctl() failed setting up Linux tap device (set MTU)\n");
+						return;
+					}
+				}
 
-		usleep(100000);
+				fcntl(_fd, F_SETFL, O_NONBLOCK);
 
-		if (!isOldLinuxKernel()) {
-			ifr.ifr_ifru.ifru_hwaddr.sa_family = ARPHRD_ETHER;
-			_mac.copyTo(ifr.ifr_ifru.ifru_hwaddr.sa_data,6);
-			if (ioctl(sock,SIOCSIFHWADDR,(void *)&ifr) < 0) {
 				::close(sock);
-				printf("WARNING: ioctl() failed setting up Linux tap device (set MAC)\n");
-				return;
 			}
 
-			ifr.ifr_ifru.ifru_mtu = (int)_mtu;
-			if (ioctl(sock,SIOCSIFMTU,(void *)&ifr) < 0) {
-				::close(sock);
-				printf("WARNING: ioctl() failed setting up Linux tap device (set MTU)\n");
+			if (! _run) {
 				return;
 			}
-		}
 
-		fcntl(_fd,F_SETFL,O_NONBLOCK);
-
-		::close(sock);
-
-		if (!_run)
-			return;
-
-		FD_ZERO(&readfds);
-		FD_ZERO(&nullfds);
-		nfds = (int)std::max(_shutdownSignalPipe[0],_fd) + 1;
-
-		r = 0;
-		for(;;) {
-			FD_SET(_shutdownSignalPipe[0],&readfds);
-			FD_SET(_fd,&readfds);
-			select(nfds,&readfds,&nullfds,&nullfds,(struct timeval *)0);
-
-			if (FD_ISSET(_shutdownSignalPipe[0],&readfds))
-				break;
-
-			if (FD_ISSET(_fd,&readfds)) {
-				for(;;) { // read until there are no more packets, then return to outer select() loop
-					n = (int)::read(_fd,b + r,ZT_TAP_BUF_SIZE - r);
-					if (n > 0) {
-						// Some tap drivers like to send the ethernet frame and the
-						// payload in two chunks, so handle that by accumulating
-						// data until we have at least a frame.
-						r += n;
-						if (r > 14) {
-							if (r > ((int)_mtu + 14)) // sanity check for weird TAP behavior on some platforms
-								r = _mtu + 14;
-
-							if (_enabled) {
-								//_tapq.post(std::pair<void *,int>(buf,r));
-								//buf = nullptr;
-								MAC to(b, 6),from(b + 6, 6);
-								unsigned int etherType = Utils::ntoh(((const uint16_t *)b)[6]);
-								_handler(_arg, nullptr, _nwid, from, to, etherType, 0, (const void *)(b + 14),(unsigned int)(r - 14));
-							}
+			FD_ZERO(&readfds);
+			FD_ZERO(&nullfds);
+			nfds = (int)std::max(_shutdownSignalPipe[0], _fd) + 1;
+
+			r = 0;
+			for (;;) {
+				FD_SET(_shutdownSignalPipe[0], &readfds);
+				FD_SET(_fd, &readfds);
+				select(nfds, &readfds, &nullfds, &nullfds, (struct timeval*)0);
 
+				if (FD_ISSET(_shutdownSignalPipe[0], &readfds)) {
+					break;
+				}
+				if (FD_ISSET(_fd, &readfds)) {
+					for (;;) {
+						// read until there are no more packets, then return to outer select() loop
+						n = (int)::read(_fd, b + r, ZT_TAP_BUF_SIZE - r);
+						if (n > 0) {
+							// Some tap drivers like to send the ethernet frame and the
+							// payload in two chunks, so handle that by accumulating
+							// data until we have at least a frame.
+							r += n;
+							if (r > 14) {
+								if (r > ((int)_mtu + 14))	// sanity check for weird TAP behavior on some platforms
+									r = _mtu + 14;
+
+								if (_enabled) {
+									MAC to(b, 6), from(b + 6, 6);
+									unsigned int etherType = Utils::ntoh(((const uint16_t*)b)[6]);
+									_handler(_arg, nullptr, _nwid, from, to, etherType, 0, (const void*)(b + 14), (unsigned int)(r - 14));
+								}
+
+								r = 0;
+							}
+						}
+						else {
 							r = 0;
+							break;
 						}
-					} else {
-						r = 0;
-						break;
 					}
 				}
 			}
-		}
-	});
+		}));
+	}
 }
 
 LinuxEthernetTap::~LinuxEthernetTap()
 {
 	_run = false;
 	(void)::write(_shutdownSignalPipe[1],"\0",1);
-	_tapReaderThread.join();
 	::close(_fd);
 	::close(_shutdownSignalPipe[0]);
 	::close(_shutdownSignalPipe[1]);
+	for (std::thread &t : _rxThreads) {
+		t.join();
+	}
 }
 
 void LinuxEthernetTap::setEnabled(bool en)

+ 4 - 4
osdep/LinuxEthernetTap.hpp

@@ -26,6 +26,7 @@
 #include <mutex>
 #include "../node/MulticastGroup.hpp"
 #include "EthernetTap.hpp"
+#include "BlockingQueue.hpp"
 
 namespace ZeroTier {
 
@@ -34,6 +35,8 @@ class LinuxEthernetTap : public EthernetTap
 public:
 	LinuxEthernetTap(
 		const char *homePath,
+		unsigned int concurrency,
+		bool pinning,
 		const MAC &mac,
 		unsigned int mtu,
 		unsigned int metric,
@@ -57,9 +60,6 @@ public:
 	virtual void setMtu(unsigned int mtu);
 	virtual void setDns(const char *domain, const std::vector<InetAddress> &servers) {}
 
-
-
-
 private:
 	void (*_handler)(void *,void *,uint64_t,const MAC &,const MAC &,unsigned int,unsigned int,const void *,unsigned int);
 	void *_arg;
@@ -73,9 +73,9 @@ private:
 	int _shutdownSignalPipe[2];
 	std::atomic_bool _enabled;
 	std::atomic_bool _run;
-	std::thread _tapReaderThread;
 	mutable std::vector<InetAddress> _ifaddrs;
 	mutable uint64_t _lastIfAddrsUpdate;
+	std::vector<std::thread> _rxThreads;
 };
 
 } // namespace ZeroTier

+ 1 - 1
osdep/MacEthernetTapAgent.c

@@ -32,7 +32,7 @@
  * All this stuff is basically undocumented. A lot of tracing through
  * the Darwin/XNU kernel source was required to figure out how to make
  * this actually work.
- * 
+ *
  * We hope to develop a DriverKit-based driver in the near-mid future to
  * replace this weird hack, but it works for now through Big Sur in our
  * testing.

+ 3 - 1
osdep/MacKextEthernetTap.cpp

@@ -447,7 +447,9 @@ MacKextEthernetTap::~MacKextEthernetTap()
 
 	::write(_shutdownSignalPipe[1],"\0",1); // causes thread to exit
 	Thread::join(_thread);
-
+		for (std::thread &t : _rxThreads) {
+		t.join();
+	}
 	::close(_fd);
 	::close(_shutdownSignalPipe[0]);
 	::close(_shutdownSignalPipe[1]);

+ 2 - 0
osdep/MacKextEthernetTap.hpp

@@ -20,6 +20,7 @@
 #include <stdexcept>
 #include <string>
 #include <vector>
+#include <thread>
 
 #include "../node/Constants.hpp"
 #include "../node/MAC.hpp"
@@ -75,6 +76,7 @@ private:
 	int _fd;
 	int _shutdownSignalPipe[2];
 	volatile bool _enabled;
+	std::vector<std::thread> _rxThreads;
 };
 
 } // namespace ZeroTier

+ 82 - 20
service/OneService.cpp

@@ -16,7 +16,6 @@
 #include <stdlib.h>
 #include <string.h>
 #include <stdint.h>
-
 #include <string>
 #include <map>
 #include <vector>
@@ -26,6 +25,11 @@
 #include <mutex>
 #include <condition_variable>
 
+#ifdef __FreeBSD__
+#include <sched.h>
+#include <pthread_np.h>
+#endif
+
 #include "../version.h"
 #include "../include/ZeroTierOne.h"
 
@@ -42,6 +46,7 @@
 #include "../node/SHA512.hpp"
 #include "../node/Bond.hpp"
 #include "../node/Peer.hpp"
+#include "../node/PacketMultiplexer.hpp"
 
 #include "../osdep/Phy.hpp"
 #include "../osdep/OSUtils.hpp"
@@ -759,7 +764,7 @@ struct TcpConnection
 	Mutex writeq_m;
 };
 
-struct OneServiceIncomingPacket
+struct PacketRecord
 {
 	uint64_t now;
 	int64_t sock;
@@ -786,14 +791,22 @@ public:
 	SoftwareUpdater *_updater;
 	bool _updateAutoApply;
 
-    httplib::Server _controlPlane;
+	httplib::Server _controlPlane;
 	httplib::Server _controlPlaneV6;
-    std::thread _serverThread;
+	std::thread _serverThread;
 	std::thread _serverThreadV6;
 	bool _serverThreadRunning;
 	bool _serverThreadRunningV6;
 
-    bool _allowTcpFallbackRelay;
+	BlockingQueue<PacketRecord *> _rxPacketQueue;
+	std::vector<PacketRecord *> _rxPacketVector;
+	std::vector<std::thread> _rxPacketThreads;
+	Mutex _rxPacketVector_m,_rxPacketThreads_m;
+	bool _multicoreEnabled;
+	bool _cpuPinningEnabled;
+	unsigned int _concurrency;
+
+	bool _allowTcpFallbackRelay;
 	bool _forceTcpRelay;
 	bool _allowSecondaryPort;
 	bool _enableWebServer;
@@ -844,8 +857,6 @@ public:
 	// Deadline for the next background task service function
 	volatile int64_t _nextBackgroundTaskDeadline;
 
-
-
 	std::map<uint64_t,NetworkState> _nets;
 	Mutex _nets_m;
 
@@ -892,9 +903,9 @@ public:
 		,_node((Node *)0)
 		,_updater((SoftwareUpdater *)0)
 		,_updateAutoApply(false)
-        ,_controlPlane()
+		,_controlPlane()
 		,_controlPlaneV6()
-        ,_serverThread()
+		,_serverThread()
 		,_serverThreadV6()
 		,_serverThreadRunning(false)
 		,_serverThreadRunningV6(false)
@@ -928,9 +939,9 @@ public:
 		_ports[1] = 0;
 		_ports[2] = 0;
 
-        prometheus::simpleapi::saver.set_registry(prometheus::simpleapi::registry_ptr);
-        prometheus::simpleapi::saver.set_delay(std::chrono::seconds(5));
-        prometheus::simpleapi::saver.set_out_file(_homePath + ZT_PATH_SEPARATOR + "metrics.prom");
+		prometheus::simpleapi::saver.set_registry(prometheus::simpleapi::registry_ptr);
+		prometheus::simpleapi::saver.set_delay(std::chrono::seconds(5));
+		prometheus::simpleapi::saver.set_out_file(_homePath + ZT_PATH_SEPARATOR + "metrics.prom");
 
 #if ZT_VAULT_SUPPORT
 		curl_global_init(CURL_GLOBAL_DEFAULT);
@@ -942,20 +953,34 @@ public:
 #ifdef __WINDOWS__
 		WinFWHelper::removeICMPRules();
 #endif
+
+		_rxPacketQueue.stop();
+		_rxPacketThreads_m.lock();
+		for(auto t=_rxPacketThreads.begin();t!=_rxPacketThreads.end();++t) {
+			t->join();
+		}
+		_rxPacketThreads_m.unlock();
 		_binder.closeAll(_phy);
 
 #if ZT_VAULT_SUPPORT
 		curl_global_cleanup();
 #endif
 
-        _controlPlane.stop();
+		_controlPlane.stop();
 		if (_serverThreadRunning) {
-	        _serverThread.join();
+			_serverThread.join();
 		}
 		_controlPlaneV6.stop();
 		if (_serverThreadRunningV6) {
 			_serverThreadV6.join();
 		}
+		_rxPacketVector_m.lock();
+		while (!_rxPacketVector.empty()) {
+			delete _rxPacketVector.back();
+			_rxPacketVector.pop_back();
+		}
+		_rxPacketVector_m.unlock();
+
 
 #ifdef ZT_USE_MINIUPNPC
 		delete _portMapper;
@@ -964,6 +989,15 @@ public:
 		delete _rc;
 	}
 
+	void setUpMultithreading()
+	{
+#if defined(__APPLE__) || defined(__OpenBSD__) || defined(__NetBSD__) || defined(__WINDOWS__)
+		return;
+#endif
+		_node->initMultithreading(_concurrency, _cpuPinningEnabled);
+		bool pinning = _cpuPinningEnabled;
+	}
+
 	virtual ReasonForTermination run()
 	{
 		try {
@@ -1272,6 +1306,9 @@ public:
 				const unsigned long delay = (dl > now) ? (unsigned long)(dl - now) : 500;
 				clockShouldBe = now + (int64_t)delay;
 				_phy.poll(delay);
+
+
+
 			}
 		} catch (std::exception &e) {
 			Mutex::Lock _l(_termReason_m);
@@ -2562,7 +2599,25 @@ public:
 			fprintf(stderr,"WARNING: using manually-specified secondary and/or tertiary ports. This can cause NAT issues." ZT_EOL_S);
 		}
 		_portMappingEnabled = OSUtils::jsonBool(settings["portMappingEnabled"],true);
-		_node->setLowBandwidthMode(OSUtils::jsonBool(settings["lowBandwidthMode"],false));
+#if defined(__LINUX__) || defined(__FreeBSD__)
+		_multicoreEnabled = OSUtils::jsonBool(settings["multicoreEnabled"],false);
+		_concurrency = OSUtils::jsonInt(settings["concurrency"],1);
+		_cpuPinningEnabled = OSUtils::jsonBool(settings["cpuPinningEnabled"],false);
+		if (_multicoreEnabled) {
+			unsigned int maxConcurrency = std::thread::hardware_concurrency();
+			if (_concurrency <= 1 || _concurrency >= maxConcurrency) {
+				unsigned int conservativeDefault = (std::thread::hardware_concurrency() >= 4 ? 2 : 1);
+				fprintf(stderr, "Concurrency level provided (%d) is invalid, assigning conservative default value of (%d)\n", _concurrency, conservativeDefault);
+				_concurrency =  conservativeDefault;
+			}
+			setUpMultithreading();
+		}
+		else {
+			// Force values in case the user accidentally defined them with multicore disabled
+			_concurrency = 1;
+			_cpuPinningEnabled = false;
+		}
+#endif
 
 #ifndef ZT_SDK
 		const std::string up(OSUtils::jsonString(settings["softwareUpdate"],ZT_SOFTWARE_UPDATE_DEFAULT));
@@ -2877,16 +2932,19 @@ public:
 	// Handlers for Node and Phy<> callbacks
 	// =========================================================================
 
-	inline void phyOnDatagram(PhySocket *sock,void **uptr,const struct sockaddr *localAddr,const struct sockaddr *from,void *data,unsigned long len)
+
+
+
+	inline void phyOnDatagram(PhySocket* sock, void** uptr, const struct sockaddr* localAddr, const struct sockaddr* from, void* data, unsigned long len)
 	{
 		if (_forceTcpRelay) {
 			return;
 		}
-        Metrics::udp_recv += len;
+		Metrics::udp_recv += len;
 		const uint64_t now = OSUtils::now();
-		if ((len >= 16)&&(reinterpret_cast<const InetAddress *>(from)->ipScope() == InetAddress::IP_SCOPE_GLOBAL)) {
+		if ((len >= 16) && (reinterpret_cast<const InetAddress*>(from)->ipScope() == InetAddress::IP_SCOPE_GLOBAL)) {
 			_lastDirectReceiveFromGlobal = now;
-        }
+		}
 		const ZT_ResultCode rc = _node->processWirePacket(nullptr,now,reinterpret_cast<int64_t>(sock),reinterpret_cast<const struct sockaddr_storage *>(from),data,len,&_nextBackgroundTaskDeadline);
 		if (ZT_ResultCode_isFatal(rc)) {
 			char tmp[256];
@@ -2898,6 +2956,7 @@ public:
 		}
 	}
 
+
 	inline void phyOnTcpConnect(PhySocket *sock,void **uptr,bool success)
 	{
 		if (!success) {
@@ -3116,6 +3175,8 @@ public:
 
 						n.setTap(EthernetTap::newInstance(
 							nullptr,
+							_concurrency,
+							_cpuPinningEnabled,
 							_homePath.c_str(),
 							MAC(nwc->mac),
 							nwc->mtu,
@@ -3630,8 +3691,9 @@ public:
 	inline void nodeVirtualNetworkFrameFunction(uint64_t nwid,void **nuptr,uint64_t sourceMac,uint64_t destMac,unsigned int etherType,unsigned int vlanId,const void *data,unsigned int len)
 	{
 		NetworkState *n = reinterpret_cast<NetworkState *>(*nuptr);
-		if ((!n)||(!n->tap()))
+		if ((!n)||(!n->tap())) {
 			return;
+		}
 		n->tap()->put(MAC(sourceMac),MAC(destMac),etherType,data,len);
 	}
 

+ 1 - 0
windows/ZeroTierOne/ZeroTierOne.vcxproj

@@ -88,6 +88,7 @@
     <ClCompile Include="..\..\node\Node.cpp" />
     <ClCompile Include="..\..\node\OutboundMulticast.cpp" />
     <ClCompile Include="..\..\node\Packet.cpp" />
+    <ClCompile Include="..\..\node\PacketMultiplexer.cpp" />
     <ClCompile Include="..\..\node\Path.cpp" />
     <ClCompile Include="..\..\node\Peer.cpp" />
     <ClCompile Include="..\..\node\Poly1305.cpp">