PubSubWriter.cpp 4.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128
  1. #include "PubSubWriter.hpp"
  2. #include "CtlUtil.hpp"
  3. #include "member.pb.h"
  4. #include "member_status.pb.h"
  5. #include "network.pb.h"
  6. #include <chrono>
  7. #include <google/cloud/options.h>
  8. #include <google/cloud/pubsub/message.h>
  9. #include <google/cloud/pubsub/publisher.h>
  10. #include <google/cloud/pubsub/topic.h>
  11. #include <opentelemetry/trace/provider.h>
  12. namespace pubsub = ::google::cloud::pubsub;
  13. namespace ZeroTier {
  14. PubSubWriter::PubSubWriter(std::string project, std::string topic, std::string controller_id)
  15. : _controller_id(controller_id)
  16. , _project(project)
  17. , _topic(topic)
  18. {
  19. fprintf(
  20. stderr, "PubSubWriter for controller %s project %s topic %s\n", controller_id.c_str(), project.c_str(),
  21. topic.c_str());
  22. GOOGLE_PROTOBUF_VERIFY_VERSION;
  23. // If PUBSUB_EMULATOR_HOST is set, create the topic if it doesn't exist
  24. const char* emulatorHost = std::getenv("PUBSUB_EMULATOR_HOST");
  25. if (emulatorHost != nullptr) {
  26. create_gcp_pubsub_topic_if_needed(project, topic);
  27. }
  28. auto options =
  29. ::google::cloud::Options {}
  30. .set<pubsub::RetryPolicyOption>(pubsub::LimitedTimeRetryPolicy(std::chrono::seconds(5)).clone())
  31. .set<pubsub::BackoffPolicyOption>(
  32. pubsub::ExponentialBackoffPolicy(std::chrono::milliseconds(100), std::chrono::seconds(2), 1.3).clone());
  33. auto publisher = pubsub::MakePublisherConnection(pubsub::Topic(project, topic), std::move(options));
  34. _publisher = std::make_shared<pubsub::Publisher>(std::move(publisher));
  35. }
  36. PubSubWriter::~PubSubWriter()
  37. {
  38. }
  39. bool PubSubWriter::publishMessage(const std::string& payload, const std::string& frontend)
  40. {
  41. std::vector<std::pair<std::string, std::string> > attributes;
  42. if (! frontend.empty()) {
  43. attributes.emplace_back("frontend", frontend);
  44. }
  45. attributes.emplace_back("controller_id", _controller_id);
  46. auto msg = pubsub::MessageBuilder {}.SetData(payload).SetAttributes(attributes).Build();
  47. auto message_id = _publisher->Publish(std::move(msg)).get();
  48. if (! message_id) {
  49. fprintf(stderr, "Failed to publish message: %s\n", std::move(message_id).status().message().c_str());
  50. return false;
  51. }
  52. fprintf(stderr, "Published message to %s\n", _topic.c_str());
  53. return true;
  54. }
  55. bool PubSubWriter::publishNetworkChange(const nlohmann::json& networkJson, const std::string& frontend)
  56. {
  57. pbmessages::NetworkChange nc;
  58. // nc.mutable_new_()->CopyFrom(fromJson<pbmessages::NetworkChange_Network>(networkJson));
  59. std::string payload;
  60. if (! nc.SerializeToString(&payload)) {
  61. fprintf(stderr, "Failed to serialize NetworkChange protobuf message\n");
  62. return false;
  63. }
  64. return publishMessage(payload, frontend);
  65. }
  66. bool PubSubWriter::publishMemberChange(const nlohmann::json& memberJson, const std::string& frontend)
  67. {
  68. pbmessages::MemberChange mc;
  69. // mc.mutable_new_()->CopyFrom(fromJson<pbmessages::MemberChange_Member>(memberJson));
  70. std::string payload;
  71. if (! mc.SerializeToString(&payload)) {
  72. fprintf(stderr, "Failed to serialize MemberChange protobuf message\n");
  73. return false;
  74. }
  75. return publishMessage(payload, frontend);
  76. }
  77. bool PubSubWriter::publishStatusChange(
  78. std::string frontend,
  79. std::string network_id,
  80. std::string node_id,
  81. std::string os,
  82. std::string arch,
  83. std::string version,
  84. int64_t last_seen)
  85. {
  86. auto provider = opentelemetry::trace::Provider::GetTracerProvider();
  87. auto tracer = provider->GetTracer("PubSubWriter");
  88. auto span = tracer->StartSpan("PubSubWriter::publishStatusChange");
  89. auto scope = tracer->WithActiveSpan(span);
  90. pbmessages::MemberStatus_MemberStatusMetadata* metadata = new pbmessages::MemberStatus_MemberStatusMetadata();
  91. metadata->set_controller_id(_controller_id);
  92. metadata->set_trace_id(""); // TODO: generate a trace ID
  93. pbmessages::MemberStatus ms;
  94. ms.set_network_id(network_id);
  95. ms.set_member_id(node_id);
  96. ms.set_os(os);
  97. ms.set_arch(arch);
  98. ms.set_version(version);
  99. ms.set_timestamp(last_seen);
  100. ms.set_allocated_metadata(metadata);
  101. std::string payload;
  102. if (! ms.SerializeToString(&payload)) {
  103. fprintf(stderr, "Failed to serialize StatusChange protobuf message\n");
  104. return false;
  105. }
  106. return publishMessage(payload, frontend);
  107. }
  108. } // namespace ZeroTier