|
@@ -1558,6 +1558,15 @@ void PostgreSQL::onlineNotificationThread()
|
|
|
{
|
|
{
|
|
|
waitForReady();
|
|
waitForReady();
|
|
|
|
|
|
|
|
|
|
+ if (_rc != NULL) {
|
|
|
|
|
+ onlineNotification_Redis();
|
|
|
|
|
+ } else {
|
|
|
|
|
+ onlineNotification_Postgres();
|
|
|
|
|
+ }
|
|
|
|
|
+}
|
|
|
|
|
+
|
|
|
|
|
+void PostgreSQL::onlineNotification_Postgres()
|
|
|
|
|
+{
|
|
|
PGconn *conn = getPgConn();
|
|
PGconn *conn = getPgConn();
|
|
|
if (PQstatus(conn) == CONNECTION_BAD) {
|
|
if (PQstatus(conn) == CONNECTION_BAD) {
|
|
|
fprintf(stderr, "Connection to database failed: %s\n", PQerrorMessage(conn));
|
|
fprintf(stderr, "Connection to database failed: %s\n", PQerrorMessage(conn));
|
|
@@ -1566,64 +1575,19 @@ void PostgreSQL::onlineNotificationThread()
|
|
|
}
|
|
}
|
|
|
_connected = 1;
|
|
_connected = 1;
|
|
|
|
|
|
|
|
- while (_run == 1) {
|
|
|
|
|
- std::unordered_map< std::pair<uint64_t,uint64_t>,std::pair<int64_t,InetAddress>,_PairHasher > lastOnline;
|
|
|
|
|
- {
|
|
|
|
|
- std::lock_guard<std::mutex> l(_lastOnline_l);
|
|
|
|
|
- lastOnline.swap(_lastOnline);
|
|
|
|
|
- }
|
|
|
|
|
-
|
|
|
|
|
- onlineNotification_Postgres(conn, lastOnline);
|
|
|
|
|
-
|
|
|
|
|
- try {
|
|
|
|
|
- if (!lastOnline.empty()) {
|
|
|
|
|
- if (_rc->clusterMode) {
|
|
|
|
|
- auto tx = _cluster->transaction(_myAddressStr, true);
|
|
|
|
|
- _doRedisUpdate(tx, _myAddressStr, lastOnline);
|
|
|
|
|
- } else {
|
|
|
|
|
- auto tx = _redis->transaction(true);
|
|
|
|
|
- _doRedisUpdate(tx, _myAddressStr, lastOnline);
|
|
|
|
|
- }
|
|
|
|
|
- }
|
|
|
|
|
- } catch (sw::redis::Error &e) {
|
|
|
|
|
-#ifdef ZT_TRACE
|
|
|
|
|
- fprintf(stderr, "Error in online notification thread (redis): %s\n", e.what());
|
|
|
|
|
-#endif
|
|
|
|
|
- }
|
|
|
|
|
- std::this_thread::sleep_for(std::chrono::milliseconds(100));
|
|
|
|
|
- }
|
|
|
|
|
-
|
|
|
|
|
- fprintf(stderr, "%s: Fell out of run loop in onlineNotificationThread\n", _myAddressStr.c_str());
|
|
|
|
|
- PQfinish(conn);
|
|
|
|
|
- if (_run == 1) {
|
|
|
|
|
- fprintf(stderr, "ERROR: %s onlineNotificationThread should still be running! Exiting Controller.\n", _myAddressStr.c_str());
|
|
|
|
|
- exit(6);
|
|
|
|
|
- }
|
|
|
|
|
-
|
|
|
|
|
- // if (_rc != NULL) {
|
|
|
|
|
- // onlineNotification_Redis();
|
|
|
|
|
- // } else {
|
|
|
|
|
- // onlineNotification_Postgres();
|
|
|
|
|
- // }
|
|
|
|
|
-}
|
|
|
|
|
-
|
|
|
|
|
-void PostgreSQL::onlineNotification_Postgres(PGconn *conn, std::unordered_map< std::pair<uint64_t,uint64_t>,std::pair<int64_t,InetAddress>,_PairHasher > &lastOnline)
|
|
|
|
|
-{
|
|
|
|
|
-
|
|
|
|
|
-
|
|
|
|
|
nlohmann::json jtmp1, jtmp2;
|
|
nlohmann::json jtmp1, jtmp2;
|
|
|
- // while (_run == 1) {
|
|
|
|
|
|
|
+ while (_run == 1) {
|
|
|
if (PQstatus(conn) != CONNECTION_OK) {
|
|
if (PQstatus(conn) != CONNECTION_OK) {
|
|
|
fprintf(stderr, "ERROR: Online Notification thread lost connection to Postgres.");
|
|
fprintf(stderr, "ERROR: Online Notification thread lost connection to Postgres.");
|
|
|
PQfinish(conn);
|
|
PQfinish(conn);
|
|
|
exit(5);
|
|
exit(5);
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
- // std::unordered_map< std::pair<uint64_t,uint64_t>,std::pair<int64_t,InetAddress>,_PairHasher > lastOnline;
|
|
|
|
|
- // {
|
|
|
|
|
- // std::lock_guard<std::mutex> l(_lastOnline_l);
|
|
|
|
|
- // lastOnline.swap(_lastOnline);
|
|
|
|
|
- // }
|
|
|
|
|
|
|
+ std::unordered_map< std::pair<uint64_t,uint64_t>,std::pair<int64_t,InetAddress>,_PairHasher > lastOnline;
|
|
|
|
|
+ {
|
|
|
|
|
+ std::lock_guard<std::mutex> l(_lastOnline_l);
|
|
|
|
|
+ lastOnline.swap(_lastOnline);
|
|
|
|
|
+ }
|
|
|
|
|
|
|
|
PGresult *res = NULL;
|
|
PGresult *res = NULL;
|
|
|
|
|
|
|
@@ -1705,14 +1669,14 @@ void PostgreSQL::onlineNotification_Postgres(PGconn *conn, std::unordered_map< s
|
|
|
PQclear(res);
|
|
PQclear(res);
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
- // std::this_thread::sleep_for(std::chrono::milliseconds(100));
|
|
|
|
|
- // }
|
|
|
|
|
- // fprintf(stderr, "%s: Fell out of run loop in onlineNotificationThread\n", _myAddressStr.c_str());
|
|
|
|
|
- // PQfinish(conn);
|
|
|
|
|
- // if (_run == 1) {
|
|
|
|
|
- // fprintf(stderr, "ERROR: %s onlineNotificationThread should still be running! Exiting Controller.\n", _myAddressStr.c_str());
|
|
|
|
|
- // exit(6);
|
|
|
|
|
- // }
|
|
|
|
|
|
|
+ std::this_thread::sleep_for(std::chrono::milliseconds(100));
|
|
|
|
|
+ }
|
|
|
|
|
+ fprintf(stderr, "%s: Fell out of run loop in onlineNotificationThread\n", _myAddressStr.c_str());
|
|
|
|
|
+ PQfinish(conn);
|
|
|
|
|
+ if (_run == 1) {
|
|
|
|
|
+ fprintf(stderr, "ERROR: %s onlineNotificationThread should still be running! Exiting Controller.\n", _myAddressStr.c_str());
|
|
|
|
|
+ exit(6);
|
|
|
|
|
+ }
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
void PostgreSQL::onlineNotification_Redis()
|
|
void PostgreSQL::onlineNotification_Redis()
|