Browse Source

The return of peer peristence.

Adam Ierymenko 10 years ago
parent
commit
76a95dc58f
4 changed files with 82 additions and 16 deletions
  1. 11 4
      node/Peer.hpp
  2. 2 2
      node/RemotePath.hpp
  3. 66 7
      node/Topology.cpp
  4. 3 3
      node/Topology.hpp

+ 11 - 4
node/Peer.hpp

@@ -53,6 +53,10 @@
 #include "Mutex.hpp"
 #include "NonCopyable.hpp"
 
+// Very rough computed estimate: (8 + 256 + 80 + (16 * 64) + (128 * 256) + (128 * 16))
+// 1048576 provides tons of headroom -- overflow would just cause peer not to be persisted
+#define ZT_PEER_SUGGESTED_SERIALIZATION_BUFFER_SIZE 1048576
+
 namespace ZeroTier {
 
 /**
@@ -450,7 +454,7 @@ public:
 	{
 		Mutex::Lock _l(_lock);
 
-		const unsigned int lengthAt = b.size();
+		const unsigned int atPos = b.size();
 		b.addSize(4); // space for uint32_t field length
 
 		b.append((uint32_t)1); // version of serialized Peer data
@@ -498,7 +502,7 @@ public:
 			}
 		}
 
-		b.setAt(lengthAt,(uint32_t)((b.size() - 4) - lengthAt)); // set size, not including size field itself
+		b.setAt(atPos,(uint32_t)(b.size() - atPos)); // set size
 	}
 
 	/**
@@ -512,9 +516,10 @@ public:
 	template<unsigned int C>
 	static inline SharedPtr<Peer> deserializeNew(const Identity &myIdentity,const Buffer<C> &b,unsigned int &p)
 	{
-		const uint32_t recSize = b.template at<uint32_t>(p); p += 4;
+		const uint32_t recSize = b.template at<uint32_t>(p);
 		if ((p + recSize) > b.size())
 			return SharedPtr<Peer>(); // size invalid
+		p += 4;
 		if (b.template at<uint32_t>(p) != 1)
 			return SharedPtr<Peer>(); // version mismatch
 		p += 4;
@@ -540,7 +545,7 @@ public:
 		np->_vRevision = b.template at<uint16_t>(p); p += 2;
 		np->_latency = b.template at<uint32_t>(p); p += 4;
 
-		const unsigned int numPaths = b.template at<uint32_t>(p); p += 2;
+		const unsigned int numPaths = b.template at<uint32_t>(p); p += 4;
 		for(unsigned int i=0;i<numPaths;++i) {
 			if (i < ZT_MAX_PEER_NETWORK_PATHS) {
 				p += np->_paths[np->_numPaths++].deserialize(b,p);
@@ -564,6 +569,8 @@ public:
 			const uint64_t ts = b.template at<uint64_t>(p); p += 8;
 			np->_lastPushedComs.set(nwid,ts);
 		}
+
+		return np;
 	}
 
 private:

+ 2 - 2
node/RemotePath.hpp

@@ -163,8 +163,8 @@ public:
 		_lastSend = b.template at<uint64_t>(p); p += 8;
 		_lastReceived = b.template at<uint64_t>(p); p += 8;
 		p += _localAddress.deserialize(b,p);
-		_flags = b.template at<uint16_t>(p); p += 4;
-		return (startAt - p);
+		_flags = b.template at<uint16_t>(p); p += 2;
+		return (p - startAt);
 	}
 
 protected:

+ 66 - 7
node/Topology.cpp

@@ -31,6 +31,7 @@
 #include "Defaults.hpp"
 #include "Dictionary.hpp"
 #include "Node.hpp"
+#include "Buffer.hpp"
 
 namespace ZeroTier {
 
@@ -38,10 +39,68 @@ Topology::Topology(const RuntimeEnvironment *renv) :
 	RR(renv),
 	_amRoot(false)
 {
+	std::string alls(RR->node->dataStoreGet("peers.save"));
+	const uint8_t *all = reinterpret_cast<const uint8_t *>(alls.data());
+	RR->node->dataStoreDelete("peers.save");
+
+	unsigned int ptr = 0;
+	while ((ptr + 4) < alls.size()) {
+		// Each Peer serializes itself prefixed by a record length (not including the size of the length itself)
+		unsigned int reclen = (unsigned int)all[ptr] & 0xff;
+		reclen <<= 8;
+		reclen |= (unsigned int)all[ptr + 1] & 0xff;
+		reclen <<= 8;
+		reclen |= (unsigned int)all[ptr + 2] & 0xff;
+		reclen <<= 8;
+		reclen |= (unsigned int)all[ptr + 3] & 0xff;
+
+		if (((ptr + reclen) > alls.size())||(reclen > ZT_PEER_SUGGESTED_SERIALIZATION_BUFFER_SIZE))
+			break;
+
+		try {
+			unsigned int pos = 0;
+			SharedPtr<Peer> p(Peer::deserializeNew(RR->identity,Buffer<ZT_PEER_SUGGESTED_SERIALIZATION_BUFFER_SIZE>(all + ptr,reclen),pos));
+			if (pos != reclen)
+				break;
+			ptr += pos;
+			if ((p)&&(p->address() != RR->identity.address())) {
+				_peers[p->address()] = p;
+			} else {
+				break; // stop if invalid records
+			}
+		} catch (std::exception &exc) {
+			break;
+		} catch ( ... ) {
+			break; // stop if invalid records
+		}
+	}
+
+	clean(RR->node->now());
 }
 
 Topology::~Topology()
 {
+	Buffer<ZT_PEER_SUGGESTED_SERIALIZATION_BUFFER_SIZE> pbuf;
+	std::string all;
+
+	Address *a = (Address *)0;
+	SharedPtr<Peer> *p = (SharedPtr<Peer> *)0;
+	Hashtable< Address,SharedPtr<Peer> >::Iterator i(_peers);
+	while (i.next(a,p)) {
+		if (std::find(_rootAddresses.begin(),_rootAddresses.end(),*a) == _rootAddresses.end()) {
+			pbuf.clear();
+			try {
+				(*p)->serialize(pbuf);
+				try {
+					all.append((const char *)pbuf.data(),pbuf.size());
+				} catch ( ... ) {
+					return; // out of memory? just skip
+				}
+			} catch ( ... ) {} // peer too big? shouldn't happen, but it so skip
+		}
+	}
+
+	RR->node->dataStorePut("peers.save",all,true);
 }
 
 void Topology::setRootServers(const std::map< Identity,std::vector<InetAddress> > &sn)
@@ -58,7 +117,7 @@ void Topology::setRootServers(const std::map< Identity,std::vector<InetAddress>
 
 	for(std::map< Identity,std::vector<InetAddress> >::const_iterator i(sn.begin());i!=sn.end();++i) {
 		if (i->first != RR->identity) { // do not add self as a peer
-			SharedPtr<Peer> &p = _activePeers[i->first.address()];
+			SharedPtr<Peer> &p = _peers[i->first.address()];
 			if (!p)
 				p = SharedPtr<Peer>(new Peer(RR->identity,i->first));
 			for(std::vector<InetAddress>::const_iterator j(i->second.begin());j!=i->second.end();++j)
@@ -103,7 +162,7 @@ SharedPtr<Peer> Topology::addPeer(const SharedPtr<Peer> &peer)
 	const uint64_t now = RR->node->now();
 	Mutex::Lock _l(_lock);
 
-	SharedPtr<Peer> &p = _activePeers.set(peer->address(),peer);
+	SharedPtr<Peer> &p = _peers.set(peer->address(),peer);
 	p->use(now);
 	_saveIdentity(p->identity());
 
@@ -120,7 +179,7 @@ SharedPtr<Peer> Topology::getPeer(const Address &zta)
 	const uint64_t now = RR->node->now();
 	Mutex::Lock _l(_lock);
 
-	SharedPtr<Peer> &ap = _activePeers[zta];
+	SharedPtr<Peer> &ap = _peers[zta];
 
 	if (ap) {
 		ap->use(now);
@@ -136,7 +195,7 @@ SharedPtr<Peer> Topology::getPeer(const Address &zta)
 		} catch ( ... ) {} // invalid identity?
 	}
 
-	_activePeers.erase(zta);
+	_peers.erase(zta);
 
 	return SharedPtr<Peer>();
 }
@@ -160,7 +219,7 @@ SharedPtr<Peer> Topology::getBestRoot(const Address *avoid,unsigned int avoidCou
 					if (++sna == _rootAddresses.end())
 						sna = _rootAddresses.begin(); // wrap around at end
 					if (*sna != RR->identity.address()) { // pick one other than us -- starting from me+1 in sorted set order
-						SharedPtr<Peer> *p = _activePeers.get(*sna);
+						SharedPtr<Peer> *p = _peers.get(*sna);
 						if ((p)&&((*p)->hasActiveDirectPath(now))) {
 							bestRoot = *p;
 							break;
@@ -249,12 +308,12 @@ bool Topology::isRoot(const Identity &id) const
 void Topology::clean(uint64_t now)
 {
 	Mutex::Lock _l(_lock);
-	Hashtable< Address,SharedPtr<Peer> >::Iterator i(_activePeers);
+	Hashtable< Address,SharedPtr<Peer> >::Iterator i(_peers);
 	Address *a = (Address *)0;
 	SharedPtr<Peer> *p = (SharedPtr<Peer> *)0;
 	while (i.next(a,p)) {
 		if (((now - (*p)->lastUsed()) >= ZT_PEER_IN_MEMORY_EXPIRATION)&&(std::find(_rootAddresses.begin(),_rootAddresses.end(),*a) == _rootAddresses.end())) {
-			_activePeers.erase(*a);
+			_peers.erase(*a);
 		} else {
 			(*p)->clean(RR,now);
 		}

+ 3 - 3
node/Topology.hpp

@@ -164,7 +164,7 @@ public:
 	inline void eachPeer(F f)
 	{
 		Mutex::Lock _l(_lock);
-		Hashtable< Address,SharedPtr<Peer> >::Iterator i(_activePeers);
+		Hashtable< Address,SharedPtr<Peer> >::Iterator i(_peers);
 		Address *a = (Address *)0;
 		SharedPtr<Peer> *p = (SharedPtr<Peer> *)0;
 		while (i.next(a,p))
@@ -177,7 +177,7 @@ public:
 	inline std::vector< std::pair< Address,SharedPtr<Peer> > > allPeers() const
 	{
 		Mutex::Lock _l(_lock);
-		return _activePeers.entries();
+		return _peers.entries();
 	}
 
 	/**
@@ -194,7 +194,7 @@ private:
 
 	const RuntimeEnvironment *RR;
 
-	Hashtable< Address,SharedPtr<Peer> > _activePeers;
+	Hashtable< Address,SharedPtr<Peer> > _peers;
 	std::map< Identity,std::vector<InetAddress> > _roots;
 	std::vector< Address > _rootAddresses;
 	std::vector< SharedPtr<Peer> > _rootPeers;