Browse Source

Huge convoluted logic de-tangling in multicast propagation, supernodes now do random propagation for more efficient coverage with less bias in sparse graph cases.

Adam Ierymenko 12 years ago
parent
commit
2efc9b31bd
3 changed files with 167 additions and 83 deletions
  1. 62 2
      node/Multicaster.hpp
  2. 104 80
      node/PacketDecoder.cpp
  3. 1 1
      node/Switch.cpp

+ 62 - 2
node/Multicaster.hpp

@@ -206,7 +206,7 @@ public:
 	}
 
 	/**
-	 * Choose peers to send a propagating multicast to
+	 * Choose peers for multicast propagation via random selection
 	 *
 	 * @param prng Random source
 	 * @param topology Topology object or mock thereof
@@ -223,7 +223,67 @@ public:
 	 * @tparam P Type of peers, which is SharedPtr<Peer> in running code or a mock in simulation (mock must behave like a pointer type)
 	 */
 	template<typename T,typename P>
-	inline unsigned int pickNextPropagationPeers(
+	inline unsigned int pickRandomPropagationPeers(
+		CMWC4096 &prng,
+		T &topology,
+		uint64_t nwid,
+		const MulticastGroup &mg,
+		const Address &originalSubmitter,
+		const Address &upstream,
+		MulticastBloomFilter &bf,
+		unsigned int max,
+		P *peers,
+		uint64_t now)
+	{
+		unsigned int chosen = 0;
+		Mutex::Lock _l(_multicastMemberships_m);
+		std::map< MulticastChannel,std::vector<MulticastMembership> >::iterator mm(_multicastMemberships.find(MulticastChannel(nwid,mg)));
+		if ((mm != _multicastMemberships.end())&&(!mm->second.empty())) {
+			for(unsigned int stries=0;((stries<ZT_MULTICAST_PICK_MAX_SAMPLE_SIZE)&&(chosen < max));++stries) {
+				MulticastMembership &m = mm->second[prng.next32() % mm->second.size()];
+				unsigned int sum = m.first.sum();
+				if (
+				     ((now - m.second) < ZT_MULTICAST_LIKE_EXPIRE)&& /* LIKE is not expired */
+				     (!bf.contains(sum))&&                           /* Not in propagation bloom */
+				     (m.first != originalSubmitter)&&                /* Not the original submitter */
+				     (m.first != upstream) ) {                       /* Not where the frame came from */
+					P peer(topology.getPeer(m.first));
+					if (peer) {
+						unsigned int chk = 0;
+						while (chk < chosen) {
+							if (peers[chk++] == peer)
+								break;
+						}
+						if (chk == chosen) { /* not already picked */
+							peers[chosen++] = peer;
+							bf.set(sum);
+						}
+					}
+				}
+			}
+		}
+		return 0;
+	}
+
+	/**
+	 * Choose peers for multicast propagation via implicit social switching
+	 *
+	 * @param prng Random source
+	 * @param topology Topology object or mock thereof
+	 * @param nwid Network ID
+	 * @param mg Multicast group
+	 * @param originalSubmitter Original submitter of multicast message to network
+	 * @param upstream Address from which message originated, or null (0) address if none
+	 * @param bf Bloom filter, updated in place with sums of addresses in chosen peers and/or decay
+	 * @param max Maximum number of peers to pick
+	 * @param peers Array of objects of type P to fill with up to [max] peers
+	 * @param now Current timestamp
+	 * @return Number of peers actually stored in peers array
+	 * @tparam T Type of topology, which is Topology in running code or a mock in simulation
+	 * @tparam P Type of peers, which is SharedPtr<Peer> in running code or a mock in simulation (mock must behave like a pointer type)
+	 */
+	template<typename T,typename P>
+	inline unsigned int pickSocialPropagationPeers(
 		CMWC4096 &prng,
 		T &topology,
 		uint64_t nwid,

+ 104 - 80
node/PacketDecoder.cpp

@@ -496,6 +496,24 @@ bool PacketDecoder::_doMULTICAST_FRAME(const RuntimeEnvironment *_r,const Shared
 		SharedPtr<Network> network(_r->nc->network(at<uint64_t>(ZT_PROTO_VERB_MULTICAST_FRAME_IDX_NETWORK_ID)));
 		if ((network)&&(network->isAllowed(source()))) {
 			Address originalSubmitterAddress(field(ZT_PROTO_VERB_MULTICAST_FRAME_IDX_SUBMITTER_ADDRESS,ZT_ADDRESS_LENGTH),ZT_ADDRESS_LENGTH);
+
+			if (originalSubmitterAddress.isReserved()) {
+				TRACE("dropped MULTICAST_FRAME from original submitter %s, received from %s(%s): invalid original submitter address",originalSubmitterAddress.toString().c_str(),source().toString().c_str(),_remoteAddress.toString().c_str());
+				return true;
+			}
+			if (originalSubmitterAddress == _r->identity.address()) {
+				TRACE("dropped MULTICAST_FRAME from original submitter %s, received from %s(%s): boomerang!",originalSubmitterAddress.toString().c_str(),source().toString().c_str(),_remoteAddress.toString().c_str());
+				return true;
+			}
+
+			SharedPtr<Peer> originalSubmitter(_r->topology->getPeer(originalSubmitterAddress));
+			if (!originalSubmitter) {
+				TRACE("requesting WHOIS on original multicast frame submitter %s",originalSubmitterAddress.toString().c_str());
+				_r->sw->requestWhois(originalSubmitterAddress);
+				_step = DECODE_WAITING_FOR_MULTICAST_FRAME_ORIGINAL_SENDER_LOOKUP;
+				return false; // try again if/when we get OK(WHOIS)
+			}
+
 			MAC fromMac(field(ZT_PROTO_VERB_MULTICAST_FRAME_IDX_SOURCE_MAC,6));
 			MulticastGroup mg(MAC(field(ZT_PROTO_VERB_MULTICAST_FRAME_IDX_DESTINATION_MAC,6)),at<uint32_t>(ZT_PROTO_VERB_MULTICAST_FRAME_IDX_ADI));
 			unsigned int hops = (*this)[ZT_PROTO_VERB_MULTICAST_FRAME_IDX_HOP_COUNT];
@@ -504,94 +522,100 @@ bool PacketDecoder::_doMULTICAST_FRAME(const RuntimeEnvironment *_r,const Shared
 			unsigned int signaturelen = at<uint16_t>(ZT_PROTO_VERB_MULTICAST_FRAME_IDX_SIGNATURE_LENGTH);
 			unsigned char *dataAndSignature = field(ZT_PROTO_VERB_MULTICAST_FRAME_IDX_PAYLOAD,datalen + signaturelen);
 
+			if (!Multicaster::verifyMulticastPacket(originalSubmitter->identity(),network->id(),fromMac,mg,etherType,dataAndSignature,datalen,dataAndSignature + datalen,signaturelen)) {
+				LOG("dropped MULTICAST_FRAME from original submitter %s, received from %s(%s): FAILED SIGNATURE CHECK (spoofed original submitter?)",originalSubmitterAddress.toString().c_str(),source().toString().c_str(),_remoteAddress.toString().c_str());
+				return true;
+			}
+
+			if (++hops >= ZT_MULTICAST_PROPAGATION_DEPTH) {
+				TRACE("dropped MULTICAST_FRAME from original submitter %s, received from %s(%s): max depth reached",originalSubmitterAddress.toString().c_str(),source().toString().c_str(),_remoteAddress.toString().c_str());
+				return true;
+			}
+
 			uint64_t mccrc = Multicaster::computeMulticastDedupCrc(network->id(),fromMac,mg,etherType,dataAndSignature,datalen);
 			uint64_t now = Utils::now();
 			bool isDuplicate = _r->multicaster->checkDuplicate(mccrc,now);
 
-			if (originalSubmitterAddress == _r->identity.address()) {
-				// Technically should not happen, since the original submitter is
-				// excluded from consideration as a propagation recipient.
-				TRACE("dropped boomerang MULTICAST_FRAME received from %s(%s)",source().toString().c_str(),_remoteAddress.toString().c_str());
-			} else if ((!isDuplicate)||(_r->topology->amSupernode())) {
-				//
-				// If I am a supernode, I will repeatedly propagate duplicates. That's
-				// because supernodes are used to bridge sparse multicast groups. Non-
-				// supernodes will ignore duplicates completely.
-				//
-				// TODO: supernodes should keep a local bloom filter too and OR it with
-				// the bloom from the packet in order to pick different recipients each
-				// time a multicast returns to them for repropagation.
-				//
-
-				SharedPtr<Peer> originalSubmitter(_r->topology->getPeer(originalSubmitterAddress));
-				if (!originalSubmitter) {
-					TRACE("requesting WHOIS on original multicast frame submitter %s",originalSubmitterAddress.toString().c_str());
-					_r->sw->requestWhois(originalSubmitterAddress);
-					_step = DECODE_WAITING_FOR_MULTICAST_FRAME_ORIGINAL_SENDER_LOOKUP;
-					return false; // try again if/when we get OK(WHOIS)
-				} else if (Multicaster::verifyMulticastPacket(originalSubmitter->identity(),network->id(),fromMac,mg,etherType,dataAndSignature,datalen,dataAndSignature + datalen,signaturelen)) {
-					// In checking the multicast rate, we don't re-check if this is
-					// a duplicate. That's because if isDuplicate is true it means
-					// we're a supernode and it's a second pass relay.
-					if ((isDuplicate)||(network->multicastRateGate(originalSubmitter->address(),datalen))) {
-						_r->multicaster->addToDedupHistory(mccrc,now);
-
-						// Even if we are a supernode, we still don't repeatedly inject
-						// duplicates into our own tap.
-						if (!isDuplicate)
-							network->tap().put(fromMac,mg.mac(),etherType,dataAndSignature,datalen);
-
-						if (++hops < ZT_MULTICAST_PROPAGATION_DEPTH) {
-							Address upstream(source()); // save this since we mangle it
-
-							Multicaster::MulticastBloomFilter bloom(field(ZT_PROTO_VERB_MULTICAST_FRAME_IDX_BLOOM_FILTER,ZT_PROTO_VERB_MULTICAST_FRAME_BLOOM_FILTER_SIZE_BYTES));
-							SharedPtr<Peer> propPeers[ZT_MULTICAST_PROPAGATION_BREADTH];
-							unsigned int np = _r->multicaster->pickNextPropagationPeers(
-								*(_r->prng),
-								*(_r->topology),
-								network->id(),
-								mg,
-								originalSubmitterAddress,
-								upstream,
-								bloom,
-								ZT_MULTICAST_PROPAGATION_BREADTH,
-								propPeers,
-								now);
-
-							// In a bit of a hack, we re-use this packet to repeat it
-							// to our multicast propagation recipients. Afterwords we
-							// return true just to be sure this is the end of this
-							// packet's life cycle, since it is now mangled.
-
-							setSource(_r->identity.address());
-							(*this)[ZT_PROTO_VERB_MULTICAST_FRAME_IDX_HOP_COUNT] = hops;
-							memcpy(field(ZT_PROTO_VERB_MULTICAST_FRAME_IDX_BLOOM_FILTER,ZT_PROTO_VERB_MULTICAST_FRAME_BLOOM_FILTER_SIZE_BYTES),bloom.data(),ZT_PROTO_VERB_MULTICAST_FRAME_BLOOM_FILTER_SIZE_BYTES);
-							compress();
-
-							for(unsigned int i=0;i<np;++i) {
-								//TRACE("propagating multicast from original node %s: %s -> %s",originalSubmitterAddress.toString().c_str(),upstream.toString().c_str(),propPeers[i]->address().toString().c_str());
-								// Re-use this packet to re-send multicast frame to everyone
-								// downstream from us.
-								newInitializationVector();
-								setDestination(propPeers[i]->address());
-								_r->sw->send(*this,true);
-							}
-
-							// Return here just to be safe, since this packet's state is no
-							// longer valid.
-							return true;
-						} else {
-							//TRACE("terminating MULTICAST_FRAME propagation from %s(%s): max depth reached",source().toString().c_str(),_remoteAddress.toString().c_str());
-						}
-					} else {
-						LOG("dropped MULTICAST_FRAME from original sender %s: rate limit overrun",originalSubmitter->address().toString().c_str());
-					}
+			if (!isDuplicate) {
+				if (network->multicastRateGate(originalSubmitterAddress,datalen)) {
+					network->tap().put(fromMac,mg.mac(),etherType,dataAndSignature,datalen);
 				} else {
-					TRACE("rejected MULTICAST_FRAME forwarded by %s(%s): failed signature check (falsely claims origin %s)",source().toString().c_str(),_remoteAddress.toString().c_str(),originalSubmitterAddress.toString().c_str());
+					TRACE("dropped MULTICAST_FRAME from original submitter %s, received from %s(%s): sender rate limit exceeded",originalSubmitterAddress.toString().c_str(),source().toString().c_str(),_remoteAddress.toString().c_str());
+					return true;
 				}
+
+				/* It's important that we do this *after* rate limit checking,
+				 * otherwise supernodes could be used to execute a flood by
+				 * first bouncing a multicast off a supernode and then flooding
+				 * it with retransmits. */
+				_r->multicaster->addToDedupHistory(mccrc,now);
+			}
+
+			Address upstream(source()); // save this since we might mangle it below
+			Multicaster::MulticastBloomFilter bloom(field(ZT_PROTO_VERB_MULTICAST_FRAME_IDX_BLOOM_FILTER,ZT_PROTO_VERB_MULTICAST_FRAME_BLOOM_FILTER_SIZE_BYTES));
+			SharedPtr<Peer> propPeers[ZT_MULTICAST_PROPAGATION_BREADTH];
+			unsigned int np = 0;
+
+			if (_r->topology->amSupernode()) {
+				/* Supernodes behave differently here from ordinary nodes, as their
+				 * role in the network is to bridge gaps between unconnected islands
+				 * in a multicast propagation graph. Instead of using the ordinary
+				 * multicast peer picker, supernodes propagate to random unvisited
+				 * peers. They will also repeatedly propagate duplicate multicasts to
+				 * new peers, while regular nodes simply discard them. This allows
+				 * such gaps to be bridged more than once by ping-ponging off the
+				 * same supernode -- a simple way to implement this without requiring
+				 * that supernodes maintain a lot of state at the cost of a small
+				 * amount of bandwidth. */
+				np = _r->multicaster->pickRandomPropagationPeers(
+					*(_r->prng),
+					*(_r->topology),
+					network->id(),
+					mg,
+					originalSubmitterAddress,
+					upstream,
+					bloom,
+					ZT_MULTICAST_PROPAGATION_BREADTH,
+					propPeers,
+					now);
+			} else if (isDuplicate) {
+				TRACE("dropped MULTICAST_FRAME from original submitter %s, received from %s(%s): duplicate",originalSubmitterAddress.toString().c_str(),source().toString().c_str(),_remoteAddress.toString().c_str());
+				return true;
 			} else {
-				TRACE("dropped duplicate MULTICAST_FRAME from %s(%s)",source().toString().c_str(),_remoteAddress.toString().c_str());
+				/* Regular peers only propagate non-duplicate packets, and do so
+				 * according to ordinary propagation priority rules. */
+				np = _r->multicaster->pickSocialPropagationPeers(
+					*(_r->prng),
+					*(_r->topology),
+					network->id(),
+					mg,
+					originalSubmitterAddress,
+					upstream,
+					bloom,
+					ZT_MULTICAST_PROPAGATION_BREADTH,
+					propPeers,
+					now);
+			}
+
+			/* Re-use *this* packet to repeat it to our propagation
+			 * recipients, which invalidates its current contents and
+			 * state. */
+
+			if (np) {
+				setSource(_r->identity.address());
+				(*this)[ZT_PROTO_VERB_MULTICAST_FRAME_IDX_HOP_COUNT] = hops;
+				memcpy(field(ZT_PROTO_VERB_MULTICAST_FRAME_IDX_BLOOM_FILTER,ZT_PROTO_VERB_MULTICAST_FRAME_BLOOM_FILTER_SIZE_BYTES),bloom.data(),ZT_PROTO_VERB_MULTICAST_FRAME_BLOOM_FILTER_SIZE_BYTES);
+				compress();
+				for(unsigned int i=0;i<np;++i) {
+					newInitializationVector();
+					setDestination(propPeers[i]->address());
+					_r->sw->send(*this,true);
+				}
 			}
+
+			/* Just to be safe, return true here to terminate processing as we
+			 * have thoroughly destroyed our state by doing the above. */
+			return true;
 		} else {
 			TRACE("dropped MULTICAST_FRAME from %s(%s): network %.16llx unknown or sender not allowed",source().toString().c_str(),_remoteAddress.toString().c_str(),(unsigned long long)network->id());
 		}

+ 1 - 1
node/Switch.cpp

@@ -108,7 +108,7 @@ void Switch::onLocalEthernet(const SharedPtr<Network> &network,const MAC &from,c
 
 		Multicaster::MulticastBloomFilter bloom;
 		SharedPtr<Peer> propPeers[ZT_MULTICAST_PROPAGATION_BREADTH];
-		unsigned int np = _r->multicaster->pickNextPropagationPeers(
+		unsigned int np = _r->multicaster->pickSocialPropagationPeers(
 			*(_r->prng),
 			*(_r->topology),
 			network->id(),