PubSubListener.cpp 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397
  1. #ifdef ZT_CONTROLLER_USE_LIBPQ
  2. #include "PubSubListener.hpp"
  3. #include "DB.hpp"
  4. #include "member.pb.h"
  5. #include "network.pb.h"
  6. #include "opentelemetry/trace/provider.h"
  7. #include "rustybits.h"
  8. #include <google/cloud/pubsub/admin/subscription_admin_client.h>
  9. #include <google/cloud/pubsub/admin/subscription_admin_connection.h>
  10. #include <google/cloud/pubsub/message.h>
  11. #include <google/cloud/pubsub/subscriber.h>
  12. #include <google/cloud/pubsub/subscription.h>
  13. #include <google/cloud/pubsub/topic.h>
  14. #include <nlohmann/json.hpp>
  15. namespace pubsub = ::google::cloud::pubsub;
  16. namespace pubsub_admin = ::google::cloud::pubsub_admin;
  17. namespace ZeroTier {
  18. nlohmann::json toJson(const pbmessages::NetworkChange_Network& nc);
  19. nlohmann::json toJson(const pbmessages::MemberChange_Member& mc);
  20. PubSubListener::PubSubListener(std::string controller_id, std::string project, std::string topic)
  21. : _controller_id(controller_id)
  22. , _project(project)
  23. , _topic(topic)
  24. , _subscription_id("sub-" + controller_id + "-network-changes")
  25. , _run(false)
  26. , _adminClient(pubsub_admin::MakeSubscriptionAdminConnection())
  27. , _subscription(pubsub::Subscription(_project, _subscription_id))
  28. {
  29. GOOGLE_PROTOBUF_VERIFY_VERSION;
  30. google::pubsub::v1::Subscription request;
  31. request.set_name(_subscription.FullName());
  32. request.set_topic(pubsub::Topic(project, topic).FullName());
  33. request.set_filter("(attributes.controller_id=\"" + _controller_id + "\")");
  34. auto sub = _adminClient.CreateSubscription(request);
  35. if (! sub.ok()) {
  36. fprintf(stderr, "Failed to create subscription: %s\n", sub.status().message().c_str());
  37. throw std::runtime_error("Failed to create subscription");
  38. }
  39. if (sub.status().code() == google::cloud::StatusCode::kAlreadyExists) {
  40. fprintf(stderr, "Subscription already exists\n");
  41. throw std::runtime_error("Subscription already exists");
  42. }
  43. _subscriber = std::make_shared<pubsub::Subscriber>(pubsub::MakeSubscriberConnection(_subscription));
  44. _run = true;
  45. _subscriberThread = std::thread(&PubSubListener::subscribe, this);
  46. }
  47. PubSubListener::~PubSubListener()
  48. {
  49. _run = false;
  50. _session.cancel();
  51. if (_subscriberThread.joinable()) {
  52. _subscriberThread.join();
  53. }
  54. }
  55. void PubSubListener::subscribe()
  56. {
  57. while (_run) {
  58. _session = _subscriber->Subscribe([this](pubsub::Message const& m, pubsub::AckHandler h) {
  59. auto provider = opentelemetry::trace::Provider::GetTracerProvider();
  60. auto tracer = provider->GetTracer("PubSubListener");
  61. auto span = tracer->StartSpan("PubSubListener::onMessage");
  62. auto scope = tracer->WithActiveSpan(span);
  63. span->SetAttribute("message_id", m.message_id());
  64. span->SetAttribute("ordering_key", m.ordering_key());
  65. span->SetAttribute("attributes", m.attributes().size());
  66. fprintf(stderr, "Received message %s\n", m.message_id().c_str());
  67. onNotification(m.data());
  68. std::move(h).ack();
  69. span->SetStatus(opentelemetry::trace::StatusCode::kOk);
  70. return true;
  71. });
  72. auto status = _session.get();
  73. if (! status.ok() && _run) {
  74. fprintf(stderr, "Error during Subscribe: %s\n", status.message().c_str());
  75. }
  76. }
  77. }
  78. PubSubNetworkListener::PubSubNetworkListener(std::string controller_id, std::string project, DB* db)
  79. : PubSubListener(controller_id, project, "controller-network-change-stream")
  80. , _db(db)
  81. {
  82. }
  83. PubSubNetworkListener::~PubSubNetworkListener()
  84. {
  85. }
  86. void PubSubNetworkListener::onNotification(const std::string& payload)
  87. {
  88. auto provider = opentelemetry::trace::Provider::GetTracerProvider();
  89. auto tracer = provider->GetTracer("PubSubNetworkListener");
  90. auto span = tracer->StartSpan("PubSubNetworkListener::onNotification");
  91. auto scope = tracer->WithActiveSpan(span);
  92. pbmessages::NetworkChange nc;
  93. if (! nc.ParseFromString(payload)) {
  94. fprintf(stderr, "Failed to parse NetworkChange protobuf message\n");
  95. span->SetAttribute("error", "Failed to parse NetworkChange protobuf message");
  96. span->SetStatus(opentelemetry::trace::StatusCode::kError, "Failed to parse protobuf");
  97. return;
  98. }
  99. fprintf(stderr, "Network notification received");
  100. try {
  101. nlohmann::json oldConfig, newConfig;
  102. if (nc.has_old()) {
  103. oldConfig = toJson(nc.old());
  104. }
  105. if (nc.has_new_()) {
  106. newConfig = toJson(nc.new_());
  107. }
  108. if (oldConfig.is_object() && newConfig.is_object()) {
  109. // network modification
  110. std::string nwid = oldConfig["id"].get<std::string>();
  111. span->SetAttribute("action", "network_change");
  112. span->SetAttribute("network_id", nwid);
  113. _db->save(newConfig, _db->isReady());
  114. }
  115. else if (newConfig.is_object() && ! oldConfig.is_object()) {
  116. // new network
  117. std::string nwid = newConfig["id"];
  118. span->SetAttribute("network_id", nwid);
  119. span->SetAttribute("action", "new_network");
  120. _db->save(newConfig, _db->isReady());
  121. }
  122. else if (! newConfig.is_object() && oldConfig.is_object()) {
  123. // network deletion
  124. std::string nwid = oldConfig["id"];
  125. span->SetAttribute("action", "delete_network");
  126. span->SetAttribute("network_id", nwid);
  127. uint64_t networkId = Utils::hexStrToU64(nwid.c_str());
  128. if (networkId) {
  129. _db->eraseNetwork(networkId);
  130. }
  131. }
  132. }
  133. catch (const nlohmann::json::parse_error& e) {
  134. fprintf(stderr, "JSON parse error: %s\n", e.what());
  135. span->SetAttribute("error", e.what());
  136. span->SetStatus(opentelemetry::trace::StatusCode::kError, e.what());
  137. return;
  138. }
  139. catch (const std::exception& e) {
  140. fprintf(stderr, "Exception in PubSubNetworkListener: %s\n", e.what());
  141. span->SetAttribute("error", e.what());
  142. span->SetStatus(opentelemetry::trace::StatusCode::kError, e.what());
  143. return;
  144. }
  145. }
  146. PubSubMemberListener::PubSubMemberListener(std::string controller_id, std::string project, DB* db)
  147. : PubSubListener(controller_id, project, "controller-member-change-stream")
  148. , _db(db)
  149. {
  150. }
  151. PubSubMemberListener::~PubSubMemberListener()
  152. {
  153. }
  154. void PubSubMemberListener::onNotification(const std::string& payload)
  155. {
  156. auto provider = opentelemetry::trace::Provider::GetTracerProvider();
  157. auto tracer = provider->GetTracer("PubSubMemberListener");
  158. auto span = tracer->StartSpan("PubSubMemberListener::onNotification");
  159. auto scope = tracer->WithActiveSpan(span);
  160. pbmessages::MemberChange mc;
  161. if (! mc.ParseFromString(payload)) {
  162. fprintf(stderr, "Failed to parse MemberChange protobuf message\n");
  163. span->SetAttribute("error", "Failed to parse MemberChange protobuf message");
  164. span->SetStatus(opentelemetry::trace::StatusCode::kError, "Failed to parse protobuf");
  165. return;
  166. }
  167. fprintf(stderr, "Member notification received");
  168. try {
  169. nlohmann::json tmp;
  170. nlohmann::json oldConfig, newConfig;
  171. if (mc.has_old()) {
  172. oldConfig = toJson(mc.old());
  173. }
  174. if (mc.has_new_()) {
  175. newConfig = toJson(mc.new_());
  176. }
  177. if (oldConfig.is_object() && newConfig.is_object()) {
  178. // member modification
  179. std::string memberID = oldConfig["id"].get<std::string>();
  180. std::string networkID = oldConfig["nwid"].get<std::string>();
  181. span->SetAttribute("action", "member_change");
  182. span->SetAttribute("member_id", memberID);
  183. span->SetAttribute("network_id", networkID);
  184. _db->save(newConfig, _db->isReady());
  185. }
  186. else if (newConfig.is_object() && ! oldConfig.is_object()) {
  187. // new member
  188. std::string memberID = newConfig["id"].get<std::string>();
  189. std::string networkID = newConfig["nwid"].get<std::string>();
  190. span->SetAttribute("action", "new_member");
  191. span->SetAttribute("member_id", memberID);
  192. span->SetAttribute("network_id", networkID);
  193. _db->save(newConfig, _db->isReady());
  194. }
  195. else if (! newConfig.is_object() && oldConfig.is_object()) {
  196. // member deletion
  197. std::string memberID = oldConfig["id"].get<std::string>();
  198. std::string networkID = oldConfig["nwid"].get<std::string>();
  199. span->SetAttribute("action", "delete_member");
  200. span->SetAttribute("member_id", memberID);
  201. span->SetAttribute("network_id", networkID);
  202. uint64_t networkId = Utils::hexStrToU64(networkID.c_str());
  203. uint64_t memberId = Utils::hexStrToU64(memberID.c_str());
  204. if (networkId && memberId) {
  205. _db->eraseMember(networkId, memberId);
  206. }
  207. }
  208. }
  209. catch (const nlohmann::json::parse_error& e) {
  210. fprintf(stderr, "JSON parse error: %s\n", e.what());
  211. span->SetAttribute("error", e.what());
  212. span->SetStatus(opentelemetry::trace::StatusCode::kError, e.what());
  213. return;
  214. }
  215. catch (const std::exception& e) {
  216. fprintf(stderr, "Exception in PubSubMemberListener: %s\n", e.what());
  217. span->SetAttribute("error", e.what());
  218. span->SetStatus(opentelemetry::trace::StatusCode::kError, e.what());
  219. return;
  220. }
  221. }
  222. nlohmann::json toJson(const pbmessages::NetworkChange_Network& nc)
  223. {
  224. nlohmann::json out;
  225. out["id"] = nc.network_id();
  226. out["name"] = nc.name();
  227. out["capabilities"] = OSUtils::jsonParse(nc.capabilities());
  228. out["mtu"] = nc.mtu();
  229. out["multicastLimit"] = nc.multicast_limit();
  230. out["private"] = nc.is_private();
  231. out["remoteTraceLevel"] = nc.remote_trace_level();
  232. if (nc.has_remote_trace_target()) {
  233. out["remoteTraceTarget"] = nc.remote_trace_target();
  234. }
  235. else {
  236. out["remoteTraceTarget"] = "";
  237. }
  238. out["rules"] = OSUtils::jsonParse(nc.rules());
  239. out["rulesSource"] = nc.rules_source();
  240. out["tags"] = OSUtils::jsonParse(nc.tags());
  241. if (nc.has_ipv4_assign_mode()) {
  242. nlohmann::json ipv4mode;
  243. ipv4mode["zt"] = nc.ipv4_assign_mode().zt();
  244. out["ipv4AssignMode"] = ipv4mode;
  245. }
  246. if (nc.has_ipv6_assign_mode()) {
  247. nlohmann::json ipv6mode;
  248. ipv6mode["6plane"] = nc.ipv6_assign_mode().six_plane();
  249. ipv6mode["rfc4193"] = nc.ipv6_assign_mode().rfc4193();
  250. ipv6mode["zt"] = nc.ipv6_assign_mode().zt();
  251. out["ipv6AssignMode"] = ipv6mode;
  252. }
  253. if (nc.assignment_pools_size() > 0) {
  254. nlohmann::json pools = nlohmann::json::array();
  255. for (const auto& p : nc.assignment_pools()) {
  256. nlohmann::json pool;
  257. pool["ipRangeStart"] = p.start_ip();
  258. pool["ipRangeEnd"] = p.end_ip();
  259. pools.push_back(pool);
  260. }
  261. out["assignmentPools"] = pools;
  262. }
  263. if (nc.routes_size() > 0) {
  264. nlohmann::json routes = nlohmann::json::array();
  265. for (const auto& r : nc.routes()) {
  266. nlohmann::json route;
  267. route["target"] = r.target();
  268. if (r.has_via()) {
  269. route["via"] = r.via();
  270. }
  271. routes.push_back(route);
  272. }
  273. out["routes"] = routes;
  274. }
  275. if (nc.has_dns()) {
  276. nlohmann::json dns;
  277. if (nc.dns().nameservers_size() > 0) {
  278. nlohmann::json servers = nlohmann::json::array();
  279. for (const auto& s : nc.dns().nameservers()) {
  280. servers.push_back(s);
  281. }
  282. dns["servers"] = servers;
  283. }
  284. dns["domain"] = nc.dns().domain();
  285. out["dns"] = dns;
  286. }
  287. out["ssoEnabled"] = nc.sso_enabled();
  288. nlohmann::json sso;
  289. if (nc.sso_enabled()) {
  290. sso = nlohmann::json::object();
  291. if (nc.has_sso_client_id()) {
  292. sso["ssoClientId"] = nc.sso_client_id();
  293. }
  294. if (nc.has_sso_authorization_endpoint()) {
  295. sso["ssoAuthorizationEndpoint"] = nc.sso_authorization_endpoint();
  296. }
  297. if (nc.has_sso_issuer()) {
  298. sso["ssoIssuer"] = nc.sso_issuer();
  299. }
  300. if (nc.has_sso_provider()) {
  301. sso["ssoProvider"] = nc.sso_provider();
  302. }
  303. }
  304. out["ssoConfig"] = sso;
  305. return out;
  306. }
  307. nlohmann::json toJson(const pbmessages::MemberChange_Member& mc)
  308. {
  309. nlohmann::json out;
  310. out["id"] = mc.device_id();
  311. out["nwid"] = mc.network_id();
  312. if (mc.has_remote_trace_target()) {
  313. out["remoteTraceTarget"] = mc.remote_trace_target();
  314. }
  315. else {
  316. out["remoteTraceTarget"] = "";
  317. }
  318. out["authorized"] = mc.authorized();
  319. out["activeBridge"] = mc.active_bridge();
  320. auto ipAssignments = mc.ip_assignments();
  321. if (ipAssignments.size() > 0) {
  322. nlohmann::json assignments = nlohmann::json::array();
  323. for (const auto& ip : ipAssignments) {
  324. assignments.push_back(ip);
  325. }
  326. out["ipAssignments"] = assignments;
  327. }
  328. out["noAutoAssignIps"] = mc.no_auto_assign_ips();
  329. out["ssoExempt"] = mc.sso_exepmt();
  330. out["authenticationExpiryTime"] = mc.auth_expiry_time();
  331. out["capabilities"] = OSUtils::jsonParse(mc.capabilities());
  332. out["creationTime"] = mc.creation_time();
  333. out["identity"] = mc.identity();
  334. out["lastAuthorizedTime"] = mc.last_authorized_time();
  335. out["lastDeauthorizedTime"] = mc.last_deauthorized_time();
  336. out["remoteTraceLevel"] = mc.remote_trace_level();
  337. out["revision"] = mc.revision();
  338. out["tags"] = OSUtils::jsonParse(mc.tags());
  339. out["versionMajor"] = mc.version_major();
  340. out["versionMinor"] = mc.version_minor();
  341. out["versionRev"] = mc.version_rev();
  342. out["versionProtocol"] = mc.version_protocol();
  343. return out;
  344. }
  345. } // namespace ZeroTier
  346. #endif // ZT_CONTROLLER_USE_LIBPQ