Browse Source

Introduced poll service for TCP sockets

Paul-Louis Ageneau 3 years ago
parent
commit
9cd5b4ec93
7 changed files with 440 additions and 160 deletions
  1. 2 0
      CMakeLists.txt
  2. 4 1
      src/impl/init.cpp
  3. 1 1
      src/impl/init.hpp
  4. 216 0
      src/impl/pollservice.cpp
  5. 91 0
      src/impl/pollservice.hpp
  6. 119 150
      src/impl/tcptransport.cpp
  7. 7 8
      src/impl/tcptransport.hpp

+ 2 - 0
CMakeLists.txt

@@ -120,6 +120,7 @@ set(LIBDATACHANNEL_IMPL_SOURCES
 	${CMAKE_CURRENT_SOURCE_DIR}/src/impl/base64.cpp
 	${CMAKE_CURRENT_SOURCE_DIR}/src/impl/sha.cpp
 	${CMAKE_CURRENT_SOURCE_DIR}/src/impl/pollinterrupter.cpp
+	${CMAKE_CURRENT_SOURCE_DIR}/src/impl/pollservice.cpp
 	${CMAKE_CURRENT_SOURCE_DIR}/src/impl/tcpserver.cpp
 	${CMAKE_CURRENT_SOURCE_DIR}/src/impl/tcptransport.cpp
 	${CMAKE_CURRENT_SOURCE_DIR}/src/impl/tlstransport.cpp
@@ -150,6 +151,7 @@ set(LIBDATACHANNEL_IMPL_HEADERS
 	${CMAKE_CURRENT_SOURCE_DIR}/src/impl/base64.hpp
 	${CMAKE_CURRENT_SOURCE_DIR}/src/impl/sha.hpp
 	${CMAKE_CURRENT_SOURCE_DIR}/src/impl/pollinterrupter.hpp
+	${CMAKE_CURRENT_SOURCE_DIR}/src/impl/pollservice.hpp
 	${CMAKE_CURRENT_SOURCE_DIR}/src/impl/tcpserver.hpp
 	${CMAKE_CURRENT_SOURCE_DIR}/src/impl/tcptransport.hpp
 	${CMAKE_CURRENT_SOURCE_DIR}/src/impl/tlstransport.hpp

+ 4 - 1
src/impl/init.cpp

@@ -1,5 +1,5 @@
 /**
- * Copyright (c) 2020 Paul-Louis Ageneau
+ * Copyright (c) 2020-2022 Paul-Louis Ageneau
  *
  * This library is free software; you can redistribute it and/or
  * modify it under the terms of the GNU Lesser General Public
@@ -21,6 +21,7 @@
 
 #include "certificate.hpp"
 #include "dtlstransport.hpp"
+#include "pollservice.hpp"
 #include "sctptransport.hpp"
 #include "threadpool.hpp"
 #include "tls.hpp"
@@ -124,6 +125,7 @@ void Init::doInit() {
 #endif
 
 	ThreadPool::Instance().spawn(THREADPOOL_SIZE);
+	PollService::Instance().start();
 
 #if USE_GNUTLS
 	// Nothing to do
@@ -152,6 +154,7 @@ void Init::doCleanup() {
 
 	PLOG_DEBUG << "Global cleanup";
 
+	PollService::Instance().join();
 	ThreadPool::Instance().join();
 
 	SctpTransport::Cleanup();

+ 1 - 1
src/impl/init.hpp

@@ -1,5 +1,5 @@
 /**
- * Copyright (c) 2020 Paul-Louis Ageneau
+ * Copyright (c) 2020-2022 Paul-Louis Ageneau
  *
  * This library is free software; you can redistribute it and/or
  * modify it under the terms of the GNU Lesser General Public

+ 216 - 0
src/impl/pollservice.cpp

@@ -0,0 +1,216 @@
+/**
+ * Copyright (c) 2022 Paul-Louis Ageneau
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
+ */
+
+#include "pollservice.hpp"
+#include "internals.hpp"
+
+#if RTC_ENABLE_WEBSOCKET
+
+#include <cassert>
+
+namespace rtc::impl {
+
+using namespace std::chrono_literals;
+using std::chrono::duration_cast;
+using std::chrono::milliseconds;
+
+PollService &PollService::Instance() {
+	static PollService *instance = new PollService;
+	return *instance;
+}
+
+PollService::PollService() : mStopped(true) {}
+
+PollService::~PollService() {}
+
+void PollService::start() {
+	mSocks = std::make_unique<SocketMap>();
+	mStopped = false;
+	mThread = std::thread(&PollService::runLoop, this);
+}
+
+void PollService::join() {
+	std::unique_lock lock(mMutex);
+	if(std::exchange(mStopped, true))
+		return;
+
+	lock.unlock();
+
+	mInterrupter.interrupt();
+	mThread.join();
+	mSocks.reset();
+}
+
+void PollService::add(socket_t sock, Params params) {
+	std::unique_lock lock(mMutex);
+	assert(mSocks);
+
+	if (!params.callback)
+		throw std::invalid_argument("poll callback is null");
+
+	mSocks->erase(sock);
+
+	PLOG_VERBOSE << "Registering socket in poll service, direction=" << params.direction;
+	auto until = params.timeout ? std::make_optional(clock::now() + *params.timeout) : nullopt;
+	mSocks->emplace(sock, SocketEntry{std::move(params), std::move(until)});
+
+	mInterrupter.interrupt();
+}
+
+void PollService::remove(socket_t sock) {
+	std::unique_lock lock(mMutex);
+	assert(mSocks);
+
+	PLOG_VERBOSE << "Unregistering socket in poll service";
+	mSocks->erase(sock);
+
+	mInterrupter.interrupt();
+}
+
+void PollService::prepare(std::vector<struct pollfd> &pfds, optional<clock::time_point> &next) {
+	std::unique_lock lock(mMutex);
+	pfds.resize(1 + mSocks->size());
+	next.reset();
+
+	auto it = pfds.begin();
+	mInterrupter.prepare(*it++);
+	for (const auto &[sock, entry] : *mSocks) {
+		it->fd = sock;
+		switch (entry.params.direction) {
+		case Direction::In:
+			it->events = POLLIN;
+			break;
+		case Direction::Out:
+			it->events = POLLOUT;
+			break;
+		default:
+			it->events = POLLIN | POLLOUT;
+			break;
+		}
+		if (entry.until)
+			next = next ? std::min(*next, *entry.until) : *entry.until;
+
+		++it;
+	}
+}
+
+void PollService::process(std::vector<struct pollfd> &pfds) {
+	std::unique_lock lock(mMutex);
+	for (auto it = pfds.begin(); it != pfds.end(); ++it) {
+		socket_t sock = it->fd;
+		auto jt = mSocks->find(sock);
+		if (jt == mSocks->end())
+			continue; // removed
+
+		auto &entry = jt->second;
+		const auto &params = entry.params;
+
+		if (it->revents & POLLNVAL || it->revents & POLLERR) {
+			PLOG_VERBOSE << "Poll error event";
+			auto callback = std::move(params.callback);
+			mSocks->erase(sock);
+			callback(Event::Error);
+			continue;
+		}
+
+		if (it->revents & POLLIN || it->revents & POLLOUT) {
+			entry.until =
+			    params.timeout ? std::make_optional(clock::now() + *params.timeout) : nullopt;
+
+			auto callback = params.callback;
+
+			if (it->revents & POLLIN) {
+				PLOG_VERBOSE << "Poll in event";
+				params.callback(Event::In);
+			}
+
+			if (it->revents & POLLOUT) {
+				PLOG_VERBOSE << "Poll out event";
+				params.callback(Event::Out);
+			}
+
+			continue;
+		}
+
+		if (entry.until && clock::now() >= *entry.until) {
+			PLOG_VERBOSE << "Poll timeout event";
+			auto callback = std::move(params.callback);
+			mSocks->erase(sock);
+			callback(Event::Timeout);
+			continue;
+		}
+	}
+}
+
+void PollService::runLoop() {
+	try {
+		PLOG_DEBUG << "Poll service started";
+		assert(mSocks);
+
+		std::vector<struct pollfd> pfds;
+		optional<clock::time_point> next;
+		while (!mStopped) {
+			prepare(pfds, next);
+
+			PLOG_VERBOSE << "Entering poll";
+			int ret;
+			do {
+				int timeout = next ? duration_cast<milliseconds>(
+				                         std::max(clock::duration::zero(), *next - clock::now()))
+				                         .count()
+				                   : -1;
+				ret = ::poll(pfds.data(), pfds.size(), timeout);
+
+			} while (ret < 0 && (sockerrno == SEINTR || sockerrno == SEAGAIN));
+
+			PLOG_VERBOSE << "Exiting poll";
+
+			if (ret < 0)
+				throw std::runtime_error("Failed to wait for socket connection");
+
+			process(pfds);
+		}
+	} catch (const std::exception &e) {
+		PLOG_FATAL << "Poll service failed: " << e.what();
+	}
+
+	PLOG_DEBUG << "Poll service stopped";
+}
+
+std::ostream &operator<<(std::ostream &out, PollService::Direction direction) {
+	const char *str;
+	switch (direction) {
+	case PollService::Direction::In:
+		str = "in";
+		break;
+	case PollService::Direction::Out:
+		str = "out";
+		break;
+	case PollService::Direction::Both:
+		str = "both";
+		break;
+	default:
+		str = "unknown";
+		break;
+	}
+	return out << str;
+}
+
+} // namespace rtc::impl
+
+#endif

+ 91 - 0
src/impl/pollservice.hpp

@@ -0,0 +1,91 @@
+/**
+ * Copyright (c) 2022 Paul-Louis Ageneau
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
+ */
+
+#ifndef RTC_IMPL_POLL_SERVICE_H
+#define RTC_IMPL_POLL_SERVICE_H
+
+#include "pollinterrupter.hpp"
+#include "socket.hpp"
+#include "common.hpp"
+#include "internals.hpp"
+
+#if RTC_ENABLE_WEBSOCKET
+
+#include <chrono>
+#include <mutex>
+#include <thread>
+#include <unordered_map>
+#include <functional>
+
+namespace rtc::impl {
+
+class PollService {
+public:
+	using clock = std::chrono::steady_clock;
+
+	static PollService &Instance();
+
+	PollService(const PollService &) = delete;
+	PollService &operator=(const PollService &) = delete;
+	PollService(PollService &&) = delete;
+	PollService &operator=(PollService &&) = delete;
+
+	void start();
+	void join();
+
+	enum class Direction { Both, In, Out };
+	enum class Event { None, Error, Timeout, In, Out };
+
+	struct Params {
+		Direction direction;
+		optional<clock::duration> timeout;
+		std::function<void(Event)> callback;
+	};
+
+	void add(socket_t sock, Params params);
+	void remove(socket_t sock);
+
+private:
+	PollService();
+	~PollService();
+
+	void prepare(std::vector<struct pollfd> &pfds, optional<clock::time_point> &next);
+	void process(std::vector<struct pollfd> &pfds);
+	void runLoop();
+
+	struct SocketEntry {
+		Params params;
+		optional<clock::time_point> until;
+	};
+
+	using SocketMap = std::unordered_map<socket_t, SocketEntry>;
+	unique_ptr<SocketMap> mSocks;
+
+	std::recursive_mutex mMutex;
+	std::thread mThread;
+	bool mStopped;
+	PollInterrupter mInterrupter;
+};
+
+std::ostream &operator<<(std::ostream &out, PollService::Direction direction);
+
+} // namespace rtc::impl
+
+#endif
+
+#endif

+ 119 - 150
src/impl/tcptransport.cpp

@@ -30,6 +30,7 @@
 
 namespace rtc::impl {
 
+using namespace std::placeholders;
 using namespace std::chrono_literals;
 using std::chrono::duration_cast;
 using std::chrono::milliseconds;
@@ -73,23 +74,25 @@ TcpTransport::~TcpTransport() { stop(); }
 void TcpTransport::start() {
 	Transport::start();
 
-	PLOG_DEBUG << "Starting TCP recv thread";
-	mThread = std::thread(&TcpTransport::runLoop, this);
+	if (mSock == INVALID_SOCKET) {
+		connect();
+	} else {
+		changeState(State::Connected);
+		setPoll(PollService::Direction::In);
+	}
 }
 
 bool TcpTransport::stop() {
 	if (!Transport::stop())
 		return false;
 
-	PLOG_DEBUG << "Waiting for TCP recv thread";
 	close();
-	mThread.join();
 	return true;
 }
 
 bool TcpTransport::send(message_ptr message) {
-	std::unique_lock lock(mSockMutex);
-	if (state() == State::Connecting)
+	std::lock_guard lock(mSendMutex);
+	if (state() != State::Connected)
 		throw std::runtime_error("Connection is not open");
 
 	if (state() != State::Connected)
@@ -111,20 +114,22 @@ void TcpTransport::incoming(message_ptr message) {
 }
 
 bool TcpTransport::outgoing(message_ptr message) {
-	// mSockMutex must be locked
+	// mSendMutex must be locked
 	// Flush the queue, and if nothing is pending, try to send directly
 	if (trySendQueue() && trySendMessage(message))
 		return true;
 
 	mSendQueue.push(message);
-	mInterrupter.interrupt(); // so the thread waits for writability
+
+	setPoll(PollService::Direction::Both);
 	return false;
 }
 
 string TcpTransport::remoteAddress() const { return mHostname + ':' + mService; }
 
-void TcpTransport::connect(const string &hostname, const string &service) {
-	PLOG_DEBUG << "Connecting to " << hostname << ":" << service;
+void TcpTransport::connect() {
+	PLOG_DEBUG << "Connecting to " << mHostname << ":" << mService;
+	changeState(State::Connecting);
 
 	struct addrinfo hints = {};
 	hints.ai_family = AF_UNSPEC;
@@ -133,39 +138,68 @@ void TcpTransport::connect(const string &hostname, const string &service) {
 	hints.ai_flags = AI_ADDRCONFIG;
 
 	struct addrinfo *result = nullptr;
-	if (getaddrinfo(hostname.c_str(), service.c_str(), &hints, &result))
-		throw std::runtime_error("Resolution failed for \"" + hostname + ":" + service + "\"");
-
-	try {
-		for (auto p = result; p; p = p->ai_next) {
-			try {
-				connect(p->ai_addr, socklen_t(p->ai_addrlen));
-
-				PLOG_INFO << "Connected to " << hostname << ":" << service;
-				freeaddrinfo(result);
-				return;
-
-			} catch (const std::runtime_error &e) {
-				if (p->ai_next) {
-					PLOG_DEBUG << e.what();
-				} else {
-					PLOG_WARNING << e.what();
-				}
-			}
+	if (getaddrinfo(mHostname.c_str(), mService.c_str(), &hints, &result))
+		throw std::runtime_error("Resolution failed for \"" + mHostname + ":" + mService + "\"");
+
+	auto attempt = [this, result](struct addrinfo *ai, auto recurse) {
+		if (!ai) {
+			PLOG_WARNING << "Connection to " << mHostname << ":" << mService << " failed";
+			freeaddrinfo(result);
+			changeState(State::Failed);
+			return;
 		}
 
-		std::ostringstream msg;
-		msg << "Connection to " << hostname << ":" << service << " failed";
-		throw std::runtime_error(msg.str());
+		try {
+			prepare(ai->ai_addr, socklen_t(ai->ai_addrlen));
+
+			auto timeout = 10s;
+			PollService::Instance().add(
+			    mSock,
+			    {PollService::Direction::Out, timeout,
+			     [this, result, ai, recurse](PollService::Event event) mutable {
+				     try {
+					     if (event == PollService::Event::Error)
+						     throw std::runtime_error("Connection interrupted");
+
+						if (event == PollService::Event::Timeout)
+						     throw std::runtime_error("Connection timed out");
+
+					     if (event != PollService::Event::Out)
+						     return;
+
+					     int err = 0;
+					     socklen_t errlen = sizeof(err);
+					     if (::getsockopt(mSock, SOL_SOCKET, SO_ERROR, (char *)&err, &errlen) != 0)
+						     throw std::runtime_error("Failed to get socket error code");
+
+					     if (err != 0) {
+						     std::ostringstream msg;
+						     msg << "TCP connection failed, errno=" << err;
+						     throw std::runtime_error(msg.str());
+					     }
+
+					     PLOG_DEBUG << "TCP connection succeeded";
+					     freeaddrinfo(result);
+					     changeState(State::Connected);
+
+					     setPoll(PollService::Direction::In);
+
+				     } catch (const std::runtime_error &e) {
+					     PLOG_DEBUG << e.what();
+					     recurse(ai->ai_next, recurse);
+				     }
+			     }});
+
+		} catch (const std::runtime_error &e) {
+			PLOG_DEBUG << e.what();
+			recurse(ai->ai_next, recurse);
+		}
+	};
 
-	} catch (...) {
-		freeaddrinfo(result);
-		throw;
-	}
+	attempt(result, attempt);
 }
 
-void TcpTransport::connect(const sockaddr *addr, socklen_t addrlen) {
-	std::unique_lock lock(mSockMutex);
+void TcpTransport::prepare(const sockaddr *addr, socklen_t addrlen) {
 	try {
 		char node[MAX_NUMERICNODE_LEN];
 		char serv[MAX_NUMERICSERV_LEN];
@@ -201,42 +235,6 @@ void TcpTransport::connect(const sockaddr *addr, socklen_t addrlen) {
 			throw std::runtime_error(msg.str());
 		}
 
-		// Wait for connection
-		struct pollfd pfd[1];
-		pfd[0].fd = mSock;
-		pfd[0].events = POLLOUT;
-
-		using clock = std::chrono::steady_clock;
-		auto end = clock::now() + 10s; // TODO: Make the timeout configurable
-
-		do {
-			auto timeout = std::max(clock::duration::zero(), end - clock::now());
-			ret = ::poll(pfd, 1, int(duration_cast<milliseconds>(timeout).count()));
-
-		} while (ret < 0 && (sockerrno == SEINTR || sockerrno == SEAGAIN));
-
-		if (ret < 0)
-			throw std::runtime_error("Failed to wait for socket connection");
-
-		if (!(pfd[0].revents & POLLOUT)) {
-			std::ostringstream msg;
-			msg << "TCP connection to " << node << ":" << serv << " timed out";
-			throw std::runtime_error(msg.str());
-		}
-
-		int err = 0;
-		socklen_t errlen = sizeof(err);
-		if (::getsockopt(mSock, SOL_SOCKET, SO_ERROR, (char *)&err, &errlen) != 0)
-			throw std::runtime_error("Failed to get socket error code");
-
-		if (err != 0) {
-			std::ostringstream msg;
-			msg << "TCP connection to " << node << ":" << serv << " failed, errno=" << err;
-			throw std::runtime_error(msg.str());
-		}
-
-		PLOG_DEBUG << "TCP connection to " << node << ":" << serv << " succeeded";
-
 	} catch (...) {
 		if (mSock != INVALID_SOCKET) {
 			::closesocket(mSock);
@@ -246,19 +244,24 @@ void TcpTransport::connect(const sockaddr *addr, socklen_t addrlen) {
 	}
 }
 
+void TcpTransport::setPoll(PollService::Direction direction) {
+	PollService::Instance().add(mSock,
+	                            {direction, nullopt, std::bind(&TcpTransport::process, this, _1)});
+}
+
 void TcpTransport::close() {
-	std::unique_lock lock(mSockMutex);
+	std::lock_guard lock(mSendMutex);
 	if (mSock != INVALID_SOCKET) {
 		PLOG_DEBUG << "Closing TCP socket";
+		PollService::Instance().remove(mSock);
 		::closesocket(mSock);
 		mSock = INVALID_SOCKET;
 	}
 	changeState(State::Disconnected);
-	mInterrupter.interrupt();
 }
 
 bool TcpTransport::trySendQueue() {
-	// mSockMutex must be locked
+	// mSendMutex must be locked
 	while (auto next = mSendQueue.peek()) {
 		message_ptr message = std::move(*next);
 		if (!trySendMessage(message)) {
@@ -267,11 +270,12 @@ bool TcpTransport::trySendQueue() {
 		}
 		mSendQueue.pop();
 	}
+
 	return true;
 }
 
 bool TcpTransport::trySendMessage(message_ptr &message) {
-	// mSockMutex must be locked
+	// mSendMutex must be locked
 	auto data = reinterpret_cast<const char *>(message->data());
 	auto size = message->size();
 	while (size) {
@@ -298,95 +302,60 @@ bool TcpTransport::trySendMessage(message_ptr &message) {
 	return true;
 }
 
-void TcpTransport::runLoop() {
-	const size_t bufferSize = 4096;
-
-	// Connect
-	try {
-		changeState(State::Connecting);
-		if (mSock == INVALID_SOCKET)
-			connect(mHostname, mService);
-
-	} catch (const std::exception &e) {
-		PLOG_ERROR << "TCP connect: " << e.what();
-		changeState(State::Failed);
-		return;
-	}
-
-	// Receive loop
+void TcpTransport::process(PollService::Event event) {
 	try {
-		PLOG_INFO << "TCP connected";
-		changeState(State::Connected);
+		PLOG_VERBOSE << "Poll event";
 
-		while (true) {
-			std::unique_lock lock(mSockMutex);
+		switch (event) {
+		case PollService::Event::Error: {
+			PLOG_WARNING << "TCP connection terminated";
+			break;
+		}
 
-			if (mSock == INVALID_SOCKET)
-				break;
+		case PollService::Event::Timeout: {
+			PLOG_VERBOSE << "TCP is idle";
+			incoming(make_message(0));
+			return;
+		}
 
-			struct pollfd pfd[2];
-			pfd[0].fd = mSock;
-			pfd[0].events = !mSendQueue.empty() ? (POLLIN | POLLOUT) : POLLIN;
-			mInterrupter.prepare(pfd[1]);
+		case PollService::Event::Out: {
+			if (trySendQueue())
+				setPoll(PollService::Direction::In);
 
-			using clock = std::chrono::steady_clock;
-			auto end = clock::now() + 10s;
+			return;
+		}
 
-			int ret;
-			lock.unlock();
-			do {
-				auto timeout = std::max(clock::duration::zero(), end - clock::now());
-				ret = ::poll(pfd, 2, int(duration_cast<milliseconds>(timeout).count()));
+		case PollService::Event::In: {
+			const size_t bufferSize = 4096;
+			char buffer[bufferSize];
+			int len;
+			while((len = ::recv(mSock, buffer, bufferSize, 0)) > 0) {
+				auto *b = reinterpret_cast<byte *>(buffer);
+				incoming(make_message(b, b + len));
+			}
 
-			} while (ret < 0 && (sockerrno == SEINTR || sockerrno == SEAGAIN));
-			lock.lock();
+			if (len == 0)
+				break; // clean close
 
-			if (mSock == INVALID_SOCKET)
+			if (sockerrno != SEAGAIN && sockerrno != SEWOULDBLOCK) {
+				PLOG_WARNING << "TCP connection lost";
 				break;
-
-			if (ret < 0)
-				throw std::runtime_error("Failed to wait on socket");
-
-			if (ret == 0) {
-				PLOG_VERBOSE << "TCP is idle";
-				lock.unlock(); // unlock now since the upper layer might send on incoming
-				incoming(make_message(0));
-				continue;
-			}
-
-			if (pfd[0].revents & POLLNVAL || pfd[0].revents & POLLERR) {
-				throw std::runtime_error("Error while waiting for socket connection");
 			}
 
-			if (pfd[0].revents & POLLOUT) {
-				trySendQueue();
-			}
+			return;
+		}
 
-			if (pfd[0].revents & POLLIN) {
-				char buffer[bufferSize];
-				int len = ::recv(mSock, buffer, bufferSize, 0);
-				if (len < 0) {
-					if (sockerrno == SEAGAIN || sockerrno == SEWOULDBLOCK) {
-						continue;
-					} else {
-						PLOG_WARNING << "TCP connection lost";
-						break;
-					}
-				}
-
-				if (len == 0)
-					break; // clean close
-
-				lock.unlock(); // unlock now since the upper layer might send on incoming
-				auto *b = reinterpret_cast<byte *>(buffer);
-				incoming(make_message(b, b + len));
-			}
+		default:
+			// Ignore
+			return;
 		}
+
 	} catch (const std::exception &e) {
-		PLOG_ERROR << "TCP recv: " << e.what();
+		PLOG_ERROR << e.what();
 	}
 
 	PLOG_INFO << "TCP disconnected";
+	PollService::Instance().remove(mSock);
 	changeState(State::Disconnected);
 	recv(nullptr);
 }

+ 7 - 8
src/impl/tcptransport.hpp

@@ -20,15 +20,15 @@
 #define RTC_IMPL_TCP_TRANSPORT_H
 
 #include "common.hpp"
+#include "pollservice.hpp"
 #include "queue.hpp"
-#include "pollinterrupter.hpp"
 #include "socket.hpp"
 #include "transport.hpp"
 
 #if RTC_ENABLE_WEBSOCKET
 
 #include <mutex>
-#include <thread>
+#include <future>
 
 namespace rtc::impl {
 
@@ -50,23 +50,22 @@ public:
 	string remoteAddress() const;
 
 private:
-	void connect(const string &hostname, const string &service);
-	void connect(const sockaddr *addr, socklen_t addrlen);
+	void connect();
+	void prepare(const sockaddr *addr, socklen_t addrlen);
+	void setPoll(PollService::Direction direction);
 	void close();
 
 	bool trySendQueue();
 	bool trySendMessage(message_ptr &message);
 
-	void runLoop();
+	void process(PollService::Event event);
 
 	const bool mIsActive;
 	string mHostname, mService;
 
 	socket_t mSock = INVALID_SOCKET;
-	std::mutex mSockMutex;
-	std::thread mThread;
-	PollInterrupter mInterrupter;
 	Queue<message_ptr> mSendQueue;
+	std::mutex mSendMutex;
 };
 
 } // namespace rtc::impl