BigTableStatusWriter.cpp 2.8 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798
  1. #include "BigTableStatusWriter.hpp"
  2. #include <google/cloud/bigtable/mutations.h>
  3. #include <google/cloud/bigtable/row.h>
  4. #include <google/cloud/bigtable/table.h>
  5. namespace cbt = google::cloud::bigtable;
  6. namespace ZeroTier {
  // Column family holding the node description cells (os, arch, version).
  const std::string nodeInfoColumnFamily{ "node_info" };
  // Column family holding the check-in cells (ipv4 / ipv6 address, last_seen).
  const std::string checkInColumnFamily{ "check_in" };

  // Column qualifiers written under nodeInfoColumnFamily.
  const std::string osColumn{ "os" };
  const std::string archColumn{ "arch" };
  const std::string versionColumn{ "version" };

  // Column qualifiers written under checkInColumnFamily.
  const std::string ipv4Column{ "ipv4" };
  const std::string ipv6Column{ "ipv6" };
  const std::string lastSeenColumn{ "last_seen" };
  15. BigTableStatusWriter::BigTableStatusWriter(
  16. const std::string& project_id,
  17. const std::string& instance_id,
  18. const std::string& table_id)
  19. : _project_id(project_id)
  20. , _instance_id(instance_id)
  21. , _table_id(table_id)
  22. {
  23. }
  24. BigTableStatusWriter::~BigTableStatusWriter()
  25. {
  26. writePending();
  27. }
  28. void BigTableStatusWriter::updateNodeStatus(
  29. const std::string& network_id,
  30. const std::string& node_id,
  31. const std::string& os,
  32. const std::string& arch,
  33. const std::string& version,
  34. const InetAddress& address,
  35. int64_t last_seen)
  36. {
  37. std::lock_guard<std::mutex> l(_lock);
  38. _pending.push_back({ network_id, node_id, os, arch, version, address, last_seen });
  39. if (_pending.size() >= 100) {
  40. writePending();
  41. }
  42. }
  43. size_t BigTableStatusWriter::queueLength() const
  44. {
  45. std::lock_guard<std::mutex> l(_lock);
  46. return _pending.size();
  47. }
  48. void BigTableStatusWriter::writePending()
  49. {
  50. std::vector<PendingStatusEntry> toWrite;
  51. {
  52. std::lock_guard<std::mutex> l(_lock);
  53. toWrite.swap(_pending);
  54. }
  55. if (toWrite.empty()) {
  56. return;
  57. }
  58. namespace cbt = google::cloud::bigtable;
  59. cbt::Table table(cbt::MakeDataConnection(), cbt::TableResource(_project_id, _instance_id, _table_id));
  60. cbt::BulkMutation bulk;
  61. for (const auto& entry : toWrite) {
  62. std::string row_key = entry.network_id + "#" + entry.node_id;
  63. cbt::SingleRowMutation m(row_key);
  64. m.emplace_back(cbt::SetCell(nodeInfoColumnFamily, osColumn, entry.os));
  65. m.emplace_back(cbt::SetCell(nodeInfoColumnFamily, archColumn, entry.arch));
  66. m.emplace_back(cbt::SetCell(nodeInfoColumnFamily, versionColumn, entry.version));
  67. char buf[64] = { 0 };
  68. if (entry.address.ss_family == AF_INET) {
  69. m.emplace_back(cbt::SetCell(checkInColumnFamily, ipv4Column, entry.address.toString(buf)));
  70. }
  71. else if (entry.address.ss_family == AF_INET6) {
  72. m.emplace_back(cbt::SetCell(checkInColumnFamily, ipv6Column, entry.address.toString(buf)));
  73. }
  74. int64_t ts = entry.last_seen;
  75. m.emplace_back(cbt::SetCell(checkInColumnFamily, lastSeenColumn, std::move(ts)));
  76. bulk.push_back(std::move(m));
  77. }
  78. std::vector<cbt::FailedMutation> failures = table.BulkApply(bulk);
  79. for (auto const& r : failures) {
  80. // Handle error (log it, retry, etc.)
  81. std::cerr << "Error writing to BigTable: " << r.status() << "\n";
  82. }
  83. }
  84. } // namespace ZeroTier