Browse Source

Added TCP transport

Paul-Louis Ageneau 5 years ago
parent
commit
d3a2a88301
4 changed files with 297 additions and 0 deletions
  1. 13 0
      CMakeLists.txt
  2. 11 0
      include/rtc/queue.hpp
  3. 207 0
      src/tcptransport.cpp
  4. 66 0
      src/tcptransport.hpp

+ 13 - 0
CMakeLists.txt

@@ -6,6 +6,7 @@ project (libdatachannel
 
 option(USE_GNUTLS "Use GnuTLS instead of OpenSSL" OFF)
 option(USE_JUICE "Use libjuice instead of libnice" OFF)
+option(ENABLE_WEBSOCKET "Build WebSocket support" OFF)
 
 if(USE_GNUTLS)
 	option(USE_NETTLE "Use Nettle instead of OpenSSL in libjuice" ON)
@@ -34,6 +35,10 @@ set(LIBDATACHANNEL_SOURCES
 	${CMAKE_CURRENT_SOURCE_DIR}/src/peerconnection.cpp
 	${CMAKE_CURRENT_SOURCE_DIR}/src/rtc.cpp
 	${CMAKE_CURRENT_SOURCE_DIR}/src/sctptransport.cpp
+
+	${CMAKE_CURRENT_SOURCE_DIR}/src/tcptransport.cpp
+	${CMAKE_CURRENT_SOURCE_DIR}/src/tlstransport.cpp
+	${CMAKE_CURRENT_SOURCE_DIR}/src/wstransport.cpp
 )
 
 set(LIBDATACHANNEL_HEADERS
@@ -146,6 +151,14 @@ else()
 	target_link_libraries(datachannel-static LibNice::LibNice)
 endif()
 
+if (ENABLE_WEBSOCKET)
+	target_compile_definitions(datachannel PRIVATE ENABLE_WEBSOCKET=1)
+	target_compile_definitions(datachannel-static PRIVATE ENABLE_WEBSOCKET=1)
+else()
+	target_compile_definitions(datachannel PRIVATE ENABLE_WEBSOCKET=0)
+	target_compile_definitions(datachannel-static PRIVATE ENABLE_WEBSOCKET=0)
+endif()
+
 add_library(LibDataChannel::LibDataChannel ALIAS datachannel)
 add_library(LibDataChannel::LibDataChannelStatic ALIAS datachannel-static)
 

+ 11 - 0
include/rtc/queue.hpp

@@ -44,6 +44,7 @@ public:
 	void push(T element);
 	std::optional<T> pop();
 	std::optional<T> peek();
+	std::optional<T> exchange(T element);
 	bool wait(const std::optional<std::chrono::milliseconds> &duration = nullopt);
 
 private:
@@ -118,6 +119,16 @@ template <typename T> std::optional<T> Queue<T>::peek() {
 	}
 }
 
+template <typename T> std::optional<T> Queue<T>::exchange(T element) {
+	std::unique_lock lock(mMutex);
+	if (!mQueue.empty()) {
+		std::swap(mQueue.front(), element);
+		return std::optional<T>{element};
+	} else {
+		return nullopt;
+	}
+}
+
 template <typename T>
 bool Queue<T>::wait(const std::optional<std::chrono::milliseconds> &duration) {
 	std::unique_lock lock(mMutex);

+ 207 - 0
src/tcptransport.cpp

@@ -0,0 +1,207 @@
+/**
+ * Copyright (c) 2020 Paul-Louis Ageneau
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
+ */
+
+#ifdef ENABLE_WEBSOCKET
+
+#include "tcptransport.hpp"
+
+namespace rtc {
+
+TcpTransport::TcpTransport(const string &hostname, const string &service)
+    : mHostname(hostname), mService(service) {
+	mThread = std::thread(&TcpTransport::runLoop, this);
+}
+
+TcpTransport::~TcpTransport() { stop(); }
+
+bool TcpTransport::stop() {
+	if (!Transport::stop())
+		return false;
+
+	close();
+	mThread.join();
+	return true;
+}
+
+bool TcpTransport::send(message_ptr message) { return outgoing(message); }
+
+void TcpTransport::incoming(message_ptr message) { recv(message); }
+
+bool TcpTransport::outgoing(message_ptr message) {
+	if (mSock == INVALID_SOCKET)
+		throw std::runtime_error("Not connected");
+
+	if (!message)
+		return mSendQueue.empty();
+
+	// If nothing is pending, try to send directly
+	if (mSendQueue.empty() && trySendMessage(message))
+		return true;
+
+	mSendQueue.push(message);
+	return false;
+}
+
+void TcpTransport::connect(const string &hostname, const string &service) {
+	struct addrinfo hints = {};
+	hints.ai_family = AF_UNSPEC;
+	hints.ai_socktype = SOCK_STREAM;
+	hints.ai_protocol = IPPROTO_TCP;
+	hints.ai_flags = AI_ADDRCONFIG;
+
+	struct addrinfo *result = nullptr;
+	if (getaddrinfo(hostname.c_str(), service.c_str(), &hints, &result))
+		throw std::runtime_error("Resolution failed for \"" + hostname + ":" + service + "\"");
+
+	for (auto p = result; p; p = p->ai_next)
+		try {
+			connect(p->ai_addr, p->ai_addrlen);
+			freeaddrinfo(result);
+			return;
+		} catch (const std::runtime_error &e) {
+			PLOG_WARNING << e.what();
+		}
+
+	freeaddrinfo(result);
+	throw std::runtime_error("Connection failed to \"" + hostname + ":" + service + "\"");
+}
+
+void TcpTransport::connect(const sockaddr *addr, socklen_t addrlen) {
+	try {
+		// Create socket
+		mSock = ::socket(addr->sa_family, SOCK_STREAM, IPPROTO_TCP);
+		if (mSock == INVALID_SOCKET)
+			throw std::runtime_error("TCP socket creation failed");
+
+		ctl_t b = 1;
+		if (::ioctlsocket(mSock, FIONBIO, &b) < 0)
+			throw std::runtime_error("Failed to set socket non-blocking mode");
+
+		// Initiate connection
+		::connect(mSock, addr, addrlen);
+
+		fd_set writefds;
+		FD_ZERO(&writefds);
+		FD_SET(mSock, &writefds);
+		struct timeval tv;
+		tv.tv_sec = 10; // TODO
+		tv.tv_usec = 0;
+		int ret = ::select(SOCKET_TO_INT(mSock) + 1, NULL, &writefds, NULL, &tv);
+
+		if (ret < 0)
+			throw std::runtime_error("Failed to wait for socket connection");
+
+		if (ret == 0 || ::send(mSock, NULL, 0, MSG_NOSIGNAL) != 0)
+			throw std::runtime_error("Connection failed");
+
+	} catch (...) {
+		close();
+		throw;
+	}
+}
+
+void TcpTransport::close() {
+	if (mSock != INVALID_SOCKET) {
+		::closesocket(mSock);
+		mSock = INVALID_SOCKET;
+	}
+}
+
+bool TcpTransport::trySendQueue() {
+	while (auto next = mSendQueue.peek()) {
+		auto message = *next;
+		if (!trySendMessage(message)) {
+			mSendQueue.exchange(message);
+			return false;
+		}
+		mSendQueue.pop();
+	}
+	return true;
+}
+
+bool TcpTransport::trySendMessage(message_ptr &message) {
+	auto data = reinterpret_cast<const char *>(message->data());
+	auto size = message->size();
+	while (size) {
+		int len = ::send(mSock, data, size, MSG_NOSIGNAL);
+		if (len >= 0) {
+			data += len;
+			size -= len;
+		} else if (errno == EAGAIN || errno == EWOULDBLOCK) {
+			message = make_message(message->data() + len, message->data() + size);
+			return false;
+		} else {
+			throw std::runtime_error("Connection lost, errno=" + std::to_string(sockerrno));
+		}
+	}
+	message = nullptr;
+	return true;
+}
+
+void TcpTransport::runLoop() {
+	const size_t bufferSize = 4096;
+
+	// Connect
+	try {
+		connect(mHostname, mService);
+
+	} catch (const std::exception &e) {
+		PLOG_ERROR << "TCP connect: " << e.what();
+		return;
+	}
+
+	// Receive loop
+	try {
+		while (true) {
+			fd_set readfds, writefds;
+			FD_ZERO(&readfds);
+			FD_ZERO(&writefds);
+			FD_SET(mSock, &readfds);
+			if (!mSendQueue.empty())
+				FD_SET(mSock, &writefds);
+			int ret = ::select(SOCKET_TO_INT(mSock) + 1, &readfds, &writefds, NULL, NULL);
+			if (ret < 0)
+				throw std::runtime_error("Failed to wait on socket");
+
+			if (FD_ISSET(mSock, &readfds)) {
+				char buffer[bufferSize];
+				int len = ::recv(mSock, buffer, bufferSize, 0);
+				if (len < 0)
+					throw std::runtime_error("Connection lost, errno=" + std::to_string(sockerrno));
+
+				if (len == 0)
+					break; // clean close
+
+				auto *b = reinterpret_cast<byte *>(buffer);
+				incoming(make_message(b, b + ret));
+			}
+
+			if (FD_ISSET(mSock, &writefds))
+				trySendQueue();
+		}
+	} catch (const std::exception &e) {
+		PLOG_ERROR << "TCP recv: " << e.what();
+	}
+
+	PLOG_INFO << "TCP disconnected";
+	recv(nullptr);
+}
+
+} // namespace rtc
+
+#endif

+ 66 - 0
src/tcptransport.hpp

@@ -0,0 +1,66 @@
+/**
+ * Copyright (c) 2020 Paul-Louis Ageneau
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
+ */
+
+#ifndef RTC_TCP_TRANSPORT_H
+#define RTC_TCP_TRANSPORT_H
+
+#ifdef ENABLE_WEBSOCKET
+
+#include "include.hpp"
+#include "queue.hpp"
+#include "transport.hpp"
+
+#include <thread>
+
+// Use the socket defines from libjuice
+#include "../deps/libjuice/src/socket.h"
+
+namespace rtc {
+
+class TcpTransport : public Transport {
+public:
+	TcpTransport(const string &hostname, const string &service);
+	virtual ~TcpTransport();
+
+	bool stop() override;
+	bool send(message_ptr message) override;
+
+	void incoming(message_ptr message) override;
+	bool outgoing(message_ptr message) override;
+
+private:
+	void connect(const string &hostname, const string &service);
+	void connect(const sockaddr *addr, socklen_t addrlen);
+	void close();
+
+	bool trySendQueue();
+	bool trySendMessage(message_ptr &message);
+
+	void runLoop();
+
+	string mHostname, mService;
+	socket_t mSock;
+	std::thread mThread;
+	Queue<message_ptr> mSendQueue;
+};
+
+} // namespace rtc
+
+#endif
+
+#endif