DBMirrorSet.cpp 6.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240
  1. /*
  2. * ZeroTier One - Network Virtualization Everywhere
  3. * Copyright (C) 2011-2019 ZeroTier, Inc. https://www.zerotier.com/
  4. *
  5. * This program is free software: you can redistribute it and/or modify
  6. * it under the terms of the GNU General Public License as published by
  7. * the Free Software Foundation, either version 3 of the License, or
  8. * (at your option) any later version.
  9. *
  10. * This program is distributed in the hope that it will be useful,
  11. * but WITHOUT ANY WARRANTY; without even the implied warranty of
  12. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  13. * GNU General Public License for more details.
  14. *
  15. * You should have received a copy of the GNU General Public License
  16. * along with this program. If not, see <http://www.gnu.org/licenses/>.
  17. *
  18. * --
  19. *
  20. * You can be released from the requirements of the license by purchasing
  21. * a commercial license. Buying such a license is mandatory as soon as you
  22. * develop commercial closed-source software that incorporates or links
  23. * directly against ZeroTier software without disclosing the source code
  24. * of your own application.
  25. */
  26. #include "DBMirrorSet.hpp"
  27. namespace ZeroTier {
  28. DBMirrorSet::DBMirrorSet(DB::ChangeListener *listener) :
  29. _listener(listener),
  30. _running(true)
  31. {
  32. _syncCheckerThread = std::thread([this]() {
  33. for(;;) {
  34. for(int i=0;i<120;++i) { // 1 minute delay between checks
  35. if (!_running)
  36. return;
  37. std::this_thread::sleep_for(std::chrono::milliseconds(500));
  38. }
  39. std::vector< std::shared_ptr<DB> > dbs;
  40. {
  41. std::lock_guard<std::mutex> l(_dbs_l);
  42. if (_dbs.size() <= 1)
  43. continue; // no need to do this if there's only one DB, so skip the iteration
  44. dbs = _dbs;
  45. }
  46. for(auto db=dbs.begin();db!=dbs.end();++db) {
  47. (*db)->each([this,&dbs,&db](uint64_t networkId,const nlohmann::json &network,uint64_t memberId,const nlohmann::json &member) {
  48. if (memberId == 0) {
  49. for(auto db2=dbs.begin();db2!=dbs.end();++db2) {
  50. if (db->get() != db2->get()) {
  51. nlohmann::json nw2;
  52. if ((!(*db2)->get(networkId,nw2))||(OSUtils::jsonInt(nw2["revision"],0) < OSUtils::jsonInt(network["revision"],0))) {
  53. nw2 = network;
  54. (*db2)->save(nw2,false);
  55. }
  56. }
  57. }
  58. } else {
  59. for(auto db2=dbs.begin();db2!=dbs.end();++db2) {
  60. if (db->get() != db2->get()) {
  61. nlohmann::json nw2,m2;
  62. if ((!(*db2)->get(networkId,nw2,memberId,m2))||(OSUtils::jsonInt(nw2["revision"],0) < OSUtils::jsonInt(network["revision"],0))) {
  63. m2 = member;
  64. (*db2)->save(m2,false);
  65. }
  66. }
  67. }
  68. }
  69. });
  70. }
  71. }
  72. });
  73. }
  74. DBMirrorSet::~DBMirrorSet()
  75. {
  76. _running = false;
  77. _syncCheckerThread.join();
  78. }
  79. bool DBMirrorSet::hasNetwork(const uint64_t networkId) const
  80. {
  81. std::lock_guard<std::mutex> l(_dbs_l);
  82. for(auto d=_dbs.begin();d!=_dbs.end();++d) {
  83. if ((*d)->hasNetwork(networkId))
  84. return true;
  85. }
  86. return false;
  87. }
  88. bool DBMirrorSet::get(const uint64_t networkId,nlohmann::json &network)
  89. {
  90. std::lock_guard<std::mutex> l(_dbs_l);
  91. for(auto d=_dbs.begin();d!=_dbs.end();++d) {
  92. if ((*d)->get(networkId,network)) {
  93. return true;
  94. }
  95. }
  96. return false;
  97. }
  98. bool DBMirrorSet::get(const uint64_t networkId,nlohmann::json &network,const uint64_t memberId,nlohmann::json &member)
  99. {
  100. std::lock_guard<std::mutex> l(_dbs_l);
  101. for(auto d=_dbs.begin();d!=_dbs.end();++d) {
  102. if ((*d)->get(networkId,network,memberId,member))
  103. return true;
  104. }
  105. return false;
  106. }
  107. bool DBMirrorSet::get(const uint64_t networkId,nlohmann::json &network,const uint64_t memberId,nlohmann::json &member,DB::NetworkSummaryInfo &info)
  108. {
  109. std::lock_guard<std::mutex> l(_dbs_l);
  110. for(auto d=_dbs.begin();d!=_dbs.end();++d) {
  111. if ((*d)->get(networkId,network,memberId,member,info))
  112. return true;
  113. }
  114. return false;
  115. }
  116. bool DBMirrorSet::get(const uint64_t networkId,nlohmann::json &network,std::vector<nlohmann::json> &members)
  117. {
  118. std::lock_guard<std::mutex> l(_dbs_l);
  119. for(auto d=_dbs.begin();d!=_dbs.end();++d) {
  120. if ((*d)->get(networkId,network,members))
  121. return true;
  122. }
  123. return false;
  124. }
  125. void DBMirrorSet::networks(std::set<uint64_t> &networks)
  126. {
  127. std::lock_guard<std::mutex> l(_dbs_l);
  128. for(auto d=_dbs.begin();d!=_dbs.end();++d) {
  129. (*d)->networks(networks);
  130. }
  131. }
  132. bool DBMirrorSet::waitForReady()
  133. {
  134. bool r = false;
  135. std::lock_guard<std::mutex> l(_dbs_l);
  136. for(auto d=_dbs.begin();d!=_dbs.end();++d) {
  137. r |= (*d)->waitForReady();
  138. }
  139. return r;
  140. }
  141. bool DBMirrorSet::isReady()
  142. {
  143. std::lock_guard<std::mutex> l(_dbs_l);
  144. for(auto d=_dbs.begin();d!=_dbs.end();++d) {
  145. if (!(*d)->isReady())
  146. return false;
  147. }
  148. return true;
  149. }
  150. bool DBMirrorSet::save(nlohmann::json &record,bool notifyListeners)
  151. {
  152. std::vector< std::shared_ptr<DB> > dbs;
  153. {
  154. std::lock_guard<std::mutex> l(_dbs_l);
  155. dbs = _dbs;
  156. }
  157. if (notifyListeners) {
  158. for(auto d=dbs.begin();d!=dbs.end();++d) {
  159. if ((*d)->save(record,true))
  160. return true;
  161. }
  162. return false;
  163. } else {
  164. bool modified = false;
  165. for(auto d=dbs.begin();d!=dbs.end();++d) {
  166. modified |= (*d)->save(record,false);
  167. }
  168. return modified;
  169. }
  170. }
  171. void DBMirrorSet::eraseNetwork(const uint64_t networkId)
  172. {
  173. std::lock_guard<std::mutex> l(_dbs_l);
  174. for(auto d=_dbs.begin();d!=_dbs.end();++d) {
  175. (*d)->eraseNetwork(networkId);
  176. }
  177. }
  178. void DBMirrorSet::eraseMember(const uint64_t networkId,const uint64_t memberId)
  179. {
  180. std::lock_guard<std::mutex> l(_dbs_l);
  181. for(auto d=_dbs.begin();d!=_dbs.end();++d) {
  182. (*d)->eraseMember(networkId,memberId);
  183. }
  184. }
  185. void DBMirrorSet::nodeIsOnline(const uint64_t networkId,const uint64_t memberId,const InetAddress &physicalAddress)
  186. {
  187. std::lock_guard<std::mutex> l(_dbs_l);
  188. for(auto d=_dbs.begin();d!=_dbs.end();++d) {
  189. (*d)->nodeIsOnline(networkId,memberId,physicalAddress);
  190. }
  191. }
  192. void DBMirrorSet::onNetworkUpdate(const void *db,uint64_t networkId,const nlohmann::json &network)
  193. {
  194. nlohmann::json record(network);
  195. std::lock_guard<std::mutex> l(_dbs_l);
  196. for(auto d=_dbs.begin();d!=_dbs.end();++d) {
  197. if (d->get() != db) {
  198. (*d)->save(record,false);
  199. }
  200. }
  201. _listener->onNetworkUpdate(this,networkId,network);
  202. }
  203. void DBMirrorSet::onNetworkMemberUpdate(const void *db,uint64_t networkId,uint64_t memberId,const nlohmann::json &member)
  204. {
  205. nlohmann::json record(member);
  206. std::lock_guard<std::mutex> l(_dbs_l);
  207. for(auto d=_dbs.begin();d!=_dbs.end();++d) {
  208. if (d->get() != db) {
  209. (*d)->save(record,false);
  210. }
  211. }
  212. _listener->onNetworkMemberUpdate(this,networkId,memberId,member);
  213. }
  214. void DBMirrorSet::onNetworkMemberDeauthorize(const void *db,uint64_t networkId,uint64_t memberId)
  215. {
  216. _listener->onNetworkMemberDeauthorize(this,networkId,memberId);
  217. }
  218. } // namespace ZeroTier