Browse Source

Some cleanup, Multicaster now sends multicasts as it gets additional members.

Adam Ierymenko 10 years ago
parent
commit
e1882b614b
8 changed files with 76 additions and 27 deletions
  1. 40 7
      node/Multicaster.cpp
  2. 9 7
      node/Multicaster.hpp
  3. 3 3
      node/Node.cpp
  4. 14 2
      node/OutboundMulticast.cpp
  5. 5 2
      node/OutboundMulticast.hpp
  6. 2 2
      node/Switch.cpp
  7. 2 3
      node/Topology.cpp
  8. 1 1
      node/Topology.hpp

+ 40 - 7
node/Multicaster.cpp

@@ -40,7 +40,8 @@
 
 namespace ZeroTier {
 
-Multicaster::Multicaster()
+Multicaster::Multicaster(const RuntimeEnvironment *renv) :
+	RR(renv)
 {
 }
 
@@ -104,9 +105,9 @@ restart_member_scan:
 }
 
 void Multicaster::send(
-	const RuntimeEnvironment *RR,
 	const CertificateOfMembership *com,
 	unsigned int limit,
+	unsigned int gatherLimit,
 	uint64_t now,
 	uint64_t nwid,
 	const MulticastGroup &mg,
@@ -122,7 +123,19 @@ void Multicaster::send(
 		// If we already have enough members, just send and we're done -- no need for TX queue
 		OutboundMulticast out;
 
-		out.init(now,RR->identity.address(),nwid,com,ZT_MULTICAST_DEFAULT_IMPLICIT_GATHER,src,mg,etherType,data,len);
+		out.init(
+			now,
+			RR->identity.address(),
+			nwid,
+			com,
+			limit,
+			gatherLimit,
+			src,
+			mg,
+			etherType,
+			data,
+			len);
+
 		unsigned int count = 0;
 		for(std::vector<MulticastGroupMember>::const_reverse_iterator m(gs.members.rbegin());m!=gs.members.rend();++m) {
 			out.sendOnly(*(RR->sw),m->address); // sendOnly() avoids overhead of creating sent log since we're going to discard this immediately
@@ -134,7 +147,19 @@ void Multicaster::send(
 		gs.txQueue.push_back(OutboundMulticast());
 		OutboundMulticast &out = gs.txQueue.back();
 
-		out.init(now,RR->identity.address(),nwid,com,ZT_MULTICAST_DEFAULT_IMPLICIT_GATHER,src,mg,etherType,data,len);
+		out.init(
+			now,
+			RR->identity.address(),
+			nwid,
+			com,
+			limit,
+			gatherLimit,
+			src,
+			mg,
+			etherType,
+			data,
+			len);
+
 		for(std::vector<MulticastGroupMember>::const_reverse_iterator m(gs.members.rbegin());m!=gs.members.rend();++m)
 			out.sendAndLog(*(RR->sw),m->address);
 
@@ -161,13 +186,13 @@ void Multicaster::send(
 	}
 }
 
-void Multicaster::clean(const RuntimeEnvironment *RR,uint64_t now)
+void Multicaster::clean(uint64_t now)
 {
 	Mutex::Lock _l(_groups_m);
 	for(std::map< std::pair<uint64_t,MulticastGroup>,MulticastGroupStatus >::iterator mm(_groups.begin());mm!=_groups.end();) {
 		// Remove expired outgoing multicasts from multicast TX queue
 		for(std::list<OutboundMulticast>::iterator tx(mm->second.txQueue.begin());tx!=mm->second.txQueue.end();) {
-			if (tx->expired(now))
+			if ((tx->expired(now))||(tx->atLimit()))
 				mm->second.txQueue.erase(tx++);
 			else ++tx;
 		}
@@ -218,7 +243,7 @@ void Multicaster::clean(const RuntimeEnvironment *RR,uint64_t now)
 	}
 }
 
-void Multicaster::_add(uint64_t now,MulticastGroupStatus &gs,const Address &learnedFrom,const Address &member)
+void Multicaster::_add(uint64_t now,uint64_t nwid,MulticastGroupStatus &gs,const Address &learnedFrom,const Address &member)
 {
 	// assumes _groups_m is locked
 
@@ -236,6 +261,14 @@ void Multicaster::_add(uint64_t now,MulticastGroupStatus &gs,const Address &lear
 	// be resorted on next clean(). In the future we might want to insert
 	// this somewhere else but we'll try this for now.
 	gs.members.push_back(MulticastGroupMember(member,learnedFrom,now));
+
+	// Try to send to any outgoing multicasts that are waiting for more recipients
+	for(std::list<OutboundMulticast>::iterator tx(gs.txQueue.begin());tx!=gs.txQueue.end();) {
+		tx->sendIfNew(*(RR->sw),member);
+		if (tx->atLimit())
+			gs.txQueue.erase(tx++);
+		else ++tx;
+	}
 }
 
 } // namespace ZeroTier

+ 9 - 7
node/Multicaster.hpp

@@ -42,6 +42,7 @@
 #include "OutboundMulticast.hpp"
 #include "Utils.hpp"
 #include "Mutex.hpp"
+#include "NonCopyable.hpp"
 
 namespace ZeroTier {
 
@@ -52,7 +53,7 @@ class Packet;
 /**
  * Database of known multicast peers within a network
  */
-class Multicaster
+class Multicaster : NonCopyable
 {
 private:
 	struct MulticastGroupMember
@@ -79,7 +80,7 @@ private:
 	};
 
 public:
-	Multicaster();
+	Multicaster(const RuntimeEnvironment *renv);
 	~Multicaster();
 
 	/**
@@ -94,7 +95,7 @@ public:
 	inline void subscribe(uint64_t now,uint64_t nwid,const MulticastGroup &mg,const Address &learnedFrom,const Address &member)
 	{
 		Mutex::Lock _l(_groups_m);
-		_add(now,_groups[std::pair<uint64_t,MulticastGroup>(nwid,mg)],learnedFrom,member);
+		_add(now,nwid,_groups[std::pair<uint64_t,MulticastGroup>(nwid,mg)],learnedFrom,member);
 	}
 
 	/**
@@ -120,10 +121,10 @@ public:
 	/**
 	 * Send a multicast
 	 *
-	 * @param RR Runtime environment
 	 * @param nwid Network ID
 	 * @param com Certificate of membership to include or NULL for none
 	 * @param limit Multicast limit
+	 * @param gatherLimit Limit to pass for implicit gather with MULTICAST_FRAME
 	 * @param now Current time
 	 * @param mg Multicast group
 	 * @param from Source Ethernet MAC address
@@ -132,9 +133,9 @@ public:
 	 * @param len Length of packet data
 	 */
 	void send(
-		const RuntimeEnvironment *RR,
 		const CertificateOfMembership *com,
 		unsigned int limit,
+		unsigned int gatherLimit,
 		uint64_t now,
 		uint64_t nwid,
 		const MulticastGroup &mg,
@@ -149,11 +150,12 @@ public:
 	 * @param RR Runtime environment
 	 * @param now Current time
 	 */
-	void clean(const RuntimeEnvironment *RR,uint64_t now);
+	void clean(uint64_t now);
 
 private:
-	void _add(uint64_t now,MulticastGroupStatus &gs,const Address &learnedFrom,const Address &member);
+	void _add(uint64_t now,uint64_t nwid,MulticastGroupStatus &gs,const Address &learnedFrom,const Address &member);
 
+	const RuntimeEnvironment *RR;
 	std::map< std::pair<uint64_t,MulticastGroup>,MulticastGroupStatus > _groups;
 	Mutex _groups_m;
 };

+ 3 - 3
node/Node.cpp

@@ -382,7 +382,7 @@ Node::ReasonForTermination Node::run()
 
 		RR->http = new HttpClient();
 		RR->antiRec = new AntiRecursion();
-		RR->mc = new Multicaster();
+		RR->mc = new Multicaster(RR);
 		RR->sw = new Switch(RR);
 		RR->sm = new SocketManager(impl->udpPort,impl->tcpPort,&_CBztTraffic,RR);
 		RR->topology = new Topology(RR,Utils::fileExists((RR->homePath + ZT_PATH_SEPARATOR_S + "iddb.d").c_str()));
@@ -605,8 +605,8 @@ Node::ReasonForTermination Node::run()
 			// Do periodic tasks in submodules.
 			if ((now - lastClean) >= ZT_DB_CLEAN_PERIOD) {
 				lastClean = now;
-				RR->topology->clean();
-				RR->mc->clean(RR,now);
+				RR->topology->clean(now);
+				RR->mc->clean(now);
 				RR->nc->clean();
 				if (RR->updater)
 					RR->updater->checkIfMaxIntervalExceeded(now);

+ 14 - 2
node/OutboundMulticast.cpp

@@ -32,12 +32,24 @@
 
 namespace ZeroTier {
 
-void OutboundMulticast::init(uint64_t timestamp,const Address &self,uint64_t nwid,const CertificateOfMembership *com,unsigned int gatherLimit,const MAC &src,const MulticastGroup &dest,unsigned int etherType,const void *payload,unsigned int len)
+void OutboundMulticast::init(
+	uint64_t timestamp,
+	const Address &self,
+	uint64_t nwid,
+	const CertificateOfMembership *com,
+	unsigned int limit,
+	unsigned int gatherLimit,
+	const MAC &src,
+	const MulticastGroup &dest,
+	unsigned int etherType,
+	const void *payload,
+	unsigned int len)
 {
 	_timestamp = timestamp;
 	_nwid = nwid;
 	_source = src;
 	_destination = dest;
+	_limit = limit;
 	_etherType = etherType;
 
 	_packet.setSource(self);
@@ -46,7 +58,7 @@ void OutboundMulticast::init(uint64_t timestamp,const Address &self,uint64_t nwi
 	self.appendTo(_packet);
 	_packet.append((uint64_t)nwid);
 	_packet.append((uint8_t)((com) ? 0x01 : 0x00));
-	_packet.append((uint32_t)gatherLimit); // gather limit -- set before send, start with 0
+	_packet.append((uint32_t)gatherLimit);
 	if (com) com->serialize(_packet);
 	_packet.append((uint32_t)dest.adi());
 	dest.mac().appendTo(_packet);

+ 5 - 2
node/OutboundMulticast.hpp

@@ -66,6 +66,7 @@ public:
 	 * @param self My ZeroTier address
 	 * @param nwid Network ID
 	 * @param com Certificate of membership to attach or NULL to omit
+	 * @param limit Multicast limit for desired number of packets to send
 	 * @param gatherLimit Number to lazily/implicitly gather with this frame or 0 for none
 	 * @param src Source MAC address of frame
 	 * @param dest Destination multicast group (MAC + ADI)
@@ -79,6 +80,7 @@ public:
 		const Address &self,
 		uint64_t nwid,
 		const CertificateOfMembership *com,
+		unsigned int limit,
 		unsigned int gatherLimit,
 		const MAC &src,
 		const MulticastGroup &dest,
@@ -98,9 +100,9 @@ public:
 	inline bool expired(uint64_t now) const throw() { return ((now - _timestamp) >= ZT_MULTICAST_TRANSMIT_TIMEOUT); }
 
 	/**
-	 * @return Number of unique recipients to which this packet has already been sent
+	 * @return True if this outbound multicast has been sent to enough peers
 	 */
-	inline unsigned int sentToCount() const throw() { return (unsigned int)_alreadySentTo.size(); }
+	inline bool atLimit() const throw() { return (_alreadySentTo.size() > _limit); }
 
 	/**
 	 * Just send without checking log
@@ -144,6 +146,7 @@ private:
 	uint64_t _nwid;
 	MAC _source;
 	MulticastGroup _destination;
+	unsigned int _limit;
 	unsigned int _etherType;
 	Packet _packet; // packet contains basic structure of MULTICAST_FRAME and payload, is re-used with new IV and addressing each time
 	std::vector<Address> _alreadySentTo;

+ 2 - 2
node/Switch.cpp

@@ -151,9 +151,9 @@ void Switch::onLocalEthernet(const SharedPtr<Network> &network,const MAC &from,c
 		TRACE("%s: MULTICAST %s -> %s %s %d",network->tapDeviceName().c_str(),from.toString().c_str(),mg.toString().c_str(),etherTypeName(etherType),(int)data.size());
 
 		RR->mc->send(
-			RR,
 			((!nconf->isPublic())&&(nconf->com())) ? &(nconf->com()) : (const CertificateOfMembership *)0,
-			network->wantMulticastGroup(mg) ? nconf->multicastLimit() : 0,
+			nconf->multicastLimit(),
+			network->wantMulticastGroup(mg) ? ZT_MULTICAST_DEFAULT_IMPLICIT_GATHER : 0,
 			now,
 			network->id(),
 			mg,

+ 2 - 3
node/Topology.cpp

@@ -49,7 +49,7 @@ Topology::Topology(const RuntimeEnvironment *renv,bool enablePermanentIdCaching)
 
 Topology::~Topology()
 {
-	clean();
+	clean(Utils::now());
 	_dumpPeers();
 }
 
@@ -256,9 +256,8 @@ keep_searching_for_supernodes:
 	return bestSupernode;
 }
 
-void Topology::clean()
+void Topology::clean(uint64_t now)
 {
-	uint64_t now = Utils::now();
 	Mutex::Lock _l(_activePeers_m);
 	Mutex::Lock _l2(_supernodes_m);
 	for(std::map< Address,SharedPtr<Peer> >::iterator p(_activePeers.begin());p!=_activePeers.end();) {

+ 1 - 1
node/Topology.hpp

@@ -186,7 +186,7 @@ public:
 	/**
 	 * Clean and flush database
 	 */
-	void clean();
+	void clean(uint64_t now);
 
 	/**
 	 * Apply a function or function object to all peers