|
@@ -271,7 +271,7 @@ void PostgreSQL::initializeNetworks(PGconn *conn)
|
|
|
exit(1);
|
|
|
}
|
|
|
|
|
|
- std::string setKey = "networks:{" + std::string(_myAddressStr.c_str()) + "}";
|
|
|
+ std::string setKey = "networks:{" + _myAddressStr + "}";
|
|
|
|
|
|
if (_rc != NULL) {
|
|
|
try {
|
|
@@ -286,6 +286,8 @@ void PostgreSQL::initializeNetworks(PGconn *conn)
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ std::unordered_set<std::string> networkSet;
|
|
|
+
|
|
|
const char *params[1] = {
|
|
|
_myAddressStr.c_str()
|
|
|
};
|
|
@@ -319,13 +321,7 @@ void PostgreSQL::initializeNetworks(PGconn *conn)
|
|
|
};
|
|
|
std::string nwid = PQgetvalue(res, i, 0);
|
|
|
|
|
|
- if (_rc != NULL) {
|
|
|
- if (_rc->clusterMode) {
|
|
|
- _cluster->sadd(setKey, nwid);
|
|
|
- } else {
|
|
|
- _redis->sadd(setKey, nwid);
|
|
|
- }
|
|
|
- }
|
|
|
+ networkSet.insert(nwid);
|
|
|
|
|
|
config["id"] = nwid;
|
|
|
config["nwid"] = nwid;
|
|
@@ -441,6 +437,16 @@ void PostgreSQL::initializeNetworks(PGconn *conn)
|
|
|
|
|
|
PQclear(res);
|
|
|
|
|
|
+ if (_rc && _rc->clusterMode) {
|
|
|
+ auto tx = _cluster->transaction(_myAddressStr, true);
|
|
|
+ tx.sadd(setKey, networkSet.begin(), networkSet.end());
|
|
|
+ tx.exec();
|
|
|
+ } else if (_rc && !_rc->clusterMode) {
|
|
|
+ auto tx = _redis->transaction(true);
|
|
|
+ tx.sadd(setKey, networkSet.begin(), networkSet.end());
|
|
|
+ tx.exec();
|
|
|
+ }
|
|
|
+
|
|
|
if (++this->_ready == 2) {
|
|
|
if (_waitNoticePrinted) {
|
|
|
fprintf(stderr,"[%s] NOTICE: %.10llx controller PostgreSQL data download complete." ZT_EOL_S,_timestr(),(unsigned long long)_myAddress.toInt());
|
|
@@ -463,30 +469,40 @@ void PostgreSQL::initializeMembers(PGconn *conn)
|
|
|
fprintf(stderr, "Bad Database Connection: %s", PQerrorMessage(conn));
|
|
|
exit(1);
|
|
|
}
|
|
|
- std::string setKeyBase = "network-nodes-all:{" + std::string(_myAddressStr.c_str()) + "}:";
|
|
|
+ std::string setKeyBase = "network-nodes-all:{" + _myAddressStr + "}:";
|
|
|
+
|
|
|
if (_rc != NULL) {
|
|
|
std::lock_guard<std::mutex> l(_networks_l);
|
|
|
+ std::unordered_set<std::string> deletes;
|
|
|
for ( auto it : _networks) {
|
|
|
uint64_t nwid_i = it.first;
|
|
|
char nwidTmp[64] = {0};
|
|
|
OSUtils::ztsnprintf(nwidTmp, sizeof(nwidTmp), "%.16llx", nwid_i);
|
|
|
std::string nwid(nwidTmp);
|
|
|
std::string key = setKeyBase + nwid;
|
|
|
- if (_rc->clusterMode) {
|
|
|
- try {
|
|
|
- _cluster->del(key);
|
|
|
- } catch (...) {}
|
|
|
- } else {
|
|
|
- try {
|
|
|
- _redis->del(key);
|
|
|
- } catch (...) {}
|
|
|
+ deletes.insert(key);
|
|
|
+ }
|
|
|
+
|
|
|
+ if (_rc->clusterMode) {
|
|
|
+ auto tx = _cluster->transaction(_myAddressStr, true);
|
|
|
+ for (std::string k : deletes) {
|
|
|
+ tx.del(k);
|
|
|
+ }
|
|
|
+ tx.exec();
|
|
|
+ } else {
|
|
|
+ auto tx = _redis->transaction(true);
|
|
|
+ for (std::string k : deletes) {
|
|
|
+ tx.del(k);
|
|
|
}
|
|
|
+ tx.exec();
|
|
|
}
|
|
|
}
|
|
|
|
|
|
const char *params[1] = {
|
|
|
_myAddressStr.c_str()
|
|
|
};
|
|
|
+
|
|
|
+ std::unordered_map<std::string, std::string> networkMembers;
|
|
|
|
|
|
fprintf(stderr, "Initializing Members...\n");
|
|
|
PGresult *res = PQexecParams(conn,
|
|
@@ -520,13 +536,7 @@ void PostgreSQL::initializeMembers(PGconn *conn)
|
|
|
std::string memberId(PQgetvalue(res, i, 0));
|
|
|
std::string networkId(PQgetvalue(res, i, 1));
|
|
|
|
|
|
- if (_rc != NULL) {
|
|
|
- if (_rc->clusterMode) {
|
|
|
- _cluster->sadd(setKeyBase + networkId, memberId);
|
|
|
- } else {
|
|
|
- _redis->sadd(setKeyBase + networkId, memberId);
|
|
|
- }
|
|
|
- }
|
|
|
+ networkMembers.insert(std::pair<std::string, std::string>(setKeyBase+networkId, memberId));
|
|
|
|
|
|
std::string ctime = PQgetvalue(res, i, 5);
|
|
|
config["id"] = memberId;
|
|
@@ -628,19 +638,26 @@ void PostgreSQL::initializeMembers(PGconn *conn)
|
|
|
config["ipAssignments"].push_back(ipaddr);
|
|
|
}
|
|
|
|
|
|
- if (_rc != NULL) {
|
|
|
- if (_rc->clusterMode) {
|
|
|
- _cluster->sadd(setKeyBase + networkId, memberId);
|
|
|
- } else {
|
|
|
- _redis->sadd(setKeyBase + networkId, memberId);
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
_memberChanged(empty, config, false);
|
|
|
}
|
|
|
|
|
|
PQclear(res);
|
|
|
|
|
|
+ if (_rc != NULL) {
|
|
|
+ if (_rc->clusterMode) {
|
|
|
+ auto tx = _cluster->transaction(_myAddressStr, true);
|
|
|
+ for (auto it : networkMembers) {
|
|
|
+ tx.sadd(it.first, it.second);
|
|
|
+ }
|
|
|
+ tx.exec();
|
|
|
+ } else {
|
|
|
+ auto tx = _redis->transaction(true);
|
|
|
+ for (auto it : networkMembers) {
|
|
|
+ tx.sadd(it.first, it.second);
|
|
|
+ }
|
|
|
+ tx.exec();
|
|
|
+ }
|
|
|
+ }
|
|
|
if (++this->_ready == 2) {
|
|
|
if (_waitNoticePrinted) {
|
|
|
fprintf(stderr,"[%s] NOTICE: %.10llx controller PostgreSQL data download complete." ZT_EOL_S,_timestr(),(unsigned long long)_myAddress.toInt());
|