Przeglądaj źródła

Add setmtu command, fix bond lifetime issue

Joseph Henry 1 rok temu
rodzic
commit
b8b5d0bff8
10 zmienionych plików z 228 dodań i 43 usunięć
  1. 6 1
      include/ZeroTierOne.h
  2. 42 4
      node/Bond.cpp
  3. 20 0
      node/Bond.hpp
  4. 8 20
      node/IncomingPacket.cpp
  5. 5 4
      node/Node.cpp
  6. 8 0
      node/Path.hpp
  7. 16 7
      node/Peer.cpp
  8. 80 1
      node/Peer.hpp
  9. 29 3
      one.cpp
  10. 14 3
      service/OneService.cpp

+ 6 - 1
include/ZeroTierOne.h

@@ -1194,7 +1194,7 @@ typedef struct
 		uint64_t mac; /* MAC in lower 48 bits */
 		uint32_t adi; /* Additional distinguishing information, usually zero except for IPv4 ARP groups */
 	} multicastSubscriptions[ZT_MAX_MULTICAST_SUBSCRIPTIONS];
-	
+
 	/**
 	 * Network specific DNS configuration
 	 */
@@ -1327,6 +1327,11 @@ typedef struct
 	 */
 	float packetErrorRatio;
 
+	/**
+	 * Number of flows assigned to this path
+	 */
+	uint16_t assignedFlowCount;
+
 	/**
 	 * Address scope
 	 */

+ 42 - 4
node/Bond.cpp

@@ -102,6 +102,43 @@ SharedPtr<Bond> Bond::getBondByPeerId(int64_t identity)
 	return _bonds.count(identity) ? _bonds[identity] : SharedPtr<Bond>();
 }
 
+bool Bond::setAllMtuByTuple(uint16_t mtu, const std::string& ifStr, const std::string& ipStr)
+{
+	Mutex::Lock _l(_bonds_m);
+	std::map<int64_t, SharedPtr<Bond> >::iterator bondItr = _bonds.begin();
+	bool found = false;
+	while (bondItr != _bonds.end()) {
+		if (bondItr->second->setMtuByTuple(mtu,ifStr,ipStr)) {
+			found = true;
+		}
+		++bondItr;
+	}
+	return found;
+}
+
+bool Bond::setMtuByTuple(uint16_t mtu, const std::string& ifStr, const std::string& ipStr)
+{
+	Mutex::Lock _lp(_paths_m);
+	bool found = false;
+	for (int i = 0; i < ZT_MAX_PEER_NETWORK_PATHS; ++i) {
+		if (_paths[i].p) {
+			SharedPtr<Link> sl = getLink(_paths[i].p);
+			if (sl) {
+				if (sl->ifname() == ifStr) {
+					char ipBuf[64] = { 0 };
+					_paths[i].p->address().toIpString(ipBuf);
+					std::string newString = std::string(ipBuf);
+					if (newString == ipStr) {
+						_paths[i].p->_mtu = mtu;
+						found = true;
+					}
+				}
+			}
+		}
+	}
+	return found;
+}
+
 SharedPtr<Bond> Bond::createBond(const RuntimeEnvironment* renv, const SharedPtr<Peer>& peer)
 {
 	Mutex::Lock _l(_bonds_m);
@@ -162,8 +199,8 @@ void Bond::destroyBond(uint64_t peerId)
 	auto iter = _bonds.find(peerId);
 	if (iter != _bonds.end()) {
 		iter->second->stopBond();
+		_bonds.erase(iter);
 	}
-	_bonds.erase(peerId);
 }
 
 void Bond::stopBond()
@@ -978,7 +1015,7 @@ void Bond::curateBond(int64_t now, bool rebuildBond)
 		// Whether we've waited long enough since the link last came online
 		bool satisfiedUpDelay = (now - _paths[i].lastAliveToggle) >= _upDelay;
 		// How long since the last QoS was received (Must be less than ZT_PEER_PATH_EXPIRATION since the remote peer's _qosSendInterval isn't known)
-		bool acceptableQoSAge = _paths[i].lastQoSReceived == 0 || ((now - _paths[i].lastQoSReceived) < ZT_PEER_EXPIRED_PATH_TRIAL_PERIOD);
+		bool acceptableQoSAge = (_paths[i].lastQoSReceived == 0 && inTrial) || ((now - _paths[i].lastQoSReceived) < ZT_PEER_EXPIRED_PATH_TRIAL_PERIOD);
 		currEligibility = _paths[i].allowed() && ((acceptableAge && satisfiedUpDelay && acceptableQoSAge) || inTrial);
 
 		if (currEligibility) {
@@ -1070,7 +1107,7 @@ void Bond::curateBond(int64_t now, bool rebuildBond)
 
 				// Bond a spare link if required (no viable primary links left)
 				if (! foundUsablePrimaryPath) {
-					debug("no usable primary links remain, will attempt to use spare if available");
+					// debug("no usable primary links remain, will attempt to use spare if available");
 					for (int j = 0; j < it->second.size(); j++) {
 						int idx = it->second.at(j);
 						if (! _paths[idx].p || ! _paths[idx].eligible || ! _paths[idx].allowed() || ! _paths[idx].isSpare()) {
@@ -1244,7 +1281,8 @@ void Bond::estimatePathQuality(int64_t now)
 			if (link) {
 				int linkSpeed = link->capacity();
 				_paths[i].p->_givenLinkSpeed = linkSpeed;
-				_paths[i].p->_mtu = link->mtu();
+				_paths[i].p->_mtu = link->mtu() ? link->mtu() : _paths[i].p->_mtu;
+				_paths[i].p->_assignedFlowCount = _paths[i].assignedFlowCount;
 				maxObservedLinkCap = linkSpeed > maxObservedLinkCap ? linkSpeed : maxObservedLinkCap;
 			}
 		}

+ 20 - 0
node/Bond.hpp

@@ -456,6 +456,26 @@ class Bond {
 	 */
 	static SharedPtr<Bond> getBondByPeerId(int64_t identity);
 
+	/**
+	 * Set MTU for link by given interface name and IP address (across all bonds)
+	 *
+	 * @param mtu MTU to be used on this link
+	 * @param ifStr interface name to match
+	 * @param ipStr IP address to match
+	 * @return Whether the MTU was set
+	 */
+	static bool setAllMtuByTuple(uint16_t mtu, const std::string& ifStr, const std::string& ipStr);
+
+	/**
+	 * Set MTU for link by given interface name and IP address
+	 *
+	 * @param mtu MTU to be used on this link
+	 * @param ifStr interface name to match
+	 * @param ipStr IP address to match
+	 * @return Whether the MTU was set
+	 */
+	bool setMtuByTuple(uint16_t mtu, const std::string& ifStr, const std::string& ipStr);
+
 	/**
 	 * Add a new bond to the bond controller.
 	 *

+ 8 - 20
node/IncomingPacket.cpp

@@ -317,8 +317,7 @@ bool IncomingPacket::_doERROR(const RuntimeEnvironment *RR,void *tPtr,const Shar
 bool IncomingPacket::_doACK(const RuntimeEnvironment* RR, void* tPtr, const SharedPtr<Peer>& peer)
 {
 	/*
-	SharedPtr<Bond> bond = peer->bond();
-	if (! bond || ! bond->rateGateACK(RR->node->now())) {
+	if (! peer->rateGateACK(RR->node->now())) {
 		return true;
 	}
 	int32_t ackedBytes;
@@ -326,9 +325,7 @@ bool IncomingPacket::_doACK(const RuntimeEnvironment* RR, void* tPtr, const Shar
 		return true;   // ignore
 	}
 	memcpy(&ackedBytes, payload(), sizeof(ackedBytes));
-	if (bond) {
-		bond->receivedAck(_path, RR->node->now(), Utils::ntoh(ackedBytes));
-	}
+	peer->receivedAck(_path, RR->node->now(), Utils::ntoh(ackedBytes));
 	*/
 	Metrics::pkt_ack_in++;
 	return true;
@@ -338,7 +335,7 @@ bool IncomingPacket::_doQOS_MEASUREMENT(const RuntimeEnvironment* RR, void* tPtr
 {
 	Metrics::pkt_qos_in++;
 	SharedPtr<Bond> bond = peer->bond();
-	if (! bond || ! bond->rateGateQoS(RR->node->now(), _path)) {
+	if (! peer->rateGateQoS(RR->node->now(), _path)) {
 		return true;
 	}
 	if (payloadLength() > ZT_QOS_MAX_PACKET_SIZE || payloadLength() < ZT_QOS_MIN_PACKET_SIZE) {
@@ -359,9 +356,7 @@ bool IncomingPacket::_doQOS_MEASUREMENT(const RuntimeEnvironment* RR, void* tPtr
 		ptr += sizeof(uint16_t);
 		count++;
 	}
-	if (bond) {
-		bond->receivedQoS(_path, now, count, rx_id, rx_ts);
-	}
+	peer->receivedQoS(_path, now, count, rx_id, rx_ts);
 	return true;
 }
 
@@ -626,10 +621,7 @@ bool IncomingPacket::_doOK(const RuntimeEnvironment *RR,void *tPtr,const SharedP
 			}
 
 			if (!hops()) {
-				SharedPtr<Bond> bond = peer->bond();
-				if (!bond) {
-					_path->updateLatency((unsigned int)latency,RR->node->now());
-				}
+				_path->updateLatency((unsigned int)latency,RR->node->now());
 			}
 
 			peer->setRemoteVersion(vProto,vMajor,vMinor,vRevision);
@@ -801,8 +793,7 @@ bool IncomingPacket::_doFRAME(const RuntimeEnvironment *RR,void *tPtr,const Shar
 {
 	Metrics::pkt_frame_in++;
 	int32_t _flowId = ZT_QOS_NO_FLOW;
-	SharedPtr<Bond> bond = peer->bond();
-	if (bond && bond->flowHashingSupported()) {
+	if (peer->flowHashingSupported()) {
 		if (size() > ZT_PROTO_VERB_EXT_FRAME_IDX_PAYLOAD) {
 			const unsigned int etherType = at<uint16_t>(ZT_PROTO_VERB_FRAME_IDX_ETHERTYPE);
 			const unsigned int frameLen = size() - ZT_PROTO_VERB_FRAME_IDX_PAYLOAD;
@@ -1481,8 +1472,7 @@ bool IncomingPacket::_doPATH_NEGOTIATION_REQUEST(const RuntimeEnvironment *RR,vo
 {
 	Metrics::pkt_path_negotiation_request_in++;
 	uint64_t now = RR->node->now();
-	SharedPtr<Bond> bond = peer->bond();
-	if (!bond || !bond->rateGatePathNegotiation(now, _path)) {
+	if (!peer->rateGatePathNegotiation(now, _path)) {
 		return true;
 	}
 	if (payloadLength() != sizeof(int16_t)) {
@@ -1490,9 +1480,7 @@ bool IncomingPacket::_doPATH_NEGOTIATION_REQUEST(const RuntimeEnvironment *RR,vo
 	}
 	int16_t remoteUtility = 0;
 	memcpy(&remoteUtility, payload(), sizeof(int16_t));
-	if (peer->bond()) {
-		peer->bond()->processIncomingPathNegotiationRequest(now, _path, Utils::ntoh(remoteUtility));
-	}
+	peer->processIncomingPathNegotiationRequest(now, _path, Utils::ntoh(remoteUtility));
 	return true;
 }
 

+ 5 - 4
node/Node.cpp

@@ -589,6 +589,7 @@ ZT_PeerList *Node::peers() const
 					p->paths[p->pathCount].latencyVariance = (*path)->latencyVariance();
 					p->paths[p->pathCount].packetLossRatio = (*path)->packetLossRatio();
 					p->paths[p->pathCount].packetErrorRatio = (*path)->packetErrorRatio();
+					p->paths[p->pathCount].assignedFlowCount = (*path)->assignedFlowCount();
 					p->paths[p->pathCount].relativeQuality = (*path)->relativeQuality();
 					p->paths[p->pathCount].linkSpeed = (*path)->givenLinkSpeed();
 					p->paths[p->pathCount].bonded = (*path)->bonded();
@@ -602,9 +603,9 @@ ZT_PeerList *Node::peers() const
 		}
 		if (pi->second->bond()) {
 			p->isBonded = pi->second->bond();
-			p->bondingPolicy = pi->second->bond()->policy();
-			p->numAliveLinks = pi->second->bond()->getNumAliveLinks();
-			p->numTotalLinks = pi->second->bond()->getNumTotalLinks();
+			p->bondingPolicy = pi->second->bondingPolicy();
+			p->numAliveLinks = pi->second->getNumAliveLinks();
+			p->numTotalLinks = pi->second->getNumTotalLinks();
 		}
 	}
 
@@ -851,7 +852,7 @@ void Node::ncSendError(uint64_t nwid,uint64_t requestPacketId,const Address &des
 			case NetworkController::NC_ERROR_AUTHENTICATION_REQUIRED: {
 				//fprintf(stderr, "\n\nGot auth required\n\n");
 				break;
-			} 
+			}
 
 			default:
 				break;

+ 8 - 0
node/Path.hpp

@@ -89,6 +89,7 @@ public:
 		_latencyVariance(0.0),
 		_packetLossRatio(0.0),
 		_packetErrorRatio(0.0),
+		_assignedFlowCount(0),
 		_valid(true),
 		_eligible(false),
 		_bonded(false),
@@ -110,6 +111,7 @@ public:
 		_latencyVariance(0.0),
 		_packetLossRatio(0.0),
 		_packetErrorRatio(0.0),
+		_assignedFlowCount(0),
 		_valid(true),
 		_eligible(false),
 		_bonded(false),
@@ -320,6 +322,11 @@ public:
 	 */
 	inline float packetErrorRatio() const { return _packetErrorRatio; }
 
+	/**
+	 * @return Number of flows assigned to this path
+	 */
+	inline unsigned int assignedFlowCount() const { return _assignedFlowCount; }
+
 	/**
 	 * @return Whether this path is valid as reported by the bonding layer. The bonding layer
 	 * actually checks with Phy to see if the interface is still up
@@ -374,6 +381,7 @@ private:
 	volatile float _latencyVariance;
 	volatile float _packetLossRatio;
 	volatile float _packetErrorRatio;
+	volatile uint16_t _assignedFlowCount;
 	volatile bool _valid;
 	volatile bool _eligible;
 	volatile bool _bonded;

+ 16 - 7
node/Peer.cpp

@@ -28,7 +28,7 @@ namespace ZeroTier {
 
 static unsigned char s_freeRandomByteCounter = 0;
 
-Peer::Peer(const RuntimeEnvironment *renv,const Identity &myIdentity,const Identity &peerIdentity) 
+Peer::Peer(const RuntimeEnvironment *renv,const Identity &myIdentity,const Identity &peerIdentity)
 	: RR(renv)
 	, _lastReceive(0)
 	, _lastNontrivialReceive(0)
@@ -487,20 +487,29 @@ void Peer::tryMemorizedPath(void *tPtr,int64_t now)
 void Peer::performMultipathStateCheck(void *tPtr, int64_t now)
 {
 	Mutex::Lock _l(_bond_m);
-	if (_bond) {
-		// Once enabled the Bond object persists, no need to update state
-		return;
-	}
 	/**
 	 * Check for conditions required for multipath bonding and create a bond
 	 * if allowed.
 	 */
 	int numAlivePaths = 0;
+	bool atLeastOneNonExpired = false;
 	for(unsigned int i=0;i<ZT_MAX_PEER_NETWORK_PATHS;++i) {
-		if (_paths[i].p && _paths[i].p->alive(now)) {
-			numAlivePaths++;
+		if (_paths[i].p) {
+			if(_paths[i].p->alive(now)) {
+				numAlivePaths++;
+			}
+			if ((now - _paths[i].lr) < ZT_PEER_PATH_EXPIRATION) {
+				atLeastOneNonExpired = true;
+			}
 		}
 	}
+	if (_bond) {
+		if (numAlivePaths == 0 && !atLeastOneNonExpired) {
+			_bond = SharedPtr<Bond>();
+			RR->bc->destroyBond(_id.address().toInt());
+		}
+		return;
+	}
 	_localMultipathSupported = ((numAlivePaths >= 1) && (RR->bc->inUse()) && (ZT_PROTO_VERSION > 9));
 	if (_localMultipathSupported && !_bond) {
 		if (RR->bc) {

+ 80 - 1
node/Peer.hpp

@@ -56,7 +56,6 @@ private:
 public:
 	~Peer() {
 		Utils::burn(_key,sizeof(_key));
-		RR->bc->destroyBond(_id.address().toInt());
 	}
 
 	/**
@@ -434,6 +433,64 @@ public:
 		return false;
 	}
 
+	/**
+	 * See definition in Bond
+	 */
+	inline bool rateGateQoS(int64_t now, SharedPtr<Path>& path)
+	{
+		Mutex::Lock _l(_bond_m);
+		if(_bond) {
+			return _bond->rateGateQoS(now, path);
+		}
+		return false; // Default behavior. If there is no bond, we drop these
+	}
+
+	/**
+	 * See definition in Bond
+	 */
+	void receivedQoS(const SharedPtr<Path>& path, int64_t now, int count, uint64_t* rx_id, uint16_t* rx_ts)
+	{
+		Mutex::Lock _l(_bond_m);
+		if(_bond) {
+			_bond->receivedQoS(path, now, count, rx_id, rx_ts);
+		}
+	}
+
+	/**
+	 * See definition in Bond
+	 */
+	void processIncomingPathNegotiationRequest(uint64_t now, SharedPtr<Path>& path, int16_t remoteUtility)
+	{
+		Mutex::Lock _l(_bond_m);
+		if(_bond) {
+			_bond->processIncomingPathNegotiationRequest(now, path, remoteUtility);
+		}
+	}
+
+	/**
+	 * See definition in Bond
+	 */
+	inline bool rateGatePathNegotiation(int64_t now, SharedPtr<Path>& path)
+	{
+		Mutex::Lock _l(_bond_m);
+		if(_bond) {
+			return _bond->rateGatePathNegotiation(now, path);
+		}
+		return false; // Default behavior. If there is no bond, we drop these
+	}
+
+	/**
+	 * See definition in Bond
+	 */
+	bool flowHashingSupported()
+	{
+		Mutex::Lock _l(_bond_m);
+		if(_bond) {
+			return _bond->flowHashingSupported();
+		}
+		return false;
+	}
+
 	/**
 	 * Serialize a peer for storage in local cache
 	 *
@@ -533,6 +590,28 @@ public:
 		return ZT_BOND_POLICY_NONE;
 	}
 
+	/**
+	 * @return the number of links in this bond which are considered alive
+	 */
+	inline uint8_t getNumAliveLinks() {
+		Mutex::Lock _l(_paths_m);
+		if (_bond) {
+			return _bond->getNumAliveLinks();
+		}
+		return 0;
+	}
+
+	/**
+	 * @return the number of links in this bond
+	 */
+	inline uint8_t getNumTotalLinks() {
+		Mutex::Lock _l(_paths_m);
+		if (_bond) {
+			return _bond->getNumTotalLinks();
+		}
+		return 0;
+	}
+
 	//inline const AES *aesKeysIfSupported() const
 	//{ return (const AES *)0; }
 

+ 29 - 3
one.cpp

@@ -171,7 +171,7 @@ static int cli(int argc,char **argv)
 #endif
 {
 	unsigned int port = 0;
-	std::string homeDir,command,arg1,arg2,authToken;
+	std::string homeDir,command,arg1,arg2,arg3,arg4,authToken;
 	std::string ip("127.0.0.1");
 	bool json = false;
 	for(int i=1;i<argc;++i) {
@@ -569,9 +569,35 @@ static int cli(int argc,char **argv)
 				return 1;
 			}
 		}
+		else if (arg1 == "setmtu") { /* zerotier-cli bond setmtu <mtu> <iface> <ip> */
+			requestHeaders["Content-Type"] = "application/json";
+			requestHeaders["Content-Length"] = "2";
+			if (argc == 8) {
+				arg2 = argv[5];
+				arg3 = argv[6];
+				arg4 = argv[7];
+			}
+			unsigned int scode = Http::POST(
+				1024 * 1024 * 16,
+				60000,
+				(const struct sockaddr *)&addr,
+				(std::string("/bond/") + arg1 + "/" + arg2 + "/" + arg3 + "/" + arg4).c_str(),
+				requestHeaders,
+				"{}",
+				2,
+				responseHeaders,
+				responseBody);
+			if (scode == 200) {
+				printf("200 setmtu OK" ZT_EOL_S);
+				return 0;
+			} else {
+				printf("no link match found, new MTU was not applied" ZT_EOL_S);
+				return 1;
+			}
+			return 0;
+		}
 		else if (arg1.length() == 10) {
 			if (arg2 == "rotate") { /* zerotier-cli bond <peerId> rotate */
-				fprintf(stderr, "zerotier-cli bond <peerId> rotate\n");
 				requestHeaders["Content-Type"] = "application/json";
 				requestHeaders["Content-Length"] = "2";
 				unsigned int scode = Http::POST(
@@ -588,7 +614,7 @@ static int cli(int argc,char **argv)
 					if (json) {
 						printf("%s",cliFixJsonCRs(responseBody).c_str());
 					} else {
-						printf("200 bond OK" ZT_EOL_S);
+						printf("200 rotate OK" ZT_EOL_S);
 					}
 					return 0;
 				} else {

+ 14 - 3
service/OneService.cpp

@@ -644,6 +644,7 @@ static void _peerToJson(nlohmann::json &pj,const ZT_Peer *peer, SharedPtr<Bond>
 			j["latencyVariance"] = peer->paths[i].latencyVariance;
 			j["packetLossRatio"] = peer->paths[i].packetLossRatio;
 			j["packetErrorRatio"] = peer->paths[i].packetErrorRatio;
+			j["assignedFlowCount"] = peer->paths[i].assignedFlowCount;
 			j["lastInAge"] = (now - lastReceive);
 			j["lastOutAge"] = (now - lastSend);
 			j["bonded"] = peer->paths[i].bonded;
@@ -1659,11 +1660,8 @@ public:
 				res.status = 400;
 				return;
 			}
-
 			auto bondID = req.matches[1];
 			uint64_t id = Utils::hexStrToU64(bondID.str().c_str());
-
-			exit(0);
 			SharedPtr<Bond> bond = _node->bondController()->getBondByPeerId(id);
 			if (bond) {
 				if (bond->abForciblyRotateLink()) {
@@ -1680,6 +1678,19 @@ public:
 		_controlPlane.Post("/bond/rotate/([0-9a-fA-F]{10})", bondRotate);
 		_controlPlane.Put("/bond/rotate/([0-9a-fA-F]{10})", bondRotate);
 
+		auto setMtu = [&, setContent](const httplib::Request &req, httplib::Response &res) {
+			if (!_node->bondController()->inUse()) {
+				setContent(req, res, "");
+				res.status = 400;
+				return;
+			}
+			uint16_t mtu = atoi(req.matches[1].str().c_str());
+			res.status = _node->bondController()->setAllMtuByTuple(mtu, req.matches[2].str().c_str(), req.matches[3].str().c_str()) ? 200 : 400;
+			setContent(req, res, "{}");
+		};
+		_controlPlane.Post("/bond/setmtu/([0-9]{3,5})/([a-zA-Z0-9_]{1,16})/([0-9a-fA-F\\.\\:]{1,39})", setMtu);
+		_controlPlane.Put("/bond/setmtu/([0-9]{3,5})/([a-zA-Z0-9_]{1,16})/([0-9a-fA-F\\.\\:]{1,39})", setMtu);
+
 		_controlPlane.Get("/config", [&, setContent](const httplib::Request &req, httplib::Response &res) {
 			std::string config;
 			{