Explorar el Código

Clean up some multicast code.

Adam Ierymenko hace 7 años
padre
commit
f03fd57997
Se han modificado 5 ficheros con 79 adiciones y 44 borrados
  1. 51 31
      node/Multicaster.cpp
  2. 4 8
      node/Multicaster.hpp
  3. 2 1
      node/Network.cpp
  4. 20 0
      node/NetworkConfig.hpp
  5. 2 4
      node/Switch.cpp

+ 51 - 31
node/Multicaster.cpp

@@ -28,7 +28,6 @@
 
 #include "Constants.hpp"
 #include "RuntimeEnvironment.hpp"
-#include "SharedPtr.hpp"
 #include "Multicaster.hpp"
 #include "Topology.hpp"
 #include "Switch.hpp"
@@ -37,6 +36,7 @@
 #include "C25519.hpp"
 #include "CertificateOfMembership.hpp"
 #include "Node.hpp"
+#include "Network.hpp"
 
 namespace ZeroTier {
 
@@ -159,11 +159,8 @@ std::vector<Address> Multicaster::getMembers(uint64_t nwid,const MulticastGroup
 
 void Multicaster::send(
 	void *tPtr,
-	unsigned int limit,
 	int64_t now,
-	uint64_t nwid,
-	bool disableCompression,
-	const std::vector<Address> &alwaysSendTo,
+	const SharedPtr<Network> &network,
 	const MulticastGroup &mg,
 	const MAC &src,
 	unsigned int etherType,
@@ -175,7 +172,7 @@ void Multicaster::send(
 
 	try {
 		Mutex::Lock _l(_groups_m);
-		MulticastGroupStatus &gs = _groups[Multicaster::Key(nwid,mg)];
+		MulticastGroupStatus &gs = _groups[Multicaster::Key(network->id(),mg)];
 
 		if (!gs.members.empty()) {
 			// Allocate a memory buffer if group is monstrous
@@ -193,6 +190,10 @@ void Multicaster::send(
 			}
 		}
 
+		Address activeBridges[ZT_MAX_NETWORK_SPECIALISTS];
+		const unsigned int activeBridgeCount = network->config().activeBridges(activeBridges);
+		const unsigned int limit = network->config().multicastLimit;
+
 		if (gs.members.size() >= limit) {
 			// Skip queue if we already have enough members to complete the send operation
 			OutboundMulticast out;
@@ -200,8 +201,8 @@ void Multicaster::send(
 			out.init(
 				RR,
 				now,
-				nwid,
-				disableCompression,
+				network->id(),
+				network->config().disableCompression(),
 				limit,
 				1, // we'll still gather a little from peers to keep multicast list fresh
 				src,
@@ -212,9 +213,9 @@ void Multicaster::send(
 
 			unsigned int count = 0;
 
-			for(std::vector<Address>::const_iterator ast(alwaysSendTo.begin());ast!=alwaysSendTo.end();++ast) {
-				if (*ast != RR->identity.address()) {
-					out.sendOnly(RR,tPtr,*ast); // optimization: don't use dedup log if it's a one-pass send
+			for(unsigned int i=0;i<activeBridgeCount;++i) {
+				if (activeBridges[i] != RR->identity.address()) {
+					out.sendOnly(RR,tPtr,activeBridges[i]); // optimization: don't use dedup log if it's a one-pass send
 					if (++count >= limit)
 						break;
 				}
@@ -222,40 +223,59 @@ void Multicaster::send(
 
 			unsigned long idx = 0;
 			while ((count < limit)&&(idx < gs.members.size())) {
-				Address ma(gs.members[indexes[idx++]].address);
-				if (std::find(alwaysSendTo.begin(),alwaysSendTo.end(),ma) == alwaysSendTo.end()) {
+				const Address ma(gs.members[indexes[idx++]].address);
+				if (std::find(activeBridges,activeBridges + activeBridgeCount,ma) == (activeBridges + activeBridgeCount)) {
 					out.sendOnly(RR,tPtr,ma); // optimization: don't use dedup log if it's a one-pass send
 					++count;
 				}
 			}
 		} else {
-			unsigned int gatherLimit = (limit - (unsigned int)gs.members.size()) + 1;
+			const unsigned int gatherLimit = (limit - (unsigned int)gs.members.size()) + 1;
 
 			if ((gs.members.empty())||((now - gs.lastExplicitGather) >= ZT_MULTICAST_EXPLICIT_GATHER_DELAY)) {
 				gs.lastExplicitGather = now;
 
 				Address explicitGatherPeers[16];
 				unsigned int numExplicitGatherPeers = 0;
+
 				SharedPtr<Peer> bestRoot(RR->topology->getUpstreamPeer());
 				if (bestRoot)
 					explicitGatherPeers[numExplicitGatherPeers++] = bestRoot->address();
-				explicitGatherPeers[numExplicitGatherPeers++] = Network::controllerFor(nwid);
-				SharedPtr<Network> network(RR->node->network(nwid));
-				if (network) {
-					std::vector<Address> anchors(network->config().anchors());
-					for(std::vector<Address>::const_iterator a(anchors.begin());a!=anchors.end();++a) {
-						if (*a != RR->identity.address()) {
-							explicitGatherPeers[numExplicitGatherPeers++] = *a;
-							if (numExplicitGatherPeers == 16)
-								break;
-						}
+
+				explicitGatherPeers[numExplicitGatherPeers++] = network->controller();
+
+				Address ac[ZT_MAX_NETWORK_SPECIALISTS];
+				const unsigned int accnt = network->config().alwaysContactAddresses(ac);
+				unsigned int shuffled[ZT_MAX_NETWORK_SPECIALISTS];
+				for(unsigned int i=0;i<accnt;++i)
+					shuffled[i] = i;
+				for(unsigned int i=0,k=accnt>>1;i<k;++i) {
+					const uint64_t x = RR->node->prng();
+					const unsigned int x1 = shuffled[(unsigned int)x % accnt];
+					const unsigned int x2 = shuffled[(unsigned int)(x >> 32) % accnt];
+					const unsigned int tmp = shuffled[x1];
+					shuffled[x1] = shuffled[x2];
+					shuffled[x2] = tmp;
+				}
+				for(unsigned int i=0;i<accnt;++i) {
+					explicitGatherPeers[numExplicitGatherPeers++] = ac[shuffled[i]];
+					if (numExplicitGatherPeers == 16)
+						break;
+				}
+
+				std::vector<Address> anchors(network->config().anchors());
+				for(std::vector<Address>::const_iterator a(anchors.begin());a!=anchors.end();++a) {
+					if (*a != RR->identity.address()) {
+						explicitGatherPeers[numExplicitGatherPeers++] = *a;
+						if (numExplicitGatherPeers == 16)
+							break;
 					}
 				}
 
 				for(unsigned int k=0;k<numExplicitGatherPeers;++k) {
 					const CertificateOfMembership *com = (network) ? ((network->config().com) ? &(network->config().com) : (const CertificateOfMembership *)0) : (const CertificateOfMembership *)0;
 					Packet outp(explicitGatherPeers[k],RR->identity.address(),Packet::VERB_MULTICAST_GATHER);
-					outp.append(nwid);
+					outp.append(network->id());
 					outp.append((uint8_t)((com) ? 0x01 : 0x00));
 					mg.mac().appendTo(outp);
 					outp.append((uint32_t)mg.adi());
@@ -273,8 +293,8 @@ void Multicaster::send(
 			out.init(
 				RR,
 				now,
-				nwid,
-				disableCompression,
+				network->id(),
+				network->config().disableCompression(),
 				limit,
 				gatherLimit,
 				src,
@@ -285,9 +305,9 @@ void Multicaster::send(
 
 			unsigned int count = 0;
 
-			for(std::vector<Address>::const_iterator ast(alwaysSendTo.begin());ast!=alwaysSendTo.end();++ast) {
-				if (*ast != RR->identity.address()) {
-					out.sendAndLog(RR,tPtr,*ast);
+			for(unsigned int i=0;i<activeBridgeCount;++i) {
+				if (activeBridges[i] != RR->identity.address()) {
+					out.sendAndLog(RR,tPtr,activeBridges[i]);
 					if (++count >= limit)
 						break;
 				}
@@ -296,7 +316,7 @@ void Multicaster::send(
 			unsigned long idx = 0;
 			while ((count < limit)&&(idx < gs.members.size())) {
 				Address ma(gs.members[indexes[idx++]].address);
-				if (std::find(alwaysSendTo.begin(),alwaysSendTo.end(),ma) == alwaysSendTo.end()) {
+				if (std::find(activeBridges,activeBridges + activeBridgeCount,ma) == (activeBridges + activeBridgeCount)) {
 					out.sendAndLog(RR,tPtr,ma);
 					++count;
 				}

+ 4 - 8
node/Multicaster.hpp

@@ -42,12 +42,14 @@
 #include "OutboundMulticast.hpp"
 #include "Utils.hpp"
 #include "Mutex.hpp"
+#include "SharedPtr.hpp"
 
 namespace ZeroTier {
 
 class RuntimeEnvironment;
 class CertificateOfMembership;
 class Packet;
+class Network;
 
 /**
  * Database of known multicast peers within a network
@@ -128,11 +130,8 @@ public:
 	 * Send a multicast
 	 *
 	 * @param tPtr Thread pointer to be handed through to any callbacks called as a result of this call
-	 * @param limit Multicast limit
 	 * @param now Current time
-	 * @param nwid Network ID
-	 * @param disableCompression Disable packet payload compression?
-	 * @param alwaysSendTo Send to these peers first and even if not included in subscriber list
+	 * @param network Network
 	 * @param mg Multicast group
 	 * @param src Source Ethernet MAC address or NULL to skip in packet and compute from ZT address (non-bridged mode)
 	 * @param etherType Ethernet frame type
@@ -141,11 +140,8 @@ public:
 	 */
 	void send(
 		void *tPtr,
-		unsigned int limit,
 		int64_t now,
-		uint64_t nwid,
-		bool disableCompression,
-		const std::vector<Address> &alwaysSendTo,
+		const SharedPtr<Network> &network,
 		const MulticastGroup &mg,
 		const MAC &src,
 		unsigned int etherType,

+ 2 - 1
node/Network.cpp

@@ -1432,7 +1432,8 @@ void Network::_sendUpdatesToMembers(void *tPtr,const MulticastGroup *const newMu
 		std::sort(alwaysAnnounceTo.begin(),alwaysAnnounceTo.end());
 
 		for(std::vector<Address>::const_iterator a(alwaysAnnounceTo.begin());a!=alwaysAnnounceTo.end();++a) {
-			if ( (_config.com) && (!_memberships.contains(*a)) ) { // push COM to non-members so they can do multicast request auth
+		 // push COM to non-members so they can do multicast request auth
+			if ( (_config.com) && (!_memberships.contains(*a)) && (*a != RR->identity.address()) ) {
 				Packet outp(*a,RR->identity.address(),Packet::VERB_NETWORK_CREDENTIALS);
 				_config.com.serialize(outp);
 				outp.append((uint8_t)0x00);

+ 20 - 0
node/NetworkConfig.hpp

@@ -292,6 +292,16 @@ public:
 		return r;
 	}
 
+	inline unsigned int activeBridges(Address ab[ZT_MAX_NETWORK_SPECIALISTS]) const
+	{
+		unsigned int c = 0;
+		for(unsigned int i=0;i<specialistCount;++i) {
+			if ((specialists[i] & ZT_NETWORKCONFIG_SPECIALIST_TYPE_ACTIVE_BRIDGE) != 0)
+				ab[c++] = specialists[i];
+		}
+		return c;
+	}
+
 	inline std::vector<Address> anchors() const
 	{
 		std::vector<Address> r;
@@ -322,6 +332,16 @@ public:
 		return r;
 	}
 
+	inline unsigned int alwaysContactAddresses(Address ac[ZT_MAX_NETWORK_SPECIALISTS]) const
+	{
+		unsigned int c = 0;
+		for(unsigned int i=0;i<specialistCount;++i) {
+			if ((specialists[i] & (ZT_NETWORKCONFIG_SPECIALIST_TYPE_ANCHOR | ZT_NETWORKCONFIG_SPECIALIST_TYPE_MULTICAST_REPLICATOR)) != 0)
+				ac[c++] = specialists[i];
+		}
+		return c;
+	}
+
 	inline void alwaysContactAddresses(Hashtable< Address,std::vector<InetAddress> > &a) const
 	{
 		for(unsigned int i=0;i<specialistCount;++i) {

+ 2 - 4
node/Switch.cpp

@@ -389,11 +389,9 @@ void Switch::onLocalEthernet(void *tPtr,const SharedPtr<Network> &network,const
 
 		RR->mc->send(
 			tPtr,
-			network->config().multicastLimit,
 			RR->node->now(),
-			network->id(),
-			network->config().disableCompression(),
-			network->config().activeBridges(),
+			network,
+//			network->config().activeBridges(),
 			multicastGroup,
 			(fromBridged) ? from : MAC(),
 			etherType,