Browse Source

Fix for ab-mode link failure, QoS metrics (WIP)

Joseph Henry 3 years ago
parent
commit
7ed5bde426
5 changed files with 177 additions and 16 deletions
  1. 93 8
      node/Bond.cpp
  2. 51 1
      node/Bond.hpp
  3. 5 0
      node/Constants.hpp
  4. 27 7
      node/IncomingPacket.cpp
  5. 1 0
      node/IncomingPacket.hpp

+ 93 - 8
node/Bond.cpp

@@ -407,8 +407,9 @@ void Bond::recordOutgoingPacket(const SharedPtr<Path>& path, uint64_t packetId,
 			_lastFrame = now;
 		}
 		if (shouldRecord) {
+			//_paths[pathIdx].expectingAckAsOf = now;
+			//_paths[pathIdx].totalBytesSentSinceLastAckRecieved += payloadLength;
 			//_paths[pathIdx].unackedBytes += payloadLength;
-			// Take note that we're expecting a VERB_ACK on this path as of a specific time
 			if (_paths[pathIdx].qosStatsOut.size() < ZT_QOS_MAX_PENDING_RECORDS) {
 				_paths[pathIdx].qosStatsOut[packetId] = now;
 			}
@@ -443,10 +444,24 @@ void Bond::recordIncomingPacket(const SharedPtr<Path>& path, uint64_t packetId,
 			}
 			if (shouldRecord) {
 				if (_paths[pathIdx].qosStatsIn.size() < ZT_QOS_MAX_PENDING_RECORDS) {
+					// debug("Recording QoS information (table size = %d)", _paths[pathIdx].qosStatsIn.size());
 					_paths[pathIdx].qosStatsIn[packetId] = now;
 					++(_paths[pathIdx].packetsReceivedSinceLastQoS);
 					//_paths[pathIdx].packetValiditySamples.push(true);
 				}
+				else {
+					debug("QoS buffer full, will not record information");
+				}
+				/*
+				if (_paths[pathIdx].ackStatsIn.size() < ZT_ACK_MAX_PENDING_RECORDS) {
+					//debug("Recording ACK information (table size = %d)", _paths[pathIdx].ackStatsIn.size());
+					_paths[pathIdx].ackStatsIn[packetId] = payloadLength;
+					++(_paths[pathIdx].packetsReceivedSinceLastAck);
+				}
+				else {
+					debug("ACK buffer full, will not record information");
+				}
+				*/
 			}
 		}
 	}
@@ -491,6 +506,16 @@ void Bond::receivedQoS(const SharedPtr<Path>& path, int64_t now, int count, uint
 	_paths[pathIdx].qosRecordSize.push(count);
 }
 
+void Bond::receivedAck(int pathIdx, int64_t now, int32_t ackedBytes)
+{
+	/*
+	WIP: the whole body is commented out, so this function currently does nothing.
+	The disabled code below would take the paths lock, store now as lastAckReceived
+	for the path, and reduce unackedBytes by ackedBytes (clamping at zero).
+
+	Mutex::Lock _l(_paths_m);
+	debug("received ACK of %d bytes on path %s, there are still %d un-acked bytes", ackedBytes, pathToStr(_paths[pathIdx].p).c_str(), _paths[pathIdx].unackedBytes);
+	_paths[pathIdx].lastAckReceived = now;
+	_paths[pathIdx].unackedBytes = (ackedBytes > _paths[pathIdx].unackedBytes) ? 0 : _paths[pathIdx].unackedBytes - ackedBytes;
+	*/
+}
+
 int32_t Bond::generateQoSPacket(int pathIdx, int64_t now, char* qosBuffer)
 {
 	int32_t len = 0;
@@ -743,12 +768,38 @@ void Bond::sendPATH_NEGOTIATION_REQUEST(void* tPtr, int pathIdx)
 	}
 }
 
+void Bond::sendACK(void* tPtr, int pathIdx, int64_t localSocket, const InetAddress& atAddress, int64_t now)
+{
+	/*
+	WIP: the whole body is commented out, so this function currently sends nothing.
+	The disabled code below would sum the values stored in ackStatsIn for the path,
+	append that total to a VERB_ACK packet as a uint32_t, send it directly to
+	atAddress when that address is set (otherwise through RR->sw->send), and then
+	clear ackStatsIn, zero packetsReceivedSinceLastAck and set lastAckSent to now.
+
+	Packet outp(_peer->_id.address(), RR->identity.address(), Packet::VERB_ACK);
+	int32_t bytesToAck = 0;
+	std::map<uint64_t, uint64_t>::iterator it = _paths[pathIdx].ackStatsIn.begin();
+	while (it != _paths[pathIdx].ackStatsIn.end()) {
+		bytesToAck += it->second;
+		++it;
+	}
+	debug("sending ACK of %d bytes on path %s (table size = %d)", bytesToAck, pathToStr(_paths[pathIdx].p).c_str(), _paths[pathIdx].ackStatsIn.size());
+	outp.append<uint32_t>(bytesToAck);
+	if (atAddress) {
+		outp.armor(_peer->key(), false, _peer->aesKeysIfSupported());
+		RR->node->putPacket(tPtr, localSocket, atAddress, outp.data(), outp.size());
+	}
+	else {
+		RR->sw->send(tPtr, outp, false);
+	}
+	_paths[pathIdx].ackStatsIn.clear();
+	_paths[pathIdx].packetsReceivedSinceLastAck = 0;
+	_paths[pathIdx].lastAckSent = now;
+	*/
+}
+
 void Bond::sendQOS_MEASUREMENT(void* tPtr, int pathIdx, int64_t localSocket, const InetAddress& atAddress, int64_t now)
 {
 	int64_t _now = RR->node->now();
 	Packet outp(_peer->_id.address(), RR->identity.address(), Packet::VERB_QOS_MEASUREMENT);
 	char qosData[ZT_QOS_MAX_PACKET_SIZE];
 	int16_t len = generateQoSPacket(pathIdx, _now, qosData);
+	// debug("sending QOS via link %s (len=%d)", pathToStr(_paths[pathIdx].p).c_str(), len);
 	if (len) {
 		outp.append(qosData, len);
 		if (atAddress) {
@@ -762,7 +813,6 @@ void Bond::sendQOS_MEASUREMENT(void* tPtr, int pathIdx, int64_t localSocket, con
 		_paths[pathIdx].lastQoSMeasurement = now;
 		_overheadBytes += outp.size();
 	}
-	// debug("send QOS via link %s (len=%d)", pathToStr(_paths[pathIdx].p).c_str(), len);
 }
 
 void Bond::processBackgroundBondTasks(void* tPtr, int64_t now)
@@ -799,6 +849,12 @@ void Bond::processBackgroundBondTasks(void* tPtr, int64_t now)
 				if (_paths[i].needsToSendQoS(now, _qosSendInterval)) {
 					sendQOS_MEASUREMENT(tPtr, i, _paths[i].p->localSocket(), _paths[i].p->address(), now);
 				}
+				// ACK
+				/*
+				if (_paths[i].needsToSendAck(now, _ackSendInterval)) {
+					sendACK(tPtr, i, _paths[i].p->localSocket(), _paths[i].p->address(), now);
+				}
+				*/
 			}
 		}
 	}
@@ -1095,6 +1151,20 @@ void Bond::estimatePathQuality(int64_t now)
 			log("Dropped %d QOS out-records", numDroppedQosOutRecords);
 		}
 
+		/*
+		for (unsigned int i = 0; i < ZT_MAX_PEER_NETWORK_PATHS; ++i) {
+			if (! _paths[i].p) {
+				continue;
+			}
+			// if ((now - _paths[i].lastAckReceived) > ackSendInterval) {
+			//	debug("been a while since ACK");
+			//	if (_paths[i].unackedBytes > 0) {
+			//		_paths[i].unackedBytes / _paths[i].bytesSen
+			//	}
+			// }
+		}
+		*/
+
 		it = _paths[i].qosStatsIn.begin();
 		int numDroppedQosInRecords = 0;
 		while (it != _paths[i].qosStatsIn.end()) {
@@ -1238,6 +1308,7 @@ void Bond::dequeueNextActiveBackupPath(uint64_t now)
 
 bool Bond::abForciblyRotateLink()
 {
+	Mutex::Lock _l(_paths_m);
 	if (_policy == ZT_BOND_POLICY_ACTIVE_BACKUP) {
 		int prevPathIdx = _abPathIdx;
 		dequeueNextActiveBackupPath(RR->node->now());
@@ -1366,12 +1437,18 @@ void Bond::processActiveBackupTasks(void* tPtr, int64_t now)
 
 	// Remove ineligible paths from the failover link queue
 	for (std::deque<int>::iterator it(_abFailoverQueue.begin()); it != _abFailoverQueue.end();) {
+		if (! _paths[(*it)].p) {
+			log("link is no longer valid, removing from failover queue (%zu links remain in queue)", _abFailoverQueue.size());
+			it = _abFailoverQueue.erase(it);
+			continue;
+		}
 		if (_paths[(*it)].p && ! _paths[(*it)].eligible) {
 			SharedPtr<Link> link = RR->bc->getLinkBySocket(_policyAlias, _paths[(*it)].p->localSocket());
 			it = _abFailoverQueue.erase(it);
 			if (link) {
-				log("link %s is ineligible, removing from failover queue (%zu links in queue)", pathToStr(_paths[_abPathIdx].p).c_str(), _abFailoverQueue.size());
+				log("link %s is ineligible, removing from failover queue (%zu links remain in queue)", pathToStr(_paths[_abPathIdx].p).c_str(), _abFailoverQueue.size());
 			}
+			continue;
 		}
 		else {
 			++it;
@@ -1533,8 +1610,17 @@ void Bond::processActiveBackupTasks(void* tPtr, int64_t now)
 	if (prevActiveBackupPathIdx != _abPathIdx) {
 		_lastActiveBackupPathChange = now;
 	}
+	if (_abFailoverQueue.empty()) {
+		return;	  // No sense in continuing since there are no links to switch to
+	}
+
 	if (_abLinkSelectMethod == ZT_BOND_RESELECTION_POLICY_ALWAYS) {
 		SharedPtr<Link> abLink = getLink(_paths[_abPathIdx].p);
+		if (! _paths[_abFailoverQueue.front()].p) {
+			log("invalid link. not switching");
+			return;
+		}
+
 		SharedPtr<Link> abFailoverLink = getLink(_paths[_abFailoverQueue.front()].p);
 		if (abLink && ! abLink->primary() && _paths[_abFailoverQueue.front()].p && abFailoverLink && abFailoverLink->primary()) {
 			dequeueNextActiveBackupPath(now);
@@ -1589,6 +1675,7 @@ void Bond::initTimers()
 	_lastPathNegotiationCheck = 0;
 	_lastPathNegotiationReceived = 0;
 	_lastQoSRateCheck = 0;
+	_lastAckRateCheck = 0;
 	_lastQualityEstimation = 0;
 	_lastBondStatusLog = 0;
 	_lastSummaryDump = 0;
@@ -1621,10 +1708,6 @@ void Bond::setBondParameters(int policy, SharedPtr<Bond> templateBond, bool useT
 	_localUtility = 0;
 	_negotiatedPathIdx = 0;
 
-	// QOS Verb (and related checks)
-
-	_qosCutoffCount = 0;
-
 	// User preferences which may override the default bonding algorithm's behavior
 
 	_userHasSpecifiedPrimaryLink = false;
@@ -1717,7 +1800,9 @@ void Bond::setBondParameters(int policy, SharedPtr<Bond> templateBond, bool useT
 	_monitorInterval = _failoverInterval / ZT_BOND_ECHOS_PER_FAILOVER_INTERVAL;
 	_qualityEstimationInterval = _failoverInterval * 2;
 	_qosSendInterval = _failoverInterval * 2;
+	_ackSendInterval = _failoverInterval * 2;
 	_qosCutoffCount = 0;
+	_ackCutoffCount = 0;
 	_defaultPathRefractoryPeriod = 8000;
 }
 
@@ -1736,7 +1821,7 @@ void Bond::setUserQualityWeights(float weights[], int len)
 
 // Returns the link that RR->bc->getLinkBySocket finds for the local socket of the given path.
 // The added (+) line returns an empty SharedPtr<Link> when path is null instead of dereferencing it.
 SharedPtr<Link> Bond::getLink(const SharedPtr<Path>& path)
 {
-	return RR->bc->getLinkBySocket(_policyAlias, path->localSocket());
+	return ! path ? SharedPtr<Link>() : RR->bc->getLinkBySocket(_policyAlias, path->localSocket());
 }
 
 std::string Bond::pathToStr(const SharedPtr<Path>& path)

+ 51 - 1
node/Bond.hpp

@@ -638,6 +638,15 @@ class Bond {
 	 */
 	void receivedQoS(const SharedPtr<Path>& path, int64_t now, int count, uint64_t* rx_id, uint16_t* rx_ts);
 
+	/**
+	 * Process the contents of an inbound VERB_ACK to gather path quality observations.
+	 *
+	 * @param pathIdx Path over which packet was received
+	 * @param now Current time
+	 * @param ackedBytes Number of bytes ACKed by this VERB_ACK
+	 */
+	void receivedAck(int pathIdx, int64_t now, int32_t ackedBytes);
+
 	/**
 	 * Generate the contents of a VERB_QOS_MEASUREMENT packet.
 	 *
@@ -879,6 +888,26 @@ class Bond {
 	 */
 	void processBackgroundBondTasks(void* tPtr, int64_t now);
 
+	/**
+	 * Rate limit gate for VERB_ACK
+	 *
+	 * Each call increments the ACK cutoff counter and then drains it by the time
+	 * elapsed since the previous check divided by ZT_ACK_DRAINAGE_DIVISOR. When
+	 * _lastAckRateCheck is zero (no previous check) the counter is drained fully.
+	 *
+	 * @param now Current time
+	 * @return True while the counter is below ZT_ACK_CUTOFF_LIMIT (the packet may
+	 *         be processed), false once the limit has been reached
+	 */
+	inline bool rateGateACK(const int64_t now)
+	{
+		_ackCutoffCount++;
+		int numToDrain = _lastAckRateCheck ? (now - _lastAckRateCheck) / ZT_ACK_DRAINAGE_DIVISOR : _ackCutoffCount;
+		_lastAckRateCheck = now;
+		if (_ackCutoffCount > numToDrain) {
+			_ackCutoffCount -= numToDrain;
+		}
+		else {
+			_ackCutoffCount = 0;
+		}
+		return (_ackCutoffCount < ZT_ACK_CUTOFF_LIMIT);
+	}
+
 	/**
 	 * Rate limit gate for VERB_QOS_MEASUREMENT
 	 *
@@ -1204,7 +1233,11 @@ class Bond {
   private:
 	struct NominatedPath {
 		NominatedPath()
-			: lastQoSMeasurement(0)
+			: lastAckSent(0)
+			, lastAckReceived(0)
+			, unackedBytes(0)
+			, packetsReceivedSinceLastAck(0)
+			, lastQoSMeasurement(0)
 			, lastThroughputEstimation(0)
 			, lastRefractoryUpdate(0)
 			, lastAliveToggle(0)
@@ -1295,6 +1328,15 @@ class Bond {
 			return ((packetsReceivedSinceLastQoS >= ZT_QOS_TABLE_SIZE) || ((now - lastQoSMeasurement) > qosSendInterval)) && packetsReceivedSinceLastQoS;
 		}
 
+		/**
+		 * @param now Current time
+		 * @param ackSendInterval Time since lastAckSent after which an ACK becomes due
+		 * @return Whether an ACK (VERB_ACK) packet needs to be emitted at this time.
+		 *         Always false when packetsReceivedSinceLastAck is zero.
+		 *
+		 * NOTE(review): the packet-count trigger compares against ZT_QOS_TABLE_SIZE
+		 * with an equality test, so it only fires at exactly that count, whereas
+		 * needsToSendQoS uses a greater-or-equal test - confirm this is intended.
+		 */
+		inline bool needsToSendAck(int64_t now, int ackSendInterval)
+		{
+			return ((now - lastAckSent) >= ackSendInterval || (packetsReceivedSinceLastAck == ZT_QOS_TABLE_SIZE)) && packetsReceivedSinceLastAck;
+		}
+
 		/**
 		 * Reset packet counters
 		 */
@@ -1306,6 +1348,7 @@ class Bond {
 
 		std::map<uint64_t, uint64_t> qosStatsOut;	// id:egress_time
 		std::map<uint64_t, uint64_t> qosStatsIn;	// id:now
+		std::map<uint64_t, uint64_t> ackStatsIn;	// id:payload_length per the disabled ACK code (was labelled id:now) - TODO confirm
 
 		RingBuffer<int, ZT_QOS_SHORTTERM_SAMPLE_WIN_SIZE> qosRecordSize;
 		RingBuffer<float, ZT_QOS_SHORTTERM_SAMPLE_WIN_SIZE> qosRecordLossSamples;
@@ -1314,6 +1357,11 @@ class Bond {
 		RingBuffer<float, ZT_QOS_SHORTTERM_SAMPLE_WIN_SIZE> throughputVarianceSamples;
 		RingBuffer<uint16_t, ZT_QOS_SHORTTERM_SAMPLE_WIN_SIZE> latencySamples;
 
+		uint64_t lastAckSent;
+		uint64_t lastAckReceived;
+		uint64_t unackedBytes;
+		uint64_t packetsReceivedSinceLastAck;
+
 		uint64_t lastQoSMeasurement;		 // Last time that a VERB_QOS_MEASUREMENT was sent out on this path.
 		uint64_t lastThroughputEstimation;	 // Last time that the path's throughput was estimated.
 		uint64_t lastRefractoryUpdate;		 // The last time that the refractory period was updated.
@@ -1513,7 +1561,9 @@ class Bond {
 	 * Rate-limiting
 	 */
 	uint16_t _qosCutoffCount;
+	uint16_t _ackCutoffCount;
 	uint64_t _lastQoSRateCheck;
+	uint64_t _lastAckRateCheck;
 	uint16_t _pathNegotiationCutoffCount;
 	uint64_t _lastPathNegotiationReceived;
 

+ 5 - 0
node/Constants.hpp

@@ -372,6 +372,11 @@
  */
 #define ZT_QOS_TABLE_SIZE ((ZT_QOS_MAX_PACKET_SIZE * 8) / (64 + 16))
 
+/**
+ * Maximum number of packets we monitor for ACK information at any given time
+ * (32 * 1024 = 32768 records). NOTE(review): the only use visible in this change
+ * is inside commented-out ACK code - confirm whether anything else reads it.
+ */
+#define ZT_ACK_MAX_PENDING_RECORDS (32 * 1024)
+
 /**
  * Maximum number of packets we monitor for QoS information at any given time
  */

+ 27 - 7
node/IncomingPacket.cpp

@@ -88,6 +88,7 @@ bool IncomingPacket::tryDecode(const RuntimeEnvironment *RR,void *tPtr,int32_t f
 					peer->received(tPtr,_path,hops(),packetId(),payloadLength(),v,0,Packet::VERB_NOP,false,0,ZT_QOS_NO_FLOW);
 					break;
 				case Packet::VERB_HELLO:                      r = _doHELLO(RR,tPtr,true); break;
+				case Packet::VERB_ACK            :            r = _doACK(RR,tPtr,peer); break;
 				case Packet::VERB_QOS_MEASUREMENT:            r = _doQOS_MEASUREMENT(RR,tPtr,peer); break;
 				case Packet::VERB_ERROR:                      r = _doERROR(RR,tPtr,peer); break;
 				case Packet::VERB_OK:                         r = _doOK(RR,tPtr,peer); break;
@@ -250,28 +251,47 @@ bool IncomingPacket::_doERROR(const RuntimeEnvironment *RR,void *tPtr,const Shar
 	return true;
 }
 
-bool IncomingPacket::_doQOS_MEASUREMENT(const RuntimeEnvironment *RR,void *tPtr,const SharedPtr<Peer> &peer)
+bool IncomingPacket::_doACK(const RuntimeEnvironment* RR, void* tPtr, const SharedPtr<Peer>& peer)
 {
+	/*
+	WIP: VERB_ACK processing is disabled; this function only returns true.
+	NOTE(review): the disabled code passes _path to Bond::receivedAck, whose first
+	parameter is declared as int pathIdx - resolve this before re-enabling.
+
 	SharedPtr<Bond> bond = peer->bond();
-	if (!bond || !bond->rateGateQoS(RR->node->now(), _path)) {
+	if (! bond || ! bond->rateGateACK(RR->node->now())) {
+		return true;
+	}
+	int32_t ackedBytes;
+	if (payloadLength() != sizeof(ackedBytes)) {
+		return true;   // ignore
+	}
+	memcpy(&ackedBytes, payload(), sizeof(ackedBytes));
+	if (bond) {
+		bond->receivedAck(_path, RR->node->now(), Utils::ntoh(ackedBytes));
+	}
+	*/
+	return true;
+}
+
+bool IncomingPacket::_doQOS_MEASUREMENT(const RuntimeEnvironment* RR, void* tPtr, const SharedPtr<Peer>& peer)
+{
+	SharedPtr<Bond> bond = peer->bond();
+	if (! bond || ! bond->rateGateQoS(RR->node->now(), _path)) {
 		return true;
 	}
 	if (payloadLength() > ZT_QOS_MAX_PACKET_SIZE || payloadLength() < ZT_QOS_MIN_PACKET_SIZE) {
-		return true; // ignore
+		return true;   // ignore
 	}
 	const int64_t now = RR->node->now();
 	uint64_t rx_id[ZT_QOS_TABLE_SIZE];
 	uint16_t rx_ts[ZT_QOS_TABLE_SIZE];
-	char *begin = (char *)payload();
-	char *ptr = begin;
+	char* begin = (char*)payload();
+	char* ptr = begin;
 	int count = 0;
 	unsigned int len = payloadLength();
 	// Read packet IDs and latency compensation intervals for each packet tracked by this QoS packet
 	while (ptr < (begin + len) && (count < ZT_QOS_TABLE_SIZE)) {
 		memcpy((void*)&rx_id[count], ptr, sizeof(uint64_t));
-		ptr+=sizeof(uint64_t);
+		ptr += sizeof(uint64_t);
 		memcpy((void*)&rx_ts[count], ptr, sizeof(uint16_t));
-		ptr+=sizeof(uint16_t);
+		ptr += sizeof(uint16_t);
 		count++;
 	}
 	if (bond) {

+ 1 - 0
node/IncomingPacket.hpp

@@ -116,6 +116,7 @@ private:
 	// been authenticated, decrypted, decompressed, and classified.
 	bool _doERROR(const RuntimeEnvironment *RR,void *tPtr,const SharedPtr<Peer> &peer);
 	bool _doHELLO(const RuntimeEnvironment *RR,void *tPtr,const bool alreadyAuthenticated);
+	bool _doACK(const RuntimeEnvironment *RR,void *tPtr,const SharedPtr<Peer> &peer);
 	bool _doQOS_MEASUREMENT(const RuntimeEnvironment *RR,void *tPtr,const SharedPtr<Peer> &peer);
 	bool _doOK(const RuntimeEnvironment *RR,void *tPtr,const SharedPtr<Peer> &peer);
 	bool _doWHOIS(const RuntimeEnvironment *RR,void *tPtr,const SharedPtr<Peer> &peer);