Ver código fonte

wire up pubsub outgoing status changes from controller -> CV2

Grant Limberg 5 dias atrás
pai
commit
024824c2fe

+ 22 - 6
nonfree/controller/BigTableStatusWriter.cpp

@@ -1,10 +1,12 @@
 #include "BigTableStatusWriter.hpp"
 
 #include "ControllerConfig.hpp"
+#include "PubSubWriter.hpp"
 
 #include <google/cloud/bigtable/mutations.h>
 #include <google/cloud/bigtable/row.h>
 #include <google/cloud/bigtable/table.h>
+#include <opentelemetry/trace/provider.h>
 
 namespace cbt = google::cloud::bigtable;
 
@@ -23,10 +25,12 @@ const std::string lastSeenColumn = "last_seen";
 BigTableStatusWriter::BigTableStatusWriter(
 	const std::string& project_id,
 	const std::string& instance_id,
-	const std::string& table_id)
+	const std::string& table_id,
+	std::shared_ptr<PubSubWriter> pubsubWriter)
 	: _project_id(project_id)
 	, _instance_id(instance_id)
 	, _table_id(table_id)
+	, _pubsubWriter(pubsubWriter)
 {
 }
 
@@ -42,13 +46,16 @@ void BigTableStatusWriter::updateNodeStatus(
 	const std::string& arch,
 	const std::string& version,
 	const InetAddress& address,
-	int64_t last_seen)
+	int64_t last_seen,
+	const std::string& frontend)
 {
+	auto provider = opentelemetry::trace::Provider::GetTracerProvider();
+	auto tracer = provider->GetTracer("BigTableStatusWriter");
+	auto span = tracer->StartSpan("BigTableStatusWriter::updateNodeStatus");
+	auto scope = tracer->WithActiveSpan(span);
+
 	std::lock_guard<std::mutex> l(_lock);
-	_pending.push_back({ network_id, node_id, os, arch, version, address, last_seen });
-	if (_pending.size() >= 100) {
-		writePending();
-	}
+	_pending.push_back({ network_id, node_id, os, arch, version, address, last_seen, frontend });
 }
 
 size_t BigTableStatusWriter::queueLength() const
@@ -59,6 +66,11 @@ size_t BigTableStatusWriter::queueLength() const
 
 void BigTableStatusWriter::writePending()
 {
+	auto provider = opentelemetry::trace::Provider::GetTracerProvider();
+	auto tracer = provider->GetTracer("BigTableStatusWriter");
+	auto span = tracer->StartSpan("BigTableStatusWriter::writePending");
+	auto scope = tracer->WithActiveSpan(span);
+
 	std::vector<PendingStatusEntry> toWrite;
 	{
 		std::lock_guard<std::mutex> l(_lock);
@@ -88,6 +100,10 @@ void BigTableStatusWriter::writePending()
 		int64_t ts = entry.last_seen;
 		m.emplace_back(cbt::SetCell(checkInColumnFamily, lastSeenColumn, std::move(ts)));
 		bulk.push_back(std::move(m));
+
+		// TODO: Check performance on this.  May need to bach these.
+		_pubsubWriter->publishStatusChange(
+			entry.target, entry.network_id, entry.node_id, entry.os, entry.arch, entry.version, entry.last_seen);
 	}
 
 	std::vector<cbt::FailedMutation> failures = table.BulkApply(bulk);

+ 11 - 2
nonfree/controller/BigTableStatusWriter.hpp

@@ -3,14 +3,21 @@
 
 #include "StatusWriter.hpp"
 
+#include <memory>
 #include <mutex>
 #include <string>
 
 namespace ZeroTier {
 
+class PubSubWriter;
+
 class BigTableStatusWriter : public StatusWriter {
   public:
-	BigTableStatusWriter(const std::string& project_id, const std::string& instance_id, const std::string& table_id);
+	BigTableStatusWriter(
+		const std::string& project_id,
+		const std::string& instance_id,
+		const std::string& table_id,
+		std::shared_ptr<PubSubWriter> pubsubWriter);
 	virtual ~BigTableStatusWriter();
 
 	virtual void updateNodeStatus(
@@ -20,7 +27,8 @@ class BigTableStatusWriter : public StatusWriter {
 		const std::string& arch,
 		const std::string& version,
 		const InetAddress& address,
-		int64_t last_seen) override;
+		int64_t last_seen,
+		const std::string& frontend) override;
 	virtual size_t queueLength() const override;
 	virtual void writePending() override;
 
@@ -31,6 +39,7 @@ class BigTableStatusWriter : public StatusWriter {
 
 	mutable std::mutex _lock;
 	std::vector<PendingStatusEntry> _pending;
+	std::shared_ptr<PubSubWriter> _pubsubWriter;
 };
 
 }	// namespace ZeroTier

+ 2 - 0
nonfree/controller/CMakeLists.txt

@@ -47,6 +47,8 @@ if (ZT1_CENTRAL_CONTROLLER)
         PostgreSQL.hpp
         PubSubListener.cpp
         PubSubListener.hpp
+        PubSubWriter.cpp
+        PubSubWriter.hpp
         Redis.hpp
         RedisListener.cpp
         RedisListener.hpp

+ 21 - 4
nonfree/controller/CentralDB.cpp

@@ -24,6 +24,7 @@
 #include "EmbeddedNetworkController.hpp"
 #include "PostgresStatusWriter.hpp"
 #include "PubSubListener.hpp"
+#include "PubSubWriter.hpp"
 #include "Redis.hpp"
 #include "RedisListener.hpp"
 #include "RedisStatusWriter.hpp"
@@ -178,6 +179,7 @@ CentralDB::CentralDB(
 			break;
 	}
 
+	std::shared_ptr<PubSubWriter> pubsubWriter;
 	switch (statusMode) {
 		case STATUS_WRITER_MODE_REDIS:
 			if (_cc->redisConfig != NULL) {
@@ -193,8 +195,21 @@ CentralDB::CentralDB(
 			}
 			break;
 		case STATUS_WRITER_MODE_BIGTABLE:
+			if (cc->bigTableConfig == NULL) {
+				throw std::runtime_error(
+					"CentralDB: BigTable status mode selected but no BigTable configuration provided");
+			}
+			if (cc->pubSubConfig == NULL) {
+				throw std::runtime_error(
+					"CentralDB: BigTable status mode selected but no PubSub configuration provided");
+			}
+
+			pubsubWriter = std::make_shared<PubSubWriter>(
+				cc->pubSubConfig->project_id, "ctl-member-status-update-stream", _myAddressStr);
+
 			_statusWriter = std::make_shared<BigTableStatusWriter>(
-				cc->bigTableConfig->project_id, cc->bigTableConfig->instance_id, cc->bigTableConfig->table_id);
+				cc->bigTableConfig->project_id, cc->bigTableConfig->instance_id, cc->bigTableConfig->table_id,
+				pubsubWriter);
 			break;
 		case STATUS_WRITER_MODE_PGSQL:
 		default:
@@ -1439,9 +1454,9 @@ void CentralDB::onlineNotificationThread()
 				char ipTmp[64];
 				OSUtils::ztsnprintf(nwidTmp, sizeof(nwidTmp), "%.16llx", nwid_i);
 				OSUtils::ztsnprintf(memTmp, sizeof(memTmp), "%.10llx", i->first.second);
-				nlohmann::json jtmp1, jtmp2;
+				nlohmann::json network, member;
 
-				if (! get(nwid_i, jtmp1, i->first.second, jtmp2)) {
+				if (! get(nwid_i, network, i->first.second, member)) {
 					continue;	// skip non existent networks/members
 				}
 
@@ -1469,12 +1484,14 @@ void CentralDB::onlineNotificationThread()
 				std::vector<std::string> osArchSplit = split(osArch, '/');
 				std::string os = "unknown";
 				std::string arch = "unknown";
+				std::string frontend = member["frontend"].get<std::string>();
 				if (osArchSplit.size() == 2) {
 					os = osArchSplit[0];
 					arch = osArchSplit[1];
 				}
 
-				_statusWriter->updateNodeStatus(networkId, memberId, os, arch, "", i->second.physicalAddress, ts);
+				_statusWriter->updateNodeStatus(
+					networkId, memberId, os, arch, "", i->second.physicalAddress, ts, frontend);
 			}
 			_statusWriter->writePending();
 			w.commit();

+ 80 - 1
nonfree/controller/CtlUtil.cpp

@@ -9,6 +9,23 @@
 #include <iomanip>
 #include <sstream>
 
+#ifdef ZT1_CENTRAL_CONTROLLER
+#include <google/cloud/bigtable/admin/bigtable_table_admin_client.h>
+#include <google/cloud/bigtable/admin/bigtable_table_admin_connection.h>
+#include <google/cloud/bigtable/table.h>
+#include <google/cloud/pubsub/admin/subscription_admin_client.h>
+#include <google/cloud/pubsub/admin/subscription_admin_connection.h>
+#include <google/cloud/pubsub/admin/topic_admin_client.h>
+#include <google/cloud/pubsub/message.h>
+#include <google/cloud/pubsub/subscriber.h>
+#include <google/cloud/pubsub/subscription.h>
+#include <google/cloud/pubsub/topic.h>
+
+namespace pubsub = ::google::cloud::pubsub;
+namespace pubsub_admin = ::google::cloud::pubsub_admin;
+namespace bigtable_admin = ::google::cloud::bigtable_admin;
+#endif
+
 namespace ZeroTier {
 
 const char* _timestr()
@@ -63,6 +80,68 @@ std::string url_encode(const std::string& value)
 	return escaped.str();
 }
 
-}	// namespace ZeroTier
+#ifdef ZT1_CENTRAL_CONTROLLER
+void create_gcp_pubsub_topic_if_needed(std::string project_id, std::string topic_id)
+{
+	// This is a no-op if the topic already exists.
+	auto topicAdminClient = pubsub_admin::TopicAdminClient(pubsub_admin::MakeTopicAdminConnection());
+	auto topicName = pubsub::Topic(project_id, topic_id).FullName();
+	auto topicResult = topicAdminClient.GetTopic(topicName);
+	if (! topicResult.ok()) {
+		// Only create if not found
+		if (topicResult.status().code() == google::cloud::StatusCode::kNotFound) {
+			auto createResult = topicAdminClient.CreateTopic(topicName);
+			if (! createResult.ok()) {
+				fprintf(stderr, "Failed to create topic: %s\n", createResult.status().message().c_str());
+				throw std::runtime_error("Failed to create topic");
+			}
+			fprintf(stderr, "Created topic: %s\n", topicName.c_str());
+		}
+		else {
+			fprintf(stderr, "Failed to get topic: %s\n", topicResult.status().message().c_str());
+			throw std::runtime_error("Failed to get topic");
+		}
+	}
+}
+
+// void create_bigtable_table(std::string project_id, std::string instance_id)
+// {
+// 	auto bigtableAdminClient =
+// 		bigtable_admin::BigtableTableAdminClient(bigtable_admin::MakeBigtableTableAdminConnection());
+
+// 	std::string table_id = "member_status";
+// 	std::string table_name = "projects/" + project_id + "/instances/" + instance_id + "/tables/" + table_id;
+
+// 	// Check if the table exists
+// 	auto table = bigtableAdminClient.GetTable(table_name);
+// 	if (! table.ok()) {
+// 		if (table.status().code() == google::cloud::StatusCode::kNotFound) {
+// 			google::bigtable::admin::v2::Table table_config;
+// 			table_config.set_name(table_id);
+// 			auto families = table_config.mutable_column_families();
+// 			// Define column families
+// 			// Column family "node_info" with max 1 version
+// 			// google::bigtable::admin::v2::ColumnFamily* node_info = table_config.add_column_families();
+// 			// Column family "check_in" with max 1 version
 
+// 			auto create_result = bigtableAdminClient.CreateTable(
+// 				"projects/" + project_id + "/instances/" + instance_id, table_id, table_config);
+
+// 			if (! create_result.ok()) {
+// 				fprintf(
+// 					stderr, "Failed to create Bigtable table member_status: %s\n",
+// 					create_result.status().message().c_str());
+// 				throw std::runtime_error("Failed to create Bigtable table");
+// 			}
+// 			fprintf(stderr, "Created Bigtable table: member_status\n");
+// 		}
+// 		else {
+// 			fprintf(stderr, "Failed to get Bigtable table member_status: %s\n", table.status().message().c_str());
+// 			throw std::runtime_error("Failed to get Bigtable table");
+// 		}
+// 	}
+// }
+#endif
+
+}	// namespace ZeroTier
 #endif

+ 5 - 0
nonfree/controller/CtlUtil.hpp

@@ -15,6 +15,11 @@ const char* _timestr();
 std::vector<std::string> split(std::string str, char delim);
 
 std::string url_encode(const std::string& value);
+
+#ifdef ZT1_CENTRAL_CONTROLLER
+void create_gcp_pubsub_topic_if_needed(std::string project_id, std::string topic_id);
+#endif
+
 }	// namespace ZeroTier
 
 #endif	 // namespace ZeroTier

+ 3 - 2
nonfree/controller/PostgresStatusWriter.cpp

@@ -23,10 +23,11 @@ void PostgresStatusWriter::updateNodeStatus(
 	const std::string& arch,
 	const std::string& version,
 	const InetAddress& address,
-	int64_t last_seen)
+	int64_t last_seen,
+	const std::string& /* frontend unused */)
 {
 	std::lock_guard<std::mutex> l(_lock);
-	_pending.push_back({ network_id, node_id, os, arch, version, address, last_seen });
+	_pending.push_back({ network_id, node_id, os, arch, version, address, last_seen, "" });
 }
 
 size_t PostgresStatusWriter::queueLength() const

+ 2 - 1
nonfree/controller/PostgresStatusWriter.hpp

@@ -23,7 +23,8 @@ class PostgresStatusWriter : public StatusWriter {
 		const std::string& arch,
 		const std::string& version,
 		const InetAddress& address,
-		int64_t last_seen) override;
+		int64_t last_seen,
+		const std::string& /* frontend unused */) override;
 	virtual size_t queueLength() const override;
 	virtual void writePending() override;
 

+ 41 - 29
nonfree/controller/PubSubListener.cpp

@@ -2,6 +2,7 @@
 #include "PubSubListener.hpp"
 
 #include "ControllerConfig.hpp"
+#include "CtlUtil.hpp"
 #include "DB.hpp"
 #include "member.pb.h"
 #include "network.pb.h"
@@ -22,8 +23,8 @@ namespace pubsub_admin = ::google::cloud::pubsub_admin;
 
 namespace ZeroTier {
 
-nlohmann::json toJson(const pbmessages::NetworkChange_Network& nc);
-nlohmann::json toJson(const pbmessages::MemberChange_Member& mc);
+nlohmann::json toJson(const pbmessages::NetworkChange_Network& nc, pbmessages::NetworkChange_ChangeSource source);
+nlohmann::json toJson(const pbmessages::MemberChange_Member& mc, pbmessages::MemberChange_ChangeSource source);
 
 PubSubListener::PubSubListener(std::string controller_id, std::string project, std::string topic)
 	: _controller_id(controller_id)
@@ -36,27 +37,10 @@ PubSubListener::PubSubListener(std::string controller_id, std::string project, s
 {
 	GOOGLE_PROTOBUF_VERIFY_VERSION;
 
-	// Create Topic if it doesn't exist
-	// this is only really needed for testing with the emulator
-	// in production the topic should be created via terraform or gcloud
-	// before starting the controller
-	auto topicAdminClient = pubsub_admin::TopicAdminClient(pubsub_admin::MakeTopicAdminConnection());
-	auto topicName = pubsub::Topic(project, topic).FullName();
-	auto topicResult = topicAdminClient.GetTopic(topicName);
-	if (! topicResult.ok()) {
-		// Only create if not found
-		if (topicResult.status().code() == google::cloud::StatusCode::kNotFound) {
-			auto createResult = topicAdminClient.CreateTopic(topicName);
-			if (! createResult.ok()) {
-				fprintf(stderr, "Failed to create topic: %s\n", createResult.status().message().c_str());
-				throw std::runtime_error("Failed to create topic");
-			}
-			fprintf(stderr, "Created topic: %s\n", topicName.c_str());
-		}
-		else {
-			fprintf(stderr, "Failed to get topic: %s\n", topicResult.status().message().c_str());
-			throw std::runtime_error("Failed to get topic");
-		}
+	// If PUBSUB_EMULATOR_HOST is set, create the topic if it doesn't exist
+	const char* emulatorHost = std::getenv("PUBSUB_EMULATOR_HOST");
+	if (emulatorHost != nullptr) {
+		create_gcp_pubsub_topic_if_needed(project, topic);
 	}
 
 	google::pubsub::v1::Subscription request;
@@ -160,11 +144,11 @@ void PubSubNetworkListener::onNotification(const std::string& payload)
 		nlohmann::json oldConfig, newConfig;
 
 		if (nc.has_old()) {
-			oldConfig = toJson(nc.old());
+			oldConfig = toJson(nc.old(), nc.change_source());
 		}
 
 		if (nc.has_new_()) {
-			newConfig = toJson(nc.new_());
+			newConfig = toJson(nc.new_(), nc.change_source());
 		}
 
 		if (oldConfig.is_object() && newConfig.is_object()) {
@@ -239,11 +223,11 @@ void PubSubMemberListener::onNotification(const std::string& payload)
 		nlohmann::json oldConfig, newConfig;
 
 		if (mc.has_old()) {
-			oldConfig = toJson(mc.old());
+			oldConfig = toJson(mc.old(), mc.change_source());
 		}
 
 		if (mc.has_new_()) {
-			newConfig = toJson(mc.new_());
+			newConfig = toJson(mc.new_(), mc.change_source());
 		}
 
 		if (oldConfig.is_object() && newConfig.is_object()) {
@@ -293,7 +277,7 @@ void PubSubMemberListener::onNotification(const std::string& payload)
 	}
 }
 
-nlohmann::json toJson(const pbmessages::NetworkChange_Network& nc)
+nlohmann::json toJson(const pbmessages::NetworkChange_Network& nc, pbmessages::NetworkChange_ChangeSource source)
 {
 	nlohmann::json out;
 
@@ -386,11 +370,25 @@ nlohmann::json toJson(const pbmessages::NetworkChange_Network& nc)
 		}
 	}
 	out["ssoConfig"] = sso;
+	switch (source) {
+		case pbmessages::NetworkChange_ChangeSource_CV1:
+			out["change_source"] = "cv1";
+			break;
+		case pbmessages::NetworkChange_ChangeSource_CV2:
+			out["change_source"] = "cv2";
+			break;
+		case pbmessages::NetworkChange_ChangeSource_CONTROLLER:
+			out["change_source"] = "controller";
+			break;
+		default:
+			out["change_source"] = "unknown";
+			break;
+	}
 
 	return out;
 }
 
-nlohmann::json toJson(const pbmessages::MemberChange_Member& mc)
+nlohmann::json toJson(const pbmessages::MemberChange_Member& mc, pbmessages::MemberChange_ChangeSource source)
 {
 	nlohmann::json out;
 	out["id"] = mc.device_id();
@@ -428,6 +426,20 @@ nlohmann::json toJson(const pbmessages::MemberChange_Member& mc)
 	out["versionMinor"] = mc.version_minor();
 	out["versionRev"] = mc.version_rev();
 	out["versionProtocol"] = mc.version_protocol();
+	switch (source) {
+		case pbmessages::MemberChange_ChangeSource_CV1:
+			out["change_source"] = "cv1";
+			break;
+		case pbmessages::MemberChange_ChangeSource_CV2:
+			out["change_source"] = "cv2";
+			break;
+		case pbmessages::MemberChange_ChangeSource_CONTROLLER:
+			out["change_source"] = "controller";
+			break;
+		default:
+			out["change_source"] = "unknown";
+			break;
+	}
 
 	return out;
 }

+ 125 - 0
nonfree/controller/PubSubWriter.cpp

@@ -0,0 +1,125 @@
+#include "PubSubWriter.hpp"
+
+#include "CtlUtil.hpp"
+#include "member.pb.h"
+#include "member_status.pb.h"
+#include "network.pb.h"
+
+#include <chrono>
+#include <google/cloud/options.h>
+#include <google/cloud/pubsub/message.h>
+#include <google/cloud/pubsub/publisher.h>
+#include <google/cloud/pubsub/topic.h>
+#include <opentelemetry/trace/provider.h>
+
+namespace pubsub = ::google::cloud::pubsub;
+
+namespace ZeroTier {
+PubSubWriter::PubSubWriter(std::string controller_id, std::string project, std::string topic)
+	: _controller_id(controller_id)
+	, _project(project)
+	, _topic(topic)
+{
+	GOOGLE_PROTOBUF_VERIFY_VERSION;
+
+	// If PUBSUB_EMULATOR_HOST is set, create the topic if it doesn't exist
+	const char* emulatorHost = std::getenv("PUBSUB_EMULATOR_HOST");
+	if (emulatorHost != nullptr) {
+		create_gcp_pubsub_topic_if_needed(project, topic);
+	}
+
+	auto options =
+		::google::cloud::Options {}
+			.set<pubsub::RetryPolicyOption>(pubsub::LimitedTimeRetryPolicy(std::chrono::seconds(5)).clone())
+			.set<pubsub::BackoffPolicyOption>(
+				pubsub::ExponentialBackoffPolicy(std::chrono::milliseconds(100), std::chrono::seconds(2), 1.3).clone());
+	auto publisher = pubsub::MakePublisherConnection(pubsub::Topic(project, topic), std::move(options));
+	_publisher = std::make_shared<pubsub::Publisher>(std::move(publisher));
+}
+
+PubSubWriter::~PubSubWriter()
+{
+}
+
+bool PubSubWriter::publishMessage(const std::string& payload, const std::string& frontend)
+{
+	std::vector<std::pair<std::string, std::string> > attributes;
+	if (! frontend.empty()) {
+		attributes.emplace_back("frontend", frontend);
+	}
+	attributes.emplace_back("controller_id", _controller_id);
+
+	auto msg = pubsub::MessageBuilder {}.SetData(payload).SetAttributes(attributes).Build();
+	auto message_id = _publisher->Publish(std::move(msg)).get();
+	if (! message_id) {
+		fprintf(stderr, "Failed to publish message: %s\n", std::move(message_id).status().message().c_str());
+		return false;
+	}
+
+	fprintf(stderr, "Published message to %s\n", _topic.c_str());
+	return true;
+}
+
+bool PubSubWriter::publishNetworkChange(const nlohmann::json& networkJson, const std::string& frontend)
+{
+	pbmessages::NetworkChange nc;
+	// nc.mutable_new_()->CopyFrom(fromJson<pbmessages::NetworkChange_Network>(networkJson));
+	std::string payload;
+	if (! nc.SerializeToString(&payload)) {
+		fprintf(stderr, "Failed to serialize NetworkChange protobuf message\n");
+		return false;
+	}
+
+	return publishMessage(payload, frontend);
+}
+
+bool PubSubWriter::publishMemberChange(const nlohmann::json& memberJson, const std::string& frontend)
+{
+	pbmessages::MemberChange mc;
+	// mc.mutable_new_()->CopyFrom(fromJson<pbmessages::MemberChange_Member>(memberJson));
+	std::string payload;
+	if (! mc.SerializeToString(&payload)) {
+		fprintf(stderr, "Failed to serialize MemberChange protobuf message\n");
+		return false;
+	}
+
+	return publishMessage(payload, frontend);
+}
+
+bool PubSubWriter::publishStatusChange(
+	std::string frontend,
+	std::string network_id,
+	std::string node_id,
+	std::string os,
+	std::string arch,
+	std::string version,
+	int64_t last_seen)
+{
+	auto provider = opentelemetry::trace::Provider::GetTracerProvider();
+	auto tracer = provider->GetTracer("PubSubWriter");
+	auto span = tracer->StartSpan("PubSubWriter::publishStatusChange");
+	auto scope = tracer->WithActiveSpan(span);
+
+	pbmessages::MemberStatus_MemberStatusMetadata metadata;
+	metadata.set_controller_id(_controller_id);
+	metadata.set_trace_id("");	 // TODO: generate a trace ID
+
+	pbmessages::MemberStatus ms;
+	ms.set_network_id(network_id);
+	ms.set_member_id(node_id);
+	ms.set_os(os);
+	ms.set_arch(arch);
+	ms.set_version(version);
+	ms.set_timestamp(last_seen);
+	ms.set_allocated_metadata(&metadata);
+
+	std::string payload;
+	if (! ms.SerializeToString(&payload)) {
+		fprintf(stderr, "Failed to serialize StatusChange protobuf message\n");
+		return false;
+	}
+
+	return publishMessage(payload, frontend);
+}
+
+}	// namespace ZeroTier

+ 39 - 0
nonfree/controller/PubSubWriter.hpp

@@ -0,0 +1,39 @@
+#ifndef ZT_CONTROLLER_PUBSUBWRITER_HPP
+#define ZT_CONTROLLER_PUBSUBWRITER_HPP
+
+#include <google/cloud/pubsub/publisher.h>
+#include <memory>
+#include <nlohmann/json.hpp>
+#include <string>
+
+namespace ZeroTier {
+
+class PubSubWriter {
+  public:
+	PubSubWriter(std::string controller_id, std::string project, std::string topic);
+	virtual ~PubSubWriter();
+
+	bool publishNetworkChange(const nlohmann::json& networkJson, const std::string& frontend = "");
+	bool publishMemberChange(const nlohmann::json& memberJson, const std::string& frontend = "");
+	bool publishStatusChange(
+		std::string frontend,
+		std::string network_id,
+		std::string node_id,
+		std::string os,
+		std::string arch,
+		std::string version,
+		int64_t last_seen);
+
+  protected:
+	bool publishMessage(const std::string& payload, const std::string& frontend = "");
+
+  private:
+	std::string _controller_id;
+	std::string _project;
+	std::string _topic;
+	std::shared_ptr<google::cloud::pubsub::Publisher> _publisher;
+};
+
+}	// namespace ZeroTier
+
+#endif	 // ZT_CONTROLLER_PUBSUBWRITER_HPP

+ 3 - 2
nonfree/controller/RedisStatusWriter.cpp

@@ -32,10 +32,11 @@ void RedisStatusWriter::updateNodeStatus(
 	const std::string& arch,
 	const std::string& version,
 	const InetAddress& address,
-	int64_t last_seen)
+	int64_t last_seen,
+	const std::string& /* frontend unused */)
 {
 	std::lock_guard<std::mutex> l(_lock);
-	_pending.push_back({ network_id, node_id, os, arch, version, address, last_seen });
+	_pending.push_back({ network_id, node_id, os, arch, version, address, last_seen, "" });
 }
 
 size_t RedisStatusWriter::queueLength() const

+ 2 - 1
nonfree/controller/RedisStatusWriter.hpp

@@ -23,7 +23,8 @@ class RedisStatusWriter : public StatusWriter {
 		const std::string& arch,
 		const std::string& version,
 		const InetAddress& address,
-		int64_t last_seen) override;
+		int64_t last_seen,
+		const std::string& /* frontend unused */) override;
 	virtual size_t queueLength() const override;
 	virtual void writePending() override;
 

+ 3 - 1
nonfree/controller/StatusWriter.hpp

@@ -23,7 +23,8 @@ class StatusWriter {
 		const std::string& arch,
 		const std::string& version,
 		const InetAddress& address,
-		int64_t last_seen) = 0;
+		int64_t last_seen,
+		const std::string& target) = 0;
 	virtual size_t queueLength() const = 0;
 	virtual void writePending() = 0;
 };
@@ -36,6 +37,7 @@ struct PendingStatusEntry {
 	std::string version;
 	InetAddress address;
 	int64_t last_seen;
+	std::string target;
 };
 
 }	// namespace ZeroTier