Selaa lähdekoodia

Safely handle disappearing bonded interfaces (See issue #1587)

Joseph Henry 3 vuotta sitten
vanhempi
commit
6c85f8c7a7
3 muutettua tiedostoa jossa 135 lisäystä ja 73 poistoa
  1. 101 60
      node/Bond.cpp
  2. 4 3
      node/Bond.hpp
  3. 30 10
      osdep/Phy.hpp

+ 101 - 60
node/Bond.cpp

@@ -40,6 +40,9 @@ std::map<std::string, std::map<std::string, SharedPtr<Link> > > Bond::_interface
 
 bool Bond::linkAllowed(std::string& policyAlias, SharedPtr<Link> link)
 {
+	if (! link) {
+		return false;
+	}
 	bool foundInDefinitions = false;
 	if (_linkDefinitions.count(policyAlias)) {
 		auto it = _linkDefinitions[policyAlias].begin();
@@ -152,7 +155,7 @@ void Bond::destroyBond(uint64_t peerId)
 	_bonds.erase(peerId);
 }
 
-SharedPtr<Link> Bond::getLinkBySocket(const std::string& policyAlias, uint64_t localSocket)
+SharedPtr<Link> Bond::getLinkBySocket(const std::string& policyAlias, uint64_t localSocket, bool createIfNeeded = false)
 {
 	Mutex::Lock _l(_links_m);
 	char ifname[64] = { 0 };
@@ -160,10 +163,14 @@ SharedPtr<Link> Bond::getLinkBySocket(const std::string& policyAlias, uint64_t l
 	std::string ifnameStr(ifname);
 	auto search = _interfaceToLinkMap[policyAlias].find(ifnameStr);
 	if (search == _interfaceToLinkMap[policyAlias].end()) {
-		// If the link wasn't already known, add a new entry
-		SharedPtr<Link> s = new Link(ifnameStr, 0, 0, true, ZT_BOND_SLAVE_MODE_SPARE, "", 0.0);
-		_interfaceToLinkMap[policyAlias].insert(std::pair<std::string, SharedPtr<Link> >(ifnameStr, s));
-		return s;
+		if (createIfNeeded) {
+			SharedPtr<Link> s = new Link(ifnameStr, 0, 0, true, ZT_BOND_SLAVE_MODE_SPARE, "", 0.0);
+			_interfaceToLinkMap[policyAlias].insert(std::pair<std::string, SharedPtr<Link> >(ifnameStr, s));
+			return s;
+		}
+		else {
+			return SharedPtr<Link>();
+		}
 	}
 	else {
 		return search->second;
@@ -225,10 +232,12 @@ Bond::Bond(const RuntimeEnvironment* renv, SharedPtr<Bond> originalBond, const S
 void Bond::nominatePathToBond(const SharedPtr<Path>& path, int64_t now)
 {
 	Mutex::Lock _l(_paths_m);
+	debug("attempting to nominate link %s", pathToStr(path).c_str());
 	/**
 	 * Ensure the link is allowed and the path is not already present
 	 */
 	if (! RR->bc->linkAllowed(_policyAlias, getLink(path))) {
+		debug("link %s is not permitted according to user-specified rules", pathToStr(path).c_str());
 		return;
 	}
 	bool alreadyPresent = false;
@@ -236,6 +245,7 @@ void Bond::nominatePathToBond(const SharedPtr<Path>& path, int64_t now)
 		// Sanity check
 		if (path.ptr() == _paths[i].p.ptr()) {
 			alreadyPresent = true;
+			debug("link %s already exists", pathToStr(path).c_str());
 			break;
 		}
 	}
@@ -254,20 +264,22 @@ void Bond::nominatePathToBond(const SharedPtr<Path>& path, int64_t now)
 					// Determine if there are any other paths on this link
 					bool bFoundCommonLink = false;
 					SharedPtr<Link> commonLink = RR->bc->getLinkBySocket(_policyAlias, _paths[i].p->localSocket());
-					for (unsigned int j = 0; j < ZT_MAX_PEER_NETWORK_PATHS; ++j) {
-						if (_paths[j].p && _paths[j].p.ptr() != _paths[i].p.ptr()) {
-							if (RR->bc->getLinkBySocket(_policyAlias, _paths[j].p->localSocket()) == commonLink) {
-								bFoundCommonLink = true;
-								_paths[j].onlyPathOnLink = false;
+					if (commonLink) {
+						for (unsigned int j = 0; j < ZT_MAX_PEER_NETWORK_PATHS; ++j) {
+							if (_paths[j].p && _paths[j].p.ptr() != _paths[i].p.ptr()) {
+								if (RR->bc->getLinkBySocket(_policyAlias, _paths[j].p->localSocket(), true) == commonLink) {	 // true is the createIfNeeded argument
+									bFoundCommonLink = true;
+									_paths[j].onlyPathOnLink = false;
+								}
 							}
 						}
+						_paths[i].ipvPref = sl->ipvPref();
+						_paths[i].mode = sl->mode();
+						_paths[i].enabled = sl->enabled();
+						_paths[i].onlyPathOnLink = ! bFoundCommonLink;
 					}
-					_paths[i].ipvPref = sl->ipvPref();
-					_paths[i].mode = sl->mode();
-					_paths[i].enabled = sl->enabled();
-					_paths[i].onlyPathOnLink = ! bFoundCommonLink;
 				}
-				log("nominate link %s", pathToStr(path).c_str());
+				log("nominated link %s", pathToStr(path).c_str());
 				break;
 			}
 		}
@@ -553,7 +565,6 @@ bool Bond::assignFlowToBondedPath(SharedPtr<Flow>& flow, int64_t now)
 		}
 		flow->assignPath(_abPathIdx, now);
 	}
-	SharedPtr<Link> link = RR->bc->getLinkBySocket(_policyAlias, _paths[flow->assignedPath].p->localSocket());
 	debug("assign out-flow %04x to link %s (%lu / %lu flows)", flow->id, pathToStr(_paths[flow->assignedPath].p).c_str(), _paths[flow->assignedPath].assignedFlowCount, (unsigned long)_flows.size());
 	return true;
 }
@@ -640,22 +651,24 @@ void Bond::processIncomingPathNegotiationRequest(uint64_t now, SharedPtr<Path>&
 		return;
 	}
 	SharedPtr<Link> link = RR->bc->getLinkBySocket(_policyAlias, _paths[pathIdx].p->localSocket());
-	if (remoteUtility > _localUtility) {
-		_paths[pathIdx].p->address().toString(pathStr);
-		debug("peer suggests alternate link %s/%s, remote utility (%d) greater than local utility (%d), switching to suggested link\n", link->ifname().c_str(), pathStr, remoteUtility, _localUtility);
-		_negotiatedPathIdx = pathIdx;
-	}
-	if (remoteUtility < _localUtility) {
-		debug("peer suggests alternate link %s/%s, remote utility (%d) less than local utility (%d), not switching\n", link->ifname().c_str(), pathStr, remoteUtility, _localUtility);
-	}
-	if (remoteUtility == _localUtility) {
-		debug("peer suggests alternate link %s/%s, remote utility (%d) equal to local utility (%d)\n", link->ifname().c_str(), pathStr, remoteUtility, _localUtility);
-		if (_peer->_id.address().toInt() > RR->node->identity().address().toInt()) {
-			debug("agree with peer to use alternate link %s/%s\n", link->ifname().c_str(), pathStr);
+	if (link) {
+		if (remoteUtility > _localUtility) {
+			_paths[pathIdx].p->address().toString(pathStr);
+			debug("peer suggests alternate link %s/%s, remote utility (%d) greater than local utility (%d), switching to suggested link\n", link->ifname().c_str(), pathStr, remoteUtility, _localUtility);
 			_negotiatedPathIdx = pathIdx;
 		}
-		else {
-			debug("ignore petition from peer to use alternate link %s/%s\n", link->ifname().c_str(), pathStr);
+		if (remoteUtility < _localUtility) {
+			debug("peer suggests alternate link %s/%s, remote utility (%d) less than local utility (%d), not switching\n", link->ifname().c_str(), pathStr, remoteUtility, _localUtility);
+		}
+		if (remoteUtility == _localUtility) {
+			debug("peer suggests alternate link %s/%s, remote utility (%d) equal to local utility (%d)\n", link->ifname().c_str(), pathStr, remoteUtility, _localUtility);
+			if (_peer->_id.address().toInt() > RR->node->identity().address().toInt()) {
+				debug("agree with peer to use alternate link %s/%s\n", link->ifname().c_str(), pathStr);
+				_negotiatedPathIdx = pathIdx;
+			}
+			else {
+				debug("ignore petition from peer to use alternate link %s/%s\n", link->ifname().c_str(), pathStr);
+			}
 		}
 	}
 }
@@ -827,10 +840,17 @@ void Bond::curateBond(int64_t now, bool rebuildBond)
 		}
 
 		/**
-		 * Remove expired links from bond
+		 * Remove expired or invalid links from bond
 		 */
+		SharedPtr<Link> link = getLink(_paths[i].p);
+		if (! link) {
+			log("link is no longer valid, removing from bond");
+			_paths[i] = NominatedPath();
+			_paths[i].p = SharedPtr<Path>();
+			continue;
+		}
 		if ((now - _paths[i].p->_lastIn) > (ZT_PEER_EXPIRED_PATH_TRIAL_PERIOD)) {
-			log("link %s has expired, removing from bond", pathToStr(_paths[i].p).c_str());
+			log("link (%s) has expired or is invalid, removing from bond", pathToStr(_paths[i].p).c_str());
 			_paths[i] = NominatedPath();
 			_paths[i].p = SharedPtr<Path>();
 			continue;
@@ -920,7 +940,9 @@ void Bond::curateBond(int64_t now, bool rebuildBond)
 			for (int i = 0; i < ZT_MAX_PEER_NETWORK_PATHS; ++i) {
 				if (_paths[i].p) {
 					SharedPtr<Link> link = RR->bc->getLinkBySocket(_policyAlias, _paths[i].p->localSocket());
-					linkMap[link].push_back(i);
+					if (link) {
+						linkMap[link].push_back(i);
+					}
 				}
 			}
 			// Re-form bond from link<->path map
@@ -1007,13 +1029,17 @@ void Bond::estimatePathQuality(int64_t now)
 		for (unsigned int i = 0; i < _numBondedPaths; ++i) {
 			if (_paths[i].p && _paths[i].allowed()) {
 				SharedPtr<Link> link = RR->bc->getLinkBySocket(_policyAlias, _paths[i].p->localSocket());
-				totUserSpecifiedLinkSpeed += link->speed();
+				if (link) {
+					totUserSpecifiedLinkSpeed += link->speed();
+				}
 			}
 		}
 		for (unsigned int i = 0; i < _numBondedPaths; ++i) {
 			if (_paths[i].p && _paths[i].allowed()) {
 				SharedPtr<Link> link = RR->bc->getLinkBySocket(_policyAlias, _paths[i].p->localSocket());
-				link->setRelativeSpeed((uint8_t)round(((float)link->speed() / (float)totUserSpecifiedLinkSpeed) * 255));
+				if (link) {
+					link->setRelativeSpeed((uint8_t)round(((float)link->speed() / (float)totUserSpecifiedLinkSpeed) * 255));
+				}
 			}
 		}
 	}
@@ -1283,21 +1309,23 @@ void Bond::processActiveBackupTasks(void* tPtr, int64_t now)
 						continue;
 					}
 					SharedPtr<Link> link = RR->bc->getLinkBySocket(_policyAlias, _paths[i].p->localSocket());
-					if (_paths[i].eligible && link->primary()) {
-						if (! _paths[i].preferred()) {
-							// Found path on primary link, take note in case we don't find a preferred path
-							nonPreferredPathIdx = i;
-							bFoundPrimaryLink = true;
-						}
-						if (_paths[i].preferred()) {
-							_abPathIdx = i;
-							bFoundPrimaryLink = true;
-							if (_paths[_abPathIdx].p) {
-								SharedPtr<Link> link = RR->bc->getLinkBySocket(_policyAlias, _paths[_abPathIdx].p->localSocket());
-								if (link) {
-									log("found preferred primary link %s", pathToStr(_paths[_abPathIdx].p).c_str());
+					if (link) {
+						if (_paths[i].eligible && link->primary()) {
+							if (! _paths[i].preferred()) {
+								// Found path on primary link, take note in case we don't find a preferred path
+								nonPreferredPathIdx = i;
+								bFoundPrimaryLink = true;
+							}
+							if (_paths[i].preferred()) {
+								_abPathIdx = i;
+								bFoundPrimaryLink = true;
+								if (_paths[_abPathIdx].p) {
+									SharedPtr<Link> abLink = RR->bc->getLinkBySocket(_policyAlias, _paths[_abPathIdx].p->localSocket());
+									if (abLink) {
+										log("found preferred primary link %s", pathToStr(_paths[_abPathIdx].p).c_str());
+									}
+									break;	 // Found preferred path on primary link
 								}
-								break;	 // Found preferred path on primary link
 							}
 						}
 					}
@@ -1307,13 +1335,12 @@ void Bond::processActiveBackupTasks(void* tPtr, int64_t now)
 					_abPathIdx = nonPreferredPathIdx;
 				}
 				if (_abPathIdx == ZT_MAX_PEER_NETWORK_PATHS) {
-					log("user-designated primary link is not yet ready");
+					log("user-designated primary link is not available");
 					// TODO: Should wait for some time (failover interval?) and then switch to spare link
 				}
 			}
 
 			else if (! userHasSpecifiedPrimaryLink()) {
-				log("user did not specify a primary link, select first available link");
 				for (int i = 0; i < ZT_MAX_PEER_NETWORK_PATHS; ++i) {
 					if (_paths[i].p && _paths[i].eligible) {
 						_abPathIdx = i;
@@ -1369,7 +1396,9 @@ void Bond::processActiveBackupTasks(void* tPtr, int64_t now)
 				continue;
 			}
 			SharedPtr<Link> link = RR->bc->getLinkBySocket(_policyAlias, _paths[i].p->localSocket());
-
+			if (! link) {
+				continue;
+			}
 			int failoverScoreHandicap = _paths[i].failoverScore;
 			if (_paths[i].preferred()) {
 				failoverScoreHandicap += ZT_BOND_FAILOVER_HANDICAP_PREFERRED;
@@ -1432,7 +1461,11 @@ void Bond::processActiveBackupTasks(void* tPtr, int64_t now)
 			if (! _paths[i].eligible) {
 				failoverScoreHandicap = -10000;
 			}
-			if (getLink(_paths[i].p)->primary() && _abLinkSelectMethod != ZT_BOND_RESELECTION_POLICY_OPTIMIZE) {
+			SharedPtr<Link> link = getLink(_paths[i].p);
+			if (! link) {
+				continue;
+			}
+			if (link->primary() && _abLinkSelectMethod != ZT_BOND_RESELECTION_POLICY_OPTIMIZE) {
 				// If using "optimize" primary re-select mode, ignore user link designations
 				failoverScoreHandicap = ZT_BOND_FAILOVER_HANDICAP_PRIMARY;
 			}
@@ -1501,15 +1534,19 @@ void Bond::processActiveBackupTasks(void* tPtr, int64_t now)
 		_lastActiveBackupPathChange = now;
 	}
 	if (_abLinkSelectMethod == ZT_BOND_RESELECTION_POLICY_ALWAYS) {
-		if (! getLink(_paths[_abPathIdx].p)->primary() && _paths[_abFailoverQueue.front()].p && getLink(_paths[_abFailoverQueue.front()].p)->primary()) {
+		SharedPtr<Link> abLink = getLink(_paths[_abPathIdx].p);
+		SharedPtr<Link> abFailoverLink = getLink(_paths[_abFailoverQueue.front()].p);
+		if (abLink && ! abLink->primary() && _paths[_abFailoverQueue.front()].p && abFailoverLink && abFailoverLink->primary()) {
 			dequeueNextActiveBackupPath(now);
 			log("switch back to available primary link %s (select mode: always)", pathToStr(_paths[_abPathIdx].p).c_str());
 		}
 	}
 	if (_abLinkSelectMethod == ZT_BOND_RESELECTION_POLICY_BETTER) {
-		if (! getLink(_paths[_abPathIdx].p)->primary()) {
+		SharedPtr<Link> abLink = getLink(_paths[_abPathIdx].p);
+		if (abLink && ! abLink->primary()) {
 			// Active backup has switched to "better" primary link according to re-select policy.
-			if (_paths[_abFailoverQueue.front()].p && getLink(_paths[_abFailoverQueue.front()].p)->primary() && (_paths[_abFailoverQueue.front()].failoverScore > _paths[_abPathIdx].failoverScore)) {
+			SharedPtr<Link> abFailoverLink = getLink(_paths[_abFailoverQueue.front()].p);
+			if (_paths[_abFailoverQueue.front()].p && abFailoverLink && abFailoverLink->primary() && (_paths[_abFailoverQueue.front()].failoverScore > _paths[_abPathIdx].failoverScore)) {
 				dequeueNextActiveBackupPath(now);
 				log("switch back to user-defined primary link %s (select mode: better)", pathToStr(_paths[_abPathIdx].p).c_str());
 			}
@@ -1709,8 +1746,12 @@ std::string Bond::pathToStr(const SharedPtr<Path>& path)
 		char pathStr[64] = { 0 };
 		char fullPathStr[384] = { 0 };
 		path->address().toString(pathStr);
-		snprintf(fullPathStr, 384, "%.16llx-%s/%s", (unsigned long long)(path->localSocket()), getLink(path)->ifname().c_str(), pathStr);
-		return std::string(fullPathStr);
+		SharedPtr<Link> link = getLink(path);
+		if (link) {
+			std::string ifnameStr = std::string(link->ifname());
+			snprintf(fullPathStr, 384, "%.16llx-%s/%s", (unsigned long long)(path->localSocket()), ifnameStr.c_str(), pathStr);
+			return std::string(fullPathStr);
+		}
 	}
 	return "";
 #else
@@ -1724,11 +1765,11 @@ void Bond::dumpPathStatus(int64_t now, int pathIdx)
 	std::string aliveOrDead = _paths[pathIdx].alive ? std::string("alive") : std::string("dead");
 	std::string eligibleOrNot = _paths[pathIdx].eligible ? std::string("eligible") : std::string("ineligible");
 	std::string bondedOrNot = _paths[pathIdx].bonded ? std::string("bonded") : std::string("unbonded");
-	log("path[%2d] --- %5s (in %7lld, out: %7lld), %10s, %8s, flows=%-6d lat=%-8.3f pdv=%-7.3f err=%-6.4f loss=%-6.4f alloc=%-3d --- (%s)",
+	log("path[%2u] --- %5s (in %7lld, out: %7lld), %10s, %8s, flows=%-6u lat=%-8.3f pdv=%-7.3f err=%-6.4f loss=%-6.4f alloc=%-3u --- (%s)",
 		pathIdx,
 		aliveOrDead.c_str(),
 		static_cast<long long int>(_paths[pathIdx].p->age(now)),
-		static_cast<long long int>(now - _paths[pathIdx].p->_lastOut),
+		static_cast<long long int>(_paths[pathIdx].p->_lastOut == 0 ? 0 : now - _paths[pathIdx].p->_lastOut),
 		eligibleOrNot.c_str(),
 		bondedOrNot.c_str(),
 		_paths[pathIdx].assignedFlowCount,

+ 4 - 3
node/Bond.hpp

@@ -458,9 +458,10 @@ class Bond {
 	 *
 	 * @param policyAlias Policy in use
 	 * @param localSocket Local source socket
+	 * @param createIfNeeded Whether a Link object is created if the name wasn't previously in the link map
 	 * @return Physical link definition
 	 */
-	static SharedPtr<Link> getLinkBySocket(const std::string& policyAlias, uint64_t localSocket);
+	static SharedPtr<Link> getLinkBySocket(const std::string& policyAlias, uint64_t localSocket, bool createIfNeeded);
 
 	/**
 	 * Gets a reference to a physical link definition given its human-readable system name.
@@ -1141,10 +1142,10 @@ class Bond {
 	 *
 	 */
 	void log(const char* fmt, ...)
+	{
 #ifdef __GNUC__
 		__attribute__((format(printf, 2, 3)))
 #endif
-	{
 #ifdef ZT_TRACE
 		time_t rawtime;
 		struct tm* timeinfo;
@@ -1173,10 +1174,10 @@ class Bond {
 	 *
 	 */
 	void debug(const char* fmt, ...)
+	{
 #ifdef __GNUC__
 		__attribute__((format(printf, 2, 3)))
 #endif
-	{
 #ifdef ZT_DEBUG
 		time_t rawtime;
 		struct tm* timeinfo;

+ 30 - 10
osdep/Phy.hpp

@@ -229,23 +229,33 @@ public:
 	 * @param s Socket object
 	 * @return Underlying OS-type (usually int or long) file descriptor associated with object
 	 */
-	static inline ZT_PHY_SOCKFD_TYPE getDescriptor(PhySocket *s) throw() { return reinterpret_cast<PhySocketImpl *>(s)->sock; }
+	static inline ZT_PHY_SOCKFD_TYPE getDescriptor(PhySocket* s) throw()
+	{
+		return reinterpret_cast<PhySocketImpl*>(s)->sock;
+	}
 
 	/**
 	 * @param s Socket object
 	 * @return Pointer to user object
 	 */
-	static inline void** getuptr(PhySocket *s) throw() { return &(reinterpret_cast<PhySocketImpl *>(s)->uptr); }
+	static inline void** getuptr(PhySocket* s) throw()
+	{
+		return &(reinterpret_cast<PhySocketImpl*>(s)->uptr);
+	}
 
 	/**
 	 * @param s Socket object
 	 * @param nameBuf Buffer to store name of interface which this Socket object is bound to
 	 * @param buflen Length of buffer to copy name into
 	 */
-	static inline void getIfName(PhySocket *s, char *nameBuf, int buflen)
+	static inline void getIfName(PhySocket* s, char* nameBuf, int buflen)
 	{
 		if (s) {
-			memcpy(nameBuf, reinterpret_cast<PhySocketImpl *>(s)->ifname, buflen);
+			PhySocketImpl& sws = *(reinterpret_cast<PhySocketImpl*>(s));	 // s was null-checked above
+			if (sws.type == ZT_PHY_SOCKET_CLOSED) {
+				return;	 // closed socket: nameBuf is left untouched
+			}
+			memcpy(nameBuf, sws.ifname, buflen);
 		}
 	}
 
@@ -254,10 +264,14 @@ public:
 	 * @param ifname Buffer containing name of interface that this Socket object is bound to
 	 * @param len Length of name of interface
 	 */
-	static inline void setIfName(PhySocket *s, char *ifname, int len)
+	static inline void setIfName(PhySocket* s, char* ifname, int len)
 	{
 		if (s) {
-			memcpy(&(reinterpret_cast<PhySocketImpl *>(s)->ifname), ifname, len);
+			PhySocketImpl& sws = *(reinterpret_cast<PhySocketImpl*>(s));	 // s was null-checked above
+			if (sws.type == ZT_PHY_SOCKET_CLOSED) {
+				return;	 // closed socket: nothing is stored
+			}
+			memcpy(&(sws.ifname), ifname, len);
 		}
 	}
 
@@ -270,21 +284,27 @@ public:
 	inline void whack()
 	{
 #if defined(_WIN32) || defined(_WIN64)
-		::send(_whackSendSocket,(const char *)this,1,0);
+		::send(_whackSendSocket, (const char*)this, 1, 0);
 #else
-		(void)(::write(_whackSendSocket,(PhySocket *)this,1));
+		(void)(::write(_whackSendSocket, (PhySocket*)this, 1));
 #endif
 	}
 
 	/**
 	 * @return Number of open sockets
 	 */
-	inline unsigned long count() const throw() { return _socks.size(); }
+	inline unsigned long count() const throw()
+	{
+		return _socks.size();
+	}
 
 	/**
 	 * @return Maximum number of sockets allowed
 	 */
-	inline unsigned long maxCount() const throw() { return ZT_PHY_MAX_SOCKETS; }
+	inline unsigned long maxCount() const throw()
+	{
+		return ZT_PHY_MAX_SOCKETS;
+	}
 
 	/**
 	 * Wrap a raw file descriptor in a PhySocket structure