CV1.cpp 63 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
71778177917801781178217831784178517861787178817891790179117921793179417951796179717981799180018011802180318041805180618071808180918101811181218131814181518161817181818191820182118221823182418251826182718281829183018311832183318341835183618371838183918401841184218431844184518461847184818491850185118521853185418551856185718581859186018611862186318641865186618671868186918701871187218731874187518761877187818791880188118821883188418851886188718881889189018911892189318941895189618971898189919001901190219031904190519061907190819091910191119121913191419151916191719181919192019211922192319241925192619271928192919301931193219331934193519361937193819391940194119421943194419451946194719481949195019511952
  1. /*
  2. * Copyright (c)2019 ZeroTier, Inc.
  3. *
  4. * Use of this software is governed by the Business Source License included
  5. * in the LICENSE.TXT file in the project's root directory.
  6. *
  7. * Change Date: 2026-01-01
  8. *
  9. * On the date above, in accordance with the Business Source License, use
  10. * of this software will be governed by version 2.0 of the Apache License.
  11. */
  12. /****/
  13. #include "CV1.hpp"
  14. #ifdef ZT_CONTROLLER_USE_LIBPQ
  15. #include "../node/Constants.hpp"
  16. #include "../node/SHA512.hpp"
  17. #include "../version.h"
  18. #include "CtlUtil.hpp"
  19. #include "EmbeddedNetworkController.hpp"
  20. #include "Redis.hpp"
  21. #include <chrono>
  22. #include <climits>
  23. #include <iomanip>
  24. #include <libpq-fe.h>
  25. #include <smeeclient.h>
  26. #include <sstream>
  27. // #define REDIS_TRACE 1
  28. using json = nlohmann::json;
  namespace {
  /// Lowest Central database schema version this controller accepts; the
  /// CV1 constructor exits the process when ztc_database reports less.
  constexpr int DB_MINIMUM_VERSION = 38;
  }   // anonymous namespace
  32. using namespace ZeroTier;
  /// List of (first, second) string pairs.
  using Attrs = std::vector<std::pair<std::string, std::string>>;
  /// A string paired with an attribute list.
  using Item = std::pair<std::string, Attrs>;
  /// Ordered sequence of Item values.
  using ItemStream = std::vector<Item>;
  36. CV1::CV1(const Identity& myId, const char* path, int listenPort, RedisConfig* rc)
  37. : DB()
  38. , _pool()
  39. , _myId(myId)
  40. , _myAddress(myId.address())
  41. , _ready(0)
  42. , _connected(1)
  43. , _run(1)
  44. , _waitNoticePrinted(false)
  45. , _listenPort(listenPort)
  46. , _rc(rc)
  47. , _redis(NULL)
  48. , _cluster(NULL)
  49. , _redisMemberStatus(false)
  50. , _smee(NULL)
  51. {
  52. char myAddress[64];
  53. _myAddressStr = myId.address().toString(myAddress);
  54. _connString = std::string(path);
  55. auto f = std::make_shared<PostgresConnFactory>(_connString);
  56. _pool = std::make_shared<ConnectionPool<PostgresConnection> >(15, 5, std::static_pointer_cast<ConnectionFactory>(f));
  57. memset(_ssoPsk, 0, sizeof(_ssoPsk));
  58. char* const ssoPskHex = getenv("ZT_SSO_PSK");
  59. #ifdef ZT_TRACE
  60. fprintf(stderr, "ZT_SSO_PSK: %s\n", ssoPskHex);
  61. #endif
  62. if (ssoPskHex) {
  63. // SECURITY: note that ssoPskHex will always be null-terminated if libc actually
  64. // returns something non-NULL. If the hex encodes something shorter than 48 bytes,
  65. // it will be padded at the end with zeroes. If longer, it'll be truncated.
  66. Utils::unhex(ssoPskHex, _ssoPsk, sizeof(_ssoPsk));
  67. }
  68. const char* redisMemberStatus = getenv("ZT_REDIS_MEMBER_STATUS");
  69. if (redisMemberStatus && (strcmp(redisMemberStatus, "true") == 0)) {
  70. _redisMemberStatus = true;
  71. fprintf(stderr, "Using redis for member status\n");
  72. }
  73. auto c = _pool->borrow();
  74. pqxx::work txn { *c->c };
  75. pqxx::row r { txn.exec1("SELECT version FROM ztc_database") };
  76. int dbVersion = r[0].as<int>();
  77. txn.commit();
  78. if (dbVersion < DB_MINIMUM_VERSION) {
  79. fprintf(stderr, "Central database schema version too low. This controller version requires a minimum schema version of %d. Please upgrade your Central instance", DB_MINIMUM_VERSION);
  80. exit(1);
  81. }
  82. _pool->unborrow(c);
  83. if (_rc != NULL) {
  84. sw::redis::ConnectionOptions opts;
  85. sw::redis::ConnectionPoolOptions poolOpts;
  86. opts.host = _rc->hostname;
  87. opts.port = _rc->port;
  88. opts.password = _rc->password;
  89. opts.db = 0;
  90. opts.keep_alive = true;
  91. opts.connect_timeout = std::chrono::seconds(3);
  92. poolOpts.size = 25;
  93. poolOpts.wait_timeout = std::chrono::seconds(5);
  94. poolOpts.connection_lifetime = std::chrono::minutes(3);
  95. poolOpts.connection_idle_time = std::chrono::minutes(1);
  96. if (_rc->clusterMode) {
  97. fprintf(stderr, "Using Redis in Cluster Mode\n");
  98. _cluster = std::make_shared<sw::redis::RedisCluster>(opts, poolOpts);
  99. }
  100. else {
  101. fprintf(stderr, "Using Redis in Standalone Mode\n");
  102. _redis = std::make_shared<sw::redis::Redis>(opts, poolOpts);
  103. }
  104. }
  105. _readyLock.lock();
  106. fprintf(stderr, "[%s] NOTICE: %.10llx controller PostgreSQL waiting for initial data download..." ZT_EOL_S, ::_timestr(), (unsigned long long)_myAddress.toInt());
  107. _waitNoticePrinted = true;
  108. initializeNetworks();
  109. initializeMembers();
  110. _heartbeatThread = std::thread(&CV1::heartbeat, this);
  111. _membersDbWatcher = std::thread(&CV1::membersDbWatcher, this);
  112. _networksDbWatcher = std::thread(&CV1::networksDbWatcher, this);
  113. for (int i = 0; i < ZT_CENTRAL_CONTROLLER_COMMIT_THREADS; ++i) {
  114. _commitThread[i] = std::thread(&CV1::commitThread, this);
  115. }
  116. _onlineNotificationThread = std::thread(&CV1::onlineNotificationThread, this);
  117. configureSmee();
  118. }
  119. CV1::~CV1()
  120. {
  121. if (_smee != NULL) {
  122. smeeclient::smee_client_delete(_smee);
  123. _smee = NULL;
  124. }
  125. _run = 0;
  126. std::this_thread::sleep_for(std::chrono::milliseconds(100));
  127. _heartbeatThread.join();
  128. _membersDbWatcher.join();
  129. _networksDbWatcher.join();
  130. _commitQueue.stop();
  131. for (int i = 0; i < ZT_CENTRAL_CONTROLLER_COMMIT_THREADS; ++i) {
  132. _commitThread[i].join();
  133. }
  134. _onlineNotificationThread.join();
  135. }
  136. void CV1::configureSmee()
  137. {
  138. const char* TEMPORAL_SCHEME = "ZT_TEMPORAL_SCHEME";
  139. const char* TEMPORAL_HOST = "ZT_TEMPORAL_HOST";
  140. const char* TEMPORAL_PORT = "ZT_TEMPORAL_PORT";
  141. const char* TEMPORAL_NAMESPACE = "ZT_TEMPORAL_NAMESPACE";
  142. const char* SMEE_TASK_QUEUE = "ZT_SMEE_TASK_QUEUE";
  143. const char* scheme = getenv(TEMPORAL_SCHEME);
  144. if (scheme == NULL) {
  145. scheme = "http";
  146. }
  147. const char* host = getenv(TEMPORAL_HOST);
  148. const char* port = getenv(TEMPORAL_PORT);
  149. const char* ns = getenv(TEMPORAL_NAMESPACE);
  150. const char* task_queue = getenv(SMEE_TASK_QUEUE);
  151. if (scheme != NULL && host != NULL && port != NULL && ns != NULL && task_queue != NULL) {
  152. fprintf(stderr, "creating smee client\n");
  153. std::string hostPort = std::string(scheme) + std::string("://") + std::string(host) + std::string(":") + std::string(port);
  154. this->_smee = smeeclient::smee_client_new(hostPort.c_str(), ns, task_queue);
  155. }
  156. else {
  157. fprintf(stderr, "Smee client not configured\n");
  158. }
  159. }
  160. bool CV1::waitForReady()
  161. {
  162. while (_ready < 2) {
  163. _readyLock.lock();
  164. _readyLock.unlock();
  165. }
  166. return true;
  167. }
  168. bool CV1::isReady()
  169. {
  170. return ((_ready == 2) && (_connected));
  171. }
  172. bool CV1::save(nlohmann::json& record, bool notifyListeners)
  173. {
  174. bool modified = false;
  175. try {
  176. if (! record.is_object()) {
  177. fprintf(stderr, "record is not an object?!?\n");
  178. return false;
  179. }
  180. const std::string objtype = record["objtype"];
  181. if (objtype == "network") {
  182. // fprintf(stderr, "network save\n");
  183. const uint64_t nwid = OSUtils::jsonIntHex(record["id"], 0ULL);
  184. if (nwid) {
  185. nlohmann::json old;
  186. get(nwid, old);
  187. if ((! old.is_object()) || (! _compareRecords(old, record))) {
  188. record["revision"] = OSUtils::jsonInt(record["revision"], 0ULL) + 1ULL;
  189. _commitQueue.post(std::pair<nlohmann::json, bool>(record, notifyListeners));
  190. modified = true;
  191. }
  192. }
  193. }
  194. else if (objtype == "member") {
  195. std::string networkId = record["nwid"];
  196. std::string memberId = record["id"];
  197. const uint64_t nwid = OSUtils::jsonIntHex(record["nwid"], 0ULL);
  198. const uint64_t id = OSUtils::jsonIntHex(record["id"], 0ULL);
  199. // fprintf(stderr, "member save %s-%s\n", networkId.c_str(), memberId.c_str());
  200. if ((id) && (nwid)) {
  201. nlohmann::json network, old;
  202. get(nwid, network, id, old);
  203. if ((! old.is_object()) || (! _compareRecords(old, record))) {
  204. // fprintf(stderr, "commit queue post\n");
  205. record["revision"] = OSUtils::jsonInt(record["revision"], 0ULL) + 1ULL;
  206. _commitQueue.post(std::pair<nlohmann::json, bool>(record, notifyListeners));
  207. modified = true;
  208. }
  209. else {
  210. // fprintf(stderr, "no change\n");
  211. }
  212. }
  213. }
  214. else {
  215. fprintf(stderr, "uhh waaat\n");
  216. }
  217. }
  218. catch (std::exception& e) {
  219. fprintf(stderr, "Error on PostgreSQL::save: %s\n", e.what());
  220. }
  221. catch (...) {
  222. fprintf(stderr, "Unknown error on PostgreSQL::save\n");
  223. }
  224. return modified;
  225. }
  226. void CV1::eraseNetwork(const uint64_t networkId)
  227. {
  228. fprintf(stderr, "PostgreSQL::eraseNetwork\n");
  229. char tmp2[24];
  230. waitForReady();
  231. Utils::hex(networkId, tmp2);
  232. std::pair<nlohmann::json, bool> tmp;
  233. tmp.first["id"] = tmp2;
  234. tmp.first["objtype"] = "_delete_network";
  235. tmp.second = true;
  236. _commitQueue.post(tmp);
  237. nlohmann::json nullJson;
  238. _networkChanged(tmp.first, nullJson, true);
  239. }
  240. void CV1::eraseMember(const uint64_t networkId, const uint64_t memberId)
  241. {
  242. fprintf(stderr, "PostgreSQL::eraseMember\n");
  243. char tmp2[24];
  244. waitForReady();
  245. std::pair<nlohmann::json, bool> tmp, nw;
  246. Utils::hex(networkId, tmp2);
  247. tmp.first["nwid"] = tmp2;
  248. Utils::hex(memberId, tmp2);
  249. tmp.first["id"] = tmp2;
  250. tmp.first["objtype"] = "_delete_member";
  251. tmp.second = true;
  252. _commitQueue.post(tmp);
  253. nlohmann::json nullJson;
  254. _memberChanged(tmp.first, nullJson, true);
  255. }
  256. void CV1::nodeIsOnline(const uint64_t networkId, const uint64_t memberId, const InetAddress& physicalAddress, const char* osArch)
  257. {
  258. std::lock_guard<std::mutex> l(_lastOnline_l);
  259. NodeOnlineRecord& i = _lastOnline[std::pair<uint64_t, uint64_t>(networkId, memberId)];
  260. i.lastSeen = OSUtils::now();
  261. if (physicalAddress) {
  262. i.physicalAddress = physicalAddress;
  263. }
  264. i.osArch = std::string(osArch);
  265. }
  266. void CV1::nodeIsOnline(const uint64_t networkId, const uint64_t memberId, const InetAddress& physicalAddress)
  267. {
  268. this->nodeIsOnline(networkId, memberId, physicalAddress, "unknown/unknown");
  269. }
  270. AuthInfo CV1::getSSOAuthInfo(const nlohmann::json& member, const std::string& redirectURL)
  271. {
  272. Metrics::db_get_sso_info++;
  273. // NONCE is just a random character string. no semantic meaning
  274. // state = HMAC SHA384 of Nonce based on shared sso key
  275. //
  276. // need nonce timeout in database? make sure it's used within X time
  277. // X is 5 minutes for now. Make configurable later?
  278. //
  279. // how do we tell when a nonce is used? if auth_expiration_time is set
  280. std::string networkId = member["nwid"];
  281. std::string memberId = member["id"];
  282. char authenticationURL[4096] = { 0 };
  283. AuthInfo info;
  284. info.enabled = true;
  285. // if (memberId == "a10dccea52" && networkId == "8056c2e21c24673d") {
  286. // fprintf(stderr, "invalid authinfo for grant's machine\n");
  287. // info.version=1;
  288. // return info;
  289. // }
  290. // fprintf(stderr, "PostgreSQL::updateMemberOnLoad: %s-%s\n", networkId.c_str(), memberId.c_str());
  291. std::shared_ptr<PostgresConnection> c;
  292. try {
  293. c = _pool->borrow();
  294. pqxx::work w(*c->c);
  295. char nonceBytes[16] = { 0 };
  296. std::string nonce = "";
  297. // check if the member exists first.
  298. pqxx::row count = w.exec_params1("SELECT count(id) FROM ztc_member WHERE id = $1 AND network_id = $2 AND deleted = false", memberId, networkId);
  299. if (count[0].as<int>() == 1) {
  300. // get active nonce, if exists.
  301. pqxx::result r = w.exec_params(
  302. "SELECT nonce FROM ztc_sso_expiry "
  303. "WHERE network_id = $1 AND member_id = $2 "
  304. "AND ((NOW() AT TIME ZONE 'UTC') <= authentication_expiry_time) AND ((NOW() AT TIME ZONE 'UTC') <= nonce_expiration)",
  305. networkId,
  306. memberId);
  307. if (r.size() == 0) {
  308. // no active nonce.
  309. // find an unused nonce, if one exists.
  310. pqxx::result r = w.exec_params(
  311. "SELECT nonce FROM ztc_sso_expiry "
  312. "WHERE network_id = $1 AND member_id = $2 "
  313. "AND authentication_expiry_time IS NULL AND ((NOW() AT TIME ZONE 'UTC') <= nonce_expiration)",
  314. networkId,
  315. memberId);
  316. if (r.size() == 1) {
  317. // we have an existing nonce. Use it
  318. nonce = r.at(0)[0].as<std::string>();
  319. Utils::unhex(nonce.c_str(), nonceBytes, sizeof(nonceBytes));
  320. }
  321. else if (r.empty()) {
  322. // create a nonce
  323. Utils::getSecureRandom(nonceBytes, 16);
  324. char nonceBuf[64] = { 0 };
  325. Utils::hex(nonceBytes, sizeof(nonceBytes), nonceBuf);
  326. nonce = std::string(nonceBuf);
  327. pqxx::result ir = w.exec_params0(
  328. "INSERT INTO ztc_sso_expiry "
  329. "(nonce, nonce_expiration, network_id, member_id) VALUES "
  330. "($1, TO_TIMESTAMP($2::double precision/1000), $3, $4)",
  331. nonce,
  332. OSUtils::now() + 300000,
  333. networkId,
  334. memberId);
  335. w.commit();
  336. }
  337. else {
  338. // > 1 ?!? Thats an error!
  339. fprintf(stderr, "> 1 unused nonce!\n");
  340. exit(6);
  341. }
  342. }
  343. else if (r.size() == 1) {
  344. nonce = r.at(0)[0].as<std::string>();
  345. Utils::unhex(nonce.c_str(), nonceBytes, sizeof(nonceBytes));
  346. }
  347. else {
  348. // more than 1 nonce in use? Uhhh...
  349. fprintf(stderr, "> 1 nonce in use for network member?!?\n");
  350. exit(7);
  351. }
  352. r = w.exec_params(
  353. "SELECT oc.client_id, oc.authorization_endpoint, oc.issuer, oc.provider, oc.sso_impl_version "
  354. "FROM ztc_network AS n "
  355. "INNER JOIN ztc_org o "
  356. " ON o.owner_id = n.owner_id "
  357. "LEFT OUTER JOIN ztc_network_oidc_config noc "
  358. " ON noc.network_id = n.id "
  359. "LEFT OUTER JOIN ztc_oidc_config oc "
  360. " ON noc.client_id = oc.client_id AND oc.org_id = o.org_id "
  361. "WHERE n.id = $1 AND n.sso_enabled = true",
  362. networkId);
  363. std::string client_id = "";
  364. std::string authorization_endpoint = "";
  365. std::string issuer = "";
  366. std::string provider = "";
  367. uint64_t sso_version = 0;
  368. if (r.size() == 1) {
  369. client_id = r.at(0)[0].as<std::optional<std::string> >().value_or("");
  370. authorization_endpoint = r.at(0)[1].as<std::optional<std::string> >().value_or("");
  371. issuer = r.at(0)[2].as<std::optional<std::string> >().value_or("");
  372. provider = r.at(0)[3].as<std::optional<std::string> >().value_or("");
  373. sso_version = r.at(0)[4].as<std::optional<uint64_t> >().value_or(1);
  374. }
  375. else if (r.size() > 1) {
  376. fprintf(stderr, "ERROR: More than one auth endpoint for an organization?!?!? NetworkID: %s\n", networkId.c_str());
  377. }
  378. else {
  379. fprintf(stderr, "No client or auth endpoint?!?\n");
  380. }
  381. info.version = sso_version;
  382. // no catch all else because we don't actually care if no records exist here. just continue as normal.
  383. if ((! client_id.empty()) && (! authorization_endpoint.empty())) {
  384. uint8_t state[48];
  385. HMACSHA384(_ssoPsk, nonceBytes, sizeof(nonceBytes), state);
  386. char state_hex[256];
  387. Utils::hex(state, 48, state_hex);
  388. if (info.version == 0) {
  389. char url[2048] = { 0 };
  390. OSUtils::ztsnprintf(
  391. url,
  392. sizeof(authenticationURL),
  393. "%s?response_type=id_token&response_mode=form_post&scope=openid+email+profile&redirect_uri=%s&nonce=%s&state=%s&client_id=%s",
  394. authorization_endpoint.c_str(),
  395. url_encode(redirectURL).c_str(),
  396. nonce.c_str(),
  397. state_hex,
  398. client_id.c_str());
  399. info.authenticationURL = std::string(url);
  400. }
  401. else if (info.version == 1) {
  402. info.ssoClientID = client_id;
  403. info.issuerURL = issuer;
  404. info.ssoProvider = provider;
  405. info.ssoNonce = nonce;
  406. info.ssoState = std::string(state_hex) + "_" + networkId;
  407. info.centralAuthURL = redirectURL;
  408. #ifdef ZT_DEBUG
  409. fprintf(
  410. stderr,
  411. "ssoClientID: %s\nissuerURL: %s\nssoNonce: %s\nssoState: %s\ncentralAuthURL: %s\nprovider: %s\n",
  412. info.ssoClientID.c_str(),
  413. info.issuerURL.c_str(),
  414. info.ssoNonce.c_str(),
  415. info.ssoState.c_str(),
  416. info.centralAuthURL.c_str(),
  417. provider.c_str());
  418. #endif
  419. }
  420. }
  421. else {
  422. fprintf(stderr, "client_id: %s\nauthorization_endpoint: %s\n", client_id.c_str(), authorization_endpoint.c_str());
  423. }
  424. }
  425. _pool->unborrow(c);
  426. }
  427. catch (std::exception& e) {
  428. fprintf(stderr, "ERROR: Error updating member on load for network %s: %s\n", networkId.c_str(), e.what());
  429. }
  430. return info; // std::string(authenticationURL);
  431. }
  432. void CV1::initializeNetworks()
  433. {
  434. try {
  435. std::string setKey = "networks:{" + _myAddressStr + "}";
  436. fprintf(stderr, "Initializing Networks...\n");
  437. if (_redisMemberStatus) {
  438. fprintf(stderr, "Init Redis for networks...\n");
  439. try {
  440. if (_rc->clusterMode) {
  441. _cluster->del(setKey);
  442. }
  443. else {
  444. _redis->del(setKey);
  445. }
  446. }
  447. catch (sw::redis::Error& e) {
  448. // ignore. if this key doesn't exist, there's no reason to delete it
  449. }
  450. }
  451. std::unordered_set<std::string> networkSet;
  452. char qbuf[2048] = { 0 };
  453. sprintf(
  454. qbuf,
  455. "SELECT n.id, (EXTRACT(EPOCH FROM n.creation_time AT TIME ZONE 'UTC')*1000)::bigint as creation_time, n.capabilities, "
  456. "n.enable_broadcast, (EXTRACT(EPOCH FROM n.last_modified AT TIME ZONE 'UTC')*1000)::bigint AS last_modified, n.mtu, n.multicast_limit, n.name, n.private, n.remote_trace_level, "
  457. "n.remote_trace_target, n.revision, n.rules, n.tags, n.v4_assign_mode, n.v6_assign_mode, n.sso_enabled, (CASE WHEN n.sso_enabled THEN noc.client_id ELSE NULL END) as client_id, "
  458. "(CASE WHEN n.sso_enabled THEN oc.authorization_endpoint ELSE NULL END) as authorization_endpoint, "
  459. "(CASE WHEN n.sso_enabled THEN oc.provider ELSE NULL END) as provider, d.domain, d.servers, "
  460. "ARRAY(SELECT CONCAT(host(ip_range_start),'|', host(ip_range_end)) FROM ztc_network_assignment_pool WHERE network_id = n.id) AS assignment_pool, "
  461. "ARRAY(SELECT CONCAT(host(address),'/',bits::text,'|',COALESCE(host(via), 'NULL'))FROM ztc_network_route WHERE network_id = n.id) AS routes "
  462. "FROM ztc_network n "
  463. "LEFT OUTER JOIN ztc_org o "
  464. " ON o.owner_id = n.owner_id "
  465. "LEFT OUTER JOIN ztc_network_oidc_config noc "
  466. " ON noc.network_id = n.id "
  467. "LEFT OUTER JOIN ztc_oidc_config oc "
  468. " ON noc.client_id = oc.client_id AND oc.org_id = o.org_id "
  469. "LEFT OUTER JOIN ztc_network_dns d "
  470. " ON d.network_id = n.id "
  471. "WHERE deleted = false AND controller_id = '%s'",
  472. _myAddressStr.c_str());
  473. auto c = _pool->borrow();
  474. auto c2 = _pool->borrow();
  475. pqxx::work w { *c->c };
  476. fprintf(stderr, "Load networks from psql...\n");
  477. auto stream = pqxx::stream_from::query(w, qbuf);
  478. std::tuple<
  479. std::string // network ID
  480. ,
  481. std::optional<int64_t> // creationTime
  482. ,
  483. std::optional<std::string> // capabilities
  484. ,
  485. std::optional<bool> // enableBroadcast
  486. ,
  487. std::optional<uint64_t> // lastModified
  488. ,
  489. std::optional<int> // mtu
  490. ,
  491. std::optional<int> // multicastLimit
  492. ,
  493. std::optional<std::string> // name
  494. ,
  495. bool // private
  496. ,
  497. std::optional<int> // remoteTraceLevel
  498. ,
  499. std::optional<std::string> // remoteTraceTarget
  500. ,
  501. std::optional<uint64_t> // revision
  502. ,
  503. std::optional<std::string> // rules
  504. ,
  505. std::optional<std::string> // tags
  506. ,
  507. std::optional<std::string> // v4AssignMode
  508. ,
  509. std::optional<std::string> // v6AssignMode
  510. ,
  511. std::optional<bool> // ssoEnabled
  512. ,
  513. std::optional<std::string> // clientId
  514. ,
  515. std::optional<std::string> // authorizationEndpoint
  516. ,
  517. std::optional<std::string> // ssoProvider
  518. ,
  519. std::optional<std::string> // domain
  520. ,
  521. std::optional<std::string> // servers
  522. ,
  523. std::string // assignmentPoolString
  524. ,
  525. std::string // routeString
  526. >
  527. row;
  528. uint64_t count = 0;
  529. auto tmp = std::chrono::high_resolution_clock::now();
  530. uint64_t total = 0;
  531. while (stream >> row) {
  532. auto start = std::chrono::high_resolution_clock::now();
  533. json empty;
  534. json config;
  535. initNetwork(config);
  536. std::string nwid = std::get<0>(row);
  537. std::optional<int64_t> creationTime = std::get<1>(row);
  538. std::optional<std::string> capabilities = std::get<2>(row);
  539. std::optional<bool> enableBroadcast = std::get<3>(row);
  540. std::optional<uint64_t> lastModified = std::get<4>(row);
  541. std::optional<int> mtu = std::get<5>(row);
  542. std::optional<int> multicastLimit = std::get<6>(row);
  543. std::optional<std::string> name = std::get<7>(row);
  544. bool isPrivate = std::get<8>(row);
  545. std::optional<int> remoteTraceLevel = std::get<9>(row);
  546. std::optional<std::string> remoteTraceTarget = std::get<10>(row);
  547. std::optional<uint64_t> revision = std::get<11>(row);
  548. std::optional<std::string> rules = std::get<12>(row);
  549. std::optional<std::string> tags = std::get<13>(row);
  550. std::optional<std::string> v4AssignMode = std::get<14>(row);
  551. std::optional<std::string> v6AssignMode = std::get<15>(row);
  552. std::optional<bool> ssoEnabled = std::get<16>(row);
  553. std::optional<std::string> clientId = std::get<17>(row);
  554. std::optional<std::string> authorizationEndpoint = std::get<18>(row);
  555. std::optional<std::string> ssoProvider = std::get<19>(row);
  556. std::optional<std::string> dnsDomain = std::get<20>(row);
  557. std::optional<std::string> dnsServers = std::get<21>(row);
  558. std::string assignmentPoolString = std::get<22>(row);
  559. std::string routesString = std::get<23>(row);
  560. config["id"] = nwid;
  561. config["nwid"] = nwid;
  562. config["creationTime"] = creationTime.value_or(0);
  563. config["capabilities"] = json::parse(capabilities.value_or("[]"));
  564. config["enableBroadcast"] = enableBroadcast.value_or(false);
  565. config["lastModified"] = lastModified.value_or(0);
  566. config["mtu"] = mtu.value_or(2800);
  567. config["multicastLimit"] = multicastLimit.value_or(64);
  568. config["name"] = name.value_or("");
  569. config["private"] = isPrivate;
  570. config["remoteTraceLevel"] = remoteTraceLevel.value_or(0);
  571. config["remoteTraceTarget"] = remoteTraceTarget.value_or("");
  572. config["revision"] = revision.value_or(0);
  573. config["rules"] = json::parse(rules.value_or("[]"));
  574. config["tags"] = json::parse(tags.value_or("[]"));
  575. config["v4AssignMode"] = json::parse(v4AssignMode.value_or("{}"));
  576. config["v6AssignMode"] = json::parse(v6AssignMode.value_or("{}"));
  577. config["ssoEnabled"] = ssoEnabled.value_or(false);
  578. config["objtype"] = "network";
  579. config["ipAssignmentPools"] = json::array();
  580. config["routes"] = json::array();
  581. config["clientId"] = clientId.value_or("");
  582. config["authorizationEndpoint"] = authorizationEndpoint.value_or("");
  583. config["provider"] = ssoProvider.value_or("");
  584. networkSet.insert(nwid);
  585. if (dnsDomain.has_value()) {
  586. std::string serverList = dnsServers.value();
  587. json obj;
  588. auto servers = json::array();
  589. if (serverList.rfind("{", 0) != std::string::npos) {
  590. serverList = serverList.substr(1, serverList.size() - 2);
  591. std::stringstream ss(serverList);
  592. while (ss.good()) {
  593. std::string server;
  594. std::getline(ss, server, ',');
  595. servers.push_back(server);
  596. }
  597. }
  598. obj["domain"] = dnsDomain.value();
  599. obj["servers"] = servers;
  600. config["dns"] = obj;
  601. }
  602. config["ipAssignmentPools"] = json::array();
  603. if (assignmentPoolString != "{}") {
  604. std::string tmp = assignmentPoolString.substr(1, assignmentPoolString.size() - 2);
  605. std::vector<std::string> assignmentPools = split(tmp, ',');
  606. for (auto it = assignmentPools.begin(); it != assignmentPools.end(); ++it) {
  607. std::vector<std::string> r = split(*it, '|');
  608. json ip;
  609. ip["ipRangeStart"] = r[0];
  610. ip["ipRangeEnd"] = r[1];
  611. config["ipAssignmentPools"].push_back(ip);
  612. }
  613. }
  614. config["routes"] = json::array();
  615. if (routesString != "{}") {
  616. std::string tmp = routesString.substr(1, routesString.size() - 2);
  617. std::vector<std::string> routes = split(tmp, ',');
  618. for (auto it = routes.begin(); it != routes.end(); ++it) {
  619. std::vector<std::string> r = split(*it, '|');
  620. json route;
  621. route["target"] = r[0];
  622. route["via"] = ((route["via"] == "NULL") ? nullptr : r[1]);
  623. config["routes"].push_back(route);
  624. }
  625. }
  626. Metrics::network_count++;
  627. _networkChanged(empty, config, false);
  628. auto end = std::chrono::high_resolution_clock::now();
  629. auto dur = std::chrono::duration_cast<std::chrono::microseconds>(end - start);
  630. ;
  631. total += dur.count();
  632. ++count;
  633. if (count > 0 && count % 10000 == 0) {
  634. fprintf(stderr, "Averaging %llu us per network\n", (total / count));
  635. }
  636. }
  637. if (count > 0) {
  638. fprintf(stderr, "Took %llu us per network to load\n", (total / count));
  639. }
  640. stream.complete();
  641. w.commit();
  642. _pool->unborrow(c2);
  643. _pool->unborrow(c);
  644. fprintf(stderr, "done.\n");
  645. if (! networkSet.empty()) {
  646. if (_redisMemberStatus) {
  647. fprintf(stderr, "adding networks to redis...\n");
  648. if (_rc->clusterMode) {
  649. auto tx = _cluster->transaction(_myAddressStr, true, false);
  650. uint64_t count = 0;
  651. for (std::string nwid : networkSet) {
  652. tx.sadd(setKey, nwid);
  653. if (++count % 30000 == 0) {
  654. tx.exec();
  655. tx = _cluster->transaction(_myAddressStr, true, false);
  656. }
  657. }
  658. tx.exec();
  659. }
  660. else {
  661. auto tx = _redis->transaction(true, false);
  662. uint64_t count = 0;
  663. for (std::string nwid : networkSet) {
  664. tx.sadd(setKey, nwid);
  665. if (++count % 30000 == 0) {
  666. tx.exec();
  667. tx = _redis->transaction(true, false);
  668. }
  669. }
  670. tx.exec();
  671. }
  672. fprintf(stderr, "done.\n");
  673. }
  674. }
  675. if (++this->_ready == 2) {
  676. if (_waitNoticePrinted) {
  677. fprintf(stderr, "[%s] NOTICE: %.10llx controller PostgreSQL data download complete." ZT_EOL_S, _timestr(), (unsigned long long)_myAddress.toInt());
  678. }
  679. _readyLock.unlock();
  680. }
  681. fprintf(stderr, "network init done.\n");
  682. }
  683. catch (sw::redis::Error& e) {
  684. fprintf(stderr, "ERROR: Error initializing networks in Redis: %s\n", e.what());
  685. std::this_thread::sleep_for(std::chrono::milliseconds(5000));
  686. exit(-1);
  687. }
  688. catch (std::exception& e) {
  689. fprintf(stderr, "ERROR: Error initializing networks: %s\n", e.what());
  690. std::this_thread::sleep_for(std::chrono::milliseconds(5000));
  691. exit(-1);
  692. }
  693. }
  694. void CV1::initializeMembers()
  695. {
  696. std::string memberId;
  697. std::string networkId;
  698. try {
  699. std::unordered_map<std::string, std::string> networkMembers;
  700. fprintf(stderr, "Initializing Members...\n");
  701. std::string setKeyBase = "network-nodes-all:{" + _myAddressStr + "}:";
  702. if (_redisMemberStatus) {
  703. fprintf(stderr, "Initialize Redis for members...\n");
  704. std::unique_lock<std::shared_mutex> l(_networks_l);
  705. std::unordered_set<std::string> deletes;
  706. for (auto it : _networks) {
  707. uint64_t nwid_i = it.first;
  708. char nwidTmp[64] = { 0 };
  709. OSUtils::ztsnprintf(nwidTmp, sizeof(nwidTmp), "%.16llx", nwid_i);
  710. std::string nwid(nwidTmp);
  711. std::string key = setKeyBase + nwid;
  712. deletes.insert(key);
  713. }
  714. if (! deletes.empty()) {
  715. try {
  716. if (_rc->clusterMode) {
  717. auto tx = _cluster->transaction(_myAddressStr, true, false);
  718. for (std::string k : deletes) {
  719. tx.del(k);
  720. }
  721. tx.exec();
  722. }
  723. else {
  724. auto tx = _redis->transaction(true, false);
  725. for (std::string k : deletes) {
  726. tx.del(k);
  727. }
  728. tx.exec();
  729. }
  730. }
  731. catch (sw::redis::Error& e) {
  732. // ignore
  733. }
  734. }
  735. }
  736. char qbuf[2048];
  737. sprintf(
  738. qbuf,
  739. "SELECT m.id, m.network_id, m.active_bridge, m.authorized, m.capabilities, "
  740. "(EXTRACT(EPOCH FROM m.creation_time AT TIME ZONE 'UTC')*1000)::bigint, m.identity, "
  741. "(EXTRACT(EPOCH FROM m.last_authorized_time AT TIME ZONE 'UTC')*1000)::bigint, "
  742. "(EXTRACT(EPOCH FROM m.last_deauthorized_time AT TIME ZONE 'UTC')*1000)::bigint, "
  743. "m.remote_trace_level, m.remote_trace_target, m.tags, m.v_major, m.v_minor, m.v_rev, m.v_proto, "
  744. "m.no_auto_assign_ips, m.revision, m.sso_exempt, "
  745. "(CASE WHEN n.sso_enabled = TRUE AND m.sso_exempt = FALSE THEN "
  746. " ( "
  747. " SELECT (EXTRACT(EPOCH FROM e.authentication_expiry_time)*1000)::bigint "
  748. " FROM ztc_sso_expiry e "
  749. " INNER JOIN ztc_network n1 "
  750. " ON n1.id = e.network_id AND n1.deleted = TRUE "
  751. " WHERE e.network_id = m.network_id AND e.member_id = m.id AND n.sso_enabled = TRUE AND e.authentication_expiry_time IS NOT NULL "
  752. " ORDER BY e.authentication_expiry_time DESC LIMIT 1 "
  753. " ) "
  754. " ELSE NULL "
  755. " END) AS authentication_expiry_time, "
  756. "ARRAY(SELECT DISTINCT address FROM ztc_member_ip_assignment WHERE member_id = m.id AND network_id = m.network_id) AS assigned_addresses "
  757. "FROM ztc_member m "
  758. "INNER JOIN ztc_network n "
  759. " ON n.id = m.network_id "
  760. "WHERE n.controller_id = '%s' AND n.deleted = FALSE AND m.deleted = FALSE",
  761. _myAddressStr.c_str());
  762. auto c = _pool->borrow();
  763. auto c2 = _pool->borrow();
  764. pqxx::work w { *c->c };
  765. fprintf(stderr, "Load members from psql...\n");
  766. auto stream = pqxx::stream_from::query(w, qbuf);
  767. std::tuple<
  768. std::string // memberId
  769. ,
  770. std::string // memberId
  771. ,
  772. std::optional<bool> // activeBridge
  773. ,
  774. std::optional<bool> // authorized
  775. ,
  776. std::optional<std::string> // capabilities
  777. ,
  778. std::optional<uint64_t> // creationTime
  779. ,
  780. std::optional<std::string> // identity
  781. ,
  782. std::optional<uint64_t> // lastAuthorizedTime
  783. ,
  784. std::optional<uint64_t> // lastDeauthorizedTime
  785. ,
  786. std::optional<int> // remoteTraceLevel
  787. ,
  788. std::optional<std::string> // remoteTraceTarget
  789. ,
  790. std::optional<std::string> // tags
  791. ,
  792. std::optional<int> // vMajor
  793. ,
  794. std::optional<int> // vMinor
  795. ,
  796. std::optional<int> // vRev
  797. ,
  798. std::optional<int> // vProto
  799. ,
  800. std::optional<bool> // noAutoAssignIps
  801. ,
  802. std::optional<uint64_t> // revision
  803. ,
  804. std::optional<bool> // ssoExempt
  805. ,
  806. std::optional<uint64_t> // authenticationExpiryTime
  807. ,
  808. std::string // assignedAddresses
  809. >
  810. row;
  811. uint64_t count = 0;
  812. auto tmp = std::chrono::high_resolution_clock::now();
  813. uint64_t total = 0;
  814. while (stream >> row) {
  815. auto start = std::chrono::high_resolution_clock::now();
  816. json empty;
  817. json config;
  818. initMember(config);
  819. memberId = std::get<0>(row);
  820. networkId = std::get<1>(row);
  821. std::optional<bool> activeBridge = std::get<2>(row);
  822. std::optional<bool> authorized = std::get<3>(row);
  823. std::optional<std::string> capabilities = std::get<4>(row);
  824. std::optional<uint64_t> creationTime = std::get<5>(row);
  825. std::optional<std::string> identity = std::get<6>(row);
  826. std::optional<uint64_t> lastAuthorizedTime = std::get<7>(row);
  827. std::optional<uint64_t> lastDeauthorizedTime = std::get<8>(row);
  828. std::optional<int> remoteTraceLevel = std::get<9>(row);
  829. std::optional<std::string> remoteTraceTarget = std::get<10>(row);
  830. std::optional<std::string> tags = std::get<11>(row);
  831. std::optional<int> vMajor = std::get<12>(row);
  832. std::optional<int> vMinor = std::get<13>(row);
  833. std::optional<int> vRev = std::get<14>(row);
  834. std::optional<int> vProto = std::get<15>(row);
  835. std::optional<bool> noAutoAssignIps = std::get<16>(row);
  836. std::optional<uint64_t> revision = std::get<17>(row);
  837. std::optional<bool> ssoExempt = std::get<18>(row);
  838. std::optional<uint64_t> authenticationExpiryTime = std::get<19>(row);
  839. std::string assignedAddresses = std::get<20>(row);
  840. networkMembers.insert(std::pair<std::string, std::string>(setKeyBase + networkId, memberId));
  841. config["id"] = memberId;
  842. config["address"] = memberId;
  843. config["nwid"] = networkId;
  844. config["activeBridge"] = activeBridge.value_or(false);
  845. config["authorized"] = authorized.value_or(false);
  846. config["capabilities"] = json::parse(capabilities.value_or("[]"));
  847. config["creationTime"] = creationTime.value_or(0);
  848. config["identity"] = identity.value_or("");
  849. config["lastAuthorizedTime"] = lastAuthorizedTime.value_or(0);
  850. config["lastDeauthorizedTime"] = lastDeauthorizedTime.value_or(0);
  851. config["remoteTraceLevel"] = remoteTraceLevel.value_or(0);
  852. config["remoteTraceTarget"] = remoteTraceTarget.value_or("");
  853. config["tags"] = json::parse(tags.value_or("[]"));
  854. config["vMajor"] = vMajor.value_or(-1);
  855. config["vMinor"] = vMinor.value_or(-1);
  856. config["vRev"] = vRev.value_or(-1);
  857. config["vProto"] = vProto.value_or(-1);
  858. config["noAutoAssignIps"] = noAutoAssignIps.value_or(false);
  859. config["revision"] = revision.value_or(0);
  860. config["ssoExempt"] = ssoExempt.value_or(false);
  861. config["authenticationExpiryTime"] = authenticationExpiryTime.value_or(0);
  862. config["objtype"] = "member";
  863. config["ipAssignments"] = json::array();
  864. if (assignedAddresses != "{}") {
  865. std::string tmp = assignedAddresses.substr(1, assignedAddresses.size() - 2);
  866. std::vector<std::string> addrs = split(tmp, ',');
  867. for (auto it = addrs.begin(); it != addrs.end(); ++it) {
  868. config["ipAssignments"].push_back(*it);
  869. }
  870. }
  871. Metrics::member_count++;
  872. _memberChanged(empty, config, false);
  873. memberId = "";
  874. networkId = "";
  875. auto end = std::chrono::high_resolution_clock::now();
  876. auto dur = std::chrono::duration_cast<std::chrono::microseconds>(end - start);
  877. total += dur.count();
  878. ++count;
  879. if (count > 0 && count % 10000 == 0) {
  880. fprintf(stderr, "Averaging %llu us per member\n", (total / count));
  881. }
  882. }
  883. if (count > 0) {
  884. fprintf(stderr, "Took %llu us per member to load\n", (total / count));
  885. }
  886. stream.complete();
  887. w.commit();
  888. _pool->unborrow(c2);
  889. _pool->unborrow(c);
  890. fprintf(stderr, "done.\n");
  891. if (! networkMembers.empty()) {
  892. if (_redisMemberStatus) {
  893. fprintf(stderr, "Load member data into redis...\n");
  894. if (_rc->clusterMode) {
  895. auto tx = _cluster->transaction(_myAddressStr, true, false);
  896. uint64_t count = 0;
  897. for (auto it : networkMembers) {
  898. tx.sadd(it.first, it.second);
  899. if (++count % 30000 == 0) {
  900. tx.exec();
  901. tx = _cluster->transaction(_myAddressStr, true, false);
  902. }
  903. }
  904. tx.exec();
  905. }
  906. else {
  907. auto tx = _redis->transaction(true, false);
  908. uint64_t count = 0;
  909. for (auto it : networkMembers) {
  910. tx.sadd(it.first, it.second);
  911. if (++count % 30000 == 0) {
  912. tx.exec();
  913. tx = _redis->transaction(true, false);
  914. }
  915. }
  916. tx.exec();
  917. }
  918. fprintf(stderr, "done.\n");
  919. }
  920. }
  921. fprintf(stderr, "Done loading members...\n");
  922. if (++this->_ready == 2) {
  923. if (_waitNoticePrinted) {
  924. fprintf(stderr, "[%s] NOTICE: %.10llx controller PostgreSQL data download complete." ZT_EOL_S, _timestr(), (unsigned long long)_myAddress.toInt());
  925. }
  926. _readyLock.unlock();
  927. }
  928. }
  929. catch (sw::redis::Error& e) {
  930. fprintf(stderr, "ERROR: Error initializing members (redis): %s\n", e.what());
  931. exit(-1);
  932. }
  933. catch (std::exception& e) {
  934. fprintf(stderr, "ERROR: Error initializing member: %s-%s %s\n", networkId.c_str(), memberId.c_str(), e.what());
  935. exit(-1);
  936. }
  937. }
  938. void CV1::heartbeat()
  939. {
  940. char publicId[1024];
  941. char hostnameTmp[1024];
  942. _myId.toString(false, publicId);
  943. if (gethostname(hostnameTmp, sizeof(hostnameTmp)) != 0) {
  944. hostnameTmp[0] = (char)0;
  945. }
  946. else {
  947. for (int i = 0; i < (int)sizeof(hostnameTmp); ++i) {
  948. if ((hostnameTmp[i] == '.') || (hostnameTmp[i] == 0)) {
  949. hostnameTmp[i] = (char)0;
  950. break;
  951. }
  952. }
  953. }
  954. const char* controllerId = _myAddressStr.c_str();
  955. const char* publicIdentity = publicId;
  956. const char* hostname = hostnameTmp;
  957. while (_run == 1) {
  958. // fprintf(stderr, "%s: heartbeat\n", controllerId);
  959. auto c = _pool->borrow();
  960. int64_t ts = OSUtils::now();
  961. if (c->c) {
  962. std::string major = std::to_string(ZEROTIER_ONE_VERSION_MAJOR);
  963. std::string minor = std::to_string(ZEROTIER_ONE_VERSION_MINOR);
  964. std::string rev = std::to_string(ZEROTIER_ONE_VERSION_REVISION);
  965. std::string build = std::to_string(ZEROTIER_ONE_VERSION_BUILD);
  966. std::string now = std::to_string(ts);
  967. std::string host_port = std::to_string(_listenPort);
  968. std::string use_redis = (_rc != NULL) ? "true" : "false";
  969. std::string redis_mem_status = (_redisMemberStatus) ? "true" : "false";
  970. try {
  971. pqxx::work w { *c->c };
  972. pqxx::result res = w.exec0(
  973. "INSERT INTO ztc_controller (id, cluster_host, last_alive, public_identity, v_major, v_minor, v_rev, v_build, host_port, use_redis, redis_member_status) "
  974. "VALUES ("
  975. + w.quote(controllerId) + ", " + w.quote(hostname) + ", TO_TIMESTAMP(" + now + "::double precision/1000), " + w.quote(publicIdentity) + ", " + major + ", " + minor + ", " + rev + ", " + build + ", " + host_port + ", "
  976. + use_redis + ", " + redis_mem_status
  977. + ") "
  978. "ON CONFLICT (id) DO UPDATE SET cluster_host = EXCLUDED.cluster_host, last_alive = EXCLUDED.last_alive, "
  979. "public_identity = EXCLUDED.public_identity, v_major = EXCLUDED.v_major, v_minor = EXCLUDED.v_minor, "
  980. "v_rev = EXCLUDED.v_rev, v_build = EXCLUDED.v_rev, host_port = EXCLUDED.host_port, "
  981. "use_redis = EXCLUDED.use_redis, redis_member_status = EXCLUDED.redis_member_status");
  982. w.commit();
  983. }
  984. catch (std::exception& e) {
  985. fprintf(stderr, "%s: Heartbeat update failed: %s\n", controllerId, e.what());
  986. std::this_thread::sleep_for(std::chrono::milliseconds(1000));
  987. continue;
  988. }
  989. }
  990. _pool->unborrow(c);
  991. try {
  992. if (_redisMemberStatus) {
  993. if (_rc->clusterMode) {
  994. _cluster->zadd("controllers", "controllerId", ts);
  995. }
  996. else {
  997. _redis->zadd("controllers", "controllerId", ts);
  998. }
  999. }
  1000. }
  1001. catch (sw::redis::Error& e) {
  1002. fprintf(stderr, "ERROR: Redis error in heartbeat thread: %s\n", e.what());
  1003. }
  1004. std::this_thread::sleep_for(std::chrono::milliseconds(1000));
  1005. }
  1006. fprintf(stderr, "Exited heartbeat thread\n");
  1007. }
  1008. void CV1::membersDbWatcher()
  1009. {
  1010. if (_rc) {
  1011. _membersWatcher_Redis();
  1012. }
  1013. else {
  1014. _membersWatcher_Postgres();
  1015. }
  1016. if (_run == 1) {
  1017. fprintf(stderr, "ERROR: %s membersDbWatcher should still be running! Exiting Controller.\n", _myAddressStr.c_str());
  1018. exit(9);
  1019. }
  1020. fprintf(stderr, "Exited membersDbWatcher\n");
  1021. }
  1022. void CV1::_membersWatcher_Postgres()
  1023. {
  1024. auto c = _pool->borrow();
  1025. std::string stream = "member_" + _myAddressStr;
  1026. fprintf(stderr, "Listening to member stream: %s\n", stream.c_str());
  1027. MemberNotificationReceiver m(this, *c->c, stream);
  1028. while (_run == 1) {
  1029. c->c->await_notification(5, 0);
  1030. }
  1031. _pool->unborrow(c);
  1032. }
  1033. void CV1::_membersWatcher_Redis()
  1034. {
  1035. char buf[11] = { 0 };
  1036. std::string key = "member-stream:{" + std::string(_myAddress.toString(buf)) + "}";
  1037. std::string lastID = "0";
  1038. fprintf(stderr, "Listening to member stream: %s\n", key.c_str());
  1039. while (_run == 1) {
  1040. try {
  1041. json tmp;
  1042. std::unordered_map<std::string, ItemStream> result;
  1043. if (_rc->clusterMode) {
  1044. _cluster->xread(key, lastID, std::chrono::seconds(1), 0, std::inserter(result, result.end()));
  1045. }
  1046. else {
  1047. _redis->xread(key, lastID, std::chrono::seconds(1), 0, std::inserter(result, result.end()));
  1048. }
  1049. if (! result.empty()) {
  1050. for (auto element : result) {
  1051. #ifdef REDIS_TRACE
  1052. fprintf(stdout, "Received notification from: %s\n", element.first.c_str());
  1053. #endif
  1054. for (auto rec : element.second) {
  1055. std::string id = rec.first;
  1056. auto attrs = rec.second;
  1057. #ifdef REDIS_TRACE
  1058. fprintf(stdout, "Record ID: %s\n", id.c_str());
  1059. fprintf(stdout, "attrs len: %lu\n", attrs.size());
  1060. #endif
  1061. for (auto a : attrs) {
  1062. #ifdef REDIS_TRACE
  1063. fprintf(stdout, "key: %s\nvalue: %s\n", a.first.c_str(), a.second.c_str());
  1064. #endif
  1065. try {
  1066. tmp = json::parse(a.second);
  1067. json& ov = tmp["old_val"];
  1068. json& nv = tmp["new_val"];
  1069. json oldConfig, newConfig;
  1070. if (ov.is_object())
  1071. oldConfig = ov;
  1072. if (nv.is_object())
  1073. newConfig = nv;
  1074. if (oldConfig.is_object() || newConfig.is_object()) {
  1075. _memberChanged(oldConfig, newConfig, (this->_ready >= 2));
  1076. }
  1077. }
  1078. catch (...) {
  1079. fprintf(stderr, "json parse error in _membersWatcher_Redis: %s\n", a.second.c_str());
  1080. }
  1081. }
  1082. if (_rc->clusterMode) {
  1083. _cluster->xdel(key, id);
  1084. }
  1085. else {
  1086. _redis->xdel(key, id);
  1087. }
  1088. lastID = id;
  1089. Metrics::redis_mem_notification++;
  1090. }
  1091. }
  1092. }
  1093. }
  1094. catch (sw::redis::Error& e) {
  1095. fprintf(stderr, "Error in Redis members watcher: %s\n", e.what());
  1096. }
  1097. }
  1098. fprintf(stderr, "membersWatcher ended\n");
  1099. }
  1100. void CV1::networksDbWatcher()
  1101. {
  1102. if (_rc) {
  1103. _networksWatcher_Redis();
  1104. }
  1105. else {
  1106. _networksWatcher_Postgres();
  1107. }
  1108. if (_run == 1) {
  1109. fprintf(stderr, "ERROR: %s networksDbWatcher should still be running! Exiting Controller.\n", _myAddressStr.c_str());
  1110. exit(8);
  1111. }
  1112. fprintf(stderr, "Exited networksDbWatcher\n");
  1113. }
  1114. void CV1::_networksWatcher_Postgres()
  1115. {
  1116. std::string stream = "network_" + _myAddressStr;
  1117. fprintf(stderr, "Listening to member stream: %s\n", stream.c_str());
  1118. auto c = _pool->borrow();
  1119. NetworkNotificationReceiver n(this, *c->c, stream);
  1120. while (_run == 1) {
  1121. c->c->await_notification(5, 0);
  1122. }
  1123. }
  1124. void CV1::_networksWatcher_Redis()
  1125. {
  1126. char buf[11] = { 0 };
  1127. std::string key = "network-stream:{" + std::string(_myAddress.toString(buf)) + "}";
  1128. std::string lastID = "0";
  1129. while (_run == 1) {
  1130. try {
  1131. json tmp;
  1132. std::unordered_map<std::string, ItemStream> result;
  1133. if (_rc->clusterMode) {
  1134. _cluster->xread(key, lastID, std::chrono::seconds(1), 0, std::inserter(result, result.end()));
  1135. }
  1136. else {
  1137. _redis->xread(key, lastID, std::chrono::seconds(1), 0, std::inserter(result, result.end()));
  1138. }
  1139. if (! result.empty()) {
  1140. for (auto element : result) {
  1141. #ifdef REDIS_TRACE
  1142. fprintf(stdout, "Received notification from: %s\n", element.first.c_str());
  1143. #endif
  1144. for (auto rec : element.second) {
  1145. std::string id = rec.first;
  1146. auto attrs = rec.second;
  1147. #ifdef REDIS_TRACE
  1148. fprintf(stdout, "Record ID: %s\n", id.c_str());
  1149. fprintf(stdout, "attrs len: %lu\n", attrs.size());
  1150. #endif
  1151. for (auto a : attrs) {
  1152. #ifdef REDIS_TRACE
  1153. fprintf(stdout, "key: %s\nvalue: %s\n", a.first.c_str(), a.second.c_str());
  1154. #endif
  1155. try {
  1156. tmp = json::parse(a.second);
  1157. json& ov = tmp["old_val"];
  1158. json& nv = tmp["new_val"];
  1159. json oldConfig, newConfig;
  1160. if (ov.is_object())
  1161. oldConfig = ov;
  1162. if (nv.is_object())
  1163. newConfig = nv;
  1164. if (oldConfig.is_object() || newConfig.is_object()) {
  1165. _networkChanged(oldConfig, newConfig, (this->_ready >= 2));
  1166. }
  1167. }
  1168. catch (std::exception& e) {
  1169. fprintf(stderr, "json parse error in networkWatcher_Redis: what: %s json: %s\n", e.what(), a.second.c_str());
  1170. }
  1171. }
  1172. if (_rc->clusterMode) {
  1173. _cluster->xdel(key, id);
  1174. }
  1175. else {
  1176. _redis->xdel(key, id);
  1177. }
  1178. lastID = id;
  1179. }
  1180. Metrics::redis_net_notification++;
  1181. }
  1182. }
  1183. }
  1184. catch (sw::redis::Error& e) {
  1185. fprintf(stderr, "Error in Redis networks watcher: %s\n", e.what());
  1186. }
  1187. }
  1188. fprintf(stderr, "networksWatcher ended\n");
  1189. }
  1190. void CV1::commitThread()
  1191. {
  1192. fprintf(stderr, "%s: commitThread start\n", _myAddressStr.c_str());
  1193. std::pair<nlohmann::json, bool> qitem;
  1194. while (_commitQueue.get(qitem) & (_run == 1)) {
  1195. // fprintf(stderr, "commitThread tick\n");
  1196. if (! qitem.first.is_object()) {
  1197. fprintf(stderr, "not an object\n");
  1198. continue;
  1199. }
  1200. std::shared_ptr<PostgresConnection> c;
  1201. try {
  1202. c = _pool->borrow();
  1203. }
  1204. catch (std::exception& e) {
  1205. fprintf(stderr, "ERROR: %s\n", e.what());
  1206. continue;
  1207. }
  1208. if (! c) {
  1209. fprintf(stderr, "Error getting database connection\n");
  1210. continue;
  1211. }
  1212. Metrics::pgsql_commit_ticks++;
  1213. try {
  1214. nlohmann::json& config = (qitem.first);
  1215. const std::string objtype = config["objtype"];
  1216. if (objtype == "member") {
  1217. // fprintf(stderr, "%s: commitThread: member\n", _myAddressStr.c_str());
  1218. std::string memberId;
  1219. std::string networkId;
  1220. try {
  1221. pqxx::work w(*c->c);
  1222. memberId = config["id"];
  1223. networkId = config["nwid"];
  1224. std::string target = "NULL";
  1225. if (! config["remoteTraceTarget"].is_null()) {
  1226. target = config["remoteTraceTarget"];
  1227. }
  1228. pqxx::row nwrow = w.exec_params1("SELECT COUNT(id) FROM ztc_network WHERE id = $1", networkId);
  1229. int nwcount = nwrow[0].as<int>();
  1230. if (nwcount != 1) {
  1231. fprintf(stderr, "network %s does not exist. skipping member upsert\n", networkId.c_str());
  1232. w.abort();
  1233. _pool->unborrow(c);
  1234. continue;
  1235. }
  1236. pqxx::row mrow = w.exec_params1("SELECT COUNT(id) FROM ztc_member WHERE id = $1 AND network_id = $2", memberId, networkId);
  1237. int membercount = mrow[0].as<int>();
  1238. bool isNewMember = false;
  1239. if (membercount == 0) {
  1240. // new member
  1241. isNewMember = true;
  1242. pqxx::result res = w.exec_params0(
  1243. "INSERT INTO ztc_member (id, network_id, active_bridge, authorized, capabilities, "
  1244. "identity, last_authorized_time, last_deauthorized_time, no_auto_assign_ips, "
  1245. "remote_trace_level, remote_trace_target, revision, tags, v_major, v_minor, v_rev, v_proto) "
  1246. "VALUES ($1, $2, $3, $4, $5, $6, "
  1247. "TO_TIMESTAMP($7::double precision/1000), TO_TIMESTAMP($8::double precision/1000), "
  1248. "$9, $10, $11, $12, $13, $14, $15, $16, $17)",
  1249. memberId,
  1250. networkId,
  1251. (bool)config["activeBridge"],
  1252. (bool)config["authorized"],
  1253. OSUtils::jsonDump(config["capabilities"], -1),
  1254. OSUtils::jsonString(config["identity"], ""),
  1255. (uint64_t)config["lastAuthorizedTime"],
  1256. (uint64_t)config["lastDeauthorizedTime"],
  1257. (bool)config["noAutoAssignIps"],
  1258. (int)config["remoteTraceLevel"],
  1259. target,
  1260. (uint64_t)config["revision"],
  1261. OSUtils::jsonDump(config["tags"], -1),
  1262. (int)config["vMajor"],
  1263. (int)config["vMinor"],
  1264. (int)config["vRev"],
  1265. (int)config["vProto"]);
  1266. }
  1267. else {
  1268. // existing member
  1269. pqxx::result res = w.exec_params0(
  1270. "UPDATE ztc_member "
  1271. "SET active_bridge = $3, authorized = $4, capabilities = $5, identity = $6, "
  1272. "last_authorized_time = TO_TIMESTAMP($7::double precision/1000), "
  1273. "last_deauthorized_time = TO_TIMESTAMP($8::double precision/1000), "
  1274. "no_auto_assign_ips = $9, remote_trace_level = $10, remote_trace_target= $11, "
  1275. "revision = $12, tags = $13, v_major = $14, v_minor = $15, v_rev = $16, v_proto = $17 "
  1276. "WHERE id = $1 AND network_id = $2",
  1277. memberId,
  1278. networkId,
  1279. (bool)config["activeBridge"],
  1280. (bool)config["authorized"],
  1281. OSUtils::jsonDump(config["capabilities"], -1),
  1282. OSUtils::jsonString(config["identity"], ""),
  1283. (uint64_t)config["lastAuthorizedTime"],
  1284. (uint64_t)config["lastDeauthorizedTime"],
  1285. (bool)config["noAutoAssignIps"],
  1286. (int)config["remoteTraceLevel"],
  1287. target,
  1288. (uint64_t)config["revision"],
  1289. OSUtils::jsonDump(config["tags"], -1),
  1290. (int)config["vMajor"],
  1291. (int)config["vMinor"],
  1292. (int)config["vRev"],
  1293. (int)config["vProto"]);
  1294. }
  1295. if (! isNewMember) {
  1296. pqxx::result res = w.exec_params0("DELETE FROM ztc_member_ip_assignment WHERE member_id = $1 AND network_id = $2", memberId, networkId);
  1297. }
  1298. std::vector<std::string> assignments;
  1299. bool ipAssignError = false;
  1300. for (auto i = config["ipAssignments"].begin(); i != config["ipAssignments"].end(); ++i) {
  1301. std::string addr = *i;
  1302. if (std::find(assignments.begin(), assignments.end(), addr) != assignments.end()) {
  1303. continue;
  1304. }
  1305. pqxx::result res = w.exec_params0("INSERT INTO ztc_member_ip_assignment (member_id, network_id, address) VALUES ($1, $2, $3) ON CONFLICT (network_id, member_id, address) DO NOTHING", memberId, networkId, addr);
  1306. assignments.push_back(addr);
  1307. }
  1308. if (ipAssignError) {
  1309. fprintf(stderr, "%s: ipAssignError\n", _myAddressStr.c_str());
  1310. w.abort();
  1311. _pool->unborrow(c);
  1312. c.reset();
  1313. continue;
  1314. }
  1315. w.commit();
  1316. if (_smee != NULL && isNewMember) {
  1317. pqxx::row row = w.exec_params1(
  1318. "SELECT "
  1319. " count(h.hook_id) "
  1320. "FROM "
  1321. " ztc_hook h "
  1322. " INNER JOIN ztc_org o ON o.org_id = h.org_id "
  1323. " INNER JOIN ztc_network n ON n.owner_id = o.owner_id "
  1324. " WHERE "
  1325. "n.id = $1 ",
  1326. networkId);
  1327. int64_t hookCount = row[0].as<int64_t>();
  1328. if (hookCount > 0) {
  1329. notifyNewMember(networkId, memberId);
  1330. }
  1331. }
  1332. const uint64_t nwidInt = OSUtils::jsonIntHex(config["nwid"], 0ULL);
  1333. const uint64_t memberidInt = OSUtils::jsonIntHex(config["id"], 0ULL);
  1334. if (nwidInt && memberidInt) {
  1335. nlohmann::json nwOrig;
  1336. nlohmann::json memOrig;
  1337. nlohmann::json memNew(config);
  1338. get(nwidInt, nwOrig, memberidInt, memOrig);
  1339. _memberChanged(memOrig, memNew, qitem.second);
  1340. }
  1341. else {
  1342. fprintf(stderr, "%s: Can't notify of change. Error parsing nwid or memberid: %llu-%llu\n", _myAddressStr.c_str(), (unsigned long long)nwidInt, (unsigned long long)memberidInt);
  1343. }
  1344. }
  1345. catch (std::exception& e) {
  1346. fprintf(stderr, "%s ERROR: Error updating member %s-%s: %s\n", _myAddressStr.c_str(), networkId.c_str(), memberId.c_str(), e.what());
  1347. }
  1348. }
  1349. else if (objtype == "network") {
  1350. try {
  1351. // fprintf(stderr, "%s: commitThread: network\n", _myAddressStr.c_str());
  1352. pqxx::work w(*c->c);
  1353. std::string id = config["id"];
  1354. std::string remoteTraceTarget = "";
  1355. if (! config["remoteTraceTarget"].is_null()) {
  1356. remoteTraceTarget = config["remoteTraceTarget"];
  1357. }
  1358. std::string rulesSource = "";
  1359. if (config["rulesSource"].is_string()) {
  1360. rulesSource = config["rulesSource"];
  1361. }
  1362. // This ugly query exists because when we want to mirror networks to/from
  1363. // another data store (e.g. FileDB or LFDB) it is possible to get a network
  1364. // that doesn't exist in Central's database. This does an upsert and sets
  1365. // the owner_id to the "first" global admin in the user DB if the record
  1366. // did not previously exist. If the record already exists owner_id is left
  1367. // unchanged, so owner_id should be left out of the update clause.
  1368. pqxx::result res = w.exec_params0(
  1369. "INSERT INTO ztc_network (id, creation_time, owner_id, controller_id, capabilities, enable_broadcast, "
  1370. "last_modified, mtu, multicast_limit, name, private, "
  1371. "remote_trace_level, remote_trace_target, rules, rules_source, "
  1372. "tags, v4_assign_mode, v6_assign_mode, sso_enabled) VALUES ("
  1373. "$1, TO_TIMESTAMP($5::double precision/1000), "
  1374. "(SELECT user_id AS owner_id FROM ztc_global_permissions WHERE authorize = true AND del = true AND modify = true AND read = true LIMIT 1),"
  1375. "$2, $3, $4, TO_TIMESTAMP($5::double precision/1000), "
  1376. "$6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17) "
  1377. "ON CONFLICT (id) DO UPDATE set controller_id = EXCLUDED.controller_id, "
  1378. "capabilities = EXCLUDED.capabilities, enable_broadcast = EXCLUDED.enable_broadcast, "
  1379. "last_modified = EXCLUDED.last_modified, mtu = EXCLUDED.mtu, "
  1380. "multicast_limit = EXCLUDED.multicast_limit, name = EXCLUDED.name, "
  1381. "private = EXCLUDED.private, remote_trace_level = EXCLUDED.remote_trace_level, "
  1382. "remote_trace_target = EXCLUDED.remote_trace_target, rules = EXCLUDED.rules, "
  1383. "rules_source = EXCLUDED.rules_source, tags = EXCLUDED.tags, "
  1384. "v4_assign_mode = EXCLUDED.v4_assign_mode, v6_assign_mode = EXCLUDED.v6_assign_mode, "
  1385. "sso_enabled = EXCLUDED.sso_enabled",
  1386. id,
  1387. _myAddressStr,
  1388. OSUtils::jsonDump(config["capabilities"], -1),
  1389. (bool)config["enableBroadcast"],
  1390. OSUtils::now(),
  1391. (int)config["mtu"],
  1392. (int)config["multicastLimit"],
  1393. OSUtils::jsonString(config["name"], ""),
  1394. (bool)config["private"],
  1395. (int)config["remoteTraceLevel"],
  1396. remoteTraceTarget,
  1397. OSUtils::jsonDump(config["rules"], -1),
  1398. rulesSource,
  1399. OSUtils::jsonDump(config["tags"], -1),
  1400. OSUtils::jsonDump(config["v4AssignMode"], -1),
  1401. OSUtils::jsonDump(config["v6AssignMode"], -1),
  1402. OSUtils::jsonBool(config["ssoEnabled"], false));
  1403. res = w.exec_params0("DELETE FROM ztc_network_assignment_pool WHERE network_id = $1", 0);
  1404. auto pool = config["ipAssignmentPools"];
  1405. bool err = false;
  1406. for (auto i = pool.begin(); i != pool.end(); ++i) {
  1407. std::string start = (*i)["ipRangeStart"];
  1408. std::string end = (*i)["ipRangeEnd"];
  1409. res = w.exec_params0(
  1410. "INSERT INTO ztc_network_assignment_pool (network_id, ip_range_start, ip_range_end) "
  1411. "VALUES ($1, $2, $3)",
  1412. id,
  1413. start,
  1414. end);
  1415. }
  1416. res = w.exec_params0("DELETE FROM ztc_network_route WHERE network_id = $1", id);
  1417. auto routes = config["routes"];
  1418. err = false;
  1419. for (auto i = routes.begin(); i != routes.end(); ++i) {
  1420. std::string t = (*i)["target"];
  1421. std::vector<std::string> target;
  1422. std::istringstream f(t);
  1423. std::string s;
  1424. while (std::getline(f, s, '/')) {
  1425. target.push_back(s);
  1426. }
  1427. if (target.empty() || target.size() != 2) {
  1428. continue;
  1429. }
  1430. std::string targetAddr = target[0];
  1431. std::string targetBits = target[1];
  1432. std::string via = "NULL";
  1433. if (! (*i)["via"].is_null()) {
  1434. via = (*i)["via"];
  1435. }
  1436. res = w.exec_params0("INSERT INTO ztc_network_route (network_id, address, bits, via) VALUES ($1, $2, $3, $4)", id, targetAddr, targetBits, (via == "NULL" ? NULL : via.c_str()));
  1437. }
  1438. if (err) {
  1439. fprintf(stderr, "%s: route add error\n", _myAddressStr.c_str());
  1440. w.abort();
  1441. _pool->unborrow(c);
  1442. continue;
  1443. }
  1444. auto dns = config["dns"];
  1445. std::string domain = dns["domain"];
  1446. std::stringstream servers;
  1447. servers << "{";
  1448. for (auto j = dns["servers"].begin(); j < dns["servers"].end(); ++j) {
  1449. servers << *j;
  1450. if ((j + 1) != dns["servers"].end()) {
  1451. servers << ",";
  1452. }
  1453. }
  1454. servers << "}";
  1455. std::string s = servers.str();
  1456. res = w.exec_params0("INSERT INTO ztc_network_dns (network_id, domain, servers) VALUES ($1, $2, $3) ON CONFLICT (network_id) DO UPDATE SET domain = EXCLUDED.domain, servers = EXCLUDED.servers", id, domain, s);
  1457. w.commit();
  1458. const uint64_t nwidInt = OSUtils::jsonIntHex(config["nwid"], 0ULL);
  1459. if (nwidInt) {
  1460. nlohmann::json nwOrig;
  1461. nlohmann::json nwNew(config);
  1462. get(nwidInt, nwOrig);
  1463. _networkChanged(nwOrig, nwNew, qitem.second);
  1464. }
  1465. else {
  1466. fprintf(stderr, "%s: Can't notify network changed: %llu\n", _myAddressStr.c_str(), (unsigned long long)nwidInt);
  1467. }
  1468. }
  1469. catch (std::exception& e) {
  1470. fprintf(stderr, "%s ERROR: Error updating network: %s\n", _myAddressStr.c_str(), e.what());
  1471. }
  1472. if (_redisMemberStatus) {
  1473. try {
  1474. std::string id = config["id"];
  1475. std::string controllerId = _myAddressStr.c_str();
  1476. std::string key = "networks:{" + controllerId + "}";
  1477. if (_rc->clusterMode) {
  1478. _cluster->sadd(key, id);
  1479. }
  1480. else {
  1481. _redis->sadd(key, id);
  1482. }
  1483. }
  1484. catch (sw::redis::Error& e) {
  1485. fprintf(stderr, "ERROR: Error adding network to Redis: %s\n", e.what());
  1486. }
  1487. }
  1488. }
  1489. else if (objtype == "_delete_network") {
  1490. // fprintf(stderr, "%s: commitThread: delete network\n", _myAddressStr.c_str());
  1491. try {
  1492. pqxx::work w(*c->c);
  1493. std::string networkId = config["nwid"];
  1494. pqxx::result res = w.exec_params0("UPDATE ztc_network SET deleted = true WHERE id = $1", networkId);
  1495. w.commit();
  1496. }
  1497. catch (std::exception& e) {
  1498. fprintf(stderr, "%s ERROR: Error deleting network: %s\n", _myAddressStr.c_str(), e.what());
  1499. }
  1500. if (_redisMemberStatus) {
  1501. try {
  1502. std::string id = config["id"];
  1503. std::string controllerId = _myAddressStr.c_str();
  1504. std::string key = "networks:{" + controllerId + "}";
  1505. if (_rc->clusterMode) {
  1506. _cluster->srem(key, id);
  1507. _cluster->del("network-nodes-online:{" + controllerId + "}:" + id);
  1508. }
  1509. else {
  1510. _redis->srem(key, id);
  1511. _redis->del("network-nodes-online:{" + controllerId + "}:" + id);
  1512. }
  1513. }
  1514. catch (sw::redis::Error& e) {
  1515. fprintf(stderr, "ERROR: Error adding network to Redis: %s\n", e.what());
  1516. }
  1517. }
  1518. }
  1519. else if (objtype == "_delete_member") {
  1520. // fprintf(stderr, "%s commitThread: delete member\n", _myAddressStr.c_str());
  1521. try {
  1522. pqxx::work w(*c->c);
  1523. std::string memberId = config["id"];
  1524. std::string networkId = config["nwid"];
  1525. pqxx::result res = w.exec_params0("UPDATE ztc_member SET hidden = true, deleted = true WHERE id = $1 AND network_id = $2", memberId, networkId);
  1526. w.commit();
  1527. }
  1528. catch (std::exception& e) {
  1529. fprintf(stderr, "%s ERROR: Error deleting member: %s\n", _myAddressStr.c_str(), e.what());
  1530. }
  1531. if (_redisMemberStatus) {
  1532. try {
  1533. std::string memberId = config["id"];
  1534. std::string networkId = config["nwid"];
  1535. std::string controllerId = _myAddressStr.c_str();
  1536. std::string key = "network-nodes-all:{" + controllerId + "}:" + networkId;
  1537. if (_rc->clusterMode) {
  1538. _cluster->srem(key, memberId);
  1539. _cluster->del("member:{" + controllerId + "}:" + networkId + ":" + memberId);
  1540. }
  1541. else {
  1542. _redis->srem(key, memberId);
  1543. _redis->del("member:{" + controllerId + "}:" + networkId + ":" + memberId);
  1544. }
  1545. }
  1546. catch (sw::redis::Error& e) {
  1547. fprintf(stderr, "ERROR: Error deleting member from Redis: %s\n", e.what());
  1548. }
  1549. }
  1550. }
  1551. else {
  1552. fprintf(stderr, "%s ERROR: unknown objtype\n", _myAddressStr.c_str());
  1553. }
  1554. }
  1555. catch (std::exception& e) {
  1556. fprintf(stderr, "%s ERROR: Error getting objtype: %s\n", _myAddressStr.c_str(), e.what());
  1557. }
  1558. _pool->unborrow(c);
  1559. c.reset();
  1560. }
  1561. fprintf(stderr, "%s commitThread finished\n", _myAddressStr.c_str());
  1562. }
  1563. void CV1::notifyNewMember(const std::string& networkID, const std::string& memberID)
  1564. {
  1565. smeeclient::smee_client_notify_network_joined(_smee, networkID.c_str(), memberID.c_str());
  1566. }
  1567. void CV1::onlineNotificationThread()
  1568. {
  1569. waitForReady();
  1570. if (_redisMemberStatus) {
  1571. onlineNotification_Redis();
  1572. }
  1573. else {
  1574. onlineNotification_Postgres();
  1575. }
  1576. }
  1577. /**
  1578. * ONLY UNCOMMENT FOR TEMPORARY DB MAINTENANCE
  1579. *
  1580. * This define temporarily turns off writing to the member status table
  1581. * so it can be reindexed when the indexes get too large.
  1582. */
  1583. // #define DISABLE_MEMBER_STATUS 1
  1584. void CV1::onlineNotification_Postgres()
  1585. {
  1586. _connected = 1;
  1587. nlohmann::json jtmp1, jtmp2;
  1588. while (_run == 1) {
  1589. auto c = _pool->borrow();
  1590. auto c2 = _pool->borrow();
  1591. try {
  1592. fprintf(stderr, "%s onlineNotification_Postgres\n", _myAddressStr.c_str());
  1593. std::unordered_map<std::pair<uint64_t, uint64_t>, NodeOnlineRecord, _PairHasher> lastOnline;
  1594. {
  1595. std::lock_guard<std::mutex> l(_lastOnline_l);
  1596. lastOnline.swap(_lastOnline);
  1597. }
  1598. #ifndef DISABLE_MEMBER_STATUS
  1599. pqxx::work w(*c->c);
  1600. pqxx::work w2(*c2->c);
  1601. fprintf(stderr, "online notification tick\n");
  1602. bool firstRun = true;
  1603. bool memberAdded = false;
  1604. int updateCount = 0;
  1605. pqxx::pipeline pipe(w);
  1606. for (auto i = lastOnline.begin(); i != lastOnline.end(); ++i) {
  1607. updateCount += 1;
  1608. uint64_t nwid_i = i->first.first;
  1609. char nwidTmp[64];
  1610. char memTmp[64];
  1611. char ipTmp[64];
  1612. OSUtils::ztsnprintf(nwidTmp, sizeof(nwidTmp), "%.16llx", nwid_i);
  1613. OSUtils::ztsnprintf(memTmp, sizeof(memTmp), "%.10llx", i->first.second);
  1614. if (! get(nwid_i, jtmp1, i->first.second, jtmp2)) {
  1615. continue; // skip non existent networks/members
  1616. }
  1617. std::string networkId(nwidTmp);
  1618. std::string memberId(memTmp);
  1619. try {
  1620. pqxx::row r = w2.exec_params1("SELECT id, network_id FROM ztc_member WHERE network_id = $1 AND id = $2", networkId, memberId);
  1621. }
  1622. catch (pqxx::unexpected_rows& e) {
  1623. continue;
  1624. }
  1625. int64_t ts = i->second.lastSeen;
  1626. std::string ipAddr = i->second.physicalAddress.toIpString(ipTmp);
  1627. std::string timestamp = std::to_string(ts);
  1628. std::string osArch = i->second.osArch;
  1629. std::stringstream memberUpdate;
  1630. memberUpdate << "INSERT INTO ztc_member_status (network_id, member_id, address, last_updated) VALUES "
  1631. << "('" << networkId << "', '" << memberId << "', ";
  1632. if (ipAddr.empty()) {
  1633. memberUpdate << "NULL, ";
  1634. }
  1635. else {
  1636. memberUpdate << "'" << ipAddr << "', ";
  1637. }
  1638. memberUpdate << "TO_TIMESTAMP(" << timestamp << "::double precision/1000)) "
  1639. << " ON CONFLICT (network_id, member_id) DO UPDATE SET address = EXCLUDED.address, last_updated = EXCLUDED.last_updated";
  1640. pipe.insert(memberUpdate.str());
  1641. Metrics::pgsql_node_checkin++;
  1642. }
  1643. while (! pipe.empty()) {
  1644. pipe.retrieve();
  1645. }
  1646. pipe.complete();
  1647. w.commit();
  1648. fprintf(stderr, "%s: Updated online status of %d members\n", _myAddressStr.c_str(), updateCount);
  1649. #endif
  1650. }
  1651. catch (std::exception& e) {
  1652. fprintf(stderr, "%s: error in onlinenotification thread: %s\n", _myAddressStr.c_str(), e.what());
  1653. }
  1654. _pool->unborrow(c2);
  1655. _pool->unborrow(c);
  1656. ConnectionPoolStats stats = _pool->get_stats();
  1657. fprintf(stderr, "%s pool stats: in use size: %llu, available size: %llu, total: %llu\n", _myAddressStr.c_str(), stats.borrowed_size, stats.pool_size, (stats.borrowed_size + stats.pool_size));
  1658. std::this_thread::sleep_for(std::chrono::seconds(10));
  1659. }
  1660. fprintf(stderr, "%s: Fell out of run loop in onlineNotificationThread\n", _myAddressStr.c_str());
  1661. if (_run == 1) {
  1662. fprintf(stderr, "ERROR: %s onlineNotificationThread should still be running! Exiting Controller.\n", _myAddressStr.c_str());
  1663. exit(6);
  1664. }
  1665. }
  1666. void CV1::onlineNotification_Redis()
  1667. {
  1668. _connected = 1;
  1669. char buf[11] = { 0 };
  1670. std::string controllerId = std::string(_myAddress.toString(buf));
  1671. while (_run == 1) {
  1672. fprintf(stderr, "onlineNotification tick\n");
  1673. auto start = std::chrono::high_resolution_clock::now();
  1674. uint64_t count = 0;
  1675. std::unordered_map<std::pair<uint64_t, uint64_t>, NodeOnlineRecord, _PairHasher> lastOnline;
  1676. {
  1677. std::lock_guard<std::mutex> l(_lastOnline_l);
  1678. lastOnline.swap(_lastOnline);
  1679. }
  1680. try {
  1681. if (! lastOnline.empty()) {
  1682. if (_rc->clusterMode) {
  1683. auto tx = _cluster->transaction(controllerId, true, false);
  1684. count = _doRedisUpdate(tx, controllerId, lastOnline);
  1685. }
  1686. else {
  1687. auto tx = _redis->transaction(true, false);
  1688. count = _doRedisUpdate(tx, controllerId, lastOnline);
  1689. }
  1690. }
  1691. }
  1692. catch (sw::redis::Error& e) {
  1693. fprintf(stderr, "Error in online notification thread (redis): %s\n", e.what());
  1694. }
  1695. auto end = std::chrono::high_resolution_clock::now();
  1696. auto dur = std::chrono::duration_cast<std::chrono::milliseconds>(end - start);
  1697. auto total = dur.count();
  1698. fprintf(stderr, "onlineNotification ran in %llu ms\n", total);
  1699. std::this_thread::sleep_for(std::chrono::seconds(5));
  1700. }
  1701. }
  1702. uint64_t CV1::_doRedisUpdate(sw::redis::Transaction& tx, std::string& controllerId, std::unordered_map<std::pair<uint64_t, uint64_t>, NodeOnlineRecord, _PairHasher>& lastOnline)
  1703. {
  1704. nlohmann::json jtmp1, jtmp2;
  1705. uint64_t count = 0;
  1706. for (auto i = lastOnline.begin(); i != lastOnline.end(); ++i) {
  1707. uint64_t nwid_i = i->first.first;
  1708. uint64_t memberid_i = i->first.second;
  1709. char nwidTmp[64];
  1710. char memTmp[64];
  1711. char ipTmp[64];
  1712. OSUtils::ztsnprintf(nwidTmp, sizeof(nwidTmp), "%.16llx", nwid_i);
  1713. OSUtils::ztsnprintf(memTmp, sizeof(memTmp), "%.10llx", memberid_i);
  1714. if (! get(nwid_i, jtmp1, memberid_i, jtmp2)) {
  1715. continue; // skip non existent members/networks
  1716. }
  1717. std::string networkId(nwidTmp);
  1718. std::string memberId(memTmp);
  1719. int64_t ts = i->second.lastSeen;
  1720. std::string ipAddr = i->second.physicalAddress.toIpString(ipTmp);
  1721. std::string timestamp = std::to_string(ts);
  1722. std::string osArch = i->second.osArch;
  1723. std::unordered_map<std::string, std::string> record = { { "id", memberId }, { "address", ipAddr }, { "last_updated", std::to_string(ts) } };
  1724. tx.zadd("nodes-online:{" + controllerId + "}", memberId, ts)
  1725. .zadd("nodes-online2:{" + controllerId + "}", networkId + "-" + memberId, ts)
  1726. .zadd("network-nodes-online:{" + controllerId + "}:" + networkId, memberId, ts)
  1727. .zadd("active-networks:{" + controllerId + "}", networkId, ts)
  1728. .sadd("network-nodes-all:{" + controllerId + "}:" + networkId, memberId)
  1729. .hmset("member:{" + controllerId + "}:" + networkId + ":" + memberId, record.begin(), record.end());
  1730. ++count;
  1731. Metrics::redis_node_checkin++;
  1732. }
  1733. // expire records from all-nodes and network-nodes member list
  1734. uint64_t expireOld = OSUtils::now() - 300000;
  1735. tx.zremrangebyscore("nodes-online:{" + controllerId + "}", sw::redis::RightBoundedInterval<double>(expireOld, sw::redis::BoundType::LEFT_OPEN));
  1736. tx.zremrangebyscore("nodes-online2:{" + controllerId + "}", sw::redis::RightBoundedInterval<double>(expireOld, sw::redis::BoundType::LEFT_OPEN));
  1737. tx.zremrangebyscore("active-networks:{" + controllerId + "}", sw::redis::RightBoundedInterval<double>(expireOld, sw::redis::BoundType::LEFT_OPEN));
  1738. {
  1739. std::shared_lock<std::shared_mutex> l(_networks_l);
  1740. for (const auto& it : _networks) {
  1741. uint64_t nwid_i = it.first;
  1742. char nwidTmp[64];
  1743. OSUtils::ztsnprintf(nwidTmp, sizeof(nwidTmp), "%.16llx", nwid_i);
  1744. tx.zremrangebyscore("network-nodes-online:{" + controllerId + "}:" + nwidTmp, sw::redis::RightBoundedInterval<double>(expireOld, sw::redis::BoundType::LEFT_OPEN));
  1745. }
  1746. }
  1747. tx.exec();
  1748. fprintf(stderr, "%s: Updated online status of %d members\n", _myAddressStr.c_str(), count);
  1749. return count;
  1750. }
  1751. #endif // ZT_CONTROLLER_USE_LIBPQ