Преглед на файлове

more redis in the controller

Grant Limberg преди 3 години
родител
ревизия
10212e376a
променени са 1 файла, в които са добавени 148 реда и са изтрити 1 реда
  1. 148 1
      controller/PostgreSQL.cpp

+ 148 - 1
controller/PostgreSQL.cpp

@@ -512,6 +512,18 @@ void PostgreSQL::initializeNetworks()
 		
 		fprintf(stderr, "Initializing Networks...\n");
 
+		if (_redisMemberStatus) {
+			try {
+				if (_rc->clusterMode) {
+					_cluster->del(setKey);
+				} else {
+					_redis->del(setKey);
+				}
+			} catch (...) {}
+		}
+
+		std::unordered_set<std::string> networkSet;
+
 		char qbuf[2048] = {0};
 		sprintf(qbuf, "SELECT n.id, (EXTRACT(EPOCH FROM n.creation_time AT TIME ZONE 'UTC')*1000)::bigint as creation_time, n.capabilities, "
 			"n.enable_broadcast, (EXTRACT(EPOCH FROM n.last_modified AT TIME ZONE 'UTC')*1000)::bigint AS last_modified, n.mtu, n.multicast_limit, n.name, n.private, n.remote_trace_level, "
@@ -616,6 +628,8 @@ void PostgreSQL::initializeNetworks()
 			config["clientId"] = clientId.value_or("");
 			config["authorizationEndpoint"] = authorizationEndpoint.value_or("");
 
+			networkSet.insert(nwid);
+
 			if (dnsDomain.has_value()) {
 				std::string serverList = dnsServers.value();
 				json obj;
@@ -680,6 +694,20 @@ void PostgreSQL::initializeNetworks()
 		_pool->unborrow(c2);
 		_pool->unborrow(c);
 
+		if (!networkSet.empty()) {
+			if (_redisMemberStatus) {
+				if (_rc->clusterMode) {
+					auto tx = _cluster->transaction(_myAddressStr, true);
+					tx.sadd(setKey, networkSet.begin(), networkSet.end());
+					tx.exec();
+				} else {
+					auto tx = _cluster->transaction(_myAddressStr, true);
+					tx.sadd(setKey, networkSet.begin(), networkSet.end());
+					tx.exec();
+				}
+			}
+		}
+
 		if (++this->_ready == 2) {
 			if (_waitNoticePrinted) {
 				fprintf(stderr,"[%s] NOTICE: %.10llx controller PostgreSQL data download complete." ZT_EOL_S,_timestr(),(unsigned long long)_myAddress.toInt());
@@ -703,6 +731,39 @@ void PostgreSQL::initializeMembers()
 		std::unordered_map<std::string, std::string> networkMembers;
 		fprintf(stderr, "Initializing Members...\n");
 
+		std::string setKeyBase = "network-nodes-all:{" + _myAddressStr + "}:";
+
+		if (_redisMemberStatus) {
+			std::lock_guard<std::mutex> l(_networks_l);
+			std::unordered_set<std::string> deletes;
+			for ( auto it : _networks) {
+				uint64_t nwid_i = it.first;
+				char nwidTmp[64] = {0};
+				OSUtils::ztsnprintf(nwidTmp, sizeof(nwidTmp), "%.16llx", nwid_i);
+				std::string nwid(nwidTmp);
+				std::string key = setKeyBase + nwid;
+				deletes.insert(key);
+			}
+
+			if (!deletes.empty()) {
+				try {
+					if (_rc->clusterMode) {
+						auto tx = _cluster->transaction(_myAddressStr, true);
+						for (std::string k : deletes) {
+							tx.del(k);
+						}
+						tx.exec();
+					} else {
+						auto tx = _redis->transaction(true);
+						for (std::string k : deletes) {
+							tx.del(k);
+						}
+						tx.exec();
+					}
+				} catch (...) {}
+			}
+		}
+
 		char qbuf[2048];
 		sprintf(qbuf, "SELECT m.id, m.network_id, m.active_bridge, m.authorized, m.capabilities, (EXTRACT(EPOCH FROM m.creation_time AT TIME ZONE 'UTC')*1000)::bigint, m.identity, "
 			"	(EXTRACT(EPOCH FROM m.last_authorized_time AT TIME ZONE 'UTC')*1000)::bigint, "
@@ -782,6 +843,8 @@ void PostgreSQL::initializeMembers()
 			std::optional<uint64_t> authenticationExpiryTime = std::get<19>(row);
 			std::string assignedAddresses = std::get<20>(row);
 
+			networkMembers.insert(std::pair<std::string, std::string>(setKeyBase+networkId, memberId));
+
 			config["id"] = memberId;
 			config["address"] = memberId;
 			config["nwid"] = networkId;
@@ -837,6 +900,24 @@ void PostgreSQL::initializeMembers()
 		_pool->unborrow(c2);
 		_pool->unborrow(c);
 
+		if (!networkMembers.empty()) {
+			if (_redisMemberStatus) {
+				if (_rc->clusterMode) {
+					auto tx = _cluster->transaction(_myAddressStr, true);
+					for (auto it : networkMembers) {
+						tx.sadd(it.first, it.second);
+					}
+					tx.exec();
+				} else {
+					auto tx = _redis->transaction(true);
+					for (auto it : networkMembers) {
+						tx.sadd(it.first, it.second);
+					}
+					tx.exec();
+				}
+			}
+		}
+
 		if (++this->_ready == 2) {
 			if (_waitNoticePrinted) {
 				fprintf(stderr,"[%s] NOTICE: %.10llx controller PostgreSQL data download complete." ZT_EOL_S,_timestr(),(unsigned long long)_myAddress.toInt());
@@ -905,6 +986,14 @@ void PostgreSQL::heartbeat()
 		}
 		_pool->unborrow(c);
 
+		if (_redisMemberStatus) {
+			if (_rc->clusterMode) {
+				_cluster->zadd("controllers", "controllerId", ts);
+			} else {
+				_redis->zadd("controllers", "controllerId", ts);
+			}
+		}
+
 		std::this_thread::sleep_for(std::chrono::milliseconds(1000));
 	}
 	fprintf(stderr, "Exited heartbeat thread\n");
@@ -1353,6 +1442,20 @@ void PostgreSQL::commitThread()
 				} catch (std::exception &e) {
 					fprintf(stderr, "%s ERROR: Error updating network: %s\n", _myAddressStr.c_str(), e.what());
 				}
+				if (_redisMemberStatus) {
+					try {
+						std::string id = config["id"];
+						std::string controllerId = _myAddressStr.c_str();
+						std::string key = "networks:{" + controllerId + "}";
+						if (_rc->clusterMode) {
+							_cluster->sadd(key, id);
+						} else {
+							_redis->sadd(key, id);
+						}
+					} catch (sw::redis::Error &e) {
+						fprintf(stderr, "ERROR: Error adding network to Redis: %s\n", e.what());
+					}
+				}
 			} else if (objtype == "_delete_network") {
 				// fprintf(stderr, "%s: commitThread: delete network\n", _myAddressStr.c_str());
 				try {
@@ -1367,6 +1470,22 @@ void PostgreSQL::commitThread()
 				} catch (std::exception &e) {
 					fprintf(stderr, "%s ERROR: Error deleting network: %s\n", _myAddressStr.c_str(), e.what());
 				}
+				if (_redisMemberStatus) {
+					try {
+						std::string id = config["id"];
+						std::string controllerId = _myAddressStr.c_str();
+						std::string key = "networks:{" + controllerId + "}";
+						if (_rc->clusterMode) {
+							_cluster->srem(key, id);
+							_cluster->del("network-nodes-online:{"+controllerId+"}:"+id);
+						} else {
+							_redis->srem(key, id);
+							_redis->del("network-nodes-online:{"+controllerId+"}:"+id);
+						}
+					} catch (sw::redis::Error &e) {
+						fprintf(stderr, "ERROR: Error adding network to Redis: %s\n", e.what());
+					}
+				}
 
 			} else if (objtype == "_delete_member") {
 				// fprintf(stderr, "%s commitThread: delete member\n", _myAddressStr.c_str());
@@ -1384,6 +1503,23 @@ void PostgreSQL::commitThread()
 				} catch (std::exception &e) {
 					fprintf(stderr, "%s ERROR: Error deleting member: %s\n", _myAddressStr.c_str(), e.what());
 				}
+				if (_redisMemberStatus) {
+					try {
+						std::string memberId = config["id"];
+						std::string networkId = config["nwid"];
+						std::string controllerId = _myAddressStr.c_str();
+						std::string key = "network-nodes-all:{" + controllerId + "}:" + networkId;
+						if (_rc->clusterMode) {
+							_cluster->srem(key, memberId);
+							_cluster->del("member:{"+controllerId+"}:"+networkId+":"+memberId);
+						} else {
+							_redis->srem(key, memberId);
+							_redis->del("member:{"+controllerId+"}:"+networkId+":"+memberId);
+						}
+					} catch (sw::redis::Error &e) {
+						fprintf(stderr, "ERROR: Error deleting member from Redis: %s\n", e.what());
+					}
+				}
 			} else {
 				fprintf(stderr, "%s ERROR: unknown objtype\n", _myAddressStr.c_str());
 			}
@@ -1408,6 +1544,15 @@ void PostgreSQL::onlineNotificationThread()
 	}
 }
 
+/**
+ * ONLY UNCOMMENT FOR TEMPORARY DB MAINTENANCE
+ *
+ * This define temproarly turns off writing to the member status table
+ * so it can be reindexed when the indexes get too large.
+ */
+
+// #define DISABLE_MEMBER_STATUS 1
+
 void PostgreSQL::onlineNotification_Postgres()
 {
 	_connected = 1;
@@ -1423,7 +1568,8 @@ void PostgreSQL::onlineNotification_Postgres()
 				std::lock_guard<std::mutex> l(_lastOnline_l);
 				lastOnline.swap(_lastOnline);
 			}
-			
+
+#ifndef DISABLE_MEMBER_STATUS
 			pqxx::work w(*c->c);
 			pqxx::work w2(*c2->c);
 
@@ -1482,6 +1628,7 @@ void PostgreSQL::onlineNotification_Postgres()
 			pipe.complete();
 			w.commit();
 			fprintf(stderr, "%s: Updated online status of %d members\n", _myAddressStr.c_str(), updateCount);
+#endif
 		} catch (std::exception &e) {
 			fprintf(stderr, "%s: error in onlinenotification thread: %s\n", _myAddressStr.c_str(), e.what());
 		}