Selaa lähdekoodia

Ready to test.

Adam Ierymenko 9 vuotta sitten
vanhempi
commit
21341ab15c
3 muutettua tiedostoa jossa 63 lisäystä ja 15 poistoa
  1. 61 13
      node/Cluster.cpp
  2. 1 1
      node/Multicaster.cpp
  3. 1 1
      node/Multicaster.hpp

+ 61 - 13
node/Cluster.cpp

@@ -161,8 +161,6 @@ void Cluster::handleIncomingStateMessage(const void *msg,unsigned int len)
 			return;
 	}
 
-	_Member &m = _members[fromMemberId];
-
 	try {
 		while (ptr < dmsg.size()) {
 			const unsigned int mlen = dmsg.at<uint16_t>(ptr); ptr += 2;
@@ -177,6 +175,7 @@ void Cluster::handleIncomingStateMessage(const void *msg,unsigned int len)
 						break;
 
 					case CLUSTER_MESSAGE_ALIVE: {
+						_Member &m = _members[fromMemberId];
 						Mutex::Lock mlck(m.lock);
 						ptr += 7; // skip version stuff, not used yet
 						m.x = dmsg.at<int32_t>(ptr); ptr += 4;
@@ -253,7 +252,7 @@ void Cluster::handleIncomingStateMessage(const void *msg,unsigned int len)
 							peer->identity().serialize(buf);
 							Mutex::Lock _l2(_members[fromMemberId].lock);
 							_send(fromMemberId,CLUSTER_MESSAGE_HAVE_PEER,buf.data(),buf.size());
-							_flush(fromMemberId); // lookups are latency sensitive
+							_flush(fromMemberId);
 						}
 					}	break;
 
@@ -340,8 +339,11 @@ void Cluster::handleIncomingStateMessage(const void *msg,unsigned int len)
 							}
 
 							if (haveMatch) {
-								_send(fromMemberId,CLUSTER_MESSAGE_PROXY_SEND,rendezvousForRemote.data(),rendezvousForRemote.size());
-								_flush(fromMemberId);
+								{
+									Mutex::Lock _l2(_members[fromMemberId].lock);
+									_send(fromMemberId,CLUSTER_MESSAGE_PROXY_SEND,rendezvousForRemote.data(),rendezvousForRemote.size());
+									_flush(fromMemberId);
+								}
 								RR->sw->send(rendezvousForLocal,true,0);
 							}
 						}
@@ -382,7 +384,6 @@ void Cluster::sendViaCluster(const Address &fromPeerAddress,const Address &toPee
 		return;
 
 	const uint64_t now = RR->node->now();
-	unsigned int canHasPeer = 0;
 
 	uint64_t mostRecentTs = 0;
 	unsigned int mostRecentMemberId = 0xffffffff;
@@ -413,7 +414,7 @@ void Cluster::sendViaCluster(const Address &fromPeerAddress,const Address &toPee
 				Mutex::Lock _l2(_members[*mid].lock);
 				_send(*mid,CLUSTER_MESSAGE_WANT_PEER,tmp,ZT_ADDRESS_LENGTH);
 				if ((enqueueAndWait)&&(queueCount == 0))
-					_flush(*mid); // send first query immediately to reduce latency
+					_flush(*mid);
 			}
 		}
 
@@ -453,14 +454,14 @@ void Cluster::sendViaCluster(const Address &fromPeerAddress,const Address &toPee
 	{
 		Mutex::Lock _l2(_members[mostRecentMemberId].lock);
 		if (buf.size() > 0) {
-			_send(canHasPeer,CLUSTER_MESSAGE_PROXY_UNITE,buf.data(),buf.size());
-			_flush(canHasPeer); // latency sensitive
+			_send(mostRecentMemberId,CLUSTER_MESSAGE_PROXY_UNITE,buf.data(),buf.size());
+			_flush(mostRecentMemberId);
 		}
 		if (_members[mostRecentMemberId].zeroTierPhysicalEndpoints.size() > 0)
-			RR->node->putPacket(InetAddress(),_members[canHasPeer].zeroTierPhysicalEndpoints.front(),data,len);
+			RR->node->putPacket(InetAddress(),_members[mostRecentMemberId].zeroTierPhysicalEndpoints.front(),data,len);
 	}
 
-	TRACE("sendViaCluster(): relaying %u bytes from %s to %s by way of %u",len,fromPeerAddress.toString().c_str(),toPeerAddress.toString().c_str(),(unsigned int)canHasPeer);
+	TRACE("sendViaCluster(): relaying %u bytes from %s to %s by way of %u",len,fromPeerAddress.toString().c_str(),toPeerAddress.toString().c_str(),(unsigned int)mostRecentMemberId);
 }
 
 void Cluster::sendDistributedQuery(const Packet &pkt)
@@ -472,7 +473,7 @@ void Cluster::sendDistributedQuery(const Packet &pkt)
 	for(std::vector<uint16_t>::const_iterator mid(_memberIds.begin());mid!=_memberIds.end();++mid) {
 		Mutex::Lock _l2(_members[*mid].lock);
 		_send(*mid,CLUSTER_MESSAGE_REMOTE_PACKET,buf.data(),buf.size());
-		_flush(*mid); // these tend to be latency-sensitive
+		_flush(*mid);
 	}
 }
 
@@ -524,7 +525,7 @@ void Cluster::doPeriodicTasks()
 				_send(*mid,CLUSTER_MESSAGE_ALIVE,alive.data(),alive.size());
 			}
 
-			_flush(*mid); // does nothing if nothing to flush
+			_flush(*mid);
 		}
 	}
 
@@ -740,6 +741,53 @@ void Cluster::_flush(uint16_t memberId)
 	}
 }
 
+void Cluster::_doREMOTE_WHOIS(uint64_t fromMemberId,const Packet &remotep)
+{
+	if (remotep.payloadLength() >= ZT_ADDRESS_LENGTH) {
+		Identity queried(RR->topology->getIdentity(Address(remotep.payload(),ZT_ADDRESS_LENGTH)));
+		if (queried) {
+			Buffer<1024> routp;
+			remotep.source().appendTo(routp);
+			routp.append((uint8_t)Packet::VERB_OK);
+			routp.append((uint8_t)Packet::VERB_WHOIS);
+			routp.append(remotep.packetId());
+			queried.serialize(routp);
+
+			Mutex::Lock _l2(_members[fromMemberId].lock);
+			_send(fromMemberId,CLUSTER_MESSAGE_PROXY_SEND,routp.data(),routp.size());
+			_flush(fromMemberId);
+		}
+	}
+}
+
+void Cluster::_doREMOTE_MULTICAST_GATHER(uint64_t fromMemberId,const Packet &remotep)
+{
+	const uint64_t nwid = remotep.at<uint64_t>(ZT_PROTO_VERB_MULTICAST_GATHER_IDX_NETWORK_ID);
+	const MulticastGroup mg(MAC(remotep.field(ZT_PROTO_VERB_MULTICAST_GATHER_IDX_MAC,6),6),remotep.at<uint32_t>(ZT_PROTO_VERB_MULTICAST_GATHER_IDX_ADI));
+	unsigned int gatherLimit = remotep.at<uint32_t>(ZT_PROTO_VERB_MULTICAST_GATHER_IDX_GATHER_LIMIT);
+	const Address remotePeerAddress(remotep.source());
+
+	//TRACE("<<MC %s(%s) GATHER up to %u in %.16llx/%s",source().toString().c_str(),_remoteAddress.toString().c_str(),gatherLimit,nwid,mg.toString().c_str());
+
+	if (gatherLimit) {
+		Buffer<ZT_PROTO_MAX_PACKET_LENGTH> routp;
+		remotePeerAddress.appendTo(routp);
+		routp.append((uint8_t)Packet::VERB_OK);
+		routp.append((uint8_t)Packet::VERB_MULTICAST_GATHER);
+		routp.append(remotep.packetId());
+		routp.append(nwid);
+		mg.mac().appendTo(routp);
+		routp.append((uint32_t)mg.adi());
+
+		if (gatherLimit > ((ZT_CLUSTER_MAX_MESSAGE_LENGTH - 64) / 5))
+			gatherLimit = ((ZT_CLUSTER_MAX_MESSAGE_LENGTH - 64) / 5);
+		if (RR->mc->gather(remotePeerAddress,nwid,mg,routp,gatherLimit)) {
+			Mutex::Lock _l2(_members[fromMemberId].lock);
+			_send(fromMemberId,CLUSTER_MESSAGE_PROXY_SEND,routp.data(),routp.size());
+		}
+	}
+}
+
 } // namespace ZeroTier
 
 #endif // ZT_ENABLE_CLUSTER

+ 1 - 1
node/Multicaster.cpp

@@ -78,7 +78,7 @@ void Multicaster::remove(uint64_t nwid,const MulticastGroup &mg,const Address &m
 	}
 }
 
-unsigned int Multicaster::gather(const Address &queryingPeer,uint64_t nwid,const MulticastGroup &mg,Packet &appendTo,unsigned int limit) const
+unsigned int Multicaster::gather(const Address &queryingPeer,uint64_t nwid,const MulticastGroup &mg,Buffer<ZT_PROTO_MAX_PACKET_LENGTH> &appendTo,unsigned int limit) const
 {
 	unsigned char *p;
 	unsigned int added = 0,i,k,rptr,totalKnown = 0;

+ 1 - 1
node/Multicaster.hpp

@@ -146,7 +146,7 @@ public:
 	 * @return Number of addresses appended
 	 * @throws std::out_of_range Buffer overflow writing to packet
 	 */
-	unsigned int gather(const Address &queryingPeer,uint64_t nwid,const MulticastGroup &mg,Packet &appendTo,unsigned int limit) const;
+	unsigned int gather(const Address &queryingPeer,uint64_t nwid,const MulticastGroup &mg,Buffer<ZT_PROTO_MAX_PACKET_LENGTH> &appendTo,unsigned int limit) const;
 
 	/**
 	 * Get subscribers to a multicast group