Browse Source

Bunch of little bug fixes in newly refactored code.

Adam Ierymenko 12 years ago
parent
commit
aa59c1de10
8 changed files with 110 additions and 58 deletions
  1. 4 4
      Makefile.linux
  2. 1 1
      node/Constants.hpp
  3. 48 36
      node/Multicaster.hpp
  4. 3 1
      node/Node.cpp
  5. 22 8
      node/PacketDecoder.cpp
  6. 4 0
      node/RuntimeEnvironment.hpp
  7. 28 7
      node/Switch.cpp
  8. 0 1
      node/Topology.cpp

+ 4 - 4
Makefile.linux

@@ -6,12 +6,12 @@ ARCH=$(shell uname -m)
 DEFS=-DZT_ARCH="$(ARCH)" -DZT_OSNAME="linux" -DZT_TRACE
 
 # Uncomment for a release optimized build
-#CFLAGS=-Wall -O3 -fno-unroll-loops -fstack-protector -pthread $(INCLUDES) -DNDEBUG $(DEFS)
-#STRIP=strip --strip-all
+CFLAGS=-Wall -O3 -fno-unroll-loops -fstack-protector -pthread $(INCLUDES) -DNDEBUG $(DEFS)
+STRIP=strip --strip-all
 
 # Uncomment for a debug build
-CFLAGS=-Wall -g -pthread $(INCLUDES) -DZT_TRACE -DZT_LOG_STDOUT $(DEFS)
-STRIP=echo
+#CFLAGS=-Wall -g -pthread $(INCLUDES) -DZT_TRACE -DZT_LOG_STDOUT $(DEFS)
+#STRIP=echo
 
 CXXFLAGS=$(CFLAGS) -fno-rtti
 

+ 1 - 1
node/Constants.hpp

@@ -235,7 +235,7 @@ error_no_ZT_ARCH_defined;
 /**
  * Length of circular ring buffer history of multicast packets
  */
-#define ZT_MULTICAST_DEDUP_HISTORY_LENGTH 4096
+#define ZT_MULTICAST_DEDUP_HISTORY_LENGTH 1024
 
 /**
  * Expiration time in ms for multicast history items

+ 48 - 36
node/Multicaster.hpp

@@ -52,7 +52,7 @@
 #include "Identity.hpp"
 
 // Maximum sample size to pick during choice of multicast propagation peers
-#define ZT_MULTICAST_PICK_MAX_SAMPLE_SIZE 64
+#define ZT_MULTICAST_PICK_MAX_SAMPLE_SIZE 32
 
 namespace ZeroTier {
 
@@ -75,6 +75,7 @@ public:
 		throw()
 	{
 		memset(_multicastHistory,0,sizeof(_multicastHistory));
+		_multicastHistoryPtr = 0;
 	}
 
 	/**
@@ -131,54 +132,67 @@ public:
 	}
 
 	/**
-	 * Check multicast history to see if this is a duplicate, and add/update entry
+	 * Compute the CRC64 code for multicast deduplication
 	 *
-	 * @param from Ultimate sending MAC address
-	 * @param to Destination multicast group
-	 * @param payload Multicast packet payload
-	 * @param len Length of packet
 	 * @param nwid Network ID
-	 * @param now Current time
-	 * @return True if this appears to be a duplicate to within history expiration time
+	 * @param from Sender MAC
+	 * @param to Destination multicast group
+	 * @param etherType Ethernet frame type
+	 * @param payload Multicast frame data
+	 * @param len Length of frame
 	 */
-	inline bool checkAndUpdateMulticastHistory(
+	static inline uint64_t computeMulticastDedupCrc(
+		uint64_t nwid,
 		const MAC &from,
 		const MulticastGroup &to,
+		unsigned int etherType,
 		const void *payload,
-		unsigned int len,
-		const uint64_t nwid,
-		const uint64_t now)
+		unsigned int len)
 		throw()
 	{
-		// Note: CRCs aren't transmitted over the network, so portability and
-		// byte order don't matter. This calculation can be changed. We just
-		// want a unique code.
+		// This CRC is only used locally, so byte order issues and
+		// such don't matter. It can also be changed without protocol
+		// impact.
 		uint64_t crc = Utils::crc64(0,from.data,6);
 		crc = Utils::crc64(crc,to.mac().data,6);
 		crc ^= (uint64_t)to.adi();
+		crc ^= (uint64_t)etherType;
 		crc = Utils::crc64(crc,payload,len);
 		crc ^= nwid; // also include network ID in CRC
+		return crc;
+	}
 
-		// Replace existing entry or pick one to replace with new entry
-		uint64_t earliest = 0xffffffffffffffffULL;
-		unsigned long earliestIdx = 0;
+	/**
+	 * Check multicast history to see if this is a duplicate
+	 *
+	 * @param crc Multicast CRC
+	 * @param now Current time
+	 * @return True if this appears to be a duplicate to within history expiration time
+	 */
+	inline bool checkDuplicate(uint64_t crc,uint64_t now) const
+		throw()
+	{
 		for(unsigned int i=0;i<ZT_MULTICAST_DEDUP_HISTORY_LENGTH;++i) {
-			if (_multicastHistory[i][0] == crc) {
-				uint64_t then = _multicastHistory[i][1];
-				_multicastHistory[i][1] = now;
-				return ((now - then) < ZT_MULTICAST_DEDUP_HISTORY_EXPIRE);
-			} else if (_multicastHistory[i][1] < earliest) {
-				earliest = _multicastHistory[i][1];
-				earliestIdx = i;
-			}
+			if ((_multicastHistory[i][0] == crc)&&((now - _multicastHistory[i][1]) < ZT_MULTICAST_DEDUP_HISTORY_EXPIRE))
+				return true;
 		}
-
-		_multicastHistory[earliestIdx][0] = crc; // replace oldest entry
-		_multicastHistory[earliestIdx][1] = now;
-
 		return false;
 	}
 
+	/**
+	 * Add a multicast CRC to the multicast deduplication history
+	 *
+	 * @param crc Multicast CRC
+	 * @param now Current time
+	 */
+	inline void addToDedupHistory(uint64_t crc,uint64_t now)
+		throw()
+	{
+		unsigned int mhi = ++_multicastHistoryPtr % ZT_MULTICAST_DEDUP_HISTORY_LENGTH;
+		_multicastHistory[mhi][0] = crc;
+		_multicastHistory[mhi][1] = now;
+	}
+
 	/**
 	 * Choose peers to send a propagating multicast to
 	 *
@@ -248,7 +262,7 @@ public:
 					// a fact they've already seen this.
 					if ((channelMemberEntry->first != originalSubmitter)&&(channelMemberEntry->first != upstream)) {
 						P peer = topology.getPeer(channelMemberEntry->first);
-						if (peer) {
+						if ((peer)&&(peer->hasActiveDirectPath(now))) {
 							toConsider[sampleSize++] = peer;
 							if (sampleSize >= ZT_MULTICAST_PICK_MAX_SAMPLE_SIZE)
 								break; // abort if we have enough candidates
@@ -289,10 +303,7 @@ public:
 		// Add a supernode if there's nowhere else to go. Supernodes know of all multicast
 		// LIKEs and so can act to bridge sparse multicast groups.
 		if (!picked) {
-			Address avoid[2];
-			avoid[0] = upstream;
-			avoid[1] = originalSubmitter; // otherwise supernodes will play ping pong
-			P peer = topology.getBestSupernode(avoid,2,true);
+			P peer = topology.getBestSupernode(&originalSubmitter,1,true);
 			if (peer)
 				peers[picked++] = peer;
 		}
@@ -334,8 +345,9 @@ private:
 		SHA256_Final(digest,&sha);
 	}
 
-	// [0] - CRC, [1] - timestamp
+	// ring buffer: [0] - CRC, [1] - timestamp
 	uint64_t _multicastHistory[ZT_MULTICAST_DEDUP_HISTORY_LENGTH][2];
+	volatile unsigned int _multicastHistoryPtr;
 
 	// A multicast channel, essentially a pub/sub channel. It consists of a
 	// network ID and a multicast group within that network.

+ 3 - 1
node/Node.cpp

@@ -399,7 +399,7 @@ Node::ReasonForTermination Node::run()
 			try {
 				unsigned long delay = std::min((unsigned long)ZT_MIN_SERVICE_LOOP_INTERVAL,_r->sw->doTimerTasks());
 				uint64_t start = Utils::now();
-				Thread::sleep(delay);
+				_r->mainLoopWaitCondition.wait(delay);
 				lastDelayDelta = (long)(Utils::now() - start) - (long)delay;
 			} catch (std::exception &exc) {
 				LOG("unexpected exception running Switch doTimerTasks: %s",exc.what());
@@ -426,12 +426,14 @@ void Node::terminate()
 	throw()
 {
 	((_NodeImpl *)_impl)->terminateNow = true;
+	((_NodeImpl *)_impl)->renv.mainLoopWaitCondition.signal();
 }
 
 void Node::updateStatusNow()
 	throw()
 {
 	((_NodeImpl *)_impl)->updateStatusNow = true;
+	((_NodeImpl *)_impl)->renv.mainLoopWaitCondition.signal();
 }
 
 class _VersionStringMaker

+ 22 - 8
node/PacketDecoder.cpp

@@ -291,7 +291,7 @@ bool PacketDecoder::_doOK(const RuntimeEnvironment *_r,const SharedPtr<Peer> &pe
 					_r->topology->addPeer(SharedPtr<Peer>(new Peer(_r->identity,Identity(*this,ZT_PROTO_VERB_WHOIS__OK__IDX_IDENTITY))),&PacketDecoder::_CBaddPeerFromWhois,const_cast<void *>((const void *)_r));
 				break;
 			default:
-				TRACE("%s(%s): OK(%s)",source().toString().c_str(),_remoteAddress.toString().c_str(),Packet::verbString(inReVerb));
+				//TRACE("%s(%s): OK(%s)",source().toString().c_str(),_remoteAddress.toString().c_str(),Packet::verbString(inReVerb));
 				break;
 		}
 	} catch (std::exception &ex) {
@@ -336,13 +336,14 @@ bool PacketDecoder::_doRENDEZVOUS(const RuntimeEnvironment *_r,const SharedPtr<P
 {
 	try {
 		Address with(field(ZT_PROTO_VERB_RENDEZVOUS_IDX_ZTADDRESS,ZT_ADDRESS_LENGTH));
-		if (_r->topology->getPeer(with)) {
+		SharedPtr<Peer> withPeer(_r->topology->getPeer(with));
+		if (withPeer) {
 			unsigned int port = at<uint16_t>(ZT_PROTO_VERB_RENDEZVOUS_IDX_PORT);
 			unsigned int addrlen = (*this)[ZT_PROTO_VERB_RENDEZVOUS_IDX_ADDRLEN];
 			if ((port > 0)&&((addrlen == 4)||(addrlen == 16))) {
 				InetAddress atAddr(field(ZT_PROTO_VERB_RENDEZVOUS_IDX_ADDRESS,addrlen),addrlen,port);
 				TRACE("RENDEZVOUS from %s says %s might be at %s, starting NAT-t",source().toString().c_str(),with.toString().c_str(),atAddr.toString().c_str());
-				_r->sw->contact(peer,atAddr);
+				_r->sw->contact(withPeer,atAddr);
 			} else {
 				TRACE("dropped corrupt RENDEZVOUS from %s(%s) (bad address or port)",source().toString().c_str(),_remoteAddress.toString().c_str());
 			}
@@ -398,7 +399,7 @@ bool PacketDecoder::_doMULTICAST_LIKE(const RuntimeEnvironment *_r,const SharedP
 				if (network->isAllowed(source())) {
 					MAC mac(field(ptr,6)); ptr += 6;
 					uint32_t adi = at<uint32_t>(ptr); ptr += 4;
-					TRACE("peer %s likes multicast group %s:%.8lx on network %llu",source().toString().c_str(),mac.toString().c_str(),(unsigned long)adi,nwid);
+					//TRACE("peer %s likes multicast group %s:%.8lx on network %llu",source().toString().c_str(),mac.toString().c_str(),(unsigned long)adi,nwid);
 					_r->multicaster->likesMulticastGroup(nwid,MulticastGroup(mac,adi),source(),now);
 					++numAccepted;
 				} else {
@@ -441,7 +442,9 @@ bool PacketDecoder::_doMULTICAST_FRAME(const RuntimeEnvironment *_r,const Shared
 					unsigned int signaturelen = at<uint16_t>(ZT_PROTO_VERB_MULTICAST_FRAME_IDX_SIGNATURE_LENGTH);
 					unsigned char *dataAndSignature = field(ZT_PROTO_VERB_MULTICAST_FRAME_IDX_PAYLOAD,datalen + signaturelen);
 
-					bool isDuplicate = _r->multicaster->checkAndUpdateMulticastHistory(fromMac,mg,dataAndSignature,datalen,network->id(),Utils::now());
+					uint64_t mccrc = Multicaster::computeMulticastDedupCrc(network->id(),fromMac,mg,etherType,dataAndSignature,datalen);
+					uint64_t now = Utils::now();
+					bool isDuplicate = _r->multicaster->checkDuplicate(mccrc,now);
 
 					if (originalSubmitterAddress == _r->identity.address()) {
 						// Technically should not happen, since the original submitter is
@@ -458,10 +461,14 @@ bool PacketDecoder::_doMULTICAST_FRAME(const RuntimeEnvironment *_r,const Shared
 							_step = DECODE_STEP_WAITING_FOR_ORIGINAL_SUBMITTER_LOOKUP;
 							return false;
 						} else if (Multicaster::verifyMulticastPacket(originalSubmitter->identity(),network->id(),fromMac,mg,etherType,dataAndSignature,datalen,dataAndSignature + datalen,signaturelen)) {
+							_r->multicaster->addToDedupHistory(mccrc,now);
+
 							if (!isDuplicate)
 								network->tap().put(fromMac,mg.mac(),etherType,dataAndSignature,datalen);
 
 							if (++hops < ZT_MULTICAST_PROPAGATION_DEPTH) {
+								Address upstream(source()); // save this since we mangle it
+
 								Multicaster::MulticastBloomFilter bloom(field(ZT_PROTO_VERB_MULTICAST_FRAME_IDX_BLOOM_FILTER,ZT_PROTO_VERB_MULTICAST_FRAME_BLOOM_FILTER_SIZE_BYTES));
 								SharedPtr<Peer> propPeers[ZT_MULTICAST_PROPAGATION_BREADTH];
 								unsigned int np = _r->multicaster->pickNextPropagationPeers(
@@ -469,11 +476,16 @@ bool PacketDecoder::_doMULTICAST_FRAME(const RuntimeEnvironment *_r,const Shared
 									network->id(),
 									mg,
 									originalSubmitterAddress,
-									source(),
+									upstream,
 									bloom,
 									ZT_MULTICAST_PROPAGATION_BREADTH,
 									propPeers,
-									Utils::now());
+									now);
+
+								// In a bit of a hack, we re-use this packet to repeat it
+								// to our multicast propagation recipients. Afterwords we
+								// return true just to be sure this is the end of this
+								// packet's life cycle, since it is now mangled.
 
 								setSource(_r->identity.address());
 								(*this)[ZT_PROTO_VERB_MULTICAST_FRAME_IDX_HOP_COUNT] = hops;
@@ -481,13 +493,15 @@ bool PacketDecoder::_doMULTICAST_FRAME(const RuntimeEnvironment *_r,const Shared
 								compress();
 
 								for(unsigned int i=0;i<np;++i) {
-									TRACE("propagating multicast from original node %s via %s toward %s",originalSubmitterAddress.toString().c_str(),source().toString().c_str(),propPeers[i]->address().toString().c_str());
+									TRACE("propagating multicast from original node %s: %s -> %s",originalSubmitterAddress.toString().c_str(),upstream.toString().c_str(),propPeers[i]->address().toString().c_str());
 									// Re-use this packet to re-send multicast frame to everyone
 									// downstream from us.
 									newInitializationVector();
 									setDestination(propPeers[i]->address());
 									_r->sw->send(*this,true);
 								}
+
+								return true;
 							} else {
 								TRACE("terminating MULTICAST_FRAME propagation from %s(%s): max depth reached",source().toString().c_str(),_remoteAddress.toString().c_str());
 							}

+ 4 - 0
node/RuntimeEnvironment.hpp

@@ -30,6 +30,7 @@
 
 #include <string>
 #include "Identity.hpp"
+#include "Condition.hpp"
 
 namespace ZeroTier {
 
@@ -73,6 +74,9 @@ public:
 	std::string ownershipVerificationSecret;
 	std::string ownershipVerificationSecretHash; // base64 of SHA-256 X16 rounds
 
+	// signal() to prematurely interrupt main loop wait
+	Condition mainLoopWaitCondition;
+
 	Identity configAuthority;
 	Identity identity;
 

+ 28 - 7
node/Switch.cpp

@@ -161,6 +161,11 @@ void Switch::onLocalEthernet(const SharedPtr<Network> &network,const MAC &from,c
 
 void Switch::send(const Packet &packet,bool encrypt)
 {
+	if (packet.destination() == _r->identity.address()) {
+		TRACE("BUG: caught attempt to send() to self, ignored");
+		return;
+	}
+
 	//TRACE("%.16llx %s -> %s (size: %u) (enc: %s)",packet.packetId(),Packet::verbString(packet.verb()),packet.destination().toString().c_str(),packet.size(),(encrypt ? "yes" : "no"));
 	if (!_trySend(packet,encrypt)) {
 		Mutex::Lock _l(_txQueue_m);
@@ -195,6 +200,9 @@ bool Switch::sendHELLO(const SharedPtr<Peer> &dest,Demarc::Port localPort,const
 
 bool Switch::unite(const Address &p1,const Address &p2,bool force)
 {
+	if ((p1 == _r->identity.address())||(p2 == _r->identity.address()))
+		return false;
+
 	SharedPtr<Peer> p1p = _r->topology->getPeer(p1);
 	if (!p1p)
 		return false;
@@ -266,10 +274,15 @@ void Switch::contact(const SharedPtr<Peer> &peer,const InetAddress &atAddr)
 {
 	Demarc::Port fromPort = _r->demarc->pick(atAddr);
 	_r->demarc->send(fromPort,atAddr,"\0",1,ZT_FIREWALL_OPENER_HOPS);
-	Mutex::Lock _l(_contactQueue_m);
-	_contactQueue.push_back(ContactQueueEntry(peer,Utils::now() + ZT_RENDEZVOUS_NAT_T_DELAY,fromPort,atAddr));
-	// TODO: there needs to be a mechanism to interrupt Node's waiting to
-	// make sure the fire happens at the right time, but it's not critical.
+
+	{
+		Mutex::Lock _l(_contactQueue_m);
+		_contactQueue.push_back(ContactQueueEntry(peer,Utils::now() + ZT_RENDEZVOUS_NAT_T_DELAY,fromPort,atAddr));
+	}
+
+	// Kick main loop out of wait so that it can pick up this
+	// change to our scheduled timer tasks.
+	_r->mainLoopWaitCondition.signal();
 }
 
 unsigned long Switch::doTimerTasks()
@@ -424,8 +437,8 @@ void Switch::doAnythingWaitingForPeer(const SharedPtr<Peer> &peer)
 void Switch::_handleRemotePacketFragment(Demarc::Port localPort,const InetAddress &fromAddr,const Buffer<4096> &data)
 {
 	Packet::Fragment fragment(data);
-
 	Address destination(fragment.destination());
+
 	if (destination != _r->identity.address()) {
 		// Fragment is not for us, so try to relay it
 		if (fragment.hops() < ZT_RELAY_MAX_HOPS) {
@@ -493,6 +506,8 @@ void Switch::_handleRemotePacketFragment(Demarc::Port localPort,const InetAddres
 void Switch::_handleRemotePacketHead(Demarc::Port localPort,const InetAddress &fromAddr,const Buffer<4096> &data)
 {
 	SharedPtr<PacketDecoder> packet(new PacketDecoder(data,localPort,fromAddr));
+
+	Address source(packet->source());
 	Address destination(packet->destination());
 
 	if (destination != _r->identity.address()) {
@@ -502,9 +517,15 @@ void Switch::_handleRemotePacketHead(Demarc::Port localPort,const InetAddress &f
 
 			SharedPtr<Peer> relayTo = _r->topology->getPeer(destination);
 			if ((relayTo)&&(relayTo->send(_r,packet->data(),packet->size(),true,Packet::VERB_NOP,Utils::now()))) {
-				unite(packet->source(),destination,false); // periodically try to get them to talk directly
+				unite(source,destination,false); // periodically try to get them to talk directly
 			} else {
-				relayTo = _r->topology->getBestSupernode();
+				// Relay via a supernode if there's no direct path, but pass
+				// source to getBestSupernode() to avoid just in case this is
+				// being passed from another supernode so that we don't just
+				// pass it back to where it came from. This can happen if a
+				// supernode for some reason lacks a direct path to a peer that
+				// it wants to talk to, such as because of Internet weather.
+				relayTo = _r->topology->getBestSupernode(&source,1,true);
 				if (relayTo)
 					relayTo->send(_r,packet->data(),packet->size(),true,Packet::VERB_NOP,Utils::now());
 			}

+ 0 - 1
node/Topology.cpp

@@ -115,7 +115,6 @@ void Topology::addPeer(const SharedPtr<Peer> &candidate,void (*callback)(void *,
 SharedPtr<Peer> Topology::getPeer(const Address &zta)
 {
 	if (zta == _r->identity.address()) {
-		abort();
 		TRACE("BUG: ignored attempt to getPeer() for self, returned NULL");
 		return SharedPtr<Peer>();
 	}