Pārlūkot izejas kodu

finish up rabbitmq integration

Grant Limberg 6 gadi atpakaļ
vecāks
revīzija
8141043560
2 mainītis faili ar 41 papildinājumiem un 32 dzēšanām
  1. 39 30
      controller/PostgreSQL.cpp
  2. 2 2
      controller/RabbitMQ.cpp

+ 39 - 30
controller/PostgreSQL.cpp

@@ -542,7 +542,8 @@ void PostgreSQL::heartbeat()
 			std::string build = std::to_string(ZEROTIER_ONE_VERSION_BUILD);
 			std::string now = std::to_string(OSUtils::now());
 			std::string host_port = std::to_string(_listenPort);
-			const char *values[9] = {
+			std::string use_rabbitmq = (_mqc != NULL) ? "true" : "false";
+			const char *values[10] = {
 				controllerId,
 				hostname,
 				now.c_str(),
@@ -551,16 +552,18 @@ void PostgreSQL::heartbeat()
 				minor.c_str(),
 				rev.c_str(),
 				build.c_str(),
-				host_port.c_str()
+				host_port.c_str(),
+				use_rabbitmq.c_str()
 			};
 
 			PGresult *res = PQexecParams(conn,
-				"INSERT INTO ztc_controller (id, cluster_host, last_alive, public_identity, v_major, v_minor, v_rev, v_build, host_port) " 
-				"VALUES ($1, $2, TO_TIMESTAMP($3::double precision/1000), $4, $5, $6, $7, $8, $9) "
+				"INSERT INTO ztc_controller (id, cluster_host, last_alive, public_identity, v_major, v_minor, v_rev, v_build, host_port, use_rabbitmq) " 
+				"VALUES ($1, $2, TO_TIMESTAMP($3::double precision/1000), $4, $5, $6, $7, $8, $9, $10) "
 				"ON CONFLICT (id) DO UPDATE SET cluster_host = EXCLUDED.cluster_host, last_alive = EXCLUDED.last_alive, "
 				"public_identity = EXCLUDED.public_identity, v_major = EXCLUDED.v_major, v_minor = EXCLUDED.v_minor, "
-				"v_rev = EXCLUDED.v_rev, v_build = EXCLUDED.v_rev, host_port = EXCLUDED.host_port",
-				9,       // number of parameters
+				"v_rev = EXCLUDED.v_rev, v_build = EXCLUDED.v_rev, host_port = EXCLUDED.host_port, "
+				"use_rabbitmq = EXCLUDED.use_rabbitmq",
+				10,       // number of parameters
 				NULL,    // oid field.   ignore
 				values,  // values for substitution
 				NULL, // lengths in bytes of each value
@@ -591,19 +594,9 @@ void PostgreSQL::membersDbWatcher()
 
 	initializeMembers(conn);
 
-	char buf[11] = {0};
-	std::string cmd = "LISTEN member_" + std::string(_myAddress.toString(buf));
-	PGresult *res = PQexec(conn, cmd.c_str());
-	if (!res || PQresultStatus(res) != PGRES_COMMAND_OK) {
-		fprintf(stderr, "LISTEN command failed: %s\n", PQresultErrorMessage(res));
-		PQclear(res);
-		PQfinish(conn);
-		exit(1);
-	}
-
-	PQclear(res); res = NULL;
-
 	if (this->_mqc != NULL) {
+		PQfinish(conn);
+		conn = NULL;
 		_membersWatcher_RabbitMQ();
 	} else {
 		_membersWatcher_Postgres(conn);
@@ -618,6 +611,18 @@ void PostgreSQL::membersDbWatcher()
 }
 
 void PostgreSQL::_membersWatcher_Postgres(PGconn *conn) {
+	char buf[11] = {0};
+	std::string cmd = "LISTEN member_" + std::string(_myAddress.toString(buf));
+	PGresult *res = PQexec(conn, cmd.c_str());
+	if (!res || PQresultStatus(res) != PGRES_COMMAND_OK) {
+		fprintf(stderr, "LISTEN command failed: %s\n", PQresultErrorMessage(res));
+		PQclear(res);
+		PQfinish(conn);
+		exit(1);
+	}
+
+	PQclear(res); res = NULL;
+
 	while(_run == 1) {
 		if (PQstatus(conn) != CONNECTION_OK) {
 			fprintf(stderr, "ERROR: Member Watcher lost connection to Postgres.");
@@ -659,6 +664,7 @@ void PostgreSQL::_membersWatcher_RabbitMQ() {
 	while (_run == 1) {
 		try {
 			std::string msg = rmq.consume();
+			// fprintf(stderr, "Got Member Update: %s\n", msg.c_str());
 			json tmp(json::parse(msg));
 			json &ov = tmp["old_val"];
 			json &nv = tmp["new_val"];
@@ -686,19 +692,9 @@ void PostgreSQL::networksDbWatcher()
 
 	initializeNetworks(conn);
 
-	char buf[11] = {0};
-	std::string cmd = "LISTEN network_" + std::string(_myAddress.toString(buf));
-	PGresult *res = PQexec(conn, cmd.c_str());
-	if (!res || PQresultStatus(res) != PGRES_COMMAND_OK) {
-		fprintf(stderr, "LISTEN command failed: %s\n", PQresultErrorMessage(res));
-		PQclear(res);
-		PQfinish(conn);
-		exit(1);
-	}
-
-	PQclear(res); res = NULL;
-
 	if (this->_mqc != NULL) {
+		PQfinish(conn);
+		conn = NULL;
 		_networksWatcher_RabbitMQ();
 	} else {
 		_networksWatcher_Postgres(conn);
@@ -713,6 +709,18 @@ void PostgreSQL::networksDbWatcher()
 }
 
 void PostgreSQL::_networksWatcher_Postgres(PGconn *conn) {
+	char buf[11] = {0};
+	std::string cmd = "LISTEN network_" + std::string(_myAddress.toString(buf));
+	PGresult *res = PQexec(conn, cmd.c_str());
+	if (!res || PQresultStatus(res) != PGRES_COMMAND_OK) {
+		fprintf(stderr, "LISTEN command failed: %s\n", PQresultErrorMessage(res));
+		PQclear(res);
+		PQfinish(conn);
+		exit(1);
+	}
+
+	PQclear(res); res = NULL;
+
 	while(_run == 1) {
 		if (PQstatus(conn) != CONNECTION_OK) {
 			fprintf(stderr, "ERROR: Network Watcher lost connection to Postgres.");
@@ -752,6 +760,7 @@ void PostgreSQL::_networksWatcher_RabbitMQ() {
 	while (_run == 1) {
 		try {
 			std::string msg = rmq.consume();
+			// fprintf(stderr, "Got network update: %s\n", msg.c_str());
 			json tmp(json::parse(msg));
 			json &ov = tmp["old_val"];
 			json &nv = tmp["new_val"];

+ 2 - 2
controller/RabbitMQ.cpp

@@ -58,13 +58,13 @@ void RabbitMQ::init()
     _q = amqp_queue_declare(_conn, _channel, amqp_cstring_bytes(_qName), 0, 0, 0, 0, amqp_empty_table);
     r = amqp_get_rpc_reply(_conn);
     if (r.reply_type != AMQP_RESPONSE_NORMAL) {
-        throw std::runtime_error("Error declaring queue");
+        throw std::runtime_error("Error declaring queue " + std::string(_qName));
     }
 
     amqp_basic_consume(_conn, _channel, amqp_cstring_bytes(_qName), amqp_empty_bytes, 0, 1, 0, amqp_empty_table);
     r = amqp_get_rpc_reply(_conn);
     if (r.reply_type != AMQP_RESPONSE_NORMAL) {
-        throw std::runtime_error("Error conuming");
+        throw std::runtime_error("Error consuming queue " + std::string(_qName));
     }
     fprintf(stderr, "RabbitMQ Init OK %s\n", _qName);
 }