PostgreSQL.cpp 4.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183
  1. /* (c) ZeroTier, Inc.
  2. * See LICENSE.txt in nonfree/
  3. */
  4. #ifdef ZT_CONTROLLER_USE_LIBPQ
  5. #include "PostgreSQL.hpp"
  6. #include "opentelemetry/trace/provider.h"
  7. #include <nlohmann/json.hpp>
  8. namespace ZeroTier {
  9. PostgresMemberListener::PostgresMemberListener(
  10. DB* db,
  11. std::shared_ptr<ConnectionPool<PostgresConnection> > pool,
  12. const std::string& channel,
  13. uint64_t timeout)
  14. : NotificationListener()
  15. , _db(db)
  16. , _pool(pool)
  17. , _notification_timeout(timeout)
  18. , _listenerThread()
  19. {
  20. _conn = _pool->borrow();
  21. _receiver = new _notificationReceiver<PostgresMemberListener>(this, *_conn->c, channel);
  22. _run = true;
  23. _listenerThread = std::thread(&PostgresMemberListener::listen, this);
  24. }
  25. PostgresMemberListener::~PostgresMemberListener()
  26. {
  27. _run = false;
  28. if (_listenerThread.joinable()) {
  29. _listenerThread.join();
  30. }
  31. delete _receiver;
  32. if (_conn) {
  33. _pool->unborrow(_conn);
  34. _conn.reset();
  35. }
  36. }
  37. void PostgresMemberListener::listen()
  38. {
  39. while (_run) {
  40. _conn->c->await_notification(_notification_timeout, 0);
  41. }
  42. }
  43. bool PostgresMemberListener::onNotification(const std::string& payload)
  44. {
  45. auto provider = opentelemetry::trace::Provider::GetTracerProvider();
  46. auto tracer = provider->GetTracer("PostgresMemberNotificationListener");
  47. auto span = tracer->StartSpan("PostgresMemberNotificationListener::onNotification");
  48. auto scope = tracer->WithActiveSpan(span);
  49. span->SetAttribute("payload", payload);
  50. fprintf(stderr, "Member Notification received: %s\n", payload.c_str());
  51. Metrics::pgsql_mem_notification++;
  52. nlohmann::json tmp(nlohmann::json::parse(payload));
  53. nlohmann::json& ov = tmp["old_val"];
  54. nlohmann::json& nv = tmp["new_val"];
  55. nlohmann::json oldConfig, newConfig;
  56. if (ov.is_object())
  57. oldConfig = ov;
  58. if (nv.is_object())
  59. newConfig = nv;
  60. if (oldConfig.is_object() && newConfig.is_object()) {
  61. _db->save(newConfig, true);
  62. fprintf(stderr, "payload sent\n");
  63. }
  64. else if (newConfig.is_object() && ! oldConfig.is_object()) {
  65. // new member
  66. Metrics::member_count++;
  67. _db->save(newConfig, true);
  68. fprintf(stderr, "new member payload sent\n");
  69. }
  70. else if (! newConfig.is_object() && oldConfig.is_object()) {
  71. // member delete
  72. uint64_t networkId = OSUtils::jsonIntHex(oldConfig["nwid"], 0ULL);
  73. uint64_t memberId = OSUtils::jsonIntHex(oldConfig["id"], 0ULL);
  74. if (memberId && networkId) {
  75. _db->eraseMember(networkId, memberId);
  76. fprintf(stderr, "member delete payload sent\n");
  77. }
  78. }
  79. return true;
  80. }
  81. PostgresNetworkListener::PostgresNetworkListener(
  82. DB* db,
  83. std::shared_ptr<ConnectionPool<PostgresConnection> > pool,
  84. const std::string& channel,
  85. uint64_t timeout)
  86. : NotificationListener()
  87. , _db(db)
  88. , _pool(pool)
  89. , _notification_timeout(timeout)
  90. , _listenerThread()
  91. {
  92. _conn = _pool->borrow();
  93. _receiver = new _notificationReceiver<PostgresNetworkListener>(this, *_conn->c, channel);
  94. _run = true;
  95. _listenerThread = std::thread(&PostgresNetworkListener::listen, this);
  96. }
  97. PostgresNetworkListener::~PostgresNetworkListener()
  98. {
  99. _run = false;
  100. if (_listenerThread.joinable()) {
  101. _listenerThread.join();
  102. }
  103. delete _receiver;
  104. if (_conn) {
  105. _pool->unborrow(_conn);
  106. _conn.reset();
  107. }
  108. }
  109. void PostgresNetworkListener::listen()
  110. {
  111. while (_run) {
  112. _conn->c->await_notification(_notification_timeout, 0);
  113. }
  114. }
  115. bool PostgresNetworkListener::onNotification(const std::string& payload)
  116. {
  117. auto provider = opentelemetry::trace::Provider::GetTracerProvider();
  118. auto tracer = provider->GetTracer("db_network_notification");
  119. auto span = tracer->StartSpan("db_network_notification::operator()");
  120. auto scope = tracer->WithActiveSpan(span);
  121. span->SetAttribute("payload", payload);
  122. fprintf(stderr, "Network Notification received: %s\n", payload.c_str());
  123. Metrics::pgsql_net_notification++;
  124. nlohmann::json tmp(nlohmann::json::parse(payload));
  125. nlohmann::json& ov = tmp["old_val"];
  126. nlohmann::json& nv = tmp["new_val"];
  127. nlohmann::json oldConfig, newConfig;
  128. if (ov.is_object())
  129. oldConfig = ov;
  130. if (nv.is_object())
  131. newConfig = nv;
  132. if (oldConfig.is_object() && newConfig.is_object()) {
  133. std::string nwid = oldConfig["id"];
  134. span->SetAttribute("action", "network_change");
  135. span->SetAttribute("network_id", nwid);
  136. _db->save(newConfig, true);
  137. fprintf(stderr, "payload sent\n");
  138. }
  139. else if (newConfig.is_object() && ! oldConfig.is_object()) {
  140. std::string nwid = newConfig["id"];
  141. span->SetAttribute("network_id", nwid);
  142. span->SetAttribute("action", "new_network");
  143. // new network
  144. _db->save(newConfig, true);
  145. fprintf(stderr, "new network payload sent\n");
  146. }
  147. else if (! newConfig.is_object() && oldConfig.is_object()) {
  148. // network delete
  149. span->SetAttribute("action", "delete_network");
  150. std::string nwid = oldConfig["id"];
  151. span->SetAttribute("network_id", nwid);
  152. uint64_t networkId = Utils::hexStrToU64(nwid.c_str());
  153. span->SetAttribute("network_id_int", networkId);
  154. if (networkId) {
  155. _db->eraseNetwork(networkId);
  156. fprintf(stderr, "network delete payload sent\n");
  157. }
  158. }
  159. return true;
  160. }
  161. } // namespace ZeroTier
  162. #endif