Browse Source

Added Track class

Paul-Louis Ageneau 5 years ago
parent
commit
0b055ac17d
9 changed files with 192 additions and 23 deletions
  1. 3 1
      CMakeLists.txt
  2. 2 0
      include/rtc/include.hpp
  3. 1 1
      include/rtc/message.hpp
  4. 6 0
      include/rtc/queue.hpp
  5. 70 0
      include/rtc/track.hpp
  6. 6 9
      src/datachannel.cpp
  7. 3 6
      src/message.cpp
  8. 93 0
      src/track.cpp
  9. 8 6
      src/websocket.cpp

+ 3 - 1
CMakeLists.txt

@@ -53,8 +53,9 @@ set(LIBDATACHANNEL_SOURCES
 	${CMAKE_CURRENT_SOURCE_DIR}/src/peerconnection.cpp
 	${CMAKE_CURRENT_SOURCE_DIR}/src/rtc.cpp
 	${CMAKE_CURRENT_SOURCE_DIR}/src/sctptransport.cpp
-	${CMAKE_CURRENT_SOURCE_DIR}/src/tls.cpp
 	${CMAKE_CURRENT_SOURCE_DIR}/src/threadpool.cpp
+	${CMAKE_CURRENT_SOURCE_DIR}/src/tls.cpp
+	${CMAKE_CURRENT_SOURCE_DIR}/src/track.cpp
 	${CMAKE_CURRENT_SOURCE_DIR}/src/processor.cpp
 )
 
@@ -83,6 +84,7 @@ set(LIBDATACHANNEL_HEADERS
 	${CMAKE_CURRENT_SOURCE_DIR}/include/rtc/reliability.hpp
 	${CMAKE_CURRENT_SOURCE_DIR}/include/rtc/rtc.h
 	${CMAKE_CURRENT_SOURCE_DIR}/include/rtc/rtc.hpp
+	${CMAKE_CURRENT_SOURCE_DIR}/include/rtc/track.hpp
 	${CMAKE_CURRENT_SOURCE_DIR}/include/rtc/websocket.hpp
 )
 

+ 2 - 0
include/rtc/include.hpp

@@ -64,6 +64,8 @@ const uint16_t DEFAULT_SCTP_PORT = 5000; // SCTP port to use by default
 const size_t DEFAULT_MAX_MESSAGE_SIZE = 65536;    // Remote max message size if not specified in SDP
 const size_t LOCAL_MAX_MESSAGE_SIZE = 256 * 1024; // Local max message size
 
+const size_t RECV_QUEUE_LIMIT = 1024 * 1024; // Max per-channel queue size
+
 const int THREADPOOL_SIZE = 4; // Number of threads in the global thread pool
 
 // overloaded helper

+ 1 - 1
include/rtc/message.hpp

@@ -73,7 +73,7 @@ message_ptr make_message(binary &&data, Message::Type type = Message::Binary,
 
 message_ptr make_message(message_variant data);
 
-std::optional<message_variant> to_variant(Message &&message);
+message_variant to_variant(Message &&message);
 
 } // namespace rtc
 

+ 6 - 0
include/rtc/queue.hpp

@@ -39,6 +39,7 @@ public:
 
 	void stop();
 	bool empty() const;
+	bool full() const;
 	size_t size() const;   // elements
 	size_t amount() const; // amount
 	void push(T element);
@@ -80,6 +81,11 @@ template <typename T> bool Queue<T>::empty() const {
 	return mQueue.empty();
 }
 
+template <typename T> bool Queue<T>::full() const {
+	std::lock_guard lock(mMutex);
+	return mQueue.size() >= mLimit;
+}
+
 template <typename T> size_t Queue<T>::size() const {
 	std::lock_guard lock(mMutex);
 	return mQueue.size();

+ 70 - 0
include/rtc/track.hpp

@@ -0,0 +1,70 @@
+/**
+ * Copyright (c) 2020 Paul-Louis Ageneau
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
+ */
+
+#ifndef RTC_TRACK_H
+#define RTC_TRACK_H
+
+#include "channel.hpp"
+#include "include.hpp"
+#include "message.hpp"
+#include "queue.hpp"
+
+#include <atomic>
+#include <variant>
+
+namespace rtc {
+
+class PeerConnection;
+class DtlsSrtpTransport;
+
+class Track final : public std::enable_shared_from_this<Track>, public Channel {
+public:
+	Track(string mid, std::shared_ptr<DtlsSrtpTransport> transport = nullptr);
+	~Track() = default;
+
+	string mid() const;
+
+	void close(void) override;
+	bool send(message_variant data) override;
+	bool send(const byte *data, size_t size);
+
+	bool isOpen(void) const override;
+	bool isClosed(void) const override;
+	size_t maxMessageSize() const override;
+
+	// Extended API
+	size_t availableAmount() const override;
+	std::optional<message_variant> receive() override;
+
+private:
+	void open(std::shared_ptr<DtlsSrtpTransport> transport);
+	bool outgoing(message_ptr message);
+	void incoming(message_ptr message);
+
+	const string mMid;
+	std::weak_ptr<DtlsSrtpTransport> mDtlsSrtpTransport;
+	std::atomic<bool> mIsClosed = false;
+
+	Queue<message_ptr> mRecvQueue;
+
+	friend class PeerConnection;
+};
+
+} // namespace rtc
+
+#endif

+ 6 - 9
src/datachannel.cpp

@@ -72,8 +72,6 @@ struct CloseMessage {
 };
 #pragma pack(pop)
 
-const size_t RECV_QUEUE_LIMIT = 1024 * 1024; // 1 MiB
-
 DataChannel::DataChannel(weak_ptr<PeerConnection> pc, unsigned int stream, string label,
                          string protocol, Reliability reliability)
     : mPeerConnection(pc), mStream(stream), mLabel(std::move(label)),
@@ -130,10 +128,9 @@ std::optional<message_variant> DataChannel::receive() {
 			auto raw = reinterpret_cast<const uint8_t *>(message->data());
 			if (!message->empty() && raw[0] == MESSAGE_CLOSE)
 				remoteClose();
-			continue;
+		} else {
+			return to_variant(std::move(*message));
 		}
-		if (auto variant = to_variant(std::move(*message)))
-			return variant;
 	}
 
 	return nullopt;
@@ -201,12 +198,12 @@ bool DataChannel::outgoing(message_ptr message) {
 	if (mIsClosed)
 		throw std::runtime_error("DataChannel is closed");
 
-	if (message->size() > maxMessageSize())
-		throw std::runtime_error("Message size exceeds limit");
-
 	auto transport = mSctpTransport.lock();
 	if (!transport)
-		throw std::runtime_error("DataChannel has no transport");
+		throw std::runtime_error("DataChannel transport is not open");
+
+	if (message->size() > maxMessageSize())
+		throw std::runtime_error("Message size exceeds limit");
 
 	// Before the ACK has been received on a DataChannel, all messages must be sent ordered
 	message->reliability = mIsOpen ? mReliability : nullptr;

+ 3 - 6
src/message.cpp

@@ -48,15 +48,12 @@ message_ptr make_message(message_variant data) {
 	    std::move(data));
 }
 
-std::optional<message_variant> to_variant(Message &&message) {
+message_variant to_variant(Message &&message) {
 	switch (message.type) {
 	case Message::String:
-		return std::make_optional(
-		    string(reinterpret_cast<const char *>(message.data()), message.size()));
-	case Message::Binary:
-		return std::make_optional(std::move(message));
+		return string(reinterpret_cast<const char *>(message.data()), message.size());
 	default:
-		return nullopt;
+		return message;
 	}
 }
 

+ 93 - 0
src/track.cpp

@@ -0,0 +1,93 @@
+/**
+ * Copyright (c) 2020 Paul-Louis Ageneau
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
+ */
+
+#include "track.hpp"
+#include "dtlssrtptransport.hpp"
+#include "include.hpp"
+
+namespace rtc {
+
+using std::shared_ptr;
+using std::weak_ptr;
+
+Track::Track(string mid, shared_ptr<DtlsSrtpTransport> transport)
+    : mMid(std::move(mid)), mRecvQueue(RECV_QUEUE_LIMIT, message_size_func) {
+
+	if (transport)
+		open(transport);
+}
+
+string Track::mid() const { return mMid; }
+
+void Track::close() {
+	mIsClosed = true;
+	resetCallbacks();
+}
+
+bool Track::send(message_variant data) { return outgoing(make_message(std::move(data))); }
+
+bool Track::send(const byte *data, size_t size) {
+	return outgoing(std::make_shared<Message>(data, data + size, Message::Binary));
+}
+
+std::optional<message_variant> Track::receive() {
+	if (!mRecvQueue.empty())
+		return to_variant(std::move(**mRecvQueue.pop()));
+
+	return nullopt;
+}
+
+bool Track::isOpen(void) const { return !mIsClosed && mDtlsSrtpTransport.lock(); }
+
+bool Track::isClosed(void) const { return mIsClosed; }
+
+size_t Track::maxMessageSize() const {
+	return 65535 - 12 - 4; // SRTP/UDP
+}
+
+size_t Track::availableAmount() const { return mRecvQueue.amount(); }
+
+void Track::open(shared_ptr<DtlsSrtpTransport> transport) { mDtlsSrtpTransport = transport; }
+
+bool Track::outgoing(message_ptr message) {
+	if (mIsClosed)
+		throw std::runtime_error("Track is closed");
+
+	auto transport = mDtlsSrtpTransport.lock();
+	if (!transport)
+		throw std::runtime_error("Track transport is not open");
+
+	if (message->size() > maxMessageSize())
+		throw std::runtime_error("Message size exceeds limit");
+
+	return transport->send(message);
+}
+
+void Track::incoming(message_ptr message) {
+	if (!message)
+		return;
+
+	// Tail drop if queue is full
+	if (mRecvQueue.full())
+		return;
+
+	mRecvQueue.push(message);
+	triggerAvailable(mRecvQueue.size());
+}
+
+} // namespace rtc

+ 8 - 6
src/websocket.cpp

@@ -38,7 +38,8 @@ namespace rtc {
 using std::shared_ptr;
 
 WebSocket::WebSocket(std::optional<Configuration> config)
-    : mConfig(config ? std::move(*config) : Configuration()) {
+    : mConfig(config ? std::move(*config) : Configuration()),
+      mRecvQueue(RECV_QUEUE_LIMIT, message_size_func) {
 	PLOG_VERBOSE << "Creating WebSocket";
 }
 
@@ -109,10 +110,11 @@ bool WebSocket::isClosed() const { return mState == State::Closed; }
 size_t WebSocket::maxMessageSize() const { return DEFAULT_MAX_MESSAGE_SIZE; }
 
 std::optional<message_variant> WebSocket::receive() {
-	while (!mRecvQueue.empty())
-		if (auto variant = to_variant(std::move(**mRecvQueue.pop())))
-			return variant;
-
+	while (!mRecvQueue.empty()) {
+		auto message = *mRecvQueue.pop();
+		if (message->type != Message::Control)
+			return to_variant(std::move(*message));
+	}
 	return nullopt;
 }
 
@@ -319,6 +321,6 @@ void WebSocket::closeTransports() {
 	});
 }
 
-} // namespace rtc
+	} // namespace rtc
 
 #endif