Browse Source

Reworking of paths in Peer work-in-progress, and TCP connect support in SocketManager. Also add FD_SETSIZE checking for the default select implementation of sockets.

Adam Ierymenko 11 years ago
parent
commit
45e823d27c
7 changed files with 258 additions and 189 deletions
  1. 7 2
      node/Constants.hpp
  2. 125 0
      node/Path.hpp
  3. 2 4
      node/Peer.cpp
  4. 25 148
      node/Peer.hpp
  5. 95 34
      node/SocketManager.cpp
  6. 1 1
      node/SocketManager.hpp
  7. 3 0
      node/TcpSocket.cpp

+ 7 - 2
node/Constants.hpp

@@ -297,12 +297,17 @@ error_no_byte_order_defined;
  * 
  * A link that hasn't spoken in this long is simply considered inactive.
  */
-#define ZT_PEER_LINK_ACTIVITY_TIMEOUT ((ZT_PEER_DIRECT_PING_DELAY * 2) + 1000)
+#define ZT_PEER_PATH_ACTIVITY_TIMEOUT ((ZT_PEER_DIRECT_PING_DELAY * 2) + 1000)
 
 /**
  * Close TCP tunnels if unused for this long
  */
-#define ZT_TCP_TUNNEL_ACTIVITY_TIMEOUT ZT_PEER_LINK_ACTIVITY_TIMEOUT
+#define ZT_TCP_TUNNEL_ACTIVITY_TIMEOUT ZT_PEER_PATH_ACTIVITY_TIMEOUT
+
+/**
+ * Try TCP tunnels if no response to UDP PINGs in this many milliseconds
+ */
+#define ZT_TCP_FALLBACK_AFTER 5000
 
 /**
  * Stop relaying via peers that have not responded to direct sends in this long

+ 125 - 0
node/Path.hpp

@@ -0,0 +1,125 @@
+/*
+ * ZeroTier One - Global Peer to Peer Ethernet
+ * Copyright (C) 2011-2014  ZeroTier Networks LLC
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program.  If not, see <http://www.gnu.org/licenses/>.
+ *
+ * --
+ *
+ * ZeroTier may be used and distributed under the terms of the GPLv3, which
+ * are available at: http://www.gnu.org/licenses/gpl-3.0.html
+ *
+ * If you would like to embed ZeroTier into a commercial application or
+ * redistribute it in a modified binary form, please contact ZeroTier Networks
+ * LLC. Start here: http://www.zerotier.com/
+ */
+
+#ifndef ZT_PATH_HPP
+#define ZT_PATH_HPP
+
+#include <stdint.h>
+
+#include "Constants.hpp"
+#include "InetAddress.hpp"
+#include "Utils.hpp"
+
+#include <string>
+
+namespace ZeroTier {
+
+/**
+ * WAN address and protocol for reaching a peer
+ */
+class Path
+{
+public:
+	Path() :
+		_lastSent(0),
+		_lastReceived(0),
+		_lastFirewallOpener(0),
+		_lastPing(0),
+		_addr(),
+		_tcp(false),
+		_fixed(false) {}
+
+	Path(const InetAddress &addr,bool tcp,bool fixed) :
+		_lastSent(0),
+		_lastReceived(0),
+		_lastFirewallOpener(0),
+		_lastPing(0),
+		_addr(addr),
+		_tcp(tcp),
+		_fixed(fixed) {}
+
+	inline const InetAddress &address() const throw() { return _addr; }
+	inline bool tcp() const throw() { return _tcp; }
+	inline uint64_t lastSent() const throw() { return _lastSent; }
+	inline uint64_t lastReceived() const throw() { return _lastReceived; }
+	inline uint64_t lastFirewallOpener() const throw() { return _lastFirewallOpener; }
+	inline uint64_t lastPing() const throw() { return _lastPing; }
+	inline bool fixed() const throw() { return _fixed; }
+
+	inline void sent(uint64_t t) throw() { _lastSent = t; }
+	inline void received(uint64_t t) throw() { _lastReceived = t; }
+	inline void firewallOpenerSent(uint64_t t) throw() { _lastFirewallOpener = t; }
+	inline void pinged(uint64_t t) throw() { _lastPing = t; }
+
+	inline bool active(uint64_t now) const
+		throw()
+	{
+		return ((_addr)&&((_fixed)||((now - _lastReceived) < ZT_PEER_PATH_ACTIVITY_TIMEOUT)));
+	}
+
+	/**
+	 * @return Human-readable address and other information about this path, some computed as of current time
+	 */
+	inline std::string toString() const
+	{
+		uint64_t now = Utils::now();
+		char lsago[32],lrago[32],lfoago[32],lpago[32];
+		Utils::snprintf(lsago,sizeof(lsago),"%lld",(long long)((_lastSent != 0) ? (now - _lastSent) : -1));
+		Utils::snprintf(lrago,sizeof(lrago),"%lld",(long long)((_lastReceived != 0) ? (now - _lastReceived) : -1));
+		Utils::snprintf(lfoago,sizeof(lfoago),"%lld",(long long)((_lastFirewallOpener != 0) ? (now - _lastFirewallOpener) : -1));
+		Utils::snprintf(lpago,sizeof(lfoago),"%lld",(long long)((_lastPing != 0) ? (now - _lastPing) : -1));
+		return (std::string(_tcp ? "tcp:" : "udp:") + _addr.toString() + "[" + lsago + "," lrago + "," + lpago + "," + lfoago + "," + (active(now) ? "active" : "inactive") + "," + (_fixed ? "fixed" : "learned") + "]");
+	}
+
+	inline operator==(const Path &p) const throw() { return ((_addr == p._addr)&&(_tcp == p._tcp)); }
+	inline operator!=(const Path &p) const throw() { return ((_addr != p._addr)||(_tcp != p._tcp)); }
+	inline operator<(const Path &p) const
+		throw()
+	{
+		if (_addr == p._addr) {
+			if (!_tcp) // UDP < TCP
+				return p._tcp;
+			return false;
+		} else return (_addr < p._addr);
+	}
+	inline bool operator>(const Path &p) const throw() { return (p < *this); }
+	inline bool operator<=(const Path &p) const throw() { return !(p < *this); }
+	inline bool operator>=(const Path &p) const throw() { return !(*this < p); }
+
+private:
+	volatile uint64_t _lastSent;
+	volatile uint64_t _lastReceived;
+	volatile uint64_t _lastFirewallOpener;
+	volatile uint64_t _lastPing;
+	InetAddress _addr;
+	bool _tcp;
+	bool _fixed;
+};
+
+} // namespace ZeroTier
+
+#endif

+ 2 - 4
node/Peer.cpp

@@ -34,12 +34,11 @@ namespace ZeroTier {
 
 Peer::Peer() :
 	_id(),
-	_ipv4p(),
-	_ipv6p(),
 	_lastUsed(0),
 	_lastUnicastFrame(0),
 	_lastMulticastFrame(0),
 	_lastAnnouncedTo(0),
+	_lastPinged(0),
 	_vMajor(0),
 	_vMinor(0),
 	_vRevision(0),
@@ -50,12 +49,11 @@ Peer::Peer() :
 Peer::Peer(const Identity &myIdentity,const Identity &peerIdentity)
 	throw(std::runtime_error) :
 	_id(peerIdentity),
-	_ipv4p(),
-	_ipv6p(),
 	_lastUsed(0),
 	_lastUnicastFrame(0),
 	_lastMulticastFrame(0),
 	_lastAnnouncedTo(0),
+	_lastPinged(0),
 	_vMajor(0),
 	_vMinor(0),
 	_vRevision(0),

+ 25 - 148
node/Peer.hpp

@@ -30,11 +30,13 @@
 
 #include <stdint.h>
 
+#include <vector>
 #include <algorithm>
 #include <utility>
 #include <stdexcept>
 
 #include "Constants.hpp"
+#include "Path.hpp"
 #include "Address.hpp"
 #include "Utils.hpp"
 #include "Identity.hpp"
@@ -53,15 +55,7 @@
 namespace ZeroTier {
 
 /**
- * A peer on the network
- * 
- * Threading note:
- *
- * This structure contains no locks at the moment, but also performs no
- * memory allocation or pointer manipulation. As a result is is technically
- * "safe" for threads, as in won't crash. Right now it's only changed from
- * the core I/O thread so this isn't an issue. If multiple I/O threads are
- * introduced it ought to have a lock of some kind.
+ * Peer on P2P Network
  */
 class Peer : NonCopyable
 {
@@ -127,13 +121,16 @@ public:
 		uint64_t now);
 
 	/**
-	 * Send a UDP packet to this peer directly (not via relaying)
-	 * 
+	 * Send a packet to this peer using the most recently active direct path
+	 *
+	 * This does not relay. It returns false if there are no available active
+	 * paths.
+	 *
 	 * @param _r Runtime environment
 	 * @param data Data to send
 	 * @param len Length of packet
 	 * @param now Current time
-	 * @return True if packet appears to have been sent
+	 * @return True if packet appears to have been sent, false if no path or other error
 	 */
 	bool send(const RuntimeEnvironment *_r,const void *data,unsigned int len,uint64_t now);
 
@@ -148,6 +145,9 @@ public:
 
 	/**
 	 * Send HELLO to a peer via all active direct paths available
+	 *
+	 * This begins attempting to use TCP paths if no ping response has been
+	 * received from any UDP path in more than ZT_TCP_FALLBACK_AFTER.
 	 * 
 	 * @param _r Runtime environment
 	 * @param now Current time
@@ -155,28 +155,12 @@ public:
 	 */
 	bool sendPing(const RuntimeEnvironment *_r,uint64_t now);
 
-	/**
-	 * Set an address to reach this peer
-	 *
-	 * @param addr Address to set
-	 * @param fixed If true, address is fixed (won't be changed on packet receipt)
-	 */
-	void setPathAddress(const InetAddress &addr,bool fixed);
-
-	/**
-	 * Clear the fixed flag for an address type
-	 *
-	 * @param t Type to clear, or TYPE_NULL to clear flag on all types
-	 */
-	void clearFixedFlag(InetAddress::AddressType t);
-
 	/**
 	 * @return Last successfully sent firewall opener
 	 */
 	inline uint64_t lastFirewallOpener() const
 		throw()
 	{
-		return std::max(_ipv4p.lastFirewallOpener,_ipv6p.lastFirewallOpener);
 	}
 
 	/**
@@ -185,7 +169,6 @@ public:
 	inline uint64_t lastDirectReceive() const
 		throw()
 	{
-		return std::max(_ipv4p.lastReceive,_ipv6p.lastReceive);
 	}
 
 	/**
@@ -194,7 +177,6 @@ public:
 	inline uint64_t lastDirectSend() const
 		throw()
 	{
-		return std::max(_ipv4p.lastSend,_ipv6p.lastSend);
 	}
 
 	/**
@@ -261,58 +243,36 @@ public:
 	/**
 	 * @return True if this peer has at least one direct IP address path
 	 */
-	inline bool hasDirectPath() const throw() { return ((_ipv4p.addr)||(_ipv6p.addr)); }
+	inline bool hasDirectPath() const
+		throw()
+	{
+	}
 
 	/**
 	 * @param now Current time
 	 * @return True if this peer has at least one active or fixed direct path
 	 */
-	inline bool hasActiveDirectPath(uint64_t now) const throw() { return ((_ipv4p.isActive(now))||(_ipv6p.isActive(now))); }
-
-	/**
-	 * @return IPv4 direct address or null InetAddress if none
-	 */
-	inline InetAddress ipv4Path() const throw() { return _ipv4p.addr; }
-
-	/**
-	 * @return IPv6 direct address or null InetAddress if none
-	 */
-	inline InetAddress ipv6Path() const throw() { return _ipv4p.addr; }
-
-	/**
-	 * @return IPv4 direct address or null InetAddress if none
-	 */
-	inline InetAddress ipv4ActivePath(uint64_t now) const
+	inline bool hasActiveDirectPath(uint64_t now) const
 		throw()
 	{
-		if (_ipv4p.isActive(now))
-			return _ipv4p.addr;
-		return InetAddress();
 	}
 
 	/**
-	 * @return IPv6 direct address or null InetAddress if none
+	 * Add a path (if we don't already have it)
+	 *
+	 * @param p New path to add
 	 */
-	inline InetAddress ipv6ActivePath(uint64_t now) const
-		throw()
+	inline void addPath(const Path &p)
 	{
-		if (_ipv6p.isActive(now))
-			return _ipv6p.addr;
-		return InetAddress();
 	}
 
 	/**
-	 * Forget direct paths
+	 * Clear paths
 	 *
-	 * @param fixedToo If true, also forget 'fixed' paths.
+	 * @param fixedToo If true, clear fixed paths as well as learned ones
 	 */
-	inline void forgetDirectPaths(bool fixedToo)
-		throw()
+	inline void clearPaths(bool fixedToo)
 	{
-		if ((fixedToo)||(!_ipv4p.fixed))
-			_ipv4p.addr.zero();
-		if ((fixedToo)||(!_ipv6p.fixed))
-			_ipv6p.addr.zero();
 	}
 
 	/**
@@ -416,93 +376,10 @@ public:
 	}
 
 private:
-	/**
-	 * A direct IP path to a peer
-	 */
-	class WanPath
-	{
-	public:
-		WanPath() :
-			lastSend(0),
-			lastReceive(0),
-			lastFirewallOpener(0),
-			addr(),
-			fixed(false)
-		{
-		}
-
-		inline bool isActive(const uint64_t now) const
-			throw()
-		{
-			return ((addr)&&((fixed)||((now - lastReceive) < ZT_PEER_LINK_ACTIVITY_TIMEOUT)));
-		}
-
-		template<unsigned int C>
-		inline void serialize(Buffer<C> &b)
-			throw(std::out_of_range)
-		{
-			b.append(lastSend);
-			b.append(lastReceive);
-			b.append(lastFirewallOpener);
-
-			b.append((unsigned char)addr.type());
-			switch(addr.type()) {
-				case InetAddress::TYPE_NULL:
-					break;
-				case InetAddress::TYPE_IPV4:
-					b.append(addr.rawIpData(),4);
-					b.append((uint16_t)addr.port());
-					break;
-				case InetAddress::TYPE_IPV6:
-					b.append(addr.rawIpData(),16);
-					b.append((uint16_t)addr.port());
-					break;
-			}
-
-			b.append(fixed ? (unsigned char)1 : (unsigned char)0);
-		}
-
-		template<unsigned int C>
-		inline unsigned int deserialize(const Buffer<C> &b,unsigned int startAt = 0)
-			throw(std::out_of_range,std::invalid_argument)
-		{
-			unsigned int p = startAt;
-
-			lastSend = b.template at<uint64_t>(p); p += sizeof(uint64_t);
-			lastReceive = b.template at<uint64_t>(p); p += sizeof(uint64_t);
-			lastFirewallOpener = b.template at<uint64_t>(p); p += sizeof(uint64_t);
-
-			switch ((InetAddress::AddressType)b[p++]) {
-				case InetAddress::TYPE_NULL:
-					addr.zero();
-					break;
-				case InetAddress::TYPE_IPV4:
-					addr.set(b.field(p,4),4,b.template at<uint16_t>(p + 4));
-					p += 4 + sizeof(uint16_t);
-					break;
-				case InetAddress::TYPE_IPV6:
-					addr.set(b.field(p,16),16,b.template at<uint16_t>(p + 16));
-					p += 16 + sizeof(uint16_t);
-					break;
-			}
-
-			fixed = (b[p++] != 0);
-
-			return (p - startAt);
-		}
-
-		uint64_t lastSend;
-		uint64_t lastReceive;
-		uint64_t lastFirewallOpener;
-		InetAddress addr; // null InetAddress if path is undefined
-		bool fixed; // do not learn address from received packets
-	};
-
 	unsigned char _key[ZT_PEER_SECRET_KEY_LENGTH];
 	Identity _id;
 
-	WanPath _ipv4p;
-	WanPath _ipv6p;
+	std::vector<Path> _paths;
 
 	volatile uint64_t _lastUsed;
 	volatile uint64_t _lastUnicastFrame;

+ 95 - 34
node/SocketManager.cpp

@@ -37,6 +37,7 @@
 #include "TcpSocket.hpp"
 
 #ifndef __WINDOWS__
+#include <errno.h>
 #include <unistd.h>
 #include <sys/socket.h>
 #include <arpa/inet.h>
@@ -48,6 +49,12 @@
 #define INVALID_SOCKET (-1)
 #endif
 
+#ifdef __WINDOWS__
+#define CLOSE_SOCKET(s) ::closesocket(s)
+#else
+#define CLOSE_SOCKET(s) ::close(s)
+#endif
+
 namespace ZeroTier {
 
 #ifdef __WINDOWS__
@@ -256,11 +263,7 @@ SocketManager::SocketManager(
 			sin6.sin6_port = htons(localUdpPort);
 			memcpy(&(sin6.sin6_addr),&in6addr_any,sizeof(struct in6_addr));
 			if (::bind(s,(const struct sockaddr *)&sin6,sizeof(sin6))) {
-#ifdef __WINDOWS__
-				::closesocket(s);
-#else
-				::close(s);
-#endif
+				CLOSE_SOCKET(s);
 				_closeSockets();
 				throw std::runtime_error("unable to bind to port");
 			}
@@ -308,11 +311,8 @@ SocketManager::SocketManager(
 			sin4.sin_port = htons(localUdpPort);
 			sin4.sin_addr.s_addr = INADDR_ANY;
 			if (::bind(s,(const struct sockaddr *)&sin4,sizeof(sin4))) {
-#ifdef __WINDOWS__
-				::closesocket(s);
-#else
-				::close(s);
-#endif
+				CLOSE_SOCKET(s);
+				_closeSockets();
 				throw std::runtime_error("unable to bind to port");
 			}
 
@@ -334,6 +334,59 @@ SocketManager::~SocketManager()
 bool SocketManager::send(const InetAddress &to,bool tcp,const void *msg,unsigned int msglen)
 {
 	if (tcp) {
+		SharedPtr<Socket> ts;
+		{
+			Mutex::Lock _l(_tcpSockets_m);
+			std::map< InetAddress,SharedPtr<Socket> >::iterator opents(_tcpSockets.find(to));
+			if (opents != _tcpSockets.end())
+				ts = opents->second;
+		}
+		if (ts)
+			return ts->send(to,msg,msglen);
+
+#ifdef __WINDOWS__
+		SOCKET s = ::socket(to.isV4() ? AF_INET : AF_INET6,SOCK_STREAM,0);
+		if (s == INVALID_SOCKET)
+			return false;
+		if (s >= FD_SETSIZE) {
+			::closesocket(s);
+			return false;
+		}
+#else
+		int s = ::socket(to.isV4() ? AF_INET : AF_INET6,SOCK_STREAM,0);
+		if (s <= 0)
+			return false;
+		if (s >= FD_SETSIZE) {
+			::close(s);
+			return false;
+		}
+#endif
+		fcntl(s,F_SETFL,O_NONBLOCK);
+
+		bool connecting = false;
+		if (connect(s,to.saddr(),to.saddrLen())) {
+			if (errno != EINPROGRESS) {
+				CLOSE_SOCKET(s);
+				return false;
+			} else connecting = true;
+		}
+
+		ts = SharedPtr<Socket>(new TcpSocket(this,s,connecting,to));
+		if (!ts->send(to,msg,msglen))
+			return false;
+
+		_fdSetLock.lock();
+		FD_SET(s,&_readfds);
+		if (connecting)
+			FD_SET(s,&_writefds);
+		_fdSetLock.unlock();
+
+		{
+			Mutex::Lock _l(_tcpSockets_m);
+			_tcpSockets[to] = ts;
+		}
+
+		return true;
 	} else if (to.isV4()) {
 		if (_udpV4Socket)
 			return _udpV4Socket->send(to,msg,msglen);
@@ -397,21 +450,25 @@ void SocketManager::poll(unsigned long timeout)
 #else
 		if (sockfd > 0) {
 #endif
-			InetAddress fromia((const struct sockaddr *)&from);
-			Mutex::Lock _l2(_tcpSockets_m);
-			try {
-				_tcpSockets[fromia] = SharedPtr<Socket>(new TcpSocket(this,sockfd,false,fromia));
+			if (sockfd < FD_SETSIZE) {
+				InetAddress fromia((const struct sockaddr *)&from);
+				Mutex::Lock _l2(_tcpSockets_m);
+				try {
+					_tcpSockets[fromia] = SharedPtr<Socket>(new TcpSocket(this,sockfd,false,fromia));
 
-				fcntl(sockfd,F_SETFL,O_NONBLOCK);
+					fcntl(sockfd,F_SETFL,O_NONBLOCK);
 
-				_fdSetLock.lock();
-				FD_SET(sockfd,&_readfds);
-				_fdSetLock.unlock();
+					_fdSetLock.lock();
+					FD_SET(sockfd,&_readfds);
+					_fdSetLock.unlock();
 
-				if (sockfd > _nfds)
-					_nfds = sockfd;
-			} catch ( ... ) {
-				::close(sockfd);
+					if (sockfd > _nfds)
+						_nfds = sockfd;
+				} catch ( ... ) {
+					CLOSE_SOCKET(sockfd);
+				}
+			} else {
+				CLOSE_SOCKET(sockfd);
 			}
 		}
 	}
@@ -424,21 +481,25 @@ void SocketManager::poll(unsigned long timeout)
 #else
 		if (sockfd > 0) {
 #endif
-			InetAddress fromia((const struct sockaddr *)&from);
-			Mutex::Lock _l2(_tcpSockets_m);
-			try {
-				_tcpSockets[fromia] = SharedPtr<Socket>(new TcpSocket(this,sockfd,false,fromia));
+			if (sockfd < FD_SETSIZE) {
+				InetAddress fromia((const struct sockaddr *)&from);
+				Mutex::Lock _l2(_tcpSockets_m);
+				try {
+					_tcpSockets[fromia] = SharedPtr<Socket>(new TcpSocket(this,sockfd,false,fromia));
 
-				fcntl(sockfd,F_SETFL,O_NONBLOCK);
+					fcntl(sockfd,F_SETFL,O_NONBLOCK);
 
-				_fdSetLock.lock();
-				FD_SET(sockfd,&_readfds);
-				_fdSetLock.unlock();
+					_fdSetLock.lock();
+					FD_SET(sockfd,&_readfds);
+					_fdSetLock.unlock();
 
-				if (sockfd > _nfds)
-					_nfds = sockfd;
-			} catch ( ... ) {
-				::close(sockfd);
+					if (sockfd > _nfds)
+						_nfds = sockfd;
+				} catch ( ... ) {
+					CLOSE_SOCKET(sockfd);
+				}
+			} else {
+				CLOSE_SOCKET(sockfd);
 			}
 		}
 	}

+ 1 - 1
node/SocketManager.hpp

@@ -128,7 +128,7 @@ private:
 		} catch ( ... ) {} // handlers shouldn't throw
 	}
 
-	// Called by socket implementations to register or unregister for available-for-write notification on underlying _sock
+	// Used by TcpSocket to register/unregister for write availability notification
 	inline void startNotifyWrite(const Socket *sock)
 		throw()
 	{

+ 3 - 0
node/TcpSocket.cpp

@@ -159,6 +159,9 @@ bool TcpSocket::notifyAvailableForWrite(const SharedPtr<Socket> &self,SocketMana
 {
 	Mutex::Lock _l(_writeLock);
 
+	if (_connecting)
+		_connecting = false;
+
 	if (_outptr) {
 		int n = (int)::send(_sock,_outbuf,_outptr,0);
 		if (n < 0) {