Kaynağa Gözat

fix XREAD commands for redis message queue

Grant Limberg 3 yıl önce
ebeveyn
işleme
ff18bacd94
1 değiştirilmiş dosya ile 8 ekleme ve 5 silme
  1. 8 5
      controller/PostgreSQL.cpp

+ 8 - 5
controller/PostgreSQL.cpp

@@ -937,15 +937,16 @@ void PostgreSQL::_membersWatcher_Postgres() {
 void PostgreSQL::_membersWatcher_Redis() {
 	char buf[11] = {0};
 	std::string key = "member-stream:{" + std::string(_myAddress.toString(buf)) + "}";
+	std::string lastID = "0";
 	fprintf(stderr, "Listening to member stream: %s\n", key.c_str());
 	while (_run == 1) {
 		try {
 			json tmp;
 			std::unordered_map<std::string, ItemStream> result;
 			if (_rc->clusterMode) {
-				_cluster->xread(key, "$", std::chrono::seconds(1), 0, std::inserter(result, result.end()));
+				_cluster->xread(key, lastID, std::chrono::seconds(1), 10, std::inserter(result, result.end()));
 			} else {
-				_redis->xread(key, "$", std::chrono::seconds(1), 0, std::inserter(result, result.end()));
+				_redis->xread(key, lastID, std::chrono::seconds(1), 10, std::inserter(result, result.end()));
 			}
 			if (!result.empty()) {
 				for (auto element : result) {
@@ -982,6 +983,7 @@ void PostgreSQL::_membersWatcher_Redis() {
 						} else {
 							_redis->xdel(key, id);
 						}
+						lastID = id;
 					}
 				}
 			}
@@ -1024,15 +1026,15 @@ void PostgreSQL::_networksWatcher_Postgres() {
 void PostgreSQL::_networksWatcher_Redis() {
 	char buf[11] = {0};
 	std::string key = "network-stream:{" + std::string(_myAddress.toString(buf)) + "}";
-	
+	std::string lastID = "0";
 	while (_run == 1) {
 		try {
 			json tmp;
 			std::unordered_map<std::string, ItemStream> result;
 			if (_rc->clusterMode) {
-				_cluster->xread(key, "$", std::chrono::seconds(1), 0, std::inserter(result, result.end()));
+				_cluster->xread(key, lastID, std::chrono::seconds(1), 0, std::inserter(result, result.end()));
 			} else {
-				_redis->xread(key, "$", std::chrono::seconds(1), 0, std::inserter(result, result.end()));
+				_redis->xread(key, lastID, std::chrono::seconds(1), 0, std::inserter(result, result.end()));
 			}
 			
 			if (!result.empty()) {
@@ -1070,6 +1072,7 @@ void PostgreSQL::_networksWatcher_Redis() {
 						} else {
 							_redis->xdel(key, id);
 						}
+						lastID = id;
 					}
 				}
 			}