Ver Fonte

Starting in on Switch... kill map in defrag queue, which will probably improve performance pretty decently under high load with lots of peers.

Adam Ierymenko há 10 anos atrás
pai
commit
0ab3e49be9
3 ficheiros alterados com 35 adições e 27 exclusões
  1. 6 3
      node/Hashtable.hpp
  2. 26 23
      node/Switch.cpp
  3. 3 1
      node/Switch.hpp

+ 6 - 3
node/Hashtable.hpp

@@ -372,9 +372,12 @@ private:
 	}
 	}
 	static inline unsigned long _hc(const uint64_t i)
 	static inline unsigned long _hc(const uint64_t i)
 	{
 	{
-		// NOTE: this is fine for network IDs, but might be bad for other kinds
-		// of IDs if they are not evenly or randomly distributed.
-		return (unsigned long)((i ^ (i >> 32)) * 2654435761ULL);
+		/* NOTE: this assumes that 'i' is evenly distributed, which is the case for
+		 * packet IDs and network IDs -- the two use cases in ZT for uint64_t keys.
+		 * These values are also greater than 0xffffffff so they'll map onto a full
+		 * bucket count just fine no matter what happens. Normally you'd want to
+		 * hash an integer key index in a hash table. */
+		return (unsigned long)i;
 	}
 	}
 
 
 	inline void _grow()
 	inline void _grow()

+ 26 - 23
node/Switch.cpp

@@ -531,11 +531,14 @@ unsigned long Switch::doTimerTasks(uint64_t now)
 
 
 	{	// Time out packets that didn't get all their fragments.
 	{	// Time out packets that didn't get all their fragments.
 		Mutex::Lock _l(_defragQueue_m);
 		Mutex::Lock _l(_defragQueue_m);
-		for(std::map< uint64_t,DefragQueueEntry >::iterator i(_defragQueue.begin());i!=_defragQueue.end();) {
-			if ((now - i->second.creationTime) > ZT_FRAGMENTED_PACKET_RECEIVE_TIMEOUT) {
+		Hashtable< uint64_t,DefragQueueEntry >::Iterator i(_defragQueue);
+		uint64_t *packetId = (uint64_t *)0;
+		DefragQueueEntry *qe = (DefragQueueEntry *)0;
+		while (i.next(packetId,qe)) {
+			if ((now - qe->creationTime) > ZT_FRAGMENTED_PACKET_RECEIVE_TIMEOUT) {
 				TRACE("incomplete fragmented packet %.16llx timed out, fragments discarded",i->first);
 				TRACE("incomplete fragmented packet %.16llx timed out, fragments discarded",i->first);
-				_defragQueue.erase(i++);
-			} else ++i;
+				_defragQueue.erase(*packetId);
+			}
 		}
 		}
 	}
 	}
 
 
@@ -577,32 +580,31 @@ void Switch::_handleRemotePacketFragment(const InetAddress &fromAddr,const void
 			// seeing a Packet::Fragment?
 			// seeing a Packet::Fragment?
 
 
 			Mutex::Lock _l(_defragQueue_m);
 			Mutex::Lock _l(_defragQueue_m);
-			std::map< uint64_t,DefragQueueEntry >::iterator dqe(_defragQueue.find(pid));
+			DefragQueueEntry &dq = _defragQueue[pid];
 
 
-			if (dqe == _defragQueue.end()) {
+			if (!dq.creationTime) {
 				// We received a Packet::Fragment without its head, so queue it and wait
 				// We received a Packet::Fragment without its head, so queue it and wait
 
 
-				DefragQueueEntry &dq = _defragQueue[pid];
 				dq.creationTime = RR->node->now();
 				dq.creationTime = RR->node->now();
 				dq.frags[fno - 1] = fragment;
 				dq.frags[fno - 1] = fragment;
 				dq.totalFragments = tf; // total fragment count is known
 				dq.totalFragments = tf; // total fragment count is known
 				dq.haveFragments = 1 << fno; // we have only this fragment
 				dq.haveFragments = 1 << fno; // we have only this fragment
 				//TRACE("fragment (%u/%u) of %.16llx from %s",fno + 1,tf,pid,fromAddr.toString().c_str());
 				//TRACE("fragment (%u/%u) of %.16llx from %s",fno + 1,tf,pid,fromAddr.toString().c_str());
-			} else if (!(dqe->second.haveFragments & (1 << fno))) {
+			} else if (!(dq.haveFragments & (1 << fno))) {
 				// We have other fragments and maybe the head, so add this one and check
 				// We have other fragments and maybe the head, so add this one and check
 
 
-				dqe->second.frags[fno - 1] = fragment;
-				dqe->second.totalFragments = tf;
+				dq.frags[fno - 1] = fragment;
+				dq.totalFragments = tf;
 				//TRACE("fragment (%u/%u) of %.16llx from %s",fno + 1,tf,pid,fromAddr.toString().c_str());
 				//TRACE("fragment (%u/%u) of %.16llx from %s",fno + 1,tf,pid,fromAddr.toString().c_str());
 
 
-				if (Utils::countBits(dqe->second.haveFragments |= (1 << fno)) == tf) {
+				if (Utils::countBits(dq.haveFragments |= (1 << fno)) == tf) {
 					// We have all fragments -- assemble and process full Packet
 					// We have all fragments -- assemble and process full Packet
 					//TRACE("packet %.16llx is complete, assembling and processing...",pid);
 					//TRACE("packet %.16llx is complete, assembling and processing...",pid);
 
 
-					SharedPtr<IncomingPacket> packet(dqe->second.frag0);
+					SharedPtr<IncomingPacket> packet(dq.frag0);
 					for(unsigned int f=1;f<tf;++f)
 					for(unsigned int f=1;f<tf;++f)
-						packet->append(dqe->second.frags[f - 1].payload(),dqe->second.frags[f - 1].payloadLength());
-					_defragQueue.erase(dqe);
+						packet->append(dq.frags[f - 1].payload(),dq.frags[f - 1].payloadLength());
+					_defragQueue.erase(pid); // dq no longer valid after this
 
 
 					if (!packet->tryDecode(RR)) {
 					if (!packet->tryDecode(RR)) {
 						Mutex::Lock _l(_rxQueue_m);
 						Mutex::Lock _l(_rxQueue_m);
@@ -645,26 +647,27 @@ void Switch::_handleRemotePacketHead(const InetAddress &fromAddr,const void *dat
 
 
 		uint64_t pid = packet->packetId();
 		uint64_t pid = packet->packetId();
 		Mutex::Lock _l(_defragQueue_m);
 		Mutex::Lock _l(_defragQueue_m);
-		std::map< uint64_t,DefragQueueEntry >::iterator dqe(_defragQueue.find(pid));
+		DefragQueueEntry &dq = _defragQueue[pid];
 
 
-		if (dqe == _defragQueue.end()) {
+		if (!dq.creationTime) {
 			// If we have no other fragments yet, create an entry and save the head
 			// If we have no other fragments yet, create an entry and save the head
-			DefragQueueEntry &dq = _defragQueue[pid];
+
 			dq.creationTime = RR->node->now();
 			dq.creationTime = RR->node->now();
 			dq.frag0 = packet;
 			dq.frag0 = packet;
 			dq.totalFragments = 0; // 0 == unknown, waiting for Packet::Fragment
 			dq.totalFragments = 0; // 0 == unknown, waiting for Packet::Fragment
 			dq.haveFragments = 1; // head is first bit (left to right)
 			dq.haveFragments = 1; // head is first bit (left to right)
 			//TRACE("fragment (0/?) of %.16llx from %s",pid,fromAddr.toString().c_str());
 			//TRACE("fragment (0/?) of %.16llx from %s",pid,fromAddr.toString().c_str());
-		} else if (!(dqe->second.haveFragments & 1)) {
+		} else if (!(dq.haveFragments & 1)) {
 			// If we have other fragments but no head, see if we are complete with the head
 			// If we have other fragments but no head, see if we are complete with the head
-			if ((dqe->second.totalFragments)&&(Utils::countBits(dqe->second.haveFragments |= 1) == dqe->second.totalFragments)) {
+
+			if ((dq.totalFragments)&&(Utils::countBits(dq.haveFragments |= 1) == dq.totalFragments)) {
 				// We have all fragments -- assemble and process full Packet
 				// We have all fragments -- assemble and process full Packet
 
 
 				//TRACE("packet %.16llx is complete, assembling and processing...",pid);
 				//TRACE("packet %.16llx is complete, assembling and processing...",pid);
 				// packet already contains head, so append fragments
 				// packet already contains head, so append fragments
-				for(unsigned int f=1;f<dqe->second.totalFragments;++f)
-					packet->append(dqe->second.frags[f - 1].payload(),dqe->second.frags[f - 1].payloadLength());
-				_defragQueue.erase(dqe);
+				for(unsigned int f=1;f<dq.totalFragments;++f)
+					packet->append(dq.frags[f - 1].payload(),dq.frags[f - 1].payloadLength());
+				_defragQueue.erase(pid); // dq no longer valid after this
 
 
 				if (!packet->tryDecode(RR)) {
 				if (!packet->tryDecode(RR)) {
 					Mutex::Lock _l(_rxQueue_m);
 					Mutex::Lock _l(_rxQueue_m);
@@ -672,7 +675,7 @@ void Switch::_handleRemotePacketHead(const InetAddress &fromAddr,const void *dat
 				}
 				}
 			} else {
 			} else {
 				// Still waiting on more fragments, so queue the head
 				// Still waiting on more fragments, so queue the head
-				dqe->second.frag0 = packet;
+				dq.frag0 = packet;
 			}
 			}
 		} // else this is a duplicate head, ignore
 		} // else this is a duplicate head, ignore
 	} else {
 	} else {

+ 3 - 1
node/Switch.hpp

@@ -45,6 +45,7 @@
 #include "Network.hpp"
 #include "Network.hpp"
 #include "SharedPtr.hpp"
 #include "SharedPtr.hpp"
 #include "IncomingPacket.hpp"
 #include "IncomingPacket.hpp"
+#include "Hashtable.hpp"
 
 
 /* Ethernet frame types that might be relevant to us */
 /* Ethernet frame types that might be relevant to us */
 #define ZT_ETHERTYPE_IPV4 0x0800
 #define ZT_ETHERTYPE_IPV4 0x0800
@@ -199,13 +200,14 @@ private:
 	// Packet defragmentation queue -- comes before RX queue in path
 	// Packet defragmentation queue -- comes before RX queue in path
 	struct DefragQueueEntry
 	struct DefragQueueEntry
 	{
 	{
+		DefragQueueEntry() : creationTime(0),totalFragments(0),haveFragments(0) {}
 		uint64_t creationTime;
 		uint64_t creationTime;
 		SharedPtr<IncomingPacket> frag0;
 		SharedPtr<IncomingPacket> frag0;
 		Packet::Fragment frags[ZT_MAX_PACKET_FRAGMENTS - 1];
 		Packet::Fragment frags[ZT_MAX_PACKET_FRAGMENTS - 1];
 		unsigned int totalFragments; // 0 if only frag0 received, waiting for frags
 		unsigned int totalFragments; // 0 if only frag0 received, waiting for frags
 		uint32_t haveFragments; // bit mask, LSB to MSB
 		uint32_t haveFragments; // bit mask, LSB to MSB
 	};
 	};
-	std::map< uint64_t,DefragQueueEntry > _defragQueue;
+	Hashtable< uint64_t,DefragQueueEntry > _defragQueue;
 	Mutex _defragQueue_m;
 	Mutex _defragQueue_m;
 
 
 	// ZeroTier-layer RX queue of incoming packets in the process of being decoded
 	// ZeroTier-layer RX queue of incoming packets in the process of being decoded