Browse Source

A few fixes for cluster mode.

Adam Ierymenko 7 years ago
parent
commit
508fa6a7fe
4 changed files with 133 additions and 73 deletions
  1. 2 2
      node/IncomingPacket.cpp
  2. 9 4
      node/Path.hpp
  3. 119 64
      node/Peer.cpp
  4. 3 3
      node/Peer.hpp

+ 2 - 2
node/IncomingPacket.cpp

@@ -1092,7 +1092,7 @@ bool IncomingPacket::_doPUSH_DIRECT_PATHS(const RuntimeEnvironment *RR,void *tPt
 						(RR->node->shouldUsePathForZeroTierTraffic(tPtr,peer->address(),_path->localSocket(),a)) ) // should use path
 				{
 					if ((flags & ZT_PUSH_DIRECT_PATHS_FLAG_CLUSTER_REDIRECT) != 0) {
-						peer->clusterRedirect(tPtr,_path->localSocket(),a,now);
+						peer->clusterRedirect(tPtr,_path,a,now);
 					} else if (++countPerScope[(int)a.ipScope()][0] <= ZT_PUSH_DIRECT_PATHS_MAX_PER_SCOPE_AND_FAMILY) {
 						peer->attemptToContactAt(tPtr,InetAddress(),a,now,false,0);
 					}
@@ -1106,7 +1106,7 @@ bool IncomingPacket::_doPUSH_DIRECT_PATHS(const RuntimeEnvironment *RR,void *tPt
 						(RR->node->shouldUsePathForZeroTierTraffic(tPtr,peer->address(),_path->localSocket(),a)) ) // should use path
 				{
 					if ((flags & ZT_PUSH_DIRECT_PATHS_FLAG_CLUSTER_REDIRECT) != 0) {
-						peer->clusterRedirect(tPtr,_path->localSocket(),a,now);
+						peer->clusterRedirect(tPtr,_path,a,now);
 					} else if (++countPerScope[(int)a.ipScope()][1] <= ZT_PUSH_DIRECT_PATHS_MAX_PER_SCOPE_AND_FAMILY) {
 						peer->attemptToContactAt(tPtr,InetAddress(),a,now,false,0);
 					}

+ 9 - 4
node/Path.hpp

@@ -281,13 +281,18 @@ public:
 	/**
 	 * @return Path quality -- lower is better
 	 */
-	inline int quality(const int64_t now) const
+	inline long quality(const int64_t now) const
 	{
-		const int l = (int)_latency;
-		const int age = (int)std::min((now - _lastIn),(int64_t)(ZT_PATH_HEARTBEAT_PERIOD * 10)); // set an upper sanity limit to avoid overflow
-		return (((age < (ZT_PATH_HEARTBEAT_PERIOD + 5000)) ? l : (l + 0xffff + age)) * (int)((ZT_INETADDRESS_MAX_SCOPE - _ipScope) + 1));
+		const long l = (long)_latency; // declared long (not int) so the cast is not narrowed back before the arithmetic below
+		const long age = (long)std::min((now - _lastIn),(int64_t)(ZT_PATH_HEARTBEAT_PERIOD * 10)); // set an upper sanity limit to avoid overflow
+		return (((age < (ZT_PATH_HEARTBEAT_PERIOD + 5000)) ? l : (l + 0xffff + age)) * (long)((ZT_INETADDRESS_MAX_SCOPE - _ipScope) + 1)); // past the threshold the score is penalized by 0xffff + age; result is scaled by (max scope - this scope + 1)
 	}
 
+	/**
+	 * @return True if this path is alive: (now - _lastIn) is below ZT_PATH_HEARTBEAT_PERIOD + 5000, the same threshold quality() uses before penalizing a path
+	 */
+	inline bool alive(const int64_t now) const { return ((now - _lastIn) < (ZT_PATH_HEARTBEAT_PERIOD + 5000)); }
+
 	/**
 	 * @return True if this path needs a heartbeat
 	 */

+ 119 - 64
node/Peer.cpp

@@ -148,39 +148,64 @@ void Peer::received(
 
 	if (hops == 0) {
 		// If this is a direct packet (no hops), update existing paths or learn new ones
-		Mutex::Lock _l(_paths_m);
 
-		unsigned int worstQualityPath = 0;
-		int worstQuality = 0;
 		bool havePath = false;
-		for(unsigned int p=0;p<ZT_MAX_PEER_NETWORK_PATHS;++p) {
-			if (_paths[p].p) {
-				if (_paths[p].p == path) {
-					_paths[p].lr = now;
-					havePath = true;
-					break;
-				}
-				const int q = _paths[p].p->quality(now) / _paths[p].priority;
-				if (q >= worstQuality) {
-					worstQuality = q;
-					worstQualityPath = p;
-				}
-			} else {
-				worstQualityPath = p;
-				break;
+		{
+			Mutex::Lock _l(_paths_m);
+			for(unsigned int i=0;i<ZT_MAX_PEER_NETWORK_PATHS;++i) {
+				if (_paths[i].p) {
+					if (_paths[i].p == path) {
+						_paths[i].lr = now;
+						havePath = true;
+						break;
+					}
+				} else break;
 			}
 		}
 
 		if ((!havePath)&&(RR->node->shouldUsePathForZeroTierTraffic(tPtr,_id.address(),path->localSocket(),path->address()))) {
-			if (verb == Packet::VERB_OK) {
-				RR->t->peerLearnedNewPath(tPtr,networkId,*this,_paths[worstQualityPath].p,path,packetId);
-				_paths[worstQualityPath].lr = now;
-				_paths[worstQualityPath].p = path;
-				_paths[worstQualityPath].priority = 1;
-			} else {
-				attemptToContactAt(tPtr,path->localSocket(),path->address(),now,true,path->nextOutgoingCounter());
-				path->sent(now);
-				RR->t->peerConfirmingUnknownPath(tPtr,networkId,*this,path,packetId,verb);
+			Mutex::Lock _l(_paths_m);
+
+			// Paths are redundant if they duplicate an alive path to the same IP or
+			// with the same local socket and address family.
+			bool redundant = false;
+			for(unsigned int i=0;i<ZT_MAX_PEER_NETWORK_PATHS;++i) {
+				if (_paths[i].p) {
+					if ( (_paths[i].p->alive(now)) && ( ((_paths[i].p->localSocket() == path->localSocket())&&(_paths[i].p->address().ss_family == path->address().ss_family)) || (_paths[i].p->address().ipsEqual(path->address())) ) )  {
+						redundant = true;
+						break;
+					}
+				} else break;
+			}
+
+			if (!redundant) {
+				unsigned int replacePath = ZT_MAX_PEER_NETWORK_PATHS;
+				int replacePathQuality = 0;
+				for(unsigned int i=0;i<ZT_MAX_PEER_NETWORK_PATHS;++i) {
+					if (_paths[i].p) {
+						const int q = _paths[i].p->quality(now);
+						if (q > replacePathQuality) {
+							replacePathQuality = q;
+							replacePath = i;
+						}
+					} else {
+						replacePath = i;
+						break;
+					}
+				}
+
+				if (replacePath != ZT_MAX_PEER_NETWORK_PATHS) {
+					if (verb == Packet::VERB_OK) {
+						RR->t->peerLearnedNewPath(tPtr,networkId,*this,_paths[replacePath].p,path,packetId);
+						_paths[replacePath].lr = now;
+						_paths[replacePath].p = path;
+						_paths[replacePath].priority = 1;
+					} else {
+						attemptToContactAt(tPtr,path->localSocket(),path->address(),now,true,path->nextOutgoingCounter());
+						path->sent(now);
+						RR->t->peerConfirmingUnknownPath(tPtr,networkId,*this,path,packetId,verb);
+					}
+				}
 			}
 		}
 	}
@@ -258,11 +283,11 @@ SharedPtr<Path> Peer::getBestPath(int64_t now,bool includeExpired) const
 	Mutex::Lock _l(_paths_m);
 
 	unsigned int bestPath = ZT_MAX_PEER_NETWORK_PATHS;
-	int bestPathQuality = 2147483647; // INT_MAX
+	long bestPathQuality = 2147483647;
 	for(unsigned int i=0;i<ZT_MAX_PEER_NETWORK_PATHS;++i) {
 		if (_paths[i].p) {
 			if ((includeExpired)||((now - _paths[i].lr) < ZT_PEER_PATH_EXPIRATION)) {
-				const int q = _paths[i].p->quality(now) / _paths[i].priority;
+				const long q = _paths[i].p->quality(now) / _paths[i].priority;
 				if (q <= bestPathQuality) {
 					bestPathQuality = q;
 					bestPath = i;
@@ -280,12 +305,12 @@ void Peer::introduce(void *const tPtr,const int64_t now,const SharedPtr<Peer> &o
 {
 	unsigned int myBestV4ByScope[ZT_INETADDRESS_MAX_SCOPE+1];
 	unsigned int myBestV6ByScope[ZT_INETADDRESS_MAX_SCOPE+1];
-	int myBestV4QualityByScope[ZT_INETADDRESS_MAX_SCOPE+1];
-	int myBestV6QualityByScope[ZT_INETADDRESS_MAX_SCOPE+1];
+	long myBestV4QualityByScope[ZT_INETADDRESS_MAX_SCOPE+1];
+	long myBestV6QualityByScope[ZT_INETADDRESS_MAX_SCOPE+1];
 	unsigned int theirBestV4ByScope[ZT_INETADDRESS_MAX_SCOPE+1];
 	unsigned int theirBestV6ByScope[ZT_INETADDRESS_MAX_SCOPE+1];
-	int theirBestV4QualityByScope[ZT_INETADDRESS_MAX_SCOPE+1];
-	int theirBestV6QualityByScope[ZT_INETADDRESS_MAX_SCOPE+1];
+	long theirBestV4QualityByScope[ZT_INETADDRESS_MAX_SCOPE+1];
+	long theirBestV6QualityByScope[ZT_INETADDRESS_MAX_SCOPE+1];
 	for(int i=0;i<=ZT_INETADDRESS_MAX_SCOPE;++i) {
 		myBestV4ByScope[i] = ZT_MAX_PEER_NETWORK_PATHS;
 		myBestV6ByScope[i] = ZT_MAX_PEER_NETWORK_PATHS;
@@ -301,7 +326,7 @@ void Peer::introduce(void *const tPtr,const int64_t now,const SharedPtr<Peer> &o
 
 	for(unsigned int i=0;i<ZT_MAX_PEER_NETWORK_PATHS;++i) {
 		if (_paths[i].p) {
-			const int q = _paths[i].p->quality(now) / _paths[i].priority;
+			const long q = _paths[i].p->quality(now) / _paths[i].priority;
 			const unsigned int s = (unsigned int)_paths[i].p->ipScope();
 			switch(_paths[i].p->address().ss_family) {
 				case AF_INET:
@@ -324,7 +349,7 @@ void Peer::introduce(void *const tPtr,const int64_t now,const SharedPtr<Peer> &o
 
 	for(unsigned int i=0;i<ZT_MAX_PEER_NETWORK_PATHS;++i) {
 		if (other->_paths[i].p) {
-			const int q = other->_paths[i].p->quality(now) / other->_paths[i].priority;
+			const long q = other->_paths[i].p->quality(now) / other->_paths[i].priority;
 			const unsigned int s = (unsigned int)other->_paths[i].p->ipScope();
 			switch(other->_paths[i].p->address().ss_family) {
 				case AF_INET:
@@ -471,19 +496,32 @@ unsigned int Peer::doPingAndKeepalive(void *tPtr,int64_t now)
 	const bool sendFullHello = ((now - _lastSentFullHello) >= ZT_PEER_PING_PERIOD);
 	_lastSentFullHello = now;
 
+	// Right now we only keep pinging links that have the maximum priority. The
+	// priority is used to track cluster redirections, meaning that when a cluster
+	// redirects us its redirect target links override all other links and we
+	// let those old links expire.
+	long maxPriority = 0;
+	for(unsigned int i=0;i<ZT_MAX_PEER_NETWORK_PATHS;++i) {
+		if (_paths[i].p)
+			maxPriority = std::max(_paths[i].priority,maxPriority);
+		else break;
+	}
+
 	unsigned int j = 0;
 	for(unsigned int i=0;i<ZT_MAX_PEER_NETWORK_PATHS;++i) {
-		if (!_paths[i].p) break;
-		if ((now - _paths[i].lr) < ZT_PEER_PATH_EXPIRATION) {
-			if ((sendFullHello)||(_paths[i].p->needsHeartbeat(now))) {
-				attemptToContactAt(tPtr,_paths[i].p->localSocket(),_paths[i].p->address(),now,sendFullHello,_paths[i].p->nextOutgoingCounter());
-				_paths[i].p->sent(now);
-				sent |= (_paths[i].p->address().ss_family == AF_INET) ? 0x1 : 0x2;
+		if (_paths[i].p) {
+			// Clean expired and reduced priority paths
+			if ( ((now - _paths[i].lr) < ZT_PEER_PATH_EXPIRATION) && (_paths[i].priority == maxPriority) ) {
+				if ((sendFullHello)||(_paths[i].p->needsHeartbeat(now))) {
+					attemptToContactAt(tPtr,_paths[i].p->localSocket(),_paths[i].p->address(),now,sendFullHello,_paths[i].p->nextOutgoingCounter());
+					_paths[i].p->sent(now);
+					sent |= (_paths[i].p->address().ss_family == AF_INET) ? 0x1 : 0x2;
+				}
+				if (i != j)
+					_paths[j] = _paths[i];
+				++j;
 			}
-			if (i != j)
-				_paths[j] = _paths[i];
-			++j;
-		}
+		} else break;
 	}
 	while(j < ZT_MAX_PEER_NETWORK_PATHS) {
 		_paths[j].lr = 0;
@@ -495,35 +533,52 @@ unsigned int Peer::doPingAndKeepalive(void *tPtr,int64_t now)
 	return sent;
 }
 
-void Peer::clusterRedirect(void *tPtr,const int64_t localSocket,const InetAddress &remoteAddress,const int64_t now)
+void Peer::clusterRedirect(void *tPtr,const SharedPtr<Path> &originatingPath,const InetAddress &remoteAddress,const int64_t now)
 {
-	SharedPtr<Path> np(RR->topology->getPath(localSocket,remoteAddress));
+	SharedPtr<Path> np(RR->topology->getPath(originatingPath->localSocket(),remoteAddress)); // redirect target: originating path's local socket + the new remote address
 	RR->t->peerRedirected(tPtr,0,*this,np);
-	attemptToContactAt(tPtr,localSocket,remoteAddress,now,true,np->nextOutgoingCounter());
+
+	attemptToContactAt(tPtr,originatingPath->localSocket(),remoteAddress,now,true,np->nextOutgoingCounter()); // issued before _paths_m is taken below
+
 	{
 		Mutex::Lock _l(_paths_m);
-		int worstQuality = 0;
-		unsigned int worstQualityPath = 0;
+
+		// New priority is higher than the priority of the originating path (if known)
+		long newPriority = 1; // base used when originatingPath is not found in _paths
 		for(unsigned int i=0;i<ZT_MAX_PEER_NETWORK_PATHS;++i) {
 			if (_paths[i].p) {
-				if (_paths[i].p == np) { // <-- where's my Fields Medal?
-					_paths[i].lr = now; // consider this a "receive"
-					_paths[i].priority += 5; // kind of arbitrary, bumps way up in best path quality order
-					return;
+				if (_paths[i].p == originatingPath) {
+					newPriority = _paths[i].priority;
+					break;
 				}
-				const int q = _paths[i].p->quality(now) / _paths[i].priority;
-				if (q >= worstQuality) {
-					worstQuality = q;
-					worstQualityPath = i;
+			} else break;
+		}
+		newPriority += 2; // strictly above the originating path's priority, so that path fails the >= test below and is dropped
+
+		// Erase any paths with lower priority than this one or that are duplicate
+		// IPs and add this path.
+		unsigned int j = 0; // write cursor: number of entries kept so far
+		for(unsigned int i=0;i<ZT_MAX_PEER_NETWORK_PATHS;++i) {
+			if (_paths[i].p) {
+				if ((_paths[i].priority >= newPriority)&&(!_paths[i].p->address().ipsEqual(remoteAddress))) {
+					if (i != j)
+						_paths[j] = _paths[i]; // compact kept entries toward the front of the array
+					++j;
 				}
-			} else {
-				worstQualityPath = i;
-				break;
 			}
 		}
-		_paths[worstQualityPath].lr = now;
-		_paths[worstQualityPath].p = np;
-		_paths[worstQualityPath].priority = 6; // 1 + 5
+		if (j < ZT_MAX_PEER_NETWORK_PATHS) { // the new path is installed only if compaction left a free slot
+			_paths[j].lr = now;
+			_paths[j].p = np;
+			_paths[j].priority = newPriority;
+			++j;
+			while (j < ZT_MAX_PEER_NETWORK_PATHS) { // reset every remaining slot to the empty state (lr 0, null path, priority 1)
+				_paths[j].lr = 0;
+				_paths[j].p.zero();
+				_paths[j].priority = 1;
+				++j;
+			}
+		}
 	}
 }
 

+ 3 - 3
node/Peer.hpp

@@ -219,11 +219,11 @@ public:
 	 * Process a cluster redirect sent by this peer
 	 *
 	 * @param tPtr Thread pointer to be handed through to any callbacks called as a result of this call
-	 * @param localSocket Local socket as supplied by external code
+	 * @param originatingPath Path from which redirect originated
 	 * @param remoteAddress Remote address
 	 * @param now Current time
 	 */
-	void clusterRedirect(void *tPtr,const int64_t localSocket,const InetAddress &remoteAddress,const int64_t now);
+	void clusterRedirect(void *tPtr,const SharedPtr<Path> &originatingPath,const InetAddress &remoteAddress,const int64_t now);
 
 	/**
 	 * Reset paths within a given IP scope and address family
@@ -498,7 +498,7 @@ private:
 		_PeerPath() : lr(0),p(),priority(1) {}
 		int64_t lr; // time of last valid ZeroTier packet
 		SharedPtr<Path> p;
-		int priority; // >= 1, higher is better
+		long priority; // >= 1, higher is better
 	};
 
 	uint8_t _key[ZT_PEER_SECRET_KEY_LENGTH];