
(1) No need to confirm if we are a root (small optimization), (2) Refactor peer affinity tracking.

Adam Ierymenko · 9 years ago · commit cc6080fe38
3 changed files with 98 additions and 131 deletions
  1. node/Cluster.cpp: +55 -75
  2. node/Cluster.hpp: +9 -20
  3. node/Peer.cpp: +34 -36

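For change (2), the sorted std::vector of packed _PeerAffinity records is replaced by a hash table keyed directly by peer Address (ZeroTier's Hashtable, constructed with an initial size of 65536). A minimal sketch of the new record shape and the O(1) upsert it enables, using std::unordered_map and a plain uint64_t key as stand-ins for Hashtable and Address (illustrative names only, not the project's API):

#include <cstdint>
#include <unordered_map>

// Illustrative stand-in for the new _PA record: last-heard timestamp plus the
// cluster member id claiming the peer; 0xffff means "no claim yet".
struct PeerAffinity {
	uint64_t ts;
	uint16_t mid;
	PeerAffinity() : ts(0),mid(0xffff) {}
};

int main()
{
	std::unordered_map<uint64_t,PeerAffinity> affinities; // key: 40-bit ZT address

	// Updating a claim is now a single O(1) upsert instead of a lower_bound()
	// search plus push_back()/sort() on a sorted vector.
	const uint64_t peer = 0x1122334455ULL;
	PeerAffinity &pa = affinities[peer]; // default-constructs on first use
	pa.ts = 1000; // "now"
	pa.mid = 7;   // member 7 claims it has this peer
	return 0;
}
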
node/Cluster.cpp (+55 -75)

@@ -82,7 +82,8 @@ Cluster::Cluster(
 	_z(z),
 	_id(id),
 	_zeroTierPhysicalEndpoints(zeroTierPhysicalEndpoints),
-	_members(new _Member[ZT_CLUSTER_MAX_MEMBERS])
+	_members(new _Member[ZT_CLUSTER_MAX_MEMBERS]),
+	_peerAffinities(65536)
 {
 	uint16_t stmp[ZT_SHA512_DIGEST_LEN / sizeof(uint16_t)];
 
@@ -214,19 +215,12 @@ void Cluster::handleIncomingStateMessage(const void *msg,unsigned int len)
 								ptr += id.deserialize(dmsg,ptr);
 								if (id) {
 									RR->topology->saveIdentity(id);
-
-									{	// Add or update peer affinity entry
-										_PeerAffinity pa(id.address(),fromMemberId,RR->node->now());
+									{
 										Mutex::Lock _l2(_peerAffinities_m);
-										std::vector<_PeerAffinity>::iterator i(std::lower_bound(_peerAffinities.begin(),_peerAffinities.end(),pa)); // O(log(n))
-										if ((i != _peerAffinities.end())&&(i->key == pa.key)) {
-											i->timestamp = pa.timestamp;
-										} else {
-											_peerAffinities.push_back(pa);
-											std::sort(_peerAffinities.begin(),_peerAffinities.end()); // probably a more efficient way to insert but okay for now
-										}
+										_PA &pa = _peerAffinities[id.address()];
+										pa.ts = RR->node->now();
+										pa.mid = fromMemberId;
 			 						}
-
 			 						TRACE("[%u] has %s",(unsigned int)fromMemberId,id.address().toString().c_str());
 			 					}
 							} catch ( ... ) {
@@ -355,83 +349,66 @@ bool Cluster::sendViaCluster(const Address &fromPeerAddress,const Address &toPee
 	if (len > 16384) // sanity check
 		return false;
 
-	uint64_t mostRecentTimestamp = 0;
+	const uint64_t now = RR->node->now();
 	unsigned int canHasPeer = 0;
 
 	{	// Anyone got this peer?
 		Mutex::Lock _l2(_peerAffinities_m);
-		std::vector<_PeerAffinity>::iterator i(std::lower_bound(_peerAffinities.begin(),_peerAffinities.end(),_PeerAffinity(toPeerAddress,0,0))); // O(log(n))
-		while ((i != _peerAffinities.end())&&(i->address() == toPeerAddress)) {
-			const uint16_t mid = i->clusterMemberId();
-			if ((mid != _id)&&(i->timestamp > mostRecentTimestamp)) {
-				mostRecentTimestamp = i->timestamp;
-				canHasPeer = mid;
-			}
-			++i;
-		}
+		_PA *pa = _peerAffinities.get(toPeerAddress);
+		if ((pa)&&(pa->mid != _id)&&((now - pa->ts) < ZT_PEER_ACTIVITY_TIMEOUT))
+			canHasPeer = pa->mid;
+		else return false;
 	}
 
-	const uint64_t now = RR->node->now();
-	if ((now - mostRecentTimestamp) < ZT_PEER_ACTIVITY_TIMEOUT) {
-		Buffer<2048> buf;
-
-		if (unite) {
-			InetAddress v4,v6;
-			if (fromPeerAddress) {
-				SharedPtr<Peer> fromPeer(RR->topology->getPeer(fromPeerAddress));
-				if (fromPeer)
-					fromPeer->getBestActiveAddresses(now,v4,v6);
-			}
-			uint8_t addrCount = 0;
+	Buffer<2048> buf;
+	if (unite) {
+		InetAddress v4,v6;
+		if (fromPeerAddress) {
+			SharedPtr<Peer> fromPeer(RR->topology->getPeer(fromPeerAddress));
+			if (fromPeer)
+				fromPeer->getBestActiveAddresses(now,v4,v6);
+		}
+		uint8_t addrCount = 0;
+		if (v4)
+			++addrCount;
+		if (v6)
+			++addrCount;
+		if (addrCount) {
+			toPeerAddress.appendTo(buf);
+			fromPeerAddress.appendTo(buf);
+			buf.append(addrCount);
 			if (v4)
-				++addrCount;
+				v4.serialize(buf);
 			if (v6)
-				++addrCount;
-			if (addrCount) {
-				toPeerAddress.appendTo(buf);
-				fromPeerAddress.appendTo(buf);
-				buf.append(addrCount);
-				if (v4)
-					v4.serialize(buf);
-				if (v6)
-					v6.serialize(buf);
-			}
+				v6.serialize(buf);
 		}
+	}
+	{
+		Mutex::Lock _l2(_members[canHasPeer].lock);
+		if (buf.size() > 0)
+			_send(canHasPeer,STATE_MESSAGE_PROXY_UNITE,buf.data(),buf.size());
+		if (_members[canHasPeer].zeroTierPhysicalEndpoints.size() > 0)
+			RR->node->putPacket(InetAddress(),_members[canHasPeer].zeroTierPhysicalEndpoints.front(),data,len);
+	}
 
-		{
-			Mutex::Lock _l2(_members[canHasPeer].lock);
-			if (buf.size() > 0)
-				_send(canHasPeer,STATE_MESSAGE_PROXY_UNITE,buf.data(),buf.size());
-			if (_members[canHasPeer].zeroTierPhysicalEndpoints.size() > 0)
-				RR->node->putPacket(InetAddress(),_members[canHasPeer].zeroTierPhysicalEndpoints.front(),data,len);
-		}
+	TRACE("sendViaCluster(): relaying %u bytes from %s to %s by way of %u",len,fromPeerAddress.toString().c_str(),toPeerAddress.toString().c_str(),(unsigned int)canHasPeer);
 
-		TRACE("sendViaCluster(): relaying %u bytes from %s to %s by way of %u",len,fromPeerAddress.toString().c_str(),toPeerAddress.toString().c_str(),(unsigned int)canHasPeer);
-		return true;
-	} else {
-		TRACE("sendViaCluster(): unable to relay %u bytes from %s to %s since no cluster members seem to have it!",len,fromPeerAddress.toString().c_str(),toPeerAddress.toString().c_str());
-		return false;
-	}
+	return true;
 }
 
 void Cluster::replicateHavePeer(const Identity &peerId)
 {
+	const uint64_t now = RR->node->now();
 	{	// Use peer affinity table to track our own last announce time for peers
-		_PeerAffinity pa(peerId.address(),_id,RR->node->now());
 		Mutex::Lock _l2(_peerAffinities_m);
-		std::vector<_PeerAffinity>::iterator i(std::lower_bound(_peerAffinities.begin(),_peerAffinities.end(),pa)); // O(log(n))
-		if ((i != _peerAffinities.end())&&(i->key == pa.key)) {
-			if ((pa.timestamp - i->timestamp) >= ZT_CLUSTER_HAVE_PEER_ANNOUNCE_PERIOD) {
-				i->timestamp = pa.timestamp;
-				// continue to announcement
-			} else {
-				// we've already announced this peer recently, so skip
-				return;
-			}
+		_PA &pa = _peerAffinities[peerId.address()];
+		if (pa.mid != _id) {
+			pa.ts = now;
+			pa.mid = _id;
+		} else if ((now - pa.ts) < ZT_CLUSTER_HAVE_PEER_ANNOUNCE_PERIOD) {
+			return;
 		} else {
-			_peerAffinities.push_back(pa);
-			std::sort(_peerAffinities.begin(),_peerAffinities.end()); // probably a more efficient way to insert but okay for now
-			// continue to announcement
+			pa.ts = now;
 		}
 	}
 
@@ -598,6 +575,7 @@ bool Cluster::findBetterEndpoint(InetAddress &redirectTo,const Address &peerAddr
 			}
 		}
 
+		// Suggest redirection if a closer member was found
 		for(std::vector<InetAddress>::const_iterator a(best.begin());a!=best.end();++a) {
 			if (a->ss_family == peerPhysicalAddress.ss_family) {
 				TRACE("%s at [%d,%d,%d] is %f from us but %f from %u, can redirect to %s",peerAddress.toString().c_str(),px,py,pz,currentDistance,bestDistance,bestMember,a->toString().c_str());
@@ -661,10 +639,12 @@ void Cluster::status(ZT_ClusterStatus &status) const
 
 	{
 		Mutex::Lock _l2(_peerAffinities_m);
-		for(std::vector<_PeerAffinity>::const_iterator pi(_peerAffinities.begin());pi!=_peerAffinities.end();++pi) {
-			unsigned int mid = pi->clusterMemberId();
-			if ((ms[mid])&&(mid != _id)&&((now - pi->timestamp) < ZT_PEER_ACTIVITY_TIMEOUT))
-				++ms[mid]->peers;
+		Address *k = (Address *)0;
+		_PA *v = (_PA *)0;
+		Hashtable< Address,_PA >::Iterator i(const_cast<Cluster *>(this)->_peerAffinities);
+		while (i.next(k,v)) {
+			if ( (ms[v->mid]) && (v->mid != _id) && ((now - v->ts) < ZT_PEER_ACTIVITY_TIMEOUT) )
+				++ms[v->mid]->peers;
 		}
 	}
 }

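replicateHavePeer() above reuses the same table to rate-limit our own "have peer" announcements. A self-contained sketch of that check, again with std::unordered_map standing in for Hashtable; shouldAnnounce, selfId and periodMs are invented for illustration (the real period is ZT_CLUSTER_HAVE_PEER_ANNOUNCE_PERIOD, now 30000 ms):

#include <cstdint>
#include <unordered_map>

struct PA { uint64_t ts; uint16_t mid; PA() : ts(0),mid(0xffff) {} };

// Illustrative: decide whether to re-announce "I have this peer".
static bool shouldAnnounce(std::unordered_map<uint64_t,PA> &affinities,
                           uint64_t peerAddr,uint16_t selfId,uint64_t now,uint64_t periodMs)
{
	PA &pa = affinities[peerAddr];
	if (pa.mid != selfId) {        // fresh entry (mid==0xffff) or claimed by another member
		pa.ts = now;
		pa.mid = selfId;
		return true;               // announce
	} else if ((now - pa.ts) < periodMs) {
		return false;              // we announced this peer recently; skip
	} else {
		pa.ts = now;
		return true;               // stale enough; announce again
	}
}

int main()
{
	std::unordered_map<uint64_t,PA> a;
	bool first = shouldAnnounce(a,0x1122334455ULL,3,1000,30000);  // true
	bool again = shouldAnnounce(a,0x1122334455ULL,3,2000,30000);  // false (too soon)
	bool later = shouldAnnounce(a,0x1122334455ULL,3,40000,30000); // true
	(void)first; (void)again; (void)later;
	return 0;
}
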
node/Cluster.hpp (+9 -20)

@@ -52,7 +52,7 @@
 /**
  * How often should we announce that we have a peer?
  */
-#define ZT_CLUSTER_HAVE_PEER_ANNOUNCE_PERIOD 60000
+#define ZT_CLUSTER_HAVE_PEER_ANNOUNCE_PERIOD 30000
 
 /**
  * Desired period between doPeriodicTasks() in milliseconds
@@ -285,7 +285,7 @@ private:
 	void _send(uint16_t memberId,StateMessageType type,const void *msg,unsigned int len);
 	void _flush(uint16_t memberId);
 
-	// These are initialized in the constructor and remain static
+	// These are initialized in the constructor and remain immutable
 	uint16_t _masterSecret[ZT_SHA512_DIGEST_LEN / sizeof(uint16_t)];
 	unsigned char _key[ZT_PEER_SECRET_KEY_LENGTH];
 	const RuntimeEnvironment *RR;
@@ -298,6 +298,7 @@ private:
 	const int32_t _z;
 	const uint16_t _id;
 	const std::vector<InetAddress> _zeroTierPhysicalEndpoints;
+	// end immutable fields
 
 	struct _Member
 	{
@@ -330,30 +331,18 @@ private:
 		_Member() { this->clear(); }
 		~_Member() { Utils::burn(key,sizeof(key)); }
 	};
-
-	_Member *const _members; // cluster IDs can be from 0 to 65535 (16-bit)
+	_Member *const _members;
 
 	std::vector<uint16_t> _memberIds;
 	Mutex _memberIds_m;
 
-	// Record tracking which members have which peers and how recently they claimed this -- also used to track our last claimed time
-	struct _PeerAffinity
+	struct _PA
 	{
-		_PeerAffinity(const Address &a,uint16_t mid,uint64_t ts) :
-			key((a.toInt() << 16) | (uint64_t)mid),
-			timestamp(ts) {}
-
-		uint64_t key;
-		uint64_t timestamp;
-
-		inline Address address() const throw() { return Address(key >> 16); }
-		inline uint16_t clusterMemberId() const throw() { return (uint16_t)(key & 0xffff); }
-
-		inline bool operator<(const _PeerAffinity &pi) const throw() { return (key < pi.key); }
+		_PA() : ts(0),mid(0xffff) {}
+		uint64_t ts;
+		uint16_t mid;
 	};
-
-	// A memory-efficient packed map of _PeerAffinity records searchable with std::binary_search() and std::lower_bound()
-	std::vector<_PeerAffinity> _peerAffinities;
+	Hashtable< Address,_PA > _peerAffinities;
 	Mutex _peerAffinities_m;
 };
 

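For comparison with the removed _PeerAffinity struct above, a self-contained sketch of the old packed-key scheme, where a 40-bit ZeroTier address and a 16-bit member id shared one uint64_t and lookups depended on the vector staying sorted (names simplified from the removed code):

#include <cassert>
#include <cstdint>

// Old scheme (removed by this commit): one record per (address, member) pair,
// ordered by a packed 64-bit key so std::lower_bound() could find all records
// for a given address.
struct PackedAffinity {
	uint64_t key;       // (address << 16) | memberId
	uint64_t timestamp;

	PackedAffinity(uint64_t addr,uint16_t mid,uint64_t ts) :
		key((addr << 16) | (uint64_t)mid),
		timestamp(ts) {}

	uint64_t address() const { return (key >> 16); }
	uint16_t memberId() const { return (uint16_t)(key & 0xffff); }
	bool operator<(const PackedAffinity &o) const { return (key < o.key); }
};

int main()
{
	PackedAffinity pa(0x1122334455ULL,7,1000);
	assert(pa.address() == 0x1122334455ULL);
	assert(pa.memberId() == 7);
	return 0;
}
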
node/Peer.cpp (+34 -36)

@@ -86,43 +86,41 @@ void Peer::received(
 		// Note: findBetterEndpoint() is first since we still want to check
 		// for a better endpoint even if we don't actually send a redirect.
 		if ( (RR->cluster->findBetterEndpoint(redirectTo,_id.address(),remoteAddr,false)) && (verb != Packet::VERB_OK)&&(verb != Packet::VERB_ERROR)&&(verb != Packet::VERB_RENDEZVOUS)&&(verb != Packet::VERB_PUSH_DIRECT_PATHS) ) {
-			if ((redirectTo.ss_family == AF_INET)||(redirectTo.ss_family == AF_INET6)) {
-				if (_vProto >= 5) {
-					// For newer peers we can send a more idiomatic verb: PUSH_DIRECT_PATHS.
-					Packet outp(_id.address(),RR->identity.address(),Packet::VERB_PUSH_DIRECT_PATHS);
-					outp.append((uint16_t)1); // count == 1
-					outp.append((uint8_t)0); // no flags
-					outp.append((uint16_t)0); // no extensions
-					if (redirectTo.ss_family == AF_INET) {
-						outp.append((uint8_t)4);
-						outp.append((uint8_t)6);
-						outp.append(redirectTo.rawIpData(),4);
-					} else {
-						outp.append((uint8_t)6);
-						outp.append((uint8_t)18);
-						outp.append(redirectTo.rawIpData(),16);
-					}
-					outp.append((uint16_t)redirectTo.port());
-					outp.armor(_key,true);
-					RR->antiRec->logOutgoingZT(outp.data(),outp.size());
-					RR->node->putPacket(localAddr,remoteAddr,outp.data(),outp.size());
+			if (_vProto >= 5) {
+				// For newer peers we can send a more idiomatic verb: PUSH_DIRECT_PATHS.
+				Packet outp(_id.address(),RR->identity.address(),Packet::VERB_PUSH_DIRECT_PATHS);
+				outp.append((uint16_t)1); // count == 1
+				outp.append((uint8_t)0); // no flags
+				outp.append((uint16_t)0); // no extensions
+				if (redirectTo.ss_family == AF_INET) {
+					outp.append((uint8_t)4);
+					outp.append((uint8_t)6);
+					outp.append(redirectTo.rawIpData(),4);
 				} else {
-					// For older peers we use RENDEZVOUS to coax them into contacting us elsewhere.
-					Packet outp(_id.address(),RR->identity.address(),Packet::VERB_RENDEZVOUS);
-					outp.append((uint8_t)0); // no flags
-					RR->identity.address().appendTo(outp);
-					outp.append((uint16_t)redirectTo.port());
-					if (redirectTo.ss_family == AF_INET) {
-						outp.append((uint8_t)4);
-						outp.append(redirectTo.rawIpData(),4);
-					} else {
-						outp.append((uint8_t)16);
-						outp.append(redirectTo.rawIpData(),16);
-					}
-					outp.armor(_key,true);
-					RR->antiRec->logOutgoingZT(outp.data(),outp.size());
-					RR->node->putPacket(localAddr,remoteAddr,outp.data(),outp.size());
+					outp.append((uint8_t)6);
+					outp.append((uint8_t)18);
+					outp.append(redirectTo.rawIpData(),16);
 				}
+				outp.append((uint16_t)redirectTo.port());
+				outp.armor(_key,true);
+				RR->antiRec->logOutgoingZT(outp.data(),outp.size());
+				RR->node->putPacket(localAddr,remoteAddr,outp.data(),outp.size());
+			} else {
+				// For older peers we use RENDEZVOUS to coax them into contacting us elsewhere.
+				Packet outp(_id.address(),RR->identity.address(),Packet::VERB_RENDEZVOUS);
+				outp.append((uint8_t)0); // no flags
+				RR->identity.address().appendTo(outp);
+				outp.append((uint16_t)redirectTo.port());
+				if (redirectTo.ss_family == AF_INET) {
+					outp.append((uint8_t)4);
+					outp.append(redirectTo.rawIpData(),4);
+				} else {
+					outp.append((uint8_t)16);
+					outp.append(redirectTo.rawIpData(),16);
+				}
+				outp.armor(_key,true);
+				RR->antiRec->logOutgoingZT(outp.data(),outp.size());
+				RR->node->putPacket(localAddr,remoteAddr,outp.data(),outp.size());
 			}
 		}
 	}
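
As an aside (not part of the diff): the PUSH_DIRECT_PATHS branch above writes the redirect target as a type byte, a length byte, the raw IP, and a big-endian port, i.e. type 4 / length 6 for IPv4 and type 6 / length 18 for IPv6. A minimal sketch of that record layout against a plain byte vector rather than ZeroTier's Packet class (appendAddressRecord is an illustrative helper, not project API):

#include <cstdint>
#include <vector>

// Illustrative only: appends one PUSH_DIRECT_PATHS-style address record
// (type, length covering IP + port, raw IP bytes, big-endian port).
static void appendAddressRecord(std::vector<uint8_t> &out,
                                const uint8_t *rawIp,bool isV6,uint16_t port)
{
	out.push_back(isV6 ? 6 : 4);          // address type
	out.push_back(isV6 ? 18 : 6);         // record length: IP bytes + 2 port bytes
	out.insert(out.end(),rawIp,rawIp + (isV6 ? 16 : 4));
	out.push_back((uint8_t)(port >> 8));  // port, big-endian
	out.push_back((uint8_t)(port & 0xff));
}

int main()
{
	const uint8_t v4[4] = { 192,0,2,1 };
	std::vector<uint8_t> buf;
	appendAddressRecord(buf,v4,false,9993); // 192.0.2.1:9993 as a type-4, length-6 record
	return 0;
}
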
@@ -167,7 +165,7 @@ void Peer::received(
 			}
 
 			if (!pathIsConfirmed) {
-				if (verb == Packet::VERB_OK) {
+				if ((verb == Packet::VERB_OK)||(RR->topology->amRoot())) {
 
 					Path *slot = (Path *)0;
 					if (np < ZT_MAX_PEER_NETWORK_PATHS) {
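
Change (1) of the commit message is the one-line condition change above: a root no longer waits for a VERB_OK exchange before confirming a newly seen path. A sketch restating that rule as a standalone predicate; the enum and helper name here are illustrative only:

// Illustrative predicate mirroring the changed condition in Peer::received():
// confirm an unconfirmed path on a VERB_OK reply or, as of this commit,
// immediately if this node is itself a root (small optimization).
enum class Verb { OK, OTHER };

static inline bool canConfirmPath(Verb verb,bool amRoot)
{
	return ((verb == Verb::OK) || amRoot);
}

int main()
{
	bool asRoot = canConfirmPath(Verb::OTHER,true);  // true: roots skip confirmation
	bool asLeaf = canConfirmPath(Verb::OTHER,false); // false: wait for VERB_OK
	(void)asRoot; (void)asLeaf;
	return 0;
}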