| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503 |
- #ifdef ZT_CONTROLLER_USE_LIBPQ
#include "PubSubListener.hpp"

#include "ControllerConfig.hpp"
#include "CtlUtil.hpp"
#include "DB.hpp"
#include "member.pb.h"
#include "network.pb.h"
#include "opentelemetry/trace/provider.h"
#include "rustybits.h"

#include <chrono>
#include <cstdio>
#include <cstdlib>
#include <exception>
#include <future>
#include <google/cloud/pubsub/admin/subscription_admin_client.h>
#include <google/cloud/pubsub/admin/subscription_admin_connection.h>
#include <google/cloud/pubsub/admin/topic_admin_client.h>
#include <google/cloud/pubsub/message.h>
#include <google/cloud/pubsub/subscriber.h>
#include <google/cloud/pubsub/subscription.h>
#include <google/cloud/pubsub/topic.h>
#include <memory>
#include <nlohmann/json.hpp>
#include <string>
#include <thread>
- namespace pubsub = ::google::cloud::pubsub;
- namespace pubsub_admin = ::google::cloud::pubsub_admin;
- namespace ZeroTier {
- nlohmann::json toJson(const pbmessages::NetworkChange_Network& nc, pbmessages::NetworkChange_ChangeSource source);
- nlohmann::json toJson(const pbmessages::MemberChange_Member& mc, pbmessages::MemberChange_ChangeSource source);
- PubSubListener::PubSubListener(std::string controller_id, std::string project, std::string topic)
- : _controller_id(controller_id)
- , _project(project)
- , _topic(topic)
- , _subscription_id()
- , _run(false)
- , _adminClient(pubsub_admin::MakeSubscriptionAdminConnection())
- , _subscription(nullptr)
- {
- GOOGLE_PROTOBUF_VERIFY_VERSION;
- _subscription_id = "sub-" + controller_id + "-" + topic; // + "-" + random_hex_string(8);
- _subscription = new pubsub::Subscription(_project, _subscription_id);
- fprintf(
- stderr, "PubSubListener for controller %s project %s topic %s subscription %s\n", controller_id.c_str(),
- project.c_str(), topic.c_str(), _subscription_id.c_str());
- // If PUBSUB_EMULATOR_HOST is set, create the topic if it doesn't exist
- const char* emulatorHost = std::getenv("PUBSUB_EMULATOR_HOST");
- if (emulatorHost != nullptr) {
- create_gcp_pubsub_topic_if_needed(project, topic);
- create_gcp_pubsub_subscription_if_needed(_project, _subscription_id, _topic, _controller_id);
- }
- _subscriber = std::make_shared<pubsub::Subscriber>(pubsub::MakeSubscriberConnection(*_subscription));
- _run = true;
- _subscriberThread = std::thread(&PubSubListener::subscribe, this);
- }
- PubSubListener::~PubSubListener()
- {
- _run = false;
- if (_subscriberThread.joinable()) {
- _subscriberThread.join();
- }
- if (_subscription) {
- delete _subscription;
- _subscription = nullptr;
- }
- }
- void PubSubListener::subscribe()
- {
- while (_run) {
- try {
- fprintf(stderr, "Starting new subscription session\n");
- auto session = _subscriber->Subscribe([this](pubsub::Message const& m, pubsub::AckHandler h) {
- auto provider = opentelemetry::trace::Provider::GetTracerProvider();
- auto tracer = provider->GetTracer("PubSubListener");
- auto span = tracer->StartSpan("PubSubListener::onMessage");
- auto scope = tracer->WithActiveSpan(span);
- span->SetAttribute("message_id", m.message_id());
- span->SetAttribute("ordering_key", m.ordering_key());
- fprintf(stderr, "Received message %s\n", m.message_id().c_str());
- if (onNotification(m.data())) {
- std::move(h).ack();
- span->SetStatus(opentelemetry::trace::StatusCode::kOk);
- return true;
- }
- else {
- span->SetStatus(opentelemetry::trace::StatusCode::kError, "onNotification failed");
- return false;
- }
- });
- auto result = session.wait_for(std::chrono::seconds(10));
- if (result == std::future_status::timeout) {
- session.cancel();
- continue;
- }
- if (! session.valid()) {
- fprintf(stderr, "Subscription session no longer valid\n");
- continue;
- }
- }
- catch (google::cloud::Status const& status) {
- fprintf(stderr, "Subscription terminated with status: %s\n", status.message().c_str());
- }
- }
- }
- PubSubNetworkListener::PubSubNetworkListener(std::string controller_id, std::string project, std::string topic, DB* db)
- : PubSubListener(controller_id, project, topic)
- , _db(db)
- {
- }
- PubSubNetworkListener::~PubSubNetworkListener()
- {
- }
- bool PubSubNetworkListener::onNotification(const std::string& payload)
- {
- auto provider = opentelemetry::trace::Provider::GetTracerProvider();
- auto tracer = provider->GetTracer("PubSubNetworkListener");
- auto span = tracer->StartSpan("PubSubNetworkListener::onNotification");
- auto scope = tracer->WithActiveSpan(span);
- pbmessages::NetworkChange nc;
- if (! nc.ParseFromString(payload)) {
- fprintf(stderr, "Failed to parse NetworkChange protobuf message\n");
- span->SetAttribute("error", "Failed to parse NetworkChange protobuf message");
- span->SetStatus(opentelemetry::trace::StatusCode::kError, "Failed to parse protobuf");
- return false;
- }
- fprintf(stderr, "PubSubNetworkListener: parsed protobuf message. %s\n", nc.DebugString().c_str());
- fprintf(stderr, "Network notification received\n");
- try {
- nlohmann::json oldConfig, newConfig;
- if (nc.has_old()) {
- fprintf(stderr, "has old network config\n");
- oldConfig = toJson(nc.old(), nc.change_source());
- }
- if (nc.has_new_()) {
- fprintf(stderr, "has new network config\n");
- newConfig = toJson(nc.new_(), nc.change_source());
- }
- if (! nc.has_old() && ! nc.has_new_()) {
- fprintf(stderr, "NetworkChange message has no old or new network config\n");
- span->SetAttribute("error", "NetworkChange message has no old or new network config");
- span->SetStatus(opentelemetry::trace::StatusCode::kError, "No old or new config");
- return false;
- }
- if (oldConfig.is_object() && newConfig.is_object()) {
- // network modification
- std::string nwid = oldConfig["id"].get<std::string>();
- span->SetAttribute("action", "network_change");
- span->SetAttribute("network_id", nwid);
- _db->save(newConfig, _db->isReady());
- }
- else if (newConfig.is_object() && ! oldConfig.is_object()) {
- // new network
- std::string nwid = newConfig["id"];
- span->SetAttribute("network_id", nwid);
- span->SetAttribute("action", "new_network");
- _db->save(newConfig, _db->isReady());
- }
- else if (! newConfig.is_object() && oldConfig.is_object()) {
- // network deletion
- std::string nwid = oldConfig["id"];
- span->SetAttribute("action", "delete_network");
- span->SetAttribute("network_id", nwid);
- uint64_t networkId = Utils::hexStrToU64(nwid.c_str());
- if (networkId) {
- _db->eraseNetwork(networkId);
- }
- }
- }
- catch (const nlohmann::json::parse_error& e) {
- fprintf(stderr, "PubSubNetworkListener JSON parse error: %s\n", e.what());
- span->SetAttribute("error", e.what());
- span->SetStatus(opentelemetry::trace::StatusCode::kError, e.what());
- fprintf(stderr, "payload: %s\n", payload.c_str());
- return false;
- }
- catch (const std::exception& e) {
- fprintf(stderr, "PubSubNetworkListener Exception in PubSubNetworkListener: %s\n", e.what());
- span->SetAttribute("error", e.what());
- span->SetStatus(opentelemetry::trace::StatusCode::kError, e.what());
- return false;
- }
- catch (...) {
- fprintf(stderr, "PubSubNetworkListener Unknown exception in PubSubNetworkListener\n");
- span->SetAttribute("error", "Unknown exception in PubSubNetworkListener");
- span->SetStatus(opentelemetry::trace::StatusCode::kError, "Unknown exception");
- return false;
- }
- fprintf(stderr, "PubSubNetworkListener onNotification complete\n");
- return true;
- }
- PubSubMemberListener::PubSubMemberListener(std::string controller_id, std::string project, std::string topic, DB* db)
- : PubSubListener(controller_id, project, topic)
- , _db(db)
- {
- }
- PubSubMemberListener::~PubSubMemberListener()
- {
- }
- bool PubSubMemberListener::onNotification(const std::string& payload)
- {
- auto provider = opentelemetry::trace::Provider::GetTracerProvider();
- auto tracer = provider->GetTracer("PubSubMemberListener");
- auto span = tracer->StartSpan("PubSubMemberListener::onNotification");
- auto scope = tracer->WithActiveSpan(span);
- pbmessages::MemberChange mc;
- if (! mc.ParseFromString(payload)) {
- fprintf(stderr, "Failed to parse MemberChange protobuf message\n");
- span->SetAttribute("error", "Failed to parse MemberChange protobuf message");
- span->SetStatus(opentelemetry::trace::StatusCode::kError, "Failed to parse protobuf");
- return false;
- }
- fprintf(stderr, "PubSubMemberListener: parsed protobuf message. %s\n", mc.DebugString().c_str());
- fprintf(stderr, "Member notification received");
- try {
- nlohmann::json tmp;
- nlohmann::json oldConfig, newConfig;
- if (mc.has_old()) {
- fprintf(stderr, "has old member config\n");
- oldConfig = toJson(mc.old(), mc.change_source());
- }
- if (mc.has_new_()) {
- fprintf(stderr, "has new member config\n");
- newConfig = toJson(mc.new_(), mc.change_source());
- }
- if (! mc.has_old() && ! mc.has_new_()) {
- fprintf(stderr, "MemberChange message has no old or new member config\n");
- span->SetAttribute("error", "MemberChange message has no old or new member config");
- span->SetStatus(opentelemetry::trace::StatusCode::kError, "No old or new config");
- return false;
- }
- if (oldConfig.is_object() && newConfig.is_object()) {
- // member modification
- std::string memberID = oldConfig["id"].get<std::string>();
- std::string networkID = oldConfig["nwid"].get<std::string>();
- span->SetAttribute("action", "member_change");
- span->SetAttribute("member_id", memberID);
- span->SetAttribute("network_id", networkID);
- _db->save(newConfig, _db->isReady());
- }
- else if (newConfig.is_object() && ! oldConfig.is_object()) {
- // new member
- std::string memberID = newConfig["id"].get<std::string>();
- std::string networkID = newConfig["nwid"].get<std::string>();
- span->SetAttribute("action", "new_member");
- span->SetAttribute("member_id", memberID);
- span->SetAttribute("network_id", networkID);
- _db->save(newConfig, _db->isReady());
- }
- else if (! newConfig.is_object() && oldConfig.is_object()) {
- // member deletion
- std::string memberID = oldConfig["id"].get<std::string>();
- std::string networkID = oldConfig["nwid"].get<std::string>();
- span->SetAttribute("action", "delete_member");
- span->SetAttribute("member_id", memberID);
- span->SetAttribute("network_id", networkID);
- uint64_t networkId = Utils::hexStrToU64(networkID.c_str());
- uint64_t memberId = Utils::hexStrToU64(memberID.c_str());
- if (networkId && memberId) {
- _db->eraseMember(networkId, memberId);
- }
- }
- }
- catch (const nlohmann::json::parse_error& e) {
- fprintf(stderr, "PubSubMemberListener JSON parse error: %s\n", e.what());
- span->SetAttribute("error", e.what());
- span->SetStatus(opentelemetry::trace::StatusCode::kError, e.what());
- fprintf(stderr, "payload: %s\n", payload.c_str());
- return false;
- }
- catch (const std::exception& e) {
- fprintf(stderr, "PubSubMemberListener Exception in PubSubMemberListener: %s\n", e.what());
- span->SetAttribute("error", e.what());
- span->SetStatus(opentelemetry::trace::StatusCode::kError, e.what());
- return false;
- }
- return true;
- }
- nlohmann::json toJson(const pbmessages::NetworkChange_Network& nc, pbmessages::NetworkChange_ChangeSource source)
- {
- nlohmann::json out;
- out["objtype"] = "network";
- out["id"] = nc.network_id();
- out["name"] = nc.name();
- out["capabilities"] = OSUtils::jsonParse(nc.capabilities());
- out["mtu"] = nc.mtu();
- out["multicastLimit"] = nc.multicast_limit();
- out["private"] = nc.is_private();
- out["remoteTraceLevel"] = nc.remote_trace_level();
- if (nc.has_remote_trace_target()) {
- out["remoteTraceTarget"] = nc.remote_trace_target();
- }
- else {
- out["remoteTraceTarget"] = "";
- }
- out["rules"] = OSUtils::jsonParse(nc.rules());
- out["rulesSource"] = nc.rules_source();
- out["tags"] = OSUtils::jsonParse(nc.tags());
- if (nc.has_ipv4_assign_mode()) {
- nlohmann::json ipv4mode;
- ipv4mode["zt"] = nc.ipv4_assign_mode().zt();
- out["v4AssignMode"] = ipv4mode;
- }
- else {
- nlohmann::json ipv4mode = nlohmann::json::object();
- out["zt"] = false;
- out["v4AssignMode"] = ipv4mode;
- }
- if (nc.has_ipv6_assign_mode()) {
- nlohmann::json ipv6mode;
- ipv6mode["6plane"] = nc.ipv6_assign_mode().six_plane();
- ipv6mode["rfc4193"] = nc.ipv6_assign_mode().rfc4193();
- ipv6mode["zt"] = nc.ipv6_assign_mode().zt();
- out["v6AssignMode"] = ipv6mode;
- }
- else {
- nlohmann::json ipv6mode = nlohmann::json::object();
- ipv6mode["6plane"] = false;
- ipv6mode["rfc4193"] = false;
- ipv6mode["zt"] = false;
- out["v6AssignMode"] = ipv6mode;
- }
- if (nc.assignment_pools_size() > 0) {
- nlohmann::json pools = nlohmann::json::array();
- for (const auto& p : nc.assignment_pools()) {
- nlohmann::json pool;
- pool["ipRangeStart"] = p.start_ip();
- pool["ipRangeEnd"] = p.end_ip();
- pools.push_back(pool);
- }
- out["ipAssignmentPools"] = pools;
- }
- else {
- out["ipAssignmentPools"] = nlohmann::json::array();
- }
- if (nc.routes_size() > 0) {
- nlohmann::json routes = nlohmann::json::array();
- for (const auto& r : nc.routes()) {
- nlohmann::json route;
- std::string target = r.target();
- if (target.length() > 0) {
- route["target"] = r.target();
- if (r.has_via()) {
- route["via"] = r.via();
- }
- else {
- route["via"] = nullptr;
- }
- routes.push_back(route);
- }
- }
- out["routes"] = routes;
- }
- if (nc.has_dns()) {
- nlohmann::json dns;
- if (nc.dns().nameservers_size() > 0) {
- nlohmann::json servers = nlohmann::json::array();
- for (const auto& s : nc.dns().nameservers()) {
- servers.push_back(s);
- }
- dns["servers"] = servers;
- }
- else {
- dns["servers"] = nlohmann::json::array();
- }
- dns["domain"] = nc.dns().domain();
- out["dns"] = dns;
- }
- out["ssoEnabled"] = nc.sso_enabled();
- nlohmann::json sso;
- if (nc.sso_enabled()) {
- sso = nlohmann::json::object();
- if (nc.has_sso_client_id()) {
- sso["ssoClientId"] = nc.sso_client_id();
- }
- if (nc.has_sso_authorization_endpoint()) {
- sso["ssoAuthorizationEndpoint"] = nc.sso_authorization_endpoint();
- }
- if (nc.has_sso_issuer()) {
- sso["ssoIssuer"] = nc.sso_issuer();
- }
- if (nc.has_sso_provider()) {
- sso["ssoProvider"] = nc.sso_provider();
- }
- }
- out["ssoConfig"] = sso;
- switch (source) {
- case pbmessages::NetworkChange_ChangeSource_CV1:
- out["change_source"] = "cv1";
- break;
- case pbmessages::NetworkChange_ChangeSource_CV2:
- out["change_source"] = "cv2";
- break;
- case pbmessages::NetworkChange_ChangeSource_CONTROLLER:
- out["change_source"] = "controller";
- break;
- default:
- out["change_source"] = "unknown";
- break;
- }
- return out;
- }
- nlohmann::json toJson(const pbmessages::MemberChange_Member& mc, pbmessages::MemberChange_ChangeSource source)
- {
- nlohmann::json out;
- out["objtype"] = "member";
- out["id"] = mc.device_id();
- out["nwid"] = mc.network_id();
- if (mc.has_remote_trace_target()) {
- out["remoteTraceTarget"] = mc.remote_trace_target();
- }
- else {
- out["remoteTraceTarget"] = "";
- }
- out["authorized"] = mc.authorized();
- out["activeBridge"] = mc.active_bridge();
- auto ipAssignments = mc.ip_assignments();
- if (ipAssignments.size() > 0) {
- nlohmann::json assignments = nlohmann::json::array();
- for (const auto& ip : ipAssignments) {
- assignments.push_back(ip);
- }
- out["ipAssignments"] = assignments;
- }
- out["noAutoAssignIps"] = mc.no_auto_assign_ips();
- out["ssoExempt"] = mc.sso_exempt();
- out["authenticationExpiryTime"] = mc.auth_expiry_time();
- out["capabilities"] = OSUtils::jsonParse(mc.capabilities());
- out["creationTime"] = mc.creation_time();
- out["identity"] = mc.identity();
- out["lastAuthorizedTime"] = mc.last_authorized_time();
- out["lastDeauthorizedTime"] = mc.last_deauthorized_time();
- out["remoteTraceLevel"] = mc.remote_trace_level();
- out["revision"] = mc.revision();
- out["tags"] = OSUtils::jsonParse(mc.tags());
- out["versionMajor"] = mc.version_major();
- out["versionMinor"] = mc.version_minor();
- out["versionRev"] = mc.version_rev();
- out["versionProtocol"] = mc.version_protocol();
- switch (source) {
- case pbmessages::MemberChange_ChangeSource_CV1:
- out["change_source"] = "cv1";
- break;
- case pbmessages::MemberChange_ChangeSource_CV2:
- out["change_source"] = "cv2";
- break;
- case pbmessages::MemberChange_ChangeSource_CONTROLLER:
- out["change_source"] = "controller";
- break;
- default:
- out["change_source"] = "unknown";
- break;
- }
- return out;
- }
- } // namespace ZeroTier
- #endif // ZT_CONTROLLER_USE_LIBPQ
|