소스 검색

Change how cluster relays packets -- just PROXY_UNITE and then send packet via normal ZeroTier front plane -- more efficient and eliminates fragmentation issues.

Adam Ierymenko 9 년 전
부모
커밋
f692cec763
5개의 변경된 파일133개의 추가작업 그리고 131개의 파일을 삭제
  1. 1 1
      include/ZeroTierOne.h
  2. 96 99
      node/Cluster.cpp
  3. 14 8
      node/Cluster.hpp
  4. 20 17
      node/Switch.cpp
  5. 2 6
      node/Switch.hpp

+ 1 - 1
include/ZeroTierOne.h

@@ -141,7 +141,7 @@ extern "C" {
 /**
  * Maximum allowed cluster message length in bytes
  */
-#define ZT_CLUSTER_MAX_MESSAGE_LENGTH (1444 * 4)
+#define ZT_CLUSTER_MAX_MESSAGE_LENGTH (1500 - 48)
 
 /**
  * A null/empty sockaddr (all zero) to signify an unspecified socket address

+ 96 - 99
node/Cluster.cpp

@@ -250,102 +250,87 @@ void Cluster::handleIncomingStateMessage(const void *msg,unsigned int len)
 							}
 						}	break;
 
-						case STATE_MESSAGE_RELAY: {
+						case STATE_MESSAGE_PROXY_UNITE: {
+							const Address localPeerAddress(dmsg.field(ptr,ZT_ADDRESS_LENGTH),ZT_ADDRESS_LENGTH); ptr += ZT_ADDRESS_LENGTH;
+							const Address remotePeerAddress(dmsg.field(ptr,ZT_ADDRESS_LENGTH),ZT_ADDRESS_LENGTH); ptr += ZT_ADDRESS_LENGTH;
 							const unsigned int numRemotePeerPaths = dmsg[ptr++];
 							InetAddress remotePeerPaths[256]; // size is 8-bit, so 256 is max
 							for(unsigned int i=0;i<numRemotePeerPaths;++i)
 								ptr += remotePeerPaths[i].deserialize(dmsg,ptr);
-							const unsigned int packetLen = dmsg.at<uint16_t>(ptr); ptr += 2;
-							const void *packet = (const void *)dmsg.field(ptr,packetLen); ptr += packetLen;
-
-							if (packetLen >= ZT_PROTO_MIN_FRAGMENT_LENGTH) { // ignore anything too short to contain a dest address
-								const Address destinationAddress(reinterpret_cast<const char *>(packet) + 8,ZT_ADDRESS_LENGTH);
-								TRACE("[%u] relay %u bytes to %s (%u remote paths included)",(unsigned int)fromMemberId,packetLen,destinationAddress.toString().c_str(),numRemotePeerPaths);
-
-								SharedPtr<Peer> destinationPeer(RR->topology->getPeer(destinationAddress));
-								if (destinationPeer) {
-									if (
-									    (destinationPeer->send(RR,packet,packetLen,RR->node->now()))&&
-									    (numRemotePeerPaths > 0)&&
-									    (packetLen >= 18)&&
-									    (reinterpret_cast<const unsigned char *>(packet)[ZT_PACKET_FRAGMENT_IDX_FRAGMENT_INDICATOR] == ZT_PACKET_FRAGMENT_INDICATOR)
-									   ) {
-										// If remote peer paths were sent with this relayed packet, we do
-										// RENDEZVOUS. It's handled here for cluster-relayed packets since
-										// we don't have both Peer records so this is a different path.
-
-										const Address remotePeerAddress(reinterpret_cast<const char *>(packet) + 13,ZT_ADDRESS_LENGTH);
-
-										InetAddress bestDestV4,bestDestV6;
-										destinationPeer->getBestActiveAddresses(RR->node->now(),bestDestV4,bestDestV6);
-										InetAddress bestRemoteV4,bestRemoteV6;
-										for(unsigned int i=0;i<numRemotePeerPaths;++i) {
-											if ((bestRemoteV4)&&(bestRemoteV6))
-												break;
-											switch(remotePeerPaths[i].ss_family) {
-												case AF_INET:
-													if (!bestRemoteV4)
-														bestRemoteV4 = remotePeerPaths[i];
-													break;
-												case AF_INET6:
-													if (!bestRemoteV6)
-														bestRemoteV6 = remotePeerPaths[i];
-													break;
-											}
-										}
-
-										Packet rendezvousForDest(destinationAddress,RR->identity.address(),Packet::VERB_RENDEZVOUS);
-										rendezvousForDest.append((uint8_t)0);
-										remotePeerAddress.appendTo(rendezvousForDest);
-
-										Buffer<2048> rendezvousForOtherEnd;
-										remotePeerAddress.appendTo(rendezvousForOtherEnd);
-										rendezvousForOtherEnd.append((uint8_t)Packet::VERB_RENDEZVOUS);
-										const unsigned int rendezvousForOtherEndPayloadSizePtr = rendezvousForOtherEnd.size();
-										rendezvousForOtherEnd.addSize(2); // space for actual packet payload length
-										rendezvousForOtherEnd.append((uint8_t)0); // flags == 0
-										destinationAddress.appendTo(rendezvousForOtherEnd);
-
-										bool haveMatch = false;
-										if ((bestDestV6)&&(bestRemoteV6)) {
-											haveMatch = true;
-
-											rendezvousForDest.append((uint16_t)bestRemoteV6.port());
-											rendezvousForDest.append((uint8_t)16);
-											rendezvousForDest.append(bestRemoteV6.rawIpData(),16);
-
-											rendezvousForOtherEnd.append((uint16_t)bestDestV6.port());
-											rendezvousForOtherEnd.append((uint8_t)16);
-											rendezvousForOtherEnd.append(bestDestV6.rawIpData(),16);
-											rendezvousForOtherEnd.setAt<uint16_t>(rendezvousForOtherEndPayloadSizePtr,(uint16_t)(9 + 16));
-										} else if ((bestDestV4)&&(bestRemoteV4)) {
-											haveMatch = true;
-
-											rendezvousForDest.append((uint16_t)bestRemoteV4.port());
-											rendezvousForDest.append((uint8_t)4);
-											rendezvousForDest.append(bestRemoteV4.rawIpData(),4);
-
-											rendezvousForOtherEnd.append((uint16_t)bestDestV4.port());
-											rendezvousForOtherEnd.append((uint8_t)4);
-											rendezvousForOtherEnd.append(bestDestV4.rawIpData(),4);
-											rendezvousForOtherEnd.setAt<uint16_t>(rendezvousForOtherEndPayloadSizePtr,(uint16_t)(9 + 4));
-										}
 
-										if (haveMatch) {
-											_send(fromMemberId,STATE_MESSAGE_PROXY_SEND,rendezvousForOtherEnd.data(),rendezvousForOtherEnd.size());
-											RR->sw->send(rendezvousForDest,true,0);
-										}
+							TRACE("[%u] requested proxy unite between local peer %s and remote peer %s",(unsigned int)fromMemberId,localPeerAddress.toString().c_str(),remotePeerAddress.toString().c_str());
+
+							SharedPtr<Peer> localPeer(RR->topology->getPeer(localPeerAddress));
+							if ((localPeer)&&(numRemotePeerPaths > 0)) {
+								InetAddress bestLocalV4,bestLocalV6;
+								localPeer->getBestActiveAddresses(RR->node->now(),bestLocalV4,bestLocalV6);
+
+								InetAddress bestRemoteV4,bestRemoteV6;
+								for(unsigned int i=0;i<numRemotePeerPaths;++i) {
+									if ((bestRemoteV4)&&(bestRemoteV6))
+										break;
+									switch(remotePeerPaths[i].ss_family) {
+										case AF_INET:
+											if (!bestRemoteV4)
+												bestRemoteV4 = remotePeerPaths[i];
+											break;
+										case AF_INET6:
+											if (!bestRemoteV6)
+												bestRemoteV6 = remotePeerPaths[i];
+											break;
 									}
 								}
+
+								Packet rendezvousForLocal(localPeerAddress,RR->identity.address(),Packet::VERB_RENDEZVOUS);
+								rendezvousForLocal.append((uint8_t)0);
+								remotePeerAddress.appendTo(rendezvousForLocal);
+
+								Buffer<2048> rendezvousForRemote;
+								remotePeerAddress.appendTo(rendezvousForRemote);
+								rendezvousForRemote.append((uint8_t)Packet::VERB_RENDEZVOUS);
+								const unsigned int rendezvousForOtherEndPayloadSizePtr = rendezvousForRemote.size();
+								rendezvousForRemote.addSize(2); // space for actual packet payload length
+								rendezvousForRemote.append((uint8_t)0); // flags == 0
+								localPeerAddress.appendTo(rendezvousForRemote);
+
+								bool haveMatch = false;
+								if ((bestLocalV6)&&(bestRemoteV6)) {
+									haveMatch = true;
+
+									rendezvousForLocal.append((uint16_t)bestRemoteV6.port());
+									rendezvousForLocal.append((uint8_t)16);
+									rendezvousForLocal.append(bestRemoteV6.rawIpData(),16);
+
+									rendezvousForRemote.append((uint16_t)bestLocalV6.port());
+									rendezvousForRemote.append((uint8_t)16);
+									rendezvousForRemote.append(bestLocalV6.rawIpData(),16);
+									rendezvousForRemote.setAt<uint16_t>(rendezvousForOtherEndPayloadSizePtr,(uint16_t)(9 + 16));
+								} else if ((bestLocalV4)&&(bestRemoteV4)) {
+									haveMatch = true;
+
+									rendezvousForLocal.append((uint16_t)bestRemoteV4.port());
+									rendezvousForLocal.append((uint8_t)4);
+									rendezvousForLocal.append(bestRemoteV4.rawIpData(),4);
+
+									rendezvousForRemote.append((uint16_t)bestLocalV4.port());
+									rendezvousForRemote.append((uint8_t)4);
+									rendezvousForRemote.append(bestLocalV4.rawIpData(),4);
+									rendezvousForRemote.setAt<uint16_t>(rendezvousForOtherEndPayloadSizePtr,(uint16_t)(9 + 4));
+								}
+
+								if (haveMatch) {
+									_send(fromMemberId,STATE_MESSAGE_PROXY_SEND,rendezvousForRemote.data(),rendezvousForRemote.size());
+									RR->sw->send(rendezvousForLocal,true,0);
+								}
 							}
 						}	break;
 
 						case STATE_MESSAGE_PROXY_SEND: {
-							const Address rcpt(dmsg.field(ptr,ZT_ADDRESS_LENGTH),ZT_ADDRESS_LENGTH);
+							const Address rcpt(dmsg.field(ptr,ZT_ADDRESS_LENGTH),ZT_ADDRESS_LENGTH); ptr += ZT_ADDRESS_LENGTH;
 							const Packet::Verb verb = (Packet::Verb)dmsg[ptr++];
 							const unsigned int len = dmsg.at<uint16_t>(ptr); ptr += 2;
 							Packet outp(rcpt,RR->identity.address(),verb);
-							outp.append(dmsg.field(ptr,len),len);
+							outp.append(dmsg.field(ptr,len),len); ptr += len;
 							RR->sw->send(outp,true,0);
 							TRACE("[%u] proxy send %s to %s length %u",(unsigned int)fromMemberId,Packet::verbString(verb),rcpt.toString().c_str(),len);
 						}	break;
@@ -364,13 +349,13 @@ void Cluster::handleIncomingStateMessage(const void *msg,unsigned int len)
 	}
 }
 
-bool Cluster::sendViaCluster(const Address &fromPeerAddress,const Address &toPeerAddress,const void *data,unsigned int len)
+bool Cluster::sendViaCluster(const Address &fromPeerAddress,const Address &toPeerAddress,const void *data,unsigned int len,bool unite)
 {
 	if (len > 16384) // sanity check
 		return false;
 
 	uint64_t mostRecentTimestamp = 0;
-	uint16_t canHasPeer = 0;
+	unsigned int canHasPeer = 0;
 
 	{	// Anyone got this peer?
 		Mutex::Lock _l2(_peerAffinities_m);
@@ -387,25 +372,37 @@ bool Cluster::sendViaCluster(const Address &fromPeerAddress,const Address &toPee
 
 	const uint64_t now = RR->node->now();
 	if ((now - mostRecentTimestamp) < ZT_PEER_ACTIVITY_TIMEOUT) {
-		Buffer<16384> buf;
-
-		InetAddress v4,v6;
-		if (fromPeerAddress) {
-			SharedPtr<Peer> fromPeer(RR->topology->getPeer(fromPeerAddress));
-			if (fromPeer)
-				fromPeer->getBestActiveAddresses(now,v4,v6);
+		Buffer<2048> buf;
+
+		if (unite) {
+			InetAddress v4,v6;
+			if (fromPeerAddress) {
+				SharedPtr<Peer> fromPeer(RR->topology->getPeer(fromPeerAddress));
+				if (fromPeer)
+					fromPeer->getBestActiveAddresses(now,v4,v6);
+			}
+			uint8_t addrCount = 0;
+			if (v4)
+				++addrCount;
+			if (v6)
+				++addrCount;
+			if (addrCount) {
+				toPeerAddress.appendTo(buf);
+				fromPeerAddress.appendTo(buf);
+				buf.append(addrCount);
+				if (v4)
+					v4.serialize(buf);
+				if (v6)
+					v6.serialize(buf);
+			}
 		}
-		buf.append((uint8_t)( (v4) ? ((v6) ? 2 : 1) : ((v6) ? 1 : 0) ));
-		if (v4)
-			v4.serialize(buf);
-		if (v6)
-			v6.serialize(buf);
-		buf.append((uint16_t)len);
-		buf.append(data,len);
 
 		{
 			Mutex::Lock _l2(_members[canHasPeer].lock);
-			_send(canHasPeer,STATE_MESSAGE_RELAY,buf.data(),buf.size());
+			if (buf.size() > 0)
+				_send(canHasPeer,STATE_MESSAGE_PROXY_UNITE,buf.data(),buf.size());
+			if (_members[canHasPeer].zeroTierPhysicalEndpoints.size() > 0)
+				RR->node->putPacket(InetAddress(),_members[canHasPeer].zeroTierPhysicalEndpoints.front(),data,len);
 		}
 
 		TRACE("sendViaCluster(): relaying %u bytes from %s to %s by way of %u",len,fromPeerAddress.toString().c_str(),toPeerAddress.toString().c_str(),(unsigned int)canHasPeer);

+ 14 - 8
node/Cluster.hpp

@@ -57,7 +57,7 @@
 /**
  * Desired period between doPeriodicTasks() in milliseconds
  */
-#define ZT_CLUSTER_PERIODIC_TASK_PERIOD 50
+#define ZT_CLUSTER_PERIODIC_TASK_PERIOD 100
 
 namespace ZeroTier {
 
@@ -136,13 +136,18 @@ public:
 		STATE_MESSAGE_COM = 4,
 
 		/**
-		 * Relay a packet to a peer:
-		 *   <[1] 8-bit number of sending peer active path addresses>
-		 *   <[...] series of serialized InetAddresses of sending peer's paths>
-		 *   <[2] 16-bit packet length>
-		 *   <[...] packet or packet fragment>
+		 * Request that VERB_RENDEZVOUS be sent to a peer that we have:
+		 *   <[5] ZeroTier address of peer on recipient's side>
+		 *   <[5] ZeroTier address of peer on sender's side>
+		 *   <[1] 8-bit number of sender's peer's active path addresses>
+		 *   <[...] series of serialized InetAddresses of sender's peer's paths>
+		 *
+		 * This requests that we perform NAT-t introduction between a peer that
+		 * we have and one on the sender's side. The sender furnishes contact
+		 * info for its peer, and we send VERB_RENDEZVOUS to both sides: to ours
+		 * directly and with PROXY_SEND to theirs.
 		 */
-		STATE_MESSAGE_RELAY = 5,
+		STATE_MESSAGE_PROXY_UNITE = 5,
 
 		/**
 		 * Request that a cluster member send a packet to a locally-known peer:
@@ -211,9 +216,10 @@ public:
 	 * @param toPeerAddress Destination peer address
 	 * @param data Packet or packet fragment data
 	 * @param len Length of packet or fragment
+	 * @param unite If true, also request proxy unite across cluster
 	 * @return True if this data was sent via another cluster member, false if none have this peer
 	 */
-	bool sendViaCluster(const Address &fromPeerAddress,const Address &toPeerAddress,const void *data,unsigned int len);
+	bool sendViaCluster(const Address &fromPeerAddress,const Address &toPeerAddress,const void *data,unsigned int len,bool unite);
 
 	/**
 	 * Advertise to the cluster that we have this peer

+ 20 - 17
node/Switch.cpp

@@ -303,11 +303,10 @@ void Switch::send(const Packet &packet,bool encrypt,uint64_t nwid)
 	}
 }
 
-bool Switch::unite(const Address &p1,const Address &p2,bool force)
+bool Switch::unite(const Address &p1,const Address &p2)
 {
 	if ((p1 == RR->identity.address())||(p2 == RR->identity.address()))
 		return false;
-
 	SharedPtr<Peer> p1p = RR->topology->getPeer(p1);
 	if (!p1p)
 		return false;
@@ -317,14 +316,6 @@ bool Switch::unite(const Address &p1,const Address &p2,bool force)
 
 	const uint64_t now = RR->node->now();
 
-	{
-		Mutex::Lock _l(_lastUniteAttempt_m);
-		uint64_t &luts = _lastUniteAttempt[_LastUniteKey(p1,p2)];
-		if (((now - luts) < ZT_MIN_UNITE_INTERVAL)&&(!force))
-			return false;
-		luts = now;
-	}
-
 	std::pair<InetAddress,InetAddress> cg(Peer::findCommonGround(*p1p,*p2p,now));
 	if ((!(cg.first))||(cg.first.ipScope() != cg.second.ipScope()))
 		return false;
@@ -571,7 +562,7 @@ void Switch::_handleRemotePacketFragment(const InetAddress &localAddr,const Inet
 			SharedPtr<Peer> relayTo = RR->topology->getPeer(destination);
 			if ((!relayTo)||(!relayTo->send(RR,fragment.data(),fragment.size(),RR->node->now()))) {
 #ifdef ZT_ENABLE_CLUSTER
-				if ((RR->cluster)&&(RR->cluster->sendViaCluster(Address(),destination,fragment.data(),fragment.size())))
+				if ((RR->cluster)&&(RR->cluster->sendViaCluster(Address(),destination,fragment.data(),fragment.size(),false)))
 					return; // sent by way of another member of this cluster
 #endif
 
@@ -634,7 +625,8 @@ void Switch::_handleRemotePacketFragment(const InetAddress &localAddr,const Inet
 
 void Switch::_handleRemotePacketHead(const InetAddress &localAddr,const InetAddress &fromAddr,const void *data,unsigned int len)
 {
-	SharedPtr<IncomingPacket> packet(new IncomingPacket(data,len,localAddr,fromAddr,RR->node->now()));
+	const uint64_t now = RR->node->now();
+	SharedPtr<IncomingPacket> packet(new IncomingPacket(data,len,localAddr,fromAddr,now));
 
 	Address source(packet->source());
 	Address destination(packet->destination());
@@ -652,17 +644,18 @@ void Switch::_handleRemotePacketHead(const InetAddress &localAddr,const InetAddr
 			packet->incrementHops();
 
 			SharedPtr<Peer> relayTo = RR->topology->getPeer(destination);
-			if ((relayTo)&&((relayTo->send(RR,packet->data(),packet->size(),RR->node->now())))) {
-				unite(source,destination,false);
+			if ((relayTo)&&((relayTo->send(RR,packet->data(),packet->size(),now)))) {
+				if (_shouldTryUnite(now,source,destination))
+					unite(source,destination);
 			} else {
 #ifdef ZT_ENABLE_CLUSTER
-				if ((RR->cluster)&&(RR->cluster->sendViaCluster(source,destination,packet->data(),packet->size())))
+				if ((RR->cluster)&&(RR->cluster->sendViaCluster(source,destination,packet->data(),packet->size(),_shouldTryUnite(now,source,destination))))
 					return; // sent by way of another member of this cluster
 #endif
 
 				relayTo = RR->topology->getBestRoot(&source,1,true);
 				if (relayTo)
-					relayTo->send(RR,packet->data(),packet->size(),RR->node->now());
+					relayTo->send(RR,packet->data(),packet->size(),now);
 			}
 		} else {
 			TRACE("dropped relay %s(%s) -> %s, max hops exceeded",packet->source().toString().c_str(),fromAddr.toString().c_str(),destination.toString().c_str());
@@ -677,7 +670,7 @@ void Switch::_handleRemotePacketHead(const InetAddress &localAddr,const InetAddr
 		if (!dq.creationTime) {
 			// If we have no other fragments yet, create an entry and save the head
 
-			dq.creationTime = RR->node->now();
+			dq.creationTime = now;
 			dq.frag0 = packet;
 			dq.totalFragments = 0; // 0 == unknown, waiting for Packet::Fragment
 			dq.haveFragments = 1; // head is first bit (left to right)
@@ -805,4 +798,14 @@ bool Switch::_trySend(const Packet &packet,bool encrypt,uint64_t nwid)
 	return false;
 }
 
+bool Switch::_shouldTryUnite(const uint64_t now,const Address &p1,const Address &p2)
+{
+	Mutex::Lock _l(_lastUniteAttempt_m);
+	uint64_t &luts = _lastUniteAttempt[_LastUniteKey(p1,p2)];
+	if ((now - luts) < ZT_MIN_UNITE_INTERVAL)
+		return false;
+	luts = now;
+	return true;
+}
+
 } // namespace ZeroTier

+ 2 - 6
node/Switch.hpp

@@ -127,15 +127,10 @@ public:
 	 * This only works if both peers are known, with known working direct
 	 * links to this peer. The best link for each peer is sent to the other.
 	 *
-	 * A rate limiter is in effect via the _lastUniteAttempt map. If force
-	 * is true, a unite attempt is made even if one has been made less than
-	 * ZT_MIN_UNITE_INTERVAL milliseconds ago.
-	 *
 	 * @param p1 One of two peers (order doesn't matter)
 	 * @param p2 Second of pair
-	 * @param force If true, send now regardless of interval
 	 */
-	bool unite(const Address &p1,const Address &p2,bool force);
+	bool unite(const Address &p1,const Address &p2);
 
 	/**
 	 * Attempt NAT traversal to peer at a given physical address
@@ -185,6 +180,7 @@ private:
 	void _handleRemotePacketHead(const InetAddress &localAddr,const InetAddress &fromAddr,const void *data,unsigned int len);
 	Address _sendWhoisRequest(const Address &addr,const Address *peersAlreadyConsulted,unsigned int numPeersAlreadyConsulted);
 	bool _trySend(const Packet &packet,bool encrypt,uint64_t nwid);
+	bool _shouldTryUnite(const uint64_t now,const Address &p1,const Address &p2);
 
 	const RuntimeEnvironment *const RR;
 	uint64_t _lastBeaconResponse;