Adam Ierymenko 11 years ago
parent
commit
050a0ce85d
7 changed files with 104 additions and 106 deletions
  1. node/Constants.hpp (+1 -1)
  2. node/IncomingPacket.hpp (+1 -1)
  3. node/Multicaster.cpp (+56 -38)
  4. node/Multicaster.hpp (+17 -52)
  5. node/Network.hpp (+0 -6)
  6. node/OutboundMulticast.hpp (+26 -5)
  7. node/Peer.hpp (+3 -3)

+ 1 - 1
node/Constants.hpp

@@ -245,7 +245,7 @@
  * If we have the limit of known multicast endpoints, no further attempts
  * to gather them are made.
  */
-#define ZT_MULTICAST_TOPOLOGY_GATHER_DELAY_MIN (ZT_MULTICAST_LIKE_EXPIRE / 50)
+#define ZT_MULTICAST_TOPOLOGY_GATHER_DELAY_MIN (ZT_MULTICAST_LIKE_EXPIRE / 60)
 
 /**
  * Maximum delay between multicast endpoint gathering attempts

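For scale, a rough worked example of what the new divisor means, assuming ZT_MULTICAST_LIKE_EXPIRE keeps a value of 600000 ms (10 minutes); that value is an assumption for illustration and is not part of this diff:

	// Assumed: ZT_MULTICAST_LIKE_EXPIRE = 600000 ms (10 minutes)
	// Old minimum gather delay: 600000 / 50 = 12000 ms (12 s)
	// New minimum gather delay: 600000 / 60 = 10000 ms (10 s)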
+ 1 - 1
node/IncomingPacket.hpp

@@ -97,7 +97,7 @@ public:
 	 * Once true is returned, this must not be called again. The packet's state
 	 * may no longer be valid.
 	 *
-	 * @param _r Runtime environment
+	 * @param RR Runtime environment
 	 * @return True if decoding and processing is complete, false if caller should try again
 	 * @throws std::out_of_range Range error processing packet (should be discarded)
 	 * @throws std::runtime_error Other error processing packet (should be discarded)

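The contract documented above (true means processing is complete, false means the caller should try again later, exceptions mean discard) suggests a caller loop along these lines; the method name tryDecode() and the surrounding variables are assumptions for illustration, not shown in this hunk:

	// Hypothetical caller; only the return/exception contract comes from the doc comment above.
	try {
		if (packet->tryDecode(RR)) {
			// Complete: the packet's state may no longer be valid, do not call again.
		} else {
			// Not yet decodable (e.g. a needed identity is missing): keep it queued and retry later.
		}
	} catch (std::exception &) {
		// std::out_of_range or std::runtime_error while processing: discard the packet.
	}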
+ 56 - 38
node/Multicaster.cpp

@@ -30,10 +30,14 @@
 #include "Constants.hpp"
 #include "Multicaster.hpp"
 #include "Topology.hpp"
+#include "Switch.hpp"
+#include "Packet.hpp"
+#include "RuntimeEnvironment.hpp"
 
 namespace ZeroTier {
 
-Multicaster::Multicaster()
+Multicaster::Multicaster() :
+	_limit(ZT_DEFAULT_MULTICAST_LIMIT)
 {
 }
 
@@ -41,48 +45,28 @@ Multicaster::~Multicaster()
 {
 }
 
-void Multicaster::add(const MulticastGroup &mg,const Address &learnedFrom,const Address &member)
-{
-}
-
-void Multicaster::erase(const MulticastGroup &mg,const Address &member)
+void Multicaster::send(const RuntimeEnvironment *RR,uint64_t nwid,unsigned int limit,uint64_t now,const MulticastGroup &mg,const MAC &src,unsigned int etherType,const void *data,unsigned int len)
 {
 	Mutex::Lock _l(_groups_m);
-	std::map< MulticastGroup,MulticastGroupStatus >::iterator r(_groups.find(mg));
-	if (r != _groups.end()) {
-		for(std::vector<MulticastGroupMember>::iterator m(r->second.members.begin());m!=r->second.members.end();++m) {
-			if (m->address == member) {
-				r->second.members.erase(m);
-				if (r->second.members.empty())
-					_groups.erase(r);
-				return;
-			}
-		}
-	}
-}
+	MulticastGroupStatus &gs = _groups[mg];
 
-void send(uint64_t nwid,uint64_t now,const Address &self,const MulticastGroup &mg,const MAC &from,unsigned int etherType,const void *data,unsigned int len)
-{
-	Mutex::Lock _l(_groups_m);
-	std::map< MulticastGroup,MulticastGroupStatus >::iterator r(_groups.find(mg));
-}
+	if (gs.members.size() >= limit) {
+		// If we already have enough members, just send and we're done -- no need for TX queue
+		OutboundMulticast out;
 
-unsigned int Multicaster::shouldGather(const MulticastGroup &mg,uint64_t now,unsigned int limit,bool updateLastGatheredTimeOnNonzeroReturn)
-{
-	Mutex::Lock _l(_groups_m);
-	MulticastGroupStatus &gs = _groups[mg];
-	if ((unsigned int)gs.members.size() >= limit) {
-		// We already caught our limit, don't need to go fishing any more.
-		return 0;
+		out.init(now,RR->identity.address(),nwid,src,mg,etherType,data,len);
+		for(std::vector<MulticastGroupMember>::const_reverse_iterator m(gs.members.rbegin());m!=gs.members.rend();++m)
+			out.sendOnly(*(RR->sw),m->address);
 	} else {
-		// Compute the delay between fishing expeditions from the fraction of the limit that we already have.
-		const uint64_t rateDelay = (uint64_t)ZT_MULTICAST_TOPOLOGY_GATHER_DELAY_MIN + (uint64_t)(((double)gs.members.size() / (double)limit) * (double)(ZT_MULTICAST_TOPOLOGY_GATHER_DELAY_MAX - ZT_MULTICAST_TOPOLOGY_GATHER_DELAY_MIN));
+		// If we don't already have enough members, send to the ones we have and then gather (if allowed within gather rate limit delay)
+		gs.txQueue.push_back(OutboundMulticast());
+		OutboundMulticast &out = gs.txQueue.back();
+
+		out.init(now,RR->identity.address(),nwid,src,mg,etherType,data,len);
+		for(std::vector<MulticastGroupMember>::const_reverse_iterator m(gs.members.rbegin());m!=gs.members.rend();++m)
+			out.sendAndLog(*(RR->sw),m->address);
+
 
-		if ((now - gs.lastGatheredMembers) >= rateDelay) {
-			if (updateLastGatheredTimeOnNonzeroReturn)
-				gs.lastGatheredMembers = now;
-			return (limit - (unsigned int)gs.members.size());
-		} else return 0;
 	}
 }
 
@@ -90,6 +74,16 @@ void Multicaster::clean(uint64_t now,const Topology &topology)
 {
 	Mutex::Lock _l(_groups_m);
 	for(std::map< MulticastGroup,MulticastGroupStatus >::iterator mm(_groups.begin());mm!=_groups.end();) {
+		// Remove expired outgoing multicasts from multicast TX queue
+		for(std::list<OutboundMulticast>::iterator tx(mm->second.txQueue.begin());tx!=mm->second.txQueue.end();) {
+			if (tx->expired(now))
+				mm->second.txQueue.erase(tx++);
+			else ++tx;
+		}
+
+		// Remove expired members from membership list, and update rank
+		// so that remaining members can be sorted in ascending order of
+		// transmit priority.
 		std::vector<MulticastGroupMember>::iterator reader(mm->second.members.begin());
 		std::vector<MulticastGroupMember>::iterator writer(mm->second.members.begin());
 		unsigned int count = 0;
@@ -122,10 +116,34 @@ void Multicaster::clean(uint64_t now,const Topology &topology)
 		}
 
 		if (count) {
+			// There are remaining members, so re-sort them by rank and resize the vector
 			std::sort(mm->second.members.begin(),writer); // sorts in ascending order of rank
 			mm->second.members.resize(count); // trim off the ones we cut, after writer
 			++mm;
-		} else _groups.erase(mm++);
+		} else if (mm->second.txQueue.empty()) {
+			// There are no remaining members and no pending multicasts, so erase the entry
+			_groups.erase(mm++);
+		} else ++mm;
+	}
+}
+
+void Multicaster::_add(const RuntimeEnvironment *RR,const MulticastGroup &mg,const Address &learnedFrom,const Address &member)
+{
+	// assumes _groups_m is locked
+}
+
+unsigned int Multicaster::_want(const MulticastGroup &mg,MulticastGroupStatus &gs,uint64_t now,unsigned int limit)
+{
+	if (gs.members.size() >= limit) {
+		// We already caught our limit, don't need to go fishing any more.
+		return 0;
+	} else {
+		// Compute the delay between fishing expeditions from the fraction of the limit that we already have.
+		const uint64_t rateDelay = (uint64_t)ZT_MULTICAST_TOPOLOGY_GATHER_DELAY_MIN + (uint64_t)(((double)gs.members.size() / (double)limit) * (double)(ZT_MULTICAST_TOPOLOGY_GATHER_DELAY_MAX - ZT_MULTICAST_TOPOLOGY_GATHER_DELAY_MIN));
+		if ((now - gs.lastGatheredMembers) >= rateDelay) {
+			gs.lastGatheredMembers = now;
+			return (limit - (unsigned int)gs.members.size());
+		} else return 0;
 	}
 }
 

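The new private _want() keeps the rate limiting that shouldGather() used to do: the fuller the member list, the longer the wait before gathering again. A minimal standalone sketch of that linear interpolation, using made-up constants (10 s minimum, 30 s maximum) purely for illustration:

	#include <cstdint>

	// Hypothetical stand-ins for ZT_MULTICAST_TOPOLOGY_GATHER_DELAY_MIN/MAX.
	static const uint64_t GATHER_DELAY_MIN = 10000; // ms
	static const uint64_t GATHER_DELAY_MAX = 30000; // ms

	// Same interpolation as Multicaster::_want(): the delay grows with the
	// fraction of the member limit we already hold.
	static uint64_t gatherRateDelay(unsigned int have,unsigned int limit)
	{
		return GATHER_DELAY_MIN + (uint64_t)(((double)have / (double)limit) * (double)(GATHER_DELAY_MAX - GATHER_DELAY_MIN));
	}

	// have = 0        -> 10000 ms between gathers
	// have = limit/2  -> 20000 ms
	// have = limit    -> _want() returns 0 and stops gathering altogether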
+ 17 - 52
node/Multicaster.hpp

@@ -40,13 +40,12 @@
 #include "MAC.hpp"
 #include "MulticastGroup.hpp"
 #include "OutboundMulticast.hpp"
-#include "Switch.hpp"
 #include "Utils.hpp"
 #include "Mutex.hpp"
 
 namespace ZeroTier {
 
-class Topology;
+class RuntimeEnvironment;
 
 /**
  * Database of known multicast peers within a network
@@ -73,8 +72,8 @@ private:
 		MulticastGroupStatus() : lastGatheredMembers(0) {}
 
 		uint64_t lastGatheredMembers; // time we last gathered members
-		std::vector<MulticastGroupMember> members; // members of this group
 		std::list<OutboundMulticast> txQueue; // pending outbound multicasts
+		std::vector<MulticastGroupMember> members; // members of this group
 	};
 
 public:
@@ -82,80 +81,46 @@ public:
 	~Multicaster();
 
 	/**
-	 * Add or update a member in a multicast group
+	 * Add or update a member in a multicast group and send any pending multicasts
 	 *
+	 * @param RR Runtime environment
 	 * @param mg Multicast group
 	 * @param learnedFrom Address from which we learned this member or NULL/0 Address if direct
 	 * @param member New member address
 	 */
-	void add(const MulticastGroup &mg,const Address &learnedFrom,const Address &member);
-
-	/**
-	 * Erase a member from a multicast group (if present)
-	 *
-	 * @param mg Multicast group
-	 * @param member Member to erase
-	 */
-	void erase(const MulticastGroup &mg,const Address &member);
+	inline void add(const RuntimeEnvironment *RR,const MulticastGroup &mg,const Address &learnedFrom,const Address &member)
+	{
+		Mutex::Lock _l(_groups_m);
+		_add(RR,mg,learnedFrom,member);
+	}
 
 	/**
 	 * Send a multicast
 	 *
+	 * @param RR Runtime environment
 	 * @param nwid Network ID
+	 * @param limit Multicast limit
 	 * @param now Current time
-	 * @param sw Switch to use for sending packets
-	 * @param self This node's address
 	 * @param mg Multicast group
 	 * @param from Source Ethernet MAC address
 	 * @param etherType Ethernet frame type
 	 * @param data Packet data
 	 * @param len Length of packet data
 	 */
-	void send(uint64_t nwid,uint64_t now,const Switch &sw,const Address &self,const MulticastGroup &mg,const MAC &from,unsigned int etherType,const void *data,unsigned int len);
-
-	/**
-	 * @param mg Multicast group
-	 * @return Tuple of: time we last gathered members (or 0 for never) and number of known members
-	 */
-	inline std::pair<uint64_t,unsigned int> groupStatus(const MulticastGroup &mg) const
-	{
-		Mutex::Lock _l(_groups_m);
-		std::map< MulticastGroup,MulticastGroupStatus >::const_iterator r(_groups.find(mg));
-		return ((r != _groups.end()) ? std::pair<uint64_t,unsigned int>(r->second.lastGatheredMembers,r->second.members.size()) : std::pair<uint64_t,unsigned int>(0,0));
-	}
-
-	/**
-	 * Return the number of new members we should want to gather or 0 for none
-	 *
-	 * @param mg Multicast group
-	 * @param now Current time
-	 * @param limit The maximum number we want per multicast group on this network
-	 * @param updateLastGatheredTimeOnNonzeroReturn If true, reset group's last gathered time to 'now' on non-zero return
-	 */
-	unsigned int shouldGather(const MulticastGroup &mg,uint64_t now,unsigned int limit,bool updateLastGatheredTimeOnNonzeroReturn);
-
-	/**
-	 * Update last gathered members time for a group
-	 *
-	 * @param mg Multicast group
-	 * @param now Current time
-	 */
-	inline void gatheringMembersNow(const MulticastGroup &mg,uint64_t now)
-	{
-		Mutex::Lock _l(_groups_m);
-		_groups[mg].lastGatheredMembers = now;
-	}
+	void send(const RuntimeEnvironment *RR,uint64_t nwid,unsigned int limit,uint64_t now,const MulticastGroup &mg,const MAC &src,unsigned int etherType,const void *data,unsigned int len);
 
 	/**
 	 * Clean up and resort database
 	 *
+	 * @param RR Runtime environment
 	 * @param now Current time
-	 * @param topology Global peer topology
-	 * @param trim Trim lists to a maximum of this many members per multicast group
 	 */
-	void clean(uint64_t now,const Topology &topology);
+	void clean(const RuntimeEnvironment *RR,uint64_t now);
 
 private:
+	void _add(const RuntimeEnvironment *RR,const MulticastGroup &mg,const Address &learnedFrom,const Address &member);
+	unsigned int _want(const MulticastGroup &mg,MulticastGroupStatus &gs,uint64_t now,unsigned int limit);
+
 	std::map< MulticastGroup,MulticastGroupStatus > _groups;
 	Mutex _groups_m;
 };

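With the Switch reference and self address gone from the signature, a caller hands send() the runtime environment and the per-network member limit and lets the Multicaster handle locking, the TX queue, and gathering. A rough caller-side sketch; apart from Multicaster::send() itself, every name here (RR->mc, Utils::now(), the limit and frame variables) is an assumption for illustration, not taken from this commit:

	// Hypothetical caller in the outbound frame path; names are assumed.
	unsigned int limit = ZT_DEFAULT_MULTICAST_LIMIT; // or a per-network configured limit
	RR->mc->send(RR,nwid,limit,Utils::now(),mg,srcMac,etherType,data,len); // assumes RR exposes the Multicaster as 'mc'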
+ 0 - 6
node/Network.hpp

@@ -413,12 +413,6 @@ public:
 	 */
 	void setEnabled(bool enabled);
 
-	/**
-	 * @return Multicast topology for this network
-	 */
-	inline Multicaster &mc() { return _multicaster; }
-	inline const Multicaster &mc() const { return _multicaster; }
-
 	/**
 	 * Destroy this network
 	 *

+ 26 - 5
node/OutboundMulticast.hpp

@@ -102,7 +102,30 @@ public:
 	/**
 	 * @return Number of unique recipients to which this packet has already been sent
 	 */
-	inline unsigned int sendCount() const throw() { return (unsigned int)_alreadySentTo.size(); }
+	inline unsigned int sentToCount() const throw() { return (unsigned int)_alreadySentTo.size(); }
+
+	/**
+	 * Just send without checking log
+	 *
+	 * @param sw Switch instance to send packets
+	 * @param toAddr Destination address
+	 */
+	inline void sendOnly(Switch &sw,const Address &toAddr)
+	{
+		sw.send(Packet(_packet,toAddr),true);
+	}
+
+	/**
+	 * Just send and log but do not check sent log
+	 *
+	 * @param sw Switch instance to send packets
+	 * @param toAddr Destination address
+	 */
+	inline void sendAndLog(Switch &sw,const Address &toAddr)
+	{
+		_alreadySentTo.push_back(toAddr);
+		sendOnly(sw,toAddr);
+	}
 
 	/**
 	 * Try to send this to a given peer if it hasn't been sent to them already
@@ -111,15 +134,13 @@ public:
 	 * @param toAddr Destination address
 	 * @return True if address is new and packet was sent to switch, false if duplicate
 	 */
-	inline bool send(Switch &sw,const Address &toAddr)
+	inline bool sendIfNew(Switch &sw,const Address &toAddr)
 	{
-		// If things get really big, we can go to a sorted vector or a flat_set implementation
 		for(std::vector<Address>::iterator a(_alreadySentTo.begin());a!=_alreadySentTo.end();++a) {
 			if (*a == toAddr)
 				return false;
 		}
-		_alreadySentTo.push_back(toAddr);
-		sw.send(Packet(_packet,toAddr),true);
+		sendAndLog(sw,toAddr);
 		return true;
 	}
 

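The three senders layer on one another: sendIfNew() scans _alreadySentTo and, only for a new address, falls through to sendAndLog(), which records the address and then calls sendOnly(). A small hypothetical usage sketch; sw, addr and the init() arguments are assumed to be in scope:

	OutboundMulticast out;
	// out.init(...) as in Multicaster::send(); arguments omitted here.
	out.sendIfNew(sw,addr);  // new address: logged in _alreadySentTo and sent, returns true
	out.sendIfNew(sw,addr);  // duplicate: returns false, nothing sent
	out.sendOnly(sw,addr);   // bypasses the log entirely and always sends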
+ 3 - 3
node/Peer.hpp

@@ -107,7 +107,7 @@ public:
 	 * This is called by the decode pipe when a packet is proven to be authentic
 	 * and appears to be valid.
 	 * 
-	 * @param _r Runtime environment
+	 * @param RR Runtime environment
 	 * @param fromSock Socket from which packet was received
 	 * @param remoteAddr Internet address of sender
 	 * @param hops ZeroTier (not IP) hops
@@ -134,7 +134,7 @@ public:
 	 * This sends only via direct paths if available and does not handle
 	 * finding of relays. That is done in the send logic in Switch.
 	 *
-	 * @param _r Runtime environment
+	 * @param RR Runtime environment
 	 * @param data Data to send
 	 * @param len Length of packet
 	 * @param now Current time
@@ -148,7 +148,7 @@ public:
 	 * This begins attempting to use TCP paths if no ping response has been
 	 * received from any UDP path in more than ZT_TCP_FALLBACK_AFTER.
 	 * 
-	 * @param _r Runtime environment
+	 * @param RR Runtime environment
 	 * @param now Current time
 	 * @return True if send appears successful for at least one address type
 	 */