Browse Source

Add packet multiplexer

Joseph Henry 11 months ago
parent
commit
36adae3d82

+ 9 - 4
node/IncomingPacket.cpp

@@ -38,6 +38,7 @@
 #include "Path.hpp"
 #include "Bond.hpp"
 #include "Metrics.hpp"
+#include "PacketMultiplexer.hpp"
 
 namespace ZeroTier {
 
@@ -793,7 +794,7 @@ bool IncomingPacket::_doFRAME(const RuntimeEnvironment *RR,void *tPtr,const Shar
 {
 	Metrics::pkt_frame_in++;
 	int32_t _flowId = ZT_QOS_NO_FLOW;
-	if (peer->flowHashingSupported()) {
+	//if (peer->flowHashingSupported()) {
 		if (size() > ZT_PROTO_VERB_EXT_FRAME_IDX_PAYLOAD) {
 			const unsigned int etherType = at<uint16_t>(ZT_PROTO_VERB_FRAME_IDX_ETHERTYPE);
 			const unsigned int frameLen = size() - ZT_PROTO_VERB_FRAME_IDX_PAYLOAD;
@@ -855,7 +856,9 @@ bool IncomingPacket::_doFRAME(const RuntimeEnvironment *RR,void *tPtr,const Shar
 				}
 			}
 		}
-	}
+	//}
+
+	//fprintf(stderr, "IncomingPacket::_doFRAME: flowId=%d\n", _flowId);
 
 	const uint64_t nwid = at<uint64_t>(ZT_PROTO_VERB_FRAME_IDX_NETWORK_ID);
 	const SharedPtr<Network> network(RR->node->network(nwid));
@@ -869,7 +872,8 @@ bool IncomingPacket::_doFRAME(const RuntimeEnvironment *RR,void *tPtr,const Shar
 				const unsigned int frameLen = size() - ZT_PROTO_VERB_FRAME_IDX_PAYLOAD;
 				const uint8_t *const frameData = reinterpret_cast<const uint8_t *>(data()) + ZT_PROTO_VERB_FRAME_IDX_PAYLOAD;
 				if (network->filterIncomingPacket(tPtr,peer,RR->identity.address(),sourceMac,network->mac(),frameData,frameLen,etherType,0) > 0) {
-					RR->node->putFrame(tPtr,nwid,network->userPtr(),sourceMac,network->mac(),etherType,0,(const void *)frameData,frameLen);
+					//RR->node->putFrame(tPtr,nwid,network->userPtr(),sourceMac,network->mac(),etherType,0,(const void *)frameData,frameLen);
+					RR->pm->putFrame(tPtr,nwid,network->userPtr(),sourceMac,network->mac(),etherType,0,(const void *)frameData,frameLen, _flowId);
 				}
 			}
 		} else {
@@ -942,7 +946,8 @@ bool IncomingPacket::_doEXT_FRAME(const RuntimeEnvironment *RR,void *tPtr,const
 					}
 					// fall through -- 2 means accept regardless of bridging checks or other restrictions
 				case 2:
-					RR->node->putFrame(tPtr,nwid,network->userPtr(),from,to,etherType,0,(const void *)frameData,frameLen);
+					//RR->node->putFrame(tPtr,nwid,network->userPtr(),from,to,etherType,0,(const void *)frameData,frameLen);
+					RR->pm->putFrame(tPtr,nwid,network->userPtr(),from,to,etherType,0,(const void *)frameData,frameLen, flowId);
 					break;
 			}
 		}

+ 12 - 2
node/Node.cpp

@@ -35,6 +35,7 @@
 #include "Network.hpp"
 #include "Trace.hpp"
 #include "Metrics.hpp"
+#include "PacketMultiplexer.hpp"
 
 // FIXME: remove this suppression and actually fix warnings
 #ifdef __GNUC__
@@ -119,9 +120,10 @@ Node::Node(void *uptr,void *tptr,const struct ZT_Node_Callbacks *callbacks,int64
 		const unsigned long mcs = sizeof(Multicaster) + (((sizeof(Multicaster) & 0xf) != 0) ? (16 - (sizeof(Multicaster) & 0xf)) : 0);
 		const unsigned long topologys = sizeof(Topology) + (((sizeof(Topology) & 0xf) != 0) ? (16 - (sizeof(Topology) & 0xf)) : 0);
 		const unsigned long sas = sizeof(SelfAwareness) + (((sizeof(SelfAwareness) & 0xf) != 0) ? (16 - (sizeof(SelfAwareness) & 0xf)) : 0);
-		const unsigned long bc = sizeof(Bond) + (((sizeof(Bond) & 0xf) != 0) ? (16 - (sizeof(Bond) & 0xf)) : 0);
+		const unsigned long bcs = sizeof(Bond) + (((sizeof(Bond) & 0xf) != 0) ? (16 - (sizeof(Bond) & 0xf)) : 0);
+		const unsigned long pms = sizeof(PacketMultiplexer) + (((sizeof(PacketMultiplexer) & 0xf) != 0) ? (16 - (sizeof(PacketMultiplexer) & 0xf)) : 0);
 
-		m = reinterpret_cast<char *>(::malloc(16 + ts + sws + mcs + topologys + sas + bc));
+		m = reinterpret_cast<char *>(::malloc(16 + ts + sws + mcs + topologys + sas + bcs + pms));
 		if (!m) {
 			throw std::bad_alloc();
 		}
@@ -141,6 +143,8 @@ Node::Node(void *uptr,void *tptr,const struct ZT_Node_Callbacks *callbacks,int64
 		RR->sa = new (m) SelfAwareness(RR);
 		m += sas;
 		RR->bc = new (m) Bond(RR);
+		m += bcs;
+		RR->pm = new (m) PacketMultiplexer(RR);
 	} catch ( ... ) {
 		if (RR->sa) {
 			RR->sa->~SelfAwareness();
@@ -160,6 +164,9 @@ Node::Node(void *uptr,void *tptr,const struct ZT_Node_Callbacks *callbacks,int64
 		if (RR->bc) {
 			RR->bc->~Bond();
 		}
+		if (RR->pm) {
+			RR->pm->~PacketMultiplexer();
+		}
 		::free(m);
 		throw;
 	}
@@ -191,6 +198,9 @@ Node::~Node()
 	if (RR->bc) {
 		RR->bc->~Bond();
 	}
+	if (RR->pm) {
+		RR->pm->~PacketMultiplexer();
+	}
 	::free(RR->rtmem);
 }
 

+ 1 - 1
node/Node.hpp

@@ -283,7 +283,7 @@ public:
 		return _lowBandwidthMode;
 	}
 
-private:
+public:
 	RuntimeEnvironment _RR;
 	RuntimeEnvironment *RR;
 	void *_uPtr; // _uptr (lower case) is reserved in Visual Studio :P

+ 125 - 0
node/PacketMultiplexer.cpp

@@ -0,0 +1,125 @@
+/*
+ * Copyright (c)2013-2021 ZeroTier, Inc.
+ *
+ * Use of this software is governed by the Business Source License included
+ * in the LICENSE.TXT file in the project's root directory.
+ *
+ * Change Date: 2026-01-01
+ *
+ * On the date above, in accordance with the Business Source License, use
+ * of this software will be governed by version 2.0 of the Apache License.
+ */
+/****/
+
+#include "PacketMultiplexer.hpp"
+
+#include "Node.hpp"
+#include "RuntimeEnvironment.hpp"
+
+#include <stdio.h>
+#include <stdlib.h>
+
+namespace ZeroTier {
+
+void PacketMultiplexer::putFrame(void* tPtr, uint64_t nwid, void** nuptr, const MAC& source, const MAC& dest, unsigned int etherType, unsigned int vlanId, const void* data, unsigned int len, unsigned int flowId)
+{
+	PacketRecord* packet;
+	_rxPacketVector_m.lock();
+	if (_rxPacketVector.empty()) {
+		packet = new PacketRecord;
+	}
+	else {
+		packet = _rxPacketVector.back();
+		_rxPacketVector.pop_back();
+	}
+	_rxPacketVector_m.unlock();
+
+	packet->tPtr = tPtr;
+	packet->nwid = nwid;
+	packet->nuptr = nuptr;
+	packet->source = source.toInt();
+	packet->dest = dest.toInt();
+	packet->etherType = etherType;
+	packet->vlanId = vlanId;
+	packet->len = len;
+	packet->flowId = flowId;
+	memcpy(packet->data, data, len);
+
+	int bucket = flowId % _concurrency;
+	//fprintf(stderr, "bucket=%d\n", bucket);
+	_rxPacketQueues[bucket]->postLimit(packet, 2048);
+}
+
+PacketMultiplexer::PacketMultiplexer(const RuntimeEnvironment* renv)
+{
+	RR = renv;
+	bool _enablePinning = false;
+	char* pinningVar = std::getenv("ZT_CPU_PINNING");
+	if (pinningVar) {
+		int tmp = atoi(pinningVar);
+		if (tmp > 0) {
+			_enablePinning = true;
+		}
+	}
+
+    _concurrency = 1;
+	char* concurrencyVar = std::getenv("ZT_PACKET_PROCESSING_CONCURRENCY");
+	if (concurrencyVar) {
+		int tmp = atoi(concurrencyVar);
+		if (tmp > 0) {
+			_concurrency = tmp;
+		}
+		else {
+			_concurrency = std::max((unsigned int)1, std::thread::hardware_concurrency() / 2);
+		}
+	}
+	else {
+		_concurrency = std::max((unsigned int)1, std::thread::hardware_concurrency() / 2);
+	}
+
+	for (unsigned int i = 0; i < _concurrency; ++i) {
+		fprintf(stderr, "reserved queue for thread %d\n", i);
+		_rxPacketQueues.push_back(new BlockingQueue<PacketRecord*>());
+	}
+
+	// Each thread picks from its own queue to feed into the core
+	for (unsigned int i = 0; i < _concurrency; ++i) {
+		_rxThreads.push_back(std::thread([this, i, _enablePinning]() {
+			fprintf(stderr, "created post-decode packet ingestion thread %d\n", i);
+
+			PacketRecord* packet = nullptr;
+			for (;;) {
+				if (! _rxPacketQueues[i]->get(packet)) {
+					break;
+				}
+				if (! packet) {
+					break;
+				}
+
+                //fprintf(stderr, "popped packet from queue %d\n", i);
+
+				MAC sourceMac = MAC(packet->source);
+				MAC destMac = MAC(packet->dest);
+
+				RR->node->putFrame(packet->tPtr, packet->nwid, packet->nuptr, sourceMac, destMac, packet->etherType, 0, (const void*)packet->data, packet->len);
+				{
+					Mutex::Lock l(_rxPacketVector_m);
+					_rxPacketVector.push_back(packet);
+				}
+				/*
+				if (ZT_ResultCode_isFatal(err)) {
+					char tmp[256];
+					OSUtils::ztsnprintf(tmp, sizeof(tmp), "error processing packet: %d", (int)err);
+					Mutex::Lock _l(_termReason_m);
+					_termReason = ONE_UNRECOVERABLE_ERROR;
+					_fatalErrorMessage = tmp;
+					this->terminate();
+					break;
+				}
+				*/
+			}
+		}));
+	}
+};
+
+}	// namespace ZeroTier

+ 62 - 0
node/PacketMultiplexer.hpp

@@ -0,0 +1,62 @@
+/*
+ * Copyright (c)2013-2021 ZeroTier, Inc.
+ *
+ * Use of this software is governed by the Business Source License included
+ * in the LICENSE.TXT file in the project's root directory.
+ *
+ * Change Date: 2026-01-01
+ *
+ * On the date above, in accordance with the Business Source License, use
+ * of this software will be governed by version 2.0 of the Apache License.
+ */
+/****/
+
+#ifndef ZT_PACKET_MULTIPLEXER_HPP
+#define ZT_PACKET_MULTIPLEXER_HPP
+
+#include "../osdep/BlockingQueue.hpp"
+#include "MAC.hpp"
+#include "Mutex.hpp"
+#include "RuntimeEnvironment.hpp"
+
+#include <thread>
+#include <vector>
+
+namespace ZeroTier {
+
+struct PacketRecord {
+	void* tPtr;
+	uint64_t nwid;
+	void** nuptr;
+	uint64_t source;
+	uint64_t dest;
+	unsigned int etherType;
+	unsigned int vlanId;
+	uint8_t data[ZT_MAX_MTU];
+	unsigned int len;
+	unsigned int flowId;
+};
+
+class PacketMultiplexer {
+  public:
+	const RuntimeEnvironment* RR;
+
+	PacketMultiplexer(const RuntimeEnvironment* renv);
+
+	void putFrame(void* tPtr, uint64_t nwid, void** nuptr, const MAC& source, const MAC& dest, unsigned int etherType, unsigned int vlanId, const void* data, unsigned int len, unsigned int flowId);
+
+	std::vector<BlockingQueue<PacketRecord*>*> _rxPacketQueues;
+
+    unsigned int _concurrency;
+	// pool
+	std::vector<PacketRecord*> _rxPacketVector;
+	std::vector<std::thread> _rxPacketThreads;
+	Mutex _rxPacketVector_m, _rxPacketThreads_m;
+
+	std::vector<std::thread> _rxThreads;
+	unsigned int _rxThreadCount;
+};
+
+}	// namespace ZeroTier
+
+#endif	 // ZT_PACKET_MULTIPLEXER_HPP

+ 2 - 0
node/RuntimeEnvironment.hpp

@@ -31,6 +31,7 @@ class NetworkController;
 class SelfAwareness;
 class Trace;
 class Bond;
+class PacketMultiplexer;
 
 /**
  * Holds global state for an instance of ZeroTier::Node
@@ -77,6 +78,7 @@ public:
 	Topology *topology;
 	SelfAwareness *sa;
 	Bond *bc;
+	PacketMultiplexer *pm;
 
 	// This node's identity and string representations thereof
 	Identity identity;

+ 1 - 1
node/Switch.cpp

@@ -519,7 +519,7 @@ void Switch::onLocalEthernet(void *tPtr,const SharedPtr<Network> &network,const
 						RR->node->putFrame(tPtr, network->id(), network->userPtr(), peerMac, from, ZT_ETHERTYPE_IPV6, 0, adv, 72);
 
 					}).detach();
-					
+
 					return; // NDP emulation done. We have forged a "fake" reply, so no need to send actual NDP query.
 				} // else no NDP emulation
 			} // else no NDP emulation

+ 2 - 1
objects.mk

@@ -29,7 +29,8 @@ CORE_OBJS=\
 	node/Topology.o \
 	node/Trace.o \
 	node/Utils.o \
-	node/Bond.o
+	node/Bond.o \
+	node/PacketMultiplexer.o
 
 ONE_OBJS=\
 	controller/EmbeddedNetworkController.o \

+ 2 - 3
osdep/EthernetTap.cpp

@@ -58,7 +58,6 @@ namespace ZeroTier {
 std::shared_ptr<EthernetTap> EthernetTap::newInstance(
 	const char *tapDeviceType, // OS-specific, NULL for default
 	const char *homePath,
-	unsigned int concurrency,
 	const MAC &mac,
 	unsigned int mtu,
 	unsigned int metric,
@@ -89,11 +88,11 @@ std::shared_ptr<EthernetTap> EthernetTap::newInstance(
 				return std::shared_ptr<EthernetTap>(new MacEthernetTap(homePath,mac,mtu,metric,nwid,friendlyName,handler,arg));
 			}
 		}
-	}
+	}/
 #endif // __APPLE__
 
 #ifdef __LINUX__
-	return std::shared_ptr<EthernetTap>(new LinuxEthernetTap(homePath,concurrency,mac,mtu,metric,nwid,friendlyName,handler,arg));
+	return std::shared_ptr<EthernetTap>(new LinuxEthernetTap(homePath,mac,mtu,metric,nwid,friendlyName,handler,arg));
 #endif // __LINUX__
 
 #ifdef __WINDOWS__

+ 0 - 1
osdep/EthernetTap.hpp

@@ -33,7 +33,6 @@ public:
 	static std::shared_ptr<EthernetTap> newInstance(
 		const char *tapDeviceType, // OS-specific, NULL for default
 		const char *homePath,
-		unsigned int concurrency,
 		const MAC &mac,
 		unsigned int mtu,
 		unsigned int metric,

+ 18 - 5
osdep/LinuxEthernetTap.cpp

@@ -111,7 +111,6 @@ static void _base32_5_to_8(const uint8_t *in,char *out)
 
 LinuxEthernetTap::LinuxEthernetTap(
 	const char *homePath,
-	unsigned int concurrency,
 	const MAC &mac,
 	unsigned int mtu,
 	unsigned int metric,
@@ -128,7 +127,6 @@ LinuxEthernetTap::LinuxEthernetTap(
 	_fd(0),
 	_enabled(true),
 	_run(true),
-	_concurrency(concurrency),
 	_lastIfAddrsUpdate(0)
 {
 	static std::mutex s_tapCreateLock;
@@ -231,12 +229,27 @@ LinuxEthernetTap::LinuxEthernetTap(
 		}
 	}
 
+	int _concurrency = 1;
+	char* concurrencyVar = std::getenv("ZT_PACKET_PROCESSING_CONCURRENCY");
+	if (concurrencyVar) {
+		int tmp = atoi(concurrencyVar);
+		if (tmp > 0) {
+			_concurrency = tmp;
+		}
+		else {
+			_concurrency = std::max((unsigned int)1,std::thread::hardware_concurrency() / 2);
+		}
+	}
+	else {
+		_concurrency = std::max((unsigned int)1,std::thread::hardware_concurrency() / 2);
+	}
+
 	for (unsigned int i = 0; i < _concurrency; ++i) {
-		_rxThreads.push_back(std::thread([this, i, _enablePinning] {
+		_rxThreads.push_back(std::thread([this, i, _concurrency, _enablePinning] {
 
 			if (_enablePinning) {
 				int pinCore = i % _concurrency;
-				fprintf(stderr, "pinning thread %d to core %d\n", i, pinCore);
+				fprintf(stderr, "pinning tap thread %d to core %d\n", i, pinCore);
 				pthread_t self = pthread_self();
 				cpu_set_t cpuset;
 				CPU_ZERO(&cpuset);
@@ -244,7 +257,7 @@ LinuxEthernetTap::LinuxEthernetTap(
 				int rc = pthread_setaffinity_np(self, sizeof(cpu_set_t), &cpuset);
 				if (rc != 0)
 				{
-					fprintf(stderr, "failed to pin thread %d to core %d: %s\n", i, pinCore, strerror(errno));
+					fprintf(stderr, "failed to pin tap thread %d to core %d: %s\n", i, pinCore, strerror(errno));
 					exit(1);
 				}
 			}

+ 0 - 2
osdep/LinuxEthernetTap.hpp

@@ -35,7 +35,6 @@ class LinuxEthernetTap : public EthernetTap
 public:
 	LinuxEthernetTap(
 		const char *homePath,
-		unsigned int _concurrency,
 		const MAC &mac,
 		unsigned int mtu,
 		unsigned int metric,
@@ -68,7 +67,6 @@ private:
 	std::string _dev;
 	std::vector<MulticastGroup> _multicastGroups;
 	unsigned int _mtu;
-	unsigned int _concurrency;
 	int _fd;
 	int _shutdownSignalPipe[2];
 	std::atomic_bool _enabled;

+ 2 - 2
service/OneService.cpp

@@ -46,6 +46,7 @@
 #include "../node/SHA512.hpp"
 #include "../node/Bond.hpp"
 #include "../node/Peer.hpp"
+#include "../node/PacketMultiplexer.hpp"
 
 #include "../osdep/Phy.hpp"
 #include "../osdep/OSUtils.hpp"
@@ -986,7 +987,7 @@ public:
 #if defined(__LINUX__) || defined(__FreeBSD__) /* || defined(__APPLE__) */
 					if (rc != 0)
 					{
-						fprintf(stderr, "failed to pin thread %d to core %d: %s\n", i, pinCore, strerror(errno));
+						fprintf(stderr, "failed to pin rx thread %d to core %d: %s\n", i, pinCore, strerror(errno));
 						exit(1);
 					}
 #endif
@@ -3131,7 +3132,6 @@ public:
 						n.setTap(EthernetTap::newInstance(
 							nullptr,
 							_homePath.c_str(),
-							_enableMulticore ? _rxThreadCount : 1,
 							MAC(nwc->mac),
 							nwc->mtu,
 							(unsigned int)ZT_IF_METRIC,