Browse Source

wire up pubsub notifications from controller to frontend

Grant Limberg 1 ngày trước cách đây
mục cha
commit
012443acfa

+ 1 - 3
nonfree/controller/BigTableStatusWriter.cpp

@@ -25,12 +25,10 @@ const std::string lastSeenColumn = "last_seen";
 BigTableStatusWriter::BigTableStatusWriter(
 	const std::string& project_id,
 	const std::string& instance_id,
-	const std::string& table_id,
-	std::shared_ptr<PubSubWriter> pubsubWriter)
+	const std::string& table_id)
 	: _project_id(project_id)
 	, _instance_id(instance_id)
 	, _table_id(table_id)
-	, _pubsubWriter(pubsubWriter)
 	, _table(nullptr)
 {
 	_table = new cbt::Table(cbt::MakeDataConnection(), cbt::TableResource(_project_id, _instance_id, _table_id));

+ 1 - 6
nonfree/controller/BigTableStatusWriter.hpp

@@ -14,11 +14,7 @@ class PubSubWriter;
 
 class BigTableStatusWriter : public StatusWriter {
   public:
-	BigTableStatusWriter(
-		const std::string& project_id,
-		const std::string& instance_id,
-		const std::string& table_id,
-		std::shared_ptr<PubSubWriter> pubsubWriter);
+	BigTableStatusWriter(const std::string& project_id, const std::string& instance_id, const std::string& table_id);
 	virtual ~BigTableStatusWriter();
 
 	virtual void updateNodeStatus(
@@ -40,7 +36,6 @@ class BigTableStatusWriter : public StatusWriter {
 
 	mutable std::mutex _lock;
 	std::vector<PendingStatusEntry> _pending;
-	std::shared_ptr<PubSubWriter> _pubsubWriter;
 	google::cloud::bigtable::Table* _table;
 };
 

+ 2 - 0
nonfree/controller/CMakeLists.txt

@@ -42,6 +42,8 @@ if (ZT1_CENTRAL_CONTROLLER)
         CentralDB.cpp
         CentralDB.hpp
         ControllerConfig.hpp
+        ControllerChangeNotifier.cpp
+        ControllerChangeNotifier.hpp
         NotificationListener.hpp
         PostgreSQL.cpp
         PostgreSQL.hpp

+ 234 - 26
nonfree/controller/CentralDB.cpp

@@ -19,6 +19,7 @@
 #include "../../node/SHA512.hpp"
 #include "../../version.h"
 #include "BigTableStatusWriter.hpp"
+#include "ControllerChangeNotifier.hpp"
 #include "ControllerConfig.hpp"
 #include "CtlUtil.hpp"
 #include "EmbeddedNetworkController.hpp"
@@ -168,6 +169,7 @@ CentralDB::CentralDB(
 					std::make_shared<PubSubMemberListener>(_myAddressStr, cc->pubSubConfig->project_id, this);
 				_networksDbWatcher =
 					std::make_shared<PubSubNetworkListener>(_myAddressStr, cc->pubSubConfig->project_id, this);
+				_changeNotifier = std::make_shared<PubSubChangeNotifier>(_myAddressStr, cc->pubSubConfig->project_id);
 			}
 			else {
 				throw std::runtime_error(
@@ -209,12 +211,8 @@ CentralDB::CentralDB(
 					"CentralDB: BigTable status mode selected but no PubSub configuration provided");
 			}
 
-			pubsubWriter = std::make_shared<PubSubWriter>(
-				cc->pubSubConfig->project_id, "ctl-member-status-update-stream", _myAddressStr);
-
 			_statusWriter = std::make_shared<BigTableStatusWriter>(
-				cc->bigTableConfig->project_id, cc->bigTableConfig->instance_id, cc->bigTableConfig->table_id,
-				pubsubWriter);
+				cc->bigTableConfig->project_id, cc->bigTableConfig->instance_id, cc->bigTableConfig->table_id);
 			break;
 		case STATUS_WRITER_MODE_PGSQL:
 		default:
@@ -223,6 +221,7 @@ CentralDB::CentralDB(
 			break;
 	}
 
+	// start background threads
 	for (int i = 0; i < ZT_CENTRAL_CONTROLLER_COMMIT_THREADS; ++i) {
 		_commitThread[i] = std::thread(&CentralDB::commitThread, this);
 	}
@@ -935,7 +934,6 @@ void CentralDB::initializeMembers()
 			config["ssoExempt"] = sso_exempt.value_or(false);
 			config["authenticationExpiryTime"] = authentication_expiry_time.value_or(0);
 			config["tags"] = json::parse(tags.value_or("[]"));
-			config["ipAssignments"] = json::array();
 			config["frontend"] = std::get<17>(row);
 
 			Metrics::member_count++;
@@ -1147,8 +1145,12 @@ void CentralDB::commitThread()
 						target = config["remoteTraceTarget"];
 					}
 
-					pqxx::row nwrow = w.exec_params1("SELECT COUNT(id) FROM networks_ctl WHERE id = $1", networkId);
+					// get network and the frontend it is assigned to
+					// if network does not exist, skip member update
+					pqxx::row nwrow = w.exec_params1(
+						"SELECT COUNT(id), frontend FROM networks_ctl WHERE id = $1 GROUP BY frontend", networkId);
 					int nwcount = nwrow[0].as<int>();
+					std::string frontend = nwrow[1].as<std::string>();
 
 					if (nwcount != 1) {
 						fprintf(stderr, "network %s does not exist.  skipping member upsert\n", networkId.c_str());
@@ -1161,9 +1163,28 @@ void CentralDB::commitThread()
 						"SELECT COUNT(device_id) FROM network_memberships_ctl WHERE device_id = $1 AND network_id = $2",
 						memberId, networkId);
 					int membercount = mrow[0].as<int>();
-
 					bool isNewMember = (membercount == 0);
 
+					std::string change_source = config["change_source"];
+					if (! isNewMember && change_source != "controller" && frontend != change_source) {
+						// if it is not a new member and the change source is not the controller and doesn't match the
+						// frontend, don't apply the change.
+						continue;
+					}
+
+					if (_listenerMode == LISTENER_MODE_PUBSUB) {
+						// Publish change to pubsub stream
+
+						if (config["change_source"].is_null() || config["change_source"] == "controller") {
+							nlohmann::json oldMember;
+							nlohmann::json newMember = config;
+							if (! isNewMember) {
+								oldMember = _getNetworkMember(w, networkId, memberId);
+							}
+							_changeNotifier->notifyMemberChange(oldMember, newMember, frontend);
+						}
+					}
+
 					pqxx::result res = w.exec_params0(
 						"INSERT INTO network_memberships_ctl (device_id, network_id, authorized, active_bridge, "
 						"ip_assignments, "
@@ -1251,30 +1272,42 @@ void CentralDB::commitThread()
 
 					std::string id = config["id"];
 
+					pqxx::row nwrow = w.exec_params1(
+						"SELECT COUNT(id), frontend FROM networks_ctl WHERE id = $1 GROUP BY frontend", id);
+					int nwcount = nwrow[0].as<int>();
+					std::string frontend = nwrow[1].as<std::string>();
+					bool isNewNetwork = (nwcount == 0);
+
+					std::string change_source = config["change_source"];
+					if (! isNewNetwork && change_source != "controller" && frontend != change_source) {
+						// if it is not a new network and the change source is not the controller and doesn't match the
+						// frontend, don't apply the change.
+						continue;
+					}
+
+					if (_listenerMode == LISTENER_MODE_PUBSUB) {
+						// Publish change to pubsub stream
+						if (config["change_source"].is_null() || config["change_source"] == "controller") {
+							nlohmann::json oldNetwork;
+							nlohmann::json newNetwork = config;
+							if (! isNewNetwork) {
+								oldNetwork = _getNetwork(w, id);
+							}
+							_changeNotifier->notifyNetworkChange(oldNetwork, newNetwork, frontend);
+						}
+					}
+
 					pqxx::result res = w.exec_params0(
-						"INSERT INTO networks_ctl (id, name, configuration, controller_id, revision) "
-						"VALUES ($1, $2, $3, $4, $5) "
+						"INSERT INTO networks_ctl (id, name, configuration, controller_id, revision, frontend) "
+						"VALUES ($1, $2, $3, $4, $5, $6) "
 						"ON CONFLICT (id) DO UPDATE SET "
-						"name = EXCLUDED.name, configuration = EXCLUDED.configuration, revision = EXCLUDED.revision+1",
+						"name = EXCLUDED.name, configuration = EXCLUDED.configuration, revision = EXCLUDED.revision+1, "
+						"frontend = EXCLUDED.frontend",
 						id, OSUtils::jsonString(config["name"], ""), OSUtils::jsonDump(config, -1), _myAddressStr,
-						((uint64_t)config["revision"]));
+						((uint64_t)config["revision"]), change_source);
 
 					w.commit();
 
-					// res = w.exec_params0("DELETE FROM ztc_network_assignment_pool WHERE network_id = $1", 0);
-
-					// auto pool = config["ipAssignmentPools"];
-					// bool err = false;
-					// for (auto i = pool.begin(); i != pool.end(); ++i) {
-					// 	std::string start = (*i)["ipRangeStart"];
-					// 	std::string end = (*i)["ipRangeEnd"];
-
-					// 	res = w.exec_params0(
-					// 		"INSERT INTO ztc_network_assignment_pool (network_id, ip_range_start, ip_range_end) "
-					// 		"VALUES ($1, $2, $3)",
-					// 		id, start, end);
-					// }
-
 					const uint64_t nwidInt = OSUtils::jsonIntHex(config["nwid"], 0ULL);
 					if (nwidInt) {
 						nlohmann::json nwOrig;
@@ -1516,4 +1549,179 @@ void CentralDB::onlineNotificationThread()
 	}
 }
 
+nlohmann::json CentralDB::_getNetworkMember(pqxx::work& tx, const std::string networkID, const std::string memberID)
+{
+	nlohmann::json out;
+
+	try {
+		pqxx::row row = tx.exec_params1(
+			"SELECT nm.device_id, nm.network_id, nm.authorized, nm.active_bridge, nm.ip_assignments, "
+			"nm.no_auto_assign_ips, "
+			"nm.sso_exempt, (EXTRACT(EPOCH FROM nm.authentication_expiry_time AT TIME ZONE 'UTC')*1000)::bigint, "
+			"(EXTRACT(EPOCH FROM nm.creation_time AT TIME ZONE 'UTC')*1000)::bigint, nm.identity, "
+			"(EXTRACT(EPOCH FROM nm.last_authorized_time AT TIME ZONE 'UTC')*1000)::bigint, "
+			"(EXTRACT(EPOCH FROM nm.last_deauthorized_time AT TIME ZONE 'UTC')*1000)::bigint, "
+			"nm.remote_trace_level, nm.remote_trace_target, nm.revision, nm.capabilities, nm.tags, "
+			"nm.frontend "
+			"FROM network_memberships_ctl nm "
+			"INNER JOIN networks_ctl n "
+			"  ON nm.network_id = n.id "
+			"WHERE nm.network_id = $1 AND nm.device_id = $2",
+			networkID, memberID);
+
+		bool authorized = row[2].as<bool>();
+		std::optional<bool> active_bridge =
+			row[3].is_null() ? std::optional<bool>() : std::optional<bool>(row[3].as<bool>());
+		std::string ip_assignments = row[4].is_null() ? "{}" : row[4].as<std::string>();
+		std::optional<bool> no_auto_assign_ips =
+			row[5].is_null() ? std::optional<bool>() : std::optional<bool>(row[5].as<bool>());
+		std::optional<bool> sso_exempt =
+			row[6].is_null() ? std::optional<bool>() : std::optional<bool>(row[6].as<bool>());
+		std::optional<uint64_t> authentication_expiry_time =
+			row[7].is_null() ? std::optional<uint64_t>() : std::optional<uint64_t>(row[7].as<uint64_t>());
+		std::optional<uint64_t> creation_time =
+			row[8].is_null() ? std::optional<uint64_t>() : std::optional<uint64_t>(row[8].as<uint64_t>());
+		std::optional<std::string> identity =
+			row[9].is_null() ? std::optional<std::string>() : std::optional<std::string>(row[9].as<std::string>());
+		std::optional<uint64_t> last_authorized_time =
+			row[10].is_null() ? std::optional<uint64_t>() : std::optional<uint64_t>(row[10].as<uint64_t>());
+		std::optional<uint64_t> last_deauthorized_time =
+			row[11].is_null() ? std::optional<uint64_t>() : std::optional<uint64_t>(row[11].as<uint64_t>());
+		std::optional<int32_t> remote_trace_level =
+			row[12].is_null() ? std::optional<int32_t>() : std::optional<int32_t>(row[12].as<int32_t>());
+		std::optional<std::string> remote_trace_target =
+			row[13].is_null() ? std::optional<std::string>() : std::optional<std::string>(row[13].as<std::string>());
+		std::optional<uint64_t> revision =
+			row[14].is_null() ? std::optional<uint64_t>() : std::optional<uint64_t>(row[14].as<uint64_t>());
+		std::optional<std::string> capabilities =
+			row[15].is_null() ? std::optional<std::string>() : std::optional<std::string>(row[15].as<std::string>());
+		std::optional<std::string> tags =
+			row[16].is_null() ? std::optional<std::string>() : std::optional<std::string>(row[16].as<std::string>());
+		std::string frontend = row[17].is_null() ? "" : row[17].as<std::string>();
+
+		out["objtype"] = "member";
+		out["id"] = memberID;
+		out["nwid"] = networkID;
+		out["address"] = identity.value_or("");
+		out["authorized"] = authorized;
+		out["activeBridge"] = active_bridge.value_or(false);
+		out["ipAssignments"] = json::array();
+		if (ip_assignments != "{}" && ip_assignments != "[]") {
+			std::string tmp = ip_assignments.substr(1, ip_assignments.length() - 2);
+			std::vector<std::string> addrs = split(tmp, ',');
+			for (auto it = addrs.begin(); it != addrs.end(); ++it) {
+				out["ipAssignments"].push_back(*it);
+			}
+		}
+		out["capabilities"] = json::parse(capabilities.value_or("[]"));
+		out["creationTime"] = creation_time.value_or(0);
+		out["lastAuthorizedTime"] = last_authorized_time.value_or(0);
+		out["lastDeauthorizedTime"] = last_deauthorized_time.value_or(0);
+		out["noAutoAssignIps"] = no_auto_assign_ips.value_or(false);
+		out["remoteTraceLevel"] = remote_trace_level.value_or(0);
+		out["remoteTraceTarget"] = remote_trace_target.value_or(nullptr);
+		out["revision"] = revision.value_or(0);
+		out["ssoExempt"] = sso_exempt.value_or(false);
+		out["authenticationExpiryTime"] = authentication_expiry_time.value_or(0);
+		out["tags"] = json::parse(tags.value_or("[]"));
+		out["frontend"] = frontend;
+	}
+	catch (std::exception& e) {
+		fprintf(
+			stderr, "ERROR: Error getting network member %s-%s: %s\n", networkID.c_str(), memberID.c_str(), e.what());
+		return nlohmann::json();
+	}
+
+	return out;
+}
+
+nlohmann::json CentralDB::_getNetwork(pqxx::work& tx, const std::string networkID)
+{
+	nlohmann::json out;
+
+	try {
+		std::optional<std::string> name;
+		std::string cfg;
+		std::optional<uint64_t> creation_time;
+		std::optional<uint64_t> last_modified;
+		std::optional<uint64_t> revision;
+		std::string frontend;
+
+		pqxx::row row = tx.exec_params1(
+			"SELECT id, name, configuration , (EXTRACT(EPOCH FROM creation_time AT TIME ZONE 'UTC')*1000)::bigint, "
+			"(EXTRACT(EPOCH FROM last_modified AT TIME ZONE 'UTC')*1000)::bigint, revision, frontend "
+			"FROM networks_ctl WHERE id = $1",
+			networkID);
+
+		cfg = row[2].as<std::string>();
+		creation_time = row[3].is_null() ? std::optional<uint64_t>() : std::optional<uint64_t>(row[3].as<uint64_t>());
+		last_modified = row[4].is_null() ? std::optional<uint64_t>() : std::optional<uint64_t>(row[4].as<uint64_t>());
+		revision = row[5].is_null() ? std::optional<uint64_t>() : std::optional<uint64_t>(row[5].as<uint64_t>());
+		frontend = row[6].is_null() ? "" : row[6].as<std::string>();
+
+		nlohmann::json cfgtmp = nlohmann::json::parse(cfg);
+		if (! cfgtmp.is_object()) {
+			fprintf(stderr, "ERROR: Network %s configuration is not a JSON object\n", networkID.c_str());
+			return nlohmann::json();
+		}
+
+		out["objtype"] = "network";
+		out["id"] = row[0].as<std::string>();
+		out["name"] = row[1].is_null() ? "" : row[1].as<std::string>();
+		out["creationTime"] = creation_time.value_or(0);
+		out["lastModified"] = last_modified.value_or(0);
+		out["revision"] = revision.value_or(0);
+		out["capabilities"] = cfgtmp["capabilities"].is_array() ? cfgtmp["capabilities"] : json::array();
+		out["enableBroadcast"] = cfgtmp["enableBroadcast"].is_boolean() ? cfgtmp["enableBroadcast"].get<bool>() : false;
+		out["mtu"] = cfgtmp["mtu"].is_number() ? cfgtmp["mtu"].get<int32_t>() : 2800;
+		out["multicastLimit"] = cfgtmp["multicastLimit"].is_number() ? cfgtmp["multicastLimit"].get<int32_t>() : 64;
+		out["private"] = cfgtmp["private"].is_boolean() ? cfgtmp["private"].get<bool>() : true;
+		out["remoteTraceLevel"] =
+			cfgtmp["remoteTraceLevel"].is_number() ? cfgtmp["remoteTraceLevel"].get<int32_t>() : 0;
+		out["remoteTraceTarget"] =
+			cfgtmp["remoteTraceTarget"].is_string() ? cfgtmp["remoteTraceTarget"].get<std::string>() : "";
+		out["revision"] = revision.value_or(0);
+		out["rules"] = cfgtmp["rules"].is_array() ? cfgtmp["rules"] : json::array();
+		out["tags"] = cfgtmp["tags"].is_array() ? cfgtmp["tags"] : json::array();
+		if (cfgtmp["v4AssignMode"].is_object()) {
+			out["v4AssignMode"] = cfgtmp["v4AssignMode"];
+		}
+		else {
+			out["v4AssignMode"] = json::object();
+			out["v4AssignMode"]["zt"] = true;
+		}
+		if (cfgtmp["v6AssignMode"].is_object()) {
+			out["v6AssignMode"] = cfgtmp["v6AssignMode"];
+		}
+		else {
+			out["v6AssignMode"] = json::object();
+			out["v6AssignMode"]["zt"] = true;
+			out["v6AssignMode"]["6plane"] = true;
+			out["v6AssignMode"]["rfc4193"] = false;
+		}
+		out["ssoEnabled"] = cfgtmp["ssoEnabled"].is_boolean() ? cfgtmp["ssoEnabled"].get<bool>() : false;
+		out["objtype"] = "network";
+		out["routes"] = cfgtmp["routes"].is_array() ? cfgtmp["routes"] : json::array();
+		out["clientId"] = cfgtmp["clientId"].is_string() ? cfgtmp["clientId"].get<std::string>() : "";
+		out["authorizationEndpoint"] =
+			cfgtmp["authorizationEndpoint"].is_string() ? cfgtmp["authorizationEndpoint"].get<std::string>() : nullptr;
+		out["provider"] = cfgtmp["ssoProvider"].is_string() ? cfgtmp["ssoProvider"].get<std::string>() : "";
+		if (! cfgtmp["dns"].is_object()) {
+			cfgtmp["dns"] = json::object();
+			cfgtmp["dns"]["domain"] = "";
+			cfgtmp["dns"]["servers"] = json::array();
+		}
+		else {
+			out["dns"] = cfgtmp["dns"];
+		}
+		out["ipAssignmentPools"] = cfgtmp["ipAssignmentPools"].is_array() ? cfgtmp["ipAssignmentPools"] : json::array();
+		out["frontend"] = row[6].as<std::string>();
+	}
+	catch (std::exception& e) {
+		fprintf(stderr, "ERROR: Error getting network %s: %s\n", networkID.c_str(), e.what());
+		return nlohmann::json();
+	}
+	return out;
+}
+
 #endif	 // ZT_CONTROLLER_USE_LIBPQ

+ 7 - 0
nonfree/controller/CentralDB.hpp

@@ -23,6 +23,7 @@ struct SmeeClient;
 namespace ZeroTier {
 struct RedisConfig;
 struct ControllerConfig;
+struct ControllerChangeNotifier;
 
 class CentralDB : public DB {
   public:
@@ -94,6 +95,11 @@ class CentralDB : public DB {
 	void configureSmee();
 	void notifyNewMember(const std::string& networkID, const std::string& memberID);
 
+	nlohmann::json _getNetworkMember(pqxx::work& tx, const std::string networkID, const std::string memberID);
+
+	nlohmann::json _getNetwork(pqxx::work& tx, const std::string networkID);
+
+  private:
 	enum OverrideMode { ALLOW_PGBOUNCER_OVERRIDE = 0, NO_OVERRIDE = 1 };
 
 	ListenerMode _listenerMode;
@@ -112,6 +118,7 @@ class CentralDB : public DB {
 	std::shared_ptr<NotificationListener> _membersDbWatcher;
 	std::shared_ptr<NotificationListener> _networksDbWatcher;
 	std::shared_ptr<StatusWriter> _statusWriter;
+	std::shared_ptr<ControllerChangeNotifier> _changeNotifier;
 	std::thread _commitThread[ZT_CENTRAL_CONTROLLER_COMMIT_THREADS];
 	std::thread _onlineNotificationThread;
 

+ 52 - 0
nonfree/controller/ControllerChangeNotifier.cpp

@@ -0,0 +1,52 @@
+#include "ControllerChangeNotifier.hpp"
+
+#include "PubSubWriter.hpp"
+
+namespace ZeroTier {
+
+PubSubChangeNotifier::PubSubChangeNotifier(std::string controllerID, std::string project)
+	: ControllerChangeNotifier()
+	, _cv1networkChangeWriter(std::make_shared<PubSubWriter>(project, "ctl-to-cv1-network-change-stream", controllerID))
+	, _cv1memberChangeWriter(std::make_shared<PubSubWriter>(project, "ctl-to-cv1-member-change-stream", controllerID))
+	, _cv2networkChangeWriter(std::make_shared<PubSubWriter>(project, "ctl-to-cv2-network-change-stream", controllerID))
+	, _cv2memberChangeWriter(std::make_shared<PubSubWriter>(project, "ctl-to-cv2-member-change-stream", controllerID))
+{
+}
+
+PubSubChangeNotifier::~PubSubChangeNotifier()
+{
+}
+
+void PubSubChangeNotifier::notifyNetworkChange(
+	const nlohmann::json& oldNetwork,
+	const nlohmann::json& newNetwork,
+	const std::string& frontend)
+{
+	if (frontend == "cv1") {
+		_cv1networkChangeWriter->publishNetworkChange(oldNetwork, newNetwork);
+	}
+	else if (frontend == "cv2") {
+		_cv2networkChangeWriter->publishNetworkChange(oldNetwork, newNetwork);
+	}
+	else {
+		throw std::runtime_error("Unknown frontend: " + frontend);
+	}
+}
+
+void PubSubChangeNotifier::notifyMemberChange(
+	const nlohmann::json& oldMember,
+	const nlohmann::json newMember,
+	const std::string& frontend)
+{
+	if (frontend == "cv1") {
+		_cv1memberChangeWriter->publishMemberChange(oldMember, newMember);
+	}
+	else if (frontend == "cv2") {
+		_cv2memberChangeWriter->publishMemberChange(oldMember, newMember);
+	}
+	else {
+		throw std::runtime_error("Unknown frontend: " + frontend);
+	}
+}
+
+}	// namespace ZeroTier

+ 52 - 0
nonfree/controller/ControllerChangeNotifier.hpp

@@ -0,0 +1,52 @@
+#ifndef CONTROLLERCHANGENOTIFIER_HPP
+#define CONTROLLERCHANGENOTIFIER_HPP
+
+#include <memory>
+#include <nlohmann/json.hpp>
+#include <string>
+
+namespace ZeroTier {
+
+class PubSubWriter;
+
+class ControllerChangeNotifier {
+  public:
+	virtual ~ControllerChangeNotifier() = default;
+
+	virtual void notifyNetworkChange(
+		const nlohmann::json& oldNetwork,
+		const nlohmann::json& newNetwork,
+		const std::string& frontend = "") = 0;
+
+	virtual void notifyMemberChange(
+		const nlohmann::json& oldMember,
+		const nlohmann::json newMember,
+		const std::string& frontend = "") = 0;
+};
+
+class PubSubChangeNotifier : public ControllerChangeNotifier {
+  public:
+	PubSubChangeNotifier(std::string controllerID, std::string project);
+	virtual ~PubSubChangeNotifier();
+
+	virtual void notifyNetworkChange(
+		const nlohmann::json& oldNetwork,
+		const nlohmann::json& newNetwork,
+		const std::string& frontend = "") override;
+
+	virtual void notifyMemberChange(
+		const nlohmann::json& oldMember,
+		const nlohmann::json newMember,
+		const std::string& frontend = "") override;
+
+  private:
+	std::shared_ptr<PubSubWriter> _cv1networkChangeWriter;
+	std::shared_ptr<PubSubWriter> _cv1memberChangeWriter;
+
+	std::shared_ptr<PubSubWriter> _cv2networkChangeWriter;
+	std::shared_ptr<PubSubWriter> _cv2memberChangeWriter;
+};
+
+}	// namespace ZeroTier
+
+#endif	 // CONTROLLERCHANGENOTIFIER_HPP

+ 1 - 1
nonfree/controller/PubSubListener.cpp

@@ -420,7 +420,7 @@ nlohmann::json toJson(const pbmessages::MemberChange_Member& mc, pbmessages::Mem
 	}
 
 	out["noAutoAssignIps"] = mc.no_auto_assign_ips();
-	out["ssoExempt"] = mc.sso_exepmt();
+	out["ssoExempt"] = mc.sso_exempt();
 	out["authenticationExpiryTime"] = mc.auth_expiry_time();
 	out["capabilities"] = OSUtils::jsonParse(mc.capabilities());
 	out["creationTime"] = mc.creation_time();

+ 166 - 13
nonfree/controller/PubSubWriter.cpp

@@ -1,5 +1,6 @@
 #include "PubSubWriter.hpp"
 
+#include "../../osdep/OSUtils.hpp"
 #include "CtlUtil.hpp"
 #include "member.pb.h"
 #include "member_status.pb.h"
@@ -15,6 +16,12 @@
 namespace pubsub = ::google::cloud::pubsub;
 
 namespace ZeroTier {
+
+pbmessages::NetworkChange
+networkChangeFromJson(std::string controllerID, const nlohmann::json& oldNetwork, const nlohmann::json& newNetwork);
+pbmessages::MemberChange
+memberChangeFromJson(std::string controllerID, const nlohmann::json& oldMember, const nlohmann::json& newMember);
+
 PubSubWriter::PubSubWriter(std::string project, std::string topic, std::string controller_id)
 	: _controller_id(controller_id)
 	, _project(project)
@@ -44,12 +51,9 @@ PubSubWriter::~PubSubWriter()
 {
 }
 
-bool PubSubWriter::publishMessage(const std::string& payload, const std::string& frontend)
+bool PubSubWriter::publishMessage(const std::string& payload)
 {
 	std::vector<std::pair<std::string, std::string> > attributes;
-	if (! frontend.empty()) {
-		attributes.emplace_back("frontend", frontend);
-	}
 	attributes.emplace_back("controller_id", _controller_id);
 
 	auto msg = pubsub::MessageBuilder {}.SetData(payload).SetAttributes(attributes).Build();
@@ -63,30 +67,28 @@ bool PubSubWriter::publishMessage(const std::string& payload, const std::string&
 	return true;
 }
 
-bool PubSubWriter::publishNetworkChange(const nlohmann::json& networkJson, const std::string& frontend)
+bool PubSubWriter::publishNetworkChange(const nlohmann::json& oldNetwork, const nlohmann::json& newNetwork)
 {
-	pbmessages::NetworkChange nc;
-	// nc.mutable_new_()->CopyFrom(fromJson<pbmessages::NetworkChange_Network>(networkJson));
+	pbmessages::NetworkChange nc = networkChangeFromJson(_controller_id, oldNetwork, newNetwork);
 	std::string payload;
 	if (! nc.SerializeToString(&payload)) {
 		fprintf(stderr, "Failed to serialize NetworkChange protobuf message\n");
 		return false;
 	}
 
-	return publishMessage(payload, frontend);
+	return publishMessage(payload);
 }
 
-bool PubSubWriter::publishMemberChange(const nlohmann::json& memberJson, const std::string& frontend)
+bool PubSubWriter::publishMemberChange(const nlohmann::json& oldMember, const nlohmann::json& newMember)
 {
-	pbmessages::MemberChange mc;
-	// mc.mutable_new_()->CopyFrom(fromJson<pbmessages::MemberChange_Member>(memberJson));
+	pbmessages::MemberChange mc = memberChangeFromJson(_controller_id, oldMember, newMember);
 	std::string payload;
 	if (! mc.SerializeToString(&payload)) {
 		fprintf(stderr, "Failed to serialize MemberChange protobuf message\n");
 		return false;
 	}
 
-	return publishMessage(payload, frontend);
+	return publishMessage(payload);
 }
 
 bool PubSubWriter::publishStatusChange(
@@ -122,7 +124,158 @@ bool PubSubWriter::publishStatusChange(
 		return false;
 	}
 
-	return publishMessage(payload, frontend);
+	return publishMessage(payload);
+}
+
+pbmessages::NetworkChange_Network* networkFromJson(const nlohmann::json& j)
+{
+	if (! j.is_object()) {
+		return nullptr;
+	}
+
+	pbmessages::NetworkChange_Network* n = new pbmessages::NetworkChange_Network();
+	n->set_network_id(j.value("id", ""));
+	n->set_name(j.value("name", ""));
+	n->set_capabilities(OSUtils::jsonDump(j.value("capabilities", "[]"), -1));
+	n->set_creation_time(j.value("creationTime", 0));
+	n->set_enable_broadcast(j.value("enableBroadcast", false));
+
+	for (const auto& p : j["ipAssignmentPools"]) {
+		if (p.is_object()) {
+			auto pool = n->add_assignment_pools();
+			pool->set_start_ip(p.value("ipRangeStart", ""));
+			pool->set_end_ip(p.value("ipRangeEnd", ""));
+		}
+	}
+
+	n->set_mtu(j.value("mtu", 2800));
+	n->set_multicast_limit(j.value("multicastLimit", 32));
+	n->set_is_private(j.value("private", true));
+	n->set_remote_trace_level(j.value("remoteTraceLevel", 0));
+	n->set_remote_trace_target(j.value("remoteTraceTarget", ""));
+	n->set_revision(j.value("revision", 0));
+
+	for (const auto& p : j["routes"]) {
+		if (p.is_object()) {
+			auto r = n->add_routes();
+			r->set_target(p.value("target", ""));
+			r->set_via(p.value("via", ""));
+		}
+	}
+
+	n->set_rules("");
+	n->set_tags(OSUtils::jsonDump(j.value("tags", "[]"), -1));
+
+	pbmessages::NetworkChange_IPV4AssignMode* v4am = new pbmessages::NetworkChange_IPV4AssignMode();
+	if (j["v4AssignMode"].is_object()) {
+		v4am->set_zt(j["v4AssignMode"].value("zt", false));
+	}
+	n->set_allocated_ipv4_assign_mode(v4am);
+
+	pbmessages::NetworkChange_IPV6AssignMode* v6am = new pbmessages::NetworkChange_IPV6AssignMode();
+	if (j["v6AssignMode"].is_object()) {
+		v6am->set_zt(j["v6AssignMode"].value("zt", false));
+		v6am->set_six_plane(j["v6AssignMode"].value("6plane", false));
+		v6am->set_rfc4193(j["v6AssignMode"].value("rfc4193", false));
+	}
+	n->set_allocated_ipv6_assign_mode(v6am);
+
+	nlohmann::json jdns = j.value("dns", nullptr);
+	if (jdns.is_object()) {
+		pbmessages::NetworkChange_DNS* dns = new pbmessages::NetworkChange_DNS();
+		dns->set_domain(jdns.value("domain", ""));
+		for (const auto& s : jdns["servers"]) {
+			if (s.is_string()) {
+				auto server = dns->add_nameservers();
+				*server = s;
+			}
+		}
+		n->set_allocated_dns(dns);
+	}
+
+	n->set_sso_enabled(j.value("ssoEnabled", false));
+	if (j.value("ssoEnabled", false)) {
+		n->set_sso_provider(j.value("provider", ""));
+		n->set_sso_client_id(j.value("clientId", ""));
+		n->set_sso_authorization_endpoint(j.value("authorizationEndpoint", ""));
+		n->set_sso_issuer(j.value("issuer", ""));
+		n->set_sso_provider(j.value("provider", ""));
+	}
+
+	n->set_rules_source("");
+
+	return n;
+}
+
+pbmessages::NetworkChange
+networkChangeFromJson(std::string controllerID, const nlohmann::json& oldNetwork, const nlohmann::json& newNetwork)
+{
+	pbmessages::NetworkChange nc;
+	nc.set_allocated_old(networkFromJson(oldNetwork));
+	nc.set_allocated_new_(networkFromJson(newNetwork));
+	nc.set_change_source(pbmessages::NetworkChange_ChangeSource::NetworkChange_ChangeSource_CONTROLLER);
+
+	pbmessages::NetworkChange_NetworkChangeMetadata* metadata = new pbmessages::NetworkChange_NetworkChangeMetadata();
+	metadata->set_controller_id(controllerID);
+	metadata->set_trace_id("");	  // TODO: generate a trace ID
+	nc.set_allocated_metadata(metadata);
+
+	return nc;
+}
+
+pbmessages::MemberChange_Member* memberFromJson(const nlohmann::json& j)
+{
+	if (! j.is_object()) {
+		return nullptr;
+	}
+
+	pbmessages::MemberChange_Member* m = new pbmessages::MemberChange_Member();
+	m->set_network_id(j.value("networkId", ""));
+	m->set_device_id(j.value("id", ""));
+	m->set_identity(j.value("identity", ""));
+	m->set_authorized(j.value("authorized", false));
+	for (const auto& addr : j.value("ipAssignments", nlohmann::json::array())) {
+		if (addr.is_string()) {
+			auto a = m->add_ip_assignments();
+			*a = addr;
+		}
+	}
+	m->set_active_bridge(j.value("activeBridge", false));
+	m->set_tags(OSUtils::jsonDump(j.value("tags", "[]"), -1));
+	m->set_capabilities(OSUtils::jsonDump(j.value("capabilities", "[]"), -1));
+	m->set_creation_time(j.value("creationTime", 0));
+	m->set_no_auto_assign_ips(j.value("noAutoAssignIps", false));
+	m->set_revision(j.value("revision", 0));
+	m->set_last_authorized_time(j.value("lastAuthorizedTime", 0));
+	m->set_last_deauthorized_time(j.value("lastDeauthorizedTime", 0));
+	m->set_last_authorized_credential_type(j.value("lastAuthorizedCredentialType", nullptr));
+	m->set_last_authorized_credential(j.value("lastAuthorizedCredential", nullptr));
+	m->set_version_major(j.value("versionMajor", 0));
+	m->set_version_minor(j.value("versionMinor", 0));
+	m->set_version_rev(j.value("versionRev", 0));
+	m->set_version_protocol(j.value("versionProtocol", 0));
+	m->set_remote_trace_level(j.value("remoteTraceLevel", 0));
+	m->set_remote_trace_target(j.value("remoteTraceTarget", ""));
+	m->set_sso_exempt(j.value("ssoExempt", false));
+	m->set_auth_expiry_time(j.value("authExpiryTime", 0));
+
+	return m;
+}
+
+pbmessages::MemberChange
+memberChangeFromJson(std::string controllerID, const nlohmann::json& oldMember, const nlohmann::json& newMember)
+{
+	pbmessages::MemberChange mc;
+	mc.set_allocated_old(memberFromJson(oldMember));
+	mc.set_allocated_new_(memberFromJson(newMember));
+	mc.set_change_source(pbmessages::MemberChange_ChangeSource::MemberChange_ChangeSource_CONTROLLER);
+
+	pbmessages::MemberChange_MemberChangeMetadata* metadata = new pbmessages::MemberChange_MemberChangeMetadata();
+	metadata->set_controller_id(controllerID);
+	metadata->set_trace_id("");	  // TODO: generate a trace ID
+	mc.set_allocated_metadata(metadata);
+
+	return mc;
 }
 
 }	// namespace ZeroTier

+ 5 - 3
nonfree/controller/PubSubWriter.hpp

@@ -13,8 +13,10 @@ class PubSubWriter {
 	PubSubWriter(std::string project, std::string topic, std::string controller_id);
 	virtual ~PubSubWriter();
 
-	bool publishNetworkChange(const nlohmann::json& networkJson, const std::string& frontend = "");
-	bool publishMemberChange(const nlohmann::json& memberJson, const std::string& frontend = "");
+	bool publishNetworkChange(const nlohmann::json& oldNetwork, const nlohmann::json& newNetwork);
+
+	bool publishMemberChange(const nlohmann::json& oldMember, const nlohmann::json& newMember);
+
 	bool publishStatusChange(
 		std::string frontend,
 		std::string network_id,
@@ -25,7 +27,7 @@ class PubSubWriter {
 		int64_t last_seen);
 
   protected:
-	bool publishMessage(const std::string& payload, const std::string& frontend = "");
+	bool publishMessage(const std::string& payload);
 
   private:
 	std::string _controller_id;

+ 1 - 1
nonfree/controller/protobuf/member.proto

@@ -25,7 +25,7 @@ message MemberChange {
         int32 version_protocol = 19; // Protocol version of the member
         int32 remote_trace_level = 20; // Remote trace level
         optional string remote_trace_target = 21; // Remote trace target
-        bool sso_exepmt = 22; // Whether SSO is exempt
+        bool sso_exempt = 22; // Whether SSO is exempt
         uint64 auth_expiry_time = 23; // Authorization expiry time in milliseconds
     }
     message MemberChangeMetadata {

+ 2 - 2
nonfree/controller/protobuf/network.proto

@@ -27,7 +27,7 @@ message NetworkChange {
         bool zt = 1; // Whether ZeroTier is used for IPv4 assignment
     }
 
-    message IPv6AssignMode {
+    message IPV6AssignMode {
         bool six_plane = 1; // Whether 6plane is used for IPv6 assignment
         bool rfc4193 = 2; // Whether RFC 4193 is used for IPv6 assignment
         bool zt = 3; // Whether ZeroTier is used for IPv6 assignment
@@ -50,7 +50,7 @@ message NetworkChange {
         string rules = 14; // JSON string of rules
         optional string tags = 15; // JSON string of tags
         IPV4AssignMode ipv4_assign_mode = 16; // IPv4 assignment mode
-        IPv6AssignMode ipv6_assign_mode = 17; // IPv6 assignment mode
+        IPV6AssignMode ipv6_assign_mode = 17; // IPv6 assignment mode
         optional DNS dns = 18; // DNS configuration
         bool sso_enabled = 19; // Whether Single Sign-On is enabled
         optional string sso_client_id = 20; // SSO client ID