
Wire up peer announcement in cluster.

Adam Ierymenko · 9 years ago · commit eb79d4a2f3
5 changed files with 87 additions and 30 deletions:

  1. node/Cluster.cpp (+58 -14)
  2. node/Cluster.hpp (+8 -2)
  3. node/IncomingPacket.cpp (+0 -1)
  4. node/Peer.cpp (+6 -1)
  5. node/Topology.cpp (+15 -12)

node/Cluster.cpp (+58 -14)

@@ -263,8 +263,6 @@ void Cluster::handleIncomingStateMessage(const void *msg,unsigned int len)
 									remotePeerAddress.appendTo(rendezvousForDest);
 
 									Buffer<2048> rendezvousForOtherEnd;
-									rendezvousForOtherEnd.addSize(2); // leave room for payload size
-									rendezvousForOtherEnd.append((uint8_t)STATE_MESSAGE_PROXY_SEND);
 									remotePeerAddress.appendTo(rendezvousForOtherEnd);
 									rendezvousForOtherEnd.append((uint8_t)Packet::VERB_RENDEZVOUS);
 									const unsigned int rendezvousForOtherEndPayloadSizePtr = rendezvousForOtherEnd.size();
@@ -298,9 +296,8 @@ void Cluster::handleIncomingStateMessage(const void *msg,unsigned int len)
 									}
 
 									if (haveMatch) {
+										_send(fromMemberId,STATE_MESSAGE_PROXY_SEND,rendezvousForOtherEnd.data(),rendezvousForOtherEnd.size());
 										RR->sw->send(rendezvousForDest,true,0);
-										rendezvousForOtherEnd.setAt<uint16_t>(0,(uint16_t)(rendezvousForOtherEnd.size() - 2));
-										_send(fromMemberId,rendezvousForOtherEnd.data(),rendezvousForOtherEnd.size());
 									}
 								}
 							}
@@ -331,14 +328,64 @@ void Cluster::handleIncomingStateMessage(const void *msg,unsigned int len)
 
 void Cluster::replicateHavePeer(const Identity &peerId)
 {
+	{	// Use peer affinity table to track our own last announce time for peers
+		_PeerAffinity pa(peerId.address(),_id,RR->node->now());
+		Mutex::Lock _l2(_peerAffinities_m);
+		std::vector<_PeerAffinity>::iterator i(std::lower_bound(_peerAffinities.begin(),_peerAffinities.end(),pa)); // O(log(n))
+		if ((i != _peerAffinities.end())&&(i->key == pa.key)) {
+			if ((pa.timestamp - i->timestamp) >= ZT_CLUSTER_HAVE_PEER_ANNOUNCE_PERIOD) {
+				i->timestamp = pa.timestamp;
+				// continue to announcement
+			} else {
+				// we've already announced this peer recently, so skip
+				return;
+			}
+		} else {
+			_peerAffinities.push_back(pa);
+			std::sort(_peerAffinities.begin(),_peerAffinities.end()); // probably a more efficient way to insert but okay for now
+			// continue to announcement
+		}
+	}
+
+	// announcement
+	Buffer<4096> buf;
+	peerId.serialize(buf,false);
+	{
+		Mutex::Lock _l(_memberIds_m);
+		for(std::vector<uint16_t>::const_iterator mid(_memberIds.begin());mid!=_memberIds.end();++mid) {
+			Mutex::Lock _l2(_members[*mid].lock);
+			_send(*mid,STATE_MESSAGE_HAVE_PEER,buf.data(),buf.size());
+		}
+	}
 }
 
 void Cluster::replicateMulticastLike(uint64_t nwid,const Address &peerAddress,const MulticastGroup &group)
 {
+	Buffer<4096> buf;
+	buf.append((uint64_t)nwid);
+	peerAddress.appendTo(buf);
+	group.mac().appendTo(buf);
+	buf.append((uint32_t)group.adi());
+	{
+		Mutex::Lock _l(_memberIds_m);
+		for(std::vector<uint16_t>::const_iterator mid(_memberIds.begin());mid!=_memberIds.end();++mid) {
+			Mutex::Lock _l2(_members[*mid].lock);
+			_send(*mid,STATE_MESSAGE_MULTICAST_LIKE,buf.data(),buf.size());
+		}
+	}
 }
 
 void Cluster::replicateCertificateOfNetworkMembership(const CertificateOfMembership &com)
 {
+	Buffer<4096> buf;
+	com.serialize(buf);
+	{
+		Mutex::Lock _l(_memberIds_m);
+		for(std::vector<uint16_t>::const_iterator mid(_memberIds.begin());mid!=_memberIds.end();++mid) {
+			Mutex::Lock _l2(_members[*mid].lock);
+			_send(*mid,STATE_MESSAGE_COM,buf.data(),buf.size());
+		}
+	}
 }
 
 void Cluster::doPeriodicTasks()
@@ -371,7 +418,7 @@ void Cluster::doPeriodicTasks()
 				alive.append((uint8_t)_zeroTierPhysicalEndpoints.size());
 				for(std::vector<InetAddress>::const_iterator pe(_zeroTierPhysicalEndpoints.begin());pe!=_zeroTierPhysicalEndpoints.end();++pe)
 					pe->serialize(alive);
-				_send(*mid,alive.data(),alive.size());
+				_send(*mid,STATE_MESSAGE_ALIVE,alive.data(),alive.size());
 				_members[*mid].lastAnnouncedAliveTo = now;
 			}
 
@@ -498,18 +545,15 @@ bool Cluster::redirectPeer(const SharedPtr<Peer> &peer,const InetAddress &peerPh
 	}
 }
 
-void Cluster::_send(uint16_t memberId,const void *msg,unsigned int len)
+void Cluster::_send(uint16_t memberId,StateMessageType type,const void *msg,unsigned int len)
 {
 	_Member &m = _members[memberId];
 	// assumes m.lock is locked!
-	for(;;) {
-		if ((m.q.size() + len) > ZT_CLUSTER_MAX_MESSAGE_LENGTH)
-			_flush(memberId);
-		else {
-			m.q.append(msg,len);
-			break;
-		}
-	}
+	if ((m.q.size() + len + 3) > ZT_CLUSTER_MAX_MESSAGE_LENGTH)
+		_flush(memberId);
+	m.q.append((uint16_t)(len + 1));
+	m.q.append((uint8_t)type);
+	m.q.append(msg,len);
 }
 
 void Cluster::_flush(uint16_t memberId)

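The heart of this commit is the new framing in _send(): instead of each caller hand-rolling its own record (as the removed rendezvous code did with addSize(2) and a later setAt<uint16_t>()), every queued state message is now written as a 16-bit size covering the type byte plus payload, then the type byte, then the payload. A minimal stand-alone decoder for that layout, as a sketch only -- walkStateMessages and the trimmed-down enum are illustrative, not part of this commit, and the sketch assumes the size field is big-endian as Buffer::append<uint16_t>() writes it:

	#include <cstdint>
	#include <cstdio>

	// Illustrative subset of Cluster's state message types (sketch only)
	enum StateMessageType { STATE_MESSAGE_ALIVE = 1, STATE_MESSAGE_HAVE_PEER = 2 };

	// Walk a queue of records as written by the new _send():
	//   uint16_t size   -- bytes that follow this field (1 type byte + payload)
	//   uint8_t  type   -- StateMessageType
	//   uint8_t  payload[size - 1]
	static void walkStateMessages(const uint8_t *q,unsigned int len)
	{
		unsigned int ptr = 0;
		while ((ptr + 3) <= len) { // need at least the size field and a type byte
			const unsigned int sz = ((unsigned int)q[ptr] << 8) | (unsigned int)q[ptr + 1];
			ptr += 2;
			if ((sz < 1)||((ptr + sz) > len))
				break; // truncated or corrupt record
			const uint8_t type = q[ptr];
			std::printf("type %u, %u byte payload\n",(unsigned int)type,sz - 1);
			ptr += sz; // skip type byte and payload to the next record
		}
	}

This also explains the "+ 3" in the new flush check: each record adds two size bytes and one type byte on top of the payload itself.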
node/Cluster.hpp (+8 -2)

@@ -42,12 +42,18 @@
 #include "Buffer.hpp"
 #include "Mutex.hpp"
 #include "SharedPtr.hpp"
+#include "Hashtable.hpp"
 
 /**
  * Timeout for cluster members being considered "alive"
  */
 #define ZT_CLUSTER_TIMEOUT 30000
 
+/**
+ * How often should we announce that we have a peer?
+ */
+#define ZT_CLUSTER_HAVE_PEER_ANNOUNCE_PERIOD 60000
+
 /**
  * Desired period between doPeriodicTasks() in milliseconds
  */
@@ -238,7 +244,7 @@ public:
 	bool redirectPeer(const SharedPtr<Peer> &peer,const InetAddress &peerPhysicalAddress,bool offload);
 
 private:
-	void _send(uint16_t memberId,const void *msg,unsigned int len);
+	void _send(uint16_t memberId,StateMessageType type,const void *msg,unsigned int len);
 	void _flush(uint16_t memberId);
 
 	// These are initialized in the constructor and remain static
@@ -292,7 +298,7 @@ private:
 	std::vector<uint16_t> _memberIds;
 	Mutex _memberIds_m;
 
-	// Record tracking which members have which peers and how recently they claimed this
+	// Record tracking which members have which peers and how recently they claimed this -- also used to track our last claimed time
 	struct _PeerAffinity
 	{
 		_PeerAffinity(const Address &a,uint16_t mid,uint64_t ts) :

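replicateHavePeer() reuses the _PeerAffinity table as a sorted std::vector searched with std::lower_bound(), so the rate-limit lookup stays O(log n) without a separate hash map. A self-contained sketch of that idiom under simplified, hypothetical types (the real _PeerAffinity packs a ZeroTier address and member ID into its key field):

	#include <algorithm>
	#include <cstdint>
	#include <vector>

	struct Entry {
		uint64_t key;       // stands in for (peer address, member id)
		uint64_t timestamp; // last announce time in ms
		inline bool operator<(const Entry &e) const { return (key < e.key); }
	};

	// Returns true if the caller should announce now, updating the table.
	static bool shouldAnnounce(std::vector<Entry> &table,uint64_t key,uint64_t now,uint64_t period)
	{
		Entry probe;
		probe.key = key;
		probe.timestamp = now;
		std::vector<Entry>::iterator i(std::lower_bound(table.begin(),table.end(),probe)); // O(log(n))
		if ((i != table.end())&&(i->key == key)) {
			if ((now - i->timestamp) < period)
				return false; // announced recently, skip
			i->timestamp = now;
			return true;
		}
		table.insert(i,probe); // insert at the lower_bound position to stay sorted
		return true;
	}

Inserting at the lower_bound position is the "more efficient way" the in-code comment alludes to; the commit itself takes the simpler push_back-then-sort route.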
node/IncomingPacket.cpp (+0 -1)

@@ -272,7 +272,6 @@ bool IncomingPacket::_doHELLO(const RuntimeEnvironment *RR)
 				TRACE("rejected HELLO from %s(%s): packet failed authentication",id.address().toString().c_str(),_remoteAddress.toString().c_str());
 				return true;
 			}
-
 			peer = RR->topology->addPeer(newPeer);
 
 			// Continue at // VALID

node/Peer.cpp (+6 -1)

@@ -34,6 +34,7 @@
 #include "Network.hpp"
 #include "AntiRecursion.hpp"
 #include "SelfAwareness.hpp"
+#include "Cluster.hpp"
 
 #include <algorithm>
 
@@ -107,7 +108,6 @@ void Peer::received(
 						// Learn paths if they've been confirmed via a HELLO or an ECHO
 						RemotePath *slot = (RemotePath *)0;
 						if (np < ZT_MAX_PEER_NETWORK_PATHS) {
-							// Add new path
 							slot = &(_paths[np++]);
 						} else {
 							uint64_t slotLRmin = 0xffffffffffffffffULL;
@@ -141,6 +141,11 @@ void Peer::received(
 					}
 				}
 			}
+
+#ifdef ZT_ENABLE_CLUSTER
+			if ((pathIsConfirmed)&&(RR->cluster))
+				RR->cluster->replicateHavePeer(_id);
+#endif
 		}
 
 		if ((now - _lastAnnouncedTo) >= ((ZT_MULTICAST_LIKE_EXPIRE / 2) - 1000)) {

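Peer::received() runs for every authenticated inbound packet, so the new cluster hook leans entirely on the rate limit above: without it, each packet from a peer would trigger a HAVE_PEER broadcast to every cluster member. A toy calculation (not ZeroTier code) showing how the 60-second period collapses per-packet calls into a handful of broadcasts:

	#include <cstdint>
	#include <cstdio>

	int main()
	{
		const uint64_t period = 60000; // mirrors ZT_CLUSTER_HAVE_PEER_ANNOUNCE_PERIOD (ms)
		uint64_t lastAnnounce = 0;
		bool seen = false;
		unsigned int broadcasts = 0,packets = 0;
		for(uint64_t now=0;now<=300000;now+=50) { // one packet every 50ms for five minutes
			++packets;
			if ((!seen)||((now - lastAnnounce) >= period)) { // first sighting, or period elapsed
				seen = true;
				lastAnnounce = now;
				++broadcasts;
			}
		}
		std::printf("%u broadcasts for %u packets\n",broadcasts,packets); // 6 for 6001
		return 0;
	}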
node/Topology.cpp (+15 -12)

@@ -122,18 +122,22 @@ Topology::~Topology()
 SharedPtr<Peer> Topology::addPeer(const SharedPtr<Peer> &peer)
 {
 	if (peer->address() == RR->identity.address()) {
-		TRACE("BUG: addNewPeer() caught and ignored attempt to add peer for self");
+		TRACE("BUG: addPeer() caught and ignored attempt to add peer for self");
 		throw std::logic_error("cannot add peer for self");
 	}
 
-	const uint64_t now = RR->node->now();
-	Mutex::Lock _l(_lock);
-
-	SharedPtr<Peer> &p = _peers.set(peer->address(),peer);
-	p->use(now);
-	saveIdentity(p->identity());
+	SharedPtr<Peer> np;
+	{
+		Mutex::Lock _l(_lock);
+		SharedPtr<Peer> &hp = _peers[peer->address()];
+		if (!hp)
+			hp = peer;
+		np = hp;
+	}
+	np->use(RR->node->now());
+	saveIdentity(np->identity());
 
-	return p;
+	return np;
 }
 
 SharedPtr<Peer> Topology::getPeer(const Address &zta)
@@ -143,13 +147,12 @@ SharedPtr<Peer> Topology::getPeer(const Address &zta)
 		return SharedPtr<Peer>();
 	}
 
-	const uint64_t now = RR->node->now();
 	Mutex::Lock _l(_lock);
 
 	SharedPtr<Peer> &ap = _peers[zta];
 
 	if (ap) {
-		ap->use(now);
+		ap->use(RR->node->now());
 		return ap;
 	}
 
@@ -157,13 +160,13 @@ SharedPtr<Peer> Topology::getPeer(const Address &zta)
 	if (id) {
 		try {
 			ap = SharedPtr<Peer>(new Peer(RR->identity,id));
-			ap->use(now);
+			ap->use(RR->node->now());
 			return ap;
 		} catch ( ... ) {} // invalid identity?
 	}
 
+	// If we get here it means we read an invalid cache identity or had some other error
 	_peers.erase(zta);
-
 	return SharedPtr<Peer>();
 }