2
0

DBMirrorSet.cpp 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363
  1. /* (c) ZeroTier, Inc.
  2. * See LICENSE.txt in nonfree/
  3. */
  4. #include "DBMirrorSet.hpp"
  5. #include "opentelemetry/trace/provider.h"
  6. namespace ZeroTier {
  7. DBMirrorSet::DBMirrorSet(DB::ChangeListener* listener) : _listener(listener), _running(true), _syncCheckerThread(), _dbs(), _dbs_l()
  8. {
  9. _syncCheckerThread = std::thread([this]() {
  10. for (;;) {
  11. for (int i = 0; i < 120; ++i) { // 1 minute delay between checks
  12. if (! _running)
  13. return;
  14. std::this_thread::sleep_for(std::chrono::milliseconds(500));
  15. }
  16. auto provider = opentelemetry::trace::Provider::GetTracerProvider();
  17. auto tracer = provider->GetTracer("db_mirror_set");
  18. auto span = tracer->StartSpan("db::syncChecker");
  19. auto scope = tracer->WithActiveSpan(span);
  20. std::vector<std::shared_ptr<DB> > dbs;
  21. {
  22. std::unique_lock<std::shared_mutex> l(_dbs_l);
  23. if (_dbs.size() <= 1)
  24. continue; // no need to do this if there's only one DB, so skip the iteration
  25. dbs = _dbs;
  26. }
  27. for (auto db = dbs.begin(); db != dbs.end(); ++db) {
  28. (*db)->each([&dbs, &db](uint64_t networkId, const nlohmann::json& network, uint64_t memberId, const nlohmann::json& member) {
  29. try {
  30. if (network.is_object()) {
  31. if (memberId == 0) {
  32. for (auto db2 = dbs.begin(); db2 != dbs.end(); ++db2) {
  33. if (db->get() != db2->get()) {
  34. nlohmann::json nw2;
  35. if ((! (*db2)->get(networkId, nw2)) || ((nw2.is_object()) && (OSUtils::jsonInt(nw2["revision"], 0) < OSUtils::jsonInt(network["revision"], 0)))) {
  36. nw2 = network;
  37. (*db2)->save(nw2, false);
  38. }
  39. }
  40. }
  41. }
  42. else if (member.is_object()) {
  43. for (auto db2 = dbs.begin(); db2 != dbs.end(); ++db2) {
  44. if (db->get() != db2->get()) {
  45. nlohmann::json nw2, m2;
  46. if ((! (*db2)->get(networkId, nw2, memberId, m2)) || ((m2.is_object()) && (OSUtils::jsonInt(m2["revision"], 0) < OSUtils::jsonInt(member["revision"], 0)))) {
  47. m2 = member;
  48. (*db2)->save(m2, false);
  49. }
  50. }
  51. }
  52. }
  53. }
  54. }
  55. catch (...) {
  56. } // skip entries that generate JSON errors
  57. });
  58. }
  59. }
  60. });
  61. }
  62. DBMirrorSet::~DBMirrorSet()
  63. {
  64. _running = false;
  65. _syncCheckerThread.join();
  66. }
  67. bool DBMirrorSet::hasNetwork(const uint64_t networkId) const
  68. {
  69. auto provider = opentelemetry::trace::Provider::GetTracerProvider();
  70. auto tracer = provider->GetTracer("db_mirror_set");
  71. auto span = tracer->StartSpan("db_mirror_set::hasNetwork");
  72. auto scope = tracer->WithActiveSpan(span);
  73. std::shared_lock<std::shared_mutex> l(_dbs_l);
  74. for (auto d = _dbs.begin(); d != _dbs.end(); ++d) {
  75. if ((*d)->hasNetwork(networkId))
  76. return true;
  77. }
  78. return false;
  79. }
  80. bool DBMirrorSet::get(const uint64_t networkId, nlohmann::json& network)
  81. {
  82. auto provider = opentelemetry::trace::Provider::GetTracerProvider();
  83. auto tracer = provider->GetTracer("db_mirror_set");
  84. auto span = tracer->StartSpan("db_mirror_set::getNetwork");
  85. auto scope = tracer->WithActiveSpan(span);
  86. char networkIdStr[17];
  87. span->SetAttribute("network_id", Utils::hex(networkId, networkIdStr));
  88. std::shared_lock<std::shared_mutex> l(_dbs_l);
  89. for (auto d = _dbs.begin(); d != _dbs.end(); ++d) {
  90. if ((*d)->get(networkId, network)) {
  91. return true;
  92. }
  93. }
  94. return false;
  95. }
  96. bool DBMirrorSet::get(const uint64_t networkId, nlohmann::json& network, const uint64_t memberId, nlohmann::json& member)
  97. {
  98. auto provider = opentelemetry::trace::Provider::GetTracerProvider();
  99. auto tracer = provider->GetTracer("db_mirror_set");
  100. auto span = tracer->StartSpan("db_mirror_set::getNetworkAndMember");
  101. auto scope = tracer->WithActiveSpan(span);
  102. char networkIdStr[17];
  103. char memberIdStr[11];
  104. span->SetAttribute("network_id", Utils::hex(networkId, networkIdStr));
  105. span->SetAttribute("member_id", Utils::hex10(memberId, memberIdStr));
  106. std::shared_lock<std::shared_mutex> l(_dbs_l);
  107. for (auto d = _dbs.begin(); d != _dbs.end(); ++d) {
  108. if ((*d)->get(networkId, network, memberId, member))
  109. return true;
  110. }
  111. return false;
  112. }
  113. bool DBMirrorSet::get(const uint64_t networkId, nlohmann::json& network, const uint64_t memberId, nlohmann::json& member, DB::NetworkSummaryInfo& info)
  114. {
  115. auto provider = opentelemetry::trace::Provider::GetTracerProvider();
  116. auto tracer = provider->GetTracer("db_mirror_set");
  117. auto span = tracer->StartSpan("db_mirror_set::getNetworkAndMemberWithSummary");
  118. auto scope = tracer->WithActiveSpan(span);
  119. char networkIdStr[17];
  120. char memberIdStr[11];
  121. span->SetAttribute("network_id", Utils::hex(networkId, networkIdStr));
  122. span->SetAttribute("member_id", Utils::hex10(memberId, memberIdStr));
  123. std::shared_lock<std::shared_mutex> l(_dbs_l);
  124. for (auto d = _dbs.begin(); d != _dbs.end(); ++d) {
  125. if ((*d)->get(networkId, network, memberId, member, info))
  126. return true;
  127. }
  128. return false;
  129. }
  130. bool DBMirrorSet::get(const uint64_t networkId, nlohmann::json& network, std::vector<nlohmann::json>& members)
  131. {
  132. auto provider = opentelemetry::trace::Provider::GetTracerProvider();
  133. auto tracer = provider->GetTracer("db_mirror_set");
  134. auto span = tracer->StartSpan("db_mirror_set::getNetworkAndMembers");
  135. auto scope = tracer->WithActiveSpan(span);
  136. char networkIdStr[17];
  137. span->SetAttribute("network_id", Utils::hex(networkId, networkIdStr));
  138. std::shared_lock<std::shared_mutex> l(_dbs_l);
  139. for (auto d = _dbs.begin(); d != _dbs.end(); ++d) {
  140. if ((*d)->get(networkId, network, members))
  141. return true;
  142. }
  143. return false;
  144. }
  145. AuthInfo DBMirrorSet::getSSOAuthInfo(const nlohmann::json& member, const std::string& redirectURL)
  146. {
  147. auto provider = opentelemetry::trace::Provider::GetTracerProvider();
  148. auto tracer = provider->GetTracer("db_mirror_set");
  149. auto span = tracer->StartSpan("db_mirror_set::getSSOAuthInfo");
  150. auto scope = tracer->WithActiveSpan(span);
  151. std::shared_lock<std::shared_mutex> l(_dbs_l);
  152. for (auto d = _dbs.begin(); d != _dbs.end(); ++d) {
  153. AuthInfo info = (*d)->getSSOAuthInfo(member, redirectURL);
  154. if (info.enabled) {
  155. return info;
  156. }
  157. }
  158. return AuthInfo();
  159. }
  160. void DBMirrorSet::networks(std::set<uint64_t>& networks)
  161. {
  162. auto provider = opentelemetry::trace::Provider::GetTracerProvider();
  163. auto tracer = provider->GetTracer("db_mirror_set");
  164. auto span = tracer->StartSpan("db_mirror_set::networks");
  165. auto scope = tracer->WithActiveSpan(span);
  166. std::shared_lock<std::shared_mutex> l(_dbs_l);
  167. for (auto d = _dbs.begin(); d != _dbs.end(); ++d) {
  168. (*d)->networks(networks);
  169. }
  170. }
  171. bool DBMirrorSet::waitForReady()
  172. {
  173. auto provider = opentelemetry::trace::Provider::GetTracerProvider();
  174. auto tracer = provider->GetTracer("db_mirror_set");
  175. auto span = tracer->StartSpan("db_mirror_set::waitForReady");
  176. auto scope = tracer->WithActiveSpan(span);
  177. bool r = false;
  178. std::shared_lock<std::shared_mutex> l(_dbs_l);
  179. for (auto d = _dbs.begin(); d != _dbs.end(); ++d) {
  180. r |= (*d)->waitForReady();
  181. }
  182. return r;
  183. }
  184. bool DBMirrorSet::isReady()
  185. {
  186. auto provider = opentelemetry::trace::Provider::GetTracerProvider();
  187. auto tracer = provider->GetTracer("db_mirror_set");
  188. auto span = tracer->StartSpan("db_mirror_set::isReady");
  189. auto scope = tracer->WithActiveSpan(span);
  190. std::shared_lock<std::shared_mutex> l(_dbs_l);
  191. for (auto d = _dbs.begin(); d != _dbs.end(); ++d) {
  192. if (! (*d)->isReady())
  193. return false;
  194. }
  195. return true;
  196. }
  197. bool DBMirrorSet::save(nlohmann::json& record, bool notifyListeners)
  198. {
  199. auto provider = opentelemetry::trace::Provider::GetTracerProvider();
  200. auto tracer = provider->GetTracer("db_mirror_set");
  201. auto span = tracer->StartSpan("db_mirror_set::save");
  202. auto scope = tracer->WithActiveSpan(span);
  203. std::vector<std::shared_ptr<DB> > dbs;
  204. {
  205. std::unique_lock<std::shared_mutex> l(_dbs_l);
  206. dbs = _dbs;
  207. }
  208. if (notifyListeners) {
  209. for (auto d = dbs.begin(); d != dbs.end(); ++d) {
  210. if ((*d)->save(record, true))
  211. return true;
  212. }
  213. return false;
  214. }
  215. else {
  216. bool modified = false;
  217. for (auto d = dbs.begin(); d != dbs.end(); ++d) {
  218. modified |= (*d)->save(record, false);
  219. }
  220. return modified;
  221. }
  222. }
  223. void DBMirrorSet::eraseNetwork(const uint64_t networkId)
  224. {
  225. auto provider = opentelemetry::trace::Provider::GetTracerProvider();
  226. auto tracer = provider->GetTracer("db_mirror_set");
  227. auto span = tracer->StartSpan("db_mirror_set::eraseNetwork");
  228. auto scope = tracer->WithActiveSpan(span);
  229. char networkIdStr[17];
  230. span->SetAttribute("network_id", Utils::hex(networkId, networkIdStr));
  231. std::unique_lock<std::shared_mutex> l(_dbs_l);
  232. for (auto d = _dbs.begin(); d != _dbs.end(); ++d) {
  233. (*d)->eraseNetwork(networkId);
  234. }
  235. }
  236. void DBMirrorSet::eraseMember(const uint64_t networkId, const uint64_t memberId)
  237. {
  238. auto provider = opentelemetry::trace::Provider::GetTracerProvider();
  239. auto tracer = provider->GetTracer("db_mirror_set");
  240. auto span = tracer->StartSpan("db_mirror_set::eraseMember");
  241. auto scope = tracer->WithActiveSpan(span);
  242. char networkIdStr[17];
  243. char memberIdStr[11];
  244. span->SetAttribute("network_id", Utils::hex(networkId, networkIdStr));
  245. span->SetAttribute("member_id", Utils::hex10(memberId, memberIdStr));
  246. std::unique_lock<std::shared_mutex> l(_dbs_l);
  247. for (auto d = _dbs.begin(); d != _dbs.end(); ++d) {
  248. (*d)->eraseMember(networkId, memberId);
  249. }
  250. }
  251. void DBMirrorSet::nodeIsOnline(const uint64_t networkId, const uint64_t memberId, const InetAddress& physicalAddress, const char* osArch)
  252. {
  253. auto provider = opentelemetry::trace::Provider::GetTracerProvider();
  254. auto tracer = provider->GetTracer("db_mirror_set");
  255. auto span = tracer->StartSpan("db_mirror_set::nodeIsOnline");
  256. auto scope = tracer->WithActiveSpan(span);
  257. char networkIdStr[17];
  258. char memberIdStr[11];
  259. char phyAddressStr[INET6_ADDRSTRLEN];
  260. span->SetAttribute("network_id", Utils::hex(networkId, networkIdStr));
  261. span->SetAttribute("member_id", Utils::hex10(memberId, memberIdStr));
  262. span->SetAttribute("physical_address", physicalAddress.toString(phyAddressStr));
  263. span->SetAttribute("os_arch", osArch);
  264. std::shared_lock<std::shared_mutex> l(_dbs_l);
  265. for (auto d = _dbs.begin(); d != _dbs.end(); ++d) {
  266. (*d)->nodeIsOnline(networkId, memberId, physicalAddress, osArch);
  267. }
  268. }
  269. void DBMirrorSet::nodeIsOnline(const uint64_t networkId, const uint64_t memberId, const InetAddress& physicalAddress)
  270. {
  271. this->nodeIsOnline(networkId, memberId, physicalAddress, "unknown/unknown");
  272. }
  273. void DBMirrorSet::onNetworkUpdate(const void* db, uint64_t networkId, const nlohmann::json& network)
  274. {
  275. auto provider = opentelemetry::trace::Provider::GetTracerProvider();
  276. auto tracer = provider->GetTracer("db_mirror_set");
  277. auto span = tracer->StartSpan("db_mirror_set::onNetworkUpdate");
  278. auto scope = tracer->WithActiveSpan(span);
  279. char networkIdStr[17];
  280. span->SetAttribute("network_id", Utils::hex(networkId, networkIdStr));
  281. nlohmann::json record(network);
  282. std::unique_lock<std::shared_mutex> l(_dbs_l);
  283. for (auto d = _dbs.begin(); d != _dbs.end(); ++d) {
  284. if (d->get() != db) {
  285. (*d)->save(record, false);
  286. }
  287. }
  288. _listener->onNetworkUpdate(this, networkId, network);
  289. }
  290. void DBMirrorSet::onNetworkMemberUpdate(const void* db, uint64_t networkId, uint64_t memberId, const nlohmann::json& member)
  291. {
  292. auto provider = opentelemetry::trace::Provider::GetTracerProvider();
  293. auto tracer = provider->GetTracer("db_mirror_set");
  294. auto span = tracer->StartSpan("db_mirror_set::onNetworkMemberUpdate");
  295. auto scope = tracer->WithActiveSpan(span);
  296. char networkIdStr[17];
  297. char memberIdStr[11];
  298. span->SetAttribute("network_id", Utils::hex(networkId, networkIdStr));
  299. span->SetAttribute("member_id", Utils::hex10(memberId, memberIdStr));
  300. nlohmann::json record(member);
  301. std::unique_lock<std::shared_mutex> l(_dbs_l);
  302. for (auto d = _dbs.begin(); d != _dbs.end(); ++d) {
  303. if (d->get() != db) {
  304. (*d)->save(record, false);
  305. }
  306. }
  307. _listener->onNetworkMemberUpdate(this, networkId, memberId, member);
  308. }
  309. void DBMirrorSet::onNetworkMemberDeauthorize(const void* db, uint64_t networkId, uint64_t memberId)
  310. {
  311. auto provider = opentelemetry::trace::Provider::GetTracerProvider();
  312. auto tracer = provider->GetTracer("db_mirror_set");
  313. auto span = tracer->StartSpan("db_mirror_set::onNetworkMemberDeauthorize");
  314. auto scope = tracer->WithActiveSpan(span);
  315. char networkIdStr[17];
  316. char memberIdStr[11];
  317. span->SetAttribute("network_id", Utils::hex(networkId, networkIdStr));
  318. span->SetAttribute("member_id", Utils::hex10(memberId, memberIdStr));
  319. _listener->onNetworkMemberDeauthorize(this, networkId, memberId);
  320. }
  321. } // namespace ZeroTier