소스 검색

Add support for pushing network config refresh hints from a MEMORY queue table. That ways it will be possible for network changes to take effect almost immediately across all active peers.

Adam Ierymenko 11 년 전
부모
커밋
7e7e28f5f7
4개의 변경된 파일79개의 추가작업 그리고 7개의 파일을 삭제
  1. 34 0
      netconf-service/netconf.cpp
  2. 28 0
      node/Node.cpp
  3. 1 3
      node/Packet.hpp
  4. 16 4
      node/PacketDecoder.cpp

+ 34 - 0
netconf-service/netconf.cpp

@@ -158,6 +158,40 @@ int main(int argc,char **argv)
 			return -1;
 		}
 
+		// Check QNetworkConfigRefresh (MEMORY table) and push network
+		// config refreshes to queued peer/network pairs.
+		try {
+			Dictionary to;
+			{
+				Query q = dbCon->query();
+				q << "SELECT LOWER(HEX(Node_id)) AS Node_id,LOWER(HEX(Network_id)) AS Network_id FROM QNetworkConfigRefresh";
+				StoreQueryResult rs = q.store();
+				for(unsigned long i=0;i<rs.num_rows();++i) {
+					std::string &nwids = to[rs[i]["Node_id"]];
+					if (nwids.length())
+						nwids.push_back(',');
+					nwids.append(rs[i]["Network_id"]);
+				}
+			}
+
+			{
+				Query q = dbCon->query();
+				q << "DELETE FROM QNetworkConfigRefresh";
+				q.exec();
+			}
+
+			Dictionary response;
+			response["type"] = "netconf-push";
+			response["to"] = to.toString();
+			std::string respm = response.toString();
+			uint32_t respml = (uint32_t)htonl((uint32_t)respm.length());
+
+			stdoutWriteLock.lock();
+			write(STDOUT_FILENO,&respml,4);
+			write(STDOUT_FILENO,respm.data(),respm.length());
+			stdoutWriteLock.unlock();
+		} catch ( ... ) {}
+
 		try {
 			const std::string &reqType = request.get("type");
 			if (reqType == "netconf-request") { // NETWORK_CONFIG_REQUEST packet

+ 28 - 0
node/Node.cpp

@@ -269,6 +269,34 @@ static void _netconfServiceMessageHandler(void *renv,Service &svc,const Dictiona
 					}
 				}
 			}
+		} else if (type == "netconf-push") {
+			if (msg.contains("to")) {
+				Dictionary to(msg.get("to")); // key: peer address, value: comma-delimited network list
+				for(Dictionary::iterator t(to.begin());t!=to.end();++t) {
+					Address ztaddr(t->first);
+					if (ztaddr) {
+						Packet outp(ztaddr,_r->identity.address(),Packet::VERB_NETWORK_CONFIG_REFRESH);
+
+						char *saveptr = (char *)0;
+						// Note: this loop trashes t->second, which is quasi-legal C++ but
+						// shouldn't break anything as long as we don't try to use 'to'
+						// for anything interesting after doing this.
+						for(char *p=Utils::stok(const_cast<char *>(t->second.c_str()),",",&saveptr);(p);p=Utils::stok((char *)0,",",&saveptr)) {
+							uint64_t nwid = Utils::hexStrToU64(p);
+							if (nwid) {
+								if ((outp.size() + sizeof(uint64_t)) >= ZT_UDP_DEFAULT_PAYLOAD_MTU) {
+									_r->sw->send(outp,true);
+									outp.reset(ztaddr,_r->identity.address(),Packet::VERB_NETWORK_CONFIG_REFRESH);
+								}
+								outp.append(nwid);
+							}
+						}
+
+						if (outp.payloadLength())
+							_r->sw->send(outp,true);
+					}
+				}
+			}
 		}
 	} catch (std::exception &exc) {
 		LOG("unexpected exception parsing response from netconf service: %s",exc.what());

+ 1 - 3
node/Packet.hpp

@@ -205,8 +205,6 @@
 #define ZT_PROTO_VERB_NETWORK_CONFIG_REQUEST_IDX_DICT_LEN (ZT_PROTO_VERB_NETWORK_CONFIG_REQUEST_IDX_NETWORK_ID + 8)
 #define ZT_PROTO_VERB_NETWORK_CONFIG_REQUEST_IDX_DICT (ZT_PROTO_VERB_NETWORK_CONFIG_REQUEST_IDX_DICT_LEN + 2)
 
-#define ZT_PROTO_VERB_NETWORK_CONFIG_REFRESH_IDX_NETWORK_ID (ZT_PACKET_IDX_PAYLOAD)
-
 #define ZT_PROTO_VERB_HELLO__OK__IDX_TIMESTAMP (ZT_PROTO_VERB_OK_IDX_PAYLOAD)
 #define ZT_PROTO_VERB_HELLO__OK__IDX_PROTOCOL_VERSION (ZT_PROTO_VERB_HELLO__OK__IDX_TIMESTAMP + 8)
 #define ZT_PROTO_VERB_HELLO__OK__IDX_MAJOR_VERSION (ZT_PROTO_VERB_HELLO__OK__IDX_PROTOCOL_VERSION + 1)
@@ -592,7 +590,7 @@ public:
 		VERB_NETWORK_CONFIG_REQUEST = 11,
 
 		/* Network configuration refresh request:
-		 *   <[8] 64-bit network ID>
+		 *   <[...] array of 64-bit network IDs>
 		 *
 		 * This message can be sent by the network configuration master node
 		 * to request that nodes refresh their network configuration. It can

+ 16 - 4
node/PacketDecoder.cpp

@@ -140,6 +140,10 @@ bool PacketDecoder::_doERROR(const RuntimeEnvironment *_r,const SharedPtr<Peer>
 				if (inReVerb == Packet::VERB_WHOIS) {
 					if (_r->topology->isSupernode(source()))
 						_r->sw->cancelWhoisRequest(Address(field(ZT_PROTO_VERB_ERROR_IDX_PAYLOAD,ZT_ADDRESS_LENGTH),ZT_ADDRESS_LENGTH));
+				} else if (inReVerb == Packet::VERB_NETWORK_CONFIG_REQUEST) {
+					SharedPtr<Network> network(_r->nc->network(at<uint64_t>(ZT_PROTO_VERB_ERROR_IDX_PAYLOAD)));
+					if ((network)&&(network->controller() == source()))
+						network->forceStatusTo(Network::NETWORK_NOT_FOUND);
 				}
 				break;
 			case Packet::ERROR_IDENTITY_COLLISION:
@@ -154,6 +158,11 @@ bool PacketDecoder::_doERROR(const RuntimeEnvironment *_r,const SharedPtr<Peer>
 				if (network)
 					network->pushMembershipCertificate(source(),true,Utils::now());
 			}	break;
+			case Packet::ERROR_NETWORK_ACCESS_DENIED: {
+				SharedPtr<Network> network(_r->nc->network(at<uint64_t>(ZT_PROTO_VERB_ERROR_IDX_PAYLOAD)));
+				if ((network)&&(network->controller() == source()))
+					network->forceStatusTo(Network::NETWORK_ACCESS_DENIED);
+			}	break;
 			default:
 				break;
 		}
@@ -732,10 +741,13 @@ bool PacketDecoder::_doNETWORK_CONFIG_REQUEST(const RuntimeEnvironment *_r,const
 bool PacketDecoder::_doNETWORK_CONFIG_REFRESH(const RuntimeEnvironment *_r,const SharedPtr<Peer> &peer)
 {
 	try {
-		uint64_t nwid = at<uint64_t>(ZT_PROTO_VERB_NETWORK_CONFIG_REFRESH_IDX_NETWORK_ID);
-		SharedPtr<Network> nw(_r->nc->network(nwid));
-		if ((nw)&&(source() == nw->controller())) // only respond to requests from controller
-			nw->requestConfiguration();
+		unsigned int ptr = ZT_PACKET_IDX_PAYLOAD;
+		while ((ptr + sizeof(uint64_t)) <= size()) {
+			uint64_t nwid = at<uint64_t>(ptr); ptr += sizeof(uint64_t);
+			SharedPtr<Network> nw(_r->nc->network(nwid));
+			if ((nw)&&(source() == nw->controller())) // only respond to requests from controller
+				nw->requestConfiguration();
+		}
 	} catch (std::exception &exc) {
 		TRACE("dropped NETWORK_CONFIG_REFRESH from %s(%s): unexpected exception: %s",source().toString().c_str(),_remoteAddress.toString().c_str(),exc.what());
 	} catch ( ... ) {