|
@@ -710,11 +710,11 @@ void PostgreSQL::initializeNetworks()
|
|
if (_redisMemberStatus) {
|
|
if (_redisMemberStatus) {
|
|
fprintf(stderr, "adding networks to redis...\n");
|
|
fprintf(stderr, "adding networks to redis...\n");
|
|
if (_rc->clusterMode) {
|
|
if (_rc->clusterMode) {
|
|
- auto tx = _cluster->transaction(_myAddressStr, true);
|
|
|
|
|
|
+ auto tx = _cluster->transaction(_myAddressStr, true, false);
|
|
tx.sadd(setKey, networkSet.begin(), networkSet.end());
|
|
tx.sadd(setKey, networkSet.begin(), networkSet.end());
|
|
tx.exec();
|
|
tx.exec();
|
|
} else {
|
|
} else {
|
|
- auto tx = _redis->transaction(true);
|
|
|
|
|
|
+ auto tx = _redis->transaction(true, false);
|
|
tx.sadd(setKey, networkSet.begin(), networkSet.end());
|
|
tx.sadd(setKey, networkSet.begin(), networkSet.end());
|
|
tx.exec();
|
|
tx.exec();
|
|
}
|
|
}
|
|
@@ -766,13 +766,13 @@ void PostgreSQL::initializeMembers()
|
|
if (!deletes.empty()) {
|
|
if (!deletes.empty()) {
|
|
try {
|
|
try {
|
|
if (_rc->clusterMode) {
|
|
if (_rc->clusterMode) {
|
|
- auto tx = _cluster->transaction(_myAddressStr, true);
|
|
|
|
|
|
+ auto tx = _cluster->transaction(_myAddressStr, true, false);
|
|
for (std::string k : deletes) {
|
|
for (std::string k : deletes) {
|
|
tx.del(k);
|
|
tx.del(k);
|
|
}
|
|
}
|
|
tx.exec();
|
|
tx.exec();
|
|
} else {
|
|
} else {
|
|
- auto tx = _redis->transaction(true);
|
|
|
|
|
|
+ auto tx = _redis->transaction(true, false);
|
|
for (std::string k : deletes) {
|
|
for (std::string k : deletes) {
|
|
tx.del(k);
|
|
tx.del(k);
|
|
}
|
|
}
|
|
@@ -926,13 +926,13 @@ void PostgreSQL::initializeMembers()
|
|
if (_redisMemberStatus) {
|
|
if (_redisMemberStatus) {
|
|
fprintf(stderr, "Load member data into redis...\n");
|
|
fprintf(stderr, "Load member data into redis...\n");
|
|
if (_rc->clusterMode) {
|
|
if (_rc->clusterMode) {
|
|
- auto tx = _cluster->transaction(_myAddressStr, true);
|
|
|
|
|
|
+ auto tx = _cluster->transaction(_myAddressStr, true, false);
|
|
for (auto it : networkMembers) {
|
|
for (auto it : networkMembers) {
|
|
tx.sadd(it.first, it.second);
|
|
tx.sadd(it.first, it.second);
|
|
}
|
|
}
|
|
tx.exec();
|
|
tx.exec();
|
|
} else {
|
|
} else {
|
|
- auto tx = _redis->transaction(true);
|
|
|
|
|
|
+ auto tx = _redis->transaction(true, false);
|
|
for (auto it : networkMembers) {
|
|
for (auto it : networkMembers) {
|
|
tx.sadd(it.first, it.second);
|
|
tx.sadd(it.first, it.second);
|
|
}
|
|
}
|
|
@@ -1014,12 +1014,16 @@ void PostgreSQL::heartbeat()
|
|
}
|
|
}
|
|
_pool->unborrow(c);
|
|
_pool->unborrow(c);
|
|
|
|
|
|
- if (_redisMemberStatus) {
|
|
|
|
- if (_rc->clusterMode) {
|
|
|
|
- _cluster->zadd("controllers", "controllerId", ts);
|
|
|
|
- } else {
|
|
|
|
- _redis->zadd("controllers", "controllerId", ts);
|
|
|
|
|
|
+ try {
|
|
|
|
+ if (_redisMemberStatus) {
|
|
|
|
+ if (_rc->clusterMode) {
|
|
|
|
+ _cluster->zadd("controllers", "controllerId", ts);
|
|
|
|
+ } else {
|
|
|
|
+ _redis->zadd("controllers", "controllerId", ts);
|
|
|
|
+ }
|
|
}
|
|
}
|
|
|
|
+ } catch (sw::redis::Error &e) {
|
|
|
|
+ fprintf(stderr, "ERROR: Redis error in heartbeat thread: %s\n", e.what());
|
|
}
|
|
}
|
|
|
|
|
|
std::this_thread::sleep_for(std::chrono::milliseconds(1000));
|
|
std::this_thread::sleep_for(std::chrono::milliseconds(1000));
|
|
@@ -1696,10 +1700,10 @@ void PostgreSQL::onlineNotification_Redis()
|
|
try {
|
|
try {
|
|
if (!lastOnline.empty()) {
|
|
if (!lastOnline.empty()) {
|
|
if (_rc->clusterMode) {
|
|
if (_rc->clusterMode) {
|
|
- auto tx = _cluster->transaction(controllerId, true);
|
|
|
|
|
|
+ auto tx = _cluster->transaction(controllerId, true, false);
|
|
count = _doRedisUpdate(tx, controllerId, lastOnline);
|
|
count = _doRedisUpdate(tx, controllerId, lastOnline);
|
|
} else {
|
|
} else {
|
|
- auto tx = _redis->transaction(true);
|
|
|
|
|
|
+ auto tx = _redis->transaction(true, false);
|
|
count = _doRedisUpdate(tx, controllerId, lastOnline);
|
|
count = _doRedisUpdate(tx, controllerId, lastOnline);
|
|
}
|
|
}
|
|
}
|
|
}
|