Răsfoiți Sursa

rework the pubsub listener again so the subscription is set up on its own thread

Grant Limberg 1 săptămână în urmă
părinte
comite
db6e698245
2 a modificat fișierele cu 34 adăugiri și 20 ștergeri
  1. 34 19
      controller/PubSubListener.cpp
  2. 0 1
      controller/PubSubListener.hpp

+ 34 - 19
controller/PubSubListener.cpp

@@ -58,7 +58,6 @@ PubSubListener::PubSubListener(std::string controller_id, std::string project, s
 PubSubListener::~PubSubListener()
 {
 	_run = false;
-	_session.cancel();
 	if (_subscriberThread.joinable()) {
 		_subscriberThread.join();
 	}
@@ -67,24 +66,40 @@ PubSubListener::~PubSubListener()
 void PubSubListener::subscribe()
 {
 	while (_run) {
-		_session = _subscriber->Subscribe([this](pubsub::Message const& m, pubsub::AckHandler h) {
-			auto provider = opentelemetry::trace::Provider::GetTracerProvider();
-			auto tracer = provider->GetTracer("PubSubListener");
-			auto span = tracer->StartSpan("PubSubListener::onMessage");
-			auto scope = tracer->WithActiveSpan(span);
-			span->SetAttribute("message_id", m.message_id());
-			span->SetAttribute("ordering_key", m.ordering_key());
-			span->SetAttribute("attributes", m.attributes().size());
-
-			fprintf(stderr, "Received message %s\n", m.message_id().c_str());
-			onNotification(m.data());
-			std::move(h).ack();
-			span->SetStatus(opentelemetry::trace::StatusCode::kOk);
-			return true;
-		});
-		auto status = _session.get();
-		if (! status.ok() && _run) {
-			fprintf(stderr, "Error during Subscribe: %s\n", status.message().c_str());
+		try {
+			fprintf(stderr, "Starting new subscription session\n");
+			auto session = _subscriber->Subscribe([this](pubsub::Message const& m, pubsub::AckHandler h) {
+				auto provider = opentelemetry::trace::Provider::GetTracerProvider();
+				auto tracer = provider->GetTracer("PubSubListener");
+				auto span = tracer->StartSpan("PubSubListener::onMessage");
+				auto scope = tracer->WithActiveSpan(span);
+				span->SetAttribute("message_id", m.message_id());
+				span->SetAttribute("ordering_key", m.ordering_key());
+				span->SetAttribute("attributes", m.attributes().size());
+
+				fprintf(stderr, "Received message %s\n", m.message_id().c_str());
+				onNotification(m.data());
+				std::move(h).ack();
+				span->SetStatus(opentelemetry::trace::StatusCode::kOk);
+				return true;
+			});
+
+			auto result = session.wait_for(std::chrono::seconds(10));
+			if (result == std::future_status::timeout) {
+				session.cancel();
+				std::this_thread::sleep_for(std::chrono::seconds(5));
+				continue;
+			}
+
+			if (! session.valid()) {
+				fprintf(stderr, "Subscription session no longer valid\n");
+				std::this_thread::sleep_for(std::chrono::seconds(5));
+				continue;
+			}
+		}
+		catch (google::cloud::Status const& status) {
+			fprintf(stderr, "Subscription terminated with status: %s\n", status.message().c_str());
+			std::this_thread::sleep_for(std::chrono::seconds(5));
 		}
 	}
 }

+ 0 - 1
controller/PubSubListener.hpp

@@ -44,7 +44,6 @@ class PubSubListener : public NotificationListener {
 	google::cloud::pubsub_admin::SubscriptionAdminClient _adminClient;
 	google::cloud::pubsub::Subscription _subscription;
 	std::shared_ptr<google::cloud::pubsub::Subscriber> _subscriber;
-	google::cloud::future<google::cloud::Status> _session;
 	std::thread _subscriberThread;
 };