RedisListener.cpp 6.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214
  1. #ifdef ZT_CONTROLLER_USE_LIBPQ
#include "RedisListener.hpp"
#include "../node/Metrics.hpp"
#include "nlohmann/json.hpp"
#include "opentelemetry/trace/provider.h"
#include <chrono>
#include <cstdio>
#include <exception>
#include <iterator>
#include <memory>
#include <string>
#include <unordered_map>
#include <utility>
#include <vector>
  9. namespace ZeroTier {
  10. using Attrs = std::vector<std::pair<std::string, std::string> >;
  11. using Item = std::pair<std::string, Attrs>;
  12. using ItemStream = std::vector<Item>;
  13. RedisListener::RedisListener(std::string controller_id, std::shared_ptr<sw::redis::Redis> redis) : _controller_id(controller_id), _redis(redis), _is_cluster(false), _run(false)
  14. {
  15. }
  16. RedisListener::RedisListener(std::string controller_id, std::shared_ptr<sw::redis::RedisCluster> cluster) : _controller_id(controller_id), _cluster(cluster), _is_cluster(true), _run(false)
  17. {
  18. }
  19. RedisListener::~RedisListener()
  20. {
  21. _run = false;
  22. if (_listenThread.joinable()) {
  23. _listenThread.join();
  24. }
  25. }
  26. RedisNetworkListener::RedisNetworkListener(std::string controller_id, std::shared_ptr<sw::redis::Redis> redis, DB* db) : RedisListener(controller_id, redis), _db(db)
  27. {
  28. // Additional initialization for network listener if needed
  29. }
  30. RedisNetworkListener::RedisNetworkListener(std::string controller_id, std::shared_ptr<sw::redis::RedisCluster> cluster, DB* db) : RedisListener(controller_id, cluster), _db(db)
  31. {
  32. // Additional initialization for network listener if needed
  33. }
  34. RedisNetworkListener::~RedisNetworkListener()
  35. {
  36. // Destructor logic if needed
  37. }
  38. void RedisNetworkListener::listen()
  39. {
  40. std::string key = "network-stream:{" + _controller_id + "}";
  41. std::string lastID = "0";
  42. while (_run) {
  43. auto provider = opentelemetry::trace::Provider::GetTracerProvider();
  44. auto tracer = provider->GetTracer("RedisNetworkListener");
  45. auto span = tracer->StartSpan("RedisNetworkListener::listen");
  46. auto scope = tracer->WithActiveSpan(span);
  47. try {
  48. nlohmann::json tmp;
  49. std::unordered_map<std::string, ItemStream> result;
  50. if (_is_cluster) {
  51. _cluster->xread(key, lastID, std::chrono::seconds(1), 0, std::inserter(result, result.end()));
  52. }
  53. else {
  54. _redis->xread(key, lastID, std::chrono::seconds(1), 0, std::inserter(result, result.end()));
  55. }
  56. if (! result.empty()) {
  57. for (auto element : result) {
  58. for (auto rec : element.second) {
  59. std::string id = rec.first;
  60. auto attrs = rec.second;
  61. for (auto a : attrs) {
  62. try {
  63. tmp = nlohmann::json::parse(a.second);
  64. tmp = nlohmann::json::parse(a.second);
  65. nlohmann::json& ov = tmp["old_val"];
  66. nlohmann::json& nv = tmp["new_val"];
  67. nlohmann::json oldConfig, newConfig;
  68. if (ov.is_object())
  69. oldConfig = ov;
  70. if (nv.is_object())
  71. newConfig = nv;
  72. if (oldConfig.is_object() || newConfig.is_object()) {
  73. _db->_networkChanged(oldConfig, newConfig, true);
  74. }
  75. }
  76. catch (const nlohmann::json::parse_error& e) {
  77. fprintf(stderr, "JSON parse error: %s\n", e.what());
  78. }
  79. catch (const std::exception& e) {
  80. fprintf(stderr, "Exception in Redis network listener: %s\n", e.what());
  81. }
  82. }
  83. if (_is_cluster) {
  84. _cluster->xdel(key, id);
  85. }
  86. else {
  87. _redis->xdel(key, id);
  88. }
  89. lastID = id;
  90. }
  91. Metrics::redis_net_notification++;
  92. }
  93. }
  94. }
  95. catch (sw::redis::Error& e) {
  96. fprintf(stderr, "Error in Redis network listener: %s\n", e.what());
  97. }
  98. catch (const std::exception& e) {
  99. fprintf(stderr, "Exception in Redis network listener: %s\n", e.what());
  100. }
  101. }
  102. }
  103. void RedisNetworkListener::onNotification(const std::string& payload)
  104. {
  105. // Handle notifications if needed
  106. }
  107. RedisMemberListener::RedisMemberListener(std::string controller_id, std::shared_ptr<sw::redis::Redis> redis, DB* db) : RedisListener(controller_id, redis), _db(db)
  108. {
  109. // Additional initialization for member listener if needed
  110. }
  111. RedisMemberListener::RedisMemberListener(std::string controller_id, std::shared_ptr<sw::redis::RedisCluster> cluster, DB* db) : RedisListener(controller_id, cluster), _db(db)
  112. {
  113. // Additional initialization for member listener if needed
  114. }
  115. RedisMemberListener::~RedisMemberListener()
  116. {
  117. // Destructor logic if needed
  118. }
  119. void RedisMemberListener::listen()
  120. {
  121. std::string key = "member-stream:{" + _controller_id + "}";
  122. std::string lastID = "0";
  123. fprintf(stderr, "Listening to Redis member stream: %s\n", key.c_str());
  124. while (_run) {
  125. auto provider = opentelemetry::trace::Provider::GetTracerProvider();
  126. auto tracer = provider->GetTracer("RedisMemberListener");
  127. auto span = tracer->StartSpan("RedisMemberListener::listen");
  128. auto scope = tracer->WithActiveSpan(span);
  129. try {
  130. nlohmann::json tmp;
  131. std::unordered_map<std::string, ItemStream> result;
  132. if (_is_cluster) {
  133. _cluster->xread(key, lastID, std::chrono::seconds(1), 0, std::inserter(result, result.end()));
  134. }
  135. else {
  136. _redis->xread(key, lastID, std::chrono::seconds(1), 0, std::inserter(result, result.end()));
  137. }
  138. if (! result.empty()) {
  139. for (auto element : result) {
  140. for (auto rec : element.second) {
  141. std::string id = rec.first;
  142. auto attrs = rec.second;
  143. for (auto a : attrs) {
  144. try {
  145. tmp = nlohmann::json::parse(a.second);
  146. nlohmann::json& ov = tmp["old_val"];
  147. nlohmann::json& nv = tmp["new_val"];
  148. nlohmann::json oldConfig, newConfig;
  149. if (ov.is_object())
  150. oldConfig = ov;
  151. if (nv.is_object())
  152. newConfig = nv;
  153. if (oldConfig.is_object() || newConfig.is_object()) {
  154. _db->_memberChanged(oldConfig, newConfig, true);
  155. }
  156. }
  157. catch (const nlohmann::json::parse_error& e) {
  158. fprintf(stderr, "JSON parse error: %s\n", e.what());
  159. }
  160. catch (const std::exception& e) {
  161. fprintf(stderr, "Exception in Redis member listener: %s\n", e.what());
  162. }
  163. }
  164. if (_is_cluster) {
  165. _cluster->xdel(key, id);
  166. }
  167. else {
  168. _redis->xdel(key, id);
  169. }
  170. lastID = id;
  171. }
  172. Metrics::redis_mem_notification++;
  173. }
  174. }
  175. }
  176. catch (sw::redis::Error& e) {
  177. fprintf(stderr, "Error in Redis member listener: %s\n", e.what());
  178. }
  179. catch (const std::exception& e) {
  180. fprintf(stderr, "Exception in Redis member listener: %s\n", e.what());
  181. }
  182. }
  183. }
  184. void RedisMemberListener::onNotification(const std::string& payload)
  185. {
  186. }
  187. } // namespace ZeroTier
  188. #endif // ZT_CONTROLLER_USE_LIBPQ