
Fix a bug that caused a crash on empty HTTP requests (localhost only), and add a lightweight per-entry lock to the RX queue to prevent threads from stepping on each other in parallel receive paths.

Adam Ierymenko 7 years ago
commit 5b114791e5
4 changed files with 8 additions and 3 deletions
  1. node/Switch.cpp  + 5 - 0
  2. node/Switch.hpp  + 1 - 0
  3. node/Topology.cpp  + 0 - 3
  4. service/OneService.cpp  + 2 - 0

+ 5 - 0
node/Switch.cpp

@@ -121,6 +121,7 @@ void Switch::onRemotePacket(void *tPtr,const int64_t localSocket,const InetAddre
 						// seeing a Packet::Fragment?
 
 						RXQueueEntry *const rq = _findRXQueueEntry(fragmentPacketId);
+						Mutex::Lock rql(rq->lock);
 						if (rq->packetId != fragmentPacketId) {
 							// No packet found, so we received a fragment without its head.
 
@@ -203,6 +204,7 @@ void Switch::onRemotePacket(void *tPtr,const int64_t localSocket,const InetAddre
 					);
 
 					RXQueueEntry *const rq = _findRXQueueEntry(packetId);
+					Mutex::Lock rql(rq->lock);
 					if (rq->packetId != packetId) {
 						// If we have no other fragments yet, create an entry and save the head
 
@@ -237,6 +239,7 @@ void Switch::onRemotePacket(void *tPtr,const int64_t localSocket,const InetAddre
 					IncomingPacket packet(data,len,path,now);
 					if (!packet.tryDecode(RR,tPtr)) {
 						RXQueueEntry *const rq = _nextRXQueueEntry();
+						Mutex::Lock rql(rq->lock);
 						rq->timestamp = now;
 						rq->packetId = packet.packetId();
 						rq->frag0 = packet;
@@ -545,6 +548,7 @@ void Switch::doAnythingWaitingForPeer(void *tPtr,const SharedPtr<Peer> &peer)
 	const int64_t now = RR->node->now();
 	for(unsigned int ptr=0;ptr<ZT_RX_QUEUE_SIZE;++ptr) {
 		RXQueueEntry *const rq = &(_rxQueue[ptr]);
+		Mutex::Lock rql(rq->lock);
 		if ((rq->timestamp)&&(rq->complete)) {
 			if ((rq->frag0.tryDecode(RR,tPtr))||((now - rq->timestamp) > ZT_RECEIVE_QUEUE_TIMEOUT))
 				rq->timestamp = 0;
@@ -594,6 +598,7 @@ unsigned long Switch::doTimerTasks(void *tPtr,int64_t now)
 
 	for(unsigned int ptr=0;ptr<ZT_RX_QUEUE_SIZE;++ptr) {
 		RXQueueEntry *const rq = &(_rxQueue[ptr]);
+		Mutex::Lock rql(rq->lock);
 		if ((rq->timestamp)&&(rq->complete)) {
 			if ((rq->frag0.tryDecode(RR,tPtr))||((now - rq->timestamp) > ZT_RECEIVE_QUEUE_TIMEOUT)) {
 				rq->timestamp = 0;
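
The pattern at each of the five call sites above is the same: look up the RXQueueEntry, then immediately construct a scoped Mutex::Lock on that entry's new lock member, so every read and write of the entry happens under its per-entry mutex until the scope ends. A minimal sketch of the idea, using std::mutex and std::lock_guard in place of ZeroTier's own Mutex/Mutex::Lock wrapper (the entry fields shown are taken from the diff; anything else here is illustrative):

#include <mutex>
#include <cstdint>

// Stand-in for an RX queue entry; field names follow the diff, but
// std::mutex replaces ZeroTier's Mutex wrapper for this sketch.
struct RXQueueEntrySketch {
	int64_t timestamp = 0;
	uint64_t packetId = 0;
	volatile bool complete = false;
	std::mutex lock;
};

// Equivalent of the added "Mutex::Lock rql(rq->lock);" lines: the guard is
// constructed right after the entry is looked up, so all accesses below it
// are serialized per entry, and the lock releases when the scope exits.
void touchEntry(RXQueueEntrySketch *rq,int64_t now,uint64_t packetId)
{
	std::lock_guard<std::mutex> rql(rq->lock);
	if (rq->packetId != packetId) {
		rq->timestamp = now;
		rq->packetId = packetId;
		rq->complete = false;
	}
}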

+ 1 - 0
node/Switch.hpp

@@ -159,6 +159,7 @@ private:
 		unsigned int totalFragments; // 0 if only frag0 received, waiting for frags
 		uint32_t haveFragments; // bit mask, LSB to MSB
 		volatile bool complete; // if true, packet is complete
+		Mutex lock;
 	};
 	RXQueueEntry _rxQueue[ZT_RX_QUEUE_SIZE];
 	AtomicCounter _rxQueuePtr;
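
The header change gives every slot in the fixed-size receive ring its own Mutex, so two threads working on different packets never contend on a single queue-wide lock; the AtomicCounter alongside the array is used when choosing slots. A rough sketch of that layout, with std::mutex and std::atomic standing in for ZeroTier's Mutex and AtomicCounter, and with slot selection reduced to a simple round-robin (the real _nextRXQueueEntry/_findRXQueueEntry logic in Switch.cpp is more involved):

#include <atomic>
#include <mutex>
#include <cstdint>

#define RX_QUEUE_SIZE 32 // stand-in for ZT_RX_QUEUE_SIZE

struct EntrySketch {
	int64_t timestamp = 0;
	uint64_t packetId = 0;
	volatile bool complete = false;
	std::mutex lock; // the member added by this commit
};

// A fixed ring of entries plus an atomic cursor. Receive threads grab slots
// independently; only threads that land on the same slot serialize, via that
// slot's own mutex rather than a queue-wide one.
struct RXQueueSketch {
	EntrySketch ring[RX_QUEUE_SIZE];
	std::atomic<unsigned long> ptr{0};

	EntrySketch *next() { return &ring[ptr.fetch_add(1) % RX_QUEUE_SIZE]; }
};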

+ 0 - 3
node/Topology.cpp

@@ -382,8 +382,6 @@ void Topology::doPeriodicTasks(void *tPtr,int64_t now)
 		}
 	}
 
-	// Temporarily disable path cleanup to test hypothesis about periodic threading issues as reported by Keysight.
-/*
 	{
 		Mutex::Lock _l(_paths_m);
 		Hashtable< Path::HashKey,SharedPtr<Path> >::Iterator i(_paths);
@@ -394,7 +392,6 @@ void Topology::doPeriodicTasks(void *tPtr,int64_t now)
 				_paths.erase(*k);
 		}
 	}
-*/
 }
 
 void Topology::_memoizeUpstreams(void *tPtr)
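
This hunk simply removes the comment markers, re-enabling the periodic path-table cleanup that had been disabled while investigating the threading report mentioned in the removed comment. The loop takes the _paths mutex, iterates the Hashtable, and erases stale entries; the exact erase condition sits in the elided diff context. A minimal sketch of the same erase-while-iterating idea, written against std::unordered_map and std::shared_ptr instead of ZeroTier's Hashtable and SharedPtr, with an illustrative staleness predicate:

#include <unordered_map>
#include <memory>
#include <mutex>
#include <cstdint>

struct PathSketch {
	int64_t lastIn = 0;
	bool stale(int64_t now) const { return (now - lastIn) > 60000; } // illustrative cutoff
};

static std::mutex paths_m;
static std::unordered_map<uint64_t,std::shared_ptr<PathSketch> > paths;

// Periodic cleanup: hold the table lock for the whole pass and drop paths
// that nothing else references and that have gone quiet. unordered_map::erase
// returns the next iterator, which keeps the traversal valid while removing.
static void cleanupPaths(int64_t now)
{
	std::lock_guard<std::mutex> l(paths_m);
	for (auto it = paths.begin();it != paths.end();) {
		if ((it->second.use_count() <= 1)&&(it->second->stale(now)))
			it = paths.erase(it);
		else ++it;
	}
}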

+ 2 - 0
service/OneService.cpp

@@ -1063,6 +1063,8 @@ public:
 					else urlArgs[a->substr(0,eqpos)] = a->substr(eqpos + 1);
 				}
 			}
+		} else {
+			return 404;
 		}
 
 		bool isAuth = false;
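
The two added lines are the crash fix from the commit message: when the incoming request does not parse into anything the handler can dispatch (the empty-request case), it now returns 404 instead of falling through to code that assumes at least one path component exists. A hypothetical sketch of that guard pattern; handleRequest, url, and ps are illustrative names, not the actual OneService control-plane handler:

#include <string>
#include <vector>
#include <sstream>

// Hypothetical handler showing the guard added in OneService.cpp: if the URL
// splits into no path components (e.g. an empty HTTP request), return 404
// before anything indexes into the component list.
static unsigned int handleRequest(const std::string &url)
{
	std::vector<std::string> ps;
	std::istringstream in(url);
	std::string part;
	while (std::getline(in,part,'/')) {
		if (!part.empty())
			ps.push_back(part);
	}

	if (ps.empty())
		return 404; // previously this case fell through and could crash

	// ... dispatch on ps[0], parse ?key=value arguments, etc. ...
	return 200;
}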