|
@@ -77,7 +77,8 @@ Peer::Peer(const RuntimeEnvironment *renv,const Identity &myIdentity,const Ident
|
|
|
_lastAggregateStatsReport(0),
|
|
|
_lastAggregateAllocation(0),
|
|
|
_virtualPathCount(0),
|
|
|
- _roundRobinPathAssignmentIdx(0)
|
|
|
+ _roundRobinPathAssignmentIdx(0),
|
|
|
+ _pathAssignmentIdx(0)
|
|
|
{
|
|
|
if (!myIdentity.agree(peerIdentity,_key,ZT_PEER_SECRET_KEY_LENGTH))
|
|
|
throw ZT_EXCEPTION_INVALID_ARGUMENT;
|
|
@@ -468,16 +469,18 @@ SharedPtr<Path> Peer::getAppropriatePath(int64_t now, bool includeExpired, int64
|
|
|
_paths[i].p->processBackgroundPathMeasurements(now);
|
|
|
}
|
|
|
}
|
|
|
- // Detect new flows and update existing records
|
|
|
- if (_flows.count(flowId)) {
|
|
|
- _flows[flowId]->lastSend = now;
|
|
|
- }
|
|
|
- else {
|
|
|
- fprintf(stderr, "new flow %llx detected between this node and %llx (%lu active flow(s))\n",
|
|
|
- flowId, this->_id.address().toInt(), (_flows.size()+1));
|
|
|
- struct Flow *newFlow = new Flow(flowId, now);
|
|
|
- _flows[flowId] = newFlow;
|
|
|
- newFlow->assignedPath = nullptr;
|
|
|
+ if (RR->sw->isFlowAware()) {
|
|
|
+ // Detect new flows and update existing records
|
|
|
+ if (_flows.count(flowId)) {
|
|
|
+ _flows[flowId]->lastSend = now;
|
|
|
+ }
|
|
|
+ else {
|
|
|
+ fprintf(stderr, "new flow %llx detected between this node and %llx (%lu active flow(s))\n",
|
|
|
+ flowId, this->_id.address().toInt(), (_flows.size()+1));
|
|
|
+ struct Flow *newFlow = new Flow(flowId, now);
|
|
|
+ _flows[flowId] = newFlow;
|
|
|
+ newFlow->assignedPath = nullptr;
|
|
|
+ }
|
|
|
}
|
|
|
// Construct set of virtual paths if needed
|
|
|
if (!_virtualPaths.size()) {
|
|
@@ -532,45 +535,64 @@ SharedPtr<Path> Peer::getAppropriatePath(int64_t now, bool includeExpired, int64
|
|
|
if ((now - _paths[i].p->lastIn()) < ZT_MULTIPATH_ACTIVE_BACKUP_RAPID_FAILOVER_PERIOD) {
|
|
|
bFoundHotPath = true;
|
|
|
_activeBackupPath = _paths[i].p;
|
|
|
+ _pathAssignmentIdx = i;
|
|
|
_activeBackupPath->address().toString(curPathStr);
|
|
|
- fprintf(stderr, "selected %s as the primary active-backup path to %llx\n",
|
|
|
- curPathStr, this->_id.address().toInt());
|
|
|
+ fprintf(stderr, "selected %s as the primary active-backup path to %llx (idx=%d)\n",
|
|
|
+ curPathStr, this->_id.address().toInt(), _pathAssignmentIdx);
|
|
|
+ break;
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
- if (!_activeBackupPath) {
|
|
|
- return SharedPtr<Path>();
|
|
|
- }
|
|
|
- if (!bFoundHotPath) {
|
|
|
- _activeBackupPath->address().toString(curPathStr);
|
|
|
- fprintf(stderr, "no hot paths available to to use as active-backup primary to %llx, selected %s anyway\n",
|
|
|
- this->_id.address().toInt(), curPathStr);
|
|
|
- }
|
|
|
}
|
|
|
else {
|
|
|
+ char what[128];
|
|
|
if ((now - _activeBackupPath->lastIn()) > ZT_MULTIPATH_ACTIVE_BACKUP_RAPID_FAILOVER_PERIOD) {
|
|
|
- _activeBackupPath->address().toString(curPathStr);
|
|
|
- /* Fail-over to the fist path that appears to still be active.
|
|
|
- * This will eventually be user-configurable */
|
|
|
- for (int i=0; i<ZT_MAX_PEER_NETWORK_PATHS; i++) {
|
|
|
- if (_paths[i].p) {
|
|
|
- if (_activeBackupPath.ptr() == _paths[i].p.ptr()) {
|
|
|
+ _activeBackupPath->address().toString(curPathStr); // Record path string for later debug trace
|
|
|
+ int16_t previousIdx = _pathAssignmentIdx;
|
|
|
+ SharedPtr<Path> nextAlternativePath;
|
|
|
+ // Search for a hot path, at the same time find the next path in
|
|
|
+ // a RR sequence that seems viable to use as an alternative
|
|
|
+ int searchCount = 0;
|
|
|
+ while (searchCount < ZT_MAX_PEER_NETWORK_PATHS) {
|
|
|
+ _pathAssignmentIdx++;
|
|
|
+ if (_pathAssignmentIdx == ZT_MAX_PEER_NETWORK_PATHS) {
|
|
|
+ _pathAssignmentIdx = 0;
|
|
|
+ }
|
|
|
+ searchCount++;
|
|
|
+ if (_paths[_pathAssignmentIdx].p) {
|
|
|
+ _paths[_pathAssignmentIdx].p->address().toString(what);
|
|
|
+ if (_activeBackupPath.ptr() == _paths[_pathAssignmentIdx].p.ptr()) {
|
|
|
continue;
|
|
|
}
|
|
|
- if ((now - _paths[i].p->lastIn()) < ZT_MULTIPATH_ACTIVE_BACKUP_RAPID_FAILOVER_PERIOD) {
|
|
|
+ if (!nextAlternativePath) { // Record the first viable alternative in the RR sequence
|
|
|
+ nextAlternativePath = _paths[_pathAssignmentIdx].p;
|
|
|
+ }
|
|
|
+ if ((now - _paths[_pathAssignmentIdx].p->lastIn()) < ZT_MULTIPATH_ACTIVE_BACKUP_RAPID_FAILOVER_PERIOD) {
|
|
|
bFoundHotPath = true;
|
|
|
- _activeBackupPath->address().toString(curPathStr); // Record path string for later debug trace
|
|
|
- _activeBackupPath = _paths[i].p;
|
|
|
+ _activeBackupPath = _paths[_pathAssignmentIdx].p;
|
|
|
_activeBackupPath->address().toString(newPathStr);
|
|
|
+ fprintf(stderr, "primary active-backup path %s to %llx appears to be dead, switched to %s\n",
|
|
|
+ curPathStr, this->_id.address().toInt(), newPathStr);
|
|
|
+ break;
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
- if (bFoundHotPath) {
|
|
|
- fprintf(stderr, "primary active-backup path %s to %llx appears to be dead, switched to path %s\n",
|
|
|
- curPathStr, this->_id.address().toInt(), newPathStr);
|
|
|
+ if (!bFoundHotPath) {
|
|
|
+ if (nextAlternativePath) {
|
|
|
+ _activeBackupPath = nextAlternativePath;
|
|
|
+ _activeBackupPath->address().toString(curPathStr);
|
|
|
+ //fprintf(stderr, "no hot paths found to use as active-backup primary to %llx, using next best: %s\n",
|
|
|
+ // this->_id.address().toInt(), curPathStr);
|
|
|
+ }
|
|
|
+ else {
|
|
|
+ // No change
|
|
|
+ }
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
+ if (!_activeBackupPath) {
|
|
|
+ return SharedPtr<Path>();
|
|
|
+ }
|
|
|
return _activeBackupPath;
|
|
|
}
|
|
|
|
|
@@ -866,14 +888,16 @@ inline void Peer::processBackgroundPeerTasks(const int64_t now)
|
|
|
}
|
|
|
|
|
|
// Remove old flows
|
|
|
- std::map<int64_t, struct Flow *>::iterator it = _flows.begin();
|
|
|
- while (it != _flows.end()) {
|
|
|
- if ((now - it->second->lastSend) > ZT_MULTIPATH_FLOW_EXPIRATION) {
|
|
|
- fprintf(stderr, "forgetting flow %llx between this node and %llx (%lu active flow(s))\n",
|
|
|
- it->first, this->_id.address().toInt(), _flows.size());
|
|
|
- it = _flows.erase(it);
|
|
|
- } else {
|
|
|
- it++;
|
|
|
+ if (RR->sw->isFlowAware()) {
|
|
|
+ std::map<int64_t, struct Flow *>::iterator it = _flows.begin();
|
|
|
+ while (it != _flows.end()) {
|
|
|
+ if ((now - it->second->lastSend) > ZT_MULTIPATH_FLOW_EXPIRATION) {
|
|
|
+ fprintf(stderr, "forgetting flow %llx between this node and %llx (%lu active flow(s))\n",
|
|
|
+ it->first, this->_id.address().toInt(), _flows.size());
|
|
|
+ it = _flows.erase(it);
|
|
|
+ } else {
|
|
|
+ it++;
|
|
|
+ }
|
|
|
}
|
|
|
}
|
|
|
}
|