|
@@ -4,7 +4,7 @@
|
|
|
* Use of this software is governed by the Business Source License included
|
|
|
* in the LICENSE.TXT file in the project's root directory.
|
|
|
*
|
|
|
- * Change Date: 2025-01-01
|
|
|
+ * Change Date: 2026-01-01
|
|
|
*
|
|
|
* On the date above, in accordance with the Business Source License, use
|
|
|
* of this software will be governed by version 2.0 of the Apache License.
|
|
@@ -21,6 +21,8 @@
|
|
|
#include "../version.h"
|
|
|
#include "Redis.hpp"
|
|
|
|
|
|
+#include <smeeclient.h>
|
|
|
+
|
|
|
#include <libpq-fe.h>
|
|
|
#include <sstream>
|
|
|
#include <iomanip>
|
|
@@ -159,6 +161,8 @@ using Attrs = std::vector<std::pair<std::string, std::string>>;
|
|
|
using Item = std::pair<std::string, Attrs>;
|
|
|
using ItemStream = std::vector<Item>;
|
|
|
|
|
|
+
|
|
|
+
|
|
|
PostgreSQL::PostgreSQL(const Identity &myId, const char *path, int listenPort, RedisConfig *rc)
|
|
|
: DB()
|
|
|
, _pool()
|
|
@@ -173,6 +177,7 @@ PostgreSQL::PostgreSQL(const Identity &myId, const char *path, int listenPort, R
|
|
|
, _redis(NULL)
|
|
|
, _cluster(NULL)
|
|
|
, _redisMemberStatus(false)
|
|
|
+ , _smee(NULL)
|
|
|
{
|
|
|
char myAddress[64];
|
|
|
_myAddressStr = myId.address().toString(myAddress);
|
|
@@ -248,10 +253,17 @@ PostgreSQL::PostgreSQL(const Identity &myId, const char *path, int listenPort, R
|
|
|
_commitThread[i] = std::thread(&PostgreSQL::commitThread, this);
|
|
|
}
|
|
|
_onlineNotificationThread = std::thread(&PostgreSQL::onlineNotificationThread, this);
|
|
|
+
|
|
|
+ configureSmee();
|
|
|
}
|
|
|
|
|
|
PostgreSQL::~PostgreSQL()
|
|
|
{
|
|
|
+ if (_smee != NULL) {
|
|
|
+ smeeclient::smee_client_delete(_smee);
|
|
|
+ _smee = NULL;
|
|
|
+ }
|
|
|
+
|
|
|
_run = 0;
|
|
|
std::this_thread::sleep_for(std::chrono::milliseconds(100));
|
|
|
|
|
@@ -265,6 +277,31 @@ PostgreSQL::~PostgreSQL()
|
|
|
_onlineNotificationThread.join();
|
|
|
}
|
|
|
|
|
|
+void PostgreSQL::configureSmee()
|
|
|
+{
|
|
|
+ const char *TEMPORAL_SCHEME = "ZT_TEMPORAL_SCHEME";
|
|
|
+ const char *TEMPORAL_HOST = "ZT_TEMPORAL_HOST";
|
|
|
+ const char *TEMPORAL_PORT = "ZT_TEMPORAL_PORT";
|
|
|
+ const char *TEMPORAL_NAMESPACE = "ZT_TEMPORAL_NAMESPACE";
|
|
|
+ const char *SMEE_TASK_QUEUE = "ZT_SMEE_TASK_QUEUE";
|
|
|
+
|
|
|
+ const char *scheme = getenv(TEMPORAL_SCHEME);
|
|
|
+ if (scheme == NULL) {
|
|
|
+ scheme = "http";
|
|
|
+ }
|
|
|
+ const char *host = getenv(TEMPORAL_HOST);
|
|
|
+ const char *port = getenv(TEMPORAL_PORT);
|
|
|
+ const char *ns = getenv(TEMPORAL_NAMESPACE);
|
|
|
+ const char *task_queue = getenv(SMEE_TASK_QUEUE);
|
|
|
+
|
|
|
+ if (scheme != NULL && host != NULL && port != NULL && ns != NULL && task_queue != NULL) {
|
|
|
+ fprintf(stderr, "creating smee client\n");
|
|
|
+ std::string hostPort = std::string(scheme) + std::string("://") + std::string(host) + std::string(":") + std::string(port);
|
|
|
+ this->_smee = smeeclient::smee_client_new(hostPort.c_str(), ns, task_queue);
|
|
|
+ } else {
|
|
|
+ fprintf(stderr, "Smee client not configured\n");
|
|
|
+ }
|
|
|
+}
|
|
|
|
|
|
bool PostgreSQL::waitForReady()
|
|
|
{
|
|
@@ -743,11 +780,25 @@ void PostgreSQL::initializeNetworks()
|
|
|
fprintf(stderr, "adding networks to redis...\n");
|
|
|
if (_rc->clusterMode) {
|
|
|
auto tx = _cluster->transaction(_myAddressStr, true, false);
|
|
|
- tx.sadd(setKey, networkSet.begin(), networkSet.end());
|
|
|
+ uint64_t count = 0;
|
|
|
+ for (std::string nwid : networkSet) {
|
|
|
+ tx.sadd(setKey, nwid);
|
|
|
+ if (++count % 30000 == 0) {
|
|
|
+ tx.exec();
|
|
|
+ tx = _cluster->transaction(_myAddressStr, true, false);
|
|
|
+ }
|
|
|
+ }
|
|
|
tx.exec();
|
|
|
} else {
|
|
|
auto tx = _redis->transaction(true, false);
|
|
|
- tx.sadd(setKey, networkSet.begin(), networkSet.end());
|
|
|
+ uint64_t count = 0;
|
|
|
+ for (std::string nwid : networkSet) {
|
|
|
+ tx.sadd(setKey, nwid);
|
|
|
+ if (++count % 30000 == 0) {
|
|
|
+ tx.exec();
|
|
|
+ tx = _redis->transaction(true, false);
|
|
|
+ }
|
|
|
+ }
|
|
|
tx.exec();
|
|
|
}
|
|
|
fprintf(stderr, "done.\n");
|
|
@@ -968,14 +1019,24 @@ void PostgreSQL::initializeMembers()
|
|
|
fprintf(stderr, "Load member data into redis...\n");
|
|
|
if (_rc->clusterMode) {
|
|
|
auto tx = _cluster->transaction(_myAddressStr, true, false);
|
|
|
+ uint64_t count = 0;
|
|
|
for (auto it : networkMembers) {
|
|
|
tx.sadd(it.first, it.second);
|
|
|
+ if (++count % 30000 == 0) {
|
|
|
+ tx.exec();
|
|
|
+ tx = _cluster->transaction(_myAddressStr, true, false);
|
|
|
+ }
|
|
|
}
|
|
|
tx.exec();
|
|
|
} else {
|
|
|
auto tx = _redis->transaction(true, false);
|
|
|
+ uint64_t count = 0;
|
|
|
for (auto it : networkMembers) {
|
|
|
tx.sadd(it.first, it.second);
|
|
|
+ if (++count % 30000 == 0) {
|
|
|
+ tx.exec();
|
|
|
+ tx = _redis->transaction(true, false);
|
|
|
+ }
|
|
|
}
|
|
|
tx.exec();
|
|
|
}
|
|
@@ -1143,7 +1204,7 @@ void PostgreSQL::_membersWatcher_Redis() {
|
|
|
_memberChanged(oldConfig,newConfig,(this->_ready >= 2));
|
|
|
}
|
|
|
} catch (...) {
|
|
|
- fprintf(stderr, "json parse error in networkWatcher_Redis\n");
|
|
|
+ fprintf(stderr, "json parse error in _membersWatcher_Redis: %s\n", a.second.c_str());
|
|
|
}
|
|
|
}
|
|
|
if (_rc->clusterMode) {
|
|
@@ -1232,8 +1293,8 @@ void PostgreSQL::_networksWatcher_Redis() {
|
|
|
if (oldConfig.is_object()||newConfig.is_object()) {
|
|
|
_networkChanged(oldConfig,newConfig,(this->_ready >= 2));
|
|
|
}
|
|
|
- } catch (...) {
|
|
|
- fprintf(stderr, "json parse error in networkWatcher_Redis\n");
|
|
|
+ } catch (std::exception &e) {
|
|
|
+ fprintf(stderr, "json parse error in networkWatcher_Redis: what: %s json: %s\n", e.what(), a.second.c_str());
|
|
|
}
|
|
|
}
|
|
|
if (_rc->clusterMode) {
|
|
@@ -1306,40 +1367,72 @@ void PostgreSQL::commitThread()
|
|
|
continue;
|
|
|
}
|
|
|
|
|
|
- pqxx::result res = w.exec_params0(
|
|
|
- "INSERT INTO ztc_member (id, network_id, active_bridge, authorized, capabilities, "
|
|
|
- "identity, last_authorized_time, last_deauthorized_time, no_auto_assign_ips, "
|
|
|
- "remote_trace_level, remote_trace_target, revision, tags, v_major, v_minor, v_rev, v_proto) "
|
|
|
- "VALUES ($1, $2, $3, $4, $5, $6, "
|
|
|
- "TO_TIMESTAMP($7::double precision/1000), TO_TIMESTAMP($8::double precision/1000), "
|
|
|
- "$9, $10, $11, $12, $13, $14, $15, $16, $17) ON CONFLICT (network_id, id) DO UPDATE SET "
|
|
|
- "active_bridge = EXCLUDED.active_bridge, authorized = EXCLUDED.authorized, capabilities = EXCLUDED.capabilities, "
|
|
|
- "identity = EXCLUDED.identity, last_authorized_time = EXCLUDED.last_authorized_time, "
|
|
|
- "last_deauthorized_time = EXCLUDED.last_deauthorized_time, no_auto_assign_ips = EXCLUDED.no_auto_assign_ips, "
|
|
|
- "remote_trace_level = EXCLUDED.remote_trace_level, remote_trace_target = EXCLUDED.remote_trace_target, "
|
|
|
- "revision = EXCLUDED.revision+1, tags = EXCLUDED.tags, v_major = EXCLUDED.v_major, "
|
|
|
- "v_minor = EXCLUDED.v_minor, v_rev = EXCLUDED.v_rev, v_proto = EXCLUDED.v_proto",
|
|
|
- memberId,
|
|
|
- networkId,
|
|
|
- (bool)config["activeBridge"],
|
|
|
- (bool)config["authorized"],
|
|
|
- OSUtils::jsonDump(config["capabilities"], -1),
|
|
|
- OSUtils::jsonString(config["identity"], ""),
|
|
|
- (uint64_t)config["lastAuthorizedTime"],
|
|
|
- (uint64_t)config["lastDeauthorizedTime"],
|
|
|
- (bool)config["noAutoAssignIps"],
|
|
|
- (int)config["remoteTraceLevel"],
|
|
|
- target,
|
|
|
- (uint64_t)config["revision"],
|
|
|
- OSUtils::jsonDump(config["tags"], -1),
|
|
|
- (int)config["vMajor"],
|
|
|
- (int)config["vMinor"],
|
|
|
- (int)config["vRev"],
|
|
|
- (int)config["vProto"]);
|
|
|
|
|
|
+ pqxx::row mrow = w.exec_params1("SELECT COUNT(id) FROM ztc_member WHERE id = $1 AND network_id = $2", memberId, networkId);
|
|
|
+ int membercount = mrow[0].as<int>();
|
|
|
+
|
|
|
+ bool isNewMember = false;
|
|
|
+ if (membercount == 0) {
|
|
|
+ // new member
|
|
|
+ isNewMember = true;
|
|
|
+ pqxx::result res = w.exec_params0(
|
|
|
+ "INSERT INTO ztc_member (id, network_id, active_bridge, authorized, capabilities, "
|
|
|
+ "identity, last_authorized_time, last_deauthorized_time, no_auto_assign_ips, "
|
|
|
+ "remote_trace_level, remote_trace_target, revision, tags, v_major, v_minor, v_rev, v_proto) "
|
|
|
+ "VALUES ($1, $2, $3, $4, $5, $6, "
|
|
|
+ "TO_TIMESTAMP($7::double precision/1000), TO_TIMESTAMP($8::double precision/1000), "
|
|
|
+ "$9, $10, $11, $12, $13, $14, $15, $16, $17)",
|
|
|
+ memberId,
|
|
|
+ networkId,
|
|
|
+ (bool)config["activeBridge"],
|
|
|
+ (bool)config["authorized"],
|
|
|
+ OSUtils::jsonDump(config["capabilities"], -1),
|
|
|
+ OSUtils::jsonString(config["identity"], ""),
|
|
|
+ (uint64_t)config["lastAuthorizedTime"],
|
|
|
+ (uint64_t)config["lastDeauthorizedTime"],
|
|
|
+ (bool)config["noAutoAssignIps"],
|
|
|
+ (int)config["remoteTraceLevel"],
|
|
|
+ target,
|
|
|
+ (uint64_t)config["revision"],
|
|
|
+ OSUtils::jsonDump(config["tags"], -1),
|
|
|
+ (int)config["vMajor"],
|
|
|
+ (int)config["vMinor"],
|
|
|
+ (int)config["vRev"],
|
|
|
+ (int)config["vProto"]);
|
|
|
+ } else {
|
|
|
+ // existing member
|
|
|
+ pqxx::result res = w.exec_params0(
|
|
|
+ "UPDATE ztc_member "
|
|
|
+ "SET active_bridge = $3, authorized = $4, capabilities = $5, identity = $6, "
|
|
|
+ "last_authorized_time = TO_TIMESTAMP($7::double precision/1000), "
|
|
|
+ "last_deauthorized_time = TO_TIMESTAMP($8::double precision/1000), "
|
|
|
+ "no_auto_assign_ips = $9, remote_trace_level = $10, remote_trace_target= $11, "
|
|
|
+ "revision = $12, tags = $13, v_major = $14, v_minor = $15, v_rev = $16, v_proto = $17 "
|
|
|
+ "WHERE id = $1 AND network_id = $2",
|
|
|
+ memberId,
|
|
|
+ networkId,
|
|
|
+ (bool)config["activeBridge"],
|
|
|
+ (bool)config["authorized"],
|
|
|
+ OSUtils::jsonDump(config["capabilities"], -1),
|
|
|
+ OSUtils::jsonString(config["identity"], ""),
|
|
|
+ (uint64_t)config["lastAuthorizedTime"],
|
|
|
+ (uint64_t)config["lastDeauthorizedTime"],
|
|
|
+ (bool)config["noAutoAssignIps"],
|
|
|
+ (int)config["remoteTraceLevel"],
|
|
|
+ target,
|
|
|
+ (uint64_t)config["revision"],
|
|
|
+ OSUtils::jsonDump(config["tags"], -1),
|
|
|
+ (int)config["vMajor"],
|
|
|
+ (int)config["vMinor"],
|
|
|
+ (int)config["vRev"],
|
|
|
+ (int)config["vProto"]
|
|
|
+ );
|
|
|
+ }
|
|
|
|
|
|
- res = w.exec_params0("DELETE FROM ztc_member_ip_assignment WHERE member_id = $1 AND network_id = $2",
|
|
|
- memberId, networkId);
|
|
|
+ if (!isNewMember) {
|
|
|
+ pqxx::result res = w.exec_params0("DELETE FROM ztc_member_ip_assignment WHERE member_id = $1 AND network_id = $2",
|
|
|
+ memberId, networkId);
|
|
|
+ }
|
|
|
|
|
|
std::vector<std::string> assignments;
|
|
|
bool ipAssignError = false;
|
|
@@ -1350,7 +1443,7 @@ void PostgreSQL::commitThread()
|
|
|
continue;
|
|
|
}
|
|
|
|
|
|
- res = w.exec_params0(
|
|
|
+ pqxx::result res = w.exec_params0(
|
|
|
"INSERT INTO ztc_member_ip_assignment (member_id, network_id, address) VALUES ($1, $2, $3) ON CONFLICT (network_id, member_id, address) DO NOTHING",
|
|
|
memberId, networkId, addr);
|
|
|
|
|
@@ -1366,6 +1459,24 @@ void PostgreSQL::commitThread()
|
|
|
|
|
|
w.commit();
|
|
|
|
|
|
+ if (_smee != NULL && isNewMember) {
|
|
|
+ pqxx::row row = w.exec_params1(
|
|
|
+ "SELECT "
|
|
|
+ " count(h.hook_id) "
|
|
|
+ "FROM "
|
|
|
+ " ztc_hook h "
|
|
|
+ " INNER JOIN ztc_org o ON o.org_id = h.org_id "
|
|
|
+ " INNER JOIN ztc_network n ON n.owner_id = o.owner_id "
|
|
|
+ " WHERE "
|
|
|
+ "n.id = $1 ",
|
|
|
+ networkId
|
|
|
+ );
|
|
|
+ int64_t hookCount = row[0].as<int64_t>();
|
|
|
+ if (hookCount > 0) {
|
|
|
+ notifyNewMember(networkId, memberId);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
const uint64_t nwidInt = OSUtils::jsonIntHex(config["nwid"], 0ULL);
|
|
|
const uint64_t memberidInt = OSUtils::jsonIntHex(config["id"], 0ULL);
|
|
|
if (nwidInt && memberidInt) {
|
|
@@ -1609,6 +1720,13 @@ void PostgreSQL::commitThread()
|
|
|
fprintf(stderr, "%s commitThread finished\n", _myAddressStr.c_str());
|
|
|
}
|
|
|
|
|
|
+void PostgreSQL::notifyNewMember(const std::string &networkID, const std::string &memberID) {
|
|
|
+ smeeclient::smee_client_notify_network_joined(
|
|
|
+ _smee,
|
|
|
+ networkID.c_str(),
|
|
|
+ memberID.c_str());
|
|
|
+}
|
|
|
+
|
|
|
void PostgreSQL::onlineNotificationThread()
|
|
|
{
|
|
|
waitForReady();
|