Browse Source

Helps to use a proper multithreaded queue instead of ugly plebian hackery.

Adam Ierymenko 10 years ago
parent
commit
f873881a0d

+ 7 - 8
testnet.cpp

@@ -540,27 +540,26 @@ static void doUnicast(const std::vector<std::string> &cmd)
 	printf("---------- waiting up to %llu seconds..."ZT_EOL_S,tout / 1000ULL);
 	printf("---------- waiting up to %llu seconds..."ZT_EOL_S,tout / 1000ULL);
 
 
 	std::set< std::pair<Address,Address> > receivedPairs;
 	std::set< std::pair<Address,Address> > receivedPairs;
-	std::vector<TestEthernetTap::TestFrame> frames;
+	TestEthernetTap::TestFrame frame;
 	uint64_t toutend = Utils::now() + tout;
 	uint64_t toutend = Utils::now() + tout;
 	do {
 	do {
 		for(std::vector<Address>::iterator r(receivers.begin());r!=receivers.end();++r) {
 		for(std::vector<Address>::iterator r(receivers.begin());r!=receivers.end();++r) {
 			SimNode *receiver = nodes[*r];
 			SimNode *receiver = nodes[*r];
 			SharedPtr<TestEthernetTap> rtap(receiver->tapFactory.getByNwid(nwid));
 			SharedPtr<TestEthernetTap> rtap(receiver->tapFactory.getByNwid(nwid));
 			if (rtap) {
 			if (rtap) {
-				rtap->get(frames);
-				for(std::vector<TestEthernetTap::TestFrame>::iterator f(frames.begin());f!=frames.end();++f) {
-					if ((f->len == frameLen)&&(!memcmp(f->data + 16,pkt.data + 16,frameLen - 16))) {
+				while (rtap->getNextReceivedFrame(frame,1)) {
+					if ((frame.len == frameLen)&&(!memcmp(frame.data + 16,pkt.data + 16,frameLen - 16))) {
 						uint64_t ints[2];
 						uint64_t ints[2];
-						memcpy(ints,f->data,16);
-						printf("%s <- %.10llx received test packet, latency == %llums"ZT_EOL_S,r->toString().c_str(),ints[0],f->timestamp - ints[1]);
+						memcpy(ints,frame.data,16);
+						printf("%s <- %.10llx received test packet, latency == %llums"ZT_EOL_S,r->toString().c_str(),ints[0],frame.timestamp - ints[1]);
 						receivedPairs.insert(std::pair<Address,Address>(Address(ints[0]),*r));
 						receivedPairs.insert(std::pair<Address,Address>(Address(ints[0]),*r));
 					} else {
 					} else {
-						printf("%s !! got spurious packet, length == %u, etherType == %.4x"ZT_EOL_S,r->toString().c_str(),f->len,f->etherType);
+						printf("%s !! got spurious packet, length == %u, etherType == 0x%.4x"ZT_EOL_S,r->toString().c_str(),frame.len,frame.etherType);
 					}
 					}
 				}
 				}
 			}
 			}
 		}
 		}
-		Thread::sleep(250);
+		Thread::sleep(50);
 	} while ((receivedPairs.size() < sentPairs.size())&&(Utils::now() < toutend));
 	} while ((receivedPairs.size() < sentPairs.size())&&(Utils::now() < toutend));
 
 
 	for(std::vector<Address>::iterator s(senders.begin());s!=senders.end();++s) {
 	for(std::vector<Address>::iterator s(senders.begin());s!=senders.end();++s) {

+ 181 - 0
testnet/MTQ.hpp

@@ -0,0 +1,181 @@
+/*
+ * ZeroTier One - Global Peer to Peer Ethernet
+ * Copyright (C) 2012-2014  ZeroTier Networks LLC
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program.  If not, see <http://www.gnu.org/licenses/>.
+ *
+ * --
+ *
+ * ZeroTier may be used and distributed under the terms of the GPLv3, which
+ * are available at: http://www.gnu.org/licenses/gpl-3.0.html
+ *
+ * If you would like to embed ZeroTier into a commercial application or
+ * redistribute it in a modified binary form, please contact ZeroTier Networks
+ * LLC. Start here: http://www.zerotier.com/
+ */
+
+#ifndef ZT_MTQ_HPP
+#define ZT_MTQ_HPP
+
+#include <stdlib.h>
+#include <stdint.h>
+
+#include <queue>
+
+#include "../node/Constants.hpp"
+#include "../node/NonCopyable.hpp"
+#include "../node/Utils.hpp"
+
+#ifdef __WINDOWS__
+#include <Windows.h>
+#else
+#include <time.h>
+#include <pthread.h>
+#endif
+
+namespace ZeroTier {
+
+/**
+ * A synchronized multithreaded FIFO queue
+ *
+ * This is designed for a use case where one thread pushes, the
+ * other pops.
+ */
+template<typename T>
+class MTQ : NonCopyable
+{
+public:
+	MTQ()
+	{
+#ifdef __WINDOWS__
+		_sem = CreateSemaphore(NULL,0,0x7fffffff,NULL);
+		InitializeCriticalSection(&_cs);
+#else
+		pthread_mutex_init(&_mh,(const pthread_mutexattr_t *)0);
+		pthread_cond_init(&_cond,(const pthread_condattr_t *)0);
+#endif
+	}
+
+	~MTQ()
+	{
+#ifdef __WINDOWS__
+		CloseHandle(_sem);
+		DeleteCriticalSection(&_cs);
+#else
+		pthread_cond_destroy(&_cond);
+		pthread_mutex_destroy(&_mh);
+#endif
+	}
+
+	/**
+	 * Push something onto the end of the FIFO and signal waiting thread(s)
+	 *
+	 * @param v Value to push
+	 */
+	inline void push(const T &v)
+	{
+#ifdef __WINDOWS__
+		EnterCriticalSection(&_cs);
+		try {
+			_q.push(v);
+			LeaveCriticalSection(&_cs);
+			ReleaseSemaphore(_sem,1,NULL);
+		} catch ( ... ) {
+			LeaveCriticalSection(&_cs);
+			throw;
+		}
+#else
+		pthread_mutex_lock(const_cast <pthread_mutex_t *>(&_mh));
+		try {
+			_q.push(v);
+			pthread_mutex_unlock(const_cast <pthread_mutex_t *>(&_mh));
+			pthread_cond_signal(const_cast <pthread_cond_t *>(&_cond));
+		} catch ( ... ) {
+			pthread_mutex_unlock(const_cast <pthread_mutex_t *>(&_mh));
+			throw;
+		}
+#endif
+	}
+
+	/**
+	 * Pop fron queue with optional timeout
+	 *
+	 * @param v Result parameter to set to next value
+	 * @param ms Milliseconds timeout or 0 for none
+	 * @return True if v was set to something, false on timeout
+	 */
+	inline bool pop(T &v,unsigned long ms = 0)
+	{
+#ifdef __WINDOWS__
+		if (ms > 0)
+			WaitForSingleObject(_sem,(DWORD)ms);
+		else WaitForSingleObject(_sem,INFINITE);
+		EnterCriticalSection(&_cs);
+		try {
+			if (_q.empty()) {
+				LeaveCriticalSection(&_cs);
+				return false;
+			} else {
+				v = _q.front();
+				_q.pop();
+				LeaveCriticalSection(&_cs);
+				return true;
+			}
+		} catch ( ... ) {
+			LeaveCriticalSection(&_cs);
+			throw;
+		}
+#else
+		pthread_mutex_lock(const_cast <pthread_mutex_t *>(&_mh));
+		try {
+			if (_q.empty()) {
+				if (ms > 0) {
+					uint64_t when = Utils::now() + (uint64_t)ms;
+					struct timespec ts;
+					ts.tv_sec = (unsigned long)(when / 1000);
+					ts.tv_nsec = (unsigned long)(when % 1000) * (unsigned long)1000000;
+					pthread_cond_timedwait(const_cast <pthread_cond_t *>(&_cond),const_cast <pthread_mutex_t *>(&_mh),&ts);
+				} else {
+					pthread_cond_wait(const_cast <pthread_cond_t *>(&_cond),const_cast <pthread_mutex_t *>(&_mh));
+				}
+				if (_q.empty()) {
+					pthread_mutex_unlock(const_cast <pthread_mutex_t *>(&_mh));
+					return false;
+				}
+			}
+			v = _q.front();
+			_q.pop();
+			pthread_mutex_unlock(const_cast <pthread_mutex_t *>(&_mh));
+			return true;
+		} catch ( ... ) {
+			pthread_mutex_unlock(const_cast <pthread_mutex_t *>(&_mh));
+			throw;
+		}
+#endif
+	}
+
+private:
+	std::queue<T> _q;
+#ifdef __WINDOWS__
+	HANDLE _sem;
+	CRITICAL_SECTION _cs;
+#else
+	pthread_cond_t _cond;
+	pthread_mutex_t _mh;
+#endif
+};
+
+} // namespace ZeroTier
+
+#endif

+ 0 - 142
testnet/Semaphore.hpp

@@ -1,142 +0,0 @@
-/*
- * ZeroTier One - Global Peer to Peer Ethernet
- * Copyright (C) 2012-2014  ZeroTier Networks LLC
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the GNU General Public License as published by
- * the Free Software Foundation, either version 3 of the License, or
- * (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
- * GNU General Public License for more details.
- *
- * You should have received a copy of the GNU General Public License
- * along with this program.  If not, see <http://www.gnu.org/licenses/>.
- *
- * --
- *
- * ZeroTier may be used and distributed under the terms of the GPLv3, which
- * are available at: http://www.gnu.org/licenses/gpl-3.0.html
- *
- * If you would like to embed ZeroTier into a commercial application or
- * redistribute it in a modified binary form, please contact ZeroTier Networks
- * LLC. Start here: http://www.zerotier.com/
- */
-
-#ifndef ZT_SEMAPHORE_HPP
-#define ZT_SEMAPHORE_HPP
-
-#include "../node/Constants.hpp"
-#include "../node/NonCopyable.hpp"
-
-#ifdef __WINDOWS__
-
-#include <Windows.h>
-#include <stdlib.h>
-
-namespace ZeroTier {
-
-class Semaphore : NonCopyable
-{
-public:
-	Semaphore() throw() { _sem = CreateSemaphore(NULL,0,0x7fffffff,NULL); }
-	~Semaphore() { CloseHandle(_sem); }
-
-	inline void wait(unsigned long ms = 0) const
-		throw()
-	{
-		if (ms > 0)
-			WaitForSingleObject(_sem,(DWORD)ms);
-		else WaitForSingleObject(_sem,INFINITE);
-	}
-
-	inline void signal() const
-		throw()
-	{
-		ReleaseSemaphore(_sem,1,NULL);
-	}
-
-private:
-	HANDLE _sem;
-};
-
-} // namespace ZeroTier
-
-#else // !__WINDOWS__
-
-#include <time.h>
-#include <stdlib.h>
-#include <pthread.h>
-
-#include "../node/Utils.hpp"
-
-namespace ZeroTier {
-
-// This isn't quite a perfect semaphore, but the way we use it it's fine... we
-// just want this to signal when queues are ready.
-class Semaphore : NonCopyable
-{
-public:
-	Semaphore()
-		throw()
-	{
-		pthread_mutex_init(&_mh,(const pthread_mutexattr_t *)0);
-		pthread_cond_init(&_cond,(const pthread_condattr_t *)0);
-		_cnt = 0;
-	}
-
-	~Semaphore()
-	{
-		pthread_cond_destroy(&_cond);
-		pthread_mutex_destroy(&_mh);
-	}
-
-	inline void wait()
-		throw()
-	{
-		pthread_mutex_lock(const_cast <pthread_mutex_t *>(&_mh));
-		if (_cnt <= 0)
-			pthread_cond_wait(const_cast <pthread_cond_t *>(&_cond),const_cast <pthread_mutex_t *>(&_mh));
-		if (_cnt > 0)
-			--_cnt;
-		pthread_mutex_unlock(const_cast <pthread_mutex_t *>(&_mh));
-	}
-
-	inline void wait(unsigned long ms)
-		throw()
-	{
-		uint64_t when = Utils::now() + (uint64_t)ms;
-		struct timespec ts;
-		ts.tv_sec = (unsigned long)(when / 1000);
-		ts.tv_nsec = (unsigned long)(when % 1000) * 1000000;
-
-		pthread_mutex_lock(const_cast <pthread_mutex_t *>(&_mh));
-		if (_cnt <= 0)
-			pthread_cond_timedwait(const_cast <pthread_cond_t *>(&_cond),const_cast <pthread_mutex_t *>(&_mh),&ts);
-		if (_cnt > 0)
-			--_cnt;
-		pthread_mutex_unlock(const_cast <pthread_mutex_t *>(&_mh));
-	}
-
-	inline void signal()
-		throw()
-	{
-		pthread_mutex_lock(const_cast <pthread_mutex_t *>(&_mh));
-		++_cnt;
-		pthread_mutex_unlock(const_cast <pthread_mutex_t *>(&_mh));
-		pthread_cond_signal(const_cast <pthread_cond_t *>(&_cond));
-	}
-
-private:
-	pthread_cond_t _cond;
-	pthread_mutex_t _mh;
-	volatile int _cnt;
-};
-
-} // namespace ZeroTier
-
-#endif // !__WINDOWS__
-
-#endif

+ 4 - 22
testnet/SimNetSocketManager.cpp

@@ -73,32 +73,14 @@ bool SimNetSocketManager::send(const InetAddress &to,bool tcp,bool autoConnectTc
 
 
 void SimNetSocketManager::poll(unsigned long timeout,void (*handler)(const SharedPtr<Socket> &,void *,const InetAddress &,Buffer<ZT_SOCKET_MAX_MESSAGE_LEN> &),void *arg)
 void SimNetSocketManager::poll(unsigned long timeout,void (*handler)(const SharedPtr<Socket> &,void *,const InetAddress &,Buffer<ZT_SOCKET_MAX_MESSAGE_LEN> &),void *arg)
 {
 {
-	std::vector< std::pair< InetAddress,Buffer<ZT_SOCKET_MAX_MESSAGE_LEN> > > inb;
-
-	{
-		Mutex::Lock _l(_inbox_m);
-		inb = _inbox;
-		_inbox.clear();
-	}
-	for(std::vector< std::pair< InetAddress,Buffer<ZT_SOCKET_MAX_MESSAGE_LEN> > >::iterator i(inb.begin());i!=inb.end();++i)
-		handler(_mySocket,arg,i->first,i->second);
-
-	if (timeout)
-		_waitCond.wait(timeout);
-	else _waitCond.wait();
-
-	{
-		Mutex::Lock _l(_inbox_m);
-		inb = _inbox;
-		_inbox.clear();
-	}
-	for(std::vector< std::pair< InetAddress,Buffer<ZT_SOCKET_MAX_MESSAGE_LEN> > >::iterator i(inb.begin());i!=inb.end();++i)
-		handler(_mySocket,arg,i->first,i->second);
+	std::pair< InetAddress,Buffer<ZT_SOCKET_MAX_MESSAGE_LEN> > msg;
+	if ((_inbox.pop(msg,timeout))&&(msg.second.size()))
+		handler(_mySocket,arg,msg.first,msg.second);
 }
 }
 
 
 void SimNetSocketManager::whack()
 void SimNetSocketManager::whack()
 {
 {
-	_waitCond.signal();
+	_inbox.push(std::pair< InetAddress,Buffer<ZT_SOCKET_MAX_MESSAGE_LEN> >());
 }
 }
 
 
 void SimNetSocketManager::closeTcpSockets()
 void SimNetSocketManager::closeTcpSockets()

+ 4 - 8
testnet/SimNetSocketManager.hpp

@@ -35,7 +35,8 @@
 #include "../node/Constants.hpp"
 #include "../node/Constants.hpp"
 #include "../node/SocketManager.hpp"
 #include "../node/SocketManager.hpp"
 #include "../node/Mutex.hpp"
 #include "../node/Mutex.hpp"
-#include "Semaphore.hpp"
+
+#include "MTQ.hpp"
 
 
 namespace ZeroTier {
 namespace ZeroTier {
 
 
@@ -96,9 +97,7 @@ public:
 	 */
 	 */
 	inline void enqueue(const InetAddress &from,const void *data,unsigned int len)
 	inline void enqueue(const InetAddress &from,const void *data,unsigned int len)
 	{
 	{
-		Mutex::Lock _l(_inbox_m);
-		_inbox.push_back(std::pair< InetAddress,Buffer<ZT_SOCKET_MAX_MESSAGE_LEN> >(from,Buffer<ZT_SOCKET_MAX_MESSAGE_LEN>(data,len)));
-		_waitCond.signal();
+		_inbox.push(std::pair< InetAddress,Buffer<ZT_SOCKET_MAX_MESSAGE_LEN> >(from,Buffer<ZT_SOCKET_MAX_MESSAGE_LEN>(data,len)));
 	}
 	}
 
 
 	virtual bool send(const InetAddress &to,bool tcp,bool autoConnectTcp,const void *msg,unsigned int msglen);
 	virtual bool send(const InetAddress &to,bool tcp,bool autoConnectTcp,const void *msg,unsigned int msglen);
@@ -114,13 +113,10 @@ private:
 	SharedPtr<Socket> _mySocket;
 	SharedPtr<Socket> _mySocket;
 	TransferStats _totals;
 	TransferStats _totals;
 
 
-	std::vector< std::pair< InetAddress,Buffer<ZT_SOCKET_MAX_MESSAGE_LEN> > > _inbox;
-	Mutex _inbox_m;
+	MTQ< std::pair< InetAddress,Buffer<ZT_SOCKET_MAX_MESSAGE_LEN> > > _inbox;
 
 
 	std::map< InetAddress,TransferStats > _stats;
 	std::map< InetAddress,TransferStats > _stats;
 	Mutex _stats_m;
 	Mutex _stats_m;
-
-	Semaphore _waitCond;
 };
 };
 
 
 } // namespace ZeroTier
 } // namespace ZeroTier

+ 10 - 31
testnet/TestEthernetTap.cpp

@@ -77,12 +77,8 @@ TestEthernetTap::TestEthernetTap(
 
 
 TestEthernetTap::~TestEthernetTap()
 TestEthernetTap::~TestEthernetTap()
 {
 {
-	static const TestFrame zf;
-	{
-		Mutex::Lock _l(_pq_m);
-		_pq.push_back(zf); // 0 length frame = exit
-		_pq_c.signal();
-	}
+	static const TestFrame zf; // use a static empty frame because of weirdo G++ warning bug...
+	_pq.push(zf); // empty frame terminates thread
 	Thread::join(_thread);
 	Thread::join(_thread);
 }
 }
 
 
@@ -113,8 +109,7 @@ std::set<InetAddress> TestEthernetTap::ips() const
 
 
 void TestEthernetTap::put(const MAC &from,const MAC &to,unsigned int etherType,const void *data,unsigned int len)
 void TestEthernetTap::put(const MAC &from,const MAC &to,unsigned int etherType,const void *data,unsigned int len)
 {
 {
-	Mutex::Lock _l(_gq_m);
-	_gq.push_back(TestFrame(from,to,data,etherType,len));
+	_gq.push(TestFrame(from,to,data,etherType,len));
 }
 }
 
 
 std::string TestEthernetTap::deviceName() const
 std::string TestEthernetTap::deviceName() const
@@ -135,38 +130,22 @@ bool TestEthernetTap::injectPacketFromHost(const MAC &from,const MAC &to,unsigne
 {
 {
 	if ((len == 0)||(len > 2800))
 	if ((len == 0)||(len > 2800))
 		return false;
 		return false;
-
-	{
-		Mutex::Lock _l(_pq_m);
-		_pq.push_back(TestFrame(from,to,data,etherType & 0xffff,len));
-		_pq_c.signal();
-	}
-
+	_pq.push(TestFrame(from,to,data,etherType & 0xffff,len));
 	return true;
 	return true;
 }
 }
 
 
 void TestEthernetTap::threadMain()
 void TestEthernetTap::threadMain()
 	throw()
 	throw()
 {
 {
-	std::vector<TestFrame> q;
+	TestFrame f;
 	for(;;) {
 	for(;;) {
-		{
-			Mutex::Lock _l(_pq_m);
-			q = _pq;
-			_pq.clear();
-		}
-
-		for(std::vector<TestFrame>::iterator f(q.begin());f!=q.end();++f) {
-			if (!f->len)
-				return; // empty frame signals thread to die
-			else if (_enabled) {
+		if (_pq.pop(f,0)) {
+			if (f.len) {
 				try {
 				try {
-					_handler(_arg,f->from,f->to,f->etherType,Buffer<4096>(f->data,f->len));
-				} catch ( ... ) {} // handlers should not throw
-			}
+					_handler(_arg,f.from,f.to,f.etherType,Buffer<4096>(f.data,f.len));
+				} catch ( ... ) {}
+			} else break;
 		}
 		}
-
-		_pq_c.wait(1000);
 	}
 	}
 }
 }
 
 

+ 5 - 15
testnet/TestEthernetTap.hpp

@@ -40,7 +40,8 @@
 #include "../node/SharedPtr.hpp"
 #include "../node/SharedPtr.hpp"
 #include "../node/Thread.hpp"
 #include "../node/Thread.hpp"
 #include "../node/Mutex.hpp"
 #include "../node/Mutex.hpp"
-#include "Semaphore.hpp"
+
+#include "MTQ.hpp"
 
 
 namespace ZeroTier {
 namespace ZeroTier {
 
 
@@ -105,14 +106,7 @@ public:
 	virtual bool injectPacketFromHost(const MAC &from,const MAC &to,unsigned int etherType,const void *data,unsigned int len);
 	virtual bool injectPacketFromHost(const MAC &from,const MAC &to,unsigned int etherType,const void *data,unsigned int len);
 
 
 	inline uint64_t nwid() const { return _nwid; }
 	inline uint64_t nwid() const { return _nwid; }
-
-	// Get things that have been put() and empty queue
-	inline void get(std::vector<TestFrame> &v)
-	{
-		Mutex::Lock _l(_gq_m);
-		v = _gq;
-		_gq.clear();
-	}
+	inline bool getNextReceivedFrame(TestFrame &v,unsigned long timeout) { return _gq.pop(v,timeout); }
 
 
 	void threadMain()
 	void threadMain()
 		throw();
 		throw();
@@ -127,12 +121,8 @@ private:
 	std::string _dev;
 	std::string _dev;
 	volatile bool _enabled;
 	volatile bool _enabled;
 
 
-	std::vector< TestFrame > _pq;
-	Mutex _pq_m;
-	Semaphore _pq_c;
-
-	std::vector< TestFrame > _gq;
-	Mutex _gq_m;
+	MTQ<TestFrame> _pq;
+	MTQ<TestFrame> _gq;
 
 
 	AtomicCounter __refCount;
 	AtomicCounter __refCount;
 };
 };