Browse Source

Temporary online notification thread hack

Updates both Redis and Postgres
Grant Limberg 5 years ago
parent
commit
9794e31a64
2 changed files with 60 additions and 24 deletions
  1. 59 23
      controller/PostgreSQL.cpp
  2. 1 1
      controller/PostgreSQL.hpp

+ 59 - 23
controller/PostgreSQL.cpp

@@ -1554,15 +1554,6 @@ void PostgreSQL::onlineNotificationThread()
 {
 {
 	waitForReady();
 	waitForReady();
 
 
-	if (_rc != NULL) {
-		onlineNotification_Redis();
-	} else {
-		onlineNotification_Postgres();
-	}
-}
-
-void PostgreSQL::onlineNotification_Postgres()
-{
 	PGconn *conn = getPgConn();
 	PGconn *conn = getPgConn();
 	if (PQstatus(conn) == CONNECTION_BAD) {
 	if (PQstatus(conn) == CONNECTION_BAD) {
 		fprintf(stderr, "Connection to database failed: %s\n", PQerrorMessage(conn));
 		fprintf(stderr, "Connection to database failed: %s\n", PQerrorMessage(conn));
@@ -1571,19 +1562,64 @@ void PostgreSQL::onlineNotification_Postgres()
 	}
 	}
 	_connected = 1;
 	_connected = 1;
 
 
-	nlohmann::json jtmp1, jtmp2;
 	while (_run == 1) {
 	while (_run == 1) {
+		std::unordered_map< std::pair<uint64_t,uint64_t>,std::pair<int64_t,InetAddress>,_PairHasher > lastOnline;
+		{
+			std::lock_guard<std::mutex> l(_lastOnline_l);
+			lastOnline.swap(_lastOnline);
+		}
+
+		onlineNotification_Postgres(conn, lastOnline);
+
+		try {
+			if (!lastOnline.empty()) {
+				if (_rc->clusterMode) {
+					auto tx = _cluster->transaction(_myAddressStr, true);
+					_doRedisUpdate(tx, _myAddressStr, lastOnline);
+				} else {
+					auto tx = _redis->transaction(true);
+					_doRedisUpdate(tx, _myAddressStr, lastOnline);
+				}
+			}
+		} catch (sw::redis::Error &e) {
+#ifdef ZT_TRACE
+			fprintf(stderr, "Error in online notification thread (redis): %s\n", e.what());
+#endif
+		}
+		std::this_thread::sleep_for(std::chrono::milliseconds(10));
+	}
+
+	fprintf(stderr, "%s: Fell out of run loop in onlineNotificationThread\n", _myAddressStr.c_str());
+	PQfinish(conn);
+	if (_run == 1) {
+		fprintf(stderr, "ERROR: %s onlineNotificationThread should still be running! Exiting Controller.\n", _myAddressStr.c_str());
+		exit(6);
+	}
+
+	// if (_rc != NULL) {
+	// 	onlineNotification_Redis();
+	// } else {
+	// 	onlineNotification_Postgres();
+	// }
+}
+
+void PostgreSQL::onlineNotification_Postgres(PGconn *conn, std::unordered_map< std::pair<uint64_t,uint64_t>,std::pair<int64_t,InetAddress>,_PairHasher > &lastOnline)
+{
+	
+
+	nlohmann::json jtmp1, jtmp2;
+	// while (_run == 1) {
 		if (PQstatus(conn) != CONNECTION_OK) {
 		if (PQstatus(conn) != CONNECTION_OK) {
 			fprintf(stderr, "ERROR: Online Notification thread lost connection to Postgres.");
 			fprintf(stderr, "ERROR: Online Notification thread lost connection to Postgres.");
 			PQfinish(conn);
 			PQfinish(conn);
 			exit(5);
 			exit(5);
 		}
 		}
 
 
-		std::unordered_map< std::pair<uint64_t,uint64_t>,std::pair<int64_t,InetAddress>,_PairHasher > lastOnline;
-		{
-			std::lock_guard<std::mutex> l(_lastOnline_l);
-			lastOnline.swap(_lastOnline);
-		}
+		// std::unordered_map< std::pair<uint64_t,uint64_t>,std::pair<int64_t,InetAddress>,_PairHasher > lastOnline;
+		// {
+		// 	std::lock_guard<std::mutex> l(_lastOnline_l);
+		// 	lastOnline.swap(_lastOnline);
+		// }
 
 
 		PGresult *res = NULL;
 		PGresult *res = NULL;
 
 
@@ -1665,14 +1701,14 @@ void PostgreSQL::onlineNotification_Postgres()
 			PQclear(res);
 			PQclear(res);
 		}
 		}
 
 
-		std::this_thread::sleep_for(std::chrono::milliseconds(10));
-	}
-	fprintf(stderr, "%s: Fell out of run loop in onlineNotificationThread\n", _myAddressStr.c_str());
-	PQfinish(conn);
-	if (_run == 1) {
-		fprintf(stderr, "ERROR: %s onlineNotificationThread should still be running! Exiting Controller.\n", _myAddressStr.c_str());
-		exit(6);
-	}
+	// 	std::this_thread::sleep_for(std::chrono::milliseconds(10));
+	// }
+	// fprintf(stderr, "%s: Fell out of run loop in onlineNotificationThread\n", _myAddressStr.c_str());
+	// PQfinish(conn);
+	// if (_run == 1) {
+	// 	fprintf(stderr, "ERROR: %s onlineNotificationThread should still be running! Exiting Controller.\n", _myAddressStr.c_str());
+	// 	exit(6);
+	// }
 }
 }
 
 
 void PostgreSQL::onlineNotification_Redis()
 void PostgreSQL::onlineNotification_Redis()

+ 1 - 1
controller/PostgreSQL.hpp

@@ -70,7 +70,7 @@ private:
 
 
 	void commitThread();
 	void commitThread();
 	void onlineNotificationThread();
 	void onlineNotificationThread();
-	void onlineNotification_Postgres();
+	void onlineNotification_Postgres(PGconn *conn, std::unordered_map< std::pair<uint64_t,uint64_t>,std::pair<int64_t,InetAddress>,_PairHasher > &lastOnline);
 	void onlineNotification_Redis();
 	void onlineNotification_Redis();
 	void _doRedisUpdate(sw::redis::Transaction &tx, std::string &controllerId, 
 	void _doRedisUpdate(sw::redis::Transaction &tx, std::string &controllerId, 
 		std::unordered_map< std::pair<uint64_t,uint64_t>,std::pair<int64_t,InetAddress>,_PairHasher > &lastOnline);
 		std::unordered_map< std::pair<uint64_t,uint64_t>,std::pair<int64_t,InetAddress>,_PairHasher > &lastOnline);