2
0

PubSubListener.cpp 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437
  1. #ifdef ZT_CONTROLLER_USE_LIBPQ
  2. #include "PubSubListener.hpp"
  3. #include "ControllerConfig.hpp"
  4. #include "DB.hpp"
  5. #include "member.pb.h"
  6. #include "network.pb.h"
  7. #include "opentelemetry/trace/provider.h"
  8. #include "rustybits.h"
  9. #include <google/cloud/pubsub/admin/subscription_admin_client.h>
  10. #include <google/cloud/pubsub/admin/subscription_admin_connection.h>
  11. #include <google/cloud/pubsub/admin/topic_admin_client.h>
  12. #include <google/cloud/pubsub/message.h>
  13. #include <google/cloud/pubsub/subscriber.h>
  14. #include <google/cloud/pubsub/subscription.h>
  15. #include <google/cloud/pubsub/topic.h>
  16. #include <nlohmann/json.hpp>
  17. namespace pubsub = ::google::cloud::pubsub;
  18. namespace pubsub_admin = ::google::cloud::pubsub_admin;
  19. namespace ZeroTier {
  20. nlohmann::json toJson(const pbmessages::NetworkChange_Network& nc);
  21. nlohmann::json toJson(const pbmessages::MemberChange_Member& mc);
  22. PubSubListener::PubSubListener(std::string controller_id, std::string project, std::string topic)
  23. : _controller_id(controller_id)
  24. , _project(project)
  25. , _topic(topic)
  26. , _subscription_id("sub-" + controller_id + "-network-changes")
  27. , _run(false)
  28. , _adminClient(pubsub_admin::MakeSubscriptionAdminConnection())
  29. , _subscription(pubsub::Subscription(_project, _subscription_id))
  30. {
  31. GOOGLE_PROTOBUF_VERIFY_VERSION;
  32. // Create Topic if it doesn't exist
  33. // this is only really needed for testing with the emulator
  34. // in production the topic should be created via terraform or gcloud
  35. // before starting the controller
  36. auto topicAdminClient = pubsub_admin::TopicAdminClient(pubsub_admin::MakeTopicAdminConnection());
  37. auto topicName = pubsub::Topic(project, topic).FullName();
  38. auto topicResult = topicAdminClient.GetTopic(topicName);
  39. if (! topicResult.ok()) {
  40. // Only create if not found
  41. if (topicResult.status().code() == google::cloud::StatusCode::kNotFound) {
  42. auto createResult = topicAdminClient.CreateTopic(topicName);
  43. if (! createResult.ok()) {
  44. fprintf(stderr, "Failed to create topic: %s\n", createResult.status().message().c_str());
  45. throw std::runtime_error("Failed to create topic");
  46. }
  47. fprintf(stderr, "Created topic: %s\n", topicName.c_str());
  48. }
  49. else {
  50. fprintf(stderr, "Failed to get topic: %s\n", topicResult.status().message().c_str());
  51. throw std::runtime_error("Failed to get topic");
  52. }
  53. }
  54. google::pubsub::v1::Subscription request;
  55. request.set_name(_subscription.FullName());
  56. request.set_topic(pubsub::Topic(project, topic).FullName());
  57. request.set_filter("(attributes.controller_id=\"" + _controller_id + "\")");
  58. auto sub = _adminClient.CreateSubscription(request);
  59. if (! sub.ok()) {
  60. fprintf(stderr, "Failed to create subscription: %s\n", sub.status().message().c_str());
  61. throw std::runtime_error("Failed to create subscription");
  62. }
  63. if (sub.status().code() == google::cloud::StatusCode::kAlreadyExists) {
  64. fprintf(stderr, "Subscription already exists\n");
  65. throw std::runtime_error("Subscription already exists");
  66. }
  67. _subscriber = std::make_shared<pubsub::Subscriber>(pubsub::MakeSubscriberConnection(_subscription));
  68. _run = true;
  69. _subscriberThread = std::thread(&PubSubListener::subscribe, this);
  70. }
  71. PubSubListener::~PubSubListener()
  72. {
  73. _run = false;
  74. if (_subscriberThread.joinable()) {
  75. _subscriberThread.join();
  76. }
  77. }
  78. void PubSubListener::subscribe()
  79. {
  80. while (_run) {
  81. try {
  82. fprintf(stderr, "Starting new subscription session\n");
  83. auto session = _subscriber->Subscribe([this](pubsub::Message const& m, pubsub::AckHandler h) {
  84. auto provider = opentelemetry::trace::Provider::GetTracerProvider();
  85. auto tracer = provider->GetTracer("PubSubListener");
  86. auto span = tracer->StartSpan("PubSubListener::onMessage");
  87. auto scope = tracer->WithActiveSpan(span);
  88. span->SetAttribute("message_id", m.message_id());
  89. span->SetAttribute("ordering_key", m.ordering_key());
  90. span->SetAttribute("attributes", m.attributes().size());
  91. fprintf(stderr, "Received message %s\n", m.message_id().c_str());
  92. onNotification(m.data());
  93. std::move(h).ack();
  94. span->SetStatus(opentelemetry::trace::StatusCode::kOk);
  95. return true;
  96. });
  97. auto result = session.wait_for(std::chrono::seconds(10));
  98. if (result == std::future_status::timeout) {
  99. session.cancel();
  100. std::this_thread::sleep_for(std::chrono::seconds(5));
  101. continue;
  102. }
  103. if (! session.valid()) {
  104. fprintf(stderr, "Subscription session no longer valid\n");
  105. std::this_thread::sleep_for(std::chrono::seconds(5));
  106. continue;
  107. }
  108. }
  109. catch (google::cloud::Status const& status) {
  110. fprintf(stderr, "Subscription terminated with status: %s\n", status.message().c_str());
  111. std::this_thread::sleep_for(std::chrono::seconds(5));
  112. }
  113. }
  114. }
  115. PubSubNetworkListener::PubSubNetworkListener(std::string controller_id, std::string project, DB* db)
  116. : PubSubListener(controller_id, project, "controller-network-change-stream")
  117. , _db(db)
  118. {
  119. }
  120. PubSubNetworkListener::~PubSubNetworkListener()
  121. {
  122. }
  123. void PubSubNetworkListener::onNotification(const std::string& payload)
  124. {
  125. auto provider = opentelemetry::trace::Provider::GetTracerProvider();
  126. auto tracer = provider->GetTracer("PubSubNetworkListener");
  127. auto span = tracer->StartSpan("PubSubNetworkListener::onNotification");
  128. auto scope = tracer->WithActiveSpan(span);
  129. pbmessages::NetworkChange nc;
  130. if (! nc.ParseFromString(payload)) {
  131. fprintf(stderr, "Failed to parse NetworkChange protobuf message\n");
  132. span->SetAttribute("error", "Failed to parse NetworkChange protobuf message");
  133. span->SetStatus(opentelemetry::trace::StatusCode::kError, "Failed to parse protobuf");
  134. return;
  135. }
  136. fprintf(stderr, "Network notification received");
  137. try {
  138. nlohmann::json oldConfig, newConfig;
  139. if (nc.has_old()) {
  140. oldConfig = toJson(nc.old());
  141. }
  142. if (nc.has_new_()) {
  143. newConfig = toJson(nc.new_());
  144. }
  145. if (oldConfig.is_object() && newConfig.is_object()) {
  146. // network modification
  147. std::string nwid = oldConfig["id"].get<std::string>();
  148. span->SetAttribute("action", "network_change");
  149. span->SetAttribute("network_id", nwid);
  150. _db->save(newConfig, _db->isReady());
  151. }
  152. else if (newConfig.is_object() && ! oldConfig.is_object()) {
  153. // new network
  154. std::string nwid = newConfig["id"];
  155. span->SetAttribute("network_id", nwid);
  156. span->SetAttribute("action", "new_network");
  157. _db->save(newConfig, _db->isReady());
  158. }
  159. else if (! newConfig.is_object() && oldConfig.is_object()) {
  160. // network deletion
  161. std::string nwid = oldConfig["id"];
  162. span->SetAttribute("action", "delete_network");
  163. span->SetAttribute("network_id", nwid);
  164. uint64_t networkId = Utils::hexStrToU64(nwid.c_str());
  165. if (networkId) {
  166. _db->eraseNetwork(networkId);
  167. }
  168. }
  169. }
  170. catch (const nlohmann::json::parse_error& e) {
  171. fprintf(stderr, "JSON parse error: %s\n", e.what());
  172. span->SetAttribute("error", e.what());
  173. span->SetStatus(opentelemetry::trace::StatusCode::kError, e.what());
  174. return;
  175. }
  176. catch (const std::exception& e) {
  177. fprintf(stderr, "Exception in PubSubNetworkListener: %s\n", e.what());
  178. span->SetAttribute("error", e.what());
  179. span->SetStatus(opentelemetry::trace::StatusCode::kError, e.what());
  180. return;
  181. }
  182. }
  183. PubSubMemberListener::PubSubMemberListener(std::string controller_id, std::string project, DB* db)
  184. : PubSubListener(controller_id, project, "controller-member-change-stream")
  185. , _db(db)
  186. {
  187. }
  188. PubSubMemberListener::~PubSubMemberListener()
  189. {
  190. }
  191. void PubSubMemberListener::onNotification(const std::string& payload)
  192. {
  193. auto provider = opentelemetry::trace::Provider::GetTracerProvider();
  194. auto tracer = provider->GetTracer("PubSubMemberListener");
  195. auto span = tracer->StartSpan("PubSubMemberListener::onNotification");
  196. auto scope = tracer->WithActiveSpan(span);
  197. pbmessages::MemberChange mc;
  198. if (! mc.ParseFromString(payload)) {
  199. fprintf(stderr, "Failed to parse MemberChange protobuf message\n");
  200. span->SetAttribute("error", "Failed to parse MemberChange protobuf message");
  201. span->SetStatus(opentelemetry::trace::StatusCode::kError, "Failed to parse protobuf");
  202. return;
  203. }
  204. fprintf(stderr, "Member notification received");
  205. try {
  206. nlohmann::json tmp;
  207. nlohmann::json oldConfig, newConfig;
  208. if (mc.has_old()) {
  209. oldConfig = toJson(mc.old());
  210. }
  211. if (mc.has_new_()) {
  212. newConfig = toJson(mc.new_());
  213. }
  214. if (oldConfig.is_object() && newConfig.is_object()) {
  215. // member modification
  216. std::string memberID = oldConfig["id"].get<std::string>();
  217. std::string networkID = oldConfig["nwid"].get<std::string>();
  218. span->SetAttribute("action", "member_change");
  219. span->SetAttribute("member_id", memberID);
  220. span->SetAttribute("network_id", networkID);
  221. _db->save(newConfig, _db->isReady());
  222. }
  223. else if (newConfig.is_object() && ! oldConfig.is_object()) {
  224. // new member
  225. std::string memberID = newConfig["id"].get<std::string>();
  226. std::string networkID = newConfig["nwid"].get<std::string>();
  227. span->SetAttribute("action", "new_member");
  228. span->SetAttribute("member_id", memberID);
  229. span->SetAttribute("network_id", networkID);
  230. _db->save(newConfig, _db->isReady());
  231. }
  232. else if (! newConfig.is_object() && oldConfig.is_object()) {
  233. // member deletion
  234. std::string memberID = oldConfig["id"].get<std::string>();
  235. std::string networkID = oldConfig["nwid"].get<std::string>();
  236. span->SetAttribute("action", "delete_member");
  237. span->SetAttribute("member_id", memberID);
  238. span->SetAttribute("network_id", networkID);
  239. uint64_t networkId = Utils::hexStrToU64(networkID.c_str());
  240. uint64_t memberId = Utils::hexStrToU64(memberID.c_str());
  241. if (networkId && memberId) {
  242. _db->eraseMember(networkId, memberId);
  243. }
  244. }
  245. }
  246. catch (const nlohmann::json::parse_error& e) {
  247. fprintf(stderr, "JSON parse error: %s\n", e.what());
  248. span->SetAttribute("error", e.what());
  249. span->SetStatus(opentelemetry::trace::StatusCode::kError, e.what());
  250. return;
  251. }
  252. catch (const std::exception& e) {
  253. fprintf(stderr, "Exception in PubSubMemberListener: %s\n", e.what());
  254. span->SetAttribute("error", e.what());
  255. span->SetStatus(opentelemetry::trace::StatusCode::kError, e.what());
  256. return;
  257. }
  258. }
  259. nlohmann::json toJson(const pbmessages::NetworkChange_Network& nc)
  260. {
  261. nlohmann::json out;
  262. out["id"] = nc.network_id();
  263. out["name"] = nc.name();
  264. out["capabilities"] = OSUtils::jsonParse(nc.capabilities());
  265. out["mtu"] = nc.mtu();
  266. out["multicastLimit"] = nc.multicast_limit();
  267. out["private"] = nc.is_private();
  268. out["remoteTraceLevel"] = nc.remote_trace_level();
  269. if (nc.has_remote_trace_target()) {
  270. out["remoteTraceTarget"] = nc.remote_trace_target();
  271. }
  272. else {
  273. out["remoteTraceTarget"] = "";
  274. }
  275. out["rules"] = OSUtils::jsonParse(nc.rules());
  276. out["rulesSource"] = nc.rules_source();
  277. out["tags"] = OSUtils::jsonParse(nc.tags());
  278. if (nc.has_ipv4_assign_mode()) {
  279. nlohmann::json ipv4mode;
  280. ipv4mode["zt"] = nc.ipv4_assign_mode().zt();
  281. out["ipv4AssignMode"] = ipv4mode;
  282. }
  283. if (nc.has_ipv6_assign_mode()) {
  284. nlohmann::json ipv6mode;
  285. ipv6mode["6plane"] = nc.ipv6_assign_mode().six_plane();
  286. ipv6mode["rfc4193"] = nc.ipv6_assign_mode().rfc4193();
  287. ipv6mode["zt"] = nc.ipv6_assign_mode().zt();
  288. out["ipv6AssignMode"] = ipv6mode;
  289. }
  290. if (nc.assignment_pools_size() > 0) {
  291. nlohmann::json pools = nlohmann::json::array();
  292. for (const auto& p : nc.assignment_pools()) {
  293. nlohmann::json pool;
  294. pool["ipRangeStart"] = p.start_ip();
  295. pool["ipRangeEnd"] = p.end_ip();
  296. pools.push_back(pool);
  297. }
  298. out["assignmentPools"] = pools;
  299. }
  300. if (nc.routes_size() > 0) {
  301. nlohmann::json routes = nlohmann::json::array();
  302. for (const auto& r : nc.routes()) {
  303. nlohmann::json route;
  304. route["target"] = r.target();
  305. if (r.has_via()) {
  306. route["via"] = r.via();
  307. }
  308. routes.push_back(route);
  309. }
  310. out["routes"] = routes;
  311. }
  312. if (nc.has_dns()) {
  313. nlohmann::json dns;
  314. if (nc.dns().nameservers_size() > 0) {
  315. nlohmann::json servers = nlohmann::json::array();
  316. for (const auto& s : nc.dns().nameservers()) {
  317. servers.push_back(s);
  318. }
  319. dns["servers"] = servers;
  320. }
  321. dns["domain"] = nc.dns().domain();
  322. out["dns"] = dns;
  323. }
  324. out["ssoEnabled"] = nc.sso_enabled();
  325. nlohmann::json sso;
  326. if (nc.sso_enabled()) {
  327. sso = nlohmann::json::object();
  328. if (nc.has_sso_client_id()) {
  329. sso["ssoClientId"] = nc.sso_client_id();
  330. }
  331. if (nc.has_sso_authorization_endpoint()) {
  332. sso["ssoAuthorizationEndpoint"] = nc.sso_authorization_endpoint();
  333. }
  334. if (nc.has_sso_issuer()) {
  335. sso["ssoIssuer"] = nc.sso_issuer();
  336. }
  337. if (nc.has_sso_provider()) {
  338. sso["ssoProvider"] = nc.sso_provider();
  339. }
  340. }
  341. out["ssoConfig"] = sso;
  342. return out;
  343. }
  344. nlohmann::json toJson(const pbmessages::MemberChange_Member& mc)
  345. {
  346. nlohmann::json out;
  347. out["id"] = mc.device_id();
  348. out["nwid"] = mc.network_id();
  349. if (mc.has_remote_trace_target()) {
  350. out["remoteTraceTarget"] = mc.remote_trace_target();
  351. }
  352. else {
  353. out["remoteTraceTarget"] = "";
  354. }
  355. out["authorized"] = mc.authorized();
  356. out["activeBridge"] = mc.active_bridge();
  357. auto ipAssignments = mc.ip_assignments();
  358. if (ipAssignments.size() > 0) {
  359. nlohmann::json assignments = nlohmann::json::array();
  360. for (const auto& ip : ipAssignments) {
  361. assignments.push_back(ip);
  362. }
  363. out["ipAssignments"] = assignments;
  364. }
  365. out["noAutoAssignIps"] = mc.no_auto_assign_ips();
  366. out["ssoExempt"] = mc.sso_exepmt();
  367. out["authenticationExpiryTime"] = mc.auth_expiry_time();
  368. out["capabilities"] = OSUtils::jsonParse(mc.capabilities());
  369. out["creationTime"] = mc.creation_time();
  370. out["identity"] = mc.identity();
  371. out["lastAuthorizedTime"] = mc.last_authorized_time();
  372. out["lastDeauthorizedTime"] = mc.last_deauthorized_time();
  373. out["remoteTraceLevel"] = mc.remote_trace_level();
  374. out["revision"] = mc.revision();
  375. out["tags"] = OSUtils::jsonParse(mc.tags());
  376. out["versionMajor"] = mc.version_major();
  377. out["versionMinor"] = mc.version_minor();
  378. out["versionRev"] = mc.version_rev();
  379. out["versionProtocol"] = mc.version_protocol();
  380. return out;
  381. }
  382. } // namespace ZeroTier
  383. #endif // ZT_CONTROLLER_USE_LIBPQ