Browse Source

WINDOWS IS SUFFERING

Adam Ierymenko 11 years ago
parent
commit
67a85221d5
7 changed files with 112 additions and 72 deletions
  1. 6 2
      main.cpp
  2. 68 42
      node/IpcConnection.cpp
  3. 6 1
      node/IpcConnection.hpp
  4. 20 21
      node/IpcListener.cpp
  5. 2 3
      node/IpcListener.hpp
  6. 3 3
      node/SocketManager.cpp
  7. 7 0
      node/Thread.hpp

+ 6 - 2
main.cpp

@@ -98,7 +98,7 @@ static void printHelp(const char *cn,FILE *out)
 	fprintf(out,"  -d                - Fork and run as daemon (Unix-ish OSes)"ZT_EOL_S);
 #endif
 	fprintf(out,"  -q                - Send a query to a running service (zerotier-cli)"ZT_EOL_S);
-	fprintf(out,"  -i                - Run idtool command (zerotier-idtool)"ZT_EOL_S);
+	fprintf(out,"  -i                - Generate and manage identities (zerotier-idtool)"ZT_EOL_S);
 #ifdef __WINDOWS__
 	fprintf(out,"  -C                - Run from command line instead of as service (Windows)"ZT_EOL_S);
 	fprintf(out,"  -I                - Install Windows service (Windows)"ZT_EOL_S);
@@ -139,7 +139,7 @@ static int main(int argc,char **argv)
 	for(int i=1;i<argc;++i) {
 		if (argv[i][0] == '-') {
 			switch(argv[i][1]) {
-				case 'i': // ignore -i since it's used to invoke this
+				case 'q': // ignore -q since it's used to invoke this
 					break;
 				case 'h':
 				default:
@@ -152,6 +152,10 @@ static int main(int argc,char **argv)
 			query.append(argv[i]);
 		}
 	}
+	if (!query.length()) {
+		printHelp(stdout,argv[0]);
+		return 1;
+	}
 
 	try {
 		volatile bool done = false;

+ 68 - 42
node/IpcConnection.cpp

@@ -47,10 +47,13 @@ IpcConnection::IpcConnection(const char *endpoint,void (*commandHandler)(void *,
 	_handler(commandHandler),
 	_arg(arg),
 #ifdef __WINDOWS__
-	_sock(INVALID_HANDLE_VALUE)
+	_sock(INVALID_HANDLE_VALUE),
+	_incoming(false),
 #else
-	_sock(0)
+	_sock(-1),
 #endif
+	_run(true),
+	_running(true)
 {
 #ifdef __WINDOWS__
 	_sock = CreateFileA(endpoint,GENERIC_READ|GENERIC_WRITE,FILE_SHARE_READ|FILE_SHARE_WRITE|FILE_SHARE_DELETE,NULL,OPEN_EXISTING,0,NULL);
@@ -74,7 +77,7 @@ IpcConnection::IpcConnection(const char *endpoint,void (*commandHandler)(void *,
 	}
 #endif
 
-	Thread::start(this);
+	_thread = Thread::start(this);
 }
 
 #ifdef __WINDOWS__
@@ -84,19 +87,26 @@ IpcConnection::IpcConnection(int s,void (*commandHandler)(void *,IpcConnection *
 #endif
 	_handler(commandHandler),
 	_arg(arg),
-	_sock(s)
+	_sock(s),
+#ifdef __WINDOWS__
+	_incoming(true),
+#endif
+	_run(true),
+	_running(true)
 {
-	Thread::start(this);
+	_thread = Thread::start(this);
 }
 
 IpcConnection::~IpcConnection()
 {
 	_writeLock.lock();
+	_run = false;
+	_writeLock.unlock();
+
 #ifdef __WINDOWS__
-	HANDLE s = _sock;
-	_sock = INVALID_HANDLE_VALUE;
-	if (s != INVALID_HANDLE_VALUE) {
-		CloseHandle(s);
+	while (_running) {
+		Thread::cancelIO(_thread);
+		Sleep(100);
 	}
 #else
 	int s = _sock;
@@ -106,7 +116,6 @@ IpcConnection::~IpcConnection()
 		::close(s);
 	}
 #endif
-	_writeLock.unlock();
 }
 
 void IpcConnection::printf(const char *format,...)
@@ -115,22 +124,20 @@ void IpcConnection::printf(const char *format,...)
 	int n;
 	char tmp[65536];
 
-	Mutex::Lock _l(_writeLock);
-
-	if (_sock <= 0)
-		return;
-
 	va_start(ap,format);
 	n = (int)::vsnprintf(tmp,sizeof(tmp),format,ap);
 	va_end(ap);
 	if (n <= 0)
 		return;
 
+	Mutex::Lock _l(_writeLock);
+
 #ifdef __WINDOWS__
-	DWORD bsent = 0;
-	WriteFile(_sock,tmp,n,&bsent,NULL);
+	_writeBuf.append(tmp,n);
+	Thread::cancelIO(_thread);
 #else
-	::write(_sock,tmp,n);
+	if (_sock > 0)
+		::write(_sock,tmp,n);
 #endif
 }
 
@@ -140,29 +147,51 @@ void IpcConnection::threadMain()
 	char tmp[65536];
 	char linebuf[65536];
 	unsigned int lineptr = 0;
+	char c;
+
 #ifdef __WINDOWS__
-	HANDLE s;
 	DWORD n,i;
+	std::string wbuf;
 #else
 	int s,n,i;
 #endif
-	char c;
 
-	for(;;) {
+	while (_run) {
 #ifdef __WINDOWS__
-		s = _sock;
-		if (s == INVALID_HANDLE_VALUE)
-			break;
-		if (!ReadFile(s,tmp,sizeof(tmp),&n,NULL))
+		{
+			Mutex::Lock _l(_writeLock);
+			if (!_run)
+				break;
+			if (_writeBuf.length() > 0) {
+				wbuf.append(_writeBuf);
+				_writeBuf.clear();
+			}
+		}
+		if (wbuf.length() > 0) {
+			n = 0;
+			if ((WriteFile(_sock,wbuf.data(),(DWORD)(wbuf.length()),&n,NULL))&&(n > 0)) {
+				if (n < (DWORD)wbuf.length())
+					wbuf.erase(0,n);
+				else wbuf.clear();
+			} else if (GetLastError() != ERROR_OPERATION_ABORTED)
+				break;
+			FlushFileBuffers(_sock);
+		}
+		if (!_run)
 			break;
-		if (n < 0)
+		n = 0;
+		if ((!ReadFile(_sock,tmp,sizeof(tmp),&n,NULL))||(n <= 0)) {
+			if (GetLastError() == ERROR_OPERATION_ABORTED)
+				n = 0;
+			else break;
+		}
+		if (!_run)
 			break;
 #else
-		s = _sock;
-		if (s <= 0)
+		if ((s = _sock) <= 0)
 			break;
 		n = (int)::read(s,tmp,sizeof(tmp));
-		if (n <= 0)
+		if ((n <= 0)||(_sock <= 0))
 			break;
 #endif
 		for(i=0;i<n;++i) {
@@ -177,22 +206,19 @@ void IpcConnection::threadMain()
 		}
 	}
 
-	{
-		_writeLock.lock();
-		s = _sock;
+	_writeLock.lock();
+	bool r = _run;
+	_writeLock.unlock();
+
 #ifdef __WINDOWS__
-		_sock = INVALID_HANDLE_VALUE;
-		if (s != INVALID_HANDLE_VALUE)
-			CloseHandle(s);
-#else
-		_sock = 0;
-		if (s > 0)
-			::close(s);
+	if (_incoming)
+		DisconnectNamedPipe(_sock);
+	CloseHandle(_sock);
+	_running = false;
 #endif
-		_writeLock.unlock();
-	}
 
-	_handler(_arg,this,IPC_EVENT_CONNECTION_CLOSED,(const char *)0);
+	if (r)
+		_handler(_arg,this,IPC_EVENT_CONNECTION_CLOSED,(const char *)0);
 }
 
 } // namespace ZeroTier

+ 6 - 1
node/IpcConnection.hpp

@@ -88,11 +88,16 @@ private:
 	void (*_handler)(void *,IpcConnection *,IpcConnection::EventType,const char *);
 	void *_arg;
 #ifdef __WINDOWS__
-	volatile HANDLE _sock;
+	HANDLE _sock;
+	std::string _writeBuf;
+	bool _incoming;
 #else
 	volatile int _sock;
 #endif
 	Mutex _writeLock;
+	Thread _thread;
+	volatile bool _run;
+	volatile bool _running;
 };
 
 } // namespace ZeroTier

+ 20 - 21
node/IpcListener.cpp

@@ -47,8 +47,8 @@ IpcListener::IpcListener(const char *ep,void (*commandHandler)(void *,IpcConnect
 	_handler(commandHandler),
 	_arg(arg),
 #ifdef __WINDOWS__
-	_sock(INVALID_HANDLE_VALUE),
-	_die(false)
+	_run(true),
+	_running(true)
 #else
 	_sock(0)
 #endif
@@ -94,14 +94,14 @@ IpcListener::IpcListener(const char *ep,void (*commandHandler)(void *,IpcConnect
 IpcListener::~IpcListener()
 {
 #ifdef __WINDOWS__
-	_sock_m.lock();
-	_die = true;
-	HANDLE s = _sock;
-	_sock = INVALID_HANDLE_VALUE;
-	if (s != INVALID_HANDLE_VALUE)
-		CloseHandle(s);
-	_sock_m.unlock();
-	Thread::join(_thread);
+	_run = false;
+	while (_running) {
+		Thread::cancelIO(_thread);
+		HANDLE tmp = CreateFileA(_endpoint.c_str(),GENERIC_READ|GENERIC_WRITE,FILE_SHARE_READ|FILE_SHARE_WRITE|FILE_SHARE_DELETE,NULL,OPEN_EXISTING,0,NULL);
+		if (tmp != INVALID_HANDLE_VALUE)
+			CloseHandle(tmp);
+		Sleep(250);
+	}
 #else
 	int s = _sock;
 	_sock = 0;
@@ -110,7 +110,7 @@ IpcListener::~IpcListener()
 		::close(s);
 	}
 	Thread::join(_thread);
-	unlink(_endpoint.c_str());
+	::unlink(_endpoint.c_str());
 #endif
 }
 
@@ -119,25 +119,24 @@ void IpcListener::threadMain()
 {
 #ifdef __WINDOWS__
 	HANDLE s;
-	while (!_die) {
-		{
-			Mutex::Lock _l(_sock_m);
-			s = _sock = CreateNamedPipeA(_endpoint.c_str(),PIPE_ACCESS_DUPLEX,PIPE_READMODE_BYTE|PIPE_TYPE_BYTE|PIPE_WAIT,PIPE_UNLIMITED_INSTANCES,4096,4096,0,NULL);
-		}
+	while (_run) {
+		s = CreateNamedPipeA(_endpoint.c_str(),PIPE_ACCESS_DUPLEX,PIPE_READMODE_BYTE|PIPE_TYPE_BYTE|PIPE_WAIT,PIPE_UNLIMITED_INSTANCES,1024,1024,0,NULL);
 		if (s != INVALID_HANDLE_VALUE) {
 			if ((ConnectNamedPipe(s,NULL))||(GetLastError() == ERROR_PIPE_CONNECTED)) {
-				Mutex::Lock _l(_sock_m);
+				if (!_run) {
+					DisconnectNamedPipe(s);
+					CloseHandle(s);
+					break;
+				}
 				try {
-					if (s != INVALID_HANDLE_VALUE)
-						_handler(_arg,new IpcConnection(s,_handler,_arg),IpcConnection::IPC_EVENT_NEW_CONNECTION,(const char *)0);
+					_handler(_arg,new IpcConnection(s,_handler,_arg),IpcConnection::IPC_EVENT_NEW_CONNECTION,(const char *)0);
 				} catch ( ... ) {} // handlers should not throw
 			} else {
-				Mutex::Lock _l(_sock_m);
 				CloseHandle(s);
-				_sock = INVALID_HANDLE_VALUE;
 			}
 		}
 	}
+	_running = false;
 #else
 	struct sockaddr_un unaddr;
 	socklen_t socklen;

+ 2 - 3
node/IpcListener.hpp

@@ -83,9 +83,8 @@ private:
 	void (*_handler)(void *,IpcConnection *,IpcConnection::EventType,const char *);
 	void *_arg;
 #ifdef __WINDOWS__
-	volatile HANDLE _sock;
-	volatile bool _die;
-	Mutex _sock_m;
+	volatile bool _run;
+	volatile bool _running;
 #else
 	volatile int _sock;
 #endif

+ 3 - 3
node/SocketManager.cpp

@@ -485,7 +485,7 @@ void SocketManager::poll(unsigned long timeout)
 					FD_SET(sockfd,&_readfds);
 					_fdSetLock.unlock();
 					if ((int)sockfd > (int)_nfds)
-						_nfds = sockfd;
+						_nfds = (int)sockfd;
 				} catch ( ... ) {
 					CLOSE_SOCKET(sockfd);
 				}
@@ -518,7 +518,7 @@ void SocketManager::poll(unsigned long timeout)
 					FD_SET(sockfd,&_readfds);
 					_fdSetLock.unlock();
 					if ((int)sockfd > (int)_nfds)
-						_nfds = sockfd;
+						_nfds = (int)sockfd;
 				} catch ( ... ) {
 					CLOSE_SOCKET(sockfd);
 				}
@@ -662,7 +662,7 @@ void SocketManager::_updateNfds()
 		if (s->second->_sock > nfds)
 			nfds = s->second->_sock;
 	}
-	_nfds = nfds;
+	_nfds = (int)nfds;
 }
 
 } // namespace ZeroTier

+ 7 - 0
node/Thread.hpp

@@ -80,6 +80,13 @@ public:
 		Sleep((DWORD)ms);
 	}
 
+	// Not available on *nix platforms
+	static inline void cancelIO(const Thread &t)
+	{
+		if (t._th != NULL)
+			CancelSynchronousIo(t._th);
+	}
+
 private:
 	HANDLE _th;
 	DWORD _tid;