Browse Source

Fix some identity verification stuff, performance improvements, build of root now requires libjemalloc.

Adam Ierymenko 4 years ago
parent
commit
5028aca372
2 changed files with 143 additions and 79 deletions
  1. 1 1
      root/CMakeLists.txt
  2. 142 78
      root/root.cpp

+ 1 - 1
root/CMakeLists.txt

@@ -7,7 +7,7 @@ endif(WIN32)
 
 add_executable(${PROJECT_NAME} root.cpp)
 
-target_link_libraries(${PROJECT_NAME} zt_core zt_osdep pthread resolv)
+target_link_libraries(${PROJECT_NAME} zt_core zt_osdep pthread resolv jemalloc)
 
 target_compile_features(${PROJECT_NAME} PUBLIC cxx_std_11)
 

+ 142 - 78
root/root.cpp

@@ -48,7 +48,8 @@
  * they appear with the first alive sibling being used.
  */
 
-#include <Constants.hpp>
+
+#include "../node/Constants.hpp"
 
 #include <stdio.h>
 #include <stdlib.h>
@@ -71,20 +72,22 @@
 #include <netinet/tcp.h>
 #include <netinet/udp.h>
 
-#include <json.hpp>
-#include <httplib.h>
-
-#include <Packet.hpp>
-#include <Utils.hpp>
-#include <Address.hpp>
-#include <Identity.hpp>
-#include <InetAddress.hpp>
-#include <Mutex.hpp>
-#include <SharedPtr.hpp>
-#include <MulticastGroup.hpp>
-#include <CertificateOfMembership.hpp>
-#include <OSUtils.hpp>
-#include <Meter.hpp>
+#include "../ext/json/json.hpp"
+#include "../ext/cpp-httplib/httplib.h"
+
+#include "../node/Packet.hpp"
+#include "../node/Utils.hpp"
+#include "../node/Address.hpp"
+#include "../node/Identity.hpp"
+#include "../node/InetAddress.hpp"
+#include "../node/Mutex.hpp"
+#include "../node/SharedPtr.hpp"
+#include "../node/MulticastGroup.hpp"
+#include "../node/CertificateOfMembership.hpp"
+#include "../node/Meter.hpp"
+
+#include "../osdep/OSUtils.hpp"
+#include "../osdep/BlockingQueue.hpp"
 
 #include <string>
 #include <thread>
@@ -126,7 +129,7 @@ using json = nlohmann::json;
  */
 struct RootPeer
 {
-	ZT_ALWAYS_INLINE RootPeer() : v4s(-1),v6s(-1),lastSend(0),lastReceive(0),lastReceiveV4(0),lastReceiveV6(0),lastEcho(0),lastHello(0),vProto(-1),vMajor(-1),vMinor(-1),vRev(-1) {}
+	ZT_ALWAYS_INLINE RootPeer() : v4s(-1),v6s(-1),lastSend(0),lastReceive(0),lastReceiveV4(0),lastReceiveV6(0),lastEcho(0),lastHello(0),vProto(-1),vMajor(-1),vMinor(-1),vRev(-1),identityValidated(false),identityInvalid(false) {}
 	ZT_ALWAYS_INLINE ~RootPeer() { Utils::burn(key,sizeof(key)); }
 
 	Identity id;            // Identity
@@ -141,6 +144,8 @@ struct RootPeer
 	int64_t lastHello;      // Time of last received HELLO
 	int vProto;             // Protocol version or -1 if unknown
 	int vMajor,vMinor,vRev; // Peer version or -1,-1,-1 if unknown
+	bool identityValidated; // Identity has been fully verified
+	bool identityInvalid;   // Identity validation failed, to be deleted
 
 	AtomicCounter __refCount;
 };
@@ -200,7 +205,8 @@ static Meter s_discardedForwardRate;
 
 // These fields are locked using mutexes below as they're modified during runtime
 static std::string s_planet;
-static std::list< SharedPtr<RootPeer> > s_peers;
+static std::vector< SharedPtr<RootPeer> > s_peers;
+static std::vector< SharedPtr<RootPeer> > s_peersToValidate;
 static std::unordered_map< uint64_t,std::unordered_map< MulticastGroup,std::unordered_map< Address,int64_t,AddressHasher >,MulticastGroupHasher > > s_multicastSubscriptions;
 static std::unordered_map< Identity,SharedPtr<RootPeer>,IdentityHasher > s_peersByIdentity;
 static std::unordered_map< Address,std::set< SharedPtr<RootPeer> >,AddressHasher > s_peersByVirtAddr;
@@ -208,6 +214,7 @@ static std::unordered_map< RendezvousKey,RendezvousStats,RendezvousKey::Hasher >
 
 static std::mutex s_planet_l;
 static std::mutex s_peers_l;
+static std::mutex s_peersToValidate_l;
 static std::mutex s_multicastSubscriptions_l;
 static std::mutex s_peersByIdentity_l;
 static std::mutex s_peersByVirtAddr_l;
@@ -262,12 +269,30 @@ static void handlePacket(const int sock,const InetAddress *const ip,Packet &pkt)
 					}
 				}
 				if (peer) {
+					// Peer found with this identity.
 					if (!pkt.dearmor(peer->key)) {
 						printf("%s HELLO rejected: packet authentication failed" ZT_EOL_S,ip->toString(ipstr));
 						return;
 					}
 				} else {
+					// Check to ensure that there is no peer with the same address as this identity. If there is,
+					// verify both identities to pick the one with this address.
+					bool needsValidation = false;
+					bool identityValidated = false;
+					{
+						std::lock_guard<std::mutex> pbv_l(s_peersByVirtAddr_l);
+						needsValidation = s_peersByVirtAddr.find(id.address()) != s_peersByVirtAddr.end();
+					}
+					if (unlikely(needsValidation)) {
+						if (!id.locallyValidate()) {
+							printf("%s HELLO rejected: identity validate failed" ZT_EOL_S,ip->toString(ipstr));
+							return;
+						}
+						identityValidated = true;
+					}
+
 					peer.set(new RootPeer);
+					peer->identityValidated = identityValidated;
 
 					if (!s_self.agree(id,peer->key)) {
 						printf("%s HELLO rejected: key agreement failed" ZT_EOL_S,ip->toString(ipstr));
@@ -289,20 +314,29 @@ static void handlePacket(const int sock,const InetAddress *const ip,Packet &pkt)
 					{
 						std::lock_guard<std::mutex> pbi_l(s_peersByIdentity_l);
 						auto existing = s_peersByIdentity.find(id); // make sure another thread didn't do this while we were
-						if (existing == s_peersByIdentity.end()) {
+						if (likely(existing == s_peersByIdentity.end())) {
 							s_peersByIdentity.emplace(id,peer);
 							added = true;
 						} else {
 							peer = existing->second;
 						}
 					}
-					if (added) {
+					if (likely(added)) {
 						{
 							std::lock_guard<std::mutex> pl(s_peers_l);
 							s_peers.emplace_back(peer);
 						}
+						if (!peer->identityValidated) {
+							std::lock_guard<std::mutex> pv(s_peersToValidate_l);
+							s_peersToValidate.emplace_back(peer);
+						}
 						{
 							std::lock_guard<std::mutex> pbv_l(s_peersByVirtAddr_l);
+							std::set< SharedPtr<RootPeer> > &byVirt = s_peersByVirtAddr[id.address()];
+							for(auto i=byVirt.begin();i!=byVirt.end();++i) {
+								if (!(*i)->identityValidated)
+									(*i)->identityInvalid = !(*i)->id.locallyValidate();
+							}
 							s_peersByVirtAddr[id.address()].emplace(peer);
 						}
 					}
@@ -317,13 +351,15 @@ static void handlePacket(const int sock,const InetAddress *const ip,Packet &pkt)
 			auto peers = s_peersByVirtAddr.find(source);
 			if (peers != s_peersByVirtAddr.end()) {
 				for(auto p=peers->second.begin();p!=peers->second.end();++p) {
-					if (pkt.dearmor((*p)->key)) {
-						if (!pkt.uncompress()) {
-							printf("%s packet rejected: decompression failed" ZT_EOL_S,ip->toString(ipstr));
-							return;
+					if (!(*p)->identityInvalid) {
+						if (pkt.dearmor((*p)->key)) {
+							if (!pkt.uncompress()) {
+								printf("%s packet rejected: decompression failed" ZT_EOL_S,ip->toString(ipstr));
+								return;
+							}
+							peer = (*p);
+							break;
 						}
-						peer = (*p);
-						break;
 					}
 				}
 			}
@@ -418,8 +454,10 @@ static void handlePacket(const int sock,const InetAddress *const ip,Packet &pkt)
 							for(unsigned int ptr=ZT_PACKET_IDX_PAYLOAD;(ptr+ZT_ADDRESS_LENGTH)<=pkt.size();ptr+=ZT_ADDRESS_LENGTH) {
 								auto peers = s_peersByVirtAddr.find(Address(pkt.field(ptr,ZT_ADDRESS_LENGTH),ZT_ADDRESS_LENGTH));
 								if (peers != s_peersByVirtAddr.end()) {
-									for(auto p=peers->second.begin();p!=peers->second.end();++p)
-										results.push_back(*p);
+									for(auto p=peers->second.begin();p!=peers->second.end();++p) {
+										if (!(*p)->identityInvalid)
+											results.push_back(*p);
+									}
 								}
 							}
 						}
@@ -550,16 +588,18 @@ static void handlePacket(const int sock,const InetAddress *const ip,Packet &pkt)
 		auto peers = s_peersByVirtAddr.find(dest);
 		if (peers != s_peersByVirtAddr.end()) {
 			for(auto p=peers->second.begin();p!=peers->second.end();++p) {
-				if (((*p)->v4s >= 0)&&((*p)->v6s >= 0)) {
-					if ((*p)->lastReceiveV4 > (*p)->lastReceiveV6) {
+				if (!(*p)->identityInvalid) {
+					if (((*p)->v4s >= 0)&&((*p)->v6s >= 0)) {
+						if ((*p)->lastReceiveV4 > (*p)->lastReceiveV6) {
+							toAddrs.emplace_back(std::pair< InetAddress *,SharedPtr<RootPeer> >(&((*p)->ip4),*p));
+						} else {
+							toAddrs.emplace_back(std::pair< InetAddress *,SharedPtr<RootPeer> >(&((*p)->ip6),*p));
+						}
+					} else if ((*p)->v4s >= 0) {
 						toAddrs.emplace_back(std::pair< InetAddress *,SharedPtr<RootPeer> >(&((*p)->ip4),*p));
-					} else {
+					} else if ((*p)->v6s >= 0) {
 						toAddrs.emplace_back(std::pair< InetAddress *,SharedPtr<RootPeer> >(&((*p)->ip6),*p));
 					}
-				} else if ((*p)->v4s >= 0) {
-					toAddrs.emplace_back(std::pair< InetAddress *,SharedPtr<RootPeer> >(&((*p)->ip4),*p));
-				} else if ((*p)->v6s >= 0) {
-					toAddrs.emplace_back(std::pair< InetAddress *,SharedPtr<RootPeer> >(&((*p)->ip6),*p));
 				}
 			}
 		}
@@ -895,6 +935,20 @@ int main(int argc,char **argv)
 
 	s_run = true;
 
+	threads.push_back(std::thread([]() {
+		while (s_run) {
+			std::vector< SharedPtr<RootPeer> > toValidate;
+			{
+				std::lock_guard<std::mutex> l(s_peersToValidate_l);
+				toValidate.swap(s_peersToValidate);
+			}
+			for(auto p=toValidate.begin();p!=toValidate.end();++p) {
+				if (!(*p)->identityValidated)
+					(*p)->identityInvalid = !(*p)->id.locallyValidate();
+			}
+		}
+	}));
+
 	for(auto port=s_ports.begin();port!=s_ports.end();++port) {
 		for(unsigned int tn=0;tn<ncores;++tn) {
 			struct sockaddr_in6 in6;
@@ -1034,41 +1088,43 @@ int main(int argc,char **argv)
 				bool first = true;
 				std::lock_guard<std::mutex> l(s_peers_l);
 				for(auto p=s_peers.begin();p!=s_peers.end();++p) {
-					if (first)
-						first = false;
-					else o << ',';
-					o <<
-					"{\"address\":\"" << (*p)->id.address().toString(tmp) << "\""
-					",\"latency\":-1"
-					",\"paths\":[";
-					if ((*p)->v4s >= 0) {
-						o <<
-						"{\"active\":true"
-						",\"address\":\"" << (*p)->ip4.toIpString(tmp) << "\\/" << (*p)->ip4.port() << "\""
-						",\"expired\":false"
-						",\"lastReceive\":" << (*p)->lastReceive <<
-						",\"lastSend\":" << (*p)->lastSend <<
-						",\"preferred\":true"
-						",\"trustedPathId\":0}";
-					}
-					if ((*p)->v6s >= 0) {
-						if ((*p)->v4s >= 0)
-							o << ',';
+					if (likely(!(*p)->identityInvalid)) {
+						if (first)
+							first = false;
+						else o << ',';
 						o <<
-						"{\"active\":true"
-						",\"address\":\"" << (*p)->ip6.toIpString(tmp) << "\\/" << (*p)->ip6.port() << "\""
-						",\"expired\":false"
-						",\"lastReceive\":" << (*p)->lastReceive <<
-						",\"lastSend\":" << (*p)->lastSend <<
-						",\"preferred\":" << (((*p)->ip4) ? "false" : "true") <<
-						",\"trustedPathId\":0}";
+						"{\"address\":\"" << (*p)->id.address().toString(tmp) << "\""
+						",\"latency\":-1"
+						",\"paths\":[";
+						if ((*p)->v4s >= 0) {
+							o <<
+							"{\"active\":true"
+							",\"address\":\"" << (*p)->ip4.toIpString(tmp) << "\\/" << (*p)->ip4.port() << "\""
+							",\"expired\":false"
+							",\"lastReceive\":" << (*p)->lastReceive <<
+							",\"lastSend\":" << (*p)->lastSend <<
+							",\"preferred\":true"
+							",\"trustedPathId\":0}";
+						}
+						if ((*p)->v6s >= 0) {
+							if ((*p)->v4s >= 0)
+								o << ',';
+							o <<
+							"{\"active\":true"
+							",\"address\":\"" << (*p)->ip6.toIpString(tmp) << "\\/" << (*p)->ip6.port() << "\""
+							",\"expired\":false"
+							",\"lastReceive\":" << (*p)->lastReceive <<
+							",\"lastSend\":" << (*p)->lastSend <<
+							",\"preferred\":" << (((*p)->ip4) ? "false" : "true") <<
+							",\"trustedPathId\":0}";
+						}
+						o << "]"
+						",\"role\":\"LEAF\""
+						",\"version\":\"" << (*p)->vMajor << '.' << (*p)->vMinor << '.' << (*p)->vRev << "\""
+						",\"versionMajor\":" << (*p)->vMajor <<
+						",\"versionMinor\":" << (*p)->vMinor <<
+						",\"versionRev\":" << (*p)->vRev << "}";
 					}
-					o << "]"
-					",\"role\":\"LEAF\""
-					",\"version\":\"" << (*p)->vMajor << '.' << (*p)->vMinor << '.' << (*p)->vRev << "\""
-					",\"versionMajor\":" << (*p)->vMajor <<
-					",\"versionMinor\":" << (*p)->vMinor <<
-					",\"versionRev\":" << (*p)->vRev << "}";
 				}
 			} catch ( ... ) {}
 			o << ']';
@@ -1094,10 +1150,12 @@ int main(int argc,char **argv)
 				{
 					std::lock_guard<std::mutex> l(s_peers_l);
 					for(auto p=s_peers.begin();p!=s_peers.end();++p) {
-						if ((*p)->v4s >= 0)
-							ips[(*p)->ip4].insert((*p)->id.address());
-						if ((*p)->v6s >= 0)
-							ips[(*p)->ip6].insert((*p)->id.address());
+						if (likely(!(*p)->identityInvalid)) {
+							if ((*p)->v4s >= 0)
+								ips[(*p)->ip4].insert((*p)->id.address());
+							if ((*p)->v6s >= 0)
+								ips[(*p)->ip6].insert((*p)->id.address());
+						}
 					}
 				}
 
@@ -1201,18 +1259,24 @@ int main(int argc,char **argv)
 				}
 			}
 
-			// Remove expired peers
+			// Remove expired or otherwise invalid peers
 			try {
 				std::vector< SharedPtr<RootPeer> > toRemove;
 				toRemove.reserve(1024);
 				{
 					std::lock_guard<std::mutex> pbi_l(s_peers_l);
-					for(auto p=s_peers.begin();p!=s_peers.end();) {
-						if ((now - (*p)->lastReceive) > ZT_PEER_ACTIVITY_TIMEOUT) {
-							toRemove.emplace_back(*p);
-							s_peers.erase(p++);
-						} else ++p;
+					std::vector< SharedPtr<RootPeer> > newPeers;
+					newPeers.reserve(s_peers.size());
+					for(auto p=s_peers.begin();p!=s_peers.end();++p) {
+						if (((now - (*p)->lastReceive) > ZT_PEER_ACTIVITY_TIMEOUT)||((*p)->identityInvalid)) {
+							toRemove.emplace_back();
+							p->swap(toRemove.back());
+						} else {
+							newPeers.emplace_back();
+							p->swap(newPeers.back());
+						}
 					}
+					newPeers.swap(s_peers);
 				}
 				for(auto p=toRemove.begin();p!=toRemove.end();++p) {
 					{
@@ -1316,7 +1380,7 @@ int main(int argc,char **argv)
 				s_peersByIdentity_l.unlock();
 				fprintf(sf,"Peers                      : %llu" ZT_EOL_S,(unsigned long long)peersByIdentitySize);
 				s_peersByVirtAddr_l.lock();
-				fprintf(sf,"Virtual Address Collisions : %llu" ZT_EOL_S,(unsigned long long)(peersByIdentitySize - s_peersByVirtAddr.size()));
+				fprintf(sf,"Virtual Address Collisions : %lld" ZT_EOL_S,(long long)peersByIdentitySize - (long long)s_peersByVirtAddr.size());
 				s_peersByVirtAddr_l.unlock();
 				s_rendezvousTracking_l.lock();
 				uint64_t unsuccessfulp2p = 0;