Browse Source

Trim some unnecessary locks from root, and cleanup elsewhere.

Adam Ierymenko 5 years ago
parent
commit
e245eb1eb5
4 changed files with 101 additions and 72 deletions
  1. 6 0
      node/Constants.hpp
  2. 9 27
      node/Meter.hpp
  3. 84 8
      node/Topology.hpp
  4. 2 37
      root/root.cpp

+ 6 - 0
node/Constants.hpp

@@ -157,6 +157,12 @@
 #define ZT_PACKED_STRUCT(D) D __attribute__((packed))
 #define ZT_PACKED_STRUCT(D) D __attribute__((packed))
 #endif
 #endif
 
 
+#if __cplusplus > 199711L
+#ifndef __CPP11__
+#define __CPP11__
+#endif
+#endif
+
 /**
 /**
  * Length of a ZeroTier address in bytes
  * Length of a ZeroTier address in bytes
  */
  */

+ 9 - 27
node/Meter.hpp

@@ -16,6 +16,7 @@
 
 
 #include "Constants.hpp"
 #include "Constants.hpp"
 #include "Mutex.hpp"
 #include "Mutex.hpp"
+#include "AtomicCounter.hpp"
 
 
 #define ZT_METER_HISTORY_LENGTH 4
 #define ZT_METER_HISTORY_LENGTH 4
 #define ZT_METER_HISTORY_TICK_DURATION 1000
 #define ZT_METER_HISTORY_TICK_DURATION 1000
@@ -35,40 +36,23 @@ public:
 		_ts = 0;
 		_ts = 0;
 		_count = 0;
 		_count = 0;
 	}
 	}
-	ZT_ALWAYS_INLINE Meter(const Meter &m) { *this = m; }
-
-	ZT_ALWAYS_INLINE Meter &operator=(const Meter &m)
-	{
-		m._lock.lock();
-		for(int i=0;i<ZT_METER_HISTORY_LENGTH;++i)
-			_history[i] = m._history[i];
-		_ts = m._ts;
-		_count = m._count;
-		m._lock.unlock();
-		return *this;
-	}
 
 
 	template<typename I>
 	template<typename I>
 	ZT_ALWAYS_INLINE void log(const int64_t now,I count)
 	ZT_ALWAYS_INLINE void log(const int64_t now,I count)
 	{
 	{
-		_lock.lock();
 		const int64_t since = now - _ts;
 		const int64_t since = now - _ts;
 		if (since >= ZT_METER_HISTORY_TICK_DURATION) {
 		if (since >= ZT_METER_HISTORY_TICK_DURATION) {
 			_ts = now;
 			_ts = now;
-			for(int i=1;i<ZT_METER_HISTORY_LENGTH;++i)
-				_history[i-1] = _history[i];
-			_history[ZT_METER_HISTORY_LENGTH-1] = (double)_count / ((double)since / 1000.0);
-			_count = 0;
+			_history[(unsigned int)(++_hptr) % ZT_METER_HISTORY_LENGTH] = (double)_count / ((double)since / 1000.0);
+			_count = (uint64_t)count;
+		} else {
+			_count += (uint64_t)count;
 		}
 		}
-		_count += (uint64_t)count;
-		_lock.unlock();
 	}
 	}
 
 
 	ZT_ALWAYS_INLINE double perSecond(const int64_t now) const
 	ZT_ALWAYS_INLINE double perSecond(const int64_t now) const
 	{
 	{
 		double r = 0.0,n = 0.0;
 		double r = 0.0,n = 0.0;
-
-		_lock.lock();
 		const int64_t since = (now - _ts);
 		const int64_t since = (now - _ts);
 		if (since >= ZT_METER_HISTORY_TICK_DURATION) {
 		if (since >= ZT_METER_HISTORY_TICK_DURATION) {
 			r += (double)_count / ((double)since / 1000.0);
 			r += (double)_count / ((double)since / 1000.0);
@@ -78,16 +62,14 @@ public:
 			r += _history[i];
 			r += _history[i];
 			n += 1.0;
 			n += 1.0;
 		}
 		}
-		_lock.unlock();
-
 		return r / n;
 		return r / n;
 	}
 	}
 
 
 private:
 private:
-	double _history[ZT_METER_HISTORY_LENGTH];
-	int64_t _ts;
-	uint64_t _count;
-	Mutex _lock;
+	volatile double _history[ZT_METER_HISTORY_LENGTH];
+	volatile int64_t _ts;
+	volatile uint64_t _count;
+	volatile AtomicCounter _hptr;
 };
 };
 
 
 } // namespace ZeroTier
 } // namespace ZeroTier

+ 84 - 8
node/Topology.hpp

@@ -46,6 +46,17 @@ class RuntimeEnvironment;
  */
  */
 class Topology
 class Topology
 {
 {
+private:
+	static _RootRankingFunction
+	{
+		ZT_ALWAYS_INLINE _RootRankingFunction() : bestRoot(),bestRootLatency(0xffff) {}
+		ZT_ALWAYS_INLINE bool operator()(const SharedPtr<Peer> &peer,const std::vector<InetAddress> &phy)
+		{
+		}
+		SharedPtr<Peer> bestRoot;
+		unsigned int bestRootLatency;
+	};
+
 public:
 public:
 	ZT_ALWAYS_INLINE Topology(const RuntimeEnvironment *renv,const Identity &myId) :
 	ZT_ALWAYS_INLINE Topology(const RuntimeEnvironment *renv,const Identity &myId) :
 		RR(renv),
 		RR(renv),
@@ -296,9 +307,80 @@ public:
 	}
 	}
 #endif
 #endif
 
 
-	ZT_ALWAYS_INLINE SharedPtr<Peer> root(const int64_t now)
+	/**
+	 * Apply a function or function object to all roots
+	 *
+	 * This locks the root list during execution but other operations
+	 * are fine.
+	 *
+	 * @param f Function to apply f(peer,IPs)
+	 * @tparam F function or function object type
+	 */
+	template<typename F>
+	inline void eachRoot(F f)
 	{
 	{
-		return SharedPtr<Peer>();
+		{
+			Mutex::Lock l(_staticRoots_l);
+			Hashtable< Identity,std::vector<InetAddress> >::Iterator i(_staticRoots);
+			Identity *k = (Identity *)0;
+			std::vector<InetAddress> *v = (std::vector<InetAddress> *)0;
+			while (i.next(k,v)) {
+				if (!v->empty()) {
+					const SharedPtr<Peer> *ap;
+					{
+						Mutex::Lock l2(_peers_l);
+						ap = _peers.get(k->address());
+					}
+					if (ap) {
+						if (!f(*ap,*v))
+							return;
+					} else {
+						SharedPtr<Peer> p(new Peer(RR,_myIdentity,*k));
+						{
+							Mutex::Lock l2(_peers_l);
+							_peers.set(k->address(),p);
+						}
+						if (!f(p,*v))
+							return;
+					}
+				}
+			}
+		}
+		{
+			Mutex::Lock l(_dynamicRoots_l);
+			Hashtable< Str,Locator >::Iterator i(_dynamicRoots);
+			Str *k = (Str *)0;
+			Locator *v = (Locator *)0;
+			while (i.next(k,v)) {
+				if (*v) {
+					for(std::vector<Identity>::const_iterator id(v->virt().begin());id!=v->virt().end();++id) {
+						const SharedPtr<Peer> *ap;
+						{
+							Mutex::Lock l2(_peers_l);
+							ap = _peers.get(id->address());
+						}
+						if (ap) {
+							if (!f(*ap,v->phy()))
+								return;
+						} else {
+							SharedPtr<Peer> p(new Peer(RR,_myIdentity,*id));
+							{
+								Mutex::Lock l2(_peers_l);
+								_peers.set(id->address(),p);
+							}
+							if (!f(p,v->phy()))
+								return;
+						}
+					}
+				}
+			}
+		}
+	}
+
+	inline SharedPtr<Peer> root(const int64_t now)
+	{
+		_RootRankingFunc rrf;
+		eachRoot(rrf);
 	}
 	}
 
 
 	/**
 	/**
@@ -487,12 +569,6 @@ private:
 	Hashtable< Identity,std::vector<InetAddress> > _staticRoots;
 	Hashtable< Identity,std::vector<InetAddress> > _staticRoots;
 	Hashtable< Str,Locator > _dynamicRoots;
 	Hashtable< Str,Locator > _dynamicRoots;
 
 
-	//std::vector<Root> _roots;
-	//SharedPtr<Peer> _bestRoot;
-	//int64_t _lastRankedBestRoot;
-	//Mutex _roots_m;
-	//Mutex _bestRoot_m;
-
 	Mutex _peers_l;
 	Mutex _peers_l;
 	Mutex _paths_l;
 	Mutex _paths_l;
 	Mutex _staticRoots_l;
 	Mutex _staticRoots_l;

+ 2 - 37
root/root.cpp

@@ -173,14 +173,6 @@ struct RendezvousStats
 	int64_t ts;
 	int64_t ts;
 };
 };
 
 
-struct ForwardingStats
-{
-	ForwardingStats() : bytes(0),ts(0),bps() {}
-	uint64_t bytes;
-	int64_t ts;
-	Meter bps;
-};
-
 // These fields are not locked as they're only initialized on startup or are atomic
 // These fields are not locked as they're only initialized on startup or are atomic
 static int64_t s_startTime;            // Time service was started
 static int64_t s_startTime;            // Time service was started
 static std::vector<int> s_ports;       // Ports to bind for UDP traffic
 static std::vector<int> s_ports;       // Ports to bind for UDP traffic
@@ -209,7 +201,6 @@ static std::unordered_map< uint64_t,std::unordered_map< MulticastGroup,std::unor
 static std::unordered_map< Identity,SharedPtr<RootPeer>,IdentityHasher > s_peersByIdentity;
 static std::unordered_map< Identity,SharedPtr<RootPeer>,IdentityHasher > s_peersByIdentity;
 static std::unordered_map< Address,std::set< SharedPtr<RootPeer> >,AddressHasher > s_peersByVirtAddr;
 static std::unordered_map< Address,std::set< SharedPtr<RootPeer> >,AddressHasher > s_peersByVirtAddr;
 static std::unordered_map< RendezvousKey,RendezvousStats,RendezvousKey::Hasher > s_rendezvousTracking;
 static std::unordered_map< RendezvousKey,RendezvousStats,RendezvousKey::Hasher > s_rendezvousTracking;
-static std::unordered_map< Address,ForwardingStats,AddressHasher > s_lastForwardedTo;
 
 
 static std::mutex s_planet_l;
 static std::mutex s_planet_l;
 static std::mutex s_peers_l;
 static std::mutex s_peers_l;
@@ -217,7 +208,6 @@ static std::mutex s_multicastSubscriptions_l;
 static std::mutex s_peersByIdentity_l;
 static std::mutex s_peersByIdentity_l;
 static std::mutex s_peersByVirtAddr_l;
 static std::mutex s_peersByVirtAddr_l;
 static std::mutex s_rendezvousTracking_l;
 static std::mutex s_rendezvousTracking_l;
-static std::mutex s_lastForwardedTo_l;
 
 
 //////////////////////////////////////////////////////////////////////////////
 //////////////////////////////////////////////////////////////////////////////
 //////////////////////////////////////////////////////////////////////////////
 //////////////////////////////////////////////////////////////////////////////
@@ -490,7 +480,6 @@ static void handlePacket(const int v4s,const int v6s,const InetAddress *const ip
 
 
 										s_outputRate.log(now,pkt.size());
 										s_outputRate.log(now,pkt.size());
 										peer->lastSend = now;
 										peer->lastSend = now;
-										//printf("%s %s gathered %u subscribers to %s/%.8lx on network %.16llx" ZT_EOL_S,ip->toString(ipstr),source.toString(astr),l,mg.mac().toString(tmpstr),(unsigned long)mg.adi(),(unsigned long long)nwid);
 									}
 									}
 								}
 								}
 							}
 							}
@@ -557,14 +546,6 @@ static void handlePacket(const int v4s,const int v6s,const InetAddress *const ip
 		return;
 		return;
 	}
 	}
 
 
-	{
-		std::lock_guard<std::mutex> l(s_lastForwardedTo_l);
-		ForwardingStats &fs = s_lastForwardedTo[dest];
-		fs.bytes += (uint64_t)pkt.size();
-		fs.ts = now;
-		fs.bps.log(now,pkt.size());
-	}
-
 	if (introduce) {
 	if (introduce) {
 		std::lock_guard<std::mutex> l(s_peersByVirtAddr_l);
 		std::lock_guard<std::mutex> l(s_peersByVirtAddr_l);
 		auto sources = s_peersByVirtAddr.find(source);
 		auto sources = s_peersByVirtAddr.find(source);
@@ -1186,16 +1167,6 @@ int main(int argc,char **argv)
 					else ++lr;
 					else ++lr;
 				}
 				}
 			}
 			}
-
-			// Remove old last forwarded tracking entries
-			{
-				std::lock_guard<std::mutex> l(s_lastForwardedTo_l);
-				for(auto lf=s_lastForwardedTo.begin();lf!=s_lastForwardedTo.end();) {
-					if ((now - lf->second.ts) > ZT_PEER_ACTIVITY_TIMEOUT)
-						s_lastForwardedTo.erase(lf++);
-					else ++lf;
-				}
-			}
 		}
 		}
 
 
 		// Write stats if configured to do so, and periodically refresh planet file (if any)
 		// Write stats if configured to do so, and periodically refresh planet file (if any)
@@ -1247,18 +1218,12 @@ int main(int argc,char **argv)
 						}
 						}
 						OSUtils::ztsnprintf(ver,sizeof(ver),"%d.%d.%d",(*p)->vMajor,(*p)->vMinor,(*p)->vRev);
 						OSUtils::ztsnprintf(ver,sizeof(ver),"%d.%d.%d",(*p)->vMajor,(*p)->vMinor,(*p)->vRev);
 						double forwardingSpeed = 0.0;
 						double forwardingSpeed = 0.0;
-						s_lastForwardedTo_l.lock();
-						auto lft = s_lastForwardedTo.find((*p)->id.address());
-						if (lft != s_lastForwardedTo.end())
-							forwardingSpeed = lft->second.bps.perSecond(now) / 1024.0;
-						s_lastForwardedTo_l.unlock();
-						fprintf(pf,"%.10llx %21s %45s %10.4f %6s %10.4f" ZT_EOL_S,
+						fprintf(pf,"%.10llx %21s %45s %10.4f %6s" ZT_EOL_S,
 							(unsigned long long)(*p)->id.address().toInt(),
 							(unsigned long long)(*p)->id.address().toInt(),
 							ip4,
 							ip4,
 							ip6,
 							ip6,
 							fabs((double)(now - (*p)->lastReceive) / 1000.0),
 							fabs((double)(now - (*p)->lastReceive) / 1000.0),
-							ver,
-							forwardingSpeed);
+							ver);
 					}
 					}
 				}
 				}