Adam Ierymenko 11 years ago
parent
commit
9e186bbd89
7 changed files with 85 additions and 56 deletions
  1. node/Constants.hpp (+10 -15)
  2. node/Multicaster.cpp (+41 -24)
  3. node/Multicaster.hpp (+10 -9)
  4. node/Network.cpp (+1 -1)
  5. node/NetworkConfig.cpp (+1 -1)
  6. node/OutboundMulticast.hpp (+7 -4)
  7. node/Packet.hpp (+15 -2)

+ 10 - 15
node/Constants.hpp

@@ -193,7 +193,7 @@
 /**
  * How often Topology::clean() and Network::clean() and similar are called, in ms
  */
-#define ZT_DB_CLEAN_PERIOD 300000
+#define ZT_DB_CLEAN_PERIOD 120000
 
 /**
  * How long to remember peer records in RAM if they haven't been used
@@ -238,19 +238,9 @@
 #define ZT_MULTICAST_LOCAL_POLL_PERIOD 10000
 
 /**
- * Minimum delay between multicast endpoint gathering attempts
- *
- * Actual delay will vary between MIN and MAX depending on
- * how many endpoints we have -- MIN for 0, MAX for one less than limit.
- * If we have the limit of known multicast endpoints, no further attempts
- * to gather them are made.
- */
-#define ZT_MULTICAST_TOPOLOGY_GATHER_DELAY_MIN (ZT_MULTICAST_LIKE_EXPIRE / 60)
-
-/**
- * Maximum delay between multicast endpoint gathering attempts
+ * Delay between explicit MULTICAST_GATHER requests for a given multicast channel
  */
-#define ZT_MULTICAST_TOPOLOGY_GATHER_DELAY_MAX (ZT_MULTICAST_LIKE_EXPIRE / 2)
+#define ZT_MULTICAST_GATHER_DELAY (ZT_MULTICAST_LIKE_EXPIRE / 10)
 
 /**
  * Timeout for outgoing multicasts
@@ -258,12 +248,17 @@
  * Attempts will be made to gather recipients and send until we reach
  * the limit or sending times out.
  */
-#define ZT_MULTICAST_TRANSMIT_TIMEOUT (ZT_MULTICAST_TOPOLOGY_GATHER_DELAY_MIN * 3)
+#define ZT_MULTICAST_TRANSMIT_TIMEOUT (ZT_MULTICAST_GATHER_DELAY * 2)
+
+/**
+ * Default number of endpoints to implicitly gather from peers with each multicast frame
+ */
+#define ZT_MULTICAST_DEFAULT_IMPLICIT_GATHER 8
 
 /**
  * Default maximum number of peers to address with a single multicast (if unspecified in network)
  */
-#define ZT_DEFAULT_MULTICAST_LIMIT 64
+#define ZT_MULTICAST_DEFAULT_LIMIT 128
 
 /**
  * Delay between scans of the topology active peer DB for peers that need ping

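For a sense of the timing this sets up: a minimal sketch of the resulting constants, assuming ZT_MULTICAST_LIKE_EXPIRE keeps the 600000 ms (10 minute) value defined elsewhere in Constants.hpp -- that value is an assumption here, not shown in this diff:

#include <cstdio>

// Assumed value: ZT_MULTICAST_LIKE_EXPIRE is defined elsewhere in Constants.hpp.
#define ZT_MULTICAST_LIKE_EXPIRE 600000
#define ZT_MULTICAST_GATHER_DELAY (ZT_MULTICAST_LIKE_EXPIRE / 10)
#define ZT_MULTICAST_TRANSMIT_TIMEOUT (ZT_MULTICAST_GATHER_DELAY * 2)

int main()
{
	// A given multicast channel sends an explicit MULTICAST_GATHER at most
	// once per ZT_MULTICAST_GATHER_DELAY; queued outbound multicasts are
	// dropped by clean() once ZT_MULTICAST_TRANSMIT_TIMEOUT has elapsed.
	printf("gather delay: %d ms\n",ZT_MULTICAST_GATHER_DELAY);         // 60000 (1 min)
	printf("transmit timeout: %d ms\n",ZT_MULTICAST_TRANSMIT_TIMEOUT); // 120000 (2 min)
	return 0;
}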
+ 41 - 24
node/Multicaster.cpp

@@ -28,16 +28,18 @@
 #include <algorithm>
 
 #include "Constants.hpp"
+#include "SharedPtr.hpp"
 #include "Multicaster.hpp"
 #include "Topology.hpp"
 #include "Switch.hpp"
 #include "Packet.hpp"
+#include "Peer.hpp"
 #include "RuntimeEnvironment.hpp"
 
 namespace ZeroTier {
 
 Multicaster::Multicaster() :
-	_limit(ZT_DEFAULT_MULTICAST_LIMIT)
+	_limit(ZT_MULTICAST_DEFAULT_LIMIT)
 {
 }
 
@@ -54,29 +56,50 @@ void Multicaster::send(const RuntimeEnvironment *RR,uint64_t nwid,unsigned int limit,uint64_t
 		// If we already have enough members, just send and we're done -- no need for TX queue
 		OutboundMulticast out;
 
-		out.init(now,RR->identity.address(),nwid,src,mg,etherType,data,len);
-		for(std::vector<MulticastGroupMember>::const_reverse_iterator m(gs.members.rbegin());m!=gs.members.rend();++gs)
+		out.init(now,RR->identity.address(),nwid,ZT_MULTICAST_DEFAULT_IMPLICIT_GATHER,src,mg,etherType,data,len);
+		unsigned int count = 0;
+		for(std::vector<MulticastGroupMember>::const_reverse_iterator m(gs.members.rbegin());m!=gs.members.rend();++m) {
 			out.sendOnly(*(RR->sw),m->address);
+			if (++count >= limit)
+				break;
+		}
 	} else {
 		// If we don't already have enough members, send to the ones we have and then gather (if allowed within gather rate limit delay)
 		gs.txQueue.push_back(OutboundMulticast());
 		OutboundMulticast &out = gs.txQueue.back();
 
-		out.init(now,RR->identity.address(),nwid,src,mg,etherType,data,len);
+		out.init(now,RR->identity.address(),nwid,ZT_MULTICAST_DEFAULT_IMPLICIT_GATHER,src,mg,etherType,data,len);
 		for(std::vector<MulticastGroupMember>::const_reverse_iterator m(gs.members.rbegin());m!=gs.members.rend();++m)
 			out.sendAndLog(*(RR->sw),m->address);
 
-
+		if ((now - gs.lastExplicitGather) >= ZT_MULTICAST_GATHER_DELAY) {
+			gs.lastExplicitGather = now;
+
+			// Explicitly gather -- right now we only do this from supernodes since they
+			// know all multicast group memberships. In the future this might be more
+			// distributed somehow.
+			SharedPtr<Peer> sn(RR->topology->getBestSupernode());
+			if (sn) {
+				Packet outp(sn->address(),RR->identity.address(),Packet::VERB_MULTICAST_GATHER);
+				outp.append(nwid);
+				outp.append((char)0); // TODO: include network membership cert
+				mg.mac().appendTo(outp);
+				outp.append((uint32_t)mg.adi());
+				outp.append((uint32_t)((limit - (unsigned int)gs.members.size()) + 1)); // +1 just means we'll have an extra in the queue if available
+				outp.armor(sn->key(),true);
+				sn->send(RR,outp.data(),outp.size(),now);
+			}
+		}
 	}
 }
 
-void Multicaster::clean(uint64_t now,const Topology &topology)
+void Multicaster::clean(const RuntimeEnvironment *RR,uint64_t now,unsigned int limit)
 {
 	Mutex::Lock _l(_groups_m);
 	for(std::map< MulticastGroup,MulticastGroupStatus >::iterator mm(_groups.begin());mm!=_groups.end();) {
 		// Remove expired outgoing multicasts from multicast TX queue
 		for(std::list<OutboundMulticast>::iterator tx(mm->second.txQueue.begin());tx!=mm->second.txQueue.end();) {
-			if (tx->expired(now))
+			if ((tx->expired(now))||(tx->sentToCount() >= limit))
 				mm->second.txQueue.erase(tx++);
 			else ++tx;
 		}
@@ -98,12 +121,12 @@ void Multicaster::clean(uint64_t now,const Topology &topology)
 				 * about them minus one day (a large constant) to put these at the bottom of the list.
 				 * List is sorted in ascending order of rank and multicasts are sent last-to-first. */
 				if (writer->learnedFrom) {
-					SharedPtr<Peer> p(topology.getPeer(writer->learnedFrom));
+					SharedPtr<Peer> p(RR->topology->getPeer(writer->learnedFrom));
 					if (p)
 						writer->rank = p->lastUnicastFrame() - ZT_MULTICAST_LIKE_EXPIRE;
 					else writer->rank = writer->timestamp - (86400000 + ZT_MULTICAST_LIKE_EXPIRE);
 				} else {
-					SharedPtr<Peer> p(topology.getPeer(writer->address));
+					SharedPtr<Peer> p(RR->topology->getPeer(writer->address));
 					if (p)
 						writer->rank = p->lastUnicastFrame();
 					else writer->rank = writer->timestamp - 86400000;
@@ -127,24 +150,18 @@ void Multicaster::clean(uint64_t now,const Topology &topology)
 	}
 }
 
-void Multicaster::_add(const RuntimeEnvironment *RR,const MulticastGroup &mg,const Address &learnedFrom,const Address &member)
+void Multicaster::_add(const RuntimeEnvironment *RR,uint64_t now,MulticastGroupStatus &gs,const Address &learnedFrom,const Address &member)
 {
 	// assumes _groups_m is locked
-}
-
-unsigned int Multicaster::_want(const MulticastGroup &mg,MulticastGroupStatus &gs,uint64_t now,unsigned int limit)
-{
-	if (gs.members.size() >= limit) {
-		// We already caught our limit, don't need to go fishing any more.
-		return 0;
-	} else {
-		// Compute the delay between fishing expeditions from the fraction of the limit that we already have.
-		const uint64_t rateDelay = (uint64_t)ZT_MULTICAST_TOPOLOGY_GATHER_DELAY_MIN + (uint64_t)(((double)gs.members.size() / (double)limit) * (double)(ZT_MULTICAST_TOPOLOGY_GATHER_DELAY_MAX - ZT_MULTICAST_TOPOLOGY_GATHER_DELAY_MIN));
-		if ((now - gs.lastGatheredMembers) >= rateDelay) {
-			gs.lastGatheredMembers = now;
-			return (limit - (unsigned int)gs.members.size());
-		} else return 0;
+	for(std::vector<MulticastGroupMember>::iterator m(gs.members.begin());m!=gs.members.end();++m) {
+		if (m->address == member) {
+			if (m->learnedFrom)
+				m->learnedFrom = learnedFrom; // only update with indirect learnedFrom if we've never directly learned from this peer
+			m->timestamp = now;
+			return;
+		}
 	}
+	gs.members.push_back(MulticastGroupMember(member,learnedFrom,now));
 }
 
 } // namespace ZeroTier

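The explicit gather built in send() above writes a fixed 23-byte request body: network ID, flags, group MAC, ADI, then the maximum number of members desired. A self-contained sketch of how a receiver could decode it, assuming big-endian (network byte order) appends as in Packet; GatherRequest, loadBE64/loadBE32 and parseGather are illustrative names, not ZeroTier source:

#include <cstdint>
#include <cstring>

struct GatherRequest
{
	uint64_t nwid;        // 64-bit network ID
	uint8_t flags;        // currently 0 (membership cert is still a TODO)
	uint8_t mac[6];       // multicast group MAC
	uint32_t adi;         // additional distinguishing information
	uint32_t gatherLimit; // max members desired, 0 for no limit
};

static inline uint64_t loadBE64(const uint8_t *p)
{
	uint64_t v = 0;
	for(int i=0;i<8;++i) v = (v << 8) | (uint64_t)p[i];
	return v;
}
static inline uint32_t loadBE32(const uint8_t *p)
{
	uint32_t v = 0;
	for(int i=0;i<4;++i) v = (v << 8) | (uint32_t)p[i];
	return v;
}

static bool parseGather(const uint8_t *body,unsigned int len,GatherRequest &req)
{
	if (len < 23) return false;              // 8 + 1 + 6 + 4 + 4 bytes, as appended in send()
	req.nwid = loadBE64(body);               // <[8] network ID>
	req.flags = body[8];                     // <[1] flags>
	memcpy(req.mac,body + 9,6);              // <[6] group MAC>
	req.adi = loadBE32(body + 15);           // <[4] ADI>
	req.gatherLimit = loadBE32(body + 19);   // <[4] max members desired>
	return true;
}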
+ 10 - 9
node/Multicaster.hpp

@@ -56,11 +56,11 @@ private:
 	struct MulticastGroupMember
 	{
 		MulticastGroupMember() {}
-		MulticastGroupMember(const Address &a,const Address &lf,uint64_t ts) : address(a),learnedFrom(lf),timestamp(ts) {}
+		MulticastGroupMember(const Address &a,const Address &lf,uint64_t ts) : address(a),learnedFrom(lf),timestamp(ts),rank(0) {}
 
 		Address address;
 		Address learnedFrom; // NULL/0 for addresses directly learned from LIKE
-		uint64_t timestamp; // time of last LIKE or OK response to MULTICAST_LONELY
+		uint64_t timestamp; // time of last LIKE/OK(GATHER)
 		uint64_t rank; // used by sorting algorithm in clean()
 
 		// for sorting in ascending order of rank
@@ -69,9 +69,9 @@ private:
 
 	struct MulticastGroupStatus
 	{
-		MulticastGroupStatus() : lastGatheredMembers(0) {}
+		MulticastGroupStatus() : lastExplicitGather(0) {}
 
-		uint64_t lastGatheredMembers; // time we last gathered members
+		uint64_t lastExplicitGather; // time we last gathered members explicitly
 		std::list<OutboundMulticast> txQueue; // pending outbound multicasts
 		std::vector<MulticastGroupMember> members; // members of this group
 	};
@@ -84,14 +84,15 @@ public:
 	 * Add or update a member in a multicast group and send any pending multicasts
 	 *
 	 * @param RR Runtime environment
+	 * @param now Current time
 	 * @param mg Multicast group
 	 * @param learnedFrom Address from which we learned this member or NULL/0 Address if direct
 	 * @param member New member address
 	 */
-	inline void add(const RuntimeEnvironment *RR,const MulticastGroup &mg,const Address &learnedFrom,const Address &member)
+	inline void add(const RuntimeEnvironment *RR,uint64_t now,const MulticastGroup &mg,const Address &learnedFrom,const Address &member)
 	{
 		Mutex::Lock _l(_groups_m);
-		_add(RR,mg,learnedFrom,member);
+		_add(RR,now,_groups[mg],learnedFrom,member);
 	}
 
 	/**
@@ -114,12 +115,12 @@ public:
 	 *
 	 * @param RR Runtime environment
 	 * @param now Current time
+	 * @param limit Multicast limit
 	 */
-	void clean(const RuntimeEnvironment *RR,uint64_t now);
+	void clean(const RuntimeEnvironment *RR,uint64_t now,unsigned int limit);
 
 private:
-	void _add(const RuntimeEnvironment *RR,const MulticastGroup &mg,const Address &learnedFrom,const Address &member);
-	unsigned int _want(const MulticastGroup &mg,MulticastGroupStatus &gs,uint64_t now,unsigned int limit);
+	void _add(const RuntimeEnvironment *RR,uint64_t now,MulticastGroupStatus &gs,const Address &learnedFrom,const Address &member);
 
 	std::map< MulticastGroup,MulticastGroupStatus > _groups;
 	Mutex _groups_m;

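The rank field added to MulticastGroupMember drives the ordering clean() depends on: members sort in ascending order of rank and senders iterate last-to-first, so the most recently active members are tried first. A minimal sketch of that idea; the prune-to-limit step is an assumption about what clean() does with the sorted list, not a copy of it:

#include <algorithm>
#include <cstdint>
#include <vector>

struct Member
{
	uint64_t rank;
	// matches MulticastGroupMember: sort in ascending order of rank
	inline bool operator<(const Member &m) const { return (rank < m.rank); }
};

// Hypothetical prune: keep only the 'limit' highest-ranked members, so that
// reverse iteration (rbegin()..rend()) visits the best candidates first.
static void sortAndPrune(std::vector<Member> &members,unsigned int limit)
{
	std::sort(members.begin(),members.end());
	if (members.size() > limit)
		members.erase(members.begin(),members.end() - limit); // drop lowest-ranked
}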
+ 1 - 1
node/Network.cpp

@@ -318,7 +318,7 @@ void Network::clean()
 		}
 	}
 	{
-		_multicastTopology.clean(now,*(RR->topology),(_config) ? _config->multicastLimit() : (unsigned int)ZT_DEFAULT_MULTICAST_LIMIT);
+		_multicastTopology.clean(RR,now,(_config) ? _config->multicastLimit() : (unsigned int)ZT_MULTICAST_DEFAULT_LIMIT);
 	}
 }
 

+ 1 - 1
node/NetworkConfig.cpp

@@ -86,7 +86,7 @@ void NetworkConfig::_fromDictionary(const Dictionary &d)
 	_timestamp = Utils::hexStrToU64(d.get(ZT_NETWORKCONFIG_DICT_KEY_TIMESTAMP).c_str());
 	_issuedTo = Address(d.get(ZT_NETWORKCONFIG_DICT_KEY_ISSUED_TO));
 	_multicastLimit = Utils::hexStrToUInt(d.get(ZT_NETWORKCONFIG_DICT_KEY_MULTICAST_LIMIT,zero).c_str());
-	if (_multicastLimit == 0) _multicastLimit = ZT_DEFAULT_MULTICAST_LIMIT;
+	if (_multicastLimit == 0) _multicastLimit = ZT_MULTICAST_DEFAULT_LIMIT;
 	_allowPassiveBridging = (Utils::hexStrToUInt(d.get(ZT_NETWORKCONFIG_DICT_KEY_ALLOW_PASSIVE_BRIDGING,zero).c_str()) != 0);
 	_private = (Utils::hexStrToUInt(d.get(ZT_NETWORKCONFIG_DICT_KEY_PRIVATE,one).c_str()) != 0);
 	_enableBroadcast = (Utils::hexStrToUInt(d.get(ZT_NETWORKCONFIG_DICT_KEY_ENABLE_BROADCAST,one).c_str()) != 0);

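Config values arrive as hex strings in the network config dictionary, so a missing key and an explicit zero both decode to 0 and fall back to the compile-time default. A small sketch of that idiom; parseHexUInt stands in for Utils::hexStrToUInt and is not the real call:

#include <cstdlib>
#include <string>

// Stand-in for Utils::hexStrToUInt: "" and "0" both yield 0.
static unsigned int parseHexUInt(const std::string &s)
{
	return (unsigned int)strtoul(s.c_str(),(char **)0,16);
}

static unsigned int multicastLimitFromDict(const std::string &raw)
{
	unsigned int limit = parseHexUInt(raw);
	return (limit == 0) ? 128 : limit; // 128 == ZT_MULTICAST_DEFAULT_LIMIT
}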
+ 7 - 4
node/OutboundMulticast.hpp

@@ -63,6 +63,7 @@ public:
 	 * @param timestamp Creation time
 	 * @param self My ZeroTier address
 	 * @param nwid Network ID
+	 * @param gatherLimit Number to lazily/implicitly gather with this frame or 0 for none
 	 * @param src Source MAC address of frame
 	 * @param dest Destination multicast group (MAC + ADI)
 	 * @param etherType 16-bit Ethernet type ID
@@ -70,7 +71,7 @@ public:
 	 * @param len Length of data
 	 * @throws std::out_of_range Data too large to fit in a MULTICAST_FRAME
 	 */
-	inline void init(uint64_t timestamp,const Address &self,uint64_t nwid,const MAC &src,const MulticastGroup &dest,unsigned int etherType,const void *payload,unsigned int len)
+	inline void init(uint64_t timestamp,const Address &self,uint64_t nwid,unsigned int gatherLimit,const MAC &src,const MulticastGroup &dest,unsigned int etherType,const void *payload,unsigned int len)
 	{
 		_timestamp = timestamp;
 		_nwid = nwid;
@@ -79,7 +80,9 @@ public:
 		_etherType = etherType;
 		_packet.setSource(self);
 		_packet.setVerb(Packet::VERB_MULTICAST_FRAME);
-		_packet.append((char)0);
+		_packet.append((uint64_t)nwid);
+		_packet.append((char)0); // 0 flags
+		_packet.append((uint32_t)gatherLimit); // gather limit for implicit gathering
 		_packet.append((uint32_t)dest.adi());
 		dest.mac().appendTo(_packet);
 		src.appendTo(_packet);
@@ -124,7 +127,7 @@ public:
 	inline void sendAndLog(Switch &sw,const Address &toAddr)
 	{
 		_alreadySentTo.push_back(toAddr);
-		sendOnly(sw,toAddr);
+		sendOnly(sw,toAddr); // gather limit is already embedded in the packet by init()
 	}
 
 	/**
@@ -140,7 +143,7 @@ public:
 			if (*a == toAddr)
 				return false;
 		}
-		sendAndLog(sw,toAddr);
+		sendAndLog(sw,toAddr); // not yet sent to this address -- log and send
 		return true;
 	}
 

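The three send methods layer on one another: sendOnly() just transmits, sendAndLog() records the recipient in _alreadySentTo first, and sendIfNew() scans that log to suppress duplicates; clean() reads the log's size via sentToCount(). A reduced sketch of the pattern; OutboundState and the Address typedef are illustrative, not from the source:

#include <algorithm>
#include <cstdint>
#include <vector>

typedef uint64_t Address; // stand-in for ZeroTier::Address

struct OutboundState
{
	std::vector<Address> alreadySentTo; // mirrors _alreadySentTo

	// sendAndLog(): record the recipient, then transmit
	void sendAndLog(const Address &to)
	{
		alreadySentTo.push_back(to);
		// ... sendOnly() would enqueue the packet via Switch here ...
	}

	// sendIfNew(): suppress duplicates by scanning the log first
	bool sendIfNew(const Address &to)
	{
		if (std::find(alreadySentTo.begin(),alreadySentTo.end(),to) != alreadySentTo.end())
			return false;
		sendAndLog(to);
		return true;
	}

	// clean() uses this to drop queued multicasts that reached the limit
	unsigned int sentToCount() const { return (unsigned int)alreadySentTo.size(); }
};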
+ 15 - 2
node/Packet.hpp

@@ -660,8 +660,8 @@ public:
 		VERB_NETWORK_CONFIG_REFRESH = 12,
 
 		/* Request endpoints for multicast distribution:
-		 *   <[1] flags>
 		 *   <[8] 64-bit network ID>
+		 *   <[1] flags>
 		 *   <[6] MAC address of multicast group being queried>
 		 *   <[4] 32-bit ADI for multicast group being queried>
 		 *   <[4] 32-bit (suggested) max number of multicast peers desired or 0 for no limit>
@@ -683,6 +683,9 @@ public:
 		 *   <[2] 16-bit number of members enumerated in this packet>
 		 *   <[...] series of 5-byte ZeroTier addresses of enumerated members>
 		 *
+		 * If no endpoints are known, OK and ERROR are both optional. It's okay
+		 * to return nothing in that case since gathering is "lazy."
+		 *
 		 * ERROR response payload:
 		 *   <[8] 64-bit network ID>
 		 *   <[6] MAC address of multicast group being queried>
@@ -696,6 +699,7 @@ public:
 		/* Multicast frame:
 		 *   <[8] 64-bit network ID>
 		 *   <[1] flags (currently unused, must be 0)>
+		 *   <[4] 32-bit (suggested) gather limit or 0 for no gathering>
 		 *   <[4] 32-bit multicast ADI (note that this is out of order here -- it precedes MAC)>
 		 *   <[6] destination MAC or all zero for destination node>
 		 *   <[6] source MAC or all zero for node of origin>
@@ -705,7 +709,16 @@ public:
 		 * This is similar to EXT_FRAME but carries a multicast, and is sent
 		 * out to recipients on a multicast list.
 		 *
-		 * OK is not generated.
+		 * (ADI precedes MAC here so that everything from destination MAC forward
+		 * could be treated as a raw Ethernet frame.)
+		 *
+		 * OK response payload:
+		 *   <[1] flags>
+		 *   [... same as OK(GATHER) if flag 0x01 is set ...]
+		 *
+		 * Flag 0x01 in OK means "gathering results returned," which may be set
+		 * if a gather limit was specified in the original MULTICAST_FRAME and
+		 * endpoints are known. This lets frames double as implicit gathers.
 		 *
 		 * ERROR response payload:
 		 *   <[6] multicast group MAC>
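
Taken together, the revised MULTICAST_FRAME carries a fixed 31-byte header ahead of the Ethernet payload. A sketch of the byte offsets implied by the format comment above; the ethertype and payload positions follow the unchanged tail of that comment, and the enum names are illustrative, not from the source:

// Byte offsets within the VERB_MULTICAST_FRAME payload, derived from the
// format comment above.
enum {
	MF_IDX_NETWORK_ID   = 0,  // <[8] 64-bit network ID>
	MF_IDX_FLAGS        = 8,  // <[1] flags (currently 0)>
	MF_IDX_GATHER_LIMIT = 9,  // <[4] 32-bit gather limit, 0 for no gathering>
	MF_IDX_ADI          = 13, // <[4] 32-bit multicast ADI (precedes MAC)>
	MF_IDX_DEST_MAC     = 17, // <[6] destination multicast MAC>
	MF_IDX_SOURCE_MAC   = 23, // <[6] source MAC>
	MF_IDX_ETHERTYPE    = 29, // <[2] 16-bit Ethernet type>
	MF_IDX_PAYLOAD      = 31  // <[...] raw Ethernet payload>
};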