Browse Source

Fix a deadlock and some more work on RethinkDB (for central) integration.

Adam Ierymenko 7 years ago
parent
commit
4166d8ca35

+ 3 - 1
controller/DB.hpp

@@ -78,12 +78,14 @@ public:
 
 
 	void networks(std::vector<uint64_t> &networks);
 	void networks(std::vector<uint64_t> &networks);
 
 
-	virtual void save(const nlohmann::json &record) = 0;
+	virtual void save(nlohmann::json *orig,nlohmann::json &record) = 0;
 
 
 	virtual void eraseNetwork(const uint64_t networkId) = 0;
 	virtual void eraseNetwork(const uint64_t networkId) = 0;
 
 
 	virtual void eraseMember(const uint64_t networkId,const uint64_t memberId) = 0;
 	virtual void eraseMember(const uint64_t networkId,const uint64_t memberId) = 0;
 
 
+	virtual void nodeIsOnline(const uint64_t memberId) = 0;
+
 protected:
 protected:
 	struct _Network
 	struct _Network
 	{
 	{

+ 7 - 24
controller/EmbeddedNetworkController.cpp

@@ -734,12 +734,7 @@ unsigned int EmbeddedNetworkController::handleControlPlaneHttpPOST(
 					member["nwid"] = nwids;
 					member["nwid"] = nwids;
 
 
 					_removeMemberNonPersistedFields(member);
 					_removeMemberNonPersistedFields(member);
-					if (member != origMember) {
-						json &revj = member["revision"];
-						member["revision"] = (revj.is_number() ? ((uint64_t)revj + 1ULL) : 1ULL);
-						_db->save(member);
-					}
-
+					_db->save(&origMember,member);
 					_addMemberNonPersistedFields(nwid,address,member,now);
 					_addMemberNonPersistedFields(nwid,address,member,now);
 					responseBody = OSUtils::jsonDump(member);
 					responseBody = OSUtils::jsonDump(member);
 					responseContentType = "application/json";
 					responseContentType = "application/json";
@@ -986,12 +981,7 @@ unsigned int EmbeddedNetworkController::handleControlPlaneHttpPOST(
 				network["nwid"] = nwids; // legacy
 				network["nwid"] = nwids; // legacy
 
 
 				_removeNetworkNonPersistedFields(network);
 				_removeNetworkNonPersistedFields(network);
-				if (network != origNetwork) {
-					json &revj = network["revision"];
-					network["revision"] = (revj.is_number() ? ((uint64_t)revj + 1ULL) : 1ULL);
-					_db->save(network);
-				}
-
+				_db->save(&origNetwork,network);
 				ControllerDB::NetworkSummaryInfo ns;
 				ControllerDB::NetworkSummaryInfo ns;
 				_db->summary(nwid,ns);
 				_db->summary(nwid,ns);
 				_addNetworkNonPersistedFields(nwid,network,now,ns);
 				_addNetworkNonPersistedFields(nwid,network,now,ns);
@@ -1116,7 +1106,7 @@ void EmbeddedNetworkController::handleRemoteTrace(const ZT_RemoteTrace &rt)
 		d["objtype"] = "trace";
 		d["objtype"] = "trace";
 		d["ts"] = now;
 		d["ts"] = now;
 		d["nodeId"] = Utils::hex10(rt.origin,tmp);
 		d["nodeId"] = Utils::hex10(rt.origin,tmp);
-		_db->save(d);
+		_db->save((nlohmann::json *)0,d);
 	} catch ( ... ) {
 	} catch ( ... ) {
 		// drop invalid trace messages if an error occurs
 		// drop invalid trace messages if an error occurs
 	}
 	}
@@ -1185,6 +1175,8 @@ void EmbeddedNetworkController::_request(
 		ms.lastRequestTime = now;
 		ms.lastRequestTime = now;
 	}
 	}
 
 
+	_db->nodeIsOnline(identity.address().toInt());
+
 	Utils::hex(nwid,nwids);
 	Utils::hex(nwid,nwids);
 	_db->get(nwid,network,identity.address().toInt(),member,ns);
 	_db->get(nwid,network,identity.address().toInt(),member,ns);
 	if ((!network.is_object())||(network.size() == 0)) {
 	if ((!network.is_object())||(network.size() == 0)) {
@@ -1299,11 +1291,7 @@ void EmbeddedNetworkController::_request(
 	} else {
 	} else {
 		// If they are not authorized, STOP!
 		// If they are not authorized, STOP!
 		_removeMemberNonPersistedFields(member);
 		_removeMemberNonPersistedFields(member);
-		if (origMember != member) {
-			json &revj = member["revision"];
-			member["revision"] = (revj.is_number() ? ((uint64_t)revj + 1ULL) : 1ULL);
-			_db->save(member);
-		}
+		_db->save(&origMember,member);
 		_sender->ncSendError(nwid,requestPacketId,identity.address(),NetworkController::NC_ERROR_ACCESS_DENIED);
 		_sender->ncSendError(nwid,requestPacketId,identity.address(),NetworkController::NC_ERROR_ACCESS_DENIED);
 		return;
 		return;
 	}
 	}
@@ -1666,12 +1654,7 @@ void EmbeddedNetworkController::_request(
 	}
 	}
 
 
 	_removeMemberNonPersistedFields(member);
 	_removeMemberNonPersistedFields(member);
-	if (member != origMember) {
-		json &revj = member["revision"];
-		member["revision"] = (revj.is_number() ? ((uint64_t)revj + 1ULL) : 1ULL);
-		_db->save(member);
-	}
-
+	_db->save(&origMember,member);
 	_sender->ncSendConfig(nwid,requestPacketId,identity.address(),*(nc.get()),metaData.getUI(ZT_NETWORKCONFIG_REQUEST_METADATA_KEY_VERSION,0) < 6);
 	_sender->ncSendConfig(nwid,requestPacketId,identity.address(),*(nc.get()),metaData.getUI(ZT_NETWORKCONFIG_REQUEST_METADATA_KEY_VERSION,0) < 6);
 }
 }
 
 

+ 6 - 1
controller/FileDB.cpp

@@ -69,7 +69,7 @@ bool FileDB::waitForReady()
 	return true;
 	return true;
 }
 }
 
 
-void FileDB::save(const nlohmann::json &record)
+void FileDB::save(nlohmann::json *orig,nlohmann::json &record)
 {
 {
 	char p1[16384],p2[16384];
 	char p1[16384],p2[16384];
 	try {
 	try {
@@ -126,4 +126,9 @@ void FileDB::eraseMember(const uint64_t networkId,const uint64_t memberId)
 {
 {
 }
 }
 
 
+void FileDB::nodeIsOnline(const uint64_t memberId)
+{
+	// Nothing to do here right now in the filesystem store mode since we can just get this from the peer list
+}
+
 } // namespace ZeroTier
 } // namespace ZeroTier

+ 3 - 1
controller/FileDB.hpp

@@ -32,12 +32,14 @@ public:
 
 
 	virtual bool waitForReady();
 	virtual bool waitForReady();
 
 
-	virtual void save(const nlohmann::json &record);
+	virtual void save(nlohmann::json *orig,nlohmann::json &record);
 
 
 	virtual void eraseNetwork(const uint64_t networkId);
 	virtual void eraseNetwork(const uint64_t networkId);
 
 
 	virtual void eraseMember(const uint64_t networkId,const uint64_t memberId);
 	virtual void eraseMember(const uint64_t networkId,const uint64_t memberId);
 
 
+	virtual void nodeIsOnline(const uint64_t memberId);
+
 protected:
 protected:
 	std::string _networksPath;
 	std::string _networksPath;
 };
 };

+ 71 - 3
controller/RethinkDB.cpp

@@ -215,6 +215,47 @@ RethinkDB::RethinkDB(EmbeddedNetworkController *const nc,const Address &myAddres
 		});
 		});
 	}
 	}
 
 
+	_onlineNotificationThread = std::thread([this]() {
+		try {
+			std::unique_ptr<R::Connection> rdb;
+			while (_run == 1) {
+				try {
+					if (!rdb)
+						rdb = R::connect(this->_host,this->_port,this->_auth);
+					if (rdb) {
+						std::lock_guard<std::mutex> l(_lastOnline_l);
+						R::Array batch;
+						R::Object tmpobj;
+						for(auto i=_lastOnline.begin();i!=_lastOnline.end();++i) {
+							char nodeId[16];
+							Utils::hex10(i->first,nodeId);
+							tmpobj["id"] = nodeId;
+							tmpobj["ts"] = i->second;
+							batch.emplace_back(tmpobj);
+							if (batch.size() >= 256) {
+								R::db(this->_db).table("NodeLastOnline").insert(R::args(batch),R::optargs("conflict","update")).run(*rdb);
+								batch.clear();
+							}
+						}
+						if (batch.size() > 0)
+							R::db(this->_db).table("NodeLastOnline").insert(R::args(batch),R::optargs("conflict","update")).run(*rdb);
+						_lastOnline.clear();
+					}
+				} catch (std::exception &e) {
+					fprintf(stderr,"ERROR: controller RethinkDB (node status update): %s" ZT_EOL_S,e.what());
+					rdb.reset();
+				} catch (R::Error &e) {
+					fprintf(stderr,"ERROR: controller RethinkDB (node status update): %s" ZT_EOL_S,e.message.c_str());
+					rdb.reset();
+				} catch ( ... ) {
+					fprintf(stderr,"ERROR: controller RethinkDB (node status update): unknown exception" ZT_EOL_S);
+					rdb.reset();
+				}
+				std::this_thread::sleep_for(std::chrono::milliseconds(250));
+			}
+		} catch ( ... ) {}
+	});
+
 	_heartbeatThread = std::thread([this]() {
 	_heartbeatThread = std::thread([this]() {
 		try {
 		try {
 			char tmp[1024];
 			char tmp[1024];
@@ -251,9 +292,10 @@ RethinkDB::~RethinkDB()
 	_membersDbWatcher.join();
 	_membersDbWatcher.join();
 	_networksDbWatcher.join();
 	_networksDbWatcher.join();
 	_heartbeatThread.join();
 	_heartbeatThread.join();
+	_onlineNotificationThread.join();
 }
 }
 
 
-void RethinkDB::waitForReady()
+bool RethinkDB::waitForReady()
 {
 {
 	while (_ready > 0) {
 	while (_ready > 0) {
 		if (!_waitNoticePrinted) {
 		if (!_waitNoticePrinted) {
@@ -263,12 +305,32 @@ void RethinkDB::waitForReady()
 		_readyLock.lock();
 		_readyLock.lock();
 		_readyLock.unlock();
 		_readyLock.unlock();
 	}
 	}
+	return true;
 }
 }
 
 
-void RethinkDB::save(const nlohmann::json &record)
+void RethinkDB::save(nlohmann::json *orig,nlohmann::json &record)
 {
 {
+	if (!record.is_object()) // sanity check
+		return;
 	waitForReady();
 	waitForReady();
-	_commitQueue.post(new nlohmann::json(record));
+	if (orig) {
+		if (*orig != record) {
+			nlohmann::json *q = new nlohmann::json();
+			try {
+				record["revision"] = OSUtils::jsonInt(record["revision"],0ULL) + 1;
+				for(auto kv=record.begin();kv!=record.end();++kv) {
+					if ((kv.key() == "id")||(kv.key() == "nwid")||(kv.key() == "objtype")||((*q)[kv.key()] != kv.value()))
+						(*q)[kv.key()] = kv.value();
+				}
+			} catch ( ... ) {
+				delete q;
+				throw;
+			}
+		}
+	} else {
+		record["revision"] = 1;
+		_commitQueue.post(new nlohmann::json(record));
+	}
 }
 }
 
 
 void RethinkDB::eraseNetwork(const uint64_t networkId)
 void RethinkDB::eraseNetwork(const uint64_t networkId)
@@ -295,6 +357,12 @@ void RethinkDB::eraseMember(const uint64_t networkId,const uint64_t memberId)
 	_commitQueue.post(tmp);
 	_commitQueue.post(tmp);
 }
 }
 
 
+void RethinkDB::nodeIsOnline(const uint64_t memberId)
+{
+	std::lock_guard<std::mutex> l(_lastOnline_l);
+	_lastOnline[memberId] = OSUtils::now();
+}
+
 } // namespace ZeroTier
 } // namespace ZeroTier
 
 
 #endif // ZT_CONTROLLER_USE_RETHINKDB
 #endif // ZT_CONTROLLER_USE_RETHINKDB

+ 8 - 2
controller/RethinkDB.hpp

@@ -34,14 +34,16 @@ public:
 	RethinkDB(EmbeddedNetworkController *const nc,const Address &myAddress,const char *path);
 	RethinkDB(EmbeddedNetworkController *const nc,const Address &myAddress,const char *path);
 	virtual ~RethinkDB();
 	virtual ~RethinkDB();
 
 
-	virtual void waitForReady();
+	virtual bool waitForReady();
 
 
-	virtual void save(const nlohmann::json &record);
+	virtual void save(nlohmann::json *orig,nlohmann::json &record);
 
 
 	virtual void eraseNetwork(const uint64_t networkId);
 	virtual void eraseNetwork(const uint64_t networkId);
 
 
 	virtual void eraseMember(const uint64_t networkId,const uint64_t memberId);
 	virtual void eraseMember(const uint64_t networkId,const uint64_t memberId);
 
 
+	virtual void nodeIsOnline(const uint64_t memberId);
+
 protected:
 protected:
 	std::string _host;
 	std::string _host;
 	std::string _db;
 	std::string _db;
@@ -56,6 +58,10 @@ protected:
 	BlockingQueue< nlohmann::json * > _commitQueue;
 	BlockingQueue< nlohmann::json * > _commitQueue;
 	std::thread _commitThread[ZT_CONTROLLER_RETHINKDB_COMMIT_THREADS];
 	std::thread _commitThread[ZT_CONTROLLER_RETHINKDB_COMMIT_THREADS];
 
 
+	std::unordered_map< uint64_t,int64_t > _lastOnline;
+	mutable std::mutex _lastOnline_l;
+	std::thread _onlineNotificationThread;
+
 	std::thread _heartbeatThread;
 	std::thread _heartbeatThread;
 
 
 	mutable std::mutex _readyLock; // locked until ready
 	mutable std::mutex _readyLock; // locked until ready

+ 8 - 51
node/Peer.cpp

@@ -78,54 +78,6 @@ void Peer::received(
 {
 {
 	const int64_t now = RR->node->now();
 	const int64_t now = RR->node->now();
 
 
-/*
-#ifdef ZT_ENABLE_CLUSTER
-	bool isClusterSuboptimalPath = false;
-	if ((RR->cluster)&&(hops == 0)) {
-		// Note: findBetterEndpoint() is first since we still want to check
-		// for a better endpoint even if we don't actually send a redirect.
-		InetAddress redirectTo;
-		if ( (verb != Packet::VERB_OK) && (verb != Packet::VERB_ERROR) && (verb != Packet::VERB_RENDEZVOUS) && (verb != Packet::VERB_PUSH_DIRECT_PATHS) && (RR->cluster->findBetterEndpoint(redirectTo,_id.address(),path->address(),false)) ) {
-			if (_vProto >= 5) {
-				// For newer peers we can send a more idiomatic verb: PUSH_DIRECT_PATHS.
-				Packet outp(_id.address(),RR->identity.address(),Packet::VERB_PUSH_DIRECT_PATHS);
-				outp.append((uint16_t)1); // count == 1
-				outp.append((uint8_t)ZT_PUSH_DIRECT_PATHS_FLAG_CLUSTER_REDIRECT); // flags: cluster redirect
-				outp.append((uint16_t)0); // no extensions
-				if (redirectTo.ss_family == AF_INET) {
-					outp.append((uint8_t)4);
-					outp.append((uint8_t)6);
-					outp.append(redirectTo.rawIpData(),4);
-				} else {
-					outp.append((uint8_t)6);
-					outp.append((uint8_t)18);
-					outp.append(redirectTo.rawIpData(),16);
-				}
-				outp.append((uint16_t)redirectTo.port());
-				outp.armor(_key,true,path->nextOutgoingCounter());
-				path->send(RR,tPtr,outp.data(),outp.size(),now);
-			} else {
-				// For older peers we use RENDEZVOUS to coax them into contacting us elsewhere.
-				Packet outp(_id.address(),RR->identity.address(),Packet::VERB_RENDEZVOUS);
-				outp.append((uint8_t)0); // no flags
-				RR->identity.address().appendTo(outp);
-				outp.append((uint16_t)redirectTo.port());
-				if (redirectTo.ss_family == AF_INET) {
-					outp.append((uint8_t)4);
-					outp.append(redirectTo.rawIpData(),4);
-				} else {
-					outp.append((uint8_t)16);
-					outp.append(redirectTo.rawIpData(),16);
-				}
-				outp.armor(_key,true,path->nextOutgoingCounter());
-				path->send(RR,tPtr,outp.data(),outp.size(),now);
-			}
-			isClusterSuboptimalPath = true;
-		}
-	}
-#endif
-*/
-
 	_lastReceive = now;
 	_lastReceive = now;
 	switch (verb) {
 	switch (verb) {
 		case Packet::VERB_FRAME:
 		case Packet::VERB_FRAME:
@@ -163,6 +115,7 @@ void Peer::received(
 			}
 			}
 		}
 		}
 
 
+		bool attemptToContact = false;
 		if ((!havePath)&&(RR->node->shouldUsePathForZeroTierTraffic(tPtr,_id.address(),path->localSocket(),path->address()))) {
 		if ((!havePath)&&(RR->node->shouldUsePathForZeroTierTraffic(tPtr,_id.address(),path->localSocket(),path->address()))) {
 			Mutex::Lock _l(_paths_m);
 			Mutex::Lock _l(_paths_m);
 
 
@@ -201,13 +154,17 @@ void Peer::received(
 						_paths[replacePath].p = path;
 						_paths[replacePath].p = path;
 						_paths[replacePath].priority = 1;
 						_paths[replacePath].priority = 1;
 					} else {
 					} else {
-						attemptToContactAt(tPtr,path->localSocket(),path->address(),now,true,path->nextOutgoingCounter());
-						path->sent(now);
-						RR->t->peerConfirmingUnknownPath(tPtr,networkId,*this,path,packetId,verb);
+						attemptToContact = true;
 					}
 					}
 				}
 				}
 			}
 			}
 		}
 		}
+
+		if (attemptToContact) {
+			attemptToContactAt(tPtr,path->localSocket(),path->address(),now,true,path->nextOutgoingCounter());
+			path->sent(now);
+			RR->t->peerConfirmingUnknownPath(tPtr,networkId,*this,path,packetId,verb);
+		}
 	}
 	}
 
 
 	// If we have a trust relationship periodically push a message enumerating
 	// If we have a trust relationship periodically push a message enumerating