RedisStatusWriter.cpp 3.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122
  1. #include "RedisStatusWriter.hpp"
  2. #include "../node/Metrics.hpp"
  3. #include "../osdep/OSUtils.hpp"
  4. #include <nlohmann/json.hpp>
  5. #include <set>
  6. namespace ZeroTier {
  7. RedisStatusWriter::RedisStatusWriter(std::shared_ptr<sw::redis::Redis> redis, std::string controller_id)
  8. : _redis(redis)
  9. , _mode(REDIS_MODE_STANDALONE)
  10. {
  11. }
  12. RedisStatusWriter::RedisStatusWriter(std::shared_ptr<sw::redis::RedisCluster> cluster, std::string controller_id)
  13. : _cluster(cluster)
  14. , _mode(REDIS_MODE_CLUSTER)
  15. {
  16. }
  17. RedisStatusWriter::~RedisStatusWriter()
  18. {
  19. writePending();
  20. }
  21. void RedisStatusWriter::updateNodeStatus(
  22. const std::string& network_id,
  23. const std::string& node_id,
  24. const std::string& os,
  25. const std::string& arch,
  26. const std::string& version,
  27. const InetAddress& address,
  28. int64_t last_seen)
  29. {
  30. std::lock_guard<std::mutex> l(_lock);
  31. _pending.push_back({ network_id, node_id, os, arch, version, address, last_seen });
  32. }
  33. size_t RedisStatusWriter::queueLength() const
  34. {
  35. std::lock_guard<std::mutex> l(_lock);
  36. return _pending.size();
  37. }
  38. void RedisStatusWriter::writePending()
  39. {
  40. try {
  41. if (_mode == REDIS_MODE_STANDALONE) {
  42. auto tx = _redis->transaction(true, false);
  43. _doWritePending(tx);
  44. }
  45. else if (_mode == REDIS_MODE_CLUSTER) {
  46. auto tx = _cluster->transaction(_controller_id, true, false);
  47. _doWritePending(tx);
  48. }
  49. }
  50. catch (const sw::redis::Error& e) {
  51. // Log the error
  52. fprintf(stderr, "Error writing to Redis: %s\n", e.what());
  53. }
  54. }
  55. void RedisStatusWriter::_doWritePending(sw::redis::Transaction& tx)
  56. {
  57. std::vector<PendingStatusEntry> toWrite;
  58. {
  59. std::lock_guard<std::mutex> l(_lock);
  60. toWrite.swap(_pending);
  61. }
  62. if (toWrite.empty()) {
  63. return;
  64. }
  65. std::set<std::string> networksUpdated;
  66. uint64_t updateCount = 0;
  67. for (const auto& entry : _pending) {
  68. char iptmp[64] = { 0 };
  69. std::string ipAddr = entry.address.toIpString(iptmp);
  70. std::unordered_map<std::string, std::string> record = {
  71. { "id", entry.node_id }, { "address", ipAddr }, { "last_updated", std::to_string(entry.last_seen) },
  72. { "os", entry.os }, { "arch", entry.arch }, { "version", entry.version }
  73. };
  74. tx.zadd("nodes-online:{" + _controller_id + "}", entry.node_id, entry.last_seen)
  75. .zadd("nodes-online2:{" + _controller_id + "}", entry.network_id + "-" + entry.node_id, entry.last_seen)
  76. .zadd("network-nodes-online:{" + _controller_id + "}:" + entry.network_id, entry.node_id, entry.last_seen)
  77. .zadd("active-networks:{" + _controller_id + "}", entry.network_id, entry.last_seen)
  78. .sadd("network-nodes-all:{" + _controller_id + "}:" + entry.network_id, entry.node_id)
  79. .hmset(
  80. "member:{" + _controller_id + "}:" + entry.network_id + ":" + entry.node_id, record.begin(),
  81. record.end());
  82. networksUpdated.insert(entry.network_id);
  83. ++updateCount;
  84. Metrics::redis_node_checkin++;
  85. }
  86. // expire records from all-nodes and network-nodes member list
  87. uint64_t expireOld = OSUtils::now() - 300000;
  88. tx.zremrangebyscore(
  89. "nodes-online:{" + _controller_id + "}",
  90. sw::redis::RightBoundedInterval<double>(expireOld, sw::redis::BoundType::LEFT_OPEN));
  91. tx.zremrangebyscore(
  92. "nodes-online2:{" + _controller_id + "}",
  93. sw::redis::RightBoundedInterval<double>(expireOld, sw::redis::BoundType::LEFT_OPEN));
  94. tx.zremrangebyscore(
  95. "active-networks:{" + _controller_id + "}",
  96. sw::redis::RightBoundedInterval<double>(expireOld, sw::redis::BoundType::LEFT_OPEN));
  97. for (const auto& nwid : networksUpdated) {
  98. tx.zremrangebyscore(
  99. "network-nodes-online:{" + _controller_id + "}:" + nwid,
  100. sw::redis::RightBoundedInterval<double>(expireOld, sw::redis::BoundType::LEFT_OPEN));
  101. }
  102. fprintf(stderr, "%s: Updated online status of %d members\n", _controller_id.c_str(), updateCount);
  103. tx.exec();
  104. }
  105. } // namespace ZeroTier