浏览代码

Fix invalid defaultBondingPolicy conditions, Add ZT_MultipathFlowRebalanceStrategy, Add basic hysteresis mechanism to flow re-assignment

Joseph Henry 5 年之前
父节点
当前提交
5f0ee4fc78
共有 9 个文件被更改,包括 166 次插入158 次删除
  1. 27 0
      include/ZeroTierOne.h
  2. 122 148
      node/Bond.cpp
  3. 8 2
      node/Bond.hpp
  4. 3 2
      node/BondController.cpp
  5. 0 5
      node/Constants.hpp
  6. 1 0
      node/Flow.hpp
  7. 2 0
      node/IncomingPacket.cpp
  8. 2 1
      node/Peer.cpp
  9. 1 0
      service/OneService.cpp

+ 27 - 0
include/ZeroTierOne.h

@@ -521,6 +521,33 @@ enum ZT_MultipathMonitorStrategy
 	ZT_MULTIPATH_SLAVE_MONITOR_STRATEGY_DYNAMIC = 3
 };
 
+/**
+ * Strategy for re-balancing protocol flows
+ */
+enum ZT_MultipathFlowRebalanceStrategy
+{
+	/**
+	 * Flows will only be re-balanced among slaves during
+	 * assignment or failover. This minimizes the possibility
+	 * of sequence reordering and is thus the default setting.
+	 */
+	ZT_MULTIPATH_FLOW_REBALANCE_STRATEGY_PASSIVE = 0,
+
+	/**
+	 * Flows that are active may be re-assigned to a new more
+	 * suitable slave if it can be done without disrupting the flow.
+	 * This setting can sometimes cause sequence re-ordering.
+	 */
+	ZT_MULTIPATH_FLOW_REBALANCE_STRATEGY_OPPORTUNISTIC = 1,
+
+	/**
+	 * Flows will be continuously re-assigned the most suitable slave
+	 * in order to maximize "balance". This can often cause sequence
+	 * reordering and is thus only recommended for protocols like UDP.
+	 */
+	ZT_MULTIPATH_FLOW_REBALANCE_STRATEGY_AGGRESSIVE = 2
+};
+
 /**
  * Indices for the path quality weight vector
  */

+ 122 - 148
node/Bond.cpp

@@ -25,6 +25,10 @@ Bond::Bond(const RuntimeEnvironment *renv, int policy, const SharedPtr<Peer>& pe
 	RR(renv),
 	_peer(peer)
 {
+	// TODO: Remove for production
+	_header=false;
+	_lastLogTS = RR->node->now();
+	_lastPrintTS = RR->node->now();
 	setReasonableDefaults(policy, SharedPtr<Bond>(), false);
 	_policyAlias = BondController::getPolicyStrByCode(policy);
 }
@@ -41,6 +45,10 @@ Bond::Bond(const RuntimeEnvironment *renv, SharedPtr<Bond> originalBond, const S
 	RR(renv),
 	_peer(peer)
 {
+	// TODO: Remove for production
+	_header=false;
+	_lastLogTS = RR->node->now();
+	_lastPrintTS = RR->node->now();
 	setReasonableDefaults(originalBond->_bondingPolicy, originalBond, true);
 }
 
@@ -162,7 +170,7 @@ SharedPtr<Path> Bond::getAppropriatePath(int64_t now, int32_t flowId)
 
 void Bond::recordIncomingInvalidPacket(const SharedPtr<Path>& path)
 {
-	//char pathStr[128];path->address().toString(pathStr);fprintf(stderr, "recordIncomingInvalidPacket() %s %s\n", getSlave(path)->ifname().c_str(), pathStr);
+	// char pathStr[128];path->address().toString(pathStr);fprintf(stderr, "recordIncomingInvalidPacket() %s %s\n", getSlave(path)->ifname().c_str(), pathStr);
 	Mutex::Lock _l(_paths_m);
 	for (int i=0; i<ZT_MAX_PEER_NETWORK_PATHS; ++i) {
 		if (_paths[i] == path) {
@@ -174,7 +182,7 @@ void Bond::recordIncomingInvalidPacket(const SharedPtr<Path>& path)
 void Bond::recordOutgoingPacket(const SharedPtr<Path> &path, const uint64_t packetId,
 	uint16_t payloadLength, const Packet::Verb verb, const int32_t flowId, int64_t now)
 {
-	//char pathStr[128];path->address().toString(pathStr);fprintf(stderr, "recordOutgoingPacket() %s %s, packetId=%llx, payloadLength=%d, verb=%x, flowId=%lx\n", getSlave(path)->ifname().c_str(), pathStr, packetId, payloadLength, verb, flowId);
+	// char pathStr[128];path->address().toString(pathStr);fprintf(stderr, "recordOutgoingPacket() %s %s, packetId=%llx, payloadLength=%d, verb=%x, flowId=%lx\n", getSlave(path)->ifname().c_str(), pathStr, packetId, payloadLength, verb, flowId);
 	_freeRandomByte += (unsigned char)(packetId >> 8); // Grab entropy to use in path selection logic
 	if (!_shouldCollectPathStatistics) {
 		return;
@@ -320,6 +328,7 @@ bool Bond::assignFlowToBondedPath(SharedPtr<Flow> &flow, int64_t now)
 		idx = abs((int)(flow->id() % (_numBondedPaths)));
 		//fprintf(stderr, "flow->id()=%d, %x, _numBondedPaths=%d, idx=%d\n", flow->id(), flow->id(), _numBondedPaths, idx);
 		flow->assignPath(_paths[_bondedIdx[idx]],now);
+		++(_paths[_bondedIdx[idx]]->_assignedFlowCount);
 	}
 	if (_bondingPolicy == ZT_BONDING_POLICY_BALANCE_AWARE) {
 		unsigned char entropy;
@@ -341,29 +350,32 @@ bool Bond::assignFlowToBondedPath(SharedPtr<Flow> &flow, int64_t now)
 				totalIncompleteAllocation += _paths[i]->_allocation;
 			}
 		}
-		fprintf(stderr, "entropy     = %d, totalIncompleteAllocation=%d\n", entropy, totalIncompleteAllocation);
+		//fprintf(stderr, "entropy     = %d, totalIncompleteAllocation=%d\n", entropy, totalIncompleteAllocation);
 		entropy %= totalIncompleteAllocation;
-		fprintf(stderr, "new entropy = %d\n", entropy);
+		//fprintf(stderr, "new entropy = %d\n", entropy);
 		for(unsigned int i=0;i<ZT_MAX_PEER_NETWORK_PATHS;++i) {
 			if (_paths[i] && _paths[i]->bonded()) {
 				SharedPtr<Slave> slave = RR->bc->getSlaveBySocket(_policyAlias, _paths[i]->localSocket());
 				_paths[i]->address().toString(curPathStr);
 				uint8_t probabilitySegment = (_totalBondUnderload > 0) ? _paths[i]->_affinity : _paths[i]->_allocation;
-				fprintf(stderr, "i=%2d, entropy=%3d, alloc=%3d, byteload=%4d, segment=%3d, _totalBondUnderload=%3d, ifname=%s, path=%20s\n", i, entropy, _paths[i]->_allocation, _paths[i]->_relativeByteLoad, probabilitySegment, _totalBondUnderload, slave->ifname().c_str(), curPathStr);
+				//fprintf(stderr, "i=%2d, entropy=%3d, alloc=%3d, byteload=%4d, segment=%3d, _totalBondUnderload=%3d, ifname=%s, path=%20s\n", i, entropy, _paths[i]->_allocation, _paths[i]->_relativeByteLoad, probabilitySegment, _totalBondUnderload, slave->ifname().c_str(), curPathStr);
 				if (entropy <= probabilitySegment) {
 					idx = i;
-					fprintf(stderr, "\t is best path\n");
+					//fprintf(stderr, "\t is best path\n");
 					break;
 				}
 				entropy -= probabilitySegment;
 			}
 		}
 		if (idx < ZT_MAX_PEER_NETWORK_PATHS) {
+			if (flow->_assignedPath) {
+				flow->_previouslyAssignedPath = flow->_assignedPath;
+			}
 			flow->assignPath(_paths[idx],now);
 			++(_paths[idx]->_assignedFlowCount);
 		}
 		else {
-			fprintf(stderr, "could not assign flow?\n"); exit(0); // TODO: Remove
+			fprintf(stderr, "could not assign flow?\n"); exit(0); // TODO: Remove for production
 			return false;
 		}
 	}
@@ -397,6 +409,7 @@ SharedPtr<Flow> Bond::createFlow(const SharedPtr<Path> &path, int32_t flowId, un
 	if (path) {
 		flow->assignPath(path,now);
 		path->address().toString(curPathStr);
+		path->_assignedFlowCount++;
 		SharedPtr<Slave> slave = RR->bc->getSlaveBySocket(_policyAlias, flow->assignedPath()->localSocket());
 		fprintf(stderr, "assigned (rx) flow %x with peer %llx to path %s on %s\n", flow->id(), _peer->_id.address().toInt(), curPathStr, slave->ifname().c_str());
 	}
@@ -818,7 +831,7 @@ void Bond::curateBond(const int64_t now, bool rebuildBond)
 					++it;
 					++updatedBondedPathCount;
 					_paths[_bondedIdx[i]]->address().toString(pathStr);
-					fprintf(stderr, "setting i=%d, _bondedIdx[%d]=%d to bonded (%s %s)\n", i, i, _bondedIdx[i], getSlave(_paths[_bondedIdx[i]])->ifname().c_str(), pathStr);
+					//fprintf(stderr, "setting i=%d, _bondedIdx[%d]=%d to bonded (%s %s)\n", i, i, _bondedIdx[i], getSlave(_paths[_bondedIdx[i]])->ifname().c_str(), pathStr);
 				}
 			}
 			_numBondedPaths = updatedBondedPathCount;
@@ -834,8 +847,6 @@ void Bond::curateBond(const int64_t now, bool rebuildBond)
 void Bond::estimatePathQuality(const int64_t now)
 {
 	char pathStr[128];
-	//---
-
 	uint32_t totUserSpecifiedSlaveSpeed = 0;
 	if (_numBondedPaths) { // Compute relative user-specified speeds of slaves
 		for(unsigned int i=0;i<_numBondedPaths;++i) {
@@ -856,17 +867,11 @@ void Bond::estimatePathQuality(const int64_t now)
 	float pdv[ZT_MAX_PEER_NETWORK_PATHS];
 	float plr[ZT_MAX_PEER_NETWORK_PATHS];
 	float per[ZT_MAX_PEER_NETWORK_PATHS];
-	float thr[ZT_MAX_PEER_NETWORK_PATHS];
-	float thm[ZT_MAX_PEER_NETWORK_PATHS];
-	float thv[ZT_MAX_PEER_NETWORK_PATHS];
 
 	float maxLAT = 0;
 	float maxPDV = 0;
 	float maxPLR = 0;
 	float maxPER = 0;
-	float maxTHR = 0;
-	float maxTHM = 0;
-	float maxTHV = 0;
 
 	float quality[ZT_MAX_PEER_NETWORK_PATHS];
 	uint8_t alloc[ZT_MAX_PEER_NETWORK_PATHS];
@@ -877,9 +882,6 @@ void Bond::estimatePathQuality(const int64_t now)
 	memset(&pdv, 0, sizeof(pdv));
 	memset(&plr, 0, sizeof(plr));
 	memset(&per, 0, sizeof(per));
-	memset(&thr, 0, sizeof(thr));
-	memset(&thm, 0, sizeof(thm));
-	memset(&thv, 0, sizeof(thv));
 	memset(&quality, 0, sizeof(quality));
 	memset(&alloc, 0, sizeof(alloc));
 
@@ -901,24 +903,6 @@ void Bond::estimatePathQuality(const int64_t now)
 				_paths[i]->_throughputVariance = 0;
 			}
 		}
-		/*
-		else {
-			// Use estimated metrics
-			if (_paths[i]->throughputSamples.count()) {
-				// If we have samples, use them
-				_paths[i]->throughputMean = (uint64_t)_paths[i]->throughputSamples.mean();
-				if (_paths[i]->throughputMean > 0) {
-					_paths[i]->throughputVarianceSamples.push((float)_paths[i]->throughputSamples.stddev() / (float)_paths[i]->throughputMean);
-					_paths[i]->throughputVariance = _paths[i]->throughputVarianceSamples.mean();
-				}
-			}
-			else {
-				// No samples have been collected yet, assume best case scenario
-				_paths[i]->throughputMean = ZT_QOS_THR_NORM_MAX;
-				_paths[i]->throughputVariance = 0;
-			}
-		}
-		*/
 		// Drain unacknowledged QoS records
 		std::map<uint64_t,uint64_t>::iterator it = _paths[i]->qosStatsOut.begin();
 		uint64_t currentLostRecords = 0;
@@ -934,23 +918,16 @@ void Bond::estimatePathQuality(const int64_t now)
 		quality[i]=0;
 		totQuality=0;
 		// Normalize raw observations according to sane limits and/or user specified values
-		lat[i] = 1.0 / expf(4*Utils::normalize(_paths[i]->_latencyMean, 0, _maxAcceptableLatency, 0, 1));
-		pdv[i] = 1.0 / expf(4*Utils::normalize(_paths[i]->_latencyVariance, 0, _maxAcceptablePacketDelayVariance, 0, 1));
-		plr[i] = 1.0 / expf(4*Utils::normalize(_paths[i]->_packetLossRatio, 0, _maxAcceptablePacketLossRatio, 0, 1));
-		per[i] = 1.0 / expf(4*Utils::normalize(_paths[i]->_packetErrorRatio, 0, _maxAcceptablePacketErrorRatio, 0, 1));
-		//thr[i] = 1.0; //Utils::normalize(_paths[i]->throughputMean, 0, ZT_QOS_THR_NORM_MAX, 0, 1);
-		//thm[i] = 1.0; //Utils::normalize(_paths[i]->throughputMax, 0, ZT_QOS_THM_NORM_MAX, 0, 1);
-		//thv[i] = 1.0; //1.0 / expf(4*Utils::normalize(_paths[i]->throughputVariance, 0, ZT_QOS_THV_NORM_MAX, 0, 1));
+		lat[i] = 1.0 / expf(4*Utils::normalize(_paths[i]->_latencyMean,      0, _maxAcceptableLatency,             0, 1));
+		pdv[i] = 1.0 / expf(4*Utils::normalize(_paths[i]->_latencyVariance,  0, _maxAcceptablePacketDelayVariance, 0, 1));
+		plr[i] = 1.0 / expf(4*Utils::normalize(_paths[i]->_packetLossRatio,  0, _maxAcceptablePacketLossRatio,     0, 1));
+		per[i] = 1.0 / expf(4*Utils::normalize(_paths[i]->_packetErrorRatio, 0, _maxAcceptablePacketErrorRatio,    0, 1));
 		//scp[i] = _paths[i]->ipvPref != 0 ? 1.0 : Utils::normalize(_paths[i]->ipScope(), InetAddress::IP_SCOPE_NONE, InetAddress::IP_SCOPE_PRIVATE, 0, 1);
 		// Record bond-wide maximums to determine relative values
 		maxLAT = lat[i] > maxLAT ? lat[i] : maxLAT;
 		maxPDV = pdv[i] > maxPDV ? pdv[i] : maxPDV;
 		maxPLR = plr[i] > maxPLR ? plr[i] : maxPLR;
 		maxPER = per[i] > maxPER ? per[i] : maxPER;
-		//maxTHR = thr[i] > maxTHR ? thr[i] : maxTHR;
-		//maxTHM = thm[i] > maxTHM ? thm[i] : maxTHM;
-		//maxTHV = thv[i] > maxTHV ? thv[i] : maxTHV;
-
 		//fprintf(stdout, "EH   %d: lat=%8.3f,  ltm=%8.3f,  pdv=%8.3f,  plr=%5.3f,  per=%5.3f,  thr=%8f,  thm=%5.3f,  thv=%5.3f,  avl=%5.3f,  age=%8.2f,  scp=%4d,  q=%5.3f,  qtot=%5.3f,  ac=%d if=%s, path=%s\n",
 		//	              i,   lat[i],     ltm[i],     pdv[i],     plr[i],     per[i],     thr[i],     thm[i],     thv[i],     avl[i],     age[i],     scp[i], quality[i], totQuality, alloc[i], getSlave(_paths[i])->ifname().c_str(), pathStr);
 
@@ -962,9 +939,6 @@ void Bond::estimatePathQuality(const int64_t now)
 			quality[i] += ((maxPDV > 0.0f ? pdv[i] / maxPDV : 0.0f) * _qualityWeights[ZT_QOS_PDV_IDX]);
 			quality[i] += ((maxPLR > 0.0f ? plr[i] / maxPLR : 0.0f) * _qualityWeights[ZT_QOS_PLR_IDX]);
 			quality[i] += ((maxPER > 0.0f ? per[i] / maxPER : 0.0f) * _qualityWeights[ZT_QOS_PER_IDX]);
-			//quality[i] += ((maxTHR > 0.0f ? thr[i] / maxTHR : 0.0f) * _qualityWeights[ZT_QOS_THR_IDX]);
-			//quality[i] += ((maxTHM > 0.0f ? thm[i] / maxTHM : 0.0f) * _qualityWeights[ZT_QOS_THM_IDX]);
-			//quality[i] += ((maxTHV > 0.0f ? thv[i] / maxTHV : 0.0f) * _qualityWeights[ZT_QOS_THV_IDX]);
 			//quality[i] += (scp[i] * _qualityWeights[ZT_QOS_SCP_IDX]);
 			totQuality += quality[i];
 		}
@@ -1007,6 +981,7 @@ void Bond::estimatePathQuality(const int64_t now)
 			}
 			_header=true;
 		}
+		/*
 		fprintf(stdout, "%ld, %d, %d, %d, ",((now - RR->bc->getBondStartTime())),_numBondedPaths,_totalBondUnderload, _flows.size());
 		for(unsigned int i=0;i<ZT_MAX_PEER_NETWORK_PATHS;++i) {
 			if (_paths[i]) {
@@ -1015,15 +990,27 @@ void Bond::estimatePathQuality(const int64_t now)
 								  getSlave(_paths[i])->ifname().c_str(), pathStr, _paths[i]->_latencyMean, lat[i],pdv[i], _paths[i]->_packetLossRatio, plr[i],per[i],thr[i],thm[i],thv[i],(now - _paths[i]->lastIn()),quality[i],alloc[i],
 								  _paths[i]->_relativeByteLoad, _paths[i]->_assignedFlowCount, _paths[i]->alive(now, true), _paths[i]->eligible(now,_ackSendInterval), _paths[i]->qosStatsOut.size());
 			}
-		}
-		fprintf(stdout, "\n");
+		}*/
+		//fprintf(stdout, "\n");
 	}
 }
 
 void Bond::processBalanceTasks(const int64_t now)
 {
-	//fprintf(stderr, "processBalanceTasks\n");
 	char curPathStr[128];
+
+	// TODO: Generalize
+	int totalAllocation = 0;
+	for (int i=0;i<ZT_MAX_PEER_NETWORK_PATHS;++i) {
+		if (!_paths[i]) {
+			continue;
+		}
+		if (_paths[i] && _paths[i]->bonded() && _paths[i]->eligible(now,_ackSendInterval)) {
+			totalAllocation+=_paths[i]->_allocation;
+		}
+	}
+	unsigned char minimumAllocationValue = 0.33 * ((float)totalAllocation / (float)_numBondedPaths);
+
 	if (_allowFlowHashing) {
 		/**
 		 * Clean up and reset flows if necessary
@@ -1067,6 +1054,32 @@ void Bond::processBalanceTasks(const int64_t now)
 				}
 			}
 		}
+		/**
+		 * Re-allocate flows from under-performing paths
+		 * NOTE: This could be part of the above block but was kept separate for clarity.
+		 */
+		if (_bondingPolicy == ZT_BONDING_POLICY_BALANCE_XOR || _bondingPolicy == ZT_BONDING_POLICY_BALANCE_AWARE) {
+			Mutex::Lock _l(_flows_m);
+			for (int i=0;i<ZT_MAX_PEER_NETWORK_PATHS;++i) {
+				if (!_paths[i]) {
+					continue;
+				}
+				if (_paths[i] && _paths[i]->bonded() && _paths[i]->eligible(now,_ackSendInterval) && (_paths[i]->_allocation < minimumAllocationValue) && _paths[i]->_assignedFlowCount) {
+					_paths[i]->address().toString(curPathStr);
+					fprintf(stderr, "%d reallocating flows from under-performing path %s on %s\n", (RR->node->now() - RR->bc->getBondStartTime()), curPathStr, getSlave(_paths[i])->ifname().c_str());
+					std::map<int32_t,SharedPtr<Flow> >::iterator flow_it = _flows.begin();
+					while (flow_it != _flows.end()) {
+						if (flow_it->second->assignedPath() == _paths[i]) {
+							if(assignFlowToBondedPath(flow_it->second, now)) {
+								_paths[i]->_assignedFlowCount--;
+							}
+						}
+						++flow_it;
+					}
+					_paths[i]->_shouldReallocateFlows = false;
+				}
+			}
+		}
 	}
 	/**
 	 * Tasks specific to (Balance Round Robin)
@@ -1091,70 +1104,47 @@ void Bond::processBalanceTasks(const int64_t now)
 	if (_bondingPolicy == ZT_BONDING_POLICY_BALANCE_AWARE) {
 		if (_allowFlowHashing) {
 			Mutex::Lock _l(_flows_m);
-			/**
-			 * Re-balance flows in proportion to slave capacity (or when eligibility changes)
-			 */
-			if ((now - _lastFlowRebalance) > ZT_FLOW_REBALANCE_INTERVAL) {
+			if (_flowRebalanceStrategy == ZT_MULTIPATH_FLOW_REBALANCE_STRATEGY_PASSIVE) {
+				// Do nothing here, this is taken care of in the more general case above.
+			}
+			if (_flowRebalanceStrategy == ZT_MULTIPATH_FLOW_REBALANCE_STRATEGY_OPPORTUNISTIC) {
+				// If the flow is temporarily inactive we should take this opportunity to re-assign the flow if needed.
+			}
+			if (_flowRebalanceStrategy == ZT_MULTIPATH_FLOW_REBALANCE_STRATEGY_AGGRESSIVE) {
 				/**
-				 * Determine "load" for bonded paths
+				 * Return flows to the original path if it has once again become available
 				 */
-				uint64_t totalBytes = 0;
-				for(unsigned int i=0;i<ZT_MAX_PEER_NETWORK_PATHS;++i) { // first pass: compute absolute byte load and total
-					if (_paths[i] && _paths[i]->bonded()) {
-						_paths[i]->_byteLoad = 0;
-						std::map<int32_t,SharedPtr<Flow> >::iterator flow_it = _flows.begin();
-						while (flow_it != _flows.end()) {
-							if (flow_it->second->assignedPath() == _paths[i]) {
-								_paths[i]->_byteLoad += flow_it->second->totalBytes();
-							}
-							++flow_it;
+				if ((now - _lastFlowRebalance) > ZT_FLOW_REBALANCE_INTERVAL) {
+					std::map<int32_t,SharedPtr<Flow> >::iterator flow_it = _flows.begin();
+					while (flow_it != _flows.end()) {
+						if (flow_it->second->_previouslyAssignedPath && flow_it->second->_previouslyAssignedPath->eligible(now, _ackSendInterval)
+								&& (flow_it->second->_previouslyAssignedPath->_allocation >= (minimumAllocationValue * 2))) {
+							fprintf(stderr, "moving flow back onto its previous path assignment (based on eligibility)\n");
+							(flow_it->second->_assignedPath->_assignedFlowCount)--;
+							flow_it->second->assignPath(flow_it->second->_previouslyAssignedPath,now);
+							(flow_it->second->_previouslyAssignedPath->_assignedFlowCount)++;
 						}
-						totalBytes += _paths[i]->_byteLoad;
+						++flow_it;
 					}
+					_lastFlowRebalance = now;
 				}
 				/**
-				 * Determine "affinity" for bonded path
+				 * Return flows to the original path if it has once again become (performant)
 				 */
-				//fprintf(stderr, "\n\n");
-
-				_totalBondUnderload = 0;
-/*
-				for(unsigned int i=0;i<ZT_MAX_PEER_NETWORK_PATHS;++i) { // second pass: compute relative byte loads and total imbalance
-					if (_paths[i] && _paths[i]->bonded()) {
-						if (totalBytes) {
-							uint8_t relativeByteLoad = std::ceil(((float)_paths[i]->_byteLoad / (float)totalBytes) * (float)255);
-							//fprintf(stderr, "lastComputedAllocation  = %d\n", _paths[i]->allocation);
-							//fprintf(stderr, "  relativeByteLoad      = %d\n", relativeByteLoad);
-							_paths[i]->_relativeByteLoad = relativeByteLoad;
-							uint8_t relativeUnderload = std::max(0, (int)_paths[i]->_allocation - (int)relativeByteLoad);
-							//fprintf(stderr, "    relativeUnderload  = %d\n", relativeUnderload);
-							_totalBondUnderload += relativeUnderload;
-							//fprintf(stderr, "    _totalBondUnderload = %d\n\n", _totalBondUnderload);
-							//_paths[i]->affinity = (relativeUnderload > 0 ? relativeUnderload : _paths[i]->_allocation);
-						}
-						else { // set everything to base values
-							_totalBondUnderload = 0;
-							//_paths[i]->affinity = 0;
+				if ((now - _lastFlowRebalance) > ZT_FLOW_REBALANCE_INTERVAL) {
+					std::map<int32_t,SharedPtr<Flow> >::iterator flow_it = _flows.begin();
+					while (flow_it != _flows.end()) {
+						if (flow_it->second->_previouslyAssignedPath && flow_it->second->_previouslyAssignedPath->eligible(now, _ackSendInterval)
+								&& (flow_it->second->_previouslyAssignedPath->_allocation >= (minimumAllocationValue * 2))) {
+							fprintf(stderr, "moving flow back onto its previous path assignment (based on performance)\n");
+							(flow_it->second->_assignedPath->_assignedFlowCount)--;
+							flow_it->second->assignPath(flow_it->second->_previouslyAssignedPath,now);
+							(flow_it->second->_previouslyAssignedPath->_assignedFlowCount)++;
 						}
+						++flow_it;
 					}
+					_lastFlowRebalance = now;
 				}
-*/
-				//fprintf(stderr, "_totalBondUnderload=%d (end)\n\n", _totalBondUnderload);
-
-				/**
-				 *
-				 */
-				//fprintf(stderr, "_lastFlowRebalance\n");
-				std::map<int32_t, SharedPtr<Flow> >::iterator it = _flows.begin();
-				while (it != _flows.end()) {
-					int32_t flowId = it->first;
-					SharedPtr<Flow> flow = it->second;
-					if ((now - flow->_lastPathReassignment) > ZT_FLOW_MIN_REBALANCE_INTERVAL) {
-						//fprintf(stdout, "  could move : %x\n", flowId);
-					}
-					++it;
-				}
-				_lastFlowRebalance = now;
 			}
 		}
 		else if (!_allowFlowHashing) {
@@ -1440,7 +1430,7 @@ void Bond::processActiveBackupTasks(const int64_t now)
 		if (!_abFailoverQueue.empty()) {
 			fprintf(stderr, "%llu AB: (failure) there are (%lu) slaves in queue to choose from...\n", ((now - RR->bc->getBondStartTime())), _abFailoverQueue.size());
 			dequeueNextActiveBackupPath(now);
-			_abPath->address().toString(curPathStr); fprintf(stderr, "%llu sAB: (failure) switched to %s on %s\n", ((now - RR->bc->getBondStartTime())), curPathStr, getSlave(_abPath)->ifname().c_str());
+			_abPath->address().toString(curPathStr); fprintf(stderr, "%llu AB: (failure) switched to %s on %s\n", ((now - RR->bc->getBondStartTime())), curPathStr, getSlave(_abPath)->ifname().c_str());
 		} else {
 			fprintf(stderr, "%llu AB: (failure) nothing available in the slave queue, doing nothing.\n", ((now - RR->bc->getBondStartTime())));
 		}
@@ -1515,12 +1505,16 @@ void Bond::setReasonableDefaults(int policy, SharedPtr<Bond> templateBond, bool
 		_bondingPolicy= policy;
 	}
 
+	_freeRandomByte = 0;
+	_lastCheckUserPreferences = 0;
+	_lastBackgroundTaskCheck = 0;
+
 	_downDelay = 0;
 	_upDelay = 0;
 	_allowFlowHashing=false;
 	_bondMonitorInterval=0;
 	_shouldCollectPathStatistics=false;
-	_lastBackgroundTaskCheck=0;
+
 
 	// Path negotiation
 	_allowPathNegotiation=false;
@@ -1539,7 +1533,7 @@ void Bond::setReasonableDefaults(int policy, SharedPtr<Bond> templateBond, bool
 	_lastFlowRebalance=0;
 	_totalBondUnderload = 0;
 
-	//_maxAcceptableLatency
+	_maxAcceptableLatency = 100;
 	_maxAcceptablePacketDelayVariance = 50;
 	_maxAcceptablePacketLossRatio = 0.10;
 	_maxAcceptablePacketErrorRatio = 0.10;
@@ -1547,17 +1541,18 @@ void Bond::setReasonableDefaults(int policy, SharedPtr<Bond> templateBond, bool
 
 	_lastFrame=0;
 
-	// TODO: Remove
-	_header=false;
-	_lastLogTS = RR->node->now();
-	_lastPrintTS = RR->node->now();
+
+
+	/* ZT_MULTIPATH_FLOW_REBALANCE_STRATEGY_PASSIVE is the most conservative strategy and is
+	least likely to cause unexpected behavior */
+	_flowRebalanceStrategy = ZT_MULTIPATH_FLOW_REBALANCE_STRATEGY_AGGRESSIVE;
 
 	/**
 	 * Paths are actively monitored to provide a real-time quality/preference-ordered rapid failover queue.
 	 */
 	switch (policy) {
 		case ZT_BONDING_POLICY_ACTIVE_BACKUP:
-			_failoverInterval = 5000;
+			_failoverInterval = 500;
 			_abSlaveSelectMethod = ZT_MULTIPATH_RESELECTION_POLICY_OPTIMIZE;
 			_slaveMonitorStrategy = ZT_MULTIPATH_SLAVE_MONITOR_STRATEGY_DYNAMIC;
 			_qualityWeights[ZT_QOS_LAT_IDX] = 0.2f;
@@ -1581,7 +1576,7 @@ void Bond::setReasonableDefaults(int policy, SharedPtr<Bond> templateBond, bool
 		 * Paths are monitored to determine when/if one needs to be added or removed from the rotation
 		 */
 		case ZT_BONDING_POLICY_BALANCE_RR:
-			_failoverInterval = 5000;
+			_failoverInterval = 500;
 			_allowFlowHashing = false;
 			_packetsPerSlave = 1024;
 			_slaveMonitorStrategy = ZT_MULTIPATH_SLAVE_MONITOR_STRATEGY_DYNAMIC;
@@ -1600,8 +1595,8 @@ void Bond::setReasonableDefaults(int policy, SharedPtr<Bond> templateBond, bool
 		 * path and where to place the next flow.
 		 */
 		case ZT_BONDING_POLICY_BALANCE_XOR:
-			_failoverInterval = 5000;;
-			_upDelay=_bondMonitorInterval*2;
+			_failoverInterval = 500;
+			_upDelay = _bondMonitorInterval * 2;
 			_allowFlowHashing = true;
 			_slaveMonitorStrategy = ZT_MULTIPATH_SLAVE_MONITOR_STRATEGY_DYNAMIC;
 			_qualityWeights[ZT_QOS_LAT_IDX] = 0.4f;
@@ -1623,13 +1618,13 @@ void Bond::setReasonableDefaults(int policy, SharedPtr<Bond> templateBond, bool
 			_failoverInterval = 3000;
 			_allowFlowHashing = true;
 			_slaveMonitorStrategy = ZT_MULTIPATH_SLAVE_MONITOR_STRATEGY_DYNAMIC;
-			_qualityWeights[ZT_QOS_LAT_IDX] = 0.3f;
+			_qualityWeights[ZT_QOS_LAT_IDX] = 0.4f;
 			_qualityWeights[ZT_QOS_LTM_IDX] = 0.0f;
-			_qualityWeights[ZT_QOS_PDV_IDX] = 0.1f;
-			_qualityWeights[ZT_QOS_PLR_IDX] = 0.1f;
-			_qualityWeights[ZT_QOS_PER_IDX] = 0.1f;
+			_qualityWeights[ZT_QOS_PDV_IDX] = 0.4f;
+			_qualityWeights[ZT_QOS_PLR_IDX] = 0.2f;
+			_qualityWeights[ZT_QOS_PER_IDX] = 0.0f;
 			_qualityWeights[ZT_QOS_THR_IDX] = 0.0f;
-			_qualityWeights[ZT_QOS_THM_IDX] = 0.4f;
+			_qualityWeights[ZT_QOS_THM_IDX] = 0.0f;
 			_qualityWeights[ZT_QOS_THV_IDX] = 0.0f;
 			_qualityWeights[ZT_QOS_SCP_IDX] = 0.0f;
 			break;
@@ -1637,6 +1632,8 @@ void Bond::setReasonableDefaults(int policy, SharedPtr<Bond> templateBond, bool
 			break;
 	}
 
+	/* If a user has specified custom parameters for this bonding policy, overlay
+	them onto the defaults that were previously set */
 	if (useTemplate) {
 		_policyAlias = templateBond->_policyAlias;
 		_failoverInterval = templateBond->_failoverInterval;
@@ -1742,7 +1739,7 @@ void Bond::dumpInfo(const int64_t now)
 		fprintf(stderr, "Paths (bp=%d, stats=%d, fh=%d) :\n",
 			_policy, _shouldCollectPathStatistics, _allowFlowHashing);
 	}*/
-	if ((now - _lastPrintTS) < 1000) {
+	if ((now - _lastPrintTS) < 2000) {
 		return;
 	}
 	_lastPrintTS = now;
@@ -1856,30 +1853,7 @@ void Bond::dumpInfo(const int64_t now)
 					currPathStr);
 			}
 		}
-		/*
-		if (_allowFlowHashing) {
-			//Mutex::Lock _l(_flows_m);
-			if (_flows.size()) {
-				fprintf(stderr, "\nFlows:\n");
-				std::map<int32_t,SharedPtr<Flow> >::iterator it = _flows.begin();
-				while (it != _flows.end()) {
-					it->second->assignedPath()->address().toString(currPathStr);
-					SharedPtr<Slave> slave =RR->bc->getSlaveBySocket(_policyAlias, it->second->assignedPath()->localSocket());
-					fprintf(stderr, " [%4x] in=%16llu, out=%16llu, bytes=%16llu, last=%16llu, if=%8s\t\t%s\n",
-						it->second->id(),
-						it->second->bytesInPerUnitTime(),
-						it->second->bytesOutPerUnitTime(),
-						it->second->totalBytes(),
-						it->second->age(now),
-						slave->ifname().c_str(),
-						currPathStr);
-					++it;
-				}
-			}
-		}
-		*/
 	}
-	//fprintf(stderr, "\n\n\n\n\n");
 }
 
 } // namespace ZeroTier

+ 8 - 2
node/Bond.hpp

@@ -87,7 +87,7 @@ public:
 	std::string policyAlias() { return _policyAlias; }
 
 	/**
-	 * Inform the bond about the path that its peer just learned about
+	 * Inform the bond about the path that its peer (owning object) just learned about
 	 *
 	 * @param path Newly-learned Path which should now be handled by the Bond
 	 * @param now Current time
@@ -434,7 +434,12 @@ public:
 	inline void setFailoverInterval(uint32_t interval) { _failoverInterval = interval; }
 
 	/**
-	 * @param strategy The strategy that the bond uses to prob for path aliveness and quality
+	 * @param strategy Strategy that the bond uses to re-assign protocol flows.
+	 */
+	inline void setFlowRebalanceStrategy(uint32_t strategy) { _flowRebalanceStrategy = strategy; }
+
+	/**
+	 * @param strategy Strategy that the bond uses to probe for path aliveness and quality
 	 */
 	inline void setSlaveMonitorStrategy(uint8_t strategy) { _slaveMonitorStrategy = strategy; }
 
@@ -578,6 +583,7 @@ private:
 
 	// balance-aware
 	uint64_t _totalBondUnderload;
+	uint8_t _flowRebalanceStrategy;
 
 	// dynamic slave monitoring
 	uint8_t _slaveMonitorStrategy;

+ 3 - 2
node/BondController.cpp

@@ -11,6 +11,7 @@
  */
 /****/
 
+#include "Constants.hpp"
 #include "BondController.hpp"
 #include "Peer.hpp"
 
@@ -23,6 +24,7 @@ BondController::BondController(const RuntimeEnvironment *renv) :
 	RR(renv)
 {
 	bondStartTime = RR->node->now();
+	_defaultBondingPolicy = ZT_BONDING_POLICY_NONE;
 }
 
 bool BondController::slaveAllowed(std::string &policyAlias, SharedPtr<Slave> slave)
@@ -83,10 +85,9 @@ SharedPtr<Bond> BondController::createTransportTriggeredBond(const RuntimeEnviro
 	Bond *bond = nullptr;
 	if (!_bonds.count(identity)) {
 		std::string policyAlias;
-		int _defaultBondingPolicy = defaultBondingPolicy();
 		fprintf(stderr, "new bond, registering for %llx\n", identity);
 		if (!_policyTemplateAssignments.count(identity)) {
-			if (defaultBondingPolicy()) {
+			if (_defaultBondingPolicy) {
 				fprintf(stderr, "  no assignment, using default (%d)\n", _defaultBondingPolicy);
 				bond = new Bond(renv, _defaultBondingPolicy, peer);
 			}

+ 0 - 5
node/Constants.hpp

@@ -341,11 +341,6 @@
  */
 #define ZT_QOS_SHORTTERM_SAMPLE_WIN_SIZE 32
 
-/**
- * Number of samples to consider when processing long-term trends
- */
-#define ZT_QOS_LONGTERM_SAMPLE_WIN_SIZE (ZT_QOS_SHORTTERM_SAMPLE_WIN_SIZE * 4)
-
 /**
  * Max allowable time spent in any queue (in ms)
  */

+ 1 - 0
node/Flow.hpp

@@ -116,6 +116,7 @@ struct Flow
 	int64_t _lastActivity;
 	int64_t _lastPathReassignment;
 	SharedPtr<Path> _assignedPath;
+	SharedPtr<Path> _previouslyAssignedPath;
 };
 
 } // namespace ZeroTier

+ 2 - 0
node/IncomingPacket.cpp

@@ -221,9 +221,11 @@ bool IncomingPacket::_doACK(const RuntimeEnvironment *RR,void *tPtr,const Shared
 bool IncomingPacket::_doQOS_MEASUREMENT(const RuntimeEnvironment *RR,void *tPtr,const SharedPtr<Peer> &peer)
 {
 	SharedPtr<Bond> bond = peer->bond();
+	/* TODO: Fix rate gate issue
 	if (!bond || !bond->rateGateQoS(RR->node->now())) {
 		return true;
 	}
+	*/
 	/* Dissect incoming QoS packet. From this we can compute latency values and their variance.
 	 * The latency variance is used as a measure of "jitter". */
 	if (payloadLength() > ZT_QOS_MAX_PACKET_SIZE || payloadLength() < ZT_QOS_MIN_PACKET_SIZE) {

+ 2 - 1
node/Peer.cpp

@@ -55,7 +55,8 @@ Peer::Peer(const RuntimeEnvironment *renv,const Identity &myIdentity,const Ident
 	_remoteMultipathSupported(false),
 	_canUseMultipath(false),
 	_shouldCollectPathStatistics(0),
-	_lastComputedAggregateMeanLatency(0)
+	_lastComputedAggregateMeanLatency(0),
+	_bondingPolicy(0)
 {
 	if (!myIdentity.agree(peerIdentity,_key,ZT_PEER_SECRET_KEY_LENGTH)) {
 		throw ZT_EXCEPTION_INVALID_ARGUMENT;

+ 1 - 0
service/OneService.cpp

@@ -1621,6 +1621,7 @@ public:
 				// Bond-specific properties
 				newTemplateBond->setUpDelay(OSUtils::jsonInt(customPolicy["upDelay"],-1));
 				newTemplateBond->setDownDelay(OSUtils::jsonInt(customPolicy["downDelay"],-1));
+				newTemplateBond->setFlowRebalanceStrategy(OSUtils::jsonInt(customPolicy["flowRebalanceStrategy"],(uint64_t)0));
 				newTemplateBond->setFailoverInterval(OSUtils::jsonInt(customPolicy["failoverInterval"],(uint64_t)0));
 				newTemplateBond->setPacketsPerSlave(OSUtils::jsonInt(customPolicy["packetsPerSlave"],-1));
 				std::string slaveMonitorStrategyStr(OSUtils::jsonString(customPolicy["slaveMonitorStrategy"],""));