Adam Ierymenko пре 9 година
родитељ
комит
7711eba297

+ 2 - 2
include/ZeroTierOne.h

@@ -131,12 +131,12 @@ extern "C" {
 /**
  * Maximum number of cluster members (and max member ID plus one)
  */
-#define ZT_CLUSTER_MAX_MEMBERS 256
+#define ZT_CLUSTER_MAX_MEMBERS 128
 
 /**
  * Maximum allowed cluster message length in bytes
  */
-#define ZT_CLUSTER_MAX_MESSAGE_LENGTH 65535
+#define ZT_CLUSTER_MAX_MESSAGE_LENGTH (1444 * 6)
 
 /**
  * A null/empty sockaddr (all zero) to signify an unspecified socket address

+ 3 - 1
node/Cluster.cpp

@@ -504,7 +504,7 @@ void Cluster::doPeriodicTasks()
 
 void Cluster::addMember(uint16_t memberId)
 {
-	if (memberId >= ZT_CLUSTER_MAX_MEMBERS)
+	if ((memberId >= ZT_CLUSTER_MAX_MEMBERS)||(memberId == _id))
 		return;
 
 	Mutex::Lock _l2(_members[memberId].lock);
@@ -622,6 +622,8 @@ bool Cluster::redirectPeer(const Address &peerAddress,const InetAddress &peerPhy
 
 void Cluster::_send(uint16_t memberId,StateMessageType type,const void *msg,unsigned int len)
 {
+	if ((len + 3) > (ZT_CLUSTER_MAX_MESSAGE_LENGTH - (24 + 2 + 2))) // sanity check
+		return;
 	_Member &m = _members[memberId];
 	// assumes m.lock is locked!
 	if ((m.q.size() + len + 3) > ZT_CLUSTER_MAX_MESSAGE_LENGTH)

+ 2 - 2
node/Cluster.hpp

@@ -47,7 +47,7 @@
 /**
  * Timeout for cluster members being considered "alive"
  */
-#define ZT_CLUSTER_TIMEOUT 30000
+#define ZT_CLUSTER_TIMEOUT 10000
 
 /**
  * How often should we announce that we have a peer?
@@ -57,7 +57,7 @@
 /**
  * Desired period between doPeriodicTasks() in milliseconds
  */
-#define ZT_CLUSTER_PERIODIC_TASK_PERIOD 50
+#define ZT_CLUSTER_PERIODIC_TASK_PERIOD 100
 
 namespace ZeroTier {
 

+ 9 - 0
osdep/Phy.hpp

@@ -64,6 +64,12 @@
 #include <netinet/in.h>
 #include <netinet/tcp.h>
 
+#if defined(__linux__) || defined(linux) || defined(__LINUX__) || defined(__linux)
+#ifndef IPV6_DONTFRAG
+#define IPV6_DONTFRAG 62
+#endif
+#endif
+
 #define ZT_PHY_SOCKFD_TYPE int
 #define ZT_PHY_SOCKFD_NULL (-1)
 #define ZT_PHY_SOCKFD_VALID(s) ((s) > -1)
@@ -374,6 +380,9 @@ public:
 				f = 1; setsockopt(s,IPPROTO_IPV6,IPV6_V6ONLY,(void *)&f,sizeof(f));
 #ifdef IPV6_MTU_DISCOVER
 				f = 0; setsockopt(s,IPPROTO_IPV6,IPV6_MTU_DISCOVER,&f,sizeof(f));
+#endif
+#ifdef IPV6_DONTFRAG
+				f = 0; setsockopt(s,IPPROTO_IPV6,IPV6_DONTFRAG,&f,sizeof(f));
 #endif
 			}
 			f = 0; setsockopt(s,SOL_SOCKET,SO_REUSEADDR,(void *)&f,sizeof(f));

+ 125 - 0
service/ClusterDefinition.hpp

@@ -0,0 +1,125 @@
+/*
+ * ZeroTier One - Network Virtualization Everywhere
+ * Copyright (C) 2011-2015  ZeroTier, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program.  If not, see <http://www.gnu.org/licenses/>.
+ *
+ * --
+ *
+ * ZeroTier may be used and distributed under the terms of the GPLv3, which
+ * are available at: http://www.gnu.org/licenses/gpl-3.0.html
+ *
+ * If you would like to embed ZeroTier into a commercial application or
+ * redistribute it in a modified binary form, please contact ZeroTier Networks
+ * LLC. Start here: http://www.zerotier.com/
+ */
+
+#ifndef ZT_CLUSTERDEFINITION_HPP
+#define ZT_CLUSTERDEFINITION_HPP
+
+#ifdef ZT_ENABLE_CLUSTER
+
+#include <vector>
+#include <algorithm>
+
+#include "../node/Constants.hpp"
+#include "../node/Utils.hpp"
+#include "../osdep/OSUtils.hpp"
+
+namespace ZeroTier {
+
+/**
+ * Parser for cluster definition file
+ */
+class ClusterDefinition
+{
+public:
+	struct MemberDefinition
+	{
+		MemberDefinition() : id(0),x(0),y(0),z(0) { name[0] = (char)0; }
+
+		unsigned int id;
+		int x,y,z;
+		char name[256];
+		InetAddress clusterEndpoint;
+		std::vector<InetAddress> zeroTierEndpoints;
+	};
+
+	ClusterDefinition(uint64_t myAddress,const char *pathToClusterFile)
+	{
+		std::string cf;
+		if (!OSUtils::readFile(pathToClusterFile,cf))
+			return;
+
+		char myAddressStr[64];
+		Utils::snprintf(myAddressStr,sizeof(myAddressStr),"%.10llx",myAddress);
+
+		std::vector<std::string> lines(Utils::split(cf.c_str(),"\r\n","",""));
+		for(std::vector<std::string>::iterator l(lines.begin());l!=lines.end();++l) {
+			std::vector<std::string> fields(Utils::split(l->c_str()," \t","",""));
+			if ((fields.size() < 5)||(fields[0][0] == '#')||(fields[0] != myAddressStr))
+				continue;
+
+			int id = Utils::strToUInt(fields[1].c_str());
+			if ((id < 0)||(id > ZT_CLUSTER_MAX_MEMBERS))
+				continue;
+			MemberDefinition &md = _md[id];
+
+			md.id = (unsigned int)id;
+			if (fields.size() >= 6) {
+				std::vector<std::string> xyz(Utils::split(fields[5].c_str(),",","",""));
+				md.x = (xyz.size() > 0) ? Utils::strToInt(xyz[0].c_str()) : 0;
+				md.y = (xyz.size() > 1) ? Utils::strToInt(xyz[1].c_str()) : 0;
+				md.z = (xyz.size() > 2) ? Utils::strToInt(xyz[2].c_str()) : 0;
+			}
+			Utils::scopy(md.name,sizeof(md.name),fields[2].c_str());
+			md.clusterEndpoint.fromString(fields[3]);
+			if (!md.clusterEndpoint)
+				continue;
+			std::vector<std::string> zips(Utils::split(fields[4].c_str(),",","",""));
+			for(std::vector<std::string>::iterator zip(zips.begin());zip!=zips.end();++zip) {
+				InetAddress i;
+				i.fromString(*zip);
+				if (i)
+					md.zeroTierEndpoints.push_back(i);
+			}
+
+			_ids.push_back((unsigned int)id);
+		}
+
+		std::sort(_ids.begin(),_ids.end());
+	}
+
+	inline const MemberDefinition &operator[](unsigned int id) const throw() { return _md[id]; }
+	inline unsigned int size() const throw() { return (unsigned int)_ids.size(); }
+	inline const std::vector<unsigned int> &ids() const throw() { return _ids; }
+
+	inline std::vector<MemberDefinition> members() const
+	{
+		std::vector<MemberDefinition> m;
+		for(std::vector<unsigned int>::const_iterator i(_ids.begin());i!=_ids.end();++i)
+			m.push_back(_md[*i]);
+		return m;
+	}
+
+private:
+	MemberDefinition _md[ZT_CLUSTER_MAX_MEMBERS];
+	std::vector<unsigned int> _ids;
+};
+
+} // namespace ZeroTier
+
+#endif // ZT_ENABLE_CLUSTER
+
+#endif

+ 0 - 1
service/ClusterGeoIpService.cpp

@@ -165,7 +165,6 @@ void ClusterGeoIpService::threadMain()
 									{
 										Mutex::Lock _l2(_cache_m);
 										_cache[rip] = ce;
-										std::cout << ">> " << linebuf << std::endl;
 									}
 								}
 							}

+ 128 - 17
service/OneService.cpp

@@ -58,6 +58,8 @@
 
 #include "OneService.hpp"
 #include "ControlPlane.hpp"
+#include "ClusterGeoIpService.hpp"
+#include "ClusterDefinition.hpp"
 
 /**
  * Uncomment to enable UDP breakage switch
@@ -366,6 +368,11 @@ static int SnodeDataStorePutFunction(ZT_Node *node,void *uptr,const char *name,c
 static int SnodeWirePacketSendFunction(ZT_Node *node,void *uptr,const struct sockaddr_storage *localAddr,const struct sockaddr_storage *addr,const void *data,unsigned int len);
 static void SnodeVirtualNetworkFrameFunction(ZT_Node *node,void *uptr,uint64_t nwid,uint64_t sourceMac,uint64_t destMac,unsigned int etherType,unsigned int vlanId,const void *data,unsigned int len);
 
+#ifdef ZT_ENABLE_CLUSTER
+static void SclusterSendFunction(void *uptr,unsigned int toMemberId,const void *data,unsigned int len);
+static int SclusterGeoIpFunction(void *uptr,const struct sockaddr_storage *addr,int *x,int *y,int *z);
+#endif
+
 static void StapFrameHandler(void *uptr,uint64_t nwid,const MAC &from,const MAC &to,unsigned int etherType,unsigned int vlanId,const void *data,unsigned int len);
 
 static int ShttpOnMessageBegin(http_parser *parser);
@@ -419,26 +426,32 @@ class OneServiceImpl : public OneService
 {
 public:
 	OneServiceImpl(const char *hp,unsigned int port) :
-		_homePath((hp) ? hp : "."),
-		_tcpFallbackResolver(ZT_TCP_FALLBACK_RELAY),
+		_homePath((hp) ? hp : ".")
+		,_tcpFallbackResolver(ZT_TCP_FALLBACK_RELAY)
 #ifdef ZT_ENABLE_NETWORK_CONTROLLER
-		_controller((SqliteNetworkController *)0),
+		,_controller((SqliteNetworkController *)0)
 #endif
-		_phy(this,false,true),
-		_node((Node *)0),
-		_controlPlane((ControlPlane *)0),
-		_lastDirectReceiveFromGlobal(0),
-		_lastSendToGlobal(0),
-		_lastRestart(0),
-		_nextBackgroundTaskDeadline(0),
-		_tcpFallbackTunnel((TcpConnection *)0),
-		_termReason(ONE_STILL_RUNNING),
-		_port(0),
+		,_phy(this,false,true)
+		,_node((Node *)0)
+		,_controlPlane((ControlPlane *)0)
+		,_lastDirectReceiveFromGlobal(0)
+		,_lastSendToGlobal(0)
+		,_lastRestart(0)
+		,_nextBackgroundTaskDeadline(0)
+		,_tcpFallbackTunnel((TcpConnection *)0)
+		,_termReason(ONE_STILL_RUNNING)
+		,_port(0)
 #ifdef ZT_USE_MINIUPNPC
-		_v4UpnpUdpSocket((PhySocket *)0),
-		_upnpClient((UPNPClient *)0),
+		,_v4UpnpUdpSocket((PhySocket *)0)
+		,_upnpClient((UPNPClient *)0)
 #endif
-		_run(true)
+#ifdef ZT_ENABLE_CLUSTER
+		,_clusterMessageSocket((PhySocket *)0)
+		,_clusterGeoIpService((ClusterGeoIpService *)0)
+		,_clusterDefinition((ClusterDefinition *)0)
+		,_clusterMemberId(0)
+#endif
+		,_run(true)
 	{
 		const int portTrials = (port == 0) ? 256 : 1; // if port is 0, pick random
 		for(int k=0;k<portTrials;++k) {
@@ -510,12 +523,19 @@ public:
 		_phy.close(_v6UdpSocket);
 		_phy.close(_v4TcpListenSocket);
 		_phy.close(_v6TcpListenSocket);
+#ifdef ZT_ENABLE_CLUSTER
+		_phy.close(_clusterMessageSocket);
+#endif
 #ifdef ZT_USE_MINIUPNPC
 		_phy.close(_v4UpnpUdpSocket);
 		delete _upnpClient;
 #endif
 #ifdef ZT_ENABLE_NETWORK_CONTROLLER
 		delete _controller;
+#endif
+#ifdef ZT_ENABLE_CLUSTER
+		delete _clusterGeoIpService;
+		delete _clusterDefinition;
 #endif
 	}
 
@@ -556,6 +576,70 @@ public:
 			_node->setNetconfMaster((void *)_controller);
 #endif
 
+#ifdef ZT_ENABLE_CLUSTER
+		if (OSUtils::fileExists((_homePath + ZT_PATH_SEPARATOR_S + "cluster").c_str())) {
+			_clusterDefinition = new ClusterDefinition(_node->address(),(_homePath + ZT_PATH_SEPARATOR_S + "cluster").c_str());
+			if (_clusterDefinition->size() > 0) {
+				std::vector<ClusterDefinition::MemberDefinition> members(_clusterDefinition->members());
+				for(std::vector<ClusterDefinition::MemberDefinition>::iterator m(members.begin());m!=members.end();++m) {
+					PhySocket *cs = _phy.udpBind(reinterpret_cast<const struct sockaddr *>(&(m->clusterEndpoint)));
+					if (cs) {
+						if (_clusterMessageSocket) {
+							_phy.close(_clusterMessageSocket,false);
+							_phy.close(cs,false);
+
+							Mutex::Lock _l(_termReason_m);
+							_termReason = ONE_UNRECOVERABLE_ERROR;
+							_fatalErrorMessage = "Cluster: can't determine my cluster member ID: able to bind more than one cluster message socket IP/port!";
+							return _termReason;
+						}
+						_clusterMessageSocket = cs;
+						_clusterMemberId = m->id;
+					}
+				}
+
+				if (!_clusterMessageSocket) {
+					Mutex::Lock _l(_termReason_m);
+					_termReason = ONE_UNRECOVERABLE_ERROR;
+					_fatalErrorMessage = "Cluster: can't determine my cluster member ID: unable to bind to any cluster message socket IP/port.";
+					return _termReason;
+				}
+
+				if (OSUtils::fileExists((_homePath + ZT_PATH_SEPARATOR_S + "cluster-geo.exe").c_str()))
+					_clusterGeoIpService = new ClusterGeoIpService((_homePath + ZT_PATH_SEPARATOR_S + "cluster-geo.exe").c_str());
+
+				const ClusterDefinition::MemberDefinition &me = (*_clusterDefinition)[_clusterMemberId];
+				InetAddress endpoints[255];
+				unsigned int numEndpoints = 0;
+				for(std::vector<InetAddress>::const_iterator i(me.zeroTierEndpoints.begin());i!=me.zeroTierEndpoints.end();++i)
+					endpoints[numEndpoints++] = *i;
+
+				if (_node->clusterInit(
+					_clusterMemberId,
+					reinterpret_cast<const struct sockaddr_storage *>(endpoints),
+					numEndpoints,
+					me.x,
+					me.y,
+					me.z,
+					&SclusterSendFunction,
+					this,
+					(_clusterGeoIpService) ? &SclusterGeoIpFunction : 0,
+					this) == ZT_RESULT_OK) {
+
+					std::vector<ClusterDefinition::MemberDefinition> members(_clusterDefinition->members());
+					for(std::vector<ClusterDefinition::MemberDefinition>::iterator m(members.begin());m!=members.end();++m) {
+						if (m->id != _clusterMemberId)
+							_node->clusterAddMember(m->id);
+					}
+
+				}
+			} else {
+				delete _clusterDefinition;
+				_clusterDefinition = (ClusterDefinition *)0;
+			}
+		}
+#endif
+
 			_controlPlane = new ControlPlane(this,_node,(_homePath + ZT_PATH_SEPARATOR_S + "ui").c_str());
 			_controlPlane->addAuthToken(authToken.c_str());
 
@@ -781,10 +865,18 @@ public:
 
 	inline void phyOnDatagram(PhySocket *sock,void **uptr,const struct sockaddr *from,void *data,unsigned long len)
 	{
+#ifdef ZT_ENABLE_CLUSTER
+		if (sock == _clusterMessageSocket) {
+			_node->clusterHandleIncomingMessage(data,len);
+			return;
+		}
+#endif
+
 #ifdef ZT_BREAK_UDP
 		if (OSUtils::fileExists("/tmp/ZT_BREAK_UDP"))
 			return;
 #endif
+
 		if ((len >= 16)&&(reinterpret_cast<const InetAddress *>(from)->ipScope() == InetAddress::IP_SCOPE_GLOBAL))
 			_lastDirectReceiveFromGlobal = OSUtils::now();
 		ZT_ResultCode rc = _node->processWirePacket(
@@ -1303,7 +1395,6 @@ public:
 			_phy.close(tc->sock); // will call close handler, which deletes from _tcpConnections
 	}
 
-private:
 	std::string _dataStorePrepPath(const char *name) const
 	{
 		std::string p(_homePath);
@@ -1358,6 +1449,13 @@ private:
 	UPNPClient *_upnpClient;
 #endif
 
+#ifdef ZT_ENABLE_CLUSTER
+	PhySocket *_clusterMessageSocket;
+	ClusterGeoIpService *_clusterGeoIpService;
+	ClusterDefinition *_clusterDefinition;
+	unsigned int _clusterMemberId;
+#endif
+
 	bool _run;
 	Mutex _run_m;
 };
@@ -1375,6 +1473,19 @@ static int SnodeWirePacketSendFunction(ZT_Node *node,void *uptr,const struct soc
 static void SnodeVirtualNetworkFrameFunction(ZT_Node *node,void *uptr,uint64_t nwid,uint64_t sourceMac,uint64_t destMac,unsigned int etherType,unsigned int vlanId,const void *data,unsigned int len)
 { reinterpret_cast<OneServiceImpl *>(uptr)->nodeVirtualNetworkFrameFunction(nwid,sourceMac,destMac,etherType,vlanId,data,len); }
 
+static void SclusterSendFunction(void *uptr,unsigned int toMemberId,const void *data,unsigned int len)
+{
+	OneServiceImpl *const impl = reinterpret_cast<OneServiceImpl *>(uptr);
+	const ClusterDefinition::MemberDefinition &md = (*(impl->_clusterDefinition))[toMemberId];
+	if (md.clusterEndpoint)
+		impl->_phy.udpSend(impl->_clusterMessageSocket,reinterpret_cast<const struct sockaddr *>(&(md.clusterEndpoint)),data,len);
+}
+static int SclusterGeoIpFunction(void *uptr,const struct sockaddr_storage *addr,int *x,int *y,int *z)
+{
+	OneServiceImpl *const impl = reinterpret_cast<OneServiceImpl *>(uptr);
+	return (int)(impl->_clusterGeoIpService->locate(*(reinterpret_cast<const InetAddress *>(addr)),*x,*y,*z));
+}
+
 static void StapFrameHandler(void *uptr,uint64_t nwid,const MAC &from,const MAC &to,unsigned int etherType,unsigned int vlanId,const void *data,unsigned int len)
 { reinterpret_cast<OneServiceImpl *>(uptr)->tapFrameHandler(nwid,from,to,etherType,vlanId,data,len); }
 

+ 3 - 0
service/OneService.hpp

@@ -43,6 +43,9 @@ namespace ZeroTier {
  * periodically checked and updates are automatically downloaded, verified
  * against a built-in list of update signing keys, and installed. This is
  * only supported for certain platforms.
+ *
+ * If built with ZT_ENABLE_CLUSTER, a 'cluster' file is checked and if
+ * present is read to determine the identity of other cluster members.
  */
 class OneService
 {