|
@@ -21,6 +21,7 @@
|
|
|
#include "CtlUtil.hpp"
|
|
|
#include "EmbeddedNetworkController.hpp"
|
|
|
#include "Redis.hpp"
|
|
|
+#include "opentelemetry/trace/provider.h"
|
|
|
|
|
|
#include <chrono>
|
|
|
#include <climits>
|
|
@@ -61,6 +62,11 @@ CV1::CV1(const Identity& myId, const char* path, int listenPort, RedisConfig* rc
|
|
|
, _redisMemberStatus(false)
|
|
|
, _smee(NULL)
|
|
|
{
|
|
|
+ auto provider = opentelemetry::trace::Provider::GetTracerProvider();
|
|
|
+ auto tracer = provider->GetTracer("cv1");
|
|
|
+ auto span = tracer->StartSpan("cv1::CV1");
|
|
|
+ auto scope = tracer->WithActiveSpan(span);
|
|
|
+
|
|
|
char myAddress[64];
|
|
|
_myAddressStr = myId.address().toString(myAddress);
|
|
|
_connString = std::string(path);
|
|
@@ -98,6 +104,9 @@ CV1::CV1(const Identity& myId, const char* path, int listenPort, RedisConfig* rc
|
|
|
_pool->unborrow(c);
|
|
|
|
|
|
if (_rc != NULL) {
|
|
|
+ auto innerspan = tracer->StartSpan("cv1::CV1::configureRedis");
|
|
|
+ auto innerscope = tracer->WithActiveSpan(innerspan);
|
|
|
+
|
|
|
sw::redis::ConnectionOptions opts;
|
|
|
sw::redis::ConnectionPoolOptions poolOpts;
|
|
|
opts.host = _rc->hostname;
|
|
@@ -111,10 +120,12 @@ CV1::CV1(const Identity& myId, const char* path, int listenPort, RedisConfig* rc
|
|
|
poolOpts.connection_lifetime = std::chrono::minutes(3);
|
|
|
poolOpts.connection_idle_time = std::chrono::minutes(1);
|
|
|
if (_rc->clusterMode) {
|
|
|
+ innerspan->SetAttribute("cluster_mode", "true");
|
|
|
fprintf(stderr, "Using Redis in Cluster Mode\n");
|
|
|
_cluster = std::make_shared<sw::redis::RedisCluster>(opts, poolOpts);
|
|
|
}
|
|
|
else {
|
|
|
+ innerspan->SetAttribute("cluster_mode", "false");
|
|
|
fprintf(stderr, "Using Redis in Standalone Mode\n");
|
|
|
_redis = std::make_shared<sw::redis::Redis>(opts, poolOpts);
|
|
|
}
|
|
@@ -161,6 +172,11 @@ CV1::~CV1()
|
|
|
|
|
|
void CV1::configureSmee()
|
|
|
{
|
|
|
+ auto provider = opentelemetry::trace::Provider::GetTracerProvider();
|
|
|
+ auto tracer = provider->GetTracer("cv1");
|
|
|
+ auto span = tracer->StartSpan("cv1::configureSmee");
|
|
|
+ auto scope = tracer->WithActiveSpan(span);
|
|
|
+
|
|
|
const char* TEMPORAL_SCHEME = "ZT_TEMPORAL_SCHEME";
|
|
|
const char* TEMPORAL_HOST = "ZT_TEMPORAL_HOST";
|
|
|
const char* TEMPORAL_PORT = "ZT_TEMPORAL_PORT";
|
|
@@ -202,6 +218,11 @@ bool CV1::isReady()
|
|
|
|
|
|
bool CV1::save(nlohmann::json& record, bool notifyListeners)
|
|
|
{
|
|
|
+ auto provider = opentelemetry::trace::Provider::GetTracerProvider();
|
|
|
+ auto tracer = provider->GetTracer("cv1");
|
|
|
+ auto span = tracer->StartSpan("cv1::save");
|
|
|
+ auto scope = tracer->WithActiveSpan(span);
|
|
|
+
|
|
|
bool modified = false;
|
|
|
try {
|
|
|
if (! record.is_object()) {
|
|
@@ -257,6 +278,13 @@ bool CV1::save(nlohmann::json& record, bool notifyListeners)
|
|
|
|
|
|
void CV1::eraseNetwork(const uint64_t networkId)
|
|
|
{
|
|
|
+ auto provider = opentelemetry::trace::Provider::GetTracerProvider();
|
|
|
+ auto tracer = provider->GetTracer("cv1");
|
|
|
+ auto span = tracer->StartSpan("cv1::eraseNetwork");
|
|
|
+ auto scope = tracer->WithActiveSpan(span);
|
|
|
+ char networkIdStr[17];
|
|
|
+ span->SetAttribute("network_id", Utils::hex(networkId, networkIdStr));
|
|
|
+
|
|
|
fprintf(stderr, "PostgreSQL::eraseNetwork\n");
|
|
|
char tmp2[24];
|
|
|
waitForReady();
|
|
@@ -272,6 +300,15 @@ void CV1::eraseNetwork(const uint64_t networkId)
|
|
|
|
|
|
void CV1::eraseMember(const uint64_t networkId, const uint64_t memberId)
|
|
|
{
|
|
|
+ auto provider = opentelemetry::trace::Provider::GetTracerProvider();
|
|
|
+ auto tracer = provider->GetTracer("cv1");
|
|
|
+ auto span = tracer->StartSpan("cv1::eraseMember");
|
|
|
+ auto scope = tracer->WithActiveSpan(span);
|
|
|
+ char networkIdStr[17];
|
|
|
+ char memberIdStr[11];
|
|
|
+ span->SetAttribute("network_id", Utils::hex(networkId, networkIdStr));
|
|
|
+ span->SetAttribute("member_id", Utils::hex10(memberId, memberIdStr));
|
|
|
+
|
|
|
fprintf(stderr, "PostgreSQL::eraseMember\n");
|
|
|
char tmp2[24];
|
|
|
waitForReady();
|
|
@@ -289,6 +326,18 @@ void CV1::eraseMember(const uint64_t networkId, const uint64_t memberId)
|
|
|
|
|
|
void CV1::nodeIsOnline(const uint64_t networkId, const uint64_t memberId, const InetAddress& physicalAddress, const char* osArch)
|
|
|
{
|
|
|
+ auto provider = opentelemetry::trace::Provider::GetTracerProvider();
|
|
|
+ auto tracer = provider->GetTracer("cv1");
|
|
|
+ auto span = tracer->StartSpan("cv1::nodeIsOnline");
|
|
|
+ auto scope = tracer->WithActiveSpan(span);
|
|
|
+ char networkIdStr[17];
|
|
|
+ char memberIdStr[11];
|
|
|
+ char ipStr[INET6_ADDRSTRLEN];
|
|
|
+ span->SetAttribute("network_id", Utils::hex(networkId, networkIdStr));
|
|
|
+ span->SetAttribute("member_id", Utils::hex10(memberId, memberIdStr));
|
|
|
+ span->SetAttribute("physical_address", physicalAddress.toString(ipStr));
|
|
|
+ span->SetAttribute("os_arch", osArch);
|
|
|
+
|
|
|
std::lock_guard<std::mutex> l(_lastOnline_l);
|
|
|
NodeOnlineRecord& i = _lastOnline[std::pair<uint64_t, uint64_t>(networkId, memberId)];
|
|
|
i.lastSeen = OSUtils::now();
|
|
@@ -305,6 +354,11 @@ void CV1::nodeIsOnline(const uint64_t networkId, const uint64_t memberId, const
|
|
|
|
|
|
AuthInfo CV1::getSSOAuthInfo(const nlohmann::json& member, const std::string& redirectURL)
|
|
|
{
|
|
|
+ auto provider = opentelemetry::trace::Provider::GetTracerProvider();
|
|
|
+ auto tracer = provider->GetTracer("cv1");
|
|
|
+ auto span = tracer->StartSpan("cv1::getSSOAuthInfo");
|
|
|
+ auto scope = tracer->WithActiveSpan(span);
|
|
|
+
|
|
|
Metrics::db_get_sso_info++;
|
|
|
// NONCE is just a random character string. no semantic meaning
|
|
|
// state = HMAC SHA384 of Nonce based on shared sso key
|
|
@@ -484,6 +538,11 @@ AuthInfo CV1::getSSOAuthInfo(const nlohmann::json& member, const std::string& re
|
|
|
|
|
|
void CV1::initializeNetworks()
|
|
|
{
|
|
|
+ auto provider = opentelemetry::trace::Provider::GetTracerProvider();
|
|
|
+ auto tracer = provider->GetTracer("cv1");
|
|
|
+ auto span = tracer->StartSpan("cv1::initializeNetworks");
|
|
|
+ auto scope = tracer->WithActiveSpan(span);
|
|
|
+
|
|
|
try {
|
|
|
std::string setKey = "networks:{" + _myAddressStr + "}";
|
|
|
|
|
@@ -769,6 +828,11 @@ void CV1::initializeNetworks()
|
|
|
|
|
|
void CV1::initializeMembers()
|
|
|
{
|
|
|
+ auto provider = opentelemetry::trace::Provider::GetTracerProvider();
|
|
|
+ auto tracer = provider->GetTracer("cv1");
|
|
|
+ auto span = tracer->StartSpan("cv1::initializeMembers");
|
|
|
+ auto scope = tracer->WithActiveSpan(span);
|
|
|
+
|
|
|
std::string memberId;
|
|
|
std::string networkId;
|
|
|
try {
|
|
@@ -1054,6 +1118,11 @@ void CV1::heartbeat()
|
|
|
const char* hostname = hostnameTmp;
|
|
|
|
|
|
while (_run == 1) {
|
|
|
+ auto provider = opentelemetry::trace::Provider::GetTracerProvider();
|
|
|
+ auto tracer = provider->GetTracer("cv1");
|
|
|
+ auto span = tracer->StartSpan("cv1::heartbeat");
|
|
|
+ auto scope = tracer->WithActiveSpan(span);
|
|
|
+
|
|
|
// fprintf(stderr, "%s: heartbeat\n", controllerId);
|
|
|
auto c = _pool->borrow();
|
|
|
int64_t ts = OSUtils::now();
|
|
@@ -1136,6 +1205,11 @@ void CV1::_membersWatcher_Postgres()
|
|
|
MemberNotificationReceiver m(this, *c->c, stream);
|
|
|
|
|
|
while (_run == 1) {
|
|
|
+ auto provider = opentelemetry::trace::Provider::GetTracerProvider();
|
|
|
+ auto tracer = provider->GetTracer("cv1");
|
|
|
+ auto span = tracer->StartSpan("cv1::_membersWatcher_Postgres");
|
|
|
+ auto scope = tracer->WithActiveSpan(span);
|
|
|
+
|
|
|
c->c->await_notification(5, 0);
|
|
|
}
|
|
|
|
|
@@ -1149,6 +1223,11 @@ void CV1::_membersWatcher_Redis()
|
|
|
std::string lastID = "0";
|
|
|
fprintf(stderr, "Listening to member stream: %s\n", key.c_str());
|
|
|
while (_run == 1) {
|
|
|
+ auto provider = opentelemetry::trace::Provider::GetTracerProvider();
|
|
|
+ auto tracer = provider->GetTracer("cv1");
|
|
|
+ auto span = tracer->StartSpan("cv1::_membersWatcher_Redis");
|
|
|
+ auto scope = tracer->WithActiveSpan(span);
|
|
|
+
|
|
|
try {
|
|
|
json tmp;
|
|
|
std::unordered_map<std::string, ItemStream> result;
|
|
@@ -1237,6 +1316,11 @@ void CV1::_networksWatcher_Postgres()
|
|
|
NetworkNotificationReceiver n(this, *c->c, stream);
|
|
|
|
|
|
while (_run == 1) {
|
|
|
+ auto provider = opentelemetry::trace::Provider::GetTracerProvider();
|
|
|
+ auto tracer = provider->GetTracer("cv1");
|
|
|
+ auto span = tracer->StartSpan("cv1::_networksWatcher_Postgres");
|
|
|
+ auto scope = tracer->WithActiveSpan(span);
|
|
|
+
|
|
|
c->c->await_notification(5, 0);
|
|
|
}
|
|
|
}
|
|
@@ -1247,6 +1331,11 @@ void CV1::_networksWatcher_Redis()
|
|
|
std::string key = "network-stream:{" + std::string(_myAddress.toString(buf)) + "}";
|
|
|
std::string lastID = "0";
|
|
|
while (_run == 1) {
|
|
|
+ auto provider = opentelemetry::trace::Provider::GetTracerProvider();
|
|
|
+ auto tracer = provider->GetTracer("cv1");
|
|
|
+ auto span = tracer->StartSpan("cv1::_networksWatcher_Redis");
|
|
|
+ auto scope = tracer->WithActiveSpan(span);
|
|
|
+
|
|
|
try {
|
|
|
json tmp;
|
|
|
std::unordered_map<std::string, ItemStream> result;
|
|
@@ -1314,6 +1403,11 @@ void CV1::commitThread()
|
|
|
fprintf(stderr, "%s: commitThread start\n", _myAddressStr.c_str());
|
|
|
std::pair<nlohmann::json, bool> qitem;
|
|
|
while (_commitQueue.get(qitem) & (_run == 1)) {
|
|
|
+ auto provider = opentelemetry::trace::Provider::GetTracerProvider();
|
|
|
+ auto tracer = provider->GetTracer("cv1");
|
|
|
+ auto span = tracer->StartSpan("cv1::commitThread");
|
|
|
+ auto scope = tracer->WithActiveSpan(span);
|
|
|
+
|
|
|
// fprintf(stderr, "commitThread tick\n");
|
|
|
if (! qitem.first.is_object()) {
|
|
|
fprintf(stderr, "not an object\n");
|
|
@@ -1339,6 +1433,9 @@ void CV1::commitThread()
|
|
|
nlohmann::json& config = (qitem.first);
|
|
|
const std::string objtype = config["objtype"];
|
|
|
if (objtype == "member") {
|
|
|
+ auto mspan = tracer->StartSpan("cv1::commitThread::member");
|
|
|
+ auto mscope = tracer->WithActiveSpan(mspan);
|
|
|
+
|
|
|
// fprintf(stderr, "%s: commitThread: member\n", _myAddressStr.c_str());
|
|
|
std::string memberId;
|
|
|
std::string networkId;
|
|
@@ -1489,6 +1586,9 @@ void CV1::commitThread()
|
|
|
}
|
|
|
}
|
|
|
else if (objtype == "network") {
|
|
|
+ auto nspan = tracer->StartSpan("cv1::commitThread::network");
|
|
|
+ auto nscope = tracer->WithActiveSpan(nspan);
|
|
|
+
|
|
|
try {
|
|
|
// fprintf(stderr, "%s: commitThread: network\n", _myAddressStr.c_str());
|
|
|
pqxx::work w(*c->c);
|
|
@@ -1644,6 +1744,9 @@ void CV1::commitThread()
|
|
|
}
|
|
|
}
|
|
|
else if (objtype == "_delete_network") {
|
|
|
+ auto dspan = tracer->StartSpan("cv1::commitThread::_delete_network");
|
|
|
+ auto dscope = tracer->WithActiveSpan(dspan);
|
|
|
+
|
|
|
// fprintf(stderr, "%s: commitThread: delete network\n", _myAddressStr.c_str());
|
|
|
try {
|
|
|
pqxx::work w(*c->c);
|
|
@@ -1677,6 +1780,9 @@ void CV1::commitThread()
|
|
|
}
|
|
|
}
|
|
|
else if (objtype == "_delete_member") {
|
|
|
+ auto mspan = tracer->StartSpan("cv1::commitThread::_delete_member");
|
|
|
+ auto mscope = tracer->WithActiveSpan(mspan);
|
|
|
+
|
|
|
// fprintf(stderr, "%s commitThread: delete member\n", _myAddressStr.c_str());
|
|
|
try {
|
|
|
pqxx::work w(*c->c);
|
|
@@ -1727,6 +1833,11 @@ void CV1::commitThread()
|
|
|
|
|
|
void CV1::notifyNewMember(const std::string& networkID, const std::string& memberID)
|
|
|
{
|
|
|
+ auto provider = opentelemetry::trace::Provider::GetTracerProvider();
|
|
|
+ auto tracer = provider->GetTracer("cv1");
|
|
|
+ auto span = tracer->StartSpan("cv1::notifyNewMember");
|
|
|
+ auto scope = tracer->WithActiveSpan(span);
|
|
|
+
|
|
|
smeeclient::smee_client_notify_network_joined(_smee, networkID.c_str(), memberID.c_str());
|
|
|
}
|
|
|
|
|
@@ -1756,6 +1867,11 @@ void CV1::onlineNotification_Postgres()
|
|
|
|
|
|
nlohmann::json jtmp1, jtmp2;
|
|
|
while (_run == 1) {
|
|
|
+ auto provider = opentelemetry::trace::Provider::GetTracerProvider();
|
|
|
+ auto tracer = provider->GetTracer("cv1");
|
|
|
+ auto span = tracer->StartSpan("cv1::onlineNotification_Postgres");
|
|
|
+ auto scope = tracer->WithActiveSpan(span);
|
|
|
+
|
|
|
auto c = _pool->borrow();
|
|
|
auto c2 = _pool->borrow();
|
|
|
try {
|
|
@@ -1856,6 +1972,11 @@ void CV1::onlineNotification_Redis()
|
|
|
std::string controllerId = std::string(_myAddress.toString(buf));
|
|
|
|
|
|
while (_run == 1) {
|
|
|
+ auto provider = opentelemetry::trace::Provider::GetTracerProvider();
|
|
|
+ auto tracer = provider->GetTracer("cv1");
|
|
|
+ auto span = tracer->StartSpan("cv1::onlineNotification_Redis");
|
|
|
+ auto scope = tracer->WithActiveSpan(span);
|
|
|
+
|
|
|
fprintf(stderr, "onlineNotification tick\n");
|
|
|
auto start = std::chrono::high_resolution_clock::now();
|
|
|
uint64_t count = 0;
|
|
@@ -1892,8 +2013,12 @@ void CV1::onlineNotification_Redis()
|
|
|
}
|
|
|
|
|
|
uint64_t CV1::_doRedisUpdate(sw::redis::Transaction& tx, std::string& controllerId, std::unordered_map<std::pair<uint64_t, uint64_t>, NodeOnlineRecord, _PairHasher>& lastOnline)
|
|
|
-
|
|
|
{
|
|
|
+ auto provider = opentelemetry::trace::Provider::GetTracerProvider();
|
|
|
+ auto tracer = provider->GetTracer("cv1");
|
|
|
+ auto span = tracer->StartSpan("cv1::_doRedisUpdate");
|
|
|
+ auto scope = tracer->WithActiveSpan(span);
|
|
|
+
|
|
|
nlohmann::json jtmp1, jtmp2;
|
|
|
uint64_t count = 0;
|
|
|
for (auto i = lastOnline.begin(); i != lastOnline.end(); ++i) {
|