Browse Source

Replace the cluster-geo subprocess with an in-memory CSV of GeoIP data. This is faster, cheaper, and more reliable. We use https://db-ip.com/, but other GeoIP CSV databases would work too.

Adam Ierymenko 9 years ago
parent
commit
587b1e05d1
4 changed files with 120 additions and 44 deletions
  1. 48 2
      service/ClusterDefinition.hpp
  2. 42 8
      service/ClusterGeoIpService.cpp
  3. 28 14
      service/ClusterGeoIpService.hpp
  4. 2 20
      service/OneService.cpp

+ 48 - 2
service/ClusterDefinition.hpp

@@ -26,14 +26,17 @@
 
 #include "../node/Constants.hpp"
 #include "../node/Utils.hpp"
+#include "../node/NonCopyable.hpp"
 #include "../osdep/OSUtils.hpp"
 
+#include "ClusterGeoIpService.hpp"
+
 namespace ZeroTier {
 
 /**
  * Parser for cluster definition file
  */
-class ClusterDefinition
+class ClusterDefinition : NonCopyable
 {
 public:
 	struct MemberDefinition
@@ -45,8 +48,17 @@ public:
 		char name[256];
 		InetAddress clusterEndpoint;
 		std::vector<InetAddress> zeroTierEndpoints;
+
+		//inline operator<(const MemberDefinition &md) const { return (id < md.id); } // sort order
 	};
 
+	/**
+	 * Load and initialize cluster definition and GeoIP data if any
+	 *
+	 * @param myAddress My ZeroTier address
+	 * @param pathToClusterFile Path to cluster definition file
+	 * @throws std::runtime_error Invalid cluster definition or unable to load data
+	 */
 	ClusterDefinition(uint64_t myAddress,const char *pathToClusterFile)
 	{
 		std::string cf;
@@ -62,9 +74,23 @@ public:
 			if ((fields.size() < 5)||(fields[0][0] == '#')||(fields[0] != myAddressStr))
 				continue;
 
+			// <address> geo <CSV path> <ip start column> <ip end column> <latitude column> <longitude column>
+			if (fields[1] == "geo") {
+				if ((fields.size() >= 7)&&(OSUtils::fileExists(fields[2].c_str()))) {
+					int ipStartColumn = Utils::strToInt(fields[3].c_str());
+					int ipEndColumn = Utils::strToInt(fields[4].c_str());
+					int latitudeColumn = Utils::strToInt(fields[5].c_str());
+					int longitudeColumn = Utils::strToInt(fields[6].c_str());
+					if (_geo.load(fields[2].c_str(),ipStartColumn,ipEndColumn,latitudeColumn,longitudeColumn) <= 0)
+						throw std::runtime_error(std::string("failed to load geo-ip data from ")+fields[2]);
+				}
+				continue;
+			}
+
+			// <address> <ID> <name> <backplane IP/port(s)> <ZT frontplane IP/port(s)> <x,y,z>
 			int id = Utils::strToUInt(fields[1].c_str());
 			if ((id < 0)||(id > ZT_CLUSTER_MAX_MEMBERS))
-				continue;
+				throw std::runtime_error(std::string("invalid cluster member ID: ")+fields[1]);
 			MemberDefinition &md = _md[id];
 
 			md.id = (unsigned int)id;
@@ -92,10 +118,29 @@ public:
 		std::sort(_ids.begin(),_ids.end());
 	}
 
+	/**
+	 * @return Member definition for the given cluster member ID (ID is array index)
+	 */
 	inline const MemberDefinition &operator[](unsigned int id) const throw() { return _md[id]; }
+
+	/**
+	 * @return Number of members in this cluster
+	 */
 	inline unsigned int size() const throw() { return (unsigned int)_ids.size(); }
+
+	/**
+	 * @return IDs of members in this cluster sorted by ID
+	 */
 	inline const std::vector<unsigned int> &ids() const throw() { return _ids; }
 
+	/**
+	 * @return GeoIP service for this cluster
+	 */
+	inline ClusterGeoIpService &geo() throw() { return _geo; }
+
+	/**
+	 * @return A vector (new copy) containing all cluster members
+	 */
 	inline std::vector<MemberDefinition> members() const
 	{
 		std::vector<MemberDefinition> m;
@@ -107,6 +152,7 @@ public:
 private:
 	MemberDefinition _md[ZT_CLUSTER_MAX_MEMBERS];
 	std::vector<unsigned int> _ids;
+	ClusterGeoIpService _geo;
 };
 
 } // namespace ZeroTier

+ 42 - 8
service/ClusterGeoIpService.cpp

@@ -25,7 +25,6 @@
 #include "ClusterGeoIpService.hpp"
 
 #include "../node/Utils.hpp"
-#include "../node/InetAddress.hpp"
 #include "../osdep/OSUtils.hpp"
 
 #define ZT_CLUSTERGEOIPSERVICE_FILE_MODIFICATION_CHECK_EVERY 10000
@@ -69,12 +68,13 @@ bool ClusterGeoIpService::locate(const InetAddress &ip,int &x,int &y,int &z)
 		std::vector<_V4E>::const_iterator i(std::upper_bound(_v4db.begin(),_v4db.end(),key));
 		while (i != _v4db.begin()) {
 			--i;
-			if ((key->start >= i->start)&&(key->start <= i->end)) {
+			if ((key.start >= i->start)&&(key.start <= i->end)) {
 				x = i->x;
 				y = i->y;
 				z = i->z;
+				//printf("%s : %f,%f %d,%d,%d\n",ip.toIpString().c_str(),i->lat,i->lon,x,y,z);
 				return true;
-			} else if ((key->start > i->start)&&(key->start > i->end))
+			} else if ((key.start > i->start)&&(key.start > i->end))
 				break;
 		}
 	} else if ((ip.ss_family == AF_INET6)&&(_v6db.size() > 0)) {
@@ -83,12 +83,13 @@ bool ClusterGeoIpService::locate(const InetAddress &ip,int &x,int &y,int &z)
 		std::vector<_V6E>::const_iterator i(std::upper_bound(_v6db.begin(),_v6db.end(),key));
 		while (i != _v6db.begin()) {
 			--i;
-			const int s_vs_s = memcmp(key->start,i->start,16);
-			const int s_vs_e = memcmp(key->start,i->end,16);
+			const int s_vs_s = memcmp(key.start,i->start,16);
+			const int s_vs_e = memcmp(key.start,i->end,16);
 			if ((s_vs_s >= 0)&&(s_vs_e <= 0)) {
 				x = i->x;
 				y = i->y;
 				z = i->z;
+				//printf("%s : %f,%f %d,%d,%d\n",ip.toIpString().c_str(),i->lat,i->lon,x,y,z);
 				return true;
 			} else if ((s_vs_s > 0)&&(s_vs_e > 0))
 				break;
@@ -98,7 +99,7 @@ bool ClusterGeoIpService::locate(const InetAddress &ip,int &x,int &y,int &z)
 	return false;
 }
 
-static void _parseLine(const char *line,std::vector<_V4E> &v4db,std::vector<_V6E> &v6db,int ipStartColumn,int ipEndColumn,int latitudeColumn,int longitudeColumn)
+void ClusterGeoIpService::_parseLine(const char *line,std::vector<_V4E> &v4db,std::vector<_V6E> &v6db,int ipStartColumn,int ipEndColumn,int latitudeColumn,int longitudeColumn)
 {
 	std::vector<std::string> ls(Utils::split(line,",\t","\\","\"'"));
 	if ( ((ipStartColumn >= 0)&&(ipStartColumn < (int)ls.size()))&&
@@ -114,24 +115,30 @@ static void _parseLine(const char *line,std::vector<_V4E> &v4db,std::vector<_V6E
 			const double latRadians = lat * 0.01745329251994; // PI / 180
 			const double lonRadians = lon * 0.01745329251994; // PI / 180
 			const double cosLat = cos(latRadians);
-			const int x = (int)round((-6371.0) * cosLat * Math.cos(lonRadians)); // 6371 == Earth's approximate radius in kilometers
+			const int x = (int)round((-6371.0) * cosLat * cos(lonRadians)); // 6371 == Earth's approximate radius in kilometers
 			const int y = (int)round(6371.0 * sin(latRadians));
-			const int z = (int)round(6371.0 * cosLat * Math.sin(lonRadians));
+			const int z = (int)round(6371.0 * cosLat * sin(lonRadians));
 
 			if (ipStart.ss_family == AF_INET) {
 				v4db.push_back(_V4E());
 				v4db.back().start = Utils::ntoh((uint32_t)(reinterpret_cast<const struct sockaddr_in *>(&ipStart)->sin_addr.s_addr));
 				v4db.back().end = Utils::ntoh((uint32_t)(reinterpret_cast<const struct sockaddr_in *>(&ipEnd)->sin_addr.s_addr));
+				//v4db.back().lat = (float)lat;
+				//v4db.back().lon = (float)lon;
 				v4db.back().x = x;
 				v4db.back().y = y;
 				v4db.back().z = z;
+				//printf("%s - %s : %d,%d,%d\n",ipStart.toIpString().c_str(),ipEnd.toIpString().c_str(),x,y,z);
 			} else if (ipStart.ss_family == AF_INET6) {
 				v6db.push_back(_V6E());
 				memcpy(v6db.back().start,reinterpret_cast<const struct sockaddr_in6 *>(&ipStart)->sin6_addr.s6_addr,16);
 				memcpy(v6db.back().end,reinterpret_cast<const struct sockaddr_in6 *>(&ipEnd)->sin6_addr.s6_addr,16);
+				//v6db.back().lat = (float)lat;
+				//v6db.back().lon = (float)lon;
 				v6db.back().x = x;
 				v6db.back().y = y;
 				v6db.back().z = z;
+				//printf("%s - %s : %d,%d,%d\n",ipStart.toIpString().c_str(),ipEnd.toIpString().c_str(),x,y,z);
 			}
 		}
 	}
@@ -147,6 +154,8 @@ long ClusterGeoIpService::_load(const char *pathToCsv,int ipStartColumn,int ipEn
 
 	std::vector<_V4E> v4db;
 	std::vector<_V6E> v6db;
+	v4db.reserve(16777216);
+	v6db.reserve(16777216);
 
 	char buf[4096];
 	char linebuf[1024];
@@ -199,3 +208,28 @@ long ClusterGeoIpService::_load(const char *pathToCsv,int ipStartColumn,int ipEn
 } // namespace ZeroTier
 
 #endif // ZT_ENABLE_CLUSTER
+
+/*
+int main(int argc,char **argv)
+{
+	char buf[1024];
+
+	ZeroTier::ClusterGeoIpService gip;
+	printf("loading...\n");
+	gip.load("/Users/api/Code/ZeroTier/Infrastructure/root-servers/zerotier-one/cluster-geoip.csv",0,1,5,6);
+	printf("... done!\n"); fflush(stdout);
+
+	while (gets(buf)) { // unsafe, testing only
+		ZeroTier::InetAddress addr(buf,0);
+		printf("looking up: %s\n",addr.toString().c_str()); fflush(stdout);
+		int x = 0,y = 0,z = 0;
+		if (gip.locate(addr,x,y,z)) {
+			//printf("%s: %d,%d,%d\n",addr.toString().c_str(),x,y,z); fflush(stdout);
+		} else {
+			printf("%s: not found!\n",addr.toString().c_str()); fflush(stdout);
+		}
+	}
+
+	return 0;
+}
+*/

+ 28 - 14
service/ClusterGeoIpService.hpp

@@ -32,11 +32,13 @@
 
 #include "../node/Constants.hpp"
 #include "../node/Mutex.hpp"
+#include "../node/NonCopyable.hpp"
+#include "../node/InetAddress.hpp"
 
 namespace ZeroTier {
 
 /**
- * Loads a DBIP CSV into memory for fast lookup, reloading as needed
+ * Loads a GeoIP CSV into memory for fast lookup, reloading as needed
  *
  * This was designed around the CSV from https://db-ip.com but can be used
  * with any similar GeoIP CSV database that is presented in the form of an
@@ -45,7 +47,7 @@ namespace ZeroTier {
  * It loads the whole database into memory, which can be kind of large. If
  * the CSV file changes, the changes are loaded automatically.
  */
-class ClusterGeoIpService
+class ClusterGeoIpService : NonCopyable
 {
 public:
 	ClusterGeoIpService();
@@ -85,23 +87,21 @@ public:
 	 */
 	bool locate(const InetAddress &ip,int &x,int &y,int &z);
 
-private:
-	long _load(const char *pathToCsv,int ipStartColumn,int ipEndColumn,int latitudeColumn,int longitudeColumn);
-
-	std::string _pathToCsv;
-	int _ipStartColumn;
-	int _ipEndColumn;
-	int _latitudeColumn;
-	int _longitudeColumn;
-
-	uint64_t _lastFileCheckTime;
-	uint64_t _csvModificationTime;
-	int64_t _csvFileSize;
+	/**
+	 * @return True if IP database/service is available for queries (otherwise locate() will always be false)
+	 */
+	inline bool available() const
+	{
+		Mutex::Lock _l(_lock);
+		return ((_v4db.size() + _v6db.size()) > 0);
+	}
 
+private:
 	struct _V4E
 	{
 		uint32_t start;
 		uint32_t end;
+		//float lat,lon;
 		int x,y,z;
 
 		inline bool operator<(const _V4E &e) const { return (start < e.start); }
@@ -111,11 +111,25 @@ private:
 	{
 		uint8_t start[16];
 		uint8_t end[16];
+		//float lat,lon;
 		int x,y,z;
 
 		inline bool operator<(const _V6E &e) const { return (memcmp(start,e.start,16) < 0); }
 	};
 
+	static void _parseLine(const char *line,std::vector<_V4E> &v4db,std::vector<_V6E> &v6db,int ipStartColumn,int ipEndColumn,int latitudeColumn,int longitudeColumn);
+	long _load(const char *pathToCsv,int ipStartColumn,int ipEndColumn,int latitudeColumn,int longitudeColumn);
+
+	std::string _pathToCsv;
+	int _ipStartColumn;
+	int _ipEndColumn;
+	int _latitudeColumn;
+	int _longitudeColumn;
+
+	uint64_t _lastFileCheckTime;
+	uint64_t _csvModificationTime;
+	int64_t _csvFileSize;
+
 	std::vector<_V4E> _v4db;
 	std::vector<_V6E> _v6db;
 

+ 2 - 20
service/OneService.cpp

@@ -520,7 +520,6 @@ public:
 	// Cluster management instance if enabled
 #ifdef ZT_ENABLE_CLUSTER
 	PhySocket *_clusterMessageSocket;
-	ClusterGeoIpService *_clusterGeoIpService;
 	ClusterDefinition *_clusterDefinition;
 	unsigned int _clusterMemberId;
 #endif
@@ -553,7 +552,6 @@ public:
 #endif
 #ifdef ZT_ENABLE_CLUSTER
 		,_clusterMessageSocket((PhySocket *)0)
-		,_clusterGeoIpService((ClusterGeoIpService *)0)
 		,_clusterDefinition((ClusterDefinition *)0)
 		,_clusterMemberId(0)
 #endif
@@ -633,7 +631,6 @@ public:
 		delete _controller;
 #endif
 #ifdef ZT_ENABLE_CLUSTER
-		delete _clusterGeoIpService;
 		delete _clusterDefinition;
 #endif
 	}
@@ -750,33 +747,18 @@ public:
 						return _termReason;
 					}
 
-					if (OSUtils::fileExists((_homePath + ZT_PATH_SEPARATOR_S + "cluster-geo.exe").c_str()))
-						_clusterGeoIpService = new ClusterGeoIpService((_homePath + ZT_PATH_SEPARATOR_S + "cluster-geo.exe").c_str());
-
 					const ClusterDefinition::MemberDefinition &me = (*_clusterDefinition)[_clusterMemberId];
 					InetAddress endpoints[255];
 					unsigned int numEndpoints = 0;
 					for(std::vector<InetAddress>::const_iterator i(me.zeroTierEndpoints.begin());i!=me.zeroTierEndpoints.end();++i)
 						endpoints[numEndpoints++] = *i;
 
-					if (_node->clusterInit(
-						_clusterMemberId,
-						reinterpret_cast<const struct sockaddr_storage *>(endpoints),
-						numEndpoints,
-						me.x,
-						me.y,
-						me.z,
-						&SclusterSendFunction,
-						this,
-						(_clusterGeoIpService) ? &SclusterGeoIpFunction : 0,
-						this) == ZT_RESULT_OK) {
-
+					if (_node->clusterInit(_clusterMemberId,reinterpret_cast<const struct sockaddr_storage *>(endpoints),numEndpoints,me.x,me.y,me.z,&SclusterSendFunction,this,_clusterDefinition->geo().available() ? &SclusterGeoIpFunction : 0,this) == ZT_RESULT_OK) {
 						std::vector<ClusterDefinition::MemberDefinition> members(_clusterDefinition->members());
 						for(std::vector<ClusterDefinition::MemberDefinition>::iterator m(members.begin());m!=members.end();++m) {
 							if (m->id != _clusterMemberId)
 								_node->clusterAddMember(m->id);
 						}
-
 					}
 				} else {
 					delete _clusterDefinition;
@@ -1611,7 +1593,7 @@ static void SclusterSendFunction(void *uptr,unsigned int toMemberId,const void *
 static int SclusterGeoIpFunction(void *uptr,const struct sockaddr_storage *addr,int *x,int *y,int *z)
 {
 	OneServiceImpl *const impl = reinterpret_cast<OneServiceImpl *>(uptr);
-	return (int)(impl->_clusterGeoIpService->locate(*(reinterpret_cast<const InetAddress *>(addr)),*x,*y,*z));
+	return (int)(impl->_clusterDefinition->geo().locate(*(reinterpret_cast<const InetAddress *>(addr)),*x,*y,*z));
 }
 #endif