|
@@ -229,12 +229,14 @@ void PostgreSQL::eraseNetwork(const uint64_t networkId)
|
|
tmp.first["objtype"] = "_delete_network";
|
|
tmp.first["objtype"] = "_delete_network";
|
|
tmp.second = true;
|
|
tmp.second = true;
|
|
_commitQueue.post(tmp);
|
|
_commitQueue.post(tmp);
|
|
|
|
+ nlohmann::json nullJson;
|
|
|
|
+ _networkChanged(tmp.first, nullJson, true);
|
|
}
|
|
}
|
|
|
|
|
|
void PostgreSQL::eraseMember(const uint64_t networkId, const uint64_t memberId)
|
|
void PostgreSQL::eraseMember(const uint64_t networkId, const uint64_t memberId)
|
|
{
|
|
{
|
|
char tmp2[24];
|
|
char tmp2[24];
|
|
- std::pair<nlohmann::json,bool> tmp;
|
|
|
|
|
|
+ std::pair<nlohmann::json,bool> tmp, nw;
|
|
Utils::hex(networkId, tmp2);
|
|
Utils::hex(networkId, tmp2);
|
|
tmp.first["nwid"] = tmp2;
|
|
tmp.first["nwid"] = tmp2;
|
|
Utils::hex(memberId, tmp2);
|
|
Utils::hex(memberId, tmp2);
|
|
@@ -242,6 +244,8 @@ void PostgreSQL::eraseMember(const uint64_t networkId, const uint64_t memberId)
|
|
tmp.first["objtype"] = "_delete_member";
|
|
tmp.first["objtype"] = "_delete_member";
|
|
tmp.second = true;
|
|
tmp.second = true;
|
|
_commitQueue.post(tmp);
|
|
_commitQueue.post(tmp);
|
|
|
|
+ nlohmann::json nullJson;
|
|
|
|
+ _memberChanged(tmp.first, nullJson, true);
|
|
}
|
|
}
|
|
|
|
|
|
void PostgreSQL::nodeIsOnline(const uint64_t networkId, const uint64_t memberId, const InetAddress &physicalAddress)
|
|
void PostgreSQL::nodeIsOnline(const uint64_t networkId, const uint64_t memberId, const InetAddress &physicalAddress)
|
|
@@ -630,7 +634,7 @@ void PostgreSQL::heartbeat()
|
|
};
|
|
};
|
|
|
|
|
|
PGresult *res = PQexecParams(conn,
|
|
PGresult *res = PQexecParams(conn,
|
|
- "INSERT INTO ztc_controller (id, cluster_host, last_alive, public_identity, v_major, v_minor, v_rev, v_build, host_port,use_redis) "
|
|
|
|
|
|
+ "INSERT INTO ztc_controller (id, cluster_host, last_alive, public_identity, v_major, v_minor, v_rev, v_build, host_port, use_redis) "
|
|
"VALUES ($1, $2, TO_TIMESTAMP($3::double precision/1000), $4, $5, $6, $7, $8, $9, $10) "
|
|
"VALUES ($1, $2, TO_TIMESTAMP($3::double precision/1000), $4, $5, $6, $7, $8, $9, $10) "
|
|
"ON CONFLICT (id) DO UPDATE SET cluster_host = EXCLUDED.cluster_host, last_alive = EXCLUDED.last_alive, "
|
|
"ON CONFLICT (id) DO UPDATE SET cluster_host = EXCLUDED.cluster_host, last_alive = EXCLUDED.last_alive, "
|
|
"public_identity = EXCLUDED.public_identity, v_major = EXCLUDED.v_major, v_minor = EXCLUDED.v_minor, "
|
|
"public_identity = EXCLUDED.public_identity, v_major = EXCLUDED.v_major, v_minor = EXCLUDED.v_minor, "
|
|
@@ -1401,6 +1405,15 @@ void PostgreSQL::commitThread()
|
|
}
|
|
}
|
|
|
|
|
|
void PostgreSQL::onlineNotificationThread()
|
|
void PostgreSQL::onlineNotificationThread()
|
|
|
|
+{
|
|
|
|
+ if (_rc != NULL) {
|
|
|
|
+ onlineNotification_Redis();
|
|
|
|
+ } else {
|
|
|
|
+ onlineNotification_Postgres();
|
|
|
|
+ }
|
|
|
|
+}
|
|
|
|
+
|
|
|
|
+void PostgreSQL::onlineNotification_Postgres()
|
|
{
|
|
{
|
|
PGconn *conn = getPgConn();
|
|
PGconn *conn = getPgConn();
|
|
if (PQstatus(conn) == CONNECTION_BAD) {
|
|
if (PQstatus(conn) == CONNECTION_BAD) {
|
|
@@ -1410,9 +1423,7 @@ void PostgreSQL::onlineNotificationThread()
|
|
}
|
|
}
|
|
_connected = 1;
|
|
_connected = 1;
|
|
|
|
|
|
- //int64_t lastUpdatedNetworkStatus = 0;
|
|
|
|
- std::unordered_map< std::pair<uint64_t,uint64_t>,int64_t,_PairHasher > lastOnlineCumulative;
|
|
|
|
-
|
|
|
|
|
|
+ nlohmann::json jtmp1, jtmp2;
|
|
while (_run == 1) {
|
|
while (_run == 1) {
|
|
if (PQstatus(conn) != CONNECTION_OK) {
|
|
if (PQstatus(conn) != CONNECTION_OK) {
|
|
fprintf(stderr, "ERROR: Online Notification thread lost connection to Postgres.");
|
|
fprintf(stderr, "ERROR: Online Notification thread lost connection to Postgres.");
|
|
@@ -1420,9 +1431,6 @@ void PostgreSQL::onlineNotificationThread()
|
|
exit(5);
|
|
exit(5);
|
|
}
|
|
}
|
|
|
|
|
|
- // map used to send notifications to front end
|
|
|
|
- std::unordered_map<std::string, std::vector<std::string>> updateMap;
|
|
|
|
-
|
|
|
|
std::unordered_map< std::pair<uint64_t,uint64_t>,std::pair<int64_t,InetAddress>,_PairHasher > lastOnline;
|
|
std::unordered_map< std::pair<uint64_t,uint64_t>,std::pair<int64_t,InetAddress>,_PairHasher > lastOnline;
|
|
{
|
|
{
|
|
std::lock_guard<std::mutex> l(_lastOnline_l);
|
|
std::lock_guard<std::mutex> l(_lastOnline_l);
|
|
@@ -1443,20 +1451,13 @@ void PostgreSQL::onlineNotificationThread()
|
|
OSUtils::ztsnprintf(nwidTmp,sizeof(nwidTmp), "%.16llx", nwid_i);
|
|
OSUtils::ztsnprintf(nwidTmp,sizeof(nwidTmp), "%.16llx", nwid_i);
|
|
OSUtils::ztsnprintf(memTmp,sizeof(memTmp), "%.10llx", i->first.second);
|
|
OSUtils::ztsnprintf(memTmp,sizeof(memTmp), "%.10llx", i->first.second);
|
|
|
|
|
|
- auto found = _networks.find(nwid_i);
|
|
|
|
- if (found == _networks.end()) {
|
|
|
|
- continue; // skip members trying to join non-existant networks
|
|
|
|
|
|
+ if(!get(nwid_i, jtmp1, i->first.second, jtmp2)) {
|
|
|
|
+ continue; // skip non existent networks/members
|
|
}
|
|
}
|
|
|
|
|
|
std::string networkId(nwidTmp);
|
|
std::string networkId(nwidTmp);
|
|
std::string memberId(memTmp);
|
|
std::string memberId(memTmp);
|
|
|
|
|
|
- std::vector<std::string> &members = updateMap[networkId];
|
|
|
|
- members.push_back(memberId);
|
|
|
|
-
|
|
|
|
- lastOnlineCumulative[i->first] = i->second.first;
|
|
|
|
-
|
|
|
|
-
|
|
|
|
const char *qvals[2] = {
|
|
const char *qvals[2] = {
|
|
networkId.c_str(),
|
|
networkId.c_str(),
|
|
memberId.c_str()
|
|
memberId.c_str()
|
|
@@ -1526,6 +1527,107 @@ void PostgreSQL::onlineNotificationThread()
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+void PostgreSQL::onlineNotification_Redis()
|
|
|
|
+{
|
|
|
|
+ _connected = 1;
|
|
|
|
+
|
|
|
|
+ char buf[11] = {0};
|
|
|
|
+ std::string controllerId = std::string(_myAddress.toString(buf));
|
|
|
|
+
|
|
|
|
+ while (_run == 1) {
|
|
|
|
+ std::unordered_map< std::pair<uint64_t,uint64_t>,std::pair<int64_t,InetAddress>,_PairHasher > lastOnline;
|
|
|
|
+ {
|
|
|
|
+ std::lock_guard<std::mutex> l(_lastOnline_l);
|
|
|
|
+ lastOnline.swap(_lastOnline);
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ if (_rc->clusterMode) {
|
|
|
|
+ auto tx = _cluster->redis(controllerId).transaction(true);
|
|
|
|
+ _doRedisUpdate(tx, controllerId, lastOnline);
|
|
|
|
+ } else {
|
|
|
|
+ auto tx = _redis->transaction(true);
|
|
|
|
+ _doRedisUpdate(tx, controllerId, lastOnline);
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ std::this_thread::sleep_for(std::chrono::milliseconds(10));
|
|
|
|
+ }
|
|
|
|
+}
|
|
|
|
+
|
|
|
|
+void PostgreSQL::_doRedisUpdate(sw::redis::Transaction &tx, std::string &controllerId,
|
|
|
|
+ std::unordered_map< std::pair<uint64_t,uint64_t>,std::pair<int64_t,InetAddress>,_PairHasher > &lastOnline)
|
|
|
|
+
|
|
|
|
+{
|
|
|
|
+ nlohmann::json jtmp1, jtmp2;
|
|
|
|
+ for (auto i=lastOnline.begin(); i != lastOnline.end(); ++i) {
|
|
|
|
+ uint64_t nwid_i = i->first.first;
|
|
|
|
+ uint64_t memberid_i = i->first.second;
|
|
|
|
+ char nwidTmp[64];
|
|
|
|
+ char memTmp[64];
|
|
|
|
+ char ipTmp[64];
|
|
|
|
+ OSUtils::ztsnprintf(nwidTmp,sizeof(nwidTmp), "%.16llx", nwid_i);
|
|
|
|
+ OSUtils::ztsnprintf(memTmp,sizeof(memTmp), "%.10llx", memberid_i);
|
|
|
|
+
|
|
|
|
+ if (!get(nwid_i, jtmp1, memberid_i, jtmp2)){
|
|
|
|
+ continue; // skip non existent members/networks
|
|
|
|
+ }
|
|
|
|
+ auto found = _networks.find(nwid_i);
|
|
|
|
+ if (found == _networks.end()) {
|
|
|
|
+ continue; // skip members trying to join non-existant networks
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ std::string networkId(nwidTmp);
|
|
|
|
+ std::string memberId(memTmp);
|
|
|
|
+
|
|
|
|
+ int64_t ts = i->second.first;
|
|
|
|
+ std::string ipAddr = i->second.second.toIpString(ipTmp);
|
|
|
|
+ std::string timestamp = std::to_string(ts);
|
|
|
|
+
|
|
|
|
+ std::unordered_map<std::string, std::string> record = {
|
|
|
|
+ {"id", memberId},
|
|
|
|
+ {"address", ipAddr},
|
|
|
|
+ {"last_updated", std::to_string(ts)}
|
|
|
|
+ };
|
|
|
|
+ tx.zadd("nodes-online:{"+controllerId+"}", memberId, ts)
|
|
|
|
+ .zadd("network-nodes-online:{"+controllerId+"}:"+networkId, memberId, ts)
|
|
|
|
+ .sadd("network-nodes-all:{"+controllerId+"}:"+networkId, memberId)
|
|
|
|
+ .hmset("network:{"+controllerId+"}:"+networkId+":"+memberId, record.begin(), record.end());
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ tx.exec();
|
|
|
|
+
|
|
|
|
+ // expire records from all-nodes and network-nodes member list
|
|
|
|
+ uint64_t expireOld = OSUtils::now() - 300000;
|
|
|
|
+
|
|
|
|
+ auto cursor = 0LL;
|
|
|
|
+ std::unordered_set<std::string> keys;
|
|
|
|
+ // can't scan for keys in a transaction, so we need to fall back to _cluster or _redis
|
|
|
|
+ // to get all network-members keys
|
|
|
|
+ if(_rc->clusterMode) {
|
|
|
|
+ auto r = _cluster->redis(controllerId);
|
|
|
|
+ while(true) {
|
|
|
|
+ cursor = r.scan(cursor, "network-nodes-online:{"+controllerId+"}:*", INT_MAX, std::inserter(keys, keys.begin()));
|
|
|
|
+ if (cursor == 0) {
|
|
|
|
+ break;
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ } else {
|
|
|
|
+ while(true) {
|
|
|
|
+ cursor = _redis->scan(cursor, "network-nodes-online:"+controllerId+":*", INT_MAX, std::inserter(keys, keys.begin()));
|
|
|
|
+ if (cursor == 0) {
|
|
|
|
+ break;
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ tx.zremrangebyscore("nodes-online:{"+controllerId+"}", sw::redis::RightBoundedInterval<double>(expireOld, sw::redis::BoundType::LEFT_OPEN));
|
|
|
|
+
|
|
|
|
+ for(const auto &k : keys) {
|
|
|
|
+ tx.zremrangebyscore(k, sw::redis::RightBoundedInterval<double>(expireOld, sw::redis::BoundType::LEFT_OPEN));
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ tx.exec();
|
|
|
|
+}
|
|
|
|
+
|
|
PGconn *PostgreSQL::getPgConn(OverrideMode m)
|
|
PGconn *PostgreSQL::getPgConn(OverrideMode m)
|
|
{
|
|
{
|
|
if (m == ALLOW_PGBOUNCER_OVERRIDE) {
|
|
if (m == ALLOW_PGBOUNCER_OVERRIDE) {
|