Browse Source

high throughput root is working!

Adam Ierymenko 6 years ago
parent
commit
86762d2b40
1 changed files with 43 additions and 6 deletions
  1. 43 6
      root/root.cpp

+ 43 - 6
root/root.cpp

@@ -80,6 +80,7 @@ struct PeerInfo
 	uint8_t key[32];
 	uint8_t key[32];
 	InetAddress ip4,ip6;
 	InetAddress ip4,ip6;
 	int64_t lastReceive;
 	int64_t lastReceive;
+	int64_t lastSync;
 
 
 	AtomicCounter __refCount;
 	AtomicCounter __refCount;
 
 
@@ -89,13 +90,11 @@ struct PeerInfo
 static Identity self;
 static Identity self;
 static std::atomic_bool run;
 static std::atomic_bool run;
 
 
-static std::vector< SharedPtr<PeerInfo> > newPeers;
 static std::unordered_map< uint64_t,std::unordered_map< MulticastGroup,std::unordered_map< Address,int64_t,AddressHasher >,MulticastGroupHasher > > multicastSubscriptions;
 static std::unordered_map< uint64_t,std::unordered_map< MulticastGroup,std::unordered_map< Address,int64_t,AddressHasher >,MulticastGroupHasher > > multicastSubscriptions;
 static std::unordered_map< Identity,SharedPtr<PeerInfo>,IdentityHasher > peersByIdentity;
 static std::unordered_map< Identity,SharedPtr<PeerInfo>,IdentityHasher > peersByIdentity;
 static std::unordered_map< Address,std::set< SharedPtr<PeerInfo> >,AddressHasher > peersByVirtAddr;
 static std::unordered_map< Address,std::set< SharedPtr<PeerInfo> >,AddressHasher > peersByVirtAddr;
 static std::unordered_map< InetAddress,std::set< SharedPtr<PeerInfo> >,InetAddressHasher > peersByPhysAddr;
 static std::unordered_map< InetAddress,std::set< SharedPtr<PeerInfo> >,InetAddressHasher > peersByPhysAddr;
 
 
-static std::mutex newPeers_l;
 static std::mutex multicastSubscriptions_l;
 static std::mutex multicastSubscriptions_l;
 static std::mutex peersByIdentity_l;
 static std::mutex peersByIdentity_l;
 static std::mutex peersByVirtAddr_l;
 static std::mutex peersByVirtAddr_l;
@@ -134,10 +133,7 @@ static void handlePacket(const int sock,const InetAddress *const ip,Packet &pkt)
 					if (self.agree(id,peer->key)) {
 					if (self.agree(id,peer->key)) {
 						if (pkt.dearmor(peer->key)) {
 						if (pkt.dearmor(peer->key)) {
 							peer->id = id;
 							peer->id = id;
-							{
-								std::lock_guard<std::mutex> np_l(newPeers_l);
-								newPeers.push_back(peer);
-							}
+							peer->lastSync = 0;
 							{
 							{
 								std::lock_guard<std::mutex> pbi_l(peersByIdentity_l);
 								std::lock_guard<std::mutex> pbi_l(peersByIdentity_l);
 								peersByIdentity.emplace(id,peer);
 								peersByIdentity.emplace(id,peer);
@@ -476,6 +472,7 @@ int main(int argc,char **argv)
 	}
 	}
 
 
 	int64_t lastCleanedMulticastSubscriptions = 0;
 	int64_t lastCleanedMulticastSubscriptions = 0;
+	int64_t lastCleanedPeers = 0;
 	while (run) {
 	while (run) {
 		peersByIdentity_l.lock();
 		peersByIdentity_l.lock();
 		peersByPhysAddr_l.lock();
 		peersByPhysAddr_l.lock();
@@ -485,8 +482,10 @@ int main(int argc,char **argv)
 		sleep(1);
 		sleep(1);
 
 
 		const int64_t now = OSUtils::now();
 		const int64_t now = OSUtils::now();
+
 		if ((now - lastCleanedMulticastSubscriptions) > 120000) {
 		if ((now - lastCleanedMulticastSubscriptions) > 120000) {
 			lastCleanedMulticastSubscriptions = now;
 			lastCleanedMulticastSubscriptions = now;
+
 			std::lock_guard<std::mutex> l(multicastSubscriptions_l);
 			std::lock_guard<std::mutex> l(multicastSubscriptions_l);
 			for(auto a=multicastSubscriptions.begin();a!=multicastSubscriptions.end();) {
 			for(auto a=multicastSubscriptions.begin();a!=multicastSubscriptions.end();) {
 				for(auto b=a->second.begin();b!=a->second.end();) {
 				for(auto b=a->second.begin();b!=a->second.end();) {
@@ -504,6 +503,44 @@ int main(int argc,char **argv)
 				else ++a;
 				else ++a;
 			}
 			}
 		}
 		}
+
+		if ((now - lastCleanedPeers) > 120000) {
+			lastCleanedPeers = now;
+
+			std::lock_guard<std::mutex> pbi_l(peersByIdentity_l);
+			for(auto p=peersByIdentity.begin();p!=peersByIdentity.end();) {
+				if ((now - p->second->lastReceive) > ZT_PEER_ACTIVITY_TIMEOUT) {
+					std::lock_guard<std::mutex> pbv_l(peersByVirtAddr_l);
+					std::lock_guard<std::mutex> pbp_l(peersByPhysAddr_l);
+
+					auto pbv = peersByVirtAddr.find(p->second->id.address());
+					if (pbv != peersByVirtAddr.end()) {
+						pbv->second.erase(p->second);
+						if (pbv->second.empty())
+							peersByVirtAddr.erase(pbv);
+					}
+
+					if (p->second->ip4) {
+						auto pbp = peersByPhysAddr.find(p->second->ip4);
+						if (pbp != peersByPhysAddr.end()) {
+							pbp->second.erase(p->second);
+							if (pbp->second.empty())
+								peersByPhysAddr.erase(pbp);
+						}
+					}
+					if (p->second->ip6) {
+						auto pbp = peersByPhysAddr.find(p->second->ip6);
+						if (pbp != peersByPhysAddr.end()) {
+							pbp->second.erase(p->second);
+							if (pbp->second.empty())
+								peersByPhysAddr.erase(pbp);
+						}
+					}
+
+					peersByIdentity.erase(p++);
+				} else ++p;
+			}
+		}
 	}
 	}
 
 
 	for(auto s=sockets.begin();s!=sockets.end();++s) {
 	for(auto s=sockets.begin();s!=sockets.end();++s) {