Ver Fonte

More work on DB mirroring.

Adam Ierymenko há 6 anos atrás
pai
commit
00fb9c475e

+ 8 - 36
controller/DB.cpp

@@ -104,16 +104,7 @@ void DB::cleanMember(nlohmann::json &member)
 	member.erase("lastRequestMetaData");
 }
 
-DB::DB(const Identity &myId,const char *path) :
-	_myId(myId),
-	_myAddress(myId.address()),
-	_path((path) ? path : "")
-{
-	char tmp[32];
-	_myAddress.toString(tmp);
-	_myAddressStr = tmp;
-}
-
+DB::DB() {}
 DB::~DB() {}
 
 bool DB::get(const uint64_t networkId,nlohmann::json &network)
@@ -199,34 +190,15 @@ bool DB::get(const uint64_t networkId,nlohmann::json &network,std::vector<nlohma
 	return true;
 }
 
-bool DB::summary(const uint64_t networkId,NetworkSummaryInfo &info)
-{
-	waitForReady();
-	std::shared_ptr<_Network> nw;
-	{
-		std::lock_guard<std::mutex> l(_networks_l);
-		auto nwi = _networks.find(networkId);
-		if (nwi == _networks.end())
-			return false;
-		nw = nwi->second;
-	}
-	{
-		std::lock_guard<std::mutex> l2(nw->lock);
-		_fillSummaryInfo(nw,info);
-	}
-	return true;
-}
-
-void DB::networks(std::vector<uint64_t> &networks)
+void DB::networks(std::set<uint64_t> &networks)
 {
 	waitForReady();
 	std::lock_guard<std::mutex> l(_networks_l);
-	networks.reserve(_networks.size() + 1);
 	for(auto n=_networks.begin();n!=_networks.end();++n)
-		networks.push_back(n->first);
+		networks.insert(n->first);
 }
 
-void DB::_memberChanged(nlohmann::json &old,nlohmann::json &memberConfig,bool initialized)
+void DB::_memberChanged(nlohmann::json &old,nlohmann::json &memberConfig,bool notifyListeners)
 {
 	uint64_t memberId = 0;
 	uint64_t networkId = 0;
@@ -310,7 +282,7 @@ void DB::_memberChanged(nlohmann::json &old,nlohmann::json &memberConfig,bool in
 			}
 		}
 
-		if (initialized) {
+		if (notifyListeners) {
 			std::lock_guard<std::mutex> ll(_changeListeners_l);
 			for(auto i=_changeListeners.begin();i!=_changeListeners.end();++i) {
 				(*i)->onNetworkMemberUpdate(this,networkId,memberId,memberConfig);
@@ -333,7 +305,7 @@ void DB::_memberChanged(nlohmann::json &old,nlohmann::json &memberConfig,bool in
 		}
 	}
 
-	if ((initialized)&&((wasAuth)&&(!isAuth)&&(networkId)&&(memberId))) {
+	if ((notifyListeners)&&((wasAuth)&&(!isAuth)&&(networkId)&&(memberId))) {
 		std::lock_guard<std::mutex> ll(_changeListeners_l);
 		for(auto i=_changeListeners.begin();i!=_changeListeners.end();++i) {
 			(*i)->onNetworkMemberDeauthorize(this,networkId,memberId);
@@ -341,7 +313,7 @@ void DB::_memberChanged(nlohmann::json &old,nlohmann::json &memberConfig,bool in
 	}
 }
 
-void DB::_networkChanged(nlohmann::json &old,nlohmann::json &networkConfig,bool initialized)
+void DB::_networkChanged(nlohmann::json &old,nlohmann::json &networkConfig,bool notifyListeners)
 {
 	if (networkConfig.is_object()) {
 		const std::string ids = networkConfig["id"];
@@ -359,7 +331,7 @@ void DB::_networkChanged(nlohmann::json &old,nlohmann::json &networkConfig,bool
 				std::lock_guard<std::mutex> l2(nw->lock);
 				nw->config = networkConfig;
 			}
-			if (initialized) {
+			if (notifyListeners) {
 				std::lock_guard<std::mutex> ll(_changeListeners_l);
 				for(auto i=_changeListeners.begin();i!=_changeListeners.end();++i) {
 					(*i)->onNetworkUpdate(this,networkId,networkConfig);

+ 9 - 15
controller/DB.hpp

@@ -43,6 +43,7 @@
 #include <vector>
 #include <atomic>
 #include <mutex>
+#include <set>
 
 #include "../ext/json/json.hpp"
 
@@ -60,10 +61,9 @@ public:
 	public:
 		ChangeListener() {}
 		virtual ~ChangeListener() {}
-		virtual void onNetworkUpdate(const DB *db,uint64_t networkId,const nlohmann::json &network) {}
-		virtual void onNetworkMemberUpdate(const DB *db,uint64_t networkId,uint64_t memberId,const nlohmann::json &member) {}
-		virtual void onNetworkMemberDeauthorize(const DB *db,uint64_t networkId,uint64_t memberId) {}
-		virtual void onNetworkMemberOnline(const DB *db,uint64_t networkId,uint64_t memberId,const InetAddress &physicalAddress) {}
+		virtual void onNetworkUpdate(const void *db,uint64_t networkId,const nlohmann::json &network) {}
+		virtual void onNetworkMemberUpdate(const void *db,uint64_t networkId,uint64_t memberId,const nlohmann::json &member) {}
+		virtual void onNetworkMemberDeauthorize(const void *db,uint64_t networkId,uint64_t memberId) {}
 	};
 
 	struct NetworkSummaryInfo
@@ -81,7 +81,7 @@ public:
 	static void cleanNetwork(nlohmann::json &network);
 	static void cleanMember(nlohmann::json &member);
 
-	DB(const Identity &myId,const char *path);
+	DB();
 	virtual ~DB();
 
 	virtual bool waitForReady() = 0;
@@ -98,10 +98,9 @@ public:
 	bool get(const uint64_t networkId,nlohmann::json &network,const uint64_t memberId,nlohmann::json &member,NetworkSummaryInfo &info);
 	bool get(const uint64_t networkId,nlohmann::json &network,std::vector<nlohmann::json> &members);
 
-	bool summary(const uint64_t networkId,NetworkSummaryInfo &info);
-	void networks(std::vector<uint64_t> &networks);
+	void networks(std::set<uint64_t> &networks);
 
-	virtual void save(nlohmann::json &record) = 0;
+	virtual bool save(nlohmann::json &record,bool notifyListeners) = 0;
 
 	virtual void eraseNetwork(const uint64_t networkId) = 0;
 	virtual void eraseMember(const uint64_t networkId,const uint64_t memberId) = 0;
@@ -127,15 +126,10 @@ protected:
 		std::mutex lock;
 	};
 
-	void _memberChanged(nlohmann::json &old,nlohmann::json &memberConfig,bool initialized);
-	void _networkChanged(nlohmann::json &old,nlohmann::json &networkConfig,bool initialized);
+	void _memberChanged(nlohmann::json &old,nlohmann::json &memberConfig,bool notifyListeners);
+	void _networkChanged(nlohmann::json &old,nlohmann::json &networkConfig,bool notifyListeners);
 	void _fillSummaryInfo(const std::shared_ptr<_Network> &nw,NetworkSummaryInfo &info);
 
-	const Identity _myId;
-	const Address _myAddress;
-	const std::string _path;
-	std::string _myAddressStr;
-
 	std::vector<DB::ChangeListener *> _changeListeners;
 	std::unordered_map< uint64_t,std::shared_ptr<_Network> > _networks;
 	std::unordered_multimap< uint64_t,uint64_t > _networkByMember;

+ 95 - 11
controller/DBMirrorSet.cpp

@@ -28,7 +28,8 @@
 
 namespace ZeroTier {
 
-DBMirrorSet::DBMirrorSet()
+DBMirrorSet::DBMirrorSet(DB::ChangeListener *listener) :
+	_listener(listener)
 {
 }
 
@@ -36,6 +37,65 @@ DBMirrorSet::~DBMirrorSet()
 {
 }
 
+bool DBMirrorSet::hasNetwork(const uint64_t networkId) const
+{
+	std::lock_guard<std::mutex> l(_dbs_l);
+	for(auto d=_dbs.begin();d!=_dbs.end();++d) {
+		if ((*d)->hasNetwork(networkId))
+			return true;
+	}
+	return false;
+}
+
+bool DBMirrorSet::get(const uint64_t networkId,nlohmann::json &network)
+{
+	std::lock_guard<std::mutex> l(_dbs_l);
+	for(auto d=_dbs.begin();d!=_dbs.end();++d) {
+		if (get(networkId,network)) {
+			return true;
+		}
+	}
+	return false;
+}
+
+bool DBMirrorSet::get(const uint64_t networkId,nlohmann::json &network,const uint64_t memberId,nlohmann::json &member)
+{
+	std::lock_guard<std::mutex> l(_dbs_l);
+	for(auto d=_dbs.begin();d!=_dbs.end();++d) {
+		if (get(networkId,network,memberId,member))
+			return true;
+	}
+	return false;
+}
+
+bool DBMirrorSet::get(const uint64_t networkId,nlohmann::json &network,const uint64_t memberId,nlohmann::json &member,DB::NetworkSummaryInfo &info)
+{
+	std::lock_guard<std::mutex> l(_dbs_l);
+	for(auto d=_dbs.begin();d!=_dbs.end();++d) {
+		if (get(networkId,network,memberId,member,info))
+			return true;
+	}
+	return false;
+}
+
+bool DBMirrorSet::get(const uint64_t networkId,nlohmann::json &network,std::vector<nlohmann::json> &members)
+{
+	std::lock_guard<std::mutex> l(_dbs_l);
+	for(auto d=_dbs.begin();d!=_dbs.end();++d) {
+		if (get(networkId,network,members))
+			return true;
+	}
+	return false;
+}
+
+void DBMirrorSet::networks(std::set<uint64_t> &networks)
+{
+	std::lock_guard<std::mutex> l(_dbs_l);
+	for(auto d=_dbs.begin();d!=_dbs.end();++d) {
+		(*d)->networks(networks);
+	}
+}
+
 bool DBMirrorSet::waitForReady()
 {
 	bool r = false;
@@ -56,11 +116,21 @@ bool DBMirrorSet::isReady()
 	return true;
 }
 
-void DBMirrorSet::save(nlohmann::json &record)
+bool DBMirrorSet::save(nlohmann::json &record,bool notifyListeners)
 {
 	std::lock_guard<std::mutex> l(_dbs_l);
-	for(auto d=_dbs.begin();d!=_dbs.end();++d) {
-		(*d)->save(record);
+	if (notifyListeners) {
+		for(auto d=_dbs.begin();d!=_dbs.end();++d) {
+			if ((*d)->save(record,notifyListeners))
+				return true;
+		}
+		return false;
+	} else {
+		bool modified = false;
+		for(auto d=_dbs.begin();d!=_dbs.end();++d) {
+			modified |= (*d)->save(record,notifyListeners);
+		}
+		return modified;
 	}
 }
 
@@ -88,25 +158,39 @@ void DBMirrorSet::nodeIsOnline(const uint64_t networkId,const uint64_t memberId,
 	}
 }
 
-void DBMirrorSet::onNetworkUpdate(const DB *db,uint64_t networkId,const nlohmann::json &network)
+void DBMirrorSet::onNetworkUpdate(const void *db,uint64_t networkId,const nlohmann::json &network)
 {
+	bool modified = false;
+	nlohmann::json record(network);
 	std::lock_guard<std::mutex> l(_dbs_l);
 	for(auto d=_dbs.begin();d!=_dbs.end();++d) {
 		if (d->get() != db) {
+			modified |= (*d)->save(record,false);
 		}
 	}
+	if (modified) {
+		_listener->onNetworkUpdate(this,networkId,network);
+	}
 }
 
-void DBMirrorSet::onNetworkMemberUpdate(const DB *db,uint64_t networkId,uint64_t memberId,const nlohmann::json &member)
-{
-}
-
-void DBMirrorSet::onNetworkMemberDeauthorize(const DB *db,uint64_t networkId,uint64_t memberId)
+void DBMirrorSet::onNetworkMemberUpdate(const void *db,uint64_t networkId,uint64_t memberId,const nlohmann::json &member)
 {
+	bool modified = false;
+	nlohmann::json record(member);
+	std::lock_guard<std::mutex> l(_dbs_l);
+	for(auto d=_dbs.begin();d!=_dbs.end();++d) {
+		if (d->get() != db) {
+			modified |= (*d)->save(record,false);
+		}
+	}
+	if (modified) {
+		_listener->onNetworkMemberUpdate(this,networkId,memberId,member);
+	}
 }
 
-void DBMirrorSet::onNetworkMemberOnline(const DB *db,uint64_t networkId,uint64_t memberId,const InetAddress &physicalAddress)
+void DBMirrorSet::onNetworkMemberDeauthorize(const void *db,uint64_t networkId,uint64_t memberId)
 {
+	_listener->onNetworkMemberDeauthorize(this,networkId,memberId);
 }
 
 } // namespace ZeroTier

+ 18 - 7
controller/DBMirrorSet.hpp

@@ -32,37 +32,48 @@
 #include <vector>
 #include <memory>
 #include <mutex>
+#include <set>
 
 namespace ZeroTier {
 
 class DBMirrorSet : public DB::ChangeListener
 {
 public:
-	DBMirrorSet();
+	DBMirrorSet(DB::ChangeListener *listener);
 	virtual ~DBMirrorSet();
 
+	bool hasNetwork(const uint64_t networkId) const;
+
+	bool get(const uint64_t networkId,nlohmann::json &network);
+	bool get(const uint64_t networkId,nlohmann::json &network,const uint64_t memberId,nlohmann::json &member);
+	bool get(const uint64_t networkId,nlohmann::json &network,const uint64_t memberId,nlohmann::json &member,DB::NetworkSummaryInfo &info);
+	bool get(const uint64_t networkId,nlohmann::json &network,std::vector<nlohmann::json> &members);
+
+	void networks(std::set<uint64_t> &networks);
+
 	bool waitForReady();
 	bool isReady();
-	void save(nlohmann::json &record);
+	bool save(nlohmann::json &record,bool notifyListeners);
 	void eraseNetwork(const uint64_t networkId);
 	void eraseMember(const uint64_t networkId,const uint64_t memberId);
 	void nodeIsOnline(const uint64_t networkId,const uint64_t memberId,const InetAddress &physicalAddress);
 
 	// These are called by various DB instances when changes occur.
-	virtual void onNetworkUpdate(const DB *db,uint64_t networkId,const nlohmann::json &network);
-	virtual void onNetworkMemberUpdate(const DB *db,uint64_t networkId,uint64_t memberId,const nlohmann::json &member);
-	virtual void onNetworkMemberDeauthorize(const DB *db,uint64_t networkId,uint64_t memberId);
-	virtual void onNetworkMemberOnline(const DB *db,uint64_t networkId,uint64_t memberId,const InetAddress &physicalAddress);
+	virtual void onNetworkUpdate(const void *db,uint64_t networkId,const nlohmann::json &network);
+	virtual void onNetworkMemberUpdate(const void *db,uint64_t networkId,uint64_t memberId,const nlohmann::json &member);
+	virtual void onNetworkMemberDeauthorize(const void *db,uint64_t networkId,uint64_t memberId);
 
 	inline void addDB(const std::shared_ptr<DB> &db)
 	{
+		db->addListener(this);
 		std::lock_guard<std::mutex> l(_dbs_l);
 		_dbs.push_back(db);
 	}
 
 private:
+	DB::ChangeListener *const _listener;
 	std::vector< std::shared_ptr< DB > > _dbs;
-	std::mutex _dbs_l;
+	mutable std::mutex _dbs_l;
 };
 
 } // namespace ZeroTier

+ 3 - 3
controller/EmbeddedNetworkController.cpp

@@ -1188,7 +1188,7 @@ void EmbeddedNetworkController::handleRemoteTrace(const ZT_RemoteTrace &rt)
 	}
 }
 
-void EmbeddedNetworkController::onNetworkUpdate(const DB *db,uint64_t networkId,const nlohmann::json &network)
+void EmbeddedNetworkController::onNetworkUpdate(const void *db,uint64_t networkId,const nlohmann::json &network)
 {
 	// Send an update to all members of the network that are online
 	const int64_t now = OSUtils::now();
@@ -1199,7 +1199,7 @@ void EmbeddedNetworkController::onNetworkUpdate(const DB *db,uint64_t networkId,
 	}
 }
 
-void EmbeddedNetworkController::onNetworkMemberUpdate(const DB *db,uint64_t networkId,uint64_t memberId,const nlohmann::json &member)
+void EmbeddedNetworkController::onNetworkMemberUpdate(const void *db,uint64_t networkId,uint64_t memberId,const nlohmann::json &member)
 {
 	// Push update to member if online
 	try {
@@ -1210,7 +1210,7 @@ void EmbeddedNetworkController::onNetworkMemberUpdate(const DB *db,uint64_t netw
 	} catch ( ... ) {}
 }
 
-void EmbeddedNetworkController::onNetworkMemberDeauthorize(const DB *db,uint64_t networkId,uint64_t memberId)
+void EmbeddedNetworkController::onNetworkMemberDeauthorize(const void *db,uint64_t networkId,uint64_t memberId)
 {
 	const int64_t now = OSUtils::now();
 	Revocation rev((uint32_t)_node->prng(),networkId,0,now,ZT_REVOCATION_FLAG_FAST_PROPAGATE,Address(memberId),Revocation::CREDENTIAL_TYPE_COM);

+ 3 - 3
controller/EmbeddedNetworkController.hpp

@@ -101,9 +101,9 @@ public:
 
 	void handleRemoteTrace(const ZT_RemoteTrace &rt);
 
-	virtual void onNetworkUpdate(const DB *db,uint64_t networkId,const nlohmann::json &network);
-	virtual void onNetworkMemberUpdate(const DB *db,uint64_t networkId,uint64_t memberId,const nlohmann::json &member);
-	virtual void onNetworkMemberDeauthorize(const DB *db,uint64_t networkId,uint64_t memberId);
+	virtual void onNetworkUpdate(const void *db,uint64_t networkId,const nlohmann::json &network);
+	virtual void onNetworkMemberUpdate(const void *db,uint64_t networkId,uint64_t memberId,const nlohmann::json &member);
+	virtual void onNetworkMemberDeauthorize(const void *db,uint64_t networkId,uint64_t memberId);
 
 private:
 	void _request(uint64_t nwid,const InetAddress &fromAddr,uint64_t requestPacketId,const Identity &identity,const Dictionary<ZT_NETWORKCONFIG_METADATA_DICT_CAPACITY> &metaData);

+ 12 - 14
controller/FileDB.cpp

@@ -29,8 +29,9 @@
 namespace ZeroTier
 {
 
-FileDB::FileDB(const Identity &myId,const char *path) :
-	DB(myId,path),
+FileDB::FileDB(const char *path) :
+	DB(),
+	_path(path),
 	_networksPath(_path + ZT_PATH_SEPARATOR_S + "network"),
 	_tracePath(_path + ZT_PATH_SEPARATOR_S + "trace"),
 	_running(true)
@@ -85,9 +86,10 @@ FileDB::~FileDB()
 bool FileDB::waitForReady() { return true; }
 bool FileDB::isReady() { return true; }
 
-void FileDB::save(nlohmann::json &record)
+bool FileDB::save(nlohmann::json &record,bool notifyListeners)
 {
 	char p1[4096],p2[4096],pb[4096];
+	bool modified = false;
 	try {
 		const std::string objtype = record["objtype"];
 		if (objtype == "network") {
@@ -101,7 +103,8 @@ void FileDB::save(nlohmann::json &record)
 					OSUtils::ztsnprintf(p1,sizeof(p1),"%s" ZT_PATH_SEPARATOR_S "%.16llx.json",_networksPath.c_str(),nwid);
 					if (!OSUtils::writeFile(p1,OSUtils::jsonDump(record,-1)))
 						fprintf(stderr,"WARNING: controller unable to write to path: %s" ZT_EOL_S,p1);
-					_networkChanged(old,record,true);
+					_networkChanged(old,record,notifyListeners);
+					modified = true;
 				}
 			}
 
@@ -123,12 +126,14 @@ void FileDB::save(nlohmann::json &record)
 						if (!OSUtils::writeFile(p1,OSUtils::jsonDump(record,-1)))
 							fprintf(stderr,"WARNING: controller unable to write to path: %s" ZT_EOL_S,p1);
 					}
-					_memberChanged(old,record,true);
+					_memberChanged(old,record,notifyListeners);
+					modified = true;
 				}
 			}
 
 		}
 	} catch ( ... ) {} // drop invalid records missing fields
+	return modified;
 }
 
 void FileDB::eraseNetwork(const uint64_t networkId)
@@ -163,15 +168,8 @@ void FileDB::nodeIsOnline(const uint64_t networkId,const uint64_t memberId,const
 	char mid[32],atmp[64];
 	OSUtils::ztsnprintf(mid,sizeof(mid),"%.10llx",(unsigned long long)memberId);
 	physicalAddress.toString(atmp);
-	{
-		std::lock_guard<std::mutex> l(this->_online_l);
-		this->_online[networkId][memberId][OSUtils::now()] = physicalAddress;
-	}
-	{
-		std::lock_guard<std::mutex> l2(_changeListeners_l);
-		for(auto i=_changeListeners.begin();i!=_changeListeners.end();++i)
-			(*i)->onNetworkMemberOnline(this,networkId,memberId,physicalAddress);
-	}
+	std::lock_guard<std::mutex> l(this->_online_l);
+	this->_online[networkId][memberId][OSUtils::now()] = physicalAddress;
 }
 
 } // namespace ZeroTier

+ 3 - 2
controller/FileDB.hpp

@@ -35,17 +35,18 @@ namespace ZeroTier
 class FileDB : public DB
 {
 public:
-	FileDB(const Identity &myId,const char *path);
+	FileDB(const char *path);
 	virtual ~FileDB();
 
 	virtual bool waitForReady();
 	virtual bool isReady();
-	virtual void save(nlohmann::json &record);
+	virtual bool save(nlohmann::json &record,bool notifyListeners);
 	virtual void eraseNetwork(const uint64_t networkId);
 	virtual void eraseMember(const uint64_t networkId,const uint64_t memberId);
 	virtual void nodeIsOnline(const uint64_t networkId,const uint64_t memberId,const InetAddress &physicalAddress);
 
 protected:
+	std::string _path;
 	std::string _networksPath;
 	std::string _tracePath;
 	std::thread _onlineUpdateThread;

+ 17 - 20
controller/LFDB.cpp

@@ -38,7 +38,7 @@ namespace ZeroTier
 {
 
 LFDB::LFDB(const Identity &myId,const char *path,const char *lfOwnerPrivate,const char *lfOwnerPublic,const char *lfNodeHost,int lfNodePort,bool storeOnlineState) :
-	DB(myId,path),
+	DB(),
 	_myId(myId),
 	_lfOwnerPrivate((lfOwnerPrivate) ? lfOwnerPrivate : ""),
 	_lfOwnerPublic((lfOwnerPublic) ? lfOwnerPublic : ""),
@@ -335,8 +335,9 @@ bool LFDB::isReady()
 	return (_ready.load());
 }
 
-void LFDB::save(nlohmann::json &record)
+bool LFDB::save(nlohmann::json &record,bool notifyListeners)
 {
+	bool modified = false;
 	const std::string objtype = record["objtype"];
 	if (objtype == "network") {
 		const uint64_t nwid = OSUtils::jsonIntHex(record["id"],0ULL);
@@ -345,11 +346,12 @@ void LFDB::save(nlohmann::json &record)
 			get(nwid,old);
 			if ((!old.is_object())||(old != record)) {
 				record["revision"] = OSUtils::jsonInt(record["revision"],0ULL) + 1ULL;
-				_networkChanged(old,record,true);
+				_networkChanged(old,record,notifyListeners);
 				{
 					std::lock_guard<std::mutex> l(_state_l);
 					_state[nwid].dirty = true;
 				}
+				modified = true;
 			}
 		}
 	} else if (objtype == "member") {
@@ -360,14 +362,16 @@ void LFDB::save(nlohmann::json &record)
 			get(nwid,network,id,old);
 			if ((!old.is_object())||(old != record)) {
 				record["revision"] = OSUtils::jsonInt(record["revision"],0ULL) + 1ULL;
-				_memberChanged(old,record,true);
+				_memberChanged(old,record,notifyListeners);
 				{
 					std::lock_guard<std::mutex> l(_state_l);
 					_state[nwid].members[id].dirty = true;
 				}
+				modified = true;
 			}
 		}
 	}
+	return modified;
 }
 
 void LFDB::eraseNetwork(const uint64_t networkId)
@@ -382,24 +386,17 @@ void LFDB::eraseMember(const uint64_t networkId,const uint64_t memberId)
 
 void LFDB::nodeIsOnline(const uint64_t networkId,const uint64_t memberId,const InetAddress &physicalAddress)
 {
-	{
-		std::lock_guard<std::mutex> l(_state_l);
-		auto nw = _state.find(networkId);
-		if (nw != _state.end()) {
-			auto m = nw->second.members.find(memberId);
-			if (m != nw->second.members.end()) {
-				m->second.lastOnlineTime = OSUtils::now();
-				if (physicalAddress)
-					m->second.lastOnlineAddress = physicalAddress;
-				m->second.lastOnlineDirty = true;
-			}
+	std::lock_guard<std::mutex> l(_state_l);
+	auto nw = _state.find(networkId);
+	if (nw != _state.end()) {
+		auto m = nw->second.members.find(memberId);
+		if (m != nw->second.members.end()) {
+			m->second.lastOnlineTime = OSUtils::now();
+			if (physicalAddress)
+				m->second.lastOnlineAddress = physicalAddress;
+			m->second.lastOnlineDirty = true;
 		}
 	}
-	{
-		std::lock_guard<std::mutex> l2(_changeListeners_l);
-		for(auto i=_changeListeners.begin();i!=_changeListeners.end();++i)
-			(*i)->onNetworkMemberOnline(this,networkId,memberId,physicalAddress);
-	}
 }
 
 } // namespace ZeroTier

+ 2 - 2
controller/LFDB.hpp

@@ -43,7 +43,7 @@ class LFDB : public DB
 {
 public:
 	/**
-	 * @param myId Identity of controller node (with secret)
+	 * @param myId This controller's identity
 	 * @param path Base path for ZeroTier node itself
 	 * @param lfOwnerPrivate LF owner private in PEM format
 	 * @param lfOwnerPublic LF owner public in @base62 format
@@ -56,7 +56,7 @@ public:
 
 	virtual bool waitForReady();
 	virtual bool isReady();
-	virtual void save(nlohmann::json &record);
+	virtual bool save(nlohmann::json &record,bool notifyListeners);
 	virtual void eraseNetwork(const uint64_t networkId);
 	virtual void eraseMember(const uint64_t networkId,const uint64_t memberId);
 	virtual void nodeIsOnline(const uint64_t networkId,const uint64_t memberId,const InetAddress &physicalAddress);

+ 35 - 36
controller/PostgreSQL.cpp

@@ -81,7 +81,9 @@ std::string join(const std::vector<std::string> &elements, const char * const se
 using namespace ZeroTier;
 
 PostgreSQL::PostgreSQL(const Identity &myId, const char *path, int listenPort, MQConfig *mqc)
-	: DB(myId, path)
+	: DB()
+	, _myId(myId)
+	, _myAddress(myId.address())
 	, _ready(0)
 	, _connected(1)
 	, _run(1)
@@ -89,7 +91,9 @@ PostgreSQL::PostgreSQL(const Identity &myId, const char *path, int listenPort, M
 	, _listenPort(listenPort)
 	, _mqc(mqc)
 {
-	_connString = std::string(path) + " application_name=controller_" +_myAddressStr;
+	char myAddress[64];
+	_myAddressStr = myId.address().toString(myAddress);
+	_connString = std::string(path) + " application_name=controller_" + _myAddressStr;
 
 	// Database Schema Version Check
 	PGconn *conn = getPgConn();
@@ -165,8 +169,9 @@ bool PostgreSQL::isReady()
 	return ((_ready == 2)&&(_connected));
 }
 
-void PostgreSQL::save(nlohmann::json &record)
+bool PostgreSQL::save(nlohmann::json &record,bool notifyListeners)
 {
+	bool modified = false;
 	try {
 		if (!record.is_object())
 			return;
@@ -178,7 +183,8 @@ void PostgreSQL::save(nlohmann::json &record)
 				get(nwid,old);
 				if ((!old.is_object())||(old != record)) {
 					record["revision"] = OSUtils::jsonInt(record["revision"],0ULL) + 1ULL;
-					_commitQueue.post(new nlohmann::json(record));
+					_commitQueue.post(std::pair<nlohmann::json,bool>(record,notifyListeners));
+					modified = true;
 				}
 			}
 		} else if (objtype == "member") {
@@ -189,7 +195,8 @@ void PostgreSQL::save(nlohmann::json &record)
 				get(nwid,network,id,old);
 				if ((!old.is_object())||(old != record)) {
 					record["revision"] = OSUtils::jsonInt(record["revision"],0ULL) + 1ULL;
-					_commitQueue.post(new nlohmann::json(record));
+					_commitQueue.post(std::pair<nlohmann::json,bool>(record,notifyListeners));
+					modified = true;
 				}
 			}
 		}
@@ -210,6 +217,7 @@ void PostgreSQL::save(nlohmann::json &record)
 	} catch (...) {
 		fprintf(stderr, "Unknown error on PostgreSQL::save\n");
 	}
+	return modified;
 }
 
 void PostgreSQL::eraseNetwork(const uint64_t networkId)
@@ -217,38 +225,33 @@ void PostgreSQL::eraseNetwork(const uint64_t networkId)
 	char tmp2[24];
 	waitForReady();
 	Utils::hex(networkId, tmp2);
-	json *tmp = new json();
-	(*tmp)["id"] = tmp2;
-	(*tmp)["objtype"] = "_delete_network";
+	std::pair<nlohmann::json,bool> tmp;
+	tmp.first["id"] = tmp2;
+	tmp.first["objtype"] = "_delete_network";
+	tmp.second = true;
 	_commitQueue.post(tmp);
 }
 
 void PostgreSQL::eraseMember(const uint64_t networkId, const uint64_t memberId) 
 {
 	char tmp2[24];
-	json *tmp = new json();
+	std::pair<nlohmann::json,bool> tmp;
 	Utils::hex(networkId, tmp2);
-	(*tmp)["nwid"] = tmp2;
+	tmp.first["nwid"] = tmp2;
 	Utils::hex(memberId, tmp2);
-	(*tmp)["id"] = tmp2;
-	(*tmp)["objtype"] = "_delete_member";
+	tmp.first["id"] = tmp2;
+	tmp.first["objtype"] = "_delete_member";
+	tmp.second = true;
 	_commitQueue.post(tmp);
 }
 
 void PostgreSQL::nodeIsOnline(const uint64_t networkId, const uint64_t memberId, const InetAddress &physicalAddress)
 {
-	{
-		std::lock_guard<std::mutex> l(_lastOnline_l);
-		std::pair<int64_t, InetAddress> &i = _lastOnline[std::pair<uint64_t,uint64_t>(networkId, memberId)];
-		i.first = OSUtils::now();
-		if (physicalAddress) {
-			i.second = physicalAddress;
-		}
-	}
-	{
-		std::lock_guard<std::mutex> l2(_changeListeners_l);
-		for(auto i=_changeListeners.begin();i!=_changeListeners.end();++i)
-			(*i)->onNetworkMemberOnline(this,networkId,memberId,physicalAddress);
+	std::lock_guard<std::mutex> l(_lastOnline_l);
+	std::pair<int64_t, InetAddress> &i = _lastOnline[std::pair<uint64_t,uint64_t>(networkId, memberId)];
+	i.first = OSUtils::now();
+	if (physicalAddress) {
+		i.second = physicalAddress;
 	}
 }
 
@@ -868,18 +871,18 @@ void PostgreSQL::commitThread()
 		exit(1);
 	}
 
-	json *config = nullptr;
-	while(_commitQueue.get(config)&(_run == 1)) {
-		if (!config) {
+	std::pair<nlohmann::json,bool> qitem;
+	while(_commitQueue.get(qitem)&(_run == 1)) {
+		if (!qitem.first.is_object()) {
 			continue;
 		}
 		if (PQstatus(conn) == CONNECTION_BAD) {
 			fprintf(stderr, "ERROR: Connection to database failed: %s\n", PQerrorMessage(conn));
 			PQfinish(conn);
-			delete config;
 			exit(1);
 		}
-		try { 
+		try {
+			nlohmann::json *config = &(qitem.first);
 			const std::string objtype = (*config)["objtype"];
 			if (objtype == "member") {
 				try {
@@ -1034,10 +1037,10 @@ void PostgreSQL::commitThread()
 						nlohmann::json memOrig;
 
 						nlohmann::json memNew(*config);
-						
+
 						get(nwidInt, nwOrig, memberidInt, memOrig);
 				
-						_memberChanged(memOrig, memNew, (this->_ready>=2));
+						_memberChanged(memOrig, memNew, qitem.second);
 					} else {
 						fprintf(stderr, "Can't notify of change.  Error parsing nwid or memberid: %lu-%lu\n", nwidInt, memberidInt);
 					}
@@ -1260,7 +1263,7 @@ void PostgreSQL::commitThread()
 
 						get(nwidInt, nwOrig);
 
-						_networkChanged(nwOrig, nwNew, true);
+						_networkChanged(nwOrig, nwNew, qitem.second);
 					} else {
 						fprintf(stderr, "Can't notify network changed: %lu\n", nwidInt);
 					}
@@ -1268,8 +1271,6 @@ void PostgreSQL::commitThread()
 				} catch (std::exception &e) {
 					fprintf(stderr, "ERROR: Error updating member: %s\n", e.what());
 				}
-			} else if (objtype == "trace") {
-				fprintf(stderr, "ERROR: Trace not yet implemented");
 			} else if (objtype == "_delete_network") {
 				try {
 					std::string networkId = (*config)["nwid"];
@@ -1326,8 +1327,6 @@ void PostgreSQL::commitThread()
 		} catch (std::exception &e) {
 			fprintf(stderr, "ERROR: Error getting objtype: %s\n", e.what());
 		}
-		delete config;
-		config = nullptr;
 
 		std::this_thread::sleep_for(std::chrono::milliseconds(10));
 	}

+ 5 - 2
controller/PostgreSQL.hpp

@@ -55,7 +55,7 @@ public:
 
 	virtual bool waitForReady();
 	virtual bool isReady();
-	virtual void save(nlohmann::json &record);
+	virtual bool save(nlohmann::json &record,bool notifyListeners);
 	virtual void eraseNetwork(const uint64_t networkId);
 	virtual void eraseMember(const uint64_t networkId, const uint64_t memberId);
 	virtual void nodeIsOnline(const uint64_t networkId, const uint64_t memberId, const InetAddress &physicalAddress);
@@ -87,9 +87,12 @@ private:
 
 	PGconn * getPgConn( OverrideMode m = ALLOW_PGBOUNCER_OVERRIDE );
 
+	const Identity _myId;
+	const Address _myAddress;
+	std::string _myAddressStr;
 	std::string _connString;
 
-	BlockingQueue<nlohmann::json *> _commitQueue;
+	BlockingQueue< std::pair<nlohmann::json,bool> > _commitQueue;
 
 	std::thread _heartbeatThread;
 	std::thread _membersDbWatcher;