Browse Source

Yet more cleanup, and add meters to paths.

Adam Ierymenko 5 years ago
parent
commit
0589964f99
7 changed files with 51 additions and 61 deletions
  1. 0 5
      node/Constants.hpp
  2. 1 1
      node/Identity.cpp
  3. 1 1
      node/Identity.hpp
  4. 19 31
      node/Meter.hpp
  5. 1 0
      node/Path.cpp
  6. 28 22
      node/Path.hpp
  7. 1 1
      node/Topology.cpp

+ 0 - 5
node/Constants.hpp

@@ -116,11 +116,6 @@
  */
 #define ZT_PATH_ALIVE_TIMEOUT ((ZT_PATH_KEEPALIVE_PERIOD * 2) + 5000)
 
-/**
- * Timeout for path active-ness (measured from last receive)
- */
-#define ZT_PATH_ACTIVITY_TIMEOUT (ZT_PATH_KEEPALIVE_PERIOD + 5000)
-
 /**
  * Delay between full HELLO messages between peers
  */

+ 1 - 1
node/Identity.cpp

@@ -156,7 +156,7 @@ bool Identity::locallyValidate() const
 	return false;
 }
 
-void Identity::hashWithPrivate(uint8_t h[48]) const
+void Identity::hashWithPrivate(uint8_t h[ZT_IDENTITY_HASH_SIZE]) const
 {
 	if (_hasPrivate) {
 		switch (_type) {

+ 1 - 1
node/Identity.hpp

@@ -137,7 +137,7 @@ public:
 	 *
 	 * @param h Buffer to store SHA384 hash
 	 */
-	void hashWithPrivate(uint8_t h[48]) const;
+	void hashWithPrivate(uint8_t h[ZT_IDENTITY_HASH_SIZE]) const;
 
 	/**
 	 * Sign a message with this identity (private key required)

+ 19 - 31
node/Meter.hpp

@@ -25,9 +25,9 @@ namespace ZeroTier {
  * Transfer rate and total transferred amount meter
  *
  * @tparam TUNIT Unit of time in milliseconds (default: 1000 for one second)
- * @tparam LSIZE Log size in units of time (default: 60 for one minute worth of data)
+ * @tparam LSIZE Log size in units of time (default: 10 for 10s worth of data)
  */
-template<int64_t TUNIT = 1000,unsigned long LSIZE = 60>
+template<int64_t TUNIT = 1000,unsigned long LSIZE = 10>
 class Meter
 {
 public:
@@ -36,58 +36,46 @@ public:
 	 *
 	 * @param now Start time
 	 */
-	ZT_ALWAYS_INLINE Meter(const int64_t now) noexcept : startTime(now) {}
+	ZT_ALWAYS_INLINE Meter(const int64_t now) noexcept {}
 
 	/**
 	 * Add a measurement
 	 *
-	 * @tparam I Type of 'count' (usually inferred)
 	 * @param now Current time
 	 * @param count Count of items (usually bytes)
 	 */
-	template<typename I>
-	ZT_ALWAYS_INLINE void log(const int64_t now,I count) noexcept
+	ZT_ALWAYS_INLINE void log(const int64_t now,uint64_t count) noexcept
 	{
-		_total += (uint64_t)count;
-
 		// We log by choosing a log bucket based on the current time in units modulo
 		// the log size and then if it's a new bucket setting it or otherwise adding
 		// to it.
-		const unsigned long bucket = ((unsigned int)((uint64_t)(now / TUNIT))) % LSIZE;
-		if (_bucket.exchange(bucket) != bucket)
-			_counts[bucket].store((uint64_t)count);
-		else _counts[bucket].fetch_add((uint64_t)count);
+		const unsigned long bucket = ((unsigned long)(now / TUNIT)) % LSIZE;
+		if (_bucket.exchange(bucket) != bucket) {
+			_totalExclCounts.fetch_add(_counts[bucket].exchange(count));
+		} else {
+			_counts[bucket].fetch_add(count);
+		}
 	}
 
 	/**
 	 * Get rate per TUNIT time
 	 *
 	 * @param now Current time
-	 * @return Count per TUNIT time (rate)
+	 * @param rate Result parameter: rate in count/TUNIT
+	 * @param total Total count for life of object
 	 */
-	ZT_ALWAYS_INLINE double rate(const int64_t now) const noexcept
+	ZT_ALWAYS_INLINE void rate(double &rate,uint64_t &total) const noexcept
 	{
-		// Rate is computed by looking back at N buckets where N is the smaller of
-		// the size of the log or the number of units since the start time.
-		const unsigned long lookback = std::min((unsigned long)((now - startTime) / TUNIT),LSIZE);
-		if (lookback == 0)
-			return 0.0;
-		unsigned long bi = ((unsigned int)((uint64_t)(now / TUNIT)));
-		double sum = 0.0;
-		for(unsigned long l=0;l<lookback;++l)
-			sum += (double)_counts[bi-- % LSIZE].load();
-		return sum / (double)lookback;
+		total = 0;
+		for(unsigned long i=0;i<LSIZE;++i)
+			total += _counts[i].load();
+		rate = (double)total / (double)LSIZE;
+		total += _totalExclCounts.load();
 	}
 
-	/**
-	 * @return Total count since meter was created
-	 */
-	ZT_ALWAYS_INLINE uint64_t total() const noexcept { return _total.load(); }
-
 private:
-	const int64_t startTime;
-	std::atomic<uint64_t> _total;
 	std::atomic<uint64_t> _counts[LSIZE];
+	std::atomic<uint64_t> _totalExclCounts;
 	std::atomic<unsigned long> _bucket;
 };
 

+ 1 - 0
node/Path.cpp

@@ -21,6 +21,7 @@ bool Path::send(const RuntimeEnvironment *RR,void *tPtr,const void *data,unsigne
 {
 	if (RR->node->putPacket(tPtr,_localSocket,_addr,data,len)) {
 		_lastOut = now;
+		_outMeter.log(now,len);
 		return true;
 	}
 	return false;

+ 28 - 22
node/Path.hpp

@@ -14,6 +14,13 @@
 #ifndef ZT_PATH_HPP
 #define ZT_PATH_HPP
 
+#include "Constants.hpp"
+#include "InetAddress.hpp"
+#include "SharedPtr.hpp"
+#include "Utils.hpp"
+#include "Mutex.hpp"
+#include "Meter.hpp"
+
 #include <cstdint>
 #include <cstring>
 #include <cstdlib>
@@ -21,12 +28,6 @@
 #include <algorithm>
 #include <set>
 
-#include "Constants.hpp"
-#include "InetAddress.hpp"
-#include "SharedPtr.hpp"
-#include "Utils.hpp"
-#include "Mutex.hpp"
-
 namespace ZeroTier {
 
 class RuntimeEnvironment;
@@ -69,30 +70,33 @@ public:
 	/**
 	 * Explicitly update last sent time
 	 *
-	 * @param t Time of send
+	 * @param now Time of send
+	 * @param bytes Bytes sent
 	 */
-	ZT_ALWAYS_INLINE void sent(const int64_t t) noexcept { _lastOut = t; }
+	ZT_ALWAYS_INLINE void sent(const int64_t now,const unsigned int bytes) noexcept
+	{
+		_lastOut.store(now);
+		_outMeter.log(now,bytes);
+	}
 
 	/**
 	 * Called when a packet is received from this remote path, regardless of content
 	 *
-	 * @param t Time of receive
+	 * @param now Time of receive
+	 * @param bytes Bytes received
 	 */
-	ZT_ALWAYS_INLINE void received(const int64_t t) noexcept { _lastIn = t; }
+	ZT_ALWAYS_INLINE void received(const int64_t now,const unsigned int bytes) noexcept
+	{
+		_lastIn.store(now);
+		_inMeter.log(now,bytes);
+	}
 
 	/**
 	 * Check path aliveness
 	 *
 	 * @param now Current time
 	 */
-	ZT_ALWAYS_INLINE bool alive(const int64_t now) const noexcept { return ((now - _lastIn) < ZT_PATH_ALIVE_TIMEOUT); }
-
-	/**
-	 * Check if path is considered active
-	 *
-	 * @param now Current time
-	 */
-	ZT_ALWAYS_INLINE bool active(const int64_t now) const noexcept { return ((now - _lastIn) < ZT_PATH_ACTIVITY_TIMEOUT); }
+	ZT_ALWAYS_INLINE bool alive(const int64_t now) const noexcept { return ((now - _lastIn.load()) < ZT_PATH_ALIVE_TIMEOUT); }
 
 	/**
 	 * @return Physical address
@@ -107,18 +111,20 @@ public:
 	/**
 	 * @return Last time we received anything
 	 */
-	ZT_ALWAYS_INLINE int64_t lastIn() const noexcept { return _lastIn; }
+	ZT_ALWAYS_INLINE int64_t lastIn() const noexcept { return _lastIn.load(); }
 
 	/**
 	 * @return Last time we sent something
 	 */
-	ZT_ALWAYS_INLINE int64_t lastOut() const noexcept { return _lastOut; }
+	ZT_ALWAYS_INLINE int64_t lastOut() const noexcept { return _lastOut.load(); }
 
 private:
 	int64_t _localSocket;
-	int64_t _lastIn;
-	int64_t _lastOut;
+	std::atomic<int64_t> _lastIn;
+	std::atomic<int64_t> _lastOut;
 	InetAddress _addr;
+	Meter<> _inMeter;
+	Meter<> _outMeter;
 
 	// These fields belong to Defragmenter but are kept in Path for performance
 	// as it's much faster this way than having Defragmenter maintain another

+ 1 - 1
node/Topology.cpp

@@ -224,7 +224,7 @@ void Topology::doPeriodicTasks(void *tPtr,const int64_t now)
 		uint64_t *k = nullptr;
 		SharedPtr<Path> *p = nullptr;
 		while (i.next(k,p)) {
-			if (p->references() <= 1)
+			if ((p->references() <= 1)&&(!(*p)->alive(now)))
 				_paths.erase(*k);
 		}
 	}