瀏覽代碼

Added multipath field to zerotier-cli status output. Adjusted how path estimates are computed and cached

Joseph Henry 7 年之前
父節點
當前提交
17fbb020e7
共有 6 個文件被更改,包括 169 次插入47 次删除
  1. 50 7
      include/ZeroTierOne.h
  2. 11 0
      node/Node.cpp
  3. 47 32
      node/Path.hpp
  4. 2 2
      node/Peer.cpp
  5. 9 5
      node/Peer.hpp
  6. 50 1
      service/OneService.cpp

+ 50 - 7
include/ZeroTierOne.h

@@ -449,13 +449,6 @@ enum ZT_MultipathMode
 	 * Will cease sending traffic over links that appear to be stale.
 	 */
 	ZT_MULTIPATH_PROPORTIONALLY_BALANCED = 2,
-
-	/**
-	 * Traffic is allocated across a user-defined interface/allocation
-	 *
-	 * Will cease sending traffic over links that appear to be stale.
-	 */
-	ZT_MULTIPATH_MANUALLY_BALANCED = 3
 };
 
 /**
@@ -1221,6 +1214,56 @@ typedef struct
 	 */
 	uint64_t trustedPathId;
 
+	/**
+	 * One-way latency
+	 */
+	float latency;
+
+	/**
+	 * How much latency varies over time
+	 */
+	float packetDelayVariance;
+
+	/**
+	 * How much observed throughput varies over time
+	 */
+	float throughputDisturbCoeff;
+
+	/**
+	 * Packet Error Ratio (PER)
+	 */
+	float packetErrorRatio;
+
+	/**
+	 * Packet Loss Ratio (PLR)
+	 */
+	float packetLossRatio;
+
+	/**
+	 * Stability of the path
+	 */
+	float stability;
+
+	/**
+	 * Current throughput (moving average)
+	 */
+	uint64_t throughput;
+
+	/**
+	 * Maximum observed throughput for this path
+	 */
+	uint64_t maxThroughput;
+
+	/**
+	 * Percentage of traffic allocated to this path
+	 */
+	float allocation;
+
+	/**
+	 * Name of physical interface (for monitoring)
+	 */
+	char *ifname;
+
 	/**
 	 * Is path expired?
 	 */

+ 11 - 0
node/Node.cpp

@@ -474,6 +474,17 @@ ZT_PeerList *Node::peers() const
 			p->paths[p->pathCount].trustedPathId = RR->topology->getOutboundPathTrust((*path)->address());
 			p->paths[p->pathCount].expired = 0;
 			p->paths[p->pathCount].preferred = ((*path) == bestp) ? 1 : 0;
+			p->paths[p->pathCount].latency = (*path)->latency();
+			p->paths[p->pathCount].packetDelayVariance = (*path)->packetDelayVariance();
+			p->paths[p->pathCount].throughputDisturbCoeff = (*path)->throughputDisturbanceCoefficient();
+			p->paths[p->pathCount].packetErrorRatio = (*path)->packetErrorRatio();
+			p->paths[p->pathCount].packetLossRatio = (*path)->packetLossRatio();
+			p->paths[p->pathCount].stability = (*path)->lastComputedStability();
+			p->paths[p->pathCount].throughput = (*path)->meanThroughput();
+			p->paths[p->pathCount].maxThroughput = (*path)->maxLifetimeThroughput();
+			p->paths[p->pathCount].allocation = (*path)->allocation();
+			p->paths[p->pathCount].ifname = (*path)->getName();
+
 			++p->pathCount;
 		}
 	}

+ 47 - 32
node/Path.hpp

@@ -111,15 +111,16 @@ public:
 		_expectingAckAsOf(0),
 		_packetsReceivedSinceLastAck(0),
 		_packetsReceivedSinceLastQoS(0),
-		_meanThroughput(0.0),
 		_maxLifetimeThroughput(0),
+		_lastComputedMeanThroughput(0),
 		_bytesAckedSinceLastThroughputEstimation(0),
-		_meanLatency(0.0),
-		_packetDelayVariance(0.0),
-		_packetErrorRatio(0.0),
-		_packetLossRatio(0),
+		_lastComputedMeanLatency(0.0),
+		_lastComputedPacketDelayVariance(0.0),
+		_lastComputedPacketErrorRatio(0.0),
+		_lastComputedPacketLossRatio(0),
 		_lastComputedStability(0.0),
 		_lastComputedRelativeQuality(0),
+		_lastComputedThroughputDistCoeff(0.0),
 		_lastAllocation(0.0)
 	{
 		prepareBuffers();
@@ -142,15 +143,16 @@ public:
 		_expectingAckAsOf(0),
 		_packetsReceivedSinceLastAck(0),
 		_packetsReceivedSinceLastQoS(0),
-		_meanThroughput(0.0),
 		_maxLifetimeThroughput(0),
+		_lastComputedMeanThroughput(0),
 		_bytesAckedSinceLastThroughputEstimation(0),
-		_meanLatency(0.0),
-		_packetDelayVariance(0.0),
-		_packetErrorRatio(0.0),
-		_packetLossRatio(0),
+		_lastComputedMeanLatency(0.0),
+		_lastComputedPacketDelayVariance(0.0),
+		_lastComputedPacketErrorRatio(0.0),
+		_lastComputedPacketLossRatio(0),
 		_lastComputedStability(0.0),
 		_lastComputedRelativeQuality(0),
+		_lastComputedThroughputDistCoeff(0.0),
 		_lastAllocation(0.0)
 	{
 		prepareBuffers();
@@ -162,9 +164,11 @@ public:
 		delete _throughputSamples;
 		delete _latencySamples;
 		delete _packetValiditySamples;
+		delete _throughputDisturbanceSamples;
 		_throughputSamples = NULL;
 		_latencySamples = NULL;
 		_packetValiditySamples = NULL;
+		_throughputDisturbanceSamples = NULL;
 	}
 
 	/**
@@ -311,7 +315,7 @@ public:
 	inline void recordOutgoingPacket(int64_t now, int64_t packetId, uint16_t payloadLength, Packet::Verb verb)
 	{
 		Mutex::Lock _l(_statistics_m);
-		if (verb == Packet::VERB_FRAME || verb == Packet::VERB_EXT_FRAME) {
+		if (verb != Packet::VERB_ACK && verb != Packet::VERB_QOS_MEASUREMENT) {
 			if (packetId % 2 == 0) { // even -> use for ACK
 				_unackedBytes += payloadLength;
 				// Take note that we're expecting a VERB_ACK on this path as of a specific time
@@ -336,7 +340,7 @@ public:
 	inline void recordIncomingPacket(int64_t now, int64_t packetId, uint16_t payloadLength, Packet::Verb verb)
 	{
 		Mutex::Lock _l(_statistics_m);
-		if (verb == Packet::VERB_FRAME || verb == Packet::VERB_EXT_FRAME) {
+		if (verb != Packet::VERB_ACK && verb != Packet::VERB_QOS_MEASUREMENT) {
 			if (packetId % 2 == 0) { // even -> use for ACK
 				_inACKRecords[packetId] = payloadLength;
 				_packetsReceivedSinceLastAck++;
@@ -497,14 +501,14 @@ public:
 	inline int64_t ackAge(int64_t now) { return _expectingAckAsOf ? now - _expectingAckAsOf : 0; }
 
 	/**
-	 * The maximum observed throughput for this path
+	 * The maximum observed throughput (in bits/s) for this path
 	 */
 	inline uint64_t maxLifetimeThroughput() { return _maxLifetimeThroughput; }
 
 	/**
 	 * @return The mean throughput (in bits/s) of this link
 	 */
-	inline float meanThroughput() { return _meanThroughput; }
+	inline uint64_t meanThroughput() { return _lastComputedMeanThroughput; }
 
 	/**
 	 * Assign a new relative quality value for this path in the aggregate link
@@ -543,22 +547,22 @@ public:
 	/**
 	 * @return Packet delay variance
 	 */
-	inline float packetDelayVariance() { return _packetDelayVariance; }
+	inline float packetDelayVariance() { return _lastComputedPacketDelayVariance; }
 
 	/**
 	 * @return Previously-computed mean latency
 	 */
-	inline float meanLatency() { return _meanLatency; }
+	inline float meanLatency() { return _lastComputedMeanLatency; }
 
 	/**
 	 * @return Packet loss ratio (PLR)
 	 */
-	inline float packetLossRatio() { return _packetLossRatio; }
+	inline float packetLossRatio() { return _lastComputedPacketLossRatio; }
 
 	/**
 	 * @return Packet error ratio (PER)
 	 */
-	inline float packetErrorRatio() { return _packetErrorRatio; }
+	inline float packetErrorRatio() { return _lastComputedPacketErrorRatio; }
 
 	/**
 	 * Record an invalid incoming packet. This packet failed MAC/compression/cipher checks and will now
@@ -571,38 +575,46 @@ public:
 	 */
 	inline char *getAddressString() { return _addrString; }
 
+	/**
+	 * @return The current throughput disturbance coefficient
+	 */
+	inline float throughputDisturbanceCoefficient() { return _lastComputedThroughputDistCoeff; }
+
 	/**
 	 * Compute and cache stability and performance metrics. The resultant stability coefficient is a measure of how "well behaved"
 	 * this path is. This figure is substantially different from (but required for the estimation of) the path's overall "quality".
 	 *
 	 * @param now Current time
 	 */
-	inline void processBackgroundPathMeasurements(int64_t now, const int64_t peerId) {
+	inline void processBackgroundPathMeasurements(int64_t now) {
 		if (now - _lastPathQualityComputeTime > ZT_PATH_QUALITY_COMPUTE_INTERVAL) {
 			Mutex::Lock _l(_statistics_m);
 			_lastPathQualityComputeTime = now;
 			address().toString(_addrString);
-			_meanThroughput = _throughputSamples->mean();
-			_meanLatency = _latencySamples->mean();
-			_packetDelayVariance = _latencySamples->stddev(); // Similar to "jitter" (SEE: RFC 3393, RFC 4689)
+			_lastComputedMeanLatency = _latencySamples->mean();
+			_lastComputedPacketDelayVariance = _latencySamples->stddev(); // Similar to "jitter" (SEE: RFC 3393, RFC 4689)
+			_lastComputedMeanThroughput = (uint64_t)_throughputSamples->mean();
 			// If no packet validity samples, assume PER==0
-			_packetErrorRatio = 1 - (_packetValiditySamples->count() ? _packetValiditySamples->mean() : 1);
+			_lastComputedPacketErrorRatio = 1 - (_packetValiditySamples->count() ? _packetValiditySamples->mean() : 1);
 			// Compute path stability
 			// Normalize measurements with wildly different ranges into a reasonable range
-			float normalized_pdv = Utils::normalize(_packetDelayVariance, 0, ZT_PATH_MAX_PDV, 0, 10);
-			float normalized_la = Utils::normalize(_meanLatency, 0, ZT_PATH_MAX_MEAN_LATENCY, 0, 10);
+			float normalized_pdv = Utils::normalize(_lastComputedPacketDelayVariance, 0, ZT_PATH_MAX_PDV, 0, 10);
+			float normalized_la = Utils::normalize(_lastComputedMeanLatency, 0, ZT_PATH_MAX_MEAN_LATENCY, 0, 10);
 			float throughput_cv = _throughputSamples->mean() > 0 ? _throughputSamples->stddev() / _throughputSamples->mean() : 1;
 			// Form an exponential cutoff and apply contribution weights
 			float pdv_contrib = exp((-1)*normalized_pdv) * ZT_PATH_CONTRIB_PDV;
 			float latency_contrib = exp((-1)*normalized_la) * ZT_PATH_CONTRIB_LATENCY;
+			// Throughput Disturbance Coefficient
 			float throughput_disturbance_contrib = exp((-1)*throughput_cv) * ZT_PATH_CONTRIB_THROUGHPUT_DISTURBANCE;
+			_throughputDisturbanceSamples->push(throughput_cv);
+			_lastComputedThroughputDistCoeff = _throughputDisturbanceSamples->mean();
 			// Obey user-defined ignored contributions
 			pdv_contrib = ZT_PATH_CONTRIB_PDV > 0.0 ? pdv_contrib : 1;
 			latency_contrib = ZT_PATH_CONTRIB_LATENCY > 0.0 ? latency_contrib : 1;
 			throughput_disturbance_contrib = ZT_PATH_CONTRIB_THROUGHPUT_DISTURBANCE > 0.0 ? throughput_disturbance_contrib : 1;
-			// Compute the quality product
+			// Stability
 			_lastComputedStability = pdv_contrib + latency_contrib + throughput_disturbance_contrib;
-			_lastComputedStability *= 1 - _packetErrorRatio;
+			_lastComputedStability *= 1 - _lastComputedPacketErrorRatio;
 			// Prevent QoS records from sticking around for too long
 			std::map<uint64_t,uint64_t>::iterator it = _outQoSRecords.begin();
 			while (it != _outQoSRecords.end()) {
@@ -646,6 +658,7 @@ public:
 		_throughputSamples = new RingBuffer<uint64_t>(ZT_PATH_QUALITY_METRIC_WIN_SZ);
 		_latencySamples = new RingBuffer<uint32_t>(ZT_PATH_QUALITY_METRIC_WIN_SZ);
 		_packetValiditySamples = new RingBuffer<bool>(ZT_PATH_QUALITY_METRIC_WIN_SZ);
+		_throughputDisturbanceSamples = new RingBuffer<float>(ZT_PATH_QUALITY_METRIC_WIN_SZ);
 		memset(_ifname, 0, 16);
 		memset(_addrString, 0, sizeof(_addrString));
 	}
@@ -677,19 +690,20 @@ private:
 	int16_t _packetsReceivedSinceLastAck;
 	int16_t _packetsReceivedSinceLastQoS;
 
-	float _meanThroughput;
 	uint64_t _maxLifetimeThroughput;
+	uint64_t _lastComputedMeanThroughput;
 	uint64_t _bytesAckedSinceLastThroughputEstimation;
 
-	volatile float _meanLatency;
-	float _packetDelayVariance;
+	float _lastComputedMeanLatency;
+	float _lastComputedPacketDelayVariance;
 
-	float _packetErrorRatio;
-	float _packetLossRatio;
+	float _lastComputedPacketErrorRatio;
+	float _lastComputedPacketLossRatio;
 
 	// cached estimates
 	float _lastComputedStability;
 	float _lastComputedRelativeQuality;
+	float _lastComputedThroughputDistCoeff;
 	float _lastAllocation;
 
 	// cached human-readable strings for tracing purposes
@@ -699,6 +713,7 @@ private:
 	RingBuffer<uint64_t> *_throughputSamples;
 	RingBuffer<uint32_t> *_latencySamples;
 	RingBuffer<bool> *_packetValiditySamples;
+	RingBuffer<float> *_throughputDisturbanceSamples;
 };
 
 } // namespace ZeroTier

+ 2 - 2
node/Peer.cpp

@@ -116,7 +116,7 @@ void Peer::received(
 			}
 			for(unsigned int i=0;i<ZT_MAX_PEER_NETWORK_PATHS;++i) {
 				if (_paths[i].p) {
-					_paths[i].p->processBackgroundPathMeasurements(now, _id.address().toInt());
+					_paths[i].p->processBackgroundPathMeasurements(now);
 				}
 			}
 		}
@@ -415,7 +415,7 @@ SharedPtr<Path> Peer::getAppropriatePath(int64_t now, bool includeExpired)
 
 	for(unsigned int i=0;i<ZT_MAX_PEER_NETWORK_PATHS;++i) {
 		if (_paths[i].p) {
-			_paths[i].p->processBackgroundPathMeasurements(now, _id.address().toInt());
+			_paths[i].p->processBackgroundPathMeasurements(now);
 		}
 	}
 

+ 9 - 5
node/Peer.hpp

@@ -353,14 +353,18 @@ public:
 	inline int64_t isActive(int64_t now) const { return ((now - _lastNontrivialReceive) < ZT_PEER_ACTIVITY_TIMEOUT); }
 
 	/**
-	 * @return Latency in milliseconds of best path or 0xffff if unknown / no paths
+	 * @return Latency in milliseconds of best/aggregate path or 0xffff if unknown / no paths
 	 */
 	inline unsigned int latency(const int64_t now)
 	{
-		SharedPtr<Path> bp(getAppropriatePath(now,false));
-		if (bp)
-			return bp->latency();
-		return 0xffff;
+		if (RR->node->getMultipathMode()) {
+			return (int)computeAggregateLinkMeanLatency();
+		} else {
+			SharedPtr<Path> bp(getAppropriatePath(now,false));
+			if (bp)
+				return bp->latency();
+			return 0xffff;
+		}
 	}
 
 	/**

+ 50 - 1
service/OneService.cpp

@@ -298,6 +298,39 @@ static void _peerToJson(nlohmann::json &pj,const ZT_Peer *peer)
 	pj["paths"] = pa;
 }
 
+/**
+ * Serialize a peer's aggregate-link (multipath) status into a JSON object.
+ *
+ * Sets "aggregateLinkLatency" from peer->latency and "paths" to an array with
+ * one entry per element of peer->paths (address, latency, stability,
+ * throughput, allocation and interface name).
+ *
+ * @param pj JSON object that receives the fields
+ * @param peer Peer record to read from
+ */
+static void _peerAggregateLinkToJson(nlohmann::json &pj,const ZT_Peer *peer)
+{
+	char tmp[256]; // scratch buffer for the textual form of each path address
+	pj["aggregateLinkLatency"] = peer->latency;
+
+	nlohmann::json pa = nlohmann::json::array();
+	for(unsigned int i=0;i<peer->pathCount;++i) {
+		//int64_t lastSend = peer->paths[i].lastSend;
+		//int64_t lastReceive = peer->paths[i].lastReceive;
+		nlohmann::json j;
+		j["address"] = reinterpret_cast<const InetAddress *>(&(peer->paths[i].address))->toString(tmp);
+		//j["lastSend"] = (lastSend < 0) ? 0 : lastSend;
+		//j["lastReceive"] = (lastReceive < 0) ? 0 : lastReceive;
+		//j["trustedPathId"] = peer->paths[i].trustedPathId;
+		//j["active"] = (bool)(peer->paths[i].expired == 0);
+		//j["expired"] = (bool)(peer->paths[i].expired != 0);
+		//j["preferred"] = (bool)(peer->paths[i].preferred != 0);
+		j["latency"] = peer->paths[i].latency;
+		//j["packetDelayVariance"] = peer->paths[i].packetDelayVariance;
+		//j["throughputDisturbCoeff"] = peer->paths[i].throughputDisturbCoeff;
+		//j["packetErrorRatio"] = peer->paths[i].packetErrorRatio;
+		//j["packetLossRatio"] = peer->paths[i].packetLossRatio;
+		j["stability"] = peer->paths[i].stability;
+		j["throughput"] = peer->paths[i].throughput;
+		//j["maxThroughput"] = peer->paths[i].maxThroughput;
+		j["allocation"] = peer->paths[i].allocation;
+		// ifname is a raw C string; fall back to "" so a string is never built from a NULL pointer
+		j["ifname"] = (peer->paths[i].ifname) ? peer->paths[i].ifname : "";
+		pa.push_back(j);
+	}
+	pj["paths"] = pa;
+}
+
 static void _moonToJson(nlohmann::json &mj,const World &world)
 {
 	char tmp[4096];
@@ -1189,7 +1222,23 @@ public:
 					json &settings = res["config"]["settings"];
 					settings["primaryPort"] = OSUtils::jsonInt(settings["primaryPort"],(uint64_t)_primaryPort) & 0xffff;
 					settings["allowTcpFallbackRelay"] = OSUtils::jsonBool(settings["allowTcpFallbackRelay"],_allowTcpFallbackRelay);
-					settings["multipathMode"] = OSUtils::jsonInt(settings["multipathMode"],_multipathMode);
+
+					// In multipath mode, publish per-peer aggregate link status under "multipath",
+					// keyed by the peer's address formatted as 10 hex digits. Only LEAF peers are listed.
+					if (_multipathMode) {
+						json &multipathConfig = res["multipath"];
+						ZT_PeerList *pl = _node->peers();
+						if (pl) {
+							char peerAddrStr[256];
+							for(unsigned long i=0;i<pl->peerCount;++i) {
+								if (pl->peers[i].role == ZT_PEER_ROLE_LEAF) {
+									nlohmann::json pj;
+									_peerAggregateLinkToJson(pj,&(pl->peers[i]));
+									OSUtils::ztsnprintf(peerAddrStr,sizeof(peerAddrStr),"%.10llx",pl->peers[i].address);
+									multipathConfig[peerAddrStr] = (pj);
+								}
+							}
+							// The list returned by peers() belongs to the caller; release it so it is not leaked each time this runs
+							_node->freeQueryResult((void *)pl);
+						}
+					}
+
 #ifdef ZT_USE_MINIUPNPC
 					settings["portMappingEnabled"] = OSUtils::jsonBool(settings["portMappingEnabled"],true);
 #else