Browse Source

Update how controller handles circuit tests -- save results to filesystem.

Adam Ierymenko 8 years ago
parent
commit
a577b8d381

+ 65 - 72
controller/EmbeddedNetworkController.cpp

@@ -428,9 +428,9 @@ static bool _parseRule(json &r,ZT_VirtualNetworkRule &rule)
 	return false;
 }
 
-EmbeddedNetworkController::EmbeddedNetworkController(Node *node,const char *dbPath,FILE *feed) :
+EmbeddedNetworkController::EmbeddedNetworkController(Node *node,const char *dbPath) :
 	_threadsStarted(false),
-	_db(dbPath,feed),
+	_db(dbPath),
 	_node(node)
 {
 	OSUtils::mkdir(dbPath);
@@ -546,21 +546,6 @@ unsigned int EmbeddedNetworkController::handleControlPlaneHttpGET(
 						return 200;
 					}
 
-				} else if ((path[2] == "test")&&(path.size() >= 4)) {
-
-					Mutex::Lock _l(_circuitTests_m);
-					std::map< uint64_t,_CircuitTestEntry >::iterator cte(_circuitTests.find(Utils::hexStrToU64(path[3].c_str())));
-					if ((cte != _circuitTests.end())&&(cte->second.test)) {
-
-						responseBody = "[";
-						responseBody.append(cte->second.jsonResults);
-						responseBody.push_back(']');
-						responseContentType = "application/json";
-
-						return 200;
-
-					} // else 404
-
 				} // else 404
 
 			} else {
@@ -755,9 +740,10 @@ unsigned int EmbeddedNetworkController::handleControlPlaneHttpPOST(
 					return 200;
 				} else if ((path.size() == 3)&&(path[2] == "test")) {
 
-					Mutex::Lock _l(_circuitTests_m);
+					Mutex::Lock _l(_tests_m);
 
-					ZT_CircuitTest *test = (ZT_CircuitTest *)malloc(sizeof(ZT_CircuitTest));
+					_tests.push_back(ZT_CircuitTest());
+					ZT_CircuitTest *const test = &(_tests.back());
 					memset(test,0,sizeof(ZT_CircuitTest));
 
 					Utils::getSecureRandom(&(test->testId),sizeof(test->testId));
@@ -781,7 +767,7 @@ unsigned int EmbeddedNetworkController::handleControlPlaneHttpPOST(
 					test->reportAtEveryHop = (OSUtils::jsonBool(b["reportAtEveryHop"],true) ? 1 : 0);
 
 					if (!test->hopCount) {
-						::free((void *)test);
+						_tests.pop_back();
 						responseBody = "{ \"message\": \"a test must contain at least one hop\" }";
 						responseContentType = "application/json";
 						return 400;
@@ -789,18 +775,18 @@ unsigned int EmbeddedNetworkController::handleControlPlaneHttpPOST(
 
 					test->timestamp = OSUtils::now();
 
-					_CircuitTestEntry &te = _circuitTests[test->testId];
-					te.test = test;
-					te.jsonResults = "";
-
-					if (_node)
+					if (_node) {
 						_node->circuitTestBegin(test,&(EmbeddedNetworkController::_circuitTestCallback));
-					else return 500;
+					} else {
+						_tests.pop_back();
+						return 500;
+					}
 
 					char json[1024];
 					Utils::snprintf(json,sizeof(json),"{\"testId\":\"%.16llx\"}",test->testId);
 					responseBody = json;
 					responseContentType = "application/json";
+
 					return 200;
 
 				} // else 404
@@ -1137,62 +1123,67 @@ unsigned int EmbeddedNetworkController::handleControlPlaneHttpDELETE(
 void EmbeddedNetworkController::threadMain()
 	throw()
 {
+	uint64_t lastCircuitTestCheck = 0;
 	for(;;) {
-		_RQEntry *const qe = _queue.get();
+		_RQEntry *const qe = _queue.get(); // waits on next request
 		if (!qe) break; // enqueue a NULL to terminate threads
 		try {
 			_request(qe->nwid,qe->fromAddr,qe->requestPacketId,qe->identity,qe->metaData);
 		} catch ( ... ) {}
 		delete qe;
+
+		uint64_t now = OSUtils::now();
+		if ((now - lastCircuitTestCheck) > ZT_EMBEDDEDNETWORKCONTROLLER_CIRCUIT_TEST_EXPIRATION) {
+			lastCircuitTestCheck = now;
+			Mutex::Lock _l(_tests_m);
+			for(std::list< ZT_CircuitTest >::iterator i(_tests.begin());i!=_tests.end();) {
+				if ((now - i->timestamp) > ZT_EMBEDDEDNETWORKCONTROLLER_CIRCUIT_TEST_EXPIRATION) {
+					_node->circuitTestEnd(&(*i));
+					_tests.erase(i++);
+				} else ++i;
+			}
+		}
 	}
 }
 
 void EmbeddedNetworkController::_circuitTestCallback(ZT_Node *node,ZT_CircuitTest *test,const ZT_CircuitTestReport *report)
 {
-	char tmp[65535];
+	char tmp[1024],id[128];
 	EmbeddedNetworkController *const self = reinterpret_cast<EmbeddedNetworkController *>(test->ptr);
 
-	if (!test)
-		return;
-	if (!report)
-		return;
-
-	Mutex::Lock _l(self->_circuitTests_m);
-	std::map< uint64_t,_CircuitTestEntry >::iterator cte(self->_circuitTests.find(test->testId));
-
-	if (cte == self->_circuitTests.end()) { // sanity check: a circuit test we didn't launch?
-		self->_node->circuitTestEnd(test);
-		::free((void *)test);
-		return;
-	}
+	if ((!test)||(!report)||(!test->credentialNetworkId)) return; // sanity check
 
+	const uint64_t now = OSUtils::now();
+	Utils::snprintf(id,sizeof(id),"network/%.16llx/test/%.16llx-%.16llx-%.10llx-%.10llx",test->credentialNetworkId,test->testId,now,report->upstream,report->current);
 	Utils::snprintf(tmp,sizeof(tmp),
-		"%s{\n"
-		"\t\"timestamp\": %llu," ZT_EOL_S
-		"\t\"testId\": \"%.16llx\"," ZT_EOL_S
-		"\t\"upstream\": \"%.10llx\"," ZT_EOL_S
-		"\t\"current\": \"%.10llx\"," ZT_EOL_S
-		"\t\"receivedTimestamp\": %llu," ZT_EOL_S
-		"\t\"sourcePacketId\": \"%.16llx\"," ZT_EOL_S
-		"\t\"flags\": %llu," ZT_EOL_S
-		"\t\"sourcePacketHopCount\": %u," ZT_EOL_S
-		"\t\"errorCode\": %u," ZT_EOL_S
-		"\t\"vendor\": %d," ZT_EOL_S
-		"\t\"protocolVersion\": %u," ZT_EOL_S
-		"\t\"majorVersion\": %u," ZT_EOL_S
-		"\t\"minorVersion\": %u," ZT_EOL_S
-		"\t\"revision\": %u," ZT_EOL_S
-		"\t\"platform\": %d," ZT_EOL_S
-		"\t\"architecture\": %d," ZT_EOL_S
-		"\t\"receivedOnLocalAddress\": \"%s\"," ZT_EOL_S
-		"\t\"receivedFromRemoteAddress\": \"%s\"" ZT_EOL_S
-		"}",
-		((cte->second.jsonResults.length() > 0) ? ",\n" : ""),
-		(unsigned long long)report->timestamp,
+		"{\"id\": \"%s\","
+		"\"timestamp\": %llu,"
+		"\"networkId\": \"%.16llx\","
+		"\"testId\": \"%.16llx\","
+		"\"upstream\": \"%.10llx\","
+		"\"current\": \"%.10llx\","
+		"\"receivedTimestamp\": %llu,"
+		"\"sourcePacketId\": \"%.16llx\","
+		"\"flags\": %llu,"
+		"\"sourcePacketHopCount\": %u,"
+		"\"errorCode\": %u,"
+		"\"vendor\": %d,"
+		"\"protocolVersion\": %u,"
+		"\"majorVersion\": %u,"
+		"\"minorVersion\": %u,"
+		"\"revision\": %u,"
+		"\"platform\": %d,"
+		"\"architecture\": %d,"
+		"\"receivedOnLocalAddress\": \"%s\","
+		"\"receivedFromRemoteAddress\": \"%s\","
+		"\"receivedFromLinkQuality\": %f}",
+		id + 30, // last bit only, not leading path
+		(unsigned long long)test->timestamp,
+		(unsigned long long)test->credentialNetworkId,
 		(unsigned long long)test->testId,
 		(unsigned long long)report->upstream,
 		(unsigned long long)report->current,
-		(unsigned long long)OSUtils::now(),
+		(unsigned long long)now,
 		(unsigned long long)report->sourcePacketId,
 		(unsigned long long)report->flags,
 		report->sourcePacketHopCount,
@@ -1205,9 +1196,11 @@ void EmbeddedNetworkController::_circuitTestCallback(ZT_Node *node,ZT_CircuitTes
 		(int)report->platform,
 		(int)report->architecture,
 		reinterpret_cast<const InetAddress *>(&(report->receivedOnLocalAddress))->toString().c_str(),
-		reinterpret_cast<const InetAddress *>(&(report->receivedFromRemoteAddress))->toString().c_str());
+		reinterpret_cast<const InetAddress *>(&(report->receivedFromRemoteAddress))->toString().c_str(),
+		((double)report->receivedFromLinkQuality / (double)ZT_PATH_LINK_QUALITY_MAX));
 
-	cte->second.jsonResults.append(tmp);
+	Mutex::Lock _l(self->_db_m);
+	self->_db.writeRaw(id,std::string(tmp));
 }
 
 void EmbeddedNetworkController::_request(
@@ -1354,12 +1347,12 @@ void EmbeddedNetworkController::_request(
 	if (requestPacketId) { // only log if this is a request, not for generated pushes
 		json rlEntry = json::object();
 		rlEntry["ts"] = now;
-		rlEntry["authorized"] = (authorizedBy) ? true : false;
-		rlEntry["authorizedBy"] = (authorizedBy) ? authorizedBy : "";
-		rlEntry["clientMajorVersion"] = metaData.getUI(ZT_NETWORKCONFIG_REQUEST_METADATA_KEY_NODE_MAJOR_VERSION,0);
-		rlEntry["clientMinorVersion"] = metaData.getUI(ZT_NETWORKCONFIG_REQUEST_METADATA_KEY_NODE_MINOR_VERSION,0);
-		rlEntry["clientRevision"] = metaData.getUI(ZT_NETWORKCONFIG_REQUEST_METADATA_KEY_NODE_REVISION,0);
-		rlEntry["clientProtocolVersion"] = metaData.getUI(ZT_NETWORKCONFIG_REQUEST_METADATA_KEY_PROTOCOL_VERSION,0);
+		rlEntry["auth"] = (authorizedBy) ? true : false;
+		rlEntry["authBy"] = (authorizedBy) ? authorizedBy : "";
+		rlEntry["vMajor"] = metaData.getUI(ZT_NETWORKCONFIG_REQUEST_METADATA_KEY_NODE_MAJOR_VERSION,0);
+		rlEntry["vMinor"] = metaData.getUI(ZT_NETWORKCONFIG_REQUEST_METADATA_KEY_NODE_MINOR_VERSION,0);
+		rlEntry["vRev"] = metaData.getUI(ZT_NETWORKCONFIG_REQUEST_METADATA_KEY_NODE_REVISION,0);
+		rlEntry["vProto"] = metaData.getUI(ZT_NETWORKCONFIG_REQUEST_METADATA_KEY_PROTOCOL_VERSION,0);
 		if (fromAddr)
 			rlEntry["fromAddr"] = fromAddr.toString();
 

+ 6 - 9
controller/EmbeddedNetworkController.hpp

@@ -46,6 +46,9 @@
 // Number of background threads to start -- not actually started until needed
 #define ZT_EMBEDDEDNETWORKCONTROLLER_BACKGROUND_THREAD_COUNT 2
 
+// TTL for circuit tests
+#define ZT_EMBEDDEDNETWORKCONTROLLER_CIRCUIT_TEST_EXPIRATION 120000
+
 namespace ZeroTier {
 
 class Node;
@@ -56,9 +59,8 @@ public:
 	/**
 	 * @param node Parent node
 	 * @param dbPath Path to store data
-	 * @param feed FILE to send feed of all data and changes to (zero-delimited JSON objects) or NULL for none
 	 */
-	EmbeddedNetworkController(Node *node,const char *dbPath,FILE *feed);
+	EmbeddedNetworkController(Node *node,const char *dbPath);
 	virtual ~EmbeddedNetworkController();
 
 	virtual void init(const Identity &signingId,Sender *sender);
@@ -199,13 +201,8 @@ private:
 	NetworkController::Sender *_sender;
 	Identity _signingId;
 
-	struct _CircuitTestEntry
-	{
-		ZT_CircuitTest *test;
-		std::string jsonResults;
-	};
-	std::map< uint64_t,_CircuitTestEntry > _circuitTests;
-	Mutex _circuitTests_m;
+	std::list< ZT_CircuitTest > _tests;
+	Mutex _tests_m;
 
 	std::map< std::pair<uint64_t,uint64_t>,uint64_t > _lastRequestTime;
 	Mutex _lastRequestTime_m;

+ 16 - 9
controller/JSONDB.cpp

@@ -22,6 +22,22 @@ namespace ZeroTier {
 
 static const nlohmann::json _EMPTY_JSON(nlohmann::json::object());
 
+bool JSONDB::writeRaw(const std::string &n,const std::string &obj)
+{
+	if (!_isValidObjectName(n))
+		return false;
+
+	const std::string path(_genPath(n,true));
+	if (!path.length())
+		return false;
+
+	const std::string buf(obj);
+	if (!OSUtils::writeFile(path.c_str(),buf))
+		return false;
+
+	return true;
+}
+
 bool JSONDB::put(const std::string &n,const nlohmann::json &obj)
 {
 	if (!_isValidObjectName(n))
@@ -35,9 +51,6 @@ bool JSONDB::put(const std::string &n,const nlohmann::json &obj)
 	if (!OSUtils::writeFile(path.c_str(),buf))
 		return false;
 
-	if (_feed)
-		fwrite(buf.c_str(),buf.length()+1,1,_feed);
-
 	_E &e = _db[n];
 	e.obj = obj;
 	e.lastModifiedOnDisk = OSUtils::getLastModified(path.c_str());
@@ -72,9 +85,6 @@ const nlohmann::json &JSONDB::get(const std::string &n,unsigned long maxSinceChe
 					e->second.obj = OSUtils::jsonParse(buf);
 					e->second.lastModifiedOnDisk = lm; // don't update these if there is a parse error -- try again and again ASAP
 					e->second.lastCheck = now;
-
-					if (_feed)
-						fwrite(buf.c_str(),buf.length()+1,1,_feed); // it changed, so send to feed (also sends all objects on startup, which we want for Central)
 				} catch ( ... ) {} // parse errors result in "holding pattern" behavior
 			}
 		}
@@ -99,9 +109,6 @@ const nlohmann::json &JSONDB::get(const std::string &n,unsigned long maxSinceChe
 		e2.lastModifiedOnDisk = lm;
 		e2.lastCheck = now;
 
-		if (_feed)
-			fwrite(buf.c_str(),buf.length()+1,1,_feed);
-
 		return e2.obj;
 	}
 }

+ 3 - 3
controller/JSONDB.hpp

@@ -42,8 +42,7 @@ namespace ZeroTier {
 class JSONDB
 {
 public:
-	JSONDB(const std::string &basePath,FILE *feed) :
-		_feed(feed),
+	JSONDB(const std::string &basePath) :
 		_basePath(basePath)
 	{
 		_reload(_basePath);
@@ -55,6 +54,8 @@ public:
 		_reload(_basePath);
 	}
 
+	bool writeRaw(const std::string &n,const std::string &obj);
+
 	bool put(const std::string &n,const nlohmann::json &obj);
 
 	inline bool put(const std::string &n1,const std::string &n2,const nlohmann::json &obj) { return this->put((n1 + "/" + n2),obj); }
@@ -108,7 +109,6 @@ private:
 		inline bool operator!=(const _E &e) const { return (obj != e.obj); }
 	};
 
-	FILE *_feed;
 	std::string _basePath;
 	std::map<std::string,_E> _db;
 };

+ 6 - 5
controller/README.md

@@ -237,11 +237,12 @@ Note that managed IP assignments are only used if they fall within a managed rou
 | Field                 | Type          | Description                                       |
 | --------------------- | ------------- | ------------------------------------------------- |
 | ts                    | integer       | Time of request, ms since epoch                   |
-| authorized            | boolean       | Was member authorized?                            |
-| clientMajorVersion    | integer       | Client major version or -1 if unknown             |
-| clientMinorVersion    | integer       | Client minor version or -1 if unknown             |
-| clientRevision        | integer       | Client revision or -1 if unknown                  |
-| clientProtocolVersion | integer       | ZeroTier protocol version reported by client      |
+| auth                  | boolean       | Was member authorized?                            |
+| authBy                | string        | How was member authorized?                        |
+| vMajor                | integer       | Client major version or -1 if unknown             |
+| vMinor                | integer       | Client minor version or -1 if unknown             |
+| vRev                  | integer       | Client revision or -1 if unknown                  |
+| vProto                | integer       | ZeroTier protocol version reported by client      |
 | fromAddr              | string        | Physical address if known                         |
 
 The controller can only know a member's `fromAddr` if it's able to establish a direct path to it. Members behind very restrictive firewalls may not have this information since the controller will be receiving the member's requests by way of a relay. ZeroTier does not back-trace IP paths as packets are relayed since this would add a lot of protocol overhead.

+ 3 - 3
node/Peer.cpp

@@ -141,10 +141,10 @@ void Peer::received(
 		path->trustedPacketReceived(now);
 	}
 
-	if (hops == 0) {
-		if (_vProto >= 9)
-			path->updateLinkQuality((unsigned int)(packetId & 7));
+	if (_vProto >= 9)
+		path->updateLinkQuality((unsigned int)(packetId & 7));
 
+	if (hops == 0) {
 		bool pathIsConfirmed = false;
 		{
 			Mutex::Lock _l(_paths_m);

+ 1 - 1
service/OneService.cpp

@@ -638,7 +638,7 @@ public:
 				return _termReason;
 			}
 
-			_controller = new EmbeddedNetworkController(_node,(_homePath + ZT_PATH_SEPARATOR_S ZT_CONTROLLER_DB_PATH).c_str(),(FILE *)0);
+			_controller = new EmbeddedNetworkController(_node,(_homePath + ZT_PATH_SEPARATOR_S ZT_CONTROLLER_DB_PATH).c_str());
 			_node->setNetconfMaster((void *)_controller);
 
 #ifdef ZT_ENABLE_CLUSTER