PostgreSQL.cpp 46 KB


  1. /*
  2. * ZeroTier One - Network Virtualization Everywhere
  3. * Copyright (C) 2011-2019 ZeroTier, Inc. https://www.zerotier.com/
  4. *
  5. * This program is free software: you can redistribute it and/or modify
  6. * it under the terms of the GNU General Public License as published by
  7. * the Free Software Foundation, either version 3 of the License, or
  8. * (at your option) any later version.
  9. *
  10. * This program is distributed in the hope that it will be useful,
  11. * but WITHOUT ANY WARRANTY; without even the implied warranty of
  12. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  13. * GNU General Public License for more details.
  14. *
  15. * You should have received a copy of the GNU General Public License
  16. * along with this program. If not, see <http://www.gnu.org/licenses/>.
  17. *
  18. * --
  19. *
  20. * You can be released from the requirements of the license by purchasing
  21. * a commercial license. Buying such a license is mandatory as soon as you
  22. * develop commercial closed-source software that incorporates or links
  23. * directly against ZeroTier software without disclosing the source code
  24. * of your own application.
  25. */
  26. #ifdef ZT_CONTROLLER_USE_LIBPQ
  27. #include "PostgreSQL.hpp"
  28. #include "EmbeddedNetworkController.hpp"
  29. #include "RabbitMQ.hpp"
  30. #include "../version.h"
  31. #include <libpq-fe.h>
  32. #include <sstream>
  33. #include <amqp.h>
  34. #include <amqp_tcp_socket.h>
  35. using json = nlohmann::json;
  36. namespace {
  37. static const int DB_MINIMUM_VERSION = 5;
  38. static const char *_timestr()
  39. {
  40. time_t t = time(0);
  41. char *ts = ctime(&t);
  42. char *p = ts;
  43. if (!p)
  44. return "";
  45. while (*p) {
  46. if (*p == '\n') {
  47. *p = (char)0;
  48. break;
  49. }
  50. ++p;
  51. }
  52. return ts;
  53. }
  54. std::string join(const std::vector<std::string> &elements, const char * const separator)
  55. {
  56. switch(elements.size()) {
  57. case 0:
  58. return "";
  59. case 1:
  60. return elements[0];
  61. default:
  62. std::ostringstream os;
  63. std::copy(elements.begin(), elements.end()-1, std::ostream_iterator<std::string>(os, separator));
  64. os << *elements.rbegin();
  65. return os.str();
  66. }
  67. }
  68. }
  69. using namespace ZeroTier;
  70. PostgreSQL::PostgreSQL(const Identity &myId, const char *path, int listenPort, MQConfig *mqc)
  71. : DB(myId, path)
  72. , _ready(0)
  73. , _connected(1)
  74. , _run(1)
  75. , _waitNoticePrinted(false)
  76. , _listenPort(listenPort)
  77. , _mqc(mqc)
  78. {
  79. _connString = std::string(path) + " application_name=controller_" +_myAddressStr;
  80. // Database Schema Version Check
  81. PGconn *conn = getPgConn();
  82. if (PQstatus(conn) != CONNECTION_OK) {
  83. fprintf(stderr, "Bad Database Connection: %s", PQerrorMessage(conn));
  84. exit(1);
  85. }
  86. PGresult *res = PQexec(conn, "SELECT version FROM ztc_database");
  87. if (PQresultStatus(res) != PGRES_TUPLES_OK) {
  88. fprintf(stderr, "Error determining database version");
  89. exit(1);
  90. }
  91. if (PQntuples(res) != 1) {
  92. fprintf(stderr, "Invalid number of db version tuples returned.");
  93. exit(1);
  94. }
  95. int dbVersion = std::stoi(PQgetvalue(res, 0, 0));
  96. if (dbVersion < DB_MINIMUM_VERSION) {
  97. fprintf(stderr, "Central database schema version too low. This controller version requires a minimum schema version of %d. Please upgrade your Central instance", DB_MINIMUM_VERSION);
  98. exit(1);
  99. }
  100. PQclear(res);
  101. res = NULL;
  102. PQfinish(conn);
  103. conn = NULL;
  104. _readyLock.lock();
  105. _heartbeatThread = std::thread(&PostgreSQL::heartbeat, this);
  106. _membersDbWatcher = std::thread(&PostgreSQL::membersDbWatcher, this);
  107. _networksDbWatcher = std::thread(&PostgreSQL::networksDbWatcher, this);
  108. for (int i = 0; i < ZT_CENTRAL_CONTROLLER_COMMIT_THREADS; ++i) {
  109. _commitThread[i] = std::thread(&PostgreSQL::commitThread, this);
  110. }
  111. _onlineNotificationThread = std::thread(&PostgreSQL::onlineNotificationThread, this);
  112. }
  113. PostgreSQL::~PostgreSQL()
  114. {
  115. _run = 0;
  116. std::this_thread::sleep_for(std::chrono::milliseconds(100));
  117. _heartbeatThread.join();
  118. _membersDbWatcher.join();
  119. _networksDbWatcher.join();
  120. for (int i = 0; i < ZT_CENTRAL_CONTROLLER_COMMIT_THREADS; ++i) {
  121. _commitThread[i].join();
  122. }
  123. _onlineNotificationThread.join();
  124. }
  125. bool PostgreSQL::waitForReady()
  126. {
  127. while (_ready < 2) {
  128. if (!_waitNoticePrinted) {
  129. _waitNoticePrinted = true;
  130. fprintf(stderr, "[%s] NOTICE: %.10llx controller PostgreSQL waiting for initial data download..." ZT_EOL_S, ::_timestr(), (unsigned long long)_myAddress.toInt());
  131. }
  132. _readyLock.lock();
  133. _readyLock.unlock();
  134. }
  135. return true;
  136. }
  137. bool PostgreSQL::isReady()
  138. {
  139. return ((_ready == 2)&&(_connected));
  140. }
  141. void PostgreSQL::save(nlohmann::json *orig, nlohmann::json &record)
  142. {
  143. try {
  144. if (!record.is_object()) {
  145. return;
  146. }
  147. waitForReady();
  148. if (orig) {
  149. if (*orig != record) {
  150. record["revision"] = OSUtils::jsonInt(record["revision"],0ULL) + 1;
  151. _commitQueue.post(new nlohmann::json(record));
  152. }
  153. } else {
  154. record["revision"] = 1;
  155. _commitQueue.post(new nlohmann::json(record));
  156. }
  157. } catch (std::exception &e) {
  158. fprintf(stderr, "Error on PostgreSQL::save: %s\n", e.what());
  159. } catch (...) {
  160. fprintf(stderr, "Unknown error on PostgreSQL::save\n");
  161. }
  162. }
  163. void PostgreSQL::eraseNetwork(const uint64_t networkId)
  164. {
  165. char tmp2[24];
  166. waitForReady();
  167. Utils::hex(networkId, tmp2);
  168. json *tmp = new json();
  169. (*tmp)["id"] = tmp2;
  170. (*tmp)["objtype"] = "_delete_network";
  171. _commitQueue.post(tmp);
  172. }
  173. void PostgreSQL::eraseMember(const uint64_t networkId, const uint64_t memberId)
  174. {
  175. char tmp2[24];
  176. json *tmp = new json();
  177. Utils::hex(networkId, tmp2);
  178. (*tmp)["nwid"] = tmp2;
  179. Utils::hex(memberId, tmp2);
  180. (*tmp)["id"] = tmp2;
  181. (*tmp)["objtype"] = "_delete_member";
  182. _commitQueue.post(tmp);
  183. }
  184. void PostgreSQL::nodeIsOnline(const uint64_t networkId, const uint64_t memberId, const InetAddress &physicalAddress)
  185. {
  186. std::lock_guard<std::mutex> l(_lastOnline_l);
  187. std::pair<int64_t, InetAddress> &i = _lastOnline[std::pair<uint64_t,uint64_t>(networkId, memberId)];
  188. i.first = OSUtils::now();
  189. if (physicalAddress) {
  190. i.second = physicalAddress;
  191. }
  192. }
  193. void PostgreSQL::initializeNetworks(PGconn *conn)
  194. {
  195. try {
  196. if (PQstatus(conn) != CONNECTION_OK) {
  197. fprintf(stderr, "Bad Database Connection: %s", PQerrorMessage(conn));
  198. exit(1);
  199. }
  200. const char *params[1] = {
  201. _myAddressStr.c_str()
  202. };
  203. PGresult *res = PQexecParams(conn, "SELECT id, EXTRACT(EPOCH FROM creation_time AT TIME ZONE 'UTC')*1000, capabilities, "
  204. "enable_broadcast, EXTRACT(EPOCH FROM last_modified AT TIME ZONE 'UTC')*1000, mtu, multicast_limit, name, private, remote_trace_level, "
  205. "remote_trace_target, revision, rules, tags, v4_assign_mode, v6_assign_mode FROM ztc_network "
  206. "WHERE deleted = false AND controller_id = $1",
  207. 1,
  208. NULL,
  209. params,
  210. NULL,
  211. NULL,
  212. 0);
  213. if (PQresultStatus(res) != PGRES_TUPLES_OK) {
  214. fprintf(stderr, "Networks Initialization Failed: %s", PQerrorMessage(conn));
  215. PQclear(res);
  216. exit(1);
  217. }
  218. int numRows = PQntuples(res);
  219. for (int i = 0; i < numRows; ++i) {
  220. json empty;
  221. json config;
  222. const char *nwidparam[1] = {
  223. PQgetvalue(res, i, 0)
  224. };
  225. config["id"] = PQgetvalue(res, i, 0);
  226. config["nwid"] = PQgetvalue(res, i, 0);
  227. try {
  228. config["creationTime"] = std::stoull(PQgetvalue(res, i, 1));
  229. } catch (std::exception &e) {
  230. config["creationTime"] = 0ULL;
  231. //fprintf(stderr, "Error converting creation time: %s\n", PQgetvalue(res, i, 1));
  232. }
  233. config["capabilities"] = json::parse(PQgetvalue(res, i, 2));
  234. config["enableBroadcast"] = (strcmp(PQgetvalue(res, i, 3),"t")==0);
  235. try {
  236. config["lastModified"] = std::stoull(PQgetvalue(res, i, 4));
  237. } catch (std::exception &e) {
  238. config["lastModified"] = 0ULL;
  239. //fprintf(stderr, "Error converting last modified: %s\n", PQgetvalue(res, i, 4));
  240. }
  241. try {
  242. config["mtu"] = std::stoi(PQgetvalue(res, i, 5));
  243. } catch (std::exception &e) {
  244. config["mtu"] = 2800;
  245. }
  246. try {
  247. config["multicastLimit"] = std::stoi(PQgetvalue(res, i, 6));
  248. } catch (std::exception &e) {
  249. config["multicastLimit"] = 64;
  250. }
  251. config["name"] = PQgetvalue(res, i, 7);
  252. config["private"] = (strcmp(PQgetvalue(res, i, 8),"t")==0);
  253. try {
  254. config["remoteTraceLevel"] = std::stoi(PQgetvalue(res, i, 9));
  255. } catch (std::exception &e) {
  256. config["remoteTraceLevel"] = 0;
  257. }
  258. config["remoteTraceTarget"] = PQgetvalue(res, i, 10);
  259. try {
  260. config["revision"] = std::stoull(PQgetvalue(res, i, 11));
  261. } catch (std::exception &e) {
  262. config["revision"] = 0ULL;
  263. //fprintf(stderr, "Error converting revision: %s\n", PQgetvalue(res, i, 11));
  264. }
  265. config["rules"] = json::parse(PQgetvalue(res, i, 12));
  266. config["tags"] = json::parse(PQgetvalue(res, i, 13));
  267. config["v4AssignMode"] = json::parse(PQgetvalue(res, i, 14));
  268. config["v6AssignMode"] = json::parse(PQgetvalue(res, i, 15));
  269. config["objtype"] = "network";
  270. config["ipAssignmentPools"] = json::array();
  271. config["routes"] = json::array();
  272. PGresult *r2 = PQexecParams(conn,
  273. "SELECT host(ip_range_start), host(ip_range_end) FROM ztc_network_assignment_pool WHERE network_id = $1",
  274. 1,
  275. NULL,
  276. nwidparam,
  277. NULL,
  278. NULL,
  279. 0);
  280. if (PQresultStatus(r2) != PGRES_TUPLES_OK) {
  281. fprintf(stderr, "ERROR: Error retreiving IP pools for network: %s\n", PQresultErrorMessage(r2));
  282. PQclear(r2);
  283. PQclear(res);
  284. exit(1);
  285. }
  286. int n = PQntuples(r2);
  287. for (int j = 0; j < n; ++j) {
  288. json ip;
  289. ip["ipRangeStart"] = PQgetvalue(r2, j, 0);
  290. ip["ipRangeEnd"] = PQgetvalue(r2, j, 1);
  291. config["ipAssignmentPools"].push_back(ip);
  292. }
  293. PQclear(r2);
  294. r2 = PQexecParams(conn,
  295. "SELECT host(address), bits, host(via) FROM ztc_network_route WHERE network_id = $1",
  296. 1,
  297. NULL,
  298. nwidparam,
  299. NULL,
  300. NULL,
  301. 0);
  302. if (PQresultStatus(r2) != PGRES_TUPLES_OK) {
  303. fprintf(stderr, "ERROR: Error retreiving routes for network: %s\n", PQresultErrorMessage(r2));
  304. PQclear(r2);
  305. PQclear(res);
  306. exit(1);
  307. }
  308. n = PQntuples(r2);
  309. for (int j = 0; j < n; ++j) {
  310. std::string addr = PQgetvalue(r2, j, 0);
  311. std::string bits = PQgetvalue(r2, j, 1);
  312. std::string via = PQgetvalue(r2, j, 2);
  313. json route;
  314. route["target"] = addr + "/" + bits;
  315. if (via == "NULL") {
  316. route["via"] = nullptr;
  317. } else {
  318. route["via"] = via;
  319. }
  320. config["routes"].push_back(route);
  321. }
  322. PQclear(r2);
  323. _networkChanged(empty, config, false);
  324. }
  325. PQclear(res);
  326. if (++this->_ready == 2) {
  327. if (_waitNoticePrinted) {
  328. fprintf(stderr,"[%s] NOTICE: %.10llx controller PostgreSQL data download complete." ZT_EOL_S,_timestr(),(unsigned long long)_myAddress.toInt());
  329. }
  330. _readyLock.unlock();
  331. }
  332. } catch (std::exception &e) {
  333. fprintf(stderr, "ERROR: Error initializing networks: %s", e.what());
  334. exit(-1);
  335. }
  336. }
  337. void PostgreSQL::initializeMembers(PGconn *conn)
  338. {
  339. try {
  340. if (PQstatus(conn) != CONNECTION_OK) {
  341. fprintf(stderr, "Bad Database Connection: %s", PQerrorMessage(conn));
  342. exit(1);
  343. }
  344. const char *params[1] = {
  345. _myAddressStr.c_str()
  346. };
  347. PGresult *res = PQexecParams(conn,
  348. "SELECT m.id, m.network_id, m.active_bridge, m.authorized, m.capabilities, EXTRACT(EPOCH FROM m.creation_time AT TIME ZONE 'UTC')*1000, m.identity, "
  349. " EXTRACT(EPOCH FROM m.last_authorized_time AT TIME ZONE 'UTC')*1000, "
  350. " EXTRACT(EPOCH FROM m.last_deauthorized_time AT TIME ZONE 'UTC')*1000, "
  351. " m.remote_trace_level, m.remote_trace_target, m.tags, m.v_major, m.v_minor, m.v_rev, m.v_proto, "
  352. " m.no_auto_assign_ips, m.revision "
  353. "FROM ztc_member m "
  354. "INNER JOIN ztc_network n "
  355. " ON n.id = m.network_id "
  356. "WHERE n.controller_id = $1 AND m.deleted = false",
  357. 1,
  358. NULL,
  359. params,
  360. NULL,
  361. NULL,
  362. 0);
  363. if (PQresultStatus(res) != PGRES_TUPLES_OK) {
  364. fprintf(stderr, "Member Initialization Failed: %s", PQerrorMessage(conn));
  365. PQclear(res);
  366. exit(1);
  367. }
  368. int numRows = PQntuples(res);
  369. for (int i = 0; i < numRows; ++i) {
  370. json empty;
  371. json config;
  372. std::string memberId(PQgetvalue(res, i, 0));
  373. std::string networkId(PQgetvalue(res, i, 1));
  374. std::string ctime = PQgetvalue(res, i, 5);
  375. config["id"] = memberId;
  376. config["nwid"] = networkId;
  377. config["activeBridge"] = (strcmp(PQgetvalue(res, i, 2), "t") == 0);
  378. config["authorized"] = (strcmp(PQgetvalue(res, i, 3), "t") == 0);
  379. try {
  380. config["capabilities"] = json::parse(PQgetvalue(res, i, 4));
  381. } catch (std::exception &e) {
  382. config["capabilities"] = json::array();
  383. }
  384. try {
  385. config["creationTime"] = std::stoull(PQgetvalue(res, i, 5));
  386. } catch (std::exception &e) {
  387. config["creationTime"] = 0ULL;
  388. //fprintf(stderr, "Error upding creation time (member): %s\n", PQgetvalue(res, i, 5));
  389. }
  390. config["identity"] = PQgetvalue(res, i, 6);
  391. try {
  392. config["lastAuthorizedTime"] = std::stoull(PQgetvalue(res, i, 7));
  393. } catch(std::exception &e) {
  394. config["lastAuthorizedTime"] = 0ULL;
  395. //fprintf(stderr, "Error updating last auth time (member): %s\n", PQgetvalue(res, i, 7));
  396. }
  397. try {
  398. config["lastDeauthorizedTime"] = std::stoull(PQgetvalue(res, i, 8));
  399. } catch( std::exception &e) {
  400. config["lastDeauthorizedTime"] = 0ULL;
  401. //fprintf(stderr, "Error updating last deauth time (member): %s\n", PQgetvalue(res, i, 8));
  402. }
  403. try {
  404. config["remoteTraceLevel"] = std::stoi(PQgetvalue(res, i, 9));
  405. } catch (std::exception &e) {
  406. config["remoteTraceLevel"] = 0;
  407. }
  408. config["remoteTraceTarget"] = PQgetvalue(res, i, 10);
  409. try {
  410. config["tags"] = json::parse(PQgetvalue(res, i, 11));
  411. } catch (std::exception &e) {
  412. config["tags"] = json::array();
  413. }
  414. try {
  415. config["vMajor"] = std::stoi(PQgetvalue(res, i, 12));
  416. } catch(std::exception &e) {
  417. config["vMajor"] = -1;
  418. }
  419. try {
  420. config["vMinor"] = std::stoi(PQgetvalue(res, i, 13));
  421. } catch (std::exception &e) {
  422. config["vMinor"] = -1;
  423. }
  424. try {
  425. config["vRev"] = std::stoi(PQgetvalue(res, i, 14));
  426. } catch (std::exception &e) {
  427. config["vRev"] = -1;
  428. }
  429. try {
  430. config["vProto"] = std::stoi(PQgetvalue(res, i, 15));
  431. } catch (std::exception &e) {
  432. config["vProto"] = -1;
  433. }
  434. config["noAutoAssignIps"] = (strcmp(PQgetvalue(res, i, 16), "t") == 0);
  435. try {
  436. config["revision"] = std::stoull(PQgetvalue(res, i, 17));
  437. } catch (std::exception &e) {
  438. config["revision"] = 0ULL;
  439. //fprintf(stderr, "Error updating revision (member): %s\n", PQgetvalue(res, i, 17));
  440. }
  441. config["objtype"] = "member";
  442. config["ipAssignments"] = json::array();
  443. const char *p2[2] = {
  444. memberId.c_str(),
  445. networkId.c_str()
  446. };
  447. PGresult *r2 = PQexecParams(conn,
  448. "SELECT DISTINCT address FROM ztc_member_ip_assignment WHERE member_id = $1 AND network_id = $2",
  449. 2,
  450. NULL,
  451. p2,
  452. NULL,
  453. NULL,
  454. 0);
  455. if (PQresultStatus(r2) != PGRES_TUPLES_OK) {
  456. fprintf(stderr, "Member Initialization Failed: %s", PQerrorMessage(conn));
  457. PQclear(r2);
  458. PQclear(res);
  459. exit(1);
  460. }
  461. int n = PQntuples(r2);
  462. for (int j = 0; j < n; ++j) {
  463. config["ipAssignments"].push_back(PQgetvalue(r2, j, 0));
  464. }
  465. _memberChanged(empty, config, false);
  466. }
  467. PQclear(res);
  468. if (++this->_ready == 2) {
  469. if (_waitNoticePrinted) {
  470. fprintf(stderr,"[%s] NOTICE: %.10llx controller PostgreSQL data download complete." ZT_EOL_S,_timestr(),(unsigned long long)_myAddress.toInt());
  471. }
  472. _readyLock.unlock();
  473. }
  474. } catch (std::exception &e) {
  475. fprintf(stderr, "ERROR: Error initializing members: %s\n", e.what());
  476. exit(-1);
  477. }
  478. }
  479. void PostgreSQL::heartbeat()
  480. {
  481. char publicId[1024];
  482. char hostnameTmp[1024];
  483. _myId.toString(false,publicId);
  484. if (gethostname(hostnameTmp, sizeof(hostnameTmp))!= 0) {
  485. hostnameTmp[0] = (char)0;
  486. } else {
  487. for (int i = 0; i < sizeof(hostnameTmp); ++i) {
  488. if ((hostnameTmp[i] == '.')||(hostnameTmp[i] == 0)) {
  489. hostnameTmp[i] = (char)0;
  490. break;
  491. }
  492. }
  493. }
  494. const char *controllerId = _myAddressStr.c_str();
  495. const char *publicIdentity = publicId;
  496. const char *hostname = hostnameTmp;
  497. PGconn *conn = getPgConn();
  498. if (PQstatus(conn) == CONNECTION_BAD) {
  499. fprintf(stderr, "Connection to database failed: %s\n", PQerrorMessage(conn));
  500. PQfinish(conn);
  501. exit(1);
  502. }
  503. while (_run == 1) {
  504. if(PQstatus(conn) != CONNECTION_OK) {
  505. fprintf(stderr, "%s heartbeat thread lost connection to Database\n", _myAddressStr.c_str());
  506. PQfinish(conn);
  507. exit(6);
  508. }
  509. if (conn) {
  510. std::string major = std::to_string(ZEROTIER_ONE_VERSION_MAJOR);
  511. std::string minor = std::to_string(ZEROTIER_ONE_VERSION_MINOR);
  512. std::string rev = std::to_string(ZEROTIER_ONE_VERSION_REVISION);
  513. std::string build = std::to_string(ZEROTIER_ONE_VERSION_BUILD);
  514. std::string now = std::to_string(OSUtils::now());
  515. std::string host_port = std::to_string(_listenPort);
  516. std::string use_rabbitmq = (_mqc != NULL) ? "true" : "false";
  517. const char *values[10] = {
  518. controllerId,
  519. hostname,
  520. now.c_str(),
  521. publicIdentity,
  522. major.c_str(),
  523. minor.c_str(),
  524. rev.c_str(),
  525. build.c_str(),
  526. host_port.c_str(),
  527. use_rabbitmq.c_str()
  528. };
  529. PGresult *res = PQexecParams(conn,
  530. "INSERT INTO ztc_controller (id, cluster_host, last_alive, public_identity, v_major, v_minor, v_rev, v_build, host_port, use_rabbitmq) "
  531. "VALUES ($1, $2, TO_TIMESTAMP($3::double precision/1000), $4, $5, $6, $7, $8, $9, $10) "
  532. "ON CONFLICT (id) DO UPDATE SET cluster_host = EXCLUDED.cluster_host, last_alive = EXCLUDED.last_alive, "
  533. "public_identity = EXCLUDED.public_identity, v_major = EXCLUDED.v_major, v_minor = EXCLUDED.v_minor, "
  534. "v_rev = EXCLUDED.v_rev, v_build = EXCLUDED.v_rev, host_port = EXCLUDED.host_port, "
  535. "use_rabbitmq = EXCLUDED.use_rabbitmq",
  536. 10, // number of parameters
  537. NULL, // oid field. ignore
  538. values, // values for substitution
  539. NULL, // lengths in bytes of each value
  540. NULL, // binary?
  541. 0);
  542. if (PQresultStatus(res) != PGRES_COMMAND_OK) {
  543. fprintf(stderr, "Heartbeat Update Failed: %s\n", PQresultErrorMessage(res));
  544. }
  545. PQclear(res);
  546. }
  547. std::this_thread::sleep_for(std::chrono::milliseconds(1000));
  548. }
  549. PQfinish(conn);
  550. conn = NULL;
  551. }
  552. void PostgreSQL::membersDbWatcher()
  553. {
  554. PGconn *conn = getPgConn(NO_OVERRIDE);
  555. if (PQstatus(conn) == CONNECTION_BAD) {
  556. fprintf(stderr, "Connection to database failed: %s\n", PQerrorMessage(conn));
  557. PQfinish(conn);
  558. exit(1);
  559. }
  560. initializeMembers(conn);
  561. if (this->_mqc != NULL) {
  562. PQfinish(conn);
  563. conn = NULL;
  564. _membersWatcher_RabbitMQ();
  565. } else {
  566. _membersWatcher_Postgres(conn);
  567. PQfinish(conn);
  568. conn = NULL;
  569. }
  570. if (_run == 1) {
  571. fprintf(stderr, "ERROR: %s membersDbWatcher should still be running! Exiting Controller.\n", _myAddressStr.c_str());
  572. exit(9);
  573. }
  574. fprintf(stderr, "Exited membersDbWatcher\n");
  575. }
  576. void PostgreSQL::_membersWatcher_Postgres(PGconn *conn) {
  577. char buf[11] = {0};
  578. std::string cmd = "LISTEN member_" + std::string(_myAddress.toString(buf));
  579. PGresult *res = PQexec(conn, cmd.c_str());
  580. if (!res || PQresultStatus(res) != PGRES_COMMAND_OK) {
  581. fprintf(stderr, "LISTEN command failed: %s\n", PQresultErrorMessage(res));
  582. PQclear(res);
  583. PQfinish(conn);
  584. exit(1);
  585. }
  586. PQclear(res); res = NULL;
  587. while(_run == 1) {
  588. if (PQstatus(conn) != CONNECTION_OK) {
  589. fprintf(stderr, "ERROR: Member Watcher lost connection to Postgres.");
  590. exit(-1);
  591. }
  592. PGnotify *notify = NULL;
  593. PQconsumeInput(conn);
  594. while ((notify = PQnotifies(conn)) != NULL) {
  595. //fprintf(stderr, "ASYNC NOTIFY of '%s' id:%s received\n", notify->relname, notify->extra);
  596. try {
  597. json tmp(json::parse(notify->extra));
  598. json &ov = tmp["old_val"];
  599. json &nv = tmp["new_val"];
  600. json oldConfig, newConfig;
  601. if (ov.is_object()) oldConfig = ov;
  602. if (nv.is_object()) newConfig = nv;
  603. if (oldConfig.is_object() || newConfig.is_object()) {
  604. _memberChanged(oldConfig,newConfig,(this->_ready>=2));
  605. }
  606. } catch (...) {} // ignore bad records
  607. free(notify);
  608. }
  609. std::this_thread::sleep_for(std::chrono::milliseconds(10));
  610. }
  611. }
  612. void PostgreSQL::_membersWatcher_RabbitMQ() {
  613. char buf[11] = {0};
  614. std::string qname = "member_"+ std::string(_myAddress.toString(buf));
  615. RabbitMQ rmq(_mqc, qname.c_str());
  616. try {
  617. rmq.init();
  618. } catch (std::runtime_error &e) {
  619. fprintf(stderr, "RABBITMQ ERROR: %s\n", e.what());
  620. exit(11);
  621. }
  622. while (_run == 1) {
  623. try {
  624. std::string msg = rmq.consume();
  625. // fprintf(stderr, "Got Member Update: %s\n", msg.c_str());
  626. if (msg.empty()) {
  627. continue;
  628. }
  629. json tmp(json::parse(msg));
  630. json &ov = tmp["old_val"];
  631. json &nv = tmp["new_val"];
  632. json oldConfig, newConfig;
  633. if (ov.is_object()) oldConfig = ov;
  634. if (nv.is_object()) newConfig = nv;
  635. if (oldConfig.is_object() || newConfig.is_object()) {
  636. _memberChanged(oldConfig,newConfig,(this->_ready>=2));
  637. }
  638. } catch (std::runtime_error &e) {
  639. fprintf(stderr, "RABBITMQ ERROR member change: %s\n", e.what());
  640. break;
  641. } catch(std::exception &e ) {
  642. fprintf(stderr, "RABBITMQ ERROR member change: %s\n", e.what());
  643. } catch(...) {
  644. fprintf(stderr, "RABBITMQ ERROR member change: unknown error\n");
  645. }
  646. }
  647. }
  648. void PostgreSQL::networksDbWatcher()
  649. {
  650. PGconn *conn = getPgConn(NO_OVERRIDE);
  651. if (PQstatus(conn) == CONNECTION_BAD) {
  652. fprintf(stderr, "Connection to database failed: %s\n", PQerrorMessage(conn));
  653. PQfinish(conn);
  654. exit(1);
  655. }
  656. initializeNetworks(conn);
  657. if (this->_mqc != NULL) {
  658. PQfinish(conn);
  659. conn = NULL;
  660. _networksWatcher_RabbitMQ();
  661. } else {
  662. _networksWatcher_Postgres(conn);
  663. PQfinish(conn);
  664. conn = NULL;
  665. }
  666. if (_run == 1) {
  667. fprintf(stderr, "ERROR: %s networksDbWatcher should still be running! Exiting Controller.\n", _myAddressStr.c_str());
  668. exit(8);
  669. }
  670. fprintf(stderr, "Exited membersDbWatcher\n");
  671. }
  672. void PostgreSQL::_networksWatcher_Postgres(PGconn *conn) {
  673. char buf[11] = {0};
  674. std::string cmd = "LISTEN network_" + std::string(_myAddress.toString(buf));
  675. PGresult *res = PQexec(conn, cmd.c_str());
  676. if (!res || PQresultStatus(res) != PGRES_COMMAND_OK) {
  677. fprintf(stderr, "LISTEN command failed: %s\n", PQresultErrorMessage(res));
  678. PQclear(res);
  679. PQfinish(conn);
  680. exit(1);
  681. }
  682. PQclear(res); res = NULL;
  683. while(_run == 1) {
  684. if (PQstatus(conn) != CONNECTION_OK) {
  685. fprintf(stderr, "ERROR: Network Watcher lost connection to Postgres.");
  686. exit(-1);
  687. }
  688. PGnotify *notify = NULL;
  689. PQconsumeInput(conn);
  690. while ((notify = PQnotifies(conn)) != NULL) {
  691. //fprintf(stderr, "ASYNC NOTIFY of '%s' id:%s received\n", notify->relname, notify->extra);
  692. try {
  693. json tmp(json::parse(notify->extra));
  694. json &ov = tmp["old_val"];
  695. json &nv = tmp["new_val"];
  696. json oldConfig, newConfig;
  697. if (ov.is_object()) oldConfig = ov;
  698. if (nv.is_object()) newConfig = nv;
  699. if (oldConfig.is_object()||newConfig.is_object()) {
  700. _networkChanged(oldConfig,newConfig,(this->_ready >= 2));
  701. }
  702. } catch (...) {} // ignore bad records
  703. free(notify);
  704. }
  705. std::this_thread::sleep_for(std::chrono::milliseconds(10));
  706. }
  707. }
  708. void PostgreSQL::_networksWatcher_RabbitMQ() {
  709. char buf[11] = {0};
  710. std::string qname = "network_"+ std::string(_myAddress.toString(buf));
  711. RabbitMQ rmq(_mqc, qname.c_str());
  712. try {
  713. rmq.init();
  714. } catch (std::runtime_error &e) {
  715. fprintf(stderr, "RABBITMQ ERROR: %s\n", e.what());
  716. exit(11);
  717. }
  718. while (_run == 1) {
  719. try {
  720. std::string msg = rmq.consume();
  721. if (msg.empty()) {
  722. continue;
  723. }
  724. // fprintf(stderr, "Got network update: %s\n", msg.c_str());
  725. json tmp(json::parse(msg));
  726. json &ov = tmp["old_val"];
  727. json &nv = tmp["new_val"];
  728. json oldConfig, newConfig;
  729. if (ov.is_object()) oldConfig = ov;
  730. if (nv.is_object()) newConfig = nv;
  731. if (oldConfig.is_object()||newConfig.is_object()) {
  732. _networkChanged(oldConfig,newConfig,(this->_ready >= 2));
  733. }
  734. } catch (std::runtime_error &e) {
  735. fprintf(stderr, "RABBITMQ ERROR: %s\n", e.what());
  736. break;
  737. } catch (std::exception &e) {
  738. fprintf(stderr, "RABBITMQ ERROR network watcher: %s\n", e.what());
  739. } catch(...) {
  740. fprintf(stderr, "RABBITMQ ERROR network watcher: unknown error\n");
  741. }
  742. }
  743. }
  744. void PostgreSQL::commitThread()
  745. {
  746. PGconn *conn = getPgConn();
  747. if (PQstatus(conn) == CONNECTION_BAD) {
  748. fprintf(stderr, "ERROR: Connection to database failed: %s\n", PQerrorMessage(conn));
  749. PQfinish(conn);
  750. exit(1);
  751. }
  752. json *config = nullptr;
  753. while(_commitQueue.get(config)&(_run == 1)) {
  754. if (!config) {
  755. continue;
  756. }
  757. if (PQstatus(conn) == CONNECTION_BAD) {
  758. fprintf(stderr, "ERROR: Connection to database failed: %s\n", PQerrorMessage(conn));
  759. PQfinish(conn);
  760. delete config;
  761. exit(1);
  762. }
  763. try {
  764. const std::string objtype = (*config)["objtype"];
  765. if (objtype == "member") {
  766. try {
  767. std::string memberId = (*config)["id"];
  768. std::string networkId = (*config)["nwid"];
  769. std::string identity = (*config)["identity"];
  770. std::string target = "NULL";
  771. if (!(*config)["remoteTraceTarget"].is_null()) {
  772. target = (*config)["remoteTraceTarget"];
  773. }
  774. std::string caps = OSUtils::jsonDump((*config)["capabilities"], -1);
  775. std::string lastAuthTime = std::to_string((long long)(*config)["lastAuthorizedTime"]);
  776. std::string lastDeauthTime = std::to_string((long long)(*config)["lastDeauthorizedTime"]);
  777. std::string rtraceLevel = std::to_string((int)(*config)["remoteTraceLevel"]);
  778. std::string rev = std::to_string((unsigned long long)(*config)["revision"]);
  779. std::string tags = OSUtils::jsonDump((*config)["tags"], -1);
  780. std::string vmajor = std::to_string((int)(*config)["vMajor"]);
  781. std::string vminor = std::to_string((int)(*config)["vMinor"]);
  782. std::string vrev = std::to_string((int)(*config)["vRev"]);
  783. std::string vproto = std::to_string((int)(*config)["vProto"]);
  784. const char *values[19] = {
  785. memberId.c_str(),
  786. networkId.c_str(),
  787. ((*config)["activeBridge"] ? "true" : "false"),
  788. ((*config)["authorized"] ? "true" : "false"),
  789. caps.c_str(),
  790. identity.c_str(),
  791. lastAuthTime.c_str(),
  792. lastDeauthTime.c_str(),
  793. ((*config)["noAutoAssignIps"] ? "true" : "false"),
  794. rtraceLevel.c_str(),
  795. (target == "NULL") ? NULL : target.c_str(),
  796. rev.c_str(),
  797. tags.c_str(),
  798. vmajor.c_str(),
  799. vminor.c_str(),
  800. vrev.c_str(),
  801. vproto.c_str()
  802. };
  803. PGresult *res = PQexecParams(conn,
  804. "INSERT INTO ztc_member (id, network_id, active_bridge, authorized, capabilities, "
  805. "identity, last_authorized_time, last_deauthorized_time, no_auto_assign_ips, "
  806. "remote_trace_level, remote_trace_target, revision, tags, v_major, v_minor, v_rev, v_proto) "
  807. "VALUES ($1, $2, $3, $4, $5, $6, "
  808. "TO_TIMESTAMP($7::double precision/1000), TO_TIMESTAMP($8::double precision/1000), "
  809. "$9, $10, $11, $12, $13, $14, $15, $16, $17) ON CONFLICT (network_id, id) DO UPDATE SET "
  810. "active_bridge = EXCLUDED.active_bridge, authorized = EXCLUDED.authorized, capabilities = EXCLUDED.capabilities, "
  811. "identity = EXCLUDED.identity, last_authorized_time = EXCLUDED.last_authorized_time, "
  812. "last_deauthorized_time = EXCLUDED.last_deauthorized_time, no_auto_assign_ips = EXCLUDED.no_auto_assign_ips, "
  813. "remote_trace_level = EXCLUDED.remote_trace_level, remote_trace_target = EXCLUDED.remote_trace_target, "
  814. "revision = EXCLUDED.revision+1, tags = EXCLUDED.tags, v_major = EXCLUDED.v_major, "
  815. "v_minor = EXCLUDED.v_minor, v_rev = EXCLUDED.v_rev, v_proto = EXCLUDED.v_proto",
  816. 17,
  817. NULL,
  818. values,
  819. NULL,
  820. NULL,
  821. 0);
  822. if (PQresultStatus(res) != PGRES_COMMAND_OK) {
  823. fprintf(stderr, "ERROR: Error updating member: %s\n", PQresultErrorMessage(res));
  824. fprintf(stderr, "%s", OSUtils::jsonDump(*config, 2).c_str());
  825. PQclear(res);
  826. delete config;
  827. config = nullptr;
  828. continue;
  829. }
  830. PQclear(res);
  831. res = PQexec(conn, "BEGIN");
  832. if (PQresultStatus(res) != PGRES_COMMAND_OK) {
  833. fprintf(stderr, "ERROR: Error beginning transaction: %s\n", PQresultErrorMessage(res));
  834. PQclear(res);
  835. delete config;
  836. config = nullptr;
  837. continue;
  838. }
  839. PQclear(res);
  840. const char *v2[2] = {
  841. memberId.c_str(),
  842. networkId.c_str()
  843. };
  844. res = PQexecParams(conn,
  845. "DELETE FROM ztc_member_ip_assignment WHERE member_id = $1 AND network_id = $2",
  846. 2,
  847. NULL,
  848. v2,
  849. NULL,
  850. NULL,
  851. 0);
  852. if (PQresultStatus(res) != PGRES_COMMAND_OK) {
  853. fprintf(stderr, "ERROR: Error updating IP address assignments: %s\n", PQresultErrorMessage(res));
  854. PQclear(res);
  855. PQclear(PQexec(conn, "ROLLBACK"));;
  856. delete config;
  857. config = nullptr;
  858. continue;
  859. }
  860. PQclear(res);
  861. std::vector<std::string> assignments;
  862. for (auto i = (*config)["ipAssignments"].begin(); i != (*config)["ipAssignments"].end(); ++i) {
  863. std::string addr = *i;
  864. if (std::find(assignments.begin(), assignments.end(), addr) != assignments.end()) {
  865. continue;
  866. }
  867. const char *v3[3] = {
  868. memberId.c_str(),
  869. networkId.c_str(),
  870. addr.c_str()
  871. };
  872. res = PQexecParams(conn,
  873. "INSERT INTO ztc_member_ip_assignment (member_id, network_id, address) VALUES ($1, $2, $3)",
  874. 3,
  875. NULL,
  876. v3,
  877. NULL,
  878. NULL,
  879. 0);
  880. if (PQresultStatus(res) != PGRES_COMMAND_OK) {
  881. fprintf(stderr, "ERROR: Error setting IP addresses for member: %s\n", PQresultErrorMessage(res));
  882. PQclear(res);
  883. PQclear(PQexec(conn, "ROLLBACK"));
  884. break;;
  885. }
  886. }
  887. res = PQexec(conn, "COMMIT");
  888. if (PQresultStatus(res) != PGRES_COMMAND_OK) {
  889. fprintf(stderr, "ERROR: Error committing ip address data: %s\n", PQresultErrorMessage(res));
  890. }
  891. PQclear(res);
  892. const uint64_t nwidInt = OSUtils::jsonIntHex((*config)["nwid"], 0ULL);
  893. const uint64_t memberidInt = OSUtils::jsonIntHex((*config)["id"], 0ULL);
  894. if (nwidInt && memberidInt) {
  895. nlohmann::json nwOrig;
  896. nlohmann::json memOrig;
  897. nlohmann::json memNew(*config);
  898. get(nwidInt, nwOrig, memberidInt, memOrig);
  899. _memberChanged(memOrig, memNew, (this->_ready>=2));
  900. } else {
  901. fprintf(stderr, "Can't notify of change. Error parsing nwid or memberid: %lu-%lu\n", nwidInt, memberidInt);
  902. }
  903. } catch (std::exception &e) {
  904. fprintf(stderr, "ERROR: Error updating member: %s\n", e.what());
  905. }
  906. } else if (objtype == "network") {
  907. try {
  908. std::string id = (*config)["id"];
  909. std::string controllerId = _myAddressStr.c_str();
  910. std::string name = (*config)["name"];
  911. std::string remoteTraceTarget("NULL");
  912. if (!(*config)["remoteTraceTarget"].is_null()) {
  913. remoteTraceTarget = (*config)["remoteTraceTarget"];
  914. }
  915. std::string rulesSource = (*config)["rulesSource"];
  916. std::string caps = OSUtils::jsonDump((*config)["capabilitles"], -1);
  917. std::string now = std::to_string(OSUtils::now());
  918. std::string mtu = std::to_string((int)(*config)["mtu"]);
  919. std::string mcastLimit = std::to_string((int)(*config)["multicastLimit"]);
  920. std::string rtraceLevel = std::to_string((int)(*config)["remoteTraceLevel"]);
  921. std::string rules = OSUtils::jsonDump((*config)["rules"], -1);
  922. std::string tags = OSUtils::jsonDump((*config)["tags"], -1);
  923. std::string v4mode = OSUtils::jsonDump((*config)["v4AssignMode"],-1);
  924. std::string v6mode = OSUtils::jsonDump((*config)["v6AssignMode"], -1);
  925. bool enableBroadcast = (*config)["enableBroadcast"];
  926. bool isPrivate = (*config)["private"];
  927. const char *values[16] = {
  928. id.c_str(),
  929. controllerId.c_str(),
  930. caps.c_str(),
  931. enableBroadcast ? "true" : "false",
  932. now.c_str(),
  933. mtu.c_str(),
  934. mcastLimit.c_str(),
  935. name.c_str(),
  936. isPrivate ? "true" : "false",
  937. rtraceLevel.c_str(),
  938. (remoteTraceTarget == "NULL" ? NULL : remoteTraceTarget.c_str()),
  939. rules.c_str(),
  940. rulesSource.c_str(),
  941. tags.c_str(),
  942. v4mode.c_str(),
  943. v6mode.c_str(),
  944. };
  945. PGresult *res = PQexecParams(conn,
  946. "UPDATE ztc_network SET controller_id = $2, capabilities = $3, enable_broadcast = $4, "
  947. "last_updated = $5, mtu = $6, multicast_limit = $7, name = $8, private = $9, "
  948. "remote_trace_level = $10, remote_trace_target = $11, rules = $12, rules_source = $13, "
  949. "tags = $14, v4_assign_mode = $15, v6_assign_mode = $16 "
  950. "WHERE id = $1",
  951. 16,
  952. NULL,
  953. values,
  954. NULL,
  955. NULL,
  956. 0);
  957. if (PQresultStatus(res) != PGRES_COMMAND_OK) {
  958. fprintf(stderr, "ERROR: Error updating network record: %s\n", PQresultErrorMessage(res));
  959. PQclear(res);
  960. delete config;
  961. config = nullptr;
  962. continue;
  963. }
  964. PQclear(res);
  965. res = PQexec(conn, "BEGIN");
  966. if (PQresultStatus(res) != PGRES_COMMAND_OK) {
  967. fprintf(stderr, "ERROR: Error beginnning transaction: %s\n", PQresultErrorMessage(res));
  968. PQclear(res);
  969. delete config;
  970. config = nullptr;
  971. continue;
  972. }
  973. PQclear(res);
  974. const char *params[1] = {
  975. id.c_str()
  976. };
  977. res = PQexecParams(conn,
  978. "DELETE FROM ztc_network_assignment_pool WHERE network_id = $1",
  979. 1,
  980. NULL,
  981. params,
  982. NULL,
  983. NULL,
  984. 0);
  985. if (PQresultStatus(res) != PGRES_COMMAND_OK) {
  986. fprintf(stderr, "ERROR: Error updating assignment pool: %s\n", PQresultErrorMessage(res));
  987. PQclear(res);
  988. PQclear(PQexec(conn, "ROLLBACK"));
  989. delete config;
  990. config = nullptr;
  991. continue;
  992. }
  993. PQclear(res);
  994. auto pool = (*config)["ipAssignmentPools"];
  995. bool err = false;
  996. for (auto i = pool.begin(); i != pool.end(); ++i) {
  997. std::string start = (*i)["ipRangeStart"];
  998. std::string end = (*i)["ipRangeEnd"];
  999. const char *p[3] = {
  1000. id.c_str(),
  1001. start.c_str(),
  1002. end.c_str()
  1003. };
  1004. res = PQexecParams(conn,
  1005. "INSERT INTO ztc_network_assignment_pool (network_id, ip_range_start, ip_range_end) "
  1006. "VALUES ($1, $2, $3)",
  1007. 3,
  1008. NULL,
  1009. p,
  1010. NULL,
  1011. NULL,
  1012. 0);
  1013. if (PQresultStatus(res) != PGRES_COMMAND_OK) {
  1014. fprintf(stderr, "ERROR: Error updating assignment pool: %s\n", PQresultErrorMessage(res));
  1015. PQclear(res);
  1016. err = true;
  1017. break;
  1018. }
  1019. PQclear(res);
  1020. }
  1021. if (err) {
  1022. PQclear(PQexec(conn, "ROLLBACK"));
  1023. delete config;
  1024. config = nullptr;
  1025. continue;
  1026. }
  1027. res = PQexecParams(conn,
  1028. "DELETE FROM ztc_network_route WHERE network_id = $1",
  1029. 1,
  1030. NULL,
  1031. params,
  1032. NULL,
  1033. NULL,
  1034. 0);
  1035. if (PQresultStatus(res) != PGRES_COMMAND_OK) {
  1036. fprintf(stderr, "ERROR: Error updating routes: %s\n", PQresultErrorMessage(res));
  1037. PQclear(res);
  1038. PQclear(PQexec(conn, "ROLLBACK"));
  1039. delete config;
  1040. config = nullptr;
  1041. continue;
  1042. }
  1043. auto routes = (*config)["routes"];
  1044. err = false;
  1045. for (auto i = routes.begin(); i != routes.end(); ++i) {
  1046. std::string t = (*i)["target"];
  1047. std::vector<std::string> target;
  1048. std::istringstream f(t);
  1049. std::string s;
  1050. while(std::getline(f, s, '/')) {
  1051. target.push_back(s);
  1052. }
  1053. if (target.empty() || target.size() != 2) {
  1054. continue;
  1055. }
  1056. std::string targetAddr = target[0];
  1057. std::string targetBits = target[1];
  1058. std::string via = "NULL";
  1059. if (!(*i)["via"].is_null()) {
  1060. via = (*i)["via"];
  1061. }
  1062. const char *p[4] = {
  1063. id.c_str(),
  1064. targetAddr.c_str(),
  1065. targetBits.c_str(),
  1066. (via == "NULL" ? NULL : via.c_str()),
  1067. };
  1068. res = PQexecParams(conn,
  1069. "INSERT INTO ztc_network_route (network_id, address, bits, via) VALUES ($1, $2, $3, $4)",
  1070. 4,
  1071. NULL,
  1072. p,
  1073. NULL,
  1074. NULL,
  1075. 0);
  1076. if (PQresultStatus(res) != PGRES_COMMAND_OK) {
  1077. fprintf(stderr, "ERROR: Error updating routes: %s\n", PQresultErrorMessage(res));
  1078. PQclear(res);
  1079. err = true;
  1080. break;
  1081. }
  1082. PQclear(res);
  1083. }
  1084. if (err) {
  1085. PQclear(PQexec(conn, "ROLLBACK"));
  1086. delete config;
  1087. config = nullptr;
  1088. continue;
  1089. }
  1090. res = PQexec(conn, "COMMIT");
  1091. if (PQresultStatus(res) != PGRES_COMMAND_OK) {
  1092. fprintf(stderr, "ERROR: Error committing network update: %s\n", PQresultErrorMessage(res));
  1093. }
  1094. PQclear(res);
  1095. const uint64_t nwidInt = OSUtils::jsonIntHex((*config)["nwid"], 0ULL);
  1096. if (nwidInt) {
  1097. nlohmann::json nwOrig;
  1098. nlohmann::json nwNew(*config);
  1099. get(nwidInt, nwOrig);
  1100. _networkChanged(nwOrig, nwNew, true);
  1101. } else {
  1102. fprintf(stderr, "Can't notify network changed: %lu\n", nwidInt);
  1103. }
  1104. } catch (std::exception &e) {
  1105. fprintf(stderr, "ERROR: Error updating member: %s\n", e.what());
  1106. }
  1107. } else if (objtype == "trace") {
  1108. fprintf(stderr, "ERROR: Trace not yet implemented");
  1109. } else if (objtype == "_delete_network") {
  1110. try {
  1111. std::string networkId = (*config)["nwid"];
  1112. const char *values[1] = {
  1113. networkId.c_str()
  1114. };
  1115. PGresult * res = PQexecParams(conn,
  1116. "UPDATE ztc_network SET deleted = true WHERE id = $1",
  1117. 1,
  1118. NULL,
  1119. values,
  1120. NULL,
  1121. NULL,
  1122. 0);
  1123. if (PQresultStatus(res) != PGRES_COMMAND_OK) {
  1124. fprintf(stderr, "ERROR: Error deleting network: %s\n", PQresultErrorMessage(res));
  1125. }
  1126. PQclear(res);
  1127. } catch (std::exception &e) {
  1128. fprintf(stderr, "ERROR: Error deleting network: %s\n", e.what());
  1129. }
  1130. } else if (objtype == "_delete_member") {
  1131. try {
  1132. std::string memberId = (*config)["id"];
  1133. std::string networkId = (*config)["nwid"];
  1134. const char *values[2] = {
  1135. memberId.c_str(),
  1136. networkId.c_str()
  1137. };
  1138. PGresult *res = PQexecParams(conn,
  1139. "UPDATE ztc_member SET hidden = true, deleted = true WHERE id = $1 AND network_id = $2",
  1140. 2,
  1141. NULL,
  1142. values,
  1143. NULL,
  1144. NULL,
  1145. 0);
  1146. if (PQresultStatus(res) != PGRES_COMMAND_OK) {
  1147. fprintf(stderr, "ERROR: Error deleting member: %s\n", PQresultErrorMessage(res));
  1148. }
  1149. PQclear(res);
  1150. } catch (std::exception &e) {
  1151. fprintf(stderr, "ERROR: Error deleting member: %s\n", e.what());
  1152. }
  1153. } else {
  1154. fprintf(stderr, "ERROR: unknown objtype");
  1155. }
  1156. } catch (std::exception &e) {
  1157. fprintf(stderr, "ERROR: Error getting objtype: %s\n", e.what());
  1158. }
  1159. delete config;
  1160. config = nullptr;
  1161. std::this_thread::sleep_for(std::chrono::milliseconds(10));
  1162. }
  1163. PQfinish(conn);
  1164. if (_run == 1) {
  1165. fprintf(stderr, "ERROR: %s commitThread should still be running! Exiting Controller.\n", _myAddressStr.c_str());
  1166. exit(7);
  1167. }
  1168. }
  1169. void PostgreSQL::onlineNotificationThread()
  1170. {
  1171. PGconn *conn = getPgConn();
  1172. if (PQstatus(conn) == CONNECTION_BAD) {
  1173. fprintf(stderr, "Connection to database failed: %s\n", PQerrorMessage(conn));
  1174. PQfinish(conn);
  1175. exit(1);
  1176. }
  1177. _connected = 1;
  1178. int64_t lastUpdatedNetworkStatus = 0;
  1179. std::unordered_map< std::pair<uint64_t,uint64_t>,int64_t,_PairHasher > lastOnlineCumulative;
  1180. while (_run == 1) {
  1181. if (PQstatus(conn) != CONNECTION_OK) {
  1182. fprintf(stderr, "ERROR: Online Notification thread lost connection to Postgres.");
  1183. PQfinish(conn);
  1184. exit(5);
  1185. }
  1186. // map used to send notifications to front end
  1187. std::unordered_map<std::string, std::vector<std::string>> updateMap;
  1188. std::unordered_map< std::pair<uint64_t,uint64_t>,std::pair<int64_t,InetAddress>,_PairHasher > lastOnline;
  1189. {
  1190. std::lock_guard<std::mutex> l(_lastOnline_l);
  1191. lastOnline.swap(_lastOnline);
  1192. }
  1193. PGresult *res = NULL;
  1194. std::stringstream memberUpdate;
  1195. memberUpdate << "INSERT INTO ztc_member_status (network_id, member_id, address, last_updated) VALUES ";
  1196. bool firstRun = true;
  1197. bool memberAdded = false;
  1198. for (auto i=lastOnline.begin(); i != lastOnline.end(); ++i) {
  1199. uint64_t nwid_i = i->first.first;
  1200. char nwidTmp[64];
  1201. char memTmp[64];
  1202. char ipTmp[64];
  1203. OSUtils::ztsnprintf(nwidTmp,sizeof(nwidTmp), "%.16llx", nwid_i);
  1204. OSUtils::ztsnprintf(memTmp,sizeof(memTmp), "%.10llx", i->first.second);
  1205. auto found = _networks.find(nwid_i);
  1206. if (found == _networks.end()) {
  1207. continue; // skip members trying to join non-existant networks
  1208. }
  1209. std::string networkId(nwidTmp);
  1210. std::string memberId(memTmp);
  1211. std::vector<std::string> &members = updateMap[networkId];
  1212. members.push_back(memberId);
  1213. lastOnlineCumulative[i->first] = i->second.first;
  1214. const char *qvals[2] = {
  1215. networkId.c_str(),
  1216. memberId.c_str()
  1217. };
  1218. res = PQexecParams(conn,
  1219. "SELECT id, network_id FROM ztc_member WHERE network_id = $1 AND id = $2",
  1220. 2,
  1221. NULL,
  1222. qvals,
  1223. NULL,
  1224. NULL,
  1225. 0);
  1226. if (PQresultStatus(res) != PGRES_TUPLES_OK) {
  1227. fprintf(stderr, "Member count failed: %s", PQerrorMessage(conn));
  1228. PQclear(res);
  1229. continue;
  1230. }
  1231. int nrows = PQntuples(res);
  1232. PQclear(res);
  1233. if (nrows == 1) {
  1234. int64_t ts = i->second.first;
  1235. std::string ipAddr = i->second.second.toIpString(ipTmp);
  1236. std::string timestamp = std::to_string(ts);
  1237. if (firstRun) {
  1238. firstRun = false;
  1239. } else {
  1240. memberUpdate << ", ";
  1241. }
  1242. memberUpdate << "('" << networkId << "', '" << memberId << "', ";
  1243. if (ipAddr.empty()) {
  1244. memberUpdate << "NULL, ";
  1245. } else {
  1246. memberUpdate << "'" << ipAddr << "', ";
  1247. }
  1248. memberUpdate << "TO_TIMESTAMP(" << timestamp << "::double precision/1000))";
  1249. memberAdded = true;
  1250. } else if (nrows > 1) {
  1251. fprintf(stderr, "nrows > 1?!?");
  1252. continue;
  1253. } else {
  1254. continue;
  1255. }
  1256. }
  1257. memberUpdate << " ON CONFLICT (network_id, member_id) DO UPDATE SET address = EXCLUDED.address, last_updated = EXCLUDED.last_updated;";
  1258. if (memberAdded) {
  1259. res = PQexec(conn, memberUpdate.str().c_str());
  1260. if (PQresultStatus(res) != PGRES_COMMAND_OK) {
  1261. fprintf(stderr, "Multiple insert failed: %s", PQerrorMessage(conn));
  1262. }
  1263. PQclear(res);
  1264. }
  1265. const int64_t now = OSUtils::now();
  1266. if ((now - lastUpdatedNetworkStatus) > 10000) {
  1267. lastUpdatedNetworkStatus = now;
  1268. std::vector<std::pair<uint64_t, std::shared_ptr<_Network>>> networks;
  1269. {
  1270. std::lock_guard<std::mutex> l(_networks_l);
  1271. for (auto i = _networks.begin(); i != _networks.end(); ++i) {
  1272. networks.push_back(*i);
  1273. }
  1274. }
  1275. std::stringstream networkUpdate;
  1276. networkUpdate << "INSERT INTO ztc_network_status (network_id, bridge_count, authorized_member_count, online_member_count, total_member_count, last_modified) VALUES ";
  1277. bool nwFirstRun = true;
  1278. bool networkAdded = false;
  1279. for (auto i = networks.begin(); i != networks.end(); ++i) {
  1280. char tmp[64];
  1281. Utils::hex(i->first, tmp);
  1282. std::string networkId(tmp);
  1283. std::vector<std::string> &_notUsed = updateMap[networkId];
  1284. (void)_notUsed;
  1285. uint64_t authMemberCount = 0;
  1286. uint64_t totalMemberCount = 0;
  1287. uint64_t onlineMemberCount = 0;
  1288. uint64_t bridgeCount = 0;
  1289. uint64_t ts = now;
  1290. {
  1291. std::lock_guard<std::mutex> l2(i->second->lock);
  1292. authMemberCount = i->second->authorizedMembers.size();
  1293. totalMemberCount = i->second->members.size();
  1294. bridgeCount = i->second->activeBridgeMembers.size();
  1295. for (auto m=i->second->members.begin(); m != i->second->members.end(); ++m) {
  1296. auto lo = lastOnlineCumulative.find(std::pair<uint64_t,uint64_t>(i->first, m->first));
  1297. if (lo != lastOnlineCumulative.end()) {
  1298. if ((now - lo->second) <= (ZT_NETWORK_AUTOCONF_DELAY * 2)) {
  1299. ++onlineMemberCount;
  1300. } else {
  1301. lastOnlineCumulative.erase(lo);
  1302. }
  1303. }
  1304. }
  1305. }
  1306. const char *nvals[1] = {
  1307. networkId.c_str()
  1308. };
  1309. res = PQexecParams(conn,
  1310. "SELECT id FROM ztc_network WHERE id = $1",
  1311. 1,
  1312. NULL,
  1313. nvals,
  1314. NULL,
  1315. NULL,
  1316. 0);
  1317. if (PQresultStatus(res) != PGRES_TUPLES_OK) {
  1318. fprintf(stderr, "Network lookup failed: %s", PQerrorMessage(conn));
  1319. PQclear(res);
  1320. continue;
  1321. }
  1322. int nrows = PQntuples(res);
  1323. PQclear(res);
  1324. if (nrows == 1) {
  1325. std::string bc = std::to_string(bridgeCount);
  1326. std::string amc = std::to_string(authMemberCount);
  1327. std::string omc = std::to_string(onlineMemberCount);
  1328. std::string tmc = std::to_string(totalMemberCount);
  1329. std::string timestamp = std::to_string(ts);
  1330. if (nwFirstRun) {
  1331. nwFirstRun = false;
  1332. } else {
  1333. networkUpdate << ", ";
  1334. }
  1335. networkUpdate << "('" << networkId << "', " << bc << ", " << amc << ", " << omc << ", " << tmc << ", "
  1336. << "TO_TIMESTAMP(" << timestamp << "::double precision/1000))";
  1337. networkAdded = true;
  1338. } else if (nrows > 1) {
  1339. fprintf(stderr, "Number of networks > 1?!?!?");
  1340. continue;
  1341. } else {
  1342. continue;
  1343. }
  1344. }
  1345. networkUpdate << " ON CONFLICT (network_id) DO UPDATE SET bridge_count = EXCLUDED.bridge_count, "
  1346. << "authorized_member_count = EXCLUDED.authorized_member_count, online_member_count = EXCLUDED.online_member_count, "
  1347. << "total_member_count = EXCLUDED.total_member_count, last_modified = EXCLUDED.last_modified";
  1348. if (networkAdded) {
  1349. res = PQexec(conn, networkUpdate.str().c_str());
  1350. if (PQresultStatus(res) != PGRES_COMMAND_OK) {
  1351. fprintf(stderr, "Error during multiple network upsert: %s", PQresultErrorMessage(res));
  1352. }
  1353. PQclear(res);
  1354. }
  1355. }
  1356. // for (auto it = updateMap.begin(); it != updateMap.end(); ++it) {
  1357. // std::string networkId = it->first;
  1358. // std::vector<std::string> members = it->second;
  1359. // std::stringstream queryBuilder;
  1360. // std::string membersStr = ::join(members, ",");
  1361. // queryBuilder << "NOTIFY controller, '" << networkId << ":" << membersStr << "'";
  1362. // std::string query = queryBuilder.str();
  1363. // PGresult *res = PQexec(conn,query.c_str());
  1364. // if (PQresultStatus(res) != PGRES_COMMAND_OK) {
  1365. // fprintf(stderr, "ERROR: Error sending NOTIFY: %s\n", PQresultErrorMessage(res));
  1366. // }
  1367. // PQclear(res);
  1368. // }
  1369. std::this_thread::sleep_for(std::chrono::milliseconds(10));
  1370. }
  1371. fprintf(stderr, "%s: Fell out of run loop in onlineNotificationThread\n", _myAddressStr.c_str());
  1372. PQfinish(conn);
  1373. if (_run == 1) {
  1374. fprintf(stderr, "ERROR: %s onlineNotificationThread should still be running! Exiting Controller.\n", _myAddressStr.c_str());
  1375. exit(6);
  1376. }
  1377. }
  1378. PGconn *PostgreSQL::getPgConn(OverrideMode m) {
  1379. if (m == ALLOW_PGBOUNCER_OVERRIDE) {
  1380. char *connStr = getenv("PGBOUNCER_CONNSTR");
  1381. if (connStr != NULL) {
  1382. fprintf(stderr, "PGBouncer Override\n");
  1383. std::string conn(connStr);
  1384. conn += " application_name=controller-";
  1385. conn += _myAddressStr.c_str();
  1386. return PQconnectdb(conn.c_str());
  1387. }
  1388. }
  1389. return PQconnectdb(_connString.c_str());
  1390. }
  1391. #endif //ZT_CONTROLLER_USE_LIBPQ