// PostgresStatusWriter.cpp (2.0 KB)
  1. #include "PostgresStatusWriter.hpp"
  2. #include "../../node/Metrics.hpp"
  3. #include <nlohmann/json.hpp>
  4. #include <pqxx/pqxx>
  5. namespace ZeroTier {
  6. PostgresStatusWriter::PostgresStatusWriter(std::shared_ptr<ConnectionPool<PostgresConnection> > pool) : _pool(pool)
  7. {
  8. }
  9. PostgresStatusWriter::~PostgresStatusWriter()
  10. {
  11. writePending();
  12. }
  13. void PostgresStatusWriter::updateNodeStatus(
  14. const std::string& network_id,
  15. const std::string& node_id,
  16. const std::string& os,
  17. const std::string& arch,
  18. const std::string& version,
  19. const InetAddress& address,
  20. int64_t last_seen)
  21. {
  22. std::lock_guard<std::mutex> l(_lock);
  23. _pending.push_back({ network_id, node_id, os, arch, version, address, last_seen });
  24. }
  25. size_t PostgresStatusWriter::queueLength() const
  26. {
  27. std::lock_guard<std::mutex> l(_lock);
  28. return _pending.size();
  29. }
  30. void PostgresStatusWriter::writePending()
  31. {
  32. std::vector<PendingStatusEntry> toWrite;
  33. {
  34. std::lock_guard<std::mutex> l(_lock);
  35. toWrite.swap(_pending);
  36. }
  37. if (toWrite.empty()) {
  38. return;
  39. }
  40. try {
  41. auto conn = _pool->borrow();
  42. pqxx::work w(*conn->c);
  43. pqxx::pipeline pipe(w);
  44. for (const auto& entry : toWrite) {
  45. char iptmp[64] = { 0 };
  46. nlohmann::json record = {
  47. { entry.address.toIpString(iptmp), entry.last_seen },
  48. };
  49. std::string insert_statement =
  50. "INSERT INTO network_memberships_ctl (device_id, network_id, last_seen, os, arch) "
  51. "VALUES ('"
  52. + w.esc(entry.node_id) + "', '" + w.esc(entry.network_id) + "', '" + w.esc(record.dump())
  53. + "'::JSONB, "
  54. "'"
  55. + w.esc(entry.os) + "', '" + w.esc(entry.arch)
  56. + "') "
  57. "ON CONFLICT (device_id, network_id) DO UPDATE SET os = EXCLUDED.os, arch = EXCLUDED.arch, "
  58. "last_seen = network_memberships_ctl.last_seen || EXCLUDED.last_seen";
  59. pipe.insert(insert_statement);
  60. Metrics::pgsql_node_checkin++;
  61. }
  62. pipe.complete();
  63. w.commit();
  64. _pool->unborrow(conn);
  65. }
  66. catch (const std::exception& e) {
  67. // Log the error
  68. fprintf(stderr, "Error writing to Postgres: %s\n", e.what());
  69. }
  70. }
  71. } // namespace ZeroTier