|
@@ -15,10 +15,10 @@
|
|
|
|
|
|
#include "Switch.hpp"
|
|
|
|
|
|
+#include <cinttypes> // for PRId64, etc. macros
|
|
|
#include <cmath>
|
|
|
#include <cstdio>
|
|
|
#include <string>
|
|
|
-#include <cinttypes> // for PRId64, etc. macros
|
|
|
|
|
|
// FIXME: remove this suppression and actually fix warnings
|
|
|
#ifdef __GNUC__
|
|
@@ -108,7 +108,7 @@ bool Bond::setAllMtuByTuple(uint16_t mtu, const std::string& ifStr, const std::s
|
|
|
std::map<int64_t, SharedPtr<Bond> >::iterator bondItr = _bonds.begin();
|
|
|
bool found = false;
|
|
|
while (bondItr != _bonds.end()) {
|
|
|
- if (bondItr->second->setMtuByTuple(mtu,ifStr,ipStr)) {
|
|
|
+ if (bondItr->second->setMtuByTuple(mtu, ifStr, ipStr)) {
|
|
|
found = true;
|
|
|
}
|
|
|
++bondItr;
|
|
@@ -154,11 +154,13 @@ SharedPtr<Bond> Bond::createBond(const RuntimeEnvironment* renv, const SharedPtr
|
|
|
bond = new Bond(renv, _bondPolicyTemplates[_defaultPolicyStr].ptr(), peer);
|
|
|
bond->debug("new default custom bond (based on %s)", bond->getPolicyStrByCode(bond->policy()).c_str());
|
|
|
}
|
|
|
- } else {
|
|
|
+ }
|
|
|
+ else {
|
|
|
if (! _bondPolicyTemplates[_policyTemplateAssignments[identity]]) {
|
|
|
bond = new Bond(renv, _defaultPolicy, peer);
|
|
|
bond->debug("peer-specific bond, was specified as %s but the bond definition was not found, using default %s", _policyTemplateAssignments[identity].c_str(), getPolicyStrByCode(_defaultPolicy).c_str());
|
|
|
- } else {
|
|
|
+ }
|
|
|
+ else {
|
|
|
bond = new Bond(renv, _bondPolicyTemplates[_policyTemplateAssignments[identity]].ptr(), peer);
|
|
|
bond->debug("new default bond");
|
|
|
}
|
|
@@ -227,10 +229,12 @@ SharedPtr<Link> Bond::getLinkBySocket(const std::string& policyAlias, uint64_t l
|
|
|
SharedPtr<Link> s = new Link(ifnameStr, 0, 0, 0, true, ZT_BOND_SLAVE_MODE_PRIMARY, "");
|
|
|
_interfaceToLinkMap[policyAlias].insert(std::pair<std::string, SharedPtr<Link> >(ifnameStr, s));
|
|
|
return s;
|
|
|
- } else {
|
|
|
+ }
|
|
|
+ else {
|
|
|
return SharedPtr<Link>();
|
|
|
}
|
|
|
- } else {
|
|
|
+ }
|
|
|
+ else {
|
|
|
return search->second;
|
|
|
}
|
|
|
}
|
|
@@ -340,6 +344,7 @@ void Bond::nominatePathToBond(const SharedPtr<Path>& path, int64_t now)
|
|
|
_paths[i].ipvPref = sl->ipvPref();
|
|
|
_paths[i].mode = sl->mode();
|
|
|
_paths[i].enabled = sl->enabled();
|
|
|
+ _paths[i].localPort = _phy->getLocalPort((PhySocket*)((uintptr_t)path->localSocket()));
|
|
|
_paths[i].onlyPathOnLink = ! bFoundCommonLink;
|
|
|
}
|
|
|
}
|
|
@@ -397,7 +402,8 @@ SharedPtr<Path> Bond::getAppropriatePath(int64_t now, int32_t flowId)
|
|
|
_rrPacketsSentOnCurrLink = 0;
|
|
|
if (_numBondedPaths == 1 || _rrIdx >= (ZT_MAX_PEER_NETWORK_PATHS - 1)) {
|
|
|
_rrIdx = 0;
|
|
|
- } else {
|
|
|
+ }
|
|
|
+ else {
|
|
|
int _tempIdx = _rrIdx;
|
|
|
for (int searchCount = 0; searchCount < (_numBondedPaths - 1); searchCount++) {
|
|
|
_tempIdx = (_tempIdx == (_numBondedPaths - 1)) ? 0 : _tempIdx + 1;
|
|
@@ -427,7 +433,8 @@ SharedPtr<Path> Bond::getAppropriatePath(int64_t now, int32_t flowId)
|
|
|
if (likely(it != _flows.end())) {
|
|
|
it->second->lastActivity = now;
|
|
|
return _paths[it->second->assignedPath].p;
|
|
|
- } else {
|
|
|
+ }
|
|
|
+ else {
|
|
|
unsigned char entropy;
|
|
|
Utils::getSecureRandom(&entropy, 1);
|
|
|
SharedPtr<Flow> flow = createFlow(ZT_MAX_PEER_NETWORK_PATHS, flowId, entropy, now);
|
|
@@ -505,7 +512,8 @@ void Bond::recordIncomingPacket(const SharedPtr<Path>& path, uint64_t packetId,
|
|
|
_paths[pathIdx].qosStatsIn[packetId] = now;
|
|
|
++(_paths[pathIdx].packetsReceivedSinceLastQoS);
|
|
|
//_paths[pathIdx].packetValiditySamples.push(true);
|
|
|
- } else {
|
|
|
+ }
|
|
|
+ else {
|
|
|
// debug("QoS buffer full, will not record information");
|
|
|
}
|
|
|
/*
|
|
@@ -532,7 +540,8 @@ void Bond::recordIncomingPacket(const SharedPtr<Path>& path, uint64_t packetId,
|
|
|
SharedPtr<Flow> flow;
|
|
|
if (! _flows.count(flowId)) {
|
|
|
flow = createFlow(pathIdx, flowId, 0, now);
|
|
|
- } else {
|
|
|
+ }
|
|
|
+ else {
|
|
|
flow = _flows[flowId];
|
|
|
}
|
|
|
if (flow) {
|
|
@@ -618,7 +627,8 @@ bool Bond::assignFlowToBondedPath(SharedPtr<Flow>& flow, int64_t now, bool reass
|
|
|
|
|
|
if (reassign) {
|
|
|
log("attempting to re-assign out-flow %04x previously on idx %d (%u / %zu flows)", flow->id, flow->assignedPath, _paths[_realIdxMap[flow->assignedPath]].assignedFlowCount, _flows.size());
|
|
|
- } else {
|
|
|
+ }
|
|
|
+ else {
|
|
|
debug("attempting to assign flow for the first time");
|
|
|
}
|
|
|
|
|
@@ -632,7 +642,8 @@ bool Bond::assignFlowToBondedPath(SharedPtr<Flow>& flow, int64_t now, bool reass
|
|
|
|
|
|
if (reassign) {
|
|
|
bondedIdx = (flow->assignedPath + offset) % (_numBondedPaths);
|
|
|
- } else {
|
|
|
+ }
|
|
|
+ else {
|
|
|
bondedIdx = abs((int)((entropy + offset) % (_numBondedPaths)));
|
|
|
}
|
|
|
// debug("idx=%d, offset=%d, randomCap=%f, actualCap=%f", bondedIdx, offset, randomLinkCapacity, _paths[_realIdxMap[bondedIdx]].relativeLinkCapacity);
|
|
@@ -655,7 +666,8 @@ bool Bond::assignFlowToBondedPath(SharedPtr<Flow>& flow, int64_t now, bool reass
|
|
|
flow->assignPath(_realIdxMap[bondedIdx], now);
|
|
|
++(_paths[_realIdxMap[bondedIdx]].assignedFlowCount);
|
|
|
// debug(" ABLE to find optimal link %f idx %d", _paths[_realIdxMap[bondedIdx]].relativeQuality, bondedIdx);
|
|
|
- } else {
|
|
|
+ }
|
|
|
+ else {
|
|
|
// We were (unable) to find a path that didn't violate at least one quality requirement, will choose next best option
|
|
|
flow->assignPath(_realIdxMap[nextBestQualIdx], now);
|
|
|
++(_paths[_realIdxMap[nextBestQualIdx]].assignedFlowCount);
|
|
@@ -715,11 +727,13 @@ void Bond::forgetFlowsWhenNecessary(uint64_t age, bool oldest, int64_t now)
|
|
|
debug("forget flow %04x (age %" PRId64 ") (%u / %zu)", it->first, it->second->age(now), _paths[it->second->assignedPath].assignedFlowCount, (_flows.size() - 1));
|
|
|
_paths[it->second->assignedPath].assignedFlowCount--;
|
|
|
it = _flows.erase(it);
|
|
|
- } else {
|
|
|
+ }
|
|
|
+ else {
|
|
|
++it;
|
|
|
}
|
|
|
}
|
|
|
- } else if (oldest) { // Remove single oldest by natural expiration
|
|
|
+ }
|
|
|
+ else if (oldest) { // Remove single oldest by natural expiration
|
|
|
uint64_t maxAge = 0;
|
|
|
while (it != _flows.end()) {
|
|
|
if (it->second->age(now) > maxAge) {
|
|
@@ -766,7 +780,8 @@ void Bond::processIncomingPathNegotiationRequest(uint64_t now, SharedPtr<Path>&
|
|
|
if (_peer->_id.address().toInt() > RR->node->identity().address().toInt()) {
|
|
|
debug("agree with peer to use alternate link %s/%s\n", link->ifname().c_str(), pathStr);
|
|
|
_negotiatedPathIdx = pathIdx;
|
|
|
- } else {
|
|
|
+ }
|
|
|
+ else {
|
|
|
debug("ignore petition from peer to use alternate link %s/%s\n", link->ifname().c_str(), pathStr);
|
|
|
}
|
|
|
}
|
|
@@ -881,7 +896,8 @@ void Bond::sendQOS_MEASUREMENT(void* tPtr, int pathIdx, int64_t localSocket, con
|
|
|
if (atAddress) {
|
|
|
outp.armor(_peer->key(), false, _peer->aesKeysIfSupported());
|
|
|
RR->node->putPacket(tPtr, localSocket, atAddress, outp.data(), outp.size());
|
|
|
- } else {
|
|
|
+ }
|
|
|
+ else {
|
|
|
RR->sw->send(tPtr, outp, false);
|
|
|
}
|
|
|
Metrics::pkt_qos_out++;
|
|
@@ -1078,6 +1094,7 @@ void Bond::curateBond(int64_t now, bool rebuildBond)
|
|
|
* Curate the set of paths that are part of the bond proper. Select a set of paths
|
|
|
* per logical link according to eligibility and user-specified constraints.
|
|
|
*/
|
|
|
+ int updatedBondedPathCount = 0;
|
|
|
if ((_policy == ZT_BOND_POLICY_BALANCE_RR) || (_policy == ZT_BOND_POLICY_BALANCE_XOR) || (_policy == ZT_BOND_POLICY_BALANCE_AWARE)) {
|
|
|
if (! _numBondedPaths) {
|
|
|
rebuildBond = true;
|
|
@@ -1089,7 +1106,6 @@ void Bond::curateBond(int64_t now, bool rebuildBond)
|
|
|
_paths[i].bonded = false;
|
|
|
}
|
|
|
|
|
|
- int updatedBondedPathCount = 0;
|
|
|
// Build map associating paths with local physical links. Will be selected from in next step
|
|
|
std::map<SharedPtr<Link>, std::vector<int> > linkMap;
|
|
|
for (int i = 0; i < ZT_MAX_PEER_NETWORK_PATHS; ++i) {
|
|
@@ -1191,6 +1207,14 @@ void Bond::curateBond(int64_t now, bool rebuildBond)
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
+ if (_policy == ZT_BOND_POLICY_ACTIVE_BACKUP) {
|
|
|
+ for (int i = 0; i < ZT_MAX_PEER_NETWORK_PATHS; ++i) {
|
|
|
+ if (_paths[i].p && _paths[i].bonded) {
|
|
|
+ updatedBondedPathCount++;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ _numBondedPaths = updatedBondedPathCount;
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
void Bond::estimatePathQuality(int64_t now)
|
|
@@ -1222,7 +1246,8 @@ void Bond::estimatePathQuality(int64_t now)
|
|
|
if ((now - it->second) >= qosRecordTimeout) {
|
|
|
it = _paths[i].qosStatsOut.erase(it);
|
|
|
++numDroppedQosOutRecords;
|
|
|
- } else {
|
|
|
+ }
|
|
|
+ else {
|
|
|
++it;
|
|
|
}
|
|
|
}
|
|
@@ -1250,7 +1275,8 @@ void Bond::estimatePathQuality(int64_t now)
|
|
|
if ((now - it->second) >= qosRecordTimeout) {
|
|
|
it = _paths[i].qosStatsIn.erase(it);
|
|
|
++numDroppedQosInRecords;
|
|
|
- } else {
|
|
|
+ }
|
|
|
+ else {
|
|
|
++it;
|
|
|
}
|
|
|
}
|
|
@@ -1327,10 +1353,10 @@ void Bond::estimatePathQuality(int64_t now)
|
|
|
continue;
|
|
|
}
|
|
|
// Compute/Smooth average of real-world observations
|
|
|
- if (_paths[i].latencySamples.count() == ZT_QOS_SHORTTERM_SAMPLE_WIN_SIZE) {
|
|
|
+ if (_paths[i].latencySamples.count() >= ZT_QOS_SHORTTERM_SAMPLE_WIN_MIN_REQ_SIZE) {
|
|
|
_paths[i].latency = _paths[i].latencySamples.mean();
|
|
|
}
|
|
|
- if (_paths[i].latencySamples.count() == ZT_QOS_SHORTTERM_SAMPLE_WIN_SIZE) {
|
|
|
+ if (_paths[i].latencySamples.count() >= ZT_QOS_SHORTTERM_SAMPLE_WIN_MIN_REQ_SIZE) {
|
|
|
_paths[i].latencyVariance = _paths[i].latencySamples.stddev();
|
|
|
}
|
|
|
|
|
@@ -1344,6 +1370,7 @@ void Bond::estimatePathQuality(int64_t now)
|
|
|
//_paths[i].packetErrorRatio = 1.0 - (_paths[i].packetValiditySamples.count() ? _paths[i].packetValiditySamples.mean() : 1.0);
|
|
|
// _valid is written elsewhere
|
|
|
_paths[i].p->_relativeQuality = _paths[i].relativeQuality;
|
|
|
+ _paths[i].p->_localPort = _paths[i].localPort;
|
|
|
}
|
|
|
|
|
|
// Flag links for avoidance
|
|
@@ -1370,7 +1397,8 @@ void Bond::estimatePathQuality(int64_t now)
|
|
|
shouldAvoid = true;
|
|
|
}
|
|
|
_paths[i].shouldAvoid = shouldAvoid;
|
|
|
- } else {
|
|
|
+ }
|
|
|
+ else {
|
|
|
if (! shouldAvoid) {
|
|
|
log("no longer avoiding link %s", pathToStr(_paths[i].p).c_str());
|
|
|
_paths[i].shouldAvoid = false;
|
|
@@ -1482,7 +1510,8 @@ void Bond::processActiveBackupTasks(void* tPtr, int64_t now)
|
|
|
_lastBondStatusLog = now;
|
|
|
if (_abPathIdx == ZT_MAX_PEER_NETWORK_PATHS) {
|
|
|
log("no active link");
|
|
|
- } else if (_paths[_abPathIdx].p) {
|
|
|
+ }
|
|
|
+ else if (_paths[_abPathIdx].p) {
|
|
|
log("active link is %s, failover queue size is %zu", pathToStr(_paths[_abPathIdx].p).c_str(), _abFailoverQueue.size());
|
|
|
}
|
|
|
if (_abFailoverQueue.empty()) {
|
|
@@ -1590,7 +1619,8 @@ void Bond::processActiveBackupTasks(void* tPtr, int64_t now)
|
|
|
log("link %s is ineligible, removing from failover queue (%zu links remain in queue)", pathToStr(_paths[_abPathIdx].p).c_str(), _abFailoverQueue.size());
|
|
|
}
|
|
|
continue;
|
|
|
- } else {
|
|
|
+ }
|
|
|
+ else {
|
|
|
++it;
|
|
|
}
|
|
|
}
|
|
@@ -1656,7 +1686,7 @@ void Bond::processActiveBackupTasks(void* tPtr, int64_t now)
|
|
|
if (! bFoundPathInQueue) {
|
|
|
_abFailoverQueue.push_front(i);
|
|
|
log("add link %s to failover queue (%zu links in queue)", pathToStr(_paths[i].p).c_str(), _abFailoverQueue.size());
|
|
|
- addPathToBond(0, i);
|
|
|
+ addPathToBond(i, 0);
|
|
|
}
|
|
|
}
|
|
|
}
|
|
@@ -1706,7 +1736,7 @@ void Bond::processActiveBackupTasks(void* tPtr, int64_t now)
|
|
|
if (! bFoundPathInQueue) {
|
|
|
_abFailoverQueue.push_front(i);
|
|
|
log("add link %s to failover queue (%zu links in queue)", pathToStr(_paths[i].p).c_str(), _abFailoverQueue.size());
|
|
|
- addPathToBond(0, i);
|
|
|
+ addPathToBond(i, 0);
|
|
|
}
|
|
|
}
|
|
|
}
|
|
@@ -1739,7 +1769,8 @@ void Bond::processActiveBackupTasks(void* tPtr, int64_t now)
|
|
|
if (! _abFailoverQueue.empty()) {
|
|
|
dequeueNextActiveBackupPath(now);
|
|
|
log("active link switched to %s", pathToStr(_paths[_abPathIdx].p).c_str());
|
|
|
- } else {
|
|
|
+ }
|
|
|
+ else {
|
|
|
log("failover queue is empty, no links to choose from");
|
|
|
}
|
|
|
}
|
|
@@ -1785,7 +1816,8 @@ void Bond::processActiveBackupTasks(void* tPtr, int64_t now)
|
|
|
dequeueNextActiveBackupPath(now);
|
|
|
_lastPathNegotiationCheck = now;
|
|
|
log("switch negotiated link %s (select mode: optimize)", pathToStr(_paths[_abPathIdx].p).c_str());
|
|
|
- } else {
|
|
|
+ }
|
|
|
+ else {
|
|
|
// Try to find a better path and automatically switch to it -- not too often, though.
|
|
|
if ((now - _lastActiveBackupPathChange) > ZT_BOND_OPTIMIZE_INTERVAL) {
|
|
|
if (! _abFailoverQueue.empty()) {
|
|
@@ -1901,7 +1933,7 @@ void Bond::setBondParameters(int policy, SharedPtr<Bond> templateBond, bool useT
|
|
|
}
|
|
|
|
|
|
if (! _isLeaf) {
|
|
|
- _policy = ZT_BOND_POLICY_ACTIVE_BACKUP;
|
|
|
+ _policy = ZT_BOND_POLICY_NONE;
|
|
|
}
|
|
|
|
|
|
// Timer geometry
|