浏览代码

Send member/network change notifications via Postgres

Grant Limberg 6 年之前
父节点
当前提交
0ec6215493
共有 1 个文件被更改,包括 46 次插入0 次删除
  1. 46 0
      controller/PostgreSQL.cpp

+ 46 - 0
controller/PostgreSQL.cpp

@@ -44,6 +44,21 @@ static const char *_timestr()
 	return ts;
 }
 
+std::string join(const std::vector<std::string> &elements, const char * const separator)
+{
+	switch(elements.size()) {
+	case 0:
+		return "";
+	case 1:
+		return elements[0];
+	default:
+		std::ostringstream os;
+		std::copy(elements.begin(), elements.end()-1, std::ostream_iterator<std::string>(os, separator));
+		os << *elements.rbegin();
+		return os.str();
+	}
+}
+
 }
 
 using namespace ZeroTier;
@@ -983,12 +998,16 @@ void PostgreSQL::onlineNotificationThread()
 
 	int64_t	lastUpdatedNetworkStatus = 0;
 	std::unordered_map< std::pair<uint64_t,uint64_t>,int64_t,_PairHasher > lastOnlineCumulative;
+	
 	while (_run == 1) {
 		if (PQstatus(conn) != CONNECTION_OK) {
 			fprintf(stderr, "ERROR: Online Notification thread lost connection to Postgres.");
 			exit(-1);
 		}
 
+		// map used to send notifications to front end
+		std::unordered_map<std::string, std::vector<std::string>> updateMap;
+
 		std::unordered_map< std::pair<uint64_t,uint64_t>,std::pair<int64_t,InetAddress>,_PairHasher > lastOnline;
 		{
 			std::lock_guard<std::mutex> l(_lastOnline_l);
@@ -1024,6 +1043,10 @@ void PostgreSQL::onlineNotificationThread()
 
 			std::string networkId(nwidTmp);
 			std::string memberId(memTmp);
+
+			std::vector<std::string> &members = updateMap[networkId];
+			members.push_back(memberId);
+
 			int64_t ts = i->second.first;
 			std::string ipAddr = i->second.second.toIpString(ipTmp);
 
@@ -1108,6 +1131,10 @@ void PostgreSQL::onlineNotificationThread()
 				Utils::hex(i->first, tmp);
 
 				std::string networkId(tmp);
+
+				std::vector<std::string> &_notUsed = updateMap[networkId];
+				(void)_notUsed;
+
 				uint64_t authMemberCount = 0;
 				uint64_t totalMemberCount = 0;
 				uint64_t onlineMemberCount = 0;
@@ -1186,6 +1213,25 @@ void PostgreSQL::onlineNotificationThread()
 			}
 		}
 
+		for (auto it = updateMap.begin(); it != updateMap.end(); ++it) {
+			std::string networkId = it->first;
+			std::vector<std::string> members = it->second;
+			std::stringstream queryBuilder;
+
+			std::string membersStr = ::join(members, ",");
+
+			queryBuilder << "NOTIFY controller, '" << networkId << ":" << membersStr << "'";
+			std::string query = queryBuilder.str();
+
+			fprintf(stderr, "%s\n", query.c_str());
+
+			PGresult *res = PQexec(conn,query.c_str());
+			if (PQresultStatus(res) != PGRES_COMMAND_OK) {
+				fprintf(stderr, "ERROR: Error sending NOTIFY: %s\n", PQresultErrorMessage(res));
+			}
+			PQclear(res);
+		}
+
 		std::this_thread::sleep_for(std::chrono::milliseconds(250));
 	}
 	PQfinish(conn);