|
@@ -90,7 +90,7 @@ SharedPtr<Bond> Bond::getBondByPeerId(int64_t identity)
|
|
|
return _bonds.count(identity) ? _bonds[identity] : SharedPtr<Bond>();
|
|
|
}
|
|
|
|
|
|
-SharedPtr<Bond> Bond::createTransportTriggeredBond(const RuntimeEnvironment* renv, const SharedPtr<Peer>& peer)
|
|
|
+SharedPtr<Bond> Bond::createBond(const RuntimeEnvironment* renv, const SharedPtr<Peer>& peer)
|
|
|
{
|
|
|
Mutex::Lock _l(_bonds_m);
|
|
|
int64_t identity = peer->identity().address().toInt();
|
|
@@ -145,6 +145,12 @@ SharedPtr<Bond> Bond::createTransportTriggeredBond(const RuntimeEnvironment* ren
|
|
|
return SharedPtr<Bond>();
|
|
|
}
|
|
|
|
|
|
+void Bond::destroyBond(uint64_t peerId)
|
|
|
+{
|
|
|
+ Mutex::Lock _l(_bonds_m);
|
|
|
+ _bonds.erase(peerId);
|
|
|
+}
|
|
|
+
|
|
|
SharedPtr<Link> Bond::getLinkBySocket(const std::string& policyAlias, uint64_t localSocket)
|
|
|
{
|
|
|
Mutex::Lock _l(_links_m);
|
|
@@ -717,6 +723,7 @@ void Bond::sendPATH_NEGOTIATION_REQUEST(void* tPtr, int pathIdx)
|
|
|
if (_paths[pathIdx].p->address()) {
|
|
|
outp.armor(_peer->key(), false, _peer->aesKeysIfSupported());
|
|
|
RR->node->putPacket(tPtr, _paths[pathIdx].p->localSocket(), _paths[pathIdx].p->address(), outp.data(), outp.size());
|
|
|
+ _overheadBytes += outp.size();
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -726,7 +733,6 @@ void Bond::sendQOS_MEASUREMENT(void* tPtr, int pathIdx, int64_t localSocket, con
|
|
|
Packet outp(_peer->_id.address(), RR->identity.address(), Packet::VERB_QOS_MEASUREMENT);
|
|
|
char qosData[ZT_QOS_MAX_PACKET_SIZE];
|
|
|
int16_t len = generateQoSPacket(pathIdx, _now, qosData);
|
|
|
- _overheadBytes += len;
|
|
|
if (len) {
|
|
|
outp.append(qosData, len);
|
|
|
if (atAddress) {
|
|
@@ -738,6 +744,7 @@ void Bond::sendQOS_MEASUREMENT(void* tPtr, int pathIdx, int64_t localSocket, con
|
|
|
}
|
|
|
_paths[pathIdx].packetsReceivedSinceLastQoS = 0;
|
|
|
_paths[pathIdx].lastQoSMeasurement = now;
|
|
|
+ _overheadBytes += outp.size();
|
|
|
}
|
|
|
// debug("send QOS via link %s (len=%d)", pathToStr(_paths[pathIdx].p).c_str(), len);
|
|
|
}
|
|
@@ -761,7 +768,7 @@ void Bond::processBackgroundBondTasks(void* tPtr, int64_t now)
|
|
|
for (unsigned int i = 0; i < ZT_MAX_PEER_NETWORK_PATHS; ++i) {
|
|
|
if (_paths[i].p && _paths[i].allowed()) {
|
|
|
if (_isLeaf) {
|
|
|
- if ((_monitorInterval > 0) && (((now - _paths[i].p->_lastIn) >= _monitorInterval) /*|| ((now - _paths[i].p->_lastOut) >= _monitorInterval)*/)) {
|
|
|
+ if ((_monitorInterval > 0) && (((now - _paths[i].p->_lastIn) >= (_paths[i].alive ? _monitorInterval : _failoverInterval)))) {
|
|
|
if ((_peer->remoteVersionProtocol() >= 5) && (! ((_peer->remoteVersionMajor() == 1) && (_peer->remoteVersionMinor() == 1) && (_peer->remoteVersionRevision() == 0)))) {
|
|
|
Packet outp(_peer->address(), RR->identity.address(), Packet::VERB_ECHO); // ECHO (this is our bond's heartbeat)
|
|
|
outp.armor(_peer->key(), true, _peer->aesKeysIfSupported());
|
|
@@ -815,6 +822,17 @@ void Bond::curateBond(int64_t now, bool rebuildBond)
|
|
|
if (! _paths[i].p) {
|
|
|
continue;
|
|
|
}
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Remove expired links from bond
|
|
|
+ */
|
|
|
+ if ((now - _paths[i].p->_lastIn) > (ZT_PEER_EXPIRED_PATH_TRIAL_PERIOD)) {
|
|
|
+ log("link %s has expired, removing from bond", pathToStr(_paths[i].p).c_str());
|
|
|
+ _paths[i] = NominatedPath();
|
|
|
+ _paths[i].p = SharedPtr<Path>();
|
|
|
+ continue;
|
|
|
+ }
|
|
|
+
|
|
|
tmpNumTotalLinks++;
|
|
|
if (_paths[i].eligible) {
|
|
|
tmpNumAliveLinks++;
|
|
@@ -875,42 +893,18 @@ void Bond::curateBond(int64_t now, bool rebuildBond)
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
- * Determine health status to report to user
|
|
|
+ * Trigger status report if number of links change
|
|
|
*/
|
|
|
_numAliveLinks = tmpNumAliveLinks;
|
|
|
_numTotalLinks = tmpNumTotalLinks;
|
|
|
- bool tmpHealthStatus = true;
|
|
|
-
|
|
|
- if (_policy == ZT_BOND_POLICY_BROADCAST) {
|
|
|
- if (_numAliveLinks < 1) {
|
|
|
- // Considered healthy if we're able to send frames at all
|
|
|
- tmpHealthStatus = false;
|
|
|
- }
|
|
|
- }
|
|
|
- if ((_policy == ZT_BOND_POLICY_BALANCE_RR) || (_policy == ZT_BOND_POLICY_BALANCE_XOR) || (_policy == ZT_BOND_POLICY_BALANCE_AWARE || (_policy == ZT_BOND_POLICY_ACTIVE_BACKUP))) {
|
|
|
- if (_numAliveLinks < _numTotalLinks) {
|
|
|
- tmpHealthStatus = false;
|
|
|
- }
|
|
|
- }
|
|
|
- if (tmpHealthStatus != _isHealthy) {
|
|
|
- std::string healthStatusStr;
|
|
|
- if (tmpHealthStatus == true) {
|
|
|
- healthStatusStr = "HEALTHY";
|
|
|
- }
|
|
|
- else {
|
|
|
- healthStatusStr = "DEGRADED";
|
|
|
- }
|
|
|
- log("bond is %s (%d/%d links)", healthStatusStr.c_str(), _numAliveLinks, _numTotalLinks);
|
|
|
+ if ((_numAliveLinks != tmpNumAliveLinks) || (_numTotalLinks != tmpNumTotalLinks)) {
|
|
|
dumpInfo(now, true);
|
|
|
}
|
|
|
|
|
|
- _isHealthy = tmpHealthStatus;
|
|
|
-
|
|
|
/**
|
|
|
* Curate the set of paths that are part of the bond proper. Select a set of paths
|
|
|
* per logical link according to eligibility and user-specified constraints.
|
|
|
*/
|
|
|
-
|
|
|
if ((_policy == ZT_BOND_POLICY_BALANCE_RR) || (_policy == ZT_BOND_POLICY_BALANCE_XOR) || (_policy == ZT_BOND_POLICY_BALANCE_AWARE)) {
|
|
|
if (! _numBondedPaths) {
|
|
|
rebuildBond = true;
|
|
@@ -1008,14 +1002,14 @@ void Bond::estimatePathQuality(int64_t now)
|
|
|
uint32_t totUserSpecifiedLinkSpeed = 0;
|
|
|
if (_numBondedPaths) { // Compute relative user-specified speeds of links
|
|
|
for (unsigned int i = 0; i < _numBondedPaths; ++i) {
|
|
|
- SharedPtr<Link> link = RR->bc->getLinkBySocket(_policyAlias, _paths[i].p->localSocket());
|
|
|
if (_paths[i].p && _paths[i].allowed()) {
|
|
|
+ SharedPtr<Link> link = RR->bc->getLinkBySocket(_policyAlias, _paths[i].p->localSocket());
|
|
|
totUserSpecifiedLinkSpeed += link->speed();
|
|
|
}
|
|
|
}
|
|
|
for (unsigned int i = 0; i < _numBondedPaths; ++i) {
|
|
|
- SharedPtr<Link> link = RR->bc->getLinkBySocket(_policyAlias, _paths[i].p->localSocket());
|
|
|
if (_paths[i].p && _paths[i].allowed()) {
|
|
|
+ SharedPtr<Link> link = RR->bc->getLinkBySocket(_policyAlias, _paths[i].p->localSocket());
|
|
|
link->setRelativeSpeed((uint8_t)round(((float)link->speed() / (float)totUserSpecifiedLinkSpeed) * 255));
|
|
|
}
|
|
|
}
|
|
@@ -1212,6 +1206,11 @@ void Bond::processActiveBackupTasks(void* tPtr, int64_t now)
|
|
|
int nonPreferredPathIdx;
|
|
|
bool bFoundPrimaryLink = false;
|
|
|
|
|
|
+ if (_abPathIdx != ZT_MAX_PEER_NETWORK_PATHS && !_paths[_abPathIdx].p) {
|
|
|
+ _abPathIdx = ZT_MAX_PEER_NETWORK_PATHS;
|
|
|
+ log("main active-backup path has been removed");
|
|
|
+ }
|
|
|
+
|
|
|
/**
|
|
|
* Generate periodic status report
|
|
|
*/
|
|
@@ -1241,7 +1240,6 @@ void Bond::processActiveBackupTasks(void* tPtr, int64_t now)
|
|
|
* simply find the next eligible path.
|
|
|
*/
|
|
|
if (! userHasSpecifiedLinks()) {
|
|
|
- debug("no user-specified links");
|
|
|
for (int i = 0; i < ZT_MAX_PEER_NETWORK_PATHS; ++i) {
|
|
|
if (_paths[i].p && _paths[i].eligible) {
|
|
|
SharedPtr<Link> link = RR->bc->getLinkBySocket(_policyAlias, _paths[i].p->localSocket());
|
|
@@ -1574,7 +1572,6 @@ void Bond::setBondParameters(int policy, SharedPtr<Bond> templateBond, bool useT
|
|
|
|
|
|
// Bond status
|
|
|
|
|
|
- _isHealthy = false;
|
|
|
_numAliveLinks = 0;
|
|
|
_numTotalLinks = 0;
|
|
|
_numBondedPaths = 0;
|
|
@@ -1684,11 +1681,14 @@ SharedPtr<Link> Bond::getLink(const SharedPtr<Path>& path)
|
|
|
std::string Bond::pathToStr(const SharedPtr<Path>& path)
|
|
|
{
|
|
|
#ifdef ZT_TRACE
|
|
|
- char pathStr[64] = { 0 };
|
|
|
- char fullPathStr[384] = { 0 };
|
|
|
- path->address().toString(pathStr);
|
|
|
- snprintf(fullPathStr, 384, "%.16llx-%s/%s", (unsigned long long)(path->localSocket()), getLink(path)->ifname().c_str(), pathStr);
|
|
|
- return std::string(fullPathStr);
|
|
|
+ if (path) {
|
|
|
+ char pathStr[64] = { 0 };
|
|
|
+ char fullPathStr[384] = { 0 };
|
|
|
+ path->address().toString(pathStr);
|
|
|
+ snprintf(fullPathStr, 384, "%.16llx-%s/%s", (unsigned long long)(path->localSocket()), getLink(path)->ifname().c_str(), pathStr);
|
|
|
+ return std::string(fullPathStr);
|
|
|
+ }
|
|
|
+ return "";
|
|
|
#else
|
|
|
return "";
|
|
|
#endif
|
|
@@ -1727,7 +1727,7 @@ void Bond::dumpInfo(int64_t now, bool force)
|
|
|
_lastSummaryDump = now;
|
|
|
float overhead = (_overheadBytes / (timeSinceLastDump / 1000.0f) / 1000.0f);
|
|
|
_overheadBytes = 0;
|
|
|
- log("bond: bp=%d, fi=%d, mi=%d, ud=%d, dd=%d, flows=%lu, leaf=%d, overhead=%f KB/s",
|
|
|
+ log("bond: bp=%d, fi=%d, mi=%d, ud=%d, dd=%d, flows=%lu, leaf=%d, overhead=%f KB/s, links=(%d/%d)",
|
|
|
_policy,
|
|
|
_failoverInterval,
|
|
|
_monitorInterval,
|
|
@@ -1735,7 +1735,9 @@ void Bond::dumpInfo(int64_t now, bool force)
|
|
|
_downDelay,
|
|
|
(unsigned long)_flows.size(),
|
|
|
_isLeaf,
|
|
|
- overhead);
|
|
|
+ overhead,
|
|
|
+ _numAliveLinks,
|
|
|
+ _numTotalLinks);
|
|
|
for (int i = 0; i < ZT_MAX_PEER_NETWORK_PATHS; ++i) {
|
|
|
if (_paths[i].p) {
|
|
|
dumpPathStatus(now, i);
|