Browse Source

Bug fixes.

Adam Ierymenko 11 years ago
parent
commit
15e8c18106
3 changed files with 56 additions and 21 deletions
  1. 21 19
      node/IpcListener.cpp
  2. 13 1
      node/SocketManager.cpp
  3. 22 1
      node/SocketManager.hpp

+ 21 - 19
node/IpcListener.cpp

@@ -38,6 +38,8 @@
 #else
 #include <sys/socket.h>
 #include <sys/un.h>
+#include <sys/stat.h>
+#include <sys/types.h>
 #include <unistd.h>
 #endif
 
@@ -56,29 +58,29 @@ IpcListener::IpcListener(const char *ep,void (*commandHandler)(void *,IpcConnect
 	strncpy(unaddr.sun_path,_endpoint.c_str(),sizeof(unaddr.sun_path));
 	unaddr.sun_path[sizeof(unaddr.sun_path) - 1] = (char)0;
 
-	for(int tries=0;tries<3;++tries) {
-		_sock = socket(AF_UNIX,SOCK_STREAM,0);
-		if (_sock <= 0)
+	struct stat stattmp;
+	if (stat(_endpoint.c_str(),&stattmp)) {
+		int testSock = socket(AF_UNIX,SOCK_STREAM,0);
+		if (testSock <= 0)
 			throw std::runtime_error("unable to create socket of type AF_UNIX");
-		if (bind(_sock,(struct sockaddr *)&unaddr,sizeof(unaddr))) {
-			::close(_sock);
-			if (errno == EADDRINUSE) {
-				int testSock = socket(AF_UNIX,SOCK_STREAM,0);
-				if (testSock <= 0)
-					throw std::runtime_error("unable to create socket of type AF_UNIX");
-				if (connect(testSock,(struct sockaddr *)&unaddr,sizeof(unaddr))) {
-					// error indicates nothing is listening on other end, so unlink and try again
-					::close(testSock);
-					unlink(_endpoint.c_str());
-				} else {
-					// success means endpoint is being actively listened to by a process
-					::close(testSock);
-					throw std::runtime_error("IPC endpoint address in use");
-				}
-			} else throw std::runtime_error("IPC endpoint could not be bound");
+		if (connect(testSock,(struct sockaddr *)&unaddr,sizeof(unaddr))) {
+			// error means nothing is listening, orphaned name
+			::close(testSock);
+		} else {
+			// success means endpoint is being actively listened to by a process
+			::close(testSock);
+			throw std::runtime_error("IPC endpoint address in use");
 		}
 	}
+	::unlink(_endpoint.c_str());
 
+	_sock = socket(AF_UNIX,SOCK_STREAM,0);
+	if (_sock <= 0)
+		throw std::runtime_error("unable to create socket of type AF_UNIX");
+	if (bind(_sock,(struct sockaddr *)&unaddr,sizeof(unaddr))) {
+		::close(_sock);
+		throw std::runtime_error("IPC endpoint could not be bound");
+	}
 	if (listen(_sock,8)) {
 		::close(_sock);
 		throw std::runtime_error("listen() failed for bound AF_UNIX socket");

+ 13 - 1
node/SocketManager.cpp

@@ -316,6 +316,8 @@ SocketManager::SocketManager(
 			_udpV4Socket = SharedPtr<Socket>(new UdpSocket(Socket::ZT_SOCKET_TYPE_UDP_V4,s));
 		}
 	}
+
+	_updateNfds();
 }
 
 SocketManager::~SocketManager()
@@ -370,7 +372,7 @@ void SocketManager::poll(unsigned long timeout)
 
 	tv.tv_sec = (long)(timeout / 1000);
 	tv.tv_usec = (long)((timeout % 1000) * 1000);
-	select(_nfds,&rfds,&wfds,&efds,(timeout > 0) ? &tv : (struct timeval *)0);
+	select(_nfds + 1,&rfds,&wfds,&efds,(timeout > 0) ? &tv : (struct timeval *)0);
 
 	if (FD_ISSET(_whackReceivePipe,&rfds)) {
 		char tmp[32];
@@ -396,6 +398,8 @@ void SocketManager::poll(unsigned long timeout)
 			_fdSetLock.lock();
 			FD_SET(sockfd,&_readfds);
 			_fdSetLock.unlock();
+			if (sockfd > _nfds)
+				_nfds = sockfd;
 		}
 	}
 	if ((_tcpV6ListenSocket != INVALID_SOCKET)&&(FD_ISSET(_tcpV6ListenSocket,&rfds))) {
@@ -413,6 +417,8 @@ void SocketManager::poll(unsigned long timeout)
 			_fdSetLock.lock();
 			FD_SET(sockfd,&_readfds);
 			_fdSetLock.unlock();
+			if (sockfd > _nfds)
+				_nfds = sockfd;
 		}
 	}
 
@@ -421,6 +427,7 @@ void SocketManager::poll(unsigned long timeout)
 	if ((_udpV6Socket)&&(FD_ISSET(_udpV6Socket->_sock,&rfds)))
 		_udpV6Socket->notifyAvailableForRead(_udpV6Socket,this);
 
+	bool closedSockets = false;
 	{ // grab copy of TCP sockets list because _tcpSockets[] might be changed in a handler
 		Mutex::Lock _l2(_tcpSockets_m);
 		if (_tcpSockets.size()) {
@@ -436,6 +443,7 @@ void SocketManager::poll(unsigned long timeout)
 					FD_CLR(s->second->_sock,&_writefds);
 					_fdSetLock.unlock();
 					_tcpSockets.erase(s++);
+					closedSockets = true;
 				}
 			}
 		}
@@ -451,6 +459,7 @@ void SocketManager::poll(unsigned long timeout)
 				FD_CLR((*s)->_sock,&_readfds);
 				FD_CLR((*s)->_sock,&_writefds);
 				_fdSetLock.unlock();
+				closedSockets = true;
 				continue;
 			}
 		}
@@ -464,10 +473,13 @@ void SocketManager::poll(unsigned long timeout)
 				FD_CLR((*s)->_sock,&_readfds);
 				FD_CLR((*s)->_sock,&_writefds);
 				_fdSetLock.unlock();
+				closedSockets = true;
 				continue;
 			}
 		}
 	}
+	if (closedSockets)
+		_updateNfds();
 }
 
 void SocketManager::whack()

+ 22 - 1
node/SocketManager.hpp

@@ -169,6 +169,27 @@ private:
 #endif
 	}
 
+	inline void _updateNfds()
+	{
+		int nfds = _whackSendPipe;
+		if (_whackReceivePipe > nfds)
+			nfds = _whackReceivePipe;
+		if (_tcpV4ListenSocket > nfds)
+			nfds = _tcpV4ListenSocket;
+		if (_tcpV6ListenSocket > nfds)
+			nfds = _tcpV6ListenSocket;
+		if ((_udpV4Socket)&&(_udpV4Socket->_sock > nfds))
+			nfds = _udpV4Socket->_sock;
+		if ((_udpV6Socket)&&(_udpV6Socket->_sock > nfds))
+			nfds = _udpV6Socket->_sock;
+		Mutex::Lock _l(_tcpSockets_m);
+		for(std::map< InetAddress,SharedPtr<Socket> >::const_iterator s(_tcpSockets.begin());s!=_tcpSockets.end();++s) {
+			if (s->second->_sock > nfds)
+				nfds = s->second->_sock;
+		}
+		_nfds = nfds;
+	}
+
 #ifdef __WINDOWS__
 	SOCKET _whackSendPipe;
 	SOCKET _whackReceivePipe;
@@ -187,7 +208,7 @@ private:
 
 	fd_set _readfds;
 	fd_set _writefds;
-	int _nfds;
+	volatile int _nfds;
 	Mutex _fdSetLock;
 
 	std::map< InetAddress,SharedPtr<Socket> > _tcpSockets;