Adam Ierymenko 6 years ago
parent
commit
e5f1bf81f7
1 changed files with 35 additions and 8 deletions
  1. 35 8
      root/root.cpp

+ 35 - 8
root/root.cpp

@@ -213,13 +213,12 @@ static void handlePacket(const int sock,const InetAddress *const ip,Packet &pkt)
 				}	break;
 				}	break;
 
 
 				case Packet::VERB_MULTICAST_LIKE: {
 				case Packet::VERB_MULTICAST_LIKE: {
-					printf("LIKE\n");
 					std::lock_guard<std::mutex> l(multicastSubscriptions_l);
 					std::lock_guard<std::mutex> l(multicastSubscriptions_l);
 					for(unsigned int ptr=ZT_PACKET_IDX_PAYLOAD;(ptr+18)<=pkt.size();ptr+=18) {
 					for(unsigned int ptr=ZT_PACKET_IDX_PAYLOAD;(ptr+18)<=pkt.size();ptr+=18) {
 						const uint64_t nwid = pkt.template at<uint64_t>(ptr);
 						const uint64_t nwid = pkt.template at<uint64_t>(ptr);
 						const MulticastGroup mg(MAC(pkt.field(ptr + 8,6),6),pkt.template at<uint32_t>(ptr + 14));
 						const MulticastGroup mg(MAC(pkt.field(ptr + 8,6),6),pkt.template at<uint32_t>(ptr + 14));
 						multicastSubscriptions[nwid][mg][peer->id.address()] = now;
 						multicastSubscriptions[nwid][mg][peer->id.address()] = now;
-						printf("%s subscribes to %s/%.8lx on network %.16llx" ZT_EOL_S,ip->toString(ipstr),mg.mac().toString(tmpstr),(unsigned long)mg.adi(),(unsigned long long)nwid);
+						//printf("%s subscribes to %s/%.8lx on network %.16llx" ZT_EOL_S,ip->toString(ipstr),mg.mac().toString(tmpstr),(unsigned long)mg.adi(),(unsigned long long)nwid);
 					}
 					}
 				}	break;
 				}	break;
 
 
@@ -270,14 +269,17 @@ static void handlePacket(const int sock,const InetAddress *const ip,Packet &pkt)
 
 
 	std::vector<InetAddress> toAddrs;
 	std::vector<InetAddress> toAddrs;
 	{
 	{
+		const int64_t now = OSUtils::now();
 		std::lock_guard<std::mutex> pbv_l(peersByVirtAddr_l);
 		std::lock_guard<std::mutex> pbv_l(peersByVirtAddr_l);
 		auto peers = peersByVirtAddr.find(pkt.destination());
 		auto peers = peersByVirtAddr.find(pkt.destination());
 		if (peers != peersByVirtAddr.end()) {
 		if (peers != peersByVirtAddr.end()) {
 			for(auto p=peers->second.begin();p!=peers->second.end();++p) {
 			for(auto p=peers->second.begin();p!=peers->second.end();++p) {
-				if ((*p)->ip6)
-					toAddrs.push_back((*p)->ip6);
-				else if ((*p)->ip4)
-					toAddrs.push_back((*p)->ip4);
+				if ((now - (*p)->lastReceive) < ZT_PEER_ACTIVITY_TIMEOUT) {
+					if ((*p)->ip6)
+						toAddrs.push_back((*p)->ip6);
+					else if ((*p)->ip4)
+						toAddrs.push_back((*p)->ip4);
+				}
 			}
 			}
 		}
 		}
 	}
 	}
@@ -387,8 +389,9 @@ int main(int argc,char **argv)
 
 
 	run = true;
 	run = true;
 
 
-	std::vector<int> sockets;
 	std::vector<std::thread> threads;
 	std::vector<std::thread> threads;
+
+	std::vector<int> sockets;
 	for(unsigned int tn=0;tn<ncores;++tn) {
 	for(unsigned int tn=0;tn<ncores;++tn) {
 		struct sockaddr_in6 in6;
 		struct sockaddr_in6 in6;
 		memset(&in6,0,sizeof(in6));
 		memset(&in6,0,sizeof(in6));
@@ -454,11 +457,35 @@ int main(int argc,char **argv)
 		}));
 		}));
 	}
 	}
 
 
+	int64_t lastCleanedMulticastSubscriptions = 0;
 	while (run) {
 	while (run) {
 		peersByIdentity_l.lock();
 		peersByIdentity_l.lock();
-		printf("* have %lu peers" ZT_EOL_S,(unsigned long)peersByIdentity.size());
+		peersByPhysAddr_l.lock();
+		printf("*** have %lu peers at %lu physical endpoints" ZT_EOL_S,(unsigned long)peersByIdentity.size(),(unsigned long)peersByPhysAddr.size());
+		peersByPhysAddr_l.unlock();
 		peersByIdentity_l.unlock();
 		peersByIdentity_l.unlock();
 		sleep(1);
 		sleep(1);
+
+		const int64_t now = OSUtils::now();
+		if ((now - lastCleanedMulticastSubscriptions) > 120000) {
+			lastCleanedMulticastSubscriptions = now;
+			std::lock_guard<std::mutex> l(multicastSubscriptions_l);
+			for(auto a=multicastSubscriptions.begin();a!=multicastSubscriptions.end();) {
+				for(auto b=a->second.begin();b!=a->second.end();) {
+					for(auto c=b->second.begin();c!=b->second.end();) {
+						if ((now - c->second) > ZT_MULTICAST_LIKE_EXPIRE)
+							b->second.erase(c++);
+						else ++c;
+					}
+					if (b->second.empty())
+						a->second.erase(b++);
+					else ++b;
+				}
+				if (a->second.empty())
+					multicastSubscriptions.erase(a++);
+				else ++a;
+			}
+		}
 	}
 	}
 
 
 	for(auto s=sockets.begin();s!=sockets.end();++s) {
 	for(auto s=sockets.begin();s!=sockets.end();++s) {