PostgreSQL.cpp 42 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236123712381239124012411242124312441245124612471248124912501251125212531254125512561257125812591260126112621263126412651266126712681269127012711272127312741275127612771278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452
  1. /*
  2. * ZeroTier One - Network Virtualization Everywhere
  3. * Copyright (C) 2011-2019 ZeroTier, Inc. https://www.zerotier.com/
  4. *
  5. * This program is free software: you can redistribute it and/or modify
  6. * it under the terms of the GNU General Public License as published by
  7. * the Free Software Foundation, either version 3 of the License, or
  8. * (at your option) any later version.
  9. *
  10. * This program is distributed in the hope that it will be useful,
  11. * but WITHOUT ANY WARRANTY; without even the implied warranty of
  12. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  13. * GNU General Public License for more details.
  14. *
  15. * You should have received a copy of the GNU General Public License
  16. * along with this program. If not, see <http://www.gnu.org/licenses/>.
  17. *
  18. * --
  19. *
  20. * You can be released from the requirements of the license by purchasing
  21. * a commercial license. Buying such a license is mandatory as soon as you
  22. * develop commercial closed-source software that incorporates or links
  23. * directly against ZeroTier software without disclosing the source code
  24. * of your own application.
  25. */
  26. #ifdef ZT_CONTROLLER_USE_LIBPQ
  27. #include "PostgreSQL.hpp"
  28. #include "EmbeddedNetworkController.hpp"
  29. #include "RabbitMQ.hpp"
  30. #include "../version.h"
  31. #include <libpq-fe.h>
  32. #include <sstream>
  33. #include <amqp.h>
  34. #include <amqp_tcp_socket.h>
  35. using json = nlohmann::json;
  36. namespace {
  37. static const char *_timestr()
  38. {
  39. time_t t = time(0);
  40. char *ts = ctime(&t);
  41. char *p = ts;
  42. if (!p)
  43. return "";
  44. while (*p) {
  45. if (*p == '\n') {
  46. *p = (char)0;
  47. break;
  48. }
  49. ++p;
  50. }
  51. return ts;
  52. }
  53. std::string join(const std::vector<std::string> &elements, const char * const separator)
  54. {
  55. switch(elements.size()) {
  56. case 0:
  57. return "";
  58. case 1:
  59. return elements[0];
  60. default:
  61. std::ostringstream os;
  62. std::copy(elements.begin(), elements.end()-1, std::ostream_iterator<std::string>(os, separator));
  63. os << *elements.rbegin();
  64. return os.str();
  65. }
  66. }
  67. }
  68. using namespace ZeroTier;
  69. PostgreSQL::PostgreSQL(EmbeddedNetworkController *const nc, const Identity &myId, const char *path, int listenPort, MQConfig *mqc)
  70. : DB(nc, myId, path)
  71. , _ready(0)
  72. , _connected(1)
  73. , _run(1)
  74. , _waitNoticePrinted(false)
  75. , _listenPort(listenPort)
  76. , _mqc(mqc)
  77. {
  78. _connString = std::string(path) + " application_name=controller_" +_myAddressStr;
  79. _readyLock.lock();
  80. _heartbeatThread = std::thread(&PostgreSQL::heartbeat, this);
  81. _membersDbWatcher = std::thread(&PostgreSQL::membersDbWatcher, this);
  82. _networksDbWatcher = std::thread(&PostgreSQL::networksDbWatcher, this);
  83. for (int i = 0; i < ZT_CENTRAL_CONTROLLER_COMMIT_THREADS; ++i) {
  84. _commitThread[i] = std::thread(&PostgreSQL::commitThread, this);
  85. }
  86. _onlineNotificationThread = std::thread(&PostgreSQL::onlineNotificationThread, this);
  87. }
  88. PostgreSQL::~PostgreSQL()
  89. {
  90. _run = 0;
  91. std::this_thread::sleep_for(std::chrono::milliseconds(100));
  92. _heartbeatThread.join();
  93. _membersDbWatcher.join();
  94. _networksDbWatcher.join();
  95. for (int i = 0; i < ZT_CENTRAL_CONTROLLER_COMMIT_THREADS; ++i) {
  96. _commitThread[i].join();
  97. }
  98. _onlineNotificationThread.join();
  99. }
  100. bool PostgreSQL::waitForReady()
  101. {
  102. while (_ready < 2) {
  103. if (!_waitNoticePrinted) {
  104. _waitNoticePrinted = true;
  105. fprintf(stderr, "[%s] NOTICE: %.10llx controller PostgreSQL waiting for initial data download..." ZT_EOL_S, ::_timestr(), (unsigned long long)_myAddress.toInt());
  106. }
  107. _readyLock.lock();
  108. _readyLock.unlock();
  109. }
  110. return true;
  111. }
  112. bool PostgreSQL::isReady()
  113. {
  114. return ((_ready == 2)&&(_connected));
  115. }
  116. void PostgreSQL::save(nlohmann::json *orig, nlohmann::json &record)
  117. {
  118. try {
  119. if (!record.is_object()) {
  120. return;
  121. }
  122. waitForReady();
  123. if (orig) {
  124. if (*orig != record) {
  125. record["revision"] = OSUtils::jsonInt(record["revision"],0ULL) + 1;
  126. _commitQueue.post(new nlohmann::json(record));
  127. }
  128. } else {
  129. record["revision"] = 1;
  130. _commitQueue.post(new nlohmann::json(record));
  131. }
  132. } catch (std::exception &e) {
  133. fprintf(stderr, "Error on PostgreSQL::save: %s\n", e.what());
  134. } catch (...) {
  135. fprintf(stderr, "Unknown error on PostgreSQL::save\n");
  136. }
  137. }
  138. void PostgreSQL::eraseNetwork(const uint64_t networkId)
  139. {
  140. char tmp2[24];
  141. waitForReady();
  142. Utils::hex(networkId, tmp2);
  143. json *tmp = new json();
  144. (*tmp)["id"] = tmp2;
  145. (*tmp)["objtype"] = "_delete_network";
  146. _commitQueue.post(tmp);
  147. }
  148. void PostgreSQL::eraseMember(const uint64_t networkId, const uint64_t memberId)
  149. {
  150. char tmp2[24];
  151. json *tmp = new json();
  152. Utils::hex(networkId, tmp2);
  153. (*tmp)["nwid"] = tmp2;
  154. Utils::hex(memberId, tmp2);
  155. (*tmp)["id"] = tmp2;
  156. (*tmp)["objtype"] = "_delete_member";
  157. _commitQueue.post(tmp);
  158. }
  159. void PostgreSQL::nodeIsOnline(const uint64_t networkId, const uint64_t memberId, const InetAddress &physicalAddress)
  160. {
  161. std::lock_guard<std::mutex> l(_lastOnline_l);
  162. std::pair<int64_t, InetAddress> &i = _lastOnline[std::pair<uint64_t,uint64_t>(networkId, memberId)];
  163. i.first = OSUtils::now();
  164. if (physicalAddress) {
  165. i.second = physicalAddress;
  166. }
  167. }
  168. void PostgreSQL::initializeNetworks(PGconn *conn)
  169. {
  170. try {
  171. if (PQstatus(conn) != CONNECTION_OK) {
  172. fprintf(stderr, "Bad Database Connection: %s", PQerrorMessage(conn));
  173. exit(1);
  174. }
  175. const char *params[1] = {
  176. _myAddressStr.c_str()
  177. };
  178. PGresult *res = PQexecParams(conn, "SELECT id, EXTRACT(EPOCH FROM creation_time AT TIME ZONE 'UTC')*1000, capabilities, "
  179. "enable_broadcast, EXTRACT(EPOCH FROM last_modified AT TIME ZONE 'UTC')*1000, mtu, multicast_limit, name, private, remote_trace_level, "
  180. "remote_trace_target, revision, rules, tags, v4_assign_mode, v6_assign_mode FROM ztc_network "
  181. "WHERE deleted = false AND controller_id = $1",
  182. 1,
  183. NULL,
  184. params,
  185. NULL,
  186. NULL,
  187. 0);
  188. if (PQresultStatus(res) != PGRES_TUPLES_OK) {
  189. fprintf(stderr, "Networks Initialization Failed: %s", PQerrorMessage(conn));
  190. PQclear(res);
  191. exit(1);
  192. }
  193. int numRows = PQntuples(res);
  194. for (int i = 0; i < numRows; ++i) {
  195. json empty;
  196. json config;
  197. const char *nwidparam[1] = {
  198. PQgetvalue(res, i, 0)
  199. };
  200. config["id"] = PQgetvalue(res, i, 0);
  201. config["nwid"] = PQgetvalue(res, i, 0);
  202. try {
  203. config["creationTime"] = std::stoull(PQgetvalue(res, i, 1));
  204. } catch (std::exception &e) {
  205. config["creationTime"] = 0ULL;
  206. //fprintf(stderr, "Error converting creation time: %s\n", PQgetvalue(res, i, 1));
  207. }
  208. config["capabilities"] = json::parse(PQgetvalue(res, i, 2));
  209. config["enableBroadcast"] = (strcmp(PQgetvalue(res, i, 3),"t")==0);
  210. try {
  211. config["lastModified"] = std::stoull(PQgetvalue(res, i, 4));
  212. } catch (std::exception &e) {
  213. config["lastModified"] = 0ULL;
  214. //fprintf(stderr, "Error converting last modified: %s\n", PQgetvalue(res, i, 4));
  215. }
  216. try {
  217. config["mtu"] = std::stoi(PQgetvalue(res, i, 5));
  218. } catch (std::exception &e) {
  219. config["mtu"] = 2800;
  220. }
  221. try {
  222. config["multicastLimit"] = std::stoi(PQgetvalue(res, i, 6));
  223. } catch (std::exception &e) {
  224. config["multicastLimit"] = 64;
  225. }
  226. config["name"] = PQgetvalue(res, i, 7);
  227. config["private"] = (strcmp(PQgetvalue(res, i, 8),"t")==0);
  228. try {
  229. config["remoteTraceLevel"] = std::stoi(PQgetvalue(res, i, 9));
  230. } catch (std::exception &e) {
  231. config["remoteTraceLevel"] = 0;
  232. }
  233. config["remoteTraceTarget"] = PQgetvalue(res, i, 10);
  234. try {
  235. config["revision"] = std::stoull(PQgetvalue(res, i, 11));
  236. } catch (std::exception &e) {
  237. config["revision"] = 0ULL;
  238. //fprintf(stderr, "Error converting revision: %s\n", PQgetvalue(res, i, 11));
  239. }
  240. config["rules"] = json::parse(PQgetvalue(res, i, 12));
  241. config["tags"] = json::parse(PQgetvalue(res, i, 13));
  242. config["v4AssignMode"] = json::parse(PQgetvalue(res, i, 14));
  243. config["v6AssignMode"] = json::parse(PQgetvalue(res, i, 15));
  244. config["objtype"] = "network";
  245. config["ipAssignmentPools"] = json::array();
  246. config["routes"] = json::array();
  247. PGresult *r2 = PQexecParams(conn,
  248. "SELECT host(ip_range_start), host(ip_range_end) FROM ztc_network_assignment_pool WHERE network_id = $1",
  249. 1,
  250. NULL,
  251. nwidparam,
  252. NULL,
  253. NULL,
  254. 0);
  255. if (PQresultStatus(r2) != PGRES_TUPLES_OK) {
  256. fprintf(stderr, "ERROR: Error retreiving IP pools for network: %s\n", PQresultErrorMessage(r2));
  257. PQclear(r2);
  258. PQclear(res);
  259. exit(1);
  260. }
  261. int n = PQntuples(r2);
  262. for (int j = 0; j < n; ++j) {
  263. json ip;
  264. ip["ipRangeStart"] = PQgetvalue(r2, j, 0);
  265. ip["ipRangeEnd"] = PQgetvalue(r2, j, 1);
  266. config["ipAssignmentPools"].push_back(ip);
  267. }
  268. PQclear(r2);
  269. r2 = PQexecParams(conn,
  270. "SELECT host(address), bits, host(via) FROM ztc_network_route WHERE network_id = $1",
  271. 1,
  272. NULL,
  273. nwidparam,
  274. NULL,
  275. NULL,
  276. 0);
  277. if (PQresultStatus(r2) != PGRES_TUPLES_OK) {
  278. fprintf(stderr, "ERROR: Error retreiving routes for network: %s\n", PQresultErrorMessage(r2));
  279. PQclear(r2);
  280. PQclear(res);
  281. exit(1);
  282. }
  283. n = PQntuples(r2);
  284. for (int j = 0; j < n; ++j) {
  285. std::string addr = PQgetvalue(r2, j, 0);
  286. std::string bits = PQgetvalue(r2, j, 1);
  287. std::string via = PQgetvalue(r2, j, 2);
  288. json route;
  289. route["target"] = addr + "/" + bits;
  290. if (via == "NULL") {
  291. route["via"] = nullptr;
  292. } else {
  293. route["via"] = via;
  294. }
  295. config["routes"].push_back(route);
  296. }
  297. PQclear(r2);
  298. _networkChanged(empty, config, false);
  299. }
  300. PQclear(res);
  301. if (++this->_ready == 2) {
  302. if (_waitNoticePrinted) {
  303. fprintf(stderr,"[%s] NOTICE: %.10llx controller PostgreSQL data download complete." ZT_EOL_S,_timestr(),(unsigned long long)_myAddress.toInt());
  304. }
  305. _readyLock.unlock();
  306. }
  307. } catch (std::exception &e) {
  308. fprintf(stderr, "ERROR: Error initializing networks: %s", e.what());
  309. exit(-1);
  310. }
  311. }
  312. void PostgreSQL::initializeMembers(PGconn *conn)
  313. {
  314. try {
  315. if (PQstatus(conn) != CONNECTION_OK) {
  316. fprintf(stderr, "Bad Database Connection: %s", PQerrorMessage(conn));
  317. exit(1);
  318. }
  319. const char *params[1] = {
  320. _myAddressStr.c_str()
  321. };
  322. PGresult *res = PQexecParams(conn,
  323. "SELECT m.id, m.network_id, m.active_bridge, m.authorized, m.capabilities, EXTRACT(EPOCH FROM m.creation_time AT TIME ZONE 'UTC')*1000, m.identity, "
  324. " EXTRACT(EPOCH FROM m.last_authorized_time AT TIME ZONE 'UTC')*1000, "
  325. " EXTRACT(EPOCH FROM m.last_deauthorized_time AT TIME ZONE 'UTC')*1000, "
  326. " m.remote_trace_level, m.remote_trace_target, m.tags, m.v_major, m.v_minor, m.v_rev, m.v_proto, "
  327. " m.no_auto_assign_ips, m.revision "
  328. "FROM ztc_member m "
  329. "INNER JOIN ztc_network n "
  330. " ON n.id = m.network_id "
  331. "WHERE n.controller_id = $1 AND m.deleted = false",
  332. 1,
  333. NULL,
  334. params,
  335. NULL,
  336. NULL,
  337. 0);
  338. if (PQresultStatus(res) != PGRES_TUPLES_OK) {
  339. fprintf(stderr, "Member Initialization Failed: %s", PQerrorMessage(conn));
  340. PQclear(res);
  341. exit(1);
  342. }
  343. int numRows = PQntuples(res);
  344. for (int i = 0; i < numRows; ++i) {
  345. json empty;
  346. json config;
  347. std::string memberId(PQgetvalue(res, i, 0));
  348. std::string networkId(PQgetvalue(res, i, 1));
  349. std::string ctime = PQgetvalue(res, i, 5);
  350. config["id"] = memberId;
  351. config["nwid"] = networkId;
  352. config["activeBridge"] = (strcmp(PQgetvalue(res, i, 2), "t") == 0);
  353. config["authorized"] = (strcmp(PQgetvalue(res, i, 3), "t") == 0);
  354. try {
  355. config["capabilities"] = json::parse(PQgetvalue(res, i, 4));
  356. } catch (std::exception &e) {
  357. config["capabilities"] = json::array();
  358. }
  359. try {
  360. config["creationTime"] = std::stoull(PQgetvalue(res, i, 5));
  361. } catch (std::exception &e) {
  362. config["creationTime"] = 0ULL;
  363. //fprintf(stderr, "Error upding creation time (member): %s\n", PQgetvalue(res, i, 5));
  364. }
  365. config["identity"] = PQgetvalue(res, i, 6);
  366. try {
  367. config["lastAuthorizedTime"] = std::stoull(PQgetvalue(res, i, 7));
  368. } catch(std::exception &e) {
  369. config["lastAuthorizedTime"] = 0ULL;
  370. //fprintf(stderr, "Error updating last auth time (member): %s\n", PQgetvalue(res, i, 7));
  371. }
  372. try {
  373. config["lastDeauthorizedTime"] = std::stoull(PQgetvalue(res, i, 8));
  374. } catch( std::exception &e) {
  375. config["lastDeauthorizedTime"] = 0ULL;
  376. //fprintf(stderr, "Error updating last deauth time (member): %s\n", PQgetvalue(res, i, 8));
  377. }
  378. try {
  379. config["remoteTraceLevel"] = std::stoi(PQgetvalue(res, i, 9));
  380. } catch (std::exception &e) {
  381. config["remoteTraceLevel"] = 0;
  382. }
  383. config["remoteTraceTarget"] = PQgetvalue(res, i, 10);
  384. try {
  385. config["tags"] = json::parse(PQgetvalue(res, i, 11));
  386. } catch (std::exception &e) {
  387. config["tags"] = json::array();
  388. }
  389. try {
  390. config["vMajor"] = std::stoi(PQgetvalue(res, i, 12));
  391. } catch(std::exception &e) {
  392. config["vMajor"] = -1;
  393. }
  394. try {
  395. config["vMinor"] = std::stoi(PQgetvalue(res, i, 13));
  396. } catch (std::exception &e) {
  397. config["vMinor"] = -1;
  398. }
  399. try {
  400. config["vRev"] = std::stoi(PQgetvalue(res, i, 14));
  401. } catch (std::exception &e) {
  402. config["vRev"] = -1;
  403. }
  404. try {
  405. config["vProto"] = std::stoi(PQgetvalue(res, i, 15));
  406. } catch (std::exception &e) {
  407. config["vProto"] = -1;
  408. }
  409. config["noAutoAssignIps"] = (strcmp(PQgetvalue(res, i, 16), "t") == 0);
  410. try {
  411. config["revision"] = std::stoull(PQgetvalue(res, i, 17));
  412. } catch (std::exception &e) {
  413. config["revision"] = 0ULL;
  414. //fprintf(stderr, "Error updating revision (member): %s\n", PQgetvalue(res, i, 17));
  415. }
  416. config["objtype"] = "member";
  417. config["ipAssignments"] = json::array();
  418. const char *p2[2] = {
  419. memberId.c_str(),
  420. networkId.c_str()
  421. };
  422. PGresult *r2 = PQexecParams(conn,
  423. "SELECT DISTINCT address FROM ztc_member_ip_assignment WHERE member_id = $1 AND network_id = $2",
  424. 2,
  425. NULL,
  426. p2,
  427. NULL,
  428. NULL,
  429. 0);
  430. if (PQresultStatus(r2) != PGRES_TUPLES_OK) {
  431. fprintf(stderr, "Member Initialization Failed: %s", PQerrorMessage(conn));
  432. PQclear(r2);
  433. PQclear(res);
  434. exit(1);
  435. }
  436. int n = PQntuples(r2);
  437. for (int j = 0; j < n; ++j) {
  438. config["ipAssignments"].push_back(PQgetvalue(r2, j, 0));
  439. }
  440. _memberChanged(empty, config, false);
  441. }
  442. PQclear(res);
  443. if (++this->_ready == 2) {
  444. if (_waitNoticePrinted) {
  445. fprintf(stderr,"[%s] NOTICE: %.10llx controller PostgreSQL data download complete." ZT_EOL_S,_timestr(),(unsigned long long)_myAddress.toInt());
  446. }
  447. _readyLock.unlock();
  448. }
  449. } catch (std::exception &e) {
  450. fprintf(stderr, "ERROR: Error initializing members: %s\n", e.what());
  451. exit(-1);
  452. }
  453. }
  454. void PostgreSQL::heartbeat()
  455. {
  456. char publicId[1024];
  457. char hostnameTmp[1024];
  458. _myId.toString(false,publicId);
  459. if (gethostname(hostnameTmp, sizeof(hostnameTmp))!= 0) {
  460. hostnameTmp[0] = (char)0;
  461. } else {
  462. for (int i = 0; i < sizeof(hostnameTmp); ++i) {
  463. if ((hostnameTmp[i] == '.')||(hostnameTmp[i] == 0)) {
  464. hostnameTmp[i] = (char)0;
  465. break;
  466. }
  467. }
  468. }
  469. const char *controllerId = _myAddressStr.c_str();
  470. const char *publicIdentity = publicId;
  471. const char *hostname = hostnameTmp;
  472. PGconn *conn = getPgConn();
  473. if (PQstatus(conn) == CONNECTION_BAD) {
  474. fprintf(stderr, "Connection to database failed: %s\n", PQerrorMessage(conn));
  475. PQfinish(conn);
  476. exit(1);
  477. }
  478. while (_run == 1) {
  479. if(PQstatus(conn) != CONNECTION_OK) {
  480. fprintf(stderr, "%s heartbeat thread lost connection to Database\n", _myAddressStr.c_str());
  481. PQfinish(conn);
  482. exit(6);
  483. }
  484. if (conn) {
  485. std::string major = std::to_string(ZEROTIER_ONE_VERSION_MAJOR);
  486. std::string minor = std::to_string(ZEROTIER_ONE_VERSION_MINOR);
  487. std::string rev = std::to_string(ZEROTIER_ONE_VERSION_REVISION);
  488. std::string build = std::to_string(ZEROTIER_ONE_VERSION_BUILD);
  489. std::string now = std::to_string(OSUtils::now());
  490. std::string host_port = std::to_string(_listenPort);
  491. const char *values[9] = {
  492. controllerId,
  493. hostname,
  494. now.c_str(),
  495. publicIdentity,
  496. major.c_str(),
  497. minor.c_str(),
  498. rev.c_str(),
  499. build.c_str(),
  500. host_port.c_str()
  501. };
  502. PGresult *res = PQexecParams(conn,
  503. "INSERT INTO ztc_controller (id, cluster_host, last_alive, public_identity, v_major, v_minor, v_rev, v_build, host_port) "
  504. "VALUES ($1, $2, TO_TIMESTAMP($3::double precision/1000), $4, $5, $6, $7, $8, $9) "
  505. "ON CONFLICT (id) DO UPDATE SET cluster_host = EXCLUDED.cluster_host, last_alive = EXCLUDED.last_alive, "
  506. "public_identity = EXCLUDED.public_identity, v_major = EXCLUDED.v_major, v_minor = EXCLUDED.v_minor, "
  507. "v_rev = EXCLUDED.v_rev, v_build = EXCLUDED.v_rev, host_port = EXCLUDED.host_port",
  508. 9, // number of parameters
  509. NULL, // oid field. ignore
  510. values, // values for substitution
  511. NULL, // lengths in bytes of each value
  512. NULL, // binary?
  513. 0);
  514. if (PQresultStatus(res) != PGRES_COMMAND_OK) {
  515. fprintf(stderr, "Heartbeat Update Failed: %s\n", PQresultErrorMessage(res));
  516. }
  517. PQclear(res);
  518. }
  519. std::this_thread::sleep_for(std::chrono::milliseconds(1000));
  520. }
  521. PQfinish(conn);
  522. conn = NULL;
  523. }
  524. void PostgreSQL::membersDbWatcher()
  525. {
  526. PGconn *conn = getPgConn(NO_OVERRIDE);
  527. if (PQstatus(conn) == CONNECTION_BAD) {
  528. fprintf(stderr, "Connection to database failed: %s\n", PQerrorMessage(conn));
  529. PQfinish(conn);
  530. exit(1);
  531. }
  532. initializeMembers(conn);
  533. char buf[11] = {0};
  534. std::string cmd = "LISTEN member_" + std::string(_myAddress.toString(buf));
  535. PGresult *res = PQexec(conn, cmd.c_str());
  536. if (!res || PQresultStatus(res) != PGRES_COMMAND_OK) {
  537. fprintf(stderr, "LISTEN command failed: %s\n", PQresultErrorMessage(res));
  538. PQclear(res);
  539. PQfinish(conn);
  540. exit(1);
  541. }
  542. PQclear(res); res = NULL;
  543. if (this->_mqc != NULL) {
  544. _membersWatcher_RabbitMQ();
  545. } else {
  546. _membersWatcher_Postgres(conn);
  547. PQfinish(conn);
  548. conn = NULL;
  549. }
  550. if (_run == 1) {
  551. fprintf(stderr, "ERROR: %s membersDbWatcher should still be running! Exiting Controller.\n", _myAddressStr.c_str());
  552. exit(9);
  553. }
  554. }
  555. void PostgreSQL::_membersWatcher_Postgres(PGconn *conn) {
  556. while(_run == 1) {
  557. if (PQstatus(conn) != CONNECTION_OK) {
  558. fprintf(stderr, "ERROR: Member Watcher lost connection to Postgres.");
  559. exit(-1);
  560. }
  561. PGnotify *notify = NULL;
  562. PQconsumeInput(conn);
  563. while ((notify = PQnotifies(conn)) != NULL) {
  564. //fprintf(stderr, "ASYNC NOTIFY of '%s' id:%s received\n", notify->relname, notify->extra);
  565. try {
  566. json tmp(json::parse(notify->extra));
  567. json &ov = tmp["old_val"];
  568. json &nv = tmp["new_val"];
  569. json oldConfig, newConfig;
  570. if (ov.is_object()) oldConfig = ov;
  571. if (nv.is_object()) newConfig = nv;
  572. if (oldConfig.is_object() || newConfig.is_object()) {
  573. _memberChanged(oldConfig,newConfig,(this->_ready>=2));
  574. }
  575. } catch (...) {} // ignore bad records
  576. free(notify);
  577. }
  578. std::this_thread::sleep_for(std::chrono::milliseconds(10));
  579. }
  580. }
  581. void PostgreSQL::_membersWatcher_RabbitMQ() {
  582. char buf[11] = {0};
  583. std::string qname = "member_"+ std::string(_myAddress.toString(buf));
  584. RabbitMQ rmq(_mqc, qname.c_str());
  585. try {
  586. rmq.init();
  587. } catch (std::runtime_error &e) {
  588. fprintf(stderr, "RABBITMQ ERROR: %s\n", e.what());
  589. exit(11);
  590. }
  591. while (_run == 1) {
  592. try {
  593. std::string msg = rmq.consume();
  594. json tmp(json::parse(msg));
  595. json &ov = tmp["old_val"];
  596. json &nv = tmp["new_val"];
  597. json oldConfig, newConfig;
  598. if (ov.is_object()) oldConfig = ov;
  599. if (nv.is_object()) newConfig = nv;
  600. if (oldConfig.is_object() || newConfig.is_object()) {
  601. _memberChanged(oldConfig,newConfig,(this->_ready>=2));
  602. }
  603. } catch (std::runtime_error &e) {
  604. fprintf(stderr, "RABBITMQ ERROR: %s\n", e.what());
  605. break;
  606. } catch(...) {}
  607. }
  608. }
  609. void PostgreSQL::networksDbWatcher()
  610. {
  611. PGconn *conn = getPgConn(NO_OVERRIDE);
  612. if (PQstatus(conn) == CONNECTION_BAD) {
  613. fprintf(stderr, "Connection to database failed: %s\n", PQerrorMessage(conn));
  614. PQfinish(conn);
  615. exit(1);
  616. }
  617. initializeNetworks(conn);
  618. char buf[11] = {0};
  619. std::string cmd = "LISTEN network_" + std::string(_myAddress.toString(buf));
  620. PGresult *res = PQexec(conn, cmd.c_str());
  621. if (!res || PQresultStatus(res) != PGRES_COMMAND_OK) {
  622. fprintf(stderr, "LISTEN command failed: %s\n", PQresultErrorMessage(res));
  623. PQclear(res);
  624. PQfinish(conn);
  625. exit(1);
  626. }
  627. PQclear(res); res = NULL;
  628. if (this->_mqc != NULL) {
  629. _networksWatcher_RabbitMQ();
  630. } else {
  631. _networksWatcher_Postgres(conn);
  632. PQfinish(conn);
  633. conn = NULL;
  634. }
  635. if (_run == 1) {
  636. fprintf(stderr, "ERROR: %s networksDbWatcher should still be running! Exiting Controller.\n", _myAddressStr.c_str());
  637. exit(8);
  638. }
  639. }
  640. void PostgreSQL::_networksWatcher_Postgres(PGconn *conn) {
  641. while(_run == 1) {
  642. if (PQstatus(conn) != CONNECTION_OK) {
  643. fprintf(stderr, "ERROR: Network Watcher lost connection to Postgres.");
  644. exit(-1);
  645. }
  646. PGnotify *notify = NULL;
  647. PQconsumeInput(conn);
  648. while ((notify = PQnotifies(conn)) != NULL) {
  649. //fprintf(stderr, "ASYNC NOTIFY of '%s' id:%s received\n", notify->relname, notify->extra);
  650. try {
  651. json tmp(json::parse(notify->extra));
  652. json &ov = tmp["old_val"];
  653. json &nv = tmp["new_val"];
  654. json oldConfig, newConfig;
  655. if (ov.is_object()) oldConfig = ov;
  656. if (nv.is_object()) newConfig = nv;
  657. if (oldConfig.is_object()||newConfig.is_object()) {
  658. _networkChanged(oldConfig,newConfig,(this->_ready >= 2));
  659. }
  660. } catch (...) {} // ignore bad records
  661. free(notify);
  662. }
  663. std::this_thread::sleep_for(std::chrono::milliseconds(10));
  664. }
  665. }
  666. void PostgreSQL::_networksWatcher_RabbitMQ() {
  667. char buf[11] = {0};
  668. std::string qname = "network_"+ std::string(_myAddress.toString(buf));
  669. RabbitMQ rmq(_mqc, qname.c_str());
  670. try {
  671. rmq.init();
  672. } catch (std::runtime_error &e) {
  673. fprintf(stderr, "RABBITMQ ERROR: %s\n", e.what());
  674. exit(11);
  675. }
  676. while (_run == 1) {
  677. try {
  678. std::string msg = rmq.consume();
  679. json tmp(json::parse(msg));
  680. json &ov = tmp["old_val"];
  681. json &nv = tmp["new_val"];
  682. json oldConfig, newConfig;
  683. if (ov.is_object()) oldConfig = ov;
  684. if (nv.is_object()) newConfig = nv;
  685. if (oldConfig.is_object()||newConfig.is_object()) {
  686. _networkChanged(oldConfig,newConfig,(this->_ready >= 2));
  687. }
  688. } catch (std::runtime_error &e) {
  689. fprintf(stderr, "RABBITMQ ERROR: %s\n", e.what());
  690. break;
  691. } catch(...) {}
  692. }
  693. }
  694. void PostgreSQL::commitThread()
  695. {
  696. PGconn *conn = getPgConn();
  697. if (PQstatus(conn) == CONNECTION_BAD) {
  698. fprintf(stderr, "ERROR: Connection to database failed: %s\n", PQerrorMessage(conn));
  699. PQfinish(conn);
  700. exit(1);
  701. }
  702. json *config = nullptr;
  703. while(_commitQueue.get(config)&(_run == 1)) {
  704. if (!config) {
  705. continue;
  706. }
  707. if (PQstatus(conn) == CONNECTION_BAD) {
  708. fprintf(stderr, "ERROR: Connection to database failed: %s\n", PQerrorMessage(conn));
  709. PQfinish(conn);
  710. delete config;
  711. exit(1);
  712. }
  713. try {
  714. const std::string objtype = (*config)["objtype"];
  715. if (objtype == "member") {
  716. try {
  717. std::string memberId = (*config)["id"];
  718. std::string networkId = (*config)["nwid"];
  719. std::string identity = (*config)["identity"];
  720. std::string target = "NULL";
  721. if (!(*config)["remoteTraceTarget"].is_null()) {
  722. target = (*config)["remoteTraceTarget"];
  723. }
  724. std::string caps = OSUtils::jsonDump((*config)["capabilities"], -1);
  725. std::string lastAuthTime = std::to_string((long long)(*config)["lastAuthorizedTime"]);
  726. std::string lastDeauthTime = std::to_string((long long)(*config)["lastDeauthorizedTime"]);
  727. std::string rtraceLevel = std::to_string((int)(*config)["remoteTraceLevel"]);
  728. std::string rev = std::to_string((unsigned long long)(*config)["revision"]);
  729. std::string tags = OSUtils::jsonDump((*config)["tags"], -1);
  730. std::string vmajor = std::to_string((int)(*config)["vMajor"]);
  731. std::string vminor = std::to_string((int)(*config)["vMinor"]);
  732. std::string vrev = std::to_string((int)(*config)["vRev"]);
  733. std::string vproto = std::to_string((int)(*config)["vProto"]);
  734. const char *values[19] = {
  735. memberId.c_str(),
  736. networkId.c_str(),
  737. ((*config)["activeBridge"] ? "true" : "false"),
  738. ((*config)["authorized"] ? "true" : "false"),
  739. caps.c_str(),
  740. identity.c_str(),
  741. lastAuthTime.c_str(),
  742. lastDeauthTime.c_str(),
  743. ((*config)["noAutoAssignIps"] ? "true" : "false"),
  744. rtraceLevel.c_str(),
  745. (target == "NULL") ? NULL : target.c_str(),
  746. rev.c_str(),
  747. tags.c_str(),
  748. vmajor.c_str(),
  749. vminor.c_str(),
  750. vrev.c_str(),
  751. vproto.c_str()
  752. };
  753. PGresult *res = PQexecParams(conn,
  754. "INSERT INTO ztc_member (id, network_id, active_bridge, authorized, capabilities, "
  755. "identity, last_authorized_time, last_deauthorized_time, no_auto_assign_ips, "
  756. "remote_trace_level, remote_trace_target, revision, tags, v_major, v_minor, v_rev, v_proto) "
  757. "VALUES ($1, $2, $3, $4, $5, $6, "
  758. "TO_TIMESTAMP($7::double precision/1000), TO_TIMESTAMP($8::double precision/1000), "
  759. "$9, $10, $11, $12, $13, $14, $15, $16, $17) ON CONFLICT (network_id, id) DO UPDATE SET "
  760. "active_bridge = EXCLUDED.active_bridge, authorized = EXCLUDED.authorized, capabilities = EXCLUDED.capabilities, "
  761. "identity = EXCLUDED.identity, last_authorized_time = EXCLUDED.last_authorized_time, "
  762. "last_deauthorized_time = EXCLUDED.last_deauthorized_time, no_auto_assign_ips = EXCLUDED.no_auto_assign_ips, "
  763. "remote_trace_level = EXCLUDED.remote_trace_level, remote_trace_target = EXCLUDED.remote_trace_target, "
  764. "revision = EXCLUDED.revision+1, tags = EXCLUDED.tags, v_major = EXCLUDED.v_major, "
  765. "v_minor = EXCLUDED.v_minor, v_rev = EXCLUDED.v_rev, v_proto = EXCLUDED.v_proto",
  766. 17,
  767. NULL,
  768. values,
  769. NULL,
  770. NULL,
  771. 0);
  772. if (PQresultStatus(res) != PGRES_COMMAND_OK) {
  773. fprintf(stderr, "ERROR: Error updating member: %s\n", PQresultErrorMessage(res));
  774. fprintf(stderr, "%s", OSUtils::jsonDump(*config, 2).c_str());
  775. PQclear(res);
  776. delete config;
  777. config = nullptr;
  778. continue;
  779. }
  780. PQclear(res);
  781. res = PQexec(conn, "BEGIN");
  782. if (PQresultStatus(res) != PGRES_COMMAND_OK) {
  783. fprintf(stderr, "ERROR: Error beginning transaction: %s\n", PQresultErrorMessage(res));
  784. PQclear(res);
  785. delete config;
  786. config = nullptr;
  787. continue;
  788. }
  789. PQclear(res);
  790. const char *v2[2] = {
  791. memberId.c_str(),
  792. networkId.c_str()
  793. };
  794. res = PQexecParams(conn,
  795. "DELETE FROM ztc_member_ip_assignment WHERE member_id = $1 AND network_id = $2",
  796. 2,
  797. NULL,
  798. v2,
  799. NULL,
  800. NULL,
  801. 0);
  802. if (PQresultStatus(res) != PGRES_COMMAND_OK) {
  803. fprintf(stderr, "ERROR: Error updating IP address assignments: %s\n", PQresultErrorMessage(res));
  804. PQclear(res);
  805. PQclear(PQexec(conn, "ROLLBACK"));;
  806. delete config;
  807. config = nullptr;
  808. continue;
  809. }
  810. PQclear(res);
  811. std::vector<std::string> assignments;
  812. for (auto i = (*config)["ipAssignments"].begin(); i != (*config)["ipAssignments"].end(); ++i) {
  813. std::string addr = *i;
  814. if (std::find(assignments.begin(), assignments.end(), addr) != assignments.end()) {
  815. continue;
  816. }
  817. const char *v3[3] = {
  818. memberId.c_str(),
  819. networkId.c_str(),
  820. addr.c_str()
  821. };
  822. res = PQexecParams(conn,
  823. "INSERT INTO ztc_member_ip_assignment (member_id, network_id, address) VALUES ($1, $2, $3)",
  824. 3,
  825. NULL,
  826. v3,
  827. NULL,
  828. NULL,
  829. 0);
  830. if (PQresultStatus(res) != PGRES_COMMAND_OK) {
  831. fprintf(stderr, "ERROR: Error setting IP addresses for member: %s\n", PQresultErrorMessage(res));
  832. PQclear(res);
  833. PQclear(PQexec(conn, "ROLLBACK"));
  834. break;;
  835. }
  836. }
  837. res = PQexec(conn, "COMMIT");
  838. if (PQresultStatus(res) != PGRES_COMMAND_OK) {
  839. fprintf(stderr, "ERROR: Error committing ip address data: %s\n", PQresultErrorMessage(res));
  840. }
  841. PQclear(res);
  842. const uint64_t nwidInt = OSUtils::jsonIntHex((*config)["nwid"], 0ULL);
  843. const uint64_t memberidInt = OSUtils::jsonIntHex((*config)["id"], 0ULL);
  844. if (nwidInt && memberidInt) {
  845. nlohmann::json nwOrig;
  846. nlohmann::json memOrig;
  847. nlohmann::json memNew(*config);
  848. get(nwidInt, nwOrig, memberidInt, memOrig);
  849. _memberChanged(memOrig, memNew, (this->_ready>=2));
  850. } else {
  851. fprintf(stderr, "Can't notify of change. Error parsing nwid or memberid: %lu-%lu\n", nwidInt, memberidInt);
  852. }
  853. } catch (std::exception &e) {
  854. fprintf(stderr, "ERROR: Error updating member: %s\n", e.what());
  855. }
  856. } else if (objtype == "network") {
  857. try {
  858. std::string id = (*config)["id"];
  859. std::string controllerId = _myAddressStr.c_str();
  860. std::string name = (*config)["name"];
  861. std::string remoteTraceTarget("NULL");
  862. if (!(*config)["remoteTraceTarget"].is_null()) {
  863. remoteTraceTarget = (*config)["remoteTraceTarget"];
  864. }
  865. std::string rulesSource = (*config)["rulesSource"];
  866. std::string caps = OSUtils::jsonDump((*config)["capabilitles"], -1);
  867. std::string now = std::to_string(OSUtils::now());
  868. std::string mtu = std::to_string((int)(*config)["mtu"]);
  869. std::string mcastLimit = std::to_string((int)(*config)["multicastLimit"]);
  870. std::string rtraceLevel = std::to_string((int)(*config)["remoteTraceLevel"]);
  871. std::string rules = OSUtils::jsonDump((*config)["rules"], -1);
  872. std::string tags = OSUtils::jsonDump((*config)["tags"], -1);
  873. std::string v4mode = OSUtils::jsonDump((*config)["v4AssignMode"],-1);
  874. std::string v6mode = OSUtils::jsonDump((*config)["v6AssignMode"], -1);
  875. bool enableBroadcast = (*config)["enableBroadcast"];
  876. bool isPrivate = (*config)["private"];
  877. const char *values[16] = {
  878. id.c_str(),
  879. controllerId.c_str(),
  880. caps.c_str(),
  881. enableBroadcast ? "true" : "false",
  882. now.c_str(),
  883. mtu.c_str(),
  884. mcastLimit.c_str(),
  885. name.c_str(),
  886. isPrivate ? "true" : "false",
  887. rtraceLevel.c_str(),
  888. (remoteTraceTarget == "NULL" ? NULL : remoteTraceTarget.c_str()),
  889. rules.c_str(),
  890. rulesSource.c_str(),
  891. tags.c_str(),
  892. v4mode.c_str(),
  893. v6mode.c_str(),
  894. };
  895. PGresult *res = PQexecParams(conn,
  896. "UPDATE ztc_network SET controller_id = $2, capabilities = $3, enable_broadcast = $4, "
  897. "last_updated = $5, mtu = $6, multicast_limit = $7, name = $8, private = $9, "
  898. "remote_trace_level = $10, remote_trace_target = $11, rules = $12, rules_source = $13, "
  899. "tags = $14, v4_assign_mode = $15, v6_assign_mode = $16 "
  900. "WHERE id = $1",
  901. 16,
  902. NULL,
  903. values,
  904. NULL,
  905. NULL,
  906. 0);
  907. if (PQresultStatus(res) != PGRES_COMMAND_OK) {
  908. fprintf(stderr, "ERROR: Error updating network record: %s\n", PQresultErrorMessage(res));
  909. PQclear(res);
  910. delete config;
  911. config = nullptr;
  912. continue;
  913. }
  914. PQclear(res);
  915. res = PQexec(conn, "BEGIN");
  916. if (PQresultStatus(res) != PGRES_COMMAND_OK) {
  917. fprintf(stderr, "ERROR: Error beginnning transaction: %s\n", PQresultErrorMessage(res));
  918. PQclear(res);
  919. delete config;
  920. config = nullptr;
  921. continue;
  922. }
  923. PQclear(res);
  924. const char *params[1] = {
  925. id.c_str()
  926. };
  927. res = PQexecParams(conn,
  928. "DELETE FROM ztc_network_assignment_pool WHERE network_id = $1",
  929. 1,
  930. NULL,
  931. params,
  932. NULL,
  933. NULL,
  934. 0);
  935. if (PQresultStatus(res) != PGRES_COMMAND_OK) {
  936. fprintf(stderr, "ERROR: Error updating assignment pool: %s\n", PQresultErrorMessage(res));
  937. PQclear(res);
  938. PQclear(PQexec(conn, "ROLLBACK"));
  939. delete config;
  940. config = nullptr;
  941. continue;
  942. }
  943. PQclear(res);
  944. auto pool = (*config)["ipAssignmentPools"];
  945. bool err = false;
  946. for (auto i = pool.begin(); i != pool.end(); ++i) {
  947. std::string start = (*i)["ipRangeStart"];
  948. std::string end = (*i)["ipRangeEnd"];
  949. const char *p[3] = {
  950. id.c_str(),
  951. start.c_str(),
  952. end.c_str()
  953. };
  954. res = PQexecParams(conn,
  955. "INSERT INTO ztc_network_assignment_pool (network_id, ip_range_start, ip_range_end) "
  956. "VALUES ($1, $2, $3)",
  957. 3,
  958. NULL,
  959. p,
  960. NULL,
  961. NULL,
  962. 0);
  963. if (PQresultStatus(res) != PGRES_COMMAND_OK) {
  964. fprintf(stderr, "ERROR: Error updating assignment pool: %s\n", PQresultErrorMessage(res));
  965. PQclear(res);
  966. err = true;
  967. break;
  968. }
  969. PQclear(res);
  970. }
  971. if (err) {
  972. PQclear(PQexec(conn, "ROLLBACK"));
  973. delete config;
  974. config = nullptr;
  975. continue;
  976. }
  977. res = PQexecParams(conn,
  978. "DELETE FROM ztc_network_route WHERE network_id = $1",
  979. 1,
  980. NULL,
  981. params,
  982. NULL,
  983. NULL,
  984. 0);
  985. if (PQresultStatus(res) != PGRES_COMMAND_OK) {
  986. fprintf(stderr, "ERROR: Error updating routes: %s\n", PQresultErrorMessage(res));
  987. PQclear(res);
  988. PQclear(PQexec(conn, "ROLLBACK"));
  989. delete config;
  990. config = nullptr;
  991. continue;
  992. }
  993. auto routes = (*config)["routes"];
  994. err = false;
  995. for (auto i = routes.begin(); i != routes.end(); ++i) {
  996. std::string t = (*i)["target"];
  997. std::vector<std::string> target;
  998. std::istringstream f(t);
  999. std::string s;
  1000. while(std::getline(f, s, '/')) {
  1001. target.push_back(s);
  1002. }
  1003. if (target.empty() || target.size() != 2) {
  1004. continue;
  1005. }
  1006. std::string targetAddr = target[0];
  1007. std::string targetBits = target[1];
  1008. std::string via = "NULL";
  1009. if (!(*i)["via"].is_null()) {
  1010. via = (*i)["via"];
  1011. }
  1012. const char *p[4] = {
  1013. id.c_str(),
  1014. targetAddr.c_str(),
  1015. targetBits.c_str(),
  1016. (via == "NULL" ? NULL : via.c_str()),
  1017. };
  1018. res = PQexecParams(conn,
  1019. "INSERT INTO ztc_network_route (network_id, address, bits, via) VALUES ($1, $2, $3, $4)",
  1020. 4,
  1021. NULL,
  1022. p,
  1023. NULL,
  1024. NULL,
  1025. 0);
  1026. if (PQresultStatus(res) != PGRES_COMMAND_OK) {
  1027. fprintf(stderr, "ERROR: Error updating routes: %s\n", PQresultErrorMessage(res));
  1028. PQclear(res);
  1029. err = true;
  1030. break;
  1031. }
  1032. PQclear(res);
  1033. }
  1034. if (err) {
  1035. PQclear(PQexec(conn, "ROLLBACK"));
  1036. delete config;
  1037. config = nullptr;
  1038. continue;
  1039. }
  1040. res = PQexec(conn, "COMMIT");
  1041. if (PQresultStatus(res) != PGRES_COMMAND_OK) {
  1042. fprintf(stderr, "ERROR: Error committing network update: %s\n", PQresultErrorMessage(res));
  1043. }
  1044. PQclear(res);
  1045. const uint64_t nwidInt = OSUtils::jsonIntHex((*config)["nwid"], 0ULL);
  1046. if (nwidInt) {
  1047. nlohmann::json nwOrig;
  1048. nlohmann::json nwNew(*config);
  1049. get(nwidInt, nwOrig);
  1050. _networkChanged(nwOrig, nwNew, true);
  1051. } else {
  1052. fprintf(stderr, "Can't notify network changed: %lu\n", nwidInt);
  1053. }
  1054. } catch (std::exception &e) {
  1055. fprintf(stderr, "ERROR: Error updating member: %s\n", e.what());
  1056. }
  1057. } else if (objtype == "trace") {
  1058. fprintf(stderr, "ERROR: Trace not yet implemented");
  1059. } else if (objtype == "_delete_network") {
  1060. try {
  1061. std::string networkId = (*config)["nwid"];
  1062. const char *values[1] = {
  1063. networkId.c_str()
  1064. };
  1065. PGresult * res = PQexecParams(conn,
  1066. "UPDATE ztc_network SET deleted = true WHERE id = $1",
  1067. 1,
  1068. NULL,
  1069. values,
  1070. NULL,
  1071. NULL,
  1072. 0);
  1073. if (PQresultStatus(res) != PGRES_COMMAND_OK) {
  1074. fprintf(stderr, "ERROR: Error deleting network: %s\n", PQresultErrorMessage(res));
  1075. }
  1076. PQclear(res);
  1077. } catch (std::exception &e) {
  1078. fprintf(stderr, "ERROR: Error deleting network: %s\n", e.what());
  1079. }
  1080. } else if (objtype == "_delete_member") {
  1081. try {
  1082. std::string memberId = (*config)["id"];
  1083. std::string networkId = (*config)["nwid"];
  1084. const char *values[2] = {
  1085. memberId.c_str(),
  1086. networkId.c_str()
  1087. };
  1088. PGresult *res = PQexecParams(conn,
  1089. "UPDATE ztc_member SET hidden = true, deleted = true WHERE id = $1 AND network_id = $2",
  1090. 2,
  1091. NULL,
  1092. values,
  1093. NULL,
  1094. NULL,
  1095. 0);
  1096. if (PQresultStatus(res) != PGRES_COMMAND_OK) {
  1097. fprintf(stderr, "ERROR: Error deleting member: %s\n", PQresultErrorMessage(res));
  1098. }
  1099. PQclear(res);
  1100. } catch (std::exception &e) {
  1101. fprintf(stderr, "ERROR: Error deleting member: %s\n", e.what());
  1102. }
  1103. } else {
  1104. fprintf(stderr, "ERROR: unknown objtype");
  1105. }
  1106. } catch (std::exception &e) {
  1107. fprintf(stderr, "ERROR: Error getting objtype: %s\n", e.what());
  1108. }
  1109. delete config;
  1110. config = nullptr;
  1111. std::this_thread::sleep_for(std::chrono::milliseconds(10));
  1112. }
  1113. PQfinish(conn);
  1114. if (_run == 1) {
  1115. fprintf(stderr, "ERROR: %s commitThread should still be running! Exiting Controller.\n", _myAddressStr.c_str());
  1116. exit(7);
  1117. }
  1118. }
  1119. void PostgreSQL::onlineNotificationThread()
  1120. {
  1121. PGconn *conn = getPgConn();
  1122. if (PQstatus(conn) == CONNECTION_BAD) {
  1123. fprintf(stderr, "Connection to database failed: %s\n", PQerrorMessage(conn));
  1124. PQfinish(conn);
  1125. exit(1);
  1126. }
  1127. _connected = 1;
  1128. int64_t lastUpdatedNetworkStatus = 0;
  1129. std::unordered_map< std::pair<uint64_t,uint64_t>,int64_t,_PairHasher > lastOnlineCumulative;
  1130. while (_run == 1) {
  1131. if (PQstatus(conn) != CONNECTION_OK) {
  1132. fprintf(stderr, "ERROR: Online Notification thread lost connection to Postgres.");
  1133. PQfinish(conn);
  1134. exit(5);
  1135. }
  1136. // map used to send notifications to front end
  1137. std::unordered_map<std::string, std::vector<std::string>> updateMap;
  1138. std::unordered_map< std::pair<uint64_t,uint64_t>,std::pair<int64_t,InetAddress>,_PairHasher > lastOnline;
  1139. {
  1140. std::lock_guard<std::mutex> l(_lastOnline_l);
  1141. lastOnline.swap(_lastOnline);
  1142. }
  1143. PGresult *res = NULL;
  1144. for (auto i=lastOnline.begin(); i != lastOnline.end(); ++i) {
  1145. uint64_t nwid_i = i->first.first;
  1146. char nwidTmp[64];
  1147. char memTmp[64];
  1148. char ipTmp[64];
  1149. OSUtils::ztsnprintf(nwidTmp,sizeof(nwidTmp), "%.16llx", nwid_i);
  1150. OSUtils::ztsnprintf(memTmp,sizeof(memTmp), "%.10llx", i->first.second);
  1151. auto found = _networks.find(nwid_i);
  1152. if (found == _networks.end()) {
  1153. continue; // skip members trying to join non-existant networks
  1154. }
  1155. lastOnlineCumulative[i->first] = i->second.first;
  1156. std::string networkId(nwidTmp);
  1157. std::string memberId(memTmp);
  1158. std::vector<std::string> &members = updateMap[networkId];
  1159. members.push_back(memberId);
  1160. int64_t ts = i->second.first;
  1161. std::string ipAddr = i->second.second.toIpString(ipTmp);
  1162. std::string timestamp = std::to_string(ts);
  1163. const char *values[4] = {
  1164. networkId.c_str(),
  1165. memberId.c_str(),
  1166. (ipAddr.empty() ? NULL : ipAddr.c_str()),
  1167. timestamp.c_str(),
  1168. };
  1169. res = PQexecParams(conn,
  1170. "INSERT INTO ztc_member_status (network_id, member_id, address, last_updated) VALUES ($1, $2, $3, TO_TIMESTAMP($4::double precision/1000)) "
  1171. "ON CONFLICT (network_id, member_id) DO UPDATE SET address = EXCLUDED.address, last_updated = EXCLUDED.last_updated",
  1172. 4, // number of parameters
  1173. NULL, // oid field. ignore
  1174. values, // values for substitution
  1175. NULL, // lengths in bytes of each value
  1176. NULL,
  1177. 0);
  1178. if (PQresultStatus(res) != PGRES_COMMAND_OK) {
  1179. fprintf(stderr, "Error on Member Status upsert: %s\n", PQresultErrorMessage(res));
  1180. PQclear(res);
  1181. continue;
  1182. }
  1183. PQclear(res);
  1184. }
  1185. const int64_t now = OSUtils::now();
  1186. if ((now - lastUpdatedNetworkStatus) > 10000) {
  1187. lastUpdatedNetworkStatus = now;
  1188. std::vector<std::pair<uint64_t, std::shared_ptr<_Network>>> networks;
  1189. {
  1190. std::lock_guard<std::mutex> l(_networks_l);
  1191. for (auto i = _networks.begin(); i != _networks.end(); ++i) {
  1192. networks.push_back(*i);
  1193. }
  1194. }
  1195. for (auto i = networks.begin(); i != networks.end(); ++i) {
  1196. char tmp[64];
  1197. Utils::hex(i->first, tmp);
  1198. std::string networkId(tmp);
  1199. std::vector<std::string> &_notUsed = updateMap[networkId];
  1200. (void)_notUsed;
  1201. uint64_t authMemberCount = 0;
  1202. uint64_t totalMemberCount = 0;
  1203. uint64_t onlineMemberCount = 0;
  1204. uint64_t bridgeCount = 0;
  1205. uint64_t ts = now;
  1206. {
  1207. std::lock_guard<std::mutex> l2(i->second->lock);
  1208. authMemberCount = i->second->authorizedMembers.size();
  1209. totalMemberCount = i->second->members.size();
  1210. bridgeCount = i->second->activeBridgeMembers.size();
  1211. for (auto m=i->second->members.begin(); m != i->second->members.end(); ++m) {
  1212. auto lo = lastOnlineCumulative.find(std::pair<uint64_t,uint64_t>(i->first, m->first));
  1213. if (lo != lastOnlineCumulative.end()) {
  1214. if ((now - lo->second) <= (ZT_NETWORK_AUTOCONF_DELAY * 2)) {
  1215. ++onlineMemberCount;
  1216. } else {
  1217. lastOnlineCumulative.erase(lo);
  1218. }
  1219. }
  1220. }
  1221. }
  1222. std::string bc = std::to_string(bridgeCount);
  1223. std::string amc = std::to_string(authMemberCount);
  1224. std::string omc = std::to_string(onlineMemberCount);
  1225. std::string tmc = std::to_string(totalMemberCount);
  1226. std::string timestamp = std::to_string(ts);
  1227. const char *values[6] = {
  1228. networkId.c_str(),
  1229. bc.c_str(),
  1230. amc.c_str(),
  1231. omc.c_str(),
  1232. tmc.c_str(),
  1233. timestamp.c_str()
  1234. };
  1235. res = PQexecParams(conn, "INSERT INTO ztc_network_status (network_id, bridge_count, authorized_member_count, "
  1236. "online_member_count, total_member_count, last_modified) VALUES ($1, $2, $3, $4, $5, TO_TIMESTAMP($6::double precision/1000)) "
  1237. "ON CONFLICT (network_id) DO UPDATE SET bridge_count = EXCLUDED.bridge_count, "
  1238. "authorized_member_count = EXCLUDED.authorized_member_count, online_member_count = EXCLUDED.online_member_count, "
  1239. "total_member_count = EXCLUDED.total_member_count, last_modified = EXCLUDED.last_modified",
  1240. 6,
  1241. NULL,
  1242. values,
  1243. NULL,
  1244. NULL,
  1245. 0);
  1246. if (PQresultStatus(res) != PGRES_COMMAND_OK) {
  1247. fprintf(stderr, "ERROR: Error on Network Status upsert (onlineNotificationThread): %s\n", PQresultErrorMessage(res));
  1248. PQclear(res);
  1249. continue;
  1250. }
  1251. PQclear(res);
  1252. }
  1253. }
  1254. // for (auto it = updateMap.begin(); it != updateMap.end(); ++it) {
  1255. // std::string networkId = it->first;
  1256. // std::vector<std::string> members = it->second;
  1257. // std::stringstream queryBuilder;
  1258. // std::string membersStr = ::join(members, ",");
  1259. // queryBuilder << "NOTIFY controller, '" << networkId << ":" << membersStr << "'";
  1260. // std::string query = queryBuilder.str();
  1261. // PGresult *res = PQexec(conn,query.c_str());
  1262. // if (PQresultStatus(res) != PGRES_COMMAND_OK) {
  1263. // fprintf(stderr, "ERROR: Error sending NOTIFY: %s\n", PQresultErrorMessage(res));
  1264. // }
  1265. // PQclear(res);
  1266. // }
  1267. std::this_thread::sleep_for(std::chrono::milliseconds(250));
  1268. }
  1269. fprintf(stderr, "%s: Fell out of run loop in onlineNotificationThread\n", _myAddressStr.c_str());
  1270. PQfinish(conn);
  1271. if (_run == 1) {
  1272. fprintf(stderr, "ERROR: %s onlineNotificationThread should still be running! Exiting Controller.\n", _myAddressStr.c_str());
  1273. exit(6);
  1274. }
  1275. }
  1276. PGconn *PostgreSQL::getPgConn(OverrideMode m) {
  1277. if (m == ALLOW_PGBOUNCER_OVERRIDE) {
  1278. char *connStr = getenv("PGBOUNCER_CONNSTR");
  1279. if (connStr != NULL) {
  1280. fprintf(stderr, "PGBouncer Override\n");
  1281. std::string conn(connStr);
  1282. conn += " application_name=controller-";
  1283. conn += _myAddressStr.c_str();
  1284. return PQconnectdb(conn.c_str());
  1285. }
  1286. }
  1287. return PQconnectdb(_connString.c_str());
  1288. }
  1289. #endif //ZT_CONTROLLER_USE_LIBPQ