PubSubWriter.cpp 14 KB


  #include "PubSubWriter.hpp"
  #include "../../osdep/OSUtils.hpp"
  #include "CtlUtil.hpp"
  #include "OtelCarrier.hpp"
  #include "member.pb.h"
  #include "member_status.pb.h"
  #include "network.pb.h"
  #include "opentelemetry/context/propagation/global_propagator.h"
  #include <chrono>
  #include <google/cloud/options.h>
  #include <google/cloud/pubsub/message.h>
  #include <google/cloud/pubsub/publisher.h>
  #include <google/cloud/pubsub/topic.h>
  #include <memory>
  #include <opentelemetry/trace/provider.h>
  15. namespace pubsub = ::google::cloud::pubsub;
  16. namespace ZeroTier {
  17. pbmessages::NetworkChange*
  18. networkChangeFromJson(std::string controllerID, const nlohmann::json& oldNetwork, const nlohmann::json& newNetwork);
  19. pbmessages::MemberChange*
  20. memberChangeFromJson(std::string controllerID, const nlohmann::json& oldMember, const nlohmann::json& newMember);
  21. PubSubWriter::PubSubWriter(std::string project, std::string topic, std::string controller_id)
  22. : _controller_id(controller_id)
  23. , _project(project)
  24. , _topic(topic)
  25. {
  26. fprintf(
  27. stderr, "PubSubWriter for controller %s project %s topic %s\n", controller_id.c_str(), project.c_str(),
  28. topic.c_str());
  29. GOOGLE_PROTOBUF_VERIFY_VERSION;
  30. // If PUBSUB_EMULATOR_HOST is set, create the topic if it doesn't exist
  31. const char* emulatorHost = std::getenv("PUBSUB_EMULATOR_HOST");
  32. if (emulatorHost != nullptr) {
  33. create_gcp_pubsub_topic_if_needed(project, topic);
  34. }
  35. auto options =
  36. ::google::cloud::Options {}
  37. .set<pubsub::RetryPolicyOption>(pubsub::LimitedTimeRetryPolicy(std::chrono::seconds(5)).clone())
  38. .set<pubsub::BackoffPolicyOption>(
  39. pubsub::ExponentialBackoffPolicy(std::chrono::milliseconds(100), std::chrono::seconds(2), 1.3).clone())
  40. .set<pubsub::MessageOrderingOption>(true);
  41. auto publisher = pubsub::MakePublisherConnection(pubsub::Topic(project, topic), std::move(options));
  42. _publisher = std::make_shared<pubsub::Publisher>(std::move(publisher));
  43. }
  44. PubSubWriter::~PubSubWriter()
  45. {
  46. }
  47. bool PubSubWriter::publishMessage(
  48. const std::string& payload,
  49. const std::string& frontend,
  50. const std::string& orderingKey)
  51. {
  52. auto provider = opentelemetry::trace::Provider::GetTracerProvider();
  53. auto tracer = provider->GetTracer("PubSubWriter");
  54. auto span = tracer->StartSpan("PubSubWriter::publishMessage");
  55. auto scope = tracer->WithActiveSpan(span);
  56. fprintf(stderr, "Publishing message to %s\n", _topic.c_str());
  57. std::vector<std::pair<std::string, std::string> > attributes;
  58. attributes.emplace_back("controller_id", _controller_id);
  59. std::map<std::string, std::string> attrs_map;
  60. OtelCarrier<std::map<std::string, std::string> > carrier(attrs_map);
  61. auto propagator = opentelemetry::context::propagation::GlobalTextMapPropagator::GetGlobalPropagator();
  62. auto current_ctx = opentelemetry::context::RuntimeContext::GetCurrent();
  63. propagator->Inject(carrier, current_ctx);
  64. for (const auto& kv : attrs_map) {
  65. fprintf(stderr, "Attributes injected: %s=%s\n", kv.first.c_str(), kv.second.c_str());
  66. attributes.emplace_back(kv.first, kv.second);
  67. }
  68. if (! frontend.empty()) {
  69. attributes.emplace_back("frontend", frontend);
  70. }
  71. auto msg_tmp = pubsub::MessageBuilder {}.SetData(payload).SetAttributes(attributes);
  72. if (! orderingKey.empty()) {
  73. msg_tmp.SetOrderingKey(orderingKey);
  74. }
  75. auto msg = std::move(msg_tmp).Build();
  76. auto message_id = _publisher->Publish(std::move(msg)).get();
  77. if (! message_id) {
  78. fprintf(stderr, "Failed to publish message: %s\n", std::move(message_id).status().message().c_str());
  79. return false;
  80. }
  81. fprintf(stderr, "Published message to %s\n", _topic.c_str());
  82. return true;
  83. }
  84. bool PubSubWriter::publishNetworkChange(
  85. const nlohmann::json& oldNetwork,
  86. const nlohmann::json& newNetwork,
  87. const std::string& frontend)
  88. {
  89. fprintf(stderr, "Publishing network change\n");
  90. pbmessages::NetworkChange* nc = networkChangeFromJson(_controller_id, oldNetwork, newNetwork);
  91. std::string networkID;
  92. if (nc->has_new_()) {
  93. networkID = nc->new_().network_id();
  94. }
  95. else if (nc->has_old()) {
  96. networkID = nc->old().network_id();
  97. }
  98. std::string payload;
  99. if (! nc->SerializeToString(&payload)) {
  100. fprintf(stderr, "Failed to serialize NetworkChange protobuf message\n");
  101. delete nc;
  102. return false;
  103. }
  104. delete nc;
  105. return publishMessage(payload, frontend, networkID);
  106. }
  107. bool PubSubWriter::publishMemberChange(
  108. const nlohmann::json& oldMember,
  109. const nlohmann::json& newMember,
  110. const std::string& frontend)
  111. {
  112. fprintf(stderr, "Publishing member change\n");
  113. pbmessages::MemberChange* mc = memberChangeFromJson(_controller_id, oldMember, newMember);
  114. std::string memberID;
  115. if (mc->has_new_()) {
  116. memberID = mc->new_().network_id() + "-" + mc->new_().device_id();
  117. }
  118. else if (mc->has_old()) {
  119. memberID = mc->old().network_id() + "-" + mc->old().device_id();
  120. }
  121. std::string payload;
  122. if (! mc->SerializeToString(&payload)) {
  123. fprintf(stderr, "Failed to serialize MemberChange protobuf message\n");
  124. delete mc;
  125. return false;
  126. }
  127. delete mc;
  128. return publishMessage(payload, frontend, memberID);
  129. }
  130. bool PubSubWriter::publishStatusChange(
  131. std::string frontend,
  132. std::string network_id,
  133. std::string node_id,
  134. std::string os,
  135. std::string arch,
  136. std::string version,
  137. int64_t last_seen)
  138. {
  139. auto provider = opentelemetry::trace::Provider::GetTracerProvider();
  140. auto tracer = provider->GetTracer("PubSubWriter");
  141. auto span = tracer->StartSpan("PubSubWriter::publishStatusChange");
  142. auto scope = tracer->WithActiveSpan(span);
  143. pbmessages::MemberStatus_MemberStatusMetadata* metadata = new pbmessages::MemberStatus_MemberStatusMetadata();
  144. metadata->set_controller_id(_controller_id);
  145. metadata->set_trace_id(""); // TODO: generate a trace ID
  146. pbmessages::MemberStatus ms;
  147. ms.set_network_id(network_id);
  148. ms.set_member_id(node_id);
  149. ms.set_os(os);
  150. ms.set_arch(arch);
  151. ms.set_version(version);
  152. ms.set_timestamp(last_seen);
  153. ms.set_allocated_metadata(metadata);
  154. std::string payload;
  155. if (! ms.SerializeToString(&payload)) {
  156. fprintf(stderr, "Failed to serialize StatusChange protobuf message\n");
  157. return false;
  158. }
  159. return publishMessage(payload, "", "");
  160. }
  161. pbmessages::NetworkChange_Network* networkFromJson(const nlohmann::json& j)
  162. {
  163. if (! j.is_object()) {
  164. return nullptr;
  165. }
  166. pbmessages::NetworkChange_Network* n = new pbmessages::NetworkChange_Network();
  167. try {
  168. n->set_network_id(OSUtils::jsonString(j["id"], ""));
  169. n->set_name(OSUtils::jsonString(j["name"], ""));
  170. n->set_capabilities(OSUtils::jsonDump(j.value("capabilities", "[]"), -1));
  171. n->set_creation_time(OSUtils::jsonInt(j["creationTime"], 0));
  172. n->set_enable_broadcast(OSUtils::jsonBool(j["enableBroadcast"], false));
  173. for (const auto& p : j["ipAssignmentPools"]) {
  174. if (p.is_object()) {
  175. auto pool = n->add_assignment_pools();
  176. pool->set_start_ip(OSUtils::jsonString(p["ipRangeStart"], ""));
  177. pool->set_end_ip(OSUtils::jsonString(p["ipRangeEnd"], ""));
  178. }
  179. }
  180. n->set_mtu(OSUtils::jsonInt(j["mtu"], 2800));
  181. n->set_multicast_limit(OSUtils::jsonInt(j["multicastLimit"], 32));
  182. n->set_is_private(OSUtils::jsonBool(j["private"], true));
  183. n->set_remote_trace_level(OSUtils::jsonInt(j["remoteTraceLevel"], 0));
  184. n->set_remote_trace_target(OSUtils::jsonString(j["remoteTraceTarget"], ""));
  185. n->set_revision(OSUtils::jsonInt(j["revision"], 0));
  186. for (const auto& p : j["routes"]) {
  187. if (p.is_object()) {
  188. auto r = n->add_routes();
  189. r->set_target(OSUtils::jsonString(p["target"], ""));
  190. r->set_via(OSUtils::jsonString(p["via"], ""));
  191. }
  192. }
  193. std::string rules;
  194. if (j["rules"].is_array()) {
  195. rules = OSUtils::jsonDump(j["rules"], -1);
  196. }
  197. else {
  198. rules = "[]";
  199. }
  200. n->set_rules(rules);
  201. std::string tags;
  202. if (j["tags"].is_array()) {
  203. tags = OSUtils::jsonDump(j["tags"], -1);
  204. }
  205. else {
  206. tags = "[]";
  207. }
  208. n->set_tags(tags);
  209. pbmessages::NetworkChange_IPV4AssignMode* v4am = new pbmessages::NetworkChange_IPV4AssignMode();
  210. if (j["v4AssignMode"].is_object()) {
  211. nlohmann::json am = j["v4AssignMode"];
  212. v4am->set_zt(OSUtils::jsonBool(am["zt"], false));
  213. }
  214. n->set_allocated_ipv4_assign_mode(v4am);
  215. pbmessages::NetworkChange_IPV6AssignMode* v6am = new pbmessages::NetworkChange_IPV6AssignMode();
  216. if (j["v6AssignMode"].is_object()) {
  217. nlohmann::json am = j["v6AssignMode"];
  218. v6am->set_zt(OSUtils::jsonBool(am["zt"], false));
  219. v6am->set_six_plane(OSUtils::jsonBool(am["6plane"], false));
  220. v6am->set_rfc4193(OSUtils::jsonBool(am["rfc4193"], false));
  221. }
  222. n->set_allocated_ipv6_assign_mode(v6am);
  223. nlohmann::json jdns = j["dns"];
  224. if (jdns.is_object()) {
  225. pbmessages::NetworkChange_DNS* dns = new pbmessages::NetworkChange_DNS();
  226. dns->set_domain(jdns.value("domain", ""));
  227. for (const auto& s : jdns["servers"]) {
  228. if (s.is_string()) {
  229. auto server = dns->add_nameservers();
  230. *server = s;
  231. }
  232. }
  233. n->set_allocated_dns(dns);
  234. }
  235. n->set_sso_enabled(OSUtils::jsonBool(j["ssoEnabled"], false));
  236. nlohmann::json ssocfg = j["ssoConfig"];
  237. if (ssocfg.is_object()) {
  238. n->set_sso_provider(OSUtils::jsonString(ssocfg["provider"], ""));
  239. n->set_sso_client_id(OSUtils::jsonString(ssocfg["clientId"], ""));
  240. n->set_sso_authorization_endpoint(OSUtils::jsonString(ssocfg["authorizationEndpoint"], ""));
  241. n->set_sso_issuer(OSUtils::jsonString(ssocfg["issuer"], ""));
  242. n->set_sso_provider(OSUtils::jsonString(ssocfg["provider"], ""));
  243. }
  244. n->set_rules_source(OSUtils::jsonString(j["rulesSource"], ""));
  245. }
  246. catch (const std::exception& e) {
  247. fprintf(stderr, "Exception parsing network JSON: %s\n", e.what());
  248. delete n;
  249. return nullptr;
  250. }
  251. return n;
  252. }
  253. pbmessages::NetworkChange*
  254. networkChangeFromJson(std::string controllerID, const nlohmann::json& oldNetwork, const nlohmann::json& newNetwork)
  255. {
  256. pbmessages::NetworkChange* nc = new pbmessages::NetworkChange();
  257. nc->set_allocated_old(networkFromJson(oldNetwork));
  258. nc->set_allocated_new_(networkFromJson(newNetwork));
  259. nc->set_change_source(pbmessages::NetworkChange_ChangeSource::NetworkChange_ChangeSource_CONTROLLER);
  260. pbmessages::NetworkChange_NetworkChangeMetadata* metadata = new pbmessages::NetworkChange_NetworkChangeMetadata();
  261. metadata->set_controller_id(controllerID);
  262. metadata->set_trace_id(""); // TODO: generate a trace ID
  263. nc->set_allocated_metadata(metadata);
  264. return nc;
  265. }
  266. pbmessages::MemberChange_Member* memberFromJson(const nlohmann::json& j)
  267. {
  268. if (! j.is_object()) {
  269. fprintf(stderr, "memberFromJson: JSON is not an object\n");
  270. return nullptr;
  271. }
  272. fprintf(stderr, "memberFromJSON: %s\n", j.dump().c_str());
  273. pbmessages::MemberChange_Member* m = new pbmessages::MemberChange_Member();
  274. try {
  275. m->set_network_id(OSUtils::jsonString(j["nwid"], ""));
  276. m->set_device_id(OSUtils::jsonString(j["id"], ""));
  277. m->set_identity(OSUtils::jsonString(j["identity"], ""));
  278. m->set_authorized(OSUtils::jsonBool(j["authorized"], false));
  279. if (j["ipAssignments"].is_array()) {
  280. for (const auto& addr : j["ipAssignments"]) {
  281. if (addr.is_string()) {
  282. auto a = m->add_ip_assignments();
  283. std::string address = addr.get<std::string>();
  284. *a = address;
  285. }
  286. }
  287. }
  288. m->set_active_bridge(OSUtils::jsonBool(j["activeBridge"], false));
  289. if (j["tags"].is_array()) {
  290. nlohmann::json tags = j["tags"];
  291. std::string tagsStr = OSUtils::jsonDump(tags, -1);
  292. m->set_tags(tagsStr);
  293. }
  294. else {
  295. nlohmann::json tags = nlohmann::json::array();
  296. std::string tagsStr = OSUtils::jsonDump(tags, -1);
  297. m->set_tags(tagsStr);
  298. }
  299. if (j["capabilities"].is_array()) {
  300. nlohmann::json caps = j["capabilities"];
  301. std::string capsStr = OSUtils::jsonDump(caps, -1);
  302. m->set_capabilities(capsStr);
  303. }
  304. else {
  305. nlohmann::json caps = nlohmann::json::array();
  306. std::string capsStr = OSUtils::jsonDump(caps, -1);
  307. m->set_capabilities(capsStr);
  308. }
  309. m->set_creation_time(OSUtils::jsonInt(j["creationTime"], 0));
  310. m->set_no_auto_assign_ips(OSUtils::jsonBool(j["noAutoAssignIps"], false));
  311. m->set_revision(OSUtils::jsonInt(j["revision"], 0));
  312. m->set_last_authorized_time(OSUtils::jsonInt(j["lastAuthorizedTime"], 0));
  313. m->set_last_deauthorized_time(OSUtils::jsonInt(j["lastDeauthorizedTime"], 0));
  314. m->set_last_authorized_credential_type(OSUtils::jsonString(j["lastAuthorizedCredentialType"], ""));
  315. m->set_last_authorized_credential(OSUtils::jsonString(j["lastAuthorizedCredential"], ""));
  316. m->set_version_major(OSUtils::jsonInt(j["versionMajor"], 0));
  317. m->set_version_minor(OSUtils::jsonInt(j["versionMinor"], 0));
  318. m->set_version_rev(OSUtils::jsonInt(j["versionRev"], 0));
  319. m->set_version_protocol(OSUtils::jsonInt(j["versionProtocol"], 0));
  320. m->set_remote_trace_level(OSUtils::jsonInt(j["remoteTraceLevel"], 0));
  321. m->set_remote_trace_target(OSUtils::jsonString(j["remoteTraceTarget"], ""));
  322. m->set_sso_exempt(OSUtils::jsonBool(j["ssoExempt"], false));
  323. m->set_auth_expiry_time(OSUtils::jsonInt(j["authExpiryTime"], 0));
  324. }
  325. catch (const std::exception& e) {
  326. fprintf(stderr, "Exception parsing member JSON: %s\n", e.what());
  327. delete m;
  328. return nullptr;
  329. }
  330. fprintf(stderr, "memberFromJSON complete\n");
  331. return m;
  332. }
  333. pbmessages::MemberChange*
  334. memberChangeFromJson(std::string controllerID, const nlohmann::json& oldMember, const nlohmann::json& newMember)
  335. {
  336. fprintf(stderr, "memberrChangeFromJson: old: %s\n", oldMember.dump().c_str());
  337. fprintf(stderr, "memberrChangeFromJson: new: %s\n", newMember.dump().c_str());
  338. pbmessages::MemberChange* mc = new pbmessages::MemberChange();
  339. pbmessages::MemberChange_Member* om = memberFromJson(oldMember);
  340. if (om != nullptr) {
  341. mc->set_allocated_old(om);
  342. }
  343. pbmessages::MemberChange_Member* nm = memberFromJson(newMember);
  344. if (nm != nullptr) {
  345. mc->set_allocated_new_(nm);
  346. }
  347. mc->set_change_source(pbmessages::MemberChange_ChangeSource::MemberChange_ChangeSource_CONTROLLER);
  348. pbmessages::MemberChange_MemberChangeMetadata* metadata = new pbmessages::MemberChange_MemberChangeMetadata();
  349. metadata->set_controller_id(controllerID);
  350. metadata->set_trace_id(""); // TODO: generate a trace ID
  351. mc->set_allocated_metadata(metadata);
  352. return mc;
  353. }
  354. } // namespace ZeroTier