|
@@ -1,10 +1,10 @@
|
|
|
/*
|
|
|
- * Copyright (c)2013-2020 ZeroTier, Inc.
|
|
|
+ * Copyright (c)2019 ZeroTier, Inc.
|
|
|
*
|
|
|
* Use of this software is governed by the Business Source License included
|
|
|
* in the LICENSE.TXT file in the project's root directory.
|
|
|
*
|
|
|
- * Change Date: 2024-01-01
|
|
|
+ * Change Date: 2023-01-01
|
|
|
*
|
|
|
* On the date above, in accordance with the Business Source License, use
|
|
|
* of this software will be governed by version 2.0 of the Apache License.
|
|
@@ -17,13 +17,13 @@
|
|
|
|
|
|
#include "../core/Constants.hpp"
|
|
|
#include "EmbeddedNetworkController.hpp"
|
|
|
-#include "RabbitMQ.hpp"
|
|
|
-#include "../version.h"
|
|
|
+#include "version.h"
|
|
|
+#include "Redis.hpp"
|
|
|
|
|
|
#include <libpq-fe.h>
|
|
|
#include <sstream>
|
|
|
-#include <amqp.h>
|
|
|
-#include <amqp_tcp_socket.h>
|
|
|
+#include <climits>
|
|
|
+
|
|
|
|
|
|
using json = nlohmann::json;
|
|
|
|
|
@@ -69,7 +69,11 @@ std::string join(const std::vector<std::string> &elements, const char * const se
|
|
|
|
|
|
using namespace ZeroTier;
|
|
|
|
|
|
-PostgreSQL::PostgreSQL(const Identity &myId, const char *path, int listenPort, MQConfig *mqc)
|
|
|
+using Attrs = std::vector<std::pair<std::string, std::string>>;
|
|
|
+using Item = std::pair<std::string, Attrs>;
|
|
|
+using ItemStream = std::vector<Item>;
|
|
|
+
|
|
|
+PostgreSQL::PostgreSQL(const Identity &myId, const char *path, int listenPort, RedisConfig *rc)
|
|
|
: DB()
|
|
|
, _myId(myId)
|
|
|
, _myAddress(myId.address())
|
|
@@ -78,7 +82,9 @@ PostgreSQL::PostgreSQL(const Identity &myId, const char *path, int listenPort, M
|
|
|
, _run(1)
|
|
|
, _waitNoticePrinted(false)
|
|
|
, _listenPort(listenPort)
|
|
|
- , _mqc(mqc)
|
|
|
+ , _rc(rc)
|
|
|
+ , _redis(NULL)
|
|
|
+ , _cluster(NULL)
|
|
|
{
|
|
|
char myAddress[64];
|
|
|
_myAddressStr = myId.address().toString(myAddress);
|
|
@@ -108,13 +114,37 @@ PostgreSQL::PostgreSQL(const Identity &myId, const char *path, int listenPort, M
|
|
|
fprintf(stderr, "Central database schema version too low. This controller version requires a minimum schema version of %d. Please upgrade your Central instance", DB_MINIMUM_VERSION);
|
|
|
exit(1);
|
|
|
}
|
|
|
-
|
|
|
PQclear(res);
|
|
|
res = NULL;
|
|
|
+
|
|
|
+ if (_rc != NULL) {
|
|
|
+ sw::redis::ConnectionOptions opts;
|
|
|
+ sw::redis::ConnectionPoolOptions poolOpts;
|
|
|
+ opts.host = _rc->hostname;
|
|
|
+ opts.port = _rc->port;
|
|
|
+ opts.password = _rc->password;
|
|
|
+ opts.db = 0;
|
|
|
+ poolOpts.size = 10;
|
|
|
+ if (_rc->clusterMode) {
|
|
|
+ fprintf(stderr, "Using Redis in Cluster Mode\n");
|
|
|
+ _cluster = std::make_shared<sw::redis::RedisCluster>(opts, poolOpts);
|
|
|
+ } else {
|
|
|
+ fprintf(stderr, "Using Redis in Standalone Mode\n");
|
|
|
+ _redis = std::make_shared<sw::redis::Redis>(opts, poolOpts);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ _readyLock.lock();
|
|
|
+
|
|
|
+ fprintf(stderr, "[%s] NOTICE: %.10llx controller PostgreSQL waiting for initial data download..." ZT_EOL_S, ::_timestr(), (unsigned long long)_myAddress.toInt());
|
|
|
+ _waitNoticePrinted = true;
|
|
|
+
|
|
|
+ initializeNetworks(conn);
|
|
|
+ initializeMembers(conn);
|
|
|
+
|
|
|
PQfinish(conn);
|
|
|
conn = NULL;
|
|
|
|
|
|
- _readyLock.lock();
|
|
|
_heartbeatThread = std::thread(&PostgreSQL::heartbeat, this);
|
|
|
_membersDbWatcher = std::thread(&PostgreSQL::membersDbWatcher, this);
|
|
|
_networksDbWatcher = std::thread(&PostgreSQL::networksDbWatcher, this);
|
|
@@ -132,21 +162,17 @@ PostgreSQL::~PostgreSQL()
|
|
|
_heartbeatThread.join();
|
|
|
_membersDbWatcher.join();
|
|
|
_networksDbWatcher.join();
|
|
|
+ _commitQueue.stop();
|
|
|
for (int i = 0; i < ZT_CENTRAL_CONTROLLER_COMMIT_THREADS; ++i) {
|
|
|
_commitThread[i].join();
|
|
|
}
|
|
|
_onlineNotificationThread.join();
|
|
|
-
|
|
|
}
|
|
|
|
|
|
|
|
|
bool PostgreSQL::waitForReady()
|
|
|
{
|
|
|
while (_ready < 2) {
|
|
|
- if (!_waitNoticePrinted) {
|
|
|
- _waitNoticePrinted = true;
|
|
|
- fprintf(stderr, "[%s] NOTICE: %.10llx controller PostgreSQL waiting for initial data download..." ZT_EOL_S, ::_timestr(), (unsigned long long)_myAddress.toInt());
|
|
|
- }
|
|
|
_readyLock.lock();
|
|
|
_readyLock.unlock();
|
|
|
}
|
|
@@ -166,24 +192,24 @@ bool PostgreSQL::save(nlohmann::json &record,bool notifyListeners)
|
|
|
return false;
|
|
|
const std::string objtype = record["objtype"];
|
|
|
if (objtype == "network") {
|
|
|
- const uint64_t nwid = DB::jsonIntHex(record["id"],0ULL);
|
|
|
+ const uint64_t nwid = jsonIntHex(record["id"],0ULL);
|
|
|
if (nwid) {
|
|
|
nlohmann::json old;
|
|
|
get(nwid,old);
|
|
|
if ((!old.is_object())||(!_compareRecords(old,record))) {
|
|
|
- record["revision"] = DB::jsonInt(record["revision"],0ULL) + 1ULL;
|
|
|
+ record["revision"] = jsonInt(record["revision"],0ULL) + 1ULL;
|
|
|
_commitQueue.post(std::pair<nlohmann::json,bool>(record,notifyListeners));
|
|
|
modified = true;
|
|
|
}
|
|
|
}
|
|
|
} else if (objtype == "member") {
|
|
|
- const uint64_t nwid = DB::jsonIntHex(record["nwid"],0ULL);
|
|
|
- const uint64_t id = DB::jsonIntHex(record["id"],0ULL);
|
|
|
+ const uint64_t nwid = jsonIntHex(record["nwid"],0ULL);
|
|
|
+ const uint64_t id = jsonIntHex(record["id"],0ULL);
|
|
|
if ((id)&&(nwid)) {
|
|
|
nlohmann::json network,old;
|
|
|
get(nwid,network,id,old);
|
|
|
if ((!old.is_object())||(!_compareRecords(old,record))) {
|
|
|
- record["revision"] = DB::jsonInt(record["revision"],0ULL) + 1ULL;
|
|
|
+ record["revision"] = jsonInt(record["revision"],0ULL) + 1ULL;
|
|
|
_commitQueue.post(std::pair<nlohmann::json,bool>(record,notifyListeners));
|
|
|
modified = true;
|
|
|
}
|
|
@@ -207,12 +233,15 @@ void PostgreSQL::eraseNetwork(const uint64_t networkId)
|
|
|
tmp.first["objtype"] = "_delete_network";
|
|
|
tmp.second = true;
|
|
|
_commitQueue.post(tmp);
|
|
|
+ nlohmann::json nullJson;
|
|
|
+ _networkChanged(tmp.first, nullJson, true);
|
|
|
}
|
|
|
|
|
|
void PostgreSQL::eraseMember(const uint64_t networkId, const uint64_t memberId)
|
|
|
{
|
|
|
char tmp2[24];
|
|
|
- std::pair<nlohmann::json,bool> tmp;
|
|
|
+ waitForReady();
|
|
|
+ std::pair<nlohmann::json,bool> tmp, nw;
|
|
|
Utils::hex(networkId, tmp2);
|
|
|
tmp.first["nwid"] = tmp2;
|
|
|
Utils::hex(memberId, tmp2);
|
|
@@ -220,6 +249,8 @@ void PostgreSQL::eraseMember(const uint64_t networkId, const uint64_t memberId)
|
|
|
tmp.first["objtype"] = "_delete_member";
|
|
|
tmp.second = true;
|
|
|
_commitQueue.post(tmp);
|
|
|
+ nlohmann::json nullJson;
|
|
|
+ _memberChanged(tmp.first, nullJson, true);
|
|
|
}
|
|
|
|
|
|
void PostgreSQL::nodeIsOnline(const uint64_t networkId, const uint64_t memberId, const InetAddress &physicalAddress)
|
|
@@ -239,11 +270,30 @@ void PostgreSQL::initializeNetworks(PGconn *conn)
|
|
|
fprintf(stderr, "Bad Database Connection: %s", PQerrorMessage(conn));
|
|
|
exit(1);
|
|
|
}
|
|
|
+
|
|
|
+ std::string setKey = "networks:{" + _myAddressStr + "}";
|
|
|
+
|
|
|
+ if (_rc != NULL) {
|
|
|
+ try {
|
|
|
+ if (_rc->clusterMode) {
|
|
|
+ _cluster->del(setKey);
|
|
|
+ } else {
|
|
|
+ _redis->del(setKey);
|
|
|
+ }
|
|
|
+ } catch (sw::redis::Error &e) {
|
|
|
+ // del can throw an error if the key doesn't exist
|
|
|
+ // swallow it and move along
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ std::unordered_set<std::string> networkSet;
|
|
|
|
|
|
const char *params[1] = {
|
|
|
_myAddressStr.c_str()
|
|
|
};
|
|
|
|
|
|
+ fprintf(stderr, "Initializing Networks...\n");
|
|
|
+
|
|
|
PGresult *res = PQexecParams(conn, "SELECT id, EXTRACT(EPOCH FROM creation_time AT TIME ZONE 'UTC')*1000, capabilities, "
|
|
|
"enable_broadcast, EXTRACT(EPOCH FROM last_modified AT TIME ZONE 'UTC')*1000, mtu, multicast_limit, name, private, remote_trace_level, "
|
|
|
"remote_trace_target, revision, rules, tags, v4_assign_mode, v6_assign_mode FROM ztc_network "
|
|
@@ -269,9 +319,12 @@ void PostgreSQL::initializeNetworks(PGconn *conn)
|
|
|
const char *nwidparam[1] = {
|
|
|
PQgetvalue(res, i, 0)
|
|
|
};
|
|
|
+ std::string nwid = PQgetvalue(res, i, 0);
|
|
|
+
|
|
|
+ networkSet.insert(nwid);
|
|
|
|
|
|
- config["id"] = PQgetvalue(res, i, 0);
|
|
|
- config["nwid"] = PQgetvalue(res, i, 0);
|
|
|
+ config["id"] = nwid;
|
|
|
+ config["nwid"] = nwid;
|
|
|
try {
|
|
|
config["creationTime"] = std::stoull(PQgetvalue(res, i, 1));
|
|
|
} catch (std::exception &e) {
|
|
@@ -384,14 +437,29 @@ void PostgreSQL::initializeNetworks(PGconn *conn)
|
|
|
|
|
|
PQclear(res);
|
|
|
|
|
|
+ if(!networkSet.empty()) {
|
|
|
+ if (_rc && _rc->clusterMode) {
|
|
|
+ auto tx = _cluster->transaction(_myAddressStr, true);
|
|
|
+ tx.sadd(setKey, networkSet.begin(), networkSet.end());
|
|
|
+ tx.exec();
|
|
|
+ } else if (_rc && !_rc->clusterMode) {
|
|
|
+ auto tx = _redis->transaction(true);
|
|
|
+ tx.sadd(setKey, networkSet.begin(), networkSet.end());
|
|
|
+ tx.exec();
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
if (++this->_ready == 2) {
|
|
|
if (_waitNoticePrinted) {
|
|
|
fprintf(stderr,"[%s] NOTICE: %.10llx controller PostgreSQL data download complete." ZT_EOL_S,_timestr(),(unsigned long long)_myAddress.toInt());
|
|
|
}
|
|
|
_readyLock.unlock();
|
|
|
}
|
|
|
+ } catch (sw::redis::Error &e) {
|
|
|
+ fprintf(stderr, "ERROR: Error initializing networks in Redis: %s\n", e.what());
|
|
|
+ exit(-1);
|
|
|
} catch (std::exception &e) {
|
|
|
- fprintf(stderr, "ERROR: Error initializing networks: %s", e.what());
|
|
|
+ fprintf(stderr, "ERROR: Error initializing networks: %s\n", e.what());
|
|
|
exit(-1);
|
|
|
}
|
|
|
}
|
|
@@ -403,11 +471,44 @@ void PostgreSQL::initializeMembers(PGconn *conn)
|
|
|
fprintf(stderr, "Bad Database Connection: %s", PQerrorMessage(conn));
|
|
|
exit(1);
|
|
|
}
|
|
|
+ std::string setKeyBase = "network-nodes-all:{" + _myAddressStr + "}:";
|
|
|
+
|
|
|
+ if (_rc != NULL) {
|
|
|
+ std::lock_guard<std::mutex> l(_networks_l);
|
|
|
+ std::unordered_set<std::string> deletes;
|
|
|
+ for ( auto it : _networks) {
|
|
|
+ uint64_t nwid_i = it.first;
|
|
|
+ char nwidTmp[64] = {0};
|
|
|
+ OSUtils::ztsnprintf(nwidTmp, sizeof(nwidTmp), "%.16llx", nwid_i);
|
|
|
+ std::string nwid(nwidTmp);
|
|
|
+ std::string key = setKeyBase + nwid;
|
|
|
+ deletes.insert(key);
|
|
|
+ }
|
|
|
|
|
|
+ if (!deletes.empty()) {
|
|
|
+ if (_rc->clusterMode) {
|
|
|
+ auto tx = _cluster->transaction(_myAddressStr, true);
|
|
|
+ for (std::string k : deletes) {
|
|
|
+ tx.del(k);
|
|
|
+ }
|
|
|
+ tx.exec();
|
|
|
+ } else {
|
|
|
+ auto tx = _redis->transaction(true);
|
|
|
+ for (std::string k : deletes) {
|
|
|
+ tx.del(k);
|
|
|
+ }
|
|
|
+ tx.exec();
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
const char *params[1] = {
|
|
|
_myAddressStr.c_str()
|
|
|
};
|
|
|
+
|
|
|
+ std::unordered_map<std::string, std::string> networkMembers;
|
|
|
|
|
|
+ fprintf(stderr, "Initializing Members...\n");
|
|
|
PGresult *res = PQexecParams(conn,
|
|
|
"SELECT m.id, m.network_id, m.active_bridge, m.authorized, m.capabilities, EXTRACT(EPOCH FROM m.creation_time AT TIME ZONE 'UTC')*1000, m.identity, "
|
|
|
" EXTRACT(EPOCH FROM m.last_authorized_time AT TIME ZONE 'UTC')*1000, "
|
|
@@ -438,6 +539,9 @@ void PostgreSQL::initializeMembers(PGconn *conn)
|
|
|
|
|
|
std::string memberId(PQgetvalue(res, i, 0));
|
|
|
std::string networkId(PQgetvalue(res, i, 1));
|
|
|
+
|
|
|
+ networkMembers.insert(std::pair<std::string, std::string>(setKeyBase+networkId, memberId));
|
|
|
+
|
|
|
std::string ctime = PQgetvalue(res, i, 5);
|
|
|
config["id"] = memberId;
|
|
|
config["nwid"] = networkId;
|
|
@@ -530,7 +634,12 @@ void PostgreSQL::initializeMembers(PGconn *conn)
|
|
|
|
|
|
int n = PQntuples(r2);
|
|
|
for (int j = 0; j < n; ++j) {
|
|
|
- config["ipAssignments"].push_back(PQgetvalue(r2, j, 0));
|
|
|
+ std::string ipaddr = PQgetvalue(r2, j, 0);
|
|
|
+ std::size_t pos = ipaddr.find('/');
|
|
|
+ if (pos != std::string::npos) {
|
|
|
+ ipaddr = ipaddr.substr(0, pos);
|
|
|
+ }
|
|
|
+ config["ipAssignments"].push_back(ipaddr);
|
|
|
}
|
|
|
|
|
|
_memberChanged(empty, config, false);
|
|
@@ -538,12 +647,31 @@ void PostgreSQL::initializeMembers(PGconn *conn)
|
|
|
|
|
|
PQclear(res);
|
|
|
|
|
|
+ if (!networkMembers.empty()) {
|
|
|
+ if (_rc != NULL) {
|
|
|
+ if (_rc->clusterMode) {
|
|
|
+ auto tx = _cluster->transaction(_myAddressStr, true);
|
|
|
+ for (auto it : networkMembers) {
|
|
|
+ tx.sadd(it.first, it.second);
|
|
|
+ }
|
|
|
+ tx.exec();
|
|
|
+ } else {
|
|
|
+ auto tx = _redis->transaction(true);
|
|
|
+ for (auto it : networkMembers) {
|
|
|
+ tx.sadd(it.first, it.second);
|
|
|
+ }
|
|
|
+ tx.exec();
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
if (++this->_ready == 2) {
|
|
|
if (_waitNoticePrinted) {
|
|
|
fprintf(stderr,"[%s] NOTICE: %.10llx controller PostgreSQL data download complete." ZT_EOL_S,_timestr(),(unsigned long long)_myAddress.toInt());
|
|
|
}
|
|
|
_readyLock.unlock();
|
|
|
}
|
|
|
+ } catch (sw::redis::Error &e) {
|
|
|
+ fprintf(stderr, "ERROR: Error initializing members (redis): %s\n", e.what());
|
|
|
} catch (std::exception &e) {
|
|
|
fprintf(stderr, "ERROR: Error initializing members: %s\n", e.what());
|
|
|
exit(-1);
|
|
@@ -581,14 +709,15 @@ void PostgreSQL::heartbeat()
|
|
|
PQfinish(conn);
|
|
|
exit(6);
|
|
|
}
|
|
|
+ int64_t ts = OSUtils::now();
|
|
|
if (conn) {
|
|
|
- std::string major = std::to_string(ZEROTIER_ONE_VERSION_MAJOR);
|
|
|
- std::string minor = std::to_string(ZEROTIER_ONE_VERSION_MINOR);
|
|
|
- std::string rev = std::to_string(ZEROTIER_ONE_VERSION_REVISION);
|
|
|
- std::string build = std::to_string(ZEROTIER_ONE_VERSION_BUILD);
|
|
|
- std::string now = std::to_string(OSUtils::now());
|
|
|
+ std::string major = std::to_string(ZEROTIER_VERSION_MAJOR);
|
|
|
+ std::string minor = std::to_string(ZEROTIER_VERSION_MINOR);
|
|
|
+ std::string rev = std::to_string(ZEROTIER_VERSION_REVISION);
|
|
|
+ std::string build = std::to_string(ZEROTIER_VERSION_BUILD);
|
|
|
+ std::string now = std::to_string(ts);
|
|
|
std::string host_port = std::to_string(_listenPort);
|
|
|
- std::string use_rabbitmq = (_mqc != NULL) ? "true" : "false";
|
|
|
+ std::string use_redis = (_rc != NULL) ? "true" : "false";
|
|
|
const char *values[10] = {
|
|
|
controllerId,
|
|
|
hostname,
|
|
@@ -599,16 +728,16 @@ void PostgreSQL::heartbeat()
|
|
|
rev.c_str(),
|
|
|
build.c_str(),
|
|
|
host_port.c_str(),
|
|
|
- use_rabbitmq.c_str()
|
|
|
+ use_redis.c_str()
|
|
|
};
|
|
|
|
|
|
PGresult *res = PQexecParams(conn,
|
|
|
- "INSERT INTO ztc_controller (id, cluster_host, last_alive, public_identity, v_major, v_minor, v_rev, v_build, host_port, use_rabbitmq) "
|
|
|
+ "INSERT INTO ztc_controller (id, cluster_host, last_alive, public_identity, v_major, v_minor, v_rev, v_build, host_port, use_redis) "
|
|
|
"VALUES ($1, $2, TO_TIMESTAMP($3::double precision/1000), $4, $5, $6, $7, $8, $9, $10) "
|
|
|
"ON CONFLICT (id) DO UPDATE SET cluster_host = EXCLUDED.cluster_host, last_alive = EXCLUDED.last_alive, "
|
|
|
"public_identity = EXCLUDED.public_identity, v_major = EXCLUDED.v_major, v_minor = EXCLUDED.v_minor, "
|
|
|
"v_rev = EXCLUDED.v_rev, v_build = EXCLUDED.v_rev, host_port = EXCLUDED.host_port, "
|
|
|
- "use_rabbitmq = EXCLUDED.use_rabbitmq",
|
|
|
+ "use_redis = EXCLUDED.use_redis",
|
|
|
10, // number of parameters
|
|
|
NULL, // oid field. ignore
|
|
|
values, // values for substitution
|
|
@@ -621,12 +750,20 @@ void PostgreSQL::heartbeat()
|
|
|
}
|
|
|
PQclear(res);
|
|
|
}
|
|
|
+ if (_rc != NULL) {
|
|
|
+ if (_rc->clusterMode) {
|
|
|
+ _cluster->zadd("controllers", controllerId, ts);
|
|
|
+ } else {
|
|
|
+ _redis->zadd("controllers", controllerId, ts);
|
|
|
+ }
|
|
|
+ }
|
|
|
|
|
|
std::this_thread::sleep_for(std::chrono::milliseconds(1000));
|
|
|
}
|
|
|
|
|
|
PQfinish(conn);
|
|
|
conn = NULL;
|
|
|
+ fprintf(stderr, "Exited heartbeat thread\n");
|
|
|
}
|
|
|
|
|
|
void PostgreSQL::membersDbWatcher()
|
|
@@ -638,12 +775,10 @@ void PostgreSQL::membersDbWatcher()
|
|
|
exit(1);
|
|
|
}
|
|
|
|
|
|
- initializeMembers(conn);
|
|
|
-
|
|
|
- if (this->_mqc != NULL) {
|
|
|
+ if (_rc) {
|
|
|
PQfinish(conn);
|
|
|
conn = NULL;
|
|
|
- _membersWatcher_RabbitMQ();
|
|
|
+ _membersWatcher_Redis();
|
|
|
} else {
|
|
|
_membersWatcher_Postgres(conn);
|
|
|
PQfinish(conn);
|
|
@@ -698,41 +833,62 @@ void PostgreSQL::_membersWatcher_Postgres(PGconn *conn) {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
-void PostgreSQL::_membersWatcher_RabbitMQ() {
|
|
|
+void PostgreSQL::_membersWatcher_Redis() {
|
|
|
char buf[11] = {0};
|
|
|
- std::string qname = "member_"+ std::string(_myAddress.toString(buf));
|
|
|
- RabbitMQ rmq(_mqc, qname.c_str());
|
|
|
- try {
|
|
|
- rmq.init();
|
|
|
- } catch (std::runtime_error &e) {
|
|
|
- fprintf(stderr, "RABBITMQ ERROR: %s\n", e.what());
|
|
|
- exit(11);
|
|
|
- }
|
|
|
+ std::string key = "member-stream:{" + std::string(_myAddress.toString(buf)) + "}";
|
|
|
+
|
|
|
while (_run == 1) {
|
|
|
try {
|
|
|
- std::string msg = rmq.consume();
|
|
|
- // fprintf(stderr, "Got Member Update: %s\n", msg.c_str());
|
|
|
- if (msg.empty()) {
|
|
|
- continue;
|
|
|
+ json tmp;
|
|
|
+ std::unordered_map<std::string, ItemStream> result;
|
|
|
+ if (_rc->clusterMode) {
|
|
|
+ _cluster->xread(key, "$", std::chrono::seconds(1), 0, std::inserter(result, result.end()));
|
|
|
+ } else {
|
|
|
+ _redis->xread(key, "$", std::chrono::seconds(1), 0, std::inserter(result, result.end()));
|
|
|
}
|
|
|
- json tmp(json::parse(msg));
|
|
|
- json &ov = tmp["old_val"];
|
|
|
- json &nv = tmp["new_val"];
|
|
|
- json oldConfig, newConfig;
|
|
|
- if (ov.is_object()) oldConfig = ov;
|
|
|
- if (nv.is_object()) newConfig = nv;
|
|
|
- if (oldConfig.is_object() || newConfig.is_object()) {
|
|
|
- _memberChanged(oldConfig,newConfig,(this->_ready>=2));
|
|
|
+ if (!result.empty()) {
|
|
|
+ for (auto element : result) {
|
|
|
+ #ifdef ZT_TRACE
|
|
|
+ fprintf(stdout, "Received notification from: %s\n", element.first.c_str());
|
|
|
+ #endif
|
|
|
+ for (auto rec : element.second) {
|
|
|
+ std::string id = rec.first;
|
|
|
+ auto attrs = rec.second;
|
|
|
+ #ifdef ZT_TRACE
|
|
|
+ fprintf(stdout, "Record ID: %s\n", id.c_str());
|
|
|
+ fprintf(stdout, "attrs len: %lu\n", attrs.size());
|
|
|
+ #endif
|
|
|
+ for (auto a : attrs) {
|
|
|
+ #ifdef ZT_TRACE
|
|
|
+ fprintf(stdout, "key: %s\nvalue: %s\n", a.first.c_str(), a.second.c_str());
|
|
|
+ #endif
|
|
|
+ try {
|
|
|
+ tmp = json::parse(a.second);
|
|
|
+ json &ov = tmp["old_val"];
|
|
|
+ json &nv = tmp["new_val"];
|
|
|
+ json oldConfig, newConfig;
|
|
|
+ if (ov.is_object()) oldConfig = ov;
|
|
|
+ if (nv.is_object()) newConfig = nv;
|
|
|
+ if (oldConfig.is_object()||newConfig.is_object()) {
|
|
|
+ _memberChanged(oldConfig,newConfig,(this->_ready >= 2));
|
|
|
+ }
|
|
|
+ } catch (...) {
|
|
|
+ fprintf(stderr, "json parse error in networkWatcher_Redis\n");
|
|
|
+ }
|
|
|
+ }
|
|
|
+ if (_rc->clusterMode) {
|
|
|
+ _cluster->xdel(key, id);
|
|
|
+ } else {
|
|
|
+ _redis->xdel(key, id);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
}
|
|
|
- } catch (std::runtime_error &e) {
|
|
|
- fprintf(stderr, "RABBITMQ ERROR member change: %s\n", e.what());
|
|
|
- break;
|
|
|
- } catch(std::exception &e ) {
|
|
|
- fprintf(stderr, "RABBITMQ ERROR member change: %s\n", e.what());
|
|
|
- } catch(...) {
|
|
|
- fprintf(stderr, "RABBITMQ ERROR member change: unknown error\n");
|
|
|
+ } catch (sw::redis::Error &e) {
|
|
|
+ fprintf(stderr, "Error in Redis members watcher: %s\n", e.what());
|
|
|
}
|
|
|
}
|
|
|
+ fprintf(stderr, "membersWatcher ended\n");
|
|
|
}
|
|
|
|
|
|
void PostgreSQL::networksDbWatcher()
|
|
@@ -744,12 +900,10 @@ void PostgreSQL::networksDbWatcher()
|
|
|
exit(1);
|
|
|
}
|
|
|
|
|
|
- initializeNetworks(conn);
|
|
|
-
|
|
|
- if (this->_mqc != NULL) {
|
|
|
+ if (_rc) {
|
|
|
PQfinish(conn);
|
|
|
conn = NULL;
|
|
|
- _networksWatcher_RabbitMQ();
|
|
|
+ _networksWatcher_Redis();
|
|
|
} else {
|
|
|
_networksWatcher_Postgres(conn);
|
|
|
PQfinish(conn);
|
|
@@ -802,41 +956,63 @@ void PostgreSQL::_networksWatcher_Postgres(PGconn *conn) {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
-void PostgreSQL::_networksWatcher_RabbitMQ() {
|
|
|
+void PostgreSQL::_networksWatcher_Redis() {
|
|
|
char buf[11] = {0};
|
|
|
- std::string qname = "network_"+ std::string(_myAddress.toString(buf));
|
|
|
- RabbitMQ rmq(_mqc, qname.c_str());
|
|
|
- try {
|
|
|
- rmq.init();
|
|
|
- } catch (std::runtime_error &e) {
|
|
|
- fprintf(stderr, "RABBITMQ ERROR: %s\n", e.what());
|
|
|
- exit(11);
|
|
|
- }
|
|
|
+ std::string key = "network-stream:{" + std::string(_myAddress.toString(buf)) + "}";
|
|
|
+
|
|
|
while (_run == 1) {
|
|
|
try {
|
|
|
- std::string msg = rmq.consume();
|
|
|
- if (msg.empty()) {
|
|
|
- continue;
|
|
|
+ json tmp;
|
|
|
+ std::unordered_map<std::string, ItemStream> result;
|
|
|
+ if (_rc->clusterMode) {
|
|
|
+ _cluster->xread(key, "$", std::chrono::seconds(1), 0, std::inserter(result, result.end()));
|
|
|
+ } else {
|
|
|
+ _redis->xread(key, "$", std::chrono::seconds(1), 0, std::inserter(result, result.end()));
|
|
|
}
|
|
|
- // fprintf(stderr, "Got network update: %s\n", msg.c_str());
|
|
|
- json tmp(json::parse(msg));
|
|
|
- json &ov = tmp["old_val"];
|
|
|
- json &nv = tmp["new_val"];
|
|
|
- json oldConfig, newConfig;
|
|
|
- if (ov.is_object()) oldConfig = ov;
|
|
|
- if (nv.is_object()) newConfig = nv;
|
|
|
- if (oldConfig.is_object()||newConfig.is_object()) {
|
|
|
- _networkChanged(oldConfig,newConfig,(this->_ready >= 2));
|
|
|
+
|
|
|
+ if (!result.empty()) {
|
|
|
+ for (auto element : result) {
|
|
|
+#ifdef ZT_TRACE
|
|
|
+ fprintf(stdout, "Received notification from: %s\n", element.first.c_str());
|
|
|
+#endif
|
|
|
+ for (auto rec : element.second) {
|
|
|
+ std::string id = rec.first;
|
|
|
+ auto attrs = rec.second;
|
|
|
+#ifdef ZT_TRACE
|
|
|
+ fprintf(stdout, "Record ID: %s\n", id.c_str());
|
|
|
+ fprintf(stdout, "attrs len: %lu\n", attrs.size());
|
|
|
+#endif
|
|
|
+ for (auto a : attrs) {
|
|
|
+#ifdef ZT_TRACE
|
|
|
+ fprintf(stdout, "key: %s\nvalue: %s\n", a.first.c_str(), a.second.c_str());
|
|
|
+#endif
|
|
|
+ try {
|
|
|
+ tmp = json::parse(a.second);
|
|
|
+ json &ov = tmp["old_val"];
|
|
|
+ json &nv = tmp["new_val"];
|
|
|
+ json oldConfig, newConfig;
|
|
|
+ if (ov.is_object()) oldConfig = ov;
|
|
|
+ if (nv.is_object()) newConfig = nv;
|
|
|
+ if (oldConfig.is_object()||newConfig.is_object()) {
|
|
|
+ _networkChanged(oldConfig,newConfig,(this->_ready >= 2));
|
|
|
+ }
|
|
|
+ } catch (...) {
|
|
|
+ fprintf(stderr, "json parse error in networkWatcher_Redis\n");
|
|
|
+ }
|
|
|
+ }
|
|
|
+ if (_rc->clusterMode) {
|
|
|
+ _cluster->xdel(key, id);
|
|
|
+ } else {
|
|
|
+ _redis->xdel(key, id);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
}
|
|
|
- } catch (std::runtime_error &e) {
|
|
|
- fprintf(stderr, "RABBITMQ ERROR: %s\n", e.what());
|
|
|
- break;
|
|
|
- } catch (std::exception &e) {
|
|
|
- fprintf(stderr, "RABBITMQ ERROR network watcher: %s\n", e.what());
|
|
|
- } catch(...) {
|
|
|
- fprintf(stderr, "RABBITMQ ERROR network watcher: unknown error\n");
|
|
|
+ } catch (sw::redis::Error &e) {
|
|
|
+ fprintf(stderr, "Error in Redis networks watcher: %s\n", e.what());
|
|
|
}
|
|
|
}
|
|
|
+ fprintf(stderr, "networksWatcher ended\n");
|
|
|
}
|
|
|
|
|
|
void PostgreSQL::commitThread()
|
|
@@ -872,12 +1048,12 @@ void PostgreSQL::commitThread()
|
|
|
target = (*config)["remoteTraceTarget"];
|
|
|
}
|
|
|
|
|
|
- std::string caps = DB::jsonDump((*config)["capabilities"], -1);
|
|
|
+ std::string caps = jsonDump((*config)["capabilities"], -1);
|
|
|
std::string lastAuthTime = std::to_string((long long)(*config)["lastAuthorizedTime"]);
|
|
|
std::string lastDeauthTime = std::to_string((long long)(*config)["lastDeauthorizedTime"]);
|
|
|
std::string rtraceLevel = std::to_string((int)(*config)["remoteTraceLevel"]);
|
|
|
std::string rev = std::to_string((unsigned long long)(*config)["revision"]);
|
|
|
- std::string tags = DB::jsonDump((*config)["tags"], -1);
|
|
|
+ std::string tags = jsonDump((*config)["tags"], -1);
|
|
|
std::string vmajor = std::to_string((int)(*config)["vMajor"]);
|
|
|
std::string vminor = std::to_string((int)(*config)["vMinor"]);
|
|
|
std::string vrev = std::to_string((int)(*config)["vRev"]);
|
|
@@ -924,7 +1100,7 @@ void PostgreSQL::commitThread()
|
|
|
|
|
|
if (PQresultStatus(res) != PGRES_COMMAND_OK) {
|
|
|
fprintf(stderr, "ERROR: Error updating member: %s\n", PQresultErrorMessage(res));
|
|
|
- fprintf(stderr, "%s", DB::jsonDump(*config, 2).c_str());
|
|
|
+ fprintf(stderr, "%s", jsonDump(*config, 2).c_str());
|
|
|
PQclear(res);
|
|
|
delete config;
|
|
|
config = nullptr;
|
|
@@ -984,7 +1160,7 @@ void PostgreSQL::commitThread()
|
|
|
};
|
|
|
|
|
|
res = PQexecParams(conn,
|
|
|
- "INSERT INTO ztc_member_ip_assignment (member_id, network_id, address) VALUES ($1, $2, $3)",
|
|
|
+ "INSERT INTO ztc_member_ip_assignment (member_id, network_id, address) VALUES ($1, $2, $3) ON CONFLICT (network_id, member_id, address) DO NOTHING",
|
|
|
3,
|
|
|
NULL,
|
|
|
v3,
|
|
@@ -998,6 +1174,7 @@ void PostgreSQL::commitThread()
|
|
|
PQclear(PQexec(conn, "ROLLBACK"));
|
|
|
break;;
|
|
|
}
|
|
|
+ assignments.push_back(addr);
|
|
|
}
|
|
|
|
|
|
res = PQexec(conn, "COMMIT");
|
|
@@ -1007,8 +1184,8 @@ void PostgreSQL::commitThread()
|
|
|
|
|
|
PQclear(res);
|
|
|
|
|
|
- const uint64_t nwidInt = DB::jsonIntHex((*config)["nwid"], 0ULL);
|
|
|
- const uint64_t memberidInt = DB::jsonIntHex((*config)["id"], 0ULL);
|
|
|
+ const uint64_t nwidInt = jsonIntHex((*config)["nwid"], 0ULL);
|
|
|
+ const uint64_t memberidInt = jsonIntHex((*config)["id"], 0ULL);
|
|
|
if (nwidInt && memberidInt) {
|
|
|
nlohmann::json nwOrig;
|
|
|
nlohmann::json memOrig;
|
|
@@ -1038,15 +1215,15 @@ void PostgreSQL::commitThread()
|
|
|
if ((*config)["rulesSource"].is_string()) {
|
|
|
rulesSource = (*config)["rulesSource"];
|
|
|
}
|
|
|
- std::string caps = DB::jsonDump((*config)["capabilitles"], -1);
|
|
|
+ std::string caps = jsonDump((*config)["capabilitles"], -1);
|
|
|
std::string now = std::to_string(OSUtils::now());
|
|
|
std::string mtu = std::to_string((int)(*config)["mtu"]);
|
|
|
std::string mcastLimit = std::to_string((int)(*config)["multicastLimit"]);
|
|
|
std::string rtraceLevel = std::to_string((int)(*config)["remoteTraceLevel"]);
|
|
|
- std::string rules = DB::jsonDump((*config)["rules"], -1);
|
|
|
- std::string tags = DB::jsonDump((*config)["tags"], -1);
|
|
|
- std::string v4mode = DB::jsonDump((*config)["v4AssignMode"],-1);
|
|
|
- std::string v6mode = DB::jsonDump((*config)["v6AssignMode"], -1);
|
|
|
+ std::string rules = jsonDump((*config)["rules"], -1);
|
|
|
+ std::string tags = jsonDump((*config)["tags"], -1);
|
|
|
+ std::string v4mode = jsonDump((*config)["v4AssignMode"],-1);
|
|
|
+ std::string v6mode = jsonDump((*config)["v6AssignMode"], -1);
|
|
|
bool enableBroadcast = (*config)["enableBroadcast"];
|
|
|
bool isPrivate = (*config)["private"];
|
|
|
|
|
@@ -1253,7 +1430,7 @@ void PostgreSQL::commitThread()
|
|
|
}
|
|
|
PQclear(res);
|
|
|
|
|
|
- const uint64_t nwidInt = DB::jsonIntHex((*config)["nwid"], 0ULL);
|
|
|
+ const uint64_t nwidInt = jsonIntHex((*config)["nwid"], 0ULL);
|
|
|
if (nwidInt) {
|
|
|
nlohmann::json nwOrig;
|
|
|
nlohmann::json nwNew(*config);
|
|
@@ -1268,6 +1445,20 @@ void PostgreSQL::commitThread()
|
|
|
} catch (std::exception &e) {
|
|
|
fprintf(stderr, "ERROR: Error updating member: %s\n", e.what());
|
|
|
}
|
|
|
+ if (_rc != NULL) {
|
|
|
+ try {
|
|
|
+ std::string id = (*config)["id"];
|
|
|
+ std::string controllerId = _myAddressStr.c_str();
|
|
|
+ std::string key = "networks:{" + controllerId + "}";
|
|
|
+ if (_rc->clusterMode) {
|
|
|
+ _cluster->sadd(key, id);
|
|
|
+ } else {
|
|
|
+ _redis->sadd(key, id);
|
|
|
+ }
|
|
|
+ } catch (sw::redis::Error &e) {
|
|
|
+ fprintf(stderr, "ERROR: Error adding network to Redis: %s\n", e.what());
|
|
|
+ }
|
|
|
+ }
|
|
|
} else if (objtype == "_delete_network") {
|
|
|
try {
|
|
|
std::string networkId = (*config)["nwid"];
|
|
@@ -1291,6 +1482,22 @@ void PostgreSQL::commitThread()
|
|
|
} catch (std::exception &e) {
|
|
|
fprintf(stderr, "ERROR: Error deleting network: %s\n", e.what());
|
|
|
}
|
|
|
+ if (_rc != NULL) {
|
|
|
+ try {
|
|
|
+ std::string id = (*config)["id"];
|
|
|
+ std::string controllerId = _myAddressStr.c_str();
|
|
|
+ std::string key = "networks:{" + controllerId + "}";
|
|
|
+ if (_rc->clusterMode) {
|
|
|
+ _cluster->srem(key, id);
|
|
|
+ _cluster->del("network-nodes-online:{"+controllerId+"}:"+id);
|
|
|
+ } else {
|
|
|
+ _redis->srem(key, id);
|
|
|
+ _redis->del("network-nodes-online:{"+controllerId+"}:"+id);
|
|
|
+ }
|
|
|
+ } catch (sw::redis::Error &e) {
|
|
|
+ fprintf(stderr, "ERROR: Error adding network to Redis: %s\n", e.what());
|
|
|
+ }
|
|
|
+ }
|
|
|
} else if (objtype == "_delete_member") {
|
|
|
try {
|
|
|
std::string memberId = (*config)["id"];
|
|
@@ -1318,6 +1525,23 @@ void PostgreSQL::commitThread()
|
|
|
} catch (std::exception &e) {
|
|
|
fprintf(stderr, "ERROR: Error deleting member: %s\n", e.what());
|
|
|
}
|
|
|
+ if (_rc != NULL) {
|
|
|
+ try {
|
|
|
+ std::string memberId = (*config)["id"];
|
|
|
+ std::string networkId = (*config)["nwid"];
|
|
|
+ std::string controllerId = _myAddressStr.c_str();
|
|
|
+ std::string key = "network-nodes-all:{" + controllerId + "}:" + networkId;
|
|
|
+ if (_rc->clusterMode) {
|
|
|
+ _cluster->srem(key, memberId);
|
|
|
+ _cluster->del("member:{"+controllerId+"}:"+networkId+":"+memberId);
|
|
|
+ } else {
|
|
|
+ _redis->srem(key, memberId);
|
|
|
+ _redis->del("member:{"+controllerId+"}:"+networkId+":"+memberId);
|
|
|
+ }
|
|
|
+ } catch (sw::redis::Error &e) {
|
|
|
+ fprintf(stderr, "ERROR: Error deleting member from Redis: %s\n", e.what());
|
|
|
+ }
|
|
|
+ }
|
|
|
} else {
|
|
|
fprintf(stderr, "ERROR: unknown objtype");
|
|
|
}
|
|
@@ -1333,9 +1557,21 @@ void PostgreSQL::commitThread()
|
|
|
fprintf(stderr, "ERROR: %s commitThread should still be running! Exiting Controller.\n", _myAddressStr.c_str());
|
|
|
exit(7);
|
|
|
}
|
|
|
+ fprintf(stderr, "commitThread finished\n");
|
|
|
}
|
|
|
|
|
|
void PostgreSQL::onlineNotificationThread()
|
|
|
+{
|
|
|
+ waitForReady();
|
|
|
+
|
|
|
+ if (_rc != NULL) {
|
|
|
+ onlineNotification_Redis();
|
|
|
+ } else {
|
|
|
+ onlineNotification_Postgres();
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
+void PostgreSQL::onlineNotification_Postgres()
|
|
|
{
|
|
|
PGconn *conn = getPgConn();
|
|
|
if (PQstatus(conn) == CONNECTION_BAD) {
|
|
@@ -1345,9 +1581,7 @@ void PostgreSQL::onlineNotificationThread()
|
|
|
}
|
|
|
_connected = 1;
|
|
|
|
|
|
- //int64_t lastUpdatedNetworkStatus = 0;
|
|
|
- std::unordered_map< std::pair<uint64_t,uint64_t>,int64_t,_PairHasher > lastOnlineCumulative;
|
|
|
-
|
|
|
+ nlohmann::json jtmp1, jtmp2;
|
|
|
while (_run == 1) {
|
|
|
if (PQstatus(conn) != CONNECTION_OK) {
|
|
|
fprintf(stderr, "ERROR: Online Notification thread lost connection to Postgres.");
|
|
@@ -1355,9 +1589,6 @@ void PostgreSQL::onlineNotificationThread()
|
|
|
exit(5);
|
|
|
}
|
|
|
|
|
|
- // map used to send notifications to front end
|
|
|
- std::unordered_map<std::string, std::vector<std::string>> updateMap;
|
|
|
-
|
|
|
std::unordered_map< std::pair<uint64_t,uint64_t>,std::pair<int64_t,InetAddress>,_PairHasher > lastOnline;
|
|
|
{
|
|
|
std::lock_guard<std::mutex> l(_lastOnline_l);
|
|
@@ -1378,20 +1609,13 @@ void PostgreSQL::onlineNotificationThread()
|
|
|
OSUtils::ztsnprintf(nwidTmp,sizeof(nwidTmp), "%.16llx", nwid_i);
|
|
|
OSUtils::ztsnprintf(memTmp,sizeof(memTmp), "%.10llx", i->first.second);
|
|
|
|
|
|
- auto found = _networks.find(nwid_i);
|
|
|
- if (found == _networks.end()) {
|
|
|
- continue; // skip members trying to join non-existant networks
|
|
|
+ if(!get(nwid_i, jtmp1, i->first.second, jtmp2)) {
|
|
|
+ continue; // skip non existent networks/members
|
|
|
}
|
|
|
|
|
|
std::string networkId(nwidTmp);
|
|
|
std::string memberId(memTmp);
|
|
|
|
|
|
- std::vector<std::string> &members = updateMap[networkId];
|
|
|
- members.push_back(memberId);
|
|
|
-
|
|
|
- lastOnlineCumulative[i->first] = i->second.first;
|
|
|
-
|
|
|
-
|
|
|
const char *qvals[2] = {
|
|
|
networkId.c_str(),
|
|
|
memberId.c_str()
|
|
@@ -1451,7 +1675,7 @@ void PostgreSQL::onlineNotificationThread()
|
|
|
PQclear(res);
|
|
|
}
|
|
|
|
|
|
- std::this_thread::sleep_for(std::chrono::milliseconds(10));
|
|
|
+ std::this_thread::sleep_for(std::chrono::milliseconds(100));
|
|
|
}
|
|
|
fprintf(stderr, "%s: Fell out of run loop in onlineNotificationThread\n", _myAddressStr.c_str());
|
|
|
PQfinish(conn);
|
|
@@ -1461,6 +1685,95 @@ void PostgreSQL::onlineNotificationThread()
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+void PostgreSQL::onlineNotification_Redis()
|
|
|
+{
|
|
|
+ _connected = 1;
|
|
|
+
|
|
|
+ char buf[11] = {0};
|
|
|
+ std::string controllerId = std::string(_myAddress.toString(buf));
|
|
|
+
|
|
|
+ while (_run == 1) {
|
|
|
+ std::unordered_map< std::pair<uint64_t,uint64_t>,std::pair<int64_t,InetAddress>,_PairHasher > lastOnline;
|
|
|
+ {
|
|
|
+ std::lock_guard<std::mutex> l(_lastOnline_l);
|
|
|
+ lastOnline.swap(_lastOnline);
|
|
|
+ }
|
|
|
+ try {
|
|
|
+ if (!lastOnline.empty()) {
|
|
|
+ if (_rc->clusterMode) {
|
|
|
+ auto tx = _cluster->transaction(controllerId, true);
|
|
|
+ _doRedisUpdate(tx, controllerId, lastOnline);
|
|
|
+ } else {
|
|
|
+ auto tx = _redis->transaction(true);
|
|
|
+ _doRedisUpdate(tx, controllerId, lastOnline);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ } catch (sw::redis::Error &e) {
|
|
|
+#ifdef ZT_TRACE
|
|
|
+ fprintf(stderr, "Error in online notification thread (redis): %s\n", e.what());
|
|
|
+#endif
|
|
|
+ }
|
|
|
+ std::this_thread::sleep_for(std::chrono::milliseconds(100));
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
+void PostgreSQL::_doRedisUpdate(sw::redis::Transaction &tx, std::string &controllerId,
|
|
|
+ std::unordered_map< std::pair<uint64_t,uint64_t>,std::pair<int64_t,InetAddress>,_PairHasher > &lastOnline)
|
|
|
+
|
|
|
+{
|
|
|
+ nlohmann::json jtmp1, jtmp2;
|
|
|
+ for (auto i=lastOnline.begin(); i != lastOnline.end(); ++i) {
|
|
|
+ uint64_t nwid_i = i->first.first;
|
|
|
+ uint64_t memberid_i = i->first.second;
|
|
|
+ char nwidTmp[64];
|
|
|
+ char memTmp[64];
|
|
|
+ char ipTmp[64];
|
|
|
+ OSUtils::ztsnprintf(nwidTmp,sizeof(nwidTmp), "%.16llx", nwid_i);
|
|
|
+ OSUtils::ztsnprintf(memTmp,sizeof(memTmp), "%.10llx", memberid_i);
|
|
|
+
|
|
|
+ if (!get(nwid_i, jtmp1, memberid_i, jtmp2)){
|
|
|
+ continue; // skip non existent members/networks
|
|
|
+ }
|
|
|
+
|
|
|
+ std::string networkId(nwidTmp);
|
|
|
+ std::string memberId(memTmp);
|
|
|
+
|
|
|
+ int64_t ts = i->second.first;
|
|
|
+ std::string ipAddr = i->second.second.toIpString(ipTmp);
|
|
|
+ std::string timestamp = std::to_string(ts);
|
|
|
+
|
|
|
+ std::unordered_map<std::string, std::string> record = {
|
|
|
+ {"id", memberId},
|
|
|
+ {"address", ipAddr},
|
|
|
+ {"last_updated", std::to_string(ts)}
|
|
|
+ };
|
|
|
+ tx.zadd("nodes-online:{"+controllerId+"}", memberId, ts)
|
|
|
+ .zadd("nodes-online2:{"+controllerId+"}", networkId+"-"+memberId, ts)
|
|
|
+ .zadd("network-nodes-online:{"+controllerId+"}:"+networkId, memberId, ts)
|
|
|
+ .zadd("active-networks:{"+controllerId+"}", networkId, ts)
|
|
|
+ .sadd("network-nodes-all:{"+controllerId+"}:"+networkId, memberId)
|
|
|
+ .hmset("member:{"+controllerId+"}:"+networkId+":"+memberId, record.begin(), record.end());
|
|
|
+ }
|
|
|
+
|
|
|
+ // expire records from all-nodes and network-nodes member list
|
|
|
+ uint64_t expireOld = OSUtils::now() - 300000;
|
|
|
+
|
|
|
+ tx.zremrangebyscore("nodes-online:{"+controllerId+"}", sw::redis::RightBoundedInterval<double>(expireOld, sw::redis::BoundType::LEFT_OPEN));
|
|
|
+ tx.zremrangebyscore("nodes-online2:{"+controllerId+"}", sw::redis::RightBoundedInterval<double>(expireOld, sw::redis::BoundType::LEFT_OPEN));
|
|
|
+ tx.zremrangebyscore("active-networks:{"+controllerId+"}", sw::redis::RightBoundedInterval<double>(expireOld, sw::redis::BoundType::LEFT_OPEN));
|
|
|
+ {
|
|
|
+ std::lock_guard<std::mutex> l(_networks_l);
|
|
|
+ for (const auto &it : _networks) {
|
|
|
+ uint64_t nwid_i = it.first;
|
|
|
+ char nwidTmp[64];
|
|
|
+ OSUtils::ztsnprintf(nwidTmp,sizeof(nwidTmp), "%.16llx", nwid_i);
|
|
|
+ tx.zremrangebyscore("network-nodes-online:{"+controllerId+"}:"+nwidTmp,
|
|
|
+ sw::redis::RightBoundedInterval<double>(expireOld, sw::redis::BoundType::LEFT_OPEN));
|
|
|
+ }
|
|
|
+ }
|
|
|
+ tx.exec();
|
|
|
+}
|
|
|
+
|
|
|
PGconn *PostgreSQL::getPgConn(OverrideMode m)
|
|
|
{
|
|
|
if (m == ALLOW_PGBOUNCER_OVERRIDE) {
|