Browse Source

Some root cleanup and more lock contention reduction.

Adam Ierymenko 5 years ago
parent
commit
2deaaeef28
1 changed files with 87 additions and 59 deletions
  1. 87 59
      root/root.cpp

+ 87 - 59
root/root.cpp

@@ -117,12 +117,13 @@ using json = nlohmann::json;
 /**
 /**
  * RootPeer is a normal peer known to this root
  * RootPeer is a normal peer known to this root
  *
  *
- * This can also be a sibling root, which is itself a peer. Sibling roots
- * are sent HELLO while for other peers we only listen for HELLO.
+ * This struct must remain memcpy-able. Identity, InetAddress, and
+ * AtomicCounter all satisfy this. Take care when adding fields that
+ * this remains true.
  */
  */
 struct RootPeer
 struct RootPeer
 {
 {
-	ZT_ALWAYS_INLINE RootPeer() : lastSend(0),lastReceive(0),lastSync(0),lastEcho(0),lastHello(0),vProto(-1),vMajor(-1),vMinor(-1),vRev(-1) {}
+	ZT_ALWAYS_INLINE RootPeer() : lastSend(0),lastReceive(0),lastEcho(0),lastHello(0),vProto(-1),vMajor(-1),vMinor(-1),vRev(-1) {}
 	ZT_ALWAYS_INLINE ~RootPeer() { Utils::burn(key,sizeof(key)); }
 	ZT_ALWAYS_INLINE ~RootPeer() { Utils::burn(key,sizeof(key)); }
 
 
 	Identity id;            // Identity
 	Identity id;            // Identity
@@ -130,12 +131,10 @@ struct RootPeer
 	InetAddress ip4,ip6;    // IPv4 and IPv6 addresses
 	InetAddress ip4,ip6;    // IPv4 and IPv6 addresses
 	int64_t lastSend;       // Time of last send (any packet)
 	int64_t lastSend;       // Time of last send (any packet)
 	int64_t lastReceive;    // Time of last receive (any packet)
 	int64_t lastReceive;    // Time of last receive (any packet)
-	int64_t lastSync;       // Time of last data synchronization with LF or other root state backend (currently unused)
 	int64_t lastEcho;       // Time of last received ECHO
 	int64_t lastEcho;       // Time of last received ECHO
 	int64_t lastHello;      // Time of last received HELLO
 	int64_t lastHello;      // Time of last received HELLO
-	int vProto;             // Protocol version
+	int vProto;             // Protocol version or -1 if unknown
 	int vMajor,vMinor,vRev; // Peer version or -1,-1,-1 if unknown
 	int vMajor,vMinor,vRev; // Peer version or -1,-1,-1 if unknown
-	std::mutex lock;
 
 
 	AtomicCounter __refCount;
 	AtomicCounter __refCount;
 };
 };
@@ -180,7 +179,7 @@ struct ForwardingStats
 	Meter bps;
 	Meter bps;
 };
 };
 
 
-// These fields are not locked as they're only initialized on startup
+// These fields are not locked as they're only initialized on startup or are atomic
 static int64_t s_startTime;            // Time service was started
 static int64_t s_startTime;            // Time service was started
 static std::vector<int> s_ports;       // Ports to bind for UDP traffic
 static std::vector<int> s_ports;       // Ports to bind for UDP traffic
 static int s_relayMaxHops = 0;         // Max relay hops
 static int s_relayMaxHops = 0;         // Max relay hops
@@ -195,7 +194,7 @@ static std::string s_googleMapsAPIKey; // Google maps API key for GeoIP /map fea
 static std::map< std::pair< uint32_t,uint32_t >,std::pair< float,float > > s_geoIp4;
 static std::map< std::pair< uint32_t,uint32_t >,std::pair< float,float > > s_geoIp4;
 static std::map< std::pair< std::array< uint64_t,2 >,std::array< uint64_t,2 > >,std::pair< float,float > > s_geoIp6;
 static std::map< std::pair< std::array< uint64_t,2 >,std::array< uint64_t,2 > >,std::pair< float,float > > s_geoIp6;
 
 
-// Rate meters for statistical purposes
+// Rate meters for statistical purposes (locks are internal to Meter)
 static Meter s_inputRate;
 static Meter s_inputRate;
 static Meter s_outputRate;
 static Meter s_outputRate;
 static Meter s_forwardRate;
 static Meter s_forwardRate;
@@ -221,12 +220,14 @@ static std::mutex s_lastForwardedTo_l;
 //////////////////////////////////////////////////////////////////////////////
 //////////////////////////////////////////////////////////////////////////////
 //////////////////////////////////////////////////////////////////////////////
 //////////////////////////////////////////////////////////////////////////////
 
 
-static uint32_t ip4ToH32(const InetAddress &ip)
+// Construct GeoIP key for IPv4 IPs
+static ZT_ALWAYS_INLINE uint32_t ip4ToH32(const InetAddress &ip)
 {
 {
 	return Utils::ntoh((uint32_t)(((const struct sockaddr_in *)&ip)->sin_addr.s_addr));
 	return Utils::ntoh((uint32_t)(((const struct sockaddr_in *)&ip)->sin_addr.s_addr));
 }
 }
 
 
-static std::array< uint64_t,2 > ip6ToH128(const InetAddress &ip)
+// Construct GeoIP key for IPv6 IPs
+static ZT_ALWAYS_INLINE std::array< uint64_t,2 > ip6ToH128(const InetAddress &ip)
 {
 {
 	std::array<uint64_t,2> i128;
 	std::array<uint64_t,2> i128;
 	memcpy(i128.data(),ip.rawIpData(),16);
 	memcpy(i128.data(),ip.rawIpData(),16);
@@ -268,29 +269,47 @@ static void handlePacket(const int v4s,const int v6s,const InetAddress *const ip
 					}
 					}
 				} else {
 				} else {
 					peer.set(new RootPeer);
 					peer.set(new RootPeer);
-					if (s_self.agree(id,peer->key)) {
-						if (pkt.dearmor(peer->key)) {
-							if (!pkt.uncompress()) {
-								printf("%s HELLO rejected: decompression failed" ZT_EOL_S,ip->toString(ipstr));
-								return;
-							}
-							peer->id = id;
-							peer->lastReceive = now;
+
+					if (!s_self.agree(id,peer->key)) {
+						printf("%s HELLO rejected: key agreement failed" ZT_EOL_S,ip->toString(ipstr));
+						return;
+					}
+					if (!pkt.dearmor(peer->key)) {
+						printf("%s HELLO rejected: packet authentication failed" ZT_EOL_S,ip->toString(ipstr));
+						return;
+					}
+					if (!pkt.uncompress()) {
+						printf("%s HELLO rejected: decompression failed" ZT_EOL_S,ip->toString(ipstr));
+						return;
+					}
+					if (!id.locallyValidate()) { // this is more time consuming so check others first
+						printf("%s HELLO rejected: identity local validation failed" ZT_EOL_S,ip->toString(ipstr));
+						return;
+					}
+
+					peer->id = id;
+					peer->lastReceive = now;
+
+					bool added = false;
+					{
+						std::lock_guard<std::mutex> pbi_l(s_peersByIdentity_l);
+						auto existing = s_peersByIdentity.find(id); // make sure another thread didn't do this while we were
+						if (existing == s_peersByIdentity.end()) {
+							s_peersByIdentity.emplace(id,peer);
+							added = true;
+						} else {
+							peer = existing->second;
+						}
+					}
+					if (added) {
+						{
 							std::lock_guard<std::mutex> pl(s_peers_l);
 							std::lock_guard<std::mutex> pl(s_peers_l);
-							std::lock_guard<std::mutex> pbi_l(s_peersByIdentity_l);
+							s_peers.emplace_back(peer);
+						}
+						{
 							std::lock_guard<std::mutex> pbv_l(s_peersByVirtAddr_l);
 							std::lock_guard<std::mutex> pbv_l(s_peersByVirtAddr_l);
-							if (s_peersByIdentity.find(id) == s_peersByIdentity.end()) { // double check to ensure another thread didn't add this
-								s_peers.emplace_back(peer);
-								s_peersByIdentity.emplace(id,peer);
-								s_peersByVirtAddr[id.address()].emplace(peer);
-							}
-						} else {
-							printf("%s HELLO rejected: packet authentication failed" ZT_EOL_S,ip->toString(ipstr));
-							return;
+							s_peersByVirtAddr[id.address()].emplace(peer);
 						}
 						}
-					} else {
-						printf("%s HELLO rejected: key agreement failed" ZT_EOL_S,ip->toString(ipstr));
-						return;
 					}
 					}
 				}
 				}
 			}
 			}
@@ -309,7 +328,6 @@ static void handlePacket(const int v4s,const int v6s,const InetAddress *const ip
 							return;
 							return;
 						}
 						}
 						peer = (*p);
 						peer = (*p);
-						//printf("%s has %s (known (2))" ZT_EOL_S,ip->toString(ipstr),source().toString(astr));
 						break;
 						break;
 					}
 					}
 				}
 				}
@@ -319,8 +337,6 @@ static void handlePacket(const int v4s,const int v6s,const InetAddress *const ip
 		// If we found the peer, update IP and/or time and handle certain key packet types that the
 		// If we found the peer, update IP and/or time and handle certain key packet types that the
 		// root must concern itself with.
 		// root must concern itself with.
 		if (peer) {
 		if (peer) {
-			std::lock_guard<std::mutex> pl(peer->lock);
-
 			if (ip->isV4())
 			if (ip->isV4())
 				peer->ip4 = ip;
 				peer->ip4 = ip;
 			else if (ip->isV6())
 			else if (ip->isV6())
@@ -531,9 +547,9 @@ static void handlePacket(const int v4s,const int v6s,const InetAddress *const ip
 		if (peers != s_peersByVirtAddr.end()) {
 		if (peers != s_peersByVirtAddr.end()) {
 			for(auto p=peers->second.begin();p!=peers->second.end();++p) {
 			for(auto p=peers->second.begin();p!=peers->second.end();++p) {
 				if ((*p)->ip4) {
 				if ((*p)->ip4) {
-					toAddrs.push_back(std::pair< InetAddress *,SharedPtr<RootPeer> >(&((*p)->ip4),*p));
+					toAddrs.emplace_back(std::pair< InetAddress *,SharedPtr<RootPeer> >(&((*p)->ip4),*p));
 				} else if ((*p)->ip6) {
 				} else if ((*p)->ip6) {
-					toAddrs.push_back(std::pair< InetAddress *,SharedPtr<RootPeer> >(&((*p)->ip6),*p));
+					toAddrs.emplace_back(std::pair< InetAddress *,SharedPtr<RootPeer> >(&((*p)->ip6),*p));
 				}
 				}
 			}
 			}
 		}
 		}
@@ -905,9 +921,15 @@ int main(int argc,char **argv)
 							try {
 							try {
 								pkt.setSize((unsigned int)pl);
 								pkt.setSize((unsigned int)pl);
 								handlePacket(s4,s6,reinterpret_cast<const InetAddress *>(&in6),pkt);
 								handlePacket(s4,s6,reinterpret_cast<const InetAddress *>(&in6),pkt);
+							} catch (std::exception &exc) {
+								char ipstr[128];
+								printf("WARNING: unexpected exception handling packet from %s: %s" ZT_EOL_S,reinterpret_cast<const InetAddress *>(&in6)->toString(ipstr),exc.what());
+							} catch (int exc) {
+								char ipstr[128];
+								printf("WARNING: unexpected exception handling packet from %s: ZT exception code %d" ZT_EOL_S,reinterpret_cast<const InetAddress *>(&in6)->toString(ipstr),exc);
 							} catch ( ... ) {
 							} catch ( ... ) {
 								char ipstr[128];
 								char ipstr[128];
-								printf("* unexpected exception handling packet from %s" ZT_EOL_S,reinterpret_cast<const InetAddress *>(&in6)->toString(ipstr));
+								printf("WARNING: unexpected exception handling packet from %s: unknown exception" ZT_EOL_S,reinterpret_cast<const InetAddress *>(&in6)->toString(ipstr));
 							}
 							}
 						}
 						}
 					} else {
 					} else {
@@ -928,9 +950,15 @@ int main(int argc,char **argv)
 							try {
 							try {
 								pkt.setSize((unsigned int)pl);
 								pkt.setSize((unsigned int)pl);
 								handlePacket(s4,s6,reinterpret_cast<const InetAddress *>(&in4),pkt);
 								handlePacket(s4,s6,reinterpret_cast<const InetAddress *>(&in4),pkt);
+							} catch (std::exception &exc) {
+								char ipstr[128];
+								printf("WARNING: unexpected exception handling packet from %s: %s" ZT_EOL_S,reinterpret_cast<const InetAddress *>(&in4)->toString(ipstr),exc.what());
+							} catch (int exc) {
+								char ipstr[128];
+								printf("WARNING: unexpected exception handling packet from %s: ZT exception code %d" ZT_EOL_S,reinterpret_cast<const InetAddress *>(&in4)->toString(ipstr),exc);
 							} catch ( ... ) {
 							} catch ( ... ) {
 								char ipstr[128];
 								char ipstr[128];
-								printf("* unexpected exception handling packet from %s" ZT_EOL_S,reinterpret_cast<const InetAddress *>(&in4)->toString(ipstr));
+								printf("WARNING: unexpected exception handling packet from %s: unknown exception" ZT_EOL_S,reinterpret_cast<const InetAddress *>(&in4)->toString(ipstr));
 							}
 							}
 						}
 						}
 					} else {
 					} else {
@@ -941,23 +969,26 @@ int main(int argc,char **argv)
 		}
 		}
 	}
 	}
 
 
-	// Minimal local API for use with monitoring clients, etc.
+	// A minimal read-only local API for monitoring and status queries
 	httplib::Server apiServ;
 	httplib::Server apiServ;
 	threads.push_back(std::thread([&apiServ,httpPort]() {
 	threads.push_back(std::thread([&apiServ,httpPort]() {
+		// Human readable status page
 		apiServ.Get("/",[](const httplib::Request &req,httplib::Response &res) {
 		apiServ.Get("/",[](const httplib::Request &req,httplib::Response &res) {
 			std::ostringstream o;
 			std::ostringstream o;
-			std::lock_guard<std::mutex> l0(s_peersByIdentity_l);
 			o << "ZeroTier Root Server " << ZEROTIER_ONE_VERSION_MAJOR << '.' << ZEROTIER_ONE_VERSION_MINOR << '.' << ZEROTIER_ONE_VERSION_REVISION << ZT_EOL_S;
 			o << "ZeroTier Root Server " << ZEROTIER_ONE_VERSION_MAJOR << '.' << ZEROTIER_ONE_VERSION_MINOR << '.' << ZEROTIER_ONE_VERSION_REVISION << ZT_EOL_S;
 			o << "(c)2019 ZeroTier, Inc." ZT_EOL_S "Licensed under the ZeroTier BSL 1.1" ZT_EOL_S ZT_EOL_S;
 			o << "(c)2019 ZeroTier, Inc." ZT_EOL_S "Licensed under the ZeroTier BSL 1.1" ZT_EOL_S ZT_EOL_S;
+			s_peersByIdentity_l.lock();
 			o << "Peers Online:       " << s_peersByIdentity.size() << ZT_EOL_S;
 			o << "Peers Online:       " << s_peersByIdentity.size() << ZT_EOL_S;
+			s_peersByIdentity_l.unlock();
 			res.set_content(o.str(),"text/plain");
 			res.set_content(o.str(),"text/plain");
 		});
 		});
 
 
+		// Peer list for compatibility with software that monitors regular nodes
 		apiServ.Get("/peer",[](const httplib::Request &req,httplib::Response &res) {
 		apiServ.Get("/peer",[](const httplib::Request &req,httplib::Response &res) {
 			char tmp[256];
 			char tmp[256];
 			std::ostringstream o;
 			std::ostringstream o;
 			o << '[';
 			o << '[';
-			{
+			try {
 				bool first = true;
 				bool first = true;
 				std::lock_guard<std::mutex> l(s_peersByIdentity_l);
 				std::lock_guard<std::mutex> l(s_peersByIdentity_l);
 				for(auto p=s_peersByIdentity.begin();p!=s_peersByIdentity.end();++p) {
 				for(auto p=s_peersByIdentity.begin();p!=s_peersByIdentity.end();++p) {
@@ -997,11 +1028,12 @@ int main(int argc,char **argv)
 					",\"versionMinor\":" << p->second->vMinor <<
 					",\"versionMinor\":" << p->second->vMinor <<
 					",\"versionRev\":" << p->second->vRev << "}";
 					",\"versionRev\":" << p->second->vRev << "}";
 				}
 				}
-			}
+			} catch ( ... ) {}
 			o << ']';
 			o << ']';
 			res.set_content(o.str(),"application/json");
 			res.set_content(o.str(),"application/json");
 		});
 		});
 
 
+		// GeoIP map if enabled
 		apiServ.Get("/map",[](const httplib::Request &req,httplib::Response &res) {
 		apiServ.Get("/map",[](const httplib::Request &req,httplib::Response &res) {
 			char tmp[4096];
 			char tmp[4096];
 			if (!s_geoInit) {
 			if (!s_geoInit) {
@@ -1090,16 +1122,11 @@ int main(int argc,char **argv)
 	int64_t lastCleaned = 0;
 	int64_t lastCleaned = 0;
 	int64_t lastWroteStats = 0;
 	int64_t lastWroteStats = 0;
 	while (s_run) {
 	while (s_run) {
-		//s_peersByIdentity_l.lock();
-		//s_peersByPhysAddr_l.lock();
-		//printf("*** have %lu peers at %lu physical endpoints" ZT_EOL_S,(unsigned long)s_peersByIdentity.size(),(unsigned long)s_peersByPhysAddr.size());
-		//s_peersByPhysAddr_l.unlock();
-		//s_peersByIdentity_l.unlock();
 		sleep(1);
 		sleep(1);
 
 
 		const int64_t now = OSUtils::now();
 		const int64_t now = OSUtils::now();
 
 
-		if ((now - lastCleaned) > 120000) {
+		if ((now - lastCleaned) > 300000) {
 			lastCleaned = now;
 			lastCleaned = now;
 
 
 			// Old multicast subscription cleanup
 			// Old multicast subscription cleanup
@@ -1127,24 +1154,25 @@ int main(int argc,char **argv)
 				std::lock_guard<std::mutex> pbi_l(s_peers_l);
 				std::lock_guard<std::mutex> pbi_l(s_peers_l);
 				for(auto p=s_peers.begin();p!=s_peers.end();) {
 				for(auto p=s_peers.begin();p!=s_peers.end();) {
 					if ((now - (*p)->lastReceive) > ZT_PEER_ACTIVITY_TIMEOUT) {
 					if ((now - (*p)->lastReceive) > ZT_PEER_ACTIVITY_TIMEOUT) {
-						std::lock_guard<std::mutex> pbi_l(s_peersByIdentity_l);
-						std::lock_guard<std::mutex> pbv_l(s_peersByVirtAddr_l);
-
-						s_peersByIdentity.erase((*p)->id);
-
-						auto pbv = s_peersByVirtAddr.find((*p)->id.address());
-						if (pbv != s_peersByVirtAddr.end()) {
-							pbv->second.erase((*p));
-							if (pbv->second.empty())
-								s_peersByVirtAddr.erase(pbv);
+						{
+							std::lock_guard<std::mutex> pbi_l(s_peersByIdentity_l);
+							s_peersByIdentity.erase((*p)->id);
+						}
+						{
+							std::lock_guard<std::mutex> pbv_l(s_peersByVirtAddr_l);
+							auto pbv = s_peersByVirtAddr.find((*p)->id.address());
+							if (pbv != s_peersByVirtAddr.end()) {
+								pbv->second.erase(*p);
+								if (pbv->second.empty())
+									s_peersByVirtAddr.erase(pbv);
+							}
 						}
 						}
-
 						s_peers.erase(p++);
 						s_peers.erase(p++);
 					} else ++p;
 					} else ++p;
 				}
 				}
 			}
 			}
 
 
-			// Remove old rendezvous and last forwarded tracking entries
+			// Remove old rendezvous entries
 			{
 			{
 				std::lock_guard<std::mutex> l(s_rendezvousTracking_l);
 				std::lock_guard<std::mutex> l(s_rendezvousTracking_l);
 				for(auto lr=s_rendezvousTracking.begin();lr!=s_rendezvousTracking.end();) {
 				for(auto lr=s_rendezvousTracking.begin();lr!=s_rendezvousTracking.end();) {