Grant Limberg 5 месяцев назад
Родитель
Сommit
3f19712178
8 измененных файлов с 73 добавлено и 78 удалено
  1. 3 3
      controller/CV1.cpp
  2. 3 0
      controller/CV1.hpp
  3. 2 2
      controller/CV2.cpp
  4. 3 0
      controller/CV2.hpp
  5. 0 4
      controller/DB.hpp
  6. 0 60
      controller/PostgreSQL.cpp
  7. 62 8
      controller/PostgreSQL.hpp
  8. 0 1
      objects.mk

+ 3 - 3
controller/CV1.cpp

@@ -1209,7 +1209,7 @@ void CV1::_membersWatcher_Postgres()
 	std::string stream = "member_" + _myAddressStr;
 	std::string stream = "member_" + _myAddressStr;
 
 
 	fprintf(stderr, "Listening to member stream: %s\n", stream.c_str());
 	fprintf(stderr, "Listening to member stream: %s\n", stream.c_str());
-	MemberNotificationReceiver m(this, *c->c, stream);
+	MemberNotificationReceiver<CV1> m(this, *c->c, stream);
 
 
 	while (_run == 1) {
 	while (_run == 1) {
 		c->c->await_notification(5, 0);
 		c->c->await_notification(5, 0);
@@ -1316,7 +1316,7 @@ void CV1::_networksWatcher_Postgres()
 
 
 	auto c = _pool->borrow();
 	auto c = _pool->borrow();
 
 
-	NetworkNotificationReceiver n(this, *c->c, stream);
+	NetworkNotificationReceiver<CV1> n(this, *c->c, stream);
 
 
 	while (_run == 1) {
 	while (_run == 1) {
 		auto provider = opentelemetry::trace::Provider::GetTracerProvider();
 		auto provider = opentelemetry::trace::Provider::GetTracerProvider();
@@ -2022,7 +2022,7 @@ void CV1::onlineNotification_Redis()
 
 
 		fprintf(stderr, "onlineNotification ran in %llu ms\n", total);
 		fprintf(stderr, "onlineNotification ran in %llu ms\n", total);
 		span->End();
 		span->End();
-		
+
 		std::this_thread::sleep_for(std::chrono::seconds(5));
 		std::this_thread::sleep_for(std::chrono::seconds(5));
 	}
 	}
 }
 }

+ 3 - 0
controller/CV1.hpp

@@ -43,6 +43,9 @@ struct RedisConfig;
  * but be aware that we might change it at any time.
  * but be aware that we might change it at any time.
  */
  */
 class CV1 : public DB {
 class CV1 : public DB {
+	friend class MemberNotificationReceiver<CV1>;
+	friend class NetworkNotificationReceiver<CV1>;
+
   public:
   public:
 	CV1(const Identity& myId, const char* path, int listenPort, RedisConfig* rc);
 	CV1(const Identity& myId, const char* path, int listenPort, RedisConfig* rc);
 	virtual ~CV1();
 	virtual ~CV1();

+ 2 - 2
controller/CV2.cpp

@@ -790,7 +790,7 @@ void CV2::membersDbWatcher()
 	std::string stream = "member_" + _myAddressStr;
 	std::string stream = "member_" + _myAddressStr;
 
 
 	fprintf(stderr, "Listening to member stream: %s\n", stream.c_str());
 	fprintf(stderr, "Listening to member stream: %s\n", stream.c_str());
-	MemberNotificationReceiver m(this, *c->c, stream);
+	MemberNotificationReceiver<CV2> m(this, *c->c, stream);
 
 
 	while (_run == 1) {
 	while (_run == 1) {
 		c->c->await_notification(5, 0);
 		c->c->await_notification(5, 0);
@@ -809,7 +809,7 @@ void CV2::networksDbWatcher()
 
 
 	auto c = _pool->borrow();
 	auto c = _pool->borrow();
 
 
-	NetworkNotificationReceiver n(this, *c->c, stream);
+	NetworkNotificationReceiver<CV2> n(this, *c->c, stream);
 
 
 	while (_run == 1) {
 	while (_run == 1) {
 		c->c->await_notification(5, 0);
 		c->c->await_notification(5, 0);

+ 3 - 0
controller/CV2.hpp

@@ -31,6 +31,9 @@
 namespace ZeroTier {
 namespace ZeroTier {
 
 
 class CV2 : public DB {
 class CV2 : public DB {
+	friend class MemberNotificationReceiver<CV2>;
+	friend class NetworkNotificationReceiver<CV2>;
+
   public:
   public:
 	CV2(const Identity& myId, const char* path, int listenPort);
 	CV2(const Identity& myId, const char* path, int listenPort);
 	virtual ~CV2();
 	virtual ~CV2();

+ 0 - 4
controller/DB.hpp

@@ -61,10 +61,6 @@ struct AuthInfo {
  * Base class with common infrastructure for all controller DB implementations
  * Base class with common infrastructure for all controller DB implementations
  */
  */
 class DB {
 class DB {
-#ifdef ZT_CONTROLLER_USE_LIBPQ
-	friend class MemberNotificationReceiver;
-	friend class NetworkNotificationReceiver;
-#endif
   public:
   public:
 	class ChangeListener {
 	class ChangeListener {
 	  public:
 	  public:

+ 0 - 60
controller/PostgreSQL.cpp

@@ -2,70 +2,10 @@
 
 
 #include "PostgreSQL.hpp"
 #include "PostgreSQL.hpp"
 
 
-#include "opentelemetry/trace/provider.h"
-
 #include <nlohmann/json.hpp>
 #include <nlohmann/json.hpp>
 
 
 using namespace nlohmann;
 using namespace nlohmann;
 
 
 using namespace ZeroTier;
 using namespace ZeroTier;
 
 
-MemberNotificationReceiver::MemberNotificationReceiver(DB* p, pqxx::connection& c, const std::string& channel) : pqxx::notification_receiver(c, channel), _psql(p)
-{
-	fprintf(stderr, "initialize MemberNotificationReceiver\n");
-}
-
-void MemberNotificationReceiver::operator()(const std::string& payload, int packend_pid)
-{
-	auto provider = opentelemetry::trace::Provider::GetTracerProvider();
-	auto tracer = provider->GetTracer("db_member_notification");
-	auto span = tracer->StartSpan("db_member_notification::operator()");
-	auto scope = tracer->WithActiveSpan(span);
-	span->SetAttribute("payload", payload);
-
-	fprintf(stderr, "Member Notification received: %s\n", payload.c_str());
-	Metrics::pgsql_mem_notification++;
-	json tmp(json::parse(payload));
-	json& ov = tmp["old_val"];
-	json& nv = tmp["new_val"];
-	json oldConfig, newConfig;
-	if (ov.is_object())
-		oldConfig = ov;
-	if (nv.is_object())
-		newConfig = nv;
-	if (oldConfig.is_object() || newConfig.is_object()) {
-		_psql->_memberChanged(oldConfig, newConfig, _psql->isReady());
-		fprintf(stderr, "payload sent\n");
-	}
-}
-
-NetworkNotificationReceiver::NetworkNotificationReceiver(DB* p, pqxx::connection& c, const std::string& channel) : pqxx::notification_receiver(c, channel), _psql(p)
-{
-	fprintf(stderr, "initialize NetworkNotificationReceiver\n");
-}
-
-void NetworkNotificationReceiver::operator()(const std::string& payload, int packend_pid)
-{
-	auto provider = opentelemetry::trace::Provider::GetTracerProvider();
-	auto tracer = provider->GetTracer("db_network_notification");
-	auto span = tracer->StartSpan("db_network_notification::operator()");
-	auto scope = tracer->WithActiveSpan(span);
-	span->SetAttribute("payload", payload);
-
-	fprintf(stderr, "Network Notification received: %s\n", payload.c_str());
-	Metrics::pgsql_net_notification++;
-	json tmp(json::parse(payload));
-	json& ov = tmp["old_val"];
-	json& nv = tmp["new_val"];
-	json oldConfig, newConfig;
-	if (ov.is_object())
-		oldConfig = ov;
-	if (nv.is_object())
-		newConfig = nv;
-	if (oldConfig.is_object() || newConfig.is_object()) {
-		_psql->_networkChanged(oldConfig, newConfig, _psql->isReady());
-		fprintf(stderr, "payload sent\n");
-	}
-}
-
 #endif
 #endif

+ 62 - 8
controller/PostgreSQL.hpp

@@ -18,8 +18,10 @@
 
 
 #include "ConnectionPool.hpp"
 #include "ConnectionPool.hpp"
 #include "DB.hpp"
 #include "DB.hpp"
+#include "opentelemetry/trace/provider.h"
 
 
 #include <memory>
 #include <memory>
+#include <nlohmann/json.hpp>
 #include <pqxx/pqxx>
 #include <pqxx/pqxx>
 
 
 namespace ZeroTier {
 namespace ZeroTier {
@@ -56,32 +58,84 @@ class PostgresConnFactory : public ConnectionFactory {
 	std::string m_connString;
 	std::string m_connString;
 };
 };
 
 
-class MemberNotificationReceiver : public pqxx::notification_receiver {
+template <typename T> class MemberNotificationReceiver : public pqxx::notification_receiver {
   public:
   public:
-	MemberNotificationReceiver(DB* p, pqxx::connection& c, const std::string& channel);
+	MemberNotificationReceiver(T* p, pqxx::connection& c, const std::string& channel) : pqxx::notification_receiver(c, channel), _psql(p)
+	{
+		fprintf(stderr, "initialize MemberNotificationReceiver\n");
+	}
+
 	virtual ~MemberNotificationReceiver()
 	virtual ~MemberNotificationReceiver()
 	{
 	{
 		fprintf(stderr, "MemberNotificationReceiver destroyed\n");
 		fprintf(stderr, "MemberNotificationReceiver destroyed\n");
 	}
 	}
 
 
-	virtual void operator()(const std::string& payload, int backendPid);
+	virtual void operator()(const std::string& payload, int backendPid)
+	{
+		auto provider = opentelemetry::trace::Provider::GetTracerProvider();
+		auto tracer = provider->GetTracer("db_member_notification");
+		auto span = tracer->StartSpan("db_member_notification::operator()");
+		auto scope = tracer->WithActiveSpan(span);
+		span->SetAttribute("payload", payload);
+
+		fprintf(stderr, "Member Notification received: %s\n", payload.c_str());
+		Metrics::pgsql_mem_notification++;
+		nlohmann::json tmp(nlohmann::json::parse(payload));
+		nlohmann::json& ov = tmp["old_val"];
+		nlohmann::json& nv = tmp["new_val"];
+		nlohmann::json oldConfig, newConfig;
+		if (ov.is_object())
+			oldConfig = ov;
+		if (nv.is_object())
+			newConfig = nv;
+		if (oldConfig.is_object() || newConfig.is_object()) {
+			_psql->_memberChanged(oldConfig, newConfig, _psql->isReady());
+			fprintf(stderr, "payload sent\n");
+		}
+	}
 
 
   private:
   private:
-	DB* _psql;
+	T* _psql;
 };
 };
 
 
-class NetworkNotificationReceiver : public pqxx::notification_receiver {
+template <typename T> class NetworkNotificationReceiver : public pqxx::notification_receiver {
   public:
   public:
-	NetworkNotificationReceiver(DB* p, pqxx::connection& c, const std::string& channel);
+	NetworkNotificationReceiver(T* p, pqxx::connection& c, const std::string& channel) : pqxx::notification_receiver(c, channel), _psql(p)
+	{
+		fprintf(stderr, "initialize NetworkrNotificationReceiver\n");
+	}
+
 	virtual ~NetworkNotificationReceiver()
 	virtual ~NetworkNotificationReceiver()
 	{
 	{
 		fprintf(stderr, "NetworkNotificationReceiver destroyed\n");
 		fprintf(stderr, "NetworkNotificationReceiver destroyed\n");
 	};
 	};
 
 
-	virtual void operator()(const std::string& payload, int packend_pid);
+	virtual void operator()(const std::string& payload, int packend_pid)
+	{
+		auto provider = opentelemetry::trace::Provider::GetTracerProvider();
+		auto tracer = provider->GetTracer("db_network_notification");
+		auto span = tracer->StartSpan("db_network_notification::operator()");
+		auto scope = tracer->WithActiveSpan(span);
+		span->SetAttribute("payload", payload);
+
+		fprintf(stderr, "Network Notification received: %s\n", payload.c_str());
+		Metrics::pgsql_net_notification++;
+		nlohmann::json tmp(nlohmann::json::parse(payload));
+		nlohmann::json& ov = tmp["old_val"];
+		nlohmann::json& nv = tmp["new_val"];
+		nlohmann::json oldConfig, newConfig;
+		if (ov.is_object())
+			oldConfig = ov;
+		if (nv.is_object())
+			newConfig = nv;
+		if (oldConfig.is_object() || newConfig.is_object()) {
+			_psql->_networkChanged(oldConfig, newConfig, _psql->isReady());
+			fprintf(stderr, "payload sent\n");
+		}
+	}
 
 
   private:
   private:
-	DB* _psql;
+	T* _psql;
 };
 };
 
 
 struct NodeOnlineRecord {
 struct NodeOnlineRecord {

+ 0 - 1
objects.mk

@@ -40,7 +40,6 @@ ONE_OBJS=\
 	controller/FileDB.o \
 	controller/FileDB.o \
 	controller/LFDB.o \
 	controller/LFDB.o \
 	controller/CtlUtil.o \
 	controller/CtlUtil.o \
-	controller/PostgreSQL.o \
 	controller/CV1.o \
 	controller/CV1.o \
 	controller/CV2.o \
 	controller/CV2.o \
 	osdep/EthernetTap.o \
 	osdep/EthernetTap.o \