| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122 | /* * Copyright (c)2013-2021 ZeroTier, Inc. * * Use of this software is governed by the Business Source License included * in the LICENSE.TXT file in the project's root directory. * * Change Date: 2026-01-01 * * On the date above, in accordance with the Business Source License, use * of this software will be governed by version 2.0 of the Apache License. *//****/#include "PacketMultiplexer.hpp"#include "Constants.hpp"#include "Node.hpp"#include "RuntimeEnvironment.hpp"#include <stdio.h>#include <stdlib.h>namespace ZeroTier {PacketMultiplexer::PacketMultiplexer(const RuntimeEnvironment* renv){	RR = renv;};void PacketMultiplexer::putFrame(void* tPtr, uint64_t nwid, void** nuptr, const MAC& source, const MAC& dest, unsigned int etherType, unsigned int vlanId, const void* data, unsigned int len, unsigned int flowId){#if defined(__APPLE__) || defined(__OpenBSD__) || defined(__NetBSD__) || defined(__WINDOWS__)	RR->node->putFrame(tPtr, nwid, nuptr, source, dest, etherType, vlanId, (const void*)data, len);	return;#endif	if (! _enabled) {		RR->node->putFrame(tPtr, nwid, nuptr, source, dest, etherType, vlanId, (const void*)data, len);		return;	}	PacketRecord* packet;	_rxPacketVector_m.lock();	if (_rxPacketVector.empty()) {		packet = new PacketRecord;	}	else {		packet = _rxPacketVector.back();		_rxPacketVector.pop_back();	}	_rxPacketVector_m.unlock();	packet->tPtr = tPtr;	packet->nwid = nwid;	packet->nuptr = nuptr;	packet->source = source.toInt();	packet->dest = dest.toInt();	packet->etherType = etherType;	packet->vlanId = vlanId;	packet->len = len;	packet->flowId = flowId;	memcpy(packet->data, data, len);	int bucket = flowId % _concurrency;	_rxPacketQueues[bucket]->postLimit(packet, 2048);}void PacketMultiplexer::setUpPostDecodeReceiveThreads(unsigned int concurrency, bool cpuPinningEnabled){#if defined(__APPLE__) || defined(__OpenBSD__) || defined(__NetBSD__) || defined(__WINDOWS__)	return;#endif	_enabled = true;	_concurrency = concurrency;	bool _enablePinning = cpuPinningEnabled;	for (unsigned int i = 0; i < _concurrency; ++i) {		fprintf(stderr, "Reserved queue for thread %d\n", i);		_rxPacketQueues.push_back(new BlockingQueue<PacketRecord*>());	}	// Each thread picks from its own queue to feed into the core	for (unsigned int i = 0; i < _concurrency; ++i) {		_rxThreads.push_back(std::thread([this, i, _enablePinning]() {			fprintf(stderr, "Created post-decode packet ingestion thread %d\n", i);			PacketRecord* packet = nullptr;			for (;;) {				if (! _rxPacketQueues[i]->get(packet)) {					break;				}				if (! packet) {					break;				}				// fprintf(stderr, "popped packet from queue %d\n", i);				MAC sourceMac = MAC(packet->source);				MAC destMac = MAC(packet->dest);				RR->node->putFrame(packet->tPtr, packet->nwid, packet->nuptr, sourceMac, destMac, packet->etherType, 0, (const void*)packet->data, packet->len);				{					Mutex::Lock l(_rxPacketVector_m);					_rxPacketVector.push_back(packet);				}				/*				if (ZT_ResultCode_isFatal(err)) {					char tmp[256];					OSUtils::ztsnprintf(tmp, sizeof(tmp), "error processing packet: %d", (int)err);					Mutex::Lock _l(_termReason_m);					_termReason = ONE_UNRECOVERABLE_ERROR;					_fatalErrorMessage = tmp;					this->terminate();					break;				}				*/			}		}));	}}}	// namespace ZeroTier
 |