Browse Source

Abstract out change listener from controller itself to permit DBs to shadow changes from other DBs.

Adam Ierymenko 6 năm trước cách đây
mục cha
commit
f6b080b8a2

+ 28 - 19
controller/DB.cpp

@@ -104,8 +104,7 @@ void DB::cleanMember(nlohmann::json &member)
 	member.erase("lastRequestMetaData");
 }
 
-DB::DB(EmbeddedNetworkController *const nc,const Identity &myId,const char *path) :
-	_controller(nc),
+DB::DB(const Identity &myId,const char *path) :
 	_myId(myId),
 	_myAddress(myId.address()),
 	_path((path) ? path : "")
@@ -115,9 +114,7 @@ DB::DB(EmbeddedNetworkController *const nc,const Identity &myId,const char *path
 	_myAddressStr = tmp;
 }
 
-DB::~DB()
-{
-}
+DB::~DB() {}
 
 bool DB::get(const uint64_t networkId,nlohmann::json &network)
 {
@@ -229,7 +226,7 @@ void DB::networks(std::vector<uint64_t> &networks)
 		networks.push_back(n->first);
 }
 
-void DB::_memberChanged(nlohmann::json &old,nlohmann::json &memberConfig,bool push)
+void DB::_memberChanged(nlohmann::json &old,nlohmann::json &memberConfig,bool initialized)
 {
 	uint64_t memberId = 0;
 	uint64_t networkId = 0;
@@ -313,8 +310,12 @@ void DB::_memberChanged(nlohmann::json &old,nlohmann::json &memberConfig,bool pu
 			}
 		}
 
-		if (push)
-			_controller->onNetworkMemberUpdate(networkId,memberId);
+		if (initialized) {
+			std::lock_guard<std::mutex> ll(_changeListeners_l);
+			for(auto i=_changeListeners.begin();i!=_changeListeners.end();++i) {
+				(*i)->onNetworkMemberUpdate(networkId,memberId,memberConfig);
+			}
+		}
 	} else if (memberId) {
 		if (nw) {
 			std::lock_guard<std::mutex> l(nw->lock);
@@ -332,20 +333,24 @@ void DB::_memberChanged(nlohmann::json &old,nlohmann::json &memberConfig,bool pu
 		}
 	}
 
-	if ((push)&&((wasAuth)&&(!isAuth)&&(networkId)&&(memberId)))
-		_controller->onNetworkMemberDeauthorize(networkId,memberId);
+	if ((initialized)&&((wasAuth)&&(!isAuth)&&(networkId)&&(memberId))) {
+		std::lock_guard<std::mutex> ll(_changeListeners_l);
+		for(auto i=_changeListeners.begin();i!=_changeListeners.end();++i) {
+			(*i)->onNetworkMemberDeauthorize(networkId,memberId);
+		}
+	}
 }
 
-void DB::_networkChanged(nlohmann::json &old,nlohmann::json &networkConfig,bool push)
+void DB::_networkChanged(nlohmann::json &old,nlohmann::json &networkConfig,bool initialized)
 {
 	if (networkConfig.is_object()) {
 		const std::string ids = networkConfig["id"];
-		const uint64_t id = Utils::hexStrToU64(ids.c_str());
-		if (id) {
+		const uint64_t networkId = Utils::hexStrToU64(ids.c_str());
+		if (networkId) {
 			std::shared_ptr<_Network> nw;
 			{
 				std::lock_guard<std::mutex> l(_networks_l);
-				std::shared_ptr<_Network> &nw2 = _networks[id];
+				std::shared_ptr<_Network> &nw2 = _networks[networkId];
 				if (!nw2)
 					nw2.reset(new _Network);
 				nw = nw2;
@@ -354,15 +359,19 @@ void DB::_networkChanged(nlohmann::json &old,nlohmann::json &networkConfig,bool
 				std::lock_guard<std::mutex> l2(nw->lock);
 				nw->config = networkConfig;
 			}
-			if (push)
-				_controller->onNetworkUpdate(id);
+			if (initialized) {
+				std::lock_guard<std::mutex> ll(_changeListeners_l);
+				for(auto i=_changeListeners.begin();i!=_changeListeners.end();++i) {
+					(*i)->onNetworkUpdate(networkId,networkConfig);
+				}
+			}
 		}
 	} else if (old.is_object()) {
 		const std::string ids = old["id"];
-		const uint64_t id = Utils::hexStrToU64(ids.c_str());
-		if (id) {
+		const uint64_t networkId = Utils::hexStrToU64(ids.c_str());
+		if (networkId) {
 			std::lock_guard<std::mutex> l(_networks_l);
-			_networks.erase(id);
+			_networks.erase(networkId);
 		}
 	}
 }

+ 21 - 26
controller/DB.hpp

@@ -47,14 +47,22 @@
 namespace ZeroTier
 {
 
-class EmbeddedNetworkController;
-
 /**
  * Base class with common infrastructure for all controller DB implementations
  */
 class DB
 {
 public:
+	class ChangeListener
+	{
+	public:
+		ChangeListener() {}
+		virtual ~ChangeListener() {}
+		virtual void onNetworkUpdate(uint64_t networkId,const nlohmann::json &network) {}
+		virtual void onNetworkMemberUpdate(uint64_t networkId,uint64_t memberId,const nlohmann::json &member) {}
+		virtual void onNetworkMemberDeauthorize(uint64_t networkId,uint64_t memberId) {}
+	};
+
 	struct NetworkSummaryInfo
 	{
 		NetworkSummaryInfo() : authorizedMemberCount(0),totalMemberCount(0),mostRecentDeauthTime(0) {}
@@ -65,27 +73,12 @@ public:
 		int64_t mostRecentDeauthTime;
 	};
 
-	/**
-	 * Ensure that all network fields are present
-	 */
 	static void initNetwork(nlohmann::json &network);
-
-	/**
-	 * Ensure that all member fields are present
-	 */
 	static void initMember(nlohmann::json &member);
-
-	/**
-	 * Remove old and temporary network fields
-	 */
 	static void cleanNetwork(nlohmann::json &network);
-
-	/**
-	 * Remove old and temporary member fields
-	 */
 	static void cleanMember(nlohmann::json &member);
 
-	DB(EmbeddedNetworkController *const nc,const Identity &myId,const char *path);
+	DB(const Identity &myId,const char *path);
 	virtual ~DB();
 
 	virtual bool waitForReady() = 0;
@@ -101,19 +94,20 @@ public:
 	bool get(const uint64_t networkId,nlohmann::json &network,const uint64_t memberId,nlohmann::json &member);
 	bool get(const uint64_t networkId,nlohmann::json &network,const uint64_t memberId,nlohmann::json &member,NetworkSummaryInfo &info);
 	bool get(const uint64_t networkId,nlohmann::json &network,std::vector<nlohmann::json> &members);
-
 	bool summary(const uint64_t networkId,NetworkSummaryInfo &info);
-
 	void networks(std::vector<uint64_t> &networks);
 
 	virtual void save(nlohmann::json *orig,nlohmann::json &record) = 0;
-
 	virtual void eraseNetwork(const uint64_t networkId) = 0;
-
 	virtual void eraseMember(const uint64_t networkId,const uint64_t memberId) = 0;
-
 	virtual void nodeIsOnline(const uint64_t networkId,const uint64_t memberId,const InetAddress &physicalAddress) = 0;
 
+	inline void addListener(DB::ChangeListener *const listener)
+	{
+		std::lock_guard<std::mutex> l(_changeListeners_l);
+		_changeListeners.push_back(listener);
+	}
+
 protected:
 	struct _Network
 	{
@@ -127,18 +121,19 @@ protected:
 		std::mutex lock;
 	};
 
-	void _memberChanged(nlohmann::json &old,nlohmann::json &memberConfig,bool push);
-	void _networkChanged(nlohmann::json &old,nlohmann::json &networkConfig,bool push);
+	void _memberChanged(nlohmann::json &old,nlohmann::json &memberConfig,bool initialized);
+	void _networkChanged(nlohmann::json &old,nlohmann::json &networkConfig,bool initialized);
 	void _fillSummaryInfo(const std::shared_ptr<_Network> &nw,NetworkSummaryInfo &info);
 
-	EmbeddedNetworkController *const _controller;
 	const Identity _myId;
 	const Address _myAddress;
 	const std::string _path;
 	std::string _myAddressStr;
 
+	std::vector<DB::ChangeListener *> _changeListeners;
 	std::unordered_map< uint64_t,std::shared_ptr<_Network> > _networks;
 	std::unordered_multimap< uint64_t,uint64_t > _networkByMember;
+	mutable std::mutex _changeListeners_l;
 	mutable std::mutex _networks_l;
 };
 

+ 7 - 5
controller/EmbeddedNetworkController.cpp

@@ -496,7 +496,7 @@ void EmbeddedNetworkController::init(const Identity &signingId,Sender *sender)
 
 #ifdef ZT_CONTROLLER_USE_LIBPQ
 	if ((_path.length() > 9)&&(_path.substr(0,9) == "postgres:")) {
-		_db.reset(new PostgreSQL(this,_signingId,_path.substr(9).c_str(), _listenPort, _mqc));
+		_db.reset(new PostgreSQL(_signingId,_path.substr(9).c_str(), _listenPort, _mqc));
 	} else {
 #endif
 
@@ -521,7 +521,7 @@ void EmbeddedNetworkController::init(const Identity &signingId,Sender *sender)
 							std::size_t pubHdrEnd = lfOwnerPublic.find_first_of("\n\r\t ");
 							if (pubHdrEnd != std::string::npos) {
 								lfOwnerPublic = lfOwnerPublic.substr(0,pubHdrEnd);
-								_db.reset(new LFDB(this,_signingId,_path.c_str(),lfOwner.c_str(),lfOwnerPublic.c_str(),lfHost.c_str(),lfPort,storeOnlineState));
+								_db.reset(new LFDB(_signingId,_path.c_str(),lfOwner.c_str(),lfOwnerPublic.c_str(),lfHost.c_str(),lfPort,storeOnlineState));
 							}
 						}
 					}
@@ -530,7 +530,9 @@ void EmbeddedNetworkController::init(const Identity &signingId,Sender *sender)
 		}
 	}
 	if (!_db)
-		_db.reset(new FileDB(this,_signingId,_path.c_str()));
+		_db.reset(new FileDB(_signingId,_path.c_str()));
+
+	_db->addListener(this);
 
 #ifdef ZT_CONTROLLER_USE_LIBPQ
 	}
@@ -1188,7 +1190,7 @@ void EmbeddedNetworkController::handleRemoteTrace(const ZT_RemoteTrace &rt)
 	}
 }
 
-void EmbeddedNetworkController::onNetworkUpdate(const uint64_t networkId)
+void EmbeddedNetworkController::onNetworkUpdate(const uint64_t networkId,const nlohmann::json &network)
 {
 	// Send an update to all members of the network that are online
 	const int64_t now = OSUtils::now();
@@ -1199,7 +1201,7 @@ void EmbeddedNetworkController::onNetworkUpdate(const uint64_t networkId)
 	}
 }
 
-void EmbeddedNetworkController::onNetworkMemberUpdate(const uint64_t networkId,const uint64_t memberId)
+void EmbeddedNetworkController::onNetworkMemberUpdate(const uint64_t networkId,const uint64_t memberId,const nlohmann::json &member)
 {
 	// Push update to member if online
 	try {

+ 4 - 5
controller/EmbeddedNetworkController.hpp

@@ -58,7 +58,7 @@ class Node;
 
 struct MQConfig;
 
-class EmbeddedNetworkController : public NetworkController
+class EmbeddedNetworkController : public NetworkController,public DB::ChangeListener
 {
 public:
 	/**
@@ -101,10 +101,9 @@ public:
 
 	void handleRemoteTrace(const ZT_RemoteTrace &rt);
 
-	// Called on update via POST or by JSONDB on external update of network or network member records
-	void onNetworkUpdate(const uint64_t networkId);
-	void onNetworkMemberUpdate(const uint64_t networkId,const uint64_t memberId);
-	void onNetworkMemberDeauthorize(const uint64_t networkId,const uint64_t memberId);
+	virtual void onNetworkUpdate(const uint64_t networkId,const nlohmann::json &network);
+	virtual void onNetworkMemberUpdate(const uint64_t networkId,const uint64_t memberId,const nlohmann::json &member);
+	virtual void onNetworkMemberDeauthorize(const uint64_t networkId,const uint64_t memberId);
 
 private:
 	void _request(uint64_t nwid,const InetAddress &fromAddr,uint64_t requestPacketId,const Identity &identity,const Dictionary<ZT_NETWORKCONFIG_METADATA_DICT_CAPACITY> &metaData);

+ 2 - 2
controller/FileDB.cpp

@@ -29,8 +29,8 @@
 namespace ZeroTier
 {
 
-FileDB::FileDB(EmbeddedNetworkController *const nc,const Identity &myId,const char *path) :
-	DB(nc,myId,path),
+FileDB::FileDB(const Identity &myId,const char *path) :
+	DB(myId,path),
 	_networksPath(_path + ZT_PATH_SEPARATOR_S + "network"),
 	_tracePath(_path + ZT_PATH_SEPARATOR_S + "trace"),
 	_onlineChanged(false),

+ 1 - 1
controller/FileDB.hpp

@@ -35,7 +35,7 @@ namespace ZeroTier
 class FileDB : public DB
 {
 public:
-	FileDB(EmbeddedNetworkController *const nc,const Identity &myId,const char *path);
+	FileDB(const Identity &myId,const char *path);
 	virtual ~FileDB();
 
 	virtual bool waitForReady();

+ 20 - 7
controller/LFDB.cpp

@@ -37,9 +37,8 @@
 namespace ZeroTier
 {
 
-LFDB::LFDB(EmbeddedNetworkController *const nc,const Identity &myId,const char *path,const char *lfOwnerPrivate,const char *lfOwnerPublic,const char *lfNodeHost,int lfNodePort,bool storeOnlineState) :
-	DB(nc,myId,path),
-	_nc(nc),
+LFDB::LFDB(const Identity &myId,const char *path,const char *lfOwnerPrivate,const char *lfOwnerPublic,const char *lfNodeHost,int lfNodePort,bool storeOnlineState) :
+	DB(myId,path),
 	_myId(myId),
 	_lfOwnerPrivate((lfOwnerPrivate) ? lfOwnerPrivate : ""),
 	_lfOwnerPublic((lfOwnerPublic) ? lfOwnerPublic : ""),
@@ -54,7 +53,7 @@ LFDB::LFDB(EmbeddedNetworkController *const nc,const Identity &myId,const char *
 		const uint64_t controllerAddressInt = _myId.address().toInt();
 		_myId.address().toString(controllerAddress);
 		std::string networksSelectorName("com.zerotier.controller.lfdb:"); networksSelectorName.append(controllerAddress); networksSelectorName.append("/network");
-		std::string membersSelectorName("com.zerotier.controller.lfdb:"); membersSelectorName.append(controllerAddress); membersSelectorName.append("/network/member");
+		std::string membersSelectorName("com.zerotier.controller.lfdb:"); membersSelectorName.append(controllerAddress); membersSelectorName.append("/member");
 
 		httplib::Client htcli(_lfNodeHost.c_str(),_lfNodePort,600);
 		int64_t timeRangeStart = 0;
@@ -90,10 +89,10 @@ LFDB::LFDB(EmbeddedNetworkController *const nc,const Identity &myId,const char *
 
 					for(auto ms=ns->second.members.begin();ms!=ns->second.members.end();++ms) {
 						if ((_storeOnlineState)&&(ms->second.lastOnlineDirty)&&(ms->second.lastOnlineAddress)) {
+							nlohmann::json newrec,selector0,selector1,selectors,ip;
 							char tmp[1024],tmp2[128];
 							OSUtils::ztsnprintf(tmp,sizeof(tmp),"com.zerotier.controller.lfdb:%s/network/%.16llx/online",controllerAddress,(unsigned long long)ns->first);
 							ms->second.lastOnlineAddress.toIpString(tmp2);
-							nlohmann::json newrec,selector0,selector1,selectors;
 							selector0["Name"] = tmp;
 							selector0["Ordinal"] = ms->first;
 							selector1["Name"] = tmp2;
@@ -101,7 +100,21 @@ LFDB::LFDB(EmbeddedNetworkController *const nc,const Identity &myId,const char *
 							selectors.push_back(selector0);
 							selectors.push_back(selector1);
 							newrec["Selectors"] = selectors;
-							newrec["Value"] = tmp2;
+							const uint8_t *const rawip = (const uint8_t *)ms->second.lastOnlineAddress.rawIpData();
+							switch(ms->second.lastOnlineAddress) {
+								case AF_INET:
+									for(int j=0;j<4;++j)
+										ip.push_back((unsigned int)rawip[j]);
+									break;
+								case AF_INET6:
+									for(int j=0;j<16;++j)
+										ip.push_back((unsigned int)rawip[j]);
+									break;
+								default:
+									ip = tmp2; // should never happen since only IP transport is currently supported
+									break;
+							}
+							newrec["Value"] = ip;
 							newrec["OwnerPrivate"] = _lfOwnerPrivate;
 							newrec["MaskingKey"] = controllerAddress;
 							newrec["Timestamp"] = ms->second.lastOnlineTime;
@@ -112,7 +125,7 @@ LFDB::LFDB(EmbeddedNetworkController *const nc,const Identity &myId,const char *
 									ms->second.lastOnlineDirty = false;
 									printf("SET member online %.16llx %.10llx %s\n",ns->first,ms->first,resp->body.c_str());
 								} else {
-									fprintf(stderr,"ERROR: LFDB: %d from node (create/update member): %s" ZT_EOL_S,resp->status,resp->body.c_str());
+									fprintf(stderr,"ERROR: LFDB: %d from node (create/update member online status): %s" ZT_EOL_S,resp->status,resp->body.c_str());
 								}
 							} else {
 								fprintf(stderr,"ERROR: LFDB: node is offline" ZT_EOL_S);

+ 1 - 3
controller/LFDB.hpp

@@ -43,7 +43,6 @@ class LFDB : public DB
 {
 public:
 	/**
-	 * @param nc Network controller
 	 * @param myId Identity of controller node (with secret)
 	 * @param path Base path for ZeroTier node itself
 	 * @param lfOwnerPrivate LF owner private in PEM format
@@ -52,7 +51,7 @@ public:
 	 * @param lfNodePort LF node http (not https) port
 	 * @param storeOnlineState If true, store online/offline state and IP info in LF (a lot of data, only for private networks!)
 	 */
-	LFDB(EmbeddedNetworkController *const nc,const Identity &myId,const char *path,const char *lfOwnerPrivate,const char *lfOwnerPublic,const char *lfNodeHost,int lfNodePort,bool storeOnlineState);
+	LFDB(const Identity &myId,const char *path,const char *lfOwnerPrivate,const char *lfOwnerPublic,const char *lfNodeHost,int lfNodePort,bool storeOnlineState);
 	virtual ~LFDB();
 
 	virtual bool waitForReady();
@@ -63,7 +62,6 @@ public:
 	virtual void nodeIsOnline(const uint64_t networkId,const uint64_t memberId,const InetAddress &physicalAddress);
 
 protected:
-	EmbeddedNetworkController *const _nc;
 	const Identity _myId;
 
 	std::string _lfOwnerPrivate,_lfOwnerPublic;

+ 2 - 2
controller/PostgreSQL.cpp

@@ -77,8 +77,8 @@ std::string join(const std::vector<std::string> &elements, const char * const se
 
 using namespace ZeroTier;
 
-PostgreSQL::PostgreSQL(EmbeddedNetworkController *const nc, const Identity &myId, const char *path, int listenPort, MQConfig *mqc)
-    : DB(nc, myId, path)
+PostgreSQL::PostgreSQL(const Identity &myId, const char *path, int listenPort, MQConfig *mqc)
+    : DB(myId, path)
     , _ready(0)
 	, _connected(1)
     , _run(1)

+ 2 - 4
controller/PostgreSQL.hpp

@@ -51,7 +51,7 @@ struct MQConfig;
 class PostgreSQL : public DB
 {
 public:
-    PostgreSQL(EmbeddedNetworkController *const nc, const Identity &myId, const char *path, int listenPort, MQConfig *mqc = NULL);
+    PostgreSQL(const Identity &myId, const char *path, int listenPort, MQConfig *mqc = NULL);
     virtual ~PostgreSQL();
 
     virtual bool waitForReady();
@@ -78,7 +78,6 @@ private:
     void _networksWatcher_Postgres(PGconn *conn);
     void _networksWatcher_RabbitMQ();
 
-
     void commitThread();
     void onlineNotificationThread();
 
@@ -93,7 +92,6 @@ private:
 
     BlockingQueue<nlohmann::json *> _commitQueue;
 
-
     std::thread _heartbeatThread;
     std::thread _membersDbWatcher;
     std::thread _networksDbWatcher;
@@ -116,4 +114,4 @@ private:
 
 #endif // ZT_CONTROLLER_LIBPQ_HPP
 
-#endif // ZT_CONTROLLER_USE_LIBPQ
+#endif // ZT_CONTROLLER_USE_LIBPQ