PostgreSQL.cpp 4.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169
  1. #ifdef ZT_CONTROLLER_USE_LIBPQ
  2. #include "PostgreSQL.hpp"
  3. #include "opentelemetry/trace/provider.h"
  4. #include <nlohmann/json.hpp>
  5. namespace ZeroTier {
  6. PostgresMemberListener::PostgresMemberListener(DB* db, std::shared_ptr<ConnectionPool<PostgresConnection> > pool, const std::string& channel, uint64_t timeout)
  7. : NotificationListener()
  8. , _db(db)
  9. , _pool(pool)
  10. , _notification_timeout(timeout)
  11. , _listenerThread()
  12. {
  13. _conn = _pool->borrow();
  14. _receiver = new _notificationReceiver<PostgresMemberListener>(this, *_conn->c, channel);
  15. _run = true;
  16. _listenerThread = std::thread(&PostgresMemberListener::listen, this);
  17. }
  18. PostgresMemberListener::~PostgresMemberListener()
  19. {
  20. _run = false;
  21. if (_listenerThread.joinable()) {
  22. _listenerThread.join();
  23. }
  24. delete _receiver;
  25. if (_conn) {
  26. _pool->unborrow(_conn);
  27. _conn.reset();
  28. }
  29. }
  30. void PostgresMemberListener::listen()
  31. {
  32. while (_run) {
  33. _conn->c->await_notification(_notification_timeout, 0);
  34. }
  35. }
  36. void PostgresMemberListener::onNotification(const std::string& payload)
  37. {
  38. auto provider = opentelemetry::trace::Provider::GetTracerProvider();
  39. auto tracer = provider->GetTracer("PostgresMemberNotificationListener");
  40. auto span = tracer->StartSpan("PostgresMemberNotificationListener::onNotification");
  41. auto scope = tracer->WithActiveSpan(span);
  42. span->SetAttribute("payload", payload);
  43. fprintf(stderr, "Member Notification received: %s\n", payload.c_str());
  44. Metrics::pgsql_mem_notification++;
  45. nlohmann::json tmp(nlohmann::json::parse(payload));
  46. nlohmann::json& ov = tmp["old_val"];
  47. nlohmann::json& nv = tmp["new_val"];
  48. nlohmann::json oldConfig, newConfig;
  49. if (ov.is_object())
  50. oldConfig = ov;
  51. if (nv.is_object())
  52. newConfig = nv;
  53. if (oldConfig.is_object() && newConfig.is_object()) {
  54. _db->save(newConfig, true);
  55. fprintf(stderr, "payload sent\n");
  56. }
  57. else if (newConfig.is_object() && ! oldConfig.is_object()) {
  58. // new member
  59. Metrics::member_count++;
  60. _db->save(newConfig, true);
  61. fprintf(stderr, "new member payload sent\n");
  62. }
  63. else if (! newConfig.is_object() && oldConfig.is_object()) {
  64. // member delete
  65. uint64_t networkId = OSUtils::jsonIntHex(oldConfig["nwid"], 0ULL);
  66. uint64_t memberId = OSUtils::jsonIntHex(oldConfig["id"], 0ULL);
  67. if (memberId && networkId) {
  68. _db->eraseMember(networkId, memberId);
  69. fprintf(stderr, "member delete payload sent\n");
  70. }
  71. }
  72. }
  73. PostgresNetworkListener::PostgresNetworkListener(DB* db, std::shared_ptr<ConnectionPool<PostgresConnection> > pool, const std::string& channel, uint64_t timeout)
  74. : NotificationListener()
  75. , _db(db)
  76. , _pool(pool)
  77. , _notification_timeout(timeout)
  78. , _listenerThread()
  79. {
  80. _conn = _pool->borrow();
  81. _receiver = new _notificationReceiver<PostgresNetworkListener>(this, *_conn->c, channel);
  82. _run = true;
  83. _listenerThread = std::thread(&PostgresNetworkListener::listen, this);
  84. }
  85. PostgresNetworkListener::~PostgresNetworkListener()
  86. {
  87. _run = false;
  88. if (_listenerThread.joinable()) {
  89. _listenerThread.join();
  90. }
  91. delete _receiver;
  92. if (_conn) {
  93. _pool->unborrow(_conn);
  94. _conn.reset();
  95. }
  96. }
  97. void PostgresNetworkListener::listen()
  98. {
  99. while (_run) {
  100. _conn->c->await_notification(_notification_timeout, 0);
  101. }
  102. }
  103. void PostgresNetworkListener::onNotification(const std::string& payload)
  104. {
  105. auto provider = opentelemetry::trace::Provider::GetTracerProvider();
  106. auto tracer = provider->GetTracer("db_network_notification");
  107. auto span = tracer->StartSpan("db_network_notification::operator()");
  108. auto scope = tracer->WithActiveSpan(span);
  109. span->SetAttribute("payload", payload);
  110. fprintf(stderr, "Network Notification received: %s\n", payload.c_str());
  111. Metrics::pgsql_net_notification++;
  112. nlohmann::json tmp(nlohmann::json::parse(payload));
  113. nlohmann::json& ov = tmp["old_val"];
  114. nlohmann::json& nv = tmp["new_val"];
  115. nlohmann::json oldConfig, newConfig;
  116. if (ov.is_object())
  117. oldConfig = ov;
  118. if (nv.is_object())
  119. newConfig = nv;
  120. if (oldConfig.is_object() && newConfig.is_object()) {
  121. std::string nwid = oldConfig["id"];
  122. span->SetAttribute("action", "network_change");
  123. span->SetAttribute("network_id", nwid);
  124. _db->save(newConfig, true);
  125. fprintf(stderr, "payload sent\n");
  126. }
  127. else if (newConfig.is_object() && ! oldConfig.is_object()) {
  128. std::string nwid = newConfig["id"];
  129. span->SetAttribute("network_id", nwid);
  130. span->SetAttribute("action", "new_network");
  131. // new network
  132. _db->save(newConfig, true);
  133. fprintf(stderr, "new network payload sent\n");
  134. }
  135. else if (! newConfig.is_object() && oldConfig.is_object()) {
  136. // network delete
  137. span->SetAttribute("action", "delete_network");
  138. std::string nwid = oldConfig["id"];
  139. span->SetAttribute("network_id", nwid);
  140. uint64_t networkId = Utils::hexStrToU64(nwid.c_str());
  141. span->SetAttribute("network_id_int", networkId);
  142. if (networkId) {
  143. _db->eraseNetwork(networkId);
  144. fprintf(stderr, "network delete payload sent\n");
  145. }
  146. }
  147. }
  148. } // namespace ZeroTier
  149. #endif