RethinkDB.cpp 8.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308
  1. #include "RethinkDB.hpp"
  2. #include <chrono>
  3. #include <algorithm>
  4. #include <stdexcept>
  5. #include "../ext/librethinkdbxx/build/include/rethinkdb.h"
  6. namespace R = RethinkDB;
  7. using nlohmann::json;
  8. namespace ZeroTier {
  9. RethinkDB::RethinkDB(const Address &myAddress,const char *host,const int port,const char *db,const char *auth) :
  10. _myAddress(myAddress),
  11. _host(host ? host : "127.0.0.1"),
  12. _db(db),
  13. _auth(auth ? auth : ""),
  14. _port((port > 0) ? port : 28015),
  15. _ready(2), // two tables need to be synchronized before we're ready
  16. _run(1)
  17. {
  18. _readyLock.lock();
  19. {
  20. char tmp[32];
  21. _myAddress.toString(tmp);
  22. _myAddressStr = tmp;
  23. }
  24. _membersDbWatcher = std::thread([this]() {
  25. while (_run == 1) {
  26. try {
  27. auto rdb = R::connect(this->_host,this->_port,this->_auth);
  28. if (rdb) {
  29. _membersDbWatcherConnection = (void *)rdb.get();
  30. auto cur = R::db(this->_db).table("Member").get_all(this->_myAddressStr,R::optargs("index","controllerId")).changes(R::optargs("squash",0.1,"include_initial",true,"include_types",true,"include_states",true)).run(*rdb);
  31. while (cur.has_next()) {
  32. if (_run != 1) break;
  33. json tmp(json::parse(cur.next().as_json()));
  34. if ((tmp["type"] == "state")&&(tmp["state"] == "ready")) {
  35. if (--this->_ready == 0)
  36. this->_readyLock.unlock();
  37. } else {
  38. try {
  39. this->_memberChanged(tmp["old_val"],tmp["new_val"]);
  40. } catch ( ... ) {} // ignore bad records
  41. }
  42. }
  43. }
  44. } catch (std::exception &e) {
  45. fprintf(stderr,"ERROR: controller RethinkDB: %s" ZT_EOL_S,e.what());
  46. } catch (R::Error &e) {
  47. fprintf(stderr,"ERROR: controller RethinkDB: %s" ZT_EOL_S,e.message.c_str());
  48. } catch ( ... ) {
  49. fprintf(stderr,"ERROR: controller RethinkDB: unknown exception" ZT_EOL_S);
  50. }
  51. std::this_thread::sleep_for(std::chrono::milliseconds(250));
  52. }
  53. });
  54. _networksDbWatcher = std::thread([this]() {
  55. while (_run == 1) {
  56. try {
  57. auto rdb = R::connect(this->_host,this->_port,this->_auth);
  58. if (rdb) {
  59. _membersDbWatcherConnection = (void *)rdb.get();
  60. auto cur = R::db(this->_db).table("Network").get_all(this->_myAddressStr,R::optargs("index","controllerId")).changes(R::optargs("squash",0.1,"include_initial",true,"include_types",true,"include_states",true)).run(*rdb);
  61. while (cur.has_next()) {
  62. if (_run != 1) break;
  63. json tmp(json::parse(cur.next().as_json()));
  64. if ((tmp["type"] == "state")&&(tmp["state"] == "ready")) {
  65. if (--this->_ready == 0)
  66. this->_readyLock.unlock();
  67. } else {
  68. try {
  69. this->_networkChanged(tmp["old_val"],tmp["new_val"]);
  70. } catch ( ... ) {} // ignore bad records
  71. }
  72. }
  73. }
  74. } catch (std::exception &e) {
  75. fprintf(stderr,"ERROR: controller RethinkDB: %s" ZT_EOL_S,e.what());
  76. } catch (R::Error &e) {
  77. fprintf(stderr,"ERROR: controller RethinkDB: %s" ZT_EOL_S,e.message.c_str());
  78. } catch ( ... ) {
  79. fprintf(stderr,"ERROR: controller RethinkDB: unknown exception" ZT_EOL_S);
  80. }
  81. std::this_thread::sleep_for(std::chrono::milliseconds(250));
  82. }
  83. });
  84. }
  85. RethinkDB::~RethinkDB()
  86. {
  87. // FIXME: not totally safe but will generally work, and only happens on shutdown anyway
  88. _run = 0;
  89. std::this_thread::sleep_for(std::chrono::milliseconds(10));
  90. if (_membersDbWatcherConnection)
  91. ((R::Connection *)_membersDbWatcherConnection)->close();
  92. if (_networksDbWatcherConnection)
  93. ((R::Connection *)_networksDbWatcherConnection)->close();
  94. _membersDbWatcher.join();
  95. _networksDbWatcher.join();
  96. }
  97. inline bool RethinkDB::get(const uint64_t networkId,nlohmann::json &network)
  98. {
  99. std::shared_ptr<_Network> nw;
  100. {
  101. std::lock_guard<std::mutex> l(_networks_l);
  102. auto nwi = _networks.find(networkId);
  103. if (nwi == _networks.end())
  104. return false;
  105. nw = nwi->second;
  106. }
  107. std::lock_guard<std::mutex> l2(nw->lock);
  108. network = nw->config;
  109. return true;
  110. }
  111. inline bool RethinkDB::get(const uint64_t networkId,nlohmann::json &network,const uint64_t memberId,nlohmann::json &member,NetworkSummaryInfo &info)
  112. {
  113. std::shared_ptr<_Network> nw;
  114. {
  115. std::lock_guard<std::mutex> l(_networks_l);
  116. auto nwi = _networks.find(networkId);
  117. if (nwi == _networks.end())
  118. return false;
  119. nw = nwi->second;
  120. }
  121. std::lock_guard<std::mutex> l2(nw->lock);
  122. auto m = nw->members.find(memberId);
  123. if (m == nw->members.end())
  124. return false;
  125. network = nw->config;
  126. member = m->second;
  127. _fillSummaryInfo(nw,info);
  128. return true;
  129. }
  130. inline bool RethinkDB::get(const uint64_t networkId,nlohmann::json &network,std::vector<nlohmann::json> &members)
  131. {
  132. std::shared_ptr<_Network> nw;
  133. {
  134. std::lock_guard<std::mutex> l(_networks_l);
  135. auto nwi = _networks.find(networkId);
  136. if (nwi == _networks.end())
  137. return false;
  138. nw = nwi->second;
  139. }
  140. std::lock_guard<std::mutex> l2(nw->lock);
  141. network = nw->config;
  142. for(auto m=nw->members.begin();m!=nw->members.end();++m)
  143. members.push_back(m->second);
  144. return true;
  145. }
  146. inline bool RethinkDB::summary(const uint64_t networkId,NetworkSummaryInfo &info)
  147. {
  148. std::shared_ptr<_Network> nw;
  149. {
  150. std::lock_guard<std::mutex> l(_networks_l);
  151. auto nwi = _networks.find(networkId);
  152. if (nwi == _networks.end())
  153. return false;
  154. nw = nwi->second;
  155. }
  156. std::lock_guard<std::mutex> l2(nw->lock);
  157. _fillSummaryInfo(nw,info);
  158. return true;
  159. }
  160. void RethinkDB::_memberChanged(nlohmann::json &old,nlohmann::json &member)
  161. {
  162. uint64_t memberId = 0;
  163. uint64_t networkId = 0;
  164. std::shared_ptr<_Network> nw;
  165. if (old.is_object()) {
  166. json &config = old["config"];
  167. if (config.is_object()) {
  168. memberId = OSUtils::jsonIntHex(config["id"],0ULL);
  169. networkId = OSUtils::jsonIntHex(config["nwid"],0ULL);
  170. if ((memberId)&&(networkId)) {
  171. {
  172. std::lock_guard<std::mutex> l(_networks_l);
  173. auto nw2 = _networks.find(networkId);
  174. if (nw2 != _networks.end())
  175. nw = nw2->second;
  176. }
  177. if (nw) {
  178. std::lock_guard<std::mutex> l(nw->lock);
  179. if (OSUtils::jsonBool(config["activeBridge"],false))
  180. nw->activeBridgeMembers.erase(memberId);
  181. if (OSUtils::jsonBool(config["authorized"],false))
  182. nw->authorizedMembers.erase(memberId);
  183. json &ips = config["ipAssignments"];
  184. if (ips.is_array()) {
  185. for(unsigned long i=0;i<ips.size();++i) {
  186. json &ipj = ips[i];
  187. if (ipj.is_string()) {
  188. const std::string ips = ipj;
  189. InetAddress ipa(ips.c_str());
  190. ipa.setPort(0);
  191. nw->allocatedIps.erase(ipa);
  192. }
  193. }
  194. }
  195. }
  196. }
  197. }
  198. }
  199. if (member.is_object()) {
  200. json &config = member["config"];
  201. if (config.is_object()) {
  202. if (!nw) {
  203. memberId = OSUtils::jsonIntHex(config["id"],0ULL);
  204. networkId = OSUtils::jsonIntHex(config["nwid"],0ULL);
  205. if ((!memberId)||(!networkId))
  206. return;
  207. std::lock_guard<std::mutex> l(_networks_l);
  208. std::shared_ptr<_Network> &nw2 = _networks[networkId];
  209. if (!nw2)
  210. nw2.reset(new _Network);
  211. nw = nw2;
  212. }
  213. std::lock_guard<std::mutex> l(nw->lock);
  214. nw->members[memberId] = config;
  215. if (OSUtils::jsonBool(config["activeBridge"],false))
  216. nw->activeBridgeMembers.insert(memberId);
  217. const bool isAuth = OSUtils::jsonBool(config["authorized"],false);
  218. if (isAuth)
  219. nw->authorizedMembers.insert(memberId);
  220. json &ips = config["ipAssignments"];
  221. if (ips.is_array()) {
  222. for(unsigned long i=0;i<ips.size();++i) {
  223. json &ipj = ips[i];
  224. if (ipj.is_string()) {
  225. const std::string ips = ipj;
  226. InetAddress ipa(ips.c_str());
  227. ipa.setPort(0);
  228. nw->allocatedIps.insert(ipa);
  229. }
  230. }
  231. }
  232. if (!isAuth) {
  233. const int64_t ldt = (int64_t)OSUtils::jsonInt(config["lastDeauthorizedTime"],0ULL);
  234. if (ldt > nw->mostRecentDeauthTime)
  235. nw->mostRecentDeauthTime = ldt;
  236. }
  237. }
  238. }
  239. }
  240. void RethinkDB::_networkChanged(nlohmann::json &old,nlohmann::json &network)
  241. {
  242. if (network.is_object()) {
  243. json &config = network["config"];
  244. if (config.is_object()) {
  245. const std::string ids = config["id"];
  246. const uint64_t id = Utils::hexStrToU64(ids.c_str());
  247. if (id) {
  248. std::shared_ptr<_Network> nw;
  249. {
  250. std::lock_guard<std::mutex> l(_networks_l);
  251. std::shared_ptr<_Network> &nw2 = _networks[id];
  252. if (!nw2)
  253. nw2.reset(new _Network);
  254. nw = nw2;
  255. }
  256. std::lock_guard<std::mutex> l2(nw->lock);
  257. nw->config = config;
  258. }
  259. }
  260. } else if (old.is_object()) {
  261. const std::string ids = old["id"];
  262. const uint64_t id = Utils::hexStrToU64(ids.c_str());
  263. if (id) {
  264. std::lock_guard<std::mutex> l(_networks_l);
  265. _networks.erase(id);
  266. }
  267. }
  268. }
  269. } // namespace ZeroTier
  270. /*
  271. int main(int argc,char **argv)
  272. {
  273. ZeroTier::RethinkDB db(ZeroTier::Address(0x8056c2e21cULL),"10.6.6.188",28015,"ztc","");
  274. db.waitForReady();
  275. printf("ready.\n");
  276. pause();
  277. }
  278. */