PostgresStatusWriter.cpp 2.0 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586
  1. #include "PostgresStatusWriter.hpp"
  2. #include "../../node/Metrics.hpp"
  3. #include <nlohmann/json.hpp>
  4. #include <pqxx/pqxx>
  5. namespace ZeroTier {
  6. PostgresStatusWriter::PostgresStatusWriter(std::shared_ptr<ConnectionPool<PostgresConnection> > pool) : _pool(pool)
  7. {
  8. }
  9. PostgresStatusWriter::~PostgresStatusWriter()
  10. {
  11. writePending();
  12. }
  13. void PostgresStatusWriter::updateNodeStatus(
  14. const std::string& network_id,
  15. const std::string& node_id,
  16. const std::string& os,
  17. const std::string& arch,
  18. const std::string& version,
  19. const InetAddress& address,
  20. int64_t last_seen,
  21. const std::string& /* frontend unused */)
  22. {
  23. std::lock_guard<std::mutex> l(_lock);
  24. _pending.push_back({ network_id, node_id, os, arch, version, address, last_seen, "" });
  25. }
  26. size_t PostgresStatusWriter::queueLength() const
  27. {
  28. std::lock_guard<std::mutex> l(_lock);
  29. return _pending.size();
  30. }
  31. void PostgresStatusWriter::writePending()
  32. {
  33. std::vector<PendingStatusEntry> toWrite;
  34. {
  35. std::lock_guard<std::mutex> l(_lock);
  36. toWrite.swap(_pending);
  37. }
  38. if (toWrite.empty()) {
  39. return;
  40. }
  41. try {
  42. auto conn = _pool->borrow();
  43. pqxx::work w(*conn->c);
  44. pqxx::pipeline pipe(w);
  45. for (const auto& entry : toWrite) {
  46. char iptmp[64] = { 0 };
  47. nlohmann::json record = {
  48. { entry.address.toIpString(iptmp), entry.last_seen },
  49. };
  50. std::string insert_statement =
  51. "INSERT INTO network_memberships_ctl (device_id, network_id, last_seen, os, arch) "
  52. "VALUES ('"
  53. + w.esc(entry.node_id) + "', '" + w.esc(entry.network_id) + "', '" + w.esc(record.dump())
  54. + "'::JSONB, "
  55. "'"
  56. + w.esc(entry.os) + "', '" + w.esc(entry.arch)
  57. + "') "
  58. "ON CONFLICT (device_id, network_id) DO UPDATE SET os = EXCLUDED.os, arch = EXCLUDED.arch, "
  59. "last_seen = network_memberships_ctl.last_seen || EXCLUDED.last_seen";
  60. pipe.insert(insert_statement);
  61. Metrics::pgsql_node_checkin++;
  62. }
  63. pipe.complete();
  64. w.commit();
  65. _pool->unborrow(conn);
  66. }
  67. catch (const std::exception& e) {
  68. // Log the error
  69. fprintf(stderr, "Error writing to Postgres: %s\n", e.what());
  70. }
  71. }
  72. } // namespace ZeroTier