Procházet zdrojové kódy

Fully operational pgsql controller

Grant Limberg před 6 roky
rodič
revize
d87fcca176

+ 5 - 3
controller/EmbeddedNetworkController.cpp

@@ -1610,18 +1610,20 @@ void EmbeddedNetworkController::_request(
 				if ( (ipRangeStartIA.ss_family == AF_INET) && (ipRangeEndIA.ss_family == AF_INET) ) {
 					uint32_t ipRangeStart = Utils::ntoh((uint32_t)(reinterpret_cast<struct sockaddr_in *>(&ipRangeStartIA)->sin_addr.s_addr));
 					uint32_t ipRangeEnd = Utils::ntoh((uint32_t)(reinterpret_cast<struct sockaddr_in *>(&ipRangeEndIA)->sin_addr.s_addr));
+					
 					if ((ipRangeEnd < ipRangeStart)||(ipRangeStart == 0))
 						continue;
 					uint32_t ipRangeLen = ipRangeEnd - ipRangeStart;
-
+					
 					// Start with the LSB of the member's address
 					uint32_t ipTrialCounter = (uint32_t)(identity.address().toInt() & 0xffffffff);
 
 					for(uint32_t k=ipRangeStart,trialCount=0;((k<=ipRangeEnd)&&(trialCount < 1000));++k,++trialCount) {
 						uint32_t ip = (ipRangeLen > 0) ? (ipRangeStart + (ipTrialCounter % ipRangeLen)) : ipRangeStart;
 						++ipTrialCounter;
-						if ((ip & 0x000000ff) == 0x000000ff)
+						if ((ip & 0x000000ff) == 0x000000ff) {
 							continue; // don't allow addresses that end in .255
+						}
 
 						// Check if this IP is within a local-to-Ethernet routed network
 						int routedNetmaskBits = -1;
@@ -1659,7 +1661,7 @@ void EmbeddedNetworkController::_request(
 			}
 		}
 	}
-
+	
 	// Issue a certificate of ownership for all static IPs
 	if (nc->staticIpCount) {
 		nc->certificatesOfOwnership[0] = CertificateOfOwnership(nwid,now,identity.address(),1);

+ 137 - 59
controller/PostgreSQL.cpp

@@ -119,18 +119,24 @@ bool PostgreSQL::isReady()
 
 void PostgreSQL::save(nlohmann::json *orig, nlohmann::json &record)
 {
-	if (!record.is_object()) {
-		return;
-	}
-	waitForReady();
-	if (orig) {
-		if (*orig != record) {
-			record["revision"] = OSUtils::jsonInt(record["revision"],0ULL) + 1;
+	try {
+		if (!record.is_object()) {
+			return;
+		}
+		waitForReady();
+		if (orig) {
+			if (*orig != record) {
+				record["revision"] = OSUtils::jsonInt(record["revision"],0ULL) + 1;
+				_commitQueue.post(new nlohmann::json(record));
+			}
+		} else {
+			record["revision"] = 1;
 			_commitQueue.post(new nlohmann::json(record));
 		}
-	} else {
-		record["revision"] = 1;
-		_commitQueue.post(new nlohmann::json(record));
+	} catch (std::exception &e) {
+		fprintf(stderr, "Error on PostgreSQL::save: %s\n", e.what());
+	} catch (...) {
+		fprintf(stderr, "Unknown error on PostgreSQL::save\n");
 	}
 }
 
@@ -274,7 +280,6 @@ void PostgreSQL::initializeNetworks(PGconn *conn)
 				std::string addr = PQgetvalue(r2, j, 0);
 				std::string bits = PQgetvalue(r2, j, 1);
 				std::string via = PQgetvalue(r2, j, 2);
-				fprintf(stderr, "via: %s", via.c_str());
 				json route;
 				route["target"] = addr + "/" + bits;
 
@@ -442,27 +447,21 @@ void PostgreSQL::heartbeat()
 			conn = PQconnectdb(_path.c_str());
 		}
 		if (conn) {
+			std::string major = std::to_string(ZEROTIER_ONE_VERSION_MAJOR);
+			std::string minor = std::to_string(ZEROTIER_ONE_VERSION_MINOR);
+			std::string rev = std::to_string(ZEROTIER_ONE_VERSION_REVISION);
+			std::string build = std::to_string(ZEROTIER_ONE_VERSION_BUILD);
+			std::string now = std::to_string(OSUtils::now());
 			const char *values[8] = {
 				controllerId,
 				hostname,
-				std::to_string(OSUtils::now()).c_str(),
+				now.c_str(),
 				publicIdentity,
-				std::to_string(ZEROTIER_ONE_VERSION_MAJOR).c_str(),
-				std::to_string(ZEROTIER_ONE_VERSION_MINOR).c_str(),
-				std::to_string(ZEROTIER_ONE_VERSION_REVISION).c_str(),
-				std::to_string(ZEROTIER_ONE_VERSION_BUILD).c_str()
-			};
-			int lengths[8] = {
-				(int)strlen(values[0]),
-				(int)strlen(values[1]),
-				(int)strlen(values[2]),
-				(int)strlen(values[3]),
-				(int)strlen(values[4]),
-				(int)strlen(values[5]),
-				(int)strlen(values[6]),
-				(int)strlen(values[7])
+				major.c_str(),
+				minor.c_str(),
+				rev.c_str(),
+				build.c_str()
 			};
-			int binary[8] = {0,0,0,0,0,0,0,0};
 
 			PGresult *res = PQexecParams(conn,
 				"INSERT INTO ztc_controller (id, cluster_host, last_alive, public_identity, v_major, v_minor, v_rev, v_build) " 
@@ -473,8 +472,8 @@ void PostgreSQL::heartbeat()
 				8,       // number of parameters
 				NULL,    // oid field.   ignore
 				values,  // values for substitution
-				lengths, // lengths in bytes of each value
-				binary,  // binary?
+				NULL, // lengths in bytes of each value
+				NULL,  // binary?
 				0);
 
 			if (PQresultStatus(res) != PGRES_COMMAND_OK) {
@@ -521,7 +520,7 @@ void PostgreSQL::membersDbWatcher()
 		PGnotify *notify = NULL;
 		PQconsumeInput(conn);
 		while ((notify = PQnotifies(conn)) != NULL) {
-			fprintf(stderr, "ASYNC NOTIFY of '%s' id:%s received\n", notify->relname, notify->extra);
+			//fprintf(stderr, "ASYNC NOTIFY of '%s' id:%s received\n", notify->relname, notify->extra);
 
 			try {
 				json tmp(json::parse(notify->extra));
@@ -574,7 +573,7 @@ void PostgreSQL::networksDbWatcher()
 		PGnotify *notify = NULL;
 		PQconsumeInput(conn);
 		while ((notify = PQnotifies(conn)) != NULL) {
-			fprintf(stderr, "ASYNC NOTIFY of '%s' id:%s received\n", notify->relname, notify->extra);
+			//fprintf(stderr, "ASYNC NOTIFY of '%s' id:%s received\n", notify->relname, notify->extra);
 			try {
 				json tmp(json::parse(notify->extra));
 				json &ov = tmp["old_val"];
@@ -611,6 +610,7 @@ void PostgreSQL::commitThread()
 		if (PQstatus(conn) == CONNECTION_BAD) {
 			fprintf(stderr, "ERROR: Connection to database failed: %s\n", PQerrorMessage(conn));
 			PQfinish(conn);
+			delete config;
 			exit(1);
 		}
 		try { 
@@ -621,27 +621,39 @@ void PostgreSQL::commitThread()
 					std::string networkId = (*config)["nwid"];
 					std::string identity = (*config)["identity"];
 					std::string target = "NULL";
+					fprintf(stderr, "Updating Member %s-%s\n", networkId.c_str(), memberId.c_str());
 					if (!(*config)["remoteTraceTarget"].is_null()) {
 						target = (*config)["remoteTraceTarget"];
 					}
+
+					std::string caps = OSUtils::jsonDump((*config)["capabilities"], -1);
+					std::string lastAuthTime = std::to_string((long long)(*config)["lastAuthorizedTime"]);
+					std::string lastDeauthTime = std::to_string((long long)(*config)["lastDeauthorizedTime"]);
+					std::string rtraceLevel = std::to_string((int)(*config)["remoteTraceLevel"]);
+					std::string rev = std::to_string((unsigned long long)(*config)["revision"]);
+					std::string tags = OSUtils::jsonDump((*config)["tags"], -1);
+					std::string vmajor = std::to_string((int)(*config)["vMajor"]);
+					std::string vminor = std::to_string((int)(*config)["vMinor"]);
+					std::string vrev = std::to_string((int)(*config)["vRev"]);
+					std::string vproto = std::to_string((int)(*config)["vProto"]);
 					const char *values[19] = {
 						memberId.c_str(),
 						networkId.c_str(),
 						((*config)["activeBridge"] ? "true" : "false"),
 						((*config)["authorized"] ? "true" : "false"),
-						OSUtils::jsonDump((*config)["capabilities"], -1).c_str(),
+						caps.c_str(),
 						identity.c_str(),
-						std::to_string((long long)(*config)["lastAuthorizedTime"]).c_str(),
-						std::to_string((long long)(*config)["lastDeauthorizedTime"]).c_str(),
+						lastAuthTime.c_str(),
+						lastDeauthTime.c_str(),
 						((*config)["noAutoAssignIps"] ? "true" : "false"),
-						std::to_string((int)(*config)["remoteTraceLevel"]).c_str(),
+						rtraceLevel.c_str(),
 						(target == "NULL") ? NULL : target.c_str(),
-						std::to_string((unsigned long long)(*config)["revision"]).c_str(),
-						OSUtils::jsonDump((*config)["tags"], -1).c_str(),
-						std::to_string((int)(*config)["vMajor"]).c_str(),
-						std::to_string((int)(*config)["vMinor"]).c_str(),
-						std::to_string((int)(*config)["vRev"]).c_str(),
-						std::to_string((int)(*config)["vProto"]).c_str()
+						rev.c_str(),
+						tags.c_str(),
+						vmajor.c_str(),
+						vminor.c_str(),
+						vrev.c_str(),
+						vproto.c_str()
 					};
 
 					PGresult *res = PQexecParams(conn,
@@ -668,6 +680,8 @@ void PostgreSQL::commitThread()
 						fprintf(stderr, "ERROR: Error updating member: %s\n", PQresultErrorMessage(res));
 						fprintf(stderr, "%s", OSUtils::jsonDump(*config, 2).c_str());
 						PQclear(res);
+						delete config;
+						config = nullptr;
 						continue;
 					}
 
@@ -677,6 +691,8 @@ void PostgreSQL::commitThread()
 					if (PQresultStatus(res) != PGRES_COMMAND_OK) {
 						fprintf(stderr, "ERROR: Error beginning transaction: %s\n", PQresultErrorMessage(res));
 						PQclear(res);
+						delete config;
+						config = nullptr;
 						continue;
 					}
 
@@ -700,6 +716,8 @@ void PostgreSQL::commitThread()
 						fprintf(stderr, "ERROR: Error updating IP address assignments: %s\n", PQresultErrorMessage(res));
 						PQclear(res);
 						PQclear(PQexec(conn, "ROLLBACK"));;
+						delete config;
+						config = nullptr;
 						continue;
 					}
 
@@ -736,6 +754,22 @@ void PostgreSQL::commitThread()
 					}
 
 					PQclear(res);
+
+					const uint64_t nwidInt = OSUtils::jsonIntHex((*config)["nwid"], 0ULL);
+					const uint64_t memberidInt = OSUtils::jsonIntHex((*config)["id"], 0ULL);
+					if (nwidInt && memberidInt) {
+						nlohmann::json nwOrig;
+						nlohmann::json memOrig;
+
+						nlohmann::json memNew(*config);
+						
+						get(nwidInt, nwOrig, memberidInt, memOrig);
+				
+						_memberChanged(memOrig, memNew, (this->_ready>=2));
+					} else {
+						fprintf(stderr, "Can't notify of change.  Error parsing nwid or memberid: %lu-%lu\n", nwidInt, memberidInt);
+					}
+
 				} catch (std::exception &e) {
 					fprintf(stderr, "ERROR: Error updating member: %s\n", e.what());
 				}
@@ -749,23 +783,35 @@ void PostgreSQL::commitThread()
 						remoteTraceTarget = (*config)["remoteTraceTarget"];
 					}
 					std::string rulesSource = (*config)["rulesSource"];
+					std::string caps = OSUtils::jsonDump((*config)["capabilitles"], -1);
+					std::string now = std::to_string(OSUtils::now());
+					std::string mtu = std::to_string((int)(*config)["mtu"]);
+					std::string mcastLimit = std::to_string((int)(*config)["multicastLimit"]);
+					std::string rtraceLevel = std::to_string((int)(*config)["remoteTraceLevel"]);
+					std::string rules = OSUtils::jsonDump((*config)["rules"], -1);
+					std::string tags = OSUtils::jsonDump((*config)["tags"], -1);
+					std::string v4mode = OSUtils::jsonDump((*config)["v4AssignMode"],-1);
+					std::string v6mode = OSUtils::jsonDump((*config)["v6AssignMode"], -1);
+					bool enableBroadcast = (*config)["enableBroadcast"];
+					bool isPrivate = (*config)["private"];
+
 					const char *values[16] = {
 						id.c_str(),
 						controllerId.c_str(),
-						OSUtils::jsonDump((*config)["capabilitles"], -1).c_str(),
-						((*config)["enableBroadcast"] ? "true" : "false"),
-						std::to_string(OSUtils::now()).c_str(),
-						std::to_string((int)(*config)["mtu"]).c_str(),
-						std::to_string((int)(*config)["multicastLimit"]).c_str(),
+						caps.c_str(),
+						enableBroadcast ? "true" : "false",
+						now.c_str(),
+						mtu.c_str(),
+						mcastLimit.c_str(),
 						name.c_str(),
-						((*config)["private"] ? "true" : "false"),
-						std::to_string((int)(*config)["remoteTraceLevel"]).c_str(),
+						isPrivate ? "true" : "false",
+						rtraceLevel.c_str(),
 						(remoteTraceTarget == "NULL" ? NULL : remoteTraceTarget.c_str()),
-						OSUtils::jsonDump((*config)["rules"], -1).c_str(),
+						rules.c_str(),
 						rulesSource.c_str(),
-						OSUtils::jsonDump((*config)["tags"], -1).c_str(),
-						OSUtils::jsonDump((*config)["v4AssignMode"],-1).c_str(),
-						OSUtils::jsonDump((*config)["v6AssignMode"], -1).c_str(),
+						tags.c_str(),
+						v4mode.c_str(),
+						v6mode.c_str(),
 					};
 
 					PGresult *res = PQexecParams(conn,
@@ -784,6 +830,8 @@ void PostgreSQL::commitThread()
 					if (PQresultStatus(res) != PGRES_COMMAND_OK) {
 						fprintf(stderr, "ERROR: Error updating network record: %s\n", PQresultErrorMessage(res));
 						PQclear(res);
+						delete config;
+						config = nullptr;
 						continue;
 					}
 
@@ -793,6 +841,8 @@ void PostgreSQL::commitThread()
 					if (PQresultStatus(res) != PGRES_COMMAND_OK) {
 						fprintf(stderr, "ERROR: Error beginnning transaction: %s\n", PQresultErrorMessage(res));
 						PQclear(res);
+						delete config;
+						config = nullptr;
 						continue;
 					}
 
@@ -813,6 +863,8 @@ void PostgreSQL::commitThread()
 						fprintf(stderr, "ERROR: Error updating assignment pool: %s\n", PQresultErrorMessage(res));
 						PQclear(res);
 						PQclear(PQexec(conn, "ROLLBACK"));
+						delete config;
+						config = nullptr;
 						continue;
 					}
 
@@ -848,6 +900,8 @@ void PostgreSQL::commitThread()
 					}
 					if (err) {
 						PQclear(PQexec(conn, "ROLLBACK"));
+						delete config;
+						config = nullptr;
 						continue;
 					}
 
@@ -864,6 +918,8 @@ void PostgreSQL::commitThread()
 						fprintf(stderr, "ERROR: Error updating routes: %s\n", PQresultErrorMessage(res));
 						PQclear(res);
 						PQclear(PQexec(conn, "ROLLBACK"));
+						delete config;
+						config = nullptr;
 						continue;
 					}
 
@@ -913,7 +969,9 @@ void PostgreSQL::commitThread()
 						PQclear(res);
 					}
 					if (err) {
-						PQclear(PQexec(conn, "ROLLBAcK"));
+						PQclear(PQexec(conn, "ROLLBACK"));
+						delete config;
+						config = nullptr;
 						continue;
 					}
 
@@ -922,6 +980,19 @@ void PostgreSQL::commitThread()
 						fprintf(stderr, "ERROR: Error committing network update: %s\n", PQresultErrorMessage(res));
 					}
 					PQclear(res);
+
+					const uint64_t nwidInt = OSUtils::jsonIntHex((*config)["nwid"], 0ULL);
+					if (nwidInt) {
+						nlohmann::json nwOrig;
+						nlohmann::json nwNew(*config);
+
+						get(nwidInt, nwOrig);
+
+						_networkChanged(nwOrig, nwNew, true);
+					} else {
+						fprintf(stderr, "Can't notify network changed: %lu\n", nwidInt);
+					}
+
 				} catch (std::exception &e) {
 					fprintf(stderr, "ERROR: Error updating member: %s\n", e.what());
 				}
@@ -984,6 +1055,7 @@ void PostgreSQL::commitThread()
 			fprintf(stderr, "ERROR: Error getting objtype: %s\n", e.what());
 		}
 		delete config;
+		config = nullptr;
 
 		std::this_thread::sleep_for(std::chrono::milliseconds(10));
 	}
@@ -1054,12 +1126,13 @@ void PostgreSQL::onlineNotificationThread()
 
 			int64_t ts = i->second.first;
 			std::string ipAddr = i->second.second.toIpString(ipTmp);
+			std::string timestamp = std::to_string(ts);
 
 			const char *values[4] = {
 				networkId.c_str(),
 				memberId.c_str(),
 				(ipAddr.empty() ? NULL : ipAddr.c_str()),
-				std::to_string(ts).c_str(),
+				timestamp.c_str(),
 			};
 
 			res = PQexecParams(conn,
@@ -1162,13 +1235,18 @@ void PostgreSQL::onlineNotificationThread()
 					}
 				}
 
+				std::string bc = std::to_string(bridgeCount);
+				std::string amc = std::to_string(authMemberCount);
+				std::string omc = std::to_string(onlineMemberCount);
+				std::string tmc = std::to_string(totalMemberCount);
+				std::string timestamp = std::to_string(ts);
 				const char *values[6] = {
 					networkId.c_str(),
-					std::to_string(bridgeCount).c_str(),
-					std::to_string(authMemberCount).c_str(),
-					std::to_string(onlineMemberCount).c_str(),
-					std::to_string(totalMemberCount).c_str(),
-					std::to_string(ts).c_str()
+					bc.c_str(),
+					amc.c_str(),
+					omc.c_str(),
+					tmc.c_str(),
+					timestamp.c_str()
 				};
 
 				res = PQexecParams(conn, "INSERT INTO ztc_network_status (network_id, bridge_count, authorized_member_count, "

+ 4 - 0
docker/Dockerfile

@@ -6,6 +6,10 @@ RUN yum update -y
 RUN yum install -y https://download.postgresql.org/pub/repos/yum/10/redhat/rhel-7-x86_64/pgdg-centos10-10-2.noarch.rpm
 RUN yum install -y postgresql10
 
+RUN yum -y install epel-release && yum -y update && yum clean all
+RUN yum -y install clang
+
+
 ADD zerotier-one /usr/local/bin/zerotier-one
 RUN chmod a+x /usr/local/bin/zerotier-one
 

+ 3 - 0
docker/main.sh

@@ -41,4 +41,7 @@ echo "{
 }    
 " > /var/lib/zerotier-one/local.conf
 
+# export ASAN_OPTIONS=detect_leaks=1
+# export ASAN_SYMBOLIZER_PATH=/usr/bin/llvm-symbolizer
+# export MSAN_SYMBOLIZER_PATH=/usr/bin/llvm-symbolizer
 exec /usr/local/bin/zerotier-one /var/lib/zerotier-one