RedisListener.cpp 6.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238
  1. #ifdef ZT_CONTROLLER_USE_LIBPQ
  2. #include "RedisListener.hpp"
  3. #include "../../node/Metrics.hpp"
  4. #include "nlohmann/json.hpp"
  5. #include "opentelemetry/trace/provider.h"
  6. #include <memory>
  7. #include <string>
  8. #include <vector>
  9. namespace ZeroTier {
  10. using Attrs = std::vector<std::pair<std::string, std::string> >;
  11. using Item = std::pair<std::string, Attrs>;
  12. using ItemStream = std::vector<Item>;
  13. RedisListener::RedisListener(std::string controller_id, std::shared_ptr<sw::redis::Redis> redis)
  14. : _controller_id(controller_id)
  15. , _redis(redis)
  16. , _is_cluster(false)
  17. , _run(false)
  18. {
  19. }
  20. RedisListener::RedisListener(std::string controller_id, std::shared_ptr<sw::redis::RedisCluster> cluster)
  21. : _controller_id(controller_id)
  22. , _cluster(cluster)
  23. , _is_cluster(true)
  24. , _run(false)
  25. {
  26. }
  27. RedisListener::~RedisListener()
  28. {
  29. _run = false;
  30. if (_listenThread.joinable()) {
  31. _listenThread.join();
  32. }
  33. }
  34. RedisNetworkListener::RedisNetworkListener(std::string controller_id, std::shared_ptr<sw::redis::Redis> redis, DB* db)
  35. : RedisListener(controller_id, redis)
  36. , _db(db)
  37. {
  38. // Additional initialization for network listener if needed
  39. }
  40. RedisNetworkListener::RedisNetworkListener(
  41. std::string controller_id,
  42. std::shared_ptr<sw::redis::RedisCluster> cluster,
  43. DB* db)
  44. : RedisListener(controller_id, cluster)
  45. , _db(db)
  46. {
  47. // Additional initialization for network listener if needed
  48. }
  49. RedisNetworkListener::~RedisNetworkListener()
  50. {
  51. // Destructor logic if needed
  52. }
  53. void RedisNetworkListener::listen()
  54. {
  55. std::string key = "network-stream:{" + _controller_id + "}";
  56. std::string lastID = "0";
  57. while (_run) {
  58. auto provider = opentelemetry::trace::Provider::GetTracerProvider();
  59. auto tracer = provider->GetTracer("RedisNetworkListener");
  60. auto span = tracer->StartSpan("RedisNetworkListener::listen");
  61. auto scope = tracer->WithActiveSpan(span);
  62. try {
  63. nlohmann::json tmp;
  64. std::unordered_map<std::string, ItemStream> result;
  65. if (_is_cluster) {
  66. _cluster->xread(key, lastID, std::chrono::seconds(1), 0, std::inserter(result, result.end()));
  67. }
  68. else {
  69. _redis->xread(key, lastID, std::chrono::seconds(1), 0, std::inserter(result, result.end()));
  70. }
  71. if (! result.empty()) {
  72. for (auto element : result) {
  73. for (auto rec : element.second) {
  74. std::string id = rec.first;
  75. auto attrs = rec.second;
  76. for (auto a : attrs) {
  77. try {
  78. tmp = nlohmann::json::parse(a.second);
  79. tmp = nlohmann::json::parse(a.second);
  80. nlohmann::json& ov = tmp["old_val"];
  81. nlohmann::json& nv = tmp["new_val"];
  82. nlohmann::json oldConfig, newConfig;
  83. if (ov.is_object())
  84. oldConfig = ov;
  85. if (nv.is_object())
  86. newConfig = nv;
  87. if (oldConfig.is_object() || newConfig.is_object()) {
  88. _db->_networkChanged(oldConfig, newConfig, true);
  89. }
  90. }
  91. catch (const nlohmann::json::parse_error& e) {
  92. fprintf(stderr, "JSON parse error: %s\n", e.what());
  93. }
  94. catch (const std::exception& e) {
  95. fprintf(stderr, "Exception in Redis network listener: %s\n", e.what());
  96. }
  97. }
  98. if (_is_cluster) {
  99. _cluster->xdel(key, id);
  100. }
  101. else {
  102. _redis->xdel(key, id);
  103. }
  104. lastID = id;
  105. }
  106. Metrics::redis_net_notification++;
  107. }
  108. }
  109. }
  110. catch (sw::redis::Error& e) {
  111. fprintf(stderr, "Error in Redis network listener: %s\n", e.what());
  112. }
  113. catch (const std::exception& e) {
  114. fprintf(stderr, "Exception in Redis network listener: %s\n", e.what());
  115. }
  116. }
  117. }
  118. bool RedisNetworkListener::onNotification(const std::string& payload)
  119. {
  120. // Handle notifications if needed
  121. return true;
  122. }
  123. RedisMemberListener::RedisMemberListener(std::string controller_id, std::shared_ptr<sw::redis::Redis> redis, DB* db)
  124. : RedisListener(controller_id, redis)
  125. , _db(db)
  126. {
  127. // Additional initialization for member listener if needed
  128. }
  129. RedisMemberListener::RedisMemberListener(
  130. std::string controller_id,
  131. std::shared_ptr<sw::redis::RedisCluster> cluster,
  132. DB* db)
  133. : RedisListener(controller_id, cluster)
  134. , _db(db)
  135. {
  136. // Additional initialization for member listener if needed
  137. }
  138. RedisMemberListener::~RedisMemberListener()
  139. {
  140. // Destructor logic if needed
  141. }
  142. void RedisMemberListener::listen()
  143. {
  144. std::string key = "member-stream:{" + _controller_id + "}";
  145. std::string lastID = "0";
  146. fprintf(stderr, "Listening to Redis member stream: %s\n", key.c_str());
  147. while (_run) {
  148. auto provider = opentelemetry::trace::Provider::GetTracerProvider();
  149. auto tracer = provider->GetTracer("RedisMemberListener");
  150. auto span = tracer->StartSpan("RedisMemberListener::listen");
  151. auto scope = tracer->WithActiveSpan(span);
  152. try {
  153. nlohmann::json tmp;
  154. std::unordered_map<std::string, ItemStream> result;
  155. if (_is_cluster) {
  156. _cluster->xread(key, lastID, std::chrono::seconds(1), 0, std::inserter(result, result.end()));
  157. }
  158. else {
  159. _redis->xread(key, lastID, std::chrono::seconds(1), 0, std::inserter(result, result.end()));
  160. }
  161. if (! result.empty()) {
  162. for (auto element : result) {
  163. for (auto rec : element.second) {
  164. std::string id = rec.first;
  165. auto attrs = rec.second;
  166. for (auto a : attrs) {
  167. try {
  168. tmp = nlohmann::json::parse(a.second);
  169. nlohmann::json& ov = tmp["old_val"];
  170. nlohmann::json& nv = tmp["new_val"];
  171. nlohmann::json oldConfig, newConfig;
  172. if (ov.is_object())
  173. oldConfig = ov;
  174. if (nv.is_object())
  175. newConfig = nv;
  176. if (oldConfig.is_object() || newConfig.is_object()) {
  177. _db->_memberChanged(oldConfig, newConfig, true);
  178. }
  179. }
  180. catch (const nlohmann::json::parse_error& e) {
  181. fprintf(stderr, "JSON parse error: %s\n", e.what());
  182. }
  183. catch (const std::exception& e) {
  184. fprintf(stderr, "Exception in Redis member listener: %s\n", e.what());
  185. }
  186. }
  187. if (_is_cluster) {
  188. _cluster->xdel(key, id);
  189. }
  190. else {
  191. _redis->xdel(key, id);
  192. }
  193. lastID = id;
  194. }
  195. Metrics::redis_mem_notification++;
  196. }
  197. }
  198. }
  199. catch (sw::redis::Error& e) {
  200. fprintf(stderr, "Error in Redis member listener: %s\n", e.what());
  201. }
  202. catch (const std::exception& e) {
  203. fprintf(stderr, "Exception in Redis member listener: %s\n", e.what());
  204. }
  205. }
  206. }
  207. bool RedisMemberListener::onNotification(const std::string& payload)
  208. {
  209. return true;
  210. }
  211. } // namespace ZeroTier
  212. #endif // ZT_CONTROLLER_USE_LIBPQ