DBMirrorSet.cpp 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372
  1. /*
  2. * Copyright (c)2019 ZeroTier, Inc.
  3. *
  4. * Use of this software is governed by the Business Source License included
  5. * in the LICENSE.TXT file in the project's root directory.
  6. *
  7. * Change Date: 2026-01-01
  8. *
  9. * On the date above, in accordance with the Business Source License, use
  10. * of this software will be governed by version 2.0 of the Apache License.
  11. */
  12. /****/
  13. #include "DBMirrorSet.hpp"
  14. #include "opentelemetry/trace/provider.h"
  15. namespace ZeroTier {
  16. DBMirrorSet::DBMirrorSet(DB::ChangeListener* listener) : _listener(listener), _running(true), _syncCheckerThread(), _dbs(), _dbs_l()
  17. {
  18. _syncCheckerThread = std::thread([this]() {
  19. for (;;) {
  20. for (int i = 0; i < 120; ++i) { // 1 minute delay between checks
  21. if (! _running)
  22. return;
  23. std::this_thread::sleep_for(std::chrono::milliseconds(500));
  24. }
  25. auto provider = opentelemetry::trace::Provider::GetTracerProvider();
  26. auto tracer = provider->GetTracer("db_mirror_set");
  27. auto span = tracer->StartSpan("db::syncChecker");
  28. auto scope = tracer->WithActiveSpan(span);
  29. std::vector<std::shared_ptr<DB> > dbs;
  30. {
  31. std::unique_lock<std::shared_mutex> l(_dbs_l);
  32. if (_dbs.size() <= 1)
  33. continue; // no need to do this if there's only one DB, so skip the iteration
  34. dbs = _dbs;
  35. }
  36. for (auto db = dbs.begin(); db != dbs.end(); ++db) {
  37. (*db)->each([&dbs, &db](uint64_t networkId, const nlohmann::json& network, uint64_t memberId, const nlohmann::json& member) {
  38. try {
  39. if (network.is_object()) {
  40. if (memberId == 0) {
  41. for (auto db2 = dbs.begin(); db2 != dbs.end(); ++db2) {
  42. if (db->get() != db2->get()) {
  43. nlohmann::json nw2;
  44. if ((! (*db2)->get(networkId, nw2)) || ((nw2.is_object()) && (OSUtils::jsonInt(nw2["revision"], 0) < OSUtils::jsonInt(network["revision"], 0)))) {
  45. nw2 = network;
  46. (*db2)->save(nw2, false);
  47. }
  48. }
  49. }
  50. }
  51. else if (member.is_object()) {
  52. for (auto db2 = dbs.begin(); db2 != dbs.end(); ++db2) {
  53. if (db->get() != db2->get()) {
  54. nlohmann::json nw2, m2;
  55. if ((! (*db2)->get(networkId, nw2, memberId, m2)) || ((m2.is_object()) && (OSUtils::jsonInt(m2["revision"], 0) < OSUtils::jsonInt(member["revision"], 0)))) {
  56. m2 = member;
  57. (*db2)->save(m2, false);
  58. }
  59. }
  60. }
  61. }
  62. }
  63. }
  64. catch (...) {
  65. } // skip entries that generate JSON errors
  66. });
  67. }
  68. }
  69. });
  70. }
  71. DBMirrorSet::~DBMirrorSet()
  72. {
  73. _running = false;
  74. _syncCheckerThread.join();
  75. }
  76. bool DBMirrorSet::hasNetwork(const uint64_t networkId) const
  77. {
  78. auto provider = opentelemetry::trace::Provider::GetTracerProvider();
  79. auto tracer = provider->GetTracer("db_mirror_set");
  80. auto span = tracer->StartSpan("db_mirror_set::hasNetwork");
  81. auto scope = tracer->WithActiveSpan(span);
  82. std::shared_lock<std::shared_mutex> l(_dbs_l);
  83. for (auto d = _dbs.begin(); d != _dbs.end(); ++d) {
  84. if ((*d)->hasNetwork(networkId))
  85. return true;
  86. }
  87. return false;
  88. }
  89. bool DBMirrorSet::get(const uint64_t networkId, nlohmann::json& network)
  90. {
  91. auto provider = opentelemetry::trace::Provider::GetTracerProvider();
  92. auto tracer = provider->GetTracer("db_mirror_set");
  93. auto span = tracer->StartSpan("db_mirror_set::getNetwork");
  94. auto scope = tracer->WithActiveSpan(span);
  95. char networkIdStr[17];
  96. span->SetAttribute("network_id", Utils::hex(networkId, networkIdStr));
  97. std::shared_lock<std::shared_mutex> l(_dbs_l);
  98. for (auto d = _dbs.begin(); d != _dbs.end(); ++d) {
  99. if ((*d)->get(networkId, network)) {
  100. return true;
  101. }
  102. }
  103. return false;
  104. }
  105. bool DBMirrorSet::get(const uint64_t networkId, nlohmann::json& network, const uint64_t memberId, nlohmann::json& member)
  106. {
  107. auto provider = opentelemetry::trace::Provider::GetTracerProvider();
  108. auto tracer = provider->GetTracer("db_mirror_set");
  109. auto span = tracer->StartSpan("db_mirror_set::getNetworkAndMember");
  110. auto scope = tracer->WithActiveSpan(span);
  111. char networkIdStr[17];
  112. char memberIdStr[11];
  113. span->SetAttribute("network_id", Utils::hex(networkId, networkIdStr));
  114. span->SetAttribute("member_id", Utils::hex10(memberId, memberIdStr));
  115. std::shared_lock<std::shared_mutex> l(_dbs_l);
  116. for (auto d = _dbs.begin(); d != _dbs.end(); ++d) {
  117. if ((*d)->get(networkId, network, memberId, member))
  118. return true;
  119. }
  120. return false;
  121. }
  122. bool DBMirrorSet::get(const uint64_t networkId, nlohmann::json& network, const uint64_t memberId, nlohmann::json& member, DB::NetworkSummaryInfo& info)
  123. {
  124. auto provider = opentelemetry::trace::Provider::GetTracerProvider();
  125. auto tracer = provider->GetTracer("db_mirror_set");
  126. auto span = tracer->StartSpan("db_mirror_set::getNetworkAndMemberWithSummary");
  127. auto scope = tracer->WithActiveSpan(span);
  128. char networkIdStr[17];
  129. char memberIdStr[11];
  130. span->SetAttribute("network_id", Utils::hex(networkId, networkIdStr));
  131. span->SetAttribute("member_id", Utils::hex10(memberId, memberIdStr));
  132. std::shared_lock<std::shared_mutex> l(_dbs_l);
  133. for (auto d = _dbs.begin(); d != _dbs.end(); ++d) {
  134. if ((*d)->get(networkId, network, memberId, member, info))
  135. return true;
  136. }
  137. return false;
  138. }
  139. bool DBMirrorSet::get(const uint64_t networkId, nlohmann::json& network, std::vector<nlohmann::json>& members)
  140. {
  141. auto provider = opentelemetry::trace::Provider::GetTracerProvider();
  142. auto tracer = provider->GetTracer("db_mirror_set");
  143. auto span = tracer->StartSpan("db_mirror_set::getNetworkAndMembers");
  144. auto scope = tracer->WithActiveSpan(span);
  145. char networkIdStr[17];
  146. span->SetAttribute("network_id", Utils::hex(networkId, networkIdStr));
  147. std::shared_lock<std::shared_mutex> l(_dbs_l);
  148. for (auto d = _dbs.begin(); d != _dbs.end(); ++d) {
  149. if ((*d)->get(networkId, network, members))
  150. return true;
  151. }
  152. return false;
  153. }
  154. AuthInfo DBMirrorSet::getSSOAuthInfo(const nlohmann::json& member, const std::string& redirectURL)
  155. {
  156. auto provider = opentelemetry::trace::Provider::GetTracerProvider();
  157. auto tracer = provider->GetTracer("db_mirror_set");
  158. auto span = tracer->StartSpan("db_mirror_set::getSSOAuthInfo");
  159. auto scope = tracer->WithActiveSpan(span);
  160. std::shared_lock<std::shared_mutex> l(_dbs_l);
  161. for (auto d = _dbs.begin(); d != _dbs.end(); ++d) {
  162. AuthInfo info = (*d)->getSSOAuthInfo(member, redirectURL);
  163. if (info.enabled) {
  164. return info;
  165. }
  166. }
  167. return AuthInfo();
  168. }
  169. void DBMirrorSet::networks(std::set<uint64_t>& networks)
  170. {
  171. auto provider = opentelemetry::trace::Provider::GetTracerProvider();
  172. auto tracer = provider->GetTracer("db_mirror_set");
  173. auto span = tracer->StartSpan("db_mirror_set::networks");
  174. auto scope = tracer->WithActiveSpan(span);
  175. std::shared_lock<std::shared_mutex> l(_dbs_l);
  176. for (auto d = _dbs.begin(); d != _dbs.end(); ++d) {
  177. (*d)->networks(networks);
  178. }
  179. }
  180. bool DBMirrorSet::waitForReady()
  181. {
  182. auto provider = opentelemetry::trace::Provider::GetTracerProvider();
  183. auto tracer = provider->GetTracer("db_mirror_set");
  184. auto span = tracer->StartSpan("db_mirror_set::waitForReady");
  185. auto scope = tracer->WithActiveSpan(span);
  186. bool r = false;
  187. std::shared_lock<std::shared_mutex> l(_dbs_l);
  188. for (auto d = _dbs.begin(); d != _dbs.end(); ++d) {
  189. r |= (*d)->waitForReady();
  190. }
  191. return r;
  192. }
  193. bool DBMirrorSet::isReady()
  194. {
  195. auto provider = opentelemetry::trace::Provider::GetTracerProvider();
  196. auto tracer = provider->GetTracer("db_mirror_set");
  197. auto span = tracer->StartSpan("db_mirror_set::isReady");
  198. auto scope = tracer->WithActiveSpan(span);
  199. std::shared_lock<std::shared_mutex> l(_dbs_l);
  200. for (auto d = _dbs.begin(); d != _dbs.end(); ++d) {
  201. if (! (*d)->isReady())
  202. return false;
  203. }
  204. return true;
  205. }
  206. bool DBMirrorSet::save(nlohmann::json& record, bool notifyListeners)
  207. {
  208. auto provider = opentelemetry::trace::Provider::GetTracerProvider();
  209. auto tracer = provider->GetTracer("db_mirror_set");
  210. auto span = tracer->StartSpan("db_mirror_set::save");
  211. auto scope = tracer->WithActiveSpan(span);
  212. std::vector<std::shared_ptr<DB> > dbs;
  213. {
  214. std::unique_lock<std::shared_mutex> l(_dbs_l);
  215. dbs = _dbs;
  216. }
  217. if (notifyListeners) {
  218. for (auto d = dbs.begin(); d != dbs.end(); ++d) {
  219. if ((*d)->save(record, true))
  220. return true;
  221. }
  222. return false;
  223. }
  224. else {
  225. bool modified = false;
  226. for (auto d = dbs.begin(); d != dbs.end(); ++d) {
  227. modified |= (*d)->save(record, false);
  228. }
  229. return modified;
  230. }
  231. }
  232. void DBMirrorSet::eraseNetwork(const uint64_t networkId)
  233. {
  234. auto provider = opentelemetry::trace::Provider::GetTracerProvider();
  235. auto tracer = provider->GetTracer("db_mirror_set");
  236. auto span = tracer->StartSpan("db_mirror_set::eraseNetwork");
  237. auto scope = tracer->WithActiveSpan(span);
  238. char networkIdStr[17];
  239. span->SetAttribute("network_id", Utils::hex(networkId, networkIdStr));
  240. std::unique_lock<std::shared_mutex> l(_dbs_l);
  241. for (auto d = _dbs.begin(); d != _dbs.end(); ++d) {
  242. (*d)->eraseNetwork(networkId);
  243. }
  244. }
  245. void DBMirrorSet::eraseMember(const uint64_t networkId, const uint64_t memberId)
  246. {
  247. auto provider = opentelemetry::trace::Provider::GetTracerProvider();
  248. auto tracer = provider->GetTracer("db_mirror_set");
  249. auto span = tracer->StartSpan("db_mirror_set::eraseMember");
  250. auto scope = tracer->WithActiveSpan(span);
  251. char networkIdStr[17];
  252. char memberIdStr[11];
  253. span->SetAttribute("network_id", Utils::hex(networkId, networkIdStr));
  254. span->SetAttribute("member_id", Utils::hex10(memberId, memberIdStr));
  255. std::unique_lock<std::shared_mutex> l(_dbs_l);
  256. for (auto d = _dbs.begin(); d != _dbs.end(); ++d) {
  257. (*d)->eraseMember(networkId, memberId);
  258. }
  259. }
  260. void DBMirrorSet::nodeIsOnline(const uint64_t networkId, const uint64_t memberId, const InetAddress& physicalAddress, const char* osArch)
  261. {
  262. auto provider = opentelemetry::trace::Provider::GetTracerProvider();
  263. auto tracer = provider->GetTracer("db_mirror_set");
  264. auto span = tracer->StartSpan("db_mirror_set::nodeIsOnline");
  265. auto scope = tracer->WithActiveSpan(span);
  266. char networkIdStr[17];
  267. char memberIdStr[11];
  268. char phyAddressStr[INET6_ADDRSTRLEN];
  269. span->SetAttribute("network_id", Utils::hex(networkId, networkIdStr));
  270. span->SetAttribute("member_id", Utils::hex10(memberId, memberIdStr));
  271. span->SetAttribute("physical_address", physicalAddress.toString(phyAddressStr));
  272. span->SetAttribute("os_arch", osArch);
  273. std::shared_lock<std::shared_mutex> l(_dbs_l);
  274. for (auto d = _dbs.begin(); d != _dbs.end(); ++d) {
  275. (*d)->nodeIsOnline(networkId, memberId, physicalAddress, osArch);
  276. }
  277. }
  278. void DBMirrorSet::nodeIsOnline(const uint64_t networkId, const uint64_t memberId, const InetAddress& physicalAddress)
  279. {
  280. this->nodeIsOnline(networkId, memberId, physicalAddress, "unknown/unknown");
  281. }
  282. void DBMirrorSet::onNetworkUpdate(const void* db, uint64_t networkId, const nlohmann::json& network)
  283. {
  284. auto provider = opentelemetry::trace::Provider::GetTracerProvider();
  285. auto tracer = provider->GetTracer("db_mirror_set");
  286. auto span = tracer->StartSpan("db_mirror_set::onNetworkUpdate");
  287. auto scope = tracer->WithActiveSpan(span);
  288. char networkIdStr[17];
  289. span->SetAttribute("network_id", Utils::hex(networkId, networkIdStr));
  290. nlohmann::json record(network);
  291. std::unique_lock<std::shared_mutex> l(_dbs_l);
  292. for (auto d = _dbs.begin(); d != _dbs.end(); ++d) {
  293. if (d->get() != db) {
  294. (*d)->save(record, false);
  295. }
  296. }
  297. _listener->onNetworkUpdate(this, networkId, network);
  298. }
  299. void DBMirrorSet::onNetworkMemberUpdate(const void* db, uint64_t networkId, uint64_t memberId, const nlohmann::json& member)
  300. {
  301. auto provider = opentelemetry::trace::Provider::GetTracerProvider();
  302. auto tracer = provider->GetTracer("db_mirror_set");
  303. auto span = tracer->StartSpan("db_mirror_set::onNetworkMemberUpdate");
  304. auto scope = tracer->WithActiveSpan(span);
  305. char networkIdStr[17];
  306. char memberIdStr[11];
  307. span->SetAttribute("network_id", Utils::hex(networkId, networkIdStr));
  308. span->SetAttribute("member_id", Utils::hex10(memberId, memberIdStr));
  309. nlohmann::json record(member);
  310. std::unique_lock<std::shared_mutex> l(_dbs_l);
  311. for (auto d = _dbs.begin(); d != _dbs.end(); ++d) {
  312. if (d->get() != db) {
  313. (*d)->save(record, false);
  314. }
  315. }
  316. _listener->onNetworkMemberUpdate(this, networkId, memberId, member);
  317. }
  318. void DBMirrorSet::onNetworkMemberDeauthorize(const void* db, uint64_t networkId, uint64_t memberId)
  319. {
  320. auto provider = opentelemetry::trace::Provider::GetTracerProvider();
  321. auto tracer = provider->GetTracer("db_mirror_set");
  322. auto span = tracer->StartSpan("db_mirror_set::onNetworkMemberDeauthorize");
  323. auto scope = tracer->WithActiveSpan(span);
  324. char networkIdStr[17];
  325. char memberIdStr[11];
  326. span->SetAttribute("network_id", Utils::hex(networkId, networkIdStr));
  327. span->SetAttribute("member_id", Utils::hex10(memberId, memberIdStr));
  328. _listener->onNetworkMemberDeauthorize(this, networkId, memberId);
  329. }
  330. } // namespace ZeroTier