Browse Source

More TCP-related fixes and tweaks to ping timing, resynchronize, and startup.

Adam Ierymenko 11 years ago
parent
commit
c231510f8b
5 changed files with 60 additions and 43 deletions
  1. 18 15
      node/Node.cpp
  2. 1 1
      node/NodeConfig.cpp
  3. 2 2
      node/RuntimeEnvironment.hpp
  4. 37 23
      node/SocketManager.cpp
  5. 2 2
      node/TcpSocket.cpp

+ 18 - 15
node/Node.cpp

@@ -537,10 +537,10 @@ Node::ReasonForTermination Node::run()
 		uint64_t lastClean = Utils::now(); // don't need to do this immediately
 		uint64_t lastClean = Utils::now(); // don't need to do this immediately
 		uint64_t lastNetworkFingerprintCheck = 0;
 		uint64_t lastNetworkFingerprintCheck = 0;
 		uint64_t lastMulticastCheck = 0;
 		uint64_t lastMulticastCheck = 0;
+		long lastDelayDelta = 0;
 
 
 		uint64_t networkConfigurationFingerprint = _r->sysEnv->getNetworkConfigurationFingerprint(_r->nc->networkTapDeviceNames());
 		uint64_t networkConfigurationFingerprint = _r->sysEnv->getNetworkConfigurationFingerprint(_r->nc->networkTapDeviceNames());
-		_r->timeOfLastNetworkEnvironmentChange = Utils::now();
-		long lastDelayDelta = 0;
+		_r->timeOfLastResynchronize = Utils::now();
 
 
 		while (impl->reasonForTermination == NODE_RUNNING) {
 		while (impl->reasonForTermination == NODE_RUNNING) {
 			if (Utils::fileExists(shutdownIfUnreadablePath.c_str(),false)) {
 			if (Utils::fileExists(shutdownIfUnreadablePath.c_str(),false)) {
@@ -551,11 +551,13 @@ Node::ReasonForTermination Node::run()
 			}
 			}
 
 
 			uint64_t now = Utils::now();
 			uint64_t now = Utils::now();
+
+			// Did the user send SIGHUP or otherwise order network resync? (mostly for debugging)
 			bool resynchronize = impl->resynchronize;
 			bool resynchronize = impl->resynchronize;
+			impl->resynchronize = false;
 			if (resynchronize) {
 			if (resynchronize) {
 				LOG("manual resynchronize ordered, resyncing with network");
 				LOG("manual resynchronize ordered, resyncing with network");
 			}
 			}
-			impl->resynchronize = false;
 
 
 			// If it looks like the computer slept and woke, resynchronize.
 			// If it looks like the computer slept and woke, resynchronize.
 			if (lastDelayDelta >= ZT_SLEEP_WAKE_DETECTION_THRESHOLD) {
 			if (lastDelayDelta >= ZT_SLEEP_WAKE_DETECTION_THRESHOLD) {
@@ -571,15 +573,17 @@ Node::ReasonForTermination Node::run()
 				if (fp != networkConfigurationFingerprint) {
 				if (fp != networkConfigurationFingerprint) {
 					LOG("netconf fingerprint change: %.16llx != %.16llx, resyncing with network",networkConfigurationFingerprint,fp);
 					LOG("netconf fingerprint change: %.16llx != %.16llx, resyncing with network",networkConfigurationFingerprint,fp);
 					networkConfigurationFingerprint = fp;
 					networkConfigurationFingerprint = fp;
-					_r->timeOfLastNetworkEnvironmentChange = now;
 					resynchronize = true;
 					resynchronize = true;
 				}
 				}
 			}
 			}
 
 
-			// Ping supernodes separately for two reasons: (1) supernodes only ping each
-			// other, and (2) we still want to ping them first on resynchronize. Also ping
-			// more aggressively if nothing seems to be happening at all.
-			if ((resynchronize)||((now - lastSupernodePing) >= ZT_PEER_DIRECT_PING_DELAY)||((now - _r->timeOfLastPacketReceived) >= ZT_PING_UNANSWERED_AFTER)) {
+			if (resynchronize)
+				_r->timeOfLastResynchronize = now;
+
+			/* Ping supernodes separately, and do so more aggressively if we haven't
+			 * heard anything from anyone since our last resynchronize / startup. */
+			if ( ((now - lastSupernodePing) >= ZT_PEER_DIRECT_PING_DELAY) ||
+			     ((_r->timeOfLastResynchronize > _r->timeOfLastPacketReceived) && ((now - lastSupernodePing) >= ZT_PING_UNANSWERED_AFTER)) ) {
 				lastSupernodePing = now;
 				lastSupernodePing = now;
 				std::vector< SharedPtr<Peer> > sns(_r->topology->supernodePeers());
 				std::vector< SharedPtr<Peer> > sns(_r->topology->supernodePeers());
 				TRACE("pinging %d supernodes",(int)sns.size());
 				TRACE("pinging %d supernodes",(int)sns.size());
@@ -595,8 +599,8 @@ Node::ReasonForTermination Node::run()
 				_r->topology->eachPeer(Topology::ResetActivePeers(_r,now));
 				_r->topology->eachPeer(Topology::ResetActivePeers(_r,now));
 				_r->sm->closeTcpSockets();
 				_r->sm->closeTcpSockets();
 			} else {
 			} else {
-				// Periodically check for changes in our local multicast subscriptions
-				// and broadcast those changes to directly connected peers.
+				/* Periodically check for changes in our local multicast subscriptions
+				 * and broadcast those changes to directly connected peers. */
 				if ((now - lastMulticastCheck) >= ZT_MULTICAST_LOCAL_POLL_PERIOD) {
 				if ((now - lastMulticastCheck) >= ZT_MULTICAST_LOCAL_POLL_PERIOD) {
 					lastMulticastCheck = now;
 					lastMulticastCheck = now;
 					try {
 					try {
@@ -615,8 +619,8 @@ Node::ReasonForTermination Node::run()
 					}
 					}
 				}
 				}
 
 
-				// Periodically ping all our non-stale direct peers unless we're a supernode.
-				// Supernodes only ping each other (which is done above).
+				/* Periodically ping all our non-stale direct peers unless we're a supernode.
+				 * Supernodes only ping each other (which is done above). */
 				if (!_r->topology->amSupernode()) {
 				if (!_r->topology->amSupernode()) {
 					if ((now - lastPingCheck) >= ZT_PING_CHECK_DELAY) {
 					if ((now - lastPingCheck) >= ZT_PING_CHECK_DELAY) {
 						lastPingCheck = now;
 						lastPingCheck = now;
@@ -632,7 +636,7 @@ Node::ReasonForTermination Node::run()
 				}
 				}
 			}
 			}
 
 
-			// Periodically or on resynchronize update network configurations.
+			// Update network configurations when needed.
 			if ((resynchronize)||((now - lastNetworkAutoconfCheck) >= ZT_NETWORK_AUTOCONF_CHECK_DELAY)) {
 			if ((resynchronize)||((now - lastNetworkAutoconfCheck) >= ZT_NETWORK_AUTOCONF_CHECK_DELAY)) {
 				lastNetworkAutoconfCheck = now;
 				lastNetworkAutoconfCheck = now;
 				std::vector< SharedPtr<Network> > nets(_r->nc->networks());
 				std::vector< SharedPtr<Network> > nets(_r->nc->networks());
@@ -642,8 +646,7 @@ Node::ReasonForTermination Node::run()
 				}
 				}
 			}
 			}
 
 
-			// Do periodic cleanup, flushes of stuff to disk, software update
-			// checks, etc.
+			// Do periodic tasks in submodules.
 			if ((now - lastClean) >= ZT_DB_CLEAN_PERIOD) {
 			if ((now - lastClean) >= ZT_DB_CLEAN_PERIOD) {
 				lastClean = now;
 				lastClean = now;
 				_r->mc->clean();
 				_r->mc->clean();

+ 1 - 1
node/NodeConfig.cpp

@@ -210,7 +210,7 @@ void NodeConfig::_doCommand(IpcConnection *ipcc,const char *commandLine)
 			// network environment changed and also less than ZT_PEER_LINK_ACTIVITY_TIMEOUT ago.
 			// network environment changed and also less than ZT_PEER_LINK_ACTIVITY_TIMEOUT ago.
 			bool isOnline = false;
 			bool isOnline = false;
 			uint64_t now = Utils::now();
 			uint64_t now = Utils::now();
-			uint64_t since = _r->timeOfLastNetworkEnvironmentChange;
+			uint64_t since = _r->timeOfLastResynchronize;
 			std::vector< SharedPtr<Peer> > snp(_r->topology->supernodePeers());
 			std::vector< SharedPtr<Peer> > snp(_r->topology->supernodePeers());
 			for(std::vector< SharedPtr<Peer> >::const_iterator sn(snp.begin());sn!=snp.end();++sn) {
 			for(std::vector< SharedPtr<Peer> >::const_iterator sn(snp.begin());sn!=snp.end();++sn) {
 				uint64_t lastRec = (*sn)->lastDirectReceive();
 				uint64_t lastRec = (*sn)->lastDirectReceive();

+ 2 - 2
node/RuntimeEnvironment.hpp

@@ -64,7 +64,7 @@ class RuntimeEnvironment
 public:
 public:
 	RuntimeEnvironment() :
 	RuntimeEnvironment() :
 		shutdownInProgress(false),
 		shutdownInProgress(false),
-		timeOfLastNetworkEnvironmentChange(0),
+		timeOfLastResynchronize(0),
 		timeOfLastPacketReceived(0),
 		timeOfLastPacketReceived(0),
 		log((Logger *)0),
 		log((Logger *)0),
 		prng((CMWC4096 *)0),
 		prng((CMWC4096 *)0),
@@ -91,7 +91,7 @@ public:
 	volatile bool shutdownInProgress;
 	volatile bool shutdownInProgress;
 
 
 	// Time of last resynchronize (startup, manual order, sleep/wake, or netconf fingerprint change) -- used to determine online-ness
 	// Time of last resynchronize (startup, manual order, sleep/wake, or netconf fingerprint change) -- used to determine online-ness
-	volatile uint64_t timeOfLastNetworkEnvironmentChange;
+	volatile uint64_t timeOfLastResynchronize;
 
 
 	// Time last packet was received -- from anywhere. This is updated in Peer::receive()
 	// Time last packet was received -- from anywhere. This is updated in Peer::receive()
 	// via an ugly const_cast<>.
 	// via an ugly const_cast<>.

+ 37 - 23
node/SocketManager.cpp

@@ -42,8 +42,13 @@
 #include <sys/socket.h>
 #include <sys/socket.h>
 #include <arpa/inet.h>
 #include <arpa/inet.h>
 #include <signal.h>
 #include <signal.h>
+#include <netinet/in.h>
+#include <netinet/tcp.h>
 #endif
 #endif
 
 
+// Uncomment to turn off TCP Nagle
+//#define ZT_TCP_NODELAY
+
 // Allow us to use the same value on Windows and *nix
 // Allow us to use the same value on Windows and *nix
 #ifndef INVALID_SOCKET
 #ifndef INVALID_SOCKET
 #define INVALID_SOCKET (-1)
 #define INVALID_SOCKET (-1)
@@ -58,8 +63,8 @@
 namespace ZeroTier {
 namespace ZeroTier {
 
 
 #ifdef __WINDOWS__
 #ifdef __WINDOWS__
-// hack from StackOverflow, behaves a bit like pipe() on *nix systems
-static inline void __winpipe(SOCKET fds[2])
+// hack copied from StackOverflow, behaves a bit like pipe() on *nix systems
+static inline void winPipeHack(SOCKET fds[2])
 {
 {
 	struct sockaddr_in inaddr;
 	struct sockaddr_in inaddr;
 	struct sockaddr addr;
 	struct sockaddr addr;
@@ -98,10 +103,11 @@ SocketManager::SocketManager(
 	FD_ZERO(&_readfds);
 	FD_ZERO(&_readfds);
 	FD_ZERO(&_writefds);
 	FD_ZERO(&_writefds);
 
 
+	// Create a pipe or socket pair that can be used to interrupt select()
 #ifdef __WINDOWS__
 #ifdef __WINDOWS__
 	{
 	{
 		SOCKET tmps[2] = { INVALID_SOCKET,INVALID_SOCKET };
 		SOCKET tmps[2] = { INVALID_SOCKET,INVALID_SOCKET };
-		__winpipe(tmps);
+		winPipeHack(tmps);
 		_whackSendPipe = tmps[0];
 		_whackSendPipe = tmps[0];
 		_whackReceivePipe = tmps[1];
 		_whackReceivePipe = tmps[1];
 		u_long iMode=1;
 		u_long iMode=1;
@@ -129,15 +135,12 @@ SocketManager::SocketManager(
 			_tcpV6ListenSocket = ::socket(AF_INET6,SOCK_STREAM,0);
 			_tcpV6ListenSocket = ::socket(AF_INET6,SOCK_STREAM,0);
 #ifdef __WINDOWS__
 #ifdef __WINDOWS__
 			if (_tcpV6ListenSocket == INVALID_SOCKET) {
 			if (_tcpV6ListenSocket == INVALID_SOCKET) {
-				_closeSockets();
-				throw std::runtime_error("unable to create IPv6 SOCK_STREAM socket");
-			}
 #else
 #else
 			if (_tcpV6ListenSocket <= 0) {
 			if (_tcpV6ListenSocket <= 0) {
+#endif
 				_closeSockets();
 				_closeSockets();
 				throw std::runtime_error("unable to create IPv6 SOCK_STREAM socket");
 				throw std::runtime_error("unable to create IPv6 SOCK_STREAM socket");
 			}
 			}
-#endif
 
 
 #ifdef __WINDOWS__
 #ifdef __WINDOWS__
 			{
 			{
@@ -178,15 +181,12 @@ SocketManager::SocketManager(
 			_tcpV4ListenSocket = ::socket(AF_INET,SOCK_STREAM,0);
 			_tcpV4ListenSocket = ::socket(AF_INET,SOCK_STREAM,0);
 #ifdef __WINDOWS__
 #ifdef __WINDOWS__
 			if (_tcpV4ListenSocket == INVALID_SOCKET) {
 			if (_tcpV4ListenSocket == INVALID_SOCKET) {
-				_closeSockets();
-				throw std::runtime_error("unable to create IPv4 SOCK_STREAM socket");
-			}
 #else
 #else
 			if (_tcpV4ListenSocket <= 0) {
 			if (_tcpV4ListenSocket <= 0) {
+#endif
 				_closeSockets();
 				_closeSockets();
 				throw std::runtime_error("unable to create IPv4 SOCK_STREAM socket");
 				throw std::runtime_error("unable to create IPv4 SOCK_STREAM socket");
 			}
 			}
-#endif
 
 
 #ifdef __WINDOWS__
 #ifdef __WINDOWS__
 			{
 			{
@@ -368,10 +368,10 @@ bool SocketManager::send(const InetAddress &to,bool tcp,const void *msg,unsigned
 			::closesocket(s);
 			::closesocket(s);
 			return false;
 			return false;
 		}
 		}
-		{
-			u_long iMode=1;
-			ioctlsocket(s,FIONBIO,&iMode);
-		}
+		{ u_long iMode=1; ioctlsocket(s,FIONBIO,&iMode); }
+#ifdef ZT_TCP_NODELAY
+		{ BOOL f = TRUE; setsockopt(s,IPPROTO_TCP,TCP_NODELAY,(char *)&f,sizeof(f)); }
+#endif
 #else
 #else
 		int s = ::socket(to.isV4() ? AF_INET : AF_INET6,SOCK_STREAM,0);
 		int s = ::socket(to.isV4() ? AF_INET : AF_INET6,SOCK_STREAM,0);
 		if (s <= 0)
 		if (s <= 0)
@@ -381,6 +381,9 @@ bool SocketManager::send(const InetAddress &to,bool tcp,const void *msg,unsigned
 			return false;
 			return false;
 		}
 		}
 		fcntl(s,F_SETFL,O_NONBLOCK);
 		fcntl(s,F_SETFL,O_NONBLOCK);
+#ifdef ZT_TCP_NODELAY
+		{ int f = 1; setsockopt(s,IPPROTO_TCP,TCP_NODELAY,(char *)&f,sizeof(f)); }
+#endif
 #endif
 #endif
 
 
 		bool connecting = false;
 		bool connecting = false;
@@ -405,6 +408,7 @@ bool SocketManager::send(const InetAddress &to,bool tcp,const void *msg,unsigned
 		if (connecting)
 		if (connecting)
 			FD_SET(s,&_writefds);
 			FD_SET(s,&_writefds);
 		_fdSetLock.unlock();
 		_fdSetLock.unlock();
+		whack();
 
 
 		return true;
 		return true;
 	} else if (to.isV4()) {
 	} else if (to.isV4()) {
@@ -453,11 +457,11 @@ void SocketManager::poll(unsigned long timeout)
 	select(_nfds + 1,&rfds,&wfds,&efds,(timeout > 0) ? &tv : (struct timeval *)0);
 	select(_nfds + 1,&rfds,&wfds,&efds,(timeout > 0) ? &tv : (struct timeval *)0);
 
 
 	if (FD_ISSET(_whackReceivePipe,&rfds)) {
 	if (FD_ISSET(_whackReceivePipe,&rfds)) {
-		char tmp;
+		char tmp[16];
 #ifdef __WINDOWS__
 #ifdef __WINDOWS__
-		::recv(_whackReceivePipe,&tmp,1,0);
+		::recv(_whackReceivePipe,&tmp,16,0);
 #else
 #else
-		::read(_whackReceivePipe,&tmp,1);
+		::read(_whackReceivePipe,&tmp,16);
 #endif
 #endif
 	}
 	}
 
 
@@ -476,10 +480,15 @@ void SocketManager::poll(unsigned long timeout)
 				try {
 				try {
 					_tcpSockets[fromia] = SharedPtr<Socket>(new TcpSocket(this,sockfd,false,fromia));
 					_tcpSockets[fromia] = SharedPtr<Socket>(new TcpSocket(this,sockfd,false,fromia));
 #ifdef __WINDOWS__
 #ifdef __WINDOWS__
-					u_long iMode=1;
-					ioctlsocket(sockfd,FIONBIO,&iMode);
+					{ u_long iMode=1; ioctlsocket(sockfd,FIONBIO,&iMode); }
+#ifdef ZT_TCP_NODELAY
+					{ BOOL f = TRUE; setsockopt(sockfd,IPPROTO_TCP,TCP_NODELAY,(char *)&f,sizeof(f)); }
+#endif
 #else
 #else
 					fcntl(sockfd,F_SETFL,O_NONBLOCK);
 					fcntl(sockfd,F_SETFL,O_NONBLOCK);
+#ifdef ZT_TCP_NODELAY
+					{ int f = 1; setsockopt(sockfd,IPPROTO_TCP,TCP_NODELAY,(char *)&f,sizeof(f)); }
+#endif
 #endif
 #endif
 					_fdSetLock.lock();
 					_fdSetLock.lock();
 					FD_SET(sockfd,&_readfds);
 					FD_SET(sockfd,&_readfds);
@@ -509,10 +518,15 @@ void SocketManager::poll(unsigned long timeout)
 				try {
 				try {
 					_tcpSockets[fromia] = SharedPtr<Socket>(new TcpSocket(this,sockfd,false,fromia));
 					_tcpSockets[fromia] = SharedPtr<Socket>(new TcpSocket(this,sockfd,false,fromia));
 #ifdef __WINDOWS__
 #ifdef __WINDOWS__
-					u_long iMode=1;
-					ioctlsocket(sockfd,FIONBIO,&iMode);
+					{ u_long iMode=1; ioctlsocket(sockfd,FIONBIO,&iMode); }
+#ifdef ZT_TCP_NODELAY
+					{ BOOL f = TRUE; setsockopt(sockfd,IPPROTO_TCP,TCP_NODELAY,(char *)&f,sizeof(f)); }
+#endif
 #else
 #else
 					fcntl(sockfd,F_SETFL,O_NONBLOCK);
 					fcntl(sockfd,F_SETFL,O_NONBLOCK);
+#ifdef ZT_TCP_NODELAY
+					{ int f = 1; setsockopt(sockfd,IPPROTO_TCP,TCP_NODELAY,(char *)&f,sizeof(f)); }
+#endif
 #endif
 #endif
 					_fdSetLock.lock();
 					_fdSetLock.lock();
 					FD_SET(sockfd,&_readfds);
 					FD_SET(sockfd,&_readfds);
@@ -538,7 +552,7 @@ void SocketManager::poll(unsigned long timeout)
 	bool closedSockets = false;
 	bool closedSockets = false;
 	{ // grab copy of TCP sockets list because _tcpSockets[] might be changed in a handler
 	{ // grab copy of TCP sockets list because _tcpSockets[] might be changed in a handler
 		Mutex::Lock _l2(_tcpSockets_m);
 		Mutex::Lock _l2(_tcpSockets_m);
-		if (_tcpSockets.size()) {
+		if (!_tcpSockets.empty()) {
 			ts.reserve(_tcpSockets.size());
 			ts.reserve(_tcpSockets.size());
 			uint64_t now = Utils::now();
 			uint64_t now = Utils::now();
 			for(std::map< InetAddress,SharedPtr<Socket> >::iterator s(_tcpSockets.begin());s!=_tcpSockets.end();) {
 			for(std::map< InetAddress,SharedPtr<Socket> >::iterator s(_tcpSockets.begin());s!=_tcpSockets.end();) {

+ 2 - 2
node/TcpSocket.cpp

@@ -73,7 +73,7 @@ bool TcpSocket::send(const InetAddress &to,const void *msg,unsigned int msglen)
 
 
 	Mutex::Lock _l(_writeLock);
 	Mutex::Lock _l(_writeLock);
 
 
-	bool outputWasEnqueued = (_outptr != 0);
+	bool writeInProgress = ((_outptr != 0)||(_connecting));
 
 
 	// Ensure that _outbuf is large enough
 	// Ensure that _outbuf is large enough
 	unsigned int newptr = _outptr + 5 + msglen;
 	unsigned int newptr = _outptr + 5 + msglen;
@@ -102,7 +102,7 @@ bool TcpSocket::send(const InetAddress &to,const void *msg,unsigned int msglen)
 	for(unsigned int i=0;i<msglen;++i)
 	for(unsigned int i=0;i<msglen;++i)
 		_outbuf[_outptr++] = ((const unsigned char *)msg)[i];
 		_outbuf[_outptr++] = ((const unsigned char *)msg)[i];
 
 
-	if (!outputWasEnqueued) {
+	if (!writeInProgress) {
 		// If no write was already in progress (nothing queued and not still
 		// If no write was already in progress (nothing queued and not still
 		// connecting), try to send() now and queue a write for whatever remains.
 		// connecting), try to send() now and queue a write for whatever remains.