Browse Source

Finalize Redis integration

Grant Limberg 5 years ago
parent
commit
879ef58565
1 changed files with 131 additions and 21 deletions
  1. 131 21
      controller/PostgreSQL.cpp

+ 131 - 21
controller/PostgreSQL.cpp

@@ -113,11 +113,8 @@ PostgreSQL::PostgreSQL(const Identity &myId, const char *path, int listenPort, R
 		fprintf(stderr, "Central database schema version too low.  This controller version requires a minimum schema version of %d. Please upgrade your Central instance", DB_MINIMUM_VERSION);
 		fprintf(stderr, "Central database schema version too low.  This controller version requires a minimum schema version of %d. Please upgrade your Central instance", DB_MINIMUM_VERSION);
 		exit(1);
 		exit(1);
 	}
 	}
-
 	PQclear(res);
 	PQclear(res);
 	res = NULL;
 	res = NULL;
-	PQfinish(conn);
-	conn = NULL;
 
 
 	if (_rc != NULL) {
 	if (_rc != NULL) {
 		sw::redis::ConnectionOptions opts;
 		sw::redis::ConnectionOptions opts;
@@ -137,6 +134,16 @@ PostgreSQL::PostgreSQL(const Identity &myId, const char *path, int listenPort, R
 	}
 	}
 
 
 	_readyLock.lock();
 	_readyLock.lock();
+	
+	fprintf(stderr, "[%s] NOTICE: %.10llx controller PostgreSQL waiting for initial data download..." ZT_EOL_S, ::_timestr(), (unsigned long long)_myAddress.toInt());
+	_waitNoticePrinted = true;
+
+	initializeNetworks(conn);
+	initializeMembers(conn);
+
+	PQfinish(conn);
+	conn = NULL;
+
 	_heartbeatThread = std::thread(&PostgreSQL::heartbeat, this);
 	_heartbeatThread = std::thread(&PostgreSQL::heartbeat, this);
 	_membersDbWatcher = std::thread(&PostgreSQL::membersDbWatcher, this);
 	_membersDbWatcher = std::thread(&PostgreSQL::membersDbWatcher, this);
 	_networksDbWatcher = std::thread(&PostgreSQL::networksDbWatcher, this);
 	_networksDbWatcher = std::thread(&PostgreSQL::networksDbWatcher, this);
@@ -165,10 +172,6 @@ PostgreSQL::~PostgreSQL()
 bool PostgreSQL::waitForReady()
 bool PostgreSQL::waitForReady()
 {
 {
 	while (_ready < 2) {
 	while (_ready < 2) {
-		if (!_waitNoticePrinted) {
-			_waitNoticePrinted = true;
-			fprintf(stderr, "[%s] NOTICE: %.10llx controller PostgreSQL waiting for initial data download..." ZT_EOL_S, ::_timestr(), (unsigned long long)_myAddress.toInt());
-		}
 		_readyLock.lock();
 		_readyLock.lock();
 		_readyLock.unlock();
 		_readyLock.unlock();
 	}
 	}
@@ -236,6 +239,7 @@ void PostgreSQL::eraseNetwork(const uint64_t networkId)
 void PostgreSQL::eraseMember(const uint64_t networkId, const uint64_t memberId)
 void PostgreSQL::eraseMember(const uint64_t networkId, const uint64_t memberId)
 {
 {
 	char tmp2[24];
 	char tmp2[24];
+	waitForReady();
 	std::pair<nlohmann::json,bool> tmp, nw;
 	std::pair<nlohmann::json,bool> tmp, nw;
 	Utils::hex(networkId, tmp2);
 	Utils::hex(networkId, tmp2);
 	tmp.first["nwid"] = tmp2;
 	tmp.first["nwid"] = tmp2;
@@ -265,11 +269,28 @@ void PostgreSQL::initializeNetworks(PGconn *conn)
 			fprintf(stderr, "Bad Database Connection: %s", PQerrorMessage(conn));
 			fprintf(stderr, "Bad Database Connection: %s", PQerrorMessage(conn));
 			exit(1);
 			exit(1);
 		}
 		}
+		
+		std::string setKey = "networks:{" + std::string(_myAddressStr.c_str()) + "}";
 
 
+		if (_rc != NULL) {
+			try {
+				if (_rc->clusterMode) {
+					_cluster->del(setKey);
+				} else {
+					_redis->del(setKey);
+				}
+			} catch (sw::redis::Error &e) {
+				// del can throw an error if the key doesn't exist
+				// swallow it and move along
+			}
+		}
+		
 		const char *params[1] = {
 		const char *params[1] = {
 			_myAddressStr.c_str()
 			_myAddressStr.c_str()
 		};
 		};
 
 
+		fprintf(stderr, "Initializing Networks...\n");
+
 		PGresult *res = PQexecParams(conn, "SELECT id, EXTRACT(EPOCH FROM creation_time AT TIME ZONE 'UTC')*1000, capabilities, "
 		PGresult *res = PQexecParams(conn, "SELECT id, EXTRACT(EPOCH FROM creation_time AT TIME ZONE 'UTC')*1000, capabilities, "
 			"enable_broadcast, EXTRACT(EPOCH FROM last_modified AT TIME ZONE 'UTC')*1000, mtu, multicast_limit, name, private, remote_trace_level, "
 			"enable_broadcast, EXTRACT(EPOCH FROM last_modified AT TIME ZONE 'UTC')*1000, mtu, multicast_limit, name, private, remote_trace_level, "
 			"remote_trace_target, revision, rules, tags, v4_assign_mode, v6_assign_mode FROM ztc_network "
 			"remote_trace_target, revision, rules, tags, v4_assign_mode, v6_assign_mode FROM ztc_network "
@@ -295,9 +316,18 @@ void PostgreSQL::initializeNetworks(PGconn *conn)
 			const char *nwidparam[1] = {
 			const char *nwidparam[1] = {
 				PQgetvalue(res, i, 0)
 				PQgetvalue(res, i, 0)
 			};
 			};
+			std::string nwid = PQgetvalue(res, i, 0);
+			
+			if (_rc != NULL) {
+				if (_rc->clusterMode) {
+					_cluster->sadd(setKey, nwid);
+				} else {
+					_redis->sadd(setKey, nwid);
+				}
+			}
 
 
-			config["id"] = PQgetvalue(res, i, 0);
-			config["nwid"] = PQgetvalue(res, i, 0);
+			config["id"] = nwid;
+			config["nwid"] = nwid;
 			try {
 			try {
 				config["creationTime"] = std::stoull(PQgetvalue(res, i, 1));
 				config["creationTime"] = std::stoull(PQgetvalue(res, i, 1));
 			} catch (std::exception &e) {
 			} catch (std::exception &e) {
@@ -416,8 +446,11 @@ void PostgreSQL::initializeNetworks(PGconn *conn)
 			}
 			}
 			_readyLock.unlock();
 			_readyLock.unlock();
 		}
 		}
+	} catch (sw::redis::Error &e) {
+		fprintf(stderr, "ERROR: Error initializing networks in Redis: %s\n", e.what());
+		exit(-1);
 	} catch (std::exception &e) {
 	} catch (std::exception &e) {
-		fprintf(stderr, "ERROR: Error initializing networks: %s", e.what());
+		fprintf(stderr, "ERROR: Error initializing networks: %s\n", e.what());
 		exit(-1);
 		exit(-1);
 	}
 	}
 }
 }
@@ -429,11 +462,32 @@ void PostgreSQL::initializeMembers(PGconn *conn)
 			fprintf(stderr, "Bad Database Connection: %s", PQerrorMessage(conn));
 			fprintf(stderr, "Bad Database Connection: %s", PQerrorMessage(conn));
 			exit(1);
 			exit(1);
 		}
 		}
-
+		std::string setKeyBase = "network-nodes-all:{" + std::string(_myAddressStr.c_str()) + "}:";
+		if (_rc != NULL) {
+			std::lock_guard<std::mutex> l(_networks_l);
+			for ( auto it : _networks) {
+				uint64_t nwid_i = it.first;
+				char nwidTmp[64] = {0};
+				OSUtils::ztsnprintf(nwidTmp, sizeof(nwidTmp), "%.16llx", nwid_i);
+				std::string nwid(nwidTmp);
+				std::string key = setKeyBase + nwid;
+				if (_rc->clusterMode) {
+					try {
+						_cluster->del(key);
+					} catch (...) {}
+				} else {
+					try {
+						_redis->del(key);
+					} catch (...) {}
+				}
+			}
+		}
+		
 		const char *params[1] = {
 		const char *params[1] = {
 			_myAddressStr.c_str()
 			_myAddressStr.c_str()
 		};
 		};
 
 
+		fprintf(stderr, "Initializing Members...\n");
 		PGresult *res = PQexecParams(conn,
 		PGresult *res = PQexecParams(conn,
 			"SELECT m.id, m.network_id, m.active_bridge, m.authorized, m.capabilities, EXTRACT(EPOCH FROM m.creation_time AT TIME ZONE 'UTC')*1000, m.identity, "
 			"SELECT m.id, m.network_id, m.active_bridge, m.authorized, m.capabilities, EXTRACT(EPOCH FROM m.creation_time AT TIME ZONE 'UTC')*1000, m.identity, "
 			"	EXTRACT(EPOCH FROM m.last_authorized_time AT TIME ZONE 'UTC')*1000, "
 			"	EXTRACT(EPOCH FROM m.last_authorized_time AT TIME ZONE 'UTC')*1000, "
@@ -464,6 +518,15 @@ void PostgreSQL::initializeMembers(PGconn *conn)
 
 
 			std::string memberId(PQgetvalue(res, i, 0));
 			std::string memberId(PQgetvalue(res, i, 0));
 			std::string networkId(PQgetvalue(res, i, 1));
 			std::string networkId(PQgetvalue(res, i, 1));
+
+			if (_rc != NULL) {
+				if (_rc->clusterMode) {
+					_cluster->sadd(setKeyBase + networkId, memberId);
+				} else {
+					_redis->sadd(setKeyBase + networkId, memberId);
+				}
+			}
+
 			std::string ctime = PQgetvalue(res, i, 5);
 			std::string ctime = PQgetvalue(res, i, 5);
 			config["id"] = memberId;
 			config["id"] = memberId;
 			config["nwid"] = networkId;
 			config["nwid"] = networkId;
@@ -564,6 +627,14 @@ void PostgreSQL::initializeMembers(PGconn *conn)
 				config["ipAssignments"].push_back(ipaddr);
 				config["ipAssignments"].push_back(ipaddr);
 			}
 			}
 
 
+			if (_rc != NULL) {
+				if (_rc->clusterMode) {
+					_cluster->sadd(setKeyBase + networkId, memberId);
+				} else {
+					_redis->sadd(setKeyBase + networkId, memberId);
+				}
+			}
+
 			_memberChanged(empty, config, false);
 			_memberChanged(empty, config, false);
 		}
 		}
 
 
@@ -575,6 +646,8 @@ void PostgreSQL::initializeMembers(PGconn *conn)
 			}
 			}
 			_readyLock.unlock();
 			_readyLock.unlock();
 		}
 		}
+	} catch (sw::redis::Error &e) {
+		fprintf(stderr, "ERROR: Error initializing members (redis): %s\n", e.what());
 	} catch (std::exception &e) {
 	} catch (std::exception &e) {
 		fprintf(stderr, "ERROR: Error initializing members: %s\n", e.what());
 		fprintf(stderr, "ERROR: Error initializing members: %s\n", e.what());
 		exit(-1);
 		exit(-1);
@@ -670,8 +743,6 @@ void PostgreSQL::membersDbWatcher()
 		exit(1);
 		exit(1);
 	}
 	}
 
 
-	initializeMembers(conn);
-
 	if (_rc) {
 	if (_rc) {
 		PQfinish(conn);
 		PQfinish(conn);
 		conn = NULL;
 		conn = NULL;
@@ -797,8 +868,6 @@ void PostgreSQL::networksDbWatcher()
 		exit(1);
 		exit(1);
 	}
 	}
 
 
-	initializeNetworks(conn);
-
 	if (_rc) {
 	if (_rc) {
 		PQfinish(conn);
 		PQfinish(conn);
 		conn = NULL;
 		conn = NULL;
@@ -1344,6 +1413,20 @@ void PostgreSQL::commitThread()
 				} catch (std::exception &e) {
 				} catch (std::exception &e) {
 					fprintf(stderr, "ERROR: Error updating member: %s\n", e.what());
 					fprintf(stderr, "ERROR: Error updating member: %s\n", e.what());
 				}
 				}
+				if (_rc != NULL) {
+					try {
+						std::string id = (*config)["id"];
+						std::string controllerId = _myAddressStr.c_str();
+						std::string key = "networks:{" + controllerId + "}";
+						if (_rc->clusterMode) {
+							_cluster->sadd(key, id);
+						} else {
+							_redis->sadd(key, id);
+						}
+					} catch (sw::redis::Error &e) {
+						fprintf(stderr, "ERROR: Error adding network to Redis: %s\n", e.what());
+					}
+				}
 			} else if (objtype == "_delete_network") {
 			} else if (objtype == "_delete_network") {
 				try {
 				try {
 					std::string networkId = (*config)["nwid"];
 					std::string networkId = (*config)["nwid"];
@@ -1367,6 +1450,20 @@ void PostgreSQL::commitThread()
 				} catch (std::exception &e) {
 				} catch (std::exception &e) {
 					fprintf(stderr, "ERROR: Error deleting network: %s\n", e.what());
 					fprintf(stderr, "ERROR: Error deleting network: %s\n", e.what());
 				}
 				}
+				if (_rc != NULL) {
+					try {
+						std::string id = (*config)["id"];
+						std::string controllerId = _myAddressStr.c_str();
+						std::string key = "networks:{" + controllerId + "}";
+						if (_rc->clusterMode) {
+							_cluster->srem(key, id);
+						} else {
+							_redis->srem(key, id);
+						}
+					} catch (sw::redis::Error &e) {
+						fprintf(stderr, "ERROR: Error adding network to Redis: %s\n", e.what());
+					}
+				}
 			} else if (objtype == "_delete_member") {
 			} else if (objtype == "_delete_member") {
 				try {
 				try {
 					std::string memberId = (*config)["id"];
 					std::string memberId = (*config)["id"];
@@ -1394,6 +1491,21 @@ void PostgreSQL::commitThread()
 				} catch (std::exception &e) {
 				} catch (std::exception &e) {
 					fprintf(stderr, "ERROR: Error deleting member: %s\n", e.what());
 					fprintf(stderr, "ERROR: Error deleting member: %s\n", e.what());
 				}
 				}
+				if (_rc != NULL) {
+					try {
+						std::string memberId = (*config)["id"];
+						std::string networkId = (*config)["nwid"];
+						std::string controllerId = _myAddressStr.c_str();
+						std::string key = "network-nodes-all:{" + controllerId + "}:" + networkId;
+						if (_rc->clusterMode) {
+							_cluster->srem(key, memberId);
+						} else {
+							_redis->srem(key, memberId);
+						}
+					} catch (sw::redis::Error &e) {
+						fprintf(stderr, "ERROR: Error deleting member from Redis: %s\n", e.what());
+					}
+				}
 			} else {
 			} else {
 				fprintf(stderr, "ERROR: unknown objtype");
 				fprintf(stderr, "ERROR: unknown objtype");
 			}
 			}
@@ -1414,6 +1526,8 @@ void PostgreSQL::commitThread()
 
 
 void PostgreSQL::onlineNotificationThread()
 void PostgreSQL::onlineNotificationThread()
 {
 {
+	waitForReady();
+
 	if (_rc != NULL) {
 	if (_rc != NULL) {
 		onlineNotification_Redis();
 		onlineNotification_Redis();
 	} else {
 	} else {
@@ -1569,7 +1683,6 @@ void PostgreSQL::_doRedisUpdate(sw::redis::Transaction &tx, std::string &control
 	std::unordered_map< std::pair<uint64_t,uint64_t>,std::pair<int64_t,InetAddress>,_PairHasher > &lastOnline) 
 	std::unordered_map< std::pair<uint64_t,uint64_t>,std::pair<int64_t,InetAddress>,_PairHasher > &lastOnline) 
 
 
 {
 {
-	fprintf(stderr, "Redis Update\n");
 	nlohmann::json jtmp1, jtmp2;
 	nlohmann::json jtmp1, jtmp2;
 	for (auto i=lastOnline.begin(); i != lastOnline.end(); ++i) {
 	for (auto i=lastOnline.begin(); i != lastOnline.end(); ++i) {
 		uint64_t nwid_i = i->first.first;
 		uint64_t nwid_i = i->first.first;
@@ -1581,12 +1694,9 @@ void PostgreSQL::_doRedisUpdate(sw::redis::Transaction &tx, std::string &control
 		OSUtils::ztsnprintf(memTmp,sizeof(memTmp), "%.10llx", memberid_i);
 		OSUtils::ztsnprintf(memTmp,sizeof(memTmp), "%.10llx", memberid_i);
 
 
 		if (!get(nwid_i, jtmp1, memberid_i, jtmp2)){
 		if (!get(nwid_i, jtmp1, memberid_i, jtmp2)){
+			fprintf(stderr, "network or member doesn't exist\n");
 			continue;  // skip non existent members/networks
 			continue;  // skip non existent members/networks
 		}
 		}
-		auto found = _networks.find(nwid_i);
-		if (found == _networks.end()) {
-			continue; // skip members trying to join non-existant networks
-		}
 
 
 		std::string networkId(nwidTmp);
 		std::string networkId(nwidTmp);
 		std::string memberId(memTmp);
 		std::string memberId(memTmp);
@@ -1603,7 +1713,7 @@ void PostgreSQL::_doRedisUpdate(sw::redis::Transaction &tx, std::string &control
 		tx.zadd("nodes-online:{"+controllerId+"}", memberId, ts)
 		tx.zadd("nodes-online:{"+controllerId+"}", memberId, ts)
 			.zadd("network-nodes-online:{"+controllerId+"}:"+networkId, memberId, ts)
 			.zadd("network-nodes-online:{"+controllerId+"}:"+networkId, memberId, ts)
 			.sadd("network-nodes-all:{"+controllerId+"}:"+networkId, memberId)
 			.sadd("network-nodes-all:{"+controllerId+"}:"+networkId, memberId)
-			.hmset("network:{"+controllerId+"}:"+networkId+":"+memberId, record.begin(), record.end());
+			.hmset("member:{"+controllerId+"}:"+networkId+":"+memberId, record.begin(), record.end());
 	}
 	}
 
 
 	tx.exec();
 	tx.exec();