Browse Source

Set ordering key on pubsub message publishing.

Grant Limberg 1 month ago
parent
commit
30c4484731

+ 1 - 0
nonfree/controller/CtlUtil.cpp

@@ -139,6 +139,7 @@ void create_gcp_pubsub_subscription_if_needed(
 			request.set_name(subscriptionName);
 			request.set_topic(pubsub::Topic(project_id, topic_id).FullName());
 			request.set_filter("(attributes.controller_id=\"" + controller_id + "\")");
+			request.set_enable_message_ordering(true);
 			auto createResult = subscriptionAdminClient.CreateSubscription(request);
 			if (! createResult.ok()) {
 				fprintf(stderr, "Failed to create subscription: %s\n", createResult.status().message().c_str());

+ 29 - 5
nonfree/controller/PubSubWriter.cpp

@@ -51,7 +51,10 @@ PubSubWriter::~PubSubWriter()
 {
 }
 
-bool PubSubWriter::publishMessage(const std::string& payload, const std::string& frontend)
+bool PubSubWriter::publishMessage(
+	const std::string& payload,
+	const std::string& frontend,
+	const std::string& orderingKey)
 {
 	fprintf(stderr, "Publishing message to %s\n", _topic.c_str());
 	std::vector<std::pair<std::string, std::string> > attributes;
@@ -61,7 +64,11 @@ bool PubSubWriter::publishMessage(const std::string& payload, const std::string&
 		attributes.emplace_back("frontend", frontend);
 	}
 
-	auto msg = pubsub::MessageBuilder {}.SetData(payload).SetAttributes(attributes).Build();
+	auto msg_tmp = pubsub::MessageBuilder {}.SetData(payload).SetAttributes(attributes);
+	if (! orderingKey.empty()) {
+		msg_tmp.SetOrderingKey(orderingKey);
+	}
+	auto msg = std::move(msg_tmp).Build();
 	auto message_id = _publisher->Publish(std::move(msg)).get();
 	if (! message_id) {
 		fprintf(stderr, "Failed to publish message: %s\n", std::move(message_id).status().message().c_str());
@@ -79,6 +86,15 @@ bool PubSubWriter::publishNetworkChange(
 {
 	fprintf(stderr, "Publishing network change\n");
 	pbmessages::NetworkChange* nc = networkChangeFromJson(_controller_id, oldNetwork, newNetwork);
+
+	std::string networkID;
+	if (nc->has_new_()) {
+		networkID = nc->new_().network_id();
+	}
+	else if (nc->has_old()) {
+		networkID = nc->old().network_id();
+	}
+
 	std::string payload;
 	if (! nc->SerializeToString(&payload)) {
 		fprintf(stderr, "Failed to serialize NetworkChange protobuf message\n");
@@ -86,7 +102,7 @@ bool PubSubWriter::publishNetworkChange(
 		return false;
 	}
 	delete nc;
-	return publishMessage(payload, frontend);
+	return publishMessage(payload, frontend, networkID);
 }
 
 bool PubSubWriter::publishMemberChange(
@@ -96,6 +112,14 @@ bool PubSubWriter::publishMemberChange(
 {
 	fprintf(stderr, "Publishing member change\n");
 	pbmessages::MemberChange* mc = memberChangeFromJson(_controller_id, oldMember, newMember);
+	std::string memberID;
+	if (mc->has_new_()) {
+		memberID = mc->new_().network_id() + "-" + mc->new_().device_id();
+	}
+	else if (mc->has_old()) {
+		memberID = mc->old().network_id() + "-" + mc->old().device_id();
+	}
+
 	std::string payload;
 	if (! mc->SerializeToString(&payload)) {
 		fprintf(stderr, "Failed to serialize MemberChange protobuf message\n");
@@ -104,7 +128,7 @@ bool PubSubWriter::publishMemberChange(
 	}
 
 	delete mc;
-	return publishMessage(payload, frontend);
+	return publishMessage(payload, frontend, memberID);
 }
 
 bool PubSubWriter::publishStatusChange(
@@ -140,7 +164,7 @@ bool PubSubWriter::publishStatusChange(
 		return false;
 	}
 
-	return publishMessage(payload, "");
+	return publishMessage(payload, "", "");
 }
 
 pbmessages::NetworkChange_Network* networkFromJson(const nlohmann::json& j)

+ 1 - 1
nonfree/controller/PubSubWriter.hpp

@@ -31,7 +31,7 @@ class PubSubWriter {
 		int64_t last_seen);
 
   protected:
-	bool publishMessage(const std::string& payload, const std::string& frontend);
+	bool publishMessage(const std::string& payload, const std::string& frontend, const std::string& orderingKey);
 
   private:
 	std::string _controller_id;