瀏覽代碼

Merge pull request #2363 from zerotier/dev

1.14.1 merge dev to main
Adam Ierymenko 1 年之前
父節點
當前提交
8140264cb6
共有 46 個文件被更改,包括 1170 次插入462 次删除
  1. 6 0
      .clangd
  2. 39 14
      .github/workflows/build.yml
  3. 1 0
      .gitignore
  4. 1 0
      OFFICIAL-RELEASE-STEPS.md
  5. 1 1
      README.md
  6. 16 0
      RELEASE-NOTES.md
  7. 18 0
      controller/DB.cpp
  8. 4 2
      controller/EmbeddedNetworkController.cpp
  9. 29 5
      controller/PostgreSQL.cpp
  10. 6 0
      debian/changelog
  11. 1 1
      ext/installfiles/mac/ZeroTier One.pkgproj
  12. 57 1
      java/jni/com_zerotierone_sdk_Node.cpp
  13. 2 2
      make-linux.mk
  14. 2 2
      make-mac.mk
  15. 26 15
      node/Bond.cpp
  16. 2 0
      node/Bond.hpp
  17. 66 0
      node/Constants.hpp
  18. 60 61
      node/IncomingPacket.cpp
  19. 1 0
      node/Network.cpp
  20. 3 1
      node/NetworkConfig.hpp
  21. 17 2
      node/Node.cpp
  22. 4 1
      node/Node.hpp
  23. 122 0
      node/PacketMultiplexer.cpp
  24. 65 0
      node/PacketMultiplexer.hpp
  25. 2 0
      node/RuntimeEnvironment.hpp
  26. 1 1
      node/Switch.cpp
  27. 14 0
      node/Utils.hpp
  28. 2 1
      objects.mk
  29. 69 38
      osdep/BSDEthernetTap.cpp
  30. 6 0
      osdep/BSDEthernetTap.hpp
  31. 4 2
      osdep/EthernetTap.cpp
  32. 2 0
      osdep/EthernetTap.hpp
  33. 123 101
      osdep/LinuxEthernetTap.cpp
  34. 4 4
      osdep/LinuxEthernetTap.hpp
  35. 1 1
      osdep/MacEthernetTapAgent.c
  36. 3 1
      osdep/MacKextEthernetTap.cpp
  37. 2 0
      osdep/MacKextEthernetTap.hpp
  38. 5 0
      pkg/synology/dsm7-docker/README.md
  39. 10 12
      pkg/synology/dsm7-docker/build.sh
  40. 279 169
      rustybits/Cargo.lock
  41. 1 1
      rustybits/zeroidc.vcxproj
  42. 87 21
      service/OneService.cpp
  43. 1 1
      version.h
  44. 1 0
      windows/ZeroTierOne/ZeroTierOne.vcxproj
  45. 3 0
      windows/ZeroTierOne/ZeroTierOne.vcxproj.filters
  46. 1 1
      zerotier-one.spec

+ 6 - 0
.clangd

@@ -0,0 +1,6 @@
+CompileFlags:
+  Add:
+    - "-std=c++17"
+    - "-I../ext"
+    - "-I../ext/prometheus-cpp-lite-1.0/core/include"
+    - "-I../ext/prometheus-cpp-lite-1.0/simpleapi/include"

+ 39 - 14
.github/workflows/build.yml

@@ -9,13 +9,12 @@ jobs:
         git config --global core.autocrlf input
       #        git config --global core.eol lf
     - name: checkout
-      uses: actions/checkout@v3
+      uses: actions/checkout@v4
     - name: Install Rust
-      uses: actions-rs/toolchain@v1
+      uses: dtolnay/rust-toolchain@stable
       with:
         toolchain: stable
-        target: x86_64-unknown-linux-gnu
-        override: true
+        targets: x86_64-unknown-linux-gnu
         components: rustfmt, clippy
 
     - name: Set up cargo cache
@@ -33,6 +32,14 @@ jobs:
       run: |
         make selftest
         ./zerotier-selftest
+    - name: 'Tar files' # keeps permissions (execute)
+      run: tar -cvf zerotier-one.tar zerotier-one
+    - name: Archive production artifacts
+      uses: actions/upload-artifact@v4
+      with:
+        name: zerotier-one-ubuntu-x64
+        path: zerotier-one.tar
+        retention-days: 7
 
   build_macos:
     runs-on: macos-latest
@@ -42,13 +49,18 @@ jobs:
         git config --global core.autocrlf input
       #        git config --global core.eol lf
     - name: checkout
-      uses: actions/checkout@v3
-    - name: Install Rust
-      uses: actions-rs/toolchain@v1
+      uses: actions/checkout@v4
+    - name: Install Rust aarch64
+      uses: dtolnay/rust-toolchain@stable
       with:
         toolchain: stable
         target: aarch64-apple-darwin
-        override: true
+        components: rustfmt, clippy
+    - name: Install Rust x86_64
+      uses: dtolnay/rust-toolchain@stable
+      with:
+        toolchain: stable
+        target: x86_64-apple-darwin
         components: rustfmt, clippy
     - name: Set up cargo cache
       uses: Swatinem/rust-cache@v2
@@ -58,13 +70,21 @@ jobs:
         shared-key: ${{ runner.os }}-cargo-
         workspaces: |
           rustybits/
-
     - name: make
       run: make
     - name: selftest
       run: |
         make selftest
         ./zerotier-selftest
+    - name: 'Tar files' # keeps permissions (execute)
+      run: tar -cvf zerotier-one.tar zerotier-one
+    - name: Archive production artifacts
+      uses: actions/upload-artifact@v4
+      with:
+        name: zerotier-one-mac
+        path: zerotier-one.tar
+        retention-days: 7
+
 
   build_windows:
     runs-on: windows-latest
@@ -74,13 +94,12 @@ jobs:
         git config --global core.autocrlf true
       #        git config --global core.eol lf
     - name: checkout
-      uses: actions/checkout@v3
+      uses: actions/checkout@v4
     - name: Install Rust
-      uses: actions-rs/toolchain@v1
+      uses: dtolnay/rust-toolchain@stable
       with:
         toolchain: stable
         target: aarch64-apple-darwin
-        override: true
         components: rustfmt, clippy
     - name: Set up cargo cache
       uses: Swatinem/rust-cache@v2
@@ -92,7 +111,13 @@ jobs:
           rustybits/
 
     - name: setup msbuild
-      uses: microsoft/setup-msbuild@v1.1.3
+      uses: microsoft/setup-msbuild@v2
     - name: msbuild
       run: |
-        msbuild windows\ZeroTierOne.sln /m /p:Configuration=Release  /property:Platform=x64 /t:ZeroTierOne        
+        msbuild windows\ZeroTierOne.sln /m /p:Configuration=Release  /property:Platform=x64 /t:ZeroTierOne
+    - name: Archive production artifacts
+      uses: actions/upload-artifact@v4
+      with:
+        name: zerotier-one-windows
+        path: windows/Build
+        retention-days: 7

+ 1 - 0
.gitignore

@@ -124,6 +124,7 @@ attic/world/mkworld
 workspace/
 workspace2/
 zeroidc/target/
+tcp-proxy/target
 
 #snapcraft specifics
 /parts/

+ 1 - 0
OFFICIAL-RELEASE-STEPS.md

@@ -14,6 +14,7 @@ The version must be incremented in all of the following files:
     /debian/changelog
     /ext/installfiles/mac/ZeroTier One.pkgproj
     /ext/installfiles/windows/ZeroTier One.aip
+  ../DesktopUI/mac-app-template/ZeroTier.app/Contents/Info.plist
 
 The final .AIP file can only be edited on Windows with [Advanced Installer Enterprise](http://www.advancedinstaller.com/). In addition to incrementing the version be sure that a new product code is generated. (The "upgrade code" GUID on the other hand must never change.)
 

+ 1 - 1
README.md

@@ -58,7 +58,7 @@ To build on Mac and Linux just type `make`. On FreeBSD and OpenBSD `gmake` (GNU
    - Xcode command line tools for macOS 10.13 or newer are required.
    - Rust for x86_64 and ARM64 targets *if SSO is enabled in the build*.
  - **Linux**
-   - The minimum compiler versions required are GCC/G++ 4.9.3 or CLANG/CLANG++ 3.4.2. (Install `clang` on CentOS 7 as G++ is too old.)
+   - The minimum compiler versions required are GCC/G++ 8.x or CLANG/CLANG++ 5.x.
    - Linux makefiles automatically detect and prefer clang/clang++ if present as it produces smaller and slightly faster binaries in most cases. You can override by supplying CC and CXX variables on the make command line.
    - Rust for x86_64 and ARM64 targets *if SSO is enabled in the build*.
  - **Windows**

+ 16 - 0
RELEASE-NOTES.md

@@ -1,6 +1,22 @@
 ZeroTier Release Notes
 ======
 
+# 2024-09-12 -- Version 1.14.1
+
+  * Multithreaded packet I/O support! Currently this is just for Linux and must
+    be enabled in local.conf. It will likely make the largest difference on small
+    multi-core devices where CPU is a bottleneck and high throughput is desired.
+    It may be enabled by default in the future but we want it to be thoroughly
+    tested. It's a little harder than it seems at first glance due to the need
+    to keep packets in sequence and balance load.
+  * Several multipath bug fixes.
+  * Updated the versions on a number of libraries related to OIDC support and HTTP.
+  * MacOS .app now shows the correct version in its Info.plist manifest.
+  * Sanitize MAC addresses in JSON format rules parser.
+  * Some basic information about the platform (OS, CPU architecture) is now reported
+    to network controllers when networks are joined so it can be displayed to
+    network admins and in the future used in policy checking and inventory operations.
+
 # 2024-05-02 -- Version 1.14.0
 
   * Linux I/O performance improvements under heavy load

+ 18 - 0
controller/DB.cpp

@@ -382,6 +382,24 @@ void DB::_networkChanged(nlohmann::json &old,nlohmann::json &networkConfig,bool
 		const std::string ids = old["id"];
 		const uint64_t networkId = Utils::hexStrToU64(ids.c_str());
 		if (networkId) {
+			try {
+				// deauth all members on the network
+				nlohmann::json network;
+				std::vector<nlohmann::json> members;
+				this->get(networkId, network, members);
+				for(auto i=members.begin();i!=members.end();++i) {
+					const std::string nodeID = (*i)["id"];
+					const uint64_t memberId = Utils::hexStrToU64(nodeID.c_str());
+					std::unique_lock<std::shared_mutex> ll(_changeListeners_l);
+					for(auto j=_changeListeners.begin();j!=_changeListeners.end();++j) {
+						(*j)->onNetworkMemberDeauthorize(this,networkId,memberId);
+					}
+				}
+			} catch (std::exception &e) {
+				std::cerr << "Error deauthorizing members on network delete: " << e.what() << std::endl;
+			}
+
+			// delete the network
 			std::unique_lock<std::shared_mutex> l(_networks_l);
 			_networks.erase(networkId);
 		}

+ 4 - 2
controller/EmbeddedNetworkController.cpp

@@ -315,12 +315,14 @@ static bool _parseRule(json &r,ZT_VirtualNetworkRule &rule)
 		return true;
 	} else if (t == "MATCH_MAC_SOURCE") {
 		rule.t |= ZT_NETWORK_RULE_MATCH_MAC_SOURCE;
-		const std::string mac(OSUtils::jsonString(r["mac"],"0"));
+		std::string mac(OSUtils::jsonString(r["mac"],"0"));
+		Utils::cleanMac(mac);
 		Utils::unhex(mac.c_str(),(unsigned int)mac.length(),rule.v.mac,6);
 		return true;
 	} else if (t == "MATCH_MAC_DEST") {
 		rule.t |= ZT_NETWORK_RULE_MATCH_MAC_DEST;
-		const std::string mac(OSUtils::jsonString(r["mac"],"0"));
+		std::string mac(OSUtils::jsonString(r["mac"],"0"));
+		Utils::cleanMac(mac);
 		Utils::unhex(mac.c_str(),(unsigned int)mac.length(),rule.v.mac,6);
 		return true;
 	} else if (t == "MATCH_IPV4_SOURCE") {

+ 29 - 5
controller/PostgreSQL.cpp

@@ -780,11 +780,25 @@ void PostgreSQL::initializeNetworks()
 				fprintf(stderr, "adding networks to redis...\n");
 				if (_rc->clusterMode) {
 					auto tx = _cluster->transaction(_myAddressStr, true, false);
-					tx.sadd(setKey, networkSet.begin(), networkSet.end());
+					uint64_t count = 0;
+					for (std::string nwid : networkSet) {
+						tx.sadd(setKey, nwid);
+						if (++count % 30000 == 0) {
+							tx.exec();
+							tx = _cluster->transaction(_myAddressStr, true, false);
+						}
+					}
 					tx.exec();
 				} else {
 					auto tx = _redis->transaction(true, false);
-					tx.sadd(setKey, networkSet.begin(), networkSet.end());
+					uint64_t count = 0;
+					for (std::string nwid : networkSet) {
+						tx.sadd(setKey, nwid);
+						if (++count % 30000 == 0) {
+							tx.exec();
+							tx = _redis->transaction(true, false);
+						}
+					}
 					tx.exec();
 				}
 				fprintf(stderr, "done.\n");
@@ -1005,14 +1019,24 @@ void PostgreSQL::initializeMembers()
 				fprintf(stderr, "Load member data into redis...\n");
 				if (_rc->clusterMode) {
 					auto tx = _cluster->transaction(_myAddressStr, true, false);
+					uint64_t count = 0;
 					for (auto it : networkMembers) {
 						tx.sadd(it.first, it.second);
+						if (++count % 30000 == 0) {
+							tx.exec();
+							tx = _cluster->transaction(_myAddressStr, true, false);
+						}
 					}
 					tx.exec();
 				} else {
 					auto tx = _redis->transaction(true, false);
+					uint64_t count = 0;
 					for (auto it : networkMembers) {
 						tx.sadd(it.first, it.second);
+						if (++count % 30000 == 0) {
+							tx.exec();
+							tx = _redis->transaction(true, false);
+						}
 					}
 					tx.exec();
 				}
@@ -1180,7 +1204,7 @@ void PostgreSQL::_membersWatcher_Redis() {
 									_memberChanged(oldConfig,newConfig,(this->_ready >= 2));
 								}
 							} catch (...) {
-								fprintf(stderr, "json parse error in networkWatcher_Redis\n");
+								fprintf(stderr, "json parse error in _membersWatcher_Redis: %s\n", a.second.c_str());
 							}
 						}
 						if (_rc->clusterMode) {
@@ -1269,8 +1293,8 @@ void PostgreSQL::_networksWatcher_Redis() {
 								if (oldConfig.is_object()||newConfig.is_object()) {
 									_networkChanged(oldConfig,newConfig,(this->_ready >= 2));
 								}
-							} catch (...) {
-								fprintf(stderr, "json parse error in networkWatcher_Redis\n");
+							} catch (std::exception &e) {
+								fprintf(stderr, "json parse error in networkWatcher_Redis: what: %s json: %s\n", e.what(), a.second.c_str());
 							}
 						}
 						if (_rc->clusterMode) {

+ 6 - 0
debian/changelog

@@ -1,3 +1,9 @@
+zerotier-one (1.14.1) unstable; urgency=medium
+
+  * See RELEASE-NOTES.md for release notes.
+
+ -- Adam Ierymenko <[email protected]>  Wed, 11 Sep 2024 01:00:00 -0700
+
 zerotier-one (1.14.0) unstable; urgency=medium
 
   * See RELEASE-NOTES.md for release notes.

+ 1 - 1
ext/installfiles/mac/ZeroTier One.pkgproj

@@ -701,7 +701,7 @@
 				<key>USE_HFS+_COMPRESSION</key>
 				<false/>
 				<key>VERSION</key>
-				<string>1.14.0</string>
+				<string>1.14.1</string>
 			</dict>
 			<key>TYPE</key>
 			<integer>0</integer>

+ 57 - 1
java/jni/com_zerotierone_sdk_Node.cpp

@@ -111,6 +111,44 @@ namespace {
         bool finishInitializing();
     };
 
+    //
+    // RAII construct for calling AttachCurrentThread and DetachCurrent automatically
+    //
+    struct ScopedJNIThreadAttacher {
+
+        JavaVM *jvm;
+        JNIEnv **env_p;
+        jint getEnvRet;
+
+        ScopedJNIThreadAttacher(JavaVM *jvmIn, JNIEnv **env_pIn, jint getEnvRetIn) :
+        jvm(jvmIn),
+        env_p(env_pIn),
+        getEnvRet(getEnvRetIn) {
+
+            if (getEnvRet != JNI_EDETACHED) {
+                return;
+            }
+
+            jint attachCurrentThreadRet;
+            if ((attachCurrentThreadRet = jvm->AttachCurrentThread(env_p, NULL)) != JNI_OK) {
+                LOGE("Error calling AttachCurrentThread: %d", attachCurrentThreadRet);
+                assert(false && "Error calling AttachCurrentThread");
+            }
+        }
+
+        ~ScopedJNIThreadAttacher() {
+
+            if (getEnvRet != JNI_EDETACHED) {
+                return;
+            }
+
+            jint detachCurrentThreadRet;
+            if ((detachCurrentThreadRet = jvm->DetachCurrentThread()) != JNI_OK) {
+                LOGE("Error calling DetachCurrentThread: %d", detachCurrentThreadRet);
+                assert(false && "Error calling DetachCurrentThread");
+            }
+        }
+    };
 
     /*
     * This must return 0 on success. It can return any OS-dependent error code
@@ -194,7 +232,25 @@ namespace {
         assert(ref);
         assert(ref->node == node);
         JNIEnv *env;
-        GETENV(env, ref->jvm);
+        
+        jint getEnvRet;
+        assert(ref->jvm);
+        getEnvRet = ref->jvm->GetEnv(reinterpret_cast<void**>(&env), JNI_VERSION_1_6);
+
+        if (!(getEnvRet == JNI_OK || getEnvRet == JNI_EDETACHED)) {
+            LOGE("Error calling GetEnv: %d", getEnvRet);
+            assert(false && "Error calling GetEnv");
+        }
+
+        //
+        // Thread might actually be detached.
+        //
+        // e.g:
+        // https://github.com/zerotier/ZeroTierOne/blob/91e7ce87f09ac1cfdeaf6ff22c3cedcd93574c86/node/Switch.cpp#L519
+        //
+        // Make sure to attach if needed
+        //
+        ScopedJNIThreadAttacher attacher{ref->jvm, &env, getEnvRet};
 
         if (env->ExceptionCheck()) {
             LOGE("Unhandled pending exception");

+ 2 - 2
make-linux.mk

@@ -71,7 +71,7 @@ else
 	override CFLAGS+=-Wall -Wno-deprecated -pthread $(INCLUDES) -DNDEBUG $(DEFS)
 	CXXFLAGS?=-O3 -fstack-protector
 	override CXXFLAGS+=-Wall -Wno-deprecated -std=c++17 -pthread $(INCLUDES) -DNDEBUG $(DEFS)
-	LDFLAGS=-pie -Wl,-z,relro,-z,now
+	LDFLAGS?=-pie -Wl,-z,relro,-z,now
 	ZT_CARGO_FLAGS=--release
 endif
 
@@ -364,7 +364,7 @@ override CFLAGS+=-fPIC -fPIE
 override CXXFLAGS+=-fPIC -fPIE
 
 # Non-executable stack
-override ASFLAGS+=--noexecstack
+override LDFLAGS+=-Wl,-z,noexecstack
 
 .PHONY: all
 all:	one

+ 2 - 2
make-mac.mk

@@ -1,8 +1,8 @@
 CC=clang
 CXX=clang++
-TOPDIR=$(shell PWD)
+TOPDIR=$(shell pwd)
 
-INCLUDES=-I$(shell PWD)/rustybits/target -isystem $(TOPDIR)/ext  -I$(TOPDIR)/ext/prometheus-cpp-lite-1.0/core/include -I$(TOPDIR)/ext-prometheus-cpp-lite-1.0/3rdparty/http-client-lite/include -I$(TOPDIR)/ext/prometheus-cpp-lite-1.0/simpleapi/include
+INCLUDES=-I$(shell pwd)/rustybits/target -isystem $(TOPDIR)/ext  -I$(TOPDIR)/ext/prometheus-cpp-lite-1.0/core/include -I$(TOPDIR)/ext-prometheus-cpp-lite-1.0/3rdparty/http-client-lite/include -I$(TOPDIR)/ext/prometheus-cpp-lite-1.0/simpleapi/include
 DEFS=
 LIBS=
 ARCH_FLAGS=-arch x86_64 -arch arm64 

+ 26 - 15
node/Bond.cpp

@@ -373,6 +373,7 @@ SharedPtr<Path> Bond::getAppropriatePath(int64_t now, int32_t flowId)
 	 */
 	if (_policy == ZT_BOND_POLICY_ACTIVE_BACKUP) {
 		if (_abPathIdx != ZT_MAX_PEER_NETWORK_PATHS && _paths[_abPathIdx].p) {
+			//fprintf(stderr, "trying to send via (_abPathIdx=%d) %s\n", _abPathIdx, pathToStr(_paths[_abPathIdx].p).c_str());
 			return _paths[_abPathIdx].p;
 		}
 	}
@@ -1032,6 +1033,13 @@ void Bond::curateBond(int64_t now, bool rebuildBond)
 		bool satisfiedUpDelay = (now - _paths[i].lastAliveToggle) >= _upDelay;
 		// How long since the last QoS was received (Must be less than ZT_PEER_PATH_EXPIRATION since the remote peer's _qosSendInterval isn't known)
 		bool acceptableQoSAge = (_paths[i].lastQoSReceived == 0 && inTrial) || ((now - _paths[i].lastQoSReceived) < ZT_PEER_EXPIRED_PATH_TRIAL_PERIOD);
+
+		// Allow active-backup to operate without the receipt of QoS records
+		// This may be expanded to the other modes as an option
+		if (_policy == ZT_BOND_POLICY_ACTIVE_BACKUP) {
+			acceptableQoSAge = true;
+		}
+
 		currEligibility = _paths[i].allowed() && ((acceptableAge && satisfiedUpDelay && acceptableQoSAge) || inTrial);
 
 		if (currEligibility) {
@@ -1043,12 +1051,11 @@ void Bond::curateBond(int64_t now, bool rebuildBond)
 		 */
 		if (currEligibility != _paths[i].eligible) {
 			if (currEligibility == 0) {
-				log("link %s is no longer eligible", pathToStr(_paths[i].p).c_str());
+				log("link %s is no longer eligible (reason: allowed=%d, age=%d, ud=%d, qos=%d, trial=%d)", pathToStr(_paths[i].p).c_str(), _paths[i].allowed(), acceptableAge, satisfiedUpDelay, acceptableQoSAge, inTrial);
 			}
 			if (currEligibility == 1) {
 				log("link %s is eligible", pathToStr(_paths[i].p).c_str());
 			}
-			debug("\t[%d] allowed=%d, age=%d, qa=%d, ud=%d, trial=%d", i, _paths[i].allowed(), acceptableAge, acceptableQoSAge, satisfiedUpDelay, inTrial);
 			dumpPathStatus(now, i);
 			if (currEligibility) {
 				rebuildBond = true;
@@ -1496,7 +1503,8 @@ void Bond::processActiveBackupTasks(void* tPtr, int64_t now)
 {
 	int prevActiveBackupPathIdx = _abPathIdx;
 	int nonPreferredPathIdx = ZT_MAX_PEER_NETWORK_PATHS;
-	bool bFoundPrimaryLink = false;
+	bool foundPathOnPrimaryLink = false;
+	bool foundPreferredPath = false;
 
 	if (_abPathIdx != ZT_MAX_PEER_NETWORK_PATHS && ! _paths[_abPathIdx].p) {
 		_abPathIdx = ZT_MAX_PEER_NETWORK_PATHS;
@@ -1559,15 +1567,16 @@ void Bond::processActiveBackupTasks(void* tPtr, int64_t now)
 							if (! _paths[i].preferred()) {
 								// Found path on primary link, take note in case we don't find a preferred path
 								nonPreferredPathIdx = i;
-								bFoundPrimaryLink = true;
+								foundPathOnPrimaryLink = true;
 							}
 							if (_paths[i].preferred()) {
 								_abPathIdx = i;
-								bFoundPrimaryLink = true;
+								foundPathOnPrimaryLink = true;
 								if (_paths[_abPathIdx].p) {
 									SharedPtr<Link> abLink = RR->bc->getLinkBySocket(_policyAlias, _paths[_abPathIdx].p->localSocket());
 									if (abLink) {
-										log("found preferred primary link %s", pathToStr(_paths[_abPathIdx].p).c_str());
+										log("found preferred primary link (_abPathIdx=%d), %s", _abPathIdx, pathToStr(_paths[_abPathIdx].p).c_str());
+										foundPreferredPath = true;
 									}
 									break;	 // Found preferred path on primary link
 								}
@@ -1575,8 +1584,8 @@ void Bond::processActiveBackupTasks(void* tPtr, int64_t now)
 						}
 					}
 				}
-				if (bFoundPrimaryLink && (nonPreferredPathIdx != ZT_MAX_PEER_NETWORK_PATHS)) {
-					log("found non-preferred primary link");
+				if (!foundPreferredPath && foundPathOnPrimaryLink && (nonPreferredPathIdx != ZT_MAX_PEER_NETWORK_PATHS)) {
+					log("found non-preferred primary link (_abPathIdx=%d)", _abPathIdx);
 					_abPathIdx = nonPreferredPathIdx;
 				}
 			}
@@ -1614,10 +1623,10 @@ void Bond::processActiveBackupTasks(void* tPtr, int64_t now)
 		}
 		if (_paths[(*it)].p && ! _paths[(*it)].eligible) {
 			SharedPtr<Link> link = RR->bc->getLinkBySocket(_policyAlias, _paths[(*it)].p->localSocket());
-			it = _abFailoverQueue.erase(it);
 			if (link) {
-				log("link %s is ineligible, removing from failover queue (%zu links remain in queue)", pathToStr(_paths[_abPathIdx].p).c_str(), _abFailoverQueue.size());
+				log("link %s is ineligible, removing from failover queue (%zu links remain in queue)", pathToStr(_paths[(*it)].p).c_str(), _abFailoverQueue.size());
 			}
+			it = _abFailoverQueue.erase(it);
 			continue;
 		}
 		else {
@@ -1684,7 +1693,7 @@ void Bond::processActiveBackupTasks(void* tPtr, int64_t now)
 						}
 					}
 					if (! bFoundPathInQueue) {
-						_abFailoverQueue.push_front(i);
+						_abFailoverQueue.push_back(i);
 						log("add link %s to failover queue (%zu links in queue)", pathToStr(_paths[i].p).c_str(), _abFailoverQueue.size());
 						addPathToBond(i, 0);
 					}
@@ -1734,13 +1743,14 @@ void Bond::processActiveBackupTasks(void* tPtr, int64_t now)
 					}
 				}
 				if (! bFoundPathInQueue) {
-					_abFailoverQueue.push_front(i);
+					_abFailoverQueue.push_back(i);
 					log("add link %s to failover queue (%zu links in queue)", pathToStr(_paths[i].p).c_str(), _abFailoverQueue.size());
 					addPathToBond(i, 0);
 				}
 			}
 		}
 	}
+	/*
 	// Sort queue based on performance
 	if (! _abFailoverQueue.empty()) {
 		for (int i = 0; i < _abFailoverQueue.size(); i++) {
@@ -1752,7 +1762,7 @@ void Bond::processActiveBackupTasks(void* tPtr, int64_t now)
 			}
 			_abFailoverQueue[hole_position] = value_to_insert;
 		}
-	}
+	}*/
 
 	/**
 	 * Short-circuit if we have no queued paths
@@ -1902,7 +1912,7 @@ void Bond::setBondParameters(int policy, SharedPtr<Bond> templateBond, bool useT
 	 * Policy defaults
 	 */
 	_abPathIdx = ZT_MAX_PEER_NETWORK_PATHS;
-	_abLinkSelectMethod = ZT_BOND_RESELECTION_POLICY_OPTIMIZE;
+	_abLinkSelectMethod = ZT_BOND_RESELECTION_POLICY_ALWAYS;
 	_rrPacketsSentOnCurrLink = 0;
 	_rrIdx = 0;
 	_packetsPerLink = 64;
@@ -2021,7 +2031,8 @@ void Bond::dumpInfo(int64_t now, bool force)
 	_lastSummaryDump = now;
 	float overhead = (_overheadBytes / (timeSinceLastDump / 1000.0f) / 1000.0f);
 	_overheadBytes = 0;
-	log("bond: bp=%d, fi=%" PRIu64 ", mi=%d, ud=%d, dd=%d, flows=%zu, leaf=%d, overhead=%f KB/s, links=(%d/%d)",
+	log("bond: ready=%d, bp=%d, fi=%" PRIu64 ", mi=%d, ud=%d, dd=%d, flows=%zu, leaf=%d, overhead=%f KB/s, links=(%d/%d)",
+		isReady(),
 		_policy,
 		_failoverInterval,
 		_monitorInterval,

+ 2 - 0
node/Bond.hpp

@@ -1144,6 +1144,7 @@ class Bond {
 		__attribute__((format(printf, 2, 3)))
 #endif
 	{
+		//if (_peerId != 0x0 && _peerId != 0x0) { return; }
 #ifdef ZT_TRACE
 		time_t rawtime;
 		struct tm* timeinfo;
@@ -1175,6 +1176,7 @@ class Bond {
 		__attribute__((format(printf, 2, 3)))
 #endif
 	{
+		//if (_peerId != 0x0 && _peerId != 0x0) { return; }
 #ifdef ZT_DEBUG
 		time_t rawtime;
 		struct tm* timeinfo;

+ 66 - 0
node/Constants.hpp

@@ -202,6 +202,72 @@
 #define ZT_PACKED_STRUCT(D) D __attribute__((packed))
 #endif
 
+#if defined(_WIN32)
+#define ZT_PLATFORM_NAME "windows" // Windows
+#elif defined(_WIN64)
+#define ZT_PLATFORM_NAME "windows" // Windows
+#elif defined(__CYGWIN__)
+#define ZT_PLATFORM_NAME "windows" // Windows (Cygwin POSIX under Microsoft Window)
+#elif defined(__ANDROID__)
+#define ZT_PLATFORM_NAME "android" // Android (implies Linux, so it must come first)
+#elif defined(__linux__)
+#define ZT_PLATFORM_NAME "linux" // Debian, Ubuntu, Gentoo, Fedora, openSUSE, RedHat, Centos and other
+#elif defined(__unix__) || !defined(__APPLE__) && defined(__MACH__)
+#include <sys/param.h>
+#if defined(BSD)
+#define ZT_PLATFORM_NAME "bsd" // FreeBSD, NetBSD, OpenBSD, DragonFly BSD
+#endif
+#elif defined(__hpux)
+#define ZT_PLATFORM_NAME "hp-ux" // HP-UX
+#elif defined(_AIX)
+#define ZT_PLATFORM_NAME "aix" // IBM AIX
+#elif defined(__APPLE__) && defined(__MACH__) // Apple OSX and iOS (Darwin)
+#include <TargetConditionals.h>
+#if defined(TARGET_IPHONE_SIMULATOR) && TARGET_IPHONE_SIMULATOR == 1
+#define ZT_PLATFORM_NAME "ios_sim" // Apple iOS
+#elif defined(TARGET_OS_IPAD) && TARGET_OS_IPAD == 1
+#define ZT_PLATFORM_NAME "ios_ipad"
+#elif defined(TARGET_OS_IPHONE) && TARGET_OS_IPHONE == 1
+#define ZT_PLATFORM_NAME "ios_iphone" // Apple iOS
+#elif defined(TARGET_OS_MAC) && TARGET_OS_MAC == 1
+#define ZT_PLATFORM_NAME "macos" // Apple OSX
+#endif
+#elif defined(__sun) && defined(__SVR4)
+#define ZT_PLATFORM_NAME "solaris" // Oracle Solaris, Open Indiana
+#else
+#define ZT_PLATFORM_NAME "unknown"
+#endif
+#ifndef ZT_PLATFORM_NAME
+#define ZT_PLATFORM_NAME "unknown"
+#endif
+
+#if defined(__amd64) || defined(__amd64__) || defined(__x86_64) || defined(__x86_64__) || defined(__AMD64) || defined(__AMD64__) || defined(_M_X64) || defined(_M_AMD64)
+#define ZT_ARCH_NAME "x86_64"
+#elif defined(__i386__) || defined(__i486__) || defined(__i586__) || defined(__i686__) || defined(_X86_) || defined(_M_IX86) || defined(__X86__) || defined(__I86__) || defined(_M_I86)
+#define ZT_ARCH_NAME "x86"
+#elif defined(__aarch64__) || defined(__AARCH64EL__) || defined(_M_ARM64)
+#define ZT_ARCH_NAME "arm64"
+#elif defined(__arm__) || defined(__TARGET_ARCH_ARM) || defined(_ARM) || defined(_M_ARM) || defined(_M_ARMT) || defined(__arm) || defined(__thumb__)
+#define ZT_ARCH_NAME "arm"
+#elif defined(__loongarch__) || defined(_LOONGARCH_ARCH)
+#define ZT_ARCH_NAME "loongarch"
+#elif defined(__mips__) || defined(__MIPS__)
+#define ZT_ARCH_NAME "mips"
+#elif defined(__riscv) || defined(__riscv_xlen)
+#define ZT_ARCH_NAME "riscv"
+#elif defined(__powerpc__) || defined(__powerpc64__) || defined(__ppc__) || defined(__ppc64__) || defined (_M_PPC)
+#define ZT_ARCH_NAME "powerpc"
+#elif defined(__s390__) || defined(__s390x__) || defined(__zarch__)
+#define ZT_ARCH_NAME "s390"
+#else
+#define ZT_ARCH_NAME "unknown"
+#endif
+#ifndef ZT_ARCH_NAME
+#define ZT_ARCH_NAME "unknown"
+#endif
+
+#define ZT_TARGET_NAME (ZT_PLATFORM_NAME "/" ZT_ARCH_NAME)
+
 /**
  * Length of a ZeroTier address in bytes
  */

+ 60 - 61
node/IncomingPacket.cpp

@@ -38,6 +38,7 @@
 #include "Path.hpp"
 #include "Bond.hpp"
 #include "Metrics.hpp"
+#include "PacketMultiplexer.hpp"
 
 namespace ZeroTier {
 
@@ -334,7 +335,6 @@ bool IncomingPacket::_doACK(const RuntimeEnvironment* RR, void* tPtr, const Shar
 bool IncomingPacket::_doQOS_MEASUREMENT(const RuntimeEnvironment* RR, void* tPtr, const SharedPtr<Peer>& peer)
 {
 	Metrics::pkt_qos_in++;
-	SharedPtr<Bond> bond = peer->bond();
 	if (! peer->rateGateQoS(RR->node->now(), _path)) {
 		return true;
 	}
@@ -793,66 +793,65 @@ bool IncomingPacket::_doFRAME(const RuntimeEnvironment *RR,void *tPtr,const Shar
 {
 	Metrics::pkt_frame_in++;
 	int32_t _flowId = ZT_QOS_NO_FLOW;
-	if (peer->flowHashingSupported()) {
-		if (size() > ZT_PROTO_VERB_EXT_FRAME_IDX_PAYLOAD) {
-			const unsigned int etherType = at<uint16_t>(ZT_PROTO_VERB_FRAME_IDX_ETHERTYPE);
-			const unsigned int frameLen = size() - ZT_PROTO_VERB_FRAME_IDX_PAYLOAD;
-			const uint8_t *const frameData = reinterpret_cast<const uint8_t *>(data()) + ZT_PROTO_VERB_FRAME_IDX_PAYLOAD;
-
-			if (etherType == ZT_ETHERTYPE_IPV4 && (frameLen >= 20)) {
-				uint16_t srcPort = 0;
-				uint16_t dstPort = 0;
-				uint8_t proto = (reinterpret_cast<const uint8_t *>(frameData)[9]);
-				const unsigned int headerLen = 4 * (reinterpret_cast<const uint8_t *>(frameData)[0] & 0xf);
-				switch(proto) {
-					case 0x01: // ICMP
-						//flowId = 0x01;
-						break;
-					// All these start with 16-bit source and destination port in that order
-					case 0x06: // TCP
-					case 0x11: // UDP
-					case 0x84: // SCTP
-					case 0x88: // UDPLite
-						if (frameLen > (headerLen + 4)) {
-							unsigned int pos = headerLen + 0;
-							srcPort = (reinterpret_cast<const uint8_t *>(frameData)[pos++]) << 8;
-							srcPort |= (reinterpret_cast<const uint8_t *>(frameData)[pos]);
-							pos++;
-							dstPort = (reinterpret_cast<const uint8_t *>(frameData)[pos++]) << 8;
-							dstPort |= (reinterpret_cast<const uint8_t *>(frameData)[pos]);
-							_flowId = dstPort ^ srcPort ^ proto;
-						}
-						break;
-				}
+
+	if (size() > ZT_PROTO_VERB_EXT_FRAME_IDX_PAYLOAD) {
+		const unsigned int etherType = at<uint16_t>(ZT_PROTO_VERB_FRAME_IDX_ETHERTYPE);
+		const unsigned int frameLen = size() - ZT_PROTO_VERB_FRAME_IDX_PAYLOAD;
+		const uint8_t *const frameData = reinterpret_cast<const uint8_t *>(data()) + ZT_PROTO_VERB_FRAME_IDX_PAYLOAD;
+
+		if (etherType == ZT_ETHERTYPE_IPV4 && (frameLen >= 20)) {
+			uint16_t srcPort = 0;
+			uint16_t dstPort = 0;
+			uint8_t proto = (reinterpret_cast<const uint8_t *>(frameData)[9]);
+			const unsigned int headerLen = 4 * (reinterpret_cast<const uint8_t *>(frameData)[0] & 0xf);
+			switch(proto) {
+				case 0x01: // ICMP
+					//flowId = 0x01;
+					break;
+				// All these start with 16-bit source and destination port in that order
+				case 0x06: // TCP
+				case 0x11: // UDP
+				case 0x84: // SCTP
+				case 0x88: // UDPLite
+					if (frameLen > (headerLen + 4)) {
+						unsigned int pos = headerLen + 0;
+						srcPort = (reinterpret_cast<const uint8_t *>(frameData)[pos++]) << 8;
+						srcPort |= (reinterpret_cast<const uint8_t *>(frameData)[pos]);
+						pos++;
+						dstPort = (reinterpret_cast<const uint8_t *>(frameData)[pos++]) << 8;
+						dstPort |= (reinterpret_cast<const uint8_t *>(frameData)[pos]);
+						_flowId = dstPort ^ srcPort ^ proto;
+					}
+					break;
 			}
+		}
 
-			if (etherType == ZT_ETHERTYPE_IPV6 && (frameLen >= 40)) {
-				uint16_t srcPort = 0;
-				uint16_t dstPort = 0;
-				unsigned int pos;
-				unsigned int proto;
-				_ipv6GetPayload((const uint8_t *)frameData, frameLen, pos, proto);
-				switch(proto) {
-					case 0x3A: // ICMPv6
-						//flowId = 0x3A;
-						break;
-					// All these start with 16-bit source and destination port in that order
-					case 0x06: // TCP
-					case 0x11: // UDP
-					case 0x84: // SCTP
-					case 0x88: // UDPLite
-						if (frameLen > (pos + 4)) {
-							srcPort = (reinterpret_cast<const uint8_t *>(frameData)[pos++]) << 8;
-							srcPort |= (reinterpret_cast<const uint8_t *>(frameData)[pos]);
-							pos++;
-							dstPort = (reinterpret_cast<const uint8_t *>(frameData)[pos++]) << 8;
-							dstPort |= (reinterpret_cast<const uint8_t *>(frameData)[pos]);
-							_flowId = dstPort ^ srcPort ^ proto;
-						}
-						break;
-					default:
-						break;
-				}
+		if (etherType == ZT_ETHERTYPE_IPV6 && (frameLen >= 40)) {
+			uint16_t srcPort = 0;
+			uint16_t dstPort = 0;
+			unsigned int pos;
+			unsigned int proto;
+			_ipv6GetPayload((const uint8_t *)frameData, frameLen, pos, proto);
+			switch(proto) {
+				case 0x3A: // ICMPv6
+					//flowId = 0x3A;
+					break;
+				// All these start with 16-bit source and destination port in that order
+				case 0x06: // TCP
+				case 0x11: // UDP
+				case 0x84: // SCTP
+				case 0x88: // UDPLite
+					if (frameLen > (pos + 4)) {
+						srcPort = (reinterpret_cast<const uint8_t *>(frameData)[pos++]) << 8;
+						srcPort |= (reinterpret_cast<const uint8_t *>(frameData)[pos]);
+						pos++;
+						dstPort = (reinterpret_cast<const uint8_t *>(frameData)[pos++]) << 8;
+						dstPort |= (reinterpret_cast<const uint8_t *>(frameData)[pos]);
+						_flowId = dstPort ^ srcPort ^ proto;
+					}
+					break;
+				default:
+					break;
 			}
 		}
 	}
@@ -869,7 +868,7 @@ bool IncomingPacket::_doFRAME(const RuntimeEnvironment *RR,void *tPtr,const Shar
 				const unsigned int frameLen = size() - ZT_PROTO_VERB_FRAME_IDX_PAYLOAD;
 				const uint8_t *const frameData = reinterpret_cast<const uint8_t *>(data()) + ZT_PROTO_VERB_FRAME_IDX_PAYLOAD;
 				if (network->filterIncomingPacket(tPtr,peer,RR->identity.address(),sourceMac,network->mac(),frameData,frameLen,etherType,0) > 0) {
-					RR->node->putFrame(tPtr,nwid,network->userPtr(),sourceMac,network->mac(),etherType,0,(const void *)frameData,frameLen);
+					RR->pm->putFrame(tPtr,nwid,network->userPtr(),sourceMac,network->mac(),etherType,0,(const void *)frameData,frameLen, _flowId);
 				}
 			}
 		} else {
@@ -942,7 +941,7 @@ bool IncomingPacket::_doEXT_FRAME(const RuntimeEnvironment *RR,void *tPtr,const
 					}
 					// fall through -- 2 means accept regardless of bridging checks or other restrictions
 				case 2:
-					RR->node->putFrame(tPtr,nwid,network->userPtr(),from,to,etherType,0,(const void *)frameData,frameLen);
+					RR->pm->putFrame(tPtr,nwid,network->userPtr(),from,to,etherType,0,(const void *)frameData,frameLen, flowId);
 					break;
 			}
 		}

+ 1 - 0
node/Network.cpp

@@ -1313,6 +1313,7 @@ void Network::requestConfiguration(void *tPtr)
 	rmd.add(ZT_NETWORKCONFIG_REQUEST_METADATA_KEY_MAX_NETWORK_TAGS,(uint64_t)ZT_MAX_NETWORK_TAGS);
 	rmd.add(ZT_NETWORKCONFIG_REQUEST_METADATA_KEY_FLAGS,(uint64_t)0);
 	rmd.add(ZT_NETWORKCONFIG_REQUEST_METADATA_KEY_RULES_ENGINE_REV,(uint64_t)ZT_RULES_ENGINE_REVISION);
+	rmd.add(ZT_NETWORKCONFIG_REQUEST_METADATA_KEY_OS_ARCH,ZT_TARGET_NAME);
 
 	RR->t->networkConfigRequestSent(tPtr,*this,ctrl);
 

+ 3 - 1
node/NetworkConfig.hpp

@@ -105,6 +105,8 @@ namespace ZeroTier {
 
 // Network config version
 #define ZT_NETWORKCONFIG_REQUEST_METADATA_KEY_VERSION "v"
+// Network config version
+#define ZT_NETWORKCONFIG_REQUEST_METADATA_KEY_OS_ARCH "o"
 // Protocol version (see Packet.hpp)
 #define ZT_NETWORKCONFIG_REQUEST_METADATA_KEY_PROTOCOL_VERSION "pv"
 // Software vendor
@@ -687,7 +689,7 @@ public:
 
 	/**
 	 * Time current authentication expires or 0 if external authentication is disabled
-	 * 
+	 *
 	 * Not used if authVersion >= 1
 	 */
 	uint64_t authenticationExpiryTime;

+ 17 - 2
node/Node.cpp

@@ -35,6 +35,7 @@
 #include "Network.hpp"
 #include "Trace.hpp"
 #include "Metrics.hpp"
+#include "PacketMultiplexer.hpp"
 
 // FIXME: remove this suppression and actually fix warnings
 #ifdef __GNUC__
@@ -119,9 +120,10 @@ Node::Node(void *uptr,void *tptr,const struct ZT_Node_Callbacks *callbacks,int64
 		const unsigned long mcs = sizeof(Multicaster) + (((sizeof(Multicaster) & 0xf) != 0) ? (16 - (sizeof(Multicaster) & 0xf)) : 0);
 		const unsigned long topologys = sizeof(Topology) + (((sizeof(Topology) & 0xf) != 0) ? (16 - (sizeof(Topology) & 0xf)) : 0);
 		const unsigned long sas = sizeof(SelfAwareness) + (((sizeof(SelfAwareness) & 0xf) != 0) ? (16 - (sizeof(SelfAwareness) & 0xf)) : 0);
-		const unsigned long bc = sizeof(Bond) + (((sizeof(Bond) & 0xf) != 0) ? (16 - (sizeof(Bond) & 0xf)) : 0);
+		const unsigned long bcs = sizeof(Bond) + (((sizeof(Bond) & 0xf) != 0) ? (16 - (sizeof(Bond) & 0xf)) : 0);
+		const unsigned long pms = sizeof(PacketMultiplexer) + (((sizeof(PacketMultiplexer) & 0xf) != 0) ? (16 - (sizeof(PacketMultiplexer) & 0xf)) : 0);
 
-		m = reinterpret_cast<char *>(::malloc(16 + ts + sws + mcs + topologys + sas + bc));
+		m = reinterpret_cast<char *>(::malloc(16 + ts + sws + mcs + topologys + sas + bcs + pms));
 		if (!m) {
 			throw std::bad_alloc();
 		}
@@ -141,6 +143,8 @@ Node::Node(void *uptr,void *tptr,const struct ZT_Node_Callbacks *callbacks,int64
 		RR->sa = new (m) SelfAwareness(RR);
 		m += sas;
 		RR->bc = new (m) Bond(RR);
+		m += bcs;
+		RR->pm = new (m) PacketMultiplexer(RR);
 	} catch ( ... ) {
 		if (RR->sa) {
 			RR->sa->~SelfAwareness();
@@ -160,6 +164,9 @@ Node::Node(void *uptr,void *tptr,const struct ZT_Node_Callbacks *callbacks,int64
 		if (RR->bc) {
 			RR->bc->~Bond();
 		}
+		if (RR->pm) {
+			RR->pm->~PacketMultiplexer();
+		}
 		::free(m);
 		throw;
 	}
@@ -191,6 +198,9 @@ Node::~Node()
 	if (RR->bc) {
 		RR->bc->~Bond();
 	}
+	if (RR->pm) {
+		RR->pm->~PacketMultiplexer();
+	}
 	::free(RR->rtmem);
 }
 
@@ -230,6 +240,11 @@ ZT_ResultCode Node::processVirtualNetworkFrame(
 	}
 }
 
+void Node::initMultithreading(unsigned int concurrency, bool cpuPinningEnabled)
+{
+	RR->pm->setUpPostDecodeReceiveThreads(concurrency, cpuPinningEnabled);
+}
+
 // Closure used to ping upstream and active/online peers
 class _PingPeersThatNeedPing
 {

+ 4 - 1
node/Node.hpp

@@ -283,7 +283,10 @@ public:
 		return _lowBandwidthMode;
 	}
 
-private:
+	void initMultithreading(unsigned int concurrency, bool cpuPinningEnabled);
+
+
+public:
 	RuntimeEnvironment _RR;
 	RuntimeEnvironment *RR;
 	void *_uPtr; // _uptr (lower case) is reserved in Visual Studio :P

+ 122 - 0
node/PacketMultiplexer.cpp

@@ -0,0 +1,122 @@
+/*
+ * Copyright (c)2013-2021 ZeroTier, Inc.
+ *
+ * Use of this software is governed by the Business Source License included
+ * in the LICENSE.TXT file in the project's root directory.
+ *
+ * Change Date: 2026-01-01
+ *
+ * On the date above, in accordance with the Business Source License, use
+ * of this software will be governed by version 2.0 of the Apache License.
+ */
+/****/
+
+#include "PacketMultiplexer.hpp"
+
+#include "Node.hpp"
+#include "RuntimeEnvironment.hpp"
+#include "Constants.hpp"
+
+#include <stdio.h>
+#include <stdlib.h>
+
+namespace ZeroTier {
+
+PacketMultiplexer::PacketMultiplexer(const RuntimeEnvironment* renv)
+{
+	RR = renv;
+};
+
+void PacketMultiplexer::putFrame(void* tPtr, uint64_t nwid, void** nuptr, const MAC& source, const MAC& dest, unsigned int etherType, unsigned int vlanId, const void* data, unsigned int len, unsigned int flowId)
+{
+#if defined(__APPLE__) || defined(__OpenBSD__) || defined(__NetBSD__) || defined(__WINDOWS__)
+	RR->node->putFrame(tPtr,nwid,nuptr,source,dest,etherType,vlanId,(const void *)data,len);
+	return;
+#endif
+
+	if (!_enabled) {
+		RR->node->putFrame(tPtr,nwid,nuptr,source,dest,etherType,vlanId,(const void *)data,len);
+		return;
+	}
+
+	PacketRecord* packet;
+	_rxPacketVector_m.lock();
+	if (_rxPacketVector.empty()) {
+		packet = new PacketRecord;
+	}
+	else {
+		packet = _rxPacketVector.back();
+		_rxPacketVector.pop_back();
+	}
+	_rxPacketVector_m.unlock();
+
+	packet->tPtr = tPtr;
+	packet->nwid = nwid;
+	packet->nuptr = nuptr;
+	packet->source = source.toInt();
+	packet->dest = dest.toInt();
+	packet->etherType = etherType;
+	packet->vlanId = vlanId;
+	packet->len = len;
+	packet->flowId = flowId;
+	memcpy(packet->data, data, len);
+
+	int bucket = flowId % _concurrency;
+	_rxPacketQueues[bucket]->postLimit(packet, 2048);
+}
+
+void PacketMultiplexer::setUpPostDecodeReceiveThreads(unsigned int concurrency, bool cpuPinningEnabled)
+{
+#if defined(__APPLE__) || defined(__OpenBSD__) || defined(__NetBSD__) || defined(__WINDOWS__)
+	return;
+#endif
+	_enabled = true;
+	_concurrency = concurrency;
+	bool _enablePinning = cpuPinningEnabled;
+
+	for (unsigned int i = 0; i < _concurrency; ++i) {
+		fprintf(stderr, "Reserved queue for thread %d\n", i);
+		_rxPacketQueues.push_back(new BlockingQueue<PacketRecord*>());
+	}
+
+	// Each thread picks from its own queue to feed into the core
+	for (unsigned int i = 0; i < _concurrency; ++i) {
+		_rxThreads.push_back(std::thread([this, i, _enablePinning]() {
+			fprintf(stderr, "Created post-decode packet ingestion thread %d\n", i);
+
+			PacketRecord* packet = nullptr;
+			for (;;) {
+				if (! _rxPacketQueues[i]->get(packet)) {
+					break;
+				}
+				if (! packet) {
+					break;
+				}
+
+				// fprintf(stderr, "popped packet from queue %d\n", i);
+
+				MAC sourceMac = MAC(packet->source);
+				MAC destMac = MAC(packet->dest);
+
+				RR->node->putFrame(packet->tPtr, packet->nwid, packet->nuptr, sourceMac, destMac, packet->etherType, 0, (const void*)packet->data, packet->len);
+				{
+					Mutex::Lock l(_rxPacketVector_m);
+					_rxPacketVector.push_back(packet);
+				}
+				/*
+				if (ZT_ResultCode_isFatal(err)) {
+					char tmp[256];
+					OSUtils::ztsnprintf(tmp, sizeof(tmp), "error processing packet: %d", (int)err);
+					Mutex::Lock _l(_termReason_m);
+					_termReason = ONE_UNRECOVERABLE_ERROR;
+					_fatalErrorMessage = tmp;
+					this->terminate();
+					break;
+				}
+				*/
+			}
+		}));
+	}
+}
+
+}	// namespace ZeroTier

+ 65 - 0
node/PacketMultiplexer.hpp

@@ -0,0 +1,65 @@
+/*
+ * Copyright (c)2013-2021 ZeroTier, Inc.
+ *
+ * Use of this software is governed by the Business Source License included
+ * in the LICENSE.TXT file in the project's root directory.
+ *
+ * Change Date: 2026-01-01
+ *
+ * On the date above, in accordance with the Business Source License, use
+ * of this software will be governed by version 2.0 of the Apache License.
+ */
+/****/
+
+#ifndef ZT_PACKET_MULTIPLEXER_HPP
+#define ZT_PACKET_MULTIPLEXER_HPP
+
+#include "../osdep/BlockingQueue.hpp"
+#include "MAC.hpp"
+#include "Mutex.hpp"
+#include "RuntimeEnvironment.hpp"
+
+#include <thread>
+#include <vector>
+
+namespace ZeroTier {
+
+struct PacketRecord {
+	void* tPtr;
+	uint64_t nwid;
+	void** nuptr;
+	uint64_t source;
+	uint64_t dest;
+	unsigned int etherType;
+	unsigned int vlanId;
+	uint8_t data[ZT_MAX_MTU];
+	unsigned int len;
+	unsigned int flowId;
+};
+
+class PacketMultiplexer {
+  public:
+	const RuntimeEnvironment* RR;
+
+	PacketMultiplexer(const RuntimeEnvironment* renv);
+
+	void setUpPostDecodeReceiveThreads(unsigned int concurrency, bool cpuPinningEnabled);
+
+	void putFrame(void* tPtr, uint64_t nwid, void** nuptr, const MAC& source, const MAC& dest, unsigned int etherType, unsigned int vlanId, const void* data, unsigned int len, unsigned int flowId);
+
+	std::vector<BlockingQueue<PacketRecord*>*> _rxPacketQueues;
+
+	unsigned int _concurrency;
+	// pool
+	std::vector<PacketRecord*> _rxPacketVector;
+	std::vector<std::thread> _rxPacketThreads;
+	Mutex _rxPacketVector_m, _rxPacketThreads_m;
+
+	std::vector<std::thread> _rxThreads;
+	unsigned int _rxThreadCount;
+	bool _enabled;
+};
+
+}	// namespace ZeroTier
+
+#endif	 // ZT_PACKET_MULTIPLEXER_HPP

+ 2 - 0
node/RuntimeEnvironment.hpp

@@ -31,6 +31,7 @@ class NetworkController;
 class SelfAwareness;
 class Trace;
 class Bond;
+class PacketMultiplexer;
 
 /**
  * Holds global state for an instance of ZeroTier::Node
@@ -77,6 +78,7 @@ public:
 	Topology *topology;
 	SelfAwareness *sa;
 	Bond *bc;
+	PacketMultiplexer *pm;
 
 	// This node's identity and string representations thereof
 	Identity identity;

+ 1 - 1
node/Switch.cpp

@@ -519,7 +519,7 @@ void Switch::onLocalEthernet(void *tPtr,const SharedPtr<Network> &network,const
 						RR->node->putFrame(tPtr, network->id(), network->userPtr(), peerMac, from, ZT_ETHERTYPE_IPV6, 0, adv, 72);
 
 					}).detach();
-					
+
 					return; // NDP emulation done. We have forged a "fake" reply, so no need to send actual NDP query.
 				} // else no NDP emulation
 			} // else no NDP emulation

+ 14 - 0
node/Utils.hpp

@@ -24,6 +24,7 @@
 #include <stdexcept>
 #include <vector>
 #include <map>
+#include <algorithm>
 
 #if defined(__FreeBSD__)
 #include <sys/endian.h>
@@ -849,6 +850,19 @@ public:
 	 * Hexadecimal characters 0-f
 	 */
 	static const char HEXCHARS[16];
+
+	/*
+	 * Remove `-` and `:` from a MAC address (in-place).
+	 *
+	 * @param mac The MAC address
+	*/
+	static inline void cleanMac(std::string& mac)
+	{
+		auto start = mac.begin();
+		auto end = mac.end();
+		auto new_end = std::remove_if(start, end, [](char c) { return c == 45 || c == 58; });
+		mac.erase(new_end, end);
+	}
 };
 
 } // namespace ZeroTier

+ 2 - 1
objects.mk

@@ -29,7 +29,8 @@ CORE_OBJS=\
 	node/Topology.o \
 	node/Trace.o \
 	node/Utils.o \
-	node/Bond.o
+	node/Bond.o \
+	node/PacketMultiplexer.o
 
 ONE_OBJS=\
 	controller/EmbeddedNetworkController.o \

+ 69 - 38
osdep/BSDEthernetTap.cpp

@@ -39,7 +39,9 @@
 #include <net/if_dl.h>
 #include <net/if_media.h>
 #include <net/route.h>
+#include <pthread_np.h>
 
+#include <sched.h>
 #include <string>
 #include <map>
 #include <set>
@@ -53,6 +55,7 @@
 #include "BSDEthernetTap.hpp"
 
 #define ZT_BASE32_CHARS "0123456789abcdefghijklmnopqrstuv"
+#define ZT_TAP_BUF_SIZE (1024 * 16)
 
 // ff:ff:ff:ff:ff:ff with no ADI
 static const ZeroTier::MulticastGroup _blindWildcardMulticastGroup(ZeroTier::MAC(0xff),0);
@@ -61,6 +64,8 @@ namespace ZeroTier {
 
 BSDEthernetTap::BSDEthernetTap(
 	const char *homePath,
+	unsigned int concurrency,
+	bool pinning,
 	const MAC &mac,
 	unsigned int mtu,
 	unsigned int metric,
@@ -69,6 +74,8 @@ BSDEthernetTap::BSDEthernetTap(
 	void (*handler)(void *,void *,uint64_t,const MAC &,const MAC &,unsigned int,unsigned int,const void *,unsigned int),
 	void *arg) :
 	_handler(handler),
+	_concurrency(concurrency),
+	_pinning(pinning),
 	_arg(arg),
 	_nwid(nwid),
 	_mtu(mtu),
@@ -195,11 +202,9 @@ BSDEthernetTap::BSDEthernetTap(
 BSDEthernetTap::~BSDEthernetTap()
 {
 	::write(_shutdownSignalPipe[1],"\0",1); // causes thread to exit
-	Thread::join(_thread);
 	::close(_fd);
 	::close(_shutdownSignalPipe[0]);
 	::close(_shutdownSignalPipe[1]);
-
 	long cpid = (long)vfork();
 	if (cpid == 0) {
 #ifdef ZT_TRACE
@@ -211,6 +216,10 @@ BSDEthernetTap::~BSDEthernetTap()
 		int exitcode = -1;
 		::waitpid(cpid,&exitcode,0);
 	}
+	Thread::join(_thread);
+	for (std::thread &t : _rxThreads) {
+		t.join();
+	}
 }
 
 void BSDEthernetTap::setEnabled(bool en)
@@ -418,53 +427,75 @@ void BSDEthernetTap::setMtu(unsigned int mtu)
 void BSDEthernetTap::threadMain()
 	throw()
 {
-	fd_set readfds,nullfds;
-	MAC to,from;
-	int n,nfds,r;
-	char getBuf[ZT_MAX_MTU + 64];
-
 	// Wait for a moment after startup -- wait for Network to finish
 	// constructing itself.
 	Thread::sleep(500);
 
-	FD_ZERO(&readfds);
-	FD_ZERO(&nullfds);
-	nfds = (int)std::max(_shutdownSignalPipe[0],_fd) + 1;
+	for (unsigned int i = 0; i < _concurrency; ++i) {
+		_rxThreads.push_back(std::thread([this, i, _pinning] {
+
+			if (_pinning) {
+				int pinCore = i % _concurrency;
+				fprintf(stderr, "Pinning thread %d to core %d\n", i, pinCore);
+				pthread_t self = pthread_self();
+				cpu_set_t cpuset;
+				CPU_ZERO(&cpuset);
+				CPU_SET(pinCore, &cpuset);
+				//int rc = sched_setaffinity(self, sizeof(cpu_set_t), &cpuset);
+				int rc = pthread_setaffinity_np(self, sizeof(cpu_set_t), &cpuset);
+				if (rc != 0)
+				{
+					fprintf(stderr, "Failed to pin thread %d to core %d: %s\n", i, pinCore, strerror(errno));
+					exit(1);
+				}
+			}
+
+			uint8_t b[ZT_TAP_BUF_SIZE];
+			MAC to, from;
+			fd_set readfds, nullfds;
+			int n, nfds, r;
 
-	r = 0;
-	for(;;) {
-		FD_SET(_shutdownSignalPipe[0],&readfds);
-		FD_SET(_fd,&readfds);
-		select(nfds,&readfds,&nullfds,&nullfds,(struct timeval *)0);
+			FD_ZERO(&readfds);
+			FD_ZERO(&nullfds);
+			nfds = (int)std::max(_shutdownSignalPipe[0],_fd) + 1;
 
-		if (FD_ISSET(_shutdownSignalPipe[0],&readfds)) // writes to shutdown pipe terminate thread
-			break;
+			r = 0;
 
-		if (FD_ISSET(_fd,&readfds)) {
-			n = (int)::read(_fd,getBuf + r,sizeof(getBuf) - r);
-			if (n < 0) {
-				if ((errno != EINTR)&&(errno != ETIMEDOUT))
+			for(;;) {
+				FD_SET(_shutdownSignalPipe[0],&readfds);
+				FD_SET(_fd,&readfds);
+				select(nfds,&readfds,&nullfds,&nullfds,(struct timeval *)0);
+
+				if (FD_ISSET(_shutdownSignalPipe[0],&readfds)) // writes to shutdown pipe terminate thread
 					break;
-			} else {
-				// Some tap drivers like to send the ethernet frame and the
-				// payload in two chunks, so handle that by accumulating
-				// data until we have at least a frame.
-				r += n;
-				if (r > 14) {
-					if (r > ((int)_mtu + 14)) // sanity check for weird TAP behavior on some platforms
-						r = _mtu + 14;
-
-					if (_enabled) {
-						to.setTo(getBuf,6);
-						from.setTo(getBuf + 6,6);
-						unsigned int etherType = ntohs(((const uint16_t *)getBuf)[6]);
-						_handler(_arg,(void *)0,_nwid,from,to,etherType,0,(const void *)(getBuf + 14),r - 14);
-					}
 
-					r = 0;
+				if (FD_ISSET(_fd,&readfds)) {
+					n = (int)::read(_fd,b + r,sizeof(b) - r);
+					if (n < 0) {
+						if ((errno != EINTR)&&(errno != ETIMEDOUT))
+							break;
+					} else {
+						// Some tap drivers like to send the ethernet frame and the
+						// payload in two chunks, so handle that by accumulating
+						// data until we have at least a frame.
+						r += n;
+						if (r > 14) {
+							if (r > ((int)_mtu + 14)) // sanity check for weird TAP behavior on some platforms
+								r = _mtu + 14;
+
+							if (_enabled) {
+								to.setTo(b,6);
+								from.setTo(b + 6,6);
+								unsigned int etherType = ntohs(((const uint16_t *)b)[6]);
+								_handler(_arg,(void *)0,_nwid,from,to,etherType,0,(const void *)(b + 14),r - 14);
+							}
+
+							r = 0;
+						}
+					}
 				}
 			}
-		}
+		}));
 	}
 }
 

+ 6 - 0
osdep/BSDEthernetTap.hpp

@@ -20,6 +20,7 @@
 #include <string>
 #include <vector>
 #include <stdexcept>
+#include <thread>
 
 #include "../node/Constants.hpp"
 #include "../node/MulticastGroup.hpp"
@@ -34,6 +35,8 @@ class BSDEthernetTap : public EthernetTap
 public:
 	BSDEthernetTap(
 		const char *homePath,
+		unsigned int concurrency,
+		bool pinning,
 		const MAC &mac,
 		unsigned int mtu,
 		unsigned int metric,
@@ -62,6 +65,8 @@ public:
 private:
 	void (*_handler)(void *,void *,uint64_t,const MAC &,const MAC &,unsigned int,unsigned int,const void *,unsigned int);
 	void *_arg;
+	unsigned int _concurrency;
+	bool _pinning;
 	uint64_t _nwid;
 	Thread _thread;
 	std::string _dev;
@@ -73,6 +78,7 @@ private:
 	volatile bool _enabled;
 	mutable std::vector<InetAddress> _ifaddrs;
 	mutable uint64_t _lastIfAddrsUpdate;
+	std::vector<std::thread> _rxThreads;
 };
 
 } // namespace ZeroTier

+ 4 - 2
osdep/EthernetTap.cpp

@@ -57,6 +57,8 @@ namespace ZeroTier {
 
 std::shared_ptr<EthernetTap> EthernetTap::newInstance(
 	const char *tapDeviceType, // OS-specific, NULL for default
+	unsigned int concurrency,
+	bool pinning,
 	const char *homePath,
 	const MAC &mac,
 	unsigned int mtu,
@@ -92,7 +94,7 @@ std::shared_ptr<EthernetTap> EthernetTap::newInstance(
 #endif // __APPLE__
 
 #ifdef __LINUX__
-	return std::shared_ptr<EthernetTap>(new LinuxEthernetTap(homePath,mac,mtu,metric,nwid,friendlyName,handler,arg));
+	return std::shared_ptr<EthernetTap>(new LinuxEthernetTap(homePath,concurrency,pinning,mac,mtu,metric,nwid,friendlyName,handler,arg));
 #endif // __LINUX__
 
 #ifdef __WINDOWS__
@@ -130,7 +132,7 @@ std::shared_ptr<EthernetTap> EthernetTap::newInstance(
 #endif // __WINDOWS__
 
 #ifdef __FreeBSD__
-	return std::shared_ptr<EthernetTap>(new BSDEthernetTap(homePath,mac,mtu,metric,nwid,friendlyName,handler,arg));
+	return std::shared_ptr<EthernetTap>(new BSDEthernetTap(homePath,concurrency,pinning,mac,mtu,metric,nwid,friendlyName,handler,arg));
 #endif // __FreeBSD__
 
 #ifdef __NetBSD__

+ 2 - 0
osdep/EthernetTap.hpp

@@ -32,6 +32,8 @@ class EthernetTap
 public:
 	static std::shared_ptr<EthernetTap> newInstance(
 		const char *tapDeviceType, // OS-specific, NULL for default
+		unsigned int concurrency,
+		bool pinning,
 		const char *homePath,
 		const MAC &mac,
 		unsigned int mtu,

+ 123 - 101
osdep/LinuxEthernetTap.cpp

@@ -60,7 +60,7 @@
 #define IFNAMSIZ 16
 #endif
 
-#define ZT_TAP_BUF_SIZE 16384
+#define ZT_TAP_BUF_SIZE (1024 * 16)
 
 // ff:ff:ff:ff:ff:ff with no ADI
 static const ZeroTier::MulticastGroup _blindWildcardMulticastGroup(ZeroTier::MAC(0xff),0);
@@ -68,7 +68,7 @@ static const ZeroTier::MulticastGroup _blindWildcardMulticastGroup(ZeroTier::MAC
 namespace ZeroTier {
 
 // determine if we're running a really old linux kernel.
-// Kernels in the 2.6.x series don't behave the same when bringing up 
+// Kernels in the 2.6.x series don't behave the same when bringing up
 // the tap devices.
 //
 // Returns true if the kernel major version is < 3
@@ -111,6 +111,8 @@ static void _base32_5_to_8(const uint8_t *in,char *out)
 
 LinuxEthernetTap::LinuxEthernetTap(
 	const char *homePath,
+	unsigned int concurrency,
+	bool pinning,
 	const MAC &mac,
 	unsigned int mtu,
 	unsigned int metric,
@@ -220,135 +222,155 @@ LinuxEthernetTap::LinuxEthernetTap(
 
 	(void)::pipe(_shutdownSignalPipe);
 
-	_tapReaderThread = std::thread([this]{
-		uint8_t b[ZT_TAP_BUF_SIZE];
-		fd_set readfds,nullfds;
-		int n,nfds,r;
-		std::vector<void *> buffers;
-		struct ifreq ifr;
+	for (unsigned int i = 0; i < concurrency; ++i) {
+		_rxThreads.push_back(std::thread([this, i, concurrency, pinning] {
+
+			if (pinning) {
+				int pinCore = i % concurrency;
+				fprintf(stderr, "Pinning tap thread %d to core %d\n", i, pinCore);
+				pthread_t self = pthread_self();
+				cpu_set_t cpuset;
+				CPU_ZERO(&cpuset);
+				CPU_SET(pinCore, &cpuset);
+				int rc = pthread_setaffinity_np(self, sizeof(cpu_set_t), &cpuset);
+				if (rc != 0)
+				{
+					fprintf(stderr, "Failed to pin tap thread %d to core %d: %s\n", i, pinCore, strerror(errno));
+					exit(1);
+				}
+			}
+
+			uint8_t b[ZT_TAP_BUF_SIZE];
+			fd_set readfds, nullfds;
+			int n, nfds, r;
+			if (i == 0) {
+				struct ifreq ifr;
+				memset(&ifr, 0, sizeof(ifr));
+				strcpy(ifr.ifr_name, _dev.c_str());
+
+				const int sock = socket(AF_INET, SOCK_DGRAM, 0);
+				if (sock <= 0)
+					return;
+
+				if (ioctl(sock, SIOCGIFFLAGS, (void*)&ifr) < 0) {
+					::close(sock);
+					printf("WARNING: ioctl() failed setting up Linux tap device (bring interface up)\n");
+					return;
+				}
 
-		memset(&ifr,0,sizeof(ifr));
-		strcpy(ifr.ifr_name,_dev.c_str());
+				ifr.ifr_ifru.ifru_hwaddr.sa_family = ARPHRD_ETHER;
+				_mac.copyTo(ifr.ifr_ifru.ifru_hwaddr.sa_data, 6);
+				if (ioctl(sock, SIOCSIFHWADDR, (void*)&ifr) < 0) {
+					::close(sock);
+					printf("WARNING: ioctl() failed setting up Linux tap device (set MAC)\n");
+					return;
+				}
 
-		const int sock = socket(AF_INET,SOCK_DGRAM,0);
-		if (sock <= 0)
-			return;
+				usleep(100000);
 
-		if (ioctl(sock,SIOCGIFFLAGS,(void *)&ifr) < 0) {
-			::close(sock);
-			printf("WARNING: ioctl() failed setting up Linux tap device (bring interface up)\n");
-			return;
-		}
+				if (isOldLinuxKernel()) {
+					ifr.ifr_ifru.ifru_mtu = (int)_mtu;
+					if (ioctl(sock, SIOCSIFMTU, (void*)&ifr) < 0) {
+						::close(sock);
+						printf("WARNING: ioctl() failed setting up Linux tap device (set MTU)\n");
+						return;
+					}
 
-		ifr.ifr_ifru.ifru_hwaddr.sa_family = ARPHRD_ETHER;
-		_mac.copyTo(ifr.ifr_ifru.ifru_hwaddr.sa_data,6);
-		if (ioctl(sock,SIOCSIFHWADDR,(void *)&ifr) < 0) {
-			::close(sock);
-			printf("WARNING: ioctl() failed setting up Linux tap device (set MAC)\n");
-			return;
-		}
+					usleep(100000);
+				}
 
-		usleep(100000);
+				ifr.ifr_flags |= IFF_MULTICAST;
+				ifr.ifr_flags |= IFF_UP;
+				if (ioctl(sock, SIOCSIFFLAGS, (void*)&ifr) < 0) {
+					::close(sock);
+					printf("WARNING: ioctl() failed setting up Linux tap device (bring interface up)\n");
+					return;
+				}
 
-		if (isOldLinuxKernel()) {
-			ifr.ifr_ifru.ifru_mtu = (int)_mtu;
-			if (ioctl(sock,SIOCSIFMTU,(void *)&ifr) < 0) {
-				::close(sock);
-				printf("WARNING: ioctl() failed setting up Linux tap device (set MTU)\n");
-				return;
-			}
+				usleep(100000);
 
-			usleep(100000);
-		}
-	
-
-		ifr.ifr_flags |= IFF_MULTICAST;
-		ifr.ifr_flags |= IFF_UP;
-		if (ioctl(sock,SIOCSIFFLAGS,(void *)&ifr) < 0) {
-			::close(sock);
-			printf("WARNING: ioctl() failed setting up Linux tap device (bring interface up)\n");
-			return;
-		}
+				if (! isOldLinuxKernel()) {
+					ifr.ifr_ifru.ifru_hwaddr.sa_family = ARPHRD_ETHER;
+					_mac.copyTo(ifr.ifr_ifru.ifru_hwaddr.sa_data, 6);
+					if (ioctl(sock, SIOCSIFHWADDR, (void*)&ifr) < 0) {
+						::close(sock);
+						printf("WARNING: ioctl() failed setting up Linux tap device (set MAC)\n");
+						return;
+					}
+
+					ifr.ifr_ifru.ifru_mtu = (int)_mtu;
+					if (ioctl(sock, SIOCSIFMTU, (void*)&ifr) < 0) {
+						::close(sock);
+						printf("WARNING: ioctl() failed setting up Linux tap device (set MTU)\n");
+						return;
+					}
+				}
 
-		usleep(100000);
+				fcntl(_fd, F_SETFL, O_NONBLOCK);
 
-		if (!isOldLinuxKernel()) {
-			ifr.ifr_ifru.ifru_hwaddr.sa_family = ARPHRD_ETHER;
-			_mac.copyTo(ifr.ifr_ifru.ifru_hwaddr.sa_data,6);
-			if (ioctl(sock,SIOCSIFHWADDR,(void *)&ifr) < 0) {
 				::close(sock);
-				printf("WARNING: ioctl() failed setting up Linux tap device (set MAC)\n");
-				return;
 			}
 
-			ifr.ifr_ifru.ifru_mtu = (int)_mtu;
-			if (ioctl(sock,SIOCSIFMTU,(void *)&ifr) < 0) {
-				::close(sock);
-				printf("WARNING: ioctl() failed setting up Linux tap device (set MTU)\n");
+			if (! _run) {
 				return;
 			}
-		}
 
-		fcntl(_fd,F_SETFL,O_NONBLOCK);
-
-		::close(sock);
-
-		if (!_run)
-			return;
-
-		FD_ZERO(&readfds);
-		FD_ZERO(&nullfds);
-		nfds = (int)std::max(_shutdownSignalPipe[0],_fd) + 1;
-
-		r = 0;
-		for(;;) {
-			FD_SET(_shutdownSignalPipe[0],&readfds);
-			FD_SET(_fd,&readfds);
-			select(nfds,&readfds,&nullfds,&nullfds,(struct timeval *)0);
-
-			if (FD_ISSET(_shutdownSignalPipe[0],&readfds))
-				break;
-
-			if (FD_ISSET(_fd,&readfds)) {
-				for(;;) { // read until there are no more packets, then return to outer select() loop
-					n = (int)::read(_fd,b + r,ZT_TAP_BUF_SIZE - r);
-					if (n > 0) {
-						// Some tap drivers like to send the ethernet frame and the
-						// payload in two chunks, so handle that by accumulating
-						// data until we have at least a frame.
-						r += n;
-						if (r > 14) {
-							if (r > ((int)_mtu + 14)) // sanity check for weird TAP behavior on some platforms
-								r = _mtu + 14;
-
-							if (_enabled) {
-								//_tapq.post(std::pair<void *,int>(buf,r));
-								//buf = nullptr;
-								MAC to(b, 6),from(b + 6, 6);
-								unsigned int etherType = Utils::ntoh(((const uint16_t *)b)[6]);
-								_handler(_arg, nullptr, _nwid, from, to, etherType, 0, (const void *)(b + 14),(unsigned int)(r - 14));
-							}
+			FD_ZERO(&readfds);
+			FD_ZERO(&nullfds);
+			nfds = (int)std::max(_shutdownSignalPipe[0], _fd) + 1;
+
+			r = 0;
+			for (;;) {
+				FD_SET(_shutdownSignalPipe[0], &readfds);
+				FD_SET(_fd, &readfds);
+				select(nfds, &readfds, &nullfds, &nullfds, (struct timeval*)0);
 
+				if (FD_ISSET(_shutdownSignalPipe[0], &readfds)) {
+					break;
+				}
+				if (FD_ISSET(_fd, &readfds)) {
+					for (;;) {
+						// read until there are no more packets, then return to outer select() loop
+						n = (int)::read(_fd, b + r, ZT_TAP_BUF_SIZE - r);
+						if (n > 0) {
+							// Some tap drivers like to send the ethernet frame and the
+							// payload in two chunks, so handle that by accumulating
+							// data until we have at least a frame.
+							r += n;
+							if (r > 14) {
+								if (r > ((int)_mtu + 14))	// sanity check for weird TAP behavior on some platforms
+									r = _mtu + 14;
+
+								if (_enabled) {
+									MAC to(b, 6), from(b + 6, 6);
+									unsigned int etherType = Utils::ntoh(((const uint16_t*)b)[6]);
+									_handler(_arg, nullptr, _nwid, from, to, etherType, 0, (const void*)(b + 14), (unsigned int)(r - 14));
+								}
+
+								r = 0;
+							}
+						}
+						else {
 							r = 0;
+							break;
 						}
-					} else {
-						r = 0;
-						break;
 					}
 				}
 			}
-		}
-	});
+		}));
+	}
 }
 
 LinuxEthernetTap::~LinuxEthernetTap()
 {
 	_run = false;
 	(void)::write(_shutdownSignalPipe[1],"\0",1);
-	_tapReaderThread.join();
 	::close(_fd);
 	::close(_shutdownSignalPipe[0]);
 	::close(_shutdownSignalPipe[1]);
+	for (std::thread &t : _rxThreads) {
+		t.join();
+	}
 }
 
 void LinuxEthernetTap::setEnabled(bool en)

+ 4 - 4
osdep/LinuxEthernetTap.hpp

@@ -26,6 +26,7 @@
 #include <mutex>
 #include "../node/MulticastGroup.hpp"
 #include "EthernetTap.hpp"
+#include "BlockingQueue.hpp"
 
 namespace ZeroTier {
 
@@ -34,6 +35,8 @@ class LinuxEthernetTap : public EthernetTap
 public:
 	LinuxEthernetTap(
 		const char *homePath,
+		unsigned int concurrency,
+		bool pinning,
 		const MAC &mac,
 		unsigned int mtu,
 		unsigned int metric,
@@ -57,9 +60,6 @@ public:
 	virtual void setMtu(unsigned int mtu);
 	virtual void setDns(const char *domain, const std::vector<InetAddress> &servers) {}
 
-
-
-
 private:
 	void (*_handler)(void *,void *,uint64_t,const MAC &,const MAC &,unsigned int,unsigned int,const void *,unsigned int);
 	void *_arg;
@@ -73,9 +73,9 @@ private:
 	int _shutdownSignalPipe[2];
 	std::atomic_bool _enabled;
 	std::atomic_bool _run;
-	std::thread _tapReaderThread;
 	mutable std::vector<InetAddress> _ifaddrs;
 	mutable uint64_t _lastIfAddrsUpdate;
+	std::vector<std::thread> _rxThreads;
 };
 
 } // namespace ZeroTier

+ 1 - 1
osdep/MacEthernetTapAgent.c

@@ -32,7 +32,7 @@
  * All this stuff is basically undocumented. A lot of tracing through
  * the Darwin/XNU kernel source was required to figure out how to make
  * this actually work.
- * 
+ *
  * We hope to develop a DriverKit-based driver in the near-mid future to
  * replace this weird hack, but it works for now through Big Sur in our
  * testing.

+ 3 - 1
osdep/MacKextEthernetTap.cpp

@@ -447,7 +447,9 @@ MacKextEthernetTap::~MacKextEthernetTap()
 
 	::write(_shutdownSignalPipe[1],"\0",1); // causes thread to exit
 	Thread::join(_thread);
-
+		for (std::thread &t : _rxThreads) {
+		t.join();
+	}
 	::close(_fd);
 	::close(_shutdownSignalPipe[0]);
 	::close(_shutdownSignalPipe[1]);

+ 2 - 0
osdep/MacKextEthernetTap.hpp

@@ -20,6 +20,7 @@
 #include <stdexcept>
 #include <string>
 #include <vector>
+#include <thread>
 
 #include "../node/Constants.hpp"
 #include "../node/MAC.hpp"
@@ -75,6 +76,7 @@ private:
 	int _fd;
 	int _shutdownSignalPipe[2];
 	volatile bool _enabled;
+	std::vector<std::thread> _rxThreads;
 };
 
 } // namespace ZeroTier

+ 5 - 0
pkg/synology/dsm7-docker/README.md

@@ -1,3 +1,8 @@
 ## Docker image for Synology's DSM7
 
 Documentation: [docs.zerotier.com/devices/synology](https://docs.zerotier.com/devices/synology)
+
+### Build & Push changes to DockerHub
+```shell
+./build.sh build
+```

+ 10 - 12
pkg/synology/dsm7-docker/build.sh

@@ -3,19 +3,17 @@
 ZTO_VER=$(git describe --tags $(git rev-list --tags --max-count=1))
 ZTO_COMMIT=$(git rev-parse HEAD)
 
-build()
-{
-  sudo docker build --load --rm -t zerotier-synology . --build-arg ZTO_COMMIT=${ZTO_COMMIT} --build-arg ZTO_VER=${ZTO_VER}
-  LATEST_DOCKER_IMAGE_HASH=$(sudo docker images -q zerotier-synology)
-  sudo docker tag ${LATEST_DOCKER_IMAGE_HASH} zerotier/zerotier-synology:${ZTO_VER}
-  sudo docker tag ${LATEST_DOCKER_IMAGE_HASH} zerotier/zerotier-synology:latest
-}
-
-push()
-{
+build() {
   sudo docker login --username=${DOCKERHUB_USERNAME}
-  sudo docker push zerotier/zerotier-synology:${ZTO_VER}
-  sudo docker push zerotier/zerotier-synology:latest
+
+  sudo docker buildx build \
+    --push \
+    --platform linux/arm/v7,linux/arm64/v8,linux/amd64 \
+    --tag zerotier/zerotier-synology:${ZTO_VER} \
+    --tag zerotier/zerotier-synology:latest \
+    --build-arg ZTO_COMMIT=${ZTO_COMMIT} \
+    --build-arg ZTO_VER=${ZTO_VER} \
+    .
 }
 
 "$@"

文件差異過大導致無法顯示
+ 279 - 169
rustybits/Cargo.lock


+ 1 - 1
rustybits/zeroidc.vcxproj

@@ -91,7 +91,7 @@
     <NMakeOutput>
     </NMakeOutput>
     <NMakeCleanCommandLine>cargo clean</NMakeCleanCommandLine>
-    <NMakeReBuildCommandLine>cargo clean &amp; cargo build --release --target=x86_64-pc-windows-msvc</NMakeReBuildCommandLine>
+    <NMakeReBuildCommandLine>cargo clean &amp; cargo build -p zeroidc --release --target=x86_64-pc-windows-msvc</NMakeReBuildCommandLine>
     <NMakePreprocessorDefinitions>NDEBUG;$(NMakePreprocessorDefinitions)</NMakePreprocessorDefinitions>
   </PropertyGroup>
   <PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Release|ARM64'">

+ 87 - 21
service/OneService.cpp

@@ -16,7 +16,6 @@
 #include <stdlib.h>
 #include <string.h>
 #include <stdint.h>
-
 #include <string>
 #include <map>
 #include <vector>
@@ -26,6 +25,11 @@
 #include <mutex>
 #include <condition_variable>
 
+#ifdef __FreeBSD__
+#include <sched.h>
+#include <pthread_np.h>
+#endif
+
 #include "../version.h"
 #include "../include/ZeroTierOne.h"
 
@@ -42,6 +46,7 @@
 #include "../node/SHA512.hpp"
 #include "../node/Bond.hpp"
 #include "../node/Peer.hpp"
+#include "../node/PacketMultiplexer.hpp"
 
 #include "../osdep/Phy.hpp"
 #include "../osdep/OSUtils.hpp"
@@ -759,7 +764,7 @@ struct TcpConnection
 	Mutex writeq_m;
 };
 
-struct OneServiceIncomingPacket
+struct PacketRecord
 {
 	uint64_t now;
 	int64_t sock;
@@ -786,14 +791,22 @@ public:
 	SoftwareUpdater *_updater;
 	bool _updateAutoApply;
 
-    httplib::Server _controlPlane;
+	httplib::Server _controlPlane;
 	httplib::Server _controlPlaneV6;
-    std::thread _serverThread;
+	std::thread _serverThread;
 	std::thread _serverThreadV6;
 	bool _serverThreadRunning;
 	bool _serverThreadRunningV6;
 
-    bool _allowTcpFallbackRelay;
+	BlockingQueue<PacketRecord *> _rxPacketQueue;
+	std::vector<PacketRecord *> _rxPacketVector;
+	std::vector<std::thread> _rxPacketThreads;
+	Mutex _rxPacketVector_m,_rxPacketThreads_m;
+	bool _multicoreEnabled;
+	bool _cpuPinningEnabled;
+	unsigned int _concurrency;
+
+	bool _allowTcpFallbackRelay;
 	bool _forceTcpRelay;
 	bool _allowSecondaryPort;
 	bool _enableWebServer;
@@ -844,8 +857,6 @@ public:
 	// Deadline for the next background task service function
 	volatile int64_t _nextBackgroundTaskDeadline;
 
-
-
 	std::map<uint64_t,NetworkState> _nets;
 	Mutex _nets_m;
 
@@ -892,9 +903,9 @@ public:
 		,_node((Node *)0)
 		,_updater((SoftwareUpdater *)0)
 		,_updateAutoApply(false)
-        ,_controlPlane()
+		,_controlPlane()
 		,_controlPlaneV6()
-        ,_serverThread()
+		,_serverThread()
 		,_serverThreadV6()
 		,_serverThreadRunning(false)
 		,_serverThreadRunningV6(false)
@@ -928,9 +939,9 @@ public:
 		_ports[1] = 0;
 		_ports[2] = 0;
 
-        prometheus::simpleapi::saver.set_registry(prometheus::simpleapi::registry_ptr);
-        prometheus::simpleapi::saver.set_delay(std::chrono::seconds(5));
-        prometheus::simpleapi::saver.set_out_file(_homePath + ZT_PATH_SEPARATOR + "metrics.prom");
+		prometheus::simpleapi::saver.set_registry(prometheus::simpleapi::registry_ptr);
+		prometheus::simpleapi::saver.set_delay(std::chrono::seconds(5));
+		prometheus::simpleapi::saver.set_out_file(_homePath + ZT_PATH_SEPARATOR + "metrics.prom");
 
 #if ZT_VAULT_SUPPORT
 		curl_global_init(CURL_GLOBAL_DEFAULT);
@@ -942,20 +953,34 @@ public:
 #ifdef __WINDOWS__
 		WinFWHelper::removeICMPRules();
 #endif
+
+		_rxPacketQueue.stop();
+		_rxPacketThreads_m.lock();
+		for(auto t=_rxPacketThreads.begin();t!=_rxPacketThreads.end();++t) {
+			t->join();
+		}
+		_rxPacketThreads_m.unlock();
 		_binder.closeAll(_phy);
 
 #if ZT_VAULT_SUPPORT
 		curl_global_cleanup();
 #endif
 
-        _controlPlane.stop();
+		_controlPlane.stop();
 		if (_serverThreadRunning) {
-	        _serverThread.join();
+			_serverThread.join();
 		}
 		_controlPlaneV6.stop();
 		if (_serverThreadRunningV6) {
 			_serverThreadV6.join();
 		}
+		_rxPacketVector_m.lock();
+		while (!_rxPacketVector.empty()) {
+			delete _rxPacketVector.back();
+			_rxPacketVector.pop_back();
+		}
+		_rxPacketVector_m.unlock();
+
 
 #ifdef ZT_USE_MINIUPNPC
 		delete _portMapper;
@@ -964,6 +989,15 @@ public:
 		delete _rc;
 	}
 
+	void setUpMultithreading()
+	{
+#if defined(__APPLE__) || defined(__OpenBSD__) || defined(__NetBSD__) || defined(__WINDOWS__)
+		return;
+#endif
+		_node->initMultithreading(_concurrency, _cpuPinningEnabled);
+		bool pinning = _cpuPinningEnabled;
+	}
+
 	virtual ReasonForTermination run()
 	{
 		try {
@@ -1272,6 +1306,9 @@ public:
 				const unsigned long delay = (dl > now) ? (unsigned long)(dl - now) : 500;
 				clockShouldBe = now + (int64_t)delay;
 				_phy.poll(delay);
+
+
+
 			}
 		} catch (std::exception &e) {
 			Mutex::Lock _l(_termReason_m);
@@ -2510,7 +2547,7 @@ public:
 					}
 					_node->bondController()->addCustomLink(customPolicyStr, new Link(linkNameStr,ipvPref,mtu,capacity,enabled,linkMode,failoverToStr));
 				}
-				std::string linkSelectMethodStr(OSUtils::jsonString(customPolicy["activeReselect"],"optimize"));
+				std::string linkSelectMethodStr(OSUtils::jsonString(customPolicy["activeReselect"],"always"));
 				if (linkSelectMethodStr == "always") {
 					newTemplateBond->setLinkSelectMethod(ZT_BOND_RESELECTION_POLICY_ALWAYS);
 				}
@@ -2562,7 +2599,29 @@ public:
 			fprintf(stderr,"WARNING: using manually-specified secondary and/or tertiary ports. This can cause NAT issues." ZT_EOL_S);
 		}
 		_portMappingEnabled = OSUtils::jsonBool(settings["portMappingEnabled"],true);
-		_node->setLowBandwidthMode(OSUtils::jsonBool(settings["lowBandwidthMode"],false));
+#if defined(__LINUX__) || defined(__FreeBSD__)
+		_multicoreEnabled = OSUtils::jsonBool(settings["multicoreEnabled"],false);
+		_concurrency = OSUtils::jsonInt(settings["concurrency"],1);
+		_cpuPinningEnabled = OSUtils::jsonBool(settings["cpuPinningEnabled"],false);
+		if (_multicoreEnabled) {
+			unsigned int maxConcurrency = std::thread::hardware_concurrency();
+			if (_concurrency <= 1 || _concurrency >= maxConcurrency) {
+				unsigned int conservativeDefault = (std::thread::hardware_concurrency() >= 4 ? 2 : 1);
+				fprintf(stderr, "Concurrency level provided (%d) is invalid, assigning conservative default value of (%d)\n", _concurrency, conservativeDefault);
+				_concurrency =  conservativeDefault;
+			}
+			setUpMultithreading();
+		}
+		else {
+			// Force values in case the user accidentally defined them with multicore disabled
+			_concurrency = 1;
+			_cpuPinningEnabled = false;
+		}
+#else
+		_multicoreEnabled = false;
+		_concurrency = 1;
+		_cpuPinningEnabled = false;
+#endif
 
 #ifndef ZT_SDK
 		const std::string up(OSUtils::jsonString(settings["softwareUpdate"],ZT_SOFTWARE_UPDATE_DEFAULT));
@@ -2877,16 +2936,19 @@ public:
 	// Handlers for Node and Phy<> callbacks
 	// =========================================================================
 
-	inline void phyOnDatagram(PhySocket *sock,void **uptr,const struct sockaddr *localAddr,const struct sockaddr *from,void *data,unsigned long len)
+
+
+
+	inline void phyOnDatagram(PhySocket* sock, void** uptr, const struct sockaddr* localAddr, const struct sockaddr* from, void* data, unsigned long len)
 	{
 		if (_forceTcpRelay) {
 			return;
 		}
-        Metrics::udp_recv += len;
+		Metrics::udp_recv += len;
 		const uint64_t now = OSUtils::now();
-		if ((len >= 16)&&(reinterpret_cast<const InetAddress *>(from)->ipScope() == InetAddress::IP_SCOPE_GLOBAL)) {
+		if ((len >= 16) && (reinterpret_cast<const InetAddress*>(from)->ipScope() == InetAddress::IP_SCOPE_GLOBAL)) {
 			_lastDirectReceiveFromGlobal = now;
-        }
+		}
 		const ZT_ResultCode rc = _node->processWirePacket(nullptr,now,reinterpret_cast<int64_t>(sock),reinterpret_cast<const struct sockaddr_storage *>(from),data,len,&_nextBackgroundTaskDeadline);
 		if (ZT_ResultCode_isFatal(rc)) {
 			char tmp[256];
@@ -2898,6 +2960,7 @@ public:
 		}
 	}
 
+
 	inline void phyOnTcpConnect(PhySocket *sock,void **uptr,bool success)
 	{
 		if (!success) {
@@ -3116,6 +3179,8 @@ public:
 
 						n.setTap(EthernetTap::newInstance(
 							nullptr,
+							_concurrency,
+							_cpuPinningEnabled,
 							_homePath.c_str(),
 							MAC(nwc->mac),
 							nwc->mtu,
@@ -3630,8 +3695,9 @@ public:
 	inline void nodeVirtualNetworkFrameFunction(uint64_t nwid,void **nuptr,uint64_t sourceMac,uint64_t destMac,unsigned int etherType,unsigned int vlanId,const void *data,unsigned int len)
 	{
 		NetworkState *n = reinterpret_cast<NetworkState *>(*nuptr);
-		if ((!n)||(!n->tap()))
+		if ((!n)||(!n->tap())) {
 			return;
+		}
 		n->tap()->put(MAC(sourceMac),MAC(destMac),etherType,data,len);
 	}
 

+ 1 - 1
version.h

@@ -27,7 +27,7 @@
 /**
  * Revision
  */
-#define ZEROTIER_ONE_VERSION_REVISION 0
+#define ZEROTIER_ONE_VERSION_REVISION 1
 
 /**
  * Build version

+ 1 - 0
windows/ZeroTierOne/ZeroTierOne.vcxproj

@@ -88,6 +88,7 @@
     <ClCompile Include="..\..\node\Node.cpp" />
     <ClCompile Include="..\..\node\OutboundMulticast.cpp" />
     <ClCompile Include="..\..\node\Packet.cpp" />
+    <ClCompile Include="..\..\node\PacketMultiplexer.cpp" />
     <ClCompile Include="..\..\node\Path.cpp" />
     <ClCompile Include="..\..\node\Peer.cpp" />
     <ClCompile Include="..\..\node\Poly1305.cpp">

+ 3 - 0
windows/ZeroTierOne/ZeroTierOne.vcxproj.filters

@@ -294,6 +294,9 @@
     <ClCompile Include="..\..\node\Metrics.cpp">
       <Filter>Source Files\node</Filter>
     </ClCompile>
+    <ClCompile Include="..\..\node\PacketMultiplexer.cpp">
+      <Filter>Source Files\node</Filter>
+    </ClCompile>
   </ItemGroup>
   <ItemGroup>
     <ClInclude Include="resource.h">

+ 1 - 1
zerotier-one.spec

@@ -1,5 +1,5 @@
 Name:           zerotier-one
-Version:        1.14.0
+Version:        1.14.1
 Release:        1%{?dist}
 Summary:        ZeroTier network virtualization service
 

部分文件因文件數量過多而無法顯示