
Make multicast gather slightly more aggressive, and add total to list command in testnet.

Adam Ierymenko, 10 years ago
parent
commit
4dec598fb8
4 changed files with 43 additions and 36 deletions
  1. node/Constants.hpp (+1 -7)
  2. node/Multicaster.cpp (+36 -27)
  3. node/Multicaster.hpp (+1 -2)
  4. testnet.cpp (+5 -0)

+1 -7
node/Constants.hpp

@@ -242,16 +242,10 @@
  */
 #define ZT_MULTICAST_EXPLICIT_GATHER_DELAY (ZT_MULTICAST_LIKE_EXPIRE / 10)
 
-/**
- * Minimum delay between implicit gathers via MULTICAST_FRAME
- */
-#define ZT_MULTICAST_IMPLICIT_GATHER_DELAY 30000
-
 /**
  * Timeout for outgoing multicasts
  *
- * Attempts will be made to gather recipients and send until we reach
- * the limit or sending times out.
+ * This is how long we wait for explicit or implicit gather results.
  */
 #define ZT_MULTICAST_TRANSMIT_TIMEOUT 5000
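Note on this hunk: with ZT_MULTICAST_IMPLICIT_GATHER_DELAY gone, implicit gathers are no longer rate limited, which is the "slightly more aggressive" part of this commit. ZT_MULTICAST_TRANSMIT_TIMEOUT is now the only time bound: a queued outbound multicast waits at most this long for gather results. A minimal sketch of how such a timeout can be enforced while walking the transmit queue (the creationTime() accessor and this exact loop are assumptions for illustration, not the patched code):

// Hypothetical sketch: drop queued outbound multicasts that have been
// waiting longer than ZT_MULTICAST_TRANSMIT_TIMEOUT for gather results.
for(std::list<OutboundMulticast>::iterator tx(gs.txQueue.begin());tx!=gs.txQueue.end();) {
	if ((now - tx->creationTime()) > ZT_MULTICAST_TRANSMIT_TIMEOUT)
		tx = gs.txQueue.erase(tx); // give up on this frame
	else ++tx;
}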
 
 

+36 -27
node/Multicaster.cpp

@@ -56,7 +56,7 @@ unsigned int Multicaster::gather(const Address &queryingPeer,uint64_t nwid,const
 {
 	unsigned char *p;
 	unsigned int added = 0,i,k,rptr,totalKnown = 0;
-	uint64_t a,picked[(ZT_PROTO_MAX_PACKET_LENGTH / 5) + 1];
+	uint64_t a,picked[(ZT_PROTO_MAX_PACKET_LENGTH / 5) + 2];
 
 	if (!limit)
 		return 0;
@@ -88,6 +88,7 @@ unsigned int Multicaster::gather(const Address &queryingPeer,uint64_t nwid,const
 		k = 0;
 		while ((added < limit)&&(k < gs->second.members.size())&&((appendTo.size() + ZT_ADDRESS_LENGTH) <= ZT_PROTO_MAX_PACKET_LENGTH)) {
 			rptr = (unsigned int)RR->prng->next32();
+
restart_member_scan:
 			a = gs->second.members[rptr % (unsigned int)gs->second.members.size()].address.toInt();
 			for(i=0;i<k;++i) {
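The picked[] array tracks addresses already chosen so the random scan never returns duplicates; the bump from +1 to +2 presumably adds headroom for the gather limit, which can exceed the packet-capacity estimate by one (see the gatherLimit computation further down). Reduced to a self-contained sketch (members and prng are stand-in names for gs->second.members and RR->prng):

// Hypothetical sketch of the duplicate-avoiding random member scan.
uint64_t a,picked[64]; // sized to the maximum number of picks
unsigned int k = 0;    // how many addresses have been picked so far
unsigned int rptr = (unsigned int)prng.next32(); // random start point
for(;;) {
	a = members[rptr % (unsigned int)members.size()].address.toInt();
	bool alreadyPicked = false;
	for(unsigned int i=0;i<k;++i) {
		if (picked[i] == a) { alreadyPicked = true; break; }
	}
	if (!alreadyPicked) break; // fresh member found
	++rptr; // equivalent of the goto restart_member_scan above
}
picked[k++] = a;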
@@ -176,22 +177,25 @@ void Multicaster::send(
 					continue;
 			}
 
-			if (count++ >= limit)
-				break;
 			out.sendOnly(RR,*ast);
+			if (++count >= limit)
+				break;
 		}
 
-		for(std::vector<MulticastGroupMember>::const_reverse_iterator m(gs.members.rbegin());m!=gs.members.rend();++m) {
-			{ // TODO / LEGACY: don't send new multicast frame to old peers (if we know their version)
-				SharedPtr<Peer> p(RR->topology->getPeer(m->address));
-				if ((p)&&(p->remoteVersionKnown())&&(p->remoteVersionMajor() < 1))
-					continue;
-			}
+		if (count < limit) {
+			for(std::vector<MulticastGroupMember>::const_reverse_iterator m(gs.members.rbegin());m!=gs.members.rend();++m) {
+				{ // TODO / LEGACY: don't send new multicast frame to old peers (if we know their version)
+					SharedPtr<Peer> p(RR->topology->getPeer(m->address));
+					if ((p)&&(p->remoteVersionKnown())&&(p->remoteVersionMajor() < 1))
+						continue;
+				}
 
-			if (count++ >= limit)
-				break;
-			if (std::find(alwaysSendTo.begin(),alwaysSendTo.end(),m->address) == alwaysSendTo.end())
-				out.sendOnly(RR,m->address);
+				if (std::find(alwaysSendTo.begin(),alwaysSendTo.end(),m->address) == alwaysSendTo.end()) {
+					out.sendOnly(RR,m->address);
+					if (++count >= limit)
+						break;
+				}
+			}
 		}
 	} else {
 		unsigned int gatherLimit = (limit - (unsigned int)gs.members.size()) + 1;
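Behavioral note on the reworked loops: the old code incremented count even for members it then skipped because they already appeared in alwaysSendTo, so skipped members consumed the recipient budget. Now ++count runs only after out.sendOnly() actually fires, and the member scan is bypassed entirely once the alwaysSendTo pass exhausts the limit. The pattern in isolation (recipients is a stand-in name):

// Hypothetical reduction of the send-then-count pattern: only an
// actual send consumes the recipient budget.
unsigned int count = 0;
for(std::vector<Address>::const_iterator a(recipients.begin());a!=recipients.end();++a) {
	out.sendOnly(RR,*a);  // send first...
	if (++count >= limit) // ...then charge it against the limit
		break;
}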
@@ -212,10 +216,6 @@ void Multicaster::send(
 				sn->send(RR,outp.data(),outp.size(),now);
 			}
 			gatherLimit = 0; // implicit not needed
-		} else if ((now - gs.lastImplicitGather) > ZT_MULTICAST_IMPLICIT_GATHER_DELAY) {
-			gs.lastImplicitGather = now;
-		} else {
-			gatherLimit = 0;
 		}
 
 		gs.txQueue.push_back(OutboundMulticast());
@@ -234,6 +234,8 @@ void Multicaster::send(
 			data,
 			len);
 
+		unsigned int count = 0;
+
 		for(std::vector<Address>::const_iterator ast(alwaysSendTo.begin());ast!=alwaysSendTo.end();++ast) {
 			{ // TODO / LEGACY: don't send new multicast frame to old peers (if we know their version)
 				SharedPtr<Peer> p(RR->topology->getPeer(*ast));
@@ -242,17 +244,24 @@ void Multicaster::send(
 			}
 
 			out.sendAndLog(RR,*ast);
+			if (++count >= limit)
+				break;
 		}
 
-		for(std::vector<MulticastGroupMember>::const_reverse_iterator m(gs.members.rbegin());m!=gs.members.rend();++m) {
-			{ // TODO / LEGACY: don't send new multicast frame to old peers (if we know their version)
-				SharedPtr<Peer> p(RR->topology->getPeer(m->address));
-				if ((p)&&(p->remoteVersionKnown())&&(p->remoteVersionMajor() < 1))
-					continue;
-			}
+		if (count < limit) {
+			for(std::vector<MulticastGroupMember>::const_reverse_iterator m(gs.members.rbegin());m!=gs.members.rend();++m) {
+				{ // TODO / LEGACY: don't send new multicast frame to old peers (if we know their version)
+					SharedPtr<Peer> p(RR->topology->getPeer(m->address));
+					if ((p)&&(p->remoteVersionKnown())&&(p->remoteVersionMajor() < 1))
+						continue;
+				}
 
-			if (std::find(alwaysSendTo.begin(),alwaysSendTo.end(),m->address) == alwaysSendTo.end())
-				out.sendAndLog(RR,m->address);
+				if (std::find(alwaysSendTo.begin(),alwaysSendTo.end(),m->address) == alwaysSendTo.end()) {
+					out.sendAndLog(RR,m->address);
+					if (++count >= limit)
+						break;
+				}
+			}
 		}
 	}
 
 
@@ -331,12 +340,12 @@ void Multicaster::clean(uint64_t now)
 				if (writer->learnedFrom) {
 					SharedPtr<Peer> p(RR->topology->getPeer(writer->learnedFrom));
 					if (p)
-						writer->rank = p->lastUnicastFrame() - ZT_MULTICAST_LIKE_EXPIRE;
+						writer->rank = (RR->topology->amSupernode() ? p->lastDirectReceive() : p->lastUnicastFrame()) - ZT_MULTICAST_LIKE_EXPIRE;
 					else writer->rank = writer->timestamp - (86400000 + ZT_MULTICAST_LIKE_EXPIRE);
 				} else {
 					SharedPtr<Peer> p(RR->topology->getPeer(writer->address));
 					if (p)
-						writer->rank = p->lastUnicastFrame();
+						writer->rank = (RR->topology->amSupernode() ? p->lastDirectReceive() : p->lastUnicastFrame());
 					else writer->rank = writer->timestamp - 86400000;
 				}
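The clean() change fixes member ranking on supernodes: a supernode relays packets but rarely terminates unicast frames itself, so p->lastUnicastFrame() is stale there and live members would rank poorly. Judging liveness by p->lastDirectReceive() on supernodes keeps recently heard-from peers at the top. The rule condensed into a helper (rankFor is hypothetical, not part of the patch):

// Hypothetical helper expressing the new ranking rule.
static inline uint64_t rankFor(const RuntimeEnvironment *RR,const SharedPtr<Peer> &p)
{
	// Supernodes see few unicast frames, so rank by any directly
	// received packet; ordinary nodes keep the stricter test.
	return (RR->topology->amSupernode() ? p->lastDirectReceive() : p->lastUnicastFrame());
}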
 
 

+1 -2
node/Multicaster.hpp

@@ -72,10 +72,9 @@ private:
 
 	struct MulticastGroupStatus
 	{
-		MulticastGroupStatus() : lastExplicitGather(0),lastImplicitGather(0) {}
+		MulticastGroupStatus() : lastExplicitGather(0) {}
 
 		uint64_t lastExplicitGather;
-		uint64_t lastImplicitGather;
 		std::list<OutboundMulticast> txQueue; // pending outbound multicasts
 		std::vector<MulticastGroupMember> members; // members of this group
 	};

+5 -0
testnet.cpp

@@ -277,6 +277,7 @@ static void doMKN(const std::vector<std::string> &cmd)
 
 static void doList(const std::vector<std::string> &cmd)
 {
+	unsigned int peers = 0,supernodes = 0;
 	ZT1_Node_Status status;
 	for(std::map< Address,SimNode * >::iterator n(nodes.begin());n!=nodes.end();++n) {
 		n->second->node.status(&status);
@@ -287,8 +288,12 @@ static void doList(const std::vector<std::string> &cmd)
 				(status.online ? "ONLINE" : "OFFLINE"),
 				status.knownPeers,
 				status.directlyConnectedPeers);
+			if (n->second->supernode)
+				++supernodes;
+			else ++peers;
 		} else printf("%s ? INITIALIZING (0 peers, 0 direct links)"ZT_EOL_S,n->first.toString().c_str());
 	}
+	printf("---------- %u regular peers, %u supernodes"ZT_EOL_S,peers,supernodes);
 }
 
 static void doJoin(const std::vector<std::string> &cmd)
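With the new counters, the testnet list command closes with a totals line. Illustrative output for the doList() change above (addresses and numbers are invented, and the per-node format is approximated from the printf arguments visible in the hunk):

8a5c1d9e72 OK ONLINE (24 peers, 2 direct links)
b3f0a27c44 OK ONLINE (24 peers, 3 direct links)
...
---------- 23 regular peers, 2 supernodes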