2
0
Эх сурвалжийг харах

Some more multicast algo work...

Adam Ierymenko 11 жил өмнө
parent
commit
431476e2e4

+ 7 - 12
node/MulticastTopology.cpp

@@ -41,19 +41,8 @@ MulticastTopology::~MulticastTopology()
 {
 }
 
-void MulticastTopology::add(const MulticastGroup &mg,const Address &member,const Address &learnedFrom)
+void MulticastTopology::add(const MulticastGroup &mg,const Address &learnedFrom,const Address &member)
 {
-	Mutex::Lock _l(_groups_m);
-	std::vector<MulticastGroupMember> &mv = _groups[mg].members;
-	for(std::vector<MulticastGroupMember>::iterator m(mv.begin());m!=mv.end();++m) {
-		if (m->address == member) {
-			if (m->learnedFrom) // once a member has been seen directly, we keep its status as direct
-				m->learnedFrom = learnedFrom;
-			m->timestamp = Utils::now();
-			return;
-		}
-	}
-	mv.push_back(MulticastGroupMember(member,learnedFrom,Utils::now()));
 }
 
 void MulticastTopology::erase(const MulticastGroup &mg,const Address &member)
@@ -72,6 +61,12 @@ void MulticastTopology::erase(const MulticastGroup &mg,const Address &member)
 	}
 }
 
+void send(uint64_t nwid,uint64_t now,const Address &self,const MulticastGroup &mg,const MAC &from,unsigned int etherType,const void *data,unsigned int len)
+{
+	Mutex::Lock _l(_groups_m);
+	std::map< MulticastGroup,MulticastGroupStatus >::iterator r(_groups.find(mg));
+}
+
 unsigned int MulticastTopology::shouldGather(const MulticastGroup &mg,uint64_t now,unsigned int limit,bool updateLastGatheredTimeOnNonzeroReturn)
 {
 	Mutex::Lock _l(_groups_m);

+ 22 - 2
node/MulticastTopology.hpp

@@ -33,10 +33,14 @@
 
 #include <map>
 #include <vector>
+#include <list>
 
 #include "Constants.hpp"
 #include "Address.hpp"
+#include "MAC.hpp"
 #include "MulticastGroup.hpp"
+#include "OutboundMulticast.hpp"
+#include "Switch.hpp"
 #include "Utils.hpp"
 #include "Mutex.hpp"
 
@@ -70,6 +74,7 @@ private:
 
 		uint64_t lastGatheredMembers; // time we last gathered members
 		std::vector<MulticastGroupMember> members; // members of this group
+		std::list<OutboundMulticast> txQueue; // pending outbound multicasts
 	};
 
 public:
@@ -80,10 +85,10 @@ public:
 	 * Add or update a member in a multicast group
 	 *
 	 * @param mg Multicast group
-	 * @param member Member to add/update
 	 * @param learnedFrom Address from which we learned this member or NULL/0 Address if direct
+	 * @param member New member address
 	 */
-	void add(const MulticastGroup &mg,const Address &member,const Address &learnedFrom);
+	void add(const MulticastGroup &mg,const Address &learnedFrom,const Address &member);
 
 	/**
 	 * Erase a member from a multicast group (if present)
@@ -93,6 +98,21 @@ public:
 	 */
 	void erase(const MulticastGroup &mg,const Address &member);
 
+	/**
+	 * Send a multicast
+	 *
+	 * @param nwid Network ID
+	 * @param now Current time
+	 * @param sw Switch to use for sending packets
+	 * @param self This node's address
+	 * @param mg Multicast group
+	 * @param from Source Ethernet MAC address
+	 * @param etherType Ethernet frame type
+	 * @param data Packet data
+	 * @param len Length of packet data
+	 */
+	void send(uint64_t nwid,uint64_t now,const Switch &sw,const Address &self,const MulticastGroup &mg,const MAC &from,unsigned int etherType,const void *data,unsigned int len);
+
 	/**
 	 * @param mg Multicast group
 	 * @return Tuple of: time we last gathered members (or 0 for never) and number of known members

+ 13 - 6
node/OutboundMulticast.hpp

@@ -50,6 +50,13 @@ namespace ZeroTier {
 class OutboundMulticast
 {
 public:
+	/**
+	 * Create an uninitialized outbound multicast
+	 *
+	 * It must be initialized with init().
+	 */
+	OutboundMulticast() {}
+
 	/**
 	 * Initialize outbound multicast
 	 *
@@ -63,13 +70,13 @@ public:
 	 * @param len Length of data
 	 * @throws std::out_of_range Data too large to fit in a MULTICAST_FRAME
 	 */
-	OutboundMulticast(uint64_t timestamp,const Address &self,uint64_t nwid,const MAC &src,const MulticastGroup &dest,unsigned int etherType,const void *payload,unsigned int len) :
-		_timestamp(timestamp),
-		_nwid(nwid),
-		_source(src),
-		_destination(dest),
-		_etherType(etherType)
+	inline void init(uint64_t timestamp,const Address &self,uint64_t nwid,const MAC &src,const MulticastGroup &dest,unsigned int etherType,const void *payload,unsigned int len)
 	{
+		_timestamp = timestamp;
+		_nwid = nwid;
+		_source = src;
+		_destination = dest;
+		_etherType = etherType;
 		_packet.setSource(self);
 		_packet.setVerb(Packet::VERB_MULTICAST_FRAME);
 		_packet.append((char)0);

+ 1 - 0
node/Packet.hpp

@@ -694,6 +694,7 @@ public:
 		VERB_MULTICAST_GATHER = 13,
 
 		/* Multicast frame:
+		 *   <[8] 64-bit network ID>
 		 *   <[1] flags (currently unused, must be 0)>
 		 *   <[4] 32-bit multicast ADI (note that this is out of order here -- it precedes MAC)>
 		 *   <[6] destination MAC or all zero for destination node>

+ 91 - 92
node/Switch.cpp

@@ -56,8 +56,7 @@ namespace ZeroTier {
 
 Switch::Switch(const RuntimeEnvironment *renv) :
 	_r(renv),
-	_lastBeacon(0),
-	_multicastIdCounter((unsigned int)renv->prng->next32()) // start a random spot to minimize possible collisions on startup
+	_lastBeacon(0)
 {
 }
 
@@ -478,96 +477,6 @@ void Switch::contact(const SharedPtr<Peer> &peer,const InetAddress &atAddr)
 	_r->sm->whack();
 }
 
-unsigned long Switch::doTimerTasks()
-{
-	unsigned long nextDelay = ~((unsigned long)0); // big number, caller will cap return value
-	uint64_t now = Utils::now();
-
-	{
-		Mutex::Lock _l(_contactQueue_m);
-		for(std::list<ContactQueueEntry>::iterator qi(_contactQueue.begin());qi!=_contactQueue.end();) {
-			if (now >= qi->fireAtTime) {
-				if (!qi->peer->hasActiveDirectPath(now)) {
-					TRACE("deploying aggressive NAT-t against %s(%s)",qi->peer->address().toString().c_str(),qi->inaddr.toString().c_str());
-
-					/* Shotgun approach -- literally -- against symmetric NATs. Most of these
-					 * either increment or decrement ports so this gets a good number. Also try
-					 * the original port one more time for good measure, since sometimes it
-					 * fails first time around. */
-					int p = (int)qi->inaddr.port() - 2;
-					for(int k=0;k<5;++k) {
-						if ((p > 0)&&(p <= 0xffff)) {
-							qi->inaddr.setPort((unsigned int)p);
-							sendHELLO(qi->peer,qi->inaddr);
-						}
-						++p;
-					}
-				}
-
-				_contactQueue.erase(qi++);
-			} else {
-				nextDelay = std::min(nextDelay,(unsigned long)(qi->fireAtTime - now));
-				++qi;
-			}
-		}
-	}
-
-	{
-		Mutex::Lock _l(_outstandingWhoisRequests_m);
-		for(std::map< Address,WhoisRequest >::iterator i(_outstandingWhoisRequests.begin());i!=_outstandingWhoisRequests.end();) {
-			unsigned long since = (unsigned long)(now - i->second.lastSent);
-			if (since >= ZT_WHOIS_RETRY_DELAY) {
-				if (i->second.retries >= ZT_MAX_WHOIS_RETRIES) {
-					TRACE("WHOIS %s timed out",i->first.toString().c_str());
-					_outstandingWhoisRequests.erase(i++);
-					continue;
-				} else {
-					i->second.lastSent = now;
-					i->second.peersConsulted[i->second.retries] = _sendWhoisRequest(i->first,i->second.peersConsulted,i->second.retries);
-					++i->second.retries;
-					TRACE("WHOIS %s (retry %u)",i->first.toString().c_str(),i->second.retries);
-					nextDelay = std::min(nextDelay,(unsigned long)ZT_WHOIS_RETRY_DELAY);
-				}
-			} else nextDelay = std::min(nextDelay,ZT_WHOIS_RETRY_DELAY - since);
-			++i;
-		}
-	}
-
-	{
-		Mutex::Lock _l(_txQueue_m);
-		for(std::multimap< Address,TXQueueEntry >::iterator i(_txQueue.begin());i!=_txQueue.end();) {
-			if (_trySend(i->second.packet,i->second.encrypt))
-				_txQueue.erase(i++);
-			else if ((now - i->second.creationTime) > ZT_TRANSMIT_QUEUE_TIMEOUT) {
-				TRACE("TX %s -> %s timed out",i->second.packet.source().toString().c_str(),i->second.packet.destination().toString().c_str());
-				_txQueue.erase(i++);
-			} else ++i;
-		}
-	}
-
-	{
-		Mutex::Lock _l(_rxQueue_m);
-		for(std::list< SharedPtr<IncomingPacket> >::iterator i(_rxQueue.begin());i!=_rxQueue.end();) {
-			if ((now - (*i)->receiveTime()) > ZT_RECEIVE_QUEUE_TIMEOUT) {
-				TRACE("RX %s -> %s timed out",(*i)->source().toString().c_str(),(*i)->destination().toString().c_str());
-				_rxQueue.erase(i++);
-			} else ++i;
-		}
-	}
-
-	{
-		Mutex::Lock _l(_defragQueue_m);
-		for(std::map< uint64_t,DefragQueueEntry >::iterator i(_defragQueue.begin());i!=_defragQueue.end();) {
-			if ((now - i->second.creationTime) > ZT_FRAGMENTED_PACKET_RECEIVE_TIMEOUT) {
-				TRACE("incomplete fragmented packet %.16llx timed out, fragments discarded",i->first);
-				_defragQueue.erase(i++);
-			} else ++i;
-		}
-	}
-
-	return std::max(nextDelay,(unsigned long)10); // minimum delay
-}
-
 void Switch::announceMulticastGroups(const std::map< SharedPtr<Network>,std::set<MulticastGroup> > &allMemberships)
 {
 	std::vector< SharedPtr<Peer> > directPeers;
@@ -682,6 +591,96 @@ void Switch::doAnythingWaitingForPeer(const SharedPtr<Peer> &peer)
 	}
 }
 
+unsigned long Switch::doTimerTasks()
+{
+	unsigned long nextDelay = ~((unsigned long)0); // big number, caller will cap return value
+	uint64_t now = Utils::now();
+
+	{
+		Mutex::Lock _l(_contactQueue_m);
+		for(std::list<ContactQueueEntry>::iterator qi(_contactQueue.begin());qi!=_contactQueue.end();) {
+			if (now >= qi->fireAtTime) {
+				if (!qi->peer->hasActiveDirectPath(now)) {
+					TRACE("deploying aggressive NAT-t against %s(%s)",qi->peer->address().toString().c_str(),qi->inaddr.toString().c_str());
+
+					/* Shotgun approach -- literally -- against symmetric NATs. Most of these
+					 * either increment or decrement ports so this gets a good number. Also try
+					 * the original port one more time for good measure, since sometimes it
+					 * fails first time around. */
+					int p = (int)qi->inaddr.port() - 2;
+					for(int k=0;k<5;++k) {
+						if ((p > 0)&&(p <= 0xffff)) {
+							qi->inaddr.setPort((unsigned int)p);
+							sendHELLO(qi->peer,qi->inaddr);
+						}
+						++p;
+					}
+				}
+
+				_contactQueue.erase(qi++);
+			} else {
+				nextDelay = std::min(nextDelay,(unsigned long)(qi->fireAtTime - now));
+				++qi;
+			}
+		}
+	}
+
+	{
+		Mutex::Lock _l(_outstandingWhoisRequests_m);
+		for(std::map< Address,WhoisRequest >::iterator i(_outstandingWhoisRequests.begin());i!=_outstandingWhoisRequests.end();) {
+			unsigned long since = (unsigned long)(now - i->second.lastSent);
+			if (since >= ZT_WHOIS_RETRY_DELAY) {
+				if (i->second.retries >= ZT_MAX_WHOIS_RETRIES) {
+					TRACE("WHOIS %s timed out",i->first.toString().c_str());
+					_outstandingWhoisRequests.erase(i++);
+					continue;
+				} else {
+					i->second.lastSent = now;
+					i->second.peersConsulted[i->second.retries] = _sendWhoisRequest(i->first,i->second.peersConsulted,i->second.retries);
+					++i->second.retries;
+					TRACE("WHOIS %s (retry %u)",i->first.toString().c_str(),i->second.retries);
+					nextDelay = std::min(nextDelay,(unsigned long)ZT_WHOIS_RETRY_DELAY);
+				}
+			} else nextDelay = std::min(nextDelay,ZT_WHOIS_RETRY_DELAY - since);
+			++i;
+		}
+	}
+
+	{
+		Mutex::Lock _l(_txQueue_m);
+		for(std::multimap< Address,TXQueueEntry >::iterator i(_txQueue.begin());i!=_txQueue.end();) {
+			if (_trySend(i->second.packet,i->second.encrypt))
+				_txQueue.erase(i++);
+			else if ((now - i->second.creationTime) > ZT_TRANSMIT_QUEUE_TIMEOUT) {
+				TRACE("TX %s -> %s timed out",i->second.packet.source().toString().c_str(),i->second.packet.destination().toString().c_str());
+				_txQueue.erase(i++);
+			} else ++i;
+		}
+	}
+
+	{
+		Mutex::Lock _l(_rxQueue_m);
+		for(std::list< SharedPtr<IncomingPacket> >::iterator i(_rxQueue.begin());i!=_rxQueue.end();) {
+			if ((now - (*i)->receiveTime()) > ZT_RECEIVE_QUEUE_TIMEOUT) {
+				TRACE("RX %s -> %s timed out",(*i)->source().toString().c_str(),(*i)->destination().toString().c_str());
+				_rxQueue.erase(i++);
+			} else ++i;
+		}
+	}
+
+	{
+		Mutex::Lock _l(_defragQueue_m);
+		for(std::map< uint64_t,DefragQueueEntry >::iterator i(_defragQueue.begin());i!=_defragQueue.end();) {
+			if ((now - i->second.creationTime) > ZT_FRAGMENTED_PACKET_RECEIVE_TIMEOUT) {
+				TRACE("incomplete fragmented packet %.16llx timed out, fragments discarded",i->first);
+				_defragQueue.erase(i++);
+			} else ++i;
+		}
+	}
+
+	return std::max(nextDelay,(unsigned long)10); // minimum delay
+}
+
 const char *Switch::etherTypeName(const unsigned int etherType)
 	throw()
 {

+ 31 - 21
node/Switch.hpp

@@ -67,6 +67,11 @@ class Peer;
 
 /**
  * Core of the distributed Ethernet switch and protocol implementation
+ *
+ * This class is perhaps a bit misnamed, but it's basically where everything
+ * meets. Transport-layer ZT packets come in here, as do virtual network
+ * packets from tap devices, and this sends them where they need to go and
+ * wraps/unwraps accordingly. It also handles queues and timeouts and such.
  */
 class Switch : NonCopyable
 {
@@ -160,13 +165,6 @@ public:
 	 */
 	void contact(const SharedPtr<Peer> &peer,const InetAddress &atAddr);
 
-	/**
-	 * Perform retries and other periodic timer tasks
-	 * 
-	 * @return Number of milliseconds until doTimerTasks() should be run again
-	 */
-	unsigned long doTimerTasks();
-
 	/**
 	 * Announce multicast group memberships
 	 *
@@ -204,7 +202,7 @@ public:
 	void cancelWhoisRequest(const Address &addr);
 
 	/**
-	 * Run any processes that are waiting for this peer
+	 * Run any processes that are waiting for this peer's identity
 	 *
 	 * Called when we learn of a peer's identity from HELLO, OK(WHOIS), etc.
 	 *
@@ -212,6 +210,13 @@ public:
 	 */
 	void doAnythingWaitingForPeer(const SharedPtr<Peer> &peer);
 
+	/**
+	 * Perform retries and other periodic timer tasks
+	 * 
+	 * @return Number of milliseconds until doTimerTasks() should be run again
+	 */
+	unsigned long doTimerTasks();
+
 	/**
 	 * @param etherType Ethernet type ID
 	 * @return Human-readable name
@@ -235,8 +240,8 @@ private:
 
 	const RuntimeEnvironment *const _r;
 	volatile uint64_t _lastBeacon;
-	volatile unsigned int _multicastIdCounter;
 
+	// Outsanding WHOIS requests and how many retries they've undergone
 	struct WhoisRequest
 	{
 		uint64_t lastSent;
@@ -246,9 +251,23 @@ private:
 	std::map< Address,WhoisRequest > _outstandingWhoisRequests;
 	Mutex _outstandingWhoisRequests_m;
 
-	std::list< SharedPtr<IncomingPacket> > _rxQueue;
+	// Packet defragmentation queue -- comes before RX queue in path
+	struct DefragQueueEntry
+	{
+		uint64_t creationTime;
+		SharedPtr<IncomingPacket> frag0;
+		Packet::Fragment frags[ZT_MAX_PACKET_FRAGMENTS - 1];
+		unsigned int totalFragments; // 0 if only frag0 received, waiting for frags
+		uint32_t haveFragments; // bit mask, LSB to MSB
+	};
+	std::map< uint64_t,DefragQueueEntry > _defragQueue;
+	Mutex _defragQueue_m;
+
+	// ZeroTier-layer RX queue of incoming packets in the process of being decoded
+	std::vector< SharedPtr<IncomingPacket> > _rxQueue;
 	Mutex _rxQueue_m;
 
+	// ZeroTier-layer TX queue by destination ZeroTier address
 	struct TXQueueEntry
 	{
 		TXQueueEntry() {}
@@ -264,20 +283,11 @@ private:
 	std::multimap< Address,TXQueueEntry > _txQueue;
 	Mutex _txQueue_m;
 
-	struct DefragQueueEntry
-	{
-		uint64_t creationTime;
-		SharedPtr<IncomingPacket> frag0;
-		Packet::Fragment frags[ZT_MAX_PACKET_FRAGMENTS - 1];
-		unsigned int totalFragments; // 0 if only frag0 received, waiting for frags
-		uint32_t haveFragments; // bit mask, LSB to MSB
-	};
-	std::map< uint64_t,DefragQueueEntry > _defragQueue;
-	Mutex _defragQueue_m;
-
+	// Tracks sending of VERB_RENDEZVOUS to relaying peers
 	std::map< Array< Address,2 >,uint64_t > _lastUniteAttempt; // key is always sorted in ascending order, for set-like behavior
 	Mutex _lastUniteAttempt_m;
 
+	// Active attempts to contact remote peers, including state of multi-phase NAT traversal
 	struct ContactQueueEntry
 	{
 		ContactQueueEntry() {}