Prechádzať zdrojové kódy

Cluster fix: was accumulating remote endpoints endlessly.

Adam Ierymenko 9 rokov pred
rodič
commit
964b30902a
1 zmenil súbory, kde vykonal 195 pridanie a 184 odobranie
  1. 195 184
      node/Cluster.cpp

+ 195 - 184
node/Cluster.cpp

@@ -143,212 +143,223 @@ void Cluster::handleIncomingStateMessage(const void *msg,unsigned int len)
 		return;
 	const uint16_t fromMemberId = dmsg.at<uint16_t>(0);
 	unsigned int ptr = 2;
-	if (fromMemberId == _id)
+	if (fromMemberId == _id) // sanity check: we don't talk to ourselves
 		return;
 	const uint16_t toMemberId = dmsg.at<uint16_t>(ptr);
 	ptr += 2;
-	if (toMemberId != _id)
+	if (toMemberId != _id) // sanity check: message not for us?
 		return;
 
-	_Member &m = _members[fromMemberId];
-	Mutex::Lock mlck(m.lock);
-
-	try {
-		while (ptr < dmsg.size()) {
-			const unsigned int mlen = dmsg.at<uint16_t>(ptr); ptr += 2;
-			const unsigned int nextPtr = ptr + mlen;
-
-			int mtype = -1;
-			try {
-				switch((StateMessageType)(mtype = (int)dmsg[ptr++])) {
-					default:
-						break;
-
-					case STATE_MESSAGE_ALIVE: {
-						ptr += 7; // skip version stuff, not used yet
-						m.x = dmsg.at<int32_t>(ptr); ptr += 4;
-						m.y = dmsg.at<int32_t>(ptr); ptr += 4;
-						m.z = dmsg.at<int32_t>(ptr); ptr += 4;
-						ptr += 8; // skip local clock, not used
-						m.load = dmsg.at<uint64_t>(ptr); ptr += 8;
-						ptr += 8; // skip flags, unused
+	{	// make sure sender is actually considered a member
+		Mutex::Lock _l3(_memberIds_m);
+		if (std::find(_memberIds.begin(),_memberIds.end(),fromMemberId) == _memberIds.end())
+			return;
+	}
+
+	{
+		_Member &m = _members[fromMemberId];
+		Mutex::Lock mlck(m.lock);
+
+		try {
+			while (ptr < dmsg.size()) {
+				const unsigned int mlen = dmsg.at<uint16_t>(ptr); ptr += 2;
+				const unsigned int nextPtr = ptr + mlen;
+				if (nextPtr > dmsg.size())
+					break;
+
+				int mtype = -1;
+				try {
+					switch((StateMessageType)(mtype = (int)dmsg[ptr++])) {
+						default:
+							break;
+
+						case STATE_MESSAGE_ALIVE: {
+							ptr += 7; // skip version stuff, not used yet
+							m.x = dmsg.at<int32_t>(ptr); ptr += 4;
+							m.y = dmsg.at<int32_t>(ptr); ptr += 4;
+							m.z = dmsg.at<int32_t>(ptr); ptr += 4;
+							ptr += 8; // skip local clock, not used
+							m.load = dmsg.at<uint64_t>(ptr); ptr += 8;
+							ptr += 8; // skip flags, unused
 #ifdef ZT_TRACE
-						std::string addrs;
+							std::string addrs;
 #endif
-						unsigned int physicalAddressCount = dmsg[ptr++];
-						for(unsigned int i=0;i<physicalAddressCount;++i) {
-							m.zeroTierPhysicalEndpoints.push_back(InetAddress());
-							ptr += m.zeroTierPhysicalEndpoints.back().deserialize(dmsg,ptr);
-							if (!(m.zeroTierPhysicalEndpoints.back())) {
-								m.zeroTierPhysicalEndpoints.pop_back();
-							}
+							unsigned int physicalAddressCount = dmsg[ptr++];
+							m.zeroTierPhysicalEndpoints.clear();
+							for(unsigned int i=0;i<physicalAddressCount;++i) {
+								m.zeroTierPhysicalEndpoints.push_back(InetAddress());
+								ptr += m.zeroTierPhysicalEndpoints.back().deserialize(dmsg,ptr);
+								if (!(m.zeroTierPhysicalEndpoints.back())) {
+									m.zeroTierPhysicalEndpoints.pop_back();
+								}
 #ifdef ZT_TRACE
-							else {
-								if (addrs.length() > 0)
-									addrs.push_back(',');
-								addrs.append(m.zeroTierPhysicalEndpoints.back().toString());
-							}
+								else {
+									if (addrs.length() > 0)
+										addrs.push_back(',');
+									addrs.append(m.zeroTierPhysicalEndpoints.back().toString());
+								}
 #endif
-						}
-						m.lastReceivedAliveAnnouncement = RR->node->now();
+							}
+							m.lastReceivedAliveAnnouncement = RR->node->now();
 #ifdef ZT_TRACE
-						TRACE("[%u] I'm alive! send me peers at %s",(unsigned int)fromMemberId,addrs.c_str());
+							TRACE("[%u] I'm alive! peers may be redirected to: %s",(unsigned int)fromMemberId,addrs.c_str());
 #endif
-					}	break;
-
-					case STATE_MESSAGE_HAVE_PEER: {
-						try {
-							Identity id;
-							ptr += id.deserialize(dmsg,ptr);
-							if (id) {
-								RR->topology->saveIdentity(id);
-
-								{	// Add or update peer affinity entry
-									_PeerAffinity pa(id.address(),fromMemberId,RR->node->now());
-									Mutex::Lock _l2(_peerAffinities_m);
-									std::vector<_PeerAffinity>::iterator i(std::lower_bound(_peerAffinities.begin(),_peerAffinities.end(),pa)); // O(log(n))
-									if ((i != _peerAffinities.end())&&(i->key == pa.key)) {
-										i->timestamp = pa.timestamp;
-									} else {
-										_peerAffinities.push_back(pa);
-										std::sort(_peerAffinities.begin(),_peerAffinities.end()); // probably a more efficient way to insert but okay for now
-									}
-		 						}
+						}	break;
+
+						case STATE_MESSAGE_HAVE_PEER: {
+							try {
+								Identity id;
+								ptr += id.deserialize(dmsg,ptr);
+								if (id) {
+									RR->topology->saveIdentity(id);
+
+									{	// Add or update peer affinity entry
+										_PeerAffinity pa(id.address(),fromMemberId,RR->node->now());
+										Mutex::Lock _l2(_peerAffinities_m);
+										std::vector<_PeerAffinity>::iterator i(std::lower_bound(_peerAffinities.begin(),_peerAffinities.end(),pa)); // O(log(n))
+										if ((i != _peerAffinities.end())&&(i->key == pa.key)) {
+											i->timestamp = pa.timestamp;
+										} else {
+											_peerAffinities.push_back(pa);
+											std::sort(_peerAffinities.begin(),_peerAffinities.end()); // probably a more efficient way to insert but okay for now
+										}
+			 						}
 
-		 						TRACE("[%u] has %s",(unsigned int)fromMemberId,id.address().toString().c_str());
-		 					}
-						} catch ( ... ) {
-							// ignore invalid identities
-						}
-					}	break;
-
-					case STATE_MESSAGE_MULTICAST_LIKE: {
-						const uint64_t nwid = dmsg.at<uint64_t>(ptr); ptr += 8;
-						const Address address(dmsg.field(ptr,ZT_ADDRESS_LENGTH),ZT_ADDRESS_LENGTH); ptr += ZT_ADDRESS_LENGTH;
-						const MAC mac(dmsg.field(ptr,6),6); ptr += 6;
-						const uint32_t adi = dmsg.at<uint32_t>(ptr); ptr += 4;
-						RR->mc->add(RR->node->now(),nwid,MulticastGroup(mac,adi),address);
-						TRACE("[%u] %s likes %s/%u on %.16llu",(unsigned int)fromMemberId,address.toString().c_str(),mac.toString().c_str(),(unsigned int)adi,nwid);
-					}	break;
-
-					case STATE_MESSAGE_COM: {
-						CertificateOfMembership com;
-						ptr += com.deserialize(dmsg,ptr);
-						if (com) {
-							TRACE("[%u] COM for %s on %.16llu rev %llu",(unsigned int)fromMemberId,com.issuedTo().toString().c_str(),com.networkId(),com.revision());
-						}
-					}	break;
-
-					case STATE_MESSAGE_RELAY: {
-						const unsigned int numRemotePeerPaths = dmsg[ptr++];
-						InetAddress remotePeerPaths[256]; // size is 8-bit, so 256 is max
-						for(unsigned int i=0;i<numRemotePeerPaths;++i)
-							ptr += remotePeerPaths[i].deserialize(dmsg,ptr);
-						const unsigned int packetLen = dmsg.at<uint16_t>(ptr); ptr += 2;
-						const void *packet = (const void *)dmsg.field(ptr,packetLen); ptr += packetLen;
-
-						if (packetLen >= ZT_PROTO_MIN_FRAGMENT_LENGTH) { // ignore anything too short to contain a dest address
-							const Address destinationAddress(reinterpret_cast<const char *>(packet) + 8,ZT_ADDRESS_LENGTH);
-							TRACE("[%u] relay %u bytes to %s (%u remote paths included)",(unsigned int)fromMemberId,packetLen,destinationAddress.toString().c_str(),numRemotePeerPaths);
-
-							SharedPtr<Peer> destinationPeer(RR->topology->getPeer(destinationAddress));
-							if (destinationPeer) {
-								if (
-								    (destinationPeer->send(RR,packet,packetLen,RR->node->now()))&&
-								    (numRemotePeerPaths > 0)&&
-								    (packetLen >= 18)&&
-								    (reinterpret_cast<const unsigned char *>(packet)[ZT_PACKET_FRAGMENT_IDX_FRAGMENT_INDICATOR] == ZT_PACKET_FRAGMENT_INDICATOR)
-								   ) {
-									// If remote peer paths were sent with this relayed packet, we do
-									// RENDEZVOUS. It's handled here for cluster-relayed packets since
-									// we don't have both Peer records so this is a different path.
-
-									const Address remotePeerAddress(reinterpret_cast<const char *>(packet) + 13,ZT_ADDRESS_LENGTH);
-
-									InetAddress bestDestV4,bestDestV6;
-									destinationPeer->getBestActiveAddresses(RR->node->now(),bestDestV4,bestDestV6);
-									InetAddress bestRemoteV4,bestRemoteV6;
-									for(unsigned int i=0;i<numRemotePeerPaths;++i) {
-										if ((bestRemoteV4)&&(bestRemoteV6))
-											break;
-										switch(remotePeerPaths[i].ss_family) {
-											case AF_INET:
-												if (!bestRemoteV4)
-													bestRemoteV4 = remotePeerPaths[i];
-												break;
-											case AF_INET6:
-												if (!bestRemoteV6)
-													bestRemoteV6 = remotePeerPaths[i];
+			 						TRACE("[%u] has %s",(unsigned int)fromMemberId,id.address().toString().c_str());
+			 					}
+							} catch ( ... ) {
+								// ignore invalid identities
+							}
+						}	break;
+
+						case STATE_MESSAGE_MULTICAST_LIKE: {
+							const uint64_t nwid = dmsg.at<uint64_t>(ptr); ptr += 8;
+							const Address address(dmsg.field(ptr,ZT_ADDRESS_LENGTH),ZT_ADDRESS_LENGTH); ptr += ZT_ADDRESS_LENGTH;
+							const MAC mac(dmsg.field(ptr,6),6); ptr += 6;
+							const uint32_t adi = dmsg.at<uint32_t>(ptr); ptr += 4;
+							RR->mc->add(RR->node->now(),nwid,MulticastGroup(mac,adi),address);
+							TRACE("[%u] %s likes %s/%u on %.16llu",(unsigned int)fromMemberId,address.toString().c_str(),mac.toString().c_str(),(unsigned int)adi,nwid);
+						}	break;
+
+						case STATE_MESSAGE_COM: {
+							CertificateOfMembership com;
+							ptr += com.deserialize(dmsg,ptr);
+							if (com) {
+								TRACE("[%u] COM for %s on %.16llu rev %llu",(unsigned int)fromMemberId,com.issuedTo().toString().c_str(),com.networkId(),com.revision());
+							}
+						}	break;
+
+						case STATE_MESSAGE_RELAY: {
+							const unsigned int numRemotePeerPaths = dmsg[ptr++];
+							InetAddress remotePeerPaths[256]; // size is 8-bit, so 256 is max
+							for(unsigned int i=0;i<numRemotePeerPaths;++i)
+								ptr += remotePeerPaths[i].deserialize(dmsg,ptr);
+							const unsigned int packetLen = dmsg.at<uint16_t>(ptr); ptr += 2;
+							const void *packet = (const void *)dmsg.field(ptr,packetLen); ptr += packetLen;
+
+							if (packetLen >= ZT_PROTO_MIN_FRAGMENT_LENGTH) { // ignore anything too short to contain a dest address
+								const Address destinationAddress(reinterpret_cast<const char *>(packet) + 8,ZT_ADDRESS_LENGTH);
+								TRACE("[%u] relay %u bytes to %s (%u remote paths included)",(unsigned int)fromMemberId,packetLen,destinationAddress.toString().c_str(),numRemotePeerPaths);
+
+								SharedPtr<Peer> destinationPeer(RR->topology->getPeer(destinationAddress));
+								if (destinationPeer) {
+									if (
+									    (destinationPeer->send(RR,packet,packetLen,RR->node->now()))&&
+									    (numRemotePeerPaths > 0)&&
+									    (packetLen >= 18)&&
+									    (reinterpret_cast<const unsigned char *>(packet)[ZT_PACKET_FRAGMENT_IDX_FRAGMENT_INDICATOR] == ZT_PACKET_FRAGMENT_INDICATOR)
+									   ) {
+										// If remote peer paths were sent with this relayed packet, we do
+										// RENDEZVOUS. It's handled here for cluster-relayed packets since
+										// we don't have both Peer records so this is a different path.
+
+										const Address remotePeerAddress(reinterpret_cast<const char *>(packet) + 13,ZT_ADDRESS_LENGTH);
+
+										InetAddress bestDestV4,bestDestV6;
+										destinationPeer->getBestActiveAddresses(RR->node->now(),bestDestV4,bestDestV6);
+										InetAddress bestRemoteV4,bestRemoteV6;
+										for(unsigned int i=0;i<numRemotePeerPaths;++i) {
+											if ((bestRemoteV4)&&(bestRemoteV6))
 												break;
+											switch(remotePeerPaths[i].ss_family) {
+												case AF_INET:
+													if (!bestRemoteV4)
+														bestRemoteV4 = remotePeerPaths[i];
+													break;
+												case AF_INET6:
+													if (!bestRemoteV6)
+														bestRemoteV6 = remotePeerPaths[i];
+													break;
+											}
 										}
-									}
 
-									Packet rendezvousForDest(destinationAddress,RR->identity.address(),Packet::VERB_RENDEZVOUS);
-									rendezvousForDest.append((uint8_t)0);
-									remotePeerAddress.appendTo(rendezvousForDest);
-
-									Buffer<2048> rendezvousForOtherEnd;
-									remotePeerAddress.appendTo(rendezvousForOtherEnd);
-									rendezvousForOtherEnd.append((uint8_t)Packet::VERB_RENDEZVOUS);
-									const unsigned int rendezvousForOtherEndPayloadSizePtr = rendezvousForOtherEnd.size();
-									rendezvousForOtherEnd.addSize(2); // space for actual packet payload length
-									rendezvousForOtherEnd.append((uint8_t)0); // flags == 0
-									destinationAddress.appendTo(rendezvousForOtherEnd);
-
-									bool haveMatch = false;
-									if ((bestDestV6)&&(bestRemoteV6)) {
-										haveMatch = true;
-
-										rendezvousForDest.append((uint16_t)bestRemoteV6.port());
-										rendezvousForDest.append((uint8_t)16);
-										rendezvousForDest.append(bestRemoteV6.rawIpData(),16);
-
-										rendezvousForOtherEnd.append((uint16_t)bestDestV6.port());
-										rendezvousForOtherEnd.append((uint8_t)16);
-										rendezvousForOtherEnd.append(bestDestV6.rawIpData(),16);
-										rendezvousForOtherEnd.setAt<uint16_t>(rendezvousForOtherEndPayloadSizePtr,(uint16_t)(9 + 16));
-									} else if ((bestDestV4)&&(bestRemoteV4)) {
-										haveMatch = true;
-
-										rendezvousForDest.append((uint16_t)bestRemoteV4.port());
-										rendezvousForDest.append((uint8_t)4);
-										rendezvousForDest.append(bestRemoteV4.rawIpData(),4);
-
-										rendezvousForOtherEnd.append((uint16_t)bestDestV4.port());
-										rendezvousForOtherEnd.append((uint8_t)4);
-										rendezvousForOtherEnd.append(bestDestV4.rawIpData(),4);
-										rendezvousForOtherEnd.setAt<uint16_t>(rendezvousForOtherEndPayloadSizePtr,(uint16_t)(9 + 4));
-									}
+										Packet rendezvousForDest(destinationAddress,RR->identity.address(),Packet::VERB_RENDEZVOUS);
+										rendezvousForDest.append((uint8_t)0);
+										remotePeerAddress.appendTo(rendezvousForDest);
+
+										Buffer<2048> rendezvousForOtherEnd;
+										remotePeerAddress.appendTo(rendezvousForOtherEnd);
+										rendezvousForOtherEnd.append((uint8_t)Packet::VERB_RENDEZVOUS);
+										const unsigned int rendezvousForOtherEndPayloadSizePtr = rendezvousForOtherEnd.size();
+										rendezvousForOtherEnd.addSize(2); // space for actual packet payload length
+										rendezvousForOtherEnd.append((uint8_t)0); // flags == 0
+										destinationAddress.appendTo(rendezvousForOtherEnd);
+
+										bool haveMatch = false;
+										if ((bestDestV6)&&(bestRemoteV6)) {
+											haveMatch = true;
+
+											rendezvousForDest.append((uint16_t)bestRemoteV6.port());
+											rendezvousForDest.append((uint8_t)16);
+											rendezvousForDest.append(bestRemoteV6.rawIpData(),16);
+
+											rendezvousForOtherEnd.append((uint16_t)bestDestV6.port());
+											rendezvousForOtherEnd.append((uint8_t)16);
+											rendezvousForOtherEnd.append(bestDestV6.rawIpData(),16);
+											rendezvousForOtherEnd.setAt<uint16_t>(rendezvousForOtherEndPayloadSizePtr,(uint16_t)(9 + 16));
+										} else if ((bestDestV4)&&(bestRemoteV4)) {
+											haveMatch = true;
+
+											rendezvousForDest.append((uint16_t)bestRemoteV4.port());
+											rendezvousForDest.append((uint8_t)4);
+											rendezvousForDest.append(bestRemoteV4.rawIpData(),4);
+
+											rendezvousForOtherEnd.append((uint16_t)bestDestV4.port());
+											rendezvousForOtherEnd.append((uint8_t)4);
+											rendezvousForOtherEnd.append(bestDestV4.rawIpData(),4);
+											rendezvousForOtherEnd.setAt<uint16_t>(rendezvousForOtherEndPayloadSizePtr,(uint16_t)(9 + 4));
+										}
 
-									if (haveMatch) {
-										_send(fromMemberId,STATE_MESSAGE_PROXY_SEND,rendezvousForOtherEnd.data(),rendezvousForOtherEnd.size());
-										RR->sw->send(rendezvousForDest,true,0);
+										if (haveMatch) {
+											_send(fromMemberId,STATE_MESSAGE_PROXY_SEND,rendezvousForOtherEnd.data(),rendezvousForOtherEnd.size());
+											RR->sw->send(rendezvousForDest,true,0);
+										}
 									}
 								}
 							}
-						}
-					}	break;
-
-					case STATE_MESSAGE_PROXY_SEND: {
-						const Address rcpt(dmsg.field(ptr,ZT_ADDRESS_LENGTH),ZT_ADDRESS_LENGTH);
-						const Packet::Verb verb = (Packet::Verb)dmsg[ptr++];
-						const unsigned int len = dmsg.at<uint16_t>(ptr); ptr += 2;
-						Packet outp(rcpt,RR->identity.address(),verb);
-						outp.append(dmsg.field(ptr,len),len);
-						RR->sw->send(outp,true,0);
-						TRACE("[%u] proxy send %s to %s length %u",(unsigned int)fromMemberId,Packet::verbString(verb),rcpt.toString().c_str(),len);
-					}	break;
+						}	break;
+
+						case STATE_MESSAGE_PROXY_SEND: {
+							const Address rcpt(dmsg.field(ptr,ZT_ADDRESS_LENGTH),ZT_ADDRESS_LENGTH);
+							const Packet::Verb verb = (Packet::Verb)dmsg[ptr++];
+							const unsigned int len = dmsg.at<uint16_t>(ptr); ptr += 2;
+							Packet outp(rcpt,RR->identity.address(),verb);
+							outp.append(dmsg.field(ptr,len),len);
+							RR->sw->send(outp,true,0);
+							TRACE("[%u] proxy send %s to %s length %u",(unsigned int)fromMemberId,Packet::verbString(verb),rcpt.toString().c_str(),len);
+						}	break;
+					}
+				} catch ( ... ) {
+					TRACE("invalid message of size %u type %d (inner decode), discarding",mlen,mtype);
+					// drop invalids
 				}
-			} catch ( ... ) {
-				TRACE("invalid message of size %u type %d (inner decode), discarding",mlen,mtype);
-				// drop invalids
-			}
 
-			ptr = nextPtr;
+				ptr = nextPtr;
+			}
+		} catch ( ... ) {
+			TRACE("invalid message (outer loop), discarding");
+			// drop invalids
 		}
-	} catch ( ... ) {
-		TRACE("invalid message (outer loop), discarding");
-		// drop invalids
 	}
 }