Pārlūkot izejas kodu

More cleanup and removal of DeferredPackets, will do the latter in a more elegant way.

Adam Ierymenko 9 gadi atpakaļ
vecāks
revīzija
98152d974a

+ 0 - 21
include/ZeroTierOne.h

@@ -1902,27 +1902,6 @@ void ZT_Node_clusterStatus(ZT_Node *node,ZT_ClusterStatus *cs);
  */
 void ZT_Node_setTrustedPaths(ZT_Node *node,const struct sockaddr_storage *networks,const uint64_t *ids,unsigned int count);
 
-/**
- * Do things in the background until Node dies
- *
- * This function can be called from one or more background threads to process
- * certain tasks in the background to improve foreground performance. It will
- * not return until the Node is shut down. If threading is not enabled in
- * this build it will return immediately and will do nothing.
- *
- * This is completely optional. If this is never called, all processing is
- * done in the foreground in the various processXXXX() methods.
- *
- * This does NOT replace or eliminate the need to call the normal
- * processBackgroundTasks() function in your main loop. This mechanism is
- * used to offload the processing of expensive mssages onto background
- * handler threads to prevent foreground performance degradation under
- * high load.
- *
- * @param node Node instance
- */
-void ZT_Node_backgroundThreadMain(ZT_Node *node);
-
 /**
  * Get ZeroTier One version
  *

+ 4 - 0
node/Capability.cpp

@@ -28,9 +28,11 @@ namespace ZeroTier {
 int Capability::verify(const RuntimeEnvironment *RR) const
 {
 	try {
+		// There must be at least one entry, and sanity check for bad chain max length
 		if ((_maxCustodyChainLength < 1)||(_maxCustodyChainLength > ZT_MAX_CAPABILITY_CUSTODY_CHAIN_LENGTH))
 			return -1;
 
+		// Validate all entries in chain of custody
 		Buffer<(sizeof(Capability) * 2)> tmp;
 		this->serialize(tmp,true);
 		for(unsigned int c=0;c<_maxCustodyChainLength;++c) {
@@ -53,6 +55,8 @@ int Capability::verify(const RuntimeEnvironment *RR) const
 				return 1;
 			}
 		}
+
+		// We reached max custody chain length and everything was valid
 		return 0;
 	} catch ( ... ) {}
 	return -1;

+ 29 - 33
node/Capability.hpp

@@ -114,6 +114,23 @@ public:
 	 */
 	inline uint64_t expiration() const { return _expiration; }
 
+	/**
+	 * Check to see if a given address is a 'to' address in the custody chain
+	 *
+	 * This does not actually do certificate checking. That must be done with verify().
+	 *
+	 * @param a Address to check
+	 * @return True if address is present
+	 */
+	inline bool wasIssuedTo(const Address &a) const
+	{
+		for(unsigned int i=0;i<ZT_MAX_CAPABILITY_CUSTODY_CHAIN_LENGTH;++i) {
+			if (_custody[i].to == a)
+				return true;
+		}
+		return false;
+	}
+
 	/**
 	 * Sign this capability and add signature to its chain of custody
 	 *
@@ -132,10 +149,10 @@ public:
 		try {
 			for(unsigned int i=0;((i<_maxCustodyChainLength)&&(i<ZT_MAX_CAPABILITY_CUSTODY_CHAIN_LENGTH));++i) {
 				if (!(_custody[i].to)) {
-					_custody[i].to = to;
-					_custody[i].from = from.address();
 					Buffer<(sizeof(Capability) * 2)> tmp;
 					this->serialize(tmp,true);
+					_custody[i].to = to;
+					_custody[i].from = from.address();
 					_custody[i].signature = from.sign(tmp.data(),tmp.size());
 					return true;
 				}
@@ -255,22 +272,21 @@ public:
 		b.append(_id);
 		b.append(_nwid);
 		b.append(_expiration);
-
 		serializeRules(b,_rules,_ruleCount);
-
 		b.append((uint8_t)_maxCustodyChainLength);
-		for(unsigned int i=0;;++i) {
-			if ((i < _maxCustodyChainLength)&&(i < ZT_MAX_CAPABILITY_CUSTODY_CHAIN_LENGTH)&&(_custody[i].to)) {
-				_custody[i].to.appendTo(b);
-				_custody[i].from.appendTo(b);
-				if (!forSign) {
+
+		if (!forSign) {
+			for(unsigned int i=0;;++i) {
+				if ((i < _maxCustodyChainLength)&&(i < ZT_MAX_CAPABILITY_CUSTODY_CHAIN_LENGTH)&&(_custody[i].to)) {
+					_custody[i].to.appendTo(b);
+					_custody[i].from.appendTo(b);
 					b.append((uint8_t)1); // 1 == Ed25519 signature
 					b.append((uint16_t)ZT_C25519_SIGNATURE_LEN); // length of signature
 					b.append(_custody[i].signature.data,ZT_C25519_SIGNATURE_LEN);
+				} else {
+					b.append((unsigned char)0,ZT_ADDRESS_LENGTH); // zero 'to' terminates chain
+					break;
 				}
-			} else {
-				b.append((unsigned char)0,ZT_ADDRESS_LENGTH); // zero 'to' terminates chain
-				break;
 			}
 		}
 
@@ -369,10 +385,9 @@ public:
 		_id = b.template at<uint32_t>(p); p += 4;
 		_nwid = b.template at<uint64_t>(p); p += 8;
 		_expiration = b.template at<uint64_t>(p); p += 8;
-
 		deserializeRules(b,p,_rules,_ruleCount,ZT_MAX_CAPABILITY_RULES);
-
 		_maxCustodyChainLength = (unsigned int)b[p++];
+
 		if ((_maxCustodyChainLength < 1)||(_maxCustodyChainLength > ZT_MAX_CAPABILITY_CUSTODY_CHAIN_LENGTH))
 			throw std::runtime_error("invalid max custody chain length");
 		for(unsigned int i;;++i) {
@@ -393,25 +408,6 @@ public:
 		return (p - startAt);
 	}
 
-	/**
-	 * Check to see if a given address is a 'to' address in the custody chain
-	 *
-	 * This does not actually do certificate checking. That must be done with verify().
-	 *
-	 * @param a Address to check
-	 * @return True if address is present
-	 */
-	inline bool wasIssuedTo(const Address &a) const
-	{
-		for(unsigned int i=0;i<ZT_MAX_CAPABILITY_CUSTODY_CHAIN_LENGTH;++i) {
-			if (!_custody[i].to)
-				break;
-			else if (_custody[i].to == a)
-				return true;
-		}
-		return false;
-	}
-
 	// Provides natural sort order by ID
 	inline bool operator<(const Capability &c) const { return (_id < c._id); }
 

+ 0 - 100
node/DeferredPackets.cpp

@@ -1,100 +0,0 @@
-/*
- * ZeroTier One - Network Virtualization Everywhere
- * Copyright (C) 2011-2016  ZeroTier, Inc.  https://www.zerotier.com/
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the GNU General Public License as published by
- * the Free Software Foundation, either version 3 of the License, or
- * (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
- * GNU General Public License for more details.
- *
- * You should have received a copy of the GNU General Public License
- * along with this program.  If not, see <http://www.gnu.org/licenses/>.
- */
-
-#include "Constants.hpp"
-#include "DeferredPackets.hpp"
-#include "IncomingPacket.hpp"
-#include "RuntimeEnvironment.hpp"
-#include "Node.hpp"
-
-namespace ZeroTier {
-
-DeferredPackets::DeferredPackets(const RuntimeEnvironment *renv) :
-	RR(renv),
-	_waiting(0),
-	_die(false)
-{
-}
-
-DeferredPackets::~DeferredPackets()
-{
-	_q_m.lock();
-	_die = true;
-	_q_m.unlock();
-
-	for(;;) {
-		_q_s.post();
-
-		_q_m.lock();
-		if (_waiting <= 0) {
-			_q_m.unlock();
-			break;
-		} else {
-			_q_m.unlock();
-		}
-	}
-}
-
-bool DeferredPackets::enqueue(IncomingPacket *pkt)
-{
-	{
-		Mutex::Lock _l(_q_m);
-		if (_q.size() >= ZT_DEFFEREDPACKETS_MAX)
-			return false;
-		_q.push_back(*pkt);
-	}
-	_q_s.post();
-	return true;
-}
-
-int DeferredPackets::process()
-{
-	std::list<IncomingPacket> pkt;
-
-	_q_m.lock();
-
-	if (_die) {
-		_q_m.unlock();
-		return -1;
-	}
-
-	while (_q.empty()) {
-		++_waiting;
-		_q_m.unlock();
-		_q_s.wait();
-		_q_m.lock();
-		--_waiting;
-		if (_die) {
-			_q_m.unlock();
-			return -1;
-		}
-	}
-
-	// Move item from _q list to a dummy list here to avoid copying packet
-	pkt.splice(pkt.end(),_q,_q.begin());
-
-	_q_m.unlock();
-
-	try {
-		pkt.front().tryDecode(RR,true);
-	} catch ( ... ) {} // drop invalids
-
-	return 1;
-}
-
-} // namespace ZeroTier

+ 0 - 85
node/DeferredPackets.hpp

@@ -1,85 +0,0 @@
-/*
- * ZeroTier One - Network Virtualization Everywhere
- * Copyright (C) 2011-2016  ZeroTier, Inc.  https://www.zerotier.com/
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the GNU General Public License as published by
- * the Free Software Foundation, either version 3 of the License, or
- * (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
- * GNU General Public License for more details.
- *
- * You should have received a copy of the GNU General Public License
- * along with this program.  If not, see <http://www.gnu.org/licenses/>.
- */
-
-#ifndef ZT_DEFERREDPACKETS_HPP
-#define ZT_DEFERREDPACKETS_HPP
-
-#include <list>
-
-#include "Constants.hpp"
-#include "SharedPtr.hpp"
-#include "Mutex.hpp"
-#include "DeferredPackets.hpp"
-#include "BinarySemaphore.hpp"
-
-/**
- * Maximum number of deferred packets
- */
-#define ZT_DEFFEREDPACKETS_MAX 256
-
-namespace ZeroTier {
-
-class IncomingPacket;
-class RuntimeEnvironment;
-
-/**
- * Deferred packets
- *
- * IncomingPacket can defer its decoding this way by enqueueing itself here.
- * When this is done, deferredDecode() is called later. This is done for
- * operations that may be expensive to allow them to potentially be handled
- * in the background or rate limited to maintain quality of service for more
- * routine operations.
- */
-class DeferredPackets
-{
-public:
-	DeferredPackets(const RuntimeEnvironment *renv);
-	~DeferredPackets();
-
-	/**
-	 * Enqueue a packet
-	 *
-	 * @param pkt Packet to process later (possibly in the background)
-	 * @return False if queue is full
-	 */
-	bool enqueue(IncomingPacket *pkt);
-
-	/**
-	 * Wait for and then process a deferred packet
-	 *
-	 * If we are shutting down (in destructor), this returns -1 and should
-	 * not be called again. Otherwise it returns the number of packets
-	 * processed.
-	 *
-	 * @return Number processed or -1 if shutting down
-	 */
-	int process();
-
-private:
-	std::list<IncomingPacket> _q;
-	const RuntimeEnvironment *const RR;
- 	volatile int _waiting;
-	volatile bool _die;
-	Mutex _q_m;
-	BinarySemaphore _q_s;
-};
-
-} // namespace ZeroTier
-
-#endif

+ 6 - 14
node/IncomingPacket.cpp

@@ -36,7 +36,6 @@
 #include "World.hpp"
 #include "Cluster.hpp"
 #include "Node.hpp"
-#include "DeferredPackets.hpp"
 #include "Filter.hpp"
 #include "CertificateOfMembership.hpp"
 #include "Capability.hpp"
@@ -44,7 +43,7 @@
 
 namespace ZeroTier {
 
-bool IncomingPacket::tryDecode(const RuntimeEnvironment *RR,bool deferred)
+bool IncomingPacket::tryDecode(const RuntimeEnvironment *RR)
 {
 	const Address sourceAddress(source());
 
@@ -64,18 +63,11 @@ bool IncomingPacket::tryDecode(const RuntimeEnvironment *RR,bool deferred)
 				return true;
 			}
 		} else if ((c == ZT_PROTO_CIPHER_SUITE__C25519_POLY1305_NONE)&&(verb() == Packet::VERB_HELLO)) {
-			// Unencrypted HELLOs require some potentially expensive verification, so
-			// do this in the background if background processing is enabled.
-			if ((RR->dpEnabled > 0)&&(!deferred)) {
-				RR->dp->enqueue(this);
-				return true; // 'handled' via deferring to background thread(s)
-			} else {
-				// A null pointer for peer to _doHELLO() tells it to run its own
-				// special internal authentication logic. This is done for unencrypted
-				// HELLOs to learn new identities, etc.
-				SharedPtr<Peer> tmp;
-				return _doHELLO(RR,tmp);
-			}
+			// A null pointer for peer to _doHELLO() tells it to run its own
+			// special internal authentication logic. This is done for unencrypted
+			// HELLOs to learn new identities, etc.
+			SharedPtr<Peer> tmp;
+			return _doHELLO(RR,tmp);
 		}
 
 		SharedPtr<Peer> peer(RR->topology->getPeer(sourceAddress));

+ 2 - 11
node/IncomingPacket.hpp

@@ -118,21 +118,12 @@ public:
 	 * about whether the packet was valid. A rejection is 'complete.'
 	 *
 	 * Once true is returned, this must not be called again. The packet's state
-	 * may no longer be valid. The only exception is deferred decoding. In this
-	 * case true is returned to indicate to the normal decode path that it is
-	 * finished with the packet. The packet will have added itself to the
-	 * deferred queue and will expect tryDecode() to be called one more time
-	 * with deferred set to true.
-	 *
-	 * Deferred decoding is performed by DeferredPackets.cpp and should not be
-	 * done elsewhere. Under deferred decoding packets only get one shot and
-	 * so the return value of tryDecode() is ignored.
+	 * may no longer be valid.
 	 *
 	 * @param RR Runtime environment
-	 * @param deferred If true, this is a deferred decode and the return is ignored
 	 * @return True if decoding and processing is complete, false if caller should try again
 	 */
-	bool tryDecode(const RuntimeEnvironment *RR,bool deferred);
+	bool tryDecode(const RuntimeEnvironment *RR);
 
 	/**
 	 * @return Time of packet receipt / start of decode

+ 0 - 24
node/Node.cpp

@@ -37,7 +37,6 @@
 #include "Identity.hpp"
 #include "SelfAwareness.hpp"
 #include "Cluster.hpp"
-#include "DeferredPackets.hpp"
 
 const struct sockaddr_storage ZT_SOCKADDR_NULL = {0};
 
@@ -108,9 +107,7 @@ Node::Node(
 		RR->mc = new Multicaster(RR);
 		RR->topology = new Topology(RR);
 		RR->sa = new SelfAwareness(RR);
-		RR->dp = new DeferredPackets(RR);
 	} catch ( ... ) {
-		delete RR->dp;
 		delete RR->sa;
 		delete RR->topology;
 		delete RR->mc;
@@ -127,8 +124,6 @@ Node::~Node()
 
 	_networks.clear(); // ensure that networks are destroyed before shutdow
 
-	RR->dpEnabled = 0;
-	delete RR->dp;
 	delete RR->sa;
 	delete RR->topology;
 	delete RR->mc;
@@ -621,18 +616,6 @@ void Node::clusterStatus(ZT_ClusterStatus *cs)
 	memset(cs,0,sizeof(ZT_ClusterStatus));
 }
 
-void Node::backgroundThreadMain()
-{
-	++RR->dpEnabled;
-	for(;;) {
-		try {
-			if (RR->dp->process() < 0)
-				break;
-		} catch ( ... ) {} // sanity check -- should not throw
-	}
-	--RR->dpEnabled;
-}
-
 /****************************************************************************/
 /* Node methods used only within node/                                      */
 /****************************************************************************/
@@ -1009,13 +992,6 @@ void ZT_Node_setTrustedPaths(ZT_Node *node,const struct sockaddr_storage *networ
 	} catch ( ... ) {}
 }
 
-void ZT_Node_backgroundThreadMain(ZT_Node *node)
-{
-	try {
-		reinterpret_cast<ZeroTier::Node *>(node)->backgroundThreadMain();
-	} catch ( ... ) {}
-}
-
 void ZT_version(int *major,int *minor,int *revision)
 {
 	if (major) *major = ZEROTIER_ONE_VERSION_MAJOR;

+ 0 - 9
node/Node.hpp

@@ -117,18 +117,9 @@ public:
 	void clusterRemoveMember(unsigned int memberId);
 	void clusterHandleIncomingMessage(const void *msg,unsigned int len);
 	void clusterStatus(ZT_ClusterStatus *cs);
-	void backgroundThreadMain();
 
 	// Internal functions ------------------------------------------------------
 
-	/**
-	 * Convenience threadMain() for easy background thread launch
-	 *
-	 * This allows background threads to be launched with Thread::start
-	 * that will run against this node.
-	 */
-	inline void threadMain() throw() { this->backgroundThreadMain(); }
-
 	/**
 	 * @return Time as of last call to run()
 	 */

+ 0 - 9
node/RuntimeEnvironment.hpp

@@ -35,7 +35,6 @@ class Multicaster;
 class NetworkController;
 class SelfAwareness;
 class Cluster;
-class DeferredPackets;
 
 /**
  * Holds global state for an instance of ZeroTier::Node
@@ -51,11 +50,9 @@ public:
 		,mc((Multicaster *)0)
 		,topology((Topology *)0)
 		,sa((SelfAwareness *)0)
-		,dp((DeferredPackets *)0)
 #ifdef ZT_ENABLE_CLUSTER
 		,cluster((Cluster *)0)
 #endif
-		,dpEnabled(0)
 	{
 	}
 
@@ -82,15 +79,9 @@ public:
 	Multicaster *mc;
 	Topology *topology;
 	SelfAwareness *sa;
-	DeferredPackets *dp;
-
 #ifdef ZT_ENABLE_CLUSTER
 	Cluster *cluster;
 #endif
-
-	// This is set to >0 if background threads are waiting on deferred
-	// packets, otherwise 'dp' should not be used.
-	volatile int dpEnabled;
 };
 
 } // namespace ZeroTier

+ 4 - 4
node/Switch.cpp

@@ -165,7 +165,7 @@ void Switch::onRemotePacket(const InetAddress &localAddr,const InetAddress &from
 								for(unsigned int f=1;f<totalFragments;++f)
 									rq->frag0.append(rq->frags[f - 1].payload(),rq->frags[f - 1].payloadLength());
 
-								if (rq->frag0.tryDecode(RR,false)) {
+								if (rq->frag0.tryDecode(RR)) {
 									rq->timestamp = 0; // packet decoded, free entry
 								} else {
 									rq->complete = true; // set complete flag but leave entry since it probably needs WHOIS or something
@@ -264,7 +264,7 @@ void Switch::onRemotePacket(const InetAddress &localAddr,const InetAddress &from
 							for(unsigned int f=1;f<rq->totalFragments;++f)
 								rq->frag0.append(rq->frags[f - 1].payload(),rq->frags[f - 1].payloadLength());
 
-							if (rq->frag0.tryDecode(RR,false)) {
+							if (rq->frag0.tryDecode(RR)) {
 								rq->timestamp = 0; // packet decoded, free entry
 							} else {
 								rq->complete = true; // set complete flag but leave entry since it probably needs WHOIS or something
@@ -277,7 +277,7 @@ void Switch::onRemotePacket(const InetAddress &localAddr,const InetAddress &from
 				} else {
 					// Packet is unfragmented, so just process it
 					IncomingPacket packet(data,len,localAddr,fromAddr,now);
-					if (!packet.tryDecode(RR,false)) {
+					if (!packet.tryDecode(RR)) {
 						Mutex::Lock _l(_rxQueue_m);
 						RXQueueEntry *rq = &(_rxQueue[ZT_RX_QUEUE_SIZE - 1]);
 						unsigned long i = ZT_RX_QUEUE_SIZE - 1;
@@ -705,7 +705,7 @@ void Switch::doAnythingWaitingForPeer(const SharedPtr<Peer> &peer)
 		while (i) {
 			RXQueueEntry *rq = &(_rxQueue[--i]);
 			if ((rq->timestamp)&&(rq->complete)) {
-				if (rq->frag0.tryDecode(RR,false))
+				if (rq->frag0.tryDecode(RR))
 					rq->timestamp = 0;
 			}
 		}

+ 0 - 1
objects.mk

@@ -3,7 +3,6 @@ OBJS=\
 	node/Capability.o \
 	node/CertificateOfMembership.o \
 	node/Cluster.o \
-	node/DeferredPackets.o \
 	node/Filter.o \
 	node/Identity.o \
 	node/IncomingPacket.o \

+ 0 - 4
service/OneService.cpp

@@ -864,10 +864,6 @@ public:
 				}
 			}
 
-			// Start two background threads to handle expensive ops out of line
-			Thread::start(_node);
-			Thread::start(_node);
-
 			_nextBackgroundTaskDeadline = 0;
 			uint64_t clockShouldBe = OSUtils::now();
 			_lastRestart = clockShouldBe;