PostgreSQL.cpp 4.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175
  1. /* (c) ZeroTier, Inc.
  2. * See LICENSE.txt in nonfree/
  3. */
  4. #ifdef ZT_CONTROLLER_USE_LIBPQ
  5. #include "PostgreSQL.hpp"
  6. #include "opentelemetry/trace/provider.h"
  7. #include <nlohmann/json.hpp>
  8. namespace ZeroTier {
  9. PostgresMemberListener::PostgresMemberListener(
  10. DB* db,
  11. std::shared_ptr<ConnectionPool<PostgresConnection> > pool,
  12. const std::string& channel,
  13. uint64_t timeout)
  14. : NotificationListener()
  15. , _db(db)
  16. , _pool(pool)
  17. , _notification_timeout(timeout)
  18. , _listenerThread()
  19. {
  20. _conn = _pool->borrow();
  21. _receiver = new _notificationReceiver<PostgresMemberListener>(this, *_conn->c, channel);
  22. _run = true;
  23. _listenerThread = std::thread(&PostgresMemberListener::listen, this);
  24. }
  25. PostgresMemberListener::~PostgresMemberListener()
  26. {
  27. _run = false;
  28. if (_listenerThread.joinable()) {
  29. _listenerThread.join();
  30. }
  31. delete _receiver;
  32. if (_conn) {
  33. _pool->unborrow(_conn);
  34. _conn.reset();
  35. }
  36. }
  37. void PostgresMemberListener::listen()
  38. {
  39. while (_run) {
  40. _conn->c->await_notification(_notification_timeout, 0);
  41. }
  42. }
  43. bool PostgresMemberListener::onNotification(const std::string& payload)
  44. {
  45. auto provider = opentelemetry::trace::Provider::GetTracerProvider();
  46. auto tracer = provider->GetTracer("PostgresMemberNotificationListener");
  47. auto span = tracer->StartSpan("PostgresMemberNotificationListener::onNotification");
  48. auto scope = tracer->WithActiveSpan(span);
  49. span->SetAttribute("payload", payload);
  50. Metrics::pgsql_mem_notification++;
  51. nlohmann::json tmp(nlohmann::json::parse(payload));
  52. nlohmann::json& ov = tmp["old_val"];
  53. nlohmann::json& nv = tmp["new_val"];
  54. nlohmann::json oldConfig, newConfig;
  55. if (ov.is_object())
  56. oldConfig = ov;
  57. if (nv.is_object())
  58. newConfig = nv;
  59. if (oldConfig.is_object() && newConfig.is_object()) {
  60. _db->save(newConfig, true);
  61. }
  62. else if (newConfig.is_object() && ! oldConfig.is_object()) {
  63. // new member
  64. Metrics::member_count++;
  65. _db->save(newConfig, true);
  66. }
  67. else if (! newConfig.is_object() && oldConfig.is_object()) {
  68. // member delete
  69. uint64_t networkId = OSUtils::jsonIntHex(oldConfig["nwid"], 0ULL);
  70. uint64_t memberId = OSUtils::jsonIntHex(oldConfig["id"], 0ULL);
  71. if (memberId && networkId) {
  72. _db->eraseMember(networkId, memberId);
  73. }
  74. }
  75. return true;
  76. }
  77. PostgresNetworkListener::PostgresNetworkListener(
  78. DB* db,
  79. std::shared_ptr<ConnectionPool<PostgresConnection> > pool,
  80. const std::string& channel,
  81. uint64_t timeout)
  82. : NotificationListener()
  83. , _db(db)
  84. , _pool(pool)
  85. , _notification_timeout(timeout)
  86. , _listenerThread()
  87. {
  88. _conn = _pool->borrow();
  89. _receiver = new _notificationReceiver<PostgresNetworkListener>(this, *_conn->c, channel);
  90. _run = true;
  91. _listenerThread = std::thread(&PostgresNetworkListener::listen, this);
  92. }
  93. PostgresNetworkListener::~PostgresNetworkListener()
  94. {
  95. _run = false;
  96. if (_listenerThread.joinable()) {
  97. _listenerThread.join();
  98. }
  99. delete _receiver;
  100. if (_conn) {
  101. _pool->unborrow(_conn);
  102. _conn.reset();
  103. }
  104. }
  105. void PostgresNetworkListener::listen()
  106. {
  107. while (_run) {
  108. _conn->c->await_notification(_notification_timeout, 0);
  109. }
  110. }
  111. bool PostgresNetworkListener::onNotification(const std::string& payload)
  112. {
  113. auto provider = opentelemetry::trace::Provider::GetTracerProvider();
  114. auto tracer = provider->GetTracer("db_network_notification");
  115. auto span = tracer->StartSpan("db_network_notification::operator()");
  116. auto scope = tracer->WithActiveSpan(span);
  117. span->SetAttribute("payload", payload);
  118. Metrics::pgsql_net_notification++;
  119. nlohmann::json tmp(nlohmann::json::parse(payload));
  120. nlohmann::json& ov = tmp["old_val"];
  121. nlohmann::json& nv = tmp["new_val"];
  122. nlohmann::json oldConfig, newConfig;
  123. if (ov.is_object())
  124. oldConfig = ov;
  125. if (nv.is_object())
  126. newConfig = nv;
  127. if (oldConfig.is_object() && newConfig.is_object()) {
  128. std::string nwid = oldConfig["id"];
  129. span->SetAttribute("action", "network_change");
  130. span->SetAttribute("network_id", nwid);
  131. _db->save(newConfig, true);
  132. }
  133. else if (newConfig.is_object() && ! oldConfig.is_object()) {
  134. std::string nwid = newConfig["id"];
  135. span->SetAttribute("network_id", nwid);
  136. span->SetAttribute("action", "new_network");
  137. // new network
  138. _db->save(newConfig, true);
  139. }
  140. else if (! newConfig.is_object() && oldConfig.is_object()) {
  141. // network delete
  142. span->SetAttribute("action", "delete_network");
  143. std::string nwid = oldConfig["id"];
  144. span->SetAttribute("network_id", nwid);
  145. uint64_t networkId = Utils::hexStrToU64(nwid.c_str());
  146. span->SetAttribute("network_id_int", networkId);
  147. if (networkId) {
  148. _db->eraseNetwork(networkId);
  149. }
  150. }
  151. return true;
  152. }
  153. } // namespace ZeroTier
  154. #endif