PostgreSQL.cpp 2.3 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071
  1. #ifdef ZT_CONTROLLER_USE_LIBPQ
  2. #include "PostgreSQL.hpp"
  3. #include "opentelemetry/trace/provider.h"
  4. #include <nlohmann/json.hpp>
  5. using namespace nlohmann;
  6. using namespace ZeroTier;
  7. MemberNotificationReceiver::MemberNotificationReceiver(DB* p, pqxx::connection& c, const std::string& channel) : pqxx::notification_receiver(c, channel), _psql(p)
  8. {
  9. fprintf(stderr, "initialize MemberNotificationReceiver\n");
  10. }
  11. void MemberNotificationReceiver::operator()(const std::string& payload, int packend_pid)
  12. {
  13. auto provider = opentelemetry::trace::Provider::GetTracerProvider();
  14. auto tracer = provider->GetTracer("db_member_notification");
  15. auto span = tracer->StartSpan("db_member_notification::operator()");
  16. auto scope = tracer->WithActiveSpan(span);
  17. span->SetAttribute("payload", payload);
  18. fprintf(stderr, "Member Notification received: %s\n", payload.c_str());
  19. Metrics::pgsql_mem_notification++;
  20. json tmp(json::parse(payload));
  21. json& ov = tmp["old_val"];
  22. json& nv = tmp["new_val"];
  23. json oldConfig, newConfig;
  24. if (ov.is_object())
  25. oldConfig = ov;
  26. if (nv.is_object())
  27. newConfig = nv;
  28. if (oldConfig.is_object() || newConfig.is_object()) {
  29. _psql->_memberChanged(oldConfig, newConfig, _psql->isReady());
  30. fprintf(stderr, "payload sent\n");
  31. }
  32. }
  33. NetworkNotificationReceiver::NetworkNotificationReceiver(DB* p, pqxx::connection& c, const std::string& channel) : pqxx::notification_receiver(c, channel), _psql(p)
  34. {
  35. fprintf(stderr, "initialize NetworkNotificationReceiver\n");
  36. }
  37. void NetworkNotificationReceiver::operator()(const std::string& payload, int packend_pid)
  38. {
  39. auto provider = opentelemetry::trace::Provider::GetTracerProvider();
  40. auto tracer = provider->GetTracer("db_network_notification");
  41. auto span = tracer->StartSpan("db_network_notification::operator()");
  42. auto scope = tracer->WithActiveSpan(span);
  43. span->SetAttribute("payload", payload);
  44. fprintf(stderr, "Network Notification received: %s\n", payload.c_str());
  45. Metrics::pgsql_net_notification++;
  46. json tmp(json::parse(payload));
  47. json& ov = tmp["old_val"];
  48. json& nv = tmp["new_val"];
  49. json oldConfig, newConfig;
  50. if (ov.is_object())
  51. oldConfig = ov;
  52. if (nv.is_object())
  53. newConfig = nv;
  54. if (oldConfig.is_object() || newConfig.is_object()) {
  55. _psql->_networkChanged(oldConfig, newConfig, _psql->isReady());
  56. fprintf(stderr, "payload sent\n");
  57. }
  58. }
  59. #endif