Browse Source

Added portable select interrupter

Paul-Louis Ageneau 5 years ago
parent
commit
8a3fe4587f
2 changed files with 83 additions and 18 deletions
  1. 64 15
      src/tcptransport.cpp
  2. 19 3
      src/tcptransport.hpp

+ 64 - 15
src/tcptransport.cpp

@@ -20,10 +20,70 @@
 
 #include "tcptransport.hpp"
 
+#include <exception>
+#ifndef _WIN32
+#include <fcntl.h>
+#include <unistd.h>
+#endif
+
 namespace rtc {
 
 using std::to_string;
 
+SelectInterrupter::SelectInterrupter() {
+#ifndef _WIN32
+	int pipefd[2];
+	if (::pipe(pipefd) != 0)
+		throw std::runtime_error("Failed to create pipe");
+	::fcntl(pipefd[0], F_SETFL, O_NONBLOCK);
+	::fcntl(pipefd[1], F_SETFL, O_NONBLOCK);
+	mPipeOut = pipefd[0]; // read
+	mPipeIn = pipefd[1];  // write
+#endif
+}
+
+SelectInterrupter::~SelectInterrupter() {
+	std::lock_guard lock(mMutex);
+#ifdef _WIN32
+	if (mDummySock != INVALID_SOCKET)
+		::closesocket(mDummySock);
+#else
+	::close(mPipeIn);
+	::close(mPipeOut);
+#endif
+}
+
+int SelectInterrupter::prepare(fd_set &readfds, fd_set &writefds) {
+	std::lock_guard lock(mMutex);
+#ifdef _WIN32
+	if (mDummySock == INVALID_SOCKET)
+		mDummySock = ::socket(AF_INET, SOCK_DGRAM, 0);
+	FD_SET(mDummySock, &readfds);
+	return SOCK_TO_INT(mDummySock) + 1;
+#else
+	int ret;
+	do {
+		char dummy;
+		ret = ::read(mPipeIn, &dummy, 1);
+	} while (ret > 0);
+	FD_SET(mPipeIn, &readfds);
+	return mPipeIn + 1;
+#endif
+}
+
+void SelectInterrupter::interrupt() {
+	std::lock_guard lock(mMutex);
+#ifdef _WIN32
+	if (mDummySock != INVALID_SOCKET) {
+		::closesocket(mDummySock);
+		mDummySock = INVALID_SOCKET;
+	}
+#else
+	char dummy = 0;
+	::write(mPipeOut, &dummy, 1);
+#endif
+}
+
 TcpTransport::TcpTransport(const string &hostname, const string &service, state_callback callback)
     : Transport(nullptr, std::move(callback)), mHostname(hostname), mService(service) {
 
@@ -33,8 +93,6 @@ TcpTransport::TcpTransport(const string &hostname, const string &service, state_
 
 TcpTransport::~TcpTransport() {
 	stop();
-	if (mInterruptSock != INVALID_SOCKET)
-		::closesocket(mInterruptSock);
 }
 
 bool TcpTransport::stop() {
@@ -243,7 +301,6 @@ void TcpTransport::runLoop() {
 }
 
 int TcpTransport::prepareSelect(fd_set &readfds, fd_set &writefds) {
-	std::lock_guard lock(mInterruptMutex);
 	FD_ZERO(&readfds);
 	FD_ZERO(&writefds);
 	FD_SET(mSock, &readfds);
@@ -251,20 +308,12 @@ int TcpTransport::prepareSelect(fd_set &readfds, fd_set &writefds) {
 	if (!mSendQueue.empty())
 		FD_SET(mSock, &writefds);
 
-	if (mInterruptSock == INVALID_SOCKET)
-		mInterruptSock = ::socket(AF_INET, SOCK_DGRAM, 0);
-
-	FD_SET(mInterruptSock, &readfds);
-	return std::max(SOCKET_TO_INT(mSock), SOCKET_TO_INT(mInterruptSock)) + 1;
+	int n = SOCKET_TO_INT(mSock) + 1;
+	int m = mInterrupter.prepare(readfds, writefds);
+	return std::max(n, m);
 }
 
-void TcpTransport::interruptSelect() {
-	std::lock_guard lock(mInterruptMutex);
-	if (mInterruptSock != INVALID_SOCKET) {
-		::closesocket(mInterruptSock);
-		mInterruptSock = INVALID_SOCKET;
-	}
-}
+void TcpTransport::interruptSelect() { mInterrupter.interrupt(); }
 
 } // namespace rtc
 

+ 19 - 3
src/tcptransport.hpp

@@ -33,6 +33,24 @@
 
 namespace rtc {
 
+// Utility class to interrupt select()
+class SelectInterrupter {
+public:
+	SelectInterrupter();
+	~SelectInterrupter();
+
+	int prepare(fd_set &readfds, fd_set &writefds);
+	void interrupt();
+
+private:
+	std::mutex mMutex;
+#ifdef _WIN32
+	socket_t mDummySock = INVALID_SOCKET;
+#else // assume POSIX
+	int mPipeIn, mPipeOut;
+#endif
+};
+
 class TcpTransport : public Transport {
 public:
 	TcpTransport(const string &hostname, const string &service, state_callback callback);
@@ -60,10 +78,8 @@ private:
 	string mHostname, mService;
 
 	socket_t mSock = INVALID_SOCKET;
-	socket_t mInterruptSock = INVALID_SOCKET;
-	std::mutex mInterruptMutex;
 	std::thread mThread;
-
+	SelectInterrupter mInterrupter;
 	Queue<message_ptr> mSendQueue;
 };