Bladeren bron

Deferred decode for HELLO to prevent HELLOcalypse under high load of new peers.

Adam Ierymenko 9 jaren geleden
bovenliggende
commit
8ef4edebbf
12 gewijzigde bestanden met toevoegingen van 464 en 74 verwijderingen
  1. 21 0
      include/ZeroTierOne.h
  2. 106 0
      node/BinarySemaphore.hpp
  3. 95 0
      node/DeferredPackets.cpp
  4. 98 0
      node/DeferredPackets.hpp
  5. 63 50
      node/IncomingPacket.cpp
  6. 12 5
      node/IncomingPacket.hpp
  7. 37 1
      node/Node.cpp
  8. 1 0
      node/Node.hpp
  9. 7 0
      node/RuntimeEnvironment.hpp
  10. 19 14
      node/SharedPtr.hpp
  11. 4 4
      node/Switch.cpp
  12. 1 0
      objects.mk

+ 21 - 0
include/ZeroTierOne.h

@@ -1511,6 +1511,27 @@ void ZT_Node_clusterHandleIncomingMessage(ZT_Node *node,const void *msg,unsigned
  */
 void ZT_Node_clusterStatus(ZT_Node *node,ZT_ClusterStatus *cs);
 
+/**
+ * Do things in the background until Node dies
+ *
+ * This function can be called from one or more background threads to process
+ * certain tasks in the background to improve foreground performance. It will
+ * not return until the Node is shut down. If threading is not enabled in
+ * this build it will return immediately and will do nothing.
+ *
+ * This is completely optional. If this is never called, all processing is
+ * done in the foreground in the various processXXXX() methods.
+ *
+ * This does NOT replace or eliminate the need to call the normal
+ * processBackgroundTasks() function in your main loop. This mechanism is
+ * used to offload the processing of expensive mssages onto background
+ * handler threads to prevent foreground performance degradation under
+ * high load.
+ *
+ * @param node Node instance
+ */
+void ZT_Node_backgroundThreadMain(ZT_Node *node);
+
 /**
  * Get ZeroTier One version
  *

+ 106 - 0
node/BinarySemaphore.hpp

@@ -0,0 +1,106 @@
+/*
+ * ZeroTier One - Network Virtualization Everywhere
+ * Copyright (C) 2011-2015  ZeroTier, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program.  If not, see <http://www.gnu.org/licenses/>.
+ *
+ * --
+ *
+ * ZeroTier may be used and distributed under the terms of the GPLv3, which
+ * are available at: http://www.gnu.org/licenses/gpl-3.0.html
+ *
+ * If you would like to embed ZeroTier into a commercial application or
+ * redistribute it in a modified binary form, please contact ZeroTier Networks
+ * LLC. Start here: http://www.zerotier.com/
+ */
+
+#ifndef ZT_BINARYSEMAPHORE_HPP
+#define ZT_BINARYSEMAPHORE_HPP
+
+#include <stdio.h>
+#include <stdint.h>
+#include <stdlib.h>
+
+#include "Constants.hpp"
+#include "NonCopyable.hpp"
+
+#ifdef __WINDOWS__
+
+#include <Windows.h>
+
+namespace ZeroTier {
+
+class BinarySemaphore : NonCopyable
+{
+public:
+	BinarySemaphore() throw() { _sem = CreateSemaphore(NULL,0,1,NULL); }
+	~BinarySemaphore() { CloseHandle(_sem); }
+	inline void wait() { WaitForSingleObject(_sem,INFINITE); }
+	inline void post() { ReleaseSemaphore(_sem,1,NULL); }
+private:
+	HANDLE _sem;
+};
+
+} // namespace ZeroTier
+
+#else // !__WINDOWS__
+
+#include <pthread.h>
+
+namespace ZeroTier {
+
+class BinarySemaphore : NonCopyable
+{
+public:
+	BinarySemaphore()
+	{
+		pthread_mutex_init(&_mh,(const pthread_mutexattr_t *)0);
+		pthread_cond_init(&_cond,(const pthread_condattr_t *)0);
+		_f = false;
+	}
+
+	~BinarySemaphore()
+	{
+		pthread_cond_destroy(&_cond);
+		pthread_mutex_destroy(&_mh);
+	}
+
+	inline void wait()
+	{
+		pthread_mutex_lock(const_cast <pthread_mutex_t *>(&_mh));
+		while (!_f)
+			pthread_cond_wait(const_cast <pthread_cond_t *>(&_cond),const_cast <pthread_mutex_t *>(&_mh));
+		_f = false;
+		pthread_mutex_unlock(const_cast <pthread_mutex_t *>(&_mh));
+	}
+
+	inline void post()
+	{
+		pthread_mutex_lock(const_cast <pthread_mutex_t *>(&_mh));
+		_f = true;
+		pthread_mutex_unlock(const_cast <pthread_mutex_t *>(&_mh));
+		pthread_cond_signal(const_cast <pthread_cond_t *>(&_cond));
+	}
+
+private:
+	pthread_cond_t _cond;
+	pthread_mutex_t _mh;
+	volatile bool _f;
+};
+
+} // namespace ZeroTier
+
+#endif // !__WINDOWS__
+
+#endif

+ 95 - 0
node/DeferredPackets.cpp

@@ -0,0 +1,95 @@
+/*
+ * ZeroTier One - Network Virtualization Everywhere
+ * Copyright (C) 2011-2015  ZeroTier, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program.  If not, see <http://www.gnu.org/licenses/>.
+ *
+ * --
+ *
+ * ZeroTier may be used and distributed under the terms of the GPLv3, which
+ * are available at: http://www.gnu.org/licenses/gpl-3.0.html
+ *
+ * If you would like to embed ZeroTier into a commercial application or
+ * redistribute it in a modified binary form, please contact ZeroTier Networks
+ * LLC. Start here: http://www.zerotier.com/
+ */
+
+#include "Constants.hpp"
+#include "DeferredPackets.hpp"
+#include "IncomingPacket.hpp"
+#include "RuntimeEnvironment.hpp"
+#include "Node.hpp"
+
+namespace ZeroTier {
+
+DeferredPackets::DeferredPackets(const RuntimeEnvironment *renv) :
+	RR(renv),
+	_readPtr(0),
+	_writePtr(0),
+	_die(false)
+{
+}
+
+DeferredPackets::~DeferredPackets()
+{
+	_q_m.lock();
+	_die = true;
+	_q_m.unlock();
+	_q_s.post();
+}
+
+bool DeferredPackets::enqueue(IncomingPacket *pkt)
+{
+	_q_m.lock();
+	const unsigned long p = _writePtr % ZT_DEFFEREDPACKETS_MAX;
+	if (_q[p]) {
+		_q_m.unlock();
+		return false;
+	} else {
+		_q[p].setToUnsafe(pkt);
+		++_writePtr;
+		_q_m.unlock();
+		_q_s.post();
+		return true;
+	}
+}
+
+int DeferredPackets::process()
+{
+	SharedPtr<IncomingPacket> pkt;
+
+	_q_m.lock();
+	if (_die) {
+		_q_m.unlock();
+		_q_s.post();
+		return -1;
+	}
+	while (_readPtr == _writePtr) {
+		_q_m.unlock();
+		_q_s.wait();
+		_q_m.lock();
+		if (_die) {
+			_q_m.unlock();
+			_q_s.post();
+			return -1;
+		}
+	}
+	pkt.swap(_q[_readPtr++ % ZT_DEFFEREDPACKETS_MAX]);
+	_q_m.unlock();
+
+	pkt->tryDecode(RR,true);
+	return 1;
+}
+
+} // namespace ZeroTier

+ 98 - 0
node/DeferredPackets.hpp

@@ -0,0 +1,98 @@
+/*
+ * ZeroTier One - Network Virtualization Everywhere
+ * Copyright (C) 2011-2015  ZeroTier, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program.  If not, see <http://www.gnu.org/licenses/>.
+ *
+ * --
+ *
+ * ZeroTier may be used and distributed under the terms of the GPLv3, which
+ * are available at: http://www.gnu.org/licenses/gpl-3.0.html
+ *
+ * If you would like to embed ZeroTier into a commercial application or
+ * redistribute it in a modified binary form, please contact ZeroTier Networks
+ * LLC. Start here: http://www.zerotier.com/
+ */
+
+#ifndef ZT_DEFERREDPACKETS_HPP
+#define ZT_DEFERREDPACKETS_HPP
+
+#include "Constants.hpp"
+#include "SharedPtr.hpp"
+#include "Mutex.hpp"
+#include "DeferredPackets.hpp"
+#include "BinarySemaphore.hpp"
+
+/**
+ * Maximum number of deferred packets
+ */
+#define ZT_DEFFEREDPACKETS_MAX 1024
+
+namespace ZeroTier {
+
+class IncomingPacket;
+class RuntimeEnvironment;
+
+/**
+ * Deferred packets
+ *
+ * IncomingPacket can defer its decoding this way by enqueueing itself here.
+ * When this is done, deferredDecode() is called later. This is done for
+ * operations that may be expensive to allow them to potentially be handled
+ * in the background or rate limited to maintain quality of service for more
+ * routine operations.
+ */
+class DeferredPackets
+{
+public:
+	DeferredPackets(const RuntimeEnvironment *renv);
+	~DeferredPackets();
+
+	/**
+	 * Enqueue a packet
+	 *
+	 * Since packets enqueue themselves, they call it with 'this' and we wrap
+	 * them in a SharedPtr<>. This is safe as SharedPtr<> is introspective and
+	 * supports this. This should not be called from any other code outside
+	 * IncomingPacket.
+	 *
+	 * @param pkt Packet to process later (possibly in the background)
+	 * @return False if queue is full
+	 */
+	bool enqueue(IncomingPacket *pkt);
+
+	/**
+	 * Wait for and then process a deferred packet
+	 *
+	 * If we are shutting down (in destructor), this returns -1 and should
+	 * not be called again. Otherwise it returns the number of packets
+	 * processed.
+	 *
+	 * @return Number processed or -1 if shutting down
+	 */
+	int process();
+
+private:
+	SharedPtr<IncomingPacket> _q[ZT_DEFFEREDPACKETS_MAX];
+	const RuntimeEnvironment *const RR;
+	unsigned long _readPtr;
+	unsigned long _writePtr;
+	bool _die;
+	Mutex _q_m;
+	BinarySemaphore _q_s;
+};
+
+} // namespace ZeroTier
+
+#endif

+ 63 - 50
node/IncomingPacket.cpp

@@ -46,21 +46,31 @@
 #include "Cluster.hpp"
 #include "Node.hpp"
 #include "AntiRecursion.hpp"
+#include "DeferredPackets.hpp"
 
 namespace ZeroTier {
 
-bool IncomingPacket::tryDecode(const RuntimeEnvironment *RR)
+bool IncomingPacket::tryDecode(const RuntimeEnvironment *RR,bool deferred)
 {
 	const Address sourceAddress(source());
 	try {
 		if ((cipher() == ZT_PROTO_CIPHER_SUITE__C25519_POLY1305_NONE)&&(verb() == Packet::VERB_HELLO)) {
-			// Unencrypted HELLOs are handled here since they are used to
-			// populate our identity cache in the first place. _doHELLO() is special
-			// in that it contains its own authentication logic.
-			return _doHELLO(RR);
+			// Unencrypted HELLOs require some potentially expensive verification, so
+			// do this in the background if background processing is enabled.
+			DeferredPackets *const dp = RR->dp; // read volatile pointer
+			if ((dp)&&(!deferred)) {
+				dp->enqueue(this);
+				return true; // 'handled' via deferring to background thread(s)
+			} else {
+				// A null pointer for peer to _doHELLO() tells it to run its own
+				// special internal authentication logic. This is done for unencrypted
+				// HELLOs to learn new identities, etc.
+				SharedPtr<Peer> tmp;
+				return _doHELLO(RR,tmp);
+			}
 		}
 
-		SharedPtr<Peer> peer = RR->topology->getPeer(sourceAddress);
+		SharedPtr<Peer> peer(RR->topology->getPeer(sourceAddress));
 		if (peer) {
 			if (!dearmor(peer->key())) {
 				TRACE("dropped packet from %s(%s), MAC authentication failed (size: %u)",peer->address().toString().c_str(),_remoteAddress.toString().c_str(),size());
@@ -79,7 +89,8 @@ bool IncomingPacket::tryDecode(const RuntimeEnvironment *RR)
 				default: // ignore unknown verbs, but if they pass auth check they are "received"
 					peer->received(RR,_localAddress,_remoteAddress,hops(),packetId(),v,0,Packet::VERB_NOP);
 					return true;
-				case Packet::VERB_HELLO:                          return _doHELLO(RR);
+
+				case Packet::VERB_HELLO:                          return _doHELLO(RR,peer);
 				case Packet::VERB_ERROR:                          return _doERROR(RR,peer);
 				case Packet::VERB_OK:                             return _doOK(RR,peer);
 				case Packet::VERB_WHOIS:                          return _doWHOIS(RR,peer);
@@ -185,7 +196,7 @@ bool IncomingPacket::_doERROR(const RuntimeEnvironment *RR,const SharedPtr<Peer>
 	return true;
 }
 
-bool IncomingPacket::_doHELLO(const RuntimeEnvironment *RR)
+bool IncomingPacket::_doHELLO(const RuntimeEnvironment *RR,SharedPtr<Peer> &peer)
 {
 	/* Note: this is the only packet ever sent in the clear, and it's also
 	 * the only packet that we authenticate via a different path. Authentication
@@ -226,63 +237,65 @@ bool IncomingPacket::_doHELLO(const RuntimeEnvironment *RR)
 			return true;
 		}
 
-		SharedPtr<Peer> peer(RR->topology->getPeer(id.address()));
-		if (peer) {
-			// We already have an identity with this address -- check for collisions
-
-			if (peer->identity() != id) {
-				// Identity is different from the one we already have -- address collision
-
-				unsigned char key[ZT_PEER_SECRET_KEY_LENGTH];
-				if (RR->identity.agree(id,key,ZT_PEER_SECRET_KEY_LENGTH)) {
-					if (dearmor(key)) { // ensure packet is authentic, otherwise drop
-						TRACE("rejected HELLO from %s(%s): address already claimed",id.address().toString().c_str(),_remoteAddress.toString().c_str());
-						Packet outp(id.address(),RR->identity.address(),Packet::VERB_ERROR);
-						outp.append((unsigned char)Packet::VERB_HELLO);
-						outp.append((uint64_t)pid);
-						outp.append((unsigned char)Packet::ERROR_IDENTITY_COLLISION);
-						outp.armor(key,true);
-						RR->node->putPacket(_localAddress,_remoteAddress,outp.data(),outp.size());
+		if (!peer) {
+			peer = RR->topology->getPeer(id.address());
+			if (peer) {
+				// We already have an identity with this address -- check for collisions
+
+				if (peer->identity() != id) {
+					// Identity is different from the one we already have -- address collision
+
+					unsigned char key[ZT_PEER_SECRET_KEY_LENGTH];
+					if (RR->identity.agree(id,key,ZT_PEER_SECRET_KEY_LENGTH)) {
+						if (dearmor(key)) { // ensure packet is authentic, otherwise drop
+							TRACE("rejected HELLO from %s(%s): address already claimed",id.address().toString().c_str(),_remoteAddress.toString().c_str());
+							Packet outp(id.address(),RR->identity.address(),Packet::VERB_ERROR);
+							outp.append((unsigned char)Packet::VERB_HELLO);
+							outp.append((uint64_t)pid);
+							outp.append((unsigned char)Packet::ERROR_IDENTITY_COLLISION);
+							outp.armor(key,true);
+							RR->node->putPacket(_localAddress,_remoteAddress,outp.data(),outp.size());
+						} else {
+							TRACE("rejected HELLO from %s(%s): packet failed authentication",id.address().toString().c_str(),_remoteAddress.toString().c_str());
+						}
 					} else {
-						TRACE("rejected HELLO from %s(%s): packet failed authentication",id.address().toString().c_str(),_remoteAddress.toString().c_str());
+						TRACE("rejected HELLO from %s(%s): key agreement failed",id.address().toString().c_str(),_remoteAddress.toString().c_str());
 					}
+
+					return true;
 				} else {
-					TRACE("rejected HELLO from %s(%s): key agreement failed",id.address().toString().c_str(),_remoteAddress.toString().c_str());
-				}
+					// Identity is the same as the one we already have -- check packet integrity
 
-				return true;
+					if (!dearmor(peer->key())) {
+						TRACE("rejected HELLO from %s(%s): packet failed authentication",id.address().toString().c_str(),_remoteAddress.toString().c_str());
+						return true;
+					}
+
+					// Continue at // VALID
+				}
 			} else {
-				// Identity is the same as the one we already have -- check packet integrity
+				// We don't already have an identity with this address -- validate and learn it
 
-				if (!dearmor(peer->key())) {
+				// Check identity proof of work
+				if (!id.locallyValidate()) {
+					TRACE("dropped HELLO from %s(%s): identity invalid",id.address().toString().c_str(),_remoteAddress.toString().c_str());
+					return true;
+				}
+
+				// Check packet integrity and authentication
+				SharedPtr<Peer> newPeer(new Peer(RR->identity,id));
+				if (!dearmor(newPeer->key())) {
 					TRACE("rejected HELLO from %s(%s): packet failed authentication",id.address().toString().c_str(),_remoteAddress.toString().c_str());
 					return true;
 				}
+				peer = RR->topology->addPeer(newPeer);
 
 				// Continue at // VALID
 			}
-		} else {
-			// We don't already have an identity with this address -- validate and learn it
-
-			// Check identity proof of work
-			if (!id.locallyValidate()) {
-				TRACE("dropped HELLO from %s(%s): identity invalid",id.address().toString().c_str(),_remoteAddress.toString().c_str());
-				return true;
-			}
 
-			// Check packet integrity and authentication
-			SharedPtr<Peer> newPeer(new Peer(RR->identity,id));
-			if (!dearmor(newPeer->key())) {
-				TRACE("rejected HELLO from %s(%s): packet failed authentication",id.address().toString().c_str(),_remoteAddress.toString().c_str());
-				return true;
-			}
-			peer = RR->topology->addPeer(newPeer);
-
-			// Continue at // VALID
+			// VALID -- if we made it here, packet passed identity and authenticity checks!
 		}
 
-		// VALID -- if we made it here, packet passed identity and authenticity checks!
-
 		if (externalSurfaceAddress)
 			RR->sa->iam(id.address(),_remoteAddress,externalSurfaceAddress,RR->topology->isRoot(id),RR->node->now());
 

+ 12 - 5
node/IncomingPacket.hpp

@@ -93,14 +93,21 @@ public:
 	 * about whether the packet was valid. A rejection is 'complete.'
 	 *
 	 * Once true is returned, this must not be called again. The packet's state
-	 * may no longer be valid.
+	 * may no longer be valid. The only exception is deferred decoding. In this
+	 * case true is returned to indicate to the normal decode path that it is
+	 * finished with the packet. The packet will have added itself to the
+	 * deferred queue and will expect tryDecode() to be called one more time
+	 * with deferred set to true.
+	 *
+	 * Deferred decoding is performed by DeferredPackets.cpp and should not be
+	 * done elsewhere. Under deferred decoding packets only get one shot and
+	 * so the return value of tryDecode() is ignored.
 	 *
 	 * @param RR Runtime environment
+	 * @param deferred If true, this is a deferred decode and the return is ignored
 	 * @return True if decoding and processing is complete, false if caller should try again
-	 * @throws std::out_of_range Range error processing packet (should be discarded)
-	 * @throws std::runtime_error Other error processing packet (should be discarded)
 	 */
-	bool tryDecode(const RuntimeEnvironment *RR);
+	bool tryDecode(const RuntimeEnvironment *RR,bool deferred);
 
 	/**
 	 * @return Time of packet receipt / start of decode
@@ -132,7 +139,7 @@ private:
 	// These are called internally to handle packet contents once it has
 	// been authenticated, decrypted, decompressed, and classified.
 	bool _doERROR(const RuntimeEnvironment *RR,const SharedPtr<Peer> &peer);
-	bool _doHELLO(const RuntimeEnvironment *RR);
+	bool _doHELLO(const RuntimeEnvironment *RR,SharedPtr<Peer> &peer); // can be called with NULL peer, while all others cannot
 	bool _doOK(const RuntimeEnvironment *RR,const SharedPtr<Peer> &peer);
 	bool _doWHOIS(const RuntimeEnvironment *RR,const SharedPtr<Peer> &peer);
 	bool _doRENDEZVOUS(const RuntimeEnvironment *RR,const SharedPtr<Peer> &peer);

+ 37 - 1
node/Node.cpp

@@ -47,6 +47,7 @@
 #include "Identity.hpp"
 #include "SelfAwareness.hpp"
 #include "Cluster.hpp"
+#include "DeferredPackets.hpp"
 
 const struct sockaddr_storage ZT_SOCKADDR_NULL = {0};
 
@@ -130,7 +131,14 @@ Node::Node(
 Node::~Node()
 {
 	Mutex::Lock _l(_networks_m);
-	_networks.clear(); // ensure that networks are destroyed before shutdown
+	Mutex::Lock _l2(RR->dpSetLock);
+
+	_networks.clear(); // ensure that networks are destroyed before shutdow
+
+	DeferredPackets *dp = RR->dp;
+	RR->dp = (DeferredPackets *)0;
+	delete dp;
+
 	delete RR->sa;
 	delete RR->topology;
 	delete RR->antiRec;
@@ -637,6 +645,27 @@ void Node::clusterStatus(ZT_ClusterStatus *cs)
 	memset(cs,0,sizeof(ZT_ClusterStatus));
 }
 
+void Node::backgroundThreadMain()
+{
+	RR->dpSetLock.lock();
+	if (!RR->dp) {
+		try {
+			RR->dp = new DeferredPackets(RR);
+		} catch ( ... ) { // sanity check -- could only really happen if out of memory
+			RR->dpSetLock.unlock();
+			return;
+		}
+	}
+	RR->dpSetLock.unlock();
+
+	for(;;) {
+		try {
+			if (RR->dp->process() < 0)
+				break;
+		} catch ( ... ) {} // sanity check -- should not throw
+	}
+}
+
 /****************************************************************************/
 /* Node methods used only within node/                                      */
 /****************************************************************************/
@@ -978,6 +1007,13 @@ void ZT_Node_clusterStatus(ZT_Node *node,ZT_ClusterStatus *cs)
 	} catch ( ... ) {}
 }
 
+void ZT_Node_backgroundThreadMain(ZT_Node *node)
+{
+	try {
+		reinterpret_cast<ZeroTier::Node *>(node)->backgroundThreadMain();
+	} catch ( ... ) {}
+}
+
 void ZT_version(int *major,int *minor,int *revision,unsigned long *featureFlags)
 {
 	if (major) *major = ZEROTIER_ONE_VERSION_MAJOR;

+ 1 - 0
node/Node.hpp

@@ -125,6 +125,7 @@ public:
 	void clusterRemoveMember(unsigned int memberId);
 	void clusterHandleIncomingMessage(const void *msg,unsigned int len);
 	void clusterStatus(ZT_ClusterStatus *cs);
+	void backgroundThreadMain();
 
 	// Internal functions ------------------------------------------------------
 

+ 7 - 0
node/RuntimeEnvironment.hpp

@@ -32,6 +32,7 @@
 
 #include "Constants.hpp"
 #include "Identity.hpp"
+#include "Mutex.hpp"
 
 namespace ZeroTier {
 
@@ -44,6 +45,7 @@ class AntiRecursion;
 class NetworkController;
 class SelfAwareness;
 class Cluster;
+class DeferredPackets;
 
 /**
  * Holds global state for an instance of ZeroTier::Node
@@ -55,6 +57,7 @@ public:
 		node(n)
 		,identity()
 		,localNetworkController((NetworkController *)0)
+		,dp((DeferredPackets *)0)
 		,sw((Switch *)0)
 		,mc((Multicaster *)0)
 		,antiRec((AntiRecursion *)0)
@@ -77,6 +80,10 @@ public:
 	// This is set externally to an instance of this base class
 	NetworkController *localNetworkController;
 
+	// This is created if background threads call Node::backgroundThreadMain().
+	DeferredPackets *volatile dp; // can be read without lock but not written
+	Mutex dpSetLock;
+
 	/*
 	 * Order matters a bit here. These are constructed in this order
 	 * and then deleted in the opposite order on Node exit. The order ensures

+ 19 - 14
node/SharedPtr.hpp

@@ -64,20 +64,6 @@ public:
 		++obj->__refCount;
 	}
 
-	SharedPtr(T *obj,bool runAwayFromZombies)
-		throw() :
-		_ptr(obj)
-	{
-		// HACK: this is used in "handlers" to take ownership of naked pointers,
-		// an ugly pattern that really ought to be factored out.
-		if (runAwayFromZombies) {
-			if ((int)(++obj->__refCount) < 2) {
-				--obj->__refCount;
-				_ptr = (T *)0;
-			}
-		} else ++obj->__refCount;
-	}
-
 	SharedPtr(const SharedPtr &sp)
 		throw() :
 		_ptr(sp._getAndInc())
@@ -105,6 +91,25 @@ public:
 		return *this;
 	}
 
+	/**
+	 * Set to a naked pointer and increment its reference count
+	 *
+	 * This assumes this SharedPtr is NULL and that ptr is not a 'zombie.' No
+	 * checks are performed.
+	 *
+	 * @param ptr Naked pointer to assign
+	 */
+	inline void setToUnsafe(T *ptr)
+	{
+		++ptr->__refCount;
+		_ptr = ptr;
+	}
+
+	/**
+	 * Swap with another pointer 'for free' without ref count overhead
+	 *
+	 * @param with Pointer to swap with
+	 */
 	inline void swap(SharedPtr &with)
 		throw()
 	{

+ 4 - 4
node/Switch.cpp

@@ -475,7 +475,7 @@ void Switch::doAnythingWaitingForPeer(const SharedPtr<Peer> &peer)
 	{	// finish processing any packets waiting on peer's public key / identity
 		Mutex::Lock _l(_rxQueue_m);
 		for(std::list< SharedPtr<IncomingPacket> >::iterator rxi(_rxQueue.begin());rxi!=_rxQueue.end();) {
-			if ((*rxi)->tryDecode(RR))
+			if ((*rxi)->tryDecode(RR,false))
 				_rxQueue.erase(rxi++);
 			else ++rxi;
 		}
@@ -672,7 +672,7 @@ void Switch::_handleRemotePacketFragment(const InetAddress &localAddr,const Inet
 						packet->append(dq.frags[f - 1].payload(),dq.frags[f - 1].payloadLength());
 					_defragQueue.erase(pid); // dq no longer valid after this
 
-					if (!packet->tryDecode(RR)) {
+					if (!packet->tryDecode(RR,false)) {
 						Mutex::Lock _l(_rxQueue_m);
 						_rxQueue.push_back(packet);
 					}
@@ -746,7 +746,7 @@ void Switch::_handleRemotePacketHead(const InetAddress &localAddr,const InetAddr
 					packet->append(dq.frags[f - 1].payload(),dq.frags[f - 1].payloadLength());
 				_defragQueue.erase(pid); // dq no longer valid after this
 
-				if (!packet->tryDecode(RR)) {
+				if (!packet->tryDecode(RR,false)) {
 					Mutex::Lock _l(_rxQueue_m);
 					_rxQueue.push_back(packet);
 				}
@@ -757,7 +757,7 @@ void Switch::_handleRemotePacketHead(const InetAddress &localAddr,const InetAddr
 		} // else this is a duplicate head, ignore
 	} else {
 		// Packet is unfragmented, so just process it
-		if (!packet->tryDecode(RR)) {
+		if (!packet->tryDecode(RR,false)) {
 			Mutex::Lock _l(_rxQueue_m);
 			_rxQueue.push_back(packet);
 		}

+ 1 - 0
objects.mk

@@ -5,6 +5,7 @@ OBJS=\
 	node/C25519.o \
 	node/CertificateOfMembership.o \
 	node/Cluster.o \
+	node/DeferredPackets.o \
 	node/Dictionary.o \
 	node/Identity.o \
 	node/IncomingPacket.o \