CV2.cpp 43 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249
  1. /*
  2. * Copyright (c)2025 ZeroTier, Inc.
  3. *
  4. * Use of this software is governed by the Business Source License included
  5. * in the LICENSE.TXT file in the project's root directory.
  6. *
  7. * Change Date: 2026-01-01
  8. *
  9. * On the date above, in accordance with the Business Source License, use
  10. * of this software will be governed by version 2.0 of the Apache License.
  11. */
  12. /****/
  13. #include "CV2.hpp"
  14. #ifdef ZT_CONTROLLER_USE_LIBPQ
  15. #include "../node/Constants.hpp"
  16. #include "../node/SHA512.hpp"
  17. #include "../version.h"
  18. #include "CtlUtil.hpp"
  19. #include "EmbeddedNetworkController.hpp"
  20. #include "opentelemetry/trace/provider.h"
  21. #include <chrono>
  22. #include <climits>
  23. #include <iomanip>
  24. #include <libpq-fe.h>
  25. #include <rustybits.h>
  26. #include <sstream>
  27. using json = nlohmann::json;
  28. namespace {
  29. }
  30. using namespace ZeroTier;
  31. CV2::CV2(const Identity& myId, const char* path, int listenPort) : DB(), _pool(), _myId(myId), _myAddress(myId.address()), _ready(0), _connected(1), _run(1), _waitNoticePrinted(false), _listenPort(listenPort)
  32. {
  33. auto provider = opentelemetry::trace::Provider::GetTracerProvider();
  34. auto tracer = provider->GetTracer("cv2");
  35. auto span = tracer->StartSpan("cv2::CV2");
  36. auto scope = tracer->WithActiveSpan(span);
  37. rustybits::init_async_runtime();
  38. fprintf(stderr, "CV2::CV2\n");
  39. char myAddress[64];
  40. _myAddressStr = myId.address().toString(myAddress);
  41. _connString = std::string(path);
  42. auto f = std::make_shared<PostgresConnFactory>(_connString);
  43. _pool = std::make_shared<ConnectionPool<PostgresConnection> >(15, 5, std::static_pointer_cast<ConnectionFactory>(f));
  44. memset(_ssoPsk, 0, sizeof(_ssoPsk));
  45. char* const ssoPskHex = getenv("ZT_SSO_PSK");
  46. #ifdef ZT_TRACE
  47. fprintf(stderr, "ZT_SSO_PSK: %s\n", ssoPskHex);
  48. #endif
  49. if (ssoPskHex) {
  50. // SECURITY: note that ssoPskHex will always be null-terminated if libc actually
  51. // returns something non-NULL. If the hex encodes something shorter than 48 bytes,
  52. // it will be padded at the end with zeroes. If longer, it'll be truncated.
  53. Utils::unhex(ssoPskHex, _ssoPsk, sizeof(_ssoPsk));
  54. }
  55. _readyLock.lock();
  56. fprintf(stderr, "[%s] NOTICE: %.10llx controller PostgreSQL waiting for initial data download..." ZT_EOL_S, ::_timestr(), (unsigned long long)_myAddress.toInt());
  57. _waitNoticePrinted = true;
  58. initializeNetworks();
  59. initializeMembers();
  60. _heartbeatThread = std::thread(&CV2::heartbeat, this);
  61. _membersDbWatcher = std::thread(&CV2::membersDbWatcher, this);
  62. _networksDbWatcher = std::thread(&CV2::networksDbWatcher, this);
  63. for (int i = 0; i < ZT_CENTRAL_CONTROLLER_COMMIT_THREADS; ++i) {
  64. _commitThread[i] = std::thread(&CV2::commitThread, this);
  65. }
  66. _onlineNotificationThread = std::thread(&CV2::onlineNotificationThread, this);
  67. }
  68. CV2::~CV2()
  69. {
  70. rustybits::shutdown_async_runtime();
  71. _run = 0;
  72. std::this_thread::sleep_for(std::chrono::milliseconds(100));
  73. _heartbeatThread.join();
  74. _membersDbWatcher.join();
  75. _networksDbWatcher.join();
  76. _commitQueue.stop();
  77. for (int i = 0; i < ZT_CENTRAL_CONTROLLER_COMMIT_THREADS; ++i) {
  78. _commitThread[i].join();
  79. }
  80. _onlineNotificationThread.join();
  81. }
  82. bool CV2::waitForReady()
  83. {
  84. while (_ready < 2) {
  85. _readyLock.lock();
  86. _readyLock.unlock();
  87. }
  88. return true;
  89. }
  90. bool CV2::isReady()
  91. {
  92. return (_ready == 2) && _connected;
  93. }
  94. void CV2::_memberChanged(nlohmann::json& old, nlohmann::json& memberConfig, bool notifyListeners)
  95. {
  96. DB::_memberChanged(old, memberConfig, notifyListeners);
  97. }
  98. void CV2::_networkChanged(nlohmann::json& old, nlohmann::json& networkConfig, bool notifyListeners)
  99. {
  100. DB::_networkChanged(old, networkConfig, notifyListeners);
  101. }
  102. bool CV2::save(nlohmann::json& record, bool notifyListeners)
  103. {
  104. auto provider = opentelemetry::trace::Provider::GetTracerProvider();
  105. auto tracer = provider->GetTracer("cv2");
  106. auto span = tracer->StartSpan("cv2::save");
  107. auto scope = tracer->WithActiveSpan(span);
  108. bool modified = false;
  109. try {
  110. if (! record.is_object()) {
  111. fprintf(stderr, "record is not an object?!?\n");
  112. return false;
  113. }
  114. const std::string objtype = record["objtype"];
  115. if (objtype == "network") {
  116. auto nspan = tracer->StartSpan("cv2::save::network");
  117. auto nscope = tracer->WithActiveSpan(nspan);
  118. // fprintf(stderr, "network save\n");
  119. const uint64_t nwid = OSUtils::jsonIntHex(record["id"], 0ULL);
  120. if (nwid) {
  121. nlohmann::json old;
  122. get(nwid, old);
  123. if ((! old.is_object()) || (! _compareRecords(old, record))) {
  124. record["revision"] = OSUtils::jsonInt(record["revision"], 0ULL) + 1ULL;
  125. _commitQueue.post(std::pair<nlohmann::json, bool>(record, notifyListeners));
  126. modified = true;
  127. }
  128. }
  129. }
  130. else if (objtype == "member") {
  131. auto mspan = tracer->StartSpan("cv2::save::member");
  132. auto mscope = tracer->WithActiveSpan(mspan);
  133. std::string networkId = record["nwid"];
  134. std::string memberId = record["id"];
  135. const uint64_t nwid = OSUtils::jsonIntHex(record["nwid"], 0ULL);
  136. const uint64_t id = OSUtils::jsonIntHex(record["id"], 0ULL);
  137. // fprintf(stderr, "member save %s-%s\n", networkId.c_str(), memberId.c_str());
  138. if ((id) && (nwid)) {
  139. nlohmann::json network, old;
  140. get(nwid, network, id, old);
  141. if ((! old.is_object()) || (! _compareRecords(old, record))) {
  142. // fprintf(stderr, "commit queue post\n");
  143. record["revision"] = OSUtils::jsonInt(record["revision"], 0ULL) + 1ULL;
  144. _commitQueue.post(std::pair<nlohmann::json, bool>(record, notifyListeners));
  145. modified = true;
  146. }
  147. else {
  148. // fprintf(stderr, "no change\n");
  149. }
  150. }
  151. }
  152. else {
  153. fprintf(stderr, "uhh waaat\n");
  154. }
  155. }
  156. catch (std::exception& e) {
  157. fprintf(stderr, "Error on PostgreSQL::save: %s\n", e.what());
  158. }
  159. catch (...) {
  160. fprintf(stderr, "Unknown error on PostgreSQL::save\n");
  161. }
  162. return modified;
  163. }
  164. void CV2::eraseNetwork(const uint64_t networkId)
  165. {
  166. auto provider = opentelemetry::trace::Provider::GetTracerProvider();
  167. auto tracer = provider->GetTracer("cv2");
  168. auto span = tracer->StartSpan("cv2::eraseNetwork");
  169. auto scope = tracer->WithActiveSpan(span);
  170. char networkIdStr[17];
  171. std::string nwid = Utils::hex(networkId, networkIdStr);
  172. span->SetAttribute("network_id", nwid);
  173. fprintf(stderr, "CV2::eraseNetwork\n");
  174. waitForReady();
  175. std::pair<nlohmann::json, bool> tmp;
  176. tmp.first["id"] = nwid;
  177. tmp.first["objtype"] = "_delete_network";
  178. tmp.second = true;
  179. _commitQueue.post(tmp);
  180. // nlohmann::json nullJson;
  181. //_networkChanged(tmp.first, nullJson, isReady());
  182. }
  183. void CV2::eraseMember(const uint64_t networkId, const uint64_t memberId)
  184. {
  185. auto provider = opentelemetry::trace::Provider::GetTracerProvider();
  186. auto tracer = provider->GetTracer("cv2");
  187. auto span = tracer->StartSpan("cv2::eraseMember");
  188. auto scope = tracer->WithActiveSpan(span);
  189. char networkIdStr[17];
  190. char memberIdStr[11];
  191. span->SetAttribute("network_id", Utils::hex(networkId, networkIdStr));
  192. span->SetAttribute("member_id", Utils::hex10(memberId, memberIdStr));
  193. fprintf(stderr, "PostgreSQL::eraseMember\n");
  194. char tmp2[24];
  195. waitForReady();
  196. std::pair<nlohmann::json, bool> tmp, nw;
  197. Utils::hex(networkId, tmp2);
  198. tmp.first["nwid"] = tmp2;
  199. Utils::hex(memberId, tmp2);
  200. tmp.first["id"] = tmp2;
  201. tmp.first["objtype"] = "_delete_member";
  202. tmp.second = true;
  203. _commitQueue.post(tmp);
  204. // nlohmann::json nullJson;
  205. //_memberChanged(tmp.first, nullJson, isReady());
  206. }
  207. void CV2::nodeIsOnline(const uint64_t networkId, const uint64_t memberId, const InetAddress& physicalAddress, const char* osArch)
  208. {
  209. auto provider = opentelemetry::trace::Provider::GetTracerProvider();
  210. auto tracer = provider->GetTracer("cv2");
  211. auto span = tracer->StartSpan("cv2::nodeIsOnline");
  212. auto scope = tracer->WithActiveSpan(span);
  213. char networkIdStr[17];
  214. char memberIdStr[11];
  215. char ipAddressStr[INET6_ADDRSTRLEN];
  216. span->SetAttribute("network_id", Utils::hex(networkId, networkIdStr));
  217. span->SetAttribute("member_id", Utils::hex10(memberId, memberIdStr));
  218. span->SetAttribute("physical_address", ipAddressStr);
  219. span->SetAttribute("os_arch", osArch);
  220. std::lock_guard<std::mutex> l(_lastOnline_l);
  221. NodeOnlineRecord& i = _lastOnline[std::pair<uint64_t, uint64_t>(networkId, memberId)];
  222. i.lastSeen = OSUtils::now();
  223. if (physicalAddress) {
  224. i.physicalAddress = physicalAddress;
  225. }
  226. i.osArch = std::string(osArch);
  227. }
  228. void CV2::nodeIsOnline(const uint64_t networkId, const uint64_t memberId, const InetAddress& physicalAddress)
  229. {
  230. this->nodeIsOnline(networkId, memberId, physicalAddress, "unknown/unknown");
  231. }
  232. AuthInfo CV2::getSSOAuthInfo(const nlohmann::json& member, const std::string& redirectURL)
  233. {
  234. // TODO: Redo this for CV2
  235. auto provider = opentelemetry::trace::Provider::GetTracerProvider();
  236. auto tracer = provider->GetTracer("cv2");
  237. auto span = tracer->StartSpan("cv2::getSSOAuthInfo");
  238. auto scope = tracer->WithActiveSpan(span);
  239. Metrics::db_get_sso_info++;
  240. // NONCE is just a random character string. no semantic meaning
  241. // state = HMAC SHA384 of Nonce based on shared sso key
  242. //
  243. // need nonce timeout in database? make sure it's used within X time
  244. // X is 5 minutes for now. Make configurable later?
  245. //
  246. // how do we tell when a nonce is used? if auth_expiration_time is set
  247. std::string networkId = member["nwid"];
  248. std::string memberId = member["id"];
  249. char authenticationURL[4096] = { 0 };
  250. AuthInfo info;
  251. info.enabled = true;
  252. // if (memberId == "a10dccea52" && networkId == "8056c2e21c24673d") {
  253. // fprintf(stderr, "invalid authinfo for grant's machine\n");
  254. // info.version=1;
  255. // return info;
  256. // }
  257. // fprintf(stderr, "PostgreSQL::updateMemberOnLoad: %s-%s\n", networkId.c_str(), memberId.c_str());
  258. std::shared_ptr<PostgresConnection> c;
  259. try {
  260. // c = _pool->borrow();
  261. // pqxx::work w(*c->c);
  262. // char nonceBytes[16] = {0};
  263. // std::string nonce = "";
  264. // // check if the member exists first.
  265. // pqxx::row count = w.exec_params1("SELECT count(id) FROM ztc_member WHERE id = $1 AND network_id = $2 AND deleted = false", memberId, networkId);
  266. // if (count[0].as<int>() == 1) {
  267. // // get active nonce, if exists.
  268. // pqxx::result r = w.exec_params("SELECT nonce FROM ztc_sso_expiry "
  269. // "WHERE network_id = $1 AND member_id = $2 "
  270. // "AND ((NOW() AT TIME ZONE 'UTC') <= authentication_expiry_time) AND ((NOW() AT TIME ZONE 'UTC') <= nonce_expiration)",
  271. // networkId, memberId);
  272. // if (r.size() == 0) {
  273. // // no active nonce.
  274. // // find an unused nonce, if one exists.
  275. // pqxx::result r = w.exec_params("SELECT nonce FROM ztc_sso_expiry "
  276. // "WHERE network_id = $1 AND member_id = $2 "
  277. // "AND authentication_expiry_time IS NULL AND ((NOW() AT TIME ZONE 'UTC') <= nonce_expiration)",
  278. // networkId, memberId);
  279. // if (r.size() == 1) {
  280. // // we have an existing nonce. Use it
  281. // nonce = r.at(0)[0].as<std::string>();
  282. // Utils::unhex(nonce.c_str(), nonceBytes, sizeof(nonceBytes));
  283. // } else if (r.empty()) {
  284. // // create a nonce
  285. // Utils::getSecureRandom(nonceBytes, 16);
  286. // char nonceBuf[64] = {0};
  287. // Utils::hex(nonceBytes, sizeof(nonceBytes), nonceBuf);
  288. // nonce = std::string(nonceBuf);
  289. // pqxx::result ir = w.exec_params0("INSERT INTO ztc_sso_expiry "
  290. // "(nonce, nonce_expiration, network_id, member_id) VALUES "
  291. // "($1, TO_TIMESTAMP($2::double precision/1000), $3, $4)",
  292. // nonce, OSUtils::now() + 300000, networkId, memberId);
  293. // w.commit();
  294. // } else {
  295. // // > 1 ?!? Thats an error!
  296. // fprintf(stderr, "> 1 unused nonce!\n");
  297. // exit(6);
  298. // }
  299. // } else if (r.size() == 1) {
  300. // nonce = r.at(0)[0].as<std::string>();
  301. // Utils::unhex(nonce.c_str(), nonceBytes, sizeof(nonceBytes));
  302. // } else {
  303. // // more than 1 nonce in use? Uhhh...
  304. // fprintf(stderr, "> 1 nonce in use for network member?!?\n");
  305. // exit(7);
  306. // }
  307. // r = w.exec_params(
  308. // "SELECT oc.client_id, oc.authorization_endpoint, oc.issuer, oc.provider, oc.sso_impl_version "
  309. // "FROM ztc_network AS n "
  310. // "INNER JOIN ztc_org o "
  311. // " ON o.owner_id = n.owner_id "
  312. // "LEFT OUTER JOIN ztc_network_oidc_config noc "
  313. // " ON noc.network_id = n.id "
  314. // "LEFT OUTER JOIN ztc_oidc_config oc "
  315. // " ON noc.client_id = oc.client_id AND oc.org_id = o.org_id "
  316. // "WHERE n.id = $1 AND n.sso_enabled = true", networkId);
  317. // std::string client_id = "";
  318. // std::string authorization_endpoint = "";
  319. // std::string issuer = "";
  320. // std::string provider = "";
  321. // uint64_t sso_version = 0;
  322. // if (r.size() == 1) {
  323. // client_id = r.at(0)[0].as<std::optional<std::string>>().value_or("");
  324. // authorization_endpoint = r.at(0)[1].as<std::optional<std::string>>().value_or("");
  325. // issuer = r.at(0)[2].as<std::optional<std::string>>().value_or("");
  326. // provider = r.at(0)[3].as<std::optional<std::string>>().value_or("");
  327. // sso_version = r.at(0)[4].as<std::optional<uint64_t>>().value_or(1);
  328. // } else if (r.size() > 1) {
  329. // fprintf(stderr, "ERROR: More than one auth endpoint for an organization?!?!? NetworkID: %s\n", networkId.c_str());
  330. // } else {
  331. // fprintf(stderr, "No client or auth endpoint?!?\n");
  332. // }
  333. // info.version = sso_version;
  334. // // no catch all else because we don't actually care if no records exist here. just continue as normal.
  335. // if ((!client_id.empty())&&(!authorization_endpoint.empty())) {
  336. // uint8_t state[48];
  337. // HMACSHA384(_ssoPsk, nonceBytes, sizeof(nonceBytes), state);
  338. // char state_hex[256];
  339. // Utils::hex(state, 48, state_hex);
  340. // if (info.version == 0) {
  341. // char url[2048] = {0};
  342. // OSUtils::ztsnprintf(url, sizeof(authenticationURL),
  343. // "%s?response_type=id_token&response_mode=form_post&scope=openid+email+profile&redirect_uri=%s&nonce=%s&state=%s&client_id=%s",
  344. // authorization_endpoint.c_str(),
  345. // url_encode(redirectURL).c_str(),
  346. // nonce.c_str(),
  347. // state_hex,
  348. // client_id.c_str());
  349. // info.authenticationURL = std::string(url);
  350. // } else if (info.version == 1) {
  351. // info.ssoClientID = client_id;
  352. // info.issuerURL = issuer;
  353. // info.ssoProvider = provider;
  354. // info.ssoNonce = nonce;
  355. // info.ssoState = std::string(state_hex) + "_" +networkId;
  356. // info.centralAuthURL = redirectURL;
  357. // #ifdef ZT_DEBUG
  358. // fprintf(
  359. // stderr,
  360. // "ssoClientID: %s\nissuerURL: %s\nssoNonce: %s\nssoState: %s\ncentralAuthURL: %s\nprovider: %s\n",
  361. // info.ssoClientID.c_str(),
  362. // info.issuerURL.c_str(),
  363. // info.ssoNonce.c_str(),
  364. // info.ssoState.c_str(),
  365. // info.centralAuthURL.c_str(),
  366. // provider.c_str());
  367. // #endif
  368. // }
  369. // } else {
  370. // fprintf(stderr, "client_id: %s\nauthorization_endpoint: %s\n", client_id.c_str(), authorization_endpoint.c_str());
  371. // }
  372. // }
  373. // _pool->unborrow(c);
  374. }
  375. catch (std::exception& e) {
  376. fprintf(stderr, "ERROR: Error updating member on load for network %s: %s\n", networkId.c_str(), e.what());
  377. span->SetStatus(opentelemetry::trace::StatusCode::kError, e.what());
  378. }
  379. return info; // std::string(authenticationURL);
  380. }
  381. void CV2::initializeNetworks()
  382. {
  383. auto provider = opentelemetry::trace::Provider::GetTracerProvider();
  384. auto tracer = provider->GetTracer("cv2");
  385. auto span = tracer->StartSpan("cv2::initializeNetworks");
  386. auto scope = tracer->WithActiveSpan(span);
  387. fprintf(stderr, "Initializing networks...\n");
  388. try {
  389. char qbuf[2048];
  390. sprintf(
  391. qbuf,
  392. "SELECT id, name, configuration , (EXTRACT(EPOCH FROM creation_time AT TIME ZONE 'UTC')*1000)::bigint, "
  393. "(EXTRACT(EPOCH FROM last_modified AT TIME ZONE 'UTC')*1000)::bigint, revision "
  394. "FROM networks_ctl WHERE controller_id = '%s'",
  395. _myAddressStr.c_str());
  396. auto c = _pool->borrow();
  397. pqxx::work w(*c->c);
  398. fprintf(stderr, "Load networks from psql...\n");
  399. auto stream = pqxx::stream_from::query(w, qbuf);
  400. std::tuple<
  401. std::string // network ID
  402. ,
  403. std::optional<std::string> // name
  404. ,
  405. std::string // configuration
  406. ,
  407. std::optional<uint64_t> // creation_time
  408. ,
  409. std::optional<uint64_t> // last_modified
  410. ,
  411. std::optional<uint64_t> // revision
  412. >
  413. row;
  414. uint64_t count = 0;
  415. uint64_t total = 0;
  416. while (stream >> row) {
  417. auto start = std::chrono::high_resolution_clock::now();
  418. json empty;
  419. json config;
  420. initNetwork(config);
  421. std::string nwid = std::get<0>(row);
  422. std::string name = std::get<1>(row).value_or("");
  423. json cfgtmp = json::parse(std::get<2>(row));
  424. std::optional<uint64_t> created_at = std::get<3>(row);
  425. std::optional<uint64_t> last_modified = std::get<4>(row);
  426. std::optional<uint64_t> revision = std::get<5>(row);
  427. config["id"] = nwid;
  428. config["name"] = name;
  429. config["creationTime"] = created_at.value_or(0);
  430. config["lastModified"] = last_modified.value_or(0);
  431. config["revision"] = revision.value_or(0);
  432. config["capabilities"] = cfgtmp["capabilities"].is_array() ? cfgtmp["capabilities"] : json::array();
  433. config["enableBroadcast"] = cfgtmp["enableBroadcast"].is_boolean() ? cfgtmp["enableBroadcast"].get<bool>() : false;
  434. config["mtu"] = cfgtmp["mtu"].is_number() ? cfgtmp["mtu"].get<int32_t>() : 2800;
  435. config["multicastLimit"] = cfgtmp["multicastLimit"].is_number() ? cfgtmp["multicastLimit"].get<int32_t>() : 64;
  436. config["private"] = cfgtmp["private"].is_boolean() ? cfgtmp["private"].get<bool>() : true;
  437. config["remoteTraceLevel"] = cfgtmp["remoteTraceLevel"].is_number() ? cfgtmp["remoteTraceLevel"].get<int32_t>() : 0;
  438. config["remoteTraceTarget"] = cfgtmp["remoteTraceTarget"].is_string() ? cfgtmp["remoteTraceTarget"].get<std::string>() : "";
  439. config["revision"] = revision.value_or(0);
  440. config["rules"] = cfgtmp["rules"].is_array() ? cfgtmp["rules"] : json::array();
  441. config["tags"] = cfgtmp["tags"].is_array() ? cfgtmp["tags"] : json::array();
  442. if (cfgtmp["v4AssignMode"].is_object()) {
  443. config["v4AssignMode"] = cfgtmp["v4AssignMode"];
  444. }
  445. else {
  446. config["v4AssignMode"] = json::object();
  447. config["v4AssignMode"]["zt"] = true;
  448. }
  449. if (cfgtmp["v6AssignMode"].is_object()) {
  450. config["v6AssignMode"] = cfgtmp["v6AssignMode"];
  451. }
  452. else {
  453. config["v6AssignMode"] = json::object();
  454. config["v6AssignMode"]["zt"] = true;
  455. config["v6AssignMode"]["6plane"] = true;
  456. config["v6AssignMode"]["rfc4193"] = false;
  457. }
  458. config["ssoEnabled"] = cfgtmp["ssoEnabled"].is_boolean() ? cfgtmp["ssoEnabled"].get<bool>() : false;
  459. config["objtype"] = "network";
  460. config["routes"] = cfgtmp["routes"].is_array() ? cfgtmp["routes"] : json::array();
  461. config["clientId"] = cfgtmp["clientId"].is_string() ? cfgtmp["clientId"].get<std::string>() : "";
  462. config["authorizationEndpoint"] = cfgtmp["authorizationEndpoint"].is_string() ? cfgtmp["authorizationEndpoint"].get<std::string>() : nullptr;
  463. config["provider"] = cfgtmp["ssoProvider"].is_string() ? cfgtmp["ssoProvider"].get<std::string>() : "";
  464. if (! cfgtmp["dns"].is_object()) {
  465. cfgtmp["dns"] = json::object();
  466. cfgtmp["dns"]["domain"] = "";
  467. cfgtmp["dns"]["servers"] = json::array();
  468. }
  469. else {
  470. config["dns"] = cfgtmp["dns"];
  471. }
  472. config["ipAssignmentPools"] = cfgtmp["ipAssignmentPools"].is_array() ? cfgtmp["ipAssignmentPools"] : json::array();
  473. Metrics::network_count++;
  474. _networkChanged(empty, config, false);
  475. auto end = std::chrono::high_resolution_clock::now();
  476. auto dur = std::chrono::duration_cast<std::chrono::microseconds>(end - start);
  477. ;
  478. total += dur.count();
  479. ++count;
  480. if (count > 0 && count % 10000 == 0) {
  481. fprintf(stderr, "Averaging %lu us per network\n", (total / count));
  482. }
  483. }
  484. w.commit();
  485. _pool->unborrow(c);
  486. fprintf(stderr, "done.\n");
  487. if (++this->_ready == 2) {
  488. if (_waitNoticePrinted) {
  489. fprintf(stderr, "[%s] NOTICE: %.10llx controller PostgreSQL data download complete." ZT_EOL_S, _timestr(), (unsigned long long)_myAddress.toInt());
  490. }
  491. _readyLock.unlock();
  492. }
  493. fprintf(stderr, "network init done\n");
  494. }
  495. catch (std::exception& e) {
  496. fprintf(stderr, "ERROR: Error initializing networks: %s\n", e.what());
  497. span->SetStatus(opentelemetry::trace::StatusCode::kError, e.what());
  498. std::this_thread::sleep_for(std::chrono::milliseconds(5000));
  499. exit(-1);
  500. }
  501. }
  502. void CV2::initializeMembers()
  503. {
  504. auto provider = opentelemetry::trace::Provider::GetTracerProvider();
  505. auto tracer = provider->GetTracer("cv2");
  506. auto span = tracer->StartSpan("cv2::initializeMembers");
  507. auto scope = tracer->WithActiveSpan(span);
  508. std::string memberId;
  509. std::string networkId;
  510. try {
  511. char qbuf[2048];
  512. sprintf(
  513. qbuf,
  514. "SELECT nm.device_id, nm.network_id, nm.authorized, nm.active_bridge, nm.ip_assignments, nm.no_auto_assign_ips, "
  515. "nm.sso_exempt, (EXTRACT(EPOCH FROM nm.authentication_expiry_time AT TIME ZONE 'UTC')*1000)::bigint, "
  516. "(EXTRACT(EPOCH FROM nm.creation_time AT TIME ZONE 'UTC')*1000)::bigint, nm.identity, "
  517. "(EXTRACT(EPOCH FROM nm.last_authorized_time AT TIME ZONE 'UTC')*1000)::bigint, "
  518. "(EXTRACT(EPOCH FROM nm.last_deauthorized_time AT TIME ZONE 'UTC')*1000)::bigint, "
  519. "nm.remote_trace_level, nm.remote_trace_target, nm.revision, nm.capabilities, nm.tags "
  520. "FROM network_memberships_ctl nm "
  521. "INNER JOIN networks_ctl n "
  522. " ON nm.network_id = n.id "
  523. "WHERE n.controller_id = '%s'",
  524. _myAddressStr.c_str());
  525. auto c = _pool->borrow();
  526. pqxx::work w(*c->c);
  527. fprintf(stderr, "Load members from psql...\n");
  528. auto stream = pqxx::stream_from::query(w, qbuf);
  529. std::tuple<
  530. std::string // device ID
  531. ,
  532. std::string // network ID
  533. ,
  534. bool // authorized
  535. ,
  536. std::optional<bool> // active_bridge
  537. ,
  538. std::optional<std::string> // ip_assignments
  539. ,
  540. std::optional<bool> // no_auto_assign_ips
  541. ,
  542. std::optional<bool> // sso_exempt
  543. ,
  544. std::optional<uint64_t> // authentication_expiry_time
  545. ,
  546. std::optional<uint64_t> // creation_time
  547. ,
  548. std::optional<std::string> // identity
  549. ,
  550. std::optional<uint64_t> // last_authorized_time
  551. ,
  552. std::optional<uint64_t> // last_deauthorized_time
  553. ,
  554. std::optional<int32_t> // remote_trace_level
  555. ,
  556. std::optional<std::string> // remote_trace_target
  557. ,
  558. std::optional<uint64_t> // revision
  559. ,
  560. std::optional<std::string> // capabilities
  561. ,
  562. std::optional<std::string> // tags
  563. >
  564. row;
  565. uint64_t count = 0;
  566. uint64_t total = 0;
  567. while (stream >> row) {
  568. auto start = std::chrono::high_resolution_clock::now();
  569. json empty;
  570. json config;
  571. initMember(config);
  572. memberId = std::get<0>(row);
  573. networkId = std::get<1>(row);
  574. bool authorized = std::get<2>(row);
  575. std::optional<bool> active_bridge = std::get<3>(row);
  576. std::string ip_assignments = std::get<4>(row).value_or("");
  577. std::optional<bool> no_auto_assign_ips = std::get<5>(row);
  578. std::optional<bool> sso_exempt = std::get<6>(row);
  579. std::optional<uint64_t> authentication_expiry_time = std::get<7>(row);
  580. std::optional<uint64_t> creation_time = std::get<8>(row);
  581. std::optional<std::string> identity = std::get<9>(row);
  582. std::optional<uint64_t> last_authorized_time = std::get<10>(row);
  583. std::optional<uint64_t> last_deauthorized_time = std::get<11>(row);
  584. std::optional<int32_t> remote_trace_level = std::get<12>(row);
  585. std::optional<std::string> remote_trace_target = std::get<13>(row);
  586. std::optional<uint64_t> revision = std::get<14>(row);
  587. std::optional<std::string> capabilities = std::get<15>(row);
  588. std::optional<std::string> tags = std::get<16>(row);
  589. config["objtype"] = "member";
  590. config["id"] = memberId;
  591. config["address"] = identity.value_or("");
  592. config["nwid"] = networkId;
  593. config["authorized"] = authorized;
  594. config["activeBridge"] = active_bridge.value_or(false);
  595. config["ipAssignments"] = json::array();
  596. if (ip_assignments != "{}") {
  597. std::string tmp = ip_assignments.substr(1, ip_assignments.length() - 2);
  598. std::vector<std::string> addrs = split(tmp, ',');
  599. for (auto it = addrs.begin(); it != addrs.end(); ++it) {
  600. config["ipAssignments"].push_back(*it);
  601. }
  602. }
  603. config["capabilities"] = json::parse(capabilities.value_or("[]"));
  604. config["creationTime"] = creation_time.value_or(0);
  605. config["lastAuthorizedTime"] = last_authorized_time.value_or(0);
  606. config["lastDeauthorizedTime"] = last_deauthorized_time.value_or(0);
  607. config["noAutoAssignIPs"] = no_auto_assign_ips.value_or(false);
  608. config["remoteTraceLevel"] = remote_trace_level.value_or(0);
  609. config["remoteTraceTarget"] = remote_trace_target.value_or(nullptr);
  610. config["revision"] = revision.value_or(0);
  611. config["ssoExempt"] = sso_exempt.value_or(false);
  612. config["authenticationExpiryTime"] = authentication_expiry_time.value_or(0);
  613. config["tags"] = json::parse(tags.value_or("[]"));
  614. Metrics::member_count++;
  615. _memberChanged(empty, config, false);
  616. memberId = "";
  617. networkId = "";
  618. auto end = std::chrono::high_resolution_clock::now();
  619. auto dur = std::chrono::duration_cast<std::chrono::microseconds>(end - start);
  620. total += dur.count();
  621. ++count;
  622. if (count > 0 && count % 10000 == 0) {
  623. fprintf(stderr, "Averaging %lu us per member\n", (total / count));
  624. }
  625. }
  626. if (count > 0) {
  627. fprintf(stderr, "Took %lu us per member to load\n", (total / count));
  628. }
  629. stream.complete();
  630. w.commit();
  631. _pool->unborrow(c);
  632. fprintf(stderr, "done.\n");
  633. if (++this->_ready == 2) {
  634. if (_waitNoticePrinted) {
  635. fprintf(stderr, "[%s] NOTICE: %.10llx controller PostgreSQL data download complete." ZT_EOL_S, _timestr(), (unsigned long long)_myAddress.toInt());
  636. }
  637. _readyLock.unlock();
  638. }
  639. fprintf(stderr, "member init done\n");
  640. }
  641. catch (std::exception& e) {
  642. fprintf(stderr, "ERROR: Error initializing member: %s-%s %s\n", networkId.c_str(), memberId.c_str(), e.what());
  643. span->SetStatus(opentelemetry::trace::StatusCode::kError, e.what());
  644. exit(-1);
  645. }
  646. }
  647. void CV2::heartbeat()
  648. {
  649. char publicId[1024];
  650. char hostnameTmp[1024];
  651. _myId.toString(false, publicId);
  652. if (gethostname(hostnameTmp, sizeof(hostnameTmp)) != 0) {
  653. hostnameTmp[0] = (char)0;
  654. }
  655. else {
  656. for (int i = 0; i < (int)sizeof(hostnameTmp); ++i) {
  657. if ((hostnameTmp[i] == '.') || (hostnameTmp[i] == 0)) {
  658. hostnameTmp[i] = (char)0;
  659. break;
  660. }
  661. }
  662. }
  663. const char* controllerId = _myAddressStr.c_str();
  664. const char* publicIdentity = publicId;
  665. const char* hostname = hostnameTmp;
  666. while (_run == 1) {
  667. auto provider = opentelemetry::trace::Provider::GetTracerProvider();
  668. auto tracer = provider->GetTracer("cv2");
  669. auto span = tracer->StartSpan("cv2::heartbeat");
  670. auto scope = tracer->WithActiveSpan(span);
  671. auto c = _pool->borrow();
  672. int64_t ts = OSUtils::now();
  673. if (c->c) {
  674. std::string major = std::to_string(ZEROTIER_ONE_VERSION_MAJOR);
  675. std::string minor = std::to_string(ZEROTIER_ONE_VERSION_MINOR);
  676. std::string rev = std::to_string(ZEROTIER_ONE_VERSION_REVISION);
  677. std::string version = major + "." + minor + "." + rev;
  678. std::string versionStr = "v" + version;
  679. try {
  680. pqxx::work w { *c->c };
  681. w.exec_params0(
  682. "INSERT INTO controllers_ctl (id, hostname, last_heartbeat, public_identity, version) VALUES "
  683. "($1, $2, TO_TIMESTAMP($3::double precision/1000), $4, $5) "
  684. "ON CONFLICT (id) DO UPDATE SET hostname = EXCLUDED.hostname, last_heartbeat = EXCLUDED.last_heartbeat, "
  685. "public_identity = EXCLUDED.public_identity, version = EXCLUDED.version",
  686. controllerId,
  687. hostname,
  688. ts,
  689. publicIdentity,
  690. versionStr);
  691. w.commit();
  692. }
  693. catch (std::exception& e) {
  694. fprintf(stderr, "ERROR: Error in heartbeat: %s\n", e.what());
  695. span->SetStatus(opentelemetry::trace::StatusCode::kError, e.what());
  696. continue;
  697. }
  698. catch (...) {
  699. fprintf(stderr, "ERROR: Unknown error in heartbeat\n");
  700. span->SetStatus(opentelemetry::trace::StatusCode::kError, "Unknown error in heartbeat");
  701. continue;
  702. }
  703. }
  704. _pool->unborrow(c);
  705. span->End();
  706. std::this_thread::sleep_for(std::chrono::seconds(1));
  707. }
  708. fprintf(stderr, "Exited heartbeat thread\n");
  709. }
  710. void CV2::membersDbWatcher()
  711. {
  712. auto c = _pool->borrow();
  713. std::string stream = "member_" + _myAddressStr;
  714. fprintf(stderr, "Listening to member stream: %s\n", stream.c_str());
  715. MemberNotificationReceiver<CV2> m(this, *c->c, stream);
  716. while (_run == 1) {
  717. c->c->await_notification(5, 0);
  718. }
  719. _pool->unborrow(c);
  720. fprintf(stderr, "Exited membersDbWatcher\n");
  721. }
  722. void CV2::networksDbWatcher()
  723. {
  724. std::string stream = "network_" + _myAddressStr;
  725. fprintf(stderr, "Listening to member stream: %s\n", stream.c_str());
  726. auto c = _pool->borrow();
  727. NetworkNotificationReceiver<CV2> n(this, *c->c, stream);
  728. while (_run == 1) {
  729. c->c->await_notification(5, 0);
  730. }
  731. _pool->unborrow(c);
  732. fprintf(stderr, "Exited networksDbWatcher\n");
  733. }
  734. void CV2::commitThread()
  735. {
  736. fprintf(stderr, "%s: commitThread start\n", _myAddressStr.c_str());
  737. std::pair<nlohmann::json, bool> qitem;
  738. while (_commitQueue.get(qitem) && (_run == 1)) {
  739. auto provider = opentelemetry::trace::Provider::GetTracerProvider();
  740. auto tracer = provider->GetTracer("cv2");
  741. auto span = tracer->StartSpan("cv2::commitThread");
  742. auto scope = tracer->WithActiveSpan(span);
  743. // fprintf(stderr, "commitThread tick\n");
  744. if (! qitem.first.is_object()) {
  745. fprintf(stderr, "not an object\n");
  746. continue;
  747. }
  748. std::shared_ptr<PostgresConnection> c;
  749. try {
  750. c = _pool->borrow();
  751. }
  752. catch (std::exception& e) {
  753. fprintf(stderr, "ERROR: %s\n", e.what());
  754. continue;
  755. }
  756. if (! c) {
  757. fprintf(stderr, "Error getting database connection\n");
  758. continue;
  759. }
  760. Metrics::pgsql_commit_ticks++;
  761. try {
  762. nlohmann::json& config = (qitem.first);
  763. const std::string objtype = config["objtype"];
  764. if (objtype == "member") {
  765. auto mspan = tracer->StartSpan("cv2::commitThread::member");
  766. auto mscope = tracer->WithActiveSpan(span);
  767. // fprintf(stderr, "%s: commitThread: member\n", _myAddressStr.c_str());
  768. std::string memberId;
  769. std::string networkId;
  770. try {
  771. pqxx::work w(*c->c);
  772. memberId = config["id"];
  773. networkId = config["nwid"];
  774. std::string target = "NULL";
  775. if (! config["remoteTraceTarget"].is_null()) {
  776. target = config["remoteTraceTarget"];
  777. }
  778. pqxx::row nwrow = w.exec_params1("SELECT COUNT(id) FROM networks WHERE id = $1", networkId);
  779. int nwcount = nwrow[0].as<int>();
  780. if (nwcount != 1) {
  781. fprintf(stderr, "network %s does not exist. skipping member upsert\n", networkId.c_str());
  782. w.abort();
  783. _pool->unborrow(c);
  784. continue;
  785. }
  786. // only needed for hooks, and no hooks for now
  787. // pqxx::row mrow = w.exec_params1("SELECT COUNT(id) FROM device_networks WHERE device_id = $1 AND network_id = $2", memberId, networkId);
  788. // int membercount = mrow[0].as<int>();
  789. // bool isNewMember = (membercount == 0);
  790. pqxx::result res = w.exec_params0(
  791. "INSERT INTO network_memberships_ctl (device_id, network_id, authorized, active_bridge, ip_assignments, "
  792. "no_auto_assign_ips, sso_exempt, authentication_expiry_time, capabilities, creation_time, "
  793. "identity, last_authorized_time, last_deauthorized_time, "
  794. "remote_trace_level, remote_trace_target, revision, tags, version_major, version_minor, "
  795. "version_revision, version_protocol) "
  796. "VALUES ($1, $2, $3, $4, $5, $6, $7, TO_TIMESTAMP($8::double precision/1000), $9, "
  797. "TO_TIMESTAMP($10::double precision/1000), $11, TO_TIMESTAMP($12::double precision/1000), "
  798. "TO_TIMESTAMP($13::double precision/1000), $14, $15, $16, $17, $18, $19, $20, $21) "
  799. "ON CONFLICT (device_id, network_id) DO UPDATE SET "
  800. "authorized = EXCLUDED.authorized, active_bridge = EXCLUDED.active_bridge, "
  801. "ip_assignments = EXCLUDED.ip_assignments, no_auto_assign_ips = EXCLUDED.no_auto_assign_ips, "
  802. "sso_exempt = EXCLUDED.sso_exempt, authentication_expiry_time = EXCLUDED.authentication_expiry_time, "
  803. "capabilities = EXCLUDED.capabilities, creation_time = EXCLUDED.creation_time, "
  804. "identity = EXCLUDED.identity, last_authorized_time = EXCLUDED.last_authorized_time, "
  805. "last_deauthorized_time = EXCLUDED.last_deauthorized_time, "
  806. "remote_trace_level = EXCLUDED.remote_trace_level, remote_trace_target = EXCLUDED.remote_trace_target, "
  807. "revision = EXCLUDED.revision, tags = EXCLUDED.tags, version_major = EXCLUDED.version_major, "
  808. "version_minor = EXCLUDED.version_minor, version_revision = EXCLUDED.version_revision, "
  809. "version_protocol = EXCLUDED.version_protocol",
  810. memberId,
  811. networkId,
  812. (bool)config["authorized"],
  813. (bool)config["activeBridge"],
  814. config["ipAssignments"].get<std::vector<std::string> >(),
  815. (bool)config["noAutoAssignIps"],
  816. (bool)config["ssoExempt"],
  817. (uint64_t)config["authenticationExpiryTime"],
  818. OSUtils::jsonDump(config["capabilities"], -1),
  819. (uint64_t)config["creationTime"],
  820. OSUtils::jsonString(config["identity"], ""),
  821. (uint64_t)config["lastAuthorizedTime"],
  822. (uint64_t)config["lastDeauthorizedTime"],
  823. (int)config["remoteTraceLevel"],
  824. target,
  825. (uint64_t)config["revision"],
  826. OSUtils::jsonDump(config["tags"], -1),
  827. (int)config["vMajor"],
  828. (int)config["vMinor"],
  829. (int)config["vRev"],
  830. (int)config["vProto"]);
  831. w.commit();
  832. // No hooks for now
  833. // if (_smee != NULL && isNewMember) {
  834. // pqxx::row row = w.exec_params1(
  835. // "SELECT "
  836. // " count(h.hook_id) "
  837. // "FROM "
  838. // " ztc_hook h "
  839. // " INNER JOIN ztc_org o ON o.org_id = h.org_id "
  840. // " INNER JOIN ztc_network n ON n.owner_id = o.owner_id "
  841. // " WHERE "
  842. // "n.id = $1 ",
  843. // networkId
  844. // );
  845. // int64_t hookCount = row[0].as<int64_t>();
  846. // if (hookCount > 0) {
  847. // notifyNewMember(networkId, memberId);
  848. // }
  849. // }
  850. const uint64_t nwidInt = OSUtils::jsonIntHex(config["nwid"], 0ULL);
  851. const uint64_t memberidInt = OSUtils::jsonIntHex(config["id"], 0ULL);
  852. if (nwidInt && memberidInt) {
  853. nlohmann::json nwOrig;
  854. nlohmann::json memOrig;
  855. nlohmann::json memNew(config);
  856. get(nwidInt, nwOrig, memberidInt, memOrig);
  857. _memberChanged(memOrig, memNew, qitem.second);
  858. }
  859. else {
  860. fprintf(stderr, "%s: Can't notify of change. Error parsing nwid or memberid: %llu-%llu\n", _myAddressStr.c_str(), (unsigned long long)nwidInt, (unsigned long long)memberidInt);
  861. }
  862. }
  863. catch (pqxx::data_exception& e) {
  864. std::string cfgDump = OSUtils::jsonDump(config, 2);
  865. fprintf(stderr, "Member save %s-%s: %s\n", networkId.c_str(), memberId.c_str(), cfgDump.c_str());
  866. const pqxx::sql_error* s = dynamic_cast<const pqxx::sql_error*>(&e);
  867. fprintf(stderr, "%s ERROR: Error updating member: %s\n", _myAddressStr.c_str(), e.what());
  868. if (s) {
  869. fprintf(stderr, "%s ERROR: SQL error: %s\n", _myAddressStr.c_str(), s->query().c_str());
  870. }
  871. mspan->SetStatus(opentelemetry::trace::StatusCode::kError, "pqxx::data_exception");
  872. mspan->SetAttribute("error", e.what());
  873. mspan->SetAttribute("config", cfgDump);
  874. }
  875. catch (std::exception& e) {
  876. std::string cfgDump = OSUtils::jsonDump(config, 2);
  877. fprintf(stderr, "%s ERROR: Error updating member %s-%s: %s\njsonDump: %s\n", _myAddressStr.c_str(), networkId.c_str(), memberId.c_str(), e.what(), cfgDump.c_str());
  878. mspan->SetStatus(opentelemetry::trace::StatusCode::kError, "std::exception");
  879. mspan->SetAttribute("error", e.what());
  880. mspan->SetAttribute("config", cfgDump);
  881. }
  882. }
  883. else if (objtype == "network") {
  884. auto nspan = tracer->StartSpan("cv2::commitThread::network");
  885. auto nscope = tracer->WithActiveSpan(span);
  886. try {
  887. // fprintf(stderr, "%s: commitThread: network\n", _myAddressStr.c_str());
  888. pqxx::work w(*c->c);
  889. std::string id = config["id"];
  890. // network must already exist
  891. pqxx::result res = w.exec_params0(
  892. "INSERT INTO networks_ctl (id, name, configuration, controller_id, revision) "
  893. "VALUES ($1, $2, $3, $4, $5) "
  894. "ON CONFLICT (id) DO UPDATE SET "
  895. "name = EXCLUDED.name, configuration = EXCLUDED.configuration, revision = EXCLUDED.revision+1",
  896. id,
  897. OSUtils::jsonString(config["name"], ""),
  898. OSUtils::jsonDump(config, -1),
  899. _myAddressStr,
  900. ((uint64_t)config["revision"]));
  901. w.commit();
  902. const uint64_t nwidInt = OSUtils::jsonIntHex(config["nwid"], 0ULL);
  903. if (nwidInt) {
  904. nlohmann::json nwOrig;
  905. nlohmann::json nwNew(config);
  906. get(nwidInt, nwOrig);
  907. _networkChanged(nwOrig, nwNew, qitem.second);
  908. }
  909. else {
  910. fprintf(stderr, "%s: Can't notify network changed: %llu\n", _myAddressStr.c_str(), (unsigned long long)nwidInt);
  911. }
  912. }
  913. catch (pqxx::data_exception& e) {
  914. const pqxx::sql_error* s = dynamic_cast<const pqxx::sql_error*>(&e);
  915. fprintf(stderr, "%s ERROR: Error updating network: %s\n", _myAddressStr.c_str(), e.what());
  916. if (s) {
  917. fprintf(stderr, "%s ERROR: SQL error: %s\n", _myAddressStr.c_str(), s->query().c_str());
  918. }
  919. nspan->SetStatus(opentelemetry::trace::StatusCode::kError, "pqxx::data_exception");
  920. nspan->SetAttribute("error", e.what());
  921. nspan->SetAttribute("config", OSUtils::jsonDump(config, 2));
  922. }
  923. catch (std::exception& e) {
  924. fprintf(stderr, "%s ERROR: Error updating network: %s\n", _myAddressStr.c_str(), e.what());
  925. nspan->SetStatus(opentelemetry::trace::StatusCode::kError, "std::exception");
  926. nspan->SetAttribute("error", e.what());
  927. nspan->SetAttribute("config", OSUtils::jsonDump(config, 2));
  928. }
  929. }
  930. else if (objtype == "_delete_network") {
  931. auto dspan = tracer->StartSpan("cv2::commitThread::delete_network");
  932. auto dscope = tracer->WithActiveSpan(span);
  933. // fprintf(stderr, "%s: commitThread: delete network\n", _myAddressStr.c_str());
  934. try {
  935. pqxx::work w(*c->c);
  936. std::string networkId = config["id"];
  937. fprintf(stderr, "Deleting network %s\n", networkId.c_str());
  938. w.exec_params0("DELETE FROM network_memberships_ctl WHERE network_id = $1", networkId);
  939. w.exec_params0("DELETE FROM networks_ctl WHERE id = $1", networkId);
  940. w.commit();
  941. uint64_t nwidInt = OSUtils::jsonIntHex(config["nwid"], 0ULL);
  942. json oldConfig;
  943. get(nwidInt, oldConfig);
  944. json empty;
  945. _networkChanged(oldConfig, empty, qitem.second);
  946. }
  947. catch (std::exception& e) {
  948. fprintf(stderr, "%s ERROR: Error deleting network: %s\n", _myAddressStr.c_str(), e.what());
  949. dspan->SetStatus(opentelemetry::trace::StatusCode::kError, "std::exception");
  950. dspan->SetAttribute("error", e.what());
  951. dspan->SetAttribute("config", OSUtils::jsonDump(config, 2));
  952. }
  953. }
  954. else if (objtype == "_delete_member") {
  955. auto dspan = tracer->StartSpan("cv2::commitThread::delete_member");
  956. auto dscope = tracer->WithActiveSpan(span);
  957. // fprintf(stderr, "%s commitThread: delete member\n", _myAddressStr.c_str());
  958. try {
  959. pqxx::work w(*c->c);
  960. std::string memberId = config["id"];
  961. std::string networkId = config["nwid"];
  962. pqxx::result res = w.exec_params0("DELETE FROM network_memberships_ctl WHERE device_id = $1 AND network_id = $2", memberId, networkId);
  963. w.commit();
  964. uint64_t nwidInt = OSUtils::jsonIntHex(config["nwid"], 0ULL);
  965. uint64_t memberidInt = OSUtils::jsonIntHex(config["id"], 0ULL);
  966. nlohmann::json networkConfig;
  967. nlohmann::json oldConfig;
  968. get(nwidInt, networkConfig, memberidInt, oldConfig);
  969. json empty;
  970. _memberChanged(oldConfig, empty, qitem.second);
  971. }
  972. catch (std::exception& e) {
  973. fprintf(stderr, "%s ERROR: Error deleting member: %s\n", _myAddressStr.c_str(), e.what());
  974. dspan->SetStatus(opentelemetry::trace::StatusCode::kError, "std::exception");
  975. dspan->SetAttribute("error", e.what());
  976. dspan->SetAttribute("config", OSUtils::jsonDump(config, 2));
  977. }
  978. }
  979. else {
  980. fprintf(stderr, "%s ERROR: unknown objtype\n", _myAddressStr.c_str());
  981. }
  982. }
  983. catch (std::exception& e) {
  984. fprintf(stderr, "%s ERROR: Error getting objtype: %s\n", _myAddressStr.c_str(), e.what());
  985. span->SetStatus(opentelemetry::trace::StatusCode::kError, "std::exception");
  986. span->SetAttribute("error", e.what());
  987. }
  988. _pool->unborrow(c);
  989. c.reset();
  990. }
  991. fprintf(stderr, "%s commitThread finished\n", _myAddressStr.c_str());
  992. }
  993. void CV2::onlineNotificationThread()
  994. {
  995. waitForReady();
  996. _connected = 1;
  997. nlohmann::json jtmp1, jtmp2;
  998. while (_run == 1) {
  999. auto provider = opentelemetry::trace::Provider::GetTracerProvider();
  1000. auto tracer = provider->GetTracer("cv2");
  1001. auto span = tracer->StartSpan("cv2::onlineNotificationThread");
  1002. auto scope = tracer->WithActiveSpan(span);
  1003. auto c = _pool->borrow();
  1004. auto c2 = _pool->borrow();
  1005. try {
  1006. fprintf(stderr, "%s onlineNotificationThread\n", _myAddressStr.c_str());
  1007. std::unordered_map<std::pair<uint64_t, uint64_t>, NodeOnlineRecord, _PairHasher> lastOnline;
  1008. {
  1009. std::lock_guard<std::mutex> l(_lastOnline_l);
  1010. lastOnline.swap(_lastOnline);
  1011. }
  1012. pqxx::work w(*c->c);
  1013. pqxx::work w2(*c2->c);
  1014. bool firstRun = true;
  1015. bool memberAdded = false;
  1016. uint64_t updateCount = 0;
  1017. pqxx::pipeline pipe(w);
  1018. for (auto i = lastOnline.begin(); i != lastOnline.end(); ++i) {
  1019. updateCount++;
  1020. uint64_t nwid_i = i->first.first;
  1021. char nwidTmp[64];
  1022. char memTmp[64];
  1023. char ipTmp[64];
  1024. OSUtils::ztsnprintf(nwidTmp, sizeof(nwidTmp), "%.16llx", nwid_i);
  1025. OSUtils::ztsnprintf(memTmp, sizeof(memTmp), "%.10llx", i->first.second);
  1026. if (! get(nwid_i, jtmp1, i->first.second, jtmp2)) {
  1027. continue; // skip non existent networks/members
  1028. }
  1029. std::string networkId(nwidTmp);
  1030. std::string memberId(memTmp);
  1031. try {
  1032. pqxx::row r = w2.exec_params1("SELECT device_id, network_id FROM network_memberships_ctl WHERE network_id = $1 AND device_id = $2", networkId, memberId);
  1033. }
  1034. catch (pqxx::unexpected_rows& e) {
  1035. continue;
  1036. }
  1037. int64_t ts = i->second.lastSeen;
  1038. std::string ipAddr = i->second.physicalAddress.toIpString(ipTmp);
  1039. std::string timestamp = std::to_string(ts);
  1040. std::string osArch = i->second.osArch;
  1041. std::vector<std::string> osArchSplit = split(osArch, '/');
  1042. std::string os = osArchSplit[0];
  1043. std::string arch = osArchSplit[1];
  1044. if (ipAddr.empty()) {
  1045. ipAddr = "relayed";
  1046. }
  1047. json record = {
  1048. { ipAddr, ts },
  1049. };
  1050. std::string device_network_insert = "INSERT INTO network_memberships_ctl (device_id, network_id, last_seen, os, arch) "
  1051. "VALUES ('"
  1052. + w2.esc(memberId) + "', '" + w2.esc(networkId) + "', '" + w2.esc(record.dump())
  1053. + "'::JSONB, "
  1054. "'"
  1055. + w2.esc(os) + "', '" + w2.esc(arch)
  1056. + "') "
  1057. "ON CONFLICT (device_id, network_id) DO UPDATE SET os = EXCLUDED.os, arch = EXCLUDED.arch, "
  1058. "last_seen = network_memberships_ctl.last_seen || EXCLUDED.last_seen";
  1059. pipe.insert(device_network_insert);
  1060. Metrics::pgsql_node_checkin++;
  1061. }
  1062. pipe.complete();
  1063. ;
  1064. w2.commit();
  1065. w.commit();
  1066. fprintf(stderr, "%s: Updated online status of %lu members\n", _myAddressStr.c_str(), updateCount);
  1067. }
  1068. catch (std::exception& e) {
  1069. fprintf(stderr, "%s ERROR: Error in onlineNotificationThread: %s\n", _myAddressStr.c_str(), e.what());
  1070. span->SetStatus(opentelemetry::trace::StatusCode::kError, "std::exception");
  1071. span->SetAttribute("error", e.what());
  1072. }
  1073. catch (...) {
  1074. fprintf(stderr, "%s ERROR: Unknown error in onlineNotificationThread\n", _myAddressStr.c_str());
  1075. span->SetStatus(opentelemetry::trace::StatusCode::kError, "unknown");
  1076. }
  1077. _pool->unborrow(c2);
  1078. _pool->unborrow(c);
  1079. span->End();
  1080. std::this_thread::sleep_for(std::chrono::seconds(10));
  1081. }
  1082. fprintf(stderr, "%s: Fell out of run loop in onlineNotificationThread\n", _myAddressStr.c_str());
  1083. if (_run == 1) {
  1084. fprintf(stderr, "ERROR: %s onlineNotificationThread should still be running! Exiting Controller.\n", _myAddressStr.c_str());
  1085. exit(6);
  1086. }
  1087. }
  1088. #endif // ZT_CONTROLLER_USE_LIBPQ