Browse Source

Moved GnuTLS DTLS and TLS transport implementations to thread pool

Paul-Louis Ageneau 2 years ago
parent
commit
3d4cba4957
4 changed files with 167 additions and 138 deletions
  1. 99 81
      src/impl/dtlstransport.cpp
  2. 5 4
      src/impl/dtlstransport.hpp
  3. 58 49
      src/impl/tlstransport.cpp
  4. 5 4
      src/impl/tlstransport.hpp

+ 99 - 81
src/impl/dtlstransport.cpp

@@ -9,6 +9,7 @@
 #include "dtlstransport.hpp"
 #include "dtlstransport.hpp"
 #include "icetransport.hpp"
 #include "icetransport.hpp"
 #include "internals.hpp"
 #include "internals.hpp"
+#include "threadpool.hpp"
 
 
 #include <algorithm>
 #include <algorithm>
 #include <chrono>
 #include <chrono>
@@ -27,6 +28,16 @@ using namespace std::chrono;
 
 
 namespace rtc::impl {
 namespace rtc::impl {
 
 
+void DtlsTransport::enqueueRecv() {
+	if (mPendingRecvCount > 0)
+		return;
+
+	if (auto shared_this = weak_from_this().lock()) {
+		++mPendingRecvCount;
+		ThreadPool::Instance().enqueue(&DtlsTransport::doRecv, std::move(shared_this));
+	}
+}
+
 #if USE_GNUTLS
 #if USE_GNUTLS
 
 
 void DtlsTransport::Init() {
 void DtlsTransport::Init() {
@@ -50,7 +61,8 @@ DtlsTransport::DtlsTransport(shared_ptr<IceTransport> lower, certificate_ptr cer
 	gnutls_certificate_credentials_t creds = mCertificate->credentials();
 	gnutls_certificate_credentials_t creds = mCertificate->credentials();
 	gnutls_certificate_set_verify_function(creds, CertificateCallback);
 	gnutls_certificate_set_verify_function(creds, CertificateCallback);
 
 
-	unsigned int flags = GNUTLS_DATAGRAM | (mIsClient ? GNUTLS_CLIENT : GNUTLS_SERVER);
+	unsigned int flags =
+	    GNUTLS_DATAGRAM | GNUTLS_NONBLOCK | (mIsClient ? GNUTLS_CLIENT : GNUTLS_SERVER);
 	gnutls::check(gnutls_init(&mSession, flags));
 	gnutls::check(gnutls_init(&mSession, flags));
 
 
 	try {
 	try {
@@ -80,6 +92,10 @@ DtlsTransport::DtlsTransport(shared_ptr<IceTransport> lower, certificate_ptr cer
 		gnutls_transport_set_pull_function(mSession, ReadCallback);
 		gnutls_transport_set_pull_function(mSession, ReadCallback);
 		gnutls_transport_set_pull_timeout_function(mSession, TimeoutCallback);
 		gnutls_transport_set_pull_timeout_function(mSession, TimeoutCallback);
 
 
+		size_t mtu = mMtu.value_or(DEFAULT_MTU) - 8 - 40; // UDP/IPv6
+		gnutls_dtls_set_mtu(mSession, static_cast<unsigned int>(mtu));
+		PLOG_VERBOSE << "DTLS MTU set to " << mtu;
+
 	} catch (...) {
 	} catch (...) {
 		gnutls_deinit(mSession);
 		gnutls_deinit(mSession);
 		throw;
 		throw;
@@ -98,22 +114,17 @@ DtlsTransport::~DtlsTransport() {
 }
 }
 
 
 void DtlsTransport::start() {
 void DtlsTransport::start() {
-	if(mStarted.exchange(true))
-		return;
-
-	PLOG_DEBUG << "Starting DTLS recv thread";
+	PLOG_DEBUG << "Starting DTLS transport";
 	registerIncoming();
 	registerIncoming();
-	mRecvThread = std::thread(&DtlsTransport::runRecvLoop, this);
+	changeState(State::Connecting);
+	enqueueRecv(); // to initiate the handshake
 }
 }
 
 
 void DtlsTransport::stop() {
 void DtlsTransport::stop() {
-	if(!mStarted.exchange(false))
-		return;
-
-	PLOG_DEBUG << "Stopping DTLS recv thread";
+	PLOG_DEBUG << "Stopping DTLS transport";
 	unregisterIncoming();
 	unregisterIncoming();
 	mIncomingQueue.stop();
 	mIncomingQueue.stop();
-	mRecvThread.join();
+	enqueueRecv();
 }
 }
 
 
 bool DtlsTransport::send(message_ptr message) {
 bool DtlsTransport::send(message_ptr message) {
@@ -122,7 +133,6 @@ bool DtlsTransport::send(message_ptr message) {
 
 
 	PLOG_VERBOSE << "Send size=" << message->size();
 	PLOG_VERBOSE << "Send size=" << message->size();
 
 
-
 	ssize_t ret;
 	ssize_t ret;
 	do {
 	do {
 		std::lock_guard lock(mSendMutex);
 		std::lock_guard lock(mSendMutex);
@@ -147,6 +157,7 @@ void DtlsTransport::incoming(message_ptr message) {
 
 
 	PLOG_VERBOSE << "Incoming size=" << message->size();
 	PLOG_VERBOSE << "Incoming size=" << message->size();
 	mIncomingQueue.push(message);
 	mIncomingQueue.push(message);
+	enqueueRecv();
 }
 }
 
 
 bool DtlsTransport::outgoing(message_ptr message) {
 bool DtlsTransport::outgoing(message_ptr message) {
@@ -166,79 +177,82 @@ void DtlsTransport::postHandshake() {
 	// Dummy
 	// Dummy
 }
 }
 
 
-void DtlsTransport::runRecvLoop() {
+void DtlsTransport::doRecv() {
+	std::lock_guard lock(mRecvMutex);
+	--mPendingRecvCount;
+
 	const size_t bufferSize = 4096;
 	const size_t bufferSize = 4096;
+	char buffer[bufferSize];
 
 
-	// Handshake loop
 	try {
 	try {
-		changeState(State::Connecting);
-
-		size_t mtu = mMtu.value_or(DEFAULT_MTU) - 8 - 40; // UDP/IPv6
-		gnutls_dtls_set_mtu(mSession, static_cast<unsigned int>(mtu));
-		PLOG_VERBOSE << "SSL MTU set to " << mtu;
-
-		int ret;
-		do {
-			ret = gnutls_handshake(mSession);
-
-			if (ret == GNUTLS_E_LARGE_PACKET)
-				throw std::runtime_error("MTU is too low");
+		// Handle handshake if connecting
+		if (state() == State::Connecting) {
+			int ret;
+			do {
+				ret = gnutls_handshake(mSession);
+
+				if (ret == GNUTLS_E_AGAIN) {
+					// Schedule next call on timeout and return
+					duration timeout = milliseconds(gnutls_dtls_get_timeout(mSession));
+					ThreadPool::Instance().schedule(timeout, [weak_this = weak_from_this()]() {
+						if (auto locked = weak_this.lock())
+							locked->doRecv();
+					});
+					return;
+				}
 
 
-		} while (ret == GNUTLS_E_INTERRUPTED || ret == GNUTLS_E_AGAIN ||
-		         !gnutls::check(ret, "DTLS handshake failed"));
+				if (ret == GNUTLS_E_LARGE_PACKET) {
+					throw std::runtime_error("MTU is too low");
+				}
 
 
-		// RFC 8261: DTLS MUST support sending messages larger than the current path MTU
-		// See https://www.rfc-editor.org/rfc/rfc8261.html#section-5
-		gnutls_dtls_set_mtu(mSession, bufferSize + 1);
+			} while (!gnutls::check(ret, "DTLS handshake failed")); // Re-call on non-fatal error
 
 
-	} catch (const std::exception &e) {
-		PLOG_ERROR << "DTLS handshake: " << e.what();
-		changeState(State::Failed);
-		return;
-	}
+			// RFC 8261: DTLS MUST support sending messages larger than the current path MTU
+			// See https://www.rfc-editor.org/rfc/rfc8261.html#section-5
+			gnutls_dtls_set_mtu(mSession, bufferSize + 1);
 
 
-	// Receive loop
-	try {
-		PLOG_INFO << "DTLS handshake finished";
-		postHandshake();
-		changeState(State::Connected);
+			PLOG_INFO << "DTLS handshake finished";
+			changeState(State::Connected);
+			postHandshake();
+		}
 
 
-		char buffer[bufferSize];
+		if (state() == State::Connected) {
+			while (true) {
+				ssize_t ret = gnutls_record_recv(mSession, buffer, bufferSize);
 
 
-		while (true) {
-			ssize_t ret;
-			do {
-				ret = gnutls_record_recv(mSession, buffer, bufferSize);
-			} while (ret == GNUTLS_E_INTERRUPTED || ret == GNUTLS_E_AGAIN);
-
-			// RFC 8827: Implementations MUST NOT implement DTLS renegotiation and MUST reject it
-			// with a "no_renegotiation" alert if offered.
-			// See https://www.rfc-editor.org/rfc/rfc8827.html#section-6.5
-			if (ret == GNUTLS_E_REHANDSHAKE) {
-				do {
-					std::lock_guard lock(mSendMutex);
-					ret = gnutls_alert_send(mSession, GNUTLS_AL_WARNING, GNUTLS_A_NO_RENEGOTIATION);
-				} while (ret == GNUTLS_E_INTERRUPTED || ret == GNUTLS_E_AGAIN);
-				continue;
-			}
+				if (ret == GNUTLS_E_AGAIN) {
+					return;
+				}
 
 
-			// Consider premature termination as remote closing
-			if (ret == GNUTLS_E_PREMATURE_TERMINATION) {
-				PLOG_DEBUG << "DTLS connection terminated";
-				break;
-			}
+				// RFC 8827: Implementations MUST NOT implement DTLS renegotiation and MUST reject
+				// it with a "no_renegotiation" alert if offered. See
+				// https://www.rfc-editor.org/rfc/rfc8827.html#section-6.5
+				if (ret == GNUTLS_E_REHANDSHAKE) {
+					do {
+						std::lock_guard lock(mSendMutex);
+						ret = gnutls_alert_send(mSession, GNUTLS_AL_WARNING,
+						                        GNUTLS_A_NO_RENEGOTIATION);
+					} while (ret == GNUTLS_E_INTERRUPTED || ret == GNUTLS_E_AGAIN);
+					continue;
+				}
 
 
-			if (gnutls::check(ret)) {
-				if (ret == 0) {
-					// Closed
-					PLOG_DEBUG << "DTLS connection cleanly closed";
+				// Consider premature termination as remote closing
+				if (ret == GNUTLS_E_PREMATURE_TERMINATION) {
+					PLOG_DEBUG << "DTLS connection terminated";
 					break;
 					break;
 				}
 				}
-				auto *b = reinterpret_cast<byte *>(buffer);
-				recv(make_message(b, b + ret));
+
+				if (gnutls::check(ret)) {
+					if (ret == 0) {
+						// Closed
+						PLOG_DEBUG << "DTLS connection cleanly closed";
+						break;
+					}
+					auto *b = reinterpret_cast<byte *>(buffer);
+					recv(make_message(b, b + ret));
+				}
 			}
 			}
 		}
 		}
-
 	} catch (const std::exception &e) {
 	} catch (const std::exception &e) {
 		PLOG_ERROR << "DTLS recv: " << e.what();
 		PLOG_ERROR << "DTLS recv: " << e.what();
 	}
 	}
@@ -314,9 +328,14 @@ ssize_t DtlsTransport::ReadCallback(gnutls_transport_ptr_t ptr, void *data, size
 			return len;
 			return len;
 		}
 		}
 
 
-		// Closed
-		gnutls_transport_set_errno(t->mSession, 0);
-		return 0;
+		if (t->mIncomingQueue.running()) {
+			gnutls_transport_set_errno(t->mSession, EAGAIN);
+			return -1;
+		} else {
+			// Closed
+			gnutls_transport_set_errno(t->mSession, 0);
+			return 0;
+		}
 
 
 	} catch (const std::exception &e) {
 	} catch (const std::exception &e) {
 		PLOG_WARNING << e.what();
 		PLOG_WARNING << e.what();
@@ -325,12 +344,10 @@ ssize_t DtlsTransport::ReadCallback(gnutls_transport_ptr_t ptr, void *data, size
 	}
 	}
 }
 }
 
 
-int DtlsTransport::TimeoutCallback(gnutls_transport_ptr_t ptr, unsigned int ms) {
+int DtlsTransport::TimeoutCallback(gnutls_transport_ptr_t ptr, unsigned int /* ms */) {
 	DtlsTransport *t = static_cast<DtlsTransport *>(ptr);
 	DtlsTransport *t = static_cast<DtlsTransport *>(ptr);
 	try {
 	try {
-		bool isReadable = t->mIncomingQueue.wait(
-		    ms != GNUTLS_INDEFINITE_TIMEOUT ? std::make_optional(milliseconds(ms)) : nullopt); // TODO
-		return isReadable ? 1 : 0;
+		return !t->mIncomingQueue.empty() ? 1 : 0;
 
 
 	} catch (const std::exception &e) {
 	} catch (const std::exception &e) {
 		PLOG_WARNING << e.what();
 		PLOG_WARNING << e.what();
@@ -438,10 +455,11 @@ DtlsTransport::DtlsTransport(shared_ptr<IceTransport> lower, certificate_ptr cer
 		// See https://www.rfc-editor.org/rfc/rfc8827.html#section-6.5 Warning:
 		// See https://www.rfc-editor.org/rfc/rfc8827.html#section-6.5 Warning:
 		// SSL_set_tlsext_use_srtp() returns 0 on success and 1 on error
 		// SSL_set_tlsext_use_srtp() returns 0 on success and 1 on error
 		// Try to use GCM suite
 		// Try to use GCM suite
-		if (SSL_set_tlsext_use_srtp(mSsl, "SRTP_AEAD_AES_256_GCM:SRTP_AEAD_AES_128_GCM:SRTP_AES128_CM_SHA1_80")) {
+		if (SSL_set_tlsext_use_srtp(
+		        mSsl, "SRTP_AEAD_AES_256_GCM:SRTP_AEAD_AES_128_GCM:SRTP_AES128_CM_SHA1_80")) {
 			if (SSL_set_tlsext_use_srtp(mSsl, "SRTP_AES128_CM_SHA1_80"))
 			if (SSL_set_tlsext_use_srtp(mSsl, "SRTP_AES128_CM_SHA1_80"))
 				throw std::runtime_error("Failed to set SRTP profile: " +
 				throw std::runtime_error("Failed to set SRTP profile: " +
-							openssl::error_string(ERR_get_error()));
+				                         openssl::error_string(ERR_get_error()));
 		}
 		}
 	} catch (...) {
 	} catch (...) {
 		if (mSsl)
 		if (mSsl)
@@ -465,7 +483,7 @@ DtlsTransport::~DtlsTransport() {
 }
 }
 
 
 void DtlsTransport::start() {
 void DtlsTransport::start() {
-	if(mStarted.exchange(true))
+	if (mStarted.exchange(true))
 		return;
 		return;
 
 
 	PLOG_DEBUG << "Starting DTLS recv thread";
 	PLOG_DEBUG << "Starting DTLS recv thread";
@@ -474,7 +492,7 @@ void DtlsTransport::start() {
 }
 }
 
 
 void DtlsTransport::stop() {
 void DtlsTransport::stop() {
-	if(!mStarted.exchange(false))
+	if (!mStarted.exchange(false))
 		return;
 		return;
 
 
 	PLOG_DEBUG << "Stopping DTLS recv thread";
 	PLOG_DEBUG << "Stopping DTLS recv thread";

+ 5 - 4
src/impl/dtlstransport.hpp

@@ -25,7 +25,7 @@ namespace rtc::impl {
 
 
 class IceTransport;
 class IceTransport;
 
 
-class DtlsTransport : public Transport {
+class DtlsTransport : public Transport, public std::enable_shared_from_this<DtlsTransport> {
 public:
 public:
 	static void Init();
 	static void Init();
 	static void Cleanup();
 	static void Cleanup();
@@ -48,7 +48,8 @@ protected:
 	virtual bool demuxMessage(message_ptr message);
 	virtual bool demuxMessage(message_ptr message);
 	virtual void postHandshake();
 	virtual void postHandshake();
 
 
-	void runRecvLoop();
+	void enqueueRecv();
+	void doRecv();
 
 
 	const optional<size_t> mMtu;
 	const optional<size_t> mMtu;
 	const certificate_ptr mCertificate;
 	const certificate_ptr mCertificate;
@@ -56,8 +57,8 @@ protected:
 	const bool mIsClient;
 	const bool mIsClient;
 
 
 	Queue<message_ptr> mIncomingQueue;
 	Queue<message_ptr> mIncomingQueue;
-	std::thread mRecvThread;
-	std::atomic<bool> mStarted = false;
+	std::atomic<int> mPendingRecvCount = 0;
+	std::mutex mRecvMutex;
 	std::atomic<unsigned int> mCurrentDscp = 0;
 	std::atomic<unsigned int> mCurrentDscp = 0;
 	std::atomic<bool> mOutgoingResult = true;
 	std::atomic<bool> mOutgoingResult = true;
 
 

+ 58 - 49
src/impl/tlstransport.cpp

@@ -8,6 +8,7 @@
 
 
 #include "tlstransport.hpp"
 #include "tlstransport.hpp"
 #include "tcptransport.hpp"
 #include "tcptransport.hpp"
+#include "threadpool.hpp"
 
 
 #if RTC_ENABLE_WEBSOCKET
 #if RTC_ENABLE_WEBSOCKET
 
 
@@ -20,6 +21,16 @@ using namespace std::chrono;
 
 
 namespace rtc::impl {
 namespace rtc::impl {
 
 
+void TlsTransport::enqueueRecv() {
+	if (mPendingRecvCount > 0)
+		return;
+
+	if (auto shared_this = weak_from_this().lock()) {
+		++mPendingRecvCount;
+		ThreadPool::Instance().enqueue(&TlsTransport::doRecv, std::move(shared_this));
+	}
+}
+
 #if USE_GNUTLS
 #if USE_GNUTLS
 
 
 namespace {
 namespace {
@@ -54,7 +65,8 @@ TlsTransport::TlsTransport(shared_ptr<TcpTransport> lower, optional<string> host
 
 
 	PLOG_DEBUG << "Initializing TLS transport (GnuTLS)";
 	PLOG_DEBUG << "Initializing TLS transport (GnuTLS)";
 
 
-	gnutls::check(gnutls_init(&mSession, mIsClient ? GNUTLS_CLIENT : GNUTLS_SERVER));
+	unsigned int flags = GNUTLS_NONBLOCK | (mIsClient ? GNUTLS_CLIENT : GNUTLS_SERVER);
+	gnutls::check(gnutls_init(&mSession, flags));
 
 
 	try {
 	try {
 		const char *priorities = "SECURE128:-VERS-SSL3.0:-ARCFOUR-128";
 		const char *priorities = "SECURE128:-VERS-SSL3.0:-ARCFOUR-128";
@@ -89,22 +101,17 @@ TlsTransport::~TlsTransport() {
 }
 }
 
 
 void TlsTransport::start() {
 void TlsTransport::start() {
-	if (mStarted.exchange(true))
-		return;
-
-	PLOG_DEBUG << "Starting TLS recv thread";
+	PLOG_DEBUG << "Starting TLS transport";
 	registerIncoming();
 	registerIncoming();
-	mRecvThread = std::thread(&TlsTransport::runRecvLoop, this);
+	changeState(State::Connecting);
+	enqueueRecv(); // to initiate the handshake
 }
 }
 
 
 void TlsTransport::stop() {
 void TlsTransport::stop() {
-	if (!mStarted.exchange(false))
-		return;
-
-	PLOG_DEBUG << "Stopping TLS recv thread";
+	PLOG_DEBUG << "Stopping TLS transport";
 	unregisterIncoming();
 	unregisterIncoming();
 	mIncomingQueue.stop();
 	mIncomingQueue.stop();
-	mRecvThread.join();
+	enqueueRecv();
 }
 }
 
 
 bool TlsTransport::send(message_ptr message) {
 bool TlsTransport::send(message_ptr message) {
@@ -135,6 +142,7 @@ void TlsTransport::incoming(message_ptr message) {
 
 
 	PLOG_VERBOSE << "Incoming size=" << message->size();
 	PLOG_VERBOSE << "Incoming size=" << message->size();
 	mIncomingQueue.push(message);
 	mIncomingQueue.push(message);
+	enqueueRecv();
 }
 }
 
 
 bool TlsTransport::outgoing(message_ptr message) {
 bool TlsTransport::outgoing(message_ptr message) {
@@ -147,52 +155,52 @@ void TlsTransport::postHandshake() {
 	// Dummy
 	// Dummy
 }
 }
 
 
-void TlsTransport::runRecvLoop() {
+void TlsTransport::doRecv() {
+	std::lock_guard lock(mRecvMutex);
+	--mPendingRecvCount;
+
 	const size_t bufferSize = 4096;
 	const size_t bufferSize = 4096;
 	char buffer[bufferSize];
 	char buffer[bufferSize];
 
 
-	// Handshake loop
 	try {
 	try {
-		changeState(State::Connecting);
+		// Handle handshake if connecting
+		if (state() == State::Connecting) {
+			int ret;
+			do {
+				ret = gnutls_handshake(mSession);
 
 
-		int ret;
-		do {
-			ret = gnutls_handshake(mSession);
-		} while (ret == GNUTLS_E_INTERRUPTED || ret == GNUTLS_E_AGAIN ||
-		         !gnutls::check(ret, "TLS handshake failed"));
+				if (ret == GNUTLS_E_AGAIN)
+					return;
 
 
-	} catch (const std::exception &e) {
-		PLOG_ERROR << "TLS handshake: " << e.what();
-		changeState(State::Failed);
-		return;
-	}
+			} while (!gnutls::check(ret, "TLS handshake failed")); // Re-call on non-fatal error
 
 
-	// Receive loop
-	try {
-		PLOG_INFO << "TLS handshake finished";
-		changeState(State::Connected);
-		postHandshake();
+			PLOG_INFO << "TLS handshake finished";
+			changeState(State::Connected);
+			postHandshake();
+		}
 
 
-		while (true) {
-			ssize_t ret;
-			do {
-				ret = gnutls_record_recv(mSession, buffer, bufferSize);
-			} while (ret == GNUTLS_E_INTERRUPTED || ret == GNUTLS_E_AGAIN);
+		if (state() == State::Connected) {
+			while (true) {
+				ssize_t ret = gnutls_record_recv(mSession, buffer, bufferSize);
 
 
-			// Consider premature termination as remote closing
-			if (ret == GNUTLS_E_PREMATURE_TERMINATION) {
-				PLOG_DEBUG << "TLS connection terminated";
-				break;
-			}
+				if (ret == GNUTLS_E_AGAIN)
+					return;
 
 
-			if (gnutls::check(ret)) {
-				if (ret == 0) {
-					// Closed
-					PLOG_DEBUG << "TLS connection cleanly closed";
+				// Consider premature termination as remote closing
+				if (ret == GNUTLS_E_PREMATURE_TERMINATION) {
+					PLOG_DEBUG << "TLS connection terminated";
 					break;
 					break;
 				}
 				}
-				auto *b = reinterpret_cast<byte *>(buffer);
-				recv(make_message(b, b + ret));
+
+				if (gnutls::check(ret)) {
+					if (ret == 0) {
+						// Closed
+						PLOG_DEBUG << "TLS connection cleanly closed";
+						break;
+					}
+					auto *b = reinterpret_cast<byte *>(buffer);
+					recv(make_message(b, b + ret));
+				}
 			}
 			}
 		}
 		}
 	} catch (const std::exception &e) {
 	} catch (const std::exception &e) {
@@ -250,6 +258,9 @@ ssize_t TlsTransport::ReadCallback(gnutls_transport_ptr_t ptr, void *data, size_
 			position += len;
 			position += len;
 			gnutls_transport_set_errno(t->mSession, 0);
 			gnutls_transport_set_errno(t->mSession, 0);
 			return len;
 			return len;
+		} else if (t->mIncomingQueue.running()) {
+			gnutls_transport_set_errno(t->mSession, EAGAIN);
+			return -1;
 		} else {
 		} else {
 			// Closed
 			// Closed
 			gnutls_transport_set_errno(t->mSession, 0);
 			gnutls_transport_set_errno(t->mSession, 0);
@@ -263,7 +274,7 @@ ssize_t TlsTransport::ReadCallback(gnutls_transport_ptr_t ptr, void *data, size_
 	}
 	}
 }
 }
 
 
-int TlsTransport::TimeoutCallback(gnutls_transport_ptr_t ptr, unsigned int ms) {
+int TlsTransport::TimeoutCallback(gnutls_transport_ptr_t ptr, unsigned int /* ms */) {
 	TlsTransport *t = static_cast<TlsTransport *>(ptr);
 	TlsTransport *t = static_cast<TlsTransport *>(ptr);
 	try {
 	try {
 		message_ptr &message = t->mIncomingMessage;
 		message_ptr &message = t->mIncomingMessage;
@@ -272,9 +283,7 @@ int TlsTransport::TimeoutCallback(gnutls_transport_ptr_t ptr, unsigned int ms) {
 		if(message && position < message->size())
 		if(message && position < message->size())
 			return 1;
 			return 1;
 
 
-		bool isReadable = t->mIncomingQueue.wait(
-		    ms != GNUTLS_INDEFINITE_TIMEOUT ? std::make_optional(milliseconds(ms)) : nullopt);
-		return isReadable ? 1 : 0;
+		return !t->mIncomingQueue.empty() ? 1 : 0;
 
 
 	} catch (const std::exception &e) {
 	} catch (const std::exception &e) {
 		PLOG_WARNING << e.what();
 		PLOG_WARNING << e.what();

+ 5 - 4
src/impl/tlstransport.hpp

@@ -24,7 +24,7 @@ namespace rtc::impl {
 
 
 class TcpTransport;
 class TcpTransport;
 
 
-class TlsTransport : public Transport {
+class TlsTransport : public Transport, public std::enable_shared_from_this<TlsTransport> {
 public:
 public:
 	static void Init();
 	static void Init();
 	static void Cleanup();
 	static void Cleanup();
@@ -44,14 +44,15 @@ protected:
 	virtual bool outgoing(message_ptr message) override;
 	virtual bool outgoing(message_ptr message) override;
 	virtual void postHandshake();
 	virtual void postHandshake();
 
 
-	void runRecvLoop();
+	void enqueueRecv();
+	void doRecv();
 
 
 	const optional<string> mHost;
 	const optional<string> mHost;
 	const bool mIsClient;
 	const bool mIsClient;
 
 
 	Queue<message_ptr> mIncomingQueue;
 	Queue<message_ptr> mIncomingQueue;
-	std::thread mRecvThread;
-	std::atomic<bool> mStarted = false;
+	std::atomic<int> mPendingRecvCount = 0;
+	std::mutex mRecvMutex;
 
 
 #if USE_GNUTLS
 #if USE_GNUTLS
 	gnutls_session_t mSession;
 	gnutls_session_t mSession;