Browse Source

More work in progress in new multicast propagation...

Adam Ierymenko 12 years ago
parent
commit
24bad9f3d1
9 changed files with 228 additions and 164 deletions
  1. 22 0
      node/Address.hpp
  2. 5 3
      node/Multicaster.cpp
  3. 12 8
      node/Multicaster.hpp
  4. 2 0
      node/Network.cpp
  5. 36 0
      node/Network.hpp
  6. 3 2
      node/Node.cpp
  7. 8 8
      node/Packet.hpp
  8. 137 143
      node/PacketDecoder.cpp
  9. 3 0
      node/RuntimeEnvironment.hpp

+ 22 - 0
node/Address.hpp

@@ -194,6 +194,28 @@ public:
 		return m;
 		return m;
 	}
 	}
 
 
+	/**
+	 * @param mac MAC address to check
+	 * @return True if this address would have this MAC
+	 */
+	inline bool wouldHaveMac(const MAC &mac) const
+		throw()
+	{
+		if (mac.data[0] != ZT_MAC_FIRST_OCTET)
+			return false;
+		if (mac.data[1] != (unsigned char)((_a >> 32) & 0xff))
+			return false;
+		if (mac.data[2] != (unsigned char)((_a >> 24) & 0xff))
+			return false;
+		if (mac.data[3] != (unsigned char)((_a >> 16) & 0xff))
+			return false;
+		if (mac.data[4] != (unsigned char)((_a >> 8) & 0xff))
+			return false;
+		if (mac.data[5] != (unsigned char)(_a & 0xff))
+			return false;
+		return true;
+	}
+
 	/**
 	/**
 	 * @return Hexadecimal string
 	 * @return Hexadecimal string
 	 */
 	 */

+ 5 - 3
node/Multicaster.cpp

@@ -76,19 +76,21 @@ void Multicaster::bringCloser(uint64_t nwid,const Address &a)
 	}
 	}
 }
 }
 
 
-void Multicaster::got(uint64_t nwid,const Address &peer,uint64_t mcGuid,uint64_t now)
+void Multicaster::got(uint64_t nwid,const Address &peer,uint64_t mcGuid)
 {
 {
 	Mutex::Lock _l(_lock);
 	Mutex::Lock _l(_lock);
 	_NetInfo &n = _nets[nwid];
 	_NetInfo &n = _nets[nwid];
 	std::pair< uint64_t,std::set<Address> > &g = n.got[mcGuid];
 	std::pair< uint64_t,std::set<Address> > &g = n.got[mcGuid];
-	g.first = now;
+	g.first = Utils::now();
 	g.second.insert(peer);
 	g.second.insert(peer);
 }
 }
 
 
-void Multicaster::clean(uint64_t now)
+void Multicaster::clean()
 {
 {
 	Mutex::Lock _l(_lock);
 	Mutex::Lock _l(_lock);
 
 
+	uint64_t now = Utils::now();
+
 	for(std::map< uint64_t,_NetInfo >::iterator n(_nets.begin());n!=_nets.end();) {
 	for(std::map< uint64_t,_NetInfo >::iterator n(_nets.begin());n!=_nets.end();) {
 		for(std::map< uint64_t,std::pair< uint64_t,std::set<Address> > >::iterator g(n->second.got.begin());g!=n->second.got.end();) {
 		for(std::map< uint64_t,std::pair< uint64_t,std::set<Address> > >::iterator g(n->second.got.begin());g!=n->second.got.end();) {
 			if ((now - g->second.first) > ZT_MULTICAST_MAGNET_STATE_EXPIRE)
 			if ((now - g->second.first) > ZT_MULTICAST_MAGNET_STATE_EXPIRE)

+ 12 - 8
node/Multicaster.hpp

@@ -80,14 +80,13 @@ public:
 	 * @param nwid Network ID
 	 * @param nwid Network ID
 	 * @param mcGuid Multicast GUID
 	 * @param mcGuid Multicast GUID
 	 * @param peer Peer that GOT multicast
 	 * @param peer Peer that GOT multicast
-	 * @param now Current time
 	 */
 	 */
-	void got(uint64_t nwid,const Address &peer,uint64_t mcGuid,uint64_t now);
+	void got(uint64_t nwid,const Address &peer,uint64_t mcGuid);
 
 
 	/**
 	/**
 	 * Erase entries for expired LIKEs and GOT records
 	 * Erase entries for expired LIKEs and GOT records
 	 */
 	 */
-	void clean(uint64_t now);
+	void clean();
 
 
 	/**
 	/**
 	 * Pick next hops for a multicast by proximity
 	 * Pick next hops for a multicast by proximity
@@ -99,26 +98,31 @@ public:
 	 * @param mg Multicast group
 	 * @param mg Multicast group
 	 * @param mcGuid Multicast message GUID (signer and signer unique ID)
 	 * @param mcGuid Multicast message GUID (signer and signer unique ID)
 	 * @param nextHopFunc Function to call for each address, search stops if it returns false
 	 * @param nextHopFunc Function to call for each address, search stops if it returns false
+	 * @return Number of results returned through function
 	 */
 	 */
 	template<typename F>
 	template<typename F>
-	inline void getNextHops(uint64_t nwid,const MulticastGroup &mg,uint64_t mcGuid,F nextHopFunc)
+	inline unsigned int getNextHops(uint64_t nwid,const MulticastGroup &mg,uint64_t mcGuid,F nextHopFunc)
 	{
 	{
 		Mutex::Lock _l(_lock);
 		Mutex::Lock _l(_lock);
 
 
 		std::map< uint64_t,_NetInfo >::iterator n(_nets.find(nwid));
 		std::map< uint64_t,_NetInfo >::iterator n(_nets.find(nwid));
 		if (n == _nets.end())
 		if (n == _nets.end())
-			return;
+			return 0;
 		std::map< MulticastGroup,std::list< Address > >::iterator p(n->second.proximity.find(mg));
 		std::map< MulticastGroup,std::list< Address > >::iterator p(n->second.proximity.find(mg));
 		if (p == n->second.proximity.end())
 		if (p == n->second.proximity.end())
-			return;
-		std::map< uint64_t,std::pair< uint64_t,std::set< Address > > >::iterator g(n->second.got.find(mcGuid));
+			return 0;
+		std::pair< uint64_t,std::set< Address > > &g = n->second.got[mcGuid];
+		g.first = Utils::now();
 
 
+		unsigned int cnt = 0;
 		for(std::list< Address >::iterator a(p->second.begin());a!=p->second.end();++a) {
 		for(std::list< Address >::iterator a(p->second.begin());a!=p->second.end();++a) {
-			if ((g == n->second.got.end())||(!g->second.second.count(*a))) {
+			if (g.second.insert(*a).second) {
+				++cnt;
 				if (!nextHopFunc(*a))
 				if (!nextHopFunc(*a))
 					break;
 					break;
 			}
 			}
 		}
 		}
+		return cnt;
 	}
 	}
 
 
 private:
 private:

+ 2 - 0
node/Network.cpp

@@ -157,6 +157,8 @@ SharedPtr<Network> Network::newInstance(const RuntimeEnvironment *renv,uint64_t
 	// that then causes the Network instance to be deleted before it is finished
 	// that then causes the Network instance to be deleted before it is finished
 	// being constructed. C++ edge cases, how I love thee.
 	// being constructed. C++ edge cases, how I love thee.
 	SharedPtr<Network> nw(new Network());
 	SharedPtr<Network> nw(new Network());
+	memset(nw->_multicastHistory,0,sizeof(nw->_multicastHistory));
+	nw->_multicastHistoryPtr = 0;
 	nw->_ready = false; // disable handling of Ethernet frames during construct
 	nw->_ready = false; // disable handling of Ethernet frames during construct
 	nw->_r = renv;
 	nw->_r = renv;
 	nw->_tap = new EthernetTap(renv,tag,renv->identity.address().toMAC(),ZT_IF_MTU,&_CBhandleTapData,nw.ptr());
 	nw->_tap = new EthernetTap(renv,tag,renv->identity.address().toMAC(),ZT_IF_MTU,&_CBhandleTapData,nw.ptr());

+ 36 - 0
node/Network.hpp

@@ -52,6 +52,8 @@
 #include "InetAddress.hpp"
 #include "InetAddress.hpp"
 #include "BandwidthAccount.hpp"
 #include "BandwidthAccount.hpp"
 
 
+#define ZT_NETWORK_MULTICAST_DEDUP_HISTORY_LENGTH 256
+
 namespace ZeroTier {
 namespace ZeroTier {
 
 
 class RuntimeEnvironment;
 class RuntimeEnvironment;
@@ -583,12 +585,46 @@ public:
 		//return tmp;
 		//return tmp;
 	}
 	}
 
 
+	/**
+	 * Multicast deduplicator
+	 *
+	 * This checks to see if a multicast GUID has been seen before. If not, it
+	 * adds it to the history and returns false.
+	 *
+	 * @param mcGuid Multicast GUID (sender address + sender unique ID)
+	 * @return True if multicast IS a duplicate, false otherwise
+	 */
+	inline bool multicastDeduplicate(uint64_t mcGuid)
+		throw()
+	{
+		Mutex::Lock _l(_lock);
+		for(unsigned int i=0;i<ZT_NETWORK_MULTICAST_DEDUP_HISTORY_LENGTH;++i) {
+			if (_multicastHistory[i] == mcGuid)
+				return true;
+		}
+		_multicastHistory[_multicastHistoryPtr++ % ZT_NETWORK_MULTICAST_DEDUP_HISTORY_LENGTH] = mcGuid;
+		return false;
+	}
+
+	/**
+	 * @return True if this network allows bridging
+	 */
+	inline bool permitsBridging() const
+		throw()
+	{
+		return false; // TODO: bridging not implemented yet
+	}
+
 private:
 private:
 	static void _CBhandleTapData(void *arg,const MAC &from,const MAC &to,unsigned int etherType,const Buffer<4096> &data);
 	static void _CBhandleTapData(void *arg,const MAC &from,const MAC &to,unsigned int etherType,const Buffer<4096> &data);
 	void _restoreState();
 	void _restoreState();
 
 
 	const RuntimeEnvironment *_r;
 	const RuntimeEnvironment *_r;
 
 
+	// Ring buffer of most recently injected multicast packet GUIDs
+	uint64_t _multicastHistory[ZT_NETWORK_MULTICAST_DEDUP_HISTORY_LENGTH];
+	unsigned int _multicastHistoryPtr;
+
 	// Multicast bandwidth accounting for peers on this network
 	// Multicast bandwidth accounting for peers on this network
 	std::map< std::pair<Address,MulticastGroup>,BandwidthAccount > _multicastRateAccounts;
 	std::map< std::pair<Address,MulticastGroup>,BandwidthAccount > _multicastRateAccounts;
 
 

+ 3 - 2
node/Node.cpp

@@ -210,7 +210,7 @@ struct _NodeImpl
 		delete renv.topology;
 		delete renv.topology;
 		delete renv.demarc;
 		delete renv.demarc;
 		delete renv.sw;
 		delete renv.sw;
-		delete renv.multicaster;
+		delete renv.mc;
 		delete renv.prng;
 		delete renv.prng;
 		delete renv.log;
 		delete renv.log;
 
 
@@ -372,7 +372,7 @@ Node::ReasonForTermination Node::run()
 		Utils::lockDownFile(configAuthTokenPath.c_str(),false);
 		Utils::lockDownFile(configAuthTokenPath.c_str(),false);
 
 
 		// Create the objects that make up runtime state.
 		// Create the objects that make up runtime state.
-		_r->multicaster = new Multicaster();
+		_r->mc = new Multicaster();
 		_r->sw = new Switch(_r);
 		_r->sw = new Switch(_r);
 		_r->demarc = new Demarc(_r);
 		_r->demarc = new Demarc(_r);
 		_r->topology = new Topology(_r,(_r->homePath + ZT_PATH_SEPARATOR_S + "peer.db").c_str());
 		_r->topology = new Topology(_r,(_r->homePath + ZT_PATH_SEPARATOR_S + "peer.db").c_str());
@@ -547,6 +547,7 @@ Node::ReasonForTermination Node::run()
 
 
 			if ((now - lastClean) >= ZT_DB_CLEAN_PERIOD) {
 			if ((now - lastClean) >= ZT_DB_CLEAN_PERIOD) {
 				lastClean = now;
 				lastClean = now;
+				_r->mc->clean();
 				_r->topology->clean();
 				_r->topology->clean();
 				_r->nc->clean();
 				_r->nc->clean();
 			}
 			}

+ 8 - 8
node/Packet.hpp

@@ -167,9 +167,10 @@
 #define ZT_PROTO_VERB_MULTICAST_GOT_IDX_NETWORK_ID (ZT_PACKET_IDX_PAYLOAD)
 #define ZT_PROTO_VERB_MULTICAST_GOT_IDX_NETWORK_ID (ZT_PACKET_IDX_PAYLOAD)
 #define ZT_PROTO_VERB_MULTICAST_GOT_IDX_MULTICAST_GUID (ZT_PROTO_VERB_MULTICAST_GOT_IDX_NETWORK_ID + 8)
 #define ZT_PROTO_VERB_MULTICAST_GOT_IDX_MULTICAST_GUID (ZT_PROTO_VERB_MULTICAST_GOT_IDX_NETWORK_ID + 8)
 
 
-#define ZT_PROTO_VERB_MULTICAST_FRAME_IDX_COUNTER (ZT_PACKET_IDX_PAYLOAD)
-#define ZT_PROTO_VERB_MULTICAST_FRAME_IDX_QUEUE (ZT_PROTO_VERB_MULTICAST_FRAME_IDX_COUNTER + 2)
-#define ZT_PROTO_VERB_MULTICAST_FRAME_IDX_MAGNET (ZT_PROTO_VERB_MULTICAST_FRAME_IDX_QUEUE + 320)
+#define ZT_PROTO_VERB_MULTICAST_FRAME_IDX_FORWARD_COUNT (ZT_PACKET_IDX_PAYLOAD)
+#define ZT_PROTO_VERB_MULTICAST_FRAME_IDX_QUEUE (ZT_PROTO_VERB_MULTICAST_FRAME_IDX_FORWARD_COUNT + 4)
+#define ZT_PROTO_VERB_MULTICAST_FRAME_LEN_QUEUE 320
+#define ZT_PROTO_VERB_MULTICAST_FRAME_IDX_MAGNET (ZT_PROTO_VERB_MULTICAST_FRAME_IDX_QUEUE + ZT_PROTO_VERB_MULTICAST_FRAME_LEN_QUEUE)
 #define ZT_PROTO_VERB_MULTICAST_FRAME_IDX_SUBMITTER (ZT_PROTO_VERB_MULTICAST_FRAME_IDX_MAGNET + 5)
 #define ZT_PROTO_VERB_MULTICAST_FRAME_IDX_SUBMITTER (ZT_PROTO_VERB_MULTICAST_FRAME_IDX_MAGNET + 5)
 #define ZT_PROTO_VERB_MULTICAST_FRAME_IDX_SUBMITTER_UNIQUE_ID (ZT_PROTO_VERB_MULTICAST_FRAME_IDX_SUBMITTER + 5)
 #define ZT_PROTO_VERB_MULTICAST_FRAME_IDX_SUBMITTER_UNIQUE_ID (ZT_PROTO_VERB_MULTICAST_FRAME_IDX_SUBMITTER + 5)
 #define ZT_PROTO_VERB_MULTICAST_FRAME_IDX_NETWORK_ID (ZT_PROTO_VERB_MULTICAST_FRAME_IDX_SUBMITTER_UNIQUE_ID + 3)
 #define ZT_PROTO_VERB_MULTICAST_FRAME_IDX_NETWORK_ID (ZT_PROTO_VERB_MULTICAST_FRAME_IDX_SUBMITTER_UNIQUE_ID + 3)
@@ -464,7 +465,7 @@ public:
 		 *   <[4] multicast additional distinguishing information (ADI)>
 		 *   <[4] multicast additional distinguishing information (ADI)>
 		 *   [... additional tuples of network/address/adi ...]
 		 *   [... additional tuples of network/address/adi ...]
 		 *
 		 *
-		 * OK is generated on successful receipt.
+		 * OK/ERROR are not generated.
 		 */
 		 */
 		VERB_MULTICAST_LIKE = 7,
 		VERB_MULTICAST_LIKE = 7,
 
 
@@ -477,7 +478,7 @@ public:
 		VERB_MULTICAST_GOT = 8,
 		VERB_MULTICAST_GOT = 8,
 
 
 		/* A multicast frame:
 		/* A multicast frame:
-		 *   <[2] 16-bit counter -- number of times multicast has been forwarded>
+		 *   <[4] 32-bit forwarding counter>
 		 *   <[320] FIFO queue of up to 64 ZT addresses, zero address terminated>
 		 *   <[320] FIFO queue of up to 64 ZT addresses, zero address terminated>
 		 *   [... start of signed portion, signed by original submitter below ...]
 		 *   [... start of signed portion, signed by original submitter below ...]
 		 *   <[5] ZeroTier address of propagation magnet node>
 		 *   <[5] ZeroTier address of propagation magnet node>
@@ -501,10 +502,9 @@ public:
 		 *   (1) packet is possibly injected into the local TAP
 		 *   (1) packet is possibly injected into the local TAP
 		 *   (2) send a MULTICAST_GOT message to magnet node with 64-bit
 		 *   (2) send a MULTICAST_GOT message to magnet node with 64-bit
 		 *       multicast GUID
 		 *       multicast GUID
-		 *   (3) counter is incremented, STOP if >= network's max multicast
-		 *       recipient count
+		 *   (3) forwarding counter is incremented, STOP of max exceeded
 		 *   (4) topmost value is removed from FIFO and saved (next hop)
 		 *   (4) topmost value is removed from FIFO and saved (next hop)
-		 *   (5) FIFO is deduplicated (prevents amplification floods)
+		 *   (5) deduplicate FIFO (helps prevent floods)
 		 *   (6) FIFO is filled with as many known peers that have LIKED this
 		 *   (6) FIFO is filled with as many known peers that have LIKED this
 		 *       multicast group as possible, excluding peers to whom this
 		 *       multicast group as possible, excluding peers to whom this
 		 *       multicast has already been sent or (if magnet node) have GOT
 		 *       multicast has already been sent or (if magnet node) have GOT

+ 137 - 143
node/PacketDecoder.cpp

@@ -422,29 +422,17 @@ bool PacketDecoder::_doMULTICAST_LIKE(const RuntimeEnvironment *_r,const SharedP
 {
 {
 	try {
 	try {
 		unsigned int ptr = ZT_PACKET_IDX_PAYLOAD;
 		unsigned int ptr = ZT_PACKET_IDX_PAYLOAD;
-		unsigned int numAccepted = 0;
+		if (ptr >= size())
+			return true;
 		uint64_t now = Utils::now();
 		uint64_t now = Utils::now();
+		Address src(source());
 
 
-		// Iterate through 18-byte network,MAC,ADI tuples:
-		while ((ptr + 18) <= size()) {
-			uint64_t nwid = at<uint64_t>(ptr); ptr += 8;
-			SharedPtr<Network> network(_r->nc->network(nwid));
-			if ((network)&&(network->isAllowed(source()))) {
-				MAC mac(field(ptr,6)); ptr += 6;
-				uint32_t adi = at<uint32_t>(ptr); ptr += 4;
-				//TRACE("peer %s likes multicast group %s:%.8lx on network %llu",source().toString().c_str(),mac.toString().c_str(),(unsigned long)adi,nwid);
-				_r->multicaster->likesMulticastGroup(nwid,MulticastGroup(mac,adi),source(),now);
-				++numAccepted;
-			} else ptr += 10;
+		// Iterate through 18-byte network,MAC,ADI tuples
+		for(;;) {
+			_r->mc->likesGroup(at<uint64_t>(ptr),src,MulticastGroup(MAC(field(ptr + 8,6)),at<uint32_t>(ptr + 14)),now);
+			if ((ptr += 18) >= size())
+				break;
 		}
 		}
-
-		Packet outp(source(),_r->identity.address(),Packet::VERB_OK);
-		outp.append((unsigned char)Packet::VERB_MULTICAST_LIKE);
-		outp.append(packetId());
-		outp.append((uint16_t)numAccepted);
-		outp.encrypt(peer->cryptKey());
-		outp.macSet(peer->macKey());
-		_r->demarc->send(_localPort,_remoteAddress,outp.data(),outp.size(),-1);
 	} catch (std::exception &ex) {
 	} catch (std::exception &ex) {
 		TRACE("dropped MULTICAST_LIKE from %s(%s): unexpected exception: %s",source().toString().c_str(),_remoteAddress.toString().c_str(),ex.what());
 		TRACE("dropped MULTICAST_LIKE from %s(%s): unexpected exception: %s",source().toString().c_str(),_remoteAddress.toString().c_str(),ex.what());
 	} catch ( ... ) {
 	} catch ( ... ) {
@@ -463,6 +451,7 @@ bool PacketDecoder::_doMULTICAST_GOT(const RuntimeEnvironment *_r,const SharedPt
 	}
 	}
 
 
 	try {
 	try {
+		_r->mc->got(at<uint64_t>(ZT_PROTO_VERB_MULTICAST_GOT_IDX_NETWORK_ID),source(),at<uint64_t>(ZT_PROTO_VERB_MULTICAST_GOT_IDX_MULTICAST_GUID));
 	} catch (std::exception &ex) {
 	} catch (std::exception &ex) {
 		TRACE("dropped MULTICAST_GOT from %s(%s): unexpected exception: %s",source().toString().c_str(),_remoteAddress.toString().c_str(),ex.what());
 		TRACE("dropped MULTICAST_GOT from %s(%s): unexpected exception: %s",source().toString().c_str(),_remoteAddress.toString().c_str(),ex.what());
 	} catch ( ... ) {
 	} catch ( ... ) {
@@ -472,145 +461,150 @@ bool PacketDecoder::_doMULTICAST_GOT(const RuntimeEnvironment *_r,const SharedPt
 	return true;
 	return true;
 }
 }
 
 
-bool PacketDecoder::_doMULTICAST_FRAME(const RuntimeEnvironment *_r,const SharedPtr<Peer> &peer)
+// Function object used in _doMULTICAST_FRAME
+struct _doMULTICAST_FRAME_fillQueueWithNextHops
 {
 {
-	try {
-		SharedPtr<Network> network(_r->nc->network(at<uint64_t>(ZT_PROTO_VERB_MULTICAST_FRAME_IDX_NETWORK_ID)));
-		if ((network)&&(network->isAllowed(source()))) {
-			Address originalSubmitterAddress(field(ZT_PROTO_VERB_MULTICAST_FRAME_IDX_SUBMITTER_ADDRESS,ZT_ADDRESS_LENGTH),ZT_ADDRESS_LENGTH);
-
-			if (originalSubmitterAddress.isReserved()) {
-				TRACE("dropped MULTICAST_FRAME from original submitter %s, received from %s(%s): invalid original submitter address",originalSubmitterAddress.toString().c_str(),source().toString().c_str(),_remoteAddress.toString().c_str());
-				return true;
-			}
-			if (originalSubmitterAddress == _r->identity.address()) {
-				TRACE("dropped MULTICAST_FRAME from original submitter %s, received from %s(%s): boomerang!",originalSubmitterAddress.toString().c_str(),source().toString().c_str(),_remoteAddress.toString().c_str());
-				return true;
-			}
-
-			SharedPtr<Peer> originalSubmitter(_r->topology->getPeer(originalSubmitterAddress));
-			if (!originalSubmitter) {
-				TRACE("requesting WHOIS on original multicast frame submitter %s",originalSubmitterAddress.toString().c_str());
-				_r->sw->requestWhois(originalSubmitterAddress);
-				_step = DECODE_WAITING_FOR_MULTICAST_FRAME_ORIGINAL_SENDER_LOOKUP;
-				return false; // try again if/when we get OK(WHOIS)
-			}
-
-			MAC fromMac(field(ZT_PROTO_VERB_MULTICAST_FRAME_IDX_SOURCE_MAC,6));
-			MulticastGroup mg(MAC(field(ZT_PROTO_VERB_MULTICAST_FRAME_IDX_DESTINATION_MAC,6)),at<uint32_t>(ZT_PROTO_VERB_MULTICAST_FRAME_IDX_ADI));
-			unsigned int hops = (*this)[ZT_PROTO_VERB_MULTICAST_FRAME_IDX_HOP_COUNT];
-			unsigned int etherType = at<uint16_t>(ZT_PROTO_VERB_MULTICAST_FRAME_IDX_ETHERTYPE);
-			unsigned int datalen = at<uint16_t>(ZT_PROTO_VERB_MULTICAST_FRAME_IDX_PAYLOAD_LENGTH);
-			unsigned int signaturelen = at<uint16_t>(ZT_PROTO_VERB_MULTICAST_FRAME_IDX_SIGNATURE_LENGTH);
-			unsigned char *dataAndSignature = field(ZT_PROTO_VERB_MULTICAST_FRAME_IDX_PAYLOAD,datalen + signaturelen);
-
-			if (!Multicaster::verifyMulticastPacket(originalSubmitter->identity(),network->id(),fromMac,mg,etherType,dataAndSignature,datalen,dataAndSignature + datalen,signaturelen)) {
-				LOG("dropped MULTICAST_FRAME from original submitter %s, received from %s(%s): FAILED SIGNATURE CHECK (spoofed original submitter?)",originalSubmitterAddress.toString().c_str(),source().toString().c_str(),_remoteAddress.toString().c_str());
-				return true;
-			}
+	_doMULTICAST_FRAME_fillQueueWithNextHops(char *nq,unsigned int want)
+		ptr(nq),
+		need(want) {}
+
+	inline bool operator()(const Address &a) const
+		throw()
+	{
+		a.copyTo(ptr,ZT_ADDRESS_LENGTH);
+		ptr += ZT_ADDRESS_LENGTH;
+		return (--need != 0);
+	}
 
 
-			if (!network->permitsEtherType(etherType)) {
-				LOG("dropped MULTICAST_FRAME from original submitter %s, received from %s(%s): ethernet type %s not allowed on network %.16llx",originalSubmitterAddress.toString().c_str(),source().toString().c_str(),_remoteAddress.toString().c_str(),Filter::etherTypeName(etherType),(unsigned long long)network->id());
-				return true;
-			}
+	char *ptr;
+	unsigned int need;
+};
 
 
-			uint64_t mccrc = Multicaster::computeMulticastDedupCrc(network->id(),fromMac,mg,etherType,dataAndSignature,datalen);
-			uint64_t now = Utils::now();
-			bool isDuplicate = _r->multicaster->checkDuplicate(mccrc,now);
-
-			if (!isDuplicate) {
-				//if (network->multicastRateGate(originalSubmitterAddress,datalen)) {
-					network->tap().put(fromMac,mg.mac(),etherType,dataAndSignature,datalen);
-				//} else {
-				//	TRACE("dropped MULTICAST_FRAME from original submitter %s, received from %s(%s): sender rate limit exceeded",originalSubmitterAddress.toString().c_str(),source().toString().c_str(),_remoteAddress.toString().c_str());
-				//	return true;
-				//}
-
-				/* It's important that we do this *after* rate limit checking,
-				 * otherwise supernodes could be used to execute a flood by
-				 * first bouncing a multicast off a supernode and then flooding
-				 * it with retransmits. */
-				_r->multicaster->addToDedupHistory(mccrc,now);
-			}
+bool PacketDecoder::_doMULTICAST_FRAME(const RuntimeEnvironment *_r,const SharedPtr<Peer> &peer)
+{
+	try {
+		unsigned int forwardCount = at<uint32_t>(ZT_PROTO_VERB_MULTICAST_FRAME_IDX_FORWARD_COUNT);
+		char *queue = (char *)field(ZT_PROTO_VERB_MULTICAST_FRAME_IDX_QUEUE,ZT_PROTO_VERB_MULTICAST_FRAME_LEN_QUEUE);
+		Address magnet(field(ZT_PROTO_VERB_MULTICAST_FRAME_IDX_MAGNET,ZT_ADDRESS_LENGTH),ZT_ADDRESS_LENGTH);
+		Address submitterAddr(Address(field(ZT_PROTO_VERB_MULTICAST_FRAME_IDX_SUBMITTER,ZT_ADDRESS_LENGTH),ZT_ADDRESS_LENGTH));
+		SharedPtr<Peer> submitter(_r->topology->getPeer(submitterAddr));
+		if (!submitter) {
+			_r->sw->requestWhois(submitterAddr);
+			_step = DECODE_WAITING_FOR_MULTICAST_FRAME_ORIGINAL_SENDER_LOOKUP; // causes processing to come back here
+			return false;
+		}
+		uint64_t guid = at<uint64_t>(ZT_PROTO_VERB_MULTICAST_FRAME_IDX_SUBMITTER); // 40-bit sender address + 24-bit sender unique ID
+		uint64_t nwid = at<uint64_t>(ZT_PROTO_VERB_MULTICAST_FRAME_IDX_NETWORK_ID);
+		MAC sourceMac(field(ZT_PROTO_VERB_MULTICAST_FRAME_IDX_SOURCE_MAC,6));
+		MulticastGroup dest(MAC(field(ZT_PROTO_VERB_MULTICAST_FRAME_IDX_DESTINATION_MAC,6)),at<uint32_t>(ZT_PROTO_VERB_MULTICAST_FRAME_IDX_DESTINATION_ADI));
+		unsigned int etherType = at<uint16_t>(ZT_PROTO_VERB_MULTICAST_FRAME_IDX_ETHERTYPE);
+		unsigned int frameLen = at<uint16_t>(ZT_PROTO_VERB_MULTICAST_FRAME_IDX_PAYLOAD_LENGTH);
+		unsigned char *frame = field(ZT_PROTO_VERB_MULTICAST_FRAME_IDX_PAYLOAD,frameLen);
+		unsigned int signatureLen = at<uint16_t>(ZT_PROTO_VERB_MULTICAST_FRAME_IDX_PAYLOAD + frameLen);
+		unsigned char *signature = field(ZT_PROTO_VERB_MULTICAST_FRAME_IDX_PAYLOAD + frameLen + 2,signatureLen);
+
+		unsigned int signedPartLen = (ZT_PROTO_VERB_MULTICAST_FRAME_IDX_PAYLOAD - ZT_PROTO_VERB_MULTICAST_FRAME_IDX_MAGNET) + frameLen;
+		if (!submitter->identity().verify(field(ZT_PROTO_VERB_MULTICAST_FRAME_IDX_MAGNET,signedPartLen),signedPartLen,signature,signatureLen)) {
+			TRACE("dropped MULTICAST_FRAME from %s(%s): failed signature verification, claims to be from %s",source().toString().c_str(),_remoteAddress.toString().c_str(),submitterAddr.toString().c_str());
+			return true;
+		}
 
 
-			if (++hops >= network->multicastPropagationDepth()) {
-				TRACE("not propagating MULTICAST_FRAME from original submitter %s, received from %s(%s): max depth reached",originalSubmitterAddress.toString().c_str(),source().toString().c_str(),_remoteAddress.toString().c_str());
-				return true;
+		SharedPtr<Network> network(_r->nc->network(nwid));
+		if (network) {
+			if (!network->isAllowed(submitterAddr)) {
+			} else if (!dest.mac().isMulticast()) {
+			} else if ((!network->permitsBridging())&&(!submitterAddr.wouldHaveMac(sourceMac))) {
+			} else if (!network->permitsEtherType(etherType)) {
+			} else if (network->multicastDeduplicate(guid)) {
+			} else if (network->updateAndCheckMulticastBalance(submitterAddr,dest,frameLen)) {
+				network->tap().put(sourceMac,dest.mac(),etherType,frame,frameLen);
 			}
 			}
+		}
 
 
-			Address upstream(source()); // save this since we might mangle it below
-			Multicaster::MulticastBloomFilter bloom(field(ZT_PROTO_VERB_MULTICAST_FRAME_IDX_BLOOM_FILTER,ZT_PROTO_VERB_MULTICAST_FRAME_BLOOM_FILTER_SIZE_BYTES));
-			SharedPtr<Peer> propPeers[16];
-			unsigned int np = 0;
-
-			if (_r->topology->amSupernode()) {
-				/* Supernodes behave differently here from ordinary nodes, as their
-				 * role in the network is to bridge gaps between unconnected islands
-				 * in a multicast propagation graph. Instead of using the ordinary
-				 * multicast peer picker, supernodes propagate to random unvisited
-				 * peers. They will also repeatedly propagate duplicate multicasts to
-				 * new peers, while regular nodes simply discard them. This allows
-				 * such gaps to be bridged more than once by ping-ponging off the
-				 * same supernode -- a simple way to implement this without requiring
-				 * that supernodes maintain a lot of state at the cost of a small
-				 * amount of bandwidth. */
-				np = _r->multicaster->pickRandomPropagationPeers(
-					*(_r->prng),
-					*(_r->topology),
-					network->id(),
-					mg,
-					originalSubmitterAddress,
-					upstream,
-					bloom,
-					std::min(network->multicastPropagationBreadth(),(unsigned int)16), // 16 is a sanity check
-					propPeers,
-					now);
-			} else if (isDuplicate) {
-				TRACE("dropped MULTICAST_FRAME from original submitter %s, received from %s(%s): duplicate",originalSubmitterAddress.toString().c_str(),source().toString().c_str(),_remoteAddress.toString().c_str());
-				return true;
-			} else {
-				/* Regular peers only propagate non-duplicate packets, and do so
-				 * according to ordinary propagation priority rules. */
-				np = _r->multicaster->pickSocialPropagationPeers(
-					*(_r->prng),
-					*(_r->topology),
-					network->id(),
-					mg,
-					originalSubmitterAddress,
-					upstream,
-					bloom,
-					std::min(network->multicastPropagationBreadth(),(unsigned int)16), // 16 is a sanity check
-					propPeers,
-					now);
-			}
+		if (magnet != _r->identity.address()) {
+			Packet outp(magnet,_r->identity.address(),Packet::VERB_MULTICAST_GOT);
+			outp.append(nwid);
+			outp.append(guid);
+			_r->sw->send(outp,true);
+		}
 
 
-			/* Re-use *this* packet to repeat it to our propagation
-			 * recipients, which invalidates its current contents and
-			 * state. */
-
-			if (np) {
-				setSource(_r->identity.address());
-				(*this)[ZT_PROTO_VERB_MULTICAST_FRAME_IDX_HOP_COUNT] = hops;
-				memcpy(field(ZT_PROTO_VERB_MULTICAST_FRAME_IDX_BLOOM_FILTER,ZT_PROTO_VERB_MULTICAST_FRAME_BLOOM_FILTER_SIZE_BYTES),bloom.data(),ZT_PROTO_VERB_MULTICAST_FRAME_BLOOM_FILTER_SIZE_BYTES);
-				compress();
-				for(unsigned int i=0;i<np;++i) {
-					newInitializationVector();
-					setDestination(propPeers[i]->address());
-					_r->sw->send(*this,true);
+		setAt(ZT_PROTO_VERB_MULTICAST_FRAME_IDX_FORWARD_COUNT,(uint32_t)++forwardCount);
+
+		char newQueue[ZT_PROTO_VERB_MULTICAST_FRAME_LEN_QUEUE + ZT_ADDRESS_LENGTH]; // room for an extra if we need a nextHop
+		unsigned int newQueueLen = 0;
+
+		// Top of FIFO is next hop (if there is one)
+		Address nextHop(queue,ZT_ADDRESS_LENGTH);
+
+		// Deduplicate the rest of the queue[], adding them to newQueue
+		if (nextHop) { // there was a next hop, so there was something there
+			char firstByteSeen[256];
+			for(unsigned int j=0;j<(256 / 8);++j)
+				((uint64_t *)firstByteSeen)[j] = 0;
+			for(unsigned int i=ZT_ADDRESS_LENGTH;i<ZT_PROTO_VERB_MULTICAST_FRAME_LEN_QUEUE;i+=ZT_ADDRESS_LENGTH) {
+				char *qs = queue + i;
+				if (Utils::isZero(qs,ZT_ADDRESS_LENGTH)) // zero terminates queue
+					break;
+				bool isdup = false;
+				if (firstByteSeen[(unsigned int)queue[i]]) {
+					for(unsigned int i2=ZT_ADDRESS_LENGTH;i2<ZT_PROTO_VERB_MULTICAST_FRAME_LEN_QUEUE;i2+=ZT_ADDRESS_LENGTH) {
+						if ((i2 != i)&&(!memcmp(qs,queue + i2,ZT_ADDRESS_LENGTH))) {
+							isdup = true;
+							break;
+						}
+					}
+				} else firstByteSeen[(unsigned int)queue[i]] = 1;
+				if (!isdup) {
+					char *nq = newQueue + (newQueueLen++ * ZT_ADDRESS_LENGTH);
+					for(unsigned int j=0;j<ZT_ADDRESS_LENGTH;++j)
+						nq[j] = qs[j];
 				}
 				}
 			}
 			}
+		}
 
 
-			/* Just to be safe, return true here to terminate processing as we
-			 * have thoroughly destroyed our state by doing the above. */
-			return true;
-		} else {
-			TRACE("dropped MULTICAST_FRAME from %s(%s): network %.16llx unknown or sender not allowed",source().toString().c_str(),_remoteAddress.toString().c_str(),(unsigned long long)network->id());
+		// Get next hops, including an extra if we don't have a next hop yet
+		unsigned int needQueueItems = ((ZT_PROTO_VERB_MULTICAST_FRAME_LEN_QUEUE / ZT_ADDRESS_LENGTH) - newQueueLen);
+		if (!nextHop)
+			++needQueueItems;
+		if (needQueueItems)
+			newQueueLen += _r->mc->getNextHops(nwid,dest,guid,_doMULTICAST_FRAME_fillQueueWithNextHops(newQueue,needQueueItems));
+
+		// Copy new queue over old queue, and pick off next hop if we need one
+		if (newQueueLen) {
+			char *nq = newQueue;
+			if (!nextHop) {
+				nextHop.setTo(nq,ZT_ADDRESS_LENGTH);
+				nq += ZT_ADDRESS_LENGTH;
+				--newQueueLen;
+			}
+			unsigned int i = 0;
+			unsigned int k = ZT_ADDRESS_LENGTH * newQueueLen;
+			while (i < k)
+				nq[i] = newQueue[i];
+			while (i < ZT_PROTO_VERB_MULTICAST_FRAME_LEN_QUEUE)
+				nq[i] = 0;
+		} else memset(queue,0,ZT_PROTO_VERB_MULTICAST_FRAME_LEN_QUEUE);
+
+		// If there's still no next hop, it's the magnet
+		if (!nextHop)
+			nextHop = magnet;
+
+		// Send to next hop, unless it's us of course
+		if (nextHop != _r->identity.address()) {
+			newInitializationVector();
+			setDestination(nextHop);
+			setSource(_r->identity.address());
+			compress();
+			_r->sw->send(*this,true);
 		}
 		}
+
+		return true;
 	} catch (std::exception &ex) {
 	} catch (std::exception &ex) {
 		TRACE("dropped MULTICAST_FRAME from %s(%s): unexpected exception: %s",source().toString().c_str(),_remoteAddress.toString().c_str(),ex.what());
 		TRACE("dropped MULTICAST_FRAME from %s(%s): unexpected exception: %s",source().toString().c_str(),_remoteAddress.toString().c_str(),ex.what());
 	} catch ( ... ) {
 	} catch ( ... ) {
 		TRACE("dropped MULTICAST_FRAME from %s(%s): unexpected exception: (unknown)",source().toString().c_str(),_remoteAddress.toString().c_str());
 		TRACE("dropped MULTICAST_FRAME from %s(%s): unexpected exception: (unknown)",source().toString().c_str(),_remoteAddress.toString().c_str());
 	}
 	}
+
 	return true;
 	return true;
 }
 }
 
 

+ 3 - 0
node/RuntimeEnvironment.hpp

@@ -45,6 +45,7 @@ class SysEnv;
 class CMWC4096;
 class CMWC4096;
 class Service;
 class Service;
 class Node;
 class Node;
+class Multicaster;
 
 
 /**
 /**
  * Holds global state for an instance of ZeroTier::Node
  * Holds global state for an instance of ZeroTier::Node
@@ -65,6 +66,7 @@ public:
 		shutdownInProgress(false),
 		shutdownInProgress(false),
 		log((Logger *)0),
 		log((Logger *)0),
 		prng((CMWC4096 *)0),
 		prng((CMWC4096 *)0),
+		mc((Multicaster *)0),
 		sw((Switch *)0),
 		sw((Switch *)0),
 		demarc((Demarc *)0),
 		demarc((Demarc *)0),
 		topology((Topology *)0),
 		topology((Topology *)0),
@@ -90,6 +92,7 @@ public:
 
 
 	Logger *log; // may be null
 	Logger *log; // may be null
 	CMWC4096 *prng;
 	CMWC4096 *prng;
+	Multicaster *mc;
 	Switch *sw;
 	Switch *sw;
 	Demarc *demarc;
 	Demarc *demarc;
 	Topology *topology;
 	Topology *topology;