| 1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586 |
- #include "PostgresStatusWriter.hpp"
- #include "../../node/Metrics.hpp"
- #include <nlohmann/json.hpp>
- #include <pqxx/pqxx>
- namespace ZeroTier {
- PostgresStatusWriter::PostgresStatusWriter(std::shared_ptr<ConnectionPool<PostgresConnection> > pool) : _pool(pool)
- {
- }
- PostgresStatusWriter::~PostgresStatusWriter()
- {
- writePending();
- }
- void PostgresStatusWriter::updateNodeStatus(
- const std::string& network_id,
- const std::string& node_id,
- const std::string& os,
- const std::string& arch,
- const std::string& version,
- const InetAddress& address,
- int64_t last_seen,
- const std::string& /* frontend unused */)
- {
- std::lock_guard<std::mutex> l(_lock);
- _pending.push_back({ network_id, node_id, os, arch, version, address, last_seen, "" });
- }
- size_t PostgresStatusWriter::queueLength() const
- {
- std::lock_guard<std::mutex> l(_lock);
- return _pending.size();
- }
- void PostgresStatusWriter::writePending()
- {
- std::vector<PendingStatusEntry> toWrite;
- {
- std::lock_guard<std::mutex> l(_lock);
- toWrite.swap(_pending);
- }
- if (toWrite.empty()) {
- return;
- }
- try {
- auto conn = _pool->borrow();
- pqxx::work w(*conn->c);
- pqxx::pipeline pipe(w);
- for (const auto& entry : toWrite) {
- char iptmp[64] = { 0 };
- nlohmann::json record = {
- { entry.address.toIpString(iptmp), entry.last_seen },
- };
- std::string insert_statement =
- "INSERT INTO network_memberships_ctl (device_id, network_id, last_seen, os, arch) "
- "VALUES ('"
- + w.esc(entry.node_id) + "', '" + w.esc(entry.network_id) + "', '" + w.esc(record.dump())
- + "'::JSONB, "
- "'"
- + w.esc(entry.os) + "', '" + w.esc(entry.arch)
- + "') "
- "ON CONFLICT (device_id, network_id) DO UPDATE SET os = EXCLUDED.os, arch = EXCLUDED.arch, "
- "last_seen = network_memberships_ctl.last_seen || EXCLUDED.last_seen";
- pipe.insert(insert_statement);
- Metrics::pgsql_node_checkin++;
- }
- pipe.complete();
- w.commit();
- _pool->unborrow(conn);
- }
- catch (const std::exception& e) {
- // Log the error
- fprintf(stderr, "Error writing to Postgres: %s\n", e.what());
- }
- }
- } // namespace ZeroTier
|