
Added notion of Flows

Joseph Henry, 6 years ago
commit 0634214f2c

8 changed files with 446 additions and 120 deletions
  1. include/ZeroTierOne.h (+35 -13)
  2. node/Constants.hpp (+6 -0)
  3. node/Path.hpp (+0 -8)
  4. node/Peer.cpp (+180 -35)
  5. node/Peer.hpp (+46 -1)
  6. node/Switch.cpp (+170 -56)
  7. node/Switch.hpp (+7 -5)
  8. node/Trace.cpp (+2 -2)

+ 35 - 13
include/ZeroTierOne.h

@@ -434,27 +434,49 @@ enum ZT_ResultCode
 enum ZT_MultipathMode
 {
 	/**
-	 * No active multipath.
-	 *
-	 * Traffic is merely sent over the strongest path. That being
-	 * said, this mode will automatically failover in the event that a link goes down.
+	 * No fault tolerance or balancing.
 	 */
 	ZT_MULTIPATH_NONE = 0,
 
 	/**
-	 * Traffic is randomly distributed among all active paths.
-	 *
-	 * Will cease sending traffic over links that appear to be stale.
+	 * Sends traffic out on all paths.
 	 */
-	ZT_MULTIPATH_RANDOM = 1,
+	ZT_MULTIPATH_BROADCAST = 1,
 
 	/**
-	 * Traffic is allocated across all active paths in proportion to their strength and
-	 * reliability.
-	 *
-	 * Will cease sending traffic over links that appear to be stale.
+	 * Sends traffic out on only one path at a time. Immediate fail-over.
+	 */
+	ZT_MULTIPATH_ACTIVE_BACKUP = 2,
+
+	/**
+	 * Sends each packet out on a randomly chosen path (uniform distribution).
+	 */
+	ZT_MULTIPATH_BALANCE_RANDOM = 3,
+
+	/**
+	 * Stripes packets across all paths.
+	 */
+	ZT_MULTIPATH_BALANCE_RR_OPAQUE = 4,
+
+	/**
+	 * Assigns each new flow to the next path in round-robin order.
+	 */
+	ZT_MULTIPATH_BALANCE_RR_FLOW = 5,
+
+	/**
+	 * Hashes flows across all paths.
+	 */
+	ZT_MULTIPATH_BALANCE_XOR_FLOW = 6,
+
+	/**
+	 * Balances traffic across all paths according to observed performance.
+	 */
+	ZT_MULTIPATH_BALANCE_DYNAMIC_OPAQUE = 7,
+
+	/**
+	 * Dynamically allocates flows across paths in proportion to link strength and load.
 	 */
-	ZT_MULTIPATH_PROPORTIONALLY_BALANCED = 2,
+	ZT_MULTIPATH_BALANCE_DYNAMIC_FLOW = 8,
 };
 
 /**

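For orientation, a minimal sketch (not part of this commit) of how a flow-hashing mode such as ZT_MULTIPATH_BALANCE_XOR_FLOW can pin a flow to a path; flowToPathIndex and numAlivePaths are illustrative names, not identifiers from the tree:

	// Hypothetical helper: map a 64-bit flow ID onto one of N live paths.
	// The cast to uint64_t guards against negative flow IDs (a high source
	// port sets the sign bit of the composite int64_t identifier).
	static inline int flowToPathIndex(int64_t flowId, int numAlivePaths)
	{
		if (numAlivePaths <= 0)
			return -1; // no usable path
		return (int)((uint64_t)flowId % (uint64_t)numAlivePaths);
	}

Because the mapping is deterministic, every packet of a flow takes the same path while the path set is stable, which preserves per-flow packet ordering across the aggregate link.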
+ 6 - 0
node/Constants.hpp

@@ -266,6 +266,12 @@
  */
 #define ZT_LOCAL_CONF_FILE_CHECK_INTERVAL 10000
 
+/**
+ * How long before we consider a flow to be dead and remove it from the balancing
+ * policy's list.
+ */
+#define ZT_MULTIPATH_FLOW_EXPIRATION 60000
+
 /**
  * How frequently to check for changes to the system's network interfaces. When
  * the service decides to use this constant it's because we want to react more

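A hedged sketch of how a window like this is consumed (the commit's actual sweep lives in Peer::processBackgroundPeerTasks below; the map type here is simplified to timestamps only):

	// Illustrative only: drop any flow record idle longer than the window.
	// 'now' is the current time in milliseconds, as elsewhere in the tree.
	std::map<int64_t,int64_t> lastSendByFlow; // flowId -> last send time (ms)
	for(std::map<int64_t,int64_t>::iterator it(lastSendByFlow.begin());it!=lastSendByFlow.end();) {
		if ((now - it->second) > ZT_MULTIPATH_FLOW_EXPIRATION) // 60,000 ms
			lastSendByFlow.erase(it++);
		else
			++it;
	}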
+ 0 - 8
node/Path.hpp

@@ -308,7 +308,6 @@ public:
 	 */
 	inline void recordOutgoingPacket(int64_t now, int64_t packetId, uint16_t payloadLength, Packet::Verb verb)
 	{
-		DEBUG_INFO("");
 		Mutex::Lock _l(_statistics_m);
 		if (verb != Packet::VERB_ACK && verb != Packet::VERB_QOS_MEASUREMENT) {
 			if ((packetId & (ZT_PATH_QOS_ACK_PROTOCOL_DIVISOR - 1)) == 0) {
@@ -332,7 +331,6 @@ public:
 	 */
 	inline void recordIncomingPacket(int64_t now, int64_t packetId, uint16_t payloadLength, Packet::Verb verb)
 	{
-		DEBUG_INFO("");
 		Mutex::Lock _l(_statistics_m);
 		if (verb != Packet::VERB_ACK && verb != Packet::VERB_QOS_MEASUREMENT) {
 			if ((packetId & (ZT_PATH_QOS_ACK_PROTOCOL_DIVISOR - 1)) == 0) {
@@ -353,7 +351,6 @@ public:
 	 */
 	inline void receivedAck(int64_t now, int32_t ackedBytes)
 	{
-		DEBUG_INFO("");
 		_expectingAckAsOf = 0;
 		_unackedBytes = (ackedBytes > _unackedBytes) ? 0 : _unackedBytes - ackedBytes;
 		int64_t timeSinceThroughputEstimate = (now - _lastThroughputEstimation);
@@ -398,7 +395,6 @@ public:
 	 */
 	inline void sentAck(int64_t now)
 	{
-		DEBUG_INFO("");
 		Mutex::Lock _l(_statistics_m);
 		_inACKRecords.clear();
 		_packetsReceivedSinceLastAck = 0;
@@ -416,7 +412,6 @@ public:
 	 */
 	inline void receivedQoS(int64_t now, int count, uint64_t *rx_id, uint16_t *rx_ts)
 	{
-		DEBUG_INFO("");
 		Mutex::Lock _l(_statistics_m);
 		// Look up egress times and compute latency values for each record
 		std::map<uint64_t,uint64_t>::iterator it;
@@ -441,7 +436,6 @@ public:
 	 */
 	inline int32_t generateQoSPacket(int64_t now, char *qosBuffer)
 	{
-		DEBUG_INFO("");
 		Mutex::Lock _l(_statistics_m);
 		int32_t len = 0;
 		std::map<uint64_t,uint64_t>::iterator it = _inQoSRecords.begin();
@@ -466,7 +460,6 @@ public:
 	 * @param Current time
 	 */
 	inline void sentQoS(int64_t now) {
-		DEBUG_INFO("");
 		_packetsReceivedSinceLastQoS = 0;
 		_lastQoSMeasurement = now;
 	}
@@ -586,7 +579,6 @@ public:
 	inline void processBackgroundPathMeasurements(const int64_t now)
 	{
 		if (now - _lastPathQualityComputeTime > ZT_PATH_QUALITY_COMPUTE_INTERVAL) {
-			DEBUG_INFO("");
 			Mutex::Lock _l(_statistics_m);
 			_lastPathQualityComputeTime = now;
 			address().toString(_addrString);

+ 180 - 35
node/Peer.cpp

@@ -75,7 +75,9 @@ Peer::Peer(const RuntimeEnvironment *renv,const Identity &myIdentity,const Ident
 	_linkIsRedundant(false),
 	_remotePeerMultipathEnabled(false),
 	_lastAggregateStatsReport(0),
-	_lastAggregateAllocation(0)
+	_lastAggregateAllocation(0),
+	_virtualPathCount(0),
+	_roundRobinPathAssignmentIdx(0)
 {
 	if (!myIdentity.agree(peerIdentity,_key,ZT_PEER_SECRET_KEY_LENGTH))
 		throw ZT_EXCEPTION_INVALID_ARGUMENT;
@@ -195,6 +197,9 @@ void Peer::received(
 				} else {
 					attemptToContact = true;
 				}
+
+				// Every time we learn of a new path, rebuild the set of virtual paths
+				constructSetOfVirtualPaths();
 			}
 		}
 
@@ -256,6 +261,39 @@ void Peer::received(
 	}
 }
 
+void Peer::constructSetOfVirtualPaths()
+{
+	if (!_remoteMultipathSupported) {
+		return;
+	}
+	Mutex::Lock _l(_virtual_paths_m);
+
+	int64_t now = RR->node->now();
+	_virtualPathCount = 0;
+	for(unsigned int i=0;i<ZT_MAX_PEER_NETWORK_PATHS;++i) {
+		if (_paths[i].p && _paths[i].p->alive(now)) {
+			for(unsigned int j=0;j<ZT_MAX_PEER_NETWORK_PATHS;++j) {
+				if (_paths[j].p && _paths[j].p->alive(now)) {
+					int64_t localSocket = _paths[j].p->localSocket();
+					bool foundVirtualPath = false;
+					for (unsigned int k=0; k<_virtualPaths.size(); k++) {
+						if (_virtualPaths[k]->localSocket == localSocket && _virtualPaths[k]->p == _paths[i].p) {
+							foundVirtualPath = true;
+							break;
+						}
+					}
+					if (!foundVirtualPath) {
+						VirtualPath *np = new VirtualPath;
+						np->p = _paths[i].p;
+						np->localSocket = localSocket;
+						_virtualPaths.push_back(np);
+					}
+				}
+			}
+		}
+	}
+}
+
 void Peer::recordOutgoingPacket(const SharedPtr<Path> &path, const uint64_t packetId,
 	uint16_t payloadLength, const Packet::Verb verb, int64_t now)
 {
@@ -320,10 +358,10 @@ void Peer::computeAggregateAllocation(int64_t now)
 	for(uint16_t i=0;i<ZT_MAX_PEER_NETWORK_PATHS;++i) {
 		if (_paths[i].p) {
 
-			if (RR->node->getMultipathMode() == ZT_MULTIPATH_RANDOM) {
+			if (RR->node->getMultipathMode() == ZT_MULTIPATH_BALANCE_RANDOM) {
 				_paths[i].p->updateComponentAllocationOfAggregateLink(((float)_pathChoiceHist.countValue(i) / (float)_pathChoiceHist.count()) * 255);
 			}
-			if (RR->node->getMultipathMode() == ZT_MULTIPATH_PROPORTIONALLY_BALANCED) {
+			if (RR->node->getMultipathMode() == ZT_MULTIPATH_BALANCE_DYNAMIC_OPAQUE) {
 				_paths[i].p->updateComponentAllocationOfAggregateLink((unsigned char)((_paths[i].p->relativeQuality() / totalRelativeQuality) * 255));
 			}
 		}
@@ -382,9 +420,22 @@ int Peer::aggregateLinkLogicalPathCount()
 	return pathCount;
 }
 
-SharedPtr<Path> Peer::getAppropriatePath(int64_t now, bool includeExpired)
+std::vector<SharedPtr<Path>> Peer::getAllPaths(int64_t now)
+{
+	Mutex::Lock _l(_virtual_paths_m); // FIXME: TX can now lock RX
+	std::vector<SharedPtr<Path>> paths;
+	for (int i=0; i<_virtualPaths.size(); i++) {
+		if (_virtualPaths[i]->p) {
+			paths.push_back(_virtualPaths[i]->p);
+		}
+	}
+	return paths;
+}
+
+SharedPtr<Path> Peer::getAppropriatePath(int64_t now, bool includeExpired, int64_t flowId)
 {
 	Mutex::Lock _l(_paths_m);
+	SharedPtr<Path> selectedPath;
 	unsigned int bestPath = ZT_MAX_PEER_NETWORK_PATHS;
 
 	/**
@@ -410,52 +461,129 @@ SharedPtr<Path> Peer::getAppropriatePath(int64_t now, bool includeExpired)
 		return SharedPtr<Path>();
 	}
 
+	// Update path measurements
 	for(unsigned int i=0;i<ZT_MAX_PEER_NETWORK_PATHS;++i) {
 		if (_paths[i].p) {
 			_paths[i].p->processBackgroundPathMeasurements(now);
 		}
 	}
+	// Detect new flows and update existing records
+	if (_flows.count(flowId)) {
+		_flows[flowId]->lastSend = now;
+	}
+	else {
+		fprintf(stderr, "new flow %llx detected between this node and %llx (%lu active flow(s))\n",
+			flowId, this->_id.address().toInt(), (_flows.size()+1));
+		struct Flow *newFlow = new Flow(flowId, now);
+		_flows[flowId] = newFlow;
+		newFlow->assignedPath = nullptr;
+	}
+	// Construct set of virtual paths if needed
+	if (!_virtualPaths.size()) {
+		constructSetOfVirtualPaths();
+	}
+	if (!_virtualPaths.size()) {
+		fprintf(stderr, "no paths to send packet out on\n");
+		return SharedPtr<Path>();
+	}
 
 	/**
-	 * Randomly distribute traffic across all paths
+	 * Traffic is randomly distributed among all active paths.
 	 */
 	int numAlivePaths = 0;
 	int numStalePaths = 0;
-	if (RR->node->getMultipathMode() == ZT_MULTIPATH_RANDOM) {
-		computeAggregateAllocation(now); /* This call is algorithmically inert but gives us a value to show in the status output */
-		int alivePaths[ZT_MAX_PEER_NETWORK_PATHS];
-		int stalePaths[ZT_MAX_PEER_NETWORK_PATHS];
-		memset(&alivePaths, -1, sizeof(alivePaths));
-		memset(&stalePaths, -1, sizeof(stalePaths));
-		for(unsigned int i=0;i<ZT_MAX_PEER_NETWORK_PATHS;++i) {
-			if (_paths[i].p) {
-				if (_paths[i].p->alive(now)) {
-					alivePaths[numAlivePaths] = i;
-					numAlivePaths++;
-				}
-				else {
-					stalePaths[numStalePaths] = i;
-					numStalePaths++;
-				}
-			}
+	if (RR->node->getMultipathMode() == ZT_MULTIPATH_BALANCE_RANDOM) {
+		// This call is algorithmically inert but gives us a value to show in the status
+		// output; it must run before the early return below or it would never execute
+		computeAggregateAllocation(now);
+		int sz = _virtualPaths.size();
+		if (sz) {
+			int idx = _freeRandomByte % sz;
+			_pathChoiceHist.push(idx);
+			char pathStr[128];
+			_virtualPaths[idx]->p->address().toString(pathStr);
+			fprintf(stderr, "sending out: (%llx), idx=%d: path=%s, localSocket=%lld\n",
+				this->_id.address().toInt(), idx, pathStr, _virtualPaths[idx]->localSocket);
+			return _virtualPaths[idx]->p;
 		}
-		unsigned int r = _freeRandomByte;
-		if (numAlivePaths > 0) {
-			int rf = r % numAlivePaths;
-			_pathChoiceHist.push(alivePaths[rf]); // Record which path we chose
-			return _paths[alivePaths[rf]].p;
+	}
+
+	/**
+	 * All traffic is sent on all paths.
+	 */
+	if (RR->node->getMultipathMode() == ZT_MULTIPATH_BROADCAST) {
+		// Not handled here. Handled in Switch.cpp
+	}
+
+	/**
+	 * Only one link is active. Fail-over is immediate.
+	 */
+	if (RR->node->getMultipathMode() == ZT_MULTIPATH_ACTIVE_BACKUP) {
+		// fprintf(stderr, "ZT_MULTIPATH_ACTIVE_BACKUP\n");
+	}
+
+	/**
+	 * Packets are striped across all available paths.
+	 */
+	if (RR->node->getMultipathMode() == ZT_MULTIPATH_BALANCE_RR_OPAQUE) {
+		// fprintf(stderr, "ZT_MULTIPATH_BALANCE_RR_OPAQUE\n");
+		int16_t previousIdx = _roundRobinPathAssignmentIdx;
+		if (_roundRobinPathAssignmentIdx < (_virtualPaths.size()-1)) {
+			_roundRobinPathAssignmentIdx++;
 		}
-		else if(numStalePaths > 0) {
-			// Resort to trying any non-expired path
-			int rf = r % numStalePaths;
-			return _paths[stalePaths[rf]].p;
+		else {
+			_roundRobinPathAssignmentIdx = 0;
 		}
+		selectedPath = _virtualPaths[previousIdx]->p;
+		char pathStr[128];
+		selectedPath->address().toString(pathStr);
+		fprintf(stderr, "sending packet out on path %s at index %d\n",
+			pathStr, previousIdx);
+		return selectedPath;
+	}
+
+	/**
+	 * Flows are striped across all available paths.
+	 */
+	if (RR->node->getMultipathMode() == ZT_MULTIPATH_BALANCE_RR_FLOW) {
+		// fprintf(stderr, "ZT_MULTIPATH_BALANCE_RR_FLOW\n");
 	}
 
 	/**
-	 * Proportionally allocate traffic according to dynamic path quality measurements
+	 * Flows are hashed across all available paths.
 	 */
-	if (RR->node->getMultipathMode() == ZT_MULTIPATH_PROPORTIONALLY_BALANCED) {
+	if (RR->node->getMultipathMode() == ZT_MULTIPATH_BALANCE_XOR_FLOW) {
+		// fprintf(stderr, "ZT_MULTIPATH_BALANCE_XOR_FLOW (%llx) \n", flowId);
+		char pathStr[128];
+		struct Flow *currFlow = NULL;
+		if (_flows.count(flowId)) {
+			currFlow = _flows[flowId];
+			if (!currFlow->assignedPath) {
+				int idx = abs((int)(currFlow->flowId % (int64_t)_virtualPaths.size())); // modulo the full set; size-1 would skip the last path and fault with a single path
+				currFlow->assignedPath = _virtualPaths[idx];
+				_virtualPaths[idx]->p->address().toString(pathStr);
+				fprintf(stderr, "assigning flow %llx between this node and peer %llx to path %s at index %d\n",
+					currFlow->flowId, this->_id.address().toInt(), pathStr, idx);
+			}
+			else {
+				if (!currFlow->assignedPath->p->alive(now)) {
+					char newPathStr[128];
+					currFlow->assignedPath->p->address().toString(pathStr);
+					// Re-assign: start at the hashed index and advance to the next alive
+					// path, otherwise the hash would land on the same dead path again
+					unsigned int idx = (unsigned int)abs((int)(currFlow->flowId % (int64_t)_virtualPaths.size()));
+					for (unsigned int i=0;i<_virtualPaths.size();++i) {
+						unsigned int candidate = (idx + i) % _virtualPaths.size();
+						if (_virtualPaths[candidate]->p->alive(now)) {
+							idx = candidate;
+							break;
+						}
+					}
+					currFlow->assignedPath = _virtualPaths[idx];
+					_virtualPaths[idx]->p->address().toString(newPathStr);
+					fprintf(stderr, "path %s assigned to flow %llx between this node and %llx appears to be dead, reassigning to path %s\n",
+						pathStr, currFlow->flowId, this->_id.address().toInt(), newPathStr);
+				}
+			}
+			return currFlow->assignedPath->p;
+		}
+	}
+
+	/**
+	 * Proportionally allocate traffic according to dynamic path quality measurements.
+	 */
+	if (RR->node->getMultipathMode() == ZT_MULTIPATH_BALANCE_DYNAMIC_OPAQUE) {
 		if ((now - _lastAggregateAllocation) >= ZT_PATH_QUALITY_COMPUTE_INTERVAL) {
 			_lastAggregateAllocation = now;
 			computeAggregateAllocation(now);
@@ -476,6 +604,13 @@ SharedPtr<Path> Peer::getAppropriatePath(int64_t now, bool includeExpired)
 			return _paths[bestPath].p;
 		}
 	}
+
+	/**
+	 * Flows are dynamically allocated across paths in proportion to link strength and load.
+	 */
+	if (RR->node->getMultipathMode() == ZT_MULTIPATH_BALANCE_DYNAMIC_FLOW) {
+	}
+
 	return SharedPtr<Path>();
 }
 
@@ -676,10 +811,20 @@ inline void Peer::processBackgroundPeerTasks(const int64_t now)
 		_localMultipathSupported = ((RR->node->getMultipathMode() != ZT_MULTIPATH_NONE) && (ZT_PROTO_VERSION > 9));
 		_remoteMultipathSupported = _vProto > 9;
 		// If both peers support multipath and more than one path exist, we can use multipath logic
-		DEBUG_INFO("from=%llx, _localMultipathSupported=%d, _remoteMultipathSupported=%d, (_uniqueAlivePathCount > 1)=%d", 
-			this->_id.address().toInt(), _localMultipathSupported, _remoteMultipathSupported, (_uniqueAlivePathCount > 1));
 		_canUseMultipath = _localMultipathSupported && _remoteMultipathSupported && (_uniqueAlivePathCount > 1);
 	}
+
+	// Remove old flows
+	std::map<int64_t, struct Flow *>::iterator it = _flows.begin();
+	while (it != _flows.end()) {
+		if ((now - it->second->lastSend) > ZT_MULTIPATH_FLOW_EXPIRATION) {
+			fprintf(stderr, "forgetting flow %llx between this node and %llx (%lu active flow(s))\n",
+				it->first, this->_id.address().toInt(), _flows.size());
+			delete it->second;
+			it = _flows.erase(it);
+		} else {
+			it++;
+		}
+	}
 }
 
 void Peer::sendACK(void *tPtr,const SharedPtr<Path> &path,const int64_t localSocket,const InetAddress &atAddress,int64_t now)

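Condensed to its call pattern, the selection logic above is consumed like this (a hedged sketch; flowId, packet, and peer are assumed in scope as in Switch::_trySend, and a flowId of -1 simply opts out of flow affinity):

	// Illustrative TX-side usage of the new flowId parameter:
	const int64_t now = RR->node->now();
	SharedPtr<Path> via(peer->getAppropriatePath(now, false, flowId));
	if (via)
		via->send(RR, tPtr, packet.data(), packet.size(), now);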
+ 46 - 1
node/Peer.hpp

@@ -28,6 +28,8 @@
 #define ZT_PEER_HPP
 
 #include <vector>
+#include <map>
+#include <queue>
 
 #include "../include/ZeroTierOne.h"
 
@@ -147,6 +149,8 @@ public:
 		return false;
 	}
 
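+	/**
+	 * Construct the set of virtual paths (pairings of physical path and local socket)
+	 * used by the multipath balancing policies
+	 */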
+	void constructSetOfVirtualPaths();
+
 	/**
 	 * Record statistics on outgoing packets
 	 *
@@ -216,14 +220,17 @@ public:
 	 */
 	int aggregateLinkLogicalPathCount();
 
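+	/**
+	 * @param now Current time
+	 * @return All virtual paths currently known to this peer
+	 */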
+	std::vector<SharedPtr<Path>> getAllPaths(int64_t now);
+
 	/**
 	 * Get the most appropriate direct path based on current multipath and QoS configuration
 	 *
 	 * @param now Current time
 	 * @param includeExpired If true, include even expired paths
+	 * @param flowId Session-specific protocol flow identifier used for path allocation
 	 * @return Best current path or NULL if none
 	 */
-	SharedPtr<Path> getAppropriatePath(int64_t now, bool includeExpired);
+	SharedPtr<Path> getAppropriatePath(int64_t now, bool includeExpired, int64_t flowId = -1);
 
 	/**
 	 * Generate a human-readable string of interface names making up the aggregate link, also include
@@ -680,6 +687,44 @@ private:
 	int64_t _lastAggregateAllocation;
 
 	char _interfaceListStr[256]; // 16 characters * 16 paths in a link
+
+	// Per-packet link performance record kept for each virtual path
+	struct LinkPerformanceEntry
+	{
+		int64_t packetId;
+		struct VirtualPath *egressVirtualPath;
+		struct VirtualPath *ingressVirtualPath;
+	};
+
+	// Virtual paths
+	int _virtualPathCount;
+	Mutex _virtual_paths_m;
+	struct VirtualPath
+	{
+		SharedPtr<Path> p;
+		int64_t localSocket;
+		std::queue<struct LinkPerformanceEntry *> performanceEntries;
+	};
+	std::vector<struct VirtualPath*> _virtualPaths;
+
+	// Flows
+	struct Flow
+	{
+		Flow(int64_t fid, int64_t ls) :
+			flowId(fid),
+			lastSend(ls),
+			assignedPath(NULL)
+		{}
+
+		int64_t flowId;
+		int64_t bytesPerSecond;
+		int64_t lastSend;
+		struct VirtualPath *assignedPath;
+	};
+
+	std::map<int64_t, struct Flow *> _flows;
+
+	int16_t _roundRobinPathAssignmentIdx;
 };
 
 } // namespace ZeroTier

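To make the VirtualPath notion concrete: a virtual path is a (remote path, local socket) pair, so constructSetOfVirtualPaths() effectively builds the cross product of the two sets. A hedged illustration with made-up endpoints, types simplified from the commit's structures:

	// Two alive remote paths x two local sockets -> four selectable virtual paths:
	struct VP { const char *remoteAddr; const char *localInterface; };
	const VP virtualPaths[] = {
		{ "192.168.0.10/9993", "eth0" }, { "192.168.0.10/9993", "wlan0" },
		{ "10.1.0.10/9993",    "eth0" }, { "10.1.0.10/9993",    "wlan0" },
	};
	// A flow pinned to one entry keeps both its egress socket and its
	// destination address stable for the lifetime of the flow.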
+ 170 - 56
node/Switch.cpp

@@ -255,6 +255,35 @@ void Switch::onRemotePacket(void *tPtr,const int64_t localSocket,const InetAddre
 	} catch ( ... ) {} // sanity check, should be caught elsewhere
 }
 
+// Returns true if packet appears valid; pos and proto will be set
+static bool _ipv6GetPayload(const uint8_t *frameData,unsigned int frameLen,unsigned int &pos,unsigned int &proto)
+{
+	if (frameLen < 40)
+		return false;
+	pos = 40;
+	proto = frameData[6];
+	while (pos <= frameLen) {
+		switch(proto) {
+			case 0: // hop-by-hop options
+			case 43: // routing
+			case 60: // destination options
+			case 135: // mobility options
+				if ((pos + 8) > frameLen)
+					return false; // invalid!
+				proto = frameData[pos];
+				pos += ((unsigned int)frameData[pos + 1] * 8) + 8;
+				break;
+
+			//case 44: // fragment -- we currently can't parse these and they are deprecated in IPv6 anyway
+			//case 50:
+			//case 51: // IPSec ESP and AH -- we have to stop here since this is encrypted stuff
+			default:
+				return true;
+		}
+	}
+	return false; // overflow == invalid
+}
+
 void Switch::onLocalEthernet(void *tPtr,const SharedPtr<Network> &network,const MAC &from,const MAC &to,unsigned int etherType,unsigned int vlanId,const void *data,unsigned int len)
 {
 	if (!network->hasConfig())
@@ -271,6 +300,73 @@ void Switch::onLocalEthernet(void *tPtr,const SharedPtr<Network> &network,const
 
 	uint8_t qosBucket = ZT_QOS_DEFAULT_BUCKET;
 
+	/* A pseudo-unique identifier used by the balancing and bonding policies to associate
+	 * properties of a specific protocol flow over time and to determine which virtual path
+	 * this packet shall be sent out on. The identifier is composed of the source port,
+	 * destination port, and protocol number of the encapsulated frame.
+	 *
+	 * A flowId of -1 indicates that the packet we are about to transmit has no
+	 * preferred virtual path and will be sent out according to what the multipath logic
+	 * deems appropriate. An example of this would be a non-IP frame such as ARP.
+	 */
+	int64_t flowId = -1;
+
+	if (etherType == ZT_ETHERTYPE_IPV4 && (len >= 20)) {
+		uint16_t srcPort = 0;
+		uint16_t dstPort = 0;
+		uint8_t proto = (reinterpret_cast<const uint8_t *>(data)[9]); // must be unsigned: protocols >= 0x80 (SCTP, UDPLite) would sign-extend and never match below
+		const unsigned int headerLen = 4 * (reinterpret_cast<const uint8_t *>(data)[0] & 0xf);
+		switch(proto) {
+			case 0x01: // ICMP
+				flowId = 0x01;
+				break;
+			// All these start with 16-bit source and destination port in that order
+			case 0x06: // TCP
+			case 0x11: // UDP
+			case 0x84: // SCTP
+			case 0x88: // UDPLite
+				if (len > (headerLen + 4)) {
+					unsigned int pos = headerLen + 0;
+					srcPort = (reinterpret_cast<const uint8_t *>(data)[pos++]) << 8;
+					srcPort |= (reinterpret_cast<const uint8_t *>(data)[pos]);
+					pos++;
+					dstPort = (reinterpret_cast<const uint8_t *>(data)[pos++]) << 8;
+					dstPort |= (reinterpret_cast<const uint8_t *>(data)[pos]);
+					flowId = ((int64_t)srcPort << 48) | ((int64_t)dstPort << 32) | proto;
+				}
+				break;
+		}
+	}
+
+	if (etherType == ZT_ETHERTYPE_IPV6 && (len >= 40)) {
+		uint16_t srcPort = 0;
+		uint16_t dstPort = 0;
+		unsigned int pos = 0;
+		unsigned int proto = 0;
+		// Only parse for ports if the extension header chain was walked successfully
+		if (_ipv6GetPayload((const uint8_t *)data, len, pos, proto)) {
+			switch(proto) {
+				case 0x3A: // ICMPv6
+					flowId = 0x3A;
+					break;
+				// All these start with 16-bit source and destination port in that order
+				case 0x06: // TCP
+				case 0x11: // UDP
+				case 0x84: // SCTP
+				case 0x88: // UDPLite
+					if (len > (pos + 4)) {
+						srcPort = (reinterpret_cast<const uint8_t *>(data)[pos++]) << 8;
+						srcPort |= (reinterpret_cast<const uint8_t *>(data)[pos]);
+						pos++;
+						dstPort = (reinterpret_cast<const uint8_t *>(data)[pos++]) << 8;
+						dstPort |= (reinterpret_cast<const uint8_t *>(data)[pos]);
+						flowId = ((int64_t)srcPort << 48) | ((int64_t)dstPort << 32) | proto;
+					}
+					break;
+				default:
+					break;
+			}
+		}
+	}
+
 	if (to.isMulticast()) {
 		MulticastGroup multicastGroup(to,0);
 
@@ -280,7 +376,7 @@ void Switch::onLocalEthernet(void *tPtr,const SharedPtr<Network> &network,const
 				 * otherwise a straightforward Ethernet switch emulation. Vanilla ARP
 				 * is dumb old broadcast and simply doesn't scale. ZeroTier multicast
 				 * groups have an additional field called ADI (additional distinguishing
-			   * information) which was added specifically for ARP though it could
+				 * information) which was added specifically for ARP though it could
 				 * be used for other things too. We then take ARP broadcasts and turn
 				 * them into multicasts by stuffing the IP address being queried into
 				 * the 32-bit ADI field. In practice this uses our multicast pub/sub
@@ -429,7 +525,7 @@ void Switch::onLocalEthernet(void *tPtr,const SharedPtr<Network> &network,const
 			outp.append(data,len);
 			if (!network->config().disableCompression())
 				outp.compress();
-			aqm_enqueue(tPtr,network,outp,true,qosBucket);
+			aqm_enqueue(tPtr,network,outp,true,qosBucket,flowId);
 		} else {
 			Packet outp(toZT,RR->identity.address(),Packet::VERB_FRAME);
 			outp.append(network->id());
@@ -437,7 +533,7 @@ void Switch::onLocalEthernet(void *tPtr,const SharedPtr<Network> &network,const
 			outp.append(data,len);
 			if (!network->config().disableCompression())
 				outp.compress();
-			aqm_enqueue(tPtr,network,outp,true,qosBucket);
+			aqm_enqueue(tPtr,network,outp,true,qosBucket,flowId);
 		}
 	} else {
 		// Destination is bridged behind a remote peer
@@ -493,7 +589,7 @@ void Switch::onLocalEthernet(void *tPtr,const SharedPtr<Network> &network,const
 				outp.append(data,len);
 				if (!network->config().disableCompression())
 					outp.compress();
-				aqm_enqueue(tPtr,network,outp,true,qosBucket);
+				aqm_enqueue(tPtr,network,outp,true,qosBucket,flowId);
 			} else {
 				RR->t->outgoingNetworkFrameDropped(tPtr,network,from,to,etherType,vlanId,len,"filter blocked (bridge replication)");
 			}
@@ -501,10 +597,10 @@ void Switch::onLocalEthernet(void *tPtr,const SharedPtr<Network> &network,const
 	}
 }
 
-void Switch::aqm_enqueue(void *tPtr, const SharedPtr<Network> &network, Packet &packet,bool encrypt,int qosBucket)
+void Switch::aqm_enqueue(void *tPtr, const SharedPtr<Network> &network, Packet &packet,bool encrypt,int qosBucket,int64_t flowId)
 {
 	if(!network->qosEnabled()) {
-		send(tPtr, packet, encrypt);
+		send(tPtr, packet, encrypt, flowId);
 		return;
 	}
 	NetworkQoSControlBlock *nqcb = _netQueueControlBlock[network->id()];
@@ -518,11 +614,9 @@ void Switch::aqm_enqueue(void *tPtr, const SharedPtr<Network> &network, Packet &
 			nqcb->inactiveQueues.push_back(new ManagedQueue(i));
 		}
 	}
-
+	// Don't apply QoS scheduling to ZT protocol traffic
 	if (packet.verb() != Packet::VERB_FRAME && packet.verb() != Packet::VERB_EXT_FRAME) {
-		// DEBUG_INFO("skipping, no QoS for this packet, verb=%x", packet.verb());
-		// just send packet normally, no QoS for ZT protocol traffic
-		send(tPtr, packet, encrypt);
+		send(tPtr, packet, encrypt, flowId);
+		return; // without this the packet would also be enqueued (and sent twice) below
 	}
 
 	_aqm_m.lock();
@@ -530,7 +624,7 @@ void Switch::aqm_enqueue(void *tPtr, const SharedPtr<Network> &network, Packet &
 	// Enqueue packet and move queue to appropriate list
 
 	const Address dest(packet.destination());
-	TXQueueEntry *txEntry = new TXQueueEntry(dest,RR->node->now(),packet,encrypt);
+	TXQueueEntry *txEntry = new TXQueueEntry(dest,RR->node->now(),packet,encrypt,flowId);
 	
 	ManagedQueue *selectedQueue = nullptr;
 	for (size_t i=0; i<ZT_QOS_NUM_BUCKETS; i++) {
@@ -702,7 +796,7 @@ void Switch::aqm_dequeue(void *tPtr)
 					queueAtFrontOfList->byteCredit -= len;
 					// Send the packet!
 					queueAtFrontOfList->q.pop_front();
-					send(tPtr, entryToEmit->packet, entryToEmit->encrypt);
+					send(tPtr, entryToEmit->packet, entryToEmit->encrypt, entryToEmit->flowId);
 					(*nqcb).second->_currEnqueuedPackets--;
 				}
 				if (queueAtFrontOfList) {
@@ -734,7 +828,7 @@ void Switch::aqm_dequeue(void *tPtr)
 					queueAtFrontOfList->byteLength -= len;
 					queueAtFrontOfList->byteCredit -= len;
 					queueAtFrontOfList->q.pop_front();
-					send(tPtr, entryToEmit->packet, entryToEmit->encrypt);
+					send(tPtr, entryToEmit->packet, entryToEmit->encrypt, entryToEmit->flowId);
 					(*nqcb).second->_currEnqueuedPackets--;
 				}
 				if (queueAtFrontOfList) {
@@ -758,18 +852,18 @@ void Switch::removeNetworkQoSControlBlock(uint64_t nwid)
 	}
 }
 
-void Switch::send(void *tPtr,Packet &packet,bool encrypt)
+void Switch::send(void *tPtr,Packet &packet,bool encrypt,int64_t flowId)
 {
 	const Address dest(packet.destination());
 	if (dest == RR->identity.address())
 		return;
-	if (!_trySend(tPtr,packet,encrypt)) {
+	if (!_trySend(tPtr,packet,encrypt,flowId)) {
 		{
 			Mutex::Lock _l(_txQueue_m);
 			if (_txQueue.size() >= ZT_TX_QUEUE_SIZE) {
 				_txQueue.pop_front();
 			}
-			_txQueue.push_back(TXQueueEntry(dest,RR->node->now(),packet,encrypt));
+			_txQueue.push_back(TXQueueEntry(dest,RR->node->now(),packet,encrypt,flowId));
 		}
 		if (!RR->topology->getPeer(tPtr,dest))
 			requestWhois(tPtr,RR->node->now(),dest);
@@ -791,10 +885,11 @@ void Switch::requestWhois(void *tPtr,const int64_t now,const Address &addr)
 
 	const SharedPtr<Peer> upstream(RR->topology->getUpstreamPeer());
 	if (upstream) {
+		int64_t flowId = -1;
 		Packet outp(upstream->address(),RR->identity.address(),Packet::VERB_WHOIS);
 		addr.appendTo(outp);
 		RR->node->expectReplyTo(outp.packetId());
-		send(tPtr,outp,true);
+		send(tPtr,outp,true,flowId);
 	}
 }
 
@@ -819,7 +914,7 @@ void Switch::doAnythingWaitingForPeer(void *tPtr,const SharedPtr<Peer> &peer)
 		Mutex::Lock _l(_txQueue_m);
 		for(std::list< TXQueueEntry >::iterator txi(_txQueue.begin());txi!=_txQueue.end();) {
 			if (txi->dest == peer->address()) {
-				if (_trySend(tPtr,txi->packet,txi->encrypt)) {
+				if (_trySend(tPtr,txi->packet,txi->encrypt,txi->flowId)) {
 					_txQueue.erase(txi++);
 				} else {
 					++txi;
@@ -843,7 +938,7 @@ unsigned long Switch::doTimerTasks(void *tPtr,int64_t now)
 		Mutex::Lock _l(_txQueue_m);
 
 		for(std::list< TXQueueEntry >::iterator txi(_txQueue.begin());txi!=_txQueue.end();) {
-			if (_trySend(tPtr,txi->packet,txi->encrypt)) {
+			if (_trySend(tPtr,txi->packet,txi->encrypt,txi->flowId)) {
 				_txQueue.erase(txi++);
 			} else if ((now - txi->creationTime) > ZT_TRANSMIT_QUEUE_TIMEOUT) {
 				_txQueue.erase(txi++);
@@ -907,7 +1002,7 @@ bool Switch::_shouldUnite(const int64_t now,const Address &source,const Address
 	return false;
 }
 
-bool Switch::_trySend(void *tPtr,Packet &packet,bool encrypt)
+bool Switch::_trySend(void *tPtr,Packet &packet,bool encrypt,int64_t flowId)
 {
 	SharedPtr<Path> viaPath;
 	const int64_t now = RR->node->now();
@@ -915,54 +1010,73 @@ bool Switch::_trySend(void *tPtr,Packet &packet,bool encrypt)
 
 	const SharedPtr<Peer> peer(RR->topology->getPeer(tPtr,destination));
 	if (peer) {
-		viaPath = peer->getAppropriatePath(now,false);
-		if (!viaPath) {
-			peer->tryMemorizedPath(tPtr,now); // periodically attempt memorized or statically defined paths, if any are known
-			const SharedPtr<Peer> relay(RR->topology->getUpstreamPeer());
-			if ( (!relay) || (!(viaPath = relay->getAppropriatePath(now,false))) ) {
-				if (!(viaPath = peer->getAppropriatePath(now,true)))
-					return false;
+		if (RR->node->getMultipathMode() == ZT_MULTIPATH_BROADCAST) {
+			// Nothing here, we'll grab an entire set of paths to send out on below
+		}
+		else {
+			viaPath = peer->getAppropriatePath(now,false,flowId);
+			if (!viaPath) {
+				peer->tryMemorizedPath(tPtr,now); // periodically attempt memorized or statically defined paths, if any are known
+				const SharedPtr<Peer> relay(RR->topology->getUpstreamPeer());
+				if ( (!relay) || (!(viaPath = relay->getAppropriatePath(now,false,flowId))) ) {
+					if (!(viaPath = peer->getAppropriatePath(now,true,flowId)))
+						return false;
+				}
 			}
 		}
 	} else {
 		return false;
 	}
 
-	unsigned int mtu = ZT_DEFAULT_PHYSMTU;
-	uint64_t trustedPathId = 0;
-	RR->topology->getOutboundPathInfo(viaPath->address(),mtu,trustedPathId);
-
-	unsigned int chunkSize = std::min(packet.size(),mtu);
-	packet.setFragmented(chunkSize < packet.size());
+	// If sending on all paths, grab the full set and start with the first path
+	int nextPathIdx = 0;
+	std::vector<SharedPtr<Path>> paths;
+	if (RR->node->getMultipathMode() == ZT_MULTIPATH_BROADCAST) {
+		paths = peer->getAllPaths(now);
+		if (paths.empty())
+			return false; // no known paths yet; queue until we learn one
+		viaPath = paths[nextPathIdx++];
+	}
 
-	peer->recordOutgoingPacket(viaPath, packet.packetId(), packet.payloadLength(), packet.verb(), now);
+	while (viaPath) {
+		unsigned int mtu = ZT_DEFAULT_PHYSMTU;
+		uint64_t trustedPathId = 0;
+		RR->topology->getOutboundPathInfo(viaPath->address(),mtu,trustedPathId);
+		unsigned int chunkSize = std::min(packet.size(),mtu);
+		packet.setFragmented(chunkSize < packet.size());
+		peer->recordOutgoingPacket(viaPath, packet.packetId(), packet.payloadLength(), packet.verb(), now);
 
-	if (trustedPathId) {
-		packet.setTrusted(trustedPathId);
-	} else {
-		packet.armor(peer->key(),encrypt);
-	}
+		if (trustedPathId) {
+			packet.setTrusted(trustedPathId);
+		} else {
+			packet.armor(peer->key(),encrypt);
+		}
 
-	if (viaPath->send(RR,tPtr,packet.data(),chunkSize,now)) {
-		if (chunkSize < packet.size()) {
-			// Too big for one packet, fragment the rest
-			unsigned int fragStart = chunkSize;
-			unsigned int remaining = packet.size() - chunkSize;
-			unsigned int fragsRemaining = (remaining / (mtu - ZT_PROTO_MIN_FRAGMENT_LENGTH));
-			if ((fragsRemaining * (mtu - ZT_PROTO_MIN_FRAGMENT_LENGTH)) < remaining)
-				++fragsRemaining;
-			const unsigned int totalFragments = fragsRemaining + 1;
-
-			for(unsigned int fno=1;fno<totalFragments;++fno) {
-				chunkSize = std::min(remaining,(unsigned int)(mtu - ZT_PROTO_MIN_FRAGMENT_LENGTH));
-				Packet::Fragment frag(packet,fragStart,chunkSize,fno,totalFragments);
-				viaPath->send(RR,tPtr,frag.data(),frag.size(),now);
-				fragStart += chunkSize;
-				remaining -= chunkSize;
+		if (viaPath->send(RR,tPtr,packet.data(),chunkSize,now)) {
+			if (chunkSize < packet.size()) {
+				// Too big for one packet, fragment the rest
+				unsigned int fragStart = chunkSize;
+				unsigned int remaining = packet.size() - chunkSize;
+				unsigned int fragsRemaining = (remaining / (mtu - ZT_PROTO_MIN_FRAGMENT_LENGTH));
+				if ((fragsRemaining * (mtu - ZT_PROTO_MIN_FRAGMENT_LENGTH)) < remaining)
+					++fragsRemaining;
+				const unsigned int totalFragments = fragsRemaining + 1;
+
+				for(unsigned int fno=1;fno<totalFragments;++fno) {
+					chunkSize = std::min(remaining,(unsigned int)(mtu - ZT_PROTO_MIN_FRAGMENT_LENGTH));
+					Packet::Fragment frag(packet,fragStart,chunkSize,fno,totalFragments);
+					viaPath->send(RR,tPtr,frag.data(),frag.size(),now);
+					fragStart += chunkSize;
+					remaining -= chunkSize;
+				}
+			}
+		}
+		viaPath.zero();
+		if (RR->node->getMultipathMode() == ZT_MULTIPATH_BROADCAST) {
+			if (paths.size() > nextPathIdx) {
+				viaPath = paths[nextPathIdx++];
 			}
 		}
 	}
-
 	return true;
 }
 

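As a worked example of the flow ID composition added to onLocalEthernet() above (the port values are illustrative):

	// A TCP session from source port 50000 (0xC350) to destination port 443 (0x01BB):
	const uint16_t srcPort = 50000, dstPort = 443;
	const uint8_t proto = 0x06; // TCP
	const int64_t flowId = ((int64_t)srcPort << 48) | ((int64_t)dstPort << 32) | proto;
	// Bit pattern: 0xC35001BB00000006. Any source port >= 0x8000 sets the sign
	// bit, which is why the balancing code in Peer.cpp applies abs() before
	// reducing the ID modulo the number of virtual paths.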
+ 7 - 5
node/Switch.hpp

@@ -131,7 +131,7 @@ public:
 	 * @param encrypt Encrypt packet payload? (always true except for HELLO)
 	 * @param qosBucket Which bucket the rule-system determined this packet should fall into
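+	 * @param flowId Session-specific protocol flow identifier used for path allocation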
 	 */
-	void aqm_enqueue(void *tPtr, const SharedPtr<Network> &network, Packet &packet,bool encrypt,int qosBucket);
+	void aqm_enqueue(void *tPtr, const SharedPtr<Network> &network, Packet &packet,bool encrypt,int qosBucket,int64_t flowId = -1);
 
 	/**
 	 * Performs a single AQM cycle and dequeues and transmits all eligible packets on all networks
@@ -177,7 +177,7 @@ public:
 	 * @param packet Packet to send (buffer may be modified)
 	 * @param encrypt Encrypt packet payload? (always true except for HELLO)
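+	 * @param flowId Session-specific protocol flow identifier used for path allocation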
 	 */
-	void send(void *tPtr,Packet &packet,bool encrypt);
+	void send(void *tPtr,Packet &packet,bool encrypt,int64_t flowId = -1);
 
 	/**
 	 * Request WHOIS on a given address
@@ -212,7 +212,7 @@ public:
 
 private:
 	bool _shouldUnite(const int64_t now,const Address &source,const Address &destination);
-	bool _trySend(void *tPtr,Packet &packet,bool encrypt); // packet is modified if return is true
+	bool _trySend(void *tPtr,Packet &packet,bool encrypt,int64_t flowId = -1); // packet is modified if return is true
 
 	const RuntimeEnvironment *const RR;
 	int64_t _lastBeaconResponse;
@@ -261,16 +261,18 @@ private:
 	struct TXQueueEntry
 	{
 		TXQueueEntry() {}
-		TXQueueEntry(Address d,uint64_t ct,const Packet &p,bool enc) :
+		TXQueueEntry(Address d,uint64_t ct,const Packet &p,bool enc,int64_t fid) :
 			dest(d),
 			creationTime(ct),
 			packet(p),
-			encrypt(enc) {}
+			encrypt(enc),
+			flowId(fid) {}
 
 		Address dest;
 		uint64_t creationTime;
 		Packet packet; // unencrypted/unMAC'd packet -- this is done at send time
 		bool encrypt;
+		int64_t flowId;
 	};
 	std::list< TXQueueEntry > _txQueue;
 	Mutex _txQueue_m;

+ 2 - 2
node/Trace.cpp

@@ -109,10 +109,10 @@ void Trace::peerConfirmingUnknownPath(void *const tPtr,const uint64_t networkId,
 
 void Trace::peerLinkNowAggregate(void *const tPtr,Peer &peer)
 {
-	if ((RR->node->getMultipathMode() == ZT_MULTIPATH_RANDOM)) {
+	if ((RR->node->getMultipathMode() == ZT_MULTIPATH_BALANCE_RANDOM)) {
 		ZT_LOCAL_TRACE(tPtr,RR,"link to peer %.10llx is now a randomly-distributed aggregate link",peer.address().toInt());
 	}
-	if ((RR->node->getMultipathMode() == ZT_MULTIPATH_PROPORTIONALLY_BALANCED)) {
+	if ((RR->node->getMultipathMode() == ZT_MULTIPATH_BALANCE_DYNAMIC_OPAQUE)) {
 		ZT_LOCAL_TRACE(tPtr,RR,"link to peer %.10llx is now a proportionally-balanced aggregate link",peer.address().toInt());
 	}
 }