Browse Source

Yet more minor refactoring and redesign Meter to be more thread-safe.

Adam Ierymenko 5 years ago
parent
commit
e5c7366e71
10 changed files with 131 additions and 80 deletions
  1. 29 10
      node/AES.cpp
  2. 1 1
      node/Buf.hpp
  3. 2 2
      node/LZ4.cpp
  4. 2 2
      node/LZ4.hpp
  5. 4 4
      node/Locator.cpp
  6. 15 15
      node/Locator.hpp
  7. 5 2
      node/Membership.cpp
  8. 10 9
      node/Membership.hpp
  9. 55 34
      node/Meter.hpp
  10. 8 1
      node/Protocol.hpp

+ 29 - 10
node/AES.cpp

@@ -353,53 +353,72 @@ void AES::GMAC::finish(uint8_t tag[16]) noexcept
 		const __m128i h = _aes._k.ni.h;
 		y = _mm_xor_si128(y,_mm_set_epi64x(0LL,(long long)Utils::hton((uint64_t)_len << 3U)));
 		y = _mm_shuffle_epi8(y,s_shuf);
+
 		__m128i encIV = _mm_xor_si128(_mm_loadu_si128(reinterpret_cast<const __m128i *>(_iv)),k[0]);
+
 		__m128i t1 = _mm_clmulepi64_si128(h,y,0x00);
 		__m128i t2 = _mm_clmulepi64_si128(h,y,0x01);
 		__m128i t3 = _mm_clmulepi64_si128(h,y,0x10);
 		__m128i t4 = _mm_clmulepi64_si128(h,y,0x11);
+
 		encIV = _mm_aesenc_si128(encIV,k[1]);
+		encIV = _mm_aesenc_si128(encIV,k[2]);
+
 		t2 = _mm_xor_si128(t2,t3);
 		t3 = _mm_slli_si128(t2,8);
-		encIV = _mm_aesenc_si128(encIV,k[2]);
 		t2 = _mm_srli_si128(t2,8);
 		t1 = _mm_xor_si128(t1,t3);
+
 		encIV = _mm_aesenc_si128(encIV,k[3]);
+
 		t4 = _mm_xor_si128(t4,t2);
 		__m128i t5 = _mm_srli_epi32(t1,31);
-		encIV = _mm_aesenc_si128(encIV,k[4]);
 		t1 = _mm_slli_epi32(t1,1);
 		__m128i t6 = _mm_srli_epi32(t4,31);
-		encIV = _mm_aesenc_si128(encIV,k[5]);
+
+		encIV = _mm_aesenc_si128(encIV,k[4]);
+
 		t4 = _mm_slli_epi32(t4,1);
 		t3 = _mm_srli_si128(t5,12);
-		encIV = _mm_aesenc_si128(encIV,k[6]);
 		t6 = _mm_slli_si128(t6,4);
 		t5 = _mm_slli_si128(t5,4);
-		encIV = _mm_aesenc_si128(encIV,k[7]);
+
+		encIV = _mm_aesenc_si128(encIV,k[5]);
+
 		t1 = _mm_or_si128(t1,t5);
 		t4 = _mm_or_si128(t4,t6);
-		encIV = _mm_aesenc_si128(encIV,k[8]);
+
+		encIV = _mm_aesenc_si128(encIV,k[6]);
+		encIV = _mm_aesenc_si128(encIV,k[7]);
+
 		t4 = _mm_or_si128(t4,t3);
 		t5 = _mm_slli_epi32(t1,31);
-		encIV = _mm_aesenc_si128(encIV,k[9]);
 		t6 = _mm_slli_epi32(t1,30);
 		t3 = _mm_slli_epi32(t1,25);
-		encIV = _mm_aesenc_si128(encIV,k[10]);
+
+		encIV = _mm_aesenc_si128(encIV,k[8]);
+		encIV = _mm_aesenc_si128(encIV,k[9]);
+
 		t5 = _mm_xor_si128(t5,t6);
 		t5 = _mm_xor_si128(t5,t3);
+
+		encIV = _mm_aesenc_si128(encIV,k[10]);
 		encIV = _mm_aesenc_si128(encIV,k[11]);
+
 		t6 = _mm_srli_si128(t5,4);
 		t4 = _mm_xor_si128(t4,t6);
-		encIV = _mm_aesenc_si128(encIV,k[12]);
 		t5 = _mm_slli_si128(t5,12);
 		t1 = _mm_xor_si128(t1,t5);
-		encIV = _mm_aesenc_si128(encIV,k[13]);
+
 		t4 = _mm_xor_si128(t4,t1);
 		t5 = _mm_srli_epi32(t1,1);
 		t2 = _mm_srli_epi32(t1,2);
 		t3 = _mm_srli_epi32(t1,7);
+
+		encIV = _mm_aesenc_si128(encIV,k[12]);
+		encIV = _mm_aesenc_si128(encIV,k[13]);
 		encIV = _mm_aesenclast_si128(encIV,k[14]);
+
 		t4 = _mm_xor_si128(t4,t2);
 		t4 = _mm_xor_si128(t4,t3);
 		t4 = _mm_xor_si128(t4,t5);

+ 1 - 1
node/Buf.hpp

@@ -79,7 +79,7 @@ namespace ZeroTier {
  */
 class Buf
 {
-	friend class SharedPtr< Buf >;
+	friend class SharedPtr<Buf>;
 
 public:
 	// New and delete operators that allocate Buf instances from a shared lock-free memory pool.

+ 2 - 2
node/LZ4.cpp

@@ -750,7 +750,7 @@ FORCE_INLINE int LZ4_decompress_generic(
 
 } // anonymous namespace
 
-int LZ4_compress_fast(const char* source, char* dest, int inputSize, int maxOutputSize, int acceleration)
+int LZ4_compress_fast(const char* source, char* dest, int inputSize, int maxOutputSize, int acceleration) noexcept
 {
 #if (HEAPMODE)
 	void* ctxPtr = ALLOCATOR(1, sizeof(LZ4_stream_t));   /* malloc-calloc always properly aligned */
@@ -767,7 +767,7 @@ int LZ4_compress_fast(const char* source, char* dest, int inputSize, int maxOutp
 	return result;
 }
 
-int LZ4_decompress_safe(const char* source, char* dest, int compressedSize, int maxDecompressedSize)
+int LZ4_decompress_safe(const char* source, char* dest, int compressedSize, int maxDecompressedSize) noexcept
 {
 	return LZ4_decompress_generic(source, dest, compressedSize, maxDecompressedSize, endOnInputSize, full, 0, noDict, (BYTE*)dest, NULL, 0);
 }

+ 2 - 2
node/LZ4.hpp

@@ -18,8 +18,8 @@
 
 namespace ZeroTier {
 
-int LZ4_compress_fast(const char *source,char *dest,int inputSize,int maxOutputSize,int acceleration = 1);
-int LZ4_decompress_safe(const char *source,char *dest,int compressedSize,int maxDecompressedSize);
+int LZ4_compress_fast(const char *source,char *dest,int inputSize,int maxOutputSize,int acceleration = 1) noexcept;
+int LZ4_decompress_safe(const char *source,char *dest,int compressedSize,int maxDecompressedSize) noexcept;
 
 } // namespace ZeroTier
 

+ 4 - 4
node/Locator.cpp

@@ -15,7 +15,7 @@
 
 namespace ZeroTier {
 
-bool Locator::sign(const int64_t ts,const Identity &id)
+bool Locator::sign(const int64_t ts,const Identity &id) noexcept
 {
 	uint8_t signData[ZT_LOCATOR_MARSHAL_SIZE_MAX];
 	if (!id.hasPrivate())
@@ -28,7 +28,7 @@ bool Locator::sign(const int64_t ts,const Identity &id)
 	return (_signatureLength > 0);
 }
 
-bool Locator::verify(const Identity &id) const
+bool Locator::verify(const Identity &id) const noexcept
 {
 	if ((_ts == 0)||(_endpointCount > ZT_LOCATOR_MAX_ENDPOINTS)||(_signatureLength > ZT_SIGNATURE_BUFFER_SIZE))
 		return false;
@@ -37,7 +37,7 @@ bool Locator::verify(const Identity &id) const
 	return id.verify(signData,signLen,_signature,_signatureLength);
 }
 
-int Locator::marshal(uint8_t data[ZT_LOCATOR_MARSHAL_SIZE_MAX],const bool excludeSignature) const
+int Locator::marshal(uint8_t data[ZT_LOCATOR_MARSHAL_SIZE_MAX],const bool excludeSignature) const noexcept
 {
 	if ((_endpointCount > ZT_LOCATOR_MAX_ENDPOINTS)||(_signatureLength > ZT_SIGNATURE_BUFFER_SIZE))
 		return -1;
@@ -69,7 +69,7 @@ int Locator::marshal(uint8_t data[ZT_LOCATOR_MARSHAL_SIZE_MAX],const bool exclud
 	return p;
 }
 
-int Locator::unmarshal(const uint8_t *restrict data,const int len)
+int Locator::unmarshal(const uint8_t *restrict data,const int len) noexcept
 {
 	if (len <= (8 + 2 + 48))
 		return -1;

+ 15 - 15
node/Locator.hpp

@@ -37,42 +37,42 @@ namespace ZeroTier {
 class Locator : public TriviallyCopyable
 {
 public:
-	ZT_ALWAYS_INLINE Locator() { this->clear(); }
+	ZT_ALWAYS_INLINE Locator() noexcept { memoryZero(this); }
 
 	/**
 	 * Zero the Locator data structure
 	 */
-	ZT_ALWAYS_INLINE void clear() { memset(reinterpret_cast<void *>(this),0,sizeof(Locator)); }
+	ZT_ALWAYS_INLINE void clear() noexcept { memoryZero(this); }
 
 	/**
 	 * @return Timestamp (a.k.a. revision number) set by Location signer
 	 */
-	ZT_ALWAYS_INLINE int64_t timestamp() const { return _ts; }
+	ZT_ALWAYS_INLINE int64_t timestamp() const noexcept { return _ts; }
 
 	/**
 	 * @return True if locator is signed
 	 */
-	ZT_ALWAYS_INLINE bool isSigned() const { return (_signatureLength > 0); }
+	ZT_ALWAYS_INLINE bool isSigned() const noexcept { return (_signatureLength > 0); }
 
 	/**
 	 * @return Length of signature in bytes or 0 if none
 	 */
-	ZT_ALWAYS_INLINE unsigned int signatureLength() const { return _signatureLength; }
+	ZT_ALWAYS_INLINE unsigned int signatureLength() const noexcept { return _signatureLength; }
 
 	/**
 	 * @return Pointer to signature bytes
 	 */
-	ZT_ALWAYS_INLINE const uint8_t *signature() const { return _signature; }
+	ZT_ALWAYS_INLINE const uint8_t *signature() const noexcept { return _signature; }
 
 	/**
 	 * @return Number of endpoints in this locator
 	 */
-	ZT_ALWAYS_INLINE unsigned int endpointCount() const { return _endpointCount; }
+	ZT_ALWAYS_INLINE unsigned int endpointCount() const noexcept { return _endpointCount; }
 
 	/**
 	 * @return Pointer to array of endpoints
 	 */
-	ZT_ALWAYS_INLINE const Endpoint *endpoints() const { return _at; }
+	ZT_ALWAYS_INLINE const Endpoint *endpoints() const noexcept { return _at; }
 
 	/**
 	 * Add an endpoint to this locator
@@ -83,7 +83,7 @@ public:
 	 * @param ep Endpoint to add
 	 * @return True if endpoint was added (or already present), false if locator is full
 	 */
-	ZT_ALWAYS_INLINE bool add(const Endpoint &ep)
+	ZT_ALWAYS_INLINE bool add(const Endpoint &ep) noexcept
 	{
 		if (_endpointCount >= ZT_LOCATOR_MAX_ENDPOINTS)
 			return false;
@@ -100,7 +100,7 @@ public:
 	 * @param id Identity that includes private key
 	 * @return True if signature successful
 	 */
-	bool sign(int64_t ts,const Identity &id);
+	bool sign(int64_t ts,const Identity &id) noexcept;
 
 	/**
 	 * Verify this Locator's validity and signature
@@ -108,13 +108,13 @@ public:
 	 * @param id Identity corresponding to hash
 	 * @return True if valid and signature checks out
 	 */
-	bool verify(const Identity &id) const;
+	bool verify(const Identity &id) const noexcept;
 
-	explicit ZT_ALWAYS_INLINE operator bool() const { return (_ts != 0); }
+	explicit ZT_ALWAYS_INLINE operator bool() const noexcept { return (_ts != 0); }
 
-	static ZT_ALWAYS_INLINE int marshalSizeMax() { return ZT_LOCATOR_MARSHAL_SIZE_MAX; }
-	int marshal(uint8_t data[ZT_LOCATOR_MARSHAL_SIZE_MAX],bool excludeSignature = false) const;
-	int unmarshal(const uint8_t *restrict data,int len);
+	static constexpr int marshalSizeMax() noexcept { return ZT_LOCATOR_MARSHAL_SIZE_MAX; }
+	int marshal(uint8_t data[ZT_LOCATOR_MARSHAL_SIZE_MAX],bool excludeSignature = false) const noexcept;
+	int unmarshal(const uint8_t *restrict data,int len) noexcept;
 
 private:
 	int64_t _ts;

+ 5 - 2
node/Membership.cpp

@@ -17,7 +17,6 @@
 #include "RuntimeEnvironment.hpp"
 #include "Peer.hpp"
 #include "Topology.hpp"
-#include "Switch.hpp"
 #include "Node.hpp"
 
 namespace ZeroTier {
@@ -33,6 +32,10 @@ Membership::Membership() :
 {
 }
 
+Membership::~Membership()
+{
+}
+
 void Membership::pushCredentials(const RuntimeEnvironment *RR,void *tPtr,const int64_t now,const Address &peerAddress,const NetworkConfig &nconf)
 {
 	const Capability *sendCaps[ZT_MAX_NETWORK_CAPABILITIES];
@@ -218,7 +221,7 @@ Membership::AddCredentialResult Membership::addCredential(const RuntimeEnvironme
 	}
 }
 
-bool Membership::_isUnspoofableAddress(const NetworkConfig &nconf,const InetAddress &ip) const
+bool Membership::_isUnspoofableAddress(const NetworkConfig &nconf,const InetAddress &ip) const noexcept
 {
 	if ((ip.isV6())&&(nconf.ndpEmulation())) {
 		const InetAddress sixpl(InetAddress::makeIpv66plane(nconf.networkId,nconf.issuedTo.toInt()));

+ 10 - 9
node/Membership.hpp

@@ -49,6 +49,7 @@ public:
 	};
 
 	Membership();
+	~Membership();
 
 	/**
 	 * Send COM and other credentials to this peer
@@ -64,7 +65,7 @@ public:
 	/**
 	 * @return Time we last pushed credentials to this member
 	 */
-	ZT_ALWAYS_INLINE int64_t lastPushedCredentials() const { return _lastPushedCredentials; }
+	ZT_ALWAYS_INLINE int64_t lastPushedCredentials() const noexcept { return _lastPushedCredentials; }
 
 	/**
 	 * Check whether the peer represented by this Membership should be allowed on this network at all
@@ -72,7 +73,7 @@ public:
 	 * @param nconf Our network config
 	 * @return True if this peer is allowed on this network at all
 	 */
-	ZT_ALWAYS_INLINE bool isAllowedOnNetwork(const NetworkConfig &nconf) const
+	ZT_ALWAYS_INLINE bool isAllowedOnNetwork(const NetworkConfig &nconf) const noexcept
 	{
 		if (nconf.isPublic()) return true; // public network
 		if (_com.timestamp() <= _comRevocationThreshold) return false; // COM has been revoked
@@ -88,7 +89,7 @@ public:
 	 * @return True if this peer has a certificate of ownership for the given resource
 	 */
 	template<typename T>
-	ZT_ALWAYS_INLINE bool peerOwnsAddress(const NetworkConfig &nconf,const T &r) const
+	ZT_ALWAYS_INLINE bool peerOwnsAddress(const NetworkConfig &nconf,const T &r) const noexcept
 	{
 		if (_isUnspoofableAddress(nconf,r))
 			return true;
@@ -109,7 +110,7 @@ public:
 	 * @param id Tag ID
 	 * @return Pointer to tag or NULL if not found
 	 */
-	ZT_ALWAYS_INLINE const Tag *getTag(const NetworkConfig &nconf,const uint32_t id) const
+	ZT_ALWAYS_INLINE const Tag *getTag(const NetworkConfig &nconf,const uint32_t id) const noexcept
 	{
 		const Tag *const t = _remoteTags.get(id);
 		return (((t)&&(_isCredentialTimestampValid(nconf,*t))) ? t : (Tag *)0);
@@ -138,13 +139,13 @@ private:
 	// This returns true if a resource is an IPv6 NDP-emulated address. These embed the ZT
 	// address of the peer and therefore cannot be spoofed, causing peerOwnsAddress() to
 	// always return true for them. A certificate is not required for these.
-	ZT_ALWAYS_INLINE bool _isUnspoofableAddress(const NetworkConfig &nconf,const MAC &m) const { return false; }
-	bool _isUnspoofableAddress(const NetworkConfig &nconf,const InetAddress &ip) const;
+	ZT_ALWAYS_INLINE bool _isUnspoofableAddress(const NetworkConfig &nconf,const MAC &m) const noexcept { return false; }
+	bool _isUnspoofableAddress(const NetworkConfig &nconf,const InetAddress &ip) const noexcept;
 
 	// This compares the remote credential's timestamp to the timestamp in our network config
 	// plus or minus the permitted maximum timestamp delta.
 	template<typename C>
-	ZT_ALWAYS_INLINE bool _isCredentialTimestampValid(const NetworkConfig &nconf,const C &remoteCredential) const
+	ZT_ALWAYS_INLINE bool _isCredentialTimestampValid(const NetworkConfig &nconf,const C &remoteCredential) const noexcept
 	{
 		const int64_t ts = remoteCredential.timestamp();
 		if (((ts >= nconf.timestamp) ? (ts - nconf.timestamp) : (nconf.timestamp - ts)) <= nconf.credentialTimeMaxDelta) {
@@ -190,7 +191,7 @@ public:
 	class CapabilityIterator
 	{
 	public:
-		ZT_ALWAYS_INLINE CapabilityIterator(Membership &m,const NetworkConfig &nconf) :
+		ZT_ALWAYS_INLINE CapabilityIterator(Membership &m,const NetworkConfig &nconf) noexcept :
 			_hti(m._remoteCaps),
 			_k(nullptr),
 			_c(nullptr),
@@ -199,7 +200,7 @@ public:
 		{
 		}
 
-		ZT_ALWAYS_INLINE Capability *next()
+		ZT_ALWAYS_INLINE Capability *next() noexcept
 		{
 			while (_hti.next(_k,_c)) {
 				if (_m._isCredentialTimestampValid(_nconf,*_c))

+ 55 - 34
node/Meter.hpp

@@ -17,58 +17,79 @@
 #include "Constants.hpp"
 #include "Mutex.hpp"
 
-#define ZT_METER_HISTORY_LENGTH 4
-#define ZT_METER_HISTORY_TICK_DURATION 1000
+#include <algorithm>
 
 namespace ZeroTier {
 
 /**
- * Transfer rate meter (thread-safe)
+ * Transfer rate and total transferred amount meter
+ *
+ * @tparam TUNIT Unit of time in milliseconds (default: 1000 for one second)
+ * @tparam LSIZE Log size in units of time (default: 60 for one minute worth of data)
  */
+template<int64_t TUNIT = 1000,unsigned long LSIZE = 60>
 class Meter
 {
 public:
-	ZT_ALWAYS_INLINE Meter() noexcept
-	{
-		for(int i=0;i<ZT_METER_HISTORY_LENGTH;++i)
-			_history[i] = 0.0;
-		_ts = 0;
-		_count = 0;
-	}
+	/**
+	 * Create and initialize a new meter
+	 *
+	 * @param now Start time
+	 */
+	ZT_ALWAYS_INLINE Meter(const int64_t now) noexcept : startTime(now) {}
 
+	/**
+	 * Add a measurement
+	 *
+	 * @tparam I Type of 'count' (usually inferred)
+	 * @param now Current time
+	 * @param count Count of items (usually bytes)
+	 */
 	template<typename I>
 	ZT_ALWAYS_INLINE void log(const int64_t now,I count) noexcept
 	{
-		const int64_t since = now - _ts;
-		if (since >= ZT_METER_HISTORY_TICK_DURATION) {
-			_ts = now;
-			_history[++_hptr % ZT_METER_HISTORY_LENGTH] = (double)_count / ((double)since / 1000.0);
-			_count = (uint64_t)count;
-		} else {
-			_count += (uint64_t)count;
-		}
+		_total += (uint64_t)count;
+
+		// We log by choosing a log bucket based on the current time in units modulo
+		// the log size and then if it's a new bucket setting it or otherwise adding
+		// to it.
+		const unsigned long bucket = ((unsigned int)((uint64_t)(now / TUNIT))) % LSIZE;
+		const unsigned long prevBucket = _bucket.exchange(bucket);
+		if (prevBucket != bucket)
+			_counts[bucket].store((uint64_t)count);
+		else _counts[bucket].fetch_add((uint64_t)count);
 	}
 
-	ZT_ALWAYS_INLINE double perSecond(const int64_t now) const noexcept
+	/**
+	 * Get rate per TUNIT time
+	 *
+	 * @param now Current time
+	 * @return Count per TUNIT time (rate)
+	 */
+	ZT_ALWAYS_INLINE double rate(const int64_t now) const noexcept
 	{
-		double r = 0.0,n = 0.0;
-		const int64_t since = (now - _ts);
-		if (since >= ZT_METER_HISTORY_TICK_DURATION) {
-			r += (double)_count / ((double)since / 1000.0);
-			n += 1.0;
-		}
-		for(int i=0;i<ZT_METER_HISTORY_LENGTH;++i) {
-			r += _history[i];
-			n += 1.0;
-		}
-		return r / n;
+		// Rate is computed by looking back at N buckets where N is the smaller of
+		// the size of the log or the number of units since the start time.
+		const unsigned long lookback = std::min((unsigned long)((now - startTime) / TUNIT),LSIZE);
+		if (lookback == 0)
+			return 0.0;
+		unsigned long bi = ((unsigned int)((uint64_t)(now / TUNIT)));
+		double sum = 0.0;
+		for(unsigned long l=0;l<lookback;++l)
+			sum += (double)_counts[bi-- % LSIZE].load();
+		return sum / (double)lookback;
 	}
 
+	/**
+	 * @return Total count since meter was created
+	 */
+	ZT_ALWAYS_INLINE uint64_t total() const noexcept { return _total.load(); }
+
 private:
-	volatile double _history[ZT_METER_HISTORY_LENGTH];
-	volatile int64_t _ts;
-	volatile uint64_t _count;
-	std::atomic<unsigned int> _hptr;
+	const int64_t startTime;
+	std::atomic<uint64_t> _total;
+	std::atomic<uint64_t> _counts[LSIZE];
+	std::atomic<unsigned long> _bucket;
 };
 
 } // namespace ZeroTier

+ 8 - 1
node/Protocol.hpp

@@ -263,6 +263,13 @@ namespace Protocol {
  */
 enum Verb
 {
+	/**
+	 * No operation
+	 *
+	 * This packet does nothing, but it is sometimes sent as a probe to
+	 * trigger a HELLO exchange as the code will attempt HELLO when it
+	 * receives a packet from an unidentified source.
+	 */
 	VERB_NOP = 0x00,
 
 	/**
@@ -280,8 +287,8 @@ enum Verb
 	 *   <[2] 16-bit length of meta-data dictionary>
 	 *   <[...] meta-data dictionary>
 	 *   <[2] 16-bit length of any additional fields>
-	 *   <[48] HMAC-SHA384 of full plaintext payload>
 	 *   [... end encrypted region ...]
+	 *   <[48] HMAC-SHA384 of full plaintext payload>
 	 *
 	 * HELLO is sent with authentication but without the usual encryption so
 	 * that peers can exchange identities.