BigTableStatusWriter.cpp 2.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100
  1. #include "BigTableStatusWriter.hpp"
  2. #include "ControllerConfig.hpp"
  3. #include <google/cloud/bigtable/mutations.h>
  4. #include <google/cloud/bigtable/row.h>
  5. #include <google/cloud/bigtable/table.h>
  6. namespace cbt = google::cloud::bigtable;
  7. namespace ZeroTier {
  8. const std::string nodeInfoColumnFamily = "node_info";
  9. const std::string checkInColumnFamily = "check_in";
  10. const std::string osColumn = "os";
  11. const std::string archColumn = "arch";
  12. const std::string versionColumn = "version";
  13. const std::string ipv4Column = "ipv4";
  14. const std::string ipv6Column = "ipv6";
  15. const std::string lastSeenColumn = "last_seen";
  16. BigTableStatusWriter::BigTableStatusWriter(
  17. const std::string& project_id,
  18. const std::string& instance_id,
  19. const std::string& table_id)
  20. : _project_id(project_id)
  21. , _instance_id(instance_id)
  22. , _table_id(table_id)
  23. {
  24. }
  25. BigTableStatusWriter::~BigTableStatusWriter()
  26. {
  27. writePending();
  28. }
  29. void BigTableStatusWriter::updateNodeStatus(
  30. const std::string& network_id,
  31. const std::string& node_id,
  32. const std::string& os,
  33. const std::string& arch,
  34. const std::string& version,
  35. const InetAddress& address,
  36. int64_t last_seen)
  37. {
  38. std::lock_guard<std::mutex> l(_lock);
  39. _pending.push_back({ network_id, node_id, os, arch, version, address, last_seen });
  40. if (_pending.size() >= 100) {
  41. writePending();
  42. }
  43. }
  44. size_t BigTableStatusWriter::queueLength() const
  45. {
  46. std::lock_guard<std::mutex> l(_lock);
  47. return _pending.size();
  48. }
  49. void BigTableStatusWriter::writePending()
  50. {
  51. std::vector<PendingStatusEntry> toWrite;
  52. {
  53. std::lock_guard<std::mutex> l(_lock);
  54. toWrite.swap(_pending);
  55. }
  56. if (toWrite.empty()) {
  57. return;
  58. }
  59. namespace cbt = google::cloud::bigtable;
  60. cbt::Table table(cbt::MakeDataConnection(), cbt::TableResource(_project_id, _instance_id, _table_id));
  61. cbt::BulkMutation bulk;
  62. for (const auto& entry : toWrite) {
  63. std::string row_key = entry.network_id + "#" + entry.node_id;
  64. cbt::SingleRowMutation m(row_key);
  65. m.emplace_back(cbt::SetCell(nodeInfoColumnFamily, osColumn, entry.os));
  66. m.emplace_back(cbt::SetCell(nodeInfoColumnFamily, archColumn, entry.arch));
  67. m.emplace_back(cbt::SetCell(nodeInfoColumnFamily, versionColumn, entry.version));
  68. char buf[64] = { 0 };
  69. if (entry.address.ss_family == AF_INET) {
  70. m.emplace_back(cbt::SetCell(checkInColumnFamily, ipv4Column, entry.address.toString(buf)));
  71. }
  72. else if (entry.address.ss_family == AF_INET6) {
  73. m.emplace_back(cbt::SetCell(checkInColumnFamily, ipv6Column, entry.address.toString(buf)));
  74. }
  75. int64_t ts = entry.last_seen;
  76. m.emplace_back(cbt::SetCell(checkInColumnFamily, lastSeenColumn, std::move(ts)));
  77. bulk.push_back(std::move(m));
  78. }
  79. std::vector<cbt::FailedMutation> failures = table.BulkApply(bulk);
  80. for (auto const& r : failures) {
  81. // Handle error (log it, retry, etc.)
  82. std::cerr << "Error writing to BigTable: " << r.status() << "\n";
  83. }
  84. }
  85. } // namespace ZeroTier