Ver código fonte

Cluster status plumbing.

Adam Ierymenko 9 anos atrás
pai
commit
865acfa40f
5 arquivos alterados com 160 adições e 0 exclusões
  1. 77 0
      include/ZeroTierOne.h
  2. 56 0
      node/Cluster.cpp
  3. 7 0
      node/Cluster.hpp
  4. 15 0
      node/Topology.cpp
  5. 5 0
      node/Topology.hpp

+ 77 - 0
include/ZeroTierOne.h

@@ -133,6 +133,11 @@ extern "C" {
  */
 #define ZT_CLUSTER_MAX_MEMBERS 128
 
+/**
+ * Maximum number of physical ZeroTier addresses a cluster member can report
+ */
+#define ZT_CLUSTER_MAX_ZT_PHYSICAL_ADDRESSES 16
+
 /**
  * Maximum allowed cluster message length in bytes
  */
@@ -879,6 +884,78 @@ typedef struct {
 	unsigned int nextHopCount;
 } ZT_CircuitTestReport;
 
+/**
+ * A cluster member's status
+ */
+typedef struct {
+	/**
+	 * This cluster member's ID (from 0 to 1-ZT_CLUSTER_MAX_MEMBERS)
+	 */
+	unsigned int id;
+
+	/**
+	 * Number of milliseconds since last 'alive' heartbeat message received via cluster backplane address
+	 */
+	unsigned int msSinceLastHeartbeat;
+
+	/**
+	 * Non-zero if cluster member is alive
+	 */
+	int alive;
+
+	/**
+	 * X, Y, and Z coordinates of this member (if specified, otherwise zero)
+	 *
+	 * What these mean depends on the location scheme being used for
+	 * location-aware clustering. At present this is GeoIP and these
+	 * will be the X, Y, and Z coordinates of the location on a spherical
+	 * approximation of Earth where Earth's core is the origin (in km).
+	 * They don't have to be perfect and need only be comparable with others
+	 * to find shortest path via the standard vector distance formula.
+	 */
+	int x,y,z;
+
+	/**
+	 * Cluster member's last reported load
+	 */
+	uint64_t load;
+
+	/**
+	 * Number of peers this cluster member "has"
+	 */
+	uint64_t peers;
+
+	/**
+	 * Physical ZeroTier endpoints for this member (where peers are sent when directed here)
+	 */
+	struct sockaddr_storage zeroTierPhysicalEndpoints[ZT_CLUSTER_MAX_ZT_PHYSICAL_ADDRESSES];
+
+	/**
+	 * Number of physical ZeroTier endpoints this member is announcing
+	 */
+	unsigned int numZeroTierPhysicalEndpoints;
+} ZT_ClusterMemberStatus;
+
+/**
+ * ZeroTier cluster status
+ */
+typedef struct {
+	/**
+	 * My cluster member ID (a record for 'self' is included in member[])
+	 */
+	unsigned int myId;
+
+	/**
+	 * Number of cluster members
+	 */
+	unsigned int clusterSize;
+
+	/**
+	 * Cluster member statuses
+	 */
+	ZT_ClusterMemberStatus member[ZT_CLUSTER_MAX_MEMBERS];
+} ZT_ClusterStatus;
+
 /**
  * An instance of a ZeroTier One node (opaque)
  */

+ 56 - 0
node/Cluster.cpp

@@ -638,6 +638,62 @@ bool Cluster::redirectPeer(const Address &peerAddress,const InetAddress &peerPhy
 	}
 }
 
+void Cluster::status(ZT_ClusterStatus &status) const
+{
+	const uint64_t now = RR->node->now();
+	memset(&status,0,sizeof(ZT_ClusterStatus));
+	ZT_ClusterMemberStatus *ms[ZT_CLUSTER_MAX_MEMBERS];
+	memset(ms,0,sizeof(ms));
+
+	status.myId = _id;
+
+	ms[_id] = &(status.member[status.clusterSize++]);
+	ms[_id]->id = _id;
+	ms[_id]->alive = 1;
+	ms[_id]->x = _x;
+	ms[_id]->y = _y;
+	ms[_id]->z = _z;
+	ms[_id]->peers = RR->topology->countAlive();
+	for(std::vector<InetAddress>::const_iterator ep(_zeroTierPhysicalEndpoints.begin());ep!=_zeroTierPhysicalEndpoints.end();++ep) {
+		if (ms[_id]->numZeroTierPhysicalEndpoints >= ZT_CLUSTER_MAX_ZT_PHYSICAL_ADDRESSES) // sanity check
+			break;
+		memcpy(&(ms[_id]->zeroTierPhysicalEndpoints[ms[_id]->numZeroTierPhysicalEndpoints++]),&(*ep),sizeof(struct sockaddr_storage));
+	}
+
+	{
+		Mutex::Lock _l1(_memberIds_m);
+		for(std::vector<uint16_t>::const_iterator mid(_memberIds.begin());mid!=_memberIds.end();++mid) {
+			if (status.clusterSize >= ZT_CLUSTER_MAX_MEMBERS) // sanity check
+				break;
+			ZT_ClusterMemberStatus *s = ms[*mid] = &(status.member[status.clusterSize++]);
+			_Member &m = _members[*mid];
+			Mutex::Lock ml(m.lock);
+
+			s->id = *mid;
+			s->msSinceLastHeartbeat = (unsigned int)std::min((uint64_t)(~((unsigned int)0)),(now - m.lastReceivedAliveAnnouncement));
+			s->alive = (s->msSinceLastHeartbeat < ZT_CLUSTER_TIMEOUT) ? 1 : 0;
+			s->x = m.x;
+			s->y = m.y;
+			s->z = m.z;
+			s->load = m.load;
+			for(std::vector<InetAddress>::const_iterator ep(m.zeroTierPhysicalEndpoints.begin());ep!=m.zeroTierPhysicalEndpoints.end();++ep) {
+				if (s->numZeroTierPhysicalEndpoints >= ZT_CLUSTER_MAX_ZT_PHYSICAL_ADDRESSES) // sanity check
+					break;
+				memcpy(&(s->zeroTierPhysicalEndpoints[s->numZeroTierPhysicalEndpoints++]),&(*ep),sizeof(struct sockaddr_storage));
+			}
+		}
+	}
+
+	{
+		Mutex::Lock _l2(_peerAffinities_m);
+		for(std::vector<_PeerAffinity>::const_iterator pi(_peerAffinities.begin());pi!=_peerAffinities.end();++pi) {
+			unsigned int mid = pi->clusterMemberId();
+			if ((ms[mid])&&(mid != _id)&&((now - pi->timestamp) < ZT_PEER_ACTIVITY_TIMEOUT))
+				++ms[mid]->peers;
+		}
+	}
+}
+
 void Cluster::_send(uint16_t memberId,StateMessageType type,const void *msg,unsigned int len)
 {
 	if ((len + 3) > (ZT_CLUSTER_MAX_MESSAGE_LENGTH - (24 + 2 + 2))) // sanity check

+ 7 - 0
node/Cluster.hpp

@@ -254,6 +254,13 @@ public:
 	 */
 	bool redirectPeer(const Address &peerAddress,const InetAddress &peerPhysicalAddress,bool offload);
 
+	/**
+	 * Fill out ZT_ClusterStatus structure (from core API)
+	 *
+	 * @param status Reference to structure to hold result (anything there is replaced)
+	 */
+	void status(ZT_ClusterStatus &status) const;
+
 private:
 	void _send(uint16_t memberId,StateMessageType type,const void *msg,unsigned int len);
 	void _flush(uint16_t memberId);

+ 15 - 0
node/Topology.cpp

@@ -332,6 +332,21 @@ void Topology::clean(uint64_t now)
 	}
 }
 
+unsigned long Topology::countAlive() const
+{
+	const uint64_t now = RR->node->now();
+	unsigned long cnt = 0;
+	Mutex::Lock _l(_lock);
+	Hashtable< Address,SharedPtr<Peer> >::Iterator i(const_cast<Topology *>(this)->_peers);
+	Address *a = (Address *)0;
+	SharedPtr<Peer> *p = (SharedPtr<Peer> *)0;
+	while (i.next(a,p)) {
+		if ((*p)->alive(now))
+			++cnt;
+	}
+	return cnt;
+}
+
 Identity Topology::_getIdentity(const Address &zta)
 {
 	char p[128];

+ 5 - 0
node/Topology.hpp

@@ -192,6 +192,11 @@ public:
 	 */
 	void clean(uint64_t now);
 
+	/**
+	 * @return Number of 'alive' peers
+	 */
+	unsigned long countAlive() const;
+
 	/**
 	 * Apply a function or function object to all peers
 	 *