PubSubListener.cpp 14 KB


  1. #ifdef ZT_CONTROLLER_USE_LIBPQ
  2. #include "PubSubListener.hpp"
  3. #include "ControllerConfig.hpp"
  4. #include "CtlUtil.hpp"
  5. #include "DB.hpp"
  6. #include "member.pb.h"
  7. #include "network.pb.h"
  8. #include "opentelemetry/trace/provider.h"
  9. #include "rustybits.h"
  10. #include <google/cloud/pubsub/admin/subscription_admin_client.h>
  11. #include <google/cloud/pubsub/admin/subscription_admin_connection.h>
  12. #include <google/cloud/pubsub/admin/topic_admin_client.h>
  13. #include <google/cloud/pubsub/message.h>
  14. #include <google/cloud/pubsub/subscriber.h>
  15. #include <google/cloud/pubsub/subscription.h>
  16. #include <google/cloud/pubsub/topic.h>
  17. #include <nlohmann/json.hpp>
  18. namespace pubsub = ::google::cloud::pubsub;
  19. namespace pubsub_admin = ::google::cloud::pubsub_admin;
  20. namespace ZeroTier {
  21. nlohmann::json toJson(const pbmessages::NetworkChange_Network& nc, pbmessages::NetworkChange_ChangeSource source);
  22. nlohmann::json toJson(const pbmessages::MemberChange_Member& mc, pbmessages::MemberChange_ChangeSource source);
  23. PubSubListener::PubSubListener(std::string controller_id, std::string project, std::string topic)
  24. : _controller_id(controller_id)
  25. , _project(project)
  26. , _topic(topic)
  27. , _subscription_id("sub-" + controller_id + "-" + topic)
  28. , _run(false)
  29. , _adminClient(pubsub_admin::MakeSubscriptionAdminConnection())
  30. , _subscription(pubsub::Subscription(_project, _subscription_id))
  31. {
  32. fprintf(
  33. stderr, "PubSubListener for controller %s project %s topic %s subscription %s\n", controller_id.c_str(),
  34. project.c_str(), topic.c_str(), _subscription_id.c_str());
  35. GOOGLE_PROTOBUF_VERIFY_VERSION;
  36. // If PUBSUB_EMULATOR_HOST is set, create the topic if it doesn't exist
  37. const char* emulatorHost = std::getenv("PUBSUB_EMULATOR_HOST");
  38. if (emulatorHost != nullptr) {
  39. create_gcp_pubsub_topic_if_needed(project, topic);
  40. }
  41. google::pubsub::v1::Subscription request;
  42. request.set_name(_subscription.FullName());
  43. request.set_topic(pubsub::Topic(project, topic).FullName());
  44. request.set_filter("(attributes.controller_id=\"" + _controller_id + "\")");
  45. auto sub = _adminClient.CreateSubscription(request);
  46. if (! sub.ok()) {
  47. fprintf(stderr, "Failed to create subscription: %s\n", sub.status().message().c_str());
  48. throw std::runtime_error("Failed to create subscription");
  49. }
  50. if (sub.status().code() == google::cloud::StatusCode::kAlreadyExists) {
  51. fprintf(stderr, "Subscription already exists\n");
  52. throw std::runtime_error("Subscription already exists");
  53. }
  54. _subscriber = std::make_shared<pubsub::Subscriber>(pubsub::MakeSubscriberConnection(_subscription));
  55. _run = true;
  56. _subscriberThread = std::thread(&PubSubListener::subscribe, this);
  57. }
  58. PubSubListener::~PubSubListener()
  59. {
  60. _run = false;
  61. if (_subscriberThread.joinable()) {
  62. _subscriberThread.join();
  63. }
  64. }
  65. void PubSubListener::subscribe()
  66. {
  67. while (_run) {
  68. try {
  69. fprintf(stderr, "Starting new subscription session\n");
  70. auto session = _subscriber->Subscribe([this](pubsub::Message const& m, pubsub::AckHandler h) {
  71. auto provider = opentelemetry::trace::Provider::GetTracerProvider();
  72. auto tracer = provider->GetTracer("PubSubListener");
  73. auto span = tracer->StartSpan("PubSubListener::onMessage");
  74. auto scope = tracer->WithActiveSpan(span);
  75. span->SetAttribute("message_id", m.message_id());
  76. span->SetAttribute("ordering_key", m.ordering_key());
  77. span->SetAttribute("attributes", m.attributes().size());
  78. fprintf(stderr, "Received message %s\n", m.message_id().c_str());
  79. onNotification(m.data());
  80. std::move(h).ack();
  81. span->SetStatus(opentelemetry::trace::StatusCode::kOk);
  82. return true;
  83. });
  84. auto result = session.wait_for(std::chrono::seconds(10));
  85. if (result == std::future_status::timeout) {
  86. session.cancel();
  87. std::this_thread::sleep_for(std::chrono::seconds(5));
  88. continue;
  89. }
  90. if (! session.valid()) {
  91. fprintf(stderr, "Subscription session no longer valid\n");
  92. std::this_thread::sleep_for(std::chrono::seconds(5));
  93. continue;
  94. }
  95. }
  96. catch (google::cloud::Status const& status) {
  97. fprintf(stderr, "Subscription terminated with status: %s\n", status.message().c_str());
  98. std::this_thread::sleep_for(std::chrono::seconds(5));
  99. }
  100. }
  101. }
  102. PubSubNetworkListener::PubSubNetworkListener(std::string controller_id, std::string project, DB* db)
  103. : PubSubListener(controller_id, project, "controller-network-change-stream")
  104. , _db(db)
  105. {
  106. }
  107. PubSubNetworkListener::~PubSubNetworkListener()
  108. {
  109. }
  110. void PubSubNetworkListener::onNotification(const std::string& payload)
  111. {
  112. auto provider = opentelemetry::trace::Provider::GetTracerProvider();
  113. auto tracer = provider->GetTracer("PubSubNetworkListener");
  114. auto span = tracer->StartSpan("PubSubNetworkListener::onNotification");
  115. auto scope = tracer->WithActiveSpan(span);
  116. pbmessages::NetworkChange nc;
  117. if (! nc.ParseFromString(payload)) {
  118. fprintf(stderr, "Failed to parse NetworkChange protobuf message\n");
  119. span->SetAttribute("error", "Failed to parse NetworkChange protobuf message");
  120. span->SetStatus(opentelemetry::trace::StatusCode::kError, "Failed to parse protobuf");
  121. return;
  122. }
  123. fprintf(stderr, "Network notification received");
  124. try {
  125. nlohmann::json oldConfig, newConfig;
  126. if (nc.has_old()) {
  127. oldConfig = toJson(nc.old(), nc.change_source());
  128. }
  129. if (nc.has_new_()) {
  130. newConfig = toJson(nc.new_(), nc.change_source());
  131. }
  132. if (oldConfig.is_object() && newConfig.is_object()) {
  133. // network modification
  134. std::string nwid = oldConfig["id"].get<std::string>();
  135. span->SetAttribute("action", "network_change");
  136. span->SetAttribute("network_id", nwid);
  137. _db->save(newConfig, _db->isReady());
  138. }
  139. else if (newConfig.is_object() && ! oldConfig.is_object()) {
  140. // new network
  141. std::string nwid = newConfig["id"];
  142. span->SetAttribute("network_id", nwid);
  143. span->SetAttribute("action", "new_network");
  144. _db->save(newConfig, _db->isReady());
  145. }
  146. else if (! newConfig.is_object() && oldConfig.is_object()) {
  147. // network deletion
  148. std::string nwid = oldConfig["id"];
  149. span->SetAttribute("action", "delete_network");
  150. span->SetAttribute("network_id", nwid);
  151. uint64_t networkId = Utils::hexStrToU64(nwid.c_str());
  152. if (networkId) {
  153. _db->eraseNetwork(networkId);
  154. }
  155. }
  156. }
  157. catch (const nlohmann::json::parse_error& e) {
  158. fprintf(stderr, "JSON parse error: %s\n", e.what());
  159. span->SetAttribute("error", e.what());
  160. span->SetStatus(opentelemetry::trace::StatusCode::kError, e.what());
  161. return;
  162. }
  163. catch (const std::exception& e) {
  164. fprintf(stderr, "Exception in PubSubNetworkListener: %s\n", e.what());
  165. span->SetAttribute("error", e.what());
  166. span->SetStatus(opentelemetry::trace::StatusCode::kError, e.what());
  167. return;
  168. }
  169. }
  170. PubSubMemberListener::PubSubMemberListener(std::string controller_id, std::string project, DB* db)
  171. : PubSubListener(controller_id, project, "controller-member-change-stream")
  172. , _db(db)
  173. {
  174. }
  175. PubSubMemberListener::~PubSubMemberListener()
  176. {
  177. }
  178. void PubSubMemberListener::onNotification(const std::string& payload)
  179. {
  180. auto provider = opentelemetry::trace::Provider::GetTracerProvider();
  181. auto tracer = provider->GetTracer("PubSubMemberListener");
  182. auto span = tracer->StartSpan("PubSubMemberListener::onNotification");
  183. auto scope = tracer->WithActiveSpan(span);
  184. pbmessages::MemberChange mc;
  185. if (! mc.ParseFromString(payload)) {
  186. fprintf(stderr, "Failed to parse MemberChange protobuf message\n");
  187. span->SetAttribute("error", "Failed to parse MemberChange protobuf message");
  188. span->SetStatus(opentelemetry::trace::StatusCode::kError, "Failed to parse protobuf");
  189. return;
  190. }
  191. fprintf(stderr, "Member notification received");
  192. try {
  193. nlohmann::json tmp;
  194. nlohmann::json oldConfig, newConfig;
  195. if (mc.has_old()) {
  196. oldConfig = toJson(mc.old(), mc.change_source());
  197. }
  198. if (mc.has_new_()) {
  199. newConfig = toJson(mc.new_(), mc.change_source());
  200. }
  201. if (oldConfig.is_object() && newConfig.is_object()) {
  202. // member modification
  203. std::string memberID = oldConfig["id"].get<std::string>();
  204. std::string networkID = oldConfig["nwid"].get<std::string>();
  205. span->SetAttribute("action", "member_change");
  206. span->SetAttribute("member_id", memberID);
  207. span->SetAttribute("network_id", networkID);
  208. _db->save(newConfig, _db->isReady());
  209. }
  210. else if (newConfig.is_object() && ! oldConfig.is_object()) {
  211. // new member
  212. std::string memberID = newConfig["id"].get<std::string>();
  213. std::string networkID = newConfig["nwid"].get<std::string>();
  214. span->SetAttribute("action", "new_member");
  215. span->SetAttribute("member_id", memberID);
  216. span->SetAttribute("network_id", networkID);
  217. _db->save(newConfig, _db->isReady());
  218. }
  219. else if (! newConfig.is_object() && oldConfig.is_object()) {
  220. // member deletion
  221. std::string memberID = oldConfig["id"].get<std::string>();
  222. std::string networkID = oldConfig["nwid"].get<std::string>();
  223. span->SetAttribute("action", "delete_member");
  224. span->SetAttribute("member_id", memberID);
  225. span->SetAttribute("network_id", networkID);
  226. uint64_t networkId = Utils::hexStrToU64(networkID.c_str());
  227. uint64_t memberId = Utils::hexStrToU64(memberID.c_str());
  228. if (networkId && memberId) {
  229. _db->eraseMember(networkId, memberId);
  230. }
  231. }
  232. }
  233. catch (const nlohmann::json::parse_error& e) {
  234. fprintf(stderr, "JSON parse error: %s\n", e.what());
  235. span->SetAttribute("error", e.what());
  236. span->SetStatus(opentelemetry::trace::StatusCode::kError, e.what());
  237. return;
  238. }
  239. catch (const std::exception& e) {
  240. fprintf(stderr, "Exception in PubSubMemberListener: %s\n", e.what());
  241. span->SetAttribute("error", e.what());
  242. span->SetStatus(opentelemetry::trace::StatusCode::kError, e.what());
  243. return;
  244. }
  245. }
  246. nlohmann::json toJson(const pbmessages::NetworkChange_Network& nc, pbmessages::NetworkChange_ChangeSource source)
  247. {
  248. nlohmann::json out;
  249. out["id"] = nc.network_id();
  250. out["name"] = nc.name();
  251. out["capabilities"] = OSUtils::jsonParse(nc.capabilities());
  252. out["mtu"] = nc.mtu();
  253. out["multicastLimit"] = nc.multicast_limit();
  254. out["private"] = nc.is_private();
  255. out["remoteTraceLevel"] = nc.remote_trace_level();
  256. if (nc.has_remote_trace_target()) {
  257. out["remoteTraceTarget"] = nc.remote_trace_target();
  258. }
  259. else {
  260. out["remoteTraceTarget"] = "";
  261. }
  262. out["rules"] = OSUtils::jsonParse(nc.rules());
  263. out["rulesSource"] = nc.rules_source();
  264. out["tags"] = OSUtils::jsonParse(nc.tags());
  265. if (nc.has_ipv4_assign_mode()) {
  266. nlohmann::json ipv4mode;
  267. ipv4mode["zt"] = nc.ipv4_assign_mode().zt();
  268. out["ipv4AssignMode"] = ipv4mode;
  269. }
  270. if (nc.has_ipv6_assign_mode()) {
  271. nlohmann::json ipv6mode;
  272. ipv6mode["6plane"] = nc.ipv6_assign_mode().six_plane();
  273. ipv6mode["rfc4193"] = nc.ipv6_assign_mode().rfc4193();
  274. ipv6mode["zt"] = nc.ipv6_assign_mode().zt();
  275. out["ipv6AssignMode"] = ipv6mode;
  276. }
  277. if (nc.assignment_pools_size() > 0) {
  278. nlohmann::json pools = nlohmann::json::array();
  279. for (const auto& p : nc.assignment_pools()) {
  280. nlohmann::json pool;
  281. pool["ipRangeStart"] = p.start_ip();
  282. pool["ipRangeEnd"] = p.end_ip();
  283. pools.push_back(pool);
  284. }
  285. out["assignmentPools"] = pools;
  286. }
  287. if (nc.routes_size() > 0) {
  288. nlohmann::json routes = nlohmann::json::array();
  289. for (const auto& r : nc.routes()) {
  290. nlohmann::json route;
  291. route["target"] = r.target();
  292. if (r.has_via()) {
  293. route["via"] = r.via();
  294. }
  295. routes.push_back(route);
  296. }
  297. out["routes"] = routes;
  298. }
  299. if (nc.has_dns()) {
  300. nlohmann::json dns;
  301. if (nc.dns().nameservers_size() > 0) {
  302. nlohmann::json servers = nlohmann::json::array();
  303. for (const auto& s : nc.dns().nameservers()) {
  304. servers.push_back(s);
  305. }
  306. dns["servers"] = servers;
  307. }
  308. dns["domain"] = nc.dns().domain();
  309. out["dns"] = dns;
  310. }
  311. out["ssoEnabled"] = nc.sso_enabled();
  312. nlohmann::json sso;
  313. if (nc.sso_enabled()) {
  314. sso = nlohmann::json::object();
  315. if (nc.has_sso_client_id()) {
  316. sso["ssoClientId"] = nc.sso_client_id();
  317. }
  318. if (nc.has_sso_authorization_endpoint()) {
  319. sso["ssoAuthorizationEndpoint"] = nc.sso_authorization_endpoint();
  320. }
  321. if (nc.has_sso_issuer()) {
  322. sso["ssoIssuer"] = nc.sso_issuer();
  323. }
  324. if (nc.has_sso_provider()) {
  325. sso["ssoProvider"] = nc.sso_provider();
  326. }
  327. }
  328. out["ssoConfig"] = sso;
  329. switch (source) {
  330. case pbmessages::NetworkChange_ChangeSource_CV1:
  331. out["change_source"] = "cv1";
  332. break;
  333. case pbmessages::NetworkChange_ChangeSource_CV2:
  334. out["change_source"] = "cv2";
  335. break;
  336. case pbmessages::NetworkChange_ChangeSource_CONTROLLER:
  337. out["change_source"] = "controller";
  338. break;
  339. default:
  340. out["change_source"] = "unknown";
  341. break;
  342. }
  343. return out;
  344. }
  345. nlohmann::json toJson(const pbmessages::MemberChange_Member& mc, pbmessages::MemberChange_ChangeSource source)
  346. {
  347. nlohmann::json out;
  348. out["id"] = mc.device_id();
  349. out["nwid"] = mc.network_id();
  350. if (mc.has_remote_trace_target()) {
  351. out["remoteTraceTarget"] = mc.remote_trace_target();
  352. }
  353. else {
  354. out["remoteTraceTarget"] = "";
  355. }
  356. out["authorized"] = mc.authorized();
  357. out["activeBridge"] = mc.active_bridge();
  358. auto ipAssignments = mc.ip_assignments();
  359. if (ipAssignments.size() > 0) {
  360. nlohmann::json assignments = nlohmann::json::array();
  361. for (const auto& ip : ipAssignments) {
  362. assignments.push_back(ip);
  363. }
  364. out["ipAssignments"] = assignments;
  365. }
  366. out["noAutoAssignIps"] = mc.no_auto_assign_ips();
  367. out["ssoExempt"] = mc.sso_exepmt();
  368. out["authenticationExpiryTime"] = mc.auth_expiry_time();
  369. out["capabilities"] = OSUtils::jsonParse(mc.capabilities());
  370. out["creationTime"] = mc.creation_time();
  371. out["identity"] = mc.identity();
  372. out["lastAuthorizedTime"] = mc.last_authorized_time();
  373. out["lastDeauthorizedTime"] = mc.last_deauthorized_time();
  374. out["remoteTraceLevel"] = mc.remote_trace_level();
  375. out["revision"] = mc.revision();
  376. out["tags"] = OSUtils::jsonParse(mc.tags());
  377. out["versionMajor"] = mc.version_major();
  378. out["versionMinor"] = mc.version_minor();
  379. out["versionRev"] = mc.version_rev();
  380. out["versionProtocol"] = mc.version_protocol();
  381. switch (source) {
  382. case pbmessages::MemberChange_ChangeSource_CV1:
  383. out["change_source"] = "cv1";
  384. break;
  385. case pbmessages::MemberChange_ChangeSource_CV2:
  386. out["change_source"] = "cv2";
  387. break;
  388. case pbmessages::MemberChange_ChangeSource_CONTROLLER:
  389. out["change_source"] = "controller";
  390. break;
  391. default:
  392. out["change_source"] = "unknown";
  393. break;
  394. }
  395. return out;
  396. }
  397. } // namespace ZeroTier
  398. #endif // ZT_CONTROLLER_USE_LIBPQ