Browse Source

Wire should be good to test.

Adam Ierymenko 10 years ago
parent
commit
4445bfc869
1 changed files with 204 additions and 18 deletions
  1. 204 18
      osnet/Wire.hpp

+ 204 - 18
osnet/Wire.hpp

@@ -30,8 +30,9 @@
 
 #include <stdio.h>
 #include <stdlib.h>
+#include <string.h>
 
-#ifdef __WINDOWS__
+#if defined(_WIN32) || defined(_WIN64)
 #include <WinSock2.h>
 #include <WS2tcpip.h>
 #include <Windows.h>
@@ -283,7 +284,8 @@ public:
 		sws.type = ZT_WIRE_SOCKET_UDP;
 		sws.sock = s;
 		sws.uptr = uptr;
-		memcpy(&(sws.saddr),localAddress,sizeof(struct sockaddr_storage));
+		memset(&(sws.saddr),0,sizeof(struct sockaddr_storage));
+		memcpy(&(sws.saddr),localAddress,(localAddress->sa_family == AF_INET6) ? sizeof(struct sockaddr_in6) : sizeof(struct sockaddr_in));
 
 		return (WireSocket *)&sws;
 	}
@@ -315,6 +317,58 @@ public:
 	{
 		if (_socks.size() >= ZT_WIRE_MAX_SOCKETS)
 			return (WireSocket *)0;
+
+		ZT_WIRE_SOCKFD_TYPE s = ::socket(localAddress->sa_family,SOCK_STREAM,0);
+		if (!ZT_WIRE_SOCKFD_VALID(s))
+			return (WireSocket *)0;
+
+#if defined(_WIN32) || defined(_WIN64)
+		{
+			BOOL f;
+			f = TRUE; ::setsockopt(s,IPPROTO_IPV6,IPV6_V6ONLY,(const char *)&f,sizeof(f));
+			f = TRUE; ::setsockopt(s,SOL_SOCKET,SO_REUSEADDR,(const char *)&f,sizeof(f));
+			f = (_noDelay ? TRUE : FALSE); setsockopt(s,IPPROTO_TCP,TCP_NODELAY,(char *)&f,sizeof(f));
+			u_long iMode=1;
+			ioctlsocket(s,FIONBIO,&iMode);
+		}
+#else
+		{
+			int f;
+			f = 1; ::setsockopt(s,IPPROTO_IPV6,IPV6_V6ONLY,(void *)&f,sizeof(f));
+			f = 1; ::setsockopt(s,SOL_SOCKET,SO_REUSEADDR,(void *)&f,sizeof(f));
+			f = (_noDelay ? 1 : 0); setsockopt(newSock,IPPROTO_TCP,TCP_NODELAY,(char *)&f,sizeof(f));
+			fcntl(s,F_SETFL,O_NONBLOCK);
+		}
+#endif
+
+		if (::bind(s,localAddress,(localAddress->sa_family == AF_INET6) ? sizeof(struct sockaddr_in6) : sizeof(struct sockaddr_in))) {
+			ZT_WIRE_CLOSE_SOCKET(s);
+			return (WireSocket *)0;
+		}
+
+		if (::listen(s,1024)) {
+			ZT_WIRE_CLOSE_SOCKET(s);
+			return (WireSocket *)0;
+		}
+
+		try {
+			_socks.push_back(WireSocketImpl());
+		} catch ( ... ) {
+			ZT_WIRE_CLOSE_SOCKET(s);
+			return (WireSocket *)0;
+		}
+		WireSocketImpl &sws = _socks.back();
+
+		if ((long)s > _nfds)
+			_nfds = (long)s;
+		FD_SET(s,&_readfds);
+		sws.type = ZT_WIRE_SOCKET_TCP_LISTEN;
+		sws.sock = s;
+		sws.uptr = uptr;
+		memset(&(sws.saddr),0,sizeof(struct sockaddr_storage));
+		memcpy(&(sws.saddr),localAddress,(localAddress->sa_family == AF_INET6) ? sizeof(struct sockaddr_in6) : sizeof(struct sockaddr_in));
+
+		return (WireSocket *)&sws;
 	}
 
 	/**
@@ -324,29 +378,140 @@ public:
 	 * a return would indicate failure to allocate the socket, too many
 	 * open sockets, etc.
 	 *
+	 * Also note that an "instant connect" may occur for e.g. loopback
+	 * connections. If this happens the 'connected' result paramter will
+	 * be true. If callConnectHandlerOnInstantConnect is true, the
+	 * TCP connect handler will be called before the function returns
+	 * as well in this case. Otherwise it will not.
+	 *
 	 * @param remoteAddress Remote address
 	 * @param uptr Initial value of uptr for new socket
+	 * @param callConnectHandlerOnInstantConnect If true, call TCP connect handler now if an "instant connect" occurs
+	 * @param connected Reference to result paramter set to true if "instant connect" occurs, false otherwise
 	 * @return New socket or NULL on failure
 	 */
-	inline WireSocket *tcpConnect(const struct sockaddr *remoteAddress,void *uptr)
+	inline WireSocket *tcpConnect(const struct sockaddr *remoteAddress,void *uptr,bool callConnectHandlerOnInstantConnect,bool &connected)
 	{
 		if (_socks.size() >= ZT_WIRE_MAX_SOCKETS)
 			return (WireSocket *)0;
+
+		ZT_WIRE_SOCKFD_TYPE s = ::socket(localAddress->sa_family,SOCK_STREAM,0);
+		if (!ZT_WIRE_SOCKFD_VALID(s))
+			return (WireSocket *)0;
+
+#if defined(_WIN32) || defined(_WIN64)
+		{
+			BOOL f;
+			f = TRUE; ::setsockopt(s,IPPROTO_IPV6,IPV6_V6ONLY,(const char *)&f,sizeof(f));
+			f = TRUE; ::setsockopt(s,SOL_SOCKET,SO_REUSEADDR,(const char *)&f,sizeof(f));
+			f = (_noDelay ? TRUE : FALSE); setsockopt(s,IPPROTO_TCP,TCP_NODELAY,(char *)&f,sizeof(f));
+			u_long iMode=1;
+			ioctlsocket(s,FIONBIO,&iMode);
+		}
+#else
+		{
+			int f;
+			f = 1; ::setsockopt(s,IPPROTO_IPV6,IPV6_V6ONLY,(void *)&f,sizeof(f));
+			f = 1; ::setsockopt(s,SOL_SOCKET,SO_REUSEADDR,(void *)&f,sizeof(f));
+			f = (_noDelay ? 1 : 0); setsockopt(newSock,IPPROTO_TCP,TCP_NODELAY,(char *)&f,sizeof(f));
+			fcntl(s,F_SETFL,O_NONBLOCK);
+		}
+#endif
+
+		connected = true;
+		if (::connect(s,localAddress,(localAddress->sa_family == AF_INET6) ? sizeof(struct sockaddr_in6) : sizeof(struct sockaddr_in))) {
+#if defined(_WIN32) || defined(_WIN64)
+			if (WSAGetLastError() != WSAEWOULDBLOCK) {
+#else
+			if (errno != EINPROGRESS) {
+#endif
+				ZT_WIRE_CLOSE_SOCKET(s);
+				return (WireSocket *)0;
+			} else connected = false;
+		}
+
+		try {
+			_socks.push_back(WireSocketImpl());
+		} catch ( ... ) {
+			ZT_WIRE_CLOSE_SOCKET(s);
+			return (WireSocket *)0;
+		}
+		WireSocketImpl &sws = _socks.back();
+
+		if ((long)s > _nfds)
+			_nfds = (long)s;
+		if (connected) {
+			FD_SET(s,&_readfds);
+			sws.type = ZT_WIRE_SOCKET_TCP_OUT_CONNECTED;
+		} else {
+			FD_SET(s,&_writefds);
+#if defined(_WIN32) || defined(_WIN64)
+			FD_SET(s,&_exceptfds);
+#endif
+			sws.type = ZT_WIRE_SOCKET_TCP_OUT_PENDING;
+		}
+		sws.sock = s;
+		sws.uptr = uptr;
+		memset(&(sws.saddr),0,sizeof(struct sockaddr_storage));
+		memcpy(&(sws.saddr),localAddress,(localAddress->sa_family == AF_INET6) ? sizeof(struct sockaddr_in6) : sizeof(struct sockaddr_in));
+
+		if ((callConnectHandlerOnInstantConnect)&&(connected)) {
+			try {
+				_tcpConnectHandler((WireSocket *)&sws,uptr,true);
+			} catch ( ... ) {}
+		}
+
+		return (WireSocket *)&sws;
 	}
 
 	/**
 	 * Attempt to send data to a TCP connection (non-blocking)
 	 *
+	 * If -1 is returned, the socket should no longer be used as it is now
+	 * destroyed. If callCloseHandler is true, the close handler will be
+	 * called before the function returns.
+	 *
 	 * @param sock An open TCP socket (other socket types will fail)
 	 * @param data Data to send
 	 * @param len Length of data
-	 * @return Number of bytes actually sent or 0 on failure
+	 * @param callCloseHandler If true, call close handler on socket closing failure condition
+	 * @return Number of bytes actually sent or -1 on fatal error (socket closure)
 	 */
-	inline unsigned long tcpSend(WireSocket *sock,WireSocket *data,unsigned long len)
+	inline long tcpSend(WireSocket *sock,WireSocket *data,unsigned long len,bool callCloseHandler)
 	{
 		WireSocketImpl &sws = *(const_cast <WireSocketImpl *>(reinterpret_cast<const WireSocketImpl *>(sock)));
-		long n = ::send(sws.sock,data,len,0);
-		return ((n > 0) ? (unsigned long)n : 0);
+		long n = (long)::send(sws.sock,data,len,0);
+#if defined(_WIN32) || defined(_WIN64)
+		if (n == SOCKET_ERROR) {
+				switch(WSAGetLastError()) {
+					case WSAEINTR:
+					case WSAEWOULDBLOCK:
+						return 0;
+					default:
+						this->close(sock,callCloseHandler);
+						return -1;
+				}
+		}
+#else // not Windows
+		if (n < 0) {
+			switch(errno) {
+#ifdef EAGAIN
+				case EAGAIN:
+#endif
+#if defined(EWOULDBLOCK) && ( !defined(EAGAIN) || (EWOULDBLOCK != EAGAIN) )
+				case EWOULDBLOCK:
+#endif
+#ifdef EINTR
+				case EINTR:
+#endif
+					return 0;
+				default:
+					this->close(sock,callCloseHandler);
+					return -1;
+			}
+		}
+#endif // Windows or not
+		return n;
 	}
 
 	/**
@@ -387,11 +552,15 @@ public:
 
 		memcpy(&rfds,&_readfds,sizeof(rfds));
 		memcpy(&wfds,&_writefds,sizeof(wfds));
+#if defined(_WIN32) || defined(_WIN64)
 		memcpy(&efds,&_exceptfds,sizeof(efds));
+#else
+		FD_ZERO(&efds);
+#endif
 
 		tv.tv_sec = (long)(timeout / 1000);
 		tv.tv_usec = (long)((timeout % 1000) * 1000);
-		select((int)_nfds + 1,&rfds,&wfds,&efds,(timeout > 0) ? &tv : (struct timeval *)0);
+		::select((int)_nfds + 1,&rfds,&wfds,&efds,(timeout > 0) ? &tv : (struct timeval *)0);
 
 		if (FD_ISSET(_whackReceiveSocket,&rfds)) {
 			char tmp[16];
@@ -402,26 +571,35 @@ public:
 #endif
 		}
 
-		for(std::list<WireSocketImpl>::iterator s(_socks.begin());s!=_socks.end();++s) {
+		for(std::list<WireSocketImpl>::iterator s(_socks.begin()),nexts;s!=_socks.end();s=nexts) {
+			nexts = s; ++nexts; // we can delete the linked list item, so traverse now
+
 			switch (s->type) {
+
 				case ZT_WIRE_SOCKET_TCP_OUT_PENDING:
+#if defined(_WIN32) || defined(_WIN64)
 					if (FD_ISSET(s->sock,&efds))
 						this->close((WireSocket *)&(_socks[i]),true);
-					else if (FD_ISSET(s->sock,&wfds)) {
+					else // if ... below
+#endif
+					if (FD_ISSET(s->sock,&wfds)) {
 						socklen_t slen = sizeof(ss);
-						if (::getpeername(s->sock,(strut sockaddr *)&ss,&slen) != 0)
+						if (::getpeername(s->sock,(struct sockaddr *)&ss,&slen) != 0) {
 							this->close((WireSocket *)&(_socks[i]),true);
-						else {
+						} else {
 							s->type = ZT_WIRE_SOCKET_TCP_OUT_CONNECTED;
 							FD_SET(s->sock,&_readfds);
 							FD_CLR(s->sock,&_writefds);
+#if defined(_WIN32) || defined(_WIN64)
 							FD_CLR(s->sock,&_exceptfds);
+#endif
 							try {
 								_tcpConnectHandler((WireSocket *)&(_socks[i]),&(s->uptr),true);
 							} catch ( ... ) {}
 						}
 					}
 					break;
+
 				case ZT_WIRE_SOCKET_TCP_OUT_CONNECTED:
 				case ZT_WIRE_SOCKET_TCP_IN:
 					if (FD_ISSET(s->sock,&rfds)) {
@@ -440,6 +618,7 @@ public:
 						} catch ( ... ) {}
 					}
 					break;
+
 				case ZT_WIRE_SOCKET_TCP_LISTEN:
 					if (FD_ISSET(s->sock,&rfds)) {
 						memset(&ss,0,sizeof(ss));
@@ -461,7 +640,7 @@ public:
 								FD_SET(newSock,&_readfds);
 								if ((long)newSock > _nfds)
 									_nfds = (long)newSock;
-								sws.type = ZT_WIRE_SOCKET_UDP;
+								sws.type = ZT_WIRE_SOCKET_TCP_IN;
 								sws.sock = s;
 								sws.uptr = (void *)0;
 								memcpy(&(sws.saddr),&ss,sizeof(struct sockaddr_storage));
@@ -471,6 +650,7 @@ public:
 						}
 					}
 					break;
+
 				case ZT_WIRE_SOCKET_UDP:
 					if (FD_ISSET(s->sock,&rfds)) {
 						memset(&ss,0,sizeof(ss));
@@ -483,8 +663,10 @@ public:
 						}
 					}
 					break;
+
 				default:
 					break;
+
 			}
 		}
 	}
@@ -497,7 +679,9 @@ public:
 
 		FD_CLR(sws.sock,&_readfds);
 		FD_CLR(sws.sock,&_writefds);
+#if defined(_WIN32) || defined(_WIN64)
 		FD_CLR(sws.sock,&_exceptfds);
+#endif
 
 		ZT_WIRE_CLOSE_SOCKET(sws.sock);
 
@@ -575,7 +759,7 @@ private:
 		ZT_WIRE_SOCKET_TCP_OUT_PENDING = 0x00,
 		ZT_WIRE_SOCKET_TCP_OUT_CONNECTED = 0x01,
 		ZT_WIRE_SOCKET_TCP_IN = 0x02,
-		ZT_WIRE_SOCKET_TCP_LISTEN = 0x03, // isTCP() == ((type & 0x03) != 0)
+		ZT_WIRE_SOCKET_TCP_LISTEN = 0x03,
 		ZT_WIRE_SOCKET_RAW = 0x04,
 		ZT_WIRE_SOCKET_UDP = 0x05
 	};
@@ -585,11 +769,9 @@ private:
 		WireSocketType type;
 		ZT_WIRE_SOCKFD_TYPE sock;
 		void *uptr; // user-settable pointer
-		ZT_WIRE_SOCKADDR_STORAGE_TYPE saddr; // from address for TCP_IN, local address otherwise
+		ZT_WIRE_SOCKADDR_STORAGE_TYPE saddr; // remote for TCP_OUT and TCP_IN, local for TCP_LISTEN, RAW, and UDP
 	};
 
-	inline bool _isTCP(const WireSocketImpl &sws) const throw() { return ((((unsigned int)sws.type) & 0x03) != 0); }
-
 	ON_DATAGRAM_FUNCTION _dgHandler;
 	ON_TCP_CONNECT_FUNCTION _tcpConnectHandler;
 	ON_TCP_ACCEPT_FUNCTION _tcpAcceptHandler;
@@ -598,7 +780,11 @@ private:
 	ON_TCP_WRITABLE_FUNCTION _tcpWritableHandler;
 
 	std::list<WireSocketImpl> _socks;
-	fd_set _readfds,_writefds,_exceptfds;
+	fd_set _readfds;
+	fd_set _writefds;
+#if defined(_WIN32) || defined(_WIN64)
+	fd_set _exceptfds;	
+#endif
 	long _nfds;
 
 	ZT_WIRE_SOCKFD_TYPE _whackReceiveSocket;