PubSubWriter.cpp 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307
  1. #include "PubSubWriter.hpp"
  2. #include "../../osdep/OSUtils.hpp"
  3. #include "CtlUtil.hpp"
  4. #include "member.pb.h"
  5. #include "member_status.pb.h"
  6. #include "network.pb.h"
  7. #include <chrono>
  8. #include <google/cloud/options.h>
  9. #include <google/cloud/pubsub/message.h>
  10. #include <google/cloud/pubsub/publisher.h>
  11. #include <google/cloud/pubsub/topic.h>
  12. #include <opentelemetry/trace/provider.h>
  13. namespace pubsub = ::google::cloud::pubsub;
  14. namespace ZeroTier {
  15. pbmessages::NetworkChange
  16. networkChangeFromJson(std::string controllerID, const nlohmann::json& oldNetwork, const nlohmann::json& newNetwork);
  17. pbmessages::MemberChange
  18. memberChangeFromJson(std::string controllerID, const nlohmann::json& oldMember, const nlohmann::json& newMember);
  19. PubSubWriter::PubSubWriter(std::string project, std::string topic, std::string controller_id)
  20. : _controller_id(controller_id)
  21. , _project(project)
  22. , _topic(topic)
  23. {
  24. fprintf(
  25. stderr, "PubSubWriter for controller %s project %s topic %s\n", controller_id.c_str(), project.c_str(),
  26. topic.c_str());
  27. GOOGLE_PROTOBUF_VERIFY_VERSION;
  28. // If PUBSUB_EMULATOR_HOST is set, create the topic if it doesn't exist
  29. const char* emulatorHost = std::getenv("PUBSUB_EMULATOR_HOST");
  30. if (emulatorHost != nullptr) {
  31. create_gcp_pubsub_topic_if_needed(project, topic);
  32. }
  33. auto options =
  34. ::google::cloud::Options {}
  35. .set<pubsub::RetryPolicyOption>(pubsub::LimitedTimeRetryPolicy(std::chrono::seconds(5)).clone())
  36. .set<pubsub::BackoffPolicyOption>(
  37. pubsub::ExponentialBackoffPolicy(std::chrono::milliseconds(100), std::chrono::seconds(2), 1.3).clone());
  38. auto publisher = pubsub::MakePublisherConnection(pubsub::Topic(project, topic), std::move(options));
  39. _publisher = std::make_shared<pubsub::Publisher>(std::move(publisher));
  40. }
  41. PubSubWriter::~PubSubWriter()
  42. {
  43. }
  44. bool PubSubWriter::publishMessage(const std::string& payload, const std::string& frontend)
  45. {
  46. std::vector<std::pair<std::string, std::string> > attributes;
  47. attributes.emplace_back("controller_id", _controller_id);
  48. if (! frontend.empty()) {
  49. attributes.emplace_back("frontend", frontend);
  50. }
  51. auto msg = pubsub::MessageBuilder {}.SetData(payload).SetAttributes(attributes).Build();
  52. auto message_id = _publisher->Publish(std::move(msg)).get();
  53. if (! message_id) {
  54. fprintf(stderr, "Failed to publish message: %s\n", std::move(message_id).status().message().c_str());
  55. return false;
  56. }
  57. fprintf(stderr, "Published message to %s\n", _topic.c_str());
  58. return true;
  59. }
  60. bool PubSubWriter::publishNetworkChange(
  61. const nlohmann::json& oldNetwork,
  62. const nlohmann::json& newNetwork,
  63. const std::string& frontend)
  64. {
  65. pbmessages::NetworkChange nc = networkChangeFromJson(_controller_id, oldNetwork, newNetwork);
  66. std::string payload;
  67. if (! nc.SerializeToString(&payload)) {
  68. fprintf(stderr, "Failed to serialize NetworkChange protobuf message\n");
  69. return false;
  70. }
  71. return publishMessage(payload, frontend);
  72. }
  73. bool PubSubWriter::publishMemberChange(
  74. const nlohmann::json& oldMember,
  75. const nlohmann::json& newMember,
  76. const std::string& frontend)
  77. {
  78. pbmessages::MemberChange mc = memberChangeFromJson(_controller_id, oldMember, newMember);
  79. std::string payload;
  80. if (! mc.SerializeToString(&payload)) {
  81. fprintf(stderr, "Failed to serialize MemberChange protobuf message\n");
  82. return false;
  83. }
  84. return publishMessage(payload, frontend);
  85. }
  86. bool PubSubWriter::publishStatusChange(
  87. std::string frontend,
  88. std::string network_id,
  89. std::string node_id,
  90. std::string os,
  91. std::string arch,
  92. std::string version,
  93. int64_t last_seen)
  94. {
  95. auto provider = opentelemetry::trace::Provider::GetTracerProvider();
  96. auto tracer = provider->GetTracer("PubSubWriter");
  97. auto span = tracer->StartSpan("PubSubWriter::publishStatusChange");
  98. auto scope = tracer->WithActiveSpan(span);
  99. pbmessages::MemberStatus_MemberStatusMetadata* metadata = new pbmessages::MemberStatus_MemberStatusMetadata();
  100. metadata->set_controller_id(_controller_id);
  101. metadata->set_trace_id(""); // TODO: generate a trace ID
  102. pbmessages::MemberStatus ms;
  103. ms.set_network_id(network_id);
  104. ms.set_member_id(node_id);
  105. ms.set_os(os);
  106. ms.set_arch(arch);
  107. ms.set_version(version);
  108. ms.set_timestamp(last_seen);
  109. ms.set_allocated_metadata(metadata);
  110. std::string payload;
  111. if (! ms.SerializeToString(&payload)) {
  112. fprintf(stderr, "Failed to serialize StatusChange protobuf message\n");
  113. return false;
  114. }
  115. return publishMessage(payload, "");
  116. }
  117. pbmessages::NetworkChange_Network* networkFromJson(const nlohmann::json& j)
  118. {
  119. if (! j.is_object()) {
  120. return nullptr;
  121. }
  122. pbmessages::NetworkChange_Network* n = new pbmessages::NetworkChange_Network();
  123. try {
  124. n->set_network_id(j.value("id", ""));
  125. n->set_name(j.value("name", ""));
  126. n->set_capabilities(OSUtils::jsonDump(j.value("capabilities", "[]"), -1));
  127. n->set_creation_time(j.value("creationTime", 0));
  128. n->set_enable_broadcast(j.value("enableBroadcast", false));
  129. for (const auto& p : j["ipAssignmentPools"]) {
  130. if (p.is_object()) {
  131. auto pool = n->add_assignment_pools();
  132. pool->set_start_ip(p.value("ipRangeStart", ""));
  133. pool->set_end_ip(p.value("ipRangeEnd", ""));
  134. }
  135. }
  136. n->set_mtu(j.value("mtu", 2800));
  137. n->set_multicast_limit(j.value("multicastLimit", 32));
  138. n->set_is_private(j.value("private", true));
  139. n->set_remote_trace_level(j.value("remoteTraceLevel", 0));
  140. n->set_remote_trace_target(j.value("remoteTraceTarget", ""));
  141. n->set_revision(j.value("revision", 0));
  142. for (const auto& p : j["routes"]) {
  143. if (p.is_object()) {
  144. auto r = n->add_routes();
  145. r->set_target(p.value("target", ""));
  146. r->set_via(p.value("via", ""));
  147. }
  148. }
  149. n->set_rules("");
  150. n->set_tags(OSUtils::jsonDump(j.value("tags", "[]"), -1));
  151. pbmessages::NetworkChange_IPV4AssignMode* v4am = new pbmessages::NetworkChange_IPV4AssignMode();
  152. if (j["v4AssignMode"].is_object()) {
  153. v4am->set_zt(j["v4AssignMode"].value("zt", false));
  154. }
  155. n->set_allocated_ipv4_assign_mode(v4am);
  156. pbmessages::NetworkChange_IPV6AssignMode* v6am = new pbmessages::NetworkChange_IPV6AssignMode();
  157. if (j["v6AssignMode"].is_object()) {
  158. v6am->set_zt(j["v6AssignMode"].value("zt", false));
  159. v6am->set_six_plane(j["v6AssignMode"].value("6plane", false));
  160. v6am->set_rfc4193(j["v6AssignMode"].value("rfc4193", false));
  161. }
  162. n->set_allocated_ipv6_assign_mode(v6am);
  163. nlohmann::json jdns = j.value("dns", nullptr);
  164. if (jdns.is_object()) {
  165. pbmessages::NetworkChange_DNS* dns = new pbmessages::NetworkChange_DNS();
  166. dns->set_domain(jdns.value("domain", ""));
  167. for (const auto& s : jdns["servers"]) {
  168. if (s.is_string()) {
  169. auto server = dns->add_nameservers();
  170. *server = s;
  171. }
  172. }
  173. n->set_allocated_dns(dns);
  174. }
  175. n->set_sso_enabled(j.value("ssoEnabled", false));
  176. if (j.value("ssoEnabled", false)) {
  177. n->set_sso_provider(j.value("provider", ""));
  178. n->set_sso_client_id(j.value("clientId", ""));
  179. n->set_sso_authorization_endpoint(j.value("authorizationEndpoint", ""));
  180. n->set_sso_issuer(j.value("issuer", ""));
  181. n->set_sso_provider(j.value("provider", ""));
  182. }
  183. n->set_rules_source(j.value("rulesSource", ""));
  184. }
  185. catch (const std::exception& e) {
  186. fprintf(stderr, "Exception parsing network JSON: %s\n", e.what());
  187. delete n;
  188. return nullptr;
  189. }
  190. return n;
  191. }
  192. pbmessages::NetworkChange
  193. networkChangeFromJson(std::string controllerID, const nlohmann::json& oldNetwork, const nlohmann::json& newNetwork)
  194. {
  195. pbmessages::NetworkChange nc;
  196. nc.set_allocated_old(networkFromJson(oldNetwork));
  197. nc.set_allocated_new_(networkFromJson(newNetwork));
  198. nc.set_change_source(pbmessages::NetworkChange_ChangeSource::NetworkChange_ChangeSource_CONTROLLER);
  199. pbmessages::NetworkChange_NetworkChangeMetadata* metadata = new pbmessages::NetworkChange_NetworkChangeMetadata();
  200. metadata->set_controller_id(controllerID);
  201. metadata->set_trace_id(""); // TODO: generate a trace ID
  202. nc.set_allocated_metadata(metadata);
  203. return nc;
  204. }
  205. pbmessages::MemberChange_Member* memberFromJson(const nlohmann::json& j)
  206. {
  207. if (! j.is_object()) {
  208. return nullptr;
  209. }
  210. fprintf(stderr, "memberFromJSON: %s\n", j.dump().c_str());
  211. pbmessages::MemberChange_Member* m = new pbmessages::MemberChange_Member();
  212. try {
  213. m->set_network_id(j.value("networkId", ""));
  214. m->set_device_id(j.value("id", ""));
  215. m->set_identity(j.value("identity", ""));
  216. m->set_authorized(j.value("authorized", false));
  217. for (const auto& addr : j.value("ipAssignments", nlohmann::json::array())) {
  218. if (addr.is_string()) {
  219. auto a = m->add_ip_assignments();
  220. *a = addr;
  221. }
  222. }
  223. m->set_active_bridge(j.value("activeBridge", false));
  224. m->set_tags(OSUtils::jsonDump(j.value("tags", "[]"), -1));
  225. m->set_capabilities(OSUtils::jsonDump(j.value("capabilities", "[]"), -1));
  226. m->set_creation_time(j.value("creationTime", 0));
  227. m->set_no_auto_assign_ips(j.value("noAutoAssignIps", false));
  228. m->set_revision(j.value("revision", 0));
  229. m->set_last_authorized_time(j.value("lastAuthorizedTime", 0));
  230. m->set_last_deauthorized_time(j.value("lastDeauthorizedTime", 0));
  231. m->set_last_authorized_credential_type(j.value("lastAuthorizedCredentialType", nullptr));
  232. m->set_last_authorized_credential(j.value("lastAuthorizedCredential", nullptr));
  233. m->set_version_major(j.value("versionMajor", 0));
  234. m->set_version_minor(j.value("versionMinor", 0));
  235. m->set_version_rev(j.value("versionRev", 0));
  236. m->set_version_protocol(j.value("versionProtocol", 0));
  237. m->set_remote_trace_level(j.value("remoteTraceLevel", 0));
  238. m->set_remote_trace_target(j.value("remoteTraceTarget", ""));
  239. m->set_sso_exempt(j.value("ssoExempt", false));
  240. m->set_auth_expiry_time(j.value("authExpiryTime", 0));
  241. }
  242. catch (const std::exception& e) {
  243. fprintf(stderr, "Exception parsing member JSON: %s\n", e.what());
  244. delete m;
  245. return nullptr;
  246. }
  247. return m;
  248. }
  249. pbmessages::MemberChange
  250. memberChangeFromJson(std::string controllerID, const nlohmann::json& oldMember, const nlohmann::json& newMember)
  251. {
  252. pbmessages::MemberChange mc;
  253. mc.set_allocated_old(memberFromJson(oldMember));
  254. mc.set_allocated_new_(memberFromJson(newMember));
  255. mc.set_change_source(pbmessages::MemberChange_ChangeSource::MemberChange_ChangeSource_CONTROLLER);
  256. pbmessages::MemberChange_MemberChangeMetadata* metadata = new pbmessages::MemberChange_MemberChangeMetadata();
  257. metadata->set_controller_id(controllerID);
  258. metadata->set_trace_id(""); // TODO: generate a trace ID
  259. mc.set_allocated_metadata(metadata);
  260. return mc;
  261. }
  262. } // namespace ZeroTier