RedisStatusWriter.cpp 3.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123
  1. #include "RedisStatusWriter.hpp"
  2. #include "../../node/Metrics.hpp"
  3. #include "../../osdep/OSUtils.hpp"
  4. #include <nlohmann/json.hpp>
  5. #include <set>
  6. namespace ZeroTier {
  7. RedisStatusWriter::RedisStatusWriter(std::shared_ptr<sw::redis::Redis> redis, std::string controller_id)
  8. : _redis(redis)
  9. , _mode(REDIS_MODE_STANDALONE)
  10. {
  11. }
  12. RedisStatusWriter::RedisStatusWriter(std::shared_ptr<sw::redis::RedisCluster> cluster, std::string controller_id)
  13. : _cluster(cluster)
  14. , _mode(REDIS_MODE_CLUSTER)
  15. {
  16. }
  17. RedisStatusWriter::~RedisStatusWriter()
  18. {
  19. writePending();
  20. }
  21. void RedisStatusWriter::updateNodeStatus(
  22. const std::string& network_id,
  23. const std::string& node_id,
  24. const std::string& os,
  25. const std::string& arch,
  26. const std::string& version,
  27. const InetAddress& address,
  28. int64_t last_seen,
  29. const std::string& /* frontend unused */)
  30. {
  31. std::lock_guard<std::mutex> l(_lock);
  32. _pending.push_back({ network_id, node_id, os, arch, version, address, last_seen, "" });
  33. }
  34. size_t RedisStatusWriter::queueLength() const
  35. {
  36. std::lock_guard<std::mutex> l(_lock);
  37. return _pending.size();
  38. }
  39. void RedisStatusWriter::writePending()
  40. {
  41. try {
  42. if (_mode == REDIS_MODE_STANDALONE) {
  43. auto tx = _redis->transaction(true, false);
  44. _doWritePending(tx);
  45. }
  46. else if (_mode == REDIS_MODE_CLUSTER) {
  47. auto tx = _cluster->transaction(_controller_id, true, false);
  48. _doWritePending(tx);
  49. }
  50. }
  51. catch (const sw::redis::Error& e) {
  52. // Log the error
  53. fprintf(stderr, "Error writing to Redis: %s\n", e.what());
  54. }
  55. }
  56. void RedisStatusWriter::_doWritePending(sw::redis::Transaction& tx)
  57. {
  58. std::vector<PendingStatusEntry> toWrite;
  59. {
  60. std::lock_guard<std::mutex> l(_lock);
  61. toWrite.swap(_pending);
  62. }
  63. if (toWrite.empty()) {
  64. return;
  65. }
  66. std::set<std::string> networksUpdated;
  67. uint64_t updateCount = 0;
  68. for (const auto& entry : _pending) {
  69. char iptmp[64] = { 0 };
  70. std::string ipAddr = entry.address.toIpString(iptmp);
  71. std::unordered_map<std::string, std::string> record = {
  72. { "id", entry.node_id }, { "address", ipAddr }, { "last_updated", std::to_string(entry.last_seen) },
  73. { "os", entry.os }, { "arch", entry.arch }, { "version", entry.version }
  74. };
  75. tx.zadd("nodes-online:{" + _controller_id + "}", entry.node_id, entry.last_seen)
  76. .zadd("nodes-online2:{" + _controller_id + "}", entry.network_id + "-" + entry.node_id, entry.last_seen)
  77. .zadd("network-nodes-online:{" + _controller_id + "}:" + entry.network_id, entry.node_id, entry.last_seen)
  78. .zadd("active-networks:{" + _controller_id + "}", entry.network_id, entry.last_seen)
  79. .sadd("network-nodes-all:{" + _controller_id + "}:" + entry.network_id, entry.node_id)
  80. .hmset(
  81. "member:{" + _controller_id + "}:" + entry.network_id + ":" + entry.node_id, record.begin(),
  82. record.end());
  83. networksUpdated.insert(entry.network_id);
  84. ++updateCount;
  85. Metrics::redis_node_checkin++;
  86. }
  87. // expire records from all-nodes and network-nodes member list
  88. uint64_t expireOld = OSUtils::now() - 300000;
  89. tx.zremrangebyscore(
  90. "nodes-online:{" + _controller_id + "}",
  91. sw::redis::RightBoundedInterval<double>(expireOld, sw::redis::BoundType::LEFT_OPEN));
  92. tx.zremrangebyscore(
  93. "nodes-online2:{" + _controller_id + "}",
  94. sw::redis::RightBoundedInterval<double>(expireOld, sw::redis::BoundType::LEFT_OPEN));
  95. tx.zremrangebyscore(
  96. "active-networks:{" + _controller_id + "}",
  97. sw::redis::RightBoundedInterval<double>(expireOld, sw::redis::BoundType::LEFT_OPEN));
  98. for (const auto& nwid : networksUpdated) {
  99. tx.zremrangebyscore(
  100. "network-nodes-online:{" + _controller_id + "}:" + nwid,
  101. sw::redis::RightBoundedInterval<double>(expireOld, sw::redis::BoundType::LEFT_OPEN));
  102. }
  103. fprintf(stderr, "%s: Updated online status of %d members\n", _controller_id.c_str(), updateCount);
  104. tx.exec();
  105. }
  106. } // namespace ZeroTier