Browse Source

Pull out and deprecate old cluster code. New cluster code will not be merged yet.

Adam Ierymenko 8 years ago
parent
commit
dff8c02cfe
4 changed files with 58 additions and 9 deletions
  1. 6 6
      node/IncomingPacket.cpp
  2. 16 2
      node/Peer.cpp
  3. 16 1
      node/Peer.hpp
  4. 20 0
      service/OneService.cpp

+ 6 - 6
node/IncomingPacket.cpp

@@ -1199,9 +1199,9 @@ bool IncomingPacket::_doPUSH_DIRECT_PATHS(const RuntimeEnvironment *RR,void *tPt
 							(!( ((flags & ZT_PUSH_DIRECT_PATHS_FLAG_CLUSTER_REDIRECT) == 0) && (peer->hasActivePathTo(now,a)) )) && // not already known
 							(RR->node->shouldUsePathForZeroTierTraffic(tPtr,peer->address(),_path->localSocket(),a)) ) // should use path
 					{
-						//if ((flags & ZT_PUSH_DIRECT_PATHS_FLAG_CLUSTER_REDIRECT) != 0)
-						//	peer->setClusterPreferred(a);
-						if (++countPerScope[(int)a.ipScope()][0] <= ZT_PUSH_DIRECT_PATHS_MAX_PER_SCOPE_AND_FAMILY) {
+						if ((flags & ZT_PUSH_DIRECT_PATHS_FLAG_CLUSTER_REDIRECT) != 0) {
+							peer->redirect(tPtr,_path->localSocket(),a,now);
+						} else if (++countPerScope[(int)a.ipScope()][0] <= ZT_PUSH_DIRECT_PATHS_MAX_PER_SCOPE_AND_FAMILY) {
 							TRACE("attempting to contact %s at pushed direct path %s",peer->address().toString().c_str(),a.toString().c_str());
 							peer->attemptToContactAt(tPtr,InetAddress(),a,now,false,0);
 						} else {
@@ -1216,9 +1216,9 @@ bool IncomingPacket::_doPUSH_DIRECT_PATHS(const RuntimeEnvironment *RR,void *tPt
 							(!( ((flags & ZT_PUSH_DIRECT_PATHS_FLAG_CLUSTER_REDIRECT) == 0) && (peer->hasActivePathTo(now,a)) )) && // not already known
 							(RR->node->shouldUsePathForZeroTierTraffic(tPtr,peer->address(),_path->localSocket(),a)) ) // should use path
 					{
-						//if ((flags & ZT_PUSH_DIRECT_PATHS_FLAG_CLUSTER_REDIRECT) != 0)
-						//	peer->setClusterPreferred(a);
-						if (++countPerScope[(int)a.ipScope()][1] <= ZT_PUSH_DIRECT_PATHS_MAX_PER_SCOPE_AND_FAMILY) {
+						if ((flags & ZT_PUSH_DIRECT_PATHS_FLAG_CLUSTER_REDIRECT) != 0) {
+							peer->redirect(tPtr,_path->localSocket(),a,now);
+						} else if (++countPerScope[(int)a.ipScope()][1] <= ZT_PUSH_DIRECT_PATHS_MAX_PER_SCOPE_AND_FAMILY) {
 							TRACE("attempting to contact %s at pushed direct path %s",peer->address().toString().c_str(),a.toString().c_str());
 							peer->attemptToContactAt(tPtr,InetAddress(),a,now,false,0);
 						} else {

+ 16 - 2
node/Peer.cpp

@@ -170,11 +170,11 @@ void Peer::received(
 			Mutex::Lock _l(_paths_m);
 			_PeerPath *potentialNewPeerPath = (_PeerPath *)0;
 			if (path->address().ss_family == AF_INET) {
-				if ( (!_v4Path.p) || (!_v4Path.p->alive(now)) || (path->preferenceRank() >= _v4Path.p->preferenceRank()) ) {
+				if ( ( (!_v4Path.p) || (!_v4Path.p->alive(now)) || (path->preferenceRank() >= _v4Path.p->preferenceRank()) ) && ( (now - _v4Path.sticky) > ZT_PEER_PATH_EXPIRATION ) ) {
 					potentialNewPeerPath = &_v4Path;
 				}
 			} else if (path->address().ss_family == AF_INET6) {
-				if ( (!_v6Path.p) || (!_v6Path.p->alive(now)) || (path->preferenceRank() >= _v6Path.p->preferenceRank()) ) {
+				if ( ( (!_v6Path.p) || (!_v6Path.p->alive(now)) || (path->preferenceRank() >= _v6Path.p->preferenceRank()) ) && ( (now - _v6Path.sticky) > ZT_PEER_PATH_EXPIRATION ) ) {
 					potentialNewPeerPath = &_v6Path;
 				}
 			}
@@ -422,4 +422,18 @@ bool Peer::doPingAndKeepalive(void *tPtr,uint64_t now,int inetAddressFamily)
 	return false;
 }
 
+void Peer::redirect(void *tPtr,const int64_t localSocket,const InetAddress &remoteAddress,const uint64_t now)
+{
+	Mutex::Lock _l(_paths_m);
+	SharedPtr<Path> p(RR->topology->getPath(localSocket,remoteAddress));
+	attemptToContactAt(tPtr,localSocket,remoteAddress,now,true,p->nextOutgoingCounter());
+	if (remoteAddress.ss_family == AF_INET) {
+		_v4Path.p = p;
+		_v4Path.sticky = now;
+	} else if (remoteAddress.ss_family == AF_INET6) {
+		_v6Path.p = p;
+		_v6Path.sticky = now;
+	}
+}
+
 } // namespace ZeroTier

+ 16 - 1
node/Peer.hpp

@@ -195,6 +195,20 @@ public:
 	 */
 	bool doPingAndKeepalive(void *tPtr,uint64_t now,int inetAddressFamily);
 
+	/**
+	 * Specify remote path for this peer and forget others
+	 *
+	 * This overrides normal path learning and tells this peer to be found
+	 * at this address, at least within the address's family. Other address
+	 * families are not modified.
+	 *
+	 * @param tPtr Thread pointer to be handed through to any callbacks called as a result of this call
+	 * @param localSocket Local socket as supplied by external code
+	 * @param remoteAddress Remote address
+	 * @param now Current time
+	 */
+	void redirect(void *tPtr,const int64_t localSocket,const InetAddress &remoteAddress,const uint64_t now);
+
 	/**
 	 * Reset paths within a given IP scope and address family
 	 *
@@ -426,8 +440,9 @@ public:
 private:
 	struct _PeerPath
 	{
-		_PeerPath() : lr(0),p() {}
+		_PeerPath() : lr(0),sticky(0),p() {}
 		uint64_t lr; // time of last valid ZeroTier packet
+		uint64_t sticky; // time last set as sticky
 		SharedPtr<Path> p;
 	};
 

+ 20 - 0
service/OneService.cpp

@@ -394,6 +394,8 @@ public:
 	Phy<OneServiceImpl *> _phy;
 	Node *_node;
 	SoftwareUpdater *_updater;
+	PhySocket *_localControlSocket4;
+	PhySocket *_localControlSocket6;
 	bool _updateAutoApply;
 	unsigned int _primaryPort;
 	volatile unsigned int _udpPortPickerCounter;
@@ -488,6 +490,8 @@ public:
 		,_phy(this,false,true)
 		,_node((Node *)0)
 		,_updater((SoftwareUpdater *)0)
+		,_localControlSocket4((PhySocket *)0)
+		,_localControlSocket6((PhySocket *)0)
 		,_updateAutoApply(false)
 		,_primaryPort(port)
 		,_udpPortPickerCounter(0)
@@ -513,6 +517,8 @@ public:
 	virtual ~OneServiceImpl()
 	{
 		_binder.closeAll(_phy);
+		_phy.close(_localControlSocket4);
+		_phy.close(_localControlSocket6);
 #ifdef ZT_USE_MINIUPNPC
 		delete _portMapper;
 #endif
@@ -652,6 +658,20 @@ public:
 				return _termReason;
 			}
 
+			// Bind local control socket
+			{
+				struct sockaddr_in lo4;
+				memset(&lo4,0,sizeof(lo4));
+				lo4.sin_family = AF_INET;
+				lo4.sin_port = Utils::hton((uint16_t)_ports[0]);
+				_localControlSocket4 = _phy.tcpListen((const struct sockaddr *)&lo4);
+				struct sockaddr_in6 lo6;
+				memset(&lo6,0,sizeof(lo6));
+				lo6.sin6_family = AF_INET6;
+				lo6.sin6_port = lo4.sin_port;
+				_localControlSocket6 = _phy.tcpListen((const struct sockaddr *)&lo6);
+			}
+
 			// Save primary port to a file so CLIs and GUIs can learn it easily
 			char portstr[64];
 			Utils::ztsnprintf(portstr,sizeof(portstr),"%u",_ports[0]);