PostgreSQL.cpp 4.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173
  1. /* (c) ZeroTier, Inc.
  2. * See LICENSE.txt in nonfree/
  3. */
  4. #ifdef ZT_CONTROLLER_USE_LIBPQ
  5. #include "PostgreSQL.hpp"
  6. #include "opentelemetry/trace/provider.h"
  7. #include <nlohmann/json.hpp>
  8. namespace ZeroTier {
  9. PostgresMemberListener::PostgresMemberListener(DB* db, std::shared_ptr<ConnectionPool<PostgresConnection> > pool, const std::string& channel, uint64_t timeout)
  10. : NotificationListener()
  11. , _db(db)
  12. , _pool(pool)
  13. , _notification_timeout(timeout)
  14. , _listenerThread()
  15. {
  16. _conn = _pool->borrow();
  17. _receiver = new _notificationReceiver<PostgresMemberListener>(this, *_conn->c, channel);
  18. _run = true;
  19. _listenerThread = std::thread(&PostgresMemberListener::listen, this);
  20. }
  21. PostgresMemberListener::~PostgresMemberListener()
  22. {
  23. _run = false;
  24. if (_listenerThread.joinable()) {
  25. _listenerThread.join();
  26. }
  27. delete _receiver;
  28. if (_conn) {
  29. _pool->unborrow(_conn);
  30. _conn.reset();
  31. }
  32. }
  33. void PostgresMemberListener::listen()
  34. {
  35. while (_run) {
  36. _conn->c->await_notification(_notification_timeout, 0);
  37. }
  38. }
  39. void PostgresMemberListener::onNotification(const std::string& payload)
  40. {
  41. auto provider = opentelemetry::trace::Provider::GetTracerProvider();
  42. auto tracer = provider->GetTracer("PostgresMemberNotificationListener");
  43. auto span = tracer->StartSpan("PostgresMemberNotificationListener::onNotification");
  44. auto scope = tracer->WithActiveSpan(span);
  45. span->SetAttribute("payload", payload);
  46. fprintf(stderr, "Member Notification received: %s\n", payload.c_str());
  47. Metrics::pgsql_mem_notification++;
  48. nlohmann::json tmp(nlohmann::json::parse(payload));
  49. nlohmann::json& ov = tmp["old_val"];
  50. nlohmann::json& nv = tmp["new_val"];
  51. nlohmann::json oldConfig, newConfig;
  52. if (ov.is_object())
  53. oldConfig = ov;
  54. if (nv.is_object())
  55. newConfig = nv;
  56. if (oldConfig.is_object() && newConfig.is_object()) {
  57. _db->save(newConfig, true);
  58. fprintf(stderr, "payload sent\n");
  59. }
  60. else if (newConfig.is_object() && ! oldConfig.is_object()) {
  61. // new member
  62. Metrics::member_count++;
  63. _db->save(newConfig, true);
  64. fprintf(stderr, "new member payload sent\n");
  65. }
  66. else if (! newConfig.is_object() && oldConfig.is_object()) {
  67. // member delete
  68. uint64_t networkId = OSUtils::jsonIntHex(oldConfig["nwid"], 0ULL);
  69. uint64_t memberId = OSUtils::jsonIntHex(oldConfig["id"], 0ULL);
  70. if (memberId && networkId) {
  71. _db->eraseMember(networkId, memberId);
  72. fprintf(stderr, "member delete payload sent\n");
  73. }
  74. }
  75. }
  76. PostgresNetworkListener::PostgresNetworkListener(DB* db, std::shared_ptr<ConnectionPool<PostgresConnection> > pool, const std::string& channel, uint64_t timeout)
  77. : NotificationListener()
  78. , _db(db)
  79. , _pool(pool)
  80. , _notification_timeout(timeout)
  81. , _listenerThread()
  82. {
  83. _conn = _pool->borrow();
  84. _receiver = new _notificationReceiver<PostgresNetworkListener>(this, *_conn->c, channel);
  85. _run = true;
  86. _listenerThread = std::thread(&PostgresNetworkListener::listen, this);
  87. }
  88. PostgresNetworkListener::~PostgresNetworkListener()
  89. {
  90. _run = false;
  91. if (_listenerThread.joinable()) {
  92. _listenerThread.join();
  93. }
  94. delete _receiver;
  95. if (_conn) {
  96. _pool->unborrow(_conn);
  97. _conn.reset();
  98. }
  99. }
  100. void PostgresNetworkListener::listen()
  101. {
  102. while (_run) {
  103. _conn->c->await_notification(_notification_timeout, 0);
  104. }
  105. }
  106. void PostgresNetworkListener::onNotification(const std::string& payload)
  107. {
  108. auto provider = opentelemetry::trace::Provider::GetTracerProvider();
  109. auto tracer = provider->GetTracer("db_network_notification");
  110. auto span = tracer->StartSpan("db_network_notification::operator()");
  111. auto scope = tracer->WithActiveSpan(span);
  112. span->SetAttribute("payload", payload);
  113. fprintf(stderr, "Network Notification received: %s\n", payload.c_str());
  114. Metrics::pgsql_net_notification++;
  115. nlohmann::json tmp(nlohmann::json::parse(payload));
  116. nlohmann::json& ov = tmp["old_val"];
  117. nlohmann::json& nv = tmp["new_val"];
  118. nlohmann::json oldConfig, newConfig;
  119. if (ov.is_object())
  120. oldConfig = ov;
  121. if (nv.is_object())
  122. newConfig = nv;
  123. if (oldConfig.is_object() && newConfig.is_object()) {
  124. std::string nwid = oldConfig["id"];
  125. span->SetAttribute("action", "network_change");
  126. span->SetAttribute("network_id", nwid);
  127. _db->save(newConfig, true);
  128. fprintf(stderr, "payload sent\n");
  129. }
  130. else if (newConfig.is_object() && ! oldConfig.is_object()) {
  131. std::string nwid = newConfig["id"];
  132. span->SetAttribute("network_id", nwid);
  133. span->SetAttribute("action", "new_network");
  134. // new network
  135. _db->save(newConfig, true);
  136. fprintf(stderr, "new network payload sent\n");
  137. }
  138. else if (! newConfig.is_object() && oldConfig.is_object()) {
  139. // network delete
  140. span->SetAttribute("action", "delete_network");
  141. std::string nwid = oldConfig["id"];
  142. span->SetAttribute("network_id", nwid);
  143. uint64_t networkId = Utils::hexStrToU64(nwid.c_str());
  144. span->SetAttribute("network_id_int", networkId);
  145. if (networkId) {
  146. _db->eraseNetwork(networkId);
  147. fprintf(stderr, "network delete payload sent\n");
  148. }
  149. }
  150. }
  151. } // namespace ZeroTier
  152. #endif