DBMirrorSet.cpp 6.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244
  1. /*
  2. * ZeroTier One - Network Virtualization Everywhere
  3. * Copyright (C) 2011-2019 ZeroTier, Inc. https://www.zerotier.com/
  4. *
  5. * This program is free software: you can redistribute it and/or modify
  6. * it under the terms of the GNU General Public License as published by
  7. * the Free Software Foundation, either version 3 of the License, or
  8. * (at your option) any later version.
  9. *
  10. * This program is distributed in the hope that it will be useful,
  11. * but WITHOUT ANY WARRANTY; without even the implied warranty of
  12. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  13. * GNU General Public License for more details.
  14. *
  15. * You should have received a copy of the GNU General Public License
  16. * along with this program. If not, see <http://www.gnu.org/licenses/>.
  17. *
  18. * --
  19. *
  20. * You can be released from the requirements of the license by purchasing
  21. * a commercial license. Buying such a license is mandatory as soon as you
  22. * develop commercial closed-source software that incorporates or links
  23. * directly against ZeroTier software without disclosing the source code
  24. * of your own application.
  25. */
  26. #include "DBMirrorSet.hpp"
  27. namespace ZeroTier {
  28. DBMirrorSet::DBMirrorSet(DB::ChangeListener *listener) :
  29. _listener(listener),
  30. _running(true)
  31. {
  32. _syncCheckerThread = std::thread([this]() {
  33. for(;;) {
  34. for(int i=0;i<120;++i) { // 1 minute delay between checks
  35. if (!_running)
  36. return;
  37. std::this_thread::sleep_for(std::chrono::milliseconds(500));
  38. }
  39. std::vector< std::shared_ptr<DB> > dbs;
  40. {
  41. std::lock_guard<std::mutex> l(_dbs_l);
  42. if (_dbs.size() <= 1)
  43. continue; // no need to do this if there's only one DB, so skip the iteration
  44. dbs = _dbs;
  45. }
  46. for(auto db=dbs.begin();db!=dbs.end();++db) {
  47. (*db)->each([&dbs,&db](uint64_t networkId,const nlohmann::json &network,uint64_t memberId,const nlohmann::json &member) {
  48. try {
  49. if (network.is_object()) {
  50. if (memberId == 0) {
  51. for(auto db2=dbs.begin();db2!=dbs.end();++db2) {
  52. if (db->get() != db2->get()) {
  53. nlohmann::json nw2;
  54. if ((!(*db2)->get(networkId,nw2))||((nw2.is_object())&&(OSUtils::jsonInt(nw2["revision"],0) < OSUtils::jsonInt(network["revision"],0)))) {
  55. nw2 = network;
  56. (*db2)->save(nw2,false);
  57. }
  58. }
  59. }
  60. } else if (member.is_object()) {
  61. for(auto db2=dbs.begin();db2!=dbs.end();++db2) {
  62. if (db->get() != db2->get()) {
  63. nlohmann::json nw2,m2;
  64. if ((!(*db2)->get(networkId,nw2,memberId,m2))||((m2.is_object())&&(OSUtils::jsonInt(m2["revision"],0) < OSUtils::jsonInt(member["revision"],0)))) {
  65. m2 = member;
  66. (*db2)->save(m2,false);
  67. }
  68. }
  69. }
  70. }
  71. }
  72. } catch ( ... ) {} // skip entries that generate JSON errors
  73. });
  74. }
  75. }
  76. });
  77. }
  78. DBMirrorSet::~DBMirrorSet()
  79. {
  80. _running = false;
  81. _syncCheckerThread.join();
  82. }
  83. bool DBMirrorSet::hasNetwork(const uint64_t networkId) const
  84. {
  85. std::lock_guard<std::mutex> l(_dbs_l);
  86. for(auto d=_dbs.begin();d!=_dbs.end();++d) {
  87. if ((*d)->hasNetwork(networkId))
  88. return true;
  89. }
  90. return false;
  91. }
  92. bool DBMirrorSet::get(const uint64_t networkId,nlohmann::json &network)
  93. {
  94. std::lock_guard<std::mutex> l(_dbs_l);
  95. for(auto d=_dbs.begin();d!=_dbs.end();++d) {
  96. if ((*d)->get(networkId,network)) {
  97. return true;
  98. }
  99. }
  100. return false;
  101. }
  102. bool DBMirrorSet::get(const uint64_t networkId,nlohmann::json &network,const uint64_t memberId,nlohmann::json &member)
  103. {
  104. std::lock_guard<std::mutex> l(_dbs_l);
  105. for(auto d=_dbs.begin();d!=_dbs.end();++d) {
  106. if ((*d)->get(networkId,network,memberId,member))
  107. return true;
  108. }
  109. return false;
  110. }
  111. bool DBMirrorSet::get(const uint64_t networkId,nlohmann::json &network,const uint64_t memberId,nlohmann::json &member,DB::NetworkSummaryInfo &info)
  112. {
  113. std::lock_guard<std::mutex> l(_dbs_l);
  114. for(auto d=_dbs.begin();d!=_dbs.end();++d) {
  115. if ((*d)->get(networkId,network,memberId,member,info))
  116. return true;
  117. }
  118. return false;
  119. }
  120. bool DBMirrorSet::get(const uint64_t networkId,nlohmann::json &network,std::vector<nlohmann::json> &members)
  121. {
  122. std::lock_guard<std::mutex> l(_dbs_l);
  123. for(auto d=_dbs.begin();d!=_dbs.end();++d) {
  124. if ((*d)->get(networkId,network,members))
  125. return true;
  126. }
  127. return false;
  128. }
  129. void DBMirrorSet::networks(std::set<uint64_t> &networks)
  130. {
  131. std::lock_guard<std::mutex> l(_dbs_l);
  132. for(auto d=_dbs.begin();d!=_dbs.end();++d) {
  133. (*d)->networks(networks);
  134. }
  135. }
  136. bool DBMirrorSet::waitForReady()
  137. {
  138. bool r = false;
  139. std::lock_guard<std::mutex> l(_dbs_l);
  140. for(auto d=_dbs.begin();d!=_dbs.end();++d) {
  141. r |= (*d)->waitForReady();
  142. }
  143. return r;
  144. }
  145. bool DBMirrorSet::isReady()
  146. {
  147. std::lock_guard<std::mutex> l(_dbs_l);
  148. for(auto d=_dbs.begin();d!=_dbs.end();++d) {
  149. if (!(*d)->isReady())
  150. return false;
  151. }
  152. return true;
  153. }
  154. bool DBMirrorSet::save(nlohmann::json &record,bool notifyListeners)
  155. {
  156. std::vector< std::shared_ptr<DB> > dbs;
  157. {
  158. std::lock_guard<std::mutex> l(_dbs_l);
  159. dbs = _dbs;
  160. }
  161. if (notifyListeners) {
  162. for(auto d=dbs.begin();d!=dbs.end();++d) {
  163. if ((*d)->save(record,true))
  164. return true;
  165. }
  166. return false;
  167. } else {
  168. bool modified = false;
  169. for(auto d=dbs.begin();d!=dbs.end();++d) {
  170. modified |= (*d)->save(record,false);
  171. }
  172. return modified;
  173. }
  174. }
  175. void DBMirrorSet::eraseNetwork(const uint64_t networkId)
  176. {
  177. std::lock_guard<std::mutex> l(_dbs_l);
  178. for(auto d=_dbs.begin();d!=_dbs.end();++d) {
  179. (*d)->eraseNetwork(networkId);
  180. }
  181. }
  182. void DBMirrorSet::eraseMember(const uint64_t networkId,const uint64_t memberId)
  183. {
  184. std::lock_guard<std::mutex> l(_dbs_l);
  185. for(auto d=_dbs.begin();d!=_dbs.end();++d) {
  186. (*d)->eraseMember(networkId,memberId);
  187. }
  188. }
  189. void DBMirrorSet::nodeIsOnline(const uint64_t networkId,const uint64_t memberId,const InetAddress &physicalAddress)
  190. {
  191. std::lock_guard<std::mutex> l(_dbs_l);
  192. for(auto d=_dbs.begin();d!=_dbs.end();++d) {
  193. (*d)->nodeIsOnline(networkId,memberId,physicalAddress);
  194. }
  195. }
  196. void DBMirrorSet::onNetworkUpdate(const void *db,uint64_t networkId,const nlohmann::json &network)
  197. {
  198. nlohmann::json record(network);
  199. std::lock_guard<std::mutex> l(_dbs_l);
  200. for(auto d=_dbs.begin();d!=_dbs.end();++d) {
  201. if (d->get() != db) {
  202. (*d)->save(record,false);
  203. }
  204. }
  205. _listener->onNetworkUpdate(this,networkId,network);
  206. }
  207. void DBMirrorSet::onNetworkMemberUpdate(const void *db,uint64_t networkId,uint64_t memberId,const nlohmann::json &member)
  208. {
  209. nlohmann::json record(member);
  210. std::lock_guard<std::mutex> l(_dbs_l);
  211. for(auto d=_dbs.begin();d!=_dbs.end();++d) {
  212. if (d->get() != db) {
  213. (*d)->save(record,false);
  214. }
  215. }
  216. _listener->onNetworkMemberUpdate(this,networkId,memberId,member);
  217. }
  218. void DBMirrorSet::onNetworkMemberDeauthorize(const void *db,uint64_t networkId,uint64_t memberId)
  219. {
  220. _listener->onNetworkMemberDeauthorize(this,networkId,memberId);
  221. }
  222. } // namespace ZeroTier