浏览代码

add explicit nack if there's an error processing a pubsub message

Grant Limberg 3 周之前
父节点
当前提交
18714c7785

+ 1 - 1
nonfree/controller/NotificationListener.hpp

@@ -24,7 +24,7 @@ class NotificationListener {
 	 *
 	 * @param payload The payload of the notification.
 	 */
-	virtual void onNotification(const std::string& payload) = 0;
+	virtual bool onNotification(const std::string& payload) = 0;
 };
 
 }	// namespace ZeroTier

+ 14 - 4
nonfree/controller/PostgreSQL.cpp

@@ -12,7 +12,11 @@
 
 namespace ZeroTier {
 
-PostgresMemberListener::PostgresMemberListener(DB* db, std::shared_ptr<ConnectionPool<PostgresConnection> > pool, const std::string& channel, uint64_t timeout)
+PostgresMemberListener::PostgresMemberListener(
+	DB* db,
+	std::shared_ptr<ConnectionPool<PostgresConnection> > pool,
+	const std::string& channel,
+	uint64_t timeout)
 	: NotificationListener()
 	, _db(db)
 	, _pool(pool)
@@ -45,7 +49,7 @@ void PostgresMemberListener::listen()
 	}
 }
 
-void PostgresMemberListener::onNotification(const std::string& payload)
+bool PostgresMemberListener::onNotification(const std::string& payload)
 {
 	auto provider = opentelemetry::trace::Provider::GetTracerProvider();
 	auto tracer = provider->GetTracer("PostgresMemberNotificationListener");
@@ -83,9 +87,14 @@ void PostgresMemberListener::onNotification(const std::string& payload)
 			fprintf(stderr, "member delete payload sent\n");
 		}
 	}
+	return true;
 }
 
-PostgresNetworkListener::PostgresNetworkListener(DB* db, std::shared_ptr<ConnectionPool<PostgresConnection> > pool, const std::string& channel, uint64_t timeout)
+PostgresNetworkListener::PostgresNetworkListener(
+	DB* db,
+	std::shared_ptr<ConnectionPool<PostgresConnection> > pool,
+	const std::string& channel,
+	uint64_t timeout)
 	: NotificationListener()
 	, _db(db)
 	, _pool(pool)
@@ -118,7 +127,7 @@ void PostgresNetworkListener::listen()
 	}
 }
 
-void PostgresNetworkListener::onNotification(const std::string& payload)
+bool PostgresNetworkListener::onNotification(const std::string& payload)
 {
 	auto provider = opentelemetry::trace::Provider::GetTracerProvider();
 	auto tracer = provider->GetTracer("db_network_notification");
@@ -166,6 +175,7 @@ void PostgresNetworkListener::onNotification(const std::string& payload)
 			fprintf(stderr, "network delete payload sent\n");
 		}
 	}
+	return true;
 }
 
 }	// namespace ZeroTier

+ 2 - 2
nonfree/controller/PostgreSQL.hpp

@@ -221,7 +221,7 @@ class PostgresMemberListener : public NotificationListener {
 
 	virtual void listen();
 
-	virtual void onNotification(const std::string& payload) override;
+	virtual bool onNotification(const std::string& payload) override;
 
   private:
 	bool _run = false;
@@ -244,7 +244,7 @@ class PostgresNetworkListener : public NotificationListener {
 
 	virtual void listen();
 
-	virtual void onNotification(const std::string& payload) override;
+	virtual bool onNotification(const std::string& payload) override;
 
   private:
 	bool _run = false;

+ 23 - 15
nonfree/controller/PubSubListener.cpp

@@ -83,10 +83,16 @@ void PubSubListener::subscribe()
 				span->SetAttribute("ordering_key", m.ordering_key());
 
 				fprintf(stderr, "Received message %s\n", m.message_id().c_str());
-				onNotification(m.data());
-				std::move(h).ack();
-				span->SetStatus(opentelemetry::trace::StatusCode::kOk);
-				return true;
+				if (onNotification(m.data())) {
+					std::move(h).ack();
+					span->SetStatus(opentelemetry::trace::StatusCode::kOk);
+					return true;
+				}
+				else {
+					std::move(h).nack();
+					span->SetStatus(opentelemetry::trace::StatusCode::kError, "onNotification failed");
+					return false;
+				}
 			});
 
 			auto result = session.wait_for(std::chrono::seconds(10));
@@ -119,7 +125,7 @@ PubSubNetworkListener::~PubSubNetworkListener()
 {
 }
 
-void PubSubNetworkListener::onNotification(const std::string& payload)
+bool PubSubNetworkListener::onNotification(const std::string& payload)
 {
 	auto provider = opentelemetry::trace::Provider::GetTracerProvider();
 	auto tracer = provider->GetTracer("PubSubNetworkListener");
@@ -131,7 +137,7 @@ void PubSubNetworkListener::onNotification(const std::string& payload)
 		fprintf(stderr, "Failed to parse NetworkChange protobuf message\n");
 		span->SetAttribute("error", "Failed to parse NetworkChange protobuf message");
 		span->SetStatus(opentelemetry::trace::StatusCode::kError, "Failed to parse protobuf");
-		return;
+		return false;
 	}
 	fprintf(stderr, "PubSubNetworkListener: parsed protobuf message. %s\n", nc.DebugString().c_str());
 	fprintf(stderr, "Network notification received\n");
@@ -153,7 +159,7 @@ void PubSubNetworkListener::onNotification(const std::string& payload)
 			fprintf(stderr, "NetworkChange message has no old or new network config\n");
 			span->SetAttribute("error", "NetworkChange message has no old or new network config");
 			span->SetStatus(opentelemetry::trace::StatusCode::kError, "No old or new config");
-			return;
+			return false;
 		}
 
 		if (oldConfig.is_object() && newConfig.is_object()) {
@@ -187,21 +193,22 @@ void PubSubNetworkListener::onNotification(const std::string& payload)
 		span->SetAttribute("error", e.what());
 		span->SetStatus(opentelemetry::trace::StatusCode::kError, e.what());
 		fprintf(stderr, "payload: %s\n", payload.c_str());
-		return;
+		return false;
 	}
 	catch (const std::exception& e) {
 		fprintf(stderr, "PubSubNetworkListener Exception in PubSubNetworkListener: %s\n", e.what());
 		span->SetAttribute("error", e.what());
 		span->SetStatus(opentelemetry::trace::StatusCode::kError, e.what());
-		return;
+		return false;
 	}
 	catch (...) {
 		fprintf(stderr, "PubSubNetworkListener Unknown exception in PubSubNetworkListener\n");
 		span->SetAttribute("error", "Unknown exception in PubSubNetworkListener");
 		span->SetStatus(opentelemetry::trace::StatusCode::kError, "Unknown exception");
-		return;
+		return false;
 	}
 	fprintf(stderr, "PubSubNetworkListener onNotification complete\n");
+	return true;
 }
 
 PubSubMemberListener::PubSubMemberListener(std::string controller_id, std::string project, std::string topic, DB* db)
@@ -214,7 +221,7 @@ PubSubMemberListener::~PubSubMemberListener()
 {
 }
 
-void PubSubMemberListener::onNotification(const std::string& payload)
+bool PubSubMemberListener::onNotification(const std::string& payload)
 {
 	auto provider = opentelemetry::trace::Provider::GetTracerProvider();
 	auto tracer = provider->GetTracer("PubSubMemberListener");
@@ -226,7 +233,7 @@ void PubSubMemberListener::onNotification(const std::string& payload)
 		fprintf(stderr, "Failed to parse MemberChange protobuf message\n");
 		span->SetAttribute("error", "Failed to parse MemberChange protobuf message");
 		span->SetStatus(opentelemetry::trace::StatusCode::kError, "Failed to parse protobuf");
-		return;
+		return false;
 	}
 	fprintf(stderr, "PubSubMemberListener: parsed protobuf message. %s\n", mc.DebugString().c_str());
 	fprintf(stderr, "Member notification received");
@@ -249,7 +256,7 @@ void PubSubMemberListener::onNotification(const std::string& payload)
 			fprintf(stderr, "MemberChange message has no old or new member config\n");
 			span->SetAttribute("error", "MemberChange message has no old or new member config");
 			span->SetStatus(opentelemetry::trace::StatusCode::kError, "No old or new config");
-			return;
+			return false;
 		}
 
 		if (oldConfig.is_object() && newConfig.is_object()) {
@@ -290,14 +297,15 @@ void PubSubMemberListener::onNotification(const std::string& payload)
 		span->SetAttribute("error", e.what());
 		span->SetStatus(opentelemetry::trace::StatusCode::kError, e.what());
 		fprintf(stderr, "payload: %s\n", payload.c_str());
-		return;
+		return false;
 	}
 	catch (const std::exception& e) {
 		fprintf(stderr, "PubSubMemberListener Exception in PubSubMemberListener: %s\n", e.what());
 		span->SetAttribute("error", e.what());
 		span->SetStatus(opentelemetry::trace::StatusCode::kError, e.what());
-		return;
+		return false;
 	}
+	return true;
 }
 
 nlohmann::json toJson(const pbmessages::NetworkChange_Network& nc, pbmessages::NetworkChange_ChangeSource source)

+ 3 - 3
nonfree/controller/PubSubListener.hpp

@@ -23,7 +23,7 @@ class PubSubListener : public NotificationListener {
 	PubSubListener(std::string controller_id, std::string project, std::string topic);
 	virtual ~PubSubListener();
 
-	virtual void onNotification(const std::string& payload) = 0;
+	virtual bool onNotification(const std::string& payload) = 0;
 
   protected:
 	std::string _controller_id;
@@ -48,7 +48,7 @@ class PubSubNetworkListener : public PubSubListener {
 	PubSubNetworkListener(std::string controller_id, std::string project, std::string topic, DB* db);
 	virtual ~PubSubNetworkListener();
 
-	virtual void onNotification(const std::string& payload) override;
+	virtual bool onNotification(const std::string& payload) override;
 
   private:
 	DB* _db;
@@ -62,7 +62,7 @@ class PubSubMemberListener : public PubSubListener {
 	PubSubMemberListener(std::string controller_id, std::string project, std::string topic, DB* db);
 	virtual ~PubSubMemberListener();
 
-	virtual void onNotification(const std::string& payload) override;
+	virtual bool onNotification(const std::string& payload) override;
 
   private:
 	DB* _db;

+ 4 - 2
nonfree/controller/RedisListener.cpp

@@ -131,9 +131,10 @@ void RedisNetworkListener::listen()
 	}
 }
 
-void RedisNetworkListener::onNotification(const std::string& payload)
+bool RedisNetworkListener::onNotification(const std::string& payload)
 {
 	// Handle notifications if needed
+	return true;
 }
 
 RedisMemberListener::RedisMemberListener(std::string controller_id, std::shared_ptr<sw::redis::Redis> redis, DB* db)
@@ -227,8 +228,9 @@ void RedisMemberListener::listen()
 	}
 }
 
-void RedisMemberListener::onNotification(const std::string& payload)
+bool RedisMemberListener::onNotification(const std::string& payload)
 {
+	return true;
 }
 
 }	// namespace ZeroTier

+ 4 - 3
nonfree/controller/RedisListener.hpp

@@ -22,8 +22,9 @@ class RedisListener : public NotificationListener {
 	virtual ~RedisListener();
 
 	virtual void listen() = 0;
-	virtual void onNotification(const std::string& payload) override
+	virtual bool onNotification(const std::string& payload) override
 	{
+		return true;
 	}
 
 	void start()
@@ -50,7 +51,7 @@ class RedisNetworkListener : public RedisListener {
 	virtual ~RedisNetworkListener();
 
 	virtual void listen() override;
-	virtual void onNotification(const std::string& payload) override;
+	virtual bool onNotification(const std::string& payload) override;
 
   private:
 	DB* _db;
@@ -63,7 +64,7 @@ class RedisMemberListener : public RedisListener {
 	virtual ~RedisMemberListener();
 
 	virtual void listen() override;
-	virtual void onNotification(const std::string& payload) override;
+	virtual bool onNotification(const std::string& payload) override;
 
   private:
 	DB* _db;