Răsfoiți Sursa

PubSub to gcloud-cpp library for simplicity's sake

Grant Limberg 1 săptămână în urmă
părinte
comite
b114896e7b

+ 12 - 1
controller/CMakeLists.txt

@@ -34,6 +34,7 @@ set(LINK_LIBS
 
 if (ZT1_CENTRAL_CONTROLLER)
     find_package(PostgreSQL REQUIRED)
+    find_package(protobuf REQUIRED)
 
     list(APPEND SRC_FILES
         CV1.cpp
@@ -64,7 +65,7 @@ if (ZT1_CENTRAL_CONTROLLER)
         ${PostgreSQL_INCLUDE_DIRS}
         "${redis++_BUILD_DIR}/src"
         ${pqxx_INCLUDE_DIRS}
-
+        "${CMAKE_CURRENT_BINARY_DIR}" 
     )
 
     list(APPEND LINK_LIBS
@@ -78,6 +79,16 @@ endif()
 
 add_library(zerotier-controller STATIC ${SRC_FILES})
 
+if (ZT1_CENTRAL_CONTROLLER)
+    file(GLOB PROTO_FILES "${CMAKE_CURRENT_SOURCE_DIR}/protobuf/*.proto")
+    protobuf_generate(
+        TARGET zerotier-controller
+        LANGUAGE cpp
+        PROTOS ${PROTO_FILES}
+        APPEND_PATH
+    )
+endif()
+
 target_include_directories(zerotier-controller PRIVATE ${INCLUDE_DIRS})
 
 add_dependencies(zerotier-controller redis++::redis++)

+ 164 - 126
controller/CV1.cpp

@@ -73,7 +73,8 @@ CV1::CV1(const Identity& myId, const char* path, int listenPort, RedisConfig* rc
 	_myAddressStr = myId.address().toString(myAddress);
 	_connString = std::string(path);
 	auto f = std::make_shared<PostgresConnFactory>(_connString);
-	_pool = std::make_shared<ConnectionPool<PostgresConnection> >(15, 5, std::static_pointer_cast<ConnectionFactory>(f));
+	_pool =
+		std::make_shared<ConnectionPool<PostgresConnection> >(15, 5, std::static_pointer_cast<ConnectionFactory>(f));
 
 	memset(_ssoPsk, 0, sizeof(_ssoPsk));
 	char* const ssoPskHex = getenv("ZT_SSO_PSK");
@@ -100,7 +101,11 @@ CV1::CV1(const Identity& myId, const char* path, int listenPort, RedisConfig* rc
 	txn.commit();
 
 	if (dbVersion < DB_MINIMUM_VERSION) {
-		fprintf(stderr, "Central database schema version too low.  This controller version requires a minimum schema version of %d. Please upgrade your Central instance", DB_MINIMUM_VERSION);
+		fprintf(
+			stderr,
+			"Central database schema version too low.  This controller version requires a minimum schema version of "
+			"%d. Please upgrade your Central instance",
+			DB_MINIMUM_VERSION);
 		exit(1);
 	}
 	_pool->unborrow(c);
@@ -135,7 +140,9 @@ CV1::CV1(const Identity& myId, const char* path, int listenPort, RedisConfig* rc
 
 	_readyLock.lock();
 
-	fprintf(stderr, "[%s] NOTICE: %.10llx controller PostgreSQL waiting for initial data download..." ZT_EOL_S, ::_timestr(), (unsigned long long)_myAddress.toInt());
+	fprintf(
+		stderr, "[%s] NOTICE: %.10llx controller PostgreSQL waiting for initial data download..." ZT_EOL_S,
+		::_timestr(), (unsigned long long)_myAddress.toInt());
 	_waitNoticePrinted = true;
 
 	initializeNetworks();
@@ -198,7 +205,8 @@ void CV1::configureSmee()
 
 	if (scheme != NULL && host != NULL && port != NULL && ns != NULL && task_queue != NULL) {
 		fprintf(stderr, "creating smee client\n");
-		std::string hostPort = std::string(scheme) + std::string("://") + std::string(host) + std::string(":") + std::string(port);
+		std::string hostPort =
+			std::string(scheme) + std::string("://") + std::string(host) + std::string(":") + std::string(port);
 		this->_smee = rustybits::smee_client_new(hostPort.c_str(), ns, task_queue);
 	}
 	else {
@@ -328,7 +336,11 @@ void CV1::eraseMember(const uint64_t networkId, const uint64_t memberId)
 	_memberChanged(tmp.first, nullJson, true);
 }
 
-void CV1::nodeIsOnline(const uint64_t networkId, const uint64_t memberId, const InetAddress& physicalAddress, const char* osArch)
+void CV1::nodeIsOnline(
+	const uint64_t networkId,
+	const uint64_t memberId,
+	const InetAddress& physicalAddress,
+	const char* osArch)
 {
 	auto provider = opentelemetry::trace::Provider::GetTracerProvider();
 	auto tracer = provider->GetTracer("cv1");
@@ -393,15 +405,17 @@ AuthInfo CV1::getSSOAuthInfo(const nlohmann::json& member, const std::string& re
 		std::string nonce = "";
 
 		// check if the member exists first.
-		pqxx::row count = w.exec_params1("SELECT count(id) FROM ztc_member WHERE id = $1 AND network_id = $2 AND deleted = false", memberId, networkId);
+		pqxx::row count = w.exec_params1(
+			"SELECT count(id) FROM ztc_member WHERE id = $1 AND network_id = $2 AND deleted = false", memberId,
+			networkId);
 		if (count[0].as<int>() == 1) {
 			// get active nonce, if exists.
 			pqxx::result r = w.exec_params(
 				"SELECT nonce FROM ztc_sso_expiry "
 				"WHERE network_id = $1 AND member_id = $2 "
-				"AND ((NOW() AT TIME ZONE 'UTC') <= authentication_expiry_time) AND ((NOW() AT TIME ZONE 'UTC') <= nonce_expiration)",
-				networkId,
-				memberId);
+				"AND ((NOW() AT TIME ZONE 'UTC') <= authentication_expiry_time) AND ((NOW() AT TIME ZONE 'UTC') <= "
+				"nonce_expiration)",
+				networkId, memberId);
 
 			if (r.size() == 0) {
 				// no active nonce.
@@ -410,8 +424,7 @@ AuthInfo CV1::getSSOAuthInfo(const nlohmann::json& member, const std::string& re
 					"SELECT nonce FROM ztc_sso_expiry "
 					"WHERE network_id = $1 AND member_id = $2 "
 					"AND authentication_expiry_time IS NULL AND ((NOW() AT TIME ZONE 'UTC') <= nonce_expiration)",
-					networkId,
-					memberId);
+					networkId, memberId);
 
 				if (r.size() == 1) {
 					// we have an existing nonce.  Use it
@@ -429,10 +442,7 @@ AuthInfo CV1::getSSOAuthInfo(const nlohmann::json& member, const std::string& re
 						"INSERT INTO ztc_sso_expiry "
 						"(nonce, nonce_expiration, network_id, member_id) VALUES "
 						"($1, TO_TIMESTAMP($2::double precision/1000), $3, $4)",
-						nonce,
-						OSUtils::now() + 300000,
-						networkId,
-						memberId);
+						nonce, OSUtils::now() + 300000, networkId, memberId);
 
 					w.commit();
 				}
@@ -478,7 +488,9 @@ AuthInfo CV1::getSSOAuthInfo(const nlohmann::json& member, const std::string& re
 				sso_version = r.at(0)[4].as<std::optional<uint64_t> >().value_or(1);
 			}
 			else if (r.size() > 1) {
-				fprintf(stderr, "ERROR: More than one auth endpoint for an organization?!?!? NetworkID: %s\n", networkId.c_str());
+				fprintf(
+					stderr, "ERROR: More than one auth endpoint for an organization?!?!? NetworkID: %s\n",
+					networkId.c_str());
 			}
 			else {
 				fprintf(stderr, "No client or auth endpoint?!?\n");
@@ -496,13 +508,10 @@ AuthInfo CV1::getSSOAuthInfo(const nlohmann::json& member, const std::string& re
 				if (info.version == 0) {
 					char url[2048] = { 0 };
 					OSUtils::ztsnprintf(
-						url,
-						sizeof(authenticationURL),
-						"%s?response_type=id_token&response_mode=form_post&scope=openid+email+profile&redirect_uri=%s&nonce=%s&state=%s&client_id=%s",
-						authorization_endpoint.c_str(),
-						url_encode(redirectURL).c_str(),
-						nonce.c_str(),
-						state_hex,
+						url, sizeof(authenticationURL),
+						"%s?response_type=id_token&response_mode=form_post&scope=openid+email+profile&redirect_uri=%s&"
+						"nonce=%s&state=%s&client_id=%s",
+						authorization_endpoint.c_str(), url_encode(redirectURL).c_str(), nonce.c_str(), state_hex,
 						client_id.c_str());
 					info.authenticationURL = std::string(url);
 				}
@@ -516,18 +525,17 @@ AuthInfo CV1::getSSOAuthInfo(const nlohmann::json& member, const std::string& re
 #ifdef ZT_DEBUG
 					fprintf(
 						stderr,
-						"ssoClientID: %s\nissuerURL: %s\nssoNonce: %s\nssoState: %s\ncentralAuthURL: %s\nprovider: %s\n",
-						info.ssoClientID.c_str(),
-						info.issuerURL.c_str(),
-						info.ssoNonce.c_str(),
-						info.ssoState.c_str(),
-						info.centralAuthURL.c_str(),
-						provider.c_str());
+						"ssoClientID: %s\nissuerURL: %s\nssoNonce: %s\nssoState: %s\ncentralAuthURL: %s\nprovider: "
+						"%s\n",
+						info.ssoClientID.c_str(), info.issuerURL.c_str(), info.ssoNonce.c_str(), info.ssoState.c_str(),
+						info.centralAuthURL.c_str(), provider.c_str());
 #endif
 				}
 			}
 			else {
-				fprintf(stderr, "client_id: %s\nauthorization_endpoint: %s\n", client_id.c_str(), authorization_endpoint.c_str());
+				fprintf(
+					stderr, "client_id: %s\nauthorization_endpoint: %s\n", client_id.c_str(),
+					authorization_endpoint.c_str());
 			}
 		}
 
@@ -573,13 +581,18 @@ void CV1::initializeNetworks()
 		char qbuf[2048] = { 0 };
 		sprintf(
 			qbuf,
-			"SELECT n.id, (EXTRACT(EPOCH FROM n.creation_time AT TIME ZONE 'UTC')*1000)::bigint as creation_time, n.capabilities, "
-			"n.enable_broadcast, (EXTRACT(EPOCH FROM n.last_modified AT TIME ZONE 'UTC')*1000)::bigint AS last_modified, n.mtu, n.multicast_limit, n.name, n.private, n.remote_trace_level, "
-			"n.remote_trace_target, n.revision, n.rules, n.tags, n.v4_assign_mode, n.v6_assign_mode, n.sso_enabled, (CASE WHEN n.sso_enabled THEN noc.client_id ELSE NULL END) as client_id, "
+			"SELECT n.id, (EXTRACT(EPOCH FROM n.creation_time AT TIME ZONE 'UTC')*1000)::bigint as creation_time, "
+			"n.capabilities, "
+			"n.enable_broadcast, (EXTRACT(EPOCH FROM n.last_modified AT TIME ZONE 'UTC')*1000)::bigint AS "
+			"last_modified, n.mtu, n.multicast_limit, n.name, n.private, n.remote_trace_level, "
+			"n.remote_trace_target, n.revision, n.rules, n.tags, n.v4_assign_mode, n.v6_assign_mode, n.sso_enabled, "
+			"(CASE WHEN n.sso_enabled THEN noc.client_id ELSE NULL END) as client_id, "
 			"(CASE WHEN n.sso_enabled THEN oc.authorization_endpoint ELSE NULL END) as authorization_endpoint, "
 			"(CASE WHEN n.sso_enabled THEN oc.provider ELSE NULL END) as provider, d.domain, d.servers, "
-			"ARRAY(SELECT CONCAT(host(ip_range_start),'|', host(ip_range_end)) FROM ztc_network_assignment_pool WHERE network_id = n.id) AS assignment_pool, "
-			"ARRAY(SELECT CONCAT(host(address),'/',bits::text,'|',COALESCE(host(via), 'NULL'))FROM ztc_network_route WHERE network_id = n.id) AS routes "
+			"ARRAY(SELECT CONCAT(host(ip_range_start),'|', host(ip_range_end)) FROM ztc_network_assignment_pool WHERE "
+			"network_id = n.id) AS assignment_pool, "
+			"ARRAY(SELECT CONCAT(host(address),'/',bits::text,'|',COALESCE(host(via), 'NULL'))FROM ztc_network_route "
+			"WHERE network_id = n.id) AS routes "
 			"FROM ztc_network n "
 			"LEFT OUTER JOIN ztc_org o "
 			" ON o.owner_id = n.owner_id "
@@ -813,7 +826,9 @@ void CV1::initializeNetworks()
 
 		if (++this->_ready == 2) {
 			if (_waitNoticePrinted) {
-				fprintf(stderr, "[%s] NOTICE: %.10llx controller PostgreSQL data download complete." ZT_EOL_S, _timestr(), (unsigned long long)_myAddress.toInt());
+				fprintf(
+					stderr, "[%s] NOTICE: %.10llx controller PostgreSQL data download complete." ZT_EOL_S, _timestr(),
+					(unsigned long long)_myAddress.toInt());
 			}
 			_readyLock.unlock();
 		}
@@ -899,12 +914,14 @@ void CV1::initializeMembers()
 			"	FROM ztc_sso_expiry e "
 			"	INNER JOIN ztc_network n1 "
 			"	ON n1.id = e.network_id  AND n1.deleted = TRUE "
-			"	WHERE e.network_id = m.network_id AND e.member_id = m.id AND n.sso_enabled = TRUE AND e.authentication_expiry_time IS NOT NULL "
+			"	WHERE e.network_id = m.network_id AND e.member_id = m.id AND n.sso_enabled = TRUE AND "
+			"e.authentication_expiry_time IS NOT NULL "
 			"	ORDER BY e.authentication_expiry_time DESC LIMIT 1 "
 			" ) "
 			" ELSE NULL "
 			" END) AS authentication_expiry_time, "
-			"ARRAY(SELECT DISTINCT address FROM ztc_member_ip_assignment WHERE member_id = m.id AND network_id = m.network_id) AS assigned_addresses "
+			"ARRAY(SELECT DISTINCT address FROM ztc_member_ip_assignment WHERE member_id = m.id AND network_id = "
+			"m.network_id) AS assigned_addresses "
 			"FROM ztc_member m "
 			"INNER JOIN ztc_network n "
 			"	ON n.id = m.network_id "
@@ -1089,7 +1106,9 @@ void CV1::initializeMembers()
 
 		if (++this->_ready == 2) {
 			if (_waitNoticePrinted) {
-				fprintf(stderr, "[%s] NOTICE: %.10llx controller PostgreSQL data download complete." ZT_EOL_S, _timestr(), (unsigned long long)_myAddress.toInt());
+				fprintf(
+					stderr, "[%s] NOTICE: %.10llx controller PostgreSQL data download complete." ZT_EOL_S, _timestr(),
+					(unsigned long long)_myAddress.toInt());
 			}
 			_readyLock.unlock();
 		}
@@ -1150,13 +1169,17 @@ void CV1::heartbeat()
 				pqxx::work w { *c->c };
 
 				pqxx::result res = w.exec0(
-					"INSERT INTO ztc_controller (id, cluster_host, last_alive, public_identity, v_major, v_minor, v_rev, v_build, host_port, use_redis, redis_member_status) "
+					"INSERT INTO ztc_controller (id, cluster_host, last_alive, public_identity, v_major, v_minor, "
+					"v_rev, v_build, host_port, use_redis, redis_member_status) "
 					"VALUES ("
-					+ w.quote(controllerId) + ", " + w.quote(hostname) + ", TO_TIMESTAMP(" + now + "::double precision/1000), " + w.quote(publicIdentity) + ", " + major + ", " + minor + ", " + rev + ", " + build + ", " + host_port + ", "
-					+ use_redis + ", " + redis_mem_status
+					+ w.quote(controllerId) + ", " + w.quote(hostname) + ", TO_TIMESTAMP(" + now
+					+ "::double precision/1000), " + w.quote(publicIdentity) + ", " + major + ", " + minor + ", " + rev
+					+ ", " + build + ", " + host_port + ", " + use_redis + ", " + redis_mem_status
 					+ ") "
-					  "ON CONFLICT (id) DO UPDATE SET cluster_host = EXCLUDED.cluster_host, last_alive = EXCLUDED.last_alive, "
-					  "public_identity = EXCLUDED.public_identity, v_major = EXCLUDED.v_major, v_minor = EXCLUDED.v_minor, "
+					  "ON CONFLICT (id) DO UPDATE SET cluster_host = EXCLUDED.cluster_host, last_alive = "
+					  "EXCLUDED.last_alive, "
+					  "public_identity = EXCLUDED.public_identity, v_major = EXCLUDED.v_major, v_minor = "
+					  "EXCLUDED.v_minor, "
 					  "v_rev = EXCLUDED.v_rev, v_build = EXCLUDED.v_rev, host_port = EXCLUDED.host_port, "
 					  "use_redis = EXCLUDED.use_redis, redis_member_status = EXCLUDED.redis_member_status");
 				w.commit();
@@ -1200,7 +1223,8 @@ void CV1::membersDbWatcher()
 	}
 
 	if (_run == 1) {
-		fprintf(stderr, "ERROR: %s membersDbWatcher should still be running! Exiting Controller.\n", _myAddressStr.c_str());
+		fprintf(
+			stderr, "ERROR: %s membersDbWatcher should still be running! Exiting Controller.\n", _myAddressStr.c_str());
 		exit(9);
 	}
 	fprintf(stderr, "Exited membersDbWatcher\n");
@@ -1306,7 +1330,9 @@ void CV1::networksDbWatcher()
 	}
 
 	if (_run == 1) {
-		fprintf(stderr, "ERROR: %s networksDbWatcher should still be running! Exiting Controller.\n", _myAddressStr.c_str());
+		fprintf(
+			stderr, "ERROR: %s networksDbWatcher should still be running! Exiting Controller.\n",
+			_myAddressStr.c_str());
 		exit(8);
 	}
 	fprintf(stderr, "Exited networksDbWatcher\n");
@@ -1383,7 +1409,9 @@ void CV1::_networksWatcher_Redis()
 								}
 							}
 							catch (std::exception& e) {
-								fprintf(stderr, "json parse error in networkWatcher_Redis: what: %s json: %s\n", e.what(), a.second.c_str());
+								fprintf(
+									stderr, "json parse error in networkWatcher_Redis: what: %s json: %s\n", e.what(),
+									a.second.c_str());
 							}
 						}
 						if (_rc->clusterMode) {
@@ -1468,7 +1496,8 @@ void CV1::commitThread()
 						continue;
 					}
 
-					pqxx::row mrow = w.exec_params1("SELECT COUNT(id) FROM ztc_member WHERE id = $1 AND network_id = $2", memberId, networkId);
+					pqxx::row mrow = w.exec_params1(
+						"SELECT COUNT(id) FROM ztc_member WHERE id = $1 AND network_id = $2", memberId, networkId);
 					int membercount = mrow[0].as<int>();
 
 					bool isNewMember = false;
@@ -1478,27 +1507,17 @@ void CV1::commitThread()
 						pqxx::result res = w.exec_params0(
 							"INSERT INTO ztc_member (id, network_id, active_bridge, authorized, capabilities, "
 							"identity, last_authorized_time, last_deauthorized_time, no_auto_assign_ips, "
-							"remote_trace_level, remote_trace_target, revision, tags, v_major, v_minor, v_rev, v_proto) "
+							"remote_trace_level, remote_trace_target, revision, tags, v_major, v_minor, v_rev, "
+							"v_proto) "
 							"VALUES ($1, $2, $3, $4, $5, $6, "
 							"TO_TIMESTAMP($7::double precision/1000), TO_TIMESTAMP($8::double precision/1000), "
 							"$9, $10, $11, $12, $13, $14, $15, $16, $17)",
-							memberId,
-							networkId,
-							(bool)config["activeBridge"],
-							(bool)config["authorized"],
-							OSUtils::jsonDump(config["capabilities"], -1),
-							OSUtils::jsonString(config["identity"], ""),
-							(uint64_t)config["lastAuthorizedTime"],
-							(uint64_t)config["lastDeauthorizedTime"],
-							(bool)config["noAutoAssignIps"],
-							(int)config["remoteTraceLevel"],
-							target,
-							(uint64_t)config["revision"],
-							OSUtils::jsonDump(config["tags"], -1),
-							(int)config["vMajor"],
-							(int)config["vMinor"],
-							(int)config["vRev"],
-							(int)config["vProto"]);
+							memberId, networkId, (bool)config["activeBridge"], (bool)config["authorized"],
+							OSUtils::jsonDump(config["capabilities"], -1), OSUtils::jsonString(config["identity"], ""),
+							(uint64_t)config["lastAuthorizedTime"], (uint64_t)config["lastDeauthorizedTime"],
+							(bool)config["noAutoAssignIps"], (int)config["remoteTraceLevel"], target,
+							(uint64_t)config["revision"], OSUtils::jsonDump(config["tags"], -1), (int)config["vMajor"],
+							(int)config["vMinor"], (int)config["vRev"], (int)config["vProto"]);
 					}
 					else {
 						// existing member
@@ -1510,27 +1529,18 @@ void CV1::commitThread()
 							"no_auto_assign_ips = $9, remote_trace_level = $10, remote_trace_target= $11, "
 							"revision = $12, tags = $13, v_major = $14, v_minor = $15, v_rev = $16, v_proto = $17 "
 							"WHERE id = $1 AND network_id = $2",
-							memberId,
-							networkId,
-							(bool)config["activeBridge"],
-							(bool)config["authorized"],
-							OSUtils::jsonDump(config["capabilities"], -1),
-							OSUtils::jsonString(config["identity"], ""),
-							(uint64_t)config["lastAuthorizedTime"],
-							(uint64_t)config["lastDeauthorizedTime"],
-							(bool)config["noAutoAssignIps"],
-							(int)config["remoteTraceLevel"],
-							target,
-							(uint64_t)config["revision"],
-							OSUtils::jsonDump(config["tags"], -1),
-							(int)config["vMajor"],
-							(int)config["vMinor"],
-							(int)config["vRev"],
-							(int)config["vProto"]);
+							memberId, networkId, (bool)config["activeBridge"], (bool)config["authorized"],
+							OSUtils::jsonDump(config["capabilities"], -1), OSUtils::jsonString(config["identity"], ""),
+							(uint64_t)config["lastAuthorizedTime"], (uint64_t)config["lastDeauthorizedTime"],
+							(bool)config["noAutoAssignIps"], (int)config["remoteTraceLevel"], target,
+							(uint64_t)config["revision"], OSUtils::jsonDump(config["tags"], -1), (int)config["vMajor"],
+							(int)config["vMinor"], (int)config["vRev"], (int)config["vProto"]);
 					}
 
 					if (! isNewMember) {
-						pqxx::result res = w.exec_params0("DELETE FROM ztc_member_ip_assignment WHERE member_id = $1 AND network_id = $2", memberId, networkId);
+						pqxx::result res = w.exec_params0(
+							"DELETE FROM ztc_member_ip_assignment WHERE member_id = $1 AND network_id = $2", memberId,
+							networkId);
 					}
 
 					std::vector<std::string> assignments;
@@ -1542,7 +1552,10 @@ void CV1::commitThread()
 							continue;
 						}
 
-						pqxx::result res = w.exec_params0("INSERT INTO ztc_member_ip_assignment (member_id, network_id, address) VALUES ($1, $2, $3) ON CONFLICT (network_id, member_id, address) DO NOTHING", memberId, networkId, addr);
+						pqxx::result res = w.exec_params0(
+							"INSERT INTO ztc_member_ip_assignment (member_id, network_id, address) VALUES ($1, $2, $3) "
+							"ON CONFLICT (network_id, member_id, address) DO NOTHING",
+							memberId, networkId, addr);
 
 						assignments.push_back(addr);
 					}
@@ -1586,11 +1599,15 @@ void CV1::commitThread()
 						_memberChanged(memOrig, memNew, qitem.second);
 					}
 					else {
-						fprintf(stderr, "%s: Can't notify of change.  Error parsing nwid or memberid: %llu-%llu\n", _myAddressStr.c_str(), (unsigned long long)nwidInt, (unsigned long long)memberidInt);
+						fprintf(
+							stderr, "%s: Can't notify of change.  Error parsing nwid or memberid: %llu-%llu\n",
+							_myAddressStr.c_str(), (unsigned long long)nwidInt, (unsigned long long)memberidInt);
 					}
 				}
 				catch (std::exception& e) {
-					fprintf(stderr, "%s ERROR: Error updating member %s-%s: %s\n", _myAddressStr.c_str(), networkId.c_str(), memberId.c_str(), e.what());
+					fprintf(
+						stderr, "%s ERROR: Error updating member %s-%s: %s\n", _myAddressStr.c_str(), networkId.c_str(),
+						memberId.c_str(), e.what());
 					mspan->SetStatus(opentelemetry::trace::StatusCode::kError, e.what());
 				}
 			}
@@ -1619,12 +1636,14 @@ void CV1::commitThread()
 					// did not previously exist. If the record already exists owner_id is left
 					// unchanged, so owner_id should be left out of the update clause.
 					pqxx::result res = w.exec_params0(
-						"INSERT INTO ztc_network (id, creation_time, owner_id, controller_id, capabilities, enable_broadcast, "
+						"INSERT INTO ztc_network (id, creation_time, owner_id, controller_id, capabilities, "
+						"enable_broadcast, "
 						"last_modified, mtu, multicast_limit, name, private, "
 						"remote_trace_level, remote_trace_target, rules, rules_source, "
 						"tags, v4_assign_mode, v6_assign_mode, sso_enabled) VALUES ("
 						"$1, TO_TIMESTAMP($5::double precision/1000), "
-						"(SELECT user_id AS owner_id FROM ztc_global_permissions WHERE authorize = true AND del = true AND modify = true AND read = true LIMIT 1),"
+						"(SELECT user_id AS owner_id FROM ztc_global_permissions WHERE authorize = true AND del = true "
+						"AND modify = true AND read = true LIMIT 1),"
 						"$2, $3, $4, TO_TIMESTAMP($5::double precision/1000), "
 						"$6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17) "
 						"ON CONFLICT (id) DO UPDATE set controller_id = EXCLUDED.controller_id, "
@@ -1636,25 +1655,15 @@ void CV1::commitThread()
 						"rules_source = EXCLUDED.rules_source, tags = EXCLUDED.tags, "
 						"v4_assign_mode = EXCLUDED.v4_assign_mode, v6_assign_mode = EXCLUDED.v6_assign_mode, "
 						"sso_enabled = EXCLUDED.sso_enabled",
-						id,
-						_myAddressStr,
-						OSUtils::jsonDump(config["capabilities"], -1),
-						(bool)config["enableBroadcast"],
-						OSUtils::now(),
-						(int)config["mtu"],
-						(int)config["multicastLimit"],
-						OSUtils::jsonString(config["name"], ""),
-						(bool)config["private"],
-						(int)config["remoteTraceLevel"],
-						remoteTraceTarget,
-						OSUtils::jsonDump(config["rules"], -1),
-						rulesSource,
-						OSUtils::jsonDump(config["tags"], -1),
-						OSUtils::jsonDump(config["v4AssignMode"], -1),
-						OSUtils::jsonDump(config["v6AssignMode"], -1),
+						id, _myAddressStr, OSUtils::jsonDump(config["capabilities"], -1),
+						(bool)config["enableBroadcast"], OSUtils::now(), (int)config["mtu"],
+						(int)config["multicastLimit"], OSUtils::jsonString(config["name"], ""), (bool)config["private"],
+						(int)config["remoteTraceLevel"], remoteTraceTarget, OSUtils::jsonDump(config["rules"], -1),
+						rulesSource, OSUtils::jsonDump(config["tags"], -1),
+						OSUtils::jsonDump(config["v4AssignMode"], -1), OSUtils::jsonDump(config["v6AssignMode"], -1),
 						OSUtils::jsonBool(config["ssoEnabled"], false));
 
-					res = w.exec_params0("DELETE FROM ztc_network_assignment_pool WHERE network_id = $1", 0);
+					res = w.exec_params0("DELETE FROM ztc_network_assignment_pool WHERE network_id = $1", id);
 
 					auto pool = config["ipAssignmentPools"];
 					bool err = false;
@@ -1665,9 +1674,7 @@ void CV1::commitThread()
 						res = w.exec_params0(
 							"INSERT INTO ztc_network_assignment_pool (network_id, ip_range_start, ip_range_end) "
 							"VALUES ($1, $2, $3)",
-							id,
-							start,
-							end);
+							id, start, end);
 					}
 
 					res = w.exec_params0("DELETE FROM ztc_network_route WHERE network_id = $1", id);
@@ -1692,7 +1699,9 @@ void CV1::commitThread()
 							via = (*i)["via"];
 						}
 
-						res = w.exec_params0("INSERT INTO ztc_network_route (network_id, address, bits, via) VALUES ($1, $2, $3, $4)", id, targetAddr, targetBits, (via == "NULL" ? NULL : via.c_str()));
+						res = w.exec_params0(
+							"INSERT INTO ztc_network_route (network_id, address, bits, via) VALUES ($1, $2, $3, $4)",
+							id, targetAddr, targetBits, (via == "NULL" ? NULL : via.c_str()));
 					}
 					if (err) {
 						fprintf(stderr, "%s: route add error\n", _myAddressStr.c_str());
@@ -1715,7 +1724,10 @@ void CV1::commitThread()
 
 					std::string s = servers.str();
 
-					res = w.exec_params0("INSERT INTO ztc_network_dns (network_id, domain, servers) VALUES ($1, $2, $3) ON CONFLICT (network_id) DO UPDATE SET domain = EXCLUDED.domain, servers = EXCLUDED.servers", id, domain, s);
+					res = w.exec_params0(
+						"INSERT INTO ztc_network_dns (network_id, domain, servers) VALUES ($1, $2, $3) ON CONFLICT "
+						"(network_id) DO UPDATE SET domain = EXCLUDED.domain, servers = EXCLUDED.servers",
+						id, domain, s);
 
 					w.commit();
 
@@ -1729,7 +1741,9 @@ void CV1::commitThread()
 						_networkChanged(nwOrig, nwNew, qitem.second);
 					}
 					else {
-						fprintf(stderr, "%s: Can't notify network changed: %llu\n", _myAddressStr.c_str(), (unsigned long long)nwidInt);
+						fprintf(
+							stderr, "%s: Can't notify network changed: %llu\n", _myAddressStr.c_str(),
+							(unsigned long long)nwidInt);
 					}
 				}
 				catch (std::exception& e) {
@@ -1803,7 +1817,9 @@ void CV1::commitThread()
 					std::string memberId = config["id"];
 					std::string networkId = config["nwid"];
 
-					pqxx::result res = w.exec_params0("UPDATE ztc_member SET hidden = true, deleted = true WHERE id = $1 AND network_id = $2", memberId, networkId);
+					pqxx::result res = w.exec_params0(
+						"UPDATE ztc_member SET hidden = true, deleted = true WHERE id = $1 AND network_id = $2",
+						memberId, networkId);
 
 					w.commit();
 				}
@@ -1927,7 +1943,8 @@ void CV1::onlineNotification_Postgres()
 				std::string memberId(memTmp);
 
 				try {
-					pqxx::row r = w2.exec_params1("SELECT id, network_id FROM ztc_member WHERE network_id = $1 AND id = $2", networkId, memberId);
+					pqxx::row r = w2.exec_params1(
+						"SELECT id, network_id FROM ztc_member WHERE network_id = $1 AND id = $2", networkId, memberId);
 				}
 				catch (pqxx::unexpected_rows& e) {
 					continue;
@@ -1946,8 +1963,9 @@ void CV1::onlineNotification_Postgres()
 				}
 
 				std::stringstream memberUpdate;
-				memberUpdate << "INSERT INTO ztc_member_status (network_id, member_id, address, last_updated, os, arch) VALUES "
-							 << "('" << networkId << "', '" << memberId << "', ";
+				memberUpdate
+					<< "INSERT INTO ztc_member_status (network_id, member_id, address, last_updated, os, arch) VALUES "
+					<< "('" << networkId << "', '" << memberId << "', ";
 				if (ipAddr.empty()) {
 					memberUpdate << "NULL, ";
 				}
@@ -1958,7 +1976,8 @@ void CV1::onlineNotification_Postgres()
 							 << "'" << os << "', "
 							 << "'" << arch << "'"
 							 << ") "
-							 << " ON CONFLICT (network_id, member_id) DO UPDATE SET address = EXCLUDED.address, last_updated = EXCLUDED.last_updated, "
+							 << " ON CONFLICT (network_id, member_id) DO UPDATE SET address = EXCLUDED.address, "
+								"last_updated = EXCLUDED.last_updated, "
 							 << "os = EXCLUDED.os, arch = EXCLUDED.arch";
 
 				pipe.insert(memberUpdate.str());
@@ -1980,7 +1999,9 @@ void CV1::onlineNotification_Postgres()
 		_pool->unborrow(c);
 
 		ConnectionPoolStats stats = _pool->get_stats();
-		fprintf(stderr, "%s pool stats: in use size: %llu, available size: %llu, total: %llu\n", _myAddressStr.c_str(), stats.borrowed_size, stats.pool_size, (stats.borrowed_size + stats.pool_size));
+		fprintf(
+			stderr, "%s pool stats: in use size: %llu, available size: %llu, total: %llu\n", _myAddressStr.c_str(),
+			stats.borrowed_size, stats.pool_size, (stats.borrowed_size + stats.pool_size));
 
 		span->End();
 
@@ -1988,7 +2009,9 @@ void CV1::onlineNotification_Postgres()
 	}
 	fprintf(stderr, "%s: Fell out of run loop in onlineNotificationThread\n", _myAddressStr.c_str());
 	if (_run == 1) {
-		fprintf(stderr, "ERROR: %s onlineNotificationThread should still be running! Exiting Controller.\n", _myAddressStr.c_str());
+		fprintf(
+			stderr, "ERROR: %s onlineNotificationThread should still be running! Exiting Controller.\n",
+			_myAddressStr.c_str());
 		exit(6);
 	}
 }
@@ -2042,7 +2065,10 @@ void CV1::onlineNotification_Redis()
 	}
 }
 
-uint64_t CV1::_doRedisUpdate(sw::redis::Transaction& tx, std::string& controllerId, std::unordered_map<std::pair<uint64_t, uint64_t>, NodeOnlineRecord, _PairHasher>& lastOnline)
+uint64_t CV1::_doRedisUpdate(
+	sw::redis::Transaction& tx,
+	std::string& controllerId,
+	std::unordered_map<std::pair<uint64_t, uint64_t>, NodeOnlineRecord, _PairHasher>& lastOnline)
 {
 	auto provider = opentelemetry::trace::Provider::GetTracerProvider();
 	auto tracer = provider->GetTracer("cv1");
@@ -2079,7 +2105,11 @@ uint64_t CV1::_doRedisUpdate(sw::redis::Transaction& tx, std::string& controller
 			arch = osArchSplit[1];
 		}
 
-		std::unordered_map<std::string, std::string> record = { { "id", memberId }, { "address", ipAddr }, { "last_updated", std::to_string(ts) }, { "os", os }, { "arch", arch } };
+		std::unordered_map<std::string, std::string> record = { { "id", memberId },
+																{ "address", ipAddr },
+																{ "last_updated", std::to_string(ts) },
+																{ "os", os },
+																{ "arch", arch } };
 		tx.zadd("nodes-online:{" + controllerId + "}", memberId, ts)
 			.zadd("nodes-online2:{" + controllerId + "}", networkId + "-" + memberId, ts)
 			.zadd("network-nodes-online:{" + controllerId + "}:" + networkId, memberId, ts)
@@ -2093,16 +2123,24 @@ uint64_t CV1::_doRedisUpdate(sw::redis::Transaction& tx, std::string& controller
 	// expire records from all-nodes and network-nodes member list
 	uint64_t expireOld = OSUtils::now() - 300000;
 
-	tx.zremrangebyscore("nodes-online:{" + controllerId + "}", sw::redis::RightBoundedInterval<double>(expireOld, sw::redis::BoundType::LEFT_OPEN));
-	tx.zremrangebyscore("nodes-online2:{" + controllerId + "}", sw::redis::RightBoundedInterval<double>(expireOld, sw::redis::BoundType::LEFT_OPEN));
-	tx.zremrangebyscore("active-networks:{" + controllerId + "}", sw::redis::RightBoundedInterval<double>(expireOld, sw::redis::BoundType::LEFT_OPEN));
+	tx.zremrangebyscore(
+		"nodes-online:{" + controllerId + "}",
+		sw::redis::RightBoundedInterval<double>(expireOld, sw::redis::BoundType::LEFT_OPEN));
+	tx.zremrangebyscore(
+		"nodes-online2:{" + controllerId + "}",
+		sw::redis::RightBoundedInterval<double>(expireOld, sw::redis::BoundType::LEFT_OPEN));
+	tx.zremrangebyscore(
+		"active-networks:{" + controllerId + "}",
+		sw::redis::RightBoundedInterval<double>(expireOld, sw::redis::BoundType::LEFT_OPEN));
 	{
 		std::shared_lock<std::shared_mutex> l(_networks_l);
 		for (const auto& it : _networks) {
 			uint64_t nwid_i = it.first;
 			char nwidTmp[64];
 			OSUtils::ztsnprintf(nwidTmp, sizeof(nwidTmp), "%.16llx", nwid_i);
-			tx.zremrangebyscore("network-nodes-online:{" + controllerId + "}:" + nwidTmp, sw::redis::RightBoundedInterval<double>(expireOld, sw::redis::BoundType::LEFT_OPEN));
+			tx.zremrangebyscore(
+				"network-nodes-online:{" + controllerId + "}:" + nwidTmp,
+				sw::redis::RightBoundedInterval<double>(expireOld, sw::redis::BoundType::LEFT_OPEN));
 		}
 	}
 	tx.exec();

+ 13 - 14
controller/CentralDB.cpp

@@ -184,9 +184,9 @@ CentralDB::CentralDB(
 		case LISTENER_MODE_PUBSUB:
 			if (cc->pubSubConfig != NULL) {
 				_membersDbWatcher =
-					std::make_shared<PubSubMemberListener>(_myAddressStr, cc->pubSubConfig->listen_timeout, this);
+					std::make_shared<PubSubMemberListener>(_myAddressStr, cc->pubSubConfig->project, this);
 				_networksDbWatcher =
-					std::make_shared<PubSubNetworkListener>(_myAddressStr, cc->pubSubConfig->listen_timeout, this);
+					std::make_shared<PubSubNetworkListener>(_myAddressStr, cc->pubSubConfig->project, this);
 			}
 			else {
 				throw std::runtime_error(
@@ -1250,7 +1250,6 @@ void CentralDB::commitThread()
 
 					std::string id = config["id"];
 
-					// network must already exist
 					pqxx::result res = w.exec_params0(
 						"INSERT INTO networks_ctl (id, name, configuration, controller_id, revision) "
 						"VALUES ($1, $2, $3, $4, $5) "
@@ -1261,19 +1260,19 @@ void CentralDB::commitThread()
 
 					w.commit();
 
-					res = w.exec_params0("DELETE FROM ztc_network_assignment_pool WHERE network_id = $1", 0);
+					// res = w.exec_params0("DELETE FROM ztc_network_assignment_pool WHERE network_id = $1", 0);
 
-					auto pool = config["ipAssignmentPools"];
-					bool err = false;
-					for (auto i = pool.begin(); i != pool.end(); ++i) {
-						std::string start = (*i)["ipRangeStart"];
-						std::string end = (*i)["ipRangeEnd"];
+					// auto pool = config["ipAssignmentPools"];
+					// bool err = false;
+					// for (auto i = pool.begin(); i != pool.end(); ++i) {
+					// 	std::string start = (*i)["ipRangeStart"];
+					// 	std::string end = (*i)["ipRangeEnd"];
 
-						res = w.exec_params0(
-							"INSERT INTO ztc_network_assignment_pool (network_id, ip_range_start, ip_range_end) "
-							"VALUES ($1, $2, $3)",
-							id, start, end);
-					}
+					// 	res = w.exec_params0(
+					// 		"INSERT INTO ztc_network_assignment_pool (network_id, ip_range_start, ip_range_end) "
+					// 		"VALUES ($1, $2, $3)",
+					// 		id, start, end);
+					// }
 
 					const uint64_t nwidInt = OSUtils::jsonIntHex(config["nwid"], 0ULL);
 					if (nwidInt) {

+ 237 - 75
controller/PubSubListener.cpp

@@ -2,68 +2,129 @@
 #include "PubSubListener.hpp"
 
 #include "DB.hpp"
+#include "member.pb.h"
+#include "network.pb.h"
 #include "opentelemetry/trace/provider.h"
 #include "rustybits.h"
 
+#include <google/cloud/pubsub/admin/subscription_admin_client.h>
+#include <google/cloud/pubsub/admin/subscription_admin_connection.h>
+#include <google/cloud/pubsub/message.h>
+#include <google/cloud/pubsub/subscriber.h>
+#include <google/cloud/pubsub/subscription.h>
+#include <google/cloud/pubsub/topic.h>
 #include <nlohmann/json.hpp>
 
+namespace pubsub = ::google::cloud::pubsub;
+namespace pubsub_admin = ::google::cloud::pubsub_admin;
+
 namespace ZeroTier {
 
-void listener_callback(void* user_ptr, const uint8_t* payload, uintptr_t length)
+nlohmann::json toJson(const pbmessages::NetworkChange_Network& nc);
+nlohmann::json toJson(const pbmessages::MemberChange_Member& mc);
+
+PubSubListener::PubSubListener(std::string controller_id, std::string project, std::string topic)
+	: _controller_id(controller_id)
+	, _project(project)
+	, _topic(topic)
+	, _subscription_id("sub-" + controller_id + "-network-changes")
+	, _run(false)
+	, _adminClient(pubsub_admin::MakeSubscriptionAdminConnection())
+	, _subscription(pubsub::Subscription(_project, _subscription_id))
 {
-	if (! user_ptr || ! payload || length == 0) {
-		fprintf(stderr, "Invalid parameters in listener_callback\n");
-		return;
+	GOOGLE_PROTOBUF_VERIFY_VERSION;
+
+	google::pubsub::v1::Subscription request;
+	request.set_name(_subscription.FullName());
+	request.set_topic(pubsub::Topic(project, topic).FullName());
+	request.set_filter("(attributes.controller_id=\"" + _controller_id + "\")");
+	auto sub = _adminClient.CreateSubscription(request);
+	if (! sub.ok()) {
+		fprintf(stderr, "Failed to create subscription: %s\n", sub.status().message().c_str());
+		throw std::runtime_error("Failed to create subscription");
 	}
 
-	auto* listener = static_cast<PubSubListener*>(user_ptr);
-	std::string payload_str(reinterpret_cast<const char*>(payload), length);
-	listener->onNotification(payload_str);
-}
+	if (sub.status().code() == google::cloud::StatusCode::kAlreadyExists) {
+		fprintf(stderr, "Subscription already exists\n");
+		throw std::runtime_error("Subscription already exists");
+	}
 
-PubSubNetworkListener::PubSubNetworkListener(std::string controller_id, uint64_t listen_timeout, DB* db) : _run(true), _controller_id(controller_id), _db(db), _listener(nullptr)
-{
-	_listener = rustybits::network_listener_new(_controller_id.c_str(), listen_timeout, listener_callback, this);
-	_listenThread = std::thread(&PubSubNetworkListener::listenThread, this);
-	_changeHandlerThread = std::thread(&PubSubNetworkListener::changeHandlerThread, this);
+	_subscriber = std::make_shared<pubsub::Subscriber>(pubsub::MakeSubscriberConnection(_subscription));
+
+	_run = true;
+	_subscriberThread = std::thread(&PubSubListener::subscribe, this);
 }
 
-PubSubNetworkListener::~PubSubNetworkListener()
+PubSubListener::~PubSubListener()
 {
 	_run = false;
-	if (_listenThread.joinable()) {
-		_listenThread.join();
+	_session.cancel();
+	if (_subscriberThread.joinable()) {
+		_subscriberThread.join();
 	}
+}
+
+void PubSubListener::subscribe()
+{
+	while (_run) {
+		_session = _subscriber->Subscribe([this](pubsub::Message const& m, pubsub::AckHandler h) {
+			auto provider = opentelemetry::trace::Provider::GetTracerProvider();
+			auto tracer = provider->GetTracer("PubSubListener");
+			auto span = tracer->StartSpan("PubSubListener::onMessage");
+			auto scope = tracer->WithActiveSpan(span);
+			span->SetAttribute("message_id", m.message_id());
+			span->SetAttribute("ordering_key", m.ordering_key());
+			span->SetAttribute("attributes", m.attributes().size());
 
-	if (_listener) {
-		rustybits::network_listener_delete(_listener);
-		_listener = nullptr;
+			fprintf(stderr, "Received message %s\n", m.message_id().c_str());
+			onNotification(m.data());
+			std::move(h).ack();
+			span->SetStatus(opentelemetry::trace::StatusCode::kOk);
+			return true;
+		});
+		auto status = _session.get();
+		if (! status.ok() && _run) {
+			fprintf(stderr, "Error during Subscribe: %s\n", status.message().c_str());
+		}
 	}
 }
 
+PubSubNetworkListener::PubSubNetworkListener(std::string controller_id, std::string project, DB* db)
+	: PubSubListener(controller_id, project, "controller-network-change-stream")
+	, _db(db)
+{
+}
+
+PubSubNetworkListener::~PubSubNetworkListener()
+{
+}
+
 void PubSubNetworkListener::onNotification(const std::string& payload)
 {
 	auto provider = opentelemetry::trace::Provider::GetTracerProvider();
 	auto tracer = provider->GetTracer("PubSubNetworkListener");
 	auto span = tracer->StartSpan("PubSubNetworkListener::onNotification");
 	auto scope = tracer->WithActiveSpan(span);
-	span->SetAttribute("payload", payload);
 
-	fprintf(stderr, "Network notification received: %s\n", payload.c_str());
+	pbmessages::NetworkChange nc;
+	if (! nc.ParseFromString(payload)) {
+		fprintf(stderr, "Failed to parse NetworkChange protobuf message\n");
+		span->SetAttribute("error", "Failed to parse NetworkChange protobuf message");
+		span->SetStatus(opentelemetry::trace::StatusCode::kError, "Failed to parse protobuf");
+		return;
+	}
+
+	fprintf(stderr, "Network notification received");
 
 	try {
-		nlohmann::json j = nlohmann::json::parse(payload);
-		nlohmann::json& ov_tmp = j["old"];
-		nlohmann::json& nv_tmp = j["new"];
 		nlohmann::json oldConfig, newConfig;
 
-		if (ov_tmp.is_object()) {
-			// TODO:  copy old configuration to oldConfig
-			// changing key names along the way
+		if (nc.has_old()) {
+			oldConfig = toJson(nc.old());
 		}
-		if (nv_tmp.is_object()) {
-			// TODO:  copy new configuration to newConfig
-			// changing key names along the way
+
+		if (nc.has_new_()) {
+			newConfig = toJson(nc.new_());
 		}
 
 		if (oldConfig.is_object() && newConfig.is_object()) {
@@ -106,41 +167,14 @@ void PubSubNetworkListener::onNotification(const std::string& payload)
 	}
 }
 
-void PubSubNetworkListener::listenThread()
-{
-	if (_listener) {
-		while (_run) {
-			rustybits::network_listener_listen(_listener);
-		}
-	}
-}
-
-void PubSubNetworkListener::changeHandlerThread()
+PubSubMemberListener::PubSubMemberListener(std::string controller_id, std::string project, DB* db)
+	: PubSubListener(controller_id, project, "controller-member-change-stream")
+	, _db(db)
 {
-	if (_listener) {
-		rustybits::network_listener_change_handler(_listener);
-	}
-}
-
-PubSubMemberListener::PubSubMemberListener(std::string controller_id, uint64_t listen_timeout, DB* db) : _run(true), _controller_id(controller_id), _db(db), _listener(nullptr)
-{
-	_run = true;
-	_listener = rustybits::member_listener_new(_controller_id.c_str(), listen_timeout, listener_callback, this);
-	_listenThread = std::thread(&PubSubMemberListener::listenThread, this);
-	_changeHandlerThread = std::thread(&PubSubMemberListener::changeHandlerThread, this);
 }
 
 PubSubMemberListener::~PubSubMemberListener()
 {
-	_run = false;
-	if (_listenThread.joinable()) {
-		_listenThread.join();
-	}
-
-	if (_listener) {
-		rustybits::member_listener_delete(_listener);
-		_listener = nullptr;
-	}
 }
 
 void PubSubMemberListener::onNotification(const std::string& payload)
@@ -149,22 +183,27 @@ void PubSubMemberListener::onNotification(const std::string& payload)
 	auto tracer = provider->GetTracer("PubSubMemberListener");
 	auto span = tracer->StartSpan("PubSubMemberListener::onNotification");
 	auto scope = tracer->WithActiveSpan(span);
-	span->SetAttribute("payload", payload);
 
-	fprintf(stderr, "Member notification received: %s\n", payload.c_str());
+	pbmessages::MemberChange mc;
+	if (! mc.ParseFromString(payload)) {
+		fprintf(stderr, "Failed to parse MemberChange protobuf message\n");
+		span->SetAttribute("error", "Failed to parse MemberChange protobuf message");
+		span->SetStatus(opentelemetry::trace::StatusCode::kError, "Failed to parse protobuf");
+		return;
+	}
+
+	fprintf(stderr, "Member notification received");
 
 	try {
 		nlohmann::json tmp;
-		nlohmann::json old_tmp = tmp["old"];
-		nlohmann::json new_tmp = tmp["new"];
 		nlohmann::json oldConfig, newConfig;
 
-		if (old_tmp.is_object()) {
-			// TODO: copy old configuration to oldConfig
+		if (mc.has_old()) {
+			oldConfig = toJson(mc.old());
 		}
 
-		if (new_tmp.is_object()) {
-			// TODO: copy new configuration to newConfig
+		if (mc.has_new_()) {
+			newConfig = toJson(mc.new_());
 		}
 
 		if (oldConfig.is_object() && newConfig.is_object()) {
@@ -214,20 +253,143 @@ void PubSubMemberListener::onNotification(const std::string& payload)
 	}
 }
 
-void PubSubMemberListener::listenThread()
+nlohmann::json toJson(const pbmessages::NetworkChange_Network& nc)
 {
-	if (_listener) {
-		while (_run) {
-			rustybits::member_listener_listen(_listener);
+	nlohmann::json out;
+
+	out["id"] = nc.network_id();
+	out["name"] = nc.name();
+	out["capabilities"] = OSUtils::jsonParse(nc.capabilities());
+	out["mtu"] = nc.mtu();
+	out["multicastLimit"] = nc.multicast_limit();
+	out["private"] = nc.is_private();
+	out["remoteTraceLevel"] = nc.remote_trace_level();
+	if (nc.has_remote_trace_target()) {
+		out["remoteTraceTarget"] = nc.remote_trace_target();
+	}
+	else {
+		out["remoteTraceTarget"] = "";
+	}
+	out["rules"] = OSUtils::jsonParse(nc.rules());
+	out["rulesSource"] = nc.rules_source();
+	out["tags"] = OSUtils::jsonParse(nc.tags());
+
+	if (nc.has_ipv4_assign_mode()) {
+		nlohmann::json ipv4mode;
+		ipv4mode["zt"] = nc.ipv4_assign_mode().zt();
+		out["ipv4AssignMode"] = ipv4mode;
+	}
+	if (nc.has_ipv6_assign_mode()) {
+		nlohmann::json ipv6mode;
+		ipv6mode["6plane"] = nc.ipv6_assign_mode().six_plane();
+		ipv6mode["rfc4193"] = nc.ipv6_assign_mode().rfc4193();
+		ipv6mode["zt"] = nc.ipv6_assign_mode().zt();
+		out["ipv6AssignMode"] = ipv6mode;
+	}
+
+	if (nc.assignment_pools_size() > 0) {
+		nlohmann::json pools = nlohmann::json::array();
+		for (const auto& p : nc.assignment_pools()) {
+			nlohmann::json pool;
+			pool["ipRangeStart"] = p.start_ip();
+			pool["ipRangeEnd"] = p.end_ip();
+			pools.push_back(pool);
 		}
+		out["assignmentPools"] = pools;
 	}
+
+	if (nc.routes_size() > 0) {
+		nlohmann::json routes = nlohmann::json::array();
+		for (const auto& r : nc.routes()) {
+			nlohmann::json route;
+			route["target"] = r.target();
+			if (r.has_via()) {
+				route["via"] = r.via();
+			}
+			routes.push_back(route);
+		}
+		out["routes"] = routes;
+	}
+
+	if (nc.has_dns()) {
+		nlohmann::json dns;
+		if (nc.dns().nameservers_size() > 0) {
+			nlohmann::json servers = nlohmann::json::array();
+			for (const auto& s : nc.dns().nameservers()) {
+				servers.push_back(s);
+			}
+			dns["servers"] = servers;
+		}
+		dns["domain"] = nc.dns().domain();
+
+		out["dns"] = dns;
+	}
+
+	out["ssoEnabled"] = nc.sso_enabled();
+	nlohmann::json sso;
+	if (nc.sso_enabled()) {
+		sso = nlohmann::json::object();
+		if (nc.has_sso_client_id()) {
+			sso["ssoClientId"] = nc.sso_client_id();
+		}
+
+		if (nc.has_sso_authorization_endpoint()) {
+			sso["ssoAuthorizationEndpoint"] = nc.sso_authorization_endpoint();
+		}
+
+		if (nc.has_sso_issuer()) {
+			sso["ssoIssuer"] = nc.sso_issuer();
+		}
+
+		if (nc.has_sso_provider()) {
+			sso["ssoProvider"] = nc.sso_provider();
+		}
+	}
+	out["ssoConfig"] = sso;
+
+	return out;
 }
 
-void PubSubMemberListener::changeHandlerThread()
+nlohmann::json toJson(const pbmessages::MemberChange_Member& mc)
 {
-	if (_listener) {
-		rustybits::member_listener_change_handler(_listener);
+	nlohmann::json out;
+	out["id"] = mc.device_id();
+	out["nwid"] = mc.network_id();
+	if (mc.has_remote_trace_target()) {
+		out["remoteTraceTarget"] = mc.remote_trace_target();
 	}
+	else {
+		out["remoteTraceTarget"] = "";
+	}
+	out["authorized"] = mc.authorized();
+	out["activeBridge"] = mc.active_bridge();
+
+	auto ipAssignments = mc.ip_assignments();
+	if (ipAssignments.size() > 0) {
+		nlohmann::json assignments = nlohmann::json::array();
+		for (const auto& ip : ipAssignments) {
+			assignments.push_back(ip);
+		}
+		out["ipAssignments"] = assignments;
+	}
+
+	out["noAutoAssignIps"] = mc.no_auto_assign_ips();
+	out["ssoExempt"] = mc.sso_exepmt();
+	out["authenticationExpiryTime"] = mc.auth_expiry_time();
+	out["capabilities"] = OSUtils::jsonParse(mc.capabilities());
+	out["creationTime"] = mc.creation_time();
+	out["identity"] = mc.identity();
+	out["lastAuthorizedTime"] = mc.last_authorized_time();
+	out["lastDeauthorizedTime"] = mc.last_deauthorized_time();
+	out["remoteTraceLevel"] = mc.remote_trace_level();
+	out["revision"] = mc.revision();
+	out["tags"] = OSUtils::jsonParse(mc.tags());
+	out["versionMajor"] = mc.version_major();
+	out["versionMinor"] = mc.version_minor();
+	out["versionRev"] = mc.version_rev();
+	out["versionProtocol"] = mc.version_protocol();
+
+	return out;
 }
 
 }	// namespace ZeroTier

+ 23 - 21
controller/PubSubListener.hpp

@@ -6,6 +6,8 @@
 #include "NotificationListener.hpp"
 #include "rustybits.h"
 
+#include <google/cloud/pubsub/admin/subscription_admin_client.h>
+#include <google/cloud/pubsub/subscriber.h>
 #include <memory>
 #include <string>
 #include <thread>
@@ -15,6 +17,8 @@ class DB;
 
 struct PubSubConfig {
 	const char* controller_id;
+	std::string project;
+	std::string topic;
 	uint64_t listen_timeout;
 };
 
@@ -23,11 +27,25 @@ struct PubSubConfig {
  */
 class PubSubListener : public NotificationListener {
   public:
-	virtual ~PubSubListener()
-	{
-	}
+	PubSubListener(std::string controller_id, std::string project, std::string topic);
+	virtual ~PubSubListener();
 
 	virtual void onNotification(const std::string& payload) = 0;
+
+  protected:
+	std::string _controller_id;
+	std::string _project;
+	std::string _topic;
+	std::string _subscription_id;
+
+  private:
+	void subscribe();
+	bool _run = false;
+	google::cloud::pubsub_admin::SubscriptionAdminClient _adminClient;
+	google::cloud::pubsub::Subscription _subscription;
+	std::shared_ptr<google::cloud::pubsub::Subscriber> _subscriber;
+	google::cloud::future<google::cloud::Status> _session;
+	std::thread _subscriberThread;
 };
 
 /**
@@ -35,21 +53,13 @@ class PubSubListener : public NotificationListener {
  */
 class PubSubNetworkListener : public PubSubListener {
   public:
-	PubSubNetworkListener(std::string controller_id, uint64_t listen_timeout, DB* db);
+	PubSubNetworkListener(std::string controller_id, std::string project, DB* db);
 	virtual ~PubSubNetworkListener();
 
 	virtual void onNotification(const std::string& payload) override;
 
   private:
-	void listenThread();
-	void changeHandlerThread();
-
-	bool _run = false;
-	std::string _controller_id;
 	DB* _db;
-	const rustybits::NetworkListener* _listener;
-	std::thread _listenThread;
-	std::thread _changeHandlerThread;
 };
 
 /**
@@ -57,21 +67,13 @@ class PubSubNetworkListener : public PubSubListener {
  */
 class PubSubMemberListener : public PubSubListener {
   public:
-	PubSubMemberListener(std::string controller_id, uint64_t listen_timeout, DB* db);
+	PubSubMemberListener(std::string controller_id, std::string project, DB* db);
 	virtual ~PubSubMemberListener();
 
 	virtual void onNotification(const std::string& payload) override;
 
   private:
-	void listenThread();
-	void changeHandlerThread();
-
-	bool _run = false;
-	std::string _controller_id;
 	DB* _db;
-	const rustybits::MemberListener* _listener;
-	std::thread _listenThread;
-	std::thread _changeHandlerThread;
 };
 
 }	// namespace ZeroTier

+ 39 - 0
controller/protobuf/member.proto

@@ -0,0 +1,39 @@
+syntax = "proto3";
+
+package pbmessages;
+
+message MemberChange {
+    message Member {
+        string device_id = 1;
+        string network_id = 2;
+        string identity = 3; // Identity of the member
+        bool authorized = 4; // Whether the member is authorized
+        repeated string ip_assignments = 5; // List of IP assignments
+        bool active_bridge = 6; // Whether the member is an active bridge
+        string tags = 7; // JSON string of tags
+        string capabilities = 8; // JSON string of capabilities
+        uint64 creation_time = 9; // Unix timestamp in milliseconds
+        bool no_auto_assign_ips = 10; // Whether auto IP assignment is disabled
+        uint64 revision = 11; // Revision number
+        uint64 last_authorized_time = 12; // Last time the member was authorized
+        uint64 last_deauthorized_time = 13; // Last time the member was deauthorized
+        optional string last_authorized_credential_type = 14; // Type of credential used for last authorization
+        optional string last_authorized_credential = 15; // Credential used for last authorization
+        int32 version_major = 16; // Major version of the member
+        int32 version_minor = 17; // Minor version of the member
+        int32 version_rev = 18; // Patch version of the member
+        int32 version_protocol = 19; // Protocol version of the member
+        int32 remote_trace_level = 20; // Remote trace level
+        optional string remote_trace_target = 21; // Remote trace target
+        bool sso_exepmt = 22; // Whether SSO is exempt
+        uint64 auth_expiry_time = 23; // Authorization expiry time in milliseconds
+    }
+    message MemberChangeMetadata {
+        string trace_id = 1;
+        string controller_id = 2; 
+    }
+
+    optional Member old = 1;
+    optional Member new = 2;
+    optional MemberChangeMetadata metadata = 3;
+}

+ 21 - 0
controller/protobuf/member_status.proto

@@ -0,0 +1,21 @@
+syntax = "proto3";
+
+package pbmessages;
+
+
+
+message MemberStatus {
+    message MemberStatusMetadata {
+        string trace_id = 1;
+        string controller_id = 2; 
+    }
+
+    MemberStatusMetadata metadata = 1;
+    string network_id = 2;
+    string member_id = 3;
+    uint64 timestamp = 4; // Unix timestamp in milliseconds
+    optional string ip_address = 5; // Optional IP address of the member
+    optional string os = 6;
+    optional string arch = 7;
+    optional string version = 8;
+}

+ 66 - 0
controller/protobuf/network.proto

@@ -0,0 +1,66 @@
+syntax = "proto3";
+
+package pbmessages;
+
+message NetworkChange {
+    message NetworkChangeMetadata {
+        string trace_id = 1;
+        string controller_id = 2; 
+    }
+
+    message IPRange {
+        string start_ip = 1; // Start of the IP range
+        string end_ip = 2;   // End of the IP range
+    }
+
+    message Route {
+        string target = 1; // Target IP or network
+        optional string via = 2; // Optional next hop IP
+    }
+
+    message DNS {
+        string domain = 1; // Search domain
+        repeated string nameservers = 2; // List of nameservers
+    }
+
+    message IPV4AssignMode {
+        bool zt = 1; // Whether ZeroTier is used for IPv4 assignment
+    }
+
+    message IPv6AssignMode {
+        bool six_plane = 1; // Whether 6plane is used for IPv6 assignment
+        bool rfc4193 = 2; // Whether RFC 4193 is used for IPv6 assignment
+        bool zt = 3; // Whether ZeroTier is used for IPv6 assignment
+    }
+
+    message Network {
+        string network_id = 1;
+        string capabilities = 2; // JSON string of capabilities
+        uint64 creation_time = 3; // Unix timestamp in milliseconds
+        bool enable_broadcast = 4; // Whether broadcast is enabled
+        repeated IPRange assignment_pools = 5; // List of IP ranges for assignment
+        uint32 mtu = 6; // Maximum Transmission Unit
+        uint32 multicast_limit = 7; // Limit for multicast messages
+        optional string name = 8; // Name of the network
+        bool is_private = 9; // Whether the network is private
+        uint32 remote_trace_level = 10; // Remote trace level
+        optional string remote_trace_target = 11; // Remote trace target
+        uint64 revision = 12; // Revision number
+        repeated Route routes = 13; // List of routes
+        string rules = 14; // JSON string of rules
+        optional string tags = 15; // JSON string of tags
+        IPV4AssignMode ipv4_assign_mode = 16; // IPv4 assignment mode
+        IPv6AssignMode ipv6_assign_mode = 17; // IPv6 assignment mode
+        optional DNS dns = 18; // DNS configuration
+        bool sso_enabled = 19; // Whether Single Sign-On is enabled
+        optional string sso_client_id = 20; // SSO client ID
+        optional string sso_authorization_endpoint = 21; // SSO authorization endpoint
+        optional string sso_issuer = 22; // SSO issuer
+        optional string sso_provider = 23; // SSO provider
+        string rules_source = 24; // source code for rules
+    }
+
+    optional Network old = 1;
+    optional Network new = 2;
+    optional NetworkChangeMetadata metadata = 3;
+}