Bläddra i källkod

add a StatusWriter class hierarchy for writing member status updates

* Postgres Direct
* Redis
* BigTable
Grant Limberg 2 veckor sedan
förälder
incheckning
85f23356a3

+ 98 - 0
controller/BigTableStatusWriter.cpp

@@ -0,0 +1,98 @@
+#include "BigTableStatusWriter.hpp"
+
+#include <google/cloud/bigtable/mutations.h>
+#include <google/cloud/bigtable/row.h>
+#include <google/cloud/bigtable/table.h>
+
+namespace cbt = google::cloud::bigtable;
+
+namespace ZeroTier {
+
+const std::string nodeInfoColumnFamily = "node_info";
+const std::string checkInColumnFamily = "check_in";
+
+const std::string osColumn = "os";
+const std::string archColumn = "arch";
+const std::string versionColumn = "version";
+const std::string ipv4Column = "ipv4";
+const std::string ipv6Column = "ipv6";
+const std::string lastSeenColumn = "last_seen";
+
+BigTableStatusWriter::BigTableStatusWriter(
+	const std::string& project_id,
+	const std::string& instance_id,
+	const std::string& table_id)
+	: _project_id(project_id)
+	, _instance_id(instance_id)
+	, _table_id(table_id)
+{
+}
+
+BigTableStatusWriter::~BigTableStatusWriter()
+{
+	writePending();
+}
+
+void BigTableStatusWriter::updateNodeStatus(
+	const std::string& network_id,
+	const std::string& node_id,
+	const std::string& os,
+	const std::string& arch,
+	const std::string& version,
+	const InetAddress& address,
+	int64_t last_seen)
+{
+	std::lock_guard<std::mutex> l(_lock);
+	_pending.push_back({ network_id, node_id, os, arch, version, address, last_seen });
+	if (_pending.size() >= 100) {
+		writePending();
+	}
+}
+
+size_t BigTableStatusWriter::queueLength() const
+{
+	std::lock_guard<std::mutex> l(_lock);
+	return _pending.size();
+}
+
+void BigTableStatusWriter::writePending()
+{
+	std::vector<PendingStatusEntry> toWrite;
+	{
+		std::lock_guard<std::mutex> l(_lock);
+		toWrite.swap(_pending);
+	}
+	if (toWrite.empty()) {
+		return;
+	}
+
+	namespace cbt = google::cloud::bigtable;
+	cbt::Table table(cbt::MakeDataConnection(), cbt::TableResource(_project_id, _instance_id, _table_id));
+
+	cbt::BulkMutation bulk;
+	for (const auto& entry : toWrite) {
+		std::string row_key = entry.network_id + "#" + entry.node_id;
+		cbt::SingleRowMutation m(row_key);
+		m.emplace_back(cbt::SetCell(nodeInfoColumnFamily, osColumn, entry.os));
+		m.emplace_back(cbt::SetCell(nodeInfoColumnFamily, archColumn, entry.arch));
+		m.emplace_back(cbt::SetCell(nodeInfoColumnFamily, versionColumn, entry.version));
+		char buf[64] = { 0 };
+		if (entry.address.ss_family == AF_INET) {
+			m.emplace_back(cbt::SetCell(checkInColumnFamily, ipv4Column, entry.address.toString(buf)));
+		}
+		else if (entry.address.ss_family == AF_INET6) {
+			m.emplace_back(cbt::SetCell(checkInColumnFamily, ipv6Column, entry.address.toString(buf)));
+		}
+		int64_t ts = entry.last_seen;
+		m.emplace_back(cbt::SetCell(checkInColumnFamily, lastSeenColumn, std::move(ts)));
+		bulk.push_back(std::move(m));
+	}
+
+	std::vector<cbt::FailedMutation> failures = table.BulkApply(bulk);
+	for (auto const& r : failures) {
+		// Handle error (log it, retry, etc.)
+		std::cerr << "Error writing to BigTable: " << r.status() << "\n";
+	}
+}
+
+}	// namespace ZeroTier

+ 37 - 0
controller/BigTableStatusWriter.hpp

@@ -0,0 +1,37 @@
+#ifndef BIGTABLESTATUSWRITER_HPP
+#define BIGTABLESTATUSWRITER_HPP
+
+#include "StatusWriter.hpp"
+
+#include <mutex>
+#include <string>
+
+namespace ZeroTier {
+class BigTableStatusWriter : public StatusWriter {
+  public:
+	BigTableStatusWriter(const std::string& project_id, const std::string& instance_id, const std::string& table_id);
+	virtual ~BigTableStatusWriter();
+
+	virtual void updateNodeStatus(
+		const std::string& network_id,
+		const std::string& node_id,
+		const std::string& os,
+		const std::string& arch,
+		const std::string& version,
+		const InetAddress& address,
+		int64_t last_seen) override;
+	virtual size_t queueLength() const override;
+	virtual void writePending() override;
+
+  private:
+	const std::string _project_id;
+	const std::string _instance_id;
+	const std::string _table_id;
+
+	mutable std::mutex _lock;
+	std::vector<PendingStatusEntry> _pending;
+};
+
+}	// namespace ZeroTier
+
+#endif

+ 13 - 1
controller/CMakeLists.txt

@@ -49,18 +49,30 @@ if (ZT1_CENTRAL_CONTROLLER)
         PubSubListener.hpp
         Redis.hpp
         RedisListener.cpp
-        RedisListener.hpp)
+        RedisListener.hpp
+        StatusWriter.cpp
+        StatusWriter.hpp
+        BigTableStatusWriter.cpp
+        BigTableStatusWriter.hpp
+        PostgresStatusWriter.cpp
+        PostgresStatusWriter.hpp
+        RedisStatusWriter.cpp
+        RedisStatusWriter.hpp
+    )
 
     list(APPEND INCLUDE_DIRS
         ${PostgreSQL_INCLUDE_DIRS}
         "${redis++_BUILD_DIR}/src"
         ${pqxx_INCLUDE_DIRS}
+
     )
 
     list(APPEND LINK_LIBS
         redis++::redis++_static
         pqxx
         ${PostgreSQL_LIBRARIES}
+        google-cloud-cpp::bigtable
+        google-cloud-cpp::pubsub
     )
 endif()
 

+ 99 - 0
controller/PostgresStatusWriter.cpp

@@ -0,0 +1,99 @@
+#include "PostgresStatusWriter.hpp"
+
+#include "../node/Metrics.hpp"
+
+#include <nlohmann/json.hpp>
+#include <pqxx/pqxx>
+
+namespace ZeroTier {
+
+PostgresStatusWriter::PostgresStatusWriter(std::shared_ptr<ConnectionPool<PostgresConnection> > pool) : _pool(pool)
+{
+}
+
+PostgresStatusWriter::~PostgresStatusWriter()
+{
+	writePending();
+}
+
+void PostgresStatusWriter::updateNodeStatus(
+	const std::string& network_id,
+	const std::string& node_id,
+	const std::string& os,
+	const std::string& arch,
+	const std::string& version,
+	const InetAddress& address,
+	int64_t last_seen)
+{
+	std::lock_guard<std::mutex> l(_lock);
+	_pending.push_back({ network_id, node_id, os, arch, version, address, last_seen });
+}
+
+size_t PostgresStatusWriter::queueLength() const
+{
+	std::lock_guard<std::mutex> l(_lock);
+	return _pending.size();
+}
+
+void PostgresStatusWriter::writePending()
+{
+	std::vector<PendingStatusEntry> toWrite;
+	{
+		std::lock_guard<std::mutex> l(_lock);
+		toWrite.swap(_pending);
+	}
+	if (toWrite.empty()) {
+		return;
+	}
+
+	try {
+		auto conn = _pool->borrow();
+		pqxx::work w(*conn->c);
+
+		pqxx::pipeline pipe(w);
+		for (const auto& entry : toWrite) {
+			char iptmp[64] = { 0 };
+			nlohmann::json record = {
+				{ entry.address.toIpString(iptmp), entry.last_seen },
+			};
+
+			try {
+				// check if the member exists first.
+				//
+				// exec_params1 will throw pqxx::unexpected_rows if not exactly one row is returned.  If that's the
+				// case, skip this record and move on.
+				w.exec_params1(
+					"SELECT device_id, network_id FROM network_memberships_ctl WHERE network_id = $1 AND device_id = "
+					"$2",
+					entry.network_id, entry.node_id);
+			}
+			catch (pqxx::unexpected_rows& e) {
+				continue;
+			}
+
+			std::string insert_statement =
+				"INSERT INTO network_memberships_ctl (device_id, network_id, last_seen, os, arch) "
+				"VALUES ('"
+				+ w.esc(entry.node_id) + "', '" + w.esc(entry.network_id) + "', '" + w.esc(record.dump())
+				+ "'::JSONB, "
+				  "'"
+				+ w.esc(entry.os) + "', '" + w.esc(entry.arch)
+				+ "') "
+				  "ON CONFLICT (device_id, network_id) DO UPDATE SET os = EXCLUDED.os, arch = EXCLUDED.arch, "
+				  "last_seen = network_memberships_ctl.last_seen || EXCLUDED.last_seen";
+
+			pipe.insert(insert_statement);
+			Metrics::pgsql_node_checkin++;
+		}
+
+		pipe.complete();
+		w.commit();
+		_pool->unborrow(conn);
+	}
+	catch (const std::exception& e) {
+		// Log the error
+		fprintf(stderr, "Error writing to Postgres: %s\n", e.what());
+	}
+}
+
+}	// namespace ZeroTier

+ 39 - 0
controller/PostgresStatusWriter.hpp

@@ -0,0 +1,39 @@
+#ifndef POSTGRES_STATUS_WRITER_HPP
+#define POSTGRES_STATUS_WRITER_HPP
+
+#include "PostgreSQL.hpp"
+#include "StatusWriter.hpp"
+
+#include <memory>
+#include <mutex>
+#include <string>
+#include <vector>
+
+namespace ZeroTier {
+
+class PostgresStatusWriter : public StatusWriter {
+  public:
+	PostgresStatusWriter(std::shared_ptr<ConnectionPool<PostgresConnection> > pool);
+	virtual ~PostgresStatusWriter();
+
+	virtual void updateNodeStatus(
+		const std::string& network_id,
+		const std::string& node_id,
+		const std::string& os,
+		const std::string& arch,
+		const std::string& version,
+		const InetAddress& address,
+		int64_t last_seen) override;
+	virtual size_t queueLength() const override;
+	virtual void writePending() override;
+
+  private:
+	std::shared_ptr<ConnectionPool<PostgresConnection> > _pool;
+
+	mutable std::mutex _lock;
+	std::vector<PendingStatusEntry> _pending;
+};
+
+}	// namespace ZeroTier
+
+#endif	 // POSTGRES_STATUS_WRITER_HPP

+ 122 - 0
controller/RedisStatusWriter.cpp

@@ -0,0 +1,122 @@
+#include "RedisStatusWriter.hpp"
+
+#include "../node/Metrics.hpp"
+#include "../osdep/OSUtils.hpp"
+
+#include <nlohmann/json.hpp>
+#include <set>
+
+namespace ZeroTier {
+
+RedisStatusWriter::RedisStatusWriter(std::shared_ptr<sw::redis::Redis> redis, std::string controller_id)
+	: _redis(redis)
+	, _mode(REDIS_MODE_STANDALONE)
+{
+}
+
+RedisStatusWriter::RedisStatusWriter(std::shared_ptr<sw::redis::RedisCluster> cluster, std::string controller_id)
+	: _cluster(cluster)
+	, _mode(REDIS_MODE_CLUSTER)
+{
+}
+
+RedisStatusWriter::~RedisStatusWriter()
+{
+	writePending();
+}
+
+void RedisStatusWriter::updateNodeStatus(
+	const std::string& network_id,
+	const std::string& node_id,
+	const std::string& os,
+	const std::string& arch,
+	const std::string& version,
+	const InetAddress& address,
+	int64_t last_seen)
+{
+	std::lock_guard<std::mutex> l(_lock);
+	_pending.push_back({ network_id, node_id, os, arch, version, address, last_seen });
+}
+
+size_t RedisStatusWriter::queueLength() const
+{
+	std::lock_guard<std::mutex> l(_lock);
+	return _pending.size();
+}
+
+void RedisStatusWriter::writePending()
+{
+	try {
+		if (_mode == REDIS_MODE_STANDALONE) {
+			auto tx = _redis->transaction(true, false);
+			_doWritePending(tx);
+		}
+		else if (_mode == REDIS_MODE_CLUSTER) {
+			auto tx = _cluster->transaction(_controller_id, true, false);
+			_doWritePending(tx);
+		}
+	}
+	catch (const sw::redis::Error& e) {
+		// Log the error
+		fprintf(stderr, "Error writing to Redis: %s\n", e.what());
+	}
+}
+
+void RedisStatusWriter::_doWritePending(sw::redis::Transaction& tx)
+{
+	std::vector<PendingStatusEntry> toWrite;
+	{
+		std::lock_guard<std::mutex> l(_lock);
+		toWrite.swap(_pending);
+	}
+	if (toWrite.empty()) {
+		return;
+	}
+
+	std::set<std::string> networksUpdated;
+	uint64_t updateCount = 0;
+	for (const auto& entry : _pending) {
+		char iptmp[64] = { 0 };
+		std::string ipAddr = entry.address.toIpString(iptmp);
+		std::unordered_map<std::string, std::string> record = {
+			{ "id", entry.node_id }, { "address", ipAddr },	 { "last_updated", std::to_string(entry.last_seen) },
+			{ "os", entry.os },		 { "arch", entry.arch }, { "version", entry.version }
+		};
+
+		tx.zadd("nodes-online:{" + _controller_id + "}", entry.node_id, entry.last_seen)
+			.zadd("nodes-online2:{" + _controller_id + "}", entry.network_id + "-" + entry.node_id, entry.last_seen)
+			.zadd("network-nodes-online:{" + _controller_id + "}:" + entry.network_id, entry.node_id, entry.last_seen)
+			.zadd("active-networks:{" + _controller_id + "}", entry.network_id, entry.last_seen)
+			.sadd("network-nodes-all:{" + _controller_id + "}:" + entry.network_id, entry.node_id)
+			.hmset(
+				"member:{" + _controller_id + "}:" + entry.network_id + ":" + entry.node_id, record.begin(),
+				record.end());
+		networksUpdated.insert(entry.network_id);
+		++updateCount;
+		Metrics::redis_node_checkin++;
+	}
+
+	// expire records from all-nodes and network-nodes member list
+	uint64_t expireOld = OSUtils::now() - 300000;
+
+	tx.zremrangebyscore(
+		"nodes-online:{" + _controller_id + "}",
+		sw::redis::RightBoundedInterval<double>(expireOld, sw::redis::BoundType::LEFT_OPEN));
+	tx.zremrangebyscore(
+		"nodes-online2:{" + _controller_id + "}",
+		sw::redis::RightBoundedInterval<double>(expireOld, sw::redis::BoundType::LEFT_OPEN));
+	tx.zremrangebyscore(
+		"active-networks:{" + _controller_id + "}",
+		sw::redis::RightBoundedInterval<double>(expireOld, sw::redis::BoundType::LEFT_OPEN));
+
+	for (const auto& nwid : networksUpdated) {
+		tx.zremrangebyscore(
+			"network-nodes-online:{" + _controller_id + "}:" + nwid,
+			sw::redis::RightBoundedInterval<double>(expireOld, sw::redis::BoundType::LEFT_OPEN));
+	}
+
+	fprintf(stderr, "%s: Updated online status of %d members\n", _controller_id.c_str(), updateCount);
+	tx.exec();
+}
+
+}	// namespace ZeroTier

+ 46 - 0
controller/RedisStatusWriter.hpp

@@ -0,0 +1,46 @@
+#ifndef REDIS_STATUS_WRITER_HPP
+#define REDIS_STATUS_WRITER_HPP
+
+#include "Redis.hpp"
+#include "StatusWriter.hpp"
+
+#include <memory>
+#include <mutex>
+#include <sw/redis++/redis++.h>
+
+namespace ZeroTier {
+
+class RedisStatusWriter : public StatusWriter {
+  public:
+	RedisStatusWriter(std::shared_ptr<sw::redis::Redis> redis, std::string controller_id);
+	RedisStatusWriter(std::shared_ptr<sw::redis::RedisCluster> cluster, std::string controller_id);
+	virtual ~RedisStatusWriter();
+
+	virtual void updateNodeStatus(
+		const std::string& network_id,
+		const std::string& node_id,
+		const std::string& os,
+		const std::string& arch,
+		const std::string& version,
+		const InetAddress& address,
+		int64_t last_seen) override;
+	virtual size_t queueLength() const override;
+	virtual void writePending() override;
+
+  private:
+	void _doWritePending(sw::redis::Transaction& tx);
+
+	std::string _controller_id;
+
+	enum RedisMode { REDIS_MODE_STANDALONE, REDIS_MODE_CLUSTER };
+	std::shared_ptr<sw::redis::Redis> _redis;
+	std::shared_ptr<sw::redis::RedisCluster> _cluster;
+	RedisMode _mode = REDIS_MODE_STANDALONE;
+
+	mutable std::mutex _lock;
+	std::vector<PendingStatusEntry> _pending;
+};
+
+}	// namespace ZeroTier
+
+#endif	 // REDIS_STATUS_WRITER_HPP

+ 1 - 0
controller/StatusWriter.cpp

@@ -0,0 +1 @@
+#include "StatusWriter.hpp"

+ 43 - 0
controller/StatusWriter.hpp

@@ -0,0 +1,43 @@
+#ifndef STATUS_WRITER_HPP
+#define STATUS_WRITER_HPP
+
+#include "../node/InetAddress.hpp"
+
+#include <string>
+
+namespace ZeroTier {
+
+/**
+ * Abstract interface for writing status information somewhere.
+ *
+ * Implementations might write to a database, a file, or something else.
+ */
+class StatusWriter {
+  public:
+	virtual ~StatusWriter() = 0;
+
+	virtual void updateNodeStatus(
+		const std::string& network_id,
+		const std::string& node_id,
+		const std::string& os,
+		const std::string& arch,
+		const std::string& version,
+		const InetAddress& address,
+		int64_t last_seen) = 0;
+	virtual size_t queueLength() const = 0;
+	virtual void writePending() = 0;
+};
+
+struct PendingStatusEntry {
+	std::string network_id;
+	std::string node_id;
+	std::string os;
+	std::string arch;
+	std::string version;
+	InetAddress address;
+	int64_t last_seen;
+};
+
+}	// namespace ZeroTier
+
+#endif	 // STATUS_WRITER_HPP