
Rough draft of fq-codel implementation

Joseph Henry 7 years ago
parent commit 28cb40529d

+ 35 - 0
node/Constants.hpp

@@ -410,6 +410,41 @@
  */
 #define ZT_PATH_IMBALANCE_THRESHOLD 0.20
 
+/**
+ * Target sojourn time: the maximum time a packet should spend in a queue
+ * under normal conditions (the CoDel "target")
+ */
+#define ZT_QOS_TARGET 5 // ms
+
+/**
+ * Interval within which a queue's sojourn time should fall below the target
+ * at least once before packets begin to be dropped (the CoDel "interval")
+ */
+#define ZT_QOS_INTERVAL 100 // ms
+
+/**
+ * The number of bytes each queue is allowed to send during one DRR cycle.
+ * A quantum of one MTU approximates byte-granular fair queuing.
+ */
+#define ZT_QOS_QUANTUM ZT_DEFAULT_MTU
+
+/**
+ * The maximum total number of packets that may be queued across all
+ * new/old/inactive queues for a given network
+ */
+#define ZT_QOS_MAX_ENQUEUED_PACKETS 1024
+
+/**
+ * Number of QoS queues (buckets)
+ */
+#define ZT_QOS_NUM_BUCKETS 9
+
+/**
+ * All unclassified traffic is placed in this bucket. Traffic in buckets with
+ * smaller values is de-prioritized relative to it; traffic in buckets with
+ * larger values is prioritized over it.
+ */
+#define ZT_QOS_DEFAULT_BUCKET 0
+
 /**
  * How frequently to send heartbeats over in-use paths
  */

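For intuition about how these constants behave at runtime: once a queue's sojourn time has exceeded ZT_QOS_TARGET continuously for ZT_QOS_INTERVAL, CoDel starts dropping, pacing successive drops at INTERVAL/sqrt(count) (see Switch::control_law below). A minimal standalone sketch of that schedule, assuming only the constant values defined above (illustrative only, not part of the commit):

#include <cmath>
#include <cstdint>
#include <cstdio>

#define ZT_QOS_INTERVAL 100 // ms, mirrors the definition above

// After the count-th consecutive drop the next drop is scheduled
// ZT_QOS_INTERVAL/sqrt(count) ms later, so drops accelerate while
// the queue remains above target.
static uint64_t control_law(uint64_t t, int count)
{
	return t + (uint64_t)(ZT_QOS_INTERVAL / sqrt((double)count));
}

int main()
{
	uint64_t t = 0;
	for (int count = 1; count <= 5; ++count) {
		t = control_law(t, count);
		printf("drop %d scheduled at t=%llu ms\n", count, (unsigned long long)t);
	}
	// prints approximately: 100, 170, 227, 277, 321
	return 0;
}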
+ 10 - 6
node/Network.cpp

@@ -106,7 +106,8 @@ static _doZtFilterResult _doZtFilter(
 	const unsigned int ruleCount,
 	Address &cc, // MUTABLE -- set to TEE destination if TEE action is taken or left alone otherwise
 	unsigned int &ccLength, // MUTABLE -- set to length of packet payload to TEE
-	bool &ccWatch) // MUTABLE -- set to true for WATCH target as opposed to normal TEE
+	bool &ccWatch, // MUTABLE -- set to true for WATCH target as opposed to normal TEE
+	uint8_t &qosBucket) // MUTABLE -- set to the value of the argument provided to the matching action
 {
 	// Set to true if we are a TEE/REDIRECT/WATCH target
 	bool superAccept = false;
@@ -621,7 +622,8 @@ bool Network::filterOutgoingPacket(
 	const uint8_t *frameData,
 	const unsigned int frameLen,
 	const unsigned int etherType,
-	const unsigned int vlanId)
+	const unsigned int vlanId,
+	uint8_t &qosBucket)
 {
 	const int64_t now = RR->node->now();
 	Address ztFinalDest(ztDest);
@@ -636,7 +638,7 @@ bool Network::filterOutgoingPacket(
 
 	Membership *const membership = (ztDest) ? _memberships.get(ztDest) : (Membership *)0;
 
-	switch(_doZtFilter(RR,rrl,_config,membership,false,ztSource,ztFinalDest,macSource,macDest,frameData,frameLen,etherType,vlanId,_config.rules,_config.ruleCount,cc,ccLength,ccWatch)) {
+	switch(_doZtFilter(RR,rrl,_config,membership,false,ztSource,ztFinalDest,macSource,macDest,frameData,frameLen,etherType,vlanId,_config.rules,_config.ruleCount,cc,ccLength,ccWatch,qosBucket)) {
 
 		case DOZTFILTER_NO_MATCH: {
 			for(unsigned int c=0;c<_config.capabilityCount;++c) {
@@ -644,7 +646,7 @@ bool Network::filterOutgoingPacket(
 				Address cc2;
 				unsigned int ccLength2 = 0;
 				bool ccWatch2 = false;
-				switch (_doZtFilter(RR,crrl,_config,membership,false,ztSource,ztFinalDest,macSource,macDest,frameData,frameLen,etherType,vlanId,_config.capabilities[c].rules(),_config.capabilities[c].ruleCount(),cc2,ccLength2,ccWatch2)) {
+				switch (_doZtFilter(RR,crrl,_config,membership,false,ztSource,ztFinalDest,macSource,macDest,frameData,frameLen,etherType,vlanId,_config.capabilities[c].rules(),_config.capabilities[c].ruleCount(),cc2,ccLength2,ccWatch2,qosBucket)) {
 					case DOZTFILTER_NO_MATCH:
 					case DOZTFILTER_DROP: // explicit DROP in a capability just terminates its evaluation and is an anti-pattern
 						break;
@@ -759,11 +761,13 @@ int Network::filterIncomingPacket(
 	bool ccWatch = false;
 	const Capability *c = (Capability *)0;
 
+	uint8_t qosBucket = 255; // For incoming packets this is a dummy value
+
 	Mutex::Lock _l(_lock);
 
 	Membership &membership = _membership(sourcePeer->address());
 
-	switch (_doZtFilter(RR,rrl,_config,&membership,true,sourcePeer->address(),ztFinalDest,macSource,macDest,frameData,frameLen,etherType,vlanId,_config.rules,_config.ruleCount,cc,ccLength,ccWatch)) {
+	switch (_doZtFilter(RR,rrl,_config,&membership,true,sourcePeer->address(),ztFinalDest,macSource,macDest,frameData,frameLen,etherType,vlanId,_config.rules,_config.ruleCount,cc,ccLength,ccWatch,qosBucket)) {
 
 		case DOZTFILTER_NO_MATCH: {
 			Membership::CapabilityIterator mci(membership,_config);
@@ -772,7 +776,7 @@ int Network::filterIncomingPacket(
 				Address cc2;
 				unsigned int ccLength2 = 0;
 				bool ccWatch2 = false;
-				switch(_doZtFilter(RR,crrl,_config,&membership,true,sourcePeer->address(),ztFinalDest,macSource,macDest,frameData,frameLen,etherType,vlanId,c->rules(),c->ruleCount(),cc2,ccLength2,ccWatch2)) {
+				switch(_doZtFilter(RR,crrl,_config,&membership,true,sourcePeer->address(),ztFinalDest,macSource,macDest,frameData,frameLen,etherType,vlanId,c->rules(),c->ruleCount(),cc2,ccLength2,ccWatch2,qosBucket)) {
 					case DOZTFILTER_NO_MATCH:
 					case DOZTFILTER_DROP: // explicit DROP in a capability just terminates its evaluation and is an anti-pattern
 						break;

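Note that this diff only threads the new qosBucket out-parameter through _doZtFilter(); no rule action in this commit actually writes to it, so every frame still lands in ZT_QOS_DEFAULT_BUCKET. For illustration, a handler inside the rule-evaluation switch might eventually look like the following sketch (the ZT_NETWORK_RULE_ACTION_PRIORITY action and the rules[rn].v.qosBucket field are assumed names, not part of this commit):

// Hypothetical rule action inside _doZtFilter()'s evaluation loop; the
// action type and field names below are illustrative assumptions.
case ZT_NETWORK_RULE_ACTION_PRIORITY:
	// clamp to the valid bucket range [0, ZT_QOS_NUM_BUCKETS - 1]
	qosBucket = (rules[rn].v.qosBucket < ZT_QOS_NUM_BUCKETS) ? rules[rn].v.qosBucket : ZT_QOS_DEFAULT_BUCKET;
	break;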
+ 9 - 1
node/Network.hpp

@@ -132,7 +132,8 @@ public:
 		const uint8_t *frameData,
 		const unsigned int frameLen,
 		const unsigned int etherType,
-		const unsigned int vlanId);
+		const unsigned int vlanId,
+		uint8_t &qosBucket);
 
 	/**
 	 * Apply filters to an incoming packet
@@ -297,6 +298,13 @@ public:
 	 */
 	void learnBridgeRoute(const MAC &mac,const Address &addr);
 
+	/**
+	 * Whether QoS is in effect for this network (hard-wired to false in this rough draft)
+	 */
+	bool QoSEnabled() {
+		return false;
+	}
+
 	/**
 	 * Learn a multicast group that is bridged to our tap device
 	 *

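Because QoSEnabled() is hard-wired to false, aqm_enqueue() in Switch.cpp falls straight through to send() and the AQM path stays dormant in this draft. A later revision would presumably key it off network configuration, along these lines (the qosEnabled config field is a hypothetical placeholder):

// Hypothetical future form; NetworkConfig has no qosEnabled field in this commit.
bool QoSEnabled() {
	return _config.qosEnabled;
}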
+ 1 - 0
node/Node.cpp

@@ -368,6 +368,7 @@ ZT_ResultCode Node::leave(uint64_t nwid,void **uptr,void *tptr)
 	{
 		Mutex::Lock _l(_networks_m);
 		SharedPtr<Network> *nw = _networks.get(nwid);
+		RR->sw->removeNetworkQoSControlBlock(nwid);
 		if (!nw)
 			return ZT_RESULT_OK;
 		if (uptr)

+ 2 - 1
node/OutboundMulticast.cpp

@@ -85,7 +85,8 @@ void OutboundMulticast::sendOnly(const RuntimeEnvironment *RR,void *tPtr,const A
 {
 	const SharedPtr<Network> nw(RR->node->network(_nwid));
 	const Address toAddr2(toAddr);
-	if ((nw)&&(nw->filterOutgoingPacket(tPtr,true,RR->identity.address(),toAddr2,_macSrc,_macDest,_frameData,_frameLen,_etherType,0))) {
+	uint8_t QoSBucket = 255; // Dummy value
+	if ((nw)&&(nw->filterOutgoingPacket(tPtr,true,RR->identity.address(),toAddr2,_macSrc,_macDest,_frameData,_frameLen,_etherType,0,QoSBucket))) {
 		_packet.newInitializationVector();
 		_packet.setDestination(toAddr2);
 		RR->node->expectReplyTo(_packet.packetId());

+ 268 - 7
node/Switch.cpp

@@ -266,6 +266,8 @@ void Switch::onLocalEthernet(void *tPtr,const SharedPtr<Network> &network,const
 		}
 	}
 
+	uint8_t qosBucket = ZT_QOS_DEFAULT_BUCKET;
+
 	if (to.isMulticast()) {
 		MulticastGroup multicastGroup(to,0);
 
@@ -383,7 +385,7 @@ void Switch::onLocalEthernet(void *tPtr,const SharedPtr<Network> &network,const
 			network->learnBridgedMulticastGroup(tPtr,multicastGroup,RR->node->now());
 
 		// First pass sets noTee to false, but noTee is set to true in OutboundMulticast to prevent duplicates.
-		if (!network->filterOutgoingPacket(tPtr,false,RR->identity.address(),Address(),from,to,(const uint8_t *)data,len,etherType,vlanId)) {
+		if (!network->filterOutgoingPacket(tPtr,false,RR->identity.address(),Address(),from,to,(const uint8_t *)data,len,etherType,vlanId,qosBucket)) {
 			RR->t->outgoingNetworkFrameDropped(tPtr,network,from,to,etherType,vlanId,len,"filter blocked");
 			return;
 		}
@@ -407,7 +409,7 @@ void Switch::onLocalEthernet(void *tPtr,const SharedPtr<Network> &network,const
 		Address toZT(to.toAddress(network->id())); // since in-network MACs are derived from addresses and network IDs, we can reverse this
 		SharedPtr<Peer> toPeer(RR->topology->getPeer(tPtr,toZT));
 
-		if (!network->filterOutgoingPacket(tPtr,false,RR->identity.address(),toZT,from,to,(const uint8_t *)data,len,etherType,vlanId)) {
+		if (!network->filterOutgoingPacket(tPtr,false,RR->identity.address(),toZT,from,to,(const uint8_t *)data,len,etherType,vlanId,qosBucket)) {
 			RR->t->outgoingNetworkFrameDropped(tPtr,network,from,to,etherType,vlanId,len,"filter blocked");
 			return;
 		}
@@ -422,7 +424,7 @@ void Switch::onLocalEthernet(void *tPtr,const SharedPtr<Network> &network,const
 			outp.append(data,len);
 			if (!network->config().disableCompression())
 				outp.compress();
-			send(tPtr,outp,true);
+			aqm_enqueue(tPtr,network,outp,true,qosBucket);
 		} else {
 			Packet outp(toZT,RR->identity.address(),Packet::VERB_FRAME);
 			outp.append(network->id());
@@ -430,7 +432,7 @@ void Switch::onLocalEthernet(void *tPtr,const SharedPtr<Network> &network,const
 			outp.append(data,len);
 			if (!network->config().disableCompression())
 				outp.compress();
-			send(tPtr,outp,true);
+			aqm_enqueue(tPtr,network,outp,true,qosBucket);
 		}
 
 	} else {
@@ -439,7 +441,7 @@ void Switch::onLocalEthernet(void *tPtr,const SharedPtr<Network> &network,const
 		// We filter with a NULL destination ZeroTier address first. Filtrations
 		// for each ZT destination are also done below. This is the same rationale
 		// and design as for multicast.
-		if (!network->filterOutgoingPacket(tPtr,false,RR->identity.address(),Address(),from,to,(const uint8_t *)data,len,etherType,vlanId)) {
+		if (!network->filterOutgoingPacket(tPtr,false,RR->identity.address(),Address(),from,to,(const uint8_t *)data,len,etherType,vlanId,qosBucket)) {
 			RR->t->outgoingNetworkFrameDropped(tPtr,network,from,to,etherType,vlanId,len,"filter blocked");
 			return;
 		}
@@ -477,7 +479,7 @@ void Switch::onLocalEthernet(void *tPtr,const SharedPtr<Network> &network,const
 		}
 
 		for(unsigned int b=0;b<numBridges;++b) {
-			if (network->filterOutgoingPacket(tPtr,true,RR->identity.address(),bridges[b],from,to,(const uint8_t *)data,len,etherType,vlanId)) {
+			if (network->filterOutgoingPacket(tPtr,true,RR->identity.address(),bridges[b],from,to,(const uint8_t *)data,len,etherType,vlanId,qosBucket)) {
 				Packet outp(bridges[b],RR->identity.address(),Packet::VERB_EXT_FRAME);
 				outp.append(network->id());
 				outp.append((uint8_t)0x00);
@@ -487,7 +489,7 @@ void Switch::onLocalEthernet(void *tPtr,const SharedPtr<Network> &network,const
 				outp.append(data,len);
 				if (!network->config().disableCompression())
 					outp.compress();
-				send(tPtr,outp,true);
+				aqm_enqueue(tPtr,network,outp,true,qosBucket);
 			} else {
 				RR->t->outgoingNetworkFrameDropped(tPtr,network,from,to,etherType,vlanId,len,"filter blocked (bridge replication)");
 			}
@@ -495,6 +497,263 @@ void Switch::onLocalEthernet(void *tPtr,const SharedPtr<Network> &network,const
 	}
 }
 
+void Switch::aqm_enqueue(void *tPtr, const SharedPtr<Network> &network, Packet &packet,bool encrypt,int qosBucket)
+{
+	if(!network->QoSEnabled()) {
+		send(tPtr, packet, encrypt);
+		return;
+	}
+	NetworkQoSControlBlock *nqcb = _netQueueControlBlock[network->id()];
+	if (!nqcb) {
+		// DEBUG_INFO("creating network QoS control block (NQCB) for network %llx", network->id());
+		nqcb = new NetworkQoSControlBlock();
+		_netQueueControlBlock[network->id()] = nqcb;
+		// Initialize ZT_QOS_NUM_BUCKETS queues and place them in the INACTIVE list
+		// These queues will be shuffled between the new/old/inactive lists by the enqueue/dequeue algorithm
+		for (int i=0; i<ZT_QOS_NUM_BUCKETS; i++) {
+			nqcb->inactiveQueues.push_back(new ManagedQueue(i));
+		}
+	}
+
+	if (packet.verb() != Packet::VERB_FRAME && packet.verb() != Packet::VERB_EXT_FRAME) {
+		// DEBUG_INFO("skipping, no QoS for this packet, verb=%x", packet.verb());
+		// ZT protocol traffic is sent immediately and never queued
+		send(tPtr, packet, encrypt);
+		return; // without this the packet would also be enqueued below
+	}
+
+	_aqm_m.lock();
+
+	// Enqueue packet and move queue to appropriate list
+
+	const Address dest(packet.destination());
+	TXQueueEntry *txEntry = new TXQueueEntry(dest,RR->node->now(),packet,encrypt);
+	
+	ManagedQueue *selectedQueue = nullptr;
+	for (int i=0; i<ZT_QOS_NUM_BUCKETS; i++) {
+		if (i < (int)nqcb->oldQueues.size()) { // search OLD queues first, since an old queue implies the most recent use
+			if (nqcb->oldQueues[i]->id == qosBucket) {
+				selectedQueue = nqcb->oldQueues[i];
+			}
+		} if (i < (int)nqcb->newQueues.size()) { // then NEW queues
+			if (nqcb->newQueues[i]->id == qosBucket) {
+				selectedQueue = nqcb->newQueues[i];
+			}
+		} if (i < (int)nqcb->inactiveQueues.size()) { // finally INACTIVE queues
+			if (nqcb->inactiveQueues[i]->id == qosBucket) {
+				selectedQueue = nqcb->inactiveQueues[i];
+				// an inactive queue receiving a packet is activated: reset its
+				// byte credit and move it to the end of the NEW queue list
+				selectedQueue->byteCredit = ZT_QOS_QUANTUM;
+				// DEBUG_INFO("moving q=%p from INACTIVE to NEW list", selectedQueue);
+				nqcb->newQueues.push_back(selectedQueue);
+				nqcb->inactiveQueues.erase(nqcb->inactiveQueues.begin() + i);
+			}
+		}
+		if (selectedQueue) break; // bucket ids are unique, no need to keep scanning
+	}
+	if (!selectedQueue) {
+		// no queue matches this bucket id (should not happen); free the entry
+		// and release the lock before returning
+		delete txEntry;
+		_aqm_m.unlock();
+		return;
+	}
+
+	selectedQueue->q.push_back(txEntry);
+	selectedQueue->byteLength+=txEntry->packet.payloadLength();
+	nqcb->_currEnqueuedPackets++;
+
+	// DEBUG_INFO("nq=%2lu, oq=%2lu, iq=%2lu, nqcb.size()=%3d, bucket=%2d, q=%p", nqcb->newQueues.size(), nqcb->oldQueues.size(), nqcb->inactiveQueues.size(), nqcb->_currEnqueuedPackets, qosBucket, selectedQueue);
+
+	// Drop a packet if necessary
+	ManagedQueue *selectedQueueToDropFrom = nullptr;
+	if (nqcb->_currEnqueuedPackets > ZT_QOS_MAX_ENQUEUED_PACKETS)
+	{
+		// DEBUG_INFO("too many enqueued packets (%d), finding packet to drop", nqcb->_currEnqueuedPackets);
+		int maxQueueLength = 0;
+		for (int i=0; i<ZT_QOS_NUM_BUCKETS; i++) {
+			if (i < nqcb->oldQueues.size()) {
+				if (nqcb->oldQueues[i]->byteLength > maxQueueLength) {
+					maxQueueLength = nqcb->oldQueues[i]->byteLength;
+					selectedQueueToDropFrom = nqcb->oldQueues[i];
+				}
+			} if (i < nqcb->newQueues.size()) {
+				if (nqcb->newQueues[i]->byteLength > maxQueueLength) {
+					maxQueueLength = nqcb->newQueues[i]->byteLength;
+					selectedQueueToDropFrom = nqcb->newQueues[i];
+				}
+			} if (i < nqcb->inactiveQueues.size()) {
+				if (nqcb->inactiveQueues[i]->byteLength > maxQueueLength) {
+					maxQueueLength = nqcb->inactiveQueues[i]->byteLength;
+					selectedQueueToDropFrom = nqcb->inactiveQueues[i];
+				}
+			}
+		}
+		if (selectedQueueToDropFrom) {
+			// DEBUG_INFO("dropping packet from head of largest queue (%d payload bytes)", maxQueueLength);
+			int sizeOfDroppedPacket = selectedQueueToDropFrom->q.front()->packet.payloadLength();
+			delete selectedQueueToDropFrom->q.front();
+			selectedQueueToDropFrom->q.pop_front();
+			selectedQueueToDropFrom->byteLength-=sizeOfDroppedPacket;
+			nqcb->_currEnqueuedPackets--;
+		}
+	}
+	_aqm_m.unlock();
+	aqm_dequeue(tPtr);
+}
+
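+// CoDel control law: the next drop is scheduled at t + INTERVAL/sqrt(count),
+// so the gap between drops shrinks as consecutive drops accumulate, ramping
+// up pressure on a queue whose sojourn time stays above target.
+// (Relies on sqrt(); Switch.cpp must include <cmath> or <math.h>.)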
+uint64_t Switch::control_law(uint64_t t, int count)
+{
+	return t + ZT_QOS_INTERVAL / sqrt(count);
+}
+
+Switch::dqr Switch::dodequeue(ManagedQueue *q, uint64_t now) 
+{
+	dqr r;
+	r.ok_to_drop = false;
+	r.p = q->q.empty() ? nullptr : q->q.front(); // front() on an empty std::list is undefined behavior
+
+	if (!r.p) {
+		q->first_above_time = 0;
+		return r;
+	}
+	uint64_t sojourn_time = now - r.p->creationTime;
+	if (sojourn_time < ZT_QOS_TARGET || q->byteLength <= ZT_DEFAULT_MTU) {
+		// went below - stay below for at least interval
+		q->first_above_time = 0;
+	} else {
+		if (q->first_above_time == 0) {
+			// just went above from below. if still above at
+			// first_above_time, will say it's ok to drop.
+			q->first_above_time = now + ZT_QOS_INTERVAL;
+		} else if (now >= q->first_above_time) {
+			r.ok_to_drop = true;
+		}
+	}
+	return r;
+}
+
+Switch::TXQueueEntry * Switch::CoDelDequeue(ManagedQueue *q, bool isNew, uint64_t now)
+{
+	dqr r = dodequeue(q, now);
+
+	if (q->dropping) {
+		if (!r.ok_to_drop) {
+			q->dropping = false;
+		}
+		while (now >= q->drop_next && q->dropping) {
+			// drop the head-of-line packet and free its entry
+			// (note: the per-network _currEnqueuedPackets counter is not
+			// decremented for CoDel drops in this draft)
+			q->byteLength -= r.p->packet.payloadLength();
+			delete r.p;
+			q->q.pop_front();
+			r = dodequeue(q, now);
+			if (!r.ok_to_drop) {
+				// leave dropping state
+				q->dropping = false;
+			} else {
+				++(q->count);
+				// schedule the next drop
+				q->drop_next = control_law(q->drop_next, q->count);
+			}
+		}
+	} else if (r.ok_to_drop) {
+		// enter dropping state: drop the head-of-line packet and free its entry
+		q->byteLength -= r.p->packet.payloadLength();
+		delete r.p;
+		q->q.pop_front();
+		r = dodequeue(q, now);
+		q->dropping = true;
+		// if we only recently left the dropping state, resume with a shortened drop interval
+		q->count = ((q->count > 2)&&((now - q->drop_next) < (8*ZT_QOS_INTERVAL))) ? (q->count - 2) : 1;
+		q->drop_next = control_law(now, q->count);
+	}
+	return r.p;
+}
+
+void Switch::aqm_dequeue(void *tPtr)
+{
+	// Cycle through network-specific QoS control blocks
+	for(std::map<uint64_t,NetworkQoSControlBlock*>::iterator nqcb(_netQueueControlBlock.begin());nqcb!=_netQueueControlBlock.end();) {
+		if (!(*nqcb).second->_currEnqueuedPackets) {
+			nqcb++;
+			continue; // nothing queued for this network; returning here would starve the remaining networks
+		}
+
+		uint64_t now = RR->node->now();
+		TXQueueEntry *entryToEmit = nullptr;
+		std::vector<ManagedQueue*> *currQueues = &((*nqcb).second->newQueues);
+		std::vector<ManagedQueue*> *oldQueues = &((*nqcb).second->oldQueues);
+		std::vector<ManagedQueue*> *inactiveQueues = &((*nqcb).second->inactiveQueues);
+
+		_aqm_m.lock();
+
+		// Attempt dequeue from queues in NEW list
+		bool examiningNewQueues = true;
+		while (currQueues->size()) {
+			ManagedQueue *queueAtFrontOfList = currQueues->front();
+			if (queueAtFrontOfList->byteCredit < 0) {
+				queueAtFrontOfList->byteCredit += ZT_QOS_QUANTUM;
+				// Move to list of OLD queues
+				// DEBUG_INFO("moving q=%p from NEW to OLD list", queueAtFrontOfList);
+				oldQueues->push_back(queueAtFrontOfList);
+				currQueues->erase(currQueues->begin());
+			} else {
+				entryToEmit = CoDelDequeue(queueAtFrontOfList, examiningNewQueues, now);
+				if (!entryToEmit) {
+					// Move to end of list of OLD queues
+					// DEBUG_INFO("moving q=%p from NEW to OLD list", queueAtFrontOfList);
+					oldQueues->push_back(queueAtFrontOfList);
+					currQueues->erase(currQueues->begin());
+				}
+				else {
+					int len = entryToEmit->packet.payloadLength();
+					queueAtFrontOfList->byteLength -= len;
+					queueAtFrontOfList->byteCredit -= len;
+					// Send the packet!
+					queueAtFrontOfList->q.pop_front();
+					send(tPtr, entryToEmit->packet, entryToEmit->encrypt);
+					(*nqcb).second->_currEnqueuedPackets--;
+				}
+				//DEBUG_INFO("dequeuing from q=%p, len=%lu in NEW list (byteCredit=%d)", queueAtFrontOfList, queueAtFrontOfList->q.size(), queueAtFrontOfList->byteCredit);
+				break;
+			}
+		}
+
+		// Attempt dequeue from queues in OLD list
+		examiningNewQueues = false;
+		currQueues = &((*nqcb).second->oldQueues);
+		while (currQueues->size()) {
+			ManagedQueue *queueAtFrontOfList = currQueues->front();
+			if (queueAtFrontOfList->byteCredit < 0) {
+				queueAtFrontOfList->byteCredit += ZT_QOS_QUANTUM;
+				oldQueues->push_back(queueAtFrontOfList);
+				currQueues->erase(currQueues->begin());
+			} else {
+				entryToEmit = CoDelDequeue(queueAtFrontOfList, examiningNewQueues, now);
+				if (!entryToEmit) {
+					//DEBUG_INFO("moving q=%p from OLD to INACTIVE list", queueAtFrontOfList);
+					// Move to inactive list of queues
+					inactiveQueues->push_back(queueAtFrontOfList);
+					currQueues->erase(currQueues->begin());
+				}
+				else {
+					int len = entryToEmit->packet.payloadLength();
+					queueAtFrontOfList->byteLength -= len;
+					queueAtFrontOfList->byteCredit -= len;
+					queueAtFrontOfList->q.pop_front();
+					send(tPtr, entryToEmit->packet, entryToEmit->encrypt);
+					(*nqcb).second->_currEnqueuedPackets--;
+				}
+				//DEBUG_INFO("dequeuing from q=%p, len=%lu in OLD list (byteCredit=%d)", queueAtFrontOfList, queueAtFrontOfList->q.size(), queueAtFrontOfList->byteCredit);
+				break;
+			}
+		}
+		nqcb++;
+		_aqm_m.unlock();
+	}
+}
+
+void Switch::removeNetworkQoSControlBlock(uint64_t nwid)
+{
+	// use find() rather than operator[], which would insert a null entry for unknown networks
+	std::map<uint64_t,NetworkQoSControlBlock*>::iterator nq = _netQueueControlBlock.find(nwid);
+	if (nq != _netQueueControlBlock.end()) {
+		NetworkQoSControlBlock *nqcb = nq->second;
+		_netQueueControlBlock.erase(nq);
+		delete nqcb; // destructor frees the managed queues and any entries still in them
+	}
+}
+
 void Switch::send(void *tPtr,Packet &packet,bool encrypt)
 {
 	const Address dest(packet.destination());
@@ -550,6 +809,7 @@ void Switch::doAnythingWaitingForPeer(void *tPtr,const SharedPtr<Peer> &peer)
 
 	{
 		Mutex::Lock _l(_txQueue_m);
+
 		for(std::list< TXQueueEntry >::iterator txi(_txQueue.begin());txi!=_txQueue.end();) {
 			if (txi->dest == peer->address()) {
 				if (_trySend(tPtr,txi->packet,txi->encrypt)) {
@@ -574,6 +834,7 @@ unsigned long Switch::doTimerTasks(void *tPtr,int64_t now)
 	std::vector<Address> needWhois;
 	{
 		Mutex::Lock _l(_txQueue_m);
+
 		for(std::list< TXQueueEntry >::iterator txi(_txQueue.begin());txi!=_txQueue.end();) {
 			if (_trySend(tPtr,txi->packet,txi->encrypt)) {
 				_txQueue.erase(txi++);

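Taken together, the Switch.cpp changes route user frames through the new AQM layer while leaving ZeroTier protocol traffic on the direct path. A condensed view of the flow introduced by this commit (summary of the code above, not literal code):

// onLocalEthernet():
//     uint8_t qosBucket = ZT_QOS_DEFAULT_BUCKET;   // may be rewritten by the rules engine
//     network->filterOutgoingPacket(..., qosBucket);
//     aqm_enqueue(tPtr, network, outp, true, qosBucket);
//
// aqm_enqueue():
//     bypass straight to send() if !network->QoSEnabled() or a non-frame verb
//     otherwise push onto the bucket's ManagedQueue, activating it if inactive,
//     and if the per-network cap is exceeded drop from the largest queue
//     aqm_dequeue(tPtr);                           // immediately run one DRR/CoDel cycle
//
// aqm_dequeue():
//     for each network: serve at most one CoDel-vetted packet from the NEW
//     list and one from the OLD list per cycle, rotating queues between
//     NEW/OLD/INACTIVE as their DRR byte credit and CoDel verdicts dictate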
+ 94 - 0
node/Switch.hpp

@@ -59,6 +59,14 @@ class Peer;
  */
 class Switch
 {
+	struct ManagedQueue;
+	struct TXQueueEntry;
+
+	typedef struct {
+		TXQueueEntry *p;
+		bool ok_to_drop;
+	} dqr;
+
 public:
 	Switch(const RuntimeEnvironment *renv);
 
@@ -87,6 +95,62 @@ public:
 	 */
 	void onLocalEthernet(void *tPtr,const SharedPtr<Network> &network,const MAC &from,const MAC &to,unsigned int etherType,unsigned int vlanId,const void *data,unsigned int len);
 
+	/**
+	 * Computes the time of the next scheduled packet drop (the CoDel control law)
+	 *
+	 * @param t Time of the previous scheduled drop (or the current time when entering the dropping state)
+	 * @param count Number of drops so far in the current dropping episode
+	 */
+	uint64_t control_law(uint64_t t, int count);
+
+	/**
+	 * Examines the packet at the head of a TX queue and determines, from its
+	 * sojourn time, whether it would be acceptable to drop it (CoDel's dodequeue helper)
+	 *
+	 * @param q The TX queue that is being dequeued from
+	 * @param now Current time
+	 */
+	dqr dodequeue(ManagedQueue *q, uint64_t now);
+
+	/**
+	 * Presents a packet to the AQM scheduler.
+	 *
+	 * @param tPtr Thread pointer to be handed through to any callbacks called as a result of this call
+	 * @param network Network that the packet shall be sent over
+	 * @param packet Packet to be sent
+	 * @param encrypt Encrypt packet payload? (always true except for HELLO)
+	 * @param qosBucket Which bucket the rule-system determined this packet should fall into
+	 */
+	void aqm_enqueue(void *tPtr, const SharedPtr<Network> &network, Packet &packet,bool encrypt,int qosBucket);
+
+	/**
+	 * Performs a single AQM cycle, dequeuing and transmitting eligible packets on all networks
+	 *
+	 * @param tPtr Thread pointer to be handed through to any callbacks called as a result of this call
+	 */
+	void aqm_dequeue(void *tPtr);
+
+	/**
+	 * Calls the dequeue mechanism and adjusts CoDel state variables. Multiple packets
+	 * may be intentionally dropped before one is returned to the AQM scheduler.
+	 *
+	 * @param q The TX queue that is being dequeued from
+	 * @param isNew Whether or not this queue is in the NEW list
+	 * @param now Current time
+	 */
+	Switch::TXQueueEntry * CoDelDequeue(ManagedQueue *q, bool isNew, uint64_t now);
+
+	/**
+	 * Removes QoS Queues and flow state variables for a specific network. These queues are created
+	 * automatically upon the transmission of the first packet from this peer to another peer on the
+	 * given network.
+	 *
+	 * The reason for existence of queues and flow state variables specific to each network is so that
+	 * each network's QoS rules function independently.
+	 *
+	 * @param nwid Network ID
+	 */
+	void removeNetworkQoSControlBlock(uint64_t nwid);
+
 	/**
 	 * Send a packet to a ZeroTier address (destination in packet)
 	 *
@@ -199,6 +263,7 @@ private:
 	};
 	std::list< TXQueueEntry > _txQueue;
 	Mutex _txQueue_m;
+	Mutex _aqm_m;
 
 	// Tracks sending of VERB_RENDEZVOUS to relaying peers
 	struct _LastUniteKey
@@ -220,6 +285,35 @@ private:
 	};
 	Hashtable< _LastUniteKey,uint64_t > _lastUniteAttempt; // key is always sorted in ascending order, for set-like behavior
 	Mutex _lastUniteAttempt_m;
+
+	// Queue with additional CoDel/DRR flow state variables
+	struct ManagedQueue
+	{
+		ManagedQueue(int id) :
+			id(id),
+			byteCredit(ZT_QOS_QUANTUM),
+			byteLength(0),
+			first_above_time(0),
+			count(0),
+			drop_next(0),
+			dropping(false)
+		{}
+		~ManagedQueue()
+		{
+			// free any entries still queued
+			for(std::list<TXQueueEntry *>::iterator e(q.begin());e!=q.end();++e)
+				delete *e;
+		}
+		int id;
+		int byteCredit;
+		int byteLength;
+		uint64_t first_above_time;
+		uint32_t count;
+		uint64_t drop_next;
+		bool dropping;
+		std::list< TXQueueEntry *> q;
+	};
+	// To implement fq_codel we maintain a queue of queues per network
+	struct NetworkQoSControlBlock
+	{
+		NetworkQoSControlBlock() : _currEnqueuedPackets(0) {}
+		~NetworkQoSControlBlock()
+		{
+			for(std::vector<ManagedQueue *>::iterator q(newQueues.begin());q!=newQueues.end();++q) delete *q;
+			for(std::vector<ManagedQueue *>::iterator q(oldQueues.begin());q!=oldQueues.end();++q) delete *q;
+			for(std::vector<ManagedQueue *>::iterator q(inactiveQueues.begin());q!=inactiveQueues.end();++q) delete *q;
+		}
+		int _currEnqueuedPackets;
+		std::vector<ManagedQueue *> newQueues;
+		std::vector<ManagedQueue *> oldQueues;
+		std::vector<ManagedQueue *> inactiveQueues;
+	};
+	std::map<uint64_t,NetworkQoSControlBlock*> _netQueueControlBlock;
 };
 
 } // namespace ZeroTier
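The three vectors in NetworkQoSControlBlock implement fq_codel's queue rotation. The transitions, as implemented by aqm_enqueue()/aqm_dequeue() in Switch.cpp above (summary sketch):

// ManagedQueue lifecycle in this draft:
//
//   INACTIVE --(packet arrives for its bucket)----------> tail of NEW
//             (byteCredit reset to ZT_QOS_QUANTUM on activation)
//   NEW      --(byteCredit exhausted or nothing to emit)-> tail of OLD
//   OLD      --(byteCredit exhausted)--------------------> tail of OLD (DRR round-robin)
//   OLD      --(nothing to emit)-------------------------> tail of INACTIVE
//
// Queues on the NEW list are always served before those on the OLD list,
// which gives short, sparse flows a latency advantage over long-running
// bulk flows: the core fairness property of fq_codel.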