DBMirrorSet.cpp 6.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245
  1. /*
  2. * Copyright (c)2019 ZeroTier, Inc.
  3. *
  4. * Use of this software is governed by the Business Source License included
  5. * in the LICENSE.TXT file in the project's root directory.
  6. *
  7. * Change Date: 2026-01-01
  8. *
  9. * On the date above, in accordance with the Business Source License, use
  10. * of this software will be governed by version 2.0 of the Apache License.
  11. */
  12. /****/
  13. #include "DBMirrorSet.hpp"
  14. namespace ZeroTier {
  15. DBMirrorSet::DBMirrorSet(DB::ChangeListener* listener) : _listener(listener), _running(true), _syncCheckerThread(), _dbs(), _dbs_l()
  16. {
  17. _syncCheckerThread = std::thread([this]() {
  18. for (;;) {
  19. for (int i = 0; i < 120; ++i) { // 1 minute delay between checks
  20. if (! _running)
  21. return;
  22. std::this_thread::sleep_for(std::chrono::milliseconds(500));
  23. }
  24. std::vector<std::shared_ptr<DB> > dbs;
  25. {
  26. std::unique_lock<std::shared_mutex> l(_dbs_l);
  27. if (_dbs.size() <= 1)
  28. continue; // no need to do this if there's only one DB, so skip the iteration
  29. dbs = _dbs;
  30. }
  31. for (auto db = dbs.begin(); db != dbs.end(); ++db) {
  32. (*db)->each([&dbs, &db](uint64_t networkId, const nlohmann::json& network, uint64_t memberId, const nlohmann::json& member) {
  33. try {
  34. if (network.is_object()) {
  35. if (memberId == 0) {
  36. for (auto db2 = dbs.begin(); db2 != dbs.end(); ++db2) {
  37. if (db->get() != db2->get()) {
  38. nlohmann::json nw2;
  39. if ((! (*db2)->get(networkId, nw2)) || ((nw2.is_object()) && (OSUtils::jsonInt(nw2["revision"], 0) < OSUtils::jsonInt(network["revision"], 0)))) {
  40. nw2 = network;
  41. (*db2)->save(nw2, false);
  42. }
  43. }
  44. }
  45. }
  46. else if (member.is_object()) {
  47. for (auto db2 = dbs.begin(); db2 != dbs.end(); ++db2) {
  48. if (db->get() != db2->get()) {
  49. nlohmann::json nw2, m2;
  50. if ((! (*db2)->get(networkId, nw2, memberId, m2)) || ((m2.is_object()) && (OSUtils::jsonInt(m2["revision"], 0) < OSUtils::jsonInt(member["revision"], 0)))) {
  51. m2 = member;
  52. (*db2)->save(m2, false);
  53. }
  54. }
  55. }
  56. }
  57. }
  58. }
  59. catch (...) {
  60. } // skip entries that generate JSON errors
  61. });
  62. }
  63. }
  64. });
  65. }
  66. DBMirrorSet::~DBMirrorSet()
  67. {
  68. _running = false;
  69. _syncCheckerThread.join();
  70. }
  71. bool DBMirrorSet::hasNetwork(const uint64_t networkId) const
  72. {
  73. std::shared_lock<std::shared_mutex> l(_dbs_l);
  74. for (auto d = _dbs.begin(); d != _dbs.end(); ++d) {
  75. if ((*d)->hasNetwork(networkId))
  76. return true;
  77. }
  78. return false;
  79. }
  80. bool DBMirrorSet::get(const uint64_t networkId, nlohmann::json& network)
  81. {
  82. std::shared_lock<std::shared_mutex> l(_dbs_l);
  83. for (auto d = _dbs.begin(); d != _dbs.end(); ++d) {
  84. if ((*d)->get(networkId, network)) {
  85. return true;
  86. }
  87. }
  88. return false;
  89. }
  90. bool DBMirrorSet::get(const uint64_t networkId, nlohmann::json& network, const uint64_t memberId, nlohmann::json& member)
  91. {
  92. std::shared_lock<std::shared_mutex> l(_dbs_l);
  93. for (auto d = _dbs.begin(); d != _dbs.end(); ++d) {
  94. if ((*d)->get(networkId, network, memberId, member))
  95. return true;
  96. }
  97. return false;
  98. }
  99. bool DBMirrorSet::get(const uint64_t networkId, nlohmann::json& network, const uint64_t memberId, nlohmann::json& member, DB::NetworkSummaryInfo& info)
  100. {
  101. std::shared_lock<std::shared_mutex> l(_dbs_l);
  102. for (auto d = _dbs.begin(); d != _dbs.end(); ++d) {
  103. if ((*d)->get(networkId, network, memberId, member, info))
  104. return true;
  105. }
  106. return false;
  107. }
  108. bool DBMirrorSet::get(const uint64_t networkId, nlohmann::json& network, std::vector<nlohmann::json>& members)
  109. {
  110. std::shared_lock<std::shared_mutex> l(_dbs_l);
  111. for (auto d = _dbs.begin(); d != _dbs.end(); ++d) {
  112. if ((*d)->get(networkId, network, members))
  113. return true;
  114. }
  115. return false;
  116. }
  117. AuthInfo DBMirrorSet::getSSOAuthInfo(const nlohmann::json& member, const std::string& redirectURL)
  118. {
  119. std::shared_lock<std::shared_mutex> l(_dbs_l);
  120. for (auto d = _dbs.begin(); d != _dbs.end(); ++d) {
  121. AuthInfo info = (*d)->getSSOAuthInfo(member, redirectURL);
  122. if (info.enabled) {
  123. return info;
  124. }
  125. }
  126. return AuthInfo();
  127. }
  128. void DBMirrorSet::networks(std::set<uint64_t>& networks)
  129. {
  130. std::shared_lock<std::shared_mutex> l(_dbs_l);
  131. for (auto d = _dbs.begin(); d != _dbs.end(); ++d) {
  132. (*d)->networks(networks);
  133. }
  134. }
  135. bool DBMirrorSet::waitForReady()
  136. {
  137. bool r = false;
  138. std::shared_lock<std::shared_mutex> l(_dbs_l);
  139. for (auto d = _dbs.begin(); d != _dbs.end(); ++d) {
  140. r |= (*d)->waitForReady();
  141. }
  142. return r;
  143. }
  144. bool DBMirrorSet::isReady()
  145. {
  146. std::shared_lock<std::shared_mutex> l(_dbs_l);
  147. for (auto d = _dbs.begin(); d != _dbs.end(); ++d) {
  148. if (! (*d)->isReady())
  149. return false;
  150. }
  151. return true;
  152. }
  153. bool DBMirrorSet::save(nlohmann::json& record, bool notifyListeners)
  154. {
  155. std::vector<std::shared_ptr<DB> > dbs;
  156. {
  157. std::unique_lock<std::shared_mutex> l(_dbs_l);
  158. dbs = _dbs;
  159. }
  160. if (notifyListeners) {
  161. for (auto d = dbs.begin(); d != dbs.end(); ++d) {
  162. if ((*d)->save(record, true))
  163. return true;
  164. }
  165. return false;
  166. }
  167. else {
  168. bool modified = false;
  169. for (auto d = dbs.begin(); d != dbs.end(); ++d) {
  170. modified |= (*d)->save(record, false);
  171. }
  172. return modified;
  173. }
  174. }
  175. void DBMirrorSet::eraseNetwork(const uint64_t networkId)
  176. {
  177. std::unique_lock<std::shared_mutex> l(_dbs_l);
  178. for (auto d = _dbs.begin(); d != _dbs.end(); ++d) {
  179. (*d)->eraseNetwork(networkId);
  180. }
  181. }
  182. void DBMirrorSet::eraseMember(const uint64_t networkId, const uint64_t memberId)
  183. {
  184. std::unique_lock<std::shared_mutex> l(_dbs_l);
  185. for (auto d = _dbs.begin(); d != _dbs.end(); ++d) {
  186. (*d)->eraseMember(networkId, memberId);
  187. }
  188. }
  189. void DBMirrorSet::nodeIsOnline(const uint64_t networkId, const uint64_t memberId, const InetAddress& physicalAddress)
  190. {
  191. std::shared_lock<std::shared_mutex> l(_dbs_l);
  192. for (auto d = _dbs.begin(); d != _dbs.end(); ++d) {
  193. (*d)->nodeIsOnline(networkId, memberId, physicalAddress);
  194. }
  195. }
  196. void DBMirrorSet::onNetworkUpdate(const void* db, uint64_t networkId, const nlohmann::json& network)
  197. {
  198. nlohmann::json record(network);
  199. std::unique_lock<std::shared_mutex> l(_dbs_l);
  200. for (auto d = _dbs.begin(); d != _dbs.end(); ++d) {
  201. if (d->get() != db) {
  202. (*d)->save(record, false);
  203. }
  204. }
  205. _listener->onNetworkUpdate(this, networkId, network);
  206. }
  207. void DBMirrorSet::onNetworkMemberUpdate(const void* db, uint64_t networkId, uint64_t memberId, const nlohmann::json& member)
  208. {
  209. nlohmann::json record(member);
  210. std::unique_lock<std::shared_mutex> l(_dbs_l);
  211. for (auto d = _dbs.begin(); d != _dbs.end(); ++d) {
  212. if (d->get() != db) {
  213. (*d)->save(record, false);
  214. }
  215. }
  216. _listener->onNetworkMemberUpdate(this, networkId, memberId, member);
  217. }
  218. void DBMirrorSet::onNetworkMemberDeauthorize(const void* db, uint64_t networkId, uint64_t memberId)
  219. {
  220. _listener->onNetworkMemberDeauthorize(this, networkId, memberId);
  221. }
  222. } // namespace ZeroTier