PostgreSQL.cpp 36 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210
  1. /*
  2. * ZeroTier One - Network Virtualization Everywhere
  3. * Copyright (C) 2011-2018 ZeroTier, Inc.
  4. *
  5. * This program is free software: you can redistribute it and/or modify
  6. * it under the terms of the GNU General Public License as published by
  7. * the Free Software Foundation, either version 3 of the License, or
  8. * (at your option) any later version.
  9. *
  10. * This program is distributed in the hope that it will be useful,
  11. * but WITHOUT ANY WARRANTY; without even the implied warranty of
  12. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  13. * GNU General Public License for more details.
  14. *
  15. * You should have received a copy of the GNU General Public License
  16. * along with this program. If not, see <http://www.gnu.org/licenses/>.
  17. */
  18. #ifdef ZT_CONTROLLER_USE_LIBPQ
  19. #include "PostgreSQL.hpp"
  20. #include "EmbeddedNetworkController.hpp"
  21. #include "../version.h"
  22. #include <libpq-fe.h>
  23. using json = nlohmann::json;
  24. namespace {
  25. static const char *_timestr()
  26. {
  27. time_t t = time(0);
  28. char *ts = ctime(&t);
  29. char *p = ts;
  30. if (!p)
  31. return "";
  32. while (*p) {
  33. if (*p == '\n') {
  34. *p = (char)0;
  35. break;
  36. }
  37. ++p;
  38. }
  39. return ts;
  40. }
  41. }
  42. using namespace ZeroTier;
  43. PostgreSQL::PostgreSQL(EmbeddedNetworkController *const nc, const Identity &myId, const char *path)
  44. : DB(nc, myId, path)
  45. , _ready(0)
  46. , _connected(1)
  47. , _run(1)
  48. , _waitNoticePrinted(false)
  49. {
  50. _connString = std::string(path);
  51. _readyLock.lock();
  52. _heartbeatThread = std::thread(&PostgreSQL::heartbeat, this);
  53. _membersDbWatcher = std::thread(&PostgreSQL::membersDbWatcher, this);
  54. _networksDbWatcher = std::thread(&PostgreSQL::networksDbWatcher, this);
  55. for (int i = 0; i < ZT_CONTROLLER_RETHINKDB_COMMIT_THREADS; ++i) {
  56. _commitThread[i] = std::thread(&PostgreSQL::commitThread, this);
  57. }
  58. _onlineNotificationThread = std::thread(&PostgreSQL::onlineNotificationThread, this);
  59. }
  60. PostgreSQL::~PostgreSQL()
  61. {
  62. _run = 0;
  63. std::this_thread::sleep_for(std::chrono::milliseconds(100));
  64. _heartbeatThread.join();
  65. _membersDbWatcher.join();
  66. _networksDbWatcher.join();
  67. for (int i = 0; i < ZT_CONTROLLER_RETHINKDB_COMMIT_THREADS; ++i) {
  68. _commitThread[i].join();
  69. }
  70. _onlineNotificationThread.join();
  71. }
  72. bool PostgreSQL::waitForReady()
  73. {
  74. while (_ready < 2) {
  75. if (!_waitNoticePrinted) {
  76. _waitNoticePrinted = true;
  77. fprintf(stderr, "[%s] NOTICE: %.10llx controller PostgreSQL waiting for initial data download..." ZT_EOL_S, ::_timestr(), (unsigned long long)_myAddress.toInt());
  78. }
  79. _readyLock.lock();
  80. _readyLock.unlock();
  81. }
  82. return true;
  83. }
  84. bool PostgreSQL::isReady()
  85. {
  86. return ((_ready == 2)&&(_connected));
  87. }
  88. void PostgreSQL::save(nlohmann::json *orig, nlohmann::json &record)
  89. {
  90. if (!record.is_object()) {
  91. return;
  92. }
  93. waitForReady();
  94. if (orig) {
  95. if (*orig != record) {
  96. record["revision"] = OSUtils::jsonInt(record["revision"],0ULL) + 1;
  97. _commitQueue.post(new nlohmann::json(record));
  98. }
  99. } else {
  100. record["revision"] = 1;
  101. _commitQueue.post(new nlohmann::json(record));
  102. }
  103. }
  104. void PostgreSQL::eraseNetwork(const uint64_t networkId)
  105. {
  106. char tmp2[24];
  107. waitForReady();
  108. Utils::hex(networkId, tmp2);
  109. json *tmp = new json();
  110. (*tmp)["id"] = tmp2;
  111. (*tmp)["objtype"] = "_delete_network";
  112. _commitQueue.post(tmp);
  113. }
  114. void PostgreSQL::eraseMember(const uint64_t networkId, const uint64_t memberId)
  115. {
  116. char tmp2[24];
  117. json *tmp = new json();
  118. Utils::hex(networkId, tmp2);
  119. (*tmp)["nwid"] = tmp2;
  120. Utils::hex(memberId, tmp2);
  121. (*tmp)["id"] = tmp2;
  122. (*tmp)["objtype"] = "_delete_member";
  123. _commitQueue.post(tmp);
  124. }
  125. void PostgreSQL::nodeIsOnline(const uint64_t networkId, const uint64_t memberId, const InetAddress &physicalAddress)
  126. {
  127. std::lock_guard<std::mutex> l(_lastOnline_l);
  128. std::pair<int64_t, InetAddress> &i = _lastOnline[std::pair<uint64_t,uint64_t>(networkId, memberId)];
  129. i.first = OSUtils::now();
  130. if (physicalAddress) {
  131. i.second = physicalAddress;
  132. }
  133. }
  134. void PostgreSQL::initializeNetworks(PGconn *conn)
  135. {
  136. try {
  137. if (PQstatus(conn) != CONNECTION_OK) {
  138. fprintf(stderr, "Bad Database Connection: %s", PQerrorMessage(conn));
  139. exit(1);
  140. }
  141. const char *params[1] = {
  142. _myAddressStr.c_str()
  143. };
  144. PGresult *res = PQexecParams(conn, "SELECT id, EXTRACT(EPOCH FROM creation_time AT TIME ZONE 'UTC')*1000, capabilities, "
  145. "enable_broadcast, EXTRACT(EPOCH FROM last_modified AT TIME ZONE 'UTC')*1000, mtu, multicast_limit, name, private, remote_trace_level, "
  146. "remote_trace_target, revision, rules, tags, v4_assign_mode, v6_assign_mode FROM ztc_network "
  147. "WHERE deleted = false AND controller_id = $1",
  148. 1,
  149. NULL,
  150. params,
  151. NULL,
  152. NULL,
  153. 0);
  154. if (PQresultStatus(res) != PGRES_TUPLES_OK) {
  155. fprintf(stderr, "Networks Initialization Failed: %s", PQerrorMessage(conn));
  156. PQclear(res);
  157. exit(1);
  158. }
  159. int numRows = PQntuples(res);
  160. for (int i = 0; i < numRows; ++i) {
  161. json empty;
  162. json config;
  163. fprintf(stderr, "Network Private? %s\n", PQgetvalue(res, i, 8));
  164. config["id"] = PQgetvalue(res, i, 0);
  165. config["nwid"] = PQgetvalue(res, i, 0);
  166. config["creationTime"] = std::stoull(PQgetvalue(res, i, 1));
  167. config["capabilities"] = json::parse(PQgetvalue(res, i, 2));
  168. config["enableBroadcast"] = (strcmp(PQgetvalue(res, i, 3),"t")==0);
  169. config["lastModified"] = std::stoull(PQgetvalue(res, i, 4));
  170. config["mtu"] = std::stoi(PQgetvalue(res, i, 5));
  171. config["multicastLimit"] = std::stoi(PQgetvalue(res, i, 6));
  172. config["name"] = PQgetvalue(res, i, 7);
  173. config["private"] = (strcmp(PQgetvalue(res, i, 8),"t")==0);
  174. config["remoteTraceLevel"] = std::stoi(PQgetvalue(res, i, 9));
  175. config["remoteTraceTarget"] = PQgetvalue(res, i, 10);
  176. config["revision"] = std::stoull(PQgetvalue(res, i, 11));
  177. config["rules"] = json::parse(PQgetvalue(res, i, 12));
  178. config["tags"] = json::parse(PQgetvalue(res, i, 13));
  179. config["v4AssignMode"] = json::parse(PQgetvalue(res, i, 14));
  180. config["v6AssignMode"] = json::parse(PQgetvalue(res, i, 15));
  181. config["objtype"] = "network";
  182. config["ipAssignmentPools"] = json::array();
  183. config["routes"] = json::array();
  184. PGresult *r2 = PQexecParams(conn,
  185. "SELECT host(ip_range_start), host(ip_range_end) FROM ztc_network_assignment_pool WHERE network_id = $1",
  186. 1,
  187. NULL,
  188. params,
  189. NULL,
  190. NULL,
  191. 0);
  192. if (PQresultStatus(r2) != PGRES_TUPLES_OK) {
  193. fprintf(stderr, "ERROR: Error retreiving IP pools for network: %s\n", PQresultErrorMessage(r2));
  194. PQclear(r2);
  195. PQclear(res);
  196. exit(1);
  197. }
  198. int n = PQntuples(r2);
  199. for (int j = 0; j < n; ++j) {
  200. json ip;
  201. ip["ipRangeStart"] = PQgetvalue(r2, j, 0);
  202. ip["ipRangeEnd"] = PQgetvalue(r2, j, 1);
  203. config["ipAssignmentPools"].push_back(ip);
  204. }
  205. PQclear(r2);
  206. r2 = PQexecParams(conn,
  207. "SELECT host(address), bits, host(via) FROM ztc_network_route WHERE network_id = $1",
  208. 1,
  209. NULL,
  210. params,
  211. NULL,
  212. NULL,
  213. 0);
  214. if (PQresultStatus(r2) != PGRES_TUPLES_OK) {
  215. fprintf(stderr, "ERROR: Error retreiving routes for network: %s\n", PQresultErrorMessage(r2));
  216. PQclear(r2);
  217. PQclear(res);
  218. exit(1);
  219. }
  220. n = PQntuples(r2);
  221. for (int j = 0; j < n; ++j) {
  222. std::string addr = PQgetvalue(r2, j, 0);
  223. std::string bits = PQgetvalue(r2, j, 1);
  224. std::string via = PQgetvalue(r2, j, 2);
  225. fprintf(stderr, "via: %s", via.c_str());
  226. json route;
  227. route["target"] = addr + "/" + bits;
  228. if (via == "NULL") {
  229. route["via"] = nullptr;
  230. } else {
  231. route["via"] = via;
  232. }
  233. config["routes"].push_back(route);
  234. }
  235. PQclear(r2);
  236. _networkChanged(empty, config, false);
  237. }
  238. PQclear(res);
  239. if (++this->_ready == 2) {
  240. if (_waitNoticePrinted) {
  241. fprintf(stderr,"[%s] NOTICE: %.10llx controller PostgreSQL data download complete." ZT_EOL_S,_timestr(),(unsigned long long)_myAddress.toInt());
  242. }
  243. _readyLock.unlock();
  244. }
  245. } catch (std::exception &e) {
  246. fprintf(stderr, "ERROR: Error initializing networks: %s", e.what());
  247. exit(-1);
  248. }
  249. }
  250. void PostgreSQL::initializeMembers(PGconn *conn)
  251. {
  252. try {
  253. if (PQstatus(conn) != CONNECTION_OK) {
  254. fprintf(stderr, "Bad Database Connection: %s", PQerrorMessage(conn));
  255. exit(1);
  256. }
  257. const char *params[1] = {
  258. _myAddressStr.c_str()
  259. };
  260. PGresult *res = PQexecParams(conn,
  261. "SELECT m.id, m.network_id, m.active_bridge, m.authorized, m.capabilities, EXTRACT(EPOCH FROM m.creation_time AT TIME ZONE 'UTC')*1000, m.identity, "
  262. " EXTRACT(EPOCH FROM m.last_authorized_time AT TIME ZONE 'UTC')*1000, "
  263. " EXTRACT(EPOCH FROM m.last_deauthorized_time AT TIME ZONE 'UTC')*1000, "
  264. " m.remote_trace_level, m.remote_trace_target, m.tags, m.v_major, m.v_minor, m.v_rev, m.v_proto, "
  265. " m.no_auto_assign_ips, m.revision "
  266. "FROM ztc_member m "
  267. "INNER JOIN ztc_network n "
  268. " ON n.id = m.network_id "
  269. "WHERE n.controller_id = $1 AND m.deleted = false",
  270. 1,
  271. NULL,
  272. params,
  273. NULL,
  274. NULL,
  275. 0);
  276. if (PQresultStatus(res) != PGRES_TUPLES_OK) {
  277. fprintf(stderr, "Member Initialization Failed: %s", PQerrorMessage(conn));
  278. PQclear(res);
  279. exit(1);
  280. }
  281. int numRows = PQntuples(res);
  282. for (int i = 0; i < numRows; ++i) {
  283. json empty;
  284. json config;
  285. std::string memberId(PQgetvalue(res, i, 0));
  286. std::string networkId(PQgetvalue(res, i, 1));
  287. fprintf(stderr, "Initializing %s-%s\n", networkId.c_str(), memberId.c_str());
  288. std::string ctime = PQgetvalue(res, i, 5);
  289. fprintf(stderr, "Creation Time %s\n", ctime.c_str());
  290. config["id"] = memberId;
  291. config["nwid"] = networkId;
  292. config["activeBridge"] = (strcmp(PQgetvalue(res, i, 2), "t") == 0);
  293. config["authorized"] = (strcmp(PQgetvalue(res, i, 3), "t") == 0);
  294. config["capabilities"] = json::parse(PQgetvalue(res, i, 4));
  295. config["creationTime"] = std::stoull(PQgetvalue(res, i, 5));
  296. config["identity"] = PQgetvalue(res, i, 6);
  297. config["lastAuthorizedTime"] = std::stoull(PQgetvalue(res, i, 7));
  298. config["lastDeauthorizedTime"] = std::stoull(PQgetvalue(res, i, 8));
  299. config["remoteTraceLevel"] = std::stoi(PQgetvalue(res, i, 9));
  300. config["remoteTraceTarget"] = PQgetvalue(res, i, 10);
  301. config["tags"] = json::parse(PQgetvalue(res, i, 11));
  302. config["vMajor"] = std::stoi(PQgetvalue(res, i, 12));
  303. config["vMinor"] = std::stoi(PQgetvalue(res, i, 13));
  304. config["vRev"] = std::stoi(PQgetvalue(res, i, 14));
  305. config["vProto"] = std::stoi(PQgetvalue(res, i, 15));
  306. config["noAutoAssignIps"] = (strcmp(PQgetvalue(res, i, 16), "t") == 0);
  307. config["revision"] = std::stoull(PQgetvalue(res, i, 17));
  308. config["objtype"] = "member";
  309. config["ipAssignments"] = json::array();
  310. const char *p2[2] = {
  311. memberId.c_str(),
  312. networkId.c_str()
  313. };
  314. PGresult *r2 = PQexecParams(conn,
  315. "SELECT address FROM ztc_member_ip_assignment WHERE member_id = $1 AND network_id = $2",
  316. 2,
  317. NULL,
  318. p2,
  319. NULL,
  320. NULL,
  321. 0);
  322. if (PQresultStatus(r2) != PGRES_TUPLES_OK) {
  323. fprintf(stderr, "Member Initialization Failed: %s", PQerrorMessage(conn));
  324. PQclear(r2);
  325. PQclear(res);
  326. exit(1);
  327. }
  328. int n = PQntuples(r2);
  329. for (int j = 0; j < n; ++j) {
  330. config["ipAssignments"].push_back(PQgetvalue(r2, j, 0));
  331. }
  332. _memberChanged(empty, config, false);
  333. }
  334. PQclear(res);
  335. if (++this->_ready == 2) {
  336. if (_waitNoticePrinted) {
  337. fprintf(stderr,"[%s] NOTICE: %.10llx controller PostgreSQL data download complete." ZT_EOL_S,_timestr(),(unsigned long long)_myAddress.toInt());
  338. }
  339. _readyLock.unlock();
  340. }
  341. } catch (std::exception &e) {
  342. fprintf(stderr, "ERROR: Error initializing members: %s\n", e.what());
  343. exit(-1);
  344. }
  345. }
  346. void PostgreSQL::heartbeat()
  347. {
  348. char publicId[1024];
  349. char hostnameTmp[1024];
  350. _myId.toString(false,publicId);
  351. if (gethostname(hostnameTmp, sizeof(hostnameTmp))!= 0) {
  352. hostnameTmp[0] = (char)0;
  353. } else {
  354. for (int i = 0; i < sizeof(hostnameTmp); ++i) {
  355. if ((hostnameTmp[i] == '.')||(hostnameTmp[i] == 0)) {
  356. hostnameTmp[i] = (char)0;
  357. break;
  358. }
  359. }
  360. }
  361. const char *controllerId = _myAddressStr.c_str();
  362. const char *publicIdentity = publicId;
  363. const char *hostname = hostnameTmp;
  364. PGconn *conn = PQconnectdb(_path.c_str());
  365. if (PQstatus(conn) == CONNECTION_BAD) {
  366. fprintf(stderr, "Connection to database failed: %s\n", PQerrorMessage(conn));
  367. PQfinish(conn);
  368. exit(1);
  369. }
  370. while (_run == 1) {
  371. if(PQstatus(conn) != CONNECTION_OK) {
  372. PQfinish(conn);
  373. conn = PQconnectdb(_path.c_str());
  374. }
  375. if (conn) {
  376. const char *values[8] = {
  377. controllerId,
  378. hostname,
  379. std::to_string(OSUtils::now()).c_str(),
  380. publicIdentity,
  381. std::to_string(ZEROTIER_ONE_VERSION_MAJOR).c_str(),
  382. std::to_string(ZEROTIER_ONE_VERSION_MINOR).c_str(),
  383. std::to_string(ZEROTIER_ONE_VERSION_REVISION).c_str(),
  384. std::to_string(ZEROTIER_ONE_VERSION_BUILD).c_str()
  385. };
  386. int lengths[8] = {
  387. (int)strlen(values[0]),
  388. (int)strlen(values[1]),
  389. (int)strlen(values[2]),
  390. (int)strlen(values[3]),
  391. (int)strlen(values[4]),
  392. (int)strlen(values[5]),
  393. (int)strlen(values[6]),
  394. (int)strlen(values[7])
  395. };
  396. int binary[8] = {0,0,0,0,0,0,0,0};
  397. PGresult *res = PQexecParams(conn,
  398. "INSERT INTO ztc_controller (id, cluster_host, last_alive, public_identity, v_major, v_minor, v_rev, v_build) "
  399. "VALUES ($1, $2, TO_TIMESTAMP($3::double precision/1000), $4, $5, $6, $7, $8) "
  400. "ON CONFLICT (id) DO UPDATE SET cluster_host = EXCLUDED.cluster_host, last_alive = EXCLUDED.last_alive, "
  401. "public_identity = EXCLUDED.public_identity, v_major = EXCLUDED.v_major, v_minor = EXCLUDED.v_minor, "
  402. "v_rev = EXCLUDED.v_rev, v_build = EXCLUDED.v_rev",
  403. 8, // number of parameters
  404. NULL, // oid field. ignore
  405. values, // values for substitution
  406. lengths, // lengths in bytes of each value
  407. binary, // binary?
  408. 0);
  409. if (PQresultStatus(res) != PGRES_COMMAND_OK) {
  410. fprintf(stderr, "Heartbeat Update Failed: %s\n", PQresultErrorMessage(res));
  411. }
  412. PQclear(res);
  413. }
  414. std::this_thread::sleep_for(std::chrono::milliseconds(1000));
  415. }
  416. PQfinish(conn);
  417. conn = NULL;
  418. }
  419. void PostgreSQL::membersDbWatcher()
  420. {
  421. PGconn *conn = PQconnectdb(_path.c_str());
  422. if (PQstatus(conn) == CONNECTION_BAD) {
  423. fprintf(stderr, "Connection to database failed: %s\n", PQerrorMessage(conn));
  424. PQfinish(conn);
  425. exit(1);
  426. }
  427. initializeMembers(conn);
  428. char buf[11] = {0};
  429. std::string cmd = "LISTEN member_" + std::string(_myAddress.toString(buf));
  430. PGresult *res = PQexec(conn, cmd.c_str());
  431. if (!res || PQresultStatus(res) != PGRES_COMMAND_OK) {
  432. fprintf(stderr, "LISTEN command failed: %s\n", PQresultErrorMessage(res));
  433. PQclear(res);
  434. PQfinish(conn);
  435. exit(1);
  436. }
  437. PQclear(res); res = NULL;
  438. while(_run == 1) {
  439. if (PQstatus(conn) != CONNECTION_OK) {
  440. fprintf(stderr, "ERROR: Member Watcher lost connection to Postgres.");
  441. exit(-1);
  442. }
  443. PGnotify *notify = NULL;
  444. PQconsumeInput(conn);
  445. while ((notify = PQnotifies(conn)) != NULL) {
  446. fprintf(stderr, "ASYNC NOTIFY of '%s' id:%s received\n", notify->relname, notify->extra);
  447. try {
  448. json tmp(json::parse(notify->extra));
  449. json &ov = tmp["old_val"];
  450. json &nv = tmp["new_val"];
  451. json oldConfig, newConfig;
  452. if (ov.is_object()) oldConfig = ov;
  453. if (nv.is_object()) newConfig = nv;
  454. if (oldConfig.is_object() || newConfig.is_object()) {
  455. _memberChanged(oldConfig,newConfig,(this->_ready>=2));
  456. }
  457. } catch (...) {} // ignore bad records
  458. free(notify);
  459. }
  460. std::this_thread::sleep_for(std::chrono::milliseconds(10));
  461. }
  462. PQfinish(conn);
  463. conn = NULL;
  464. }
  465. void PostgreSQL::networksDbWatcher()
  466. {
  467. PGconn *conn = PQconnectdb(_path.c_str());
  468. if (PQstatus(conn) == CONNECTION_BAD) {
  469. fprintf(stderr, "Connection to database failed: %s\n", PQerrorMessage(conn));
  470. PQfinish(conn);
  471. exit(1);
  472. }
  473. initializeNetworks(conn);
  474. char buf[11] = {0};
  475. std::string cmd = "LISTEN network_" + std::string(_myAddress.toString(buf));
  476. PGresult *res = PQexec(conn, cmd.c_str());
  477. if (!res || PQresultStatus(res) != PGRES_COMMAND_OK) {
  478. fprintf(stderr, "LISTEN command failed: %s\n", PQresultErrorMessage(res));
  479. PQclear(res);
  480. PQfinish(conn);
  481. exit(1);
  482. }
  483. PQclear(res); res = NULL;
  484. while(_run == 1) {
  485. if (PQstatus(conn) != CONNECTION_OK) {
  486. fprintf(stderr, "ERROR: Network Watcher lost connection to Postgres.");
  487. exit(-1);
  488. }
  489. PGnotify *notify = NULL;
  490. PQconsumeInput(conn);
  491. while ((notify = PQnotifies(conn)) != NULL) {
  492. fprintf(stderr, "ASYNC NOTIFY of '%s' id:%s received\n", notify->relname, notify->extra);
  493. try {
  494. json tmp(json::parse(notify->extra));
  495. json &ov = tmp["old_val"];
  496. json &nv = tmp["new_val"];
  497. json oldConfig, newConfig;
  498. if (ov.is_object()) oldConfig = ov;
  499. if (nv.is_object()) newConfig = nv;
  500. if (oldConfig.is_object()||newConfig.is_object()) {
  501. _networkChanged(oldConfig,newConfig,(this->_ready >= 2));
  502. }
  503. } catch (...) {} // ignore bad records
  504. free(notify);
  505. }
  506. std::this_thread::sleep_for(std::chrono::milliseconds(10));
  507. }
  508. PQfinish(conn);
  509. conn = NULL;
  510. }
  511. void PostgreSQL::commitThread()
  512. {
  513. PGconn *conn = PQconnectdb(_path.c_str());
  514. if (PQstatus(conn) == CONNECTION_BAD) {
  515. fprintf(stderr, "ERROR: Connection to database failed: %s\n", PQerrorMessage(conn));
  516. PQfinish(conn);
  517. exit(1);
  518. }
  519. json *config = nullptr;
  520. while(_commitQueue.get(config)&(_run == 1)) {
  521. if (!config) {
  522. continue;
  523. }
  524. if (PQstatus(conn) == CONNECTION_BAD) {
  525. fprintf(stderr, "ERROR: Connection to database failed: %s\n", PQerrorMessage(conn));
  526. PQfinish(conn);
  527. exit(1);
  528. }
  529. try {
  530. const std::string objtype = (*config)["objtype"];
  531. if (objtype == "member") {
  532. try {
  533. std::string memberId = (*config)["id"];
  534. std::string networkId = (*config)["nwid"];
  535. std::string identity = (*config)["identity"];
  536. std::string target = "NULL";
  537. if (!(*config)["remoteTraceTarget"].is_null()) {
  538. target = (*config)["remoteTraceTarget"];
  539. }
  540. const char *values[19] = {
  541. memberId.c_str(),
  542. networkId.c_str(),
  543. ((*config)["activeBridge"] ? "true" : "false"),
  544. ((*config)["authorized"] ? "true" : "false"),
  545. OSUtils::jsonDump((*config)["capabilities"], -1).c_str(),
  546. identity.c_str(),
  547. std::to_string((long long)(*config)["lastAuthorizedTime"]).c_str(),
  548. std::to_string((long long)(*config)["lastDeauthorizedTime"]).c_str(),
  549. ((*config)["noAutoAssignIps"] ? "true" : "false"),
  550. std::to_string((int)(*config)["remoteTraceLevel"]).c_str(),
  551. (target == "NULL") ? NULL : target.c_str(),
  552. std::to_string((unsigned long long)(*config)["revision"]).c_str(),
  553. OSUtils::jsonDump((*config)["tags"], -1).c_str(),
  554. std::to_string((int)(*config)["vMajor"]).c_str(),
  555. std::to_string((int)(*config)["vMinor"]).c_str(),
  556. std::to_string((int)(*config)["vRev"]).c_str(),
  557. std::to_string((int)(*config)["vProto"]).c_str()
  558. };
  559. PGresult *res = PQexecParams(conn,
  560. "INSERT INTO ztc_member (id, network_id, active_bridge, authorized, capabilities, "
  561. "identity, last_authorized_time, last_deauthorized_time, no_auto_assign_ips, "
  562. "remote_trace_level, remote_trace_target, revision, tags, v_major, v_minor, v_rev, v_proto) "
  563. "VALUES ($1, $2, $3, $4, $5, $6, "
  564. "TO_TIMESTAMP($7::double precision/1000), TO_TIMESTAMP($8::double precision/1000), "
  565. "$9, $10, $11, $12, $13, $14, $15, $16, $17) ON CONFLICT (network_id, id) DO UPDATE SET "
  566. "active_bridge = EXCLUDED.active_bridge, authorized = EXCLUDED.authorized, capabilities = EXCLUDED.capabilities, "
  567. "identity = EXCLUDED.identity, last_authorized_time = EXCLUDED.last_authorized_time, "
  568. "last_deauthorized_time = EXCLUDED.last_deauthorized_time, no_auto_assign_ips = EXCLUDED.no_auto_assign_ips, "
  569. "remote_trace_level = EXCLUDED.remote_trace_level, remote_trace_target = EXCLUDED.remote_trace_target, "
  570. "revision = EXCLUDED.revision+1, tags = EXCLUDED.tags, v_major = EXCLUDED.v_major, "
  571. "v_minor = EXCLUDED.v_minor, v_rev = EXCLUDED.v_rev, v_proto = EXCLUDED.v_proto",
  572. 17,
  573. NULL,
  574. values,
  575. NULL,
  576. NULL,
  577. 0);
  578. if (PQresultStatus(res) != PGRES_COMMAND_OK) {
  579. fprintf(stderr, "ERROR: Error updating member: %s\n", PQresultErrorMessage(res));
  580. fprintf(stderr, "%s", OSUtils::jsonDump(*config, 2).c_str());
  581. PQclear(res);
  582. continue;
  583. }
  584. PQclear(res);
  585. res = PQexec(conn, "BEGIN");
  586. if (PQresultStatus(res) != PGRES_COMMAND_OK) {
  587. fprintf(stderr, "ERROR: Error beginning transaction: %s\n", PQresultErrorMessage(res));
  588. PQclear(res);
  589. continue;
  590. }
  591. PQclear(res);
  592. const char *v2[2] = {
  593. memberId.c_str(),
  594. networkId.c_str()
  595. };
  596. res = PQexecParams(conn,
  597. "DELETE FROM ztc_member_ip_assignment WHERE member_id = $1 AND network_id = $2",
  598. 2,
  599. NULL,
  600. v2,
  601. NULL,
  602. NULL,
  603. 0);
  604. if (PQresultStatus(res) != PGRES_COMMAND_OK) {
  605. fprintf(stderr, "ERROR: Error updating IP address assignments: %s\n", PQresultErrorMessage(res));
  606. PQclear(res);
  607. PQclear(PQexec(conn, "ROLLBACK"));;
  608. continue;
  609. }
  610. PQclear(res);
  611. for (auto i = (*config)["ipAssignments"].begin(); i != (*config)["ipAssignments"].end(); ++i) {
  612. std::string addr = *i;
  613. const char *v3[3] = {
  614. memberId.c_str(),
  615. networkId.c_str(),
  616. addr.c_str()
  617. };
  618. res = PQexecParams(conn,
  619. "INSERT INTO ztc_member_ip_assignment (member_id, network_id, address) VALUES ($1, $2, $3)",
  620. 3,
  621. NULL,
  622. v3,
  623. NULL,
  624. NULL,
  625. 0);
  626. if (PQresultStatus(res) != PGRES_COMMAND_OK) {
  627. fprintf(stderr, "ERROR: Error setting IP addresses for member: %s\n", PQresultErrorMessage(res));
  628. PQclear(res);
  629. PQclear(PQexec(conn, "ROLLBACK"));
  630. continue;
  631. }
  632. }
  633. res = PQexec(conn, "COMMIT");
  634. if (PQresultStatus(res) != PGRES_COMMAND_OK) {
  635. fprintf(stderr, "ERROR: Error committing ip address data: %s\n", PQresultErrorMessage(res));
  636. }
  637. PQclear(res);
  638. } catch (std::exception &e) {
  639. fprintf(stderr, "ERROR: Error updating member: %s\n", e.what());
  640. }
  641. } else if (objtype == "network") {
  642. try {
  643. std::string id = (*config)["id"];
  644. std::string controllerId = _myAddressStr.c_str();
  645. std::string name = (*config)["name"];
  646. std::string remoteTraceTarget("NULL");
  647. if (!(*config)["remoteTraceTarget"].is_null()) {
  648. remoteTraceTarget = (*config)["remoteTraceTarget"];
  649. }
  650. std::string rulesSource = (*config)["rulesSource"];
  651. const char *values[16] = {
  652. id.c_str(),
  653. controllerId.c_str(),
  654. OSUtils::jsonDump((*config)["capabilitles"], -1).c_str(),
  655. ((*config)["enableBroadcast"] ? "true" : "false"),
  656. std::to_string(OSUtils::now()).c_str(),
  657. std::to_string((int)(*config)["mtu"]).c_str(),
  658. std::to_string((int)(*config)["multicastLimit"]).c_str(),
  659. name.c_str(),
  660. ((*config)["private"] ? "true" : "false"),
  661. std::to_string((int)(*config)["remoteTraceLevel"]).c_str(),
  662. (remoteTraceTarget == "NULL" ? NULL : remoteTraceTarget.c_str()),
  663. OSUtils::jsonDump((*config)["rules"], -1).c_str(),
  664. rulesSource.c_str(),
  665. OSUtils::jsonDump((*config)["tags"], -1).c_str(),
  666. OSUtils::jsonDump((*config)["v4AssignMode"],-1).c_str(),
  667. OSUtils::jsonDump((*config)["v6AssignMode"], -1).c_str(),
  668. };
  669. PGresult *res = PQexecParams(conn,
  670. "UPDATE ztc_network SET controller_id = $2, capabilities = $3, enable_broadcast = $4, "
  671. "last_updated = $5, mtu = $6, multicast_limit = $7, name = $8, private = $9, "
  672. "remote_trace_level = $10, remote_trace_target = $11, rules = $12, rules_source = $13, "
  673. "tags = $14, v4_assign_mode = $15, v6_assign_mode = $16 "
  674. "WHERE id = $1",
  675. 16,
  676. NULL,
  677. values,
  678. NULL,
  679. NULL,
  680. 0);
  681. if (PQresultStatus(res) != PGRES_COMMAND_OK) {
  682. fprintf(stderr, "ERROR: Error updating network record: %s\n", PQresultErrorMessage(res));
  683. PQclear(res);
  684. continue;
  685. }
  686. PQclear(res);
  687. res = PQexec(conn, "BEGIN");
  688. if (PQresultStatus(res) != PGRES_COMMAND_OK) {
  689. fprintf(stderr, "ERROR: Error beginnning transaction: %s\n", PQresultErrorMessage(res));
  690. PQclear(res);
  691. continue;
  692. }
  693. PQclear(res);
  694. const char *params[1] = {
  695. id.c_str()
  696. };
  697. res = PQexecParams(conn,
  698. "DELETE FROM ztc_network_assignment_pool WHERE network_id = $1",
  699. 1,
  700. NULL,
  701. params,
  702. NULL,
  703. NULL,
  704. 0);
  705. if (PQresultStatus(res) != PGRES_COMMAND_OK) {
  706. fprintf(stderr, "ERROR: Error updating assignment pool: %s\n", PQresultErrorMessage(res));
  707. PQclear(res);
  708. PQclear(PQexec(conn, "ROLLBACK"));
  709. continue;
  710. }
  711. PQclear(res);
  712. auto pool = (*config)["ipAssignmentPools"];
  713. bool err = false;
  714. for (auto i = pool.begin(); i != pool.end(); ++i) {
  715. std::string start = (*i)["ipRangeStart"];
  716. std::string end = (*i)["ipRangeEnd"];
  717. const char *p[3] = {
  718. id.c_str(),
  719. start.c_str(),
  720. end.c_str()
  721. };
  722. res = PQexecParams(conn,
  723. "INSERT INTO ztc_network_assignment_pool (network_id, ip_range_start, ip_range_end) "
  724. "VALUES ($1, $2, $3)",
  725. 3,
  726. NULL,
  727. p,
  728. NULL,
  729. NULL,
  730. 0);
  731. if (PQresultStatus(res) != PGRES_COMMAND_OK) {
  732. fprintf(stderr, "ERROR: Error updating assignment pool: %s\n", PQresultErrorMessage(res));
  733. PQclear(res);
  734. err = true;
  735. break;
  736. }
  737. PQclear(res);
  738. }
  739. if (err) {
  740. PQclear(PQexec(conn, "ROLLBACK"));
  741. continue;
  742. }
  743. res = PQexecParams(conn,
  744. "DELETE FROM ztc_network_route WHERE network_id = $1",
  745. 1,
  746. NULL,
  747. params,
  748. NULL,
  749. NULL,
  750. 0);
  751. if (PQresultStatus(res) != PGRES_COMMAND_OK) {
  752. fprintf(stderr, "ERROR: Error updating routes: %s\n", PQresultErrorMessage(res));
  753. PQclear(res);
  754. PQclear(PQexec(conn, "ROLLBACK"));
  755. continue;
  756. }
  757. auto routes = (*config)["routes"];
  758. err = false;
  759. for (auto i = routes.begin(); i != routes.end(); ++i) {
  760. std::string t = (*i)["target"];
  761. std::vector<std::string> target;
  762. std::istringstream f(t);
  763. std::string s;
  764. while(std::getline(f, s, '/')) {
  765. target.push_back(s);
  766. }
  767. if (target.empty() || target.size() != 2) {
  768. continue;
  769. }
  770. std::string targetAddr = target[0];
  771. std::string targetBits = target[1];
  772. std::string via = "NULL";
  773. if (!(*i)["via"].is_null()) {
  774. via = (*i)["via"];
  775. }
  776. const char *p[4] = {
  777. id.c_str(),
  778. targetAddr.c_str(),
  779. targetBits.c_str(),
  780. (via == "NULL" ? NULL : via.c_str()),
  781. };
  782. res = PQexecParams(conn,
  783. "INSERT INTO ztc_network_route (network_id, address, bits, via) VALUES ($1, $2, $3, $4)",
  784. 4,
  785. NULL,
  786. p,
  787. NULL,
  788. NULL,
  789. 0);
  790. if (PQresultStatus(res) != PGRES_COMMAND_OK) {
  791. fprintf(stderr, "ERROR: Error updating routes: %s\n", PQresultErrorMessage(res));
  792. PQclear(res);
  793. err = true;
  794. break;
  795. }
  796. PQclear(res);
  797. }
  798. if (err) {
  799. PQclear(PQexec(conn, "ROLLBAcK"));
  800. continue;
  801. }
  802. res = PQexec(conn, "COMMIT");
  803. if (PQresultStatus(res) != PGRES_COMMAND_OK) {
  804. fprintf(stderr, "ERROR: Error committing network update: %s\n", PQresultErrorMessage(res));
  805. }
  806. PQclear(res);
  807. } catch (std::exception &e) {
  808. fprintf(stderr, "ERROR: Error updating member: %s\n", e.what());
  809. }
  810. } else if (objtype == "trace") {
  811. fprintf(stderr, "ERROR: Trace not yet implemented");
  812. } else if (objtype == "_delete_network") {
  813. try {
  814. std::string networkId = (*config)["nwid"];
  815. const char *values[1] = {
  816. networkId.c_str()
  817. };
  818. PGresult * res = PQexecParams(conn,
  819. "UPDATE ztc_network SET deleted = true WHERE id = $1",
  820. 1,
  821. NULL,
  822. values,
  823. NULL,
  824. NULL,
  825. 0);
  826. if (PQresultStatus(res) != PGRES_COMMAND_OK) {
  827. fprintf(stderr, "ERROR: Error deleting network: %s\n", PQresultErrorMessage(res));
  828. }
  829. PQclear(res);
  830. } catch (std::exception &e) {
  831. fprintf(stderr, "ERROR: Error deleting network: %s\n", e.what());
  832. }
  833. } else if (objtype == "_delete_member") {
  834. try {
  835. std::string memberId = (*config)["id"];
  836. std::string networkId = (*config)["nwid"];
  837. const char *values[2] = {
  838. memberId.c_str(),
  839. networkId.c_str()
  840. };
  841. PGresult *res = PQexecParams(conn,
  842. "UPDATE ztc_member SET hidden = true, deleted = true WHERE id = $1 AND network_id = $2",
  843. 2,
  844. NULL,
  845. values,
  846. NULL,
  847. NULL,
  848. 0);
  849. if (PQresultStatus(res) != PGRES_COMMAND_OK) {
  850. fprintf(stderr, "ERROR: Error deleting member: %s\n", PQresultErrorMessage(res));
  851. }
  852. PQclear(res);
  853. } catch (std::exception &e) {
  854. fprintf(stderr, "ERROR: Error deleting member: %s\n", e.what());
  855. }
  856. } else {
  857. fprintf(stderr, "ERROR: unknown objtype");
  858. }
  859. } catch (std::exception &e) {
  860. fprintf(stderr, "ERROR: Error getting objtype: %s\n", e.what());
  861. }
  862. delete config;
  863. std::this_thread::sleep_for(std::chrono::milliseconds(10));
  864. }
  865. PQfinish(conn);
  866. }
  867. void PostgreSQL::onlineNotificationThread()
  868. {
  869. PGconn *conn = PQconnectdb(_path.c_str());
  870. if (PQstatus(conn) == CONNECTION_BAD) {
  871. fprintf(stderr, "Connection to database failed: %s\n", PQerrorMessage(conn));
  872. PQfinish(conn);
  873. exit(1);
  874. }
  875. _connected = 1;
  876. int64_t lastUpdatedNetworkStatus = 0;
  877. std::unordered_map< std::pair<uint64_t,uint64_t>,int64_t,_PairHasher > lastOnlineCumulative;
  878. while (_run == 1) {
  879. if (PQstatus(conn) != CONNECTION_OK) {
  880. fprintf(stderr, "ERROR: Online Notification thread lost connection to Postgres.");
  881. exit(-1);
  882. }
  883. std::unordered_map< std::pair<uint64_t,uint64_t>,std::pair<int64_t,InetAddress>,_PairHasher > lastOnline;
  884. {
  885. std::lock_guard<std::mutex> l(_lastOnline_l);
  886. lastOnline.swap(_lastOnline);
  887. }
  888. PGresult *res = NULL;
  889. int qCount = 0;
  890. if (!lastOnline.empty()) {
  891. fprintf(stderr, "Last Online Update\n");
  892. res = PQexec(conn, "BEGIN");
  893. if (PQresultStatus(res) != PGRES_COMMAND_OK) {
  894. fprintf(stderr, "ERROR: Error on BEGIN command (onlineNotificationThread): %s\n", PQresultErrorMessage(res));
  895. PQclear(res);
  896. exit(1);
  897. }
  898. PQclear(res);
  899. }
  900. for (auto i=lastOnline.begin(); i != lastOnline.end(); ++i) {
  901. uint64_t nwid_i = i->first.first;
  902. char nwidTmp[64];
  903. char memTmp[64];
  904. char ipTmp[64];
  905. OSUtils::ztsnprintf(nwidTmp,sizeof(nwidTmp), "%.16llx", nwid_i);
  906. OSUtils::ztsnprintf(memTmp,sizeof(memTmp), "%.10llx", i->first.second);
  907. auto found = _networks.find(nwid_i);
  908. if (found == _networks.end()) {
  909. fprintf(stderr, "Network %s not found.\n", nwidTmp);
  910. continue;
  911. }
  912. lastOnlineCumulative[i->first] = i->second.first;
  913. std::string networkId(nwidTmp);
  914. std::string memberId(memTmp);
  915. int64_t ts = i->second.first;
  916. std::string ipAddr = i->second.second.toIpString(ipTmp);
  917. const char *values[4] = {
  918. networkId.c_str(),
  919. memberId.c_str(),
  920. (ipAddr.empty() ? NULL : ipAddr.c_str()),
  921. std::to_string(ts).c_str(),
  922. };
  923. res = PQexecParams(conn,
  924. "INSERT INTO ztc_member_status (network_id, member_id, address, last_updated) VALUES ($1, $2, $3, TO_TIMESTAMP($4::double precision/1000)) "
  925. "ON CONFLICT (network_id, member_id) DO UPDATE SET address = EXCLUDED.address, last_updated = EXCLUDED.last_updated",
  926. 4, // number of parameters
  927. NULL, // oid field. ignore
  928. values, // values for substitution
  929. NULL, // lengths in bytes of each value
  930. NULL,
  931. 0);
  932. if (PQresultStatus(res) != PGRES_COMMAND_OK) {
  933. fprintf(stderr, "Error on Member Status upsert: %s\n", PQresultErrorMessage(res));
  934. PQclear(res);
  935. PQclear(PQexec(conn, "ROLLBACK"));
  936. continue;
  937. }
  938. PQclear(res);
  939. if ((++qCount) == 1024) {
  940. res = PQexec(conn, "COMMIT");
  941. if (PQresultStatus(res) != PGRES_COMMAND_OK) {
  942. fprintf(stderr, "ERROR: Error on commit (onlineNotificationThread): %s\n", PQresultErrorMessage(res));
  943. PQclear(res);
  944. PQexec(conn, "ROLLBACK");
  945. exit(1);
  946. }
  947. PQclear(res);
  948. res = PQexec(conn, "BEGIN");
  949. if (PQresultStatus(res) != PGRES_COMMAND_OK) {
  950. fprintf(stderr, "ERROR: Error on BEGIN (onlineNotificationThread): %s\n", PQresultErrorMessage(res));
  951. PQclear(res);
  952. exit(1);
  953. }
  954. PQclear(res);
  955. qCount = 0;
  956. }
  957. }
  958. if (qCount > 0) {
  959. fprintf(stderr, "qCount is %d\n", qCount);
  960. res = PQexec(conn, "COMMIT");
  961. if (PQresultStatus(res) != PGRES_COMMAND_OK) {
  962. fprintf(stderr, "ERROR: Error on commit (onlineNotificationThread): %s\n", PQresultErrorMessage(res));
  963. PQclear(res);
  964. PQexec(conn, "ROLLBACK");
  965. exit(1);
  966. }
  967. PQclear(res);
  968. }
  969. const int64_t now = OSUtils::now();
  970. if ((now - lastUpdatedNetworkStatus) > 10000) {
  971. lastUpdatedNetworkStatus = now;
  972. std::vector<std::pair<uint64_t, std::shared_ptr<_Network>>> networks;
  973. {
  974. std::lock_guard<std::mutex> l(_networks_l);
  975. for (auto i = _networks.begin(); i != _networks.end(); ++i) {
  976. networks.push_back(*i);
  977. }
  978. }
  979. int nCount = 0;
  980. if (!networks.empty()) {
  981. fprintf(stderr, "Network update\n");
  982. res = PQexec(conn, "BEGIN");
  983. if (PQresultStatus(res) != PGRES_COMMAND_OK) {
  984. fprintf(stderr, "ERROR: Error on BEGIN command (onlineNotificationThread): %s\n", PQresultErrorMessage(res));
  985. PQclear(res);
  986. exit(1);
  987. }
  988. PQclear(res);
  989. }
  990. for (auto i = networks.begin(); i != networks.end(); ++i) {
  991. char tmp[64];
  992. Utils::hex(i->first, tmp);
  993. std::string networkId(tmp);
  994. fprintf(stderr, "Updating network status for network: %s\n", networkId.c_str());
  995. uint64_t authMemberCount = 0;
  996. uint64_t totalMemberCount = 0;
  997. uint64_t onlineMemberCount = 0;
  998. uint64_t bridgeCount = 0;
  999. uint64_t ts = now;
  1000. {
  1001. std::lock_guard<std::mutex> l2(i->second->lock);
  1002. authMemberCount = i->second->authorizedMembers.size();
  1003. totalMemberCount = i->second->members.size();
  1004. bridgeCount = i->second->activeBridgeMembers.size();
  1005. for (auto m=i->second->members.begin(); m != i->second->members.end(); ++m) {
  1006. auto lo = lastOnlineCumulative.find(std::pair<uint64_t,uint64_t>(i->first, m->first));
  1007. if (lo != lastOnlineCumulative.end()) {
  1008. if ((now - lo->second) <= (ZT_NETWORK_AUTOCONF_DELAY * 2)) {
  1009. ++onlineMemberCount;
  1010. } else {
  1011. lastOnlineCumulative.erase(lo);
  1012. }
  1013. }
  1014. }
  1015. }
  1016. const char *values[6] = {
  1017. networkId.c_str(),
  1018. std::to_string(bridgeCount).c_str(),
  1019. std::to_string(authMemberCount).c_str(),
  1020. std::to_string(onlineMemberCount).c_str(),
  1021. std::to_string(totalMemberCount).c_str(),
  1022. std::to_string(ts).c_str()
  1023. };
  1024. res = PQexecParams(conn, "INSERT INTO ztc_network_status (network_id, bridge_count, authorized_member_count, "
  1025. "online_member_count, total_member_count, last_modified) VALUES ($1, $2, $3, $4, $5, TO_TIMESTAMP($6::double precision/1000)) "
  1026. "ON CONFLICT (network_id) DO UPDATE SET bridge_count = EXCLUDED.bridge_count, "
  1027. "authorized_member_count = EXCLUDED.authorized_member_count, online_member_count = EXCLUDED.online_member_count, "
  1028. "total_member_count = EXCLUDED.total_member_count, last_modified = EXCLUDED.last_modified",
  1029. 6,
  1030. NULL,
  1031. values,
  1032. NULL,
  1033. NULL,
  1034. 0);
  1035. if (PQresultStatus(res) != PGRES_COMMAND_OK) {
  1036. fprintf(stderr, "ERROR: Error on Network Status upsert (onlineNotificationThread): %s\n", PQresultErrorMessage(res));
  1037. PQclear(res);
  1038. PQexec(conn, "ROLLBACK");
  1039. exit(1);
  1040. }
  1041. if ((++nCount) == 1024) {
  1042. res = PQexec(conn, "COMMIT");
  1043. if (PQresultStatus(res) != PGRES_COMMAND_OK) {
  1044. fprintf(stderr, "ERROR: Error on COMMIT (onlineNotificationThread): %s\n" , PQresultErrorMessage(res));
  1045. PQclear(res);
  1046. PQexec(conn, "ROLLBACK");
  1047. exit(1);
  1048. }
  1049. res = PQexec(conn, "BEGIN");
  1050. if (PQresultStatus(res) != PGRES_COMMAND_OK) {
  1051. fprintf(stderr, "ERROR: Error on BEGIN command (onlineNotificationThread): %s\n", PQresultErrorMessage(res));
  1052. PQclear(res);
  1053. exit(1);
  1054. }
  1055. nCount = 0;
  1056. }
  1057. }
  1058. if (nCount > 0) {
  1059. fprintf(stderr, "nCount is %d\n", nCount);
  1060. res = PQexec(conn, "COMMIT");
  1061. if (PQresultStatus(res) != PGRES_COMMAND_OK) {
  1062. fprintf(stderr, "ERROR: Error on COMMIT (onlineNotificationThread): %s\n", PQresultErrorMessage(res));
  1063. PQclear(res);
  1064. PQexec(conn, "ROLLBACK");
  1065. exit(1);
  1066. }
  1067. }
  1068. }
  1069. std::this_thread::sleep_for(std::chrono::milliseconds(250));
  1070. }
  1071. PQfinish(conn);
  1072. }
  1073. #endif //ZT_CONTROLLER_USE_LIBPQ