浏览代码

Make rxQueue lock-free using an atomic counter ring buffer.

Adam Ierymenko 8 年之前
父节点
当前提交
e3cf756785
共有 3 个文件被更改,包括 33 次插入39 次删除
  1. 9 0
      node/AtomicCounter.hpp
  2. 11 25
      node/Switch.cpp
  3. 13 14
      node/Switch.hpp

+ 9 - 0
node/AtomicCounter.hpp

@@ -47,6 +47,15 @@ public:
 		_v = 0;
 	}
 
+	inline int load() const
+	{
+#ifdef __GNUC__
+		return __sync_or_and_fetch(&_v,0);
+#else
+		return _v.load();
+#endif
+	}
+
 	inline int operator++()
 	{
 #ifdef __GNUC__

+ 11 - 25
node/Switch.cpp

@@ -120,10 +120,8 @@ void Switch::onRemotePacket(void *tPtr,const int64_t localSocket,const InetAddre
 						// Total fragments must be more than 1, otherwise why are we
 						// seeing a Packet::Fragment?
 
-						Mutex::Lock _l(_rxQueue_m);
-						RXQueueEntry *const rq = _findRXQueueEntry(now,fragmentPacketId);
-
-						if ((!rq->timestamp)||(rq->packetId != fragmentPacketId)) {
+						RXQueueEntry *const rq = _findRXQueueEntry(fragmentPacketId);
+						if (rq->packetId != fragmentPacketId) {
 							// No packet found, so we received a fragment without its head.
 
 							rq->timestamp = now;
@@ -250,10 +248,8 @@ void Switch::onRemotePacket(void *tPtr,const int64_t localSocket,const InetAddre
 						((uint64_t)reinterpret_cast<const uint8_t *>(data)[7])
 					);
 
-					Mutex::Lock _l(_rxQueue_m);
-					RXQueueEntry *const rq = _findRXQueueEntry(now,packetId);
-
-					if ((!rq->timestamp)||(rq->packetId != packetId)) {
+					RXQueueEntry *const rq = _findRXQueueEntry(packetId);
+					if (rq->packetId != packetId) {
 						// If we have no other fragments yet, create an entry and save the head
 
 						rq->timestamp = now;
@@ -286,14 +282,7 @@ void Switch::onRemotePacket(void *tPtr,const int64_t localSocket,const InetAddre
 					// Packet is unfragmented, so just process it
 					IncomingPacket packet(data,len,path,now);
 					if (!packet.tryDecode(RR,tPtr)) {
-						Mutex::Lock _l(_rxQueue_m);
-						RXQueueEntry *rq = &(_rxQueue[ZT_RX_QUEUE_SIZE - 1]);
-						unsigned long i = ZT_RX_QUEUE_SIZE - 1;
-						while ((i)&&(rq->timestamp)) {
-							RXQueueEntry *tmp = &(_rxQueue[--i]);
-							if (tmp->timestamp < rq->timestamp)
-								rq = tmp;
-						}
+						RXQueueEntry *const rq = _nextRXQueueEntry();
 						rq->timestamp = now;
 						rq->packetId = packet.packetId();
 						rq->frag0 = packet;
@@ -590,15 +579,12 @@ void Switch::doAnythingWaitingForPeer(void *tPtr,const SharedPtr<Peer> &peer)
 		_outstandingWhoisRequests.erase(peer->address());
 	}
 
-	{	// finish processing any packets waiting on peer's public key / identity
-		Mutex::Lock _l(_rxQueue_m);
-		unsigned long i = ZT_RX_QUEUE_SIZE;
-		while (i) {
-			RXQueueEntry *rq = &(_rxQueue[--i]);
-			if ((rq->timestamp)&&(rq->complete)) {
-				if (rq->frag0.tryDecode(RR,tPtr))
-					rq->timestamp = 0;
-			}
+	// finish processing any packets waiting on peer's public key / identity
+	for(unsigned int ptr=0;ptr<ZT_RX_QUEUE_SIZE;++ptr) {
+		RXQueueEntry *const rq = &(_rxQueue[ptr]);
+		if ((rq->timestamp)&&(rq->complete)) {
+			if (rq->frag0.tryDecode(RR,tPtr))
+				rq->timestamp = 0;
 		}
 	}
 

+ 13 - 14
node/Switch.hpp

@@ -169,25 +169,24 @@ private:
 		bool complete; // if true, packet is complete
 	};
 	RXQueueEntry _rxQueue[ZT_RX_QUEUE_SIZE];
-	Mutex _rxQueue_m;
+	AtomicCounter _rxQueuePtr;
 
-	/* Returns the matching or oldest entry. Caller must check timestamp and
-	 * packet ID to determine which. */
-	inline RXQueueEntry *_findRXQueueEntry(uint64_t now,uint64_t packetId)
+	// Returns matching or next available RX queue entry
+	inline RXQueueEntry *_findRXQueueEntry(uint64_t packetId)
 	{
-		RXQueueEntry *rq;
-		RXQueueEntry *oldest = &(_rxQueue[ZT_RX_QUEUE_SIZE - 1]);
-		unsigned long i = ZT_RX_QUEUE_SIZE;
-		while (i) {
-			rq = &(_rxQueue[--i]);
+		unsigned int ptr = static_cast<unsigned int>(_rxQueuePtr.load());
+		for(unsigned int k=0;k<ZT_RX_QUEUE_SIZE;++k) {
+			RXQueueEntry *rq = &(_rxQueue[--ptr % ZT_RX_QUEUE_SIZE]);
 			if ((rq->packetId == packetId)&&(rq->timestamp))
 				return rq;
-			if ((now - rq->timestamp) >= ZT_RX_QUEUE_EXPIRE)
-				rq->timestamp = 0;
-			if (rq->timestamp < oldest->timestamp)
-				oldest = rq;
 		}
-		return oldest;
+		return &(_rxQueue[static_cast<unsigned int>(++_rxQueuePtr) % ZT_RX_QUEUE_SIZE]);
+	}
+
+	// Returns next RX queue entry in ring buffer and increments ring counter
+	inline RXQueueEntry *_nextRXQueueEntry()
+	{
+		return &(_rxQueue[static_cast<unsigned int>(++_rxQueuePtr) % ZT_RX_QUEUE_SIZE]);
 	}
 
 	// ZeroTier-layer TX queue entry