Browse Source

Implemented socket select interruption

Paul-Louis Ageneau 5 years ago
parent
commit
5d9ed26740
2 changed files with 37 additions and 8 deletions
  1. 28 7
      src/tcptransport.cpp
  2. 9 1
      src/tcptransport.hpp

+ 28 - 7
src/tcptransport.cpp

@@ -52,10 +52,12 @@ bool TcpTransport::outgoing(message_ptr message) {
 		return mSendQueue.empty();
 		return mSendQueue.empty();
 
 
 	// If nothing is pending, try to send directly
 	// If nothing is pending, try to send directly
+	// It's safe because if the queue is empty, the thread is not sending
 	if (mSendQueue.empty() && trySendMessage(message))
 	if (mSendQueue.empty() && trySendMessage(message))
 		return true;
 		return true;
 
 
 	mSendQueue.push(message);
 	mSendQueue.push(message);
+	interruptSelect(); // so the thread waits for writability
 	return false;
 	return false;
 }
 }
 
 
@@ -171,13 +173,8 @@ void TcpTransport::runLoop() {
 	try {
 	try {
 		while (true) {
 		while (true) {
 			fd_set readfds, writefds;
 			fd_set readfds, writefds;
-			FD_ZERO(&readfds);
-			FD_ZERO(&writefds);
-			FD_SET(mSock, &readfds);
-			// TODO
-			if (!mSendQueue.empty())
-				FD_SET(mSock, &writefds);
-			int ret = ::select(SOCKET_TO_INT(mSock) + 1, &readfds, &writefds, NULL, NULL);
+			int n = prepareSelect(readfds, writefds);
+			int ret = ::select(n, &readfds, &writefds, NULL, NULL);
 			if (ret < 0)
 			if (ret < 0)
 				throw std::runtime_error("Failed to wait on socket");
 				throw std::runtime_error("Failed to wait on socket");
 
 
@@ -205,6 +202,30 @@ void TcpTransport::runLoop() {
 	recv(nullptr);
 	recv(nullptr);
 }
 }
 
 
+int TcpTransport::prepareSelect(fd_set &readfds, fd_set &writefds) {
+	std::lock_guard lock(mInterruptMutex);
+	FD_ZERO(&readfds);
+	FD_ZERO(&writefds);
+	FD_SET(mSock, &readfds);
+
+	if (!mSendQueue.empty())
+		FD_SET(mSock, &writefds);
+
+	if (mInterruptSock == INVALID_SOCKET)
+		mInterruptSock = ::socket(AF_INET, SOCK_DGRAM, 0);
+
+	FD_SET(mInterruptSock, &readfds);
+	return std::max(SOCKET_TO_INT(mSock), SOCKET_TO_INT(mInterruptSock)) + 1;
+}
+
+void TcpTransport::interruptSelect() {
+	std::lock_guard lock(mInterruptMutex);
+	if (mInterruptSock != INVALID_SOCKET) {
+		::closesocket(mInterruptSock);
+		mInterruptSock = INVALID_SOCKET;
+	}
+}
+
 } // namespace rtc
 } // namespace rtc
 
 
 #endif
 #endif

+ 9 - 1
src/tcptransport.hpp

@@ -25,6 +25,7 @@
 #include "queue.hpp"
 #include "queue.hpp"
 #include "transport.hpp"
 #include "transport.hpp"
 
 
+#include <mutex>
 #include <thread>
 #include <thread>
 
 
 // Use the socket defines from libjuice
 // Use the socket defines from libjuice
@@ -53,9 +54,16 @@ private:
 
 
 	void runLoop();
 	void runLoop();
 
 
+	int prepareSelect(fd_set &readfds, fd_set &writefds);
+	void interruptSelect();
+
 	string mHostname, mService;
 	string mHostname, mService;
-	socket_t mSock;
+
+	socket_t mSock = INVALID_SOCKET;
+	socket_t mInterruptSock = INVALID_SOCKET;
+	std::mutex mInterruptMutex;
 	std::thread mThread;
 	std::thread mThread;
+
 	Queue<message_ptr> mSendQueue;
 	Queue<message_ptr> mSendQueue;
 };
 };