Browse Source

RPC infrastructure work in progress.

Adam Ierymenko 12 years ago
parent
commit
af8fcac0fc
7 changed files with 179 additions and 13 deletions
  1. 1 1
      Makefile.linux
  2. 1 1
      Makefile.mac
  3. 5 0
      node/Constants.hpp
  4. 1 1
      node/Packet.hpp
  5. 159 0
      node/RPC.cpp
  6. 11 10
      node/RPC.hpp
  7. 1 0
      objects.mk

+ 1 - 1
Makefile.linux

@@ -20,7 +20,7 @@ CXXFLAGS=$(CFLAGS) -fno-rtti
 # separate binaries for the RedHat and Debian universes to distribute via
 # auto-update. This way we get one Linux binary for all systems of a given
 # architecture.
-LIBS=ext/bin/libcrypto/linux-$(ARCH)/libcrypto.a
+LIBS=ext/bin/libcrypto/linux-$(ARCH)/libcrypto.a -ldl
 
 include objects.mk
 

+ 1 - 1
Makefile.mac

@@ -13,7 +13,7 @@ STRIP=strip
 #STRIP=echo
 
 CXXFLAGS=$(CFLAGS) -fno-rtti
-LIBS=-lcrypto
+LIBS=-lcrypto -ldl
 
 include objects.mk
 

+ 5 - 0
node/Constants.hpp

@@ -324,4 +324,9 @@ error_no_ZT_ARCH_defined;
  */
 #define ZT_RENDEZVOUS_NAT_T_DELAY 500
 
+/**
+ * Timeout for remote RPC calls
+ */
+#define ZT_RPC_TIMEOUT 10000
+
 #endif

+ 1 - 1
node/Packet.hpp

@@ -466,7 +466,7 @@ public:
 		VERB_MULTICAST_FRAME = 9,
 
 		/* Call a plugin via RPC:
-		 *   <[1] length of function name>
+		 *   <[2] length of function name>
 		 *   <[...] function name>
 		 *   [<[2] length of argument>]
 		 *   [<[...] argument>]

+ 159 - 0
node/RPC.cpp

@@ -25,7 +25,166 @@
  * LLC. Start here: http://www.zerotier.com/
  */
 
+#ifndef __WINDOWS__
+#include <dlfcn.h>
+#endif
+
+#include "Utils.hpp"
 #include "RuntimeEnvironment.hpp"
 #include "RPC.hpp"
 #include "Switch.hpp"
 #include "Topology.hpp"
+
+namespace ZeroTier {
+
+RPC::LocalService::LocalService(const char *dllPath)
+	throw(std::invalid_argument) :
+	_handle((void *)0),
+	_init((void *)0),
+	_do((void *)0),
+	_free((void *)0),
+	_destroy((void *)0)
+{
+	_handle = dlopen(dllPath,RTLD_LAZY|RTLD_LOCAL);
+	if (!_handle)
+		throw std::invalid_argument("Unable to load DLL: dlopen() failed");
+
+	_init = dlsym(_handle,"ZeroTierPluginInit");
+	if (!_init) {
+		dlclose(_handle);
+		throw std::invalid_argument("Unable to resolve symbol ZeroTierPluginInit in DLL");
+	}
+	_do = dlsym(_handle,"ZeroTierPluginDo");
+	if (!_do) {
+		dlclose(_handle);
+		throw std::invalid_argument("Unable to resolve symbol ZeroTierPluginDo in DLL");
+	}
+	_free = dlsym(_handle,"ZeroTierPluginFree");
+	if (!_free) {
+		dlclose(_handle);
+		throw std::invalid_argument("Unable to resolve symbol ZeroTierPluginFree in DLL");
+	}
+	_destroy = dlsym(_handle,"ZeroTierPluginDestroy");
+	if (!_destroy) {
+		dlclose(_handle);
+		throw std::invalid_argument("Unable to resolve symbol ZeroTierPluginDestroy in DLL");
+	}
+
+	if (((int (*)())_init)() < 0) {
+		dlclose(_handle);
+		throw std::invalid_argument("ZeroTierPluginInit() returned error");
+	}
+}
+
+RPC::LocalService::~LocalService()
+{
+	if (_handle) {
+		if (_destroy)
+			((void (*)())_destroy)();
+		dlclose(_handle);
+	}
+}
+
+std::pair< int,std::vector<std::string> > RPC::LocalService::operator()(const std::vector<std::string> &args)
+{
+	unsigned int alengths[4096];
+	const void *argptrs[4096];
+	const unsigned int *rlengths = (const unsigned int *)0;
+	const void **resultptrs = (const void **)0;
+	std::vector<std::string> results;
+
+	if (args.size() > 4096)
+		throw std::runtime_error("args[] too long");
+
+	for(unsigned int i=0;i<args.size();++i) {
+		alengths[i] = args[i].length();
+		argptrs[i] = (const void *)args[i].data();
+	}
+
+	int rcount = ((int (*)(unsigned int,const unsigned int *,const void **,const unsigned int **,const void ***))_do)((unsigned int)args.size(),alengths,argptrs,&rlengths,&resultptrs);
+
+	for(int i=0;i<rcount;++i)
+		results.push_back(std::string((const char *)resultptrs[i],rlengths[i]));
+
+	((void (*)(int,const unsigned int *,const void **))_free)(rcount,rlengths,resultptrs);
+
+	return std::pair< int,std::vector<std::string> >(rcount,results);
+}
+
+RPC::RPC(const RuntimeEnvironment *renv) :
+	_r(renv)
+{
+}
+
+RPC::~RPC()
+{
+	for(std::map<uint64_t,RemoteCallOutstanding>::iterator co(_remoteCallsOutstanding.begin());co!=_remoteCallsOutstanding.end();++co) {
+		if (co->second.handler)
+			co->second.handler(co->second.arg,co->first,co->second.peer,ZT_RPC_ERROR_CANCELLED,std::vector<std::string>());
+	}
+
+	for(std::map<std::string,LocalService *>::iterator s(_rpcServices.begin());s!=_rpcServices.end();++s)
+		delete s->second;
+}
+
+std::pair< int,std::vector<std::string> > RPC::callLocal(const std::string &name,const std::vector<std::string> &args)
+{
+	Mutex::Lock _l(_rpcServices_m);
+	std::map<std::string,LocalService *>::iterator s(_rpcServices.find(name));
+	if (s == _rpcServices.end())
+		return std::pair< int,std::vector<std::string> >(ZT_RPC_ERROR_NOT_FOUND,std::vector<std::string>());
+	return ((*(s->second))(args));
+}
+
+uint64_t RPC::callRemote(
+	const Address &peer,
+	const std::string &name,
+	const std::vector<std::string> &args,
+	void (*handler)(void *,uint64_t,const Address &,int,const std::vector<std::string> &),
+	void *arg)
+	throw(std::invalid_argument,std::out_of_range)
+{
+	Packet outp(peer,_r->identity.address(),Packet::VERB_RPC);
+
+	if (name.length() > 0xffff)
+		throw std::invalid_argument("function name too long");
+	outp.append((uint16_t)name.length());
+	outp.append(name);
+	for(std::vector<std::string>::const_iterator a(args.begin());a!=args.end();++a) {
+		if (a->length() > 0xffff)
+			throw std::invalid_argument("argument too long");
+		outp.append((uint16_t)a->length());
+		outp.append(*a);
+	}
+	outp.compress();
+
+	uint64_t id = outp.packetId();
+
+	{
+		Mutex::Lock _l(_remoteCallsOutstanding_m);
+		RemoteCallOutstanding &rc = _remoteCallsOutstanding[id];
+		rc.callTime = Utils::now();
+		rc.peer = peer;
+		rc.handler = handler;
+		rc.arg = arg;
+	}
+
+	_r->sw->send(outp,true);
+
+	return id;
+}
+
+void RPC::clean()
+{
+	Mutex::Lock _l(_remoteCallsOutstanding_m);
+	uint64_t now = Utils::now();
+	for(std::map<uint64_t,RemoteCallOutstanding>::iterator co(_remoteCallsOutstanding.begin());co!=_remoteCallsOutstanding.end();) {
+		if ((now - co->second.callTime) >= ZT_RPC_TIMEOUT) {
+			if (co->second.handler)
+				co->second.handler(co->second.arg,co->first,co->second.peer,ZT_RPC_ERROR_EXPIRED_NO_RESPONSE,std::vector<std::string>());
+			_remoteCallsOutstanding.erase(co++);
+		} else ++co;
+	}
+}
+
+} // namespace ZeroTier

+ 11 - 10
node/RPC.hpp

@@ -35,6 +35,7 @@
 #include <vector>
 #include <utility>
 
+#include "Constants.hpp"
 #include "NonCopyable.hpp"
 #include "Mutex.hpp"
 #include "Address.hpp"
@@ -69,13 +70,13 @@ class RuntimeEnvironment;
 class RPC : NonCopyable
 {
 public:
-#ifndef _WIN32
+#ifndef __WINDOWS__
 	/**
 	 * A local service accessible by RPC, non-Windows only for now
 	 *
 	 * Each service DLL must export these functions:
 	 *
-	 * void ZeroTierPluginInit();
+	 * int ZeroTierPluginInit();
 	 * int ZeroTierPluginDo(unsigned int,const unsigned int *,const void **,const unsigned int **,const void ***);
 	 * void ZeroTierPluginFree(int,const unsigned int *,const void **);
 	 * void ZeroTierPluginDestroy();
@@ -115,11 +116,14 @@ public:
 		 * @return Results from DLL
 		 * @throws std::runtime_error Error calling DLL
 		 */
-		std::vector<std::string> operator()(const std::vector<std::string> &args)
-			throw(std::runtime_error);
+		std::pair< int,std::vector<std::string> > operator()(const std::vector<std::string> &args);
 
 	private:
-		void *_dlhandle;
+		void *_handle;
+		void *_init;
+		void *_do;
+		void *_free;
+		void *_destroy;
 	};
 #endif
 
@@ -133,8 +137,7 @@ public:
 	 * @param args Arguments to method
 	 * @return Return value of method, and results (negative first item and empty vector means error)
 	 */
-	std::pair< int,std::vector<std::string> > callLocal(const std::string &name,const std::vector<std::string> &args)
-		throw();
+	std::pair< int,std::vector<std::string> > callLocal(const std::string &name,const std::vector<std::string> &args);
 
 	/**
 	 * Call a remote service
@@ -152,7 +155,7 @@ public:
 		const std::vector<std::string> &args,
 		void (*handler)(void *,uint64_t,const Address &,int,const std::vector<std::string> &),
 		void *arg)
-		throw(std::invalid_argument);
+		throw(std::invalid_argument,std::out_of_range);
 
 	/**
 	 * Periodically called to clean up, such as by expiring remote calls
@@ -169,8 +172,6 @@ private:
 	{
 		uint64_t callTime;
 		Address peer;
-		std::string name;
-		std::vector<std::string> &args;
 		void (*handler)(void *,uint64_t,const Address &,int,const std::vector<std::string> &);
 		void *arg;
 	};

+ 1 - 0
objects.mk

@@ -21,6 +21,7 @@ OBJS=\
 	node/PacketDecoder.o \
 	node/Pack.o \
 	node/Peer.o \
+	node/RPC.o \
 	node/Salsa20.o \
 	node/Switch.o \
 	node/SysEnv.o \