Browse Source

More fixes to RethinkDB.

Adam Ierymenko 7 years ago
parent
commit
7fc9094d8e

+ 7 - 4
controller/EmbeddedNetworkController.cpp

@@ -1185,8 +1185,9 @@ void EmbeddedNetworkController::_request(
 		ms.lastRequestTime = now;
 		ms.lastRequestTime = now;
 	}
 	}
 
 
-	OSUtils::ztsnprintf(nwids,sizeof(nwids),"%.16llx",nwid);
-	if (!_db->get(nwid,network,identity.address().toInt(),member,ns)) {
+	Utils::hex(nwid,nwids);
+	_db->get(nwid,network,identity.address().toInt(),member,ns);
+	if ((!network.is_object())||(network.size() == 0)) {
 		_sender->ncSendError(nwid,requestPacketId,identity.address(),NetworkController::NC_ERROR_OBJECT_NOT_FOUND);
 		_sender->ncSendError(nwid,requestPacketId,identity.address(),NetworkController::NC_ERROR_OBJECT_NOT_FOUND);
 		return;
 		return;
 	}
 	}
@@ -1684,11 +1685,13 @@ void EmbeddedNetworkController::_startThreads()
 		_threads.emplace_back([this]() {
 		_threads.emplace_back([this]() {
 			for(;;) {
 			for(;;) {
 				_RQEntry *qe = (_RQEntry *)0;
 				_RQEntry *qe = (_RQEntry *)0;
-				if (_queue.get(qe))
+				if (!_queue.get(qe))
 					break;
 					break;
 				try {
 				try {
-					if (qe)
+					if (qe) {
 						_request(qe->nwid,qe->fromAddr,qe->requestPacketId,qe->identity,qe->metaData);
 						_request(qe->nwid,qe->fromAddr,qe->requestPacketId,qe->identity,qe->metaData);
+						delete qe;
+					}
 				} catch (std::exception &e) {
 				} catch (std::exception &e) {
 					fprintf(stderr,"ERROR: exception in controller request handling thread: %s" ZT_EOL_S,e.what());
 					fprintf(stderr,"ERROR: exception in controller request handling thread: %s" ZT_EOL_S,e.what());
 				} catch ( ... ) {
 				} catch ( ... ) {

+ 77 - 68
controller/RethinkDB.cpp

@@ -78,6 +78,7 @@ RethinkDB::RethinkDB(EmbeddedNetworkController *const nc,const Address &myAddres
 									json &ov = tmp["old_val"];
 									json &ov = tmp["old_val"];
 									json &nv = tmp["new_val"];
 									json &nv = tmp["new_val"];
 									if (ov.is_object()||nv.is_object()) {
 									if (ov.is_object()||nv.is_object()) {
+										//if (nv.is_object()) printf("MEMBER: %s" ZT_EOL_S,nv.dump().c_str());
 										this->_memberChanged(ov,nv);
 										this->_memberChanged(ov,nv);
 									}
 									}
 								} catch ( ... ) {} // ignore bad records
 								} catch ( ... ) {} // ignore bad records
@@ -118,6 +119,7 @@ RethinkDB::RethinkDB(EmbeddedNetworkController *const nc,const Address &myAddres
 									json &ov = tmp["old_val"];
 									json &ov = tmp["old_val"];
 									json &nv = tmp["new_val"];
 									json &nv = tmp["new_val"];
 									if (ov.is_object()||nv.is_object()) {
 									if (ov.is_object()||nv.is_object()) {
+										//if (nv.is_object()) printf("NETWORK: %s" ZT_EOL_S,nv.dump().c_str());
 										this->_networkChanged(ov,nv);
 										this->_networkChanged(ov,nv);
 									}
 									}
 								} catch ( ... ) {} // ignore bad records
 								} catch ( ... ) {} // ignore bad records
@@ -148,34 +150,41 @@ RethinkDB::RethinkDB(EmbeddedNetworkController *const nc,const Address &myAddres
 					const std::string objtype = (*config)["objtype"];
 					const std::string objtype = (*config)["objtype"];
 					const char *table;
 					const char *table;
 					std::string deleteId;
 					std::string deleteId;
-					if (objtype == "member") {
-						const std::string nwid = (*config)["nwid"];
-						const std::string id = (*config)["id"];
-						record["id"] = nwid + "-" + id;
-						record["controllerId"] = this->_myAddressStr;
-						record["networkId"] = nwid;
-						record["nodeId"] = id;
-						record["config"] = *config;
-						table = "Member";
-					} else if (objtype == "network") {
-						const std::string id = (*config)["id"];
-						record["id"] = id;
-						record["controllerId"] = this->_myAddressStr;
-						record["config"] = *config;
-						table = "Network";
-					} else if (objtype == "delete_network") {
-						deleteId = (*config)["id"];
-						table = "Network";
-					} else if (objtype == "delete_member") {
-						deleteId = (*config)["nwid"];
-						deleteId.push_back('-');
-						const std::string tmp = (*config)["id"];
-						deleteId.append(tmp);
-						table = "Member";
-					} else if (objtype == "trace") {
-						record = *config;
-						table = "RemoteTrace";
-					} else {
+					try {
+						if (objtype == "member") {
+							const std::string nwid = (*config)["nwid"];
+							const std::string id = (*config)["id"];
+							record["id"] = nwid + "-" + id;
+							record["controllerId"] = this->_myAddressStr;
+							record["networkId"] = nwid;
+							record["nodeId"] = id;
+							record["config"] = *config;
+							table = "Member";
+						} else if (objtype == "network") {
+							const std::string id = (*config)["id"];
+							record["id"] = id;
+							record["controllerId"] = this->_myAddressStr;
+							record["config"] = *config;
+							table = "Network";
+						} else if (objtype == "delete_network") {
+							deleteId = (*config)["id"];
+							table = "Network";
+						} else if (objtype == "delete_member") {
+							deleteId = (*config)["nwid"];
+							deleteId.push_back('-');
+							const std::string tmp = (*config)["id"];
+							deleteId.append(tmp);
+							table = "Member";
+						} else if (objtype == "trace") {
+							record = *config;
+							table = "RemoteTrace";
+						} else {
+							delete config;
+							continue;
+						}
+						delete config;
+					} catch ( ... ) {
+						delete config;
 						continue;
 						continue;
 					}
 					}
 
 
@@ -185,10 +194,10 @@ RethinkDB::RethinkDB(EmbeddedNetworkController *const nc,const Address &myAddres
 								rdb = R::connect(this->_host,this->_port,this->_auth);
 								rdb = R::connect(this->_host,this->_port,this->_auth);
 							if (rdb) {
 							if (rdb) {
 								if (deleteId.length() > 0) {
 								if (deleteId.length() > 0) {
-									printf("DELETE: %s" ZT_EOL_S,deleteId.c_str());
+									//printf("DELETE: %s" ZT_EOL_S,deleteId.c_str());
 									R::db(this->_db).table(table).get(deleteId).delete_().run(*rdb);
 									R::db(this->_db).table(table).get(deleteId).delete_().run(*rdb);
 								} else {
 								} else {
-									printf("UPSERT: %s" ZT_EOL_S,record.dump().c_str());
+									//printf("UPSERT: %s" ZT_EOL_S,record.dump().c_str());
 									R::db(this->_db).table(table).insert(R::Datum::from_json(record.dump()),R::optargs("conflict","update","return_changes",false)).run(*rdb);
 									R::db(this->_db).table(table).insert(R::Datum::from_json(record.dump()),R::optargs("conflict","update","return_changes",false)).run(*rdb);
 								}
 								}
 								break;
 								break;
@@ -222,6 +231,7 @@ RethinkDB::RethinkDB(EmbeddedNetworkController *const nc,const Address &myAddres
 						rdb = R::connect(this->_host,this->_port,this->_auth);
 						rdb = R::connect(this->_host,this->_port,this->_auth);
 					if (rdb) {
 					if (rdb) {
 						OSUtils::ztsnprintf(tmp,sizeof(tmp),"{\"id\":\"%s\",\"lastAlive\":%lld}",this->_myAddressStr.c_str(),(long long)OSUtils::now());
 						OSUtils::ztsnprintf(tmp,sizeof(tmp),"{\"id\":\"%s\",\"lastAlive\":%lld}",this->_myAddressStr.c_str(),(long long)OSUtils::now());
+						//printf("HEARTBEAT: %s" ZT_EOL_S,tmp);
 						R::db(this->_db).table("Controller").update(R::Datum::from_json(tmp)).run(*rdb);
 						R::db(this->_db).table("Controller").update(R::Datum::from_json(tmp)).run(*rdb);
 					}
 					}
 				} catch ( ... ) {
 				} catch ( ... ) {
@@ -240,15 +250,18 @@ RethinkDB::~RethinkDB()
 	_commitQueue.stop();
 	_commitQueue.stop();
 	for(int t=0;t<ZT_CONTROLLER_RETHINKDB_COMMIT_THREADS;++t)
 	for(int t=0;t<ZT_CONTROLLER_RETHINKDB_COMMIT_THREADS;++t)
 		_commitThread[t].join();
 		_commitThread[t].join();
-	_membersDbWatcher.detach();
-	_networksDbWatcher.detach();
+	if (_membersDbWatcherConnection)
+		((R::Connection *)_membersDbWatcherConnection)->close();
+	if (_networksDbWatcherConnection)
+		((R::Connection *)_networksDbWatcherConnection)->close();
+	_membersDbWatcher.join();
+	_networksDbWatcher.join();
 	_heartbeatThread.join();
 	_heartbeatThread.join();
 }
 }
 
 
 bool RethinkDB::get(const uint64_t networkId,nlohmann::json &network)
 bool RethinkDB::get(const uint64_t networkId,nlohmann::json &network)
 {
 {
 	waitForReady();
 	waitForReady();
-
 	std::shared_ptr<_Network> nw;
 	std::shared_ptr<_Network> nw;
 	{
 	{
 		std::lock_guard<std::mutex> l(_networks_l);
 		std::lock_guard<std::mutex> l(_networks_l);
@@ -257,17 +270,16 @@ bool RethinkDB::get(const uint64_t networkId,nlohmann::json &network)
 			return false;
 			return false;
 		nw = nwi->second;
 		nw = nwi->second;
 	}
 	}
-
-	std::lock_guard<std::mutex> l2(nw->lock);
-	network = nw->config;
-
+	{
+		std::lock_guard<std::mutex> l2(nw->lock);
+		network = nw->config;
+	}
 	return true;
 	return true;
 }
 }
 
 
 bool RethinkDB::get(const uint64_t networkId,nlohmann::json &network,const uint64_t memberId,nlohmann::json &member)
 bool RethinkDB::get(const uint64_t networkId,nlohmann::json &network,const uint64_t memberId,nlohmann::json &member)
 {
 {
 	waitForReady();
 	waitForReady();
-
 	std::shared_ptr<_Network> nw;
 	std::shared_ptr<_Network> nw;
 	{
 	{
 		std::lock_guard<std::mutex> l(_networks_l);
 		std::lock_guard<std::mutex> l(_networks_l);
@@ -276,21 +288,20 @@ bool RethinkDB::get(const uint64_t networkId,nlohmann::json &network,const uint6
 			return false;
 			return false;
 		nw = nwi->second;
 		nw = nwi->second;
 	}
 	}
-
-	std::lock_guard<std::mutex> l2(nw->lock);
-	auto m = nw->members.find(memberId);
-	if (m == nw->members.end())
-		return false;
-	network = nw->config;
-	member = m->second;
-
+	{
+		std::lock_guard<std::mutex> l2(nw->lock);
+		network = nw->config;
+		auto m = nw->members.find(memberId);
+		if (m == nw->members.end())
+			return false;
+		member = m->second;
+	}
 	return true;
 	return true;
 }
 }
 
 
 bool RethinkDB::get(const uint64_t networkId,nlohmann::json &network,const uint64_t memberId,nlohmann::json &member,NetworkSummaryInfo &info)
 bool RethinkDB::get(const uint64_t networkId,nlohmann::json &network,const uint64_t memberId,nlohmann::json &member,NetworkSummaryInfo &info)
 {
 {
 	waitForReady();
 	waitForReady();
-
 	std::shared_ptr<_Network> nw;
 	std::shared_ptr<_Network> nw;
 	{
 	{
 		std::lock_guard<std::mutex> l(_networks_l);
 		std::lock_guard<std::mutex> l(_networks_l);
@@ -299,22 +310,21 @@ bool RethinkDB::get(const uint64_t networkId,nlohmann::json &network,const uint6
 			return false;
 			return false;
 		nw = nwi->second;
 		nw = nwi->second;
 	}
 	}
-
-	std::lock_guard<std::mutex> l2(nw->lock);
-	auto m = nw->members.find(memberId);
-	if (m == nw->members.end())
-		return false;
-	network = nw->config;
-	member = m->second;
-	_fillSummaryInfo(nw,info);
-
+	{
+		std::lock_guard<std::mutex> l2(nw->lock);
+		network = nw->config;
+		_fillSummaryInfo(nw,info);
+		auto m = nw->members.find(memberId);
+		if (m == nw->members.end())
+			return false;
+		member = m->second;
+	}
 	return true;
 	return true;
 }
 }
 
 
 bool RethinkDB::get(const uint64_t networkId,nlohmann::json &network,std::vector<nlohmann::json> &members)
 bool RethinkDB::get(const uint64_t networkId,nlohmann::json &network,std::vector<nlohmann::json> &members)
 {
 {
 	waitForReady();
 	waitForReady();
-
 	std::shared_ptr<_Network> nw;
 	std::shared_ptr<_Network> nw;
 	{
 	{
 		std::lock_guard<std::mutex> l(_networks_l);
 		std::lock_guard<std::mutex> l(_networks_l);
@@ -323,19 +333,18 @@ bool RethinkDB::get(const uint64_t networkId,nlohmann::json &network,std::vector
 			return false;
 			return false;
 		nw = nwi->second;
 		nw = nwi->second;
 	}
 	}
-
-	std::lock_guard<std::mutex> l2(nw->lock);
-	network = nw->config;
-	for(auto m=nw->members.begin();m!=nw->members.end();++m)
-		members.push_back(m->second);
-
+	{
+		std::lock_guard<std::mutex> l2(nw->lock);
+		network = nw->config;
+		for(auto m=nw->members.begin();m!=nw->members.end();++m)
+			members.push_back(m->second);
+	}
 	return true;
 	return true;
 }
 }
 
 
 bool RethinkDB::summary(const uint64_t networkId,NetworkSummaryInfo &info)
 bool RethinkDB::summary(const uint64_t networkId,NetworkSummaryInfo &info)
 {
 {
 	waitForReady();
 	waitForReady();
-
 	std::shared_ptr<_Network> nw;
 	std::shared_ptr<_Network> nw;
 	{
 	{
 		std::lock_guard<std::mutex> l(_networks_l);
 		std::lock_guard<std::mutex> l(_networks_l);
@@ -344,10 +353,10 @@ bool RethinkDB::summary(const uint64_t networkId,NetworkSummaryInfo &info)
 			return false;
 			return false;
 		nw = nwi->second;
 		nw = nwi->second;
 	}
 	}
-
-	std::lock_guard<std::mutex> l2(nw->lock);
-	_fillSummaryInfo(nw,info);
-
+	{
+		std::lock_guard<std::mutex> l2(nw->lock);
+		_fillSummaryInfo(nw,info);
+	}
 	return true;
 	return true;
 }
 }
 
 

+ 4 - 1
ext/librethinkdbxx/src/connection.cc

@@ -101,6 +101,8 @@ std::unique_ptr<Connection> connect(std::string host, int port, std::string auth
 Connection::Connection(ConnectionPrivate *dd) : d(dd) { }
 Connection::Connection(ConnectionPrivate *dd) : d(dd) { }
 Connection::~Connection() {
 Connection::~Connection() {
     // close();
     // close();
+    if (d->guarded_sockfd >= 0)
+        ::close(d->guarded_sockfd);
 }
 }
 
 
 size_t ReadLock::recv_some(char* buf, size_t size, double wait) {
 size_t ReadLock::recv_some(char* buf, size_t size, double wait) {
@@ -128,7 +130,7 @@ size_t ReadLock::recv_some(char* buf, size_t size, double wait) {
     }
     }
 
 
     ssize_t numbytes = ::recv(conn->guarded_sockfd, buf, size, 0);
     ssize_t numbytes = ::recv(conn->guarded_sockfd, buf, size, 0);
-    if (numbytes == -1) throw Error::from_errno("recv");
+    if (numbytes <= 0) throw Error::from_errno("recv");
     if (debug_net > 1) {
     if (debug_net > 1) {
         fprintf(stderr, "<< %s\n", write_datum(std::string(buf, numbytes)).c_str());
         fprintf(stderr, "<< %s\n", write_datum(std::string(buf, numbytes)).c_str());
     }
     }
@@ -190,6 +192,7 @@ void Connection::close() {
     if (ret == -1) {
     if (ret == -1) {
         throw Error::from_errno("close");
         throw Error::from_errno("close");
     }
     }
+    d->guarded_sockfd = -1;
 }
 }
 
 
 Response ConnectionPrivate::wait_for_response(uint64_t token_want, double wait) {
 Response ConnectionPrivate::wait_for_response(uint64_t token_want, double wait) {

+ 5 - 3
ext/librethinkdbxx/src/cursor.cc

@@ -21,9 +21,11 @@ CursorPrivate::CursorPrivate(uint64_t token_, Connection *conn_, Datum&& datum)
 Cursor::Cursor(CursorPrivate *dd) : d(dd) {}
 Cursor::Cursor(CursorPrivate *dd) : d(dd) {}
 
 
 Cursor::~Cursor() {
 Cursor::~Cursor() {
-    if (d && d->conn) {
-        close();
-    }
+    try {
+        if (d && d->conn) {
+            close();
+        }
+    } catch ( ... ) {}
 }
 }
 
 
 Datum& Cursor::next(double wait) const {
 Datum& Cursor::next(double wait) const {