123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372 |
- /*
- * Copyright (c)2019 ZeroTier, Inc.
- *
- * Use of this software is governed by the Business Source License included
- * in the LICENSE.TXT file in the project's root directory.
- *
- * Change Date: 2026-01-01
- *
- * On the date above, in accordance with the Business Source License, use
- * of this software will be governed by version 2.0 of the Apache License.
- */
- /****/
- #include "DBMirrorSet.hpp"
- #include "opentelemetry/trace/provider.h"
- namespace ZeroTier {
- DBMirrorSet::DBMirrorSet(DB::ChangeListener* listener) : _listener(listener), _running(true), _syncCheckerThread(), _dbs(), _dbs_l()
- {
- _syncCheckerThread = std::thread([this]() {
- for (;;) {
- for (int i = 0; i < 120; ++i) { // 1 minute delay between checks
- if (! _running)
- return;
- std::this_thread::sleep_for(std::chrono::milliseconds(500));
- }
- auto provider = opentelemetry::trace::Provider::GetTracerProvider();
- auto tracer = provider->GetTracer("db_mirror_set");
- auto span = tracer->StartSpan("db::syncChecker");
- auto scope = tracer->WithActiveSpan(span);
- std::vector<std::shared_ptr<DB> > dbs;
- {
- std::unique_lock<std::shared_mutex> l(_dbs_l);
- if (_dbs.size() <= 1)
- continue; // no need to do this if there's only one DB, so skip the iteration
- dbs = _dbs;
- }
- for (auto db = dbs.begin(); db != dbs.end(); ++db) {
- (*db)->each([&dbs, &db](uint64_t networkId, const nlohmann::json& network, uint64_t memberId, const nlohmann::json& member) {
- try {
- if (network.is_object()) {
- if (memberId == 0) {
- for (auto db2 = dbs.begin(); db2 != dbs.end(); ++db2) {
- if (db->get() != db2->get()) {
- nlohmann::json nw2;
- if ((! (*db2)->get(networkId, nw2)) || ((nw2.is_object()) && (OSUtils::jsonInt(nw2["revision"], 0) < OSUtils::jsonInt(network["revision"], 0)))) {
- nw2 = network;
- (*db2)->save(nw2, false);
- }
- }
- }
- }
- else if (member.is_object()) {
- for (auto db2 = dbs.begin(); db2 != dbs.end(); ++db2) {
- if (db->get() != db2->get()) {
- nlohmann::json nw2, m2;
- if ((! (*db2)->get(networkId, nw2, memberId, m2)) || ((m2.is_object()) && (OSUtils::jsonInt(m2["revision"], 0) < OSUtils::jsonInt(member["revision"], 0)))) {
- m2 = member;
- (*db2)->save(m2, false);
- }
- }
- }
- }
- }
- }
- catch (...) {
- } // skip entries that generate JSON errors
- });
- }
- }
- });
- }
- DBMirrorSet::~DBMirrorSet()
- {
- _running = false;
- _syncCheckerThread.join();
- }
- bool DBMirrorSet::hasNetwork(const uint64_t networkId) const
- {
- auto provider = opentelemetry::trace::Provider::GetTracerProvider();
- auto tracer = provider->GetTracer("db_mirror_set");
- auto span = tracer->StartSpan("db_mirror_set::hasNetwork");
- auto scope = tracer->WithActiveSpan(span);
- std::shared_lock<std::shared_mutex> l(_dbs_l);
- for (auto d = _dbs.begin(); d != _dbs.end(); ++d) {
- if ((*d)->hasNetwork(networkId))
- return true;
- }
- return false;
- }
- bool DBMirrorSet::get(const uint64_t networkId, nlohmann::json& network)
- {
- auto provider = opentelemetry::trace::Provider::GetTracerProvider();
- auto tracer = provider->GetTracer("db_mirror_set");
- auto span = tracer->StartSpan("db_mirror_set::getNetwork");
- auto scope = tracer->WithActiveSpan(span);
- char networkIdStr[17];
- span->SetAttribute("network_id", Utils::hex(networkId, networkIdStr));
- std::shared_lock<std::shared_mutex> l(_dbs_l);
- for (auto d = _dbs.begin(); d != _dbs.end(); ++d) {
- if ((*d)->get(networkId, network)) {
- return true;
- }
- }
- return false;
- }
- bool DBMirrorSet::get(const uint64_t networkId, nlohmann::json& network, const uint64_t memberId, nlohmann::json& member)
- {
- auto provider = opentelemetry::trace::Provider::GetTracerProvider();
- auto tracer = provider->GetTracer("db_mirror_set");
- auto span = tracer->StartSpan("db_mirror_set::getNetworkAndMember");
- auto scope = tracer->WithActiveSpan(span);
- char networkIdStr[17];
- char memberIdStr[11];
- span->SetAttribute("network_id", Utils::hex(networkId, networkIdStr));
- span->SetAttribute("member_id", Utils::hex10(memberId, memberIdStr));
- std::shared_lock<std::shared_mutex> l(_dbs_l);
- for (auto d = _dbs.begin(); d != _dbs.end(); ++d) {
- if ((*d)->get(networkId, network, memberId, member))
- return true;
- }
- return false;
- }
- bool DBMirrorSet::get(const uint64_t networkId, nlohmann::json& network, const uint64_t memberId, nlohmann::json& member, DB::NetworkSummaryInfo& info)
- {
- auto provider = opentelemetry::trace::Provider::GetTracerProvider();
- auto tracer = provider->GetTracer("db_mirror_set");
- auto span = tracer->StartSpan("db_mirror_set::getNetworkAndMemberWithSummary");
- auto scope = tracer->WithActiveSpan(span);
- char networkIdStr[17];
- char memberIdStr[11];
- span->SetAttribute("network_id", Utils::hex(networkId, networkIdStr));
- span->SetAttribute("member_id", Utils::hex10(memberId, memberIdStr));
- std::shared_lock<std::shared_mutex> l(_dbs_l);
- for (auto d = _dbs.begin(); d != _dbs.end(); ++d) {
- if ((*d)->get(networkId, network, memberId, member, info))
- return true;
- }
- return false;
- }
- bool DBMirrorSet::get(const uint64_t networkId, nlohmann::json& network, std::vector<nlohmann::json>& members)
- {
- auto provider = opentelemetry::trace::Provider::GetTracerProvider();
- auto tracer = provider->GetTracer("db_mirror_set");
- auto span = tracer->StartSpan("db_mirror_set::getNetworkAndMembers");
- auto scope = tracer->WithActiveSpan(span);
- char networkIdStr[17];
- span->SetAttribute("network_id", Utils::hex(networkId, networkIdStr));
- std::shared_lock<std::shared_mutex> l(_dbs_l);
- for (auto d = _dbs.begin(); d != _dbs.end(); ++d) {
- if ((*d)->get(networkId, network, members))
- return true;
- }
- return false;
- }
- AuthInfo DBMirrorSet::getSSOAuthInfo(const nlohmann::json& member, const std::string& redirectURL)
- {
- auto provider = opentelemetry::trace::Provider::GetTracerProvider();
- auto tracer = provider->GetTracer("db_mirror_set");
- auto span = tracer->StartSpan("db_mirror_set::getSSOAuthInfo");
- auto scope = tracer->WithActiveSpan(span);
- std::shared_lock<std::shared_mutex> l(_dbs_l);
- for (auto d = _dbs.begin(); d != _dbs.end(); ++d) {
- AuthInfo info = (*d)->getSSOAuthInfo(member, redirectURL);
- if (info.enabled) {
- return info;
- }
- }
- return AuthInfo();
- }
- void DBMirrorSet::networks(std::set<uint64_t>& networks)
- {
- auto provider = opentelemetry::trace::Provider::GetTracerProvider();
- auto tracer = provider->GetTracer("db_mirror_set");
- auto span = tracer->StartSpan("db_mirror_set::networks");
- auto scope = tracer->WithActiveSpan(span);
- std::shared_lock<std::shared_mutex> l(_dbs_l);
- for (auto d = _dbs.begin(); d != _dbs.end(); ++d) {
- (*d)->networks(networks);
- }
- }
- bool DBMirrorSet::waitForReady()
- {
- auto provider = opentelemetry::trace::Provider::GetTracerProvider();
- auto tracer = provider->GetTracer("db_mirror_set");
- auto span = tracer->StartSpan("db_mirror_set::waitForReady");
- auto scope = tracer->WithActiveSpan(span);
- bool r = false;
- std::shared_lock<std::shared_mutex> l(_dbs_l);
- for (auto d = _dbs.begin(); d != _dbs.end(); ++d) {
- r |= (*d)->waitForReady();
- }
- return r;
- }
- bool DBMirrorSet::isReady()
- {
- auto provider = opentelemetry::trace::Provider::GetTracerProvider();
- auto tracer = provider->GetTracer("db_mirror_set");
- auto span = tracer->StartSpan("db_mirror_set::isReady");
- auto scope = tracer->WithActiveSpan(span);
- std::shared_lock<std::shared_mutex> l(_dbs_l);
- for (auto d = _dbs.begin(); d != _dbs.end(); ++d) {
- if (! (*d)->isReady())
- return false;
- }
- return true;
- }
- bool DBMirrorSet::save(nlohmann::json& record, bool notifyListeners)
- {
- auto provider = opentelemetry::trace::Provider::GetTracerProvider();
- auto tracer = provider->GetTracer("db_mirror_set");
- auto span = tracer->StartSpan("db_mirror_set::save");
- auto scope = tracer->WithActiveSpan(span);
- std::vector<std::shared_ptr<DB> > dbs;
- {
- std::unique_lock<std::shared_mutex> l(_dbs_l);
- dbs = _dbs;
- }
- if (notifyListeners) {
- for (auto d = dbs.begin(); d != dbs.end(); ++d) {
- if ((*d)->save(record, true))
- return true;
- }
- return false;
- }
- else {
- bool modified = false;
- for (auto d = dbs.begin(); d != dbs.end(); ++d) {
- modified |= (*d)->save(record, false);
- }
- return modified;
- }
- }
- void DBMirrorSet::eraseNetwork(const uint64_t networkId)
- {
- auto provider = opentelemetry::trace::Provider::GetTracerProvider();
- auto tracer = provider->GetTracer("db_mirror_set");
- auto span = tracer->StartSpan("db_mirror_set::eraseNetwork");
- auto scope = tracer->WithActiveSpan(span);
- char networkIdStr[17];
- span->SetAttribute("network_id", Utils::hex(networkId, networkIdStr));
- std::unique_lock<std::shared_mutex> l(_dbs_l);
- for (auto d = _dbs.begin(); d != _dbs.end(); ++d) {
- (*d)->eraseNetwork(networkId);
- }
- }
- void DBMirrorSet::eraseMember(const uint64_t networkId, const uint64_t memberId)
- {
- auto provider = opentelemetry::trace::Provider::GetTracerProvider();
- auto tracer = provider->GetTracer("db_mirror_set");
- auto span = tracer->StartSpan("db_mirror_set::eraseMember");
- auto scope = tracer->WithActiveSpan(span);
- char networkIdStr[17];
- char memberIdStr[11];
- span->SetAttribute("network_id", Utils::hex(networkId, networkIdStr));
- span->SetAttribute("member_id", Utils::hex10(memberId, memberIdStr));
- std::unique_lock<std::shared_mutex> l(_dbs_l);
- for (auto d = _dbs.begin(); d != _dbs.end(); ++d) {
- (*d)->eraseMember(networkId, memberId);
- }
- }
- void DBMirrorSet::nodeIsOnline(const uint64_t networkId, const uint64_t memberId, const InetAddress& physicalAddress, const char* osArch)
- {
- auto provider = opentelemetry::trace::Provider::GetTracerProvider();
- auto tracer = provider->GetTracer("db_mirror_set");
- auto span = tracer->StartSpan("db_mirror_set::nodeIsOnline");
- auto scope = tracer->WithActiveSpan(span);
- char networkIdStr[17];
- char memberIdStr[11];
- char phyAddressStr[INET6_ADDRSTRLEN];
- span->SetAttribute("network_id", Utils::hex(networkId, networkIdStr));
- span->SetAttribute("member_id", Utils::hex10(memberId, memberIdStr));
- span->SetAttribute("physical_address", physicalAddress.toString(phyAddressStr));
- span->SetAttribute("os_arch", osArch);
- std::shared_lock<std::shared_mutex> l(_dbs_l);
- for (auto d = _dbs.begin(); d != _dbs.end(); ++d) {
- (*d)->nodeIsOnline(networkId, memberId, physicalAddress, osArch);
- }
- }
- void DBMirrorSet::nodeIsOnline(const uint64_t networkId, const uint64_t memberId, const InetAddress& physicalAddress)
- {
- this->nodeIsOnline(networkId, memberId, physicalAddress, "unknown/unknown");
- }
- void DBMirrorSet::onNetworkUpdate(const void* db, uint64_t networkId, const nlohmann::json& network)
- {
- auto provider = opentelemetry::trace::Provider::GetTracerProvider();
- auto tracer = provider->GetTracer("db_mirror_set");
- auto span = tracer->StartSpan("db_mirror_set::onNetworkUpdate");
- auto scope = tracer->WithActiveSpan(span);
- char networkIdStr[17];
- span->SetAttribute("network_id", Utils::hex(networkId, networkIdStr));
- nlohmann::json record(network);
- std::unique_lock<std::shared_mutex> l(_dbs_l);
- for (auto d = _dbs.begin(); d != _dbs.end(); ++d) {
- if (d->get() != db) {
- (*d)->save(record, false);
- }
- }
- _listener->onNetworkUpdate(this, networkId, network);
- }
- void DBMirrorSet::onNetworkMemberUpdate(const void* db, uint64_t networkId, uint64_t memberId, const nlohmann::json& member)
- {
- auto provider = opentelemetry::trace::Provider::GetTracerProvider();
- auto tracer = provider->GetTracer("db_mirror_set");
- auto span = tracer->StartSpan("db_mirror_set::onNetworkMemberUpdate");
- auto scope = tracer->WithActiveSpan(span);
- char networkIdStr[17];
- char memberIdStr[11];
- span->SetAttribute("network_id", Utils::hex(networkId, networkIdStr));
- span->SetAttribute("member_id", Utils::hex10(memberId, memberIdStr));
- nlohmann::json record(member);
- std::unique_lock<std::shared_mutex> l(_dbs_l);
- for (auto d = _dbs.begin(); d != _dbs.end(); ++d) {
- if (d->get() != db) {
- (*d)->save(record, false);
- }
- }
- _listener->onNetworkMemberUpdate(this, networkId, memberId, member);
- }
- void DBMirrorSet::onNetworkMemberDeauthorize(const void* db, uint64_t networkId, uint64_t memberId)
- {
- auto provider = opentelemetry::trace::Provider::GetTracerProvider();
- auto tracer = provider->GetTracer("db_mirror_set");
- auto span = tracer->StartSpan("db_mirror_set::onNetworkMemberDeauthorize");
- auto scope = tracer->WithActiveSpan(span);
- char networkIdStr[17];
- char memberIdStr[11];
- span->SetAttribute("network_id", Utils::hex(networkId, networkIdStr));
- span->SetAttribute("member_id", Utils::hex10(memberId, memberIdStr));
- _listener->onNetworkMemberDeauthorize(this, networkId, memberId);
- }
- } // namespace ZeroTier
|