Browse Source

Add RateLimiter for rate limiting multicast, not tested yet.

Adam Ierymenko 12 years ago
parent
commit
3af55f4423
6 changed files with 268 additions and 90 deletions
  1. 15 0
      node/Constants.hpp
  2. 4 0
      node/Multicaster.hpp
  3. 23 0
      node/Network.hpp
  4. 90 90
      node/PacketDecoder.cpp
  5. 125 0
      node/RateLimiter.hpp
  6. 11 0
      node/Utils.hpp

+ 15 - 0
node/Constants.hpp

@@ -266,6 +266,21 @@ error_no_ZT_ARCH_defined;
  */
 #define ZT_MULTICAST_LOCAL_POLL_PERIOD 10000
 
+/**
+ * Default bytes per second limit for multicasts per peer on a network
+ */
+#define ZT_MULTICAST_DEFAULT_BYTES_PER_SECOND 50.0
+
+/**
+ * Default balance preload for multicast rate limiters on a network
+ */
+#define ZT_MULTICAST_DEFAULT_RATE_PRELOAD 25.0
+
+/**
+ * Absolute maximum balance for multicast rate limiters
+ */
+#define ZT_MULTICAST_DEFAULT_RATE_MAX 75.0
+
 /**
  * Delay between scans of the topology active peer DB for peers that need ping
  */

+ 4 - 0
node/Multicaster.hpp

@@ -63,6 +63,9 @@ namespace ZeroTier {
  * This is written as a generic class so that it can be mocked and tested
  * in simulation. It also always takes 'now' as an argument, permitting
  * running in simulated time.
+ *
+ * This does not handle network permission or rate limiting, only the
+ * propagation algorithm.
  */
 class Multicaster
 {
@@ -328,6 +331,7 @@ private:
 	// Address and time of last LIKE
 	typedef std::pair<Address,uint64_t> MulticastMembership;
 
+	// Network : MulticastGroup -> vector<Address : time of last LIKE>
 	std::map< MulticastChannel,std::vector<MulticastMembership> > _multicastMemberships;
 	Mutex _multicastMemberships_m;
 };

+ 23 - 0
node/Network.hpp

@@ -47,6 +47,7 @@
 #include "Dictionary.hpp"
 #include "Identity.hpp"
 #include "InetAddress.hpp"
+#include "RateLimiter.hpp"
 
 namespace ZeroTier {
 
@@ -426,6 +427,25 @@ public:
 	 */
 	Status status() const;
 
+	/**
+	 * Invoke multicast rate limiter gate for a given address
+	 *
+	 * @param addr Address to check
+	 * @param bytes Bytes address wishes to send us / propagate
+	 * @return True if allowed, false if overshot rate limit
+	 */
+	inline bool multicastRateGate(const Address &addr,unsigned int bytes)
+	{
+		Mutex::Lock _l(_lock);
+		std::map<Address,RateLimiter>::iterator rl(_multicastRateLimiters.find(addr));
+		if (rl == _multicastRateLimiters.end()) {
+			RateLimiter &newrl = _multicastRateLimiters[addr];
+			newrl.init(ZT_MULTICAST_DEFAULT_BYTES_PER_SECOND,ZT_MULTICAST_DEFAULT_RATE_PRELOAD,ZT_MULTICAST_DEFAULT_RATE_MAX);
+			return newrl.gate((double)bytes);
+		}
+		return rl->second.gate((double)bytes);
+	}
+
 private:
 	static void _CBhandleTapData(void *arg,const MAC &from,const MAC &to,unsigned int etherType,const Buffer<4096> &data);
 	void _restoreState();
@@ -439,6 +459,9 @@ private:
 	// Membership certificates supplied by peers
 	std::map<Address,Certificate> _membershipCertificates;
 
+	// Rate limiters for each multicasting peer
+	std::map<Address,RateLimiter> _multicastRateLimiters;
+
 	// Configuration from network master node
 	Config _configuration;
 	Certificate _myCertificate;

+ 90 - 90
node/PacketDecoder.cpp

@@ -494,106 +494,106 @@ bool PacketDecoder::_doMULTICAST_FRAME(const RuntimeEnvironment *_r,const Shared
 {
 	try {
 		SharedPtr<Network> network(_r->nc->network(at<uint64_t>(ZT_PROTO_VERB_MULTICAST_FRAME_IDX_NETWORK_ID)));
-		if (network) {
-			if (network->isAllowed(source())) {
-				if (size() > ZT_PROTO_VERB_MULTICAST_FRAME_IDX_PAYLOAD) {
-
-					Address originalSubmitterAddress(field(ZT_PROTO_VERB_MULTICAST_FRAME_IDX_SUBMITTER_ADDRESS,ZT_ADDRESS_LENGTH),ZT_ADDRESS_LENGTH);
-					MAC fromMac(field(ZT_PROTO_VERB_MULTICAST_FRAME_IDX_SOURCE_MAC,6));
-					MulticastGroup mg(MAC(field(ZT_PROTO_VERB_MULTICAST_FRAME_IDX_DESTINATION_MAC,6)),at<uint32_t>(ZT_PROTO_VERB_MULTICAST_FRAME_IDX_ADI));
-					unsigned int hops = (*this)[ZT_PROTO_VERB_MULTICAST_FRAME_IDX_HOP_COUNT];
-					unsigned int etherType = at<uint16_t>(ZT_PROTO_VERB_MULTICAST_FRAME_IDX_ETHERTYPE);
-					unsigned int datalen = at<uint16_t>(ZT_PROTO_VERB_MULTICAST_FRAME_IDX_PAYLOAD_LENGTH);
-					unsigned int signaturelen = at<uint16_t>(ZT_PROTO_VERB_MULTICAST_FRAME_IDX_SIGNATURE_LENGTH);
-					unsigned char *dataAndSignature = field(ZT_PROTO_VERB_MULTICAST_FRAME_IDX_PAYLOAD,datalen + signaturelen);
-
-					uint64_t mccrc = Multicaster::computeMulticastDedupCrc(network->id(),fromMac,mg,etherType,dataAndSignature,datalen);
-					uint64_t now = Utils::now();
-					bool isDuplicate = _r->multicaster->checkDuplicate(mccrc,now);
-
-					if (originalSubmitterAddress == _r->identity.address()) {
-						// Technically should not happen, since the original submitter is
-						// excluded from consideration as a propagation recipient.
-						TRACE("dropped boomerang MULTICAST_FRAME received from %s(%s)",source().toString().c_str(),_remoteAddress.toString().c_str());
-					} else if ((!isDuplicate)||(_r->topology->amSupernode())) {
-						//
-						// If I am a supernode, I will repeatedly propagate duplicates. That's
-						// because supernodes are used to bridge sparse multicast groups. Non-
-						// supernodes will ignore duplicates completely.
-						//
-						// TODO: supernodes should keep a local bloom filter too and OR it with
-						// the bloom from the packet in order to pick different recipients each
-						// time a multicast returns to them for repropagation.
-						//
-
-						SharedPtr<Peer> originalSubmitter(_r->topology->getPeer(originalSubmitterAddress));
-						if (!originalSubmitter) {
-							TRACE("requesting WHOIS on original multicast frame submitter %s",originalSubmitterAddress.toString().c_str());
-							_r->sw->requestWhois(originalSubmitterAddress);
-							_step = DECODE_WAITING_FOR_MULTICAST_FRAME_ORIGINAL_SENDER_LOOKUP;
-							return false; // try again if/when we get OK(WHOIS)
-						} else if (Multicaster::verifyMulticastPacket(originalSubmitter->identity(),network->id(),fromMac,mg,etherType,dataAndSignature,datalen,dataAndSignature + datalen,signaturelen)) {
-							_r->multicaster->addToDedupHistory(mccrc,now);
-
-							// Even if we are a supernode, we still don't repeatedly inject
-							// duplicates into our own tap.
-							if (!isDuplicate)
-								network->tap().put(fromMac,mg.mac(),etherType,dataAndSignature,datalen);
-
-							if (++hops < ZT_MULTICAST_PROPAGATION_DEPTH) {
-								Address upstream(source()); // save this since we mangle it
-
-								Multicaster::MulticastBloomFilter bloom(field(ZT_PROTO_VERB_MULTICAST_FRAME_IDX_BLOOM_FILTER,ZT_PROTO_VERB_MULTICAST_FRAME_BLOOM_FILTER_SIZE_BYTES));
-								SharedPtr<Peer> propPeers[ZT_MULTICAST_PROPAGATION_BREADTH];
-								unsigned int np = _r->multicaster->pickNextPropagationPeers(
-									*(_r->prng),
-									*(_r->topology),
-									network->id(),
-									mg,
-									originalSubmitterAddress,
-									upstream,
-									bloom,
-									ZT_MULTICAST_PROPAGATION_BREADTH,
-									propPeers,
-									now);
-
-								// In a bit of a hack, we re-use this packet to repeat it
-								// to our multicast propagation recipients. Afterwords we
-								// return true just to be sure this is the end of this
-								// packet's life cycle, since it is now mangled.
-
-								setSource(_r->identity.address());
-								(*this)[ZT_PROTO_VERB_MULTICAST_FRAME_IDX_HOP_COUNT] = hops;
-								memcpy(field(ZT_PROTO_VERB_MULTICAST_FRAME_IDX_BLOOM_FILTER,ZT_PROTO_VERB_MULTICAST_FRAME_BLOOM_FILTER_SIZE_BYTES),bloom.data(),ZT_PROTO_VERB_MULTICAST_FRAME_BLOOM_FILTER_SIZE_BYTES);
-								compress();
-
-								for(unsigned int i=0;i<np;++i) {
-									//TRACE("propagating multicast from original node %s: %s -> %s",originalSubmitterAddress.toString().c_str(),upstream.toString().c_str(),propPeers[i]->address().toString().c_str());
-									// Re-use this packet to re-send multicast frame to everyone
-									// downstream from us.
-									newInitializationVector();
-									setDestination(propPeers[i]->address());
-									_r->sw->send(*this,true);
-								}
-
-								return true;
-							} else {
-								//TRACE("terminating MULTICAST_FRAME propagation from %s(%s): max depth reached",source().toString().c_str(),_remoteAddress.toString().c_str());
+		if ((network)&&(network->isAllowed(source()))) {
+			Address originalSubmitterAddress(field(ZT_PROTO_VERB_MULTICAST_FRAME_IDX_SUBMITTER_ADDRESS,ZT_ADDRESS_LENGTH),ZT_ADDRESS_LENGTH);
+			MAC fromMac(field(ZT_PROTO_VERB_MULTICAST_FRAME_IDX_SOURCE_MAC,6));
+			MulticastGroup mg(MAC(field(ZT_PROTO_VERB_MULTICAST_FRAME_IDX_DESTINATION_MAC,6)),at<uint32_t>(ZT_PROTO_VERB_MULTICAST_FRAME_IDX_ADI));
+			unsigned int hops = (*this)[ZT_PROTO_VERB_MULTICAST_FRAME_IDX_HOP_COUNT];
+			unsigned int etherType = at<uint16_t>(ZT_PROTO_VERB_MULTICAST_FRAME_IDX_ETHERTYPE);
+			unsigned int datalen = at<uint16_t>(ZT_PROTO_VERB_MULTICAST_FRAME_IDX_PAYLOAD_LENGTH);
+			unsigned int signaturelen = at<uint16_t>(ZT_PROTO_VERB_MULTICAST_FRAME_IDX_SIGNATURE_LENGTH);
+			unsigned char *dataAndSignature = field(ZT_PROTO_VERB_MULTICAST_FRAME_IDX_PAYLOAD,datalen + signaturelen);
+
+			uint64_t mccrc = Multicaster::computeMulticastDedupCrc(network->id(),fromMac,mg,etherType,dataAndSignature,datalen);
+			uint64_t now = Utils::now();
+			bool isDuplicate = _r->multicaster->checkDuplicate(mccrc,now);
+
+			if (originalSubmitterAddress == _r->identity.address()) {
+				// Technically should not happen, since the original submitter is
+				// excluded from consideration as a propagation recipient.
+				TRACE("dropped boomerang MULTICAST_FRAME received from %s(%s)",source().toString().c_str(),_remoteAddress.toString().c_str());
+			} else if ((!isDuplicate)||(_r->topology->amSupernode())) {
+				//
+				// If I am a supernode, I will repeatedly propagate duplicates. That's
+				// because supernodes are used to bridge sparse multicast groups. Non-
+				// supernodes will ignore duplicates completely.
+				//
+				// TODO: supernodes should keep a local bloom filter too and OR it with
+				// the bloom from the packet in order to pick different recipients each
+				// time a multicast returns to them for repropagation.
+				//
+
+				SharedPtr<Peer> originalSubmitter(_r->topology->getPeer(originalSubmitterAddress));
+				if (!originalSubmitter) {
+					TRACE("requesting WHOIS on original multicast frame submitter %s",originalSubmitterAddress.toString().c_str());
+					_r->sw->requestWhois(originalSubmitterAddress);
+					_step = DECODE_WAITING_FOR_MULTICAST_FRAME_ORIGINAL_SENDER_LOOKUP;
+					return false; // try again if/when we get OK(WHOIS)
+				} else if (Multicaster::verifyMulticastPacket(originalSubmitter->identity(),network->id(),fromMac,mg,etherType,dataAndSignature,datalen,dataAndSignature + datalen,signaturelen)) {
+					// In checking the multicast rate, we don't re-check if this is
+					// a duplicate. That's because if isDuplicate is true it means
+					// we're a supernode and it's a second pass relay.
+					if ((isDuplicate)||(network->multicastRateGate(originalSubmitter->address(),datalen))) {
+						_r->multicaster->addToDedupHistory(mccrc,now);
+
+						// Even if we are a supernode, we still don't repeatedly inject
+						// duplicates into our own tap.
+						if (!isDuplicate)
+							network->tap().put(fromMac,mg.mac(),etherType,dataAndSignature,datalen);
+
+						if (++hops < ZT_MULTICAST_PROPAGATION_DEPTH) {
+							Address upstream(source()); // save this since we mangle it
+
+							Multicaster::MulticastBloomFilter bloom(field(ZT_PROTO_VERB_MULTICAST_FRAME_IDX_BLOOM_FILTER,ZT_PROTO_VERB_MULTICAST_FRAME_BLOOM_FILTER_SIZE_BYTES));
+							SharedPtr<Peer> propPeers[ZT_MULTICAST_PROPAGATION_BREADTH];
+							unsigned int np = _r->multicaster->pickNextPropagationPeers(
+								*(_r->prng),
+								*(_r->topology),
+								network->id(),
+								mg,
+								originalSubmitterAddress,
+								upstream,
+								bloom,
+								ZT_MULTICAST_PROPAGATION_BREADTH,
+								propPeers,
+								now);
+
+							// In a bit of a hack, we re-use this packet to repeat it
+							// to our multicast propagation recipients. Afterwords we
+							// return true just to be sure this is the end of this
+							// packet's life cycle, since it is now mangled.
+
+							setSource(_r->identity.address());
+							(*this)[ZT_PROTO_VERB_MULTICAST_FRAME_IDX_HOP_COUNT] = hops;
+							memcpy(field(ZT_PROTO_VERB_MULTICAST_FRAME_IDX_BLOOM_FILTER,ZT_PROTO_VERB_MULTICAST_FRAME_BLOOM_FILTER_SIZE_BYTES),bloom.data(),ZT_PROTO_VERB_MULTICAST_FRAME_BLOOM_FILTER_SIZE_BYTES);
+							compress();
+
+							for(unsigned int i=0;i<np;++i) {
+								//TRACE("propagating multicast from original node %s: %s -> %s",originalSubmitterAddress.toString().c_str(),upstream.toString().c_str(),propPeers[i]->address().toString().c_str());
+								// Re-use this packet to re-send multicast frame to everyone
+								// downstream from us.
+								newInitializationVector();
+								setDestination(propPeers[i]->address());
+								_r->sw->send(*this,true);
 							}
+
+							// Return here just to be safe, since this packet's state is no
+							// longer valid.
+							return true;
 						} else {
-							LOG("rejected MULTICAST_FRAME from %s(%s) due to failed signature check (falsely claims origin %s)",source().toString().c_str(),_remoteAddress.toString().c_str(),originalSubmitterAddress.toString().c_str());
+							//TRACE("terminating MULTICAST_FRAME propagation from %s(%s): max depth reached",source().toString().c_str(),_remoteAddress.toString().c_str());
 						}
 					} else {
-						TRACE("dropped redundant MULTICAST_FRAME from %s(%s)",source().toString().c_str(),_remoteAddress.toString().c_str());
+						LOG("dropped MULTICAST_FRAME from original sender %s: rate limit overrun",originalSubmitter->address().toString().c_str());
 					}
 				} else {
-					TRACE("dropped MULTICAST_FRAME from %s(%s): invalid short packet",source().toString().c_str(),_remoteAddress.toString().c_str());
+					TRACE("rejected MULTICAST_FRAME forwarded by %s(%s): failed signature check (falsely claims origin %s)",source().toString().c_str(),_remoteAddress.toString().c_str(),originalSubmitterAddress.toString().c_str());
 				}
 			} else {
-				TRACE("dropped MULTICAST_FRAME from %s(%s): not a member of closed network %llu",source().toString().c_str(),_remoteAddress.toString().c_str(),network->id());
+				TRACE("dropped duplicate MULTICAST_FRAME from %s(%s)",source().toString().c_str(),_remoteAddress.toString().c_str());
 			}
 		} else {
-			TRACE("dropped MULTICAST_FRAME from %s(%s): network %llu unknown or we are not a member",source().toString().c_str(),_remoteAddress.toString().c_str(),at<uint64_t>(ZT_PROTO_VERB_MULTICAST_FRAME_IDX_NETWORK_ID));
+			TRACE("dropped MULTICAST_FRAME from %s(%s): network %.16llx unknown or sender not allowed",source().toString().c_str(),_remoteAddress.toString().c_str(),(unsigned long long)network->id());
 		}
 	} catch (std::exception &ex) {
 		TRACE("dropped MULTICAST_FRAME from %s(%s): unexpected exception: %s",source().toString().c_str(),_remoteAddress.toString().c_str(),ex.what());

+ 125 - 0
node/RateLimiter.hpp

@@ -0,0 +1,125 @@
+/*
+ * ZeroTier One - Global Peer to Peer Ethernet
+ * Copyright (C) 2012-2013  ZeroTier Networks LLC
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program.  If not, see <http://www.gnu.org/licenses/>.
+ *
+ * --
+ *
+ * ZeroTier may be used and distributed under the terms of the GPLv3, which
+ * are available at: http://www.gnu.org/licenses/gpl-3.0.html
+ *
+ * If you would like to embed ZeroTier into a commercial application or
+ * redistribute it in a modified binary form, please contact ZeroTier Networks
+ * LLC. Start here: http://www.zerotier.com/
+ */
+
+#ifndef _ZT_RATELIMITER_HPP
+#define _ZT_RATELIMITER_HPP
+
+#include <math.h>
+#include "Utils.hpp"
+
+namespace ZeroTier {
+
+/**
+ * Burstable rate limiter
+ *
+ * This limits a transfer rate to a maximum bytes per second using an
+ * accounting method based on a balance rather than accumulating an
+ * average rate. The result is a burstable rate limit rather than a
+ * continuous rate limit; the link being limited may use all its balance
+ * at once or slowly over time. Balance constantly replenishes over time
+ * up to a configurable maximum balance.
+ */
+class RateLimiter
+{
+public:
+	/**
+	 * Create an uninitialized rate limiter
+	 *
+	 * init() must be called before this is used.
+	 */
+	RateLimiter() throw() {}
+
+	/**
+	 * @param bytesPerSecond Bytes per second to permit (average)
+	 * @param preload Initial balance to place in account
+	 * @param max Maximum balance to permit to ever accrue (max burst)
+	 */
+	RateLimiter(double bytesPerSecond,double preload,double max)
+		throw()
+	{
+		init(bytesPerSecond,preload,max);
+	}
+
+	/**
+	 * Initialize or re-initialize rate limiter
+	 *
+	 * @param bytesPerSecond Bytes per second to permit (average)
+	 * @param preload Initial balance to place in account
+	 * @param max Maximum balance to permit to ever accrue (max burst)
+	 */
+	inline void init(double bytesPerSecond,double preload,double max)
+		throw()
+	{
+		_bytesPerSecond = bytesPerSecond;
+		_lastTime = Utils::nowf();
+		_balance = preload;
+		_max = max;
+	}
+
+	/**
+	 * Update balance based on current clock
+	 *
+	 * This can be called at any time to check the current balance without
+	 * affecting the behavior of gate().
+	 *
+	 * @return New balance
+	 */
+	inline double updateBalance()
+		throw()
+	{
+		double now = Utils::nowf();
+		double b = _balance = fmin(_max,_balance + (_bytesPerSecond * (now - _lastTime)));
+		_lastTime = now;
+		return b;
+	}
+
+	/**
+	 * Test balance and update / deduct if there is enough to transfer 'bytes'
+	 *
+	 * @param bytes Number of bytes that we wish to transfer
+	 * @return True if balance was sufficient (balance is updated), false if not (balance unchanged)
+	 */
+	inline bool gate(double bytes)
+		throw()
+	{
+		if (updateBalance() >= bytes) {
+			_balance -= bytes;
+			return true;
+		}
+		return false;
+	}
+
+private:
+	double _bytesPerSecond;
+	double _lastTime;
+	double _balance;
+	double _max;
+};
+
+} // namespace ZeroTier
+
+#endif

+ 11 - 0
node/Utils.hpp

@@ -347,6 +347,17 @@ public:
 		return ( (1000ULL * (uint64_t)tv.tv_sec) + (uint64_t)(tv.tv_usec / 1000) );
 	};
 
+	/**
+	 * @return Current time in seconds since epoch, to the highest available resolution
+	 */
+	static inline double nowf()
+		throw()
+	{
+		struct timeval tv;
+		gettimeofday(&tv,(struct timezone *)0);
+		return ( ((double)tv.tv_sec) + (((double)tv.tv_usec) / 1000000.0) );
+	}
+
 	/**
 	 * Read the full contents of a file into a string buffer
 	 *