Bond.cpp 72 KB


  1. /*
  2. * Copyright (c)2013-2020 ZeroTier, Inc.
  3. *
  4. * Use of this software is governed by the Business Source License included
  5. * in the LICENSE.TXT file in the project's root directory.
  6. *
  7. * Change Date: 2025-01-01
  8. *
  9. * On the date above, in accordance with the Business Source License, use
  10. * of this software will be governed by version 2.0 of the Apache License.
  11. */
  12. /****/
  13. #include "Bond.hpp"
  14. #include "../osdep/OSUtils.hpp"
  15. #include "Switch.hpp"
  16. #include <cmath>
  17. namespace ZeroTier {
  18. Bond::Bond(const RuntimeEnvironment* renv, int policy, const SharedPtr<Peer>& peer)
  19. : RR(renv)
  20. , _peer(peer)
  21. , _qosCutoffCount(0)
  22. , _ackCutoffCount(0)
  23. , _lastAckRateCheck(0)
  24. , _lastQoSRateCheck(0)
  25. , _lastQualityEstimation(0)
  26. , _lastCheckUserPreferences(0)
  27. , _lastBackgroundTaskCheck(0)
  28. , _lastBondStatusLog(0)
  29. , _lastPathNegotiationReceived(0)
  30. , _lastPathNegotiationCheck(0)
  31. , _lastSentPathNegotiationRequest(0)
  32. , _lastFlowStatReset(0)
  33. , _lastFlowExpirationCheck(0)
  34. , _lastFlowRebalance(0)
  35. , _lastFrame(0)
  36. , _lastActiveBackupPathChange(0)
  37. {
  38. setReasonableDefaults(policy, SharedPtr<Bond>(), false);
  39. _policyAlias = BondController::getPolicyStrByCode(policy);
  40. }
  41. Bond::Bond(const RuntimeEnvironment* renv, std::string& basePolicy, std::string& policyAlias, const SharedPtr<Peer>& peer) : RR(renv), _policyAlias(policyAlias), _peer(peer)
  42. {
  43. setReasonableDefaults(BondController::getPolicyCodeByStr(basePolicy), SharedPtr<Bond>(), false);
  44. }
  45. Bond::Bond(const RuntimeEnvironment* renv, SharedPtr<Bond> originalBond, const SharedPtr<Peer>& peer)
  46. : RR(renv)
  47. , _peer(peer)
  48. , _lastAckRateCheck(0)
  49. , _lastQoSRateCheck(0)
  50. , _lastQualityEstimation(0)
  51. , _lastCheckUserPreferences(0)
  52. , _lastBackgroundTaskCheck(0)
  53. , _lastBondStatusLog(0)
  54. , _lastPathNegotiationReceived(0)
  55. , _lastPathNegotiationCheck(0)
  56. , _lastFlowStatReset(0)
  57. , _lastFlowExpirationCheck(0)
  58. , _lastFlowRebalance(0)
  59. , _lastFrame(0)
  60. {
  61. setReasonableDefaults(originalBond->_bondingPolicy, originalBond, true);
  62. }
  63. void Bond::nominatePath(const SharedPtr<Path>& path, int64_t now)
  64. {
  65. char traceMsg[256];
  66. char pathStr[128];
  67. path->address().toString(pathStr);
  68. Mutex::Lock _l(_paths_m);
  69. if (! RR->bc->linkAllowed(_policyAlias, getLink(path))) {
  70. return;
  71. }
  72. bool alreadyPresent = false;
  73. for (int i = 0; i < ZT_MAX_PEER_NETWORK_PATHS; ++i) {
  74. if (path.ptr() == _paths[i].ptr()) {
  75. // Previously encountered path, not notifying bond
  76. alreadyPresent = true;
  77. break;
  78. }
  79. }
  80. if (! alreadyPresent) {
  81. for (int i = 0; i < ZT_MAX_PEER_NETWORK_PATHS; ++i) {
  82. if (! _paths[i]) {
  83. _paths[i] = path;
  84. sprintf(traceMsg, "%s (bond) Nominating link %s/%s to peer %llx. It has now entered its trial period", OSUtils::humanReadableTimestamp().c_str(), getLink(path)->ifname().c_str(), pathStr, (unsigned long long)(_peer->_id.address().toInt()));
  85. RR->t->bondStateMessage(NULL, traceMsg);
  86. _paths[i]->startTrial(now);
  87. break;
  88. }
  89. }
  90. }
  91. curateBond(now, true);
  92. estimatePathQuality(now);
  93. }
  94. SharedPtr<Path> Bond::getAppropriatePath(int64_t now, int32_t flowId)
  95. {
  96. Mutex::Lock _l(_paths_m);
  97. /**
  98. * active-backup
  99. */
  100. if (_bondingPolicy == ZT_BONDING_POLICY_ACTIVE_BACKUP) {
  101. if (_abPath) {
  102. return _abPath;
  103. }
  104. }
  105. /**
  106. * broadcast
  107. */
  108. if (_bondingPolicy == ZT_BONDING_POLICY_BROADCAST) {
  109. return SharedPtr<Path>(); // Handled in Switch::_trySend()
  110. }
  111. if (! _numBondedPaths) {
  112. return SharedPtr<Path>(); // No paths assigned to bond yet, cannot balance traffic
  113. }
  114. /**
  115. * balance-rr
  116. */
  117. if (_bondingPolicy == ZT_BONDING_POLICY_BALANCE_RR) {
  118. if (! _allowFlowHashing) {
  119. if (_packetsPerLink == 0) {
  120. // Randomly select a path
  121. return _paths[_bondedIdx[_freeRandomByte % _numBondedPaths]]; // TODO: Optimize
  122. }
  123. if (_rrPacketsSentOnCurrLink < _packetsPerLink) {
  124. // Continue to use this link
  125. ++_rrPacketsSentOnCurrLink;
  126. return _paths[_bondedIdx[_rrIdx]];
  127. }
  128. // Reset striping counter
  129. _rrPacketsSentOnCurrLink = 0;
  130. if (_numBondedPaths == 1) {
  131. _rrIdx = 0;
  132. }
  133. else {
  134. int _tempIdx = _rrIdx;
  135. for (int searchCount = 0; searchCount < (_numBondedPaths - 1); searchCount++) {
  136. _tempIdx = (_tempIdx == (_numBondedPaths - 1)) ? 0 : _tempIdx + 1;
  137. if (_bondedIdx[_tempIdx] != ZT_MAX_PEER_NETWORK_PATHS) {
  138. if (_paths[_bondedIdx[_tempIdx]] && _paths[_bondedIdx[_tempIdx]]->eligible(now, _ackSendInterval)) {
  139. _rrIdx = _tempIdx;
  140. break;
  141. }
  142. }
  143. }
  144. }
  145. if (_paths[_bondedIdx[_rrIdx]]) {
  146. return _paths[_bondedIdx[_rrIdx]];
  147. }
  148. }
  149. }
  150. /**
  151. * balance-xor
  152. */
  153. if (_bondingPolicy == ZT_BONDING_POLICY_BALANCE_XOR || _bondingPolicy == ZT_BONDING_POLICY_BALANCE_AWARE) {
  154. if (! _allowFlowHashing || flowId == -1) {
  155. // No specific path required for unclassified traffic, send on anything
  156. return _paths[_bondedIdx[_freeRandomByte % _numBondedPaths]]; // TODO: Optimize
  157. }
  158. else if (_allowFlowHashing) {
  159. // TODO: Optimize
  160. Mutex::Lock _l(_flows_m);
  161. SharedPtr<Flow> flow;
  162. if (_flows.count(flowId)) {
  163. flow = _flows[flowId];
  164. flow->updateActivity(now);
  165. }
  166. else {
  167. unsigned char entropy;
  168. Utils::getSecureRandom(&entropy, 1);
  169. flow = createFlow(SharedPtr<Path>(), flowId, entropy, now);
  170. }
  171. if (flow) {
  172. return flow->assignedPath();
  173. }
  174. }
  175. }
  176. return SharedPtr<Path>();
  177. }
  178. void Bond::recordIncomingInvalidPacket(const SharedPtr<Path>& path)
  179. {
  180. // char traceMsg[256]; char pathStr[128]; path->address().toString(pathStr);
  181. // sprintf(traceMsg, "%s (qos) Invalid packet on link %s/%s from peer %llx",
  182. // OSUtils::humanReadableTimestamp().c_str(), getLink(path)->ifname().c_str(), pathStr, (unsigned long long)(_peer->_id.address().toInt()));
  183. // RR->t->bondStateMessage(NULL, traceMsg);
  184. Mutex::Lock _l(_paths_m);
  185. for (int i = 0; i < ZT_MAX_PEER_NETWORK_PATHS; ++i) {
  186. if (_paths[i] == path) {
  187. _paths[i]->packetValiditySamples.push(false);
  188. }
  189. }
  190. }
  191. void Bond::recordOutgoingPacket(const SharedPtr<Path>& path, const uint64_t packetId, uint16_t payloadLength, const Packet::Verb verb, const int32_t flowId, int64_t now)
  192. {
  193. // char traceMsg[256]; char pathStr[128]; path->address().toString(pathStr);
  194. // sprintf(traceMsg, "%s (bond) Outgoing packet on link %s/%s to peer %llx",
  195. // OSUtils::humanReadableTimestamp().c_str(), getLink(path)->ifname().c_str(), pathStr, (unsigned long long)(_peer->_id.address().toInt()));
  196. // RR->t->bondStateMessage(NULL, traceMsg);
  197. _freeRandomByte += (unsigned char)(packetId >> 8); // Grab entropy to use in path selection logic
  198. if (! _shouldCollectPathStatistics) {
  199. return;
  200. }
  201. bool isFrame = (verb == Packet::VERB_FRAME || verb == Packet::VERB_EXT_FRAME);
  202. bool shouldRecord = (packetId & (ZT_QOS_ACK_DIVISOR - 1) && (verb != Packet::VERB_ACK) && (verb != Packet::VERB_QOS_MEASUREMENT));
  203. if (isFrame || shouldRecord) {
  204. Mutex::Lock _l(_paths_m);
  205. if (isFrame) {
  206. ++(path->_packetsOut);
  207. _lastFrame = now;
  208. }
  209. if (shouldRecord) {
  210. path->_unackedBytes += payloadLength;
  211. // Take note that we're expecting a VERB_ACK on this path as of a specific time
  212. if (path->qosStatsOut.size() < ZT_QOS_MAX_OUTSTANDING_RECORDS) {
  213. path->qosStatsOut[packetId] = now;
  214. }
  215. }
  216. }
  217. if (_allowFlowHashing && (flowId != ZT_QOS_NO_FLOW)) {
  218. Mutex::Lock _l(_flows_m);
  219. if (_flows.count(flowId)) {
  220. _flows[flowId]->recordOutgoingBytes(payloadLength);
  221. }
  222. }
  223. }
  224. void Bond::recordIncomingPacket(const SharedPtr<Path>& path, uint64_t packetId, uint16_t payloadLength, Packet::Verb verb, int32_t flowId, int64_t now)
  225. {
  226. // char traceMsg[256]; char pathStr[128]; path->address().toString(pathStr);
  227. // sprintf(traceMsg, "%s (bond) Incoming packet on link %s/%s from peer %llx [id=%llx, len=%d, verb=%d, flowId=%x]",
  228. // OSUtils::humanReadableTimestamp().c_str(), getLink(path)->ifname().c_str(), pathStr, (unsigned long long)(_peer->_id.address().toInt()), packetId, payloadLength, verb, flowId);
  229. // RR->t->bondStateMessage(NULL, traceMsg);
  230. bool isFrame = (verb == Packet::VERB_FRAME || verb == Packet::VERB_EXT_FRAME);
  231. bool shouldRecord = (packetId & (ZT_QOS_ACK_DIVISOR - 1) && (verb != Packet::VERB_ACK) && (verb != Packet::VERB_QOS_MEASUREMENT));
  232. if (isFrame || shouldRecord) {
  233. Mutex::Lock _l(_paths_m);
  234. if (isFrame) {
  235. ++(path->_packetsIn);
  236. _lastFrame = now;
  237. }
  238. if (shouldRecord) {
  239. path->ackStatsIn[packetId] = payloadLength;
  240. ++(path->_packetsReceivedSinceLastAck);
  241. path->qosStatsIn[packetId] = now;
  242. ++(path->_packetsReceivedSinceLastQoS);
  243. path->packetValiditySamples.push(true);
  244. }
  245. }
  246. /**
  247. * Learn new flows and pro-actively create entries for them in the bond so
  248. * that the next time we send a packet out that is part of a flow we know
  249. * which path to use.
  250. */
  251. if ((flowId != ZT_QOS_NO_FLOW) && (_bondingPolicy == ZT_BONDING_POLICY_BALANCE_RR || _bondingPolicy == ZT_BONDING_POLICY_BALANCE_XOR || _bondingPolicy == ZT_BONDING_POLICY_BALANCE_AWARE)) {
  252. Mutex::Lock _l(_flows_m);
  253. SharedPtr<Flow> flow;
  254. if (! _flows.count(flowId)) {
  255. flow = createFlow(path, flowId, 0, now);
  256. }
  257. else {
  258. flow = _flows[flowId];
  259. }
  260. if (flow) {
  261. flow->recordIncomingBytes(payloadLength);
  262. }
  263. }
  264. }
  265. void Bond::receivedQoS(const SharedPtr<Path>& path, int64_t now, int count, uint64_t* rx_id, uint16_t* rx_ts)
  266. {
  267. Mutex::Lock _l(_paths_m);
  268. // char traceMsg[256]; char pathStr[128]; path->address().toString(pathStr);
  269. // sprintf(traceMsg, "%s (qos) Received QoS packet sampling %d frames from peer %llx via %s/%s",
  270. // OSUtils::humanReadableTimestamp().c_str(), count, (unsigned long long)(_peer->_id.address().toInt()), getLink(path)->ifname().c_str(), pathStr);
  271. // RR->t->bondStateMessage(NULL, traceMsg);
  272. // Look up egress times and compute latency values for each record
  273. std::map<uint64_t, uint64_t>::iterator it;
  274. for (int j = 0; j < count; j++) {
  275. it = path->qosStatsOut.find(rx_id[j]);
  276. if (it != path->qosStatsOut.end()) {
  277. path->latencySamples.push(((uint16_t)(now - it->second) - rx_ts[j]) / 2);
  278. path->qosStatsOut.erase(it);
  279. }
  280. }
  281. path->qosRecordSize.push(count);
  282. }
  283. void Bond::receivedAck(const SharedPtr<Path>& path, int64_t now, int32_t ackedBytes)
  284. {
  285. Mutex::Lock _l(_paths_m);
  286. // char traceMsg[256]; char pathStr[128]; path->address().toString(pathStr);
  287. // sprintf(traceMsg, "%s (qos) Received ACK packet for %d bytes from peer %llx via %s/%s",
  288. // OSUtils::humanReadableTimestamp().c_str(), ackedBytes, (unsigned long long)(_peer->_id.address().toInt()), getLink(path)->ifname().c_str(), pathStr);
  289. // RR->t->bondStateMessage(NULL, traceMsg);
  290. path->_lastAckReceived = now;
  291. path->_unackedBytes = (ackedBytes > path->_unackedBytes) ? 0 : path->_unackedBytes - ackedBytes;
  292. int64_t timeSinceThroughputEstimate = (now - path->_lastThroughputEstimation);
  293. if (timeSinceThroughputEstimate >= throughputMeasurementInterval) {
  294. // TODO: See if this floating point math can be reduced
  295. uint64_t throughput = (uint64_t)((float)(path->_bytesAckedSinceLastThroughputEstimation) / ((float)timeSinceThroughputEstimate / (float)1000));
  296. throughput /= 1000;
  297. if (throughput > 0.0) {
  298. path->throughputSamples.push(throughput);
  299. path->_throughputMax = throughput > path->_throughputMax ? throughput : path->_throughputMax;
  300. }
  301. path->_lastThroughputEstimation = now;
  302. path->_bytesAckedSinceLastThroughputEstimation = 0;
  303. }
  304. else {
  305. path->_bytesAckedSinceLastThroughputEstimation += ackedBytes;
  306. }
  307. }
  308. int32_t Bond::generateQoSPacket(const SharedPtr<Path>& path, int64_t now, char* qosBuffer)
  309. {
  310. int32_t len = 0;
  311. std::map<uint64_t, uint64_t>::iterator it = path->qosStatsIn.begin();
  312. int i = 0;
  313. int numRecords = std::min(path->_packetsReceivedSinceLastQoS, ZT_QOS_TABLE_SIZE);
  314. while (i < numRecords && it != path->qosStatsIn.end()) {
  315. uint64_t id = it->first;
  316. memcpy(qosBuffer, &id, sizeof(uint64_t));
  317. qosBuffer += sizeof(uint64_t);
  318. uint16_t holdingTime = (uint16_t)(now - it->second);
  319. memcpy(qosBuffer, &holdingTime, sizeof(uint16_t));
  320. qosBuffer += sizeof(uint16_t);
  321. len += sizeof(uint64_t) + sizeof(uint16_t);
  322. path->qosStatsIn.erase(it++);
  323. ++i;
  324. }
  325. return len;
  326. }
  327. bool Bond::assignFlowToBondedPath(SharedPtr<Flow>& flow, int64_t now)
  328. {
  329. char traceMsg[256];
  330. char curPathStr[128];
  331. unsigned int idx = ZT_MAX_PEER_NETWORK_PATHS;
  332. if (_bondingPolicy == ZT_BONDING_POLICY_BALANCE_XOR) {
  333. idx = abs((int)(flow->id() % (_numBondedPaths)));
  334. SharedPtr<Link> link = RR->bc->getLinkBySocket(_policyAlias, _paths[_bondedIdx[idx]]->localSocket());
  335. _paths[_bondedIdx[idx]]->address().toString(curPathStr);
  336. sprintf(
  337. traceMsg,
  338. "%s (balance-xor) Assigned outgoing flow %x to peer %llx to link %s/%s, %lu active flow(s)",
  339. OSUtils::humanReadableTimestamp().c_str(),
  340. flow->id(),
  341. (unsigned long long)(_peer->_id.address().toInt()),
  342. link->ifname().c_str(),
  343. curPathStr,
  344. _flows.size());
  345. RR->t->bondStateMessage(NULL, traceMsg);
  346. flow->assignPath(_paths[_bondedIdx[idx]], now);
  347. ++(_paths[_bondedIdx[idx]]->_assignedFlowCount);
  348. }
  349. if (_bondingPolicy == ZT_BONDING_POLICY_BALANCE_AWARE) {
  350. unsigned char entropy;
  351. Utils::getSecureRandom(&entropy, 1);
  352. if (_totalBondUnderload) {
  353. entropy %= _totalBondUnderload;
  354. }
  355. if (! _numBondedPaths) {
  356. sprintf(traceMsg, "%s (balance-aware) There are no bonded paths, cannot assign flow %x\n", OSUtils::humanReadableTimestamp().c_str(), flow->id());
  357. RR->t->bondStateMessage(NULL, traceMsg);
  358. return false;
  359. }
  360. /* Since there may be scenarios where a path is removed before we can re-estimate
  361. relative qualities (and thus allocations) we need to down-modulate the entropy
  362. value that we use to randomly assign among the surviving paths, otherwise we risk
  363. not being able to find a path to assign this flow to. */
  364. int totalIncompleteAllocation = 0;
  365. for (unsigned int i = 0; i < ZT_MAX_PEER_NETWORK_PATHS; ++i) {
  366. if (_paths[i] && _paths[i]->bonded()) {
  367. totalIncompleteAllocation += _paths[i]->_allocation;
  368. }
  369. }
  370. entropy %= totalIncompleteAllocation;
  371. for (unsigned int i = 0; i < ZT_MAX_PEER_NETWORK_PATHS; ++i) {
  372. if (_paths[i] && _paths[i]->bonded()) {
  373. SharedPtr<Link> link = RR->bc->getLinkBySocket(_policyAlias, _paths[i]->localSocket());
  374. _paths[i]->address().toString(curPathStr);
  375. uint8_t probabilitySegment = (_totalBondUnderload > 0) ? _paths[i]->_affinity : _paths[i]->_allocation;
  376. if (entropy <= probabilitySegment) {
  377. idx = i;
  378. break;
  379. }
  380. entropy -= probabilitySegment;
  381. }
  382. }
  383. if (idx < ZT_MAX_PEER_NETWORK_PATHS) {
  384. if (flow->_assignedPath) {
  385. flow->_previouslyAssignedPath = flow->_assignedPath;
  386. }
  387. flow->assignPath(_paths[idx], now);
  388. ++(_paths[idx]->_assignedFlowCount);
  389. }
  390. else {
  391. fprintf(stderr, "could not assign flow?\n");
  392. exit(0); // TODO: Remove for production
  393. return false;
  394. }
  395. }
  396. if (_bondingPolicy == ZT_BONDING_POLICY_ACTIVE_BACKUP) {
  397. if (_abOverflowEnabled) {
  398. flow->assignPath(_abPath, now);
  399. }
  400. else {
  401. sprintf(traceMsg, "%s (bond) Unable to assign outgoing flow %x to peer %llx, no active overflow link", OSUtils::humanReadableTimestamp().c_str(), flow->id(), (unsigned long long)(_peer->_id.address().toInt()));
  402. RR->t->bondStateMessage(NULL, traceMsg);
  403. return false;
  404. }
  405. }
  406. flow->assignedPath()->address().toString(curPathStr);
  407. SharedPtr<Link> link = RR->bc->getLinkBySocket(_policyAlias, flow->assignedPath()->localSocket());
  408. sprintf(
  409. traceMsg,
  410. "%s (bond) Assigned outgoing flow %x to peer %llx to link %s/%s, %lu active flow(s)",
  411. OSUtils::humanReadableTimestamp().c_str(),
  412. flow->id(),
  413. (unsigned long long)(_peer->_id.address().toInt()),
  414. link->ifname().c_str(),
  415. curPathStr,
  416. _flows.size());
  417. RR->t->bondStateMessage(NULL, traceMsg);
  418. return true;
  419. }
  420. SharedPtr<Flow> Bond::createFlow(const SharedPtr<Path>& path, int32_t flowId, unsigned char entropy, int64_t now)
  421. {
  422. char traceMsg[256];
  423. char curPathStr[128];
  424. // ---
  425. if (! _numBondedPaths) {
  426. sprintf(traceMsg, "%s (bond) There are no bonded paths to peer %llx, cannot assign flow %x\n", OSUtils::humanReadableTimestamp().c_str(), (unsigned long long)(_peer->_id.address().toInt()), flowId);
  427. RR->t->bondStateMessage(NULL, traceMsg);
  428. return SharedPtr<Flow>();
  429. }
  430. if (_flows.size() >= ZT_FLOW_MAX_COUNT) {
  431. sprintf(traceMsg, "%s (bond) Maximum number of flows on bond to peer %llx reached (%d), forcibly forgetting oldest flow\n", OSUtils::humanReadableTimestamp().c_str(), (unsigned long long)(_peer->_id.address().toInt()), ZT_FLOW_MAX_COUNT);
  432. RR->t->bondStateMessage(NULL, traceMsg);
  433. forgetFlowsWhenNecessary(0, true, now);
  434. }
  435. SharedPtr<Flow> flow = new Flow(flowId, now);
  436. _flows[flowId] = flow;
  437. /**
  438. * Add a flow with a given Path already provided. This is the case when a packet
  439. * is received on a path but no flow exists, in this case we simply assign the path
  440. * that the remote peer chose for us.
  441. */
  442. if (path) {
  443. flow->assignPath(path, now);
  444. path->address().toString(curPathStr);
  445. path->_assignedFlowCount++;
  446. SharedPtr<Link> link = RR->bc->getLinkBySocket(_policyAlias, flow->assignedPath()->localSocket());
  447. sprintf(
  448. traceMsg,
  449. "%s (bond) Assigned incoming flow %x from peer %llx to link %s/%s, %lu active flow(s)",
  450. OSUtils::humanReadableTimestamp().c_str(),
  451. flow->id(),
  452. (unsigned long long)(_peer->_id.address().toInt()),
  453. link->ifname().c_str(),
  454. curPathStr,
  455. _flows.size());
  456. RR->t->bondStateMessage(NULL, traceMsg);
  457. }
  458. /**
  459. * Add a flow when no path was provided. This means that it is an outgoing packet
  460. * and that it is up to the local peer to decide how to load-balance its transmission.
  461. */
  462. else if (! path) {
  463. assignFlowToBondedPath(flow, now);
  464. }
  465. return flow;
  466. }
  467. void Bond::forgetFlowsWhenNecessary(uint64_t age, bool oldest, int64_t now)
  468. {
  469. char traceMsg[256];
  470. std::map<int32_t, SharedPtr<Flow> >::iterator it = _flows.begin();
  471. std::map<int32_t, SharedPtr<Flow> >::iterator oldestFlow = _flows.end();
  472. SharedPtr<Flow> expiredFlow;
  473. if (age) { // Remove by specific age
  474. while (it != _flows.end()) {
  475. if (it->second->age(now) > age) {
  476. sprintf(traceMsg, "%s (bond) Forgetting flow %x between this node and peer %llx, %lu active flow(s)", OSUtils::humanReadableTimestamp().c_str(), it->first, (unsigned long long)(_peer->_id.address().toInt()), (_flows.size() - 1));
  477. RR->t->bondStateMessage(NULL, traceMsg);
  478. it->second->assignedPath()->_assignedFlowCount--;
  479. it = _flows.erase(it);
  480. }
  481. else {
  482. ++it;
  483. }
  484. }
  485. }
  486. else if (oldest) { // Remove single oldest by natural expiration
  487. uint64_t maxAge = 0;
  488. while (it != _flows.end()) {
  489. if (it->second->age(now) > maxAge) {
  490. maxAge = (now - it->second->age(now));
  491. oldestFlow = it;
  492. }
  493. ++it;
  494. }
  495. if (oldestFlow != _flows.end()) {
  496. sprintf(
  497. traceMsg,
  498. "%s (bond) Forgetting oldest flow %x (of age %llu) between this node and peer %llx, %lu active flow(s)",
  499. OSUtils::humanReadableTimestamp().c_str(),
  500. oldestFlow->first,
  501. oldestFlow->second->age(now),
  502. (unsigned long long)(_peer->_id.address().toInt()),
  503. (_flows.size() - 1));
  504. RR->t->bondStateMessage(NULL, traceMsg);
  505. oldestFlow->second->assignedPath()->_assignedFlowCount--;
  506. _flows.erase(oldestFlow);
  507. }
  508. }
  509. }
  510. void Bond::processIncomingPathNegotiationRequest(uint64_t now, SharedPtr<Path>& path, int16_t remoteUtility)
  511. {
  512. char traceMsg[256];
  513. if (_abLinkSelectMethod != ZT_MULTIPATH_RESELECTION_POLICY_OPTIMIZE) {
  514. return;
  515. }
  516. Mutex::Lock _l(_paths_m);
  517. char pathStr[128];
  518. path->address().toString(pathStr);
  519. if (! _lastPathNegotiationCheck) {
  520. return;
  521. }
  522. SharedPtr<Link> link = RR->bc->getLinkBySocket(_policyAlias, path->localSocket());
  523. if (remoteUtility > _localUtility) {
  524. char pathStr[128];
  525. path->address().toString(pathStr);
  526. sprintf(
  527. traceMsg,
  528. "%s (bond) Peer %llx suggests using alternate link %s/%s. Remote utility (%d) is GREATER than local utility (%d), switching to said link\n",
  529. OSUtils::humanReadableTimestamp().c_str(),
  530. (unsigned long long)(_peer->_id.address().toInt()),
  531. link->ifname().c_str(),
  532. pathStr,
  533. remoteUtility,
  534. _localUtility);
  535. RR->t->bondStateMessage(NULL, traceMsg);
  536. negotiatedPath = path;
  537. }
  538. if (remoteUtility < _localUtility) {
  539. sprintf(
  540. traceMsg,
  541. "%s (bond) Peer %llx suggests using alternate link %s/%s. Remote utility (%d) is LESS than local utility (%d), not switching\n",
  542. OSUtils::humanReadableTimestamp().c_str(),
  543. (unsigned long long)(_peer->_id.address().toInt()),
  544. link->ifname().c_str(),
  545. pathStr,
  546. remoteUtility,
  547. _localUtility);
  548. RR->t->bondStateMessage(NULL, traceMsg);
  549. }
  550. if (remoteUtility == _localUtility) {
  551. sprintf(
  552. traceMsg,
  553. "%s (bond) Peer %llx suggests using alternate link %s/%s. Remote utility (%d) is equal to local utility (%d)\n",
  554. OSUtils::humanReadableTimestamp().c_str(),
  555. _peer->_id.address().toInt(),
  556. link->ifname().c_str(),
  557. pathStr,
  558. remoteUtility,
  559. _localUtility);
  560. RR->t->bondStateMessage(NULL, traceMsg);
  561. if (_peer->_id.address().toInt() > RR->node->identity().address().toInt()) {
  562. sprintf(traceMsg, "%s (bond) Agreeing with peer %llx to use alternate link %s/%s\n", OSUtils::humanReadableTimestamp().c_str(), (unsigned long long)(_peer->_id.address().toInt()), link->ifname().c_str(), pathStr);
  563. RR->t->bondStateMessage(NULL, traceMsg);
  564. negotiatedPath = path;
  565. }
  566. else {
  567. sprintf(traceMsg, "%s (bond) Ignoring petition from peer %llx to use alternate link %s/%s\n", OSUtils::humanReadableTimestamp().c_str(), (unsigned long long)(_peer->_id.address().toInt()), link->ifname().c_str(), pathStr);
  568. RR->t->bondStateMessage(NULL, traceMsg);
  569. }
  570. }
  571. }
  572. void Bond::pathNegotiationCheck(void* tPtr, const int64_t now)
  573. {
  574. char pathStr[128];
  575. int maxInPathIdx = ZT_MAX_PEER_NETWORK_PATHS;
  576. int maxOutPathIdx = ZT_MAX_PEER_NETWORK_PATHS;
  577. uint64_t maxInCount = 0;
  578. uint64_t maxOutCount = 0;
  579. for (unsigned int i = 0; i < ZT_MAX_PEER_NETWORK_PATHS; ++i) {
  580. if (! _paths[i]) {
  581. continue;
  582. }
  583. if (_paths[i]->_packetsIn > maxInCount) {
  584. maxInCount = _paths[i]->_packetsIn;
  585. maxInPathIdx = i;
  586. }
  587. if (_paths[i]->_packetsOut > maxOutCount) {
  588. maxOutCount = _paths[i]->_packetsOut;
  589. maxOutPathIdx = i;
  590. }
  591. _paths[i]->resetPacketCounts();
  592. }
  593. bool _peerLinksSynchronized = ((maxInPathIdx != ZT_MAX_PEER_NETWORK_PATHS) && (maxOutPathIdx != ZT_MAX_PEER_NETWORK_PATHS) && (maxInPathIdx != maxOutPathIdx)) ? false : true;
  594. /**
  595. * Determine utility and attempt to petition remote peer to switch to our chosen path
  596. */
  597. if (! _peerLinksSynchronized) {
  598. _localUtility = _paths[maxOutPathIdx]->_failoverScore - _paths[maxInPathIdx]->_failoverScore;
  599. if (_paths[maxOutPathIdx]->_negotiated) {
  600. _localUtility -= ZT_MULTIPATH_FAILOVER_HANDICAP_NEGOTIATED;
  601. }
  602. if ((now - _lastSentPathNegotiationRequest) > ZT_PATH_NEGOTIATION_CUTOFF_TIME) {
  603. // fprintf(stderr, "BT: (sync) it's been long enough, sending more requests.\n");
  604. _numSentPathNegotiationRequests = 0;
  605. }
  606. if (_numSentPathNegotiationRequests < ZT_PATH_NEGOTIATION_TRY_COUNT) {
  607. if (_localUtility >= 0) {
  608. // fprintf(stderr, "BT: (sync) paths appear to be out of sync (utility=%d)\n", _localUtility);
  609. sendPATH_NEGOTIATION_REQUEST(tPtr, _paths[maxOutPathIdx]);
  610. ++_numSentPathNegotiationRequests;
  611. _lastSentPathNegotiationRequest = now;
  612. _paths[maxOutPathIdx]->address().toString(pathStr);
  613. SharedPtr<Link> link = RR->bc->getLinkBySocket(_policyAlias, _paths[maxOutPathIdx]->localSocket());
  614. // fprintf(stderr, "sending request to use %s on %s, ls=%llx, utility=%d\n", pathStr, link->ifname().c_str(), _paths[maxOutPathIdx]->localSocket(), _localUtility);
  615. }
  616. }
  617. /**
  618. * Give up negotiating and consider switching
  619. */
  620. else if ((now - _lastSentPathNegotiationRequest) > (2 * ZT_PATH_NEGOTIATION_CHECK_INTERVAL)) {
  621. if (_localUtility == 0) {
  622. // There's no loss to us, just switch without sending a another request
  623. // fprintf(stderr, "BT: (sync) giving up, switching to remote peer's path.\n");
  624. negotiatedPath = _paths[maxInPathIdx];
  625. }
  626. }
  627. }
  628. }
  629. void Bond::sendPATH_NEGOTIATION_REQUEST(void* tPtr, const SharedPtr<Path>& path)
  630. {
  631. char traceMsg[256];
  632. char pathStr[128];
  633. path->address().toString(pathStr);
  634. sprintf(
  635. traceMsg,
  636. "%s (bond) Sending link negotiation request to peer %llx via link %s/%s, local utility is %d",
  637. OSUtils::humanReadableTimestamp().c_str(),
  638. (unsigned long long)(_peer->_id.address().toInt()),
  639. getLink(path)->ifname().c_str(),
  640. pathStr,
  641. _localUtility);
  642. RR->t->bondStateMessage(NULL, traceMsg);
  643. if (_abLinkSelectMethod != ZT_MULTIPATH_RESELECTION_POLICY_OPTIMIZE) {
  644. return;
  645. }
  646. Packet outp(_peer->_id.address(), RR->identity.address(), Packet::VERB_PATH_NEGOTIATION_REQUEST);
  647. outp.append<int16_t>(_localUtility);
  648. if (path->address()) {
  649. outp.armor(_peer->key(), false, _peer->aesKeysIfSupported());
  650. RR->node->putPacket(tPtr, path->localSocket(), path->address(), outp.data(), outp.size());
  651. }
  652. }
  653. void Bond::sendACK(void* tPtr, const SharedPtr<Path>& path, const int64_t localSocket, const InetAddress& atAddress, int64_t now)
  654. {
  655. Packet outp(_peer->_id.address(), RR->identity.address(), Packet::VERB_ACK);
  656. int32_t bytesToAck = 0;
  657. std::map<uint64_t, uint16_t>::iterator it = path->ackStatsIn.begin();
  658. while (it != path->ackStatsIn.end()) {
  659. bytesToAck += it->second;
  660. ++it;
  661. }
  662. // char traceMsg[256]; char pathStr[128]; path->address().toString(pathStr);
  663. // sprintf(traceMsg, "%s (qos) Sending ACK packet for %d bytes to peer %llx via link %s/%s",
  664. // OSUtils::humanReadableTimestamp().c_str(), bytesToAck, (unsigned long long)(_peer->_id.address().toInt()), getLink(path)->ifname().c_str(), pathStr);
  665. // RR->t->bondStateMessage(NULL, traceMsg);
  666. outp.append<uint32_t>(bytesToAck);
  667. if (atAddress) {
  668. outp.armor(_peer->key(), false, _peer->aesKeysIfSupported());
  669. RR->node->putPacket(tPtr, localSocket, atAddress, outp.data(), outp.size());
  670. }
  671. else {
  672. RR->sw->send(tPtr, outp, false);
  673. }
  674. path->ackStatsIn.clear();
  675. path->_packetsReceivedSinceLastAck = 0;
  676. path->_lastAckSent = now;
  677. }
  678. void Bond::sendQOS_MEASUREMENT(void* tPtr, const SharedPtr<Path>& path, const int64_t localSocket, const InetAddress& atAddress, int64_t now)
  679. {
  680. // char traceMsg[256]; char pathStr[128]; path->address().toString(pathStr);
  681. // sprintf(traceMsg, "%s (qos) Sending QoS packet to peer %llx via link %s/%s",
  682. // OSUtils::humanReadableTimestamp().c_str(), (unsigned long long)(_peer->_id.address().toInt()), getLink(path)->ifname().c_str(), pathStr);
  683. // RR->t->bondStateMessage(NULL, traceMsg);
  684. const int64_t _now = RR->node->now();
  685. Packet outp(_peer->_id.address(), RR->identity.address(), Packet::VERB_QOS_MEASUREMENT);
  686. char qosData[ZT_QOS_MAX_PACKET_SIZE];
  687. int16_t len = generateQoSPacket(path, _now, qosData);
  688. outp.append(qosData, len);
  689. if (atAddress) {
  690. outp.armor(_peer->key(), false, _peer->aesKeysIfSupported());
  691. RR->node->putPacket(tPtr, localSocket, atAddress, outp.data(), outp.size());
  692. }
  693. else {
  694. RR->sw->send(tPtr, outp, false);
  695. }
  696. // Account for the fact that a VERB_QOS_MEASUREMENT was just sent. Reset timers.
  697. path->_packetsReceivedSinceLastQoS = 0;
  698. path->_lastQoSMeasurement = now;
  699. }
  700. void Bond::processBackgroundTasks(void* tPtr, const int64_t now)
  701. {
  702. Mutex::Lock _l(_paths_m);
  703. if (! _peer->_canUseMultipath || (now - _lastBackgroundTaskCheck) < ZT_BOND_BACKGROUND_TASK_MIN_INTERVAL) {
  704. return;
  705. }
  706. _lastBackgroundTaskCheck = now;
  707. // Compute dynamic path monitor timer interval
  708. if (_linkMonitorStrategy == ZT_MULTIPATH_SLAVE_MONITOR_STRATEGY_DYNAMIC) {
  709. int suggestedMonitorInterval = (now - _lastFrame) / 100;
  710. _dynamicPathMonitorInterval = std::min(ZT_PATH_HEARTBEAT_PERIOD, ((suggestedMonitorInterval > _bondMonitorInterval) ? suggestedMonitorInterval : _bondMonitorInterval));
  711. }
  712. // TODO: Clarify and generalize this logic
  713. if (_linkMonitorStrategy == ZT_MULTIPATH_SLAVE_MONITOR_STRATEGY_DYNAMIC) {
  714. _shouldCollectPathStatistics = true;
  715. }
  716. // Memoize oft-used properties in the packet ingress/egress logic path
  717. if (_bondingPolicy == ZT_BONDING_POLICY_BALANCE_AWARE) {
  718. // Required for real-time balancing
  719. _shouldCollectPathStatistics = true;
  720. }
  721. if (_bondingPolicy == ZT_BONDING_POLICY_ACTIVE_BACKUP) {
  722. if (_abLinkSelectMethod == ZT_MULTIPATH_RESELECTION_POLICY_BETTER) {
  723. // Required for judging suitability of primary link after recovery
  724. _shouldCollectPathStatistics = true;
  725. }
  726. if (_abLinkSelectMethod == ZT_MULTIPATH_RESELECTION_POLICY_OPTIMIZE) {
  727. // Required for judging suitability of new candidate primary
  728. _shouldCollectPathStatistics = true;
  729. }
  730. }
  731. if ((now - _lastCheckUserPreferences) > 1000) {
  732. _lastCheckUserPreferences = now;
  733. applyUserPrefs();
  734. }
  735. curateBond(now, false);
  736. if ((now - _lastQualityEstimation) > _qualityEstimationInterval) {
  737. _lastQualityEstimation = now;
  738. estimatePathQuality(now);
  739. }
  740. dumpInfo(now);
  741. // Send QOS/ACK packets as needed
  742. if (_shouldCollectPathStatistics) {
  743. for (unsigned int i = 0; i < ZT_MAX_PEER_NETWORK_PATHS; ++i) {
  744. if (_paths[i] && _paths[i]->allowed()) {
  745. if (_paths[i]->needsToSendQoS(now, _qosSendInterval)) {
  746. sendQOS_MEASUREMENT(tPtr, _paths[i], _paths[i]->localSocket(), _paths[i]->address(), now);
  747. }
  748. if (_paths[i]->needsToSendAck(now, _ackSendInterval)) {
  749. sendACK(tPtr, _paths[i], _paths[i]->localSocket(), _paths[i]->address(), now);
  750. }
  751. }
  752. }
  753. }
  754. // Perform periodic background tasks unique to each bonding policy
  755. switch (_bondingPolicy) {
  756. case ZT_BONDING_POLICY_ACTIVE_BACKUP:
  757. processActiveBackupTasks(tPtr, now);
  758. break;
  759. case ZT_BONDING_POLICY_BROADCAST:
  760. break;
  761. case ZT_BONDING_POLICY_BALANCE_RR:
  762. case ZT_BONDING_POLICY_BALANCE_XOR:
  763. case ZT_BONDING_POLICY_BALANCE_AWARE:
  764. processBalanceTasks(now);
  765. break;
  766. default:
  767. break;
  768. }
  769. // Check whether or not a path negotiation needs to be performed
  770. if (((now - _lastPathNegotiationCheck) > ZT_PATH_NEGOTIATION_CHECK_INTERVAL) && _allowPathNegotiation) {
  771. _lastPathNegotiationCheck = now;
  772. pathNegotiationCheck(tPtr, now);
  773. }
  774. }
  775. void Bond::applyUserPrefs()
  776. {
  777. for (unsigned int i = 0; i < ZT_MAX_PEER_NETWORK_PATHS; ++i) {
  778. if (! _paths[i]) {
  779. continue;
  780. }
  781. SharedPtr<Link> sl = getLink(_paths[i]);
  782. if (sl) {
  783. if (sl->monitorInterval() == 0) { // If no interval was specified for this link, use more generic bond-wide interval
  784. sl->setMonitorInterval(_bondMonitorInterval);
  785. }
  786. RR->bc->setMinReqPathMonitorInterval((sl->monitorInterval() < RR->bc->minReqPathMonitorInterval()) ? sl->monitorInterval() : RR->bc->minReqPathMonitorInterval());
  787. bool bFoundCommonLink = false;
  788. SharedPtr<Link> commonLink = RR->bc->getLinkBySocket(_policyAlias, _paths[i]->localSocket());
  789. for (unsigned int j = 0; j < ZT_MAX_PEER_NETWORK_PATHS; ++j) {
  790. if (_paths[j] && _paths[j].ptr() != _paths[i].ptr()) {
  791. if (RR->bc->getLinkBySocket(_policyAlias, _paths[j]->localSocket()) == commonLink) {
  792. bFoundCommonLink = true;
  793. }
  794. }
  795. }
  796. _paths[i]->_monitorInterval = sl->monitorInterval();
  797. _paths[i]->_upDelay = sl->upDelay() ? sl->upDelay() : _upDelay;
  798. _paths[i]->_downDelay = sl->downDelay() ? sl->downDelay() : _downDelay;
  799. _paths[i]->_ipvPref = sl->ipvPref();
  800. _paths[i]->_mode = sl->mode();
  801. _paths[i]->_enabled = sl->enabled();
  802. _paths[i]->_onlyPathOnLink = ! bFoundCommonLink;
  803. }
  804. }
  805. if (_peer) {
  806. _peer->_shouldCollectPathStatistics = _shouldCollectPathStatistics;
  807. _peer->_bondingPolicy = _bondingPolicy;
  808. }
  809. }
  810. void Bond::curateBond(const int64_t now, bool rebuildBond)
  811. {
  812. char traceMsg[256];
  813. char pathStr[128];
  814. uint8_t tmpNumAliveLinks = 0;
  815. uint8_t tmpNumTotalLinks = 0;
  816. /**
  817. * Update path states
  818. */
  819. for (unsigned int i = 0; i < ZT_MAX_PEER_NETWORK_PATHS; ++i) {
  820. if (! _paths[i]) {
  821. continue;
  822. }
  823. tmpNumTotalLinks++;
  824. if (_paths[i]->alive(now, true)) {
  825. tmpNumAliveLinks++;
  826. }
  827. bool currEligibility = _paths[i]->eligible(now, _ackSendInterval);
  828. if (currEligibility != _paths[i]->_lastEligibilityState) {
  829. _paths[i]->address().toString(pathStr);
  830. char traceMsg[256];
  831. _paths[i]->address().toString(pathStr);
  832. sprintf(
  833. traceMsg,
  834. "%s (bond) Eligibility of link %s/%s to peer %llx has changed from %d to %d",
  835. OSUtils::humanReadableTimestamp().c_str(),
  836. getLink(_paths[i])->ifname().c_str(),
  837. pathStr,
  838. (unsigned long long)(_peer->_id.address().toInt()),
  839. _paths[i]->_lastEligibilityState,
  840. currEligibility);
  841. RR->t->bondStateMessage(NULL, traceMsg);
  842. if (currEligibility) {
  843. rebuildBond = true;
  844. }
  845. if (! currEligibility) {
  846. _paths[i]->adjustRefractoryPeriod(now, _defaultPathRefractoryPeriod, ! currEligibility);
  847. if (_paths[i]->bonded()) {
  848. char pathStr[128];
  849. _paths[i]->address().toString(pathStr);
  850. sprintf(
  851. traceMsg,
  852. "%s (bond) Link %s/%s to peer %llx was bonded, reallocation of its flows will occur soon",
  853. OSUtils::humanReadableTimestamp().c_str(),
  854. getLink(_paths[i])->ifname().c_str(),
  855. pathStr,
  856. _peer->_id.address().toInt());
  857. RR->t->bondStateMessage(NULL, traceMsg);
  858. rebuildBond = true;
  859. _paths[i]->_shouldReallocateFlows = _paths[i]->bonded();
  860. _paths[i]->setBonded(false);
  861. }
  862. else {
  863. sprintf(traceMsg, "%s (bond) Link %s/%s to peer %llx was not bonded, no allocation consequences", OSUtils::humanReadableTimestamp().c_str(), getLink(_paths[i])->ifname().c_str(), pathStr, (unsigned long long)(_peer->_id.address().toInt()));
  864. RR->t->bondStateMessage(NULL, traceMsg);
  865. }
  866. }
  867. }
  868. if (currEligibility) {
  869. _paths[i]->adjustRefractoryPeriod(now, _defaultPathRefractoryPeriod, false);
  870. }
  871. _paths[i]->_lastEligibilityState = currEligibility;
  872. }
  873. _numAliveLinks = tmpNumAliveLinks;
  874. _numTotalLinks = tmpNumTotalLinks;
  875. /* Determine health status to report to user */
  876. bool tmpHealthStatus = true;
  877. if (_bondingPolicy == ZT_BONDING_POLICY_ACTIVE_BACKUP) {
  878. if (_numAliveLinks < 2) {
  879. // Considered healthy if there is at least one failover link
  880. tmpHealthStatus = false;
  881. }
  882. }
  883. if (_bondingPolicy == ZT_BONDING_POLICY_BROADCAST) {
  884. if (_numAliveLinks < 1) {
  885. // Considered healthy if we're able to send frames at all
  886. tmpHealthStatus = false;
  887. }
  888. }
  889. if (_bondingPolicy == ZT_BONDING_POLICY_BALANCE_RR) {
  890. if (_numAliveLinks < _numTotalLinks) {
  891. // Considered healthy if all known paths are alive, this should be refined to account for user bond config settings
  892. tmpHealthStatus = false;
  893. }
  894. }
  895. if (_bondingPolicy == ZT_BONDING_POLICY_BALANCE_XOR) {
  896. if (_numAliveLinks < _numTotalLinks) {
  897. // Considered healthy if all known paths are alive, this should be refined to account for user bond config settings
  898. tmpHealthStatus = false;
  899. }
  900. }
  901. if (_bondingPolicy == ZT_BONDING_POLICY_BALANCE_AWARE) {
  902. if (_numAliveLinks < _numTotalLinks) {
  903. // Considered healthy if all known paths are alive, this should be refined to account for user bond config settings
  904. tmpHealthStatus = false;
  905. }
  906. }
  907. if (tmpHealthStatus != _isHealthy) {
  908. std::string healthStatusStr;
  909. if (tmpHealthStatus == true) {
  910. healthStatusStr = "HEALTHY";
  911. }
  912. else {
  913. healthStatusStr = "DEGRADED";
  914. }
  915. sprintf(traceMsg, "%s (bond) Bond to peer %llx is in a %s state (%d/%d links)", OSUtils::humanReadableTimestamp().c_str(), (unsigned long long)(_peer->_id.address().toInt()), healthStatusStr.c_str(), _numAliveLinks, _numTotalLinks);
  916. RR->t->bondStateMessage(NULL, traceMsg);
  917. }
  918. _isHealthy = tmpHealthStatus;
  919. /**
  920. * Curate the set of paths that are part of the bond proper. Selects a single path
  921. * per logical link according to eligibility and user-specified constraints.
  922. */
  923. if ((_bondingPolicy == ZT_BONDING_POLICY_BALANCE_RR) || (_bondingPolicy == ZT_BONDING_POLICY_BALANCE_XOR) || (_bondingPolicy == ZT_BONDING_POLICY_BALANCE_AWARE)) {
  924. if (! _numBondedPaths) {
  925. rebuildBond = true;
  926. }
  927. // TODO: Optimize
  928. if (rebuildBond) {
  929. int updatedBondedPathCount = 0;
  930. std::map<SharedPtr<Link>, int> linkMap;
  931. for (int i = 0; i < ZT_MAX_PEER_NETWORK_PATHS; ++i) {
  932. if (_paths[i] && _paths[i]->allowed() && (_paths[i]->eligible(now, _ackSendInterval) || ! _numBondedPaths)) {
  933. SharedPtr<Link> link = RR->bc->getLinkBySocket(_policyAlias, _paths[i]->localSocket());
  934. if (! linkMap.count(link)) {
  935. linkMap[link] = i;
  936. }
  937. else {
  938. bool overriden = false;
  939. _paths[i]->address().toString(pathStr);
  940. // fprintf(stderr, " link representative path already exists! (%s %s)\n", getLink(_paths[i])->ifname().c_str(), pathStr);
  941. if (_paths[i]->preferred() && ! _paths[linkMap[link]]->preferred()) {
  942. // Override previous choice if preferred
  943. if (_paths[linkMap[link]]->_assignedFlowCount) {
  944. _paths[linkMap[link]]->_deprecated = true;
  945. }
  946. else {
  947. _paths[linkMap[link]]->_deprecated = true;
  948. _paths[linkMap[link]]->setBonded(false);
  949. }
  950. linkMap[link] = i;
  951. overriden = true;
  952. }
  953. if ((_paths[i]->preferred() && _paths[linkMap[link]]->preferred()) || (! _paths[i]->preferred() && ! _paths[linkMap[link]]->preferred())) {
  954. if (_paths[i]->preferenceRank() > _paths[linkMap[link]]->preferenceRank()) {
  955. // Override if higher preference
  956. if (_paths[linkMap[link]]->_assignedFlowCount) {
  957. _paths[linkMap[link]]->_deprecated = true;
  958. }
  959. else {
  960. _paths[linkMap[link]]->_deprecated = true;
  961. _paths[linkMap[link]]->setBonded(false);
  962. }
  963. linkMap[link] = i;
  964. }
  965. }
  966. }
  967. }
  968. }
  969. std::map<SharedPtr<Link>, int>::iterator it = linkMap.begin();
  970. for (int i = 0; i < ZT_MAX_PEER_NETWORK_PATHS; ++i) {
  971. if (! _paths[i]) {
  972. continue;
  973. }
  974. _bondedIdx[i] = ZT_MAX_PEER_NETWORK_PATHS;
  975. if (it != linkMap.end()) {
  976. _bondedIdx[i] = it->second;
  977. _paths[_bondedIdx[i]]->setBonded(true);
  978. ++it;
  979. ++updatedBondedPathCount;
  980. _paths[_bondedIdx[i]]->address().toString(pathStr);
  981. // fprintf(stderr, "setting i=%d, _bondedIdx[%d]=%d to bonded (%s %s)\n", i, i, _bondedIdx[i], getLink(_paths[_bondedIdx[i]])->ifname().c_str(), pathStr);
  982. }
  983. }
  984. _numBondedPaths = updatedBondedPathCount;
  985. if (_bondingPolicy == ZT_BONDING_POLICY_BALANCE_RR) {
  986. // Cause a RR reset since the currently used index might no longer be valid
  987. _rrPacketsSentOnCurrLink = _packetsPerLink;
  988. }
  989. }
  990. }
  991. }
  992. void Bond::estimatePathQuality(const int64_t now)
  993. {
  994. char pathStr[128];
  995. uint32_t totUserSpecifiedLinkSpeed = 0;
  996. if (_numBondedPaths) { // Compute relative user-specified speeds of links
  997. for (unsigned int i = 0; i < _numBondedPaths; ++i) {
  998. SharedPtr<Link> link = RR->bc->getLinkBySocket(_policyAlias, _paths[i]->localSocket());
  999. if (_paths[i] && _paths[i]->allowed()) {
  1000. totUserSpecifiedLinkSpeed += link->speed();
  1001. }
  1002. }
  1003. for (unsigned int i = 0; i < _numBondedPaths; ++i) {
  1004. SharedPtr<Link> link = RR->bc->getLinkBySocket(_policyAlias, _paths[i]->localSocket());
  1005. if (_paths[i] && _paths[i]->allowed()) {
  1006. link->setRelativeSpeed(round(((float)link->speed() / (float)totUserSpecifiedLinkSpeed) * 255));
  1007. }
  1008. }
  1009. }
  1010. float lat[ZT_MAX_PEER_NETWORK_PATHS];
  1011. float pdv[ZT_MAX_PEER_NETWORK_PATHS];
  1012. float plr[ZT_MAX_PEER_NETWORK_PATHS];
  1013. float per[ZT_MAX_PEER_NETWORK_PATHS];
  1014. float maxLAT = 0;
  1015. float maxPDV = 0;
  1016. float maxPLR = 0;
  1017. float maxPER = 0;
  1018. float quality[ZT_MAX_PEER_NETWORK_PATHS];
  1019. uint8_t alloc[ZT_MAX_PEER_NETWORK_PATHS];
  1020. float totQuality = 0.0f;
  1021. memset(&lat, 0, sizeof(lat));
  1022. memset(&pdv, 0, sizeof(pdv));
  1023. memset(&plr, 0, sizeof(plr));
  1024. memset(&per, 0, sizeof(per));
  1025. memset(&quality, 0, sizeof(quality));
  1026. memset(&alloc, 0, sizeof(alloc));
  1027. // Compute initial summary statistics
  1028. for (unsigned int i = 0; i < ZT_MAX_PEER_NETWORK_PATHS; ++i) {
  1029. if (! _paths[i] || ! _paths[i]->allowed()) {
  1030. continue;
  1031. }
  1032. // Compute/Smooth average of real-world observations
  1033. _paths[i]->_latencyMean = _paths[i]->latencySamples.mean();
  1034. _paths[i]->_latencyVariance = _paths[i]->latencySamples.stddev();
  1035. _paths[i]->_packetErrorRatio = 1.0 - (_paths[i]->packetValiditySamples.count() ? _paths[i]->packetValiditySamples.mean() : 1.0);
  1036. if (userHasSpecifiedLinkSpeeds()) {
  1037. // Use user-reported metrics
  1038. SharedPtr<Link> link = RR->bc->getLinkBySocket(_policyAlias, _paths[i]->localSocket());
  1039. if (link) {
  1040. _paths[i]->_throughputMean = link->speed();
  1041. _paths[i]->_throughputVariance = 0;
  1042. }
  1043. }
  1044. // Drain unacknowledged QoS records
  1045. std::map<uint64_t, uint64_t>::iterator it = _paths[i]->qosStatsOut.begin();
  1046. uint64_t currentLostRecords = 0;
  1047. while (it != _paths[i]->qosStatsOut.end()) {
  1048. int qosRecordTimeout = 5000; //_paths[i]->monitorInterval() * ZT_MULTIPATH_QOS_ACK_INTERVAL_MULTIPLIER * 8;
  1049. if ((now - it->second) >= qosRecordTimeout) {
  1050. // Packet was lost
  1051. it = _paths[i]->qosStatsOut.erase(it);
  1052. ++currentLostRecords;
  1053. }
  1054. else {
  1055. ++it;
  1056. }
  1057. }
  1058. quality[i] = 0;
  1059. totQuality = 0;
  1060. // Normalize raw observations according to sane limits and/or user specified values
  1061. lat[i] = 1.0 / expf(4 * Utils::normalize(_paths[i]->_latencyMean, 0, _maxAcceptableLatency, 0, 1));
  1062. pdv[i] = 1.0 / expf(4 * Utils::normalize(_paths[i]->_latencyVariance, 0, _maxAcceptablePacketDelayVariance, 0, 1));
  1063. plr[i] = 1.0 / expf(4 * Utils::normalize(_paths[i]->_packetLossRatio, 0, _maxAcceptablePacketLossRatio, 0, 1));
  1064. per[i] = 1.0 / expf(4 * Utils::normalize(_paths[i]->_packetErrorRatio, 0, _maxAcceptablePacketErrorRatio, 0, 1));
  1065. // Record bond-wide maximums to determine relative values
  1066. maxLAT = lat[i] > maxLAT ? lat[i] : maxLAT;
  1067. maxPDV = pdv[i] > maxPDV ? pdv[i] : maxPDV;
  1068. maxPLR = plr[i] > maxPLR ? plr[i] : maxPLR;
  1069. maxPER = per[i] > maxPER ? per[i] : maxPER;
  1070. }
  1071. // Convert metrics to relative quantities and apply contribution weights
  1072. for (unsigned int i = 0; i < ZT_MAX_PEER_NETWORK_PATHS; ++i) {
  1073. if (_paths[i] && _paths[i]->bonded()) {
  1074. quality[i] += ((maxLAT > 0.0f ? lat[i] / maxLAT : 0.0f) * _qualityWeights[ZT_QOS_LAT_IDX]);
  1075. quality[i] += ((maxPDV > 0.0f ? pdv[i] / maxPDV : 0.0f) * _qualityWeights[ZT_QOS_PDV_IDX]);
  1076. quality[i] += ((maxPLR > 0.0f ? plr[i] / maxPLR : 0.0f) * _qualityWeights[ZT_QOS_PLR_IDX]);
  1077. quality[i] += ((maxPER > 0.0f ? per[i] / maxPER : 0.0f) * _qualityWeights[ZT_QOS_PER_IDX]);
  1078. totQuality += quality[i];
  1079. }
  1080. }
  1081. // Normalize to 8-bit allocation values
  1082. for (unsigned int i = 0; i < ZT_MAX_PEER_NETWORK_PATHS; ++i) {
  1083. if (_paths[i] && _paths[i]->bonded()) {
  1084. alloc[i] = std::ceil((quality[i] / totQuality) * (float)255);
  1085. _paths[i]->_allocation = alloc[i];
  1086. }
  1087. }
  1088. }
  1089. void Bond::processBalanceTasks(const int64_t now)
  1090. {
  1091. char curPathStr[128];
  1092. // TODO: Generalize
  1093. int totalAllocation = 0;
  1094. for (int i = 0; i < ZT_MAX_PEER_NETWORK_PATHS; ++i) {
  1095. if (! _paths[i]) {
  1096. continue;
  1097. }
  1098. if (_paths[i] && _paths[i]->bonded() && _paths[i]->eligible(now, _ackSendInterval)) {
  1099. totalAllocation += _paths[i]->_allocation;
  1100. }
  1101. }
  1102. unsigned char minimumAllocationValue = 0.33 * ((float)totalAllocation / (float)_numBondedPaths);
  1103. if (_allowFlowHashing) {
  1104. /**
  1105. * Clean up and reset flows if necessary
  1106. */
  1107. if ((now - _lastFlowExpirationCheck) > ZT_MULTIPATH_FLOW_CHECK_INTERVAL) {
  1108. Mutex::Lock _l(_flows_m);
  1109. forgetFlowsWhenNecessary(ZT_MULTIPATH_FLOW_EXPIRATION_INTERVAL, false, now);
  1110. _lastFlowExpirationCheck = now;
  1111. }
  1112. if ((now - _lastFlowStatReset) > ZT_FLOW_STATS_RESET_INTERVAL) {
  1113. Mutex::Lock _l(_flows_m);
  1114. _lastFlowStatReset = now;
  1115. std::map<int32_t, SharedPtr<Flow> >::iterator it = _flows.begin();
  1116. while (it != _flows.end()) {
  1117. it->second->resetByteCounts();
  1118. ++it;
  1119. }
  1120. }
  1121. /**
  1122. * Re-allocate flows from dead paths
  1123. */
  1124. if (_bondingPolicy == ZT_BONDING_POLICY_BALANCE_XOR || _bondingPolicy == ZT_BONDING_POLICY_BALANCE_AWARE) {
  1125. Mutex::Lock _l(_flows_m);
  1126. for (int i = 0; i < ZT_MAX_PEER_NETWORK_PATHS; ++i) {
  1127. if (! _paths[i]) {
  1128. continue;
  1129. }
  1130. if (! _paths[i]->eligible(now, _ackSendInterval) && _paths[i]->_shouldReallocateFlows) {
  1131. char traceMsg[256];
  1132. char pathStr[128];
  1133. _paths[i]->address().toString(pathStr);
  1134. sprintf(
  1135. traceMsg,
  1136. "%s (balance-*) Reallocating flows to peer %llx from dead link %s/%s to surviving links",
  1137. OSUtils::humanReadableTimestamp().c_str(),
  1138. (unsigned long long)(_peer->_id.address().toInt()),
  1139. getLink(_paths[i])->ifname().c_str(),
  1140. pathStr);
  1141. RR->t->bondStateMessage(NULL, traceMsg);
  1142. std::map<int32_t, SharedPtr<Flow> >::iterator flow_it = _flows.begin();
  1143. while (flow_it != _flows.end()) {
  1144. if (flow_it->second->assignedPath() == _paths[i]) {
  1145. if (assignFlowToBondedPath(flow_it->second, now)) {
  1146. _paths[i]->_assignedFlowCount--;
  1147. }
  1148. }
  1149. ++flow_it;
  1150. }
  1151. _paths[i]->_shouldReallocateFlows = false;
  1152. }
  1153. }
  1154. }
  1155. /**
  1156. * Re-allocate flows from under-performing
  1157. * NOTE: This could be part of the above block but was kept separate for clarity.
  1158. */
  1159. if (_bondingPolicy == ZT_BONDING_POLICY_BALANCE_XOR || _bondingPolicy == ZT_BONDING_POLICY_BALANCE_AWARE) {
  1160. Mutex::Lock _l(_flows_m);
  1161. for (int i = 0; i < ZT_MAX_PEER_NETWORK_PATHS; ++i) {
  1162. if (! _paths[i]) {
  1163. continue;
  1164. }
  1165. if (_paths[i] && _paths[i]->bonded() && _paths[i]->eligible(now, _ackSendInterval) && (_paths[i]->_allocation < minimumAllocationValue) && _paths[i]->_assignedFlowCount) {
  1166. _paths[i]->address().toString(curPathStr);
  1167. char traceMsg[256];
  1168. char pathStr[128];
  1169. _paths[i]->address().toString(pathStr);
  1170. sprintf(
  1171. traceMsg,
  1172. "%s (balance-*) Reallocating flows to peer %llx from under-performing link %s/%s\n",
  1173. OSUtils::humanReadableTimestamp().c_str(),
  1174. (unsigned long long)(_peer->_id.address().toInt()),
  1175. getLink(_paths[i])->ifname().c_str(),
  1176. pathStr);
  1177. RR->t->bondStateMessage(NULL, traceMsg);
  1178. std::map<int32_t, SharedPtr<Flow> >::iterator flow_it = _flows.begin();
  1179. while (flow_it != _flows.end()) {
  1180. if (flow_it->second->assignedPath() == _paths[i]) {
  1181. if (assignFlowToBondedPath(flow_it->second, now)) {
  1182. _paths[i]->_assignedFlowCount--;
  1183. }
  1184. }
  1185. ++flow_it;
  1186. }
  1187. _paths[i]->_shouldReallocateFlows = false;
  1188. }
  1189. }
  1190. }
  1191. }
  1192. /**
  1193. * Tasks specific to (Balance Round Robin)
  1194. */
  1195. if (_bondingPolicy == ZT_BONDING_POLICY_BALANCE_RR) {
  1196. // Nothing
  1197. }
  1198. /**
  1199. * Tasks specific to (Balance XOR)
  1200. */
  1201. if (_bondingPolicy == ZT_BONDING_POLICY_BALANCE_XOR) {
  1202. // Nothing
  1203. }
  1204. /**
  1205. * Tasks specific to (Balance Aware)
  1206. */
  1207. if (_bondingPolicy == ZT_BONDING_POLICY_BALANCE_AWARE) {
  1208. if (_allowFlowHashing) {
  1209. Mutex::Lock _l(_flows_m);
  1210. if (_flowRebalanceStrategy == ZT_MULTIPATH_FLOW_REBALANCE_STRATEGY_PASSIVE) {
  1211. // Do nothing here, this is taken care of in the more general case above.
  1212. }
  1213. if (_flowRebalanceStrategy == ZT_MULTIPATH_FLOW_REBALANCE_STRATEGY_OPPORTUNISTIC) {
  1214. // If the flow is temporarily inactive we should take this opportunity to re-assign the flow if needed.
  1215. }
  1216. if (_flowRebalanceStrategy == ZT_MULTIPATH_FLOW_REBALANCE_STRATEGY_AGGRESSIVE) {
  1217. /**
  1218. * Return flows to the original path if it has once again become available
  1219. */
  1220. if ((now - _lastFlowRebalance) > ZT_FLOW_REBALANCE_INTERVAL) {
  1221. std::map<int32_t, SharedPtr<Flow> >::iterator flow_it = _flows.begin();
  1222. while (flow_it != _flows.end()) {
  1223. if (flow_it->second->_previouslyAssignedPath && flow_it->second->_previouslyAssignedPath->eligible(now, _ackSendInterval) && (flow_it->second->_previouslyAssignedPath->_allocation >= (minimumAllocationValue * 2))) {
  1224. // fprintf(stderr, "moving flow back onto its previous path assignment (based on eligibility)\n");
  1225. (flow_it->second->_assignedPath->_assignedFlowCount)--;
  1226. flow_it->second->assignPath(flow_it->second->_previouslyAssignedPath, now);
  1227. (flow_it->second->_previouslyAssignedPath->_assignedFlowCount)++;
  1228. }
  1229. ++flow_it;
  1230. }
  1231. _lastFlowRebalance = now;
  1232. }
  1233. /**
  1234. * Return flows to the original path if it has once again become (performant)
  1235. */
  1236. if ((now - _lastFlowRebalance) > ZT_FLOW_REBALANCE_INTERVAL) {
  1237. std::map<int32_t, SharedPtr<Flow> >::iterator flow_it = _flows.begin();
  1238. while (flow_it != _flows.end()) {
  1239. if (flow_it->second->_previouslyAssignedPath && flow_it->second->_previouslyAssignedPath->eligible(now, _ackSendInterval) && (flow_it->second->_previouslyAssignedPath->_allocation >= (minimumAllocationValue * 2))) {
  1240. // fprintf(stderr, "moving flow back onto its previous path assignment (based on performance)\n");
  1241. (flow_it->second->_assignedPath->_assignedFlowCount)--;
  1242. flow_it->second->assignPath(flow_it->second->_previouslyAssignedPath, now);
  1243. (flow_it->second->_previouslyAssignedPath->_assignedFlowCount)++;
  1244. }
  1245. ++flow_it;
  1246. }
  1247. _lastFlowRebalance = now;
  1248. }
  1249. }
  1250. }
  1251. else if (! _allowFlowHashing) {
  1252. // Nothing
  1253. }
  1254. }
  1255. }
  1256. void Bond::dequeueNextActiveBackupPath(const uint64_t now)
  1257. {
  1258. if (_abFailoverQueue.empty()) {
  1259. return;
  1260. }
  1261. _abPath = _abFailoverQueue.front();
  1262. _abFailoverQueue.pop_front();
  1263. _lastActiveBackupPathChange = now;
  1264. for (int i = 0; i < ZT_MAX_PEER_NETWORK_PATHS; ++i) {
  1265. if (_paths[i]) {
  1266. _paths[i]->resetPacketCounts();
  1267. }
  1268. }
  1269. }
  1270. bool Bond::abForciblyRotateLink()
  1271. {
  1272. char traceMsg[256];
  1273. char prevPathStr[128];
  1274. char curPathStr[128];
  1275. if (_bondingPolicy == ZT_BONDING_POLICY_ACTIVE_BACKUP) {
  1276. SharedPtr<Path> prevPath = _abPath;
  1277. _abPath->address().toString(prevPathStr);
  1278. dequeueNextActiveBackupPath(RR->node->now());
  1279. _abPath->address().toString(curPathStr);
  1280. sprintf(
  1281. traceMsg,
  1282. "%s (active-backup) Forcibly rotating peer %llx link from %s/%s to %s/%s",
  1283. OSUtils::humanReadableTimestamp().c_str(),
  1284. (unsigned long long)(_peer->_id.address().toInt()),
  1285. getLink(prevPath)->ifname().c_str(),
  1286. prevPathStr,
  1287. getLink(_abPath)->ifname().c_str(),
  1288. curPathStr);
  1289. RR->t->bondStateMessage(NULL, traceMsg);
  1290. return true;
  1291. }
  1292. return false;
  1293. }
  1294. void Bond::processActiveBackupTasks(void* tPtr, const int64_t now)
  1295. {
  1296. char traceMsg[256];
  1297. char pathStr[128];
  1298. char prevPathStr[128];
  1299. char curPathStr[128];
  1300. SharedPtr<Path> prevActiveBackupPath = _abPath;
  1301. SharedPtr<Path> nonPreferredPath;
  1302. bool bFoundPrimaryLink = false;
  1303. /**
  1304. * Generate periodic status report
  1305. */
  1306. if ((now - _lastBondStatusLog) > ZT_MULTIPATH_BOND_STATUS_INTERVAL) {
  1307. _lastBondStatusLog = now;
  1308. if (_abPath) {
  1309. _abPath->address().toString(curPathStr);
  1310. sprintf(
  1311. traceMsg,
  1312. "%s (active-backup) Active link to peer %llx is %s/%s, failover queue size is %zu",
  1313. OSUtils::humanReadableTimestamp().c_str(),
  1314. (unsigned long long)(_peer->_id.address().toInt()),
  1315. getLink(_abPath)->ifname().c_str(),
  1316. curPathStr,
  1317. _abFailoverQueue.size());
  1318. RR->t->bondStateMessage(NULL, traceMsg);
  1319. }
  1320. else {
  1321. sprintf(traceMsg, "%s (active-backup) No active link to peer %llx", OSUtils::humanReadableTimestamp().c_str(), (unsigned long long)(_peer->_id.address().toInt()));
  1322. RR->t->bondStateMessage(NULL, traceMsg);
  1323. }
  1324. if (_abFailoverQueue.empty()) {
  1325. sprintf(traceMsg, "%s (active-backup) Failover queue is empty, bond to peer %llx is NOT currently fault-tolerant", OSUtils::humanReadableTimestamp().c_str(), (unsigned long long)(_peer->_id.address().toInt()));
  1326. RR->t->bondStateMessage(NULL, traceMsg);
  1327. }
  1328. }
  1329. /**
  1330. * Select initial "active" active-backup link
  1331. */
  1332. if (! _abPath) {
  1333. /**
  1334. * [Automatic mode]
  1335. * The user has not explicitly specified links or their failover schedule,
  1336. * the bonding policy will now select the first eligible path and set it as
  1337. * its active backup path, if a substantially better path is detected the bonding
  1338. * policy will assign it as the new active backup path. If the path fails it will
  1339. * simply find the next eligible path.
  1340. */
  1341. if (! userHasSpecifiedLinks()) {
  1342. sprintf(traceMsg, "%s (active-backup) No links to peer %llx specified. Searching...", OSUtils::humanReadableTimestamp().c_str(), (unsigned long long)(_peer->_id.address().toInt()));
  1343. RR->t->bondStateMessage(NULL, traceMsg);
  1344. for (int i = 0; i < ZT_MAX_PEER_NETWORK_PATHS; ++i) {
  1345. if (_paths[i] && _paths[i]->eligible(now, _ackSendInterval)) {
  1346. _paths[i]->address().toString(curPathStr);
  1347. SharedPtr<Link> link = RR->bc->getLinkBySocket(_policyAlias, _paths[i]->localSocket());
  1348. if (link) {
  1349. sprintf(traceMsg, "%s (active-backup) Found eligible link %s/%s to peer %llx", OSUtils::humanReadableTimestamp().c_str(), getLink(_paths[i])->ifname().c_str(), curPathStr, (unsigned long long)(_peer->_id.address().toInt()));
  1350. RR->t->bondStateMessage(NULL, traceMsg);
  1351. }
  1352. _abPath = _paths[i];
  1353. break;
  1354. }
  1355. }
  1356. }
  1357. /**
  1358. * [Manual mode]
  1359. * The user has specified links or failover rules that the bonding policy should adhere to.
  1360. */
  1361. else if (userHasSpecifiedLinks()) {
  1362. if (userHasSpecifiedPrimaryLink()) {
  1363. // sprintf(traceMsg, "%s (active-backup) Checking local.conf for user-specified primary link\n", OSUtils::humanReadableTimestamp().c_str());
  1364. for (int i = 0; i < ZT_MAX_PEER_NETWORK_PATHS; ++i) {
  1365. if (! _paths[i]) {
  1366. continue;
  1367. }
  1368. SharedPtr<Link> link = RR->bc->getLinkBySocket(_policyAlias, _paths[i]->localSocket());
  1369. if (_paths[i]->eligible(now, _ackSendInterval) && link->primary()) {
  1370. if (! _paths[i]->preferred()) {
  1371. _paths[i]->address().toString(curPathStr);
  1372. // Found path on primary link, take note in case we don't find a preferred path
  1373. nonPreferredPath = _paths[i];
  1374. bFoundPrimaryLink = true;
  1375. }
  1376. if (_paths[i]->preferred()) {
  1377. _abPath = _paths[i];
  1378. _abPath->address().toString(curPathStr);
  1379. SharedPtr<Link> link = RR->bc->getLinkBySocket(_policyAlias, _paths[i]->localSocket());
  1380. bFoundPrimaryLink = true;
  1381. break; // Found preferred path %s on primary link
  1382. }
  1383. }
  1384. }
  1385. if (_abPath) {
  1386. _abPath->address().toString(curPathStr);
  1387. SharedPtr<Link> link = RR->bc->getLinkBySocket(_policyAlias, _abPath->localSocket());
  1388. if (link) {
  1389. sprintf(traceMsg, "%s (active-backup) Found preferred primary link %s/%s to peer %llx", OSUtils::humanReadableTimestamp().c_str(), getLink(_abPath)->ifname().c_str(), curPathStr, (unsigned long long)(_peer->_id.address().toInt()));
  1390. RR->t->bondStateMessage(NULL, traceMsg);
  1391. }
  1392. }
  1393. else {
  1394. if (bFoundPrimaryLink && nonPreferredPath) {
  1395. sprintf(traceMsg, "%s (active-backup) Found non-preferred primary link to peer %llx", OSUtils::humanReadableTimestamp().c_str(), (unsigned long long)(_peer->_id.address().toInt()));
  1396. RR->t->bondStateMessage(NULL, traceMsg);
  1397. _abPath = nonPreferredPath;
  1398. }
  1399. }
  1400. if (! _abPath) {
  1401. sprintf(traceMsg, "%s (active-backup) Designated primary link to peer %llx is not yet ready", OSUtils::humanReadableTimestamp().c_str(), (unsigned long long)(_peer->_id.address().toInt()));
  1402. RR->t->bondStateMessage(NULL, traceMsg);
  1403. // TODO: Should wait for some time (failover interval?) and then switch to spare link
  1404. }
  1405. }
  1406. else if (! userHasSpecifiedPrimaryLink()) {
  1407. int _abIdx = ZT_MAX_PEER_NETWORK_PATHS;
  1408. sprintf(traceMsg, "%s (active-backup) User did not specify a primary link to peer %llx, selecting first available link", OSUtils::humanReadableTimestamp().c_str(), (unsigned long long)(_peer->_id.address().toInt()));
  1409. RR->t->bondStateMessage(NULL, traceMsg);
  1410. for (int i = 0; i < ZT_MAX_PEER_NETWORK_PATHS; ++i) {
  1411. if (_paths[i] && _paths[i]->eligible(now, _ackSendInterval)) {
  1412. _abIdx = i;
  1413. break;
  1414. }
  1415. }
  1416. if (_abIdx == ZT_MAX_PEER_NETWORK_PATHS) {
  1417. // Unable to find a candidate next-best, no change
  1418. }
  1419. else {
  1420. _abPath = _paths[_abIdx];
  1421. SharedPtr<Link> link = RR->bc->getLinkBySocket(_policyAlias, _abPath->localSocket());
  1422. if (link) {
  1423. _abPath->address().toString(curPathStr);
  1424. sprintf(traceMsg, "%s (active-backup) Selected non-primary link %s/%s to peer %llx", OSUtils::humanReadableTimestamp().c_str(), getLink(_abPath)->ifname().c_str(), curPathStr, (unsigned long long)(_peer->_id.address().toInt()));
  1425. RR->t->bondStateMessage(NULL, traceMsg);
  1426. }
  1427. }
  1428. }
  1429. }
  1430. }
  1431. /**
  1432. * Update and maintain the active-backup failover queue
  1433. */
  1434. if (_abPath) {
  1435. // Don't worry about the failover queue until we have an active link
  1436. // Remove ineligible paths from the failover link queue
  1437. for (std::list<SharedPtr<Path> >::iterator it(_abFailoverQueue.begin()); it != _abFailoverQueue.end();) {
  1438. if ((*it) && ! (*it)->eligible(now, _ackSendInterval)) {
  1439. (*it)->address().toString(curPathStr);
  1440. SharedPtr<Link> link = RR->bc->getLinkBySocket(_policyAlias, (*it)->localSocket());
  1441. it = _abFailoverQueue.erase(it);
  1442. if (link) {
  1443. sprintf(
  1444. traceMsg,
  1445. "%s (active-backup) Link %s/%s to peer %llx is now ineligible, removing from failover queue, there are %zu links in the queue",
  1446. OSUtils::humanReadableTimestamp().c_str(),
  1447. getLink(_abPath)->ifname().c_str(),
  1448. curPathStr,
  1449. (unsigned long long)(_peer->_id.address().toInt()),
  1450. _abFailoverQueue.size());
  1451. RR->t->bondStateMessage(NULL, traceMsg);
  1452. }
  1453. }
  1454. else {
  1455. ++it;
  1456. }
  1457. }
  1458. /**
  1459. * Failover instructions were provided by user, build queue according those as well as IPv
  1460. * preference, disregarding performance.
  1461. */
  1462. if (userHasSpecifiedFailoverInstructions()) {
  1463. /**
  1464. * Clear failover scores
  1465. */
  1466. for (int i = 0; i < ZT_MAX_PEER_NETWORK_PATHS; ++i) {
  1467. if (_paths[i]) {
  1468. _paths[i]->_failoverScore = 0;
  1469. }
  1470. }
  1471. // Follow user-specified failover instructions
  1472. for (int i = 0; i < ZT_MAX_PEER_NETWORK_PATHS; ++i) {
  1473. if (! _paths[i] || ! _paths[i]->allowed() || ! _paths[i]->eligible(now, _ackSendInterval)) {
  1474. continue;
  1475. }
  1476. SharedPtr<Link> link = RR->bc->getLinkBySocket(_policyAlias, _paths[i]->localSocket());
  1477. _paths[i]->address().toString(pathStr);
  1478. int failoverScoreHandicap = _paths[i]->_failoverScore;
  1479. if (_paths[i]->preferred()) {
  1480. failoverScoreHandicap += ZT_MULTIPATH_FAILOVER_HANDICAP_PREFERRED;
  1481. }
  1482. if (link->primary()) {
  1483. // If using "optimize" primary reselect mode, ignore user link designations
  1484. failoverScoreHandicap += ZT_MULTIPATH_FAILOVER_HANDICAP_PRIMARY;
  1485. }
  1486. if (! _paths[i]->_failoverScore) {
  1487. // If we didn't inherit a failover score from a "parent" that wants to use this path as a failover
  1488. int newHandicap = failoverScoreHandicap ? failoverScoreHandicap : _paths[i]->_allocation;
  1489. _paths[i]->_failoverScore = newHandicap;
  1490. }
  1491. SharedPtr<Link> failoverLink;
  1492. if (link->failoverToLink().length()) {
  1493. failoverLink = RR->bc->getLinkByName(_policyAlias, link->failoverToLink());
  1494. }
  1495. if (failoverLink) {
  1496. for (int j = 0; j < ZT_MAX_PEER_NETWORK_PATHS; j++) {
  1497. if (_paths[j] && getLink(_paths[j]) == failoverLink.ptr()) {
  1498. _paths[j]->address().toString(pathStr);
  1499. int inheritedHandicap = failoverScoreHandicap - 10;
  1500. int newHandicap = _paths[j]->_failoverScore > inheritedHandicap ? _paths[j]->_failoverScore : inheritedHandicap;
  1501. if (! _paths[j]->preferred()) {
  1502. newHandicap--;
  1503. }
  1504. _paths[j]->_failoverScore = newHandicap;
  1505. }
  1506. }
  1507. }
  1508. if (_paths[i].ptr() != _abPath.ptr()) {
  1509. bool bFoundPathInQueue = false;
  1510. for (std::list<SharedPtr<Path> >::iterator it(_abFailoverQueue.begin()); it != _abFailoverQueue.end(); ++it) {
  1511. if (_paths[i].ptr() == (*it).ptr()) {
  1512. bFoundPathInQueue = true;
  1513. }
  1514. }
  1515. if (! bFoundPathInQueue) {
  1516. _abFailoverQueue.push_front(_paths[i]);
  1517. _paths[i]->address().toString(curPathStr);
  1518. sprintf(
  1519. traceMsg,
  1520. "%s (active-backup) Added link %s/%s to peer %llx to failover queue, there are %zu links in the queue",
  1521. OSUtils::humanReadableTimestamp().c_str(),
  1522. getLink(_abPath)->ifname().c_str(),
  1523. curPathStr,
  1524. (unsigned long long)(_peer->_id.address().toInt()),
  1525. _abFailoverQueue.size());
  1526. RR->t->bondStateMessage(NULL, traceMsg);
  1527. }
  1528. }
  1529. }
  1530. }
  1531. /**
  1532. * No failover instructions provided by user, build queue according to performance
  1533. * and IPv preference.
  1534. */
  1535. else if (! userHasSpecifiedFailoverInstructions()) {
  1536. for (int i = 0; i < ZT_MAX_PEER_NETWORK_PATHS; ++i) {
  1537. if (! _paths[i] || ! _paths[i]->allowed() || ! _paths[i]->eligible(now, _ackSendInterval)) {
  1538. continue;
  1539. }
  1540. int failoverScoreHandicap = 0;
  1541. if (_paths[i]->preferred()) {
  1542. failoverScoreHandicap = ZT_MULTIPATH_FAILOVER_HANDICAP_PREFERRED;
  1543. }
  1544. bool includeRefractoryPeriod = true;
  1545. if (! _paths[i]->eligible(now, includeRefractoryPeriod)) {
  1546. failoverScoreHandicap = -10000;
  1547. }
  1548. if (getLink(_paths[i])->primary() && _abLinkSelectMethod != ZT_MULTIPATH_RESELECTION_POLICY_OPTIMIZE) {
  1549. // If using "optimize" primary reselect mode, ignore user link designations
  1550. failoverScoreHandicap = ZT_MULTIPATH_FAILOVER_HANDICAP_PRIMARY;
  1551. }
  1552. if (_paths[i].ptr() == negotiatedPath.ptr()) {
  1553. _paths[i]->_negotiated = true;
  1554. failoverScoreHandicap = ZT_MULTIPATH_FAILOVER_HANDICAP_NEGOTIATED;
  1555. }
  1556. else {
  1557. _paths[i]->_negotiated = false;
  1558. }
  1559. _paths[i]->_failoverScore = _paths[i]->_allocation + failoverScoreHandicap;
  1560. if (_paths[i].ptr() != _abPath.ptr()) {
  1561. bool bFoundPathInQueue = false;
  1562. for (std::list<SharedPtr<Path> >::iterator it(_abFailoverQueue.begin()); it != _abFailoverQueue.end(); ++it) {
  1563. if (_paths[i].ptr() == (*it).ptr()) {
  1564. bFoundPathInQueue = true;
  1565. }
  1566. }
  1567. if (! bFoundPathInQueue) {
  1568. _abFailoverQueue.push_front(_paths[i]);
  1569. _paths[i]->address().toString(curPathStr);
  1570. sprintf(
  1571. traceMsg,
  1572. "%s (active-backup) Added link %s/%s to peer %llx to failover queue, there are %zu links in the queue",
  1573. OSUtils::humanReadableTimestamp().c_str(),
  1574. getLink(_paths[i])->ifname().c_str(),
  1575. curPathStr,
  1576. (unsigned long long)(_peer->_id.address().toInt()),
  1577. _abFailoverQueue.size());
  1578. RR->t->bondStateMessage(NULL, traceMsg);
  1579. }
  1580. }
  1581. }
  1582. }
  1583. _abFailoverQueue.sort(PathQualityComparator());
  1584. }
  1585. /**
  1586. * Short-circuit if we have no queued paths
  1587. */
  1588. if (_abFailoverQueue.empty()) {
  1589. return;
  1590. }
  1591. /**
  1592. * Fulfill primary reselect obligations
  1593. */
  1594. if (_abPath && ! _abPath->eligible(now, _ackSendInterval)) { // Implicit ZT_MULTIPATH_RESELECTION_POLICY_FAILURE
  1595. _abPath->address().toString(curPathStr);
  1596. sprintf(
  1597. traceMsg,
  1598. "%s (active-backup) Link %s/%s to peer %llx has failed. Selecting new link from failover queue, there are %zu links in the queue",
  1599. OSUtils::humanReadableTimestamp().c_str(),
  1600. getLink(_abPath)->ifname().c_str(),
  1601. curPathStr,
  1602. (unsigned long long)(_peer->_id.address().toInt()),
  1603. _abFailoverQueue.size());
  1604. RR->t->bondStateMessage(NULL, traceMsg);
  1605. if (! _abFailoverQueue.empty()) {
  1606. dequeueNextActiveBackupPath(now);
  1607. _abPath->address().toString(curPathStr);
  1608. sprintf(traceMsg, "%s (active-backup) Active link to peer %llx has been switched to %s/%s", OSUtils::humanReadableTimestamp().c_str(), (unsigned long long)(_peer->_id.address().toInt()), getLink(_abPath)->ifname().c_str(), curPathStr);
  1609. RR->t->bondStateMessage(NULL, traceMsg);
  1610. }
  1611. else {
  1612. sprintf(traceMsg, "%s (active-backup) Failover queue is empty. No links to peer %llx to choose from", OSUtils::humanReadableTimestamp().c_str(), (unsigned long long)(_peer->_id.address().toInt()));
  1613. RR->t->bondStateMessage(NULL, traceMsg);
  1614. }
  1615. }
  1616. /**
  1617. * Detect change to prevent flopping during later optimization step.
  1618. */
  1619. if (prevActiveBackupPath != _abPath) {
  1620. _lastActiveBackupPathChange = now;
  1621. }
  1622. if (_abLinkSelectMethod == ZT_MULTIPATH_RESELECTION_POLICY_ALWAYS) {
  1623. if (_abPath && ! getLink(_abPath)->primary() && getLink(_abFailoverQueue.front())->primary()) {
  1624. dequeueNextActiveBackupPath(now);
  1625. _abPath->address().toString(curPathStr);
  1626. sprintf(
  1627. traceMsg,
  1628. "%s (active-backup) Switching back to available primary link %s/%s to peer %llx [linkSelectionMethod = always]",
  1629. OSUtils::humanReadableTimestamp().c_str(),
  1630. getLink(_abPath)->ifname().c_str(),
  1631. curPathStr,
  1632. (unsigned long long)(_peer->_id.address().toInt()));
  1633. RR->t->bondStateMessage(NULL, traceMsg);
  1634. }
  1635. }
  1636. if (_abLinkSelectMethod == ZT_MULTIPATH_RESELECTION_POLICY_BETTER) {
  1637. if (_abPath && ! getLink(_abPath)->primary()) {
  1638. // Active backup has switched to "better" primary link according to re-select policy.
  1639. if (getLink(_abFailoverQueue.front())->primary() && (_abFailoverQueue.front()->_failoverScore > _abPath->_failoverScore)) {
  1640. dequeueNextActiveBackupPath(now);
  1641. _abPath->address().toString(curPathStr);
  1642. sprintf(
  1643. traceMsg,
  1644. "%s (active-backup) Switching back to user-defined primary link %s/%s to peer %llx [linkSelectionMethod = better]",
  1645. OSUtils::humanReadableTimestamp().c_str(),
  1646. getLink(_abPath)->ifname().c_str(),
  1647. curPathStr,
  1648. (unsigned long long)(_peer->_id.address().toInt()));
  1649. RR->t->bondStateMessage(NULL, traceMsg);
  1650. }
  1651. }
  1652. }
  1653. if (_abLinkSelectMethod == ZT_MULTIPATH_RESELECTION_POLICY_OPTIMIZE && ! _abFailoverQueue.empty()) {
  1654. /**
  1655. * Implement link negotiation that was previously-decided
  1656. */
  1657. if (_abFailoverQueue.front()->_negotiated) {
  1658. dequeueNextActiveBackupPath(now);
  1659. _abPath->address().toString(prevPathStr);
  1660. _lastPathNegotiationCheck = now;
  1661. _abPath->address().toString(curPathStr);
  1662. sprintf(
  1663. traceMsg,
  1664. "%s (active-backup) Switching negotiated link %s/%s to peer %llx [linkSelectionMethod = optimize]",
  1665. OSUtils::humanReadableTimestamp().c_str(),
  1666. getLink(_abPath)->ifname().c_str(),
  1667. curPathStr,
  1668. (unsigned long long)(_peer->_id.address().toInt()));
  1669. RR->t->bondStateMessage(NULL, traceMsg);
  1670. }
  1671. else {
  1672. // Try to find a better path and automatically switch to it -- not too often, though.
  1673. if ((now - _lastActiveBackupPathChange) > ZT_MULTIPATH_MIN_ACTIVE_BACKUP_AUTOFLOP_INTERVAL) {
  1674. if (! _abFailoverQueue.empty()) {
  1675. int newFScore = _abFailoverQueue.front()->_failoverScore;
  1676. int prevFScore = _abPath->_failoverScore;
  1677. // Establish a minimum switch threshold to prevent flapping
  1678. int failoverScoreDifference = _abFailoverQueue.front()->_failoverScore - _abPath->_failoverScore;
  1679. int thresholdQuantity = (ZT_MULTIPATH_ACTIVE_BACKUP_OPTIMIZE_MIN_THRESHOLD * (float)_abPath->_allocation);
  1680. if ((failoverScoreDifference > 0) && (failoverScoreDifference > thresholdQuantity)) {
  1681. SharedPtr<Path> oldPath = _abPath;
  1682. _abPath->address().toString(prevPathStr);
  1683. dequeueNextActiveBackupPath(now);
  1684. _abPath->address().toString(curPathStr);
  1685. sprintf(
  1686. traceMsg,
  1687. "%s (active-backup) Switching from %s/%s (fscore=%d) to better link %s/%s (fscore=%d) for peer %llx [linkSelectionMethod = optimize]",
  1688. OSUtils::humanReadableTimestamp().c_str(),
  1689. getLink(oldPath)->ifname().c_str(),
  1690. prevPathStr,
  1691. prevFScore,
  1692. getLink(_abPath)->ifname().c_str(),
  1693. curPathStr,
  1694. newFScore,
  1695. (unsigned long long)(_peer->_id.address().toInt()));
  1696. RR->t->bondStateMessage(NULL, traceMsg);
  1697. }
  1698. }
  1699. }
  1700. }
  1701. }
  1702. }
  1703. void Bond::setReasonableDefaults(int policy, SharedPtr<Bond> templateBond, bool useTemplate)
  1704. {
  1705. // If invalid bonding policy, try default
  1706. int _defaultBondingPolicy = BondController::defaultBondingPolicy();
  1707. if (policy <= ZT_BONDING_POLICY_NONE || policy > ZT_BONDING_POLICY_BALANCE_AWARE) {
  1708. // If no default set, use NONE (effectively disabling this bond)
  1709. if (_defaultBondingPolicy < ZT_BONDING_POLICY_NONE || _defaultBondingPolicy > ZT_BONDING_POLICY_BALANCE_AWARE) {
  1710. _bondingPolicy = ZT_BONDING_POLICY_NONE;
  1711. }
  1712. _bondingPolicy = _defaultBondingPolicy;
  1713. }
  1714. else {
  1715. _bondingPolicy = policy;
  1716. }
  1717. _freeRandomByte = 0;
  1718. _userHasSpecifiedPrimaryLink = false;
  1719. _userHasSpecifiedFailoverInstructions = false;
  1720. _isHealthy = false;
  1721. _numAliveLinks = 0;
  1722. _numTotalLinks = 0;
  1723. _downDelay = 0;
  1724. _upDelay = 0;
  1725. _allowFlowHashing = false;
  1726. _bondMonitorInterval = 0;
  1727. _shouldCollectPathStatistics = false;
  1728. // Path negotiation
  1729. _allowPathNegotiation = false;
  1730. _pathNegotiationCutoffCount = 0;
  1731. _localUtility = 0;
  1732. _numBondedPaths = 0;
  1733. _rrPacketsSentOnCurrLink = 0;
  1734. _rrIdx = 0;
  1735. _totalBondUnderload = 0;
  1736. _maxAcceptableLatency = 100;
  1737. _maxAcceptablePacketDelayVariance = 50;
  1738. _maxAcceptablePacketLossRatio = 0.10;
  1739. _maxAcceptablePacketErrorRatio = 0.10;
  1740. _userHasSpecifiedLinkSpeeds = 0;
  1741. /* ZT_MULTIPATH_FLOW_REBALANCE_STRATEGY_PASSIVE is the most conservative strategy and is
  1742. least likely to cause unexpected behavior */
  1743. _flowRebalanceStrategy = ZT_MULTIPATH_FLOW_REBALANCE_STRATEGY_AGGRESSIVE;
  1744. /**
  1745. * Paths are actively monitored to provide a real-time quality/preference-ordered rapid failover queue.
  1746. */
  1747. switch (policy) {
  1748. case ZT_BONDING_POLICY_ACTIVE_BACKUP:
  1749. _failoverInterval = 500;
  1750. _abLinkSelectMethod = ZT_MULTIPATH_RESELECTION_POLICY_OPTIMIZE;
  1751. _linkMonitorStrategy = ZT_MULTIPATH_SLAVE_MONITOR_STRATEGY_DYNAMIC;
  1752. _qualityWeights[ZT_QOS_LAT_IDX] = 0.2f;
  1753. _qualityWeights[ZT_QOS_LTM_IDX] = 0.0f;
  1754. _qualityWeights[ZT_QOS_PDV_IDX] = 0.2f;
  1755. _qualityWeights[ZT_QOS_PLR_IDX] = 0.2f;
  1756. _qualityWeights[ZT_QOS_PER_IDX] = 0.2f;
  1757. _qualityWeights[ZT_QOS_THR_IDX] = 0.2f;
  1758. _qualityWeights[ZT_QOS_THM_IDX] = 0.0f;
  1759. _qualityWeights[ZT_QOS_THV_IDX] = 0.0f;
  1760. _qualityWeights[ZT_QOS_SCP_IDX] = 0.0f;
  1761. break;
  1762. /**
  1763. * All seemingly-alive paths are used. Paths are not actively monitored.
  1764. */
  1765. case ZT_BONDING_POLICY_BROADCAST:
  1766. _downDelay = 30000;
  1767. _upDelay = 0;
  1768. break;
  1769. /**
  1770. * Paths are monitored to determine when/if one needs to be added or removed from the rotation
  1771. */
  1772. case ZT_BONDING_POLICY_BALANCE_RR:
  1773. _failoverInterval = 3000;
  1774. _allowFlowHashing = false;
  1775. _packetsPerLink = 1024;
  1776. _linkMonitorStrategy = ZT_MULTIPATH_SLAVE_MONITOR_STRATEGY_DYNAMIC;
  1777. _qualityWeights[ZT_QOS_LAT_IDX] = 0.4f;
  1778. _qualityWeights[ZT_QOS_LTM_IDX] = 0.0f;
  1779. _qualityWeights[ZT_QOS_PDV_IDX] = 0.2f;
  1780. _qualityWeights[ZT_QOS_PLR_IDX] = 0.1f;
  1781. _qualityWeights[ZT_QOS_PER_IDX] = 0.1f;
  1782. _qualityWeights[ZT_QOS_THR_IDX] = 0.1f;
  1783. _qualityWeights[ZT_QOS_THM_IDX] = 0.0f;
  1784. _qualityWeights[ZT_QOS_THV_IDX] = 0.0f;
  1785. _qualityWeights[ZT_QOS_SCP_IDX] = 0.0f;
  1786. break;
  1787. /**
  1788. * Path monitoring is used to determine the capacity of each
  1789. * path and where to place the next flow.
  1790. */
  1791. case ZT_BONDING_POLICY_BALANCE_XOR:
  1792. _failoverInterval = 3000;
  1793. _upDelay = _bondMonitorInterval * 2;
  1794. _allowFlowHashing = true;
  1795. _linkMonitorStrategy = ZT_MULTIPATH_SLAVE_MONITOR_STRATEGY_DYNAMIC;
  1796. _qualityWeights[ZT_QOS_LAT_IDX] = 0.4f;
  1797. _qualityWeights[ZT_QOS_LTM_IDX] = 0.0f;
  1798. _qualityWeights[ZT_QOS_PDV_IDX] = 0.2f;
  1799. _qualityWeights[ZT_QOS_PLR_IDX] = 0.1f;
  1800. _qualityWeights[ZT_QOS_PER_IDX] = 0.1f;
  1801. _qualityWeights[ZT_QOS_THR_IDX] = 0.1f;
  1802. _qualityWeights[ZT_QOS_THM_IDX] = 0.0f;
  1803. _qualityWeights[ZT_QOS_THV_IDX] = 0.0f;
  1804. _qualityWeights[ZT_QOS_SCP_IDX] = 0.0f;
  1805. break;
  1806. /**
  1807. * Path monitoring is used to determine the capacity of each
  1808. * path and where to place the next flow. Additionally, re-shuffling
  1809. * of flows may take place.
  1810. */
  1811. case ZT_BONDING_POLICY_BALANCE_AWARE:
  1812. _failoverInterval = 3000;
  1813. _allowFlowHashing = true;
  1814. _linkMonitorStrategy = ZT_MULTIPATH_SLAVE_MONITOR_STRATEGY_DYNAMIC;
  1815. _qualityWeights[ZT_QOS_LAT_IDX] = 0.4f;
  1816. _qualityWeights[ZT_QOS_LTM_IDX] = 0.0f;
  1817. _qualityWeights[ZT_QOS_PDV_IDX] = 0.4f;
  1818. _qualityWeights[ZT_QOS_PLR_IDX] = 0.2f;
  1819. _qualityWeights[ZT_QOS_PER_IDX] = 0.0f;
  1820. _qualityWeights[ZT_QOS_THR_IDX] = 0.0f;
  1821. _qualityWeights[ZT_QOS_THM_IDX] = 0.0f;
  1822. _qualityWeights[ZT_QOS_THV_IDX] = 0.0f;
  1823. _qualityWeights[ZT_QOS_SCP_IDX] = 0.0f;
  1824. break;
  1825. default:
  1826. break;
  1827. }
  1828. /* If a user has specified custom parameters for this bonding policy, overlay
  1829. them onto the defaults that were previously set */
  1830. if (useTemplate) {
  1831. _policyAlias = templateBond->_policyAlias;
  1832. _failoverInterval = templateBond->_failoverInterval >= 250 ? templateBond->_failoverInterval : _failoverInterval;
  1833. _downDelay = templateBond->_downDelay;
  1834. _upDelay = templateBond->_upDelay;
  1835. if (templateBond->_linkMonitorStrategy == ZT_MULTIPATH_SLAVE_MONITOR_STRATEGY_PASSIVE && templateBond->_failoverInterval != 0) {
  1836. // fprintf(stderr, "warning: passive path monitoring was specified, this will prevent failovers from happening in a timely manner.\n");
  1837. }
  1838. _abLinkSelectMethod = templateBond->_abLinkSelectMethod;
  1839. memcpy(_qualityWeights, templateBond->_qualityWeights, ZT_QOS_WEIGHT_SIZE * sizeof(float));
  1840. }
  1841. /* Set timer geometries */
  1842. _bondMonitorInterval = _failoverInterval / 3;
  1843. BondController::setMinReqPathMonitorInterval(_bondMonitorInterval);
  1844. _ackSendInterval = _failoverInterval;
  1845. _qualityEstimationInterval = _failoverInterval * 2;
  1846. _dynamicPathMonitorInterval = 0;
  1847. _ackCutoffCount = 0;
  1848. _qosSendInterval = _bondMonitorInterval * 4;
  1849. _qosCutoffCount = 0;
  1850. throughputMeasurementInterval = _ackSendInterval * 2;
  1851. _defaultPathRefractoryPeriod = 8000;
  1852. }
  1853. void Bond::setUserQualityWeights(float weights[], int len)
  1854. {
  1855. if (len == ZT_QOS_WEIGHT_SIZE) {
  1856. float weightTotal = 0.0;
  1857. for (unsigned int i = 0; i < ZT_QOS_WEIGHT_SIZE; ++i) {
  1858. weightTotal += weights[i];
  1859. }
  1860. if (weightTotal > 0.99 && weightTotal < 1.01) {
  1861. memcpy(_qualityWeights, weights, len * sizeof(float));
  1862. }
  1863. }
  1864. }
  1865. bool Bond::relevant()
  1866. {
  1867. return false;
  1868. }
  1869. SharedPtr<Link> Bond::getLink(const SharedPtr<Path>& path)
  1870. {
  1871. return RR->bc->getLinkBySocket(_policyAlias, path->localSocket());
  1872. }
  1873. void Bond::dumpInfo(const int64_t now)
  1874. {
  1875. // Omitted
  1876. }
  1877. } // namespace ZeroTier