Browse Source

Netconf service itself works, time to integrate.

Adam Ierymenko 12 years ago
parent
commit
2a6b74746e
6 changed files with 107 additions and 51 deletions
  1. 3 2
      netconf-service/Makefile
  2. 17 0
      netconf-service/netconf-test.cpp
  3. 70 38
      netconf-service/netconf.cpp
  4. 2 2
      node/Identity.hpp
  5. 14 8
      node/Service.cpp
  6. 1 1
      node/Thread.cpp

+ 3 - 2
netconf-service/Makefile

@@ -1,6 +1,7 @@
 all:
 	gcc -O6 -c ../ext/lz4/lz4hc.c ../ext/lz4/lz4.c
-	g++ -DZT_OSNAME="linux" -DZT_ARCH="x86_64" -I/usr/include/mysql -I../ext/bin/libcrypto/include -O -o netconf.service netconf.cpp ../node/Utils.cpp ../node/Identity.cpp ../node/EllipticCurveKeyPair.cpp ../node/Salsa20.cpp ../node/HMAC.cpp lz4.o lz4hc.o ../ext/bin/libcrypto/linux-x86_64/libcrypto.a -lmysqlpp
+	g++ -DZT_OSNAME="linux" -DZT_ARCH="x86_64" -I/usr/include/mysql -I../ext/bin/libcrypto/include -O -pthread -o netconf.service netconf.cpp ../node/Utils.cpp ../node/Identity.cpp ../node/EllipticCurveKeyPair.cpp ../node/Salsa20.cpp ../node/HMAC.cpp lz4.o lz4hc.o ../ext/bin/libcrypto/linux-x86_64/libcrypto.a -lmysqlpp
+	g++ -DZT_OSNAME="linux" -DZT_ARCH="x86_64" -I/usr/include/mysql -I../ext/bin/libcrypto/include -O -pthread -o netconf-test netconf-test.cpp ../node/Utils.cpp ../node/Identity.cpp ../node/EllipticCurveKeyPair.cpp ../node/Salsa20.cpp ../node/HMAC.cpp ../node/Logger.cpp ../node/Service.cpp ../node/Thread.cpp lz4.o lz4hc.o ../ext/bin/libcrypto/linux-x86_64/libcrypto.a
 
 clean:
-	rm -f *.o netconf.service
+	rm -f *.o netconf.service netconf-test

+ 17 - 0
netconf-service/netconf-test.cpp

@@ -42,11 +42,13 @@
 #include "../node/Identity.hpp"
 #include "../node/RuntimeEnvironment.hpp"
 #include "../node/Logger.hpp"
+#include "../node/Thread.hpp"
 
 using namespace ZeroTier;
 
 static void svcHandler(void *arg,Service &svc,const Dictionary &msg)
 {
+	std::cout << msg.toString();
 }
 
 int main(int argc,char **argv)
@@ -59,8 +61,23 @@ int main(int argc,char **argv)
 
 	std::vector<Identity> population;
 	for(;;) {
+		Identity id;
 		if ((population.empty())||(rand() < (RAND_MAX / 4))) {
+			id.generate();
+			population.push_back(id);
+			std::cout << "Testing with new identity: " << id.address().toString() << std::endl;
 		} else {
+			id = population[rand() % population.size()];
+			Thread::sleep(1000);
+			std::cout << "Testing with existing identity: " << id.address().toString() << std::endl;
 		}
+
+		Dictionary request;
+		request["type"] = "netconf-request";
+		request["peerId"] = id.toString(false);
+		request["nwid"] = "6c92786fee000001";
+		request["requestId"] = "12345";
+
+		svc.send(request);
 	}
 }

+ 70 - 38
netconf-service/netconf.cpp

@@ -55,6 +55,7 @@
 #include <string.h>
 #include <stdint.h>
 #include <unistd.h>
+#include <errno.h>
 #include <sys/stat.h>
 #include <sys/types.h>
 #include <arpa/inet.h>
@@ -71,24 +72,34 @@
 #include "../node/Dictionary.hpp"
 #include "../node/Identity.hpp"
 #include "../node/Utils.hpp"
+#include "../node/Mutex.hpp"
 
 using namespace ZeroTier;
 using namespace mysqlpp;
 
+static Mutex stdoutWriteLock;
 static Connection *dbCon = (Connection *)0;
 static char mysqlHost[64],mysqlPort[64],mysqlDatabase[64],mysqlUser[64],mysqlPassword[64];
 
 static void connectOrReconnect()
 {
 	for(;;) {
-		if (dbCon)
-			delete dbCon;
-		dbCon = new Connection(mysqlDatabase,mysqlHost,mysqlUser,mysqlPassword,(unsigned int)strtol(mysqlPort,(char **)0,10));
-		if (dbCon->connected())
-			break;
-		else {
-			fprintf(stderr,"Unable to connect to database server.\n");
-			usleep(1000);
+		delete dbCon;
+		try {
+			dbCon = new Connection(mysqlDatabase,mysqlHost,mysqlUser,mysqlPassword,(unsigned int)strtol(mysqlPort,(char **)0,10));
+			if (dbCon->connected()) {
+				fprintf(stderr,"(re?)-connected to mysql server successfully\n");
+				break;
+			} else {
+				fprintf(stderr,"unable to connect to database server (connection closed), trying again in 1s...\n");
+				usleep(1000000);
+			}
+		} catch (std::exception &exc) {
+			fprintf(stderr,"unable to connect to database server (%s), trying again in 1s...\n",exc.what());
+			usleep(1000000);
+		} catch ( ... ) {
+			fprintf(stderr,"unable to connect to database server (unknown exception), trying again in 1s...\n");
+			usleep(1000000);
 		}
 	}
 }
@@ -98,7 +109,7 @@ int main(int argc,char **argv)
 	{
 		char *ee = getenv("ZT_NETCONF_MYSQL_HOST");
 		if (!ee) {
-			fprintf(stderr,"Missing environment variable: ZT_NETCONF_MYSQL_HOST\n");
+			fprintf(stderr,"missing environment variable: ZT_NETCONF_MYSQL_HOST\n");
 			return -1;
 		}
 		strcpy(mysqlHost,ee);
@@ -108,57 +119,66 @@ int main(int argc,char **argv)
 		else strcpy(mysqlPort,ee);
 		ee = getenv("ZT_NETCONF_MYSQL_DATABASE");
 		if (!ee) {
-			fprintf(stderr,"Missing environment variable: ZT_NETCONF_MYSQL_DATABASE\n");
+			fprintf(stderr,"missing environment variable: ZT_NETCONF_MYSQL_DATABASE\n");
 			return -1;
 		}
 		strcpy(mysqlDatabase,ee);
 		ee = getenv("ZT_NETCONF_MYSQL_USER");
 		if (!ee) {
-			fprintf(stderr,"Missing environment variable: ZT_NETCONF_MYSQL_USER\n");
+			fprintf(stderr,"missing environment variable: ZT_NETCONF_MYSQL_USER\n");
 			return -1;
 		}
 		strcpy(mysqlUser,ee);
 		ee = getenv("ZT_NETCONF_MYSQL_PASSWORD");
 		if (!ee) {
-			fprintf(stderr,"Missing environment variable: ZT_NETCONF_MYSQL_PASSWORD\n");
+			fprintf(stderr,"missing environment variable: ZT_NETCONF_MYSQL_PASSWORD\n");
 			return -1;
 		}
 		strcpy(mysqlPassword,ee);
 	}
 
-	char buf[4096];
+	char buf[131072];
 	std::string dictBuf;
 
 	connectOrReconnect();
 	for(;;) {
-		if (read(STDIN_FILENO,buf,4) != 4) {
-			fprintf(stderr,"Error reading frame size from stdin\n");
-			return -1;
+		for(int l=0;l<4;) {
+			int n = (int)read(STDIN_FILENO,buf + l,4 - l);
+			if (n < 0) {
+				fprintf(stderr,"error reading frame size from stdin: %s\n",strerror(errno));
+				return -1;
+			}
+			l += n;
 		}
 		unsigned int fsize = (unsigned int)ntohl(*((const uint32_t *)buf));
+
 		while (dictBuf.length() < fsize) {
 			int n = (int)read(STDIN_FILENO,buf,std::min((int)sizeof(buf),(int)(fsize - dictBuf.length())));
+			if (n < 0) {
+				fprintf(stderr,"error reading frame from stdin: %s\n",strerror(errno));
+				return -1;
+			}
 			for(int i=0;i<n;++i)
 				dictBuf.push_back(buf[i]);
 		}
-		Dictionary msg(dictBuf);
+		Dictionary request(dictBuf);
 		dictBuf = "";
 
 		if (!dbCon->connected())
 			connectOrReconnect();
 
 		try {
-			const std::string &command = msg.get("command");
-			if (command == "config") { // NETWORK_CONFIG_REQUEST packet
-				Identity peerIdentity(msg.get("peerIdentity"));
-				uint64_t nwid = strtoull(msg.get("nwid").c_str(),(char **)0,16);
+			const std::string &reqType = request.get("type");
+			if (reqType == "netconf-request") { // NETWORK_CONFIG_REQUEST packet
+				Identity peerIdentity(request.get("peerId"));
+				uint64_t nwid = strtoull(request.get("nwid").c_str(),(char **)0,16);
 				Dictionary meta;
-				if (msg.contains("meta"))
-					meta.fromString(msg.get("meta"));
+				if (request.contains("meta"))
+					meta.fromString(request.get("meta"));
 
 				// Do quick signature check / sanity check
 				if (!peerIdentity.locallyValidate(false)) {
-					fprintf(stderr,"identity failed signature check: %s",peerIdentity.toString(false).c_str());
+					fprintf(stderr,"identity failed signature check: %s\n",peerIdentity.toString(false).c_str());
 					continue;
 				}
 
@@ -176,16 +196,22 @@ int main(int argc,char **argv)
 						}
 					} else {
 						q = dbCon->query();
-						uint64_t now = Utils::now();
-						q << "INSERT INTO Node (id,creationTime,lastSeen,identity) VALUES (" << peerIdentity.address().toInt() << "," << now << "," << now << "," << peerIdentity.toString(false) << ")";
+						q << "INSERT INTO Node (id,creationTime,lastSeen,identity) VALUES (" << peerIdentity.address().toInt() << "," << Utils::now() << ",0," << quote << peerIdentity.toString(false) << ")";
 						if (!q.exec()) {
-							fprintf(stderr,"Error inserting Node row for peer %s, aborting netconf request",peerIdentity.address().toString().c_str());
+							fprintf(stderr,"error inserting Node row for peer %s, aborting netconf request\n",peerIdentity.address().toString().c_str());
 							continue;
 						}
 						// TODO: launch background validation
 					}
 				}
 
+				// Update lastSeen
+				{
+					Query q = dbCon->query();
+					q << "UPDATE Node SET lastSeen = " << Utils::now() << " WHERE id = " << peerIdentity.address().toInt();
+					q.exec();
+				}
+
 				bool isOpen = false;
 				{
 					Query q = dbCon->query();
@@ -278,20 +304,26 @@ int main(int argc,char **argv)
 				if (ipv6Static.length())
 					netconf["ipv6Static"] = ipv6Static;
 
-				Dictionary resp;
-				resp["peer"] = peerIdentity.address().toString();
-				resp["nwid"] = msg.get("nwid");
-				resp["requestId"] = msg.get("requestId");
-				resp["netconf"] = netconf.toString();
-				std::string respm = resp.toString();
-				uint32_t respml = (uint32_t)htonl((uint32_t)respm.length());
-				write(STDOUT_FILENO,&respml,4);
-				write(STDOUT_FILENO,respm.data(),respm.length());
+				{
+					Dictionary response;
+					response["peer"] = peerIdentity.address().toString();
+					response["nwid"] = request.get("nwid");
+					response["type"] = "netconf-response";
+					response["requestId"] = request.get("requestId");
+					response["netconf"] = netconf.toString();
+					std::string respm = response.toString();
+					uint32_t respml = (uint32_t)htonl((uint32_t)respm.length());
+
+					stdoutWriteLock.lock();
+					write(STDOUT_FILENO,&respml,4);
+					write(STDOUT_FILENO,respm.data(),respm.length());
+					stdoutWriteLock.unlock();
+				}
 			}
 		} catch (std::exception &exc) {
-			fprintf(stderr,"unexpected exception handling message: %s",exc.what());
+			fprintf(stderr,"unexpected exception handling message: %s\n",exc.what());
 		} catch ( ... ) {
-			fprintf(stderr,"unexpected exception handling message: unknown exception");
+			fprintf(stderr,"unexpected exception handling message: unknown exception\n");
 		}
 	}
 }

+ 2 - 2
node/Identity.hpp

@@ -104,7 +104,7 @@ public:
 		_keyPair((EllipticCurveKeyPair *)0)
 	{
 		if (!fromString(str))
-			throw std::invalid_argument("invalid string-serialized identity");
+			throw std::invalid_argument(std::string("invalid string-serialized identity: ") + str);
 	}
 
 	Identity(const std::string &str)
@@ -112,7 +112,7 @@ public:
 		_keyPair((EllipticCurveKeyPair *)0)
 	{
 		if (!fromString(str))
-			throw std::invalid_argument("invalid string-serialized identity");
+			throw std::invalid_argument(std::string("invalid string-serialized identity: ") + str);
 	}
 
 	template<unsigned int C>

+ 14 - 8
node/Service.cpp

@@ -36,6 +36,7 @@
 #include <signal.h>
 #include <time.h>
 #include <fcntl.h>
+#include <errno.h>
 #include <sys/time.h>
 #include <sys/types.h>
 #include <sys/stat.h>
@@ -109,7 +110,7 @@ bool Service::send(const Dictionary &msg)
 void Service::main()
 	throw()
 {
-	char buf[4096];
+	char buf[131072];
 	fd_set readfds,writefds,exceptfds;
 	struct timeval tv;
 
@@ -126,27 +127,30 @@ void Service::main()
 			pipe(out);
 			pipe(err);
 
-			long pid = fork();
+			long pid = vfork();
 			if (pid < 0) {
 				LOG("service %s terminating: could not fork!",_name.c_str());
 				return;
 			} else if (pid) {
-				close(in[1]);
-				close(out[0]);
-				close(err[0]);
+				// Parent
+				close(in[0]);
+				close(out[1]);
+				close(err[1]);
 				Thread::sleep(500); // give child time to start
 				_childStdin = in[1];
 				_childStdout = out[0];
 				_childStderr = err[0];
 				fcntl(_childStdout,F_SETFL,O_NONBLOCK);
 				fcntl(_childStderr,F_SETFL,O_NONBLOCK);
+				_pid = pid;
 			} else {
-				dup2(in[0],STDIN_FILENO);
-				dup2(out[1],STDOUT_FILENO);
-				dup2(err[1],STDERR_FILENO);
+				// Child
 				close(in[1]);
 				close(out[0]);
 				close(err[0]);
+				dup2(in[0],STDIN_FILENO);
+				dup2(out[1],STDOUT_FILENO);
+				dup2(err[1],STDERR_FILENO);
 				execl(_path.c_str(),_path.c_str(),_r->homePath.c_str(),(const char *)0);
 				exit(-1);
 			}
@@ -169,6 +173,8 @@ void Service::main()
 			}
 		}
 
+		// If we've made it here, _pid is running last we checked.
+
 		FD_ZERO(&readfds);
 		FD_ZERO(&writefds);
 		FD_ZERO(&exceptfds);

+ 1 - 1
node/Thread.cpp

@@ -75,7 +75,7 @@ void Thread::join()
 
 void Thread::sleep(unsigned long ms)
 {
-	usleep(ms);
+	usleep(ms * 1000);
 }
 
 void Thread::__intl_run()