PubSubListener.cpp 18 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589
  1. #ifdef ZT_CONTROLLER_USE_LIBPQ
  2. #include "PubSubListener.hpp"
  3. #include "ControllerConfig.hpp"
  4. #include "CtlUtil.hpp"
  5. #include "DB.hpp"
  6. #include "member.pb.h"
  7. #include "network.pb.h"
  8. #include "opentelemetry/trace/provider.h"
  9. #include "rustybits.h"
  10. #include <google/cloud/pubsub/admin/subscription_admin_client.h>
  11. #include <google/cloud/pubsub/admin/subscription_admin_connection.h>
  12. #include <google/cloud/pubsub/admin/topic_admin_client.h>
  13. #include <google/cloud/pubsub/message.h>
  14. #include <google/cloud/pubsub/subscriber.h>
  15. #include <google/cloud/pubsub/subscription.h>
  16. #include <google/cloud/pubsub/topic.h>
  17. #include <nlohmann/json.hpp>
  18. namespace pubsub = ::google::cloud::pubsub;
  19. namespace pubsub_admin = ::google::cloud::pubsub_admin;
  20. namespace ZeroTier {
  21. nlohmann::json toJson(const pbmessages::NetworkChange_Network& nc, pbmessages::NetworkChange_ChangeSource source);
  22. nlohmann::json toJson(const pbmessages::MemberChange_Member& mc, pbmessages::MemberChange_ChangeSource source);
  23. PubSubListener::PubSubListener(std::string controller_id, std::string project, std::string topic)
  24. : _controller_id(controller_id)
  25. , _project(project)
  26. , _topic(topic)
  27. , _subscription_id()
  28. , _run(false)
  29. , _adminClient(pubsub_admin::MakeSubscriptionAdminConnection())
  30. , _subscription(nullptr)
  31. {
  32. GOOGLE_PROTOBUF_VERIFY_VERSION;
  33. _subscription_id = "sub-" + controller_id + "-" + topic; // + "-" + random_hex_string(8);
  34. _subscription = new pubsub::Subscription(_project, _subscription_id);
  35. fprintf(
  36. stderr, "PubSubListener for controller %s project %s topic %s subscription %s\n", controller_id.c_str(),
  37. project.c_str(), topic.c_str(), _subscription_id.c_str());
  38. // If PUBSUB_EMULATOR_HOST is set, create the topic if it doesn't exist
  39. const char* emulatorHost = std::getenv("PUBSUB_EMULATOR_HOST");
  40. if (emulatorHost != nullptr) {
  41. create_gcp_pubsub_topic_if_needed(project, topic);
  42. create_gcp_pubsub_subscription_if_needed(_project, _subscription_id, _topic, _controller_id);
  43. }
  44. _subscriber = std::make_shared<pubsub::Subscriber>(pubsub::MakeSubscriberConnection(*_subscription));
  45. _run = true;
  46. _subscriberThread = std::thread(&PubSubListener::subscribe, this);
  47. }
  48. PubSubListener::~PubSubListener()
  49. {
  50. _run = false;
  51. if (_subscriberThread.joinable()) {
  52. _subscriberThread.join();
  53. }
  54. if (_subscription) {
  55. delete _subscription;
  56. _subscription = nullptr;
  57. }
  58. }
  59. void PubSubListener::subscribe()
  60. {
  61. while (_run) {
  62. try {
  63. fprintf(stderr, "Starting new subscription session\n");
  64. auto session = _subscriber->Subscribe([this](pubsub::Message const& m, pubsub::AckHandler h) {
  65. auto provider = opentelemetry::trace::Provider::GetTracerProvider();
  66. auto tracer = provider->GetTracer("PubSubListener");
  67. auto span = tracer->StartSpan("PubSubListener::onMessage");
  68. auto scope = tracer->WithActiveSpan(span);
  69. span->SetAttribute("message_id", m.message_id());
  70. span->SetAttribute("ordering_key", m.ordering_key());
  71. fprintf(stderr, "Received message %s\n", m.message_id().c_str());
  72. if (onNotification(m.data())) {
  73. std::move(h).ack();
  74. span->SetStatus(opentelemetry::trace::StatusCode::kOk);
  75. return true;
  76. }
  77. else {
  78. span->SetStatus(opentelemetry::trace::StatusCode::kError, "onNotification failed");
  79. return false;
  80. }
  81. });
  82. auto result = session.wait_for(std::chrono::seconds(10));
  83. if (result == std::future_status::timeout) {
  84. session.cancel();
  85. continue;
  86. }
  87. if (! session.valid()) {
  88. fprintf(stderr, "Subscription session no longer valid\n");
  89. continue;
  90. }
  91. }
  92. catch (google::cloud::Status const& status) {
  93. fprintf(stderr, "Subscription terminated with status: %s\n", status.message().c_str());
  94. }
  95. }
  96. }
  97. PubSubNetworkListener::PubSubNetworkListener(std::string controller_id, std::string project, std::string topic, DB* db)
  98. : PubSubListener(controller_id, project, topic)
  99. , _db(db)
  100. {
  101. }
  102. PubSubNetworkListener::~PubSubNetworkListener()
  103. {
  104. }
  105. bool PubSubNetworkListener::onNotification(const std::string& payload)
  106. {
  107. auto provider = opentelemetry::trace::Provider::GetTracerProvider();
  108. auto tracer = provider->GetTracer("PubSubNetworkListener");
  109. auto span = tracer->StartSpan("PubSubNetworkListener::onNotification");
  110. auto scope = tracer->WithActiveSpan(span);
  111. pbmessages::NetworkChange nc;
  112. if (! nc.ParseFromString(payload)) {
  113. fprintf(stderr, "Failed to parse NetworkChange protobuf message\n");
  114. span->SetAttribute("error", "Failed to parse NetworkChange protobuf message");
  115. span->SetStatus(opentelemetry::trace::StatusCode::kError, "Failed to parse protobuf");
  116. return false;
  117. }
  118. fprintf(stderr, "PubSubNetworkListener: parsed protobuf message. %s\n", nc.DebugString().c_str());
  119. fprintf(stderr, "Network notification received\n");
  120. try {
  121. nlohmann::json oldConfig, newConfig;
  122. if (nc.has_old()) {
  123. fprintf(stderr, "has old network config\n");
  124. oldConfig = toJson(nc.old(), nc.change_source());
  125. }
  126. if (nc.has_new_()) {
  127. fprintf(stderr, "has new network config\n");
  128. newConfig = toJson(nc.new_(), nc.change_source());
  129. }
  130. if (! nc.has_old() && ! nc.has_new_()) {
  131. fprintf(stderr, "NetworkChange message has no old or new network config\n");
  132. span->SetAttribute("error", "NetworkChange message has no old or new network config");
  133. span->SetStatus(opentelemetry::trace::StatusCode::kError, "No old or new config");
  134. return false;
  135. }
  136. if (oldConfig.is_object() && newConfig.is_object()) {
  137. // network modification
  138. std::string nwid = oldConfig["id"].get<std::string>();
  139. span->SetAttribute("action", "network_change");
  140. span->SetAttribute("network_id", nwid);
  141. _db->save(newConfig, _db->isReady());
  142. }
  143. else if (newConfig.is_object() && ! oldConfig.is_object()) {
  144. // new network
  145. std::string nwid = newConfig["id"];
  146. span->SetAttribute("network_id", nwid);
  147. span->SetAttribute("action", "new_network");
  148. _db->save(newConfig, _db->isReady());
  149. }
  150. else if (! newConfig.is_object() && oldConfig.is_object()) {
  151. // network deletion
  152. std::string nwid = oldConfig["id"];
  153. span->SetAttribute("action", "delete_network");
  154. span->SetAttribute("network_id", nwid);
  155. uint64_t networkId = Utils::hexStrToU64(nwid.c_str());
  156. if (networkId) {
  157. _db->eraseNetwork(networkId);
  158. }
  159. }
  160. }
  161. catch (const nlohmann::json::parse_error& e) {
  162. fprintf(stderr, "PubSubNetworkListener JSON parse error: %s\n", e.what());
  163. span->SetAttribute("error", e.what());
  164. span->SetStatus(opentelemetry::trace::StatusCode::kError, e.what());
  165. fprintf(stderr, "payload: %s\n", payload.c_str());
  166. return false;
  167. }
  168. catch (const std::exception& e) {
  169. fprintf(stderr, "PubSubNetworkListener Exception in PubSubNetworkListener: %s\n", e.what());
  170. span->SetAttribute("error", e.what());
  171. span->SetStatus(opentelemetry::trace::StatusCode::kError, e.what());
  172. return false;
  173. }
  174. catch (...) {
  175. fprintf(stderr, "PubSubNetworkListener Unknown exception in PubSubNetworkListener\n");
  176. span->SetAttribute("error", "Unknown exception in PubSubNetworkListener");
  177. span->SetStatus(opentelemetry::trace::StatusCode::kError, "Unknown exception");
  178. return false;
  179. }
  180. fprintf(stderr, "PubSubNetworkListener onNotification complete\n");
  181. return true;
  182. }
  183. PubSubMemberListener::PubSubMemberListener(std::string controller_id, std::string project, std::string topic, DB* db)
  184. : PubSubListener(controller_id, project, topic)
  185. , _db(db)
  186. {
  187. }
  188. PubSubMemberListener::~PubSubMemberListener()
  189. {
  190. }
  191. bool PubSubMemberListener::onNotification(const std::string& payload)
  192. {
  193. auto provider = opentelemetry::trace::Provider::GetTracerProvider();
  194. auto tracer = provider->GetTracer("PubSubMemberListener");
  195. auto span = tracer->StartSpan("PubSubMemberListener::onNotification");
  196. auto scope = tracer->WithActiveSpan(span);
  197. pbmessages::MemberChange mc;
  198. if (! mc.ParseFromString(payload)) {
  199. fprintf(stderr, "Failed to parse MemberChange protobuf message\n");
  200. span->SetAttribute("error", "Failed to parse MemberChange protobuf message");
  201. span->SetStatus(opentelemetry::trace::StatusCode::kError, "Failed to parse protobuf");
  202. return false;
  203. }
  204. fprintf(stderr, "PubSubMemberListener: parsed protobuf message. %s\n", mc.DebugString().c_str());
  205. fprintf(stderr, "Member notification received");
  206. try {
  207. nlohmann::json tmp;
  208. nlohmann::json oldConfig, newConfig;
  209. if (mc.has_old()) {
  210. fprintf(stderr, "has old member config\n");
  211. oldConfig = toJson(mc.old(), mc.change_source());
  212. }
  213. if (mc.has_new_()) {
  214. fprintf(stderr, "has new member config\n");
  215. newConfig = toJson(mc.new_(), mc.change_source());
  216. }
  217. if (! mc.has_old() && ! mc.has_new_()) {
  218. fprintf(stderr, "MemberChange message has no old or new member config\n");
  219. span->SetAttribute("error", "MemberChange message has no old or new member config");
  220. span->SetStatus(opentelemetry::trace::StatusCode::kError, "No old or new config");
  221. return false;
  222. }
  223. if (oldConfig.is_object() && newConfig.is_object()) {
  224. // member modification
  225. std::string memberID = oldConfig["id"].get<std::string>();
  226. std::string networkID = oldConfig["nwid"].get<std::string>();
  227. span->SetAttribute("action", "member_change");
  228. span->SetAttribute("member_id", memberID);
  229. span->SetAttribute("network_id", networkID);
  230. _db->save(newConfig, _db->isReady());
  231. }
  232. else if (newConfig.is_object() && ! oldConfig.is_object()) {
  233. // new member
  234. std::string memberID = newConfig["id"].get<std::string>();
  235. std::string networkID = newConfig["nwid"].get<std::string>();
  236. span->SetAttribute("action", "new_member");
  237. span->SetAttribute("member_id", memberID);
  238. span->SetAttribute("network_id", networkID);
  239. _db->save(newConfig, _db->isReady());
  240. }
  241. else if (! newConfig.is_object() && oldConfig.is_object()) {
  242. // member deletion
  243. std::string memberID = oldConfig["id"].get<std::string>();
  244. std::string networkID = oldConfig["nwid"].get<std::string>();
  245. span->SetAttribute("action", "delete_member");
  246. span->SetAttribute("member_id", memberID);
  247. span->SetAttribute("network_id", networkID);
  248. uint64_t networkId = Utils::hexStrToU64(networkID.c_str());
  249. uint64_t memberId = Utils::hexStrToU64(memberID.c_str());
  250. if (networkId && memberId) {
  251. _db->eraseMember(networkId, memberId);
  252. }
  253. }
  254. }
  255. catch (const nlohmann::json::parse_error& e) {
  256. fprintf(stderr, "PubSubMemberListener JSON parse error: %s\n", e.what());
  257. span->SetAttribute("error", e.what());
  258. span->SetStatus(opentelemetry::trace::StatusCode::kError, e.what());
  259. fprintf(stderr, "payload: %s\n", payload.c_str());
  260. return false;
  261. }
  262. catch (const std::exception& e) {
  263. fprintf(stderr, "PubSubMemberListener Exception in PubSubMemberListener: %s\n", e.what());
  264. span->SetAttribute("error", e.what());
  265. span->SetStatus(opentelemetry::trace::StatusCode::kError, e.what());
  266. return false;
  267. }
  268. return true;
  269. }
  270. nlohmann::json toJson(const pbmessages::NetworkChange_Network& nc, pbmessages::NetworkChange_ChangeSource source)
  271. {
  272. nlohmann::json out;
  273. out["objtype"] = "network";
  274. out["id"] = nc.network_id();
  275. out["name"] = nc.name();
  276. try {
  277. std::string caps = nc.capabilities();
  278. if (caps.length() == 0) {
  279. out["capabilities"] = nlohmann::json::array();
  280. }
  281. else if (caps == "null") {
  282. out["capabilities"] = nlohmann::json::array();
  283. }
  284. else {
  285. out["capabilities"] = OSUtils::jsonParse(caps);
  286. }
  287. }
  288. catch (const nlohmann::json::parse_error& e) {
  289. fprintf(stderr, "toJson Network capabilities JSON parse error: %s\n", e.what());
  290. out["capabilities"] = nlohmann::json::array();
  291. }
  292. out["mtu"] = nc.mtu();
  293. out["multicastLimit"] = nc.multicast_limit();
  294. out["private"] = nc.is_private();
  295. out["remoteTraceLevel"] = nc.remote_trace_level();
  296. if (nc.has_remote_trace_target()) {
  297. out["remoteTraceTarget"] = nc.remote_trace_target();
  298. }
  299. else {
  300. out["remoteTraceTarget"] = "";
  301. }
  302. try {
  303. std::string rules = nc.rules();
  304. if (rules.length() == 0) {
  305. out["rules"] = nlohmann::json::array();
  306. }
  307. else if (rules == "null") {
  308. out["rules"] = nlohmann::json::array();
  309. }
  310. else {
  311. out["rules"] = OSUtils::jsonParse(rules);
  312. }
  313. }
  314. catch (const nlohmann::json::parse_error& e) {
  315. fprintf(stderr, "toJson Network rules JSON parse error: %s\n", e.what());
  316. out["rules"] = nlohmann::json::array();
  317. }
  318. out["rulesSource"] = nc.rules_source();
  319. try {
  320. std::string tags = nc.tags();
  321. if (tags.length() == 0) {
  322. out["tags"] = nlohmann::json::array();
  323. }
  324. else if (tags == "[]") {
  325. out["tags"] = nlohmann::json::array();
  326. }
  327. else {
  328. out["tags"] = OSUtils::jsonParse(tags);
  329. }
  330. }
  331. catch (const nlohmann::json::parse_error& e) {
  332. fprintf(stderr, "toJson Network tags JSON parse error: %s\n", e.what());
  333. out["tags"] = nlohmann::json::array();
  334. }
  335. if (nc.has_ipv4_assign_mode()) {
  336. nlohmann::json ipv4mode;
  337. ipv4mode["zt"] = nc.ipv4_assign_mode().zt();
  338. out["v4AssignMode"] = ipv4mode;
  339. }
  340. else {
  341. nlohmann::json ipv4mode = nlohmann::json::object();
  342. out["zt"] = false;
  343. out["v4AssignMode"] = ipv4mode;
  344. }
  345. if (nc.has_ipv6_assign_mode()) {
  346. nlohmann::json ipv6mode;
  347. ipv6mode["6plane"] = nc.ipv6_assign_mode().six_plane();
  348. ipv6mode["rfc4193"] = nc.ipv6_assign_mode().rfc4193();
  349. ipv6mode["zt"] = nc.ipv6_assign_mode().zt();
  350. out["v6AssignMode"] = ipv6mode;
  351. }
  352. else {
  353. nlohmann::json ipv6mode = nlohmann::json::object();
  354. ipv6mode["6plane"] = false;
  355. ipv6mode["rfc4193"] = false;
  356. ipv6mode["zt"] = false;
  357. out["v6AssignMode"] = ipv6mode;
  358. }
  359. if (nc.assignment_pools_size() > 0) {
  360. nlohmann::json pools = nlohmann::json::array();
  361. for (const auto& p : nc.assignment_pools()) {
  362. nlohmann::json pool;
  363. pool["ipRangeStart"] = p.start_ip();
  364. pool["ipRangeEnd"] = p.end_ip();
  365. pools.push_back(pool);
  366. }
  367. out["ipAssignmentPools"] = pools;
  368. }
  369. else {
  370. out["ipAssignmentPools"] = nlohmann::json::array();
  371. }
  372. if (nc.routes_size() > 0) {
  373. nlohmann::json routes = nlohmann::json::array();
  374. for (const auto& r : nc.routes()) {
  375. nlohmann::json route;
  376. std::string target = r.target();
  377. if (target.length() > 0) {
  378. route["target"] = r.target();
  379. if (r.has_via()) {
  380. route["via"] = r.via();
  381. }
  382. else {
  383. route["via"] = nullptr;
  384. }
  385. routes.push_back(route);
  386. }
  387. }
  388. out["routes"] = routes;
  389. }
  390. if (nc.has_dns()) {
  391. nlohmann::json dns;
  392. if (nc.dns().nameservers_size() > 0) {
  393. nlohmann::json servers = nlohmann::json::array();
  394. for (const auto& s : nc.dns().nameservers()) {
  395. servers.push_back(s);
  396. }
  397. dns["servers"] = servers;
  398. }
  399. else {
  400. dns["servers"] = nlohmann::json::array();
  401. }
  402. dns["domain"] = nc.dns().domain();
  403. out["dns"] = dns;
  404. }
  405. out["ssoEnabled"] = nc.sso_enabled();
  406. nlohmann::json sso;
  407. if (nc.sso_enabled()) {
  408. sso = nlohmann::json::object();
  409. if (nc.has_sso_client_id()) {
  410. sso["ssoClientId"] = nc.sso_client_id();
  411. }
  412. if (nc.has_sso_authorization_endpoint()) {
  413. sso["ssoAuthorizationEndpoint"] = nc.sso_authorization_endpoint();
  414. }
  415. if (nc.has_sso_issuer()) {
  416. sso["ssoIssuer"] = nc.sso_issuer();
  417. }
  418. if (nc.has_sso_provider()) {
  419. sso["ssoProvider"] = nc.sso_provider();
  420. }
  421. }
  422. out["ssoConfig"] = sso;
  423. switch (source) {
  424. case pbmessages::NetworkChange_ChangeSource_CV1:
  425. out["change_source"] = "cv1";
  426. break;
  427. case pbmessages::NetworkChange_ChangeSource_CV2:
  428. out["change_source"] = "cv2";
  429. break;
  430. case pbmessages::NetworkChange_ChangeSource_CONTROLLER:
  431. out["change_source"] = "controller";
  432. break;
  433. default:
  434. out["change_source"] = "unknown";
  435. break;
  436. }
  437. return out;
  438. }
  439. nlohmann::json toJson(const pbmessages::MemberChange_Member& mc, pbmessages::MemberChange_ChangeSource source)
  440. {
  441. nlohmann::json out;
  442. out["objtype"] = "member";
  443. out["id"] = mc.device_id();
  444. out["nwid"] = mc.network_id();
  445. if (mc.has_remote_trace_target()) {
  446. out["remoteTraceTarget"] = mc.remote_trace_target();
  447. }
  448. else {
  449. out["remoteTraceTarget"] = "";
  450. }
  451. out["authorized"] = mc.authorized();
  452. out["activeBridge"] = mc.active_bridge();
  453. auto ipAssignments = mc.ip_assignments();
  454. if (ipAssignments.size() > 0) {
  455. nlohmann::json assignments = nlohmann::json::array();
  456. for (const auto& ip : ipAssignments) {
  457. assignments.push_back(ip);
  458. }
  459. out["ipAssignments"] = assignments;
  460. }
  461. out["noAutoAssignIps"] = mc.no_auto_assign_ips();
  462. out["ssoExempt"] = mc.sso_exempt();
  463. out["authenticationExpiryTime"] = mc.auth_expiry_time();
  464. try {
  465. std::string caps = mc.capabilities();
  466. if (caps.length() == 0) {
  467. out["capabilities"] = nlohmann::json::array();
  468. }
  469. else if (caps == "null") {
  470. out["capabilities"] = nlohmann::json::array();
  471. }
  472. else {
  473. out["capabilities"] = OSUtils::jsonParse(caps);
  474. }
  475. }
  476. catch (const nlohmann::json::parse_error& e) {
  477. fprintf(stderr, "MemberChange member capabilities JSON parse error: %s\n", e.what());
  478. fprintf(stderr, "capabilities: %s\n", mc.capabilities().c_str());
  479. out["capabilities"] = nlohmann::json::array();
  480. }
  481. out["creationTime"] = mc.creation_time();
  482. out["identity"] = mc.identity();
  483. out["lastAuthorizedTime"] = mc.last_authorized_time();
  484. out["lastDeauthorizedTime"] = mc.last_deauthorized_time();
  485. out["remoteTraceLevel"] = mc.remote_trace_level();
  486. out["revision"] = mc.revision();
  487. try {
  488. std::string tags = mc.tags();
  489. if (tags.length() == 0) {
  490. out["tags"] = nlohmann::json::array();
  491. }
  492. else if (tags == "null") {
  493. out["tags"] = nlohmann::json::array();
  494. }
  495. else {
  496. out["tags"] = OSUtils::jsonParse(tags);
  497. }
  498. }
  499. catch (const nlohmann::json::parse_error& e) {
  500. fprintf(stderr, "MemberChange member tags JSON parse error: %s\n", e.what());
  501. fprintf(stderr, "tags: %s\n", mc.tags().c_str());
  502. out["tags"] = nlohmann::json::array();
  503. }
  504. out["versionMajor"] = mc.version_major();
  505. out["versionMinor"] = mc.version_minor();
  506. out["versionRev"] = mc.version_rev();
  507. out["versionProtocol"] = mc.version_protocol();
  508. switch (source) {
  509. case pbmessages::MemberChange_ChangeSource_CV1:
  510. out["change_source"] = "cv1";
  511. break;
  512. case pbmessages::MemberChange_ChangeSource_CV2:
  513. out["change_source"] = "cv2";
  514. break;
  515. case pbmessages::MemberChange_ChangeSource_CONTROLLER:
  516. out["change_source"] = "controller";
  517. break;
  518. default:
  519. out["change_source"] = "unknown";
  520. break;
  521. }
  522. return out;
  523. }
  524. } // namespace ZeroTier
  525. #endif // ZT_CONTROLLER_USE_LIBPQ