Browse Source

Fix blocking socket issues in new socket I/O code.

Adam Ierymenko 11 years ago
parent
commit
6f5a4d7e29
4 changed files with 50 additions and 19 deletions
  1. 1 1
      node/Network.cpp
  2. 9 0
      node/Node.cpp
  3. 37 18
      node/SocketManager.cpp
  4. 3 0
      node/Switch.cpp

+ 1 - 1
node/Network.cpp

@@ -179,7 +179,7 @@ void Network::addMembershipCertificate(const CertificateOfMembership &cert)
 
 
 	CertificateOfMembership &old = _membershipCertificates[cert.issuedTo()];
 	CertificateOfMembership &old = _membershipCertificates[cert.issuedTo()];
 	if (cert.timestamp() >= old.timestamp()) {
 	if (cert.timestamp() >= old.timestamp()) {
-		TRACE("got new certificate for %s on network %.16llx",cert.issuedTo().toString().c_str(),cert.networkId());
+		//TRACE("got new certificate for %s on network %.16llx",cert.issuedTo().toString().c_str(),cert.networkId());
 		old = cert;
 		old = cert;
 	}
 	}
 }
 }

+ 9 - 0
node/Node.cpp

@@ -230,15 +230,24 @@ struct _NodeImpl
 
 
 #ifndef __WINDOWS__
 #ifndef __WINDOWS__
 		delete renv.netconfService;
 		delete renv.netconfService;
+		TRACE("shutdown: delete netconfService");
 #endif
 #endif
 		delete renv.updater;
 		delete renv.updater;
+		TRACE("shutdown: delete updater");
 		delete renv.nc;
 		delete renv.nc;
+		TRACE("shutdown: delete nc");
 		delete renv.sysEnv;
 		delete renv.sysEnv;
+		TRACE("shutdown: delete sysEnv");
 		delete renv.topology;
 		delete renv.topology;
+		TRACE("shutdown: delete topology");
 		delete renv.sm;
 		delete renv.sm;
+		TRACE("shutdown: delete sm");
 		delete renv.sw;
 		delete renv.sw;
+		TRACE("shutdown: delete sw");
 		delete renv.mc;
 		delete renv.mc;
+		TRACE("shutdown: delete mc");
 		delete renv.prng;
 		delete renv.prng;
+		TRACE("shutdown: delete prng");
 		delete renv.log;
 		delete renv.log;
 
 
 		return reasonForTermination;
 		return reasonForTermination;

+ 37 - 18
node/SocketManager.cpp

@@ -45,7 +45,7 @@
 
 
 // Allow us to use the same value on Windows and *nix
 // Allow us to use the same value on Windows and *nix
 #ifndef INVALID_SOCKET
 #ifndef INVALID_SOCKET
-#define INVALID_SOCKET 0
+#define INVALID_SOCKET (-1)
 #endif
 #endif
 
 
 namespace ZeroTier {
 namespace ZeroTier {
@@ -107,6 +107,7 @@ SocketManager::SocketManager(
 		_whackReceivePipe = tmpfds[0];
 		_whackReceivePipe = tmpfds[0];
 	}
 	}
 #endif
 #endif
+	fcntl(_whackReceivePipe,F_SETFL,O_NONBLOCK);
 	FD_SET(_whackReceivePipe,&_readfds);
 	FD_SET(_whackReceivePipe,&_readfds);
 
 
 	if (localTcpPort > 0) {
 	if (localTcpPort > 0) {
@@ -142,6 +143,7 @@ SocketManager::SocketManager(
 				f = 1; ::setsockopt(_tcpV6ListenSocket,SOL_SOCKET,SO_REUSEADDR,(void *)&f,sizeof(f));
 				f = 1; ::setsockopt(_tcpV6ListenSocket,SOL_SOCKET,SO_REUSEADDR,(void *)&f,sizeof(f));
 			}
 			}
 #endif
 #endif
+			fcntl(_tcpV6ListenSocket,F_SETFL,O_NONBLOCK);
 
 
 			struct sockaddr_in6 sin6;
 			struct sockaddr_in6 sin6;
 			memset(&sin6,0,sizeof(sin6));
 			memset(&sin6,0,sizeof(sin6));
@@ -184,6 +186,7 @@ SocketManager::SocketManager(
 				int f = 1; ::setsockopt(_tcpV4ListenSocket,SOL_SOCKET,SO_REUSEADDR,(void *)&f,sizeof(f));
 				int f = 1; ::setsockopt(_tcpV4ListenSocket,SOL_SOCKET,SO_REUSEADDR,(void *)&f,sizeof(f));
 			}
 			}
 #endif
 #endif
+			fcntl(_tcpV4ListenSocket,F_SETFL,O_NONBLOCK);
 
 
 			struct sockaddr_in sin4;
 			struct sockaddr_in sin4;
 			memset(&sin4,0,sizeof(sin4));
 			memset(&sin4,0,sizeof(sin4));
@@ -262,8 +265,8 @@ SocketManager::SocketManager(
 				throw std::runtime_error("unable to bind to port");
 				throw std::runtime_error("unable to bind to port");
 			}
 			}
 
 
-			FD_SET(s,&_readfds);
 			_udpV6Socket = SharedPtr<Socket>(new UdpSocket(Socket::ZT_SOCKET_TYPE_UDP_V6,s));
 			_udpV6Socket = SharedPtr<Socket>(new UdpSocket(Socket::ZT_SOCKET_TYPE_UDP_V6,s));
+			FD_SET(s,&_readfds);
 		}
 		}
 
 
 		{ // bind UDP IPv4
 		{ // bind UDP IPv4
@@ -312,8 +315,8 @@ SocketManager::SocketManager(
 				throw std::runtime_error("unable to bind to port");
 				throw std::runtime_error("unable to bind to port");
 			}
 			}
 
 
-			FD_SET(s,&_readfds);
 			_udpV4Socket = SharedPtr<Socket>(new UdpSocket(Socket::ZT_SOCKET_TYPE_UDP_V4,s));
 			_udpV4Socket = SharedPtr<Socket>(new UdpSocket(Socket::ZT_SOCKET_TYPE_UDP_V4,s));
+			FD_SET(s,&_readfds);
 		}
 		}
 	}
 	}
 
 
@@ -375,11 +378,11 @@ void SocketManager::poll(unsigned long timeout)
 	select(_nfds + 1,&rfds,&wfds,&efds,(timeout > 0) ? &tv : (struct timeval *)0);
 	select(_nfds + 1,&rfds,&wfds,&efds,(timeout > 0) ? &tv : (struct timeval *)0);
 
 
 	if (FD_ISSET(_whackReceivePipe,&rfds)) {
 	if (FD_ISSET(_whackReceivePipe,&rfds)) {
-		char tmp[32];
+		char tmp;
 #ifdef __WINDOWS__
 #ifdef __WINDOWS__
-		::recv(_whackReceivePipe,tmp,sizeof(tmp),0);
+		::recv(_whackReceivePipe,&tmp,1,0);
 #else
 #else
-		::read(_whackReceivePipe,tmp,sizeof(tmp));
+		::read(_whackReceivePipe,&tmp,1);
 #endif
 #endif
 	}
 	}
 
 
@@ -394,12 +397,20 @@ void SocketManager::poll(unsigned long timeout)
 #endif
 #endif
 			InetAddress fromia((const struct sockaddr *)&from);
 			InetAddress fromia((const struct sockaddr *)&from);
 			Mutex::Lock _l2(_tcpSockets_m);
 			Mutex::Lock _l2(_tcpSockets_m);
-			_tcpSockets[fromia] = SharedPtr<Socket>(new TcpSocket(this,sockfd,false,fromia));
-			_fdSetLock.lock();
-			FD_SET(sockfd,&_readfds);
-			_fdSetLock.unlock();
-			if (sockfd > _nfds)
-				_nfds = sockfd;
+			try {
+				_tcpSockets[fromia] = SharedPtr<Socket>(new TcpSocket(this,sockfd,false,fromia));
+
+				fcntl(sockfd,F_SETFL,O_NONBLOCK);
+
+				_fdSetLock.lock();
+				FD_SET(sockfd,&_readfds);
+				_fdSetLock.unlock();
+
+				if (sockfd > _nfds)
+					_nfds = sockfd;
+			} catch ( ... ) {
+				::close(sockfd);
+			}
 		}
 		}
 	}
 	}
 	if ((_tcpV6ListenSocket != INVALID_SOCKET)&&(FD_ISSET(_tcpV6ListenSocket,&rfds))) {
 	if ((_tcpV6ListenSocket != INVALID_SOCKET)&&(FD_ISSET(_tcpV6ListenSocket,&rfds))) {
@@ -413,12 +424,20 @@ void SocketManager::poll(unsigned long timeout)
 #endif
 #endif
 			InetAddress fromia((const struct sockaddr *)&from);
 			InetAddress fromia((const struct sockaddr *)&from);
 			Mutex::Lock _l2(_tcpSockets_m);
 			Mutex::Lock _l2(_tcpSockets_m);
-			_tcpSockets[fromia] = SharedPtr<Socket>(new TcpSocket(this,sockfd,false,fromia));
-			_fdSetLock.lock();
-			FD_SET(sockfd,&_readfds);
-			_fdSetLock.unlock();
-			if (sockfd > _nfds)
-				_nfds = sockfd;
+			try {
+				_tcpSockets[fromia] = SharedPtr<Socket>(new TcpSocket(this,sockfd,false,fromia));
+
+				fcntl(sockfd,F_SETFL,O_NONBLOCK);
+
+				_fdSetLock.lock();
+				FD_SET(sockfd,&_readfds);
+				_fdSetLock.unlock();
+
+				if (sockfd > _nfds)
+					_nfds = sockfd;
+			} catch ( ... ) {
+				::close(sockfd);
+			}
 		}
 		}
 	}
 	}
 
 

+ 3 - 0
node/Switch.cpp

@@ -155,6 +155,9 @@ void Switch::onLocalEthernet(const SharedPtr<Network> &network,const MAC &from,c
 			outp.append((uint16_t)sig.size());
 			outp.append((uint16_t)sig.size());
 			outp.append(sig.data,(unsigned int)sig.size());
 			outp.append(sig.data,(unsigned int)sig.size());
 
 
+			// FIXME: now we send the netconf cert with every single multicast,
+			// which pretty much ensures everyone has it ahead of time but adds
+			// some redundant payload. Maybe think abouut this in the future.
 			if (nconf->com())
 			if (nconf->com())
 				nconf->com().serialize(outp);
 				nconf->com().serialize(outp);