PostgresStatusWriter.cpp 2.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899
  #include "PostgresStatusWriter.hpp"
  #include "../node/Metrics.hpp"
  #include <cstdio>
  #include <utility>
  #include <nlohmann/json.hpp>
  #include <pqxx/pqxx>
  5. namespace ZeroTier {
  6. PostgresStatusWriter::PostgresStatusWriter(std::shared_ptr<ConnectionPool<PostgresConnection> > pool) : _pool(pool)
  7. {
  8. }
  9. PostgresStatusWriter::~PostgresStatusWriter()
  10. {
  11. writePending();
  12. }
  13. void PostgresStatusWriter::updateNodeStatus(
  14. const std::string& network_id,
  15. const std::string& node_id,
  16. const std::string& os,
  17. const std::string& arch,
  18. const std::string& version,
  19. const InetAddress& address,
  20. int64_t last_seen)
  21. {
  22. std::lock_guard<std::mutex> l(_lock);
  23. _pending.push_back({ network_id, node_id, os, arch, version, address, last_seen });
  24. }
  25. size_t PostgresStatusWriter::queueLength() const
  26. {
  27. std::lock_guard<std::mutex> l(_lock);
  28. return _pending.size();
  29. }
  30. void PostgresStatusWriter::writePending()
  31. {
  32. std::vector<PendingStatusEntry> toWrite;
  33. {
  34. std::lock_guard<std::mutex> l(_lock);
  35. toWrite.swap(_pending);
  36. }
  37. if (toWrite.empty()) {
  38. return;
  39. }
  40. try {
  41. auto conn = _pool->borrow();
  42. pqxx::work w(*conn->c);
  43. pqxx::pipeline pipe(w);
  44. for (const auto& entry : toWrite) {
  45. char iptmp[64] = { 0 };
  46. nlohmann::json record = {
  47. { entry.address.toIpString(iptmp), entry.last_seen },
  48. };
  49. try {
  50. // check if the member exists first.
  51. //
  52. // exec_params1 will throw pqxx::unexpected_rows if not exactly one row is returned. If that's the
  53. // case, skip this record and move on.
  54. w.exec_params1(
  55. "SELECT device_id, network_id FROM network_memberships_ctl WHERE network_id = $1 AND device_id = "
  56. "$2",
  57. entry.network_id, entry.node_id);
  58. }
  59. catch (pqxx::unexpected_rows& e) {
  60. continue;
  61. }
  62. std::string insert_statement =
  63. "INSERT INTO network_memberships_ctl (device_id, network_id, last_seen, os, arch) "
  64. "VALUES ('"
  65. + w.esc(entry.node_id) + "', '" + w.esc(entry.network_id) + "', '" + w.esc(record.dump())
  66. + "'::JSONB, "
  67. "'"
  68. + w.esc(entry.os) + "', '" + w.esc(entry.arch)
  69. + "') "
  70. "ON CONFLICT (device_id, network_id) DO UPDATE SET os = EXCLUDED.os, arch = EXCLUDED.arch, "
  71. "last_seen = network_memberships_ctl.last_seen || EXCLUDED.last_seen";
  72. pipe.insert(insert_statement);
  73. Metrics::pgsql_node_checkin++;
  74. }
  75. pipe.complete();
  76. w.commit();
  77. _pool->unborrow(conn);
  78. }
  79. catch (const std::exception& e) {
  80. // Log the error
  81. fprintf(stderr, "Error writing to Postgres: %s\n", e.what());
  82. }
  83. }
  84. } // namespace ZeroTier