PostgreSQL.cpp 35 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198
  1. /*
  2. * ZeroTier One - Network Virtualization Everywhere
  3. * Copyright (C) 2011-2018 ZeroTier, Inc.
  4. *
  5. * This program is free software: you can redistribute it and/or modify
  6. * it under the terms of the GNU General Public License as published by
  7. * the Free Software Foundation, either version 3 of the License, or
  8. * (at your option) any later version.
  9. *
  10. * This program is distributed in the hope that it will be useful,
  11. * but WITHOUT ANY WARRANTY; without even the implied warranty of
  12. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  13. * GNU General Public License for more details.
  14. *
  15. * You should have received a copy of the GNU General Public License
  16. * along with this program. If not, see <http://www.gnu.org/licenses/>.
  17. */
  18. #ifdef ZT_CONTROLLER_USE_LIBPQ
  19. #include "PostgreSQL.hpp"
  20. #include "EmbeddedNetworkController.hpp"
  21. #include "../version.h"
  22. #include <libpq-fe.h>
  23. using json = nlohmann::json;
  namespace {
  // Returns the current local time in ctime() layout ("Www Mmm dd hh:mm:ss yyyy")
  // with the trailing newline removed, or "" if the time cannot be formatted.
  //
  // ctime() formats into a single buffer shared by the whole process, and this
  // helper is used from several threads in this file (callers of waitForReady()
  // plus both watcher threads via initializeNetworks()/initializeMembers()).
  // Each thread therefore gets its own buffer; the returned pointer stays valid
  // until the same thread calls _timestr() again.
  static const char *_timestr()
  {
    static thread_local char stamp[32]; // ctime_r()/ctime_s() need at least 26 bytes
    const time_t now = time(0);
  #ifdef _WIN32
    if (ctime_s(stamp, sizeof(stamp), &now) != 0)
      return "";
  #else
    if (!ctime_r(&now, stamp))
      return "";
  #endif
    // Cut the string at the newline that ctime-style formatting appends.
    for (char *p = stamp; *p; ++p) {
      if (*p == '\n') {
        *p = (char)0;
        break;
      }
    }
    return stamp;
  }
  }
  42. using namespace ZeroTier;
  43. PostgreSQL::PostgreSQL(EmbeddedNetworkController *const nc, const Identity &myId, const char *path)
  44. : DB(nc, myId, path)
  45. , _ready(0)
  46. , _connected(1)
  47. , _run(1)
  48. , _waitNoticePrinted(false)
  49. {
  50. _connString = std::string(path);
  51. _readyLock.lock();
  52. _heartbeatThread = std::thread(&PostgreSQL::heartbeat, this);
  53. _membersDbWatcher = std::thread(&PostgreSQL::membersDbWatcher, this);
  54. _networksDbWatcher = std::thread(&PostgreSQL::networksDbWatcher, this);
  55. for (int i = 0; i < ZT_CONTROLLER_RETHINKDB_COMMIT_THREADS; ++i) {
  56. _commitThread[i] = std::thread(&PostgreSQL::commitThread, this);
  57. }
  58. _onlineNotificationThread = std::thread(&PostgreSQL::onlineNotificationThread, this);
  59. }
  60. PostgreSQL::~PostgreSQL()
  61. {
  62. _run = 0;
  63. std::this_thread::sleep_for(std::chrono::milliseconds(100));
  64. _heartbeatThread.join();
  65. _membersDbWatcher.join();
  66. _networksDbWatcher.join();
  67. for (int i = 0; i < ZT_CONTROLLER_RETHINKDB_COMMIT_THREADS; ++i) {
  68. _commitThread[i].join();
  69. }
  70. _onlineNotificationThread.join();
  71. }
  72. bool PostgreSQL::waitForReady()
  73. {
  74. while (_ready < 2) {
  75. if (!_waitNoticePrinted) {
  76. _waitNoticePrinted = true;
  77. fprintf(stderr, "[%s] NOTICE: %.10llx controller PostgreSQL waiting for initial data download..." ZT_EOL_S, ::_timestr(), (unsigned long long)_myAddress.toInt());
  78. }
  79. _readyLock.lock();
  80. _readyLock.unlock();
  81. }
  82. return true;
  83. }
  84. bool PostgreSQL::isReady()
  85. {
  86. return ((_ready == 2)&&(_connected));
  87. }
  88. void PostgreSQL::save(nlohmann::json *orig, nlohmann::json &record)
  89. {
  90. if (!record.is_object()) {
  91. return;
  92. }
  93. waitForReady();
  94. if (orig) {
  95. if (*orig != record) {
  96. record["revision"] = OSUtils::jsonInt(record["revision"],0ULL) + 1;
  97. _commitQueue.post(new nlohmann::json(record));
  98. }
  99. } else {
  100. record["revision"] = 1;
  101. _commitQueue.post(new nlohmann::json(record));
  102. }
  103. }
  104. void PostgreSQL::eraseNetwork(const uint64_t networkId)
  105. {
  106. char tmp2[24];
  107. waitForReady();
  108. Utils::hex(networkId, tmp2);
  109. json *tmp = new json();
  110. (*tmp)["id"] = tmp2;
  111. (*tmp)["objtype"] = "_delete_network";
  112. _commitQueue.post(tmp);
  113. }
  114. void PostgreSQL::eraseMember(const uint64_t networkId, const uint64_t memberId)
  115. {
  116. char tmp2[24];
  117. json *tmp = new json();
  118. Utils::hex(networkId, tmp2);
  119. (*tmp)["nwid"] = tmp2;
  120. Utils::hex(memberId, tmp2);
  121. (*tmp)["id"] = tmp2;
  122. (*tmp)["objtype"] = "_delete_member";
  123. _commitQueue.post(tmp);
  124. }
  125. void PostgreSQL::nodeIsOnline(const uint64_t networkId, const uint64_t memberId, const InetAddress &physicalAddress)
  126. {
  127. std::lock_guard<std::mutex> l(_lastOnline_l);
  128. std::pair<int64_t, InetAddress> &i = _lastOnline[std::pair<uint64_t,uint64_t>(networkId, memberId)];
  129. i.first = OSUtils::now();
  130. if (physicalAddress) {
  131. i.second = physicalAddress;
  132. }
  133. }
  134. void PostgreSQL::initializeNetworks(PGconn *conn)
  135. {
  136. try {
  137. if (PQstatus(conn) != CONNECTION_OK) {
  138. fprintf(stderr, "Bad Database Connection: %s", PQerrorMessage(conn));
  139. exit(1);
  140. }
  141. const char *params[1] = {
  142. _myAddressStr.c_str()
  143. };
  144. PGresult *res = PQexecParams(conn, "SELECT id, EXTRACT(EPOCH FROM creation_time AT TIME ZONE 'UTC')*1000, capabilities, "
  145. "enable_broadcast, EXTRACT(EPOCH FROM last_modified AT TIME ZONE 'UTC')*1000, mtu, multicast_limit, name, private, remote_trace_level, "
  146. "remote_trace_target, revision, rules, tags, v4_assign_mode, v6_assign_mode FROM ztc_network "
  147. "WHERE deleted = false AND controller_id = $1",
  148. 1,
  149. NULL,
  150. params,
  151. NULL,
  152. NULL,
  153. 0);
  154. if (PQresultStatus(res) != PGRES_TUPLES_OK) {
  155. fprintf(stderr, "Networks Initialization Failed: %s", PQerrorMessage(conn));
  156. PQclear(res);
  157. exit(1);
  158. }
  159. int numRows = PQntuples(res);
  160. for (int i = 0; i < numRows; ++i) {
  161. json empty;
  162. json config;
  163. config["nwid"] = PQgetvalue(res, i, 0);
  164. config["creationTime"] = std::stoull(PQgetvalue(res, i, 1));
  165. config["capabilities"] = json::parse(PQgetvalue(res, i, 2));
  166. config["enableBroadcast"] = (strcmp(PQgetvalue(res, i, 3),"true")==0);
  167. config["lastModified"] = std::stoull(PQgetvalue(res, i, 4));
  168. config["mtu"] = std::stoi(PQgetvalue(res, i, 5));
  169. config["multicastLimit"] = std::stoi(PQgetvalue(res, i, 6));
  170. config["name"] = PQgetvalue(res, i, 7);
  171. config["private"] = (strcmp(PQgetvalue(res, i, 8),"true")==0);
  172. config["remoteTraceLevel"] = std::stoi(PQgetvalue(res, i, 9));
  173. config["remoteTraceTarget"] = PQgetvalue(res, i, 10);
  174. config["revision"] = std::stoull(PQgetvalue(res, i, 11));
  175. config["rules"] = json::parse(PQgetvalue(res, i, 12));
  176. config["tags"] = json::parse(PQgetvalue(res, i, 13));
  177. config["v4AssignMode"] = json::parse(PQgetvalue(res, i, 14));
  178. config["v6AssignMode"] = json::parse(PQgetvalue(res, i, 15));
  179. config["objtype"] = "network";
  180. config["ipAssignmentPools"] = json::array();
  181. config["routes"] = json::array();
  182. PGresult *r2 = PQexecParams(conn,
  183. "SELECT host(ip_range_start), host(ip_range_end) FROM ztc_network_assignment_pool WHERE network_id = $1",
  184. 1,
  185. NULL,
  186. params,
  187. NULL,
  188. NULL,
  189. 0);
  190. if (PQresultStatus(r2) != PGRES_TUPLES_OK) {
  191. fprintf(stderr, "ERROR: Error retreiving IP pools for network: %s\n", PQresultErrorMessage(r2));
  192. PQclear(r2);
  193. PQclear(res);
  194. exit(1);
  195. }
  196. int n = PQntuples(r2);
  197. for (int j = 0; j < n; ++j) {
  198. json ip;
  199. ip["ipRangeStart"] = PQgetvalue(r2, j, 0);
  200. ip["ipRangeEnd"] = PQgetvalue(r2, j, 1);
  201. config["ipAssignmentPools"].push_back(ip);
  202. }
  203. PQclear(r2);
  204. r2 = PQexecParams(conn,
  205. "SELECT host(address), bits, host(via) FROM ztc_network_route WHERE network_id = $1",
  206. 1,
  207. NULL,
  208. params,
  209. NULL,
  210. NULL,
  211. 0);
  212. if (PQresultStatus(r2) != PGRES_TUPLES_OK) {
  213. fprintf(stderr, "ERROR: Error retreiving routes for network: %s\n", PQresultErrorMessage(r2));
  214. PQclear(r2);
  215. PQclear(res);
  216. exit(1);
  217. }
  218. n = PQntuples(r2);
  219. for (int j = 0; j < n; ++j) {
  220. std::string addr = PQgetvalue(r2, j, 0);
  221. std::string bits = PQgetvalue(r2, j, 1);
  222. std::string via = PQgetvalue(r2, j, 2);
  223. json route;
  224. route["target"] = addr + "/" + bits;
  225. if (via == "NULL") {
  226. route["via"] = nullptr;
  227. } else {
  228. route["via"] = via;
  229. }
  230. config["routes"].push_back(route);
  231. }
  232. PQclear(r2);
  233. _networkChanged(empty, config, false);
  234. }
  235. PQclear(res);
  236. if (++this->_ready == 2) {
  237. if (_waitNoticePrinted) {
  238. fprintf(stderr,"[%s] NOTICE: %.10llx controller PostgreSQL data download complete." ZT_EOL_S,_timestr(),(unsigned long long)_myAddress.toInt());
  239. }
  240. _readyLock.unlock();
  241. }
  242. } catch (std::exception &e) {
  243. fprintf(stderr, "ERROR: Error initializing networks: %s", e.what());
  244. exit(-1);
  245. }
  246. }
  247. void PostgreSQL::initializeMembers(PGconn *conn)
  248. {
  249. try {
  250. if (PQstatus(conn) != CONNECTION_OK) {
  251. fprintf(stderr, "Bad Database Connection: %s", PQerrorMessage(conn));
  252. exit(1);
  253. }
  254. const char *params[1] = {
  255. _myAddressStr.c_str()
  256. };
  257. PGresult *res = PQexecParams(conn,
  258. "SELECT m.id, m.network_id, m.active_bridge, m.authorized, m.capabilities, EXTRACT(EPOCH FROM m.creation_time AT TIME ZONE 'UTC')*1000, m.identity, "
  259. " EXTRACT(EPOCH FROM m.last_authorized_time AT TIME ZONE 'UTC')*1000, "
  260. " EXTRACT(EPOCH FROM m.last_deauthorized_time AT TIME ZONE 'UTC')*1000, "
  261. " m.remote_trace_level, m.remote_trace_target, m.tags, m.v_major, m.v_minor, m.v_rev, m.v_proto, "
  262. " m.no_auto_assign_ips, m.revision "
  263. "FROM ztc_member m "
  264. "INNER JOIN ztc_network n "
  265. " ON n.id = m.network_id "
  266. "WHERE n.controller_id = $1 AND m.deleted = false",
  267. 1,
  268. NULL,
  269. params,
  270. NULL,
  271. NULL,
  272. 0);
  273. if (PQresultStatus(res) != PGRES_TUPLES_OK) {
  274. fprintf(stderr, "Member Initialization Failed: %s", PQerrorMessage(conn));
  275. PQclear(res);
  276. exit(1);
  277. }
  278. int numRows = PQntuples(res);
  279. for (int i = 0; i < numRows; ++i) {
  280. json empty;
  281. json config;
  282. std::string memberId(PQgetvalue(res, i, 0));
  283. std::string networkId(PQgetvalue(res, i, 1));
  284. config["id"] = memberId;
  285. config["nwid"] = networkId;
  286. config["activeBridge"] = (strcmp(PQgetvalue(res, i, 3), "true") == 0);
  287. config["authorized"] = (strcmp(PQgetvalue(res, i, 4), "true") == 0);
  288. config["capabilities"] = json::parse(PQgetvalue(res, i, 5));
  289. config["creationTime"] = std::stoull(PQgetvalue(res, i, 6));
  290. config["identity"] = PQgetvalue(res, i, 7);
  291. config["lastAuthorizedTime"] = std::stoull(PQgetvalue(res, i, 8));
  292. config["lastDeauthorizedTime"] = std::stoull(PQgetvalue(res, i, 9));
  293. config["remoteTraceLevel"] = std::stoi(PQgetvalue(res, i, 10));
  294. config["remoteTraceTarget"] = PQgetvalue(res, i, 11);
  295. config["tags"] = json::parse(PQgetvalue(res, i, 12));
  296. config["vMajor"] = std::stoi(PQgetvalue(res, i, 13));
  297. config["vMinor"] = std::stoi(PQgetvalue(res, i, 14));
  298. config["vRev"] = std::stoi(PQgetvalue(res, i, 15));
  299. config["vProto"] = std::stoi(PQgetvalue(res, i, 16));
  300. config["noAutoAssignIps"] = (strcmp(PQgetvalue(res, i, 17), "true") == 0);
  301. config["revision"] = std::stoull(PQgetvalue(res, i, 18));
  302. config["objtype"] = "member";
  303. config["ipAssignments"] = json::array();
  304. const char *p2[2] = {
  305. memberId.c_str(),
  306. networkId.c_str()
  307. };
  308. PGresult *r2 = PQexecParams(conn,
  309. "SELECT address FROM ztc_member_ip_assignment WHERE member_id = $1 AND network_id = $2",
  310. 2,
  311. NULL,
  312. p2,
  313. NULL,
  314. NULL,
  315. 0);
  316. if (PQresultStatus(r2) != PGRES_TUPLES_OK) {
  317. fprintf(stderr, "Member Initialization Failed: %s", PQerrorMessage(conn));
  318. PQclear(r2);
  319. PQclear(res);
  320. exit(1);
  321. }
  322. int n = PQntuples(r2);
  323. for (int j = 0; j < n; ++j) {
  324. config["ipAssignments"].push_back(PQgetvalue(r2, j, 0));
  325. }
  326. _memberChanged(empty, config, false);
  327. }
  328. PQclear(res);
  329. if (++this->_ready == 2) {
  330. if (_waitNoticePrinted) {
  331. fprintf(stderr,"[%s] NOTICE: %.10llx controller PostgreSQL data download complete." ZT_EOL_S,_timestr(),(unsigned long long)_myAddress.toInt());
  332. }
  333. _readyLock.unlock();
  334. }
  335. } catch (std::exception &e) {
  336. fprintf(stderr, "ERROR: Error initializing members: %s\n", e.what());
  337. exit(-1);
  338. }
  339. }
  340. void PostgreSQL::heartbeat()
  341. {
  342. char publicId[1024];
  343. char hostnameTmp[1024];
  344. _myId.toString(false,publicId);
  345. if (gethostname(hostnameTmp, sizeof(hostnameTmp))!= 0) {
  346. hostnameTmp[0] = (char)0;
  347. } else {
  348. for (int i = 0; i < sizeof(hostnameTmp); ++i) {
  349. if ((hostnameTmp[i] == '.')||(hostnameTmp[i] == 0)) {
  350. hostnameTmp[i] = (char)0;
  351. break;
  352. }
  353. }
  354. }
  355. const char *controllerId = _myAddressStr.c_str();
  356. const char *publicIdentity = publicId;
  357. const char *hostname = hostnameTmp;
  358. PGconn *conn = PQconnectdb(_path.c_str());
  359. if (PQstatus(conn) == CONNECTION_BAD) {
  360. fprintf(stderr, "Connection to database failed: %s\n", PQerrorMessage(conn));
  361. PQfinish(conn);
  362. exit(1);
  363. }
  364. while (_run == 1) {
  365. if(PQstatus(conn) != CONNECTION_OK) {
  366. PQfinish(conn);
  367. conn = PQconnectdb(_path.c_str());
  368. }
  369. if (conn) {
  370. const char *values[8] = {
  371. controllerId,
  372. hostname,
  373. std::to_string(OSUtils::now()).c_str(),
  374. publicIdentity,
  375. std::to_string(ZEROTIER_ONE_VERSION_MAJOR).c_str(),
  376. std::to_string(ZEROTIER_ONE_VERSION_MINOR).c_str(),
  377. std::to_string(ZEROTIER_ONE_VERSION_REVISION).c_str(),
  378. std::to_string(ZEROTIER_ONE_VERSION_BUILD).c_str()
  379. };
  380. int lengths[8] = {
  381. (int)strlen(values[0]),
  382. (int)strlen(values[1]),
  383. (int)strlen(values[2]),
  384. (int)strlen(values[3]),
  385. (int)strlen(values[4]),
  386. (int)strlen(values[5]),
  387. (int)strlen(values[6]),
  388. (int)strlen(values[7])
  389. };
  390. int binary[8] = {0,0,0,0,0,0,0,0};
  391. PGresult *res = PQexecParams(conn,
  392. "INSERT INTO ztc_controller (id, cluster_host, last_alive, public_identity, v_major, v_minor, v_rev, v_build) "
  393. "VALUES ($1, $2, TO_TIMESTAMP($3::double precision/1000), $4, $5, $6, $7, $8) "
  394. "ON CONFLICT (id) DO UPDATE SET cluster_host = EXCLUDED.cluster_host, last_alive = EXCLUDED.last_alive, "
  395. "public_identity = EXCLUDED.public_identity, v_major = EXCLUDED.v_major, v_minor = EXCLUDED.v_minor, "
  396. "v_rev = EXCLUDED.v_rev, v_build = EXCLUDED.v_rev",
  397. 8, // number of parameters
  398. NULL, // oid field. ignore
  399. values, // values for substitution
  400. lengths, // lengths in bytes of each value
  401. binary, // binary?
  402. 0);
  403. if (PQresultStatus(res) != PGRES_COMMAND_OK) {
  404. fprintf(stderr, "Heartbeat Update Failed: %s\n", PQresultErrorMessage(res));
  405. }
  406. PQclear(res);
  407. }
  408. std::this_thread::sleep_for(std::chrono::milliseconds(1000));
  409. }
  410. PQfinish(conn);
  411. conn = NULL;
  412. }
  413. void PostgreSQL::membersDbWatcher()
  414. {
  415. PGconn *conn = PQconnectdb(_path.c_str());
  416. if (PQstatus(conn) == CONNECTION_BAD) {
  417. fprintf(stderr, "Connection to database failed: %s\n", PQerrorMessage(conn));
  418. PQfinish(conn);
  419. exit(1);
  420. }
  421. initializeMembers(conn);
  422. char buf[11] = {0};
  423. std::string cmd = "LISTEN member_" + std::string(_myAddress.toString(buf));
  424. PGresult *res = PQexec(conn, cmd.c_str());
  425. if (!res || PQresultStatus(res) != PGRES_COMMAND_OK) {
  426. fprintf(stderr, "LISTEN command failed: %s\n", PQresultErrorMessage(res));
  427. PQclear(res);
  428. PQfinish(conn);
  429. exit(1);
  430. }
  431. PQclear(res); res = NULL;
  432. while(_run == 1) {
  433. if (PQstatus(conn) != CONNECTION_OK) {
  434. fprintf(stderr, "ERROR: Member Watcher lost connection to Postgres.");
  435. exit(-1);
  436. }
  437. PGnotify *notify = NULL;
  438. PQconsumeInput(conn);
  439. while ((notify = PQnotifies(conn)) != NULL) {
  440. fprintf(stderr, "ASYNC NOTIFY of '%s' id:%s received\n", notify->relname, notify->extra);
  441. try {
  442. json tmp(json::parse(notify->extra));
  443. json &ov = tmp["old_val"];
  444. json &nv = tmp["new_val"];
  445. json oldConfig, newConfig;
  446. if (ov.is_object()) oldConfig = ov;
  447. if (nv.is_object()) newConfig = nv;
  448. if (oldConfig.is_object() || newConfig.is_object()) {
  449. _memberChanged(oldConfig,newConfig,(this->_ready>=2));
  450. }
  451. } catch (...) {} // ignore bad records
  452. free(notify);
  453. }
  454. std::this_thread::sleep_for(std::chrono::milliseconds(10));
  455. }
  456. PQfinish(conn);
  457. conn = NULL;
  458. }
  459. void PostgreSQL::networksDbWatcher()
  460. {
  461. PGconn *conn = PQconnectdb(_path.c_str());
  462. if (PQstatus(conn) == CONNECTION_BAD) {
  463. fprintf(stderr, "Connection to database failed: %s\n", PQerrorMessage(conn));
  464. PQfinish(conn);
  465. exit(1);
  466. }
  467. initializeNetworks(conn);
  468. char buf[11] = {0};
  469. std::string cmd = "LISTEN network_" + std::string(_myAddress.toString(buf));
  470. PGresult *res = PQexec(conn, cmd.c_str());
  471. if (!res || PQresultStatus(res) != PGRES_COMMAND_OK) {
  472. fprintf(stderr, "LISTEN command failed: %s\n", PQresultErrorMessage(res));
  473. PQclear(res);
  474. PQfinish(conn);
  475. exit(1);
  476. }
  477. PQclear(res); res = NULL;
  478. while(_run == 1) {
  479. if (PQstatus(conn) != CONNECTION_OK) {
  480. fprintf(stderr, "ERROR: Network Watcher lost connection to Postgres.");
  481. exit(-1);
  482. }
  483. PGnotify *notify = NULL;
  484. PQconsumeInput(conn);
  485. while ((notify = PQnotifies(conn)) != NULL) {
  486. fprintf(stderr, "ASYNC NOTIFY of '%s' id:%s received\n", notify->relname, notify->extra);
  487. try {
  488. json tmp(json::parse(notify->extra));
  489. json &ov = tmp["old_val"];
  490. json &nv = tmp["new_val"];
  491. json oldConfig, newConfig;
  492. if (ov.is_object()) oldConfig = ov;
  493. if (nv.is_object()) newConfig = nv;
  494. if (oldConfig.is_object()||newConfig.is_object()) {
  495. _networkChanged(oldConfig,newConfig,(this->_ready >= 2));
  496. }
  497. } catch (...) {} // ignore bad records
  498. free(notify);
  499. }
  500. std::this_thread::sleep_for(std::chrono::milliseconds(10));
  501. }
  502. PQfinish(conn);
  503. conn = NULL;
  504. }
  505. void PostgreSQL::commitThread()
  506. {
  507. PGconn *conn = PQconnectdb(_path.c_str());
  508. if (PQstatus(conn) == CONNECTION_BAD) {
  509. fprintf(stderr, "ERROR: Connection to database failed: %s\n", PQerrorMessage(conn));
  510. PQfinish(conn);
  511. exit(1);
  512. }
  513. json *config = nullptr;
  514. while(_commitQueue.get(config)&(_run == 1)) {
  515. if (!config) {
  516. continue;
  517. }
  518. if (PQstatus(conn) == CONNECTION_BAD) {
  519. fprintf(stderr, "ERROR: Connection to database failed: %s\n", PQerrorMessage(conn));
  520. PQfinish(conn);
  521. exit(1);
  522. }
  523. try {
  524. const std::string objtype = (*config)["objtype"];
  525. if (objtype == "member") {
  526. try {
  527. std::string memberId = (*config)["id"];
  528. std::string networkId = (*config)["nwid"];
  529. std::string name = (*config)["name"];
  530. std::string description = (*config)["description"];
  531. std::string identity = (*config)["identity"];
  532. std::string target = "NULL";
  533. if (!(*config)["remoteTraceTarget"].is_null()) {
  534. target = (*config)["remoteTraceTarget"];
  535. }
  536. const char *values[19] = {
  537. memberId.c_str(),
  538. networkId.c_str(),
  539. ((*config)["activeBridge"] ? "true" : "false"),
  540. ((*config)["authorized"] ? "true" : "false"),
  541. OSUtils::jsonDump((*config)["capabilities"], -1).c_str(),
  542. name.c_str(),
  543. description.c_str(),
  544. identity.c_str(),
  545. std::to_string((long long)(*config)["lastAuthorizedTime"]).c_str(),
  546. std::to_string((long long)(*config)["lastDeauthorizedTime"]).c_str(),
  547. ((*config)["noAutoAssignIps"] ? "true" : "false"),
  548. std::to_string((int)(*config)["remoteTraceLevel"]).c_str(),
  549. (target == "NULL") ? NULL : target.c_str(),
  550. std::to_string((unsigned long long)(*config)["revision"]).c_str(),
  551. OSUtils::jsonDump((*config)["tags"], -1).c_str(),
  552. std::to_string((int)(*config)["vMajor"]).c_str(),
  553. std::to_string((int)(*config)["vMinor"]).c_str(),
  554. std::to_string((int)(*config)["vRev"]).c_str(),
  555. std::to_string((int)(*config)["vProto"]).c_str()
  556. };
  557. PGresult *res = PQexecParams(conn,
  558. "INSERT INTO ztc_member (id, network_id, active_bridge, authorized, capabilities, "
  559. "name, description, identity, last_authorized_time, last_deauthorized_time, no_auto_assign_ips, "
  560. "remote_trace_level, remote_trace_target, revision, tags, v_major, v_minor, v_rev, v_proto) "
  561. "VALUES ($1, $2, $3, $4, $5, $6, $7, $8, "
  562. "TO_TIMESTAMP($9::double precision/1000), TO_TIMESTAMP($10::double precision/1000), "
  563. "$11, $12, $13, $14, $15, $16, $17, $18, $19) ON CONFLICT (network_id, id) DO UPDATE SET "
  564. "active_bridge = EXCLUDED.active_bridge, authorized = EXCDLUDED.authorized, capabilities = EXCLUDED.capabilities, "
  565. "name = EXCLUDED.name, description = EXCLUDED.description, "
  566. "identity = EXCLUDED.identity, last_authorized_time = EXCLUDED.last_authorized_time, "
  567. "last_deauthorized_time = EXCLUDED.last_deauthorized_time, no_auto_assign_ips = EXCLUDED.no_auto_assign_ips, "
  568. "remote_trace_level = EXCLUDED.remote_trace_level, remote_trace_target = EXCLUDED.remote_trace_target, "
  569. "revision = EXCLUDED.revision+1, tags = EXCLUDED.tags, v_major = EXCLUDED.v_major, "
  570. "v_minor = EXCLUDED.v_minor, v_rev = EXCLUDED.v_rev, v_proto = EXCLUDED.v_proto",
  571. 20,
  572. NULL,
  573. values,
  574. NULL,
  575. NULL,
  576. 0);
  577. if (PQresultStatus(res) != PGRES_COMMAND_OK) {
  578. fprintf(stderr, "ERROR: Error updating member: %s\n", PQresultErrorMessage(res));
  579. PQclear(res);
  580. continue;
  581. }
  582. PQclear(res);
  583. res = PQexec(conn, "BEGIN");
  584. if (PQresultStatus(res) != PGRES_COMMAND_OK) {
  585. fprintf(stderr, "ERROR: Error beginning transaction: %s\n", PQresultErrorMessage(res));
  586. PQclear(res);
  587. continue;
  588. }
  589. PQclear(res);
  590. const char *v2[2] = {
  591. memberId.c_str(),
  592. networkId.c_str()
  593. };
  594. res = PQexecParams(conn,
  595. "DELETE FROM ztc_member_ip_assignment WHERE member_id = $1 AND network_id = $2",
  596. 2,
  597. NULL,
  598. v2,
  599. NULL,
  600. NULL,
  601. 0);
  602. if (PQresultStatus(res) != PGRES_COMMAND_OK) {
  603. fprintf(stderr, "ERROR: Error updating IP address assignments: %s\n", PQresultErrorMessage(res));
  604. PQclear(res);
  605. PQclear(PQexec(conn, "ROLLBACK"));;
  606. continue;
  607. }
  608. PQclear(res);
  609. for (auto i = (*config)["ipAssignments"].begin(); i != (*config)["ipAssignments"].end(); ++i) {
  610. std::string addr = *i;
  611. const char *v3[3] = {
  612. memberId.c_str(),
  613. networkId.c_str(),
  614. addr.c_str()
  615. };
  616. res = PQexecParams(conn,
  617. "INSERT INTO ztc_member_ip_assignment (member_id, network_id, address) VALUES ($1, $2, $3)",
  618. 3,
  619. NULL,
  620. v3,
  621. NULL,
  622. NULL,
  623. 0);
  624. if (PQresultStatus(res) != PGRES_COMMAND_OK) {
  625. fprintf(stderr, "ERROR: Error setting IP addresses for member: %s\n", PQresultErrorMessage(res));
  626. PQclear(res);
  627. PQclear(PQexec(conn, "ROLLBACK"));
  628. continue;
  629. }
  630. }
  631. res = PQexec(conn, "COMMIT");
  632. if (PQresultStatus(res) != PGRES_COMMAND_OK) {
  633. fprintf(stderr, "ERROR: Error committing ip address data: %s\n", PQresultErrorMessage(res));
  634. }
  635. PQclear(res);
  636. } catch (std::exception &e) {
  637. fprintf(stderr, "ERROR: Error updating member: %s\n", e.what());
  638. }
  639. } else if (objtype == "network") {
  640. try {
  641. std::string id = (*config)["id"];
  642. std::string controllerId = _myAddressStr.c_str();
  643. std::string name = (*config)["name"];
  644. std::string remoteTraceTarget("NULL");
  645. if (!(*config)["remoteTraceTarget"].is_null()) {
  646. remoteTraceTarget = (*config)["remoteTraceTarget"];
  647. }
  648. std::string rulesSource = (*config)["rulesSource"];
  649. const char *values[16] = {
  650. id.c_str(),
  651. controllerId.c_str(),
  652. OSUtils::jsonDump((*config)["capabilitles"], -1).c_str(),
  653. ((*config)["enableBroadcast"] ? "true" : "false"),
  654. std::to_string(OSUtils::now()).c_str(),
  655. std::to_string((int)(*config)["mtu"]).c_str(),
  656. std::to_string((int)(*config)["multicastLimit"]).c_str(),
  657. name.c_str(),
  658. ((*config)["private"] ? "true" : "false"),
  659. std::to_string((int)(*config)["remoteTraceLevel"]).c_str(),
  660. (remoteTraceTarget == "NULL" ? NULL : remoteTraceTarget.c_str()),
  661. OSUtils::jsonDump((*config)["rules"], -1).c_str(),
  662. rulesSource.c_str(),
  663. OSUtils::jsonDump((*config)["tags"], -1).c_str(),
  664. OSUtils::jsonDump((*config)["v4AssignMode"],-1).c_str(),
  665. OSUtils::jsonDump((*config)["v6AssignMode"], -1).c_str(),
  666. };
  667. PGresult *res = PQexecParams(conn,
  668. "UPDATE ztc_network SET controller_id = $2, capabilities = $3, enable_broadcast = $4, "
  669. "last_updated = $5, mtu = $6, multicast_limit = $7, name = $8, private = $9, "
  670. "remote_trace_level = $10, remote_trace_target = $11, rules = $12, rules_source = $13, "
  671. "tags = $14, v4_assign_mode = $15, v6_assign_mode = $16 "
  672. "WHERE id = $1",
  673. 16,
  674. NULL,
  675. values,
  676. NULL,
  677. NULL,
  678. 0);
  679. if (PQresultStatus(res) != PGRES_COMMAND_OK) {
  680. fprintf(stderr, "ERROR: Error updating network record: %s\n", PQresultErrorMessage(res));
  681. PQclear(res);
  682. continue;
  683. }
  684. PQclear(res);
  685. res = PQexec(conn, "BEGIN");
  686. if (PQresultStatus(res) != PGRES_COMMAND_OK) {
  687. fprintf(stderr, "ERROR: Error beginnning transaction: %s\n", PQresultErrorMessage(res));
  688. PQclear(res);
  689. continue;
  690. }
  691. PQclear(res);
  692. const char *params[1] = {
  693. id.c_str()
  694. };
  695. res = PQexecParams(conn,
  696. "DELETE FROM ztc_network_assignment_pool WHERE network_id = $1",
  697. 1,
  698. NULL,
  699. params,
  700. NULL,
  701. NULL,
  702. 0);
  703. if (PQresultStatus(res) != PGRES_COMMAND_OK) {
  704. fprintf(stderr, "ERROR: Error updating assignment pool: %s\n", PQresultErrorMessage(res));
  705. PQclear(res);
  706. PQclear(PQexec(conn, "ROLLBACK"));
  707. continue;
  708. }
  709. PQclear(res);
  710. auto pool = (*config)["ipAssignmentPools"];
  711. bool err = false;
  712. for (auto i = pool.begin(); i != pool.end(); ++i) {
  713. std::string start = (*i)["ipRangeStart"];
  714. std::string end = (*i)["ipRangeEnd"];
  715. const char *p[3] = {
  716. id.c_str(),
  717. start.c_str(),
  718. end.c_str()
  719. };
  720. res = PQexecParams(conn,
  721. "INSERT INTO ztc_network_assignment_pool (network_id, ip_range_start, ip_range_end) "
  722. "VALUES ($1, $2, $3)",
  723. 3,
  724. NULL,
  725. p,
  726. NULL,
  727. NULL,
  728. 0);
  729. if (PQresultStatus(res) != PGRES_COMMAND_OK) {
  730. fprintf(stderr, "ERROR: Error updating assignment pool: %s\n", PQresultErrorMessage(res));
  731. PQclear(res);
  732. err = true;
  733. break;
  734. }
  735. PQclear(res);
  736. }
  737. if (err) {
  738. PQclear(PQexec(conn, "ROLLBACK"));
  739. continue;
  740. }
  741. res = PQexecParams(conn,
  742. "DELETE FROM ztc_network_route WHERE network_id = $1",
  743. 1,
  744. NULL,
  745. params,
  746. NULL,
  747. NULL,
  748. 0);
  749. if (PQresultStatus(res) != PGRES_COMMAND_OK) {
  750. fprintf(stderr, "ERROR: Error updating routes: %s\n", PQresultErrorMessage(res));
  751. PQclear(res);
  752. PQclear(PQexec(conn, "ROLLBACK"));
  753. continue;
  754. }
  755. auto routes = (*config)["routes"];
  756. err = false;
  757. for (auto i = routes.begin(); i != routes.end(); ++i) {
  758. std::string t = (*i)["target"];
  759. std::vector<std::string> target;
  760. std::istringstream f(t);
  761. std::string s;
  762. while(std::getline(f, s, '/')) {
  763. target.push_back(s);
  764. }
  765. if (target.empty() || target.size() != 2) {
  766. continue;
  767. }
  768. std::string targetAddr = target[0];
  769. std::string targetBits = target[1];
  770. std::string via = "NULL";
  771. if (!(*i)["via"].is_null()) {
  772. via = (*i)["via"];
  773. }
  774. const char *p[4] = {
  775. id.c_str(),
  776. targetAddr.c_str(),
  777. targetBits.c_str(),
  778. (via == "NULL" ? NULL : via.c_str()),
  779. };
  780. res = PQexecParams(conn,
  781. "INSERT INTO ztc_network_route (network_id, address, bits, via) VALUES ($1, $2, $3, $4)",
  782. 4,
  783. NULL,
  784. p,
  785. NULL,
  786. NULL,
  787. 0);
  788. if (PQresultStatus(res) != PGRES_COMMAND_OK) {
  789. fprintf(stderr, "ERROR: Error updating routes: %s\n", PQresultErrorMessage(res));
  790. PQclear(res);
  791. err = true;
  792. break;
  793. }
  794. PQclear(res);
  795. }
  796. if (err) {
  797. PQclear(PQexec(conn, "ROLLBAcK"));
  798. continue;
  799. }
  800. res = PQexec(conn, "COMMIT");
  801. if (PQresultStatus(res) != PGRES_COMMAND_OK) {
  802. fprintf(stderr, "ERROR: Error committing network update: %s\n", PQresultErrorMessage(res));
  803. }
  804. PQclear(res);
  805. } catch (std::exception &e) {
  806. fprintf(stderr, "ERROR: Error updating member: %s\n", e.what());
  807. }
  808. } else if (objtype == "trace") {
  809. fprintf(stderr, "ERROR: Trace not yet implemented");
  810. } else if (objtype == "_delete_network") {
  811. try {
  812. std::string networkId = (*config)["nwid"];
  813. const char *values[1] = {
  814. networkId.c_str()
  815. };
  816. PGresult * res = PQexecParams(conn,
  817. "UPDATE ztc_network SET deleted = true WHERE id = $1",
  818. 1,
  819. NULL,
  820. values,
  821. NULL,
  822. NULL,
  823. 0);
  824. if (PQresultStatus(res) != PGRES_COMMAND_OK) {
  825. fprintf(stderr, "ERROR: Error deleting network: %s\n", PQresultErrorMessage(res));
  826. }
  827. PQclear(res);
  828. } catch (std::exception &e) {
  829. fprintf(stderr, "ERROR: Error deleting network: %s\n", e.what());
  830. }
  831. } else if (objtype == "_delete_member") {
  832. try {
  833. std::string memberId = (*config)["id"];
  834. std::string networkId = (*config)["nwid"];
  835. const char *values[2] = {
  836. memberId.c_str(),
  837. networkId.c_str()
  838. };
  839. PGresult *res = PQexecParams(conn,
  840. "UPDATE ztc_member SET hidden = true, deleted = true WHERE id = $1 AND network_id = $2",
  841. 2,
  842. NULL,
  843. values,
  844. NULL,
  845. NULL,
  846. 0);
  847. if (PQresultStatus(res) != PGRES_COMMAND_OK) {
  848. fprintf(stderr, "ERROR: Error deleting member: %s\n", PQresultErrorMessage(res));
  849. }
  850. PQclear(res);
  851. } catch (std::exception &e) {
  852. fprintf(stderr, "ERROR: Error deleting member: %s\n", e.what());
  853. }
  854. } else {
  855. fprintf(stderr, "ERROR: unknown objtype");
  856. }
  857. } catch (std::exception &e) {
  858. fprintf(stderr, "ERROR: Error getting objtype: %s\n", e.what());
  859. }
  860. delete config;
  861. std::this_thread::sleep_for(std::chrono::milliseconds(10));
  862. }
  863. PQfinish(conn);
  864. }
  865. void PostgreSQL::onlineNotificationThread()
  866. {
  867. PGconn *conn = PQconnectdb(_path.c_str());
  868. if (PQstatus(conn) == CONNECTION_BAD) {
  869. fprintf(stderr, "Connection to database failed: %s\n", PQerrorMessage(conn));
  870. PQfinish(conn);
  871. exit(1);
  872. }
  873. _connected = 1;
  874. int64_t lastUpdatedNetworkStatus = 0;
  875. std::unordered_map< std::pair<uint64_t,uint64_t>,int64_t,_PairHasher > lastOnlineCumulative;
  876. while (_run == 1) {
  877. if (PQstatus(conn) != CONNECTION_OK) {
  878. fprintf(stderr, "ERROR: Online Notification thread lost connection to Postgres.");
  879. exit(-1);
  880. }
  881. std::unordered_map< std::pair<uint64_t,uint64_t>,std::pair<int64_t,InetAddress>,_PairHasher > lastOnline;
  882. {
  883. std::lock_guard<std::mutex> l(_lastOnline_l);
  884. lastOnline.swap(_lastOnline);
  885. }
  886. PGresult *res = NULL;
  887. int qCount = 0;
  888. if (!lastOnline.empty()) {
  889. fprintf(stderr, "Last Online Update\n");
  890. res = PQexec(conn, "BEGIN");
  891. if (PQresultStatus(res) != PGRES_COMMAND_OK) {
  892. fprintf(stderr, "ERROR: Error on BEGIN command (onlineNotificationThread): %s\n", PQresultErrorMessage(res));
  893. PQclear(res);
  894. exit(1);
  895. }
  896. PQclear(res);
  897. }
  898. for (auto i=lastOnline.begin(); i != lastOnline.end(); ++i) {
  899. lastOnlineCumulative[i->first] = i->second.first;
  900. char nwidTmp[64];
  901. char memTmp[64];
  902. char ipTmp[64];
  903. OSUtils::ztsnprintf(nwidTmp,sizeof(nwidTmp), "%.16llx", i->first.first);
  904. OSUtils::ztsnprintf(memTmp,sizeof(memTmp), "%.10llx", i->first.second);
  905. std::string networkId(nwidTmp);
  906. std::string memberId(memTmp);
  907. int64_t ts = i->second.first;
  908. std::string ipAddr = i->second.second.toIpString(ipTmp);
  909. const char *values[4] = {
  910. networkId.c_str(),
  911. memberId.c_str(),
  912. std::to_string(ts).c_str(),
  913. ipAddr.c_str()
  914. };
  915. res = PQexecParams(conn,
  916. "INSERT INTO ztc_member_status (network_id, member_id, address, last_updated) VALUES ($1, $2, $3, $4)"
  917. "ON CONFLICT (network_id, member_id) DO UPDATE SET address = EXCLUDED.address, last_updated = EXCLUDED.last_updated",
  918. 8, // number of parameters
  919. NULL, // oid field. ignore
  920. values, // values for substitution
  921. NULL, // lengths in bytes of each value
  922. NULL,
  923. 0);
  924. if (PQresultStatus(res) != PGRES_COMMAND_OK) {
  925. fprintf(stderr, "Error on Member Status upsert: %s\n", PQresultErrorMessage(res));
  926. PQclear(res);
  927. PQexec(conn, "ROLLBACK");
  928. exit(1);
  929. }
  930. PQclear(res);
  931. if ((++qCount) == 1024) {
  932. res = PQexec(conn, "COMMIT");
  933. if (PQresultStatus(res) != PGRES_COMMAND_OK) {
  934. fprintf(stderr, "ERROR: Error on commit (onlineNotificationThread): %s\n", PQresultErrorMessage(res));
  935. PQclear(res);
  936. PQexec(conn, "ROLLBACK");
  937. exit(1);
  938. }
  939. PQclear(res);
  940. res = PQexec(conn, "BEGIN");
  941. if (PQresultStatus(res) != PGRES_COMMAND_OK) {
  942. fprintf(stderr, "ERROR: Error on BEGIN (onlineNotificationThread): %s\n", PQresultErrorMessage(res));
  943. PQclear(res);
  944. exit(1);
  945. }
  946. PQclear(res);
  947. qCount = 0;
  948. }
  949. }
  950. if (qCount > 0) {
  951. fprintf(stderr, "qCount is %d\n", qCount);
  952. res = PQexec(conn, "COMMIT");
  953. if (PQresultStatus(res) != PGRES_COMMAND_OK) {
  954. fprintf(stderr, "ERROR: Error on commit (onlineNotificationThread): %s\n", PQresultErrorMessage(res));
  955. PQclear(res);
  956. PQexec(conn, "ROLLBACK");
  957. exit(1);
  958. }
  959. PQclear(res);
  960. }
  961. const int64_t now = OSUtils::now();
  962. if ((now - lastUpdatedNetworkStatus) > 10000) {
  963. lastUpdatedNetworkStatus = now;
  964. std::vector<std::pair<uint64_t, std::shared_ptr<_Network>>> networks;
  965. {
  966. std::lock_guard<std::mutex> l(_networks_l);
  967. for (auto i = _networks.begin(); i != _networks.end(); ++i) {
  968. networks.push_back(*i);
  969. }
  970. }
  971. int nCount = 0;
  972. if (!networks.empty()) {
  973. fprintf(stderr, "Network update");
  974. res = PQexec(conn, "BEGIN");
  975. if (PQresultStatus(res) != PGRES_COMMAND_OK) {
  976. fprintf(stderr, "ERROR: Error on BEGIN command (onlineNotificationThread): %s\n", PQresultErrorMessage(res));
  977. PQclear(res);
  978. exit(1);
  979. }
  980. PQclear(res);
  981. }
  982. for (auto i = networks.begin(); i != networks.end(); ++i) {
  983. char tmp[64];
  984. Utils::hex(i->first, tmp);
  985. std::string networkId(tmp);
  986. uint64_t authMemberCount = 0;
  987. uint64_t totalMemberCount = 0;
  988. uint64_t onlineMemberCount = 0;
  989. uint64_t bridgeCount = 0;
  990. uint64_t ts = now;
  991. {
  992. std::lock_guard<std::mutex> l2(i->second->lock);
  993. authMemberCount = i->second->authorizedMembers.size();
  994. totalMemberCount = i->second->members.size();
  995. bridgeCount = i->second->activeBridgeMembers.size();
  996. for (auto m=i->second->members.begin(); m != i->second->members.end(); ++m) {
  997. auto lo = lastOnlineCumulative.find(std::pair<uint64_t,uint64_t>(i->first, m->first));
  998. if (lo != lastOnlineCumulative.end()) {
  999. if ((now - lo->second) <= (ZT_NETWORK_AUTOCONF_DELAY * 2)) {
  1000. ++onlineMemberCount;
  1001. } else {
  1002. lastOnlineCumulative.erase(lo);
  1003. }
  1004. }
  1005. }
  1006. }
  1007. const char *values[6] = {
  1008. networkId.c_str(),
  1009. std::to_string(bridgeCount).c_str(),
  1010. std::to_string(authMemberCount).c_str(),
  1011. std::to_string(onlineMemberCount).c_str(),
  1012. std::to_string(totalMemberCount).c_str(),
  1013. std::to_string(ts).c_str()
  1014. };
  1015. res = PQexecParams(conn, "INSERT INTO ztc_network_status (network_id, bridge_count, authorized_member_count, "
  1016. "online_member_count, total_member_count, last_modified) VALUES ($1, $2, $3, $4, $5, $6) "
  1017. "ON CONFLICT (network_id) DO UPDATE SET bridge_count = EXCLUDED.bridge_count, "
  1018. "authorized_member_count = EXCLUDED.authorized_member_count, online_member_count = EXCDLUDED.online_member_count, "
  1019. "total_member_count = EXCLUDED.total_member_count, last_modified = EXCLUDED.last_modified",
  1020. 6,
  1021. NULL,
  1022. values,
  1023. NULL,
  1024. NULL,
  1025. 0);
  1026. if (PQresultStatus(res) != PGRES_COMMAND_OK) {
  1027. fprintf(stderr, "ERROR: Error on Network Satus upsert (onlineNotificationThread): %s\n", PQresultErrorMessage(res));
  1028. PQclear(res);
  1029. PQexec(conn, "ROLLBACK");
  1030. exit(1);
  1031. }
  1032. if ((++nCount) == 1024) {
  1033. res = PQexec(conn, "COMMIT");
  1034. if (PQresultStatus(res) != PGRES_COMMAND_OK) {
  1035. fprintf(stderr, "ERROR: Error on COMMIT (onlineNotificationThread): %s\n" , PQresultErrorMessage(res));
  1036. PQclear(res);
  1037. PQexec(conn, "ROLLBACK");
  1038. exit(1);
  1039. }
  1040. res = PQexec(conn, "BEGIN");
  1041. if (PQresultStatus(res) != PGRES_COMMAND_OK) {
  1042. fprintf(stderr, "ERROR: Error on BEGIN command (onlineNotificationThread): %s\n", PQresultErrorMessage(res));
  1043. PQclear(res);
  1044. exit(1);
  1045. }
  1046. nCount = 0;
  1047. }
  1048. }
  1049. if (nCount > 0) {
  1050. fprintf(stderr, "nCount is %d\n", nCount);
  1051. res = PQexec(conn, "COMMIT");
  1052. if (PQresultStatus(res) != PGRES_COMMAND_OK) {
  1053. fprintf(stderr, "ERROR: Error on COMMIT (onlineNotificationThread): %s\n", PQresultErrorMessage(res));
  1054. PQclear(res);
  1055. PQexec(conn, "ROLLBACK");
  1056. exit(1);
  1057. }
  1058. }
  1059. }
  1060. std::this_thread::sleep_for(std::chrono::milliseconds(250));
  1061. }
  1062. PQfinish(conn);
  1063. }
  1064. #endif //ZT_CONTROLLER_USE_LIBPQ