Browse Source

Added auto-escalation to multipath if both peers support it. Improved QoS/ACK tracking. Related bug fixes

Joseph Henry 7 years ago
parent
commit
91a22a686a
5 changed files with 200 additions and 112 deletions
  1. 21 6
      node/Constants.hpp
  2. 27 25
      node/IncomingPacket.cpp
  3. 98 50
      node/Path.hpp
  4. 27 24
      node/Peer.cpp
  5. 27 7
      node/Peer.hpp

+ 21 - 6
node/Constants.hpp

@@ -334,6 +334,11 @@
 #define ZT_PATH_CONTRIB_THROUGHPUT 1.50 / 3.0
 #define ZT_PATH_CONTRIB_SCOPE      0.75 / 3.0
 
+/**
+ * How often a QoS packet is sent
+ */
+#define ZT_PATH_QOS_INTERVAL 3000
+
 /**
  * Min and max acceptable sizes for a VERB_QOS_MEASUREMENT packet
  */
@@ -343,7 +348,22 @@
 /**
  * How many ID:sojourn time pairs in a single QoS packet
  */
-#define ZT_PATH_QOS_TABLE_SIZE (ZT_PATH_MAX_QOS_PACKET_SZ * 8) / (64 + 8)
+#define ZT_PATH_QOS_TABLE_SIZE (ZT_PATH_MAX_QOS_PACKET_SZ * 8) / (64 + 16)
+
+/**
+ * Maximum number of outgoing packets we monitor for QoS information
+ */
+#define ZT_PATH_MAX_OUTSTANDING_QOS_RECORDS 128
+
+/**
+ * How often we check the age of QoS records
+ */
+#define ZT_PATH_QOS_RECORD_PURGE_INTERVAL 1000
+
+/**
+ * Timeout for QoS records
+ */
+#define ZT_PATH_QOS_TIMEOUT ZT_PATH_QOS_INTERVAL * 2
 
 /**
  * How often the service tests the path throughput
@@ -355,11 +375,6 @@
  */
 #define ZT_PATH_ACK_INTERVAL 250
 
-/**
- * How often a QoS packet is sent
- */
-#define ZT_PATH_QOS_INTERVAL 1000
-
 /**
  * How often an aggregate link statistics report is emitted into this tracing system
  */

+ 27 - 25
node/IncomingPacket.cpp

@@ -206,43 +206,45 @@ bool IncomingPacket::_doACK(const RuntimeEnvironment *RR,void *tPtr,const Shared
 {
 	/* Dissect incoming ACK packet. From this we can estimate current throughput of the path, establish known
 	 * maximums and detect packet loss. */
-
-	if (RR->node->getMultipathMode() != ZT_MULTIPATH_NONE) {
+	if (peer->localMultipathSupport()) {
 		int32_t ackedBytes;
-		memcpy(&ackedBytes, payload(), sizeof(int32_t));
+		if (payloadLength() != sizeof(ackedBytes)) {
+			return true; // ignore
+		}
+		memcpy(&ackedBytes, payload(), sizeof(ackedBytes));
 		_path->receivedAck(RR->node->now(), Utils::ntoh(ackedBytes));
+		peer->inferRemoteMultipathEnabled();
 	}
 
 	return true;
 }
-
 bool IncomingPacket::_doQOS_MEASUREMENT(const RuntimeEnvironment *RR,void *tPtr,const SharedPtr<Peer> &peer)
 {
-
 	/* Dissect incoming QoS packet. From this we can compute latency values and their variance.
 	 * The latency variance is used as a measure of "jitter". */
-
-	if (RR->node->getMultipathMode() != ZT_MULTIPATH_NONE) {
-		if (payloadLength() < ZT_PATH_MAX_QOS_PACKET_SZ && payloadLength() > ZT_PATH_MIN_QOS_PACKET_SZ) {
-			const int64_t now = RR->node->now();
-			uint64_t rx_id[ZT_PATH_QOS_TABLE_SIZE];
-			uint8_t rx_ts[ZT_PATH_QOS_TABLE_SIZE];
-			char *begin = (char *)payload();
-			char *ptr = begin;
-			int count = 0;
-			int len = payloadLength();
-			// Read packet IDs and latency compensation intervals for each packet tracked by thie QoS packet
-			while (ptr < (begin + len)) {
-				memcpy((void*)&rx_id[count], ptr, sizeof(uint64_t));
-				rx_id[count] = Utils::ntoh(rx_id[count]);
-				ptr+=sizeof(uint64_t);
-				memcpy((void*)&rx_ts[count], ptr, sizeof(uint8_t));
-				ptr+=sizeof(uint8_t);
-				count++;
-			}
-			_path->receivedQoS(now, count, rx_id, rx_ts);
+	if (peer->localMultipathSupport()) {
+		if (payloadLength() > ZT_PATH_MAX_QOS_PACKET_SZ || payloadLength() < ZT_PATH_MIN_QOS_PACKET_SZ) {
+			return true; // ignore
 		}
+		const int64_t now = RR->node->now();
+		uint64_t rx_id[ZT_PATH_QOS_TABLE_SIZE];
+		uint16_t rx_ts[ZT_PATH_QOS_TABLE_SIZE];
+		char *begin = (char *)payload();
+		char *ptr = begin;
+		int count = 0;
+		int len = payloadLength();
+		// Read packet IDs and latency compensation intervals for each packet tracked by thie QoS packet
+		while (ptr < (begin + len) && (count < ZT_PATH_QOS_TABLE_SIZE)) {
+			memcpy((void*)&rx_id[count], ptr, sizeof(uint64_t));
+			ptr+=sizeof(uint64_t);
+			memcpy((void*)&rx_ts[count], ptr, sizeof(uint16_t));
+			ptr+=sizeof(uint16_t);
+			count++;
+		}
+		_path->receivedQoS(now, count, rx_id, rx_ts);
+		peer->inferRemoteMultipathEnabled();
 	}
+
 	return true;
 }
 

+ 98 - 50
node/Path.hpp

@@ -40,6 +40,7 @@
 #include "AtomicCounter.hpp"
 #include "Utils.hpp"
 #include "RingBuffer.hpp"
+#include "Packet.hpp"
 
 #include "../osdep/Phy.hpp"
 
@@ -105,9 +106,11 @@ public:
 		_lastAck(0),
 		_lastThroughputEstimation(0),
 		_lastQoSMeasurement(0),
+		_lastQoSRecordPurge(0),
 		_unackedBytes(0),
 		_expectingAckAsOf(0),
 		_packetsReceivedSinceLastAck(0),
+		_packetsReceivedSinceLastQoS(0),
 		_meanThroughput(0.0),
 		_maxLifetimeThroughput(0),
 		_bytesAckedSinceLastThroughputEstimation(0),
@@ -133,9 +136,11 @@ public:
 		_lastAck(0),
 		_lastThroughputEstimation(0),
 		_lastQoSMeasurement(0),
+		_lastQoSRecordPurge(0),
 		_unackedBytes(0),
 		_expectingAckAsOf(0),
 		_packetsReceivedSinceLastAck(0),
+		_packetsReceivedSinceLastQoS(0),
 		_meanThroughput(0.0),
 		_maxLifetimeThroughput(0),
 		_bytesAckedSinceLastThroughputEstimation(0),
@@ -147,6 +152,7 @@ public:
 		_lastComputedRelativeQuality(0)
 	{
 		prepareBuffers();
+		_phy->getIfName((PhySocket *)((uintptr_t)_localSocket), _ifname, 16);
 	}
 
 	~Path()
@@ -295,17 +301,52 @@ public:
 	}
 
 	/**
-	 * Take note that we're expecting a VERB_ACK on this path as of a specific time
+	 * Record statistics on outgoing packets. Used later to estimate QoS metrics.
 	 *
 	 * @param now Current time
-	 * @param packetId ID of the packet
-	 * @param payloadLength Number of bytes we're is expecting a reply to
+	 * @param packetId ID of packet
+	 * @param payloadLength Length of payload
+	 * @param verb Packet verb
 	 */
-	inline void expectingAck(int64_t now, int64_t packetId, uint16_t payloadLength)
+	inline void recordOutgoingPacket(int64_t now, int64_t packetId, uint16_t payloadLength, Packet::Verb verb)
 	{
-		_expectingAckAsOf = ackAge(now) > ZT_PATH_ACK_INTERVAL ? _expectingAckAsOf : now;
-		_unackedBytes += payloadLength;
-		_outgoingPacketRecords[packetId] = now;
+		Mutex::Lock _l(_statistics_m);
+		if (verb == Packet::VERB_FRAME || verb == Packet::VERB_EXT_FRAME) {
+			if (packetId % 2 == 0) { // even -> use for ACK
+				_unackedBytes += payloadLength;
+				// Take note that we're expecting a VERB_ACK on this path as of a specific time
+				_expectingAckAsOf = ackAge(now) > ZT_PATH_ACK_INTERVAL ? _expectingAckAsOf : now;
+			}
+			else { // odd -> use for QoS
+				if (_outQoSRecords.size() < ZT_PATH_MAX_OUTSTANDING_QOS_RECORDS) {
+					_outQoSRecords[packetId] = now;
+				}
+			}
+		}
+	}
+
+	/**
+	 * Record statistics on incoming packets. Used later to estimate QoS metrics.
+	 *
+	 * @param now Current time
+	 * @param packetId ID of packet
+	 * @param payloadLength Length of payload
+	 * @param verb Packet verb
+	 */
+	inline void recordIncomingPacket(int64_t now, int64_t packetId, uint16_t payloadLength, Packet::Verb verb)
+	{
+		Mutex::Lock _l(_statistics_m);
+		if (verb == Packet::VERB_FRAME || verb == Packet::VERB_EXT_FRAME) {
+			if (packetId % 2 == 0) { // even -> use for ACK
+				_inACKRecords[packetId] = payloadLength;
+				_packetsReceivedSinceLastAck++;
+			}
+			else { // odd -> use for QoS
+				_inQoSRecords[packetId] = now;
+				_packetsReceivedSinceLastQoS++;
+			}
+			_packetValiditySamples->push(true);
+		}
 	}
 
 	/**
@@ -335,9 +376,12 @@ public:
 	 */
 	inline int32_t bytesToAck()
 	{
+		Mutex::Lock _l(_statistics_m);
 		int32_t bytesToAck = 0;
-		for (int i=0; i<_packetsReceivedSinceLastAck; i++) {
-			bytesToAck += _recorded_len[i];
+		std::map<uint64_t,uint16_t>::iterator it = _inACKRecords.begin();
+		while (it != _inACKRecords.end()) {
+			bytesToAck += it->second;
+			it++;
 		}
 		return bytesToAck;
 	}
@@ -357,9 +401,8 @@ public:
 	 */
 	inline void sentAck(int64_t now)
 	{
-		memset(_recorded_id, 0, sizeof(_recorded_id));
-		memset(_recorded_ts, 0, sizeof(_recorded_ts));
-		memset(_recorded_len, 0, sizeof(_recorded_len));
+		Mutex::Lock _l(_statistics_m);
+		_inACKRecords.clear();
 		_packetsReceivedSinceLastAck = 0;
 		_lastAck = now;
 	}
@@ -373,17 +416,19 @@ public:
 	 * @param rx_id table of packet IDs
 	 * @param rx_ts table of holding times
 	 */
-	inline void receivedQoS(int64_t now, int count, uint64_t *rx_id, uint8_t *rx_ts)
+	inline void receivedQoS(int64_t now, int count, uint64_t *rx_id, uint16_t *rx_ts)
 	{
+		Mutex::Lock _l(_statistics_m);
 		// Look up egress times and compute latency values for each record
+		std::map<uint64_t,uint64_t>::iterator it;
 		for (int j=0; j<count; j++) {
-			std::map<uint64_t,uint64_t>::iterator it = _outgoingPacketRecords.find(rx_id[j]);
-			if (it != _outgoingPacketRecords.end()) {
+			it = _outQoSRecords.find(rx_id[j]);
+			if (it != _outQoSRecords.end()) {
 				uint16_t rtt = (uint16_t)(now - it->second);
 				uint16_t rtt_compensated = rtt - rx_ts[j];
 				float latency = rtt_compensated / 2.0;
 				updateLatency(latency, now);
-				_outgoingPacketRecords.erase(it);
+				_outQoSRecords.erase(it);
 			}
 		}
 	}
@@ -397,15 +442,20 @@ public:
 	 */
 	inline int32_t generateQoSPacket(int64_t now, char *qosBuffer)
 	{
+		Mutex::Lock _l(_statistics_m);
 		int32_t len = 0;
-		for (int i=0; i<_packetsReceivedSinceLastAck; i++) {
-			uint64_t id = _recorded_id[i];
+		std::map<uint64_t,uint64_t>::iterator it = _inQoSRecords.begin();
+		int i=0;
+		while (i<_packetsReceivedSinceLastQoS && it != _inQoSRecords.end()) {
+			uint64_t id = it->first;
 			memcpy(qosBuffer, &id, sizeof(uint64_t));
 			qosBuffer+=sizeof(uint64_t);
-			uint8_t holdingTime = (uint8_t)(now - _recorded_ts[i]);
-			memcpy(qosBuffer, &holdingTime, sizeof(uint8_t));
-			qosBuffer+=sizeof(uint8_t);
-			len+=sizeof(uint64_t)+sizeof(uint8_t);
+			uint16_t holdingTime = (now - it->second);
+			memcpy(qosBuffer, &holdingTime, sizeof(uint16_t));
+			qosBuffer+=sizeof(uint16_t);
+			len+=sizeof(uint64_t)+sizeof(uint16_t);
+			_inQoSRecords.erase(it++);
+			i++;
 		}
 		return len;
 	}
@@ -415,22 +465,9 @@ public:
 	 *
 	 * @param Current time
 	 */
-	inline void sentQoS(int64_t now) { _lastQoSMeasurement = now; }
-
-	/**
-	 * Record statistics on incoming packets. Used later to estimate QoS.
-	 *
-	 * @param now Current time
-	 * @param packetId
-	 * @param payloadLength
-	 */
-	inline void recordIncomingPacket(int64_t now, int64_t packetId, int32_t payloadLength)
-	{
-		_recorded_ts[_packetsReceivedSinceLastAck] = now;
-		_recorded_id[_packetsReceivedSinceLastAck] = packetId;
-		_recorded_len[_packetsReceivedSinceLastAck] = payloadLength;
-		_packetsReceivedSinceLastAck++;
-		_packetValiditySamples->push(true);
+	inline void sentQoS(int64_t now) {
+		_packetsReceivedSinceLastQoS = 0;
+		_lastQoSMeasurement = now;
 	}
 
 	/**
@@ -447,8 +484,8 @@ public:
 	 * @return Whether a QoS (VERB_QOS_MEASUREMENT) packet needs to be emitted at this time
 	 */
 	inline bool needsToSendQoS(int64_t now) {
-		return ((_packetsReceivedSinceLastAck >= ZT_PATH_QOS_TABLE_SIZE) ||
-			((now - _lastQoSMeasurement) > ZT_PATH_QOS_INTERVAL)) && _packetsReceivedSinceLastAck;
+		return ((_packetsReceivedSinceLastQoS >= ZT_PATH_QOS_TABLE_SIZE) ||
+			((now - _lastQoSMeasurement) > ZT_PATH_QOS_INTERVAL)) && _packetsReceivedSinceLastQoS;
 	}
 
 	/**
@@ -523,15 +560,16 @@ public:
 	inline char *getAddressString() { return _addrString; }
 
 	/**
-	 * Compute and cache stability and performance metrics. The resultant stability coefficint is a measure of how "well behaved"
+	 * Compute and cache stability and performance metrics. The resultant stability coefficient is a measure of how "well behaved"
 	 * this path is. This figure is substantially different from (but required for the estimation of the path's overall "quality".
 	 *
 	 * @param now Current time
 	 */
 	inline void processBackgroundPathMeasurements(int64_t now, const int64_t peerId) {
+		Mutex::Lock _l(_statistics_m);
+		// Compute path stability
 		if (now - _lastPathQualityComputeTime > ZT_PATH_QUALITY_COMPUTE_INTERVAL) {
 			_lastPathQualityComputeTime = now;
-			_phy->getIfName((PhySocket *)((uintptr_t)_localSocket), _ifname, 16);
 			address().toString(_addrString);
 			_meanThroughput = _throughputSamples->mean();
 			_meanLatency = _latencySamples->mean();
@@ -556,6 +594,17 @@ public:
 			_lastComputedStability *= 1 - _packetErrorRatio;
 			_qualitySamples->push(_lastComputedStability);
 		}
+		// Prevent QoS records from sticking around for too long
+		if (now - _lastQoSRecordPurge > ZT_PATH_QOS_RECORD_PURGE_INTERVAL)
+		{
+			std::map<uint64_t,uint64_t>::iterator it = _outQoSRecords.begin();
+			while (it != _outQoSRecords.end()) {
+				// Time since egress of tracked packet
+				if ((now - it->second) >= ZT_PATH_QOS_TIMEOUT) {
+					_outQoSRecords.erase(it++);
+				} else { it++; }
+			}
+		}
 	}
 
 	/**
@@ -592,13 +641,12 @@ public:
 		_qualitySamples = new RingBuffer<float>(ZT_PATH_QUALITY_METRIC_WIN_SZ);
 		_packetValiditySamples = new RingBuffer<bool>(ZT_PATH_QUALITY_METRIC_WIN_SZ);
 		memset(_ifname, 0, 16);
-		memset(_recorded_id, 0, sizeof(_recorded_id));
-		memset(_recorded_ts, 0, sizeof(_recorded_ts));
-		memset(_recorded_len, 0, sizeof(_recorded_len));
 		memset(_addrString, 0, sizeof(_addrString));
 	}
 
 private:
+	Mutex _statistics_m;
+
 	volatile int64_t _lastOut;
 	volatile int64_t _lastIn;
 	volatile int64_t _lastTrustEstablishedPacketReceived;
@@ -609,19 +657,19 @@ private:
 	InetAddress::IpScope _ipScope; // memoize this since it's a computed value checked often
 	AtomicCounter __refCount;
 
-	uint64_t _recorded_id[ZT_PATH_QOS_TABLE_SIZE];
-	uint64_t _recorded_ts[ZT_PATH_QOS_TABLE_SIZE];
-	uint16_t _recorded_len[ZT_PATH_QOS_TABLE_SIZE];
-
-	std::map<uint64_t, uint64_t> _outgoingPacketRecords;
+	std::map<uint64_t, uint64_t> _outQoSRecords; // id:egress_time
+	std::map<uint64_t, uint64_t> _inQoSRecords; // id:now
+	std::map<uint64_t, uint16_t> _inACKRecords; // id:len
 
 	int64_t _lastAck;
 	int64_t _lastThroughputEstimation;
 	int64_t _lastQoSMeasurement;
+	int64_t _lastQoSRecordPurge;
 
 	int64_t _unackedBytes;
 	int64_t _expectingAckAsOf;
 	int16_t _packetsReceivedSinceLastAck;
+	int16_t _packetsReceivedSinceLastQoS;
 
 	float _meanThroughput;
 	uint64_t _maxLifetimeThroughput;

+ 27 - 24
node/Peer.cpp

@@ -64,6 +64,7 @@ Peer::Peer(const RuntimeEnvironment *renv,const Identity &myIdentity,const Ident
 	_credentialsCutoffCount(0),
 	_linkIsBalanced(false),
 	_linkIsRedundant(false),
+	_remotePeerMultipathEnabled(false),
 	_lastAggregateStatsReport(0)
 {
 	if (!myIdentity.agree(peerIdentity,_key,ZT_PEER_SECRET_KEY_LENGTH))
@@ -104,8 +105,10 @@ void Peer::received(
 
 	{
 		Mutex::Lock _l(_paths_m);
-		if (RR->node->getMultipathMode() != ZT_MULTIPATH_NONE) {
-			recordIncomingPacket(tPtr, path, packetId, payloadLength, verb, now);
+
+		recordIncomingPacket(tPtr, path, packetId, payloadLength, verb, now);
+
+		if (canUseMultipath()) {
 			if (path->needsToSendQoS(now)) {
 				sendQOS_MEASUREMENT(tPtr, path, path->localSocket(), path->address(), now);
 			}
@@ -167,7 +170,7 @@ void Peer::received(
 
 				// If we find a pre-existing path with the same address, just replace it.
 				// If we don't find anything we can replace, just use the replacePath that we previously decided on.
-				if (RR->node->getMultipathMode() != ZT_MULTIPATH_NONE) {
+				if (canUseMultipath()) {
 					for(unsigned int i=0;i<ZT_MAX_PEER_NETWORK_PATHS;++i) {
 						if (_paths[i].p) {
 							if ( _paths[i].p->address().ss_family == path->address().ss_family && _paths[i].p->address().ipsEqual2(path->address())) {
@@ -269,21 +272,19 @@ void Peer::received(
 void Peer::recordOutgoingPacket(const SharedPtr<Path> &path, const uint64_t packetId,
 	uint16_t payloadLength, const Packet::Verb verb, int64_t now)
 {
-	if (RR->node->getMultipathMode() != ZT_MULTIPATH_NONE) {
-		if (verb == Packet::VERB_FRAME || verb == Packet::VERB_EXT_FRAME) {
-			path->expectingAck(now, packetId, payloadLength);
-		}
+	if (localMultipathSupport()) {
+		path->recordOutgoingPacket(now, packetId, payloadLength, verb);
 	}
 }
 
 void Peer::recordIncomingPacket(void *tPtr, const SharedPtr<Path> &path, const uint64_t packetId,
 	uint16_t payloadLength, const Packet::Verb verb, int64_t now)
 {
-	if (verb == Packet::VERB_FRAME || verb == Packet::VERB_EXT_FRAME) {
+	if (localMultipathSupport()) {
 		if (path->needsToSendAck(now)) {
 			sendACK(tPtr, path, path->localSocket(), path->address(), now);
 		}
-		path->recordIncomingPacket(now, packetId, payloadLength);
+		path->recordIncomingPacket(now, packetId, payloadLength, verb);
 	}
 }
 
@@ -384,9 +385,9 @@ SharedPtr<Path> Peer::getAppropriatePath(int64_t now, bool includeExpired)
 
 	/**
 	 * Send traffic across the highest quality path only. This algorithm will still
-	 * use the old path quality metric.
+	 * use the old path quality metric from protocol version 9.
 	 */
-	if (RR->node->getMultipathMode() == ZT_MULTIPATH_NONE) {
+	if (!canUseMultipath()) {
 		long bestPathQuality = 2147483647;
 		for(unsigned int i=0;i<ZT_MAX_PEER_NETWORK_PATHS;++i) {
 			if (_paths[i].p) {
@@ -470,17 +471,20 @@ SharedPtr<Path> Peer::getAppropriatePath(int64_t now, bool includeExpired)
 					stalePaths[numStalePaths] = i;
 					numStalePaths++;
 				}
-				// Record a default path to use as a short-circuit for the rest of the algorithm
+				// Record a default path to use as a short-circuit for the rest of the algorithm (if needed)
 				bestPath = i;
 			}
 		}
+
+				// Compare paths to each-other
+		float qualityScalingFactor = computeAggregateLinkRelativeQuality(now);
+
 		if (numAlivePaths == 0 && numStalePaths == 0) {
 			return SharedPtr<Path>();
 		} if (numAlivePaths == 1 || numStalePaths == 1) {
 			return _paths[bestPath].p;
 		}
-		// Compare paths to each-other
-		float qualityScalingFactor = computeAggregateLinkRelativeQuality(now);
+
 		// Convert set of relative performances into an allocation set
 		for(uint16_t i=0;i<ZT_MAX_PEER_NETWORK_PATHS;++i) {
 			if (_paths[i].p) {
@@ -530,7 +534,7 @@ char *Peer::interfaceListStr()
 				}
 			}
 			char *ipvStr = ipv ? (char*)"ipv4" : (char*)"ipv6";
-			sprintf(tmp, "(%s, %s, %5.4f)", _paths[i].p->getName(), ipvStr, currentAllocation);
+			sprintf(tmp, "(%s, %s, %.3f)", _paths[i].p->getName(), ipvStr, currentAllocation);
 			// Prevent duplicates
 			if(ifnamemap[_paths[i].p->getName()] != ipv) {
 				memcpy(ptr, tmp, strlen(tmp));
@@ -543,7 +547,7 @@ char *Peer::interfaceListStr()
 	}
 	ptr--; // Overwrite trailing space
 	if (imbalanced) {
-		sprintf(tmp, ", is IMBALANCED");
+		sprintf(tmp, ", is asymmetrical");
 		memcpy(ptr, tmp, sizeof(tmp));
 	} else {
 		*ptr = '\0';
@@ -688,10 +692,11 @@ void Peer::sendACK(void *tPtr,const SharedPtr<Path> &path,const int64_t localSoc
 
 void Peer::sendQOS_MEASUREMENT(void *tPtr,const SharedPtr<Path> &path,const int64_t localSocket,const InetAddress &atAddress,int64_t now)
 {
+	const int64_t _now = RR->node->now();
 	Packet outp(_id.address(),RR->identity.address(),Packet::VERB_QOS_MEASUREMENT);
 	char qosData[ZT_PATH_MAX_QOS_PACKET_SZ];
-	path->generateQoSPacket(now,qosData);
-	outp.append(qosData,sizeof(qosData));
+	int16_t len = path->generateQoSPacket(_now,qosData);
+	outp.append(qosData,len);
 	if (atAddress) {
 		outp.armor(_key,false);
 		RR->node->putPacket(tPtr,localSocket,atAddress,outp.data(),outp.size());
@@ -775,17 +780,15 @@ unsigned int Peer::doPingAndKeepalive(void *tPtr,int64_t now)
 	const bool sendFullHello = ((now - _lastSentFullHello) >= ZT_PEER_PING_PERIOD);
 	_lastSentFullHello = now;
 
-	// Emit traces regarding the status of aggregate links
-	if (RR->node->getMultipathMode() != ZT_MULTIPATH_NONE) {
+	// Emit traces regarding aggregate link status
+	if (canUseMultipath()) {
 		int alivePathCount = aggregateLinkPhysicalPathCount();
 		if ((now - _lastAggregateStatsReport) > ZT_PATH_AGGREGATE_STATS_REPORT_INTERVAL) {
 			_lastAggregateStatsReport = now;
 			if (alivePathCount) {
 				RR->t->peerLinkAggregateStatistics(NULL,*this);
 			}
-		}
-		// Report link redundancy
-		if (alivePathCount < 2 && _linkIsRedundant) {
+		} if (alivePathCount < 2 && _linkIsRedundant) {
 			_linkIsRedundant = !_linkIsRedundant;
 			RR->t->peerLinkNoLongerRedundant(NULL,*this);
 		} if (alivePathCount > 1 && !_linkIsRedundant) {
@@ -821,7 +824,7 @@ unsigned int Peer::doPingAndKeepalive(void *tPtr,int64_t now)
 			}
 		} else break;
 	}
-	if (RR->node->getMultipathMode() != ZT_MULTIPATH_NONE) {
+	if (canUseMultipath()) {
 		while(j < ZT_MAX_PEER_NETWORK_PATHS) {
 			_paths[j].lr = 0;
 			_paths[j].p.zero();

+ 27 - 7
node/Peer.hpp

@@ -27,18 +27,13 @@
 #ifndef ZT_PEER_HPP
 #define ZT_PEER_HPP
 
-#include <stdint.h>
-
-#include "Constants.hpp"
-
-#include <algorithm>
-#include <utility>
 #include <vector>
-#include <stdexcept>
 
 #include "../include/ZeroTierOne.h"
 
+#include "Constants.hpp"
 #include "RuntimeEnvironment.hpp"
+#include "Node.hpp"
 #include "Path.hpp"
 #include "Address.hpp"
 #include "Utils.hpp"
@@ -416,6 +411,29 @@ public:
 
 	inline bool remoteVersionKnown() const { return ((_vMajor > 0)||(_vMinor > 0)||(_vRevision > 0)); }
 
+	/**
+	 * Record that the remote peer does have multipath enabled. As is evident by the receipt of a VERB_ACK
+	 * or a VERB_QOS_MEASUREMENT packet at some point in the past. Until this flag is set, the local client
+	 * shall assume that multipath is not enabled and should only use classical Protocol 9 logic.
+	 */
+	inline void inferRemoteMultipathEnabled() { _remotePeerMultipathEnabled = true; }
+
+	/**
+	 * @return Whether the local client supports and is configured to use multipath
+	 */
+	inline bool localMultipathSupport() { return ((RR->node->getMultipathMode() != ZT_MULTIPATH_NONE) && (ZT_PROTO_VERSION > 9)); }
+
+	/**
+	 * @return Whether the remote peer supports and is configured to use multipath
+	 */
+	inline bool remoteMultipathSupport() { return (_remotePeerMultipathEnabled && (_vProto > 9)); }
+
+	/**
+	 * @return Whether this client can use multipath to communicate with this peer. True if both peers are using
+	 * the correct protocol and if both peers have multipath enabled. False if otherwise.
+	 */
+	inline bool canUseMultipath() { return (localMultipathSupport() && remoteMultipathSupport()); }
+
 	/**
 	 * @return True if peer has received a trust established packet (e.g. common network membership) in the past ZT_TRUST_EXPIRATION ms
 	 */
@@ -624,6 +642,8 @@ private:
 
 	bool _linkIsBalanced;
 	bool _linkIsRedundant;
+	bool _remotePeerMultipathEnabled;
+
 	uint64_t _lastAggregateStatsReport;
 
 	char _interfaceListStr[256]; // 16 characters * 16 paths in a link